remove batch
This commit is contained in:
@@ -1472,16 +1472,16 @@ void APIConnection::process_batch_() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Track packet information (type, offset, length)
|
||||
// Calculate the total size needed and build packet info
|
||||
std::vector<std::tuple<uint16_t, uint32_t, uint16_t>> packet_info;
|
||||
size_t total_size = 0;
|
||||
size_t processed_count = 0;
|
||||
packet_info.reserve(this->deferred_batch_.items.size());
|
||||
|
||||
// Calculate the total size needed for all messages that will fit
|
||||
uint32_t total_buffer_size = 0;
|
||||
size_t messages_to_process = 0;
|
||||
uint32_t current_offset = 0; // Track offset for each packet
|
||||
|
||||
for (const auto &item : this->deferred_batch_.items) {
|
||||
for (size_t i = 0; i < this->deferred_batch_.items.size(); i++) {
|
||||
const auto &item = this->deferred_batch_.items[i];
|
||||
uint16_t message_type = item.message->get_message_type();
|
||||
uint16_t overhead = this->helper_->calculate_packet_overhead(message_type, item.message_size);
|
||||
uint32_t packet_size = item.message_size + overhead;
|
||||
@@ -1491,53 +1491,38 @@ void APIConnection::process_batch_() {
|
||||
break; // This and remaining messages won't fit
|
||||
}
|
||||
|
||||
// Add to packet info (type, offset where packet starts, message length)
|
||||
// For the first message, offset is 0; for subsequent messages, it's after previous message + padding
|
||||
packet_info.push_back(std::make_tuple(message_type, current_offset, item.message_size));
|
||||
|
||||
// Update offset for next message: add header padding for next message if not last
|
||||
if (i < this->deferred_batch_.items.size() - 1) {
|
||||
current_offset += item.message_size + this->helper_->frame_header_padding();
|
||||
}
|
||||
|
||||
total_buffer_size += packet_size;
|
||||
messages_to_process++;
|
||||
}
|
||||
|
||||
ProtoWriteBuffer batch_buffer = this->create_buffer(total_buffer_size);
|
||||
|
||||
// Encode all messages
|
||||
for (size_t i = 0; i < messages_to_process; i++) {
|
||||
const auto &item = this->deferred_batch_.items[i];
|
||||
|
||||
// Save current buffer position before extending
|
||||
uint32_t msg_offset = 0;
|
||||
uint32_t msg_content_start = 0;
|
||||
|
||||
// For messages after the first, extend the buffer with padding
|
||||
if (processed_count > 0) {
|
||||
msg_offset = static_cast<uint32_t>(this->proto_write_buffer_.size());
|
||||
if (i > 0) {
|
||||
batch_buffer = this->extend_buffer();
|
||||
// After extend_buffer(), the actual message content starts after the padding
|
||||
msg_content_start = static_cast<uint32_t>(this->proto_write_buffer_.size());
|
||||
} else {
|
||||
// For the first message, content starts after the header padding
|
||||
msg_content_start = this->helper_->frame_header_padding();
|
||||
}
|
||||
|
||||
// Get the message type before encoding
|
||||
uint16_t message_type = item.message->get_message_type();
|
||||
|
||||
// Encode the message directly from the stored ProtoMessage
|
||||
bool success = item.message->encode(batch_buffer);
|
||||
|
||||
if (!success) {
|
||||
// Encoding failed, revert buffer to previous size
|
||||
this->proto_write_buffer_.resize(msg_offset);
|
||||
continue;
|
||||
ESP_LOGW(TAG, "Failed to encode message type %u", item.message->get_message_type());
|
||||
// Since we pre-calculated sizes, encoding should not fail
|
||||
// If it does, we have a serious issue
|
||||
}
|
||||
|
||||
// Calculate message length - from content start to current buffer end
|
||||
uint16_t msg_len = static_cast<uint16_t>(this->proto_write_buffer_.size() - msg_content_start);
|
||||
|
||||
// Record packet info
|
||||
packet_info.push_back(std::make_tuple(message_type, msg_offset, msg_len));
|
||||
processed_count++;
|
||||
|
||||
// Calculate actual packet size including protocol overhead
|
||||
uint16_t packet_overhead = this->helper_->calculate_packet_overhead(message_type, msg_len);
|
||||
uint16_t packet_size = msg_len + packet_overhead;
|
||||
total_size += packet_size;
|
||||
}
|
||||
|
||||
// Send all collected packets
|
||||
@@ -1547,7 +1532,8 @@ void APIConnection::process_batch_() {
|
||||
this->proto_write_buffer_.resize(this->proto_write_buffer_.size() + this->helper_->frame_footer_size());
|
||||
}
|
||||
|
||||
ESP_LOGD(TAG, "Sending batch: %zu messages, %zu total bytes", processed_count, this->proto_write_buffer_.size());
|
||||
ESP_LOGD(TAG, "Sending batch: %zu messages, %zu total bytes", messages_to_process,
|
||||
this->proto_write_buffer_.size());
|
||||
|
||||
APIError err = this->helper_->write_protobuf_packets(batch_buffer, packet_info);
|
||||
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
|
||||
@@ -1562,10 +1548,10 @@ void APIConnection::process_batch_() {
|
||||
}
|
||||
|
||||
// Remove processed items from the batch
|
||||
if (processed_count < this->deferred_batch_.items.size()) {
|
||||
if (messages_to_process < this->deferred_batch_.items.size()) {
|
||||
// Some items weren't processed, keep them for next batch
|
||||
this->deferred_batch_.items.erase(this->deferred_batch_.items.begin(),
|
||||
this->deferred_batch_.items.begin() + processed_count);
|
||||
this->deferred_batch_.items.begin() + messages_to_process);
|
||||
// Reschedule for remaining items
|
||||
this->schedule_batch_();
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user