ex/async_mutex.hpp

98.9% Lines (93/94) 100.0% Functions (20/20) 90.9% Branches (20/22)
ex/async_mutex.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 #define BOOST_CAPY_ASYNC_MUTEX_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/concept/executor.hpp>
16 #include <boost/capy/error.hpp>
17 #include <boost/capy/ex/io_env.hpp>
18 #include <boost/capy/io_result.hpp>
19
20 #include <stop_token>
21
22 #include <atomic>
23 #include <coroutine>
24 #include <new>
25 #include <utility>
26
27 /* async_mutex implementation notes
28 ================================
29
30 Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
31 inherits intrusive_list<lock_awaiter>::node; the list is owned by
32 async_mutex::waiters_.
33
34 Cancellation via stop_token
35 ---------------------------
36 A std::stop_callback is registered in await_suspend. Two actors can
37 race to resume the suspended coroutine: unlock() and the stop callback.
38 An atomic bool `claimed_` resolves the race -- whoever does
39 claimed_.exchange(true) and reads false wins. The loser does nothing.
40
41 The stop callback calls ex_.post(h_). The stop_callback is
42 destroyed later in await_resume. cancel_fn touches no members
43 after post returns (same pattern as delete-this).
44
45 unlock() pops waiters from the front. If the popped waiter was
46 already claimed by the stop callback, unlock() skips it and tries
47 the next. await_resume removes the (still-linked) canceled waiter
48 via waiters_.remove(this).
49
50 The stop_callback lives in raw aligned storage (stop_cb_buf_) to
51 suppress automatic construction/destruction. Placement new in
52 await_suspend, explicit destructor call in await_resume and ~lock_awaiter.
53
54 Member ordering constraint
55 --------------------------
56 The aligned storage holding the stop callback (stop_cb_buf_) must be
57 declared AFTER the members the callback accesses (h_, ex_, claimed_,
58 canceled_). If the stop callback's destructor blocks waiting for a
59 concurrent callback, those members must still be alive (C++ destroys
60 in reverse declaration order).
61
62 active_ flag
63 ------------
64 Tracks both list membership and stop_cb_ lifetime (they are always
65 set and cleared together). Used by the destructor to clean up if the
66 coroutine is destroyed while suspended (e.g. execution_context
67 shutdown).
68
69 Cancellation scope
70 ------------------
71 Cancellation only takes effect while the coroutine is suspended in
72 the wait queue. If the mutex is unlocked, await_ready acquires it
73 immediately without checking the stop token. This is intentional:
74 the fast path has no token access and no overhead.
75
76 Threading assumptions
77 ---------------------
78 - All list mutations happen on the executor thread (await_suspend,
79 await_resume, unlock, ~lock_awaiter).
80 - The stop callback may fire from any thread, but only touches
81 claimed_ (atomic) and then calls post. It never touches the
82 list.
83 - ~lock_awaiter must be called from the executor thread. This is
84 guaranteed during normal shutdown but NOT if the coroutine frame
85 is destroyed from another thread while a stop callback could
86 fire (precondition violation, same as cppcoro/folly).
87 */
88
89 namespace boost {
90 namespace capy {
91
92 /** An asynchronous mutex for coroutines.
93
94 This mutex provides mutual exclusion for coroutines without blocking.
95 When a coroutine attempts to acquire a locked mutex, it suspends and
96 is added to an intrusive wait queue. When the holder unlocks, the next
97 waiter is resumed with the lock held.
98
99 @par Cancellation
100
101 When a coroutine is suspended waiting for the mutex and its stop
102 token is triggered, the waiter completes with `error::canceled`
103 instead of acquiring the lock.
104
105 Cancellation only applies while the coroutine is suspended in the
106 wait queue. If the mutex is unlocked when `lock()` is called, the
107 lock is acquired immediately even if the stop token is already
108 signaled.
109
110 @par Zero Allocation
111
112 No heap allocation occurs for lock operations.
113
114 @par Thread Safety
115
116 The mutex operations are designed for single-threaded use on one
117 executor. The stop callback may fire from any thread.
118
119 @par Example
120 @code
121 async_mutex cm;
122
123 task<> protected_operation() {
124 auto [ec] = co_await cm.lock();
125 if(ec)
126 co_return;
127 // ... critical section ...
128 cm.unlock();
129 }
130
131 // Or with RAII:
132 task<> protected_operation() {
133 auto [ec, guard] = co_await cm.scoped_lock();
134 if(ec)
135 co_return;
136 // ... critical section ...
137 // unlocks automatically
138 }
139 @endcode
140 */
141 class async_mutex
142 {
143 public:
144 class lock_awaiter;
145 class lock_guard;
146 class lock_guard_awaiter;
147
148 private:
149 bool locked_ = false;
150 detail::intrusive_list<lock_awaiter> waiters_;
151
152 public:
153 /** Awaiter returned by lock().
154 */
155 class lock_awaiter
156 : public detail::intrusive_list<lock_awaiter>::node
157 {
158 friend class async_mutex;
159
160 async_mutex* m_;
161 std::coroutine_handle<> h_;
162 executor_ref ex_;
163
164 // These members must be declared before stop_cb_buf_
165 // (see comment on the aligned storage below).
166 std::atomic<bool> claimed_{false};
167 bool canceled_ = false;
168 bool active_ = false;
169
170 struct cancel_fn
171 {
172 lock_awaiter* self_;
173
174 6 void operator()() const noexcept
175 {
176
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 if(!self_->claimed_.exchange(
177 true, std::memory_order_acq_rel))
178 {
179 6 self_->canceled_ = true;
180 6 self_->ex_.post(self_->h_);
181 }
182 6 }
183 };
184
185 using stop_cb_t =
186 std::stop_callback<cancel_fn>;
187
188 // Aligned storage for stop_cb_t. Declared last:
189 // its destructor may block while the callback
190 // accesses the members above.
191 #ifdef _MSC_VER
192 # pragma warning(push)
193 # pragma warning(disable: 4324) // padded due to alignas
194 #endif
195 alignas(stop_cb_t)
196 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
197 #ifdef _MSC_VER
198 # pragma warning(pop)
199 #endif
200
201 17 stop_cb_t& stop_cb_() noexcept
202 {
203 return *reinterpret_cast<stop_cb_t*>(
204 17 stop_cb_buf_);
205 }
206
207 public:
208 70 ~lock_awaiter()
209 {
210
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 67 times.
70 if(active_)
211 {
212 3 stop_cb_().~stop_cb_t();
213 3 m_->waiters_.remove(this);
214 }
215 70 }
216
217 35 explicit lock_awaiter(async_mutex* m) noexcept
218 35 : m_(m)
219 {
220 35 }
221
222 35 lock_awaiter(lock_awaiter&& o) noexcept
223 35 : m_(o.m_)
224 35 , h_(o.h_)
225 35 , ex_(o.ex_)
226 35 , claimed_(o.claimed_.load(
227 std::memory_order_relaxed))
228 35 , canceled_(o.canceled_)
229 35 , active_(std::exchange(o.active_, false))
230 {
231 35 }
232
233 lock_awaiter(lock_awaiter const&) = delete;
234 lock_awaiter& operator=(lock_awaiter const&) = delete;
235 lock_awaiter& operator=(lock_awaiter&&) = delete;
236
237 35 bool await_ready() const noexcept
238 {
239
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 19 times.
35 if(!m_->locked_)
240 {
241 16 m_->locked_ = true;
242 16 return true;
243 }
244 19 return false;
245 }
246
247 /** IoAwaitable protocol overload. */
248 std::coroutine_handle<>
249 19 await_suspend(
250 std::coroutine_handle<> h,
251 io_env const* env) noexcept
252 {
253
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 17 times.
19 if(env->stop_token.stop_requested())
254 {
255 2 canceled_ = true;
256 2 return h;
257 }
258 17 h_ = h;
259 17 ex_ = env->executor;
260 17 m_->waiters_.push_back(this);
261 51 ::new(stop_cb_buf_) stop_cb_t(
262 17 env->stop_token, cancel_fn{this});
263 17 active_ = true;
264 17 return std::noop_coroutine();
265 }
266
267 32 io_result<> await_resume() noexcept
268 {
269
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 18 times.
32 if(active_)
270 {
271 14 stop_cb_().~stop_cb_t();
272
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 8 times.
14 if(canceled_)
273 {
274 6 m_->waiters_.remove(this);
275 6 active_ = false;
276 6 return {make_error_code(
277 6 error::canceled)};
278 }
279 8 active_ = false;
280 }
281
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 24 times.
26 if(canceled_)
282 2 return {make_error_code(
283 2 error::canceled)};
284 24 return {{}};
285 }
286 };
287
288 /** RAII lock guard for async_mutex.
289
290 Automatically unlocks the mutex when destroyed.
291 */
292 class [[nodiscard]] lock_guard
293 {
294 async_mutex* m_;
295
296 public:
297 5 ~lock_guard()
298 {
299
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5 if(m_)
300 2 m_->unlock();
301 5 }
302
303 2 lock_guard() noexcept
304 2 : m_(nullptr)
305 {
306 2 }
307
308 2 explicit lock_guard(async_mutex* m) noexcept
309 2 : m_(m)
310 {
311 2 }
312
313 1 lock_guard(lock_guard&& o) noexcept
314 1 : m_(std::exchange(o.m_, nullptr))
315 {
316 1 }
317
318 lock_guard& operator=(lock_guard&& o) noexcept
319 {
320 if(this != &o)
321 {
322 if(m_)
323 m_->unlock();
324 m_ = std::exchange(o.m_, nullptr);
325 }
326 return *this;
327 }
328
329 lock_guard(lock_guard const&) = delete;
330 lock_guard& operator=(lock_guard const&) = delete;
331 };
332
333 /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
334 */
335 class lock_guard_awaiter
336 {
337 async_mutex* m_;
338 lock_awaiter inner_;
339
340 public:
341 4 explicit lock_guard_awaiter(async_mutex* m) noexcept
342 4 : m_(m)
343 4 , inner_(m)
344 {
345 4 }
346
347 4 bool await_ready() const noexcept
348 {
349 4 return inner_.await_ready();
350 }
351
352 /** IoAwaitable protocol overload. */
353 std::coroutine_handle<>
354 2 await_suspend(
355 std::coroutine_handle<> h,
356 io_env const* env) noexcept
357 {
358 2 return inner_.await_suspend(h, env);
359 }
360
361 4 io_result<lock_guard> await_resume() noexcept
362 {
363 4 auto r = inner_.await_resume();
364
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
4 if(r.ec)
365 2 return {r.ec, {}};
366 2 return {{}, lock_guard(m_)};
367 4 }
368 };
369
370 async_mutex() = default;
371
372 // Non-copyable, non-movable
373 async_mutex(async_mutex const&) = delete;
374 async_mutex& operator=(async_mutex const&) = delete;
375
376 /** Returns an awaiter that acquires the mutex.
377
378 @return An awaitable yielding `(error_code)`.
379 */
380 31 lock_awaiter lock() noexcept
381 {
382 31 return lock_awaiter{this};
383 }
384
385 /** Returns an awaiter that acquires the mutex with RAII.
386
387 @return An awaitable yielding `(error_code,lock_guard)`.
388 */
389 4 lock_guard_awaiter scoped_lock() noexcept
390 {
391 4 return lock_guard_awaiter(this);
392 }
393
394 /** Releases the mutex.
395
396 If waiters are queued, the next eligible waiter is
397 resumed with the lock held. Canceled waiters are
398 skipped. If no eligible waiter remains, the mutex
399 becomes unlocked.
400 */
401 24 void unlock() noexcept
402 {
403 for(;;)
404 {
405 24 auto* waiter = waiters_.pop_front();
406
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 8 times.
24 if(!waiter)
407 {
408 16 locked_ = false;
409 16 return;
410 }
411
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 if(!waiter->claimed_.exchange(
412 true, std::memory_order_acq_rel))
413 {
414 8 waiter->ex_.post(waiter->h_);
415 8 return;
416 }
417 }
418 }
419
420 /** Returns true if the mutex is currently locked.
421 */
422 26 bool is_locked() const noexcept
423 {
424 26 return locked_;
425 }
426 };
427
428 } // namespace capy
429 } // namespace boost
430
431 #endif
432