Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 : #define BOOST_CAPY_ASYNC_MUTEX_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/error.hpp>
17 : #include <boost/capy/ex/io_env.hpp>
18 : #include <boost/capy/io_result.hpp>
19 :
20 : #include <stop_token>
21 :
22 : #include <atomic>
23 : #include <coroutine>
24 : #include <new>
25 : #include <utility>
26 :
27 : /* async_mutex implementation notes
28 : ================================
29 :
30 : Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
31 : inherits intrusive_list<lock_awaiter>::node; the list is owned by
32 : async_mutex::waiters_.
33 :
34 : Cancellation via stop_token
35 : ---------------------------
36 : A std::stop_callback is registered in await_suspend. Two actors can
37 : race to resume the suspended coroutine: unlock() and the stop callback.
38 : An atomic bool `claimed_` resolves the race -- whoever does
39 : claimed_.exchange(true) and reads false wins. The loser does nothing.
40 :
41 : The stop callback calls ex_.post(h_). The stop_callback is
42 : destroyed later in await_resume. cancel_fn touches no members
43 : after post returns (same pattern as delete-this).
44 :
45 : unlock() pops waiters from the front. If the popped waiter was
46 : already claimed by the stop callback, unlock() skips it and tries
47 : the next. await_resume removes the (still-linked) canceled waiter
48 : via waiters_.remove(this).
49 :
50 : The stop_callback lives in a union to suppress automatic
51 : construction/destruction. Placement new in await_suspend, explicit
52 : destructor call in await_resume and ~lock_awaiter.
53 :
54 : Member ordering constraint
55 : --------------------------
56 : The union containing stop_cb_ must be declared AFTER the members
57 : the callback accesses (h_, ex_, claimed_, canceled_). If the
58 : stop_cb_ destructor blocks waiting for a concurrent callback, those
59 : members must still be alive (C++ destroys in reverse declaration
60 : order).
61 :
62 : active_ flag
63 : ------------
64 : Tracks both list membership and stop_cb_ lifetime (they are always
65 : set and cleared together). Used by the destructor to clean up if the
66 : coroutine is destroyed while suspended (e.g. execution_context
67 : shutdown).
68 :
69 : Cancellation scope
70 : ------------------
71 : Cancellation only takes effect while the coroutine is suspended in
72 : the wait queue. If the mutex is unlocked, await_ready acquires it
73 : immediately without checking the stop token. This is intentional:
74 : the fast path has no token access and no overhead.
75 :
76 : Threading assumptions
77 : ---------------------
78 : - All list mutations happen on the executor thread (await_suspend,
79 : await_resume, unlock, ~lock_awaiter).
80 : - The stop callback may fire from any thread, but only touches
81 : claimed_ (atomic) and then calls post. It never touches the
82 : list.
83 : - ~lock_awaiter must be called from the executor thread. This is
84 : guaranteed during normal shutdown but NOT if the coroutine frame
85 : is destroyed from another thread while a stop callback could
86 : fire (precondition violation, same as cppcoro/folly).
87 : */
88 :
89 : namespace boost {
90 : namespace capy {
91 :
92 : /** An asynchronous mutex for coroutines.
93 :
94 : This mutex provides mutual exclusion for coroutines without blocking.
95 : When a coroutine attempts to acquire a locked mutex, it suspends and
96 : is added to an intrusive wait queue. When the holder unlocks, the next
97 : waiter is resumed with the lock held.
98 :
99 : @par Cancellation
100 :
101 : When a coroutine is suspended waiting for the mutex and its stop
102 : token is triggered, the waiter completes with `error::canceled`
103 : instead of acquiring the lock.
104 :
105 : Cancellation only applies while the coroutine is suspended in the
106 : wait queue. If the mutex is unlocked when `lock()` is called, the
107 : lock is acquired immediately even if the stop token is already
108 : signaled.
109 :
110 : @par Zero Allocation
111 :
112 : No heap allocation occurs for lock operations.
113 :
114 : @par Thread Safety
115 :
116 : The mutex operations are designed for single-threaded use on one
117 : executor. The stop callback may fire from any thread.
118 :
119 : @par Example
120 : @code
121 : async_mutex cm;
122 :
123 : task<> protected_operation() {
124 : auto [ec] = co_await cm.lock();
125 : if(ec)
126 : co_return;
127 : // ... critical section ...
128 : cm.unlock();
129 : }
130 :
131 : // Or with RAII:
132 : task<> protected_operation() {
133 : auto [ec, guard] = co_await cm.scoped_lock();
134 : if(ec)
135 : co_return;
136 : // ... critical section ...
137 : // unlocks automatically
138 : }
139 : @endcode
140 : */
141 : class async_mutex
142 : {
143 : public:
144 : class lock_awaiter;
145 : class lock_guard;
146 : class lock_guard_awaiter;
147 :
148 : private:
149 : bool locked_ = false;
150 : detail::intrusive_list<lock_awaiter> waiters_;
151 :
152 : public:
153 : /** Awaiter returned by lock().
154 : */
155 : class lock_awaiter
156 : : public detail::intrusive_list<lock_awaiter>::node
157 : {
158 : friend class async_mutex;
159 :
160 : async_mutex* m_;
161 : std::coroutine_handle<> h_;
162 : executor_ref ex_;
163 :
164 : // These members must be declared before stop_cb_
165 : // (see comment on the union below).
166 : std::atomic<bool> claimed_{false};
167 : bool canceled_ = false;
168 : bool active_ = false;
169 :
170 : struct cancel_fn
171 : {
172 : lock_awaiter* self_;
173 :
174 6 : void operator()() const noexcept
175 : {
176 6 : if(!self_->claimed_.exchange(
177 : true, std::memory_order_acq_rel))
178 : {
179 6 : self_->canceled_ = true;
180 6 : self_->ex_.post(self_->h_);
181 : }
182 6 : }
183 : };
184 :
185 : using stop_cb_t =
186 : std::stop_callback<cancel_fn>;
187 :
188 : // Aligned storage for stop_cb_t. Declared last:
189 : // its destructor may block while the callback
190 : // accesses the members above.
191 : #ifdef _MSC_VER
192 : # pragma warning(push)
193 : # pragma warning(disable: 4324) // padded due to alignas
194 : #endif
195 : alignas(stop_cb_t)
196 : unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
197 : #ifdef _MSC_VER
198 : # pragma warning(pop)
199 : #endif
200 :
201 17 : stop_cb_t& stop_cb_() noexcept
202 : {
203 : return *reinterpret_cast<stop_cb_t*>(
204 17 : stop_cb_buf_);
205 : }
206 :
207 : public:
208 70 : ~lock_awaiter()
209 : {
210 70 : if(active_)
211 : {
212 3 : stop_cb_().~stop_cb_t();
213 3 : m_->waiters_.remove(this);
214 : }
215 70 : }
216 :
217 35 : explicit lock_awaiter(async_mutex* m) noexcept
218 35 : : m_(m)
219 : {
220 35 : }
221 :
222 35 : lock_awaiter(lock_awaiter&& o) noexcept
223 35 : : m_(o.m_)
224 35 : , h_(o.h_)
225 35 : , ex_(o.ex_)
226 35 : , claimed_(o.claimed_.load(
227 : std::memory_order_relaxed))
228 35 : , canceled_(o.canceled_)
229 35 : , active_(std::exchange(o.active_, false))
230 : {
231 35 : }
232 :
233 : lock_awaiter(lock_awaiter const&) = delete;
234 : lock_awaiter& operator=(lock_awaiter const&) = delete;
235 : lock_awaiter& operator=(lock_awaiter&&) = delete;
236 :
237 35 : bool await_ready() const noexcept
238 : {
239 35 : if(!m_->locked_)
240 : {
241 16 : m_->locked_ = true;
242 16 : return true;
243 : }
244 19 : return false;
245 : }
246 :
247 : /** IoAwaitable protocol overload. */
248 : std::coroutine_handle<>
249 19 : await_suspend(
250 : std::coroutine_handle<> h,
251 : io_env const* env) noexcept
252 : {
253 19 : if(env->stop_token.stop_requested())
254 : {
255 2 : canceled_ = true;
256 2 : return h;
257 : }
258 17 : h_ = h;
259 17 : ex_ = env->executor;
260 17 : m_->waiters_.push_back(this);
261 51 : ::new(stop_cb_buf_) stop_cb_t(
262 17 : env->stop_token, cancel_fn{this});
263 17 : active_ = true;
264 17 : return std::noop_coroutine();
265 : }
266 :
267 32 : io_result<> await_resume() noexcept
268 : {
269 32 : if(active_)
270 : {
271 14 : stop_cb_().~stop_cb_t();
272 14 : if(canceled_)
273 : {
274 6 : m_->waiters_.remove(this);
275 6 : active_ = false;
276 6 : return {make_error_code(
277 6 : error::canceled)};
278 : }
279 8 : active_ = false;
280 : }
281 26 : if(canceled_)
282 2 : return {make_error_code(
283 2 : error::canceled)};
284 24 : return {{}};
285 : }
286 : };
287 :
288 : /** RAII lock guard for async_mutex.
289 :
290 : Automatically unlocks the mutex when destroyed.
291 : */
292 : class [[nodiscard]] lock_guard
293 : {
294 : async_mutex* m_;
295 :
296 : public:
297 5 : ~lock_guard()
298 : {
299 5 : if(m_)
300 2 : m_->unlock();
301 5 : }
302 :
303 2 : lock_guard() noexcept
304 2 : : m_(nullptr)
305 : {
306 2 : }
307 :
308 2 : explicit lock_guard(async_mutex* m) noexcept
309 2 : : m_(m)
310 : {
311 2 : }
312 :
313 1 : lock_guard(lock_guard&& o) noexcept
314 1 : : m_(std::exchange(o.m_, nullptr))
315 : {
316 1 : }
317 :
318 : lock_guard& operator=(lock_guard&& o) noexcept
319 : {
320 : if(this != &o)
321 : {
322 : if(m_)
323 : m_->unlock();
324 : m_ = std::exchange(o.m_, nullptr);
325 : }
326 : return *this;
327 : }
328 :
329 : lock_guard(lock_guard const&) = delete;
330 : lock_guard& operator=(lock_guard const&) = delete;
331 : };
332 :
333 : /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
334 : */
335 : class lock_guard_awaiter
336 : {
337 : async_mutex* m_;
338 : lock_awaiter inner_;
339 :
340 : public:
341 4 : explicit lock_guard_awaiter(async_mutex* m) noexcept
342 4 : : m_(m)
343 4 : , inner_(m)
344 : {
345 4 : }
346 :
347 4 : bool await_ready() const noexcept
348 : {
349 4 : return inner_.await_ready();
350 : }
351 :
352 : /** IoAwaitable protocol overload. */
353 : std::coroutine_handle<>
354 2 : await_suspend(
355 : std::coroutine_handle<> h,
356 : io_env const* env) noexcept
357 : {
358 2 : return inner_.await_suspend(h, env);
359 : }
360 :
361 4 : io_result<lock_guard> await_resume() noexcept
362 : {
363 4 : auto r = inner_.await_resume();
364 4 : if(r.ec)
365 2 : return {r.ec, {}};
366 2 : return {{}, lock_guard(m_)};
367 4 : }
368 : };
369 :
370 : async_mutex() = default;
371 :
372 : // Non-copyable, non-movable
373 : async_mutex(async_mutex const&) = delete;
374 : async_mutex& operator=(async_mutex const&) = delete;
375 :
376 : /** Returns an awaiter that acquires the mutex.
377 :
378 : @return An awaitable yielding `(error_code)`.
379 : */
380 31 : lock_awaiter lock() noexcept
381 : {
382 31 : return lock_awaiter{this};
383 : }
384 :
385 : /** Returns an awaiter that acquires the mutex with RAII.
386 :
387 : @return An awaitable yielding `(error_code,lock_guard)`.
388 : */
389 4 : lock_guard_awaiter scoped_lock() noexcept
390 : {
391 4 : return lock_guard_awaiter(this);
392 : }
393 :
394 : /** Releases the mutex.
395 :
396 : If waiters are queued, the next eligible waiter is
397 : resumed with the lock held. Canceled waiters are
398 : skipped. If no eligible waiter remains, the mutex
399 : becomes unlocked.
400 : */
401 24 : void unlock() noexcept
402 : {
403 : for(;;)
404 : {
405 24 : auto* waiter = waiters_.pop_front();
406 24 : if(!waiter)
407 : {
408 16 : locked_ = false;
409 16 : return;
410 : }
411 8 : if(!waiter->claimed_.exchange(
412 : true, std::memory_order_acq_rel))
413 : {
414 8 : waiter->ex_.post(waiter->h_);
415 8 : return;
416 : }
417 0 : }
418 : }
419 :
420 : /** Returns true if the mutex is currently locked.
421 : */
422 26 : bool is_locked() const noexcept
423 : {
424 26 : return locked_;
425 : }
426 : };
427 :
428 : } // namespace capy
429 : } // namespace boost
430 :
431 : #endif
|