Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
parallel_for_mutex_pool.cpp
Go to the documentation of this file.
4#ifndef NO_MULTITHREADING
5#include "log.hpp"
6#include "thread.hpp"
7#include <atomic>
8#include <condition_variable>
9#include <functional>
10#include <mutex>
11#include <queue>
12#include <thread>
13#include <vector>
14
16
17// Fix for https://github.com/AztecProtocol/aztec-packages/issues/19769
18// Zig's Mach-O linker (https://codeberg.org/ziglang/zig/issues/31461) misaligns
19// __thread_bss TLS template offsets when __thread_data is also present (from Rust
20// static libraries), causing x86_64-macos segfaults on any thread_local requiring
21// 16-byte alignment (e.g. std::mutex). Adding an alignas(16) initialized
22// thread_local forces __thread_data alignment to 16, ensuring __thread_bss starts
23// at a correctly aligned TLS template offset.
24// NOLINTBEGIN
// Padding variable that exists only for the TLS-alignment workaround described above:
// it is 16-byte aligned, thread_local, and given a non-zero initializer ({ 1 }).
// __attribute__((used)) asks the compiler to emit it even though nothing references it.
25alignas(16) thread_local char tls_alignment_pad[16] __attribute__((used)) = { 1 };
26// NOLINTEND
27
28namespace {
29
// Pool of worker threads that, together with the thread calling start_tasks(), invokes a
// task once for each index in [0, num_iterations).
//
// The counters (num_iterations_, iteration_, complete_) and the stop flag are read and
// written while holding tasks_mutex; the task itself is invoked without the lock held.
// The pool is neither copyable nor movable.
//
// NOTE(review): this listing appears to have lost some lines during extraction (the
// embedded listing numbers skip 42, 62 and 69). In particular `condition` is used below
// but its declaration is not visible here -- confirm against the original file.
30class ThreadPool {
31 public:
// Starts num_threads worker threads (defined out of line).
32 ThreadPool(size_t num_threads);
33 ThreadPool(const ThreadPool& other) = delete;
34 ThreadPool(ThreadPool&& other) = delete;
// Requests stop and joins the workers (defined out of line).
35 ~ThreadPool();
36
37 ThreadPool& operator=(const ThreadPool& other) = delete;
38 ThreadPool& operator=(ThreadPool&& other) = delete;
39
// Runs func(i) for every i in [0, num_iterations) and returns once all iterations
// have completed. The calling thread takes part in the work via do_iterations().
//
// @param num_iterations Number of indices to process.
// @param func           Callable invoked once per index; a copy is stored in task_.
40 void start_tasks(size_t num_iterations, const std::function<void(size_t)>& func)
41 {
// Publish the new batch under the lock so waiting workers observe a consistent state.
43 {
44 std::unique_lock<std::mutex> lock(tasks_mutex);
45 task_ = func;
46 num_iterations_ = num_iterations;
47 iteration_ = 0;
48 complete_ = 0;
49 }
// Wake the workers blocked in worker_loop().
50 condition.notify_all();
51
// Claim and run iterations on this thread as well instead of idling.
52 do_iterations();
53
// Block until every iteration (including those taken by workers) has finished.
54 {
55 // BB_BENCH_NAME("spinning main thread");
56 std::unique_lock<std::mutex> lock(tasks_mutex);
57 complete_condition_.wait(lock, [this] { return complete_ == num_iterations_; });
58 }
59 }
60
61 private:
// Worker threads, created in the constructor and joined in the destructor.
63 std::vector<std::thread> workers;
// Guards the batch state below.
64 std::mutex tasks_mutex;
// Copy of the callable for the current batch.
65 std::function<void(size_t)> task_;
// Total number of indices in the current batch.
66 size_t num_iterations_ = 0;
// Next index to hand out.
67 size_t iteration_ = 0;
// Number of indices whose task call has returned.
68 size_t complete_ = 0;
// Signalled by whichever thread finishes the last iteration of a batch.
70 std::condition_variable complete_condition_;
// Set by the destructor to make worker_loop() exit.
71 bool stop = false;
72
// Body of each worker thread (defined out of line).
73 BB_NO_PROFILE void worker_loop(size_t thread_index);
74
// Repeatedly claims the next unclaimed index under the lock, runs task_ on it without
// the lock, then records completion. Returns when no indices remain to claim, or right
// after this thread completes the final iteration of the batch.
75 void do_iterations()
76 {
77 while (true) {
78 size_t iteration = 0;
79 {
80 std::unique_lock<std::mutex> lock(tasks_mutex);
// Every index has already been handed out; nothing left for this thread.
81 if (iteration_ == num_iterations_) {
82 return;
83 }
84 iteration = iteration_++;
85 }
// Run the task outside the lock so iterations can execute concurrently.
86 task_(iteration);
87 {
88 std::unique_lock<std::mutex> lock(tasks_mutex);
// The thread that completes the last iteration wakes the waiter in start_tasks().
89 if (++complete_ == num_iterations_) {
90 complete_condition_.notify_one();
91 return;
92 }
93 }
94 }
95 }
96};
97
98ThreadPool::ThreadPool(size_t num_threads)
99{
100 workers.reserve(num_threads);
101 for (size_t i = 0; i < num_threads; ++i) {
102 workers.emplace_back(&ThreadPool::worker_loop, this, i);
103 }
104}
105
106ThreadPool::~ThreadPool()
107{
108 {
109 std::unique_lock<std::mutex> lock(tasks_mutex);
110 stop = true;
111 }
112 condition.notify_all();
113 for (auto& worker : workers) {
114 worker.join();
115 }
116}
117
// Body of each worker thread. Sleeps on `condition` until there are unclaimed iterations
// or the pool is stopping, then either exits (stop) or helps run the current batch via
// do_iterations().
//
// @param thread_index Index of this worker within the pool; only referenced by the
//                     commented-out logging below, hence [[maybe_unused]].
118void ThreadPool::worker_loop([[maybe_unused]] size_t thread_index)
119{
120 // info("created worker ", thread_index);
121 while (true) {
122 {
123 std::unique_lock<std::mutex> lock(tasks_mutex);
// Wake up when a batch still has indices to hand out, or when the destructor sets stop.
124 condition.wait(lock, [this] { return (iteration_ < num_iterations_) || stop; });
125
126 if (stop) {
127 break;
128 }
129 }
130 // Make sure nested stats accounting works under multithreading
131 // Note: parent is a thread-local variable.
// NOTE(review): the statement the two comments above describe is not visible in this
// listing (listing line 132 is missing) -- confirm against the original file.
133 do_iterations();
134 }
135 // info("worker exit ", thread_index);
136}
137} // namespace
138
139namespace bb {
144void parallel_for_mutex_pool(size_t num_iterations, const std::function<void(size_t)>& func)
145{
146#ifdef __wasm__
147#define THREAD_LOCAL_MAYBE
148#else
149#define THREAD_LOCAL_MAYBE thread_local
150#endif
151
152 static THREAD_LOCAL_MAYBE ThreadPool pool(get_num_cpus() - 1);
153 static THREAD_LOCAL_MAYBE bool nested = false;
154
155 // If nested, fall back to serial execution
156 if (nested) {
157 for (size_t i = 0; i < num_iterations; ++i) {
158 func(i);
159 }
160 return;
161 }
162
163 nested = true;
164 pool.start_tasks(num_iterations, func);
165 nested = false;
166}
167} // namespace bb
168#endif
#define BB_NO_PROFILE
Entry point for Barretenberg command-line interface.
Definition api.hpp:5
void parallel_for_mutex_pool(size_t num_iterations, const std::function< void(size_t)> &func)
size_t get_num_cpus()
Definition thread.cpp:33
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
thread_local char tls_alignment_pad[16] __attribute__((used))
#define THREAD_LOCAL_MAYBE
static thread_local TimeStatsEntry * parent
Definition bb_bench.hpp:109