Condy v1.7.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
op_states.hpp
Go to the documentation of this file.
1
5
6#pragma once
7
8#include "condy/concepts.hpp"
11#include "condy/type_traits.hpp"
12#include "condy/utils.hpp"
13#include <array>
14#include <cstddef>
15#include <optional>
16#include <stop_token>
17#include <tuple>
18#include <vector>
19
20namespace condy {
21namespace detail {
22
23template <typename Handle, PrepFuncLike Func> class OpSenderOperationState {
24public:
25 template <typename... HandleArgs>
26 OpSenderOperationState(Func prep_func, HandleArgs &&...handle_args)
27 : prep_func_(std::move(prep_func)),
28 finish_handle_(std::forward<HandleArgs>(handle_args)...) {}
29
30 CONDY_DELETE_COPY_MOVE(OpSenderOperationState);
31
32public:
33 void start(unsigned int flags) noexcept {
34 auto &context = detail::Context::current();
35 auto &ring = context.runtime()->ring();
36 context.runtime()->pend_work();
37 io_uring_sqe *sqe = prep_func_(&ring);
38 assert(sqe && "prep_func must return a valid sqe");
39 io_uring_sqe_set_flags(sqe, sqe->flags | flags);
40 auto work = encode_work(&finish_handle_.get(), WorkType::Common);
41 io_uring_sqe_set_data64(sqe, work);
42 ring.maybe_submit();
43
44 finish_handle_.get().maybe_set_cancel(context.runtime());
45 }
46
47private:
48 Func prep_func_;
49 HandleBox<Handle> finish_handle_;
50};
51
52template <unsigned int Flags, typename Sender, typename Receiver>
53class FlaggedOpState {
54public:
55 FlaggedOpState(Sender sender, Receiver receiver)
56 : op_state_(sender.connect_impl(std::move(receiver))) {}
57
58 CONDY_DELETE_COPY_MOVE(FlaggedOpState);
59
60 void start(unsigned int flags) noexcept { op_state_.start(flags | Flags); }
61
62private:
63 using OperationState = operation_state_t<Sender, Receiver>;
64 OperationState op_state_;
65};
66
67template <typename TokenType> class WhenAnyCanceller {
68public:
69 auto chain_token(TokenType token) noexcept {
70 if (token.stop_possible()) {
71 stop_callback_.emplace(std::move(token), Cancellation{this});
72 }
73 return stop_source_.get_token();
74 }
75
76 template <typename R> void operator()(const R &) noexcept {
77 stop_source_.request_stop();
78 }
79
80 void reset() noexcept { stop_callback_.reset(); }
81
82private:
83 struct Cancellation {
84 WhenAnyCanceller *self;
85 void operator()() noexcept { self->cancel_(); }
86 };
87 void cancel_() noexcept { stop_source_.request_stop(); }
88
89 using StopCallbackType = stop_callback_t<TokenType, Cancellation>;
90
91 std::stop_source stop_source_;
92 std::optional<StopCallbackType> stop_callback_;
93};
94
/**
 * @brief Cancellation policy that never cancels siblings.
 *
 * The upstream token is passed through to the children unchanged; receiving a
 * child result and resetting are both no-ops.
 *
 * @tparam TokenType Type of the upstream stop token.
 */
template <typename TokenType> class WhenAllCanceller {
public:
    /// Hand the upstream token straight back to the caller.
    auto chain_token(TokenType upstream) noexcept { return upstream; }

    /// A child's result is ignored; nothing is cancelled.
    template <typename R> void operator()(const R & /*result*/) noexcept {}

    /// Nothing is registered, so there is nothing to release.
    void reset() noexcept {}
};
103
104template <typename Receiver, typename Canceller, typename... Senders>
105class ParallelOperationState {
106public:
107 ParallelOperationState(std::tuple<Senders...> senders, Receiver receiver)
108 : receiver_(std::move(receiver)) {
109 auto next_token = canceller_.chain_token(receiver_.get_stop_token());
110 connect_senders_(senders, next_token);
111 }
112
113 CONDY_DELETE_COPY_MOVE(ParallelOperationState);
114
115 ~ParallelOperationState() {
116 std::apply([](auto &&...states) { (states.destroy(), ...); },
117 op_states_);
118 }
119
120 void start(unsigned int flags) noexcept {
121 if constexpr (sizeof...(Senders) == 0) {
122 std::move(receiver_)(
123 std::make_pair(std::move(order_), std::move(results_)));
124 } else {
125 std::apply(
126 [&](auto &&...states) { (states.get().start(flags), ...); },
127 op_states_);
128 }
129 }
130
131private:
132 using TokenType =
133 std::remove_cvref_t<decltype(std::declval<Canceller &>().chain_token(
134 std::declval<stop_token_t<Receiver>>()))>;
135
136 template <size_t I = 0>
137 void connect_senders_(std::tuple<Senders...> &senders,
138 const TokenType &token) noexcept {
139 if constexpr (I < sizeof...(Senders)) {
140 std::get<I>(op_states_).accept([&] {
141 return std::move(std::get<I>(senders))
142 .connect_impl(ChildReceiver<I>{this, token});
143 });
144 connect_senders_<I + 1>(senders, token);
145 }
146 }
147
148 template <size_t I, typename R> void receive_(R &&result) noexcept {
149 canceller_(result);
150 auto no = completed_count_++;
151 order_[no] = I;
152 std::get<I>(results_) = std::forward<R>(result);
153 if (no + 1 == sizeof...(Senders)) {
154 canceller_.reset();
155 std::move(receiver_)(
156 std::make_pair(std::move(order_), std::move(results_)));
157 }
158 }
159
160 template <size_t I> struct ChildReceiver {
161 ParallelOperationState *self;
162 TokenType stop_token;
163 template <typename R> void operator()(R &&result) noexcept {
164 self->receive_<I>(std::forward<R>(result));
165 }
166 auto get_stop_token() const noexcept { return stop_token; }
167 };
168
169 template <typename T> struct operation_state_traits;
170 template <size_t... Is>
171 struct operation_state_traits<std::index_sequence<Is...>> {
172 using type = std::tuple<
173 RawStorage<operation_state_t<Senders, ChildReceiver<Is>>>...>;
174 };
175 using OperationStates = typename operation_state_traits<
176 std::make_index_sequence<sizeof...(Senders)>>::type;
177
178protected:
179 OperationStates op_states_;
180 std::array<size_t, sizeof...(Senders)> order_;
181 std::tuple<typename Senders::ReturnType...> results_;
182 size_t completed_count_ = 0;
183 Receiver receiver_;
184 Canceller canceller_;
185};
186
187template <typename Receiver, typename... Senders>
188using ParallelAnyOperationState =
189 ParallelOperationState<Receiver, WhenAnyCanceller<stop_token_t<Receiver>>,
190 Senders...>;
191
192template <typename Receiver, typename... Senders>
193using ParallelAllOperationState =
194 ParallelOperationState<Receiver, WhenAllCanceller<stop_token_t<Receiver>>,
195 Senders...>;
196
/**
 * @brief Receiver adaptor that forwards only the results of an (order,
 * results) pair.
 *
 * @tparam Receiver Wrapped downstream receiver.
 */
template <typename Receiver> struct ReceiverAllWrapper {
    Receiver receiver;

    ReceiverAllWrapper(Receiver receiver) : receiver(std::move(receiver)) {}

    /// Split the incoming pair and pass the results part downstream.
    template <typename R> void operator()(R &&result) noexcept {
        auto &[completion_order, values] = result;
        (void)completion_order; // the completion order is not forwarded
        std::move(receiver)(std::move(values));
    }

    /// Expose the wrapped receiver's stop token.
    auto get_stop_token() const noexcept { return receiver.get_stop_token(); }
};
206
/**
 * @brief Receiver adaptor that forwards the result of the first child to
 * finish.
 *
 * @tparam Receiver Wrapped downstream receiver.
 */
template <typename Receiver> struct ReceiverAnyWrapper {
    Receiver receiver;

    ReceiverAnyWrapper(Receiver receiver) : receiver(std::move(receiver)) {}

    /// Pick the entry named by the first slot of the order and pass it on.
    template <typename R> void operator()(R &&result) noexcept {
        auto &[completion_order, values] = result;
        const size_t winner = completion_order[0];
        std::move(receiver)(tuple_at(values, winner));
    }

    /// Expose the wrapped receiver's stop token.
    auto get_stop_token() const noexcept { return receiver.get_stop_token(); }
};
217
218template <typename Receiver, typename... Senders>
219using WhenAnyOperationState =
220 ParallelAnyOperationState<ReceiverAnyWrapper<Receiver>, Senders...>;
221
222template <typename Receiver, typename... Senders>
223using WhenAllOperationState =
224 ParallelAllOperationState<ReceiverAllWrapper<Receiver>, Senders...>;
225
226template <typename Receiver, unsigned int Flags, typename... Senders>
227class LinkOperationState : public WhenAllOperationState<Receiver, Senders...> {
228public:
229 using Base = WhenAllOperationState<Receiver, Senders...>;
230 using Base::Base;
231
232 void start(unsigned int flags) noexcept {
233 auto &ring = detail::Context::current().runtime()->ring();
234 ring.reserve_space(sizeof...(Senders));
235 start_linked_operations_(flags);
236 }
237
238private:
239 template <size_t I = 0>
240 void start_linked_operations_(unsigned int flags) noexcept {
241 if constexpr (I < sizeof...(Senders)) {
242 auto &state = std::get<I>(Base::op_states_);
243 if constexpr (I < sizeof...(Senders) - 1) {
244 state.get().start(flags | Flags);
245 } else {
246 state.get().start(flags);
247 }
248 start_linked_operations_<I + 1>(flags);
249 }
250 }
251};
252
253template <typename Receiver, typename Canceller, typename Sender>
254class RangedParallelOperationState {
255public:
256 RangedParallelOperationState(std::vector<Sender> senders, Receiver receiver)
257 : op_states_(senders.size()), order_(senders.size()),
258 results_(senders.size()), receiver_(std::move(receiver)) {
259 auto next_token = canceller_.chain_token(receiver_.get_stop_token());
260 for (size_t i = 0; i < senders.size(); ++i) {
261 op_states_[i].accept([&] {
262 return std::move(senders[i])
263 .connect_impl(ChildReceiver{this, i, next_token});
264 });
265 }
266 }
267
268 CONDY_DELETE_COPY_MOVE(RangedParallelOperationState);
269
270 ~RangedParallelOperationState() {
271 for (auto &op_state : op_states_) {
272 op_state.destroy();
273 }
274 }
275
276 void start(unsigned int flags) noexcept {
277 if (op_states_.empty()) {
278 std::move(receiver_)(
279 std::make_pair(std::move(order_), std::move(results_)));
280 } else {
281 for (auto &op_state : op_states_) {
282 op_state.get().start(flags);
283 }
284 }
285 }
286
287private:
288 using TokenType =
289 std::remove_cvref_t<decltype(std::declval<Canceller &>().chain_token(
290 std::declval<stop_token_t<Receiver>>()))>;
291
292 template <typename R> void receive_(size_t index, R &&result) noexcept {
293 canceller_(result);
294 size_t no = completed_count_++;
295 order_[no] = index;
296 results_[index] = std::forward<R>(result);
297 if (no + 1 == op_states_.size()) {
298 std::move(receiver_)(
299 std::make_pair(std::move(order_), std::move(results_)));
300 }
301 }
302
303 struct ChildReceiver {
304 RangedParallelOperationState *self;
305 size_t index;
306 TokenType stop_token;
307 template <typename R> void operator()(R &&result) noexcept {
308 self->receive_(index, std::forward<R>(result));
309 }
310 auto get_stop_token() const noexcept { return stop_token; }
311 };
312
313 using OperationStates =
314 std::vector<RawStorage<operation_state_t<Sender, ChildReceiver>>>;
315
316protected:
317 OperationStates op_states_;
318 std::vector<size_t> order_;
319 std::vector<typename Sender::ReturnType> results_;
320 size_t completed_count_ = 0;
321 Receiver receiver_;
322 Canceller canceller_;
323};
324
325template <typename Receiver, typename Sender>
326using RangedParallelAllOperationState = RangedParallelOperationState<
327 Receiver, WhenAllCanceller<stop_token_t<Receiver>>, Sender>;
328
329template <typename Receiver, typename Sender>
330using RangedParallelAnyOperationState = RangedParallelOperationState<
331 Receiver, WhenAnyCanceller<stop_token_t<Receiver>>, Sender>;
332
333template <typename Receiver>
334using ReceiverRangedAllWrapper = ReceiverAllWrapper<Receiver>;
335
/**
 * @brief Receiver adaptor that forwards the first child to finish in a range.
 *
 * The downstream receiver gets a pair of (index of that child, its result).
 *
 * @tparam Receiver Wrapped downstream receiver.
 */
template <typename Receiver> struct ReceiverRangedAnyWrapper {
    Receiver receiver;

    ReceiverRangedAnyWrapper(Receiver receiver)
        : receiver(std::move(receiver)) {}

    /**
     * @brief Pick the entry named by the first slot of the order and pass it
     * on.
     * @param result Pair-like (order, results); `order` must not be empty.
     */
    template <typename R> void operator()(R &&result) noexcept {
        auto &[order, results] = result;
        // An empty range has no first completion; reading order[0] would be
        // out of bounds, so flag the broken precondition in debug builds.
        assert(!order.empty() && "when_any over an empty range has no result");
        size_t index = order[0];
        std::move(receiver)(std::make_pair(index, std::move(results[index])));
    }

    /// Expose the wrapped receiver's stop token.
    auto get_stop_token() const noexcept { return receiver.get_stop_token(); }
};
347
348template <typename Receiver, typename Sender>
349using RangedWhenAllOperationState =
350 RangedParallelAllOperationState<ReceiverRangedAllWrapper<Receiver>, Sender>;
351
352template <typename Receiver, typename Sender>
353using RangedWhenAnyOperationState =
354 RangedParallelAnyOperationState<ReceiverRangedAnyWrapper<Receiver>, Sender>;
355
356template <typename Receiver, unsigned int Flags, typename Sender>
357class RangedLinkOperationState
358 : public RangedWhenAllOperationState<Receiver, Sender> {
359public:
360 using Base = RangedWhenAllOperationState<Receiver, Sender>;
361 using Base::Base;
362
363 void start(unsigned int flags) noexcept {
364 auto &ring = detail::Context::current().runtime()->ring();
365 ring.reserve_space(Base::op_states_.size());
366 for (size_t i = 0; i < Base::op_states_.size(); ++i) {
367 auto &op_state = Base::op_states_[i];
368 if (i < Base::op_states_.size() - 1) {
369 op_state.get().start(flags | Flags);
370 } else {
371 op_state.get().start(flags);
372 }
373 }
374 }
375};
376
377} // namespace detail
378
379} // namespace condy
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
Definition condy.hpp:31
Internal utility classes and functions used by Condy.