// User-space "futex" for synchronization between coroutines. Waiters are
// queued on this object as handles and receive an int32_t result code:
// 0 from the notify path, -ECANCELED from cancel_(), -EIDRM on destruction.
29template <
typename T>
class Futex {
// Binds this Futex to an existing atomic. Only a reference is kept (see the
// futex_ member); the atomic itself is not copied.
35 Futex(std::atomic<T> &futex) : futex_(futex) {}
// Destruction forwards -EIDRM to notify_all_(), which hands that result to
// every handle still queued in wait_awaiters_.
37 ~Futex() { notify_all_(-EIDRM); }
// CONDY_DELETE_COPY_MOVE is a project macro defined outside this listing.
// NOTE(review): presumably it deletes Futex's copy and move operations -
// confirm at the macro definition.
39 CONDY_DELETE_COPY_MOVE(
Futex);
// Sender type returned by wait(). [[nodiscard]] makes the compiler warn when
// a returned WaitSender is dropped without being used.
42 struct [[nodiscard]] WaitSender;
// Wait if the futex value equals the specified old value.
// This call only builds a WaitSender holding this Futex and `old`; the wait
// request itself (request_wait_) is issued later, when the operation state
// produced from the sender is started.
53 WaitSender
wait(T old)
noexcept {
return {*
this, old}; }
// NOTE(review): the enclosing function header is missing from this listing;
// these statements look like the body of notify_one() - confirm.
// The front handle of the wait queue is taken while mutex_ is held.
// pop_front() yields a null pointer when the list is empty (notify_all_
// relies on that to end its loop).
60 WaitFinishHandleBase *handle =
nullptr;
62 std::lock_guard<std::mutex> lock(mutex_);
63 handle = wait_awaiters_.pop_front();
// A waiter woken through this path receives result 0.
// NOTE(review): the null check on `handle` and the call that schedules it
// are not visible here - confirm against the full source.
66 handle->set_result(0);
// Receiver-independent waiter state; this is the element type of the wait
// queue. Defined out of class, after Futex.
78 class WaitFinishHandleBase;
// Waiter bound to a concrete Receiver type; derives from
// WaitFinishHandleBase through InvokerAdapter. Defined out of class.
79 template <
typename Receiver>
class WaitFinishHandle;
// Tries to take `handle` out of the wait queue while holding mutex_.
// Returns whatever HandleList::remove reports. WaitFinishHandle::cancel_
// treats a true result as "this wait was cancelled" and then completes the
// handle itself with -ECANCELED.
// NOTE(review): presumably remove() returns false when the handle is no
// longer linked (already popped by a notifier) - confirm in the list type.
81 bool cancel_wait_(WaitFinishHandleBase *handle)
noexcept {
82 std::lock_guard<std::mutex> lock(mutex_);
83 return wait_awaiters_.remove(handle);
// Registers `handle` as a waiter for the expected value `old`.
// Runs entirely under mutex_. The returned int32_t is consumed by
// WaitFinishHandle::start.
86 int32_t request_wait_(WaitFinishHandleBase *handle, T old)
noexcept {
87 std::lock_guard<std::mutex> lock(mutex_);
// The current value is read with a relaxed load while mutex_ is held.
88 auto val = futex_.load(std::memory_order_relaxed);
// NOTE(review): the lines between the load and push_back (presumably the
// comparison of `val` with `old` and an early return) and the final return
// are missing from this listing - confirm the returned codes in the source.
// The handle is appended at the back of the queue; the notify paths pop from
// the front.
92 wait_awaiters_.push_back(handle);
// Completes every queued waiter with `result`; the destructor calls this
// with -EIDRM.
97 void notify_all_(int32_t result)
noexcept {
// NOTE(review): the declaration of `handles` and the extent of the lock's
// scope are not visible in this listing.
// The whole queue is moved into `handles` while mutex_ is held.
100 std::lock_guard<std::mutex> lock(mutex_);
101 handles = std::move(wait_awaiters_);
// Drain the detached list; a null pointer from pop_front() ends the loop.
103 while (
auto *handle = handles.pop_front()) {
104 handle->set_result(result);
// Intrusive doubly linked list of waiter handles, threaded through each
// handle's link_entry_ member.
110 using HandleList = IntrusiveDoubleList<WaitFinishHandleBase,
111 &WaitFinishHandleBase::link_entry_>;
// Held around every access to wait_awaiters_ that is visible in this
// listing. Declared mutable, so it can also be locked from const members.
113 mutable std::mutex mutex_;
// Pending waiters: appended by request_wait_, removed by cancel_wait_ and by
// the notify paths.
114 HandleList wait_awaiters_;
// Reference to the atomic that was passed to the constructor.
115 std::atomic<T> &futex_;
// Receiver-independent part of a waiter: the list hook, the runtime that
// will run the completion, and the result code to deliver.
119class Futex<T>::WaitFinishHandleBase :
public WorkInvoker {
// Hands this handle to its runtime through Runtime::schedule.
// runtime_ must already be set; this is checked with assert only.
121 void schedule() noexcept {
122 assert(runtime_ !=
nullptr);
123 runtime_->schedule(
this);
// Stores the code that WaitFinishHandle::invoke later passes to the receiver.
126 void set_result(int32_t result)
noexcept { result_ = result; }
// Hook through which Futex::HandleList links this handle.
129 DoubleLinkEntry link_entry_;
// Assigned in WaitFinishHandle::start; null until then.
132 Runtime *runtime_ =
nullptr;
// Initial value, kept until set_result() or cancel_() stores a real result.
133 int32_t result_ = -ENOTRECOVERABLE;
// Waiter bound to a concrete Receiver. It adds the receiver, a reference to
// the Futex, and an optional stop callback to WaitFinishHandleBase.
// NOTE(review): InvokerAdapter is defined elsewhere; presumably it routes the
// WorkInvoker entry point to invoke() below - confirm.
137template <
typename Receiver>
138class Futex<T>::WaitFinishHandle
139 :
public InvokerAdapter<WaitFinishHandle<Receiver>, WaitFinishHandleBase> {
141 using Base = InvokerAdapter<WaitFinishHandle, WaitFinishHandleBase>;
// Keeps a reference to the futex and takes the receiver by value, moving it
// into receiver_.
143 WaitFinishHandle(
Futex &futex, Receiver receiver)
144 : futex_(futex), receiver_(std::move(receiver)) {}
// Begins the wait on behalf of `runtime`, expecting the futex value `old`.
// NOTE(review): the branch structure is missing from this listing. The
// visible statements suggest two outcomes of request_wait_: either the
// receiver is completed right away with `r`, or the handle stays queued and
// the steps below run - confirm against the full source.
146 void start(Runtime *runtime, T old)
noexcept {
// The runtime is recorded first, because schedule() and invoke() assert on it.
147 this->runtime_ = runtime;
148 int32_t r = futex_.request_wait_(
this, old);
// Immediate completion: the receiver is consumed with the code from
// request_wait_.
150 std::move(receiver_)(r);
// Paired with resume_work() in invoke().
153 runtime->pend_work();
// The stop callback is registered only when the receiver's stop token can
// actually be triggered. A stop request then runs Cancellation, i.e. cancel_().
// NOTE(review): the handle is enqueued by request_wait_ before pend_work()
// and before this callback exists; whether a concurrent notifier can get in
// between cannot be judged from this listing.
155 auto stop_token = receiver_.get_stop_token();
156 if (stop_token.stop_possible()) {
157 stop_callback_.emplace(std::move(stop_token), Cancellation{
this});
// Completion step for a handle that was queued and later scheduled.
161 void invoke() noexcept {
// The stop callback is destroyed before the receiver is completed.
// NOTE(review): presumably so that a late stop request can no longer reach
// cancel_() on this handle - confirm.
162 stop_callback_.reset();
163 assert(this->runtime_ !=
nullptr);
// Balances the pend_work() issued in start().
164 this->runtime_->resume_work();
// Delivers the stored result (0, -ECANCELED, -EIDRM, ...) and consumes the
// receiver.
165 std::move(receiver_)(this->result_);
// Cancellation path, reached through the Cancellation functor.
// The handle is completed here only if cancel_wait_() reports that it was
// removed from the wait queue; in that case it gets -ECANCELED and is handed
// to its runtime. When cancel_wait_() returns false, nothing visible in this
// listing is done.
169 void cancel_() noexcept {
170 if (futex_.cancel_wait_(
this)) {
172 this->result_ = -ECANCELED;
173 assert(this->runtime_ !=
nullptr);
174 this->runtime_->schedule(
this);
// Callable stored inside the stop callback; it forwards a stop request to
// cancel_() of the handle it points at.
178 struct Cancellation {
179 WaitFinishHandle *self;
180 void operator()() noexcept { self->cancel_(); }
// Stop-callback type matching the receiver's stop token type.
// stop_callback_t and stop_token_t are project aliases defined elsewhere.
183 using StopCallbackType =
184 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
// Empty until start() emplaces it; reset again at the top of invoke().
189 std::optional<StopCallbackType> stop_callback_;
// Sender returned by Futex::wait(). It carries the target Futex and the
// expected value until it is connected to a receiver.
192template <
typename T>
struct Futex<T>::WaitSender {
// NOTE(review): CondySender looks like a marker alias that identifies sender
// types to the library - confirm where it is consumed.
194 using CondySender = void;
// Type of the value delivered to the receiver (the int32_t result code).
195 using ReturnType = int32_t;
// Captures the futex by reference and the expected value by copy.
197 WaitSender(
Futex &futex, T old) : futex_(futex), old_(old) {}
// Builds the operation state for `receiver`. The receiver is taken by value
// and moved into the state, together with the stored futex reference and the
// expected value.
199 template <
typename Receiver>
auto connect_impl(Receiver receiver)
noexcept {
200 return OperationState<Receiver>(futex_, old_, std::move(receiver));
// Operation state of a wait: a WaitFinishHandle that also remembers the
// expected value, so it can issue the wait request when started.
204 template <
typename Receiver>
205 class OperationState :
public WaitFinishHandle<Receiver> {
207 using Base = WaitFinishHandle<Receiver>;
// Forwards the futex and receiver to the handle base and keeps `old` for
// start().
208 OperationState(Futex &futex, T old, Receiver receiver)
209 : Base(futex, std::move(receiver)), old_(old) {}
// Starts the wait on the runtime taken from the current context.
// The unnamed unsigned parameter is not used by the visible code.
211 void start(
unsigned int )
noexcept {
212 auto *runtime = detail::Context::current().runtime();
213 Base::start(runtime, old_);
User-space "futex" implementation for efficient synchronization between coroutines.
WaitSender wait(T old) noexcept
Wait if the futex value equals the specified old value. The awaiting coroutine will be suspended u...
Futex(std::atomic< T > &futex)
Construct a new Futex object.
void notify_all() noexcept
Notify all awaiting coroutines.
void notify_one() noexcept
Notify one awaiting coroutine, if any.
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Runtime type for running the io_uring event loop.
Internal utility classes and functions used by Condy.