* [PATCH 1/9] io_uring/msg_ring: tighten requirement for remote posting
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
Currently this is gated on whether or not the target ring needs a local
completion - and if so, whether or not we're running on the right task.
The use case for same-thread cross posting is probably a lot less
relevant than remote posting. And since we're going to improve this
situation anyway, just gate it on whether the target uses local
completions and ignore which task we're currently running on.
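For context, the remote posting being kept here is the normal MSG_RING
use case from userspace, where one task pokes another task's ring. A
minimal liburing sketch of the sender side (error handling trimmed;
assumes liburing's io_uring_prep_msg_ring() helper):

#include <liburing.h>

/* Post a CQE into another task's ring via IORING_OP_MSG_RING. */
static int msg_ring_notify(struct io_uring *sender, int target_ring_fd,
                           __u64 user_data)
{
        struct io_uring_sqe *sqe = io_uring_get_sqe(sender);

        if (!sqe)
                return -EBUSY;
        /* user_data and len are reflected in the CQE posted on the target */
        io_uring_prep_msg_ring(sqe, target_ring_fd, 0, user_data, 0);
        return io_uring_submit(sender);
}

A DEFER_TASKRUN target (which also requires IORING_SETUP_SINGLE_ISSUER)
completes everything locally, which is the task_complete case the patch
now keys off.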
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/msg_ring.c | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 81c4a9d43729..9fdb0cc19bfd 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -68,9 +68,7 @@ void io_msg_ring_cleanup(struct io_kiocb *req)
static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
{
- if (!target_ctx->task_complete)
- return false;
- return current != target_ctx->submitter_task;
+ return target_ctx->task_complete;
}
static int io_msg_exec_remote(struct io_kiocb *req, task_work_func_t func)
--
2.43.0
* [PATCH 2/9] io_uring: keep track of overflow entry count
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
Not needed just yet, but add it in preparation for factoring the
overflow count into the wakeup batching.
Signed-off-by: Jens Axboe <[email protected]>
---
include/linux/io_uring_types.h | 1 +
io_uring/io_uring.c | 2 ++
2 files changed, 3 insertions(+)
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index a2227ab7fd16..d795f3f3a705 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -325,6 +325,7 @@ struct io_ring_ctx {
unsigned long check_cq;
atomic_t cq_wait_nr;
atomic_t cq_timeouts;
+ int nr_overflow;
struct wait_queue_head cq_wait;
} ____cacheline_aligned_in_smp;
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 96f6da0bf5cd..94af56dd5344 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -618,6 +618,7 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying)
memcpy(cqe, &ocqe->cqe, cqe_size);
}
list_del(&ocqe->list);
+ ctx->nr_overflow--;
kfree(ocqe);
}
@@ -724,6 +725,7 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
ocqe->cqe.big_cqe[0] = extra1;
ocqe->cqe.big_cqe[1] = extra2;
}
+ ctx->nr_overflow++;
list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
return true;
}
--
2.43.0
* [PATCH 3/9] io_uring: abstract out helpers for DEFER_TASKRUN wakeup batching
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
In preparation for using them elsewhere, factor out the two helpers
that io_req_local_work_add() uses to do wakeup batching.
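For reference, a remote user of these two helpers is expected to call
them roughly like the sketch below (this mirrors how the msg_ring side
uses them later in the series; it is not new code in this patch):

        unsigned nr_tw_prev, nr_tw;

        rcu_read_lock();
        /* how much work is already queued, via the first request's nr_tw */
        io_defer_tw_count(ctx, &nr_tw_prev);
        /* this caller adds one more unit of work */
        nr_tw = nr_tw_prev + 1;
        /* wake ctx->submitter_task only if the batch target is newly met */
        io_defer_wake(ctx, nr_tw, nr_tw_prev);
        rcu_read_unlock();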
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/io_uring.c | 24 +++---------------------
io_uring/io_uring.h | 44 ++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 47 insertions(+), 21 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 94af56dd5344..499255ef62c7 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1103,7 +1103,7 @@ void tctx_task_work(struct callback_head *cb)
static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
struct io_ring_ctx *ctx = req->ctx;
- unsigned nr_wait, nr_tw, nr_tw_prev;
+ unsigned nr_tw, nr_tw_prev;
struct llist_node *head;
/* See comment above IO_CQ_WAKE_INIT */
@@ -1116,19 +1116,8 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
flags &= ~IOU_F_TWQ_LAZY_WAKE;
- head = READ_ONCE(ctx->work_llist.first);
do {
- nr_tw_prev = 0;
- if (head) {
- struct io_kiocb *first_req = container_of(head,
- struct io_kiocb,
- io_task_work.node);
- /*
- * Might be executed at any moment, rely on
- * SLAB_TYPESAFE_BY_RCU to keep it alive.
- */
- nr_tw_prev = READ_ONCE(first_req->nr_tw);
- }
+ head = io_defer_tw_count(ctx, &nr_tw_prev);
/*
* Theoretically, it can overflow, but that's fine as one of
@@ -1158,14 +1147,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
io_eventfd_signal(ctx);
}
- nr_wait = atomic_read(&ctx->cq_wait_nr);
- /* not enough or no one is waiting */
- if (nr_tw < nr_wait)
- return;
- /* the previous add has already woken it up */
- if (nr_tw_prev >= nr_wait)
- return;
- wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
+ io_defer_wake(ctx, nr_tw, nr_tw_prev);
}
static void io_req_normal_work_add(struct io_kiocb *req)
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index cd43924eed04..fdcf1a2a6b8a 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -444,4 +444,48 @@ static inline bool io_has_work(struct io_ring_ctx *ctx)
return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
!llist_empty(&ctx->work_llist);
}
+
+/*
+ * Return the head of ctx->work_llist and, via nr_tw_prev, the nr_tw count
+ * of the first pending request, if any. Only applicable for DEFER_TASKRUN
+ * users of ctx->work_llist. Must be called with the RCU read lock held.
+ */
+static inline struct llist_node *io_defer_tw_count(struct io_ring_ctx *ctx,
+ unsigned *nr_tw_prev)
+{
+ struct llist_node *head = READ_ONCE(ctx->work_llist.first);
+
+ *nr_tw_prev = 0;
+ if (head) {
+ struct io_kiocb *first;
+
+ first = container_of(head, struct io_kiocb, io_task_work.node);
+ /*
+ * Might be executed at any moment, rely on
+ * SLAB_TYPESAFE_BY_RCU to keep it alive.
+ */
+ *nr_tw_prev = READ_ONCE(first->nr_tw);
+ }
+
+ return head;
+}
+
+static inline void io_defer_wake(struct io_ring_ctx *ctx, unsigned nr_tw,
+ unsigned nr_tw_prev)
+{
+ struct task_struct *task = READ_ONCE(ctx->submitter_task);
+ unsigned nr_wait;
+
+ /* add pending overflows, for MSG_RING */
+ nr_tw += READ_ONCE(ctx->nr_overflow);
+
+ nr_wait = atomic_read(&ctx->cq_wait_nr);
+ /* not enough or no one is waiting */
+ if (nr_tw < nr_wait)
+ return;
+ /* the previous add has already woken it up */
+ if (nr_tw_prev >= nr_wait)
+ return;
+ wake_up_state(task, TASK_INTERRUPTIBLE);
+}
#endif
--
2.43.0
* [PATCH 4/9] io_uring/msg_ring: avoid double indirection task_work for data messages
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
If the target ring has IORING_SETUP_DEFER_TASKRUN set, then we can't
post CQEs to it directly from a remote task. Instead, task_work is
queued for the target ring, which is used to post the CQE. To make
matters worse, once the target CQE has been posted, task_work is then
queued with the originator to fill its completion.
This obviously adds a bunch of overhead and latency. Instead of relying
on generic kernel task_work for this, fill an overflow entry on the
target ring and flag the ring so that it will flush it. This avoids the
task_work for posting the CQE, and it means that the originator CQE can
be filled inline as well.
In local testing, this reduces the latency on the sender side by 5-6x.
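The resulting sender-side flow, condensed from the diff below into a
sketch (msg_fill_remote_sketch() is an illustrative name; the real
function added by this patch is io_msg_fill_remote(), and error
handling is trimmed):

/*
 * Old: sender -> task_work on the target (posts the CQE) -> task_work
 *      back on the sender (fills the originator CQE).
 * New: the sender fills an overflow entry on the target directly and
 *      completes its own CQE inline.
 */
static int msg_fill_remote_sketch(struct io_msg *msg,
                                  struct io_ring_ctx *target_ctx, u32 flags)
{
        struct io_overflow_cqe *ocqe = io_alloc_overflow(target_ctx);

        if (!ocqe)
                return -ENOMEM;
        spin_lock(&target_ctx->completion_lock);
        /*
         * Queues the CQE, marks the CQ as needing an overflow flush,
         * drops the lock and wakes the target's submitter task.
         */
        io_msg_add_overflow(msg, target_ctx, ocqe, msg->len, flags);
        return 0;
}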
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/msg_ring.c | 88 ++++++++++++++++++++++++++++-----------------
1 file changed, 56 insertions(+), 32 deletions(-)
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 9fdb0cc19bfd..2b649087fe5c 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -87,38 +87,62 @@ static int io_msg_exec_remote(struct io_kiocb *req, task_work_func_t func)
return IOU_ISSUE_SKIP_COMPLETE;
}
-static void io_msg_tw_complete(struct callback_head *head)
+static struct io_overflow_cqe *io_alloc_overflow(struct io_ring_ctx *target_ctx)
{
- struct io_msg *msg = container_of(head, struct io_msg, tw);
- struct io_kiocb *req = cmd_to_io_kiocb(msg);
- struct io_ring_ctx *target_ctx = req->file->private_data;
- int ret = 0;
-
- if (current->flags & PF_EXITING) {
- ret = -EOWNERDEAD;
- } else {
- u32 flags = 0;
-
- if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
- flags = msg->cqe_flags;
-
- /*
- * If the target ring is using IOPOLL mode, then we need to be
- * holding the uring_lock for posting completions. Other ring
- * types rely on the regular completion locking, which is
- * handled while posting.
- */
- if (target_ctx->flags & IORING_SETUP_IOPOLL)
- mutex_lock(&target_ctx->uring_lock);
- if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags))
- ret = -EOVERFLOW;
- if (target_ctx->flags & IORING_SETUP_IOPOLL)
- mutex_unlock(&target_ctx->uring_lock);
+ bool is_cqe32 = target_ctx->flags & IORING_SETUP_CQE32;
+ size_t cqe_size = sizeof(struct io_overflow_cqe);
+ struct io_overflow_cqe *ocqe;
+
+ if (is_cqe32)
+ cqe_size += sizeof(struct io_uring_cqe);
+
+ ocqe = kmalloc(cqe_size, GFP_ATOMIC | __GFP_ACCOUNT);
+ if (!ocqe)
+ return NULL;
+
+ if (is_cqe32)
+ ocqe->cqe.big_cqe[0] = ocqe->cqe.big_cqe[1] = 0;
+
+ return ocqe;
+}
+
+/*
+ * Entered with the target completion_lock held, and will drop it before
+ * returning. Adds a previously allocated ocqe to the overflow list on
+ * the target, and marks it appropriately for flushing.
+ */
+static void io_msg_add_overflow(struct io_msg *msg,
+ struct io_ring_ctx *target_ctx,
+ struct io_overflow_cqe *ocqe, int ret,
+ u32 flags)
+ __releases(&target_ctx->completion_lock)
+{
+ if (list_empty(&target_ctx->cq_overflow_list)) {
+ set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &target_ctx->check_cq);
+ atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags);
}
- if (ret < 0)
- req_set_fail(req);
- io_req_queue_tw_complete(req, ret);
+ ocqe->cqe.user_data = msg->user_data;
+ ocqe->cqe.res = ret;
+ ocqe->cqe.flags = flags;
+ target_ctx->nr_overflow++;
+ list_add_tail(&ocqe->list, &target_ctx->cq_overflow_list);
+ spin_unlock(&target_ctx->completion_lock);
+ wake_up_state(target_ctx->submitter_task, TASK_INTERRUPTIBLE);
+}
+
+static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
+ struct io_ring_ctx *target_ctx, u32 flags)
+{
+ struct io_overflow_cqe *ocqe;
+
+ ocqe = io_alloc_overflow(target_ctx);
+ if (!ocqe)
+ return -ENOMEM;
+
+ spin_lock(&target_ctx->completion_lock);
+ io_msg_add_overflow(msg, target_ctx, ocqe, msg->len, flags);
+ return 0;
}
static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
@@ -135,12 +159,12 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
if (target_ctx->flags & IORING_SETUP_R_DISABLED)
return -EBADFD;
- if (io_msg_need_remote(target_ctx))
- return io_msg_exec_remote(req, io_msg_tw_complete);
-
if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
flags = msg->cqe_flags;
+ if (io_msg_need_remote(target_ctx))
+ return io_msg_fill_remote(msg, issue_flags, target_ctx, flags);
+
ret = -EOVERFLOW;
if (target_ctx->flags & IORING_SETUP_IOPOLL) {
if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
--
2.43.0
* [PATCH 5/9] io_uring/msg_ring: avoid double indirection task_work for fd passing
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
Like what was done for MSG_RING data passing, avoid the double
task_work roundtrip for IORING_SETUP_DEFER_TASKRUN by implementing the
same model for fd passing. File descriptor passing is separately locked
anyway, so the only remaining issue is CQE posting, just like it was
for data passing. And for that, we can use the same approach.
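For context, the fd passing in question moves a fixed (direct)
descriptor from the sender ring's file table into the target ring's
table. From userspace this is driven by something like the sketch below
- io_uring_prep_msg_ring_fd() is the liburing helper as I recall it,
and its exact name/signature may differ by liburing version:

#include <liburing.h>

/*
 * Send the fixed file at index src_idx in the sender's table to slot
 * dst_idx in the target ring's table. The target gets a CQE carrying
 * user_data unless IORING_MSG_RING_CQE_SKIP is passed in flags.
 */
static int msg_ring_send_fd(struct io_uring *sender, int target_ring_fd,
                            int src_idx, int dst_idx, __u64 user_data)
{
        struct io_uring_sqe *sqe = io_uring_get_sqe(sender);

        if (!sqe)
                return -EBUSY;
        io_uring_prep_msg_ring_fd(sqe, target_ring_fd, src_idx, dst_idx,
                                  user_data, 0);
        return io_uring_submit(sender);
}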
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/msg_ring.c | 57 ++++++++++++++++++++++++---------------------
1 file changed, 31 insertions(+), 26 deletions(-)
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 2b649087fe5c..1ee89bdbbb5b 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -71,22 +71,6 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
return target_ctx->task_complete;
}
-static int io_msg_exec_remote(struct io_kiocb *req, task_work_func_t func)
-{
- struct io_ring_ctx *ctx = req->file->private_data;
- struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
- struct task_struct *task = READ_ONCE(ctx->submitter_task);
-
- if (unlikely(!task))
- return -EOWNERDEAD;
-
- init_task_work(&msg->tw, func);
- if (task_work_add(task, &msg->tw, TWA_SIGNAL))
- return -EOWNERDEAD;
-
- return IOU_ISSUE_SKIP_COMPLETE;
-}
-
static struct io_overflow_cqe *io_alloc_overflow(struct io_ring_ctx *target_ctx)
{
bool is_cqe32 = target_ctx->flags & IORING_SETUP_CQE32;
@@ -227,17 +211,38 @@ static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flag
return ret;
}
-static void io_msg_tw_fd_complete(struct callback_head *head)
+static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
+ struct io_ring_ctx *target_ctx)
{
- struct io_msg *msg = container_of(head, struct io_msg, tw);
- struct io_kiocb *req = cmd_to_io_kiocb(msg);
- int ret = -EOWNERDEAD;
+ struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+ struct io_overflow_cqe *ocqe = NULL;
+ int ret;
- if (!(current->flags & PF_EXITING))
- ret = io_msg_install_complete(req, IO_URING_F_UNLOCKED);
- if (ret < 0)
- req_set_fail(req);
- io_req_queue_tw_complete(req, ret);
+ if (!(msg->flags & IORING_MSG_RING_CQE_SKIP)) {
+ ocqe = io_alloc_overflow(target_ctx);
+ if (!ocqe)
+ return -ENOMEM;
+ }
+
+ if (unlikely(io_double_lock_ctx(target_ctx, issue_flags))) {
+ kfree(ocqe);
+ return -EAGAIN;
+ }
+
+ ret = __io_fixed_fd_install(target_ctx, msg->src_file, msg->dst_fd);
+ mutex_unlock(&target_ctx->uring_lock);
+
+ if (ret >= 0) {
+ msg->src_file = NULL;
+ req->flags &= ~REQ_F_NEED_CLEANUP;
+ if (ocqe) {
+ spin_lock(&target_ctx->completion_lock);
+ io_msg_add_overflow(msg, target_ctx, ocqe, ret, 0);
+ return 0;
+ }
+ }
+ kfree(ocqe);
+ return ret;
}
static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
@@ -262,7 +267,7 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
}
if (io_msg_need_remote(target_ctx))
- return io_msg_exec_remote(req, io_msg_tw_fd_complete);
+ return io_msg_install_remote(req, issue_flags, target_ctx);
return io_msg_install_complete(req, issue_flags);
}
--
2.43.0
* [PATCH 6/9] io_uring/msg_ring: add an alloc cache for CQE entries
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
io_uring accounts the memory allocated, which is quite expensive. Wrap
the allocations and frees in the provided alloc cache framework. The
target ctx needs to be locked anyway for posting the overflow entry, so
just move the overflow allocation inside that section. Flushing the
entries has it locked as well, so io_alloc_cache_put() can be used
there.
In a simple test, most of the overhead of DEFER_TASKRUN message passing
ends up being the accounting for allocation and free, and with this
change it's completely gone.
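The resulting cache rule, sketched (msg_get_ocqe()/msg_put_ocqe() are
illustrative names rather than functions added by the patch): the
sender allocates under the target's ->completion_lock, and the target's
overflow flush, which also runs under ->completion_lock, hands the
entries back to the cache.

static struct io_overflow_cqe *msg_get_ocqe(struct io_ring_ctx *target_ctx,
                                            size_t cqe_size)
        __must_hold(&target_ctx->completion_lock)
{
        struct io_overflow_cqe *ocqe;

        /* fast path: reuse an entry the target handed back on flush */
        ocqe = io_alloc_cache_get(&target_ctx->msg_cache);
        if (!ocqe)
                ocqe = kmalloc(cqe_size, GFP_ATOMIC | __GFP_ACCOUNT);
        return ocqe;
}

static void msg_put_ocqe(struct io_ring_ctx *ctx, struct io_overflow_cqe *ocqe)
        __must_hold(&ctx->completion_lock)
{
        /* cache full? fall back to freeing it */
        if (!io_alloc_cache_put(&ctx->msg_cache, ocqe))
                kfree(ocqe);
}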
Signed-off-by: Jens Axboe <[email protected]>
---
include/linux/io_uring_types.h | 7 ++++
io_uring/io_uring.c | 7 +++-
io_uring/msg_ring.c | 76 +++++++++++++++++++++++-----------
io_uring/msg_ring.h | 3 ++
4 files changed, 68 insertions(+), 25 deletions(-)
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index d795f3f3a705..c7f3a330482d 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -350,6 +350,13 @@ struct io_ring_ctx {
struct io_alloc_cache futex_cache;
#endif
+ /*
+ * Unlike the other caches, this one is used by the sender of messages
+ * to this ring, not by the ring itself. As such, protection for this
+ * cache is under ->completion_lock, not ->uring_lock.
+ */
+ struct io_alloc_cache msg_cache;
+
const struct cred *sq_creds; /* cred used for __io_sq_thread() */
struct io_sq_data *sq_data; /* if using sq thread polling */
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 499255ef62c7..3c77d96fc6d7 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -95,6 +95,7 @@
#include "futex.h"
#include "napi.h"
#include "uring_cmd.h"
+#include "msg_ring.h"
#include "memmap.h"
#include "timeout.h"
@@ -316,6 +317,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
ret |= io_alloc_cache_init(&ctx->uring_cache, IO_ALLOC_CACHE_MAX,
sizeof(struct uring_cache));
ret |= io_futex_cache_init(ctx);
+ ret |= io_msg_cache_init(ctx);
if (ret)
goto err;
init_completion(&ctx->ref_comp);
@@ -352,6 +354,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
io_alloc_cache_free(&ctx->uring_cache, kfree);
io_futex_cache_free(ctx);
+ io_msg_cache_free(ctx);
kfree(ctx->cancel_table.hbs);
kfree(ctx->cancel_table_locked.hbs);
xa_destroy(&ctx->io_bl_xa);
@@ -619,7 +622,8 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying)
}
list_del(&ocqe->list);
ctx->nr_overflow--;
- kfree(ocqe);
+ if (!io_alloc_cache_put(&ctx->msg_cache, ocqe))
+ kfree(ocqe);
}
if (list_empty(&ctx->cq_overflow_list)) {
@@ -2556,6 +2560,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
io_alloc_cache_free(&ctx->uring_cache, kfree);
io_futex_cache_free(ctx);
+ io_msg_cache_free(ctx);
io_destroy_buffers(ctx);
mutex_unlock(&ctx->uring_lock);
if (ctx->sq_creds)
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 1ee89bdbbb5b..a33228f8c364 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -11,6 +11,7 @@
#include "io_uring.h"
#include "rsrc.h"
#include "filetable.h"
+#include "alloc_cache.h"
#include "msg_ring.h"
@@ -72,20 +73,28 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
}
static struct io_overflow_cqe *io_alloc_overflow(struct io_ring_ctx *target_ctx)
+ __acquires(&target_ctx->completion_lock)
{
- bool is_cqe32 = target_ctx->flags & IORING_SETUP_CQE32;
- size_t cqe_size = sizeof(struct io_overflow_cqe);
struct io_overflow_cqe *ocqe;
- if (is_cqe32)
- cqe_size += sizeof(struct io_uring_cqe);
+ spin_lock(&target_ctx->completion_lock);
+
+ ocqe = io_alloc_cache_get(&target_ctx->msg_cache);
+ if (!ocqe) {
+ bool is_cqe32 = target_ctx->flags & IORING_SETUP_CQE32;
+ size_t cqe_size = sizeof(struct io_overflow_cqe);
- ocqe = kmalloc(cqe_size, GFP_ATOMIC | __GFP_ACCOUNT);
- if (!ocqe)
- return NULL;
+ if (is_cqe32)
+ cqe_size += sizeof(struct io_uring_cqe);
+
+ ocqe = kmalloc(cqe_size, GFP_ATOMIC | __GFP_ACCOUNT);
+ if (!ocqe)
+ return NULL;
- if (is_cqe32)
- ocqe->cqe.big_cqe[0] = ocqe->cqe.big_cqe[1] = 0;
+ /* just init at alloc time, won't change */
+ if (is_cqe32)
+ ocqe->cqe.big_cqe[0] = ocqe->cqe.big_cqe[1] = 0;
+ }
return ocqe;
}
@@ -121,12 +130,13 @@ static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
struct io_overflow_cqe *ocqe;
ocqe = io_alloc_overflow(target_ctx);
- if (!ocqe)
- return -ENOMEM;
+ if (ocqe) {
+ io_msg_add_overflow(msg, target_ctx, ocqe, msg->len, flags);
+ return 0;
+ }
- spin_lock(&target_ctx->completion_lock);
- io_msg_add_overflow(msg, target_ctx, ocqe, msg->len, flags);
- return 0;
+ spin_unlock(&target_ctx->completion_lock);
+ return -ENOMEM;
}
static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
@@ -216,17 +226,17 @@ static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
{
struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
struct io_overflow_cqe *ocqe = NULL;
- int ret;
+ int ret = -ENOMEM;
+
+ if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
+ return -EAGAIN;
if (!(msg->flags & IORING_MSG_RING_CQE_SKIP)) {
ocqe = io_alloc_overflow(target_ctx);
- if (!ocqe)
- return -ENOMEM;
- }
-
- if (unlikely(io_double_lock_ctx(target_ctx, issue_flags))) {
- kfree(ocqe);
- return -EAGAIN;
+ if (unlikely(!ocqe)) {
+ mutex_unlock(&target_ctx->uring_lock);
+ goto err;
+ }
}
ret = __io_fixed_fd_install(target_ctx, msg->src_file, msg->dst_fd);
@@ -236,12 +246,15 @@ static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
msg->src_file = NULL;
req->flags &= ~REQ_F_NEED_CLEANUP;
if (ocqe) {
- spin_lock(&target_ctx->completion_lock);
io_msg_add_overflow(msg, target_ctx, ocqe, ret, 0);
return 0;
}
}
- kfree(ocqe);
+ if (ocqe) {
+err:
+ spin_unlock(&target_ctx->completion_lock);
+ kfree(ocqe);
+ }
return ret;
}
@@ -321,3 +334,18 @@ int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
io_req_set_res(req, ret, 0);
return IOU_OK;
}
+
+int io_msg_cache_init(struct io_ring_ctx *ctx)
+{
+ size_t size = sizeof(struct io_overflow_cqe);
+
+ if (ctx->flags & IORING_SETUP_CQE32)
+ size += sizeof(struct io_uring_cqe);
+
+ return io_alloc_cache_init(&ctx->msg_cache, IO_ALLOC_CACHE_MAX, size);
+}
+
+void io_msg_cache_free(struct io_ring_ctx *ctx)
+{
+ io_alloc_cache_free(&ctx->msg_cache, kfree);
+}
diff --git a/io_uring/msg_ring.h b/io_uring/msg_ring.h
index 3987ee6c0e5f..94f5716d522e 100644
--- a/io_uring/msg_ring.h
+++ b/io_uring/msg_ring.h
@@ -3,3 +3,6 @@
int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags);
void io_msg_ring_cleanup(struct io_kiocb *req);
+
+int io_msg_cache_init(struct io_ring_ctx *ctx);
+void io_msg_cache_free(struct io_ring_ctx *ctx);
--
2.43.0
* [PATCH 7/9] io_uring/msg_ring: remove callback_head from struct io_msg
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
This is now unused, get rid of it.
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/msg_ring.c | 1 -
1 file changed, 1 deletion(-)
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index a33228f8c364..9a7c63f38c46 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -22,7 +22,6 @@
struct io_msg {
struct file *file;
struct file *src_file;
- struct callback_head tw;
u64 user_data;
u32 len;
u32 cmd;
--
2.43.0
* [PATCH 8/9] io_uring/msg_ring: add basic wakeup batch support
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
Factor in the number of overflow entries waiting, on both the msg ring
and local task_work add side.
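To make the batching concrete, a worked example with illustrative
numbers: the waiter armed cq_wait_nr = 8, the target already has six
overflow CQEs queued plus one pending task_work item, and this message
adds the eighth unit of work:

        nr_wait    = 8                   what the waiter stored in cq_wait_nr
        nr_tw_prev = 6 + 1 = 7           overflow CQEs + task_work already queued
        nr_tw      = nr_tw_prev + 1 = 8  this entry reaches the batch target

        nr_tw >= nr_wait (8 >= 8) and nr_tw_prev < nr_wait (7 < 8)
                -> wake the submitter task

Earlier adds in the same batch bail out on nr_tw < nr_wait, and later
ones bail out on nr_tw_prev >= nr_wait, so the submitter is woken
exactly once per batch.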
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/msg_ring.c | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 9a7c63f38c46..eeca1563ceed 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -109,6 +109,8 @@ static void io_msg_add_overflow(struct io_msg *msg,
u32 flags)
__releases(&target_ctx->completion_lock)
{
+ unsigned nr_prev, nr_wait;
+
if (list_empty(&target_ctx->cq_overflow_list)) {
set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &target_ctx->check_cq);
atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags);
@@ -117,10 +119,14 @@ static void io_msg_add_overflow(struct io_msg *msg,
ocqe->cqe.user_data = msg->user_data;
ocqe->cqe.res = ret;
ocqe->cqe.flags = flags;
- target_ctx->nr_overflow++;
+ nr_prev = target_ctx->nr_overflow++;
list_add_tail(&ocqe->list, &target_ctx->cq_overflow_list);
spin_unlock(&target_ctx->completion_lock);
- wake_up_state(target_ctx->submitter_task, TASK_INTERRUPTIBLE);
+ rcu_read_lock();
+ io_defer_tw_count(target_ctx, &nr_wait);
+ nr_prev += nr_wait;
+ io_defer_wake(target_ctx, nr_prev + 1, nr_prev);
+ rcu_read_unlock();
}
static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
--
2.43.0
* Re: [PATCH 8/9] io_uring/msg_ring: add basic wakeup batch support
From: Pavel Begunkov @ 2024-06-05 15:32 UTC (permalink / raw)
To: Jens Axboe, io-uring
On 6/5/24 14:51, Jens Axboe wrote:
> Factor in the number of overflow entries waiting, on both the msg ring
> and local task_work add side.
Did you forget to add the local tw change to the patch?
> Signed-off-by: Jens Axboe <[email protected]>
> ---
> io_uring/msg_ring.c | 10 ++++++++--
> 1 file changed, 8 insertions(+), 2 deletions(-)
>
> diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
> index 9a7c63f38c46..eeca1563ceed 100644
> --- a/io_uring/msg_ring.c
> +++ b/io_uring/msg_ring.c
> @@ -109,6 +109,8 @@ static void io_msg_add_overflow(struct io_msg *msg,
> u32 flags)
> __releases(&target_ctx->completion_lock)
> {
> + unsigned nr_prev, nr_wait;
> +
> if (list_empty(&target_ctx->cq_overflow_list)) {
> set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &target_ctx->check_cq);
> atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags);
> @@ -117,10 +119,14 @@ static void io_msg_add_overflow(struct io_msg *msg,
> ocqe->cqe.user_data = msg->user_data;
> ocqe->cqe.res = ret;
> ocqe->cqe.flags = flags;
> - target_ctx->nr_overflow++;
> + nr_prev = target_ctx->nr_overflow++;
> list_add_tail(&ocqe->list, &target_ctx->cq_overflow_list);
> spin_unlock(&target_ctx->completion_lock);
> - wake_up_state(target_ctx->submitter_task, TASK_INTERRUPTIBLE);
> + rcu_read_lock();
> + io_defer_tw_count(target_ctx, &nr_wait);
> + nr_prev += nr_wait;
> + io_defer_wake(target_ctx, nr_prev + 1, nr_prev);
> + rcu_read_unlock();
> }
>
> static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
--
Pavel Begunkov
* Re: [PATCH 8/9] io_uring/msg_ring: add basic wakeup batch support
From: Jens Axboe @ 2024-06-05 15:50 UTC (permalink / raw)
To: Pavel Begunkov, io-uring
On 6/5/24 9:32 AM, Pavel Begunkov wrote:
> On 6/5/24 14:51, Jens Axboe wrote:
>> Factor in the number of overflow entries waiting, on both the msg ring
>> and local task_work add side.
>
> Did you forget to add the local tw change to the patch?
Gah yes, looked like that line got dropped while shuffling the
patches around...
--
Jens Axboe
* [PATCH 9/9] io_uring/msg_ring: remove non-remote message passing
From: Jens Axboe @ 2024-06-05 13:51 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, Jens Axboe
Now that the overflow approach works well, there's no need to retain the
double locking for direct CQ posting on the target ring. Just have any
kind of target ring use the same messaging mechanism.
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/msg_ring.c | 79 ++++++++-------------------------------------
1 file changed, 14 insertions(+), 65 deletions(-)
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index eeca1563ceed..9fb355b7e736 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -33,11 +33,6 @@ struct io_msg {
u32 flags;
};
-static void io_double_unlock_ctx(struct io_ring_ctx *octx)
-{
- mutex_unlock(&octx->uring_lock);
-}
-
static int io_double_lock_ctx(struct io_ring_ctx *octx,
unsigned int issue_flags)
{
@@ -66,11 +61,6 @@ void io_msg_ring_cleanup(struct io_kiocb *req)
msg->src_file = NULL;
}
-static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
-{
- return target_ctx->task_complete;
-}
-
static struct io_overflow_cqe *io_alloc_overflow(struct io_ring_ctx *target_ctx)
__acquires(&target_ctx->completion_lock)
{
@@ -109,7 +99,7 @@ static void io_msg_add_overflow(struct io_msg *msg,
u32 flags)
__releases(&target_ctx->completion_lock)
{
- unsigned nr_prev, nr_wait;
+ unsigned nr_prev;
if (list_empty(&target_ctx->cq_overflow_list)) {
set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &target_ctx->check_cq);
@@ -122,11 +112,17 @@ static void io_msg_add_overflow(struct io_msg *msg,
nr_prev = target_ctx->nr_overflow++;
list_add_tail(&ocqe->list, &target_ctx->cq_overflow_list);
spin_unlock(&target_ctx->completion_lock);
- rcu_read_lock();
- io_defer_tw_count(target_ctx, &nr_wait);
- nr_prev += nr_wait;
- io_defer_wake(target_ctx, nr_prev + 1, nr_prev);
- rcu_read_unlock();
+ if (target_ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+ unsigned nr_wait;
+
+ rcu_read_lock();
+ io_defer_tw_count(target_ctx, &nr_wait);
+ nr_prev += nr_wait;
+ io_defer_wake(target_ctx, nr_prev + 1, nr_prev);
+ rcu_read_unlock();
+ } else if (wq_has_sleeper(&target_ctx->cq_wait)) {
+ wake_up(&target_ctx->cq_wait);
+ }
}
static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
@@ -149,7 +145,6 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
struct io_ring_ctx *target_ctx = req->file->private_data;
struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
u32 flags = 0;
- int ret;
if (msg->src_fd || msg->flags & ~IORING_MSG_RING_FLAGS_PASS)
return -EINVAL;
@@ -161,19 +156,7 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
flags = msg->cqe_flags;
- if (io_msg_need_remote(target_ctx))
- return io_msg_fill_remote(msg, issue_flags, target_ctx, flags);
-
- ret = -EOVERFLOW;
- if (target_ctx->flags & IORING_SETUP_IOPOLL) {
- if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
- return -EAGAIN;
- }
- if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags))
- ret = 0;
- if (target_ctx->flags & IORING_SETUP_IOPOLL)
- io_double_unlock_ctx(target_ctx);
- return ret;
+ return io_msg_fill_remote(msg, issue_flags, target_ctx, flags);
}
static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags)
@@ -194,38 +177,6 @@ static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_fl
return file;
}
-static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags)
-{
- struct io_ring_ctx *target_ctx = req->file->private_data;
- struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
- struct file *src_file = msg->src_file;
- int ret;
-
- if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
- return -EAGAIN;
-
- ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd);
- if (ret < 0)
- goto out_unlock;
-
- msg->src_file = NULL;
- req->flags &= ~REQ_F_NEED_CLEANUP;
-
- if (msg->flags & IORING_MSG_RING_CQE_SKIP)
- goto out_unlock;
- /*
- * If this fails, the target still received the file descriptor but
- * wasn't notified of the fact. This means that if this request
- * completes with -EOVERFLOW, then the sender must ensure that a
- * later IORING_OP_MSG_RING delivers the message.
- */
- if (!io_post_aux_cqe(target_ctx, msg->user_data, ret, 0))
- ret = -EOVERFLOW;
-out_unlock:
- io_double_unlock_ctx(target_ctx);
- return ret;
-}
-
static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
struct io_ring_ctx *target_ctx)
{
@@ -284,9 +235,7 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
req->flags |= REQ_F_NEED_CLEANUP;
}
- if (io_msg_need_remote(target_ctx))
- return io_msg_install_remote(req, issue_flags, target_ctx);
- return io_msg_install_complete(req, issue_flags);
+ return io_msg_install_remote(req, issue_flags, target_ctx);
}
int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
--
2.43.0