* [PATCH liburing v2 0/5] zerocopy send headers and tests
@ 2022-07-25 11:33 Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers Pavel Begunkov
` (6 more replies)
0 siblings, 7 replies; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi
Add zerocopy send headers, helpers and tests
v2:
use T_EXIT_*
fix ptr <-> int conversions for 32-bit arches
slight renaming
get rid of error() in the test
add patch 5/5
Pavel Begunkov (5):
io_uring.h: sync with kernel for zc send and notifiers
liburing: add zc send and notif helpers
tests: add tests for zerocopy send and notifications
examples: add a zerocopy send example
liburing: improve fallocate typecasting
examples/Makefile | 3 +-
examples/send-zerocopy.c | 366 +++++++++++++
src/include/liburing.h | 47 +-
src/include/liburing/io_uring.h | 37 +-
src/liburing.map | 2 +
src/register.c | 20 +
test/Makefile | 1 +
test/send-zerocopy.c | 888 ++++++++++++++++++++++++++++++++
8 files changed, 1358 insertions(+), 6 deletions(-)
create mode 100644 examples/send-zerocopy.c
create mode 100644 test/send-zerocopy.c
--
2.37.0
^ permalink raw reply [flat|nested] 10+ messages in thread
* [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 2/5] liburing: add zc send and notif helpers Pavel Begunkov
` (5 subsequent siblings)
6 siblings, 0 replies; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi
Signed-off-by: Pavel Begunkov <[email protected]>
---
src/include/liburing/io_uring.h | 37 +++++++++++++++++++++++++++++++--
1 file changed, 35 insertions(+), 2 deletions(-)
diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index 99e6963..3953807 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -64,6 +64,10 @@ struct io_uring_sqe {
union {
__s32 splice_fd_in;
__u32 file_index;
+ struct {
+ __u16 notification_idx;
+ __u16 addr_len;
+ };
};
__u64 addr3;
__u64 __pad2[1];
@@ -160,7 +164,8 @@ enum io_uring_op {
IORING_OP_FALLOCATE,
IORING_OP_OPENAT,
IORING_OP_CLOSE,
- IORING_OP_FILES_UPDATE,
+ IORING_OP_RSRC_UPDATE,
+ IORING_OP_FILES_UPDATE = IORING_OP_RSRC_UPDATE,
IORING_OP_STATX,
IORING_OP_READ,
IORING_OP_WRITE,
@@ -187,6 +192,7 @@ enum io_uring_op {
IORING_OP_GETXATTR,
IORING_OP_SOCKET,
IORING_OP_URING_CMD,
+ IORING_OP_SENDZC_NOTIF,
/* this goes last, obviously */
IORING_OP_LAST,
@@ -208,6 +214,7 @@ enum io_uring_op {
#define IORING_TIMEOUT_ETIME_SUCCESS (1U << 5)
#define IORING_TIMEOUT_CLOCK_MASK (IORING_TIMEOUT_BOOTTIME | IORING_TIMEOUT_REALTIME)
#define IORING_TIMEOUT_UPDATE_MASK (IORING_TIMEOUT_UPDATE | IORING_LINK_TIMEOUT_UPDATE)
+
/*
* sqe->splice_flags
* extends splice(2) flags
@@ -254,13 +261,23 @@ enum io_uring_op {
* CQEs on behalf of the same SQE.
*/
#define IORING_RECVSEND_POLL_FIRST (1U << 0)
-#define IORING_RECV_MULTISHOT (1U << 1)
+#define IORING_RECV_MULTISHOT (1U << 1)
+#define IORING_RECVSEND_FIXED_BUF (1U << 2)
+#define IORING_RECVSEND_NOTIF_FLUSH (1U << 3)
/*
* accept flags stored in sqe->ioprio
*/
#define IORING_ACCEPT_MULTISHOT (1U << 0)
+/*
+ * IORING_OP_RSRC_UPDATE flags
+ *
+ * liburing's io_uring_prep_notif_update() stores IORING_RSRC_UPDATE_NOTIF in
+ * sqe->ioprio. Presumably the value selects the resource type being updated
+ * (files vs. notification slots) -- confirm against the kernel side.
+ */
+enum {
+ IORING_RSRC_UPDATE_FILES,
+ IORING_RSRC_UPDATE_NOTIF,
+};
+
/*
* IO completion data structure (Completion Queue Entry)
*/
@@ -426,6 +443,9 @@ enum {
/* register a range of fixed file slots for automatic slot allocation */
IORING_REGISTER_FILE_ALLOC_RANGE = 25,
+ IORING_REGISTER_NOTIFIERS = 26,
+ IORING_UNREGISTER_NOTIFIERS = 27,
+
/* this goes last */
IORING_REGISTER_LAST
};
@@ -472,6 +492,19 @@ struct io_uring_rsrc_update2 {
__u32 resv2;
};
+/*
+ * Description of one notification slot, passed as an array element to
+ * IORING_REGISTER_NOTIFIERS.
+ */
+struct io_uring_notification_slot {
+ /* the tests in this series expect it back as cqe->user_data of the slot's notification CQEs */
+ __u64 tag;
+ /* reserved; presumably must be zero -- confirm against the kernel */
+ __u64 resv[3];
+};
+
+/*
+ * Argument of IORING_REGISTER_NOTIFIERS, as filled in by
+ * io_uring_register_notifications().
+ */
+struct io_uring_notification_register {
+ /* number of entries in the slot array */
+ __u32 nr_slots;
+ __u32 resv;
+ __u64 resv2;
+ /* user pointer to an array of struct io_uring_notification_slot */
+ __u64 data;
+ __u64 resv3;
+};
+
/* Skip updating fd indexes set to this value in the fd table */
#define IORING_REGISTER_FILES_SKIP (-2)
--
2.37.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH liburing v2 2/5] liburing: add zc send and notif helpers
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
2022-07-25 12:35 ` Ammar Faizi
2022-07-25 11:33 ` [PATCH liburing v2 3/5] tests: add tests for zerocopy send and notifications Pavel Begunkov
` (4 subsequent siblings)
6 siblings, 1 reply; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi
Add helpers for notification registration and preparing zerocopy send
requests.
Signed-off-by: Pavel Begunkov <[email protected]>
---
src/include/liburing.h | 42 ++++++++++++++++++++++++++++++++++++++++++
src/liburing.map | 2 ++
src/register.c | 20 ++++++++++++++++++++
3 files changed, 64 insertions(+)
diff --git a/src/include/liburing.h b/src/include/liburing.h
index fc7613d..20cd308 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -189,6 +189,10 @@ int io_uring_register_sync_cancel(struct io_uring *ring,
int io_uring_register_file_alloc_range(struct io_uring *ring,
unsigned off, unsigned len);
+int io_uring_register_notifications(struct io_uring *ring, unsigned nr,
+ struct io_uring_notification_slot *slots);
+int io_uring_unregister_notifications(struct io_uring *ring);
+
/*
* Helper for the peek/wait single cqe functions. Exported because of that,
* but probably shouldn't be used directly in an application.
@@ -677,6 +681,44 @@ static inline void io_uring_prep_send(struct io_uring_sqe *sqe, int sockfd,
sqe->msg_flags = (__u32) flags;
}
+/*
+ * Prepare a zerocopy send request (IORING_OP_SENDZC_NOTIF).
+ *
+ * @sqe:      submission queue entry to fill in
+ * @sockfd:   socket to send on
+ * @buf:      payload
+ * @len:      payload length, truncated to 32 bits
+ * @flags:    MSG_* flags, stored in sqe->msg_flags
+ * @slot_idx: notification slot the request is attached to; the SQE field is
+ *            16 bits wide, so larger values are truncated
+ * @zc_flags: IORING_RECVSEND_* flags, stored in sqe->ioprio
+ */
+static inline void io_uring_prep_sendzc(struct io_uring_sqe *sqe, int sockfd,
+					const void *buf, size_t len, int flags,
+					unsigned slot_idx, unsigned zc_flags)
+{
+	io_uring_prep_rw(IORING_OP_SENDZC_NOTIF, sqe, sockfd, buf, (__u32) len, 0);
+	sqe->msg_flags = (__u32) flags;
+	/*
+	 * Both destination fields are __u16. Narrow explicitly, like the
+	 * neighbouring assignments, so that applications including this
+	 * header with -Wconversion do not get warnings.
+	 */
+	sqe->notification_idx = (__u16) slot_idx;
+	sqe->ioprio = (__u16) zc_flags;
+}
+
+/*
+ * Same as io_uring_prep_sendzc(), but additionally sets
+ * IORING_RECVSEND_FIXED_BUF and stores @buf_idx in sqe->buf_index.
+ */
+static inline void io_uring_prep_sendzc_fixed(struct io_uring_sqe *sqe, int sockfd,
+					      const void *buf, size_t len,
+					      int flags, unsigned slot_idx,
+					      unsigned zc_flags, unsigned buf_idx)
+{
+	const unsigned fixed_flags = zc_flags | IORING_RECVSEND_FIXED_BUF;
+
+	io_uring_prep_sendzc(sqe, sockfd, buf, len, flags, slot_idx, fixed_flags);
+	sqe->buf_index = buf_idx;
+}
+
+/*
+ * Attach a destination address to an already prepared zerocopy send.
+ *
+ * @sqe:       request previously set up with io_uring_prep_sendzc*()
+ * @dest_addr: destination address, stored in sqe->addr2
+ * @addr_len:  size of the address in bytes, stored in sqe->addr_len
+ */
+static inline void io_uring_prep_sendzc_set_addr(struct io_uring_sqe *sqe,
+						 const struct sockaddr *dest_addr,
+						 __u16 addr_len)
+{
+	/*
+	 * Convert via unsigned long so the cast is pointer-sized on 32-bit
+	 * arches. No intermediate (void *) cast: it would only strip the
+	 * const qualifier.
+	 */
+	sqe->addr2 = (unsigned long) dest_addr;
+	sqe->addr_len = addr_len;
+}
+
+/*
+ * Prepare an IORING_OP_RSRC_UPDATE request targeting notification slots.
+ *
+ * @sqe:     submission queue entry to fill in
+ * @new_tag: stored in sqe->addr; 0 to ignore
+ * @offset:  index of the first slot
+ * @nr:      number of slots starting at @offset
+ */
+static inline void io_uring_prep_notif_update(struct io_uring_sqe *sqe,
+					      __u64 new_tag, /* 0 to ignore */
+					      unsigned offset, unsigned nr)
+{
+	/*
+	 * IORING_OP_FILES_UPDATE is only an alias of the same opcode; use the
+	 * generic name since this request does not touch files.
+	 */
+	io_uring_prep_rw(IORING_OP_RSRC_UPDATE, sqe, -1, NULL, nr,
+			 (__u64) offset);
+	sqe->addr = new_tag;
+	sqe->ioprio = (__u16) IORING_RSRC_UPDATE_NOTIF;
+}
+
static inline void io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd,
void *buf, size_t len, int flags)
{
diff --git a/src/liburing.map b/src/liburing.map
index 318d3d7..7d8f143 100644
--- a/src/liburing.map
+++ b/src/liburing.map
@@ -60,4 +60,6 @@ LIBURING_2.3 {
global:
io_uring_register_sync_cancel;
io_uring_register_file_alloc_range;
+ io_uring_register_notifications;
+ io_uring_unregister_notifications;
} LIBURING_2.2;
diff --git a/src/register.c b/src/register.c
index 2b37e5f..7482112 100644
--- a/src/register.c
+++ b/src/register.c
@@ -364,3 +364,23 @@ int io_uring_register_file_alloc_range(struct io_uring *ring,
IORING_REGISTER_FILE_ALLOC_RANGE, &range,
0);
}
+
+/*
+ * Register @nr notification slots described by the @slots array with @ring
+ * through IORING_REGISTER_NOTIFIERS. Returns what the register syscall
+ * wrapper returns.
+ */
+int io_uring_register_notifications(struct io_uring *ring, unsigned nr,
+				    struct io_uring_notification_slot *slots)
+{
+	struct io_uring_notification_register reg = { 0 };
+
+	reg.nr_slots = nr;
+	reg.data = (unsigned long)slots;
+	return __sys_io_uring_register(ring->ring_fd, IORING_REGISTER_NOTIFIERS,
+				       &reg, sizeof(reg));
+}
+
+/*
+ * Drop the notification slots registered with @ring through
+ * IORING_UNREGISTER_NOTIFIERS. Returns what the register syscall wrapper
+ * returns.
+ */
+int io_uring_unregister_notifications(struct io_uring *ring)
+{
+	const int fd = ring->ring_fd;
+
+	return __sys_io_uring_register(fd, IORING_UNREGISTER_NOTIFIERS, NULL, 0);
+}
--
2.37.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH liburing v2 3/5] tests: add tests for zerocopy send and notifications
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 2/5] liburing: add zc send and notif helpers Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 4/5] examples: add a zerocopy send example Pavel Begunkov
` (3 subsequent siblings)
6 siblings, 0 replies; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi
Signed-off-by: Pavel Begunkov <[email protected]>
---
test/Makefile | 1 +
test/send-zerocopy.c | 888 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 889 insertions(+)
create mode 100644 test/send-zerocopy.c
diff --git a/test/Makefile b/test/Makefile
index 8945368..a36ddb3 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -175,6 +175,7 @@ test_srcs := \
xattr.c \
skip-cqe.c \
single-issuer.c \
+ send-zerocopy.c \
# EOL
all_targets :=
diff --git a/test/send-zerocopy.c b/test/send-zerocopy.c
new file mode 100644
index 0000000..6fa0535
--- /dev/null
+++ b/test/send-zerocopy.c
@@ -0,0 +1,888 @@
+/* SPDX-License-Identifier: MIT */
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <assert.h>
+#include <errno.h>
+#include <error.h>
+#include <limits.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <arpa/inet.h>
+#include <linux/errqueue.h>
+#include <linux/if_packet.h>
+#include <linux/ipv6.h>
+#include <linux/socket.h>
+#include <linux/sockios.h>
+#include <net/ethernet.h>
+#include <net/if.h>
+#include <netinet/ip.h>
+#include <netinet/in.h>
+#include <netinet/ip6.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <sys/un.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "liburing.h"
+#include "helpers.h"
+
+#define MAX_MSG 128
+
+#define PORT 10200
+#define HOST "127.0.0.1"
+#define HOSTV6 "::1"
+
+#define NR_SLOTS 5
+#define ZC_TAG 10000
+#define MAX_PAYLOAD 8195
+#define BUFFER_OFFSET 41
+
+#ifndef ARRAY_SIZE
+ #define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0]))
+#endif
+
+/* expected value of cqe->flags for the next notification of each slot */
+static int seqs[NR_SLOTS];
+static char tx_buffer[MAX_PAYLOAD] __attribute__((aligned(4096)));
+static char rx_buffer[MAX_PAYLOAD] __attribute__((aligned(4096)));
+/*
+ * Buffers registered with the ring in main(): entry 0 is the whole page-aligned
+ * tx_buffer, entry 1 is a window into it that starts BUFFER_OFFSET bytes in
+ * and stops 13 bytes before the end, i.e. neither edge is aligned.
+ */
+static struct iovec buffers_iov[] = {
+ { .iov_base = tx_buffer,
+ .iov_len = sizeof(tx_buffer), },
+ { .iov_base = tx_buffer + BUFFER_OFFSET,
+ .iov_len = sizeof(tx_buffer) - BUFFER_OFFSET - 13, },
+};
+
+/* True when @user_data is one of the tags given to the NR_SLOTS notification slots. */
+static inline bool tag_userdata(__u64 user_data)
+{
+	if (user_data < ZC_TAG)
+		return false;
+	return user_data - ZC_TAG < NR_SLOTS;
+}
+
+/* True when peeking the completion queue of @ring reports -EAGAIN, i.e. nothing is pending. */
+static bool check_cq_empty(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe = NULL;
+
+	/* nothing should be there */
+	return io_uring_peek_cqe(ring, &cqe) == -EAGAIN;
+}
+
+/*
+ * Reset the expected sequence numbers and register NR_SLOTS notification
+ * slots, slot N being tagged ZC_TAG + N.
+ */
+static int register_notifications(struct io_uring *ring)
+{
+	struct io_uring_notification_slot slots[NR_SLOTS] = {};
+	int idx = 0;
+
+	memset(seqs, 0, sizeof(seqs));
+	while (idx < NR_SLOTS) {
+		slots[idx].tag = ZC_TAG + idx;
+		idx++;
+	}
+	return io_uring_register_notifications(ring, NR_SLOTS, slots);
+}
+
+/* Unregister the notification slots of @ring and register a fresh set. */
+static int reregister_notifications(struct io_uring *ring)
+{
+	int ret = io_uring_unregister_notifications(ring);
+
+	if (!ret)
+		return register_notifications(ring);
+
+	fprintf(stderr, "unreg notifiers failed %i\n", ret);
+	return ret;
+}
+
+/*
+ * Submit a single one-byte zerocopy send on @sock_tx attached to @slot_idx,
+ * wait for its completion and return cqe->res. Asserts that no other CQE is
+ * left in the queue afterwards.
+ */
+static int do_one(struct io_uring *ring, int sock_tx, int slot_idx)
+{
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+	int res, ret;
+
+	/* no MSG_* flags and no zerocopy flags */
+	io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, 1, 0, slot_idx, 0);
+	sqe->user_data = 1;
+
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+	ret = io_uring_wait_cqe(ring, &cqe);
+	assert(!ret);
+	assert(cqe->user_data == 1);
+	res = cqe->res;
+	io_uring_cqe_seen(ring, cqe);
+	assert(check_cq_empty(ring));
+	return res;
+}
+
+/* A send attached to slot index NR_SLOTS (one past the last slot) must fail with -EINVAL. */
+static int test_invalid_slot(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+	int res = do_one(ring, sock_tx, NR_SLOTS);
+
+	assert(res == -EINVAL);
+	return 0;
+}
+
+/*
+ * Send 100 bytes through slot 0 without flushing: exactly one CQE (the
+ * request's own) is expected, and the peer must receive the full payload.
+ */
+static int test_basic_send(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+	const int payload_size = 100;
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+	int ret;
+
+	/* msg_flags, slot_idx and zc_flags are all zero */
+	io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size, 0, 0, 0);
+	sqe->user_data = 1;
+
+	ret = io_uring_submit(ring);
+	assert(ret == 1);
+	ret = io_uring_wait_cqe(ring, &cqe);
+	assert(!ret);
+	assert(cqe->user_data == 1 && cqe->res >= 0);
+	io_uring_cqe_seen(ring, cqe);
+	assert(check_cq_empty(ring));
+
+	ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+	assert(ret == payload_size);
+	return 0;
+}
+
+/*
+ * Repeatedly send 100 bytes through a random slot with
+ * IORING_RECVSEND_NOTIF_FLUSH set. Every iteration must produce exactly two
+ * CQEs, in either order: the request's own (user_data 1) and the slot's
+ * notification (user_data ZC_TAG + slot_idx) whose flags carry the expected
+ * sequence number. Returns -1 on an unexpected CQE, 0 otherwise.
+ */
+static int test_send_flush(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+ int msg_flags = 0;
+ int slot_idx = 0;
+ unsigned zc_flags = 0;
+ int payload_size = 100;
+ int ret, i, j;
+ int req_cqes, notif_cqes;
+
+ /* now do send+flush, do many times to verify seqs */
+ for (j = 0; j < NR_SLOTS * 5; j++) {
+ zc_flags = IORING_RECVSEND_NOTIF_FLUSH;
+ slot_idx = rand() % NR_SLOTS;
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size,
+ msg_flags, slot_idx, zc_flags);
+ sqe->user_data = 1;
+
+ ret = io_uring_submit(ring);
+ assert(ret == 1);
+
+ /* budget of one CQE of each kind; a second one of either kind trips an assert */
+ req_cqes = notif_cqes = 1;
+ for (i = 0; i < 2; i ++) {
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret);
+
+ if (cqe->user_data == 1) {
+ assert(req_cqes > 0);
+ req_cqes--;
+ assert(cqe->res == payload_size);
+ } else if (cqe->user_data == ZC_TAG + slot_idx) {
+ assert(notif_cqes > 0);
+ notif_cqes--;
+ assert(cqe->res == 0 && cqe->flags == seqs[slot_idx]);
+ seqs[slot_idx]++;
+ } else {
+ fprintf(stderr, "invalid cqe %lu %i\n",
+ (unsigned long)cqe->user_data, cqe->res);
+ return -1;
+ }
+ io_uring_cqe_seen(ring, cqe);
+ }
+ assert(check_cq_empty(ring));
+
+ /* drain the datagram so the receive side does not accumulate data */
+ ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+ assert(ret == payload_size);
+ }
+ return 0;
+}
+
+/*
+ * Attach many one-byte sends to random slots without flushing and check that
+ * only the requests' own CQEs show up. Then flush every slot with one more
+ * send each (its own CQE skipped on success) and expect exactly one
+ * notification CQE per slot carrying the expected sequence number.
+ */
+static int test_multireq_notif(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+ bool slot_seen[NR_SLOTS] = {};
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+ int msg_flags = 0;
+ int slot_idx = 0;
+ unsigned zc_flags = 0;
+ int payload_size = 1;
+ int ret, j, i = 0;
+ int nr = NR_SLOTS * 21;
+
+ /* submit the nr requests in waves of at most nr_per_wave SQEs */
+ while (i < nr) {
+ int nr_per_wave = 23;
+
+ for (j = 0; j < nr_per_wave && i < nr; j++, i++) {
+ slot_idx = rand() % NR_SLOTS;
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size,
+ msg_flags, slot_idx, zc_flags);
+ sqe->user_data = i;
+ }
+ ret = io_uring_submit(ring);
+ assert(ret == j);
+ }
+
+ /* one request CQE per send; user_data < nr rules out notification tags */
+ for (i = 0; i < nr; i++) {
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret);
+ assert(cqe->user_data < nr && cqe->res == payload_size);
+ io_uring_cqe_seen(ring, cqe);
+
+ ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+ assert(ret == payload_size);
+ }
+ assert(check_cq_empty(ring));
+
+ zc_flags = IORING_RECVSEND_NOTIF_FLUSH;
+ for (slot_idx = 0; slot_idx < NR_SLOTS; slot_idx++) {
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size,
+ msg_flags, slot_idx, zc_flags);
+ sqe->user_data = slot_idx;
+ /* just to simplify cqe handling */
+ sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+ }
+ ret = io_uring_submit(ring);
+ assert(ret == NR_SLOTS);
+
+ for (i = 0; i < NR_SLOTS; i++) {
+ /* note: shadows the function-level slot_idx */
+ int slot_idx;
+
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret);
+ assert(tag_userdata(cqe->user_data));
+
+ slot_idx = cqe->user_data - ZC_TAG;
+ assert(!slot_seen[slot_idx]);
+ slot_seen[slot_idx] = true;
+
+ assert(cqe->res == 0 && cqe->flags == seqs[slot_idx]);
+ seqs[slot_idx]++;
+ io_uring_cqe_seen(ring, cqe);
+
+ ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+ assert(ret == payload_size);
+ }
+ assert(check_cq_empty(ring));
+
+ for (i = 0; i < NR_SLOTS; i++)
+ assert(slot_seen[i]);
+ return 0;
+}
+
+/*
+ * Queue many flushing one-byte sends on slot 0, each with its own CQE skipped
+ * on success, so exactly one notification CQE per request is expected. The
+ * notifications are not assumed to arrive in order: their sequence numbers
+ * are folded into a sum and compared with the sum for nr consecutive numbers
+ * starting at seqs[slot_idx].
+ */
+static int test_multi_send_flushing(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+ unsigned zc_flags = IORING_RECVSEND_NOTIF_FLUSH;
+ int msg_flags = 0, slot_idx = 0;
+ int payload_size = 1;
+ int ret, j, i = 0;
+ int nr = NR_SLOTS * 30;
+ unsigned long long check = 0, expected = 0;
+
+ /* submit the nr requests in waves of at most nr_per_wave SQEs */
+ while (i < nr) {
+ int nr_per_wave = 25;
+
+ for (j = 0; j < nr_per_wave && i < nr; j++, i++) {
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_sendzc(sqe, sock_tx, tx_buffer, payload_size,
+ msg_flags, slot_idx, zc_flags);
+ sqe->user_data = 1;
+ sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+ }
+ ret = io_uring_submit(ring);
+ assert(ret == j);
+ }
+
+ for (i = 0; i < nr; i++) {
+ int seq;
+
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret);
+ assert(!cqe->res);
+ assert(tag_userdata(cqe->user_data));
+
+ /* the notification's sequence number is reported in cqe->flags */
+ seq = cqe->flags;
+ check += seq * 100007UL;
+ io_uring_cqe_seen(ring, cqe);
+
+ ret = recv(sock_rx, rx_buffer, payload_size, MSG_TRUNC);
+ assert(ret == payload_size);
+ }
+ assert(check_cq_empty(ring));
+
+ for (i = 0; i < nr; i++)
+ expected += (i + seqs[slot_idx]) * 100007UL;
+ assert(check == expected);
+ seqs[slot_idx] += nr;
+ return 0;
+}
+
+/*
+ * Submit one notification update covering @nr slots starting at @off, wait
+ * for its CQE and return cqe->res. Used by test_update_flush_fail() with
+ * ranges that are expected to be rejected.
+ */
+static int do_one_fail_notif_flush(struct io_uring *ring, int off, int nr)
+{
+ struct io_uring_cqe *cqe;
+ struct io_uring_sqe *sqe;
+ int ret;
+
+ /* a single update request for the caller-supplied range, tag left untouched */
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_notif_update(sqe, 0, off, nr);
+ sqe->user_data = 1;
+ ret = io_uring_submit(ring);
+ assert(ret == 1);
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret && cqe->user_data == 1);
+ ret = cqe->res;
+ io_uring_cqe_seen(ring, cqe);
+ return ret;
+}
+
+/*
+ * Notification updates with an invalid slot range must fail: -EINVAL for
+ * ranges that reach past NR_SLOTS, -EOVERFLOW when offset + nr wraps.
+ */
+static int test_update_flush_fail(struct io_uring *ring)
+{
+ int ret;
+
+ /* single out-of-bounds slot */
+ ret = do_one_fail_notif_flush(ring, NR_SLOTS, 1);
+ assert(ret == -EINVAL);
+
+ /* out-of-bounds range */
+ ret = do_one_fail_notif_flush(ring, 0, NR_SLOTS + 3);
+ assert(ret == -EINVAL);
+ ret = do_one_fail_notif_flush(ring, NR_SLOTS - 1, 2);
+ assert(ret == -EINVAL);
+
+ /* overflow checks, note it's u32 internally */
+ ret = do_one_fail_notif_flush(ring, ~(__u32)0, 1);
+ assert(ret == -EOVERFLOW);
+ ret = do_one_fail_notif_flush(ring, NR_SLOTS - 1, ~(__u32)0);
+ assert(ret == -EOVERFLOW);
+ return 0;
+}
+
+/* Send one byte through @slot_idx with do_one() and drain it from @sock_rx. */
+static void do_one_consume(struct io_uring *ring, int sock_tx, int sock_rx,
+			   int slot_idx)
+{
+	int sent = do_one(ring, sock_tx, slot_idx);
+	int rcvd;
+
+	assert(sent == 1);
+
+	rcvd = recv(sock_rx, rx_buffer, 1, MSG_TRUNC);
+	assert(rcvd == 1);
+}
+
+/*
+ * Flush notification slots through io_uring_prep_notif_update(): the first
+ * slot, the last slot, the last slot again while nothing is attached to it,
+ * and finally a range. Each flushed slot must post a CQE tagged
+ * ZC_TAG + slot whose flags equal the tracked sequence number.
+ */
+static int test_update_flush(struct io_uring *ring, int sock_tx, int sock_rx)
+{
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+ int offset = 1, nr_to_flush = 3;
+ int ret, i, slot_idx;
+
+ /*
+ * Flush will be skipped for unused slots, so attach at least 1 req
+ * to each active notifier / slot
+ */
+ for (slot_idx = 0; slot_idx < NR_SLOTS; slot_idx++)
+ do_one_consume(ring, sock_tx, sock_rx, slot_idx);
+
+ assert(check_cq_empty(ring));
+
+ /* flush first */
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_notif_update(sqe, 0, 0, 1);
+ sqe->user_data = 1;
+ /* the update's own CQE is skipped on success, so the next CQE is the notification */
+ sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+ ret = io_uring_submit(ring);
+ assert(ret == 1);
+
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret && !cqe->res && cqe->user_data == ZC_TAG);
+ assert(cqe->flags == seqs[0]);
+ seqs[0]++;
+ io_uring_cqe_seen(ring, cqe);
+ /* attach a fresh request to slot 0 again */
+ do_one_consume(ring, sock_tx, sock_rx, 0);
+ assert(check_cq_empty(ring));
+
+ /* flush last */
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_notif_update(sqe, 0, NR_SLOTS - 1, 1);
+ sqe->user_data = 1;
+ sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+ ret = io_uring_submit(ring);
+ assert(ret == 1);
+
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret && !cqe->res && cqe->user_data == ZC_TAG + NR_SLOTS - 1);
+ assert(cqe->flags == seqs[NR_SLOTS - 1]);
+ seqs[NR_SLOTS - 1]++;
+ io_uring_cqe_seen(ring, cqe);
+ assert(check_cq_empty(ring));
+
+ /* we left the last slot without attached requests, flush should ignore it */
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_notif_update(sqe, 0, NR_SLOTS - 1, 1);
+ sqe->user_data = 1;
+ ret = io_uring_submit(ring);
+ assert(ret == 1);
+
+ /* no CQE skipping here: only the update's own CQE may appear */
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret && !cqe->res && cqe->user_data == 1);
+ io_uring_cqe_seen(ring, cqe);
+ assert(check_cq_empty(ring));
+
+ /* flush range */
+ sqe = io_uring_get_sqe(ring);
+ io_uring_prep_notif_update(sqe, 0, offset, nr_to_flush);
+ sqe->user_data = 1;
+ sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
+ ret = io_uring_submit(ring);
+ assert(ret == 1);
+
+ /* one notification per slot in the range; their order is not assumed */
+ for (i = 0; i < nr_to_flush; i++) {
+ /* note: shadows the function-level slot_idx */
+ int slot_idx;
+
+ ret = io_uring_wait_cqe(ring, &cqe);
+ assert(!ret && !cqe->res);
+ assert(ZC_TAG + offset <= cqe->user_data &&
+ cqe->user_data < ZC_TAG + offset + nr_to_flush);
+ slot_idx = cqe->user_data - ZC_TAG;
+ assert(cqe->flags == seqs[slot_idx]);
+ seqs[slot_idx]++;
+ io_uring_cqe_seen(ring, cqe);
+ }
+ assert(check_cq_empty(ring));
+ return 0;
+}
+
+/*
+ * Check the notification registration API on a private ring: unregistering
+ * with nothing registered (-ENXIO), registering, double registration
+ * (-EBUSY), unregister followed by a second registration, sends through the
+ * re-registered slots, and registration with an invalid slot pointer
+ * (-EFAULT). Returns 0 on success and non-zero on failure.
+ * NOTE(review): the early "return 1" paths leave the ring initialised; only
+ * the success path calls io_uring_queue_exit().
+ */
+static int test_registration(int sock_tx, int sock_rx)
+{
+ struct io_uring_notification_slot slots[2] = {
+ {.tag = 1}, {.tag = 2},
+ };
+ /* non-NULL address the kernel is expected to reject when it copies the slots */
+ void *invalid_slots = (void *)1UL;
+ struct io_uring ring;
+ int ret, i;
+
+ ret = io_uring_queue_init(4, &ring, 0);
+ if (ret) {
+ fprintf(stderr, "queue init failed: %d\n", ret);
+ return 1;
+ }
+
+ ret = io_uring_unregister_notifications(&ring);
+ if (ret != -ENXIO) {
+ fprintf(stderr, "unregister nothing: %d\n", ret);
+ return 1;
+ }
+
+ ret = io_uring_register_notifications(&ring, 2, slots);
+ if (ret) {
+ fprintf(stderr, "io_uring_register_notifications failed: %d\n", ret);
+ return 1;
+ }
+
+ ret = io_uring_register_notifications(&ring, 2, slots);
+ if (ret != -EBUSY) {
+ fprintf(stderr, "double register: %d\n", ret);
+ return 1;
+ }
+
+ ret = io_uring_unregister_notifications(&ring);
+ if (ret) {
+ fprintf(stderr, "unregister failed: %d\n", ret);
+ return 1;
+ }
+
+ ret = io_uring_register_notifications(&ring, 2, slots);
+ if (ret) {
+ fprintf(stderr, "second register failed: %d\n", ret);
+ return 1;
+ }
+
+ ret = test_invalid_slot(&ring, sock_tx, sock_rx);
+ if (ret) {
+ fprintf(stderr, "test_invalid_slot() failed\n");
+ return ret;
+ }
+
+ /* the re-registered slot 0 must accept sends */
+ for (i = 0; i < 2; i++) {
+ ret = do_one(&ring, sock_tx, 0);
+ assert(ret == 1);
+
+ ret = recv(sock_rx, rx_buffer, 1, MSG_TRUNC);
+ assert(ret == 1);
+ }
+
+ /* tear the ring down with slots still registered, then start from a fresh one */
+ io_uring_queue_exit(&ring);
+ ret = io_uring_queue_init(4, &ring, 0);
+ if (ret) {
+ fprintf(stderr, "queue init failed: %d\n", ret);
+ return 1;
+ }
+
+ ret = io_uring_register_notifications(&ring, 4, invalid_slots);
+ if (ret != -EFAULT) {
+ fprintf(stderr, "io_uring_register_notifications with invalid ptr: %d\n", ret);
+ return 1;
+ }
+
+ io_uring_queue_exit(&ring);
+ return 0;
+}
+
+/*
+ * Create a pair of UDP sockets on PORT for the inet tests.
+ *
+ * @addr:           filled in; on return it holds the loopback destination
+ *                  (HOST or HOSTV6) the client should send to
+ * @sock_client:    out, sending socket
+ * @sock_server:    out, receiving socket bound to PORT
+ * @ipv6:           use AF_INET6 instead of AF_INET
+ * @client_connect: connect() the client to @addr
+ * @msg_zc:         enable SO_ZEROCOPY on the client
+ *
+ * Returns 0 on success and 1 on failure. NOTE(review): sockets created before
+ * a failing step are not closed here.
+ */
+static int prepare_ip(struct sockaddr_storage *addr, int *sock_client, int *sock_server,
+ bool ipv6, bool client_connect, bool msg_zc)
+{
+ int family, addr_size;
+ int ret, val;
+
+ /* first pass: wildcard address for bind(); v6 keeps the all-zero address from memset */
+ memset(addr, 0, sizeof(*addr));
+ if (ipv6) {
+ struct sockaddr_in6 *saddr = (struct sockaddr_in6 *)addr;
+
+ family = AF_INET6;
+ saddr->sin6_family = family;
+ saddr->sin6_port = htons(PORT);
+ addr_size = sizeof(*saddr);
+ } else {
+ struct sockaddr_in *saddr = (struct sockaddr_in *)addr;
+
+ family = AF_INET;
+ saddr->sin_family = family;
+ saddr->sin_port = htons(PORT);
+ saddr->sin_addr.s_addr = htonl(INADDR_ANY);
+ addr_size = sizeof(*saddr);
+ }
+
+ /* server sock setup */
+ *sock_server = socket(family, SOCK_DGRAM, 0);
+ if (*sock_server < 0) {
+ perror("socket");
+ return 1;
+ }
+ val = 1;
+ setsockopt(*sock_server, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
+ ret = bind(*sock_server, (struct sockaddr *)addr, addr_size);
+ if (ret < 0) {
+ perror("bind");
+ return 1;
+ }
+
+ /* second pass: overwrite the address with loopback, used as the destination from here on */
+ if (ipv6) {
+ struct sockaddr_in6 *saddr = (struct sockaddr_in6 *)addr;
+
+ inet_pton(AF_INET6, HOSTV6, &(saddr->sin6_addr));
+ } else {
+ struct sockaddr_in *saddr = (struct sockaddr_in *)addr;
+
+ inet_pton(AF_INET, HOST, &saddr->sin_addr);
+ }
+
+ /* client sock setup */
+ *sock_client = socket(family, SOCK_DGRAM, 0);
+ if (*sock_client < 0) {
+ perror("socket");
+ return 1;
+ }
+ if (client_connect) {
+ ret = connect(*sock_client, (struct sockaddr *)addr, addr_size);
+ if (ret < 0) {
+ perror("connect");
+ return 1;
+ }
+ }
+ if (msg_zc) {
+ val = 1;
+ if (setsockopt(*sock_client, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val))) {
+ perror("setsockopt zc");
+ return 1;
+ }
+ }
+ return 0;
+}
+
+/*
+ * Send @send_size bytes from buffers_iov[@buf_idx] over @sock_client with
+ * zerocopy sends and verify that @sock_server receives exactly that data.
+ *
+ * @fixed_buf:    use io_uring_prep_sendzc_fixed() with registered buffer
+ *                @buf_idx instead of a plain io_uring_prep_sendzc()
+ * @addr:         destination to attach to every request, or NULL to rely on
+ *                the socket being connected
+ * @cork:         split the payload into 5 requests, all but the last one
+ *                carrying MSG_MORE
+ * @mix_register: pick fixed vs. plain buffers at random per request
+ *
+ * Returns 0 on success and 1 on any failure, including a payload mismatch.
+ */
+static int do_test_inet_send(struct io_uring *ring, int sock_client, int sock_server,
+			     bool fixed_buf, struct sockaddr_storage *addr,
+			     size_t send_size, bool cork, bool mix_register,
+			     int buf_idx)
+{
+	const unsigned slot_idx = 0;
+	const unsigned zc_flags = 0;
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int nr_reqs = cork ? 5 : 1;
+	int i, ret;
+	size_t pos;
+	size_t chunk_size = send_size / nr_reqs;
+	/* the last request also carries the division remainder */
+	size_t chunk_size_last = send_size - chunk_size * (nr_reqs - 1);
+	char *buf = buffers_iov[buf_idx].iov_base;
+
+	assert(send_size <= buffers_iov[buf_idx].iov_len);
+	memset(rx_buffer, 0, sizeof(rx_buffer));
+
+	for (i = 0; i < nr_reqs; i++) {
+		bool cur_fixed_buf = fixed_buf;
+		size_t cur_size = chunk_size;
+		int msg_flags = 0;
+
+		if (mix_register)
+			cur_fixed_buf = rand() & 1;
+
+		if (cork && i != nr_reqs - 1)
+			msg_flags = MSG_MORE;
+		if (i == nr_reqs - 1)
+			cur_size = chunk_size_last;
+
+		sqe = io_uring_get_sqe(ring);
+		if (cur_fixed_buf)
+			io_uring_prep_sendzc_fixed(sqe, sock_client,
+					buf + i * chunk_size,
+					cur_size, msg_flags, slot_idx,
+					zc_flags, buf_idx);
+		else
+			io_uring_prep_sendzc(sqe, sock_client,
+					buf + i * chunk_size,
+					cur_size, msg_flags, slot_idx,
+					zc_flags);
+
+		if (addr) {
+			sa_family_t fam = ((struct sockaddr_in *)addr)->sin_family;
+			int addr_len = fam == AF_INET ? sizeof(struct sockaddr_in) :
+							sizeof(struct sockaddr_in6);
+
+			io_uring_prep_sendzc_set_addr(sqe, (const struct sockaddr *)addr,
+						      addr_len);
+		}
+		sqe->user_data = i;
+	}
+
+	ret = io_uring_submit(ring);
+	if (ret != nr_reqs) {
+		fprintf(stderr, "submit failed %i expected %i\n", ret, nr_reqs);
+		return 1;
+	}
+
+	/* completions may arrive in any order; user_data identifies the chunk */
+	for (i = 0; i < nr_reqs; i++) {
+		int expected = chunk_size;
+
+		ret = io_uring_wait_cqe(ring, &cqe);
+		if (ret) {
+			fprintf(stderr, "io_uring_wait_cqe failed %i\n", ret);
+			return 1;
+		}
+		if (cqe->user_data >= nr_reqs) {
+			fprintf(stderr, "invalid user_data\n");
+			return 1;
+		}
+		if (cqe->user_data == nr_reqs - 1)
+			expected = chunk_size_last;
+		if (cqe->res != expected) {
+			fprintf(stderr, "invalid cqe->res %d expected %d\n",
+				cqe->res, expected);
+			return 1;
+		}
+		io_uring_cqe_seen(ring, cqe);
+	}
+
+	ret = recv(sock_server, rx_buffer, send_size, 0);
+	if (ret < 0 || (size_t)ret != send_size) {
+		fprintf(stderr, "recv less than expected or recv failed %i\n", ret);
+		return 1;
+	}
+
+	for (pos = 0; pos < send_size; pos++) {
+		if (buf[pos] == rx_buffer[pos])
+			continue;
+		/*
+		 * Corrupted payload has to fail the test; it used to be
+		 * reported and then ignored. Bytes are printed as unsigned
+		 * to match the %u conversions.
+		 */
+		fprintf(stderr, "botched data, first byte %zu, %u vs %u\n",
+			pos, (unsigned)(unsigned char)buf[pos],
+			(unsigned)(unsigned char)rx_buffer[pos]);
+		return 1;
+	}
+	return 0;
+}
+
+/*
+ * Run do_test_inet_send() over UDP for every combination of socket setup
+ * (IPv4/IPv6, connected or not, SO_ZEROCOPY set or not) and request shape
+ * (fixed buffer, explicit address, small/large payload, corking, mixed
+ * fixed/plain buffers, aligned/unaligned buffer).
+ *
+ * Returns 0 on success and 1 on the first failure.
+ */
+static int test_inet_send(struct io_uring *ring)
+{
+	struct sockaddr_storage addr;
+	int sock_client, sock_server;
+	int ret, j;
+	__u64 i;
+
+	/* each bit of j selects one socket-level option */
+	for (j = 0; j < 8; j++) {
+		bool ipv6 = j & 1;
+		bool client_connect = j & 2;
+		bool msg_zc_set = j & 4;
+
+		ret = prepare_ip(&addr, &sock_client, &sock_server, ipv6,
+				 client_connect, msg_zc_set);
+		if (ret) {
+			fprintf(stderr, "sock prep failed %d\n", ret);
+			return 1;
+		}
+
+		/* each bit of i selects one request-level option */
+		for (i = 0; i < 64; i++) {
+			bool fixed_buf = i & 1;
+			struct sockaddr_storage *addr_arg = (i & 2) ? &addr : NULL;
+			size_t size = (i & 4) ? 137 : 4096;
+			bool cork = i & 8;
+			bool mix_register = i & 16;
+			bool aligned = i & 32;
+			int buf_idx = aligned ? 0 : 1;
+
+			/* mixing only makes sense for multi-request, non-fixed runs */
+			if (mix_register && (!cork || fixed_buf))
+				continue;
+			/* an unconnected socket needs an explicit destination */
+			if (!client_connect && addr_arg == NULL)
+				continue;
+
+			ret = do_test_inet_send(ring, sock_client, sock_server, fixed_buf,
+						addr_arg, size, cork, mix_register,
+						buf_idx);
+			if (ret) {
+				fprintf(stderr, "send failed fixed buf %i, conn %i, addr %i, "
+					"cork %i\n",
+					fixed_buf, client_connect, !!addr_arg,
+					cork);
+				/* do not leak the socket pair on the failure path */
+				close(sock_client);
+				close(sock_server);
+				return 1;
+			}
+		}
+
+		close(sock_client);
+		close(sock_server);
+	}
+	return 0;
+}
+
+/*
+ * Test entry point. Skips when given arguments or when notification
+ * registration reports -EINVAL (no zerocopy send support), otherwise runs
+ * the notification tests over a Unix datagram socket pair and, if buffers
+ * can be registered, the inet send tests.
+ *
+ * Returns T_EXIT_PASS, T_EXIT_SKIP or T_EXIT_FAIL; every failing test maps
+ * to T_EXIT_FAIL regardless of the value the test helper itself returned.
+ */
+int main(int argc, char *argv[])
+{
+	struct io_uring ring;
+	int i, ret, sp[2];
+
+	if (argc > 1)
+		return T_EXIT_SKIP;
+
+	ret = io_uring_queue_init(32, &ring, 0);
+	if (ret) {
+		fprintf(stderr, "queue init failed: %d\n", ret);
+		return T_EXIT_FAIL;
+	}
+
+	ret = register_notifications(&ring);
+	if (ret == -EINVAL) {
+		printf("sendzc is not supported, skip\n");
+		io_uring_queue_exit(&ring);
+		return T_EXIT_SKIP;
+	} else if (ret) {
+		fprintf(stderr, "register notif failed %i\n", ret);
+		return T_EXIT_FAIL;
+	}
+
+	srand((unsigned)time(NULL));
+	for (i = 0; i < sizeof(tx_buffer); i++)
+		tx_buffer[i] = i;
+
+	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sp) != 0) {
+		/* perror() appends ": <error>\n" itself, so no newline here */
+		perror("Failed to create Unix-domain socket pair");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_registration(sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_registration() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_invalid_slot(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_invalid_slot() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_basic_send(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_basic_send() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_send_flush(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_send_flush() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_multireq_notif(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_multireq_notif() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = reregister_notifications(&ring);
+	if (ret) {
+		fprintf(stderr, "reregister notifiers failed %i\n", ret);
+		return T_EXIT_FAIL;
+	}
+	/* retry a few tests after registering notifs */
+	ret = test_invalid_slot(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_invalid_slot() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_multireq_notif(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_multireq_notif2() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_multi_send_flushing(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_multi_send_flushing() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_update_flush_fail(&ring);
+	if (ret) {
+		fprintf(stderr, "test_update_flush_fail() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_update_flush(&ring, sp[0], sp[1]);
+	if (ret) {
+		fprintf(stderr, "test_update_flush() failed\n");
+		return T_EXIT_FAIL;
+	}
+
+	/* the inet tests need both buffers_iov entries registered */
+	ret = t_register_buffers(&ring, buffers_iov, ARRAY_SIZE(buffers_iov));
+	if (ret == T_SETUP_SKIP) {
+		fprintf(stderr, "can't register bufs, skip\n");
+		goto out;
+	} else if (ret != T_SETUP_OK) {
+		fprintf(stderr, "buffer registration failed %i\n", ret);
+		return T_EXIT_FAIL;
+	}
+
+	ret = test_inet_send(&ring);
+	if (ret) {
+		fprintf(stderr, "test_inet_send() failed\n");
+		return T_EXIT_FAIL;
+	}
+out:
+	io_uring_queue_exit(&ring);
+	close(sp[0]);
+	close(sp[1]);
+	return T_EXIT_PASS;
+}
--
2.37.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH liburing v2 4/5] examples: add a zerocopy send example
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
` (2 preceding siblings ...)
2022-07-25 11:33 ` [PATCH liburing v2 3/5] tests: add tests for zerocopy send and notifications Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 5/5] liburing: improve fallocate typecasting Pavel Begunkov
` (2 subsequent siblings)
6 siblings, 0 replies; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi
Signed-off-by: Pavel Begunkov <[email protected]>
---
examples/Makefile | 3 +-
examples/send-zerocopy.c | 366 +++++++++++++++++++++++++++++++++++++++
2 files changed, 368 insertions(+), 1 deletion(-)
create mode 100644 examples/send-zerocopy.c
diff --git a/examples/Makefile b/examples/Makefile
index 8e7067f..1997a31 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -14,7 +14,8 @@ example_srcs := \
io_uring-cp.c \
io_uring-test.c \
link-cp.c \
- poll-bench.c
+ poll-bench.c \
+ send-zerocopy.c
all_targets :=
diff --git a/examples/send-zerocopy.c b/examples/send-zerocopy.c
new file mode 100644
index 0000000..e42aa71
--- /dev/null
+++ b/examples/send-zerocopy.c
@@ -0,0 +1,366 @@
+/* SPDX-License-Identifier: MIT */
+/* based on linux-kernel/tools/testing/selftests/net/msg_zerocopy.c */
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <assert.h>
+#include <errno.h>
+#include <error.h>
+#include <limits.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <arpa/inet.h>
+#include <linux/errqueue.h>
+#include <linux/if_packet.h>
+#include <linux/ipv6.h>
+#include <linux/socket.h>
+#include <linux/sockios.h>
+#include <net/ethernet.h>
+#include <net/if.h>
+#include <netinet/ip.h>
+#include <netinet/in.h>
+#include <netinet/ip6.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <sys/un.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "liburing.h"
+
+#define ZC_TAG 0xfffffffULL /* tag of the notification slot; do_tx() matches it against cqe->user_data */
+#define MAX_SUBMIT_NR 512 /* upper bound accepted for -n (requests per submit) */
+
+static bool cfg_reg_ringfd = true; /* register the ring fd; no option changes it */
+static bool cfg_fixed_files = 1; /* register the socket as fixed file 0; no option changes it */
+static bool cfg_zc = 1; /* -z: zerocopy send (1) or plain io_uring_prep_send (0) */
+static bool cfg_flush = 0; /* -f: set IORING_RECVSEND_NOTIF_FLUSH on every request */
+static int cfg_nr_reqs = 8; /* -n: requests queued per io_uring_submit() */
+static bool cfg_fixed_buf = 1; /* -b: use io_uring_prep_sendzc_fixed() for zerocopy sends */
+
+static int cfg_family = PF_UNSPEC; /* set by -4 / -6 */
+static int cfg_payload_len; /* -s: bytes per send; parse_opts() defaults it to the maximum */
+static int cfg_port = 8000; /* -p: destination port */
+static int cfg_runtime_ms = 4200; /* -t: stored as 200 + seconds * 1000 */
+
+static socklen_t cfg_alen; /* size of the sockaddr type chosen by -4 / -6 */
+static struct sockaddr_storage cfg_dst_addr; /* filled in by setup_sockaddr() */
+
+static char payload[IP_MAXPACKET] __attribute__((aligned(4096))); /* send buffer, filled by do_test() */
+
+/*
+ * Return the current wall-clock time in milliseconds.
+ *
+ * The arithmetic is carried out in 64 bits: where time_t and long are
+ * 32 bits wide, "tv_sec * 1000" overflows. The only caller keeps the
+ * result in a uint64_t deadline, so the wider return type is a drop-in
+ * replacement.
+ */
+static uint64_t gettimeofday_ms(void)
+{
+	struct timeval tv;
+
+	gettimeofday(&tv, NULL);
+	return ((uint64_t) tv.tv_sec * 1000) + ((uint64_t) tv.tv_usec / 1000);
+}
+
+/*
+ * Set the int-valued socket option @optname at @level on @fd.
+ * A failure is fatal: the program exits via error() with the errno
+ * left behind by setsockopt().
+ */
+static void do_setsockopt(int fd, int level, int optname, int val)
+{
+	int rc = setsockopt(fd, level, optname, &val, sizeof(val));
+
+	if (rc == 0)
+		return;
+	error(1, errno, "setsockopt %d.%d: %d", level, optname, val);
+}
+
+/*
+ * Fill @sockaddr with an IPv4 or IPv6 address for @domain.
+ *
+ * The port comes from cfg_port. @str_addr is parsed with inet_pton()
+ * when it is non-NULL; otherwise the address part stays zeroed.
+ * A parse failure or a domain other than PF_INET / PF_INET6 terminates
+ * the program through error().
+ */
+static void setup_sockaddr(int domain, const char *str_addr,
+			   struct sockaddr_storage *sockaddr)
+{
+	if (domain == PF_INET) {
+		struct sockaddr_in *sin = (void *) sockaddr;
+
+		memset(sin, 0, sizeof(*sin));
+		sin->sin_family = AF_INET;
+		sin->sin_port = htons(cfg_port);
+		if (!str_addr)
+			return;
+		if (inet_pton(AF_INET, str_addr, &sin->sin_addr) != 1)
+			error(1, 0, "ipv4 parse error: %s", str_addr);
+		return;
+	}
+
+	if (domain == PF_INET6) {
+		struct sockaddr_in6 *sin6 = (void *) sockaddr;
+
+		memset(sin6, 0, sizeof(*sin6));
+		sin6->sin6_family = AF_INET6;
+		sin6->sin6_port = htons(cfg_port);
+		if (!str_addr)
+			return;
+		if (inet_pton(AF_INET6, str_addr, &sin6->sin6_addr) != 1)
+			error(1, 0, "ipv6 parse error: %s", str_addr);
+		return;
+	}
+
+	error(1, 0, "illegal domain");
+}
+
+/*
+ * Create the sending socket, request a 2 MiB (1 << 21) send buffer and
+ * connect it to cfg_dst_addr. Every failure is fatal. Returns the
+ * connected socket descriptor.
+ */
+static int do_setup_tx(int domain, int type, int protocol)
+{
+	const int sndbuf_bytes = 1 << 21;
+	int sock = socket(domain, type, protocol);
+
+	if (sock == -1)
+		error(1, errno, "socket t");
+
+	do_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, sndbuf_bytes);
+
+	if (connect(sock, (void *) &cfg_dst_addr, cfg_alen) != 0)
+		error(1, errno, "connect");
+	return sock;
+}
+
+/*
+ * Return the next completion without marking it as seen; the caller is
+ * expected to call io_uring_cqe_seen() once it is done with the CQE.
+ *
+ * A CQE that is already posted is returned straight away; otherwise the
+ * function blocks in io_uring_wait_cqe(). A wait failure is fatal.
+ */
+static inline struct io_uring_cqe *wait_cqe_fast(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	unsigned head;
+	int ret;
+
+	/* the loop body returns on its first iteration, i.e. the head CQE */
+	io_uring_for_each_cqe(ring, head, cqe)
+		return cqe;
+
+	ret = io_uring_wait_cqe(ring, &cqe);
+	if (ret)
+		/* liburing reports -errno, error() wants a positive errnum */
+		error(1, -ret, "wait cqe");
+	return cqe;
+}
+
+/*
+ * Run the transmit benchmark on a freshly connected socket.
+ *
+ * Sets up the ring and the optional registrations (notification slot,
+ * fixed file, ring fd, payload buffer), then keeps submitting batches of
+ * cfg_nr_reqs sends until cfg_runtime_ms has elapsed, and finally prints
+ * the packet/byte totals and per-second rates to stderr.
+ *
+ * compl_cqes counts the notification CQEs that are still expected: it
+ * grows by cfg_nr_reqs per batch when flushing is enabled and shrinks for
+ * every ZC_TAG completion and every request that failed with -EAGAIN.
+ *
+ * All setup and submission errors are fatal. liburing calls return
+ * -errno, so the value is negated before it is handed to error().
+ */
+static void do_tx(int domain, int type, int protocol)
+{
+	unsigned long packets = 0;
+	unsigned long bytes = 0;
+	unsigned long runtime_s;
+	struct io_uring ring;
+	struct iovec iov;
+	uint64_t tstop;
+	int i, fd, ret;
+	int compl_cqes = 0;
+
+	fd = do_setup_tx(domain, type, protocol);
+
+	ret = io_uring_queue_init(512, &ring, IORING_SETUP_COOP_TASKRUN);
+	if (ret)
+		error(1, -ret, "io_uring: queue init");
+
+	if (cfg_zc) {
+		struct io_uring_notification_slot b[1] = {{.tag = ZC_TAG}};
+
+		ret = io_uring_register_notifications(&ring, 1, b);
+		if (ret)
+			error(1, -ret, "io_uring: tx ctx registration");
+	}
+	if (cfg_fixed_files) {
+		ret = io_uring_register_files(&ring, &fd, 1);
+		if (ret < 0)
+			error(1, -ret, "io_uring: files registration");
+	}
+	if (cfg_reg_ringfd) {
+		ret = io_uring_register_ring_fd(&ring);
+		if (ret < 0)
+			error(1, -ret, "io_uring: io_uring_register_ring_fd");
+	}
+
+	iov.iov_base = payload;
+	iov.iov_len = cfg_payload_len;
+
+	ret = io_uring_register_buffers(&ring, &iov, 1);
+	if (ret)
+		error(1, -ret, "io_uring: buffer registration");
+
+	tstop = gettimeofday_ms() + cfg_runtime_ms;
+	do {
+		struct io_uring_sqe *sqe;
+		struct io_uring_cqe *cqe;
+		unsigned zc_flags = 0;
+		unsigned buf_idx = 0;
+		unsigned slot_idx = 0;
+		unsigned msg_flags = 0;
+
+		compl_cqes += cfg_flush ? cfg_nr_reqs : 0;
+		if (cfg_flush)
+			zc_flags |= IORING_RECVSEND_NOTIF_FLUSH;
+
+		for (i = 0; i < cfg_nr_reqs; i++) {
+			sqe = io_uring_get_sqe(&ring);
+			if (!sqe)
+				error(1, 0, "io_uring: no free sqe");
+
+			if (!cfg_zc)
+				io_uring_prep_send(sqe, fd, payload,
+						   cfg_payload_len, 0);
+			else if (cfg_fixed_buf)
+				io_uring_prep_sendzc_fixed(sqe, fd, payload,
+							   cfg_payload_len,
+							   msg_flags, slot_idx,
+							   zc_flags, buf_idx);
+			else
+				io_uring_prep_sendzc(sqe, fd, payload,
+						     cfg_payload_len, msg_flags,
+						     slot_idx, zc_flags);
+
+			sqe->user_data = 1;
+			if (cfg_fixed_files) {
+				/* index of the socket in the registered file table */
+				sqe->fd = 0;
+				sqe->flags |= IOSQE_FIXED_FILE;
+			}
+		}
+
+		ret = io_uring_submit(&ring);
+		if (ret != cfg_nr_reqs)
+			/* a short submit is not an errno, only negative values are */
+			error(1, ret < 0 ? -ret : 0, "submit");
+
+		for (i = 0; i < cfg_nr_reqs; i++) {
+			cqe = wait_cqe_fast(&ring);
+
+			if (cqe->user_data == ZC_TAG) {
+				/* notification, not a request completion: don't count it */
+				compl_cqes--;
+				i--;
+			} else if (cqe->user_data != 1) {
+				error(1, 0, "invalid user_data %llu",
+				      (unsigned long long) cqe->user_data);
+			} else if (cqe->res > 0) {
+				packets++;
+				bytes += cqe->res;
+			} else if (cqe->res == -EAGAIN) {
+				/* request failed, don't flush */
+				if (cfg_flush)
+					compl_cqes--;
+			} else if (cqe->res == -ECONNREFUSED ||
+				   cqe->res == -ECONNRESET ||
+				   cqe->res == -EPIPE) {
+				fprintf(stderr, "Connection failure\n");
+				/*
+				 * NOTE(review): this CQE is left unseen and is
+				 * consumed by the drain loop below - confirm
+				 * that this is intended.
+				 */
+				goto out_fail;
+			} else {
+				error(1, -cqe->res, "send failed");
+			}
+
+			io_uring_cqe_seen(&ring, cqe);
+		}
+	} while (gettimeofday_ms() < tstop);
+
+out_fail:
+	shutdown(fd, SHUT_RDWR);
+	if (close(fd))
+		error(1, errno, "close");
+
+	/* runs shorter than a second (e.g. -t0) must not divide by zero */
+	runtime_s = cfg_runtime_ms >= 1000 ? cfg_runtime_ms / 1000 : 1;
+
+	fprintf(stderr, "tx=%lu (MB=%lu), tx/s=%lu (MB/s=%lu)\n",
+		packets, bytes >> 20,
+		packets / runtime_s,
+		(bytes >> 20) / runtime_s);
+
+	/* reap the completions that are still accounted for in compl_cqes */
+	while (compl_cqes) {
+		struct io_uring_cqe *cqe = wait_cqe_fast(&ring);
+
+		io_uring_cqe_seen(&ring, cqe);
+		compl_cqes--;
+	}
+
+	if (cfg_zc) {
+		ret = io_uring_unregister_notifications(&ring);
+		if (ret)
+			error(1, -ret, "io_uring: tx ctx unregistration");
+	}
+	io_uring_queue_exit(&ring);
+}
+
+/*
+ * Fill the whole payload buffer with a repeating 'a'..'z' pattern and
+ * then run the transmit loop.
+ */
+static void do_test(int domain, int type, int protocol)
+{
+	size_t pos = 0;
+
+	while (pos < sizeof(payload)) {
+		payload[pos] = 'a' + (pos % 26);
+		pos++;
+	}
+
+	do_tx(domain, type, protocol);
+}
+
+/*
+ * Print the command-line synopsis and exit with status 1.
+ *
+ * The synopsis lists every option parse_opts() acts on and both
+ * transports that main() accepts.
+ */
+static void usage(const char *filepath)
+{
+	error(1, 0, "Usage: %s [-f] [-n<N>] [-z0] [-b0] [-s<payload size>] "
+		    "(-4|-6) [-p<port>] [-t<time s>] -D<dst_ip> (tcp|udp)",
+	      filepath);
+}
+
+/*
+ * Parse the command line into the cfg_* globals and build cfg_dst_addr.
+ *
+ * Exactly one non-option argument (the transport name, checked by
+ * main()) must follow the options. Invalid input terminates the program
+ * through error() or usage(): an unknown option, both -4 and -6, a batch
+ * size outside 1..MAX_SUBMIT_NR, a payload length outside
+ * 1..max_payload_len, or -f combined with -z0.
+ */
+static void parse_opts(int argc, char **argv)
+{
+	const int max_payload_len = sizeof(payload) -
+				    sizeof(struct ipv6hdr) -
+				    sizeof(struct tcphdr) -
+				    40 /* max tcp options */;
+	int c;
+	char *daddr = NULL;
+
+	if (argc <= 1)
+		usage(argv[0]);
+
+	cfg_payload_len = max_payload_len;
+
+	while ((c = getopt(argc, argv, "46D:p:s:t:n:fz:b:k")) != -1) {
+		switch (c) {
+		case '4':
+			if (cfg_family != PF_UNSPEC)
+				error(1, 0, "Pass one of -4 or -6");
+			cfg_family = PF_INET;
+			cfg_alen = sizeof(struct sockaddr_in);
+			break;
+		case '6':
+			if (cfg_family != PF_UNSPEC)
+				error(1, 0, "Pass one of -4 or -6");
+			cfg_family = PF_INET6;
+			cfg_alen = sizeof(struct sockaddr_in6);
+			break;
+		case 'D':
+			daddr = optarg;
+			break;
+		case 'p':
+			cfg_port = strtoul(optarg, NULL, 0);
+			break;
+		case 's':
+			cfg_payload_len = strtoul(optarg, NULL, 0);
+			break;
+		case 't':
+			/* seconds on the command line, stored with 200ms on top */
+			cfg_runtime_ms = 200 + strtoul(optarg, NULL, 10) * 1000;
+			break;
+		case 'n':
+			cfg_nr_reqs = strtoul(optarg, NULL, 0);
+			break;
+		case 'f':
+			cfg_flush = 1;
+			break;
+		case 'z':
+			cfg_zc = strtoul(optarg, NULL, 0);
+			break;
+		case 'b':
+			cfg_fixed_buf = strtoul(optarg, NULL, 0);
+			break;
+		case 'k':
+			/* still accepted, but nothing acts on it */
+			break;
+		default:
+			/* getopt() has already reported the offending option */
+			usage(argv[0]);
+		}
+	}
+
+	if (cfg_nr_reqs < 1)
+		error(1, 0, "-n: submit batch nr must be at least 1");
+	if (cfg_nr_reqs > MAX_SUBMIT_NR)
+		error(1, 0, "-n: submit batch nr exceeds max (%d)", MAX_SUBMIT_NR);
+	if (cfg_flush && !cfg_zc)
+		error(1, 0, "cfg_flush should be used with zc only");
+	if (cfg_payload_len < 1)
+		error(1, 0, "-s: payload length must be at least 1");
+	if (cfg_payload_len > max_payload_len)
+		error(1, 0, "-s: payload exceeds max (%d)", max_payload_len);
+
+	setup_sockaddr(cfg_family, daddr, &cfg_dst_addr);
+
+	if (optind != argc - 1)
+		usage(argv[0]);
+}
+
+/*
+ * Entry point: parse the options, then map the trailing transport
+ * argument ("tcp" or "udp") to a socket type and run the test. Any other
+ * transport name is a fatal error.
+ */
+int main(int argc, char **argv)
+{
+	const char *proto;
+	int sock_type;
+
+	parse_opts(argc, argv);
+
+	proto = argv[argc - 1];
+	if (strcmp(proto, "tcp") == 0) {
+		sock_type = SOCK_STREAM;
+	} else if (strcmp(proto, "udp") == 0) {
+		sock_type = SOCK_DGRAM;
+	} else {
+		error(1, 0, "unknown cfg_test %s", proto);
+		return 0;
+	}
+
+	do_test(cfg_family, sock_type, 0);
+	return 0;
+}
--
2.37.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH liburing v2 5/5] liburing: improve fallocate typecasting
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
` (3 preceding siblings ...)
2022-07-25 11:33 ` [PATCH liburing v2 4/5] examples: add a zerocopy send example Pavel Begunkov
@ 2022-07-25 11:33 ` Pavel Begunkov
2022-07-25 12:21 ` Ammar Faizi
2022-07-25 12:17 ` [PATCH liburing v2 0/5] zerocopy send headers and tests Ammar Faizi
2022-07-25 15:48 ` Jens Axboe
6 siblings, 1 reply; 10+ messages in thread
From: Pavel Begunkov @ 2022-07-25 11:33 UTC (permalink / raw)
To: io-uring; +Cc: Jens Axboe, asml.silence, Ammar Faizi
Don't double cast int -> ptr -> int in io_uring_prep_fallocate(), assign
len directly.
Signed-off-by: Pavel Begunkov <[email protected]>
---
src/include/liburing.h | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/src/include/liburing.h b/src/include/liburing.h
index 20cd308..cffdabd 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -601,10 +601,9 @@ static inline void io_uring_prep_files_update(struct io_uring_sqe *sqe,
static inline void io_uring_prep_fallocate(struct io_uring_sqe *sqe, int fd,
int mode, off_t offset, off_t len)
{
-
io_uring_prep_rw(IORING_OP_FALLOCATE, sqe, fd,
- (const uintptr_t *) (unsigned long) len,
- (unsigned int) mode, (__u64) offset);
+ 0, (unsigned int) mode, (__u64) offset);
+ sqe->addr = (__u64) len;
}
static inline void io_uring_prep_openat(struct io_uring_sqe *sqe, int dfd,
--
2.37.0
^ permalink raw reply related [flat|nested] 10+ messages in thread
* Re: [PATCH liburing v2 0/5] zerocopy send headers and tests
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
` (4 preceding siblings ...)
2022-07-25 11:33 ` [PATCH liburing v2 5/5] liburing: improve fallocate typecasting Pavel Begunkov
@ 2022-07-25 12:17 ` Ammar Faizi
2022-07-25 15:48 ` Jens Axboe
6 siblings, 0 replies; 10+ messages in thread
From: Ammar Faizi @ 2022-07-25 12:17 UTC (permalink / raw)
To: Pavel Begunkov, io-uring Mailing List; +Cc: Jens Axboe
On 7/25/22 6:33 PM, Pavel Begunkov wrote:
> Add zerocopy send headers, helpers and tests
>
> v2:
> use T_EXIT_*
> fix ptr <-> int conversions for 32 bits arches
> slight renaming
> get rid of error() in the test
> add patch 5/5
>
> Pavel Begunkov (5):
> io_uring.h: sync with kernel for zc send and notifiers
> liburing: add zc send and notif helpers
> tests: add tests for zerocopy send and notifications
> examples: add a zerocopy send example
> liburing: improve fallocate typecasting
I can confirm it compiles fine on all architectures now. Nice.
--
Ammar Faizi
^ permalink raw reply [flat|nested] 10+ messages in thread
* Re: [PATCH liburing v2 5/5] liburing: improve fallocate typecasting
2022-07-25 11:33 ` [PATCH liburing v2 5/5] liburing: improve fallocate typecasting Pavel Begunkov
@ 2022-07-25 12:21 ` Ammar Faizi
0 siblings, 0 replies; 10+ messages in thread
From: Ammar Faizi @ 2022-07-25 12:21 UTC (permalink / raw)
To: Pavel Begunkov, io-uring Mailing List; +Cc: Jens Axboe
On 7/25/22 6:33 PM, Pavel Begunkov wrote:
> Don't double cast int -> ptr -> int in io_uring_prep_fallocate(), assign
> len directly.
>
> Signed-off-by: Pavel Begunkov <[email protected]>
> ---
> src/include/liburing.h | 5 ++---
> 1 file changed, 2 insertions(+), 3 deletions(-)
[...]
> io_uring_prep_rw(IORING_OP_FALLOCATE, sqe, fd,
> - (const uintptr_t *) (unsigned long) len,
> - (unsigned int) mode, (__u64) offset);
> + 0, (unsigned int) mode, (__u64) offset);
nit: instead of 0 it's better to use NULL. No?
> + sqe->addr = (__u64) len;
This one looks simpler than a double cast.
Reviewed-by: Ammar Faizi <[email protected]>
--
Ammar Faizi
^ permalink raw reply [flat|nested] 10+ messages in thread
* Re: [PATCH liburing v2 2/5] liburing: add zc send and notif helpers
2022-07-25 11:33 ` [PATCH liburing v2 2/5] liburing: add zc send and notif helpers Pavel Begunkov
@ 2022-07-25 12:35 ` Ammar Faizi
0 siblings, 0 replies; 10+ messages in thread
From: Ammar Faizi @ 2022-07-25 12:35 UTC (permalink / raw)
To: Pavel Begunkov, io-uring; +Cc: Jens Axboe
On 7/25/22 6:33 PM, Pavel Begunkov wrote:
> Add helpers for notification registration and preparing zerocopy send
> requests.
>
> Signed-off-by: Pavel Begunkov <[email protected]>
> ---
> src/include/liburing.h | 42 ++++++++++++++++++++++++++++++++++++++++++
> src/liburing.map | 2 ++
> src/register.c | 20 ++++++++++++++++++++
> 3 files changed, 64 insertions(+)
[...]
> +static inline void io_uring_prep_notif_update(struct io_uring_sqe *sqe,
> + __u64 new_tag, /* 0 to ignore */
> + unsigned offset, unsigned nr)
> +{
> + io_uring_prep_rw(IORING_OP_FILES_UPDATE, sqe, -1, 0, nr,
> + (__u64)offset);
> + sqe->addr = new_tag;
> + sqe->ioprio = IORING_RSRC_UPDATE_NOTIF;
> +}
The same nit applies here. But overall it looks good.
Reviewed-by: Ammar Faizi <[email protected]>
--
Ammar Faizi
^ permalink raw reply [flat|nested] 10+ messages in thread
* Re: [PATCH liburing v2 0/5] zerocopy send headers and tests
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
` (5 preceding siblings ...)
2022-07-25 12:17 ` [PATCH liburing v2 0/5] zerocopy send headers and tests Ammar Faizi
@ 2022-07-25 15:48 ` Jens Axboe
6 siblings, 0 replies; 10+ messages in thread
From: Jens Axboe @ 2022-07-25 15:48 UTC (permalink / raw)
To: asml.silence, io-uring; +Cc: ammarfaizi2
On Mon, 25 Jul 2022 12:33:17 +0100, Pavel Begunkov wrote:
> Add zerocopy send headers, helpers and tests
>
> v2:
> use T_EXIT_*
> fix ptr <-> int conversions for 32 bits arches
> slight renaming
> get rid of error() in the test
> add patch 5/5
>
> [...]
Applied, thanks!
[1/5] io_uring.h: sync with kernel for zc send and notifiers
(no commit info)
[2/5] liburing: add zc send and notif helpers
(no commit info)
[3/5] tests: add tests for zerocopy send and notifications
(no commit info)
[4/5] examples: add a zerocopy send example
(no commit info)
[5/5] liburing: improve fallocate typecasting
(no commit info)
Best regards,
--
Jens Axboe
^ permalink raw reply [flat|nested] 10+ messages in thread
end of thread, other threads:[~2022-07-25 15:49 UTC | newest]
Thread overview: 10+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2022-07-25 11:33 [PATCH liburing v2 0/5] zerocopy send headers and tests Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 1/5] io_uring.h: sync with kernel for zc send and notifiers Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 2/5] liburing: add zc send and notif helpers Pavel Begunkov
2022-07-25 12:35 ` Ammar Faizi
2022-07-25 11:33 ` [PATCH liburing v2 3/5] tests: add tests for zerocopy send and notifications Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 4/5] examples: add a zerocopy send example Pavel Begunkov
2022-07-25 11:33 ` [PATCH liburing v2 5/5] liburing: improve fallocate typecasting Pavel Begunkov
2022-07-25 12:21 ` Ammar Faizi
2022-07-25 12:17 ` [PATCH liburing v2 0/5] zerocopy send headers and tests Ammar Faizi
2022-07-25 15:48 ` Jens Axboe
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox