Files
esphome/esphome/writer.py
Jonathan Swoboda 2b7695ba3f [core] Fix clean all windows (#12217)
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
2025-12-01 12:40:56 -05:00

413 lines
13 KiB
Python

from collections.abc import Callable
import importlib
import logging
import os
from pathlib import Path
import re
import shutil
import stat
from types import TracebackType
from esphome import loader
from esphome.config import iter_component_configs, iter_components
from esphome.const import (
HEADER_FILE_EXTENSIONS,
PLATFORM_ESP32,
SOURCE_FILE_EXTENSIONS,
__version__,
)
from esphome.core import CORE, EsphomeError
from esphome.helpers import (
copy_file_if_changed,
get_str_env,
is_ha_addon,
read_file,
walk_files,
write_file_if_changed,
)
from esphome.storage_json import StorageJSON, storage_path
_LOGGER = logging.getLogger(__name__)
CPP_AUTO_GENERATE_BEGIN = "// ========== AUTO GENERATED CODE BEGIN ==========="
CPP_AUTO_GENERATE_END = "// =========== AUTO GENERATED CODE END ============"
CPP_INCLUDE_BEGIN = "// ========== AUTO GENERATED INCLUDE BLOCK BEGIN ==========="
CPP_INCLUDE_END = "// ========== AUTO GENERATED INCLUDE BLOCK END ==========="
CPP_BASE_FORMAT = (
"""// Auto generated code by esphome
""",
""""
void setup() {
""",
"""
App.setup();
}
void loop() {
App.loop();
}
""",
)
UPLOAD_SPEED_OVERRIDE = {
"esp210": 57600,
}
def get_flags(key):
flags = set()
for _, component, conf in iter_component_configs(CORE.config):
flags |= getattr(component, key)(conf)
return flags
def get_include_text():
include_text = '#include "esphome.h"\nusing namespace esphome;\n'
for _, component, conf in iter_component_configs(CORE.config):
if not hasattr(component, "includes"):
continue
includes = component.includes
if callable(includes):
includes = includes(conf)
if includes is None:
continue
if isinstance(includes, list):
includes = "\n".join(includes)
if not includes:
continue
include_text += f"{includes}\n"
return include_text
def replace_file_content(text, pattern, repl):
content_new, count = re.subn(pattern, repl, text, flags=re.MULTILINE)
return content_new, count
def storage_should_clean(old: StorageJSON | None, new: StorageJSON) -> bool:
if old is None:
return True
if old.src_version != new.src_version:
return True
if old.build_path != new.build_path:
return True
# Check if any components have been removed
return bool(old.loaded_integrations - new.loaded_integrations)
def storage_should_update_cmake_cache(old: StorageJSON, new: StorageJSON) -> bool:
if (
old.loaded_integrations != new.loaded_integrations
or old.loaded_platforms != new.loaded_platforms
) and new.core_platform == PLATFORM_ESP32:
from esphome.components.esp32 import FRAMEWORK_ESP_IDF
return new.framework == FRAMEWORK_ESP_IDF
return False
def update_storage_json() -> None:
path = storage_path()
old = StorageJSON.load(path)
new = StorageJSON.from_esphome_core(CORE, old)
if old == new:
return
if storage_should_clean(old, new):
if old is not None and old.loaded_integrations - new.loaded_integrations:
removed = old.loaded_integrations - new.loaded_integrations
_LOGGER.info(
"Components removed (%s), cleaning build files...",
", ".join(sorted(removed)),
)
else:
_LOGGER.info("Core config or version changed, cleaning build files...")
clean_build(clear_pio_cache=False)
elif storage_should_update_cmake_cache(old, new):
_LOGGER.info("Integrations changed, cleaning cmake cache...")
clean_cmake_cache()
new.save(path)
def find_begin_end(text, begin_s, end_s):
begin_index = text.find(begin_s)
if begin_index == -1:
raise EsphomeError(
"Could not find auto generated code begin in file, either "
"delete the main sketch file or insert the comment again."
)
if text.find(begin_s, begin_index + 1) != -1:
raise EsphomeError(
"Found multiple auto generate code begins, don't know "
"which to chose, please remove one of them."
)
end_index = text.find(end_s)
if end_index == -1:
raise EsphomeError(
"Could not find auto generated code end in file, either "
"delete the main sketch file or insert the comment again."
)
if text.find(end_s, end_index + 1) != -1:
raise EsphomeError(
"Found multiple auto generate code endings, don't know "
"which to chose, please remove one of them."
)
return text[:begin_index], text[(end_index + len(end_s)) :]
DEFINES_H_FORMAT = ESPHOME_H_FORMAT = """\
#pragma once
#include "esphome/core/macros.h"
{}
"""
VERSION_H_FORMAT = """\
#pragma once
#include "esphome/core/macros.h"
#define ESPHOME_VERSION "{}"
#define ESPHOME_VERSION_CODE VERSION_CODE({}, {}, {})
"""
DEFINES_H_TARGET = "esphome/core/defines.h"
VERSION_H_TARGET = "esphome/core/version.h"
ESPHOME_README_TXT = """
THIS DIRECTORY IS AUTO-GENERATED, DO NOT MODIFY
ESPHome automatically populates the build directory, and any
changes to this directory will be removed the next time esphome is
run.
For modifying esphome's core files, please use a development esphome install,
the custom_components folder or the external_components feature.
"""
def copy_src_tree():
source_files: list[loader.FileResource] = []
for _, component in iter_components(CORE.config):
source_files += component.resources
source_files_map = {
Path(x.package.replace(".", "/") + "/" + x.resource): x for x in source_files
}
# Convert to list and sort
source_files_l = list(source_files_map.items())
source_files_l.sort()
# Build #include list for esphome.h
include_l = []
for target, _ in source_files_l:
if target.suffix in HEADER_FILE_EXTENSIONS:
include_l.append(f'#include "{target}"')
include_l.append("")
include_s = "\n".join(include_l)
source_files_copy = source_files_map.copy()
ignore_targets = [Path(x) for x in (DEFINES_H_TARGET, VERSION_H_TARGET)]
for t in ignore_targets:
source_files_copy.pop(t)
for fname in walk_files(CORE.relative_src_path("esphome")):
p = Path(fname)
if p.suffix not in SOURCE_FILE_EXTENSIONS:
# Not a source file, ignore
continue
# Transform path to target path name
target = p.relative_to(CORE.relative_src_path())
if target in ignore_targets:
# Ignore defines.h, will be dealt with later
continue
if target not in source_files_copy:
# Source file removed, delete target
p.unlink()
else:
src_file = source_files_copy.pop(target)
with src_file.path() as src_path:
copy_file_if_changed(src_path, p)
# Now copy new files
for target, src_file in source_files_copy.items():
dst_path = CORE.relative_src_path(*target.parts)
with src_file.path() as src_path:
copy_file_if_changed(src_path, dst_path)
# Finally copy defines
write_file_if_changed(
CORE.relative_src_path("esphome", "core", "defines.h"), generate_defines_h()
)
write_file_if_changed(CORE.relative_build_path("README.txt"), ESPHOME_README_TXT)
write_file_if_changed(
CORE.relative_src_path("esphome.h"), ESPHOME_H_FORMAT.format(include_s)
)
write_file_if_changed(
CORE.relative_src_path("esphome", "core", "version.h"), generate_version_h()
)
platform = "esphome.components." + CORE.target_platform
try:
module = importlib.import_module(platform)
copy_files = getattr(module, "copy_files")
copy_files()
except AttributeError:
pass
def generate_defines_h():
define_content_l = [x.as_macro for x in CORE.defines]
define_content_l.sort()
return DEFINES_H_FORMAT.format("\n".join(define_content_l))
def generate_version_h():
match = re.match(r"^(\d+)\.(\d+).(\d+)-?\w*$", __version__)
if not match:
raise EsphomeError(f"Could not parse version {__version__}.")
return VERSION_H_FORMAT.format(
__version__, match.group(1), match.group(2), match.group(3)
)
def write_cpp(code_s):
path = CORE.relative_src_path("main.cpp")
if path.is_file():
text = read_file(path)
code_format = find_begin_end(
text, CPP_AUTO_GENERATE_BEGIN, CPP_AUTO_GENERATE_END
)
code_format_ = find_begin_end(
code_format[0], CPP_INCLUDE_BEGIN, CPP_INCLUDE_END
)
code_format = (code_format_[0], code_format_[1], code_format[1])
else:
code_format = CPP_BASE_FORMAT
copy_src_tree()
global_s = '#include "esphome.h"\n'
global_s += CORE.cpp_global_section
full_file = f"{code_format[0] + CPP_INCLUDE_BEGIN}\n{global_s}{CPP_INCLUDE_END}"
full_file += (
f"{code_format[1] + CPP_AUTO_GENERATE_BEGIN}\n{code_s}{CPP_AUTO_GENERATE_END}"
)
full_file += code_format[2]
write_file_if_changed(path, full_file)
def clean_cmake_cache():
pioenvs = CORE.relative_pioenvs_path()
if pioenvs.is_dir():
pioenvs_cmake_path = pioenvs / CORE.name / "CMakeCache.txt"
if pioenvs_cmake_path.is_file():
_LOGGER.info("Deleting %s", pioenvs_cmake_path)
pioenvs_cmake_path.unlink()
def _rmtree_error_handler(
func: Callable[[str], object],
path: str,
exc_info: tuple[type[BaseException], BaseException, TracebackType | None],
) -> None:
"""Error handler for shutil.rmtree to handle read-only files on Windows.
On Windows, git pack files and other files may be marked read-only,
causing shutil.rmtree to fail with "Access is denied". This handler
removes the read-only flag and retries the deletion.
"""
if os.access(path, os.W_OK):
raise exc_info[1].with_traceback(exc_info[2])
os.chmod(path, stat.S_IWUSR | stat.S_IRUSR)
func(path)
def clean_build(clear_pio_cache: bool = True):
# Allow skipping cache cleaning for integration tests
if os.environ.get("ESPHOME_SKIP_CLEAN_BUILD"):
_LOGGER.warning("Skipping build cleaning (ESPHOME_SKIP_CLEAN_BUILD set)")
return
pioenvs = CORE.relative_pioenvs_path()
if pioenvs.is_dir():
_LOGGER.info("Deleting %s", pioenvs)
shutil.rmtree(pioenvs, onerror=_rmtree_error_handler)
piolibdeps = CORE.relative_piolibdeps_path()
if piolibdeps.is_dir():
_LOGGER.info("Deleting %s", piolibdeps)
shutil.rmtree(piolibdeps, onerror=_rmtree_error_handler)
dependencies_lock = CORE.relative_build_path("dependencies.lock")
if dependencies_lock.is_file():
_LOGGER.info("Deleting %s", dependencies_lock)
dependencies_lock.unlink()
if not clear_pio_cache:
return
# Clean PlatformIO cache to resolve CMake compiler detection issues
# This helps when toolchain paths change or get corrupted
try:
from platformio.project.config import ProjectConfig
except ImportError:
# PlatformIO is not available, skip cache cleaning
pass
else:
config = ProjectConfig.get_instance()
cache_dir = Path(config.get("platformio", "cache_dir"))
if cache_dir.is_dir():
_LOGGER.info("Deleting PlatformIO cache %s", cache_dir)
shutil.rmtree(cache_dir, onerror=_rmtree_error_handler)
def clean_all(configuration: list[str]):
data_dirs = []
for config in configuration:
item = Path(config)
if item.is_file() and item.suffix in (".yaml", ".yml"):
data_dirs.append(item.parent / ".esphome")
else:
data_dirs.append(item / ".esphome")
if is_ha_addon():
data_dirs.append(Path("/data"))
if "ESPHOME_DATA_DIR" in os.environ:
data_dirs.append(Path(get_str_env("ESPHOME_DATA_DIR", None)))
# Clean build dir
for dir in data_dirs:
if dir.is_dir():
_LOGGER.info("Cleaning %s", dir)
# Don't remove storage or .json files which are needed by the dashboard
for item in dir.iterdir():
if item.is_file() and not item.name.endswith(".json"):
item.unlink()
elif item.is_dir() and item.name != "storage":
shutil.rmtree(item, onerror=_rmtree_error_handler)
# Clean PlatformIO project files
try:
from platformio.project.config import ProjectConfig
except ImportError:
# PlatformIO is not available, skip cleaning
pass
else:
config = ProjectConfig.get_instance()
for pio_dir in ["cache_dir", "packages_dir", "platforms_dir", "core_dir"]:
path = Path(config.get("platformio", pio_dir))
if path.is_dir():
_LOGGER.info("Deleting PlatformIO %s %s", pio_dir, path)
shutil.rmtree(path, onerror=_rmtree_error_handler)
GITIGNORE_CONTENT = """# Gitignore settings for ESPHome
# This is an example and may include too much for your use-case.
# You can modify this file to suit your needs.
/.esphome/
/secrets.yaml
"""
def write_gitignore():
path = CORE.relative_config_path(".gitignore")
if not path.is_file():
path.write_text(GITIGNORE_CONTENT, encoding="utf-8")