public inbox for [email protected]
 help / color / mirror / Atom feed
From: Jens Axboe <[email protected]>
To: [email protected]
Cc: Jens Axboe <[email protected]>
Subject: [PATCH 4/4] io_uring/kbuf: add support for incremental buffer consumption
Date: Fri, 23 Aug 2024 08:42:37 -0600	[thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>

By default, any recv/read operation that uses provided buffers will
consume at least 1 buffer fully (and maybe more, in case of bundles).
This adds support for incremental consumption, meaning that an
application may add large buffers, and each read/recv will just consume
the part of the buffer that it needs.

For example, let's say an application registers 1MB buffers in a
provided buffer ring, for streaming receives. If it gets a short recv,
then the full 1MB buffer will be consumed and passed back to the
application. With incremental consumption, only the part that was
actually used is consumed, and the buffer remains the current one.

This means that both the application and the kernel needs to keep track
of what the current receive point is. Each recv will still pass back a
buffer ID and the size consumed, the only difference is that before the
next receive would always be the next buffer in the ring. Now the same
buffer ID may return multiple receives, each at an offset into that
buffer from where the previous receive left off. Example:

Application registers a provided buffer ring, and adds two 32K buffers
to the ring.

Buffer1 address: 0x1000000 (buffer ID 0)
Buffer2 address: 0x2000000 (buffer ID 1)

A recv completion is received with the following values:

cqe->res	0x1000	(4k bytes received)
cqe->flags	0x1	(IORING_CQE_F_BUFFER set, buffer ID 0)

and the application now knows that 4096b of data is available at
0x1000000, the start of that buffer. Now the next receive comes in:

cqe->res	0x2000	(8k bytes received)
cqe->flags	0x1	(IORING_CQE_F_BUFFER set, buffer ID 0)

which tells the application that 8k is available where the last
completion left off, at 0x1001000. Next completion is:

cqe->res	0x5000	(20k bytes received)
cqe->flags	0x1	(IORING_CQE_F_BUFFER set, buffer ID 0)

and the application now knows that 20k of data is available at
0x1003000, which is where the previous receive ended. The next
completion is then:

cqe->res	0x1000	(4k bytes received)
cqe->flags	0x10001	(IORING_CQE_F_BUFFER set, buffer ID 1)

which tells the application that buffer ID 1 is now the current one,
hence there's 4k of valid data at 0x2000000. 0x2001000 will be the next
receive point for this buffer ID.

When a buffer will be reused by future CQE completions,
IORING_CQE_BUF_MORE will be set in cqe->flags. This tells the application
that the kernel isn't done with the buffer yet, and that it should expect
more completions for this buffer ID. Will only be set by provided buffer
rings setup with IOU_PBUF_RING INC, as that's the only type of buffer
that will see multiple consecutive completions for the same buffer ID.
For any other provided buffer type, any completion that passes back
a buffer to the application is final.

Once a buffer has been fully consumed, the buffer ring head is
incremented and the next receive will indicate the next buffer ID in the
CQE cflags.

On the send side, the application can manage how much data is sent from
an existing buffer by setting sqe->len to the desired send length.

An application can request incremental consumption by setting
IOU_PBUF_RING_INC in the provided buffer ring registration. Outside of
that, any provided buffer ring setup and buffer additions is done like
before, no changes there. The only change is in how an application may
see multiple completions for the same buffer ID, hence needing to know
where the next receive will happen.

Note that like existing provided buffer rings, this should not be used
with IOSQE_ASYNC, as both really require the ring to remain locked over
the duration of the buffer selection and the operation completion. It
will consume a buffer otherwise regardless of the size of the IO done.

Signed-off-by: Jens Axboe <[email protected]>
---
 include/uapi/linux/io_uring.h | 18 ++++++++++
 io_uring/io_uring.c           |  2 +-
 io_uring/kbuf.c               | 26 ++++++++++-----
 io_uring/kbuf.h               | 63 +++++++++++++++++++++++++----------
 io_uring/net.c                |  8 ++---
 io_uring/rw.c                 |  8 ++---
 6 files changed, 91 insertions(+), 34 deletions(-)

diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 042eab793e26..a275f91d2ac0 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -440,11 +440,21 @@ struct io_uring_cqe {
  * IORING_CQE_F_SOCK_NONEMPTY	If set, more data to read after socket recv
  * IORING_CQE_F_NOTIF	Set for notification CQEs. Can be used to distinct
  * 			them from sends.
+ * IORING_CQE_F_BUF_MORE If set, the buffer ID set in the completion will get
+ *			more completions. In other words, the buffer is being
+ *			partially consumed, and will be used by the kernel for
+ *			more completions. This is only set for buffers used via
+ *			the incremental buffer consumption, as provided by
+ *			a ring buffer setup with IOU_PBUF_RING_INC. For any
+ *			other provided buffer type, all completions with a
+ *			buffer passed back is automatically returned to the
+ *			application.
  */
 #define IORING_CQE_F_BUFFER		(1U << 0)
 #define IORING_CQE_F_MORE		(1U << 1)
 #define IORING_CQE_F_SOCK_NONEMPTY	(1U << 2)
 #define IORING_CQE_F_NOTIF		(1U << 3)
+#define IORING_CQE_F_BUF_MORE		(1U << 4)
 
 #define IORING_CQE_BUFFER_SHIFT		16
 
@@ -716,9 +726,17 @@ struct io_uring_buf_ring {
  *			mmap(2) with the offset set as:
  *			IORING_OFF_PBUF_RING | (bgid << IORING_OFF_PBUF_SHIFT)
  *			to get a virtual mapping for the ring.
+ * IOU_PBUF_RING_INC:	If set, buffers consumed from this buffer ring can be
+ *			consumed incrementally. Normally one (or more) buffers
+ *			are fully consumed. With incremental consumptions, it's
+ *			feasible to register big ranges of buffers, and each
+ *			use of it will consume only as much as it needs. This
+ *			requires that both the kernel and application keep
+ *			track of where the current read/recv index is at.
  */
 enum io_uring_register_pbuf_ring_flags {
 	IOU_PBUF_RING_MMAP	= 1,
+	IOU_PBUF_RING_INC	= 2,
 };
 
 /* argument for IORING_(UN)REGISTER_PBUF_RING */
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 80bb6e2374e9..1aca501efaf6 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -904,7 +904,7 @@ void io_req_defer_failed(struct io_kiocb *req, s32 res)
 	lockdep_assert_held(&req->ctx->uring_lock);
 
 	req_set_fail(req);
-	io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
+	io_req_set_res(req, res, io_put_kbuf(req, res, IO_URING_F_UNLOCKED));
 	if (def->fail)
 		def->fail(req);
 	io_req_complete_defer(req);
diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c
index 0aa12703bac7..75b69ea85ac1 100644
--- a/io_uring/kbuf.c
+++ b/io_uring/kbuf.c
@@ -165,7 +165,7 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
 		 * the transfer completes (or if we get -EAGAIN and must poll of
 		 * retry).
 		 */
-		io_kbuf_commit(req, bl, 1);
+		io_kbuf_commit(req, bl, *len, 1);
 		req->buf_list = NULL;
 	}
 	return u64_to_user_ptr(buf->addr);
@@ -244,16 +244,21 @@ static int io_ring_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg,
 
 	req->buf_index = buf->bid;
 	do {
+		u32 len = buf->len;
+
 		/* truncate end piece, if needed */
-		if (buf->len > arg->max_len)
-			buf->len = arg->max_len;
+		if (len > arg->max_len) {
+			len = arg->max_len;
+			if (!(bl->flags & IOBL_INC))
+				buf->len = len;
+		}
 
 		iov->iov_base = u64_to_user_ptr(buf->addr);
-		iov->iov_len = buf->len;
+		iov->iov_len = len;
 		iov++;
 
-		arg->out_len += buf->len;
-		arg->max_len -= buf->len;
+		arg->out_len += len;
+		arg->max_len -= len;
 		if (!arg->max_len)
 			break;
 
@@ -290,8 +295,11 @@ int io_buffers_select(struct io_kiocb *req, struct buf_sel_arg *arg,
 		 * committed them, they cannot be put back in the queue.
 		 */
 		if (ret > 0) {
+			if (bl->flags & IOBL_INC)
+				req->flags |= REQ_F_BUFFERS_COMMIT;
+			else
+				bl->head += ret;
 			req->flags |= REQ_F_BL_NO_RECYCLE;
-			bl->head += ret;
 		}
 	} else {
 		ret = io_provided_buffers_select(req, &arg->out_len, bl, arg->iovs);
@@ -675,7 +683,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
 
 	if (reg.resv[0] || reg.resv[1] || reg.resv[2])
 		return -EINVAL;
-	if (reg.flags & ~IOU_PBUF_RING_MMAP)
+	if (reg.flags & ~(IOU_PBUF_RING_MMAP | IOU_PBUF_RING_INC))
 		return -EINVAL;
 	if (!(reg.flags & IOU_PBUF_RING_MMAP)) {
 		if (!reg.ring_addr)
@@ -713,6 +721,8 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
 	if (!ret) {
 		bl->nr_entries = reg.ring_entries;
 		bl->mask = reg.ring_entries - 1;
+		if (reg.flags & IOU_PBUF_RING_INC)
+			bl->flags |= IOBL_INC;
 
 		io_buffer_add_list(ctx, bl, reg.bgid);
 		return 0;
diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h
index b7da3ce880bf..6be7f00c87e1 100644
--- a/io_uring/kbuf.h
+++ b/io_uring/kbuf.h
@@ -9,6 +9,9 @@ enum {
 	IOBL_BUF_RING	= 1,
 	/* ring mapped provided buffers, but mmap'ed by application */
 	IOBL_MMAP	= 2,
+	/* buffers are consumed incrementally rather than always fully */
+	IOBL_INC	= 4,
+
 };
 
 struct io_buffer_list {
@@ -124,31 +127,55 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
 /* Mapped buffer ring, return io_uring_buf from head */
 #define io_ring_head_to_buf(br, head, mask)	&(br)->bufs[(head) & (mask)]
 
-static inline void io_kbuf_commit(struct io_kiocb *req,
-				  struct io_buffer_list *bl, int nr)
+/* returns false if more completions will be done from the current bid */
+static inline bool io_kbuf_commit(struct io_kiocb *req,
+				 struct io_buffer_list *bl, int len, int nr)
 {
 	if (unlikely(!(req->flags & REQ_F_BUFFERS_COMMIT)))
-		return;
-	bl->head += nr;
+		return true;
+
 	req->flags &= ~REQ_F_BUFFERS_COMMIT;
+
+	if (unlikely(len < 0))
+		return true;
+
+	if (bl->flags & IOBL_INC) {
+		struct io_uring_buf *buf;
+
+		buf = io_ring_head_to_buf(bl->buf_ring, bl->head, bl->mask);
+		if (WARN_ON_ONCE(len > buf->len))
+			len = buf->len;
+		buf->len -= len;
+		if (buf->len) {
+			buf->addr += len;
+			return false;
+		}
+	}
+
+	bl->head += nr;
+	return true;
 }
 
-static inline void __io_put_kbuf_ring(struct io_kiocb *req, int nr)
+static inline unsigned __io_put_kbuf_ring(struct io_kiocb *req, int len,
+					  int nr)
 {
 	struct io_buffer_list *bl = req->buf_list;
+	unsigned cflags = 0;
 
 	if (bl) {
-		io_kbuf_commit(req, bl, nr);
+		if (!io_kbuf_commit(req, bl, len, nr))
+			cflags |= IORING_CQE_F_BUF_MORE;
 		req->buf_index = bl->bgid;
 	}
 	req->flags &= ~REQ_F_BUFFER_RING;
+	return cflags;
 }
 
 static inline void __io_put_kbuf_list(struct io_kiocb *req,
 				      struct list_head *list)
 {
 	if (req->flags & REQ_F_BUFFER_RING) {
-		__io_put_kbuf_ring(req, 1);
+		__io_put_kbuf_ring(req, 0, 1);
 	} else {
 		req->buf_index = req->kbuf->bgid;
 		list_add(&req->kbuf->list, list);
@@ -166,8 +193,8 @@ static inline void io_kbuf_drop(struct io_kiocb *req)
 	__io_put_kbuf_list(req, &req->ctx->io_buffers_comp);
 }
 
-static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int nbufs,
-					  unsigned issue_flags)
+static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int len,
+					  int nbufs, unsigned issue_flags)
 {
 	unsigned int ret;
 
@@ -175,22 +202,24 @@ static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int nbufs,
 		return 0;
 
 	ret = IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT);
-	if (req->flags & REQ_F_BUFFER_RING)
-		__io_put_kbuf_ring(req, nbufs);
-	else
+	if (req->flags & REQ_F_BUFFER_RING) {
+		if (!__io_put_kbuf_ring(req, len, nbufs))
+			ret |= IORING_CQE_F_BUF_MORE;
+	} else {
 		__io_put_kbuf(req, issue_flags);
+	}
 	return ret;
 }
 
-static inline unsigned int io_put_kbuf(struct io_kiocb *req,
+static inline unsigned int io_put_kbuf(struct io_kiocb *req, int len,
 				       unsigned issue_flags)
 {
-	return __io_put_kbufs(req, 1, issue_flags);
+	return __io_put_kbufs(req, len, 1, issue_flags);
 }
 
-static inline unsigned int io_put_kbufs(struct io_kiocb *req, int nbufs,
-					unsigned issue_flags)
+static inline unsigned int io_put_kbufs(struct io_kiocb *req, int len,
+					int nbufs, unsigned issue_flags)
 {
-	return __io_put_kbufs(req, nbufs, issue_flags);
+	return __io_put_kbufs(req, len, nbufs, issue_flags);
 }
 #endif
diff --git a/io_uring/net.c b/io_uring/net.c
index cc81bcacdc1b..f10f5a22d66a 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -497,11 +497,11 @@ static inline bool io_send_finish(struct io_kiocb *req, int *ret,
 	unsigned int cflags;
 
 	if (!(sr->flags & IORING_RECVSEND_BUNDLE)) {
-		cflags = io_put_kbuf(req, issue_flags);
+		cflags = io_put_kbuf(req, *ret, issue_flags);
 		goto finish;
 	}
 
-	cflags = io_put_kbufs(req, io_bundle_nbufs(kmsg, *ret), issue_flags);
+	cflags = io_put_kbufs(req, *ret, io_bundle_nbufs(kmsg, *ret), issue_flags);
 
 	if (bundle_finished || req->flags & REQ_F_BL_EMPTY)
 		goto finish;
@@ -842,13 +842,13 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
 		cflags |= IORING_CQE_F_SOCK_NONEMPTY;
 
 	if (sr->flags & IORING_RECVSEND_BUNDLE) {
-		cflags |= io_put_kbufs(req, io_bundle_nbufs(kmsg, *ret),
+		cflags |= io_put_kbufs(req, *ret, io_bundle_nbufs(kmsg, *ret),
 				      issue_flags);
 		/* bundle with no more immediate buffers, we're done */
 		if (req->flags & REQ_F_BL_EMPTY)
 			goto finish;
 	} else {
-		cflags |= io_put_kbuf(req, issue_flags);
+		cflags |= io_put_kbuf(req, *ret, issue_flags);
 	}
 
 	/*
diff --git a/io_uring/rw.c b/io_uring/rw.c
index c004d21e2f12..f5e0694538b9 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -511,7 +511,7 @@ void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
 	io_req_io_end(req);
 
 	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING))
-		req->cqe.flags |= io_put_kbuf(req, 0);
+		req->cqe.flags |= io_put_kbuf(req, req->cqe.res, 0);
 
 	io_req_rw_cleanup(req, 0);
 	io_req_task_complete(req, ts);
@@ -593,7 +593,7 @@ static int kiocb_done(struct io_kiocb *req, ssize_t ret,
 			 */
 			io_req_io_end(req);
 			io_req_set_res(req, final_ret,
-				       io_put_kbuf(req, issue_flags));
+				       io_put_kbuf(req, ret, issue_flags));
 			io_req_rw_cleanup(req, issue_flags);
 			return IOU_OK;
 		}
@@ -975,7 +975,7 @@ int io_read_mshot(struct io_kiocb *req, unsigned int issue_flags)
 		 * Put our buffer and post a CQE. If we fail to post a CQE, then
 		 * jump to the termination path. This request is then done.
 		 */
-		cflags = io_put_kbuf(req, issue_flags);
+		cflags = io_put_kbuf(req, ret, issue_flags);
 		rw->len = 0; /* similarly to above, reset len to 0 */
 
 		if (io_req_post_cqe(req, ret, cflags | IORING_CQE_F_MORE)) {
@@ -1167,7 +1167,7 @@ int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
 		if (!smp_load_acquire(&req->iopoll_completed))
 			break;
 		nr_events++;
-		req->cqe.flags = io_put_kbuf(req, 0);
+		req->cqe.flags = io_put_kbuf(req, req->cqe.res, 0);
 		if (req->opcode != IORING_OP_URING_CMD)
 			io_req_rw_cleanup(req, 0);
 	}
-- 
2.43.0


  parent reply	other threads:[~2024-08-23 14:51 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-08-23 14:42 [PATCHSET v2 RFC 0/4] Add support for incremental buffer consumption Jens Axboe
2024-08-23 14:42 ` [PATCH 1/4] io_uring/kbuf: add io_kbuf_commit() helper Jens Axboe
2024-08-23 14:42 ` [PATCH 2/4] io_uring/kbuf: move io_ring_head_to_buf() to kbuf.h Jens Axboe
2024-08-23 14:42 ` [PATCH 3/4] Revert "io_uring: Require zeroed sqe->len on provided-buffers send" Jens Axboe
2024-08-23 14:42 ` Jens Axboe [this message]
  -- strict thread matches above, loose matches on Subject: below --
2024-08-24 15:43 [PATCHSET v3 0/4] Add support for incremental buffer consumption Jens Axboe
2024-08-24 15:43 ` [PATCH 4/4] io_uring/kbuf: add " Jens Axboe

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    [email protected] \
    [email protected] \
    [email protected] \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox