From: Pavel Begunkov <[email protected]>
To: [email protected]
Cc: Jens Axboe <[email protected]>, [email protected]
Subject: [PATCH liburing v2 4/5] examples/send-zc: add the receive part
Date: Sun, 5 Mar 2023 05:13:07 +0000 [thread overview]
Message-ID: <05beb317f14f7903f8edf7be981d17ad1dd46770.1677993039.git.asml.silence@gmail.com> (raw)
In-Reply-To: <[email protected]>
'-R' will switch the benchmark into the server mode accepting data. For
TCP the number of threads should match the number of threads of the
client. For UDP just one thread/connection should be enough.
Signed-off-by: Pavel Begunkov <[email protected]>
---
examples/send-zerocopy.c | 148 +++++++++++++++++++++++++++++++++++++--
1 file changed, 144 insertions(+), 4 deletions(-)
diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
index 683a965..8e1242e 100644
--- a/examples/send-zerocopy.c
+++ b/examples/send-zerocopy.c
@@ -13,6 +13,7 @@
#include <string.h>
#include <pthread.h>
+#include <poll.h>
#include <sched.h>
#include <arpa/inet.h>
#include <linux/if_packet.h>
@@ -52,6 +53,7 @@ struct thread_data {
unsigned long long packets;
unsigned long long bytes;
struct sockaddr_storage dst_addr;
+ int fd;
};
static bool cfg_reg_ringfd = true;
@@ -62,6 +64,7 @@ static bool cfg_fixed_buf = 1;
static bool cfg_hugetlb = 0;
static bool cfg_defer_taskrun = 0;
static int cfg_cpu = -1;
+static bool cfg_rx = 0;
static unsigned cfg_nr_threads = 1;
static int cfg_family = PF_UNSPEC;
@@ -164,6 +167,135 @@ static void setup_sockaddr(int domain, const char *str_addr,
}
}
+/*
+ * Wait in poll(2) on a single descriptor.
+ *
+ * fd:     descriptor to wait on
+ * events: poll event mask to request
+ *
+ * Returns non-zero if at least one of the requested events is set in
+ * revents, zero otherwise (e.g. when only an error/hangup condition was
+ * reported). A poll() failure is handed to t_error(), which is defined
+ * elsewhere in the file (presumably it prints and exits - confirm).
+ */
+static int do_poll(int fd, int events)
+{
+ struct pollfd pfd;
+ int ret;
+
+ pfd.events = events;
+ pfd.revents = 0;
+ pfd.fd = fd;
+
+ /* timeout of -1: block until the descriptor reports something */
+ ret = poll(&pfd, 1, -1);
+ if (ret == -1)
+ t_error(1, errno, "poll");
+
+ return ret && (pfd.revents & events);
+}
+
+/*
+ * Discard pending bytes from the TCP receive queue with one non-blocking
+ * recv() call and account them in @td.
+ *
+ * Returns 1 when recv() returns 0 (end of stream: the peer shut the
+ * connection down) and 0 otherwise, including when nothing is queued
+ * (EAGAIN). Any other recv() error is handed to t_error().
+ */
+static int do_flush_tcp(struct thread_data *td, int fd)
+{
+ int ret;
+
+ /* MSG_TRUNC flushes up to len bytes */
+ /* NULL buffer: data is dropped, not copied; at most 1 << 21 (2 MiB) per call */
+ ret = recv(fd, NULL, 1 << 21, MSG_TRUNC | MSG_DONTWAIT);
+ if (ret == -1 && errno == EAGAIN)
+ return 0;
+ if (ret == -1)
+ t_error(1, errno, "flush");
+ if (!ret)
+ return 1;
+
+ /* one "packet" is counted per successful recv(), whatever its size */
+ td->packets++;
+ td->bytes += ret;
+ return 0;
+}
+
+/*
+ * Receive one pending datagram without blocking, verify its length and
+ * the leading bytes of its payload, and account it in @td.
+ *
+ * td:   per-thread statistics to update
+ * fd:   datagram socket to read from
+ * type: socket type; currently unused
+ *
+ * Returns 0 both when a datagram was consumed and when none was pending
+ * (EAGAIN). A recv() error, an unexpected datagram length or a payload
+ * mismatch is handed to t_error().
+ */
+static int do_flush_datagram(struct thread_data *td, int fd, int type)
+{
+	char buf[64];
+	int ret;
+
+	/* MSG_TRUNC will return full datagram length */
+	ret = recv(fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_TRUNC);
+	if (ret == -1 && errno == EAGAIN)
+		return 0;
+
+	if (ret == -1)
+		t_error(1, errno, "recv");
+	if (ret != cfg_payload_len)
+		t_error(1, 0, "recv: ret=%d != %u", ret, cfg_payload_len);
+
+	/* only the first sizeof(buf) bytes were copied out; compare just those */
+	if ((size_t)ret > sizeof(buf))
+		ret = sizeof(buf);
+	if (memcmp(buf, payload, ret))
+		t_error(1, 0, "recv: data mismatch");
+
+	td->packets++;
+	td->bytes += cfg_payload_len;
+	return 0;
+}
+
+/*
+ * Create and bind the receive socket(s) and store them in threads[].fd
+ * before the receiver threads are started.
+ *
+ * For non-stream sockets the single bound socket is given to threads[0]
+ * and a thread count other than 1 is rejected. For SOCK_STREAM the
+ * socket becomes a listener, one connection is accepted per configured
+ * thread (accept() blocks until each client connects), and the listener
+ * is closed afterwards.
+ *
+ * NOTE(review): @domain is only used for socket(); the bind address is
+ * built from cfg_family. The visible caller passes cfg_family for both.
+ */
+static void do_setup_rx(int domain, int type, int protocol)
+{
+ struct sockaddr_storage addr = {};
+ struct thread_data *td;
+ int listen_fd, fd, i;
+
+ fd = socket(domain, type, protocol);
+ if (fd == -1)
+ t_error(1, errno, "socket r");
+
+ /* 1 << 21 = 2 MiB receive buffer, 1 << 16 = 64 KiB low-water mark */
+ do_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, 1 << 21);
+ do_setsockopt(fd, SOL_SOCKET, SO_RCVLOWAT, 1 << 16);
+ do_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
+
+ setup_sockaddr(cfg_family, str_addr, &addr);
+
+ if (bind(fd, (void *)&addr, cfg_alen))
+ t_error(1, errno, "bind");
+
+ /* datagram case: no listen/accept, the bound socket is used directly */
+ if (type != SOCK_STREAM) {
+ if (cfg_nr_threads != 1)
+ t_error(1, 0, "udp rx cant multithread");
+ threads[0].fd = fd;
+ return;
+ }
+
+ listen_fd = fd;
+ /* backlog sized to the number of connections we are going to accept */
+ if (listen(listen_fd, cfg_nr_threads))
+ t_error(1, errno, "listen");
+
+ /* hand one accepted connection to each receiver thread */
+ for (i = 0; i < cfg_nr_threads; i++) {
+ td = &threads[i];
+
+ fd = accept(listen_fd, NULL, NULL);
+ if (fd == -1)
+ t_error(1, errno, "accept");
+ td->fd = fd;
+ }
+
+ if (close(listen_fd))
+ t_error(1, errno, "close listen sock");
+}
+
+/*
+ * Receiver thread entry point: repeatedly drain td->fd until a flush
+ * helper reports completion (do_flush_tcp() does so at end of stream)
+ * or the deadline has passed, then close the socket.
+ *
+ * arg: pointer to this thread's struct thread_data.
+ *
+ * Terminates the thread via pthread_exit(&td->ret).
+ */
+static void *do_rx(void *arg)
+{
+ struct thread_data *td = arg;
+ /* extra time added on top of the configured runtime before giving up */
+ const int cfg_receiver_wait_ms = 400;
+ uint64_t tstop;
+ int ret, fd = td->fd;
+
+ tstop = gettimeofday_ms() + cfg_runtime_ms + cfg_receiver_wait_ms;
+ do {
+ if (cfg_type == SOCK_STREAM)
+ ret = do_flush_tcp(td, fd);
+ else
+ ret = do_flush_datagram(td, fd, cfg_type);
+
+ if (ret)
+ break;
+
+ /*
+ * NOTE(review): do_poll() waits with no timeout, so the tstop
+ * check below only runs once poll() returns - confirm that a
+ * sender going quiet without closing cannot stall this loop.
+ */
+ do_poll(fd, POLLIN);
+ } while (gettimeofday_ms() < tstop);
+
+ if (close(fd))
+ t_error(1, errno, "close");
+ pthread_exit(&td->ret);
+ /* not reached; pthread_exit() does not return */
+ return NULL;
+}
+
static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
{
struct io_uring_cqe *cqe;
@@ -283,7 +415,7 @@ static void do_tx(struct thread_data *td, int domain, int type, int protocol)
td->bytes += cqe->res;
} else if (cqe->res == -ECONNREFUSED || cqe->res == -EPIPE ||
cqe->res == -ECONNRESET) {
- fprintf(stderr, "Connection failure");
+ fprintf(stderr, "Connection failure\n");
goto out_fail;
} else if (cqe->res != -EAGAIN) {
t_error(1, cqe->res, "send failed");
@@ -316,6 +448,7 @@ static void *do_test(void *arg)
do_tx(td, cfg_family, cfg_type, protocol);
pthread_exit(&td->ret);
+ return NULL;
}
static void usage(const char *filepath)
@@ -338,7 +471,7 @@ static void parse_opts(int argc, char **argv)
cfg_payload_len = max_payload_len;
- while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:")) != -1) {
+ while ((c = getopt(argc, argv, "46D:p:s:t:n:z:b:l:dC:T:R")) != -1) {
switch (c) {
case '4':
if (cfg_family != PF_UNSPEC)
@@ -387,6 +520,9 @@ static void parse_opts(int argc, char **argv)
if (cfg_nr_threads > MAX_THREADS)
t_error(1, 0, "too many threads\n");
break;
+ case 'R':
+ cfg_rx = 1;
+ break;
}
}
@@ -441,8 +577,12 @@ int main(int argc, char **argv)
td->idx = i;
}
+ if (cfg_rx)
+ do_setup_rx(cfg_family, cfg_type, 0);
+
for (i = 0; i < cfg_nr_threads; i++)
- pthread_create(&threads[i].thread, NULL, do_test, td);
+ pthread_create(&threads[i].thread, NULL,
+ !cfg_rx ? do_test : do_rx, &threads[i]);
for (i = 0; i < cfg_nr_threads; i++) {
td = &threads[i];
@@ -451,7 +591,7 @@ int main(int argc, char **argv)
bytes += td->bytes;
}
- fprintf(stderr, "tx=%llu (MB=%llu), tx/s=%llu (MB/s=%llu)\n",
+ fprintf(stderr, "packets=%llu (MB=%llu), rps=%llu (MB/s=%llu)\n",
packets, bytes >> 20,
packets / (cfg_runtime_ms / 1000),
(bytes >> 20) / (cfg_runtime_ms / 1000));
--
2.39.1
next prev parent reply other threads:[~2023-03-05 5:14 UTC|newest]
Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-03-05 5:13 [PATCH liburing v2 0/5] sendzc test improvements Pavel Begunkov
2023-03-05 5:13 ` [PATCH liburing v2 1/5] examples/send-zc: add defer taskrun support Pavel Begunkov
2023-03-05 5:13 ` [PATCH liburing v2 2/5] examples/send-zc: add affinity / CPU pinning Pavel Begunkov
2023-03-05 5:13 ` [PATCH liburing v2 3/5] examples/send-zc: add multithreading Pavel Begunkov
2023-03-05 5:13 ` Pavel Begunkov [this message]
2023-03-05 5:13 ` [PATCH liburing v2 5/5] examples/send-zc: kill sock bufs configuration Pavel Begunkov
2023-03-05 14:35 ` [PATCH liburing v2 0/5] sendzc test improvements Jens Axboe
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=05beb317f14f7903f8edf7be981d17ad1dd46770.1677993039.git.asml.silence@gmail.com \
[email protected] \
[email protected] \
[email protected] \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox