// Operation state for a single io_uring submission: start() obtains an SQE
// from prep_func_ on the current runtime's ring and stores a value derived
// from the finish handle's address as that SQE's user data.
23template <
typename Handle, PrepFuncLike Func>
class OpSenderOperationState {
// Takes prep_func by value (moved into prep_func_) and perfectly forwards
// handle_args to construct finish_handle_.
25 template <
typename... HandleArgs>
26 OpSenderOperationState(Func prep_func, HandleArgs &&...handle_args)
27 : prep_func_(std::move(prep_func)),
28 finish_handle_(std::forward<HandleArgs>(handle_args)...) {}
// start() writes the address of the object held by finish_handle_ into the
// SQE. NOTE(review): this macro (defined elsewhere) presumably deletes copy
// and move so that address stays valid - confirm.
30 CONDY_DELETE_COPY_MOVE(OpSenderOperationState);
// Prepares one SQE and attaches the finish handle to it.
// flags: extra SQE flag bits, OR-ed with the flags already present on the SQE
// returned by prep_func_.
33 void start(
unsigned int flags)
noexcept {
34 auto &context = detail::Context::current();
35 auto &ring = context.runtime()->ring();
// NOTE(review): pend_work() presumably accounts for one outstanding operation
// on the runtime - confirm against its definition.
36 context.runtime()->pend_work();
37 io_uring_sqe *sqe = prep_func_(&ring);
38 assert(sqe &&
"prep_func must return a valid sqe");
// Keep the flags prep_func_ set; only add the caller's bits.
39 io_uring_sqe_set_flags(sqe, sqe->flags | flags);
// The handle's address, combined with WorkType::Common by encode_work(),
// becomes the SQE's 64-bit user data.
40 auto work = encode_work(&finish_handle_.get(), WorkType::Common);
41 io_uring_sqe_set_data64(sqe, work);
// Passes the runtime to the handle; what maybe_set_cancel() does with it is
// defined elsewhere.
44 finish_handle_.get().maybe_set_cancel(context.runtime());
49 HandleBox<Handle> finish_handle_;
// Operation state wrapper that adds the compile-time Flags bits to the flags
// handed to the wrapped operation state's start().
52template <
unsigned int Flags,
typename Sender,
typename Receiver>
// Connects sender to receiver (receiver is moved) and stores the resulting
// operation state in op_state_.
55 FlaggedOpState(Sender sender, Receiver receiver)
56 : op_state_(sender.connect_impl(std::move(receiver))) {}
58 CONDY_DELETE_COPY_MOVE(FlaggedOpState);
// Forwards to the wrapped state with Flags OR-ed into the caller's flags.
60 void start(
unsigned int flags)
noexcept { op_state_.start(flags | Flags); }
// Operation state type for connecting Sender to Receiver (operation_state_t is
// defined elsewhere).
63 using OperationState = operation_state_t<Sender, Receiver>;
64 OperationState op_state_;
// Canceller that owns a std::stop_source: chain_token() hands out that
// source's token, and invoking the canceller requests stop on it.
67template <
typename TokenType>
class WhenAnyCanceller {
// If token can ever be stopped, constructs a StopCallbackType from it and a
// Cancellation functor (which calls cancel_() when invoked). Always returns
// the token of this object's own stop_source_, not the incoming token.
69 auto chain_token(TokenType token)
noexcept {
70 if (token.stop_possible()) {
71 stop_callback_.emplace(std::move(token), Cancellation{
this});
73 return stop_source_.get_token();
// Requests stop on stop_source_; the argument's value is ignored.
76 template <
typename R>
void operator()(
const R &)
noexcept {
77 stop_source_.request_stop();
// Destroys the stored stop callback, if one was created by chain_token().
80 void reset() noexcept { stop_callback_.reset(); }
// Back-pointer used by the callback functor to reach cancel_().
84 WhenAnyCanceller *self;
85 void operator()() noexcept { self->cancel_(); }
87 void cancel_() noexcept { stop_source_.request_stop(); }
89 using StopCallbackType = stop_callback_t<TokenType, Cancellation>;
91 std::stop_source stop_source_;
// Empty unless chain_token() received a token for which stop is possible.
92 std::optional<StopCallbackType> stop_callback_;
// Canceller with no behavior of its own: the incoming stop token is passed
// through unchanged and both the call operator and reset() do nothing.
95template <
typename TokenType>
class WhenAllCanceller {
// Returns the token it was given.
97 auto chain_token(TokenType token)
noexcept {
return token; }
// Intentionally empty; the argument is ignored.
99 template <
typename R>
void operator()(
const R &)
noexcept {}
// Intentionally empty; there is no state to release.
101 void reset() noexcept {}
// Operation state that connects a fixed pack of senders to per-index child
// receivers, starts all of them, and completes receiver_ with the pair
// (order_, results_) once every child has delivered a result.
// Canceller supplies chain_token(), which decides which stop token the
// children observe.
104template <
typename Receiver,
typename Canceller,
typename... Senders>
105class ParallelOperationState {
// Stores the receiver, asks the canceller for the token the children should
// see (derived from the receiver's own stop token), then connects every
// sender with that token.
107 ParallelOperationState(std::tuple<Senders...> senders, Receiver receiver)
108 : receiver_(std::move(receiver)) {
109 auto next_token = canceller_.chain_token(receiver_.get_stop_token());
110 connect_senders_(senders, next_token);
113 CONDY_DELETE_COPY_MOVE(ParallelOperationState);
// Calls destroy() on every element that std::apply passes to the lambda.
115 ~ParallelOperationState() {
116 std::apply([](
auto &&...states) { (states.destroy(), ...); },
// With an empty sender pack the receiver is completed immediately with the
// (moved) order_ and results_. The lambda below starts each child state it is
// given, passing the same flags to all of them.
120 void start(
unsigned int flags)
noexcept {
121 if constexpr (
sizeof...(Senders) == 0) {
122 std::move(receiver_)(
123 std::make_pair(std::move(order_), std::move(results_)));
126 [&](
auto &&...states) { (states.get().start(flags), ...); },
// Type that Canceller::chain_token() yields when given the receiver's stop
// token type, with cv/ref qualifiers removed.
133 std::remove_cvref_t<decltype(std::declval<Canceller &>().chain_token(
134 std::declval<stop_token_t<Receiver>>()))>;
// Compile-time recursion over the sender indices: slot I of op_states_ is
// populated through accept() with a callable that connects sender I (moved out
// of the tuple) to ChildReceiver<I>{this, token}; then recurses with I + 1.
// NOTE(review): accept() presumably constructs the operation state in place
// from the callable's return value - confirm against RawStorage.
136 template <
size_t I = 0>
137 void connect_senders_(std::tuple<Senders...> &senders,
138 const TokenType &token)
noexcept {
139 if constexpr (I <
sizeof...(Senders)) {
140 std::get<I>(op_states_).accept([&] {
141 return std::move(std::get<I>(senders))
142 .connect_impl(ChildReceiver<I>{
this, token});
144 connect_senders_<I + 1>(senders, token);
// Called when child I delivers its result.
// `no` is the number of completions seen before this one (post-increment).
148 template <
size_t I,
typename R>
void receive_(R &&result)
noexcept {
150 auto no = completed_count_++;
152 std::get<I>(results_) = std::forward<R>(result);
// Last child to report: hand everything to the downstream receiver.
153 if (no + 1 ==
sizeof...(Senders)) {
155 std::move(receiver_)(
156 std::make_pair(std::move(order_), std::move(results_)));
// Receiver given to child I: forwards its result to receive_<I>() on the
// owning state and exposes the stored stop token.
160 template <
size_t I>
struct ChildReceiver {
161 ParallelOperationState *self;
162 TokenType stop_token;
163 template <
typename R>
void operator()(R &&result)
noexcept {
164 self->receive_<I>(std::forward<R>(result));
166 auto get_stop_token() const noexcept {
return stop_token; }
// Maps an index sequence 0..N-1 to a tuple holding, for each sender, a
// RawStorage of the operation state produced by connecting that sender to the
// ChildReceiver with the matching index.
169 template <
typename T>
struct operation_state_traits;
170 template <
size_t... Is>
171 struct operation_state_traits<std::index_sequence<Is...>> {
172 using type = std::tuple<
173 RawStorage<operation_state_t<Senders, ChildReceiver<Is>>>...>;
175 using OperationStates =
typename operation_state_traits<
176 std::make_index_sequence<
sizeof...(Senders)>>::type;
179 OperationStates op_states_;
// One slot per sender; delivered to the receiver together with results_.
180 std::array<size_t,
sizeof...(Senders)> order_;
// Slot I holds the result delivered by child I.
181 std::tuple<
typename Senders::ReturnType...> results_;
182 size_t completed_count_ = 0;
184 Canceller canceller_;
// ParallelOperationState specialised with WhenAnyCanceller over the receiver's
// stop token type.
187template <
typename Receiver,
typename... Senders>
188using ParallelAnyOperationState =
189 ParallelOperationState<Receiver, WhenAnyCanceller<stop_token_t<Receiver>>,
// ParallelOperationState specialised with WhenAllCanceller over the receiver's
// stop token type.
192template <
typename Receiver,
typename... Senders>
193using ParallelAllOperationState =
194 ParallelOperationState<Receiver, WhenAllCanceller<stop_token_t<Receiver>>,
// Receiver adapter: accepts an (order, results) pair and forwards only the
// results part to the wrapped receiver.
197template <
typename Receiver>
struct ReceiverAllWrapper {
199 ReceiverAllWrapper(Receiver receiver) : receiver(std::move(receiver)) {}
// Destructures result into its two parts; `order` is dropped and `results` is
// moved into the wrapped receiver.
200 template <
typename R>
void operator()(R &&result)
noexcept {
201 auto &[order, results] = result;
202 std::move(receiver)(std::move(results));
// Delegates to the wrapped receiver's stop token.
204 auto get_stop_token() const noexcept {
return receiver.get_stop_token(); }
// Receiver adapter: accepts an (order, results) pair and forwards to the
// wrapped receiver only the element of results selected by order[0].
207template <
typename Receiver>
struct ReceiverAnyWrapper {
209 ReceiverAnyWrapper(Receiver receiver) : receiver(std::move(receiver)) {}
// NOTE(review): order[0] is presumably the index of the first child to
// complete - confirm where order is filled in. tuple_at() is defined
// elsewhere; it is called with the results tuple and that runtime index.
210 template <
typename R>
void operator()(R &&result)
noexcept {
211 auto &[order, results] = result;
212 size_t index = order[0];
213 std::move(receiver)(tuple_at(results, index));
// Delegates to the wrapped receiver's stop token.
215 auto get_stop_token() const noexcept {
return receiver.get_stop_token(); }
// ParallelAnyOperationState whose receiver is wrapped in ReceiverAnyWrapper,
// so the downstream receiver gets the single element selected by order[0].
218template <
typename Receiver,
typename... Senders>
219using WhenAnyOperationState =
220 ParallelAnyOperationState<ReceiverAnyWrapper<Receiver>, Senders...>;
// ParallelAllOperationState whose receiver is wrapped in ReceiverAllWrapper,
// so the downstream receiver gets only the results, without the order.
222template <
typename Receiver,
typename... Senders>
223using WhenAllOperationState =
224 ParallelAllOperationState<ReceiverAllWrapper<Receiver>, Senders...>;
// WhenAllOperationState variant whose start() launches the children strictly
// in index order and ORs Flags into the flags of every child except the last.
// NOTE(review): Flags is presumably an io_uring SQE link flag - confirm at the
// instantiation sites.
226template <
typename Receiver,
unsigned int Flags,
typename... Senders>
227class LinkOperationState :
public WhenAllOperationState<Receiver, Senders...> {
229 using Base = WhenAllOperationState<Receiver, Senders...>;
// Calls reserve_space() on the current runtime's ring with the number of
// senders, then starts the children in order.
// NOTE(review): with an empty sender pack nothing visible here completes the
// receiver, whereas the base start() does so immediately - confirm that case
// is excluded or handled elsewhere.
232 void start(
unsigned int flags)
noexcept {
233 auto &ring = detail::Context::current().runtime()->ring();
234 ring.reserve_space(
sizeof...(Senders));
235 start_linked_operations_(flags);
// Compile-time recursion over child indices: children 0..N-2 are started with
// flags | Flags, the final child with flags alone.
239 template <
size_t I = 0>
240 void start_linked_operations_(
unsigned int flags)
noexcept {
241 if constexpr (I <
sizeof...(Senders)) {
242 auto &state = std::get<I>(Base::op_states_);
243 if constexpr (I <
sizeof...(Senders) - 1) {
244 state.get().start(flags | Flags);
246 state.get().start(flags);
248 start_linked_operations_<I + 1>(flags);
// Runtime-sized counterpart of the tuple-based parallel state: connects every
// sender in a std::vector to a ChildReceiver, starts them all, and completes
// receiver_ with the pair (order_, results_) once every child has delivered a
// result.
253template <
typename Receiver,
typename Canceller,
typename Sender>
254class RangedParallelOperationState {
// Sizes op_states_, order_ and results_ to senders.size(), obtains the
// children's stop token from the canceller, then populates each op_states_
// slot through accept() with a callable that connects senders[i] (moved) to
// ChildReceiver{this, i, next_token}.
256 RangedParallelOperationState(std::vector<Sender> senders, Receiver receiver)
257 : op_states_(senders.size()), order_(senders.size()),
258 results_(senders.size()), receiver_(std::move(receiver)) {
259 auto next_token = canceller_.chain_token(receiver_.get_stop_token());
260 for (
size_t i = 0; i < senders.size(); ++i) {
261 op_states_[i].accept([&] {
262 return std::move(senders[i])
263 .connect_impl(ChildReceiver{
this, i, next_token});
268 CONDY_DELETE_COPY_MOVE(RangedParallelOperationState);
// Visits every entry of op_states_ on destruction.
270 ~RangedParallelOperationState() {
271 for (
auto &op_state : op_states_) {
// With no children the receiver is completed immediately with the (moved)
// order_ and results_; the loop below starts each child with the same flags.
276 void start(
unsigned int flags)
noexcept {
277 if (op_states_.empty()) {
278 std::move(receiver_)(
279 std::make_pair(std::move(order_), std::move(results_)));
281 for (
auto &op_state : op_states_) {
282 op_state.get().start(flags);
// Type that Canceller::chain_token() yields when given the receiver's stop
// token type, with cv/ref qualifiers removed.
289 std::remove_cvref_t<decltype(std::declval<Canceller &>().chain_token(
290 std::declval<stop_token_t<Receiver>>()))>;
// Called when the child at `index` delivers its result.
// `no` is the number of completions seen before this one (post-increment).
292 template <
typename R>
void receive_(
size_t index, R &&result)
noexcept {
294 size_t no = completed_count_++;
296 results_[index] = std::forward<R>(result);
// Last child to report: hand everything to the downstream receiver.
297 if (no + 1 == op_states_.size()) {
298 std::move(receiver_)(
299 std::make_pair(std::move(order_), std::move(results_)));
// Receiver given to each child: forwards the child's result, together with
// its index, to receive_() on the owning state and exposes the stored token.
303 struct ChildReceiver {
304 RangedParallelOperationState *self;
306 TokenType stop_token;
307 template <
typename R>
void operator()(R &&result)
noexcept {
308 self->receive_(index, std::forward<R>(result));
310 auto get_stop_token() const noexcept {
return stop_token; }
313 using OperationStates =
314 std::vector<RawStorage<operation_state_t<Sender, ChildReceiver>>>;
317 OperationStates op_states_;
// Same length as op_states_; delivered to the receiver alongside results_.
318 std::vector<size_t> order_;
// results_[i] holds the result delivered by child i.
319 std::vector<typename Sender::ReturnType> results_;
320 size_t completed_count_ = 0;
322 Canceller canceller_;
// RangedParallelOperationState specialised with WhenAllCanceller over the
// receiver's stop token type.
325template <
typename Receiver,
typename Sender>
326using RangedParallelAllOperationState = RangedParallelOperationState<
327 Receiver, WhenAllCanceller<stop_token_t<Receiver>>, Sender>;
// RangedParallelOperationState specialised with WhenAnyCanceller over the
// receiver's stop token type.
329template <
typename Receiver,
typename Sender>
330using RangedParallelAnyOperationState = RangedParallelOperationState<
331 Receiver, WhenAnyCanceller<stop_token_t<Receiver>>, Sender>;
// The ranged "all" case reuses ReceiverAllWrapper unchanged: it forwards only
// the results part of the (order, results) pair.
333template <
typename Receiver>
334using ReceiverRangedAllWrapper = ReceiverAllWrapper<Receiver>;
// Receiver adapter for the ranged "any" case: accepts an (order, results) pair
// and forwards a pair of (index, results[index]) where index is order[0].
336template <
typename Receiver>
struct ReceiverRangedAnyWrapper {
338 ReceiverRangedAnyWrapper(Receiver receiver)
339 : receiver(std::move(receiver)) {}
// NOTE(review): order[0] is presumably the index of the first child to
// complete - confirm where order is filled in. The selected result is moved
// out of the results container.
340 template <
typename R>
void operator()(R &&result)
noexcept {
341 auto &[order, results] = result;
342 size_t index = order[0];
343 std::move(receiver)(std::make_pair(index, std::move(results[index])));
// Delegates to the wrapped receiver's stop token.
345 auto get_stop_token() const noexcept {
return receiver.get_stop_token(); }
// RangedParallelAllOperationState whose receiver is wrapped in
// ReceiverRangedAllWrapper, so the downstream receiver gets only the results.
348template <
typename Receiver,
typename Sender>
349using RangedWhenAllOperationState =
350 RangedParallelAllOperationState<ReceiverRangedAllWrapper<Receiver>, Sender>;
// RangedParallelAnyOperationState whose receiver is wrapped in
// ReceiverRangedAnyWrapper, so the downstream receiver gets an
// (index, result) pair.
352template <
typename Receiver,
typename Sender>
353using RangedWhenAnyOperationState =
354 RangedParallelAnyOperationState<ReceiverRangedAnyWrapper<Receiver>, Sender>;
// RangedWhenAllOperationState variant whose start() launches the children in
// vector order and ORs Flags into the flags of every child except the last.
// NOTE(review): Flags is presumably an io_uring SQE link flag - confirm at the
// instantiation sites.
356template <
typename Receiver,
unsigned int Flags,
typename Sender>
357class RangedLinkOperationState
358 :
public RangedWhenAllOperationState<Receiver, Sender> {
360 using Base = RangedWhenAllOperationState<Receiver, Sender>;
// Calls reserve_space() on the current runtime's ring with the number of
// children, then starts them one by one.
// NOTE(review): when op_states_ is empty the loop body never runs and nothing
// visible here completes the receiver, whereas the base start() does so
// immediately - confirm that case is excluded or handled elsewhere.
363 void start(
unsigned int flags)
noexcept {
364 auto &ring = detail::Context::current().runtime()->ring();
365 ring.reserve_space(Base::op_states_.size());
366 for (
size_t i = 0; i < Base::op_states_.size(); ++i) {
367 auto &op_state = Base::op_states_[i];
// size() is at least 1 inside the loop, so size() - 1 cannot wrap around.
368 if (i < Base::op_states_.size() - 1) {
369 op_state.get().start(flags | Flags);
371 op_state.get().start(flags);
Definitions of finish handle types for asynchronous operations.
The main namespace for the Condy library.
Internal utility classes and functions used by Condy.