Implement select()
This commit is contained in:
@@ -307,6 +307,12 @@ APIError APINoiseFrameHelper::try_read_frame_(ParsedFrame *frame) {
|
||||
return APIError::BAD_ARG;
|
||||
}
|
||||
|
||||
// Check if socket has data available before attempting to read
|
||||
int fd = this->socket_->get_fd();
|
||||
if (fd >= 0 && !App.is_socket_ready(fd)) {
|
||||
return APIError::WOULD_BLOCK;
|
||||
}
|
||||
|
||||
// read header
|
||||
if (rx_header_buf_len_ < 3) {
|
||||
// no header information yet
|
||||
@@ -829,6 +835,12 @@ APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) {
|
||||
return APIError::BAD_ARG;
|
||||
}
|
||||
|
||||
// Check if socket has data available before attempting to read
|
||||
int fd = this->socket_->get_fd();
|
||||
if (fd >= 0 && !App.is_socket_ready(fd)) {
|
||||
return APIError::WOULD_BLOCK;
|
||||
}
|
||||
|
||||
// read header
|
||||
while (!rx_header_parsed_) {
|
||||
uint8_t data;
|
||||
|
||||
@@ -112,18 +112,21 @@ void APIServer::setup() {
|
||||
}
|
||||
|
||||
void APIServer::loop() {
|
||||
// Accept new clients
|
||||
while (true) {
|
||||
struct sockaddr_storage source_addr;
|
||||
socklen_t addr_len = sizeof(source_addr);
|
||||
auto sock = this->socket_->accept((struct sockaddr *) &source_addr, &addr_len);
|
||||
if (!sock)
|
||||
break;
|
||||
ESP_LOGD(TAG, "Accepted %s", sock->getpeername().c_str());
|
||||
// Accept new clients only if the socket has incoming connections
|
||||
int server_fd = this->socket_->get_fd();
|
||||
if (server_fd >= 0 && App.is_socket_ready(server_fd)) {
|
||||
while (true) {
|
||||
struct sockaddr_storage source_addr;
|
||||
socklen_t addr_len = sizeof(source_addr);
|
||||
auto sock = this->socket_->accept((struct sockaddr *) &source_addr, &addr_len);
|
||||
if (!sock)
|
||||
break;
|
||||
ESP_LOGD(TAG, "Accepted %s", sock->getpeername().c_str());
|
||||
|
||||
auto *conn = new APIConnection(std::move(sock), this);
|
||||
this->clients_.emplace_back(conn);
|
||||
conn->start();
|
||||
auto *conn = new APIConnection(std::move(sock), this);
|
||||
this->clients_.emplace_back(conn);
|
||||
conn->start();
|
||||
}
|
||||
}
|
||||
|
||||
// Process clients and remove disconnected ones in a single pass
|
||||
|
||||
@@ -100,9 +100,13 @@ void ESPHomeOTAComponent::handle_() {
|
||||
#endif
|
||||
|
||||
if (client_ == nullptr) {
|
||||
struct sockaddr_storage source_addr;
|
||||
socklen_t addr_len = sizeof(source_addr);
|
||||
client_ = server_->accept((struct sockaddr *) &source_addr, &addr_len);
|
||||
// Check if the server socket is ready before accepting
|
||||
int server_fd = this->server_->get_fd();
|
||||
if (server_fd >= 0 && App.is_socket_ready(server_fd)) {
|
||||
struct sockaddr_storage source_addr;
|
||||
socklen_t addr_len = sizeof(source_addr);
|
||||
client_ = server_->accept((struct sockaddr *) &source_addr, &addr_len);
|
||||
}
|
||||
}
|
||||
if (client_ == nullptr)
|
||||
return;
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#ifdef USE_SOCKET_IMPL_BSD_SOCKETS
|
||||
|
||||
#include <cstring>
|
||||
#include "esphome/core/application.h"
|
||||
|
||||
#ifdef USE_ESP32
|
||||
#include <esp_idf_version.h>
|
||||
@@ -40,7 +41,12 @@ std::string format_sockaddr(const struct sockaddr_storage &storage) {
|
||||
|
||||
class BSDSocketImpl : public Socket {
|
||||
public:
|
||||
BSDSocketImpl(int fd) : fd_(fd) {}
|
||||
BSDSocketImpl(int fd) : fd_(fd) {
|
||||
// Register new socket with the application for select()
|
||||
if (fd_ >= 0) {
|
||||
App.register_socket_fd(fd_);
|
||||
}
|
||||
}
|
||||
~BSDSocketImpl() override {
|
||||
if (!closed_) {
|
||||
close(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
|
||||
@@ -55,9 +61,14 @@ class BSDSocketImpl : public Socket {
|
||||
}
|
||||
int bind(const struct sockaddr *addr, socklen_t addrlen) override { return ::bind(fd_, addr, addrlen); }
|
||||
int close() override {
|
||||
int ret = ::close(fd_);
|
||||
closed_ = true;
|
||||
return ret;
|
||||
if (!closed_) {
|
||||
// Unregister from select() before closing
|
||||
App.unregister_socket_fd(fd_);
|
||||
int ret = ::close(fd_);
|
||||
closed_ = true;
|
||||
return ret;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int shutdown(int how) override { return ::shutdown(fd_, how); }
|
||||
|
||||
@@ -126,6 +137,8 @@ class BSDSocketImpl : public Socket {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int get_fd() const override { return fd_; }
|
||||
|
||||
protected:
|
||||
int fd_;
|
||||
bool closed_ = false;
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#ifdef USE_SOCKET_IMPL_LWIP_SOCKETS
|
||||
|
||||
#include <cstring>
|
||||
#include "esphome/core/application.h"
|
||||
|
||||
namespace esphome {
|
||||
namespace socket {
|
||||
@@ -33,7 +34,12 @@ std::string format_sockaddr(const struct sockaddr_storage &storage) {
|
||||
|
||||
class LwIPSocketImpl : public Socket {
|
||||
public:
|
||||
LwIPSocketImpl(int fd) : fd_(fd) {}
|
||||
LwIPSocketImpl(int fd) : fd_(fd) {
|
||||
// Register new socket with the application for select()
|
||||
if (fd_ >= 0) {
|
||||
App.register_socket_fd(fd_);
|
||||
}
|
||||
}
|
||||
~LwIPSocketImpl() override {
|
||||
if (!closed_) {
|
||||
close(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
|
||||
@@ -48,9 +54,14 @@ class LwIPSocketImpl : public Socket {
|
||||
}
|
||||
int bind(const struct sockaddr *addr, socklen_t addrlen) override { return lwip_bind(fd_, addr, addrlen); }
|
||||
int close() override {
|
||||
int ret = lwip_close(fd_);
|
||||
closed_ = true;
|
||||
return ret;
|
||||
if (!closed_) {
|
||||
// Unregister from select() before closing
|
||||
App.unregister_socket_fd(fd_);
|
||||
int ret = lwip_close(fd_);
|
||||
closed_ = true;
|
||||
return ret;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int shutdown(int how) override { return lwip_shutdown(fd_, how); }
|
||||
|
||||
@@ -98,6 +109,8 @@ class LwIPSocketImpl : public Socket {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int get_fd() const override { return fd_; }
|
||||
|
||||
protected:
|
||||
int fd_;
|
||||
bool closed_ = false;
|
||||
|
||||
@@ -44,6 +44,9 @@ class Socket {
|
||||
|
||||
virtual int setblocking(bool blocking) = 0;
|
||||
virtual int loop() { return 0; };
|
||||
|
||||
/// Get the underlying file descriptor (returns -1 if not supported)
|
||||
virtual int get_fd() const { return -1; }
|
||||
};
|
||||
|
||||
/// Create a socket of the given domain, type and protocol.
|
||||
|
||||
@@ -7,6 +7,11 @@
|
||||
#include "esphome/components/status_led/status_led.h"
|
||||
#endif
|
||||
|
||||
#ifdef FD_SETSIZE
|
||||
#include <sys/select.h>
|
||||
#include <cerrno>
|
||||
#endif
|
||||
|
||||
namespace esphome {
|
||||
|
||||
static const char *const TAG = "app";
|
||||
@@ -106,7 +111,48 @@ void Application::loop() {
|
||||
// otherwise interval=0 schedules result in constant looping with almost no sleep
|
||||
next_schedule = std::max(next_schedule, delay_time / 2);
|
||||
delay_time = std::min(next_schedule, delay_time);
|
||||
|
||||
#ifdef FD_SETSIZE
|
||||
if (!this->socket_fds_.empty()) {
|
||||
// Use select() with timeout when we have sockets to monitor
|
||||
|
||||
// Update fd_set if socket list has changed
|
||||
if (this->socket_fds_changed_) {
|
||||
FD_ZERO(&this->base_read_fds_);
|
||||
for (int fd : this->socket_fds_) {
|
||||
if (fd >= 0 && fd < FD_SETSIZE) {
|
||||
FD_SET(fd, &this->base_read_fds_);
|
||||
}
|
||||
}
|
||||
this->socket_fds_changed_ = false;
|
||||
}
|
||||
|
||||
// Copy base fd_set before each select
|
||||
this->read_fds_ = this->base_read_fds_;
|
||||
|
||||
// Convert delay_time (milliseconds) to timeval
|
||||
struct timeval tv;
|
||||
tv.tv_sec = delay_time / 1000;
|
||||
tv.tv_usec = (delay_time % 1000) * 1000;
|
||||
|
||||
// Call select with timeout
|
||||
int ret = select(this->max_fd_ + 1, &this->read_fds_, nullptr, nullptr, &tv);
|
||||
|
||||
if (ret < 0 && errno != EINTR) {
|
||||
// Log error but continue - fall back to delay
|
||||
ESP_LOGW(TAG, "select() failed with errno %d", errno);
|
||||
delay(delay_time);
|
||||
}
|
||||
// If ret == 0, timeout occurred (normal)
|
||||
// If ret > 0, socket(s) ready for reading (will be handled in component loops)
|
||||
} else {
|
||||
// No sockets registered, use regular delay
|
||||
delay(delay_time);
|
||||
}
|
||||
#else
|
||||
// No select support, use regular delay
|
||||
delay(delay_time);
|
||||
#endif
|
||||
}
|
||||
this->last_loop_ = last_op_end_time;
|
||||
|
||||
@@ -167,6 +213,44 @@ void Application::calculate_looping_components_() {
|
||||
}
|
||||
}
|
||||
|
||||
void Application::register_socket_fd(int fd) {
|
||||
if (fd < 0)
|
||||
return;
|
||||
|
||||
this->socket_fds_.insert(fd);
|
||||
this->socket_fds_changed_ = true;
|
||||
|
||||
if (fd > this->max_fd_) {
|
||||
this->max_fd_ = fd;
|
||||
}
|
||||
}
|
||||
|
||||
void Application::unregister_socket_fd(int fd) {
|
||||
if (fd < 0)
|
||||
return;
|
||||
|
||||
this->socket_fds_.erase(fd);
|
||||
this->socket_fds_changed_ = true;
|
||||
|
||||
// Recalculate max_fd if necessary
|
||||
if (fd == this->max_fd_ && !this->socket_fds_.empty()) {
|
||||
this->max_fd_ = *this->socket_fds_.rbegin();
|
||||
} else if (this->socket_fds_.empty()) {
|
||||
this->max_fd_ = -1;
|
||||
}
|
||||
}
|
||||
|
||||
bool Application::is_socket_ready(int fd) const {
|
||||
#ifdef FD_SETSIZE
|
||||
if (fd < 0 || fd >= FD_SETSIZE)
|
||||
return false;
|
||||
return FD_ISSET(fd, &this->read_fds_);
|
||||
#else
|
||||
// If we don't have select support, assume socket is always ready
|
||||
return true;
|
||||
#endif
|
||||
}
|
||||
|
||||
Application App; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
|
||||
} // namespace esphome
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <set>
|
||||
#include "esphome/core/component.h"
|
||||
#include "esphome/core/defines.h"
|
||||
#include "esphome/core/hal.h"
|
||||
@@ -73,6 +74,12 @@
|
||||
#include "esphome/components/update/update_entity.h"
|
||||
#endif
|
||||
|
||||
#ifdef USE_BSD_SOCKETS
|
||||
#include <sys/select.h>
|
||||
#elif defined(USE_LWIP_SOCKETS) || defined(USE_SOCKET_IMPL_LWIP_SOCKETS)
|
||||
#include "lwip/sockets.h"
|
||||
#endif
|
||||
|
||||
namespace esphome {
|
||||
|
||||
class Application {
|
||||
@@ -467,6 +474,13 @@ class Application {
|
||||
|
||||
Scheduler scheduler;
|
||||
|
||||
/// Register a socket file descriptor to be monitored for read events
|
||||
void register_socket_fd(int fd);
|
||||
/// Unregister a socket file descriptor
|
||||
void unregister_socket_fd(int fd);
|
||||
/// Check if there's data available on a socket without blocking
|
||||
bool is_socket_ready(int fd) const;
|
||||
|
||||
protected:
|
||||
friend Component;
|
||||
|
||||
@@ -555,6 +569,15 @@ class Application {
|
||||
uint32_t app_state_{0};
|
||||
Component *current_component_{nullptr};
|
||||
uint32_t loop_component_start_time_{0};
|
||||
|
||||
// Socket select management
|
||||
std::set<int> socket_fds_;
|
||||
bool socket_fds_changed_{false};
|
||||
int max_fd_{-1};
|
||||
#ifdef FD_SETSIZE
|
||||
fd_set base_read_fds_{};
|
||||
fd_set read_fds_{};
|
||||
#endif
|
||||
};
|
||||
|
||||
/// Global storage of Application pointer - only one Application can exist.
|
||||
|
||||
Reference in New Issue
Block a user