Condy v1.7.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
runtime.hpp
Go to the documentation of this file.
1
5
6#pragma once
7
9#include "condy/context.hpp"
10#include "condy/intrusive.hpp"
11#include "condy/invoker.hpp"
12#include "condy/ring.hpp"
15#include "condy/singleton.hpp"
16#include "condy/utils.hpp"
17#include "condy/work_type.hpp"
18#include <algorithm>
19#include <atomic>
20#include <cerrno>
21#include <cstddef>
22#include <cstdint>
23#include <cstring>
24#include <limits>
25#include <mutex>
26
27namespace condy {
28
29namespace detail {
30
31class ThreadLocalRing : public ThreadLocalSingleton<ThreadLocalRing> {
32public:
33 Ring *ring() { return &ring_; }
34
35 ThreadLocalRing() : ring_(create_ring_()) {}
36
37private:
38 // NOLINTNEXTLINE(bugprone-exception-escape)
39 static Ring create_ring_() noexcept {
40 io_uring_params params = {};
41 params.flags |= IORING_SETUP_CLAMP;
42 params.flags |= IORING_SETUP_SINGLE_ISSUER;
43 params.flags |= IORING_SETUP_SUBMIT_ALL;
44 // If we can construct Runtime, we should be able to construct this
45 // thread-local ring. So we ignore errors here.
46 return Ring(8, &params, nullptr, 0, std::numeric_limits<size_t>::max());
47 }
48
49private:
50 Ring ring_;
51};
52
53inline int sync_msg_ring(io_uring_sqe *sqe_data) noexcept {
54#if !IO_URING_CHECK_VERSION(2, 12) // >= 2.12
55 return io_uring_register_sync_msg(sqe_data);
56#else
57 auto *ring = ThreadLocalRing::current().ring();
58 auto *sqe = ring->get_sqe();
59 *sqe = *sqe_data;
60 int r = 0;
61 auto n =
62 ring->reap_completions_wait([&](io_uring_cqe *cqe) { r = cqe->res; });
63 if (n < 0) {
64 return static_cast<int>(n);
65 }
66 assert(n == 1);
67 return r;
68#endif
69}
70
71class CancelRequest {
72public:
73 CancelRequest(uintptr_t data) : data_(data) {}
74
75 void wait() noexcept {
76 while (!finished_.load(std::memory_order_acquire)) {
77 finished_.wait(false, std::memory_order_relaxed);
78 }
79 }
80
81 void notify() noexcept {
82 finished_.store(true, std::memory_order_release);
83 finished_.notify_one();
84 }
85
86 uintptr_t data() const noexcept { return data_; }
87
88private:
89 uintptr_t data_;
90 std::atomic_bool finished_ = false;
91};
92
93} // namespace detail
94
95class OpFinishHandleBase {
96public:
97 using HandleFunc = bool (*)(void *, io_uring_cqe *) noexcept;
98
99 bool handle(io_uring_cqe *cqe) noexcept {
100 assert(handle_func_ != nullptr);
101 return handle_func_(this, cqe);
102 }
103
104protected:
105 OpFinishHandleBase() = default;
106
107protected:
108 HandleFunc handle_func_ = nullptr;
109};
110
/**
 * @brief Event loop that owns an io_uring instance and runs scheduled work on
 *        the thread that calls run().
 *
 * The ring is created disabled (IORING_SETUP_R_DISABLED) and is enabled by
 * run(). Work scheduled from other threads before that point is parked in a
 * mutex-protected queue; afterwards it is delivered through io_uring
 * msg_ring messages.
 */
class Runtime {
public:
    /**
     * @brief Construct a new Runtime object.
     * @param options Ring and scheduling configuration; see create_ring_()
     *                for how each ring-related option is applied.
     */
    Runtime(const RuntimeOptions &options = {})
        : ring_(create_ring_(options)),
          event_interval_(options.event_interval_),
          disable_register_ring_fd_(options.disable_register_ring_fd_),
          fd_table_(*ring_.ring()), buffer_table_(*ring_.ring()),
          settings_(*ring_.ring()) {}

    CONDY_DELETE_COPY_MOVE(Runtime);

public:
    /**
     * @brief Allow the runtime to exit when there are no pending works.
     *
     * Sets the exit flag and, when called from another thread while the ring
     * is enabled, sends a wake-up message so a blocked loop re-checks it.
     */
    void allow_exit() noexcept {
        exit_allowed_.store(true, std::memory_order_release);
        wakeup_();
    }

    /**
     * @brief Queue a work item for execution on this runtime.
     * @param work Work item to run; invoked later on the runtime thread.
     *
     * From the runtime's own thread the item goes straight to the local
     * queue. From any other thread it is sent via msg_ring once the ring is
     * enabled; otherwise it is stored in the global queue, which run()
     * flushes once when it starts. Nothing in this class flushes that queue
     * again after run() has started.
     */
    void schedule(WorkInvoker *work) noexcept {
        auto *curr_runtime = detail::Context::current().runtime();
        if (curr_runtime == this) {
            local_queue_.push_back(work);
            return;
        }

        auto state = state_.load();
        if (state == State::Enabled) {
            // Fast path: if the ring is enabled, we can directly schedule the
            // work
            tsan_release(work);
            schedule_msg_ring_(curr_runtime,
                               encode_work(work, WorkType::Schedule));
        } else {
            // Slow path: if the ring is not enabled, we need to acquire the
            // mutex to ensure the work is scheduled before the ring is enabled
            std::unique_lock<std::mutex> lock(mutex_);
            state = state_.load();
            if (state == State::Enabled) {
                // run() enabled the ring while we were waiting for the lock;
                // release it before doing the (possibly blocking) send.
                lock.unlock();
                tsan_release(work);
                schedule_msg_ring_(curr_runtime,
                                   encode_work(work, WorkType::Schedule));
            } else {
                global_queue_.push_back(work);
            }
        }
    }

    /**
     * @brief Internal use only. Schedule a cancel request for the given data.
     * @param data user_data value of the operation to cancel.
     *
     * On the runtime's own thread a cancel SQE is queued directly. From
     * another thread the call is a no-op unless the ring is enabled;
     * otherwise a Cancel message is sent and the caller blocks until the
     * runtime thread has queued the cancel SQE.
     *
     * NOTE(review): the caller blocks in request.wait() without processing
     * its own completions; two runtimes cancelling on each other at the same
     * time look like they could wait on each other - confirm callers avoid it.
     */
    void cancel(uintptr_t data) noexcept {
        auto *curr_runtime = detail::Context::current().runtime();
        if (curr_runtime == this) {
            io_uring_sqe *sqe = ring_.get_sqe();
            prep_cancel_(sqe, data);
            return;
        }

        auto state = state_.load();
        if (state != State::Enabled) {
            return;
        }

        detail::CancelRequest request(data);
        tsan_release(&request);
        schedule_msg_ring_(curr_runtime,
                           encode_work(&request, WorkType::Cancel));
        if (curr_runtime != nullptr) {
            // Ensure the cancel msg is submitted.
            curr_runtime->ring_.submit();
        }
        // Block until the runtime thread has submitted the cancel SQE. This is
        // important to prevent address reuse of the same data pointer, which
        // can lead to incorrect cancellation or other bugs.
        request.wait();
    }

    /// Record one more outstanding work item. Runtime thread only.
    void pend_work() noexcept {
        assert(detail::Context::current().runtime() == this);
        pending_works_++;
    }

    /// Record that one outstanding work item finished. Runtime thread only.
    void resume_work() noexcept {
        assert(detail::Context::current().runtime() == this);
        pending_works_--;
    }

    /**
     * @brief Run the runtime event loop in the current thread.
     *
     * Enables the ring, moves works queued before start into the local queue,
     * then loops: run local work, reap completions, and block for
     * completions when idle. Returns once allow_exit() has been called and no
     * work is pending. The state becomes Stopped when the function leaves.
     *
     * @throws std::runtime_error if the runtime is not in the Idle state,
     *         i.e. it is already running or has been stopped.
     */
    void run() {
        State expected = State::Idle;
        bool success = state_.compare_exchange_strong(expected, State::Running);
        if (!success) {
            throw std::runtime_error(
                "Runtime is already running or has been stopped");
        }
        auto d1 = defer([this]() { state_.store(State::Stopped); });

        // A ring that failed to leave the disabled state cannot accept any
        // submission, so this must be checked in release builds as well
        // (an assert alone would vanish under NDEBUG).
        const int enable_res = io_uring_enable_rings(ring_.ring());
        if (enable_res != 0) {
            panic_on(std::format("io_uring_enable_rings: {}",
                                 std::strerror(-enable_res)));
        }

        {
            std::lock_guard<std::mutex> lock(mutex_);
            flush_global_queue_();
            // Now that the ring is enabled and all pending works are scheduled,
            // we can set the state to Enabled.
            state_.store(State::Enabled);
        }

        if (!disable_register_ring_fd_) {
            [[maybe_unused]] const int register_res =
                io_uring_register_ring_fd(ring_.ring());
            assert(register_res == 1); // 1 indicates success for this call
        }

        detail::Context::current().init(this);
        auto d2 = defer([]() { detail::Context::current().reset(); });

        while (true) {
            tick_count_++;

            // Every event_interval_ iterations, reap completions without
            // blocking so a long run of local work cannot starve the ring.
            if (tick_count_ >= event_interval_) {
                tick_count_ = 0;
                flush_ring_();
            }

            if (auto *work = local_queue_.pop_front()) {
                (*work)();
                continue;
            }

            // Local queue is empty: leave if exit was requested and nothing
            // is outstanding, otherwise block for completions.
            if (pending_works_ == 0 &&
                exit_allowed_.load(std::memory_order_acquire)) {
                break;
            }
            flush_ring_wait_();
        }
    }

    /// Get the ring wrapper owned by the runtime.
    auto &ring() noexcept { return ring_; }

    /// Get the file descriptor table of the runtime.
    auto &fd_table() noexcept { return fd_table_; }

    /// Get the buffer table of the runtime.
    auto &buffer_table() noexcept { return buffer_table_; }

    /// Get the ring settings of the runtime.
    auto &settings() noexcept { return settings_; }

private:
    /**
     * @brief Translate RuntimeOptions into io_uring setup flags and build the
     *        ring.
     *
     * The ring is always created with CLAMP, SINGLE_ISSUER, SUBMIT_ALL and
     * R_DISABLED; the remaining flags are driven by `options`. Flags that
     * need a newer liburing are compiled in only when available.
     */
    static Ring create_ring_(const RuntimeOptions &options) {
        io_uring_params params;
        std::memset(&params, 0, sizeof(params));

        params.flags |= IORING_SETUP_CLAMP;
        params.flags |= IORING_SETUP_SINGLE_ISSUER;
        params.flags |= IORING_SETUP_SUBMIT_ALL;
        params.flags |= IORING_SETUP_R_DISABLED;

        size_t ring_entries = options.sq_size_;
        if (options.cq_size_ != 0) { // 0 means default
            params.flags |= IORING_SETUP_CQSIZE;
            params.cq_entries = options.cq_size_;
        }

        if (options.enable_iopoll_) {
            params.flags |= IORING_SETUP_IOPOLL;
#if !IO_URING_CHECK_VERSION(2, 9) // >= 2.9
            if (options.enable_hybrid_iopoll_) {
                params.flags |= IORING_SETUP_HYBRID_IOPOLL;
            }
#endif
        }

        if (options.enable_sqpoll_) {
            params.flags |= IORING_SETUP_SQPOLL;
            params.sq_thread_idle = options.sqpoll_idle_time_ms_;
            if (options.sqpoll_thread_cpu_.has_value()) {
                params.flags |= IORING_SETUP_SQ_AFF;
                params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
            }
        }

        if (options.attach_wq_target_ != nullptr) {
            params.flags |= IORING_SETUP_ATTACH_WQ;
            params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
        }

        if (options.enable_defer_taskrun_) {
            params.flags |= IORING_SETUP_DEFER_TASKRUN;
            params.flags |= IORING_SETUP_TASKRUN_FLAG;
        }

        if (options.enable_coop_taskrun_) {
            params.flags |= IORING_SETUP_COOP_TASKRUN;
            params.flags |= IORING_SETUP_TASKRUN_FLAG;
        }

        if (options.enable_sqe128_) {
            params.flags |= IORING_SETUP_SQE128;
        }

        if (options.enable_cqe32_) {
            params.flags |= IORING_SETUP_CQE32;
        }

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
        if (options.enable_sqe_mixed_) {
            params.flags |= IORING_SETUP_SQE_MIXED;
        }
#endif

#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13
        if (options.enable_cqe_mixed_) {
            params.flags |= IORING_SETUP_CQE_MIXED;
        }
#endif

        void *buf = nullptr;
        size_t buf_size = 0;
#if !IO_URING_CHECK_VERSION(2, 5) // >= 2.5
        if (options.enable_no_mmap_) {
            params.flags |= IORING_SETUP_NO_MMAP;
            buf = options.no_mmap_buf_;
            buf_size = options.no_mmap_buf_size_;
        }
#endif

#if !IO_URING_CHECK_VERSION(2, 14) // >= 2.14
        if (options.enable_sq_rewind_) {
            params.flags |= IORING_SETUP_SQ_REWIND;
        }
#endif

        // A submit_batch of 0 means "pick a default": at most 32 with SQPOLL,
        // otherwise no limit.
        size_t submit_batch = options.submit_batch_;
        if (submit_batch == 0) {
            if (options.enable_sqpoll_) {
                submit_batch = std::min<size_t>(32, ring_entries);
            } else {
                submit_batch = std::numeric_limits<size_t>::max();
            }
        }
        assert(submit_batch > 0);

        return Ring(ring_entries, &params, buf, buf_size, submit_batch);
    }

    /**
     * @brief Send `data` to this runtime's ring as a msg_ring message.
     * @param curr_runtime Runtime of the calling thread, or nullptr if the
     *                     caller is not running inside a runtime.
     * @param data Encoded work value delivered as the target CQE's user_data.
     *
     * Inside a runtime, the message is queued on the caller's ring and
     * counted as pending work there until its own completion arrives.
     * Outside a runtime, it is sent synchronously via detail::sync_msg_ring().
     */
    void schedule_msg_ring_(Runtime *curr_runtime, uintptr_t data) noexcept {
        int ring_fd = this->ring_.ring()->ring_fd;
        if (curr_runtime != nullptr) {
            io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
            prep_msg_ring_(ring_fd, sqe, data);
            curr_runtime->pend_work();
        } else {
            io_uring_sqe sqe = {};
            prep_msg_ring_(ring_fd, &sqe, data);
            int r = detail::sync_msg_ring(&sqe);
            if (r < 0) {
                panic_on(std::format("sync_msg_ring: {}", std::strerror(-r)));
            }
        }
    }

    // Wakeup the runtime if it's blocked in Ring::reap_completions_wait()
    void wakeup_() noexcept {
        auto *curr_runtime = detail::Context::current().runtime();
        if (curr_runtime == this) {
            // Already on the runtime thread, so the loop is not blocked.
            return;
        }

        auto state = state_.load();
        if (state != State::Enabled) {
            return;
        }

        schedule_msg_ring_(curr_runtime,
                           encode_work(nullptr, WorkType::Ignore));
    }

    /// Move everything from the global queue to the end of the local queue.
    /// run() calls this while holding mutex_.
    void flush_global_queue_() noexcept {
        local_queue_.push_back(std::move(global_queue_));
    }

    /**
     * @brief Fill `sqe` with a msg_ring operation targeting `ring_fd`.
     *
     * `data` becomes the user_data seen by the target ring. The sender's own
     * completion is tagged Schedule with a null pointer, which process_cqe_()
     * handles by calling resume_work().
     */
    static void prep_msg_ring_(int ring_fd, io_uring_sqe *sqe,
                               uintptr_t data) noexcept {
        io_uring_prep_msg_ring(sqe, ring_fd, 0, data, 0);
        io_uring_sqe_set_data64(sqe, encode_work(nullptr, WorkType::Schedule));
    }

    /**
     * @brief Fill `sqe` with a cancel operation for the given user_data.
     *
     * The cancel's own completion is tagged Ignore and flagged
     * IOSQE_CQE_SKIP_SUCCESS.
     */
    static void prep_cancel_(io_uring_sqe *sqe, uintptr_t data) noexcept {
        io_uring_prep_cancel64(sqe, data, 0);
        io_uring_sqe_set_data64(sqe, encode_work(nullptr, WorkType::Ignore));
        io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
    }

    /// Reap available completions via Ring::reap_completions(); a negative
    /// result is reported through panic_on.
    void flush_ring_() noexcept {
        auto r = ring_.reap_completions(
            [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
        if (r < 0) {
            panic_on(std::format("io_uring_peek_cqe: {}",
                                 std::strerror(static_cast<int>(-r))));
        }
    }

    /// Reap completions via Ring::reap_completions_wait(); a negative result
    /// is reported through panic_on.
    void flush_ring_wait_() noexcept {
        auto r = ring_.reap_completions_wait(
            [this](io_uring_cqe *cqe) { process_cqe_(cqe); });
        if (r < 0) {
            panic_on(std::format("io_uring_submit_and_wait: {}",
                                 std::strerror(static_cast<int>(-r))));
        }
    }

    /**
     * @brief Dispatch one completion according to the WorkType encoded in its
     *        user_data.
     *
     * - Ignore: nothing to do (wake-ups and cancel completions).
     * - Schedule with null data: completion of a msg_ring this runtime sent;
     *   releases the pending-work count taken when it was queued.
     * - Schedule with data: a WorkInvoker sent from another thread; run it.
     * - Cancel: a CancelRequest from another thread; queue the cancel SQE and
     *   release the waiting requester.
     * - Common: an operation completion; forwarded to its handle.
     */
    void process_cqe_(io_uring_cqe *cqe) noexcept {
        auto [data, type] = decode_work(io_uring_cqe_get_data64(cqe));

        if (type == WorkType::Ignore) {
            // No-op
            assert(cqe->res != -EINVAL); // If EINVAL, something is wrong
        } else if (type == WorkType::Schedule) {
            if (data == nullptr) {
                if (cqe->res < 0) {
                    panic_on(std::format("io_uring_prep_msg_ring: {}",
                                         std::strerror(-cqe->res)));
                }
                resume_work();
            } else {
                auto *work = static_cast<WorkInvoker *>(data);
                tsan_acquire(work);
                (*work)();
            }
        } else if (type == WorkType::Cancel) {
            detail::CancelRequest *request =
                static_cast<detail::CancelRequest *>(data);
            tsan_acquire(request);
            io_uring_sqe *sqe = ring_.get_sqe();
            prep_cancel_(sqe, request->data());
            // The request lives on the requester's stack; do not touch it
            // after notify().
            request->notify();
        } else if (type == WorkType::Common) {
            auto *handle = static_cast<OpFinishHandleBase *>(data);
            auto op_finish = handle->handle(cqe);
            if (op_finish) {
                resume_work();
            }
        } else {
            unreachable();
        }
    }

private:
    enum class State : uint8_t {
        Idle,    // Not running
        Running, // Started running
        Enabled, // Running and ring enabled
        Stopped, // Stopped
    };
    static_assert(std::atomic<State>::is_always_lock_free);

    using WorkListQueue =
        IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;

    // Global state
    std::mutex mutex_;           // Guards global_queue_ and the switch to Enabled
    WorkListQueue global_queue_; // Works scheduled before the ring is enabled
    // Modified only through pend_work()/resume_work(), which assert that the
    // caller is on the runtime thread.
    size_t pending_works_ = 0;
    std::atomic_bool exit_allowed_ = false;
    std::atomic<State> state_ = State::Idle;

    // Local state
    WorkListQueue local_queue_;
    Ring ring_;
    size_t tick_count_ = 0;

    // Configurable parameters
    size_t event_interval_ = 61; // Loop iterations between non-blocking reaps
    bool disable_register_ring_fd_ = false;

    FdTable fd_table_;
    BufferTable buffer_table_;
    RingSettings settings_;
};
524
531inline auto &current_runtime() noexcept {
532 return *detail::Context::current().runtime();
533}
534
535} // namespace condy
void run()
Run the runtime event loop in the current thread.
Definition runtime.hpp:222
auto & buffer_table() noexcept
Get the buffer table of the runtime.
Definition runtime.hpp:284
auto & fd_table() noexcept
Get the file descriptor table of the runtime.
Definition runtime.hpp:278
void allow_exit() noexcept
Allow the runtime to exit when there are no pending works.
Definition runtime.hpp:140
Runtime(const RuntimeOptions &options={})
Construct a new Runtime object.
Definition runtime.hpp:123
auto & settings() noexcept
Get the ring settings of the runtime.
Definition runtime.hpp:290
Intrusive single-linked and double-linked list implementations.
Polymorphic invocation utilities.
The main namespace for the Condy library.
Definition condy.hpp:31
auto defer(Func &&func)
Defer the execution of a function until the current scope ends.
Definition utils.hpp:103
auto & current_runtime() noexcept
Get the current runtime.
Definition runtime.hpp:531
Wrapper classes for liburing interfaces.
io_uring settings management classes.
Internal utility classes and functions used by Condy.