LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 99.3 % 145 144
Test Date: 2026-02-12 22:58:59 Functions: 93.4 % 518 484

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Michael Vandeberg
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      11              : #define BOOST_CAPY_WHEN_ANY_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/concept/executor.hpp>
      15              : #include <boost/capy/concept/io_awaitable.hpp>
      16              : #include <coroutine>
      17              : #include <boost/capy/ex/executor_ref.hpp>
      18              : #include <boost/capy/ex/frame_allocator.hpp>
      19              : #include <boost/capy/ex/io_env.hpp>
      20              : #include <boost/capy/task.hpp>
      21              : 
      22              : #include <array>
      23              : #include <atomic>
      24              : #include <exception>
      25              : #include <optional>
      26              : #include <ranges>
      27              : #include <stdexcept>
      28              : #include <stop_token>
      29              : #include <tuple>
      30              : #include <type_traits>
      31              : #include <utility>
      32              : #include <variant>
      33              : #include <vector>
      34              : 
      35              : /*
      36              :    when_any - Race multiple tasks, return first completion
      37              :    ========================================================
      38              : 
      39              :    OVERVIEW:
      40              :    ---------
      41              :    when_any launches N tasks concurrently and completes when the FIRST task
      42              :    finishes (success or failure). It then requests stop for all siblings and
      43              :    waits for them to acknowledge before returning.
      44              : 
      45              :    ARCHITECTURE:
      46              :    -------------
      47              :    The design mirrors when_all but with inverted completion semantics:
      48              : 
      49              :      when_all:  complete when remaining_count reaches 0 (all done)
      50              :      when_any:  complete when has_winner becomes true (first done)
      51              :                 BUT still wait for remaining_count to reach 0 for cleanup
      52              : 
      53              :    Key components:
      54              :      - when_any_state:    Shared state tracking winner and completion
      55              :      - when_any_runner:   Wrapper coroutine for each child task
      56              :      - when_any_launcher: Awaitable that starts all runners concurrently
      57              : 
      58              :    CRITICAL INVARIANTS:
      59              :    --------------------
      60              :    1. Exactly one task becomes the winner (via atomic compare_exchange)
      61              :    2. All tasks must complete before parent resumes (cleanup safety)
      62              :    3. Stop is requested immediately when winner is determined
      63              :    4. Only the winner's result/exception is stored
      64              : 
      65              :    TYPE DEDUPLICATION:
      66              :    -------------------
      67              :    std::variant requires unique alternative types. Since when_any can race
      68              :    tasks with identical return types (e.g., three task<int>), we must
      69              :    deduplicate types before constructing the variant.
      70              : 
      71              :    Example: when_any(task<int>, task<string>, task<int>)
      72              :      - Raw types after void->monostate: int, string, int
      73              :      - Deduplicated variant: std::variant<int, string>
      74              :      - Return: pair<size_t, variant<int, string>>
      75              : 
      76              :    The winner_index tells you which task won (0, 1, or 2), while the variant
      77              :    holds the result. Use the index to determine how to interpret the variant.
      78              : 
      79              :    VOID HANDLING:
      80              :    --------------
      81              :    void tasks contribute std::monostate to the variant (then deduplicated).
      82              :    All-void tasks result in: pair<size_t, variant<monostate>>
      83              : 
      84              :    MEMORY MODEL:
      85              :    -------------
      86              :    Synchronization chain from winner's write to parent's read:
      87              : 
      88              :    1. Winner thread writes result_/winner_exception_ (non-atomic)
      89              :    2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
      90              :    3. Last task thread (may be winner or non-winner) calls signal_completion()
      91              :       → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      92              :    4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
      93              :    5. Parent coroutine resumes and reads result_/winner_exception_
      94              : 
      95              :    Synchronization analysis:
      96              :    - All fetch_sub operations on remaining_count_ form a release sequence
      97              :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      98              :      in the modification order of remaining_count_
      99              :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
     100              :      modification order, establishing happens-before from winner's writes
     101              :    - Executor dispatch() is expected to provide queue-based synchronization
     102              :      (release-on-post, acquire-on-execute) completing the chain to parent
     103              :    - Even inline executors work (same thread = sequenced-before)
     104              : 
     105              :    Alternative considered: Adding winner_ready_ atomic (set with release after
     106              :    storing winner data, acquired before reading) would make synchronization
     107              :    self-contained and not rely on executor implementation details. Current
     108              :    approach is correct but requires careful reasoning about release sequences
     109              :    and executor behavior.
     110              : 
     111              :    EXCEPTION SEMANTICS:
     112              :    --------------------
     113              :    Unlike when_all (which captures first exception, discards others), when_any
     114              :    treats exceptions as valid completions. If the winning task threw, that
     115              :    exception is rethrown. Exceptions from non-winners are silently discarded.
     116              : */
     117              : 
     118              : namespace boost {
     119              : namespace capy {
     120              : 
     121              : namespace detail {
     122              : 
     123              : /** Convert void to monostate for variant storage.
     124              : 
     125              :     std::variant<void, ...> is ill-formed, so void tasks contribute
     126              :     std::monostate to the result variant instead. Non-void types
     127              :     pass through unchanged.
     128              : 
     129              :     @tparam T The type to potentially convert (void becomes monostate).
     130              : */
     131              : template<typename T>
     132              : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
     133              : 
     134              : // Type deduplication: std::variant requires unique alternative types.
     135              : // Fold left over the type list, appending each type only if not already present.
     136              : template<typename Variant, typename T>
     137              : struct variant_append_if_unique;
     138              : 
     139              : template<typename... Vs, typename T>
     140              : struct variant_append_if_unique<std::variant<Vs...>, T>
     141              : {
     142              :     using type = std::conditional_t<
     143              :         (std::is_same_v<T, Vs> || ...),
     144              :         std::variant<Vs...>,
     145              :         std::variant<Vs..., T>>;
     146              : };
     147              : 
     148              : template<typename Accumulated, typename... Remaining>
     149              : struct deduplicate_impl;
     150              : 
     151              : template<typename Accumulated>
     152              : struct deduplicate_impl<Accumulated>
     153              : {
     154              :     using type = Accumulated;
     155              : };
     156              : 
     157              : template<typename Accumulated, typename T, typename... Rest>
     158              : struct deduplicate_impl<Accumulated, T, Rest...>
     159              : {
     160              :     using next = typename variant_append_if_unique<Accumulated, T>::type;
     161              :     using type = typename deduplicate_impl<next, Rest...>::type;
     162              : };
     163              : 
     164              : // Deduplicated variant; void types become monostate before deduplication
     165              : template<typename T0, typename... Ts>
     166              : using unique_variant_t = typename deduplicate_impl<
     167              :     std::variant<void_to_monostate_t<T0>>,
     168              :     void_to_monostate_t<Ts>...>::type;
     169              : 
     170              : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
     171              : // when multiple tasks share the same return type.
     172              : template<typename T0, typename... Ts>
     173              : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
     174              : 
     175              : /** Core shared state for when_any operations.
     176              : 
     177              :     Contains all members and methods common to both heterogeneous (variadic)
     178              :     and homogeneous (range) when_any implementations. State classes embed
     179              :     this via composition to avoid CRTP destructor ordering issues.
     180              : 
     181              :     @par Thread Safety
     182              :     Atomic operations protect winner selection and completion count.
     183              : */
     184              : struct when_any_core
     185              : {
     186              :     std::atomic<std::size_t> remaining_count_;
     187              :     std::size_t winner_index_{0};
     188              :     std::exception_ptr winner_exception_;
     189              :     std::stop_source stop_source_;
     190              : 
     191              :     // Bridges parent's stop token to our stop_source
     192              :     struct stop_callback_fn
     193              :     {
     194              :         std::stop_source* source_;
     195            9 :         void operator()() const noexcept { source_->request_stop(); }
     196              :     };
     197              :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     198              :     std::optional<stop_callback_t> parent_stop_callback_;
     199              : 
     200              :     std::coroutine_handle<> continuation_;
     201              :     io_env const* caller_env_ = nullptr;
     202              : 
     203              :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     204              :     std::atomic<bool> has_winner_{false};
     205              : 
     206           65 :     explicit when_any_core(std::size_t count) noexcept
     207           65 :         : remaining_count_(count)
     208              :     {
     209           65 :     }
     210              : 
     211              :     /** Atomically claim winner status; exactly one task succeeds. */
     212          190 :     bool try_win(std::size_t index) noexcept
     213              :     {
     214          190 :         bool expected = false;
     215          190 :         if(has_winner_.compare_exchange_strong(
     216              :             expected, true, std::memory_order_acq_rel))
     217              :         {
     218           65 :             winner_index_ = index;
     219           65 :             stop_source_.request_stop();
     220           65 :             return true;
     221              :         }
     222          125 :         return false;
     223              :     }
     224              : 
     225              :     /** @pre try_win() returned true. */
     226            8 :     void set_winner_exception(std::exception_ptr ep) noexcept
     227              :     {
     228            8 :         winner_exception_ = ep;
     229            8 :     }
     230              : 
     231              :     // Runners signal completion directly via final_suspend; no member function needed.
     232              : };
     233              : 
     234              : /** Shared state for heterogeneous when_any operation.
     235              : 
     236              :     Coordinates winner selection, result storage, and completion tracking
     237              :     for all child tasks in a when_any operation. Uses composition with
     238              :     when_any_core for shared functionality.
     239              : 
     240              :     @par Lifetime
     241              :     Allocated on the parent coroutine's frame, outlives all runners.
     242              : 
     243              :     @tparam T0 First task's result type.
     244              :     @tparam Ts Remaining tasks' result types.
     245              : */
     246              : template<typename T0, typename... Ts>
     247              : struct when_any_state
     248              : {
     249              :     static constexpr std::size_t task_count = 1 + sizeof...(Ts);
     250              :     using variant_type = unique_variant_t<T0, Ts...>;
     251              : 
     252              :     when_any_core core_;
     253              :     std::optional<variant_type> result_;
     254              :     std::array<std::coroutine_handle<>, task_count> runner_handles_{};
     255              : 
     256           43 :     when_any_state()
     257           43 :         : core_(task_count)
     258              :     {
     259           43 :     }
     260              : 
     261              :     // Runners self-destruct in final_suspend. No destruction needed here.
     262              : 
     263              :     /** @pre core_.try_win() returned true.
     264              :         @note Uses in_place_type (not index) because variant is deduplicated.
     265              :     */
     266              :     template<typename T>
     267           35 :     void set_winner_result(T value)
     268              :         noexcept(std::is_nothrow_move_constructible_v<T>)
     269              :     {
     270           35 :         result_.emplace(std::in_place_type<T>, std::move(value));
     271           35 :     }
     272              : 
     273              :     /** @pre core_.try_win() returned true. */
     274            3 :     void set_winner_void() noexcept
     275              :     {
     276            3 :         result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
     277            3 :     }
     278              : };
     279              : 
     280              : /** Wrapper coroutine that runs a single child task for when_any.
     281              : 
     282              :     Propagates executor/stop_token to the child, attempts to claim winner
     283              :     status on completion, and signals completion for cleanup coordination.
     284              : 
     285              :     @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
     286              : */
     287              : template<typename StateType>
     288              : struct when_any_runner
     289              : {
     290              :     struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
     291              :     {
     292              :         StateType* state_ = nullptr;
     293              :         std::size_t index_ = 0;
     294              :         io_env env_;
     295              : 
     296          190 :         when_any_runner get_return_object() noexcept
     297              :         {
     298          190 :             return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
     299              :         }
     300              : 
     301              :         // Starts suspended; launcher sets up state/ex/token then resumes
     302          190 :         std::suspend_always initial_suspend() noexcept
     303              :         {
     304          190 :             return {};
     305              :         }
     306              : 
     307          190 :         auto final_suspend() noexcept
     308              :         {
     309              :             struct awaiter
     310              :             {
     311              :                 promise_type* p_;
     312          190 :                 bool await_ready() const noexcept { return false; }
     313          190 :                 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
     314              :                 {
     315              :                     // Extract everything needed before self-destruction.
     316          190 :                     auto& core = p_->state_->core_;
     317          190 :                     auto* counter = &core.remaining_count_;
     318          190 :                     auto* caller_env = core.caller_env_;
     319          190 :                     auto cont = core.continuation_;
     320              : 
     321          190 :                     h.destroy();
     322              : 
     323              :                     // If last runner, dispatch parent for symmetric transfer.
     324          190 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     325          190 :                     if(remaining == 1)
     326           65 :                         return caller_env->executor.dispatch(cont);
     327          125 :                     return std::noop_coroutine();
     328              :                 }
     329            0 :                 void await_resume() const noexcept {}
     330              :             };
     331          190 :             return awaiter{this};
     332              :         }
     333              : 
     334          178 :         void return_void() noexcept {}
     335              : 
     336              :         // Exceptions are valid completions in when_any (unlike when_all)
     337           12 :         void unhandled_exception()
     338              :         {
     339           12 :             if(state_->core_.try_win(index_))
     340            8 :                 state_->core_.set_winner_exception(std::current_exception());
     341           12 :         }
     342              : 
     343              :         /** Injects executor and stop token into child awaitables. */
     344              :         template<class Awaitable>
     345              :         struct transform_awaiter
     346              :         {
     347              :             std::decay_t<Awaitable> a_;
     348              :             promise_type* p_;
     349              : 
     350          190 :             bool await_ready() { return a_.await_ready(); }
     351          190 :             auto await_resume() { return a_.await_resume(); }
     352              : 
     353              :             template<class Promise>
     354          185 :             auto await_suspend(std::coroutine_handle<Promise> h)
     355              :             {
     356          185 :                 return a_.await_suspend(h, &p_->env_);
     357              :             }
     358              :         };
     359              : 
     360              :         template<class Awaitable>
     361          190 :         auto await_transform(Awaitable&& a)
     362              :         {
     363              :             using A = std::decay_t<Awaitable>;
     364              :             if constexpr (IoAwaitable<A>)
     365              :             {
     366              :                 return transform_awaiter<Awaitable>{
     367          380 :                     std::forward<Awaitable>(a), this};
     368              :             }
     369              :             else
     370              :             {
     371              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     372              :             }
     373          190 :         }
     374              :     };
     375              : 
     376              :     std::coroutine_handle<promise_type> h_;
     377              : 
     378          190 :     explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
     379          190 :         : h_(h)
     380              :     {
     381          190 :     }
     382              : 
     383              :     // Enable move for all clang versions - some versions need it
     384              :     when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
     385              : 
     386              :     // Non-copyable
     387              :     when_any_runner(when_any_runner const&) = delete;
     388              :     when_any_runner& operator=(when_any_runner const&) = delete;
     389              :     when_any_runner& operator=(when_any_runner&&) = delete;
     390              : 
     391          190 :     auto release() noexcept
     392              :     {
     393          190 :         return std::exchange(h_, nullptr);
     394              :     }
     395              : };
     396              : 
     397              : /** Wraps a child awaitable, attempts to claim winner on completion.
     398              : 
     399              :     Uses requires-expressions to detect state capabilities:
     400              :     - set_winner_void(): for heterogeneous void tasks (stores monostate)
     401              :     - set_winner_result(): for non-void tasks
     402              :     - Neither: for homogeneous void tasks (no result storage)
     403              : */
     404              : template<IoAwaitable Awaitable, typename StateType>
     405              : when_any_runner<StateType>
     406          190 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
     407              : {
     408              :     using T = awaitable_result_t<Awaitable>;
     409              :     if constexpr (std::is_void_v<T>)
     410              :     {
     411              :         co_await std::move(inner);
     412              :         if(state->core_.try_win(index))
     413              :         {
     414              :             // Heterogeneous void tasks store monostate in the variant
     415              :             if constexpr (requires { state->set_winner_void(); })
     416              :                 state->set_winner_void();
     417              :             // Homogeneous void tasks have no result to store
     418              :         }
     419              :     }
     420              :     else
     421              :     {
     422              :         auto result = co_await std::move(inner);
     423              :         if(state->core_.try_win(index))
     424              :         {
     425              :             // Defensive: move should not throw (already moved once), but we
     426              :             // catch just in case since an uncaught exception would be devastating.
     427              :             try
     428              :             {
     429              :                 state->set_winner_result(std::move(result));
     430              :             }
     431              :             catch(...)
     432              :             {
     433              :                 state->core_.set_winner_exception(std::current_exception());
     434              :             }
     435              :         }
     436              :     }
     437          380 : }
     438              : 
     439              : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     440              : template<IoAwaitable... Awaitables>
     441              : class when_any_launcher
     442              : {
     443              :     using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
     444              : 
     445              :     std::tuple<Awaitables...>* tasks_;
     446              :     state_type* state_;
     447              : 
     448              : public:
     449           43 :     when_any_launcher(
     450              :         std::tuple<Awaitables...>* tasks,
     451              :         state_type* state)
     452           43 :         : tasks_(tasks)
     453           43 :         , state_(state)
     454              :     {
     455           43 :     }
     456              : 
     457           43 :     bool await_ready() const noexcept
     458              :     {
     459           43 :         return sizeof...(Awaitables) == 0;
     460              :     }
     461              : 
     462              :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     463              :         destroys this object before await_suspend returns. Must not reference
     464              :         `this` after the final launch_one call.
     465              :     */
     466           43 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     467              :     {
     468           43 :         state_->core_.continuation_ = continuation;
     469           43 :         state_->core_.caller_env_ = caller_env;
     470              : 
     471           43 :         if(caller_env->stop_token.stop_possible())
     472              :         {
     473           18 :             state_->core_.parent_stop_callback_.emplace(
     474            9 :                 caller_env->stop_token,
     475            9 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     476              : 
     477            9 :             if(caller_env->stop_token.stop_requested())
     478            3 :                 state_->core_.stop_source_.request_stop();
     479              :         }
     480              : 
     481           43 :         auto token = state_->core_.stop_source_.get_token();
     482           86 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     483           43 :             (..., launch_one<Is>(caller_env->executor, token));
     484           43 :         }(std::index_sequence_for<Awaitables...>{});
     485              : 
     486           86 :         return std::noop_coroutine();
     487           43 :     }
     488              : 
     489           43 :     void await_resume() const noexcept
     490              :     {
     491           43 :     }
     492              : 
     493              : private:
     494              :     /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
     495              :     template<std::size_t I>
     496          105 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     497              :     {
     498          105 :         auto runner = make_when_any_runner(
     499          105 :             std::move(std::get<I>(*tasks_)), state_, I);
     500              : 
     501          105 :         auto h = runner.release();
     502          105 :         h.promise().state_ = state_;
     503          105 :         h.promise().index_ = I;
     504          105 :         h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->allocator};
     505              : 
     506          105 :         std::coroutine_handle<> ch{h};
     507          105 :         state_->runner_handles_[I] = ch;
     508          105 :         caller_ex.post(ch);
     509          210 :     }
     510              : };
     511              : 
     512              : } // namespace detail
     513              : 
     514              : /** Wait for the first awaitable to complete.
     515              : 
     516              :     Races multiple heterogeneous awaitables concurrently and returns when the
     517              :     first one completes. The result includes the winner's index and a
     518              :     deduplicated variant containing the result value.
     519              : 
     520              :     @par Suspends
     521              :     The calling coroutine suspends when co_await is invoked. All awaitables
     522              :     are launched concurrently and execute in parallel. The coroutine resumes
     523              :     only after all awaitables have completed, even though the winner is
     524              :     determined by the first to finish.
     525              : 
     526              :     @par Completion Conditions
     527              :     @li Winner is determined when the first awaitable completes (success or exception)
     528              :     @li Only one task can claim winner status via atomic compare-exchange
     529              :     @li Once a winner exists, stop is requested for all remaining siblings
     530              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     531              :     @li The winner's result is returned; if the winner threw, the exception is rethrown
     532              : 
     533              :     @par Cancellation Semantics
     534              :     Cancellation is supported via stop_token propagated through the
     535              :     IoAwaitable protocol:
     536              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     537              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     538              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     539              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     540              :     @li Stop requests are cooperative; tasks must check and respond to them
     541              : 
     542              :     @par Concurrency/Overlap
     543              :     All awaitables are launched concurrently before any can complete.
     544              :     The launcher iterates through the arguments, starting each task on the
     545              :     caller's executor. Tasks may execute in parallel on multi-threaded
     546              :     executors or interleave on single-threaded executors. There is no
     547              :     guaranteed ordering of task completion.
     548              : 
     549              :     @par Notable Error Conditions
     550              :     @li Winner exception: if the winning task threw, that exception is rethrown
     551              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     552              :     @li Cancellation: tasks may complete via cancellation without throwing
     553              : 
     554              :     @par Example
     555              :     @code
     556              :     task<void> example() {
     557              :         auto [index, result] = co_await when_any(
     558              :             fetch_from_primary(),   // task<Response>
     559              :             fetch_from_backup()     // task<Response>
     560              :         );
     561              :         // index is 0 or 1, result holds the winner's Response
     562              :         auto response = std::get<Response>(result);
     563              :     }
     564              :     @endcode
     565              : 
     566              :     @par Example with Heterogeneous Types
     567              :     @code
     568              :     task<void> mixed_types() {
     569              :         auto [index, result] = co_await when_any(
     570              :             fetch_int(),      // task<int>
     571              :             fetch_string()    // task<std::string>
     572              :         );
     573              :         if (index == 0)
     574              :             std::cout << "Got int: " << std::get<int>(result) << "\n";
     575              :         else
     576              :             std::cout << "Got string: " << std::get<std::string>(result) << "\n";
     577              :     }
     578              :     @endcode
     579              : 
     580              :     @tparam A0 First awaitable type (must satisfy IoAwaitable).
     581              :     @tparam As Remaining awaitable types (must satisfy IoAwaitable).
     582              :     @param a0 The first awaitable to race.
     583              :     @param as Additional awaitables to race concurrently.
     584              :     @return A task yielding a pair of (winner_index, result_variant).
     585              : 
     586              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     587              : 
     588              :     @par Remarks
     589              :     Awaitables are moved into the coroutine frame; original objects become
     590              :     empty after the call. When multiple awaitables share the same return type,
     591              :     the variant is deduplicated to contain only unique types. Use the winner
     592              :     index to determine which awaitable completed first. Void awaitables
     593              :     contribute std::monostate to the variant.
     594              : 
     595              :     @see when_all, IoAwaitable
     596              : */
     597              : template<IoAwaitable A0, IoAwaitable... As>
     598           43 : [[nodiscard]] auto when_any(A0 a0, As... as) // awaitables are taken by value
     599              :     -> task<detail::when_any_result_t<
     600              :         detail::awaitable_result_t<A0>,
     601              :         detail::awaitable_result_t<As>...>>
     602              : {
     603              :     using result_type = detail::when_any_result_t<
     604              :         detail::awaitable_result_t<A0>,
     605              :         detail::awaitable_result_t<As>...>;
     606              :     // The race state and the awaitables are locals of this coroutine; the launcher below only receives pointers to them.
     607              :     detail::when_any_state<
     608              :         detail::awaitable_result_t<A0>,
     609              :         detail::awaitable_result_t<As>...> state;
     610              :     std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
     611              :     // Suspend on the launcher (when_any_launcher is defined outside this section of the file).
     612              :     co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
     613              :     // A stored winner exception takes priority over returning a value.
     614              :     if(state.core_.winner_exception_)
     615              :         std::rethrow_exception(state.core_.winner_exception_);
     616              :     // state.result_ is dereferenced unchecked; presumably it is always set when no winner exception is stored - confirm against the runner.
     617              :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     618           86 : }
     619              : 
     620              : /** Concept for ranges of full I/O awaitables.
     621              : 
     622              :     A range satisfies `IoAwaitableRange` if it is a sized input range
     623              :     whose value type satisfies @ref IoAwaitable. This enables when_any
     624              :     to accept any container or view of awaitables, not just std::vector.
     625              : 
     626              :     @tparam R The range type.
     627              : 
     628              :     @par Requirements
     629              :     @li `R` must satisfy `std::ranges::input_range`
     630              :     @li `R` must satisfy `std::ranges::sized_range`
     631              :     @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
     632              : 
     633              :     @par Syntactic Requirements
     634              :     Given `r` of type `R`:
     635              :     @li `std::ranges::begin(r)` is valid
     636              :     @li `std::ranges::end(r)` is valid
     637              :     @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
     638              :     @li `std::ranges::range_value_t<R>` (the value type behind `*std::ranges::begin(r)`) satisfies @ref IoAwaitable
     639              : 
     640              :     @par Example
     641              :     @code
     642              :     template<IoAwaitableRange R>
     643              :     task<void> race_all(R&& awaitables) {
     644              :         auto winner = co_await when_any(std::forward<R>(awaitables));
     645              :         // Process winner...
     646              :     }
     647              :     @endcode
     648              : 
     649              :     @see when_any, IoAwaitable
     650              : */
     651              : template<typename R>
     652              : concept IoAwaitableRange =
     653              :     std::ranges::input_range<R> && // single-pass iteration is sufficient
     654              :     std::ranges::sized_range<R> && // std::ranges::size(r) must be available; the range overloads call it
     655              :     IoAwaitable<std::ranges::range_value_t<R>>; // checked on the value type, not on the reference type of *begin(r)
     656              : 
     657              : namespace detail {
     658              : 
     659              : /** Shared state for homogeneous when_any (range overload).
     660              : 
     661              :     Uses composition with when_any_core for shared functionality.
     662              :     Simpler than heterogeneous: optional<T> instead of variant, vector
     663              :     instead of array for runner handles.
     664              : */
     665              : template<typename T>
     666              : struct when_any_homogeneous_state
     667              : {
     668              :     when_any_core core_; // winner index/exception, continuation, caller env and stop source live here
     669              :     std::optional<T> result_; // empty until set_winner_result() stores a value
     670              :     std::vector<std::coroutine_handle<>> runner_handles_; // one slot per awaitable, assigned by the launcher
     671              :     // count is the number of awaitables in the range (the range overload passes std::ranges::size).
     672           19 :     explicit when_any_homogeneous_state(std::size_t count)
     673           19 :         : core_(count)
     674           38 :         , runner_handles_(count) // count value-initialized (null) handles
     675              :     {
     676           19 :     }
     677              : 
     678              :     // Runners self-destruct in final_suspend. No destruction needed here.
     679              :     // Stores the winner's value; noexcept exactly when moving a T cannot throw.
     680              :     /** @pre core_.try_win() returned true. */
     681           17 :     void set_winner_result(T value)
     682              :         noexcept(std::is_nothrow_move_constructible_v<T>)
     683              :     {
     684           17 :         result_.emplace(std::move(value));
     685           17 :     }
     686              : };
     687              : 
     688              : /** Specialization for void tasks (no result storage needed). */
     689              : template<>
     690              : struct when_any_homogeneous_state<void>
     691              : {
     692              :     when_any_core core_; // winner index/exception, continuation, caller env and stop source live here
     693              :     std::vector<std::coroutine_handle<>> runner_handles_; // one slot per awaitable, assigned by the launcher
     694              :     // count is the number of awaitables in the range (the void range overload passes std::ranges::size).
     695            3 :     explicit when_any_homogeneous_state(std::size_t count)
     696            3 :         : core_(count)
     697            6 :         , runner_handles_(count) // count value-initialized (null) handles
     698              :     {
     699            3 :     }
     700              : 
     701              :     // Runners self-destruct in final_suspend. No destruction needed here.
     702              : 
     703              :     // No set_winner_result - void tasks have no result to store
     704              : };
     705              : 
     706              : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     707              : template<IoAwaitableRange Range>
     708              : class when_any_homogeneous_launcher
     709              : {
     710              :     using Awaitable = std::ranges::range_value_t<Range>;
     711              :     using T = awaitable_result_t<Awaitable>;
     712              :     // Non-owning pointers: the when_any range overloads pass the addresses of their own locals.
     713              :     Range* range_;
     714              :     when_any_homogeneous_state<T>* state_;
     715              : 
     716              : public:
     717           22 :     when_any_homogeneous_launcher(
     718              :         Range* range,
     719              :         when_any_homogeneous_state<T>* state)
     720           22 :         : range_(range)
     721           22 :         , state_(state)
     722              :     {
     723           22 :     }
     724              :     // Ready (no suspension) only for an empty range; the when_any range overloads throw on empty input before creating a launcher.
     725           22 :     bool await_ready() const noexcept
     726              :     {
     727           22 :         return std::ranges::empty(*range_);
     728              :     }
     729              : 
     730              :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     731              :         destroys this object before await_suspend returns. Must not reference
     732              :         `this` after dispatching begins.
     733              : 
     734              :         Two-phase approach:
     735              :         1. Create all runners (safe - no dispatch yet)
     736              :         2. Dispatch all runners (any may complete synchronously)
     737              :     */
     738           22 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     739              :     {
     740           22 :         state_->core_.continuation_ = continuation;
     741           22 :         state_->core_.caller_env_ = caller_env;
     742              :         // Only hook the parent's stop token if it can ever be stopped; presumably stop_callback_fn requests stop on stop_source_ - confirm.
     743           22 :         if(caller_env->stop_token.stop_possible())
     744              :         {
     745           14 :             state_->core_.parent_stop_callback_.emplace(
     746            7 :                 caller_env->stop_token,
     747            7 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     748              :             // A stop already requested on the parent's token is applied to the shared stop source right away.
     749            7 :             if(caller_env->stop_token.stop_requested())
     750            4 :                 state_->core_.stop_source_.request_stop();
     751              :         }
     752              :         // Every runner's environment gets this token from the shared stop_source_.
     753           22 :         auto token = state_->core_.stop_source_.get_token();
     754              : 
     755              :         // Phase 1: Create all runners without dispatching.
     756              :         // This iterates over *range_ safely because no runners execute yet.
     757           22 :         std::size_t index = 0;
     758          107 :         for(auto&& a : *range_)
     759              :         {
     760           85 :             auto runner = make_when_any_runner(
     761           85 :                 std::move(a), state_, index);
     762              :             // Obtain the runner's handle via release() and fill in its promise before it is posted.
     763           85 :             auto h = runner.release();
     764           85 :             h.promise().state_ = state_;
     765           85 :             h.promise().index_ = index;
     766           85 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->allocator};
     767              :             // Store the handle, type-erased, in the slot matching this element's position.
     768           85 :             state_->runner_handles_[index] = std::coroutine_handle<>{h};
     769           85 :             ++index;
     770              :         }
     771              : 
     772              :         // Phase 2: Post all runners. Any may complete synchronously.
     773              :         // After last post, state_ and this may be destroyed.
     774              :         // Use raw pointer/count captured before posting.
     775           22 :         std::coroutine_handle<>* handles = state_->runner_handles_.data();
     776           22 :         std::size_t count = state_->runner_handles_.size();
     777          107 :         for(std::size_t i = 0; i < count; ++i)
     778           85 :             caller_env->executor.post(handles[i]); // handles[i] is read before the post that may end the race
     779              :         // Nothing to transfer control to: the runners were handed to the executor above.
     780           44 :         return std::noop_coroutine();
     781          107 :     }
     782              :     // Produces no value; the awaiting coroutine reads the outcome from the shared state.
     783           22 :     void await_resume() const noexcept
     784              :     {
     785           22 :     }
     786              : };
     787              : 
     788              : } // namespace detail
     789              : 
     790              : /** Wait for the first awaitable to complete (range overload).
     791              : 
     792              :     Races a range of awaitables with the same result type. Accepts any
     793              :     sized input range of IoAwaitable types, enabling use with arrays,
     794              :     spans, or custom containers.
     795              : 
     796              :     @par Suspends
     797              :     The calling coroutine suspends when co_await is invoked. All awaitables
     798              :     in the range are launched concurrently and execute in parallel. The
     799              :     coroutine resumes only after all awaitables have completed, even though
     800              :     the winner is determined by the first to finish.
     801              : 
     802              :     @par Completion Conditions
     803              :     @li Winner is determined when the first awaitable completes (success or exception)
     804              :     @li Only one task can claim winner status via atomic compare-exchange
     805              :     @li Once a winner exists, stop is requested for all remaining siblings
     806              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     807              :     @li The winner's index and result are returned; if the winner threw, the exception is rethrown
     808              : 
     809              :     @par Cancellation Semantics
     810              :     Cancellation is supported via stop_token propagated through the
     811              :     IoAwaitable protocol:
     812              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     813              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     814              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     815              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     816              :     @li Stop requests are cooperative; tasks must check and respond to them
     817              : 
     818              :     @par Concurrency/Overlap
     819              :     All awaitables are launched concurrently before any can complete.
     820              :     The launcher iterates through the range, starting each task on the
     821              :     caller's executor. Tasks may execute in parallel on multi-threaded
     822              :     executors or interleave on single-threaded executors. There is no
     823              :     guaranteed ordering of task completion.
     824              : 
     825              :     @par Notable Error Conditions
     826              :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     827              :     @li Winner exception: if the winning task threw, that exception is rethrown
     828              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     829              :     @li Cancellation: tasks may complete via cancellation without throwing
     830              : 
     831              :     @par Example
     832              :     @code
     833              :     task<void> example() {
     834              :         std::array<task<Response>, 3> requests = {
     835              :             fetch_from_server(0),
     836              :             fetch_from_server(1),
     837              :             fetch_from_server(2)
     838              :         };
     839              : 
     840              :         auto [index, response] = co_await when_any(std::move(requests));
     841              :     }
     842              :     @endcode
     843              : 
     844              :     @par Example with Vector
     845              :     @code
     846              :     task<Response> fetch_fastest(std::vector<Server> const& servers) {
     847              :         std::vector<task<Response>> requests;
     848              :         for (auto const& server : servers)
     849              :             requests.push_back(fetch_from(server));
     850              : 
     851              :         auto [index, response] = co_await when_any(std::move(requests));
     852              :         co_return response;
     853              :     }
     854              :     @endcode
     855              : 
     856              :     @tparam R Range type satisfying IoAwaitableRange.
     857              :     @param awaitables Range of awaitables to race concurrently (must not be empty).
     858              :     @return A task yielding a pair of (winner_index, result).
     859              : 
     860              :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     861              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     862              : 
     863              :     @par Remarks
     864              :     The range is first placed on the coroutine frame for lifetime safety:
     865              :     rvalue ranges are moved, lvalue ranges are copied (so an owning container
     866              :     passed as an lvalue must be copyable and keeps its elements). Elements are
     867              :     then moved out of the frame-owned range. Unlike the variadic overload, no
     868              :     variant wrapper is needed since all tasks share the same return type.
     869              : 
     870              :     @see when_any, IoAwaitableRange
     871              : */
     872              : template<IoAwaitableRange R>
     873              :     requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
     874           21 : [[nodiscard]] auto when_any(R&& awaitables)
     875              :     -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
     876              : {
     877              :     using Awaitable = std::ranges::range_value_t<R>;
     878              :     using T = detail::awaitable_result_t<Awaitable>;
     879              :     using result_type = std::pair<std::size_t, T>;
     880              :     using OwnedRange = std::remove_cvref_t<R>; // R without reference/cv-qualifiers: the type stored in this coroutine
     881              :     // Reject an empty range up front; this check runs inside the coroutine body, before any state or launcher exists.
     882              :     auto count = std::ranges::size(awaitables);
     883              :     if(count == 0)
     884              :         throw std::invalid_argument("when_any requires at least one awaitable");
     885              : 
     886              :     // Move (rvalue argument) or copy (lvalue argument) the range onto the coroutine frame to ensure lifetime
     887              :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     888              :     // One runner-handle slot per element of the range.
     889              :     detail::when_any_homogeneous_state<T> state(count);
     890              :     // The launcher only borrows both locals; this coroutine suspends here until it is resumed.
     891              :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     892              :     // A stored winner exception takes priority over returning a value.
     893              :     if(state.core_.winner_exception_)
     894              :         std::rethrow_exception(state.core_.winner_exception_);
     895              :     // result_ (a std::optional<T>) is dereferenced unchecked; presumably always set when no winner exception is stored - confirm against the runner.
     896              :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     897           42 : }
     898              : 
     899              : /** Wait for the first awaitable to complete (void range overload).
     900              : 
     901              :     Races a range of void-returning awaitables. Since void awaitables have
     902              :     no result value, only the winner's index is returned.
     903              : 
     904              :     @par Suspends
     905              :     The calling coroutine suspends when co_await is invoked. All awaitables
     906              :     in the range are launched concurrently and execute in parallel. The
     907              :     coroutine resumes only after all awaitables have completed, even though
     908              :     the winner is determined by the first to finish.
     909              : 
     910              :     @par Completion Conditions
     911              :     @li Winner is determined when the first awaitable completes (success or exception)
     912              :     @li Only one task can claim winner status via atomic compare-exchange
     913              :     @li Once a winner exists, stop is requested for all remaining siblings
     914              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     915              :     @li The winner's index is returned; if the winner threw, the exception is rethrown
     916              : 
     917              :     @par Cancellation Semantics
     918              :     Cancellation is supported via stop_token propagated through the
     919              :     IoAwaitable protocol:
     920              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     921              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     922              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     923              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     924              :     @li Stop requests are cooperative; tasks must check and respond to them
     925              : 
     926              :     @par Concurrency/Overlap
     927              :     All awaitables are launched concurrently before any can complete.
     928              :     The launcher iterates through the range, starting each task on the
     929              :     caller's executor. Tasks may execute in parallel on multi-threaded
     930              :     executors or interleave on single-threaded executors. There is no
     931              :     guaranteed ordering of task completion.
     932              : 
     933              :     @par Notable Error Conditions
     934              :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     935              :     @li Winner exception: if the winning task threw, that exception is rethrown
     936              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     937              :     @li Cancellation: tasks may complete via cancellation without throwing
     938              : 
     939              :     @par Example
     940              :     @code
     941              :     task<void> example() {
     942              :         std::vector<task<void>> tasks;
     943              :         for (int i = 0; i < 5; ++i)
     944              :             tasks.push_back(background_work(i));
     945              : 
     946              :         std::size_t winner = co_await when_any(std::move(tasks));
     947              :         // winner is the index of the first task to complete
     948              :     }
     949              :     @endcode
     950              : 
     951              :     @par Example with Timeout
     952              :     @code
     953              :     task<void> with_timeout() {
     954              :         std::vector<task<void>> tasks;
     955              :         tasks.push_back(long_running_operation());
     956              :         tasks.push_back(delay(std::chrono::seconds(5)));
     957              : 
     958              :         std::size_t winner = co_await when_any(std::move(tasks));
     959              :         if (winner == 1) {
     960              :             // Timeout occurred
     961              :         }
     962              :     }
     963              :     @endcode
     964              : 
     965              :     @tparam R Range type satisfying IoAwaitableRange with void result.
     966              :     @param awaitables Range of void awaitables to race concurrently (must not be empty).
     967              :     @return A task yielding the winner's index (zero-based).
     968              : 
     969              :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     970              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     971              : 
     972              :     @par Remarks
     973              :     The range is first placed on the coroutine frame for lifetime safety:
     974              :     rvalue ranges are moved, lvalue ranges are copied (so an owning container
     975              :     passed as an lvalue must be copyable and keeps its elements). Elements are
     976              :     then moved out of the frame-owned range. Unlike the non-void overload, no
     977              :     result storage is needed since void tasks produce no value.
     978              : 
     979              :     @see when_any, IoAwaitableRange
     980              : */
     981              : template<IoAwaitableRange R>
     982              :     requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
     983            3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
     984              : {
     985              :     using OwnedRange = std::remove_cvref_t<R>; // R without reference/cv-qualifiers: the type stored in this coroutine
     986              :     // Reject an empty range up front; this check runs inside the coroutine body, before any state or launcher exists.
     987              :     auto count = std::ranges::size(awaitables);
     988              :     if(count == 0)
     989              :         throw std::invalid_argument("when_any requires at least one awaitable");
     990              : 
     991              :     // Move (rvalue argument) or copy (lvalue argument) the range onto the coroutine frame to ensure lifetime
     992              :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     993              :     // The void specialization of the state holds no result, only the core and the runner handles.
     994              :     detail::when_any_homogeneous_state<void> state(count);
     995              :     // The launcher only borrows both locals; this coroutine suspends here until it is resumed.
     996              :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     997              :     // A stored winner exception takes priority over returning the index.
     998              :     if(state.core_.winner_exception_)
     999              :         std::rethrow_exception(state.core_.winner_exception_);
    1000              :     // Void awaitables carry no value, so the winner's index is the entire result.
    1001              :     co_return state.core_.winner_index_;
    1002            6 : }
    1003              : 
    1004              : } // namespace capy
    1005              : } // namespace boost
    1006              : 
    1007              : #endif
        

Generated by: LCOV version 2.3