remove batch

This commit is contained in:
J. Nick Koston
2025-06-06 01:34:44 +01:00
parent e01e8302f3
commit ce98cec9e7

View File

@@ -1687,108 +1687,104 @@ void APIConnection::process_batch_() {
return;
}
// First pass: create messages and calculate sizes
struct MessageInfo {
// Single pass: create messages, calculate sizes and check what fits
struct MessageData {
std::unique_ptr<ProtoMessage> message;
uint16_t message_type;
uint16_t message_size;
};
std::vector<MessageInfo> messages;
messages.reserve(this->deferred_batch_.items.size());
for (const auto &item : this->deferred_batch_.items) {
// Call the creator function with the entity to generate the message
// Use a small vector optimization - most batches are small
std::vector<MessageData> messages;
messages.reserve(std::min(size_t(8), this->deferred_batch_.items.size()));
std::vector<std::tuple<uint16_t, uint32_t, uint16_t>> packet_info;
packet_info.reserve(messages.capacity());
uint32_t total_buffer_size = 0;
uint32_t current_offset = 0;
// Single pass: create messages and calculate what fits
for (size_t i = 0; i < this->deferred_batch_.items.size(); i++) {
const auto &item = this->deferred_batch_.items[i];
// Create message once
auto message = item.creator(item.entity);
if (!message) {
continue; // Skip if creator returned nullptr
}
// Calculate message size
// Calculate size and overhead
uint32_t size = 0;
message->calculate_size(size);
uint16_t message_size = static_cast<uint16_t>(size);
uint16_t message_type = message->get_message_type();
uint16_t overhead = this->helper_->calculate_packet_overhead(message_type, message_size);
uint32_t packet_size = message_size + overhead;
messages.push_back({std::move(message), message_size});
// Check if adding this message would exceed our limit
if (!messages.empty() && total_buffer_size + packet_size > MAX_BATCH_SIZE_BYTES) {
break; // This and remaining messages won't fit
}
// Store message and packet info
messages.push_back({std::move(message), message_type, message_size});
packet_info.push_back(std::make_tuple(message_type, current_offset, message_size));
// Update offset for next message
if (i < this->deferred_batch_.items.size() - 1) {
current_offset += message_size + this->helper_->frame_header_padding();
}
total_buffer_size += packet_size;
}
if (messages.empty()) {
size_t items_processed = messages.size();
if (items_processed == 0) {
this->deferred_batch_.clear();
return;
}
// Calculate the total size needed and build packet info
std::vector<std::tuple<uint16_t, uint32_t, uint16_t>> packet_info;
packet_info.reserve(messages.size());
uint32_t total_buffer_size = 0;
size_t messages_to_process = 0;
uint32_t current_offset = 0; // Track offset for each packet
for (size_t i = 0; i < messages.size(); i++) {
const auto &msg_info = messages[i];
uint16_t message_type = msg_info.message->get_message_type();
uint16_t overhead = this->helper_->calculate_packet_overhead(message_type, msg_info.message_size);
uint32_t packet_size = msg_info.message_size + overhead;
// Check if adding this message would exceed our limit
if (messages_to_process > 0 && total_buffer_size + packet_size > MAX_BATCH_SIZE_BYTES) {
break; // This and remaining messages won't fit
}
// Add to packet info (type, offset where packet starts, message length)
// For the first message, offset is 0; for subsequent messages, it's after previous message + padding
packet_info.push_back(std::make_tuple(message_type, current_offset, msg_info.message_size));
// Update offset for next message: add header padding for next message if not last
if (i < messages.size() - 1) {
current_offset += msg_info.message_size + this->helper_->frame_header_padding();
}
total_buffer_size += packet_size;
messages_to_process++;
}
// Create buffer with calculated size
ProtoWriteBuffer batch_buffer = this->create_buffer(total_buffer_size);
// Encode all messages
for (size_t i = 0; i < messages_to_process; i++) {
const auto &msg_info = messages[i];
for (size_t i = 0; i < messages.size(); i++) {
// For messages after the first, extend the buffer with padding
if (i > 0) {
batch_buffer = this->extend_buffer();
}
// Encode the message
msg_info.message->encode(batch_buffer);
messages[i].message->encode(batch_buffer);
}
// Add final footer space for Noise if needed
if (this->helper_->frame_footer_size() > 0) {
this->proto_write_buffer_.resize(this->proto_write_buffer_.size() + this->helper_->frame_footer_size());
}
ESP_LOGD(TAG, "Sending batch: %zu messages, %zu total bytes", items_processed, this->proto_write_buffer_.size());
// Send all collected packets
if (!packet_info.empty()) {
// Add final footer space for Noise if needed
if (this->helper_->frame_footer_size() > 0) {
this->proto_write_buffer_.resize(this->proto_write_buffer_.size() + this->helper_->frame_footer_size());
}
ESP_LOGD(TAG, "Sending batch: %zu messages, %zu total bytes", messages_to_process,
this->proto_write_buffer_.size());
APIError err = this->helper_->write_protobuf_packets(batch_buffer, packet_info);
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
on_fatal_error();
if (err == APIError::SOCKET_WRITE_FAILED && errno == ECONNRESET) {
ESP_LOGW(TAG, "%s: Connection reset during batch write", this->client_combined_info_.c_str());
} else {
ESP_LOGW(TAG, "%s: Batch write failed %s errno=%d", this->client_combined_info_.c_str(), api_error_to_str(err),
errno);
}
APIError err = this->helper_->write_protobuf_packets(batch_buffer, packet_info);
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
on_fatal_error();
if (err == APIError::SOCKET_WRITE_FAILED && errno == ECONNRESET) {
ESP_LOGW(TAG, "%s: Connection reset during batch write", this->client_combined_info_.c_str());
} else {
ESP_LOGW(TAG, "%s: Batch write failed %s errno=%d", this->client_combined_info_.c_str(), api_error_to_str(err),
errno);
}
}
// Remove processed items from the batch
if (messages_to_process < this->deferred_batch_.items.size()) {
// Some items weren't processed, keep them for next batch
this->deferred_batch_.items.erase(this->deferred_batch_.items.begin(),
this->deferred_batch_.items.begin() + messages_to_process);
// Handle remaining items more efficiently
if (items_processed < this->deferred_batch_.items.size()) {
// Move unprocessed items to the beginning using std::move
std::move(this->deferred_batch_.items.begin() + items_processed, this->deferred_batch_.items.end(),
this->deferred_batch_.items.begin());
this->deferred_batch_.items.resize(this->deferred_batch_.items.size() - items_processed);
// Reschedule for remaining items
this->schedule_batch_();
} else {