* [PATCHSET for-next 0/6] task work cleanups
@ 2024-11-22 16:12 Jens Axboe
  2024-11-22 16:12 ` [PATCH 1/6] io_uring: make task_work pending check dependent on ring type Jens Axboe
                   ` (5 more replies)
  0 siblings, 6 replies; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC
  To: io-uring

Hi,

This patchset gets rid of using llist for handling task_work, for both
local and normal task_work. Instead of a lockless list, a normal
io_wq_work_list is used and protected by a spinlock. I've done some
benchmarking with this, and I only see wins - the cost of adding or
iterating task_work is the same, but we get rid of the need to reverse
the task_work list, which can be substantial for bursty applications or
just generally busy task_work usage.
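
To illustrate the ordering point, here is a rough user-space sketch, not
the kernel code. The names (struct node, lifo_push, lifo_reverse, struct
fifo, fifo_add_tail) are made up for this example and only mimic the
shape of llist and io_wq_work_list. It shows why a head-push list has to
be reversed before running in submission order, while a first/last list
is already in order.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins, not the kernel's llist or io_wq_work_list. */
struct node {
	struct node *next;
	int id;
};

/* LIFO: push at the head, the way a lockless llist adds entries. */
static void lifo_push(struct node **head, struct node *n)
{
	n->next = *head;
	*head = n;
}

/* The consumer has to relink every node to get submission order back. */
static struct node *lifo_reverse(struct node *head)
{
	struct node *prev = NULL;

	while (head) {
		struct node *next = head->next;

		head->next = prev;
		prev = head;
		head = next;
	}
	return prev;
}

/* FIFO: first/last pointers, so appending preserves submission order. */
struct fifo {
	struct node *first;
	struct node *last;
};

static void fifo_add_tail(struct fifo *list, struct node *n)
{
	n->next = NULL;
	if (list->last)
		list->last->next = n;
	else
		list->first = n;
	list->last = n;
}
```

The reversal pass touches every queued node once more before any work
runs, which is the cost the FIFO list avoids.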

Patch 2 implements io_wq_work_list handling for deferred task_work, and
patch 4 does the same for normal task_work. Patch 6 then also switches
SQPOLL to use this scheme, which eliminates the passing around of
io_wq_work_node for its retry logic.

Outside of cleaning up this code, it also enables us to potentially
implement task_work run capping for normal task_work in the future.

Git tree can be found here:

https://git.kernel.dk/cgit/linux/log/?h=io_uring-defer-tw

 include/linux/io_uring_types.h |  17 +-
 io_uring/io_uring.c            | 293 ++++++++++++++++++---------------
 io_uring/io_uring.h            |  20 ++-
 io_uring/slist.h               |  16 ++
 io_uring/sqpoll.c              |  20 ++-
 io_uring/tctx.c                |   3 +-
 6 files changed, 211 insertions(+), 158 deletions(-)

-- 
Jens Axboe



* [PATCH 1/6] io_uring: make task_work pending check dependent on ring type
  2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
  2024-11-22 16:12 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Jens Axboe
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC
  To: io-uring; +Cc: Jens Axboe

For DEFER_TASKRUN, there's no need to check for generic task_work if we
already have local task_work pending. This avoids dipping into the huge
task_struct in that case.
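
As a minimal user-space sketch of that short-circuit (an illustration
only; fake_task, fake_ring, RING_DEFER_TASKRUN and the task_reads counter
are hypothetical and exist just to make the skipped access visible):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_DEFER_TASKRUN	(1U << 0)	/* hypothetical flag value */

struct fake_task {
	void *task_works;		/* stand-in for generic task_work */
};

struct fake_ring {
	unsigned int flags;
	void *local_first;		/* stand-in for the local work list head */
};

/* Counts how often the (large) task structure is looked at. */
static unsigned int task_reads;

static bool generic_work_pending(const struct fake_task *task)
{
	task_reads++;
	return task->task_works != NULL;
}

static bool work_pending(const struct fake_ring *ring,
			 const struct fake_task *task)
{
	/* Local work on a deferred ring answers the question by itself. */
	if ((ring->flags & RING_DEFER_TASKRUN) && ring->local_first)
		return true;
	return generic_work_pending(task);
}
```

With local work queued on a deferred ring, work_pending() returns true
without calling generic_work_pending() at all.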

Signed-off-by: Jens Axboe <[email protected]>
---
 io_uring/io_uring.h | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 12abee607e4a..214f9f175102 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -354,7 +354,9 @@ static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
 {
-	return task_work_pending(current) || io_local_work_pending(ctx);
+	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN && io_local_work_pending(ctx))
+		return true;
+	return task_work_pending(current);
 }
 
 static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts)
-- 
2.45.2



* [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
  2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
  2024-11-22 16:12 ` [PATCH 1/6] io_uring: make task_work pending check dependent on ring type Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
  2024-11-22 17:07   ` Pavel Begunkov
  2024-11-22 16:12 ` [PATCH 3/6] io_uring/slist: add list-to-list list splice helper Jens Axboe
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC
  To: io-uring; +Cc: Jens Axboe

Add a spinlock for the list, and replace the lockless llist with the
work list instead. This avoids needing to reverse items in the list
before running them, as the io_wq_work_list is FIFO by nature whereas
the llist is LIFO.
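
The overall pattern can be sketched in user space roughly as below. This
is a simplified model under stated assumptions, not the patch itself: a
C11 atomic_flag spin loop stands in for the kernel spinlock (no IRQ
handling), and struct work_list, work_add, work_run, log_work and run_log
are names invented for the example. It shows adding at the tail while
counting items, grabbing the whole list under the lock, running up to a
limit, and putting any unprocessed remainder back at the head so ordering
is kept.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct work_node {
	struct work_node *next;
	int id;
};

struct work_list {
	struct work_node *first, *last;
	int items;
	atomic_flag lock;	/* stand-in for a kernel spinlock */
};

#define WORK_LIST_INIT { NULL, NULL, 0, ATOMIC_FLAG_INIT }

static void list_lock(struct work_list *l)
{
	while (atomic_flag_test_and_set_explicit(&l->lock, memory_order_acquire))
		;
}

static void list_unlock(struct work_list *l)
{
	atomic_flag_clear_explicit(&l->lock, memory_order_release);
}

/* Example callback: records the order in which nodes were run. */
static int run_log[16];
static int run_count;

static void log_work(struct work_node *n)
{
	if (run_count < 16)
		run_log[run_count++] = n->id;
}

/* Append at the tail; returns how many items were queued before this one. */
static int work_add(struct work_list *l, struct work_node *n)
{
	int prev;

	n->next = NULL;
	list_lock(l);
	if (l->last)
		l->last->next = n;
	else
		l->first = n;
	l->last = n;
	prev = l->items++;
	list_unlock(l);
	return prev;
}

/* Take the whole list, run at most 'limit' items, requeue the rest in front. */
static int work_run(struct work_list *l, int limit,
		    void (*fn)(struct work_node *))
{
	struct work_node *node, *tail;
	int nitems, done = 0;

	list_lock(l);
	node = l->first;
	tail = l->last;
	nitems = l->items;
	l->first = l->last = NULL;
	l->items = 0;
	list_unlock(l);

	while (node && done < limit) {
		struct work_node *next = node->next;

		fn(node);
		node = next;
		nitems--;
		done++;
	}

	if (node) {
		/* Leftovers go ahead of anything added while we were running. */
		list_lock(l);
		tail->next = l->first;
		l->first = node;
		if (!l->last)
			l->last = tail;
		l->items += nitems;
		list_unlock(l);
	}
	return done;
}
```

The return value of work_add() plays the role of the previous item
count: zero means the list was empty, which is the case where the
consumer would need to be notified.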

Signed-off-by: Jens Axboe <[email protected]>
---
 include/linux/io_uring_types.h |  13 ++-
 io_uring/io_uring.c            | 194 ++++++++++++++++-----------------
 io_uring/io_uring.h            |   2 +-
 3 files changed, 104 insertions(+), 105 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 011860ade268..e9ba99cb0ed0 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -335,8 +335,9 @@ struct io_ring_ctx {
 	 * regularly bounce b/w CPUs.
 	 */
 	struct {
-		struct llist_head	work_llist;
-		struct llist_head	retry_llist;
+		struct io_wq_work_list	work_list;
+		spinlock_t		work_lock;
+		int			work_items;
 		unsigned long		check_cq;
 		atomic_t		cq_wait_nr;
 		atomic_t		cq_timeouts;
@@ -566,7 +567,11 @@ enum {
 typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
 
 struct io_task_work {
-	struct llist_node		node;
+	/* DEFER_TASKRUN uses work_node, regular task_work node */
+	union {
+		struct io_wq_work_node	work_node;
+		struct llist_node	node;
+	};
 	io_req_tw_func_t		func;
 };
 
@@ -622,8 +627,6 @@ struct io_kiocb {
 	 */
 	u16				buf_index;
 
-	unsigned			nr_tw;
-
 	/* REQ_F_* flags */
 	io_req_flags_t			flags;
 
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index c3a7d0197636..b7eb962e9872 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -339,7 +339,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->defer_list);
 	INIT_LIST_HEAD(&ctx->timeout_list);
 	INIT_LIST_HEAD(&ctx->ltimeout_list);
-	init_llist_head(&ctx->work_llist);
+	INIT_WQ_LIST(&ctx->work_list);
+	spin_lock_init(&ctx->work_lock);
 	INIT_LIST_HEAD(&ctx->tctx_list);
 	ctx->submit_state.free_list.next = NULL;
 	INIT_HLIST_HEAD(&ctx->waitid_list);
@@ -1066,25 +1067,31 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
 	return node;
 }
 
-static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
+static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync,
+				    struct io_ring_ctx **last_ctx)
 {
+	if (sync && *last_ctx != req->ctx) {
+		if (*last_ctx) {
+			flush_delayed_work(&(*last_ctx)->fallback_work);
+			percpu_ref_put(&(*last_ctx)->refs);
+		}
+		*last_ctx = req->ctx;
+		percpu_ref_get(&(*last_ctx)->refs);
+	}
+	if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist))
+		schedule_delayed_work(&req->ctx->fallback_work, 1);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+{
+	struct llist_node *node = llist_del_all(&tctx->task_list);
 	struct io_ring_ctx *last_ctx = NULL;
 	struct io_kiocb *req;
 
 	while (node) {
 		req = container_of(node, struct io_kiocb, io_task_work.node);
 		node = node->next;
-		if (sync && last_ctx != req->ctx) {
-			if (last_ctx) {
-				flush_delayed_work(&last_ctx->fallback_work);
-				percpu_ref_put(&last_ctx->refs);
-			}
-			last_ctx = req->ctx;
-			percpu_ref_get(&last_ctx->refs);
-		}
-		if (llist_add(&req->io_task_work.node,
-			      &req->ctx->fallback_llist))
-			schedule_delayed_work(&req->ctx->fallback_work, 1);
+		__io_fallback_tw(req, sync, &last_ctx);
 	}
 
 	if (last_ctx) {
@@ -1093,13 +1100,6 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 	}
 }
 
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
-{
-	struct llist_node *node = llist_del_all(&tctx->task_list);
-
-	__io_fallback_tw(node, sync);
-}
-
 struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
 				      unsigned int max_entries,
 				      unsigned int *count)
@@ -1139,65 +1139,45 @@ void tctx_task_work(struct callback_head *cb)
 
 static inline void io_req_local_work_add(struct io_kiocb *req,
 					 struct io_ring_ctx *ctx,
-					 unsigned flags)
+					 unsigned tw_flags)
 {
-	unsigned nr_wait, nr_tw, nr_tw_prev;
-	struct llist_node *head;
+	unsigned nr_tw, nr_tw_prev, nr_wait;
+	unsigned long flags;
 
 	/* See comment above IO_CQ_WAKE_INIT */
 	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
 
 	/*
-	 * We don't know how many reuqests is there in the link and whether
-	 * they can even be queued lazily, fall back to non-lazy.
+	 * We don't know how many requests are in the link and whether they can
+	 * even be queued lazily, fall back to non-lazy.
 	 */
 	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
-		flags &= ~IOU_F_TWQ_LAZY_WAKE;
+		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-	guard(rcu)();
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+	nr_tw_prev = ctx->work_items++;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
 
-	head = READ_ONCE(ctx->work_llist.first);
-	do {
-		nr_tw_prev = 0;
-		if (head) {
-			struct io_kiocb *first_req = container_of(head,
-							struct io_kiocb,
-							io_task_work.node);
-			/*
-			 * Might be executed at any moment, rely on
-			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
-			 */
-			nr_tw_prev = READ_ONCE(first_req->nr_tw);
-		}
-
-		/*
-		 * Theoretically, it can overflow, but that's fine as one of
-		 * previous adds should've tried to wake the task.
-		 */
-		nr_tw = nr_tw_prev + 1;
-		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
-			nr_tw = IO_CQ_WAKE_FORCE;
-
-		req->nr_tw = nr_tw;
-		req->io_task_work.node.next = head;
-	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
-			      &req->io_task_work.node));
-
-	/*
-	 * cmpxchg implies a full barrier, which pairs with the barrier
-	 * in set_current_state() on the io_cqring_wait() side. It's used
-	 * to ensure that either we see updated ->cq_wait_nr, or waiters
-	 * going to sleep will observe the work added to the list, which
-	 * is similar to the wait/wawke task state sync.
-	 */
+	nr_tw = nr_tw_prev + 1;
+	if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
+		nr_tw = IO_CQ_WAKE_FORCE;
 
-	if (!head) {
+	if (!nr_tw_prev) {
 		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 		if (ctx->has_evfd)
 			io_eventfd_signal(ctx);
 	}
 
+	/*
+	 * We need a barrier after unlock, which pairs with the barrier
+	 * in set_current_state() on the io_cqring_wait() side. It's used
+	 * to ensure that either we see updated ->cq_wait_nr, or waiters
+	 * going to sleep will observe the work added to the list, which
+	 * is similar to the wait/wake task state sync.
+	 */
+	smp_mb();
 	nr_wait = atomic_read(&ctx->cq_wait_nr);
 	/* not enough or no one is waiting */
 	if (nr_tw < nr_wait)
@@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
 
 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 {
-	struct llist_node *node = llist_del_all(&ctx->work_llist);
+	struct io_ring_ctx *last_ctx = NULL;
+	struct io_wq_work_node *node;
+	unsigned long flags;
 
-	__io_fallback_tw(node, false);
-	node = llist_del_all(&ctx->retry_llist);
-	__io_fallback_tw(node, false);
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	node = ctx->work_list.first;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
+
+	while (node) {
+		struct io_kiocb *req;
+
+		req = container_of(node, struct io_kiocb, io_task_work.work_node);
+		node = node->next;
+		__io_fallback_tw(req, false, &last_ctx);
+	}
+	if (last_ctx) {
+		flush_delayed_work(&last_ctx->fallback_work);
+		percpu_ref_put(&last_ctx->refs);
+	}
 }
 
 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1272,52 +1268,52 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 	return false;
 }
 
-static int __io_run_local_work_loop(struct llist_node **node,
-				    struct io_tw_state *ts,
-				    int events)
-{
-	while (*node) {
-		struct llist_node *next = (*node)->next;
-		struct io_kiocb *req = container_of(*node, struct io_kiocb,
-						    io_task_work.node);
-		INDIRECT_CALL_2(req->io_task_work.func,
-				io_poll_task_func, io_req_rw_complete,
-				req, ts);
-		*node = next;
-		if (--events <= 0)
-			break;
-	}
-
-	return events;
-}
-
 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 			       int min_events)
 {
-	struct llist_node *node;
+	struct io_wq_work_node *node, *tail;
+	int ret, limit, nitems;
 	unsigned int loops = 0;
-	int ret, limit;
 
 	if (WARN_ON_ONCE(ctx->submitter_task != current))
 		return -EEXIST;
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+	ret = 0;
 	limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
 again:
-	ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
-	if (ctx->retry_llist.first)
+	spin_lock_irq(&ctx->work_lock);
+	node = ctx->work_list.first;
+	tail = ctx->work_list.last;
+	nitems = ctx->work_items;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irq(&ctx->work_lock);
+
+	while (node) {
+		struct io_kiocb *req = container_of(node, struct io_kiocb,
+						    io_task_work.work_node);
+		node = node->next;
+		INDIRECT_CALL_2(req->io_task_work.func,
+				io_poll_task_func, io_req_rw_complete,
+				req, ts);
+		nitems--;
+		if (++ret >= limit)
+			break;
+	}
+
+	if (unlikely(node)) {
+		spin_lock_irq(&ctx->work_lock);
+		tail->next = ctx->work_list.first;
+		ctx->work_list.first = node;
+		if (!ctx->work_list.last)
+			ctx->work_list.last = tail;
+		ctx->work_items += nitems;
+		spin_unlock_irq(&ctx->work_lock);
 		goto retry_done;
+	}
 
-	/*
-	 * llists are in reverse order, flip it back the right way before
-	 * running the pending items.
-	 */
-	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
-	ret = __io_run_local_work_loop(&node, ts, ret);
-	ctx->retry_llist.first = node;
 	loops++;
-
-	ret = limit - ret;
 	if (io_run_local_work_continue(ctx, ret, min_events))
 		goto again;
 retry_done:
@@ -2413,7 +2409,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
 	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
 		atomic_set(&ctx->cq_wait_nr, 1);
 		smp_mb();
-		if (!llist_empty(&ctx->work_llist))
+		if (io_local_work_pending(ctx))
 			goto out_wake;
 	}
 
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 214f9f175102..2fae27803116 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -349,7 +349,7 @@ static inline int io_run_task_work(void)
 
 static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
 {
-	return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
+	return READ_ONCE(ctx->work_list.first);
 }
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
-- 
2.45.2



* [PATCH 3/6] io_uring/slist: add list-to-list list splice helper
  2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
  2024-11-22 16:12 ` [PATCH 1/6] io_uring: make task_work pending check dependent on ring type Jens Axboe
  2024-11-22 16:12 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
  2024-11-22 16:12 ` [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list Jens Axboe
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC
  To: io-uring; +Cc: Jens Axboe

Add a helper to splice a source list to a destination list.
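
A stand-alone sketch of such a splice follows, for illustration only.
struct wnode, struct wlist and splice_list are hypothetical user-space
names. The sketch assumes the return value is meant to report whether
the destination already held entries, and it adds an empty-source guard
of its own.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct wnode {
	struct wnode *next;
};

struct wlist {
	struct wnode *first, *last;
};

/*
 * Move every entry of 'src' to the tail of 'dst' and leave 'src' empty.
 * Returns true if 'dst' already had entries before the splice.
 */
static bool splice_list(struct wlist *src, struct wlist *dst)
{
	bool had_entries = dst->first != NULL;

	/* Guard added for this sketch: nothing to move. */
	if (!src->first)
		return had_entries;

	if (!had_entries) {
		*dst = *src;
	} else {
		dst->last->next = src->first;
		dst->last = src->last;
	}
	src->first = src->last = NULL;
	return had_entries;
}
```

A caller can use the return value to decide whether a consumer needs to
be kicked: only the splice that makes the destination non-empty has to
schedule it.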

Signed-off-by: Jens Axboe <[email protected]>
---
 io_uring/slist.h | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/io_uring/slist.h b/io_uring/slist.h
index 0eb194817242..7ac7c136b702 100644
--- a/io_uring/slist.h
+++ b/io_uring/slist.h
@@ -85,6 +85,22 @@ static inline bool wq_list_splice(struct io_wq_work_list *list,
 	return false;
 }
 
+static inline bool wq_list_splice_list(struct io_wq_work_list *src,
+				       struct io_wq_work_list *dst)
+{
+	bool ret = false;
+
+	if (wq_list_empty(dst)) {
+		*dst = *src;
+	} else {
+		dst->last->next = src->first;
+		dst->last = src->last;
+		ret = true;
+	}
+	INIT_WQ_LIST(src);
+	return ret;
+}
+
 static inline void wq_stack_add_head(struct io_wq_work_node *node,
 				     struct io_wq_work_node *stack)
 {
-- 
2.45.2



* [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list
  2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
                   ` (2 preceding siblings ...)
  2024-11-22 16:12 ` [PATCH 3/6] io_uring/slist: add list-to-list list splice helper Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
  2024-11-22 16:12 ` [PATCH 5/6] io_uring: add __tctx_task_work_run() helper Jens Axboe
  2024-11-22 16:12 ` [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list Jens Axboe
  5 siblings, 0 replies; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC
  To: io-uring; +Cc: Jens Axboe

Switch the normal task_work to io_wq_work_list as well, both to unify it
with defer task_work and to avoid needing to reverse the ordering of the
list when running it.

Note that this still keeps the manual retry list for SQPOLL task_work.
That could go away as well, as now the task_work list is fully ordered
and SQPOLL could just leave entries on there when it chops up the
running of the list.

Signed-off-by: Jens Axboe <[email protected]>
---
 include/linux/io_uring_types.h |  14 ++-
 io_uring/io_uring.c            | 167 ++++++++++++++++++++-------------
 io_uring/io_uring.h            |   6 +-
 io_uring/sqpoll.c              |   8 +-
 io_uring/tctx.c                |   3 +-
 5 files changed, 116 insertions(+), 82 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index e9ba99cb0ed0..7ddac4d1d4b3 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -102,7 +102,8 @@ struct io_uring_task {
 	struct percpu_counter		inflight;
 
 	struct { /* task_work */
-		struct llist_head	task_list;
+		struct io_wq_work_list	task_list;
+		spinlock_t		task_lock;
 		struct callback_head	task_work;
 	} ____cacheline_aligned_in_smp;
 };
@@ -390,8 +391,9 @@ struct io_ring_ctx {
 	struct mm_struct		*mm_account;
 
 	/* ctx exit and cancelation */
-	struct llist_head		fallback_llist;
-	struct delayed_work		fallback_work;
+	struct io_wq_work_list		fallback_list;
+	spinlock_t			fallback_lock;
+	struct work_struct		fallback_work;
 	struct work_struct		exit_work;
 	struct list_head		tctx_list;
 	struct completion		ref_comp;
@@ -567,11 +569,7 @@ enum {
 typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
 
 struct io_task_work {
-	/* DEFER_TASKRUN uses work_node, regular task_work node */
-	union {
-		struct io_wq_work_node	work_node;
-		struct llist_node	node;
-	};
+	struct io_wq_work_node		node;
 	io_req_tw_func_t		func;
 };
 
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b7eb962e9872..3bb93c77ac3f 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -245,15 +245,26 @@ static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
 static __cold void io_fallback_req_func(struct work_struct *work)
 {
 	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
-						fallback_work.work);
-	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
-	struct io_kiocb *req, *tmp;
+						fallback_work);
+	struct io_wq_work_node *node;
 	struct io_tw_state ts = {};
+	struct io_wq_work_list list;
+
+	spin_lock_irq(&ctx->fallback_lock);
+	list = ctx->fallback_list;
+	INIT_WQ_LIST(&ctx->fallback_list);
+	spin_unlock_irq(&ctx->fallback_lock);
 
 	percpu_ref_get(&ctx->refs);
 	mutex_lock(&ctx->uring_lock);
-	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
+	node = list.first;
+	while (node) {
+		struct io_kiocb *req;
+
+		req = container_of(node, struct io_kiocb, io_task_work.node);
+		node = node->next;
 		req->io_task_work.func(req, &ts);
+	}
 	io_submit_flush_completions(ctx);
 	mutex_unlock(&ctx->uring_lock);
 	percpu_ref_put(&ctx->refs);
@@ -347,7 +358,9 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 #ifdef CONFIG_FUTEX
 	INIT_HLIST_HEAD(&ctx->futex_list);
 #endif
-	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
+	INIT_WORK(&ctx->fallback_work, io_fallback_req_func);
+	INIT_WQ_LIST(&ctx->fallback_list);
+	spin_lock_init(&ctx->fallback_lock);
 	INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
 	INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
 	io_napi_init(ctx);
@@ -1033,15 +1046,15 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
  * If more entries than max_entries are available, stop processing once this
  * is reached and return the rest of the list.
  */
-struct llist_node *io_handle_tw_list(struct llist_node *node,
-				     unsigned int *count,
-				     unsigned int max_entries)
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
+					  unsigned int *count,
+					  unsigned int max_entries)
 {
 	struct io_ring_ctx *ctx = NULL;
 	struct io_tw_state ts = { };
 
 	do {
-		struct llist_node *next = node->next;
+		struct io_wq_work_node *next = node->next;
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 						    io_task_work.node);
 
@@ -1067,55 +1080,84 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
 	return node;
 }
 
-static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync,
-				    struct io_ring_ctx **last_ctx)
+static __cold void __io_fallback_schedule(struct io_ring_ctx *ctx,
+					  struct io_wq_work_list *list,
+					  bool sync)
 {
-	if (sync && *last_ctx != req->ctx) {
-		if (*last_ctx) {
-			flush_delayed_work(&(*last_ctx)->fallback_work);
-			percpu_ref_put(&(*last_ctx)->refs);
-		}
-		*last_ctx = req->ctx;
-		percpu_ref_get(&(*last_ctx)->refs);
-	}
-	if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist))
-		schedule_delayed_work(&req->ctx->fallback_work, 1);
+	bool kick_work = true;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ctx->fallback_lock, flags);
+	kick_work = !wq_list_splice_list(list, &ctx->fallback_list);
+	spin_unlock_irqrestore(&ctx->fallback_lock, flags);
+	if (kick_work)
+		schedule_work(&ctx->fallback_work);
+
+	if (sync)
+		flush_work(&ctx->fallback_work);
+	percpu_ref_put(&ctx->refs);
 }
 
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+static void __io_fallback_tw(struct io_wq_work_list *list, spinlock_t *lock,
+			     bool sync)
 {
-	struct llist_node *node = llist_del_all(&tctx->task_list);
+	struct io_wq_work_list local_list, ctx_list;
 	struct io_ring_ctx *last_ctx = NULL;
+	struct io_wq_work_node *node;
 	struct io_kiocb *req;
+	unsigned long flags;
+
+	spin_lock_irqsave(lock, flags);
+	local_list = *list;
+	INIT_WQ_LIST(list);
+	spin_unlock_irqrestore(lock, flags);
 
+	INIT_WQ_LIST(&ctx_list);
+	node = local_list.first;
 	while (node) {
+		struct io_wq_work_node *next = node->next;
+
 		req = container_of(node, struct io_kiocb, io_task_work.node);
-		node = node->next;
-		__io_fallback_tw(req, sync, &last_ctx);
+		if (last_ctx != req->ctx) {
+			if (last_ctx)
+				__io_fallback_schedule(last_ctx, &ctx_list, sync);
+			last_ctx = req->ctx;
+			percpu_ref_get(&last_ctx->refs);
+		}
+		wq_list_add_tail(node, &ctx_list);
+		node = next;
 	}
 
-	if (last_ctx) {
-		flush_delayed_work(&last_ctx->fallback_work);
-		percpu_ref_put(&last_ctx->refs);
-	}
+	if (last_ctx)
+		__io_fallback_schedule(last_ctx, &ctx_list, sync);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+{
+	__io_fallback_tw(&tctx->task_list, &tctx->task_lock, sync);
 }
 
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
-				      unsigned int max_entries,
-				      unsigned int *count)
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+					   unsigned int max_entries,
+					   unsigned int *count)
 {
-	struct llist_node *node;
+	struct io_wq_work_node *node;
 
 	if (unlikely(current->flags & PF_EXITING)) {
 		io_fallback_tw(tctx, true);
 		return NULL;
 	}
 
-	node = llist_del_all(&tctx->task_list);
-	if (node) {
-		node = llist_reverse_order(node);
+	if (!READ_ONCE(tctx->task_list.first))
+		return NULL;
+
+	spin_lock_irq(&tctx->task_lock);
+	node = tctx->task_list.first;
+	INIT_WQ_LIST(&tctx->task_list);
+	spin_unlock_irq(&tctx->task_lock);
+
+	if (node)
 		node = io_handle_tw_list(node, count, max_entries);
-	}
 
 	/* relaxed read is enough as only the task itself sets ->in_cancel */
 	if (unlikely(atomic_read(&tctx->in_cancel)))
@@ -1128,13 +1170,11 @@ struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
 void tctx_task_work(struct callback_head *cb)
 {
 	struct io_uring_task *tctx;
-	struct llist_node *ret;
 	unsigned int count = 0;
 
 	tctx = container_of(cb, struct io_uring_task, task_work);
-	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
-	/* can't happen */
-	WARN_ON_ONCE(ret);
+	if (tctx_task_work_run(tctx, UINT_MAX, &count))
+		WARN_ON_ONCE(1);
 }
 
 static inline void io_req_local_work_add(struct io_kiocb *req,
@@ -1155,7 +1195,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
 		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
 	spin_lock_irqsave(&ctx->work_lock, flags);
-	wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+	wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
 	nr_tw_prev = ctx->work_items++;
 	spin_unlock_irqrestore(&ctx->work_lock, flags);
 
@@ -1192,9 +1232,16 @@ static void io_req_normal_work_add(struct io_kiocb *req)
 {
 	struct io_uring_task *tctx = req->tctx;
 	struct io_ring_ctx *ctx = req->ctx;
+	unsigned long flags;
+	bool was_empty;
+
+	spin_lock_irqsave(&tctx->task_lock, flags);
+	was_empty = tctx->task_list.first == NULL;
+	wq_list_add_tail(&req->io_task_work.node, &tctx->task_list);
+	spin_unlock_irqrestore(&tctx->task_lock, flags);
 
 	/* task_work already pending, we're done */
-	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+	if (!was_empty)
 		return;
 
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
@@ -1233,27 +1280,13 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
 
 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 {
-	struct io_ring_ctx *last_ctx = NULL;
-	struct io_wq_work_node *node;
-	unsigned long flags;
-
-	spin_lock_irqsave(&ctx->work_lock, flags);
-	node = ctx->work_list.first;
-	INIT_WQ_LIST(&ctx->work_list);
-	ctx->work_items = 0;
-	spin_unlock_irqrestore(&ctx->work_lock, flags);
-
-	while (node) {
-		struct io_kiocb *req;
-
-		req = container_of(node, struct io_kiocb, io_task_work.work_node);
-		node = node->next;
-		__io_fallback_tw(req, false, &last_ctx);
-	}
-	if (last_ctx) {
-		flush_delayed_work(&last_ctx->fallback_work);
-		percpu_ref_put(&last_ctx->refs);
-	}
+	/*
+	 * __io_fallback_tw() handles lists that can have multiple
+	 * rings in it, which isn't the case here. But it'll work just
+	 * fine, so use it anyway rather than have a special case for
+	 * just a single ctx.
+	 */
+	__io_fallback_tw(&ctx->work_list, &ctx->work_lock, false);
 }
 
 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1292,7 +1325,7 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 
 	while (node) {
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
-						    io_task_work.work_node);
+						    io_task_work.node);
 		node = node->next;
 		INDIRECT_CALL_2(req->io_task_work.func,
 				io_poll_task_func, io_req_rw_complete,
@@ -2967,7 +3000,7 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 		io_unregister_personality(ctx, index);
 	mutex_unlock(&ctx->uring_lock);
 
-	flush_delayed_work(&ctx->fallback_work);
+	flush_work(&ctx->fallback_work);
 
 	INIT_WORK(&ctx->exit_work, io_ring_exit_work);
 	/*
@@ -3106,7 +3139,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
 	if (tctx)
 		ret |= io_run_task_work() > 0;
 	else
-		ret |= flush_delayed_work(&ctx->fallback_work);
+		ret |= flush_work(&ctx->fallback_work);
 	return ret;
 }
 
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 2fae27803116..0b5181b128aa 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -91,8 +91,10 @@ void io_req_task_queue(struct io_kiocb *req);
 void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
 void io_req_task_queue_fail(struct io_kiocb *req, int ret);
 void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
-struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
+	unsigned int *count, unsigned int max_entries);
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+	unsigned int max_entries, unsigned int *count);
 void tctx_task_work(struct callback_head *cb);
 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
 int io_uring_alloc_task_context(struct task_struct *task,
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 6df5e649c413..615707260f25 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -221,7 +221,7 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
  * than we were asked to process. Newly queued task_work isn't run until the
  * retry list has been fully processed.
  */
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(struct io_wq_work_node **retry_list, int max_entries)
 {
 	struct io_uring_task *tctx = current->io_uring;
 	unsigned int count = 0;
@@ -239,11 +239,11 @@ static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
 	return count;
 }
 
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(struct io_wq_work_node *retry_list)
 {
 	struct io_uring_task *tctx = current->io_uring;
 
-	return retry_list || !llist_empty(&tctx->task_list);
+	return retry_list || READ_ONCE(tctx->task_list.first);
 }
 
 static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
@@ -259,7 +259,7 @@ static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
 
 static int io_sq_thread(void *data)
 {
-	struct llist_node *retry_list = NULL;
+	struct io_wq_work_node *retry_list = NULL;
 	struct io_sq_data *sqd = data;
 	struct io_ring_ctx *ctx;
 	struct rusage start;
diff --git a/io_uring/tctx.c b/io_uring/tctx.c
index 503f3ff8bc4f..7155b3c56c85 100644
--- a/io_uring/tctx.c
+++ b/io_uring/tctx.c
@@ -87,7 +87,8 @@ __cold int io_uring_alloc_task_context(struct task_struct *task,
 	atomic_set(&tctx->in_cancel, 0);
 	atomic_set(&tctx->inflight_tracked, 0);
 	task->io_uring = tctx;
-	init_llist_head(&tctx->task_list);
+	INIT_WQ_LIST(&tctx->task_list);
+	spin_lock_init(&tctx->task_lock);
 	init_task_work(&tctx->task_work, tctx_task_work);
 	return 0;
 }
-- 
2.45.2



* [PATCH 5/6] io_uring: add __tctx_task_work_run() helper
  2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
                   ` (3 preceding siblings ...)
  2024-11-22 16:12 ` [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
  2024-11-22 16:12 ` [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list Jens Axboe
  5 siblings, 0 replies; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC
  To: io-uring; +Cc: Jens Axboe

Most use cases only care about running all of the task_work, and they
don't need the node passed back or the work capped. Rename the existing
helper to __tctx_task_work_run(), and add a wrapper around that for the
more basic use cases.
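
The shape of that split can be sketched in user space as below. This is
a hedged illustration, not the kernel functions: struct item, run_capped
and run_all are invented names, and "running" an item is reduced to
counting it.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

struct item {
	struct item *next;
};

/*
 * Full-featured helper: processes at most max_entries items, adds the
 * number processed to *count, and hands back the unprocessed remainder.
 */
static struct item *run_capped(struct item *node, unsigned int max_entries,
			       unsigned int *count)
{
	while (node && *count < max_entries) {
		node = node->next;	/* "run" the item */
		(*count)++;
	}
	return node;
}

/* Simple wrapper: run everything, only report how many items ran. */
static unsigned int run_all(struct item *node)
{
	unsigned int count = 0;

	run_capped(node, UINT_MAX, &count);
	return count;
}
```

Callers that just want everything run use run_all() and never see the
node pointer or the cap. Only a caller that chops the work into batches
needs run_capped().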

Signed-off-by: Jens Axboe <[email protected]>
---
 io_uring/io_uring.c | 18 ++++++++++++------
 io_uring/io_uring.h |  9 +++------
 io_uring/sqpoll.c   |  2 +-
 3 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 3bb93c77ac3f..bc520a67fc03 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1137,9 +1137,9 @@ static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
 	__io_fallback_tw(&tctx->task_list, &tctx->task_lock, sync);
 }
 
-struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
-					   unsigned int max_entries,
-					   unsigned int *count)
+struct io_wq_work_node *__tctx_task_work_run(struct io_uring_task *tctx,
+					     unsigned int max_entries,
+					     unsigned int *count)
 {
 	struct io_wq_work_node *node;
 
@@ -1167,14 +1167,20 @@ struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
 	return node;
 }
 
+unsigned int tctx_task_work_run(struct io_uring_task *tctx)
+{
+	unsigned int count = 0;
+
+	__tctx_task_work_run(tctx, UINT_MAX, &count);
+	return count;
+}
+
 void tctx_task_work(struct callback_head *cb)
 {
 	struct io_uring_task *tctx;
-	unsigned int count = 0;
 
 	tctx = container_of(cb, struct io_uring_task, task_work);
-	if (tctx_task_work_run(tctx, UINT_MAX, &count))
-		WARN_ON_ONCE(1);
+	tctx_task_work_run(tctx);
 }
 
 static inline void io_req_local_work_add(struct io_kiocb *req,
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 0b5181b128aa..2b0e7c5db30d 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -93,8 +93,9 @@ void io_req_task_queue_fail(struct io_kiocb *req, int ret);
 void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
 struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
 	unsigned int *count, unsigned int max_entries);
-struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+struct io_wq_work_node *__tctx_task_work_run(struct io_uring_task *tctx,
 	unsigned int max_entries, unsigned int *count);
+unsigned int tctx_task_work_run(struct io_uring_task *tctx);
 void tctx_task_work(struct callback_head *cb);
 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
 int io_uring_alloc_task_context(struct task_struct *task,
@@ -332,12 +333,8 @@ static inline int io_run_task_work(void)
 			resume_user_mode_work(NULL);
 		}
 		if (current->io_uring) {
-			unsigned int count = 0;
-
 			__set_current_state(TASK_RUNNING);
-			tctx_task_work_run(current->io_uring, UINT_MAX, &count);
-			if (count)
-				ret = true;
+			ret = tctx_task_work_run(current->io_uring) != 0;
 		}
 	}
 	if (task_work_pending(current)) {
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 615707260f25..aec6c2d56910 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -232,7 +232,7 @@ static unsigned int io_sq_tw(struct io_wq_work_node **retry_list, int max_entrie
 			goto out;
 		max_entries -= count;
 	}
-	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
+	*retry_list = __tctx_task_work_run(tctx, max_entries, &count);
 out:
 	if (task_work_pending(current))
 		task_work_run();
-- 
2.45.2
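
The split described in this patch, a capped helper plus a "run everything" wrapper, can be sketched in user space as follows. This is only an illustrative model: run_work_capped(), run_work_all() and struct work_node are hypothetical names, not the kernel functions, and "running" an entry is reduced to stepping past it.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

/* Hypothetical stand-in for io_wq_work_node: a singly linked work entry. */
struct work_node {
	struct work_node *next;
};

/*
 * Capped runner: process at most max_entries nodes in total (counted in
 * *count) and return the first unprocessed node, or NULL if none is left.
 */
static struct work_node *run_work_capped(struct work_node *node,
					 unsigned int max_entries,
					 unsigned int *count)
{
	assert(count);
	while (node && *count < max_entries) {
		node = node->next;	/* stand-in for running the entry */
		(*count)++;
	}
	return node;
}

/* Wrapper for the common case: run everything, report how many ran. */
static unsigned int run_work_all(struct work_node *node)
{
	unsigned int count = 0;

	run_work_capped(node, UINT_MAX, &count);
	return count;
}
```

Callers that only want all work run use the wrapper and never see a leftover node; a caller that needs capping (SQPOLL in the patch) keeps calling the capped variant directly.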



* [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list
  2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
                   ` (4 preceding siblings ...)
  2024-11-22 16:12 ` [PATCH 5/6] io_uring: add __tctx_task_work_run() helper Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
  5 siblings, 0 replies; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

The normal task_work logic doesn't really need it, as it always runs
all of the pending work. But for SQPOLL, it can now pass in its
retry_list which simplifies the tracking of split up task_work running.

This avoids passing io_wq_work_node around. Rather than pass in a list,
SQPOLL could re-add the leftover items to the generic task_work list.
But that requires re-locking the task_lock and using task_list for that,
whereas having a separate retry list allows for skipping those steps.
The downside is that now two lists need checking, but that's how it
was before as well.

Signed-off-by: Jens Axboe <[email protected]>
---
 io_uring/io_uring.c | 36 ++++++++++++++++--------------------
 io_uring/io_uring.h |  9 +++++----
 io_uring/sqpoll.c   | 20 +++++++++++---------
 3 files changed, 32 insertions(+), 33 deletions(-)

diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index bc520a67fc03..5e52d8db3dca 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1044,20 +1044,20 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
 /*
  * Run queued task_work, returning the number of entries processed in *count.
  * If more entries than max_entries are available, stop processing once this
- * is reached and return the rest of the list.
+ * is reached.
  */
-struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
-					  unsigned int *count,
-					  unsigned int max_entries)
+void io_handle_tw_list(struct io_wq_work_list *list, unsigned int *count,
+		       unsigned int max_entries)
 {
 	struct io_ring_ctx *ctx = NULL;
 	struct io_tw_state ts = { };
 
 	do {
-		struct io_wq_work_node *next = node->next;
+		struct io_wq_work_node *node = list->first;
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 						    io_task_work.node);
 
+		list->first = node->next;
 		if (req->ctx != ctx) {
 			ctx_flush_and_put(ctx, &ts);
 			ctx = req->ctx;
@@ -1067,17 +1067,15 @@ struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
 		INDIRECT_CALL_2(req->io_task_work.func,
 				io_poll_task_func, io_req_rw_complete,
 				req, &ts);
-		node = next;
 		(*count)++;
 		if (unlikely(need_resched())) {
 			ctx_flush_and_put(ctx, &ts);
 			ctx = NULL;
 			cond_resched();
 		}
-	} while (node && *count < max_entries);
+	} while (list->first && *count < max_entries);
 
 	ctx_flush_and_put(ctx, &ts);
-	return node;
 }
 
 static __cold void __io_fallback_schedule(struct io_ring_ctx *ctx,
@@ -1137,41 +1135,39 @@ static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
 	__io_fallback_tw(&tctx->task_list, &tctx->task_lock, sync);
 }
 
-struct io_wq_work_node *__tctx_task_work_run(struct io_uring_task *tctx,
-					     unsigned int max_entries,
-					     unsigned int *count)
+void __tctx_task_work_run(struct io_uring_task *tctx,
+			  struct io_wq_work_list *list,
+			  unsigned int max_entries, unsigned int *count)
 {
-	struct io_wq_work_node *node;
-
 	if (unlikely(current->flags & PF_EXITING)) {
 		io_fallback_tw(tctx, true);
-		return NULL;
+		return;
 	}
 
 	if (!READ_ONCE(tctx->task_list.first))
-		return NULL;
+		return;
 
 	spin_lock_irq(&tctx->task_lock);
-	node = tctx->task_list.first;
+	*list = tctx->task_list;
 	INIT_WQ_LIST(&tctx->task_list);
 	spin_unlock_irq(&tctx->task_lock);
 
-	if (node)
-		node = io_handle_tw_list(node, count, max_entries);
+	if (!wq_list_empty(list))
+		io_handle_tw_list(list, count, max_entries);
 
 	/* relaxed read is enough as only the task itself sets ->in_cancel */
 	if (unlikely(atomic_read(&tctx->in_cancel)))
 		io_uring_drop_tctx_refs(current);
 
 	trace_io_uring_task_work_run(tctx, *count);
-	return node;
 }
 
 unsigned int tctx_task_work_run(struct io_uring_task *tctx)
 {
+	struct io_wq_work_list list;
 	unsigned int count = 0;
 
-	__tctx_task_work_run(tctx, UINT_MAX, &count);
+	__tctx_task_work_run(tctx, &list, UINT_MAX, &count);
 	return count;
 }
 
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 2b0e7c5db30d..74b1468aefda 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -91,10 +91,11 @@ void io_req_task_queue(struct io_kiocb *req);
 void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
 void io_req_task_queue_fail(struct io_kiocb *req, int ret);
 void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
-struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
-	unsigned int *count, unsigned int max_entries);
-struct io_wq_work_node *__tctx_task_work_run(struct io_uring_task *tctx,
-	unsigned int max_entries, unsigned int *count);
+void io_handle_tw_list(struct io_wq_work_list *list, unsigned int *count,
+	unsigned int max_entries);
+void __tctx_task_work_run(struct io_uring_task *tctx,
+	struct io_wq_work_list *list, unsigned int max_entries,
+	unsigned int *count);
 unsigned int tctx_task_work_run(struct io_uring_task *tctx);
 void tctx_task_work(struct callback_head *cb);
 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index aec6c2d56910..3cd50369db5a 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -221,29 +221,29 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
  * than we were asked to process. Newly queued task_work isn't run until the
  * retry list has been fully processed.
  */
-static unsigned int io_sq_tw(struct io_wq_work_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(struct io_wq_work_list *retry_list, int max_entries)
 {
 	struct io_uring_task *tctx = current->io_uring;
 	unsigned int count = 0;
 
-	if (*retry_list) {
-		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
+	if (!wq_list_empty(retry_list)) {
+		io_handle_tw_list(retry_list, &count, max_entries);
 		if (count >= max_entries)
 			goto out;
 		max_entries -= count;
 	}
-	*retry_list = __tctx_task_work_run(tctx, max_entries, &count);
+	__tctx_task_work_run(tctx, retry_list, max_entries, &count);
 out:
 	if (task_work_pending(current))
 		task_work_run();
 	return count;
 }
 
-static bool io_sq_tw_pending(struct io_wq_work_node *retry_list)
+static bool io_sq_tw_pending(struct io_wq_work_list *retry_list)
 {
 	struct io_uring_task *tctx = current->io_uring;
 
-	return retry_list || READ_ONCE(tctx->task_list.first);
+	return !wq_list_empty(retry_list) || !wq_list_empty(&tctx->task_list);
 }
 
 static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
@@ -259,7 +259,7 @@ static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
 
 static int io_sq_thread(void *data)
 {
-	struct io_wq_work_node *retry_list = NULL;
+	struct io_wq_work_list retry_list;
 	struct io_sq_data *sqd = data;
 	struct io_ring_ctx *ctx;
 	struct rusage start;
@@ -292,6 +292,7 @@ static int io_sq_thread(void *data)
 	audit_uring_entry(IORING_OP_NOP);
 	audit_uring_exit(true, 0);
 
+	INIT_WQ_LIST(&retry_list);
 	mutex_lock(&sqd->lock);
 	while (1) {
 		bool cap_entries, sqt_spin = false;
@@ -332,7 +333,8 @@ static int io_sq_thread(void *data)
 		}
 
 		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
-		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
+		if (!io_sqd_events_pending(sqd) &&
+		    !io_sq_tw_pending(&retry_list)) {
 			bool needs_sched = true;
 
 			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -371,7 +373,7 @@ static int io_sq_thread(void *data)
 		timeout = jiffies + sqd->sq_thread_idle;
 	}
 
-	if (retry_list)
+	if (!wq_list_empty(&retry_list))
 		io_sq_tw(&retry_list, UINT_MAX);
 
 	io_uring_cancel_generic(true, sqd);
-- 
2.45.2
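
The retry-list idea from this patch can be modelled in user space roughly like this. It is a sketch under assumptions: struct work_list, list_add_tail() and handle_list() are hypothetical stand-ins for io_wq_work_list, wq_list_add_tail() and io_handle_tw_list(), there is no locking, and unlike the kernel loop this one tolerates an empty list and resets ->last when the list drains.

```c
#include <assert.h>
#include <stddef.h>

struct work_node {
	struct work_node *next;
	int id;
};

struct work_list {
	struct work_node *first;
	struct work_node *last;
};

static void list_init(struct work_list *list)
{
	list->first = list->last = NULL;
}

static void list_add_tail(struct work_node *node, struct work_list *list)
{
	node->next = NULL;
	if (!list->first)
		list->first = node;
	else
		list->last->next = node;
	list->last = node;
}

/*
 * Pop entries off the front until the list is empty or *count reaches
 * max_entries. Whatever is not processed simply stays on the list, so the
 * caller's list doubles as the retry list.
 */
static void handle_list(struct work_list *list, unsigned int *count,
			unsigned int max_entries)
{
	assert(list && count);
	while (list->first && *count < max_entries) {
		struct work_node *node = list->first;

		list->first = node->next;
		if (!list->first)
			list->last = NULL;
		node->next = NULL;	/* stand-in for running the entry */
		(*count)++;
	}
}
```

Because the leftover entries never leave the caller's list, nothing has to be returned and nothing has to be re-queued under a lock, which is the trade-off the commit message describes.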



* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
  2024-11-22 16:12 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Jens Axboe
@ 2024-11-22 17:07   ` Pavel Begunkov
  2024-11-22 17:11     ` Jens Axboe
  0 siblings, 1 reply; 11+ messages in thread
From: Pavel Begunkov @ 2024-11-22 17:07 UTC (permalink / raw)
  To: Jens Axboe, io-uring

On 11/22/24 16:12, Jens Axboe wrote:
...
>   static inline void io_req_local_work_add(struct io_kiocb *req,
>   					 struct io_ring_ctx *ctx,
> -					 unsigned flags)
> +					 unsigned tw_flags)
>   {
> -	unsigned nr_wait, nr_tw, nr_tw_prev;
> -	struct llist_node *head;
> +	unsigned nr_tw, nr_tw_prev, nr_wait;
> +	unsigned long flags;
>   
>   	/* See comment above IO_CQ_WAKE_INIT */
>   	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>   
>   	/*
> -	 * We don't know how many reuqests is there in the link and whether
> -	 * they can even be queued lazily, fall back to non-lazy.
> +	 * We don't know how many requests are in the link and whether they can
> +	 * even be queued lazily, fall back to non-lazy.
>   	 */
>   	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
> -		flags &= ~IOU_F_TWQ_LAZY_WAKE;
> +		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>   
> -	guard(rcu)();

protects against ctx->task deallocation, see a comment in
io_ring_exit_work() -> synchronize_rcu()

> +	spin_lock_irqsave(&ctx->work_lock, flags);
> +	wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
> +	nr_tw_prev = ctx->work_items++;

Is there a good reason why it changes the semantics of
what's stored across adds? It was assigning a corrected
nr_tw, this one will start heavily spamming with wake_up()
in some cases.

> +	spin_unlock_irqrestore(&ctx->work_lock, flags);
>   
> -	head = READ_ONCE(ctx->work_llist.first);
> -	do {
> -		nr_tw_prev = 0;
> -		if (head) {
> -			struct io_kiocb *first_req = container_of(head,
> -							struct io_kiocb,
> -							io_task_work.node);
> -			/*
> -			 * Might be executed at any moment, rely on
> -			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
> -			 */
> -			nr_tw_prev = READ_ONCE(first_req->nr_tw);
> -		}
> -
> -		/*
> -		 * Theoretically, it can overflow, but that's fine as one of
> -		 * previous adds should've tried to wake the task.
> -		 */
> -		nr_tw = nr_tw_prev + 1;
> -		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
> -			nr_tw = IO_CQ_WAKE_FORCE;
> -
> -		req->nr_tw = nr_tw;
> -		req->io_task_work.node.next = head;
> -	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
> -			      &req->io_task_work.node));
> -
> -	/*
> -	 * cmpxchg implies a full barrier, which pairs with the barrier
> -	 * in set_current_state() on the io_cqring_wait() side. It's used
> -	 * to ensure that either we see updated ->cq_wait_nr, or waiters
> -	 * going to sleep will observe the work added to the list, which
> -	 * is similar to the wait/wawke task state sync.
> -	 */
> +	nr_tw = nr_tw_prev + 1;
> +	if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
> +		nr_tw = IO_CQ_WAKE_FORCE;
>   
> -	if (!head) {
> +	if (!nr_tw_prev) {
>   		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
>   			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
>   		if (ctx->has_evfd)
>   			io_eventfd_signal(ctx);
>   	}
>   
> +	/*
> +	 * We need a barrier after unlock, which pairs with the barrier
> +	 * in set_current_state() on the io_cqring_wait() side. It's used
> +	 * to ensure that either we see updated ->cq_wait_nr, or waiters
> +	 * going to sleep will observe the work added to the list, which
> +	 * is similar to the wait/wake task state sync.
> +	 */
> +	smp_mb();
>   	nr_wait = atomic_read(&ctx->cq_wait_nr);
>   	/* not enough or no one is waiting */
>   	if (nr_tw < nr_wait)
> @@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,

-- 
Pavel Begunkov


* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
  2024-11-22 17:07   ` Pavel Begunkov
@ 2024-11-22 17:11     ` Jens Axboe
  2024-11-22 17:25       ` Pavel Begunkov
  0 siblings, 1 reply; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 17:11 UTC (permalink / raw)
  To: Pavel Begunkov, io-uring

On 11/22/24 10:07 AM, Pavel Begunkov wrote:
> On 11/22/24 16:12, Jens Axboe wrote:
> ...
>>   static inline void io_req_local_work_add(struct io_kiocb *req,
>>                        struct io_ring_ctx *ctx,
>> -                     unsigned flags)
>> +                     unsigned tw_flags)
>>   {
>> -    unsigned nr_wait, nr_tw, nr_tw_prev;
>> -    struct llist_node *head;
>> +    unsigned nr_tw, nr_tw_prev, nr_wait;
>> +    unsigned long flags;
>>         /* See comment above IO_CQ_WAKE_INIT */
>>       BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>         /*
>> -     * We don't know how many reuqests is there in the link and whether
>> -     * they can even be queued lazily, fall back to non-lazy.
>> +     * We don't know how many requests are in the link and whether they can
>> +     * even be queued lazily, fall back to non-lazy.
>>        */
>>       if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>> -        flags &= ~IOU_F_TWQ_LAZY_WAKE;
>> +        tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>   -    guard(rcu)();
> 
> protects against ctx->task deallocation, see a comment in
> io_ring_exit_work() -> synchronize_rcu()

Yeah that's just an editing mistake.

>> +    spin_lock_irqsave(&ctx->work_lock, flags);
>> +    wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>> +    nr_tw_prev = ctx->work_items++;
> 
> Is there a good reason why it changes the semantics of
> what's stored across adds? It was assigning a corrected
> nr_tw, this one will start heavily spamming with wake_up()
> in some cases.

Not sure I follow, how so? nr_tw_prev will be the previous count, just
like before. Except we won't need to dig into the list to find it, we
have it readily available. nr_tw will be the current count, or force wake
if needed. As before.

-- 
Jens Axboe


* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
  2024-11-22 17:11     ` Jens Axboe
@ 2024-11-22 17:25       ` Pavel Begunkov
  2024-11-22 17:44         ` Jens Axboe
  0 siblings, 1 reply; 11+ messages in thread
From: Pavel Begunkov @ 2024-11-22 17:25 UTC (permalink / raw)
  To: Jens Axboe, io-uring

On 11/22/24 17:11, Jens Axboe wrote:
> On 11/22/24 10:07 AM, Pavel Begunkov wrote:
>> On 11/22/24 16:12, Jens Axboe wrote:
>> ...
>>>    static inline void io_req_local_work_add(struct io_kiocb *req,
>>>                         struct io_ring_ctx *ctx,
>>> -                     unsigned flags)
>>> +                     unsigned tw_flags)
>>>    {
>>> -    unsigned nr_wait, nr_tw, nr_tw_prev;
>>> -    struct llist_node *head;
>>> +    unsigned nr_tw, nr_tw_prev, nr_wait;
>>> +    unsigned long flags;
>>>          /* See comment above IO_CQ_WAKE_INIT */
>>>        BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>>          /*
>>> -     * We don't know how many reuqests is there in the link and whether
>>> -     * they can even be queued lazily, fall back to non-lazy.
>>> +     * We don't know how many requests are in the link and whether they can
>>> +     * even be queued lazily, fall back to non-lazy.
>>>         */
>>>        if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>>> -        flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>> +        tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>    -    guard(rcu)();
>>
>> protects against ctx->task deallocation, see a comment in
>> io_ring_exit_work() -> synchronize_rcu()
> 
> Yeah that's just an editing mistake.
> 
>>> +    spin_lock_irqsave(&ctx->work_lock, flags);
>>> +    wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>>> +    nr_tw_prev = ctx->work_items++;
>>
>> Is there a good reason why it changes the semantics of
>> what's stored across adds? It was assigning a corrected
>> nr_tw, this one will start heavily spamming with wake_up()
>> in some cases.
> 
> Not sure I follow, how so? nr_tw_prev will be the previous count, just
> like before. Except we won't need to dig into the list to find it, we
> have it readily available. nr_tw will be the current count, or force wake
> if needed. As before.

The problem is what it stores, not how and where. Before, req->nr_tw
could've been set to IO_CQ_WAKE_FORCE, in which case the following
requests would not attempt to wake up the task. Now work_items
is just a counter.

Let's say you've got a bunch of non-lazy adds coming close to each
other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and
others just queue themselves in the list. Now, every single one
of them will try to wake_up() as long as ->cq_wait_nr is large
enough.
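
To make the difference concrete, here is a small user-space model that counts wake-ups for a burst of adds under both schemes. It is a sketch, not kernel code: the constant values and the two checks in should_wake() are assumed to match the tail of io_req_local_work_add(), and the wakeups_*() helpers are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed to mirror the io_uring definitions. */
#define IO_CQ_WAKE_INIT		(-1U)
#define IO_CQ_WAKE_FORCE	(IO_CQ_WAKE_INIT >> 1)

/* Wake decision shared by both schemes. */
static bool should_wake(unsigned nr_tw, unsigned nr_tw_prev, unsigned nr_wait)
{
	if (nr_tw < nr_wait)		/* not enough or no one is waiting */
		return false;
	if (nr_tw_prev >= nr_wait)	/* a previous add already woke the task */
		return false;
	return true;
}

/* Old scheme: the value carried across adds is the corrected nr_tw. */
static unsigned wakeups_stored_nr_tw(unsigned nr_adds, bool lazy,
				     unsigned nr_wait)
{
	unsigned stored = 0, wakeups = 0;

	assert(nr_wait > 0);	/* model assumes a waiter wanting >= 1 entry */
	for (unsigned i = 0; i < nr_adds; i++) {
		unsigned nr_tw_prev = stored;
		unsigned nr_tw = lazy ? nr_tw_prev + 1 : IO_CQ_WAKE_FORCE;

		stored = nr_tw;
		wakeups += should_wake(nr_tw, nr_tw_prev, nr_wait);
	}
	return wakeups;
}

/* Posted patch: the value carried across adds is a plain item counter. */
static unsigned wakeups_plain_counter(unsigned nr_adds, bool lazy,
				      unsigned nr_wait)
{
	unsigned work_items = 0, wakeups = 0;

	assert(nr_wait > 0);
	for (unsigned i = 0; i < nr_adds; i++) {
		unsigned nr_tw_prev = work_items++;
		unsigned nr_tw = lazy ? nr_tw_prev + 1 : IO_CQ_WAKE_FORCE;

		wakeups += should_wake(nr_tw, nr_tw_prev, nr_wait);
	}
	return wakeups;
}
```

With five non-lazy adds and a waiter asking for eight completions, the stored-nr_tw model wakes once while the plain-counter model wakes on every add; for purely lazy adds the two models agree.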

-- 
Pavel Begunkov


* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
  2024-11-22 17:25       ` Pavel Begunkov
@ 2024-11-22 17:44         ` Jens Axboe
  0 siblings, 0 replies; 11+ messages in thread
From: Jens Axboe @ 2024-11-22 17:44 UTC (permalink / raw)
  To: Pavel Begunkov, io-uring

On 11/22/24 10:25 AM, Pavel Begunkov wrote:
> On 11/22/24 17:11, Jens Axboe wrote:
>> On 11/22/24 10:07 AM, Pavel Begunkov wrote:
>>> On 11/22/24 16:12, Jens Axboe wrote:
>>> ...
>>>>    static inline void io_req_local_work_add(struct io_kiocb *req,
>>>>                         struct io_ring_ctx *ctx,
>>>> -                     unsigned flags)
>>>> +                     unsigned tw_flags)
>>>>    {
>>>> -    unsigned nr_wait, nr_tw, nr_tw_prev;
>>>> -    struct llist_node *head;
>>>> +    unsigned nr_tw, nr_tw_prev, nr_wait;
>>>> +    unsigned long flags;
>>>>          /* See comment above IO_CQ_WAKE_INIT */
>>>>        BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>>>          /*
>>>> -     * We don't know how many reuqests is there in the link and whether
>>>> -     * they can even be queued lazily, fall back to non-lazy.
>>>> +     * We don't know how many requests are in the link and whether they can
>>>> +     * even be queued lazily, fall back to non-lazy.
>>>>         */
>>>>        if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>>>> -        flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>> +        tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>>    -    guard(rcu)();
>>>
>>> protects against ctx->task deallocation, see a comment in
>>> io_ring_exit_work() -> synchronize_rcu()
>>
>> Yeah that's just an editing mistake.
>>
>>>> +    spin_lock_irqsave(&ctx->work_lock, flags);
>>>> +    wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>>>> +    nr_tw_prev = ctx->work_items++;
>>>
>>> Is there a good reason why it changes the semantics of
>>> what's stored across adds? It was assigning a corrected
>>> nr_tw, this one will start heavily spamming with wake_up()
>>> in some cases.
>>
>> Not sure I follow, how so? nr_tw_prev will be the previous count, just
>> like before. Except we won't need to dig into the list to find it, we
>> have it readily available. nr_tw will be the current count, or force wake
>> if needed. As before.
> 
> The problem is what it stores, not how and where. Before req->nr_tw
> could've been set to IO_CQ_WAKE_FORCE, in which case following
> requests are not going to attempt waking up the task, now work_items
> is just a counter.
> 
> Let's say you've got a bunch of non-lazy adds coming close to each
> other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and
> others just queue themselves in the list. Now, every single one
> of them will try to wake_up() as long as ->cq_wait_nr is large
> enough.

If we really care about the non-lazy path as much, we can just use the
same storing scheme as we did in req->nr_tw, except in ->work_items
instead. Not a big deal imho.
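
A rough user-space sketch of that suggestion, with the corrected value kept in ->work_items instead of req->nr_tw. Everything here is an assumption for illustration: struct ctx_model and local_work_add_model() are hypothetical, the constants are assumed to match io_uring's, and the real code would do the read and store under ->work_lock.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed to mirror the io_uring definitions. */
#define IO_CQ_WAKE_INIT		(-1U)
#define IO_CQ_WAKE_FORCE	(IO_CQ_WAKE_INIT >> 1)

/* Hypothetical model of the two ctx fields involved. */
struct ctx_model {
	unsigned work_items;	/* value carried across adds */
	unsigned cq_wait_nr;	/* completions the waiter asked for */
};

/* Returns true if this add would go on to wake the task. */
static bool local_work_add_model(struct ctx_model *ctx, bool lazy)
{
	unsigned nr_tw_prev, nr_tw;

	assert(ctx);
	nr_tw_prev = ctx->work_items;
	nr_tw = lazy ? nr_tw_prev + 1 : IO_CQ_WAKE_FORCE;

	/* Store the corrected value, so a forced wake sticks for later adds. */
	ctx->work_items = nr_tw;

	if (nr_tw < ctx->cq_wait_nr)		/* not enough or no waiter */
		return false;
	if (nr_tw_prev >= ctx->cq_wait_nr)	/* already woken earlier */
		return false;
	return true;
}
```

One consequence worth noting in this model: once a non-lazy add has happened, ->work_items no longer holds a true item count, so anything that relies on it as a count would need to account for that.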

-- 
Jens Axboe


Thread overview: 11+ messages
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
2024-11-22 16:12 ` [PATCH 1/6] io_uring: make task_work pending check dependent on ring type Jens Axboe
2024-11-22 16:12 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Jens Axboe
2024-11-22 17:07   ` Pavel Begunkov
2024-11-22 17:11     ` Jens Axboe
2024-11-22 17:25       ` Pavel Begunkov
2024-11-22 17:44         ` Jens Axboe
2024-11-22 16:12 ` [PATCH 3/6] io_uring/slist: add list-to-list list splice helper Jens Axboe
2024-11-22 16:12 ` [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list Jens Axboe
2024-11-22 16:12 ` [PATCH 5/6] io_uring: add __tctx_task_work_run() helper Jens Axboe
2024-11-22 16:12 ` [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list Jens Axboe
