This commit is contained in:
J. Nick Koston
2025-06-05 23:33:41 +01:00
parent 7b2e9fa7d9
commit fdb33a893f
6 changed files with 156 additions and 162 deletions

View File

@@ -1442,9 +1442,8 @@ bool APIConnection::try_to_clear_buffer(bool log_out_of_space) {
return false;
}
bool APIConnection::send_buffer(ProtoWriteBuffer buffer, uint16_t message_type) {
// If we're in batch mode, just capture the message type and return success
// If we're in batch mode, just return success (message already encoded)
if (this->batch_mode_) {
this->captured_message_type_ = message_type;
return true;
}
@@ -1481,18 +1480,9 @@ void APIConnection::on_fatal_error() {
this->remove_ = true;
}
void APIConnection::DeferredBatch::add_item(void *entity, send_message_t send_func) {
// Check if we already have an item for this entity
for (auto &item : items) {
if (item.entity == entity && item.send_func == send_func) {
// Update timestamp to latest
item.timestamp = App.get_loop_component_start_time();
return;
}
}
// Add new item
items.push_back({entity, send_func, App.get_loop_component_start_time()});
void APIConnection::DeferredBatch::add_item(std::unique_ptr<ProtoMessage> message) {
// Add new item without deduplication
items.push_back({std::move(message), App.get_loop_component_start_time()});
}
void APIConnection::schedule_batch_() {
@@ -1512,11 +1502,7 @@ void APIConnection::process_batch_() {
// Try to clear buffer first
if (!this->helper_->can_write_without_blocking()) {
// Can't write now, defer everything to the regular deferred queue
for (const auto &item : this->deferred_batch_.items) {
this->deferred_message_queue_.defer(item.entity, item.send_func);
}
this->deferred_batch_.clear();
// Can't write now, we'll try again later
return;
}
@@ -1553,7 +1539,6 @@ void APIConnection::process_batch_() {
// Save current buffer position before extending
uint32_t msg_offset = 0;
uint32_t msg_content_start = 0;
this->captured_message_type_ = 0;
// For messages after the first, extend the buffer with padding
if (processed_count > 0) {
@@ -1566,16 +1551,18 @@ void APIConnection::process_batch_() {
msg_content_start = this->helper_->frame_header_padding();
}
// Try to encode the message
if (!(this->*item.send_func)(item.entity)) {
// Get the message type before encoding
uint16_t message_type = item.message->get_message_type();
// Encode the message directly from the stored ProtoMessage
bool success = item.message->encode(batch_buffer);
if (!success) {
// Encoding failed, revert buffer to previous size
this->proto_write_buffer_.resize(msg_offset);
continue;
}
// Get the captured message type
uint16_t message_type = this->captured_message_type_;
// Calculate message length - from content start to current buffer end
uint16_t msg_len = static_cast<uint16_t>(this->proto_write_buffer_.size() - msg_content_start);

View File

@@ -385,8 +385,10 @@ class APIConnection : public APIServerConnection {
response.disabled_by_default = entity->is_disabled_by_default();
response.entity_category = static_cast<enums::EntityCategory>(entity->get_entity_category());
// Send the response using the generic send_message method
return this->send_message(response);
// Add to deferred batch
this->deferred_batch_.add_item(std::make_unique<ResponseT>(response));
this->schedule_batch_();
return true;
}
/**
@@ -395,7 +397,12 @@ class APIConnection : public APIServerConnection {
* @param response The state response object with key already set
* @return True if the message was sent successfully
*/
template<typename ResponseT> bool try_send_entity_state(ResponseT &response) { return this->send_message(response); }
template<typename ResponseT> bool try_send_entity_state(ResponseT &response) {
// Add to deferred batch
this->deferred_batch_.add_item(std::make_unique<ResponseT>(response));
this->schedule_batch_();
return true;
}
bool send_(const void *buf, size_t len, bool force);
@@ -438,8 +445,7 @@ class APIConnection : public APIServerConnection {
// Generic batching mechanism for both state updates and entity info
struct DeferredBatch {
struct BatchItem {
void *entity;
send_message_t send_func;
std::unique_ptr<ProtoMessage> message;
uint32_t timestamp; // When this update was queued
};
@@ -447,8 +453,8 @@ class APIConnection : public APIServerConnection {
uint32_t batch_start_time{0};
bool batch_scheduled{false};
// Add item with deduplication - newer updates replace older ones for same entity
void add_item(void *entity, send_message_t send_func);
// Add item to the batch
void add_item(std::unique_ptr<ProtoMessage> message);
void clear() {
items.clear();
batch_scheduled = false;

File diff suppressed because it is too large Load Diff

View File

@@ -14,7 +14,7 @@ class APIServerConnectionBase : public ProtoService {
#ifdef HAS_PROTO_MESSAGE_DUMP
ESP_LOGVV(TAG, "send_message %s: %s", T::message_name(), msg.dump().c_str());
#endif
return this->send_message_(msg, T::message_type());
return this->send_message_(msg, msg.get_message_type());
}
virtual void on_hello_request(const HelloRequest &value){};

View File

@@ -330,6 +330,7 @@ class ProtoMessage {
virtual void encode(ProtoWriteBuffer buffer) const = 0;
void decode(const uint8_t *buffer, size_t length);
virtual void calculate_size(uint32_t &total_size) const = 0;
virtual uint16_t get_message_type() const = 0;
#ifdef HAS_PROTO_MESSAGE_DUMP
std::string dump() const;
virtual void dump_to(std::string &out) const = 0;

View File

@@ -779,7 +779,7 @@ def build_message_type(desc: descriptor.DescriptorProto) -> tuple[str, str]:
# Add message_type method if this is a service message
if message_id is not None:
public_content.append(
f"static constexpr uint32_t message_type() {{ return {message_id}; }}"
f"uint16_t get_message_type() const override {{ return {message_id}; }}"
)
# Add message_name method for debugging
public_content.append("#ifdef HAS_PROTO_MESSAGE_DUMP")
@@ -1097,7 +1097,7 @@ def main() -> None:
hpp += "#ifdef HAS_PROTO_MESSAGE_DUMP\n"
hpp += ' ESP_LOGVV(TAG, "send_message %s: %s", T::message_name(), msg.dump().c_str());\n'
hpp += "#endif\n"
hpp += " return this->send_message_(msg, T::message_type());\n"
hpp += " return this->send_message_(msg, msg.get_message_type());\n"
hpp += " }\n\n"
for mt in file.message_type: