Batch state sends

This commit is contained in:
J. Nick Koston
2025-06-05 16:50:27 +01:00
parent d19997a056
commit 1b7e1afd9b
5 changed files with 378 additions and 108 deletions

View File

@@ -170,6 +170,12 @@ void APIConnection::loop() {
this->deferred_message_queue_.process_queue();
}
// Process deferred state batch if scheduled
if (this->deferred_state_batch_.batch_scheduled &&
App.get_loop_component_start_time() - this->deferred_state_batch_.batch_start_time >= STATE_BATCH_DELAY_MS) {
this->process_state_batch_();
}
if (!this->list_entities_iterator_.completed())
this->list_entities_iterator_.advance();
if (!this->initial_state_iterator_.completed() && this->list_entities_iterator_.completed())
@@ -1650,7 +1656,13 @@ bool APIConnection::try_to_clear_buffer(bool log_out_of_space) {
}
return false;
}
bool APIConnection::send_buffer(ProtoWriteBuffer buffer, uint32_t message_type) {
bool APIConnection::send_buffer(ProtoWriteBuffer buffer, uint16_t message_type) {
// If we're in batch mode, just capture the message type and return success
if (this->batch_mode_) {
this->captured_message_type_ = message_type;
return true;
}
if (!this->try_to_clear_buffer(message_type != 29)) { // SubscribeLogsResponse
return false;
}
@@ -1684,6 +1696,139 @@ void APIConnection::on_fatal_error() {
this->remove_ = true;
}
void APIConnection::DeferredStateBatch::add_update(void *entity, send_message_t send_func) {
// Check if we already have an update for this entity
for (auto &update : updates) {
if (update.entity == entity && update.send_func == send_func) {
// Update timestamp to latest
update.timestamp = App.get_loop_component_start_time();
return;
}
}
// Add new update
updates.push_back({entity, send_func, App.get_loop_component_start_time()});
}
void APIConnection::schedule_state_batch_() {
  // Already armed: keep the original start time so the batch window is
  // measured from the first update that scheduled it.
  if (this->deferred_state_batch_.batch_scheduled)
    return;
  this->deferred_state_batch_.batch_scheduled = true;
  this->deferred_state_batch_.batch_start_time = App.get_loop_component_start_time();
}
void APIConnection::process_state_batch_() {
  // Drain the deferred state batch: encode as many queued entity-state
  // messages as fit into one shared buffer, then hand them to the frame
  // helper as a single multi-packet write.
  if (this->deferred_state_batch_.empty()) {
    // Nothing queued; just disarm the scheduled flag.
    this->deferred_state_batch_.batch_scheduled = false;
    return;
  }
  // Try to clear buffer first
  if (!this->helper_->can_write_without_blocking()) {
    // Can't write now, defer everything to the regular deferred queue
    // (per-message path) so no state update is lost, then drop the batch.
    for (const auto &update : this->deferred_state_batch_.updates) {
      this->deferred_message_queue_.defer(update.entity, update.send_func);
    }
    this->deferred_state_batch_.clear();
    return;
  }
  // Enable batch mode to capture message types: while set, send_buffer()
  // only records the message type into captured_message_type_ and returns
  // success without touching the socket.
  this->batch_mode_ = true;
  // Track packet information (type, offset, length)
  std::vector<std::tuple<uint16_t, uint32_t, uint16_t>> packet_info;
  size_t total_size = 0;
  size_t processed_count = 0;
  // Create initial buffer with estimated size
  ProtoWriteBuffer batch_buffer = this->create_buffer(MAX_BATCH_SIZE_BYTES);
  // Conservative estimate for minimum packet size: 6 byte header + 100 bytes minimum message + footer
  const uint16_t min_next_packet_size = 106 + this->helper_->frame_footer_size();
  for (size_t i = 0; i < this->deferred_state_batch_.updates.size(); i++) {
    const auto &update = this->deferred_state_batch_.updates[i];
    // For the first message, check if we have enough space for at least one message
    // Use conservative estimates: max header (6 bytes) + some payload + footer
    if (processed_count == 0) {
      // Always try to send at least one message
    } else if (total_size + min_next_packet_size > MAX_BATCH_SIZE_BYTES) {
      // For subsequent messages, check if we have reasonable space left
      // Probably won't fit, stop here
      break;
    }
    // Save current buffer position before extending
    uint32_t msg_offset = 0;
    this->captured_message_type_ = 0;
    // For messages after the first, extend the buffer with padding
    if (processed_count > 0) {
      msg_offset = static_cast<uint32_t>(this->proto_write_buffer_.size());
      batch_buffer = this->extend_buffer();
    }
    // Try to encode the message (send_func writes through send_buffer(),
    // which batch_mode_ redirects into proto_write_buffer_).
    if (!(this->*update.send_func)(update.entity)) {
      // Encoding failed, revert buffer to previous size
      // NOTE(review): when processed_count == 0, msg_offset is 0, so this
      // resize wipes the whole buffer — including whatever frame-header
      // padding create_buffer() reserved — before the next encode attempt.
      // Confirm a subsequent first-message encode still gets its padding.
      this->proto_write_buffer_.resize(msg_offset);
      continue;
    }
    // Get the captured message type
    uint16_t message_type = this->captured_message_type_;
    // Calculate message length (payload only: strip the header padding that
    // precedes this message in the shared buffer)
    uint16_t msg_len =
        static_cast<uint16_t>(this->proto_write_buffer_.size() - msg_offset - this->helper_->frame_header_padding());
    // Record packet info
    packet_info.push_back(std::make_tuple(message_type, msg_offset, msg_len));
    processed_count++;
    // Calculate actual packet size including protocol overhead
    uint16_t packet_overhead = this->helper_->calculate_packet_overhead(message_type, msg_len);
    uint16_t packet_size = msg_len + packet_overhead;
    total_size += packet_size;
  }
  // Disable batch mode
  this->batch_mode_ = false;
  // Send all collected packets
  if (!packet_info.empty()) {
    // Add final footer space for Noise if needed
    if (this->helper_->frame_footer_size() > 0) {
      this->proto_write_buffer_.resize(this->proto_write_buffer_.size() + this->helper_->frame_footer_size());
    }
    APIError err = this->helper_->write_protobuf_packets(batch_buffer, packet_info);
    if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
      // Unrecoverable write error: flag the connection for teardown and log.
      on_fatal_error();
      if (err == APIError::SOCKET_WRITE_FAILED && errno == ECONNRESET) {
        ESP_LOGW(TAG, "%s: Connection reset during batch write", this->client_combined_info_.c_str());
      } else {
        ESP_LOGW(TAG, "%s: Batch write failed %s errno=%d", this->client_combined_info_.c_str(), api_error_to_str(err),
                 errno);
      }
    }
  }
  // Remove processed updates from the batch
  // NOTE(review): erasing the first processed_count entries assumes all
  // successfully encoded updates sit at the front of the vector. A mid-batch
  // encode failure (the `continue` above) breaks that correspondence: the
  // failed entry is erased while a later, already-sent entry survives and is
  // re-sent next round. Confirm whether send_func can fail mid-batch.
  if (processed_count < this->deferred_state_batch_.updates.size()) {
    // Some updates weren't processed, keep them for next batch
    this->deferred_state_batch_.updates.erase(this->deferred_state_batch_.updates.begin(),
                                              this->deferred_state_batch_.updates.begin() + processed_count);
    // Reschedule for remaining updates
    this->schedule_state_batch_();
  } else {
    // All updates processed
    this->deferred_state_batch_.clear();
  }
}
} // namespace api
} // namespace esphome
#endif