From b887c1bf08dd89e31c642f4e869ab9c8dd7a0ce2 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 12 Jun 2025 21:58:52 -0500 Subject: [PATCH] lock --- .github/workflows/ci.yml | 5 +- esphome/git.py | 101 +++++++++++++++++--------------- esphome/git_lock.py | 119 ++++++++++++++++++++++++++++++++++++++ esphome/platformio_api.py | 6 +- 4 files changed, 181 insertions(+), 50 deletions(-) create mode 100644 esphome/git_lock.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ffeb49ee55..8266c0e403 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -430,8 +430,9 @@ jobs: . venv/bin/activate mkdir -p build_cache export PLATFORMIO_BUILD_CACHE_DIR=$PWD/build_cache - # Use 2 parallel jobs for compilation (resource intensive) - ./script/test_build_components -e compile -c ${{ matrix.file }} -j 2 -f -b $PWD/build_cache + # Use sequential compilation to avoid race conditions + # PlatformIO handles its own parallelization internally + ./script/test_build_components -e compile -c ${{ matrix.file }} -j 1 -f -b $PWD/build_cache test-build-components-splitter: name: Split components for testing into 20 groups maximum diff --git a/esphome/git.py b/esphome/git.py index 005bcae702..a4cee9f017 100644 --- a/esphome/git.py +++ b/esphome/git.py @@ -10,6 +10,7 @@ import urllib.parse import esphome.config_validation as cv from esphome.core import CORE, TimePeriodSeconds +from esphome.git_lock import git_operation_lock _LOGGER = logging.getLogger(__name__) @@ -59,66 +60,72 @@ def clone_or_update( ) repo_dir = _compute_destination_path(key, domain) - if not repo_dir.is_dir(): - _LOGGER.info("Cloning %s", key) - _LOGGER.debug("Location: %s", repo_dir) - cmd = ["git", "clone", "--depth=1"] - cmd += ["--", url, str(repo_dir)] - run_git_command(cmd) - if ref is not None: - # We need to fetch the PR branch first, otherwise git will complain - # about missing objects - _LOGGER.info("Fetching %s", ref) - run_git_command(["git", "fetch", "--", 
"origin", ref], str(repo_dir)) - run_git_command(["git", "reset", "--hard", "FETCH_HEAD"], str(repo_dir)) - - if submodules is not None: - _LOGGER.info( - "Initialising submodules (%s) for %s", ", ".join(submodules), key - ) - run_git_command( - ["git", "submodule", "update", "--init"] + submodules, str(repo_dir) - ) - - else: - # Check refresh needed - file_timestamp = Path(repo_dir / ".git" / "FETCH_HEAD") - # On first clone, FETCH_HEAD does not exists - if not file_timestamp.exists(): - file_timestamp = Path(repo_dir / ".git" / "HEAD") - age = datetime.now() - datetime.fromtimestamp(file_timestamp.stat().st_mtime) - if refresh is None or age.total_seconds() > refresh.total_seconds: - old_sha = run_git_command(["git", "rev-parse", "HEAD"], str(repo_dir)) - _LOGGER.info("Updating %s", key) + # Use lock to prevent concurrent access to the same repository + with git_operation_lock(key): + if not repo_dir.is_dir(): + _LOGGER.info("Cloning %s", key) _LOGGER.debug("Location: %s", repo_dir) - # Stash local changes (if any) - run_git_command( - ["git", "stash", "push", "--include-untracked"], str(repo_dir) - ) - # Fetch remote ref - cmd = ["git", "fetch", "--", "origin"] + cmd = ["git", "clone", "--depth=1"] + cmd += ["--", url, str(repo_dir)] + run_git_command(cmd) + if ref is not None: - cmd.append(ref) - run_git_command(cmd, str(repo_dir)) - # Hard reset to FETCH_HEAD (short-lived git ref corresponding to most recent fetch) - run_git_command(["git", "reset", "--hard", "FETCH_HEAD"], str(repo_dir)) + # We need to fetch the PR branch first, otherwise git will complain + # about missing objects + _LOGGER.info("Fetching %s", ref) + run_git_command(["git", "fetch", "--", "origin", ref], str(repo_dir)) + run_git_command(["git", "reset", "--hard", "FETCH_HEAD"], str(repo_dir)) if submodules is not None: _LOGGER.info( - "Updating submodules (%s) for %s", ", ".join(submodules), key + "Initialising submodules (%s) for %s", ", ".join(submodules), key ) run_git_command( ["git", 
"submodule", "update", "--init"] + submodules, str(repo_dir) ) - def revert(): - _LOGGER.info("Reverting changes to %s -> %s", key, old_sha) - run_git_command(["git", "reset", "--hard", old_sha], str(repo_dir)) + else: + # Check refresh needed + file_timestamp = Path(repo_dir / ".git" / "FETCH_HEAD") + # On first clone, FETCH_HEAD does not exists + if not file_timestamp.exists(): + file_timestamp = Path(repo_dir / ".git" / "HEAD") + age = datetime.now() - datetime.fromtimestamp( + file_timestamp.stat().st_mtime + ) + if refresh is None or age.total_seconds() > refresh.total_seconds: + old_sha = run_git_command(["git", "rev-parse", "HEAD"], str(repo_dir)) + _LOGGER.info("Updating %s", key) + _LOGGER.debug("Location: %s", repo_dir) + # Stash local changes (if any) + run_git_command( + ["git", "stash", "push", "--include-untracked"], str(repo_dir) + ) + # Fetch remote ref + cmd = ["git", "fetch", "--", "origin"] + if ref is not None: + cmd.append(ref) + run_git_command(cmd, str(repo_dir)) + # Hard reset to FETCH_HEAD (short-lived git ref corresponding to most recent fetch) + run_git_command(["git", "reset", "--hard", "FETCH_HEAD"], str(repo_dir)) - return repo_dir, revert + if submodules is not None: + _LOGGER.info( + "Updating submodules (%s) for %s", ", ".join(submodules), key + ) + run_git_command( + ["git", "submodule", "update", "--init"] + submodules, + str(repo_dir), + ) - return repo_dir, None + def revert(): + _LOGGER.info("Reverting changes to %s -> %s", key, old_sha) + run_git_command(["git", "reset", "--hard", old_sha], str(repo_dir)) + + return repo_dir, revert + + return repo_dir, None GIT_DOMAINS = { diff --git a/esphome/git_lock.py b/esphome/git_lock.py new file mode 100644 index 0000000000..1e1515b522 --- /dev/null +++ b/esphome/git_lock.py @@ -0,0 +1,119 @@ +"""File locking for git operations to prevent race conditions.""" + +from contextlib import contextmanager +import hashlib +import logging +from pathlib import Path +import sys +import tempfile 
import time

# Platform-specific imports
if sys.platform == "win32":
    import msvcrt
else:
    import fcntl

_LOGGER = logging.getLogger(__name__)

# Global lock directory
LOCK_DIR = Path(tempfile.gettempdir()) / "esphome_git_locks"
LOCK_DIR.mkdir(exist_ok=True)

# Delay between two non-blocking lock attempts, in seconds.
_POLL_INTERVAL = 0.1


def _poll_for_lock(try_lock, timeout, identifier):
    """Call ``try_lock`` repeatedly until it succeeds or ``timeout`` elapses.

    :param try_lock: Zero-argument callable performing one non-blocking lock
        attempt; it must raise ``OSError`` when the attempt fails.
    :param timeout: Maximum time to keep retrying, in seconds.
    :param identifier: Name of the locked resource, used in the error message.
    :return: ``True`` once the lock has been obtained.
    :raises TimeoutError: If no attempt succeeded within ``timeout`` seconds.
        The last ``OSError`` is attached as ``__cause__``.
    """
    # A monotonic clock keeps the timeout correct when the wall clock is
    # adjusted while we are waiting.
    start_time = time.monotonic()
    while True:
        try:
            try_lock()
        except OSError as err:
            if time.monotonic() - start_time > timeout:
                # Chain the last failure so a genuine locking error is not
                # mistaken for plain contention.
                raise TimeoutError(
                    f"Could not acquire lock for {identifier} within {timeout}s"
                ) from err
            time.sleep(_POLL_INTERVAL)
        else:
            return True


def _acquire_lock_unix(lock_file, timeout, identifier):
    """Acquire an exclusive ``flock`` on ``lock_file``, retrying until timeout.

    :param lock_file: Open file object to lock.
    :param timeout: Maximum time to wait for the lock in seconds.
    :param identifier: Name of the locked resource, used in the error message.
    :return: ``True`` when the lock is held.
    :raises TimeoutError: If the lock could not be obtained in time.
    """
    return _poll_for_lock(
        lambda: fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB),
        timeout,
        identifier,
    )


def _release_lock_unix(lock_file):
    """Release the ``flock`` held on ``lock_file`` (best effort, never raises
    for OS-level failures)."""
    try:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
    except (OSError, ValueError) as err:
        # Best effort: the caller closes the file right afterwards.
        _LOGGER.debug("Ignoring error while releasing lock: %s", err)


def _acquire_lock_windows(lock_file, timeout, identifier):
    """Acquire a one-byte ``msvcrt`` lock on ``lock_file``, retrying until timeout.

    :param lock_file: Open file object to lock.
    :param timeout: Maximum time to wait for the lock in seconds.
    :param identifier: Name of the locked resource, used in the error message.
    :return: ``True`` when the lock is held.
    :raises TimeoutError: If the lock could not be obtained in time.
    """
    return _poll_for_lock(
        lambda: msvcrt.locking(lock_file.fileno(), msvcrt.LK_NBLCK, 1),
        timeout,
        identifier,
    )


def _release_lock_windows(lock_file):
    """Release the one-byte ``msvcrt`` lock on ``lock_file`` (best effort,
    never raises for OS-level failures)."""
    try:
        msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
    except (OSError, ValueError) as err:
        # Best effort: the caller closes the file right afterwards.
        _LOGGER.debug("Ignoring error while releasing lock: %s", err)


@contextmanager
def git_operation_lock(identifier: str, timeout: float = 30.0):
    """
    Acquire a file lock for a git operation.

    The lock file lives in ``LOCK_DIR`` and is named after the first 16 hex
    digits of the SHA-256 of ``identifier``. The lock is held for the duration
    of the ``with`` body and released afterwards, including when the body
    raises.

    A waiter only waits ``timeout`` seconds, so callers that hold the lock
    across slow operations should give waiters a timeout that covers them.

    :param identifier: Unique identifier for the operation (e.g., repo URL or path)
    :param timeout: Maximum time to wait for the lock in seconds
    :raises TimeoutError: If the lock could not be acquired within ``timeout``.
    """
    # Create a safe filename from the identifier
    lock_name = hashlib.sha256(identifier.encode()).hexdigest()[:16]

    # The directory is also created at import time, but it sits in the temp
    # directory and may be removed while the process is running.
    LOCK_DIR.mkdir(parents=True, exist_ok=True)
    lock_path = LOCK_DIR / f"{lock_name}.lock"

    # Ensure lock file exists
    lock_path.touch(exist_ok=True)

    # Platform-specific lock primitives
    if sys.platform == "win32":
        acquire, release = _acquire_lock_windows, _release_lock_windows
    else:
        acquire, release = _acquire_lock_unix, _release_lock_unix

    # Open in binary mode for Windows compatibility; the ``with`` block closes
    # the file on every path, including a failed acquisition.
    with open(lock_path, "r+b") as lock_file:
        acquire(lock_file, timeout, identifier)
        _LOGGER.debug("Acquired lock for %s", identifier)
        try:
            yield
        finally:
            release(lock_file)
            _LOGGER.debug("Released lock for %s", identifier)


@contextmanager
def platformio_init_lock(timeout: float = 30.0):
    """Lock for PlatformIO initialization to prevent race conditions.

    Thin wrapper around :func:`git_operation_lock` using the fixed identifier
    ``"platformio_init"``.

    :param timeout: Maximum time to wait for the lock in seconds
    :raises TimeoutError: If the lock could not be acquired within ``timeout``.
    """
    with git_operation_lock("platformio_init", timeout=timeout):
        yield


# ---------------------------------------------------------------------------
# Remainder of the patch text (hunk for esphome/platformio_api.py), preserved
# as a comment because its enclosing function is only partly visible here:
#
# diff --git a/esphome/platformio_api.py b/esphome/platformio_api.py
# index 808db03231..4dfc381ca8 100644
# --- a/esphome/platformio_api.py
# +++ b/esphome/platformio_api.py
# @@ -86,7 +86,11 @@ def run_platformio_cli(*args, **kwargs) -> str | int:
#      if os.environ.get("ESPHOME_USE_SUBPROCESS") is not None:
#          return run_external_process(*cmd, **kwargs)
#
# -    import platformio.__main__
# +    # Import platformio with lock to prevent race conditions during initialization
# +    from esphome.git_lock import platformio_init_lock
# +
# +    with platformio_init_lock():
# +        import platformio.__main__
#      patch_structhash()
#
#      return run_external_command(platformio.__main__.main, *cmd, **kwargs)