Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
11 : #define BOOST_CAPY_IO_WRITE_NOW_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/consuming_buffers.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/write_stream.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 :
24 : #include <cstddef>
25 : #include <exception>
26 : #include <new>
27 : #include <stop_token>
28 : #include <utility>
29 :
// Selects which write_now::operator() body and which initial_suspend
// behaviour are compiled. If the macro is not already defined it
// defaults to 1 on GCC (__GNUC__ defined, __clang__ not defined) and to 0
// on every other compiler; defining it beforehand overrides this choice.
30 : #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
31 : # if defined(__GNUC__) && !defined(__clang__)
32 : # define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
33 : # else
34 : # define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
35 : # endif
36 : #endif
37 :
38 : namespace boost {
39 : namespace capy {
40 :
41 : /** Eagerly writes complete buffer sequences with frame caching.
42 :
43 : This class wraps a @ref WriteStream and provides an `operator()`
44 : that writes an entire buffer sequence, attempting to complete
45 : synchronously. If every `write_some` completes without suspending,
46 : the entire operation finishes in `await_ready` with no coroutine
47 : suspension.
48 :
49 : The class maintains a one-element coroutine frame cache. After
50 : the first call, subsequent calls reuse the cached frame memory,
51 : avoiding repeated allocation for the internal coroutine.
52 :
53 : @tparam Stream The stream type, must satisfy @ref WriteStream.
54 :
55 : @par Thread Safety
56 : Distinct objects: Safe.
57 : Shared objects: Unsafe.
58 :
59 : @par Preconditions
60 : Only one operation may be outstanding at a time. A new call to
61 : `operator()` must not be made until the previous operation has
62 : completed (i.e., the returned awaitable has been fully consumed).
63 :
64 : @par Example
65 :
66 : @code
67 : template< WriteStream Stream >
68 : task<> send_messages( Stream& stream )
69 : {
70 : write_now wn( stream );
71 : auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
72 : if( ec1 )
73 : detail::throw_system_error( ec1 );
74 : auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
75 : if( ec2 )
76 : detail::throw_system_error( ec2 );
77 : }
78 : @endcode
79 :
80 : @see write, write_some, WriteStream, ConstBufferSequence
81 : */
82 : template<class Stream>
83 : requires WriteStream<Stream>
84 : class write_now
85 : {
// Stream that operator() writes to through write_some. Held by
// reference; this class never copies or destroys it.
86 : Stream& stream_;
// Block most recently obtained by promise_type::operator new. It is
// released in ~write_now, or in operator new when a larger block
// replaces it.
87 : void* cached_frame_ = nullptr;
// Size in bytes that was requested when cached_frame_ was allocated.
88 : std::size_t cached_size_ = 0;
89 :
// Return type of operator() and the awaitable handed to the caller.
// Owns the coroutine handle h_ and destroys the coroutine in ~op_type.
90 : struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
91 : op_type
92 : {
// Promise of the write coroutine. Records the outcome of the body plus
// the handle and io_env of whoever awaits the op_type.
93 : struct promise_type
94 : {
// Value given to co_return; handed back by op_type::await_resume.
95 : io_result<std::size_t> result_;
// Exception captured by unhandled_exception; rethrown by
// op_type::await_resume.
96 : std::exception_ptr ep_;
// Handle passed to op_type::await_suspend. Stays null if the body
// finishes before op_type::await_suspend is ever called.
97 : std::coroutine_handle<> cont_{nullptr};
// Environment pointer passed to op_type::await_suspend; forwarded to
// awaited IoAwaitables by the await_transform wrapper.
98 : io_env const* env_ = nullptr;
// Set to true by the final awaiter; read by op_type::await_ready.
99 : bool done_ = false;
100 :
// Wraps this promise's coroutine handle in the op_type returned to the
// caller of operator().
101 68 : op_type get_return_object()
102 : {
103 : return op_type{
104 : std::coroutine_handle<
105 68 : promise_type>::from_promise(*this)};
106 : }
107 :
// With the workaround enabled the coroutine starts suspended and is
// first resumed when op_type::await_suspend returns h_. Otherwise the
// body starts running immediately, inside the call to operator().
108 68 : auto initial_suspend() noexcept
109 : {
110 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
111 68 : return std::suspend_always{};
112 : #else
113 : return std::suspend_never{};
114 : #endif
115 : }
116 :
// Always suspends at the end of the body, so the coroutine is destroyed
// only by ~op_type calling h_.destroy().
117 68 : auto final_suspend() noexcept
118 : {
119 : struct awaiter
120 : {
121 : promise_type* p_;
122 :
123 68 : bool await_ready() const noexcept
124 : {
125 68 : return false;
126 : }
127 :
// Marks the operation complete, then transfers control to the recorded
// continuation. When no continuation was recorded (cont_ is null) it
// returns noop_coroutine() instead, so nothing further is resumed.
128 68 : std::coroutine_handle<> await_suspend(std::coroutine_handle<>) const noexcept
129 : {
130 68 : p_->done_ = true;
131 68 : if(!p_->cont_)
132 0 : return std::noop_coroutine();
133 68 : return p_->cont_;
134 : }
135 :
136 0 : void await_resume() const noexcept
137 : {
138 0 : }
139 : };
140 68 : return awaiter{this};
141 : }
142 :
// Stores the io_result given to co_return.
143 46 : void return_value(
144 : io_result<std::size_t> r) noexcept
145 : {
146 46 : result_ = r;
147 46 : }
148 :
// Captures an exception that escapes the coroutine body so that
// op_type::await_resume can rethrow it to the awaiting code.
149 22 : void unhandled_exception()
150 : {
151 22 : ep_ = std::current_exception();
152 22 : }
153 :
// co_yield of an int suspends unconditionally; the yielded value is
// ignored. The non-workaround operator() uses `co_yield 0` to pause
// after its inline attempt.
154 : std::suspend_always yield_value(int) noexcept
155 : {
156 : return {};
157 : }
158 :
// Applied to the operand of each co_await in the coroutine body.
// An operand whose decayed type satisfies IoAwaitable is stored in a
// wrapper whose await_suspend goes through detail::call_await_suspend
// with this promise's env_; await_ready and await_resume are passed
// straight through. Any other operand is forwarded unchanged.
// NOTE(review): std::decay_t is declared in <type_traits>, which this
// header does not include directly - presumably it arrives through
// another include; confirm.
159 : template<class A>
160 84 : auto await_transform(A&& a)
161 : {
162 : using decayed = std::decay_t<A>;
163 : if constexpr (IoAwaitable<decayed>)
164 : {
165 : struct wrapper
166 : {
167 : decayed inner_;
168 : promise_type* p_;
169 :
170 84 : bool await_ready()
171 : {
172 84 : return inner_.await_ready();
173 : }
174 :
175 0 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
176 : {
177 0 : return detail::call_await_suspend(
178 : &inner_, h,
179 0 : p_->env_);
180 : }
181 :
182 84 : decltype(auto) await_resume()
183 : {
184 84 : return inner_.await_resume();
185 : }
186 : };
187 : return wrapper{
188 84 : std::forward<A>(a), this};
189 : }
190 : else
191 : {
192 : return std::forward<A>(a);
193 : }
194 : }
195 :
// Allocation function for the coroutine frame. Returns the block cached
// in `self` when one exists and is at least `size` bytes. Otherwise it
// allocates a new block first, then frees the previously cached block
// (if any) and records the new block and its size in `self`.
// NOTE(review): the write_now& and auto& parameters presumably receive
// operator()'s implicit object and its `buffers` argument, as in the
// standard promise allocation-function lookup - confirm.
196 : static void*
197 68 : operator new(
198 : std::size_t size,
199 : write_now& self,
200 : auto&)
201 : {
202 68 : if(self.cached_frame_ &&
203 4 : self.cached_size_ >= size)
204 4 : return self.cached_frame_;
205 64 : void* p = ::operator new(size);
206 64 : if(self.cached_frame_)
207 0 : ::operator delete(self.cached_frame_);
208 64 : self.cached_frame_ = p;
209 64 : self.cached_size_ = size;
210 64 : return p;
211 : }
212 :
// Intentionally empty: the block stays recorded in
// write_now::cached_frame_ and is released by ~write_now or replaced
// by operator new.
213 : static void
214 68 : operator delete(void*, std::size_t) noexcept
215 : {
216 68 : }
217 : };
218 :
// Handle of the write coroutine; null after this object is moved from.
219 : std::coroutine_handle<promise_type> h_;
220 :
// Destroys the coroutine if this object still holds its handle.
221 136 : ~op_type()
222 : {
223 136 : if(h_)
224 68 : h_.destroy();
225 136 : }
226 :
227 : op_type(op_type const&) = delete;
228 : op_type& operator=(op_type const&) = delete;
229 :
// Takes over the handle and leaves `other` with a null handle, so only
// one op_type ever destroys the coroutine.
230 68 : op_type(op_type&& other) noexcept
231 68 : : h_(std::exchange(other.h_, nullptr))
232 : {
233 68 : }
234 :
235 : op_type& operator=(op_type&&) = delete;
236 :
// True once the coroutine has reached its final suspend point, i.e. the
// final awaiter has set done_.
237 68 : bool await_ready() const noexcept
238 : {
239 68 : return h_.promise().done_;
240 : }
241 :
// Records the awaiting coroutine and its environment in the promise,
// then returns h_ so that the write coroutine is resumed.
242 68 : std::coroutine_handle<> await_suspend(
243 : std::coroutine_handle<> cont,
244 : io_env const* env)
245 : {
246 68 : auto& p = h_.promise();
247 68 : p.cont_ = cont;
248 68 : p.env_ = env;
249 68 : return h_;
250 : }
251 :
// Rethrows an exception captured from the coroutine body; otherwise
// returns a copy of the stored result.
252 68 : io_result<std::size_t> await_resume()
253 : {
254 68 : auto& p = h_.promise();
255 68 : if(p.ep_)
256 22 : std::rethrow_exception(p.ep_);
257 46 : return p.result_;
258 : }
259 :
260 : private:
// Adopts a coroutine handle; called by promise_type::get_return_object.
261 68 : explicit op_type(
262 : std::coroutine_handle<promise_type> h)
263 68 : : h_(h)
264 : {
265 68 : }
266 : };
267 :
268 : public:
269 : /** Destructor. Frees the cached coroutine frame. */
270 64 : ~write_now()
271 : {
272 64 : if(cached_frame_)
273 64 : ::operator delete(cached_frame_);
274 64 : }
275 :
276 : /** Construct from a stream reference.
277 :
278 : @param s The stream to write to. Must outlive this object.
279 : */
280 : explicit
281 64 : write_now(Stream& s) noexcept
282 64 : : stream_(s)
283 : {
284 64 : }
285 :
286 : write_now(write_now const&) = delete;
287 : write_now& operator=(write_now const&) = delete;
288 :
289 : /** Eagerly write the entire buffer sequence.
290 :
291 : Writes data to the stream by calling `write_some` repeatedly
292 : until the entire buffer sequence is written or an error
293 : occurs. The operation attempts to complete synchronously:
294 : if every `write_some` completes without suspending, the
295 : entire operation finishes in `await_ready`.
296 :
297 : When the fast path cannot complete, the coroutine suspends
298 : and continues asynchronously. The internal coroutine frame
299 : is cached and reused across calls.
300 :
301 : @param buffers The buffer sequence to write. Passed by
302 : value to ensure the sequence lives in the coroutine
303 : frame across suspension points.
304 :
305 : @return An awaitable yielding `(error_code,std::size_t)`.
306 : On success, `n` equals `buffer_size(buffers)`. On
307 : error, `n` is the number of bytes written before the
308 : error. Compare error codes to conditions:
309 : @li `cond::canceled` - Operation was cancelled
310 : @li `std::errc::broken_pipe` - Peer closed connection
311 :
312 : @par Example
313 :
314 : @code
315 : write_now wn( stream );
316 : auto [ec, n] = co_await wn( make_buffer( body ) );
317 : if( ec )
318 : detail::throw_system_error( ec );
319 : @endcode
320 :
321 : @see write, write_some, WriteStream
322 : */
323 : // GCC falsely warns that the coroutine promise's
324 : // placement operator new(size_t, write_now&, auto&)
325 : // mismatches operator delete(void*, size_t). Per the
326 : // standard, coroutine deallocation lookup is separate.
327 : #if defined(__GNUC__) && !defined(__clang__)
328 : #pragma GCC diagnostic push
329 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
330 : #endif
331 :
332 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
// Workaround variant. Together with initial_suspend returning
// suspend_always, the body does not start until the op_type is awaited.
// It is a single loop that co_awaits write_some until total_size bytes
// have been written or write_some reports an error.
333 : template<ConstBufferSequence Buffers>
334 : op_type
335 68 : operator()(Buffers buffers)
336 : {
337 : std::size_t const total_size = buffer_size(buffers);
338 : std::size_t total_written = 0;
339 : consuming_buffers cb(buffers);
340 : while(total_written < total_size)
341 : {
342 : auto r =
343 : co_await stream_.write_some(cb);
// On error, report the error together with the bytes written so far.
344 : if(r.ec)
345 : co_return io_result<std::size_t>{
346 : r.ec, total_written};
// r.t1 is used as the number of bytes this write_some call wrote.
347 : cb.consume(r.t1);
348 : total_written += r.t1;
349 : }
350 : co_return io_result<std::size_t>{
351 : {}, total_written};
352 136 : }
353 : #else
// Variant used when the workaround is disabled. The body starts inside
// this call because initial_suspend returns suspend_never.
// Phase 1 creates each write_some awaitable and, if its await_ready()
// is true, takes the result via await_resume() without suspending.
// If an awaitable is not ready, the loop stops and `co_yield 0`
// suspends the coroutine; that awaitable is discarded.
// Phase 2 runs after the op_type is awaited and finishes the remaining
// bytes with ordinary co_await on fresh write_some awaitables.
354 : template<ConstBufferSequence Buffers>
355 : op_type
356 : operator()(Buffers buffers)
357 : {
358 : std::size_t const total_size = buffer_size(buffers);
359 : std::size_t total_written = 0;
360 :
361 : // GCC ICE in expand_expr_real_1 (expr.cc:11376)
362 : // when consuming_buffers spans a co_yield, so
363 : // the GCC path uses a separate simple coroutine.
364 : consuming_buffers cb(buffers);
365 : while(total_written < total_size)
366 : {
367 : auto inner = stream_.write_some(cb);
368 : if(!inner.await_ready())
369 : break;
370 : auto r = inner.await_resume();
371 : if(r.ec)
372 : co_return io_result<std::size_t>{
373 : r.ec, total_written};
374 : cb.consume(r.t1);
375 : total_written += r.t1;
376 : }
377 :
// Everything was written inline: finish without ever suspending at
// the co_yield below.
378 : if(total_written >= total_size)
379 : co_return io_result<std::size_t>{
380 : {}, total_written};
381 :
// Suspend here; op_type::await_suspend resumes the coroutine after it
// has recorded the continuation and io_env in the promise.
382 : co_yield 0;
383 :
384 : while(total_written < total_size)
385 : {
386 : auto r =
387 : co_await stream_.write_some(cb);
388 : if(r.ec)
389 : co_return io_result<std::size_t>{
390 : r.ec, total_written};
391 : cb.consume(r.t1);
392 : total_written += r.t1;
393 : }
394 : co_return io_result<std::size_t>{
395 : {}, total_written};
396 : }
397 : #endif
398 :
399 : #if defined(__GNUC__) && !defined(__clang__)
400 : #pragma GCC diagnostic pop
401 : #endif
402 : };
403 :
// Deduction guide: constructing `write_now wn(stream);` from an lvalue
// of a type S that satisfies WriteStream deduces write_now<S>.
404 : template<WriteStream S>
405 : write_now(S&) -> write_now<S>;
406 :
407 : } // namespace capy
408 : } // namespace boost
409 :
410 : #endif
|