Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_array.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <boost/capy/concept/read_source.hpp>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/io_result.hpp>
22 : #include <boost/capy/io_task.hpp>
23 :
24 : #include <concepts>
25 : #include <coroutine>
26 : #include <cstddef>
27 : #include <new>
28 : #include <span>
29 : #include <stop_token>
30 : #include <system_error>
31 : #include <utility>
32 :
33 : namespace boost {
34 : namespace capy {
35 :
36 : /** Type-erased wrapper for any ReadSource.
37 :
38 : This class provides type erasure for any type satisfying the
39 : @ref ReadSource concept, enabling runtime polymorphism for
40 : source read operations. It uses cached awaitable storage to achieve
41 : zero steady-state allocation after construction.
42 :
43 : The wrapper supports two construction modes:
44 : - **Owning**: Pass by value to transfer ownership. The wrapper
45 : allocates storage and owns the source.
46 : - **Reference**: Pass a pointer to wrap without ownership. The
47 : pointed-to source must outlive this wrapper.
48 :
49 : @par Awaitable Preallocation
50 : The constructor preallocates storage for the type-erased awaitable.
51 : This reserves all virtual address space at server startup
52 : so memory usage can be measured up front, rather than
53 : allocating piecemeal as traffic arrives.
54 :
55 : @par Immediate Completion
56 : Operations complete immediately without suspending when the
57 : buffer sequence is empty, or when the underlying source's
58 : awaitable reports readiness via `await_ready`.
59 :
60 : @par Thread Safety
61 : Not thread-safe. Concurrent operations on the same wrapper
62 : are undefined behavior.
63 :
64 : @par Example
65 : @code
66 : // Owning - takes ownership of the source
67 : any_read_source rs(some_source{args...});
68 :
69 : // Reference - wraps without ownership
70 : some_source source;
71 : any_read_source rs(&source);
72 :
73 : mutable_buffer buf(data, size);
74 : auto [ec, n] = co_await rs.read(std::span(&buf, 1));
75 : @endcode
76 :
77 : @see any_read_stream, ReadSource
78 : */
79 : class any_read_source
80 : {
81 : struct vtable;
82 : struct awaitable_ops;
83 :
84 : template<ReadSource S>
85 : struct vtable_for_impl;
86 :
87 : void* source_ = nullptr;
88 : vtable const* vt_ = nullptr;
89 : void* cached_awaitable_ = nullptr;
90 : void* storage_ = nullptr;
91 : awaitable_ops const* active_ops_ = nullptr;
92 :
93 : public:
94 : /** Destructor.
95 :
96 : Destroys the owned source (if any) and releases the cached
97 : awaitable storage.
98 : */
99 : ~any_read_source();
100 :
101 : /** Default constructor.
102 :
103 : Constructs an empty wrapper. Operations on a default-constructed
104 : wrapper result in undefined behavior.
105 : */
106 : any_read_source() = default;
107 :
108 : /** Non-copyable.
109 :
110 : The awaitable cache is per-instance and cannot be shared.
111 : */
112 : any_read_source(any_read_source const&) = delete;
113 : any_read_source& operator=(any_read_source const&) = delete;
114 :
115 : /** Move constructor.
116 :
117 : Transfers ownership of the wrapped source (if owned) and
118 : cached awaitable storage from `other`. After the move, `other` is
119 : in a default-constructed state.
120 :
121 : @param other The wrapper to move from.
122 : */
123 1 : any_read_source(any_read_source&& other) noexcept
124 1 : : source_(std::exchange(other.source_, nullptr))
125 1 : , vt_(std::exchange(other.vt_, nullptr))
126 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 1 : , storage_(std::exchange(other.storage_, nullptr))
128 1 : , active_ops_(std::exchange(other.active_ops_, nullptr))
129 : {
130 1 : }
131 :
132 : /** Move assignment operator.
133 :
134 : Destroys any owned source and releases existing resources,
135 : then transfers ownership from `other`.
136 :
137 : @param other The wrapper to move from.
138 : @return Reference to this wrapper.
139 : */
140 : any_read_source&
141 : operator=(any_read_source&& other) noexcept;
142 :
143 : /** Construct by taking ownership of a ReadSource.
144 :
145 : Allocates storage and moves the source into this wrapper.
146 : The wrapper owns the source and will destroy it.
147 :
148 : @param s The source to take ownership of.
149 : */
150 : template<ReadSource S>
151 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
152 : any_read_source(S s);
153 :
154 : /** Construct by wrapping a ReadSource without ownership.
155 :
156 : Wraps the given source by pointer. The source must remain
157 : valid for the lifetime of this wrapper.
158 :
159 : @param s Pointer to the source to wrap.
160 : */
161 : template<ReadSource S>
162 : any_read_source(S* s);
163 :
164 : /** Check if the wrapper contains a valid source.
165 :
166 : @return `true` if wrapping a source, `false` if default-constructed
167 : or moved-from.
168 : */
169 : bool
170 27 : has_value() const noexcept
171 : {
172 27 : return source_ != nullptr;
173 : }
174 :
175 : /** Check if the wrapper contains a valid source.
176 :
177 : @return `true` if wrapping a source, `false` if default-constructed
178 : or moved-from.
179 : */
180 : explicit
181 8 : operator bool() const noexcept
182 : {
183 8 : return has_value();
184 : }
185 :
186 : /** Initiate a partial read operation.
187 :
188 : Reads one or more bytes into the provided buffer sequence.
189 : May fill less than the full sequence.
190 :
191 : @param buffers The buffer sequence to read into.
192 :
193 : @return An awaitable yielding `(error_code,std::size_t)`.
194 :
195 : @par Immediate Completion
196 : The operation completes immediately without suspending
197 : the calling coroutine when:
198 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
199 : @li The underlying source's awaitable reports immediate
200 : readiness via `await_ready`.
201 :
202 : @note This is a partial operation and may not process the
203 : entire buffer sequence. Use @ref read for guaranteed
204 : complete transfer.
205 :
206 : @par Preconditions
207 : The wrapper must contain a valid source (`has_value() == true`).
208 : The caller must not call this function again after a prior
209 : call returned an error (including EOF).
210 : */
211 : template<MutableBufferSequence MB>
212 : auto
213 : read_some(MB buffers);
214 :
215 : /** Initiate a complete read operation.
216 :
217 : Reads data into the provided buffer sequence by forwarding
218 : to the underlying source's `read` operation. Large buffer
219 : sequences are processed in windows, with each window
220 : forwarded as a separate `read` call to the underlying source.
221 : The operation completes when the entire buffer sequence is
222 : filled, end-of-file is reached, or an error occurs.
223 :
224 : @param buffers The buffer sequence to read into.
225 :
226 : @return An awaitable yielding `(error_code,std::size_t)`.
227 :
228 : @par Immediate Completion
229 : The operation completes immediately without suspending
230 : the calling coroutine when:
231 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
232 : @li The underlying source's `read` awaitable reports
233 : immediate readiness via `await_ready`.
234 :
235 : @par Postconditions
236 : Exactly one of the following is true on return:
237 : @li **Success**: `!ec` and `n == buffer_size(buffers)`.
238 : The entire buffer was filled.
239 : @li **End-of-stream or Error**: `ec` and `n` indicates
240 : the number of bytes transferred before the failure.
241 :
242 : @par Preconditions
243 : The wrapper must contain a valid source (`has_value() == true`).
244 : The caller must not call this function again after a prior
245 : call returned an error (including EOF).
246 : */
247 : template<MutableBufferSequence MB>
248 : io_task<std::size_t>
249 : read(MB buffers);
250 :
251 : protected:
252 : /** Rebind to a new source after move.
253 :
254 : Updates the internal pointer to reference a new source object.
255 : Used by owning wrappers after move assignment when the owned
256 : object has moved to a new location.
257 :
258 : @param new_source The new source to bind to. Must be the same
259 : type as the original source.
260 :
261 : @note Terminates if called with a source of different type
262 : than the original.
263 : */
264 : template<ReadSource S>
265 : void
266 : rebind(S& new_source) noexcept
267 : {
268 : if(vt_ != &vtable_for_impl<S>::value)
269 : std::terminate();
270 : source_ = &new_source;
271 : }
272 :
273 : private:
274 : auto
275 : read_(std::span<mutable_buffer const> buffers);
276 : };
277 :
278 : //----------------------------------------------------------
279 :
280 : // ordered by call sequence for cache line coherence
281 : struct any_read_source::awaitable_ops
282 : {
283 : bool (*await_ready)(void*);
284 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285 : io_result<std::size_t> (*await_resume)(void*);
286 : void (*destroy)(void*) noexcept;
287 : };
288 :
289 : // ordered by call frequency for cache line coherence
290 : struct any_read_source::vtable
291 : {
292 : awaitable_ops const* (*construct_read_some_awaitable)(
293 : void* source,
294 : void* storage,
295 : std::span<mutable_buffer const> buffers);
296 : awaitable_ops const* (*construct_read_awaitable)(
297 : void* source,
298 : void* storage,
299 : std::span<mutable_buffer const> buffers);
300 : std::size_t awaitable_size;
301 : std::size_t awaitable_align;
302 : void (*destroy)(void*) noexcept;
303 : };
304 :
305 : template<ReadSource S>
306 : struct any_read_source::vtable_for_impl
307 : {
308 : using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309 : std::span<mutable_buffer const>{}));
310 : using ReadAwaitable = decltype(std::declval<S&>().read(
311 : std::span<mutable_buffer const>{}));
312 :
313 : static void
314 6 : do_destroy_impl(void* source) noexcept
315 : {
316 6 : static_cast<S*>(source)->~S();
317 6 : }
318 :
319 : static awaitable_ops const*
320 52 : construct_read_some_awaitable_impl(
321 : void* source,
322 : void* storage,
323 : std::span<mutable_buffer const> buffers)
324 : {
325 52 : auto& s = *static_cast<S*>(source);
326 52 : ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327 :
328 : static constexpr awaitable_ops ops = {
329 52 : +[](void* p) {
330 52 : return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331 : },
332 2 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
333 2 : return detail::call_await_suspend(
334 2 : static_cast<ReadSomeAwaitable*>(p), h, env);
335 : },
336 50 : +[](void* p) {
337 50 : return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338 : },
339 54 : +[](void* p) noexcept {
340 2 : static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341 : }
342 : };
343 52 : return &ops;
344 : }
345 :
346 : static awaitable_ops const*
347 116 : construct_read_awaitable_impl(
348 : void* source,
349 : void* storage,
350 : std::span<mutable_buffer const> buffers)
351 : {
352 116 : auto& s = *static_cast<S*>(source);
353 116 : ::new(storage) ReadAwaitable(s.read(buffers));
354 :
355 : static constexpr awaitable_ops ops = {
356 116 : +[](void* p) {
357 116 : return static_cast<ReadAwaitable*>(p)->await_ready();
358 : },
359 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
360 0 : return detail::call_await_suspend(
361 0 : static_cast<ReadAwaitable*>(p), h, env);
362 : },
363 116 : +[](void* p) {
364 116 : return static_cast<ReadAwaitable*>(p)->await_resume();
365 : },
366 116 : +[](void* p) noexcept {
367 0 : static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368 : }
369 : };
370 116 : return &ops;
371 : }
372 :
373 : static constexpr std::size_t max_awaitable_size =
374 : sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375 : ? sizeof(ReadSomeAwaitable)
376 : : sizeof(ReadAwaitable);
377 : static constexpr std::size_t max_awaitable_align =
378 : alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379 : ? alignof(ReadSomeAwaitable)
380 : : alignof(ReadAwaitable);
381 :
382 : static constexpr vtable value = {
383 : &construct_read_some_awaitable_impl,
384 : &construct_read_awaitable_impl,
385 : max_awaitable_size,
386 : max_awaitable_align,
387 : &do_destroy_impl
388 : };
389 : };
390 :
391 : //----------------------------------------------------------
392 :
393 : inline
394 145 : any_read_source::~any_read_source()
395 : {
396 145 : if(storage_)
397 : {
398 6 : vt_->destroy(source_);
399 6 : ::operator delete(storage_);
400 : }
401 145 : if(cached_awaitable_)
402 : {
403 139 : if(active_ops_)
404 1 : active_ops_->destroy(cached_awaitable_);
405 139 : ::operator delete(cached_awaitable_);
406 : }
407 145 : }
408 :
409 : inline any_read_source&
410 4 : any_read_source::operator=(any_read_source&& other) noexcept
411 : {
412 4 : if(this != &other)
413 : {
414 3 : if(storage_)
415 : {
416 0 : vt_->destroy(source_);
417 0 : ::operator delete(storage_);
418 : }
419 3 : if(cached_awaitable_)
420 : {
421 2 : if(active_ops_)
422 1 : active_ops_->destroy(cached_awaitable_);
423 2 : ::operator delete(cached_awaitable_);
424 : }
425 3 : source_ = std::exchange(other.source_, nullptr);
426 3 : vt_ = std::exchange(other.vt_, nullptr);
427 3 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
428 3 : storage_ = std::exchange(other.storage_, nullptr);
429 3 : active_ops_ = std::exchange(other.active_ops_, nullptr);
430 : }
431 4 : return *this;
432 : }
433 :
434 : template<ReadSource S>
435 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
436 6 : any_read_source::any_read_source(S s)
437 6 : : vt_(&vtable_for_impl<S>::value)
438 : {
439 : struct guard {
440 : any_read_source* self;
441 : bool committed = false;
442 6 : ~guard() {
443 6 : if(!committed && self->storage_) {
444 0 : self->vt_->destroy(self->source_);
445 0 : ::operator delete(self->storage_);
446 0 : self->storage_ = nullptr;
447 0 : self->source_ = nullptr;
448 : }
449 6 : }
450 6 : } g{this};
451 :
452 6 : storage_ = ::operator new(sizeof(S));
453 6 : source_ = ::new(storage_) S(std::move(s));
454 :
455 : // Preallocate the awaitable storage
456 6 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
457 :
458 6 : g.committed = true;
459 6 : }
460 :
461 : template<ReadSource S>
462 135 : any_read_source::any_read_source(S* s)
463 135 : : source_(s)
464 135 : , vt_(&vtable_for_impl<S>::value)
465 : {
466 : // Preallocate the awaitable storage
467 135 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
468 135 : }
469 :
470 : //----------------------------------------------------------
471 :
472 : template<MutableBufferSequence MB>
473 : auto
474 54 : any_read_source::read_some(MB buffers)
475 : {
476 : struct awaitable
477 : {
478 : any_read_source* self_;
479 : mutable_buffer_array<detail::max_iovec_> ba_;
480 :
481 54 : awaitable(any_read_source* self, MB const& buffers)
482 54 : : self_(self)
483 54 : , ba_(buffers)
484 : {
485 54 : }
486 :
487 : bool
488 54 : await_ready() const noexcept
489 : {
490 54 : return ba_.to_span().empty();
491 : }
492 :
493 : std::coroutine_handle<>
494 52 : await_suspend(std::coroutine_handle<> h, io_env const* env)
495 : {
496 52 : self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
497 52 : self_->source_,
498 52 : self_->cached_awaitable_,
499 52 : ba_.to_span());
500 :
501 52 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
502 50 : return h;
503 :
504 2 : return self_->active_ops_->await_suspend(
505 2 : self_->cached_awaitable_, h, env);
506 : }
507 :
508 : io_result<std::size_t>
509 52 : await_resume()
510 : {
511 52 : if(ba_.to_span().empty())
512 2 : return {{}, 0};
513 :
514 : struct guard {
515 : any_read_source* self;
516 50 : ~guard() {
517 50 : self->active_ops_->destroy(self->cached_awaitable_);
518 50 : self->active_ops_ = nullptr;
519 50 : }
520 50 : } g{self_};
521 50 : return self_->active_ops_->await_resume(
522 50 : self_->cached_awaitable_);
523 50 : }
524 : };
525 54 : return awaitable(this, buffers);
526 : }
527 :
528 : inline auto
529 116 : any_read_source::read_(std::span<mutable_buffer const> buffers)
530 : {
531 : struct awaitable
532 : {
533 : any_read_source* self_;
534 : std::span<mutable_buffer const> buffers_;
535 :
536 : bool
537 116 : await_ready() const noexcept
538 : {
539 116 : return false;
540 : }
541 :
542 : std::coroutine_handle<>
543 116 : await_suspend(std::coroutine_handle<> h, io_env const* env)
544 : {
545 232 : self_->active_ops_ = self_->vt_->construct_read_awaitable(
546 116 : self_->source_,
547 116 : self_->cached_awaitable_,
548 : buffers_);
549 :
550 116 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
551 116 : return h;
552 :
553 0 : return self_->active_ops_->await_suspend(
554 0 : self_->cached_awaitable_, h, env);
555 : }
556 :
557 : io_result<std::size_t>
558 116 : await_resume()
559 : {
560 : struct guard {
561 : any_read_source* self;
562 116 : ~guard() {
563 116 : self->active_ops_->destroy(self->cached_awaitable_);
564 116 : self->active_ops_ = nullptr;
565 116 : }
566 116 : } g{self_};
567 116 : return self_->active_ops_->await_resume(
568 200 : self_->cached_awaitable_);
569 116 : }
570 : };
571 116 : return awaitable{this, buffers};
572 : }
573 :
574 : template<MutableBufferSequence MB>
575 : io_task<std::size_t>
576 110 : any_read_source::read(MB buffers)
577 : {
578 : buffer_param bp(buffers);
579 : std::size_t total = 0;
580 :
581 : for(;;)
582 : {
583 : auto bufs = bp.data();
584 : if(bufs.empty())
585 : break;
586 :
587 : auto [ec, n] = co_await read_(bufs);
588 : total += n;
589 : if(ec)
590 : co_return {ec, total};
591 : bp.consume(n);
592 : }
593 :
594 : co_return {{}, total};
595 220 : }
596 :
597 : } // namespace capy
598 : } // namespace boost
599 :
600 : #endif
|