Condy v1.7.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
coro.inl
Go to the documentation of this file.
1
5
6#pragma once
7
8#include "condy/coro.hpp"
9#include "condy/invoker.hpp"
11#include "condy/utils.hpp"
12#include <atomic>
13#include <coroutine>
14#include <exception>
15#include <new>
16#include <optional>
17
18namespace condy {
19
20template <typename...> struct always_false {
21 static constexpr bool value = false;
22};
23
24template <typename Allocator, typename... Args>
25struct first_is_not_allocator : public std::true_type {};
26
27template <typename Allocator, typename Arg, typename... Args>
28struct first_is_not_allocator<Allocator, Arg, Args...> {
29 static constexpr bool value =
30 !std::is_same_v<std::remove_cvref_t<Arg>, Allocator>;
31};
32
33template <typename Promise, typename Allocator>
34class BindAllocator : public Promise {
35public:
36#ifdef __clang__
37 template <typename... Args>
38 requires(first_is_not_allocator<Allocator, Args...>::value)
39 static void *operator new(size_t, Args &&...) {
40 // If user didn't provide a signature like (Allocator&, ...), clang will
41 // fall back to ::new, we don't want that.
42 // https://github.com/llvm/llvm-project/issues/54881
43 static_assert(always_false<Args...>::value,
44 "Invalid arguments for allocator-bound coroutine");
45 }
46#endif
47
48 template <typename... Args>
49 static void *operator new(size_t size, Allocator &alloc, const Args &...) {
50 size_t allocator_offset = align_up(size, alignof(Allocator));
51 size_t total_size = allocator_offset + sizeof(Allocator);
52
53 Pointer mem = alloc.allocate(total_size);
54 try {
55 new (mem + allocator_offset) Allocator(alloc);
56 } catch (...) {
57 alloc.deallocate(mem, total_size);
58 throw;
59 }
60 return mem;
61 }
62
63 void operator delete(void *ptr, size_t size) noexcept {
64 size_t allocator_offset = align_up(size, alignof(Allocator));
65 size_t total_size = allocator_offset + sizeof(Allocator);
66 Pointer mem = static_cast<Pointer>(ptr);
67 Allocator &alloc = *std::launder(
68 reinterpret_cast<Allocator *>(mem + allocator_offset));
69 Allocator alloc_copy = std::move(alloc);
70 alloc.~Allocator();
71 alloc_copy.deallocate(mem, total_size);
72 }
73
74private:
75 using Pointer = typename std::allocator_traits<Allocator>::pointer;
76 using T = std::remove_pointer_t<Pointer>;
77 static_assert(sizeof(T) == 1, "Allocator pointer must point to byte type");
78};
79
80template <typename Promise>
81class BindAllocator<Promise, void> : public Promise {};
82
83template <typename Coro>
84class PromiseBase : public InvokerAdapter<PromiseBase<Coro>, WorkInvoker> {
85public:
86 using PromiseType = typename Coro::promise_type;
87
88 ~PromiseBase() {
89 if (exception_) [[unlikely]] {
90 try {
91 std::rethrow_exception(exception_);
92 } catch (const std::exception &e) {
93 panic_on(std::format(
94 "Unhandled exception in detached coroutine: {}", e.what()));
95 } catch (...) {
96 panic_on("Unhandled unknown exception in detached coroutine");
97 }
98 }
99 }
100
101 Coro get_return_object() noexcept {
102 return Coro{std::coroutine_handle<PromiseType>::from_promise(
103 static_cast<PromiseType &>(*this))};
104 }
105
106 std::suspend_always initial_suspend() const noexcept { return {}; }
107
108 void unhandled_exception() noexcept {
109 exception_ = std::current_exception();
110 }
111
112 struct FinalAwaiter {
113 bool await_ready() const noexcept { return false; }
114
115 std::coroutine_handle<>
116 await_suspend(std::coroutine_handle<PromiseType> handle) noexcept {
117 auto &self = handle.promise();
118
119 State expected = self.state_.load(std::memory_order_acquire);
120 State desired;
121 do {
122 if (expected == State::Idle) {
123 return self.caller_handle_;
124 } else if (expected == State::RunningJoinable) {
125 desired = State::Zombie;
126 } else if (expected == State::RunningDetached ||
127 expected == State::RunningJoining) {
128 desired = State::Finished;
129 } else [[unlikely]] {
130 panic_on(std::format(
131 "Invalid coroutine state in final_suspend: {}",
132 static_cast<int>(expected)));
133 }
134 } while (!self.state_.compare_exchange_weak(
135 expected, desired, std::memory_order_acq_rel,
136 std::memory_order_acquire));
137
138 State prev = expected;
139 if (prev == State::RunningDetached) {
140 handle.destroy();
141 return std::noop_coroutine();
142 } else if (prev == State::RunningJoining) {
143 auto *callback = self.callback_;
144 (*callback)();
145 return std::noop_coroutine();
146 } else {
147 assert(prev == State::RunningJoinable);
148 return std::noop_coroutine();
149 }
150 }
151
152 void await_resume() const noexcept {}
153 };
154
155 FinalAwaiter final_suspend() const noexcept { return {}; }
156
157 template <SenderLike T> auto await_transform(T &&value) {
158 return detail::as_awaiter(std::forward<T>(value));
159 }
160
161 template <typename T> T &&await_transform(T &&value) {
162 return std::forward<T>(value);
163 }
164
165public:
166 // Should be called before task is scheduled
167 void mark_running() noexcept {
168 state_.store(State::RunningJoinable, std::memory_order_relaxed);
169 }
170
171 void set_caller_handle(std::coroutine_handle<> handle) noexcept {
172 caller_handle_ = handle;
173 }
174
175 void request_detach() noexcept {
176 State expected = state_.load(std::memory_order_acquire);
177 State desired;
178 do {
179 if (expected == State::RunningJoinable) {
180 desired = State::RunningDetached;
181 } else if (expected == State::Zombie) {
182 desired = State::Finished;
183 } else [[unlikely]] {
184 panic_on(
185 std::format("Invalid coroutine state in request_detach: {}",
186 static_cast<int>(expected)));
187 }
188 } while (!state_.compare_exchange_weak(expected, desired,
189 std::memory_order_acq_rel,
190 std::memory_order_acquire));
191
192 State prev = expected;
193 if (prev == State::Zombie) {
194 auto h = std::coroutine_handle<PromiseType>::from_promise(
195 static_cast<PromiseType &>(*this));
196 h.destroy();
197 }
198 }
199
200 bool request_join(Invoker *remote_callback) noexcept {
201 State expected = state_.load(std::memory_order_acquire);
202 State desired;
203 do {
204 if (expected == State::RunningJoinable) {
205 desired = State::RunningJoining;
206 callback_ = remote_callback;
207 } else if (expected == State::Zombie) {
208 desired = State::Finished;
209 } else [[unlikely]] {
210 panic_on(
211 std::format("Invalid coroutine state in request_join: {}",
212 static_cast<int>(expected)));
213 }
214 } while (!state_.compare_exchange_weak(expected, desired,
215 std::memory_order_acq_rel,
216 std::memory_order_acquire));
217
218 State prev = expected;
219 if (prev == State::Zombie) {
220 return false; // ready to resume immediately
221 } else {
222 assert(prev == State::RunningJoinable);
223 return true;
224 }
225 }
226
227 std::exception_ptr exception() noexcept { return std::move(exception_); }
228
229 void invoke() noexcept {
230 auto h = std::coroutine_handle<PromiseType>::from_promise(
231 static_cast<PromiseType &>(*this));
232 h.resume();
233 }
234
235protected:
236 // Promise lifecycle state machine:
237 //
238 // 1) co_spawn():
239 // - Idle -> RunningJoinable:
240 // Task has been scheduled and is joinable.
241 //
242 // 2) final_suspend():
243 // - RunningJoinable -> Zombie:
244 // Coroutine completed, but no join/detach consumer has claimed
245 // final ownership yet.
246 // - RunningDetached -> Finished:
247 // Detach path, performs destroy() immediately.
248 // - RunningJoining -> Finished:
249 // Join path, invokes the registered callback to wake the
250 // waiter/continuation.
251 //
252 // 3) request_detach():
253 // - RunningJoinable -> RunningDetached:
254 // Detach requested before completion; final_suspend will destroy
255 // later.
256 // - Zombie -> Finished:
257 // Detach requested after completion; requester destroys coroutine
258 // immediately.
259 //
260 // 4) request_join():
261 // - RunningJoinable -> RunningJoining:
262 // Join requested before completion; stores callback and waits for
263 // final_suspend callback.
264 // - Zombie -> Finished:
265 // Join requested after completion; caller can resume/wait
266 // immediately (no callback path needed).
267 enum class State : uint8_t {
268 Idle,
269 RunningJoinable,
270 RunningDetached,
271 RunningJoining,
272 Zombie,
273 Finished,
274 };
275 static_assert(std::atomic<State>::is_always_lock_free);
276
277 std::atomic<State> state_ = State::Idle;
278 union {
279 std::coroutine_handle<> caller_handle_ = std::noop_coroutine();
280 Invoker *callback_;
281 };
282 std::exception_ptr exception_;
283};
284
285template <typename Allocator>
286class Promise<void, Allocator>
287 : public BindAllocator<PromiseBase<Coro<void, Allocator>>, Allocator> {
288public:
289 void return_void() const noexcept {}
290};
291
292template <typename T, typename Allocator>
293class Promise
294 : public BindAllocator<PromiseBase<Coro<T, Allocator>>, Allocator> {
295public:
296 void return_value(T value) { value_ = std::move(value); }
297
298 T value() { return std::move(value_.value()); }
299
300private:
301 std::optional<T> value_;
302};
303
304template <typename PromiseType> struct CoroAwaiterBase {
305 bool await_ready() const noexcept { return false; }
306
307 std::coroutine_handle<PromiseType>
308 await_suspend(std::coroutine_handle<> caller_handle) noexcept {
309 handle_.promise().set_caller_handle(caller_handle);
310 return handle_;
311 }
312
313 std::coroutine_handle<PromiseType> handle_;
314};
315
316template <typename T, typename Allocator>
317struct CoroAwaiter
318 : public CoroAwaiterBase<typename Coro<T, Allocator>::promise_type> {
319 using Base = CoroAwaiterBase<typename Coro<T, Allocator>::promise_type>;
320 T await_resume() {
321 auto exception = Base::handle_.promise().exception();
322 if (exception) [[unlikely]] {
323 Base::handle_.destroy();
324 std::rethrow_exception(exception);
325 }
326 T value = Base::handle_.promise().value();
327 Base::handle_.destroy();
328 return value;
329 }
330};
331
332template <typename Allocator>
333struct CoroAwaiter<void, Allocator>
334 : public CoroAwaiterBase<typename Coro<void, Allocator>::promise_type> {
335 using Base = CoroAwaiterBase<typename Coro<void, Allocator>::promise_type>;
336 void await_resume() {
337 auto exception = Base::handle_.promise().exception();
338 Base::handle_.destroy();
339 if (exception) [[unlikely]] {
340 std::rethrow_exception(exception);
341 }
342 }
343};
344
345template <typename T, typename Allocator>
346inline auto Coro<T, Allocator>::operator co_await() noexcept {
347 return CoroAwaiter<T, Allocator>{release()};
348}
349
350} // namespace condy
Coroutine definitions.
Polymorphic invocation utilities.
condy::Coro< T, std::pmr::polymorphic_allocator< std::byte > > Coro
Coroutine type using polymorphic allocator.
Definition pmr.hpp:26
The main namespace for the Condy library.
Definition condy.hpp:31
Helper functions for composing asynchronous operations.
Internal utility classes and functions used by Condy.