From: Bernd Schubert <[email protected]>
To: Miklos Szeredi <[email protected]>, Jens Axboe <[email protected]>,
	 Pavel Begunkov <[email protected]>,
	[email protected]
Cc: [email protected], [email protected],
	 Joanne Koong <[email protected]>,
	Josef Bacik <[email protected]>,
	 Amir Goldstein <[email protected]>,
	Bernd Schubert <[email protected]>
Subject: [PATCH RFC v3 12/17] fuse: {uring} Handle teardown of ring entries
Date: Sun, 01 Sep 2024 15:37:06 +0200
Message-ID: <20240901-b4-fuse-uring-rfcv3-without-mmap-v3-12-9207f7391444@ddn.com>
In-Reply-To: <20240901-b4-fuse-uring-rfcv3-without-mmap-v3-0-9207f7391444@ddn.com>

On teardown, struct file_operations::uring_cmd requests need to be
completed by calling io_uring_cmd_done(). Leaving ring entries
uncompleted would result in busy io-uring tasks that print warnings
at intervals, and in an unreleased struct file.

Additionally, the fuse connection, and with it the ring, can only be
released once all io-uring commands are completed.

Completion depends on the state a ring entry is in:

a) Waiting for new fuse requests - io_uring_cmd_done() is needed.

b) Already handed over to userspace - io_uring_cmd_done() through
teardown is not needed, the entry can just be released. If the fuse
server is still active and commits such a ring entry, fuse_uring_cmd()
already checks whether the connection is active and then completes the
io-uring command itself with -ENOTCONN, i.e. no special handling is
needed.

These two cases correspond to the ring entry states FRRS_WAIT and
FRRS_USERSPACE.

Entries in other states:
- FRRS_INIT: No action needed, these entries do not contribute to
  ring->queue_refs yet.
- All remaining states: These entries are currently processed by other
  tasks; an async teardown worker has to wait for them to reach
  FRRS_WAIT or FRRS_USERSPACE. This could also be solved without an
  async teardown task, but that would require additional conditions in
  hot code paths; in my opinion the code also looks cleaner with async
  teardown. A sketch of the resulting per-entry action is shown below.
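
To illustrate the scheme (this sketch is not part of the patch and the
switch does not appear literally in the code, it only mirrors the
need_cmd_done logic of fuse_uring_entry_teardown() below):

	switch (ent->state) {
	case FRRS_INIT:
		/* holds no ring->queue_refs reference, nothing to do */
		break;
	case FRRS_USERSPACE:
		/* SQE is owned by the server, no io_uring_cmd_done() */
		fuse_uring_entry_teardown(ent, false);
		break;
	case FRRS_WAIT:
		/* entry waits for new fuse requests, complete the cmd */
		fuse_uring_entry_teardown(ent, true);
		break;
	default:
		/* busy in another task, retried by the teardown worker */
		break;
	}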

Signed-off-by: Bernd Schubert <[email protected]>
---
 fs/fuse/dev.c         |  10 +++
 fs/fuse/dev_uring.c   | 204 +++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h |  83 ++++++++++++++++++++++
 3 files changed, 297 insertions(+)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 71443a93f1d4..3485752e25aa 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2190,6 +2190,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		fc->connected = 0;
 		spin_unlock(&fc->bg_lock);
 
+		fuse_uring_set_stopped(fc);
+
 		fuse_set_initialized(fc);
 		list_for_each_entry(fud, &fc->devices, entry) {
 			struct fuse_pqueue *fpq = &fud->pq;
@@ -2233,6 +2235,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		spin_unlock(&fc->lock);
 
 		fuse_dev_end_requests(&to_end);
+
+		/*
+		 * fc->lock must not be taken to avoid conflicts with io-uring
+		 * locks
+		 */
+		fuse_uring_abort(fc);
 	} else {
 		spin_unlock(&fc->lock);
 	}
@@ -2244,6 +2252,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
 	/* matches implicit memory barrier in fuse_drop_waiting() */
 	smp_mb();
 	wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
+
+	fuse_uring_wait_stopped_queues(fc);
 }
 
 int fuse_dev_release(struct inode *inode, struct file *file)
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 96347751668e..52e2323cc258 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -67,6 +67,42 @@ fuse_uring_async_send_to_ring(struct io_uring_cmd *cmd,
 	io_uring_cmd_done(cmd, 0, 0, issue_flags);
 }
 
+/* Abort all requests queued on the given ring queue */
+static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
+{
+	struct fuse_req *req;
+	LIST_HEAD(sync_list);
+	LIST_HEAD(async_list);
+
+	spin_lock(&queue->lock);
+
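+	/* clear FR_PENDING, the requests are about to be ended */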
+	list_for_each_entry(req, &queue->sync_fuse_req_queue, list)
+		clear_bit(FR_PENDING, &req->flags);
+	list_for_each_entry(req, &queue->async_fuse_req_queue, list)
+		clear_bit(FR_PENDING, &req->flags);
+
+	list_splice_init(&queue->async_fuse_req_queue, &async_list);
+	list_splice_init(&queue->sync_fuse_req_queue, &sync_list);
+
+	spin_unlock(&queue->lock);
+
+	/* must not hold queue lock to avoid order issues with fi->lock */
+	fuse_dev_end_requests(&sync_list);
+	fuse_dev_end_requests(&async_list);
+}
+
+void fuse_uring_abort_end_requests(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		fuse_uring_abort_end_queue_requests(queue);
+	}
+}
+
 /* Update conn limits according to ring values */
 static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
 {
@@ -124,6 +160,8 @@ static int _fuse_uring_conn_cfg(struct fuse_ring_config *rcfg,
 
 	ring->queue_size = queue_sz;
 
+	init_waitqueue_head(&ring->stop_waitq);
+
 	fc->ring = ring;
 	ring->fc = fc;
 
@@ -203,6 +241,172 @@ int fuse_uring_conn_cfg(struct file *file, void __user *argp)
 	return res;
 }
 
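+/* Detach the fuse request from the entry and end it with -ECONNABORTED */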
+static void fuse_uring_stop_fuse_req_end(struct fuse_ring_ent *ent)
+{
+	struct fuse_req *req = ent->fuse_req;
+
+	ent->fuse_req = NULL;
+	clear_bit(FR_SENT, &req->flags);
+	req->out.h.error = -ECONNABORTED;
+	fuse_request_end(req);
+}
+
+/*
+ * Release a request/entry on connection teardown
+ */
+static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent,
+					 bool need_cmd_done)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+
+	/*
+	 * fuse_request_end() might take other locks like fi->lock and
+	 * can lead to lock ordering issues
+	 */
+	lockdep_assert_not_held(&ent->queue->lock);
+
+	if (need_cmd_done) {
+		pr_devel("qid=%d tag=%d sending cmd_done\n", queue->qid,
+			 ent->tag);
+
+		io_uring_cmd_done(ent->cmd, -ENOTCONN, 0,
+				  IO_URING_F_UNLOCKED);
+	}
+
+	if (ent->fuse_req)
+		fuse_uring_stop_fuse_req_end(ent);
+
+	ent->state = FRRS_FREED;
+}
+
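+/*
+ * Move entries in the expected state from the list to a local teardown
+ * list, then release them and drop one queue_refs reference per entry.
+ */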
+static void fuse_uring_stop_list_entries(struct list_head *head,
+					 struct fuse_ring_queue *queue,
+					 enum fuse_ring_req_state exp_state)
+{
+	struct fuse_ring *ring = queue->ring;
+	struct fuse_ring_ent *ent, *next;
+	ssize_t queue_refs = SSIZE_MAX;
+	LIST_HEAD(to_teardown);
+
+	spin_lock(&queue->lock);
+	list_for_each_entry_safe(ent, next, head, list) {
+		if (ent->state != exp_state) {
+			pr_warn("entry teardown qid=%d tag=%d state=%d expected=%d",
+				queue->qid, ent->tag, ent->state, exp_state);
+			continue;
+		}
+
+		list_move(&ent->list, &to_teardown);
+	}
+	spin_unlock(&queue->lock);
+
+	/* no queue lock to avoid lock order issues */
+	list_for_each_entry_safe(ent, next, &to_teardown, list) {
+		bool need_cmd_done = ent->state != FRRS_USERSPACE;
+
+		fuse_uring_entry_teardown(ent, need_cmd_done);
+		queue_refs = atomic_dec_return(&ring->queue_refs);
+
+		if (WARN_ON_ONCE(queue_refs < 0))
+			pr_warn("qid=%d queue_refs=%zd", queue->qid,
+				queue_refs);
+	}
+}
+
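+/* Tear down entries that are in userspace or waiting for new requests */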
+static void fuse_uring_stop_queue(struct fuse_ring_queue *queue)
+{
+	fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue,
+				     FRRS_USERSPACE);
+	fuse_uring_stop_list_entries(&queue->async_ent_avail_queue, queue,
+				     FRRS_WAIT);
+	fuse_uring_stop_list_entries(&queue->sync_ent_avail_queue, queue,
+				     FRRS_WAIT);
+}
+
+/*
+ * Log state debug info
+ */
+static void fuse_uring_log_ent_state(struct fuse_ring *ring)
+{
+	int qid, tag;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		for (tag = 0; tag < ring->queue_depth; tag++) {
+			struct fuse_ring_ent *ent = &queue->ring_ent[tag];
+
+			if (ent->state != FRRS_FREED && ent->state != FRRS_INIT)
+				pr_info("ring=%p qid=%d tag=%d state=%d\n",
+					ring, qid, tag, ent->state);
+		}
+	}
+	ring->stop_debug_log = 1;
+}
+
+static void fuse_uring_async_stop_queues(struct work_struct *work)
+{
+	int qid;
+	struct fuse_ring *ring =
+		container_of(work, struct fuse_ring, async_teardown_work.work);
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		fuse_uring_stop_queue(queue);
+	}
+
+	/*
+	 * Some ring entries might still be in the middle of IO operations,
+	 * i.e. in the process of being handled by file_operations::uring_cmd
+	 * or on the way to userspace. That could be handled with conditions
+	 * in the hot code paths, but it is easier/cleaner to retry from this
+	 * async teardown handler while queue references are left.
+	 */
+	if (atomic_read(&ring->queue_refs) > 0) {
+		if (!ring->stop_debug_log &&
+		    time_after(jiffies,
+			       ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT))
+			fuse_uring_log_ent_state(ring);
+
+		schedule_delayed_work(&ring->async_teardown_work,
+				      FUSE_URING_TEARDOWN_INTERVAL);
+	} else {
+		wake_up_all(&ring->stop_waitq);
+	}
+}
+
+/*
+ * Stop the ring queues
+ */
+void fuse_uring_stop_queues(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		fuse_uring_stop_queue(queue);
+	}
+
+	if (atomic_read(&ring->queue_refs) > 0) {
+		pr_info("ring=%p scheduling async queue stop\n", ring);
+		ring->teardown_time = jiffies;
+		INIT_DELAYED_WORK(&ring->async_teardown_work,
+				  fuse_uring_async_stop_queues);
+		schedule_delayed_work(&ring->async_teardown_work,
+				      FUSE_URING_TEARDOWN_INTERVAL);
+	} else {
+		wake_up_all(&ring->stop_waitq);
+	}
+}
+
 /*
  * Checks for errors and stores it into the request
  */
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 697963e5d524..432465d4bfce 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -14,6 +14,10 @@
 /* IORING_MAX_ENTRIES */
 #define FUSE_URING_MAX_QUEUE_DEPTH 32768
 
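+/* Time until teardown entry states get logged, and the re-check interval */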
+#define FUSE_URING_TEARDOWN_TIMEOUT (5 * HZ)
+#define FUSE_URING_TEARDOWN_INTERVAL (HZ/20)
+
 enum fuse_ring_req_state {
 	FRRS_INVALID = 0,
 
@@ -31,6 +35,9 @@ enum fuse_ring_req_state {
 
 	/* request is in or on the way to user space */
 	FRRS_USERSPACE,
+
+	/* request is released */
+	FRRS_FREED,
 };
 
 /* A fuse ring entry, part of the ring queue */
@@ -143,17 +150,32 @@ struct fuse_ring {
 	/* Is the ring read to take requests */
 	unsigned int ready : 1;
 
+	/*
+	 * Log ring entry states once on stop when entries cannot be
+	 * released
+	 */
+	unsigned int stop_debug_log : 1;
+
 	/* number of SQEs initialized */
 	atomic_t nr_sqe_init;
 
 	/* Used to release the ring on stop */
 	atomic_t queue_refs;
 
+	wait_queue_head_t stop_waitq;
+
+	/* async tear down */
+	struct delayed_work async_teardown_work;
+
+	/* log */
+	unsigned long teardown_time;
+
 	struct fuse_ring_queue queues[] ____cacheline_aligned_in_smp;
 };
 
 void fuse_uring_abort_end_requests(struct fuse_ring *ring);
 int fuse_uring_conn_cfg(struct file *file, void __user *argp);
+void fuse_uring_stop_queues(struct fuse_ring *ring);
 int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
 
 static inline void fuse_uring_conn_destruct(struct fuse_conn *fc)
@@ -189,6 +211,56 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
 	return fc->ring && fc->ring->per_core_queue;
 }
 
+static inline void fuse_uring_set_stopped_queues(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		spin_lock(&queue->lock);
+		queue->stopped = 1;
+		spin_unlock(&queue->lock);
+	}
+}
+
+/*
+ * Mark the ring as not ready and set the per-queue stopped flags
+ */
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+	__must_hold(fc->lock)
+{
+	if (fc->ring == NULL)
+		return;
+
+	fc->ring->ready = false;
+
+	fuse_uring_set_stopped_queues(fc->ring);
+}
+
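+/* Abort ring requests and stop the queues, fc->lock must not be held */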
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+
+	if (ring == NULL)
+		return;
+
+	if (atomic_read(&ring->queue_refs) > 0) {
+		fuse_uring_abort_end_requests(ring);
+		fuse_uring_stop_queues(ring);
+	}
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+
+	if (ring)
+		wait_event(ring->stop_waitq,
+			   atomic_read(&ring->queue_refs) == 0);
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -212,6 +284,17 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
 	return false;
 }
 
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+}
 #endif /* CONFIG_FUSE_IO_URING */
 
 #endif /* _FS_FUSE_DEV_URING_I_H */

-- 
2.43.0


Thread overview: 37+ messages
2024-09-01 13:36 [PATCH RFC v3 00/17] fuse: fuse-over-io-uring Bernd Schubert
2024-09-01 13:36 ` [PATCH RFC v3 01/17] fuse: rename to fuse_dev_end_requests and make non-static Bernd Schubert
2024-09-01 13:36 ` [PATCH RFC v3 02/17] fuse: Move fuse_get_dev to header file Bernd Schubert
2024-09-01 13:36 ` [PATCH RFC v3 03/17] fuse: Move request bits Bernd Schubert
2024-09-01 13:36 ` [PATCH RFC v3 04/17] fuse: Add fuse-io-uring design documentation Bernd Schubert
2024-09-01 13:36 ` [PATCH RFC v3 05/17] fuse: Add a uring config ioctl Bernd Schubert
2024-09-04  0:43   ` Joanne Koong
2024-09-04 22:24     ` Bernd Schubert
2024-09-06 19:23       ` Joanne Koong
2024-09-01 13:37 ` [PATCH RFC v3 06/17] fuse: Add the queue configuration ioctl Bernd Schubert
2024-09-04 22:23   ` Joanne Koong
2024-09-04 22:38     ` Bernd Schubert
2024-09-04 22:42       ` Joanne Koong
2024-09-01 13:37 ` [PATCH RFC v3 07/17] fuse: {uring} Add a dev_release exception for fuse-over-io-uring Bernd Schubert
2024-09-01 13:37 ` [PATCH RFC v3 08/17] fuse: {uring} Handle SQEs - register commands Bernd Schubert
2024-09-04 15:40   ` Jens Axboe
2024-09-01 13:37 ` [PATCH RFC v3 09/17] fuse: Make fuse_copy non static Bernd Schubert
2024-09-01 13:37 ` [PATCH RFC v3 10/17] fuse: Add buffer offset for uring into fuse_copy_state Bernd Schubert
2024-09-01 13:37 ` [PATCH RFC v3 11/17] fuse: {uring} Add uring sqe commit and fetch support Bernd Schubert
2024-09-01 13:37 ` Bernd Schubert [this message]
2024-09-01 13:37 ` [PATCH RFC v3 13/17] fuse: {uring} Add a ring queue and send method Bernd Schubert
2024-09-01 13:37 ` [PATCH RFC v3 14/17] fuse: {uring} Allow to queue to the ring Bernd Schubert
2024-09-01 13:37 ` [PATCH RFC v3 15/17] ate: 2024-08-30 15:43:32 +0100 Bernd Schubert
2024-09-04 15:43   ` Jens Axboe
2024-09-04 15:54     ` Bernd Schubert
2024-09-01 13:37 ` [PATCH RFC v3 16/17] fuse: {uring} Handle IO_URING_F_TASK_DEAD Bernd Schubert
2024-09-01 13:37 ` [PATCH RFC v3 17/17] fuse: {uring} Pin the user buffer Bernd Schubert
2024-09-04 15:47   ` Jens Axboe
2024-09-04 16:08     ` Bernd Schubert
2024-09-04 16:16       ` Jens Axboe
2024-09-04 19:25         ` Bernd Schubert
2024-09-04 19:40           ` Jens Axboe
2024-09-05 21:04             ` Bernd Schubert
2024-09-04 18:59   ` Jens Axboe
2024-09-04 16:42 ` [PATCH RFC v3 00/17] fuse: fuse-over-io-uring Jens Axboe
2024-09-04 19:37   ` Bernd Schubert
2024-09-04 19:41     ` Jens Axboe
