* [PATCH liburing 1/4] add t_create_socket_pair
2022-06-28 15:04 [PATCH liburing 0/4] liburing: multishot receive Dylan Yudaken
@ 2022-06-28 15:04 ` Dylan Yudaken
2022-06-28 15:04 ` [PATCH liburing 2/4] add IORING_RECV_MULTISHOT to io_uring.h Dylan Yudaken
` (2 subsequent siblings)
3 siblings, 0 replies; 8+ messages in thread
From: Dylan Yudaken @ 2022-06-28 15:04 UTC (permalink / raw)
To: Jens Axboe, Pavel Begunkov, io-uring; +Cc: Kernel-team, Dylan Yudaken
This is a useful tool for making networking tests, and does not require a
hard coded port which is useful
Signed-off-by: Dylan Yudaken <[email protected]>
---
test/helpers.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++
test/helpers.h | 5 +++
2 files changed, 102 insertions(+)
diff --git a/test/helpers.c b/test/helpers.c
index 491822e..6fb1157 100644
--- a/test/helpers.c
+++ b/test/helpers.c
@@ -10,6 +10,10 @@
#include <unistd.h>
#include <sys/types.h>
+#include <arpa/inet.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+
#include "helpers.h"
#include "liburing.h"
@@ -143,3 +147,96 @@ enum t_setup_ret t_register_buffers(struct io_uring *ring,
fprintf(stderr, "buffer register failed: %s\n", strerror(-ret));
return ret;
}
+
+int t_create_socket_pair(int fd[2], bool stream)
+{
+ int ret;
+ int type = stream ? SOCK_STREAM : SOCK_DGRAM;
+ int val;
+ struct sockaddr_in serv_addr;
+ struct sockaddr *paddr;
+ size_t paddrlen;
+
+ type |= SOCK_CLOEXEC;
+ fd[0] = socket(AF_INET, type, 0);
+ if (fd[0] < 0)
+ return errno;
+ fd[1] = socket(AF_INET, type, 0);
+ if (fd[1] < 0) {
+ ret = errno;
+ close(fd[0]);
+ return ret;
+ }
+
+ val = 1;
+ if (setsockopt(fd[0], SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)))
+ goto errno_cleanup;
+
+ memset(&serv_addr, 0, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = 0;
+ inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr);
+
+ paddr = (struct sockaddr *)&serv_addr;
+ paddrlen = sizeof(serv_addr);
+
+ if (bind(fd[0], paddr, paddrlen)) {
+ fprintf(stderr, "bind failed\n");
+ goto errno_cleanup;
+ }
+
+ if (stream && listen(fd[0], 16)) {
+ fprintf(stderr, "listen failed\n");
+ goto errno_cleanup;
+ }
+
+ if (getsockname(fd[0], &serv_addr, (socklen_t *)&paddrlen)) {
+ fprintf(stderr, "getsockname failed\n");
+ goto errno_cleanup;
+ }
+ inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr);
+
+ if (connect(fd[1], &serv_addr, paddrlen)) {
+ fprintf(stderr, "connect failed\n");
+ goto errno_cleanup;
+ }
+
+ if (!stream) {
+ /* connect the other udp side */
+ if (getsockname(fd[1], &serv_addr, (socklen_t *)&paddrlen)) {
+ fprintf(stderr, "getsockname failed\n");
+ goto errno_cleanup;
+ }
+ inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr);
+
+ if (connect(fd[0], &serv_addr, paddrlen)) {
+ fprintf(stderr, "connect failed\n");
+ goto errno_cleanup;
+ }
+ return 0;
+ }
+
+ /* for stream case we must accept and cleanup the listen socket */
+
+ ret = accept(fd[0], NULL, NULL);
+ if (ret < 0)
+ goto errno_cleanup;
+
+ close(fd[0]);
+ fd[0] = ret;
+
+ val = 1;
+ if (stream && setsockopt(fd[0], SOL_TCP, TCP_NODELAY, &val, sizeof(val)))
+ goto errno_cleanup;
+ val = 1;
+ if (stream && setsockopt(fd[1], SOL_TCP, TCP_NODELAY, &val, sizeof(val)))
+ goto errno_cleanup;
+
+ return 0;
+
+errno_cleanup:
+ ret = errno;
+ close(fd[0]);
+ close(fd[1]);
+ return ret;
+}
diff --git a/test/helpers.h b/test/helpers.h
index d0beb93..7cef3c1 100644
--- a/test/helpers.h
+++ b/test/helpers.h
@@ -52,6 +52,11 @@ void t_create_file_pattern(const char *file, size_t size, char pattern);
*/
struct iovec *t_create_buffers(size_t buf_num, size_t buf_size);
+/*
+ * Helper for creating connected socket pairs
+ */
+int t_create_socket_pair(int fd[2], bool stream);
+
/*
* Helper for setting up a ring and checking for user privs
*/
--
2.30.2
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH liburing 2/4] add IORING_RECV_MULTISHOT to io_uring.h
2022-06-28 15:04 [PATCH liburing 0/4] liburing: multishot receive Dylan Yudaken
2022-06-28 15:04 ` [PATCH liburing 1/4] add t_create_socket_pair Dylan Yudaken
@ 2022-06-28 15:04 ` Dylan Yudaken
2022-06-28 16:10 ` Ammar Faizi
2022-06-28 15:04 ` [PATCH liburing 3/4] add recv-multishot test Dylan Yudaken
2022-06-28 15:04 ` [PATCH liburing 4/4] add IORING_RECV_MULTISHOT docs Dylan Yudaken
3 siblings, 1 reply; 8+ messages in thread
From: Dylan Yudaken @ 2022-06-28 15:04 UTC (permalink / raw)
To: Jens Axboe, Pavel Begunkov, io-uring; +Cc: Kernel-team, Dylan Yudaken
copy from include/uapi/linux/io_uring.h
Signed-off-by: Dylan Yudaken <[email protected]>
---
src/include/liburing/io_uring.h | 53 ++++++++++++++++++++++++---------
1 file changed, 39 insertions(+), 14 deletions(-)
diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index 2f391c9..1e5bdb3 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -10,10 +10,7 @@
#include <linux/fs.h>
#include <linux/types.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include <linux/time_types.h>
/*
* IO submission data structure (Submission Queue Entry)
@@ -26,6 +23,7 @@ struct io_uring_sqe {
union {
__u64 off; /* offset into file */
__u64 addr2;
+ __u32 cmd_op;
};
union {
__u64 addr; /* pointer to buffer or iovecs */
@@ -65,8 +63,17 @@ struct io_uring_sqe {
__s32 splice_fd_in;
__u32 file_index;
};
- __u64 addr3;
- __u64 __pad2[1];
+ union {
+ struct {
+ __u64 addr3;
+ __u64 __pad2[1];
+ };
+ /*
+ * If the ring is initialized with IORING_SETUP_SQE128, then
+ * this field is used for 80 bytes of arbitrary command data
+ */
+ __u8 cmd[0];
+ };
};
/*
@@ -131,9 +138,12 @@ enum {
* IORING_SQ_TASKRUN in the sq ring flags. Not valid with COOP_TASKRUN.
*/
#define IORING_SETUP_TASKRUN_FLAG (1U << 9)
-
#define IORING_SETUP_SQE128 (1U << 10) /* SQEs are 128 byte */
#define IORING_SETUP_CQE32 (1U << 11) /* CQEs are 32 byte */
+/*
+ * Only one task is allowed to submit requests
+ */
+#define IORING_SETUP_SINGLE_ISSUER (1U << 12)
enum io_uring_op {
IORING_OP_NOP,
@@ -220,10 +230,13 @@ enum io_uring_op {
*
* IORING_POLL_UPDATE Update existing poll request, matching
* sqe->addr as the old user_data field.
+ *
+ * IORING_POLL_LEVEL Level triggered poll.
*/
#define IORING_POLL_ADD_MULTI (1U << 0)
#define IORING_POLL_UPDATE_EVENTS (1U << 1)
#define IORING_POLL_UPDATE_USER_DATA (1U << 2)
+#define IORING_POLL_ADD_LEVEL (1U << 3)
/*
* ASYNC_CANCEL flags.
@@ -232,10 +245,12 @@ enum io_uring_op {
* IORING_ASYNC_CANCEL_FD Key off 'fd' for cancelation rather than the
* request 'user_data'
* IORING_ASYNC_CANCEL_ANY Match any request
+ * IORING_ASYNC_CANCEL_FD_FIXED 'fd' passed in is a fixed descriptor
*/
#define IORING_ASYNC_CANCEL_ALL (1U << 0)
#define IORING_ASYNC_CANCEL_FD (1U << 1)
-#define IORING_ASYNC_CANCEL_ANY (1U << 2)
+#define IORING_ASYNC_CANCEL_ANY (1U << 2)
+#define IORING_ASYNC_CANCEL_FD_FIXED (1U << 3)
/*
* send/sendmsg and recv/recvmsg flags (sqe->addr2)
@@ -244,8 +259,13 @@ enum io_uring_op {
* or receive and arm poll if that yields an
* -EAGAIN result, arm poll upfront and skip
* the initial transfer attempt.
+ *
+ * IORING_RECV_MULTISHOT Multishot recv. Sets IORING_CQE_F_MORE if
+ * the handler will continue to report
+ * CQEs on behalf of the same SQE.
*/
#define IORING_RECVSEND_POLL_FIRST (1U << 0)
+#define IORING_RECV_MULTISHOT (1U << 1)
/*
* accept flags stored in sqe->ioprio
@@ -411,6 +431,9 @@ enum {
IORING_REGISTER_PBUF_RING = 22,
IORING_UNREGISTER_PBUF_RING = 23,
+ /* sync cancelation API */
+ IORING_REGISTER_SYNC_CANCEL = 24,
+
/* this goes last */
IORING_REGISTER_LAST
};
@@ -547,12 +570,14 @@ struct io_uring_getevents_arg {
};
/*
- * accept flags stored in sqe->ioprio
+ * Argument for IORING_REGISTER_SYNC_CANCEL
*/
-#define IORING_ACCEPT_MULTISHOT (1U << 0)
-
-#ifdef __cplusplus
-}
-#endif
+struct io_uring_sync_cancel_reg {
+ __u64 addr;
+ __s32 fd;
+ __u32 flags;
+ struct __kernel_timespec timeout;
+ __u64 pad[4];
+};
#endif
--
2.30.2
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH liburing 3/4] add recv-multishot test
2022-06-28 15:04 [PATCH liburing 0/4] liburing: multishot receive Dylan Yudaken
2022-06-28 15:04 ` [PATCH liburing 1/4] add t_create_socket_pair Dylan Yudaken
2022-06-28 15:04 ` [PATCH liburing 2/4] add IORING_RECV_MULTISHOT to io_uring.h Dylan Yudaken
@ 2022-06-28 15:04 ` Dylan Yudaken
2022-06-28 15:04 ` [PATCH liburing 4/4] add IORING_RECV_MULTISHOT docs Dylan Yudaken
3 siblings, 0 replies; 8+ messages in thread
From: Dylan Yudaken @ 2022-06-28 15:04 UTC (permalink / raw)
To: Jens Axboe, Pavel Begunkov, io-uring; +Cc: Kernel-team, Dylan Yudaken
add a test for multishot receive functionality
Signed-off-by: Dylan Yudaken <[email protected]>
---
test/Makefile | 1 +
test/recv-multishot.c | 308 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 309 insertions(+)
create mode 100644 test/recv-multishot.c
diff --git a/test/Makefile b/test/Makefile
index e3204a7..73a0001 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -124,6 +124,7 @@ test_srcs := \
read-write.c \
recv-msgall.c \
recv-msgall-stream.c \
+ recv-multishot.c \
register-restrictions.c \
rename.c \
ringbuf-read.c \
diff --git a/test/recv-multishot.c b/test/recv-multishot.c
new file mode 100644
index 0000000..38ec615
--- /dev/null
+++ b/test/recv-multishot.c
@@ -0,0 +1,308 @@
+// SPDX-License-Identifier: MIT
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <pthread.h>
+
+#include "liburing.h"
+#include "helpers.h"
+
+
+enum early_error_t {
+ ERROR_NONE = 0,
+ ERROR_NOT_ENOUGH_BUFFERS = 1,
+ ERROR_EARLY_CLOSE_SENDER = 2,
+ ERROR_EARLY_CLOSE_RECEIVER = 3,
+};
+
+struct args {
+ bool recvmsg;
+ bool stream;
+ bool wait_each;
+ enum early_error_t early_error;
+};
+
+static int test(struct args *args)
+{
+ int const N = 8;
+ int const N_BUFFS = N * 64;
+ int const min_cqes = 2;
+ struct io_uring ring;
+ struct io_uring_cqe *cqe;
+ struct io_uring_sqe *sqe;
+ int fds[2], ret, i, j, total_sent_bytes = 0, total_recv_bytes = 0;
+ int send_buff[256];
+ int *recv_buffs[N_BUFFS];
+ int *at;
+ struct io_uring_cqe recv_cqe[N_BUFFS];
+ int recv_cqes = 0;
+ bool early_error = false;
+ bool early_error_started = false;
+ struct msghdr msg = { };
+ struct __kernel_timespec timeout = {
+ .tv_sec = 1,
+ };
+
+
+ memset(recv_buffs, 0, sizeof(recv_buffs));
+
+ ret = io_uring_queue_init(32, &ring, 0);
+ if (ret) {
+ fprintf(stderr, "queue init failed: %d\n", ret);
+ return ret;
+ }
+
+ ret = t_create_socket_pair(fds, args->stream);
+ if (ret) {
+ fprintf(stderr, "t_create_socket_pair failed: %d\n", ret);
+ return ret;
+ }
+
+ for (i = 0; i < ARRAY_SIZE(send_buff); i++)
+ send_buff[i] = i;
+
+ for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
+ /* prepare some different sized buffers */
+ int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N * sizeof(int);
+
+ recv_buffs[i] = malloc(sizeof(*at) * buffer_size);
+
+ if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
+ continue;
+
+ sqe = io_uring_get_sqe(&ring);
+ io_uring_prep_provide_buffers(sqe, recv_buffs[i],
+ buffer_size * sizeof(*recv_buffs[i]), 1, 7, i);
+ if (io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL) != 0) {
+ fprintf(stderr, "provide buffers failed: %d\n", ret);
+ ret = -1;
+ goto cleanup;
+ }
+ io_uring_cqe_seen(&ring, cqe);
+ }
+
+ sqe = io_uring_get_sqe(&ring);
+ if (args->recvmsg) {
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_namelen = sizeof(struct sockaddr_in);
+ io_uring_prep_recvmsg(sqe, fds[0], &msg, 0);
+ } else {
+ io_uring_prep_recv(sqe, fds[0], NULL, 0, 0);
+ }
+ sqe->flags |= IOSQE_BUFFER_SELECT;
+ sqe->buf_group = 7;
+ sqe->addr2 |= IORING_RECV_MULTISHOT;
+ io_uring_sqe_set_data64(sqe, 1234);
+ io_uring_submit(&ring);
+
+ at = &send_buff[0];
+ total_sent_bytes = 0;
+ for (i = 0; i < N; i++) {
+ int to_send = sizeof(*at) * (i+1);
+
+ total_sent_bytes += to_send;
+ if (send(fds[1], at, to_send, 0) != to_send) {
+ if (early_error_started)
+ break;
+ fprintf(stderr, "send failed %d\n", errno);
+ ret = -1;
+ goto cleanup;
+ }
+
+ if (i == 2) {
+ if (args->early_error == ERROR_EARLY_CLOSE_RECEIVER) {
+ /* allow previous sends to complete */
+ usleep(1000);
+
+ sqe = io_uring_get_sqe(&ring);
+ io_uring_prep_recv(sqe, fds[0], NULL, 0, 0);
+ io_uring_prep_cancel64(sqe, 1234, 0);
+ sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+ io_uring_submit(&ring);
+ early_error_started = true;
+ }
+ if (args->early_error == ERROR_EARLY_CLOSE_SENDER) {
+ early_error_started = true;
+ shutdown(fds[1], SHUT_RDWR);
+ close(fds[1]);
+ }
+ }
+ at += (i+1);
+
+ if (args->wait_each) {
+ ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
+ if (ret) {
+ fprintf(stderr, "wait_each failed: %d\n", ret);
+ ret = -1;
+ goto cleanup;
+ }
+ while (io_uring_peek_cqe(&ring, &cqe) == 0) {
+ recv_cqe[recv_cqes++] = *cqe;
+ if (cqe->flags & IORING_CQE_F_MORE) {
+ io_uring_cqe_seen(&ring, cqe);
+ } else {
+ early_error = true;
+ io_uring_cqe_seen(&ring, cqe);
+ }
+ }
+ if (early_error)
+ break;
+ }
+ }
+
+ close(fds[1]);
+
+ /* allow sends to finish */
+ usleep(1000);
+
+ if ((args->stream && !early_error) || recv_cqes < min_cqes) {
+ ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
+ if (ret && ret != -ETIME) {
+ fprintf(stderr, "wait final failed: %d\n", ret);
+ ret = -1;
+ goto cleanup;
+ }
+ }
+
+ while (io_uring_peek_cqe(&ring, &cqe) == 0) {
+ recv_cqe[recv_cqes++] = *cqe;
+ io_uring_cqe_seen(&ring, cqe);
+ }
+
+ ret = -1;
+ at = &send_buff[0];
+ if (recv_cqes < min_cqes) {
+ fprintf(stderr, "not enough cqes: have=%d vs %d\n", recv_cqes, min_cqes);
+ goto cleanup;
+ }
+ for (i = 0; i < recv_cqes; i++) {
+ bool const is_last = i == recv_cqes - 1;
+ int *this_recv;
+
+ cqe = &recv_cqe[i];
+
+ if (cqe->res <= 0 || (args->stream && is_last)) {
+ if (!is_last) {
+ fprintf(stderr, "not last cqe had error %d\n", i);
+ goto cleanup;
+ }
+
+ switch (args->early_error) {
+ case ERROR_NOT_ENOUGH_BUFFERS:
+ if (cqe->res != -ENOBUFS) {
+ fprintf(stderr,
+ "ERROR_NOT_ENOUGH_BUFFERS: res %d\n", cqe->res);
+ goto cleanup;
+ }
+ break;
+ case ERROR_EARLY_CLOSE_RECEIVER:
+ if (cqe->res != -ECANCELED) {
+ fprintf(stderr,
+ "ERROR_EARLY_CLOSE_RECEIVER: res %d\n", cqe->res);
+ goto cleanup;
+ }
+ break;
+ case ERROR_NONE:
+ case ERROR_EARLY_CLOSE_SENDER:
+ if (cqe->res != 0) {
+ fprintf(stderr, "early error: res %d\n", cqe->res);
+ goto cleanup;
+ }
+ break;
+ };
+
+ if (cqe->flags & IORING_CQE_F_BUFFER) {
+ fprintf(stderr, "final BUFFER flag set\n");
+ goto cleanup;
+ }
+
+ if (cqe->flags & IORING_CQE_F_MORE) {
+ fprintf(stderr, "final MORE flag set\n");
+ goto cleanup;
+ }
+
+ continue;
+ }
+
+ total_recv_bytes += cqe->res;
+ if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
+ fprintf(stderr, "BUFFER flag not set\n");
+ goto cleanup;
+ }
+ if (!(cqe->flags & IORING_CQE_F_MORE)) {
+ fprintf(stderr, "MORE flag not set\n");
+ goto cleanup;
+ }
+ if (cqe->res % 4 != 0) {
+ /*
+ * doesn't seem to happen in practice, would need some
+ * work to remove this requirement
+ */
+ fprintf(stderr, "unexpectedly aligned buffer cqe->res=%d\n", cqe->res);
+ goto cleanup;
+ }
+
+ /* check buffer arrived in order (for tcp) */
+ this_recv = recv_buffs[cqe->flags >> 16];
+ for (j = 0; args->stream && j < cqe->res / 4; j++) {
+ int sent = *at++;
+ int recv = *this_recv++;
+
+ if (sent != recv) {
+ fprintf(stderr, "recv=%d sent=%d\n", recv, sent);
+ goto cleanup;
+ }
+ }
+ }
+
+ if (args->early_error == ERROR_NONE && total_recv_bytes < total_sent_bytes) {
+ fprintf(stderr,
+ "missing recv: recv=%d sent=%d\n", total_recv_bytes, total_sent_bytes);
+ goto cleanup;
+ }
+
+ /* check the final one */
+ cqe = &recv_cqe[recv_cqes-1];
+
+ ret = 0;
+cleanup:
+ for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
+ free(recv_buffs[i]);
+ close(fds[0]);
+ close(fds[1]);
+ io_uring_queue_exit(&ring);
+
+ return ret;
+}
+
+int main(int argc, char *argv[])
+{
+ int ret;
+ int loop;
+
+ if (argc > 1)
+ return 0;
+
+ for (loop = 0; loop < 32; loop++) {
+ struct args a = {
+ .early_error = loop & 0x03,
+ .stream = loop & 0x04,
+ .recvmsg = loop & 0x08,
+ .wait_each = loop & 0x10,
+ };
+ ret = test(&a);
+ if (ret) {
+ fprintf(stderr,
+ "test stream=%d recvmsg=%d wait_each=%d early_error=%d failed\n",
+ a.stream, a.recvmsg, a.wait_each, a.early_error);
+ return ret;
+ }
+ }
+ return 0;
+}
--
2.30.2
^ permalink raw reply related [flat|nested] 8+ messages in thread