public inbox for [email protected]
* [PATCH v3 liburing 0/7] liburing: multishot receive
@ 2022-06-30 16:49 Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 1/7] add t_create_socket_pair Dylan Yudaken
                   ` (7 more replies)
  0 siblings, 8 replies; 9+ messages in thread
From: Dylan Yudaken @ 2022-06-30 16:49 UTC (permalink / raw)
  To: io-uring; +Cc: axboe, asml.silence, kernel-team, Dylan Yudaken

This adds an API, tests and documentation for the multishot receive functionality.

It also adds some testing for the overflow paths in accept & poll, which previously
were not tested.

Patch 1 adds a helper, t_create_socket_pair, which provides two connected sockets
without needing a hard-coded port.

Patches 2-5 add the multishot recv API, tests and docs.
Patches 6 and 7 add overflow tests for poll and accept.

v3:
 * remove TCP_NODELAY stuff from patch 1 as it was useless
 * fix return codes from recv-multishot test
 * use ioprio field for multishot

v2:
 * added a multishot recv api rather than expecting applications to use the
   flags directly
 * fixed the io_uring.h include
 * added overflow tests for recv multishot
 * added tests for poll/accept

Dylan Yudaken (7):
  add t_create_socket_pair
  add IORING_RECV_MULTISHOT to io_uring.h
  add io_uring_prep_(recv|recvmsg)_multishot
  add IORING_RECV_MULTISHOT docs
  add recv-multishot test
  add poll overflow test
  add accept with overflow test

 man/io_uring_prep_recv.3              |  22 ++
 man/io_uring_prep_recv_multishot.3    |   1 +
 man/io_uring_prep_recvmsg.3           |  20 ++
 man/io_uring_prep_recvmsg_multishot.3 |   1 +
 src/include/liburing.h                |  16 ++
 src/include/liburing/io_uring.h       |   5 +
 test/Makefile                         |   2 +
 test/accept.c                         | 129 +++++++---
 test/helpers.c                        |  90 +++++++
 test/helpers.h                        |   5 +
 test/poll-mshot-overflow.c            | 128 ++++++++++
 test/recv-multishot.c                 | 343 ++++++++++++++++++++++++++
 12 files changed, 734 insertions(+), 28 deletions(-)
 create mode 120000 man/io_uring_prep_recv_multishot.3
 create mode 120000 man/io_uring_prep_recvmsg_multishot.3
 create mode 100644 test/poll-mshot-overflow.c
 create mode 100644 test/recv-multishot.c


base-commit: 6c90ac3c9fb874d406656c9e7a68805c83a055a0
-- 
2.30.2



* [PATCH v3 liburing 1/7] add t_create_socket_pair
  2022-06-30 16:49 [PATCH v3 liburing 0/7] liburing: multishot receive Dylan Yudaken
@ 2022-06-30 16:49 ` Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 2/7] add IORING_RECV_MULTISHOT to io_uring.h Dylan Yudaken
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dylan Yudaken @ 2022-06-30 16:49 UTC (permalink / raw)
  To: io-uring; +Cc: axboe, asml.silence, kernel-team, Dylan Yudaken

This is a useful helper for writing networking tests: it returns two
connected sockets without requiring a hard-coded port.

Signed-off-by: Dylan Yudaken <[email protected]>
---
 test/helpers.c | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++
 test/helpers.h |  5 +++
 2 files changed, 95 insertions(+)
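
Reader's note: the helper in this patch avoids a fixed port by binding to
port 0 and then asking the kernel which port it picked. The sketch below
isolates just that step. It is an illustration under assumptions, not code
from the patch: bind_loopback_any_port is a hypothetical name, and IPv4
loopback is assumed.

```c
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Hypothetical helper (not part of liburing): bind a new socket of the
 * given type to 127.0.0.1 with port 0 so the kernel picks a free port,
 * then report the chosen port through *port in host byte order.
 * Returns the socket fd on success, or -1 with errno set on failure.
 */
static int bind_loopback_any_port(int type, unsigned short *port)
{
	struct sockaddr_in addr;
	socklen_t len = sizeof(addr);
	int fd, saved;

	fd = socket(AF_INET, type, 0);
	if (fd < 0)
		return -1;

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = 0;	/* 0 asks the kernel for an ephemeral port */
	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

	if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) ||
	    getsockname(fd, (struct sockaddr *)&addr, &len)) {
		saved = errno;	/* close() may overwrite errno */
		close(fd);
		errno = saved;
		return -1;
	}

	*port = ntohs(addr.sin_port);
	return fd;
}
```

A peer socket can then connect() to 127.0.0.1 on the reported port, which is
what the helper does for both the stream and datagram cases.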

diff --git a/test/helpers.c b/test/helpers.c
index 491822e..3660cc0 100644
--- a/test/helpers.c
+++ b/test/helpers.c
@@ -10,6 +10,10 @@
 #include <unistd.h>
 #include <sys/types.h>
 
+#include <arpa/inet.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+
 #include "helpers.h"
 #include "liburing.h"
 
@@ -143,3 +147,89 @@ enum t_setup_ret t_register_buffers(struct io_uring *ring,
 	fprintf(stderr, "buffer register failed: %s\n", strerror(-ret));
 	return ret;
 }
+
+int t_create_socket_pair(int fd[2], bool stream)
+{
+	int ret;
+	int type = stream ? SOCK_STREAM : SOCK_DGRAM;
+	int val;
+	struct sockaddr_in serv_addr;
+	struct sockaddr *paddr;
+	socklen_t paddrlen;
+
+	type |= SOCK_CLOEXEC;
+	fd[0] = socket(AF_INET, type, 0);
+	if (fd[0] < 0)
+		return errno;
+	fd[1] = socket(AF_INET, type, 0);
+	if (fd[1] < 0) {
+		ret = errno;
+		close(fd[0]);
+		return ret;
+	}
+
+	val = 1;
+	if (setsockopt(fd[0], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)))
+		goto errno_cleanup;
+
+	memset(&serv_addr, 0, sizeof(serv_addr));
+	serv_addr.sin_family = AF_INET;
+	serv_addr.sin_port = 0;
+	inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr);
+
+	paddr = (struct sockaddr *)&serv_addr;
+	paddrlen = sizeof(serv_addr);
+
+	if (bind(fd[0], paddr, paddrlen)) {
+		fprintf(stderr, "bind failed\n");
+		goto errno_cleanup;
+	}
+
+	if (stream && listen(fd[0], 16)) {
+		fprintf(stderr, "listen failed\n");
+		goto errno_cleanup;
+	}
+
+	if (getsockname(fd[0], &serv_addr, (socklen_t *)&paddrlen)) {
+		fprintf(stderr, "getsockname failed\n");
+		goto errno_cleanup;
+	}
+	inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr);
+
+	if (connect(fd[1], &serv_addr, paddrlen)) {
+		fprintf(stderr, "connect failed\n");
+		goto errno_cleanup;
+	}
+
+	if (!stream) {
+		/* connect the other udp side */
+		if (getsockname(fd[1], &serv_addr, (socklen_t *)&paddrlen)) {
+			fprintf(stderr, "getsockname failed\n");
+			goto errno_cleanup;
+		}
+		inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr);
+
+		if (connect(fd[0], &serv_addr, paddrlen)) {
+			fprintf(stderr, "connect failed\n");
+			goto errno_cleanup;
+		}
+		return 0;
+	}
+
+	/* for stream case we must accept and cleanup the listen socket */
+
+	ret = accept(fd[0], NULL, NULL);
+	if (ret < 0)
+		goto errno_cleanup;
+
+	close(fd[0]);
+	fd[0] = ret;
+
+	return 0;
+
+errno_cleanup:
+	ret = errno;
+	close(fd[0]);
+	close(fd[1]);
+	return ret;
+}
diff --git a/test/helpers.h b/test/helpers.h
index 3179687..fbfd7d1 100644
--- a/test/helpers.h
+++ b/test/helpers.h
@@ -59,6 +59,11 @@ void t_create_file_pattern(const char *file, size_t size, char pattern);
  */
 struct iovec *t_create_buffers(size_t buf_num, size_t buf_size);
 
+/*
+ * Helper for creating connected socket pairs
+ */
+int t_create_socket_pair(int fd[2], bool stream);
+
 /*
  * Helper for setting up a ring and checking for user privs
  */
-- 
2.30.2



* [PATCH v3 liburing 2/7] add IORING_RECV_MULTISHOT to io_uring.h
  2022-06-30 16:49 [PATCH v3 liburing 0/7] liburing: multishot receive Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 1/7] add t_create_socket_pair Dylan Yudaken
@ 2022-06-30 16:49 ` Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 3/7] add io_uring_prep_(recv|recvmsg)_multishot Dylan Yudaken
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dylan Yudaken @ 2022-06-30 16:49 UTC (permalink / raw)
  To: io-uring; +Cc: axboe, asml.silence, kernel-team, Dylan Yudaken

Copy the relevant part of include/uapi/linux/io_uring.h from the
for-5.20/io_uring branch.

Signed-off-by: Dylan Yudaken <[email protected]>
---
 src/include/liburing/io_uring.h | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index 0fd1f98..de42c54 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -244,8 +244,13 @@ enum io_uring_op {
  *				or receive and arm poll if that yields an
  *				-EAGAIN result, arm poll upfront and skip
  *				the initial transfer attempt.
+ *
+ * IORING_RECV_MULTISHOT	Multishot recv. Sets IORING_CQE_F_MORE if
+ *				the handler will continue to report
+ *				CQEs on behalf of the same SQE.
  */
 #define IORING_RECVSEND_POLL_FIRST	(1U << 0)
+#define IORING_RECV_MULTISHOT	(1U << 1)
 
 /*
  * accept flags stored in sqe->ioprio
-- 
2.30.2



* [PATCH v3 liburing 3/7] add io_uring_prep_(recv|recvmsg)_multishot
  2022-06-30 16:49 [PATCH v3 liburing 0/7] liburing: multishot receive Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 1/7] add t_create_socket_pair Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 2/7] add IORING_RECV_MULTISHOT to io_uring.h Dylan Yudaken
@ 2022-06-30 16:49 ` Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 4/7] add IORING_RECV_MULTISHOT docs Dylan Yudaken
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dylan Yudaken @ 2022-06-30 16:49 UTC (permalink / raw)
  To: io-uring; +Cc: axboe, asml.silence, kernel-team, Dylan Yudaken

Add helpers for multishot recv and recvmsg that set IORING_RECV_MULTISHOT
in sqe->ioprio.

Signed-off-by: Dylan Yudaken <[email protected]>
---
 src/include/liburing.h | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/src/include/liburing.h b/src/include/liburing.h
index bb2fb87..b11d90e 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -416,6 +416,14 @@ static inline void io_uring_prep_recvmsg(struct io_uring_sqe *sqe, int fd,
 	sqe->msg_flags = flags;
 }
 
+static inline void io_uring_prep_recvmsg_multishot(struct io_uring_sqe *sqe,
+						   int fd, struct msghdr *msg,
+						   unsigned flags)
+{
+	io_uring_prep_recvmsg(sqe, fd, msg, flags);
+	sqe->ioprio |= IORING_RECV_MULTISHOT;
+}
+
 static inline void io_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd,
 					 const struct msghdr *msg,
 					 unsigned flags)
@@ -674,6 +682,14 @@ static inline void io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd,
 	sqe->msg_flags = (__u32) flags;
 }
 
+static inline void io_uring_prep_recv_multishot(struct io_uring_sqe *sqe,
+						int sockfd, void *buf,
+						size_t len, int flags)
+{
+	io_uring_prep_recv(sqe, sockfd, buf, len, flags);
+	sqe->ioprio |= IORING_RECV_MULTISHOT;
+}
+
 static inline void io_uring_prep_openat2(struct io_uring_sqe *sqe, int dfd,
 					const char *path, struct open_how *how)
 {
-- 
2.30.2



* [PATCH v3 liburing 4/7] add IORING_RECV_MULTISHOT docs
  2022-06-30 16:49 [PATCH v3 liburing 0/7] liburing: multishot receive Dylan Yudaken
                   ` (2 preceding siblings ...)
  2022-06-30 16:49 ` [PATCH v3 liburing 3/7] add io_uring_prep_(recv|recvmsg)_multishot Dylan Yudaken
@ 2022-06-30 16:49 ` Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 5/7] add recv-multishot test Dylan Yudaken
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dylan Yudaken @ 2022-06-30 16:49 UTC (permalink / raw)
  To: io-uring; +Cc: axboe, asml.silence, kernel-team, Dylan Yudaken

Add appropriate docs to the io_uring_prep_recvmsg and io_uring_prep_recv man pages.

Signed-off-by: Dylan Yudaken <[email protected]>
---
 man/io_uring_prep_recv.3              | 22 ++++++++++++++++++++++
 man/io_uring_prep_recv_multishot.3    |  1 +
 man/io_uring_prep_recvmsg.3           | 20 ++++++++++++++++++++
 man/io_uring_prep_recvmsg_multishot.3 |  1 +
 4 files changed, 44 insertions(+)
 create mode 120000 man/io_uring_prep_recv_multishot.3
 create mode 120000 man/io_uring_prep_recvmsg_multishot.3
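
Reader's note: the man page text added below tells applications to inspect
the flags of every CQE, whatever its result. The sketch that follows shows
one way to decode a completion. It is an illustration only: struct
mshot_result and mshot_decode are hypothetical names, and the flag bits are
local copies of the values in include/uapi/linux/io_uring.h so that the
snippet builds without liburing.

```c
#include <stdbool.h>
#include <stdint.h>

/* Local copies of the CQE flag bits; they mirror the uapi header. */
#define CQE_F_BUFFER		(1U << 0)	/* IORING_CQE_F_BUFFER */
#define CQE_F_MORE		(1U << 1)	/* IORING_CQE_F_MORE */
#define CQE_BUFFER_SHIFT	16		/* IORING_CQE_BUFFER_SHIFT */

/* Hypothetical summary of one multishot recv completion. */
struct mshot_result {
	int32_t res;		/* bytes received, 0 on EOF, or -errno */
	bool has_buffer;	/* a provided buffer was consumed */
	uint16_t buffer_id;	/* valid only when has_buffer is true */
	bool terminated;	/* no more CQEs will follow for this SQE */
};

static struct mshot_result mshot_decode(int32_t res, uint32_t flags)
{
	struct mshot_result r;

	r.res = res;
	r.has_buffer = (flags & CQE_F_BUFFER) != 0;
	r.buffer_id = r.has_buffer ? (uint16_t)(flags >> CQE_BUFFER_SHIFT) : 0;
	/* MORE is checked on every CQE, including successful ones. */
	r.terminated = !(flags & CQE_F_MORE);
	return r;
}
```

When terminated is true the application decides, based on res, whether to
issue a new multishot request (for example after replenishing buffers on
-ENOBUFS) or to stop.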

diff --git a/man/io_uring_prep_recv.3 b/man/io_uring_prep_recv.3
index 993e331..bcd3145 100644
--- a/man/io_uring_prep_recv.3
+++ b/man/io_uring_prep_recv.3
@@ -14,6 +14,12 @@ io_uring_prep_recv \- prepare a recv request
 .BI "                        void *" buf ","
 .BI "                        size_t " len ","
 .BI "                        int " flags ");"
+.PP
+.BI "void io_uring_prep_recv_multishot(struct io_uring_sqe *" sqe ","
+.BI "                                  int " sockfd ","
+.BI "                                  void *" buf ","
+.BI "                                  size_t " len ","
+.BI "                                  int " flags ");"
 .fi
 .SH DESCRIPTION
 .PP
@@ -36,6 +42,22 @@ This function prepares an async
 request. See that man page for details on the arguments specified to this
 prep helper.
 
+The multishot version allows the application to issue a single receive request,
+which repeatedly posts a CQE when data is available. It requires length to be 0,
+the
+.B IOSQE_BUFFER_SELECT
+flag to be set and no
+.B MSG_WAITALL
+flag to be set.
+Therefore each CQE will take a buffer out of a provided buffer pool for receiving.
+The application should check the flags of each CQE, regardless of its result.
+If a posted CQE does not have the
+.B IORING_CQE_F_MORE
+flag set then the multishot receive will be done and the application should issue a
+new request.
+Multishot variants are available since kernel 5.20.
+
+
 After calling this function, additional io_uring internal modifier flags
 may be set in the SQE
 .I off
diff --git a/man/io_uring_prep_recv_multishot.3 b/man/io_uring_prep_recv_multishot.3
new file mode 120000
index 0000000..71fe277
--- /dev/null
+++ b/man/io_uring_prep_recv_multishot.3
@@ -0,0 +1 @@
+io_uring_prep_recv.3
\ No newline at end of file
diff --git a/man/io_uring_prep_recvmsg.3 b/man/io_uring_prep_recvmsg.3
index 8c49411..24c68ce 100644
--- a/man/io_uring_prep_recvmsg.3
+++ b/man/io_uring_prep_recvmsg.3
@@ -15,6 +15,11 @@ io_uring_prep_recvmsg \- prepare a recvmsg request
 .BI "                           int " fd ","
 .BI "                           struct msghdr *" msg ","
 .BI "                           unsigned " flags ");"
+.PP
+.BI "void io_uring_prep_recvmsg_multishot(struct io_uring_sqe *" sqe ","
+.BI "                                     int " fd ","
+.BI "                                     struct msghdr *" msg ","
+.BI "                                     unsigned " flags ");"
 .fi
 .SH DESCRIPTION
 .PP
@@ -37,6 +42,21 @@ This function prepares an async
 request. See that man page for details on the arguments specified to this
 prep helper.
 
+The multishot version allows the application to issue a single receive request,
+which repeatedly posts a CQE when data is available. It requires length to be 0,
+the
+.B IOSQE_BUFFER_SELECT
+flag to be set and no
+.B MSG_WAITALL
+flag to be set.
+Therefore each CQE will take a buffer out of a provided buffer pool for receiving.
+The application should check the flags of each CQE, regardless of its result.
+If a posted CQE does not have the
+.B IORING_CQE_F_MORE
+flag set then the multishot receive will be done and the application should issue a
+new request.
+Multishot variants are available since kernel 5.20.
+
 After calling this function, additional io_uring internal modifier flags
 may be set in the SQE
 .I off
diff --git a/man/io_uring_prep_recvmsg_multishot.3 b/man/io_uring_prep_recvmsg_multishot.3
new file mode 120000
index 0000000..cd9566f
--- /dev/null
+++ b/man/io_uring_prep_recvmsg_multishot.3
@@ -0,0 +1 @@
+io_uring_prep_recvmsg.3
\ No newline at end of file
-- 
2.30.2



* [PATCH v3 liburing 5/7] add recv-multishot test
  2022-06-30 16:49 [PATCH v3 liburing 0/7] liburing: multishot receive Dylan Yudaken
                   ` (3 preceding siblings ...)
  2022-06-30 16:49 ` [PATCH v3 liburing 4/7] add IORING_RECV_MULTISHOT docs Dylan Yudaken
@ 2022-06-30 16:49 ` Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 6/7] add poll overflow test Dylan Yudaken
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dylan Yudaken @ 2022-06-30 16:49 UTC (permalink / raw)
  To: io-uring; +Cc: axboe, asml.silence, kernel-team, Dylan Yudaken

add a test for multishot receive functionality

Signed-off-by: Dylan Yudaken <[email protected]>
---
 test/Makefile         |   1 +
 test/recv-multishot.c | 343 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 344 insertions(+)
 create mode 100644 test/recv-multishot.c

diff --git a/test/Makefile b/test/Makefile
index ad47d2d..e718583 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -125,6 +125,7 @@ test_srcs := \
 	read-write.c \
 	recv-msgall.c \
 	recv-msgall-stream.c \
+	recv-multishot.c \
 	register-restrictions.c \
 	rename.c \
 	ringbuf-read.c \
diff --git a/test/recv-multishot.c b/test/recv-multishot.c
new file mode 100644
index 0000000..e91f585
--- /dev/null
+++ b/test/recv-multishot.c
@@ -0,0 +1,343 @@
+// SPDX-License-Identifier: MIT
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <pthread.h>
+
+#include "liburing.h"
+#include "helpers.h"
+
+
+enum early_error_t {
+	ERROR_NONE  = 0,
+	ERROR_NOT_ENOUGH_BUFFERS,
+	ERROR_EARLY_CLOSE_SENDER,
+	ERROR_EARLY_CLOSE_RECEIVER,
+	ERROR_EARLY_OVERFLOW,
+	ERROR_EARLY_LAST
+};
+
+struct args {
+	bool recvmsg;
+	bool stream;
+	bool wait_each;
+	enum early_error_t early_error;
+};
+
+static int test(struct args *args)
+{
+	int const N = 8;
+	int const N_BUFFS = N * 64;
+	int const N_CQE_OVERFLOW = 4;
+	int const min_cqes = 2;
+	struct io_uring ring;
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe;
+	int fds[2], ret, i, j, total_sent_bytes = 0, total_recv_bytes = 0;
+	int send_buff[256];
+	int *recv_buffs[N_BUFFS];
+	int *at;
+	struct io_uring_cqe recv_cqe[N_BUFFS];
+	int recv_cqes = 0;
+	bool early_error = false;
+	bool early_error_started = false;
+	struct msghdr msg = { };
+	struct __kernel_timespec timeout = {
+		.tv_sec = 1,
+	};
+
+
+	memset(recv_buffs, 0, sizeof(recv_buffs));
+
+	if (args->early_error == ERROR_EARLY_OVERFLOW) {
+		struct io_uring_params params = {
+			.flags = IORING_SETUP_CQSIZE,
+			.cq_entries = N_CQE_OVERFLOW
+		};
+
+		ret = io_uring_queue_init_params(N_CQE_OVERFLOW, &ring, &params);
+	} else {
+		ret = io_uring_queue_init(32, &ring, 0);
+	}
+	if (ret) {
+		fprintf(stderr, "queue init failed: %d\n", ret);
+		return ret;
+	}
+
+	ret = t_create_socket_pair(fds, args->stream);
+	if (ret) {
+		fprintf(stderr, "t_create_socket_pair failed: %d\n", ret);
+		return ret;
+	}
+
+	for (i = 0; i < ARRAY_SIZE(send_buff); i++)
+		send_buff[i] = i;
+
+	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
+		/* prepare some different sized buffers */
+		int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N * sizeof(int);
+
+		recv_buffs[i] = malloc(sizeof(*at) * buffer_size);
+
+		if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
+			continue;
+
+		sqe = io_uring_get_sqe(&ring);
+		io_uring_prep_provide_buffers(sqe, recv_buffs[i],
+					buffer_size * sizeof(*recv_buffs[i]), 1, 7, i);
+		if (io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL) != 0) {
+			fprintf(stderr, "provide buffers failed: %d\n", ret);
+			ret = -1;
+			goto cleanup;
+		}
+		io_uring_cqe_seen(&ring, cqe);
+	}
+
+	sqe = io_uring_get_sqe(&ring);
+	if (args->recvmsg) {
+		memset(&msg, 0, sizeof(msg));
+		msg.msg_namelen = sizeof(struct sockaddr_in);
+		io_uring_prep_recvmsg_multishot(sqe, fds[0], &msg, 0);
+	} else {
+		io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
+	}
+	sqe->flags |= IOSQE_BUFFER_SELECT;
+	sqe->buf_group = 7;
+	io_uring_sqe_set_data64(sqe, 1234);
+	io_uring_submit(&ring);
+
+	at = &send_buff[0];
+	total_sent_bytes = 0;
+	for (i = 0; i < N; i++) {
+		int to_send = sizeof(*at) * (i+1);
+
+		total_sent_bytes += to_send;
+		if (send(fds[1], at, to_send, 0) != to_send) {
+			if (early_error_started)
+				break;
+			fprintf(stderr, "send failed %d\n", errno);
+			ret = -1;
+			goto cleanup;
+		}
+
+		if (i == 2) {
+			if (args->early_error == ERROR_EARLY_CLOSE_RECEIVER) {
+				/* allow previous sends to complete */
+				usleep(1000);
+
+				sqe = io_uring_get_sqe(&ring);
+				io_uring_prep_recv(sqe, fds[0], NULL, 0, 0);
+				io_uring_prep_cancel64(sqe, 1234, 0);
+				sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+				io_uring_submit(&ring);
+				early_error_started = true;
+			}
+			if (args->early_error == ERROR_EARLY_CLOSE_SENDER) {
+				early_error_started = true;
+				shutdown(fds[1], SHUT_RDWR);
+				close(fds[1]);
+			}
+		}
+		at += (i+1);
+
+		if (args->wait_each) {
+			ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
+			if (ret) {
+				fprintf(stderr, "wait_each failed: %d\n", ret);
+				ret = -1;
+				goto cleanup;
+			}
+			while (io_uring_peek_cqe(&ring, &cqe) == 0) {
+				recv_cqe[recv_cqes++] = *cqe;
+				if (cqe->flags & IORING_CQE_F_MORE) {
+					io_uring_cqe_seen(&ring, cqe);
+				} else {
+					early_error = true;
+					io_uring_cqe_seen(&ring, cqe);
+				}
+			}
+			if (early_error)
+				break;
+		}
+	}
+
+	close(fds[1]);
+
+	/* allow sends to finish */
+	usleep(1000);
+
+	if ((args->stream && !early_error) || recv_cqes < min_cqes) {
+		ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
+		if (ret && ret != -ETIME) {
+			fprintf(stderr, "wait final failed: %d\n", ret);
+			ret = -1;
+			goto cleanup;
+		}
+	}
+
+	while (io_uring_peek_cqe(&ring, &cqe) == 0) {
+		recv_cqe[recv_cqes++] = *cqe;
+		io_uring_cqe_seen(&ring, cqe);
+	}
+
+	ret = -1;
+	at = &send_buff[0];
+	if (recv_cqes < min_cqes) {
+		fprintf(stderr, "not enough cqes: have=%d vs %d\n", recv_cqes, min_cqes);
+		goto cleanup;
+	}
+	for (i = 0; i < recv_cqes; i++) {
+		cqe = &recv_cqe[i];
+
+		bool const is_last = i == recv_cqes - 1;
+
+		bool const should_be_last =
+			(cqe->res <= 0) ||
+			(args->stream && is_last) ||
+			(args->early_error == ERROR_EARLY_OVERFLOW &&
+			 !args->wait_each && i == N_CQE_OVERFLOW);
+		int *this_recv;
+
+
+		if (should_be_last) {
+			if (!is_last) {
+				fprintf(stderr, "not last cqe had error %d\n", i);
+				goto cleanup;
+			}
+
+			switch (args->early_error) {
+			case ERROR_NOT_ENOUGH_BUFFERS:
+				if (cqe->res != -ENOBUFS) {
+					fprintf(stderr,
+						"ERROR_NOT_ENOUGH_BUFFERS: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_EARLY_OVERFLOW:
+				if (cqe->res < 0) {
+					fprintf(stderr,
+						"ERROR_EARLY_OVERFLOW: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_EARLY_CLOSE_RECEIVER:
+				if (cqe->res != -ECANCELED) {
+					fprintf(stderr,
+						"ERROR_EARLY_CLOSE_RECEIVER: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_NONE:
+			case ERROR_EARLY_CLOSE_SENDER:
+				if (cqe->res != 0) {
+					fprintf(stderr, "early error: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_EARLY_LAST:
+				fprintf(stderr, "bad error_early\n");
+				goto cleanup;
+			};
+
+			if (cqe->res <= 0 && cqe->flags & IORING_CQE_F_BUFFER) {
+				fprintf(stderr, "final BUFFER flag set\n");
+				goto cleanup;
+			}
+
+			if (cqe->flags & IORING_CQE_F_MORE) {
+				fprintf(stderr, "final MORE flag set\n");
+				goto cleanup;
+			}
+
+			if (cqe->res <= 0)
+				continue;
+		} else {
+			if (!(cqe->flags & IORING_CQE_F_MORE)) {
+				fprintf(stderr, "MORE flag not set\n");
+				goto cleanup;
+			}
+		}
+
+		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
+			fprintf(stderr, "BUFFER flag not set\n");
+			goto cleanup;
+		}
+
+		total_recv_bytes += cqe->res;
+		if (cqe->res % 4 != 0) {
+			/*
+			 * doesn't seem to happen in practice, would need some
+			 * work to remove this requirement
+			 */
+			fprintf(stderr, "unexpectedly unaligned buffer cqe->res=%d\n", cqe->res);
+			goto cleanup;
+		}
+
+		/* check buffer arrived in order (for tcp) */
+		this_recv = recv_buffs[cqe->flags >> 16];
+		for (j = 0; args->stream && j < cqe->res / 4; j++) {
+			int sent = *at++;
+			int recv = *this_recv++;
+
+			if (sent != recv) {
+				fprintf(stderr, "recv=%d sent=%d\n", recv, sent);
+				goto cleanup;
+			}
+		}
+	}
+
+	if (args->early_error == ERROR_NONE && total_recv_bytes < total_sent_bytes) {
+		fprintf(stderr,
+			"missing recv: recv=%d sent=%d\n", total_recv_bytes, total_sent_bytes);
+		goto cleanup;
+	}
+
+	/* check the final one */
+	cqe = &recv_cqe[recv_cqes-1];
+
+	ret = 0;
+cleanup:
+	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
+		free(recv_buffs[i]);
+	close(fds[0]);
+	close(fds[1]);
+	io_uring_queue_exit(&ring);
+
+	return ret;
+}
+
+int main(int argc, char *argv[])
+{
+	int ret;
+	int loop;
+	int early_error = 0;
+
+	if (argc > 1)
+		return T_EXIT_SKIP;
+
+	for (loop = 0; loop < 7; loop++) {
+		struct args a = {
+			.stream = loop & 0x01,
+			.recvmsg = loop & 0x02,
+			.wait_each = loop & 0x4,
+		};
+		for (early_error = 0; early_error < ERROR_EARLY_LAST; early_error++) {
+			a.early_error = (enum early_error_t)early_error;
+			ret = test(&a);
+			if (ret) {
+				fprintf(stderr,
+					"test stream=%d recvmsg=%d wait_each=%d early_error=%d failed\n",
+					a.stream, a.recvmsg, a.wait_each, a.early_error);
+				return T_EXIT_FAIL;
+			}
+		}
+	}
+
+	return T_EXIT_PASS;
+}
-- 
2.30.2



* [PATCH v3 liburing 6/7] add poll overflow test
  2022-06-30 16:49 [PATCH v3 liburing 0/7] liburing: multishot receive Dylan Yudaken
                   ` (4 preceding siblings ...)
  2022-06-30 16:49 ` [PATCH v3 liburing 5/7] add recv-multishot test Dylan Yudaken
@ 2022-06-30 16:49 ` Dylan Yudaken
  2022-06-30 16:49 ` [PATCH v3 liburing 7/7] add accept with " Dylan Yudaken
  2022-06-30 20:32 ` [PATCH v3 liburing 0/7] liburing: multishot receive Jens Axboe
  7 siblings, 0 replies; 9+ messages in thread
From: Dylan Yudaken @ 2022-06-30 16:49 UTC (permalink / raw)
  To: io-uring; +Cc: axboe, asml.silence, kernel-team, Dylan Yudaken

Add a test that multishot poll does not deliver completions out of
order when the CQ ring overflows.

Signed-off-by: Dylan Yudaken <[email protected]>
---
 test/Makefile              |   1 +
 test/poll-mshot-overflow.c | 128 +++++++++++++++++++++++++++++++++++++
 2 files changed, 129 insertions(+)
 create mode 100644 test/poll-mshot-overflow.c
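
Reader's note: the invariant checked by check_final_cqe() below is that once
a CQE for the multishot request arrives without IORING_CQE_F_MORE, no later
CQE may carry the same user_data. The sketch restates that check over a
plain array so it can be read without a ring. struct seen_cqe and
mshot_order_ok are hypothetical names, and the flag value is a local copy
of the uapi definition.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define CQE_F_MORE	(1U << 1)	/* mirrors IORING_CQE_F_MORE */

/* Hypothetical flattened view of a completion: only the fields we check. */
struct seen_cqe {
	uint64_t user_data;
	uint32_t flags;
};

/*
 * Return true if no completion tagged user_data appears after one for the
 * same tag that had MORE cleared. Completions with other tags are ignored.
 */
static bool mshot_order_ok(const struct seen_cqe *cqes, size_t n,
			   uint64_t user_data)
{
	bool terminated = false;
	size_t i;

	for (i = 0; i < n; i++) {
		if (cqes[i].user_data != user_data)
			continue;
		if (terminated)
			return false;	/* completion after the final one */
		if (!(cqes[i].flags & CQE_F_MORE))
			terminated = true;
	}
	return true;
}
```

Unlike the test in this patch, the sketch does not reject unexpected
user_data values or require at least one matching CQE; it covers only the
ordering rule.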

diff --git a/test/Makefile b/test/Makefile
index e718583..9590e1e 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -117,6 +117,7 @@ test_srcs := \
 	poll-link.c \
 	poll-many.c \
 	poll-mshot-update.c \
+	poll-mshot-overflow.c \
 	poll-ring.c \
 	poll-v-poll.c \
 	pollfree.c \
diff --git a/test/poll-mshot-overflow.c b/test/poll-mshot-overflow.c
new file mode 100644
index 0000000..078df04
--- /dev/null
+++ b/test/poll-mshot-overflow.c
@@ -0,0 +1,128 @@
+// SPDX-License-Identifier: MIT
+
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <poll.h>
+#include <sys/wait.h>
+
+#include "liburing.h"
+
+int check_final_cqe(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	int count = 0;
+	bool signalled_no_more = false;
+
+	while (!io_uring_peek_cqe(ring, &cqe)) {
+		if (cqe->user_data == 1) {
+			count++;
+			if (signalled_no_more) {
+				fprintf(stderr, "signalled no more!\n");
+				return 1;
+			}
+			if (!(cqe->flags & IORING_CQE_F_MORE))
+				signalled_no_more = true;
+		} else if (cqe->user_data != 3) {
+			fprintf(stderr, "%d: got unexpected %d\n", count, (int)cqe->user_data);
+			return 1;
+		}
+		io_uring_cqe_seen(ring, cqe);
+	}
+
+	if (!count) {
+		fprintf(stderr, "no cqe\n");
+		return 1;
+	}
+
+	return 0;
+}
+
+int main(int argc, char *argv[])
+{
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe;
+	struct io_uring ring;
+	int pipe1[2];
+	int ret, i;
+
+	if (argc > 1)
+		return 0;
+
+	if (pipe(pipe1) != 0) {
+		perror("pipe");
+		return 1;
+	}
+
+	struct io_uring_params params = {
+		.flags = IORING_SETUP_CQSIZE,
+		.cq_entries = 2
+	};
+
+	ret = io_uring_queue_init_params(2, &ring, &params);
+	if (ret) {
+		fprintf(stderr, "ring setup failed: %d\n", ret);
+		return 1;
+	}
+
+	sqe = io_uring_get_sqe(&ring);
+	if (!sqe) {
+		fprintf(stderr, "get sqe failed\n");
+		return 1;
+	}
+	io_uring_prep_poll_multishot(sqe, pipe1[0], POLLIN);
+	io_uring_sqe_set_data64(sqe, 1);
+
+	if (io_uring_cq_ready(&ring)) {
+		fprintf(stderr, "unexpected cqe\n");
+		return 1;
+	}
+
+	for (i = 0; i < 2; i++) {
+		sqe = io_uring_get_sqe(&ring);
+		io_uring_prep_nop(sqe);
+		io_uring_sqe_set_data64(sqe, 2);
+		io_uring_submit(&ring);
+	}
+
+	do {
+		errno = 0;
+		ret = write(pipe1[1], "foo", 3);
+	} while (ret == -1 && errno == EINTR);
+
+	if (ret <= 0) {
+		fprintf(stderr, "write failed: %d\n", errno);
+		return 1;
+	}
+
+	/* should have 2 cqe + 1 overflow now, so take out two cqes */
+	for (i = 0; i < 2; i++) {
+		if (io_uring_peek_cqe(&ring, &cqe)) {
+			fprintf(stderr, "unexpectedly no cqe\n");
+			return 1;
+		}
+		if (cqe->user_data != 2) {
+			fprintf(stderr, "unexpected user_data\n");
+			return 1;
+		}
+		io_uring_cqe_seen(&ring, cqe);
+	}
+
+	/* now remove the poll */
+	sqe = io_uring_get_sqe(&ring);
+	io_uring_prep_poll_remove(sqe, 1);
+	io_uring_sqe_set_data64(sqe, 3);
+	ret = io_uring_submit(&ring);
+
+	if (ret != 1) {
+		fprintf(stderr, "bad poll remove\n");
+		return 1;
+	}
+
+	ret = check_final_cqe(&ring);
+
+	return ret;
+}
-- 
2.30.2



* [PATCH v3 liburing 7/7] add accept with overflow test
  2022-06-30 16:49 [PATCH v3 liburing 0/7] liburing: multishot receive Dylan Yudaken
                   ` (5 preceding siblings ...)
  2022-06-30 16:49 ` [PATCH v3 liburing 6/7] add poll overflow test Dylan Yudaken
@ 2022-06-30 16:49 ` Dylan Yudaken
  2022-06-30 20:32 ` [PATCH v3 liburing 0/7] liburing: multishot receive Jens Axboe
  7 siblings, 0 replies; 9+ messages in thread
From: Dylan Yudaken @ 2022-06-30 16:49 UTC (permalink / raw)
  To: io-uring; +Cc: axboe, asml.silence, kernel-team, Dylan Yudaken

Add a test to exercise the overflow code path for multishot accept.
This test did not fail previously, but it at least exercises
the code path and ensures that some invariants hold with respect to flags and
IORING_CQE_F_MORE.

Signed-off-by: Dylan Yudaken <[email protected]>
---
 test/accept.c | 129 +++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 101 insertions(+), 28 deletions(-)

diff --git a/test/accept.c b/test/accept.c
index 77e3ebc..0463173 100644
--- a/test/accept.c
+++ b/test/accept.c
@@ -24,6 +24,9 @@
 #include "liburing.h"
 
 #define MAX_FDS 32
+#define NOP_USER_DATA (1LLU << 50)
+#define INITIAL_USER_DATA 1000
+
 static int no_accept;
 static int no_accept_multi;
 
@@ -39,6 +42,7 @@ struct accept_test_args {
 	bool queue_accept_before_connect;
 	bool multishot;
 	int extra_loops;
+	bool overflow;
 };
 
 static void close_fds(int fds[], int nr)
@@ -86,6 +90,24 @@ static void queue_recv(struct io_uring *ring, int fd, bool fixed)
 		sqe->flags |= IOSQE_FIXED_FILE;
 }
 
+static void queue_accept_multishot(struct io_uring *ring, int fd,
+				   int idx, bool fixed)
+{
+	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+	int ret;
+
+	if (fixed)
+		io_uring_prep_multishot_accept_direct(sqe, fd,
+						NULL, NULL,
+						0);
+	else
+		io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
+
+	io_uring_sqe_set_data64(sqe, idx);
+	ret = io_uring_submit(ring);
+	assert(ret != -1);
+}
+
 static void queue_accept_conn(struct io_uring *ring, int fd,
 			      struct accept_test_args args)
 {
@@ -93,40 +115,51 @@ static void queue_accept_conn(struct io_uring *ring, int fd,
 	int ret;
 	int fixed_idx = args.fixed ? 0 : -1;
 	int count = 1 + args.extra_loops;
-	bool multishot = args.multishot;
+
+	if (args.multishot) {
+		queue_accept_multishot(ring, fd, INITIAL_USER_DATA, args.fixed);
+		return;
+	}
 
 	while (count--) {
 		sqe = io_uring_get_sqe(ring);
 		if (fixed_idx < 0) {
-			if (!multishot)
-				io_uring_prep_accept(sqe, fd, NULL, NULL, 0);
-			else
-				io_uring_prep_multishot_accept(sqe, fd, NULL,
-							       NULL, 0);
+			io_uring_prep_accept(sqe, fd, NULL, NULL, 0);
 		} else {
-			if (!multishot)
-				io_uring_prep_accept_direct(sqe, fd, NULL, NULL,
-							    0, fixed_idx);
-			else
-				io_uring_prep_multishot_accept_direct(sqe, fd,
-								      NULL, NULL,
-								      0);
+			io_uring_prep_accept_direct(sqe, fd, NULL, NULL,
+						    0, fixed_idx);
 		}
-
 		ret = io_uring_submit(ring);
 		assert(ret != -1);
 	}
 }
 
-static int accept_conn(struct io_uring *ring, int fixed_idx, bool multishot)
+static int accept_conn(struct io_uring *ring, int fixed_idx, int *multishot, int fd)
 {
-	struct io_uring_cqe *cqe;
+	struct io_uring_cqe *pcqe;
+	struct io_uring_cqe cqe;
 	int ret;
 
-	ret = io_uring_wait_cqe(ring, &cqe);
-	assert(!ret);
-	ret = cqe->res;
-	io_uring_cqe_seen(ring, cqe);
+	do {
+		ret = io_uring_wait_cqe(ring, &pcqe);
+		assert(!ret);
+		cqe = *pcqe;
+		io_uring_cqe_seen(ring, pcqe);
+	} while (cqe.user_data == NOP_USER_DATA);
+
+	if (*multishot) {
+		if (!(cqe.flags & IORING_CQE_F_MORE)) {
+			(*multishot)++;
+			queue_accept_multishot(ring, fd, *multishot, fixed_idx == 0);
+		} else {
+			if (cqe.user_data != *multishot) {
+				fprintf(stderr, "received multishot after told done!\n");
+				return -ECANCELED;
+			}
+		}
+	}
+
+	ret = cqe.res;
 
 	if (fixed_idx >= 0) {
 		if (ret > 0) {
@@ -203,6 +236,32 @@ static int set_client_fd(struct sockaddr_in *addr)
 	return fd;
 }
 
+static void cause_overflow(struct io_uring *ring)
+{
+	int i, ret;
+
+	for (i = 0; i < *ring->cq.kring_entries; i++) {
+		struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+
+		io_uring_prep_nop(sqe);
+		io_uring_sqe_set_data64(sqe, NOP_USER_DATA);
+		ret = io_uring_submit(ring);
+		assert(ret != -1);
+	}
+
+}
+
+static void clear_overflow(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+
+	while (!io_uring_peek_cqe(ring, &cqe)) {
+		if (cqe->user_data != NOP_USER_DATA)
+			break;
+		io_uring_cqe_seen(ring, cqe);
+	}
+}
+
 static int test_loop(struct io_uring *ring,
 		     struct accept_test_args args,
 		     int recv_s0,
@@ -215,15 +274,22 @@ static int test_loop(struct io_uring *ring,
 	bool multishot = args.multishot;
 	uint32_t multishot_mask = 0;
 	int nr_fds = multishot ? MAX_FDS : 1;
+	int multishot_idx = multishot ? INITIAL_USER_DATA : 0;
 
-	for (i = 0; i < nr_fds; i++)
+	if (args.overflow)
+		cause_overflow(ring);
+
+	for (i = 0; i < nr_fds; i++) {
 		c_fd[i] = set_client_fd(addr);
+		if (args.overflow && i == nr_fds / 2)
+			clear_overflow(ring);
+	}
 
 	if (!args.queue_accept_before_connect)
 		queue_accept_conn(ring, recv_s0, args);
 
 	for (i = 0; i < nr_fds; i++) {
-		s_fd[i] = accept_conn(ring, fixed ? 0 : -1, multishot);
+		s_fd[i] = accept_conn(ring, fixed ? 0 : -1, &multishot_idx, recv_s0);
 		if (s_fd[i] == -EINVAL) {
 			if (args.accept_should_error)
 				goto out;
@@ -527,14 +593,15 @@ static int test_accept(int count, bool before)
 	return ret;
 }
 
-static int test_multishot_accept(int count, bool before)
+static int test_multishot_accept(int count, bool before, bool overflow)
 {
 	struct io_uring m_io_uring;
 	int ret;
 	struct accept_test_args args = {
 		.queue_accept_before_connect = before,
 		.multishot = true,
-		.extra_loops = count - 1
+		.extra_loops = count - 1,
+		.overflow = overflow
 	};
 
 	if (no_accept_multi)
@@ -779,15 +846,21 @@ int main(int argc, char *argv[])
 		return ret;
 	}
 
-	ret = test_multishot_accept(1, false);
+	ret = test_multishot_accept(1, true, true);
+	if (ret) {
+		fprintf(stderr, "test_multishot_accept(1, true, true) failed\n");
+		return ret;
+	}
+
+	ret = test_multishot_accept(1, false, false);
 	if (ret) {
-		fprintf(stderr, "test_multishot_accept(1, false) failed\n");
+		fprintf(stderr, "test_multishot_accept(1, false, false) failed\n");
 		return ret;
 	}
 
-	ret = test_multishot_accept(1, true);
+	ret = test_multishot_accept(1, true, false);
 	if (ret) {
-		fprintf(stderr, "test_multishot_accept(1, true) failed\n");
+		fprintf(stderr, "test_multishot_accept(1, true, false) failed\n");
 		return ret;
 	}
 
-- 
2.30.2
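
For readers following the accept_conn() change above: a completion that
arrives without IORING_CQE_F_MORE means the multishot request has ended, so
the test queues it again under the next user_data value and treats any later
F_MORE completion with a different user_data as an error. The sketch below
models only that decision and is not code from the patch. The names
sketch_cqe, sketch_mshot, sketch_handle_accept_cqe and SKETCH_CQE_F_MORE are
hypothetical, and the flag value is copied locally on the assumption that the
sketch should build without liburing installed.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>

/* Same value as IORING_CQE_F_MORE in liburing's io_uring.h; redefined here
 * only so the sketch builds without liburing (assumption of this sketch). */
#define SKETCH_CQE_F_MORE (1U << 1)

/* Hypothetical stand-in for the io_uring_cqe fields the logic looks at. */
struct sketch_cqe {
	uint64_t user_data;
	int32_t res;
	uint32_t flags;
};

/* Hypothetical tracker: user_data of the multishot request that is
 * currently armed, plus how many times it had to be re-armed. */
struct sketch_mshot {
	uint64_t cur_user_data;
	unsigned int rearms;
};

/*
 * Decide what to do with one accept completion.
 * Returns cqe->res, or -ECANCELED when a completion that still carries
 * F_MORE has a user_data other than the currently armed request's.
 */
static int sketch_handle_accept_cqe(struct sketch_mshot *ms,
				    const struct sketch_cqe *cqe)
{
	assert(ms && cqe);

	if (!(cqe->flags & SKETCH_CQE_F_MORE)) {
		/* Request ended: a real caller would queue a new multishot
		 * accept here, tagged with the incremented user_data. */
		ms->cur_user_data++;
		ms->rearms++;
	} else if (cqe->user_data != ms->cur_user_data) {
		/* Live completion from a request we were told was done. */
		return -ECANCELED;
	}

	return cqe->res;
}
```

In a real program the re-arm branch would call something like
io_uring_prep_multishot_accept() followed by io_uring_submit(), as
queue_accept_multishot() does in the patch.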



* Re: [PATCH v3 liburing 0/7] liburing: multishot receive
  2022-06-30 16:49 [PATCH v3 liburing 0/7] liburing: multishot receive Dylan Yudaken
                   ` (6 preceding siblings ...)
  2022-06-30 16:49 ` [PATCH v3 liburing 7/7] add accept with " Dylan Yudaken
@ 2022-06-30 20:32 ` Jens Axboe
  7 siblings, 0 replies; 9+ messages in thread
From: Jens Axboe @ 2022-06-30 20:32 UTC (permalink / raw)
  To: dylany, io-uring; +Cc: kernel-team, asml.silence

On Thu, 30 Jun 2022 09:49:11 -0700, Dylan Yudaken wrote:
> This adds an API, tests and documentation for the multi shot receive functionality.
> 
> It also adds some testing for overflow paths in accept & poll which previously was
> not tested.
> 
> Patch 1 adds a helper t_create_socket_pair which provides two connected sockets
> without needing a hard coded port
> 
> [...]

Applied, thanks!

[1/7] add t_create_socket_pair
      commit: 9167905ca187064ba1d9ac4c8bb8484157bef86b
[2/7] add IORING_RECV_MULTISHOT to io_uring.h
      commit: 791fc0998b0bdb913f71320caf3128aae23d8f39
[3/7] add io_uring_prep_(recv|recvmsg)_multishot
      commit: 5279d6abefcdb3eedfbcae87559cfdda0ec6e94b
[4/7] add IORING_RECV_MULTISHOT docs
      commit: 8e182bbdff0f5d6e1640190f713ed11060470b5f
[5/7] add recv-multishot test
      commit: b367a5273328d05b8e118bdaf1e87c0c8d9f8606
[6/7] add poll overflow test
      commit: 4218ad94b03db5f9109f02db3ba45bebd55eb744
[7/7] add accept with overflow test
      commit: afc6be1619b4d9eeda5a432353fd7285a543640f

Best regards,
-- 
Jens Axboe


