public inbox for [email protected]
 help / color / mirror / Atom feed
From: Pavel Begunkov <[email protected]>
To: [email protected]
Cc: Jens Axboe <[email protected]>, [email protected]
Subject: [PATCH liburing 3/3] examples/send-zc: add the receive part
Date: Wed,  1 Mar 2023 16:10:12 +0000	[thread overview]
Message-ID: <00e887532822d4744741655532be4fbede0f18b0.1677686850.git.asml.silence@gmail.com> (raw)
In-Reply-To: <[email protected]>

'-R' switches the benchmark into server mode, in which it receives data
instead of sending it. For TCP, the number of server threads should match
the number of client threads. For UDP, a single thread/connection should
be enough.

Signed-off-by: Pavel Begunkov <[email protected]>
---
 examples/send-zerocopy.c | 146 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 143 insertions(+), 3 deletions(-)

diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
index c0549a1..75e516c 100644
--- a/examples/send-zerocopy.c
+++ b/examples/send-zerocopy.c
@@ -13,6 +13,7 @@
 #include <string.h>
 #include <pthread.h>
 
+#include <poll.h>
 #include <sched.h>
 #include <arpa/inet.h>
 #include <linux/errqueue.h>
@@ -53,6 +54,7 @@ struct thread_data {
 	unsigned long long packets;
 	unsigned long long bytes;
 	struct sockaddr_storage dst_addr;
+	int fd;
 };
 
 static bool cfg_reg_ringfd = true;
@@ -63,6 +65,7 @@ static bool cfg_fixed_buf = 1;
 static bool cfg_hugetlb = 0;
 static bool cfg_defer_taskrun = 0;
 static int  cfg_cpu = -1;
+static bool cfg_rx = 0;
 static unsigned  cfg_nr_threads = 1;
 
 static int  cfg_family		= PF_UNSPEC;
@@ -165,6 +168,135 @@ static void setup_sockaddr(int domain, const char *str_addr,
 	}
 }
 
+static int do_poll(int fd, int events)
+{
+	struct pollfd pfd;
+	int ret;
+
+	pfd.events = events;
+	pfd.revents = 0;
+	pfd.fd = fd;
+
+	ret = poll(&pfd, 1, -1);
+	if (ret == -1)
+		t_error(1, errno, "poll");
+
+	return ret && (pfd.revents & events);
+}
+
+/* Flush all outstanding bytes for the tcp receive queue */
+static int do_flush_tcp(struct thread_data *td, int fd)
+{
+	int ret;
+
+	/* MSG_TRUNC flushes up to len bytes */
+	ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
+	if (ret == -1 && errno == EAGAIN)
+		return 0;
+	if (ret == -1)
+		t_error(1, errno, "flush");
+	if (!ret)
+		return 1;
+
+	td->packets++;
+	td->bytes += ret;
+	return 0;
+}
+
+/* Flush all outstanding datagrams. Verify first few bytes of each. */
+static int do_flush_datagram(struct thread_data *td, int fd, int type)
+{
+	int ret, off = 0;
+	char buf[64];
+
+	/* MSG_TRUNC will return full datagram length */
+	ret = recv(fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_TRUNC);
+	if (ret == -1 && errno == EAGAIN)
+		return 0;
+
+	if (ret == -1)
+		t_error(1, errno, "recv");
+	if (ret != cfg_payload_len)
+		t_error(1, 0, "recv: ret=%u != %u", ret, cfg_payload_len);
+	if (ret > sizeof(buf) - off)
+		ret = sizeof(buf) - off;
+	if (memcmp(buf + off, payload, ret))
+		t_error(1, 0, "recv: data mismatch");
+
+	td->packets++;
+	td->bytes += cfg_payload_len;
+	return 0;
+}
+
+static void do_setup_rx(int domain, int type, int protocol)
+{
+	struct sockaddr_storage addr = {};
+	struct thread_data *td;
+	int listen_fd, fd, i;
+
+	fd = socket(domain, type, protocol);
+	if (fd == -1)
+		t_error(1, errno, "socket r");
+
+	do_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, 1 << 21);
+	do_setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, 1 << 16);
+	do_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
+
+	setup_sockaddr(cfg_family, str_addr, &addr);
+
+	if (bind(fd, (void *)&addr, cfg_alen))
+		t_error(1, errno, "bind");
+
+	if (type != SOCK_STREAM) {
+		if (cfg_nr_threads != 1)
+			t_error(1, 0, "udp rx cant multithread");
+		threads[0].fd = fd;
+		return;
+	}
+
+	listen_fd = fd;
+	if (listen(listen_fd, cfg_nr_threads))
+		t_error(1, errno, "listen");
+
+	for (i = 0; i < cfg_nr_threads; i++) {
+		td = &threads[i];
+
+		fd = accept(listen_fd, NULL, NULL);
+		if (fd == -1)
+			t_error(1, errno, "accept");
+		td->fd = fd;
+	}
+
+	if (close(listen_fd))
+		t_error(1, errno, "close listen sock");
+}
+
+static void *do_rx(void *arg)
+{
+	struct thread_data *td = arg;
+	const int cfg_receiver_wait_ms = 400;
+	uint64_t tstop;
+	int ret, fd = td->fd;
+
+	tstop = gettimeofday_ms() + cfg_runtime_ms + cfg_receiver_wait_ms;
+	do {
+		if (cfg_type == SOCK_STREAM)
+			ret = do_flush_tcp(td, fd);
+		else
+			ret = do_flush_datagram(td, fd, cfg_type);
+
+		if (ret)
+			break;
+
+		do_poll(fd, POLLIN);
+	} while (gettimeofday_ms() < tstop);
+
+	if (close(fd))
+		t_error(1, errno, "close");
+	pthread_exit(&td->ret);
+	return NULL;
+}
+
 static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
 {
 	struct io_uring_cqe *cqe;
@@ -284,7 +416,7 @@ static void do_tx(struct thread_data *td, int domain, int type, int protocol)
 				td->bytes += cqe->res;
 			} else if (cqe->res == -ECONNREFUSED || cqe->res == -EPIPE ||
 				   cqe->res == -ECONNRESET) {
-				fprintf(stderr, "Connection failure");
+				fprintf(stderr, "Connection failure\n");
 				goto out_fail;
 			} else if (cqe->res != -EAGAIN) {
 				t_error(1, cqe->res, "send failed");
@@ -317,6 +449,7 @@ static void *do_test(void *arg)
 
 	do_tx(td, cfg_family, cfg_type, protocol);
 	pthread_exit(&td->ret);
+	return NULL;
 }
 
 static void usage(const char *filepath)
@@ -339,7 +472,7 @@ static void parse_opts(int argc, char **argv)
 
 	cfg_payload_len = max_payload_len;
 
-	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:")) != -1) {
+	while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:R")) != -1) {
 		switch (c) {
 		case '4':
 			if (cfg_family != PF_UNSPEC)
@@ -388,6 +521,9 @@ static void parse_opts(int argc, char **argv)
 			if (cfg_nr_threads > MAX_THREADS)
 				t_error(1, 0, "too many threads\n");
 			break;
+		case 'R':
+			cfg_rx = 1;
+			break;
 		}
 	}
 
@@ -442,8 +578,12 @@ int main(int argc, char **argv)
 		td->idx = i;
 	}
 
+	if (cfg_rx)
+		do_setup_rx(cfg_family, cfg_type, 0);
+
 	for (i = 0; i < cfg_nr_threads; i++)
-		pthread_create(&threads[i].thread, NULL, do_test, td);
+		pthread_create(&threads[i].thread, NULL,
+				!cfg_rx ? do_test : do_rx, &threads[i]);
 
 	for (i = 0; i < cfg_nr_threads; i++) {
 		td = &threads[i];
-- 
2.39.1


  parent reply	other threads:[~2023-03-01 16:14 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-03-01 16:10 [PATCH liburing 0/3] sendzc test improvements Pavel Begunkov
2023-03-01 16:10 ` [PATCH liburing 1/3] examples/send-zc: add affinity / CPU pinning Pavel Begunkov
2023-03-01 16:10 ` [PATCH liburing 2/3] examples/send-zc: add multithreading Pavel Begunkov
2023-03-01 16:10 ` Pavel Begunkov [this message]
2023-03-01 20:00 ` [PATCH liburing 0/3] sendzc test improvements Jens Axboe
2023-03-01 20:31   ` Pavel Begunkov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=00e887532822d4744741655532be4fbede0f18b0.1677686850.git.asml.silence@gmail.com \
    [email protected] \
    [email protected] \
    [email protected] \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.