public inbox for [email protected]
From: Jens Axboe <[email protected]>
To: [email protected]
Cc: Jens Axboe <[email protected]>
Subject: [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list
Date: Fri, 22 Nov 2024 09:12:42 -0700
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>

Switch the normal task_work to io_wq_work_list as well, both to unify it
with the deferred task_work handling and to avoid needing to reverse the
ordering of the list when running it.
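
For illustration only, here is a standalone userspace sketch (not the
kernel code, and the helpers are simplified stand-ins): a LIFO push
list, in the style of llist, hands entries back newest-first, so running
them in queueing order needs an extra reversal pass, while a first/last
tail-append list in the style of io_wq_work_list is FIFO by construction
and can be run as-is.

#include <stdio.h>

struct node { int seq; struct node *next; };
struct list { struct node *first, *last; };

/* LIFO push, as llist_add() does: the new entry becomes the head */
static void lifo_push(struct node **head, struct node *n)
{
	n->next = *head;
	*head = n;
}

/* FIFO append, as wq_list_add_tail() does */
static void fifo_add_tail(struct list *l, struct node *n)
{
	n->next = NULL;
	if (l->last)
		l->last->next = n;
	else
		l->first = n;
	l->last = n;
}

int main(void)
{
	struct node a[3] = { { .seq = 0 }, { .seq = 1 }, { .seq = 2 } };
	struct node b[3] = { { .seq = 0 }, { .seq = 1 }, { .seq = 2 } };
	struct list l = { NULL, NULL };
	struct node *head = NULL, *n;
	int i;

	for (i = 0; i < 3; i++) {
		lifo_push(&head, &a[i]);
		fifo_add_tail(&l, &b[i]);
	}

	printf("LIFO walk (needs a reversal pass first):");
	for (n = head; n; n = n->next)
		printf(" %d", n->seq);
	printf("\nFIFO walk (already in queueing order):  ");
	for (n = l.first; n; n = n->next)
		printf(" %d", n->seq);
	printf("\n");
	return 0;
}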

Note that this still keeps the manual retry list for SQPOLL task_work.
That could go away as well: now that the task_work list is fully
ordered, SQPOLL could simply leave unprocessed entries on the list when
it caps how many it runs in one pass.
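
As a sketch of that idea (again standalone userspace C, not the io_uring
code; run_capped() is a made-up helper), a capped pass over an ordered
first/last list can just leave whatever it did not get to at the head of
the list for the next run:

#include <stdio.h>

struct node { int seq; struct node *next; };
struct list { struct node *first, *last; };

/* Run at most max_entries entries; whatever is left stays queued. */
static unsigned int run_capped(struct list *l, unsigned int max_entries)
{
	unsigned int count = 0;

	while (l->first && count < max_entries) {
		struct node *n = l->first;

		l->first = n->next;
		if (!l->first)
			l->last = NULL;
		printf("ran entry %d\n", n->seq);
		count++;
	}
	return count;
}

int main(void)
{
	struct node nodes[4] = { { .seq = 0 }, { .seq = 1 }, { .seq = 2 }, { .seq = 3 } };
	struct list l = { &nodes[0], &nodes[3] };
	int i;

	for (i = 0; i < 3; i++)
		nodes[i].next = &nodes[i + 1];

	run_capped(&l, 2);	/* runs 0 and 1; 2 and 3 stay queued */
	run_capped(&l, 2);	/* runs 2 and 3, still in order */
	return 0;
}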

Signed-off-by: Jens Axboe <[email protected]>
---
 include/linux/io_uring_types.h |  14 ++-
 io_uring/io_uring.c            | 167 ++++++++++++++++++++-------------
 io_uring/io_uring.h            |   6 +-
 io_uring/sqpoll.c              |   8 +-
 io_uring/tctx.c                |   3 +-
 5 files changed, 116 insertions(+), 82 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index e9ba99cb0ed0..7ddac4d1d4b3 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -102,7 +102,8 @@ struct io_uring_task {
 	struct percpu_counter		inflight;
 
 	struct { /* task_work */
-		struct llist_head	task_list;
+		struct io_wq_work_list	task_list;
+		spinlock_t		task_lock;
 		struct callback_head	task_work;
 	} ____cacheline_aligned_in_smp;
 };
@@ -390,8 +391,9 @@ struct io_ring_ctx {
 	struct mm_struct		*mm_account;
 
 	/* ctx exit and cancelation */
-	struct llist_head		fallback_llist;
-	struct delayed_work		fallback_work;
+	struct io_wq_work_list		fallback_list;
+	spinlock_t			fallback_lock;
+	struct work_struct		fallback_work;
 	struct work_struct		exit_work;
 	struct list_head		tctx_list;
 	struct completion		ref_comp;
@@ -567,11 +569,7 @@ enum {
 typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
 
 struct io_task_work {
-	/* DEFER_TASKRUN uses work_node, regular task_work node */
-	union {
-		struct io_wq_work_node	work_node;
-		struct llist_node	node;
-	};
+	struct io_wq_work_node		node;
 	io_req_tw_func_t		func;
 };
 
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b7eb962e9872..3bb93c77ac3f 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -245,15 +245,26 @@ static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
 static __cold void io_fallback_req_func(struct work_struct *work)
 {
 	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
-						fallback_work.work);
-	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
-	struct io_kiocb *req, *tmp;
+						fallback_work);
+	struct io_wq_work_node *node;
 	struct io_tw_state ts = {};
+	struct io_wq_work_list list;
+
+	spin_lock_irq(&ctx->fallback_lock);
+	list = ctx->fallback_list;
+	INIT_WQ_LIST(&ctx->fallback_list);
+	spin_unlock_irq(&ctx->fallback_lock);
 
 	percpu_ref_get(&ctx->refs);
 	mutex_lock(&ctx->uring_lock);
-	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
+	node = list.first;
+	while (node) {
+		struct io_kiocb *req;
+
+		req = container_of(node, struct io_kiocb, io_task_work.node);
+		node = node->next;
 		req->io_task_work.func(req, &ts);
+	}
 	io_submit_flush_completions(ctx);
 	mutex_unlock(&ctx->uring_lock);
 	percpu_ref_put(&ctx->refs);
@@ -347,7 +358,9 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 #ifdef CONFIG_FUTEX
 	INIT_HLIST_HEAD(&ctx->futex_list);
 #endif
-	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
+	INIT_WORK(&ctx->fallback_work, io_fallback_req_func);
+	INIT_WQ_LIST(&ctx->fallback_list);
+	spin_lock_init(&ctx->fallback_lock);
 	INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
 	INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
 	io_napi_init(ctx);
@@ -1033,15 +1046,15 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
  * If more entries than max_entries are available, stop processing once this
  * is reached and return the rest of the list.
  */
-struct llist_node *io_handle_tw_list(struct llist_node *node,
-				     unsigned int *count,
-				     unsigned int max_entries)
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
+					  unsigned int *count,
+					  unsigned int max_entries)
 {
 	struct io_ring_ctx *ctx = NULL;
 	struct io_tw_state ts = { };
 
 	do {
-		struct llist_node *next = node->next;
+		struct io_wq_work_node *next = node->next;
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 						    io_task_work.node);
 
@@ -1067,55 +1080,84 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
 	return node;
 }
 
-static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync,
-				    struct io_ring_ctx **last_ctx)
+static __cold void __io_fallback_schedule(struct io_ring_ctx *ctx,
+					  struct io_wq_work_list *list,
+					  bool sync)
 {
-	if (sync && *last_ctx != req->ctx) {
-		if (*last_ctx) {
-			flush_delayed_work(&(*last_ctx)->fallback_work);
-			percpu_ref_put(&(*last_ctx)->refs);
-		}
-		*last_ctx = req->ctx;
-		percpu_ref_get(&(*last_ctx)->refs);
-	}
-	if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist))
-		schedule_delayed_work(&req->ctx->fallback_work, 1);
+	bool kick_work = true;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ctx->fallback_lock, flags);
+	kick_work = !wq_list_splice_list(list, &ctx->fallback_list);
+	spin_unlock_irqrestore(&ctx->fallback_lock, flags);
+	if (kick_work)
+		schedule_work(&ctx->fallback_work);
+
+	if (sync)
+		flush_work(&ctx->fallback_work);
+	percpu_ref_put(&ctx->refs);
 }
 
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+static void __io_fallback_tw(struct io_wq_work_list *list, spinlock_t *lock,
+			     bool sync)
 {
-	struct llist_node *node = llist_del_all(&tctx->task_list);
+	struct io_wq_work_list local_list, ctx_list;
 	struct io_ring_ctx *last_ctx = NULL;
+	struct io_wq_work_node *node;
 	struct io_kiocb *req;
+	unsigned long flags;
+
+	spin_lock_irqsave(lock, flags);
+	local_list = *list;
+	INIT_WQ_LIST(list);
+	spin_unlock_irqrestore(lock, flags);
 
+	INIT_WQ_LIST(&ctx_list);
+	node = local_list.first;
 	while (node) {
+		struct io_wq_work_node *next = node->next;
+
 		req = container_of(node, struct io_kiocb, io_task_work.node);
-		node = node->next;
-		__io_fallback_tw(req, sync, &last_ctx);
+		if (last_ctx != req->ctx) {
+			if (last_ctx)
+				__io_fallback_schedule(last_ctx, &ctx_list, sync);
+			last_ctx = req->ctx;
+			percpu_ref_get(&last_ctx->refs);
+		}
+		wq_list_add_tail(node, &ctx_list);
+		node = next;
 	}
 
-	if (last_ctx) {
-		flush_delayed_work(&last_ctx->fallback_work);
-		percpu_ref_put(&last_ctx->refs);
-	}
+	if (last_ctx)
+		__io_fallback_schedule(last_ctx, &ctx_list, sync);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+{
+	__io_fallback_tw(&tctx->task_list, &tctx->task_lock, sync);
 }
 
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
-				      unsigned int max_entries,
-				      unsigned int *count)
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+					   unsigned int max_entries,
+					   unsigned int *count)
 {
-	struct llist_node *node;
+	struct io_wq_work_node *node;
 
 	if (unlikely(current->flags & PF_EXITING)) {
 		io_fallback_tw(tctx, true);
 		return NULL;
 	}
 
-	node = llist_del_all(&tctx->task_list);
-	if (node) {
-		node = llist_reverse_order(node);
+	if (!READ_ONCE(tctx->task_list.first))
+		return NULL;
+
+	spin_lock_irq(&tctx->task_lock);
+	node = tctx->task_list.first;
+	INIT_WQ_LIST(&tctx->task_list);
+	spin_unlock_irq(&tctx->task_lock);
+
+	if (node)
 		node = io_handle_tw_list(node, count, max_entries);
-	}
 
 	/* relaxed read is enough as only the task itself sets ->in_cancel */
 	if (unlikely(atomic_read(&tctx->in_cancel)))
@@ -1128,13 +1170,11 @@ struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
 void tctx_task_work(struct callback_head *cb)
 {
 	struct io_uring_task *tctx;
-	struct llist_node *ret;
 	unsigned int count = 0;
 
 	tctx = container_of(cb, struct io_uring_task, task_work);
-	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
-	/* can't happen */
-	WARN_ON_ONCE(ret);
+	if (tctx_task_work_run(tctx, UINT_MAX, &count))
+		WARN_ON_ONCE(1);
 }
 
 static inline void io_req_local_work_add(struct io_kiocb *req,
@@ -1155,7 +1195,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
 		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
 	spin_lock_irqsave(&ctx->work_lock, flags);
-	wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+	wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
 	nr_tw_prev = ctx->work_items++;
 	spin_unlock_irqrestore(&ctx->work_lock, flags);
 
@@ -1192,9 +1232,16 @@ static void io_req_normal_work_add(struct io_kiocb *req)
 {
 	struct io_uring_task *tctx = req->tctx;
 	struct io_ring_ctx *ctx = req->ctx;
+	unsigned long flags;
+	bool was_empty;
+
+	spin_lock_irqsave(&tctx->task_lock, flags);
+	was_empty = tctx->task_list.first == NULL;
+	wq_list_add_tail(&req->io_task_work.node, &tctx->task_list);
+	spin_unlock_irqrestore(&tctx->task_lock, flags);
 
 	/* task_work already pending, we're done */
-	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+	if (!was_empty)
 		return;
 
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
@@ -1233,27 +1280,13 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
 
 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 {
-	struct io_ring_ctx *last_ctx = NULL;
-	struct io_wq_work_node *node;
-	unsigned long flags;
-
-	spin_lock_irqsave(&ctx->work_lock, flags);
-	node = ctx->work_list.first;
-	INIT_WQ_LIST(&ctx->work_list);
-	ctx->work_items = 0;
-	spin_unlock_irqrestore(&ctx->work_lock, flags);
-
-	while (node) {
-		struct io_kiocb *req;
-
-		req = container_of(node, struct io_kiocb, io_task_work.work_node);
-		node = node->next;
-		__io_fallback_tw(req, false, &last_ctx);
-	}
-	if (last_ctx) {
-		flush_delayed_work(&last_ctx->fallback_work);
-		percpu_ref_put(&last_ctx->refs);
-	}
+	/*
+	 * __io_fallback_tw() handles lists that can have multiple
+	 * rings in it, which isn't the case here. But it'll work just
+	 * fine, so use it anyway rather than have a special case for
+	 * just a single ctx.
+	 */
+	__io_fallback_tw(&ctx->work_list, &ctx->work_lock, false);
 }
 
 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1292,7 +1325,7 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 
 	while (node) {
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
-						    io_task_work.work_node);
+						    io_task_work.node);
 		node = node->next;
 		INDIRECT_CALL_2(req->io_task_work.func,
 				io_poll_task_func, io_req_rw_complete,
@@ -2967,7 +3000,7 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 		io_unregister_personality(ctx, index);
 	mutex_unlock(&ctx->uring_lock);
 
-	flush_delayed_work(&ctx->fallback_work);
+	flush_work(&ctx->fallback_work);
 
 	INIT_WORK(&ctx->exit_work, io_ring_exit_work);
 	/*
@@ -3106,7 +3139,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
 	if (tctx)
 		ret |= io_run_task_work() > 0;
 	else
-		ret |= flush_delayed_work(&ctx->fallback_work);
+		ret |= flush_work(&ctx->fallback_work);
 	return ret;
 }
 
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 2fae27803116..0b5181b128aa 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -91,8 +91,10 @@ void io_req_task_queue(struct io_kiocb *req);
 void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
 void io_req_task_queue_fail(struct io_kiocb *req, int ret);
 void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
-struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
+	unsigned int *count, unsigned int max_entries);
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+	unsigned int max_entries, unsigned int *count);
 void tctx_task_work(struct callback_head *cb);
 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
 int io_uring_alloc_task_context(struct task_struct *task,
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 6df5e649c413..615707260f25 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -221,7 +221,7 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
  * than we were asked to process. Newly queued task_work isn't run until the
  * retry list has been fully processed.
  */
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(struct io_wq_work_node **retry_list, int max_entries)
 {
 	struct io_uring_task *tctx = current->io_uring;
 	unsigned int count = 0;
@@ -239,11 +239,11 @@ static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
 	return count;
 }
 
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(struct io_wq_work_node *retry_list)
 {
 	struct io_uring_task *tctx = current->io_uring;
 
-	return retry_list || !llist_empty(&tctx->task_list);
+	return retry_list || READ_ONCE(tctx->task_list.first);
 }
 
 static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
@@ -259,7 +259,7 @@ static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
 
 static int io_sq_thread(void *data)
 {
-	struct llist_node *retry_list = NULL;
+	struct io_wq_work_node *retry_list = NULL;
 	struct io_sq_data *sqd = data;
 	struct io_ring_ctx *ctx;
 	struct rusage start;
diff --git a/io_uring/tctx.c b/io_uring/tctx.c
index 503f3ff8bc4f..7155b3c56c85 100644
--- a/io_uring/tctx.c
+++ b/io_uring/tctx.c
@@ -87,7 +87,8 @@ __cold int io_uring_alloc_task_context(struct task_struct *task,
 	atomic_set(&tctx->in_cancel, 0);
 	atomic_set(&tctx->inflight_tracked, 0);
 	task->io_uring = tctx;
-	init_llist_head(&tctx->task_list);
+	INIT_WQ_LIST(&tctx->task_list);
+	spin_lock_init(&tctx->task_lock);
 	init_task_work(&tctx->task_work, tctx_task_work);
 	return 0;
 }
-- 
2.45.2



Thread overview: 12+ messages
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
2024-11-22 16:12 ` [PATCH 1/6] io_uring: make task_work pending check dependent on ring type Jens Axboe
2024-11-22 16:12 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Jens Axboe
2024-11-22 17:07   ` Pavel Begunkov
2024-11-22 17:11     ` Jens Axboe
2024-11-22 17:25       ` Pavel Begunkov
2024-11-22 17:44         ` Jens Axboe
2024-11-23  0:36           ` Pavel Begunkov
2024-11-22 16:12 ` [PATCH 3/6] io_uring/slist: add list-to-list list splice helper Jens Axboe
2024-11-22 16:12 ` Jens Axboe [this message]
2024-11-22 16:12 ` [PATCH 5/6] io_uring: add __tctx_task_work_run() helper Jens Axboe
2024-11-22 16:12 ` [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list Jens Axboe
