public inbox for [email protected]
 help / color / mirror / Atom feed
* [PATCHSET 0/2] Eliminated need for io thread manager
@ 2021-03-22 18:00 Jens Axboe
  2021-03-22 18:00 ` [PATCH 1/2] kernel: allow fork with TIF_NOTIFY_SIGNAL pending Jens Axboe
                   ` (2 more replies)
  0 siblings, 3 replies; 4+ messages in thread
From: Jens Axboe @ 2021-03-22 18:00 UTC (permalink / raw)
  To: io-uring

Hi,

Currently (5.12+) any ring that gets created will get an io-wq manager
created. The manager is tasked with creating async workers, if they are
needed. Earlier (5.11 and prior), io_uring would create the manager
thread, and the manager thread would create a static worker per NUMA node
and more if needed. Hence 5.12+ is more lean than earlier, but I would
like us to get to the point where no threads are created if they aren't
strictly needed. For workloads that never need async offload, it's
pointless to create one (or more) threads that just sit idle.

With that in mind, here's a patchset that attempts to do that. There
should be no functional changes here - if we do need an async worker,
the first one created will stick around for the lifetime of the ring.
And more are created as needed, using the same logic as before. The only
difference is that a ring will NOT get a thread by default, only when
it actually needs one is it created.

Comments welcome! This passes normal regression testing, but no further
testing has been done yet.

-- 
Jens Axboe



^ permalink raw reply	[flat|nested] 4+ messages in thread

* [PATCH 1/2] kernel: allow fork with TIF_NOTIFY_SIGNAL pending
  2021-03-22 18:00 [PATCHSET 0/2] Eliminated need for io thread manager Jens Axboe
@ 2021-03-22 18:00 ` Jens Axboe
  2021-03-22 18:00 ` [PATCH 2/2] io-wq: eliminate the need for a manager thread Jens Axboe
       [not found] ` <[email protected]>
  2 siblings, 0 replies; 4+ messages in thread
From: Jens Axboe @ 2021-03-22 18:00 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

fork() fails if signal_pending() is true, but there are two conditions
that can lead to that:

1) An actual signal is pending. We want fork to fail for that one, like
   we always have.

2) TIF_NOTIFY_SIGNAL is pending, because the task has pending task_work.
   We don't need to make it fail for that case.

Allow fork() to proceed if just task_work is pending, by changing the
signal_pending() check to task_sigpending().

Signed-off-by: Jens Axboe <[email protected]>
---
 kernel/fork.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kernel/fork.c b/kernel/fork.c
index 54cc905e5fe0..254e08c65de9 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -1941,7 +1941,7 @@ static __latent_entropy struct task_struct *copy_process(
 	recalc_sigpending();
 	spin_unlock_irq(&current->sighand->siglock);
 	retval = -ERESTARTNOINTR;
-	if (signal_pending(current))
+	if (task_sigpending(current))
 		goto fork_out;
 
 	retval = -ENOMEM;
-- 
2.31.0


^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH 2/2] io-wq: eliminate the need for a manager thread
  2021-03-22 18:00 [PATCHSET 0/2] Eliminated need for io thread manager Jens Axboe
  2021-03-22 18:00 ` [PATCH 1/2] kernel: allow fork with TIF_NOTIFY_SIGNAL pending Jens Axboe
@ 2021-03-22 18:00 ` Jens Axboe
       [not found] ` <[email protected]>
  2 siblings, 0 replies; 4+ messages in thread
From: Jens Axboe @ 2021-03-22 18:00 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe

io-wq relies on a manager thread to create/fork new workers, as needed.
But there's really no strong need for it anymore. We have the following
cases that fork a new worker:

1) Work queue. This is done from the task itself always, and it's trivial
   to create a worker off that path, if needed.

2) All workers have gone to sleep, and we have more work. This is called
   off the sched out path. For this case, use a task_work items to queue
   a fork-worker operation.

3) Hashed work completion. Don't think we need to do anything off this
   case. If need be, it could just use approach 2 as well.

Part of this change is incrementing the running worker count before the
fork, to avoid cases where we observe we need a worker and then queue
creation of one. Then new work comes in, we fork a new one. That last
queue operation should have waited for the previous worker to come up,
it's quite possible we don't even need it. Hence move the worker running
from before we fork it off to more efficiently handle that case.

Signed-off-by: Jens Axboe <[email protected]>
---
 fs/io-wq.c | 237 +++++++++++++++++++++--------------------------------
 1 file changed, 94 insertions(+), 143 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index d805ca8e3439..ec4f60659268 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -69,6 +69,7 @@ struct io_worker {
 struct io_wqe_acct {
 	unsigned nr_workers;
 	unsigned max_workers;
+	int index;
 	atomic_t nr_running;
 };
 
@@ -109,19 +110,16 @@ struct io_wq {
 	free_work_fn *free_work;
 	io_wq_work_fn *do_work;
 
-	struct task_struct *manager;
-
 	struct io_wq_hash *hash;
 
 	refcount_t refs;
-	struct completion exited;
 
 	atomic_t worker_refs;
 	struct completion worker_done;
 
 	struct hlist_node cpuhp_node;
 
-	pid_t task_pid;
+	struct task_struct *task;
 };
 
 static enum cpuhp_state io_wq_online;
@@ -136,6 +134,7 @@ struct io_cb_cancel_data {
 
 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 				       struct io_cb_cancel_data *match);
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
 
 static bool io_worker_get(struct io_worker *worker)
 {
@@ -207,7 +206,7 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
 
 /*
  * Check head of free list for an available worker. If one isn't available,
- * caller must wake up the wq manager to create one.
+ * caller must create one.
  */
 static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 	__must_hold(RCU)
@@ -231,7 +230,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 
 /*
  * We need a worker. If we find a free one, we're good. If not, and we're
- * below the max number of workers, wake up the manager to create one.
+ * below the max number of workers, create one.
  */
 static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 {
@@ -247,8 +246,10 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 	ret = io_wqe_activate_free_worker(wqe);
 	rcu_read_unlock();
 
-	if (!ret && acct->nr_workers < acct->max_workers)
-		wake_up_process(wqe->wq->manager);
+	if (!ret && acct->nr_workers < acct->max_workers) {
+		atomic_inc(&acct->nr_running);
+		create_io_worker(wqe->wq, wqe, acct->index);
+	}
 }
 
 static void io_wqe_inc_running(struct io_worker *worker)
@@ -258,14 +259,55 @@ static void io_wqe_inc_running(struct io_worker *worker)
 	atomic_inc(&acct->nr_running);
 }
 
+struct create_worker_data {
+	struct callback_head work;
+	struct io_wqe *wqe;
+	int index;
+};
+
+static void create_worker_cb(struct callback_head *cb)
+{
+	struct create_worker_data *cwd;
+	struct io_wq *wq;
+
+	cwd = container_of(cb, struct create_worker_data, work);
+	wq = cwd->wqe->wq;
+	create_io_worker(wq, cwd->wqe, cwd->index);
+	kfree(cwd);
+}
+
+static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
+{
+	struct create_worker_data *cwd;
+
+	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
+	if (cwd) {
+		struct io_wq *wq = wqe->wq;
+
+		init_task_work(&cwd->work, create_worker_cb);
+		cwd->wqe = wqe;
+		cwd->index = acct->index;
+		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
+			return;
+
+		kfree(cwd);
+	}
+	atomic_dec(&acct->nr_running);
+}
+
 static void io_wqe_dec_running(struct io_worker *worker)
 	__must_hold(wqe->lock)
 {
 	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 
-	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
-		io_wqe_wake_worker(wqe, acct);
+	if (!(worker->flags & IO_WORKER_F_UP))
+		return;
+
+	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+		atomic_inc(&acct->nr_running);
+		io_queue_worker_create(wqe, acct);
+	}
 }
 
 /*
@@ -480,9 +522,8 @@ static int io_wqe_worker(void *data)
 	char buf[TASK_COMM_LEN];
 
 	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
-	io_wqe_inc_running(worker);
 
-	sprintf(buf, "iou-wrk-%d", wq->task_pid);
+	sprintf(buf, "iou-wrk-%d", wq->task->pid);
 	set_task_comm(current, buf);
 
 	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
@@ -562,7 +603,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 	raw_spin_unlock_irq(&worker->wqe->lock);
 }
 
-static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
 	struct io_wqe_acct *acct = &wqe->acct[index];
 	struct io_worker *worker;
@@ -572,7 +613,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
 	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
 	if (!worker)
-		return false;
+		goto fail;
 
 	refcount_set(&worker->ref, 1);
 	worker->nulls_node.pprev = NULL;
@@ -587,7 +628,9 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 		if (atomic_dec_and_test(&wq->worker_refs))
 			complete(&wq->worker_done);
 		kfree(worker);
-		return false;
+fail:
+		atomic_dec(&acct->nr_running);
+		return;
 	}
 
 	tsk->pf_io_worker = worker;
@@ -606,7 +649,6 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	acct->nr_workers++;
 	raw_spin_unlock_irq(&wqe->lock);
 	wake_up_new_task(tsk);
-	return true;
 }
 
 static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
@@ -654,89 +696,6 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 	return false;
 }
 
-static void io_wq_check_workers(struct io_wq *wq)
-{
-	int node;
-
-	for_each_node(node) {
-		struct io_wqe *wqe = wq->wqes[node];
-		bool fork_worker[2] = { false, false };
-
-		if (!node_online(node))
-			continue;
-
-		raw_spin_lock_irq(&wqe->lock);
-		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
-			fork_worker[IO_WQ_ACCT_BOUND] = true;
-		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
-			fork_worker[IO_WQ_ACCT_UNBOUND] = true;
-		raw_spin_unlock_irq(&wqe->lock);
-		if (fork_worker[IO_WQ_ACCT_BOUND])
-			create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
-		if (fork_worker[IO_WQ_ACCT_UNBOUND])
-			create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
-	}
-}
-
-static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
-{
-	return true;
-}
-
-static void io_wq_cancel_pending(struct io_wq *wq)
-{
-	struct io_cb_cancel_data match = {
-		.fn		= io_wq_work_match_all,
-		.cancel_all	= true,
-	};
-	int node;
-
-	for_each_node(node)
-		io_wqe_cancel_pending_work(wq->wqes[node], &match);
-}
-
-/*
- * Manager thread. Tasked with creating new workers, if we need them.
- */
-static int io_wq_manager(void *data)
-{
-	struct io_wq *wq = data;
-	char buf[TASK_COMM_LEN];
-	int node;
-
-	sprintf(buf, "iou-mgr-%d", wq->task_pid);
-	set_task_comm(current, buf);
-
-	do {
-		set_current_state(TASK_INTERRUPTIBLE);
-		io_wq_check_workers(wq);
-		schedule_timeout(HZ);
-		try_to_freeze();
-		if (fatal_signal_pending(current))
-			set_bit(IO_WQ_BIT_EXIT, &wq->state);
-	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
-
-	io_wq_check_workers(wq);
-
-	rcu_read_lock();
-	for_each_node(node)
-		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
-	rcu_read_unlock();
-
-	if (atomic_dec_and_test(&wq->worker_refs))
-		complete(&wq->worker_done);
-	wait_for_completion(&wq->worker_done);
-
-	spin_lock_irq(&wq->hash->wait.lock);
-	for_each_node(node)
-		list_del_init(&wq->wqes[node]->wait.entry);
-	spin_unlock_irq(&wq->hash->wait.lock);
-
-	io_wq_cancel_pending(wq);
-	complete(&wq->exited);
-	do_exit(0);
-}
-
 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 {
 	struct io_wq *wq = wqe->wq;
@@ -768,39 +727,13 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
 }
 
-static int io_wq_fork_manager(struct io_wq *wq)
-{
-	struct task_struct *tsk;
-
-	if (wq->manager)
-		return 0;
-
-	WARN_ON_ONCE(test_bit(IO_WQ_BIT_EXIT, &wq->state));
-
-	init_completion(&wq->worker_done);
-	atomic_set(&wq->worker_refs, 1);
-	tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
-	if (!IS_ERR(tsk)) {
-		wq->manager = get_task_struct(tsk);
-		wake_up_new_task(tsk);
-		return 0;
-	}
-
-	if (atomic_dec_and_test(&wq->worker_refs))
-		complete(&wq->worker_done);
-
-	return PTR_ERR(tsk);
-}
-
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
 	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	int work_flags;
 	unsigned long flags;
 
-	/* Can only happen if manager creation fails after exec */
-	if (io_wq_fork_manager(wqe->wq) ||
-	    test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
+	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
 		io_run_cancel(work, wqe);
 		return;
 	}
@@ -955,17 +888,12 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 			    int sync, void *key)
 {
 	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
-	int ret;
 
 	list_del_init(&wait->entry);
 
 	rcu_read_lock();
-	ret = io_wqe_activate_free_worker(wqe);
+	io_wqe_activate_free_worker(wqe);
 	rcu_read_unlock();
-
-	if (!ret)
-		wake_up_process(wqe->wq->manager);
-
 	return 1;
 }
 
@@ -1006,6 +934,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 			goto err;
 		wq->wqes[node] = wqe;
 		wqe->node = alloc_node;
+		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
+		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
 		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
 		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
@@ -1020,13 +950,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 		INIT_LIST_HEAD(&wqe->all_list);
 	}
 
-	wq->task_pid = current->pid;
-	init_completion(&wq->exited);
+	wq->task = get_task_struct(current);
 	refcount_set(&wq->refs, 1);
-
-	ret = io_wq_fork_manager(wq);
-	if (!ret)
-		return wq;
+	atomic_set(&wq->worker_refs, 1);
+	init_completion(&wq->worker_done);
+	return wq;
 err:
 	io_wq_put_hash(data->hash);
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
@@ -1041,12 +969,35 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 
 static void io_wq_destroy_manager(struct io_wq *wq)
 {
-	if (wq->manager) {
-		wake_up_process(wq->manager);
-		wait_for_completion(&wq->exited);
-		put_task_struct(wq->manager);
-		wq->manager = NULL;
+	struct callback_head *cb;
+	int node;
+
+	if (!wq->task)
+		return;
+
+	while ((cb = task_work_cancel(wq->task, create_worker_cb)) != NULL) {
+		struct create_worker_data *cwd;
+
+		cwd = container_of(cb, struct create_worker_data, work);
+		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
+		kfree(cwd);
+	}
+
+	rcu_read_lock();
+	for_each_node(node) {
+		struct io_wqe *wqe = wq->wqes[node];
+
+		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
+		spin_lock_irq(&wq->hash->wait.lock);
+		list_del_init(&wq->wqes[node]->wait.entry);
+		spin_unlock_irq(&wq->hash->wait.lock);
 	}
+	rcu_read_unlock();
+	if (atomic_dec_and_test(&wq->worker_refs))
+		complete(&wq->worker_done);
+	wait_for_completion(&wq->worker_done);
+	put_task_struct(wq->task);
+	wq->task = NULL;
 }
 
 static void io_wq_destroy(struct io_wq *wq)
-- 
2.31.0


^ permalink raw reply related	[flat|nested] 4+ messages in thread

* Re: [PATCH 2/2] io-wq: eliminate the need for a manager thread
       [not found] ` <[email protected]>
@ 2021-03-23 13:57   ` Jens Axboe
  0 siblings, 0 replies; 4+ messages in thread
From: Jens Axboe @ 2021-03-23 13:57 UTC (permalink / raw)
  To: Hillf Danton; +Cc: io-uring

On 3/23/21 3:15 AM, Hillf Danton wrote:
> On Mon, 22 Mar 2021 12:00:59   Jens Axboe wrote:
>>  
>> @@ -109,19 +110,16 @@ struct io_wq {
>>  	free_work_fn *free_work;
>>  	io_wq_work_fn *do_work;
>>  
>> -	struct task_struct *manager;
>> -
> [...]
>> -static void io_wq_cancel_pending(struct io_wq *wq)
> 
> No more caller on destroying wq?

Should probably keep that for io_wq_destroy(), I'll add it.
Thanks.

>> -{
>> -	struct io_cb_cancel_data match = {
>> -		.fn		= io_wq_work_match_all,
>> -		.cancel_all	= true,
>> -	};
>> -	int node;
>> -
>> -	for_each_node(node)
>> -		io_wqe_cancel_pending_work(wq->wqes[node], &match);
>> -}
> [...]
>>  static void io_wq_destroy_manager(struct io_wq *wq)
> 
> s/io_wq_destroy_manager/__io_wq_destroy/ as manager is gone.

Renamed - also missed another comment on having the manager create
a worker, fixed that one up too.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2021-03-23 13:58 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2021-03-22 18:00 [PATCHSET 0/2] Eliminated need for io thread manager Jens Axboe
2021-03-22 18:00 ` [PATCH 1/2] kernel: allow fork with TIF_NOTIFY_SIGNAL pending Jens Axboe
2021-03-22 18:00 ` [PATCH 2/2] io-wq: eliminate the need for a manager thread Jens Axboe
     [not found] ` <[email protected]>
2021-03-23 13:57   ` Jens Axboe

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox