public inbox for io-uring@vger.kernel.org
 help / color / mirror / Atom feed
From: Jens Axboe <axboe@kernel.dk>
To: io-uring@vger.kernel.org
Cc: dvyukov@google.com, csander@purestorage.com, krisman@suse.de,
	Jens Axboe <axboe@kernel.dk>
Subject: [PATCH 4/6] io_uring: switch normal task_work to a mpscq
Date: Thu, 11 Jun 2026 20:48:30 -0600	[thread overview]
Message-ID: <20260612025125.1690253-5-axboe@kernel.dk> (raw)
In-Reply-To: <20260612025125.1690253-1-axboe@kernel.dk>

Like the local task_work list, the normal (tctx) task_work list is an
llist, and hence needs the O(n) llist_reverse_order() pass before
running entries in queue order. On top of that, capped runs - sqpoll
processing IORING_TW_CAP_ENTRIES_VALUE entries at a time - need the
claimed-but-unprocessed leftovers carried in a separate retry_list,
as they can't be pushed back to the shared list.

Switch tctx->task_list to a mpscq, like what was done for the
DEFER_TASKRUN paths as well.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/linux/io_uring_types.h |  12 ++-
 io_uring/sqpoll.c              |  30 +++----
 io_uring/tctx.c                |   3 +-
 io_uring/tw.c                  | 146 ++++++++++++++++++++-------------
 io_uring/tw.h                  |   4 +-
 5 files changed, 113 insertions(+), 82 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 9df5584ec3b1..33de451127f9 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -131,6 +131,11 @@ struct io_uring_task {
 	const struct io_ring_ctx 	*last;
 	struct task_struct		*task;
 	struct io_wq			*io_wq;
+	/*
+	 * Consumer cursor for ->task_list. Only popped by the task itself,
+	 * or by ->fallback_work once the task can no longer run task_work.
+	 */
+	struct llist_node		*task_head;
 	struct file			*registered_rings[IO_RINGFD_REG_MAX];
 
 	struct xarray			xa;
@@ -139,8 +144,13 @@ struct io_uring_task {
 	atomic_t			inflight_tracked;
 	struct percpu_counter		inflight;
 
+	/* drains ->task_list once the task can no longer run task_work */
+	struct work_struct		fallback_work;
+
 	struct { /* task_work */
-		struct llist_head	task_list;
+		struct mpscq		task_list;
+		/* BIT(0) guards adding tw only once */
+		unsigned long		tw_pending;
 		struct callback_head	task_work;
 	} ____cacheline_aligned_in_smp;
 };
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 46c12afec73e..2460bd605266 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -260,39 +260,29 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
 }
 
 /*
- * Run task_work, processing the retry_list first. The retry_list holds
- * entries that we passed on in the previous run, if we had more task_work
- * than we were asked to process. Newly queued task_work isn't run until the
- * retry list has been fully processed.
+ * Run task_work, processing no more than max_entries at a time. If more
+ * than that is pending, it simply stays on the queue for the next run.
  */
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(int max_entries)
 {
 	struct io_uring_task *tctx = current->io_uring;
 	unsigned int count = 0;
 
-	if (*retry_list) {
-		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
-		if (count >= max_entries)
-			goto out;
-		max_entries -= count;
-	}
-	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
-out:
+	tctx_task_work_run(tctx, max_entries, &count);
 	if (task_work_pending(current))
 		task_work_run();
 	return count;
 }
 
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(void)
 {
 	struct io_uring_task *tctx = current->io_uring;
 
-	return retry_list || !llist_empty(&tctx->task_list);
+	return !mpscq_empty(&tctx->task_list);
 }
 
 static int io_sq_thread(void *data)
 {
-	struct llist_node *retry_list = NULL;
 	struct io_sq_data *sqd = data;
 	struct io_ring_ctx *ctx;
 	unsigned long timeout = 0;
@@ -347,7 +337,7 @@ static int io_sq_thread(void *data)
 			if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
 				sqt_spin = true;
 		}
-		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
+		if (io_sq_tw(IORING_TW_CAP_ENTRIES_VALUE))
 			sqt_spin = true;
 
 		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -372,7 +362,7 @@ static int io_sq_thread(void *data)
 		}
 
 		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
-		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
+		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending()) {
 			bool needs_sched = true;
 
 			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -411,8 +401,8 @@ static int io_sq_thread(void *data)
 		timeout = jiffies + sqd->sq_thread_idle;
 	}
 
-	if (retry_list)
-		io_sq_tw(&retry_list, UINT_MAX);
+	if (io_sq_tw_pending())
+		io_sq_tw(UINT_MAX);
 
 	io_uring_cancel_generic(true, sqd);
 	rcu_assign_pointer(sqd->thread, NULL);
diff --git a/io_uring/tctx.c b/io_uring/tctx.c
index 42b219b34aa8..cc3bf2b3bdbc 100644
--- a/io_uring/tctx.c
+++ b/io_uring/tctx.c
@@ -103,7 +103,8 @@ __cold struct io_uring_task *io_uring_alloc_task_context(struct task_struct *tas
 	init_waitqueue_head(&tctx->wait);
 	atomic_set(&tctx->in_cancel, 0);
 	atomic_set(&tctx->inflight_tracked, 0);
-	init_llist_head(&tctx->task_list);
+	mpscq_init(&tctx->task_list, &tctx->task_head);
+	INIT_WORK(&tctx->fallback_work, io_tctx_fallback_work);
 	init_task_work(&tctx->task_work, tctx_task_work);
 	return tctx;
 }
diff --git a/io_uring/tw.c b/io_uring/tw.c
index b8d6027aaeff..ca29bb0b9768 100644
--- a/io_uring/tw.c
+++ b/io_uring/tw.c
@@ -46,46 +46,6 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
 	percpu_ref_put(&ctx->refs);
 }
 
-/*
- * Run queued task_work, returning the number of entries processed in *count.
- * If more entries than max_entries are available, stop processing once this
- * is reached and return the rest of the list.
- */
-struct llist_node *io_handle_tw_list(struct llist_node *node,
-				     unsigned int *count,
-				     unsigned int max_entries)
-{
-	struct io_ring_ctx *ctx = NULL;
-	struct io_tw_state ts = { };
-
-	do {
-		struct llist_node *next = node->next;
-		struct io_kiocb *req = container_of(node, struct io_kiocb,
-						    io_task_work.node);
-
-		if (req->ctx != ctx) {
-			ctx_flush_and_put(ctx, ts);
-			ctx = req->ctx;
-			mutex_lock(&ctx->uring_lock);
-			percpu_ref_get(&ctx->refs);
-			ts.cancel = io_should_terminate_tw(ctx);
-		}
-		INDIRECT_CALL_2(req->io_task_work.func,
-				io_poll_task_func, io_req_rw_complete,
-				(struct io_tw_req){req}, ts);
-		node = next;
-		(*count)++;
-		if (unlikely(need_resched())) {
-			ctx_flush_and_put(ctx, ts);
-			ctx = NULL;
-			cond_resched();
-		}
-	} while (node && *count < max_entries);
-
-	ctx_flush_and_put(ctx, ts);
-	return node;
-}
-
 static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 {
 	struct io_ring_ctx *last_ctx = NULL;
@@ -114,43 +74,109 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 	}
 }
 
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+void io_tctx_fallback_work(struct work_struct *work)
 {
-	struct llist_node *node = llist_del_all(&tctx->task_list);
+	struct io_uring_task *tctx = container_of(work, struct io_uring_task,
+						  fallback_work);
+	struct llist_node *node, *first = NULL, **tail = &first;
+
+	/* see tctx_task_work() - a set bit must always have a run coming */
+	clear_bit(0, &tctx->tw_pending);
+	smp_mb__after_atomic();
+
+	while (!mpscq_empty(&tctx->task_list)) {
+		node = mpscq_pop(&tctx->task_list, &tctx->task_head);
+		if (!node) {
+			/* a producer is mid-push, wait for it to link */
+			cond_resched();
+			continue;
+		}
+		*tail = node;
+		tail = &node->next;
+	}
+	*tail = NULL;
+	__io_fallback_tw(first, false);
+	put_task_struct(tctx->task);
+}
 
-	__io_fallback_tw(node, sync);
+static void io_fallback_tw(struct io_uring_task *tctx)
+{
+	/*
+	 * The task ref both keeps ->task valid and, as __io_uring_free() is
+	 * only called when the task itself is freed, ensures the tctx (and
+	 * the queued work) stay around until the drain has run.
+	 */
+	get_task_struct(tctx->task);
+	if (!queue_work(system_unbound_wq, &tctx->fallback_work))
+		put_task_struct(tctx->task);
 }
 
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
-				      unsigned int max_entries,
-				      unsigned int *count)
+/*
+ * Run queued task_work, processing no more than max_entries, with the number
+ * of entries processed added to *count. If more entries than max_entries are
+ * available, the remainder simply stay on the queue for the next run.
+ */
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
+			unsigned int *count)
 {
-	struct llist_node *node;
+	struct io_ring_ctx *ctx = NULL;
+	struct io_tw_state ts = { };
 
-	node = llist_del_all(&tctx->task_list);
-	if (node) {
-		node = llist_reverse_order(node);
-		node = io_handle_tw_list(node, count, max_entries);
+	while (*count < max_entries) {
+		struct llist_node *node = mpscq_pop(&tctx->task_list,
+						    &tctx->task_head);
+		struct io_kiocb *req;
+
+		if (!node) {
+			if (mpscq_empty(&tctx->task_list))
+				break;
+			/*
+			 * A producer has published a node but hasn't
+			 * linked it into the queue yet (see mpscq_pop()).
+			 * Give it a chance to finish rather than spinning,
+			 * and don't sit on the ctx lock while doing so.
+			 */
+			ctx_flush_and_put(ctx, ts);
+			ctx = NULL;
+			cond_resched();
+			continue;
+		}
+		req = container_of(node, struct io_kiocb, io_task_work.node);
+		if (req->ctx != ctx) {
+			ctx_flush_and_put(ctx, ts);
+			ctx = req->ctx;
+			mutex_lock(&ctx->uring_lock);
+			percpu_ref_get(&ctx->refs);
+			ts.cancel = io_should_terminate_tw(ctx);
+		}
+		INDIRECT_CALL_2(req->io_task_work.func,
+				io_poll_task_func, io_req_rw_complete,
+				(struct io_tw_req){req}, ts);
+		(*count)++;
+		if (unlikely(need_resched())) {
+			ctx_flush_and_put(ctx, ts);
+			ctx = NULL;
+			cond_resched();
+		}
 	}
+	ctx_flush_and_put(ctx, ts);
 
 	/* relaxed read is enough as only the task itself sets ->in_cancel */
 	if (unlikely(atomic_read(&tctx->in_cancel)))
 		io_uring_drop_tctx_refs(current);
 
 	trace_io_uring_task_work_run(tctx, *count);
-	return node;
 }
 
 void tctx_task_work(struct callback_head *cb)
 {
 	struct io_uring_task *tctx;
-	struct llist_node *ret;
 	unsigned int count = 0;
 
 	tctx = container_of(cb, struct io_uring_task, task_work);
-	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
-	/* can't happen */
-	WARN_ON_ONCE(ret);
+	clear_bit(0, &tctx->tw_pending);
+	smp_mb__after_atomic();
+	tctx_task_work_run(tctx, UINT_MAX, &count);
 }
 
 /*
@@ -220,7 +246,7 @@ void io_req_normal_work_add(struct io_kiocb *req)
 	struct io_ring_ctx *ctx = req->ctx;
 
 	/* task_work already pending, we're done */
-	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+	if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
 		return;
 
 	/*
@@ -236,10 +262,14 @@ void io_req_normal_work_add(struct io_kiocb *req)
 		return;
 	}
 
+	/* task_work must only be added once */
+	if (test_and_set_bit(0, &tctx->tw_pending))
+		return;
+
 	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
 		return;
 
-	io_fallback_tw(tctx, false);
+	io_fallback_tw(tctx);
 }
 
 void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
diff --git a/io_uring/tw.h b/io_uring/tw.h
index f42db5fdbded..387e52004da8 100644
--- a/io_uring/tw.h
+++ b/io_uring/tw.h
@@ -25,8 +25,8 @@ static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx)
 }
 
 void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags);
-struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
 void tctx_task_work(struct callback_head *cb);
+void io_tctx_fallback_work(struct work_struct *work);
 int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events);
 int io_run_task_work_sig(struct io_ring_ctx *ctx);
 
@@ -36,7 +36,7 @@ int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events);
 
 void io_req_local_work_add(struct io_kiocb *req, unsigned flags);
 void io_req_normal_work_add(struct io_kiocb *req);
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
 
 static inline void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
 {
-- 
2.53.0


  parent reply	other threads:[~2026-06-12  2:51 UTC|newest]

Thread overview: 22+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-06-12  2:48 [PATCHSET v2] Add lockless MPSC FIFO queue for task work Jens Axboe
2026-06-12  2:48 ` [PATCH 1/6] io_uring: grab RCU read lock marking task run Jens Axboe
2026-06-13  2:27   ` Caleb Sander Mateos
2026-06-12  2:48 ` [PATCH 2/6] io_uring/mpscq: add lockless multi-producer, single-consumer FIFO queue Jens Axboe
2026-06-13  2:40   ` Caleb Sander Mateos
2026-06-13 12:22     ` Jens Axboe
2026-06-12  2:48 ` [PATCH 3/6] io_uring: switch local task_work to a mpscq Jens Axboe
2026-06-12  3:20   ` Caleb Sander Mateos
2026-06-12 12:23     ` Jens Axboe
2026-06-12  2:48 ` Jens Axboe [this message]
2026-06-12 18:59   ` [PATCH 4/6] io_uring: switch normal " Caleb Sander Mateos
2026-06-12 19:37     ` Jens Axboe
2026-06-13  2:26       ` Caleb Sander Mateos
2026-06-13 12:08         ` Jens Axboe
2026-06-15 18:33           ` Caleb Sander Mateos
2026-06-15 18:47             ` Jens Axboe
2026-06-15 20:04               ` Jens Axboe
2026-06-15 20:40                 ` Caleb Sander Mateos
2026-06-15 21:51                   ` Jens Axboe
2026-06-16  0:22                     ` Caleb Sander Mateos
2026-06-12  2:48 ` [PATCH 5/6] io_uring: run the tctx task_work fallback directly Jens Axboe
2026-06-12  2:48 ` [PATCH 6/6] io_uring: remove the per-ctx fallback task_work machinery Jens Axboe

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260612025125.1690253-5-axboe@kernel.dk \
    --to=axboe@kernel.dk \
    --cc=csander@purestorage.com \
    --cc=dvyukov@google.com \
    --cc=io-uring@vger.kernel.org \
    --cc=krisman@suse.de \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox