public inbox for [email protected]
 help / color / mirror / Atom feed
* [RFC] liburing: add an example for a TCP/UDP ZC RX server.
@ 2022-10-07 21:19 Jonathan Lemon
  0 siblings, 0 replies; only message in thread
From: Jonathan Lemon @ 2022-10-07 21:19 UTC (permalink / raw)
  To: io-uring

This is a WIP sample application that demonstrates the use of 
the API in the pevious RFC series.  This is for demonstration
purposes only!

copy io_uring-udp.c and make it handle TCP.

Wire up a simple example of the recvzc opcode.

Signed-off-by: Jonathan Lemon <[email protected]>
---
 examples/Makefile       |   3 +-
 examples/io_uring-net.c | 835 ++++++++++++++++++++++++++++++++++++++++
 src/include/liburing.h  |   3 +
 src/register.c          |  29 ++
 4 files changed, 869 insertions(+), 1 deletion(-)
 create mode 100644 examples/io_uring-net.c

diff --git a/examples/Makefile b/examples/Makefile
index e561e05..c279acf 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -16,7 +16,8 @@ example_srcs := \
 	io_uring-udp.c \
 	link-cp.c \
 	poll-bench.c \
-	send-zerocopy.c
+	send-zerocopy.c \
+	io_uring-net.c
 
 all_targets :=
 
diff --git a/examples/io_uring-net.c b/examples/io_uring-net.c
new file mode 100644
index 0000000..c569437
--- /dev/null
+++ b/examples/io_uring-net.c
@@ -0,0 +1,835 @@
+/* SPDX-License-Identifier: MIT */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/mman.h>
+#include <stdlib.h>
+#include <string.h>
+#include <netinet/udp.h>
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <error.h>
+
+#include "liburing.h"
+
+#if 0
+/* XXX temp development hack */
+struct io_uring_zctap_iov {
+	__u32	off;
+	__u32	len;
+	__u16	bgid;
+	__u16	bid;
+	__u16	ifq_id;
+	__u16	resv;
+};
+
+#define IORING_OP_PROVIDE_IFQ_REGION	(IORING_OP_URING_CMD + 2)
+#define IORING_OP_RECV_ZC		(IORING_OP_URING_CMD + 3)
+#endif
+
+#define FRAME_REGION_SIZE	(8192 * 4096)
+#define FILL_QUEUE_ENTRIES	4096
+#define COPY_QUEUE_ENTRIES	256
+#define COPY_BUF_SIZE		4096
+
+#define QD 64
+#define BUF_SHIFT 12 /* 4k */
+#define CQES (QD * 16)
+#define BUFFERS CQES
+#define CONTROLLEN 0
+
+struct sendmsg_ctx {
+	struct msghdr msg;
+	struct iovec iov;
+};
+
+struct ctx {
+	struct io_uring ring;
+	struct io_uring_buf_ring *buf_ring;
+	struct io_uring_buf_ring *fillq;
+	struct io_uring_buf_ring *copyq;
+	unsigned char *buffer_base;
+	unsigned char *copy_base;
+	unsigned char *frame_base;
+	struct msghdr msg;
+	int buf_shift;
+	int af;
+	int queue_id;
+	int ifq_id;
+	bool verbose;
+	bool udp;
+	char *ifname;
+	struct sendmsg_ctx send[BUFFERS];
+	size_t buf_ring_size;
+	size_t fillq_size;
+	size_t copyq_size;
+};
+
+static size_t buffer_size(struct ctx *ctx)
+{
+	return 1U << ctx->buf_shift;
+}
+
+static unsigned char *get_buffer(struct ctx *ctx, int idx)
+{
+	return ctx->buffer_base + (idx << ctx->buf_shift);
+}
+
+/* buffer pool for metadata, etc.  BGID 0. */
+static int setup_buffer_pool(struct ctx *ctx)
+{
+	int ret, i;
+	void *mapped;
+	struct io_uring_buf_reg reg;
+
+	/* maps:
+	 *	BUFFER x (struct io_ring_buf)
+	 *	BUFFER x (buffer_size) (4K, by default)
+	 * buffer_ring is first part.
+	 * buffer_base is second part.
+	 *
+	 * register_buf_ring() registers the ring
+	 *
+	 * buffers are then provided:
+	 *   io_uring_buf_ring_add(ring, addr, len, buf_id)
+	 * io_uring_buf_ring_advance(ctx->buf_ring, BUFFERS);
+	 */
+
+	ctx->buf_ring_size = (sizeof(struct io_uring_buf) + buffer_size(ctx)) * BUFFERS;
+	mapped = mmap(NULL, ctx->buf_ring_size, PROT_READ | PROT_WRITE,
+		      MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
+	if (mapped == MAP_FAILED) {
+		fprintf(stderr, "buf_ring mmap: %s\n", strerror(errno));
+		return -1;
+	}
+	ctx->buf_ring = (struct io_uring_buf_ring *)mapped;
+
+	io_uring_buf_ring_init(ctx->buf_ring);
+
+	reg = (struct io_uring_buf_reg) {
+		.ring_addr = (unsigned long)ctx->buf_ring,
+		.ring_entries = BUFFERS,
+		.bgid = 0
+	};
+	ctx->buffer_base = (unsigned char *)ctx->buf_ring +
+			   sizeof(struct io_uring_buf) * BUFFERS;
+	printf("metadata base region: %p, group %d\n", ctx->buffer_base, 0);
+
+	ret = io_uring_register_buf_ring(&ctx->ring, &reg, 0);
+	if (ret) {
+		fprintf(stderr, "buf_ring init failed: %s\n"
+				"NB This requires a kernel version >= 6.0\n",
+				strerror(-ret));
+		return ret;
+	}
+
+	for (i = 0; i < BUFFERS; i++) {
+		io_uring_buf_ring_add(ctx->buf_ring, get_buffer(ctx, i), buffer_size(ctx), i,
+				      io_uring_buf_ring_mask(BUFFERS), i);
+	}
+	io_uring_buf_ring_advance(ctx->buf_ring, BUFFERS);
+
+	return 0;
+}
+
+/* fill queue used for returning packet store buffers. BGID 1 */
+static int setup_fill_queue(struct ctx *ctx)
+{
+	struct io_uring_buf_reg reg;
+	void *area;
+	int ret;
+
+	ctx->fillq_size = sizeof(struct io_uring_buf) * FILL_QUEUE_ENTRIES;
+	area = mmap(NULL, ctx->fillq_size, PROT_READ | PROT_WRITE,
+		    MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
+	if (area == MAP_FAILED)
+		error(1, errno, "fill queue mmap");
+
+	ctx->fillq = (struct io_uring_buf_ring *)area;
+
+	io_uring_buf_ring_init(ctx->fillq);
+
+	reg = (struct io_uring_buf_reg) {
+		.ring_addr = (unsigned long)ctx->fillq,
+		.ring_entries = FILL_QUEUE_ENTRIES,
+		.bgid = 1,
+	};
+
+	/* flags is unused */
+	ret = io_uring_register_buf_ring(&ctx->ring, &reg, 0);
+	if (ret) {
+		error(0, -ret, "fillq register failed");
+		fprintf(stderr, "NB This requires a kernel version >= 6.0\n");
+		exit(1);
+	}
+
+	return 0;
+}
+
+/* copy pool for system pages, BGID 2 */
+static int setup_copy_pool(struct ctx *ctx)
+{
+	struct io_uring_buf_reg reg;
+	void *area;
+	int i, ret;
+
+	ctx->copyq_size = (sizeof(struct io_uring_buf) + COPY_BUF_SIZE) *
+			  COPY_QUEUE_ENTRIES;
+	area = mmap(NULL, ctx->copyq_size, PROT_READ | PROT_WRITE,
+		    MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
+	if (area == MAP_FAILED)
+		error(1, errno, "coyp queue mmap");
+
+	ctx->copyq = (struct io_uring_buf_ring *)area;
+
+	io_uring_buf_ring_init(ctx->copyq);
+
+	reg = (struct io_uring_buf_reg) {
+		.ring_addr = (unsigned long)ctx->copyq,
+		.ring_entries = COPY_QUEUE_ENTRIES,
+		.bgid = 2,
+	};
+
+	/* flags is unused */
+	ret = io_uring_register_buf_ring(&ctx->ring, &reg, 0);
+	if (ret)
+		error(1, -ret, "copyq ring register failed");
+
+	area += sizeof(struct io_uring_buf) * COPY_QUEUE_ENTRIES;
+	ctx->copy_base = area;
+	printf("copy base region: %p, group %d\n", area, 2);
+
+	for (i = 0; i < COPY_QUEUE_ENTRIES; i++) {
+		io_uring_buf_ring_add(ctx->copyq, area + i * COPY_BUF_SIZE,
+			COPY_BUF_SIZE, i,
+			io_uring_buf_ring_mask(COPY_QUEUE_ENTRIES), i);
+	}
+	io_uring_buf_ring_advance(ctx->copyq, COPY_QUEUE_ENTRIES);
+
+	return 0;
+}
+
+static int setup_context(struct ctx *ctx)
+{
+	struct io_uring_params params;
+	int ret;
+
+	memset(&params, 0, sizeof(params));
+	params.cq_entries = QD * 8;
+	params.flags = IORING_SETUP_SUBMIT_ALL | IORING_SETUP_COOP_TASKRUN |
+		       IORING_SETUP_CQSIZE;
+
+	ret = io_uring_queue_init_params(QD, &ctx->ring, &params);
+	if (ret < 0) {
+		fprintf(stderr, "queue_init failed: %s\n"
+				"NB: This requires a kernel version >= 6.0\n",
+				strerror(-ret));
+		return ret;
+	}
+
+	ret = setup_buffer_pool(ctx);
+	if (ret)
+		io_uring_queue_exit(&ctx->ring);
+
+	memset(&ctx->msg, 0, sizeof(ctx->msg));
+	ctx->msg.msg_namelen = sizeof(struct sockaddr_storage);
+	ctx->msg.msg_controllen = CONTROLLEN;
+	return ret;
+}
+
+static int setup_sock(struct ctx *ctx, int port)
+{
+	int ret;
+	int fd;
+	uint16_t nport = port <= 0 ? 0 : htons(port);
+	int one = 1;
+	int flags = 0; /* SOCK_NONBLOCK */
+
+	if (ctx->udp)
+		fd = socket(ctx->af, SOCK_DGRAM | flags, IPPROTO_UDP);
+	else
+		fd = socket(ctx->af, SOCK_STREAM | flags, IPPROTO_TCP);
+	if (fd < 0) {
+		fprintf(stderr, "sock_init: %s\n", strerror(errno));
+		return -1;
+	}
+
+	ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+	if (ret) {
+		fprintf(stderr, "setsockopt: %s\n", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	if (ctx->af == AF_INET6) {
+		struct sockaddr_in6 addr6 = {
+			.sin6_family = ctx->af,
+			.sin6_port = nport,
+			.sin6_addr = IN6ADDR_ANY_INIT
+		};
+
+		ret = bind(fd, (struct sockaddr *) &addr6, sizeof(addr6));
+	} else {
+		struct sockaddr_in addr = {
+			.sin_family = ctx->af,
+			.sin_port = nport,
+			.sin_addr = { INADDR_ANY }
+		};
+
+		ret = bind(fd, (struct sockaddr *) &addr, sizeof(addr));
+	}
+
+	if (ret) {
+		fprintf(stderr, "sock_bind: %s\n", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	if (port <= 0) {
+		int port;
+		struct sockaddr_storage s;
+		socklen_t sz = sizeof(s);
+
+		if (getsockname(fd, (struct sockaddr *)&s, &sz)) {
+			fprintf(stderr, "getsockname failed\n");
+			close(fd);
+			return -1;
+		}
+
+		port = ntohs(((struct sockaddr_in *)&s)->sin_port);
+		fprintf(stderr, "port bound to %d\n", port);
+	}
+
+	if (!ctx->udp) {
+		ret = listen(fd, 1);
+		if (ret) {
+			fprintf(stderr, "listen: %s\n", strerror(errno));
+			close(fd);
+			return -1;
+		}
+	}
+
+	return fd;
+}
+
+static void cleanup_context(struct ctx *ctx)
+{
+	munmap(ctx->buf_ring, ctx->buf_ring_size);
+	io_uring_queue_exit(&ctx->ring);
+}
+
+static bool get_sqe(struct ctx *ctx, struct io_uring_sqe **sqe)
+{
+	*sqe = io_uring_get_sqe(&ctx->ring);
+
+	if (!*sqe) {
+		io_uring_submit(&ctx->ring);
+		*sqe = io_uring_get_sqe(&ctx->ring);
+	}
+	if (!*sqe) {
+		fprintf(stderr, "cannot get sqe\n");
+		return true;
+	}
+	return false;
+}
+
+static int wait_accept(struct ctx *ctx, int fd, int *clientfd)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int ret;
+
+	if (get_sqe(ctx, &sqe))
+		return -1;
+
+	io_uring_prep_accept(sqe, fd, NULL, NULL, 0);
+	ret = io_uring_submit(&ctx->ring);
+	if (ret == -1) {
+		fprintf(stderr, "cannot submit accept\n");
+		return true;
+	}
+
+	ret = io_uring_wait_cqe(&ctx->ring, &cqe);
+	if (ret) {
+		fprintf(stderr, "accept wait_cqe\n");
+		return true;
+	}
+	*clientfd = cqe->res;
+	io_uring_cqe_seen(&ctx->ring, cqe);
+
+	return false;
+}
+
+/* adds one SQE for RECVMSG, as multishot */
+static int add_recv(struct ctx *ctx, int idx)
+{
+	struct io_uring_sqe *sqe;
+
+	if (get_sqe(ctx, &sqe))
+		return -1;
+
+	io_uring_prep_recvmsg_multishot(sqe, idx, &ctx->msg, MSG_TRUNC);
+	sqe->flags |= IOSQE_FIXED_FILE;
+
+	sqe->flags |= IOSQE_BUFFER_SELECT;
+	sqe->buf_group = 1;
+	io_uring_sqe_set_data64(sqe, BUFFERS + 1);
+	return 0;
+}
+
+static void recycle_buffer(struct ctx *ctx, int idx)
+{
+	io_uring_buf_ring_add(ctx->buf_ring, get_buffer(ctx, idx), buffer_size(ctx), idx,
+			      io_uring_buf_ring_mask(BUFFERS), 0);
+	io_uring_buf_ring_advance(ctx->buf_ring, 1);
+}
+
+static int process_cqe_send(struct ctx *ctx, struct io_uring_cqe *cqe)
+{
+	int idx = cqe->user_data;
+
+	if (cqe->res < 0)
+		fprintf(stderr, "bad send %s\n", strerror(-cqe->res));
+	recycle_buffer(ctx, idx);
+	return 0;
+}
+
+static void
+hex_dump(void *data, size_t length, uint64_t addr)
+{
+	const unsigned char *address = data;
+	const unsigned char *line = address;
+	size_t line_size = 16;
+	unsigned char c;
+	char buf[32];
+	int i = 0;
+
+	sprintf(buf, "addr=0x%lx", addr);
+	printf("length = %zu\n", length);
+	printf("%s | ", buf);
+	while (length-- > 0) {
+		printf("%02X ", *address++);
+		if (!(++i % line_size) || (length == 0 && i % line_size)) {
+			if (length == 0) {
+				while (i++ % line_size)
+					printf("__ ");
+			}
+			printf(" | ");	/* right close */
+			while (line < address) {
+				c = *line++;
+				printf("%c", (c < 33 || c == 255) ? 0x2E : c);
+			}
+			printf("\n");
+			if (length > 0)
+				printf("%s | ", buf);
+		}
+	}
+	printf("\n");
+}
+
+static int process_cqe_recv(struct ctx *ctx, struct io_uring_cqe *cqe,
+			    int fdidx)
+{
+	int ret, idx;
+	struct io_uring_recvmsg_out *o;
+	struct io_uring_sqe *sqe;
+
+	if (!(cqe->flags & IORING_CQE_F_MORE)) {
+		ret = add_recv(ctx, fdidx);
+		if (ret)
+			return ret;
+	}
+
+	if (cqe->res == -ENOBUFS)
+		return 0;
+
+	if (!(cqe->flags & IORING_CQE_F_BUFFER) || cqe->res < 0) {
+		fprintf(stderr, "recv cqe bad res %d\n", cqe->res);
+		if (cqe->res == -EFAULT || cqe->res == -EINVAL)
+			fprintf(stderr,
+				"NB: This requires a kernel version >= 6.0\n");
+		return -1;
+	}
+	idx = cqe->flags >> 16;
+
+	/* at the moment, 'res' returned here is # of bytes read */
+	printf("user_data: %llx\n", cqe->user_data);
+	printf("res (error/buflen): %d\n", cqe->res);
+	printf("flags: %x\n", cqe->flags);
+
+	{
+		struct io_uring_zctap_iov *zov;
+		void *base, *addr;
+		int count, total;
+		int i;
+
+		/* cqe flags contains the buffer index */
+		addr = get_buffer(ctx, cqe->flags >> 16);
+
+		count = cqe->res / sizeof(*zov);
+		total = 0;
+		zov = addr;
+		for (i = 0; i < count; i++)
+			total += zov[i].len;
+
+		printf("Buffer address: %p\n", addr);
+		printf("data length: %d, vectors:%d\n", total, count);
+		hex_dump(addr, cqe->res, 0);
+
+		for (i = 0; i < count; i++) {
+			printf("%d: q:%d g:%d id:%d off:%d len:%d\n",
+				i, zov[i].ifq_id, zov[i].bgid, zov[i].bid,
+				zov[i].off, zov[i].len);
+			if (zov->bgid == 2) {
+				base = ctx->copy_base;
+				addr = base + zov[i].bid * COPY_BUF_SIZE;
+			} else {
+				/* should be frame area, PAGE_SIZE */
+				base = ctx->copy_base;
+				addr = base + zov[i].bid * PAGE_SIZE;
+			}
+			printf("ADDR: %p\n", addr + zov[i].off);
+			hex_dump(addr + zov[i].off, zov[i].len, i);
+		}
+	}
+
+
+	printf("exiting!\n");
+	exit(1);
+
+
+#if 0
+	o = io_uring_recvmsg_validate(get_buffer(ctx, cqe->flags >> 16),
+				      cqe->res, &ctx->msg);
+	if (!o) {
+		fprintf(stderr, "bad recvmsg\n");
+		return -1;
+	}
+	if (o->namelen > ctx->msg.msg_namelen) {
+		fprintf(stderr, "truncated name\n");
+		recycle_buffer(ctx, idx);
+		return 0;
+	}
+	if (o->flags & MSG_TRUNC) {
+		unsigned int r;
+
+		r = io_uring_recvmsg_payload_length(o, cqe->res, &ctx->msg);
+		fprintf(stderr, "truncated msg need %u received %u\n",
+				o->payloadlen, r);
+		recycle_buffer(ctx, idx);
+		return 0;
+	}
+
+	if (io_uring_recvmsg_payload_length(o, cqe->res, &ctx->msg) == 0) {
+		fprintf(stderr, "0 byte recv, assuming EOF.\n");
+		return -1;
+	}
+
+	if (ctx->verbose) {
+		char buff[INET6_ADDRSTRLEN + 1];
+		const char *name;
+		struct sockaddr_in *addr = io_uring_recvmsg_name(o);
+
+		name = inet_ntop(ctx->af, addr, buff, sizeof(buff));
+		if (!name)
+			name = "<INVALID>";
+		fprintf(stderr, "received %u bytes %d from %s:%d\n",
+			io_uring_recvmsg_payload_length(o, cqe->res, &ctx->msg),
+			o->namelen, name, (int)ntohs(addr->sin_port));
+	}
+
+	if (get_sqe(ctx, &sqe))
+		return -1;
+
+	ctx->send[idx].iov = (struct iovec) {
+		.iov_base = io_uring_recvmsg_payload(o, &ctx->msg),
+		.iov_len =
+			io_uring_recvmsg_payload_length(o, cqe->res, &ctx->msg)
+	};
+	ctx->send[idx].msg = (struct msghdr) {
+		.msg_namelen = o->namelen,
+		.msg_name = io_uring_recvmsg_name(o),
+		.msg_control = NULL,
+		.msg_controllen = 0,
+		.msg_iov = &ctx->send[idx].iov,
+		.msg_iovlen = 1
+	};
+
+	io_uring_prep_sendmsg(sqe, fdidx, &ctx->send[idx].msg, 0);
+	io_uring_sqe_set_data64(sqe, idx);
+	sqe->flags |= IOSQE_FIXED_FILE;
+#endif
+
+	return 0;
+}
+
+static int process_cqe(struct ctx *ctx, struct io_uring_cqe *cqe, int fdidx)
+{
+	if (cqe->user_data < BUFFERS)
+		return process_cqe_send(ctx, cqe);
+	else
+		return process_cqe_recv(ctx, cqe, fdidx);
+}
+
+int
+io_zctap_ifq(struct ctx *ctx)
+{
+	__u16 qid, ifq_id;
+	int ifindex, bgid;
+	int ret;
+
+	/* API for register_ifq:
+	 *  ifindex	- network device index
+	 *  qid		- desired/targeted qid
+	 *  ifq_id	- GLOBAL io_uring ifq number.
+	 *  bgid	- fill queue id
+	 */
+
+	bgid = 1;
+	qid = ctx->queue_id;
+
+	ifindex = if_nametoindex(ctx->ifname);
+	if (!ifindex) {
+		fprintf(stderr, "Interface %s does not exist\n", ctx->ifname);
+		return -1;
+	}
+	ret = io_uring_register_ifq(&ctx->ring, ifindex, &qid, &ifq_id, bgid);
+
+	if (ret) {
+		fprintf(stderr, "register_ifq failed: %s\n", strerror(-ret));
+		return -1;
+	}
+	fprintf(stderr, "registered ifq:%d, qid:%d  r=%d\n", ifq_id, qid, ret);
+	ctx->queue_id = qid;
+	ctx->ifq_id = ifq_id;
+	return ret;
+}
+
+static void
+io_complete_sqe(struct io_uring *ring, struct io_uring_sqe *sqe,
+		const char *what)
+{
+	struct io_uring_cqe *cqe;
+	int ret;
+
+	ret = io_uring_submit(ring);
+	if (ret < 0)
+		error(1, -ret, "submit failed");
+
+	ret = io_uring_wait_cqe(ring, &cqe);
+	if (ret)
+		error(1, -ret, "wait_cqe failed");
+
+	if (cqe->res < 0)
+		error(1, -cqe->res, "Bad SQE '%s'", what);
+
+	io_uring_cqe_seen(ring, cqe);
+}
+
+/* submits a SQE for provide region */
+int io_provide_region(struct ctx *ctx)
+{
+	struct io_uring_sqe *sqe;
+	struct iovec iov;
+	void *area;
+	int ret;
+
+	area = mmap(NULL, FRAME_REGION_SIZE, PROT_READ | PROT_WRITE,
+		      MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
+	if (area == MAP_FAILED)
+		error(1, errno, "frame_region mmap");
+
+	/* register (mmap) this buffer area with the kernel */
+	printf("frame base region: %p, group %d\n", area, 0);
+	ctx->frame_base = area;
+
+	iov.iov_base = area;
+	iov.iov_len = FRAME_REGION_SIZE;
+	ret = io_uring_register_buffers(&ctx->ring, &iov, 1);
+	if (ret)
+		error(1, -ret, "register_buffers");
+
+	if (get_sqe(ctx, &sqe))
+		return -1;
+
+	/* API for provide_ifq_region:
+	 *  fd		= ifq_id identifier
+	 *  area	= mmap'd area
+	 *  len		= length of area
+	 *
+	 * area/len refer to a previously mapped buffer area
+	 */
+	io_uring_prep_rw(IORING_OP_PROVIDE_IFQ_REGION, sqe, ctx->ifq_id,
+			 area, FRAME_REGION_SIZE, 0);
+
+	/* buf_group -> buf_index, selects from user_bufs */
+	sqe->flags |= IOSQE_BUFFER_SELECT;
+	sqe->buf_group = 0;
+
+	io_uring_sqe_set_data64(sqe, BUFFERS + 1);
+
+	io_complete_sqe(&ctx->ring, sqe, "ifq region");
+
+	return 0;
+}
+
+/* adds one SQE for RECVZC */
+static int add_recvzc(struct ctx *ctx, int idx_sockfd)
+{
+	struct io_uring_sqe *sqe;
+	__u64 readlen, copy_bgid;
+
+	if (get_sqe(ctx, &sqe))
+		return -1;
+
+	/* API for RECV_ZC:
+	 *  fd		= sockfd (or registered file index)
+	 *  addr/len	= immediate metadata buffer.
+	 *		  not used if BUFFER_SELECT flag is set.
+	 *  buf_group	= group to obtain metadata buffer if BUFFER_SELECT.
+	 *  ioprio	= io_uring recvmsg flags (aka MULTISHOT)
+	 *  msg_flags	= recvmsg flags (MSG_DONTWAIT, etc)
+	 *  addr3	= <32>data_len | <16>copy_bgid | <ifq_id> 
+	 */
+
+	io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, idx_sockfd, NULL, 0, 0);
+	sqe->flags |= IOSQE_FIXED_FILE;
+
+	sqe->flags |= IOSQE_BUFFER_SELECT;
+	sqe->buf_group = 0;
+
+	readlen = 800000;
+	copy_bgid = 2;
+	sqe->addr3 = (readlen << 32) | (copy_bgid << 16) | ctx->ifq_id;
+
+	io_uring_sqe_set_data64(sqe, BUFFERS + 1);
+
+	return 0;
+}
+
+int main(int argc, char *argv[])
+{
+	struct ctx ctx = {
+		.af		= AF_INET6,
+		.buf_shift	= BUF_SHIFT,
+		.ifname		= "eth0",
+		.queue_id	= -1,
+	};
+	int ret;
+	int port = -1;
+	int sockfd, clientfd;
+	int opt;
+	struct io_uring_cqe *cqes[CQES];
+	unsigned int count, i;
+
+	while ((opt = getopt(argc, argv, "46b:i:p:q:uv")) != -1) {
+		switch (opt) {
+		case '4':
+			ctx.af = AF_INET;
+			break;
+		case '6':
+			ctx.af = AF_INET6;
+			break;
+		case 'b':
+			ctx.buf_shift = atoi(optarg);
+			break;
+		case 'i':
+			ctx.ifname = optarg;
+			break;
+		case 'p':
+			port = atoi(optarg);
+			break;
+		case 'q':
+			ctx.queue_id = atoi(optarg);
+			break;
+		case 'u':
+			ctx.udp = true;
+			break;
+		case 'v':
+			ctx.verbose = true;
+			break;
+		default:
+			fprintf(stderr, "Usage: %s [-4] [-6] [-p port] [-u] "
+					"[-i ifname] [-q queue_id] "
+					"[-b log2(BufferSize)] [-v]\n",
+					argv[0]);
+			exit(-1);
+		}
+	}
+
+	if (ctx.verbose) {
+		fprintf(stderr, "%s %s\n", 
+			ctx.af == AF_INET ? "IPv4" : "IPv6",
+			ctx.udp ? "UDP" : "TCP");
+	}
+
+	sockfd = setup_sock(&ctx, port);
+	if (sockfd < 0)
+		return 1;
+
+	if (setup_context(&ctx)) {
+		close(sockfd);
+		return 1;
+	}
+
+	ret = setup_fill_queue(&ctx);
+	if (ret)
+		return 1;
+
+	ret = io_zctap_ifq(&ctx);
+	if (ret)
+		return 1;
+
+	ret = io_provide_region(&ctx);
+	if (ret)
+		return 1;
+
+	ret = setup_copy_pool(&ctx);
+	if (ret)
+		return 1;
+
+	clientfd = sockfd;
+	if (!ctx.udp) {
+		ret = wait_accept(&ctx, sockfd, &clientfd);
+		if (ret) {
+			fprintf(stderr, "wait_accept: %s\n", strerror(-ret));
+			return -1;
+		}
+	}
+
+	/* optimization: register clientfd as file 0, avoiding lookups */
+	ret = io_uring_register_files(&ctx.ring, &clientfd, 1);
+	if (ret) {
+		fprintf(stderr, "register files: %s\n", strerror(-ret));
+		return -1;
+	}
+
+//	ret = add_recv(&ctx, 0);
+	ret = add_recvzc(&ctx, 0);
+	if (ret)
+		return 1;
+
+	while (true) {
+		ret = io_uring_submit_and_wait(&ctx.ring, 1);
+		if (ret == -EINTR)
+			continue;
+		if (ret < 0) {
+			fprintf(stderr, "submit and wait failed %d\n", ret);
+			break;
+		}
+
+		count = io_uring_peek_batch_cqe(&ctx.ring, &cqes[0], CQES);
+		for (i = 0; i < count; i++) {
+			ret = process_cqe(&ctx, cqes[i], 0);
+			if (ret)
+				goto cleanup;
+		}
+		io_uring_cq_advance(&ctx.ring, count);
+	}
+
+cleanup:
+	cleanup_context(&ctx);
+	close(sockfd);
+	return ret;
+}
diff --git a/src/include/liburing.h b/src/include/liburing.h
index 902f26a..d8aa6dc 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -235,6 +235,9 @@ int io_uring_register_sync_cancel(struct io_uring *ring,
 int io_uring_register_file_alloc_range(struct io_uring *ring,
 					unsigned off, unsigned len);
 
+int io_uring_register_ifq(struct io_uring *ring, int ifindex,
+			  __u16 *queue_id, __u16 *ifq_id, __u16 bgid);
+
 int io_uring_get_events(struct io_uring *ring);
 int io_uring_submit_and_get_events(struct io_uring *ring);
 
diff --git a/src/register.c b/src/register.c
index 0a2e5af..283a9f0 100644
--- a/src/register.c
+++ b/src/register.c
@@ -364,3 +364,32 @@ int io_uring_register_file_alloc_range(struct io_uring *ring,
 				       IORING_REGISTER_FILE_ALLOC_RANGE, &range,
 				       0);
 }
+
+#if 0 /* XXX temp development hack */
+#define IORING_REGISTER_IFQ		26
+
+struct io_uring_ifq_req {
+	__u32	ifindex;
+	__u16	queue_id;
+	__u16	ifq_id;
+	__u16	fill_bgid;
+        __u16   __pad[3];
+};
+#endif
+
+int io_uring_register_ifq(struct io_uring *ring, int ifindex,
+			  __u16 *queue_id, __u16 *ifq_id, __u16 bgid)
+{
+	struct io_uring_ifq_req reg = {
+		.ifindex = ifindex,
+		.queue_id = *queue_id,
+		.fill_bgid = bgid,
+	};
+	int ret;
+
+	ret = __sys_io_uring_register(ring->ring_fd, IORING_REGISTER_IFQ,
+				      &reg, 1);
+	*queue_id = reg.queue_id;
+	*ifq_id = reg.ifq_id;
+	return ret;
+}
-- 
2.30.2


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2022-10-07 21:20 UTC | newest]

Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2022-10-07 21:19 [RFC] liburing: add an example for a TCP/UDP ZC RX server Jonathan Lemon

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox