From 39caf24bc3645a5c608c070652e3a5e9385232c7 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Fri, 17 Dec 2021 13:44:22 -0700 Subject: [PATCH 2/2] io-wq: pass back cancel information from worker creation path Don't call cancel directly deep inside the worker creation, pass back whether to cancel to the caller which can then do so from a saner context. We have two paths that currently do this, and one does so while holding a lock we may need on cancelation. Fixes: d800c65c2d4e ("io-wq: drop wqe lock before creating new worker") Signed-off-by: Jens Axboe --- fs/io-wq.c | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index f261fb700cfc..139eecd89e72 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -137,7 +137,7 @@ struct io_cb_cancel_data { }; static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index); -static void io_wqe_dec_running(struct io_worker *worker); +static bool io_wqe_dec_running(struct io_worker *worker); static bool io_acct_cancel_pending_work(struct io_wqe *wqe, struct io_wqe_acct *acct, struct io_cb_cancel_data *match); @@ -206,6 +206,7 @@ static void io_worker_exit(struct io_worker *worker) { struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; + bool cancel; while (1) { struct callback_head *cb = task_work_cancel_match(wq->task, @@ -224,12 +225,15 @@ static void io_worker_exit(struct io_worker *worker) hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); preempt_disable(); - io_wqe_dec_running(worker); + cancel = io_wqe_dec_running(worker); worker->flags = 0; current->flags &= ~PF_IO_WORKER; preempt_enable(); raw_spin_unlock(&wqe->lock); + if (cancel) + io_wq_cancel_tw_create(wq); + kfree_rcu(worker, rcu); io_worker_ref_put(wqe->wq); do_exit(0); @@ -336,13 +340,17 @@ static void create_worker_cb(struct callback_head *cb) io_worker_release(worker); } -static void io_queue_worker_create(struct io_worker *worker, +/* + * Returns true if the caller should call io_wq_cancel_tw_create + */ +static bool io_queue_worker_create(struct io_worker *worker, struct io_wqe_acct *acct, task_work_func_t func, bool free_worker_on_error) { struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; + bool ret = false; /* raced with exit, just ignore create call */ if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) @@ -370,7 +378,7 @@ static void io_queue_worker_create(struct io_worker *worker, * work item after we canceled in io_wq_exit_workers(). */ if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) - io_wq_cancel_tw_create(wq); + ret = true; goto fail_wq_put; } io_worker_ref_put(wq); @@ -383,24 +391,28 @@ static void io_queue_worker_create(struct io_worker *worker, io_worker_ref_put(wq); if (free_worker_on_error) kfree(worker); + return ret; } -static void io_wqe_dec_running(struct io_worker *worker) +/* + * Returns true if the caller should call io_wq_cancel_tw_create + */ +static bool io_wqe_dec_running(struct io_worker *worker) __must_hold(wqe->lock) { struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; if (!(worker->flags & IO_WORKER_F_UP)) - return; + return false; if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) { atomic_inc(&acct->nr_running); atomic_inc(&wqe->wq->worker_refs); - raw_spin_unlock(&wqe->lock); - io_queue_worker_create(worker, acct, create_worker_cb, false); - raw_spin_lock(&wqe->lock); + return io_queue_worker_create(worker, acct, create_worker_cb, false); } + + return false; } /* @@ -691,6 +703,8 @@ void io_wq_worker_running(struct task_struct *tsk) void io_wq_worker_sleeping(struct task_struct *tsk) { struct io_worker *worker = tsk->pf_io_worker; + struct io_wqe *wqe = worker->wqe; + bool cancel; if (!worker) return; @@ -701,9 +715,11 @@ void io_wq_worker_sleeping(struct task_struct *tsk) worker->flags &= ~IO_WORKER_F_RUNNING; - raw_spin_lock(&worker->wqe->lock); - io_wqe_dec_running(worker); - raw_spin_unlock(&worker->wqe->lock); + raw_spin_lock(&wqe->lock); + cancel = io_wqe_dec_running(worker); + raw_spin_unlock(&wqe->lock); + if (cancel) + io_wq_cancel_tw_create(wqe->wq); } static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker, @@ -791,8 +807,12 @@ static void io_workqueue_create(struct work_struct *work) { struct io_worker *worker = container_of(work, struct io_worker, work); struct io_wqe_acct *acct = io_wqe_get_acct(worker); + struct io_wq *wq = worker->wqe->wq; + bool cancel; - io_queue_worker_create(worker, acct, create_worker_cont, true); + cancel = io_queue_worker_create(worker, acct, create_worker_cont, true); + if (cancel) + io_wq_cancel_tw_create(wq); } static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) -- 2.34.1