From 7cc2d059bc3d292a3dc0977b959fddcca7bee7d9 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 5 Jun 2025 23:55:08 +0100 Subject: [PATCH] remove batch --- esphome/components/api/api_connection.cpp | 60 +++++++++-------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index c7ae65d532..37edabb169 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -1472,16 +1472,16 @@ void APIConnection::process_batch_() { return; } - // Track packet information (type, offset, length) + // Calculate the total size needed and build packet info std::vector> packet_info; - size_t total_size = 0; - size_t processed_count = 0; + packet_info.reserve(this->deferred_batch_.items.size()); - // Calculate the total size needed for all messages that will fit uint32_t total_buffer_size = 0; size_t messages_to_process = 0; + uint32_t current_offset = 0; // Track offset for each packet - for (const auto &item : this->deferred_batch_.items) { + for (size_t i = 0; i < this->deferred_batch_.items.size(); i++) { + const auto &item = this->deferred_batch_.items[i]; uint16_t message_type = item.message->get_message_type(); uint16_t overhead = this->helper_->calculate_packet_overhead(message_type, item.message_size); uint32_t packet_size = item.message_size + overhead; @@ -1491,53 +1491,38 @@ void APIConnection::process_batch_() { break; // This and remaining messages won't fit } + // Add to packet info (type, offset where packet starts, message length) + // For the first message, offset is 0; for subsequent messages, it's after previous message + padding + packet_info.push_back(std::make_tuple(message_type, current_offset, item.message_size)); + + // Update offset for next message: add header padding for next message if not last + if (i < this->deferred_batch_.items.size() - 1) { + current_offset += item.message_size + this->helper_->frame_header_padding(); + } + total_buffer_size += packet_size; messages_to_process++; } ProtoWriteBuffer batch_buffer = this->create_buffer(total_buffer_size); + // Encode all messages for (size_t i = 0; i < messages_to_process; i++) { const auto &item = this->deferred_batch_.items[i]; - // Save current buffer position before extending - uint32_t msg_offset = 0; - uint32_t msg_content_start = 0; - // For messages after the first, extend the buffer with padding - if (processed_count > 0) { - msg_offset = static_cast(this->proto_write_buffer_.size()); + if (i > 0) { batch_buffer = this->extend_buffer(); - // After extend_buffer(), the actual message content starts after the padding - msg_content_start = static_cast(this->proto_write_buffer_.size()); - } else { - // For the first message, content starts after the header padding - msg_content_start = this->helper_->frame_header_padding(); } - // Get the message type before encoding - uint16_t message_type = item.message->get_message_type(); - // Encode the message directly from the stored ProtoMessage bool success = item.message->encode(batch_buffer); if (!success) { - // Encoding failed, revert buffer to previous size - this->proto_write_buffer_.resize(msg_offset); - continue; + ESP_LOGW(TAG, "Failed to encode message type %u", item.message->get_message_type()); + // Since we pre-calculated sizes, encoding should not fail + // If it does, we have a serious issue } - - // Calculate message length - from content start to current buffer end - uint16_t msg_len = static_cast(this->proto_write_buffer_.size() - msg_content_start); - - // Record packet info - packet_info.push_back(std::make_tuple(message_type, msg_offset, msg_len)); - processed_count++; - - // Calculate actual packet size including protocol overhead - uint16_t packet_overhead = this->helper_->calculate_packet_overhead(message_type, msg_len); - uint16_t packet_size = msg_len + packet_overhead; - total_size += packet_size; } // Send all collected packets @@ -1547,7 +1532,8 @@ void APIConnection::process_batch_() { this->proto_write_buffer_.resize(this->proto_write_buffer_.size() + this->helper_->frame_footer_size()); } - ESP_LOGD(TAG, "Sending batch: %zu messages, %zu total bytes", processed_count, this->proto_write_buffer_.size()); + ESP_LOGD(TAG, "Sending batch: %zu messages, %zu total bytes", messages_to_process, + this->proto_write_buffer_.size()); APIError err = this->helper_->write_protobuf_packets(batch_buffer, packet_info); if (err != APIError::OK && err != APIError::WOULD_BLOCK) { @@ -1562,10 +1548,10 @@ void APIConnection::process_batch_() { } // Remove processed items from the batch - if (processed_count < this->deferred_batch_.items.size()) { + if (messages_to_process < this->deferred_batch_.items.size()) { // Some items weren't processed, keep them for next batch this->deferred_batch_.items.erase(this->deferred_batch_.items.begin(), - this->deferred_batch_.items.begin() + processed_count); + this->deferred_batch_.items.begin() + messages_to_process); // Reschedule for remaining items this->schedule_batch_(); } else {