lock
This commit is contained in:
5
.github/workflows/ci.yml
vendored
5
.github/workflows/ci.yml
vendored
@@ -430,8 +430,9 @@ jobs:
|
||||
. venv/bin/activate
|
||||
mkdir -p build_cache
|
||||
export PLATFORMIO_BUILD_CACHE_DIR=$PWD/build_cache
|
||||
# Use 2 parallel jobs for compilation (resource intensive)
|
||||
./script/test_build_components -e compile -c ${{ matrix.file }} -j 2 -f -b $PWD/build_cache
|
||||
# Use sequential compilation to avoid race conditions
|
||||
# PlatformIO handles its own parallelization internally
|
||||
./script/test_build_components -e compile -c ${{ matrix.file }} -j 1 -f -b $PWD/build_cache
|
||||
|
||||
test-build-components-splitter:
|
||||
name: Split components for testing into 20 groups maximum
|
||||
|
||||
101
esphome/git.py
101
esphome/git.py
@@ -10,6 +10,7 @@ import urllib.parse
|
||||
|
||||
import esphome.config_validation as cv
|
||||
from esphome.core import CORE, TimePeriodSeconds
|
||||
from esphome.git_lock import git_operation_lock
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -59,66 +60,72 @@ def clone_or_update(
|
||||
)
|
||||
|
||||
repo_dir = _compute_destination_path(key, domain)
|
||||
if not repo_dir.is_dir():
|
||||
_LOGGER.info("Cloning %s", key)
|
||||
_LOGGER.debug("Location: %s", repo_dir)
|
||||
cmd = ["git", "clone", "--depth=1"]
|
||||
cmd += ["--", url, str(repo_dir)]
|
||||
run_git_command(cmd)
|
||||
|
||||
if ref is not None:
|
||||
# We need to fetch the PR branch first, otherwise git will complain
|
||||
# about missing objects
|
||||
_LOGGER.info("Fetching %s", ref)
|
||||
run_git_command(["git", "fetch", "--", "origin", ref], str(repo_dir))
|
||||
run_git_command(["git", "reset", "--hard", "FETCH_HEAD"], str(repo_dir))
|
||||
|
||||
if submodules is not None:
|
||||
_LOGGER.info(
|
||||
"Initialising submodules (%s) for %s", ", ".join(submodules), key
|
||||
)
|
||||
run_git_command(
|
||||
["git", "submodule", "update", "--init"] + submodules, str(repo_dir)
|
||||
)
|
||||
|
||||
else:
|
||||
# Check refresh needed
|
||||
file_timestamp = Path(repo_dir / ".git" / "FETCH_HEAD")
|
||||
# On first clone, FETCH_HEAD does not exist
|
||||
if not file_timestamp.exists():
|
||||
file_timestamp = Path(repo_dir / ".git" / "HEAD")
|
||||
age = datetime.now() - datetime.fromtimestamp(file_timestamp.stat().st_mtime)
|
||||
if refresh is None or age.total_seconds() > refresh.total_seconds:
|
||||
old_sha = run_git_command(["git", "rev-parse", "HEAD"], str(repo_dir))
|
||||
_LOGGER.info("Updating %s", key)
|
||||
# Use lock to prevent concurrent access to the same repository
|
||||
with git_operation_lock(key):
|
||||
if not repo_dir.is_dir():
|
||||
_LOGGER.info("Cloning %s", key)
|
||||
_LOGGER.debug("Location: %s", repo_dir)
|
||||
# Stash local changes (if any)
|
||||
run_git_command(
|
||||
["git", "stash", "push", "--include-untracked"], str(repo_dir)
|
||||
)
|
||||
# Fetch remote ref
|
||||
cmd = ["git", "fetch", "--", "origin"]
|
||||
cmd = ["git", "clone", "--depth=1"]
|
||||
cmd += ["--", url, str(repo_dir)]
|
||||
run_git_command(cmd)
|
||||
|
||||
if ref is not None:
|
||||
cmd.append(ref)
|
||||
run_git_command(cmd, str(repo_dir))
|
||||
# Hard reset to FETCH_HEAD (short-lived git ref corresponding to most recent fetch)
|
||||
run_git_command(["git", "reset", "--hard", "FETCH_HEAD"], str(repo_dir))
|
||||
# We need to fetch the PR branch first, otherwise git will complain
|
||||
# about missing objects
|
||||
_LOGGER.info("Fetching %s", ref)
|
||||
run_git_command(["git", "fetch", "--", "origin", ref], str(repo_dir))
|
||||
run_git_command(["git", "reset", "--hard", "FETCH_HEAD"], str(repo_dir))
|
||||
|
||||
if submodules is not None:
|
||||
_LOGGER.info(
|
||||
"Updating submodules (%s) for %s", ", ".join(submodules), key
|
||||
"Initialising submodules (%s) for %s", ", ".join(submodules), key
|
||||
)
|
||||
run_git_command(
|
||||
["git", "submodule", "update", "--init"] + submodules, str(repo_dir)
|
||||
)
|
||||
|
||||
def revert():
|
||||
_LOGGER.info("Reverting changes to %s -> %s", key, old_sha)
|
||||
run_git_command(["git", "reset", "--hard", old_sha], str(repo_dir))
|
||||
else:
|
||||
# Check refresh needed
|
||||
file_timestamp = Path(repo_dir / ".git" / "FETCH_HEAD")
|
||||
# On first clone, FETCH_HEAD does not exist
|
||||
if not file_timestamp.exists():
|
||||
file_timestamp = Path(repo_dir / ".git" / "HEAD")
|
||||
age = datetime.now() - datetime.fromtimestamp(
|
||||
file_timestamp.stat().st_mtime
|
||||
)
|
||||
if refresh is None or age.total_seconds() > refresh.total_seconds:
|
||||
old_sha = run_git_command(["git", "rev-parse", "HEAD"], str(repo_dir))
|
||||
_LOGGER.info("Updating %s", key)
|
||||
_LOGGER.debug("Location: %s", repo_dir)
|
||||
# Stash local changes (if any)
|
||||
run_git_command(
|
||||
["git", "stash", "push", "--include-untracked"], str(repo_dir)
|
||||
)
|
||||
# Fetch remote ref
|
||||
cmd = ["git", "fetch", "--", "origin"]
|
||||
if ref is not None:
|
||||
cmd.append(ref)
|
||||
run_git_command(cmd, str(repo_dir))
|
||||
# Hard reset to FETCH_HEAD (short-lived git ref corresponding to most recent fetch)
|
||||
run_git_command(["git", "reset", "--hard", "FETCH_HEAD"], str(repo_dir))
|
||||
|
||||
return repo_dir, revert
|
||||
if submodules is not None:
|
||||
_LOGGER.info(
|
||||
"Updating submodules (%s) for %s", ", ".join(submodules), key
|
||||
)
|
||||
run_git_command(
|
||||
["git", "submodule", "update", "--init"] + submodules,
|
||||
str(repo_dir),
|
||||
)
|
||||
|
||||
return repo_dir, None
|
||||
def revert():
|
||||
_LOGGER.info("Reverting changes to %s -> %s", key, old_sha)
|
||||
run_git_command(["git", "reset", "--hard", old_sha], str(repo_dir))
|
||||
|
||||
return repo_dir, revert
|
||||
|
||||
return repo_dir, None
|
||||
|
||||
|
||||
GIT_DOMAINS = {
|
||||
|
||||
119
esphome/git_lock.py
Normal file
119
esphome/git_lock.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""File locking for git operations to prevent race conditions."""
|
||||
|
||||
from contextlib import contextmanager
|
||||
import hashlib
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
# Platform-specific imports
|
||||
if sys.platform == "win32":
|
||||
import msvcrt
|
||||
else:
|
||||
import fcntl
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Directory holding the per-repository lock files. It lives in the system
# temp directory so every esphome process on the machine agrees on the
# location.
# NOTE(review): the path is predictable and shared between users on a
# multi-user host — confirm this is acceptable for the threat model.
LOCK_DIR = Path(tempfile.gettempdir()) / "esphome_git_locks"
# parents=True makes this robust if the temp directory path is nested or
# has not been created yet; exist_ok=True keeps repeat imports idempotent.
LOCK_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def _acquire_lock_unix(lock_file, timeout, identifier):
|
||||
"""Acquire lock on Unix systems using fcntl."""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
try:
|
||||
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
return True
|
||||
except OSError:
|
||||
if time.time() - start_time > timeout:
|
||||
raise TimeoutError(
|
||||
f"Could not acquire lock for {identifier} within {timeout}s"
|
||||
)
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
def _release_lock_unix(lock_file):
|
||||
"""Release lock on Unix systems."""
|
||||
try:
|
||||
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _acquire_lock_windows(lock_file, timeout, identifier):
    """Take a single-byte non-blocking msvcrt lock, polling until timeout.

    Retries every 0.1 s; raises TimeoutError once *timeout* seconds have
    passed without the lock being granted.
    """
    began = time.time()
    acquired = False
    while not acquired:
        try:
            msvcrt.locking(lock_file.fileno(), msvcrt.LK_NBLCK, 1)
            acquired = True
        except OSError:
            elapsed = time.time() - began
            if elapsed > timeout:
                raise TimeoutError(
                    f"Could not acquire lock for {identifier} within {timeout}s"
                )
            time.sleep(0.1)
    return True
|
||||
|
||||
|
||||
def _release_lock_windows(lock_file) -> None:
    """Release the single-byte msvcrt lock taken by ``_acquire_lock_windows``.

    Best-effort: failures are ignored, since closing the file also drops
    the lock.

    :param lock_file: The open file object that holds the lock.
    """
    try:
        msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
    except (OSError, ValueError):
        # OSError: the unlock itself failed; ValueError: the file was
        # already closed. Narrowed from a bare `except Exception` to match
        # the Unix counterpart and stop hiding unrelated bugs.
        pass
|
||||
|
||||
|
||||
@contextmanager
def git_operation_lock(identifier: str, timeout: float = 30.0):
    """Hold an exclusive cross-process file lock for a git operation.

    A lock file named after the SHA-256 of *identifier* is created under
    ``LOCK_DIR`` and an OS-level lock (fcntl on POSIX, msvcrt on Windows)
    is held on it for the duration of the ``with`` block.

    :param identifier: Unique identifier for the operation (e.g., repo URL or path)
    :param timeout: Maximum time to wait for the lock in seconds
    :raises TimeoutError: If the lock cannot be acquired within *timeout*.
    """
    # Hash the identifier so arbitrary URLs/paths become a safe, fixed-length
    # filename.
    lock_name = hashlib.sha256(identifier.encode()).hexdigest()[:16]
    lock_path = LOCK_DIR / f"{lock_name}.lock"

    # The "r+b" open below requires the file to already exist.
    lock_path.touch(exist_ok=True)

    lock_file = None
    acquired = False

    try:
        # Binary mode for Windows compatibility (msvcrt locks byte ranges).
        lock_file = open(lock_path, "r+b")

        # Platform-specific lock acquisition; both raise TimeoutError on
        # failure, so `acquired` stays False and cleanup still runs.
        if sys.platform == "win32":
            acquired = _acquire_lock_windows(lock_file, timeout, identifier)
        else:
            acquired = _acquire_lock_unix(lock_file, timeout, identifier)

        if acquired:
            # Lazy %-style args (not f-strings) so the message is only
            # formatted when DEBUG logging is enabled — consistent with the
            # rest of the codebase's logging calls.
            _LOGGER.debug("Acquired lock for %s", identifier)

        yield

    finally:
        if lock_file:
            if acquired:
                # Platform-specific lock release
                if sys.platform == "win32":
                    _release_lock_windows(lock_file)
                else:
                    _release_lock_unix(lock_file)
                _LOGGER.debug("Released lock for %s", identifier)
            lock_file.close()
|
||||
|
||||
|
||||
@contextmanager
def platformio_init_lock(timeout: float = 30.0):
    """Serialize PlatformIO initialization to avoid concurrent-init races.

    Reuses git_operation_lock with the fixed identifier "platformio_init",
    so every process contends on the same well-known lock file.
    """
    guard = git_operation_lock("platformio_init", timeout=timeout)
    with guard:
        yield
|
||||
@@ -86,7 +86,11 @@ def run_platformio_cli(*args, **kwargs) -> str | int:
|
||||
if os.environ.get("ESPHOME_USE_SUBPROCESS") is not None:
|
||||
return run_external_process(*cmd, **kwargs)
|
||||
|
||||
import platformio.__main__
|
||||
# Import platformio with lock to prevent race conditions during initialization
|
||||
from esphome.git_lock import platformio_init_lock
|
||||
|
||||
with platformio_init_lock():
|
||||
import platformio.__main__
|
||||
|
||||
patch_structhash()
|
||||
return run_external_command(platformio.__main__.main, *cmd, **kwargs)
|
||||
|
||||
Reference in New Issue
Block a user