public inbox for [email protected]
 help / color / mirror / Atom feed
* [PATCH RFC liburing 0/2] multishot recvmsg
@ 2022-07-08 18:45 Dylan Yudaken
  2022-07-08 18:45 ` [PATCH RFC liburing 1/2] add multishot recvmsg API Dylan Yudaken
  2022-07-08 18:45 ` [PATCH RFC liburing 2/2] add tests for multishot recvmsg Dylan Yudaken
  0 siblings, 2 replies; 3+ messages in thread
From: Dylan Yudaken @ 2022-07-08 18:45 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

This series adds an API (patch 1) and a test (#2) for multishot recvmsg.

I have not included docs yet; before writing them I would like feedback (if
there is any) on the API for handling the result.

Dylan Yudaken (2):
  add multishot recvmsg API
  add tests for multishot recvmsg

 src/include/liburing.h          |  59 ++++++++++++++
 src/include/liburing/io_uring.h |   7 ++
 test/recv-multishot.c           | 137 ++++++++++++++++++++++++++++----
 3 files changed, 189 insertions(+), 14 deletions(-)


base-commit: 5d0e33f50a06db768b1891972daab40732400778
-- 
2.30.2


^ permalink raw reply	[flat|nested] 3+ messages in thread

* [PATCH RFC liburing 1/2] add multishot recvmsg API
  2022-07-08 18:45 [PATCH RFC liburing 0/2] multishot recvmsg Dylan Yudaken
@ 2022-07-08 18:45 ` Dylan Yudaken
  2022-07-08 18:45 ` [PATCH RFC liburing 2/2] add tests for multishot recvmsg Dylan Yudaken
  1 sibling, 0 replies; 3+ messages in thread
From: Dylan Yudaken @ 2022-07-08 18:45 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

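This adds a new API to do multishot recvmsg. This is more complicated than
multishot recv, because each selected buffer starts with a struct
io_uring_recvmsg_out header, followed by space reserved for the source
address, space reserved for control messages, and then the payload. The new
helpers let userspace validate and walk that layout.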

Signed-off-by: Dylan Yudaken <[email protected]>
---
 src/include/liburing.h          | 59 +++++++++++++++++++++++++++++++++
 src/include/liburing/io_uring.h |  7 ++++
 2 files changed, 66 insertions(+)

diff --git a/src/include/liburing.h b/src/include/liburing.h
index d35bfa9..3f18bd2 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -419,6 +419,13 @@ static inline void io_uring_prep_recvmsg(struct io_uring_sqe *sqe, int fd,
 	sqe->msg_flags = flags;
 }
 
+static inline void io_uring_prep_recvmsg_multishot(struct io_uring_sqe *sqe,
+		int fd, struct msghdr *msg, unsigned flags)	/* recvmsg, but multishot */
+{
+	io_uring_prep_recvmsg(sqe, fd, msg, flags);	/* plain recvmsg SQE first... */
+	sqe->ioprio |= IORING_RECV_MULTISHOT;	/* ...then mark it multishot via ioprio */
+}
+
 static inline void io_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd,
 					 const struct msghdr *msg,
 					 unsigned flags)
@@ -685,6 +692,87 @@ static inline void io_uring_prep_recv_multishot(struct io_uring_sqe *sqe,
 	sqe->ioprio |= IORING_RECV_MULTISHOT;
 }
 
+/*
+ * Validate a buffer filled by a multishot recvmsg completion.
+ *
+ * The buffer is laid out as: a struct io_uring_recvmsg_out header, then
+ * msg_namelen bytes reserved for the source address, then msg_controllen
+ * bytes reserved for control messages, then the payload. @buf_len is the
+ * cqe->res of the completion; a negative (error) value is rejected.
+ *
+ * Returns the header, or NULL if @buf_len is too small to hold the header,
+ * the reserved name/control areas and o->payloadlen bytes of payload.
+ */
+static inline struct io_uring_recvmsg_out *io_uring_recvmsg_validate(
+	void *buf, int buf_len, struct msghdr *m)
+{
+	struct io_uring_recvmsg_out *ret;
+	size_t header = m->msg_controllen + m->msg_namelen + sizeof(struct io_uring_recvmsg_out);
+
+	/* test the sign first: a negative int compared with size_t would wrap */
+	if (buf_len < 0 || (size_t)buf_len < header)
+		return NULL;
+	ret = (struct io_uring_recvmsg_out *)buf;
+	/* subtract rather than add so a huge payloadlen cannot overflow */
+	if ((size_t)buf_len - header < ret->payloadlen)
+		return NULL;
+	return ret;
+}
+
+/* The source address starts immediately after the header. */
+static inline void *io_uring_recvmsg_name(struct io_uring_recvmsg_out *o)
+{
+	return (void *)&o[1];
+}
+
+/*
+ * First control message, located after the msg_namelen bytes reserved for
+ * the name. Returns NULL if fewer than sizeof(struct cmsghdr) bytes of
+ * control data were received (o->controllen).
+ */
+static inline struct cmsghdr *io_uring_recvmsg_cmsg_firsthdr(struct io_uring_recvmsg_out *o,
+							     struct msghdr *m)
+{
+	if (o->controllen < sizeof(struct cmsghdr))
+		return NULL;
+	return (struct cmsghdr *)((unsigned char *)io_uring_recvmsg_name(o) + m->msg_namelen);
+}
+
+/*
+ * The payload follows the full reserved name and control areas; @m is
+ * expected to be the msghdr the request was prepared with.
+ */
+static inline void *io_uring_recvmsg_payload(struct io_uring_recvmsg_out *o,
+					     struct msghdr *m)
+{
+	return (void *)((unsigned char *)io_uring_recvmsg_name(o) + m->msg_namelen + m->msg_controllen);
+}
+
+/*
+ * Control message following @cmsg, or NULL if there is none. Only the
+ * o->controllen bytes of control data actually received are parsed; the
+ * rest of the msg_controllen area does not hold control messages.
+ */
+static inline struct cmsghdr *io_uring_recvmsg_cmsg_nexthdr(struct io_uring_recvmsg_out *o,
+							    struct msghdr *m,
+							    struct cmsghdr *cmsg)
+{
+	unsigned char *end;
+
+	if (cmsg->cmsg_len < sizeof(struct cmsghdr))
+		return NULL;
+	/* bound by the control data received, not the space reserved for it */
+	end = (unsigned char *)io_uring_recvmsg_name(o) + m->msg_namelen + o->controllen;
+	cmsg = (struct cmsghdr *)((unsigned char *)cmsg + CMSG_ALIGN(cmsg->cmsg_len));
+
+	if ((unsigned char *)(cmsg + 1) > end)
+		return NULL;
+	if (((unsigned char *)cmsg) + CMSG_ALIGN(cmsg->cmsg_len) > end)
+		return NULL;
+
+	return cmsg;
+}
+
 static inline void io_uring_prep_openat2(struct io_uring_sqe *sqe, int dfd,
 					const char *path, struct open_how *how)
 {
diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index fbf6403..5e111b4 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -573,6 +573,13 @@ struct io_uring_file_index_range {
 	__u64	resv;
 };
 
+struct io_uring_recvmsg_out {	/* header at the start of each multishot recvmsg buffer */
+	__u32 namelen;		/* length of the source address returned */
+	__u32 controllen;	/* bytes of control messages returned */
+	__u32 payloadlen;	/* payload length of this message */
+	__u32 flags;		/* presumably the msghdr msg_flags -- TODO confirm */
+};
+
 /*
  * accept flags stored in sqe->ioprio
  */
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [PATCH RFC liburing 2/2] add tests for multishot recvmsg
  2022-07-08 18:45 [PATCH RFC liburing 0/2] multishot recvmsg Dylan Yudaken
  2022-07-08 18:45 ` [PATCH RFC liburing 1/2] add multishot recvmsg API Dylan Yudaken
@ 2022-07-08 18:45 ` Dylan Yudaken
  1 sibling, 0 replies; 3+ messages in thread
From: Dylan Yudaken @ 2022-07-08 18:45 UTC (permalink / raw)
  To: axboe, asml.silence; +Cc: io-uring, Kernel-team, Dylan Yudaken

Expand the multishot recv test to include recvmsg.
This also checks that the source sockaddr is returned, and that control
messages are delivered properly.

Signed-off-by: Dylan Yudaken <[email protected]>
---
 test/recv-multishot.c | 137 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 123 insertions(+), 14 deletions(-)

diff --git a/test/recv-multishot.c b/test/recv-multishot.c
index 9df8184..b1cc335 100644
--- a/test/recv-multishot.c
+++ b/test/recv-multishot.c
@@ -27,20 +27,42 @@ enum early_error_t {
 struct args {
 	bool stream;
 	bool wait_each;
+	bool recvmsg;
 	enum early_error_t early_error;
 };
 
+static int check_sockaddr(const struct sockaddr_in *in)	/* 0 if AF_INET 127.0.0.1, else -1 */
+{
+	struct in_addr expected;
+	inet_pton(AF_INET, "127.0.0.1", &expected);
+	if (in->sin_family != AF_INET) {	/* host byte order: no htons() */
+		fprintf(stderr, "bad family %d\n", (int)in->sin_family);
+		return -1;
+	}
+	if (memcmp(&expected, &in->sin_addr, sizeof(in->sin_addr))) {
+		char buff[256];
+		const char *addr = inet_ntop(AF_INET, &in->sin_addr, buff, sizeof(buff));
+		fprintf(stderr, "unexpected address %s\n", addr ? addr : "INVALID");
+		return -1;
+	}
+	return 0;
+}
+
 static int test(struct args *args)
 {
 	int const N = 8;
 	int const N_BUFFS = N * 64;
 	int const N_CQE_OVERFLOW = 4;
 	int const min_cqes = 2;
+	int const NAME_LEN = sizeof(struct sockaddr_storage);
+	int const CONTROL_LEN = CMSG_ALIGN(sizeof(struct sockaddr_storage))
+					+ sizeof(struct cmsghdr);
 	struct io_uring ring;
 	struct io_uring_cqe *cqe;
 	struct io_uring_sqe *sqe;
 	int fds[2], ret, i, j, total_sent_bytes = 0, total_recv_bytes = 0;
 	int send_buff[256];
+	int *sent_buffs[N_BUFFS];
 	int *recv_buffs[N_BUFFS];
 	int *at;
 	struct io_uring_cqe recv_cqe[N_BUFFS];
@@ -50,7 +72,7 @@ static int test(struct args *args)
 	struct __kernel_timespec timeout = {
 		.tv_sec = 1,
 	};
-
+	struct msghdr msg;
 
 	memset(recv_buffs, 0, sizeof(recv_buffs));
 
@@ -75,21 +97,39 @@ static int test(struct args *args)
 		return ret;
 	}
 
+	if (!args->stream) {
+		int val = 1;
+		/* force some cmsgs to come back to us */
+		if (setsockopt(fds[0], IPPROTO_IP, IP_RECVORIGDSTADDR, &val, sizeof(val))) {
+			fprintf(stderr, "setsockopt failed %d\n", errno);
+			ret = -1;
+			goto cleanup;
+		}
+	}
+
 	for (i = 0; i < ARRAY_SIZE(send_buff); i++)
 		send_buff[i] = i;
 
 	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
 		/* prepare some different sized buffers */
-		int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N * sizeof(int);
+		int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N;
+		buffer_size *= sizeof(int);
+		if (args->recvmsg) {
+			buffer_size +=
+				sizeof(struct io_uring_recvmsg_out) +
+				NAME_LEN +
+				CONTROL_LEN;
+		}
 
-		recv_buffs[i] = malloc(sizeof(*at) * buffer_size);
+		recv_buffs[i] = malloc(buffer_size);
 
 		if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
 			continue;
 
 		sqe = io_uring_get_sqe(&ring);
 		io_uring_prep_provide_buffers(sqe, recv_buffs[i],
-					buffer_size * sizeof(*recv_buffs[i]), 1, 7, i);
+					buffer_size, 1, 7, i);
+		memset(recv_buffs[i], 0xcc, buffer_size);
 		if (io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL) != 0) {
 			fprintf(stderr, "provide buffers failed: %d\n", ret);
 			ret = -1;
@@ -99,7 +139,15 @@ static int test(struct args *args)
 	}
 
 	sqe = io_uring_get_sqe(&ring);
-	io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
+	if (args->recvmsg) {
+		memset(&msg, 0, sizeof(msg));
+		msg.msg_namelen = NAME_LEN;
+		msg.msg_controllen = CONTROL_LEN;
+
+		io_uring_prep_recvmsg_multishot(sqe, fds[0], &msg, 0);
+	} else {
+		io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
+	}
 	sqe->flags |= IOSQE_BUFFER_SELECT;
 	sqe->buf_group = 7;
 	io_uring_sqe_set_data64(sqe, 1234);
@@ -111,6 +159,7 @@ static int test(struct args *args)
 		int to_send = sizeof(*at) * (i+1);
 
 		total_sent_bytes += to_send;
+		sent_buffs[i] = at;
 		if (send(fds[1], at, to_send, 0) != to_send) {
 			if (early_error_started)
 				break;
@@ -205,6 +254,8 @@ static int test(struct args *args)
 
 
 		if (should_be_last) {
+			int used_res = cqe->res;
+
 			if (!is_last) {
 				fprintf(stderr, "not last cqe had error %d\n", i);
 				goto cleanup;
@@ -234,7 +285,16 @@ static int test(struct args *args)
 				break;
 			case ERROR_NONE:
 			case ERROR_EARLY_CLOSE_SENDER:
-				if (cqe->res != 0) {
+				if (args->recvmsg && (cqe->flags & IORING_CQE_F_BUFFER)) {
+					struct io_uring_recvmsg_out *o =
+						(struct io_uring_recvmsg_out *)recv_buffs[cqe->flags >> 16];
+					if (o->payloadlen != 0) {
+						fprintf(stderr, "early error expected 0 payloadlen, got %u\n",
+							o->payloadlen);
+						goto cleanup;
+					}
+					used_res = 0;
+				} else if (cqe->res != 0) {
 					fprintf(stderr, "early error: res %d\n", cqe->res);
 					goto cleanup;
 				}
@@ -254,7 +314,7 @@ static int test(struct args *args)
 				goto cleanup;
 			}
 
-			if (cqe->res <= 0)
+			if (used_res <= 0)
 				continue;
 		} else {
 			if (!(cqe->flags & IORING_CQE_F_MORE)) {
@@ -268,7 +328,48 @@ static int test(struct args *args)
 			goto cleanup;
 		}
 
+		this_recv = recv_buffs[cqe->flags >> 16];
+
+		if (args->recvmsg) {
+			struct io_uring_recvmsg_out *o = io_uring_recvmsg_validate(
+				this_recv, cqe->res, &msg);
+			if (!o) {
+				fprintf(stderr, "bad recvmsg\n");
+				goto cleanup;
+			}
+			cqe->res = o->payloadlen;
+
+			if (!args->stream) {
+				struct cmsghdr *cmsg;
+				if (o->namelen < sizeof(struct sockaddr_in)) {
+					fprintf(stderr, "bad addr len %u\n",
+						o->namelen);
+					goto cleanup;
+				}
+				if (check_sockaddr((struct sockaddr_in*)io_uring_recvmsg_name(o)))
+					goto cleanup;
+				/* expect exactly one IP_RECVORIGDSTADDR cmsg holding 127.0.0.1 */
+				cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg);
+				if (!cmsg ||
+				    cmsg->cmsg_level != IPPROTO_IP ||
+				    cmsg->cmsg_type != IP_RECVORIGDSTADDR) {
+					fprintf(stderr, "bad cmsg\n");
+					goto cleanup;
+				}
+				if (check_sockaddr((struct sockaddr_in *)CMSG_DATA(cmsg)))
+					goto cleanup;
+				cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg);
+				if (cmsg) {
+					fprintf(stderr, "unexpected extra cmsg\n");
+					goto cleanup;
+				}
+			}
+
+			this_recv = (int*)io_uring_recvmsg_payload(o, &msg);
+		}
+
 		total_recv_bytes += cqe->res;
+
 		if (cqe->res % 4 != 0) {
 			/*
 			 * doesn't seem to happen in practice, would need some
@@ -278,9 +379,19 @@ static int test(struct args *args)
 			goto cleanup;
 		}
 
-		/* check buffer arrived in order (for tcp) */
-		this_recv = recv_buffs[cqe->flags >> 16];
-		for (j = 0; args->stream && j < cqe->res / 4; j++) {
+		/*
+		 * for tcp: check buffer arrived in order
+		 * for udp: the payload size identifies which send it came from
+		 */
+		if (!args->stream) {
+			int sent_idx = cqe->res / (int)sizeof(*at) - 1;
+			if (sent_idx < 0 || sent_idx >= N) {
+				fprintf(stderr, "Bad sent idx: %d\n", sent_idx);
+				goto cleanup;
+			}
+			at = sent_buffs[sent_idx];
+		}
+		for (j = 0; j < cqe->res / 4; j++) {
 			int sent = *at++;
 			int recv = *this_recv++;
 
@@ -297,9 +408,6 @@ static int test(struct args *args)
 		goto cleanup;
 	}
 
-	/* check the final one */
-	cqe = &recv_cqe[recv_cqes-1];
-
 	ret = 0;
 cleanup:
 	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
@@ -320,10 +428,11 @@ int main(int argc, char *argv[])
 	if (argc > 1)
 		return T_EXIT_SKIP;
 
-	for (loop = 0; loop < 4; loop++) {
+	for (loop = 0; loop < 8; loop++) {
 		struct args a = {
 			.stream = loop & 0x01,
 			.wait_each = loop & 0x2,
+			.recvmsg = loop & 0x04,
 		};
 		for (early_error = 0; early_error < ERROR_EARLY_LAST; early_error++) {
 			a.early_error = (enum early_error_t)early_error;
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2022-07-08 18:45 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2022-07-08 18:45 [PATCH RFC liburing 0/2] multishot recvmsg Dylan Yudaken
2022-07-08 18:45 ` [PATCH RFC liburing 1/2] add multishot recvmsg API Dylan Yudaken
2022-07-08 18:45 ` [PATCH RFC liburing 2/2] add tests for multishot recvmsg Dylan Yudaken

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox