Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 : #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_array.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/read_stream.hpp>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/io_result.hpp>
21 :
22 : #include <concepts>
23 : #include <coroutine>
24 : #include <cstddef>
25 : #include <new>
26 : #include <span>
27 : #include <stop_token>
28 : #include <system_error>
29 : #include <utility>
30 :
31 : namespace boost {
32 : namespace capy {
33 :
34 : /** Type-erased wrapper for any ReadStream.
35 :
36 : This class provides type erasure for any type satisfying the
37 : @ref ReadStream concept, enabling runtime polymorphism for
38 : read operations. It uses cached awaitable storage to achieve
39 : zero steady-state allocation after construction.
40 :
41 : The wrapper supports two construction modes:
42 : - **Owning**: Pass by value to transfer ownership. The wrapper
43 : allocates storage and owns the stream.
44 : - **Reference**: Pass a pointer to wrap without ownership. The
45 : pointed-to stream must outlive this wrapper.
46 :
47 : @par Awaitable Preallocation
48 : The constructor preallocates storage for the type-erased awaitable.
49 : This reserves all virtual address space at server startup
50 : so memory usage can be measured up front, rather than
51 : allocating piecemeal as traffic arrives.
52 :
53 : @par Immediate Completion
54 : When the underlying stream's awaitable reports ready immediately
55 : (e.g. buffered data already available), the wrapper skips
56 : coroutine suspension entirely and returns the result inline.
57 :
58 : @par Thread Safety
59 : Not thread-safe. Concurrent operations on the same wrapper
60 : are undefined behavior.
61 :
62 : @par Example
63 : @code
64 : // Owning - takes ownership of the stream
65 : any_read_stream stream(socket{ioc});
66 :
67 : // Reference - wraps without ownership
68 : socket sock(ioc);
69 : any_read_stream stream(&sock);
70 :
71 : mutable_buffer buf(data, size);
72 : auto [ec, n] = co_await stream.read_some(buf);
73 : @endcode
74 :
75 : @see any_write_stream, any_stream, ReadStream
76 : */
77 : class any_read_stream
78 : {
79 : struct vtable;
80 :
81 : template<ReadStream S>
82 : struct vtable_for_impl;
83 :
84 : // ordered for cache line coherence
85 : void* stream_ = nullptr;
86 : vtable const* vt_ = nullptr;
87 : void* cached_awaitable_ = nullptr;
88 : void* storage_ = nullptr;
89 : bool awaitable_active_ = false;
90 :
91 : public:
92 : /** Destructor.
93 :
94 : Destroys the owned stream (if any) and releases the cached
95 : awaitable storage.
96 : */
97 : ~any_read_stream();
98 :
99 : /** Default constructor.
100 :
101 : Constructs an empty wrapper. Operations on a default-constructed
102 : wrapper result in undefined behavior.
103 : */
104 1 : any_read_stream() = default;
105 :
106 : /** Non-copyable.
107 :
108 : The awaitable cache is per-instance and cannot be shared.
109 : */
110 : any_read_stream(any_read_stream const&) = delete;
111 : any_read_stream& operator=(any_read_stream const&) = delete;
112 :
113 : /** Move constructor.
114 :
115 : Transfers ownership of the wrapped stream (if owned) and
116 : cached awaitable storage from `other`. After the move, `other` is
117 : in a default-constructed state.
118 :
119 : @param other The wrapper to move from.
120 : */
121 2 : any_read_stream(any_read_stream&& other) noexcept
122 2 : : stream_(std::exchange(other.stream_, nullptr))
123 2 : , vt_(std::exchange(other.vt_, nullptr))
124 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125 2 : , storage_(std::exchange(other.storage_, nullptr))
126 2 : , awaitable_active_(std::exchange(other.awaitable_active_, false))
127 : {
128 2 : }
129 :
130 : /** Move assignment operator.
131 :
132 : Destroys any owned stream and releases existing resources,
133 : then transfers ownership from `other`.
134 :
135 : @param other The wrapper to move from.
136 : @return Reference to this wrapper.
137 : */
138 : any_read_stream&
139 : operator=(any_read_stream&& other) noexcept;
140 :
141 : /** Construct by taking ownership of a ReadStream.
142 :
143 : Allocates storage and moves the stream into this wrapper.
144 : The wrapper owns the stream and will destroy it.
145 :
146 : @param s The stream to take ownership of.
147 : */
148 : template<ReadStream S>
149 : requires (!std::same_as<std::decay_t<S>, any_read_stream>)
150 : any_read_stream(S s);
151 :
152 : /** Construct by wrapping a ReadStream without ownership.
153 :
154 : Wraps the given stream by pointer. The stream must remain
155 : valid for the lifetime of this wrapper.
156 :
157 : @param s Pointer to the stream to wrap.
158 : */
159 : template<ReadStream S>
160 : any_read_stream(S* s);
161 :
162 : /** Check if the wrapper contains a valid stream.
163 :
164 : @return `true` if wrapping a stream, `false` if default-constructed
165 : or moved-from.
166 : */
167 : bool
168 25 : has_value() const noexcept
169 : {
170 25 : return stream_ != nullptr;
171 : }
172 :
173 : /** Check if the wrapper contains a valid stream.
174 :
175 : @return `true` if wrapping a stream, `false` if default-constructed
176 : or moved-from.
177 : */
178 : explicit
179 3 : operator bool() const noexcept
180 : {
181 3 : return has_value();
182 : }
183 :
184 : /** Initiate an asynchronous read operation.
185 :
186 : Reads data into the provided buffer sequence. The operation
187 : completes when at least one byte has been read, or an error
188 : occurs.
189 :
190 : @param buffers The buffer sequence to read into. Passed by
191 : value to ensure the sequence lives in the coroutine frame
192 : across suspension points.
193 :
194 : @return An awaitable yielding `(error_code,std::size_t)`.
195 :
196 : @par Immediate Completion
197 : The operation completes immediately without suspending
198 : the calling coroutine when the underlying stream's
199 : awaitable reports immediate readiness via `await_ready`.
200 :
201 : @note This is a partial operation and may not process the
202 : entire buffer sequence. Use the composed @ref read algorithm
203 : for guaranteed complete transfer.
204 :
205 : @par Preconditions
206 : The wrapper must contain a valid stream (`has_value() == true`).
207 : The caller must not call this function again after a prior
208 : call returned an error (including EOF).
209 : */
210 : template<MutableBufferSequence MB>
211 : auto
212 : read_some(MB buffers);
213 :
214 : protected:
215 : /** Rebind to a new stream after move.
216 :
217 : Updates the internal pointer to reference a new stream object.
218 : Used by owning wrappers after move assignment when the owned
219 : object has moved to a new location.
220 :
221 : @param new_stream The new stream to bind to. Must be the same
222 : type as the original stream.
223 :
224 : @note Terminates if called with a stream of different type
225 : than the original.
226 : */
227 : template<ReadStream S>
228 : void
229 : rebind(S& new_stream) noexcept
230 : {
231 : if(vt_ != &vtable_for_impl<S>::value)
232 : std::terminate();
233 : stream_ = &new_stream;
234 : }
235 : };
236 :
237 : //----------------------------------------------------------
238 :
239 : struct any_read_stream::vtable
240 : {
241 : // ordered by call frequency for cache line coherence
242 : void (*construct_awaitable)(
243 : void* stream,
244 : void* storage,
245 : std::span<mutable_buffer const> buffers);
246 : bool (*await_ready)(void*);
247 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248 : io_result<std::size_t> (*await_resume)(void*);
249 : void (*destroy_awaitable)(void*) noexcept;
250 : std::size_t awaitable_size;
251 : std::size_t awaitable_align;
252 : void (*destroy)(void*) noexcept;
253 : };
254 :
255 : template<ReadStream S>
256 : struct any_read_stream::vtable_for_impl
257 : {
258 : using Awaitable = decltype(std::declval<S&>().read_some(
259 : std::span<mutable_buffer const>{}));
260 :
261 : static void
262 1 : do_destroy_impl(void* stream) noexcept
263 : {
264 1 : static_cast<S*>(stream)->~S();
265 1 : }
266 :
267 : static void
268 91 : construct_awaitable_impl(
269 : void* stream,
270 : void* storage,
271 : std::span<mutable_buffer const> buffers)
272 : {
273 91 : auto& s = *static_cast<S*>(stream);
274 91 : ::new(storage) Awaitable(s.read_some(buffers));
275 91 : }
276 :
277 : static constexpr vtable value = {
278 : &construct_awaitable_impl,
279 91 : +[](void* p) {
280 91 : return static_cast<Awaitable*>(p)->await_ready();
281 : },
282 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283 0 : return detail::call_await_suspend(
284 0 : static_cast<Awaitable*>(p), h, env);
285 : },
286 89 : +[](void* p) {
287 89 : return static_cast<Awaitable*>(p)->await_resume();
288 : },
289 93 : +[](void* p) noexcept {
290 16 : static_cast<Awaitable*>(p)->~Awaitable();
291 : },
292 : sizeof(Awaitable),
293 : alignof(Awaitable),
294 : &do_destroy_impl
295 : };
296 : };
297 :
298 : //----------------------------------------------------------
299 :
300 : inline
301 101 : any_read_stream::~any_read_stream()
302 : {
303 101 : if(storage_)
304 : {
305 1 : vt_->destroy(stream_);
306 1 : ::operator delete(storage_);
307 : }
308 101 : if(cached_awaitable_)
309 : {
310 91 : if(awaitable_active_)
311 1 : vt_->destroy_awaitable(cached_awaitable_);
312 91 : ::operator delete(cached_awaitable_);
313 : }
314 101 : }
315 :
316 : inline any_read_stream&
317 5 : any_read_stream::operator=(any_read_stream&& other) noexcept
318 : {
319 5 : if(this != &other)
320 : {
321 5 : if(storage_)
322 : {
323 0 : vt_->destroy(stream_);
324 0 : ::operator delete(storage_);
325 : }
326 5 : if(cached_awaitable_)
327 : {
328 2 : if(awaitable_active_)
329 1 : vt_->destroy_awaitable(cached_awaitable_);
330 2 : ::operator delete(cached_awaitable_);
331 : }
332 5 : stream_ = std::exchange(other.stream_, nullptr);
333 5 : vt_ = std::exchange(other.vt_, nullptr);
334 5 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
335 5 : storage_ = std::exchange(other.storage_, nullptr);
336 5 : awaitable_active_ = std::exchange(other.awaitable_active_, false);
337 : }
338 5 : return *this;
339 : }
340 :
341 : template<ReadStream S>
342 : requires (!std::same_as<std::decay_t<S>, any_read_stream>)
343 1 : any_read_stream::any_read_stream(S s)
344 1 : : vt_(&vtable_for_impl<S>::value)
345 : {
346 : struct guard {
347 : any_read_stream* self;
348 : bool committed = false;
349 1 : ~guard() {
350 1 : if(!committed && self->storage_) {
351 0 : self->vt_->destroy(self->stream_);
352 0 : ::operator delete(self->storage_);
353 0 : self->storage_ = nullptr;
354 0 : self->stream_ = nullptr;
355 : }
356 1 : }
357 1 : } g{this};
358 :
359 1 : storage_ = ::operator new(sizeof(S));
360 1 : stream_ = ::new(storage_) S(std::move(s));
361 :
362 : // Preallocate the awaitable storage
363 1 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
364 :
365 1 : g.committed = true;
366 1 : }
367 :
368 : template<ReadStream S>
369 92 : any_read_stream::any_read_stream(S* s)
370 92 : : stream_(s)
371 92 : , vt_(&vtable_for_impl<S>::value)
372 : {
373 : // Preallocate the awaitable storage
374 92 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
375 92 : }
376 :
377 : //----------------------------------------------------------
378 :
379 : template<MutableBufferSequence MB>
380 : auto
381 91 : any_read_stream::read_some(MB buffers)
382 : {
383 : // VFALCO in theory, we could use if constexpr to detect a
384 : // span and then pass that through to read_some without the array
385 : struct awaitable
386 : {
387 : any_read_stream* self_;
388 : mutable_buffer_array<detail::max_iovec_> ba_;
389 :
390 : bool
391 91 : await_ready()
392 : {
393 91 : self_->vt_->construct_awaitable(
394 91 : self_->stream_,
395 91 : self_->cached_awaitable_,
396 91 : ba_.to_span());
397 91 : self_->awaitable_active_ = true;
398 :
399 182 : return self_->vt_->await_ready(
400 91 : self_->cached_awaitable_);
401 : }
402 :
403 : std::coroutine_handle<>
404 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
405 : {
406 0 : return self_->vt_->await_suspend(
407 0 : self_->cached_awaitable_, h, env);
408 : }
409 :
410 : io_result<std::size_t>
411 89 : await_resume()
412 : {
413 : struct guard {
414 : any_read_stream* self;
415 89 : ~guard() {
416 89 : self->vt_->destroy_awaitable(self->cached_awaitable_);
417 89 : self->awaitable_active_ = false;
418 89 : }
419 89 : } g{self_};
420 89 : return self_->vt_->await_resume(
421 154 : self_->cached_awaitable_);
422 89 : }
423 : };
424 : return awaitable{this,
425 91 : mutable_buffer_array<detail::max_iovec_>(buffers)};
426 91 : }
427 :
428 : } // namespace capy
429 : } // namespace boost
430 :
431 : #endif
|