* [RFC 0/9] fixed worker: a new way to handle io works
@ 2021-11-24 4:46 Hao Xu
2021-11-24 4:46 ` [PATCH 1/9] io-wq: decouple work_list protection from the big wqe->lock Hao Xu
` (9 more replies)
0 siblings, 10 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
There is big contension in current io-wq implementation. Introduce a new
type io-worker called fixed-worker to solve this problem. it is also a
new way to handle works. In this new system, works are dispatched to
different private queues rather than a long shared queue.
Detail introduction and data in 7/9.
To be done: 1) the hash optimization isn't applied yet
2) user interface
3) cannot ensure linear order for works of same reg file
writing since we now have multiple work lists.
4) code clean
Sent this for suggestions.
The test program used in this patchset:
// nop_test.c
// remove some error handling, variable definition, header files etc.
typedef long long ll;
ll usecs(struct timeval tv) {
return tv.tv_sec*(ll)1000*1000+tv.tv_usec;
}
static int test_single_nop(struct io_uring *ring, int depth)
{
for (i=0; i<depth; i++) {
sqe = io_uring_get_sqe(ring);
io_uring_prep_nop(sqe);
sqe->flags |= IOSQE_ASYNC;
}
ret = io_uring_submit(ring);
for(i=0; i<depth; i++) {
ret = io_uring_wait_cqe(ring, &cqe);
io_uring_cqe_seen(ring, cqe);
}
return 0;
}
int main(int argc, char *argv[])
{
ll delta;
struct io_uring ring;
int ret, l, loop=4000000, depth = 10;
struct timeval tv_begin, tv_end;
struct timezone tz;
ret = io_uring_queue_init(10010, &ring, 0);
if (ret) {
fprintf(stderr, "ring setup failed: %d\n", ret);
return 1;
}
l = loop;
gettimeofday(&tv_begin, &tz);
while(loop--)
test_single_nop(&ring, depth);
gettimeofday(&tv_end, &tz);
delta = usecs(tv_end) - usecs(tv_begin);
printf("time spent: %lld usecs\n", delta);
printf("IOPS: %lld\n", (ll)l * depth * 1000000 / delta);
return 0;
}
Hao Xu (9):
io-wq: decouple work_list protection from the big wqe->lock
io-wq: reduce acct->lock crossing functions lock/unlock
io-wq: update check condition for lock
io-wq: use IO_WQ_ACCT_NR rather than hardcoded number
io-wq: move hash wait entry to io_wqe_acct
io-wq: add infra data structure for fix workers
io-wq: implement fixed worker logic
io-wq: batch the handling of fixed worker private works
io-wq: small optimization for __io_worker_busy()
fs/io-wq.c | 415 ++++++++++++++++++++++++++++++++++++++---------------
fs/io-wq.h | 5 +
2 files changed, 308 insertions(+), 112 deletions(-)
--
2.24.4
^ permalink raw reply [flat|nested] 16+ messages in thread
* [PATCH 1/9] io-wq: decouple work_list protection from the big wqe->lock
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-24 4:46 ` [PATCH 2/9] io-wq: reduce acct->lock crossing functions lock/unlock Hao Xu
` (8 subsequent siblings)
9 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
wqe->lock is abused, it now protects acct->work_list, hash stuff,
nr_workers, wqe->free_list and so on. Lets first get the work_list out
of the wqe-lock mess by introduce a specific lock for work list. This
is the first step to solve the huge contension between work insertion
and work consumption.
good thing:
- split locking for bound and unbound work list
- reduce contension between work_list visit and (worker's)free_list.
For the hash stuff, since there won't be a work with same file in both
bound and unbound work list, thus they won't visit same hash entry. it
works well to use the new lock to protect hash stuff.
Results:
set max_unbound_worker = 4, test with echo-server:
nice -n -15 ./io_uring_echo_server -p 8081 -f -n 1000 -l 16
(-n connection, -l workload)
before this patch:
Samples: 2M of event 'cycles:ppp', Event count (approx.): 1239982111074
Overhead Command Shared Object Symbol
28.59% iou-wrk-10021 [kernel.vmlinux] [k] native_queued_spin_lock_slowpath
8.89% io_uring_echo_s [kernel.vmlinux] [k] native_queued_spin_lock_slowpath
6.20% iou-wrk-10021 [kernel.vmlinux] [k] _raw_spin_lock
2.45% io_uring_echo_s [kernel.vmlinux] [k] io_prep_async_work
2.36% iou-wrk-10021 [kernel.vmlinux] [k] _raw_spin_lock_irqsave
2.29% iou-wrk-10021 [kernel.vmlinux] [k] io_worker_handle_work
1.29% io_uring_echo_s [kernel.vmlinux] [k] io_wqe_enqueue
1.06% iou-wrk-10021 [kernel.vmlinux] [k] io_wqe_worker
1.06% io_uring_echo_s [kernel.vmlinux] [k] _raw_spin_lock
1.03% iou-wrk-10021 [kernel.vmlinux] [k] __schedule
0.99% iou-wrk-10021 [kernel.vmlinux] [k] tcp_sendmsg_locked
with this patch:
Samples: 1M of event 'cycles:ppp', Event count (approx.): 708446691943
Overhead Command Shared Object Symbol
16.86% iou-wrk-10893 [kernel.vmlinux] [k] native_queued_spin_lock_slowpat
9.10% iou-wrk-10893 [kernel.vmlinux] [k] _raw_spin_lock
4.53% io_uring_echo_s [kernel.vmlinux] [k] native_queued_spin_lock_slowpat
2.87% iou-wrk-10893 [kernel.vmlinux] [k] io_worker_handle_work
2.57% iou-wrk-10893 [kernel.vmlinux] [k] _raw_spin_lock_irqsave
2.56% io_uring_echo_s [kernel.vmlinux] [k] io_prep_async_work
1.82% io_uring_echo_s [kernel.vmlinux] [k] _raw_spin_lock
1.33% iou-wrk-10893 [kernel.vmlinux] [k] io_wqe_worker
1.26% io_uring_echo_s [kernel.vmlinux] [k] try_to_wake_up
spin_lock failure from 25.59% + 8.89% = 34.48% to 16.86% + 4.53% = 21.39%
TPS is similar, while cpu usage is from almost 400% to 350%
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 89 +++++++++++++++++++++++++++++++++---------------------
1 file changed, 54 insertions(+), 35 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index fe6b2abcaa49..1869cf6c39f3 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -74,6 +74,7 @@ struct io_wqe_acct {
unsigned max_workers;
int index;
atomic_t nr_running;
+ raw_spinlock_t lock;
struct io_wq_work_list work_list;
unsigned long flags;
};
@@ -221,12 +222,13 @@ static void io_worker_exit(struct io_worker *worker)
if (worker->flags & IO_WORKER_F_FREE)
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
- preempt_disable();
+ raw_spin_unlock(&wqe->lock);
+
io_wqe_dec_running(worker);
worker->flags = 0;
+ preempt_disable();
current->flags &= ~PF_IO_WORKER;
preempt_enable();
- raw_spin_unlock(&wqe->lock);
kfree_rcu(worker, rcu);
io_worker_ref_put(wqe->wq);
@@ -380,11 +382,19 @@ static void io_wqe_dec_running(struct io_worker *worker)
if (!(worker->flags & IO_WORKER_F_UP))
return;
- if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
- atomic_inc(&acct->nr_running);
- atomic_inc(&wqe->wq->worker_refs);
- io_queue_worker_create(worker, acct, create_worker_cb);
+ if (!atomic_dec_and_test(&acct->nr_running))
+ return;
+
+ raw_spin_lock(&acct->lock);
+ if (!io_acct_run_queue(acct)) {
+ raw_spin_unlock(&acct->lock);
+ return;
}
+
+ raw_spin_unlock(&acct->lock);
+ atomic_inc(&acct->nr_running);
+ atomic_inc(&wqe->wq->worker_refs);
+ io_queue_worker_create(worker, acct, create_worker_cb);
}
/*
@@ -479,9 +489,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
* work being added and clearing the stalled bit.
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&acct->lock);
io_wait_on_hash(wqe, stall_hash);
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&acct->lock);
}
return NULL;
@@ -531,12 +541,14 @@ static void io_worker_handle_work(struct io_worker *worker)
* clear the stalled flag.
*/
work = io_get_next_work(acct, worker);
- if (work)
+ raw_spin_unlock(&acct->lock);
+ if (work) {
+ raw_spin_lock(&wqe->lock);
__io_worker_busy(wqe, worker, work);
-
- raw_spin_unlock(&wqe->lock);
- if (!work)
+ raw_spin_unlock(&wqe->lock);
+ } else {
break;
+ }
io_assign_current_work(worker, work);
__set_current_state(TASK_RUNNING);
@@ -567,15 +579,15 @@ static void io_worker_handle_work(struct io_worker *worker)
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&acct->lock);
/* skip unnecessary unlock-lock wqe->lock */
if (!work)
goto get_next;
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&acct->lock);
}
} while (work);
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&acct->lock);
} while (1);
}
@@ -598,11 +610,14 @@ static int io_wqe_worker(void *data)
set_current_state(TASK_INTERRUPTIBLE);
loop:
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&acct->lock);
if (io_acct_run_queue(acct)) {
io_worker_handle_work(worker);
goto loop;
+ } else {
+ raw_spin_unlock(&acct->lock);
}
+ raw_spin_lock(&wqe->lock);
/* timed out, exit unless we're the last worker */
if (last_timeout && acct->nr_workers > 1) {
acct->nr_workers--;
@@ -627,7 +642,7 @@ static int io_wqe_worker(void *data)
}
if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&acct->lock);
io_worker_handle_work(worker);
}
@@ -668,10 +683,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
return;
worker->flags &= ~IO_WORKER_F_RUNNING;
-
- raw_spin_lock(&worker->wqe->lock);
io_wqe_dec_running(worker);
- raw_spin_unlock(&worker->wqe->lock);
}
static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
@@ -734,10 +746,12 @@ static void create_worker_cont(struct callback_head *cb)
.cancel_all = true,
};
+ raw_spin_unlock(&wqe->lock);
while (io_acct_cancel_pending_work(wqe, acct, &match))
- raw_spin_lock(&wqe->lock);
+ ;
+ } else {
+ raw_spin_unlock(&wqe->lock);
}
- raw_spin_unlock(&wqe->lock);
io_worker_ref_put(wqe->wq);
kfree(worker);
return;
@@ -870,6 +884,7 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+ struct io_cb_cancel_data match;
unsigned work_flags = work->flags;
bool do_create;
@@ -883,10 +898,12 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
return;
}
- raw_spin_lock(&wqe->lock);
+ raw_spin_lock(&acct->lock);
io_wqe_insert_work(wqe, work);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+ raw_spin_unlock(&acct->lock);
+ raw_spin_lock(&wqe->lock);
rcu_read_lock();
do_create = !io_wqe_activate_free_worker(wqe, acct);
rcu_read_unlock();
@@ -902,18 +919,19 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
return;
raw_spin_lock(&wqe->lock);
+ if (acct->nr_workers) {
+ raw_spin_unlock(&wqe->lock);
+ return;
+ }
+ raw_spin_unlock(&wqe->lock);
+
/* fatal condition, failed to create the first worker */
- if (!acct->nr_workers) {
- struct io_cb_cancel_data match = {
- .fn = io_wq_work_match_item,
- .data = work,
- .cancel_all = false,
- };
+ match.fn = io_wq_work_match_item,
+ match.data = work,
+ match.cancel_all = false,
- if (io_acct_cancel_pending_work(wqe, acct, &match))
- raw_spin_lock(&wqe->lock);
- }
raw_spin_unlock(&wqe->lock);
+ io_acct_cancel_pending_work(wqe, acct, &match);
}
}
@@ -982,17 +1000,19 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
struct io_wq_work_node *node, *prev;
struct io_wq_work *work;
+ raw_spin_lock(&acct->lock);
wq_list_for_each(node, prev, &acct->work_list) {
work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
io_wqe_remove_pending(wqe, work, prev);
- raw_spin_unlock(&wqe->lock);
+ raw_spin_unlock(&acct->lock);
io_run_cancel(work, wqe);
match->nr_pending++;
/* not safe to continue after unlock */
return true;
}
+ raw_spin_unlock(&acct->lock);
return false;
}
@@ -1002,7 +1022,6 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
{
int i;
retry:
- raw_spin_lock(&wqe->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
@@ -1012,7 +1031,6 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
return;
}
}
- raw_spin_unlock(&wqe->lock);
}
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
@@ -1134,6 +1152,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
acct->index = i;
atomic_set(&acct->nr_running, 0);
INIT_WQ_LIST(&acct->work_list);
+ raw_spin_lock_init(&acct->lock);
}
wqe->wq = wq;
raw_spin_lock_init(&wqe->lock);
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 2/9] io-wq: reduce acct->lock crossing functions lock/unlock
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
2021-11-24 4:46 ` [PATCH 1/9] io-wq: decouple work_list protection from the big wqe->lock Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-24 4:46 ` [PATCH 3/9] io-wq: update check condition for lock Hao Xu
` (7 subsequent siblings)
9 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
reduce acct->lock lock and unlock in different functions to make the
code clearer.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 27 +++++++++------------------
1 file changed, 9 insertions(+), 18 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 1869cf6c39f3..26ccc04797b7 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -237,10 +237,14 @@ static void io_worker_exit(struct io_worker *worker)
static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
+ bool ret = false;
+
+ raw_spin_lock(&acct->lock);
if (!wq_list_empty(&acct->work_list) &&
!test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
- return true;
- return false;
+ ret = true;
+ raw_spin_unlock(&acct->lock);
+ return ret;
}
/*
@@ -385,13 +389,9 @@ static void io_wqe_dec_running(struct io_worker *worker)
if (!atomic_dec_and_test(&acct->nr_running))
return;
- raw_spin_lock(&acct->lock);
- if (!io_acct_run_queue(acct)) {
- raw_spin_unlock(&acct->lock);
+ if (!io_acct_run_queue(acct))
return;
- }
- raw_spin_unlock(&acct->lock);
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
io_queue_worker_create(worker, acct, create_worker_cb);
@@ -540,6 +540,7 @@ static void io_worker_handle_work(struct io_worker *worker)
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
+ raw_spin_lock(&acct->lock);
work = io_get_next_work(acct, worker);
raw_spin_unlock(&acct->lock);
if (work) {
@@ -579,15 +580,10 @@ static void io_worker_handle_work(struct io_worker *worker)
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
- raw_spin_lock(&acct->lock);
- /* skip unnecessary unlock-lock wqe->lock */
if (!work)
goto get_next;
- raw_spin_unlock(&acct->lock);
}
} while (work);
-
- raw_spin_lock(&acct->lock);
} while (1);
}
@@ -610,12 +606,9 @@ static int io_wqe_worker(void *data)
set_current_state(TASK_INTERRUPTIBLE);
loop:
- raw_spin_lock(&acct->lock);
if (io_acct_run_queue(acct)) {
io_worker_handle_work(worker);
goto loop;
- } else {
- raw_spin_unlock(&acct->lock);
}
raw_spin_lock(&wqe->lock);
/* timed out, exit unless we're the last worker */
@@ -641,10 +634,8 @@ static int io_wqe_worker(void *data)
last_timeout = !ret;
}
- if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
- raw_spin_lock(&acct->lock);
+ if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
io_worker_handle_work(worker);
- }
io_worker_exit(worker);
return 0;
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 3/9] io-wq: update check condition for lock
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
2021-11-24 4:46 ` [PATCH 1/9] io-wq: decouple work_list protection from the big wqe->lock Hao Xu
2021-11-24 4:46 ` [PATCH 2/9] io-wq: reduce acct->lock crossing functions lock/unlock Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-25 14:47 ` Pavel Begunkov
2021-11-24 4:46 ` [PATCH 4/9] io-wq: use IO_WQ_ACCT_NR rather than hardcoded number Hao Xu
` (6 subsequent siblings)
9 siblings, 1 reply; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
Update sparse check since we changed the lock.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 26ccc04797b7..443c34d9b326 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -378,7 +378,6 @@ static bool io_queue_worker_create(struct io_worker *worker,
}
static void io_wqe_dec_running(struct io_worker *worker)
- __must_hold(wqe->lock)
{
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
@@ -449,7 +448,7 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
struct io_worker *worker)
- __must_hold(wqe->lock)
+ __must_hold(acct->lock)
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work, *tail;
@@ -523,7 +522,6 @@ static void io_assign_current_work(struct io_worker *worker,
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
static void io_worker_handle_work(struct io_worker *worker)
- __releases(wqe->lock)
{
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
@@ -986,7 +984,6 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
struct io_wqe_acct *acct,
struct io_cb_cancel_data *match)
- __releases(wqe->lock)
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work;
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 4/9] io-wq: use IO_WQ_ACCT_NR rather than hardcoded number
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
` (2 preceding siblings ...)
2021-11-24 4:46 ` [PATCH 3/9] io-wq: update check condition for lock Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-24 4:46 ` [PATCH 5/9] io-wq: move hash wait entry to io_wqe_acct Hao Xu
` (5 subsequent siblings)
9 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
It's better to use the defined enum stuff not the hardcoded number to
define array.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 443c34d9b326..dce365013bd5 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -90,7 +90,7 @@ enum {
*/
struct io_wqe {
raw_spinlock_t lock;
- struct io_wqe_acct acct[2];
+ struct io_wqe_acct acct[IO_WQ_ACCT_NR];
int node;
@@ -1317,7 +1317,7 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);
- for (i = 0; i < 2; i++) {
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
new_count[i] = task_rlimit(current, RLIMIT_NPROC);
}
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 5/9] io-wq: move hash wait entry to io_wqe_acct
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
` (3 preceding siblings ...)
2021-11-24 4:46 ` [PATCH 4/9] io-wq: use IO_WQ_ACCT_NR rather than hardcoded number Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-24 4:46 ` [PATCH 6/9] io-wq: add infra data structure for fixed workers Hao Xu
` (4 subsequent siblings)
9 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
Move wait entry to struct io_wqe_acct since we are going to add private
work list for io_worker in the next patch. This is preparation for the
fixed io-worker feature.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 45 ++++++++++++++++++++++++---------------------
1 file changed, 24 insertions(+), 21 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index dce365013bd5..44c3e344c5d6 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -77,6 +77,8 @@ struct io_wqe_acct {
raw_spinlock_t lock;
struct io_wq_work_list work_list;
unsigned long flags;
+ struct wait_queue_entry wait;
+ struct io_wqe *wqe;
};
enum {
@@ -97,8 +99,6 @@ struct io_wqe {
struct hlist_nulls_head free_list;
struct list_head all_list;
- struct wait_queue_entry wait;
-
struct io_wq *wq;
struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
@@ -431,16 +431,16 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
return work->flags >> IO_WQ_HASH_SHIFT;
}
-static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
+static void io_wait_on_hash(struct io_wqe_acct *acct, unsigned int hash)
{
- struct io_wq *wq = wqe->wq;
+ struct io_wq *wq = acct->wqe->wq;
spin_lock_irq(&wq->hash->wait.lock);
- if (list_empty(&wqe->wait.entry)) {
- __add_wait_queue(&wq->hash->wait, &wqe->wait);
+ if (list_empty(&acct->wait.entry)) {
+ __add_wait_queue(&wq->hash->wait, &acct->wait);
if (!test_bit(hash, &wq->hash->map)) {
__set_current_state(TASK_RUNNING);
- list_del_init(&wqe->wait.entry);
+ list_del_init(&acct->wait.entry);
}
}
spin_unlock_irq(&wq->hash->wait.lock);
@@ -489,7 +489,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
- io_wait_on_hash(wqe, stall_hash);
+ io_wait_on_hash(acct, stall_hash);
raw_spin_lock(&acct->lock);
}
@@ -1076,19 +1076,17 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
int sync, void *key)
{
- struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
- int i;
+ struct io_wqe_acct *acct = container_of(wait, struct io_wqe_acct, wait);
+ bool ret;
list_del_init(&wait->entry);
-
- rcu_read_lock();
- for (i = 0; i < IO_WQ_ACCT_NR; i++) {
- struct io_wqe_acct *acct = &wqe->acct[i];
-
- if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
- io_wqe_activate_free_worker(wqe, acct);
+ ret = test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+ if (ret) {
+ rcu_read_lock();
+ io_wqe_activate_free_worker(acct->wqe, acct);
+ rcu_read_unlock();
}
- rcu_read_unlock();
+
return 1;
}
@@ -1132,8 +1130,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
task_rlimit(current, RLIMIT_NPROC);
- INIT_LIST_HEAD(&wqe->wait.entry);
- wqe->wait.func = io_wqe_hash_wake;
+
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = &wqe->acct[i];
@@ -1141,6 +1138,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
atomic_set(&acct->nr_running, 0);
INIT_WQ_LIST(&acct->work_list);
raw_spin_lock_init(&acct->lock);
+ INIT_LIST_HEAD(&acct->wait.entry);
+ acct->wait.func = io_wqe_hash_wake;
+ acct->wqe = wqe;
}
wqe->wq = wq;
raw_spin_lock_init(&wqe->lock);
@@ -1207,8 +1207,11 @@ static void io_wq_exit_workers(struct io_wq *wq)
wait_for_completion(&wq->worker_done);
for_each_node(node) {
+ int i;
+
spin_lock_irq(&wq->hash->wait.lock);
- list_del_init(&wq->wqes[node]->wait.entry);
+ for (i = 0; i < IO_WQ_ACCT_NR; i++)
+ list_del_init(&wq->wqes[node]->acct[i].wait.entry);
spin_unlock_irq(&wq->hash->wait.lock);
}
put_task_struct(wq->task);
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 6/9] io-wq: add infra data structure for fixed workers
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
` (4 preceding siblings ...)
2021-11-24 4:46 ` [PATCH 5/9] io-wq: move hash wait entry to io_wqe_acct Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-24 4:46 ` [PATCH 7/9] io-wq: implement fixed worker logic Hao Xu
` (3 subsequent siblings)
9 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
Add data sttructure and basic initialization for fixed worker.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 63 ++++++++++++++++++++++++++++++++++++++----------------
1 file changed, 45 insertions(+), 18 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 44c3e344c5d6..fcdfbb904cdf 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -25,6 +25,7 @@ enum {
IO_WORKER_F_RUNNING = 2, /* account as running */
IO_WORKER_F_FREE = 4, /* worker on free list */
IO_WORKER_F_BOUND = 8, /* is doing bounded work */
+ IO_WORKER_F_FIXED = 16, /* is a fixed worker */
};
enum {
@@ -33,6 +34,35 @@ enum {
enum {
IO_ACCT_STALLED_BIT = 0, /* stalled on hash */
+ IO_ACCT_IN_WORKER_BIT,
+};
+
+struct io_wqe_acct {
+ union {
+ unsigned int nr_workers;
+ unsigned int nr_works;
+ };
+ union {
+ unsigned int max_workers;
+ unsigned int max_works;
+ };
+ int index;
+ atomic_t nr_running;
+ raw_spinlock_t lock;
+ struct io_wq_work_list work_list;
+ unsigned long flags;
+
+ struct wait_queue_entry wait;
+ union {
+ struct io_wqe *wqe;
+ struct io_worker *worker;
+ };
+};
+
+enum {
+ IO_WQ_ACCT_BOUND,
+ IO_WQ_ACCT_UNBOUND,
+ IO_WQ_ACCT_NR,
};
/*
@@ -59,6 +89,9 @@ struct io_worker {
struct rcu_head rcu;
struct work_struct work;
};
+ bool fixed;
+ unsigned int index;
+ struct io_wqe_acct acct;
};
#if BITS_PER_LONG == 64
@@ -69,24 +102,6 @@ struct io_worker {
#define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER)
-struct io_wqe_acct {
- unsigned nr_workers;
- unsigned max_workers;
- int index;
- atomic_t nr_running;
- raw_spinlock_t lock;
- struct io_wq_work_list work_list;
- unsigned long flags;
- struct wait_queue_entry wait;
- struct io_wqe *wqe;
-};
-
-enum {
- IO_WQ_ACCT_BOUND,
- IO_WQ_ACCT_UNBOUND,
- IO_WQ_ACCT_NR,
-};
-
/*
* Per-node worker thread pool
*/
@@ -103,6 +118,12 @@ struct io_wqe {
struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
cpumask_var_t cpu_mask;
+
+ raw_spinlock_t fixed_lock;
+ unsigned int max_fixed[IO_WQ_ACCT_NR];
+ unsigned int nr_fixed[IO_WQ_ACCT_NR];
+ unsigned int default_max_works[IO_WQ_ACCT_NR];
+ struct io_worker **fixed_workers[IO_WQ_ACCT_NR];
};
/*
@@ -1090,6 +1111,8 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
return 1;
}
+#define DEFAULT_MAX_FIXED_WORKERS 0
+#define DEFAULT_MAX_FIXED_WORKS 0
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
int ret, node, i;
@@ -1141,9 +1164,12 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
INIT_LIST_HEAD(&acct->wait.entry);
acct->wait.func = io_wqe_hash_wake;
acct->wqe = wqe;
+ wqe->max_fixed[i] = DEFAULT_MAX_FIXED_WORKERS;
+ wqe->default_max_works[i] = DEFAULT_MAX_FIXED_WORKS;
}
wqe->wq = wq;
raw_spin_lock_init(&wqe->lock);
+ raw_spin_lock_init(&wqe->fixed_lock);
INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
INIT_LIST_HEAD(&wqe->all_list);
}
@@ -1232,6 +1258,7 @@ static void io_wq_destroy(struct io_wq *wq)
};
io_wqe_cancel_pending_work(wqe, &match);
free_cpumask_var(wqe->cpu_mask);
+ kfree(wqe->fixed_workers);
kfree(wqe);
}
io_wq_put_hash(wq->hash);
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 7/9] io-wq: implement fixed worker logic
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
` (5 preceding siblings ...)
2021-11-24 4:46 ` [PATCH 6/9] io-wq: add infra data structure for fixed workers Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-24 4:46 ` [PATCH 8/9] io-wq: batch the handling of fixed worker private works Hao Xu
` (2 subsequent siblings)
9 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
The current implementation of io-wq has big spinlock contension. The
main reason is the single work list model. All producers(who insert
works) and consumers(io-workers) have to grap wqe->lock to move ahead.
Set max_worker to 3 or 4, do a fio read test, we can see 40%~50% lock
contension.
Introduce fixed io-workers which sticks there to handle works and have
their own work list.
previous:
producer0 ---insert---> work_list ---get---> io-worker0,1,2
now:
---> private work_list0 --get--> fixed-worker0
/
producer0 --insert----> private work_list1 --get--> fixed-worker1
| \
| ---> private work_list2 --get--> fixed-worker2
|
|---insert---> public work_list --get--> (normal)io-worker
Since each fixed-worker has a private work list, the contension will be
limited to a smaller range(the private work list).
Logic of fixed-worker: first handle private works then public ones.
Logic of normal io-worker: only handle public works.
Logic of producer: 1) create fixed-workers as needed
2) randomly pick a private work list and check if it
is full, insert the work if it's not
3) insert the work to the public work list if 2)
fails.
The get logic of a private list: fixed-worker grab all the works in
its private work list(like what tctx_task_work() does) each time rather
than one by one.(this code is in the next patches as a optimization)
To achieve this, we need to add an io_wqe_acct for each fixed-worker
struct. And move the work list lock to io_wqe_acct from io_wqe
Good things of this feature:
1) bound and unbound work lists now have different spinlocks.
2) much smaller contension between work producers and consumers.
3) fixed workers are friendly for users to control: binding cpus,
reset priority etc.
Wrote a nop test program to test it, set max number of fixed-workers to
3, max number of all workers to 4.
previous: IOPS = 40w~50w
now: IOPS = ~100w
And 50w is the peak IOPS the old version can achieve, while with proper
number of loop and depth(see test code in the cover letter) this (whole)
patchset can achieve ~160w IOPS. From perf result, almost no acct->lock
contension.
(The test program attached in the cover letter of this patchset)
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 186 ++++++++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 161 insertions(+), 25 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index fcdfbb904cdf..b53019d4691d 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -251,6 +251,17 @@ static void io_worker_exit(struct io_worker *worker)
current->flags &= ~PF_IO_WORKER;
preempt_enable();
+ if (worker->flags & IO_WORKER_F_FIXED) {
+ unsigned int index = worker->acct.index;
+
+ raw_spin_lock(&wqe->fixed_lock);
+ kfree(wqe->fixed_workers[worker->index]);
+ spin_lock_irq(&wq->hash->wait.lock);
+ list_del_init(&worker->acct.wait.entry);
+ spin_unlock_irq(&wq->hash->wait.lock);
+ wqe->nr_fixed[index]--;
+ raw_spin_unlock(&wqe->fixed_lock);
+ }
kfree_rcu(worker, rcu);
io_worker_ref_put(wqe->wq);
do_exit(0);
@@ -542,9 +553,8 @@ static void io_assign_current_work(struct io_worker *worker,
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
-static void io_worker_handle_work(struct io_worker *worker)
+static void io_worker_handle_work(struct io_worker *worker, struct io_wqe_acct *acct)
{
- struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -606,13 +616,24 @@ static void io_worker_handle_work(struct io_worker *worker)
} while (1);
}
+static inline void io_worker_handle_private_work(struct io_worker *worker)
+{
+ io_worker_handle_work(worker, &worker->acct);
+}
+
+static inline void io_worker_handle_public_work(struct io_worker *worker)
+{
+ io_worker_handle_work(worker, io_wqe_get_acct(worker));
+}
+
static int io_wqe_worker(void *data)
{
struct io_worker *worker = data;
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
- bool last_timeout = false;
+ bool fixed = worker->flags & IO_WORKER_F_FIXED;
+ bool last_timeout = false, run_private = false;
char buf[TASK_COMM_LEN];
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
@@ -625,18 +646,34 @@ static int io_wqe_worker(void *data)
set_current_state(TASK_INTERRUPTIBLE);
loop:
- if (io_acct_run_queue(acct)) {
- io_worker_handle_work(worker);
+ if (fixed) {
+ run_private = io_acct_run_queue(&worker->acct);
+ if (run_private) {
+ io_worker_handle_private_work(worker);
+ goto loop;
+ }
+ }
+ if (!run_private && io_acct_run_queue(acct)) {
+ io_worker_handle_public_work(worker);
goto loop;
}
raw_spin_lock(&wqe->lock);
- /* timed out, exit unless we're the last worker */
- if (last_timeout && acct->nr_workers > 1) {
+ raw_spin_lock(&wqe->fixed_lock);
+ /* timed out, a worker will exit only if:
+ * - not a fixed worker
+ * - not the last non-fixed worker
+ *
+ * the second condition is due to we need at least one worker to handle the
+ * public work list.
+ */
+ if (last_timeout && !fixed && acct->nr_workers > wqe->nr_fixed[acct->index] + 1) {
+ raw_spin_unlock(&wqe->fixed_lock);
acct->nr_workers--;
raw_spin_unlock(&wqe->lock);
__set_current_state(TASK_RUNNING);
break;
}
+ raw_spin_unlock(&wqe->fixed_lock);
last_timeout = false;
__io_worker_idle(wqe, worker);
raw_spin_unlock(&wqe->lock);
@@ -653,8 +690,11 @@ static int io_wqe_worker(void *data)
last_timeout = !ret;
}
- if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
- io_worker_handle_work(worker);
+ if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
+ if (fixed)
+ io_worker_handle_private_work(worker);
+ io_worker_handle_public_work(worker);
+ }
io_worker_exit(worker);
return 0;
@@ -696,9 +736,36 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
io_wqe_dec_running(worker);
}
+static void io_init_new_fixed_worker(struct io_wqe *wqe, struct io_worker *worker)
+{
+ struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+ unsigned int index = acct->index;
+ unsigned int *nr_fixed, max_fixed;
+
+ raw_spin_lock(&wqe->fixed_lock);
+ nr_fixed = &wqe->nr_fixed[index];
+ max_fixed = wqe->max_fixed[index];
+ if (*nr_fixed < max_fixed) {
+ struct io_wqe_acct *iw_acct = &worker->acct;
+
+ worker->flags |= IO_WORKER_F_FIXED;
+ wqe->fixed_workers[index][*nr_fixed] = worker;
+ worker->index = *nr_fixed;
+ iw_acct->max_works = wqe->default_max_works[index];
+ iw_acct->index = index;
+ set_bit(IO_ACCT_IN_WORKER_BIT, &iw_acct->flags);
+ INIT_WQ_LIST(&iw_acct->work_list);
+ raw_spin_lock_init(&iw_acct->lock);
+ (*nr_fixed)++;
+ }
+ raw_spin_unlock(&wqe->fixed_lock);
+}
+
static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
struct task_struct *tsk)
{
+ struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+
tsk->pf_io_worker = worker;
worker->task = tsk;
set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
@@ -708,6 +775,8 @@ static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
list_add_tail_rcu(&worker->all_list, &wqe->all_list);
worker->flags |= IO_WORKER_F_FREE;
+ if (acct->nr_workers > 1)
+ io_init_new_fixed_worker(wqe, worker);
raw_spin_unlock(&wqe->lock);
wake_up_new_task(tsk);
}
@@ -865,13 +934,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
} while (work);
}
-static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
+static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work,
+ struct io_wqe_acct *acct)
{
- struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned int hash;
struct io_wq_work *tail;
- if (!io_wq_is_hashed(work)) {
+ if (test_bit(IO_ACCT_IN_WORKER_BIT, &acct->flags) || !io_wq_is_hashed(work)) {
append:
wq_list_add_tail(&work->list, &acct->work_list);
return;
@@ -886,17 +955,50 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}
+static bool io_wqe_insert_private_work(struct io_wqe *wqe, struct io_wq_work *work, int index)
+{
+ bool needs_fixed_worker;
+ unsigned int nr_fixed, max_fixed;
+ struct io_worker *fixed_worker;
+ struct io_wqe_acct *iw_acct;
+ unsigned int fixed_worker_index;
+
+ raw_spin_lock(&wqe->fixed_lock);
+ nr_fixed = wqe->nr_fixed[index];
+ max_fixed = wqe->max_fixed[index];
+ needs_fixed_worker = nr_fixed < max_fixed;
+ if (nr_fixed && !needs_fixed_worker) {
+ fixed_worker_index = (unsigned long)work % nr_fixed;
+ fixed_worker = wqe->fixed_workers[index][fixed_worker_index];
+ iw_acct = &fixed_worker->acct;
+
+ raw_spin_lock(&iw_acct->lock);
+ if (iw_acct->nr_works < iw_acct->max_works) {
+ io_wqe_insert_work(wqe, work, iw_acct);
+ iw_acct->nr_works++;
+ raw_spin_unlock(&iw_acct->lock);
+ raw_spin_unlock(&wqe->fixed_lock);
+ wake_up_process(fixed_worker->task);
+ return false;
+ }
+ raw_spin_unlock(&iw_acct->lock);
+ }
+ raw_spin_unlock(&wqe->fixed_lock);
+ return needs_fixed_worker;
+}
+
static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
return work == data;
}
+
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
struct io_cb_cancel_data match;
- unsigned work_flags = work->flags;
- bool do_create;
+ unsigned int work_flags = work->flags;
+ bool do_create, needs_fixed_worker;
/*
* If io-wq is exiting for this task, or if the request has explicitly
@@ -908,8 +1010,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
return;
}
+ needs_fixed_worker = io_wqe_insert_private_work(wqe, work, acct->index);
+
raw_spin_lock(&acct->lock);
- io_wqe_insert_work(wqe, work);
+ io_wqe_insert_work(wqe, work, acct);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
@@ -920,8 +1024,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
raw_spin_unlock(&wqe->lock);
- if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
- !atomic_read(&acct->nr_running))) {
+ if (needs_fixed_worker ||
+ (do_create &&
+ ((work_flags & IO_WQ_WORK_CONCURRENT) ||
+ !atomic_read(&acct->nr_running)))) {
bool did_create;
did_create = io_wqe_create_worker(wqe, acct);
@@ -985,9 +1091,9 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
struct io_wq_work *work,
- struct io_wq_work_node *prev)
+ struct io_wq_work_node *prev,
+ struct io_wqe_acct *acct)
{
- struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned int hash = io_get_work_hash(work);
struct io_wq_work *prev_work = NULL;
@@ -1014,7 +1120,7 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
- io_wqe_remove_pending(wqe, work, prev);
+ io_wqe_remove_pending(wqe, work, prev, acct);
raw_spin_unlock(&acct->lock);
io_run_cancel(work, wqe);
match->nr_pending++;
@@ -1029,17 +1135,32 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match)
{
- int i;
-retry:
+ int i, j;
+retry_public:
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
if (io_acct_cancel_pending_work(wqe, acct, match)) {
if (match->cancel_all)
- goto retry;
+ goto retry_public;
return;
}
}
+
+retry_private:
+ raw_spin_lock(&wqe->fixed_lock);
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ for (j = 0; j < wqe->nr_fixed[i]; j++) {
+ struct io_wqe_acct *acct = &wqe->fixed_workers[i][j]->acct;
+
+ if (io_acct_cancel_pending_work(wqe, acct, match)) {
+ if (match->cancel_all)
+ goto retry_private;
+ return;
+ }
+ }
+ }
+ raw_spin_unlock(&wqe->fixed_lock);
}
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
@@ -1102,6 +1223,11 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
list_del_init(&wait->entry);
ret = test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+ if (test_bit(IO_ACCT_IN_WORKER_BIT, &acct->flags)) {
+ wake_up_process(acct->worker->task);
+ return 1;
+ }
+
if (ret) {
rcu_read_lock();
io_wqe_activate_free_worker(acct->wqe, acct);
@@ -1145,6 +1271,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
if (!wqe)
goto err;
+
if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
goto err;
cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
@@ -1156,6 +1283,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = &wqe->acct[i];
+ struct io_worker **fixed_workers;
acct->index = i;
atomic_set(&acct->nr_running, 0);
@@ -1166,6 +1294,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
acct->wqe = wqe;
wqe->max_fixed[i] = DEFAULT_MAX_FIXED_WORKERS;
wqe->default_max_works[i] = DEFAULT_MAX_FIXED_WORKS;
+ fixed_workers = kzalloc_node(sizeof(struct io_worker *) * wqe->max_fixed[i],
+ GFP_KERNEL, alloc_node);
+ if (!fixed_workers)
+ goto err;
+ wqe->fixed_workers[i] = fixed_workers;
}
wqe->wq = wq;
raw_spin_lock_init(&wqe->lock);
@@ -1184,6 +1317,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
for_each_node(node) {
if (!wq->wqes[node])
continue;
+ for (i = 0; i < IO_WQ_ACCT_NR; i++)
+ kfree(wq->wqes[node]->fixed_workers[i]);
free_cpumask_var(wq->wqes[node]->cpu_mask);
kfree(wq->wqes[node]);
}
@@ -1246,7 +1381,7 @@ static void io_wq_exit_workers(struct io_wq *wq)
static void io_wq_destroy(struct io_wq *wq)
{
- int node;
+ int node, i;
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
@@ -1258,7 +1393,8 @@ static void io_wq_destroy(struct io_wq *wq)
};
io_wqe_cancel_pending_work(wqe, &match);
free_cpumask_var(wqe->cpu_mask);
- kfree(wqe->fixed_workers);
+ for (i = 0; i < IO_WQ_ACCT_NR; i++)
+ kfree(wqe->fixed_workers[i]);
kfree(wqe);
}
io_wq_put_hash(wq->hash);
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 8/9] io-wq: batch the handling of fixed worker private works
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
` (6 preceding siblings ...)
2021-11-24 4:46 ` [PATCH 7/9] io-wq: implement fixed worker logic Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-24 4:46 ` [PATCH 9/9] io-wq: small optimization for __io_worker_busy() Hao Xu
2021-11-25 15:09 ` [RFC 0/9] fixed worker: a new way to handle io works Pavel Begunkov
9 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
Let's reduce acct->lock contension by batching the handling of private
work list for fixed_workers.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 42 ++++++++++++++++++++++++++++++++----------
fs/io-wq.h | 5 +++++
2 files changed, 37 insertions(+), 10 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index b53019d4691d..097ea598bfe5 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -479,7 +479,7 @@ static void io_wait_on_hash(struct io_wqe_acct *acct, unsigned int hash)
}
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
- struct io_worker *worker)
+ struct io_worker *worker, bool needs_lock)
__must_hold(acct->lock)
{
struct io_wq_work_node *node, *prev;
@@ -487,14 +487,23 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
unsigned int stall_hash = -1U;
struct io_wqe *wqe = worker->wqe;
+ if (needs_lock)
+ raw_spin_lock(&acct->lock);
wq_list_for_each(node, prev, &acct->work_list) {
unsigned int hash;
work = container_of(node, struct io_wq_work, list);
+ /* hash optimization doesn't work for fixed_workers for now */
+ if (!needs_lock) {
+ wq_list_del(&acct->work_list, node, prev);
+ return work;
+ }
+
/* not hashed, can run anytime */
if (!io_wq_is_hashed(work)) {
wq_list_del(&acct->work_list, node, prev);
+ raw_spin_unlock(&acct->lock);
return work;
}
@@ -506,6 +515,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
wqe->hash_tail[hash] = NULL;
wq_list_cut(&acct->work_list, &tail->list, prev);
+ raw_spin_unlock(&acct->lock);
return work;
}
if (stall_hash == -1U)
@@ -515,15 +525,21 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
}
if (stall_hash != -1U) {
+ if (!needs_lock)
+ acct = &worker->acct;
/*
* Set this before dropping the lock to avoid racing with new
* work being added and clearing the stalled bit.
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
- raw_spin_unlock(&acct->lock);
+ if (needs_lock)
+ raw_spin_unlock(&acct->lock);
io_wait_on_hash(acct, stall_hash);
- raw_spin_lock(&acct->lock);
+ if (needs_lock)
+ raw_spin_lock(&acct->lock);
}
+ if (needs_lock)
+ raw_spin_unlock(&acct->lock);
return NULL;
}
@@ -553,7 +569,8 @@ static void io_assign_current_work(struct io_worker *worker,
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
-static void io_worker_handle_work(struct io_worker *worker, struct io_wqe_acct *acct)
+static void io_worker_handle_work(struct io_worker *worker, struct io_wqe_acct *acct,
+ bool needs_lock)
{
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
@@ -569,9 +586,7 @@ static void io_worker_handle_work(struct io_worker *worker, struct io_wqe_acct *
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
- raw_spin_lock(&acct->lock);
- work = io_get_next_work(acct, worker);
- raw_spin_unlock(&acct->lock);
+ work = io_get_next_work(acct, worker, needs_lock);
if (work) {
raw_spin_lock(&wqe->lock);
__io_worker_busy(wqe, worker, work);
@@ -604,7 +619,7 @@ static void io_worker_handle_work(struct io_worker *worker, struct io_wqe_acct *
if (linked)
io_wqe_enqueue(wqe, linked);
- if (hash != -1U && !next_hashed) {
+ if (needs_lock && hash != -1U && !next_hashed) {
clear_bit(hash, &wq->hash->map);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
if (wq_has_sleeper(&wq->hash->wait))
@@ -618,12 +633,19 @@ static void io_worker_handle_work(struct io_worker *worker, struct io_wqe_acct *
static inline void io_worker_handle_private_work(struct io_worker *worker)
{
- io_worker_handle_work(worker, &worker->acct);
+ struct io_wqe_acct acct;
+
+ raw_spin_lock(&worker->acct.lock);
+ acct = worker->acct;
+ wq_list_clean(&worker->acct.work_list);
+ worker->acct.nr_works = 0;
+ raw_spin_unlock(&worker->acct.lock);
+ io_worker_handle_work(worker, &acct, false);
}
static inline void io_worker_handle_public_work(struct io_worker *worker)
{
- io_worker_handle_work(worker, io_wqe_get_acct(worker));
+ io_worker_handle_work(worker, io_wqe_get_acct(worker), true);
}
static int io_wqe_worker(void *data)
diff --git a/fs/io-wq.h b/fs/io-wq.h
index 41bf37674a49..7c330264172b 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -40,6 +40,11 @@ struct io_wq_work_list {
(list)->first = NULL; \
} while (0)
+static inline void wq_list_clean(struct io_wq_work_list *list)
+{
+ list->first = list->last = NULL;
+}
+
static inline void wq_list_add_after(struct io_wq_work_node *node,
struct io_wq_work_node *pos,
struct io_wq_work_list *list)
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 9/9] io-wq: small optimization for __io_worker_busy()
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
` (7 preceding siblings ...)
2021-11-24 4:46 ` [PATCH 8/9] io-wq: batch the handling of fixed worker private works Hao Xu
@ 2021-11-24 4:46 ` Hao Xu
2021-11-25 15:09 ` [RFC 0/9] fixed worker: a new way to handle io works Pavel Begunkov
9 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-24 4:46 UTC (permalink / raw)
To: Jens Axboe; +Cc: io-uring, Pavel Begunkov, Joseph Qi
Let's change the worker state between free/running only when it's
necessary. This can reduce some lock contension.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 16 ++++++----------
1 file changed, 6 insertions(+), 10 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 097ea598bfe5..377c3e42a491 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -434,12 +434,11 @@ static void io_wqe_dec_running(struct io_worker *worker)
*/
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
struct io_wq_work *work)
- __must_hold(wqe->lock)
{
- if (worker->flags & IO_WORKER_F_FREE) {
- worker->flags &= ~IO_WORKER_F_FREE;
- hlist_nulls_del_init_rcu(&worker->nulls_node);
- }
+ raw_spin_lock(&wqe->lock);
+ worker->flags &= ~IO_WORKER_F_FREE;
+ hlist_nulls_del_init_rcu(&worker->nulls_node);
+ raw_spin_unlock(&wqe->lock);
}
/*
@@ -587,13 +586,10 @@ static void io_worker_handle_work(struct io_worker *worker, struct io_wqe_acct *
* clear the stalled flag.
*/
work = io_get_next_work(acct, worker, needs_lock);
- if (work) {
- raw_spin_lock(&wqe->lock);
+ if (work && (worker->flags & IO_WORKER_F_FREE))
__io_worker_busy(wqe, worker, work);
- raw_spin_unlock(&wqe->lock);
- } else {
+ else if (!work)
break;
- }
io_assign_current_work(worker, work);
__set_current_state(TASK_RUNNING);
--
2.24.4
^ permalink raw reply related [flat|nested] 16+ messages in thread
* Re: [PATCH 3/9] io-wq: update check condition for lock
2021-11-24 4:46 ` [PATCH 3/9] io-wq: update check condition for lock Hao Xu
@ 2021-11-25 14:47 ` Pavel Begunkov
2021-11-30 3:32 ` Hao Xu
0 siblings, 1 reply; 16+ messages in thread
From: Pavel Begunkov @ 2021-11-25 14:47 UTC (permalink / raw)
To: Hao Xu, Jens Axboe; +Cc: io-uring, Joseph Qi
On 11/24/21 04:46, Hao Xu wrote:
> Update sparse check since we changed the lock.
Shouldn't it be a part of one of the previous patches?
>
> Signed-off-by: Hao Xu <[email protected]>
> ---
> fs/io-wq.c | 5 +----
> 1 file changed, 1 insertion(+), 4 deletions(-)
>
> diff --git a/fs/io-wq.c b/fs/io-wq.c
> index 26ccc04797b7..443c34d9b326 100644
> --- a/fs/io-wq.c
> +++ b/fs/io-wq.c
> @@ -378,7 +378,6 @@ static bool io_queue_worker_create(struct io_worker *worker,
> }
>
> static void io_wqe_dec_running(struct io_worker *worker)
> - __must_hold(wqe->lock)
> {
> struct io_wqe_acct *acct = io_wqe_get_acct(worker);
> struct io_wqe *wqe = worker->wqe;
> @@ -449,7 +448,7 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
>
> static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
> struct io_worker *worker)
> - __must_hold(wqe->lock)
> + __must_hold(acct->lock)
> {
> struct io_wq_work_node *node, *prev;
> struct io_wq_work *work, *tail;
> @@ -523,7 +522,6 @@ static void io_assign_current_work(struct io_worker *worker,
> static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
>
> static void io_worker_handle_work(struct io_worker *worker)
> - __releases(wqe->lock)
> {
> struct io_wqe_acct *acct = io_wqe_get_acct(worker);
> struct io_wqe *wqe = worker->wqe;
> @@ -986,7 +984,6 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
> static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
> struct io_wqe_acct *acct,
> struct io_cb_cancel_data *match)
> - __releases(wqe->lock)
> {
> struct io_wq_work_node *node, *prev;
> struct io_wq_work *work;
>
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 16+ messages in thread
* Re: [RFC 0/9] fixed worker: a new way to handle io works
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
` (8 preceding siblings ...)
2021-11-24 4:46 ` [PATCH 9/9] io-wq: small optimization for __io_worker_busy() Hao Xu
@ 2021-11-25 15:09 ` Pavel Begunkov
2021-11-30 3:48 ` Hao Xu
9 siblings, 1 reply; 16+ messages in thread
From: Pavel Begunkov @ 2021-11-25 15:09 UTC (permalink / raw)
To: Hao Xu, Jens Axboe; +Cc: io-uring, Joseph Qi
On 11/24/21 04:46, Hao Xu wrote:
> There is big contension in current io-wq implementation. Introduce a new
> type io-worker called fixed-worker to solve this problem. it is also a
> new way to handle works. In this new system, works are dispatched to
> different private queues rather than a long shared queue.
It's really great to temper the contention here, even though it looks
we are stepping onto the path of reinventing all the optimisations
solved long ago in other thread pools. Work stealing is probably
the next, but guess it's inevitable :)
First four patchhes sound like a good idea, they will probably go
first. However, IIUC, the hashing is crucial and it's a must have.
Are you planning to add it? If not, is there an easy way to leave
hashing working even if hashed reqs not going through those new
per-worker queues? E.g. (if it's not already as this...)
if (hashed) {
// fixed workers don't support hashing, so go through the
// old path and place into the shared queue.
enqueue_shared_queue();
} else
enqueue_new_path();
And last note, just fyi, it's easier to sell patches if you put
numbers in the cover letter
> Hao Xu (9):
> io-wq: decouple work_list protection from the big wqe->lock
> io-wq: reduce acct->lock crossing functions lock/unlock
> io-wq: update check condition for lock
> io-wq: use IO_WQ_ACCT_NR rather than hardcoded number
> io-wq: move hash wait entry to io_wqe_acct
> io-wq: add infra data structure for fix workers
> io-wq: implement fixed worker logic
> io-wq: batch the handling of fixed worker private works
> io-wq: small optimization for __io_worker_busy()
>
> fs/io-wq.c | 415 ++++++++++++++++++++++++++++++++++++++---------------
> fs/io-wq.h | 5 +
> 2 files changed, 308 insertions(+), 112 deletions(-)
>
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 16+ messages in thread
* Re: [PATCH 3/9] io-wq: update check condition for lock
2021-11-25 14:47 ` Pavel Begunkov
@ 2021-11-30 3:32 ` Hao Xu
0 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-30 3:32 UTC (permalink / raw)
To: Pavel Begunkov, Jens Axboe; +Cc: io-uring, Joseph Qi
在 2021/11/25 下午10:47, Pavel Begunkov 写道:
> On 11/24/21 04:46, Hao Xu wrote:
>> Update sparse check since we changed the lock.
>
> Shouldn't it be a part of one of the previous patches?
Sure, that would be better.
>
>>
>> Signed-off-by: Hao Xu <[email protected]>
>> ---
>> fs/io-wq.c | 5 +----
>> 1 file changed, 1 insertion(+), 4 deletions(-)
>>
>> diff --git a/fs/io-wq.c b/fs/io-wq.c
>> index 26ccc04797b7..443c34d9b326 100644
>> --- a/fs/io-wq.c
>> +++ b/fs/io-wq.c
>> @@ -378,7 +378,6 @@ static bool io_queue_worker_create(struct
>> io_worker *worker,
>> }
>> static void io_wqe_dec_running(struct io_worker *worker)
>> - __must_hold(wqe->lock)
>> {
>> struct io_wqe_acct *acct = io_wqe_get_acct(worker);
>> struct io_wqe *wqe = worker->wqe;
>> @@ -449,7 +448,7 @@ static void io_wait_on_hash(struct io_wqe *wqe,
>> unsigned int hash)
>> static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
>> struct io_worker *worker)
>> - __must_hold(wqe->lock)
>> + __must_hold(acct->lock)
>> {
>> struct io_wq_work_node *node, *prev;
>> struct io_wq_work *work, *tail;
>> @@ -523,7 +522,6 @@ static void io_assign_current_work(struct
>> io_worker *worker,
>> static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work
>> *work);
>> static void io_worker_handle_work(struct io_worker *worker)
>> - __releases(wqe->lock)
>> {
>> struct io_wqe_acct *acct = io_wqe_get_acct(worker);
>> struct io_wqe *wqe = worker->wqe;
>> @@ -986,7 +984,6 @@ static inline void io_wqe_remove_pending(struct
>> io_wqe *wqe,
>> static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
>> struct io_wqe_acct *acct,
>> struct io_cb_cancel_data *match)
>> - __releases(wqe->lock)
>> {
>> struct io_wq_work_node *node, *prev;
>> struct io_wq_work *work;
>>
>
^ permalink raw reply [flat|nested] 16+ messages in thread
* Re: [RFC 0/9] fixed worker: a new way to handle io works
2021-11-25 15:09 ` [RFC 0/9] fixed worker: a new way to handle io works Pavel Begunkov
@ 2021-11-30 3:48 ` Hao Xu
0 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2021-11-30 3:48 UTC (permalink / raw)
To: Pavel Begunkov, Jens Axboe; +Cc: io-uring, Joseph Qi
在 2021/11/25 下午11:09, Pavel Begunkov 写道:
> On 11/24/21 04:46, Hao Xu wrote:
>> There is big contension in current io-wq implementation. Introduce a new
>> type io-worker called fixed-worker to solve this problem. it is also a
>> new way to handle works. In this new system, works are dispatched to
>> different private queues rather than a long shared queue.
>
> It's really great to temper the contention here, even though it looks
> we are stepping onto the path of reinventing all the optimisations
> solved long ago in other thread pools. Work stealing is probably
Hmm, hope io_uring can do it better, a powerful iowq! :)
> the next, but guess it's inevitable :)
Probably yes :)
>
> First four patchhes sound like a good idea, they will probably go
> first. However, IIUC, the hashing is crucial and it's a must have.
> Are you planning to add it? If not, is there an easy way to leave
I'm planning to add it, still need some time to make it robust.
> hashing working even if hashed reqs not going through those new
> per-worker queues? E.g. (if it's not already as this...)
>
> if (hashed) {
> // fixed workers don't support hashing, so go through the
> // old path and place into the shared queue.
> enqueue_shared_queue();
> } else
> enqueue_new_path();
>
Good idea.
> And last note, just fyi, it's easier to sell patches if you put
> numbers in the cover letter
Thanks Pavel, that's definitely clearer for people to review.
Cheers,
Hao
>
>
>> Hao Xu (9):
>> io-wq: decouple work_list protection from the big wqe->lock
>> io-wq: reduce acct->lock crossing functions lock/unlock
>> io-wq: update check condition for lock
>> io-wq: use IO_WQ_ACCT_NR rather than hardcoded number
>> io-wq: move hash wait entry to io_wqe_acct
>> io-wq: add infra data structure for fix workers
>> io-wq: implement fixed worker logic
>> io-wq: batch the handling of fixed worker private works
>> io-wq: small optimization for __io_worker_busy()
>>
>> fs/io-wq.c | 415 ++++++++++++++++++++++++++++++++++++++---------------
>> fs/io-wq.h | 5 +
>> 2 files changed, 308 insertions(+), 112 deletions(-)
>>
>
^ permalink raw reply [flat|nested] 16+ messages in thread
* [PATCH 8/9] io-wq: batch the handling of fixed worker private works
2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
@ 2022-04-20 10:39 ` Hao Xu
0 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2022-04-20 10:39 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov
From: Hao Xu <[email protected]>
Reduce acct->lock contension by batching the handling of private work
list for fixed_workers.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 42 +++++++++++++++++++++++++++++++++---------
fs/io-wq.h | 5 +++++
2 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 8fa5bfb298dc..807985249f62 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -539,8 +539,23 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
return ret;
}
+static inline void conditional_acct_lock(struct io_wqe_acct *acct,
+ bool needs_lock)
+{
+ if (needs_lock)
+ raw_spin_lock(&acct->lock);
+}
+
+static inline void conditional_acct_unlock(struct io_wqe_acct *acct,
+ bool needs_lock)
+{
+ if (needs_lock)
+ raw_spin_unlock(&acct->lock);
+}
+
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
- struct io_worker *worker)
+ struct io_worker *worker,
+ bool needs_lock)
__must_hold(acct->lock)
{
struct io_wq_work_node *node, *prev;
@@ -548,6 +563,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
unsigned int stall_hash = -1U;
struct io_wqe *wqe = worker->wqe;
+ conditional_acct_lock(acct, needs_lock);
wq_list_for_each(node, prev, &acct->work_list) {
unsigned int hash;
@@ -556,6 +572,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
/* not hashed, can run anytime */
if (!io_wq_is_hashed(work)) {
wq_list_del(&acct->work_list, node, prev);
+ conditional_acct_unlock(acct, needs_lock);
return work;
}
@@ -567,6 +584,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
wqe->hash_tail[hash] = NULL;
wq_list_cut(&acct->work_list, &tail->list, prev);
+ conditional_acct_unlock(acct, needs_lock);
return work;
}
if (stall_hash == -1U)
@@ -583,15 +601,16 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
* work being added and clearing the stalled bit.
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
- raw_spin_unlock(&acct->lock);
+ conditional_acct_unlock(acct, needs_lock);
unstalled = io_wait_on_hash(wqe, stall_hash);
- raw_spin_lock(&acct->lock);
+ conditional_acct_lock(acct, needs_lock);
if (unstalled) {
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
if (wq_has_sleeper(&wqe->wq->hash->wait))
wake_up(&wqe->wq->hash->wait);
}
}
+ conditional_acct_unlock(acct, needs_lock);
return NULL;
}
@@ -625,7 +644,7 @@ static void io_assign_current_work(struct io_worker *worker,
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
static void io_worker_handle_work(struct io_worker *worker,
- struct io_wqe_acct *acct)
+ struct io_wqe_acct *acct, bool needs_lock)
{
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
@@ -641,9 +660,7 @@ static void io_worker_handle_work(struct io_worker *worker,
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
- raw_spin_lock(&acct->lock);
- work = io_get_next_work(acct, worker);
- raw_spin_unlock(&acct->lock);
+ work = io_get_next_work(acct, worker, needs_lock);
if (work) {
__io_worker_busy(wqe, worker);
@@ -700,12 +717,19 @@ static void io_worker_handle_work(struct io_worker *worker,
static inline void io_worker_handle_private_work(struct io_worker *worker)
{
- io_worker_handle_work(worker, &worker->acct);
+ struct io_wqe_acct acct;
+
+ raw_spin_lock(&worker->acct.lock);
+ acct = worker->acct;
+ wq_list_clean(&worker->acct.work_list);
+ worker->acct.nr_works = 0;
+ raw_spin_unlock(&worker->acct.lock);
+ io_worker_handle_work(worker, &acct, false);
}
static inline void io_worker_handle_public_work(struct io_worker *worker)
{
- io_worker_handle_work(worker, io_wqe_get_acct(worker));
+ io_worker_handle_work(worker, io_wqe_get_acct(worker), true);
}
static int io_wqe_worker(void *data)
diff --git a/fs/io-wq.h b/fs/io-wq.h
index dbecd27656c7..98befe7b0081 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -40,6 +40,11 @@ struct io_wq_work_list {
(list)->first = NULL; \
} while (0)
+static inline void wq_list_clean(struct io_wq_work_list *list)
+{
+ list->first = list->last = NULL;
+}
+
static inline void wq_list_add_after(struct io_wq_work_node *node,
struct io_wq_work_node *pos,
struct io_wq_work_list *list)
--
2.36.0
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 8/9] io-wq: batch the handling of fixed worker private works
2022-04-29 10:18 [RFC v3 0/9] fixed worker Hao Xu
@ 2022-04-29 10:18 ` Hao Xu
0 siblings, 0 replies; 16+ messages in thread
From: Hao Xu @ 2022-04-29 10:18 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, Pavel Begunkov, linux-fsdevel, linux-kernel
From: Hao Xu <[email protected]>
Reduce acct->lock contension by batching the handling of private work
list for fixed_workers.
Signed-off-by: Hao Xu <[email protected]>
---
fs/io-wq.c | 42 +++++++++++++++++++++++++++++++++---------
fs/io-wq.h | 5 +++++
2 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index aaa9cea7d39a..df2d480395e8 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -540,8 +540,23 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
return ret;
}
+static inline void conditional_acct_lock(struct io_wqe_acct *acct,
+ bool needs_lock)
+{
+ if (needs_lock)
+ raw_spin_lock(&acct->lock);
+}
+
+static inline void conditional_acct_unlock(struct io_wqe_acct *acct,
+ bool needs_lock)
+{
+ if (needs_lock)
+ raw_spin_unlock(&acct->lock);
+}
+
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
- struct io_worker *worker)
+ struct io_worker *worker,
+ bool needs_lock)
__must_hold(acct->lock)
{
struct io_wq_work_node *node, *prev;
@@ -549,6 +564,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
unsigned int stall_hash = -1U;
struct io_wqe *wqe = worker->wqe;
+ conditional_acct_lock(acct, needs_lock);
wq_list_for_each(node, prev, &acct->work_list) {
unsigned int hash;
@@ -557,6 +573,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
/* not hashed, can run anytime */
if (!io_wq_is_hashed(work)) {
wq_list_del(&acct->work_list, node, prev);
+ conditional_acct_unlock(acct, needs_lock);
return work;
}
@@ -568,6 +585,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
wqe->hash_tail[hash] = NULL;
wq_list_cut(&acct->work_list, &tail->list, prev);
+ conditional_acct_unlock(acct, needs_lock);
return work;
}
if (stall_hash == -1U)
@@ -584,15 +602,16 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
* work being added and clearing the stalled bit.
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
- raw_spin_unlock(&acct->lock);
+ conditional_acct_unlock(acct, needs_lock);
unstalled = io_wait_on_hash(wqe, stall_hash);
- raw_spin_lock(&acct->lock);
+ conditional_acct_lock(acct, needs_lock);
if (unstalled) {
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
if (wq_has_sleeper(&wqe->wq->hash->wait))
wake_up(&wqe->wq->hash->wait);
}
}
+ conditional_acct_unlock(acct, needs_lock);
return NULL;
}
@@ -626,7 +645,7 @@ static void io_assign_current_work(struct io_worker *worker,
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
static void io_worker_handle_work(struct io_worker *worker,
- struct io_wqe_acct *acct)
+ struct io_wqe_acct *acct, bool needs_lock)
{
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
@@ -642,9 +661,7 @@ static void io_worker_handle_work(struct io_worker *worker,
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
- raw_spin_lock(&acct->lock);
- work = io_get_next_work(acct, worker);
- raw_spin_unlock(&acct->lock);
+ work = io_get_next_work(acct, worker, needs_lock);
if (work) {
__io_worker_busy(wqe, worker);
@@ -701,12 +718,19 @@ static void io_worker_handle_work(struct io_worker *worker,
static inline void io_worker_handle_private_work(struct io_worker *worker)
{
- io_worker_handle_work(worker, &worker->acct);
+ struct io_wqe_acct acct;
+
+ raw_spin_lock(&worker->acct.lock);
+ acct = worker->acct;
+ wq_list_clean(&worker->acct.work_list);
+ worker->acct.nr_works = 0;
+ raw_spin_unlock(&worker->acct.lock);
+ io_worker_handle_work(worker, &acct, false);
}
static inline void io_worker_handle_public_work(struct io_worker *worker)
{
- io_worker_handle_work(worker, io_wqe_get_acct(worker));
+ io_worker_handle_work(worker, io_wqe_get_acct(worker), true);
}
static int io_wqe_worker(void *data)
diff --git a/fs/io-wq.h b/fs/io-wq.h
index ba6eee76d028..ef3ce577e6b7 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -40,6 +40,11 @@ struct io_wq_work_list {
(list)->first = NULL; \
} while (0)
+static inline void wq_list_clean(struct io_wq_work_list *list)
+{
+ list->first = list->last = NULL;
+}
+
static inline void wq_list_add_after(struct io_wq_work_node *node,
struct io_wq_work_node *pos,
struct io_wq_work_list *list)
--
2.36.0
^ permalink raw reply related [flat|nested] 16+ messages in thread
end of thread, other threads:[~2022-04-29 10:19 UTC | newest]
Thread overview: 16+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2021-11-24 4:46 [RFC 0/9] fixed worker: a new way to handle io works Hao Xu
2021-11-24 4:46 ` [PATCH 1/9] io-wq: decouple work_list protection from the big wqe->lock Hao Xu
2021-11-24 4:46 ` [PATCH 2/9] io-wq: reduce acct->lock crossing functions lock/unlock Hao Xu
2021-11-24 4:46 ` [PATCH 3/9] io-wq: update check condition for lock Hao Xu
2021-11-25 14:47 ` Pavel Begunkov
2021-11-30 3:32 ` Hao Xu
2021-11-24 4:46 ` [PATCH 4/9] io-wq: use IO_WQ_ACCT_NR rather than hardcoded number Hao Xu
2021-11-24 4:46 ` [PATCH 5/9] io-wq: move hash wait entry to io_wqe_acct Hao Xu
2021-11-24 4:46 ` [PATCH 6/9] io-wq: add infra data structure for fixed workers Hao Xu
2021-11-24 4:46 ` [PATCH 7/9] io-wq: implement fixed worker logic Hao Xu
2021-11-24 4:46 ` [PATCH 8/9] io-wq: batch the handling of fixed worker private works Hao Xu
2021-11-24 4:46 ` [PATCH 9/9] io-wq: small optimization for __io_worker_busy() Hao Xu
2021-11-25 15:09 ` [RFC 0/9] fixed worker: a new way to handle io works Pavel Begunkov
2021-11-30 3:48 ` Hao Xu
-- strict thread matches above, loose matches on Subject: below --
2022-04-20 10:39 [RFC v2 0/9] fixed worker Hao Xu
2022-04-20 10:39 ` [PATCH 8/9] io-wq: batch the handling of fixed worker private works Hao Xu
2022-04-29 10:18 [RFC v3 0/9] fixed worker Hao Xu
2022-04-29 10:18 ` [PATCH 8/9] io-wq: batch the handling of fixed worker private works Hao Xu
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox