public inbox for [email protected]
 help / color / mirror / Atom feed
From: Jens Axboe <[email protected]>
To: [email protected]
Cc: [email protected], Jens Axboe <[email protected]>
Subject: [PATCH 2/2] io_uring: use separate io_wq's for bounded and unbounded request times
Date: Wed,  6 Nov 2019 12:40:40 -0700	[thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>

We currently have just the one io_wq for the ctx, which is used for
both disk and network IO. Generally it can be a problem to mix the two,
since disk IO is bounded in time, and network IO is not. This can lead
to situations where we could tie up all workers with unbounded work,
leaving no room for bounded work.

Add a separate io_wq for unbounded work times.

Signed-off-by: Jens Axboe <[email protected]>
---
 fs/io_uring.c | 139 ++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 102 insertions(+), 37 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index ad452be9f3bc..4418139c1fce 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -213,7 +213,7 @@ struct io_ring_ctx {
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
-	struct io_wq		*io_wq;
+	struct io_wq		*io_wq[2];
 	struct task_struct	*sqo_thread;	/* if using sq thread polling */
 	struct mm_struct	*sqo_mm;
 	wait_queue_head_t	sqo_wait;
@@ -496,37 +496,55 @@ static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
 		 opcode == IORING_OP_WRITE_FIXED);
 }
 
-static inline bool io_prep_async_work(struct io_kiocb *req)
+static inline struct io_wq *io_prep_async_work(struct io_ring_ctx *ctx,
+					       struct io_kiocb *req,
+					       bool *do_hashed)
 {
-	bool do_hashed = false;
+	struct io_wq *wq = ctx->io_wq[0];
 
+	*do_hashed = false;
 	if (req->submit.sqe) {
 		switch (req->submit.sqe->opcode) {
 		case IORING_OP_WRITEV:
 		case IORING_OP_WRITE_FIXED:
-			do_hashed = true;
+			*do_hashed = true;
+			/* fall-through */
+		case IORING_OP_READV:
+		case IORING_OP_READ_FIXED:
+		case IORING_OP_SENDMSG:
+		case IORING_OP_RECVMSG:
+		case IORING_OP_ACCEPT:
+		case IORING_OP_POLL_ADD:
+			/*
+			 * We know REQ_F_ISREG is not set on some of these
+			 * opcodes, but this enables us to keep the check in
+			 * just one place.
+			 */
+			if (!(req->flags & REQ_F_ISREG))
+				wq = ctx->io_wq[1];
 			break;
 		}
 		if (io_sqe_needs_user(req->submit.sqe))
 			req->work.flags |= IO_WQ_WORK_NEEDS_USER;
 	}
 
-	return do_hashed;
+	return wq;
 }
 
 static inline void io_queue_async_work(struct io_ring_ctx *ctx,
 				       struct io_kiocb *req)
 {
-	bool do_hashed = io_prep_async_work(req);
+	struct io_wq *wq;
+	bool do_hashed;
+
+	wq = io_prep_async_work(ctx, req, &do_hashed);
 
 	trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
 					req->flags);
-	if (!do_hashed) {
-		io_wq_enqueue(ctx->io_wq, &req->work);
-	} else {
-		io_wq_enqueue_hashed(ctx->io_wq, &req->work,
-					file_inode(req->file));
-	}
+	if (!do_hashed)
+		io_wq_enqueue(wq, &req->work);
+	else
+		io_wq_enqueue_hashed(wq, &req->work, file_inode(req->file));
 }
 
 static void io_kill_timeout(struct io_kiocb *req)
@@ -2282,12 +2300,12 @@ static bool io_cancel_cb(struct io_wq_work *work, void *data)
 	return req->user_data == (unsigned long) data;
 }
 
-static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
+static int io_async_cancel_one(struct io_wq *wq, void *sqe_addr)
 {
 	enum io_wq_cancel cancel_ret;
 	int ret = 0;
 
-	cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
+	cancel_ret = io_wq_cancel_cb(wq, io_cancel_cb, sqe_addr);
 	switch (cancel_ret) {
 	case IO_WQ_CANCEL_OK:
 		ret = 0;
@@ -2308,7 +2326,7 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	void *sqe_addr;
-	int ret;
+	int i, ret;
 
 	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
 		return -EINVAL;
@@ -2317,7 +2335,11 @@ static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		return -EINVAL;
 
 	sqe_addr = (void *) (unsigned long) READ_ONCE(sqe->addr);
-	ret = io_async_cancel_one(ctx, sqe_addr);
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++) {
+		ret = io_async_cancel_one(ctx->io_wq[i], sqe_addr);
+		if (ret != IO_WQ_CANCEL_NOTFOUND)
+			break;
+	}
 
 	if (ret < 0 && (req->flags & REQ_F_LINK))
 		req->flags |= REQ_F_FAIL_LINK;
@@ -2481,10 +2503,20 @@ static void io_wq_submit_work(struct io_wq *wq, struct io_wq_work **workptr)
 	/* async context always use a copy of the sqe */
 	kfree(sqe);
 
-	/* if a dependent link is ready, pass it back */
+	/*
+	 * If a dependent link is ready and is on the same io_wq as us, pass it
+	 * back for immediate execution. If it's on a different io_wq, enqueue
+	 * it separately.
+	 */
 	if (!ret && nxt) {
-		io_prep_async_work(nxt);
-		*workptr = &nxt->work;
+		struct io_wq *nxt_wq;
+		bool do_hashed;
+
+		nxt_wq = io_prep_async_work(ctx, nxt, &do_hashed);
+		if (nxt_wq == wq)
+			*workptr = &nxt->work;
+		else
+			io_wq_enqueue(nxt_wq, &nxt->work);
 	}
 }
 
@@ -2598,8 +2630,13 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
 
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
-	if (prev)
-		ret = io_async_cancel_one(ctx, (void *) prev->user_data);
+	if (prev) {
+		struct io_wq *wq;
+		bool tmp;
+
+		wq = io_prep_async_work(ctx, prev, &tmp);
+		ret = io_async_cancel_one(wq, (void *) prev->user_data);
+	}
 
 	io_cqring_add_event(ctx, req->user_data, ret);
 	io_put_req(req, NULL);
@@ -3274,11 +3311,15 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
 
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+	int i;
+
 	io_sq_thread_stop(ctx);
 
-	if (ctx->io_wq) {
-		io_wq_destroy(ctx->io_wq);
-		ctx->io_wq = NULL;
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++) {
+		if (ctx->io_wq[i]) {
+			io_wq_destroy(ctx->io_wq[i]);
+			ctx->io_wq[i] = NULL;
+		}
 	}
 }
 
@@ -3286,9 +3327,11 @@ static void io_finish_async(struct io_ring_ctx *ctx)
 static void io_destruct_skb(struct sk_buff *skb)
 {
 	struct io_ring_ctx *ctx = skb->sk->sk_user_data;
+	int i;
 
-	if (ctx->io_wq)
-		io_wq_flush(ctx->io_wq);
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+		if (ctx->io_wq[i])
+			io_wq_flush(ctx->io_wq[i]);
 
 	unix_destruct_scm(skb);
 }
@@ -3731,12 +3774,27 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
 		goto err;
 	}
 
-	/* Do QD, or 4 * CPUS, whatever is smallest */
+	/*
+	 * This is for the bounded time requests, generally disk IO.
+	 * Do QD, or 4 * CPUS, whatever is smallest
+	 */
 	concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
-	ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm);
-	if (IS_ERR(ctx->io_wq)) {
-		ret = PTR_ERR(ctx->io_wq);
-		ctx->io_wq = NULL;
+	ctx->io_wq[0] = io_wq_create(concurrency, ctx->sqo_mm);
+	if (IS_ERR(ctx->io_wq[0])) {
+		ret = PTR_ERR(ctx->io_wq[0]);
+		ctx->io_wq[0] = NULL;
+		goto err;
+	}
+
+	/*
+	 * This pool is for unbounded request times, things that could
+	 * take an indeterminite amount of time to complete. Use a separate
+	 * pool for those, to provide fairness with the bounded queue.
+	 */
+	ctx->io_wq[1] = io_wq_create(ctx->cq_entries, ctx->sqo_mm);
+	if (IS_ERR(ctx->io_wq[1])) {
+		ret = PTR_ERR(ctx->io_wq[1]);
+		ctx->io_wq[1] = NULL;
 		goto err;
 	}
 
@@ -4114,6 +4172,8 @@ static int io_uring_fasync(int fd, struct file *file, int on)
 
 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 {
+	int i;
+
 	mutex_lock(&ctx->uring_lock);
 	percpu_ref_kill(&ctx->refs);
 	mutex_unlock(&ctx->uring_lock);
@@ -4121,8 +4181,9 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	io_kill_timeouts(ctx);
 	io_poll_remove_all(ctx);
 
-	if (ctx->io_wq)
-		io_wq_cancel_all(ctx->io_wq);
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+		if (ctx->io_wq[i])
+			io_wq_cancel_all(ctx->io_wq[i]);
 
 	io_iopoll_reap_events(ctx);
 	wait_for_completion(&ctx->ctx_done);
@@ -4138,7 +4199,7 @@ static int io_uring_release(struct inode *inode, struct file *file)
 	return 0;
 }
 
-static void io_uring_cancel_files(struct io_ring_ctx *ctx,
+static void io_uring_cancel_files(struct io_ring_ctx *ctx, struct io_wq *wq,
 				  struct files_struct *files)
 {
 	struct io_kiocb *req;
@@ -4150,7 +4211,7 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
 		spin_lock_irq(&ctx->inflight_lock);
 		list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
 			if (req->work.files == files) {
-				ret = io_wq_cancel_work(ctx->io_wq, &req->work);
+				ret = io_wq_cancel_work(wq, &req->work);
 				break;
 			}
 		}
@@ -4178,10 +4239,14 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
 static int io_uring_flush(struct file *file, void *data)
 {
 	struct io_ring_ctx *ctx = file->private_data;
+	int i;
+
+	for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+		io_uring_cancel_files(ctx, ctx->io_wq[i], data);
 
-	io_uring_cancel_files(ctx, data);
 	if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
-		io_wq_cancel_all(ctx->io_wq);
+		for (i = 0; i < ARRAY_SIZE(ctx->io_wq); i++)
+			io_wq_cancel_all(ctx->io_wq[i]);
 	return 0;
 }
 
-- 
2.24.0


      parent reply	other threads:[~2019-11-06 19:40 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-11-06 19:40 [PATCHSET 0/2] Add unbounded io_wq for io_uring Jens Axboe
2019-11-06 19:40 ` [PATCH 1/2] io-wq: pass in io_wq to work handler Jens Axboe
2019-11-06 19:40 ` Jens Axboe [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox