public inbox for [email protected]
 help / color / mirror / Atom feed
From: Breno Leitao <[email protected]>
To: [email protected], [email protected], [email protected]
Cc: [email protected], [email protected]
Subject: [PATCH] io_uring: One wqe per wq
Date: Fri, 10 Mar 2023 12:11:07 -0800	[thread overview]
Message-ID: <[email protected]> (raw)

Right now io_wq allocates one io_wqe per NUMA node.  As io_wq is now
bound to a task, the task basically uses only the NUMA local io_wqe, and
almost never changes NUMA nodes, thus, the other wqes are mostly
unused.

Allocate just one io_wqe embedded into io_wq, and uses all possible cpus
(cpu_possible_mask) in the io_wqe->cpumask.

Signed-off-by: Breno Leitao <[email protected]>
---
 io_uring/io-wq.c | 180 ++++++++++++++++++-----------------------------
 1 file changed, 70 insertions(+), 110 deletions(-)

diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index f81c0a7136a5..44d522c5d36f 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -15,6 +15,7 @@
 #include <linux/cpu.h>
 #include <linux/task_work.h>
 #include <linux/audit.h>
+#include <linux/mmu_context.h>
 #include <uapi/linux/io_uring.h>
 
 #include "io-wq.h"
@@ -96,8 +97,6 @@ struct io_wqe {
 	raw_spinlock_t lock;
 	struct io_wqe_acct acct[IO_WQ_ACCT_NR];
 
-	int node;
-
 	struct hlist_nulls_head free_list;
 	struct list_head all_list;
 
@@ -127,7 +126,7 @@ struct io_wq {
 
 	struct task_struct *task;
 
-	struct io_wqe *wqes[];
+	struct io_wqe wqe;
 };
 
 static enum cpuhp_state io_wq_online;
@@ -754,7 +753,7 @@ static void create_worker_cont(struct callback_head *cb)
 	worker = container_of(cb, struct io_worker, create_work);
 	clear_bit_unlock(0, &worker->create_state);
 	wqe = worker->wqe;
-	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+	tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
 	if (!IS_ERR(tsk)) {
 		io_init_new_worker(wqe, worker, tsk);
 		io_worker_release(worker);
@@ -804,7 +803,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
 	__set_current_state(TASK_RUNNING);
 
-	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
+	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
 	if (!worker) {
 fail:
 		atomic_dec(&acct->nr_running);
@@ -823,7 +822,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	if (index == IO_WQ_ACCT_BOUND)
 		worker->flags |= IO_WORKER_F_BOUND;
 
-	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+	tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
 	if (!IS_ERR(tsk)) {
 		io_init_new_worker(wqe, worker, tsk);
 	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
@@ -961,7 +960,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 
 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
 {
-	struct io_wqe *wqe = wq->wqes[numa_node_id()];
+	struct io_wqe *wqe = &wq->wqe;
 
 	io_wqe_enqueue(wqe, work);
 }
@@ -1083,7 +1082,7 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 		.data		= data,
 		.cancel_all	= cancel_all,
 	};
-	int node;
+	struct io_wqe *wqe = &wq->wqe;
 
 	/*
 	 * First check pending list, if we're lucky we can just remove it
@@ -1098,19 +1097,15 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 	 * Do both of these while holding the wqe->lock, to ensure that
 	 * we'll find a work item regardless of state.
 	 */
-	for_each_node(node) {
-		struct io_wqe *wqe = wq->wqes[node];
-
-		io_wqe_cancel_pending_work(wqe, &match);
-		if (match.nr_pending && !match.cancel_all)
-			return IO_WQ_CANCEL_OK;
+	io_wqe_cancel_pending_work(wqe, &match);
+	if (match.nr_pending && !match.cancel_all)
+		return IO_WQ_CANCEL_OK;
 
-		raw_spin_lock(&wqe->lock);
-		io_wqe_cancel_running_work(wqe, &match);
-		raw_spin_unlock(&wqe->lock);
-		if (match.nr_running && !match.cancel_all)
-			return IO_WQ_CANCEL_RUNNING;
-	}
+	raw_spin_lock(&wqe->lock);
+	io_wqe_cancel_running_work(wqe, &match);
+	raw_spin_unlock(&wqe->lock);
+	if (match.nr_running && !match.cancel_all)
+		return IO_WQ_CANCEL_RUNNING;
 
 	if (match.nr_running)
 		return IO_WQ_CANCEL_RUNNING;
@@ -1140,15 +1135,16 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-	int ret, node, i;
+	int ret, i;
 	struct io_wq *wq;
+	struct io_wqe *wqe;
 
 	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
 		return ERR_PTR(-EINVAL);
 	if (WARN_ON_ONCE(!bounded))
 		return ERR_PTR(-EINVAL);
 
-	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
+	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
 	if (!wq)
 		return ERR_PTR(-ENOMEM);
 	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
@@ -1159,40 +1155,30 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	wq->hash = data->hash;
 	wq->free_work = data->free_work;
 	wq->do_work = data->do_work;
+	wqe = &wq->wqe;
 
 	ret = -ENOMEM;
-	for_each_node(node) {
-		struct io_wqe *wqe;
-		int alloc_node = node;
-
-		if (!node_online(alloc_node))
-			alloc_node = NUMA_NO_NODE;
-		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
-		if (!wqe)
-			goto err;
-		wq->wqes[node] = wqe;
-		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
-			goto err;
-		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
-		wqe->node = alloc_node;
-		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
-		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
-					task_rlimit(current, RLIMIT_NPROC);
-		INIT_LIST_HEAD(&wqe->wait.entry);
-		wqe->wait.func = io_wqe_hash_wake;
-		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
-			struct io_wqe_acct *acct = &wqe->acct[i];
-
-			acct->index = i;
-			atomic_set(&acct->nr_running, 0);
-			INIT_WQ_LIST(&acct->work_list);
-			raw_spin_lock_init(&acct->lock);
-		}
-		wqe->wq = wq;
-		raw_spin_lock_init(&wqe->lock);
-		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
-		INIT_LIST_HEAD(&wqe->all_list);
+
+	if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
+		goto err;
+	cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
+	wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
+	wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
+				task_rlimit(current, RLIMIT_NPROC);
+	INIT_LIST_HEAD(&wqe->wait.entry);
+	wqe->wait.func = io_wqe_hash_wake;
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		struct io_wqe_acct *acct = &wqe->acct[i];
+
+		acct->index = i;
+		atomic_set(&acct->nr_running, 0);
+		INIT_WQ_LIST(&acct->work_list);
+		raw_spin_lock_init(&acct->lock);
 	}
+	wqe->wq = wq;
+	raw_spin_lock_init(&wqe->lock);
+	INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
+	INIT_LIST_HEAD(&wqe->all_list);
 
 	wq->task = get_task_struct(data->task);
 	atomic_set(&wq->worker_refs, 1);
@@ -1201,12 +1187,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 err:
 	io_wq_put_hash(data->hash);
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-	for_each_node(node) {
-		if (!wq->wqes[node])
-			continue;
-		free_cpumask_var(wq->wqes[node]->cpu_mask);
-		kfree(wq->wqes[node]);
-	}
+
+	free_cpumask_var(wq->wqe.cpu_mask);
 err_wq:
 	kfree(wq);
 	return ERR_PTR(ret);
@@ -1247,48 +1229,36 @@ static void io_wq_cancel_tw_create(struct io_wq *wq)
 
 static void io_wq_exit_workers(struct io_wq *wq)
 {
-	int node;
-
 	if (!wq->task)
 		return;
 
 	io_wq_cancel_tw_create(wq);
 
 	rcu_read_lock();
-	for_each_node(node) {
-		struct io_wqe *wqe = wq->wqes[node];
-
-		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
-	}
+	io_wq_for_each_worker(&wq->wqe, io_wq_worker_wake, NULL);
 	rcu_read_unlock();
 	io_worker_ref_put(wq);
 	wait_for_completion(&wq->worker_done);
 
-	for_each_node(node) {
-		spin_lock_irq(&wq->hash->wait.lock);
-		list_del_init(&wq->wqes[node]->wait.entry);
-		spin_unlock_irq(&wq->hash->wait.lock);
-	}
+	spin_lock_irq(&wq->hash->wait.lock);
+	list_del_init(&wq->wqe.wait.entry);
+	spin_unlock_irq(&wq->hash->wait.lock);
+
 	put_task_struct(wq->task);
 	wq->task = NULL;
 }
 
 static void io_wq_destroy(struct io_wq *wq)
 {
-	int node;
+	struct io_cb_cancel_data match = {
+		.fn		= io_wq_work_match_all,
+		.cancel_all	= true,
+	};
+	struct io_wqe *wqe = &wq->wqe;
 
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-
-	for_each_node(node) {
-		struct io_wqe *wqe = wq->wqes[node];
-		struct io_cb_cancel_data match = {
-			.fn		= io_wq_work_match_all,
-			.cancel_all	= true,
-		};
-		io_wqe_cancel_pending_work(wqe, &match);
-		free_cpumask_var(wqe->cpu_mask);
-		kfree(wqe);
-	}
+	io_wqe_cancel_pending_work(wqe, &match);
+	free_cpumask_var(wqe->cpu_mask);
 	io_wq_put_hash(wq->hash);
 	kfree(wq);
 }
@@ -1323,11 +1293,9 @@ static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
 		.cpu = cpu,
 		.online = online
 	};
-	int i;
 
 	rcu_read_lock();
-	for_each_node(i)
-		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
+	io_wq_for_each_worker(&wq->wqe, io_wq_worker_affinity, &od);
 	rcu_read_unlock();
 	return 0;
 }
@@ -1348,18 +1316,15 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
 
 int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
 {
-	int i;
+	struct io_wqe *wqe = &wq->wqe;
 
 	rcu_read_lock();
-	for_each_node(i) {
-		struct io_wqe *wqe = wq->wqes[i];
-
-		if (mask)
-			cpumask_copy(wqe->cpu_mask, mask);
-		else
-			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
-	}
+	if (mask)
+		cpumask_copy(wqe->cpu_mask, mask);
+	else
+		cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
 	rcu_read_unlock();
+
 	return 0;
 }
 
@@ -1369,9 +1334,10 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
  */
 int io_wq_max_workers(struct io_wq *wq, int *new_count)
 {
+	struct io_wqe *wqe = &wq->wqe;
+	struct io_wqe_acct *acct;
 	int prev[IO_WQ_ACCT_NR];
-	bool first_node = true;
-	int i, node;
+	int i;
 
 	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
 	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
@@ -1386,21 +1352,15 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
 		prev[i] = 0;
 
 	rcu_read_lock();
-	for_each_node(node) {
-		struct io_wqe *wqe = wq->wqes[node];
-		struct io_wqe_acct *acct;
 
-		raw_spin_lock(&wqe->lock);
-		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
-			acct = &wqe->acct[i];
-			if (first_node)
-				prev[i] = max_t(int, acct->max_workers, prev[i]);
-			if (new_count[i])
-				acct->max_workers = new_count[i];
-		}
-		raw_spin_unlock(&wqe->lock);
-		first_node = false;
+	raw_spin_lock(&wqe->lock);
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		acct = &wqe->acct[i];
+		prev[i] = max_t(int, acct->max_workers, prev[i]);
+		if (new_count[i])
+			acct->max_workers = new_count[i];
 	}
+	raw_spin_unlock(&wqe->lock);
 	rcu_read_unlock();
 
 	for (i = 0; i < IO_WQ_ACCT_NR; i++)
-- 
2.34.1


             reply	other threads:[~2023-03-10 20:11 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-03-10 20:11 Breno Leitao [this message]
2023-03-10 20:38 ` [PATCH] io_uring: One wqe per wq Jens Axboe
2023-03-11 20:56   ` Pavel Begunkov
2023-03-11 22:13     ` Jens Axboe
2023-03-13  3:56       ` Pavel Begunkov
2023-03-15 20:44 ` Jens Axboe

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox