batch
This commit is contained in:
@@ -170,10 +170,10 @@ void APIConnection::loop() {
|
||||
this->deferred_message_queue_.process_queue();
|
||||
}
|
||||
|
||||
// Process deferred state batch if scheduled
|
||||
if (this->deferred_state_batch_.batch_scheduled &&
|
||||
App.get_loop_component_start_time() - this->deferred_state_batch_.batch_start_time >= STATE_BATCH_DELAY_MS) {
|
||||
this->process_state_batch_();
|
||||
// Process deferred batch if scheduled
|
||||
if (this->deferred_batch_.batch_scheduled &&
|
||||
App.get_loop_component_start_time() - this->deferred_batch_.batch_start_time >= BATCH_DELAY_MS) {
|
||||
this->process_batch_();
|
||||
}
|
||||
|
||||
if (!this->list_entities_iterator_.completed())
|
||||
@@ -1696,30 +1696,30 @@ void APIConnection::on_fatal_error() {
|
||||
this->remove_ = true;
|
||||
}
|
||||
|
||||
void APIConnection::DeferredStateBatch::add_update(void *entity, send_message_t send_func) {
|
||||
// Check if we already have an update for this entity
|
||||
for (auto &update : updates) {
|
||||
if (update.entity == entity && update.send_func == send_func) {
|
||||
void APIConnection::DeferredBatch::add_item(void *entity, send_message_t send_func) {
|
||||
// Check if we already have an item for this entity
|
||||
for (auto &item : items) {
|
||||
if (item.entity == entity && item.send_func == send_func) {
|
||||
// Update timestamp to latest
|
||||
update.timestamp = App.get_loop_component_start_time();
|
||||
item.timestamp = App.get_loop_component_start_time();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Add new update
|
||||
updates.push_back({entity, send_func, App.get_loop_component_start_time()});
|
||||
// Add new item
|
||||
items.push_back({entity, send_func, App.get_loop_component_start_time()});
|
||||
}
|
||||
|
||||
void APIConnection::schedule_state_batch_() {
|
||||
if (!this->deferred_state_batch_.batch_scheduled) {
|
||||
this->deferred_state_batch_.batch_scheduled = true;
|
||||
this->deferred_state_batch_.batch_start_time = App.get_loop_component_start_time();
|
||||
void APIConnection::schedule_batch_() {
|
||||
if (!this->deferred_batch_.batch_scheduled) {
|
||||
this->deferred_batch_.batch_scheduled = true;
|
||||
this->deferred_batch_.batch_start_time = App.get_loop_component_start_time();
|
||||
}
|
||||
}
|
||||
|
||||
void APIConnection::process_state_batch_() {
|
||||
if (this->deferred_state_batch_.empty()) {
|
||||
this->deferred_state_batch_.batch_scheduled = false;
|
||||
void APIConnection::process_batch_() {
|
||||
if (this->deferred_batch_.empty()) {
|
||||
this->deferred_batch_.batch_scheduled = false;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -472,8 +472,8 @@ class APIConnection : public APIServerConnection {
|
||||
if (!this->state_subscription_)
|
||||
return false;
|
||||
// Add to batch instead of sending immediately
|
||||
this->deferred_state_batch_.add_update(entity, try_send_func);
|
||||
this->schedule_state_batch_();
|
||||
this->deferred_batch_.add_item(entity, try_send_func);
|
||||
this->schedule_batch_();
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -503,8 +503,8 @@ class APIConnection : public APIServerConnection {
|
||||
return false;
|
||||
// For state updates with values, we defer using the entity-only function
|
||||
// The current state will be read when the batch is processed
|
||||
this->deferred_state_batch_.add_update(entity, reinterpret_cast<send_message_t>(try_send_entity_func));
|
||||
this->schedule_state_batch_();
|
||||
this->deferred_batch_.add_item(entity, reinterpret_cast<send_message_t>(try_send_entity_func));
|
||||
this->schedule_batch_();
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -516,10 +516,9 @@ class APIConnection : public APIServerConnection {
|
||||
* @param try_send_func The function that tries to send the info
|
||||
*/
|
||||
void send_info_(esphome::EntityBase *entity, send_message_t try_send_func) {
|
||||
if (this->try_to_clear_buffer(true) && (this->*try_send_func)(entity)) {
|
||||
return;
|
||||
}
|
||||
this->deferred_message_queue_.defer(entity, try_send_func);
|
||||
// Always batch entity info messages
|
||||
this->deferred_batch_.add_item(entity, try_send_func);
|
||||
this->schedule_batch_();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -588,38 +587,38 @@ class APIConnection : public APIServerConnection {
|
||||
ListEntitiesIterator list_entities_iterator_;
|
||||
int state_subs_at_ = -1;
|
||||
|
||||
// State batching mechanism
|
||||
struct DeferredStateBatch {
|
||||
struct StateUpdate {
|
||||
// Generic batching mechanism for both state updates and entity info
|
||||
struct DeferredBatch {
|
||||
struct BatchItem {
|
||||
void *entity;
|
||||
send_message_t send_func;
|
||||
uint32_t timestamp; // When this update was queued
|
||||
};
|
||||
|
||||
std::vector<StateUpdate> updates;
|
||||
std::vector<BatchItem> items;
|
||||
uint32_t batch_start_time{0};
|
||||
bool batch_scheduled{false};
|
||||
|
||||
// Add update with deduplication - newer updates replace older ones for same entity
|
||||
void add_update(void *entity, send_message_t send_func);
|
||||
// Add item with deduplication - newer updates replace older ones for same entity
|
||||
void add_item(void *entity, send_message_t send_func);
|
||||
void clear() {
|
||||
updates.clear();
|
||||
items.clear();
|
||||
batch_scheduled = false;
|
||||
batch_start_time = 0;
|
||||
}
|
||||
bool empty() const { return updates.empty(); }
|
||||
bool empty() const { return items.empty(); }
|
||||
};
|
||||
|
||||
DeferredStateBatch deferred_state_batch_;
|
||||
static constexpr uint32_t STATE_BATCH_DELAY_MS = 10;
|
||||
DeferredBatch deferred_batch_;
|
||||
static constexpr uint32_t BATCH_DELAY_MS = 10;
|
||||
static constexpr size_t MAX_BATCH_SIZE_BYTES = 1360; // MTU - 100 bytes safety margin
|
||||
|
||||
// Batch mode state for capturing message types
|
||||
bool batch_mode_{false};
|
||||
uint16_t captured_message_type_{0};
|
||||
|
||||
void schedule_state_batch_();
|
||||
void process_state_batch_();
|
||||
void schedule_batch_();
|
||||
void process_batch_();
|
||||
};
|
||||
|
||||
} // namespace api
|
||||
|
||||
Reference in New Issue
Block a user