From: Dylan Yudaken <[email protected]>
To: Jens Axboe <[email protected]>,
Pavel Begunkov <[email protected]>,
<[email protected]>
Cc: <[email protected]>, Dylan Yudaken <[email protected]>
Subject: [PATCH liburing 3/4] add recv-multishot test
Date: Tue, 28 Jun 2022 08:04:13 -0700 [thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
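Add a test for multishot receive functionality. The test runs every
combination of recv vs. recvmsg, both socket-pair modes, reaping completions
after each send vs. only at the end, and each early-termination scenario.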
Signed-off-by: Dylan Yudaken <[email protected]>
---
test/Makefile | 1 +
test/recv-multishot.c | 308 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 309 insertions(+)
create mode 100644 test/recv-multishot.c
diff --git a/test/Makefile b/test/Makefile
index e3204a7..73a0001 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -124,6 +124,7 @@ test_srcs := \
read-write.c \
recv-msgall.c \
recv-msgall-stream.c \
+ recv-multishot.c \
register-restrictions.c \
rename.c \
ringbuf-read.c \
diff --git a/test/recv-multishot.c b/test/recv-multishot.c
new file mode 100644
index 0000000..38ec615
--- /dev/null
+++ b/test/recv-multishot.c
@@ -0,0 +1,308 @@
+// SPDX-License-Identifier: MIT
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <pthread.h>
+
+#include "liburing.h"
+#include "helpers.h"
+
+
+enum early_error_t { /* how test() forces the multishot request to end early */
+ ERROR_NONE = 0, /* nothing injected; all sent bytes must be received */
+ ERROR_NOT_ENOUGH_BUFFERS = 1, /* only the first three buffers are provided; final CQE must be -ENOBUFS */
+ ERROR_EARLY_CLOSE_SENDER = 2, /* sending socket is shut down and closed after the third send */
+ ERROR_EARLY_CLOSE_RECEIVER = 3, /* the multishot request is cancelled after the third send; final CQE must be -ECANCELED */
+};
+
+struct args { /* parameters selecting one scenario for test() */
+ bool recvmsg; /* arm the request with io_uring_prep_recvmsg() instead of io_uring_prep_recv() */
+ bool stream; /* forwarded to t_create_socket_pair(); presumably stream vs. datagram sockets - TODO confirm against the helper */
+ bool wait_each; /* wait for and collect completions after every send */
+ enum early_error_t early_error; /* which early-termination scenario to inject, if any */
+};
+
+/*
+ * Run one multishot receive scenario over a socket pair.
+ *
+ * A pool of buffers is provided in buffer group 7, a single multishot
+ * recv (or recvmsg) with buffer selection is armed on fds[0], N messages of
+ * increasing size are sent on fds[1], and every collected completion is
+ * then validated (flags, result, and for stream sockets the payload order).
+ *
+ * @args: scenario selection, see struct args.
+ *
+ * Returns 0 on success. Returns the error from io_uring_queue_init() or
+ * t_create_socket_pair() if setup fails, and -1 for any later failure.
+ */
+static int test(struct args *args)
+{
+	int const N = 8;
+	int const N_BUFFS = N * 64;
+	int const min_cqes = 2;
+	struct io_uring ring;
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe;
+	int fds[2], ret, i, j, total_sent_bytes = 0, total_recv_bytes = 0;
+	int send_buff[256];
+	int *recv_buffs[N_BUFFS];
+	int *at;
+	struct io_uring_cqe recv_cqe[N_BUFFS];
+	int recv_cqes = 0;
+	bool early_error = false;
+	bool early_error_started = false;
+	struct msghdr msg = { };
+	struct __kernel_timespec timeout = {
+		.tv_sec = 1,
+	};
+
+	/* zeroed so that cleanup can free() every slot unconditionally */
+	memset(recv_buffs, 0, sizeof(recv_buffs));
+
+	ret = io_uring_queue_init(32, &ring, 0);
+	if (ret) {
+		fprintf(stderr, "queue init failed: %d\n", ret);
+		return ret;
+	}
+
+	ret = t_create_socket_pair(fds, args->stream);
+	if (ret) {
+		fprintf(stderr, "t_create_socket_pair failed: %d\n", ret);
+		/* the ring is already set up, do not leak it */
+		io_uring_queue_exit(&ring);
+		return ret;
+	}
+
+	/* payload is a simple counting pattern so ordering can be verified */
+	for (i = 0; i < ARRAY_SIZE(send_buff); i++)
+		send_buff[i] = i;
+
+	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
+		/* prepare some different sized buffers */
+		int buffer_size = (i % 2 == 0 && args->stream) ? 1 : N * sizeof(int);
+
+		recv_buffs[i] = malloc(sizeof(*at) * buffer_size);
+		if (!recv_buffs[i]) {
+			fprintf(stderr, "buffer allocation failed\n");
+			ret = -1;
+			goto cleanup;
+		}
+
+		/* starve the request of buffers: only ids 0..2 get provided */
+		if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
+			continue;
+
+		sqe = io_uring_get_sqe(&ring);
+		io_uring_prep_provide_buffers(sqe, recv_buffs[i],
+					buffer_size * sizeof(*recv_buffs[i]), 1, 7, i);
+		/* keep the result so the failure message reports the real error */
+		ret = io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL);
+		if (ret != 0) {
+			fprintf(stderr, "provide buffers failed: %d\n", ret);
+			ret = -1;
+			goto cleanup;
+		}
+		io_uring_cqe_seen(&ring, cqe);
+	}
+
+	/* arm the single multishot receive, tagged with user_data 1234 */
+	sqe = io_uring_get_sqe(&ring);
+	if (args->recvmsg) {
+		memset(&msg, 0, sizeof(msg));
+		msg.msg_namelen = sizeof(struct sockaddr_in);
+		io_uring_prep_recvmsg(sqe, fds[0], &msg, 0);
+	} else {
+		io_uring_prep_recv(sqe, fds[0], NULL, 0, 0);
+	}
+	sqe->flags |= IOSQE_BUFFER_SELECT;
+	sqe->buf_group = 7;
+	/* NOTE(review): flag location follows the uapi header of this series */
+	sqe->addr2 |= IORING_RECV_MULTISHOT;
+	io_uring_sqe_set_data64(sqe, 1234);
+	io_uring_submit(&ring);
+
+	at = &send_buff[0];
+	total_sent_bytes = 0;
+	for (i = 0; i < N; i++) {
+		/* message i carries i+1 ints */
+		int to_send = sizeof(*at) * (i+1);
+
+		total_sent_bytes += to_send;
+		if (send(fds[1], at, to_send, 0) != to_send) {
+			/* a failing send is expected once an early error was injected */
+			if (early_error_started)
+				break;
+			fprintf(stderr, "send failed %d\n", errno);
+			ret = -1;
+			goto cleanup;
+		}
+
+		if (i == 2) {
+			if (args->early_error == ERROR_EARLY_CLOSE_RECEIVER) {
+				/* allow previous sends to complete */
+				usleep(1000);
+
+				/* cancel the multishot request by its user_data */
+				sqe = io_uring_get_sqe(&ring);
+				io_uring_prep_cancel64(sqe, 1234, 0);
+				sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+				io_uring_submit(&ring);
+				early_error_started = true;
+			}
+			if (args->early_error == ERROR_EARLY_CLOSE_SENDER) {
+				early_error_started = true;
+				shutdown(fds[1], SHUT_RDWR);
+				close(fds[1]);
+				/* mark closed so it is never closed a second time */
+				fds[1] = -1;
+			}
+		}
+		at += (i+1);
+
+		if (args->wait_each) {
+			ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
+			if (ret) {
+				fprintf(stderr, "wait_each failed: %d\n", ret);
+				ret = -1;
+				goto cleanup;
+			}
+			while (io_uring_peek_cqe(&ring, &cqe) == 0) {
+				if (recv_cqes >= N_BUFFS) {
+					fprintf(stderr, "too many cqes: %d\n", recv_cqes);
+					ret = -1;
+					goto cleanup;
+				}
+				recv_cqe[recv_cqes++] = *cqe;
+				/* a CQE without MORE means the multishot has ended */
+				if (!(cqe->flags & IORING_CQE_F_MORE))
+					early_error = true;
+				io_uring_cqe_seen(&ring, cqe);
+			}
+			if (early_error)
+				break;
+		}
+	}
+
+	if (fds[1] >= 0) {
+		close(fds[1]);
+		fds[1] = -1;
+	}
+
+	/* allow sends to finish */
+	usleep(1000);
+
+	if ((args->stream && !early_error) || recv_cqes < min_cqes) {
+		ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
+		if (ret && ret != -ETIME) {
+			fprintf(stderr, "wait final failed: %d\n", ret);
+			ret = -1;
+			goto cleanup;
+		}
+	}
+
+	/* collect whatever is left in the completion queue */
+	while (io_uring_peek_cqe(&ring, &cqe) == 0) {
+		if (recv_cqes >= N_BUFFS) {
+			fprintf(stderr, "too many cqes: %d\n", recv_cqes);
+			ret = -1;
+			goto cleanup;
+		}
+		recv_cqe[recv_cqes++] = *cqe;
+		io_uring_cqe_seen(&ring, cqe);
+	}
+
+	/* from here on every early exit is a validation failure */
+	ret = -1;
+	at = &send_buff[0];
+	if (recv_cqes < min_cqes) {
+		fprintf(stderr, "not enough cqes: have=%d vs %d\n", recv_cqes, min_cqes);
+		goto cleanup;
+	}
+	for (i = 0; i < recv_cqes; i++) {
+		bool const is_last = i == recv_cqes - 1;
+		unsigned int bid;
+		int *this_recv;
+
+		cqe = &recv_cqe[i];
+
+		/* terminating completion: only allowed in the last position */
+		if (cqe->res <= 0 || (args->stream && is_last)) {
+			if (!is_last) {
+				fprintf(stderr, "not last cqe had error %d\n", i);
+				goto cleanup;
+			}
+
+			switch (args->early_error) {
+			case ERROR_NOT_ENOUGH_BUFFERS:
+				if (cqe->res != -ENOBUFS) {
+					fprintf(stderr,
+						"ERROR_NOT_ENOUGH_BUFFERS: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_EARLY_CLOSE_RECEIVER:
+				if (cqe->res != -ECANCELED) {
+					fprintf(stderr,
+						"ERROR_EARLY_CLOSE_RECEIVER: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			case ERROR_NONE:
+			case ERROR_EARLY_CLOSE_SENDER:
+				if (cqe->res != 0) {
+					fprintf(stderr, "early error: res %d\n", cqe->res);
+					goto cleanup;
+				}
+				break;
+			}
+
+			if (cqe->flags & IORING_CQE_F_BUFFER) {
+				fprintf(stderr, "final BUFFER flag set\n");
+				goto cleanup;
+			}
+
+			if (cqe->flags & IORING_CQE_F_MORE) {
+				fprintf(stderr, "final MORE flag set\n");
+				goto cleanup;
+			}
+
+			continue;
+		}
+
+		/* data completion: must carry a buffer and announce more to come */
+		total_recv_bytes += cqe->res;
+		if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
+			fprintf(stderr, "BUFFER flag not set\n");
+			goto cleanup;
+		}
+		if (!(cqe->flags & IORING_CQE_F_MORE)) {
+			fprintf(stderr, "MORE flag not set\n");
+			goto cleanup;
+		}
+		if (cqe->res % 4 != 0) {
+			/*
+			 * doesn't seem to happen in practice, would need some
+			 * work to remove this requirement
+			 */
+			fprintf(stderr, "unexpectedly unaligned buffer cqe->res=%d\n", cqe->res);
+			goto cleanup;
+		}
+
+		/* the selected buffer id lives in the upper 16 bits of the flags */
+		bid = cqe->flags >> 16;
+		if (bid >= (unsigned int)N_BUFFS) {
+			fprintf(stderr, "unexpected buffer id %u\n", bid);
+			goto cleanup;
+		}
+
+		/* check buffer arrived in order (for tcp) */
+		this_recv = recv_buffs[bid];
+		for (j = 0; args->stream && j < cqe->res / 4; j++) {
+			int sent = *at++;
+			int recv = *this_recv++;
+
+			if (sent != recv) {
+				fprintf(stderr, "recv=%d sent=%d\n", recv, sent);
+				goto cleanup;
+			}
+		}
+	}
+
+	if (args->early_error == ERROR_NONE && total_recv_bytes < total_sent_bytes) {
+		fprintf(stderr,
+			"missing recv: recv=%d sent=%d\n", total_recv_bytes, total_sent_bytes);
+		goto cleanup;
+	}
+
+	ret = 0;
+cleanup:
+	for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
+		free(recv_buffs[i]);
+	close(fds[0]);
+	if (fds[1] >= 0)
+		close(fds[1]);
+	io_uring_queue_exit(&ring);
+
+	return ret;
+}
+
+/*
+ * Test entry point: runs test() once for each of the 32 parameter
+ * combinations and stops at the first failure, returning its result.
+ * Any command-line argument makes it exit immediately with success.
+ */
+int main(int argc, char *argv[])
+{
+	int combo;
+
+	if (argc > 1)
+		return 0;
+
+	/* bits 0-1: early_error, bit 2: stream, bit 3: recvmsg, bit 4: wait_each */
+	for (combo = 0; combo < 32; combo++) {
+		struct args a = {
+			.early_error = combo & 0x03,
+			.stream = combo & 0x04,
+			.recvmsg = combo & 0x08,
+			.wait_each = combo & 0x10,
+		};
+		int ret = test(&a);
+
+		if (!ret)
+			continue;
+
+		/* report which combination broke before bailing out */
+		fprintf(stderr,
+			"test stream=%d recvmsg=%d wait_each=%d early_error=%d failed\n",
+			a.stream, a.recvmsg, a.wait_each, a.early_error);
+		return ret;
+	}
+
+	return 0;
+}
--
2.30.2
next prev parent reply other threads:[~2022-06-28 15:04 UTC|newest]
Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-06-28 15:04 [PATCH liburing 0/4] liburing: multishot receive Dylan Yudaken
2022-06-28 15:04 ` [PATCH liburing 1/4] add t_create_socket_pair Dylan Yudaken
2022-06-28 15:04 ` [PATCH liburing 2/4] add IORING_RECV_MULTISHOT to io_uring.h Dylan Yudaken
2022-06-28 16:10 ` Ammar Faizi
2022-06-28 16:12 ` Jens Axboe
2022-06-28 16:12 ` Dylan Yudaken
2022-06-28 15:04 ` Dylan Yudaken [this message]
2022-06-28 15:04 ` [PATCH liburing 4/4] add IORING_RECV_MULTISHOT docs Dylan Yudaken
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox