49class BundledProvidedBufferQueue {
// Constructor: maps memory for an io_uring buffer ring and registers it with
// the ring of the current context's runtime.
// - capacity is rounded up with std::bit_ceil; buf_lens_ gets one
//   zero-initialised length slot per ring entry.
// - Throws via make_system_error when mmap or io_uring_register_buf_ring
//   fails.
// NOTE: several original lines (rest of the initialiser list, the trailing
// arguments of the register call, the end of the body) are not visible in
// this listing.
51 BundledProvidedBufferQueue(uint32_t capacity,
unsigned int flags)
52 : capacity_(std::bit_ceil(capacity)), buf_lens_(capacity_, 0),
54 auto &context = detail::Context::current();
// The mapping only holds the ring's io_uring_buf entries, one per slot.
56 size_t data_size = capacity_ *
sizeof(io_uring_buf);
// Anonymous, private, read/write mapping.
// NOTE(review): the fd argument is 0; mmap(2) advises passing -1 together
// with MAP_ANONYMOUS for portability — confirm whether 0 is intentional.
57 void *data = mmap(
nullptr, data_size, PROT_READ | PROT_WRITE,
58 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
59 if (data == MAP_FAILED) [[unlikely]] {
60 throw make_system_error(
"mmap");
// Scope guard that unmaps the region when it runs. Whether it is disarmed
// on the success path is not visible here — TODO confirm.
62 auto d1 =
defer([&]() { munmap(data, data_size); });
// Reserve a buffer group id; the second guard hands it back when it runs.
64 bgid_ = context.next_bgid();
65 auto d2 =
defer([&]() { context.recycle_bgid(bgid_); });
// The start of the mapping is used as the io_uring_buf_ring.
67 br_ =
reinterpret_cast<io_uring_buf_ring *
>(data);
68 io_uring_buf_ring_init(br_);
// Registration descriptor: ring address and number of entries.
70 io_uring_buf_reg reg = {};
71 reg.ring_addr =
reinterpret_cast<uint64_t
>(br_);
72 reg.ring_entries = capacity_;
74 int r = io_uring_register_buf_ring(context.runtime()->ring().ring(),
// A non-zero result is a failure; -r is forwarded as the error code.
76 if (r != 0) [[unlikely]] {
77 throw make_system_error(
"io_uring_register_buf_ring", -r);
// Destructor: unmaps the ring memory, unregisters the buffer ring for bgid_
// from the ring of detail::Context::current(), then recycles the group id.
84 ~BundledProvidedBufferQueue() {
85 assert(br_ !=
nullptr);
// Same size formula as the constructor's mmap call.
86 size_t data_size = capacity_ *
sizeof(io_uring_buf);
// NOTE(review): the memory is unmapped before the ring is unregistered;
// unregistering first looks like the safer order — confirm.
87 munmap(br_, data_size);
// r is marked [[maybe_unused]]; presumably it is only checked by an assert
// on a line not visible in this listing — TODO confirm.
88 [[maybe_unused]]
int r = io_uring_unregister_buf_ring(
89 detail::Context::current().runtime()->ring().ring(), bgid_);
92 detail::Context::current().recycle_bgid(bgid_);
96 CONDY_DELETE_COPY_MOVE(BundledProvidedBufferQueue);
// Returns size_, the counter that push() compares against capacity_.
102 size_t size()
const noexcept {
return size_; }
// Returns capacity_, i.e. the constructor's capacity argument rounded up by
// std::bit_ceil; this is also the number of ring entries registered.
107 size_t capacity()
const noexcept {
return capacity_; }
// Adds one buffer (anything exposing data() and size()) to the ring.
// - Throws std::logic_error("Capacity exceeded") once size_ has reached
//   capacity_.
// - The buffer id is the ring's current tail masked to the ring size, and the
//   buffer's length is remembered in buf_lens_[bid].
// The lines after the advance call (including the return statement) are not
// visible in this listing; presumably the chosen bid is returned — TODO
// confirm.
119 template <BufferLike Buffer> uint16_t push(
const Buffer &
buffer) {
120 if (size_ >= capacity_) [[unlikely]] {
121 throw std::logic_error(
"Capacity exceeded");
124 auto mask = io_uring_buf_ring_mask(capacity_);
// The slot index at the ring tail doubles as the buffer id.
125 uint16_t bid = br_->tail & mask;
126 io_uring_buf_ring_add(br_,
buffer.data(),
buffer.size(), bid, mask, 0);
// Remember the length so handle_finish can account for consumed bytes.
127 buf_lens_[bid] =
buffer.size();
// Publish the single new entry.
128 io_uring_buf_ring_advance(br_, 1);
// Returns bgid_, the buffer group id taken from the context in the
// constructor.
135 uint16_t bgid()
const noexcept {
return bgid_; }
// Interprets a completion entry and updates the per-buffer lengths recorded
// in buf_lens_. The resulting BufferInfo carries the first consumed buffer id
// (bid) and the number of buffers consumed (num_buffers).
// NOTE: many original lines of this function are missing from this listing,
// so the comments below only describe the statements that are visible.
137 BufferInfo handle_finish(io_uring_cqe *cqe)
noexcept {
138 assert(cqe !=
nullptr);
139 int32_t res = cqe->res;
140 uint32_t flags = cqe->flags;
// Completion without the IORING_CQE_F_BUFFER flag; how this case is handled
// is not visible here.
142 if (!(flags & IORING_CQE_F_BUFFER)) {
// The buffer id is taken from the bits of cqe->flags above
// IORING_CQE_BUFFER_SHIFT.
149 .bid =
static_cast<uint16_t
>(flags >> IORING_CQE_BUFFER_SHIFT),
// Guarded by liburing's version macro; presumably compiled only when
// liburing is at least 2.8 — TODO confirm the macro's polarity.
153#if !IO_URING_CHECK_VERSION(2, 8)
// IORING_CQE_F_BUF_MORE set: res bytes are deducted from the buffer's
// recorded length, which is asserted to be larger than res.
154 if (flags & IORING_CQE_F_BUF_MORE) {
155 assert(buf_lens_[result.
bid] >
static_cast<uint32_t
>(res));
156 buf_lens_[result.
bid] -= res;
// is_incr starts false; presumably it is set when br_flags_ contains
// IOU_PBUF_RING_INC (the assignment is not visible here).
161 bool is_incr =
false;
162#if !IO_URING_CHECK_VERSION(2, 8)
163 if (br_flags_ & IOU_PBUF_RING_INC) {
// Walk the ring starting at the first buffer id.
168 auto mask = io_uring_buf_ring_mask(capacity_);
169 uint16_t curr_bid = result.
bid;
// One branch takes at most the recorded length of the current buffer ...
174 buf_len = std::min<uint32_t>(bytes, buf_lens_[curr_bid]);
175 buf_lens_[curr_bid] -= buf_len;
// ... the other takes the whole recorded length and zeroes it. The condition
// selecting between them is not visible (presumably is_incr).
177 buf_len = std::exchange(buf_lens_[curr_bid], 0);
180 if (buf_lens_[curr_bid] == 0) {
// Step to the next slot, wrapping with the ring mask.
183 curr_bid = (curr_bid + 1) & mask;
// Start of the mmap'ed region, used as the buffer ring; null until the
// constructor assigns it.
192 io_uring_buf_ring *br_ =
nullptr;
// Recorded length per buffer id: set by push(), reduced by handle_finish().
196 std::vector<uint32_t> buf_lens_;
// Flags for the buffer ring; handle_finish tests them for IOU_PBUF_RING_INC.
197 unsigned int br_flags_;
222 : BundledProvidedBufferQueue(capacity, flags) {}
// Forwards to BundledProvidedBufferQueue::handle_finish and asserts that the
// result reports at most one buffer. The return statement is not visible in
// this listing.
224 BufferInfo handle_finish(io_uring_cqe *cqe)
noexcept {
225 assert(cqe !=
nullptr);
226 auto result = BundledProvidedBufferQueue::handle_finish(cqe);
227 assert(result.num_buffers <= 1);
233class BundledProvidedBufferPool;
// Move-only view of a byte range (data_, size_) plus an optional pointer to
// the BundledProvidedBufferPool it came from. The destructor calls reset().
// NOTE: some original lines (e.g. 258, 262-265 and the size_ member
// declaration) are missing from this listing.
244struct ProvidedBuffer {
246 using CondyBuffer = void;
// Default state: null data pointer and null pool.
248 ProvidedBuffer() =
default;
// Wraps an existing range; pool may be nullptr.
249 ProvidedBuffer(
void *
data,
size_t size,
250 detail::BundledProvidedBufferPool *pool)
251 : data_(
data), size_(
size), pool_(pool) {}
// Move construction takes all three fields and leaves the source with null
// pointers and size 0.
252 ProvidedBuffer(ProvidedBuffer &&other) noexcept
253 : data_(std::exchange(other.data_,
nullptr)),
254 size_(std::exchange(other.size_, 0)),
255 pool_(std::exchange(other.pool_,
nullptr)) {}
// Move assignment; self-assignment is a no-op. Original line 258 is missing
// here — presumably it releases the current buffer first (TODO confirm).
256 ProvidedBuffer &operator=(ProvidedBuffer &&other)
noexcept {
257 if (
this != &other) {
259 data_ = std::exchange(other.data_,
nullptr);
260 size_ = std::exchange(other.size_, 0);
261 pool_ = std::exchange(other.pool_,
nullptr);
266 ~ProvidedBuffer() {
reset(); }
268 CONDY_DELETE_COPY(ProvidedBuffer);
// Start of the byte range.
274 void *
data() const noexcept {
return data_; }
// Length of the byte range.
279 size_t size() const noexcept {
return size_; }
// Resets the buffer, returning it to the pool if owned (defined outside the
// struct).
284 void reset() noexcept;
292 void *data_ =
nullptr;
// Pool to hand the buffer back to; nullptr when there is none.
294 detail::BundledProvidedBufferPool *pool_ =
nullptr;
299class BundledProvidedBufferPool {
// Constructor: maps one region holding the ring entries followed by
// num_buffers_ data buffers of buffer_size bytes each, registers the ring,
// and adds every buffer to it.
// - num_buffers is rounded up with std::bit_ceil.
// - Throws via make_system_error when mmap or io_uring_register_buf_ring
//   fails.
// NOTE(review): buffer_size is a size_t, but it is stored in the uint32_t
// members buffer_size_ and curr_buf_len_, so values above UINT32_MAX would be
// truncated there while this constructor keeps using the untruncated value —
// confirm whether a range check is needed.
// NOTE: some original lines (third parameter, trailing arguments of the
// register call, end of the body) are not visible in this listing.
301 BundledProvidedBufferPool(uint32_t num_buffers,
size_t buffer_size,
303 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size),
304 curr_buf_len_(buffer_size), br_flags_(flags) {
305 auto &context = detail::Context::current();
// Per buffer: one ring entry plus buffer_size bytes of payload.
307 size_t data_size = num_buffers_ * (
sizeof(io_uring_buf) + buffer_size);
// NOTE(review): the fd argument is 0; mmap(2) advises passing -1 together
// with MAP_ANONYMOUS for portability — confirm whether 0 is intentional.
308 void *data = mmap(
nullptr, data_size, PROT_READ | PROT_WRITE,
309 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
310 if (data == MAP_FAILED) [[unlikely]] {
311 throw make_system_error(
"mmap");
// Scope guards for the mapping and the group id. Whether they are disarmed
// on the success path is not visible here — TODO confirm.
313 auto d1 =
defer([&]() { munmap(data, data_size); });
315 bgid_ = context.next_bgid();
316 auto d2 =
defer([&]() { context.recycle_bgid(bgid_); });
// The start of the mapping is used as the io_uring_buf_ring.
318 br_ =
reinterpret_cast<io_uring_buf_ring *
>(data);
319 io_uring_buf_ring_init(br_);
321 io_uring_buf_reg reg = {};
322 reg.ring_addr =
reinterpret_cast<uint64_t
>(br_);
323 reg.ring_entries = num_buffers_;
325 int r = io_uring_register_buf_ring(context.runtime()->ring().ring(),
// A non-zero result is a failure; -r is forwarded as the error code.
327 if (r != 0) [[unlikely]] {
328 throw make_system_error(
"io_uring_register_buf_ring", -r);
// Payload area begins right after the num_buffers_ ring entries.
332 static_cast<char *
>(data) +
sizeof(io_uring_buf) * num_buffers_;
333 auto mask = io_uring_buf_ring_mask(num_buffers_);
// Buffer number bid lives at buffer_base + bid * buffer_size and is added at
// ring offset bid.
334 for (
size_t bid = 0; bid < num_buffers_; bid++) {
335 char *ptr = buffer_base + bid * buffer_size;
336 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
337 static_cast<int>(bid));
// Publish all entries in one step.
339 io_uring_buf_ring_advance(br_,
static_cast<int>(num_buffers_));
// Destructor: unmaps the region (ring entries plus payload buffers),
// unregisters the buffer ring for bgid_ from the ring of
// detail::Context::current(), then recycles the group id.
345 ~BundledProvidedBufferPool() {
346 assert(br_ !=
nullptr);
// Recomputed from the uint32_t member buffer_size_, whereas the constructor
// used its size_t parameter. NOTE(review): the two sizes differ if the
// parameter did not fit in 32 bits.
347 size_t data_size = num_buffers_ * (
sizeof(io_uring_buf) + buffer_size_);
// NOTE(review): the memory is unmapped before the ring is unregistered;
// unregistering first looks like the safer order — confirm.
348 munmap(br_, data_size);
// r is marked [[maybe_unused]]; presumably it is only checked by an assert
// on a line not visible in this listing — TODO confirm.
349 [[maybe_unused]]
int r = io_uring_unregister_buf_ring(
350 detail::Context::current().runtime()->ring().ring(), bgid_);
353 detail::Context::current().recycle_bgid(bgid_);
357 CONDY_DELETE_COPY_MOVE(BundledProvidedBufferPool);
// Returns num_buffers_, i.e. the constructor's num_buffers rounded up by
// std::bit_ceil.
363 size_t capacity() const noexcept {
return num_buffers_; }
// Returns buffer_size_, the size in bytes of each buffer in the pool.
368 size_t buffer_size() const noexcept {
return buffer_size_; }
// Returns bgid_, the buffer group id taken from the context in the
// constructor.
371 uint16_t bgid() const noexcept {
return bgid_; }
// Turns a completion entry into ProvidedBuffer objects pointing into the
// pool's payload area. A buffer that is completely used up is emitted with
// this pool as its owner; a partially used one is emitted with a null pool
// pointer.
// NOTE(review): the function is noexcept but fills a std::vector, whose
// allocation can throw; that would end in std::terminate — confirm this is
// acceptable.
// NOTE: many original lines of this function are missing from this listing,
// so the comments below only describe the statements that are visible.
373 std::vector<ProvidedBuffer> handle_finish(io_uring_cqe *cqe)
noexcept {
374 assert(cqe !=
nullptr);
375 int32_t res = cqe->res;
376 uint32_t flags = cqe->flags;
377 std::vector<ProvidedBuffer> buffers;
// Completion without the IORING_CQE_F_BUFFER flag; how this case is handled
// is not visible here.
379 if (!(flags & IORING_CQE_F_BUFFER)) {
// The buffer id is taken from the bits of cqe->flags above
// IORING_CQE_BUFFER_SHIFT.
385 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
// Guarded by liburing's version macro; presumably compiled only when
// liburing is at least 2.8 — TODO confirm the macro's polarity.
387#if !IO_URING_CHECK_VERSION(2, 8)
// IORING_CQE_F_BUF_MORE set: the data starts at the already-consumed offset
// (buffer_size_ - curr_buf_len_) of the buffer; res bytes are handed out
// with no owning pool and deducted from curr_buf_len_.
388 if (flags & IORING_CQE_F_BUF_MORE) {
389 char *data = get_buffer_(bid) + (buffer_size_ - curr_buf_len_);
390 buffers.emplace_back(data, res,
nullptr);
391 assert(
static_cast<uint32_t
>(res) < curr_buf_len_);
392 curr_buf_len_ -= res;
// The reported id must match the ring entry at the locally tracked head.
396 assert(bid == curr_io_uring_buf_()->bid);
// is_incr starts false; presumably it is set when br_flags_ contains
// IOU_PBUF_RING_INC (the assignment is not visible here).
398 bool is_incr =
false;
399#if !IO_URING_CHECK_VERSION(2, 8)
400 if (br_flags_ & IOU_PBUF_RING_INC) {
407 auto *buf_ptr = curr_io_uring_buf_();
// Data begins after the part of the current buffer that was already used.
410 char *data = get_buffer_(bid) + (buffer_size_ - curr_buf_len_);
// One branch takes at most what is left of the current buffer ...
413 buf_len = std::min<uint32_t>(bytes, curr_buf_len_);
414 curr_buf_len_ -= buf_len;
// ... the other takes everything that is left. The condition selecting
// between them is not visible (presumably is_incr).
416 buf_len = std::exchange(curr_buf_len_, 0);
// Buffer used up: emit it owned by this pool, move the local head forward
// and start the next buffer at full length.
419 if (curr_buf_len_ == 0) {
420 buffers.emplace_back(data, buf_len,
this);
421 advance_io_uring_buf_();
422 curr_buf_len_ = buffer_size_;
// Otherwise the piece is emitted without an owning pool.
424 buffers.emplace_back(data, buf_len,
nullptr);
// Puts the pool buffer that contains ptr back on the ring at its full size.
// ptr may point anywhere inside the buffer: the buffer id is derived from the
// byte offset into the payload area, and the buffer's start address is
// recomputed from that id. The id is asserted to be below num_buffers_.
431 void add_buffer_back(
void *ptr)
noexcept {
432 char *base = get_buffers_base_();
434 size_t offset =
static_cast<char *
>(ptr) - base;
// Integer division maps any interior pointer to its buffer id.
435 size_t bid = offset / buffer_size_;
436 assert(bid < num_buffers_);
437 char *buffer_ptr = base + bid * buffer_size_;
438 auto mask = io_uring_buf_ring_mask(num_buffers_);
439 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
// Publish the single re-added entry.
440 io_uring_buf_ring_advance(br_, 1);
// Start address of buffer number bid: payload base + bid * buffer_size_.
// bid is widened to size_t before the multiplication.
444 char *get_buffer_(uint16_t bid)
const noexcept {
445 return get_buffers_base_() +
static_cast<size_t>(bid) * buffer_size_;
// Start of the payload area: it follows the num_buffers_ io_uring_buf entries
// at the beginning of the mapping.
448 char *get_buffers_base_() const noexcept {
449 return reinterpret_cast<char *
>(br_) +
450 sizeof(io_uring_buf) * num_buffers_;
// Ring entry at the locally tracked head position (br_head_ masked to the
// ring size).
453 io_uring_buf *curr_io_uring_buf_() noexcept {
454 auto mask = io_uring_buf_ring_mask(num_buffers_);
455 return &br_->bufs[br_head_ & mask];
// Moves the locally tracked head to the next ring entry. br_head_ is a
// uint16_t, so it wraps at 65536; readers mask it to the ring size.
458 void advance_io_uring_buf_() noexcept { br_head_++; }
// Start of the mmap'ed region, used as the buffer ring; null until the
// constructor assigns it.
461 io_uring_buf_ring *br_ =
nullptr;
// Number of buffers / ring entries (rounded up by std::bit_ceil).
462 uint32_t num_buffers_;
// Size in bytes of each buffer.
463 uint32_t buffer_size_;
// Bytes still unused in the buffer at the head; reset to buffer_size_ when
// handle_finish moves on to the next buffer.
464 uint32_t curr_buf_len_;
// Locally tracked head index into the ring (masked on use).
466 uint16_t br_head_ = 0;
// Flags for the buffer ring; handle_finish tests them for IOU_PBUF_RING_INC.
467 unsigned int br_flags_;
473 if (pool_ !=
nullptr) {
474 pool_->add_buffer_back(data_);
503 unsigned int flags = 0)
504 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
508 assert(cqe !=
nullptr);
509 auto buffers = BundledProvidedBufferPool::handle_finish(cqe);
510 if (buffers.empty()) {
513 assert(buffers.size() == 1);
514 return std::move(buffers[0]);
528 return static_cast<detail::BundledProvidedBufferPool &
>(
buffer);
538 return static_cast<detail::BundledProvidedBufferQueue &
>(
buffer);
Basic buffer types and conversion utilities.
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object in current Runtime.
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object in current Runtime.
The main namespace for the Condy library.
auto & bundled(ProvidedBufferPool &buffer)
Get the bundled variant of a provided buffer pool. This will enable the buffer bundling feature of io_uring.
MutableBuffer buffer(void *data, size_t size) noexcept
Create a buffer object from various data sources.
auto defer(Func &&func)
Defer the execution of a function until the current scope ends.
Wrapper classes for liburing interfaces.
Runtime type for running the io_uring event loop.
Information about buffers consumed from a provided buffer queue.
uint16_t bid
Buffer ID of the first buffer consumed.
uint16_t num_buffers
Number of buffers consumed.
bool owns_buffer() const noexcept
Check if the provided buffer owns a buffer from a pool.
void * data() const noexcept
Get the data pointer of the provided buffer.
size_t size() const noexcept
Get the size of the provided buffer.
void reset() noexcept
Reset the provided buffer, returning it to the pool if owned.
Internal utility classes and functions used by Condy.