* [PATCH 01/11] io-wq: fix worker counting after worker received exit signal
2023-06-09 12:20 [RFC PATCH 00/11] fixed worker Hao Xu
@ 2023-06-09 12:20 ` Hao Xu
2023-07-05 12:10 ` Pavel Begunkov
2023-06-09 12:20 ` [PATCH 02/11] io-wq: add a new worker flag to indicate worker exit Hao Xu
` (11 subsequent siblings)
12 siblings, 1 reply; 27+ messages in thread
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
acct->nr_workers should be decremented when we break the loop in
io_wq_worker().
Fixes: 78f8876c2d9f ("io-wq: exclusively gate signal based exit on get_signal() return")
Signed-off-by: Hao Xu <[email protected]>
---
io_uring/io-wq.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index b2715988791e..b70eebec2845 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -634,6 +634,10 @@ static int io_wq_worker(void *data)
if (!get_signal(&ksig))
continue;
+
+ raw_spin_lock(&wq->lock);
+ acct->nr_workers--;
+ raw_spin_unlock(&wq->lock);
break;
}
if (!ret) {
--
2.25.1
^ permalink raw reply related [flat|nested] 27+ messages in thread
* Re: [PATCH 01/11] io-wq: fix worker counting after worker received exit signal
From: Pavel Begunkov @ 2023-07-05 12:10 UTC (permalink / raw)
To: Hao Xu, io-uring; +Cc: Jens Axboe, Wanpeng Li, linux-fsdevel
On 6/9/23 13:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> acct->nr_workers should be decremented when we break the loop in
> io_wq_worker().
>
> Fixes: 78f8876c2d9f ("io-wq: exclusively gate signal based exit on get_signal() return")
> Signed-off-by: Hao Xu <[email protected]>
> ---
> io_uring/io-wq.c | 4 ++++
> 1 file changed, 4 insertions(+)
>
> diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
> index b2715988791e..b70eebec2845 100644
> --- a/io_uring/io-wq.c
> +++ b/io_uring/io-wq.c
> @@ -634,6 +634,10 @@ static int io_wq_worker(void *data)
>
> if (!get_signal(&ksig))
> continue;
> +
> + raw_spin_lock(&wq->lock);
> + acct->nr_workers--;
> + raw_spin_unlock(&wq->lock);
Wouldn't it suffer the same race you fixed with the following?
commit 767a65e9f31789d80e41edd03a802314905e8fbf
Author: Hao Xu <[email protected]>
Date: Sun Sep 12 03:40:52 2021 +0800
io-wq: fix potential race of acct->nr_workers
Even more, it seems we fail to decrement nr_workers when the loop
condition fails, i.e.
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
I.e. the patch looks legit, but what we currently have is a mess and we
have more work to do.
--
Pavel Begunkov
* [PATCH 02/11] io-wq: add a new worker flag to indicate worker exit
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
Add a new worker flag IO_WORKER_F_EXIT to indicate a worker is going to
exit. This is important for fixed workers.
Signed-off-by: Hao Xu <[email protected]>
---
io_uring/io-wq.c | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index b70eebec2845..1717f1465613 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -29,6 +29,7 @@ enum {
IO_WORKER_F_RUNNING = 2, /* account as running */
IO_WORKER_F_FREE = 4, /* worker on free list */
IO_WORKER_F_BOUND = 8, /* is doing bounded work */
+ IO_WORKER_F_EXIT = 16, /* worker is exiting */
};
enum {
@@ -592,6 +593,11 @@ static void io_worker_handle_work(struct io_worker *worker)
} while (1);
}
+static bool is_worker_exiting(struct io_worker *worker)
+{
+ return worker->flags & IO_WORKER_F_EXIT;
+}
+
static int io_wq_worker(void *data)
{
struct io_worker *worker = data;
@@ -609,7 +615,7 @@ static int io_wq_worker(void *data)
long ret;
set_current_state(TASK_INTERRUPTIBLE);
- while (io_acct_run_queue(acct))
+ while (!is_worker_exiting(worker) && io_acct_run_queue(acct))
io_worker_handle_work(worker);
raw_spin_lock(&wq->lock);
@@ -628,6 +634,12 @@ static int io_wq_worker(void *data)
raw_spin_unlock(&wq->lock);
if (io_run_task_work())
continue;
+ if (is_worker_exiting(worker)) {
+ raw_spin_lock(&wq->lock);
+ acct->nr_workers--;
+ raw_spin_unlock(&wq->lock);
+ break;
+ }
ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
if (signal_pending(current)) {
struct ksignal ksig;
--
2.25.1
* Re: [PATCH 02/11] io-wq: add a new worker flag to indicate worker exit
From: Pavel Begunkov @ 2023-07-05 12:16 UTC (permalink / raw)
To: Hao Xu, io-uring; +Cc: Jens Axboe, Wanpeng Li, linux-fsdevel
On 6/9/23 13:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> Add a new worker flag IO_WORKER_F_EXIT to indicate a worker is going to
> exit. This is important for fixed workers.
nit: would be nice to add a small sentence _how_ it's important
for fixed workers
> Signed-off-by: Hao Xu <[email protected]>
> ---
> io_uring/io-wq.c | 14 +++++++++++++-
> 1 file changed, 13 insertions(+), 1 deletion(-)
>
> diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
> index b70eebec2845..1717f1465613 100644
> --- a/io_uring/io-wq.c
> +++ b/io_uring/io-wq.c
> @@ -29,6 +29,7 @@ enum {
> IO_WORKER_F_RUNNING = 2, /* account as running */
> IO_WORKER_F_FREE = 4, /* worker on free list */
> IO_WORKER_F_BOUND = 8, /* is doing bounded work */
> + IO_WORKER_F_EXIT = 16, /* worker is exiting */
> };
>
> enum {
> @@ -592,6 +593,11 @@ static void io_worker_handle_work(struct io_worker *worker)
> } while (1);
> }
>
> +static bool is_worker_exiting(struct io_worker *worker)
> +{
> + return worker->flags & IO_WORKER_F_EXIT;
> +}
> +
> static int io_wq_worker(void *data)
> {
> struct io_worker *worker = data;
> @@ -609,7 +615,7 @@ static int io_wq_worker(void *data)
> long ret;
>
> set_current_state(TASK_INTERRUPTIBLE);
> - while (io_acct_run_queue(acct))
> + while (!is_worker_exiting(worker) && io_acct_run_queue(acct))
> io_worker_handle_work(worker);
Why does it differ from the condition in io_wq_dec_running()? Would
something like this work?

bool io_worker_run_queue(struct io_worker *worker)
{
	return !is_worker_exiting(worker) &&
	       io_acct_run_queue(worker_get_acct(worker));
}
>
> raw_spin_lock(&wq->lock);
> @@ -628,6 +634,12 @@ static int io_wq_worker(void *data)
> raw_spin_unlock(&wq->lock);
> if (io_run_task_work())
> continue;
> + if (is_worker_exiting(worker)) {
> + raw_spin_lock(&wq->lock);
> + acct->nr_workers--;
> + raw_spin_unlock(&wq->lock);
> + break;
> + }
> ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
> if (signal_pending(current)) {
> struct ksignal ksig;
--
Pavel Begunkov
* [PATCH 03/11] io-wq: add a new type io-wq worker
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
Add a new io-wq worker type, IO_WORKER_F_FIXED. This type of worker
exists for the whole io-wq lifecycle.
Signed-off-by: Hao Xu <[email protected]>
---
io_uring/io-wq.c | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 1717f1465613..7326fef58ca7 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -30,6 +30,7 @@ enum {
IO_WORKER_F_FREE = 4, /* worker on free list */
IO_WORKER_F_BOUND = 8, /* is doing bounded work */
IO_WORKER_F_EXIT = 16, /* worker is exiting */
+ IO_WORKER_F_FIXED = 32, /* is a fixed worker */
};
enum {
@@ -598,6 +599,11 @@ static bool is_worker_exiting(struct io_worker *worker)
return worker->flags & IO_WORKER_F_EXIT;
}
+static bool is_fixed_worker(struct io_worker *worker)
+{
+ return worker->flags & IO_WORKER_F_FIXED;
+}
+
static int io_wq_worker(void *data)
{
struct io_worker *worker = data;
@@ -622,8 +628,13 @@ static int io_wq_worker(void *data)
/*
* Last sleep timed out. Exit if we're not the last worker,
* or if someone modified our affinity.
+ * Note: fixed workers always have the same lifecycle as io-wq
+ * itself, and cpu affinity settings don't work well for them:
+ * they can be manually moved to a cpu outside the cpuset
+ * indicated by io_wq_worker_affinity().
*/
- if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
+ if (!is_fixed_worker(worker) && last_timeout &&
+ (exit_mask || acct->nr_workers > 1)) {
acct->nr_workers--;
raw_spin_unlock(&wq->lock);
__set_current_state(TASK_RUNNING);
--
2.25.1
* Re: [PATCH 03/11] io-wq: add a new type io-wq worker
From: Pavel Begunkov @ 2023-07-05 12:26 UTC (permalink / raw)
To: Hao Xu, io-uring; +Cc: Jens Axboe, Wanpeng Li, linux-fsdevel
On 6/9/23 13:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> Add a new io-wq worker type, IO_WORKER_F_FIXED. This type of worker
> exists for the whole io-wq lifecycle.
>
> Signed-off-by: Hao Xu <[email protected]>
> ---
> io_uring/io-wq.c | 13 ++++++++++++-
> 1 file changed, 12 insertions(+), 1 deletion(-)
>
> diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
> index 1717f1465613..7326fef58ca7 100644
> --- a/io_uring/io-wq.c
> +++ b/io_uring/io-wq.c
> @@ -30,6 +30,7 @@ enum {
> IO_WORKER_F_FREE = 4, /* worker on free list */
> IO_WORKER_F_BOUND = 8, /* is doing bounded work */
> IO_WORKER_F_EXIT = 16, /* worker is exiting */
> + IO_WORKER_F_FIXED = 32, /* is a fixed worker */
> };
>
> enum {
> @@ -598,6 +599,11 @@ static bool is_worker_exiting(struct io_worker *worker)
> return worker->flags & IO_WORKER_F_EXIT;
> }
>
> +static bool is_fixed_worker(struct io_worker *worker)
> +{
> + return worker->flags & IO_WORKER_F_FIXED;
> +}
You move it up in patch 5/11; I suggest moving it to the top of the
file here.
> static int io_wq_worker(void *data)
> {
> struct io_worker *worker = data;
> @@ -622,8 +628,13 @@ static int io_wq_worker(void *data)
> /*
> * Last sleep timed out. Exit if we're not the last worker,
> * or if someone modified our affinity.
> + * Note: fixed workers always have the same lifecycle as io-wq
> + * itself, and cpu affinity settings don't work well for them:
> + * they can be manually moved to a cpu outside the cpuset
> + * indicated by io_wq_worker_affinity().
> */
> - if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
> + if (!is_fixed_worker(worker) && last_timeout &&
> + (exit_mask || acct->nr_workers > 1)) {
> acct->nr_workers--;
> raw_spin_unlock(&wq->lock);
> __set_current_state(TASK_RUNNING);
If there is no work it'll continue to loop every
WORKER_IDLE_TIMEOUT (5 * HZ), which sounds troublesome with many
workers in the system.
tm = is_fixed_worker(worker) ? MAX_SCHEDULE_TIMEOUT : WORKER_IDLE_TIMEOUT;
schedule_timeout(tm);
Maybe?
--
Pavel Begunkov
* [PATCH 04/11] io-wq: add fixed worker members in io_wq_acct
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
Add fixed-worker-related members to io_wq_acct.
Signed-off-by: Hao Xu <[email protected]>
---
io_uring/io-wq.c | 2 ++
1 file changed, 2 insertions(+)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 7326fef58ca7..bf9e9af8d9ca 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -84,6 +84,8 @@ struct io_wq_acct {
raw_spinlock_t lock;
struct io_wq_work_list work_list;
unsigned long flags;
+ struct io_worker **fixed_workers;
+ unsigned int fixed_nr;
};
enum {
--
2.25.1
* [PATCH 05/11] io-wq: add a new parameter for creating a new fixed worker
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
Add a new parameter when creating new workers to indicate if users
want a normal or fixed worker.
Signed-off-by: Hao Xu <[email protected]>
---
io_uring/io-wq.c | 33 ++++++++++++++++++++-------------
1 file changed, 20 insertions(+), 13 deletions(-)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index bf9e9af8d9ca..048856eef4d4 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -137,7 +137,7 @@ struct io_cb_cancel_data {
bool cancel_all;
};
-static bool create_io_worker(struct io_wq *wq, int index);
+static bool create_io_worker(struct io_wq *wq, int index, bool fixed);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
struct io_wq_acct *acct,
@@ -284,7 +284,8 @@ static bool io_wq_activate_free_worker(struct io_wq *wq,
* We need a worker. If we find a free one, we're good. If not, and we're
* below the max number of workers, create one.
*/
-static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
+static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct,
+ bool fixed)
{
/*
* Most likely an attempt to queue unbounded work on an io_wq that
@@ -302,7 +303,7 @@ static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
raw_spin_unlock(&wq->lock);
atomic_inc(&acct->nr_running);
atomic_inc(&wq->worker_refs);
- return create_io_worker(wq, acct->index);
+ return create_io_worker(wq, acct->index, fixed);
}
static void io_wq_inc_running(struct io_worker *worker)
@@ -312,6 +313,11 @@ static void io_wq_inc_running(struct io_worker *worker)
atomic_inc(&acct->nr_running);
}
+static bool is_fixed_worker(struct io_worker *worker)
+{
+ return worker->flags & IO_WORKER_F_FIXED;
+}
+
static void create_worker_cb(struct callback_head *cb)
{
struct io_worker *worker;
@@ -331,7 +337,7 @@ static void create_worker_cb(struct callback_head *cb)
}
raw_spin_unlock(&wq->lock);
if (do_create) {
- create_io_worker(wq, worker->create_index);
+ create_io_worker(wq, worker->create_index, is_fixed_worker(worker));
} else {
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
@@ -398,6 +404,8 @@ static void io_wq_dec_running(struct io_worker *worker)
return;
if (!io_acct_run_queue(acct))
return;
+ if (is_fixed_worker(worker))
+ return;
atomic_inc(&acct->nr_running);
atomic_inc(&wq->worker_refs);
@@ -601,11 +609,6 @@ static bool is_worker_exiting(struct io_worker *worker)
return worker->flags & IO_WORKER_F_EXIT;
}
-static bool is_fixed_worker(struct io_worker *worker)
-{
- return worker->flags & IO_WORKER_F_FIXED;
-}
-
static int io_wq_worker(void *data)
{
struct io_worker *worker = data;
@@ -806,7 +809,7 @@ static void io_workqueue_create(struct work_struct *work)
kfree(worker);
}
-static bool create_io_worker(struct io_wq *wq, int index)
+static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
{
struct io_wq_acct *acct = &wq->acct[index];
struct io_worker *worker;
@@ -833,10 +836,14 @@ static bool create_io_worker(struct io_wq *wq, int index)
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
+ if (fixed)
+ worker->flags |= IO_WORKER_F_FIXED;
+
tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
- io_init_new_worker(wq, worker, tsk);
- } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+ if (!fixed)
+ io_init_new_worker(wq, worker, tsk);
+ } else if (fixed || !io_should_retry_thread(PTR_ERR(tsk))) {
kfree(worker);
goto fail;
} else {
@@ -947,7 +954,7 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
!atomic_read(&acct->nr_running))) {
bool did_create;
- did_create = io_wq_create_worker(wq, acct);
+ did_create = io_wq_create_worker(wq, acct, false);
if (likely(did_create))
return;
--
2.25.1
* Re: [PATCH 05/11] io-wq: add a new parameter for creating a new fixed worker
From: Pavel Begunkov @ 2023-07-05 12:54 UTC (permalink / raw)
To: Hao Xu, io-uring; +Cc: Jens Axboe, Wanpeng Li, linux-fsdevel
On 6/9/23 13:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> Add a new parameter when creating new workers to indicate if users
> want a normal or fixed worker.
>
> Signed-off-by: Hao Xu <[email protected]>
> ---
> io_uring/io-wq.c | 33 ++++++++++++++++++++-------------
> 1 file changed, 20 insertions(+), 13 deletions(-)
>
> diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
> index bf9e9af8d9ca..048856eef4d4 100644
> --- a/io_uring/io-wq.c
> +++ b/io_uring/io-wq.c
[...]
>
> +static bool is_fixed_worker(struct io_worker *worker)
> +{
> + return worker->flags & IO_WORKER_F_FIXED;
> +}
That's what I mentioned in the other comment.
> +
> static void create_worker_cb(struct callback_head *cb)
> {
> struct io_worker *worker;
> @@ -331,7 +337,7 @@ static void create_worker_cb(struct callback_head *cb)
> }
> raw_spin_unlock(&wq->lock);
> if (do_create) {
> - create_io_worker(wq, worker->create_index);
> + create_io_worker(wq, worker->create_index, is_fixed_worker(worker));
> } else {
> atomic_dec(&acct->nr_running);
> io_worker_ref_put(wq);
> @@ -398,6 +404,8 @@ static void io_wq_dec_running(struct io_worker *worker)
> return;
> if (!io_acct_run_queue(acct))
> return;
> + if (is_fixed_worker(worker))
> + return;
Aha, it's here. I was thinking about it a little bit more.
Is it even correct? If you have a mixed fixed/non-fixed setup
you presumably want non-fixed workers to kick in such situations.
I don't remember this creation voodoo well, maybe Jens does have
an idea.
>
> atomic_inc(&acct->nr_running);
> atomic_inc(&wq->worker_refs);
> @@ -601,11 +609,6 @@ static bool is_worker_exiting(struct io_worker *worker)
> return worker->flags & IO_WORKER_F_EXIT;
> }
[...]
> -static bool create_io_worker(struct io_wq *wq, int index)
> +static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
> {
> struct io_wq_acct *acct = &wq->acct[index];
> struct io_worker *worker;
> @@ -833,10 +836,14 @@ static bool create_io_worker(struct io_wq *wq, int index)
> if (index == IO_WQ_ACCT_BOUND)
> worker->flags |= IO_WORKER_F_BOUND;
>
> + if (fixed)
> + worker->flags |= IO_WORKER_F_FIXED;
> +
> tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
> if (!IS_ERR(tsk)) {
> - io_init_new_worker(wq, worker, tsk);
> - } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
> + if (!fixed)
> + io_init_new_worker(wq, worker, tsk);
Why do we skip io_init_new_worker()? I assume that's to avoid putting
it into the lists, but what about the rest? I.e.
tsk->worker_private = worker;
worker->task = tsk;
set_cpus_allowed_ptr(tsk, wq->cpu_mask);
> + } else if (fixed || !io_should_retry_thread(PTR_ERR(tsk))) {
> kfree(worker);
> goto fail;
> } else {
> @@ -947,7 +954,7 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
> !atomic_read(&acct->nr_running))) {
> bool did_create;
>
> - did_create = io_wq_create_worker(wq, acct);
> + did_create = io_wq_create_worker(wq, acct, false);
> if (likely(did_create))
> return;
--
Pavel Begunkov
* [PATCH 06/11] io-wq: return io_worker after successful inline worker creation
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
After creating an io worker inline successfully, return the io_worker
structure. This is used by fixed workers.
Signed-off-by: Hao Xu <[email protected]>
---
io_uring/io-wq.c | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 048856eef4d4..4338e5b23b07 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -137,7 +137,7 @@ struct io_cb_cancel_data {
bool cancel_all;
};
-static bool create_io_worker(struct io_wq *wq, int index, bool fixed);
+static struct io_worker *create_io_worker(struct io_wq *wq, int index, bool fixed);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
struct io_wq_acct *acct,
@@ -284,8 +284,8 @@ static bool io_wq_activate_free_worker(struct io_wq *wq,
* We need a worker. If we find a free one, we're good. If not, and we're
* below the max number of workers, create one.
*/
-static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct,
- bool fixed)
+static struct io_worker *io_wq_create_worker(struct io_wq *wq,
+ struct io_wq_acct *acct, bool fixed)
{
/*
* Most likely an attempt to queue unbounded work on an io_wq that
@@ -297,7 +297,7 @@ static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct,
raw_spin_lock(&wq->lock);
if (acct->nr_workers >= acct->max_workers) {
raw_spin_unlock(&wq->lock);
- return true;
+ return NULL;
}
acct->nr_workers++;
raw_spin_unlock(&wq->lock);
@@ -809,11 +809,11 @@ static void io_workqueue_create(struct work_struct *work)
kfree(worker);
}
-static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
+static struct io_worker *create_io_worker(struct io_wq *wq, int index, bool fixed)
{
struct io_wq_acct *acct = &wq->acct[index];
struct io_worker *worker;
- struct task_struct *tsk;
+ struct task_struct *tsk = NULL;
__set_current_state(TASK_RUNNING);
@@ -825,7 +825,7 @@ static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
acct->nr_workers--;
raw_spin_unlock(&wq->lock);
io_worker_ref_put(wq);
- return false;
+ return tsk ? (struct io_worker *)tsk : ERR_PTR(-ENOMEM);
}
refcount_set(&worker->ref, 1);
@@ -841,8 +841,8 @@ static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
- if (!fixed)
- io_init_new_worker(wq, worker, tsk);
+ io_init_new_worker(wq, worker, tsk);
+ return worker;
} else if (fixed || !io_should_retry_thread(PTR_ERR(tsk))) {
kfree(worker);
goto fail;
@@ -851,7 +851,7 @@ static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
schedule_work(&worker->work);
}
- return true;
+ return (struct io_worker *)tsk;
}
/*
--
2.25.1
* Re: [PATCH 06/11] io-wq: return io_worker after successful inline worker creation
From: Pavel Begunkov @ 2023-07-05 13:05 UTC (permalink / raw)
To: Hao Xu, io-uring; +Cc: Jens Axboe, Wanpeng Li, linux-fsdevel
On 6/9/23 13:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> After creating an io worker inline successfully, return the io_worker
> structure. This is used by fixed workers.
>
> Signed-off-by: Hao Xu <[email protected]>
> ---
> io_uring/io-wq.c | 20 ++++++++++----------
> 1 file changed, 10 insertions(+), 10 deletions(-)
>
> diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
> index 048856eef4d4..4338e5b23b07 100644
> --- a/io_uring/io-wq.c
> +++ b/io_uring/io-wq.c
> @@ -137,7 +137,7 @@ struct io_cb_cancel_data {
> bool cancel_all;
> };
>
> -static bool create_io_worker(struct io_wq *wq, int index, bool fixed);
> +static struct io_worker *create_io_worker(struct io_wq *wq, int index, bool fixed);
> static void io_wq_dec_running(struct io_worker *worker);
> static bool io_acct_cancel_pending_work(struct io_wq *wq,
> struct io_wq_acct *acct,
> @@ -284,8 +284,8 @@ static bool io_wq_activate_free_worker(struct io_wq *wq,
> * We need a worker. If we find a free one, we're good. If not, and we're
> * below the max number of workers, create one.
> */
> -static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct,
> - bool fixed)
> +static struct io_worker *io_wq_create_worker(struct io_wq *wq,
> + struct io_wq_acct *acct, bool fixed)
> {
> /*
> * Most likely an attempt to queue unbounded work on an io_wq that
> @@ -297,7 +297,7 @@ static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct,
> raw_spin_lock(&wq->lock);
> if (acct->nr_workers >= acct->max_workers) {
> raw_spin_unlock(&wq->lock);
> - return true;
> + return NULL;
Something is not right here. The function could succeed even if it didn't
create a new worker. Now it's a failure.
> }
> acct->nr_workers++;
> raw_spin_unlock(&wq->lock);
> @@ -809,11 +809,11 @@ static void io_workqueue_create(struct work_struct *work)
> kfree(worker);
> }
>
> -static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
> +static struct io_worker *create_io_worker(struct io_wq *wq, int index, bool fixed)
> {
> struct io_wq_acct *acct = &wq->acct[index];
> struct io_worker *worker;
> - struct task_struct *tsk;
> + struct task_struct *tsk = NULL;
>
> __set_current_state(TASK_RUNNING);
>
> @@ -825,7 +825,7 @@ static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
> acct->nr_workers--;
> raw_spin_unlock(&wq->lock);
> io_worker_ref_put(wq);
> - return false;
> + return tsk ? (struct io_worker *)tsk : ERR_PTR(-ENOMEM);
How is this conversion valid? I don't remember us overlaying struct
io_worker onto task_struct.
> }
>
> refcount_set(&worker->ref, 1);
> @@ -841,8 +841,8 @@ static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
>
> tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
> if (!IS_ERR(tsk)) {
> - if (!fixed)
> - io_init_new_worker(wq, worker, tsk);
> + io_init_new_worker(wq, worker, tsk);
> + return worker;
> } else if (fixed || !io_should_retry_thread(PTR_ERR(tsk))) {
> kfree(worker);
> goto fail;
> @@ -851,7 +851,7 @@ static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
> schedule_work(&worker->work);
> }
>
> - return true;
> + return (struct io_worker *)tsk;
> }
>
> /*
--
Pavel Begunkov
* [PATCH 07/11] io_uring: add new api to register fixed workers
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
Add a new API to register fixed workers. The API registers fixed
workers for the current task. For simplicity, it doesn't allow
updating the worker count; a separate unregister API uninstalls all
the fixed workers, after which a different number of fixed workers
can be registered again.
Signed-off-by: Hao Xu <[email protected]>
---
include/uapi/linux/io_uring.h | 9 ++++
io_uring/io-wq.c | 85 +++++++++++++++++++++++++++++++++++
io_uring/io-wq.h | 1 +
io_uring/io_uring.c | 71 +++++++++++++++++++++++++++++
4 files changed, 166 insertions(+)
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index f222d263bc55..6dc43be5009d 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -535,6 +535,9 @@ enum {
/* register a range of fixed file slots for automatic slot allocation */
IORING_REGISTER_FILE_ALLOC_RANGE = 25,
+ /* set/get number of fixed workers */
+ IORING_REGISTER_IOWQ_FIXED_WORKERS = 26,
+
/* this goes last */
IORING_REGISTER_LAST,
@@ -715,6 +718,12 @@ struct io_uring_recvmsg_out {
__u32 flags;
};
+struct io_uring_fixed_worker_arg {
+ __u32 nr_workers;
+ __u32 resv;
+ __u64 resv2[3];
+};
+
#ifdef __cplusplus
}
#endif
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 4338e5b23b07..28f13c1c38f4 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -1371,6 +1371,91 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
return 0;
}
+static void io_wq_clean_fixed_workers(struct io_wq *wq)
+{
+ int i, j;
+
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ struct io_wq_acct *acct = &wq->acct[i];
+ struct io_worker **workers = acct->fixed_workers;
+
+ if (!workers)
+ continue;
+
+ for (j = 0; j < acct->fixed_nr; j++) {
+ if (!workers[j])
+ continue;
+ workers[j]->flags |= IO_WORKER_F_EXIT;
+ wake_up_process(workers[j]->task);
+ }
+ kfree(workers);
+ }
+}
+
+/*
+ * Set number of fixed workers.
+ */
+int io_wq_fixed_workers(struct io_wq *wq, struct io_uring_fixed_worker_arg *count)
+{
+ struct io_wq_acct *acct;
+ int i, j, ret = 0;
+
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ if (wq->acct[i].fixed_nr) {
+ ret = -EBUSY;
+ break;
+ }
+ }
+ if (ret)
+ return ret;
+
+ BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
+ BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
+ BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);
+
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ if (count[i].nr_workers > task_rlimit(current, RLIMIT_NPROC))
+ count[i].nr_workers =
+ task_rlimit(current, RLIMIT_NPROC);
+ }
+
+ rcu_read_lock();
+
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ unsigned int nr = count[i].nr_workers;
+
+ acct = &wq->acct[i];
+ acct->fixed_nr = nr;
+ acct->fixed_workers = kcalloc(nr, sizeof(struct io_worker *),
+ GFP_KERNEL);
+ if (!acct->fixed_workers) {
+ ret = -ENOMEM;
+ break;
+ }
+
+ for (j = 0; j < nr; j++) {
+ struct io_worker *worker =
+ io_wq_create_worker(wq, acct, true);
+ if (IS_ERR(worker)) {
+ ret = PTR_ERR(worker);
+ break;
+ }
+ acct->fixed_workers[j] = worker;
+ }
+ if (j < nr)
+ break;
+ }
+ rcu_read_unlock();
+
+ if (ret)
+ goto err;
+ return 0;
+
+err:
+ io_wq_clean_fixed_workers(wq);
+ return ret;
+}
+
static __init int io_wq_init(void)
{
int ret;
diff --git a/io_uring/io-wq.h b/io_uring/io-wq.h
index 31228426d192..88a1ee9fde24 100644
--- a/io_uring/io-wq.h
+++ b/io_uring/io-wq.h
@@ -52,6 +52,7 @@ void io_wq_hash_work(struct io_wq_work *work, void *val);
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask);
int io_wq_max_workers(struct io_wq *wq, int *new_count);
+int io_wq_fixed_workers(struct io_wq *wq, struct io_uring_fixed_worker_arg *count);
static inline bool io_wq_is_hashed(struct io_wq_work *work)
{
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index c99a7a0c3f21..bb8342b4a2c6 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -4351,6 +4351,71 @@ static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
return ret;
}
+/*
+ * note: this function sets fixed workers for a single task, so every
+ * task which wants to set the fixed workers has to call this function
+ */
+static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
+ void __user *arg, int nr_args)
+ __must_hold(&ctx->uring_lock)
+{
+ struct io_uring_task *tctx = NULL;
+ struct io_sq_data *sqd = NULL;
+ struct io_uring_fixed_worker_arg *res;
+ size_t size;
+ int i, ret;
+ bool zero = true;
+
+ size = array_size(nr_args, sizeof(*res));
+ if (size == SIZE_MAX)
+ return -EOVERFLOW;
+
+ res = memdup_user(arg, size);
+ if (IS_ERR(res))
+ return PTR_ERR(res);
+
+ for (i = 0; i < nr_args; i++) {
+ if (res[i].nr_workers) {
+ zero = false;
+ break;
+ }
+ }
+
+ if (zero)
+ return 0;
+
+ if (ctx->flags & IORING_SETUP_SQPOLL) {
+ sqd = ctx->sq_data;
+ if (sqd) {
+ /*
+ * Observe the correct sqd->lock -> ctx->uring_lock
+ * ordering. Fine to drop uring_lock here, we hold
+ * a ref to the ctx.
+ */
+ refcount_inc(&sqd->refs);
+ mutex_unlock(&ctx->uring_lock);
+ mutex_lock(&sqd->lock);
+ mutex_lock(&ctx->uring_lock);
+ if (sqd->thread)
+ tctx = sqd->thread->io_uring;
+ }
+ } else {
+ tctx = current->io_uring;
+ }
+
+ if (tctx && tctx->io_wq)
+ ret = io_wq_fixed_workers(tctx->io_wq, res);
+ else
+ ret = -EFAULT;
+
+ if (sqd) {
+ mutex_unlock(&sqd->lock);
+ io_put_sq_data(sqd);
+ }
+
+ return ret;
+}
+
static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
void __user *arg, unsigned nr_args)
__releases(ctx->uring_lock)
@@ -4509,6 +4574,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
break;
ret = io_register_file_alloc_range(ctx, arg);
break;
+ case IORING_REGISTER_IOWQ_FIXED_WORKERS:
+ ret = -EINVAL;
+ if (!arg || nr_args != 2)
+ break;
+ ret = io_register_iowq_fixed_workers(ctx, arg, nr_args);
+ break;
default:
ret = -EINVAL;
break;
--
2.25.1
^ permalink raw reply related [flat|nested] 27+ messages in thread
* Re: [PATCH 07/11] io_uring: add new api to register fixed workers
2023-06-09 12:20 ` [PATCH 07/11] io_uring: add new api to register fixed workers Hao Xu
@ 2023-06-09 13:07 ` Ammar Faizi
2023-06-12 13:46 ` Hao Xu
2023-06-09 13:54 ` Ammar Faizi
2023-07-05 13:10 ` Pavel Begunkov
2 siblings, 1 reply; 27+ messages in thread
From: Ammar Faizi @ 2023-06-09 13:07 UTC (permalink / raw)
To: Hao Xu
Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li,
Linux Fsdevel Mailing List, io-uring Mailing List
On Fri, Jun 09, 2023 at 08:20:27PM +0800, Hao Xu wrote:
> + rcu_read_lock();
> +
> + for (i = 0; i < IO_WQ_ACCT_NR; i++) {
> + unsigned int nr = count[i].nr_workers;
> +
> + acct = &wq->acct[i];
> + acct->fixed_nr = nr;
> + acct->fixed_workers = kcalloc(nr, sizeof(struct io_worker *),
> + GFP_KERNEL);
> + if (!acct->fixed_workers) {
> + ret = -ENOMEM;
> + break;
> + }
> +
> + for (j = 0; j < nr; j++) {
> + struct io_worker *worker =
> + io_wq_create_worker(wq, acct, true);
> + if (IS_ERR(worker)) {
> + ret = PTR_ERR(worker);
> + break;
> + }
> + acct->fixed_workers[j] = worker;
> + }
> + if (j < nr)
> + break;
> + }
> + rcu_read_unlock();
This looks wrong. kcalloc() with GFP_KERNEL may sleep. Note that you're
not allowed to sleep inside the RCU read lock critical section.
Using GFP_KERNEL implies __GFP_RECLAIM, which means that direct reclaim
may be triggered under memory pressure; the calling context must be
allowed to sleep.
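
A rough sketch of one way to restructure it, reusing the patch's own field
names (untested; it only hoists the allocations — whether
io_wq_create_worker() itself is safe to call under rcu_read_lock() is a
separate question):

```c
	/* do every GFP_KERNEL allocation before entering the RCU section */
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		acct->fixed_nr = count[i].nr_workers;
		acct->fixed_workers = kcalloc(acct->fixed_nr,
					      sizeof(struct io_worker *),
					      GFP_KERNEL);
		if (!acct->fixed_workers)
			goto err;	/* error path frees earlier arrays */
	}

	rcu_read_lock();
	/* only the worker bookkeeping runs under RCU now */
	...
	rcu_read_unlock();
```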
--
Ammar Faizi
^ permalink raw reply [flat|nested] 27+ messages in thread
* Re: [PATCH 07/11] io_uring: add new api to register fixed workers
2023-06-09 13:07 ` Ammar Faizi
@ 2023-06-12 13:46 ` Hao Xu
0 siblings, 0 replies; 27+ messages in thread
From: Hao Xu @ 2023-06-12 13:46 UTC (permalink / raw)
To: Ammar Faizi
Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li,
Linux Fsdevel Mailing List, io-uring Mailing List
Hi Ammar,
On 6/9/23 21:07, Ammar Faizi wrote:
> On Fri, Jun 09, 2023 at 08:20:27PM +0800, Hao Xu wrote:
>> + rcu_read_lock();
>> +
>> + for (i = 0; i < IO_WQ_ACCT_NR; i++) {
>> + unsigned int nr = count[i].nr_workers;
>> +
>> + acct = &wq->acct[i];
>> + acct->fixed_nr = nr;
>> + acct->fixed_workers = kcalloc(nr, sizeof(struct io_worker *),
>> + GFP_KERNEL);
>> + if (!acct->fixed_workers) {
>> + ret = -ENOMEM;
>> + break;
>> + }
>> +
>> + for (j = 0; j < nr; j++) {
>> + struct io_worker *worker =
>> + io_wq_create_worker(wq, acct, true);
>> + if (IS_ERR(worker)) {
>> + ret = PTR_ERR(worker);
>> + break;
>> + }
>> + acct->fixed_workers[j] = worker;
>> + }
>> + if (j < nr)
>> + break;
>> + }
>> + rcu_read_unlock();
>
> This looks wrong. kcalloc() with GFP_KERNEL may sleep. Note that you're
> not allowed to sleep inside the RCU read lock critical section.
>
> Using GFP_KERNEL implies __GFP_RECLAIM, which means that direct reclaim
> may be triggered under memory pressure; the calling context must be
> allowed to sleep.
>
I think you are right, I'll fix it in v2.
Hi Jens, a question about this: I saw the same rcu_read_lock() in
io_wq_max_workers(), but what does it actually protect?
Regards,
Hao
^ permalink raw reply [flat|nested] 27+ messages in thread
* Re: [PATCH 07/11] io_uring: add new api to register fixed workers
2023-06-09 12:20 ` [PATCH 07/11] io_uring: add new api to register fixed workers Hao Xu
2023-06-09 13:07 ` Ammar Faizi
@ 2023-06-09 13:54 ` Ammar Faizi
2023-06-12 13:47 ` Hao Xu
2023-07-05 13:10 ` Pavel Begunkov
2 siblings, 1 reply; 27+ messages in thread
From: Ammar Faizi @ 2023-06-09 13:54 UTC (permalink / raw)
To: Hao Xu
Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li,
Linux Fsdevel Mailing List, io-uring Mailing List
On Fri, Jun 09, 2023 at 08:20:27PM +0800, Hao Xu wrote:
> +static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
> + void __user *arg, int nr_args)
> + __must_hold(&ctx->uring_lock)
> +{
> + struct io_uring_task *tctx = NULL;
> + struct io_sq_data *sqd = NULL;
> + struct io_uring_fixed_worker_arg *res;
> + size_t size;
> + int i, ret;
> + bool zero = true;
> +
> + size = array_size(nr_args, sizeof(*res));
> + if (size == SIZE_MAX)
> + return -EOVERFLOW;
> +
> + res = memdup_user(arg, size);
> + if (IS_ERR(res))
> + return PTR_ERR(res);
> +
> + for (i = 0; i < nr_args; i++) {
> + if (res[i].nr_workers) {
> + zero = false;
> + break;
> + }
> + }
> +
> + if (zero)
> + return 0;
You have a memory leak bug here. The memdup_user() needs clean up.
kfree(res);
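
Concretely, the early-return path would need something like (untested
sketch):

```c
	if (zero) {
		kfree(res);	/* res came from memdup_user() above */
		return 0;
	}
```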
--
Ammar Faizi
^ permalink raw reply [flat|nested] 27+ messages in thread
* Re: [PATCH 07/11] io_uring: add new api to register fixed workers
2023-06-09 13:54 ` Ammar Faizi
@ 2023-06-12 13:47 ` Hao Xu
0 siblings, 0 replies; 27+ messages in thread
From: Hao Xu @ 2023-06-12 13:47 UTC (permalink / raw)
To: Ammar Faizi
Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li,
Linux Fsdevel Mailing List, io-uring Mailing List
On 6/9/23 21:54, Ammar Faizi wrote:
> On Fri, Jun 09, 2023 at 08:20:27PM +0800, Hao Xu wrote:
>> +static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
>> + void __user *arg, int nr_args)
>> + __must_hold(&ctx->uring_lock)
>> +{
>> + struct io_uring_task *tctx = NULL;
>> + struct io_sq_data *sqd = NULL;
>> + struct io_uring_fixed_worker_arg *res;
>> + size_t size;
>> + int i, ret;
>> + bool zero = true;
>> +
>> + size = array_size(nr_args, sizeof(*res));
>> + if (size == SIZE_MAX)
>> + return -EOVERFLOW;
>> +
>> + res = memdup_user(arg, size);
>> + if (IS_ERR(res))
>> + return PTR_ERR(res);
>> +
>> + for (i = 0; i < nr_args; i++) {
>> + if (res[i].nr_workers) {
>> + zero = false;
>> + break;
>> + }
>> + }
>> +
>> + if (zero)
>> + return 0;
>
> You have a memory leak bug here. The memdup_user() needs clean up.
> kfree(res);
>
True, I'll fix it in v2, thanks.
^ permalink raw reply [flat|nested] 27+ messages in thread
* Re: [PATCH 07/11] io_uring: add new api to register fixed workers
2023-06-09 12:20 ` [PATCH 07/11] io_uring: add new api to register fixed workers Hao Xu
2023-06-09 13:07 ` Ammar Faizi
2023-06-09 13:54 ` Ammar Faizi
@ 2023-07-05 13:10 ` Pavel Begunkov
2 siblings, 0 replies; 27+ messages in thread
From: Pavel Begunkov @ 2023-07-05 13:10 UTC (permalink / raw)
To: Hao Xu, io-uring; +Cc: Jens Axboe, Wanpeng Li, linux-fsdevel
On 6/9/23 13:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> Add a new api to register fixed workers. The api is designed to register
> fixed workers for the current task. For simplicity, it doesn't allow
> worker number update. We have a separate unregister api to uninstall all
> the fixed workers. And then we can register different number of fixed
> workers again.
>
> Signed-off-by: Hao Xu <[email protected]>
> ---
> include/uapi/linux/io_uring.h | 9 ++++
> io_uring/io-wq.c | 85 +++++++++++++++++++++++++++++++++++
> io_uring/io-wq.h | 1 +
> io_uring/io_uring.c | 71 +++++++++++++++++++++++++++++
> 4 files changed, 166 insertions(+)
>
> diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
> index f222d263bc55..6dc43be5009d 100644
> --- a/include/uapi/linux/io_uring.h
> +++ b/include/uapi/linux/io_uring.h
[...]
> diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
> index c99a7a0c3f21..bb8342b4a2c6 100644
> --- a/io_uring/io_uring.c
> +++ b/io_uring/io_uring.c
> @@ -4351,6 +4351,71 @@ static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
> return ret;
> }
>
> +/*
> + * note: this function sets fixed workers for a single task, so every
> + * task which wants to set the fixed workers has to call this function
> + */
> +static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
> + void __user *arg, int nr_args)
> + __must_hold(&ctx->uring_lock)
> +{
> + struct io_uring_task *tctx = NULL;
> + struct io_sq_data *sqd = NULL;
> + struct io_uring_fixed_worker_arg *res;
> + size_t size;
> + int i, ret;
> + bool zero = true;
> +
> + size = array_size(nr_args, sizeof(*res));
> + if (size == SIZE_MAX)
> + return -EOVERFLOW;
I don't think the number of accounting classes is going to
change, just move nr_args check from below here and have
on-stack array of size 2.
struct io_uring_fixed_worker_arg res[IO_WQ_ACCT_NR];
if (nr_args != IO_WQ_ACCT_NR)
return -EINVAL;
...
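
A fuller version of that shape, reusing the names from the patch (rough
sketch, untested):

```c
	struct io_uring_fixed_worker_arg res[IO_WQ_ACCT_NR];

	if (nr_args != IO_WQ_ACCT_NR)
		return -EINVAL;
	if (copy_from_user(res, arg, sizeof(res)))
		return -EFAULT;
	/* no heap allocation, so no kfree() on any exit path */
```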
> +
> + res = memdup_user(arg, size);
> + if (IS_ERR(res))
> + return PTR_ERR(res);
> +
> + for (i = 0; i < nr_args; i++) {
> + if (res[i].nr_workers) {
> + zero = false;
> + break;
> + }
> + }
> +
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 27+ messages in thread
* [PATCH 08/11] io_uring: add function to unregister fixed workers
2023-06-09 12:20 [RFC PATCH 00/11] fixed worker Hao Xu
` (6 preceding siblings ...)
2023-06-09 12:20 ` [PATCH 07/11] io_uring: add new api to register fixed workers Hao Xu
@ 2023-06-09 12:20 ` Hao Xu
2023-07-05 13:13 ` Pavel Begunkov
2023-06-09 12:20 ` [PATCH 09/11] io-wq: add structures to allow waiting for fixed workers to exit Hao Xu
` (4 subsequent siblings)
12 siblings, 1 reply; 27+ messages in thread
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
Add a new register api to unregister fixed workers.
Signed-off-by: Hao Xu <[email protected]>
---
include/uapi/linux/io_uring.h | 3 +++
io_uring/io-wq.c | 50 ++++++++++++++++++++++++++++++++++-
io_uring/io-wq.h | 1 +
io_uring/io_uring.c | 45 +++++++++++++++++++++++++++++++
4 files changed, 98 insertions(+), 1 deletion(-)
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 6dc43be5009d..b0a6e3106b42 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -538,6 +538,9 @@ enum {
/* set/get number of fixed workers */
IORING_REGISTER_IOWQ_FIXED_WORKERS = 26,
+ /* destroy fixed workers */
+ IORING_UNREGISTER_IOWQ_FIXED_WORKERS = 27,
+
/* this goes last */
IORING_REGISTER_LAST,
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 28f13c1c38f4..f39e6b931d17 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -1386,7 +1386,7 @@ static void io_wq_clean_fixed_workers(struct io_wq *wq)
if (!workers[j])
continue;
workers[j]->flags |= IO_WORKER_F_EXIT;
- wake_up_process(worker->task);
+ wake_up_process(workers[j]->task);
}
kfree(workers);
}
@@ -1456,6 +1456,54 @@ int io_wq_fixed_workers(struct io_wq *wq, struct io_uring_fixed_worker_arg *coun
return ret;
}
+/*
+ * destroy fixed workers.
+ */
+int io_wq_destroy_fixed_workers(struct io_wq *wq)
+{
+ int i, j;
+
+ raw_spin_lock(&wq->lock);
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ if (wq->acct[i].fixed_nr)
+ break;
+ }
+ raw_spin_unlock(&wq->lock);
+ if (i == IO_WQ_ACCT_NR)
+ return -EFAULT;
+
+ BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
+ BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
+ BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);
+
+ rcu_read_lock();
+ raw_spin_lock(&wq->lock);
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ struct io_wq_acct *acct = &wq->acct[i];
+ struct io_worker **workers = acct->fixed_workers;
+ unsigned int nr = acct->fixed_nr;
+
+ if (!nr)
+ continue;
+
+ for (j = 0; j < nr; j++) {
+ struct io_worker *worker = workers[j];
+
+ BUG_ON(!worker);
+ BUG_ON(!worker->task);
+
+ workers[j]->flags |= IO_WORKER_F_EXIT;
+ wake_up_process(worker->task);
+ }
+ // wait for all workers exit
+ kfree(workers);
+ }
+ raw_spin_unlock(&wq->lock);
+ rcu_read_unlock();
+
+ return 0;
+}
+
static __init int io_wq_init(void)
{
int ret;
diff --git a/io_uring/io-wq.h b/io_uring/io-wq.h
index 88a1ee9fde24..15e93af36511 100644
--- a/io_uring/io-wq.h
+++ b/io_uring/io-wq.h
@@ -53,6 +53,7 @@ void io_wq_hash_work(struct io_wq_work *work, void *val);
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask);
int io_wq_max_workers(struct io_wq *wq, int *new_count);
int io_wq_fixed_workers(struct io_wq *wq, struct io_uring_fixed_worker_arg *count);
+int io_wq_destroy_fixed_workers(struct io_wq *wq);
static inline bool io_wq_is_hashed(struct io_wq_work *work)
{
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index bb8342b4a2c6..b37224cc1d05 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -4416,6 +4416,45 @@ static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
return ret;
}
+static __cold int io_unregister_iowq_fixed_workers(struct io_ring_ctx *ctx)
+ __must_hold(&ctx->uring_lock)
+{
+ struct io_uring_task *tctx = NULL;
+ struct io_sq_data *sqd = NULL;
+ int ret;
+
+ if (ctx->flags & IORING_SETUP_SQPOLL) {
+ sqd = ctx->sq_data;
+ if (sqd) {
+ /*
+ * Observe the correct sqd->lock -> ctx->uring_lock
+ * ordering. Fine to drop uring_lock here, we hold
+ * a ref to the ctx.
+ */
+ refcount_inc(&sqd->refs);
+ mutex_unlock(&ctx->uring_lock);
+ mutex_lock(&sqd->lock);
+ mutex_lock(&ctx->uring_lock);
+ if (sqd->thread)
+ tctx = sqd->thread->io_uring;
+ }
+ } else {
+ tctx = current->io_uring;
+ }
+
+ if (tctx && tctx->io_wq)
+ ret = io_wq_destroy_fixed_workers(tctx->io_wq);
+ else
+ ret = -EFAULT;
+
+ if (sqd) {
+ mutex_unlock(&sqd->lock);
+ io_put_sq_data(sqd);
+ }
+
+ return ret;
+}
+
static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
void __user *arg, unsigned nr_args)
__releases(ctx->uring_lock)
@@ -4580,6 +4619,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
break;
ret = io_register_iowq_fixed_workers(ctx, arg, nr_args);
break;
+ case IORING_UNREGISTER_IOWQ_FIXED_WORKERS:
+ ret = -EINVAL;
+ if (arg || nr_args)
+ break;
+ ret = io_unregister_iowq_fixed_workers(ctx);
+ break;
default:
ret = -EINVAL;
break;
--
2.25.1
^ permalink raw reply related [flat|nested] 27+ messages in thread
* Re: [PATCH 08/11] io_uring: add function to unregister fixed workers
2023-06-09 12:20 ` [PATCH 08/11] io_uring: add function to unregister " Hao Xu
@ 2023-07-05 13:13 ` Pavel Begunkov
0 siblings, 0 replies; 27+ messages in thread
From: Pavel Begunkov @ 2023-07-05 13:13 UTC (permalink / raw)
To: Hao Xu, io-uring; +Cc: Jens Axboe, Wanpeng Li, linux-fsdevel
On 6/9/23 13:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> Add a new register api to unregister fixed workers.
>
> Signed-off-by: Hao Xu <[email protected]>
> ---
> include/uapi/linux/io_uring.h | 3 +++
> io_uring/io-wq.c | 50 ++++++++++++++++++++++++++++++++++-
> io_uring/io-wq.h | 1 +
> io_uring/io_uring.c | 45 +++++++++++++++++++++++++++++++
> 4 files changed, 98 insertions(+), 1 deletion(-)
>
> diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
> index 6dc43be5009d..b0a6e3106b42 100644
> --- a/include/uapi/linux/io_uring.h
> +++ b/include/uapi/linux/io_uring.h
> @@ -538,6 +538,9 @@ enum {
> /* set/get number of fixed workers */
> IORING_REGISTER_IOWQ_FIXED_WORKERS = 26,
>
> + /* destroy fixed workers */
> + IORING_UNREGISTER_IOWQ_FIXED_WORKERS = 27,
Do we need a new opcode? I think it's cleaner if we reuse
IORING_REGISTER_IOWQ_FIXED_WORKERS and do something like
struct io_uring_fixed_worker_arg arg;
if (!arg.nr_workers)
do_unregister_fixed_workers();
...
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 27+ messages in thread
* [PATCH 09/11] io-wq: add structures to allow waiting for fixed workers to exit
2023-06-09 12:20 [RFC PATCH 00/11] fixed worker Hao Xu
` (7 preceding siblings ...)
2023-06-09 12:20 ` [PATCH 08/11] io_uring: add function to unregister " Hao Xu
@ 2023-06-09 12:20 ` Hao Xu
2023-06-09 12:20 ` [PATCH 10/11] io-wq: distinguish fixed worker by its name Hao Xu
` (3 subsequent siblings)
12 siblings, 0 replies; 27+ messages in thread
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
When unregistering fixed workers, there should be a way to wait for
all the fixed workers to exit.
Signed-off-by: Hao Xu <[email protected]>
---
io_uring/io-wq.c | 72 ++++++++++++++++++++++++++++++++++--------------
1 file changed, 52 insertions(+), 20 deletions(-)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index f39e6b931d17..61cf6da2c72f 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -108,6 +108,10 @@ struct io_wq {
atomic_t worker_refs;
struct completion worker_done;
+ atomic_t fixed_worker_refs;
+ struct completion fixed_worker_done;
+ bool fixed_comp_init;
+
struct hlist_node cpuhp_node;
struct task_struct *task;
@@ -172,10 +176,25 @@ static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
}
-static void io_worker_ref_put(struct io_wq *wq)
+static void io_worker_ref_get(struct io_wq *wq, bool fixed)
+{
+ atomic_inc(&wq->worker_refs);
+ if (fixed)
+ atomic_inc(&wq->fixed_worker_refs);
+}
+
+static void io_worker_ref_put(struct io_wq *wq, bool fixed)
{
if (atomic_dec_and_test(&wq->worker_refs))
complete(&wq->worker_done);
+
+ if (fixed && atomic_dec_and_test(&wq->fixed_worker_refs))
+ complete(&wq->fixed_worker_done);
+}
+
+static bool is_fixed_worker(struct io_worker *worker)
+{
+ return worker->flags & IO_WORKER_F_FIXED;
}
static void io_worker_cancel_cb(struct io_worker *worker)
@@ -187,7 +206,7 @@ static void io_worker_cancel_cb(struct io_worker *worker)
raw_spin_lock(&wq->lock);
acct->nr_workers--;
raw_spin_unlock(&wq->lock);
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, is_fixed_worker(worker));
clear_bit_unlock(0, &worker->create_state);
io_worker_release(worker);
}
@@ -205,6 +224,7 @@ static bool io_task_worker_match(struct callback_head *cb, void *data)
static void io_worker_exit(struct io_worker *worker)
{
struct io_wq *wq = worker->wq;
+ bool fixed = is_fixed_worker(worker);
while (1) {
struct callback_head *cb = task_work_cancel_match(wq->task,
@@ -230,7 +250,7 @@ static void io_worker_exit(struct io_worker *worker)
preempt_enable();
kfree_rcu(worker, rcu);
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, fixed);
do_exit(0);
}
@@ -302,7 +322,7 @@ static struct io_worker *io_wq_create_worker(struct io_wq *wq,
acct->nr_workers++;
raw_spin_unlock(&wq->lock);
atomic_inc(&acct->nr_running);
- atomic_inc(&wq->worker_refs);
+ io_worker_ref_get(wq, fixed);
return create_io_worker(wq, acct->index, fixed);
}
@@ -313,11 +333,6 @@ static void io_wq_inc_running(struct io_worker *worker)
atomic_inc(&acct->nr_running);
}
-static bool is_fixed_worker(struct io_worker *worker)
-{
- return worker->flags & IO_WORKER_F_FIXED;
-}
-
static void create_worker_cb(struct callback_head *cb)
{
struct io_worker *worker;
@@ -325,8 +340,10 @@ static void create_worker_cb(struct callback_head *cb)
struct io_wq_acct *acct;
bool do_create = false;
+ bool fixed;
worker = container_of(cb, struct io_worker, create_work);
+ fixed = is_fixed_worker(worker);
wq = worker->wq;
acct = &wq->acct[worker->create_index];
raw_spin_lock(&wq->lock);
@@ -337,10 +354,10 @@ static void create_worker_cb(struct callback_head *cb)
}
raw_spin_unlock(&wq->lock);
if (do_create) {
- create_io_worker(wq, worker->create_index, is_fixed_worker(worker));
+ create_io_worker(wq, worker->create_index, fixed);
} else {
atomic_dec(&acct->nr_running);
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, fixed);
}
clear_bit_unlock(0, &worker->create_state);
io_worker_release(worker);
@@ -351,6 +368,7 @@ static bool io_queue_worker_create(struct io_worker *worker,
task_work_func_t func)
{
struct io_wq *wq = worker->wq;
+ bool fixed = is_fixed_worker(worker);
/* raced with exit, just ignore create call */
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
@@ -367,7 +385,7 @@ static bool io_queue_worker_create(struct io_worker *worker,
test_and_set_bit_lock(0, &worker->create_state))
goto fail_release;
- atomic_inc(&wq->worker_refs);
+ io_worker_ref_get(wq, fixed);
init_task_work(&worker->create_work, func);
worker->create_index = acct->index;
if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
@@ -379,16 +397,16 @@ static bool io_queue_worker_create(struct io_worker *worker,
*/
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
io_wq_cancel_tw_create(wq);
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, fixed);
return true;
}
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, fixed);
clear_bit_unlock(0, &worker->create_state);
fail_release:
io_worker_release(worker);
fail:
atomic_dec(&acct->nr_running);
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, fixed);
return false;
}
@@ -408,7 +426,7 @@ static void io_wq_dec_running(struct io_worker *worker)
return;
atomic_inc(&acct->nr_running);
- atomic_inc(&wq->worker_refs);
+ io_worker_ref_get(wq, false);
io_queue_worker_create(worker, acct, create_worker_cb);
}
@@ -790,7 +808,7 @@ static void create_worker_cont(struct callback_head *cb)
} else {
raw_spin_unlock(&wq->lock);
}
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, is_fixed_worker(worker));
kfree(worker);
return;
}
@@ -824,7 +842,7 @@ static struct io_worker *create_io_worker(struct io_wq *wq, int index, bool fixe
raw_spin_lock(&wq->lock);
acct->nr_workers--;
raw_spin_unlock(&wq->lock);
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, fixed);
return tsk ? (struct io_worker *)tsk : ERR_PTR(-ENOMEM);
}
@@ -1243,7 +1261,7 @@ static void io_wq_exit_workers(struct io_wq *wq)
rcu_read_lock();
io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
rcu_read_unlock();
- io_worker_ref_put(wq);
+ io_worker_ref_put(wq, false);
wait_for_completion(&wq->worker_done);
spin_lock_irq(&wq->hash->wait.lock);
@@ -1390,6 +1408,7 @@ static void io_wq_clean_fixed_workers(struct io_wq *wq)
}
kfree(workers);
}
+ wait_for_completion(&wq->fixed_worker_done);
}
/*
@@ -1421,6 +1440,13 @@ int io_wq_fixed_workers(struct io_wq *wq, struct io_uring_fixed_worker_arg *coun
rcu_read_lock();
+ atomic_set(&wq->fixed_worker_refs, 1);
+ if (wq->fixed_comp_init) {
+ reinit_completion(&wq->fixed_worker_done);
+ } else {
+ init_completion(&wq->fixed_worker_done);
+ wq->fixed_comp_init = true;
+ }
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
unsigned int nr = count[i].nr_workers;
@@ -1495,12 +1521,18 @@ int io_wq_destroy_fixed_workers(struct io_wq *wq)
workers[j]->flags |= IO_WORKER_F_EXIT;
wake_up_process(worker->task);
}
- // wait for all workers exit
kfree(workers);
}
raw_spin_unlock(&wq->lock);
rcu_read_unlock();
+ // decrement the init reference
+ if (atomic_dec_and_test(&wq->fixed_worker_refs))
+ complete(&wq->fixed_worker_done);
+
+ wait_for_completion(&wq->fixed_worker_done);
+ wq->fixed_comp_init = false;
+
return 0;
}
--
2.25.1
^ permalink raw reply related [flat|nested] 27+ messages in thread
* [PATCH 10/11] io-wq: distinguish fixed worker by its name
2023-06-09 12:20 [RFC PATCH 00/11] fixed worker Hao Xu
` (8 preceding siblings ...)
2023-06-09 12:20 ` [PATCH 09/11] io-wq: add structures to allow waiting for fixed workers to exit Hao Xu
@ 2023-06-09 12:20 ` Hao Xu
2023-07-05 13:15 ` Pavel Begunkov
2023-06-09 12:20 ` [PATCH 11/11] io_uring: add IORING_SETUP_FIXED_WORKER_ONLY and its friend Hao Xu
` (2 subsequent siblings)
12 siblings, 1 reply; 27+ messages in thread
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
Distinguish fixed workers and normal workers by their names.
Signed-off-by: Hao Xu <[email protected]>
---
io_uring/io-wq.c | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 61cf6da2c72f..7a9e5fa19b81 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -634,10 +634,12 @@ static int io_wq_worker(void *data)
struct io_wq *wq = worker->wq;
bool exit_mask = false, last_timeout = false;
char buf[TASK_COMM_LEN];
+ bool fixed = is_fixed_worker(worker);
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
- snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
+ snprintf(buf, sizeof(buf), fixed ? "iou-fixed-%d" : "iou-wrk-%d",
+ wq->task->pid);
set_task_comm(current, buf);
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
@@ -656,7 +658,7 @@ static int io_wq_worker(void *data)
* fixed worker, they can be manually reset to cpu other than
* the cpuset indicated by io_wq_worker_affinity()
*/
- if (!is_fixed_worker(worker) && last_timeout &&
+ if (!fixed && last_timeout &&
(exit_mask || acct->nr_workers > 1)) {
acct->nr_workers--;
raw_spin_unlock(&wq->lock);
--
2.25.1
^ permalink raw reply related [flat|nested] 27+ messages in thread
* Re: [PATCH 10/11] io-wq: distinguish fixed worker by its name
2023-06-09 12:20 ` [PATCH 10/11] io-wq: distinguish fixed worker by its name Hao Xu
@ 2023-07-05 13:15 ` Pavel Begunkov
0 siblings, 0 replies; 27+ messages in thread
From: Pavel Begunkov @ 2023-07-05 13:15 UTC (permalink / raw)
To: Hao Xu, io-uring; +Cc: Jens Axboe, Wanpeng Li, linux-fsdevel
On 6/9/23 13:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> Distinguish fixed workers and normal workers by their names.
>
> Signed-off-by: Hao Xu <[email protected]>
> ---
> io_uring/io-wq.c | 6 ++++--
> 1 file changed, 4 insertions(+), 2 deletions(-)
>
> diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
> index 61cf6da2c72f..7a9e5fa19b81 100644
> --- a/io_uring/io-wq.c
> +++ b/io_uring/io-wq.c
> @@ -634,10 +634,12 @@ static int io_wq_worker(void *data)
> struct io_wq *wq = worker->wq;
> bool exit_mask = false, last_timeout = false;
> char buf[TASK_COMM_LEN];
> + bool fixed = is_fixed_worker(worker);
>
> worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
>
> - snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
> + snprintf(buf, sizeof(buf), fixed ? "iou-fixed-%d" : "iou-wrk-%d",
> + wq->task->pid);
Minor nit: "iou-wrk-fixed" would be better; it still tells you that it's
a worker, and I think it would be more familiar to users.
> set_task_comm(current, buf);
>
> while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
> @@ -656,7 +658,7 @@ static int io_wq_worker(void *data)
> * fixed worker, they can be manually reset to cpu other than
> * the cpuset indicated by io_wq_worker_affinity()
> */
> - if (!is_fixed_worker(worker) && last_timeout &&
> + if (!fixed && last_timeout &&
> (exit_mask || acct->nr_workers > 1)) {
> acct->nr_workers--;
> raw_spin_unlock(&wq->lock);
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 27+ messages in thread
* [PATCH 11/11] io_uring: add IORING_SETUP_FIXED_WORKER_ONLY and its friend
2023-06-09 12:20 [RFC PATCH 00/11] fixed worker Hao Xu
` (9 preceding siblings ...)
2023-06-09 12:20 ` [PATCH 10/11] io-wq: distinguish fixed worker by its name Hao Xu
@ 2023-06-09 12:20 ` Hao Xu
2023-07-05 13:17 ` Pavel Begunkov
2023-06-20 12:35 ` [RFC PATCH 00/11] fixed worker Hao Xu
2023-06-28 9:19 ` Hao Xu
12 siblings, 1 reply; 27+ messages in thread
From: Hao Xu @ 2023-06-09 12:20 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li, linux-fsdevel
From: Hao Xu <[email protected]>
Add a new setup flag to indicate that the uring instance only uses fixed
workers as async offload threads. Add a work flag and its code logic as
well.
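From userspace this would be set at ring creation time, something like
(hypothetical: the flag only exists with this series applied, and the
liburing helper name is assumed):

```c
	struct io_uring ring;
	struct io_uring_params p = { };

	p.flags = IORING_SETUP_FIXED_WORKER_ONLY;
	/* a kernel without this series rejects the unknown flag */
	if (io_uring_queue_init_params(8, &ring, &p) < 0)
		return -1;
```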
Signed-off-by: Hao Xu <[email protected]>
---
include/uapi/linux/io_uring.h | 10 +++++++++-
io_uring/io-wq.c | 18 +++++++++++++-----
io_uring/io-wq.h | 1 +
io_uring/io_uring.c | 24 +++++++++++++++++++-----
4 files changed, 42 insertions(+), 11 deletions(-)
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index b0a6e3106b42..900fedaa5692 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -185,6 +185,11 @@ enum {
*/
#define IORING_SETUP_REGISTERED_FD_ONLY (1U << 15)
+/*
+ * this ring instance only uses fixed workers for async offload.
+ */
+#define IORING_SETUP_FIXED_WORKER_ONLY (1U << 16)
+
enum io_uring_op {
IORING_OP_NOP,
IORING_OP_READV,
@@ -721,9 +726,12 @@ struct io_uring_recvmsg_out {
__u32 flags;
};
+#define IORING_FIXED_WORKER_F_ONLY (1U << 0)
+#define IORING_FIXED_WORKER_F_VALID (IORING_FIXED_WORKER_F_ONLY)
+
struct io_uring_fixed_worker_arg {
__u32 nr_workers;
- __u32 resv;
+ __u32 flags;
__u64 resv2[3];
};
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 7a9e5fa19b81..98a16abb2944 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -272,7 +272,7 @@ static inline bool io_acct_run_queue(struct io_wq_acct *acct)
* caller must create one.
*/
static bool io_wq_activate_free_worker(struct io_wq *wq,
- struct io_wq_acct *acct)
+ struct io_wq_acct *acct, bool fixed)
__must_hold(RCU)
{
struct hlist_nulls_node *n;
@@ -286,7 +286,8 @@ static bool io_wq_activate_free_worker(struct io_wq *wq,
hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
if (!io_worker_get(worker))
continue;
- if (io_wq_get_acct(worker) != acct) {
+ if (io_wq_get_acct(worker) != acct ||
+ (fixed && !is_fixed_worker(worker))) {
io_worker_release(worker);
continue;
}
@@ -492,6 +493,9 @@ static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
work = container_of(node, struct io_wq_work, list);
+ if ((work->flags & IO_WQ_WORK_FIXED) && !is_fixed_worker(worker))
+ continue;
+
/* not hashed, can run anytime */
if (!io_wq_is_hashed(work)) {
wq_list_del(&acct->work_list, node, prev);
@@ -946,7 +950,7 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
struct io_wq_acct *acct = io_work_get_acct(wq, work);
struct io_cb_cancel_data match;
unsigned work_flags = work->flags;
- bool do_create;
+ bool do_create, fixed = work_flags & IO_WQ_WORK_FIXED;
/*
* If io-wq is exiting for this task, or if the request has explicitly
@@ -965,11 +969,14 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
raw_spin_lock(&wq->lock);
rcu_read_lock();
- do_create = !io_wq_activate_free_worker(wq, acct);
+ do_create = !io_wq_activate_free_worker(wq, acct, fixed);
rcu_read_unlock();
raw_spin_unlock(&wq->lock);
+ if (fixed)
+ return;
+
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running))) {
bool did_create;
@@ -1155,7 +1162,7 @@ static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
struct io_wq_acct *acct = &wq->acct[i];
if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
- io_wq_activate_free_worker(wq, acct);
+ io_wq_activate_free_worker(wq, acct, false);
}
rcu_read_unlock();
return 1;
@@ -1477,6 +1484,7 @@ int io_wq_fixed_workers(struct io_wq *wq, struct io_uring_fixed_worker_arg *coun
if (ret)
goto err;
+
return 0;
err:
diff --git a/io_uring/io-wq.h b/io_uring/io-wq.h
index 15e93af36511..d81d5f9aa602 100644
--- a/io_uring/io-wq.h
+++ b/io_uring/io-wq.h
@@ -11,6 +11,7 @@ enum {
IO_WQ_WORK_HASHED = 2,
IO_WQ_WORK_UNBOUND = 4,
IO_WQ_WORK_CONCURRENT = 16,
+ IO_WQ_WORK_FIXED = 32,
IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */
};
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b37224cc1d05..bf8232906605 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -479,6 +479,9 @@ void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use)
if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
req->work.flags |= IO_WQ_WORK_CANCEL;
+ if (req->ctx->flags & IORING_SETUP_FIXED_WORKER_ONLY)
+ req->work.flags |= IO_WQ_WORK_FIXED;
+
trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
io_wq_enqueue(tctx->io_wq, &req->work);
if (link)
@@ -1971,7 +1974,12 @@ struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
req = io_put_req_find_next(req);
- return req ? &req->work : NULL;
+ if (req) {
+ req->work.flags |= IO_WQ_WORK_FIXED;
+ return &req->work;
+ }
+
+ return NULL;
}
void io_wq_submit_work(struct io_wq_work *work)
@@ -4364,7 +4372,7 @@ static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
struct io_uring_fixed_worker_arg *res;
size_t size;
int i, ret;
- bool zero = true;
+ bool zero = true, fixed_only = false;
size = array_size(nr_args, sizeof(*res));
if (size == SIZE_MAX)
@@ -4375,15 +4383,20 @@ static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
return PTR_ERR(res);
for (i = 0; i < nr_args; i++) {
- if (res[i].nr_workers) {
+ if (res[i].flags & ~IORING_FIXED_WORKER_F_VALID)
+ return -EINVAL;
+ if (res[i].flags & IORING_FIXED_WORKER_F_ONLY)
+ fixed_only = true;
+ if (res[i].nr_workers)
zero = false;
- break;
- }
}
if (zero)
return 0;
+ if (fixed_only)
+ ctx->flags |= IORING_SETUP_FIXED_WORKER_ONLY;
+
if (ctx->flags & IORING_SETUP_SQPOLL) {
sqd = ctx->sq_data;
if (sqd) {
@@ -4423,6 +4436,7 @@ static __cold int io_unregister_iowq_fixed_workers(struct io_ring_ctx *ctx)
struct io_sq_data *sqd = NULL;
int ret;
+ ctx->flags &= ~IORING_SETUP_FIXED_WORKER_ONLY;
if (ctx->flags & IORING_SETUP_SQPOLL) {
sqd = ctx->sq_data;
if (sqd) {
--
2.25.1
^ permalink raw reply related [flat|nested] 27+ messages in thread
* Re: [RFC PATCH 00/11] fixed worker
2023-06-09 12:20 [RFC PATCH 00/11] fixed worker Hao Xu
` (10 preceding siblings ...)
2023-06-09 12:20 ` [PATCH 11/11] io_uring: add IORING_SETUP_FIXED_WORKER_ONLY and its friend Hao Xu
@ 2023-06-20 12:35 ` Hao Xu
2023-06-28 9:19 ` Hao Xu
12 siblings, 0 replies; 27+ messages in thread
From: Hao Xu @ 2023-06-20 12:35 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li
Hi Jens and all,
On 6/9/23 20:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> The initial feature request by users is here:
> https://github.com/axboe/liburing/issues/296
>
> Fixed workers provide a way for users to control the io-wq threads. A
> fixed worker is a worker thread that exists whether or not there is
> work to do. We provide a new register api to register fixed workers,
> and another to unregister them. The parameter of the register api is
> the number of fixed workers the user wants.
>
Here is a liburing test case to show how it works:
https://github.com/axboe/liburing/commit/bc9e862d1317f2466381adb6243be8cc86c3bd27
Regards,
Hao
^ permalink raw reply [flat|nested] 27+ messages in thread
* Re: [RFC PATCH 00/11] fixed worker
2023-06-09 12:20 [RFC PATCH 00/11] fixed worker Hao Xu
` (11 preceding siblings ...)
2023-06-20 12:35 ` [RFC PATCH 00/11] fixed worker Hao Xu
@ 2023-06-28 9:19 ` Hao Xu
12 siblings, 0 replies; 27+ messages in thread
From: Hao Xu @ 2023-06-28 9:19 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, Wanpeng Li
Gentle ping on this series.
No idea why the mail threading is a mess; it looks fine in the fsdevel list.
On 6/9/23 20:20, Hao Xu wrote:
> From: Hao Xu <[email protected]>
>
> The initial feature request by users is here:
> https://github.com/axboe/liburing/issues/296
>
> Fixed workers provide a way for users to control the io-wq threads. A
> fixed worker is a worker thread that exists whether or not there is
> work to do. We provide a new register api to register fixed workers,
> and another to unregister them. The parameter of the register api is
> the number of fixed workers the user wants.
>
> For example:
>
> ```c
> io_uring_register_iowq_fixed_workers(&ring, { .nr_workers = 5 })
> do I/O works
> io_uring_unregister_iowq_fixed_workers(&ring)
>
> ```
>
> After registration, there will be 5 fixed workers. Users can set up
> their affinity, priority, etc. freely, without any new register api to
> configure attributes. These workers won't be destroyed until users
> call the unregister api.
>
> Note, registering fixed workers doesn't prevent normal workers from
> being created. When there are no free workers, new normal workers can
> still be created as work arrives. So a work item may be picked up by
> either fixed workers or normal workers.
>
> If users want to offload work only to fixed workers, they can specify
> the FIXED_ONLY flag when registering fixed workers.
>
> ```c
> io_uring_register_iowq_fixed_workers(&ring, { .nr_workers = 5, .flags |=
> FIXED_ONLY })
>
> ```
>
> In the above case, no normal workers will be created until
> io_uring_unregister_iowq_fixed_workers() is called.
>
> Note:
> - When registering fixed workers, those fixed workers are per io-wq.
> So if an io_uring instance is shared by multiple tasks, and you want
> all tasks to use fixed workers, every task has to call the register
> api.
> - If FIXED_ONLY is specified when registering fixed workers, it applies
> per io_uring instance: all work in this instance is handled by fixed
> workers.
>
> Therefore, if an io_uring instance is shared by two tasks, and you want
> all requests in this instance to be handled only by fixed workers, you
> have to call the register api in both tasks and specify FIXED_ONLY at
> least once.
>
>
> Hao Xu (11):
> io-wq: fix worker counting after worker received exit signal
> io-wq: add a new worker flag to indicate worker exit
> io-wq: add a new type io-wq worker
> io-wq: add fixed worker members in io_wq_acct
> io-wq: add a new parameter for creating a new fixed worker
> io-wq: return io_worker after successful inline worker creation
> io_uring: add new api to register fixed workers
> io_uring: add function to unregister fixed workers
> io-wq: add strutures to allow to wait fixed workers exit
> io-wq: distinguish fixed worker by its name
> io_uring: add IORING_SETUP_FIXED_WORKER_ONLY and its friend
>
> include/uapi/linux/io_uring.h | 20 +++
> io_uring/io-wq.c | 275 ++++++++++++++++++++++++++++++----
> io_uring/io-wq.h | 3 +
> io_uring/io_uring.c | 132 +++++++++++++++-
> 4 files changed, 397 insertions(+), 33 deletions(-)
>
^ permalink raw reply [flat|nested] 27+ messages in thread