From 3fbbec81aff590ce2aecab0f620ead5a571b228d Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 14 May 2025 14:26:05 -0500 Subject: [PATCH] frame helper opt --- esphome/components/api/api_connection.cpp | 25 ++++ esphome/components/api/api_connection.h | 8 + esphome/components/api/api_frame_helper.cpp | 156 +++++++++++++------- esphome/components/api/api_frame_helper.h | 9 +- 4 files changed, 143 insertions(+), 55 deletions(-) diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index 30f9478e1d..4e46f29634 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -63,6 +63,11 @@ APIConnection::APIConnection(std::unique_ptr sock, APIServer *pa : parent_(parent), deferred_message_queue_(this), initial_state_iterator_(this), list_entities_iterator_(this) { this->proto_write_buffer_.reserve(64); + // Explicitly initialize stats + this->stats_enabled_ = true; + this->next_stats_log_ = 0; + ESP_LOGD(STATS_TAG, "API Connection created with stats_enabled_=true"); + #if defined(USE_API_PLAINTEXT) && defined(USE_API_NOISE) auto noise_ctx = parent->get_noise_ctx(); if (noise_ctx->has_psk()) { @@ -97,6 +102,11 @@ void APIConnection::log_section_stats_() { ESP_LOGI(STATS_TAG, "API Connection Section Runtime Statistics"); ESP_LOGI(STATS_TAG, "Period stats (last %" PRIu32 "ms):", this->stats_log_interval_); + if (this->section_stats_.empty()) { + ESP_LOGW(STATS_TAG, "No section stats collected yet"); + return; + } + // First collect stats we want to display std::vector> stats_to_display; @@ -140,6 +150,7 @@ void APIConnection::log_section_stats_() { } void APIConnection::reset_section_stats_() { + ESP_LOGD(STATS_TAG, "Resetting API section stats, sections count: %u", this->section_stats_.size()); for (auto &it : this->section_stats_) { it.second.reset_period_stats(); } @@ -348,10 +359,24 @@ void APIConnection::loop() { // If next_stats_log_ is 0, initialize it if (this->next_stats_log_ == 0) { this->next_stats_log_ = now + this->stats_log_interval_; + ESP_LOGI(STATS_TAG, "API section stats logging enabled, next log at %u", this->next_stats_log_); } else if (now >= this->next_stats_log_) { + ESP_LOGI(STATS_TAG, "Logging API section stats now (current time: %u, scheduled time: %u)", now, + this->next_stats_log_); + // Force logging even if no stats are collected yet + ESP_LOGI(STATS_TAG, "Stats collection status: enabled=%d, sections=%u", this->stats_enabled_, + this->section_stats_.size()); + + // Explicitly log some stats we know should exist + ESP_LOGI(STATS_TAG, "Record count for key sections: helper_loop=%u, read_packet=%u, total_loop=%u", + this->section_stats_["helper_loop"].get_period_count(), + this->section_stats_["read_packet"].get_period_count(), + this->section_stats_["total_loop"].get_period_count()); + this->log_section_stats_(); this->reset_section_stats_(); this->next_stats_log_ = now + this->stats_log_interval_; + ESP_LOGI(STATS_TAG, "Next API section stats log scheduled for %u", this->next_stats_log_); } } diff --git a/esphome/components/api/api_connection.h b/esphome/components/api/api_connection.h index 3da52d3e76..99f6dd800f 100644 --- a/esphome/components/api/api_connection.h +++ b/esphome/components/api/api_connection.h @@ -86,6 +86,11 @@ class APIConnection : public APIServerConnection { this->total_time_ms_ += duration_ms; if (duration_ms > this->total_max_time_ms_) this->total_max_time_ms_ = duration_ms; + + // Log if this is the first record in this period + if (this->period_count_ == 1) { + ESP_LOGV("api.stats", "First time recording stats for this section: %u ms", duration_ms); + } } void reset_period_stats() { @@ -433,6 +438,9 @@ class APIConnection : public APIServerConnection { bool stats_enabled_{true}; void log_section_stats_(); void reset_section_stats_(); + + // Method to enable/disable section stats + void set_stats_enabled(bool enabled) { this->stats_enabled_ = enabled; } }; } // namespace api diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index 31b0732275..9d6c74a686 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -821,96 +821,143 @@ APIError APIPlaintextFrameHelper::loop() { * * error API_ERROR_BAD_INDICATOR: Bad indicator byte at start of frame. */ + +APIError APIPlaintextFrameHelper::handle_socket_read_result_(ssize_t received) { + if (received == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + return APIError::WOULD_BLOCK; + } + state_ = State::FAILED; + HELPER_LOG("Socket read failed with errno %d", errno); + return APIError::SOCKET_READ_FAILED; + } else if (received == 0) { + state_ = State::FAILED; + HELPER_LOG("Connection closed"); + return APIError::CONNECTION_CLOSED; + } + return APIError::OK; +} + APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { + // Minimum header size: 1 byte indicator + at least 1 byte for each varint (size and type) + static const size_t MIN_HEADER_SIZE = 3; + if (frame == nullptr) { HELPER_LOG("Bad argument for try_read_frame_"); return APIError::BAD_ARG; } - // read header + // Read and parse the header while (!rx_header_parsed_) { - uint8_t data; - ssize_t received = socket_->read(&data, 1); - if (received == -1) { - if (errno == EWOULDBLOCK || errno == EAGAIN) { - return APIError::WOULD_BLOCK; + // Read the minimum bytes needed for initial header parsing + if (rx_header_buf_size_ < MIN_HEADER_SIZE) { + // Try to read up to the minimum header size + size_t to_read = MIN_HEADER_SIZE - rx_header_buf_size_; + ssize_t received = socket_->read(&rx_header_buf_[rx_header_buf_size_], to_read); + + // Handle socket read result + APIError err = handle_socket_read_result_(received); + if (err != APIError::OK) + return err; + + // Update buffer size with bytes received + rx_header_buf_size_ += received; + + // Check indicator byte - we'll fail if it's wrong (regardless of which read detected it) + if (rx_header_buf_[0] != 0x00) { + state_ = State::FAILED; + HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]); + return APIError::BAD_INDICATOR; } - state_ = State::FAILED; - HELPER_LOG("Socket read failed with errno %d", errno); - return APIError::SOCKET_READ_FAILED; - } else if (received == 0) { - state_ = State::FAILED; - HELPER_LOG("Connection closed"); - return APIError::CONNECTION_CLOSED; - } - rx_header_buf_.push_back(data); - // try parse header - if (rx_header_buf_[0] != 0x00) { + // If we don't have the minimum bytes needed yet, there's no more data available + if (rx_header_buf_size_ < MIN_HEADER_SIZE) + return APIError::WOULD_BLOCK; + } else if (rx_header_buf_size_ >= sizeof(rx_header_buf_)) { + // Buffer is full but we still couldn't parse the header state_ = State::FAILED; - HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]); - return APIError::BAD_INDICATOR; + HELPER_LOG("Header too large for buffer"); + return APIError::BAD_DATA_PACKET; + } else { + // Already have 3+ bytes, read one byte at a time for rest of header + // to make sure we don't read into the next message + ssize_t received = socket_->read(&rx_header_buf_[rx_header_buf_size_], 1); + + // Handle socket read result + APIError err = handle_socket_read_result_(received); + if (err != APIError::OK) + return err; + + // Update buffer size with bytes received + rx_header_buf_size_ += received; } - size_t i = 1; + // Parse the header - at this point we're guaranteed to have at least the minimum bytes needed + size_t header_pos = 1; // Start after the indicator byte uint32_t consumed = 0; - auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed); - if (!msg_size_varint.has_value()) { - // not enough data there yet - continue; - } - i += consumed; + // Parse message size varint + auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[header_pos], rx_header_buf_size_ - header_pos, &consumed); + if (!msg_size_varint.has_value()) + // Not enough data for message size yet + continue; + + header_pos += consumed; rx_header_parsed_len_ = msg_size_varint->as_uint32(); - auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed); - if (!msg_type_varint.has_value()) { - // not enough data there yet + if (header_pos >= rx_header_buf_size_) + // Not enough data for message type yet + continue; + + // Parse message type varint + auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[header_pos], rx_header_buf_size_ - header_pos, &consumed); + if (!msg_type_varint.has_value()) + // Not enough data for message type yet continue; - } rx_header_parsed_type_ = msg_type_varint->as_uint32(); rx_header_parsed_ = true; - } - // header reading done - // reserve space for body - if (rx_buf_.size() != rx_header_parsed_len_) { + // Now that we know the message size, allocate the buffer rx_buf_.resize(rx_header_parsed_len_); } + // Header parsing complete - if (rx_buf_len_ < rx_header_parsed_len_) { - // more data to read - size_t to_read = rx_header_parsed_len_ - rx_buf_len_; - ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], to_read); - if (received == -1) { - if (errno == EWOULDBLOCK || errno == EAGAIN) { - return APIError::WOULD_BLOCK; - } - state_ = State::FAILED; - HELPER_LOG("Socket read failed with errno %d", errno); - return APIError::SOCKET_READ_FAILED; - } else if (received == 0) { - state_ = State::FAILED; - HELPER_LOG("Connection closed"); - return APIError::CONNECTION_CLOSED; + // Special case: if message length is 0, we're already done (no body to read) + if (rx_header_parsed_len_ != 0) { + // Calculate how much more data we need + size_t remaining = rx_header_parsed_len_ - rx_buf_len_; + + // Try to read all remaining bytes at once to minimize syscalls + ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], remaining); + + // Handle socket read result + APIError err = handle_socket_read_result_(received); + if (err != APIError::OK) { + return err; } + + // Update our buffer position rx_buf_len_ += received; - if ((size_t) received != to_read) { - // not all read + + // If we didn't get all the data we need, wait for more + if (rx_buf_len_ < rx_header_parsed_len_) return APIError::WOULD_BLOCK; - } } // uncomment for even more debugging #ifdef HELPER_LOG_PACKETS ESP_LOGVV(TAG, "Received frame: %s", format_hex_pretty(rx_buf_).c_str()); #endif + + // Move the message buffer to the frame frame->msg = std::move(rx_buf_); - // consume msg + + // Reset all state for the next message rx_buf_ = {}; rx_buf_len_ = 0; - rx_header_buf_.clear(); + rx_header_buf_size_ = 0; rx_header_parsed_ = false; + return APIError::OK; } @@ -921,6 +968,7 @@ APIError APIPlaintextFrameHelper::read_packet(ReadPacketBuffer *buffer) { return APIError::WOULD_BLOCK; } + // Try to read data efficiently ParsedFrame frame; aerr = try_read_frame_(&frame); if (aerr != APIError::OK) { diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h index 59f3cf7471..d25f3b535e 100644 --- a/esphome/components/api/api_frame_helper.h +++ b/esphome/components/api/api_frame_helper.h @@ -176,10 +176,17 @@ class APIPlaintextFrameHelper : public APIFrameHelper { return APIFrameHelper::write_raw_(iov, iovcnt, socket_.get(), tx_buf_, info_, state_, State::FAILED); } + protected: + // Helper method to handle socket read results consistently + APIError handle_socket_read_result_(ssize_t received); + std::unique_ptr socket_; std::string info_; - std::vector rx_header_buf_; + // Fixed-size header buffer - max we need is indicator byte (1) + max 2 varints (10 bytes each for uint64) + // In practice, for small packets this is much smaller + uint8_t rx_header_buf_[21]; + size_t rx_header_buf_size_ = 0; bool rx_header_parsed_ = false; uint32_t rx_header_parsed_type_ = 0; uint32_t rx_header_parsed_len_ = 0;