public inbox for [email protected]
 help / color / mirror / Atom feed
* [PATCH liburing v2 0/5] zerocopy send headers and tests
@ 2022-07-25 11:33 Pavel Begunkov
  2022-07-25 11:33 ` [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers Pavel Begunkov
                   ` (6 more replies)
  0 siblings, 7 replies; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi

Add zerocopy send headers, helpers and tests

v2:
	use T_EXIT_*
	fix ptr <-> int conversions for 32 bits arches
	slight renaming
	get rid of error() in the test
	add patch 5/5

Pavel Begunkov (5):
  io_uring.h: sync with kernel for zc send and notifiers
  liburing: add zc send and notif helpers
  tests: add tests for zerocopy send and notifications
  examples: add a zerocopy send example
  liburing: improve fallocate typecasting

 examples/Makefile               |   3 +-
 examples/send-zerocopy.c        | 366 +++++++++++++
 src/include/liburing.h          |  47 +-
 src/include/liburing/io_uring.h |  37 +-
 src/liburing.map                |   2 +
 src/register.c                  |  20 +
 test/Makefile                   |   1 +
 test/send-zerocopy.c            | 888 ++++++++++++++++++++++++++++++++
 8 files changed, 1358 insertions(+), 6 deletions(-)
 create mode 100644 examples/send-zerocopy.c
 create mode 100644 test/send-zerocopy.c

-- 
2.37.0


^ permalink raw reply	[flat|nested] 10+ messages in thread

* [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers
  2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
  2022-07-25 11:33 ` [PATCH liburing v2 2/5] liburing: add zc send and notif helpers Pavel Begunkov
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi

Signed-off-by: Pavel Begunkov <[email protected]>
---
 src/include/liburing/io_uring.h | 37 +++++++++++++++++++++++++++++++--
 1 file changed, 35 insertions(+), 2 deletions(-)

diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index 99e6963..3953807 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -64,6 +64,10 @@ struct io_uring_sqe {
 	union {
 		__s32	splice_fd_in;
 		__u32	file_index;
+		struct {
+			__u16	notification_idx;
+			__u16	addr_len;
+		};
 	};
 	__u64	addr3;
 	__u64	__pad2[1];
@@ -160,7 +164,8 @@ enum io_uring_op {
 	IORING_OP_FALLOCATE,
 	IORING_OP_OPENAT,
 	IORING_OP_CLOSE,
-	IORING_OP_FILES_UPDATE,
+	IORING_OP_RSRC_UPDATE,
+	IORING_OP_FILES_UPDATE = IORING_OP_RSRC_UPDATE,
 	IORING_OP_STATX,
 	IORING_OP_READ,
 	IORING_OP_WRITE,
@@ -187,6 +192,7 @@ enum io_uring_op {
 	IORING_OP_GETXATTR,
 	IORING_OP_SOCKET,
 	IORING_OP_URING_CMD,
+	IORING_OP_SENDZC_NOTIF,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
@@ -208,6 +214,7 @@ enum io_uring_op {
 #define IORING_TIMEOUT_ETIME_SUCCESS	(1U << 5)
 #define IORING_TIMEOUT_CLOCK_MASK	(IORING_TIMEOUT_BOOTTIME | IORING_TIMEOUT_REALTIME)
 #define IORING_TIMEOUT_UPDATE_MASK	(IORING_TIMEOUT_UPDATE | IORING_LINK_TIMEOUT_UPDATE)
+
 /*
  * sqe->splice_flags
  * extends splice(2) flags
@@ -254,13 +261,23 @@ enum io_uring_op {
  *				CQEs on behalf of the same SQE.
  */
 #define IORING_RECVSEND_POLL_FIRST	(1U << 0)
-#define IORING_RECV_MULTISHOT	(1U << 1)
+#define IORING_RECV_MULTISHOT		(1U << 1)
+#define IORING_RECVSEND_FIXED_BUF	(1U << 2)
+#define IORING_RECVSEND_NOTIF_FLUSH	(1U << 3)
 
 /*
  * accept flags stored in sqe->ioprio
  */
 #define IORING_ACCEPT_MULTISHOT	(1U << 0)
 
+/*
+ * IORING_OP_RSRC_UPDATE flags
+ */
+enum {
+	IORING_RSRC_UPDATE_FILES,
+	IORING_RSRC_UPDATE_NOTIF,
+};
+
 /*
  * IO completion data structure (Completion Queue Entry)
  */
@@ -426,6 +443,9 @@ enum {
 	/* register a range of fixed file slots for automatic slot allocation */
 	IORING_REGISTER_FILE_ALLOC_RANGE	= 25,
 
+	IORING_REGISTER_NOTIFIERS		= 26,
+	IORING_UNREGISTER_NOTIFIERS		= 27,
+
 	/* this goes last */
 	IORING_REGISTER_LAST
 };
@@ -472,6 +492,19 @@ struct io_uring_rsrc_update2 {
 	__u32 resv2;
 };
 
+struct io_uring_notification_slot {
+	__u64 tag;
+	__u64 resv[3];
+};
+
+struct io_uring_notification_register {
+	__u32 nr_slots;
+	__u32 resv;
+	__u64 resv2;
+	__u64 data;
+	__u64 resv3;
+};
+
 /* Skip updating fd indexes set to this value in the fd table */
 #define IORING_REGISTER_FILES_SKIP	(-2)
 
-- 
2.37.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH liburing v2 2/5] liburing: add zc send and notif helpers
  2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
  2022-07-25 11:33 ` [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
  2022-07-25 12:35   ` Ammar Faizi
  2022-07-25 11:33 ` [PATCH liburing v2 3/5] tests: add tests for zerocopy send and notifications Pavel Begunkov
                   ` (4 subsequent siblings)
  6 siblings, 1 reply; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi

Add helpers for notification registration and preparing zerocopy send
requests.

Signed-off-by: Pavel Begunkov <[email protected]>
---
 src/include/liburing.h | 42 ++++++++++++++++++++++++++++++++++++++++++
 src/liburing.map       |  2 ++
 src/register.c         | 20 ++++++++++++++++++++
 3 files changed, 64 insertions(+)

diff --git a/src/include/liburing.h b/src/include/liburing.h
index fc7613d..20cd308 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -189,6 +189,10 @@ int io_uring_register_sync_cancel(struct io_uring *ring,
 int io_uring_register_file_alloc_range(struct io_uring *ring,
 					unsigned off, unsigned len);
 
+int io_uring_register_notifications(struct io_uring *ring, unsigned nr,
+				    struct io_uring_notification_slot *slots);
+int io_uring_unregister_notifications(struct io_uring *ring);
+
 /*
  * Helper for the peek/wait single cqe functions. Exported because of that,
  * but probably shouldn't be used directly in an application.
@@ -677,6 +681,44 @@ static inline void io_uring_prep_send(struct io_uring_sqe *sqe, int sockfd,
 	sqe->msg_flags = (__u32) flags;
 }
 
+static inline void io_uring_prep_sendzc(struct io_uring_sqe *sqe, int sockfd,
+				        const void *buf, size_t len, int flags,
+				        unsigned slot_idx, unsigned zc_flags)
+{
+	io_uring_prep_rw(IORING_OP_SENDZC_NOTIF, sqe, sockfd, buf, (__u32) len, 0);
+	sqe->msg_flags = (__u32) flags;
+	sqe->notification_idx = slot_idx;
+	sqe->ioprio = zc_flags;
+}
+
+static inline void io_uring_prep_sendzc_fixed(struct io_uring_sqe *sqe, int sockfd,
+					      const void *buf, size_t len,
+					      int flags, unsigned slot_idx,
+					      unsigned zc_flags, unsigned buf_idx)
+{
+	io_uring_prep_sendzc(sqe, sockfd, buf, len, flags, slot_idx, zc_flags);
+	sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
+	sqe->buf_index = buf_idx;
+}
+
+static inline void io_uring_prep_sendzc_set_addr(struct io_uring_sqe *sqe,
+						 const struct sockaddr *dest_addr,
+						 __u16 addr_len)
+{
+	sqe->addr2 = (unsigned long)(void *)dest_addr;
+	sqe->addr_len = addr_len;
+}
+
+static inline void io_uring_prep_notif_update(struct io_uring_sqe *sqe,
+					      __u64 new_tag, /* 0 to ignore */
+					      unsigned offset, unsigned nr)
+{
+	io_uring_prep_rw(IORING_OP_FILES_UPDATE, sqe, -1, 0, nr,
+			 (__u64)offset);
+	sqe->addr = new_tag;
+	sqe->ioprio = IORING_RSRC_UPDATE_NOTIF;
+}
+
 static inline void io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd,
 				      void *buf, size_t len, int flags)
 {
diff --git a/src/liburing.map b/src/liburing.map
index 318d3d7..7d8f143 100644
--- a/src/liburing.map
+++ b/src/liburing.map
@@ -60,4 +60,6 @@ LIBURING_2.3 {
 	global:
 		io_uring_register_sync_cancel;
 		io_uring_register_file_alloc_range;
+		io_uring_register_notifications;
+		io_uring_unregister_notifications;
 } LIBURING_2.2;
diff --git a/src/register.c b/src/register.c
index 2b37e5f..7482112 100644
--- a/src/register.c
+++ b/src/register.c
@@ -364,3 +364,23 @@ int io_uring_register_file_alloc_range(struct io_uring *ring,
 				       IORING_REGISTER_FILE_ALLOC_RANGE, &range,
 				       0);
 }
+
+int io_uring_register_notifications(struct io_uring *ring, unsigned nr,
+				    struct io_uring_notification_slot *slots)
+{
+	struct io_uring_notification_register r = {
+		.nr_slots = nr,
+		.data = (unsigned long)slots,
+	};
+
+	return __sys_io_uring_register(ring->ring_fd,
+				       IORING_REGISTER_NOTIFIERS,
+				       &r, sizeof(r));
+}
+
+int io_uring_unregister_notifications(struct io_uring *ring)
+{
+	return __sys_io_uring_register(ring->ring_fd,
+				       IORING_UNREGISTER_NOTIFIERS,
+				       NULL, 0);
+}
-- 
2.37.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH liburing v2 3/5] tests: add tests for zerocopy send and notifications
  2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
  2022-07-25 11:33 ` [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers Pavel Begunkov
  2022-07-25 11:33 ` [PATCH liburing v2 2/5] liburing: add zc send and notif helpers Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
  2022-07-25 11:33 ` [PATCH liburing v2 4/5] examples: add a zerocopy send example Pavel Begunkov
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi

Signed-off-by: Pavel Begunkov <[email protected]>
---
 test/Makefile        |   1 +
 test/send-zerocopy.c | 888 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 889 insertions(+)
 create mode 100644 test/send-zerocopy.c

diff --git a/test/Makefile b/test/Makefile
index 8945368..a36ddb3 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -175,6 +175,7 @@ test_srcs := \
 	xattr.c \
 	skip-cqe.c \
 	single-issuer.c \
+	send-zerocopy.c \
 	# EOL
 
 all_targets :=
diff --git a/test/send-zerocopy.c b/test/send-zerocopy.c
new file mode 100644
index 0000000..6fa0535
--- /dev/null
+++ b/test/send-zerocopy.c
@@ -0,0 +1,888 @@
+/* SPDX-License-Identifier: MIT */
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <assert.h>
+#include <errno.h>
+#include <error.h>
+#include <limits.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <arpa/inet.h>
+#include <linux/errqueue.h>
+#include <linux/if_packet.h>
+#include <linux/ipv6.h>
+#include <linux/socket.h>
+#include <linux/sockios.h>
+#include <net/ethernet.h>
+#include <net/if.h>
+#include <netinet/ip.h>
+#include <netinet/in.h>
+#include <netinet/ip6.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <sys/un.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "liburing.h"
+#include "helpers.h"
+
+#define MAX_MSG	128
+
+#define PORT	10200
+#define HOST	"127.0.0.1"
+#define HOSTV6	"::1"
+
+#define NR_SLOTS 5
+#define ZC_TAG 10000
+#define MAX_PAYLOAD 8195
+#define BUFFER_OFFSET 41
+
+#ifndef ARRAY_SIZE
+	#define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0]))
+#endif
+
+static int seqs[NR_SLOTS];
+static char tx_buffer[MAX_PAYLOAD] __attribute__((aligned(4096)));
+static char rx_buffer[MAX_PAYLOAD] __attribute__((aligned(4096)));
+static struct iovec buffers_iov[] = {
+	{ .iov_base = tx_buffer,
+	  .iov_len = sizeof(tx_buffer), },
+	{ .iov_base = tx_buffer + BUFFER_OFFSET,
+	  .iov_len = sizeof(tx_buffer) - BUFFER_OFFSET - 13, },
+};
+
+static inline bool tag_userdata(__u64 user_data)
+{
+	return ZC_TAG <= user_data && user_data < ZC_TAG + NR_SLOTS;
+}
+
+static bool check_cq_empty(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe = NULL;
+	int ret;
+
+	ret = io_uring_peek_cqe(ring, &cqe); /* nothing should be there */
+	return ret == -EAGAIN;
+}
+
+static int register_notifications(struct io_uring *ring)
+{
+	struct io_uring_notification_slot slots[NR_SLOTS] = {};
+	int i;
+
+	memset(seqs, 0, sizeof(seqs));
+	for (i = 0; i < NR_SLOTS; i++)
+		slots[i].tag = ZC_TAG + i;
+	return io_uring_register_notifications(ring, NR_SLOTS, slots);
+}
+
+static int reregister_notifications(struct io_uring *ring)
+{
+	int ret;
+
+	ret = io_uring_unregister_notifications(ring);
+	if (ret) {
+		fprintf(stderr, "unreg notifiers failed %i\n", ret);
+		return ret;
+	}
+
+	return register_notifications(ring);
+}
+
+static int do_one(struct io_uring *ring, int sock_tx, int slot_idx)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int msg_flags = 0;
+	unsigned zc_flags = 0;
+	int ret;
+
+	sqe = io_uring_get_sqe(ring);
+	io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, 1, msg_flags,
+			     slot_idx, zc_flags);
+	sqe->user_data = 1;
+
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+	ret = io_uring_wait_cqe(ring, &cqe);
+	assert(!ret);
+	assert(cqe->user_data == 1);
+	ret = cqe->res;
+	io_uring_cqe_seen(ring, cqe);
+	assert(check_cq_empty(ring));
+	return ret;
+}
+
+static int test_invalid_slot(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+	int ret;
+
+	ret = do_one(ring, sock_tx, NR_SLOTS);
+	assert(ret == -EINVAL);
+	return 0;
+}
+
+static int test_basic_send(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int msg_flags = 0;
+	int slot_idx = 0;
+	unsigned zc_flags = 0;
+	int payload_size = 100;
+	int ret;
+
+	sqe = io_uring_get_sqe(ring);
+	io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size, msg_flags,
+			     slot_idx, zc_flags);
+	sqe->user_data = 1;
+
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+	ret = io_uring_wait_cqe(ring, &cqe);
+	assert(!ret);
+	assert(cqe->user_data == 1 && cqe->res >= 0);
+	io_uring_cqe_seen(ring, cqe);
+	assert(check_cq_empty(ring));
+
+	ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+	assert(ret == payload_size);
+	return 0;
+}
+
+static int test_send_flush(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int msg_flags = 0;
+	int slot_idx = 0;
+	unsigned zc_flags = 0;
+	int payload_size = 100;
+	int ret, i, j;
+	int req_cqes, notif_cqes;
+
+	/* now do send+flush, do many times to verify seqs */
+	for (j = 0; j < NR_SLOTS * 5; j++) {
+		zc_flags = IORING_RECVSEND_NOTIF_FLUSH;
+		slot_idx = rand() % NR_SLOTS;
+		sqe = io_uring_get_sqe(ring);
+		io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size,
+				     msg_flags, slot_idx, zc_flags);
+		sqe->user_data = 1;
+
+		ret = io_uring_submit(ring);
+		assert(ret == 1);
+
+		req_cqes = notif_cqes = 1;
+		for (i = 0; i < 2; i ++) {
+			ret = io_uring_wait_cqe(ring, &cqe);
+			assert(!ret);
+
+			if (cqe->user_data == 1) {
+				assert(req_cqes > 0);
+				req_cqes--;
+				assert(cqe->res == payload_size);
+			} else if (cqe->user_data == ZC_TAG + slot_idx) {
+				assert(notif_cqes > 0);
+				notif_cqes--;
+				assert(cqe->res == 0 && cqe->flags == seqs[slot_idx]);
+				seqs[slot_idx]++;
+			} else {
+				fprintf(stderr, "invalid cqe %lu %i\n",
+					(unsigned long)cqe->user_data, cqe->res);
+				return -1;
+			}
+			io_uring_cqe_seen(ring, cqe);
+		}
+		assert(check_cq_empty(ring));
+
+		ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+		assert(ret == payload_size);
+	}
+	return 0;
+}
+
+static int test_multireq_notif(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+	bool slot_seen[NR_SLOTS] = {};
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int msg_flags = 0;
+	int slot_idx = 0;
+	unsigned zc_flags = 0;
+	int payload_size = 1;
+	int ret, j, i = 0;
+	int nr = NR_SLOTS * 21;
+
+	while (i < nr) {
+		int nr_per_wave = 23;
+
+		for (j = 0; j < nr_per_wave && i < nr; j++, i++) {
+			slot_idx = rand() % NR_SLOTS;
+			sqe = io_uring_get_sqe(ring);
+			io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size,
+					     msg_flags, slot_idx, zc_flags);
+			sqe->user_data = i;
+		}
+		ret = io_uring_submit(ring);
+		assert(ret == j);
+	}
+
+	for (i = 0; i < nr; i++) {
+		ret = io_uring_wait_cqe(ring, &cqe);
+		assert(!ret);
+		assert(cqe->user_data < nr && cqe->res == payload_size);
+		io_uring_cqe_seen(ring, cqe);
+
+		ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+		assert(ret == payload_size);
+	}
+	assert(check_cq_empty(ring));
+
+	zc_flags = IORING_RECVSEND_NOTIF_FLUSH;
+	for (slot_idx = 0; slot_idx < NR_SLOTS; slot_idx++) {
+		sqe = io_uring_get_sqe(ring);
+		io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size,
+				     msg_flags, slot_idx, zc_flags);
+		sqe->user_data = slot_idx;
+		/* just to simplify cqe handling */
+		sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+	}
+	ret = io_uring_submit(ring);
+	assert(ret == NR_SLOTS);
+
+	for (i = 0; i < NR_SLOTS; i++) {
+		int slot_idx;
+
+		ret = io_uring_wait_cqe(ring, &cqe);
+		assert(!ret);
+		assert(tag_userdata(cqe->user_data));
+
+		slot_idx = cqe->user_data - ZC_TAG;
+		assert(!slot_seen[slot_idx]);
+		slot_seen[slot_idx] = true;
+
+		assert(cqe->res == 0 && cqe->flags == seqs[slot_idx]);
+		seqs[slot_idx]++;
+		io_uring_cqe_seen(ring, cqe);
+
+		ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+		assert(ret == payload_size);
+	}
+	assert(check_cq_empty(ring));
+
+	for (i = 0; i < NR_SLOTS; i++)
+		assert(slot_seen[i]);
+	return 0;
+}
+
+static int test_multi_send_flushing(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	unsigned zc_flags = IORING_RECVSEND_NOTIF_FLUSH;
+	int msg_flags = 0, slot_idx = 0;
+	int payload_size = 1;
+	int ret, j, i = 0;
+	int nr = NR_SLOTS * 30;
+	unsigned long long check = 0, expected = 0;
+
+	while (i < nr) {
+		int nr_per_wave = 25;
+
+		for (j = 0; j < nr_per_wave && i < nr; j++, i++) {
+			sqe = io_uring_get_sqe(ring);
+			io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size,
+					     msg_flags, slot_idx, zc_flags);
+			sqe->user_data = 1;
+			sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+		}
+		ret = io_uring_submit(ring);
+		assert(ret == j);
+	}
+
+	for (i = 0; i < nr; i++) {
+		int seq;
+
+		ret = io_uring_wait_cqe(ring, &cqe);
+		assert(!ret);
+		assert(!cqe->res);
+		assert(tag_userdata(cqe->user_data));
+
+		seq = cqe->flags;
+		check += seq * 100007UL;
+		io_uring_cqe_seen(ring, cqe);
+
+		ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+		assert(ret == payload_size);
+	}
+	assert(check_cq_empty(ring));
+
+	for (i = 0; i < nr; i++)
+		expected += (i + seqs[slot_idx]) * 100007UL;
+	assert(check == expected);
+	seqs[slot_idx] += nr;
+	return 0;
+}
+
+static int do_one_fail_notif_flush(struct io_uring *ring, int off, int nr)
+{
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe;
+	int ret;
+
+	/* single out-of-bounds slot */
+	sqe = io_uring_get_sqe(ring);
+	io_uring_prep_notif_update(sqe, 0, off, nr);
+	sqe->user_data = 1;
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+	ret = io_uring_wait_cqe(ring, &cqe);
+	assert(!ret && cqe->user_data == 1);
+	ret = cqe->res;
+	io_uring_cqe_seen(ring, cqe);
+	return ret;
+}
+
+static int test_update_flush_fail(struct io_uring *ring)
+{
+	int ret;
+
+	/* single out-of-bounds slot */
+	ret = do_one_fail_notif_flush(ring, NR_SLOTS, 1);
+	assert(ret == -EINVAL);
+
+	/* out-of-bounds range */
+	ret = do_one_fail_notif_flush(ring, 0, NR_SLOTS + 3);
+	assert(ret == -EINVAL);
+	ret = do_one_fail_notif_flush(ring, NR_SLOTS - 1, 2);
+	assert(ret == -EINVAL);
+
+	/* overflow checks, note it's u32 internally */
+	ret = do_one_fail_notif_flush(ring, ~(__u32)0, 1);
+	assert(ret == -EOVERFLOW);
+	ret = do_one_fail_notif_flush(ring, NR_SLOTS - 1, ~(__u32)0);
+	assert(ret == -EOVERFLOW);
+	return 0;
+}
+
+static void do_one_consume(struct io_uring *ring, int sock_tx, int sock_rx,
+			  int slot_idx)
+{
+	int ret;
+
+	ret = do_one(ring, sock_tx, slot_idx);
+	assert(ret == 1);
+
+	ret = recv(sock_rx, rx_buffer, 1, MSG_TRUNC);
+	assert(ret == 1);
+}
+
+static int test_update_flush(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int offset = 1, nr_to_flush = 3;
+	int ret, i, slot_idx;
+
+	/*
+	 * Flush will be skipped for unused slots, so attached at least 1 req
+	 * to each active notifier / slot
+	 */
+	for (slot_idx = 0; slot_idx < NR_SLOTS; slot_idx++)
+		do_one_consume(ring, sock_tx, sock_rx, slot_idx);
+
+	assert(check_cq_empty(ring));
+
+	/* flush first */
+	sqe = io_uring_get_sqe(ring);
+	io_uring_prep_notif_update(sqe, 0, 0, 1);
+	sqe->user_data = 1;
+	sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+
+	ret = io_uring_wait_cqe(ring, &cqe);
+	assert(!ret && !cqe->res && cqe->user_data == ZC_TAG);
+	assert(cqe->flags == seqs[0]);
+	seqs[0]++;
+	io_uring_cqe_seen(ring, cqe);
+	do_one_consume(ring, sock_tx, sock_rx, 0);
+	assert(check_cq_empty(ring));
+
+	/* flush last */
+	sqe = io_uring_get_sqe(ring);
+	io_uring_prep_notif_update(sqe, 0, NR_SLOTS - 1, 1);
+	sqe->user_data = 1;
+	sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+
+	ret = io_uring_wait_cqe(ring, &cqe);
+	assert(!ret && !cqe->res && cqe->user_data == ZC_TAG + NR_SLOTS - 1);
+	assert(cqe->flags == seqs[NR_SLOTS - 1]);
+	seqs[NR_SLOTS - 1]++;
+	io_uring_cqe_seen(ring, cqe);
+	assert(check_cq_empty(ring));
+
+	/* we left the last slot without attached requests, flush should ignore it */
+	sqe = io_uring_get_sqe(ring);
+	io_uring_prep_notif_update(sqe, 0, NR_SLOTS - 1, 1);
+	sqe->user_data = 1;
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+
+	ret = io_uring_wait_cqe(ring, &cqe);
+	assert(!ret && !cqe->res && cqe->user_data == 1);
+	io_uring_cqe_seen(ring, cqe);
+	assert(check_cq_empty(ring));
+
+	/* flush range */
+	sqe = io_uring_get_sqe(ring);
+	io_uring_prep_notif_update(sqe, 0, offset, nr_to_flush);
+	sqe->user_data = 1;
+	sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+
+	for (i = 0; i < nr_to_flush; i++) {
+		int slot_idx;
+
+		ret = io_uring_wait_cqe(ring, &cqe);
+		assert(!ret && !cqe->res);
+		assert(ZC_TAG + offset <= cqe->user_data &&
+		       cqe->user_data < ZC_TAG + offset + nr_to_flush);
+		slot_idx = cqe->user_data - ZC_TAG;
+		assert(cqe->flags == seqs[slot_idx]);
+		seqs[slot_idx]++;
+		io_uring_cqe_seen(ring, cqe);
+	}
+	assert(check_cq_empty(ring));
+	return 0;
+}
+
+static int test_registration(int sock_tx, int sock_rx)
+{
+	struct io_uring_notification_slot slots[2] = {
+		{.tag = 1}, {.tag = 2},
+	};
+	void *invalid_slots = (void *)1UL;
+	struct io_uring ring;
+	int ret, i;
+
+	ret = io_uring_queue_init(4, &ring, 0);
+	if (ret) {
+		fprintf(stderr, "queue init failed: %d\n", ret);
+		return 1;
+	}
+
+	ret = io_uring_unregister_notifications(&ring);
+	if (ret != -ENXIO) {
+		fprintf(stderr, "unregister nothing: %d\n", ret);
+		return 1;
+	}
+
+	ret = io_uring_register_notifications(&ring, 2, slots);
+	if (ret) {
+		fprintf(stderr, "io_uring_register_notifications failed: %d\n", ret);
+		return 1;
+	}
+
+	ret = io_uring_register_notifications(&ring, 2, slots);
+	if (ret != -EBUSY) {
+		fprintf(stderr, "double register: %d\n", ret);
+		return 1;
+	}
+
+	ret = io_uring_unregister_notifications(&ring);
+	if (ret) {
+		fprintf(stderr, "unregister failed: %d\n", ret);
+		return 1;
+	}
+
+	ret = io_uring_register_notifications(&ring, 2, slots);
+	if (ret) {
+		fprintf(stderr, "second register failed: %d\n", ret);
+		return 1;
+	}
+
+	ret = test_invalid_slot(&ring, sock_tx, sock_rx);
+	if (ret) {
+		fprintf(stderr, "test_invalid_slot() failed\n");
+		return ret;
+	}
+
+	for (i = 0; i < 2; i++) {
+		ret = do_one(&ring, sock_tx, 0);
+		assert(ret == 1);
+
+		ret = recv(sock_rx, rx_buffer, 1, MSG_TRUNC);
+		assert(ret == 1);
+	}
+
+	io_uring_queue_exit(&ring);
+	ret = io_uring_queue_init(4, &ring, 0);
+	if (ret) {
+		fprintf(stderr, "queue init failed: %d\n", ret);
+		return 1;
+	}
+
+	ret = io_uring_register_notifications(&ring, 4, invalid_slots);
+	if (ret != -EFAULT) {
+		fprintf(stderr, "io_uring_register_notifications with invalid ptr: %d\n", ret);
+		return 1;
+	}
+
+	io_uring_queue_exit(&ring);
+	return 0;
+}
+
+static int prepare_ip(struct sockaddr_storage *addr, int *sock_client, int *sock_server,
+		      bool ipv6, bool client_connect, bool msg_zc)
+{
+	int family, addr_size;
+	int ret, val;
+
+	memset(addr, 0, sizeof(*addr));
+	if (ipv6) {
+		struct sockaddr_in6 *saddr = (struct sockaddr_in6 *)addr;
+
+		family = AF_INET6;
+		saddr->sin6_family = family;
+		saddr->sin6_port = htons(PORT);
+		addr_size = sizeof(*saddr);
+	} else {
+		struct sockaddr_in *saddr = (struct sockaddr_in *)addr;
+
+		family = AF_INET;
+		saddr->sin_family = family;
+		saddr->sin_port = htons(PORT);
+		saddr->sin_addr.s_addr = htonl(INADDR_ANY);
+		addr_size = sizeof(*saddr);
+	}
+
+	/* server sock setup */
+	*sock_server = socket(family, SOCK_DGRAM, 0);
+	if (*sock_server < 0) {
+		perror("socket");
+		return 1;
+	}
+	val = 1;
+	setsockopt(*sock_server, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
+	ret = bind(*sock_server, (struct sockaddr *)addr, addr_size);
+	if (ret < 0) {
+		perror("bind");
+		return 1;
+	}
+
+	if (ipv6) {
+		struct sockaddr_in6 *saddr = (struct sockaddr_in6 *)addr;
+
+		inet_pton(AF_INET6, HOSTV6, &(saddr->sin6_addr));
+	} else {
+		struct sockaddr_in *saddr = (struct sockaddr_in *)addr;
+
+		inet_pton(AF_INET, HOST, &saddr->sin_addr);
+	}
+
+	/* client sock setup */
+	*sock_client = socket(family, SOCK_DGRAM, 0);
+	if (*sock_client < 0) {
+		perror("socket");
+		return 1;
+	}
+	if (client_connect) {
+		ret = connect(*sock_client, (struct sockaddr *)addr, addr_size);
+		if (ret < 0) {
+			perror("connect");
+			return 1;
+		}
+	}
+	if (msg_zc) {
+		val = 1;
+		if (setsockopt(*sock_client, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val))) {
+			perror("setsockopt zc");
+			return 1;
+		}
+	}
+	return 0;
+}
+
+static int do_test_inet_send(struct io_uring *ring, int sock_client, int sock_server,
+			     bool fixed_buf, struct sockaddr_storage *addr,
+			     size_t send_size, bool cork, bool mix_register,
+			     int buf_idx)
+{
+	const unsigned slot_idx = 0;
+	const unsigned zc_flags = 0;
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int nr_reqs = cork ? 5 : 1;
+	int i, ret;
+	size_t chunk_size = send_size / nr_reqs;
+	size_t chunk_size_last = send_size - chunk_size * (nr_reqs - 1);
+	char *buf = buffers_iov[buf_idx].iov_base;
+
+	assert(send_size <= buffers_iov[buf_idx].iov_len);
+	memset(rx_buffer, 0, sizeof(rx_buffer));
+
+	for (i = 0; i < nr_reqs; i++) {
+		bool cur_fixed_buf = fixed_buf;
+		size_t cur_size = chunk_size;
+		int msg_flags = 0;
+
+		if (mix_register)
+			cur_fixed_buf = rand() & 1;
+
+		if (cork && i != nr_reqs - 1)
+			msg_flags = MSG_MORE;
+		if (i == nr_reqs - 1)
+			cur_size = chunk_size_last;
+
+		sqe = io_uring_get_sqe(ring);
+		if (cur_fixed_buf)
+			io_uring_prep_sendzc_fixed(sqe, sock_client,
+					     buf + i * chunk_size,
+					     cur_size, msg_flags, slot_idx,
+					     zc_flags, buf_idx);
+		else
+			io_uring_prep_sendzc(sqe, sock_client,
+					     buf + i * chunk_size,
+					     cur_size, msg_flags, slot_idx,
+					     zc_flags);
+
+		if (addr) {
+			sa_family_t fam = ((struct sockaddr_in *)addr)->sin_family;
+			int addr_len = fam == AF_INET ? sizeof(struct sockaddr_in) :
+							sizeof(struct sockaddr_in6);
+
+			io_uring_prep_sendzc_set_addr(sqe, (const struct sockaddr *)addr,
+						      addr_len);
+		}
+		sqe->user_data = i;
+	}
+
+	ret = io_uring_submit(ring);
+	if (ret != nr_reqs) {
+		fprintf(stderr, "submit failed %i expected %i\n", ret, nr_reqs);
+		return 1;
+	}
+
+	for (i = 0; i < nr_reqs; i++) {
+		int expected = chunk_size;
+
+		ret = io_uring_wait_cqe(ring, &cqe);
+		if (ret) {
+			fprintf(stderr, "io_uring_wait_cqe failed %i\n", ret);
+			return 1;
+		}
+		if (cqe->user_data >= nr_reqs) {
+			fprintf(stderr, "invalid user_data\n");
+			return 1;
+		}
+		if (cqe->user_data == nr_reqs - 1)
+			expected = chunk_size_last;
+		if (cqe->res != expected) {
+			fprintf(stderr, "invalid cqe->res %d expected %d\n",
+					 cqe->res, expected);
+			return 1;
+		}
+		io_uring_cqe_seen(ring, cqe);
+	}
+
+	ret = recv(sock_server, rx_buffer, send_size, 0);
+	if (ret != send_size) {
+		fprintf(stderr, "recv less than expected or recv failed %i\n", ret);
+		return 1;
+	}
+
+	for (i = 0; i < send_size; i++) {
+		if (buf[i] != rx_buffer[i]) {
+			fprintf(stderr, "botched data, first byte %i, %u vs %u\n",
+				i, buf[i], rx_buffer[i]);
+		}
+	}
+	return 0;
+}
+
+static int test_inet_send(struct io_uring *ring)
+{
+	struct sockaddr_storage addr;
+	int sock_client, sock_server;
+	int ret, j;
+	__u64 i;
+
+	for (j = 0; j < 8; j++) {
+		bool ipv6 = j & 1;
+		bool client_connect = j & 2;
+		bool msg_zc_set = j & 4;
+
+		ret = prepare_ip(&addr, &sock_client, &sock_server, ipv6,
+				 client_connect, msg_zc_set);
+		if (ret) {
+			fprintf(stderr, "sock prep failed %d\n", ret);
+			return 1;
+		}
+
+		for (i = 0; i < 64; i++) {
+			bool fixed_buf = i & 1;
+			struct sockaddr_storage *addr_arg = (i & 2) ? &addr : NULL;
+			size_t size = (i & 4) ? 137 : 4096;
+			bool cork = i & 8;
+			bool mix_register = i & 16;
+			bool aligned = i & 32;
+			int buf_idx = aligned ? 0 : 1;
+
+			if (mix_register && (!cork || fixed_buf))
+				continue;
+			if (!client_connect && addr_arg == NULL)
+				continue;
+
+			ret = do_test_inet_send(ring, sock_client, sock_server, fixed_buf,
+						addr_arg, size, cork, mix_register,
+						buf_idx);
+			if (ret) {
+				fprintf(stderr, "send failed fixed buf %i, conn %i, addr %i, "
+					"cork %i\n",
+					fixed_buf, client_connect, !!addr_arg,
+					cork);
+				return 1;
+			}
+		}
+
+		close(sock_client);
+		close(sock_server);
+	}
+	return 0;
+}
+
+int main(int argc, char *argv[])
+{
+	struct io_uring ring;
+	int i, ret, sp[2];
+
+	if (argc > 1)
+		return T_EXIT_SKIP;
+
+	ret = io_uring_queue_init(32, &ring, 0);
+	if (ret) {
+		fprintf(stderr, "queue init failed: %d\n", ret);
+		return T_EXIT_FAIL;
+	}
+
+	ret = register_notifications(&ring);
+	if (ret == -EINVAL) {
+		printf("sendzc is not supported, skip\n");
+		return T_EXIT_SKIP;
+	} else if (ret) {
+		fprintf(stderr, "register notif failed %i\n", ret);
+		return T_EXIT_FAIL;
+	}
+
+	srand((unsigned)time(NULL));
+	for (i = 0; i < sizeof(tx_buffer); i++)
+		tx_buffer[i] = i;
+
+	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sp) != 0) {
+		perror("Failed to create Unix-domain socket pair\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_registration(sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_registration() failed\n");
+		return ret;
+	}
+
+	ret = test_invalid_slot(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_invalid_slot() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_basic_send(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_basic_send() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_send_flush(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_send_flush() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_multireq_notif(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_multireq_notif() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = reregister_notifications(&ring);
+	if (ret) {
+		fprintf(stderr, "reregister notifiers failed %i\n", ret);
+		return T_EXIT_FAIL;
+	}
+	/* retry a few tests after registering notifs */
+	ret = test_invalid_slot(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_invalid_slot() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_multireq_notif(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_multireq_notif2() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_multi_send_flushing(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_multi_send_flushing() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_update_flush_fail(&ring);
+	if (ret) {
+		fprintf(stderr, "test_update_flush_fail() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_update_flush(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_update_flush() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = t_register_buffers(&ring, buffers_iov, ARRAY_SIZE(buffers_iov));
+	if (ret == T_SETUP_SKIP) {
+		fprintf(stderr, "can't register bufs, skip\n");
+		goto out;
+	} else if (ret != T_SETUP_OK) {
+		fprintf(stderr, "buffer registration failed %i\n", ret);
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_inet_send(&ring);
+	if (ret) {
+		fprintf(stderr, "test_inet_send() failed\n");
+		return ret;
+	}
+out:
+	io_uring_queue_exit(&ring);
+	close(sp[0]);
+	close(sp[1]);
+	return T_EXIT_PASS;
+}
-- 
2.37.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH liburing v2 4/5] examples: add a zerocopy send example
  2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
                   ` (2 preceding siblings ...)
  2022-07-25 11:33 ` [PATCH liburing v2 3/5] tests: add tests for zerocopy send and notifications Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
  2022-07-25 11:33 ` [PATCH liburing v2 5/5] liburing: improve fallocate typecasting Pavel Begunkov
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi

Signed-off-by: Pavel Begunkov <[email protected]>
---
 examples/Makefile        |   3 +-
 examples/send-zerocopy.c | 366 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 368 insertions(+), 1 deletion(-)
 create mode 100644 examples/send-zerocopy.c

diff --git a/examples/Makefile b/examples/Makefile
index 8e7067f..1997a31 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -14,7 +14,8 @@ example_srcs := \
 	io_uring-cp.c \
 	io_uring-test.c \
 	link-cp.c \
-	poll-bench.c
+	poll-bench.c \
+	send-zerocopy.c
 
 all_targets :=
 
diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
new file mode 100644
index 0000000..e42aa71
--- /dev/null
+++ b/examples/send-zerocopy.c
@@ -0,0 +1,366 @@
+/* SPDX-License-Identifier: MIT */
+/* based on linux-kernel/tools/testing/selftests/net/msg_zerocopy.c */
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <assert.h>
+#include <errno.h>
+#include <error.h>
+#include <limits.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <arpa/inet.h>
+#include <linux/errqueue.h>
+#include <linux/if_packet.h>
+#include <linux/ipv6.h>
+#include <linux/socket.h>
+#include <linux/sockios.h>
+#include <net/ethernet.h>
+#include <net/if.h>
+#include <netinet/ip.h>
+#include <netinet/in.h>
+#include <netinet/ip6.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <sys/un.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "liburing.h"
+
+#define ZC_TAG 0xfffffffULL
+#define MAX_SUBMIT_NR 512
+
+static bool cfg_reg_ringfd = true;
+static bool cfg_fixed_files = 1;
+static bool cfg_zc = 1;
+static bool cfg_flush = 0;
+static int  cfg_nr_reqs = 8;
+static bool cfg_fixed_buf = 1;
+
+static int  cfg_family		= PF_UNSPEC;
+static int  cfg_payload_len;
+static int  cfg_port		= 8000;
+static int  cfg_runtime_ms	= 4200;
+
+static socklen_t cfg_alen;
+static struct sockaddr_storage cfg_dst_addr;
+
+static char payload[IP_MAXPACKET] __attribute__((aligned(4096)));
+
+static unsigned long gettimeofday_ms(void)
+{
+	struct timeval tv;
+
+	gettimeofday(&tv, NULL);
+	return (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+}
+
+static void do_setsockopt(int fd, int level, int optname, int val)
+{
+	if (setsockopt(fd, level, optname, &val, sizeof(val)))
+		error(1, errno, "setsockopt %d.%d: %d", level, optname, val);
+}
+
+static void setup_sockaddr(int domain, const char *str_addr,
+			   struct sockaddr_storage *sockaddr)
+{
+	struct sockaddr_in6 *addr6 = (void *) sockaddr;
+	struct sockaddr_in *addr4 = (void *) sockaddr;
+
+	switch (domain) {
+	case PF_INET:
+		memset(addr4, 0, sizeof(*addr4));
+		addr4->sin_family = AF_INET;
+		addr4->sin_port = htons(cfg_port);
+		if (str_addr &&
+		    inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
+			error(1, 0, "ipv4 parse error: %s", str_addr);
+		break;
+	case PF_INET6:
+		memset(addr6, 0, sizeof(*addr6));
+		addr6->sin6_family = AF_INET6;
+		addr6->sin6_port = htons(cfg_port);
+		if (str_addr &&
+		    inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
+			error(1, 0, "ipv6 parse error: %s", str_addr);
+		break;
+	default:
+		error(1, 0, "illegal domain");
+	}
+}
+
+static int do_setup_tx(int domain, int type, int protocol)
+{
+	int fd;
+
+	fd = socket(domain, type, protocol);
+	if (fd == -1)
+		error(1, errno, "socket t");
+
+	do_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, 1 << 21);
+
+	if (connect(fd, (void *) &cfg_dst_addr, cfg_alen))
+		error(1, errno, "connect");
+	return fd;
+}
+
+static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	unsigned head;
+	int ret;
+
+	io_uring_for_each_cqe(ring, head, cqe)
+		return cqe;
+
+	ret = io_uring_wait_cqe(ring, &cqe);
+	if (ret)
+		error(1, ret, "wait cqe");
+	return cqe;
+}
+
+static void do_tx(int domain, int type, int protocol)
+{
+	unsigned long packets = 0;
+	unsigned long bytes = 0;
+	struct io_uring ring;
+	struct iovec iov;
+	uint64_t tstop;
+	int i, fd, ret;
+	int compl_cqes = 0;
+
+	fd = do_setup_tx(domain, type, protocol);
+
+	ret = io_uring_queue_init(512, &ring, IORING_SETUP_COOP_TASKRUN);
+	if (ret)
+		error(1, ret, "io_uring: queue init");
+
+	if (cfg_zc) {
+		struct io_uring_notification_slot b[1] = {{.tag = ZC_TAG}};
+
+		ret = io_uring_register_notifications(&ring, 1, b);
+		if (ret)
+			error(1, ret, "io_uring: tx ctx registration");
+	}
+	if (cfg_fixed_files) {
+		ret = io_uring_register_files(&ring, &fd, 1);
+		if (ret < 0)
+			error(1, ret, "io_uring: files registration");
+	}
+	if (cfg_reg_ringfd) {
+		ret = io_uring_register_ring_fd(&ring);
+		if (ret < 0)
+			error(1, ret, "io_uring: io_uring_register_ring_fd");
+	}
+
+	iov.iov_base = payload;
+	iov.iov_len = cfg_payload_len;
+
+	ret = io_uring_register_buffers(&ring, &iov, 1);
+	if (ret)
+		error(1, ret, "io_uring: buffer registration");
+
+	tstop = gettimeofday_ms() + cfg_runtime_ms;
+	do {
+		struct io_uring_sqe *sqe;
+		struct io_uring_cqe *cqe;
+		unsigned zc_flags = 0;
+		unsigned buf_idx = 0;
+		unsigned slot_idx = 0;
+		unsigned msg_flags = 0;
+
+		compl_cqes += cfg_flush ? cfg_nr_reqs : 0;
+		if (cfg_flush)
+			zc_flags |= IORING_RECVSEND_NOTIF_FLUSH;
+
+		for (i = 0; i < cfg_nr_reqs; i++) {
+			sqe = io_uring_get_sqe(&ring);
+
+			if (!cfg_zc)
+				io_uring_prep_send(sqe, fd, payload,
+						   cfg_payload_len, 0);
+			else if (cfg_fixed_buf)
+				io_uring_prep_sendzc_fixed(sqe, fd, payload,
+							   cfg_payload_len,
+							   msg_flags, slot_idx,
+							   zc_flags, buf_idx);
+			else
+				io_uring_prep_sendzc(sqe, fd, payload,
+						     cfg_payload_len, msg_flags,
+						     slot_idx, zc_flags);
+
+			sqe->user_data = 1;
+			if (cfg_fixed_files) {
+				sqe->fd = 0;
+				sqe->flags |= IOSQE_FIXED_FILE;
+			}
+		}
+
+		ret = io_uring_submit(&ring);
+		if (ret != cfg_nr_reqs)
+			error(1, ret, "submit");
+
+		for (i = 0; i < cfg_nr_reqs; i++) {
+			cqe = wait_cqe_fast(&ring);
+
+			if (cqe->user_data == ZC_TAG) {
+				compl_cqes--;
+				i--;
+			} else if (cqe->user_data != 1) {
+				error(1, cqe->user_data, "invalid user_data");
+			} else if (cqe->res > 0) {
+				packets++;
+				bytes += cqe->res;
+			} else if (cqe->res == -EAGAIN) {
+				/* request failed, don't flush */
+				if (cfg_flush)
+					compl_cqes--;
+			} else if (cqe->res == -ECONNREFUSED ||
+				   cqe->res == -ECONNRESET ||
+				   cqe->res == -EPIPE) {
+				fprintf(stderr, "Connection failure\n");
+				goto out_fail;
+			} else {
+				error(1, cqe->res, "send failed");
+			}
+
+			io_uring_cqe_seen(&ring, cqe);
+		}
+	} while (gettimeofday_ms() < tstop);
+
+out_fail:
+	shutdown(fd, SHUT_RDWR);
+	if (close(fd))
+		error(1, errno, "close");
+
+	fprintf(stderr, "tx=%lu (MB=%lu), tx/s=%lu (MB/s=%lu)\n",
+			packets, bytes >> 20,
+			packets / (cfg_runtime_ms / 1000),
+			(bytes >> 20) / (cfg_runtime_ms / 1000));
+
+	while (compl_cqes) {
+		struct io_uring_cqe *cqe = wait_cqe_fast(&ring);
+
+		io_uring_cqe_seen(&ring, cqe);
+		compl_cqes--;
+	}
+
+	if (cfg_zc) {
+		ret = io_uring_unregister_notifications(&ring);
+		if (ret)
+			error(1, ret, "io_uring: tx ctx unregistration");
+	}
+	io_uring_queue_exit(&ring);
+}
+
+static void do_test(int domain, int type, int protocol)
+{
+	int i;
+
+	for (i = 0; i < IP_MAXPACKET; i++)
+		payload[i] = 'a' + (i % 26);
+
+	do_tx(domain, type, protocol);
+}
+
+static void usage(const char *filepath)
+{
+	error(1, 0, "Usage: %s [-f] [-n<N>] [-z0] [-s<payload size>] "
+		    "(-4|-6) [-t<time s>] -D<dst_ip> udp", filepath);
+}
+
+static void parse_opts(int argc, char **argv)
+{
+	const int max_payload_len = sizeof(payload) -
+				    sizeof(struct ipv6hdr) -
+				    sizeof(struct tcphdr) -
+				    40 /* max tcp options */;
+	int c;
+	char *daddr = NULL;
+
+	if (argc <= 1)
+		usage(argv[0]);
+
+	cfg_payload_len = max_payload_len;
+
+	while ((c = getopt(argc, argv, "46D:p:s:t:n:fz:b:k")) != -1) {
+		switch (c) {
+		case '4':
+			if (cfg_family != PF_UNSPEC)
+				error(1, 0, "Pass one of -4 or -6");
+			cfg_family = PF_INET;
+			cfg_alen = sizeof(struct sockaddr_in);
+			break;
+		case '6':
+			if (cfg_family != PF_UNSPEC)
+				error(1, 0, "Pass one of -4 or -6");
+			cfg_family = PF_INET6;
+			cfg_alen = sizeof(struct sockaddr_in6);
+			break;
+		case 'D':
+			daddr = optarg;
+			break;
+		case 'p':
+			cfg_port = strtoul(optarg, NULL, 0);
+			break;
+		case 's':
+			cfg_payload_len = strtoul(optarg, NULL, 0);
+			break;
+		case 't':
+			cfg_runtime_ms = 200 + strtoul(optarg, NULL, 10) * 1000;
+			break;
+		case 'n':
+			cfg_nr_reqs = strtoul(optarg, NULL, 0);
+			break;
+		case 'f':
+			cfg_flush = 1;
+			break;
+		case 'z':
+			cfg_zc = strtoul(optarg, NULL, 0);
+			break;
+		case 'b':
+			cfg_fixed_buf = strtoul(optarg, NULL, 0);
+			break;
+		}
+	}
+
+	if (cfg_nr_reqs > MAX_SUBMIT_NR)
+		error(1, 0, "-n: submit batch nr exceeds max (%d)", MAX_SUBMIT_NR);
+	if (cfg_flush && !cfg_zc)
+		error(1, 0, "cfg_flush should be used with zc only");
+	if (cfg_payload_len > max_payload_len)
+		error(1, 0, "-s: payload exceeds max (%d)", max_payload_len);
+
+	setup_sockaddr(cfg_family, daddr, &cfg_dst_addr);
+
+	if (optind != argc - 1)
+		usage(argv[0]);
+}
+
+int main(int argc, char **argv)
+{
+	const char *cfg_test;
+
+	parse_opts(argc, argv);
+
+	cfg_test = argv[argc - 1];
+	if (!strcmp(cfg_test, "tcp"))
+		do_test(cfg_family, SOCK_STREAM, 0);
+	else if (!strcmp(cfg_test, "udp"))
+		do_test(cfg_family, SOCK_DGRAM, 0);
+	else
+		error(1, 0, "unknown cfg_test %s", cfg_test);
+
+	return 0;
+}
-- 
2.37.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH liburing v2 5/5] liburing: improve fallocate typecasting
  2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
                   ` (3 preceding siblings ...)
  2022-07-25 11:33 ` [PATCH liburing v2 4/5] examples: add a zerocopy send example Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
  2022-07-25 12:21   ` Ammar Faizi
  2022-07-25 12:17 ` [PATCH liburing v2 0/5] zerocopy send headers and tests Ammar Faizi
  2022-07-25 15:48 ` Jens Axboe
  6 siblings, 1 reply; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
  To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi

Don't double cast int -> ptr -> int in io_uring_prep_fallocate(), assign
len directly.

Signed-off-by: Pavel Begunkov <[email protected]>
---
 src/include/liburing.h | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/include/liburing.h b/src/include/liburing.h
index 20cd308..cffdabd 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -601,10 +601,9 @@ static inline void io_uring_prep_files_update(struct io_uring_sqe *sqe,
 static inline void io_uring_prep_fallocate(struct io_uring_sqe *sqe, int fd,
 					   int mode, off_t offset, off_t len)
 {
-
 	io_uring_prep_rw(IORING_OP_FALLOCATE, sqe, fd,
-			(const uintptr_t *) (unsigned long) len,
-			(unsigned int) mode, (__u64) offset);
+			0, (unsigned int) mode, (__u64) offset);
+	sqe->addr = (__u64) len;
 }
 
 static inline void io_uring_prep_openat(struct io_uring_sqe *sqe, int dfd,
-- 
2.37.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* Re: [PATCH liburing v2 0/5] zerocopy send headers and tests
  2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
                   ` (4 preceding siblings ...)
  2022-07-25 11:33 ` [PATCH liburing v2 5/5] liburing: improve fallocate typecasting Pavel Begunkov
@ 2022-07-25 12:17 ` Ammar Faizi
  2022-07-25 15:48 ` Jens Axboe
  6 siblings, 0 replies; 10+ messages in thread
From: Ammar Faizi @ 2022-07-25 12:17 UTC (permalink / raw)
  To: Pavel Begunkov, io-uring Mailing List; +Cc: Jens Axboe

On 7/25/22 6:33 PM, Pavel Begunkov wrote:
> Add zerocopy send headers, helpers and tests
> 
> v2:
> 	use T_EXIT_*
> 	fix ptr <-> int conversions for 32 bits arches
> 	slight renaming
> 	get rid of error() in the test
> 	add patch 5/5
> 
> Pavel Begunkov (5):
>    io_uring.h: sync with kernel for zc send and notifiers
>    liburing: add zc send and notif helpers
>    tests: add tests for zerocopy send and notifications
>    examples: add a zerocopy send example
>    liburing: improve fallocate typecasting

I can confirm it compiles fine on all architectures now. Nice.

-- 
Ammar Faizi

^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [PATCH liburing v2 5/5] liburing: improve fallocate typecasting
  2022-07-25 11:33 ` [PATCH liburing v2 5/5] liburing: improve fallocate typecasting Pavel Begunkov
@ 2022-07-25 12:21   ` Ammar Faizi
  0 siblings, 0 replies; 10+ messages in thread
From: Ammar Faizi @ 2022-07-25 12:21 UTC (permalink / raw)
  To: Pavel Begunkov, io-uring Mailing List; +Cc: Jens Axboe

On 7/25/22 6:33 PM, Pavel Begunkov wrote:
> Don't double cast int -> ptr -> int in io_uring_prep_fallocate(), assign
> len directly.
> 
> Signed-off-by: Pavel Begunkov <[email protected]>
> ---
>   src/include/liburing.h | 5 ++---
>   1 file changed, 2 insertions(+), 3 deletions(-)
[...]
>   	io_uring_prep_rw(IORING_OP_FALLOCATE, sqe, fd,
> -			(const uintptr_t *) (unsigned long) len,
> -			(unsigned int) mode, (__u64) offset);
> +			0, (unsigned int) mode, (__u64) offset);

nit: instead of 0 it's better to use NULL. No?

> +	sqe->addr = (__u64) len;

This one looks simpler than a double cast.

Reviewed-by: Ammar Faizi <[email protected]>

-- 
Ammar Faizi


^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [PATCH liburing v2 2/5] liburing: add zc send and notif helpers
  2022-07-25 11:33 ` [PATCH liburing v2 2/5] liburing: add zc send and notif helpers Pavel Begunkov
@ 2022-07-25 12:35   ` Ammar Faizi
  0 siblings, 0 replies; 10+ messages in thread
From: Ammar Faizi @ 2022-07-25 12:35 UTC (permalink / raw)
  To: Pavel Begunkov, io-uring; +Cc: Jens Axboe

On 7/25/22 6:33 PM, Pavel Begunkov wrote:
> Add helpers for notification registration and preparing zerocopy send
> requests.
> 
> Signed-off-by: Pavel Begunkov <[email protected]>
> ---
>   src/include/liburing.h | 42 ++++++++++++++++++++++++++++++++++++++++++
>   src/liburing.map       |  2 ++
>   src/register.c         | 20 ++++++++++++++++++++
>   3 files changed, 64 insertions(+)
[...]
> +static inline void io_uring_prep_notif_update(struct io_uring_sqe *sqe,
> +					      __u64 new_tag, /* 0 to ignore */
> +					      unsigned offset, unsigned nr)
> +{
> +	io_uring_prep_rw(IORING_OP_FILES_UPDATE, sqe, -1, 0, nr,
> +			 (__u64)offset);
> +	sqe->addr = new_tag;
> +	sqe->ioprio = IORING_RSRC_UPDATE_NOTIF;
> +}

The same nit on there. But overall it looks good.

Reviewed-by: Ammar Faizi <[email protected]>

-- 
Ammar Faizi

^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [PATCH liburing v2 0/5] zerocopy send headers and tests
  2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
                   ` (5 preceding siblings ...)
  2022-07-25 12:17 ` [PATCH liburing v2 0/5] zerocopy send headers and tests Ammar Faizi
@ 2022-07-25 15:48 ` Jens Axboe
  6 siblings, 0 replies; 10+ messages in thread
From: Jens Axboe @ 2022-07-25 15:48 UTC (permalink / raw)
  To: asml.silence, io-uring; +Cc: ammarfaizi2

On Mon, 25 Jul 2022 12:33:17 +0100, Pavel Begunkov wrote:
> Add zerocopy send headers, helpers and tests
> 
> v2:
> 	use T_EXIT_*
> 	fix ptr <-> int conversions for 32 bits arches
> 	slight renaming
> 	get rid of error() in the test
> 	add patch 5/5
> 
> [...]

Applied, thanks!

[1/5] io_uring.h: sync with kernel for zc send and notifiers
      (no commit info)
[2/5] liburing: add zc send and notif helpers
      (no commit info)
[3/5] tests: add tests for zerocopy send and notifications
      (no commit info)
[4/5] examples: add a zerocopy send example
      (no commit info)
[5/5] liburing: improve fallocate typecasting
      (no commit info)

Best regards,
-- 
Jens Axboe



^ permalink raw reply	[flat|nested] 10+ messages in thread

end of thread, other threads:[~2022-07-25 15:49 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 2/5] liburing: add zc send and notif helpers Pavel Begunkov
2022-07-25 12:35   ` Ammar Faizi
2022-07-25 11:33 ` [PATCH liburing v2 3/5] tests: add tests for zerocopy send and notifications Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 4/5] examples: add a zerocopy send example Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 5/5] liburing: improve fallocate typecasting Pavel Begunkov
2022-07-25 12:21   ` Ammar Faizi
2022-07-25 12:17 ` [PATCH liburing v2 0/5] zerocopy send headers and tests Ammar Faizi
2022-07-25 15:48 ` Jens Axboe

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox