Condy v1.7.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
sender_operations.hpp
Go to the documentation of this file.
1
5
6#pragma once
7
8#include "condy/concepts.hpp"
9#include "condy/senders.hpp"
10#include "condy/utils.hpp"
11#include <coroutine>
12#include <stdexcept>
13
14namespace condy {
15
16template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc, typename... Args>
17auto build_op_sender(PrepFunc &&prep_func, Args &&...args) {
18 return OpSender<std::decay_t<PrepFunc>, CQEHandler>(
19 std::forward<PrepFunc>(prep_func),
20 CQEHandler(std::forward<Args>(args)...));
21}
22
23template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc,
24 typename MultiShotFunc, typename... Args>
25auto build_multishot_op_sender(PrepFunc &&func, MultiShotFunc &&multishot_func,
26 Args &&...handler_args) {
27 return MultiShotOpSender<std::decay_t<PrepFunc>, CQEHandler,
28 std::decay_t<MultiShotFunc>>(
29 std::forward<PrepFunc>(func),
30 CQEHandler(std::forward<Args>(handler_args)...),
31 std::forward<MultiShotFunc>(multishot_func));
32}
33
34template <CQEHandlerLike CQEHandler, PrepFuncLike PrepFunc, typename FreeFunc,
35 typename... Args>
36auto build_zero_copy_op_sender(PrepFunc &&func, FreeFunc &&free_func,
37 Args &&...handler_args) {
38 return ZeroCopyOpSender<std::decay_t<PrepFunc>, CQEHandler,
39 std::decay_t<FreeFunc>>(
40 std::forward<PrepFunc>(func),
41 CQEHandler(std::forward<Args>(handler_args)...),
42 std::forward<FreeFunc>(free_func));
43}
44
45namespace detail {
46
47struct NeverStopToken {
48public:
49 template <typename> struct callback_type {
50 constexpr explicit callback_type(NeverStopToken, auto &&) noexcept {}
51 };
52
53 static constexpr bool stop_requested() noexcept { return false; }
54
55 static constexpr bool stop_possible() noexcept { return false; }
56
57 constexpr bool operator==(NeverStopToken const &) const noexcept = default;
58};
59
60template <typename Sender> class [[nodiscard]] SenderAwaiter {
61public:
62 SenderAwaiter(Sender sender)
63 : operation_state_(std::move(sender).connect_impl(Receiver{this})) {}
64
65 CONDY_DELETE_COPY_MOVE(SenderAwaiter);
66
67public:
68 bool await_ready() const noexcept { return false; }
69
70 template <typename Promise>
71 bool await_suspend(std::coroutine_handle<Promise> handle) noexcept {
72 operation_state_.start(0);
73 auto h = std::exchange(handle_, handle);
74 return h == std::noop_coroutine();
75 }
76
77 auto await_resume() noexcept { return std::move(result_); }
78
79private:
80 struct Receiver {
81 SenderAwaiter *self;
82 template <typename R> void operator()(R &&result) noexcept {
83 self->handle_result_(std::forward<R>(result));
84 }
85 NeverStopToken get_stop_token() const noexcept { return {}; }
86 };
87
88 template <typename R> void handle_result_(R &&result) noexcept {
89 result_ = std::forward<R>(result);
90 auto h = std::exchange(handle_, nullptr);
91 h.resume();
92 }
93
94 using OperationState = operation_state_t<Sender, Receiver>;
95 // Await/complete path is serialized, so atomic is not needed here.
96 std::coroutine_handle<> handle_ = std::noop_coroutine();
97 OperationState operation_state_;
98 typename Sender::ReturnType result_;
99};
100
101template <typename Sender> auto as_awaiter(Sender &&sender) {
102 return detail::SenderAwaiter<std::decay_t<Sender>>(
103 std::forward<Sender>(sender));
104}
105
106} // namespace detail
107
113template <unsigned int Flags, typename Sender> auto flag(Sender &&sender) {
114 return FlaggedOpSender<Flags, std::decay_t<Sender>>(
115 std::forward<Sender>(sender));
116}
117
122template <typename Sender> auto drain(Sender &&sender) {
123 return flag<IOSQE_IO_DRAIN>(std::forward<Sender>(sender));
124}
125
130template <typename Sender> auto always_async(Sender &&sender) {
131 return flag<IOSQE_ASYNC>(std::forward<Sender>(sender));
132}
133
140template <template <typename... Senders> typename SenderType,
141 typename... Senders>
142auto parallel(Senders &&...senders) {
143 return SenderType<std::decay_t<Senders>...>(
144 std::forward<Senders>(senders)...);
145}
146
153template <template <typename Sender> typename RangedSenderType,
154 std::ranges::range Range>
155auto parallel(Range &&range) {
156 using SenderType = typename std::remove_cvref_t<Range>::value_type;
157 auto begin = std::make_move_iterator(std::begin(range));
158 auto end = std::make_move_iterator(std::end(range));
159 std::vector<SenderType> senders(begin, end);
160 return RangedSenderType<SenderType>(std::move(senders));
161}
162
168template <typename... Senders> auto when_all(Senders &&...senders) {
169 return parallel<WhenAllSender>(std::forward<Senders>(senders)...);
170}
171
177template <std::ranges::range Range> auto when_all(Range &&range) {
178 return parallel<RangedWhenAllSender>(std::forward<Range>(range));
179}
180
186template <typename... Senders> auto when_any(Senders &&...senders) {
187 static_assert(sizeof...(Senders) > 0,
188 "when_any requires at least one sender");
189 return parallel<WhenAnySender>(std::forward<Senders>(senders)...);
190}
191
197template <std::ranges::range Range> auto when_any(Range &&range) {
198 if (std::ranges::empty(range)) {
199 throw std::invalid_argument("when_any requires at least one sender");
200 }
201 return parallel<RangedWhenAnySender>(std::forward<Range>(range));
202}
203
209template <typename... Senders> auto link(Senders &&...senders) {
210 return parallel<LinkSender>(std::forward<Senders>(senders)...);
211}
212
218template <std::ranges::range Range> auto link(Range &&range) {
219 return parallel<RangedLinkSender>(std::forward<Range>(range));
220}
221
227template <typename... Senders> auto hard_link(Senders &&...senders) {
228 return parallel<HardLinkSender>(std::forward<Senders>(senders)...);
229}
230
236template <std::ranges::range Range> auto hard_link(Range &&range) {
237 return parallel<RangedHardLinkSender>(std::forward<Range>(range));
238}
239
243namespace operators {
244
248template <typename Sender1, typename Sender2>
249auto operator&&(Sender1 s1, Sender2 s2) {
250 return when_all(std::move(s1), std::move(s2));
251}
252
256template <typename S, typename... Ss>
257auto operator&&(WhenAllSender<Ss...> aws, S sender) {
258 return WhenAllSender<Ss..., std::decay_t<S>>(std::move(aws),
259 std::move(sender));
260}
261
265template <typename Sender1, typename Sender2>
266auto operator||(Sender1 s1, Sender2 s2) {
267 return when_any(std::move(s1), std::move(s2));
268}
269
273template <typename S, typename... Ss>
274auto operator||(WhenAnySender<Ss...> aws, S sender) {
275 return WhenAnySender<Ss..., std::decay_t<S>>(std::move(aws),
276 std::move(sender));
277}
278
282template <typename Sender1, typename Sender2>
283auto operator>>(Sender1 s1, Sender2 s2) {
284 return link(std::move(s1), std::move(s2));
285}
286
290template <typename S, typename... Ss>
291auto operator>>(LinkSender<Ss...> aws, S sender) {
292 return LinkSender<Ss..., std::decay_t<S>>(std::move(aws),
293 std::move(sender));
294}
295
296} // namespace operators
297
298} // namespace condy
Operators for composing operations.
auto operator&&(Sender1 s1, Sender2 s2)
Operator overloads version of condy::when_all.
auto operator>>(Sender1 s1, Sender2 s2)
Operator overloads version of condy::link.
auto operator||(Sender1 s1, Sender2 s2)
Operator overloads version of condy::when_any.
The main namespace for the Condy library.
Definition condy.hpp:31
auto drain(Sender &&sender)
Mark an operation as drain operation.
auto link(Senders &&...senders)
Compose multiple operations into a single operation that executes them in sequence.
auto parallel(Senders &&...senders)
Compose multiple operations into a single sender that executes them in parallel.
auto hard_link(Senders &&...senders)
Compose multiple operations into a single operation that executes them in sequence and continues even...
auto when_any(Senders &&...senders)
Compose multiple operations into a single operation that completes when any of them complete.
auto when_all(Senders &&...senders)
Compose multiple operations into a single operation that completes when all of them complete.
auto flag(Sender &&sender)
Decorates an operation with specific io_uring sqe flags.
auto always_async(Sender &&sender)
Mark an operation to always execute asynchronously.
Sender types for composing asynchronous operations.
Internal utility classes and functions used by Condy.