frame helper opt
This commit is contained in:
@@ -63,6 +63,11 @@ APIConnection::APIConnection(std::unique_ptr<socket::Socket> sock, APIServer *pa
|
||||
: parent_(parent), deferred_message_queue_(this), initial_state_iterator_(this), list_entities_iterator_(this) {
|
||||
this->proto_write_buffer_.reserve(64);
|
||||
|
||||
// Explicitly initialize stats
|
||||
this->stats_enabled_ = true;
|
||||
this->next_stats_log_ = 0;
|
||||
ESP_LOGD(STATS_TAG, "API Connection created with stats_enabled_=true");
|
||||
|
||||
#if defined(USE_API_PLAINTEXT) && defined(USE_API_NOISE)
|
||||
auto noise_ctx = parent->get_noise_ctx();
|
||||
if (noise_ctx->has_psk()) {
|
||||
@@ -97,6 +102,11 @@ void APIConnection::log_section_stats_() {
|
||||
ESP_LOGI(STATS_TAG, "API Connection Section Runtime Statistics");
|
||||
ESP_LOGI(STATS_TAG, "Period stats (last %" PRIu32 "ms):", this->stats_log_interval_);
|
||||
|
||||
if (this->section_stats_.empty()) {
|
||||
ESP_LOGW(STATS_TAG, "No section stats collected yet");
|
||||
return;
|
||||
}
|
||||
|
||||
// First collect stats we want to display
|
||||
std::vector<std::pair<std::string, const APISectionStats *>> stats_to_display;
|
||||
|
||||
@@ -140,6 +150,7 @@ void APIConnection::log_section_stats_() {
|
||||
}
|
||||
|
||||
void APIConnection::reset_section_stats_() {
|
||||
ESP_LOGD(STATS_TAG, "Resetting API section stats, sections count: %u", this->section_stats_.size());
|
||||
for (auto &it : this->section_stats_) {
|
||||
it.second.reset_period_stats();
|
||||
}
|
||||
@@ -348,10 +359,24 @@ void APIConnection::loop() {
|
||||
// If next_stats_log_ is 0, initialize it
|
||||
if (this->next_stats_log_ == 0) {
|
||||
this->next_stats_log_ = now + this->stats_log_interval_;
|
||||
ESP_LOGI(STATS_TAG, "API section stats logging enabled, next log at %u", this->next_stats_log_);
|
||||
} else if (now >= this->next_stats_log_) {
|
||||
ESP_LOGI(STATS_TAG, "Logging API section stats now (current time: %u, scheduled time: %u)", now,
|
||||
this->next_stats_log_);
|
||||
// Force logging even if no stats are collected yet
|
||||
ESP_LOGI(STATS_TAG, "Stats collection status: enabled=%d, sections=%u", this->stats_enabled_,
|
||||
this->section_stats_.size());
|
||||
|
||||
// Explicitly log some stats we know should exist
|
||||
ESP_LOGI(STATS_TAG, "Record count for key sections: helper_loop=%u, read_packet=%u, total_loop=%u",
|
||||
this->section_stats_["helper_loop"].get_period_count(),
|
||||
this->section_stats_["read_packet"].get_period_count(),
|
||||
this->section_stats_["total_loop"].get_period_count());
|
||||
|
||||
this->log_section_stats_();
|
||||
this->reset_section_stats_();
|
||||
this->next_stats_log_ = now + this->stats_log_interval_;
|
||||
ESP_LOGI(STATS_TAG, "Next API section stats log scheduled for %u", this->next_stats_log_);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -86,6 +86,11 @@ class APIConnection : public APIServerConnection {
|
||||
this->total_time_ms_ += duration_ms;
|
||||
if (duration_ms > this->total_max_time_ms_)
|
||||
this->total_max_time_ms_ = duration_ms;
|
||||
|
||||
// Log if this is the first record in this period
|
||||
if (this->period_count_ == 1) {
|
||||
ESP_LOGV("api.stats", "First time recording stats for this section: %u ms", duration_ms);
|
||||
}
|
||||
}
|
||||
|
||||
void reset_period_stats() {
|
||||
@@ -433,6 +438,9 @@ class APIConnection : public APIServerConnection {
|
||||
bool stats_enabled_{true};
|
||||
void log_section_stats_();
|
||||
void reset_section_stats_();
|
||||
|
||||
// Method to enable/disable section stats
|
||||
void set_stats_enabled(bool enabled) { this->stats_enabled_ = enabled; }
|
||||
};
|
||||
|
||||
} // namespace api
|
||||
|
||||
@@ -821,96 +821,143 @@ APIError APIPlaintextFrameHelper::loop() {
|
||||
*
|
||||
* error API_ERROR_BAD_INDICATOR: Bad indicator byte at start of frame.
|
||||
*/
|
||||
|
||||
APIError APIPlaintextFrameHelper::handle_socket_read_result_(ssize_t received) {
|
||||
if (received == -1) {
|
||||
if (errno == EWOULDBLOCK || errno == EAGAIN) {
|
||||
return APIError::WOULD_BLOCK;
|
||||
}
|
||||
state_ = State::FAILED;
|
||||
HELPER_LOG("Socket read failed with errno %d", errno);
|
||||
return APIError::SOCKET_READ_FAILED;
|
||||
} else if (received == 0) {
|
||||
state_ = State::FAILED;
|
||||
HELPER_LOG("Connection closed");
|
||||
return APIError::CONNECTION_CLOSED;
|
||||
}
|
||||
return APIError::OK;
|
||||
}
|
||||
|
||||
APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) {
|
||||
// Minimum header size: 1 byte indicator + at least 1 byte for each varint (size and type)
|
||||
static const size_t MIN_HEADER_SIZE = 3;
|
||||
|
||||
if (frame == nullptr) {
|
||||
HELPER_LOG("Bad argument for try_read_frame_");
|
||||
return APIError::BAD_ARG;
|
||||
}
|
||||
|
||||
// read header
|
||||
// Read and parse the header
|
||||
while (!rx_header_parsed_) {
|
||||
uint8_t data;
|
||||
ssize_t received = socket_->read(&data, 1);
|
||||
if (received == -1) {
|
||||
if (errno == EWOULDBLOCK || errno == EAGAIN) {
|
||||
return APIError::WOULD_BLOCK;
|
||||
// Read the minimum bytes needed for initial header parsing
|
||||
if (rx_header_buf_size_ < MIN_HEADER_SIZE) {
|
||||
// Try to read up to the minimum header size
|
||||
size_t to_read = MIN_HEADER_SIZE - rx_header_buf_size_;
|
||||
ssize_t received = socket_->read(&rx_header_buf_[rx_header_buf_size_], to_read);
|
||||
|
||||
// Handle socket read result
|
||||
APIError err = handle_socket_read_result_(received);
|
||||
if (err != APIError::OK)
|
||||
return err;
|
||||
|
||||
// Update buffer size with bytes received
|
||||
rx_header_buf_size_ += received;
|
||||
|
||||
// Check indicator byte - we'll fail if it's wrong (regardless of which read detected it)
|
||||
if (rx_header_buf_[0] != 0x00) {
|
||||
state_ = State::FAILED;
|
||||
HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]);
|
||||
return APIError::BAD_INDICATOR;
|
||||
}
|
||||
state_ = State::FAILED;
|
||||
HELPER_LOG("Socket read failed with errno %d", errno);
|
||||
return APIError::SOCKET_READ_FAILED;
|
||||
} else if (received == 0) {
|
||||
state_ = State::FAILED;
|
||||
HELPER_LOG("Connection closed");
|
||||
return APIError::CONNECTION_CLOSED;
|
||||
}
|
||||
rx_header_buf_.push_back(data);
|
||||
|
||||
// try parse header
|
||||
if (rx_header_buf_[0] != 0x00) {
|
||||
// If we don't have the minimum bytes needed yet, there's no more data available
|
||||
if (rx_header_buf_size_ < MIN_HEADER_SIZE)
|
||||
return APIError::WOULD_BLOCK;
|
||||
} else if (rx_header_buf_size_ >= sizeof(rx_header_buf_)) {
|
||||
// Buffer is full but we still couldn't parse the header
|
||||
state_ = State::FAILED;
|
||||
HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]);
|
||||
return APIError::BAD_INDICATOR;
|
||||
HELPER_LOG("Header too large for buffer");
|
||||
return APIError::BAD_DATA_PACKET;
|
||||
} else {
|
||||
// Already have 3+ bytes, read one byte at a time for rest of header
|
||||
// to make sure we don't read into the next message
|
||||
ssize_t received = socket_->read(&rx_header_buf_[rx_header_buf_size_], 1);
|
||||
|
||||
// Handle socket read result
|
||||
APIError err = handle_socket_read_result_(received);
|
||||
if (err != APIError::OK)
|
||||
return err;
|
||||
|
||||
// Update buffer size with bytes received
|
||||
rx_header_buf_size_ += received;
|
||||
}
|
||||
|
||||
size_t i = 1;
|
||||
// Parse the header - at this point we're guaranteed to have at least the minimum bytes needed
|
||||
size_t header_pos = 1; // Start after the indicator byte
|
||||
uint32_t consumed = 0;
|
||||
auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed);
|
||||
if (!msg_size_varint.has_value()) {
|
||||
// not enough data there yet
|
||||
continue;
|
||||
}
|
||||
|
||||
i += consumed;
|
||||
// Parse message size varint
|
||||
auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[header_pos], rx_header_buf_size_ - header_pos, &consumed);
|
||||
if (!msg_size_varint.has_value())
|
||||
// Not enough data for message size yet
|
||||
continue;
|
||||
|
||||
header_pos += consumed;
|
||||
rx_header_parsed_len_ = msg_size_varint->as_uint32();
|
||||
|
||||
auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed);
|
||||
if (!msg_type_varint.has_value()) {
|
||||
// not enough data there yet
|
||||
if (header_pos >= rx_header_buf_size_)
|
||||
// Not enough data for message type yet
|
||||
continue;
|
||||
|
||||
// Parse message type varint
|
||||
auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[header_pos], rx_header_buf_size_ - header_pos, &consumed);
|
||||
if (!msg_type_varint.has_value())
|
||||
// Not enough data for message type yet
|
||||
continue;
|
||||
}
|
||||
rx_header_parsed_type_ = msg_type_varint->as_uint32();
|
||||
rx_header_parsed_ = true;
|
||||
}
|
||||
// header reading done
|
||||
|
||||
// reserve space for body
|
||||
if (rx_buf_.size() != rx_header_parsed_len_) {
|
||||
// Now that we know the message size, allocate the buffer
|
||||
rx_buf_.resize(rx_header_parsed_len_);
|
||||
}
|
||||
// Header parsing complete
|
||||
|
||||
if (rx_buf_len_ < rx_header_parsed_len_) {
|
||||
// more data to read
|
||||
size_t to_read = rx_header_parsed_len_ - rx_buf_len_;
|
||||
ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], to_read);
|
||||
if (received == -1) {
|
||||
if (errno == EWOULDBLOCK || errno == EAGAIN) {
|
||||
return APIError::WOULD_BLOCK;
|
||||
}
|
||||
state_ = State::FAILED;
|
||||
HELPER_LOG("Socket read failed with errno %d", errno);
|
||||
return APIError::SOCKET_READ_FAILED;
|
||||
} else if (received == 0) {
|
||||
state_ = State::FAILED;
|
||||
HELPER_LOG("Connection closed");
|
||||
return APIError::CONNECTION_CLOSED;
|
||||
// Special case: if message length is 0, we're already done (no body to read)
|
||||
if (rx_header_parsed_len_ != 0) {
|
||||
// Calculate how much more data we need
|
||||
size_t remaining = rx_header_parsed_len_ - rx_buf_len_;
|
||||
|
||||
// Try to read all remaining bytes at once to minimize syscalls
|
||||
ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], remaining);
|
||||
|
||||
// Handle socket read result
|
||||
APIError err = handle_socket_read_result_(received);
|
||||
if (err != APIError::OK) {
|
||||
return err;
|
||||
}
|
||||
|
||||
// Update our buffer position
|
||||
rx_buf_len_ += received;
|
||||
if ((size_t) received != to_read) {
|
||||
// not all read
|
||||
|
||||
// If we didn't get all the data we need, wait for more
|
||||
if (rx_buf_len_ < rx_header_parsed_len_)
|
||||
return APIError::WOULD_BLOCK;
|
||||
}
|
||||
}
|
||||
|
||||
// uncomment for even more debugging
|
||||
#ifdef HELPER_LOG_PACKETS
|
||||
ESP_LOGVV(TAG, "Received frame: %s", format_hex_pretty(rx_buf_).c_str());
|
||||
#endif
|
||||
|
||||
// Move the message buffer to the frame
|
||||
frame->msg = std::move(rx_buf_);
|
||||
// consume msg
|
||||
|
||||
// Reset all state for the next message
|
||||
rx_buf_ = {};
|
||||
rx_buf_len_ = 0;
|
||||
rx_header_buf_.clear();
|
||||
rx_header_buf_size_ = 0;
|
||||
rx_header_parsed_ = false;
|
||||
|
||||
return APIError::OK;
|
||||
}
|
||||
|
||||
@@ -921,6 +968,7 @@ APIError APIPlaintextFrameHelper::read_packet(ReadPacketBuffer *buffer) {
|
||||
return APIError::WOULD_BLOCK;
|
||||
}
|
||||
|
||||
// Try to read data efficiently
|
||||
ParsedFrame frame;
|
||||
aerr = try_read_frame_(&frame);
|
||||
if (aerr != APIError::OK) {
|
||||
|
||||
@@ -176,10 +176,17 @@ class APIPlaintextFrameHelper : public APIFrameHelper {
|
||||
return APIFrameHelper::write_raw_(iov, iovcnt, socket_.get(), tx_buf_, info_, state_, State::FAILED);
|
||||
}
|
||||
|
||||
protected:
|
||||
// Helper method to handle socket read results consistently
|
||||
APIError handle_socket_read_result_(ssize_t received);
|
||||
|
||||
std::unique_ptr<socket::Socket> socket_;
|
||||
|
||||
std::string info_;
|
||||
std::vector<uint8_t> rx_header_buf_;
|
||||
// Fixed-size header buffer - max we need is indicator byte (1) + max 2 varints (10 bytes each for uint64)
|
||||
// In practice, for small packets this is much smaller
|
||||
uint8_t rx_header_buf_[21];
|
||||
size_t rx_header_buf_size_ = 0;
|
||||
bool rx_header_parsed_ = false;
|
||||
uint32_t rx_header_parsed_type_ = 0;
|
||||
uint32_t rx_header_parsed_len_ = 0;
|
||||
|
||||
Reference in New Issue
Block a user