public inbox for [email protected]
 help / color / mirror / Atom feed
* [RFC v2 0/9] fixed worker
@ 2022-04-20 10:39 Hao Xu
  2022-04-20 10:39 ` [PATCH 1/9] io-wq: add a worker flag for individual exit Hao Xu
                   ` (8 more replies)
  0 siblings, 9 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

This is the second version of fixed worker implementation.
Wrote a nop test program to test it, 3 fixed-workers VS 3 normal workers.
normal workers:
./run_nop_wqe.sh nop_wqe_normal 200000 100 3 1-3
        time spent: 10464397 usecs      IOPS: 1911242
        time spent: 9610976 usecs       IOPS: 2080954
        time spent: 9807361 usecs       IOPS: 2039284

fixed workers:
./run_nop_wqe.sh nop_wqe_fixed 200000 100 3 1-3
        time spent: 17314274 usecs      IOPS: 1155116
        time spent: 17016942 usecs      IOPS: 1175299
        time spent: 17908684 usecs      IOPS: 1116776

About 2x improvement. From perf result, almost no acct->lock contension.
Test program: https://github.com/HowHsu/liburing/tree/fixed_worker
liburing/test/nop_wqe.c

Hao Xu (9):
  io-wq: add a worker flag for individual exit
  io-wq: change argument of create_io_worker() for convienence
  io-wq: add infra data structure for fixed workers
  io-wq: tweak io_get_acct()
  io-wq: fixed worker initialization
  io-wq: fixed worker exit
  io-wq: implement fixed worker logic
  io-wq: batch the handling of fixed worker private works
  io_uring: add register fixed worker interface

 fs/io-wq.c                    | 457 ++++++++++++++++++++++++++++++----
 fs/io-wq.h                    |   8 +
 fs/io_uring.c                 |  71 ++++++
 include/uapi/linux/io_uring.h |  11 +
 4 files changed, 498 insertions(+), 49 deletions(-)

-- 
2.36.0


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 1/9] io-wq: add a worker flag for individual exit
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
  2022-04-20 10:39 ` [PATCH 2/9] io-wq: change argument of create_io_worker() for convienence Hao Xu
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

Add a worker flag to control exit of an individual worker, this is
needed for fixed worker in the next patches but also as a generic
functionality.

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 32aeb2c581c5..4239aa4e2c8b 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -26,6 +26,7 @@ enum {
 	IO_WORKER_F_RUNNING	= 2,	/* account as running */
 	IO_WORKER_F_FREE	= 4,	/* worker on free list */
 	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
+	IO_WORKER_F_EXIT	= 32,	/* worker is exiting */
 };
 
 enum {
@@ -639,8 +640,12 @@ static int io_wqe_worker(void *data)
 	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		long ret;
 
+		if (worker->flags & IO_WORKER_F_EXIT)
+			break;
+
 		set_current_state(TASK_INTERRUPTIBLE);
-		while (io_acct_run_queue(acct))
+		while (!(worker->flags & IO_WORKER_F_EXIT) &&
+		       io_acct_run_queue(acct))
 			io_worker_handle_work(worker);
 
 		raw_spin_lock(&wqe->lock);
@@ -656,6 +661,10 @@ static int io_wqe_worker(void *data)
 		raw_spin_unlock(&wqe->lock);
 		if (io_flush_signals())
 			continue;
+		if (worker->flags & IO_WORKER_F_EXIT) {
+			__set_current_state(TASK_RUNNING);
+			break;
+		}
 		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
 		if (signal_pending(current)) {
 			struct ksignal ksig;
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 2/9] io-wq: change argument of create_io_worker() for convienence
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
  2022-04-20 10:39 ` [PATCH 1/9] io-wq: add a worker flag for individual exit Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
  2022-04-20 10:39 ` [PATCH 3/9] io-wq: add infra data structure for fixed workers Hao Xu
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

Change index to acct itself for create_io_worker() for convienence in
the next patches.

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 4239aa4e2c8b..e4f5575750f4 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -139,7 +139,8 @@ struct io_cb_cancel_data {
 	bool cancel_all;
 };
 
-static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
+static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe,
+			     struct io_wqe_acct *acct);
 static void io_wqe_dec_running(struct io_worker *worker);
 static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
 					struct io_wqe_acct *acct,
@@ -306,7 +307,7 @@ static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 	raw_spin_unlock(&wqe->lock);
 	atomic_inc(&acct->nr_running);
 	atomic_inc(&wqe->wq->worker_refs);
-	return create_io_worker(wqe->wq, wqe, acct->index);
+	return create_io_worker(wqe->wq, wqe, acct);
 }
 
 static void io_wqe_inc_running(struct io_worker *worker)
@@ -335,7 +336,7 @@ static void create_worker_cb(struct callback_head *cb)
 	}
 	raw_spin_unlock(&wqe->lock);
 	if (do_create) {
-		create_io_worker(wq, wqe, worker->create_index);
+		create_io_worker(wq, wqe, acct);
 	} else {
 		atomic_dec(&acct->nr_running);
 		io_worker_ref_put(wq);
@@ -812,9 +813,10 @@ static void io_workqueue_create(struct work_struct *work)
 		kfree(worker);
 }
 
-static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe,
+			     struct io_wqe_acct *acct)
 {
-	struct io_wqe_acct *acct = &wqe->acct[index];
+	int index = acct->index;
 	struct io_worker *worker;
 	struct task_struct *tsk;
 
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 3/9] io-wq: add infra data structure for fixed workers
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
  2022-04-20 10:39 ` [PATCH 1/9] io-wq: add a worker flag for individual exit Hao Xu
  2022-04-20 10:39 ` [PATCH 2/9] io-wq: change argument of create_io_worker() for convienence Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
  2022-04-20 10:39 ` [PATCH 4/9] io-wq: tweak io_get_acct() Hao Xu
                   ` (5 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

Add data sttructure and basic initialization for fixed worker.

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c | 98 ++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 87 insertions(+), 11 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index e4f5575750f4..451b8fb389d1 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -26,6 +26,7 @@ enum {
 	IO_WORKER_F_RUNNING	= 2,	/* account as running */
 	IO_WORKER_F_FREE	= 4,	/* worker on free list */
 	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
+	IO_WORKER_F_FIXED	= 16,	/* is a fixed worker */
 	IO_WORKER_F_EXIT	= 32,	/* worker is exiting */
 };
 
@@ -37,6 +38,61 @@ enum {
 	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
 };
 
+struct io_wqe_acct {
+	/*
+	 * union {
+	 *	1) for normal worker
+	 *	struct {
+	 *		unsigned nr_workers;
+	 *		unsigned max_workers;
+	 *		struct io_wq_work_list work_list;
+	 *	};
+	 *	2) for fixed worker
+	 *	struct {
+	 *		unsigned nr_workers; // not meaningful
+	 *		unsigned max_workers; // not meaningful
+	 *		unsigned nr_fixed;
+	 *		unsigned max_works;
+	 *		struct io_worker **fixed_workers;
+	 *	};
+	 *	3) for fixed worker's private acct
+	 *	struct {
+	 *		unsigned nr_works;
+	 *		unsigned max_works;
+	 *		struct io_wq_work_list work_list;
+	 *	};
+	 *};
+	 */
+	union {
+		unsigned nr_workers;
+		unsigned nr_works;
+	};
+	unsigned max_workers;
+	unsigned nr_fixed;
+	unsigned max_works;
+	union {
+		struct io_wq_work_list work_list;
+		struct io_worker **fixed_workers;
+	};
+	/*
+	 * nr_running is not meaningful for fixed worker
+	 * but still keep the same logic for it for the
+	 * convinence for now. So do nr_workers and
+	 * max_workers.
+	 */
+	atomic_t nr_running;
+	/*
+	 * For 1), it protects the work_list, the other two member nr_workers
+	 * and max_workers are protected by wqe->lock.
+	 * For 2), it protects nr_fixed, max_works, fixed_workers
+	 * For 3), it protects nr_works, max_works and work_list.
+	 */
+	raw_spinlock_t lock;
+	int index;
+	unsigned long flags;
+	bool fixed_worker_registered;
+};
+
 /*
  * One for each thread in a wqe pool
  */
@@ -62,6 +118,8 @@ struct io_worker {
 		struct rcu_head rcu;
 		struct work_struct work;
 	};
+	int index;
+	struct io_wqe_acct acct;
 };
 
 #if BITS_PER_LONG == 64
@@ -72,16 +130,6 @@ struct io_worker {
 
 #define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)
 
-struct io_wqe_acct {
-	unsigned nr_workers;
-	unsigned max_workers;
-	int index;
-	atomic_t nr_running;
-	raw_spinlock_t lock;
-	struct io_wq_work_list work_list;
-	unsigned long flags;
-};
-
 enum {
 	IO_WQ_ACCT_BOUND,
 	IO_WQ_ACCT_UNBOUND,
@@ -94,6 +142,7 @@ enum {
 struct io_wqe {
 	raw_spinlock_t lock;
 	struct io_wqe_acct acct[IO_WQ_ACCT_NR];
+	struct io_wqe_acct fixed_acct[IO_WQ_ACCT_NR];
 
 	int node;
 
@@ -1205,6 +1254,31 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 			atomic_set(&acct->nr_running, 0);
 			INIT_WQ_LIST(&acct->work_list);
 			raw_spin_lock_init(&acct->lock);
+
+			acct = &wqe->fixed_acct[i];
+			acct->index = i;
+			INIT_WQ_LIST(&acct->work_list);
+			raw_spin_lock_init(&acct->lock);
+			/*
+			 * nr_running for a fixed worker is meaningless
+			 * for now, init it to 1 to wround around the
+			 * io_wqe_dec_running logic
+			 */
+			atomic_set(&acct->nr_running, 1);
+			/*
+			 * max_workers for a fixed worker is meaningless
+			 * for now, init it so since number of fixed workers
+			 * should be controlled by users.
+			 */
+			acct->max_workers = task_rlimit(current, RLIMIT_NPROC);
+			raw_spin_lock_init(&acct->lock);
+			/*
+			 * For fixed worker, not necessary
+			 * but do it explicitly for clearity
+			 */
+			acct->nr_fixed = 0;
+			acct->max_works = 0;
+			acct->fixed_workers = NULL;
 		}
 		wqe->wq = wq;
 		raw_spin_lock_init(&wqe->lock);
@@ -1287,7 +1361,7 @@ static void io_wq_exit_workers(struct io_wq *wq)
 
 static void io_wq_destroy(struct io_wq *wq)
 {
-	int node;
+	int i, node;
 
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 
@@ -1299,6 +1373,8 @@ static void io_wq_destroy(struct io_wq *wq)
 		};
 		io_wqe_cancel_pending_work(wqe, &match);
 		free_cpumask_var(wqe->cpu_mask);
+		for (i = 0; i < IO_WQ_ACCT_NR; i++)
+			kfree(wqe->fixed_acct[i].fixed_workers);
 		kfree(wqe);
 	}
 	io_wq_put_hash(wq->hash);
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 4/9] io-wq: tweak io_get_acct()
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
                   ` (2 preceding siblings ...)
  2022-04-20 10:39 ` [PATCH 3/9] io-wq: add infra data structure for fixed workers Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
  2022-04-20 10:39 ` [PATCH 5/9] io-wq: fixed worker initialization Hao Xu
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

Add an argument for io_get_acct() to indicate fixed or normal worker

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 451b8fb389d1..f1c9e7936988 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -208,20 +208,24 @@ static void io_worker_release(struct io_worker *worker)
 		complete(&worker->ref_done);
 }
 
-static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
+static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound,
+					      bool fixed)
 {
-	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
+	unsigned index = bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND;
+
+	return fixed ? &wqe->fixed_acct[index] : &wqe->acct[index];
 }
 
 static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
 						   struct io_wq_work *work)
 {
-	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
+	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND), false);
 }
 
 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
 {
-	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
+	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND,
+			   worker->flags & IO_WORKER_F_FIXED);
 }
 
 static void io_worker_ref_put(struct io_wq *wq)
@@ -1124,7 +1128,7 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 	int i;
 retry:
 	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
-		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
+		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0, false);
 
 		if (io_acct_cancel_pending_work(wqe, acct, match)) {
 			if (match->cancel_all)
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 5/9] io-wq: fixed worker initialization
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
                   ` (3 preceding siblings ...)
  2022-04-20 10:39 ` [PATCH 4/9] io-wq: tweak io_get_acct() Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
  2022-04-20 10:39 ` [PATCH 6/9] io-wq: fixed worker exit Hao Xu
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

Implementation of the fixed worker initialization.

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index f1c9e7936988..02f9b15a998f 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -774,6 +774,26 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 	io_wqe_dec_running(worker);
 }
 
+static void io_init_new_fixed_worker(struct io_wqe *wqe,
+				     struct io_worker *worker)
+{
+	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+	struct io_wqe_acct *iw_acct = &worker->acct;
+	unsigned index = acct->index;
+	unsigned *nr_fixed;
+
+	raw_spin_lock(&acct->lock);
+	nr_fixed = &acct->nr_fixed;
+	acct->fixed_workers[*nr_fixed] = worker;
+	worker->index = (*nr_fixed)++;
+	iw_acct->nr_works = 0;
+	iw_acct->max_works = acct->max_works;
+	iw_acct->index = index;
+	INIT_WQ_LIST(&iw_acct->work_list);
+	raw_spin_lock_init(&iw_acct->lock);
+	raw_spin_unlock(&acct->lock);
+}
+
 static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
 			       struct task_struct *tsk)
 {
@@ -787,6 +807,8 @@ static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
 	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
 	worker->flags |= IO_WORKER_F_FREE;
 	raw_spin_unlock(&wqe->lock);
+	if (worker->flags & IO_WORKER_F_FIXED)
+		io_init_new_fixed_worker(wqe, worker);
 	wake_up_new_task(tsk);
 }
 
@@ -893,6 +915,8 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe,
 
 	if (index == IO_WQ_ACCT_BOUND)
 		worker->flags |= IO_WORKER_F_BOUND;
+	if (&wqe->fixed_acct[index] == acct)
+		worker->flags |= IO_WORKER_F_FIXED;
 
 	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
 	if (!IS_ERR(tsk)) {
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 6/9] io-wq: fixed worker exit
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
                   ` (4 preceding siblings ...)
  2022-04-20 10:39 ` [PATCH 5/9] io-wq: fixed worker initialization Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
  2022-04-20 10:39 ` [PATCH 7/9] io-wq: implement fixed worker logic Hao Xu
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

Implement the fixed worker exit

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 02f9b15a998f..a43dcb55ff77 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -259,6 +259,29 @@ static bool io_task_worker_match(struct callback_head *cb, void *data)
 	return worker == data;
 }
 
+static void io_fixed_worker_exit(struct io_worker *worker)
+{
+	int *nr_fixed;
+	int index = worker->acct.index;
+	struct io_wqe *wqe = worker->wqe;
+	struct io_wqe_acct *acct = io_get_acct(wqe, index == 0, true);
+	struct io_worker **fixed_workers;
+
+	raw_spin_lock(&acct->lock);
+	fixed_workers = acct->fixed_workers;
+	if (!fixed_workers || worker->index == -1) {
+		raw_spin_unlock(&acct->lock);
+		return;
+	}
+	nr_fixed = &acct->nr_fixed;
+	/* reuse variable index to represent fixed worker index in its array */
+	index = worker->index;
+	fixed_workers[index] = fixed_workers[*nr_fixed - 1];
+	(*nr_fixed)--;
+	fixed_workers[index]->index = index;
+	raw_spin_unlock(&acct->lock);
+}
+
 static void io_worker_exit(struct io_worker *worker)
 {
 	struct io_wqe *wqe = worker->wqe;
@@ -682,6 +705,7 @@ static int io_wqe_worker(void *data)
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 	bool last_timeout = false;
+	bool fixed = worker->flags & IO_WORKER_F_FIXED;
 	char buf[TASK_COMM_LEN];
 
 	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
@@ -732,6 +756,8 @@ static int io_wqe_worker(void *data)
 
 	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
 		io_worker_handle_work(worker);
+	if (fixed)
+		io_fixed_worker_exit(worker);
 
 	audit_free(current);
 	io_worker_exit(worker);
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 7/9] io-wq: implement fixed worker logic
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
                   ` (5 preceding siblings ...)
  2022-04-20 10:39 ` [PATCH 6/9] io-wq: fixed worker exit Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
  2022-04-20 10:39 ` [PATCH 8/9] io-wq: batch the handling of fixed worker private works Hao Xu
  2022-04-20 10:40 ` [PATCH 9/9] io_uring: add register fixed worker interface Hao Xu
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

The current implementation of io-wq has big spinlock contension. The
main reason is the single work list model. All producers(who insert
works) and consumers(io-workers) have to grap wqe->lock to move ahead.
Set max_worker to 3 or 4, do a fio read test, we can see 40%~50% lock
contension.
Introduce fixed io-workers which sticks there to handle works and have
their own work list.

previous:

  producer0 ---insert---> work_list ---get---> io-worker0,1,2

now:

                     ---> private work_list0 --get--> fixed-worker0
                    /
  producer0 --insert----> private work_list1 --get--> fixed-worker1
      |             \
      |              ---> private work_list2 --get--> fixed-worker2
      |
      |---insert---> public work_list --get--> (normal)io-worker

Since each fixed-worker has a private work list, the contension will be
limited to a smaller range(the private work list).
Logic of fixed-worker: first handle private works then public ones.
Logic of normal io-worker: only handle public works.
Logic of producer: 1) randomly pick a private work list and check if it
                      is full, insert the work if it's not
                   2) insert the work to the public work list if 1)
                      fails.
The get logic of a private list: fixed-worker grab all the works in
its private work list(like what tctx_task_work() does) rather than one
by one.(this code is in the next patches as a optimization)

To achieve this, we need to add an io_wqe_acct for each fixed-worker
struct, and though this we can leverage the old code as much as
possible, which makes the new design clean and compatible.

Good things of this feature:
  1) bound and unbound work lists now have different spinlocks.
  2) much smaller contension between work producers and consumers.
  3) fixed workers are friendly for users to control: binding cpus,
     reset priority etc.

Wrote a nop test program to test it, 3 fixed-workers VS 3 normal workers.
normal workers:
./run_nop_wqe.sh nop_wqe_normal 200000 100 3 1-3
        time spent: 10464397 usecs      IOPS: 1911242
        time spent: 9610976 usecs       IOPS: 2080954
        time spent: 9807361 usecs       IOPS: 2039284

fixed workers:
./run_nop_wqe.sh nop_wqe_fixed 200000 100 3 1-3
        time spent: 17314274 usecs      IOPS: 1155116
        time spent: 17016942 usecs      IOPS: 1175299
        time spent: 17908684 usecs      IOPS: 1116776

About 2x improvement. From perf result, almost no acct->lock contension.
Test program: https://github.com/HowHsu/liburing/tree/fixed_worker

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c | 145 +++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 119 insertions(+), 26 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index a43dcb55ff77..8fa5bfb298dc 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -624,9 +624,9 @@ static void io_assign_current_work(struct io_worker *worker,
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 
-static void io_worker_handle_work(struct io_worker *worker)
+static void io_worker_handle_work(struct io_worker *worker,
+				  struct io_wqe_acct *acct)
 {
-	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -698,19 +698,31 @@ static void io_worker_handle_work(struct io_worker *worker)
 	} while (1);
 }
 
+static inline void io_worker_handle_private_work(struct io_worker *worker)
+{
+	io_worker_handle_work(worker, &worker->acct);
+}
+
+static inline void io_worker_handle_public_work(struct io_worker *worker)
+{
+	io_worker_handle_work(worker, io_wqe_get_acct(worker));
+}
+
 static int io_wqe_worker(void *data)
 {
 	struct io_worker *worker = data;
-	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
-	bool last_timeout = false;
+	struct io_wqe_acct *acct =
+		io_get_acct(wqe, worker->flags & IO_WORKER_F_BOUND, false);
 	bool fixed = worker->flags & IO_WORKER_F_FIXED;
+	bool last_timeout = false;
 	char buf[TASK_COMM_LEN];
 
 	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
 
-	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
+	snprintf(buf, sizeof(buf), fixed ? "iou-fix-%d" : "iou-wrk-%d",
+		 wq->task->pid);
 	set_task_comm(current, buf);
 
 	audit_alloc_kernel(current);
@@ -722,13 +734,24 @@ static int io_wqe_worker(void *data)
 			break;
 
 		set_current_state(TASK_INTERRUPTIBLE);
-		while (!(worker->flags & IO_WORKER_F_EXIT) &&
-		       io_acct_run_queue(acct))
-			io_worker_handle_work(worker);
-
+		if (fixed) {
+			while (io_acct_run_queue(&worker->acct))
+				io_worker_handle_private_work(worker);
+			if (io_acct_run_queue(acct))
+				io_worker_handle_public_work(worker);
+		} else {
+			while (io_acct_run_queue(acct))
+				io_worker_handle_public_work(worker);
+		}
 		raw_spin_lock(&wqe->lock);
-		/* timed out, exit unless we're the last worker */
-		if (last_timeout && acct->nr_workers > 1) {
+		/* timed out, a worker will exit only if:
+		 * - not a fixed worker
+		 * - not the last non-fixed worker
+		 *
+		 * the second condition is due to we need at least one worker to
+		 * handle the public work list.
+		 */
+		if (last_timeout && !fixed && acct->nr_workers > 1) {
 			acct->nr_workers--;
 			raw_spin_unlock(&wqe->lock);
 			__set_current_state(TASK_RUNNING);
@@ -754,10 +777,16 @@ static int io_wqe_worker(void *data)
 		last_timeout = !ret;
 	}
 
-	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
-		io_worker_handle_work(worker);
-	if (fixed)
+	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && !fixed)
+		io_worker_handle_public_work(worker);
+	if (fixed) {
 		io_fixed_worker_exit(worker);
+		/*
+		 * Check and handle private work list again
+		 * to avoid race with private work insertion
+		 */
+		io_worker_handle_private_work(worker);
+	}
 
 	audit_free(current);
 	io_worker_exit(worker);
@@ -1001,9 +1030,9 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 	} while (work);
 }
 
-static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
+static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work,
+			       struct io_wqe_acct *acct)
 {
-	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	unsigned int hash;
 	struct io_wq_work *tail;
 
@@ -1022,6 +1051,45 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
 }
 
+static bool io_wqe_insert_private_work(struct io_wqe *wqe,
+				       struct io_wq_work *work,
+				       struct io_wqe_acct *acct)
+{
+	unsigned int nr_fixed;
+	struct io_worker *fixed_worker;
+	struct io_wqe_acct *iw_acct;
+	unsigned int fixed_worker_index;
+
+	raw_spin_lock(&acct->lock);
+	nr_fixed = acct->nr_fixed;
+	if (!nr_fixed) {
+		raw_spin_unlock(&acct->lock);
+		return false;
+	}
+
+	fixed_worker_index = ((unsigned long)work >> 8) % nr_fixed;
+	fixed_worker = acct->fixed_workers[fixed_worker_index];
+	if (!fixed_worker || fixed_worker->flags & IO_WORKER_F_EXIT) {
+		raw_spin_unlock(&acct->lock);
+		return false;
+	}
+	iw_acct = &fixed_worker->acct;
+
+	raw_spin_lock(&iw_acct->lock);
+	if (iw_acct->nr_works < iw_acct->max_works) {
+		io_wqe_insert_work(wqe, work, iw_acct);
+		iw_acct->nr_works++;
+		raw_spin_unlock(&iw_acct->lock);
+		wake_up_process(fixed_worker->task);
+		raw_spin_unlock(&acct->lock);
+		return true;
+	}
+	raw_spin_unlock(&iw_acct->lock);
+	raw_spin_unlock(&acct->lock);
+
+	return false;
+}
+
 static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
 {
 	return work == data;
@@ -1030,6 +1098,7 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
 	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+	struct io_wqe_acct *fixed_acct;
 	struct io_cb_cancel_data match;
 	unsigned work_flags = work->flags;
 	bool do_create;
@@ -1044,8 +1113,14 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 		return;
 	}
 
+	fixed_acct = io_get_acct(wqe, !acct->index, true);
+	if (fixed_acct->fixed_worker_registered && !io_wq_is_hashed(work)) {
+		if (io_wqe_insert_private_work(wqe, work, fixed_acct))
+			return;
+	}
+
 	raw_spin_lock(&acct->lock);
-	io_wqe_insert_work(wqe, work);
+	io_wqe_insert_work(wqe, work, acct);
 	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 	raw_spin_unlock(&acct->lock);
 
@@ -1131,9 +1206,9 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 
 static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 					 struct io_wq_work *work,
-					 struct io_wq_work_node *prev)
+					 struct io_wq_work_node *prev,
+					 struct io_wqe_acct *acct)
 {
-	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	unsigned int hash = io_get_work_hash(work);
 	struct io_wq_work *prev_work = NULL;
 
@@ -1160,7 +1235,7 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
 		work = container_of(node, struct io_wq_work, list);
 		if (!match->fn(work, match->data))
 			continue;
-		io_wqe_remove_pending(wqe, work, prev);
+		io_wqe_remove_pending(wqe, work, prev, acct);
 		raw_spin_unlock(&acct->lock);
 		io_run_cancel(work, wqe);
 		match->nr_pending++;
@@ -1175,17 +1250,35 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 				       struct io_cb_cancel_data *match)
 {
-	int i;
-retry:
-	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
-		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0, false);
+	int i, j;
+	struct io_wqe_acct *acct, *iw_acct;
 
+retry_public:
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		acct = io_get_acct(wqe, i == 0, false);
 		if (io_acct_cancel_pending_work(wqe, acct, match)) {
 			if (match->cancel_all)
-				goto retry;
-			break;
+				goto retry_public;
+			return;
 		}
 	}
+
+retry_private:
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		acct = io_get_acct(wqe, i == 0, true);
+		raw_spin_lock(&acct->lock);
+		for (j = 0; j < acct->nr_fixed; j++) {
+			iw_acct = &acct->fixed_workers[j]->acct;
+			if (io_acct_cancel_pending_work(wqe, iw_acct, match)) {
+				if (match->cancel_all) {
+					raw_spin_unlock(&acct->lock);
+					goto retry_private;
+				}
+				break;
+			}
+		}
+		raw_spin_unlock(&acct->lock);
+	}
 }
 
 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 8/9] io-wq: batch the handling of fixed worker private works
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
                   ` (6 preceding siblings ...)
  2022-04-20 10:39 ` [PATCH 7/9] io-wq: implement fixed worker logic Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
  2022-04-20 10:40 ` [PATCH 9/9] io_uring: add register fixed worker interface Hao Xu
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

Reduce acct->lock contension by batching the handling of private work
list for fixed_workers.

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c | 42 +++++++++++++++++++++++++++++++++---------
 fs/io-wq.h |  5 +++++
 2 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 8fa5bfb298dc..807985249f62 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -539,8 +539,23 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
 	return ret;
 }
 
+static inline void conditional_acct_lock(struct io_wqe_acct *acct,
+					 bool needs_lock)
+{
+	if (needs_lock)
+		raw_spin_lock(&acct->lock);
+}
+
+static inline void conditional_acct_unlock(struct io_wqe_acct *acct,
+					   bool needs_lock)
+{
+	if (needs_lock)
+		raw_spin_unlock(&acct->lock);
+}
+
 static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
-					   struct io_worker *worker)
+					   struct io_worker *worker,
+					   bool needs_lock)
 	__must_hold(acct->lock)
 {
 	struct io_wq_work_node *node, *prev;
@@ -548,6 +563,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 	unsigned int stall_hash = -1U;
 	struct io_wqe *wqe = worker->wqe;
 
+	conditional_acct_lock(acct, needs_lock);
 	wq_list_for_each(node, prev, &acct->work_list) {
 		unsigned int hash;
 
@@ -556,6 +572,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 		/* not hashed, can run anytime */
 		if (!io_wq_is_hashed(work)) {
 			wq_list_del(&acct->work_list, node, prev);
+			conditional_acct_unlock(acct, needs_lock);
 			return work;
 		}
 
@@ -567,6 +584,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
 			wqe->hash_tail[hash] = NULL;
 			wq_list_cut(&acct->work_list, &tail->list, prev);
+			conditional_acct_unlock(acct, needs_lock);
 			return work;
 		}
 		if (stall_hash == -1U)
@@ -583,15 +601,16 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 		 * work being added and clearing the stalled bit.
 		 */
 		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
-		raw_spin_unlock(&acct->lock);
+		conditional_acct_unlock(acct, needs_lock);
 		unstalled = io_wait_on_hash(wqe, stall_hash);
-		raw_spin_lock(&acct->lock);
+		conditional_acct_lock(acct, needs_lock);
 		if (unstalled) {
 			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 			if (wq_has_sleeper(&wqe->wq->hash->wait))
 				wake_up(&wqe->wq->hash->wait);
 		}
 	}
+	conditional_acct_unlock(acct, needs_lock);
 
 	return NULL;
 }
@@ -625,7 +644,7 @@ static void io_assign_current_work(struct io_worker *worker,
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 
 static void io_worker_handle_work(struct io_worker *worker,
-				  struct io_wqe_acct *acct)
+				  struct io_wqe_acct *acct, bool needs_lock)
 {
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
@@ -641,9 +660,7 @@ static void io_worker_handle_work(struct io_worker *worker,
 		 * can't make progress, any work completion or insertion will
 		 * clear the stalled flag.
 		 */
-		raw_spin_lock(&acct->lock);
-		work = io_get_next_work(acct, worker);
-		raw_spin_unlock(&acct->lock);
+		work = io_get_next_work(acct, worker, needs_lock);
 		if (work) {
 			__io_worker_busy(wqe, worker);
 
@@ -700,12 +717,19 @@ static void io_worker_handle_work(struct io_worker *worker,
 
 static inline void io_worker_handle_private_work(struct io_worker *worker)
 {
-	io_worker_handle_work(worker, &worker->acct);
+	struct io_wqe_acct acct;
+
+	raw_spin_lock(&worker->acct.lock);
+	acct = worker->acct;
+	wq_list_clean(&worker->acct.work_list);
+	worker->acct.nr_works = 0;
+	raw_spin_unlock(&worker->acct.lock);
+	io_worker_handle_work(worker, &acct, false);
 }
 
 static inline void io_worker_handle_public_work(struct io_worker *worker)
 {
-	io_worker_handle_work(worker, io_wqe_get_acct(worker));
+	io_worker_handle_work(worker, io_wqe_get_acct(worker), true);
 }
 
 static int io_wqe_worker(void *data)
diff --git a/fs/io-wq.h b/fs/io-wq.h
index dbecd27656c7..98befe7b0081 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -40,6 +40,11 @@ struct io_wq_work_list {
 	(list)->first = NULL;					\
 } while (0)
 
+static inline void wq_list_clean(struct io_wq_work_list *list)
+{
+	list->first = list->last = NULL;
+}
+
 static inline void wq_list_add_after(struct io_wq_work_node *node,
 				     struct io_wq_work_node *pos,
 				     struct io_wq_work_list *list)
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 9/9] io_uring: add register fixed worker interface
  2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
                   ` (7 preceding siblings ...)
  2022-04-20 10:39 ` [PATCH 8/9] io-wq: batch the handling of fixed worker private works Hao Xu
@ 2022-04-20 10:40 ` Hao Xu
  8 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-20 10:40 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov

From: Hao Xu <[email protected]>

Add an io_uring_register() interface to register fixed workers and
indicate its work capacity.
The argument is an array of two elements each is
    struct {
    	__s32 nr_workers;
    	__s32 max_works;
    }
(nr_workers, max_works)                        meaning

nr_workers or max_works <  -1                  invalid
nr_workers or max_works == -1           get the old value back
nr_workers or max_works >=  0        get the old value and set to the
                                     new value

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c                    | 101 ++++++++++++++++++++++++++++++++++
 fs/io-wq.h                    |   3 +
 fs/io_uring.c                 |  71 ++++++++++++++++++++++++
 include/uapi/linux/io_uring.h |  11 ++++
 4 files changed, 186 insertions(+)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 807985249f62..3927d0c48a69 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -1668,6 +1668,107 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
 	return 0;
 }
 
+/*
+ * Set max number of fixed workers and the capacity of private work list,
+ * returns old value. If new_count is -1, then just return the old value.
+ */
+int io_wq_fixed_workers(struct io_wq *wq,
+			struct io_uring_fixed_worker_arg *new_count)
+{
+	struct io_uring_fixed_worker_arg prev[IO_WQ_ACCT_NR];
+	bool first_node = true;
+	int i, node;
+	bool readonly[2] = {
+		(new_count[0].nr_workers == -1 && new_count[0].max_works == -1),
+		(new_count[1].nr_workers == -1 && new_count[1].max_works == -1),
+	};
+
+	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
+	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
+	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);
+
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		if (new_count[i].nr_workers > task_rlimit(current, RLIMIT_NPROC))
+			new_count[i].nr_workers =
+				task_rlimit(current, RLIMIT_NPROC);
+	}
+
+	rcu_read_lock();
+	for_each_node(node) {
+		int j;
+		struct io_wqe *wqe = wq->wqes[node];
+
+		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+			struct io_wqe_acct *acct = &wqe->fixed_acct[i];
+			int *nr_fixed, *max_works;
+			struct io_worker **fixed_workers;
+			int nr = new_count[i].nr_workers;
+
+			raw_spin_lock(&acct->lock);
+			nr_fixed = &acct->nr_fixed;
+			max_works = &acct->max_works;
+			fixed_workers = acct->fixed_workers;
+			if (first_node) {
+				prev[i].nr_workers = *nr_fixed;
+				prev[i].max_works = *max_works;
+			}
+			if (readonly[i]) {
+				raw_spin_unlock(&acct->lock);
+				continue;
+			}
+			if (*nr_fixed == nr || nr == -1) {
+				*max_works = new_count[i].max_works;
+				raw_spin_unlock(&acct->lock);
+				continue;
+			}
+			for (j = 0; j < *nr_fixed; j++) {
+				struct io_worker *worker = fixed_workers[j];
+
+				if (!worker)
+					continue;
+				worker->flags |= IO_WORKER_F_EXIT;
+				/*
+				 * Mark index to -1 to avoid false deletion
+				 * in io_fixed_worker_exit()
+				 */
+				worker->index = -1;
+				/*
+				 * Once a worker is in fixed_workers array
+				 * it is definitely there before we release
+				 * the acct->lock below. That's why we don't
+				 * need to increment the worker->ref here.
+				 */
+				wake_up_process(worker->task);
+			}
+			kfree(fixed_workers);
+			acct->fixed_workers = NULL;
+			*nr_fixed = 0;
+			*max_works = new_count[i].max_works;
+			acct->fixed_workers = kzalloc_node(
+						sizeof(*fixed_workers) * nr,
+						GFP_KERNEL, wqe->node);
+			if (!acct->fixed_workers) {
+				raw_spin_unlock(&acct->lock);
+				return -ENOMEM;
+			}
+			raw_spin_unlock(&acct->lock);
+			for (j = 0; j < nr; j++)
+				io_wqe_create_worker(wqe, acct);
+
+			acct->fixed_worker_registered = !!nr;
+		}
+		first_node = false;
+	}
+	rcu_read_unlock();
+
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		new_count[i].nr_workers = prev[i].nr_workers;
+		new_count[i].max_works = prev[i].max_works;
+	}
+
+	return 0;
+}
+
 static __init int io_wq_init(void)
 {
 	int ret;
diff --git a/fs/io-wq.h b/fs/io-wq.h
index 98befe7b0081..c7a454796a05 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -2,6 +2,7 @@
 #define INTERNAL_IO_WQ_H
 
 #include <linux/refcount.h>
+#include <uapi/linux/io_uring.h>
 
 struct io_wq;
 
@@ -201,6 +202,8 @@ void io_wq_hash_work(struct io_wq_work *work, void *val);
 
 int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask);
 int io_wq_max_workers(struct io_wq *wq, int *new_count);
+int io_wq_fixed_workers(struct io_wq *wq,
+			struct io_uring_fixed_worker_arg *new_count);
 
 static inline bool io_wq_is_hashed(struct io_wq_work *work)
 {
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 3905b3ec87b8..89d088ed636a 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -11580,6 +11580,71 @@ static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
 	return ret;
 }
 
+static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
+						 void __user *arg)
+	__must_hold(&ctx->uring_lock)
+{
+	struct io_uring_task *tctx = NULL;
+	struct io_sq_data *sqd = NULL;
+	struct io_uring_fixed_worker_arg new_count[2];
+	int i, ret;
+
+	if (copy_from_user(new_count, arg, sizeof(new_count)))
+		return -EFAULT;
+	for (i = 0; i < ARRAY_SIZE(new_count); i++) {
+		int nr_workers = new_count[i].nr_workers;
+		int max_works = new_count[i].max_works;
+
+		if (nr_workers < -1 || max_works < -1)
+			return -EINVAL;
+	}
+
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		sqd = ctx->sq_data;
+		if (sqd) {
+			/*
+			 * Observe the correct sqd->lock -> ctx->uring_lock
+			 * ordering. Fine to drop uring_lock here, we hold
+			 * a ref to the ctx.
+			 */
+			refcount_inc(&sqd->refs);
+			mutex_unlock(&ctx->uring_lock);
+			mutex_lock(&sqd->lock);
+			mutex_lock(&ctx->uring_lock);
+			if (sqd->thread)
+				tctx = sqd->thread->io_uring;
+		}
+	} else {
+		tctx = current->io_uring;
+	}
+
+	if (tctx && tctx->io_wq) {
+		ret = io_wq_fixed_workers(tctx->io_wq, new_count);
+		if (ret)
+			goto err;
+	} else {
+		memset(new_count, -1, sizeof(new_count));
+	}
+
+	if (sqd) {
+		mutex_unlock(&sqd->lock);
+		io_put_sq_data(sqd);
+	}
+
+	if (copy_to_user(arg, new_count, sizeof(new_count)))
+		return -EFAULT;
+
+	/* that's it for SQPOLL, only the SQPOLL task creates requests */
+	if (sqd)
+		return 0;
+
+err:
+	if (sqd) {
+		mutex_unlock(&sqd->lock);
+		io_put_sq_data(sqd);
+	}
+	return ret;
+}
 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			       void __user *arg, unsigned nr_args)
 	__releases(ctx->uring_lock)
@@ -11708,6 +11773,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 	case IORING_UNREGISTER_RING_FDS:
 		ret = io_ringfd_unregister(ctx, arg, nr_args);
 		break;
+	case IORING_REGISTER_IOWQ_FIXED_WORKERS:
+		ret = -EINVAL;
+		if (!arg || nr_args != 2)
+			break;
+		ret = io_register_iowq_fixed_workers(ctx, arg);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 1845cf7c80ba..ffce27a4f9a6 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -333,6 +333,12 @@ enum {
 	IORING_REGISTER_RING_FDS		= 20,
 	IORING_UNREGISTER_RING_FDS		= 21,
 
+	/* set number of fixed workers and number
+	 * of works in a private work list which
+	 * belongs to a fixed worker
+	 */
+	IORING_REGISTER_IOWQ_FIXED_WORKERS	= 22,
+
 	/* this goes last */
 	IORING_REGISTER_LAST
 };
@@ -430,4 +436,9 @@ struct io_uring_getevents_arg {
 	__u64	ts;
 };
 
+struct io_uring_fixed_worker_arg {
+	__s32	nr_workers;
+	__s32	max_works;
+};
+
 #endif
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCH 9/9] io_uring: add register fixed worker interface
  2022-04-29 10:18 [RFC v3 0/9] fixed worker Hao Xu
@ 2022-04-29 10:18 ` Hao Xu
  0 siblings, 0 replies; 11+ messages in thread
From: Hao Xu @ 2022-04-29 10:18 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, linux-fsdevel, linux-kernel

From: Hao Xu <[email protected]>

Add an io_uring_register() interface to register fixed workers and
indicate its work capacity.
The argument is an array of two elements each is
    struct {
    	__s32 nr_workers;
    	__s32 max_works;
    }
(nr_workers, max_works)                        meaning

nr_workers or max_works <  -1                  invalid
nr_workers or max_works == -1           get the old value back
nr_workers or max_works >=  0        get the old value and set to the
                                     new value

Signed-off-by: Hao Xu <[email protected]>
---
 fs/io-wq.c                    | 101 ++++++++++++++++++++++++++++++++++
 fs/io-wq.h                    |   3 +
 fs/io_uring.c                 |  71 ++++++++++++++++++++++++
 include/uapi/linux/io_uring.h |  11 ++++
 4 files changed, 186 insertions(+)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index df2d480395e8..c1e87b29c960 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -1671,6 +1671,107 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
 	return 0;
 }
 
+/*
+ * Set max number of fixed workers and the capacity of private work list,
+ * returns old value. If new_count is -1, then just return the old value.
+ */
+int io_wq_fixed_workers(struct io_wq *wq,
+			struct io_uring_fixed_worker_arg *new_count)
+{
+	struct io_uring_fixed_worker_arg prev[IO_WQ_ACCT_NR];
+	bool first_node = true;
+	int i, node;
+	bool readonly[2] = {
+		(new_count[0].nr_workers == -1 && new_count[0].max_works == -1),
+		(new_count[1].nr_workers == -1 && new_count[1].max_works == -1),
+	};
+
+	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
+	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
+	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);
+
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		if (new_count[i].nr_workers > task_rlimit(current, RLIMIT_NPROC))
+			new_count[i].nr_workers =
+				task_rlimit(current, RLIMIT_NPROC);
+	}
+
+	rcu_read_lock();
+	for_each_node(node) {
+		int j;
+		struct io_wqe *wqe = wq->wqes[node];
+
+		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+			struct io_wqe_acct *acct = &wqe->fixed_acct[i];
+			int *nr_fixed, *max_works;
+			struct io_worker **fixed_workers;
+			int nr = new_count[i].nr_workers;
+
+			raw_spin_lock(&acct->lock);
+			nr_fixed = &acct->nr_fixed;
+			max_works = &acct->max_works;
+			fixed_workers = acct->fixed_workers;
+			if (first_node) {
+				prev[i].nr_workers = *nr_fixed;
+				prev[i].max_works = *max_works;
+			}
+			if (readonly[i]) {
+				raw_spin_unlock(&acct->lock);
+				continue;
+			}
+			if (*nr_fixed == nr || nr == -1) {
+				*max_works = new_count[i].max_works;
+				raw_spin_unlock(&acct->lock);
+				continue;
+			}
+			for (j = 0; j < *nr_fixed; j++) {
+				struct io_worker *worker = fixed_workers[j];
+
+				if (!worker)
+					continue;
+				worker->flags |= IO_WORKER_F_EXIT;
+				/*
+				 * Mark index to -1 to avoid false deletion
+				 * in io_fixed_worker_exit()
+				 */
+				worker->index = -1;
+				/*
+				 * Once a worker is in fixed_workers array
+				 * it is definitely there before we release
+				 * the acct->lock below. That's why we don't
+				 * need to increment the worker->ref here.
+				 */
+				wake_up_process(worker->task);
+			}
+			kfree(fixed_workers);
+			acct->fixed_workers = NULL;
+			*nr_fixed = 0;
+			*max_works = new_count[i].max_works;
+			acct->fixed_workers = kzalloc_node(
+						sizeof(*fixed_workers) * nr,
+						GFP_KERNEL, wqe->node);
+			if (!acct->fixed_workers) {
+				raw_spin_unlock(&acct->lock);
+				return -ENOMEM;
+			}
+			raw_spin_unlock(&acct->lock);
+			for (j = 0; j < nr; j++)
+				io_wqe_create_worker(wqe, acct);
+
+			acct->fixed_worker_registered = !!nr;
+		}
+		first_node = false;
+	}
+	rcu_read_unlock();
+
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		new_count[i].nr_workers = prev[i].nr_workers;
+		new_count[i].max_works = prev[i].max_works;
+	}
+
+	return 0;
+}
+
 static __init int io_wq_init(void)
 {
 	int ret;
diff --git a/fs/io-wq.h b/fs/io-wq.h
index ef3ce577e6b7..bf90488b0283 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -2,6 +2,7 @@
 #define INTERNAL_IO_WQ_H
 
 #include <linux/refcount.h>
+#include <uapi/linux/io_uring.h>
 
 struct io_wq;
 
@@ -202,6 +203,8 @@ void io_wq_hash_work(struct io_wq_work *work, void *val);
 
 int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask);
 int io_wq_max_workers(struct io_wq *wq, int *new_count);
+int io_wq_fixed_workers(struct io_wq *wq,
+			struct io_uring_fixed_worker_arg *new_count);
 
 static inline bool io_wq_is_hashed(struct io_wq_work *work)
 {
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 1e7466079af7..c0c7c1fd94fd 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -11806,6 +11806,71 @@ static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
 	return ret;
 }
 
+static __cold int io_register_iowq_fixed_workers(struct io_ring_ctx *ctx,
+						 void __user *arg)
+	__must_hold(&ctx->uring_lock)
+{
+	struct io_uring_task *tctx = NULL;
+	struct io_sq_data *sqd = NULL;
+	struct io_uring_fixed_worker_arg new_count[2];
+	int i, ret;
+
+	if (copy_from_user(new_count, arg, sizeof(new_count)))
+		return -EFAULT;
+	for (i = 0; i < ARRAY_SIZE(new_count); i++) {
+		int nr_workers = new_count[i].nr_workers;
+		int max_works = new_count[i].max_works;
+
+		if (nr_workers < -1 || max_works < -1)
+			return -EINVAL;
+	}
+
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		sqd = ctx->sq_data;
+		if (sqd) {
+			/*
+			 * Observe the correct sqd->lock -> ctx->uring_lock
+			 * ordering. Fine to drop uring_lock here, we hold
+			 * a ref to the ctx.
+			 */
+			refcount_inc(&sqd->refs);
+			mutex_unlock(&ctx->uring_lock);
+			mutex_lock(&sqd->lock);
+			mutex_lock(&ctx->uring_lock);
+			if (sqd->thread)
+				tctx = sqd->thread->io_uring;
+		}
+	} else {
+		tctx = current->io_uring;
+	}
+
+	if (tctx && tctx->io_wq) {
+		ret = io_wq_fixed_workers(tctx->io_wq, new_count);
+		if (ret)
+			goto err;
+	} else {
+		memset(new_count, -1, sizeof(new_count));
+	}
+
+	if (sqd) {
+		mutex_unlock(&sqd->lock);
+		io_put_sq_data(sqd);
+	}
+
+	if (copy_to_user(arg, new_count, sizeof(new_count)))
+		return -EFAULT;
+
+	/* that's it for SQPOLL, only the SQPOLL task creates requests */
+	if (sqd)
+		return 0;
+
+err:
+	if (sqd) {
+		mutex_unlock(&sqd->lock);
+		io_put_sq_data(sqd);
+	}
+	return ret;
+}
 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			       void __user *arg, unsigned nr_args)
 	__releases(ctx->uring_lock)
@@ -11934,6 +11999,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 	case IORING_UNREGISTER_RING_FDS:
 		ret = io_ringfd_unregister(ctx, arg, nr_args);
 		break;
+	case IORING_REGISTER_IOWQ_FIXED_WORKERS:
+		ret = -EINVAL;
+		if (!arg || nr_args != 2)
+			break;
+		ret = io_register_iowq_fixed_workers(ctx, arg);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index fad63564678a..f0ec9523ab42 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -360,6 +360,12 @@ enum {
 	IORING_REGISTER_RING_FDS		= 20,
 	IORING_UNREGISTER_RING_FDS		= 21,
 
+	/* set number of fixed workers and number
+	 * of works in a private work list which
+	 * belongs to a fixed worker
+	 */
+	IORING_REGISTER_IOWQ_FIXED_WORKERS	= 22,
+
 	/* this goes last */
 	IORING_REGISTER_LAST
 };
@@ -457,4 +463,9 @@ struct io_uring_getevents_arg {
 	__u64	ts;
 };
 
+struct io_uring_fixed_worker_arg {
+	__s32	nr_workers;
+	__s32	max_works;
+};
+
 #endif
-- 
2.36.0


^ permalink raw reply related	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2022-04-29 10:19 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
2022-04-20 10:39 ` [PATCH 1/9] io-wq: add a worker flag for individual exit Hao Xu
2022-04-20 10:39 ` [PATCH 2/9] io-wq: change argument of create_io_worker() for convienence Hao Xu
2022-04-20 10:39 ` [PATCH 3/9] io-wq: add infra data structure for fixed workers Hao Xu
2022-04-20 10:39 ` [PATCH 4/9] io-wq: tweak io_get_acct() Hao Xu
2022-04-20 10:39 ` [PATCH 5/9] io-wq: fixed worker initialization Hao Xu
2022-04-20 10:39 ` [PATCH 6/9] io-wq: fixed worker exit Hao Xu
2022-04-20 10:39 ` [PATCH 7/9] io-wq: implement fixed worker logic Hao Xu
2022-04-20 10:39 ` [PATCH 8/9] io-wq: batch the handling of fixed worker private works Hao Xu
2022-04-20 10:40 ` [PATCH 9/9] io_uring: add register fixed worker interface Hao Xu
  -- strict thread matches above, loose matches on Subject: below --
2022-04-29 10:18 [RFC v3 0/9] fixed worker Hao Xu
2022-04-29 10:18 ` [PATCH 9/9] io_uring: add register fixed worker interface Hao Xu

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox