From: Jens Axboe <[email protected]>
To: [email protected]
Cc: Jens Axboe <[email protected]>
Subject: [PATCH 2/4] io_uring: switch deferred task_work to an io_wq_work_list
Date: Tue, 26 Mar 2024 12:42:46 -0600 [thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
Lockless lists may be handy for some things, but entries can only be
added at the head of the list, so items end up in the reverse order of
insertion. That in turn means that iterating the list requires reversing
it first, if the caller is sensitive to ordering between items.
Switch the DEFER_TASKRUN work list from an llist to a normal
io_wq_work_list, and protect it with a lock. Then we can get rid of the
manual reversing of the list when running it, which takes considerable
cycles, particularly for bursty task_work additions.
Signed-off-by: Jens Axboe <[email protected]>
---
include/linux/io_uring_types.h | 11 ++--
io_uring/io_uring.c | 117 ++++++++++++---------------------
io_uring/io_uring.h | 4 +-
3 files changed, 51 insertions(+), 81 deletions(-)
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index aeb4639785b5..e51bf15196e4 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -329,7 +329,9 @@ struct io_ring_ctx {
* regularly bounce b/w CPUs.
*/
struct {
- struct llist_head work_llist;
+ struct io_wq_work_list work_list;
+ spinlock_t work_lock;
+ int work_items;
unsigned long check_cq;
atomic_t cq_wait_nr;
atomic_t cq_timeouts;
@@ -559,7 +561,10 @@ enum {
typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
struct io_task_work {
- struct llist_node node;
+ union {
+ struct io_wq_work_node node;
+ struct llist_node llist_node;
+ };
io_req_tw_func_t func;
};
@@ -615,8 +620,6 @@ struct io_kiocb {
*/
u16 buf_index;
- unsigned nr_tw;
-
/* REQ_F_* flags */
io_req_flags_t flags;
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 87d7d8bbf814..9c06911077db 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -249,7 +249,7 @@ static __cold void io_fallback_req_func(struct work_struct *work)
percpu_ref_get(&ctx->refs);
mutex_lock(&ctx->uring_lock);
- llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
+ llist_for_each_entry_safe(req, tmp, node, io_task_work.llist_node)
req->io_task_work.func(req, &ts);
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
@@ -330,7 +330,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->timeout_list);
INIT_LIST_HEAD(&ctx->ltimeout_list);
INIT_LIST_HEAD(&ctx->rsrc_ref_list);
- init_llist_head(&ctx->work_llist);
+ INIT_WQ_LIST(&ctx->work_list);
+ spin_lock_init(&ctx->work_lock);
INIT_LIST_HEAD(&ctx->tctx_list);
ctx->submit_state.free_list.next = NULL;
INIT_WQ_LIST(&ctx->locked_free_list);
@@ -1135,7 +1136,7 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
do {
struct llist_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.node);
+ io_task_work.llist_node);
if (req->ctx != ctx) {
ctx_flush_and_put(ctx, &ts);
@@ -1159,20 +1160,6 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
return node;
}
-/**
- * io_llist_xchg - swap all entries in a lock-less list
- * @head: the head of lock-less list to delete all entries
- * @new: new entry as the head of the list
- *
- * If list is empty, return NULL, otherwise, return the pointer to the first entry.
- * The order of entries returned is from the newest to the oldest added one.
- */
-static inline struct llist_node *io_llist_xchg(struct llist_head *head,
- struct llist_node *new)
-{
- return xchg(&head->first, new);
-}
-
static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
struct llist_node *node = llist_del_all(&tctx->task_list);
@@ -1180,7 +1167,7 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
struct io_kiocb *req;
while (node) {
- req = container_of(node, struct io_kiocb, io_task_work.node);
+ req = container_of(node, struct io_kiocb, io_task_work.llist_node);
node = node->next;
if (sync && last_ctx != req->ctx) {
if (last_ctx) {
@@ -1190,7 +1177,7 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
last_ctx = req->ctx;
percpu_ref_get(&last_ctx->refs);
}
- if (llist_add(&req->io_task_work.node,
+ if (llist_add(&req->io_task_work.llist_node,
&req->ctx->fallback_llist))
schedule_delayed_work(&req->ctx->fallback_work, 1);
}
@@ -1238,48 +1225,26 @@ void tctx_task_work(struct callback_head *cb)
WARN_ON_ONCE(ret);
}
-static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
+static inline void io_req_local_work_add(struct io_kiocb *req, unsigned tw_flags)
{
struct io_ring_ctx *ctx = req->ctx;
- unsigned nr_wait, nr_tw, nr_tw_prev;
- struct llist_node *head;
+ unsigned nr_wait, nr_tw;
+ unsigned long flags;
/* See comment above IO_CQ_WAKE_INIT */
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
/*
- * We don't know how many reuqests is there in the link and whether
+ * We don't know how many requests there are in the link and whether
* they can even be queued lazily, fall back to non-lazy.
*/
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
- flags &= ~IOU_F_TWQ_LAZY_WAKE;
-
- head = READ_ONCE(ctx->work_llist.first);
- do {
- nr_tw_prev = 0;
- if (head) {
- struct io_kiocb *first_req = container_of(head,
- struct io_kiocb,
- io_task_work.node);
- /*
- * Might be executed at any moment, rely on
- * SLAB_TYPESAFE_BY_RCU to keep it alive.
- */
- nr_tw_prev = READ_ONCE(first_req->nr_tw);
- }
-
- /*
- * Theoretically, it can overflow, but that's fine as one of
- * previous adds should've tried to wake the task.
- */
- nr_tw = nr_tw_prev + 1;
- if (!(flags & IOU_F_TWQ_LAZY_WAKE))
- nr_tw = IO_CQ_WAKE_FORCE;
+ tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
- req->nr_tw = nr_tw;
- req->io_task_work.node.next = head;
- } while (!try_cmpxchg(&ctx->work_llist.first, &head,
- &req->io_task_work.node));
+ spin_lock_irqsave(&ctx->work_lock, flags);
+ wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
+ nr_tw = ++ctx->work_items;
+ spin_unlock_irqrestore(&ctx->work_lock, flags);
/*
* cmpxchg implies a full barrier, which pairs with the barrier
@@ -1289,7 +1254,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
* is similar to the wait/wawke task state sync.
*/
- if (!head) {
+ if (nr_tw == 1) {
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
if (ctx->has_evfd)
@@ -1297,13 +1262,8 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
}
nr_wait = atomic_read(&ctx->cq_wait_nr);
- /* not enough or no one is waiting */
- if (nr_tw < nr_wait)
- return;
- /* the previous add has already woken it up */
- if (nr_tw_prev >= nr_wait)
- return;
- wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
+ if (nr_tw >= nr_wait)
+ wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}
static void io_req_normal_work_add(struct io_kiocb *req)
@@ -1312,7 +1272,7 @@ static void io_req_normal_work_add(struct io_kiocb *req)
struct io_ring_ctx *ctx = req->ctx;
/* task_work already pending, we're done */
- if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+ if (!llist_add(&req->io_task_work.llist_node, &tctx->task_list))
return;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
@@ -1346,9 +1306,15 @@ void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
- struct llist_node *node;
+ struct io_wq_work_node *node;
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->work_lock, flags);
+ node = ctx->work_list.first;
+ INIT_WQ_LIST(&ctx->work_list);
+ ctx->work_items = 0;
+ spin_unlock_irqrestore(&ctx->work_lock, flags);
- node = llist_del_all(&ctx->work_llist);
while (node) {
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
@@ -1361,7 +1327,7 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
int min_events)
{
- if (llist_empty(&ctx->work_llist))
+ if (wq_list_empty(&ctx->work_list))
return false;
if (events < min_events)
return true;
@@ -1373,7 +1339,7 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
int min_events)
{
- struct llist_node *node;
+ struct io_wq_work_node *node;
unsigned int loops = 0;
int ret = 0;
@@ -1382,13 +1348,14 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
- /*
- * llists are in reverse order, flip it back the right way before
- * running the pending items.
- */
- node = llist_reverse_order(io_llist_xchg(&ctx->work_llist, NULL));
+ spin_lock_irq(&ctx->work_lock);
+ node = ctx->work_list.first;
+ INIT_WQ_LIST(&ctx->work_list);
+ ctx->work_items = 0;
+ spin_unlock_irq(&ctx->work_lock);
+
while (node) {
- struct llist_node *next = node->next;
+ struct io_wq_work_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
INDIRECT_CALL_2(req->io_task_work.func,
@@ -1414,7 +1381,7 @@ static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
{
struct io_tw_state ts = {};
- if (llist_empty(&ctx->work_llist))
+ if (wq_list_empty(&ctx->work_list))
return 0;
return __io_run_local_work(ctx, &ts, min_events);
}
@@ -2426,7 +2393,7 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
- if (!llist_empty(&ctx->work_llist)) {
+ if (!wq_list_empty(&ctx->work_list)) {
__set_current_state(TASK_RUNNING);
if (io_run_local_work(ctx, INT_MAX) > 0)
return 0;
@@ -2455,7 +2422,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
if (unlikely(READ_ONCE(ctx->check_cq)))
return 1;
- if (unlikely(!llist_empty(&ctx->work_llist)))
+ if (unlikely(!wq_list_empty(&ctx->work_list)))
return 1;
if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL)))
return 1;
@@ -2494,7 +2461,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
if (!io_allowed_run_tw(ctx))
return -EEXIST;
- if (!llist_empty(&ctx->work_llist))
+ if (!wq_list_empty(&ctx->work_list))
io_run_local_work(ctx, min_events);
io_run_task_work();
io_cqring_overflow_flush(ctx);
@@ -2558,7 +2525,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
* now rather than let the caller do another wait loop.
*/
io_run_task_work();
- if (!llist_empty(&ctx->work_llist))
+ if (!wq_list_empty(&ctx->work_list))
io_run_local_work(ctx, nr_wait);
/*
@@ -3331,7 +3298,7 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
io_run_task_work();
io_uring_drop_tctx_refs(current);
xa_for_each(&tctx->xa, index, node) {
- if (!llist_empty(&node->ctx->work_llist)) {
+ if (!wq_list_empty(&node->ctx->work_list)) {
WARN_ON_ONCE(node->ctx->submitter_task &&
node->ctx->submitter_task != current);
goto end_wait;
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 27d039ddb05e..bb30a29d0e27 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -343,7 +343,7 @@ static inline int io_run_task_work(void)
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
{
- return task_work_pending(current) || !llist_empty(&ctx->work_llist);
+ return task_work_pending(current) || !wq_list_empty(&ctx->work_list);
}
static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts)
@@ -457,6 +457,6 @@ enum {
static inline bool io_has_work(struct io_ring_ctx *ctx)
{
return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
- !llist_empty(&ctx->work_llist);
+ !wq_list_empty(&ctx->work_list);
}
#endif
--
2.43.0
next prev parent reply other threads:[~2024-03-26 18:46 UTC|newest]
Thread overview: 14+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-03-26 18:42 [PATCHSET 0/4] Use io_wq_work_list for task_work Jens Axboe
2024-03-26 18:42 ` [PATCH 1/4] io_uring: use the right type for work_llist empty check Jens Axboe
2024-03-26 18:42 ` Jens Axboe [this message]
2024-03-27 13:24 ` [PATCH 2/4] io_uring: switch deferred task_work to an io_wq_work_list Pavel Begunkov
2024-03-27 15:45 ` Jens Axboe
2024-03-27 16:37 ` Jens Axboe
2024-03-27 17:28 ` Pavel Begunkov
2024-03-27 17:34 ` Jens Axboe
2024-03-26 18:42 ` [PATCH 3/4] io_uring: switch fallback work to io_wq_work_list Jens Axboe
2024-03-26 18:42 ` [PATCH 4/4] io_uring: switch normal task_work " Jens Axboe
2024-03-27 13:33 ` [PATCHSET 0/4] Use io_wq_work_list for task_work Pavel Begunkov
2024-03-27 16:36 ` Jens Axboe
2024-03-27 17:05 ` Jens Axboe
2024-03-27 18:04 ` Pavel Begunkov
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
[email protected] \
[email protected] \
[email protected] \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox