13#include <gtest/gtest.h>
28TEST(ShmTest, SingleClientSmallRingHighVolume)
30 constexpr size_t RING_SIZE = 2UL * 1024;
31 constexpr size_t NUM_ITERATIONS = 10000000;
33 constexpr size_t MAX_MSG_SIZE = (RING_SIZE / 2) - 4;
36 std::string wrap_test_shm =
"shm_wrap_" +
std::to_string(getpid());
38 ASSERT_TRUE(server->listen()) <<
"Wrap test server failed to listen";
40 std::atomic<bool> server_running{
true };
41 std::atomic<size_t> corruptions{ 0 };
44 std::thread server_thread([&]() {
49 int client_id = server->wait_for_data(10000000);
54 auto request_buf = server->receive(client_id);
57 if (request_buf.empty()) {
62 std::vector<uint8_t> request(request_buf.begin(), request_buf.end());
63 server->release(client_id, request.size());
67 if (request.size() > 0) {
68 uint8_t first = request[0];
69 for (
size_t i = 0; i < std::min(request.size(),
size_t(16)); i++) {
70 uint8_t expected =
static_cast<uint8_t
>((first ^ i) & 0xFF);
71 if (request[i] != expected) {
72 corruptions.fetch_add(1);
73 std::cerr <<
"Pattern mismatch at offset " << i <<
": expected=" << (int)expected
74 <<
" actual=" << (
int)request[i] <<
'\n';
81 while (!server->send(client_id, request.data(), request.size())) {
83 std::cerr << iter <<
" Server send size " << request.size() <<
" timeout, retrying..." <<
'\n';
84 dynamic_cast<ShmServer*
>(server.get())->debug_dump();
91 std::this_thread::sleep_for(std::chrono::milliseconds(300));
94 ASSERT_TRUE(client->connect());
102 std::vector<size_t> iteration_sizes(NUM_ITERATIONS);
103 for (
size_t i = 0; i < NUM_ITERATIONS; i++) {
104 iteration_sizes[i] = size_dist(gen);
109 std::thread sender_thread([&]() {
110 std::vector<uint8_t> send_buffer(MAX_MSG_SIZE);
112 for (
size_t iter = 0; iter < NUM_ITERATIONS; iter++) {
113 size_t size = iteration_sizes[iter];
118 uint8_t iter_byte =
static_cast<uint8_t
>(iter & 0xFF);
119 for (
size_t i = 0; i < size; i++) {
120 send_buffer[i] =
static_cast<uint8_t
>((iter_byte ^ i) & 0xFF);
124 while (!client->send(send_buffer.data(), size, 100000000)) {
126 std::cerr << iter <<
" Client send size " << size <<
" timeout, retrying..." <<
'\n';
127 dynamic_cast<ShmClient*
>(client.get())->debug_dump();
133 std::thread receiver_thread([&]() {
134 for (
size_t iter = 0; iter < NUM_ITERATIONS; iter++) {
135 size_t expected_size = iteration_sizes[iter];
139 while ((response = client->receive(100000000)).empty()) {
140 std::cerr << iter <<
" Client receive timeout, retrying..." <<
'\n';
145 ASSERT_EQ(response.size(), expected_size) <<
"Size mismatch at iteration " << iter;
148 uint8_t iter_byte =
static_cast<uint8_t
>(iter & 0xFF);
149 if (response.size() > 0) {
150 ASSERT_EQ(response[0], iter_byte) <<
"Iteration byte mismatch at iteration " << iter;
151 for (
size_t i = 0; i < response.size(); i++) {
152 uint8_t expected =
static_cast<uint8_t
>((iter_byte ^ i) & 0xFF);
153 if (response[i] != expected) {
154 FAIL() <<
"Data corruption at iteration " << iter <<
" offset " << i
155 <<
": expected=" << (int)expected <<
" actual=" << (
int)response[i];
160 client->release(response.size());
164 sender_thread.join();
165 receiver_thread.join();
169 server_running.store(
false);
170 server->request_shutdown();
171 server_thread.join();
174 EXPECT_EQ(corruptions.load(), 0) <<
"Corruptions detected in single-threaded wrap test";
224TEST(ShmTest, MpscEchoTwoClients)
226 constexpr size_t NUM_CLIENTS = 2;
227 constexpr size_t NUM_MESSAGES = 200;
228 constexpr size_t MSG_SIZE = 64;
229 constexpr size_t RING_SIZE = 4UL * 1024;
233 ASSERT_TRUE(server->listen()) <<
"MPSC server failed to listen";
235 std::atomic<bool> server_running{
true };
238 std::thread server_thread([&]() {
241 int client_id = server->wait_for_data(1000000);
245 auto request_buf = server->receive(client_id);
246 if (request_buf.empty()) {
249 std::vector<uint8_t> request(request_buf.begin(), request_buf.end());
250 server->release(client_id, request.size());
251 while (!server->send(client_id, request.data(), request.size())) {
257 std::this_thread::sleep_for(std::chrono::milliseconds(100));
259 auto run_client = [&](
size_t client_id) {
261 ASSERT_TRUE(client->connect()) <<
"Client " << client_id <<
" failed to connect";
263 for (
size_t iter = 0; iter < NUM_MESSAGES; iter++) {
264 std::vector<uint8_t> payload(MSG_SIZE);
266 payload[0] =
static_cast<uint8_t
>(client_id);
267 for (
size_t i = 1; i < MSG_SIZE; i++) {
268 payload[i] =
static_cast<uint8_t
>((client_id ^ iter ^ i) & 0xFF);
271 while (!client->send(payload.data(), payload.size(), 100000000)) {
276 while ((response = client->receive(100000000)).empty()) {
280 ASSERT_EQ(response.size(), MSG_SIZE) <<
"client " << client_id <<
" iter " << iter;
282 ASSERT_EQ(response[0],
static_cast<uint8_t
>(client_id))
283 <<
"client " << client_id <<
" got cross-client response at iter " << iter;
284 for (
size_t i = 1; i < MSG_SIZE; i++) {
285 uint8_t expected =
static_cast<uint8_t
>((client_id ^ iter ^ i) & 0xFF);
286 ASSERT_EQ(response[i], expected) <<
"client " << client_id <<
" iter " << iter <<
" offset " << i;
288 client->release(response.size());
293 std::vector<std::thread> client_threads;
294 client_threads.reserve(NUM_CLIENTS);
295 for (
size_t id = 0;
id < NUM_CLIENTS;
id++) {
296 client_threads.emplace_back(run_client,
id);
298 for (
auto& t : client_threads) {
302 server_running.store(
false);
303 server->request_shutdown();
304 server_thread.join();
TEST(acir_formal_proofs, uint_terms_add)
Tests 128-bit unsigned addition Verifies that the ACIR implementation of addition is correct Executio...
static std::unique_ptr< IpcClient > create_mpsc_shm(const std::string &base_name, size_t client_id)
static std::unique_ptr< IpcClient > create_shm(const std::string &base_name)
static std::unique_ptr< IpcServer > create_shm(const std::string &base_name, size_t request_ring_size=static_cast< size_t >(1024 *1024), size_t response_ring_size=static_cast< size_t >(1024 *1024))
static std::unique_ptr< IpcServer > create_mpsc_shm(const std::string &base_name, size_t max_clients, size_t request_ring_size=static_cast< size_t >(1024 *1024), size_t response_ring_size=static_cast< size_t >(1024 *1024))
IPC client implementation using shared memory.
IPC server implementation using shared memory.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
std::string to_string(bb::avm2::ValueTag tag)
Single-producer/single-consumer shared-memory ring buffer (Linux, x86-64 optimized)