25 : socket_path_(
std::move(socket_path))
26 , initial_max_clients_(initial_max_clients)
28 const size_t reserve_size = initial_max_clients > 0 ?
static_cast<size_t>(initial_max_clients) : 10;
74 return static_cast<int>(i);
84 if (client_id < 0 ||
static_cast<size_t>(client_id) >=
client_fds_.size() ||
90 int fd =
client_fds_[
static_cast<size_t>(client_id)];
93 auto msg_len =
static_cast<uint32_t
>(
len);
94 ssize_t n =
::send(fd, &msg_len,
sizeof(msg_len), 0);
95 if (n < 0 ||
static_cast<size_t>(n) !=
sizeof(msg_len)) {
104 const auto bytes_sent =
static_cast<size_t>(n);
105 return bytes_sent ==
len;
117 if (client_id < 0 ||
static_cast<size_t>(client_id) >=
client_fds_.size() ||
122 int fd =
client_fds_[
static_cast<size_t>(client_id)];
123 const auto client_idx =
static_cast<size_t>(client_id);
131 uint32_t msg_len = 0;
132 size_t total_read = 0;
133 while (total_read <
sizeof(msg_len)) {
134 ssize_t n = ::recv(fd,
reinterpret_cast<uint8_t*
>(&msg_len) + total_read,
sizeof(msg_len) - total_read, 0);
136 if (errno == EINTR) {
146 total_read +=
static_cast<size_t>(n);
150 size_t total_size =
sizeof(uint32_t) + msg_len;
160 while (total_read < msg_len) {
162 ::recv(fd,
recv_buffers_[client_idx].
data() +
sizeof(uint32_t) + total_read, msg_len - total_read, 0);
164 if (errno == EINTR) {
175 total_read +=
static_cast<size_t>(n);
203 if (flags < 0 || fcntl(
listen_fd_, F_SETFL, flags | O_NONBLOCK) < 0) {
210 struct sockaddr_un addr;
212 addr.sun_family = AF_UNIX;
215 if (bind(
listen_fd_,
reinterpret_cast<struct sockaddr*
>(&addr),
sizeof(addr)) < 0) {
244 EV_SET(&ev,
listen_fd_, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0,
nullptr);
245 if (kevent(
fd_, &ev, 1,
nullptr, 0,
nullptr) < 0) {
266 int last_client_id = -1;
273 if (errno == EAGAIN || errno == EWOULDBLOCK) {
278 if (last_client_id >= 0) {
287 int flags = fcntl(client_fd, F_GETFL, 0);
289 fcntl(client_fd, F_SETFL, flags & ~O_NONBLOCK);
296 const auto client_id_unsigned =
static_cast<size_t>(client_id);
300 client_fds_[
static_cast<size_t>(client_id)] = client_fd;
306 EV_SET(&kev, client_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0,
nullptr);
307 if (kevent(
fd_, &kev, 1,
nullptr, 0,
nullptr) < 0) {
313 last_client_id = client_id;
316 return last_client_id;
327 struct timespec timeout;
328 struct timespec* timeout_ptr =
nullptr;
330 if (timeout_ns > 0) {
331 timeout.tv_sec =
static_cast<time_t
>(timeout_ns / 1000000000ULL);
332 timeout.tv_nsec =
static_cast<long>(timeout_ns % 1000000000ULL);
333 timeout_ptr = &timeout;
334 }
else if (timeout_ns == 0) {
337 timeout_ptr = &timeout;
340 int n = kevent(
fd_,
nullptr, 0, &ev, 1, timeout_ptr);
345 int ready_fd =
static_cast<int>(ev.ident);
365 if (client_id < 0 ||
static_cast<size_t>(client_id) >=
client_fds_.size()) {
369 int fd =
client_fds_[
static_cast<size_t>(client_id)];
374 EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0,
nullptr);
375 kevent(
fd_, &ev, 1,
nullptr, 0,
nullptr);
407 if (flags < 0 || fcntl(
listen_fd_, F_SETFL, flags | O_NONBLOCK) < 0) {
414 struct sockaddr_un addr;
416 addr.sun_family = AF_UNIX;
419 if (bind(
listen_fd_,
reinterpret_cast<struct sockaddr*
>(&addr),
sizeof(addr)) < 0) {
438 fd_ = epoll_create1(0);
447 struct epoll_event ev;
471 int last_client_id = -1;
478 if (errno == EAGAIN || errno == EWOULDBLOCK) {
483 if (last_client_id >= 0) {
492 int flags = fcntl(client_fd, F_GETFL, 0);
494 fcntl(client_fd, F_SETFL, flags & ~O_NONBLOCK);
501 const auto client_id_unsigned =
static_cast<size_t>(client_id);
505 client_fds_[
static_cast<size_t>(client_id)] = client_fd;
510 struct epoll_event client_ev;
511 client_ev.events = EPOLLIN;
512 client_ev.data.fd = client_fd;
513 if (epoll_ctl(
fd_, EPOLL_CTL_ADD, client_fd, &client_ev) < 0) {
519 last_client_id = client_id;
522 return last_client_id;
532 struct epoll_event ev;
533 int timeout_ms = timeout_ns > 0 ?
static_cast<int>(timeout_ns / 1000000) : -1;
534 int n = epoll_wait(
fd_, &ev, 1, timeout_ms);
557 if (client_id < 0 ||
static_cast<size_t>(client_id) >=
client_fds_.size()) {
561 int fd =
client_fds_[
static_cast<size_t>(client_id)];
563 epoll_ctl(
fd_, EPOLL_CTL_DEL, fd,
nullptr);
int accept() override
Accept a new client connection (optional for some transports)
SocketServer(std::string socket_path, int initial_max_clients)
std::vector< std::vector< uint8_t > > recv_buffers_
bool listen() override
Start listening for client connections.
std::vector< int > client_fds_
void close() override
Close the server and all client connections.
std::unordered_map< int, int > fd_to_client_id_
void disconnect_client(int client_id)
bool send(int client_id, const void *data, size_t len) override
Send a message to a specific client.
std::span< const uint8_t > receive(int client_id) override
Receive next message from a specific client.
void release(int client_id, size_t message_size) override
Release/consume the previously received message.
int wait_for_data(uint64_t timeout_ns) override
Wait for data from any connected client.
const std::vector< MemoryValue > data
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept