From: Pavel Begunkov <[email protected]>
To: Jens Axboe <[email protected]>,
[email protected],
"[email protected]" <[email protected]>
Subject: Re: [RFC] io_uring CQ ring backpressure
Date: Wed, 6 Nov 2019 22:12:00 +0300 [thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
[-- Attachment #1.1: Type: text/plain, Size: 8662 bytes --]
On 06/11/2019 19:21, Jens Axboe wrote:
> Currently we drop completion events, if the CQ ring is full. That's fine
> for requests with bounded completion times, but it may make it harder to
> use io_uring with networked IO where request completion times are
> generally unbounded. Or with POLL, for example, which is also unbounded.
>
> This patch adds IORING_SETUP_CQ_NODROP, which changes the behavior a bit
> for CQ ring overflows. First of all, it doesn't overflow the ring, it
> simply stores backlog of completions that we weren't able to put into
> the CQ ring. To prevent the backlog from growing indefinitely, if the
> backlog is non-empty, we apply back pressure on IO submissions. Any
> attempt to submit new IO with a non-empty backlog will get an -EBUSY
> return from the kernel.
>
> I think that makes for a pretty sane API in terms of how the application
> can handle it. With CQ_NODROP enabled, we'll never drop a completion
> event (well unless we're totally out of memory...), but we'll also not
> allow submissions with a completion backlog.
>
> ---
>
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index b647cdf0312c..12e9fe2479f4 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -207,6 +207,7 @@ struct io_ring_ctx {
>
> struct list_head defer_list;
> struct list_head timeout_list;
> + struct list_head cq_overflow_list;
>
> wait_queue_head_t inflight_wait;
> } ____cacheline_aligned_in_smp;
> @@ -414,6 +415,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
>
> ctx->flags = p->flags;
> init_waitqueue_head(&ctx->cq_wait);
> + INIT_LIST_HEAD(&ctx->cq_overflow_list);
> init_completion(&ctx->ctx_done);
> init_completion(&ctx->sqo_thread_started);
> mutex_init(&ctx->uring_lock);
> @@ -588,6 +590,77 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
> return &rings->cqes[tail & ctx->cq_mask];
> }
>
> +static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
> +{
> + if (waitqueue_active(&ctx->wait))
> + wake_up(&ctx->wait);
> + if (waitqueue_active(&ctx->sqo_wait))
> + wake_up(&ctx->sqo_wait);
> + if (ctx->cq_ev_fd)
> + eventfd_signal(ctx->cq_ev_fd, 1);
> +}
> +
> +struct cqe_drop {
> + struct list_head list;
> + u64 user_data;
> + s32 res;
> +};
How about using io_kiocb instead of a new structure?
It already has a valid req->user_data and an occasionally used
req->result. But this would probably take more work to do.
> +
> +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
> +{
> + struct io_rings *rings = ctx->rings;
> + struct io_uring_cqe *cqe;
> + struct cqe_drop *drop;
> + unsigned long flags;
> +
> + if (list_empty_careful(&ctx->cq_overflow_list))
> + return;
> + if (ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
> + rings->cq_ring_entries)
> + return;
> +
> + spin_lock_irqsave(&ctx->completion_lock, flags);
> +
> + while (!list_empty(&ctx->cq_overflow_list)) {
> + drop = list_first_entry(&ctx->cq_overflow_list, struct cqe_drop,
> + list);
> + cqe = io_get_cqring(ctx);
> + if (!cqe)
> + break;
> + list_del(&drop->list);
> + WRITE_ONCE(cqe->user_data, drop->user_data);
> + WRITE_ONCE(cqe->res, drop->res);
> + WRITE_ONCE(cqe->flags, 0);
> + kfree(drop);
> + }
> +
> + io_commit_cqring(ctx);
> + spin_unlock_irqrestore(&ctx->completion_lock, flags);
> + io_cqring_ev_posted(ctx);
> +}
> +
> +static void io_cqring_overflow(struct io_ring_ctx *ctx, u64 ki_user_data,
> + long res)
> + __must_hold(&ctx->completion_lock)
> +{
> + struct cqe_drop *drop;
> +
> + if (!(ctx->flags & IORING_SETUP_CQ_NODROP)) {
> +log_overflow:
> + WRITE_ONCE(ctx->rings->cq_overflow,
> + atomic_inc_return(&ctx->cached_cq_overflow));
> + return;
> + }
> +
> + drop = kmalloc(sizeof(*drop), GFP_ATOMIC);
> + if (!drop)
> + goto log_overflow;
> +
> + drop->user_data = ki_user_data;
> + drop->res = res;
> + list_add_tail(&drop->list, &ctx->cq_overflow_list);
> +}
> +
> static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
> long res)
> {
> @@ -601,26 +674,15 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
> * the ring.
> */
> cqe = io_get_cqring(ctx);
> - if (cqe) {
> + if (likely(cqe)) {
> WRITE_ONCE(cqe->user_data, ki_user_data);
> WRITE_ONCE(cqe->res, res);
> WRITE_ONCE(cqe->flags, 0);
> } else {
> - WRITE_ONCE(ctx->rings->cq_overflow,
> - atomic_inc_return(&ctx->cached_cq_overflow));
> + io_cqring_overflow(ctx, ki_user_data, res);
> }
> }
>
> -static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
> -{
> - if (waitqueue_active(&ctx->wait))
> - wake_up(&ctx->wait);
> - if (waitqueue_active(&ctx->sqo_wait))
> - wake_up(&ctx->sqo_wait);
> - if (ctx->cq_ev_fd)
> - eventfd_signal(ctx->cq_ev_fd, 1);
> -}
> -
> static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
> long res)
> {
> @@ -859,8 +921,13 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
> }
> }
>
> -static unsigned io_cqring_events(struct io_rings *rings)
> +static unsigned io_cqring_events(struct io_ring_ctx *ctx)
> {
> + struct io_rings *rings = ctx->rings;
> +
> + if (ctx->flags & IORING_SETUP_CQ_NODROP)
> + io_cqring_overflow_flush(ctx);
> +
> /* See comment at the top of this file */
> smp_rmb();
> return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
> @@ -1016,7 +1083,7 @@ static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
> * If we do, we can potentially be spinning for commands that
> * already triggered a CQE (eg in error).
> */
> - if (io_cqring_events(ctx->rings))
> + if (io_cqring_events(ctx))
> break;
>
> /*
> @@ -2873,6 +2940,10 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
> int i, submitted = 0;
> bool mm_fault = false;
>
> + if ((ctx->flags & IORING_SETUP_CQ_NODROP) &&
> + !list_empty(&ctx->cq_overflow_list))
> + return -EBUSY;
> +
> if (nr > IO_PLUG_THRESHOLD) {
> io_submit_state_start(&state, ctx, nr);
> statep = &state;
> @@ -2952,6 +3023,7 @@ static int io_sq_thread(void *data)
> timeout = inflight = 0;
> while (!kthread_should_park()) {
> unsigned int to_submit;
> + int ret;
>
> if (inflight) {
> unsigned nr_events = 0;
> @@ -3036,8 +3108,10 @@ static int io_sq_thread(void *data)
> }
>
> to_submit = min(to_submit, ctx->sq_entries);
> - inflight += io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm,
> - true);
> + ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
> + if (ret < 0)
> + continue;
> + inflight += ret;
>
After a rebase, this could be simplified to
if (ret >= 0)
inflight += ret;
> /* Commit SQ ring head once we've consumed all SQEs */
> io_commit_sqring(ctx);
> @@ -3070,7 +3144,7 @@ static inline bool io_should_wake(struct io_wait_queue *iowq)
> * started waiting. For timeouts, we always want to return to userspace,
> * regardless of event count.
> */
> - return io_cqring_events(ctx->rings) >= iowq->to_wait ||
> + return io_cqring_events(ctx) >= iowq->to_wait ||
> atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
> }
>
> @@ -3105,7 +3179,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
> struct io_rings *rings = ctx->rings;
> int ret = 0;
>
> - if (io_cqring_events(rings) >= min_events)
> + if (io_cqring_events(ctx) >= min_events)
> return 0;
>
> if (sig) {
> @@ -4406,7 +4480,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
> }
>
> if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
> - IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
> + IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
> + IORING_SETUP_CQ_NODROP))
> return -EINVAL;
>
> ret = io_uring_create(entries, &p);
> diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
> index f1a118b01d18..3d8517eb376e 100644
> --- a/include/uapi/linux/io_uring.h
> +++ b/include/uapi/linux/io_uring.h
> @@ -56,6 +56,7 @@ struct io_uring_sqe {
> #define IORING_SETUP_SQPOLL (1U << 1) /* SQ poll thread */
> #define IORING_SETUP_SQ_AFF (1U << 2) /* sq_thread_cpu is valid */
> #define IORING_SETUP_CQSIZE (1U << 3) /* app defines CQ size */
> +#define IORING_SETUP_CQ_NODROP (1U << 4) /* no CQ drops */
>
> #define IORING_OP_NOP 0
> #define IORING_OP_READV 1
>
--
Pavel Begunkov
[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]
next prev parent reply other threads:[~2019-11-06 19:12 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2019-11-06 16:21 [RFC] io_uring CQ ring backpressure Jens Axboe
2019-11-06 19:12 ` Pavel Begunkov [this message]
2019-11-06 19:43 ` Jens Axboe
2019-11-06 19:51 ` Jann Horn
2019-11-06 20:08 ` Jens Axboe
2019-11-06 21:31 ` Jens Axboe
2019-11-06 21:54 ` Pavel Begunkov
2019-11-06 21:56 ` Jens Axboe
2019-11-06 22:42 ` Jens Axboe
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox