From: Jens Axboe <axboe@kernel.dk>
To: io-uring@vger.kernel.org
Cc: dvyukov@google.com, csander@purestorage.com, krisman@suse.de,
Jens Axboe <axboe@kernel.dk>
Subject: [PATCH 4/6] io_uring: switch normal task_work to a mpscq
Date: Thu, 11 Jun 2026 20:48:30 -0600 [thread overview]
Message-ID: <20260612025125.1690253-5-axboe@kernel.dk> (raw)
In-Reply-To: <20260612025125.1690253-1-axboe@kernel.dk>
Like the local task_work list, the normal (tctx) task_work list is an
llist, and hence needs the O(n) llist_reverse_order() pass before
running entries in queue order. On top of that, capped runs - sqpoll
processing IORING_TW_CAP_ENTRIES_VALUE entries at a time - need the
claimed-but-unprocessed leftovers carried in a separate retry_list,
as they can't be pushed back to the shared list.
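Switch tctx->task_list to a mpscq, as was already done for the
DEFER_TASKRUN paths. A mpscq is popped in FIFO order, so no reversal
pass is needed, and entries beyond the cap simply stay on the queue for
the next run, which removes the need for the retry_list.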
Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
include/linux/io_uring_types.h | 12 ++-
io_uring/sqpoll.c | 30 +++----
io_uring/tctx.c | 3 +-
io_uring/tw.c | 146 ++++++++++++++++++++-------------
io_uring/tw.h | 4 +-
5 files changed, 113 insertions(+), 82 deletions(-)
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 9df5584ec3b1..33de451127f9 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -131,6 +131,11 @@ struct io_uring_task {
const struct io_ring_ctx *last;
struct task_struct *task;
struct io_wq *io_wq;
+ /*
+ * Consumer cursor for ->task_list. Only popped by the task itself,
+ * or by ->fallback_work once the task can no longer run task_work.
+ */
+ struct llist_node *task_head;
struct file *registered_rings[IO_RINGFD_REG_MAX];
struct xarray xa;
@@ -139,8 +144,13 @@ struct io_uring_task {
atomic_t inflight_tracked;
struct percpu_counter inflight;
+ /* drains ->task_list once the task can no longer run task_work */
+ struct work_struct fallback_work;
+
struct { /* task_work */
- struct llist_head task_list;
+ struct mpscq task_list;
+ /* BIT(0) is set while a run is queued, so tw is only added once */
+ unsigned long tw_pending;
struct callback_head task_work;
} ____cacheline_aligned_in_smp;
};
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 46c12afec73e..2460bd605266 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -260,39 +260,29 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
}
/*
- * Run task_work, processing the retry_list first. The retry_list holds
- * entries that we passed on in the previous run, if we had more task_work
- * than we were asked to process. Newly queued task_work isn't run until the
- * retry list has been fully processed.
+ * Run task_work, processing no more than max_entries at a time. If more
+ * than that is pending, it simply stays on the queue for the next run.
*/
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(int max_entries)
{
struct io_uring_task *tctx = current->io_uring;
unsigned int count = 0;
- if (*retry_list) {
- *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
- if (count >= max_entries)
- goto out;
- max_entries -= count;
- }
- *retry_list = tctx_task_work_run(tctx, max_entries, &count);
-out:
+ tctx_task_work_run(tctx, max_entries, &count);
if (task_work_pending(current))
task_work_run();
return count;
}
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(void)
{
struct io_uring_task *tctx = current->io_uring;
- return retry_list || !llist_empty(&tctx->task_list);
+ return !mpscq_empty(&tctx->task_list);
}
static int io_sq_thread(void *data)
{
- struct llist_node *retry_list = NULL;
struct io_sq_data *sqd = data;
struct io_ring_ctx *ctx;
unsigned long timeout = 0;
@@ -347,7 +337,7 @@ static int io_sq_thread(void *data)
if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
sqt_spin = true;
}
- if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
+ if (io_sq_tw(IORING_TW_CAP_ENTRIES_VALUE))
sqt_spin = true;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -372,7 +362,7 @@ static int io_sq_thread(void *data)
}
prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
- if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
+ if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending()) {
bool needs_sched = true;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -411,8 +401,8 @@ static int io_sq_thread(void *data)
timeout = jiffies + sqd->sq_thread_idle;
}
- if (retry_list)
- io_sq_tw(&retry_list, UINT_MAX);
+ if (io_sq_tw_pending())
+ io_sq_tw(UINT_MAX);
io_uring_cancel_generic(true, sqd);
rcu_assign_pointer(sqd->thread, NULL);
diff --git a/io_uring/tctx.c b/io_uring/tctx.c
index 42b219b34aa8..cc3bf2b3bdbc 100644
--- a/io_uring/tctx.c
+++ b/io_uring/tctx.c
@@ -103,7 +103,8 @@ __cold struct io_uring_task *io_uring_alloc_task_context(struct task_struct *tas
init_waitqueue_head(&tctx->wait);
atomic_set(&tctx->in_cancel, 0);
atomic_set(&tctx->inflight_tracked, 0);
- init_llist_head(&tctx->task_list);
+ mpscq_init(&tctx->task_list, &tctx->task_head);
+ INIT_WORK(&tctx->fallback_work, io_tctx_fallback_work);
init_task_work(&tctx->task_work, tctx_task_work);
return tctx;
}
diff --git a/io_uring/tw.c b/io_uring/tw.c
index b8d6027aaeff..ca29bb0b9768 100644
--- a/io_uring/tw.c
+++ b/io_uring/tw.c
@@ -46,46 +46,6 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
percpu_ref_put(&ctx->refs);
}
-/*
- * Run queued task_work, returning the number of entries processed in *count.
- * If more entries than max_entries are available, stop processing once this
- * is reached and return the rest of the list.
- */
-struct llist_node *io_handle_tw_list(struct llist_node *node,
- unsigned int *count,
- unsigned int max_entries)
-{
- struct io_ring_ctx *ctx = NULL;
- struct io_tw_state ts = { };
-
- do {
- struct llist_node *next = node->next;
- struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.node);
-
- if (req->ctx != ctx) {
- ctx_flush_and_put(ctx, ts);
- ctx = req->ctx;
- mutex_lock(&ctx->uring_lock);
- percpu_ref_get(&ctx->refs);
- ts.cancel = io_should_terminate_tw(ctx);
- }
- INDIRECT_CALL_2(req->io_task_work.func,
- io_poll_task_func, io_req_rw_complete,
- (struct io_tw_req){req}, ts);
- node = next;
- (*count)++;
- if (unlikely(need_resched())) {
- ctx_flush_and_put(ctx, ts);
- ctx = NULL;
- cond_resched();
- }
- } while (node && *count < max_entries);
-
- ctx_flush_and_put(ctx, ts);
- return node;
-}
-
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
struct io_ring_ctx *last_ctx = NULL;
@@ -114,43 +74,109 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
}
}
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+void io_tctx_fallback_work(struct work_struct *work)
{
- struct llist_node *node = llist_del_all(&tctx->task_list);
+ struct io_uring_task *tctx = container_of(work, struct io_uring_task,
+ fallback_work);
+ struct llist_node *node, *first = NULL, **tail = &first;
+
+ /* see tctx_task_work() - a set bit must always have a run coming */
+ clear_bit(0, &tctx->tw_pending);
+ smp_mb__after_atomic();
+
+ while (!mpscq_empty(&tctx->task_list)) {
+ node = mpscq_pop(&tctx->task_list, &tctx->task_head);
+ if (!node) {
+ /* a producer is mid-push, wait for it to link */
+ cond_resched();
+ continue;
+ }
+ *tail = node;
+ tail = &node->next;
+ }
+ *tail = NULL;
+ __io_fallback_tw(first, false);
+ put_task_struct(tctx->task);
+}
- __io_fallback_tw(node, sync);
+static void io_fallback_tw(struct io_uring_task *tctx)
+{
+ /*
+ * The task ref both keeps ->task valid and, as __io_uring_free() is
+ * only called when the task itself is freed, ensures the tctx (and
+ * the queued work) stay around until the drain has run.
+ */
+ get_task_struct(tctx->task);
+ if (!queue_work(system_unbound_wq, &tctx->fallback_work))
+ put_task_struct(tctx->task);
}
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
+/*
+ * Run queued task_work, processing no more than max_entries, with the number
+ * of entries processed added to *count. If more entries than max_entries are
+ * available, the remainder simply stay on the queue for the next run.
+ */
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
+ unsigned int *count)
{
- struct llist_node *node;
+ struct io_ring_ctx *ctx = NULL;
+ struct io_tw_state ts = { };
- node = llist_del_all(&tctx->task_list);
- if (node) {
- node = llist_reverse_order(node);
- node = io_handle_tw_list(node, count, max_entries);
+ while (*count < max_entries) {
+ struct llist_node *node = mpscq_pop(&tctx->task_list,
+ &tctx->task_head);
+ struct io_kiocb *req;
+
+ if (!node) {
+ if (mpscq_empty(&tctx->task_list))
+ break;
+ /*
+ * A producer has published a node but hasn't
+ * linked it into the queue yet (see mpscq_pop()).
+ * Give it a chance to finish rather than spinning,
+ * and don't sit on the ctx lock while doing so.
+ */
+ ctx_flush_and_put(ctx, ts);
+ ctx = NULL;
+ cond_resched();
+ continue;
+ }
+ req = container_of(node, struct io_kiocb, io_task_work.node);
+ if (req->ctx != ctx) {
+ ctx_flush_and_put(ctx, ts);
+ ctx = req->ctx;
+ mutex_lock(&ctx->uring_lock);
+ percpu_ref_get(&ctx->refs);
+ ts.cancel = io_should_terminate_tw(ctx);
+ }
+ INDIRECT_CALL_2(req->io_task_work.func,
+ io_poll_task_func, io_req_rw_complete,
+ (struct io_tw_req){req}, ts);
+ (*count)++;
+ if (unlikely(need_resched())) {
+ ctx_flush_and_put(ctx, ts);
+ ctx = NULL;
+ cond_resched();
+ }
}
+ ctx_flush_and_put(ctx, ts);
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
io_uring_drop_tctx_refs(current);
trace_io_uring_task_work_run(tctx, *count);
- return node;
}
void tctx_task_work(struct callback_head *cb)
{
struct io_uring_task *tctx;
- struct llist_node *ret;
unsigned int count = 0;
tctx = container_of(cb, struct io_uring_task, task_work);
- ret = tctx_task_work_run(tctx, UINT_MAX, &count);
- /* can't happen */
- WARN_ON_ONCE(ret);
+ clear_bit(0, &tctx->tw_pending);
+ smp_mb__after_atomic();
+ tctx_task_work_run(tctx, UINT_MAX, &count);
}
/*
@@ -220,7 +246,7 @@ void io_req_normal_work_add(struct io_kiocb *req)
struct io_ring_ctx *ctx = req->ctx;
/* task_work already pending, we're done */
- if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+ if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
return;
/*
@@ -236,10 +262,14 @@ void io_req_normal_work_add(struct io_kiocb *req)
return;
}
+ /* task_work must only be added once */
+ if (test_and_set_bit(0, &tctx->tw_pending))
+ return;
+
if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
return;
- io_fallback_tw(tctx, false);
+ io_fallback_tw(tctx);
}
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
diff --git a/io_uring/tw.h b/io_uring/tw.h
index f42db5fdbded..387e52004da8 100644
--- a/io_uring/tw.h
+++ b/io_uring/tw.h
@@ -25,8 +25,8 @@ static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx)
}
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags);
-struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
void tctx_task_work(struct callback_head *cb);
+void io_tctx_fallback_work(struct work_struct *work);
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events);
int io_run_task_work_sig(struct io_ring_ctx *ctx);
@@ -36,7 +36,7 @@ int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events);
void io_req_local_work_add(struct io_kiocb *req, unsigned flags);
void io_req_normal_work_add(struct io_kiocb *req);
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
static inline void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
{
--
2.53.0
next prev parent reply other threads:[~2026-06-12 2:51 UTC|newest]
Thread overview: 22+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-06-12 2:48 [PATCHSET v2] Add lockless MPSC FIFO queue for task work Jens Axboe
2026-06-12 2:48 ` [PATCH 1/6] io_uring: grab RCU read lock marking task run Jens Axboe
2026-06-13 2:27 ` Caleb Sander Mateos
2026-06-12 2:48 ` [PATCH 2/6] io_uring/mpscq: add lockless multi-producer, single-consumer FIFO queue Jens Axboe
2026-06-13 2:40 ` Caleb Sander Mateos
2026-06-13 12:22 ` Jens Axboe
2026-06-12 2:48 ` [PATCH 3/6] io_uring: switch local task_work to a mpscq Jens Axboe
2026-06-12 3:20 ` Caleb Sander Mateos
2026-06-12 12:23 ` Jens Axboe
2026-06-12 2:48 ` Jens Axboe [this message]
2026-06-12 18:59 ` [PATCH 4/6] io_uring: switch normal " Caleb Sander Mateos
2026-06-12 19:37 ` Jens Axboe
2026-06-13 2:26 ` Caleb Sander Mateos
2026-06-13 12:08 ` Jens Axboe
2026-06-15 18:33 ` Caleb Sander Mateos
2026-06-15 18:47 ` Jens Axboe
2026-06-15 20:04 ` Jens Axboe
2026-06-15 20:40 ` Caleb Sander Mateos
2026-06-15 21:51 ` Jens Axboe
2026-06-16 0:22 ` Caleb Sander Mateos
2026-06-12 2:48 ` [PATCH 5/6] io_uring: run the tctx task_work fallback directly Jens Axboe
2026-06-12 2:48 ` [PATCH 6/6] io_uring: remove the per-ctx fallback task_work machinery Jens Axboe
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260612025125.1690253-5-axboe@kernel.dk \
--to=axboe@kernel.dk \
--cc=csander@purestorage.com \
--cc=dvyukov@google.com \
--cc=io-uring@vger.kernel.org \
--cc=krisman@suse.de \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox