Condy v1.7.0
C++ Asynchronous System Call Layer for Linux
Loading...
Searching...
No Matches
provided_buffers.hpp
Go to the documentation of this file.
1
7
8#pragma once
9
10#include "condy/buffers.hpp"
11#include "condy/concepts.hpp"
12#include "condy/condy_uring.hpp"
13#include "condy/context.hpp"
14#include "condy/ring.hpp"
15#include "condy/runtime.hpp"
16#include "condy/utils.hpp"
17#include <algorithm>
18#include <bit>
19#include <cstddef>
20#include <cstdint>
21#include <stdexcept>
22#include <sys/mman.h>
23#include <sys/types.h>
24
25namespace condy {
26
/**
 * @brief Information about buffers consumed from a provided buffer queue.
 */
struct BufferInfo {
    /**
     * @brief Buffer ID of the first buffer consumed.
     */
    uint16_t bid;

    /**
     * @brief Number of buffers consumed.
     */
    uint16_t num_buffers;
};
46
47namespace detail {
48
49class BundledProvidedBufferQueue {
50public:
51 BundledProvidedBufferQueue(uint32_t capacity, unsigned int flags)
52 : capacity_(std::bit_ceil(capacity)), buf_lens_(capacity_, 0),
53 br_flags_(flags) {
54 auto &context = detail::Context::current();
55
56 size_t data_size = capacity_ * sizeof(io_uring_buf);
57 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
58 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
59 if (data == MAP_FAILED) [[unlikely]] {
60 throw make_system_error("mmap");
61 }
62 auto d1 = defer([&]() { munmap(data, data_size); });
63
64 bgid_ = context.next_bgid();
65 auto d2 = defer([&]() { context.recycle_bgid(bgid_); });
66
67 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
68 io_uring_buf_ring_init(br_);
69
70 io_uring_buf_reg reg = {};
71 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
72 reg.ring_entries = capacity_;
73 reg.bgid = bgid_;
74 int r = io_uring_register_buf_ring(context.runtime()->ring().ring(),
75 &reg, br_flags_);
76 if (r != 0) [[unlikely]] {
77 throw make_system_error("io_uring_register_buf_ring", -r);
78 }
79
80 d1.dismiss();
81 d2.dismiss();
82 }
83
84 ~BundledProvidedBufferQueue() {
85 assert(br_ != nullptr);
86 size_t data_size = capacity_ * sizeof(io_uring_buf);
87 munmap(br_, data_size);
88 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
89 detail::Context::current().runtime()->ring().ring(), bgid_);
90 assert(r == 0);
91 if (r == 0) {
92 detail::Context::current().recycle_bgid(bgid_);
93 }
94 }
95
96 CONDY_DELETE_COPY_MOVE(BundledProvidedBufferQueue);
97
98public:
102 size_t size() const noexcept { return size_; }
103
107 size_t capacity() const noexcept { return capacity_; }
108
119 template <BufferLike Buffer> uint16_t push(const Buffer &buffer) {
120 if (size_ >= capacity_) [[unlikely]] {
121 throw std::logic_error("Capacity exceeded");
122 }
123
124 auto mask = io_uring_buf_ring_mask(capacity_);
125 uint16_t bid = br_->tail & mask;
126 io_uring_buf_ring_add(br_, buffer.data(), buffer.size(), bid, mask, 0);
127 buf_lens_[bid] = buffer.size();
128 io_uring_buf_ring_advance(br_, 1);
129 size_++;
130
131 return bid;
132 }
133
134public:
135 uint16_t bgid() const noexcept { return bgid_; }
136
137 BufferInfo handle_finish(io_uring_cqe *cqe) noexcept {
138 assert(cqe != nullptr);
139 int32_t res = cqe->res;
140 uint32_t flags = cqe->flags;
141
142 if (!(flags & IORING_CQE_F_BUFFER)) {
143 return BufferInfo{0, 0};
144 }
145
146 assert(res > 0);
147
148 BufferInfo result = {
149 .bid = static_cast<uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT),
150 .num_buffers = 0,
151 };
152
153#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
154 if (flags & IORING_CQE_F_BUF_MORE) {
155 assert(buf_lens_[result.bid] > static_cast<uint32_t>(res));
156 buf_lens_[result.bid] -= res;
157 return result;
158 }
159#endif
160
161 bool is_incr = false;
162#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
163 if (br_flags_ & IOU_PBUF_RING_INC) {
164 is_incr = true;
165 }
166#endif
167
168 auto mask = io_uring_buf_ring_mask(capacity_);
169 uint16_t curr_bid = result.bid;
170 int64_t bytes = res;
171 while (bytes > 0) {
172 uint32_t buf_len;
173 if (is_incr) {
174 buf_len = std::min<uint32_t>(bytes, buf_lens_[curr_bid]);
175 buf_lens_[curr_bid] -= buf_len;
176 } else {
177 buf_len = std::exchange(buf_lens_[curr_bid], 0);
178 }
179 bytes -= buf_len;
180 if (buf_lens_[curr_bid] == 0) {
181 result.num_buffers++;
182 }
183 curr_bid = (curr_bid + 1) & mask;
184 }
185 assert(size_ >= result.num_buffers);
186 size_ -= result.num_buffers;
187
188 return result;
189 }
190
191private:
192 io_uring_buf_ring *br_ = nullptr;
193 uint32_t size_ = 0;
194 uint32_t capacity_;
195 uint16_t bgid_;
196 std::vector<uint32_t> buf_lens_;
197 unsigned int br_flags_;
198};
199
200} // namespace detail
201
213class ProvidedBufferQueue : public detail::BundledProvidedBufferQueue {
214public:
221 ProvidedBufferQueue(uint32_t capacity, unsigned int flags = 0)
222 : BundledProvidedBufferQueue(capacity, flags) {}
223
224 BufferInfo handle_finish(io_uring_cqe *cqe) noexcept {
225 assert(cqe != nullptr);
226 auto result = BundledProvidedBufferQueue::handle_finish(cqe);
227 assert(result.num_buffers <= 1);
228 return result;
229 }
230};
231
232namespace detail {
233class BundledProvidedBufferPool;
234}
235
244struct ProvidedBuffer {
245public:
246 using CondyBuffer = void;
247
248 ProvidedBuffer() = default;
249 ProvidedBuffer(void *data, size_t size,
250 detail::BundledProvidedBufferPool *pool)
251 : data_(data), size_(size), pool_(pool) {}
252 ProvidedBuffer(ProvidedBuffer &&other) noexcept
253 : data_(std::exchange(other.data_, nullptr)),
254 size_(std::exchange(other.size_, 0)),
255 pool_(std::exchange(other.pool_, nullptr)) {}
256 ProvidedBuffer &operator=(ProvidedBuffer &&other) noexcept {
257 if (this != &other) {
258 reset();
259 data_ = std::exchange(other.data_, nullptr);
260 size_ = std::exchange(other.size_, 0);
261 pool_ = std::exchange(other.pool_, nullptr);
262 }
263 return *this;
264 }
265
266 ~ProvidedBuffer() { reset(); }
267
268 CONDY_DELETE_COPY(ProvidedBuffer);
269
270public:
274 void *data() const noexcept { return data_; }
275
279 size_t size() const noexcept { return size_; }
280
284 void reset() noexcept;
285
289 bool owns_buffer() const noexcept { return pool_ != nullptr; }
290
291private:
292 void *data_ = nullptr;
293 size_t size_ = 0;
294 detail::BundledProvidedBufferPool *pool_ = nullptr;
295};
296
297namespace detail {
298
299class BundledProvidedBufferPool {
300public:
301 BundledProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
302 unsigned int flags)
303 : num_buffers_(std::bit_ceil(num_buffers)), buffer_size_(buffer_size),
304 curr_buf_len_(buffer_size), br_flags_(flags) {
305 auto &context = detail::Context::current();
306
307 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size);
308 void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
309 MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
310 if (data == MAP_FAILED) [[unlikely]] {
311 throw make_system_error("mmap");
312 }
313 auto d1 = defer([&]() { munmap(data, data_size); });
314
315 bgid_ = context.next_bgid();
316 auto d2 = defer([&]() { context.recycle_bgid(bgid_); });
317
318 br_ = reinterpret_cast<io_uring_buf_ring *>(data);
319 io_uring_buf_ring_init(br_);
320
321 io_uring_buf_reg reg = {};
322 reg.ring_addr = reinterpret_cast<uint64_t>(br_);
323 reg.ring_entries = num_buffers_;
324 reg.bgid = bgid_;
325 int r = io_uring_register_buf_ring(context.runtime()->ring().ring(),
326 &reg, br_flags_);
327 if (r != 0) [[unlikely]] {
328 throw make_system_error("io_uring_register_buf_ring", -r);
329 }
330
331 char *buffer_base =
332 static_cast<char *>(data) + sizeof(io_uring_buf) * num_buffers_;
333 auto mask = io_uring_buf_ring_mask(num_buffers_);
334 for (size_t bid = 0; bid < num_buffers_; bid++) {
335 char *ptr = buffer_base + bid * buffer_size;
336 io_uring_buf_ring_add(br_, ptr, buffer_size, bid, mask,
337 static_cast<int>(bid));
338 }
339 io_uring_buf_ring_advance(br_, static_cast<int>(num_buffers_));
340
341 d1.dismiss();
342 d2.dismiss();
343 }
344
345 ~BundledProvidedBufferPool() {
346 assert(br_ != nullptr);
347 size_t data_size = num_buffers_ * (sizeof(io_uring_buf) + buffer_size_);
348 munmap(br_, data_size);
349 [[maybe_unused]] int r = io_uring_unregister_buf_ring(
350 detail::Context::current().runtime()->ring().ring(), bgid_);
351 assert(r == 0);
352 if (r == 0) {
353 detail::Context::current().recycle_bgid(bgid_);
354 }
355 }
356
357 CONDY_DELETE_COPY_MOVE(BundledProvidedBufferPool);
358
359public:
363 size_t capacity() const noexcept { return num_buffers_; }
364
368 size_t buffer_size() const noexcept { return buffer_size_; }
369
370public:
371 uint16_t bgid() const noexcept { return bgid_; }
372
373 std::vector<ProvidedBuffer> handle_finish(io_uring_cqe *cqe) noexcept {
374 assert(cqe != nullptr);
375 int32_t res = cqe->res;
376 uint32_t flags = cqe->flags;
377 std::vector<ProvidedBuffer> buffers;
378
379 if (!(flags & IORING_CQE_F_BUFFER)) {
380 return buffers;
381 }
382
383 assert(res > 0);
384
385 uint16_t bid = flags >> IORING_CQE_BUFFER_SHIFT;
386
387#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
388 if (flags & IORING_CQE_F_BUF_MORE) {
389 char *data = get_buffer_(bid) + (buffer_size_ - curr_buf_len_);
390 buffers.emplace_back(data, res, nullptr);
391 assert(static_cast<uint32_t>(res) < curr_buf_len_);
392 curr_buf_len_ -= res;
393 return buffers;
394 }
395#endif
396 assert(bid == curr_io_uring_buf_()->bid);
397
398 bool is_incr = false;
399#if !IO_URING_CHECK_VERSION(2, 8) // >= 2.8
400 if (br_flags_ & IOU_PBUF_RING_INC) {
401 is_incr = true;
402 }
403#endif
404
405 int64_t bytes = res;
406 while (bytes > 0) {
407 auto *buf_ptr = curr_io_uring_buf_();
408 bid = buf_ptr->bid;
409
410 char *data = get_buffer_(bid) + (buffer_size_ - curr_buf_len_);
411 uint32_t buf_len;
412 if (is_incr) {
413 buf_len = std::min<uint32_t>(bytes, curr_buf_len_);
414 curr_buf_len_ -= buf_len;
415 } else {
416 buf_len = std::exchange(curr_buf_len_, 0);
417 }
418 bytes -= buf_len;
419 if (curr_buf_len_ == 0) {
420 buffers.emplace_back(data, buf_len, this);
421 advance_io_uring_buf_();
422 curr_buf_len_ = buffer_size_;
423 } else {
424 buffers.emplace_back(data, buf_len, nullptr);
425 }
426 }
427
428 return buffers;
429 }
430
431 void add_buffer_back(void *ptr) noexcept {
432 char *base = get_buffers_base_();
433 assert(ptr >= base);
434 size_t offset = static_cast<char *>(ptr) - base;
435 size_t bid = offset / buffer_size_;
436 assert(bid < num_buffers_);
437 char *buffer_ptr = base + bid * buffer_size_;
438 auto mask = io_uring_buf_ring_mask(num_buffers_);
439 io_uring_buf_ring_add(br_, buffer_ptr, buffer_size_, bid, mask, 0);
440 io_uring_buf_ring_advance(br_, 1);
441 }
442
443private:
444 char *get_buffer_(uint16_t bid) const noexcept {
445 return get_buffers_base_() + static_cast<size_t>(bid) * buffer_size_;
446 }
447
448 char *get_buffers_base_() const noexcept {
449 return reinterpret_cast<char *>(br_) +
450 sizeof(io_uring_buf) * num_buffers_;
451 }
452
453 io_uring_buf *curr_io_uring_buf_() noexcept {
454 auto mask = io_uring_buf_ring_mask(num_buffers_);
455 return &br_->bufs[br_head_ & mask];
456 }
457
458 void advance_io_uring_buf_() noexcept { br_head_++; }
459
460private:
461 io_uring_buf_ring *br_ = nullptr;
462 uint32_t num_buffers_;
463 uint32_t buffer_size_;
464 uint32_t curr_buf_len_;
465 uint16_t bgid_;
466 uint16_t br_head_ = 0;
467 unsigned int br_flags_;
468};
469
470} // namespace detail
471
472inline void ProvidedBuffer::reset() noexcept {
473 if (pool_ != nullptr) {
474 pool_->add_buffer_back(data_);
475 }
476 data_ = nullptr;
477 size_ = 0;
478 pool_ = nullptr;
479}
480
493class ProvidedBufferPool : public detail::BundledProvidedBufferPool {
494public:
502 ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size,
503 unsigned int flags = 0)
504 : BundledProvidedBufferPool(num_buffers, buffer_size, flags) {}
505
506public:
507 ProvidedBuffer handle_finish(io_uring_cqe *cqe) noexcept {
508 assert(cqe != nullptr);
509 auto buffers = BundledProvidedBufferPool::handle_finish(cqe);
510 if (buffers.empty()) {
511 return ProvidedBuffer();
512 }
513 assert(buffers.size() == 1);
514 return std::move(buffers[0]);
515 }
516};
517
528 return static_cast<detail::BundledProvidedBufferPool &>(buffer);
529}
530
538 return static_cast<detail::BundledProvidedBufferQueue &>(buffer);
539}
540
541} // namespace condy
Basic buffer types and conversion utilities.
ProvidedBufferPool(uint32_t num_buffers, size_t buffer_size, unsigned int flags=0)
Construct a new ProvidedBufferPool object in current Runtime.
ProvidedBufferQueue(uint32_t capacity, unsigned int flags=0)
Construct a new ProvidedBufferQueue object in current Runtime.
The main namespace for the Condy library.
Definition condy.hpp:31
auto & bundled(ProvidedBufferPool &buffer)
Get the bundled variant of a provided buffer pool. This will enable the buffer bundling feature of io_uring...
MutableBuffer buffer(void *data, size_t size) noexcept
Create a buffer object from various data sources.
Definition buffers.hpp:86
auto defer(Func &&func)
Defer the execution of a function until the current scope ends.
Definition utils.hpp:103
Wrapper classes for liburing interfaces.
Runtime type for running the io_uring event loop.
Information about buffers consumed from a provided buffer queue.
uint16_t bid
Buffer ID of the first buffer consumed.
uint16_t num_buffers
Number of buffers consumed.
bool owns_buffer() const noexcept
Check if the provided buffer owns a buffer from a pool.
void * data() const noexcept
Get the data pointer of the provided buffer.
size_t size() const noexcept
Get the size of the provided buffer.
void reset() noexcept
Reset the provided buffer, returning it to the pool if owned.
Internal utility classes and functions used by Condy.