public inbox for io-uring@vger.kernel.org
* [PATCH] io_uring/io-wq: add check free worker before create new worker
From: Fengnan Chang @ 2025-08-13 12:02 UTC
  To: axboe, io-uring; +Cc: Fengnan Chang, Diangang Li

After commit 0b2b066f8a85 ("io_uring/io-wq: only create a new worker
if it can make progress"), we still observe in our production
environment that some io_worker threads are repeatedly created and
destroyed.
After analysis, we confirmed that this is caused by a more complex
scenario involving a large number of fsync operations, which can be
abstracted as frequent write + fsync operations on multiple files in
a single uring instance. Since write is a hashed operation while fsync
is not, and fsync is likely to block during execution, checking the
hash value in io_wq_dec_running cannot handle such scenarios.
Similarly, whenever hashed and non-hashed work are submitted at the
same time, the same issue is likely to occur.
Going back to the root of the issue: when new work arrives,
io_wq_enqueue may wake up free worker A, while io_wq_dec_running may
create worker B. Ultimately, only one of A and B can obtain and
process the work item, leaving the other idle. The underlying cause
is that io_wq_enqueue and io_wq_dec_running apply inconsistent checks.
Therefore, the problem can be resolved by checking for a free worker
before io_wq_dec_running creates a new one.

Signed-off-by: Fengnan Chang <changfengnan@bytedance.com>
Reviewed-by: Diangang Li <lidiangang@bytedance.com>
---
 io_uring/io-wq.c | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index be91edf34f01..17dfaa0395c4 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -357,6 +357,13 @@ static void create_worker_cb(struct callback_head *cb)
 	worker = container_of(cb, struct io_worker, create_work);
 	wq = worker->wq;
 	acct = worker->acct;
+
+	rcu_read_lock();
+	do_create = !io_acct_activate_free_worker(acct);
+	rcu_read_unlock();
+	if (!do_create)
+		goto no_need_create;
+
 	raw_spin_lock(&acct->workers_lock);
 
 	if (acct->nr_workers < acct->max_workers) {
@@ -367,6 +374,7 @@ static void create_worker_cb(struct callback_head *cb)
 	if (do_create) {
 		create_io_worker(wq, acct);
 	} else {
+no_need_create:
 		atomic_dec(&acct->nr_running);
 		io_worker_ref_put(wq);
 	}
-- 
2.20.1
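
The decision described in the patch can be illustrated with a small userspace model. This is only a sketch, not kernel code: `struct acct_model`, `activate_free_worker()` and `on_worker_blocked()` are hypothetical names, and the counters merely stand in for the free-worker check and the `nr_workers < max_workers` test visible in the diff. The `check_free_first` flag switches between the old behaviour (always try to create) and the patched behaviour (wake an idle worker first).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical userspace model; NOT the kernel's accounting structure. */
struct acct_model {
    int nr_workers;   /* workers that currently exist */
    int max_workers;  /* upper bound on the number of workers */
    int nr_free;      /* workers sitting idle */
    int nr_created;   /* how many workers this model has created */
    int nr_woken;     /* how many idle workers this model has woken */
};

/* Stand-in for the free-worker check: wake one idle worker if there is one. */
static bool activate_free_worker(struct acct_model *a)
{
    assert(a != NULL);
    if (a->nr_free == 0)
        return false;
    a->nr_free--;
    a->nr_woken++;
    return true;
}

/*
 * Called when a running worker is about to block while work is pending.
 * With check_free_first set, an idle worker is preferred over creating a
 * new one, which is the behaviour the patch adds.
 */
static void on_worker_blocked(struct acct_model *a, bool check_free_first)
{
    assert(a != NULL);
    if (check_free_first && activate_free_worker(a))
        return;                      /* corresponds to no_need_create */
    if (a->nr_workers < a->max_workers) {
        a->nr_workers++;
        a->nr_created++;
    }
}
```

In this model, starting from two workers with one idle, the unpatched path creates a third worker even though an idle one exists, while the patched path wakes the idle worker and creates nothing.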



* Re: [PATCH] io_uring/io-wq: add check free worker before create new worker
From: Fengnan Chang @ 2025-08-13 12:04 UTC
  To: axboe, io-uring; +Cc: Diangang Li

[-- Attachment #1: Type: text/plain, Size: 2564 bytes --]

Here's my test program.

[-- Attachment #2: io_uring_test.c --]
[-- Type: application/octet-stream, Size: 13828 bytes --]

#include <stdio.h>
#include <stdlib.h> // For strtol, malloc, free
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <liburing.h>
#include <sys/uio.h> // For iovec
#include <stdbool.h> // For bool type
#include <stdint.h>  // For uintptr_t
#include <limits.h>

// Default values if not overridden by calculation
#define DEFAULT_QUEUE_DEPTH_FACTOR 4 // Factor to multiply num_files by for queue depth
#define MIN_QUEUE_DEPTH 64           // Minimum queue depth

#define BUFFER_SIZE 4096        // Write buffer size for each file

// User data tags (using the least significant bit)
#define WRITE_TAG 0UL
#define FSYNC_TAG 1UL // Only used if fsync is enabled

// Structure for tracking the request state of each file
typedef struct file_request {
    int fd;
    struct iovec iov;
    char buffer[BUFFER_SIZE];
    char filename[64];
    int cycles_done;
    bool write_completed_this_cycle;
    bool fsync_completed_this_cycle; // Only relevant if fsync is performed
    off_t current_offset;
} file_request_t;

// Helper function to submit the next write and optionally fsync cycle
// Returns the number of SQEs prepared (1 or 2)
static int submit_next_cycle(file_request_t *req, struct io_uring *ring, int target_cycles_per_file, bool perform_fsync) {
    struct io_uring_sqe *sqe_w, *sqe_f;
    int submitted_count = 0;

    if (req->cycles_done >= target_cycles_per_file) {
        return 0;
    }

    // Reset cycle completion flags
    req->write_completed_this_cycle = false;
    // Only reset fsync flag if we are actually doing fsyncs
    if (perform_fsync) {
        req->fsync_completed_this_cycle = false;
    } else {
        // If not doing fsync, mark it as "complete" trivially
        req->fsync_completed_this_cycle = true;
    }


    // 1. Prepare Write SQE
    sqe_w = io_uring_get_sqe(ring);
    if (!sqe_w) {
        fprintf(stderr, "Could not get SQE for write (cycle %d) for %s\n", req->cycles_done + 1, req->filename);
        return 0;
    }

    req->iov.iov_base = req->buffer;
    req->iov.iov_len = BUFFER_SIZE;
    io_uring_prep_writev(sqe_w, req->fd, &req->iov, 1, req->current_offset);
    req->current_offset += BUFFER_SIZE;
    io_uring_sqe_set_data(sqe_w, (void*)(((uintptr_t)req) | WRITE_TAG));
    submitted_count++;
    // printf("Prepared write SQE for %s (cycle %d)\n", req->filename, req->cycles_done + 1);


    // 2. Prepare Fsync SQE (Conditional)
    if (perform_fsync) {
        sqe_f = io_uring_get_sqe(ring);
        if (!sqe_f) {
            fprintf(stderr, "Could not get SQE for fsync (cycle %d) for %s (write SQE already prepared)\n", req->cycles_done + 1, req->filename);
            // Return 1, indicating only write was prepared successfully in this attempt
            return submitted_count;
        }

        io_uring_prep_fsync(sqe_f, req->fd, 0);
        io_uring_sqe_set_data(sqe_f, (void*)(((uintptr_t)req) | FSYNC_TAG));
        submitted_count++;
        // printf("Prepared fsync SQE for %s (cycle %d)\n", req->filename, req->cycles_done + 1);
    }

    return submitted_count;
}


int main(int argc, char *argv[]) {
    if (argc != 4) {
        fprintf(stderr, "Usage: %s <num_files> <cycles_per_file> <do_fsync (0 or 1)>\n", argv[0]);
        return 1;
    }

    // Parse command-line arguments
    errno = 0;
    long num_files_long = strtol(argv[1], NULL, 10);
    if (errno != 0 || num_files_long <= 0 || num_files_long > INT_MAX) {
         fprintf(stderr, "Error: Invalid number of files '%s'. Must be a positive integer.\n", argv[1]);
         return 1;
    }
    int num_files = (int)num_files_long;

    errno = 0;
    long cycles_per_file_long = strtol(argv[2], NULL, 10);
    if (errno != 0 || cycles_per_file_long <= 0 || cycles_per_file_long > INT_MAX) {
        fprintf(stderr, "Error: Invalid number of cycles per file '%s'. Must be a positive integer.\n", argv[2]);
        return 1;
    }
    int cycles_per_file = (int)cycles_per_file_long;

    errno = 0;
    long do_fsync_long = strtol(argv[3], NULL, 10);
    if (errno != 0 || (do_fsync_long != 0 && do_fsync_long != 1)) {
        fprintf(stderr, "Error: Invalid value for do_fsync '%s'. Must be 0 or 1.\n", argv[3]);
        return 1;
    }
    bool perform_fsync = (do_fsync_long == 1);


    printf("Configuration: Files=%d, Cycles/File=%d, Fsync=%s\n",
           num_files, cycles_per_file, perform_fsync ? "Enabled" : "Disabled");


    struct io_uring ring;
    int ret;
    file_request_t *requests = NULL;
    int total_required_cycles = num_files * cycles_per_file;
    int total_completed_cycles = 0;
    int inflight_ops = 0;

    // Calculate queue depth (remains the same logic, factor of 4 is generally safe)
    unsigned int queue_depth = num_files * DEFAULT_QUEUE_DEPTH_FACTOR;
    if (queue_depth < MIN_QUEUE_DEPTH) {
        queue_depth = MIN_QUEUE_DEPTH;
    }
    // Round the queue depth up to the next power of two
    unsigned int power_of_2_queue_depth = 1;
    while (power_of_2_queue_depth < queue_depth) {
        power_of_2_queue_depth <<= 1;
    }
    queue_depth = power_of_2_queue_depth;


    requests = malloc(num_files * sizeof(file_request_t));
    if (!requests) {
        perror("Failed to allocate memory for requests");
        return 1;
    }
    for (int i = 0; i < num_files; ++i) {
        requests[i].fd = -1;
    }


    // 1. Initialize the io_uring instance
    ret = io_uring_queue_init(queue_depth, &ring, 0);
    if (ret < 0) {
        errno = -ret; // liburing returns -errno rather than setting errno
        perror("io_uring_queue_init failed");
        free(requests);
        return 1;
    }
    printf("io_uring initialized with queue depth %u.\n", queue_depth);

    // 2. Prepare the files and request structures
    for (int i = 0; i < num_files; ++i) {
        snprintf(requests[i].filename, sizeof(requests[i].filename), "test_file_%d.dat", i);
        requests[i].fd = open(requests[i].filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (requests[i].fd < 0) {
            perror("Failed to open file");
            fprintf(stderr, "Error opening %s\n", requests[i].filename);
            goto cleanup;
        }
        requests[i].cycles_done = 0;
        requests[i].write_completed_this_cycle = false;
        // Initialize fsync flag based on whether we perform fsync
        requests[i].fsync_completed_this_cycle = !perform_fsync; // True if not doing fsync
        requests[i].current_offset = 0;
        memset(requests[i].buffer, 'A' + (i % 26), BUFFER_SIZE);
        printf("Opened %s (fd: %d)\n", requests[i].filename, requests[i].fd);
    }

    // 3. Submit the first cycle of requests for all files
    int expected_ops_per_file = perform_fsync ? 2 : 1;
    printf("Submitting initial %d requests (%d total SQEs)...\n", num_files, num_files * expected_ops_per_file);
    int initial_sqes_prepared = 0;
    for (int i = 0; i < num_files; ++i) {
        int prepared = submit_next_cycle(&requests[i], &ring, cycles_per_file, perform_fsync);
        if (prepared == expected_ops_per_file) {
            inflight_ops += prepared;
            initial_sqes_prepared += prepared;
        } else {
            // This indicates a potential issue (e.g., couldn't get enough SQEs)
             fprintf(stderr, "Error: Failed to prepare full initial cycle for %s (prepared %d/%d SQEs)\n",
                     requests[i].filename, prepared, expected_ops_per_file);
             // If even the write failed (prepared == 0), it's critical.
             // If only fsync failed (prepared == 1 when expected 2), maybe less critical but indicates queue pressure.
            goto cleanup;
        }
    }

    ret = io_uring_submit(&ring);
    if (ret < initial_sqes_prepared) {
        fprintf(stderr, "io_uring_submit failed or submitted less than expected: %d / %d\n", ret, initial_sqes_prepared);
        if (ret < 0) {
             errno = -ret;
             perror("io_uring_submit error");
        }
        goto cleanup;
    }
    printf("Successfully submitted initial %d SQEs.\n", ret);

    // 4. Event loop: wait for and process completion events (CQEs)
    printf("Entering completion loop... Target cycles: %d\n", total_required_cycles);
    while (total_completed_cycles < total_required_cycles) {
        struct io_uring_cqe *cqe;
        int submitted_now = 0;

        ret = io_uring_wait_cqe(&ring, &cqe);
         if (ret < 0) {
            errno = -ret;
            if (errno == EINTR) continue;
            perror("io_uring_wait_cqe failed");
            goto cleanup;
        }

        uintptr_t data = (uintptr_t)io_uring_cqe_get_data(cqe);
        file_request_t *req = (file_request_t*)(data & ~1UL);
        uintptr_t tag = data & 1UL;

        if (req < requests || req >= requests + num_files) {
             fprintf(stderr, "Error: Invalid user data pointer received from CQE!\n");
             io_uring_cqe_seen(&ring, cqe);
             continue;
        }

        inflight_ops--;

        if (cqe->res < 0) {
            errno = -cqe->res;
            fprintf(stderr, "Error in async operation for %s (cycle %d, %s): %s (%d)\n",
                    req->filename, req->cycles_done + 1, (tag == WRITE_TAG ? "write" : "fsync"),
                    strerror(errno), errno);

            if (req->cycles_done < cycles_per_file) {
                int failed_cycle = req->cycles_done + 1; // Capture before marking the file finished
                int remaining_cycles = cycles_per_file - req->cycles_done;
                total_completed_cycles += remaining_cycles;
                req->cycles_done = cycles_per_file; // Mark file as finished
                printf("Cycle %d for %s failed. Skipping remaining cycles. Total completed cycles: %d/%d\n",
                       failed_cycle, req->filename, total_completed_cycles, total_required_cycles);
            }

        } else {
            // Operation succeeded
            if (tag == WRITE_TAG) {
                if (cqe->res != BUFFER_SIZE) {
                     fprintf(stderr, "Warning: Partial write for %s (cycle %d): %d / %ld bytes\n",
                             req->filename, req->cycles_done + 1, cqe->res, (long)BUFFER_SIZE);
                }
                // printf("Write completed for %s (cycle %d)\n", req->filename, req->cycles_done + 1);
                req->write_completed_this_cycle = true;

            } else if (tag == FSYNC_TAG) { // Only possible if perform_fsync is true
                // printf("Fsync completed for %s (cycle %d)\n", req->filename, req->cycles_done + 1);
                req->fsync_completed_this_cycle = true;
            } else {
                 // Should not happen with current tags
                 fprintf(stderr, "Internal error: Unknown tag %lu received for %s\n", (unsigned long)tag, req->filename);
            }

            // Check if the cycle is complete
            // Condition: Write must be done. Fsync must be done *if* we are performing fsync.
            if (req->write_completed_this_cycle && req->fsync_completed_this_cycle) {
                // printf("Cycle %d completed for %s.\n", req->cycles_done + 1, req->filename);
                req->cycles_done++;
                total_completed_cycles++;
                if (total_completed_cycles % 100 == 0 || total_completed_cycles == total_required_cycles) {
                     printf("Progress: %d / %d cycles completed.\n", total_completed_cycles, total_required_cycles);
                }

                // If more cycles are needed, submit the next cycle
                if (req->cycles_done < cycles_per_file) {
                    int prepared = submit_next_cycle(req, &ring, cycles_per_file, perform_fsync);
                    int expected_prepared = perform_fsync ? 2 : 1;

                    if (prepared == expected_prepared) {
                        inflight_ops += prepared;
                        submitted_now += prepared;
                        // printf("Submitted next cycle (%d) for %s\n", req->cycles_done + 1, req->filename);
                    } else {
                         fprintf(stderr, "Error: Failed to prepare full next cycle (%d) for %s (prepared %d/%d SQEs). Stopping cycles for this file.\n",
                                 req->cycles_done + 1, req->filename, prepared, expected_prepared);
                         int remaining_cycles = cycles_per_file - req->cycles_done;
                         total_completed_cycles += remaining_cycles;
                         req->cycles_done = cycles_per_file;
                         printf("Adjusted total completed cycles: %d/%d\n", total_completed_cycles, total_required_cycles);
                    }
                } else {
                     // printf("All %d cycles finished for %s.\n", cycles_per_file, req->filename);
                }
            }
        }

        io_uring_cqe_seen(&ring, cqe);

        if (submitted_now > 0) {
            ret = io_uring_submit(&ring);
            if (ret < submitted_now) {
                 fprintf(stderr, "io_uring_submit failed after completion: submitted %d / %d\n", ret, submitted_now);
                 if (ret < 0) {
                     errno = -ret;
                     perror("io_uring_submit error");
                 }
                 goto cleanup;
            }
             // printf("Submitted %d new SQEs.\n", ret);
        }
    }

    printf("All %d required cycles completed across %d files.\n", total_required_cycles, num_files);

cleanup:
    printf("Cleaning up...\n");
    if (requests) {
        for (int i = 0; i < num_files; ++i) {
            if (requests[i].fd >= 0) {
                close(requests[i].fd);
            }
        }
        free(requests);
        requests = NULL;
    }
    io_uring_queue_exit(&ring);
    printf("io_uring exited.\n");

    if (total_completed_cycles < total_required_cycles) {
         fprintf(stderr, "Warning: Exited before all cycles completed (%d/%d).\n",
                 total_completed_cycles, total_required_cycles);
         return 1;
    }

    return 0;
}
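
The test program tells write and fsync completions apart by storing a one-bit tag in the least significant bit of the request pointer passed as SQE user data. A minimal sketch of that technique as standalone helpers follows; the names `tag_pointer()`, `untag_pointer()` and `pointer_tag()` are hypothetical and do not appear in the attachment. It assumes the tagged object is at least 2-byte aligned, so bit 0 of its address is always zero.

```c
#include <assert.h>
#include <stdint.h>

/* Only bit 0 is used as a tag, matching WRITE_TAG (0) and FSYNC_TAG (1). */
#define TAG_MASK ((uintptr_t)1)

/* Pack an aligned pointer and a one-bit tag into a single pointer-sized word. */
static inline void *tag_pointer(void *ptr, uintptr_t tag)
{
    /* Bit 0 must be free, and the tag must fit in that single bit. */
    assert(((uintptr_t)ptr & TAG_MASK) == 0);
    assert(tag <= TAG_MASK);
    return (void *)((uintptr_t)ptr | tag);
}

/* Recover the original pointer by clearing the tag bit. */
static inline void *untag_pointer(void *tagged)
{
    return (void *)((uintptr_t)tagged & ~TAG_MASK);
}

/* Extract the tag bit from a tagged pointer. */
static inline uintptr_t pointer_tag(void *tagged)
{
    return (uintptr_t)tagged & TAG_MASK;
}
```

Deriving the mask from `uintptr_t` rather than from `1UL` keeps the mask as wide as a pointer on every platform; on the 64-bit Linux targets the test program is aimed at, both forms behave the same.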


* Re: [PATCH] io_uring/io-wq: add check free worker before create new worker
From: Jens Axboe @ 2025-08-13 12:31 UTC
  To: Fengnan Chang, io-uring; +Cc: Diangang Li

On 8/13/25 6:02 AM, Fengnan Chang wrote:
> After commit 0b2b066f8a85 ("io_uring/io-wq: only create a new worker
> if it can make progress"), we still observe in our production
> environment that some io_worker threads are repeatedly created and
> destroyed.
> After analysis, we confirmed that this is caused by a more complex
> scenario involving a large number of fsync operations, which can be
> abstracted as frequent write + fsync operations on multiple files in
> a single uring instance. Since write is a hashed operation while fsync
> is not, and fsync is likely to block during execution, checking the
> hash value in io_wq_dec_running cannot handle such scenarios.
> Similarly, whenever hashed and non-hashed work are submitted at the
> same time, the same issue is likely to occur.
> Going back to the root of the issue: when new work arrives,
> io_wq_enqueue may wake up free worker A, while io_wq_dec_running may
> create worker B. Ultimately, only one of A and B can obtain and
> process the work item, leaving the other idle. The underlying cause
> is that io_wq_enqueue and io_wq_dec_running apply inconsistent checks.
> Therefore, the problem can be resolved by checking for a free worker
> before io_wq_dec_running creates a new one.

Good catch, and thanks for sending a test case as well!

-- 
Jens Axboe



* Re: [PATCH] io_uring/io-wq: add check free worker before create new worker
From: Jens Axboe @ 2025-08-13 12:32 UTC
  To: io-uring, Fengnan Chang; +Cc: Diangang Li


On Wed, 13 Aug 2025 20:02:14 +0800, Fengnan Chang wrote:
> After commit 0b2b066f8a85 ("io_uring/io-wq: only create a new worker
> if it can make progress"), we still observe in our production
> environment that some io_worker threads are repeatedly created and
> destroyed.
> After analysis, we confirmed that this is caused by a more complex
> scenario involving a large number of fsync operations, which can be
> abstracted as frequent write + fsync operations on multiple files in
> a single uring instance. Since write is a hashed operation while fsync
> is not, and fsync is likely to block during execution, checking the
> hash value in io_wq_dec_running cannot handle such scenarios.
> Similarly, whenever hashed and non-hashed work are submitted at the
> same time, the same issue is likely to occur.
> Going back to the root of the issue: when new work arrives,
> io_wq_enqueue may wake up free worker A, while io_wq_dec_running may
> create worker B. Ultimately, only one of A and B can obtain and
> process the work item, leaving the other idle. The underlying cause
> is that io_wq_enqueue and io_wq_dec_running apply inconsistent checks.
> Therefore, the problem can be resolved by checking for a free worker
> before io_wq_dec_running creates a new one.
> 
> [...]

Applied, thanks!

[1/1] io_uring/io-wq: add check free worker before create new worker
      commit: 9d83e1f05c98bab5de350bef89177e2be8b34db0

Best regards,
-- 
Jens Axboe



