remove batch
This commit is contained in:
@@ -1476,8 +1476,13 @@ void APIConnection::on_fatal_error() {
|
||||
}
|
||||
|
||||
void APIConnection::DeferredBatch::add_item(std::unique_ptr<ProtoMessage> message) {
|
||||
// Calculate message size
|
||||
uint32_t size = 0;
|
||||
message->calculate_size(size);
|
||||
uint16_t message_size = static_cast<uint16_t>(size);
|
||||
|
||||
// Add new item without deduplication
|
||||
items.push_back({std::move(message), App.get_loop_component_start_time()});
|
||||
items.push_back({std::move(message), message_size, App.get_loop_component_start_time()});
|
||||
}
|
||||
|
||||
void APIConnection::schedule_batch_() {
|
||||
@@ -1506,28 +1511,29 @@ void APIConnection::process_batch_() {
|
||||
size_t total_size = 0;
|
||||
size_t processed_count = 0;
|
||||
|
||||
// Conservative estimate for minimum packet size: 6 byte header + 100 bytes minimum message + footer
|
||||
const uint16_t min_next_packet_size = 106 + this->helper_->frame_footer_size();
|
||||
// Calculate the total size needed for all messages that will fit
|
||||
uint32_t total_buffer_size = 0;
|
||||
size_t messages_to_process = 0;
|
||||
|
||||
// Create initial buffer with estimated size based on number of items
|
||||
// Use min of (estimated total size, MAX_BATCH_SIZE_BYTES)
|
||||
uint32_t estimated_size = std::min(static_cast<uint32_t>(min_next_packet_size * this->deferred_batch_.items.size()),
|
||||
static_cast<uint32_t>(MAX_BATCH_SIZE_BYTES));
|
||||
ProtoWriteBuffer batch_buffer = this->create_buffer(estimated_size);
|
||||
for (const auto &item : this->deferred_batch_.items) {
|
||||
uint16_t message_type = item.message->get_message_type();
|
||||
uint16_t overhead = this->helper_->calculate_packet_overhead(message_type, item.message_size);
|
||||
uint32_t packet_size = item.message_size + overhead;
|
||||
|
||||
for (size_t i = 0; i < this->deferred_batch_.items.size(); i++) {
|
||||
const auto &item = this->deferred_batch_.items[i];
|
||||
|
||||
// For the first message, check if we have enough space for at least one message
|
||||
// Use conservative estimates: max header (6 bytes) + some payload + footer
|
||||
if (processed_count == 0) {
|
||||
// Always try to send at least one message
|
||||
} else if (total_size + min_next_packet_size > MAX_BATCH_SIZE_BYTES) {
|
||||
// For subsequent messages, check if we have reasonable space left
|
||||
// Probably won't fit, stop here
|
||||
break;
|
||||
// Check if adding this message would exceed our limit
|
||||
if (messages_to_process > 0 && total_buffer_size + packet_size > MAX_BATCH_SIZE_BYTES) {
|
||||
break; // This and remaining messages won't fit
|
||||
}
|
||||
|
||||
total_buffer_size += packet_size;
|
||||
messages_to_process++;
|
||||
}
|
||||
|
||||
ProtoWriteBuffer batch_buffer = this->create_buffer(total_buffer_size);
|
||||
|
||||
for (size_t i = 0; i < messages_to_process; i++) {
|
||||
const auto &item = this->deferred_batch_.items[i];
|
||||
|
||||
// Save current buffer position before extending
|
||||
uint32_t msg_offset = 0;
|
||||
uint32_t msg_content_start = 0;
|
||||
|
||||
Reference in New Issue
Block a user