This commit is contained in:
J. Nick Koston
2025-05-14 15:19:07 -05:00
parent 3fbbec81af
commit fc609f02f3
2 changed files with 54 additions and 109 deletions

View File

@@ -821,143 +821,96 @@ APIError APIPlaintextFrameHelper::loop() {
*
* error API_ERROR_BAD_INDICATOR: Bad indicator byte at start of frame.
*/
APIError APIPlaintextFrameHelper::handle_socket_read_result_(ssize_t received) {
if (received == -1) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
return APIError::WOULD_BLOCK;
}
state_ = State::FAILED;
HELPER_LOG("Socket read failed with errno %d", errno);
return APIError::SOCKET_READ_FAILED;
} else if (received == 0) {
state_ = State::FAILED;
HELPER_LOG("Connection closed");
return APIError::CONNECTION_CLOSED;
}
return APIError::OK;
}
APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) {
// Minimum header size: 1 byte indicator + at least 1 byte for each varint (size and type)
static const size_t MIN_HEADER_SIZE = 3;
if (frame == nullptr) {
HELPER_LOG("Bad argument for try_read_frame_");
return APIError::BAD_ARG;
}
// Read and parse the header
// read header
while (!rx_header_parsed_) {
// Read the minimum bytes needed for initial header parsing
if (rx_header_buf_size_ < MIN_HEADER_SIZE) {
// Try to read up to the minimum header size
size_t to_read = MIN_HEADER_SIZE - rx_header_buf_size_;
ssize_t received = socket_->read(&rx_header_buf_[rx_header_buf_size_], to_read);
// Handle socket read result
APIError err = handle_socket_read_result_(received);
if (err != APIError::OK)
return err;
// Update buffer size with bytes received
rx_header_buf_size_ += received;
// Check indicator byte - we'll fail if it's wrong (regardless of which read detected it)
if (rx_header_buf_[0] != 0x00) {
state_ = State::FAILED;
HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]);
return APIError::BAD_INDICATOR;
}
// If we don't have the minimum bytes needed yet, there's no more data available
if (rx_header_buf_size_ < MIN_HEADER_SIZE)
uint8_t data;
ssize_t received = socket_->read(&data, 1);
if (received == -1) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
return APIError::WOULD_BLOCK;
} else if (rx_header_buf_size_ >= sizeof(rx_header_buf_)) {
// Buffer is full but we still couldn't parse the header
}
state_ = State::FAILED;
HELPER_LOG("Header too large for buffer");
return APIError::BAD_DATA_PACKET;
} else {
// Already have 3+ bytes, read one byte at a time for rest of header
// to make sure we don't read into the next message
ssize_t received = socket_->read(&rx_header_buf_[rx_header_buf_size_], 1);
HELPER_LOG("Socket read failed with errno %d", errno);
return APIError::SOCKET_READ_FAILED;
} else if (received == 0) {
state_ = State::FAILED;
HELPER_LOG("Connection closed");
return APIError::CONNECTION_CLOSED;
}
rx_header_buf_.push_back(data);
// Handle socket read result
APIError err = handle_socket_read_result_(received);
if (err != APIError::OK)
return err;
// Update buffer size with bytes received
rx_header_buf_size_ += received;
// try parse header
if (rx_header_buf_[0] != 0x00) {
state_ = State::FAILED;
HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]);
return APIError::BAD_INDICATOR;
}
// Parse the header - at this point we're guaranteed to have at least the minimum bytes needed
size_t header_pos = 1; // Start after the indicator byte
size_t i = 1;
uint32_t consumed = 0;
// Parse message size varint
auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[header_pos], rx_header_buf_size_ - header_pos, &consumed);
if (!msg_size_varint.has_value())
// Not enough data for message size yet
auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed);
if (!msg_size_varint.has_value()) {
// not enough data there yet
continue;
}
header_pos += consumed;
i += consumed;
rx_header_parsed_len_ = msg_size_varint->as_uint32();
if (header_pos >= rx_header_buf_size_)
// Not enough data for message type yet
continue;
// Parse message type varint
auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[header_pos], rx_header_buf_size_ - header_pos, &consumed);
if (!msg_type_varint.has_value())
// Not enough data for message type yet
auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed);
if (!msg_type_varint.has_value()) {
// not enough data there yet
continue;
}
rx_header_parsed_type_ = msg_type_varint->as_uint32();
rx_header_parsed_ = true;
}
// header reading done
// Now that we know the message size, allocate the buffer
// reserve space for body
if (rx_buf_.size() != rx_header_parsed_len_) {
rx_buf_.resize(rx_header_parsed_len_);
}
// Header parsing complete
// Special case: if message length is 0, we're already done (no body to read)
if (rx_header_parsed_len_ != 0) {
// Calculate how much more data we need
size_t remaining = rx_header_parsed_len_ - rx_buf_len_;
// Try to read all remaining bytes at once to minimize syscalls
ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], remaining);
// Handle socket read result
APIError err = handle_socket_read_result_(received);
if (err != APIError::OK) {
return err;
if (rx_buf_len_ < rx_header_parsed_len_) {
// more data to read
size_t to_read = rx_header_parsed_len_ - rx_buf_len_;
ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], to_read);
if (received == -1) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
return APIError::WOULD_BLOCK;
}
state_ = State::FAILED;
HELPER_LOG("Socket read failed with errno %d", errno);
return APIError::SOCKET_READ_FAILED;
} else if (received == 0) {
state_ = State::FAILED;
HELPER_LOG("Connection closed");
return APIError::CONNECTION_CLOSED;
}
// Update our buffer position
rx_buf_len_ += received;
// If we didn't get all the data we need, wait for more
if (rx_buf_len_ < rx_header_parsed_len_)
if ((size_t) received != to_read) {
// not all read
return APIError::WOULD_BLOCK;
}
}
// uncomment for even more debugging
#ifdef HELPER_LOG_PACKETS
ESP_LOGVV(TAG, "Received frame: %s", format_hex_pretty(rx_buf_).c_str());
#endif
// Move the message buffer to the frame
frame->msg = std::move(rx_buf_);
// Reset all state for the next message
// consume msg
rx_buf_ = {};
rx_buf_len_ = 0;
rx_header_buf_size_ = 0;
rx_header_buf_.clear();
rx_header_parsed_ = false;
return APIError::OK;
}
@@ -968,7 +921,6 @@ APIError APIPlaintextFrameHelper::read_packet(ReadPacketBuffer *buffer) {
return APIError::WOULD_BLOCK;
}
// Try to read data efficiently
ParsedFrame frame;
aerr = try_read_frame_(&frame);
if (aerr != APIError::OK) {

View File

@@ -176,17 +176,10 @@ class APIPlaintextFrameHelper : public APIFrameHelper {
return APIFrameHelper::write_raw_(iov, iovcnt, socket_.get(), tx_buf_, info_, state_, State::FAILED);
}
protected:
// Helper method to handle socket read results consistently
APIError handle_socket_read_result_(ssize_t received);
std::unique_ptr<socket::Socket> socket_;
std::string info_;
// Fixed-size header buffer - max we need is indicator byte (1) + max 2 varints (10 bytes each for uint64)
// In practice, for small packets this is much smaller
uint8_t rx_header_buf_[21];
size_t rx_header_buf_size_ = 0;
std::vector<uint8_t> rx_header_buf_;
bool rx_header_parsed_ = false;
uint32_t rx_header_parsed_type_ = 0;
uint32_t rx_header_parsed_len_ = 0;