Condy v1.7.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
channel.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/context.hpp"
11#include "condy/intrusive.hpp"
12#include "condy/invoker.hpp"
13#include "condy/runtime.hpp"
14#include "condy/type_traits.hpp"
15#include "condy/utils.hpp"
16#include <bit>
17#include <cerrno>
18#include <cstddef>
19#include <cstdint>
20#include <new>
21#include <optional>
22#include <type_traits>
23
24namespace condy {
25
38template <typename T, size_t N = 2> class Channel {
39public:
46 : buffer_(capacity ? std::bit_ceil(capacity) : 0) {}
47 ~Channel() {
48 std::lock_guard<std::mutex> lock(mutex_);
49 push_close_inner_();
50 destruct_all_();
51 }
52
53 CONDY_DELETE_COPY_MOVE(Channel);
54
55public:
62 template <typename U>
63 requires std::is_same_v<std::remove_cvref_t<U>, T>
64 int32_t try_push(U &&item) noexcept {
65 std::lock_guard<std::mutex> lock(mutex_);
66 if (closed_) {
67 return -EPIPE;
68 }
69 if (try_push_inner_(std::forward<U>(item))) {
70 return 0;
71 }
72 return -EAGAIN;
73 }
74
81 std::pair<int32_t, T> try_pop() noexcept {
82 std::lock_guard<std::mutex> lock(mutex_);
83 auto item = try_pop_inner_();
84 if (item.has_value()) {
85 return {0, std::move(item.value())};
86 } else if (closed_) {
87 return {-EPIPE, T()};
88 } else {
89 return {-EAGAIN, T()};
90 }
91 }
92
93 void force_push(T item) noexcept {
94 std::lock_guard<std::mutex> lock(mutex_);
95 if (closed_) [[unlikely]] {
96 panic_on("Push to closed channel");
97 }
98 if (try_push_inner_(std::move(item))) [[likely]] {
99 return;
100 }
101 // This is safe because if try_push_inner_ returns false, the item has
102 // not been moved into the channel.
103 // NOLINTBEGIN(bugprone-use-after-move)
104 auto *fake_handle =
105 new (std::nothrow) FakePushFinishHandle(std::move(item));
106 // NOLINTEND(bugprone-use-after-move)
107 if (!fake_handle) {
108 panic_on("Allocation failed for PushFinishHandle");
109 }
110 assert(pop_awaiters_.empty());
111 push_awaiters_.push_back(fake_handle);
112 }
113
114 class [[nodiscard]] MovePushSender;
127 MovePushSender push(T &&item) noexcept { return {*this, std::move(item)}; }
128
129 class [[nodiscard]] CopyPushSender;
137 CopyPushSender push(const T &item) noexcept
138 requires std::copy_constructible<T>
139 {
140 return {*this, item};
141 }
142
143 class [[nodiscard]] PopSender;
150 PopSender pop() noexcept { return {*this}; }
151
155 size_t capacity() const noexcept { return buffer_.capacity(); }
156
161 size_t size() const noexcept {
162 std::lock_guard<std::mutex> lock(mutex_);
163 return size_inner_();
164 }
165
170 bool empty() const noexcept {
171 std::lock_guard<std::mutex> lock(mutex_);
172 return empty_inner_();
173 }
174
179 bool is_closed() const noexcept {
180 std::lock_guard<std::mutex> lock(mutex_);
181 return closed_;
182 }
183
192 void push_close() noexcept {
193 std::lock_guard<std::mutex> lock(mutex_);
194 push_close_inner_();
195 }
196
197private:
198 class PushFinishHandleBase;
199 template <typename Receiver> class PushFinishHandle;
200 class FakePushFinishHandle;
201
202 class PopFinishHandleBase;
203 template <typename Receiver> class PopFinishHandle;
204
205 int32_t request_push_(PushFinishHandleBase *finish_handle) noexcept {
206 std::lock_guard<std::mutex> lock(mutex_);
207 if (closed_) {
208 return -EPIPE;
209 }
210 if (try_push_inner_(std::move(finish_handle->get_item()))) {
211 return 0;
212 }
213 assert(pop_awaiters_.empty());
214 push_awaiters_.push_back(finish_handle);
215 return -EAGAIN;
216 }
217
218 bool cancel_push_(PushFinishHandleBase *finish_handle) noexcept {
219 std::lock_guard<std::mutex> lock(mutex_);
220 return push_awaiters_.remove(finish_handle);
221 }
222
223 std::pair<int32_t, T>
224 request_pop_(PopFinishHandleBase *finish_handle) noexcept {
225 std::lock_guard<std::mutex> lock(mutex_);
226 auto result = try_pop_inner_();
227 if (result.has_value()) {
228 return {0, std::move(result.value())};
229 }
230 assert(push_awaiters_.empty());
231 if (closed_) {
232 return {-EPIPE, T()};
233 }
234 pop_awaiters_.push_back(finish_handle);
235 return {-EAGAIN, T()};
236 }
237
238 bool cancel_pop_(PopFinishHandleBase *finish_handle) noexcept {
239 std::lock_guard<std::mutex> lock(mutex_);
240 return pop_awaiters_.remove(finish_handle);
241 }
242
243private:
244 template <typename U>
245 requires std::is_same_v<std::remove_cvref_t<U>, T>
246 bool try_push_inner_(U &&item) noexcept {
247 if (!pop_awaiters_.empty()) {
248 assert(empty_inner_());
249 auto *pop_handle = pop_awaiters_.pop_front();
250 pop_handle->set_result({0, std::forward<U>(item)});
251 pop_handle->schedule();
252 return true;
253 }
254 if (!full_inner_()) {
255 push_inner_(std::forward<U>(item));
256 return true;
257 }
258 return false;
259 }
260
261 std::optional<T> try_pop_inner_() noexcept {
262 if (!push_awaiters_.empty()) {
263 assert(full_inner_());
264 auto *push_handle = push_awaiters_.pop_front();
265 T item = std::move(push_handle->get_item());
266 push_handle->set_result(0);
267 push_handle->schedule();
268 return pop_and_push_(std::move(item));
269 }
270 if (!empty_inner_()) {
271 T result = pop_inner_();
272 return result;
273 }
274 return std::nullopt;
275 }
276
277 T pop_and_push_(T item) noexcept {
278 if (no_buffer_()) {
279 return item;
280 } else {
281 T result = pop_inner_();
282 push_inner_(std::move(item));
283 return result;
284 }
285 }
286
287 template <typename U>
288 requires std::is_same_v<std::remove_cvref_t<U>, T>
289 void push_inner_(U &&item) noexcept {
290 assert(!full_inner_());
291 auto mask = buffer_.capacity() - 1;
292 buffer_[tail_ & mask].construct(std::forward<U>(item));
293 tail_++;
294 }
295
296 T pop_inner_() noexcept {
297 assert(!empty_inner_());
298 auto mask = buffer_.capacity() - 1;
299 T item = std::move(buffer_[head_ & mask].get());
300 buffer_[head_ & mask].destroy();
301 head_++;
302 return item;
303 }
304
305 bool no_buffer_() const noexcept { return buffer_.capacity() == 0; }
306
307 bool empty_inner_() const noexcept { return size_inner_() == 0; }
308
309 bool full_inner_() const noexcept {
310 return size_inner_() == buffer_.capacity();
311 }
312
313 void push_close_inner_() noexcept {
314 if (closed_) {
315 return;
316 }
317 closed_ = true;
318 // Cancel all pending pop awaiters
319 PopFinishHandleBase *pop_handle = nullptr;
320 while ((pop_handle = pop_awaiters_.pop_front()) != nullptr) {
321 assert(empty_inner_());
322 pop_handle->set_result({-EPIPE, T()});
323 pop_handle->schedule();
324 }
325 // Cancel all pending push awaiters
326 PushFinishHandleBase *push_handle = nullptr;
327 while ((push_handle = push_awaiters_.pop_front()) != nullptr) {
328 assert(full_inner_());
329 push_handle->set_result(-EPIPE);
330 push_handle->schedule();
331 }
332 }
333
334 void destruct_all_() noexcept {
335 while (!empty_inner_()) {
336 pop_inner_();
337 }
338 assert(head_ == tail_);
339 }
340
341 size_t size_inner_() const noexcept { return tail_ - head_; }
342
343private:
344 template <typename Handle>
345 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
346
347 mutable std::mutex mutex_;
348 HandleList<PushFinishHandleBase> push_awaiters_;
349 HandleList<PopFinishHandleBase> pop_awaiters_;
350 size_t head_ = 0;
351 size_t tail_ = 0;
352 SmallArray<RawStorage<T>, N> buffer_;
353 bool closed_ = false;
354};
355
356template <typename T, size_t N>
357class Channel<T, N>::PushFinishHandleBase : public WorkInvoker {
358public:
359 PushFinishHandleBase(T &item) : item_(item) {}
360
361 void schedule() noexcept {
362 if (runtime_ == nullptr) [[unlikely]] {
363 // Fake handle, no need to schedule
364 auto *this_fake = static_cast<FakePushFinishHandle *>(this);
365 delete this_fake;
366 } else {
367 runtime_->schedule(this);
368 }
369 }
370
371 T &get_item() noexcept { return item_; }
372
373 void set_result(int32_t result) noexcept { result_ = result; }
374
375public:
376 DoubleLinkEntry link_entry_;
377
378public:
379 Runtime *runtime_ = nullptr;
380 T &item_;
381 int32_t result_ = -ENOTRECOVERABLE; // Internal error if not set
382};
383
384template <typename T, size_t N>
385template <typename Receiver>
386class Channel<T, N>::PushFinishHandle
387 : public InvokerAdapter<PushFinishHandle<Receiver>, PushFinishHandleBase> {
388public:
389 using Base =
390 InvokerAdapter<PushFinishHandle<Receiver>, PushFinishHandleBase>;
391
392 PushFinishHandle(Channel &channel, T &item, Receiver receiver)
393 : Base(item), channel_(channel), receiver_(std::move(receiver)) {}
394
395 void start(Runtime *runtime) noexcept {
396 this->runtime_ = runtime;
397 int32_t r = channel_.request_push_(this);
398 if (r != -EAGAIN) {
399 std::move(receiver_)(r);
400 return;
401 }
402 runtime->pend_work();
403
404 auto stop_token = receiver_.get_stop_token();
405 if (stop_token.stop_possible()) {
406 stop_callback_.emplace(std::move(stop_token), Cancellation{this});
407 }
408 }
409
410 void invoke() noexcept {
411 stop_callback_.reset();
412 assert(this->runtime_ != nullptr);
413 this->runtime_->resume_work();
414 std::move(receiver_)(this->result_);
415 }
416
417private:
418 void cancel_() noexcept {
419 if (channel_.cancel_push_(this)) {
420 // Successfully canceled
421 assert(this->result_ == -ENOTRECOVERABLE);
422 this->result_ = -ECANCELED;
423 assert(this->runtime_ != nullptr);
424 this->runtime_->schedule(this);
425 }
426 }
427
428 struct Cancellation {
429 PushFinishHandle *self;
430 void operator()() noexcept { self->cancel_(); }
431 };
432
433 using StopCallbackType =
434 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
435
436private:
437 Channel &channel_;
438 Receiver receiver_;
439 std::optional<StopCallbackType> stop_callback_;
440};
441
442template <typename T, size_t N>
443class Channel<T, N>::FakePushFinishHandle : public PushFinishHandleBase {
444public:
445 FakePushFinishHandle(T &&item)
446 : PushFinishHandleBase(item_copy_), item_copy_(std::move(item)) {}
447
448private:
449 T item_copy_;
450};
451
452template <typename T, size_t N>
453class Channel<T, N>::PopFinishHandleBase : public WorkInvoker {
454public:
455 void schedule() noexcept {
456 assert(runtime_ != nullptr);
457 runtime_->schedule(this);
458 }
459
460 void set_result(std::pair<int32_t, T> result) noexcept {
461 result_ = std::move(result);
462 }
463
464public:
465 DoubleLinkEntry link_entry_;
466
467protected:
468 Runtime *runtime_ = nullptr;
469 // Internal error if not set
470 std::pair<int32_t, T> result_ = {-ENOTRECOVERABLE, T()};
471};
472
473template <typename T, size_t N>
474template <typename Receiver>
475class Channel<T, N>::PopFinishHandle
476 : public InvokerAdapter<PopFinishHandle<Receiver>, PopFinishHandleBase> {
477public:
478 PopFinishHandle(Channel &channel, Receiver receiver)
479 : channel_(channel), receiver_(std::move(receiver)) {}
480
481 void start(Runtime *runtime) noexcept {
482 this->runtime_ = runtime;
483 auto item = channel_.request_pop_(this);
484 auto r = item.first;
485 if (r != -EAGAIN) {
486 std::move(receiver_)(std::move(item));
487 return;
488 }
489 runtime->pend_work();
490
491 auto stop_token = receiver_.get_stop_token();
492 if (stop_token.stop_possible()) {
493 stop_callback_.emplace(std::move(stop_token), Cancellation{this});
494 }
495 }
496
497 void invoke() noexcept {
498 stop_callback_.reset();
499 assert(this->runtime_ != nullptr);
500 this->runtime_->resume_work();
501 std::move(receiver_)(std::move(this->result_));
502 }
503
504private:
505 void cancel_() noexcept {
506 if (channel_.cancel_pop_(this)) {
507 // Successfully canceled
508 assert(this->result_.first == -ENOTRECOVERABLE);
509 this->result_.first = -ECANCELED;
510 assert(this->runtime_ != nullptr);
511 this->runtime_->schedule(this);
512 }
513 }
514
515 struct Cancellation {
516 PopFinishHandle *self;
517 void operator()() noexcept { self->cancel_(); }
518 };
519
520 using StopCallbackType =
521 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
522
523private:
524 Channel &channel_;
525 Receiver receiver_;
526 std::optional<StopCallbackType> stop_callback_;
527};
528
529template <typename T, size_t N> class Channel<T, N>::MovePushSender {
530public:
531 using CondySender = void;
532 using ReturnType = int32_t;
533
534 MovePushSender(Channel &channel, T &&item)
535 : channel_(channel), item_(std::move(item)) {}
536
537 template <typename Receiver> auto connect_impl(Receiver receiver) noexcept {
538 return OperationState<Receiver>(channel_, std::move(item_),
539 std::move(receiver));
540 }
541
542private:
543 template <typename Receiver>
544 class OperationState
545 : public Channel<T, N>::template PushFinishHandle<Receiver> {
546 public:
547 using Base =
548 typename Channel<T, N>::template PushFinishHandle<Receiver>;
549 OperationState(Channel &channel, T &&item, Receiver receiver)
550 : Base(channel, item, std::move(receiver)) {}
551
552 void start(unsigned int /*flags*/) noexcept {
553 auto *runtime = detail::Context::current().runtime();
554 Base::start(runtime);
555 }
556 };
557
558 Channel &channel_;
559 T &&item_;
560};
561
562template <typename T, size_t N> class Channel<T, N>::CopyPushSender {
563public:
564 using CondySender = void;
565 using ReturnType = int32_t;
566
567 CopyPushSender(Channel &channel, const T &item)
568 : channel_(channel), item_(item) {}
569
570 template <typename Receiver> auto connect_impl(Receiver receiver) noexcept {
571 return OperationState<Receiver>(channel_, item_, std::move(receiver));
572 }
573
574private:
575 template <typename Receiver>
576 class OperationState
577 : public Channel<T, N>::template PushFinishHandle<Receiver> {
578 public:
579 using Base =
580 typename Channel<T, N>::template PushFinishHandle<Receiver>;
581 OperationState(Channel &channel, const T &item, Receiver receiver)
582 : Base(channel, item_copy_, std::move(receiver)), item_copy_(item) {
583 }
584
585 void start(unsigned int /*flags*/) noexcept {
586 auto *runtime = detail::Context::current().runtime();
587 Base::start(runtime);
588 }
589
590 private:
591 T item_copy_;
592 };
593
594 Channel &channel_;
595 const T &item_;
596};
597
598template <typename T, size_t N> class Channel<T, N>::PopSender {
599public:
600 using CondySender = void;
601 using ReturnType = std::pair<int32_t, T>;
602
603 PopSender(Channel &channel) : channel_(channel) {}
604
605 template <typename Receiver> auto connect_impl(Receiver receiver) noexcept {
606 return OperationState<Receiver>(channel_, std::move(receiver));
607 }
608
609private:
610 template <typename Receiver>
611 class OperationState
612 : public Channel<T, N>::template PopFinishHandle<Receiver> {
613 public:
614 using Base = typename Channel<T, N>::template PopFinishHandle<Receiver>;
615 using Base::Base;
616
617 void start(unsigned int /*flags*/) noexcept {
618 auto *runtime = detail::Context::current().runtime();
619 Base::start(runtime);
620 }
621 };
622
623 Channel &channel_;
624};
625
626} // namespace condy
Thread-safe bounded channel for communication and synchronization.
Definition channel.hpp:38
void push_close() noexcept
Close the channel.
Definition channel.hpp:192
MovePushSender push(T &&item) noexcept
Push an item into the channel, awaiting if necessary.
Definition channel.hpp:127
std::pair< int32_t, T > try_pop() noexcept
Try to pop an item from the channel.
Definition channel.hpp:81
bool empty() const noexcept
Check if the channel is empty.
Definition channel.hpp:170
CopyPushSender push(const T &item) noexcept
Push an item into the channel, awaiting if necessary.
Definition channel.hpp:137
PopSender pop() noexcept
Pop an item from the channel, awaiting if necessary.
Definition channel.hpp:150
size_t size() const noexcept
Get the current size of the channel.
Definition channel.hpp:161
bool is_closed() const noexcept
Check if the channel is closed.
Definition channel.hpp:179
size_t capacity() const noexcept
Get the capacity of the channel.
Definition channel.hpp:155
Channel(size_t capacity)
Construct a new Channel object.
Definition channel.hpp:45
int32_t try_push(U &&item) noexcept
Try to push an item into the channel.
Definition channel.hpp:64
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:31
Runtime type for running the io_uring event loop.
Internal utility classes and functions used by Condy.