Condy v1.7.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
futex.hpp
Go to the documentation of this file.
1
6
7#pragma once
8
9#include "condy/intrusive.hpp"
10#include "condy/invoker.hpp"
11#include "condy/runtime.hpp"
12#include "condy/type_traits.hpp"
13#include "condy/utils.hpp"
14#include <atomic>
15#include <cerrno>
16#include <optional>
17
18namespace condy {
19
29template <typename T> class Futex {
30public:
35 Futex(std::atomic<T> &futex) : futex_(futex) {}
36
37 ~Futex() { notify_all_(-EIDRM); }
38
39 CONDY_DELETE_COPY_MOVE(Futex);
40
41public:
42 struct [[nodiscard]] WaitSender;
53 WaitSender wait(T old) noexcept { return {*this, old}; }
54
59 void notify_one() noexcept {
60 WaitFinishHandleBase *handle = nullptr;
61 {
62 std::lock_guard<std::mutex> lock(mutex_);
63 handle = wait_awaiters_.pop_front();
64 }
65 if (handle) {
66 handle->set_result(0);
67 handle->schedule();
68 }
69 }
70
75 void notify_all() noexcept { notify_all_(0); }
76
77private:
78 class WaitFinishHandleBase;
79 template <typename Receiver> class WaitFinishHandle;
80
81 bool cancel_wait_(WaitFinishHandleBase *handle) noexcept {
82 std::lock_guard<std::mutex> lock(mutex_);
83 return wait_awaiters_.remove(handle);
84 }
85
86 int32_t request_wait_(WaitFinishHandleBase *handle, T old) noexcept {
87 std::lock_guard<std::mutex> lock(mutex_);
88 auto val = futex_.load(std::memory_order_relaxed);
89 if (val != old) {
90 return 0; // No need to wait
91 }
92 wait_awaiters_.push_back(handle);
93 return -EAGAIN; // Need to wait
94 }
95
96private:
97 void notify_all_(int32_t result) noexcept {
98 HandleList handles;
99 {
100 std::lock_guard<std::mutex> lock(mutex_);
101 handles = std::move(wait_awaiters_);
102 }
103 while (auto *handle = handles.pop_front()) {
104 handle->set_result(result);
105 handle->schedule();
106 }
107 }
108
109private:
110 using HandleList = IntrusiveDoubleList<WaitFinishHandleBase,
111 &WaitFinishHandleBase::link_entry_>;
112
113 mutable std::mutex mutex_;
114 HandleList wait_awaiters_;
115 std::atomic<T> &futex_;
116};
117
118template <typename T>
119class Futex<T>::WaitFinishHandleBase : public WorkInvoker {
120public:
121 void schedule() noexcept {
122 assert(runtime_ != nullptr);
123 runtime_->schedule(this);
124 }
125
126 void set_result(int32_t result) noexcept { result_ = result; }
127
128public:
129 DoubleLinkEntry link_entry_;
130
131protected:
132 Runtime *runtime_ = nullptr;
133 int32_t result_ = -ENOTRECOVERABLE; // Internal error if not set
134};
135
136template <typename T>
137template <typename Receiver>
138class Futex<T>::WaitFinishHandle
139 : public InvokerAdapter<WaitFinishHandle<Receiver>, WaitFinishHandleBase> {
140public:
141 using Base = InvokerAdapter<WaitFinishHandle, WaitFinishHandleBase>;
142
143 WaitFinishHandle(Futex &futex, Receiver receiver)
144 : futex_(futex), receiver_(std::move(receiver)) {}
145
146 void start(Runtime *runtime, T old) noexcept {
147 this->runtime_ = runtime;
148 int32_t r = futex_.request_wait_(this, old);
149 if (r != -EAGAIN) {
150 std::move(receiver_)(r);
151 return;
152 }
153 runtime->pend_work();
154
155 auto stop_token = receiver_.get_stop_token();
156 if (stop_token.stop_possible()) {
157 stop_callback_.emplace(std::move(stop_token), Cancellation{this});
158 }
159 }
160
161 void invoke() noexcept {
162 stop_callback_.reset();
163 assert(this->runtime_ != nullptr);
164 this->runtime_->resume_work();
165 std::move(receiver_)(this->result_);
166 }
167
168private:
169 void cancel_() noexcept {
170 if (futex_.cancel_wait_(this)) {
171 // Successfully canceled
172 this->result_ = -ECANCELED;
173 assert(this->runtime_ != nullptr);
174 this->runtime_->schedule(this);
175 }
176 }
177
178 struct Cancellation {
179 WaitFinishHandle *self;
180 void operator()() noexcept { self->cancel_(); }
181 };
182
183 using StopCallbackType =
184 stop_callback_t<stop_token_t<Receiver>, Cancellation>;
185
186private:
187 Futex &futex_;
188 Receiver receiver_;
189 std::optional<StopCallbackType> stop_callback_;
190};
191
192template <typename T> struct Futex<T>::WaitSender {
193public:
194 using CondySender = void;
195 using ReturnType = int32_t;
196
197 WaitSender(Futex &futex, T old) : futex_(futex), old_(old) {}
198
199 template <typename Receiver> auto connect_impl(Receiver receiver) noexcept {
200 return OperationState<Receiver>(futex_, old_, std::move(receiver));
201 }
202
203private:
204 template <typename Receiver>
205 class OperationState : public WaitFinishHandle<Receiver> {
206 public:
207 using Base = WaitFinishHandle<Receiver>;
208 OperationState(Futex &futex, T old, Receiver receiver)
209 : Base(futex, std::move(receiver)), old_(old) {}
210
211 void start(unsigned int /*flags*/) noexcept {
212 auto *runtime = detail::Context::current().runtime();
213 Base::start(runtime, old_);
214 }
215
216 private:
217 T old_;
218 };
219
220private:
221 Futex &futex_;
222 T old_;
223};
224
225} // namespace condy
User-space "futex" implementation for efficient synchronization between coroutines.
Definition futex.hpp:29
WaitSender wait(T old) noexcept
Wait if the futex value equals the specified old value. The awaiting coroutine will be suspended u...
Definition futex.hpp:53
Futex(std::atomic< T > &futex)
Construct a new Futex object.
Definition futex.hpp:35
void notify_all() noexcept
Notify all awaiting coroutines.
Definition futex.hpp:75
void notify_one() noexcept
Notify one awaiting coroutine, if any.
Definition futex.hpp:59
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:31
Runtime type for running the io_uring event loop.
Internal utility classes and functions used by Condy.