//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_PULL_FROM_HPP
10  
#ifndef BOOST_CAPY_IO_PULL_FROM_HPP
11  
#define BOOST_CAPY_IO_PULL_FROM_HPP
11  
#define BOOST_CAPY_IO_PULL_FROM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/cond.hpp>
15  
#include <boost/capy/cond.hpp>
16  
#include <boost/capy/concept/buffer_sink.hpp>
16  
#include <boost/capy/concept/buffer_sink.hpp>
17  
#include <boost/capy/concept/read_source.hpp>
17  
#include <boost/capy/concept/read_source.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
19  
#include <boost/capy/io_task.hpp>
19  
#include <boost/capy/io_task.hpp>
20  

20  

21  
#include <cstddef>
21  
#include <cstddef>
22  
#include <span>
22  
#include <span>
23  

23  

24  
namespace boost {
24  
namespace boost {
25  
namespace capy {
25  
namespace capy {
26  

26  

27  
/** Transfer data from a ReadSource to a BufferSink.
27  
/** Transfer data from a ReadSource to a BufferSink.
28  

28  

29  
    This function reads data from the source directly into the sink's
29  
    This function reads data from the source directly into the sink's
30  
    internal buffers using the callee-owns-buffers model. The sink
30  
    internal buffers using the callee-owns-buffers model. The sink
31  
    provides writable buffers via `prepare()`, the source reads into
31  
    provides writable buffers via `prepare()`, the source reads into
32  
    them, and the sink commits the data. When the source signals EOF,
32  
    them, and the sink commits the data. When the source signals EOF,
33  
    `commit_eof()` is called on the sink to finalize the transfer.
33  
    `commit_eof()` is called on the sink to finalize the transfer.
34  

34  

35  
    @tparam Src The source type, must satisfy @ref ReadSource.
35  
    @tparam Src The source type, must satisfy @ref ReadSource.
36  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
36  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
37  

37  

38  
    @param source The source to read data from.
38  
    @param source The source to read data from.
39  
    @param sink The sink to write data to.
39  
    @param sink The sink to write data to.
40  

40  

41  
    @return A task that yields `(std::error_code, std::size_t)`.
41  
    @return A task that yields `(std::error_code, std::size_t)`.
42  
        On success, `ec` is default-constructed (no error) and `n` is
42  
        On success, `ec` is default-constructed (no error) and `n` is
43  
        the total number of bytes transferred. On error, `ec` contains
43  
        the total number of bytes transferred. On error, `ec` contains
44  
        the error code and `n` is the total number of bytes transferred
44  
        the error code and `n` is the total number of bytes transferred
45  
        before the error.
45  
        before the error.
46  

46  

47  
    @par Example
47  
    @par Example
48  
    @code
48  
    @code
49  
    task<void> transfer_body(ReadSource auto& source, BufferSink auto& sink)
49  
    task<void> transfer_body(ReadSource auto& source, BufferSink auto& sink)
50  
    {
50  
    {
51  
        auto [ec, n] = co_await pull_from(source, sink);
51  
        auto [ec, n] = co_await pull_from(source, sink);
52  
        if (ec)
52  
        if (ec)
53  
        {
53  
        {
54  
            // Handle error
54  
            // Handle error
55  
        }
55  
        }
56  
        // n bytes were transferred
56  
        // n bytes were transferred
57  
    }
57  
    }
58  
    @endcode
58  
    @endcode
59  

59  

60  
    @see ReadSource, BufferSink, push_to
60  
    @see ReadSource, BufferSink, push_to
61  
*/
61  
*/
62  
template<ReadSource Src, BufferSink Sink>
62  
template<ReadSource Src, BufferSink Sink>
63  
io_task<std::size_t>
63  
io_task<std::size_t>
64  
pull_from(Src& source, Sink& sink)
64  
pull_from(Src& source, Sink& sink)
65  
{
65  
{
66  
    mutable_buffer dst_arr[detail::max_iovec_];
66  
    mutable_buffer dst_arr[detail::max_iovec_];
67  
    std::size_t total = 0;
67  
    std::size_t total = 0;
68  

68  

69  
    for(;;)
69  
    for(;;)
70  
    {
70  
    {
71  
        auto dst_bufs = sink.prepare(dst_arr);
71  
        auto dst_bufs = sink.prepare(dst_arr);
72  
        if(dst_bufs.empty())
72  
        if(dst_bufs.empty())
73  
        {
73  
        {
74  
            // No buffer space available; commit nothing to flush
74  
            // No buffer space available; commit nothing to flush
75  
            auto [flush_ec] = co_await sink.commit(0);
75  
            auto [flush_ec] = co_await sink.commit(0);
76  
            if(flush_ec)
76  
            if(flush_ec)
77  
                co_return {flush_ec, total};
77  
                co_return {flush_ec, total};
78  
            continue;
78  
            continue;
79  
        }
79  
        }
80  

80  

81  
        auto [ec, n] = co_await source.read(
81  
        auto [ec, n] = co_await source.read(
82  
            std::span<mutable_buffer const>(dst_bufs));
82  
            std::span<mutable_buffer const>(dst_bufs));
83  

83  

84  
        if(n > 0)
84  
        if(n > 0)
85  
        {
85  
        {
86  
            auto [commit_ec] = co_await sink.commit(n);
86  
            auto [commit_ec] = co_await sink.commit(n);
87  
            if(commit_ec)
87  
            if(commit_ec)
88  
                co_return {commit_ec, total};
88  
                co_return {commit_ec, total};
89  
            total += n;
89  
            total += n;
90  
        }
90  
        }
91  

91  

92  
        if(ec == cond::eof)
92  
        if(ec == cond::eof)
93  
        {
93  
        {
94  
            auto [eof_ec] = co_await sink.commit_eof(0);
94  
            auto [eof_ec] = co_await sink.commit_eof(0);
95  
            co_return {eof_ec, total};
95  
            co_return {eof_ec, total};
96  
        }
96  
        }
97  

97  

98  
        if(ec)
98  
        if(ec)
99  
            co_return {ec, total};
99  
            co_return {ec, total};
100  
    }
100  
    }
101  
}
101  
}
102  

102  

103  
/** Transfer data from a ReadStream to a BufferSink.
103  
/** Transfer data from a ReadStream to a BufferSink.
104  

104  

105  
    This function reads data from the stream directly into the sink's
105  
    This function reads data from the stream directly into the sink's
106  
    internal buffers using the callee-owns-buffers model. The sink
106  
    internal buffers using the callee-owns-buffers model. The sink
107  
    provides writable buffers via `prepare()`, the stream reads into
107  
    provides writable buffers via `prepare()`, the stream reads into
108  
    them using `read_some()`, and the sink commits the data. When the
108  
    them using `read_some()`, and the sink commits the data. When the
109  
    stream signals EOF, `commit_eof()` is called on the sink to
109  
    stream signals EOF, `commit_eof()` is called on the sink to
110  
    finalize the transfer.
110  
    finalize the transfer.
111  

111  

112  
    This overload handles partial reads from the stream, committing
112  
    This overload handles partial reads from the stream, committing
113  
    data incrementally as it arrives. It loops until EOF is encountered
113  
    data incrementally as it arrives. It loops until EOF is encountered
114  
    or an error occurs.
114  
    or an error occurs.
115  

115  

116  
    @tparam Src The source type, must satisfy @ref ReadStream.
116  
    @tparam Src The source type, must satisfy @ref ReadStream.
117  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
117  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
118  

118  

119  
    @param source The stream to read data from.
119  
    @param source The stream to read data from.
120  
    @param sink The sink to write data to.
120  
    @param sink The sink to write data to.
121  

121  

122  
    @return A task that yields `(std::error_code, std::size_t)`.
122  
    @return A task that yields `(std::error_code, std::size_t)`.
123  
        On success, `ec` is default-constructed (no error) and `n` is
123  
        On success, `ec` is default-constructed (no error) and `n` is
124  
        the total number of bytes transferred. On error, `ec` contains
124  
        the total number of bytes transferred. On error, `ec` contains
125  
        the error code and `n` is the total number of bytes transferred
125  
        the error code and `n` is the total number of bytes transferred
126  
        before the error.
126  
        before the error.
127  

127  

128  
    @par Example
128  
    @par Example
129  
    @code
129  
    @code
130  
    task<void> transfer_body(ReadStream auto& stream, BufferSink auto& sink)
130  
    task<void> transfer_body(ReadStream auto& stream, BufferSink auto& sink)
131  
    {
131  
    {
132  
        auto [ec, n] = co_await pull_from(stream, sink);
132  
        auto [ec, n] = co_await pull_from(stream, sink);
133  
        if (ec)
133  
        if (ec)
134  
        {
134  
        {
135  
            // Handle error
135  
            // Handle error
136  
        }
136  
        }
137  
        // n bytes were transferred
137  
        // n bytes were transferred
138  
    }
138  
    }
139  
    @endcode
139  
    @endcode
140  

140  

141  
    @see ReadStream, BufferSink, push_to
141  
    @see ReadStream, BufferSink, push_to
142  
*/
142  
*/
143  
template<ReadStream Src, BufferSink Sink>
143  
template<ReadStream Src, BufferSink Sink>
144  
    requires (!ReadSource<Src>)
144  
    requires (!ReadSource<Src>)
145  
io_task<std::size_t>
145  
io_task<std::size_t>
146  
pull_from(Src& source, Sink& sink)
146  
pull_from(Src& source, Sink& sink)
147  
{
147  
{
148  
    mutable_buffer dst_arr[detail::max_iovec_];
148  
    mutable_buffer dst_arr[detail::max_iovec_];
149  
    std::size_t total = 0;
149  
    std::size_t total = 0;
150  

150  

151  
    for(;;)
151  
    for(;;)
152  
    {
152  
    {
153  
        // Prepare destination buffers from the sink
153  
        // Prepare destination buffers from the sink
154  
        auto dst_bufs = sink.prepare(dst_arr);
154  
        auto dst_bufs = sink.prepare(dst_arr);
155  
        if(dst_bufs.empty())
155  
        if(dst_bufs.empty())
156  
        {
156  
        {
157  
            // No buffer space available; commit nothing to flush
157  
            // No buffer space available; commit nothing to flush
158  
            auto [flush_ec] = co_await sink.commit(0);
158  
            auto [flush_ec] = co_await sink.commit(0);
159  
            if(flush_ec)
159  
            if(flush_ec)
160  
                co_return {flush_ec, total};
160  
                co_return {flush_ec, total};
161  
            continue;
161  
            continue;
162  
        }
162  
        }
163  

163  

164  
        // Read data from the stream into the sink's buffers
164  
        // Read data from the stream into the sink's buffers
165  
        auto [ec, n] = co_await source.read_some(
165  
        auto [ec, n] = co_await source.read_some(
166  
            std::span<mutable_buffer const>(dst_bufs));
166  
            std::span<mutable_buffer const>(dst_bufs));
167  

167  

168  
        // Commit any data that was read
168  
        // Commit any data that was read
169  
        if(n > 0)
169  
        if(n > 0)
170  
        {
170  
        {
171  
            auto [commit_ec] = co_await sink.commit(n);
171  
            auto [commit_ec] = co_await sink.commit(n);
172  
            if(commit_ec)
172  
            if(commit_ec)
173  
                co_return {commit_ec, total};
173  
                co_return {commit_ec, total};
174  
            total += n;
174  
            total += n;
175  
        }
175  
        }
176  

176  

177  
        // Check for EOF condition
177  
        // Check for EOF condition
178  
        if(ec == cond::eof)
178  
        if(ec == cond::eof)
179  
        {
179  
        {
180  
            auto [eof_ec] = co_await sink.commit_eof(0);
180  
            auto [eof_ec] = co_await sink.commit_eof(0);
181  
            co_return {eof_ec, total};
181  
            co_return {eof_ec, total};
182  
        }
182  
        }
183  

183  

184  
        // Check for other errors
184  
        // Check for other errors
185  
        if(ec)
185  
        if(ec)
186  
            co_return {ec, total};
186  
            co_return {ec, total};
187  
    }
187  
    }
188  
}
188  
}
189  

189  

190  
} // namespace capy
190  
} // namespace capy
191  
} // namespace boost
191  
} // namespace boost
192  

192  

193  
#endif
193  
#endif