38template <
typename T,
size_t N = 2>
class Channel {
48 std::lock_guard<std::mutex> lock(mutex_);
53 CONDY_DELETE_COPY_MOVE(Channel);
63 requires std::is_same_v<std::remove_cvref_t<U>, T>
65 std::lock_guard<std::mutex> lock(mutex_);
69 if (try_push_inner_(std::forward<U>(item))) {
81 std::pair<int32_t, T>
try_pop() noexcept {
82 std::lock_guard<std::mutex> lock(mutex_);
83 auto item = try_pop_inner_();
84 if (item.has_value()) {
85 return {0, std::move(item.value())};
89 return {-EAGAIN, T()};
93 void force_push(T item)
noexcept {
94 std::lock_guard<std::mutex> lock(mutex_);
95 if (closed_) [[unlikely]] {
96 panic_on(
"Push to closed channel");
98 if (try_push_inner_(std::move(item))) [[likely]] {
105 new (std::nothrow) FakePushFinishHandle(std::move(item));
108 panic_on(
"Allocation failed for PushFinishHandle");
110 assert(pop_awaiters_.empty());
111 push_awaiters_.push_back(fake_handle);
114 class [[nodiscard]] MovePushSender;
127 MovePushSender
push(T &&item)
noexcept {
return {*
this, std::move(item)}; }
129 class [[nodiscard]] CopyPushSender;
137 CopyPushSender
push(
const T &item)
noexcept
138 requires std::copy_constructible<T>
140 return {*
this, item};
143 class [[nodiscard]] PopSender;
150 PopSender
pop() noexcept {
return {*
this}; }
155 size_t capacity() const noexcept {
return buffer_.capacity(); }
162 std::lock_guard<std::mutex> lock(mutex_);
163 return size_inner_();
171 std::lock_guard<std::mutex> lock(mutex_);
172 return empty_inner_();
180 std::lock_guard<std::mutex> lock(mutex_);
193 std::lock_guard<std::mutex> lock(mutex_);
198 class PushFinishHandleBase;
199 template <
typename Receiver>
class PushFinishHandle;
200 class FakePushFinishHandle;
202 class PopFinishHandleBase;
203 template <
typename Receiver>
class PopFinishHandle;
205 int32_t request_push_(PushFinishHandleBase *finish_handle)
noexcept {
206 std::lock_guard<std::mutex> lock(mutex_);
210 if (try_push_inner_(std::move(finish_handle->get_item()))) {
213 assert(pop_awaiters_.empty());
214 push_awaiters_.push_back(finish_handle);
218 bool cancel_push_(PushFinishHandleBase *finish_handle)
noexcept {
219 std::lock_guard<std::mutex> lock(mutex_);
220 return push_awaiters_.remove(finish_handle);
223 std::pair<int32_t, T>
224 request_pop_(PopFinishHandleBase *finish_handle)
noexcept {
225 std::lock_guard<std::mutex> lock(mutex_);
226 auto result = try_pop_inner_();
227 if (result.has_value()) {
228 return {0, std::move(result.value())};
230 assert(push_awaiters_.empty());
232 return {-EPIPE, T()};
234 pop_awaiters_.push_back(finish_handle);
235 return {-EAGAIN, T()};
238 bool cancel_pop_(PopFinishHandleBase *finish_handle)
noexcept {
239 std::lock_guard<std::mutex> lock(mutex_);
240 return pop_awaiters_.remove(finish_handle);
244 template <
typename U>
245 requires std::is_same_v<std::remove_cvref_t<U>, T>
246 bool try_push_inner_(U &&item)
noexcept {
247 if (!pop_awaiters_.empty()) {
248 assert(empty_inner_());
249 auto *pop_handle = pop_awaiters_.pop_front();
250 pop_handle->set_result({0, std::forward<U>(item)});
251 pop_handle->schedule();
254 if (!full_inner_()) {
255 push_inner_(std::forward<U>(item));
261 std::optional<T> try_pop_inner_() noexcept {
262 if (!push_awaiters_.empty()) {
263 assert(full_inner_());
264 auto *push_handle = push_awaiters_.pop_front();
265 T item = std::move(push_handle->get_item());
266 push_handle->set_result(0);
267 push_handle->schedule();
268 return pop_and_push_(std::move(item));
270 if (!empty_inner_()) {
271 T result = pop_inner_();
277 T pop_and_push_(T item)
noexcept {
281 T result = pop_inner_();
282 push_inner_(std::move(item));
287 template <
typename U>
288 requires std::is_same_v<std::remove_cvref_t<U>, T>
289 void push_inner_(U &&item)
noexcept {
290 assert(!full_inner_());
291 auto mask = buffer_.capacity() - 1;
292 buffer_[tail_ & mask].construct(std::forward<U>(item));
296 T pop_inner_() noexcept {
297 assert(!empty_inner_());
298 auto mask = buffer_.capacity() - 1;
299 T item = std::move(buffer_[head_ & mask].get());
300 buffer_[head_ & mask].destroy();
305 bool no_buffer_() const noexcept {
return buffer_.capacity() == 0; }
307 bool empty_inner_() const noexcept {
return size_inner_() == 0; }
309 bool full_inner_() const noexcept {
310 return size_inner_() == buffer_.capacity();
313 void push_close_inner_() noexcept {
319 PopFinishHandleBase *pop_handle =
nullptr;
320 while ((pop_handle = pop_awaiters_.pop_front()) !=
nullptr) {
321 assert(empty_inner_());
322 pop_handle->set_result({-EPIPE, T()});
323 pop_handle->schedule();
326 PushFinishHandleBase *push_handle =
nullptr;
327 while ((push_handle = push_awaiters_.pop_front()) !=
nullptr) {
328 assert(full_inner_());
329 push_handle->set_result(-EPIPE);
330 push_handle->schedule();
334 void destruct_all_() noexcept {
335 while (!empty_inner_()) {
338 assert(head_ == tail_);
341 size_t size_inner_() const noexcept {
return tail_ - head_; }
344 template <
typename Handle>
345 using HandleList = IntrusiveDoubleList<Handle, &Handle::link_entry_>;
347 mutable std::mutex mutex_;
348 HandleList<PushFinishHandleBase> push_awaiters_;
349 HandleList<PopFinishHandleBase> pop_awaiters_;
352 SmallArray<RawStorage<T>, N> buffer_;
353 bool closed_ =
false;
356template <
typename T,
size_t N>
357class Channel<T, N>::PushFinishHandleBase :
public WorkInvoker {
359 PushFinishHandleBase(T &item) : item_(item) {}
361 void schedule() noexcept {
362 if (runtime_ ==
nullptr) [[unlikely]] {
364 auto *this_fake =
static_cast<FakePushFinishHandle *
>(
this);
367 runtime_->schedule(
this);
371 T &get_item() noexcept {
return item_; }
373 void set_result(int32_t result)
noexcept { result_ = result; }
376 DoubleLinkEntry link_entry_;
379 Runtime *runtime_ =
nullptr;
381 int32_t result_ = -ENOTRECOVERABLE;
384template <
typename T,
size_t N>
385template <
typename Receiver>
386class Channel<T, N>::PushFinishHandle
387 :
public InvokerAdapter<PushFinishHandle<Receiver>, PushFinishHandleBase> {
390 InvokerAdapter<PushFinishHandle<Receiver>, PushFinishHandleBase>;
392 PushFinishHandle(
Channel &channel, T &item, Receiver receiver)
393 : Base(item), channel_(channel), receiver_(std::move(receiver)) {}
395 void start(Runtime *runtime)
noexcept {
396 this->runtime_ = runtime;
397 int32_t r = channel_.request_push_(
this);
399 std::move(receiver_)(r);
402 runtime->pend_work();
404 auto stop_token = receiver_.get_stop_token();
405 if (stop_token.stop_possible()) {
406 stop_callback_.emplace(std::move(stop_token), Cancellation{
this});
410 void invoke() noexcept {
411 stop_callback_.reset();
412 assert(this->runtime_ !=
nullptr);
413 this->runtime_->resume_work();
414 std::move(receiver_)(this->result_);
418 void cancel_() noexcept {
419 if (channel_.cancel_push_(
this)) {
421 assert(this->result_ == -ENOTRECOVERABLE);
422 this->result_ = -ECANCELED;
423 assert(this->runtime_ !=
nullptr);
424 this->runtime_->schedule(
this);
428 struct Cancellation {
429 PushFinishHandle *self;
430 void operator()() noexcept { self->cancel_(); }
433 using StopCallbackType =
434 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
439 std::optional<StopCallbackType> stop_callback_;
442template <
typename T,
size_t N>
443class Channel<T, N>::FakePushFinishHandle :
public PushFinishHandleBase {
445 FakePushFinishHandle(T &&item)
446 : PushFinishHandleBase(item_copy_), item_copy_(std::move(item)) {}
452template <
typename T,
size_t N>
453class Channel<T, N>::PopFinishHandleBase :
public WorkInvoker {
455 void schedule() noexcept {
456 assert(runtime_ !=
nullptr);
457 runtime_->schedule(
this);
460 void set_result(std::pair<int32_t, T> result)
noexcept {
461 result_ = std::move(result);
465 DoubleLinkEntry link_entry_;
468 Runtime *runtime_ =
nullptr;
470 std::pair<int32_t, T> result_ = {-ENOTRECOVERABLE, T()};
473template <
typename T,
size_t N>
474template <
typename Receiver>
475class Channel<T, N>::PopFinishHandle
476 :
public InvokerAdapter<PopFinishHandle<Receiver>, PopFinishHandleBase> {
478 PopFinishHandle(
Channel &channel, Receiver receiver)
479 : channel_(channel), receiver_(std::move(receiver)) {}
481 void start(Runtime *runtime)
noexcept {
482 this->runtime_ = runtime;
483 auto item = channel_.request_pop_(
this);
486 std::move(receiver_)(std::move(item));
489 runtime->pend_work();
491 auto stop_token = receiver_.get_stop_token();
492 if (stop_token.stop_possible()) {
493 stop_callback_.emplace(std::move(stop_token), Cancellation{
this});
497 void invoke() noexcept {
498 stop_callback_.reset();
499 assert(this->runtime_ !=
nullptr);
500 this->runtime_->resume_work();
501 std::move(receiver_)(std::move(this->result_));
505 void cancel_() noexcept {
506 if (channel_.cancel_pop_(
this)) {
508 assert(this->result_.first == -ENOTRECOVERABLE);
509 this->result_.first = -ECANCELED;
510 assert(this->runtime_ !=
nullptr);
511 this->runtime_->schedule(
this);
515 struct Cancellation {
516 PopFinishHandle *self;
517 void operator()() noexcept { self->cancel_(); }
520 using StopCallbackType =
521 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
526 std::optional<StopCallbackType> stop_callback_;
529template <
typename T,
size_t N>
class Channel<T, N>::MovePushSender {
531 using CondySender = void;
532 using ReturnType = int32_t;
534 MovePushSender(
Channel &channel, T &&item)
535 : channel_(channel), item_(std::move(item)) {}
537 template <
typename Receiver>
auto connect_impl(Receiver receiver)
noexcept {
538 return OperationState<Receiver>(channel_, std::move(item_),
539 std::move(receiver));
543 template <
typename Receiver>
545 :
public Channel<T, N>::template PushFinishHandle<Receiver> {
548 typename Channel<T, N>::template PushFinishHandle<Receiver>;
549 OperationState(Channel &channel, T &&item, Receiver receiver)
550 : Base(channel, item, std::move(receiver)) {}
552 void start(
unsigned int )
noexcept {
553 auto *runtime = detail::Context::current().runtime();
554 Base::start(runtime);
562template <
typename T,
size_t N>
class Channel<T, N>::CopyPushSender {
564 using CondySender = void;
565 using ReturnType = int32_t;
567 CopyPushSender(
Channel &channel,
const T &item)
568 : channel_(channel), item_(item) {}
570 template <
typename Receiver>
auto connect_impl(Receiver receiver)
noexcept {
571 return OperationState<Receiver>(channel_, item_, std::move(receiver));
575 template <
typename Receiver>
577 :
public Channel<T, N>::template PushFinishHandle<Receiver> {
580 typename Channel<T, N>::template PushFinishHandle<Receiver>;
581 OperationState(Channel &channel,
const T &item, Receiver receiver)
582 : Base(channel, item_copy_, std::move(receiver)), item_copy_(item) {
585 void start(
unsigned int )
noexcept {
586 auto *runtime = detail::Context::current().runtime();
587 Base::start(runtime);
598template <
typename T,
size_t N>
class Channel<T, N>::PopSender {
600 using CondySender = void;
601 using ReturnType = std::pair<int32_t, T>;
603 PopSender(
Channel &channel) : channel_(channel) {}
605 template <
typename Receiver>
auto connect_impl(Receiver receiver)
noexcept {
606 return OperationState<Receiver>(channel_, std::move(receiver));
610 template <
typename Receiver>
612 :
public Channel<T, N>::template PopFinishHandle<Receiver> {
614 using Base =
typename Channel<T, N>::template PopFinishHandle<Receiver>;
617 void start(
unsigned int )
noexcept {
618 auto *runtime = detail::Context::current().runtime();
619 Base::start(runtime);
Thread-safe bounded channel for communication and synchronization.
void push_close() noexcept
Close the channel.
MovePushSender push(T &&item) noexcept
Push an item into the channel, awaiting if necessary.
std::pair< int32_t, T > try_pop() noexcept
Try to pop an item from the channel.
bool empty() const noexcept
Check if the channel is empty.
CopyPushSender push(const T &item) noexcept
Push an item into the channel, awaiting if necessary.
PopSender pop() noexcept
Pop an item from the channel, awaiting if necessary.
size_t size() const noexcept
Get the current size of the channel.
bool is_closed() const noexcept
Check if the channel is closed.
size_t capacity() const noexcept
Get the capacity of the channel.
Channel(size_t capacity)
Construct a new Channel object.
int32_t try_push(U &&item) noexcept
Try to push an item into the channel.
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Runtime type for running the io_uring event loop.
Internal utility classes and functions used by Condy.