From e544f6711e0436a169ef78d7b39873168af1aff4 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 12:47:50 -0500 Subject: [PATCH 1/3] Bail quickly if there is no data to read --- esphome/components/api/api_frame_helper.cpp | 125 +++++++++++------- esphome/components/api/api_frame_helper.h | 6 +- .../components/socket/bsd_sockets_impl.cpp | 7 + .../components/socket/lwip_raw_tcp_impl.cpp | 16 +++ .../components/socket/lwip_sockets_impl.cpp | 7 + esphome/components/socket/socket.h | 1 + 6 files changed, 113 insertions(+), 49 deletions(-) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index f251ceb6e4..33b72acfe7 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -267,6 +267,18 @@ APIError APINoiseFrameHelper::try_read_frame_(ParsedFrame *frame) { return APIError::BAD_ARG; } + // Only check for available data when starting a new frame read + if (rx_header_buf_len_ == 0) { + ssize_t available = socket_->available(); + if (available == 0) { + return APIError::WOULD_BLOCK; + } else if (available == -1) { + state_ = State::FAILED; + HELPER_LOG("Socket available failed with errno %d", errno); + return APIError::SOCKET_READ_FAILED; + } + } + // read header if (rx_header_buf_len_ < 3) { // no header information yet @@ -827,64 +839,84 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { return APIError::BAD_ARG; } + // Only check for available data when starting a new frame read + if (rx_header_buf_pos_ == 0) { + ssize_t available = socket_->available(); + if (available == 0) { + return APIError::WOULD_BLOCK; + } else if (available == -1) { + state_ = State::FAILED; + HELPER_LOG("Socket read_available failed with errno %d", errno); + return APIError::SOCKET_READ_FAILED; + } + } + // read header while (!rx_header_parsed_) { - uint8_t data; - // Reading one byte at a time is fastest in practice for ESP32 when - // there is no data on the wire (which is the common case). - // This results in faster failure detection compared to - // attempting to read multiple bytes at once. - ssize_t received = socket_->read(&data, 1); - if (received == -1) { - if (errno == EWOULDBLOCK || errno == EAGAIN) { - return APIError::WOULD_BLOCK; - } - state_ = State::FAILED; - HELPER_LOG("Socket read failed with errno %d", errno); - return APIError::SOCKET_READ_FAILED; - } else if (received == 0) { - state_ = State::FAILED; - HELPER_LOG("Connection closed"); - return APIError::CONNECTION_CLOSED; - } - - // Successfully read a byte - - // Process byte according to current buffer position - if (rx_header_buf_pos_ == 0) { // Case 1: First byte (indicator byte) - if (data != 0x00) { + if (rx_header_buf_pos_ == 0) { + // Try to read the first 3 bytes at once (indicator + 2 initial bytes) + // We can safely read 3 bytes because the minimum header is indicator + 2 varint bytes + ssize_t received = socket_->read(rx_header_buf_, 3); + if (received == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + return APIError::WOULD_BLOCK; + } state_ = State::FAILED; - HELPER_LOG("Bad indicator byte %u", data); + HELPER_LOG("Socket read failed with errno %d", errno); + return APIError::SOCKET_READ_FAILED; + } else if (received == 0) { + state_ = State::FAILED; + HELPER_LOG("Connection closed"); + return APIError::CONNECTION_CLOSED; + } + + // Validate indicator byte + if (rx_header_buf_[0] != 0x00) { + state_ = State::FAILED; + HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]); return APIError::BAD_INDICATOR; } - // We don't store the indicator byte, just increment position - rx_header_buf_pos_ = 1; // Set to 1 directly - continue; // Need more bytes before we can parse - } - // Check buffer overflow before storing - if (rx_header_buf_pos_ == 5) { // Case 2: Buffer would overflow (5 bytes is max allowed) - state_ = State::FAILED; - HELPER_LOG("Header buffer overflow"); - return APIError::BAD_DATA_PACKET; - } + // Update our position based on how many bytes we got + rx_header_buf_pos_ = received; - // Store byte in buffer (adjust index to account for skipped indicator byte) - rx_header_buf_[rx_header_buf_pos_ - 1] = data; + // If we didn't get all 3 bytes, need more + if (rx_header_buf_pos_ < 3) + continue; + } else { + // For additional bytes (beyond the first 3), read one at a time + // Check buffer overflow before reading + if (rx_header_buf_pos_ >= 6) { // 6 bytes is max allowed (indicator + 5 varint bytes) + state_ = State::FAILED; + HELPER_LOG("Header buffer overflow"); + return APIError::BAD_DATA_PACKET; + } - // Increment position after storing - rx_header_buf_pos_++; + // Read one byte at a time to avoid reading into message body + ssize_t received = socket_->read(&rx_header_buf_[rx_header_buf_pos_], 1); + if (received == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + return APIError::WOULD_BLOCK; + } + state_ = State::FAILED; + HELPER_LOG("Socket read failed with errno %d", errno); + return APIError::SOCKET_READ_FAILED; + } else if (received == 0) { + state_ = State::FAILED; + HELPER_LOG("Connection closed"); + return APIError::CONNECTION_CLOSED; + } - // Case 3: If we only have one varint byte, we need more - if (rx_header_buf_pos_ == 2) { // Have read indicator + 1 byte - continue; // Need more bytes before we can parse + // Increment position + rx_header_buf_pos_++; } // At this point, we have at least 3 bytes total: - // - Validated indicator byte (0x00) but not stored + // - Validated indicator byte (0x00) in the first position // - At least 2 bytes in the buffer for the varints // Buffer layout: - // First 1-3 bytes: Message size varint (variable length) + // Byte 0: Indicator byte (0x00) + // Bytes 1-3: Message size varint (variable length) // - 2 bytes would only allow up to 16383, which is less than noise's 65535 // - 3 bytes allows up to 2097151, ensuring we support at least as much as noise // Remaining 1-2 bytes: Message type varint (variable length) @@ -892,7 +924,7 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { // we'll continue reading more bytes. uint32_t consumed = 0; - auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[0], rx_header_buf_pos_ - 1, &consumed); + auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[1], rx_header_buf_pos_ - 1, &consumed); if (!msg_size_varint.has_value()) { // not enough data there yet continue; @@ -900,7 +932,8 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { rx_header_parsed_len_ = msg_size_varint->as_uint32(); - auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[consumed], rx_header_buf_pos_ - 1 - consumed, &consumed); + auto msg_type_varint = + ProtoVarInt::parse(&rx_header_buf_[1 + consumed], rx_header_buf_pos_ - 1 - consumed, &consumed); if (!msg_type_varint.has_value()) { // not enough data there yet continue; diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h index db506ea1ce..0c8fa6fda6 100644 --- a/esphome/components/api/api_frame_helper.h +++ b/esphome/components/api/api_frame_helper.h @@ -183,14 +183,14 @@ class APIPlaintextFrameHelper : public APIFrameHelper { std::string info_; // Fixed-size header buffer for plaintext protocol: - // We only need space for the two varints since we validate the indicator byte separately. + // We need space for the indicator byte and the two varints. // To match noise protocol's maximum message size (65535), we need: - // 3 bytes for message size varint (supports up to 2097151) + 2 bytes for message type varint + // 1 byte for indicator + 3 bytes for message size varint (supports up to 2097151) + 2 bytes for message type varint // // While varints could theoretically be up to 10 bytes each for 64-bit values, // attempting to process messages with headers that large would likely crash the // ESP32 due to memory constraints. - uint8_t rx_header_buf_[5]; // 5 bytes for varints (3 for size + 2 for type) + uint8_t rx_header_buf_[6]; // 1 byte for indicator + 5 bytes for varints (3 for size + 2 for type) uint8_t rx_header_buf_pos_ = 0; bool rx_header_parsed_ = false; uint32_t rx_header_parsed_type_ = 0; diff --git a/esphome/components/socket/bsd_sockets_impl.cpp b/esphome/components/socket/bsd_sockets_impl.cpp index 1b3916fcab..730f2e1b18 100644 --- a/esphome/components/socket/bsd_sockets_impl.cpp +++ b/esphome/components/socket/bsd_sockets_impl.cpp @@ -101,6 +101,13 @@ class BSDSocketImpl : public Socket { return ::readv(fd_, iov, iovcnt); #endif } + ssize_t available() override { + int bytes_available = 0; + int ret = ::ioctl(fd_, FIONREAD, &bytes_available); + if (ret == -1) + return -1; + return bytes_available; + } ssize_t write(const void *buf, size_t len) override { return ::write(fd_, buf, len); } ssize_t send(void *buf, size_t len, int flags) { return ::send(fd_, buf, len, flags); } ssize_t writev(const struct iovec *iov, int iovcnt) override { diff --git a/esphome/components/socket/lwip_raw_tcp_impl.cpp b/esphome/components/socket/lwip_raw_tcp_impl.cpp index 1d998902ff..cfa7aec784 100644 --- a/esphome/components/socket/lwip_raw_tcp_impl.cpp +++ b/esphome/components/socket/lwip_raw_tcp_impl.cpp @@ -380,6 +380,22 @@ class LWIPRawImpl : public Socket { } return ret; } + ssize_t available() override { + if (pcb_ == nullptr) { + errno = ECONNRESET; + return -1; + } + + // Check if we have data in the receive buffer + if (rx_buf_ != nullptr) { + size_t pb_len = rx_buf_->len; + size_t pb_left = pb_len - rx_buf_offset_; + return pb_left; + } + + // No data in buffer + return 0; + } ssize_t internal_write(const void *buf, size_t len) { if (pcb_ == nullptr) { errno = ECONNRESET; diff --git a/esphome/components/socket/lwip_sockets_impl.cpp b/esphome/components/socket/lwip_sockets_impl.cpp index c41e42fc83..ac40b2a4b4 100644 --- a/esphome/components/socket/lwip_sockets_impl.cpp +++ b/esphome/components/socket/lwip_sockets_impl.cpp @@ -81,6 +81,13 @@ class LwIPSocketImpl : public Socket { int listen(int backlog) override { return lwip_listen(fd_, backlog); } ssize_t read(void *buf, size_t len) override { return lwip_read(fd_, buf, len); } ssize_t readv(const struct iovec *iov, int iovcnt) override { return lwip_readv(fd_, iov, iovcnt); } + ssize_t available() override { + int bytes_available = 0; + int ret = lwip_ioctl(fd_, FIONREAD, &bytes_available); + if (ret == -1) + return -1; + return bytes_available; + } ssize_t write(const void *buf, size_t len) override { return lwip_write(fd_, buf, len); } ssize_t send(void *buf, size_t len, int flags) { return lwip_send(fd_, buf, len, flags); } ssize_t writev(const struct iovec *iov, int iovcnt) override { return lwip_writev(fd_, iov, iovcnt); } diff --git a/esphome/components/socket/socket.h b/esphome/components/socket/socket.h index 917f3c4c7f..fd44fa3ee8 100644 --- a/esphome/components/socket/socket.h +++ b/esphome/components/socket/socket.h @@ -38,6 +38,7 @@ class Socket { virtual ssize_t recvfrom(void *buf, size_t len, sockaddr *addr, socklen_t *addr_len) = 0; #endif virtual ssize_t readv(const struct iovec *iov, int iovcnt) = 0; + virtual ssize_t available() = 0; // Returns number of bytes available to read without blocking, or -1 on error virtual ssize_t write(const void *buf, size_t len) = 0; virtual ssize_t writev(const struct iovec *iov, int iovcnt) = 0; virtual ssize_t sendto(const void *buf, size_t len, int flags, const struct sockaddr *to, socklen_t tolen) = 0; From 7a364ff63a9e4e99215f9dcb2b26e3de29543693 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 12:52:25 -0500 Subject: [PATCH 2/3] cleanup --- esphome/components/api/api_frame_helper.cpp | 22 ++++----------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index 33b72acfe7..98dd6326f0 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -268,15 +268,8 @@ APIError APINoiseFrameHelper::try_read_frame_(ParsedFrame *frame) { } // Only check for available data when starting a new frame read - if (rx_header_buf_len_ == 0) { - ssize_t available = socket_->available(); - if (available == 0) { - return APIError::WOULD_BLOCK; - } else if (available == -1) { - state_ = State::FAILED; - HELPER_LOG("Socket available failed with errno %d", errno); - return APIError::SOCKET_READ_FAILED; - } + if (rx_header_buf_len_ == 0 && socket_->available() == 0) { + return APIError::WOULD_BLOCK; } // read header @@ -840,15 +833,8 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { } // Only check for available data when starting a new frame read - if (rx_header_buf_pos_ == 0) { - ssize_t available = socket_->available(); - if (available == 0) { - return APIError::WOULD_BLOCK; - } else if (available == -1) { - state_ = State::FAILED; - HELPER_LOG("Socket read_available failed with errno %d", errno); - return APIError::SOCKET_READ_FAILED; - } + if (rx_header_buf_pos_ == 0 && socket_->available() == 0) { + return APIError::WOULD_BLOCK; } // read header From 2201f670451605d94eb48a676bec847f331ee3d0 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 15 May 2025 13:26:59 -0500 Subject: [PATCH 3/3] FIONREAD --- esphome/components/esp32/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/esphome/components/esp32/__init__.py b/esphome/components/esp32/__init__.py index 12d0f9fcd5..72f4241f9d 100644 --- a/esphome/components/esp32/__init__.py +++ b/esphome/components/esp32/__init__.py @@ -681,6 +681,10 @@ async def to_code(config): add_idf_sdkconfig_option("CONFIG_ESP_TASK_WDT_CHECK_IDLE_TASK_CPU0", False) add_idf_sdkconfig_option("CONFIG_ESP_TASK_WDT_CHECK_IDLE_TASK_CPU1", False) + # Enable socket receive buffer option for FIONREAD support + # Already enabled on Arduino framework by default + add_idf_sdkconfig_option("CONFIG_LWIP_SO_RCVBUF", True) + # Set default CPU frequency add_idf_sdkconfig_option(f"CONFIG_ESP_DEFAULT_CPU_FREQ_MHZ_{freq}", True)