* [PATCH 1/2] io-wq: pass in io_wq to work handler
2019-11-06 19:40 [PATCHSET 0/2] Add unbounded io_wq for io_uring Jens Axboe
@ 2019-11-06 19:40 ` Jens Axboe
2019-11-06 19:40 ` [PATCH 2/2] io_uring: use separate io_wq's for bounded and unbounded request times Jens Axboe
1 sibling, 0 replies; 3+ messages in thread
From: Jens Axboe @ 2019-11-06 19:40 UTC (permalink / raw)
To: io-uring; +Cc: linux-block, Jens Axboe
Pass the io_wq that the work is executing on to the work handler. No
functional changes in this patch; we'll need this in a future patch.
Signed-off-by: Jens Axboe <[email protected]>
---
fs/io-wq.c | 8 ++++----
fs/io-wq.h | 2 +-
fs/io_uring.c | 6 +++---
3 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index ba40a7ee31c3..4ebbdd068ebf 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -318,7 +318,7 @@ static void io_worker_handle_work(struct io_worker *worker)
work->flags |= IO_WQ_WORK_HAS_MM;
old_work = work;
- work->func(&work);
+ work->func(wq, &work);
spin_lock_irq(&wqe->lock);
worker->cur_work = NULL;
@@ -685,7 +685,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
if (found) {
work->flags |= IO_WQ_WORK_CANCEL;
- work->func(&work);
+ work->func(wqe->wq, &work);
return IO_WQ_CANCEL_OK;
}
@@ -757,7 +757,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
if (found) {
work->flags |= IO_WQ_WORK_CANCEL;
- work->func(&work);
+ work->func(wqe->wq, &work);
return IO_WQ_CANCEL_OK;
}
@@ -801,7 +801,7 @@ struct io_wq_flush_data {
struct completion done;
};
-static void io_wq_flush_func(struct io_wq_work **workptr)
+static void io_wq_flush_func(struct io_wq *wq, struct io_wq_work **workptr)
{
struct io_wq_work *work = *workptr;
struct io_wq_flush_data *data;
diff --git a/fs/io-wq.h b/fs/io-wq.h
index 3de192dc73fc..9fe8c97bcbd2 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -21,7 +21,7 @@ enum io_wq_cancel {
struct io_wq_work {
struct list_head list;
- void (*func)(struct io_wq_work **);
+ void (*func)(struct io_wq *, struct io_wq_work **);
unsigned flags;
struct files_struct *files;
};
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 6c13411896b5..ad452be9f3bc 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -369,7 +369,7 @@ struct io_submit_state {
unsigned int ios_left;
};
-static void io_wq_submit_work(struct io_wq_work **workptr);
+static void io_wq_submit_work(struct io_wq *wq, struct io_wq_work **workptr);
static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
long res);
static void __io_free_req(struct io_kiocb *req);
@@ -1930,7 +1930,7 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
io_commit_cqring(ctx);
}
-static void io_poll_complete_work(struct io_wq_work **workptr)
+static void io_poll_complete_work(struct io_wq *wq, struct io_wq_work **workptr)
{
struct io_wq_work *work = *workptr;
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
@@ -2436,7 +2436,7 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
return 0;
}
-static void io_wq_submit_work(struct io_wq_work **workptr)
+static void io_wq_submit_work(struct io_wq *wq, struct io_wq_work **workptr)
{
struct io_wq_work *work = *workptr;
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
--
2.24.0
^ permalink raw reply related [flat|nested] 3+ messages in thread
* [PATCH 2/2] io_uring: use separate io_wq's for bounded and unbounded request times
2019-11-06 19:40 [PATCHSET 0/2] Add unbounded io_wq for io_uring Jens Axboe
2019-11-06 19:40 ` [PATCH 1/2] io-wq: pass in io_wq to work handler Jens Axboe
@ 2019-11-06 19:40 ` Jens Axboe
1 sibling, 0 replies; 3+ messages in thread
From: Jens Axboe @ 2019-11-06 19:40 UTC (permalink / raw)
To: io-uring; +Cc: linux-block, Jens Axboe
We currently have just the one io_wq for the ctx, which is used for
both disk and network IO. Generally it can be a problem to mix the two,
since disk IO is bounded in time, and network IO is not. This can lead
to situations where we could tie up all workers with unbounded work,
leaving no room for bounded work.
Add a separate io_wq for unbounded work times.
Signed-off-by: Jens Axboe <[email protected]>
---
fs/io_uring.c | 139 ++++++++++++++++++++++++++++++++++++--------------
1 file changed, 102 insertions(+), 37 deletions(-)
diff --git a/fs/io_uring.c b/fs/io_uring.c
index ad452be9f3bc..4418139c1fce 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -213,7 +213,7 @@ struct io_ring_ctx {
} ____cacheline_aligned_in_smp;
/* IO offload */
- struct io_wq *io_wq;
+ struct io_wq *io_wq[2];
struct task_struct *sqo_thread; /* if using sq thread polling */
struct mm_struct *sqo_mm;
wait_queue_head_t sqo_wait;
@@ -496,37 +496,55 @@ static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
opcode == IORING_OP_WRITE_FIXED);
}
-static inline bool io_prep_async_work(struct io_kiocb *req)
+static inline struct io_wq *io_prep_async_work(struct io_ring_ctx *ctx,
+ struct io_kiocb *req,
+ bool *do_hashed)
{
- bool do_hashed = false;
+ struct io_wq *wq = ctx->io_wq[0];
+ *do_hashed = false;
if (req->submit.sqe) {
switch (req->submit.sqe->opcode) {
case IORING_OP_WRITEV:
case IORING_OP_WRITE_FIXED:
- do_hashed = true;
+ *do_hashed = true;
+ /* fall-through */
+ case IORING_OP_READV:
+ case IORING_OP_READ_FIXED:
+ case IORING_OP_SENDMSG:
+ case IORING_OP_RECVMSG:
+ case IORING_OP_ACCEPT:
+ case IORING_OP_POLL_ADD:
+ /*
+ * We know REQ_F_ISREG is not set on some of these
+ * opcodes, but this enables us to keep the check in
+ * just one place.
+ */
+ if (!(req->flags & REQ_F_ISREG))
+ wq = ctx->io_wq[1];
break;
}
if (io_sqe_needs_user(req->submit.sqe))
req->work.flags |= IO_WQ_WORK_NEEDS_USER;
}
- return do_hashed;
+ return wq;
}
static inline void io_queue_async_work(struct io_ring_ctx *ctx,
struct io_kiocb *req)
{
- bool do_hashed = io_prep_async_work(req);
+ struct io_wq *wq;
+ bool do_hashed;
+
+ wq = io_prep_async_work(ctx, req, &do_hashed);
trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
req->flags);
- if (!do_hashed) {
- io_wq_enqueue(ctx->io_wq, &req->work);
- } else {
- io_wq_enqueue_hashed(ctx->io_wq, &req->work,
- file_inode(req->file));
- }
+ if (!do_hashed)
+ io_wq_enqueue(wq, &req->work);
+ else
+ io_wq_enqueue_hashed(wq, &req->work, file_inode(req->file));
}
static void io_kill_timeout(struct io_kiocb *req)
@@ -2282,12 +2300,12 @@ static bool io_cancel_cb(struct io_wq_work *work, void *data)
return req->user_data == (unsigned long) data;
}
-static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
+static int io_async_cancel_one(struct io_wq *wq, void *sqe_addr)
{
enum io_wq_cancel cancel_ret;
int ret = 0;
- cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
+ cancel_ret = io_wq_cancel_cb(wq, io_cancel_cb, sqe_addr);
switch (cancel_ret) {
case IO_WQ_CANCEL_OK:
ret = 0;
@@ -2308,7 +2326,7 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
{
struct io_ring_ctx *ctx = req->ctx;
void *sqe_addr;
- int ret;
+ int i, ret;
if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
@@ -2317,7 +2335,11 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
return -EINVAL;
sqe_addr = (void *) (unsigned long) READ_ONCE(sqe->addr);
- ret = io_async_cancel_one(ctx, sqe_addr);
+ for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++) {
+ ret = io_async_cancel_one(ctx->io_wq[i], sqe_addr);
+ if (ret != IO_WQ_CANCEL_NOTFOUND)
+ break;
+ }
if (ret < 0 && (req->flags & REQ_F_LINK))
req->flags |= REQ_F_FAIL_LINK;
@@ -2481,10 +2503,20 @@ static void io_wq_submit_work(struct io_wq *wq, struct io_wq_work **workptr)
/* async context always use a copy of the sqe */
kfree(sqe);
- /* if a dependent link is ready, pass it back */
+ /*
+ * If a dependent link is ready and is on the same io_wq as us, pass it
+ * back for immediate execution. If it's on a different io_wq, enqueue
+ * it separately.
+ */
if (!ret && nxt) {
- io_prep_async_work(nxt);
- *workptr = &nxt->work;
+ struct io_wq *nxt_wq;
+ bool do_hashed;
+
+ nxt_wq = io_prep_async_work(ctx, nxt, &do_hashed);
+ if (nxt_wq == wq)
+ *workptr = &nxt->work;
+ else
+ io_wq_enqueue(nxt_wq, &nxt->work);
}
}
@@ -2598,8 +2630,13 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
spin_unlock_irqrestore(&ctx->completion_lock, flags);
- if (prev)
- ret = io_async_cancel_one(ctx, (void *) prev->user_data);
+ if (prev) {
+ struct io_wq *wq;
+ bool tmp;
+
+ wq = io_prep_async_work(ctx, prev, &tmp);
+ ret = io_async_cancel_one(wq, (void *) prev->user_data);
+ }
io_cqring_add_event(ctx, req->user_data, ret);
io_put_req(req, NULL);
@@ -3274,11 +3311,15 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
static void io_finish_async(struct io_ring_ctx *ctx)
{
+ int i;
+
io_sq_thread_stop(ctx);
- if (ctx->io_wq) {
- io_wq_destroy(ctx->io_wq);
- ctx->io_wq = NULL;
+ for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++) {
+ if (ctx->io_wq[i]) {
+ io_wq_destroy(ctx->io_wq[i]);
+ ctx->io_wq[i] = NULL;
+ }
}
}
@@ -3286,9 +3327,11 @@ static void io_finish_async(struct io_ring_ctx *ctx)
static void io_destruct_skb(struct sk_buff *skb)
{
struct io_ring_ctx *ctx = skb->sk->sk_user_data;
+ int i;
- if (ctx->io_wq)
- io_wq_flush(ctx->io_wq);
+ for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+ if (ctx->io_wq[i])
+ io_wq_flush(ctx->io_wq[i]);
unix_destruct_scm(skb);
}
@@ -3731,12 +3774,27 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
goto err;
}
- /* Do QD, or 4 * CPUS, whatever is smallest */
+ /*
+ * This is for the bounded time requests, generally disk IO.
+ * Do QD, or 4 * CPUS, whatever is smallest
+ */
concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
- ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm);
- if (IS_ERR(ctx->io_wq)) {
- ret = PTR_ERR(ctx->io_wq);
- ctx->io_wq = NULL;
+ ctx->io_wq[0] = io_wq_create(concurrency, ctx->sqo_mm);
+ if (IS_ERR(ctx->io_wq[0])) {
+ ret = PTR_ERR(ctx->io_wq[0]);
+ ctx->io_wq[0] = NULL;
+ goto err;
+ }
+
+ /*
+ * This pool is for unbounded request times, things that could
+ * take an indeterminate amount of time to complete. Use a separate
+ * pool for those, to provide fairness with the bounded queue.
+ */
+ ctx->io_wq[1] = io_wq_create(ctx->cq_entries, ctx->sqo_mm);
+ if (IS_ERR(ctx->io_wq[1])) {
+ ret = PTR_ERR(ctx->io_wq[1]);
+ ctx->io_wq[1] = NULL;
goto err;
}
@@ -4114,6 +4172,8 @@ static int io_uring_fasync(int fd, struct file *file, int on)
static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
{
+ int i;
+
mutex_lock(&ctx->uring_lock);
percpu_ref_kill(&ctx->refs);
mutex_unlock(&ctx->uring_lock);
@@ -4121,8 +4181,9 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
io_kill_timeouts(ctx);
io_poll_remove_all(ctx);
- if (ctx->io_wq)
- io_wq_cancel_all(ctx->io_wq);
+ for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+ if (ctx->io_wq[i])
+ io_wq_cancel_all(ctx->io_wq[i]);
io_iopoll_reap_events(ctx);
wait_for_completion(&ctx->ctx_done);
@@ -4138,7 +4199,7 @@ static int io_uring_release(struct inode *inode, struct file *file)
return 0;
}
-static void io_uring_cancel_files(struct io_ring_ctx *ctx,
+static void io_uring_cancel_files(struct io_ring_ctx *ctx, struct io_wq *wq,
struct files_struct *files)
{
struct io_kiocb *req;
@@ -4150,7 +4211,7 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
spin_lock_irq(&ctx->inflight_lock);
list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
if (req->work.files == files) {
- ret = io_wq_cancel_work(ctx->io_wq, &req->work);
+ ret = io_wq_cancel_work(wq, &req->work);
break;
}
}
@@ -4178,10 +4239,14 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
static int io_uring_flush(struct file *file, void *data)
{
struct io_ring_ctx *ctx = file->private_data;
+ int i;
+
+ for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+ io_uring_cancel_files(ctx, ctx->io_wq[i], data);
- io_uring_cancel_files(ctx, data);
if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
- io_wq_cancel_all(ctx->io_wq);
+ for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+ io_wq_cancel_all(ctx->io_wq[i]);
return 0;
}
--
2.24.0
^ permalink raw reply related [flat|nested] 3+ messages in thread