LCOV - code coverage report
Current view: top level - capy/io - any_buffer_source.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 86.6 % 172 149
Test Date: 2026-02-12 22:58:59 Functions: 88.2 % 51 45

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_copy.hpp>
      17              : #include <boost/capy/buffers/buffer_param.hpp>
      18              : #include <boost/capy/buffers/slice.hpp>
      19              : #include <boost/capy/concept/buffer_source.hpp>
      20              : #include <boost/capy/concept/io_awaitable.hpp>
      21              : #include <boost/capy/concept/read_source.hpp>
      22              : #include <boost/capy/error.hpp>
      23              : #include <boost/capy/ex/io_env.hpp>
      24              : #include <boost/capy/io_result.hpp>
      25              : #include <boost/capy/io_task.hpp>
      26              : 
      27              : #include <concepts>
      28              : #include <coroutine>
      29              : #include <cstddef>
      30              : #include <exception>
      31              : #include <new>
      32              : #include <span>
      33              : #include <stop_token>
      34              : #include <system_error>
      35              : #include <utility>
      36              : 
      37              : namespace boost {
      38              : namespace capy {
      39              : 
      40              : /** Type-erased wrapper for any BufferSource.
      41              : 
      42              :     This class provides type erasure for any type satisfying the
      43              :     @ref BufferSource concept, enabling runtime polymorphism for
      44              :     buffer pull operations. It uses cached awaitable storage to achieve
      45              :     zero steady-state allocation after construction.
      46              : 
      47              :     The wrapper also satisfies @ref ReadSource. When the wrapped type
      48              :     satisfies only @ref BufferSource, the read operations are
      49              :     synthesized using @ref pull and @ref consume with an extra
      50              :     buffer copy. When the wrapped type satisfies both @ref BufferSource
      51              :     and @ref ReadSource, the native read operations are forwarded
      52              :     directly across the virtual boundary, avoiding the copy.
      53              : 
      54              :     The wrapper supports two construction modes:
      55              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      56              :       allocates storage and owns the source.
      57              :     - **Reference**: Pass a pointer to wrap without ownership. The
      58              :       pointed-to source must outlive this wrapper.
      59              : 
      60              :     Within each mode, the vtable is populated at compile time based
      61              :     on whether the wrapped type also satisfies @ref ReadSource:
      62              :     - **BufferSource only**: @ref read_some and @ref read are
      63              :       synthesized from @ref pull and @ref consume, incurring one
      64              :       buffer copy per operation.
      65              :     - **BufferSource + ReadSource**: All read operations are
      66              :       forwarded natively through the type-erased boundary with
      67              :       no extra copy.
      68              : 
      69              :     @par Awaitable Preallocation
      70              :     The constructor preallocates storage for the type-erased awaitable.
      71              :     This reserves all virtual address space at server startup
      72              :     so memory usage can be measured up front, rather than
      73              :     allocating piecemeal as traffic arrives.
      74              : 
      75              :     @par Thread Safety
      76              :     Not thread-safe. Concurrent operations on the same wrapper
      77              :     are undefined behavior.
      78              : 
      79              :     @par Example
      80              :     @code
      81              :     // Owning - takes ownership of the source
      82              :     any_buffer_source abs(some_buffer_source{args...});
      83              : 
      84              :     // Reference - wraps without ownership
      85              :     some_buffer_source src;
      86              :     any_buffer_source abs(&src);
      87              : 
      88              :     const_buffer arr[16];
      89              :     auto [ec, bufs] = co_await abs.pull(arr);
      90              : 
      91              :     // ReadSource interface also available
      92              :     char buf[64];
      93              :     auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
      94              :     @endcode
      95              : 
      96              :     @see any_buffer_sink, BufferSource, ReadSource
      97              : */
      98              : class any_buffer_source
      99              : {
     100              :     struct vtable; // function-pointer table describing the wrapped type
     101              :     struct awaitable_ops; // erased ops for awaitables produced by pull
     102              :     struct read_awaitable_ops; // erased ops for awaitables produced by read_some / read
     103              : 
     104              :     template<BufferSource S>
     105              :     struct vtable_for_impl; // builds the vtable for a concrete source type S
     106              : 
     107              :     // hot-path members first for cache locality
     108              :     void* source_ = nullptr; // wrapped source object; null means empty (see has_value)
     109              :     vtable const* vt_ = nullptr; // operation table for the wrapped type
     110              :     void* cached_awaitable_ = nullptr; // awaitable storage; freed with ::operator delete in the destructor
     111              :     awaitable_ops const* active_ops_ = nullptr; // NOTE(review): only moved/reset in the code visible here; presumably the ops of the in-flight pull awaitable - confirm
     112              :     read_awaitable_ops const* active_read_ops_ = nullptr; // NOTE(review): presumably the ops of the in-flight read awaitable - confirm
     113              :     void* storage_ = nullptr; // non-null marks an owned source: the destructor then destroys source_ and frees this
     114              : 
     115              : public:
     116              :     /** Destructor.
     117              : 
     118              :         Destroys the owned source (if any) and releases the cached
     119              :         awaitable storage.
     120              :     */
     121              :     ~any_buffer_source();
     122              : 
     123              :     /** Default constructor.
     124              : 
     125              :         Constructs an empty wrapper. Operations on a default-constructed
     126              :         wrapper result in undefined behavior.
     127              :     */
     128              :     any_buffer_source() = default;
     129              : 
     130              :     /** Non-copyable.
     131              : 
     132              :         The awaitable cache is per-instance and cannot be shared.
     133              :     */
     134              :     any_buffer_source(any_buffer_source const&) = delete;
     135              :     any_buffer_source& operator=(any_buffer_source const&) = delete;
     136              : 
     137              :     /** Move constructor.
     138              : 
     139              :         Transfers ownership of the wrapped source (if owned) and
     140              :         cached awaitable storage from `other`. After the move, `other` is
     141              :         in a default-constructed state.
     142              : 
     143              :         @param other The wrapper to move from.
     144              :     */
     145            2 :     any_buffer_source(any_buffer_source&& other) noexcept
     146            2 :         : source_(std::exchange(other.source_, nullptr)) // each member is taken and nulled in other
     147            2 :         , vt_(std::exchange(other.vt_, nullptr))
     148            2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     149            2 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     150            2 :         , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
     151            2 :         , storage_(std::exchange(other.storage_, nullptr))
     152              :     {
     153            2 :     }
     154              : 
     155              :     /** Move assignment operator.
     156              : 
     157              :         Destroys any owned source and releases existing resources,
     158              :         then transfers ownership from `other`.
     159              : 
     160              :         @param other The wrapper to move from.
     161              :         @return Reference to this wrapper.
     162              :     */
     163              :     any_buffer_source&
     164              :     operator=(any_buffer_source&& other) noexcept;
     165              : 
     166              :     /** Construct by taking ownership of a BufferSource.
     167              : 
     168              :         Allocates storage and moves the source into this wrapper.
     169              :         The wrapper owns the source and will destroy it. If `S` also
     170              :         satisfies @ref ReadSource, native read operations are
     171              :         forwarded through the virtual boundary.
     172              : 
     173              :         @param s The source to take ownership of.
     174              :     */
     175              :     template<BufferSource S>
     176              :         requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     177              :     any_buffer_source(S s);
     178              : 
     179              :     /** Construct by wrapping a BufferSource without ownership.
     180              : 
     181              :         Wraps the given source by pointer. The source must remain
     182              :         valid for the lifetime of this wrapper. If `S` also
     183              :         satisfies @ref ReadSource, native read operations are
     184              :         forwarded through the virtual boundary.
     185              : 
     186              :         @param s Pointer to the source to wrap.
     187              :     */
     188              :     template<BufferSource S>
     189              :     any_buffer_source(S* s);
     190              : 
     191              :     /** Check if the wrapper contains a valid source.
     192              : 
     193              :         @return `true` if wrapping a source, `false` if default-constructed
     194              :             or moved-from.
     195              :     */
     196              :     bool
     197           16 :     has_value() const noexcept
     198              :     {
     199           16 :         return source_ != nullptr;
     200              :     }
     201              : 
     202              :     /** Check if the wrapper contains a valid source (same as @ref has_value).
     203              : 
     204              :         @return `true` if wrapping a source, `false` if default-constructed
     205              :             or moved-from.
     206              :     */
     207              :     explicit
     208            2 :     operator bool() const noexcept
     209              :     {
     210            2 :         return has_value();
     211              :     }
     212              : 
     213              :     /** Consume bytes from the source.
     214              : 
     215              :         Advances the internal read position of the underlying source
     216              :         by the specified number of bytes. The next call to @ref pull
     217              :         returns data starting after the consumed bytes.
     218              : 
     219              :         @param n The number of bytes to consume. Must not exceed the
     220              :         total size of buffers returned by the previous @ref pull.
     221              : 
     222              :         @par Preconditions
     223              :         The wrapper must contain a valid source (`has_value() == true`).
     224              :     */
     225              :     void
     226              :     consume(std::size_t n) noexcept;
     227              : 
     228              :     /** Pull buffer data from the source.
     229              : 
     230              :         Fills the provided span with buffer descriptors from the
     231              :         underlying source. The operation completes when data is
     232              :         available, the source is exhausted, or an error occurs.
     233              : 
     234              :         @param dest Span of const_buffer to fill.
     235              : 
     236              :         @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
     237              :             On success with data, a non-empty span of filled buffers.
     238              :             On EOF, `ec == cond::eof` and span is empty.
     239              : 
     240              :         @par Preconditions
     241              :         The wrapper must contain a valid source (`has_value() == true`).
     242              :         The caller must not call this function again after a prior
     243              :         call returned an error.
     244              :     */
     245              :     auto
     246              :     pull(std::span<const_buffer> dest);
     247              : 
     248              :     /** Read some data into a mutable buffer sequence.
     249              : 
     250              :         Reads one or more bytes into the caller's buffers. May fill
     251              :         less than the full sequence.
     252              : 
     253              :         When the wrapped type provides native @ref ReadSource support,
     254              :         the operation forwards directly. Otherwise it is synthesized
     255              :         from @ref pull, @ref buffer_copy, and @ref consume.
     256              : 
     257              :         @param buffers The buffer sequence to fill.
     258              : 
     259              :         @return An awaitable yielding `(error_code,std::size_t)`.
     260              : 
     261              :         @par Preconditions
     262              :         The wrapper must contain a valid source (`has_value() == true`).
     263              :         The caller must not call this function again after a prior
     264              :         call returned an error (including EOF).
     265              : 
     266              :         @see pull, consume
     267              :     */
     268              :     template<MutableBufferSequence MB>
     269              :     io_task<std::size_t>
     270              :     read_some(MB buffers);
     271              : 
     272              :     /** Read data into a mutable buffer sequence.
     273              : 
     274              :         Fills the provided buffer sequence completely. When the
     275              :         wrapped type provides native @ref ReadSource support, each
     276              :         window is forwarded directly. Otherwise the data is
     277              :         synthesized from @ref pull, @ref buffer_copy, and @ref consume.
     278              : 
     279              :         @param buffers The buffer sequence to fill.
     280              : 
     281              :         @return An awaitable yielding `(error_code,std::size_t)`.
     282              :             On success, `n == buffer_size(buffers)`.
     283              :             On EOF, `ec == error::eof` and `n` is bytes transferred.
     284              : 
     285              :         @par Preconditions
     286              :         The wrapper must contain a valid source (`has_value() == true`).
     287              :         The caller must not call this function again after a prior
     288              :         call returned an error (including EOF).
     289              : 
     290              :         @see pull, consume
     291              :     */
     292              :     template<MutableBufferSequence MB>
     293              :     io_task<std::size_t>
     294              :     read(MB buffers);
     295              : 
     296              : protected:
     297              :     /** Rebind to a new source after move.
     298              : 
     299              :         Updates the internal pointer to reference a new source object.
     300              :         Used by owning wrappers after move assignment when the owned
     301              :         object has moved to a new location.
     302              : 
     303              :         @param new_source The new source to bind to. Must be the same
     304              :             type as the original source.
     305              : 
     306              :         @note Terminates if called with a source of different type
     307              :             than the original.
     308              :     */
     309              :     template<BufferSource S>
     310              :     void
     311              :     rebind(S& new_source) noexcept
     312              :     {
     313              :         if(vt_ != &vtable_for_impl<S>::value) // vt_ must be the table generated for this exact S
     314              :             std::terminate();
     315              :         source_ = &new_source; // only the source pointer changes; vtable and cached storage are kept
     316              :     }
     317              : 
     318              : private:
     319              :     /** Forward a partial read through the vtable.
     320              : 
     321              :         Constructs the underlying `read_some` awaitable in
     322              :         cached storage and returns a type-erased awaitable.
     323              :     */
     324              :     auto
     325              :     read_some_(std::span<mutable_buffer const> buffers);
     326              : 
     327              :     /** Forward a complete read through the vtable.
     328              : 
     329              :         Constructs the underlying `read` awaitable in
     330              :         cached storage and returns a type-erased awaitable.
     331              :     */
     332              :     auto
     333              :     read_(std::span<mutable_buffer const> buffers);
     334              : };
     335              : 
     336              : //----------------------------------------------------------
     337              : 
     338              : /** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. Each entry takes the awaitable's address as `void*`. */
     339              : struct any_buffer_source::awaitable_ops
     340              : {
     341              :     bool (*await_ready)(void*); // forwards to the awaitable's await_ready()
     342              :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); // vtable_for_impl routes this through detail::call_await_suspend
     343              :     io_result<std::span<const_buffer>> (*await_resume)(void*); // forwards to the awaitable's await_resume()
     344              :     void (*destroy)(void*) noexcept; // vtable_for_impl fills this with a destructor call only; storage is not freed here
     345              : };
     346              : 
     347              : /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. Each entry takes the awaitable's address as `void*`. */
     348              : struct any_buffer_source::read_awaitable_ops
     349              : {
     350              :     bool (*await_ready)(void*); // forwards to the awaitable's await_ready()
     351              :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); // vtable_for_impl routes this through detail::call_await_suspend
     352              :     io_result<std::size_t> (*await_resume)(void*); // forwards to the awaitable's await_resume()
     353              :     void (*destroy)(void*) noexcept; // vtable_for_impl fills this with a destructor call only; storage is not freed here
     354              : };
     355              : 
     356              : struct any_buffer_source::vtable
     357              : {
     358              :     // BufferSource ops (always populated)
     359              :     void (*destroy)(void*) noexcept; // runs the source's destructor; the destructor and move assignment call it only when storage_ is set
     360              :     void (*do_consume)(void* source, std::size_t n) noexcept; // forwards to the source's consume(n)
     361              :     std::size_t awaitable_size; // largest sizeof among the awaitable types this table can construct
     362              :     std::size_t awaitable_align; // largest alignof among the awaitable types this table can construct
     363              :     awaitable_ops const* (*construct_awaitable)( // placement-constructs the pull(dest) awaitable in storage and returns its ops
     364              :         void* source,
     365              :         void* storage,
     366              :         std::span<const_buffer> dest);
     367              : 
     368              :     // ReadSource forwarding (null when wrapped type is BufferSource-only)
     369              :     read_awaitable_ops const* (*construct_read_some_awaitable)( // placement-constructs the read_some(buffers) awaitable in storage
     370              :         void* source,
     371              :         void* storage,
     372              :         std::span<mutable_buffer const> buffers);
     373              :     read_awaitable_ops const* (*construct_read_awaitable)( // placement-constructs the read(buffers) awaitable in storage
     374              :         void* source,
     375              :         void* storage,
     376              :         std::span<mutable_buffer const> buffers);
     377              : };
     378              : 
     379              : template<BufferSource S>
     380              : struct any_buffer_source::vtable_for_impl
     381              : {
     382              :     using PullAwaitable = decltype(std::declval<S&>().pull( // awaitable type returned by S::pull for a span of const_buffer
     383              :         std::declval<std::span<const_buffer>>()));
     384              : 
     385              :     static void
     386            7 :     do_destroy_impl(void* source) noexcept
     387              :     {
     388            7 :         static_cast<S*>(source)->~S(); // destructor only; callers free the memory separately
     389            7 :     }
     390              : 
     391              :     static void
     392           45 :     do_consume_impl(void* source, std::size_t n) noexcept
     393              :     {
     394           45 :         static_cast<S*>(source)->consume(n);
     395           45 :     }
     396              : 
     397              :     static awaitable_ops const*
     398          110 :     construct_awaitable_impl(
     399              :         void* source,
     400              :         void* storage,
     401              :         std::span<const_buffer> dest)
     402              :     {
     403          110 :         auto& s = *static_cast<S*>(source);
     404          110 :         ::new(storage) PullAwaitable(s.pull(dest)); // storage must be sized and aligned for PullAwaitable (see awaitable_size / awaitable_align)
     405              : 
     406              :         static constexpr awaitable_ops ops = { // unary + converts each captureless lambda to a plain function pointer
     407          110 :             +[](void* p) {
     408          110 :                 return static_cast<PullAwaitable*>(p)->await_ready();
     409              :             },
     410            0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     411            0 :                 return detail::call_await_suspend(
     412            0 :                     static_cast<PullAwaitable*>(p), h, env);
     413              :             },
     414          110 :             +[](void* p) {
     415          110 :                 return static_cast<PullAwaitable*>(p)->await_resume();
     416              :             },
     417          110 :             +[](void* p) noexcept {
     418          110 :                 static_cast<PullAwaitable*>(p)->~PullAwaitable(); // ends the lifetime of the object built by the placement new above
     419              :             }
     420              :         };
     421          110 :         return &ops;
     422              :     }
     423              : 
     424              :     //------------------------------------------------------
     425              :     // ReadSource forwarding (only instantiated when ReadSource<S>)
     426              : 
     427              :     static read_awaitable_ops const*
     428           48 :     construct_read_some_awaitable_impl(
     429              :         void* source,
     430              :         void* storage,
     431              :         std::span<mutable_buffer const> buffers)
     432              :         requires ReadSource<S>
     433              :     {
     434              :         using Aw = decltype(std::declval<S&>().read_some( // awaitable type returned by S::read_some for a span of mutable_buffer
     435              :             std::span<mutable_buffer const>{}));
     436           48 :         auto& s = *static_cast<S*>(source);
     437           48 :         ::new(storage) Aw(s.read_some(buffers)); // built in the caller-provided storage
     438              : 
     439              :         static constexpr read_awaitable_ops ops = {
     440           48 :             +[](void* p) {
     441           48 :                 return static_cast<Aw*>(p)->await_ready();
     442              :             },
     443            0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     444            0 :                 return detail::call_await_suspend(
     445            0 :                     static_cast<Aw*>(p), h, env);
     446              :             },
     447           48 :             +[](void* p) {
     448           48 :                 return static_cast<Aw*>(p)->await_resume();
     449              :             },
     450           48 :             +[](void* p) noexcept {
     451           48 :                 static_cast<Aw*>(p)->~Aw();
     452              :             }
     453              :         };
     454           48 :         return &ops;
     455              :     }
     456              : 
     457              :     static read_awaitable_ops const*
     458           18 :     construct_read_awaitable_impl(
     459              :         void* source,
     460              :         void* storage,
     461              :         std::span<mutable_buffer const> buffers)
     462              :         requires ReadSource<S>
     463              :     {
     464              :         using Aw = decltype(std::declval<S&>().read( // awaitable type returned by S::read for a span of mutable_buffer
     465              :             std::span<mutable_buffer const>{}));
     466           18 :         auto& s = *static_cast<S*>(source);
     467           18 :         ::new(storage) Aw(s.read(buffers)); // built in the caller-provided storage
     468              : 
     469              :         static constexpr read_awaitable_ops ops = {
     470           18 :             +[](void* p) {
     471           18 :                 return static_cast<Aw*>(p)->await_ready();
     472              :             },
     473            0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     474            0 :                 return detail::call_await_suspend(
     475            0 :                     static_cast<Aw*>(p), h, env);
     476              :             },
     477           18 :             +[](void* p) {
     478           18 :                 return static_cast<Aw*>(p)->await_resume();
     479              :             },
     480           18 :             +[](void* p) noexcept {
     481           18 :                 static_cast<Aw*>(p)->~Aw();
     482              :             }
     483              :         };
     484           18 :         return &ops;
     485              :     }
     486              : 
     487              :     //------------------------------------------------------
     488              :     // Compile-time sizing: one storage block must fit whichever awaitable is constructed
     489              :     static consteval std::size_t
     490              :     compute_max_size() noexcept
     491              :     {
     492              :         std::size_t s = sizeof(PullAwaitable); // start from the pull awaitable, which always exists
     493              :         if constexpr (ReadSource<S>)
     494              :         {
     495              :             using RS = decltype(std::declval<S&>().read_some(
     496              :                 std::span<mutable_buffer const>{}));
     497              :             using R = decltype(std::declval<S&>().read(
     498              :                 std::span<mutable_buffer const>{}));
     499              : 
     500              :             if(sizeof(RS) > s) s = sizeof(RS);
     501              :             if(sizeof(R) > s) s = sizeof(R);
     502              :         }
     503              :         return s;
     504              :     }
     505              : 
     506              :     static consteval std::size_t
     507              :     compute_max_align() noexcept
     508              :     {
     509              :         std::size_t a = alignof(PullAwaitable); // same maximum as compute_max_size, taken over alignment
     510              :         if constexpr (ReadSource<S>)
     511              :         {
     512              :             using RS = decltype(std::declval<S&>().read_some(
     513              :                 std::span<mutable_buffer const>{}));
     514              :             using R = decltype(std::declval<S&>().read(
     515              :                 std::span<mutable_buffer const>{}));
     516              : 
     517              :             if(alignof(RS) > a) a = alignof(RS);
     518              :             if(alignof(R) > a) a = alignof(R);
     519              :         }
     520              :         return a;
     521              :     }
     522              : 
     523              :     static consteval vtable
     524              :     make_vtable() noexcept
     525              :     {
     526              :         vtable v{};
     527              :         v.destroy = &do_destroy_impl;
     528              :         v.do_consume = &do_consume_impl;
     529              :         v.awaitable_size = compute_max_size();
     530              :         v.awaitable_align = compute_max_align();
     531              :         v.construct_awaitable = &construct_awaitable_impl;
     532              :         v.construct_read_some_awaitable = nullptr; // read slots stay null unless S models ReadSource
     533              :         v.construct_read_awaitable = nullptr;
     534              : 
     535              :         if constexpr (ReadSource<S>)
     536              :         {
     537              :             v.construct_read_some_awaitable =
     538              :                 &construct_read_some_awaitable_impl;
     539              :             v.construct_read_awaitable =
     540              :                 &construct_read_awaitable_impl;
     541              :         }
     542              :         return v;
     543              :     }
     544              : 
     545              :     static constexpr vtable value = make_vtable(); // one compile-time table per S; rebind compares its address to check the type
     546              : };
     547              : 
     548              : //----------------------------------------------------------
     549              : 
     550              : inline // Destructor: destroys an owned source, then frees both heap blocks.
     551          124 : any_buffer_source::~any_buffer_source()
     552              : {
     553          124 :     if(storage_) // non-null only when this wrapper owns the source
     554              :     {
     555            7 :         vt_->destroy(source_); // run the source's destructor before freeing its memory
     556            7 :         ::operator delete(storage_);
     557              :     }
     558          124 :     if(cached_awaitable_)
     559          119 :         ::operator delete(cached_awaitable_); // raw storage only; no awaitable destructor is run here
     560          124 : }
     561              : 
     562              : inline any_buffer_source& // Move assignment: release current resources, then take other's members.
     563            2 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
     564              : {
     565            2 :     if(this != &other) // self-assignment leaves the wrapper untouched
     566              :     {
     567            2 :         if(storage_) // same release sequence as the destructor
     568              :         {
     569            0 :             vt_->destroy(source_);
     570            0 :             ::operator delete(storage_);
     571              :         }
     572            2 :         if(cached_awaitable_)
     573            0 :             ::operator delete(cached_awaitable_);
     574            2 :         source_ = std::exchange(other.source_, nullptr); // take each member and leave other with null pointers
     575            2 :         vt_ = std::exchange(other.vt_, nullptr);
     576            2 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     577            2 :         storage_ = std::exchange(other.storage_, nullptr);
     578            2 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     579            2 :         active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
     580              :     }
     581            2 :     return *this;
     582              : }
     583              : 
     584              : template<BufferSource S>
     585              :     requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     586            7 : any_buffer_source::any_buffer_source(S s)
     587            7 :     : vt_(&vtable_for_impl<S>::value)
     588              : {
                            // Owning constructor: moves `s` into a block allocated here and
                            // allocates the buffer later used to build awaitables in place.
                            // The requires-clause excludes any_buffer_source itself as S.
                            //
                            // The guard undoes the source allocation if the constructor exits
                            // before `committed` is set, i.e. if a later step throws.
                            // NOTE(review): storage_ is assigned before the placement-new
                            // below; if S's move constructor throws, storage_ is non-null but
                            // source_ has not been assigned yet, so the guard would call
                            // vt_->destroy on whatever source_ held before. Confirm source_
                            // is default-null and that destroy tolerates it, or test source_
                            // in the guard.
     589              :     struct guard {
     590              :         any_buffer_source* self;
     591              :         bool committed = false;
     592            7 :         ~guard() {
     593            7 :             if(!committed && self->storage_) {
     594            0 :                 self->vt_->destroy(self->source_);
     595            0 :                 ::operator delete(self->storage_);
     596            0 :                 self->storage_ = nullptr;
     597            0 :                 self->source_ = nullptr;
     598              :             }
     599            7 :         }
     600            7 :     } g{this};
     601              : 
                            // NOTE(review): this form of operator new receives only a size, so
                            // alignof(S) is not communicated - confirm over-aligned S is either
                            // unsupported or handled elsewhere.
     602            7 :     storage_ = ::operator new(sizeof(S));
     603            7 :     source_ = ::new(storage_) S(std::move(s));
     604              : 
                            // Buffer size comes from the vtable's awaitable_size.
     605            7 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     606              : 
                            // Everything succeeded; disarm the rollback.
     607            7 :     g.committed = true;
     608            7 : }
     609              : 
     610              : template<BufferSource S>
     611          112 : any_buffer_source::any_buffer_source(S* s)
     612          112 :     : source_(s)
     613          112 :     , vt_(&vtable_for_impl<S>::value)
     614              : {
                            // Pointer constructor: stores `s` as the source without copying
                            // it. storage_ is not assigned here (presumably it defaults to
                            // null - TODO confirm in the class declaration), so the destructor
                            // will not destroy or free *s.
                            //
                            // NOTE(review): the allocation passes awaitable_size only; the
                            // vtable's awaitable_align is not used here - confirm the default
                            // operator new alignment is sufficient for every awaitable type.
     615          112 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     616          112 : }
     617              : 
     618              : //----------------------------------------------------------
     619              : 
     620              : inline void
     621           45 : any_buffer_source::consume(std::size_t n) noexcept
     622              : {
                            // Forwards `n` to the wrapped source's consume entry in the vtable.
                            // vt_ is dereferenced unconditionally, so it must be non-null.
     623           45 :     vt_->do_consume(source_, n);
     624           45 : }
     625              : 
     626              : inline auto
     627          110 : any_buffer_source::pull(std::span<const_buffer> dest)
     628              : {
                            // Returns a small awaitable that forwards each await_* step to an
                            // awaitable built through the vtable inside cached_awaitable_.
                            // It stores a raw pointer to this wrapper and a copy of the `dest`
                            // span, so the wrapper must stay at the same address and stay
                            // alive until the await completes.
     629              :     struct awaitable
     630              :     {
     631              :         any_buffer_source* self_;
     632              :         std::span<const_buffer> dest_;
     633              : 
     634              :         bool
     635          110 :         await_ready()
     636              :         {
                                    // Build the underlying awaitable in the cached buffer; the
                                    // returned ops table is remembered in active_ops_ and used
                                    // for every later step.
     637          220 :             self_->active_ops_ = self_->vt_->construct_awaitable(
     638          110 :                 self_->source_,
     639          110 :                 self_->cached_awaitable_,
     640              :                 dest_);
     641          110 :             return self_->active_ops_->await_ready(self_->cached_awaitable_);
     642              :         }
     643              : 
     644              :         std::coroutine_handle<>
     645            0 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     646              :         {
                                    // Plain forwarding to the underlying awaitable.
     647            0 :             return self_->active_ops_->await_suspend(
     648            0 :                 self_->cached_awaitable_, h, env);
     649              :         }
     650              : 
     651              :         io_result<std::span<const_buffer>>
     652          110 :         await_resume()
     653              :         {
                                    // The guard's destructor runs when this function exits,
                                    // after the return value has been produced (or while an
                                    // exception propagates): it destroys the awaitable in the
                                    // cached buffer and clears active_ops_.
     654              :             struct guard {
     655              :                 any_buffer_source* self;
     656          110 :                 ~guard() {
     657          110 :                     self->active_ops_->destroy(self->cached_awaitable_);
     658          110 :                     self->active_ops_ = nullptr;
     659          110 :                 }
     660          110 :             } g{self_};
     661          110 :             return self_->active_ops_->await_resume(
     662          195 :                 self_->cached_awaitable_);
     663          110 :         }
     664              :     };
     665          110 :     return awaitable{this, dest};
     666              : }
     667              : 
     668              : //----------------------------------------------------------
     669              : // Private helpers for native ReadSource forwarding
     670              : 
     671              : inline auto
     672           48 : any_buffer_source::read_some_(
     673              :     std::span<mutable_buffer const> buffers)
     674              : {
                            // Returns an awaitable that forwards to the source's read_some
                            // awaitable, built through vt_->construct_read_some_awaitable in
                            // cached_awaitable_. That vtable entry is called without a null
                            // check here, so callers must test it first (read_some does).
     675              :     struct awaitable
     676              :     {
     677              :         any_buffer_source* self_;
     678              :         std::span<mutable_buffer const> buffers_;
     679              : 
     680              :         bool
     681           48 :         await_ready() const noexcept
     682              :         {
                                    // Always reports not-ready; construction and the real
                                    // readiness check both happen in await_suspend.
     683           48 :             return false;
     684              :         }
     685              : 
     686              :         std::coroutine_handle<>
     687           48 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     688              :         {
     689           96 :             self_->active_read_ops_ =
     690           96 :                 self_->vt_->construct_read_some_awaitable(
     691           48 :                     self_->source_,
     692           48 :                     self_->cached_awaitable_,
     693              :                     buffers_);
     694              : 
                                    // If the underlying awaitable is already ready, hand back
                                    // the caller's own handle so it is resumed straight away.
     695           48 :             if(self_->active_read_ops_->await_ready(
     696           48 :                 self_->cached_awaitable_))
     697           48 :                 return h;
     698              : 
     699            0 :             return self_->active_read_ops_->await_suspend(
     700            0 :                 self_->cached_awaitable_, h, env);
     701              :         }
     702              : 
     703              :         io_result<std::size_t>
     704           48 :         await_resume()
     705              :         {
                                    // The guard destroys the awaitable in the cached buffer
                                    // and clears active_read_ops_ when this function exits,
                                    // after the result has been obtained.
     706              :             struct guard {
     707              :                 any_buffer_source* self;
     708           48 :                 ~guard() {
     709           48 :                     self->active_read_ops_->destroy(
     710           48 :                         self->cached_awaitable_);
     711           48 :                     self->active_read_ops_ = nullptr;
     712           48 :                 }
     713           48 :             } g{self_};
     714           48 :             return self_->active_read_ops_->await_resume(
     715           88 :                 self_->cached_awaitable_);
     716           48 :         }
     717              :     };
     718           48 :     return awaitable{this, buffers};
     719              : }
     720              : 
     721              : inline auto
     722           18 : any_buffer_source::read_(
     723              :     std::span<mutable_buffer const> buffers)
     724              : {
                            // Returns an awaitable that forwards to the source's read
                            // awaitable, built through vt_->construct_read_awaitable in
                            // cached_awaitable_. That vtable entry is called without a null
                            // check here, so callers must test it first (read does).
     725              :     struct awaitable
     726              :     {
     727              :         any_buffer_source* self_;
     728              :         std::span<mutable_buffer const> buffers_;
     729              : 
     730              :         bool
     731           18 :         await_ready() const noexcept
     732              :         {
                                    // Always reports not-ready; construction and the real
                                    // readiness check both happen in await_suspend.
     733           18 :             return false;
     734              :         }
     735              : 
     736              :         std::coroutine_handle<>
     737           18 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     738              :         {
     739           36 :             self_->active_read_ops_ =
     740           36 :                 self_->vt_->construct_read_awaitable(
     741           18 :                     self_->source_,
     742           18 :                     self_->cached_awaitable_,
     743              :                     buffers_);
     744              : 
                                    // If the underlying awaitable is already ready, hand back
                                    // the caller's own handle so it is resumed straight away.
     745           18 :             if(self_->active_read_ops_->await_ready(
     746           18 :                 self_->cached_awaitable_))
     747           18 :                 return h;
     748              : 
     749            0 :             return self_->active_read_ops_->await_suspend(
     750            0 :                 self_->cached_awaitable_, h, env);
     751              :         }
     752              : 
     753              :         io_result<std::size_t>
     754           18 :         await_resume()
     755              :         {
                                    // The guard destroys the awaitable in the cached buffer
                                    // and clears active_read_ops_ when this function exits,
                                    // after the result has been obtained.
     756              :             struct guard {
     757              :                 any_buffer_source* self;
     758           18 :                 ~guard() {
     759           18 :                     self->active_read_ops_->destroy(
     760           18 :                         self->cached_awaitable_);
     761           18 :                     self->active_read_ops_ = nullptr;
     762           18 :                 }
     763           18 :             } g{self_};
     764           18 :             return self_->active_read_ops_->await_resume(
     765           30 :                 self_->cached_awaitable_);
     766           18 :         }
     767              :     };
     768           18 :     return awaitable{this, buffers};
     769              : }
     770              : 
     771              : //----------------------------------------------------------
     772              : // Public ReadSource methods
     773              : 
     774              : template<MutableBufferSequence MB>
     775              : io_task<std::size_t>
     776           58 : any_buffer_source::read_some(MB buffers)
     777              : {
                            // Coroutine performing a single read step into `buffers`.
                            // Completes with {error, bytes}: {{}, 0} when the destination is
                            // empty, the native result when the source provides a read_some
                            // entry, otherwise the number of bytes copied from one pull.
     778              :     buffer_param<MB> bp(buffers);
     779              :     auto dest = bp.data();
     780              :     if(dest.empty())
     781              :         co_return {{}, 0};
     782              : 
     783              :     // Native ReadSource path
                            // The vtable entry is non-null only when S satisfies ReadSource.
     784              :     if(vt_->construct_read_some_awaitable)
     785              :         co_return co_await read_some_(dest);
     786              : 
     787              :     // Synthesized path: pull + buffer_copy + consume
                            // `arr` is the scratch array handed to pull; `bufs` is the span
                            // that pull returns.
     788              :     const_buffer arr[detail::max_iovec_];
     789              :     auto [ec, bufs] = co_await pull(arr);
     790              :     if(ec)
     791              :         co_return {ec, 0};
     792              : 
                            // Consume exactly the byte count returned by buffer_copy.
     793              :     auto n = buffer_copy(dest, bufs);
     794              :     consume(n);
     795              :     co_return {{}, n};
     796          116 : }
     797              : 
     798              : template<MutableBufferSequence MB>
     799              : io_task<std::size_t>
     800           24 : any_buffer_source::read(MB buffers)
     801              : {
                            // Coroutine that keeps reading until bp.data() comes back empty
                            // or a step reports an error. Completes with {error, total},
                            // where total is the sum of the byte counts accumulated so far,
                            // including on the error path.
     802              :     buffer_param<MB> bp(buffers);
     803              :     std::size_t total = 0;
     804              : 
     805              :     // Native ReadSource path
     806              :     if(vt_->construct_read_awaitable)
     807              :     {
     808              :         for(;;)
     809              :         {
     810              :             auto dest = bp.data();
     811              :             if(dest.empty())
     812              :                 break;
     813              : 
                                    // n is added before the error check, so bytes reported
                                    // alongside an error are still counted in the result.
                                    // NOTE(review): a step returning no error and n == 0 would
                                    // repeat without progress - presumably the source reports
                                    // end-of-data through ec; confirm.
     814              :             auto [ec, n] = co_await read_(dest);
     815              :             total += n;
     816              :             if(ec)
     817              :                 co_return {ec, total};
     818              :             bp.consume(n);
     819              :         }
     820              :         co_return {{}, total};
     821              :     }
     822              : 
     823              :     // Synthesized path: pull + buffer_copy + consume
     824              :     for(;;)
     825              :     {
     826              :         auto dest = bp.data();
     827              :         if(dest.empty())
     828              :             break;
     829              : 
     830              :         const_buffer arr[detail::max_iovec_];
     831              :         auto [ec, bufs] = co_await pull(arr);
     832              : 
     833              :         if(ec)
     834              :             co_return {ec, total};
     835              : 
                                // The same count n advances both sides: the source via
                                // consume(n) and the destination via bp.consume(n).
                                // NOTE(review): if pull succeeds with no data, n is 0 and the
                                // loop repeats without progress - confirm pull signals
                                // exhaustion through ec.
     836              :         auto n = buffer_copy(dest, bufs);
     837              :         consume(n);
     838              :         total += n;
     839              :         bp.consume(n);
     840              :     }
     841              : 
     842              :     co_return {{}, total};
     843           48 : }
     844              : 
     845              : //----------------------------------------------------------
     846              : 
     847              : static_assert(BufferSource<any_buffer_source>);
     848              : static_assert(ReadSource<any_buffer_source>);
     849              : 
     850              : } // namespace capy
     851              : } // namespace boost
     852              : 
     853              : #endif
        

Generated by: LCOV version 2.3