* [PATCH liburing 0/2] add zcrx example
@ 2025-04-09 17:58 Pavel Begunkov
2025-04-09 17:58 ` [PATCH liburing 1/2] examples/send-zerocopy: add data verification Pavel Begunkov
2025-04-09 17:58 ` [PATCH liburing 2/2] examples: add a zcrx example Pavel Begunkov
0 siblings, 2 replies; 4+ messages in thread
From: Pavel Begunkov @ 2025-04-09 17:58 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, David Wei
We need a simple example for zcrx to showcase how the API works
and how to use its features. Take the selftest and strip the tx part from it.
Anything supporting TCP can play the role of the sender, but for
data verification (-v) it needs to follow the expected data pattern.
Implement that pattern in send-zerocopy (to be used with -z0).
Pavel Begunkov (2):
examples/send-zerocopy: add data verification
examples: add a zcrx example
examples/Makefile | 1 +
examples/send-zerocopy.c | 50 +++++-
examples/zcrx.c | 362 +++++++++++++++++++++++++++++++++++++++
3 files changed, 410 insertions(+), 3 deletions(-)
create mode 100644 examples/zcrx.c
--
2.48.1
^ permalink raw reply [flat|nested] 4+ messages in thread
* [PATCH liburing 1/2] examples/send-zerocopy: add data verification
2025-04-09 17:58 [PATCH liburing 0/2] add zcrx example Pavel Begunkov
@ 2025-04-09 17:58 ` Pavel Begunkov
2025-04-09 17:58 ` [PATCH liburing 2/2] examples: add a zcrx example Pavel Begunkov
1 sibling, 0 replies; 4+ messages in thread
From: Pavel Begunkov @ 2025-04-09 17:58 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, David Wei
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
examples/send-zerocopy.c | 50 +++++++++++++++++++++++++++++++++++++---
1 file changed, 47 insertions(+), 3 deletions(-)
diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
index a50896c6..b0f2a76a 100644
--- a/examples/send-zerocopy.c
+++ b/examples/send-zerocopy.c
@@ -58,6 +58,13 @@ struct thread_data {
int fd;
};
+/* Payload verification modes selected with the -v option. */
+enum {
+	/* received data is not inspected */
+	VERIFY_DISABLED	= 0,
+	/* data must follow the repeating 'a'..'z' byte sequence */
+	VERIFY_SEQ	= 1,
+
+	/* number of modes; -v values at or above this are rejected */
+	__MAX_VERIFY	= 2,
+};
+
static bool cfg_reg_ringfd = true;
static bool cfg_fixed_files = 1;
static bool cfg_zc = 1;
@@ -74,8 +81,8 @@ static int cfg_type = 0;
static int cfg_payload_len;
static int cfg_port = 8000;
static int cfg_runtime_ms = 4200;
+static int cfg_verify = 0;
static bool cfg_rx_poll = false;
-
static socklen_t cfg_alen;
static char *str_addr = NULL;
@@ -198,13 +205,30 @@ static int do_poll(int fd, int events)
return ret && (pfd.revents & events);
}
+/*
+ * Check that the @size bytes at @buffer continue the repeating 'a'..'z'
+ * sequence, with buffer[0] expected at stream offset td->bytes.
+ * Calls t_error() with status 1 on the first byte that does not match.
+ */
+static void verify_buffer(struct thread_data *td, char *buffer, size_t size)
+{
+	size_t i;
+
+	for (i = 0; i < size; i++) {
+		/* check the data handed in, not the global tx payload */
+		char d = buffer[i];
+		char e = (td->bytes + i) % 26 + 'a';
+
+		if (e != d)
+			t_error(1, -EINVAL, "data mismatch");
+	}
+}
+
/* Flush all outstanding bytes for the tcp receive queue */
static int do_flush_tcp(struct thread_data *td, int fd)
{
int ret;
+ if (cfg_verify == VERIFY_SEQ)
+ ret = recv(fd, payload_buf, sizeof(payload_buf), 0);
+ else
+ ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
+
/* MSG_TRUNC flushes up to len bytes */
- ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
if (ret == -1 && errno == EAGAIN)
return 0;
if (ret == -1)
@@ -212,6 +236,9 @@ static int do_flush_tcp(struct thread_data *td, int fd)
if (!ret)
return 1;
+ if (cfg_verify == VERIFY_SEQ)
+ verify_buffer(td, payload_buf, ret);
+
td->packets++;
td->bytes += ret;
return 0;
@@ -391,6 +418,11 @@ static void do_tx(struct thread_data *td, int domain, int type, int protocol)
unsigned buf_idx = 0;
unsigned msg_flags = MSG_WAITALL;
+ if (cfg_verify) {
+ for (i = 0; i < cfg_payload_len; i++)
+ payload[i] = 'a' + ((i + td->bytes) % 26);
+ }
+
for (i = 0; i < cfg_nr_reqs; i++) {
sqe = io_uring_get_sqe(&ring);
@@ -515,7 +547,7 @@ static void parse_opts(int argc, char **argv)
cfg_payload_len = max_payload_len;
- while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:Ry")) != -1) {
+ while ((c = getopt(argc, argv, "46v:D:p:s:t:n:z:b:l:dC:T:Ry")) != -1) {
switch (c) {
case '4':
if (cfg_family != PF_UNSPEC)
@@ -570,9 +602,21 @@ static void parse_opts(int argc, char **argv)
case 'y':
cfg_rx_poll = 1;
break;
+ case 'v':
+ cfg_verify = strtoul(optarg, NULL, 0);
+ break;
}
}
+ if (cfg_verify >= __MAX_VERIFY)
+ t_error(1, 0, "unsupported data verification type");
+ if (cfg_verify) {
+ if (!cfg_rx && cfg_zc)
+ t_error(1, 0, "verification doesn't work with sendzc");
+ if (cfg_verify == VERIFY_SEQ && cfg_nr_reqs > 1 && !cfg_zc)
+ t_error(1, 0, "sequence verification invalid inflight number");
+ }
+
if (cfg_nr_reqs > MAX_SUBMIT_NR)
t_error(1, 0, "-n: submit batch nr exceeds max (%d)", MAX_SUBMIT_NR);
if (cfg_payload_len > max_payload_len)
--
2.48.1
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH liburing 2/2] examples: add a zcrx example
2025-04-09 17:58 [PATCH liburing 0/2] add zcrx example Pavel Begunkov
2025-04-09 17:58 ` [PATCH liburing 1/2] examples/send-zerocopy: add data verification Pavel Begunkov
@ 2025-04-09 17:58 ` Pavel Begunkov
2025-04-09 23:00 ` Pavel Begunkov
1 sibling, 1 reply; 4+ messages in thread
From: Pavel Begunkov @ 2025-04-09 17:58 UTC (permalink / raw)
To: io-uring; +Cc: asml.silence, David Wei
Copy-pasted from selftests/.../iou_zcrx.c by David.
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
examples/Makefile | 1 +
examples/zcrx.c | 362 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 363 insertions(+)
create mode 100644 examples/zcrx.c
diff --git a/examples/Makefile b/examples/Makefile
index 7b740ca9..47bdfbf3 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -32,6 +32,7 @@ example_srcs := \
send-zerocopy.c \
rsrc-update-bench.c \
proxy.c \
+ zcrx.c \
kdigest.c
all_targets :=
diff --git a/examples/zcrx.c b/examples/zcrx.c
new file mode 100644
index 00000000..2ac8af52
--- /dev/null
+++ b/examples/zcrx.c
@@ -0,0 +1,362 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <assert.h>
+#include <errno.h>
+#include <error.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdarg.h>
+
+#include <arpa/inet.h>
+#include <linux/errqueue.h>
+#include <linux/if_packet.h>
+#include <linux/ipv6.h>
+#include <linux/socket.h>
+#include <linux/sockios.h>
+#include <net/ethernet.h>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <sys/epoll.h>
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/resource.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <sys/wait.h>
+
+#include "liburing.h"
+
+/*
+ * Print a printf-style message to stderr, append ": <strerror text>" when
+ * @errnum is non-zero, finish the line and terminate with exit(@status).
+ *
+ * @errnum may be a positive errno value or a negative one (callers pass
+ * liburing return codes straight through); the sign is ignored.
+ */
+static void t_error(int status, int errnum, const char *format, ...)
+{
+	va_list args;
+
+	va_start(args, format);
+	vfprintf(stderr, format, args);
+	va_end(args);
+
+	/* strerror() only understands positive errno values */
+	if (errnum < 0)
+		errnum = -errnum;
+	if (errnum)
+		fprintf(stderr, ": %s", strerror(errnum));
+
+	fprintf(stderr, "\n");
+	exit(status);
+}
+
+/* Fixed sizes used by the example; the page size is hard-coded. */
+#define PAGE_SIZE	(4096)
+#define AREA_SIZE	(8192 * PAGE_SIZE)
+#define SEND_SIZE	(512 * 4096)
+
+/* Minimum of two values; each argument is evaluated exactly once. */
+#define min(a, b)					\
+	({						\
+		typeof(a) _lhs = (a);			\
+		typeof(b) _rhs = (b);			\
+		_lhs < _rhs ? _lhs : _rhs;		\
+	})
+
+/* min() after converting both arguments to type @t. */
+#define min_t(t, a, b)					\
+	({						\
+		t _lhs_t = (a);				\
+		t _rhs_t = (b);				\
+		min(_lhs_t, _rhs_t);			\
+	})
+
+/* Round @v up to a multiple of @align; @align must be a power of two. */
+#define ALIGN_UP(v, align) (((v) + (align) - 1) & ~((align) - 1))
+
+/* Command-line configuration, filled in by parse_opts(). */
+static int cfg_port = 8000;
+static const char *cfg_ifname;
+static int cfg_queue_id = -1;
+static bool cfg_oneshot;
+static int cfg_oneshot_recvs;
+static bool cfg_verify_data;
+static struct sockaddr_in6 cfg_addr;
+
+/* Zero-copy receive state, initialised by setup_zcrx(). */
+static void *area_ptr;
+static void *ring_ptr;
+static size_t ring_size;
+static struct io_uring_zcrx_rq rq_ring;
+static unsigned long area_token;
+
+/* Connection state: accepted socket (0 = none yet), exit flag, byte count. */
+static int connfd;
+static bool stop;
+static size_t received;
+
+/*
+ * Return the number of bytes to map for a refill ring with @rq_entries
+ * entries: the entry array plus one page for the ring header, rounded up
+ * to a whole number of pages.
+ */
+static inline size_t get_refill_ring_size(unsigned int rq_entries)
+{
+	/* work on a local so the global ring_size is only set by the caller */
+	size_t size = rq_entries * sizeof(struct io_uring_zcrx_rqe);
+
+	/* add space for the header (head/tail/etc.) */
+	size += PAGE_SIZE;
+	return ALIGN_UP(size, PAGE_SIZE);
+}
+
+/*
+ * Set up zero-copy receive on @ring: map the buffer area and the refill
+ * ring, register both for interface cfg_ifname / rx queue cfg_queue_id
+ * with io_uring_register_ifq(), then record the refill ring pointers in
+ * rq_ring and the area token in area_token.
+ *
+ * Any failure terminates the program through t_error().
+ */
+static void setup_zcrx(struct io_uring *ring)
+{
+	unsigned int ifindex;
+	unsigned int rq_entries = 4096;
+	int ret;
+
+	ifindex = if_nametoindex(cfg_ifname);
+	if (!ifindex)
+		t_error(1, 0, "bad interface name: %s", cfg_ifname);
+
+	/* anonymous mappings should pass fd == -1 for portability */
+	area_ptr = mmap(NULL,
+			AREA_SIZE,
+			PROT_READ | PROT_WRITE,
+			MAP_ANONYMOUS | MAP_PRIVATE,
+			-1,
+			0);
+	if (area_ptr == MAP_FAILED)
+		t_error(1, errno, "mmap(): zero copy area");
+
+	ring_size = get_refill_ring_size(rq_entries);
+	ring_ptr = mmap(NULL,
+			ring_size,
+			PROT_READ | PROT_WRITE,
+			MAP_ANONYMOUS | MAP_PRIVATE,
+			-1,
+			0);
+	if (ring_ptr == MAP_FAILED)
+		t_error(1, errno, "mmap(): refill ring");
+
+	struct io_uring_region_desc region_reg = {
+		.size = ring_size,
+		.user_addr = (__u64)(unsigned long)ring_ptr,
+		.flags = IORING_MEM_REGION_TYPE_USER,
+	};
+
+	struct io_uring_zcrx_area_reg area_reg = {
+		.addr = (__u64)(unsigned long)area_ptr,
+		.len = AREA_SIZE,
+		.flags = 0,
+	};
+
+	struct io_uring_zcrx_ifq_reg reg = {
+		.if_idx = ifindex,
+		.if_rxq = cfg_queue_id,
+		.rq_entries = rq_entries,
+		.area_ptr = (__u64)(unsigned long)&area_reg,
+		.region_ptr = (__u64)(unsigned long)&region_reg,
+	};
+
+	ret = io_uring_register_ifq(ring, &reg);
+	if (ret)
+		t_error(1, 0, "io_uring_register_ifq(): %d", ret);
+
+	/* ring head/tail/entries live at the offsets reported in reg.offsets */
+	rq_ring.khead = (unsigned int *)((char *)ring_ptr + reg.offsets.head);
+	rq_ring.ktail = (unsigned int *)((char *)ring_ptr + reg.offsets.tail);
+	rq_ring.rqes = (struct io_uring_zcrx_rqe *)((char *)ring_ptr + reg.offsets.rqes);
+	rq_ring.rq_tail = 0;
+	rq_ring.ring_entries = reg.rq_entries;
+
+	/* the token is OR-ed into every offset handed back via the refill ring */
+	area_token = area_reg.rq_area_token;
+}
+
+/* Queue an accept request on listening socket @sockfd, tagged user_data 1. */
+static void add_accept(struct io_uring *ring, int sockfd)
+{
+	struct io_uring_sqe *sqe;
+
+	sqe = io_uring_get_sqe(ring);
+	/* io_uring_get_sqe() returns NULL when the submission queue is full */
+	if (!sqe)
+		t_error(1, 0, "accept: no free sqe");
+
+	io_uring_prep_accept(sqe, sockfd, NULL, NULL, 0);
+	sqe->user_data = 1;
+}
+
+/*
+ * Queue a multishot zero-copy receive on @sockfd with request length 0,
+ * tagged user_data 2.
+ */
+static void add_recvzc(struct io_uring *ring, int sockfd)
+{
+	struct io_uring_sqe *sqe;
+
+	sqe = io_uring_get_sqe(ring);
+	/* io_uring_get_sqe() returns NULL when the submission queue is full */
+	if (!sqe)
+		t_error(1, 0, "recvzc: no free sqe");
+
+	io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, sockfd, NULL, 0, 0);
+	sqe->ioprio |= IORING_RECV_MULTISHOT;
+	sqe->user_data = 2;
+}
+
+/*
+ * Queue a zero-copy receive on @sockfd with request length @len, tagged
+ * user_data 2. The multishot flag is set here as well, same as in
+ * add_recvzc(); only the length differs.
+ */
+static void add_recvzc_oneshot(struct io_uring *ring, int sockfd, size_t len)
+{
+	struct io_uring_sqe *sqe;
+
+	sqe = io_uring_get_sqe(ring);
+	/* io_uring_get_sqe() returns NULL when the submission queue is full */
+	if (!sqe)
+		t_error(1, 0, "recvzc: no free sqe");
+
+	io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, sockfd, NULL, len, 0);
+	sqe->ioprio |= IORING_RECV_MULTISHOT;
+	sqe->user_data = 2;
+}
+
+/*
+ * Handle the completion of the accept request: remember the new socket in
+ * connfd and queue the first zero-copy receive on it (one-shot sized when
+ * cfg_oneshot is set, plain multishot otherwise).
+ *
+ * Only one connection is served; a second accept completion is fatal.
+ */
+static void process_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
+{
+	/* a negative res carries -errno, report it instead of dropping it */
+	if (cqe->res < 0)
+		t_error(1, -cqe->res, "accept()");
+	/* connfd == 0 means no connection has been accepted yet */
+	if (connfd)
+		t_error(1, 0, "Unexpected second connection");
+
+	connfd = cqe->res;
+	if (cfg_oneshot)
+		add_recvzc_oneshot(ring, connfd, PAGE_SIZE);
+	else
+		add_recvzc(ring, connfd);
+}
+
+/*
+ * When -v was given, check that the @size bytes at @data continue the
+ * repeating 'a'..'z' sequence, with data[0] expected at stream offset @seq.
+ * Terminates through t_error() on the first mismatch; does nothing when
+ * verification is disabled.
+ */
+static void verify_data(char *data, size_t size, unsigned long seq)
+{
+	size_t i;
+
+	if (!cfg_verify_data)
+		return;
+
+	for (i = 0; i < size; i++) {
+		/* the caller passes the running byte count as @seq */
+		char expected = 'a' + (seq + i) % 26;
+
+		if (data[i] != expected)
+			t_error(1, 0, "payload mismatch at %zu", i);
+	}
+}
+
+/*
+ * Handle one zero-copy receive completion.
+ *
+ * A completion with res == 0 and no flags stops the server once no
+ * one-shot receives remain; a negative res is fatal. Otherwise the request
+ * is re-armed where needed, the payload is verified and accounted in
+ * `received`, and the buffer is returned through the refill ring.
+ */
+static void process_recvzc(struct io_uring *ring, struct io_uring_cqe *cqe)
+{
+	const int res = cqe->res;
+	const bool empty = res == 0 && cqe->flags == 0;
+	struct io_uring_zcrx_cqe *rcqe;
+	struct io_uring_zcrx_rqe *rqe;
+	uint64_t off_mask;
+	unsigned slot_mask;
+	char *payload;
+
+	if (empty && cfg_oneshot_recvs == 0) {
+		stop = true;
+		return;
+	}
+
+	if (res < 0)
+		t_error(1, 0, "recvzc(): %d", res);
+
+	if (!cfg_oneshot) {
+		/* multishot request ended, queue a new one */
+		if (!(cqe->flags & IORING_CQE_F_MORE))
+			add_recvzc(ring, connfd);
+	} else if (empty) {
+		/* cfg_oneshot_recvs is non-zero here, see the early return */
+		add_recvzc_oneshot(ring, connfd, PAGE_SIZE);
+		cfg_oneshot_recvs--;
+	}
+
+	/* the zcrx part is read from the slot right after the base CQE */
+	rcqe = (struct io_uring_zcrx_cqe *)(cqe + 1);
+	off_mask = (1ULL << IORING_ZCRX_AREA_SHIFT) - 1;
+	payload = (char *)area_ptr + (rcqe->off & off_mask);
+
+	verify_data(payload, res, received);
+	received += res;
+
+	/* processed, return back to the kernel */
+	slot_mask = rq_ring.ring_entries - 1;
+	rqe = &rq_ring.rqes[rq_ring.rq_tail & slot_mask];
+	rqe->off = (rcqe->off & ~IORING_ZCRX_AREA_MASK) | area_token;
+	rqe->len = res;
+	rq_ring.rq_tail++;
+	io_uring_smp_store_release(rq_ring.ktail, rq_ring.rq_tail);
+}
+
+/*
+ * One iteration of the event loop: submit pending requests, wait for at
+ * least one completion, dispatch every available CQE by its user_data tag
+ * (1 = accept, 2 = zero-copy receive) and mark them all as consumed.
+ */
+static void server_loop(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	unsigned int head;
+	unsigned int seen = 0;
+
+	io_uring_submit_and_wait(ring, 1);
+
+	io_uring_for_each_cqe(ring, head, cqe) {
+		switch (cqe->user_data) {
+		case 1:
+			process_accept(ring, cqe);
+			break;
+		case 2:
+			process_recvzc(ring, cqe);
+			break;
+		default:
+			t_error(1, 0, "unknown cqe");
+			break;
+		}
+		seen++;
+	}
+	io_uring_cq_advance(ring, seen);
+}
+
+/*
+ * Create a listening IPv6 TCP socket bound to cfg_addr, set up the ring
+ * and zero-copy receive, queue the accept and run server_loop() until the
+ * `stop` flag is set. Setup failures terminate through t_error().
+ */
+static void run_server(void)
+{
+	unsigned int flags = 0;
+	struct io_uring ring;
+	int fd, enable, ret;
+
+	fd = socket(AF_INET6, SOCK_STREAM, 0);
+	if (fd == -1)
+		t_error(1, errno, "socket()");
+
+	enable = 1;
+	ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
+	if (ret < 0)
+		t_error(1, errno, "setsockopt(SO_REUSEADDR)");
+
+	ret = bind(fd, (struct sockaddr *)&cfg_addr, sizeof(cfg_addr));
+	if (ret < 0)
+		t_error(1, errno, "bind()");
+
+	if (listen(fd, 1024) < 0)
+		t_error(1, errno, "listen()");
+
+	flags |= IORING_SETUP_COOP_TASKRUN;
+	flags |= IORING_SETUP_SINGLE_ISSUER;
+	flags |= IORING_SETUP_DEFER_TASKRUN;
+	flags |= IORING_SETUP_SUBMIT_ALL;
+	/* process_recvzc() reads the zcrx data from the slot at cqe + 1 */
+	flags |= IORING_SETUP_CQE32;
+
+	ret = io_uring_queue_init(512, &ring, flags);
+	/* liburing returns -errno, t_error() is handed the positive value */
+	if (ret)
+		t_error(1, -ret, "ring init failed");
+
+	setup_zcrx(&ring);
+	add_accept(&ring, fd);
+
+	while (!stop)
+		server_loop(&ring);
+
+	/* release the ring and the sockets instead of relying on exit */
+	io_uring_queue_exit(&ring);
+	if (connfd > 0)
+		close(connfd);
+	close(fd);
+}
+
+/*
+ * Print the command-line synopsis for program @filepath and exit with
+ * status 1. The options listed match the getopt() string in parse_opts().
+ */
+static void usage(const char *filepath)
+{
+	t_error(1, 0, "Usage: %s -p<port> -i<ifname> -q<rxq_id> [-o<nr_recvs>] [-v]",
+		filepath);
+}
+
+/*
+ * Parse the command line into the cfg_* globals and fill cfg_addr with the
+ * IPv6 wildcard address on cfg_port.
+ *
+ *   -p <port>    port to listen on
+ *   -i <ifname>  interface name, required
+ *   -q <rxq_id>  rx queue id
+ *   -o <nr>      enable one-shot mode with <nr> extra receives
+ *   -v           verify the received payload
+ *
+ * Prints usage and exits when no arguments or an unknown option is given.
+ */
+static void parse_opts(int argc, char **argv)
+{
+	struct sockaddr_in6 *addr6 = &cfg_addr;
+	int c;
+
+	if (argc <= 1)
+		usage(argv[0]);
+
+	while ((c = getopt(argc, argv, "vp:i:q:o:")) != -1) {
+		switch (c) {
+		case 'p':
+			cfg_port = strtoul(optarg, NULL, 0);
+			break;
+		case 'i':
+			cfg_ifname = optarg;
+			break;
+		case 'o':
+			cfg_oneshot = true;
+			cfg_oneshot_recvs = strtoul(optarg, NULL, 0);
+			break;
+		case 'q':
+			cfg_queue_id = strtoul(optarg, NULL, 0);
+			break;
+		case 'v':
+			cfg_verify_data = true;
+			break;
+		default:
+			/* unknown option or missing option argument */
+			usage(argv[0]);
+			break;
+		}
+	}
+
+	/* the name is later passed to if_nametoindex(), which needs a string */
+	if (!cfg_ifname)
+		t_error(1, 0, "missing interface name (-i)");
+
+	memset(addr6, 0, sizeof(*addr6));
+	addr6->sin6_family = AF_INET6;
+	addr6->sin6_port = htons(cfg_port);
+	addr6->sin6_addr = in6addr_any;
+}
+
+/* Entry point: parse the command line, then run the receive server. */
+int main(int argc, char **argv)
+{
+	parse_opts(argc, argv);
+	run_server();
+
+	return EXIT_SUCCESS;
+}
--
2.48.1
^ permalink raw reply related [flat|nested] 4+ messages in thread
* Re: [PATCH liburing 2/2] examples: add a zcrx example
2025-04-09 17:58 ` [PATCH liburing 2/2] examples: add a zcrx example Pavel Begunkov
@ 2025-04-09 23:00 ` Pavel Begunkov
0 siblings, 0 replies; 4+ messages in thread
From: Pavel Begunkov @ 2025-04-09 23:00 UTC (permalink / raw)
To: io-uring; +Cc: David Wei
On 4/9/25 18:58, Pavel Begunkov wrote:
...
> +static void parse_opts(int argc, char **argv)
> +{
> + struct sockaddr_in6 *addr6 = (void *) &cfg_addr;
> + int c;
> +
> + if (argc <= 1)
> + usage(argv[0]);
> +
> + while ((c = getopt(argc, argv, "vp:i:q:o:")) != -1) {
> + switch (c) {
> + case 'p':
> + cfg_port = strtoul(optarg, NULL, 0);
> + break;
> + case 'i':
> + cfg_ifname = optarg;
> + break;
> + case 'o': {
> + cfg_oneshot = true;
> + cfg_oneshot_recvs = strtoul(optarg, NULL, 0);
> + break;
> + }
> + case 'q':
> + cfg_queue_id = strtoul(optarg, NULL, 0);
> + break;
> + }
> + case 'v':
> + cfg_verify_data = true;
> + break;
Something went wrong here, I'll need to resend.
--
Pavel Begunkov
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2025-04-09 22:59 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2025-04-09 17:58 [PATCH liburing 0/2] add zcrx example Pavel Begunkov
2025-04-09 17:58 ` [PATCH liburing 1/2] examples/send-zerocopy: add data verification Pavel Begunkov
2025-04-09 17:58 ` [PATCH liburing 2/2] examples: add a zcrx example Pavel Begunkov
2025-04-09 23:00 ` Pavel Begunkov
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox