1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/write_sink.hpp>
19  
#include <boost/capy/concept/write_sink.hpp>
20  
#include <coroutine>
20  
#include <coroutine>
21  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
22  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_result.hpp>
23  
#include <boost/capy/io_task.hpp>
23  
#include <boost/capy/io_task.hpp>
24  

24  

25  
#include <concepts>
25  
#include <concepts>
26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <cstddef>
27  
#include <cstddef>
28  
#include <exception>
28  
#include <exception>
29  
#include <new>
29  
#include <new>
30  
#include <span>
30  
#include <span>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <system_error>
32  
#include <system_error>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
namespace boost {
35  
namespace boost {
36  
namespace capy {
36  
namespace capy {
37  

37  

38  
/** Type-erased wrapper for any WriteSink.
38  
/** Type-erased wrapper for any WriteSink.
39  

39  

40  
    This class provides type erasure for any type satisfying the
40  
    This class provides type erasure for any type satisfying the
41  
    @ref WriteSink concept, enabling runtime polymorphism for
41  
    @ref WriteSink concept, enabling runtime polymorphism for
42  
    sink write operations. It uses cached awaitable storage to achieve
42  
    sink write operations. It uses cached awaitable storage to achieve
43  
    zero steady-state allocation after construction.
43  
    zero steady-state allocation after construction.
44  

44  

45  
    The wrapper supports two construction modes:
45  
    The wrapper supports two construction modes:
46  
    - **Owning**: Pass by value to transfer ownership. The wrapper
46  
    - **Owning**: Pass by value to transfer ownership. The wrapper
47  
      allocates storage and owns the sink.
47  
      allocates storage and owns the sink.
48  
    - **Reference**: Pass a pointer to wrap without ownership. The
48  
    - **Reference**: Pass a pointer to wrap without ownership. The
49  
      pointed-to sink must outlive this wrapper.
49  
      pointed-to sink must outlive this wrapper.
50  

50  

51  
    @par Awaitable Preallocation
51  
    @par Awaitable Preallocation
52  
    The constructor preallocates storage for the type-erased awaitable.
52  
    The constructor preallocates storage for the type-erased awaitable.
53  
    This reserves all virtual address space at server startup
53  
    This reserves all virtual address space at server startup
54  
    so memory usage can be measured up front, rather than
54  
    so memory usage can be measured up front, rather than
55  
    allocating piecemeal as traffic arrives.
55  
    allocating piecemeal as traffic arrives.
56  

56  

57  
    @par Immediate Completion
57  
    @par Immediate Completion
58  
    Operations complete immediately without suspending when the
58  
    Operations complete immediately without suspending when the
59  
    buffer sequence is empty, or when the underlying sink's
59  
    buffer sequence is empty, or when the underlying sink's
60  
    awaitable reports readiness via `await_ready`.
60  
    awaitable reports readiness via `await_ready`.
61  

61  

62  
    @par Thread Safety
62  
    @par Thread Safety
63  
    Not thread-safe. Concurrent operations on the same wrapper
63  
    Not thread-safe. Concurrent operations on the same wrapper
64  
    are undefined behavior.
64  
    are undefined behavior.
65  

65  

66  
    @par Example
66  
    @par Example
67  
    @code
67  
    @code
68  
    // Owning - takes ownership of the sink
68  
    // Owning - takes ownership of the sink
69  
    any_write_sink ws(some_sink{args...});
69  
    any_write_sink ws(some_sink{args...});
70  

70  

71  
    // Reference - wraps without ownership
71  
    // Reference - wraps without ownership
72  
    some_sink sink;
72  
    some_sink sink;
73  
    any_write_sink ws(&sink);
73  
    any_write_sink ws(&sink);
74  

74  

75  
    const_buffer buf(data, size);
75  
    const_buffer buf(data, size);
76  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
76  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77  
    auto [ec2] = co_await ws.write_eof();
77  
    auto [ec2] = co_await ws.write_eof();
78  
    @endcode
78  
    @endcode
79  

79  

80  
    @see any_write_stream, WriteSink
80  
    @see any_write_stream, WriteSink
81  
*/
81  
*/
82  
class any_write_sink
82  
class any_write_sink
83  
{
83  
{
84  
    struct vtable;
84  
    struct vtable;
85  
    struct write_awaitable_ops;
85  
    struct write_awaitable_ops;
86  
    struct eof_awaitable_ops;
86  
    struct eof_awaitable_ops;
87  

87  

88  
    template<WriteSink S>
88  
    template<WriteSink S>
89  
    struct vtable_for_impl;
89  
    struct vtable_for_impl;
90  

90  

91  
    void* sink_ = nullptr;
91  
    void* sink_ = nullptr;
92  
    vtable const* vt_ = nullptr;
92  
    vtable const* vt_ = nullptr;
93  
    void* cached_awaitable_ = nullptr;
93  
    void* cached_awaitable_ = nullptr;
94  
    void* storage_ = nullptr;
94  
    void* storage_ = nullptr;
95  
    write_awaitable_ops const* active_write_ops_ = nullptr;
95  
    write_awaitable_ops const* active_write_ops_ = nullptr;
96  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
96  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
97  

97  

98  
public:
98  
public:
99  
    /** Destructor.
99  
    /** Destructor.
100  

100  

101  
        Destroys the owned sink (if any) and releases the cached
101  
        Destroys the owned sink (if any) and releases the cached
102  
        awaitable storage.
102  
        awaitable storage.
103  
    */
103  
    */
104  
    ~any_write_sink();
104  
    ~any_write_sink();
105  

105  

106  
    /** Default constructor.
106  
    /** Default constructor.
107  

107  

108  
        Constructs an empty wrapper. Operations on a default-constructed
108  
        Constructs an empty wrapper. Operations on a default-constructed
109  
        wrapper result in undefined behavior.
109  
        wrapper result in undefined behavior.
110  
    */
110  
    */
111  
    any_write_sink() = default;
111  
    any_write_sink() = default;
112  

112  

113  
    /** Non-copyable.
113  
    /** Non-copyable.
114  

114  

115  
        The awaitable cache is per-instance and cannot be shared.
115  
        The awaitable cache is per-instance and cannot be shared.
116  
    */
116  
    */
117  
    any_write_sink(any_write_sink const&) = delete;
117  
    any_write_sink(any_write_sink const&) = delete;
118  
    any_write_sink& operator=(any_write_sink const&) = delete;
118  
    any_write_sink& operator=(any_write_sink const&) = delete;
119  

119  

120  
    /** Move constructor.
120  
    /** Move constructor.
121  

121  

122  
        Transfers ownership of the wrapped sink (if owned) and
122  
        Transfers ownership of the wrapped sink (if owned) and
123  
        cached awaitable storage from `other`. After the move, `other` is
123  
        cached awaitable storage from `other`. After the move, `other` is
124  
        in a default-constructed state.
124  
        in a default-constructed state.
125  

125  

126  
        @param other The wrapper to move from.
126  
        @param other The wrapper to move from.
127  
    */
127  
    */
128  
    any_write_sink(any_write_sink&& other) noexcept
128  
    any_write_sink(any_write_sink&& other) noexcept
129  
        : sink_(std::exchange(other.sink_, nullptr))
129  
        : sink_(std::exchange(other.sink_, nullptr))
130  
        , vt_(std::exchange(other.vt_, nullptr))
130  
        , vt_(std::exchange(other.vt_, nullptr))
131  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
131  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132  
        , storage_(std::exchange(other.storage_, nullptr))
132  
        , storage_(std::exchange(other.storage_, nullptr))
133  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
133  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
134  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135  
    {
135  
    {
136  
    }
136  
    }
137  

137  

138  
    /** Move assignment operator.
138  
    /** Move assignment operator.
139  

139  

140  
        Destroys any owned sink and releases existing resources,
140  
        Destroys any owned sink and releases existing resources,
141  
        then transfers ownership from `other`.
141  
        then transfers ownership from `other`.
142  

142  

143  
        @param other The wrapper to move from.
143  
        @param other The wrapper to move from.
144  
        @return Reference to this wrapper.
144  
        @return Reference to this wrapper.
145  
    */
145  
    */
146  
    any_write_sink&
146  
    any_write_sink&
147  
    operator=(any_write_sink&& other) noexcept;
147  
    operator=(any_write_sink&& other) noexcept;
148  

148  

149  
    /** Construct by taking ownership of a WriteSink.
149  
    /** Construct by taking ownership of a WriteSink.
150  

150  

151  
        Allocates storage and moves the sink into this wrapper.
151  
        Allocates storage and moves the sink into this wrapper.
152  
        The wrapper owns the sink and will destroy it.
152  
        The wrapper owns the sink and will destroy it.
153  

153  

154  
        @param s The sink to take ownership of.
154  
        @param s The sink to take ownership of.
155  
    */
155  
    */
156  
    template<WriteSink S>
156  
    template<WriteSink S>
157  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
157  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158  
    any_write_sink(S s);
158  
    any_write_sink(S s);
159  

159  

160  
    /** Construct by wrapping a WriteSink without ownership.
160  
    /** Construct by wrapping a WriteSink without ownership.
161  

161  

162  
        Wraps the given sink by pointer. The sink must remain
162  
        Wraps the given sink by pointer. The sink must remain
163  
        valid for the lifetime of this wrapper.
163  
        valid for the lifetime of this wrapper.
164  

164  

165  
        @param s Pointer to the sink to wrap.
165  
        @param s Pointer to the sink to wrap.
166  
    */
166  
    */
167  
    template<WriteSink S>
167  
    template<WriteSink S>
168  
    any_write_sink(S* s);
168  
    any_write_sink(S* s);
169  

169  

170  
    /** Check if the wrapper contains a valid sink.
170  
    /** Check if the wrapper contains a valid sink.
171  

171  

172  
        @return `true` if wrapping a sink, `false` if default-constructed
172  
        @return `true` if wrapping a sink, `false` if default-constructed
173  
            or moved-from.
173  
            or moved-from.
174  
    */
174  
    */
175  
    bool
175  
    bool
176  
    has_value() const noexcept
176  
    has_value() const noexcept
177  
    {
177  
    {
178  
        return sink_ != nullptr;
178  
        return sink_ != nullptr;
179  
    }
179  
    }
180  

180  

181  
    /** Check if the wrapper contains a valid sink.
181  
    /** Check if the wrapper contains a valid sink.
182  

182  

183  
        @return `true` if wrapping a sink, `false` if default-constructed
183  
        @return `true` if wrapping a sink, `false` if default-constructed
184  
            or moved-from.
184  
            or moved-from.
185  
    */
185  
    */
186  
    explicit
186  
    explicit
187  
    operator bool() const noexcept
187  
    operator bool() const noexcept
188  
    {
188  
    {
189  
        return has_value();
189  
        return has_value();
190  
    }
190  
    }
191  

191  

192  
    /** Initiate a partial write operation.
192  
    /** Initiate a partial write operation.
193  

193  

194  
        Writes one or more bytes from the provided buffer sequence.
194  
        Writes one or more bytes from the provided buffer sequence.
195  
        May consume less than the full sequence.
195  
        May consume less than the full sequence.
196  

196  

197  
        @param buffers The buffer sequence containing data to write.
197  
        @param buffers The buffer sequence containing data to write.
198  

198  

199  
        @return An awaitable yielding `(error_code,std::size_t)`.
199  
        @return An awaitable yielding `(error_code,std::size_t)`.
200  

200  

201  
        @par Immediate Completion
201  
        @par Immediate Completion
202  
        The operation completes immediately without suspending
202  
        The operation completes immediately without suspending
203  
        the calling coroutine when:
203  
        the calling coroutine when:
204  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
204  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
205  
        @li The underlying sink's awaitable reports immediate
205  
        @li The underlying sink's awaitable reports immediate
206  
            readiness via `await_ready`.
206  
            readiness via `await_ready`.
207  

207  

208  
        @note This is a partial operation and may not process the
208  
        @note This is a partial operation and may not process the
209  
        entire buffer sequence. Use @ref write for guaranteed
209  
        entire buffer sequence. Use @ref write for guaranteed
210  
        complete transfer.
210  
        complete transfer.
211  

211  

212  
        @par Preconditions
212  
        @par Preconditions
213  
        The wrapper must contain a valid sink (`has_value() == true`).
213  
        The wrapper must contain a valid sink (`has_value() == true`).
214  
    */
214  
    */
215  
    template<ConstBufferSequence CB>
215  
    template<ConstBufferSequence CB>
216  
    auto
216  
    auto
217  
    write_some(CB buffers);
217  
    write_some(CB buffers);
218  

218  

219  
    /** Initiate a complete write operation.
219  
    /** Initiate a complete write operation.
220  

220  

221  
        Writes data from the provided buffer sequence. The operation
221  
        Writes data from the provided buffer sequence. The operation
222  
        completes when all bytes have been consumed, or an error
222  
        completes when all bytes have been consumed, or an error
223  
        occurs. Forwards to the underlying sink's `write` operation,
223  
        occurs. Forwards to the underlying sink's `write` operation,
224  
        windowed through @ref buffer_param when the sequence exceeds
224  
        windowed through @ref buffer_param when the sequence exceeds
225  
        the per-call buffer limit.
225  
        the per-call buffer limit.
226  

226  

227  
        @param buffers The buffer sequence containing data to write.
227  
        @param buffers The buffer sequence containing data to write.
228  

228  

229  
        @return An awaitable yielding `(error_code,std::size_t)`.
229  
        @return An awaitable yielding `(error_code,std::size_t)`.
230  

230  

231  
        @par Immediate Completion
231  
        @par Immediate Completion
232  
        The operation completes immediately without suspending
232  
        The operation completes immediately without suspending
233  
        the calling coroutine when:
233  
        the calling coroutine when:
234  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
234  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
235  
        @li Every underlying `write` call completes
235  
        @li Every underlying `write` call completes
236  
            immediately (the wrapped sink reports readiness
236  
            immediately (the wrapped sink reports readiness
237  
            via `await_ready` on each iteration).
237  
            via `await_ready` on each iteration).
238  

238  

239  
        @par Preconditions
239  
        @par Preconditions
240  
        The wrapper must contain a valid sink (`has_value() == true`).
240  
        The wrapper must contain a valid sink (`has_value() == true`).
241  
    */
241  
    */
242  
    template<ConstBufferSequence CB>
242  
    template<ConstBufferSequence CB>
243  
    io_task<std::size_t>
243  
    io_task<std::size_t>
244  
    write(CB buffers);
244  
    write(CB buffers);
245  

245  

246  
    /** Atomically write data and signal end-of-stream.
246  
    /** Atomically write data and signal end-of-stream.
247  

247  

248  
        Writes all data from the buffer sequence and then signals
248  
        Writes all data from the buffer sequence and then signals
249  
        end-of-stream. The implementation decides how to partition
249  
        end-of-stream. The implementation decides how to partition
250  
        the data across calls to the underlying sink's @ref write
250  
        the data across calls to the underlying sink's @ref write
251  
        and `write_eof`. When the caller's buffer sequence is
251  
        and `write_eof`. When the caller's buffer sequence is
252  
        non-empty, the final call to the underlying sink is always
252  
        non-empty, the final call to the underlying sink is always
253  
        `write_eof` with a non-empty buffer sequence. When the
253  
        `write_eof` with a non-empty buffer sequence. When the
254  
        caller's buffer sequence is empty, only `write_eof()` with
254  
        caller's buffer sequence is empty, only `write_eof()` with
255  
        no data is called.
255  
        no data is called.
256  

256  

257  
        @param buffers The buffer sequence containing data to write.
257  
        @param buffers The buffer sequence containing data to write.
258  

258  

259  
        @return An awaitable yielding `(error_code,std::size_t)`.
259  
        @return An awaitable yielding `(error_code,std::size_t)`.
260  

260  

261  
        @par Immediate Completion
261  
        @par Immediate Completion
262  
        The operation completes immediately without suspending
262  
        The operation completes immediately without suspending
263  
        the calling coroutine when:
263  
        the calling coroutine when:
264  
        @li The buffer sequence is empty. Only the @ref write_eof()
264  
        @li The buffer sequence is empty. Only the @ref write_eof()
265  
            call is performed.
265  
            call is performed.
266  
        @li All underlying operations complete immediately (the
266  
        @li All underlying operations complete immediately (the
267  
            wrapped sink reports readiness via `await_ready`).
267  
            wrapped sink reports readiness via `await_ready`).
268  

268  

269  
        @par Preconditions
269  
        @par Preconditions
270  
        The wrapper must contain a valid sink (`has_value() == true`).
270  
        The wrapper must contain a valid sink (`has_value() == true`).
271  
    */
271  
    */
272  
    template<ConstBufferSequence CB>
272  
    template<ConstBufferSequence CB>
273  
    io_task<std::size_t>
273  
    io_task<std::size_t>
274  
    write_eof(CB buffers);
274  
    write_eof(CB buffers);
275  

275  

276  
    /** Signal end of data.
276  
    /** Signal end of data.
277  

277  

278  
        Indicates that no more data will be written to the sink.
278  
        Indicates that no more data will be written to the sink.
279  
        The operation completes when the sink is finalized, or
279  
        The operation completes when the sink is finalized, or
280  
        an error occurs.
280  
        an error occurs.
281  

281  

282  
        @return An awaitable yielding `(error_code)`.
282  
        @return An awaitable yielding `(error_code)`.
283  

283  

284  
        @par Immediate Completion
284  
        @par Immediate Completion
285  
        The operation completes immediately without suspending
285  
        The operation completes immediately without suspending
286  
        the calling coroutine when the underlying sink's awaitable
286  
        the calling coroutine when the underlying sink's awaitable
287  
        reports immediate readiness via `await_ready`.
287  
        reports immediate readiness via `await_ready`.
288  

288  

289  
        @par Preconditions
289  
        @par Preconditions
290  
        The wrapper must contain a valid sink (`has_value() == true`).
290  
        The wrapper must contain a valid sink (`has_value() == true`).
291  
    */
291  
    */
292  
    auto
292  
    auto
293  
    write_eof();
293  
    write_eof();
294  

294  

295  
protected:
295  
protected:
296  
    /** Rebind to a new sink after move.
296  
    /** Rebind to a new sink after move.
297  

297  

298  
        Updates the internal pointer to reference a new sink object.
298  
        Updates the internal pointer to reference a new sink object.
299  
        Used by owning wrappers after move assignment when the owned
299  
        Used by owning wrappers after move assignment when the owned
300  
        object has moved to a new location.
300  
        object has moved to a new location.
301  

301  

302  
        @param new_sink The new sink to bind to. Must be the same
302  
        @param new_sink The new sink to bind to. Must be the same
303  
            type as the original sink.
303  
            type as the original sink.
304  

304  

305  
        @note Terminates if called with a sink of different type
305  
        @note Terminates if called with a sink of different type
306  
            than the original.
306  
            than the original.
307  
    */
307  
    */
308  
    template<WriteSink S>
308  
    template<WriteSink S>
309  
    void
309  
    void
310  
    rebind(S& new_sink) noexcept
310  
    rebind(S& new_sink) noexcept
311  
    {
311  
    {
312  
        if(vt_ != &vtable_for_impl<S>::value)
312  
        if(vt_ != &vtable_for_impl<S>::value)
313  
            std::terminate();
313  
            std::terminate();
314  
        sink_ = &new_sink;
314  
        sink_ = &new_sink;
315  
    }
315  
    }
316  

316  

317  
private:
317  
private:
318  
    auto
318  
    auto
319  
    write_some_(std::span<const_buffer const> buffers);
319  
    write_some_(std::span<const_buffer const> buffers);
320  

320  

321  
    auto
321  
    auto
322  
    write_(std::span<const_buffer const> buffers);
322  
    write_(std::span<const_buffer const> buffers);
323  

323  

324  
    auto
324  
    auto
325  
    write_eof_buffers_(std::span<const_buffer const> buffers);
325  
    write_eof_buffers_(std::span<const_buffer const> buffers);
326  
};
326  
};
327  

327  

328  
//----------------------------------------------------------
328  
//----------------------------------------------------------
329  

329  

330  
struct any_write_sink::write_awaitable_ops
330  
struct any_write_sink::write_awaitable_ops
331  
{
331  
{
332  
    bool (*await_ready)(void*);
332  
    bool (*await_ready)(void*);
333  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
333  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
334  
    io_result<std::size_t> (*await_resume)(void*);
334  
    io_result<std::size_t> (*await_resume)(void*);
335  
    void (*destroy)(void*) noexcept;
335  
    void (*destroy)(void*) noexcept;
336  
};
336  
};
337  

337  

338  
struct any_write_sink::eof_awaitable_ops
338  
struct any_write_sink::eof_awaitable_ops
339  
{
339  
{
340  
    bool (*await_ready)(void*);
340  
    bool (*await_ready)(void*);
341  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
341  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
342  
    io_result<> (*await_resume)(void*);
342  
    io_result<> (*await_resume)(void*);
343  
    void (*destroy)(void*) noexcept;
343  
    void (*destroy)(void*) noexcept;
344  
};
344  
};
345  

345  

346  
struct any_write_sink::vtable
346  
struct any_write_sink::vtable
347  
{
347  
{
348  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
348  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
349  
        void* sink,
349  
        void* sink,
350  
        void* storage,
350  
        void* storage,
351  
        std::span<const_buffer const> buffers);
351  
        std::span<const_buffer const> buffers);
352  
    write_awaitable_ops const* (*construct_write_awaitable)(
352  
    write_awaitable_ops const* (*construct_write_awaitable)(
353  
        void* sink,
353  
        void* sink,
354  
        void* storage,
354  
        void* storage,
355  
        std::span<const_buffer const> buffers);
355  
        std::span<const_buffer const> buffers);
356  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
356  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
357  
        void* sink,
357  
        void* sink,
358  
        void* storage,
358  
        void* storage,
359  
        std::span<const_buffer const> buffers);
359  
        std::span<const_buffer const> buffers);
360  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
360  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
361  
        void* sink,
361  
        void* sink,
362  
        void* storage);
362  
        void* storage);
363  
    std::size_t awaitable_size;
363  
    std::size_t awaitable_size;
364  
    std::size_t awaitable_align;
364  
    std::size_t awaitable_align;
365  
    void (*destroy)(void*) noexcept;
365  
    void (*destroy)(void*) noexcept;
366  
};
366  
};
367  

367  

368  
template<WriteSink S>
368  
template<WriteSink S>
369  
struct any_write_sink::vtable_for_impl
369  
struct any_write_sink::vtable_for_impl
370  
{
370  
{
371  
    using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
371  
    using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
372  
        std::span<const_buffer const>{}));
372  
        std::span<const_buffer const>{}));
373  
    using WriteAwaitable = decltype(std::declval<S&>().write(
373  
    using WriteAwaitable = decltype(std::declval<S&>().write(
374  
        std::span<const_buffer const>{}));
374  
        std::span<const_buffer const>{}));
375  
    using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
375  
    using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
376  
        std::span<const_buffer const>{}));
376  
        std::span<const_buffer const>{}));
377  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
377  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
378  

378  

379  
    static void
379  
    static void
380  
    do_destroy_impl(void* sink) noexcept
380  
    do_destroy_impl(void* sink) noexcept
381  
    {
381  
    {
382  
        static_cast<S*>(sink)->~S();
382  
        static_cast<S*>(sink)->~S();
383  
    }
383  
    }
384  

384  

385  
    static write_awaitable_ops const*
385  
    static write_awaitable_ops const*
386  
    construct_write_some_awaitable_impl(
386  
    construct_write_some_awaitable_impl(
387  
        void* sink,
387  
        void* sink,
388  
        void* storage,
388  
        void* storage,
389  
        std::span<const_buffer const> buffers)
389  
        std::span<const_buffer const> buffers)
390  
    {
390  
    {
391  
        auto& s = *static_cast<S*>(sink);
391  
        auto& s = *static_cast<S*>(sink);
392  
        ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
392  
        ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
393  

393  

394  
        static constexpr write_awaitable_ops ops = {
394  
        static constexpr write_awaitable_ops ops = {
395  
            +[](void* p) {
395  
            +[](void* p) {
396  
                return static_cast<WriteSomeAwaitable*>(p)->await_ready();
396  
                return static_cast<WriteSomeAwaitable*>(p)->await_ready();
397  
            },
397  
            },
398  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
398  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
399  
                return detail::call_await_suspend(
399  
                return detail::call_await_suspend(
400  
                    static_cast<WriteSomeAwaitable*>(p), h, env);
400  
                    static_cast<WriteSomeAwaitable*>(p), h, env);
401  
            },
401  
            },
402  
            +[](void* p) {
402  
            +[](void* p) {
403  
                return static_cast<WriteSomeAwaitable*>(p)->await_resume();
403  
                return static_cast<WriteSomeAwaitable*>(p)->await_resume();
404  
            },
404  
            },
405  
            +[](void* p) noexcept {
405  
            +[](void* p) noexcept {
406  
                static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
406  
                static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
407  
            }
407  
            }
408  
        };
408  
        };
409  
        return &ops;
409  
        return &ops;
410  
    }
410  
    }
411  

411  

412  
    static write_awaitable_ops const*
412  
    static write_awaitable_ops const*
413  
    construct_write_awaitable_impl(
413  
    construct_write_awaitable_impl(
414  
        void* sink,
414  
        void* sink,
415  
        void* storage,
415  
        void* storage,
416  
        std::span<const_buffer const> buffers)
416  
        std::span<const_buffer const> buffers)
417  
    {
417  
    {
418  
        auto& s = *static_cast<S*>(sink);
418  
        auto& s = *static_cast<S*>(sink);
419  
        ::new(storage) WriteAwaitable(s.write(buffers));
419  
        ::new(storage) WriteAwaitable(s.write(buffers));
420  

420  

421  
        static constexpr write_awaitable_ops ops = {
421  
        static constexpr write_awaitable_ops ops = {
422  
            +[](void* p) {
422  
            +[](void* p) {
423  
                return static_cast<WriteAwaitable*>(p)->await_ready();
423  
                return static_cast<WriteAwaitable*>(p)->await_ready();
424  
            },
424  
            },
425  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
425  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
426  
                return detail::call_await_suspend(
426  
                return detail::call_await_suspend(
427  
                    static_cast<WriteAwaitable*>(p), h, env);
427  
                    static_cast<WriteAwaitable*>(p), h, env);
428  
            },
428  
            },
429  
            +[](void* p) {
429  
            +[](void* p) {
430  
                return static_cast<WriteAwaitable*>(p)->await_resume();
430  
                return static_cast<WriteAwaitable*>(p)->await_resume();
431  
            },
431  
            },
432  
            +[](void* p) noexcept {
432  
            +[](void* p) noexcept {
433  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
433  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
434  
            }
434  
            }
435  
        };
435  
        };
436  
        return &ops;
436  
        return &ops;
437  
    }
437  
    }
438  

438  

439  
    static write_awaitable_ops const*
439  
    static write_awaitable_ops const*
440  
    construct_write_eof_buffers_awaitable_impl(
440  
    construct_write_eof_buffers_awaitable_impl(
441  
        void* sink,
441  
        void* sink,
442  
        void* storage,
442  
        void* storage,
443  
        std::span<const_buffer const> buffers)
443  
        std::span<const_buffer const> buffers)
444  
    {
444  
    {
445  
        auto& s = *static_cast<S*>(sink);
445  
        auto& s = *static_cast<S*>(sink);
446  
        ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
446  
        ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
447  

447  

448  
        static constexpr write_awaitable_ops ops = {
448  
        static constexpr write_awaitable_ops ops = {
449  
            +[](void* p) {
449  
            +[](void* p) {
450  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
450  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
451  
            },
451  
            },
452  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
452  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
453  
                return detail::call_await_suspend(
453  
                return detail::call_await_suspend(
454  
                    static_cast<WriteEofBuffersAwaitable*>(p), h, env);
454  
                    static_cast<WriteEofBuffersAwaitable*>(p), h, env);
455  
            },
455  
            },
456  
            +[](void* p) {
456  
            +[](void* p) {
457  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
457  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
458  
            },
458  
            },
459  
            +[](void* p) noexcept {
459  
            +[](void* p) noexcept {
460  
                static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
460  
                static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
461  
            }
461  
            }
462  
        };
462  
        };
463  
        return &ops;
463  
        return &ops;
464  
    }
464  
    }
465  

465  

466  
    static eof_awaitable_ops const*
466  
    static eof_awaitable_ops const*
467  
    construct_eof_awaitable_impl(
467  
    construct_eof_awaitable_impl(
468  
        void* sink,
468  
        void* sink,
469  
        void* storage)
469  
        void* storage)
470  
    {
470  
    {
471  
        auto& s = *static_cast<S*>(sink);
471  
        auto& s = *static_cast<S*>(sink);
472  
        ::new(storage) EofAwaitable(s.write_eof());
472  
        ::new(storage) EofAwaitable(s.write_eof());
473  

473  

474  
        static constexpr eof_awaitable_ops ops = {
474  
        static constexpr eof_awaitable_ops ops = {
475  
            +[](void* p) {
475  
            +[](void* p) {
476  
                return static_cast<EofAwaitable*>(p)->await_ready();
476  
                return static_cast<EofAwaitable*>(p)->await_ready();
477  
            },
477  
            },
478  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
478  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
479  
                return detail::call_await_suspend(
479  
                return detail::call_await_suspend(
480  
                    static_cast<EofAwaitable*>(p), h, env);
480  
                    static_cast<EofAwaitable*>(p), h, env);
481  
            },
481  
            },
482  
            +[](void* p) {
482  
            +[](void* p) {
483  
                return static_cast<EofAwaitable*>(p)->await_resume();
483  
                return static_cast<EofAwaitable*>(p)->await_resume();
484  
            },
484  
            },
485  
            +[](void* p) noexcept {
485  
            +[](void* p) noexcept {
486  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
486  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
487  
            }
487  
            }
488  
        };
488  
        };
489  
        return &ops;
489  
        return &ops;
490  
    }
490  
    }
491  

491  

492  
    static constexpr std::size_t max4(
492  
    static constexpr std::size_t max4(
493  
        std::size_t a, std::size_t b,
493  
        std::size_t a, std::size_t b,
494  
        std::size_t c, std::size_t d) noexcept
494  
        std::size_t c, std::size_t d) noexcept
495  
    {
495  
    {
496  
        std::size_t ab = a > b ? a : b;
496  
        std::size_t ab = a > b ? a : b;
497  
        std::size_t cd = c > d ? c : d;
497  
        std::size_t cd = c > d ? c : d;
498  
        return ab > cd ? ab : cd;
498  
        return ab > cd ? ab : cd;
499  
    }
499  
    }
500  

500  

501  
    static constexpr std::size_t max_awaitable_size =
501  
    static constexpr std::size_t max_awaitable_size =
502  
        max4(sizeof(WriteSomeAwaitable),
502  
        max4(sizeof(WriteSomeAwaitable),
503  
             sizeof(WriteAwaitable),
503  
             sizeof(WriteAwaitable),
504  
             sizeof(WriteEofBuffersAwaitable),
504  
             sizeof(WriteEofBuffersAwaitable),
505  
             sizeof(EofAwaitable));
505  
             sizeof(EofAwaitable));
506  

506  

507  
    static constexpr std::size_t max_awaitable_align =
507  
    static constexpr std::size_t max_awaitable_align =
508  
        max4(alignof(WriteSomeAwaitable),
508  
        max4(alignof(WriteSomeAwaitable),
509  
             alignof(WriteAwaitable),
509  
             alignof(WriteAwaitable),
510  
             alignof(WriteEofBuffersAwaitable),
510  
             alignof(WriteEofBuffersAwaitable),
511  
             alignof(EofAwaitable));
511  
             alignof(EofAwaitable));
512  

512  

513  
    static constexpr vtable value = {
513  
    static constexpr vtable value = {
514  
        &construct_write_some_awaitable_impl,
514  
        &construct_write_some_awaitable_impl,
515  
        &construct_write_awaitable_impl,
515  
        &construct_write_awaitable_impl,
516  
        &construct_write_eof_buffers_awaitable_impl,
516  
        &construct_write_eof_buffers_awaitable_impl,
517  
        &construct_eof_awaitable_impl,
517  
        &construct_eof_awaitable_impl,
518  
        max_awaitable_size,
518  
        max_awaitable_size,
519  
        max_awaitable_align,
519  
        max_awaitable_align,
520  
        &do_destroy_impl
520  
        &do_destroy_impl
521  
    };
521  
    };
522  
};
522  
};
523  

523  

524  
//----------------------------------------------------------
524  
//----------------------------------------------------------
525  

525  

526  
inline
526  
inline
527  
any_write_sink::~any_write_sink()
527  
any_write_sink::~any_write_sink()
528  
{
528  
{
529  
    if(storage_)
529  
    if(storage_)
530  
    {
530  
    {
531  
        vt_->destroy(sink_);
531  
        vt_->destroy(sink_);
532  
        ::operator delete(storage_);
532  
        ::operator delete(storage_);
533  
    }
533  
    }
534  
    if(cached_awaitable_)
534  
    if(cached_awaitable_)
535  
    {
535  
    {
536  
        if(active_write_ops_)
536  
        if(active_write_ops_)
537  
            active_write_ops_->destroy(cached_awaitable_);
537  
            active_write_ops_->destroy(cached_awaitable_);
538  
        else if(active_eof_ops_)
538  
        else if(active_eof_ops_)
539  
            active_eof_ops_->destroy(cached_awaitable_);
539  
            active_eof_ops_->destroy(cached_awaitable_);
540  
        ::operator delete(cached_awaitable_);
540  
        ::operator delete(cached_awaitable_);
541  
    }
541  
    }
542  
}
542  
}
543  

543  

544  
inline any_write_sink&
544  
inline any_write_sink&
545  
any_write_sink::operator=(any_write_sink&& other) noexcept
545  
any_write_sink::operator=(any_write_sink&& other) noexcept
546  
{
546  
{
547  
    if(this != &other)
547  
    if(this != &other)
548  
    {
548  
    {
549  
        if(storage_)
549  
        if(storage_)
550  
        {
550  
        {
551  
            vt_->destroy(sink_);
551  
            vt_->destroy(sink_);
552  
            ::operator delete(storage_);
552  
            ::operator delete(storage_);
553  
        }
553  
        }
554  
        if(cached_awaitable_)
554  
        if(cached_awaitable_)
555  
        {
555  
        {
556  
            if(active_write_ops_)
556  
            if(active_write_ops_)
557  
                active_write_ops_->destroy(cached_awaitable_);
557  
                active_write_ops_->destroy(cached_awaitable_);
558  
            else if(active_eof_ops_)
558  
            else if(active_eof_ops_)
559  
                active_eof_ops_->destroy(cached_awaitable_);
559  
                active_eof_ops_->destroy(cached_awaitable_);
560  
            ::operator delete(cached_awaitable_);
560  
            ::operator delete(cached_awaitable_);
561  
        }
561  
        }
562  
        sink_ = std::exchange(other.sink_, nullptr);
562  
        sink_ = std::exchange(other.sink_, nullptr);
563  
        vt_ = std::exchange(other.vt_, nullptr);
563  
        vt_ = std::exchange(other.vt_, nullptr);
564  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
564  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
565  
        storage_ = std::exchange(other.storage_, nullptr);
565  
        storage_ = std::exchange(other.storage_, nullptr);
566  
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
566  
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
567  
        active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
567  
        active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
568  
    }
568  
    }
569  
    return *this;
569  
    return *this;
570  
}
570  
}
571  

571  

572  
template<WriteSink S>
572  
template<WriteSink S>
573  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
573  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
574  
any_write_sink::any_write_sink(S s)
574  
any_write_sink::any_write_sink(S s)
575  
    : vt_(&vtable_for_impl<S>::value)
575  
    : vt_(&vtable_for_impl<S>::value)
576  
{
576  
{
577  
    struct guard {
577  
    struct guard {
578  
        any_write_sink* self;
578  
        any_write_sink* self;
579  
        bool committed = false;
579  
        bool committed = false;
580  
        ~guard() {
580  
        ~guard() {
581  
            if(!committed && self->storage_) {
581  
            if(!committed && self->storage_) {
582  
                self->vt_->destroy(self->sink_);
582  
                self->vt_->destroy(self->sink_);
583  
                ::operator delete(self->storage_);
583  
                ::operator delete(self->storage_);
584  
                self->storage_ = nullptr;
584  
                self->storage_ = nullptr;
585  
                self->sink_ = nullptr;
585  
                self->sink_ = nullptr;
586  
            }
586  
            }
587  
        }
587  
        }
588  
    } g{this};
588  
    } g{this};
589  

589  

590  
    storage_ = ::operator new(sizeof(S));
590  
    storage_ = ::operator new(sizeof(S));
591  
    sink_ = ::new(storage_) S(std::move(s));
591  
    sink_ = ::new(storage_) S(std::move(s));
592  

592  

593  
    // Preallocate the awaitable storage (sized for max of write/eof)
593  
    // Preallocate the awaitable storage (sized for max of write/eof)
594  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
594  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
595  

595  

596  
    g.committed = true;
596  
    g.committed = true;
597  
}
597  
}
598  

598  

599  
template<WriteSink S>
599  
template<WriteSink S>
600  
any_write_sink::any_write_sink(S* s)
600  
any_write_sink::any_write_sink(S* s)
601  
    : sink_(s)
601  
    : sink_(s)
602  
    , vt_(&vtable_for_impl<S>::value)
602  
    , vt_(&vtable_for_impl<S>::value)
603  
{
603  
{
604  
    // Preallocate the awaitable storage (sized for max of write/eof)
604  
    // Preallocate the awaitable storage (sized for max of write/eof)
605  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
605  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
606  
}
606  
}
607  

607  

608  
//----------------------------------------------------------
608  
//----------------------------------------------------------
609  

609  

610  
inline auto
610  
inline auto
611  
any_write_sink::write_some_(
611  
any_write_sink::write_some_(
612  
    std::span<const_buffer const> buffers)
612  
    std::span<const_buffer const> buffers)
613  
{
613  
{
614  
    struct awaitable
614  
    struct awaitable
615  
    {
615  
    {
616  
        any_write_sink* self_;
616  
        any_write_sink* self_;
617  
        std::span<const_buffer const> buffers_;
617  
        std::span<const_buffer const> buffers_;
618  

618  

619  
        bool
619  
        bool
620  
        await_ready() const noexcept
620  
        await_ready() const noexcept
621  
        {
621  
        {
622  
            return false;
622  
            return false;
623  
        }
623  
        }
624  

624  

625  
        std::coroutine_handle<>
625  
        std::coroutine_handle<>
626  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
626  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
627  
        {
627  
        {
628  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
628  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
629  
                self_->sink_,
629  
                self_->sink_,
630  
                self_->cached_awaitable_,
630  
                self_->cached_awaitable_,
631  
                buffers_);
631  
                buffers_);
632  

632  

633  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
633  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
634  
                return h;
634  
                return h;
635  

635  

636  
            return self_->active_write_ops_->await_suspend(
636  
            return self_->active_write_ops_->await_suspend(
637  
                self_->cached_awaitable_, h, env);
637  
                self_->cached_awaitable_, h, env);
638  
        }
638  
        }
639  

639  

640  
        io_result<std::size_t>
640  
        io_result<std::size_t>
641  
        await_resume()
641  
        await_resume()
642  
        {
642  
        {
643  
            struct guard {
643  
            struct guard {
644  
                any_write_sink* self;
644  
                any_write_sink* self;
645  
                ~guard() {
645  
                ~guard() {
646  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
646  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
647  
                    self->active_write_ops_ = nullptr;
647  
                    self->active_write_ops_ = nullptr;
648  
                }
648  
                }
649  
            } g{self_};
649  
            } g{self_};
650  
            return self_->active_write_ops_->await_resume(
650  
            return self_->active_write_ops_->await_resume(
651  
                self_->cached_awaitable_);
651  
                self_->cached_awaitable_);
652  
        }
652  
        }
653  
    };
653  
    };
654  
    return awaitable{this, buffers};
654  
    return awaitable{this, buffers};
655  
}
655  
}
656  

656  

657  
inline auto
657  
inline auto
658  
any_write_sink::write_(
658  
any_write_sink::write_(
659  
    std::span<const_buffer const> buffers)
659  
    std::span<const_buffer const> buffers)
660  
{
660  
{
661  
    struct awaitable
661  
    struct awaitable
662  
    {
662  
    {
663  
        any_write_sink* self_;
663  
        any_write_sink* self_;
664  
        std::span<const_buffer const> buffers_;
664  
        std::span<const_buffer const> buffers_;
665  

665  

666  
        bool
666  
        bool
667  
        await_ready() const noexcept
667  
        await_ready() const noexcept
668  
        {
668  
        {
669  
            return false;
669  
            return false;
670  
        }
670  
        }
671  

671  

672  
        std::coroutine_handle<>
672  
        std::coroutine_handle<>
673  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
673  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
674  
        {
674  
        {
675  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
675  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
676  
                self_->sink_,
676  
                self_->sink_,
677  
                self_->cached_awaitable_,
677  
                self_->cached_awaitable_,
678  
                buffers_);
678  
                buffers_);
679  

679  

680  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
680  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
681  
                return h;
681  
                return h;
682  

682  

683  
            return self_->active_write_ops_->await_suspend(
683  
            return self_->active_write_ops_->await_suspend(
684  
                self_->cached_awaitable_, h, env);
684  
                self_->cached_awaitable_, h, env);
685  
        }
685  
        }
686  

686  

687  
        io_result<std::size_t>
687  
        io_result<std::size_t>
688  
        await_resume()
688  
        await_resume()
689  
        {
689  
        {
690  
            struct guard {
690  
            struct guard {
691  
                any_write_sink* self;
691  
                any_write_sink* self;
692  
                ~guard() {
692  
                ~guard() {
693  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
693  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
694  
                    self->active_write_ops_ = nullptr;
694  
                    self->active_write_ops_ = nullptr;
695  
                }
695  
                }
696  
            } g{self_};
696  
            } g{self_};
697  
            return self_->active_write_ops_->await_resume(
697  
            return self_->active_write_ops_->await_resume(
698  
                self_->cached_awaitable_);
698  
                self_->cached_awaitable_);
699  
        }
699  
        }
700  
    };
700  
    };
701  
    return awaitable{this, buffers};
701  
    return awaitable{this, buffers};
702  
}
702  
}
703  

703  

704  
inline auto
704  
inline auto
705  
any_write_sink::write_eof()
705  
any_write_sink::write_eof()
706  
{
706  
{
707  
    struct awaitable
707  
    struct awaitable
708  
    {
708  
    {
709  
        any_write_sink* self_;
709  
        any_write_sink* self_;
710  

710  

711  
        bool
711  
        bool
712  
        await_ready() const noexcept
712  
        await_ready() const noexcept
713  
        {
713  
        {
714  
            return false;
714  
            return false;
715  
        }
715  
        }
716  

716  

717  
        std::coroutine_handle<>
717  
        std::coroutine_handle<>
718  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
718  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
719  
        {
719  
        {
720  
            // Construct the underlying awaitable into cached storage
720  
            // Construct the underlying awaitable into cached storage
721  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
721  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
722  
                self_->sink_,
722  
                self_->sink_,
723  
                self_->cached_awaitable_);
723  
                self_->cached_awaitable_);
724  

724  

725  
            // Check if underlying is immediately ready
725  
            // Check if underlying is immediately ready
726  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
726  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
727  
                return h;
727  
                return h;
728  

728  

729  
            // Forward to underlying awaitable
729  
            // Forward to underlying awaitable
730  
            return self_->active_eof_ops_->await_suspend(
730  
            return self_->active_eof_ops_->await_suspend(
731  
                self_->cached_awaitable_, h, env);
731  
                self_->cached_awaitable_, h, env);
732  
        }
732  
        }
733  

733  

734  
        io_result<>
734  
        io_result<>
735  
        await_resume()
735  
        await_resume()
736  
        {
736  
        {
737  
            struct guard {
737  
            struct guard {
738  
                any_write_sink* self;
738  
                any_write_sink* self;
739  
                ~guard() {
739  
                ~guard() {
740  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
740  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
741  
                    self->active_eof_ops_ = nullptr;
741  
                    self->active_eof_ops_ = nullptr;
742  
                }
742  
                }
743  
            } g{self_};
743  
            } g{self_};
744  
            return self_->active_eof_ops_->await_resume(
744  
            return self_->active_eof_ops_->await_resume(
745  
                self_->cached_awaitable_);
745  
                self_->cached_awaitable_);
746  
        }
746  
        }
747  
    };
747  
    };
748  
    return awaitable{this};
748  
    return awaitable{this};
749  
}
749  
}
750  

750  

751  
inline auto
751  
inline auto
752  
any_write_sink::write_eof_buffers_(
752  
any_write_sink::write_eof_buffers_(
753  
    std::span<const_buffer const> buffers)
753  
    std::span<const_buffer const> buffers)
754  
{
754  
{
755  
    struct awaitable
755  
    struct awaitable
756  
    {
756  
    {
757  
        any_write_sink* self_;
757  
        any_write_sink* self_;
758  
        std::span<const_buffer const> buffers_;
758  
        std::span<const_buffer const> buffers_;
759  

759  

760  
        bool
760  
        bool
761  
        await_ready() const noexcept
761  
        await_ready() const noexcept
762  
        {
762  
        {
763  
            return false;
763  
            return false;
764  
        }
764  
        }
765  

765  

766  
        std::coroutine_handle<>
766  
        std::coroutine_handle<>
767  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
767  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
768  
        {
768  
        {
769  
            self_->active_write_ops_ =
769  
            self_->active_write_ops_ =
770  
                self_->vt_->construct_write_eof_buffers_awaitable(
770  
                self_->vt_->construct_write_eof_buffers_awaitable(
771  
                    self_->sink_,
771  
                    self_->sink_,
772  
                    self_->cached_awaitable_,
772  
                    self_->cached_awaitable_,
773  
                    buffers_);
773  
                    buffers_);
774  

774  

775  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
775  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
776  
                return h;
776  
                return h;
777  

777  

778  
            return self_->active_write_ops_->await_suspend(
778  
            return self_->active_write_ops_->await_suspend(
779  
                self_->cached_awaitable_, h, env);
779  
                self_->cached_awaitable_, h, env);
780  
        }
780  
        }
781  

781  

782  
        io_result<std::size_t>
782  
        io_result<std::size_t>
783  
        await_resume()
783  
        await_resume()
784  
        {
784  
        {
785  
            struct guard {
785  
            struct guard {
786  
                any_write_sink* self;
786  
                any_write_sink* self;
787  
                ~guard() {
787  
                ~guard() {
788  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
788  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
789  
                    self->active_write_ops_ = nullptr;
789  
                    self->active_write_ops_ = nullptr;
790  
                }
790  
                }
791  
            } g{self_};
791  
            } g{self_};
792  
            return self_->active_write_ops_->await_resume(
792  
            return self_->active_write_ops_->await_resume(
793  
                self_->cached_awaitable_);
793  
                self_->cached_awaitable_);
794  
        }
794  
        }
795  
    };
795  
    };
796  
    return awaitable{this, buffers};
796  
    return awaitable{this, buffers};
797  
}
797  
}
798  

798  

799  
template<ConstBufferSequence CB>
799  
template<ConstBufferSequence CB>
800  
auto
800  
auto
801  
any_write_sink::write_some(CB buffers)
801  
any_write_sink::write_some(CB buffers)
802  
{
802  
{
803  
    struct awaitable
803  
    struct awaitable
804  
    {
804  
    {
805  
        any_write_sink* self_;
805  
        any_write_sink* self_;
806  
        const_buffer_array<detail::max_iovec_> ba_;
806  
        const_buffer_array<detail::max_iovec_> ba_;
807  

807  

808  
        awaitable(
808  
        awaitable(
809  
            any_write_sink* self,
809  
            any_write_sink* self,
810  
            CB const& buffers)
810  
            CB const& buffers)
811  
            : self_(self)
811  
            : self_(self)
812  
            , ba_(buffers)
812  
            , ba_(buffers)
813  
        {
813  
        {
814  
        }
814  
        }
815  

815  

816  
        bool
816  
        bool
817  
        await_ready() const noexcept
817  
        await_ready() const noexcept
818  
        {
818  
        {
819  
            return ba_.to_span().empty();
819  
            return ba_.to_span().empty();
820  
        }
820  
        }
821  

821  

822  
        std::coroutine_handle<>
822  
        std::coroutine_handle<>
823  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
823  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
824  
        {
824  
        {
825  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
825  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
826  
                self_->sink_,
826  
                self_->sink_,
827  
                self_->cached_awaitable_,
827  
                self_->cached_awaitable_,
828  
                ba_.to_span());
828  
                ba_.to_span());
829  

829  

830  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
830  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
831  
                return h;
831  
                return h;
832  

832  

833  
            return self_->active_write_ops_->await_suspend(
833  
            return self_->active_write_ops_->await_suspend(
834  
                self_->cached_awaitable_, h, env);
834  
                self_->cached_awaitable_, h, env);
835  
        }
835  
        }
836  

836  

837  
        io_result<std::size_t>
837  
        io_result<std::size_t>
838  
        await_resume()
838  
        await_resume()
839  
        {
839  
        {
840  
            if(ba_.to_span().empty())
840  
            if(ba_.to_span().empty())
841  
                return {{}, 0};
841  
                return {{}, 0};
842  

842  

843  
            struct guard {
843  
            struct guard {
844  
                any_write_sink* self;
844  
                any_write_sink* self;
845  
                ~guard() {
845  
                ~guard() {
846  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
846  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
847  
                    self->active_write_ops_ = nullptr;
847  
                    self->active_write_ops_ = nullptr;
848  
                }
848  
                }
849  
            } g{self_};
849  
            } g{self_};
850  
            return self_->active_write_ops_->await_resume(
850  
            return self_->active_write_ops_->await_resume(
851  
                self_->cached_awaitable_);
851  
                self_->cached_awaitable_);
852  
        }
852  
        }
853  
    };
853  
    };
854  
    return awaitable{this, buffers};
854  
    return awaitable{this, buffers};
855  
}
855  
}
856  

856  

857  
template<ConstBufferSequence CB>
857  
template<ConstBufferSequence CB>
858  
io_task<std::size_t>
858  
io_task<std::size_t>
859  
any_write_sink::write(CB buffers)
859  
any_write_sink::write(CB buffers)
860  
{
860  
{
861  
    buffer_param<CB> bp(buffers);
861  
    buffer_param<CB> bp(buffers);
862  
    std::size_t total = 0;
862  
    std::size_t total = 0;
863  

863  

864  
    for(;;)
864  
    for(;;)
865  
    {
865  
    {
866  
        auto bufs = bp.data();
866  
        auto bufs = bp.data();
867  
        if(bufs.empty())
867  
        if(bufs.empty())
868  
            break;
868  
            break;
869  

869  

870  
        auto [ec, n] = co_await write_(bufs);
870  
        auto [ec, n] = co_await write_(bufs);
871  
        total += n;
871  
        total += n;
872  
        if(ec)
872  
        if(ec)
873  
            co_return {ec, total};
873  
            co_return {ec, total};
874  
        bp.consume(n);
874  
        bp.consume(n);
875  
    }
875  
    }
876  

876  

877  
    co_return {{}, total};
877  
    co_return {{}, total};
878  
}
878  
}
879  

879  

880  
template<ConstBufferSequence CB>
880  
template<ConstBufferSequence CB>
881  
io_task<std::size_t>
881  
io_task<std::size_t>
882  
any_write_sink::write_eof(CB buffers)
882  
any_write_sink::write_eof(CB buffers)
883  
{
883  
{
884  
    const_buffer_param<CB> bp(buffers);
884  
    const_buffer_param<CB> bp(buffers);
885  
    std::size_t total = 0;
885  
    std::size_t total = 0;
886  

886  

887  
    for(;;)
887  
    for(;;)
888  
    {
888  
    {
889  
        auto bufs = bp.data();
889  
        auto bufs = bp.data();
890  
        if(bufs.empty())
890  
        if(bufs.empty())
891  
        {
891  
        {
892  
            auto [ec] = co_await write_eof();
892  
            auto [ec] = co_await write_eof();
893  
            co_return {ec, total};
893  
            co_return {ec, total};
894  
        }
894  
        }
895  

895  

896  
        if(! bp.more())
896  
        if(! bp.more())
897  
        {
897  
        {
898  
            // Last window — send atomically with EOF
898  
            // Last window — send atomically with EOF
899  
            auto [ec, n] = co_await write_eof_buffers_(bufs);
899  
            auto [ec, n] = co_await write_eof_buffers_(bufs);
900  
            total += n;
900  
            total += n;
901  
            co_return {ec, total};
901  
            co_return {ec, total};
902  
        }
902  
        }
903  

903  

904  
        auto [ec, n] = co_await write_(bufs);
904  
        auto [ec, n] = co_await write_(bufs);
905  
        total += n;
905  
        total += n;
906  
        if(ec)
906  
        if(ec)
907  
            co_return {ec, total};
907  
            co_return {ec, total};
908  
        bp.consume(n);
908  
        bp.consume(n);
909  
    }
909  
    }
910  
}
910  
}
911  

911  

912  
} // namespace capy
912  
} // namespace capy
913  
} // namespace boost
913  
} // namespace boost
914  

914  

915  
#endif
915  
#endif