* Re: [PATCH] io_uring/io-wq: add check free worker before create new worker
2025-08-13 12:02 [PATCH] io_uring/io-wq: add check free worker before create new worker Fengnan Chang
@ 2025-08-13 12:04 ` Fengnan Chang
2025-08-13 12:31 ` Jens Axboe
2025-08-13 12:32 ` Jens Axboe
2 siblings, 0 replies; 4+ messages in thread
From: Fengnan Chang @ 2025-08-13 12:04 UTC (permalink / raw)
To: axboe, io-uring; +Cc: Diangang Li
[-- Attachment #1: Type: text/plain, Size: 2564 bytes --]
Here's my test program.
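To build and run it, something like the following should work
(assuming liburing and its development headers are installed; the
flags may need adjusting for your toolchain):

    gcc -O2 -Wall -o io_uring_test io_uring_test.c -luring
    ./io_uring_test 16 1000 1    # 16 files, 1000 cycles/file, fsync on

While it runs, the io-wq worker churn can be observed by watching the
iou-wrk kernel threads come and go, e.g. with ps -eLf | grep iou-wrk
in a loop.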
On Wed, Aug 13, 2025 at 8:02 PM Fengnan Chang <changfengnan@bytedance.com> wrote:
>
> After commit 0b2b066f8a85 ("io_uring/io-wq: only create a new worker
> if it can make progress"), we still observe io_worker threads being
> repeatedly created and destroyed in our production environment.
> Analysis confirmed that this happens in a more complex scenario
> involving a large number of fsync operations, which can be
> abstracted as frequent write + fsync operations on multiple files in
> a single uring instance. Since write is hashed work while fsync is
> not, and fsync is likely to be suspended during execution, the hash
> check in io_wq_dec_running cannot handle such scenarios. The same
> issue is likely to occur whenever hashed and unhashed work are
> submitted at the same time.
> Returning to the root of the issue: when new work arrives,
> io_wq_enqueue may wake up free worker A, while io_wq_dec_running may
> create worker B. Only one of A and B can obtain and process the
> work, leaving the other idle. Ultimately, the issue is caused by the
> inconsistent checks performed by io_wq_enqueue and io_wq_dec_running.
> Fix it by also checking for a free worker in io_wq_dec_running
> before creating a new one.
>
> Signed-off-by: Fengnan Chang <changfengnan@bytedance.com>
> Reviewed-by: Diangang Li <lidiangang@bytedance.com>
> ---
> io_uring/io-wq.c | 8 ++++++++
> 1 file changed, 8 insertions(+)
>
> diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
> index be91edf34f01..17dfaa0395c4 100644
> --- a/io_uring/io-wq.c
> +++ b/io_uring/io-wq.c
> @@ -357,6 +357,13 @@ static void create_worker_cb(struct callback_head *cb)
>          worker = container_of(cb, struct io_worker, create_work);
>          wq = worker->wq;
>          acct = worker->acct;
> +
> +        rcu_read_lock();
> +        do_create = !io_acct_activate_free_worker(acct);
> +        rcu_read_unlock();
> +        if (!do_create)
> +                goto no_need_create;
> +
>          raw_spin_lock(&acct->workers_lock);
>
>          if (acct->nr_workers < acct->max_workers) {
> @@ -367,6 +374,7 @@ static void create_worker_cb(struct callback_head *cb)
>          if (do_create) {
>                  create_io_worker(wq, acct);
>          } else {
> +no_need_create:
>                  atomic_dec(&acct->nr_running);
>                  io_worker_ref_put(wq);
>          }
> --
> 2.20.1
>
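To make the intended behavior concrete, here is a minimal userspace
analogue of the "check for a free worker before creating a new one"
logic (a simplified pthread sketch with hypothetical names, not the
kernel code):

/*
 * Minimal userspace analogue of the fix: wake an idle worker if one
 * exists, and only spawn a new thread when none is free.  All names
 * are hypothetical; this sketches the idea, not the kernel code.
 * Build with: gcc -O2 -o sketch sketch.c -lpthread
 */
#include <pthread.h>
#include <time.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t work_avail = PTHREAD_COND_INITIALIZER;
static int nr_free;                 /* workers parked waiting for work */
static int nr_workers, max_workers = 4;
static int pending;                 /* queued work items */

static void *worker_fn(void *arg)
{
        (void)arg;
        pthread_mutex_lock(&lock);
        for (;;) {
                while (!pending) {
                        nr_free++;
                        pthread_cond_wait(&work_avail, &lock);
                        nr_free--;
                }
                pending--;
                pthread_mutex_unlock(&lock);
                /* ... process one work item here ... */
                pthread_mutex_lock(&lock);
        }
        return NULL;
}

/*
 * Analogue of create_worker_cb() after the patch: prefer waking a
 * free worker (worker A) over creating a new one (worker B), so both
 * paths apply a consistent check.
 */
static void queue_work(void)
{
        pthread_mutex_lock(&lock);
        pending++;
        if (nr_free > 0) {
                pthread_cond_signal(&work_avail);  /* reuse idle worker */
        } else if (nr_workers < max_workers) {
                pthread_t t;
                if (pthread_create(&t, NULL, worker_fn, NULL) == 0) {
                        pthread_detach(t);
                        nr_workers++;
                }
        }
        pthread_mutex_unlock(&lock);
}

int main(void)
{
        for (int i = 0; i < 8; i++)
                queue_work();
        /* Give the detached workers a moment to drain the queue. */
        struct timespec ts = { 1, 0 };
        nanosleep(&ts, NULL);
        return 0;
}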
[-- Attachment #2: io_uring_test.c --]
[-- Type: application/octet-stream, Size: 13828 bytes --]
#include <stdio.h>
#include <stdlib.h>     // For strtol, malloc, free
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <liburing.h>
#include <sys/uio.h>    // For iovec
#include <stdbool.h>    // For bool type
#include <stdint.h>     // For uintptr_t
#include <limits.h>     // For INT_MAX

// Default values if not overridden by calculation
#define DEFAULT_QUEUE_DEPTH_FACTOR 4    // Factor to multiply num_files by for queue depth
#define MIN_QUEUE_DEPTH 64              // Minimum queue depth
#define BUFFER_SIZE 4096                // Write buffer size per file

// User data tags (using the least significant bit)
#define WRITE_TAG 0UL
#define FSYNC_TAG 1UL   // Only used if fsync is enabled

// Structure tracking the request state of each file
typedef struct file_request {
    int fd;
    struct iovec iov;
    char buffer[BUFFER_SIZE];
    char filename[64];
    int cycles_done;
    bool write_completed_this_cycle;
    bool fsync_completed_this_cycle;    // Only relevant if fsync is performed
    off_t current_offset;
} file_request_t;
// Helper function to prepare the next write (and optionally fsync) cycle.
// Returns the number of SQEs prepared (0, 1 or 2).
static int submit_next_cycle(file_request_t *req, struct io_uring *ring,
                             int target_cycles_per_file, bool perform_fsync) {
    struct io_uring_sqe *sqe_w, *sqe_f;
    int submitted_count = 0;

    if (req->cycles_done >= target_cycles_per_file) {
        return 0;
    }

    // Reset cycle completion flags
    req->write_completed_this_cycle = false;
    // Only reset the fsync flag if we are actually doing fsyncs
    if (perform_fsync) {
        req->fsync_completed_this_cycle = false;
    } else {
        // If not doing fsync, mark it as "complete" trivially
        req->fsync_completed_this_cycle = true;
    }

    // 1. Prepare write SQE
    sqe_w = io_uring_get_sqe(ring);
    if (!sqe_w) {
        fprintf(stderr, "Could not get SQE for write (cycle %d) for %s\n",
                req->cycles_done + 1, req->filename);
        return 0;
    }
    req->iov.iov_base = req->buffer;
    req->iov.iov_len = BUFFER_SIZE;
    io_uring_prep_writev(sqe_w, req->fd, &req->iov, 1, req->current_offset);
    req->current_offset += BUFFER_SIZE;
    io_uring_sqe_set_data(sqe_w, (void *)(((uintptr_t)req) | WRITE_TAG));
    submitted_count++;

    // 2. Prepare fsync SQE (conditional)
    if (perform_fsync) {
        sqe_f = io_uring_get_sqe(ring);
        if (!sqe_f) {
            fprintf(stderr, "Could not get SQE for fsync (cycle %d) for %s (write SQE already prepared)\n",
                    req->cycles_done + 1, req->filename);
            // Return 1, indicating only the write was prepared in this attempt
            return submitted_count;
        }
        io_uring_prep_fsync(sqe_f, req->fd, 0);
        io_uring_sqe_set_data(sqe_f, (void *)(((uintptr_t)req) | FSYNC_TAG));
        submitted_count++;
    }
    return submitted_count;
}
int main(int argc, char *argv[]) {
    if (argc != 4) {
        fprintf(stderr, "Usage: %s <num_files> <cycles_per_file> <do_fsync (0 or 1)>\n", argv[0]);
        return 1;
    }

    // Parse command-line arguments
    errno = 0;
    long num_files_long = strtol(argv[1], NULL, 10);
    if (errno != 0 || num_files_long <= 0 || num_files_long > INT_MAX) {
        fprintf(stderr, "Error: Invalid number of files '%s'. Must be a positive integer.\n", argv[1]);
        return 1;
    }
    int num_files = (int)num_files_long;

    errno = 0;
    long cycles_per_file_long = strtol(argv[2], NULL, 10);
    if (errno != 0 || cycles_per_file_long <= 0 || cycles_per_file_long > INT_MAX) {
        fprintf(stderr, "Error: Invalid number of cycles per file '%s'. Must be a positive integer.\n", argv[2]);
        return 1;
    }
    int cycles_per_file = (int)cycles_per_file_long;

    errno = 0;
    long do_fsync_long = strtol(argv[3], NULL, 10);
    if (errno != 0 || (do_fsync_long != 0 && do_fsync_long != 1)) {
        fprintf(stderr, "Error: Invalid value for do_fsync '%s'. Must be 0 or 1.\n", argv[3]);
        return 1;
    }
    bool perform_fsync = (do_fsync_long == 1);

    printf("Configuration: Files=%d, Cycles/File=%d, Fsync=%s\n",
           num_files, cycles_per_file, perform_fsync ? "Enabled" : "Disabled");

    struct io_uring ring;
    int ret;
    file_request_t *requests = NULL;
    int total_required_cycles = num_files * cycles_per_file;
    int total_completed_cycles = 0;
    int inflight_ops = 0;

    // Calculate queue depth (a factor of 4 per file is generally safe),
    // then round it up to the next power of two.
    unsigned int queue_depth = num_files * DEFAULT_QUEUE_DEPTH_FACTOR;
    if (queue_depth < MIN_QUEUE_DEPTH) {
        queue_depth = MIN_QUEUE_DEPTH;
    }
    unsigned int power_of_2_queue_depth = 1;
    while (power_of_2_queue_depth < queue_depth) {
        power_of_2_queue_depth <<= 1;
    }
    queue_depth = power_of_2_queue_depth;

    requests = malloc(num_files * sizeof(file_request_t));
    if (!requests) {
        perror("Failed to allocate memory for requests");
        return 1;
    }
    for (int i = 0; i < num_files; ++i) {
        requests[i].fd = -1;
    }

    // 1. Initialize the io_uring instance
    ret = io_uring_queue_init(queue_depth, &ring, 0);
    if (ret < 0) {
        // io_uring_queue_init returns -errno; it does not set errno itself
        fprintf(stderr, "io_uring_queue_init failed: %s\n", strerror(-ret));
        free(requests);
        return 1;
    }
    printf("io_uring initialized with queue depth %u.\n", queue_depth);

    // 2. Prepare the files and request structures
    for (int i = 0; i < num_files; ++i) {
        snprintf(requests[i].filename, sizeof(requests[i].filename), "test_file_%d.dat", i);
        requests[i].fd = open(requests[i].filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (requests[i].fd < 0) {
            perror("Failed to open file");
            fprintf(stderr, "Error opening %s\n", requests[i].filename);
            goto cleanup;
        }
        requests[i].cycles_done = 0;
        requests[i].write_completed_this_cycle = false;
        // Initialize the fsync flag based on whether we perform fsync
        requests[i].fsync_completed_this_cycle = !perform_fsync; // True if not doing fsync
        requests[i].current_offset = 0;
        memset(requests[i].buffer, 'A' + (i % 26), BUFFER_SIZE);
        printf("Opened %s (fd: %d)\n", requests[i].filename, requests[i].fd);
    }

    // 3. Submit the first cycle for every file
    int expected_ops_per_file = perform_fsync ? 2 : 1;
    printf("Submitting initial %d requests (%d total SQEs)...\n",
           num_files, num_files * expected_ops_per_file);
    int initial_sqes_prepared = 0;
    for (int i = 0; i < num_files; ++i) {
        int prepared = submit_next_cycle(&requests[i], &ring, cycles_per_file, perform_fsync);
        if (prepared == expected_ops_per_file) {
            inflight_ops += prepared;
            initial_sqes_prepared += prepared;
        } else {
            // This indicates a problem (e.g., couldn't get enough SQEs).
            // If even the write failed (prepared == 0), it's critical; if
            // only the fsync failed (1 of 2), it still indicates queue pressure.
            fprintf(stderr, "Error: Failed to prepare full initial cycle for %s (prepared %d/%d SQEs)\n",
                    requests[i].filename, prepared, expected_ops_per_file);
            goto cleanup;
        }
    }

    ret = io_uring_submit(&ring);
    if (ret < initial_sqes_prepared) {
        fprintf(stderr, "io_uring_submit failed or submitted less than expected: %d / %d\n",
                ret, initial_sqes_prepared);
        if (ret < 0) {
            errno = -ret;
            perror("io_uring_submit error");
        }
        goto cleanup;
    }
    printf("Successfully submitted initial %d SQEs.\n", ret);

    // 4. Event loop: wait for and handle completions (CQEs)
    printf("Entering completion loop... Target cycles: %d\n", total_required_cycles);
    while (total_completed_cycles < total_required_cycles) {
        struct io_uring_cqe *cqe;
        int submitted_now = 0;

        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            errno = -ret;
            if (errno == EINTR) continue;
            perror("io_uring_wait_cqe failed");
            goto cleanup;
        }

        uintptr_t data = (uintptr_t)io_uring_cqe_get_data(cqe);
        file_request_t *req = (file_request_t *)(data & ~1UL);
        uintptr_t tag = data & 1UL;
        if (req < requests || req >= requests + num_files) {
            fprintf(stderr, "Error: Invalid user data pointer received from CQE!\n");
            io_uring_cqe_seen(&ring, cqe);
            continue;
        }
        inflight_ops--;

        if (cqe->res < 0) {
            errno = -cqe->res;
            fprintf(stderr, "Error in async operation for %s (cycle %d, %s): %s (%d)\n",
                    req->filename, req->cycles_done + 1, (tag == WRITE_TAG ? "write" : "fsync"),
                    strerror(errno), errno);
            if (req->cycles_done < cycles_per_file) {
                int failed_cycle = req->cycles_done + 1;
                int remaining_cycles = cycles_per_file - req->cycles_done;
                total_completed_cycles += remaining_cycles;
                req->cycles_done = cycles_per_file; // Mark file as finished
                printf("Cycle %d for %s failed. Skipping remaining cycles. Total completed cycles: %d/%d\n",
                       failed_cycle, req->filename, total_completed_cycles, total_required_cycles);
            }
        } else {
            // Operation succeeded
            if (tag == WRITE_TAG) {
                if (cqe->res != BUFFER_SIZE) {
                    fprintf(stderr, "Warning: Partial write for %s (cycle %d): %d / %ld bytes\n",
                            req->filename, req->cycles_done + 1, cqe->res, (long)BUFFER_SIZE);
                }
                req->write_completed_this_cycle = true;
            } else if (tag == FSYNC_TAG) { // Only possible if perform_fsync is true
                req->fsync_completed_this_cycle = true;
            } else {
                // Should not happen with the current tags
                fprintf(stderr, "Internal error: Unknown tag %lu received for %s\n",
                        (unsigned long)tag, req->filename);
            }

            // Check whether the cycle is complete: the write must be done,
            // and the fsync must be done *if* we are performing fsync.  Also
            // skip files already marked finished by a failed companion
            // operation, to avoid double-counting completed cycles.
            if (req->cycles_done < cycles_per_file &&
                req->write_completed_this_cycle && req->fsync_completed_this_cycle) {
                req->cycles_done++;
                total_completed_cycles++;
                if (total_completed_cycles % 100 == 0 || total_completed_cycles == total_required_cycles) {
                    printf("Progress: %d / %d cycles completed.\n",
                           total_completed_cycles, total_required_cycles);
                }
                // If more cycles are needed, prepare the next cycle
                if (req->cycles_done < cycles_per_file) {
                    int prepared = submit_next_cycle(req, &ring, cycles_per_file, perform_fsync);
                    int expected_prepared = perform_fsync ? 2 : 1;
                    if (prepared == expected_prepared) {
                        inflight_ops += prepared;
                        submitted_now += prepared;
                    } else {
                        fprintf(stderr, "Error: Failed to prepare full next cycle (%d) for %s (prepared %d/%d SQEs). Stopping cycles for this file.\n",
                                req->cycles_done + 1, req->filename, prepared, expected_prepared);
                        int remaining_cycles = cycles_per_file - req->cycles_done;
                        total_completed_cycles += remaining_cycles;
                        req->cycles_done = cycles_per_file;
                        printf("Adjusted total completed cycles: %d/%d\n",
                               total_completed_cycles, total_required_cycles);
                    }
                }
            }
        }
        io_uring_cqe_seen(&ring, cqe);

        if (submitted_now > 0) {
            ret = io_uring_submit(&ring);
            if (ret < submitted_now) {
                fprintf(stderr, "io_uring_submit failed after completion: submitted %d / %d\n",
                        ret, submitted_now);
                if (ret < 0) {
                    errno = -ret;
                    perror("io_uring_submit error");
                }
                goto cleanup;
            }
        }
    }
    printf("All %d required cycles completed across %d files.\n", total_required_cycles, num_files);

cleanup:
    printf("Cleaning up...\n");
    if (requests) {
        for (int i = 0; i < num_files; ++i) {
            if (requests[i].fd >= 0) {
                close(requests[i].fd);
            }
        }
        free(requests);
        requests = NULL;
    }
    io_uring_queue_exit(&ring);
    printf("io_uring exited.\n");

    if (total_completed_cycles < total_required_cycles) {
        fprintf(stderr, "Warning: Exited before all cycles completed (%d/%d).\n",
                total_completed_cycles, total_required_cycles);
        return 1;
    }
    return 0;
}