* Question about the optimal receiving TCP streams via io_uring
@ 2020-10-27 11:01 Frederich, Jens
0 siblings, 0 replies; only message in thread
From: Frederich, Jens @ 2020-10-27 11:01 UTC (permalink / raw)
To: [email protected]
Hello,
I would like to receive n 10 Gbps TCP or UDP streams (jumbo frames) as fast as possible and write each socket stream to a file on fast XFS storage. How can I implement this optimally with io_uring? I want to use io_uring for both network and file I/O, and the CPU load should stay low. I would like to hear your opinions. My first naive implementation looks like this, but I can't get more than 1 Gbps through a single TCP stream:
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include "liburing.h"
#define MAX_CONNECTIONS 4096
#define BACKLOG 512
#define MAX_MESSAGE_LEN 9000
#define BUFFERS_COUNT MAX_CONNECTIONS
/*
 * Per-server state: the listening socket, the io_uring instance it drives,
 * and the sockaddr buffer reused by every queued accept SQE.
 */
struct Stream_Server {
/* TCP port to listen on (host byte order; converted with htons() at bind time). */
int port;
/* Parameters passed to io_uring_queue_init_params(); zeroed before setup. */
struct io_uring_params ring_params;
/* The io_uring submission/completion queues shared by all helpers below. */
struct io_uring ring;
/* fd returned by socket()/bind()/listen() in stream_server_proc(). */
int socket_listen_fd;
/* Scratch address filled in by the kernel on each accept completion.
 * NOTE(review): a single buffer is reused for concurrent accepts — fine while
 * only one accept SQE is in flight at a time, as this code maintains. */
struct sockaddr_in next_client_address;
socklen_t next_client_address_size;
/* Running count of all CQEs ever reaped by main_loop_process_cqes(). */
uint64_t total_cqe_count;
/* Opaque downstream consumer; declared elsewhere — not used in this file. */
struct Data_Analyzer *data_analyzer;
};
/*
 * Operation tags stored in conn_info.type so each CQE can be routed to the
 * right handler in main_loop_process_cqes().
 */
enum {
ACCEPT,
READ,
/* Used for BOTH socket sends and file writes (see make_file_write_sqe_and_submit). */
WRITE,
PROVIDE_BUFFERS,
};
/*
 * Per-request tag packed directly into the 64-bit sqe->user_data field
 * (4 + 2 + 2 = 8 bytes, so it fits exactly — no allocation needed).
 */
typedef struct conn_info {
/* Socket fd this request belongs to. */
__u32 fd;
/* One of the ACCEPT/READ/WRITE/PROVIDE_BUFFERS enum tags above. */
__u16 type;
/* Buffer id from the provided-buffer pool, valid for WRITE completions. */
__u16 bid;
} conn_info;
/* Buffer pool handed to the kernel for automatic buffer selection
 * (IOSQE_BUFFER_SELECT): one MAX_MESSAGE_LEN slot per potential connection. */
char bufs[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0};
/* Arbitrary non-zero buffer-group ids; the kernel keys provided buffers by group. */
int socket_buffers_group_id = 1337;
/* Declared for a second (file-side) group but never registered or used here. */
int mdf_file_buffers_group_id = 1338;
/*
 * Queue (but do not submit) an accept SQE on @fd; the tag in user_data routes
 * the completion back to the ACCEPT branch of the event loop.
 * NOTE(review): despite the name, nothing here calls io_uring_submit() —
 * submission happens in io_uring_submit_and_wait() in the main loop.
 */
void make_accept_sqe_and_submit(struct io_uring *ring, int fd, struct sockaddr *client_address, socklen_t *client_address_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: flush pending entries to the kernel and retry once. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "make_accept_sqe_and_submit: submission queue exhausted\n");
            exit(1);
        }
    }
    io_uring_prep_accept(sqe, fd, client_address, client_address_size, 0);
    io_uring_sqe_set_flags(sqe, flags);
    /* Pack the request tag straight into the 64-bit user_data field. */
    conn_info *conn_i = (conn_info *)&sqe->user_data;
    conn_i->fd = fd;
    conn_i->type = ACCEPT;
    conn_i->bid = 0;
}
/*
 * Queue a recv SQE on @fd using automatic buffer selection: the buffer is NULL
 * and the kernel picks one from group @gid (caller must pass
 * IOSQE_BUFFER_SELECT in @flags for this to take effect).
 */
void make_socket_read_sqe_and_submit(struct io_uring *ring, int fd, unsigned gid, size_t message_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: flush pending entries to the kernel and retry once. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "make_socket_read_sqe_and_submit: submission queue exhausted\n");
            exit(1);
        }
    }
    io_uring_prep_recv(sqe, fd, NULL, message_size, 0);
    io_uring_sqe_set_flags(sqe, flags);
    /* Tell the kernel which provided-buffer group to draw from. */
    sqe->buf_group = gid;
    conn_info *conn_i = (conn_info *)&sqe->user_data;
    conn_i->fd = fd;
    conn_i->type = READ;
    conn_i->bid = 0;
}
/*
 * Queue a send SQE echoing buffer @bid back to the client socket.
 * NOTE(review): unused in this file — leftover from the echo-server example
 * this code was derived from; kept for interface compatibility.
 */
void make_socket_write_sqe_and_submit(struct io_uring *ring, int fd, __u16 bid, size_t message_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: flush pending entries to the kernel and retry once. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "make_socket_write_sqe_and_submit: submission queue exhausted\n");
            exit(1);
        }
    }
    io_uring_prep_send(sqe, fd, &bufs[bid], message_size, 0);
    io_uring_sqe_set_flags(sqe, flags);
    conn_info *conn_i = (conn_info *)&sqe->user_data;
    conn_i->fd = fd;
    conn_i->type = WRITE;
    /* Remember which pool buffer is in flight so WRITE completion can re-provide it. */
    conn_i->bid = bid;
}
// @Temporary: support n file streams
/* Destination file fd; (re)opened in the ACCEPT handler. -1 = not open yet. */
int outfd = -1;
/* Next write position in the output file, advanced per received chunk. */
off_t file_offset = 0;
/* Count of chunks written so far (diagnostic only). */
int file_index = 0;
/*
 * Queue a pwrite-style SQE copying pool buffer @bid to the global output file
 * at @file_offset (shadows the global of the same name — the caller passes the
 * offset explicitly). @socket_fd is only carried in the tag so the WRITE
 * completion can re-arm a recv on the originating socket.
 * NOTE(review): uses the same WRITE tag as socket sends; the event loop
 * cannot distinguish the two completion kinds.
 */
void make_file_write_sqe_and_submit(struct io_uring *ring, int socket_fd, __u16 bid, size_t message_size, off_t file_offset, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: flush pending entries to the kernel and retry once. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "make_file_write_sqe_and_submit: submission queue exhausted\n");
            exit(1);
        }
    }
    io_uring_prep_write(sqe, outfd, &bufs[bid], message_size, file_offset);
    io_uring_sqe_set_flags(sqe, flags);
    conn_info *conn_i = (conn_info *)&sqe->user_data;
    conn_i->fd = socket_fd;
    conn_i->type = WRITE;
    conn_i->bid = bid;
}
/*
 * Hand a single pool buffer @bid back to the kernel for group @gid so it can
 * be selected again by a future IOSQE_BUFFER_SELECT recv.
 */
void make_provide_buffers_sqe_and_submit(struct io_uring *ring, __u16 bid, unsigned gid) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: flush pending entries to the kernel and retry once. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "make_provide_buffers_sqe_and_submit: submission queue exhausted\n");
            exit(1);
        }
    }
    /* nbufs = 1, buffer id = bid: re-register exactly the buffer just consumed. */
    io_uring_prep_provide_buffers(sqe, bufs[bid], MAX_MESSAGE_LEN, 1, gid, bid);
    conn_info *conn_i = (conn_info *)&sqe->user_data;
    conn_i->fd = 0;
    conn_i->type = PROVIDE_BUFFERS;
    conn_i->bid = 0;
}
/* Global handle set once by stream_server_proc(); used by the event loop. */
struct Stream_Server *stream_server = NULL;
/*
 * Event loop: submit queued SQEs, wait for at least one completion, then
 * drain and dispatch every available CQE by the type tag packed in user_data.
 * Never returns.
 */
void main_loop_process_cqes() {
    struct io_uring_cqe *cqe;
    unsigned head;
    stream_server->total_cqe_count = 0;
    while (1) {
        uint64_t cqe_count = 0;
        /* Flush pending SQEs and block until at least one CQE arrives. */
        io_uring_submit_and_wait(&stream_server->ring, 1);
        io_uring_for_each_cqe(&stream_server->ring, head, cqe) {
            cqe_count += 1;
            conn_info *conn_i = (conn_info *)&cqe->user_data;
            if (cqe->res == -ENOBUFS) {
                /* The provided-buffer pool ran dry; with one buffer per
                 * connection this indicates a re-provide bug — bail out. */
                fprintf(stdout, "bufs in automatic buffer selection empty, this should not happen...\n");
                fflush(stdout);
                exit(1);
            } else if (conn_i->type == PROVIDE_BUFFERS) {
                if (cqe->res < 0) {
                    printf("cqe->res = %d\n", cqe->res);
                    exit(1);
                }
            } else if (conn_i->type == ACCEPT) {
                int sock_conn_fd = cqe->res;
                if (sock_conn_fd >= 0) {
                    /* @Temporary: single output file per connection. Close any
                     * previous file (was leaked before) and restart the offset
                     * so a new connection writes from the beginning. */
                    if (outfd >= 0)
                        close(outfd);
                    file_offset = 0;
                    outfd = open("/brick_storage/test_io_file.out", O_WRONLY | O_CREAT | O_TRUNC, 0644);
                    if (outfd < 0) {
                        perror("open outfile");
                        exit(1);
                    }
                    make_socket_read_sqe_and_submit(&stream_server->ring, sock_conn_fd, socket_buffers_group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
                }
                /* Re-arm the accept so further connections are noticed. */
                make_accept_sqe_and_submit(&stream_server->ring, stream_server->socket_listen_fd, (struct sockaddr *)&stream_server->next_client_address, &stream_server->next_client_address_size, 0);
            } else if (conn_i->type == READ) {
                int bytes_read = cqe->res;
                if (bytes_read <= 0) {
                    /* Peer closed the connection (0) or recv failed (<0). */
                    shutdown(conn_i->fd, SHUT_RDWR);
                } else {
                    /* Kernel-chosen buffer id lives in the high 16 bits of flags. */
                    int bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
                    file_index += 1;
                    /* BUG FIX: write at the CURRENT offset, then advance it.
                     * The old code advanced file_offset first, so the first
                     * chunk landed at offset bytes_read, leaving a hole at 0
                     * and shifting every chunk by one slot. */
                    make_file_write_sqe_and_submit(&stream_server->ring, conn_i->fd, bid, bytes_read, file_offset, 0);
                    file_offset += bytes_read;
                }
            } else if (conn_i->type == WRITE) {
                /* File write completed. Surface errors instead of silently
                 * dropping data. NOTE(review): short writes (0 < res <
                 * requested) are still not resubmitted — TODO if they occur. */
                if (cqe->res < 0) {
                    fprintf(stderr, "file write failed: %s\n", strerror(-cqe->res));
                    exit(1);
                }
                /* Return the buffer to the pool, then re-arm the recv.
                 * @Speed: Too late? What's the optimal way to keep receiving socket data as fast as possible? */
                make_provide_buffers_sqe_and_submit(&stream_server->ring, conn_i->bid, socket_buffers_group_id);
                make_socket_read_sqe_and_submit(&stream_server->ring, conn_i->fd, socket_buffers_group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
            }
        }
        io_uring_cq_advance(&stream_server->ring, cqe_count);
        stream_server->total_cqe_count += cqe_count;
    }
}
/*
 * Set up the listening socket and the io_uring instance, register the
 * provided-buffer pool, queue the first accept, and enter the event loop.
 * Returns 0 on (theoretical) clean shutdown; exits on any setup failure.
 */
int stream_server_proc(struct Stream_Server *_stream_server) {
    stream_server = _stream_server;
    stream_server->next_client_address_size = sizeof(stream_server->next_client_address);

    struct sockaddr_in serv_addr = { 0 };
    stream_server->socket_listen_fd = socket(AF_INET, SOCK_STREAM /* | SOCK_NONBLOCK */, 0);
    if (stream_server->socket_listen_fd < 0) {
        /* Was unchecked: bind() on -1 would mask the real error. */
        perror("Error creating socket...\n");
        exit(1);
    }
    const int val = 1;
    if (setsockopt(stream_server->socket_listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
        perror("Error setting SO_REUSEADDR...\n");
        exit(1);
    }
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(stream_server->port);
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(stream_server->socket_listen_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("Error binding socket...\n");
        exit(1);
    }
    if (listen(stream_server->socket_listen_fd, BACKLOG) < 0) {
        perror("Error listening on socket...\n");
        exit(1);
    }
    printf("listening for connections on port: %d\n", stream_server->port);

    memset(&stream_server->ring_params, 0, sizeof(stream_server->ring_params));
    if (io_uring_queue_init_params(2048, &stream_server->ring, &stream_server->ring_params) < 0) {
        perror("io_uring_init_failed...\n");
        exit(1);
    }

    /* Buffer selection (IORING_OP_PROVIDE_BUFFERS) needs kernel >= 5.7. */
    struct io_uring_probe *probe;
    probe = io_uring_get_probe_ring(&stream_server->ring);
    if (!probe || !io_uring_opcode_supported(probe, IORING_OP_PROVIDE_BUFFERS)) {
        printf("Buffer select not supported, skipping...\n");
        exit(0);
    }
    free(probe);

    /* Register the whole buffer pool with the kernel once, synchronously. */
    {
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;
        sqe = io_uring_get_sqe(&stream_server->ring);
        if (!sqe) {
            /* Ring was just created with 2048 entries; NULL here is fatal. */
            fprintf(stderr, "io_uring_get_sqe failed during buffer registration\n");
            exit(1);
        }
        io_uring_prep_provide_buffers(sqe, bufs, MAX_MESSAGE_LEN, BUFFERS_COUNT, socket_buffers_group_id, 0);
        io_uring_submit(&stream_server->ring);
        io_uring_wait_cqe(&stream_server->ring, &cqe);
        if (cqe->res < 0) {
            printf("cqe->res = %d\n", cqe->res);
            exit(1);
        }
        io_uring_cqe_seen(&stream_server->ring, cqe);
    }

    /* Queue the first accept SQE, then hand control to the event loop. */
    make_accept_sqe_and_submit(&stream_server->ring, stream_server->socket_listen_fd, (struct sockaddr *)&stream_server->next_client_address, &stream_server->next_client_address_size, 0);
    main_loop_process_cqes();

    /* Not reached (the loop never returns), but the function is declared int
     * — falling off the end was UB if the caller used the return value. */
    return 0;
}
Grüße / Regards
Jens Frederich
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2020-10-27 11:08 UTC | newest]
Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2020-10-27 11:01 Question about the optimal receiving TCP streams via io_uring Frederich, Jens
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox