From 57284b1ac3449b6570ebfc34ac463b1e627cf456 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 14 May 2025 23:26:28 -0500 Subject: [PATCH 1/8] Bump cairosvg from 2.8.0 to 2.8.1 (#8799) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 106a6ff901..c89ad4a6e4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,7 +19,7 @@ puremagic==1.29 ruamel.yaml==0.18.10 # dashboard_import esphome-glyphsets==0.2.0 pillow==10.4.0 -cairosvg==2.8.0 +cairosvg==2.8.1 freetype-py==2.5.1 # esp-idf requires this, but doesn't bundle it by default From dd8d8ad95207933c4cfc258c3dc1e33899f83a90 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 00:16:08 -0500 Subject: [PATCH 2/8] Use fixed buffer for plaintext protocol like noise protocol (#8800) --- esphome/components/api/api_frame_helper.cpp | 59 +++++++++++++++++---- esphome/components/api/api_frame_helper.h | 14 ++++- 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index 31b0732275..f251ceb6e4 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -830,6 +830,10 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { // read header while (!rx_header_parsed_) { uint8_t data; + // Reading one byte at a time is fastest in practice for ESP32 when + // there is no data on the wire (which is the common case). + // This results in faster failure detection compared to + // attempting to read multiple bytes at once. ssize_t received = socket_->read(&data, 1); if (received == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { @@ -843,27 +847,60 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { HELPER_LOG("Connection closed"); return APIError::CONNECTION_CLOSED; } - rx_header_buf_.push_back(data); - // try parse header - if (rx_header_buf_[0] != 0x00) { - state_ = State::FAILED; - HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]); - return APIError::BAD_INDICATOR; + // Successfully read a byte + + // Process byte according to current buffer position + if (rx_header_buf_pos_ == 0) { // Case 1: First byte (indicator byte) + if (data != 0x00) { + state_ = State::FAILED; + HELPER_LOG("Bad indicator byte %u", data); + return APIError::BAD_INDICATOR; + } + // We don't store the indicator byte, just increment position + rx_header_buf_pos_ = 1; // Set to 1 directly + continue; // Need more bytes before we can parse } - size_t i = 1; + // Check buffer overflow before storing + if (rx_header_buf_pos_ == 5) { // Case 2: Buffer would overflow (5 bytes is max allowed) + state_ = State::FAILED; + HELPER_LOG("Header buffer overflow"); + return APIError::BAD_DATA_PACKET; + } + + // Store byte in buffer (adjust index to account for skipped indicator byte) + rx_header_buf_[rx_header_buf_pos_ - 1] = data; + + // Increment position after storing + rx_header_buf_pos_++; + + // Case 3: If we only have one varint byte, we need more + if (rx_header_buf_pos_ == 2) { // Have read indicator + 1 byte + continue; // Need more bytes before we can parse + } + + // At this point, we have at least 3 bytes total: + // - Validated indicator byte (0x00) but not stored + // - At least 2 bytes in the buffer for the varints + // Buffer layout: + // First 1-3 bytes: Message size varint (variable length) + // - 2 bytes would only allow up to 16383, which is less than noise's 65535 + // - 3 bytes allows up to 2097151, ensuring we support at least as much as noise + // Remaining 1-2 bytes: Message type varint (variable length) + // We now attempt to parse both varints. If either is incomplete, + // we'll continue reading more bytes. + uint32_t consumed = 0; - auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed); + auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[0], rx_header_buf_pos_ - 1, &consumed); if (!msg_size_varint.has_value()) { // not enough data there yet continue; } - i += consumed; rx_header_parsed_len_ = msg_size_varint->as_uint32(); - auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed); + auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[consumed], rx_header_buf_pos_ - 1 - consumed, &consumed); if (!msg_type_varint.has_value()) { // not enough data there yet continue; @@ -909,7 +946,7 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { // consume msg rx_buf_ = {}; rx_buf_len_ = 0; - rx_header_buf_.clear(); + rx_header_buf_pos_ = 0; rx_header_parsed_ = false; return APIError::OK; } diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h index 59f3cf7471..db506ea1ce 100644 --- a/esphome/components/api/api_frame_helper.h +++ b/esphome/components/api/api_frame_helper.h @@ -119,6 +119,9 @@ class APINoiseFrameHelper : public APIFrameHelper { std::unique_ptr socket_; std::string info_; + // Fixed-size header buffer for noise protocol: + // 1 byte for indicator + 2 bytes for message size (16-bit value, not varint) + // Note: Maximum message size is 65535, with a limit of 128 bytes during handshake phase uint8_t rx_header_buf_[3]; size_t rx_header_buf_len_ = 0; std::vector rx_buf_; @@ -179,7 +182,16 @@ class APIPlaintextFrameHelper : public APIFrameHelper { std::unique_ptr socket_; std::string info_; - std::vector rx_header_buf_; + // Fixed-size header buffer for plaintext protocol: + // We only need space for the two varints since we validate the indicator byte separately. + // To match noise protocol's maximum message size (65535), we need: + // 3 bytes for message size varint (supports up to 2097151) + 2 bytes for message type varint + // + // While varints could theoretically be up to 10 bytes each for 64-bit values, + // attempting to process messages with headers that large would likely crash the + // ESP32 due to memory constraints. + uint8_t rx_header_buf_[5]; // 5 bytes for varints (3 for size + 2 for type) + uint8_t rx_header_buf_pos_ = 0; bool rx_header_parsed_ = false; uint32_t rx_header_parsed_type_ = 0; uint32_t rx_header_parsed_len_ = 0; From efa6745a5e28313830e94557b4423ff857bed4ff Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 00:16:25 -0500 Subject: [PATCH 3/8] Optimize protobuf varint decoder for ESPHome use case (#8791) --- esphome/components/api/proto.h | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/esphome/components/api/proto.h b/esphome/components/api/proto.h index b8ee6b7920..e110a58eda 100644 --- a/esphome/components/api/proto.h +++ b/esphome/components/api/proto.h @@ -20,16 +20,26 @@ class ProtoVarInt { explicit ProtoVarInt(uint64_t value) : value_(value) {} static optional parse(const uint8_t *buffer, uint32_t len, uint32_t *consumed) { - if (consumed != nullptr) - *consumed = 0; - - if (len == 0) + if (len == 0) { + if (consumed != nullptr) + *consumed = 0; return {}; + } - uint64_t result = 0; - uint8_t bitpos = 0; + // Most common case: single-byte varint (values 0-127) + if ((buffer[0] & 0x80) == 0) { + if (consumed != nullptr) + *consumed = 1; + return ProtoVarInt(buffer[0]); + } - for (uint32_t i = 0; i < len; i++) { + // General case for multi-byte varints + // Since we know buffer[0]'s high bit is set, initialize with its value + uint64_t result = buffer[0] & 0x7F; + uint8_t bitpos = 7; + + // Start from the second byte since we've already processed the first + for (uint32_t i = 1; i < len; i++) { uint8_t val = buffer[i]; result |= uint64_t(val & 0x7F) << uint64_t(bitpos); bitpos += 7; @@ -40,7 +50,9 @@ class ProtoVarInt { } } - return {}; + if (consumed != nullptr) + *consumed = 0; + return {}; // Incomplete or invalid varint } uint32_t as_uint32() const { return this->value_; } From d4b42ebf2012bae197693c2030ad77536e696429 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 01:31:22 -0500 Subject: [PATCH 4/8] Eliminate outbound buffer expensive O(n) with O(1) queue operations - Replaced inefficient vector-based buffer with a queue of discrete message buffers - Moved common code to base class to reduce duplication - Removed unnecessary data copying after partial sends - Added small-buffer optimization to allow writing with backlog <256 bytes - Moved common code to base class to reduce duplication --- esphome/components/api/api_frame_helper.cpp | 272 +++++++++----------- esphome/components/api/api_frame_helper.h | 179 +++++++------ 2 files changed, 225 insertions(+), 226 deletions(-) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index f251ceb6e4..669be8052c 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -13,14 +13,6 @@ namespace api { static const char *const TAG = "api.socket"; -/// Is the given return value (from write syscalls) a wouldblock error? -bool is_would_block(ssize_t ret) { - if (ret == -1) { - return errno == EWOULDBLOCK || errno == EAGAIN; - } - return ret == 0; -} - const char *api_error_to_str(APIError err) { // not using switch to ensure compiler doesn't try to build a big table out of it if (err == APIError::OK) { @@ -74,13 +66,10 @@ const char *api_error_to_str(APIError err) { } // Common implementation for writing raw data to socket -template -APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt, socket::Socket *socket, - std::vector &tx_buf, const std::string &info, StateEnum &state, - StateEnum failed_state) { +APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) { // This method writes data to socket or buffers it // Returns APIError::OK if successful (or would block, but data has been buffered) - // Returns APIError::SOCKET_WRITE_FAILED if socket write failed, and sets state to failed_state + // Returns APIError::SOCKET_WRITE_FAILED if socket write failed, and sets state to FAILED if (iovcnt == 0) return APIError::OK; // Nothing to do, success @@ -94,71 +83,131 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt, socket: total_write_len += iov[i].iov_len; } - if (!tx_buf.empty()) { - // try to empty tx_buf first - while (!tx_buf.empty()) { - ssize_t sent = socket->write(tx_buf.data(), tx_buf.size()); - if (is_would_block(sent)) { - break; - } else if (sent == -1) { - ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", info.c_str(), errno); - state = failed_state; - return APIError::SOCKET_WRITE_FAILED; // Socket write failed - } - // TODO: inefficient if multiple packets in txbuf - // replace with deque of buffers - tx_buf.erase(tx_buf.begin(), tx_buf.begin() + sent); + // Try to send any existing buffered data first + if (!this->tx_buf_.empty()) { + // Get the first buffer in the queue + SendBuffer &front_buffer = this->tx_buf_.front(); + + // Try to send the remaining data in this buffer + ssize_t sent = this->socket_->write(front_buffer.current_data(), front_buffer.remaining()); + + if (this->is_would_block(sent)) { + // Socket would block, we'll try again later + } else if (sent == -1) { + ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); + this->state_ = State::FAILED; + return APIError::SOCKET_WRITE_FAILED; // Socket write failed + } else if (sent > 0 && static_cast(sent) < front_buffer.remaining()) { + // Partially sent, update offset + front_buffer.offset += sent; + } else if (sent > 0) { + // Buffer completely sent, remove it from the queue + this->tx_buf_.pop_front(); } } - if (!tx_buf.empty()) { - // tx buf not empty, can't write now because then stream would be inconsistent - // Reserve space upfront to avoid multiple reallocations - tx_buf.reserve(tx_buf.size() + total_write_len); + // If we still have pending data, append the new data to the queue + if (!this->tx_buf_.empty()) { + // Add new data as a new buffer + SendBuffer buffer; + + // Calculate total size needed + buffer.data.reserve(total_write_len); + + // Copy all iov segments to the buffer for (int i = 0; i < iovcnt; i++) { - tx_buf.insert(tx_buf.end(), reinterpret_cast(iov[i].iov_base), - reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + const uint8_t *data = reinterpret_cast(iov[i].iov_base); + buffer.data.insert(buffer.data.end(), data, data + iov[i].iov_len); } + + // Add to the queue + this->tx_buf_.push_back(std::move(buffer)); + return APIError::OK; // Success, data buffered } - ssize_t sent = socket->writev(iov, iovcnt); - if (is_would_block(sent)) { - // operation would block, add buffer to tx_buf - // Reserve space upfront to avoid multiple reallocations - tx_buf.reserve(tx_buf.size() + total_write_len); + // Try to send directly if no buffered data + ssize_t sent = this->socket_->writev(iov, iovcnt); + + if (this->is_would_block(sent)) { + // Socket would block, buffer the data + SendBuffer buffer; + buffer.data.reserve(total_write_len); + for (int i = 0; i < iovcnt; i++) { - tx_buf.insert(tx_buf.end(), reinterpret_cast(iov[i].iov_base), - reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + const uint8_t *data = reinterpret_cast(iov[i].iov_base); + buffer.data.insert(buffer.data.end(), data, data + iov[i].iov_len); } + + this->tx_buf_.push_back(std::move(buffer)); return APIError::OK; // Success, data buffered } else if (sent == -1) { - // an error occurred - ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", info.c_str(), errno); - state = failed_state; + // Socket error + ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); + this->state_ = State::FAILED; return APIError::SOCKET_WRITE_FAILED; // Socket write failed - } else if ((size_t) sent != total_write_len) { - // partially sent, add end to tx_buf - size_t remaining = total_write_len - sent; - // Reserve space upfront to avoid multiple reallocations - tx_buf.reserve(tx_buf.size() + remaining); - + } else if (static_cast(sent) < total_write_len) { + // Partially sent, buffer the remaining data + SendBuffer buffer; size_t to_consume = sent; + size_t remaining = total_write_len - sent; + + buffer.data.reserve(remaining); + for (int i = 0; i < iovcnt; i++) { if (to_consume >= iov[i].iov_len) { + // This segment was fully sent to_consume -= iov[i].iov_len; } else { - tx_buf.insert(tx_buf.end(), reinterpret_cast(iov[i].iov_base) + to_consume, - reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + // This segment was partially sent or not sent at all + const uint8_t *data = reinterpret_cast(iov[i].iov_base) + to_consume; + size_t len = iov[i].iov_len - to_consume; + buffer.data.insert(buffer.data.end(), data, data + len); to_consume = 0; } } - return APIError::OK; // Success, data buffered + + this->tx_buf_.push_back(std::move(buffer)); + return APIError::OK; // Success, remaining data buffered } + return APIError::OK; // Success, all data sent } -#define HELPER_LOG(msg, ...) ESP_LOGVV(TAG, "%s: " msg, info_.c_str(), ##__VA_ARGS__) +// Common implementation for trying to send buffered data +APIError APIFrameHelper::try_send_tx_buf_() { + // Try to send from tx_buf + while (state_ != State::CLOSED && !this->tx_buf_.empty()) { + // Get the first buffer in the queue + SendBuffer &front_buffer = this->tx_buf_.front(); + + // Try to send the remaining data in this buffer + ssize_t sent = this->socket_->write(front_buffer.current_data(), front_buffer.remaining()); + + if (this->is_would_block(sent)) { + // Socket would block, try again later + break; + } else if (sent == -1) { + // Socket error + state_ = State::FAILED; + ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); + return APIError::SOCKET_WRITE_FAILED; + } else if (sent == 0) { + // No data sent but not an error, try again later + break; + } else if (static_cast(sent) < front_buffer.remaining()) { + // Partially sent, update offset + front_buffer.offset += sent; + } else { + // Buffer completely sent, remove it from the queue + this->tx_buf_.pop_front(); + } + } + + return APIError::OK; +} + +#define HELPER_LOG(msg, ...) ESP_LOGVV(TAG, "%s: " msg, this->info_.c_str(), ##__VA_ARGS__) // uncomment to log raw packets //#define HELPER_LOG_PACKETS @@ -206,11 +255,11 @@ std::string noise_err_to_str(int err) { /// Initialize the frame helper, returns OK if successful. APIError APINoiseFrameHelper::init() { - if (state_ != State::INITIALIZE || socket_ == nullptr) { + if (state_ != State::INITIALIZE || this->socket_ == nullptr) { HELPER_LOG("Bad state for init %d", (int) state_); return APIError::BAD_STATE; } - int err = socket_->setblocking(false); + int err = this->socket_->setblocking(false); if (err != 0) { state_ = State::FAILED; HELPER_LOG("Setting nonblocking failed with errno %d", errno); @@ -218,7 +267,7 @@ APIError APINoiseFrameHelper::init() { } int enable = 1; - err = socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); + err = this->socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); if (err != 0) { state_ = State::FAILED; HELPER_LOG("Setting nodelay failed with errno %d", errno); @@ -238,7 +287,7 @@ APIError APINoiseFrameHelper::loop() { return APIError::OK; if (err != APIError::OK) return err; - if (!tx_buf_.empty()) { + if (!this->tx_buf_.empty()) { err = try_send_tx_buf_(); if (err != APIError::OK) { return err; @@ -271,7 +320,7 @@ APIError APINoiseFrameHelper::try_read_frame_(ParsedFrame *frame) { if (rx_header_buf_len_ < 3) { // no header information yet size_t to_read = 3 - rx_header_buf_len_; - ssize_t received = socket_->read(&rx_header_buf_[rx_header_buf_len_], to_read); + ssize_t received = this->socket_->read(&rx_header_buf_[rx_header_buf_len_], to_read); if (received == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { return APIError::WOULD_BLOCK; @@ -318,7 +367,7 @@ APIError APINoiseFrameHelper::try_read_frame_(ParsedFrame *frame) { if (rx_buf_len_ < msg_size) { // more data to read size_t to_read = msg_size - rx_buf_len_; - ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], to_read); + ssize_t received = this->socket_->read(&rx_buf_[rx_buf_len_], to_read); if (received == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { return APIError::WOULD_BLOCK; @@ -556,7 +605,6 @@ APIError APINoiseFrameHelper::read_packet(ReadPacketBuffer *buffer) { buffer->type = type; return APIError::OK; } -bool APINoiseFrameHelper::can_write_without_blocking() { return state_ == State::DATA && tx_buf_.empty(); } APIError APINoiseFrameHelper::write_packet(uint16_t type, const uint8_t *payload, size_t payload_len) { int err; APIError aerr; @@ -610,27 +658,7 @@ APIError APINoiseFrameHelper::write_packet(uint16_t type, const uint8_t *payload iov.iov_len = total_len; // write raw to not have two packets sent if NAGLE disabled - return write_raw_(&iov, 1); -} -APIError APINoiseFrameHelper::try_send_tx_buf_() { - // try send from tx_buf - while (state_ != State::CLOSED && !tx_buf_.empty()) { - ssize_t sent = socket_->write(tx_buf_.data(), tx_buf_.size()); - if (sent == -1) { - if (errno == EWOULDBLOCK || errno == EAGAIN) - break; - state_ = State::FAILED; - HELPER_LOG("Socket write failed with errno %d", errno); - return APIError::SOCKET_WRITE_FAILED; - } else if (sent == 0) { - break; - } - // TODO: inefficient if multiple packets in txbuf - // replace with deque of buffers - tx_buf_.erase(tx_buf_.begin(), tx_buf_.begin() + sent); - } - - return APIError::OK; + return APIFrameHelper::write_raw_(&iov, 1); } APIError APINoiseFrameHelper::write_frame_(const uint8_t *data, size_t len) { uint8_t header[3]; @@ -642,12 +670,12 @@ APIError APINoiseFrameHelper::write_frame_(const uint8_t *data, size_t len) { iov[0].iov_base = header; iov[0].iov_len = 3; if (len == 0) { - return write_raw_(iov, 1); + return APIFrameHelper::write_raw_(iov, 1); } iov[1].iov_base = const_cast(data); iov[1].iov_len = len; - return write_raw_(iov, 2); + return APIFrameHelper::write_raw_(iov, 2); } /** Initiate the data structures for the handshake. @@ -740,22 +768,6 @@ APINoiseFrameHelper::~APINoiseFrameHelper() { } } -APIError APINoiseFrameHelper::close() { - state_ = State::CLOSED; - int err = socket_->close(); - if (err == -1) - return APIError::CLOSE_FAILED; - return APIError::OK; -} -APIError APINoiseFrameHelper::shutdown(int how) { - int err = socket_->shutdown(how); - if (err == -1) - return APIError::SHUTDOWN_FAILED; - if (how == SHUT_RDWR) { - state_ = State::CLOSED; - } - return APIError::OK; -} extern "C" { // declare how noise generates random bytes (here with a good HWRNG based on the RF system) void noise_rand_bytes(void *output, size_t len) { @@ -766,28 +778,24 @@ void noise_rand_bytes(void *output, size_t len) { } } -// Explicit template instantiation for Noise -template APIError APIFrameHelper::write_raw_( - const struct iovec *iov, int iovcnt, socket::Socket *socket, std::vector &tx_buf_, const std::string &info, - APINoiseFrameHelper::State &state, APINoiseFrameHelper::State failed_state); #endif // USE_API_NOISE #ifdef USE_API_PLAINTEXT /// Initialize the frame helper, returns OK if successful. APIError APIPlaintextFrameHelper::init() { - if (state_ != State::INITIALIZE || socket_ == nullptr) { + if (state_ != State::INITIALIZE || this->socket_ == nullptr) { HELPER_LOG("Bad state for init %d", (int) state_); return APIError::BAD_STATE; } - int err = socket_->setblocking(false); + int err = this->socket_->setblocking(false); if (err != 0) { state_ = State::FAILED; HELPER_LOG("Setting nonblocking failed with errno %d", errno); return APIError::TCP_NONBLOCKING_FAILED; } int enable = 1; - err = socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); + err = this->socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); if (err != 0) { state_ = State::FAILED; HELPER_LOG("Setting nodelay failed with errno %d", errno); @@ -803,7 +811,7 @@ APIError APIPlaintextFrameHelper::loop() { return APIError::BAD_STATE; } // try send pending TX data - if (!tx_buf_.empty()) { + if (!this->tx_buf_.empty()) { APIError err = try_send_tx_buf_(); if (err != APIError::OK) { return err; @@ -834,7 +842,7 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { // there is no data on the wire (which is the common case). // This results in faster failure detection compared to // attempting to read multiple bytes at once. - ssize_t received = socket_->read(&data, 1); + ssize_t received = this->socket_->read(&data, 1); if (received == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { return APIError::WOULD_BLOCK; @@ -918,7 +926,7 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { if (rx_buf_len_ < rx_header_parsed_len_) { // more data to read size_t to_read = rx_header_parsed_len_ - rx_buf_len_; - ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], to_read); + ssize_t received = this->socket_->read(&rx_buf_[rx_buf_len_], to_read); if (received == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { return APIError::WOULD_BLOCK; @@ -978,7 +986,7 @@ APIError APIPlaintextFrameHelper::read_packet(ReadPacketBuffer *buffer) { "Bad indicator byte"; iov[0].iov_base = (void *) msg; iov[0].iov_len = 19; - write_raw_(iov, 1); + APIFrameHelper::write_raw_(iov, 1); } return aerr; } @@ -989,7 +997,6 @@ APIError APIPlaintextFrameHelper::read_packet(ReadPacketBuffer *buffer) { buffer->type = rx_header_parsed_type_; return APIError::OK; } -bool APIPlaintextFrameHelper::can_write_without_blocking() { return state_ == State::DATA && tx_buf_.empty(); } APIError APIPlaintextFrameHelper::write_packet(uint16_t type, const uint8_t *payload, size_t payload_len) { if (state_ != State::DATA) { return APIError::BAD_STATE; @@ -1006,53 +1013,14 @@ APIError APIPlaintextFrameHelper::write_packet(uint16_t type, const uint8_t *pay iov[0].iov_base = &header[0]; iov[0].iov_len = header.size(); if (payload_len == 0) { - return write_raw_(iov, 1); + return APIFrameHelper::write_raw_(iov, 1); } iov[1].iov_base = const_cast(payload); iov[1].iov_len = payload_len; - return write_raw_(iov, 2); -} -APIError APIPlaintextFrameHelper::try_send_tx_buf_() { - // try send from tx_buf - while (state_ != State::CLOSED && !tx_buf_.empty()) { - ssize_t sent = socket_->write(tx_buf_.data(), tx_buf_.size()); - if (is_would_block(sent)) { - break; - } else if (sent == -1) { - state_ = State::FAILED; - HELPER_LOG("Socket write failed with errno %d", errno); - return APIError::SOCKET_WRITE_FAILED; - } - // TODO: inefficient if multiple packets in txbuf - // replace with deque of buffers - tx_buf_.erase(tx_buf_.begin(), tx_buf_.begin() + sent); - } - - return APIError::OK; + return APIFrameHelper::write_raw_(iov, 2); } -APIError APIPlaintextFrameHelper::close() { - state_ = State::CLOSED; - int err = socket_->close(); - if (err == -1) - return APIError::CLOSE_FAILED; - return APIError::OK; -} -APIError APIPlaintextFrameHelper::shutdown(int how) { - int err = socket_->shutdown(how); - if (err == -1) - return APIError::SHUTDOWN_FAILED; - if (how == SHUT_RDWR) { - state_ = State::CLOSED; - } - return APIError::OK; -} - -// Explicit template instantiation for Plaintext -template APIError APIFrameHelper::write_raw_( - const struct iovec *iov, int iovcnt, socket::Socket *socket, std::vector &tx_buf_, const std::string &info, - APIPlaintextFrameHelper::State &state, APIPlaintextFrameHelper::State failed_state); #endif // USE_API_PLAINTEXT } // namespace api diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h index db506ea1ce..c4e7072666 100644 --- a/esphome/components/api/api_frame_helper.h +++ b/esphome/components/api/api_frame_helper.h @@ -60,65 +60,138 @@ const char *api_error_to_str(APIError err); class APIFrameHelper { public: + APIFrameHelper() = default; + explicit APIFrameHelper(std::unique_ptr socket) : socket_owned_(std::move(socket)) { + socket_ = socket_owned_.get(); + } virtual ~APIFrameHelper() = default; virtual APIError init() = 0; virtual APIError loop() = 0; virtual APIError read_packet(ReadPacketBuffer *buffer) = 0; - virtual bool can_write_without_blocking() = 0; + bool can_write_without_blocking() { + // First check if we're in the DATA state + if (state_ != State::DATA) { + return false; + } + + // Empty buffer can always accept more data + if (tx_buf_.empty()) { + return true; + } + + // Optimization: Allow writing even with a small buffer backlog to reduce delays in message processing. + // This improves throughput for real-time data like sensor readings and prevents high-priority + // messages from being unnecessarily delayed by a small queue backlog. + // The 256-byte threshold is small enough to not impact memory usage significantly + // but large enough to improve overall system responsiveness. + if (tx_buf_.size() == 1 && tx_buf_.front().remaining() < 256) { + return true; + } + + return false; + } virtual APIError write_packet(uint16_t type, const uint8_t *data, size_t len) = 0; - virtual std::string getpeername() = 0; - virtual int getpeername(struct sockaddr *addr, socklen_t *addrlen) = 0; - virtual APIError close() = 0; - virtual APIError shutdown(int how) = 0; + std::string getpeername() { return socket_->getpeername(); } + int getpeername(struct sockaddr *addr, socklen_t *addrlen) { return socket_->getpeername(addr, addrlen); } + APIError close() { + state_ = State::CLOSED; + int err = this->socket_->close(); + if (err == -1) + return APIError::CLOSE_FAILED; + return APIError::OK; + } + APIError shutdown(int how) { + int err = this->socket_->shutdown(how); + if (err == -1) + return APIError::SHUTDOWN_FAILED; + if (how == SHUT_RDWR) { + state_ = State::CLOSED; + } + return APIError::OK; + } // Give this helper a name for logging - virtual void set_log_info(std::string info) = 0; + void set_log_info(std::string info) { info_ = std::move(info); } protected: + // Struct for holding parsed frame data + struct ParsedFrame { + std::vector msg; + }; + + // Buffer containing data to be sent + struct SendBuffer { + std::vector data; + size_t offset{0}; // Current offset within the buffer + + size_t remaining() const { return data.size() - offset; } + const uint8_t *current_data() const { return data.data() + offset; } + }; + + // Queue of data buffers to be sent + std::deque tx_buf_; + + // Common state enum for all frame helpers + // Note: Not all states are used by all implementations + // - INITIALIZE: Used by both Noise and Plaintext + // - CLIENT_HELLO, SERVER_HELLO, HANDSHAKE: Only used by Noise protocol + // - DATA: Used by both Noise and Plaintext + // - CLOSED: Used by both Noise and Plaintext + // - FAILED: Used by both Noise and Plaintext + // - EXPLICIT_REJECT: Only used by Noise protocol + enum class State { + INITIALIZE = 1, + CLIENT_HELLO = 2, // Noise only + SERVER_HELLO = 3, // Noise only + HANDSHAKE = 4, // Noise only + DATA = 5, + CLOSED = 6, + FAILED = 7, + EXPLICIT_REJECT = 8, // Noise only + }; + + // Current state of the frame helper + State state_{State::INITIALIZE}; + + // Helper name for logging + std::string info_; + + // Socket for communication + socket::Socket *socket_{nullptr}; + std::unique_ptr socket_owned_; + // Common implementation for writing raw data to socket - template - APIError write_raw_(const struct iovec *iov, int iovcnt, socket::Socket *socket, std::vector &tx_buf, - const std::string &info, StateEnum &state, StateEnum failed_state); + APIError write_raw_(const struct iovec *iov, int iovcnt); + + // Try to send data from the tx buffer + APIError try_send_tx_buf_(); + + // Check if the socket would block based on return value + inline bool is_would_block(ssize_t ret) const { + if (ret == -1) { + return errno == EWOULDBLOCK || errno == EAGAIN; + } + return ret == 0; + } }; #ifdef USE_API_NOISE class APINoiseFrameHelper : public APIFrameHelper { public: APINoiseFrameHelper(std::unique_ptr socket, std::shared_ptr ctx) - : socket_(std::move(socket)), ctx_(std::move(std::move(ctx))) {} + : APIFrameHelper(std::move(socket)), ctx_(std::move(ctx)) {} ~APINoiseFrameHelper() override; APIError init() override; APIError loop() override; APIError read_packet(ReadPacketBuffer *buffer) override; - bool can_write_without_blocking() override; APIError write_packet(uint16_t type, const uint8_t *payload, size_t len) override; - std::string getpeername() override { return this->socket_->getpeername(); } - int getpeername(struct sockaddr *addr, socklen_t *addrlen) override { - return this->socket_->getpeername(addr, addrlen); - } - APIError close() override; - APIError shutdown(int how) override; - // Give this helper a name for logging - void set_log_info(std::string info) override { info_ = std::move(info); } protected: - struct ParsedFrame { - std::vector msg; - }; - APIError state_action_(); APIError try_read_frame_(ParsedFrame *frame); - APIError try_send_tx_buf_(); APIError write_frame_(const uint8_t *data, size_t len); - inline APIError write_raw_(const struct iovec *iov, int iovcnt) { - return APIFrameHelper::write_raw_(iov, iovcnt, socket_.get(), tx_buf_, info_, state_, State::FAILED); - } APIError init_handshake_(); APIError check_handshake_finished_(); void send_explicit_handshake_reject_(const std::string &reason); - - std::unique_ptr socket_; - - std::string info_; // Fixed-size header buffer for noise protocol: // 1 byte for indicator + 2 bytes for message size (16-bit value, not varint) // Note: Maximum message size is 65535, with a limit of 128 bytes during handshake phase @@ -127,7 +200,6 @@ class APINoiseFrameHelper : public APIFrameHelper { std::vector rx_buf_; size_t rx_buf_len_ = 0; - std::vector tx_buf_; std::vector prologue_; std::shared_ptr ctx_; @@ -135,53 +207,21 @@ class APINoiseFrameHelper : public APIFrameHelper { NoiseCipherState *send_cipher_{nullptr}; NoiseCipherState *recv_cipher_{nullptr}; NoiseProtocolId nid_; - - enum class State { - INITIALIZE = 1, - CLIENT_HELLO = 2, - SERVER_HELLO = 3, - HANDSHAKE = 4, - DATA = 5, - CLOSED = 6, - FAILED = 7, - EXPLICIT_REJECT = 8, - } state_ = State::INITIALIZE; }; #endif // USE_API_NOISE #ifdef USE_API_PLAINTEXT class APIPlaintextFrameHelper : public APIFrameHelper { public: - APIPlaintextFrameHelper(std::unique_ptr socket) : socket_(std::move(socket)) {} + APIPlaintextFrameHelper(std::unique_ptr socket) : APIFrameHelper(std::move(socket)) {} ~APIPlaintextFrameHelper() override = default; APIError init() override; APIError loop() override; APIError read_packet(ReadPacketBuffer *buffer) override; - bool can_write_without_blocking() override; APIError write_packet(uint16_t type, const uint8_t *payload, size_t len) override; - std::string getpeername() override { return this->socket_->getpeername(); } - int getpeername(struct sockaddr *addr, socklen_t *addrlen) override { - return this->socket_->getpeername(addr, addrlen); - } - APIError close() override; - APIError shutdown(int how) override; - // Give this helper a name for logging - void set_log_info(std::string info) override { info_ = std::move(info); } protected: - struct ParsedFrame { - std::vector msg; - }; - APIError try_read_frame_(ParsedFrame *frame); - APIError try_send_tx_buf_(); - inline APIError write_raw_(const struct iovec *iov, int iovcnt) { - return APIFrameHelper::write_raw_(iov, iovcnt, socket_.get(), tx_buf_, info_, state_, State::FAILED); - } - - std::unique_ptr socket_; - - std::string info_; // Fixed-size header buffer for plaintext protocol: // We only need space for the two varints since we validate the indicator byte separately. // To match noise protocol's maximum message size (65535), we need: @@ -198,15 +238,6 @@ class APIPlaintextFrameHelper : public APIFrameHelper { std::vector rx_buf_; size_t rx_buf_len_ = 0; - - std::vector tx_buf_; - - enum class State { - INITIALIZE = 1, - DATA = 2, - CLOSED = 3, - FAILED = 4, - } state_ = State::INITIALIZE; }; #endif From 7b4e7108c0c3d80a1d02d5d20be492568b13c3cf Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 01:45:24 -0500 Subject: [PATCH 5/8] cleanup --- esphome/components/api/api_frame_helper.cpp | 46 +++++++++++---------- esphome/components/api/api_frame_helper.h | 8 ---- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index 669be8052c..23eccecaef 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -91,12 +91,14 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) { // Try to send the remaining data in this buffer ssize_t sent = this->socket_->write(front_buffer.current_data(), front_buffer.remaining()); - if (this->is_would_block(sent)) { - // Socket would block, we'll try again later - } else if (sent == -1) { - ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); - this->state_ = State::FAILED; - return APIError::SOCKET_WRITE_FAILED; // Socket write failed + if (sent == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + // Socket would block, we'll try again later + } else { + ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); + this->state_ = State::FAILED; + return APIError::SOCKET_WRITE_FAILED; // Socket write failed + } } else if (sent > 0 && static_cast(sent) < front_buffer.remaining()) { // Partially sent, update offset front_buffer.offset += sent; @@ -129,19 +131,20 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) { // Try to send directly if no buffered data ssize_t sent = this->socket_->writev(iov, iovcnt); - if (this->is_would_block(sent)) { - // Socket would block, buffer the data - SendBuffer buffer; - buffer.data.reserve(total_write_len); + if (sent == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + // Socket would block, buffer the data + SendBuffer buffer; + buffer.data.reserve(total_write_len); - for (int i = 0; i < iovcnt; i++) { - const uint8_t *data = reinterpret_cast(iov[i].iov_base); - buffer.data.insert(buffer.data.end(), data, data + iov[i].iov_len); + for (int i = 0; i < iovcnt; i++) { + const uint8_t *data = reinterpret_cast(iov[i].iov_base); + buffer.data.insert(buffer.data.end(), data, data + iov[i].iov_len); + } + + this->tx_buf_.push_back(std::move(buffer)); + return APIError::OK; // Success, data buffered } - - this->tx_buf_.push_back(std::move(buffer)); - return APIError::OK; // Success, data buffered - } else if (sent == -1) { // Socket error ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); this->state_ = State::FAILED; @@ -184,10 +187,11 @@ APIError APIFrameHelper::try_send_tx_buf_() { // Try to send the remaining data in this buffer ssize_t sent = this->socket_->write(front_buffer.current_data(), front_buffer.remaining()); - if (this->is_would_block(sent)) { - // Socket would block, try again later - break; - } else if (sent == -1) { + if (sent == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + // Socket would block, try again later + break; + } // Socket error state_ = State::FAILED; ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h index c4e7072666..15d9d8664d 100644 --- a/esphome/components/api/api_frame_helper.h +++ b/esphome/components/api/api_frame_helper.h @@ -164,14 +164,6 @@ class APIFrameHelper { // Try to send data from the tx buffer APIError try_send_tx_buf_(); - - // Check if the socket would block based on return value - inline bool is_would_block(ssize_t ret) const { - if (ret == -1) { - return errno == EWOULDBLOCK || errno == EAGAIN; - } - return ret == 0; - } }; #ifdef USE_API_NOISE From 7b84eb290332358925b4c74d0ed40aa3146bcd5d Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 01:47:18 -0500 Subject: [PATCH 6/8] fixes --- esphome/components/api/api_frame_helper.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index 23eccecaef..2566d14794 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -94,6 +94,7 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) { if (sent == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { // Socket would block, we'll try again later + return APIError::OK; // Return early, buffer remains for next attempt } else { ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); this->state_ = State::FAILED; From 872a70d235d914bf6666d274b6378bf22612963c Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 01:48:43 -0500 Subject: [PATCH 7/8] fixes --- esphome/components/api/api_frame_helper.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index 2566d14794..23eccecaef 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -94,7 +94,6 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) { if (sent == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { // Socket would block, we'll try again later - return APIError::OK; // Return early, buffer remains for next attempt } else { ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); this->state_ = State::FAILED; From 31e3065600d6528f225e40bce3b209286eee1f6a Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 01:49:38 -0500 Subject: [PATCH 8/8] fixes --- esphome/components/api/api_frame_helper.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index 23eccecaef..90c35c3ec0 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -92,13 +92,13 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) { ssize_t sent = this->socket_->write(front_buffer.current_data(), front_buffer.remaining()); if (sent == -1) { - if (errno == EWOULDBLOCK || errno == EAGAIN) { - // Socket would block, we'll try again later - } else { + if (errno != EWOULDBLOCK && errno != EAGAIN) { + // Real socket error (not just would block) ESP_LOGVV(TAG, "%s: Socket write failed with errno %d", this->info_.c_str(), errno); this->state_ = State::FAILED; return APIError::SOCKET_WRITE_FAILED; // Socket write failed } + // Socket would block, we'll try again later and continue execution to append new data to the buffer } else if (sent > 0 && static_cast(sent) < front_buffer.remaining()) { // Partially sent, update offset front_buffer.offset += sent;