* [PATCH 1/6] io_uring: make task_work pending check dependent on ring type
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
2024-11-22 16:12 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Jens Axboe
` (4 subsequent siblings)
5 siblings, 0 replies; 12+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe
There's no need to check for generic task_work for DEFER_TASKRUN, if we
have local task_work pending. This avoids dipping into the huge
task_struct, if we have normal task_work pending.
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/io_uring.h | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 12abee607e4a..214f9f175102 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -354,7 +354,9 @@ static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
{
- return task_work_pending(current) || io_local_work_pending(ctx);
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN && io_local_work_pending(ctx))
+ return true;
+ return task_work_pending(current);
}
static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts)
--
2.45.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
2024-11-22 16:12 ` [PATCH 1/6] io_uring: make task_work pending check dependent on ring type Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
2024-11-22 17:07 ` Pavel Begunkov
2024-11-22 16:12 ` [PATCH 3/6] io_uring/slist: add list-to-list list splice helper Jens Axboe
` (3 subsequent siblings)
5 siblings, 1 reply; 12+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe
Add a spinlock for the list, and replace the lockless llist with the
work list instead. This avoids needing to reverse items in the list
before running them, as the io_wq_work_list is FIFO by nature whereas
the llist is LIFO.
Signed-off-by: Jens Axboe <[email protected]>
---
include/linux/io_uring_types.h | 13 ++-
io_uring/io_uring.c | 194 ++++++++++++++++-----------------
io_uring/io_uring.h | 2 +-
3 files changed, 104 insertions(+), 105 deletions(-)
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 011860ade268..e9ba99cb0ed0 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -335,8 +335,9 @@ struct io_ring_ctx {
* regularly bounce b/w CPUs.
*/
struct {
- struct llist_head work_llist;
- struct llist_head retry_llist;
+ struct io_wq_work_list work_list;
+ spinlock_t work_lock;
+ int work_items;
unsigned long check_cq;
atomic_t cq_wait_nr;
atomic_t cq_timeouts;
@@ -566,7 +567,11 @@ enum {
typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
struct io_task_work {
- struct llist_node node;
+ /* DEFER_TASKRUN uses work_node, regular task_work node */
+ union {
+ struct io_wq_work_node work_node;
+ struct llist_node node;
+ };
io_req_tw_func_t func;
};
@@ -622,8 +627,6 @@ struct io_kiocb {
*/
u16 buf_index;
- unsigned nr_tw;
-
/* REQ_F_* flags */
io_req_flags_t flags;
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index c3a7d0197636..b7eb962e9872 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -339,7 +339,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
INIT_LIST_HEAD(&ctx->ltimeout_list);
- init_llist_head(&ctx->work_llist);
+ INIT_WQ_LIST(&ctx->work_list);
+ spin_lock_init(&ctx->work_lock);
INIT_LIST_HEAD(&ctx->tctx_list);
ctx->submit_state.free_list.next = NULL;
INIT_HLIST_HEAD(&ctx->waitid_list);
@@ -1066,25 +1067,31 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
return node;
}
-static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
+static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync,
+ struct io_ring_ctx **last_ctx)
{
+ if (sync && *last_ctx != req->ctx) {
+ if (*last_ctx) {
+ flush_delayed_work(&(*last_ctx)->fallback_work);
+ percpu_ref_put(&(*last_ctx)->refs);
+ }
+ *last_ctx = req->ctx;
+ percpu_ref_get(&(*last_ctx)->refs);
+ }
+ if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist))
+ schedule_delayed_work(&req->ctx->fallback_work, 1);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+{
+ struct llist_node *node = llist_del_all(&tctx->task_list);
struct io_ring_ctx *last_ctx = NULL;
struct io_kiocb *req;
while (node) {
req = container_of(node, struct io_kiocb, io_task_work.node);
node = node->next;
- if (sync && last_ctx != req->ctx) {
- if (last_ctx) {
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
- last_ctx = req->ctx;
- percpu_ref_get(&last_ctx->refs);
- }
- if (llist_add(&req->io_task_work.node,
- &req->ctx->fallback_llist))
- schedule_delayed_work(&req->ctx->fallback_work, 1);
+ __io_fallback_tw(req, sync, &last_ctx);
}
if (last_ctx) {
@@ -1093,13 +1100,6 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
}
}
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
-{
- struct llist_node *node = llist_del_all(&tctx->task_list);
-
- __io_fallback_tw(node, sync);
-}
-
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
unsigned int max_entries,
unsigned int *count)
@@ -1139,65 +1139,45 @@ void tctx_task_work(struct callback_head *cb)
static inline void io_req_local_work_add(struct io_kiocb *req,
struct io_ring_ctx *ctx,
- unsigned flags)
+ unsigned tw_flags)
{
- unsigned nr_wait, nr_tw, nr_tw_prev;
- struct llist_node *head;
+ unsigned nr_tw, nr_tw_prev, nr_wait;
+ unsigned long flags;
/* See comment above IO_CQ_WAKE_INIT */
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
/*
- * We don't know how many reuqests is there in the link and whether
- * they can even be queued lazily, fall back to non-lazy.
+ * We don't know how many requests are in the link and whether they can
+ * even be queued lazily, fall back to non-lazy.
*/
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
- flags &= ~IOU_F_TWQ_LAZY_WAKE;
+ tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
- guard(rcu)();
+ spin_lock_irqsave(&ctx->work_lock, flags);
+ wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+ nr_tw_prev = ctx->work_items++;
+ spin_unlock_irqrestore(&ctx->work_lock, flags);
- head = READ_ONCE(ctx->work_llist.first);
- do {
- nr_tw_prev = 0;
- if (head) {
- struct io_kiocb *first_req = container_of(head,
- struct io_kiocb,
- io_task_work.node);
- /*
- * Might be executed at any moment, rely on
- * SLAB_TYPESAFE_BY_RCU to keep it alive.
- */
- nr_tw_prev = READ_ONCE(first_req->nr_tw);
- }
-
- /*
- * Theoretically, it can overflow, but that's fine as one of
- * previous adds should've tried to wake the task.
- */
- nr_tw = nr_tw_prev + 1;
- if (!(flags & IOU_F_TWQ_LAZY_WAKE))
- nr_tw = IO_CQ_WAKE_FORCE;
-
- req->nr_tw = nr_tw;
- req->io_task_work.node.next = head;
- } while (!try_cmpxchg(&ctx->work_llist.first, &head,
- &req->io_task_work.node));
-
- /*
- * cmpxchg implies a full barrier, which pairs with the barrier
- * in set_current_state() on the io_cqring_wait() side. It's used
- * to ensure that either we see updated ->cq_wait_nr, or waiters
- * going to sleep will observe the work added to the list, which
- * is similar to the wait/wawke task state sync.
- */
+ nr_tw = nr_tw_prev + 1;
+ if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
+ nr_tw = IO_CQ_WAKE_FORCE;
- if (!head) {
+ if (!nr_tw_prev) {
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
if (ctx->has_evfd)
io_eventfd_signal(ctx);
}
+ /*
+ * We need a barrier after unlock, which pairs with the barrier
+ * in set_current_state() on the io_cqring_wait() side. It's used
+ * to ensure that either we see updated ->cq_wait_nr, or waiters
+ * going to sleep will observe the work added to the list, which
+ * is similar to the wait/wake task state sync.
+ */
+ smp_mb();
nr_wait = atomic_read(&ctx->cq_wait_nr);
/* not enough or no one is waiting */
if (nr_tw < nr_wait)
@@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
- struct llist_node *node = llist_del_all(&ctx->work_llist);
+ struct io_ring_ctx *last_ctx = NULL;
+ struct io_wq_work_node *node;
+ unsigned long flags;
- __io_fallback_tw(node, false);
- node = llist_del_all(&ctx->retry_llist);
- __io_fallback_tw(node, false);
+ spin_lock_irqsave(&ctx->work_lock, flags);
+ node = ctx->work_list.first;
+ INIT_WQ_LIST(&ctx->work_list);
+ ctx->work_items = 0;
+ spin_unlock_irqrestore(&ctx->work_lock, flags);
+
+ while (node) {
+ struct io_kiocb *req;
+
+ req = container_of(node, struct io_kiocb, io_task_work.work_node);
+ node = node->next;
+ __io_fallback_tw(req, false, &last_ctx);
+ }
+ if (last_ctx) {
+ flush_delayed_work(&last_ctx->fallback_work);
+ percpu_ref_put(&last_ctx->refs);
+ }
}
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1272,52 +1268,52 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
return false;
}
-static int __io_run_local_work_loop(struct llist_node **node,
- struct io_tw_state *ts,
- int events)
-{
- while (*node) {
- struct llist_node *next = (*node)->next;
- struct io_kiocb *req = container_of(*node, struct io_kiocb,
- io_task_work.node);
- INDIRECT_CALL_2(req->io_task_work.func,
- io_poll_task_func, io_req_rw_complete,
- req, ts);
- *node = next;
- if (--events <= 0)
- break;
- }
-
- return events;
-}
-
static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
int min_events)
{
- struct llist_node *node;
+ struct io_wq_work_node *node, *tail;
+ int ret, limit, nitems;
unsigned int loops = 0;
- int ret, limit;
if (WARN_ON_ONCE(ctx->submitter_task != current))
return -EEXIST;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+ ret = 0;
limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
again:
- ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
- if (ctx->retry_llist.first)
+ spin_lock_irq(&ctx->work_lock);
+ node = ctx->work_list.first;
+ tail = ctx->work_list.last;
+ nitems = ctx->work_items;
+ INIT_WQ_LIST(&ctx->work_list);
+ ctx->work_items = 0;
+ spin_unlock_irq(&ctx->work_lock);
+
+ while (node) {
+ struct io_kiocb *req = container_of(node, struct io_kiocb,
+ io_task_work.work_node);
+ node = node->next;
+ INDIRECT_CALL_2(req->io_task_work.func,
+ io_poll_task_func, io_req_rw_complete,
+ req, ts);
+ nitems--;
+ if (++ret >= limit)
+ break;
+ }
+
+ if (unlikely(node)) {
+ spin_lock_irq(&ctx->work_lock);
+ tail->next = ctx->work_list.first;
+ ctx->work_list.first = node;
+ if (!ctx->work_list.last)
+ ctx->work_list.last = tail;
+ ctx->work_items += nitems;
+ spin_unlock_irq(&ctx->work_lock);
goto retry_done;
+ }
- /*
- * llists are in reverse order, flip it back the right way before
- * running the pending items.
- */
- node = llist_reverse_order(llist_del_all(&ctx->work_llist));
- ret = __io_run_local_work_loop(&node, ts, ret);
- ctx->retry_llist.first = node;
loops++;
-
- ret = limit - ret;
if (io_run_local_work_continue(ctx, ret, min_events))
goto again;
retry_done:
@@ -2413,7 +2409,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
atomic_set(&ctx->cq_wait_nr, 1);
smp_mb();
- if (!llist_empty(&ctx->work_llist))
+ if (io_local_work_pending(ctx))
goto out_wake;
}
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 214f9f175102..2fae27803116 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -349,7 +349,7 @@ static inline int io_run_task_work(void)
static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
{
- return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
+ return READ_ONCE(ctx->work_list.first);
}
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
--
2.45.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
2024-11-22 16:12 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Jens Axboe
@ 2024-11-22 17:07 ` Pavel Begunkov
2024-11-22 17:11 ` Jens Axboe
0 siblings, 1 reply; 12+ messages in thread
From: Pavel Begunkov @ 2024-11-22 17:07 UTC (permalink / raw)
To: Jens Axboe, io-uring
On 11/22/24 16:12, Jens Axboe wrote:
...
> static inline void io_req_local_work_add(struct io_kiocb *req,
> struct io_ring_ctx *ctx,
> - unsigned flags)
> + unsigned tw_flags)
> {
> - unsigned nr_wait, nr_tw, nr_tw_prev;
> - struct llist_node *head;
> + unsigned nr_tw, nr_tw_prev, nr_wait;
> + unsigned long flags;
>
> /* See comment above IO_CQ_WAKE_INIT */
> BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>
> /*
> - * We don't know how many reuqests is there in the link and whether
> - * they can even be queued lazily, fall back to non-lazy.
> + * We don't know how many requests are in the link and whether they can
> + * even be queued lazily, fall back to non-lazy.
> */
> if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
> - flags &= ~IOU_F_TWQ_LAZY_WAKE;
> + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>
> - guard(rcu)();
protects against ctx->task deallocation, see a comment in
io_ring_exit_work() -> synchronize_rcu()
> + spin_lock_irqsave(&ctx->work_lock, flags);
> + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
> + nr_tw_prev = ctx->work_items++;
Is there a good reason why it changes the semantics of
what's stored across adds? It was assigning a corrected
nr_tw, this one will start heavily spamming with wake_up()
in some cases.
> + spin_unlock_irqrestore(&ctx->work_lock, flags);
>
> - head = READ_ONCE(ctx->work_llist.first);
> - do {
> - nr_tw_prev = 0;
> - if (head) {
> - struct io_kiocb *first_req = container_of(head,
> - struct io_kiocb,
> - io_task_work.node);
> - /*
> - * Might be executed at any moment, rely on
> - * SLAB_TYPESAFE_BY_RCU to keep it alive.
> - */
> - nr_tw_prev = READ_ONCE(first_req->nr_tw);
> - }
> -
> - /*
> - * Theoretically, it can overflow, but that's fine as one of
> - * previous adds should've tried to wake the task.
> - */
> - nr_tw = nr_tw_prev + 1;
> - if (!(flags & IOU_F_TWQ_LAZY_WAKE))
> - nr_tw = IO_CQ_WAKE_FORCE;
> -
> - req->nr_tw = nr_tw;
> - req->io_task_work.node.next = head;
> - } while (!try_cmpxchg(&ctx->work_llist.first, &head,
> - &req->io_task_work.node));
> -
> - /*
> - * cmpxchg implies a full barrier, which pairs with the barrier
> - * in set_current_state() on the io_cqring_wait() side. It's used
> - * to ensure that either we see updated ->cq_wait_nr, or waiters
> - * going to sleep will observe the work added to the list, which
> - * is similar to the wait/wawke task state sync.
> - */
> + nr_tw = nr_tw_prev + 1;
> + if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
> + nr_tw = IO_CQ_WAKE_FORCE;
>
> - if (!head) {
> + if (!nr_tw_prev) {
> if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
> atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
> if (ctx->has_evfd)
> io_eventfd_signal(ctx);
> }
>
> + /*
> + * We need a barrier after unlock, which pairs with the barrier
> + * in set_current_state() on the io_cqring_wait() side. It's used
> + * to ensure that either we see updated ->cq_wait_nr, or waiters
> + * going to sleep will observe the work added to the list, which
> + * is similar to the wait/wake task state sync.
> + */
> + smp_mb();
> nr_wait = atomic_read(&ctx->cq_wait_nr);
> /* not enough or no one is waiting */
> if (nr_tw < nr_wait)
> @@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
2024-11-22 17:07 ` Pavel Begunkov
@ 2024-11-22 17:11 ` Jens Axboe
2024-11-22 17:25 ` Pavel Begunkov
0 siblings, 1 reply; 12+ messages in thread
From: Jens Axboe @ 2024-11-22 17:11 UTC (permalink / raw)
To: Pavel Begunkov, io-uring
On 11/22/24 10:07 AM, Pavel Begunkov wrote:
> On 11/22/24 16:12, Jens Axboe wrote:
> ...
>> static inline void io_req_local_work_add(struct io_kiocb *req,
>> struct io_ring_ctx *ctx,
>> - unsigned flags)
>> + unsigned tw_flags)
>> {
>> - unsigned nr_wait, nr_tw, nr_tw_prev;
>> - struct llist_node *head;
>> + unsigned nr_tw, nr_tw_prev, nr_wait;
>> + unsigned long flags;
>> /* See comment above IO_CQ_WAKE_INIT */
>> BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>> /*
>> - * We don't know how many reuqests is there in the link and whether
>> - * they can even be queued lazily, fall back to non-lazy.
>> + * We don't know how many requests are in the link and whether they can
>> + * even be queued lazily, fall back to non-lazy.
>> */
>> if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>> - flags &= ~IOU_F_TWQ_LAZY_WAKE;
>> + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>> - guard(rcu)();
>
> protects against ctx->task deallocation, see a comment in
> io_ring_exit_work() -> synchronize_rcu()
Yeah that's just an editing mistake.
>> + spin_lock_irqsave(&ctx->work_lock, flags);
>> + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>> + nr_tw_prev = ctx->work_items++;
>
> Is there a good reason why it changes the semantics of
> what's stored across adds? It was assigning a corrected
> nr_tw, this one will start heavily spamming with wake_up()
> in some cases.
Not sure I follow, how so? nr_tw_prev will be the previous count, just
like before. Except we won't need to dig into the list to find it, we
have it readily available. nr_tw will be the current code, or force wake
if needed. As before.
--
Jens Axboe
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
2024-11-22 17:11 ` Jens Axboe
@ 2024-11-22 17:25 ` Pavel Begunkov
2024-11-22 17:44 ` Jens Axboe
0 siblings, 1 reply; 12+ messages in thread
From: Pavel Begunkov @ 2024-11-22 17:25 UTC (permalink / raw)
To: Jens Axboe, io-uring
On 11/22/24 17:11, Jens Axboe wrote:
> On 11/22/24 10:07 AM, Pavel Begunkov wrote:
>> On 11/22/24 16:12, Jens Axboe wrote:
>> ...
>>> static inline void io_req_local_work_add(struct io_kiocb *req,
>>> struct io_ring_ctx *ctx,
>>> - unsigned flags)
>>> + unsigned tw_flags)
>>> {
>>> - unsigned nr_wait, nr_tw, nr_tw_prev;
>>> - struct llist_node *head;
>>> + unsigned nr_tw, nr_tw_prev, nr_wait;
>>> + unsigned long flags;
>>> /* See comment above IO_CQ_WAKE_INIT */
>>> BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>> /*
>>> - * We don't know how many reuqests is there in the link and whether
>>> - * they can even be queued lazily, fall back to non-lazy.
>>> + * We don't know how many requests are in the link and whether they can
>>> + * even be queued lazily, fall back to non-lazy.
>>> */
>>> if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>>> - flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>> + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>> - guard(rcu)();
>>
>> protects against ctx->task deallocation, see a comment in
>> io_ring_exit_work() -> synchronize_rcu()
>
> Yeah that's just an editing mistake.
>
>>> + spin_lock_irqsave(&ctx->work_lock, flags);
>>> + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>>> + nr_tw_prev = ctx->work_items++;
>>
>> Is there a good reason why it changes the semantics of
>> what's stored across adds? It was assigning a corrected
>> nr_tw, this one will start heavily spamming with wake_up()
>> in some cases.
>
> Not sure I follow, how so? nr_tw_prev will be the previous count, just
> like before. Except we won't need to dig into the list to find it, we
> have it readily available. nr_tw will be the current code, or force wake
> if needed. As before.
The problem is what it stores, not how and where. Before req->nr_tw
could've been set to IO_CQ_WAKE_FORCE, in which case following
requests are not going to attempt waking up the task, now work_items
is just a counter.
Let's say you've got a bunch of non-lazy adds coming close to each
other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and
others just queue themselves in the list. Now, every single one
of them will try to wake_up() as long as ->cq_wait_nr is large
enough.
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
2024-11-22 17:25 ` Pavel Begunkov
@ 2024-11-22 17:44 ` Jens Axboe
2024-11-23 0:36 ` Pavel Begunkov
0 siblings, 1 reply; 12+ messages in thread
From: Jens Axboe @ 2024-11-22 17:44 UTC (permalink / raw)
To: Pavel Begunkov, io-uring
On 11/22/24 10:25 AM, Pavel Begunkov wrote:
> On 11/22/24 17:11, Jens Axboe wrote:
>> On 11/22/24 10:07 AM, Pavel Begunkov wrote:
>>> On 11/22/24 16:12, Jens Axboe wrote:
>>> ...
>>>> static inline void io_req_local_work_add(struct io_kiocb *req,
>>>> struct io_ring_ctx *ctx,
>>>> - unsigned flags)
>>>> + unsigned tw_flags)
>>>> {
>>>> - unsigned nr_wait, nr_tw, nr_tw_prev;
>>>> - struct llist_node *head;
>>>> + unsigned nr_tw, nr_tw_prev, nr_wait;
>>>> + unsigned long flags;
>>>> /* See comment above IO_CQ_WAKE_INIT */
>>>> BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>>> /*
>>>> - * We don't know how many reuqests is there in the link and whether
>>>> - * they can even be queued lazily, fall back to non-lazy.
>>>> + * We don't know how many requests are in the link and whether they can
>>>> + * even be queued lazily, fall back to non-lazy.
>>>> */
>>>> if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>>>> - flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>> + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>> - guard(rcu)();
>>>
>>> protects against ctx->task deallocation, see a comment in
>>> io_ring_exit_work() -> synchronize_rcu()
>>
>> Yeah that's just an editing mistake.
>>
>>>> + spin_lock_irqsave(&ctx->work_lock, flags);
>>>> + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>>>> + nr_tw_prev = ctx->work_items++;
>>>
>>> Is there a good reason why it changes the semantics of
>>> what's stored across adds? It was assigning a corrected
>>> nr_tw, this one will start heavily spamming with wake_up()
>>> in some cases.
>>
>> Not sure I follow, how so? nr_tw_prev will be the previous count, just
>> like before. Except we won't need to dig into the list to find it, we
>> have it readily available. nr_tw will be the current code, or force wake
>> if needed. As before.
>
> The problem is what it stores, not how and where. Before req->nr_tw
> could've been set to IO_CQ_WAKE_FORCE, in which case following
> requests are not going to attempt waking up the task, now work_items
> is just a counter.
>
> Let's say you've got a bunch of non-lazy adds coming close to each
> other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and
> others just queue themselves in the list. Now, every single one
> of them will try to wake_up() as long as ->cq_wait_nr is large
> enough.
If we really care about the non-lazy path as much, we can just use the
same storing scheme as we did in req->nr_tw, except in ->work_items
instead. Not a big deal imho.
--
Jens Axboe
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list
2024-11-22 17:44 ` Jens Axboe
@ 2024-11-23 0:36 ` Pavel Begunkov
0 siblings, 0 replies; 12+ messages in thread
From: Pavel Begunkov @ 2024-11-23 0:36 UTC (permalink / raw)
To: Jens Axboe, io-uring
On 11/22/24 17:44, Jens Axboe wrote:
> On 11/22/24 10:25 AM, Pavel Begunkov wrote:
>> On 11/22/24 17:11, Jens Axboe wrote:
>>> On 11/22/24 10:07 AM, Pavel Begunkov wrote:
>>>> On 11/22/24 16:12, Jens Axboe wrote:
>>>> ...
>>>>> static inline void io_req_local_work_add(struct io_kiocb *req,
>>>>> struct io_ring_ctx *ctx,
>>>>> - unsigned flags)
>>>>> + unsigned tw_flags)
>>>>> {
>>>>> - unsigned nr_wait, nr_tw, nr_tw_prev;
>>>>> - struct llist_node *head;
>>>>> + unsigned nr_tw, nr_tw_prev, nr_wait;
>>>>> + unsigned long flags;
>>>>> /* See comment above IO_CQ_WAKE_INIT */
>>>>> BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
>>>>> /*
>>>>> - * We don't know how many reuqests is there in the link and whether
>>>>> - * they can even be queued lazily, fall back to non-lazy.
>>>>> + * We don't know how many requests are in the link and whether they can
>>>>> + * even be queued lazily, fall back to non-lazy.
>>>>> */
>>>>> if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
>>>>> - flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>>> + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
>>>>> - guard(rcu)();
>>>>
>>>> protects against ctx->task deallocation, see a comment in
>>>> io_ring_exit_work() -> synchronize_rcu()
>>>
>>> Yeah that's just an editing mistake.
>>>
>>>>> + spin_lock_irqsave(&ctx->work_lock, flags);
>>>>> + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
>>>>> + nr_tw_prev = ctx->work_items++;
>>>>
>>>> Is there a good reason why it changes the semantics of
>>>> what's stored across adds? It was assigning a corrected
>>>> nr_tw, this one will start heavily spamming with wake_up()
>>>> in some cases.
>>>
>>> Not sure I follow, how so? nr_tw_prev will be the previous count, just
>>> like before. Except we won't need to dig into the list to find it, we
>>> have it readily available. nr_tw will be the current code, or force wake
>>> if needed. As before.
>>
>> The problem is what it stores, not how and where. Before req->nr_tw
>> could've been set to IO_CQ_WAKE_FORCE, in which case following
>> requests are not going to attempt waking up the task, now work_items
>> is just a counter.
>>
>> Let's say you've got a bunch of non-lazy adds coming close to each
>> other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and
>> others just queue themselves in the list. Now, every single one
>> of them will try to wake_up() as long as ->cq_wait_nr is large
>> enough.
>
> If we really care about the non-lazy path as much, we can just use the
Well, it's all linked requests, some of sendzc notif until
I optimise it, maybe something else?
> same storing scheme as we did in req->nr_tw, except in ->work_items
> instead. Not a big deal imho.
Yes please. It wouldn't be great sneaking them in the same
commit either way.
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 12+ messages in thread
* [PATCH 3/6] io_uring/slist: add list-to-list list splice helper
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
2024-11-22 16:12 ` [PATCH 1/6] io_uring: make task_work pending check dependent on ring type Jens Axboe
2024-11-22 16:12 ` [PATCH 2/6] io_uring: replace defer task_work llist with io_wq_work_list Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
2024-11-22 16:12 ` [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list Jens Axboe
` (2 subsequent siblings)
5 siblings, 0 replies; 12+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe
Add a helper to splice a source list to a destination list.
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/slist.h | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
diff --git a/io_uring/slist.h b/io_uring/slist.h
index 0eb194817242..7ac7c136b702 100644
--- a/io_uring/slist.h
+++ b/io_uring/slist.h
@@ -85,6 +85,22 @@ static inline bool wq_list_splice(struct io_wq_work_list *list,
return false;
}
+static inline bool wq_list_splice_list(struct io_wq_work_list *src,
+ struct io_wq_work_list *dst)
+{
+ bool ret = false;
+
+ if (wq_list_empty(dst)) {
+ *dst = *src;
+ } else {
+ dst->last->next = src->first;
+ dst->last = src->last;
+ ret = true;
+ }
+ INIT_WQ_LIST(src);
+ return false;
+}
+
static inline void wq_stack_add_head(struct io_wq_work_node *node,
struct io_wq_work_node *stack)
{
--
2.45.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
` (2 preceding siblings ...)
2024-11-22 16:12 ` [PATCH 3/6] io_uring/slist: add list-to-list list splice helper Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
2024-11-22 16:12 ` [PATCH 5/6] io_uring: add __tctx_task_work_run() helper Jens Axboe
2024-11-22 16:12 ` [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list Jens Axboe
5 siblings, 0 replies; 12+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe
Switch the normal task_work to io_wq_work_list as well, to both unify
with defer task_work, but also to avoid needing to reverse the ordering
of the list when running it.
Note that this still keeps the manual retry list for SQPOLL task_work.
That could go away as well, as now the task_work list is fully ordered
and SQPOLL could just leave entries on there when it chops up the
running of the list.
Signed-off-by: Jens Axboe <[email protected]>
---
include/linux/io_uring_types.h | 14 ++-
io_uring/io_uring.c | 167 ++++++++++++++++++++-------------
io_uring/io_uring.h | 6 +-
io_uring/sqpoll.c | 8 +-
io_uring/tctx.c | 3 +-
5 files changed, 116 insertions(+), 82 deletions(-)
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index e9ba99cb0ed0..7ddac4d1d4b3 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -102,7 +102,8 @@ struct io_uring_task {
struct percpu_counter inflight;
struct { /* task_work */
- struct llist_head task_list;
+ struct io_wq_work_list task_list;
+ spinlock_t task_lock;
struct callback_head task_work;
} ____cacheline_aligned_in_smp;
};
@@ -390,8 +391,9 @@ struct io_ring_ctx {
struct mm_struct *mm_account;
/* ctx exit and cancelation */
- struct llist_head fallback_llist;
- struct delayed_work fallback_work;
+ struct io_wq_work_list fallback_list;
+ spinlock_t fallback_lock;
+ struct work_struct fallback_work;
struct work_struct exit_work;
struct list_head tctx_list;
struct completion ref_comp;
@@ -567,11 +569,7 @@ enum {
typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
struct io_task_work {
- /* DEFER_TASKRUN uses work_node, regular task_work node */
- union {
- struct io_wq_work_node work_node;
- struct llist_node node;
- };
+ struct io_wq_work_node node;
io_req_tw_func_t func;
};
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b7eb962e9872..3bb93c77ac3f 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -245,15 +245,26 @@ static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
static __cold void io_fallback_req_func(struct work_struct *work)
{
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
- fallback_work.work);
- struct llist_node *node = llist_del_all(&ctx->fallback_llist);
- struct io_kiocb *req, *tmp;
+ fallback_work);
+ struct io_wq_work_node *node;
struct io_tw_state ts = {};
+ struct io_wq_work_list list;
+
+ spin_lock_irq(&ctx->fallback_lock);
+ list = ctx->fallback_list;
+ INIT_WQ_LIST(&ctx->fallback_list);
+ spin_unlock_irq(&ctx->fallback_lock);
percpu_ref_get(&ctx->refs);
mutex_lock(&ctx->uring_lock);
- llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
+ node = list.first;
+ while (node) {
+ struct io_kiocb *req;
+
+ req = container_of(node, struct io_kiocb, io_task_work.node);
+ node = node->next;
req->io_task_work.func(req, &ts);
+ }
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
percpu_ref_put(&ctx->refs);
@@ -347,7 +358,9 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
#ifdef CONFIG_FUTEX
INIT_HLIST_HEAD(&ctx->futex_list);
#endif
- INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
+ INIT_WORK(&ctx->fallback_work, io_fallback_req_func);
+ INIT_WQ_LIST(&ctx->fallback_list);
+ spin_lock_init(&ctx->fallback_lock);
INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
io_napi_init(ctx);
@@ -1033,15 +1046,15 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
* If more entries than max_entries are available, stop processing once this
* is reached and return the rest of the list.
*/
-struct llist_node *io_handle_tw_list(struct llist_node *node,
- unsigned int *count,
- unsigned int max_entries)
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
+ unsigned int *count,
+ unsigned int max_entries)
{
struct io_ring_ctx *ctx = NULL;
struct io_tw_state ts = { };
do {
- struct llist_node *next = node->next;
+ struct io_wq_work_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
@@ -1067,55 +1080,84 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
return node;
}
-static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync,
- struct io_ring_ctx **last_ctx)
+static __cold void __io_fallback_schedule(struct io_ring_ctx *ctx,
+ struct io_wq_work_list *list,
+ bool sync)
{
- if (sync && *last_ctx != req->ctx) {
- if (*last_ctx) {
- flush_delayed_work(&(*last_ctx)->fallback_work);
- percpu_ref_put(&(*last_ctx)->refs);
- }
- *last_ctx = req->ctx;
- percpu_ref_get(&(*last_ctx)->refs);
- }
- if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist))
- schedule_delayed_work(&req->ctx->fallback_work, 1);
+ bool kick_work = true;
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->fallback_lock, flags);
+ kick_work = !wq_list_splice_list(list, &ctx->fallback_list);
+ spin_unlock_irqrestore(&ctx->fallback_lock, flags);
+ if (kick_work)
+ schedule_work(&ctx->fallback_work);
+
+ if (sync)
+ flush_work(&ctx->fallback_work);
+ percpu_ref_put(&ctx->refs);
}
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+static void __io_fallback_tw(struct io_wq_work_list *list, spinlock_t *lock,
+ bool sync)
{
- struct llist_node *node = llist_del_all(&tctx->task_list);
+ struct io_wq_work_list local_list, ctx_list;
struct io_ring_ctx *last_ctx = NULL;
+ struct io_wq_work_node *node;
struct io_kiocb *req;
+ unsigned long flags;
+
+ spin_lock_irqsave(lock, flags);
+ local_list = *list;
+ INIT_WQ_LIST(list);
+ spin_unlock_irqrestore(lock, flags);
+ INIT_WQ_LIST(&ctx_list);
+ node = local_list.first;
while (node) {
+ struct io_wq_work_node *next = node->next;
+
req = container_of(node, struct io_kiocb, io_task_work.node);
- node = node->next;
- __io_fallback_tw(req, sync, &last_ctx);
+ if (last_ctx != req->ctx) {
+ if (last_ctx)
+ __io_fallback_schedule(last_ctx, &ctx_list, sync);
+ last_ctx = req->ctx;
+ percpu_ref_get(&last_ctx->refs);
+ }
+ wq_list_add_tail(node, &ctx_list);
+ node = next;
}
- if (last_ctx) {
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
+ if (last_ctx)
+ __io_fallback_schedule(last_ctx, &ctx_list, sync);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+{
+ __io_fallback_tw(&tctx->task_list, &tctx->task_lock, sync);
}
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+ unsigned int max_entries,
+ unsigned int *count)
{
- struct llist_node *node;
+ struct io_wq_work_node *node;
if (unlikely(current->flags & PF_EXITING)) {
io_fallback_tw(tctx, true);
return NULL;
}
- node = llist_del_all(&tctx->task_list);
- if (node) {
- node = llist_reverse_order(node);
+ if (!READ_ONCE(tctx->task_list.first))
+ return NULL;
+
+ spin_lock_irq(&tctx->task_lock);
+ node = tctx->task_list.first;
+ INIT_WQ_LIST(&tctx->task_list);
+ spin_unlock_irq(&tctx->task_lock);
+
+ if (node)
node = io_handle_tw_list(node, count, max_entries);
- }
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
@@ -1128,13 +1170,11 @@ struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
void tctx_task_work(struct callback_head *cb)
{
struct io_uring_task *tctx;
- struct llist_node *ret;
unsigned int count = 0;
tctx = container_of(cb, struct io_uring_task, task_work);
- ret = tctx_task_work_run(tctx, UINT_MAX, &count);
- /* can't happen */
- WARN_ON_ONCE(ret);
+ if (tctx_task_work_run(tctx, UINT_MAX, &count))
+ WARN_ON_ONCE(1);
}
static inline void io_req_local_work_add(struct io_kiocb *req,
@@ -1155,7 +1195,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
spin_lock_irqsave(&ctx->work_lock, flags);
- wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+ wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
nr_tw_prev = ctx->work_items++;
spin_unlock_irqrestore(&ctx->work_lock, flags);
@@ -1192,9 +1232,16 @@ static void io_req_normal_work_add(struct io_kiocb *req)
{
struct io_uring_task *tctx = req->tctx;
struct io_ring_ctx *ctx = req->ctx;
+ unsigned long flags;
+ bool was_empty;
+
+ spin_lock_irqsave(&tctx->task_lock, flags);
+ was_empty = tctx->task_list.first == NULL;
+ wq_list_add_tail(&req->io_task_work.node, &tctx->task_list);
+ spin_unlock_irqrestore(&tctx->task_lock, flags);
/* task_work already pending, we're done */
- if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+ if (!was_empty)
return;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
@@ -1233,27 +1280,13 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
- struct io_ring_ctx *last_ctx = NULL;
- struct io_wq_work_node *node;
- unsigned long flags;
-
- spin_lock_irqsave(&ctx->work_lock, flags);
- node = ctx->work_list.first;
- INIT_WQ_LIST(&ctx->work_list);
- ctx->work_items = 0;
- spin_unlock_irqrestore(&ctx->work_lock, flags);
-
- while (node) {
- struct io_kiocb *req;
-
- req = container_of(node, struct io_kiocb, io_task_work.work_node);
- node = node->next;
- __io_fallback_tw(req, false, &last_ctx);
- }
- if (last_ctx) {
- flush_delayed_work(&last_ctx->fallback_work);
- percpu_ref_put(&last_ctx->refs);
- }
+ /*
+ * __io_fallback_tw() handles lists that can have multiple
+ * rings in it, which isn't the case here. But it'll work just
+ * fine, so use it anyway rather than have a special case for
+ * just a single ctx.
+ */
+ __io_fallback_tw(&ctx->work_list, &ctx->work_lock, false);
}
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1292,7 +1325,7 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
while (node) {
struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.work_node);
+ io_task_work.node);
node = node->next;
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
@@ -2967,7 +3000,7 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
io_unregister_personality(ctx, index);
mutex_unlock(&ctx->uring_lock);
- flush_delayed_work(&ctx->fallback_work);
+ flush_work(&ctx->fallback_work);
INIT_WORK(&ctx->exit_work, io_ring_exit_work);
/*
@@ -3106,7 +3139,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
if (tctx)
ret |= io_run_task_work() > 0;
else
- ret |= flush_delayed_work(&ctx->fallback_work);
+ ret |= flush_work(&ctx->fallback_work);
return ret;
}
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 2fae27803116..0b5181b128aa 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -91,8 +91,10 @@ void io_req_task_queue(struct io_kiocb *req);
void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
-struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
+struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
+ unsigned int *count, unsigned int max_entries);
+struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+ unsigned int max_entries, unsigned int *count);
void tctx_task_work(struct callback_head *cb);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
int io_uring_alloc_task_context(struct task_struct *task,
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 6df5e649c413..615707260f25 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -221,7 +221,7 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
* than we were asked to process. Newly queued task_work isn't run until the
* retry list has been fully processed.
*/
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(struct io_wq_work_node **retry_list, int max_entries)
{
struct io_uring_task *tctx = current->io_uring;
unsigned int count = 0;
@@ -239,11 +239,11 @@ static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
return count;
}
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(struct io_wq_work_node *retry_list)
{
struct io_uring_task *tctx = current->io_uring;
- return retry_list || !llist_empty(&tctx->task_list);
+ return retry_list || READ_ONCE(tctx->task_list.first);
}
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
@@ -259,7 +259,7 @@ static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
static int io_sq_thread(void *data)
{
- struct llist_node *retry_list = NULL;
+ struct io_wq_work_node *retry_list = NULL;
struct io_sq_data *sqd = data;
struct io_ring_ctx *ctx;
struct rusage start;
diff --git a/io_uring/tctx.c b/io_uring/tctx.c
index 503f3ff8bc4f..7155b3c56c85 100644
--- a/io_uring/tctx.c
+++ b/io_uring/tctx.c
@@ -87,7 +87,8 @@ __cold int io_uring_alloc_task_context(struct task_struct *task,
atomic_set(&tctx->in_cancel, 0);
atomic_set(&tctx->inflight_tracked, 0);
task->io_uring = tctx;
- init_llist_head(&tctx->task_list);
+ INIT_WQ_LIST(&tctx->task_list);
+ spin_lock_init(&tctx->task_lock);
init_task_work(&tctx->task_work, tctx_task_work);
return 0;
}
--
2.45.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* [PATCH 5/6] io_uring: add __tctx_task_work_run() helper
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
` (3 preceding siblings ...)
2024-11-22 16:12 ` [PATCH 4/6] io_uring: switch non-defer task_work to io_wq_work_list Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
2024-11-22 16:12 ` [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list Jens Axboe
5 siblings, 0 replies; 12+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe
Most use cases only care about running all of the task_work, and they
don't need the node passed back or the work capped. Rename the existing
helper to __tctx_task_work_run(), and add a wrapper around that for the
more basic use cases.
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/io_uring.c | 18 ++++++++++++------
io_uring/io_uring.h | 9 +++------
io_uring/sqpoll.c | 2 +-
3 files changed, 16 insertions(+), 13 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 3bb93c77ac3f..bc520a67fc03 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1137,9 +1137,9 @@ static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
__io_fallback_tw(&tctx->task_list, &tctx->task_lock, sync);
}
-struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
+struct io_wq_work_node *__tctx_task_work_run(struct io_uring_task *tctx,
+ unsigned int max_entries,
+ unsigned int *count)
{
struct io_wq_work_node *node;
@@ -1167,14 +1167,20 @@ struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
return node;
}
+unsigned int tctx_task_work_run(struct io_uring_task *tctx)
+{
+ unsigned int count = 0;
+
+ __tctx_task_work_run(tctx, UINT_MAX, &count);
+ return count;
+}
+
void tctx_task_work(struct callback_head *cb)
{
struct io_uring_task *tctx;
- unsigned int count = 0;
tctx = container_of(cb, struct io_uring_task, task_work);
- if (tctx_task_work_run(tctx, UINT_MAX, &count))
- WARN_ON_ONCE(1);
+ tctx_task_work_run(tctx);
}
static inline void io_req_local_work_add(struct io_kiocb *req,
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 0b5181b128aa..2b0e7c5db30d 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -93,8 +93,9 @@ void io_req_task_queue_fail(struct io_kiocb *req, int ret);
void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
unsigned int *count, unsigned int max_entries);
-struct io_wq_work_node *tctx_task_work_run(struct io_uring_task *tctx,
+struct io_wq_work_node *__tctx_task_work_run(struct io_uring_task *tctx,
unsigned int max_entries, unsigned int *count);
+unsigned int tctx_task_work_run(struct io_uring_task *tctx);
void tctx_task_work(struct callback_head *cb);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
int io_uring_alloc_task_context(struct task_struct *task,
@@ -332,12 +333,8 @@ static inline int io_run_task_work(void)
resume_user_mode_work(NULL);
}
if (current->io_uring) {
- unsigned int count = 0;
-
__set_current_state(TASK_RUNNING);
- tctx_task_work_run(current->io_uring, UINT_MAX, &count);
- if (count)
- ret = true;
+ ret = tctx_task_work_run(current->io_uring) != 0;
}
}
if (task_work_pending(current)) {
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index 615707260f25..aec6c2d56910 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -232,7 +232,7 @@ static unsigned int io_sq_tw(struct io_wq_work_node **retry_list, int max_entrie
goto out;
max_entries -= count;
}
- *retry_list = tctx_task_work_run(tctx, max_entries, &count);
+ *retry_list = __tctx_task_work_run(tctx, max_entries, &count);
out:
if (task_work_pending(current))
task_work_run();
--
2.45.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* [PATCH 6/6] io_uring: make __tctx_task_work_run() take an io_wq_work_list
2024-11-22 16:12 [PATCHSET for-next 0/6] task work cleanups Jens Axboe
` (4 preceding siblings ...)
2024-11-22 16:12 ` [PATCH 5/6] io_uring: add __tctx_task_work_run() helper Jens Axboe
@ 2024-11-22 16:12 ` Jens Axboe
5 siblings, 0 replies; 12+ messages in thread
From: Jens Axboe @ 2024-11-22 16:12 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe
The normal task_work logic doesn't really need it, as it always runs
all of the pending work. But for SQPOLL, it can now pass in its
retry_list which simplifies the tracking of split up task_work running.
This avoids passing io_wq_work_node around. Rather than pass in a list,
SQPOLL could re-add the leftover items to the generic task_work list.
But that requires re-locking the task_lock and using task_list for that,
whereas having a separate retry list allows for skipping those steps.
The downside is that now two lists need checking, but that's now it
was before as well.
Signed-off-by: Jens Axboe <[email protected]>
---
io_uring/io_uring.c | 36 ++++++++++++++++--------------------
io_uring/io_uring.h | 9 +++++----
io_uring/sqpoll.c | 20 +++++++++++---------
3 files changed, 32 insertions(+), 33 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index bc520a67fc03..5e52d8db3dca 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1044,20 +1044,20 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
/*
* Run queued task_work, returning the number of entries processed in *count.
* If more entries than max_entries are available, stop processing once this
- * is reached and return the rest of the list.
+ * is reached.
*/
-struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
- unsigned int *count,
- unsigned int max_entries)
+void io_handle_tw_list(struct io_wq_work_list *list, unsigned int *count,
+ unsigned int max_entries)
{
struct io_ring_ctx *ctx = NULL;
struct io_tw_state ts = { };
do {
- struct io_wq_work_node *next = node->next;
+ struct io_wq_work_node *node = list->first;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
+ list->first = node->next;
if (req->ctx != ctx) {
ctx_flush_and_put(ctx, &ts);
ctx = req->ctx;
@@ -1067,17 +1067,15 @@ struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
req, &ts);
- node = next;
(*count)++;
if (unlikely(need_resched())) {
ctx_flush_and_put(ctx, &ts);
ctx = NULL;
cond_resched();
}
- } while (node && *count < max_entries);
+ } while (list->first && *count < max_entries);
ctx_flush_and_put(ctx, &ts);
- return node;
}
static __cold void __io_fallback_schedule(struct io_ring_ctx *ctx,
@@ -1137,41 +1135,39 @@ static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
__io_fallback_tw(&tctx->task_list, &tctx->task_lock, sync);
}
-struct io_wq_work_node *__tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
+void __tctx_task_work_run(struct io_uring_task *tctx,
+ struct io_wq_work_list *list,
+ unsigned int max_entries, unsigned int *count)
{
- struct io_wq_work_node *node;
-
if (unlikely(current->flags & PF_EXITING)) {
io_fallback_tw(tctx, true);
- return NULL;
+ return;
}
if (!READ_ONCE(tctx->task_list.first))
- return NULL;
+ return;
spin_lock_irq(&tctx->task_lock);
- node = tctx->task_list.first;
+ *list = tctx->task_list;
INIT_WQ_LIST(&tctx->task_list);
spin_unlock_irq(&tctx->task_lock);
- if (node)
- node = io_handle_tw_list(node, count, max_entries);
+ if (!wq_list_empty(list))
+ io_handle_tw_list(list, count, max_entries);
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
io_uring_drop_tctx_refs(current);
trace_io_uring_task_work_run(tctx, *count);
- return node;
}
unsigned int tctx_task_work_run(struct io_uring_task *tctx)
{
+ struct io_wq_work_list list;
unsigned int count = 0;
- __tctx_task_work_run(tctx, UINT_MAX, &count);
+ __tctx_task_work_run(tctx, &list, UINT_MAX, &count);
return count;
}
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 2b0e7c5db30d..74b1468aefda 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -91,10 +91,11 @@ void io_req_task_queue(struct io_kiocb *req);
void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
-struct io_wq_work_node *io_handle_tw_list(struct io_wq_work_node *node,
- unsigned int *count, unsigned int max_entries);
-struct io_wq_work_node *__tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries, unsigned int *count);
+void io_handle_tw_list(struct io_wq_work_list *list, unsigned int *count,
+ unsigned int max_entries);
+void __tctx_task_work_run(struct io_uring_task *tctx,
+ struct io_wq_work_list *list, unsigned int max_entries,
+ unsigned int *count);
unsigned int tctx_task_work_run(struct io_uring_task *tctx);
void tctx_task_work(struct callback_head *cb);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
index aec6c2d56910..3cd50369db5a 100644
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -221,29 +221,29 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
* than we were asked to process. Newly queued task_work isn't run until the
* retry list has been fully processed.
*/
-static unsigned int io_sq_tw(struct io_wq_work_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(struct io_wq_work_list *retry_list, int max_entries)
{
struct io_uring_task *tctx = current->io_uring;
unsigned int count = 0;
- if (*retry_list) {
- *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
+ if (!wq_list_empty(retry_list)) {
+ io_handle_tw_list(retry_list, &count, max_entries);
if (count >= max_entries)
goto out;
max_entries -= count;
}
- *retry_list = __tctx_task_work_run(tctx, max_entries, &count);
+ __tctx_task_work_run(tctx, retry_list, max_entries, &count);
out:
if (task_work_pending(current))
task_work_run();
return count;
}
-static bool io_sq_tw_pending(struct io_wq_work_node *retry_list)
+static bool io_sq_tw_pending(struct io_wq_work_list *retry_list)
{
struct io_uring_task *tctx = current->io_uring;
- return retry_list || READ_ONCE(tctx->task_list.first);
+ return !wq_list_empty(retry_list) || !wq_list_empty(&tctx->task_list);
}
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
@@ -259,7 +259,7 @@ static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
static int io_sq_thread(void *data)
{
- struct io_wq_work_node *retry_list = NULL;
+ struct io_wq_work_list retry_list;
struct io_sq_data *sqd = data;
struct io_ring_ctx *ctx;
struct rusage start;
@@ -292,6 +292,7 @@ static int io_sq_thread(void *data)
audit_uring_entry(IORING_OP_NOP);
audit_uring_exit(true, 0);
+ INIT_WQ_LIST(&retry_list);
mutex_lock(&sqd->lock);
while (1) {
bool cap_entries, sqt_spin = false;
@@ -332,7 +333,8 @@ static int io_sq_thread(void *data)
}
prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
- if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
+ if (!io_sqd_events_pending(sqd) &&
+ !io_sq_tw_pending(&retry_list)) {
bool needs_sched = true;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -371,7 +373,7 @@ static int io_sq_thread(void *data)
timeout = jiffies + sqd->sq_thread_idle;
}
- if (retry_list)
+ if (!wq_list_empty(&retry_list))
io_sq_tw(&retry_list, UINT_MAX);
io_uring_cancel_generic(true, sqd);
--
2.45.2
^ permalink raw reply related [flat|nested] 12+ messages in thread