//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/buffers/slice.hpp>
18  
#include <boost/capy/buffers/slice.hpp>
19  
#include <boost/capy/concept/buffer_source.hpp>
19  
#include <boost/capy/concept/buffer_source.hpp>
20  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/io_awaitable.hpp>
21  
#include <boost/capy/concept/read_source.hpp>
21  
#include <boost/capy/concept/read_source.hpp>
22  
#include <boost/capy/error.hpp>
22  
#include <boost/capy/error.hpp>
23  
#include <boost/capy/ex/io_env.hpp>
23  
#include <boost/capy/ex/io_env.hpp>
24  
#include <boost/capy/io_result.hpp>
24  
#include <boost/capy/io_result.hpp>
25  
#include <boost/capy/io_task.hpp>
25  
#include <boost/capy/io_task.hpp>
26  

26  

27  
#include <concepts>
27  
#include <concepts>
28  
#include <coroutine>
28  
#include <coroutine>
29  
#include <cstddef>
29  
#include <cstddef>
30  
#include <exception>
30  
#include <exception>
31  
#include <new>
31  
#include <new>
32  
#include <span>
32  
#include <span>
33  
#include <stop_token>
33  
#include <stop_token>
34  
#include <system_error>
34  
#include <system_error>
35  
#include <utility>
35  
#include <utility>
36  

36  

37  
namespace boost {
37  
namespace boost {
38  
namespace capy {
38  
namespace capy {
39  

39  

40  
/** Type-erased wrapper for any BufferSource.
40  
/** Type-erased wrapper for any BufferSource.
41  

41  

42  
    This class provides type erasure for any type satisfying the
42  
    This class provides type erasure for any type satisfying the
43  
    @ref BufferSource concept, enabling runtime polymorphism for
43  
    @ref BufferSource concept, enabling runtime polymorphism for
44  
    buffer pull operations. It uses cached awaitable storage to achieve
44  
    buffer pull operations. It uses cached awaitable storage to achieve
45  
    zero steady-state allocation after construction.
45  
    zero steady-state allocation after construction.
46  

46  

47  
    The wrapper also satisfies @ref ReadSource. When the wrapped type
47  
    The wrapper also satisfies @ref ReadSource. When the wrapped type
48  
    satisfies only @ref BufferSource, the read operations are
48  
    satisfies only @ref BufferSource, the read operations are
49  
    synthesized using @ref pull and @ref consume with an extra
49  
    synthesized using @ref pull and @ref consume with an extra
50  
    buffer copy. When the wrapped type satisfies both @ref BufferSource
50  
    buffer copy. When the wrapped type satisfies both @ref BufferSource
51  
    and @ref ReadSource, the native read operations are forwarded
51  
    and @ref ReadSource, the native read operations are forwarded
52  
    directly across the virtual boundary, avoiding the copy.
52  
    directly across the virtual boundary, avoiding the copy.
53  

53  

54  
    The wrapper supports two construction modes:
54  
    The wrapper supports two construction modes:
55  
    - **Owning**: Pass by value to transfer ownership. The wrapper
55  
    - **Owning**: Pass by value to transfer ownership. The wrapper
56  
      allocates storage and owns the source.
56  
      allocates storage and owns the source.
57  
    - **Reference**: Pass a pointer to wrap without ownership. The
57  
    - **Reference**: Pass a pointer to wrap without ownership. The
58  
      pointed-to source must outlive this wrapper.
58  
      pointed-to source must outlive this wrapper.
59  

59  

60  
    Within each mode, the vtable is populated at compile time based
60  
    Within each mode, the vtable is populated at compile time based
61  
    on whether the wrapped type also satisfies @ref ReadSource:
61  
    on whether the wrapped type also satisfies @ref ReadSource:
62  
    - **BufferSource only**: @ref read_some and @ref read are
62  
    - **BufferSource only**: @ref read_some and @ref read are
63  
      synthesized from @ref pull and @ref consume, incurring one
63  
      synthesized from @ref pull and @ref consume, incurring one
64  
      buffer copy per operation.
64  
      buffer copy per operation.
65  
    - **BufferSource + ReadSource**: All read operations are
65  
    - **BufferSource + ReadSource**: All read operations are
66  
      forwarded natively through the type-erased boundary with
66  
      forwarded natively through the type-erased boundary with
67  
      no extra copy.
67  
      no extra copy.
68  

68  

69  
    @par Awaitable Preallocation
69  
    @par Awaitable Preallocation
70  
    The constructor preallocates storage for the type-erased awaitable.
70  
    The constructor preallocates storage for the type-erased awaitable.
71  
    This reserves all virtual address space at server startup
71  
    This reserves all virtual address space at server startup
72  
    so memory usage can be measured up front, rather than
72  
    so memory usage can be measured up front, rather than
73  
    allocating piecemeal as traffic arrives.
73  
    allocating piecemeal as traffic arrives.
74  

74  

75  
    @par Thread Safety
75  
    @par Thread Safety
76  
    Not thread-safe. Concurrent operations on the same wrapper
76  
    Not thread-safe. Concurrent operations on the same wrapper
77  
    are undefined behavior.
77  
    are undefined behavior.
78  

78  

79  
    @par Example
79  
    @par Example
80  
    @code
80  
    @code
81  
    // Owning - takes ownership of the source
81  
    // Owning - takes ownership of the source
82  
    any_buffer_source abs(some_buffer_source{args...});
82  
    any_buffer_source abs(some_buffer_source{args...});
83  

83  

84  
    // Reference - wraps without ownership
84  
    // Reference - wraps without ownership
85  
    some_buffer_source src;
85  
    some_buffer_source src;
86  
    any_buffer_source abs(&src);
86  
    any_buffer_source abs(&src);
87  

87  

88  
    const_buffer arr[16];
88  
    const_buffer arr[16];
89  
    auto [ec, bufs] = co_await abs.pull(arr);
89  
    auto [ec, bufs] = co_await abs.pull(arr);
90  

90  

91  
    // ReadSource interface also available
91  
    // ReadSource interface also available
92  
    char buf[64];
92  
    char buf[64];
93  
    auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
93  
    auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
94  
    @endcode
94  
    @endcode
95  

95  

96  
    @see any_buffer_sink, BufferSource, ReadSource
96  
    @see any_buffer_sink, BufferSource, ReadSource
97  
*/
97  
*/
98  
class any_buffer_source
98  
class any_buffer_source
99  
{
99  
{
100  
    struct vtable;
100  
    struct vtable;
101  
    struct awaitable_ops;
101  
    struct awaitable_ops;
102  
    struct read_awaitable_ops;
102  
    struct read_awaitable_ops;
103  

103  

104  
    template<BufferSource S>
104  
    template<BufferSource S>
105  
    struct vtable_for_impl;
105  
    struct vtable_for_impl;
106  

106  

107  
    // hot-path members first for cache locality
107  
    // hot-path members first for cache locality
108  
    void* source_ = nullptr;
108  
    void* source_ = nullptr;
109  
    vtable const* vt_ = nullptr;
109  
    vtable const* vt_ = nullptr;
110  
    void* cached_awaitable_ = nullptr;
110  
    void* cached_awaitable_ = nullptr;
111  
    awaitable_ops const* active_ops_ = nullptr;
111  
    awaitable_ops const* active_ops_ = nullptr;
112  
    read_awaitable_ops const* active_read_ops_ = nullptr;
112  
    read_awaitable_ops const* active_read_ops_ = nullptr;
113  
    void* storage_ = nullptr;
113  
    void* storage_ = nullptr;
114  

114  

115  
public:
115  
public:
116  
    /** Destructor.
116  
    /** Destructor.
117  

117  

118  
        Destroys the owned source (if any) and releases the cached
118  
        Destroys the owned source (if any) and releases the cached
119  
        awaitable storage.
119  
        awaitable storage.
120  
    */
120  
    */
121  
    ~any_buffer_source();
121  
    ~any_buffer_source();
122  

122  

123  
    /** Default constructor.
123  
    /** Default constructor.
124  

124  

125  
        Constructs an empty wrapper. Operations on a default-constructed
125  
        Constructs an empty wrapper. Operations on a default-constructed
126  
        wrapper result in undefined behavior.
126  
        wrapper result in undefined behavior.
127  
    */
127  
    */
128  
    any_buffer_source() = default;
128  
    any_buffer_source() = default;
129  

129  

130  
    /** Non-copyable.
130  
    /** Non-copyable.
131  

131  

132  
        The awaitable cache is per-instance and cannot be shared.
132  
        The awaitable cache is per-instance and cannot be shared.
133  
    */
133  
    */
134  
    any_buffer_source(any_buffer_source const&) = delete;
134  
    any_buffer_source(any_buffer_source const&) = delete;
135  
    any_buffer_source& operator=(any_buffer_source const&) = delete;
135  
    any_buffer_source& operator=(any_buffer_source const&) = delete;
136  

136  

137  
    /** Move constructor.
137  
    /** Move constructor.
138  

138  

139  
        Transfers ownership of the wrapped source (if owned) and
139  
        Transfers ownership of the wrapped source (if owned) and
140  
        cached awaitable storage from `other`. After the move, `other` is
140  
        cached awaitable storage from `other`. After the move, `other` is
141  
        in a default-constructed state.
141  
        in a default-constructed state.
142  

142  

143  
        @param other The wrapper to move from.
143  
        @param other The wrapper to move from.
144  
    */
144  
    */
145  
    any_buffer_source(any_buffer_source&& other) noexcept
145  
    any_buffer_source(any_buffer_source&& other) noexcept
146  
        : source_(std::exchange(other.source_, nullptr))
146  
        : source_(std::exchange(other.source_, nullptr))
147  
        , vt_(std::exchange(other.vt_, nullptr))
147  
        , vt_(std::exchange(other.vt_, nullptr))
148  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
148  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
149  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
149  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
150  
        , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
150  
        , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
151  
        , storage_(std::exchange(other.storage_, nullptr))
151  
        , storage_(std::exchange(other.storage_, nullptr))
152  
    {
152  
    {
153  
    }
153  
    }
154  

154  

155  
    /** Move assignment operator.
155  
    /** Move assignment operator.
156  

156  

157  
        Destroys any owned source and releases existing resources,
157  
        Destroys any owned source and releases existing resources,
158  
        then transfers ownership from `other`.
158  
        then transfers ownership from `other`.
159  

159  

160  
        @param other The wrapper to move from.
160  
        @param other The wrapper to move from.
161  
        @return Reference to this wrapper.
161  
        @return Reference to this wrapper.
162  
    */
162  
    */
163  
    any_buffer_source&
163  
    any_buffer_source&
164  
    operator=(any_buffer_source&& other) noexcept;
164  
    operator=(any_buffer_source&& other) noexcept;
165  

165  

166  
    /** Construct by taking ownership of a BufferSource.
166  
    /** Construct by taking ownership of a BufferSource.
167  

167  

168  
        Allocates storage and moves the source into this wrapper.
168  
        Allocates storage and moves the source into this wrapper.
169  
        The wrapper owns the source and will destroy it. If `S` also
169  
        The wrapper owns the source and will destroy it. If `S` also
170  
        satisfies @ref ReadSource, native read operations are
170  
        satisfies @ref ReadSource, native read operations are
171  
        forwarded through the virtual boundary.
171  
        forwarded through the virtual boundary.
172  

172  

173  
        @param s The source to take ownership of.
173  
        @param s The source to take ownership of.
174  
    */
174  
    */
175  
    template<BufferSource S>
175  
    template<BufferSource S>
176  
        requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
176  
        requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
177  
    any_buffer_source(S s);
177  
    any_buffer_source(S s);
178  

178  

179  
    /** Construct by wrapping a BufferSource without ownership.
179  
    /** Construct by wrapping a BufferSource without ownership.
180  

180  

181  
        Wraps the given source by pointer. The source must remain
181  
        Wraps the given source by pointer. The source must remain
182  
        valid for the lifetime of this wrapper. If `S` also
182  
        valid for the lifetime of this wrapper. If `S` also
183  
        satisfies @ref ReadSource, native read operations are
183  
        satisfies @ref ReadSource, native read operations are
184  
        forwarded through the virtual boundary.
184  
        forwarded through the virtual boundary.
185  

185  

186  
        @param s Pointer to the source to wrap.
186  
        @param s Pointer to the source to wrap.
187  
    */
187  
    */
188  
    template<BufferSource S>
188  
    template<BufferSource S>
189  
    any_buffer_source(S* s);
189  
    any_buffer_source(S* s);
190  

190  

191  
    /** Check if the wrapper contains a valid source.
191  
    /** Check if the wrapper contains a valid source.
192  

192  

193  
        @return `true` if wrapping a source, `false` if default-constructed
193  
        @return `true` if wrapping a source, `false` if default-constructed
194  
            or moved-from.
194  
            or moved-from.
195  
    */
195  
    */
196  
    bool
196  
    bool
197  
    has_value() const noexcept
197  
    has_value() const noexcept
198  
    {
198  
    {
199  
        return source_ != nullptr;
199  
        return source_ != nullptr;
200  
    }
200  
    }
201  

201  

202  
    /** Check if the wrapper contains a valid source.
202  
    /** Check if the wrapper contains a valid source.
203  

203  

204  
        @return `true` if wrapping a source, `false` if default-constructed
204  
        @return `true` if wrapping a source, `false` if default-constructed
205  
            or moved-from.
205  
            or moved-from.
206  
    */
206  
    */
207  
    explicit
207  
    explicit
208  
    operator bool() const noexcept
208  
    operator bool() const noexcept
209  
    {
209  
    {
210  
        return has_value();
210  
        return has_value();
211  
    }
211  
    }
212  

212  

213  
    /** Consume bytes from the source.
213  
    /** Consume bytes from the source.
214  

214  

215  
        Advances the internal read position of the underlying source
215  
        Advances the internal read position of the underlying source
216  
        by the specified number of bytes. The next call to @ref pull
216  
        by the specified number of bytes. The next call to @ref pull
217  
        returns data starting after the consumed bytes.
217  
        returns data starting after the consumed bytes.
218  

218  

219  
        @param n The number of bytes to consume. Must not exceed the
219  
        @param n The number of bytes to consume. Must not exceed the
220  
        total size of buffers returned by the previous @ref pull.
220  
        total size of buffers returned by the previous @ref pull.
221  

221  

222  
        @par Preconditions
222  
        @par Preconditions
223  
        The wrapper must contain a valid source (`has_value() == true`).
223  
        The wrapper must contain a valid source (`has_value() == true`).
224  
    */
224  
    */
225  
    void
225  
    void
226  
    consume(std::size_t n) noexcept;
226  
    consume(std::size_t n) noexcept;
227  

227  

228  
    /** Pull buffer data from the source.
228  
    /** Pull buffer data from the source.
229  

229  

230  
        Fills the provided span with buffer descriptors from the
230  
        Fills the provided span with buffer descriptors from the
231  
        underlying source. The operation completes when data is
231  
        underlying source. The operation completes when data is
232  
        available, the source is exhausted, or an error occurs.
232  
        available, the source is exhausted, or an error occurs.
233  

233  

234  
        @param dest Span of const_buffer to fill.
234  
        @param dest Span of const_buffer to fill.
235  

235  

236  
        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
236  
        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
237  
            On success with data, a non-empty span of filled buffers.
237  
            On success with data, a non-empty span of filled buffers.
238  
            On EOF, `ec == cond::eof` and span is empty.
238  
            On EOF, `ec == cond::eof` and span is empty.
239  

239  

240  
        @par Preconditions
240  
        @par Preconditions
241  
        The wrapper must contain a valid source (`has_value() == true`).
241  
        The wrapper must contain a valid source (`has_value() == true`).
242  
        The caller must not call this function again after a prior
242  
        The caller must not call this function again after a prior
243  
        call returned an error.
243  
        call returned an error.
244  
    */
244  
    */
245  
    auto
245  
    auto
246  
    pull(std::span<const_buffer> dest);
246  
    pull(std::span<const_buffer> dest);
247  

247  

248  
    /** Read some data into a mutable buffer sequence.
248  
    /** Read some data into a mutable buffer sequence.
249  

249  

250  
        Reads one or more bytes into the caller's buffers. May fill
250  
        Reads one or more bytes into the caller's buffers. May fill
251  
        less than the full sequence.
251  
        less than the full sequence.
252  

252  

253  
        When the wrapped type provides native @ref ReadSource support,
253  
        When the wrapped type provides native @ref ReadSource support,
254  
        the operation forwards directly. Otherwise it is synthesized
254  
        the operation forwards directly. Otherwise it is synthesized
255  
        from @ref pull, @ref buffer_copy, and @ref consume.
255  
        from @ref pull, @ref buffer_copy, and @ref consume.
256  

256  

257  
        @param buffers The buffer sequence to fill.
257  
        @param buffers The buffer sequence to fill.
258  

258  

259  
        @return An awaitable yielding `(error_code,std::size_t)`.
259  
        @return An awaitable yielding `(error_code,std::size_t)`.
260  

260  

261  
        @par Preconditions
261  
        @par Preconditions
262  
        The wrapper must contain a valid source (`has_value() == true`).
262  
        The wrapper must contain a valid source (`has_value() == true`).
263  
        The caller must not call this function again after a prior
263  
        The caller must not call this function again after a prior
264  
        call returned an error (including EOF).
264  
        call returned an error (including EOF).
265  

265  

266  
        @see pull, consume
266  
        @see pull, consume
267  
    */
267  
    */
268  
    template<MutableBufferSequence MB>
268  
    template<MutableBufferSequence MB>
269  
    io_task<std::size_t>
269  
    io_task<std::size_t>
270  
    read_some(MB buffers);
270  
    read_some(MB buffers);
271  

271  

272  
    /** Read data into a mutable buffer sequence.
272  
    /** Read data into a mutable buffer sequence.
273  

273  

274  
        Fills the provided buffer sequence completely. When the
274  
        Fills the provided buffer sequence completely. When the
275  
        wrapped type provides native @ref ReadSource support, each
275  
        wrapped type provides native @ref ReadSource support, each
276  
        window is forwarded directly. Otherwise the data is
276  
        window is forwarded directly. Otherwise the data is
277  
        synthesized from @ref pull, @ref buffer_copy, and @ref consume.
277  
        synthesized from @ref pull, @ref buffer_copy, and @ref consume.
278  

278  

279  
        @param buffers The buffer sequence to fill.
279  
        @param buffers The buffer sequence to fill.
280  

280  

281  
        @return An awaitable yielding `(error_code,std::size_t)`.
281  
        @return An awaitable yielding `(error_code,std::size_t)`.
282  
            On success, `n == buffer_size(buffers)`.
282  
            On success, `n == buffer_size(buffers)`.
283  
            On EOF, `ec == error::eof` and `n` is bytes transferred.
283  
            On EOF, `ec == error::eof` and `n` is bytes transferred.
284  

284  

285  
        @par Preconditions
285  
        @par Preconditions
286  
        The wrapper must contain a valid source (`has_value() == true`).
286  
        The wrapper must contain a valid source (`has_value() == true`).
287  
        The caller must not call this function again after a prior
287  
        The caller must not call this function again after a prior
288  
        call returned an error (including EOF).
288  
        call returned an error (including EOF).
289  

289  

290  
        @see pull, consume
290  
        @see pull, consume
291  
    */
291  
    */
292  
    template<MutableBufferSequence MB>
292  
    template<MutableBufferSequence MB>
293  
    io_task<std::size_t>
293  
    io_task<std::size_t>
294  
    read(MB buffers);
294  
    read(MB buffers);
295  

295  

296  
protected:
296  
protected:
297  
    /** Rebind to a new source after move.
297  
    /** Rebind to a new source after move.
298  

298  

299  
        Updates the internal pointer to reference a new source object.
299  
        Updates the internal pointer to reference a new source object.
300  
        Used by owning wrappers after move assignment when the owned
300  
        Used by owning wrappers after move assignment when the owned
301  
        object has moved to a new location.
301  
        object has moved to a new location.
302  

302  

303  
        @param new_source The new source to bind to. Must be the same
303  
        @param new_source The new source to bind to. Must be the same
304  
            type as the original source.
304  
            type as the original source.
305  

305  

306  
        @note Terminates if called with a source of different type
306  
        @note Terminates if called with a source of different type
307  
            than the original.
307  
            than the original.
308  
    */
308  
    */
309  
    template<BufferSource S>
309  
    template<BufferSource S>
310  
    void
310  
    void
311  
    rebind(S& new_source) noexcept
311  
    rebind(S& new_source) noexcept
312  
    {
312  
    {
313  
        if(vt_ != &vtable_for_impl<S>::value)
313  
        if(vt_ != &vtable_for_impl<S>::value)
314  
            std::terminate();
314  
            std::terminate();
315  
        source_ = &new_source;
315  
        source_ = &new_source;
316  
    }
316  
    }
317  

317  

318  
private:
318  
private:
319  
    /** Forward a partial read through the vtable.
319  
    /** Forward a partial read through the vtable.
320  

320  

321  
        Constructs the underlying `read_some` awaitable in
321  
        Constructs the underlying `read_some` awaitable in
322  
        cached storage and returns a type-erased awaitable.
322  
        cached storage and returns a type-erased awaitable.
323  
    */
323  
    */
324  
    auto
324  
    auto
325  
    read_some_(std::span<mutable_buffer const> buffers);
325  
    read_some_(std::span<mutable_buffer const> buffers);
326  

326  

327  
    /** Forward a complete read through the vtable.
327  
    /** Forward a complete read through the vtable.
328  

328  

329  
        Constructs the underlying `read` awaitable in
329  
        Constructs the underlying `read` awaitable in
330  
        cached storage and returns a type-erased awaitable.
330  
        cached storage and returns a type-erased awaitable.
331  
    */
331  
    */
332  
    auto
332  
    auto
333  
    read_(std::span<mutable_buffer const> buffers);
333  
    read_(std::span<mutable_buffer const> buffers);
334  
};
334  
};
335  

335  

336  
//----------------------------------------------------------
336  
//----------------------------------------------------------
337  

337  

338  
/** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
338  
/** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
339  
struct any_buffer_source::awaitable_ops
339  
struct any_buffer_source::awaitable_ops
340  
{
340  
{
341  
    bool (*await_ready)(void*);
341  
    bool (*await_ready)(void*);
342  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
342  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
343  
    io_result<std::span<const_buffer>> (*await_resume)(void*);
343  
    io_result<std::span<const_buffer>> (*await_resume)(void*);
344  
    void (*destroy)(void*) noexcept;
344  
    void (*destroy)(void*) noexcept;
345  
};
345  
};
346  

346  

347  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
347  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
348  
struct any_buffer_source::read_awaitable_ops
348  
struct any_buffer_source::read_awaitable_ops
349  
{
349  
{
350  
    bool (*await_ready)(void*);
350  
    bool (*await_ready)(void*);
351  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
351  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
352  
    io_result<std::size_t> (*await_resume)(void*);
352  
    io_result<std::size_t> (*await_resume)(void*);
353  
    void (*destroy)(void*) noexcept;
353  
    void (*destroy)(void*) noexcept;
354  
};
354  
};
355  

355  

356  
struct any_buffer_source::vtable
356  
struct any_buffer_source::vtable
357  
{
357  
{
358  
    // BufferSource ops (always populated)
358  
    // BufferSource ops (always populated)
359  
    void (*destroy)(void*) noexcept;
359  
    void (*destroy)(void*) noexcept;
360  
    void (*do_consume)(void* source, std::size_t n) noexcept;
360  
    void (*do_consume)(void* source, std::size_t n) noexcept;
361  
    std::size_t awaitable_size;
361  
    std::size_t awaitable_size;
362  
    std::size_t awaitable_align;
362  
    std::size_t awaitable_align;
363  
    awaitable_ops const* (*construct_awaitable)(
363  
    awaitable_ops const* (*construct_awaitable)(
364  
        void* source,
364  
        void* source,
365  
        void* storage,
365  
        void* storage,
366  
        std::span<const_buffer> dest);
366  
        std::span<const_buffer> dest);
367  

367  

368  
    // ReadSource forwarding (null when wrapped type is BufferSource-only)
368  
    // ReadSource forwarding (null when wrapped type is BufferSource-only)
369  
    read_awaitable_ops const* (*construct_read_some_awaitable)(
369  
    read_awaitable_ops const* (*construct_read_some_awaitable)(
370  
        void* source,
370  
        void* source,
371  
        void* storage,
371  
        void* storage,
372  
        std::span<mutable_buffer const> buffers);
372  
        std::span<mutable_buffer const> buffers);
373  
    read_awaitable_ops const* (*construct_read_awaitable)(
373  
    read_awaitable_ops const* (*construct_read_awaitable)(
374  
        void* source,
374  
        void* source,
375  
        void* storage,
375  
        void* storage,
376  
        std::span<mutable_buffer const> buffers);
376  
        std::span<mutable_buffer const> buffers);
377  
};
377  
};
378  

378  

379  
template<BufferSource S>
379  
template<BufferSource S>
380  
struct any_buffer_source::vtable_for_impl
380  
struct any_buffer_source::vtable_for_impl
381  
{
381  
{
382  
    using PullAwaitable = decltype(std::declval<S&>().pull(
382  
    using PullAwaitable = decltype(std::declval<S&>().pull(
383  
        std::declval<std::span<const_buffer>>()));
383  
        std::declval<std::span<const_buffer>>()));
384  

384  

385  
    static void
385  
    static void
386  
    do_destroy_impl(void* source) noexcept
386  
    do_destroy_impl(void* source) noexcept
387  
    {
387  
    {
388  
        static_cast<S*>(source)->~S();
388  
        static_cast<S*>(source)->~S();
389  
    }
389  
    }
390  

390  

391  
    static void
391  
    static void
392  
    do_consume_impl(void* source, std::size_t n) noexcept
392  
    do_consume_impl(void* source, std::size_t n) noexcept
393  
    {
393  
    {
394  
        static_cast<S*>(source)->consume(n);
394  
        static_cast<S*>(source)->consume(n);
395  
    }
395  
    }
396  

396  

397  
    static awaitable_ops const*
397  
    static awaitable_ops const*
398  
    construct_awaitable_impl(
398  
    construct_awaitable_impl(
399  
        void* source,
399  
        void* source,
400  
        void* storage,
400  
        void* storage,
401  
        std::span<const_buffer> dest)
401  
        std::span<const_buffer> dest)
402  
    {
402  
    {
403  
        auto& s = *static_cast<S*>(source);
403  
        auto& s = *static_cast<S*>(source);
404  
        ::new(storage) PullAwaitable(s.pull(dest));
404  
        ::new(storage) PullAwaitable(s.pull(dest));
405  

405  

406  
        static constexpr awaitable_ops ops = {
406  
        static constexpr awaitable_ops ops = {
407  
            +[](void* p) {
407  
            +[](void* p) {
408  
                return static_cast<PullAwaitable*>(p)->await_ready();
408  
                return static_cast<PullAwaitable*>(p)->await_ready();
409  
            },
409  
            },
410  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
410  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
411  
                return detail::call_await_suspend(
411  
                return detail::call_await_suspend(
412  
                    static_cast<PullAwaitable*>(p), h, env);
412  
                    static_cast<PullAwaitable*>(p), h, env);
413  
            },
413  
            },
414  
            +[](void* p) {
414  
            +[](void* p) {
415  
                return static_cast<PullAwaitable*>(p)->await_resume();
415  
                return static_cast<PullAwaitable*>(p)->await_resume();
416  
            },
416  
            },
417  
            +[](void* p) noexcept {
417  
            +[](void* p) noexcept {
418  
                static_cast<PullAwaitable*>(p)->~PullAwaitable();
418  
                static_cast<PullAwaitable*>(p)->~PullAwaitable();
419  
            }
419  
            }
420  
        };
420  
        };
421  
        return &ops;
421  
        return &ops;
422  
    }
422  
    }
423  

423  

424  
    //------------------------------------------------------
424  
    //------------------------------------------------------
425  
    // ReadSource forwarding (only instantiated when ReadSource<S>)
425  
    // ReadSource forwarding (only instantiated when ReadSource<S>)
426  

426  

427  
    static read_awaitable_ops const*
427  
    static read_awaitable_ops const*
428  
    construct_read_some_awaitable_impl(
428  
    construct_read_some_awaitable_impl(
429  
        void* source,
429  
        void* source,
430  
        void* storage,
430  
        void* storage,
431  
        std::span<mutable_buffer const> buffers)
431  
        std::span<mutable_buffer const> buffers)
432  
        requires ReadSource<S>
432  
        requires ReadSource<S>
433  
    {
433  
    {
434  
        using Aw = decltype(std::declval<S&>().read_some(
434  
        using Aw = decltype(std::declval<S&>().read_some(
435  
            std::span<mutable_buffer const>{}));
435  
            std::span<mutable_buffer const>{}));
436  
        auto& s = *static_cast<S*>(source);
436  
        auto& s = *static_cast<S*>(source);
437  
        ::new(storage) Aw(s.read_some(buffers));
437  
        ::new(storage) Aw(s.read_some(buffers));
438  

438  

439  
        static constexpr read_awaitable_ops ops = {
439  
        static constexpr read_awaitable_ops ops = {
440  
            +[](void* p) {
440  
            +[](void* p) {
441  
                return static_cast<Aw*>(p)->await_ready();
441  
                return static_cast<Aw*>(p)->await_ready();
442  
            },
442  
            },
443  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
443  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
444  
                return detail::call_await_suspend(
444  
                return detail::call_await_suspend(
445  
                    static_cast<Aw*>(p), h, env);
445  
                    static_cast<Aw*>(p), h, env);
446  
            },
446  
            },
447  
            +[](void* p) {
447  
            +[](void* p) {
448  
                return static_cast<Aw*>(p)->await_resume();
448  
                return static_cast<Aw*>(p)->await_resume();
449  
            },
449  
            },
450  
            +[](void* p) noexcept {
450  
            +[](void* p) noexcept {
451  
                static_cast<Aw*>(p)->~Aw();
451  
                static_cast<Aw*>(p)->~Aw();
452  
            }
452  
            }
453  
        };
453  
        };
454  
        return &ops;
454  
        return &ops;
455  
    }
455  
    }
456  

456  

457  
    static read_awaitable_ops const*
457  
    static read_awaitable_ops const*
458  
    construct_read_awaitable_impl(
458  
    construct_read_awaitable_impl(
459  
        void* source,
459  
        void* source,
460  
        void* storage,
460  
        void* storage,
461  
        std::span<mutable_buffer const> buffers)
461  
        std::span<mutable_buffer const> buffers)
462  
        requires ReadSource<S>
462  
        requires ReadSource<S>
463  
    {
463  
    {
464  
        using Aw = decltype(std::declval<S&>().read(
464  
        using Aw = decltype(std::declval<S&>().read(
465  
            std::span<mutable_buffer const>{}));
465  
            std::span<mutable_buffer const>{}));
466  
        auto& s = *static_cast<S*>(source);
466  
        auto& s = *static_cast<S*>(source);
467  
        ::new(storage) Aw(s.read(buffers));
467  
        ::new(storage) Aw(s.read(buffers));
468  

468  

469  
        static constexpr read_awaitable_ops ops = {
469  
        static constexpr read_awaitable_ops ops = {
470  
            +[](void* p) {
470  
            +[](void* p) {
471  
                return static_cast<Aw*>(p)->await_ready();
471  
                return static_cast<Aw*>(p)->await_ready();
472  
            },
472  
            },
473  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
473  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
474  
                return detail::call_await_suspend(
474  
                return detail::call_await_suspend(
475  
                    static_cast<Aw*>(p), h, env);
475  
                    static_cast<Aw*>(p), h, env);
476  
            },
476  
            },
477  
            +[](void* p) {
477  
            +[](void* p) {
478  
                return static_cast<Aw*>(p)->await_resume();
478  
                return static_cast<Aw*>(p)->await_resume();
479  
            },
479  
            },
480  
            +[](void* p) noexcept {
480  
            +[](void* p) noexcept {
481  
                static_cast<Aw*>(p)->~Aw();
481  
                static_cast<Aw*>(p)->~Aw();
482  
            }
482  
            }
483  
        };
483  
        };
484  
        return &ops;
484  
        return &ops;
485  
    }
485  
    }
486  

486  

487  
    //------------------------------------------------------
487  
    //------------------------------------------------------
488  

488  

489  
    static consteval std::size_t
489  
    static consteval std::size_t
490  
    compute_max_size() noexcept
490  
    compute_max_size() noexcept
491  
    {
491  
    {
492  
        std::size_t s = sizeof(PullAwaitable);
492  
        std::size_t s = sizeof(PullAwaitable);
493  
        if constexpr (ReadSource<S>)
493  
        if constexpr (ReadSource<S>)
494  
        {
494  
        {
495  
            using RS = decltype(std::declval<S&>().read_some(
495  
            using RS = decltype(std::declval<S&>().read_some(
496  
                std::span<mutable_buffer const>{}));
496  
                std::span<mutable_buffer const>{}));
497  
            using R = decltype(std::declval<S&>().read(
497  
            using R = decltype(std::declval<S&>().read(
498  
                std::span<mutable_buffer const>{}));
498  
                std::span<mutable_buffer const>{}));
499  

499  

500  
            if(sizeof(RS) > s) s = sizeof(RS);
500  
            if(sizeof(RS) > s) s = sizeof(RS);
501  
            if(sizeof(R) > s) s = sizeof(R);
501  
            if(sizeof(R) > s) s = sizeof(R);
502  
        }
502  
        }
503  
        return s;
503  
        return s;
504  
    }
504  
    }
505  

505  

506  
    static consteval std::size_t
506  
    static consteval std::size_t
507  
    compute_max_align() noexcept
507  
    compute_max_align() noexcept
508  
    {
508  
    {
509  
        std::size_t a = alignof(PullAwaitable);
509  
        std::size_t a = alignof(PullAwaitable);
510  
        if constexpr (ReadSource<S>)
510  
        if constexpr (ReadSource<S>)
511  
        {
511  
        {
512  
            using RS = decltype(std::declval<S&>().read_some(
512  
            using RS = decltype(std::declval<S&>().read_some(
513  
                std::span<mutable_buffer const>{}));
513  
                std::span<mutable_buffer const>{}));
514  
            using R = decltype(std::declval<S&>().read(
514  
            using R = decltype(std::declval<S&>().read(
515  
                std::span<mutable_buffer const>{}));
515  
                std::span<mutable_buffer const>{}));
516  

516  

517  
            if(alignof(RS) > a) a = alignof(RS);
517  
            if(alignof(RS) > a) a = alignof(RS);
518  
            if(alignof(R) > a) a = alignof(R);
518  
            if(alignof(R) > a) a = alignof(R);
519  
        }
519  
        }
520  
        return a;
520  
        return a;
521  
    }
521  
    }
522  

522  

523  
    static consteval vtable
523  
    static consteval vtable
524  
    make_vtable() noexcept
524  
    make_vtable() noexcept
525  
    {
525  
    {
526  
        vtable v{};
526  
        vtable v{};
527  
        v.destroy = &do_destroy_impl;
527  
        v.destroy = &do_destroy_impl;
528  
        v.do_consume = &do_consume_impl;
528  
        v.do_consume = &do_consume_impl;
529  
        v.awaitable_size = compute_max_size();
529  
        v.awaitable_size = compute_max_size();
530  
        v.awaitable_align = compute_max_align();
530  
        v.awaitable_align = compute_max_align();
531  
        v.construct_awaitable = &construct_awaitable_impl;
531  
        v.construct_awaitable = &construct_awaitable_impl;
532  
        v.construct_read_some_awaitable = nullptr;
532  
        v.construct_read_some_awaitable = nullptr;
533  
        v.construct_read_awaitable = nullptr;
533  
        v.construct_read_awaitable = nullptr;
534  

534  

535  
        if constexpr (ReadSource<S>)
535  
        if constexpr (ReadSource<S>)
536  
        {
536  
        {
537  
            v.construct_read_some_awaitable =
537  
            v.construct_read_some_awaitable =
538  
                &construct_read_some_awaitable_impl;
538  
                &construct_read_some_awaitable_impl;
539  
            v.construct_read_awaitable =
539  
            v.construct_read_awaitable =
540  
                &construct_read_awaitable_impl;
540  
                &construct_read_awaitable_impl;
541  
        }
541  
        }
542  
        return v;
542  
        return v;
543  
    }
543  
    }
544  

544  

545  
    static constexpr vtable value = make_vtable();
545  
    static constexpr vtable value = make_vtable();
546  
};
546  
};
547  

547  

548  
//----------------------------------------------------------
548  
//----------------------------------------------------------
549  

549  

550  
inline
550  
inline
551  
any_buffer_source::~any_buffer_source()
551  
any_buffer_source::~any_buffer_source()
552  
{
552  
{
553  
    if(storage_)
553  
    if(storage_)
554  
    {
554  
    {
555  
        vt_->destroy(source_);
555  
        vt_->destroy(source_);
556  
        ::operator delete(storage_);
556  
        ::operator delete(storage_);
557  
    }
557  
    }
558  
    if(cached_awaitable_)
558  
    if(cached_awaitable_)
559  
        ::operator delete(cached_awaitable_);
559  
        ::operator delete(cached_awaitable_);
560  
}
560  
}
561  

561  

562  
inline any_buffer_source&
562  
inline any_buffer_source&
563  
any_buffer_source::operator=(any_buffer_source&& other) noexcept
563  
any_buffer_source::operator=(any_buffer_source&& other) noexcept
564  
{
564  
{
565  
    if(this != &other)
565  
    if(this != &other)
566  
    {
566  
    {
567  
        if(storage_)
567  
        if(storage_)
568  
        {
568  
        {
569  
            vt_->destroy(source_);
569  
            vt_->destroy(source_);
570  
            ::operator delete(storage_);
570  
            ::operator delete(storage_);
571  
        }
571  
        }
572  
        if(cached_awaitable_)
572  
        if(cached_awaitable_)
573  
            ::operator delete(cached_awaitable_);
573  
            ::operator delete(cached_awaitable_);
574  
        source_ = std::exchange(other.source_, nullptr);
574  
        source_ = std::exchange(other.source_, nullptr);
575  
        vt_ = std::exchange(other.vt_, nullptr);
575  
        vt_ = std::exchange(other.vt_, nullptr);
576  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
576  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
577  
        storage_ = std::exchange(other.storage_, nullptr);
577  
        storage_ = std::exchange(other.storage_, nullptr);
578  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
578  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
579  
        active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
579  
        active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
580  
    }
580  
    }
581  
    return *this;
581  
    return *this;
582  
}
582  
}
583  

583  

584  
template<BufferSource S>
584  
template<BufferSource S>
585  
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
585  
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
586  
any_buffer_source::any_buffer_source(S s)
586  
any_buffer_source::any_buffer_source(S s)
587  
    : vt_(&vtable_for_impl<S>::value)
587  
    : vt_(&vtable_for_impl<S>::value)
588  
{
588  
{
589  
    struct guard {
589  
    struct guard {
590  
        any_buffer_source* self;
590  
        any_buffer_source* self;
591  
        bool committed = false;
591  
        bool committed = false;
592  
        ~guard() {
592  
        ~guard() {
593  
            if(!committed && self->storage_) {
593  
            if(!committed && self->storage_) {
594  
                self->vt_->destroy(self->source_);
594  
                self->vt_->destroy(self->source_);
595  
                ::operator delete(self->storage_);
595  
                ::operator delete(self->storage_);
596  
                self->storage_ = nullptr;
596  
                self->storage_ = nullptr;
597  
                self->source_ = nullptr;
597  
                self->source_ = nullptr;
598  
            }
598  
            }
599  
        }
599  
        }
600  
    } g{this};
600  
    } g{this};
601  

601  

602  
    storage_ = ::operator new(sizeof(S));
602  
    storage_ = ::operator new(sizeof(S));
603  
    source_ = ::new(storage_) S(std::move(s));
603  
    source_ = ::new(storage_) S(std::move(s));
604  

604  

605  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
605  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
606  

606  

607  
    g.committed = true;
607  
    g.committed = true;
608  
}
608  
}
609  

609  

610  
template<BufferSource S>
610  
template<BufferSource S>
611  
any_buffer_source::any_buffer_source(S* s)
611  
any_buffer_source::any_buffer_source(S* s)
612  
    : source_(s)
612  
    : source_(s)
613  
    , vt_(&vtable_for_impl<S>::value)
613  
    , vt_(&vtable_for_impl<S>::value)
614  
{
614  
{
615  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
615  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
616  
}
616  
}
617  

617  

618  
//----------------------------------------------------------
618  
//----------------------------------------------------------
619  

619  

620  
inline void
620  
inline void
621  
any_buffer_source::consume(std::size_t n) noexcept
621  
any_buffer_source::consume(std::size_t n) noexcept
622  
{
622  
{
623  
    vt_->do_consume(source_, n);
623  
    vt_->do_consume(source_, n);
624  
}
624  
}
625  

625  

626  
inline auto
626  
inline auto
627  
any_buffer_source::pull(std::span<const_buffer> dest)
627  
any_buffer_source::pull(std::span<const_buffer> dest)
628  
{
628  
{
629  
    struct awaitable
629  
    struct awaitable
630  
    {
630  
    {
631  
        any_buffer_source* self_;
631  
        any_buffer_source* self_;
632  
        std::span<const_buffer> dest_;
632  
        std::span<const_buffer> dest_;
633  

633  

634  
        bool
634  
        bool
635  
        await_ready()
635  
        await_ready()
636  
        {
636  
        {
637  
            self_->active_ops_ = self_->vt_->construct_awaitable(
637  
            self_->active_ops_ = self_->vt_->construct_awaitable(
638  
                self_->source_,
638  
                self_->source_,
639  
                self_->cached_awaitable_,
639  
                self_->cached_awaitable_,
640  
                dest_);
640  
                dest_);
641  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
641  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
642  
        }
642  
        }
643  

643  

644  
        std::coroutine_handle<>
644  
        std::coroutine_handle<>
645  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
645  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
646  
        {
646  
        {
647  
            return self_->active_ops_->await_suspend(
647  
            return self_->active_ops_->await_suspend(
648  
                self_->cached_awaitable_, h, env);
648  
                self_->cached_awaitable_, h, env);
649  
        }
649  
        }
650  

650  

651  
        io_result<std::span<const_buffer>>
651  
        io_result<std::span<const_buffer>>
652  
        await_resume()
652  
        await_resume()
653  
        {
653  
        {
654  
            struct guard {
654  
            struct guard {
655  
                any_buffer_source* self;
655  
                any_buffer_source* self;
656  
                ~guard() {
656  
                ~guard() {
657  
                    self->active_ops_->destroy(self->cached_awaitable_);
657  
                    self->active_ops_->destroy(self->cached_awaitable_);
658  
                    self->active_ops_ = nullptr;
658  
                    self->active_ops_ = nullptr;
659  
                }
659  
                }
660  
            } g{self_};
660  
            } g{self_};
661  
            return self_->active_ops_->await_resume(
661  
            return self_->active_ops_->await_resume(
662  
                self_->cached_awaitable_);
662  
                self_->cached_awaitable_);
663  
        }
663  
        }
664  
    };
664  
    };
665  
    return awaitable{this, dest};
665  
    return awaitable{this, dest};
666  
}
666  
}
667  

667  

668  
//----------------------------------------------------------
668  
//----------------------------------------------------------
669  
// Private helpers for native ReadSource forwarding
669  
// Private helpers for native ReadSource forwarding
670  

670  

671  
inline auto
671  
inline auto
672  
any_buffer_source::read_some_(
672  
any_buffer_source::read_some_(
673  
    std::span<mutable_buffer const> buffers)
673  
    std::span<mutable_buffer const> buffers)
674  
{
674  
{
675  
    struct awaitable
675  
    struct awaitable
676  
    {
676  
    {
677  
        any_buffer_source* self_;
677  
        any_buffer_source* self_;
678  
        std::span<mutable_buffer const> buffers_;
678  
        std::span<mutable_buffer const> buffers_;
679  

679  

680  
        bool
680  
        bool
681  
        await_ready() const noexcept
681  
        await_ready() const noexcept
682  
        {
682  
        {
683  
            return false;
683  
            return false;
684  
        }
684  
        }
685  

685  

686  
        std::coroutine_handle<>
686  
        std::coroutine_handle<>
687  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
687  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
688  
        {
688  
        {
689  
            self_->active_read_ops_ =
689  
            self_->active_read_ops_ =
690  
                self_->vt_->construct_read_some_awaitable(
690  
                self_->vt_->construct_read_some_awaitable(
691  
                    self_->source_,
691  
                    self_->source_,
692  
                    self_->cached_awaitable_,
692  
                    self_->cached_awaitable_,
693  
                    buffers_);
693  
                    buffers_);
694  

694  

695  
            if(self_->active_read_ops_->await_ready(
695  
            if(self_->active_read_ops_->await_ready(
696  
                self_->cached_awaitable_))
696  
                self_->cached_awaitable_))
697  
                return h;
697  
                return h;
698  

698  

699  
            return self_->active_read_ops_->await_suspend(
699  
            return self_->active_read_ops_->await_suspend(
700  
                self_->cached_awaitable_, h, env);
700  
                self_->cached_awaitable_, h, env);
701  
        }
701  
        }
702  

702  

703  
        io_result<std::size_t>
703  
        io_result<std::size_t>
704  
        await_resume()
704  
        await_resume()
705  
        {
705  
        {
706  
            struct guard {
706  
            struct guard {
707  
                any_buffer_source* self;
707  
                any_buffer_source* self;
708  
                ~guard() {
708  
                ~guard() {
709  
                    self->active_read_ops_->destroy(
709  
                    self->active_read_ops_->destroy(
710  
                        self->cached_awaitable_);
710  
                        self->cached_awaitable_);
711  
                    self->active_read_ops_ = nullptr;
711  
                    self->active_read_ops_ = nullptr;
712  
                }
712  
                }
713  
            } g{self_};
713  
            } g{self_};
714  
            return self_->active_read_ops_->await_resume(
714  
            return self_->active_read_ops_->await_resume(
715  
                self_->cached_awaitable_);
715  
                self_->cached_awaitable_);
716  
        }
716  
        }
717  
    };
717  
    };
718  
    return awaitable{this, buffers};
718  
    return awaitable{this, buffers};
719  
}
719  
}
720  

720  

721  
inline auto
721  
inline auto
722  
any_buffer_source::read_(
722  
any_buffer_source::read_(
723  
    std::span<mutable_buffer const> buffers)
723  
    std::span<mutable_buffer const> buffers)
724  
{
724  
{
725  
    struct awaitable
725  
    struct awaitable
726  
    {
726  
    {
727  
        any_buffer_source* self_;
727  
        any_buffer_source* self_;
728  
        std::span<mutable_buffer const> buffers_;
728  
        std::span<mutable_buffer const> buffers_;
729  

729  

730  
        bool
730  
        bool
731  
        await_ready() const noexcept
731  
        await_ready() const noexcept
732  
        {
732  
        {
733  
            return false;
733  
            return false;
734  
        }
734  
        }
735  

735  

736  
        std::coroutine_handle<>
736  
        std::coroutine_handle<>
737  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
737  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
738  
        {
738  
        {
739  
            self_->active_read_ops_ =
739  
            self_->active_read_ops_ =
740  
                self_->vt_->construct_read_awaitable(
740  
                self_->vt_->construct_read_awaitable(
741  
                    self_->source_,
741  
                    self_->source_,
742  
                    self_->cached_awaitable_,
742  
                    self_->cached_awaitable_,
743  
                    buffers_);
743  
                    buffers_);
744  

744  

745  
            if(self_->active_read_ops_->await_ready(
745  
            if(self_->active_read_ops_->await_ready(
746  
                self_->cached_awaitable_))
746  
                self_->cached_awaitable_))
747  
                return h;
747  
                return h;
748  

748  

749  
            return self_->active_read_ops_->await_suspend(
749  
            return self_->active_read_ops_->await_suspend(
750  
                self_->cached_awaitable_, h, env);
750  
                self_->cached_awaitable_, h, env);
751  
        }
751  
        }
752  

752  

753  
        io_result<std::size_t>
753  
        io_result<std::size_t>
754  
        await_resume()
754  
        await_resume()
755  
        {
755  
        {
756  
            struct guard {
756  
            struct guard {
757  
                any_buffer_source* self;
757  
                any_buffer_source* self;
758  
                ~guard() {
758  
                ~guard() {
759  
                    self->active_read_ops_->destroy(
759  
                    self->active_read_ops_->destroy(
760  
                        self->cached_awaitable_);
760  
                        self->cached_awaitable_);
761  
                    self->active_read_ops_ = nullptr;
761  
                    self->active_read_ops_ = nullptr;
762  
                }
762  
                }
763  
            } g{self_};
763  
            } g{self_};
764  
            return self_->active_read_ops_->await_resume(
764  
            return self_->active_read_ops_->await_resume(
765  
                self_->cached_awaitable_);
765  
                self_->cached_awaitable_);
766  
        }
766  
        }
767  
    };
767  
    };
768  
    return awaitable{this, buffers};
768  
    return awaitable{this, buffers};
769  
}
769  
}
770  

770  

771  
//----------------------------------------------------------
771  
//----------------------------------------------------------
772  
// Public ReadSource methods
772  
// Public ReadSource methods
773  

773  

774  
template<MutableBufferSequence MB>
774  
template<MutableBufferSequence MB>
775  
io_task<std::size_t>
775  
io_task<std::size_t>
776  
any_buffer_source::read_some(MB buffers)
776  
any_buffer_source::read_some(MB buffers)
777  
{
777  
{
778  
    buffer_param<MB> bp(buffers);
778  
    buffer_param<MB> bp(buffers);
779  
    auto dest = bp.data();
779  
    auto dest = bp.data();
780  
    if(dest.empty())
780  
    if(dest.empty())
781  
        co_return {{}, 0};
781  
        co_return {{}, 0};
782  

782  

783  
    // Native ReadSource path
783  
    // Native ReadSource path
784  
    if(vt_->construct_read_some_awaitable)
784  
    if(vt_->construct_read_some_awaitable)
785  
        co_return co_await read_some_(dest);
785  
        co_return co_await read_some_(dest);
786  

786  

787  
    // Synthesized path: pull + buffer_copy + consume
787  
    // Synthesized path: pull + buffer_copy + consume
788  
    const_buffer arr[detail::max_iovec_];
788  
    const_buffer arr[detail::max_iovec_];
789  
    auto [ec, bufs] = co_await pull(arr);
789  
    auto [ec, bufs] = co_await pull(arr);
790  
    if(ec)
790  
    if(ec)
791  
        co_return {ec, 0};
791  
        co_return {ec, 0};
792  

792  

793  
    auto n = buffer_copy(dest, bufs);
793  
    auto n = buffer_copy(dest, bufs);
794  
    consume(n);
794  
    consume(n);
795  
    co_return {{}, n};
795  
    co_return {{}, n};
796  
}
796  
}
797  

797  

798  
template<MutableBufferSequence MB>
798  
template<MutableBufferSequence MB>
799  
io_task<std::size_t>
799  
io_task<std::size_t>
800  
any_buffer_source::read(MB buffers)
800  
any_buffer_source::read(MB buffers)
801  
{
801  
{
802  
    buffer_param<MB> bp(buffers);
802  
    buffer_param<MB> bp(buffers);
803  
    std::size_t total = 0;
803  
    std::size_t total = 0;
804  

804  

805  
    // Native ReadSource path
805  
    // Native ReadSource path
806  
    if(vt_->construct_read_awaitable)
806  
    if(vt_->construct_read_awaitable)
807  
    {
807  
    {
808  
        for(;;)
808  
        for(;;)
809  
        {
809  
        {
810  
            auto dest = bp.data();
810  
            auto dest = bp.data();
811  
            if(dest.empty())
811  
            if(dest.empty())
812  
                break;
812  
                break;
813  

813  

814  
            auto [ec, n] = co_await read_(dest);
814  
            auto [ec, n] = co_await read_(dest);
815  
            total += n;
815  
            total += n;
816  
            if(ec)
816  
            if(ec)
817  
                co_return {ec, total};
817  
                co_return {ec, total};
818  
            bp.consume(n);
818  
            bp.consume(n);
819  
        }
819  
        }
820  
        co_return {{}, total};
820  
        co_return {{}, total};
821  
    }
821  
    }
822  

822  

823  
    // Synthesized path: pull + buffer_copy + consume
823  
    // Synthesized path: pull + buffer_copy + consume
824  
    for(;;)
824  
    for(;;)
825  
    {
825  
    {
826  
        auto dest = bp.data();
826  
        auto dest = bp.data();
827  
        if(dest.empty())
827  
        if(dest.empty())
828  
            break;
828  
            break;
829  

829  

830  
        const_buffer arr[detail::max_iovec_];
830  
        const_buffer arr[detail::max_iovec_];
831  
        auto [ec, bufs] = co_await pull(arr);
831  
        auto [ec, bufs] = co_await pull(arr);
832  

832  

833  
        if(ec)
833  
        if(ec)
834  
            co_return {ec, total};
834  
            co_return {ec, total};
835  

835  

836  
        auto n = buffer_copy(dest, bufs);
836  
        auto n = buffer_copy(dest, bufs);
837  
        consume(n);
837  
        consume(n);
838  
        total += n;
838  
        total += n;
839  
        bp.consume(n);
839  
        bp.consume(n);
840  
    }
840  
    }
841  

841  

842  
    co_return {{}, total};
842  
    co_return {{}, total};
843  
}
843  
}
844  

844  

845  
//----------------------------------------------------------
845  
//----------------------------------------------------------
846  

846  

847  
static_assert(BufferSource<any_buffer_source>);
847  
static_assert(BufferSource<any_buffer_source>);
848  
static_assert(ReadSource<any_buffer_source>);
848  
static_assert(ReadSource<any_buffer_source>);
849  

849  

850  
} // namespace capy
850  
} // namespace capy
851  
} // namespace boost
851  
} // namespace boost
852  

852  

853  
#endif
853  
#endif