LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex/detail - strand_service.cpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 96.7 % 91 88
Test Date: 2026-02-12 22:58:59 Functions: 91.3 % 23 21

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #include "src/ex/detail/strand_queue.hpp"
      11              : #include <boost/capy/ex/detail/strand_service.hpp>
      12              : #include <atomic>
      13              : #include <coroutine>
      14              : #include <mutex>
      15              : #include <thread>
      16              : #include <utility>
      17              : 
      18              : namespace boost {
      19              : namespace capy {
      20              : namespace detail {
      21              : 
      22              : //----------------------------------------------------------
      23              : 
      24              : /** Implementation state for a strand.
      25              : 
      26              :     Each strand_impl provides serialization for coroutines
      27              :     dispatched through strands that share it.
      28              : */
      29              : struct strand_impl
      30              : {
      31              :     std::mutex mutex_;
      32              :     strand_queue pending_;
      33              :     bool locked_ = false;
      34              :     std::atomic<std::thread::id> dispatch_thread_{};
      35              :     void* cached_frame_ = nullptr;
      36              : };
      37              : 
      38              : //----------------------------------------------------------
      39              : 
      40              : /** Invoker coroutine for strand dispatch.
      41              : 
      42              :     Uses custom allocator to recycle frame - one allocation
      43              :     per strand_impl lifetime, stored in trailer for recovery.
      44              : */
      45              : struct strand_invoker
      46              : {
      47              :     struct promise_type
      48              :     {
      49           12 :         void* operator new(std::size_t n, strand_impl& impl)
      50              :         {
      51           12 :             constexpr auto A = alignof(strand_impl*);
      52           12 :             std::size_t padded = (n + A - 1) & ~(A - 1);
      53           12 :             std::size_t total = padded + sizeof(strand_impl*);
      54              : 
      55           12 :             void* p = impl.cached_frame_
      56           12 :                 ? std::exchange(impl.cached_frame_, nullptr)
      57            7 :                 : ::operator new(total);
      58              : 
      59              :             // Trailer lets delete recover impl
      60           12 :             *reinterpret_cast<strand_impl**>(
      61           12 :                 static_cast<char*>(p) + padded) = &impl;
      62           12 :             return p;
      63              :         }
      64              : 
      65           12 :         void operator delete(void* p, std::size_t n) noexcept
      66              :         {
      67           12 :             constexpr auto A = alignof(strand_impl*);
      68           12 :             std::size_t padded = (n + A - 1) & ~(A - 1);
      69              : 
      70           12 :             auto* impl = *reinterpret_cast<strand_impl**>(
      71              :                 static_cast<char*>(p) + padded);
      72              : 
      73           12 :             if (!impl->cached_frame_)
      74           12 :                 impl->cached_frame_ = p;
      75              :             else
      76            0 :                 ::operator delete(p);
      77           12 :         }
      78              : 
      79           12 :         strand_invoker get_return_object() noexcept
      80           12 :         { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
      81              : 
      82           12 :         std::suspend_always initial_suspend() noexcept { return {}; }
      83           12 :         std::suspend_never final_suspend() noexcept { return {}; }
      84           12 :         void return_void() noexcept {}
      85            0 :         void unhandled_exception() { std::terminate(); }
      86              :     };
      87              : 
      88              :     std::coroutine_handle<promise_type> h_;
      89              : };
      90              : 
      91              : //----------------------------------------------------------
      92              : 
      93              : /** Concrete implementation of strand_service.
      94              : 
      95              :     Holds the fixed pool of strand_impl objects.
      96              : */
      97              : class strand_service_impl : public strand_service
      98              : {
      99              :     static constexpr std::size_t num_impls = 211;
     100              : 
     101              :     strand_impl impls_[num_impls];
     102              :     std::size_t salt_ = 0;
     103              :     std::mutex mutex_;
     104              : 
     105              : public:
     106              :     explicit
     107           19 :     strand_service_impl(execution_context&)
     108         4028 :     {
     109           19 :     }
     110              : 
     111              :     strand_impl*
     112           23 :     get_implementation() override
     113              :     {
     114           23 :         std::lock_guard<std::mutex> lock(mutex_);
     115           23 :         std::size_t index = salt_++;
     116           23 :         index = index % num_impls;
     117           23 :         return &impls_[index];
     118           23 :     }
     119              : 
     120              : protected:
     121              :     void
     122           19 :     shutdown() override
     123              :     {
     124         4028 :         for(std::size_t i = 0; i < num_impls; ++i)
     125              :         {
     126         4009 :             std::lock_guard<std::mutex> lock(impls_[i].mutex_);
     127         4009 :             impls_[i].locked_ = true;
     128              : 
     129         4009 :             if(impls_[i].cached_frame_)
     130              :             {
     131            7 :                 ::operator delete(impls_[i].cached_frame_);
     132            7 :                 impls_[i].cached_frame_ = nullptr;
     133              :             }
     134         4009 :         }
     135           19 :     }
     136              : 
     137              : private:
     138              :     static bool
     139          322 :     enqueue(strand_impl& impl, std::coroutine_handle<> h)
     140              :     {
     141          322 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     142          322 :         impl.pending_.push(h);
     143          322 :         if(!impl.locked_)
     144              :         {
     145           12 :             impl.locked_ = true;
     146           12 :             return true;
     147              :         }
     148          310 :         return false;
     149          322 :     }
     150              : 
     151              :     static void
     152           17 :     dispatch_pending(strand_impl& impl)
     153              :     {
     154           17 :         strand_queue::taken_batch batch;
     155              :         {
     156           17 :             std::lock_guard<std::mutex> lock(impl.mutex_);
     157           17 :             batch = impl.pending_.take_all();
     158           17 :         }
     159           17 :         impl.pending_.dispatch_batch(batch);
     160           17 :     }
     161              : 
     162              :     static bool
     163           17 :     try_unlock(strand_impl& impl)
     164              :     {
     165           17 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     166           17 :         if(impl.pending_.empty())
     167              :         {
     168           12 :             impl.locked_ = false;
     169           12 :             return true;
     170              :         }
     171            5 :         return false;
     172           17 :     }
     173              : 
     174              :     static void
     175           17 :     set_dispatch_thread(strand_impl& impl) noexcept
     176              :     {
     177           17 :         impl.dispatch_thread_.store(std::this_thread::get_id());
     178           17 :     }
     179              : 
     180              :     static void
     181           12 :     clear_dispatch_thread(strand_impl& impl) noexcept
     182              :     {
     183           12 :         impl.dispatch_thread_.store(std::thread::id{});
     184           12 :     }
     185              : 
     186              :     // Loops until queue empty (aggressive). Alternative: per-batch fairness
     187              :     // (repost after each batch to let other work run) - explore if starvation observed.
     188              :     static strand_invoker
     189           12 :     make_invoker(strand_impl& impl)
     190              :     {
     191              :         strand_impl* p = &impl;
     192              :         for(;;)
     193              :         {
     194              :             set_dispatch_thread(*p);
     195              :             dispatch_pending(*p);
     196              :             if(try_unlock(*p))
     197              :             {
     198              :                 clear_dispatch_thread(*p);
     199              :                 co_return;
     200              :             }
     201              :         }
     202           24 :     }
     203              : 
     204              :     friend class strand_service;
     205              : };
     206              : 
     207              : //----------------------------------------------------------
     208              : 
     209           19 : strand_service::
     210           19 : strand_service()
     211           19 :     : service()
     212              : {
     213           19 : }
     214              : 
     215           19 : strand_service::
     216              : ~strand_service() = default;
     217              : 
     218              : bool
     219            2 : strand_service::
     220              : running_in_this_thread(strand_impl& impl) noexcept
     221              : {
     222            2 :     return impl.dispatch_thread_.load() == std::this_thread::get_id();
     223              : }
     224              : 
     225              : std::coroutine_handle<>
     226            1 : strand_service::
     227              : dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
     228              : {
     229            1 :     if(running_in_this_thread(impl))
     230            0 :         return h;
     231              : 
     232            1 :     if(strand_service_impl::enqueue(impl, h))
     233            1 :         ex.post(strand_service_impl::make_invoker(impl).h_);
     234            1 :     return std::noop_coroutine();
     235              : }
     236              : 
     237              : void
     238          321 : strand_service::
     239              : post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
     240              : {
     241          321 :     if(strand_service_impl::enqueue(impl, h))
     242           11 :         ex.post(strand_service_impl::make_invoker(impl).h_);
     243          321 : }
     244              : 
     245              : strand_service&
     246           23 : get_strand_service(execution_context& ctx)
     247              : {
     248           23 :     return ctx.use_service<strand_service_impl>();
     249              : }
     250              : 
     251              : } // namespace detail
     252              : } // namespace capy
     253              : } // namespace boost
        

Generated by: LCOV version 2.3