From 3690e69ed6c7a4115acc56735dce4434b1105cc3 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 21 Nov 2024 09:18:19 -0700 Subject: [PATCH 2/2] io_uring: replace defer task_work llist with io_wq_work_list Add a spinlock for the list, and replace the lockless llist with the work list instead. This avoids needing to reverse items in the list before running them, as the io_wq_work_list is FIFO by nature whereas the llist is LIFO. Signed-off-by: Jens Axboe --- include/linux/io_uring_types.h | 12 +- io_uring/io_uring.c | 196 ++++++++++++++++----------------- io_uring/io_uring.h | 2 +- 3 files changed, 101 insertions(+), 109 deletions(-) diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 011860ade268..03a102ca8105 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -335,8 +335,9 @@ struct io_ring_ctx { * regularly bounce b/w CPUs. */ struct { - struct llist_head work_llist; - struct llist_head retry_llist; + struct io_wq_work_list work_list; + spinlock_t work_lock; + int work_items; unsigned long check_cq; atomic_t cq_wait_nr; atomic_t cq_timeouts; @@ -566,7 +567,10 @@ enum { typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts); struct io_task_work { - struct llist_node node; + union { + struct io_wq_work_node work_node; + struct llist_node node; + }; io_req_tw_func_t func; }; @@ -622,8 +626,6 @@ struct io_kiocb { */ u16 buf_index; - unsigned nr_tw; - /* REQ_F_* flags */ io_req_flags_t flags; diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index c3a7d0197636..07e63e532797 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -338,9 +338,10 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_LIST_HEAD(&ctx->io_buffers_comp); INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); - INIT_LIST_HEAD(&ctx->ltimeout_list); - init_llist_head(&ctx->work_llist); INIT_LIST_HEAD(&ctx->tctx_list); + 
INIT_LIST_HEAD(&ctx->ltimeout_list); + INIT_WQ_LIST(&ctx->work_list); + spin_lock_init(&ctx->work_lock); ctx->submit_state.free_list.next = NULL; INIT_HLIST_HEAD(&ctx->waitid_list); #ifdef CONFIG_FUTEX @@ -1066,25 +1067,32 @@ struct llist_node *io_handle_tw_list(struct llist_node *node, return node; } -static __cold void __io_fallback_tw(struct llist_node *node, bool sync) +static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync, + struct io_ring_ctx **last_ctx) +{ + if (sync && *last_ctx != req->ctx) { + if (*last_ctx) { + flush_delayed_work(&(*last_ctx)->fallback_work); + percpu_ref_put(&(*last_ctx)->refs); + } + *last_ctx = req->ctx; + percpu_ref_get(&(*last_ctx)->refs); + } + if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist)) + schedule_delayed_work(&req->ctx->fallback_work, 1); + +} + +static void io_fallback_tw(struct io_uring_task *tctx, bool sync) { + struct llist_node *node = llist_del_all(&tctx->task_list); struct io_ring_ctx *last_ctx = NULL; struct io_kiocb *req; while (node) { req = container_of(node, struct io_kiocb, io_task_work.node); node = node->next; - if (sync && last_ctx != req->ctx) { - if (last_ctx) { - flush_delayed_work(&last_ctx->fallback_work); - percpu_ref_put(&last_ctx->refs); - } - last_ctx = req->ctx; - percpu_ref_get(&last_ctx->refs); - } - if (llist_add(&req->io_task_work.node, - &req->ctx->fallback_llist)) - schedule_delayed_work(&req->ctx->fallback_work, 1); + __io_fallback_tw(req, sync, &last_ctx); } if (last_ctx) { @@ -1093,13 +1101,6 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync) } } -static void io_fallback_tw(struct io_uring_task *tctx, bool sync) -{ - struct llist_node *node = llist_del_all(&tctx->task_list); - - __io_fallback_tw(node, sync); -} - struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count) @@ -1139,73 +1140,45 @@ void tctx_task_work(struct callback_head *cb) static inline void 
io_req_local_work_add(struct io_kiocb *req, struct io_ring_ctx *ctx, - unsigned flags) + unsigned tw_flags) { - unsigned nr_wait, nr_tw, nr_tw_prev; - struct llist_node *head; + unsigned long flags; + unsigned nr_tw; /* See comment above IO_CQ_WAKE_INIT */ BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); /* - * We don't know how many reuqests is there in the link and whether - * they can even be queued lazily, fall back to non-lazy. + * We don't know how many requests are in the link and whether they can + * even be queued lazily, fall back to non-lazy. */ if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) - flags &= ~IOU_F_TWQ_LAZY_WAKE; + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE; - guard(rcu)(); - - head = READ_ONCE(ctx->work_llist.first); - do { - nr_tw_prev = 0; - if (head) { - struct io_kiocb *first_req = container_of(head, - struct io_kiocb, - io_task_work.node); - /* - * Might be executed at any moment, rely on - * SLAB_TYPESAFE_BY_RCU to keep it alive. - */ - nr_tw_prev = READ_ONCE(first_req->nr_tw); - } - - /* - * Theoretically, it can overflow, but that's fine as one of - * previous adds should've tried to wake the task. - */ - nr_tw = nr_tw_prev + 1; - if (!(flags & IOU_F_TWQ_LAZY_WAKE)) - nr_tw = IO_CQ_WAKE_FORCE; - - req->nr_tw = nr_tw; - req->io_task_work.node.next = head; - } while (!try_cmpxchg(&ctx->work_llist.first, &head, - &req->io_task_work.node)); + spin_lock_irqsave(&ctx->work_lock, flags); + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list); + nr_tw = ++ctx->work_items; + if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE)) + nr_tw = IO_CQ_WAKE_FORCE; + spin_unlock_irqrestore(&ctx->work_lock, flags); /* - * cmpxchg implies a full barrier, which pairs with the barrier + * We need a barrier after unlock, which pairs with the barrier * in set_current_state() on the io_cqring_wait() side. 
It's used * to ensure that either we see updated ->cq_wait_nr, or waiters * going to sleep will observe the work added to the list, which * is similar to the wait/wawke task state sync. */ - - if (!head) { + smp_mb(); + if (nr_tw == 1) { if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); if (ctx->has_evfd) io_eventfd_signal(ctx); } - nr_wait = atomic_read(&ctx->cq_wait_nr); - /* not enough or no one is waiting */ - if (nr_tw < nr_wait) - return; - /* the previous add has already woken it up */ - if (nr_tw_prev >= nr_wait) - return; - wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); + if (nr_tw >= atomic_read(&ctx->cq_wait_nr)) + wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); } static void io_req_normal_work_add(struct io_kiocb *req) @@ -1253,11 +1226,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx, static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) { - struct llist_node *node = llist_del_all(&ctx->work_llist); + struct io_ring_ctx *last_ctx = NULL; + struct io_wq_work_node *node; + unsigned long flags; + + spin_lock_irqsave(&ctx->work_lock, flags); + node = ctx->work_list.first; + INIT_WQ_LIST(&ctx->work_list); + ctx->work_items = 0; + spin_unlock_irqrestore(&ctx->work_lock, flags); + + while (node) { + struct io_kiocb *req; - __io_fallback_tw(node, false); - node = llist_del_all(&ctx->retry_llist); - __io_fallback_tw(node, false); + req = container_of(node, struct io_kiocb, io_task_work.work_node); + node = node->next; + __io_fallback_tw(req, false, &last_ctx); + } + if (last_ctx) { + flush_delayed_work(&last_ctx->fallback_work); + percpu_ref_put(&last_ctx->refs); + } } static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, @@ -1272,51 +1261,52 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, return false; } -static int __io_run_local_work_loop(struct llist_node **node, - struct io_tw_state *ts, 
- int events) -{ - while (*node) { - struct llist_node *next = (*node)->next; - struct io_kiocb *req = container_of(*node, struct io_kiocb, - io_task_work.node); - INDIRECT_CALL_2(req->io_task_work.func, - io_poll_task_func, io_req_rw_complete, - req, ts); - *node = next; - if (--events <= 0) - break; - } - - return events; -} - static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts, int min_events) { - struct llist_node *node; - unsigned int loops = 0; - int ret, limit; + struct io_wq_work_node *node, *tail; + unsigned int loops = 1; + int ret = 0, limit, nitems; + unsigned long flags; if (WARN_ON_ONCE(ctx->submitter_task != current)) return -EEXIST; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events); + again: - ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit); - if (ctx->retry_llist.first) + spin_lock_irqsave(&ctx->work_lock, flags); + node = ctx->work_list.first; + tail = ctx->work_list.last; + nitems = ctx->work_items; + INIT_WQ_LIST(&ctx->work_list); + ctx->work_items = 0; + spin_unlock_irqrestore(&ctx->work_lock, flags); + + while (node) { + struct io_kiocb *req = container_of(node, struct io_kiocb, + io_task_work.work_node); + node = node->next; + INDIRECT_CALL_2(req->io_task_work.func, + io_poll_task_func, io_req_rw_complete, + req, ts); + if (++ret >= limit) + break; + } + + if (unlikely(node)) { + spin_lock_irqsave(&ctx->work_lock, flags); + tail->next = ctx->work_list.first; + ctx->work_list.first = node; + if (!ctx->work_list.last) + ctx->work_list.last = tail; + ctx->work_items += nitems - ret; + spin_unlock_irqrestore(&ctx->work_lock, flags); goto retry_done; + } - /* - * llists are in reverse order, flip it back the right way before - * running the pending items. 
- */ - node = llist_reverse_order(llist_del_all(&ctx->work_llist)); - ret = __io_run_local_work_loop(&node, ts, ret); - ctx->retry_llist.first = node; loops++; - ret = limit - ret; if (io_run_local_work_continue(ctx, ret, min_events)) goto again; @@ -2413,7 +2403,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer) if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { atomic_set(&ctx->cq_wait_nr, 1); smp_mb(); - if (!llist_empty(&ctx->work_llist)) + if (io_local_work_pending(ctx)) goto out_wake; } diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 214f9f175102..2fae27803116 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -349,7 +349,7 @@ static inline int io_run_task_work(void) static inline bool io_local_work_pending(struct io_ring_ctx *ctx) { - return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist); + return READ_ONCE(ctx->work_list.first); } static inline bool io_task_work_pending(struct io_ring_ctx *ctx) -- 2.45.2