Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
wsdb_ipc_server.cpp
Go to the documentation of this file.
8
9#include <csignal>
10#include <cstdint>
11#include <iostream>
12#include <memory>
13#include <string>
14#include <thread>
15#include <unistd.h>
16#include <unordered_map>
17#include <vector>
18
19#ifdef __linux__
20#include <sys/prctl.h>
21#elif defined(__APPLE__)
22#include <sys/event.h>
23#endif
24
25// Use nlohmann/json if available, otherwise minimal parsing
26#include <sstream>
27
28namespace bb::wsdb {
29
30using namespace bb::world_state;
31using namespace bb::crypto::merkle_tree;
32
33// ---------------------------------------------------------------------------
34// Platform-specific parent death monitoring
35// (Same pattern as api_msgpack.cpp)
36// ---------------------------------------------------------------------------
37
/**
 * @brief Arrange for this process to stop when its parent process goes away.
 *
 * Linux: requests SIGTERM on parent death via prctl(PR_SET_PDEATHSIG).
 * macOS: spawns a detached thread that waits on a kqueue EVFILT_PROC/NOTE_EXIT
 *        event for the parent pid and then calls std::exit(0).
 * Other platforms: no-op.
 *
 * Setup failures are reported on stderr as warnings and are otherwise ignored.
 */
static void setup_parent_death_monitoring()
{
#ifdef __linux__
    const int rc = prctl(PR_SET_PDEATHSIG, SIGTERM);
    if (rc == -1) {
        std::cerr << "Warning: Could not set parent death signal" << '\n';
    }
#elif defined(__APPLE__)
    const pid_t watched_pid = getppid();
    std::thread watcher([watched_pid]() {
        const int queue_fd = kqueue();
        if (queue_fd == -1) {
            std::cerr << "Warning: Could not create kqueue for parent monitoring" << '\n';
            return;
        }

        // Register interest in the parent's exit.
        struct kevent registration;
        EV_SET(&registration, watched_pid, EVFILT_PROC, EV_ADD | EV_ENABLE, NOTE_EXIT, 0, nullptr);
        const int register_rc = kevent(queue_fd, &registration, 1, nullptr, 0, nullptr);
        if (register_rc == -1) {
            std::cerr << "Warning: Could not monitor parent process" << '\n';
            close(queue_fd);
            return;
        }

        // Block (no timeout) until kevent returns; its return value is not inspected.
        struct kevent fired;
        kevent(queue_fd, nullptr, 0, &fired, 1, nullptr);
        std::cerr << "Parent process exited, shutting down..." << '\n';
        close(queue_fd);
        std::exit(0);
    });
    watcher.detach();
#endif
}
67
68// ---------------------------------------------------------------------------
69// Simple JSON-like parsing for config maps
70// Parses "{0:1024,1:2048,...}" into unordered_map<MerkleTreeId, uint64_t> (keys are numeric tree ids)
71// ---------------------------------------------------------------------------
72
73static std::unordered_map<MerkleTreeId, uint64_t> parse_tree_uint64_map(const std::string& json)
74{
76 if (json.empty()) {
77 return result;
78 }
79 std::string cleaned;
80 for (char c : json) {
81 if (c != '{' && c != '}' && c != ' ') {
82 cleaned += c;
83 }
84 }
85 std::istringstream ss(cleaned);
86 std::string pair;
87 while (std::getline(ss, pair, ',')) {
88 auto colon_pos = pair.find(':');
89 if (colon_pos != std::string::npos) {
90 auto key = static_cast<MerkleTreeId>(std::stoi(pair.substr(0, colon_pos)));
91 auto value = static_cast<uint64_t>(std::stoull(pair.substr(colon_pos + 1)));
92 result[key] = value;
93 }
94 }
95 return result;
96}
97
98static std::unordered_map<MerkleTreeId, uint32_t> parse_tree_uint32_map(const std::string& json)
99{
100 std::unordered_map<MerkleTreeId, uint32_t> result;
101 if (json.empty()) {
102 return result;
103 }
104 auto u64_map = parse_tree_uint64_map(json);
105 for (const auto& [k, v] : u64_map) {
106 result[k] = static_cast<uint32_t>(v);
107 }
108 return result;
109}
110
111static std::unordered_map<MerkleTreeId, index_t> parse_tree_index_map(const std::string& json)
112{
113 std::unordered_map<MerkleTreeId, index_t> result;
114 if (json.empty()) {
115 return result;
116 }
117 auto u64_map = parse_tree_uint64_map(json);
118 for (const auto& [k, v] : u64_map) {
119 result[k] = static_cast<index_t>(v);
120 }
121 return result;
122}
123
124// ---------------------------------------------------------------------------
125// Parse prefilled public data from JSON: [["slot_hex","value_hex"],...]
126// Each hex string is a 64-char (32-byte) hex-encoded field element.
127// ---------------------------------------------------------------------------
128
129static fr hex_to_fr(const std::string& hex)
130{
131 std::string cleaned = hex;
132 if (cleaned.size() >= 2 && cleaned[0] == '0' && (cleaned[1] == 'x' || cleaned[1] == 'X')) {
133 cleaned = cleaned.substr(2);
134 }
135 return fr(cleaned);
136}
137
138static std::vector<PublicDataLeafValue> parse_prefilled_public_data(const std::string& json)
139{
141 if (json.empty() || json == "[]") {
142 return result;
143 }
144
145 // Simple state-machine parser for [["hex","hex"],["hex","hex"],...]
146 std::vector<std::string> hex_values;
147 std::string current;
148 bool in_string = false;
149
150 for (char c : json) {
151 if (c == '"') {
152 in_string = !in_string;
153 } else if (in_string) {
154 current += c;
155 } else if ((c == ',' || c == ']') && !current.empty()) {
156 hex_values.push_back(std::move(current));
157 current.clear();
158 }
159 }
160
161 // hex_values should have pairs: slot, value, slot, value, ...
162 if (hex_values.size() % 2 != 0) {
163 std::cerr << "Warning: odd number of hex values in prefilled public data, ignoring last" << '\n';
164 }
165 for (size_t i = 0; i + 1 < hex_values.size(); i += 2) {
166 result.emplace_back(hex_to_fr(hex_values[i]), hex_to_fr(hex_values[i + 1]));
167 }
168 return result;
169}
170
171// ---------------------------------------------------------------------------
172// IPC server execution
173// ---------------------------------------------------------------------------
174
175int execute_wsdb_server(const std::string& input_path,
176 const std::string& data_dir,
177 const std::string& tree_heights_json,
178 const std::string& tree_prefill_json,
179 const std::string& map_sizes_json,
180 uint32_t threads,
181 uint32_t initial_header_generator_point,
182 const std::string& prefilled_public_data_json,
183 uint64_t genesis_timestamp,
184 size_t request_ring_size,
185 size_t response_ring_size)
186{
187 const uint64_t DEFAULT_MAP_SIZE = 1024UL * 1024;
188
189 // Parse config
190 auto tree_height = parse_tree_uint32_map(tree_heights_json);
191 auto tree_prefill = parse_tree_index_map(tree_prefill_json);
192
194 { MerkleTreeId::ARCHIVE, DEFAULT_MAP_SIZE },
195 { MerkleTreeId::NULLIFIER_TREE, DEFAULT_MAP_SIZE },
196 { MerkleTreeId::NOTE_HASH_TREE, DEFAULT_MAP_SIZE },
197 { MerkleTreeId::PUBLIC_DATA_TREE, DEFAULT_MAP_SIZE },
198 { MerkleTreeId::L1_TO_L2_MESSAGE_TREE, DEFAULT_MAP_SIZE },
199 };
200 if (!map_sizes_json.empty()) {
201 auto parsed = parse_tree_uint64_map(map_sizes_json);
202 for (const auto& [k, v] : parsed) {
203 map_size[k] = v;
204 }
205 }
206
207 // Parse prefilled public data: JSON array of ["slot_hex","value_hex"] pairs
208 std::vector<PublicDataLeafValue> prefilled_public_data;
209 if (!prefilled_public_data_json.empty()) {
210 prefilled_public_data = parse_prefilled_public_data(prefilled_public_data_json);
211 std::cerr << "Parsed " << prefilled_public_data.size() << " prefilled public data entries" << '\n';
212 }
213
214 // Create WorldState
215 std::cerr << "Creating WorldState at " << data_dir << " with " << threads << " threads" << '\n';
216 auto ws = std::make_unique<WorldState>(threads,
217 data_dir,
218 map_size,
219 tree_height,
220 tree_prefill,
221 prefilled_public_data,
222 initial_header_generator_point,
223 genesis_timestamp);
224
225 WsdbRequest request{ .world_state = *ws };
226
227 // Create IPC server based on path suffix
229
230 if (input_path.size() >= 4 && input_path.substr(input_path.size() - 4) == ".shm") {
231 std::string base_name = input_path.substr(0, input_path.size() - 4);
232 constexpr size_t MAX_SHM_CLIENTS = 2; // TS backend (client 0) + AVM binary (client 1)
233 server = ipc::IpcServer::create_mpsc_shm(base_name, MAX_SHM_CLIENTS, request_ring_size, response_ring_size);
234 std::cerr << "MPSC shared memory server at " << base_name << " (max " << MAX_SHM_CLIENTS << " clients)\n";
235 } else if (input_path.size() >= 5 && input_path.substr(input_path.size() - 5) == ".sock") {
236 server = ipc::IpcServer::create_socket(input_path, 1);
237 std::cerr << "Socket server at " << input_path << '\n';
238 } else {
239 std::cerr << "Error: --input path must end with .sock or .shm" << '\n';
240 return 1;
241 }
242
243 // Set up signal handlers
244 static ipc::IpcServer* global_server = server.get();
245
246 auto graceful_shutdown_handler = [](int signal) {
247 std::cerr << "\nReceived signal " << signal << ", shutting down gracefully..." << '\n';
248 if (global_server) {
249 global_server->request_shutdown();
250 }
251 };
252
253 auto fatal_error_handler = [](int signal) {
254 const char* signal_name = (signal == SIGBUS) ? "SIGBUS" : (signal == SIGSEGV) ? "SIGSEGV" : "UNKNOWN";
255 std::cerr << "\nFatal error: received " << signal_name << '\n';
256 if (global_server) {
257 global_server->close();
258 }
259 std::exit(1);
260 };
261
262 (void)std::signal(SIGTERM, graceful_shutdown_handler);
263 (void)std::signal(SIGINT, graceful_shutdown_handler);
264 (void)std::signal(SIGBUS, fatal_error_handler);
265 (void)std::signal(SIGSEGV, fatal_error_handler);
266
267 setup_parent_death_monitoring();
268
269 if (!server->listen()) {
270 std::cerr << "Error: Could not start IPC server" << '\n';
271 return 1;
272 }
273
274 std::cerr << "aztec-wsdb IPC server ready" << '\n';
275
276 // Run server with wsdb command handler
277 server->run([&request](int client_id, std::span<const uint8_t> raw_request) -> std::vector<uint8_t> {
278 try {
279 // Deserialize msgpack command
280 // Format: [["CommandName", {payload}]] - a 1-element tuple containing the NamedUnion
281 auto unpacked = msgpack::unpack(reinterpret_cast<const char*>(raw_request.data()), raw_request.size());
282 auto obj = unpacked.get();
283
284 // Expect array of size 1 (tuple wrapping)
285 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
286 if (obj.type != msgpack::type::ARRAY || obj.via.array.size != 1) {
287 std::cerr << "Error: Expected array of size 1 from client " << client_id << '\n';
288 return {};
289 }
290
291 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
292 auto& command_obj = obj.via.array.ptr[0];
293
294 // Check for shutdown before converting
295 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
296 if (command_obj.type == msgpack::type::ARRAY && command_obj.via.array.size == 2 &&
297 command_obj.via.array.ptr[0].type == msgpack::type::STR) {
298 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
299 std::string_view command_name(command_obj.via.array.ptr[0].via.str.ptr,
300 command_obj.via.array.ptr[0].via.str.size);
301 bool is_shutdown = (command_name == "WsdbShutdown");
302
303 // Convert and execute
304 WsdbCommand command;
305 command_obj.convert(command);
306 auto response = wsdb(request, std::move(command));
307
308 // Serialize response
309 msgpack::sbuffer response_buffer;
310 msgpack::pack(response_buffer, response);
311 std::vector<uint8_t> result(response_buffer.data(), response_buffer.data() + response_buffer.size());
312
313 if (is_shutdown) {
314 throw ipc::ShutdownRequested(std::move(result));
315 }
316
317 return result;
318 }
319
320 // Fallback: try converting directly
321 WsdbCommand command;
322 command_obj.convert(command);
323 auto response = wsdb(request, std::move(command));
324
325 msgpack::sbuffer response_buffer;
326 msgpack::pack(response_buffer, response);
327 return std::vector<uint8_t>(response_buffer.data(), response_buffer.data() + response_buffer.size());
328
329 } catch (const ipc::ShutdownRequested&) {
330 throw;
331 } catch (const std::exception& e) {
332 std::cerr << "Error processing request from client " << client_id << ": " << e.what() << '\n';
333 std::cerr.flush();
334
335 WsdbErrorResponse error_response{ .message = std::string(e.what()) };
336 WsdbCommandResponse response = error_response;
337
338 msgpack::sbuffer response_buffer;
339 msgpack::pack(response_buffer, response);
340 return std::vector<uint8_t>(response_buffer.data(), response_buffer.data() + response_buffer.size());
341 }
342 });
343
344 server->close();
345 return 0;
346}
347
348} // namespace bb::wsdb
A wrapper around std::variant that provides msgpack serialization based on type names.
Abstract interface for IPC server.
virtual void close()=0
Close the server and all client connections.
static std::unique_ptr< IpcServer > create_socket(const std::string &socket_path, int max_clients)
virtual void request_shutdown()
Request graceful shutdown.
static std::unique_ptr< IpcServer > create_mpsc_shm(const std::string &base_name, size_t max_clients, size_t request_ring_size=static_cast< size_t >(1024 *1024), size_t response_ring_size=static_cast< size_t >(1024 *1024))
Exception thrown by handler to signal graceful shutdown.
const uint64_t DEFAULT_MAP_SIZE
WsdbCommandResponse wsdb(WsdbRequest &request, WsdbCommand &&command)
Top-level wsdb API entry point. Takes a WsdbRequest and dispatches the command.
int execute_wsdb_server(const std::string &input_path, const std::string &data_dir, const std::string &tree_heights_json, const std::string &tree_prefill_json, const std::string &map_sizes_json, uint32_t threads, uint32_t initial_header_generator_point, const std::string &prefilled_public_data_json, uint64_t genesis_timestamp, size_t request_ring_size, size_t response_ring_size)
Start the aztec-wsdb IPC server.
field< Bn254FrParams > fr
Definition fr.hpp:155
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
pair(A, B) -> pair< unwrap_ref_decay_t< A >, unwrap_ref_decay_t< B > >
simulation::PublicDataTreeReadWriteEvent event
Error response returned when a command fails.
Context passed to each command's execute() method, providing access to the WorldState.
world_state::WorldState & world_state
WsdbCommand NamedUnion, WsdbRequest context, and dispatch function.