* [PATCHSET] Improve io_uring cancellations

From: Jens Axboe @ 2019-11-13 19:43 UTC
To: io-uring

We have a few issues currently:

- While the wqe->lock protects the stability of the worker->cur_work
  pointer, it does NOT stabilize the actual work item. For cancelling
  specific work, io_uring needs to dereference it to see if it matches.

- While we know the worker structure won't go away under the RCU read
  lock, the worker might exit. It's not safe to wake up or signal this
  process without ensuring it's still alive.

- We're not consistent in comparing worker->cur_work and sending a
  signal to cancel it.

These two patches fix the above issues. The first is a prep patch that
adds referencing of work items, which makes it safe to dereference
->cur_work if we ensure that ->cur_work itself is stable. The other
patch reworks how we ensure that tasks are alive for signaling, and
that we have a consistent view of ->cur_work while sending a signal.
We add a new lock for ->cur_work, as we cannot safely use wqe->lock
for this. See the comment in patch 2 on signalfd usage.

 fs/io-wq.c    | 91 +++++++++++++++++++++++++++++++++++++--------------
 fs/io-wq.h    |  7 +++-
 fs/io_uring.c | 17 +++++++++-
 3 files changed, 89 insertions(+), 26 deletions(-)

-- 
Jens Axboe
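As a rough sketch of the reference pattern this series introduces (not code
from the patches themselves; all sketch_* names, fields, and the matching
scheme below are illustrative stand-ins, not the real io-wq symbols), the
idea is that the worker pins a work item for as long as it can be observed
through ->cur_work, which is what makes the cancel-side dereference legal:

#include <linux/refcount.h>
#include <linux/slab.h>
#include <linux/types.h>

struct sketch_work {
	refcount_t refs;			/* submitter holds one reference */
	u64 user_data;				/* what a cancel request matches on */
	void (*func)(struct sketch_work *);
};

struct sketch_worker {
	struct sketch_work *cur_work;
};

static void sketch_run_work(struct sketch_worker *worker,
			    struct sketch_work *work)
{
	/*
	 * "get_work": pin the item so it can't be freed while it is
	 * visible through ->cur_work, even if ->func() drops the
	 * submitter's reference on completion.
	 */
	refcount_inc(&work->refs);
	worker->cur_work = work;

	work->func(work);

	worker->cur_work = NULL;
	/* "put_work": drop the worker's pin, free on the last reference */
	if (refcount_dec_and_test(&work->refs))
		kfree(work);
}

static bool sketch_cur_work_matches(struct sketch_worker *worker,
				    u64 user_data)
{
	struct sketch_work *work = worker->cur_work;

	/* this dereference is safe only because of the pin taken above */
	return work && work->user_data == user_data;
}

Patch 1 below adds exactly this pinning via get_work/put_work callbacks;
patch 2 then adds the lock that stabilises ->cur_work itself.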
* [PATCH 1/2] io_wq: add get/put_work handlers to io_wq_create()

From: Jens Axboe @ 2019-11-13 19:43 UTC
To: io-uring
Cc: Jens Axboe

For cancellation, we need to ensure that the work item stays valid for
as long as ->cur_work is valid. Right now we can't safely dereference
the work item even under the wqe->lock, because while the ->cur_work
pointer will remain valid, the work could be completing and be freed
in parallel.

Only invoke ->get/put_work() on items we know the caller queued
themselves. Add IO_WQ_WORK_INTERNAL for io-wq to use, which is needed
when we're queueing a flush item, for instance.

Signed-off-by: Jens Axboe <[email protected]>
---
 fs/io-wq.c    | 25 +++++++++++++++++++++++--
 fs/io-wq.h    |  7 ++++++-
 fs/io_uring.c | 17 ++++++++++++++++-
 3 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 33b14b85752b..26d81540c1fc 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -106,6 +106,9 @@ struct io_wq {
 	unsigned long state;
 	unsigned nr_wqes;
 
+	get_work_fn *get_work;
+	put_work_fn *put_work;
+
 	struct task_struct *manager;
 	struct user_struct *user;
 	struct mm_struct *mm;
@@ -392,7 +395,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
 static void io_worker_handle_work(struct io_worker *worker)
 	__releases(wqe->lock)
 {
-	struct io_wq_work *work, *old_work;
+	struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 
@@ -424,6 +427,8 @@ static void io_worker_handle_work(struct io_worker *worker)
 			wqe->flags |= IO_WQE_FLAG_STALLED;
 
 		spin_unlock_irq(&wqe->lock);
+		if (put_work && wq->put_work)
+			wq->put_work(old_work);
 		if (!work)
 			break;
 next:
@@ -444,6 +449,11 @@ static void io_worker_handle_work(struct io_worker *worker)
 		if (worker->mm)
 			work->flags |= IO_WQ_WORK_HAS_MM;
 
+		if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
+			put_work = work;
+			wq->get_work(work);
+		}
+
 		old_work = work;
 		work->func(&work);
 
@@ -455,6 +465,12 @@ static void io_worker_handle_work(struct io_worker *worker)
 		}
 		if (work && work != old_work) {
 			spin_unlock_irq(&wqe->lock);
+
+			if (put_work && wq->put_work) {
+				wq->put_work(put_work);
+				put_work = NULL;
+			}
+
 			/* dependent work not hashed */
 			hash = -1U;
 			goto next;
@@ -950,13 +966,15 @@ void io_wq_flush(struct io_wq *wq)
 
 		init_completion(&data.done);
 		INIT_IO_WORK(&data.work, io_wq_flush_func);
+		data.work.flags |= IO_WQ_WORK_INTERNAL;
 		io_wqe_enqueue(wqe, &data.work);
 		wait_for_completion(&data.done);
 	}
 }
 
 struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
-			   struct user_struct *user)
+			   struct user_struct *user, get_work_fn *get_work,
+			   put_work_fn *put_work)
 {
 	int ret = -ENOMEM, i, node;
 	struct io_wq *wq;
@@ -972,6 +990,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
 		return ERR_PTR(-ENOMEM);
 	}
 
+	wq->get_work = get_work;
+	wq->put_work = put_work;
+
 	/* caller must already hold a reference to this */
 	wq->user = user;
 
diff --git a/fs/io-wq.h b/fs/io-wq.h
index cc50754d028c..4b29f922f80c 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -10,6 +10,7 @@ enum {
 	IO_WQ_WORK_NEEDS_USER	= 8,
 	IO_WQ_WORK_NEEDS_FILES	= 16,
 	IO_WQ_WORK_UNBOUND	= 32,
+	IO_WQ_WORK_INTERNAL	= 64,
 
 	IO_WQ_HASH_SHIFT	= 24,	/* upper 8 bits are used for hash key */
 };
@@ -34,8 +35,12 @@ struct io_wq_work {
 	(work)->files = NULL;			\
 } while (0)					\
 
+typedef void (get_work_fn)(struct io_wq_work *);
+typedef void (put_work_fn)(struct io_wq_work *);
+
 struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
-			   struct user_struct *user);
+			   struct user_struct *user,
+			   get_work_fn *get_work, put_work_fn *put_work);
 void io_wq_destroy(struct io_wq *wq);
 
 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 99822bf89924..e1a3b8b667e0 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -3822,6 +3822,20 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
 	return done ? done : err;
 }
 
+static void io_put_work(struct io_wq_work *work)
+{
+	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
+
+	io_put_req(req);
+}
+
+static void io_get_work(struct io_wq_work *work)
+{
+	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
+
+	refcount_inc(&req->refs);
+}
+
 static int io_sq_offload_start(struct io_ring_ctx *ctx,
 			       struct io_uring_params *p)
 {
@@ -3871,7 +3885,8 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
 
 	/* Do QD, or 4 * CPUS, whatever is smallest */
 	concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
-	ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm, ctx->user);
+	ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm, ctx->user,
+					io_get_work, io_put_work);
 	if (IS_ERR(ctx->io_wq)) {
 		ret = PTR_ERR(ctx->io_wq);
 		ctx->io_wq = NULL;
-- 
2.24.0
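For a caller-side view of the new interface, a hypothetical io-wq user could
wire the handlers up as below. Only the io_wq_create() signature and the
get_work_fn/put_work_fn typedefs come from this patch; my_req, my_get,
my_put, my_start_wq, and the embedded refcount are made-up illustration,
mirroring what the io_uring.c part of this patch does with io_kiocb:

#include <linux/err.h>
#include <linux/kernel.h>
#include <linux/refcount.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include "io-wq.h"

/* hypothetical request type embedding the io-wq work item */
struct my_req {
	refcount_t refs;
	struct io_wq_work work;
};

/* called by io-wq before running a work item it may later have to cancel */
static void my_get(struct io_wq_work *work)
{
	struct my_req *req = container_of(work, struct my_req, work);

	refcount_inc(&req->refs);
}

/* called by io-wq once the item can no longer be a worker's ->cur_work */
static void my_put(struct io_wq_work *work)
{
	struct my_req *req = container_of(work, struct my_req, work);

	if (refcount_dec_and_test(&req->refs))
		kfree(req);
}

static struct io_wq *my_start_wq(struct user_struct *user, unsigned bounded)
{
	/* returns ERR_PTR() on failure, as before this patch */
	return io_wq_create(bounded, current->mm, user, my_get, my_put);
}

Internally queued items (flagged IO_WQ_WORK_INTERNAL, such as io_wq_flush()'s
work) never reach these handlers.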
* [PATCH 2/2] io-wq: ensure task is valid before sending it a signal

From: Jens Axboe @ 2019-11-13 19:43 UTC
To: io-uring
Cc: Jens Axboe, Paul E. McKenney

While we're always under RCU read protection when finding the worker
to signal, that only protects the worker from being freed. The task
could very well be exiting, if we get unlucky enough.

The same is true for ->cur_work, which is currently protected by the
wqe->lock that this worker belongs to. Add a specific worker lock that
protects whether the task is exiting and also the current work item.
Then we can guarantee that the task we're sending a signal to is:

1) Currently processing the exact work we think it is
2) Not currently exiting

It's important not to use the wqe->lock for ->cur_work, as we can run
into lock ordering issues with io_poll_wake() being called under the
signal lock if we're polling a signal fd, and io_poll_wake() then
needing to call io_wq_enqueue(), which grabs wqe->lock. For cancel,
the ordering is exactly the opposite.

Reported-by: Paul E. McKenney <[email protected]>
Signed-off-by: Jens Axboe <[email protected]>
---
 fs/io-wq.c | 66 ++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 44 insertions(+), 22 deletions(-)

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 26d81540c1fc..f035460b9776 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -49,7 +49,10 @@ struct io_worker {
 	struct task_struct *task;
 	wait_queue_head_t wait;
 	struct io_wqe *wqe;
+
 	struct io_wq_work *cur_work;
+	spinlock_t lock;
+	int exiting;
 
 	struct rcu_head rcu;
 	struct mm_struct *mm;
@@ -223,6 +226,10 @@ static void io_worker_exit(struct io_worker *worker)
 	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
 		complete(&wqe->wq->done);
 
+	spin_lock_irq(&worker->lock);
+	worker->exiting = true;
+	spin_unlock_irq(&worker->lock);
+
 	kfree_rcu(worker, rcu);
 }
 
@@ -323,7 +330,6 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
 		hlist_nulls_add_head_rcu(&worker->nulls_node,
 						&wqe->busy_list.head);
 	}
-	worker->cur_work = work;
 
 	/*
 	 * If worker is moving from bound to unbound (or vice versa), then
@@ -402,17 +408,6 @@ static void io_worker_handle_work(struct io_worker *worker)
 	do {
 		unsigned hash = -1U;
 
-		/*
-		 * Signals are either sent to cancel specific work, or to just
-		 * cancel all work items. For the former, ->cur_work must
-		 * match. ->cur_work is NULL at this point, since we haven't
-		 * assigned any work, so it's safe to flush signals for that
-		 * case. For the latter case of cancelling all work, the caller
-		 * wil have set IO_WQ_BIT_CANCEL.
-		 */
-		if (signal_pending(current))
-			flush_signals(current);
-
 		/*
 		 * If we got some work, mark us as busy. If we didn't, but
 		 * the list isn't empty, it means we stalled on hashed work.
@@ -432,6 +427,14 @@ static void io_worker_handle_work(struct io_worker *worker)
 		if (!work)
 			break;
 next:
+		/* flush any pending signals before assigning new work */
+		if (signal_pending(current))
+			flush_signals(current);
+
+		spin_lock_irq(&worker->lock);
+		worker->cur_work = work;
+		spin_unlock_irq(&worker->lock);
+
 		if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
 		    current->files != work->files) {
 			task_lock(current);
@@ -457,8 +460,12 @@ static void io_worker_handle_work(struct io_worker *worker)
 		old_work = work;
 		work->func(&work);
 
-		spin_lock_irq(&wqe->lock);
+		spin_lock_irq(&worker->lock);
 		worker->cur_work = NULL;
+		spin_unlock_irq(&worker->lock);
+
+		spin_lock_irq(&wqe->lock);
+
 		if (hash != -1U) {
 			wqe->hash_map &= ~BIT_ULL(hash);
 			wqe->flags &= ~IO_WQE_FLAG_STALLED;
@@ -577,6 +584,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	worker->nulls_node.pprev = NULL;
 	init_waitqueue_head(&worker->wait);
 	worker->wqe = wqe;
+	spin_lock_init(&worker->lock);
 
 	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
 				"io_wqe_worker-%d/%d", index, wqe->node);
@@ -721,7 +729,10 @@ void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
 
 static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
 {
-	send_sig(SIGINT, worker->task, 1);
+	spin_lock_irq(&worker->lock);
+	if (!worker->exiting)
+		send_sig(SIGINT, worker->task, 1);
+	spin_unlock_irq(&worker->lock);
 	return false;
 }
 
@@ -783,7 +794,6 @@ struct io_cb_cancel_data {
 static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
 {
 	struct io_cb_cancel_data *data = cancel_data;
-	struct io_wqe *wqe = data->wqe;
 	unsigned long flags;
 	bool ret = false;
 
@@ -791,13 +801,14 @@ static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
 	 * Hold the lock to avoid ->cur_work going out of scope, caller
 	 * may deference the passed in work.
 	 */
-	spin_lock_irqsave(&wqe->lock, flags);
+	spin_lock_irqsave(&worker->lock, flags);
 	if (worker->cur_work &&
 	    data->cancel(worker->cur_work, data->caller_data)) {
-		send_sig(SIGINT, worker->task, 1);
+		if (!worker->exiting)
+			send_sig(SIGINT, worker->task, 1);
 		ret = true;
 	}
-	spin_unlock_irqrestore(&wqe->lock, flags);
+	spin_unlock_irqrestore(&worker->lock, flags);
 
 	return ret;
 }
@@ -864,13 +875,21 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
 	struct io_wq_work *work = data;
+	unsigned long flags;
+	bool ret = false;
+
+	if (worker->cur_work != work)
+		return false;
 
+	spin_lock_irqsave(&worker->lock, flags);
 	if (worker->cur_work == work) {
-		send_sig(SIGINT, worker->task, 1);
-		return true;
+		if (!worker->exiting)
+			send_sig(SIGINT, worker->task, 1);
+		ret = true;
 	}
+	spin_unlock_irqrestore(&worker->lock, flags);
 
-	return false;
+	return ret;
 }
 
 static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
@@ -1049,7 +1068,10 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
 
 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 {
-	wake_up_process(worker->task);
+	spin_lock_irq(&worker->lock);
+	if (!worker->exiting)
+		wake_up_process(worker->task);
+	spin_unlock_irq(&worker->lock);
 	return false;
 }
 
-- 
2.24.0
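To make the lock-ordering argument concrete, here is a reduced sketch of the
cancel-side check as this patch shapes it. The struct io_worker fields used
(lock, cur_work, exiting, task) are the ones the patch adds or relies on;
sketch_cancel_cur_work itself is a simplified stand-in for
io_wq_worker_cancel()/io_work_cancel(), and the call-chain comment
paraphrases the commit message rather than quoting code:

/*
 * Lock orders involved:
 *
 *   signalfd poll path:        sighand->siglock -> io_poll_wake()
 *                                -> io_wq_enqueue() -> wqe->lock
 *   cancel path, if it kept    wqe->lock -> send_sig()
 *   using wqe->lock:             -> sighand->siglock
 *
 * Taking the per-worker lock on the cancel side instead of wqe->lock
 * avoids that inversion.
 */
static bool sketch_cancel_cur_work(struct io_worker *worker,
				   struct io_wq_work *match)
{
	unsigned long flags;
	bool ret = false;

	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work == match && !worker->exiting) {
		/* send_sig() acquires the target task's siglock internally */
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

The same exiting check guards wakeups in io_wq_worker_wake(), so neither path
touches a task that has already gone through io_worker_exit().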
* Re: [PATCH 2/2] io-wq: ensure task is valid before sending it a signal

From: Paul E. McKenney @ 2019-11-13 19:55 UTC
To: Jens Axboe
Cc: io-uring

On Wed, Nov 13, 2019 at 12:43:55PM -0700, Jens Axboe wrote:
> While we're always under RCU read protection when finding the worker
> to signal, that only protects the worker from being freed. The task
> could very well be exiting, if we get unlucky enough.
>
> The same is true for ->cur_work, which is currently protected by the
> wqe->lock that this worker belongs to. Add a specific worker lock that
> protects whether the task is exiting and also the current work item.
> Then we can guarantee that the task we're sending a signal to is:
>
> 1) Currently processing the exact work we think it is
> 2) Not currently exiting
>
> It's important not to use the wqe->lock for ->cur_work, as we can run
> into lock ordering issues with io_poll_wake() being called under the
> signal lock if we're polling a signal fd, and io_poll_wake() then
> needing to call io_wq_enqueue(), which grabs wqe->lock. For cancel,
> the ordering is exactly the opposite.
>
> Reported-by: Paul E. McKenney <[email protected]>
> Signed-off-by: Jens Axboe <[email protected]>

Reviewed-by: Paul E. McKenney <[email protected]>