// Constructor member-initializer list (the constructor's signature is outside
// this view). The ring is built from `options` by create_ring_(), the two
// loop knobs are copied from `options`, and fd_table_, buffer_table_ and
// settings_ are bound to the raw io_uring obtained through ring_.ring().
// NOTE(review): C++ initializes members in declaration order, not in the order
// written here. This list relies on ring_ being declared before fd_table_,
// buffer_table_ and settings_; the ring_/fd_table_ declarations are not
// visible in this chunk, so confirm that ordering.
124 : ring_(create_ring_(options)),
125 event_interval_(options.event_interval_),
126 disable_register_ring_fd_(options.disable_register_ring_fd_),
127 fd_table_(*ring_.ring()), buffer_table_(*ring_.ring()),
128 settings_(*ring_.ring()) {}
// NOTE(review): project macro defined elsewhere. By its name it presumably
// deletes Runtime's copy and move constructors/assignment operators; confirm
// against the macro definition.
130 CONDY_DELETE_COPY_MOVE(
Runtime);
// Grants run() permission to finish. run()'s loop checks
// `pending_works_ == 0 && exit_allowed_.load(std::memory_order_acquire)`.
// This release store pairs with that acquire load. (The header of the
// enclosing function is not visible in this chunk.)
141 exit_allowed_.store(
true, std::memory_order_release);
// Enqueues `work` to be executed by this runtime. There are three paths:
//  - Called on this runtime's own thread (Context's runtime == this): the
//    work is appended directly to local_queue_.
//  - Otherwise, if state_ is already Enabled: the work pointer is delivered
//    to this ring as a WorkType::Schedule message via schedule_msg_ring_().
//  - Otherwise: state_ is re-read while holding mutex_. The work is then
//    either sent as a message (the ring became Enabled in the meantime) or
//    parked in global_queue_. run() drains that queue with
//    flush_global_queue_() while holding mutex_.
// NOTE(review): the locked re-check is race-free only if run() stores
// State::Enabled before it releases mutex_ after flushing global_queue_.
// Otherwise a work pushed here in that window may never be drained. The
// relevant lines of run() are not visible in this chunk; confirm.
145 void schedule(WorkInvoker *work)
noexcept {
146 auto *curr_runtime = detail::Context::current().runtime();
147 if (curr_runtime ==
this) {
148 local_queue_.push_back(work);
// Fast path: once Enabled, other threads can message the ring without a lock.
152 auto state = state_.load();
153 if (state == State::Enabled) {
157 schedule_msg_ring_(curr_runtime,
158 encode_work(work, WorkType::Schedule));
// Slow path: decide between messaging and global_queue_ under mutex_, the
// same lock run() holds while flushing global_queue_.
162 std::unique_lock<std::mutex> lock(mutex_);
163 state = state_.load();
164 if (state == State::Enabled) {
167 schedule_msg_ring_(curr_runtime,
168 encode_work(work, WorkType::Schedule));
170 global_queue_.push_back(work);
// Requests cancellation of the in-flight io_uring operation whose user data
// is `data`.
//  - On this runtime's own thread: an async-cancel SQE is prepared directly
//    on ring_ (see prep_cancel_()).
//  - From any other thread: the state_ is checked first (the body of the
//    non-Enabled branch is not visible here, presumably an early return).
//    Then a CancelRequest is sent to this ring as a WorkType::Cancel message.
//    process_cqe_() on this runtime issues the actual cancel SQE.
176 void cancel(uintptr_t data)
noexcept {
177 auto *curr_runtime = detail::Context::current().runtime();
178 if (curr_runtime ==
this) {
179 io_uring_sqe *sqe = ring_.get_sqe();
180 prep_cancel_(sqe, data);
184 auto state = state_.load();
185 if (state != State::Enabled) {
// NOTE(review): `request` lives in this stack frame, but its address is handed
// to another ring and dereferenced in process_cqe_() there. The lines after
// submit() (not visible here) must keep this frame alive until the target has
// finished with it; confirm.
189 detail::CancelRequest request(data);
// Presumably a ThreadSanitizer release annotation, paired with the
// tsan_acquire(request) in process_cqe_(). The kernel hand-off is invisible
// to TSan.
190 tsan_release(&request);
191 schedule_msg_ring_(curr_runtime,
192 encode_work(&request, WorkType::Cancel));
// When called from another runtime, schedule_msg_ring_() queued the MSG_RING
// SQE on that caller's ring, so submit it immediately.
193 if (curr_runtime !=
nullptr) {
195 curr_runtime->ring_.submit();
// Records one more outstanding item on this runtime. It may only be called
// on this runtime's own thread (asserted). The rest of the body is not
// visible in this chunk; presumably it increments pending_works_, which
// run()'s exit check requires to be 0.
203 void pend_work() noexcept {
204 assert(detail::Context::current().runtime() ==
this);
// Counterpart of pend_work(). It may only be called on this runtime's own
// thread (asserted). The rest of the body is not visible in this chunk;
// presumably it decrements pending_works_.
208 void resume_work() noexcept {
209 assert(detail::Context::current().runtime() ==
this);
// Event-loop body (the function header is not visible in this chunk). The
// visible sequence is:
//  1. Transition state_ Idle -> Running with a CAS. Failure leads to the
//     std::runtime_error below, so a Runtime can be run at most once.
//  2. Enable the ring, which create_ring_() created with
//     IORING_SETUP_R_DISABLED.
//  3. Move works parked in global_queue_ to local_queue_ under mutex_.
//  4. Publish State::Enabled so other threads message the ring directly.
//  5. Optionally register the ring fd, then bind this runtime to the
//     thread's Context.
//  6. Loop.
// Assuming `defer` runs its callable when the returned guard is destroyed,
// d2 resets the Context and then d1 stores State::Stopped on every exit path
// after the CAS.
223 State expected = State::Idle;
224 bool success = state_.compare_exchange_strong(expected, State::Running);
226 throw std::runtime_error(
227 "Runtime is already running or has been stopped");
229 auto d1 =
defer([
this]() { state_.store(State::Stopped); });
// Presumably `r` is only inspected by assert()s, hence [[maybe_unused]] for
// NDEBUG builds.
231 [[maybe_unused]]
int r;
232 r = io_uring_enable_rings(ring_.ring());
// global_queue_ is filled by schedule() under this same mutex_.
236 std::lock_guard<std::mutex> lock(mutex_);
237 flush_global_queue_();
// NOTE(review): the lines between the flush and this store are not visible.
// If this store runs after mutex_ is released, a concurrent schedule() can
// see a non-Enabled state under the lock and push into global_queue_ after
// the flush. Confirm the store is inside the lock scope, or that
// global_queue_ is flushed again later.
240 state_.store(State::Enabled);
243 if (!disable_register_ring_fd_) {
244 r = io_uring_register_ring_fd(ring_.ring());
248 detail::Context::current().init(
this);
249 auto d2 =
defer([]() { detail::Context::current().reset(); });
// Every event_interval_ ticks (tick_count_ counts them) the ring is
// presumably polled; the branch body is not visible here.
254 if (tick_count_ >= event_interval_) {
// Run the next locally queued work, if any.
259 if (
auto *work = local_queue_.pop_front()) {
// Exit once nothing is pending and exit was allowed. The acquire pairs with
// the release store of exit_allowed_.
264 if (pending_works_ == 0 &&
265 exit_allowed_.load(std::memory_order_acquire)) {
272 auto &ring() noexcept {
return ring_; }
// Translates `options` into io_uring_params and constructs the Ring. The
// function header and several lines are not visible in this chunk. These
// include the declarations of `buf`/`buf_size`, closing braces and the
// matching #endif directives.
// NOTE(review): `std::memset` and the final `return Ring(...)` pass `¶ms`.
// The pilcrow looks like a mis-encoded `&params` (HTML entity "&para"
// rendered as a character). As written this does not compile; restore
// `&params`.
294 io_uring_params params;
295 std::memset(¶ms, 0,
sizeof(params));
// Always-on flags:
//  - CLAMP: clamp oversized SQ/CQ sizes instead of failing.
//  - SINGLE_ISSUER: only one task submits.
//  - SUBMIT_ALL: keep submitting a batch after an SQE error.
//  - R_DISABLED: start disabled; run() enables it via io_uring_enable_rings().
297 params.flags |= IORING_SETUP_CLAMP;
298 params.flags |= IORING_SETUP_SINGLE_ISSUER;
299 params.flags |= IORING_SETUP_SUBMIT_ALL;
300 params.flags |= IORING_SETUP_R_DISABLED;
302 size_t ring_entries = options.sq_size_;
// An explicit CQ size only when requested; otherwise the kernel default applies.
303 if (options.cq_size_ != 0) {
304 params.flags |= IORING_SETUP_CQSIZE;
305 params.cq_entries = options.cq_size_;
308 if (options.enable_iopoll_) {
309 params.flags |= IORING_SETUP_IOPOLL;
// IO_URING_CHECK_VERSION(M, m) is true when the installed liburing is older
// than M.m. So `!IO_URING_CHECK_VERSION(2, 9)` compiles this only against
// liburing >= 2.9. Hybrid polling is nested here because it requires IOPOLL.
310#if !IO_URING_CHECK_VERSION(2, 9)
311 if (options.enable_hybrid_iopoll_) {
312 params.flags |= IORING_SETUP_HYBRID_IOPOLL;
// Kernel-side submission thread: it idles for sqpoll_idle_time_ms_ and is
// optionally pinned to a CPU via SQ_AFF.
// NOTE(review): recent kernels reject SQPOLL combined with COOP_TASKRUN,
// TASKRUN_FLAG or DEFER_TASKRUN (-EINVAL). Presumably options validation
// elsewhere prevents enabling both; confirm.
317 if (options.enable_sqpoll_) {
318 params.flags |= IORING_SETUP_SQPOLL;
319 params.sq_thread_idle = options.sqpoll_idle_time_ms_;
320 if (options.sqpoll_thread_cpu_.has_value()) {
321 params.flags |= IORING_SETUP_SQ_AFF;
322 params.sq_thread_cpu = *options.sqpoll_thread_cpu_;
// Share the async worker backend of another runtime's ring.
326 if (options.attach_wq_target_ !=
nullptr) {
327 params.flags |= IORING_SETUP_ATTACH_WQ;
328 params.wq_fd = options.attach_wq_target_->ring_.ring()->ring_fd;
// DEFER_TASKRUN requires SINGLE_ISSUER, which is always set above.
331 if (options.enable_defer_taskrun_) {
332 params.flags |= IORING_SETUP_DEFER_TASKRUN;
333 params.flags |= IORING_SETUP_TASKRUN_FLAG;
336 if (options.enable_coop_taskrun_) {
337 params.flags |= IORING_SETUP_COOP_TASKRUN;
338 params.flags |= IORING_SETUP_TASKRUN_FLAG;
// Wide entries: 128-byte SQEs / 32-byte CQEs.
341 if (options.enable_sqe128_) {
342 params.flags |= IORING_SETUP_SQE128;
345 if (options.enable_cqe32_) {
346 params.flags |= IORING_SETUP_CQE32;
// Mixed-size SQE/CQE rings (liburing >= 2.13 only).
349#if !IO_URING_CHECK_VERSION(2, 13)
350 if (options.enable_sqe_mixed_) {
351 params.flags |= IORING_SETUP_SQE_MIXED;
355#if !IO_URING_CHECK_VERSION(2, 13)
356 if (options.enable_cqe_mixed_) {
357 params.flags |= IORING_SETUP_CQE_MIXED;
// NO_MMAP: ring memory is supplied by the caller; buf/buf_size are forwarded
// to Ring below (liburing >= 2.5 only).
363#if !IO_URING_CHECK_VERSION(2, 5)
364 if (options.enable_no_mmap_) {
365 params.flags |= IORING_SETUP_NO_MMAP;
366 buf = options.no_mmap_buf_;
367 buf_size = options.no_mmap_buf_size_;
371#if !IO_URING_CHECK_VERSION(2, 14)
372 if (options.enable_sq_rewind_) {
373 params.flags |= IORING_SETUP_SQ_REWIND;
// submit_batch == 0 means "choose automatically": min(32, ring_entries) with
// SQPOLL, otherwise unlimited (SIZE_MAX). Its exact meaning is defined by
// Ring (presumably how many SQEs may queue before an implicit submit).
// NOTE(review): with SQPOLL and sq_size_ == 0 the min() yields 0 and the
// assert below fires.
377 size_t submit_batch = options.submit_batch_;
378 if (submit_batch == 0) {
379 if (options.enable_sqpoll_) {
380 submit_batch = std::min<size_t>(32, ring_entries);
382 submit_batch = std::numeric_limits<size_t>::max();
385 assert(submit_batch > 0);
387 return Ring(ring_entries, ¶ms, buf, buf_size, submit_batch);
// Delivers the encoded word `data` to THIS runtime's ring as an
// IORING_OP_MSG_RING message.
//  - With a calling runtime (`curr_runtime` != nullptr): the SQE is queued on
//    the caller's ring and the caller records an outstanding item via
//    pend_work(). The caller's ring receives the send's own completion (see
//    prep_msg_ring_()).
//  - Without one (a plain thread): the message is sent synchronously through
//    detail::sync_msg_ring. A failure is fatal via panic_on. The guarding
//    condition is not visible here; `-r` suggests a negative errno.
390 void schedule_msg_ring_(
Runtime *curr_runtime, uintptr_t data)
noexcept {
391 int ring_fd = this->ring_.ring()->ring_fd;
392 if (curr_runtime !=
nullptr) {
393 io_uring_sqe *sqe = curr_runtime->ring_.get_sqe();
394 prep_msg_ring_(ring_fd, sqe, data);
395 curr_runtime->pend_work();
// No runtime on this thread: prepare a stack SQE and send it synchronously.
397 io_uring_sqe sqe = {};
398 prep_msg_ring_(ring_fd, &sqe, data);
399 int r = detail::sync_msg_ring(&sqe);
401 panic_on(std::format(
"sync_msg_ring: {}", std::strerror(-r)));
// Nudges this runtime's ring by sending it an empty message: null payload,
// WorkType::Ignore, which process_cqe_() only sanity-checks. Presumably this
// makes a blocking wait in the loop return. The bodies of the two early
// branches (called on this runtime's own thread; state_ not Enabled) are not
// visible here, presumably early returns.
407 void wakeup_() noexcept {
408 auto *curr_runtime = detail::Context::current().runtime();
409 if (curr_runtime ==
this) {
413 auto state = state_.load();
414 if (state != State::Enabled) {
418 schedule_msg_ring_(curr_runtime,
419 encode_work(
nullptr, WorkType::Ignore));
// Splices every work parked in global_queue_ onto the end of local_queue_.
// In the visible code global_queue_ is only touched while holding mutex_
// (schedule(), run()), so call this with mutex_ held.
422 void flush_global_queue_() noexcept {
423 local_queue_.push_back(std::move(global_queue_));
// Prepares `sqe` as an IORING_OP_MSG_RING targeting the ring `ring_fd`.
//  - The target ring gets a CQE whose user data is `data` and whose res is
//    the len argument (0).
//  - The sender's own completion is tagged WorkType::Schedule with a null
//    payload. process_cqe_() recognizes that as the completion of the send
//    itself.
426 static void prep_msg_ring_(
int ring_fd, io_uring_sqe *sqe,
427 uintptr_t data)
noexcept {
428 io_uring_prep_msg_ring(sqe, ring_fd, 0, data, 0);
429 io_uring_sqe_set_data64(sqe, encode_work(
nullptr, WorkType::Schedule));
// Prepares an async-cancel SQE for the request whose user data is `data`.
// Flags 0 means the first matching request is cancelled. The cancel's own CQE
// is tagged WorkType::Ignore. IOSQE_CQE_SKIP_SUCCESS suppresses that CQE when
// the cancel succeeds, so only failures reach process_cqe_().
432 static void prep_cancel_(io_uring_sqe *sqe, uintptr_t data)
noexcept {
433 io_uring_prep_cancel64(sqe, data, 0);
434 io_uring_sqe_set_data64(sqe, encode_work(
nullptr, WorkType::Ignore));
435 io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
// Reaps completions through Ring::reap_completions() and routes each CQE to
// process_cqe_(). Judging by the error text this is the non-blocking
// (io_uring_peek_cqe-based) variant; TODO confirm against Ring. A failure
// result is fatal via panic_on. The guarding condition is not visible here;
// `-r` suggests a negative errno.
438 void flush_ring_() noexcept {
439 auto r = ring_.reap_completions(
440 [
this](io_uring_cqe *cqe) { process_cqe_(cqe); });
442 panic_on(std::format(
"io_uring_peek_cqe: {}",
443 std::strerror(
static_cast<int>(-r))));
// Waiting counterpart of flush_ring_(). It reaps completions through
// Ring::reap_completions_wait() and routes each CQE to process_cqe_().
// Judging by the error text it submits and blocks
// (io_uring_submit_and_wait); TODO confirm against Ring. A failure result is
// fatal via panic_on (the guarding condition is not visible here).
447 void flush_ring_wait_() noexcept {
448 auto r = ring_.reap_completions_wait(
449 [
this](io_uring_cqe *cqe) { process_cqe_(cqe); });
451 panic_on(std::format(
"io_uring_submit_and_wait: {}",
452 std::strerror(
static_cast<int>(-r))));
// Dispatches one completion on the WorkType tag that decode_work() unpacks
// from its 64-bit user data:
//  - Ignore: wakeup messages (wakeup_()) and failed cancels (prep_cancel_()
//    skips successful ones). Only sanity-checked.
//  - Schedule with a null payload: completion of a MSG_RING this runtime
//    sent (prep_msg_ring_() tags it so).
//  - Schedule with a non-null payload: a WorkInvoker delivered by another
//    thread's schedule().
//  - Cancel: a CancelRequest sent by another thread's cancel(); the actual
//    cancel SQE is issued here.
//  - Common: an operation completion handled by its OpFinishHandleBase.
456 void process_cqe_(io_uring_cqe *cqe)
noexcept {
457 auto [data, type] = decode_work(io_uring_cqe_get_data64(cqe));
459 if (type == WorkType::Ignore) {
// Other failures (e.g. a cancel that found nothing to cancel) are tolerated.
// -EINVAL presumably indicates a malformed SQE.
461 assert(cqe->res != -EINVAL);
462 }
else if (type == WorkType::Schedule) {
// Null payload: our own MSG_RING send completed. A failure is fatal; the
// exact condition (presumably cqe->res < 0) is not visible here.
463 if (data ==
nullptr) {
465 panic_on(std::format(
"io_uring_prep_msg_ring: {}",
466 std::strerror(-cqe->res)));
470 auto *work =
static_cast<WorkInvoker *
>(data);
474 }
else if (type == WorkType::Cancel) {
// `data` is the address of the CancelRequest on the sending thread's stack
// (see cancel()). The acquire annotation pairs with the sender's tsan_release.
475 detail::CancelRequest *request =
476 static_cast<detail::CancelRequest *
>(data);
477 tsan_acquire(request);
478 io_uring_sqe *sqe = ring_.get_sqe();
479 prep_cancel_(sqe, request->data());
481 }
else if (type == WorkType::Common) {
// The handle interprets the CQE; how op_finish is used is not visible here.
482 auto *handle =
static_cast<OpFinishHandleBase *
>(data);
483 auto op_finish = handle->handle(cqe);
// Runtime lifecycle as driven by run(). The enumerators are not visible in
// this chunk; usage shows Idle, Running, Enabled and Stopped.
//  - Idle -> Running: CAS at the start of run().
//  - Running -> Enabled: after global_queue_ is flushed; cross-thread
//    messaging allowed.
//  - Stopped: stored by run()'s scope guard.
493 enum class State : uint8_t {
// state_ is loaded by other threads outside mutex_ (schedule(), cancel(),
// wakeup_()); this guarantees the atomic needs no hidden lock.
499 static_assert(std::atomic<State>::is_always_lock_free);
501 using WorkListQueue =
502 IntrusiveSingleList<WorkInvoker, &WorkInvoker::work_queue_entry_>;
// Works handed over by other threads before the ring is Enabled; touched only
// under mutex_.
506 WorkListQueue global_queue_;
// Outstanding-item count (see pend_work()/resume_work()); run()'s exit check
// requires it to be 0.
507 size_t pending_works_ = 0;
// Set with release ordering to let run() exit; loaded with acquire in run().
508 std::atomic_bool exit_allowed_ =
false;
509 std::atomic<State> state_ = State::Idle;
// Ready works for this runtime's own thread (schedule() from the owner thread,
// flush_global_queue_()).
512 WorkListQueue local_queue_;
514 size_t tick_count_ = 0;
// Threshold compared against tick_count_ in run(). 61 is only the in-class
// default; the constructor overwrites it from options.
517 size_t event_interval_ = 61;
// When true, run() skips io_uring_register_ring_fd().
518 bool disable_register_ring_fd_ =
false;
521 BufferTable buffer_table_;
522 RingSettings settings_;