From: Jens Axboe <[email protected]>
To: [email protected]
Cc: Jens Axboe <[email protected]>
Subject: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
Date: Fri, 22 Nov 2024 09:12:40 -0700
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
Add a spinlock for the list, and replace the lockless llist with the
work list. This avoids needing to reverse items in the list before
running them, as the io_wq_work_list is FIFO by nature whereas the
llist is LIFO.
Signed-off-by: Jens Axboe <[email protected]>
---
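A quick illustration of the ordering point above, for review purposes
only and not part of the patch: the types and helpers below are
simplified userspace stand-ins for llist/io_wq_work_list, not the
kernel API. Pushing to a lock-free llist makes the newest entry the
head, so the consumer has to reverse the chain before running items in
submission order, whereas tail-appending to an io_wq_work_list-style
list keeps them in order from the start.

#include <stdio.h>

struct node { int seq; struct node *next; };

/* LIFO push, llist_add() style: the newest item becomes the head. */
static void lifo_push(struct node **head, struct node *n)
{
        n->next = *head;
        *head = n;
}

/* FIFO tail append, wq_list_add_tail() style: insertion order is kept. */
static void fifo_add(struct node **head, struct node **tail, struct node *n)
{
        n->next = NULL;
        if (*tail)
                (*tail)->next = n;
        else
                *head = n;
        *tail = n;
}

int main(void)
{
        struct node a = { 1 }, b = { 2 }, c = { 3 };
        struct node d = { 1 }, e = { 2 }, f = { 3 };
        struct node *lifo = NULL, *head = NULL, *tail = NULL, *n;

        lifo_push(&lifo, &a); lifo_push(&lifo, &b); lifo_push(&lifo, &c);
        for (n = lifo; n; n = n->next)      /* prints 3 2 1: needs a reverse pass */
                printf("lifo: %d\n", n->seq);

        fifo_add(&head, &tail, &d); fifo_add(&head, &tail, &e); fifo_add(&head, &tail, &f);
        for (n = head; n; n = n->next)      /* prints 1 2 3: already in order */
                printf("fifo: %d\n", n->seq);
        return 0;
}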
include/linux/io_uring_types.h | 13 ++-
io_uring/io_uring.c | 194 ++++++++++++++++-----------------
io_uring/io_uring.h | 2 +-
3 files changed, 104 insertions(+), 105 deletions(-)
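Also for illustration only, not part of the patch: a rough userspace
sketch of the run-loop pattern the __io_run_local_work() hunk below
switches to. Names are hypothetical and a pthread mutex stands in for
the ctx->work_lock spinlock: detach the whole list under the lock, run
up to a limit outside it, then splice any leftover entries back to the
front so they keep their place ahead of newly queued work.

#include <stdio.h>
#include <pthread.h>

struct item { int id; struct item *next; };

static struct item *head, *tail;
static int nitems;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

static void add_tail(struct item *it)
{
        pthread_mutex_lock(&lock);
        it->next = NULL;
        if (tail)
                tail->next = it;
        else
                head = it;
        tail = it;
        nitems++;
        pthread_mutex_unlock(&lock);
}

/* Run at most 'limit' items; return how many actually ran. */
static int run_some(int limit)
{
        struct item *node, *batch_tail;
        int remaining, ran = 0;

        pthread_mutex_lock(&lock);
        node = head;
        batch_tail = tail;
        remaining = nitems;
        head = tail = NULL;
        nitems = 0;
        pthread_mutex_unlock(&lock);

        while (node) {
                struct item *it = node;

                node = node->next;
                printf("running item %d\n", it->id);
                remaining--;
                if (++ran >= limit)
                        break;
        }

        if (node) {
                /* Leftovers: splice them back to the front so items queued
                 * in the meantime still run after them (FIFO is preserved). */
                pthread_mutex_lock(&lock);
                batch_tail->next = head;
                head = node;
                if (!tail)
                        tail = batch_tail;
                nitems += remaining;
                pthread_mutex_unlock(&lock);
        }
        return ran;
}

int main(void)
{
        struct item items[5];
        int i;

        for (i = 0; i < 5; i++) {
                items[i].id = i + 1;
                add_tail(&items[i]);
        }
        run_some(3);    /* runs 1,2,3; splices 4,5 back */
        run_some(8);    /* runs 4,5 */
        return 0;
}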
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 011860ade268..e9ba99cb0ed0 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -335,8 +335,9 @@ struct io_ring_ctx {
* regularly bounce b/w CPUs.
*/
struct {
- struct llist_head work_llist;
- struct llist_head retry_llist;
+ struct io_wq_work_list work_list;
+ spinlock_t work_lock;
+ int work_items;
unsigned long check_cq;
atomic_t cq_wait_nr;
atomic_t cq_timeouts;
@@ -566,7 +567,11 @@ enum {
typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
struct io_task_work {
- struct llist_node node;
+ /* DEFER_TASKRUN uses work_node, regular task_work node */
+ union {
+ struct io_wq_work_node work_node;
+ struct llist_node node;
+ };
io_req_tw_func_t func;
};
@@ -622,8 +627,6 @@ struct io_kiocb {
*/
u16 buf_index;
- unsigned nr_tw;
-
/* REQ_F_* flags */
io_req_flags_t flags;
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index c3a7d0197636..b7eb962e9872 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -339,7 +339,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
INIT_LIST_HEAD(&ctx->ltimeout_list);
- init_llist_head(&ctx->work_llist);
+ INIT_WQ_LIST(&ctx->work_list);
+ spin_lock_init(&ctx->work_lock);
INIT_LIST_HEAD(&ctx->tctx_list);
ctx->submit_state.free_list.next = NULL;
INIT_HLIST_HEAD(&ctx->waitid_list);
@@ -1066,25 +1067,31 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
return node;
}
-static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
+static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync,
+ struct io_ring_ctx **last_ctx)
{
+ if (sync && *last_ctx != req->ctx) {
+ if (*last_ctx) {
+ flush_delayed_work(&(*last_ctx)->fallback_work);
+ percpu_ref_put(&(*last_ctx)->refs);
+ }
+ *last_ctx = req->ctx;
+ percpu_ref_get(&(*last_ctx)->refs);
+ }
+ if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist))
+ schedule_delayed_work(&req->ctx->fallback_work, 1);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+{
+ struct llist_node *node = llist_del_all(&tctx->task_list);
struct io_ring_ctx *last_ctx = NULL;
struct io_kiocb *req;
while (node) {
req = container_of(node, struct io_kiocb, io_task_work.node);
node = node->next;
- if (sync && last_ctx != req->ctx) {
- if (last_ctx) {
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
- last_ctx = req->ctx;
- percpu_ref_get(&last_ctx->refs);
- }
- if (llist_add(&req->io_task_work.node,
- &req->ctx->fallback_llist))
- schedule_delayed_work(&req->ctx->fallback_work, 1);
+ __io_fallback_tw(req, sync, &last_ctx);
}
if (last_ctx) {
@@ -1093,13 +1100,6 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
}
}
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
-{
- struct llist_node *node = llist_del_all(&tctx->task_list);
-
- __io_fallback_tw(node, sync);
-}
-
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
unsigned int max_entries,
unsigned int *count)
@@ -1139,65 +1139,45 @@ void tctx_task_work(struct callback_head *cb)
static inline void io_req_local_work_add(struct io_kiocb *req,
struct io_ring_ctx *ctx,
- unsigned flags)
+ unsigned tw_flags)
{
- unsigned nr_wait, nr_tw, nr_tw_prev;
- struct llist_node *head;
+ unsigned nr_tw, nr_tw_prev, nr_wait;
+ unsigned long flags;
/* See comment above IO_CQ_WAKE_INIT */
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
/*
- * We don't know how many reuqests is there in the link and whether
- * they can even be queued lazily, fall back to non-lazy.
+ * We don't know how many requests are in the link and whether they can
+ * even be queued lazily, fall back to non-lazy.
*/
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
- flags &= ~IOU_F_TWQ_LAZY_WAKE;
+ tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
- guard(rcu)();
+ spin_lock_irqsave(&ctx->work_lock, flags);
+ wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+ nr_tw_prev = ctx->work_items++;
+ spin_unlock_irqrestore(&ctx->work_lock, flags);
- head = READ_ONCE(ctx->work_llist.first);
- do {
- nr_tw_prev = 0;
- if (head) {
- struct io_kiocb *first_req = container_of(head,
- struct io_kiocb,
- io_task_work.node);
- /*
- * Might be executed at any moment, rely on
- * SLAB_TYPESAFE_BY_RCU to keep it alive.
- */
- nr_tw_prev = READ_ONCE(first_req->nr_tw);
- }
-
- /*
- * Theoretically, it can overflow, but that's fine as one of
- * previous adds should've tried to wake the task.
- */
- nr_tw = nr_tw_prev + 1;
- if (!(flags & IOU_F_TWQ_LAZY_WAKE))
- nr_tw = IO_CQ_WAKE_FORCE;
-
- req->nr_tw = nr_tw;
- req->io_task_work.node.next = head;
- } while (!try_cmpxchg(&ctx->work_llist.first, &head,
- &req->io_task_work.node));
-
- /*
- * cmpxchg implies a full barrier, which pairs with the barrier
- * in set_current_state() on the io_cqring_wait() side. It's used
- * to ensure that either we see updated ->cq_wait_nr, or waiters
- * going to sleep will observe the work added to the list, which
- * is similar to the wait/wawke task state sync.
- */
+ nr_tw = nr_tw_prev + 1;
+ if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
+ nr_tw = IO_CQ_WAKE_FORCE;
- if (!head) {
+ if (!nr_tw_prev) {
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
if (ctx->has_evfd)
io_eventfd_signal(ctx);
}
+ /*
+ * We need a barrier after unlock, which pairs with the barrier
+ * in set_current_state() on the io_cqring_wait() side. It's used
+ * to ensure that either we see updated ->cq_wait_nr, or waiters
+ * going to sleep will observe the work added to the list, which
+ * is similar to the wait/wake task state sync.
+ */
+ smp_mb();
nr_wait = atomic_read(&ctx->cq_wait_nr);
/* not enough or no one is waiting */
if (nr_tw < nr_wait)
@@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
- struct llist_node *node = llist_del_all(&ctx->work_llist);
+ struct io_ring_ctx *last_ctx = NULL;
+ struct io_wq_work_node *node;
+ unsigned long flags;
- __io_fallback_tw(node, false);
- node = llist_del_all(&ctx->retry_llist);
- __io_fallback_tw(node, false);
+ spin_lock_irqsave(&ctx->work_lock, flags);
+ node = ctx->work_list.first;
+ INIT_WQ_LIST(&ctx->work_list);
+ ctx->work_items = 0;
+ spin_unlock_irqrestore(&ctx->work_lock, flags);
+
+ while (node) {
+ struct io_kiocb *req;
+
+ req = container_of(node, struct io_kiocb, io_task_work.work_node);
+ node = node->next;
+ __io_fallback_tw(req, false, &last_ctx);
+ }
+ if (last_ctx) {
+ flush_delayed_work(&last_ctx->fallback_work);
+ percpu_ref_put(&last_ctx->refs);
+ }
}
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1272,52 +1268,52 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
return false;
}
-static int __io_run_local_work_loop(struct llist_node **node,
- struct io_tw_state *ts,
- int events)
-{
- while (*node) {
- struct llist_node *next = (*node)->next;
- struct io_kiocb *req = container_of(*node, struct io_kiocb,
- io_task_work.node);
- INDIRECT_CALL_2(req->io_task_work.func,
- io_poll_task_func, io_req_rw_complete,
- req, ts);
- *node = next;
- if (--events <= 0)
- break;
- }
-
- return events;
-}
-
static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
int min_events)
{
- struct llist_node *node;
+ struct io_wq_work_node *node, *tail;
+ int ret, limit, nitems;
unsigned int loops = 0;
- int ret, limit;
if (WARN_ON_ONCE(ctx->submitter_task != current))
return -EEXIST;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+ ret = 0;
limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
again:
- ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
- if (ctx->retry_llist.first)
+ spin_lock_irq(&ctx->work_lock);
+ node = ctx->work_list.first;
+ tail = ctx->work_list.last;
+ nitems = ctx->work_items;
+ INIT_WQ_LIST(&ctx->work_list);
+ ctx->work_items = 0;
+ spin_unlock_irq(&ctx->work_lock);
+
+ while (node) {
+ struct io_kiocb *req = container_of(node, struct io_kiocb,
+ io_task_work.work_node);
+ node = node->next;
+ INDIRECT_CALL_2(req->io_task_work.func,
+ io_poll_task_func, io_req_rw_complete,
+ req, ts);
+ nitems--;
+ if (++ret >= limit)
+ break;
+ }
+
+ if (unlikely(node)) {
+ spin_lock_irq(&ctx->work_lock);
+ tail->next = ctx->work_list.first;
+ ctx->work_list.first = node;
+ if (!ctx->work_list.last)
+ ctx->work_list.last = tail;
+ ctx->work_items += nitems;
+ spin_unlock_irq(&ctx->work_lock);
goto retry_done;
+ }
- /*
- * llists are in reverse order, flip it back the right way before
- * running the pending items.
- */
- node = llist_reverse_order(llist_del_all(&ctx->work_llist));
- ret = __io_run_local_work_loop(&node, ts, ret);
- ctx->retry_llist.first = node;
loops++;
-
- ret = limit - ret;
if (io_run_local_work_continue(ctx, ret, min_events))
goto again;
retry_done:
@@ -2413,7 +2409,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
atomic_set(&ctx->cq_wait_nr, 1);
smp_mb();
- if (!llist_empty(&ctx->work_llist))
+ if (io_local_work_pending(ctx))
goto out_wake;
}
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 214f9f175102..2fae27803116 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -349,7 +349,7 @@ static inline int io_run_task_work(void)
static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
{
- return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
+ return READ_ONCE(ctx->work_list.first);
}
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
--
2.45.2
Thread overview: 12+ messages
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
2024-11-22 16:12 ` [PATCH 1/6] io_uring: make task_work pending check dependent on ring type Jens Axboe
2024-11-22 16:12 ` Jens Axboe [this message]
2024-11-22 17:07 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Pavel Begunkov
2024-11-22 17:11 ` Jens Axboe
2024-11-22 17:25 ` Pavel Begunkov
2024-11-22 17:44 ` Jens Axboe
2024-11-23 0:36 ` Pavel Begunkov
2024-11-22 16:12 ` [PATCH 3/6] io_uring/slist: add list-to-list list splice helper Jens Axboe
2024-11-22 16:12 ` [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list Jens Axboe
2024-11-22 16:12 ` [PATCH 5/6] io_uring: add __tctx_task_work_run() helper Jens Axboe
2024-11-22 16:12 ` [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list Jens Axboe