Hi Joanne,

sorry for my late reply, I was occupied all week.

On 1/13/25 23:44, Joanne Koong wrote:
> On Mon, Jan 6, 2025 at 4:25 PM Bernd Schubert wrote:
>>
>> This adds support for fuse request completion through ring SQEs
>> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing
>> the ring entry it becomes available for new fuse requests.
>> Handling of requests through the ring (SQE/CQE handling)
>> is complete now.
>>
>> Fuse request data are copied through the mmapped ring buffer,
>> there is no support for any zero copy yet.
>>
>> Signed-off-by: Bernd Schubert
>> ---
>>  fs/fuse/dev_uring.c   | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++
>>  fs/fuse/dev_uring_i.h |  12 ++
>>  fs/fuse/fuse_i.h      |   4 +
>>  3 files changed, 466 insertions(+)
>>
>> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
>> index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644
>> --- a/fs/fuse/dev_uring.c
>> +++ b/fs/fuse/dev_uring.c
>> @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void)
>>  	return enable_uring;
>>  }
>>
>> +static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err,
>> +			       int error)
>> +{
>> +	struct fuse_req *req = ring_ent->fuse_req;
>> +
>> +	if (set_err)
>> +		req->out.h.error = error;
>
> I think we could get away with not having the "bool set_err" as an
> argument if we do "if (error)" directly. AFAICT, we can use the value
> of error directly since it always returns zero on success and any
> non-zero value is considered an error.

I had done this because of fuse_uring_commit()

	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
	if (err) {
		/* req->out.h.error already set */
		goto out;
	}

In fuse_uring_out_header_has_err() the header might already have the
error code, but there are other errors as well. Well, setting an
existing error code again just saves us a few lines and conditions, so
you are probably right and I removed that argument now.
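I.e. the helper then becomes something like this (untested sketch
against this series):

	static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, int error)
	{
		struct fuse_req *req = ring_ent->fuse_req;

		/*
		 * error is zero on success; for the header-error case an
		 * already set out.h.error is just rewritten with itself
		 */
		if (error)
			req->out.h.error = error;

		clear_bit(FR_SENT, &req->flags);
		fuse_request_end(req);
		ring_ent->fuse_req = NULL;
	}

and fuse_uring_commit() then passes req->out.h.error for the case where
fuse_uring_out_header_has_err() already filled in the header.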
>
>> +
>> +	clear_bit(FR_SENT, &req->flags);
>> +	fuse_request_end(ring_ent->fuse_req);
>> +	ring_ent->fuse_req = NULL;
>> +}
>> +
>>  void fuse_uring_destruct(struct fuse_conn *fc)
>>  {
>>  	struct fuse_ring *ring = fc->ring;
>> @@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc)
>>  			continue;
>>
>>  		WARN_ON(!list_empty(&queue->ent_avail_queue));
>> +		WARN_ON(!list_empty(&queue->ent_w_req_queue));
>>  		WARN_ON(!list_empty(&queue->ent_commit_queue));
>> +		WARN_ON(!list_empty(&queue->ent_in_userspace));
>>
>> +		kfree(queue->fpq.processing);
>>  		kfree(queue);
>>  		ring->queues[qid] = NULL;
>>  	}
>> @@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
>>  {
>>  	struct fuse_conn *fc = ring->fc;
>>  	struct fuse_ring_queue *queue;
>> +	struct list_head *pq;
>>
>>  	queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT);
>>  	if (!queue)
>>  		return NULL;
>> +	pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL);
>> +	if (!pq) {
>> +		kfree(queue);
>> +		return NULL;
>> +	}
>> +
>>  	queue->qid = qid;
>>  	queue->ring = ring;
>>  	spin_lock_init(&queue->lock);
>>
>>  	INIT_LIST_HEAD(&queue->ent_avail_queue);
>>  	INIT_LIST_HEAD(&queue->ent_commit_queue);
>> +	INIT_LIST_HEAD(&queue->ent_w_req_queue);
>> +	INIT_LIST_HEAD(&queue->ent_in_userspace);
>> +	INIT_LIST_HEAD(&queue->fuse_req_queue);
>> +
>> +	queue->fpq.processing = pq;
>> +	fuse_pqueue_init(&queue->fpq);
>>
>>  	spin_lock(&fc->lock);
>>  	if (ring->queues[qid]) {
>>  		spin_unlock(&fc->lock);
>> +		kfree(queue->fpq.processing);
>>  		kfree(queue);
>>  		return ring->queues[qid];
>>  	}
>> @@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
>>  	return queue;
>>  }
>>
>> +/*
>> + * Checks for errors and stores them into the request
>> + */
>> +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh,
>> +					 struct fuse_req *req,
>> +					 struct fuse_conn *fc)
>> +{
>> +	int err;
>> +
>> +	err = -EINVAL;
>> +	if (oh->unique == 0) {
>> +		/* Not supported through io-uring yet */
>> +		pr_warn_once("notify through fuse-io-uring not supported\n");
>> +		goto seterr;
>> +	}
>> +
>> +	err = -EINVAL;
>> +	if (oh->error <= -ERESTARTSYS || oh->error > 0)
>> +		goto seterr;
>> +
>> +	if (oh->error) {
>> +		err = oh->error;
>> +		goto err;
>> +	}
>> +
>> +	err = -ENOENT;
>> +	if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) {
>> +		pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n",
>> +				    req->in.h.unique,
>> +				    oh->unique & ~FUSE_INT_REQ_BIT);
>> +		goto seterr;
>> +	}
>> +
>> +	/*
>> +	 * Is it an interrupt reply ID?
>> +	 * XXX: Not supported through fuse-io-uring yet, it should not even
>> +	 * find the request - should not happen.
>> +	 */
>> +	WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);
>> +
>> +	return 0;
>> +
>> +seterr:
>> +	oh->error = err;
>> +err:
>> +	return err;
>> +}
>> +
>> +static int fuse_uring_copy_from_ring(struct fuse_ring *ring,
>> +				     struct fuse_req *req,
>> +				     struct fuse_ring_ent *ent)
>> +{
>> +	struct fuse_copy_state cs;
>> +	struct fuse_args *args = req->args;
>> +	struct iov_iter iter;
>> +	int err, res;
>> +	struct fuse_uring_ent_in_out ring_in_out;
>> +
>> +	res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out,
>> +			     sizeof(ring_in_out));
>> +	if (res)
>> +		return -EFAULT;
>> +
>> +	err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz,
>> +			  &iter);
>> +	if (err)
>> +		return err;
>> +
>> +	fuse_copy_init(&cs, 0, &iter);
>> +	cs.is_uring = 1;
>> +	cs.req = req;
>> +
>> +	return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz);
>> +}
>> +
>> +/*
>> + * Copy data from the req to the ring buffer
>> + */
>> +static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
>> +				   struct fuse_ring_ent *ent)
>> +{
>> +	struct fuse_copy_state cs;
>> +	struct fuse_args *args = req->args;
>> +	struct fuse_in_arg *in_args = args->in_args;
>> +	int num_args = args->in_numargs;
>> +	int err, res;
>> +	struct iov_iter iter;
>> +	struct fuse_uring_ent_in_out ent_in_out = {
>> +		.flags = 0,
>> +		.commit_id = ent->commit_id,
>> +	};
>> +
>> +	if (WARN_ON(ent_in_out.commit_id == 0))
>> +		return -EINVAL;
>> +
>> +	err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter);
>> +	if (err) {
>> +		pr_info_ratelimited("fuse: Import of user buffer failed\n");
>> +		return err;
>> +	}
>> +
>> +	fuse_copy_init(&cs, 1, &iter);
>> +	cs.is_uring = 1;
>> +	cs.req = req;
>> +
>> +	if (num_args > 0) {
>> +		/*
>> +		 * Expectation is that the first argument is the per-op header.
>> +		 * Some opcodes have that as zero.
>> +		 */
>> +		if (args->in_args[0].size > 0) {
>> +			res = copy_to_user(&ent->headers->op_in, in_args->value,
>> +					   in_args->size);
>> +			err = res > 0 ? -EFAULT : res;
>> +			if (err) {
>> +				pr_info_ratelimited(
>> +					"Copying the header failed.\n");
>> +				return err;
>> +			}
>> +		}
>> +		in_args++;
>> +		num_args--;
>> +	}
>> +
>> +	/* copy the payload */
>> +	err = fuse_copy_args(&cs, num_args, args->in_pages,
>> +			     (struct fuse_arg *)in_args, 0);
>> +	if (err) {
>> +		pr_info_ratelimited("%s fuse_copy_args failed\n", __func__);
>> +		return err;
>> +	}
>> +
>> +	ent_in_out.payload_sz = cs.ring.copied_sz;
>> +	res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out,
>> +			   sizeof(ent_in_out));
>> +	err = res > 0 ? -EFAULT : res;
>> +	if (err)
>> +		return err;
>> +
>> +	return 0;
>> +}
>> +
>> +static int
>> +fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent)
>> +{
>> +	struct fuse_ring_queue *queue = ring_ent->queue;
>> +	struct fuse_ring *ring = queue->ring;
>> +	struct fuse_req *req = ring_ent->fuse_req;
>> +	int err, res;
>> +
>> +	err = -EIO;
>> +	if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) {
>> +		pr_err("qid=%d ring-req=%p invalid state %d on send\n",
>> +		       queue->qid, ring_ent, ring_ent->state);
>> +		err = -EIO;
>> +		goto err;
>> +	}
>> +
>> +	/* copy the request */
>> +	err = fuse_uring_copy_to_ring(ring, req, ring_ent);
>> +	if (unlikely(err)) {
>> +		pr_info_ratelimited("Copy to ring failed: %d\n", err);
>> +		goto err;
>> +	}
>> +
>> +	/* copy fuse_in_header */
>> +	res = copy_to_user(&ring_ent->headers->in_out, &req->in.h,
>> +			   sizeof(req->in.h));
>> +	err = res > 0 ? -EFAULT : res;
>> +	if (err)
>> +		goto err;
>> +
>> +	set_bit(FR_SENT, &req->flags);
>> +	return 0;
>> +
>> +err:
>> +	fuse_uring_req_end(ring_ent, true, err);
>> +	return err;
>> +}
>> +
>> +/*
>> + * Write data to the ring buffer and send the request to userspace,
>> + * userspace will read it
>> + * This is comparable with classical read(/dev/fuse)
>> + */
>> +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent,
>> +					unsigned int issue_flags)
>> +{
>> +	int err = 0;
>> +	struct fuse_ring_queue *queue = ring_ent->queue;
>> +
>> +	err = fuse_uring_prepare_send(ring_ent);
>> +	if (err)
>> +		goto err;
>> +
>> +	spin_lock(&queue->lock);
>> +	ring_ent->state = FRRS_USERSPACE;
>> +	list_move(&ring_ent->list, &queue->ent_in_userspace);
>> +	spin_unlock(&queue->lock);
>> +
>> +	io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags);
>> +	ring_ent->cmd = NULL;
>> +	return 0;
>> +
>> +err:
>> +	return err;
>> +}
>> +
>>  /*
>>   * Make a ring entry available for fuse_req assignment
>>   */
>> @@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent,
>>  	ring_ent->state = FRRS_AVAILABLE;
>>  }
>>
>> +/* Used to find the request on SQE commit */
>> +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent,
>> +				 struct fuse_req *req)
>> +{
>> +	struct fuse_ring_queue *queue = ring_ent->queue;
>> +	struct fuse_pqueue *fpq = &queue->fpq;
>> +	unsigned int hash;
>> +
>> +	/* commit_id is the unique id of the request */
>> +	ring_ent->commit_id = req->in.h.unique;
>> +
>> +	req->ring_entry = ring_ent;
>> +	hash = fuse_req_hash(ring_ent->commit_id);
>> +	list_move_tail(&req->list, &fpq->processing[hash]);
>> +}
>> +
>> +/*
>> + * Assign a fuse queue entry to the given entry
>> + */
>> +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent,
>> +					   struct fuse_req *req)
>> +{
>> +	struct fuse_ring_queue *queue = ring_ent->queue;
>> +
>> +	lockdep_assert_held(&queue->lock);
>> +
>> +	if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE &&
>> +			 ring_ent->state != FRRS_COMMIT)) {
>> +		pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid,
>> +			ring_ent->state);
>> +	}
>> +	list_del_init(&req->list);
>> +	clear_bit(FR_PENDING, &req->flags);
>> +	ring_ent->fuse_req = req;
>> +	ring_ent->state = FRRS_FUSE_REQ;
>> +	list_move(&ring_ent->list, &queue->ent_w_req_queue);
>> +	fuse_uring_add_to_pq(ring_ent, req);
>> +}
>> +
>> +/*
>> + * Release the ring entry and fetch the next fuse request if available
>> + *
>> + * @return true if a new request has been fetched
>> + */
>> +static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent)
>> +	__must_hold(&queue->lock)
>> +{
>> +	struct fuse_req *req;
>> +	struct fuse_ring_queue *queue = ring_ent->queue;
>> +	struct list_head *req_queue = &queue->fuse_req_queue;
>> +
>> +	lockdep_assert_held(&queue->lock);
>> +
>> +	/* get and assign the next entry while it is still holding the lock */
>> +	req = list_first_entry_or_null(req_queue, struct fuse_req, list);
>> +	if (req) {
>> +		fuse_uring_add_req_to_ring_ent(ring_ent, req);
>> +		return true;
>> +	}
>> +
>> +	return false;
>> +}
>> +
>> +/*
>> + * Read data from the ring buffer, which user space has written to
>> + * This is comparable with handling of classical write(/dev/fuse).
>> + * Also make the ring request available again for new fuse requests.
>> + */
>> +static void fuse_uring_commit(struct fuse_ring_ent *ring_ent,
>> +			      unsigned int issue_flags)
>> +{
>> +	struct fuse_ring *ring = ring_ent->queue->ring;
>> +	struct fuse_conn *fc = ring->fc;
>> +	struct fuse_req *req = ring_ent->fuse_req;
>> +	ssize_t err = 0;
>> +	bool set_err = false;
>> +
>> +	err = copy_from_user(&req->out.h, &ring_ent->headers->in_out,
>> +			     sizeof(req->out.h));
>> +	if (err) {
>> +		req->out.h.error = err;
>> +		goto out;
>> +	}
>> +
>> +	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
>> +	if (err) {
>> +		/* req->out.h.error already set */
>> +		goto out;
>> +	}
>> +
>> +	err = fuse_uring_copy_from_ring(ring, req, ring_ent);
>> +	if (err)
>> +		set_err = true;
>> +
>> +out:
>> +	fuse_uring_req_end(ring_ent, set_err, err);
>> +}
>> +
>> +/*
>> + * Get the next fuse req and send it
>> + */
>> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent,
>> +				     struct fuse_ring_queue *queue,
>> +				     unsigned int issue_flags)
>> +{
>> +	int err;
>> +	bool has_next;
>> +
>> +retry:
>> +	spin_lock(&queue->lock);
>> +	fuse_uring_ent_avail(ring_ent, queue);
>> +	has_next = fuse_uring_ent_assign_req(ring_ent);
>> +	spin_unlock(&queue->lock);
>> +
>> +	if (has_next) {
>> +		err = fuse_uring_send_next_to_ring(ring_ent, issue_flags);
>> +		if (err)
>> +			goto retry;
>> +	}
>> +}
>> +
>> +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
>> +{
>> +	struct fuse_ring_queue *queue = ent->queue;
>> +
>> +	lockdep_assert_held(&queue->lock);
>> +
>> +	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
>> +		return -EIO;
>> +
>> +	ent->state = FRRS_COMMIT;
>> +	list_move(&ent->list, &queue->ent_commit_queue);
>> +
>> +	return 0;
>> +}
>> +
>> +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */
>> +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
>> +				   struct fuse_conn *fc)
>> +{
>> +	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
>> +	struct fuse_ring_ent *ring_ent;
>> +	int err;
>> +	struct fuse_ring *ring = fc->ring;
>> +	struct fuse_ring_queue *queue;
>> +	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
>> +	unsigned int qid = READ_ONCE(cmd_req->qid);
>> +	struct fuse_pqueue *fpq;
>> +	struct fuse_req *req;
>> +
>> +	err = -ENOTCONN;
>> +	if (!ring)
>> +		return err;
>> +
>> +	if (qid >= ring->nr_queues)
>> +		return -EINVAL;
>> +
>> +	queue = ring->queues[qid];
>> +	if (!queue)
>> +		return err;
>> +	fpq = &queue->fpq;
>> +
>> +	spin_lock(&queue->lock);
>> +	/* Find a request based on the unique ID of the fuse request
>> +	 * This should get revised, as it needs a hash calculation and list
>> +	 * search. And full struct fuse_pqueue is needed (memory overhead).
>> +	 * As well as the link from req to ring_ent.
>> +	 */
>
> imo, the hash calculation and list search seems ok. I can't think of a
> more optimal way of doing it. Instead of using the full struct
> fuse_pqueue, I think we could just have the "struct list_head
> *processing" defined inside "struct fuse_ring_queue" and change
> fuse_request_find() to take in a list_head. I don't think we need a
> dedicated spinlock for the list either. We can just reuse queue->lock,
> as that's (currently) always held already when the processing list is
> accessed.

Please see the attached patch, which uses an xarray. Totally untested,
though. I actually found an issue while writing this patch - FR_PENDING
was cleared without holding fiq->lock, but that is important for
request_wait_answer().

If something removes req from the list, we entirely lose the ring
entry - it can never be used anymore. Personally I think the attached
patch is safer.
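Roughly, what I have in mind looks like this (illustrative sketch only,
not the attached patch; the "commits" xarray member and the helper
names are made up):

	static int fuse_uring_store_commit(struct fuse_ring_queue *queue,
					   struct fuse_ring_ent *ent)
	{
		/* called under queue->lock, hence GFP_ATOMIC */
		return xa_err(xa_store(&queue->commits, ent->commit_id,
				       ent, GFP_ATOMIC));
	}

	static struct fuse_ring_ent *
	fuse_uring_find_commit(struct fuse_ring_queue *queue,
			       uint64_t commit_id)
	{
		/* lookup and removal in one step, NULL if not found */
		return xa_erase(&queue->commits, commit_id);
	}

Lookup and removal happen in one step, so nothing can take the request
off the list behind our back, and the req->ring_entry link would not be
needed anymore either.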
>
>
>> +	req = fuse_request_find(fpq, commit_id);
>> +	err = -ENOENT;
>> +	if (!req) {
>> +		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
>> +			commit_id);
>> +		spin_unlock(&queue->lock);
>> +		return err;
>> +	}
>> +	list_del_init(&req->list);
>> +	ring_ent = req->ring_entry;
>> +	req->ring_entry = NULL;
>
> Do we need to set this to NULL, given that the request will be cleaned
> up later in fuse_uring_req_end() anyways?

It is not explicitly set to NULL in that function. Would you mind
keeping it, to be safe?

>
>> +
>> +	err = fuse_ring_ent_set_commit(ring_ent);
>> +	if (err != 0) {
>> +		pr_info_ratelimited("qid=%d commit_id %llu state %d",
>> +				    queue->qid, commit_id, ring_ent->state);
>> +		spin_unlock(&queue->lock);
>> +		return err;
>> +	}
>> +
>> +	ring_ent->cmd = cmd;
>> +	spin_unlock(&queue->lock);
>> +
>> +	/* without the queue lock, as other locks are taken */
>> +	fuse_uring_commit(ring_ent, issue_flags);
>> +
>> +	/*
>> +	 * Fetching the next request is absolutely required as queued
>> +	 * fuse requests would otherwise not get processed - committing
>> +	 * and fetching is done in one step vs legacy fuse, which has separated
>> +	 * read (fetch request) and write (commit result).
>> +	 */
>> +	fuse_uring_next_fuse_req(ring_ent, queue, issue_flags);
>
> If there's no request ready to read next, then no request will be
> fetched and this will return. However, as I understand it, once the
> uring is registered, userspace should only be interacting with the
> uring via FUSE_IO_URING_CMD_COMMIT_AND_FETCH. However for the case
> where no request was ready to read, it seems like userspace would have
> nothing to commit when it wants to fetch the next request?

We have

FUSE_IO_URING_CMD_REGISTER
FUSE_IO_URING_CMD_COMMIT_AND_FETCH

After _CMD_REGISTER the corresponding ring entry is ready to get fuse
requests and is waiting. After it gets a request assigned and the fuse
server has handled it, the _COMMIT_AND_FETCH scheme applies. Did you
possibly miss that _CMD_REGISTER will already have it waiting?

>
> A more general question though: I imagine the most common use case
> from the server side is waiting / polling until there is a request to
> fetch. Could we not just do that here in the kernel instead with
> adding a waitqueue mechanism and having fuse_uring_next_fuse_req()
> only return when there is a request available? It seems like that
> would reduce the amount of overhead instead of doing the
> waiting/checking from the server side?

The io-uring interface says that we should return -EIOCBQUEUED. If we
waited here, other SQEs that are submitted in parallel by the fuse
server couldn't be handled anymore, as we wouldn't return to io-uring
(all of this runs in io-uring task context).
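To illustrate the flow from the server side (rough pseudo code, the
queue_register_sqe()/queue_commit_and_fetch_sqe()/handle_fuse_req()
helpers are made-up names, not a real API):

	/* arm all ring entries once: FUSE_IO_URING_CMD_REGISTER */
	for (i = 0; i < ring_depth; i++)
		queue_register_sqe(qid, i);
	io_uring_submit(&ring);

	while (!stopped) {
		struct io_uring_cqe *cqe;

		/* completes once the kernel assigned a fuse request */
		io_uring_wait_cqe(&ring, &cqe);
		handle_fuse_req(cqe);

		/* answer and re-arm the entry in one step:
		 * FUSE_IO_URING_CMD_COMMIT_AND_FETCH */
		queue_commit_and_fetch_sqe(cqe);
		io_uring_cqe_seen(&ring, cqe);
		io_uring_submit(&ring);
	}

So there is no waiting/polling on the server side beyond waiting for
CQEs - entries idle in the kernel until a fuse request comes in.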
>
>> +	return 0;
>> +}
>> +
>>  /*
>>   * fuse_uring_req_fetch command handling
>>   */
>> @@ -325,6 +767,14 @@ int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd,
>>  			return err;
>>  		}
>>  		break;
>> +	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
>> +		err = fuse_uring_commit_fetch(cmd, issue_flags, fc);
>> +		if (err) {
>> +			pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n",
>> +				     err);
>> +			return err;
>> +		}
>> +		break;
>>  	default:
>>  		return -EINVAL;
>>  	}
>> diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
>> index 4e46dd65196d26dabc62dada33b17de9aa511c08..80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85 100644
>> --- a/fs/fuse/dev_uring_i.h
>> +++ b/fs/fuse/dev_uring_i.h
>> @@ -20,6 +20,9 @@ enum fuse_ring_req_state {
>>  	/* The ring entry is waiting for new fuse requests */
>>  	FRRS_AVAILABLE,
>>
>> +	/* The ring entry got assigned a fuse req */
>> +	FRRS_FUSE_REQ,
>> +
>>  	/* The ring entry is in or on the way to user space */
>>  	FRRS_USERSPACE,
>>  };
>> @@ -70,7 +73,16 @@ struct fuse_ring_queue {
>>  	 * entries in the process of being committed or in the process
>>  	 * to be sent to userspace
>>  	 */
>> +	struct list_head ent_w_req_queue;
>
> What does the w in this stand for? I find the name ambiguous here.

"entry-with-request-queue". Do you have another naming suggestion?


Thanks,
Bernd