1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
10  
#ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
11  
#define BOOST_CAPY_TEST_READ_STREAM_HPP
11  
#define BOOST_CAPY_TEST_READ_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
17  
#include <boost/capy/cond.hpp>
17  
#include <boost/capy/cond.hpp>
18  
#include <coroutine>
18  
#include <coroutine>
19  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/test/fuse.hpp>
21  
#include <boost/capy/test/fuse.hpp>
22  

22  

23  
#include <string>
23  
#include <string>
24  
#include <string_view>
24  
#include <string_view>
25  

25  

26  
namespace boost {
26  
namespace boost {
27  
namespace capy {
27  
namespace capy {
28  
namespace test {
28  
namespace test {
29  

29  

30  
/** A mock stream for testing read operations.
30  
/** A mock stream for testing read operations.
31  

31  

32  
    Use this to verify code that performs reads without needing
32  
    Use this to verify code that performs reads without needing
33  
    real I/O. Call @ref provide to supply data, then @ref read_some
33  
    real I/O. Call @ref provide to supply data, then @ref read_some
34  
    to consume it. The associated @ref fuse enables error injection
34  
    to consume it. The associated @ref fuse enables error injection
35  
    at controlled points. An optional `max_read_size` constructor
35  
    at controlled points. An optional `max_read_size` constructor
36  
    parameter limits bytes per read to simulate chunked delivery.
36  
    parameter limits bytes per read to simulate chunked delivery.
37  

37  

38  
    This class satisfies the @ref ReadStream concept.
38  
    This class satisfies the @ref ReadStream concept.
39  

39  

40  
    @par Thread Safety
40  
    @par Thread Safety
41  
    Not thread-safe.
41  
    Not thread-safe.
42  

42  

43  
    @par Example
43  
    @par Example
44  
    @code
44  
    @code
45  
    fuse f;
45  
    fuse f;
46  
    read_stream rs( f );
46  
    read_stream rs( f );
47  
    rs.provide( "Hello, " );
47  
    rs.provide( "Hello, " );
48  
    rs.provide( "World!" );
48  
    rs.provide( "World!" );
49  

49  

50  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
50  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
51  
        char buf[32];
51  
        char buf[32];
52  
        auto [ec, n] = co_await rs.read_some(
52  
        auto [ec, n] = co_await rs.read_some(
53  
            mutable_buffer( buf, sizeof( buf ) ) );
53  
            mutable_buffer( buf, sizeof( buf ) ) );
54  
        if( ec )
54  
        if( ec )
55  
            co_return;
55  
            co_return;
56  
        // buf contains "Hello, World!"
56  
        // buf contains "Hello, World!"
57  
    } );
57  
    } );
58  
    @endcode
58  
    @endcode
59  

59  

60  
    @see fuse, ReadStream
60  
    @see fuse, ReadStream
61  
*/
61  
*/
62  
class read_stream
62  
class read_stream
63  
{
63  
{
64  
    fuse f_;
64  
    fuse f_;
65  
    std::string data_;
65  
    std::string data_;
66  
    std::size_t pos_ = 0;
66  
    std::size_t pos_ = 0;
67  
    std::size_t max_read_size_;
67  
    std::size_t max_read_size_;
68  

68  

69  
public:
69  
public:
70  
    /** Construct a read stream.
70  
    /** Construct a read stream.
71  

71  

72  
        @param f The fuse used to inject errors during reads.
72  
        @param f The fuse used to inject errors during reads.
73  

73  

74  
        @param max_read_size Maximum bytes returned per read.
74  
        @param max_read_size Maximum bytes returned per read.
75  
        Use to simulate chunked network delivery.
75  
        Use to simulate chunked network delivery.
76  
    */
76  
    */
77  
    explicit read_stream(
77  
    explicit read_stream(
78  
        fuse f = {},
78  
        fuse f = {},
79  
        std::size_t max_read_size = std::size_t(-1)) noexcept
79  
        std::size_t max_read_size = std::size_t(-1)) noexcept
80  
        : f_(std::move(f))
80  
        : f_(std::move(f))
81  
        , max_read_size_(max_read_size)
81  
        , max_read_size_(max_read_size)
82  
    {
82  
    {
83  
    }
83  
    }
84  

84  

85  
    /** Append data to be returned by subsequent reads.
85  
    /** Append data to be returned by subsequent reads.
86  

86  

87  
        Multiple calls accumulate data that @ref read_some returns.
87  
        Multiple calls accumulate data that @ref read_some returns.
88  

88  

89  
        @param sv The data to append.
89  
        @param sv The data to append.
90  
    */
90  
    */
91  
    void
91  
    void
92  
    provide(std::string_view sv)
92  
    provide(std::string_view sv)
93  
    {
93  
    {
94  
        data_.append(sv);
94  
        data_.append(sv);
95  
    }
95  
    }
96  

96  

97  
    /// Clear all data and reset the read position.
97  
    /// Clear all data and reset the read position.
98  
    void
98  
    void
99  
    clear() noexcept
99  
    clear() noexcept
100  
    {
100  
    {
101  
        data_.clear();
101  
        data_.clear();
102  
        pos_ = 0;
102  
        pos_ = 0;
103  
    }
103  
    }
104  

104  

105  
    /// Return the number of bytes available for reading.
105  
    /// Return the number of bytes available for reading.
106  
    std::size_t
106  
    std::size_t
107  
    available() const noexcept
107  
    available() const noexcept
108  
    {
108  
    {
109  
        return data_.size() - pos_;
109  
        return data_.size() - pos_;
110  
    }
110  
    }
111  

111  

112  
    /** Asynchronously read data from the stream.
112  
    /** Asynchronously read data from the stream.
113  

113  

114  
        Transfers up to `buffer_size( buffers )` bytes from the internal
114  
        Transfers up to `buffer_size( buffers )` bytes from the internal
115  
        buffer to the provided mutable buffer sequence. If no data remains,
115  
        buffer to the provided mutable buffer sequence. If no data remains,
116  
        returns `error::eof`. Before every read, the attached @ref fuse is
116  
        returns `error::eof`. Before every read, the attached @ref fuse is
117  
        consulted to possibly inject an error for testing fault scenarios.
117  
        consulted to possibly inject an error for testing fault scenarios.
118  
        The returned `std::size_t` is the number of bytes transferred.
118  
        The returned `std::size_t` is the number of bytes transferred.
119  

119  

120  
        @par Effects
120  
        @par Effects
121  
        On success, advances the internal read position by the number of
121  
        On success, advances the internal read position by the number of
122  
        bytes copied. If an error is injected by the fuse, the read position
122  
        bytes copied. If an error is injected by the fuse, the read position
123  
        remains unchanged.
123  
        remains unchanged.
124  

124  

125  
        @par Exception Safety
125  
        @par Exception Safety
126  
        No-throw guarantee.
126  
        No-throw guarantee.
127  

127  

128  
        @param buffers The mutable buffer sequence to receive data.
128  
        @param buffers The mutable buffer sequence to receive data.
129  

129  

130  
        @return An awaitable yielding `(error_code,std::size_t)`.
130  
        @return An awaitable yielding `(error_code,std::size_t)`.
131  

131  

132  
        @see fuse
132  
        @see fuse
133  
    */
133  
    */
134  
    template<MutableBufferSequence MB>
134  
    template<MutableBufferSequence MB>
135  
    auto
135  
    auto
136  
    read_some(MB buffers)
136  
    read_some(MB buffers)
137  
    {
137  
    {
138  
        struct awaitable
138  
        struct awaitable
139  
        {
139  
        {
140  
            read_stream* self_;
140  
            read_stream* self_;
141  
            MB buffers_;
141  
            MB buffers_;
142  

142  

143  
            bool await_ready() const noexcept { return true; }
143  
            bool await_ready() const noexcept { return true; }
144  

144  

145  
            // This method is required to satisfy Capy's IoAwaitable concept,
145  
            // This method is required to satisfy Capy's IoAwaitable concept,
146  
            // but is never called because await_ready() returns true.
146  
            // but is never called because await_ready() returns true.
147  
            //
147  
            //
148  
            // Capy uses a two-layer awaitable system: the promise's
148  
            // Capy uses a two-layer awaitable system: the promise's
149  
            // await_transform wraps awaitables in a transform_awaiter whose
149  
            // await_transform wraps awaitables in a transform_awaiter whose
150  
            // standard await_suspend(coroutine_handle) calls this custom
150  
            // standard await_suspend(coroutine_handle) calls this custom
151  
            // 2-argument overload, passing the io_env from the coroutine's
151  
            // 2-argument overload, passing the io_env from the coroutine's
152  
            // context. For synchronous test awaitables like this one, the
152  
            // context. For synchronous test awaitables like this one, the
153  
            // coroutine never suspends, so this is not invoked. The signature
153  
            // coroutine never suspends, so this is not invoked. The signature
154  
            // exists to allow the same awaitable type to work with both
154  
            // exists to allow the same awaitable type to work with both
155  
            // synchronous (test) and asynchronous (real I/O) code.
155  
            // synchronous (test) and asynchronous (real I/O) code.
156  
            void await_suspend(
156  
            void await_suspend(
157  
                std::coroutine_handle<>,
157  
                std::coroutine_handle<>,
158  
                io_env const*) const noexcept
158  
                io_env const*) const noexcept
159  
            {
159  
            {
160  
            }
160  
            }
161  

161  

162  
            io_result<std::size_t>
162  
            io_result<std::size_t>
163  
            await_resume()
163  
            await_resume()
164  
            {
164  
            {
165  
                // Empty buffer is a no-op regardless of
165  
                // Empty buffer is a no-op regardless of
166  
                // stream state or fuse.
166  
                // stream state or fuse.
167  
                if(buffer_empty(buffers_))
167  
                if(buffer_empty(buffers_))
168  
                    return {{}, 0};
168  
                    return {{}, 0};
169  

169  

170  
                auto ec = self_->f_.maybe_fail();
170  
                auto ec = self_->f_.maybe_fail();
171  
                if(ec)
171  
                if(ec)
172  
                    return {ec, 0};
172  
                    return {ec, 0};
173  

173  

174  
                if(self_->pos_ >= self_->data_.size())
174  
                if(self_->pos_ >= self_->data_.size())
175  
                    return {error::eof, 0};
175  
                    return {error::eof, 0};
176  

176  

177  
                std::size_t avail = self_->data_.size() - self_->pos_;
177  
                std::size_t avail = self_->data_.size() - self_->pos_;
178  
                if(avail > self_->max_read_size_)
178  
                if(avail > self_->max_read_size_)
179  
                    avail = self_->max_read_size_;
179  
                    avail = self_->max_read_size_;
180  
                auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
180  
                auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
181  
                std::size_t const n = buffer_copy(buffers_, src);
181  
                std::size_t const n = buffer_copy(buffers_, src);
182  
                self_->pos_ += n;
182  
                self_->pos_ += n;
183  
                return {{}, n};
183  
                return {{}, n};
184  
            }
184  
            }
185  
        };
185  
        };
186  
        return awaitable{this, buffers};
186  
        return awaitable{this, buffers};
187  
    }
187  
    }
188  
};
188  
};
189  

189  

190  
} // test
190  
} // test
191  
} // capy
191  
} // capy
192  
} // boost
192  
} // boost
193  

193  

194  
#endif
194  
#endif