LCOV - code coverage report
Current view: top level - capy/ex - async_event.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 100.0 % 68 68
Test Date: 2026-02-12 22:58:59 Functions: 100.0 % 13 13

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
      11              : #define BOOST_CAPY_ASYNC_EVENT_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/intrusive.hpp>
      15              : #include <boost/capy/concept/executor.hpp>
      16              : #include <boost/capy/error.hpp>
      17              : #include <boost/capy/ex/io_env.hpp>
      18              : #include <boost/capy/io_result.hpp>
      19              : 
      20              : #include <stop_token>
      21              : 
      22              : #include <atomic>
      23              : #include <coroutine>
      24              : #include <new>
      25              : #include <utility>
      26              : 
      27              : /*  async_event implementation notes
      28              :     =================================
      29              : 
      30              :     Same cancellation pattern as async_mutex (see that file for the
      31              :     full discussion on claimed_, stop_cb lifetime, member ordering,
      32              :     and threading assumptions).
      33              : 
      34              :     Key difference: set() wakes ALL waiters (broadcast), not one.
      35              :     It pops every waiter from the list and posts the ones it
      36              :     claims. Waiters already claimed by a stop callback are skipped.
      37              : 
      38              :     Because set() pops all waiters, a canceled waiter may have been
      39              :     removed from the list by set() before its await_resume runs.
      40              :     This requires a separate in_list_ flag (unlike async_mutex where
      41              :     active_ served double duty). await_resume only calls remove()
      42              :     when in_list_ is true.
      43              : */
      44              : 
      45              : namespace boost {
      46              : namespace capy {
      47              : 
      48              : /** An asynchronous event for coroutines.
      49              : 
      50              :     This event provides a way to notify multiple coroutines that some
      51              :     condition has occurred. When a coroutine awaits an unset event, it
      52              :     suspends and is added to a wait queue. When the event is set, all
      53              :     waiting coroutines are resumed.
      54              : 
      55              :     @par Cancellation
      56              : 
      57              :     When a coroutine is suspended waiting for the event and its stop
      58              :     token is triggered, the waiter completes with `error::canceled`
      59              :     instead of waiting for `set()`.
      60              : 
      61              :     Cancellation is only observed when the event is not set: if stop
      62              :     was already requested the wait completes with `error::canceled`
      63              :     without suspending. If the event is already set when the awaiter is
      64              :     awaited, it completes successfully even if stop was requested.
      65              : 
      66              :     @par Zero Allocation
      67              : 
      68              :     No heap allocation occurs for wait operations.
      69              : 
      70              :     @par Thread Safety
      71              : 
      72              :     The event operations are designed for single-threaded use on one
      73              :     executor. The stop callback may fire from any thread.
      74              : 
      75              :     @par Example
      76              :     @code
      77              :     async_event event;
      78              : 
      79              :     task<> waiter() {
      80              :         auto [ec] = co_await event.wait();
      81              :         if(ec)
      82              :             co_return;
      83              :         // ... event was set ...
      84              :     }
      85              : 
      86              :     task<> notifier() {
      87              :         // ... do some work ...
      88              :         event.set();  // Wake all waiters
      89              :     }
      90              :     @endcode
      91              : */
      92              : class async_event
      93              : {
      94              : public:
      95              :     class wait_awaiter;
      96              : 
      97              : private:
      98              :     bool set_ = false;                                // true from set() until clear(); read by await_ready()
      99              :     detail::intrusive_list<wait_awaiter> waiters_;    // suspended awaiters; drained by set()
     100              : 
     101              : public:
     102              :     /** Awaiter returned by wait(). While suspended it is linked into
     103              :         waiters_ and completes via set() or via its stop callback. */
     104              :     class wait_awaiter
     105              :         : public detail::intrusive_list<wait_awaiter>::node
     106              :     {
     107              :         friend class async_event;
     108              : 
     109              :         async_event* e_;                  // event being waited on
     110              :         std::coroutine_handle<> h_;       // suspended coroutine, stored by await_suspend
     111              :         executor_ref ex_;                 // executor h_ is posted to
     112              : 
     113              :         // Declared before stop_cb_buf_: the callback
     114              :         // accesses these members, so they must still be
     115              :         // alive if the stop_cb_ destructor blocks.
     116              :         std::atomic<bool> claimed_{false};    // whoever flips this first (set() or cancel_fn) posts h_
     117              :         bool canceled_ = false;               // await_resume reports error::canceled
     118              :         bool active_ = false;                 // a stop_cb_t is alive in stop_cb_buf_
     119              :         bool in_list_ = false;                // this node is linked in e_->waiters_
     120              :         // Stop callback: claims the waiter and, if it won the claim, posts it as canceled.
     121              :         struct cancel_fn
     122              :         {
     123              :             wait_awaiter* self_;
     124              : 
     125           21 :             void operator()() const noexcept
     126              :             {
     127           21 :                 if(!self_->claimed_.exchange(
     128              :                     true, std::memory_order_acq_rel))
     129              :                 {
     130           20 :                     self_->canceled_ = true;
     131           20 :                     self_->ex_.post(self_->h_);
     132              :                 }
     133           21 :             }
     134              :         };
     135              : 
     136              :         using stop_cb_t =
     137              :             std::stop_callback<cancel_fn>;
     138              : 
     139              :         // Aligned storage for stop_cb_t. Declared last:
     140              :         // its destructor may block while the callback
     141              :         // accesses the members above.
     142              : #ifdef _MSC_VER
     143              : # pragma warning(push)
     144              : # pragma warning(disable: 4324) // padded due to alignas
     145              : #endif
     146              :         alignas(stop_cb_t)
     147              :             unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
     148              : #ifdef _MSC_VER
     149              : # pragma warning(pop)
     150              : #endif
     151              :         // Returns the callback object living in stop_cb_buf_; call only while active_ is true.
     152           37 :         stop_cb_t& stop_cb_() noexcept
     153              :         {   // std::launder: the buffer address names the byte array, not the stop_cb_t placement-new created in it.
     154              :             return *std::launder(reinterpret_cast<stop_cb_t*>(
     155           37 :                 stop_cb_buf_));
     156              :         }
     157              : 
     158              :     public:
     159          251 :         ~wait_awaiter()
     160              :         {   // Covers destruction while still registered: destroy the stop callback first, then unlink from the event.
     161          251 :             if(active_)
     162            1 :                 stop_cb_().~stop_cb_t();
     163          251 :             if(in_list_)
     164            1 :                 e_->waiters_.remove(this);
     165          251 :         }
     166              :         // Creates an idle awaiter bound to e; nothing is enqueued or registered until await_suspend.
     167           57 :         explicit wait_awaiter(async_event* e) noexcept
     168           57 :             : e_(e)
     169              :         {
     170           57 :         }
     171              :         // Move constructor: copies the plain state and takes over the active_/in_list_ flags from o.
     172          194 :         wait_awaiter(wait_awaiter&& o) noexcept
     173          194 :             : e_(o.e_)
     174          194 :             , h_(o.h_)
     175          194 :             , ex_(o.ex_)
     176          194 :             , claimed_(o.claimed_.load(
     177              :                 std::memory_order_relaxed))
     178          194 :             , canceled_(o.canceled_)
     179          194 :             , active_(std::exchange(o.active_, false))
     180          194 :             , in_list_(std::exchange(o.in_list_, false))
     181              :         {   // NOTE(review): the list node and stop_cb_buf_ contents are not transferred, so this presumably must only run before await_suspend - confirm.
     182          194 :         }
     183              : 
     184              :         wait_awaiter(wait_awaiter const&) = delete;
     185              :         wait_awaiter& operator=(wait_awaiter const&) = delete;
     186              :         wait_awaiter& operator=(wait_awaiter&&) = delete;
     187              :         // Ready (no suspension) exactly when the event is currently set.
     188           57 :         bool await_ready() const noexcept
     189              :         {
     190           57 :             return e_->set_;
     191              :         }
     192              : 
     193              :         /** IoAwaitable protocol overload. Returns h (immediate resume, result canceled) if stop was already requested; otherwise stores h and the executor, enqueues this waiter, registers the stop callback and returns noop_coroutine(). */
     194              :         std::coroutine_handle<>
     195           47 :         await_suspend(
     196              :             std::coroutine_handle<> h,
     197              :             io_env const* env) noexcept
     198              :         {
     199           47 :             if(env->stop_token.stop_requested())
     200              :             {
     201           10 :                 canceled_ = true;
     202           10 :                 return h;
     203              :             }
     204           37 :             h_ = h;
     205           37 :             ex_ = env->executor;
     206           37 :             e_->waiters_.push_back(this);
     207           37 :             in_list_ = true;
     208          111 :             ::new(stop_cb_buf_) stop_cb_t(
     209           37 :                 env->stop_token, cancel_fn{this});
     210           37 :             active_ = true;    // set only once the callback object exists in stop_cb_buf_
     211           37 :             return std::noop_coroutine();
     212              :         }
     213              :         // Destroys the stop callback if one is registered, unlinks if canceled while still queued, and returns error::canceled or success.
     214           54 :         io_result<> await_resume() noexcept
     215              :         {
     216           54 :             if(active_)
     217              :             {
     218           36 :                 stop_cb_().~stop_cb_t();
     219           36 :                 active_ = false;
     220              :             }
     221           54 :             if(canceled_)
     222              :             {
     223           30 :                 if(in_list_)    // false when set() already popped this waiter
     224              :                 {
     225           20 :                     e_->waiters_.remove(this);
     226           20 :                     in_list_ = false;
     227              :                 }
     228           30 :                 return {make_error_code(
     229           30 :                     error::canceled)};
     230              :             }
     231           24 :             return {{}};
     232              :         }
     233              :     };
     234              : 
     235           20 :     async_event() = default;
     236              : 
     237              :     // Non-copyable, non-movable
     238              :     async_event(async_event const&) = delete;
     239              :     async_event& operator=(async_event const&) = delete;
     240              : 
     241              :     /** Returns an awaiter that waits until the event is set.
     242              : 
     243              :         If the event is already set, completes immediately.
     244              : 
     245              :         @return An awaitable yielding `(error_code)`.
     246              :     */
     247           57 :     wait_awaiter wait() noexcept
     248              :     {
     249           57 :         return wait_awaiter{this};
     250              :     }
     251              : 
     252              :     /** Sets the event.
     253              : 
     254              :         All waiting coroutines are resumed. Canceled waiters
     255              :         are skipped. Subsequent calls to wait() complete
     256              :         immediately until clear() is called.
     257              :     */
     258           23 :     void set()
     259              :     {
     260           23 :         set_ = true;
     261              :         for(;;)
     262              :         {
     263           39 :             auto* w = waiters_.pop_front();
     264           39 :             if(!w)
     265           23 :                 break;
     266           16 :             w->in_list_ = false;    // popped here, so await_resume and the destructor must not remove() it again
     267           16 :             if(!w->claimed_.exchange(
     268              :                 true, std::memory_order_acq_rel))
     269              :             {   // this call won the claim; a waiter already claimed by cancel_fn was posted there
     270           16 :                 w->ex_.post(w->h_);
     271              :             }
     272           16 :         }
     273           23 :     }
     274              : 
     275              :     /** Clears the event.
     276              : 
     277              :         Subsequent calls to wait() will suspend until
     278              :         set() is called again.
     279              :     */
     280            2 :     void clear() noexcept
     281              :     {
     282            2 :         set_ = false;
     283            2 :     }
     284              : 
     285              :     /** Returns true if the event is currently set.
     286              :     */
     287            9 :     bool is_set() const noexcept
     288              :     {
     289            9 :         return set_;
     290              :     }
     291              : };
     292              : 
     293              : } // namespace capy
     294              : } // namespace boost
     295              : 
     296              : #endif
        

Generated by: LCOV version 2.3