Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
mpsc_shm_server.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "ipc_server.hpp"
4#include "shm/mpsc_shm.hpp"
5#include "shm/spsc_shm.hpp"
6#include "shm_common.hpp"
7#include <cstdint>
8#include <iostream>
9#include <optional>
10#include <string>
11#include <utility>
12#include <vector>
13
14namespace bb::ipc {
15
26class MpscShmServer : public IpcServer {
27 public:
28 static constexpr size_t DEFAULT_RING_SIZE = 1 << 20; // 1MB
29
30 MpscShmServer(std::string base_name,
31 size_t max_clients,
32 size_t request_ring_size = DEFAULT_RING_SIZE,
33 size_t response_ring_size = DEFAULT_RING_SIZE)
34 : base_name_(std::move(base_name))
35 , max_clients_(max_clients)
36 , request_ring_size_(request_ring_size)
37 , response_ring_size_(response_ring_size)
38 {}
39
40 ~MpscShmServer() override { close(); }
41
42 // Non-copyable, non-movable
43 MpscShmServer(const MpscShmServer&) = delete;
47
48 bool listen() override
49 {
50 if (request_consumer_.has_value()) {
51 return true; // Already listening
52 }
53
54 // Clean up any leftover shared memory
56 for (size_t i = 0; i < max_clients_; i++) {
58 }
59
60 try {
61 // Create MPSC consumer for requests (one ring per client)
63
64 // Create per-client SPSC response rings
66 for (size_t i = 0; i < max_clients_; i++) {
67 std::string resp_name = base_name_ + "_resp_" + std::to_string(i);
69 }
70
71 return true;
72 } catch (...) {
73 close();
74 return false;
75 }
76 }
77
78 int wait_for_data(uint64_t timeout_ns) override
79 {
80 if (!request_consumer_.has_value()) {
81 return -1;
82 }
83 // MpscConsumer::wait_for_data returns ring index = client_id
84 return request_consumer_->wait_for_data(static_cast<uint32_t>(timeout_ns));
85 }
86
87 std::span<const uint8_t> receive(int client_id) override
88 {
89 if (!request_consumer_.has_value() || client_id < 0 || static_cast<size_t>(client_id) >= max_clients_) {
90 return {};
91 }
92 // Peek on the specific client's request ring via MpscConsumer
93 void* len_ptr = request_consumer_->peek(static_cast<size_t>(client_id), sizeof(uint32_t), 100000000);
94 if (len_ptr == nullptr) {
95 return {};
96 }
97 uint32_t msg_len = 0;
98 std::memcpy(&msg_len, len_ptr, sizeof(uint32_t));
99
100 void* msg_ptr = request_consumer_->peek(static_cast<size_t>(client_id), sizeof(uint32_t) + msg_len, 100000000);
101 if (msg_ptr == nullptr) {
102 return {};
103 }
104 return std::span<const uint8_t>(static_cast<const uint8_t*>(msg_ptr) + sizeof(uint32_t), msg_len);
105 }
106
107 void release(int client_id, size_t message_size) override
108 {
109 if (!request_consumer_.has_value() || client_id < 0 || static_cast<size_t>(client_id) >= max_clients_) {
110 return;
111 }
112 request_consumer_->release(static_cast<size_t>(client_id), sizeof(uint32_t) + message_size);
113 }
114
115 bool send(int client_id, const void* data, size_t len) override
116 {
117 if (client_id < 0 || static_cast<size_t>(client_id) >= response_rings_.size()) {
118 return false;
119 }
120 return ring_send_msg(response_rings_[static_cast<size_t>(client_id)], data, len, 100000000);
121 }
122
123 void close() override
124 {
125 request_consumer_.reset();
126 response_rings_.clear();
127
128 // Clean up shared memory
130 for (size_t i = 0; i < max_clients_; i++) {
132 }
133 }
134
135 void wakeup_all() override
136 {
137 if (request_consumer_.has_value()) {
138 request_consumer_->wakeup_all();
139 }
140 for (auto& ring : response_rings_) {
141 ring.wakeup_all();
142 }
143 }
144
145 private:
146 std::string base_name_;
152};
153
154} // namespace bb::ipc
Abstract interface for IPC server.
static MpscConsumer create(const std::string &name, size_t num_producers, size_t ring_capacity)
Create MPSC consumer.
Definition mpsc_shm.cpp:78
static bool unlink(const std::string &name, size_t num_producers)
Unlink all shared memory for this MPSC system.
Definition mpsc_shm.cpp:138
IPC server implementation using shared memory with multi-client support.
std::span< const uint8_t > receive(int client_id) override
Receive next message from a specific client.
std::optional< MpscConsumer > request_consumer_
std::vector< SpscShm > response_rings_
void release(int client_id, size_t message_size) override
Release/consume the previously received message.
MpscShmServer & operator=(MpscShmServer &&)=delete
void wakeup_all() override
Wake all blocked threads (for graceful shutdown)
MpscShmServer & operator=(const MpscShmServer &)=delete
MpscShmServer(std::string base_name, size_t max_clients, size_t request_ring_size=DEFAULT_RING_SIZE, size_t response_ring_size=DEFAULT_RING_SIZE)
bool listen() override
Start listening for client connections.
MpscShmServer(const MpscShmServer &)=delete
bool send(int client_id, const void *data, size_t len) override
Send a message to a specific client.
void close() override
Close the server and all client connections.
int wait_for_data(uint64_t timeout_ns) override
Wait for data from any connected client.
MpscShmServer(MpscShmServer &&)=delete
static constexpr size_t DEFAULT_RING_SIZE
static bool unlink(const std::string &name)
Unlink shared memory object (cleanup after close)
Definition spsc_shm.cpp:192
static SpscShm create(const std::string &name, size_t min_capacity)
Create a new SPSC ring buffer.
Definition spsc_shm.cpp:96
const std::vector< MemoryValue > data
Multi-Producer Single-Consumer via SPSC rings + doorbell futex.
bool ring_send_msg(SpscShm &ring, const void *data, size_t len, uint64_t timeout_ns)
STL namespace.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
std::string to_string(bb::avm2::ValueTag tag)
uint8_t len
Single-producer/single-consumer shared-memory ring buffer (Linux, x86-64 optimized)