diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index 3ff84d1f4e..697f447e4b 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -1687,108 +1687,104 @@ void APIConnection::process_batch_() { return; } - // First pass: create messages and calculate sizes - struct MessageInfo { + // Single pass: create messages, calculate sizes and check what fits + struct MessageData { std::unique_ptr message; + uint16_t message_type; uint16_t message_size; }; - std::vector messages; - messages.reserve(this->deferred_batch_.items.size()); - for (const auto &item : this->deferred_batch_.items) { - // Call the creator function with the entity to generate the message + // Use a small vector optimization - most batches are small + std::vector messages; + messages.reserve(std::min(size_t(8), this->deferred_batch_.items.size())); + + std::vector> packet_info; + packet_info.reserve(messages.capacity()); + + uint32_t total_buffer_size = 0; + uint32_t current_offset = 0; + + // Single pass: create messages and calculate what fits + for (size_t i = 0; i < this->deferred_batch_.items.size(); i++) { + const auto &item = this->deferred_batch_.items[i]; + + // Create message once auto message = item.creator(item.entity); if (!message) { continue; // Skip if creator returned nullptr } - // Calculate message size + // Calculate size and overhead uint32_t size = 0; message->calculate_size(size); uint16_t message_size = static_cast(size); + uint16_t message_type = message->get_message_type(); + uint16_t overhead = this->helper_->calculate_packet_overhead(message_type, message_size); + uint32_t packet_size = message_size + overhead; - messages.push_back({std::move(message), message_size}); + // Check if adding this message would exceed our limit + if (!messages.empty() && total_buffer_size + packet_size > MAX_BATCH_SIZE_BYTES) { + break; // This and remaining messages won't fit + } + + // Store message and packet info + messages.push_back({std::move(message), message_type, message_size}); + packet_info.push_back(std::make_tuple(message_type, current_offset, message_size)); + + // Update offset for next message + if (i < this->deferred_batch_.items.size() - 1) { + current_offset += message_size + this->helper_->frame_header_padding(); + } + + total_buffer_size += packet_size; } - if (messages.empty()) { + size_t items_processed = messages.size(); + if (items_processed == 0) { this->deferred_batch_.clear(); return; } - // Calculate the total size needed and build packet info - std::vector> packet_info; - packet_info.reserve(messages.size()); - - uint32_t total_buffer_size = 0; - size_t messages_to_process = 0; - uint32_t current_offset = 0; // Track offset for each packet - - for (size_t i = 0; i < messages.size(); i++) { - const auto &msg_info = messages[i]; - uint16_t message_type = msg_info.message->get_message_type(); - uint16_t overhead = this->helper_->calculate_packet_overhead(message_type, msg_info.message_size); - uint32_t packet_size = msg_info.message_size + overhead; - - // Check if adding this message would exceed our limit - if (messages_to_process > 0 && total_buffer_size + packet_size > MAX_BATCH_SIZE_BYTES) { - break; // This and remaining messages won't fit - } - - // Add to packet info (type, offset where packet starts, message length) - // For the first message, offset is 0; for subsequent messages, it's after previous message + padding - packet_info.push_back(std::make_tuple(message_type, current_offset, msg_info.message_size)); - - // Update offset for next message: add header padding for next message if not last - if (i < messages.size() - 1) { - current_offset += msg_info.message_size + this->helper_->frame_header_padding(); - } - - total_buffer_size += packet_size; - messages_to_process++; - } - + // Create buffer with calculated size ProtoWriteBuffer batch_buffer = this->create_buffer(total_buffer_size); // Encode all messages - for (size_t i = 0; i < messages_to_process; i++) { - const auto &msg_info = messages[i]; - + for (size_t i = 0; i < messages.size(); i++) { // For messages after the first, extend the buffer with padding if (i > 0) { batch_buffer = this->extend_buffer(); } // Encode the message - msg_info.message->encode(batch_buffer); + messages[i].message->encode(batch_buffer); } + // Add final footer space for Noise if needed + if (this->helper_->frame_footer_size() > 0) { + this->proto_write_buffer_.resize(this->proto_write_buffer_.size() + this->helper_->frame_footer_size()); + } + + ESP_LOGD(TAG, "Sending batch: %zu messages, %zu total bytes", items_processed, this->proto_write_buffer_.size()); + // Send all collected packets - if (!packet_info.empty()) { - // Add final footer space for Noise if needed - if (this->helper_->frame_footer_size() > 0) { - this->proto_write_buffer_.resize(this->proto_write_buffer_.size() + this->helper_->frame_footer_size()); - } - - ESP_LOGD(TAG, "Sending batch: %zu messages, %zu total bytes", messages_to_process, - this->proto_write_buffer_.size()); - - APIError err = this->helper_->write_protobuf_packets(batch_buffer, packet_info); - if (err != APIError::OK && err != APIError::WOULD_BLOCK) { - on_fatal_error(); - if (err == APIError::SOCKET_WRITE_FAILED && errno == ECONNRESET) { - ESP_LOGW(TAG, "%s: Connection reset during batch write", this->client_combined_info_.c_str()); - } else { - ESP_LOGW(TAG, "%s: Batch write failed %s errno=%d", this->client_combined_info_.c_str(), api_error_to_str(err), - errno); - } + APIError err = this->helper_->write_protobuf_packets(batch_buffer, packet_info); + if (err != APIError::OK && err != APIError::WOULD_BLOCK) { + on_fatal_error(); + if (err == APIError::SOCKET_WRITE_FAILED && errno == ECONNRESET) { + ESP_LOGW(TAG, "%s: Connection reset during batch write", this->client_combined_info_.c_str()); + } else { + ESP_LOGW(TAG, "%s: Batch write failed %s errno=%d", this->client_combined_info_.c_str(), api_error_to_str(err), + errno); } } - // Remove processed items from the batch - if (messages_to_process < this->deferred_batch_.items.size()) { - // Some items weren't processed, keep them for next batch - this->deferred_batch_.items.erase(this->deferred_batch_.items.begin(), - this->deferred_batch_.items.begin() + messages_to_process); + // Handle remaining items more efficiently + if (items_processed < this->deferred_batch_.items.size()) { + // Move unprocessed items to the beginning using std::move + std::move(this->deferred_batch_.items.begin() + items_processed, this->deferred_batch_.items.end(), + this->deferred_batch_.items.begin()); + this->deferred_batch_.items.resize(this->deferred_batch_.items.size() - items_processed); + // Reschedule for remaining items this->schedule_batch_(); } else {