* [RFC PATCH 1/2] io_uring: add high-performance IPC channel infrastructure
2026-03-13 13:07 [RFC PATCH 0/2] io_uring: add IPC channel infrastructure Daniel Hodges
@ 2026-03-13 13:07 ` Daniel Hodges
2026-03-13 13:07 ` [RFC PATCH 2/2] selftests/ipc: Add io_uring IPC selftest Daniel Hodges
2026-03-14 13:50 ` [RFC PATCH 0/2] io_uring: add IPC channel infrastructure Daniel Hodges
2 siblings, 0 replies; 6+ messages in thread
From: Daniel Hodges @ 2026-03-13 13:07 UTC (permalink / raw)
To: Jens Axboe; +Cc: Daniel Hodges, Pavel Begunkov, io-uring, linux-kernel
Add a new IPC mechanism built on top of io_uring for message passing
between processes.
Features:
- Shared memory ring buffer for message data transfer
- RCU-based subscriber lookup on send/recv paths
- Broadcast mode (all subscribers receive each message)
- Channel-based design supporting multiple subscribers
- Permission checking modeled on Unix file permissions
- Anonymous file per channel for mmap support
Architecture:
- Two new io_uring opcodes: IPC_SEND and IPC_RECV
- Four new registration commands for channel lifecycle:
CREATE, ATTACH, DETACH, and BUFFERS (BUFFERS is a stub)
- Channels are identified by numeric ID or 64-bit key
- Data is copied between userspace and a kernel ring buffer
via copy_from_user/copy_to_user
- Multicast (round-robin) flag is defined but not yet implemented
Send/recv hot path optimizations:
- Cache the descriptor array base pointer in struct io_ipc_channel
to avoid recomputing it on every send/recv
- Use __copy_from_user_inatomic/__copy_to_user_inatomic with fallback
to the standard copy variants
- Add prefetch/prefetchw hints for descriptor and data access
- In non-broadcast mode, wake only the first receiver instead of
iterating all subscribers
- Simplify error handling in io_ipc_send() using inline returns
- Remove channel pointer caching in request structs, emptying the
send and recv cleanup handlers
Signed-off-by: Daniel Hodges <git@danielhodges.dev>
---
include/linux/io_uring_types.h | 7 +
include/uapi/linux/io_uring.h | 74 +++
io_uring/Kconfig | 14 +
io_uring/Makefile | 1 +
io_uring/io_uring.c | 6 +
io_uring/ipc.c | 1002 ++++++++++++++++++++++++++++++++
io_uring/ipc.h | 161 +++++
io_uring/opdef.c | 19 +
io_uring/register.c | 25 +
9 files changed, 1309 insertions(+)
create mode 100644 io_uring/ipc.c
create mode 100644 io_uring/ipc.h
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index dd1420bfcb73..fbbe51aaca68 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -435,6 +435,13 @@ struct io_ring_ctx {
u32 pers_next;
struct xarray personalities;
+#ifdef CONFIG_IO_URING_IPC
+ /* IPC subscriber tracking - keyed by subscriber_id */
+ struct xarray ipc_subscribers;
+ /* IPC channels created by this ring - keyed by channel_id */
+ struct xarray ipc_channels;
+#endif
+
/* hashed buffered write serialization */
struct io_wq_hash *hash_map;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 17ac1b785440..a5b68bd1a047 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -318,6 +318,8 @@ enum io_uring_op {
IORING_OP_PIPE,
IORING_OP_NOP128,
IORING_OP_URING_CMD128,
+ IORING_OP_IPC_SEND,
+ IORING_OP_IPC_RECV,
/* this goes last, obviously */
IORING_OP_LAST,
@@ -723,6 +725,12 @@ enum io_uring_register_op {
/* register bpf filtering programs */
IORING_REGISTER_BPF_FILTER = 37,
+ /* IPC channel operations */
+ IORING_REGISTER_IPC_CHANNEL_CREATE = 38,
+ IORING_REGISTER_IPC_CHANNEL_ATTACH = 39,
+ IORING_REGISTER_IPC_CHANNEL_DETACH = 40,
+ IORING_REGISTER_IPC_CHANNEL_DESTROY = 41,
+
/* this goes last */
IORING_REGISTER_LAST,
@@ -1057,6 +1065,72 @@ struct io_timespec {
__u64 tv_nsec;
};
+/*
+ * IPC channel support
+ */
+
+/* Flags for IPC channel creation */
+#define IOIPC_F_BROADCAST (1U << 0) /* Broadcast mode (all subscribers receive) */
+#define IOIPC_F_MULTICAST (1U << 1) /* Multicast mode (round-robin delivery) */
+#define IOIPC_F_PRIVATE (1U << 2) /* Private (permissions enforced strictly) */
+
+/* Flags for subscriber attachment */
+#define IOIPC_SUB_SEND (1U << 0) /* Can send to channel */
+#define IOIPC_SUB_RECV (1U << 1) /* Can receive from channel */
+#define IOIPC_SUB_BOTH (IOIPC_SUB_SEND | IOIPC_SUB_RECV)
+
+/* Create IPC channel */
+struct io_uring_ipc_channel_create {
+ __u32 flags; /* IOIPC_F_BROADCAST, IOIPC_F_PRIVATE */
+ __u32 ring_entries; /* Number of message slots */
+ __u32 max_msg_size; /* Maximum message size */
+ __u32 mode; /* Permission bits (like chmod) */
+ __u64 key; /* Unique key for channel (like ftok()) */
+ __u32 channel_id_out; /* Returned channel ID */
+ __u32 reserved[3];
+};
+
+/* Attach to existing channel */
+struct io_uring_ipc_channel_attach {
+ __u32 channel_id; /* Attach by channel ID (when key == 0) */
+ __u32 flags; /* IOIPC_SUB_SEND, IOIPC_SUB_RECV, IOIPC_SUB_BOTH */
+ __u64 key; /* Non-zero: attach by key instead of channel_id */
+ __s32 channel_fd; /* Output: fd for mmap */
+ __u32 local_id_out; /* Output: local subscriber ID */
+ __u64 mmap_offset_out; /* Output: offset for mmap() */
+ __u32 region_size; /* Output: size of shared region */
+ __u32 reserved[3];
+};
+
+/* Message descriptor in the ring */
+struct io_uring_ipc_msg_desc {
+ __u64 offset; /* Offset in data region for message payload */
+ __u32 len; /* Message length */
+ __u32 msg_id; /* Unique message ID */
+ __u64 sender_data; /* Sender's user_data for context */
+};
+
+/* Shared ring structure (mmap'd to userspace) */
+struct io_uring_ipc_ring {
+ /* Cache-aligned producer/consumer positions */
+ struct {
+ __u32 head __attribute__((aligned(64)));
+ __u32 tail;
+ } producer;
+
+ struct {
+ __u32 head __attribute__((aligned(64)));
+ } consumer;
+
+ /* Ring parameters */
+ __u32 ring_mask;
+ __u32 ring_entries;
+ __u32 max_msg_size;
+
+ /* Array of message descriptors follows */
+ struct io_uring_ipc_msg_desc msgs[];
+};
+
#ifdef __cplusplus
}
#endif
diff --git a/io_uring/Kconfig b/io_uring/Kconfig
index a7ae23cf1035..d7d7f858b46c 100644
--- a/io_uring/Kconfig
+++ b/io_uring/Kconfig
@@ -14,3 +14,17 @@ config IO_URING_BPF
def_bool y
depends on BPF
depends on NET
+
+config IO_URING_IPC
+ bool "io_uring IPC support"
+ depends on IO_URING
+ default y
+ help
+ Enable io_uring-based inter-process communication.
+
+ This provides high-performance IPC channels that leverage
+ io_uring's shared memory infrastructure and async notification
+ model. Supports broadcast and multicast patterns with
+ zero-copy capabilities.
+
+ If unsure, say Y.
diff --git a/io_uring/Makefile b/io_uring/Makefile
index 931f9156132a..380fe8a19174 100644
--- a/io_uring/Makefile
+++ b/io_uring/Makefile
@@ -17,6 +17,7 @@ obj-$(CONFIG_IO_URING) += io_uring.o opdef.o kbuf.o rsrc.o notif.o \
query.o
obj-$(CONFIG_IO_URING_ZCRX) += zcrx.o
+obj-$(CONFIG_IO_URING_IPC) += ipc.o
obj-$(CONFIG_IO_WQ) += io-wq.o
obj-$(CONFIG_FUTEX) += futex.o
obj-$(CONFIG_EPOLL) += epoll.o
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 9a37035e76c0..6dccc005aaca 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -87,6 +87,7 @@
#include "msg_ring.h"
#include "memmap.h"
#include "zcrx.h"
+#include "ipc.h"
#include "timeout.h"
#include "poll.h"
@@ -251,6 +252,10 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
init_waitqueue_head(&ctx->sqo_sq_wait);
INIT_LIST_HEAD(&ctx->sqd_list);
INIT_LIST_HEAD(&ctx->cq_overflow_list);
+#ifdef CONFIG_IO_URING_IPC
+ xa_init(&ctx->ipc_subscribers);
+ xa_init(&ctx->ipc_channels);
+#endif
ret = io_alloc_cache_init(&ctx->apoll_cache, IO_POLL_ALLOC_CACHE_MAX,
sizeof(struct async_poll), 0);
ret |= io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
@@ -2154,6 +2159,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_sqe_buffers_unregister(ctx);
io_sqe_files_unregister(ctx);
io_unregister_zcrx_ifqs(ctx);
+ io_ipc_ctx_cleanup(ctx);
io_cqring_overflow_kill(ctx);
io_eventfd_unregister(ctx);
io_free_alloc_caches(ctx);
diff --git a/io_uring/ipc.c b/io_uring/ipc.c
new file mode 100644
index 000000000000..3f4daf75c843
--- /dev/null
+++ b/io_uring/ipc.c
@@ -0,0 +1,1002 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * io_uring IPC channel implementation
+ *
+ * High-performance inter-process communication using io_uring infrastructure.
+ * Provides broadcast and multicast channels with zero-copy support.
+ */
+#include <linux/kernel.h>
+#include <linux/errno.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/mm.h>
+#include <linux/slab.h>
+#include <linux/uaccess.h>
+#include <linux/io_uring.h>
+#include <linux/io_uring/cmd.h>
+#include <linux/hashtable.h>
+#include <linux/anon_inodes.h>
+#include <linux/mman.h>
+#include <linux/vmalloc.h>
+
+#include <uapi/linux/io_uring.h>
+
+#include "io_uring.h"
+#include "rsrc.h"
+#include "memmap.h"
+#include "ipc.h"
+
+#ifdef CONFIG_IO_URING_IPC
+
+/*
+ * Global channel registry
+ * Protected by RCU and spinlock
+ */
+static DEFINE_HASHTABLE(channel_hash, 8);
+static DEFINE_SPINLOCK(channel_hash_lock);
+static DEFINE_XARRAY_ALLOC(channel_xa);
+static atomic_t ipc_global_sub_id = ATOMIC_INIT(0);
+
+static int ipc_channel_mmap(struct file *file, struct vm_area_struct *vma)
+{
+ struct io_ipc_channel *channel;
+ size_t region_size;
+ unsigned long uaddr = vma->vm_start;
+ unsigned long size = vma->vm_end - vma->vm_start;
+ void *kaddr;
+ unsigned long pfn;
+ int ret;
+
+ /*
+ * Safely acquire a channel reference. A concurrent
+ * io_ipc_channel_destroy() can NULL private_data and drop
+ * all references; the channel is freed via call_rcu(), so
+ * it stays accessible under rcu_read_lock().
+ */
+ rcu_read_lock();
+ channel = READ_ONCE(file->private_data);
+ if (!channel || !refcount_inc_not_zero(&channel->ref_count)) {
+ rcu_read_unlock();
+ return -EINVAL;
+ }
+ rcu_read_unlock();
+
+ region_size = io_region_size(channel->region);
+ kaddr = channel->region->ptr;
+
+ /* Validate mmap parameters */
+ if (vma->vm_pgoff != 0) {
+ ret = -EINVAL;
+ goto out_put;
+ }
+
+ if (size > region_size) {
+ ret = -EINVAL;
+ goto out_put;
+ }
+
+ if (vma->vm_flags & VM_WRITE) {
+ ret = -EACCES;
+ goto out_put;
+ }
+
+ /* Prevent mprotect from later adding write permission */
+ vm_flags_clear(vma, VM_MAYWRITE);
+
+ /* Map the vmalloc'd region page by page */
+ vm_flags_set(vma, VM_DONTEXPAND | VM_DONTDUMP);
+
+ while (size > 0) {
+ pfn = vmalloc_to_pfn(kaddr);
+ ret = remap_pfn_range(vma, uaddr, pfn, PAGE_SIZE, vma->vm_page_prot);
+ if (ret)
+ goto out_put;
+
+ uaddr += PAGE_SIZE;
+ kaddr += PAGE_SIZE;
+ size -= PAGE_SIZE;
+ }
+
+ ret = 0;
+out_put:
+ io_ipc_channel_put(channel);
+ return ret;
+}
+
+static int ipc_channel_release(struct inode *inode, struct file *file)
+{
+ struct io_ipc_channel *channel = file->private_data;
+
+ if (channel)
+ io_ipc_channel_put(channel);
+
+ return 0;
+}
+
+static const struct file_operations ipc_channel_fops = {
+ .mmap = ipc_channel_mmap,
+ .release = ipc_channel_release,
+ .llseek = noop_llseek,
+};
+
+
+static int ipc_calc_region_size(u32 ring_entries, u32 max_msg_size,
+ size_t *ring_size_out, size_t *data_size_out,
+ size_t *total_size_out)
+{
+ size_t ring_size, data_size, total_size;
+
+ ring_size = sizeof(struct io_ipc_ring) +
+ ring_entries * sizeof(struct io_ipc_msg_desc);
+ ring_size = ALIGN(ring_size, PAGE_SIZE);
+
+ if ((u64)ring_entries * max_msg_size > INT_MAX)
+ return -EINVAL;
+
+ data_size = (size_t)ring_entries * max_msg_size;
+ data_size = ALIGN(data_size, PAGE_SIZE);
+
+ total_size = ring_size + data_size;
+
+ if (total_size > INT_MAX)
+ return -EINVAL;
+
+ *ring_size_out = ring_size;
+ *data_size_out = data_size;
+ *total_size_out = total_size;
+
+ return 0;
+}
+
+static int ipc_region_alloc(struct io_ipc_channel *channel, u32 ring_entries,
+ u32 max_msg_size)
+{
+ struct io_mapped_region *region;
+ size_t ring_size, data_size, total_size;
+ void *ptr;
+ int ret;
+
+ ret = ipc_calc_region_size(ring_entries, max_msg_size, &ring_size,
+ &data_size, &total_size);
+ if (ret)
+ return ret;
+
+ region = kzalloc(sizeof(*region), GFP_KERNEL);
+ if (!region)
+ return -ENOMEM;
+
+ ptr = vmalloc_user(total_size);
+ if (!ptr) {
+ kfree(region);
+ return -ENOMEM;
+ }
+
+ region->ptr = ptr;
+ region->nr_pages = (total_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+ region->flags = 0;
+
+ channel->region = region;
+ channel->ring = (struct io_ipc_ring *)ptr;
+ channel->desc_array = (struct io_ipc_msg_desc *)((u8 *)ptr + sizeof(struct io_ipc_ring));
+ channel->data_region = ptr + ring_size;
+
+ channel->ring->ring_mask = ring_entries - 1;
+ channel->ring->ring_entries = ring_entries;
+ channel->ring->max_msg_size = max_msg_size;
+
+ return 0;
+}
+
+static void ipc_region_free(struct io_ipc_channel *channel)
+{
+ if (!channel->region)
+ return;
+
+ if (channel->region->ptr)
+ vfree(channel->region->ptr);
+ kfree(channel->region);
+ channel->region = NULL;
+ channel->ring = NULL;
+ channel->data_region = NULL;
+}
+
+static void io_ipc_channel_free(struct io_ipc_channel *channel)
+{
+ struct io_ipc_subscriber *sub;
+ unsigned long index;
+
+ xa_for_each(&channel->subscribers, index, sub)
+ kfree(sub);
+ xa_destroy(&channel->subscribers);
+
+ if (channel->file) {
+ /*
+ * Prevent the deferred fput release callback from
+ * accessing the channel after it has been freed.
+ */
+ WRITE_ONCE(channel->file->private_data, NULL);
+ fput(channel->file);
+ }
+
+ ipc_region_free(channel);
+
+ kfree(channel);
+}
+
+static void io_ipc_channel_free_rcu(struct rcu_head *rcu)
+{
+ struct io_ipc_channel *channel;
+
+ channel = container_of(rcu, struct io_ipc_channel, rcu);
+ io_ipc_channel_free(channel);
+}
+
+void io_ipc_channel_put(struct io_ipc_channel *channel)
+{
+ if (refcount_dec_and_test(&channel->ref_count)) {
+ /*
+ * Remove from global data structures before scheduling
+ * the RCU callback. Lookups use refcount_inc_not_zero()
+ * so no new references can appear. Removing here avoids
+ * taking xa_lock (a plain spinlock) inside the RCU
+ * callback which runs in softirq context — that would
+ * deadlock against xa_alloc() in process context.
+ */
+ spin_lock_bh(&channel_hash_lock);
+ if (!hlist_unhashed(&channel->hash_node))
+ hash_del_rcu(&channel->hash_node);
+ spin_unlock_bh(&channel_hash_lock);
+
+ xa_erase(&channel_xa, channel->channel_id);
+
+ call_rcu(&channel->rcu, io_ipc_channel_free_rcu);
+ }
+}
+
+struct io_ipc_channel *io_ipc_channel_get(u32 channel_id)
+{
+ struct io_ipc_channel *channel;
+
+ rcu_read_lock();
+ channel = xa_load(&channel_xa, channel_id);
+ if (channel && !refcount_inc_not_zero(&channel->ref_count))
+ channel = NULL;
+ rcu_read_unlock();
+
+ return channel;
+}
+
+struct io_ipc_channel *io_ipc_channel_get_by_key(u64 key)
+{
+ struct io_ipc_channel *channel;
+ u32 hash = hash_64(key, HASH_BITS(channel_hash));
+
+ rcu_read_lock();
+ hash_for_each_possible_rcu(channel_hash, channel, hash_node, hash) {
+ if (channel->key == key &&
+ refcount_inc_not_zero(&channel->ref_count)) {
+ rcu_read_unlock();
+ return channel;
+ }
+ }
+ rcu_read_unlock();
+
+ return NULL;
+}
+
+static int ipc_check_permission(struct io_ipc_channel *channel, u32 access)
+{
+ const struct cred *cred = current_cred();
+ kuid_t uid = cred->fsuid;
+ kgid_t gid = cred->fsgid;
+ u16 mode = channel->mode;
+ u16 needed = 0;
+
+ /* Map IPC access flags to Unix permission bits: send=write, recv=read */
+ if (access & IOIPC_SUB_RECV)
+ needed |= 4; /* read */
+ if (access & IOIPC_SUB_SEND)
+ needed |= 2; /* write */
+
+ if (uid_eq(uid, channel->owner_uid))
+ return ((mode >> 6) & needed) == needed ? 0 : -EACCES;
+
+ if (gid_eq(gid, channel->owner_gid))
+ return ((mode >> 3) & needed) == needed ? 0 : -EACCES;
+
+ return (mode & needed) == needed ? 0 : -EACCES;
+}
+
+int io_ipc_channel_create(struct io_ring_ctx *ctx,
+ const struct io_uring_ipc_channel_create __user *arg)
+{
+ struct io_uring_ipc_channel_create create;
+ struct io_ipc_channel *channel;
+ u32 hash;
+ int ret;
+
+ if (copy_from_user(&create, arg, sizeof(create)))
+ return -EFAULT;
+
+ if (!mem_is_zero(create.reserved, sizeof(create.reserved)))
+ return -EINVAL;
+
+ if (!create.ring_entries || create.ring_entries > IORING_MAX_ENTRIES)
+ return -EINVAL;
+
+ if (!is_power_of_2(create.ring_entries))
+ return -EINVAL;
+
+ if (!create.max_msg_size || create.max_msg_size > SZ_1M)
+ return -EINVAL;
+
+ if (create.flags & ~(IOIPC_F_BROADCAST | IOIPC_F_MULTICAST |
+ IOIPC_F_PRIVATE))
+ return -EINVAL;
+
+ if ((create.flags & IOIPC_F_BROADCAST) &&
+ (create.flags & IOIPC_F_MULTICAST))
+ return -EINVAL;
+
+ channel = kzalloc(sizeof(*channel), GFP_KERNEL);
+ if (!channel)
+ return -ENOMEM;
+
+ refcount_set(&channel->ref_count, 1);
+ channel->flags = create.flags;
+ channel->key = create.key;
+ channel->msg_max_size = create.max_msg_size;
+ channel->owner_uid = current_fsuid();
+ channel->owner_gid = current_fsgid();
+ channel->mode = create.mode & 0666;
+ atomic_set(&channel->next_msg_id, 1);
+ atomic_set(&channel->next_receiver_idx, 0);
+ atomic_set(&channel->recv_count, 0);
+ xa_init(&channel->subscribers);
+
+ ret = ipc_region_alloc(channel, create.ring_entries, create.max_msg_size);
+ if (ret)
+ goto err_free_channel;
+
+ refcount_inc(&channel->ref_count); /* File holds a reference */
+ channel->file = anon_inode_getfile("[io_uring_ipc]", &ipc_channel_fops,
+ channel, O_RDWR);
+ if (IS_ERR(channel->file)) {
+ ret = PTR_ERR(channel->file);
+ channel->file = NULL;
+ refcount_dec(&channel->ref_count);
+ goto err_free_region;
+ }
+
+ ret = xa_alloc(&channel_xa, &channel->channel_id, channel,
+ XA_LIMIT(1, INT_MAX), GFP_KERNEL);
+ if (ret < 0)
+ goto err_put_file;
+
+ hash = hash_64(create.key, HASH_BITS(channel_hash));
+ spin_lock_bh(&channel_hash_lock);
+ hash_add_rcu(channel_hash, &channel->hash_node, hash);
+ spin_unlock_bh(&channel_hash_lock);
+
+ /*
+ * Track the channel in the creating ring so that
+ * io_ipc_ctx_cleanup() can break the channel-file reference
+ * cycle if the ring is torn down without an explicit destroy.
+ */
+ ret = xa_insert(&ctx->ipc_channels, channel->channel_id,
+ xa_mk_value(0), GFP_KERNEL);
+ if (ret)
+ goto err_remove_channel;
+
+ create.channel_id_out = channel->channel_id;
+ if (copy_to_user((void __user *)arg, &create, sizeof(create))) {
+ ret = -EFAULT;
+ goto err_remove_ctx;
+ }
+
+ return 0;
+
+err_remove_ctx:
+ xa_erase(&ctx->ipc_channels, channel->channel_id);
+err_remove_channel:
+ spin_lock_bh(&channel_hash_lock);
+ hash_del_rcu(&channel->hash_node);
+ spin_unlock_bh(&channel_hash_lock);
+ xa_erase(&channel_xa, channel->channel_id);
+err_put_file:
+ /*
+ * fput() is deferred — the release callback will call
+ * io_ipc_channel_put() to drop the file's reference.
+ * Drop the initial reference here; it can't reach zero
+ * yet because the file still holds one.
+ */
+ fput(channel->file);
+ channel->file = NULL;
+ io_ipc_channel_put(channel);
+ return ret;
+err_free_region:
+ ipc_region_free(channel);
+err_free_channel:
+ kfree(channel);
+ return ret;
+}
+
+int io_ipc_channel_attach(struct io_ring_ctx *ctx,
+ const struct io_uring_ipc_channel_attach __user *arg)
+{
+ struct io_uring_ipc_channel_attach attach;
+ struct io_ipc_channel *channel = NULL;
+ struct io_ipc_subscriber *sub;
+ int ret;
+
+ if (copy_from_user(&attach, arg, sizeof(attach)))
+ return -EFAULT;
+
+ if (!mem_is_zero(attach.reserved, sizeof(attach.reserved)))
+ return -EINVAL;
+
+ if (!attach.flags || (attach.flags & ~IOIPC_SUB_BOTH))
+ return -EINVAL;
+
+ if (attach.key)
+ channel = io_ipc_channel_get_by_key(attach.key);
+ else
+ channel = io_ipc_channel_get(attach.channel_id);
+
+ if (!channel)
+ return -ENOENT;
+
+ ret = ipc_check_permission(channel, attach.flags);
+ if (ret)
+ goto err_put_channel;
+
+ sub = kzalloc(sizeof(*sub), GFP_KERNEL);
+ if (!sub) {
+ ret = -ENOMEM;
+ goto err_put_channel;
+ }
+
+ sub->ctx = ctx;
+ sub->channel = channel;
+ sub->flags = attach.flags;
+ sub->local_head = 0;
+ sub->subscriber_id = atomic_inc_return(&ipc_global_sub_id);
+
+ ret = xa_insert(&channel->subscribers, sub->subscriber_id, sub,
+ GFP_KERNEL);
+ if (ret) {
+ kfree(sub);
+ goto err_put_channel;
+ }
+
+ refcount_inc(&channel->ref_count);
+ if (attach.flags & IOIPC_SUB_RECV)
+ atomic_inc(&channel->recv_count);
+
+ ret = xa_insert(&ctx->ipc_subscribers, sub->subscriber_id, sub,
+ GFP_KERNEL);
+ if (ret)
+ goto err_remove_chan_sub;
+
+ ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
+ if (ret < 0)
+ goto err_remove_sub;
+
+ attach.local_id_out = sub->subscriber_id;
+ attach.region_size = io_region_size(channel->region);
+ attach.channel_fd = ret;
+ attach.mmap_offset_out = 0;
+
+ if (copy_to_user((void __user *)arg, &attach, sizeof(attach))) {
+ put_unused_fd(ret);
+ ret = -EFAULT;
+ goto err_remove_sub;
+ }
+
+ {
+ struct file *f = get_file_active(&channel->file);
+
+ if (!f) {
+ put_unused_fd(attach.channel_fd);
+ ret = -EINVAL;
+ goto err_remove_sub;
+ }
+ fd_install(attach.channel_fd, f);
+ }
+
+ io_ipc_channel_put(channel);
+ return 0;
+
+err_remove_sub:
+ xa_erase(&ctx->ipc_subscribers, sub->subscriber_id);
+err_remove_chan_sub:
+ xa_erase(&channel->subscribers, sub->subscriber_id);
+ if (attach.flags & IOIPC_SUB_RECV)
+ atomic_dec(&channel->recv_count);
+ kfree_rcu(sub, rcu);
+ io_ipc_channel_put(channel); /* Drop subscriber's reference */
+err_put_channel:
+ io_ipc_channel_put(channel); /* Drop lookup reference */
+ return ret;
+}
+
+int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 subscriber_id)
+{
+ struct io_ipc_subscriber *sub;
+ struct io_ipc_channel *channel;
+
+ sub = xa_erase(&ctx->ipc_subscribers, subscriber_id);
+ if (!sub)
+ return -ENOENT;
+
+ channel = sub->channel;
+ xa_erase(&channel->subscribers, subscriber_id);
+
+ if (sub->flags & IOIPC_SUB_RECV)
+ atomic_dec(&channel->recv_count);
+
+ io_ipc_channel_put(channel);
+ kfree_rcu(sub, rcu);
+ return 0;
+}
+
+/*
+ * Recompute consumer.head as the minimum local_head across all receive
+ * subscribers. Called lazily from the broadcast recv path (every 16
+ * messages) and from the send path when the ring appears full.
+ */
+static void ipc_advance_consumer_head(struct io_ipc_channel *channel)
+{
+ struct io_ipc_subscriber *s;
+ struct io_ipc_ring *ring = channel->ring;
+ unsigned long index;
+ u32 min_head = READ_ONCE(ring->producer.tail);
+
+ rcu_read_lock();
+ xa_for_each(&channel->subscribers, index, s) {
+ if (s->flags & IOIPC_SUB_RECV) {
+ u32 sh = READ_ONCE(s->local_head);
+
+ if ((s32)(sh - min_head) < 0)
+ min_head = sh;
+ }
+ }
+ rcu_read_unlock();
+
+ WRITE_ONCE(ring->consumer.head, min_head);
+}
+
+static int ipc_wake_receivers(struct io_ipc_channel *channel, u32 target)
+{
+ struct io_ipc_subscriber *sub;
+ unsigned long index;
+
+ rcu_read_lock();
+ if (target) {
+ sub = xa_load(&channel->subscribers, target);
+ if (!sub || !(sub->flags & IOIPC_SUB_RECV)) {
+ rcu_read_unlock();
+ return -ENOENT;
+ }
+ io_cqring_wake(sub->ctx);
+ rcu_read_unlock();
+ return 0;
+ }
+
+ if (channel->flags & IOIPC_F_MULTICAST) {
+ u32 recv_cnt = atomic_read(&channel->recv_count);
+
+ if (recv_cnt) {
+ u32 rr = (u32)atomic_inc_return(&channel->next_receiver_idx);
+ u32 target_idx = rr % recv_cnt;
+ u32 i = 0;
+
+ xa_for_each(&channel->subscribers, index, sub) {
+ if (sub->flags & IOIPC_SUB_RECV) {
+ if (i == target_idx) {
+ io_cqring_wake(sub->ctx);
+ break;
+ }
+ i++;
+ }
+ }
+ }
+ } else {
+ xa_for_each(&channel->subscribers, index, sub) {
+ if (sub->flags & IOIPC_SUB_RECV) {
+ io_cqring_wake(sub->ctx);
+ if (!(channel->flags & IOIPC_F_BROADCAST))
+ break;
+ }
+ }
+ }
+ rcu_read_unlock();
+ return 0;
+}
+
+int io_ipc_send_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+ struct io_ipc_send *ipc = io_kiocb_to_cmd(req, struct io_ipc_send);
+
+ if (sqe->buf_index || sqe->personality)
+ return -EINVAL;
+
+ if (sqe->ioprio || sqe->rw_flags)
+ return -EINVAL;
+
+ ipc->channel_id = READ_ONCE(sqe->fd);
+ ipc->addr = READ_ONCE(sqe->addr);
+ ipc->len = READ_ONCE(sqe->len);
+ ipc->target = READ_ONCE(sqe->file_index);
+
+ return 0;
+}
+
+int io_ipc_send(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_ipc_send *ipc = io_kiocb_to_cmd(req, struct io_ipc_send);
+ struct io_ipc_subscriber *sub = NULL;
+ struct io_ipc_channel *channel = NULL;
+ struct io_ipc_ring *ring;
+ struct io_ipc_msg_desc *desc;
+ void __user *user_buf;
+ void *dest;
+ u32 head, tail, next_tail, idx;
+ u32 sub_flags;
+ int ret;
+ u32 fd = ipc->channel_id;
+ bool need_put = false;
+ bool retried_advance = false;
+
+ /* O(1) subscriber lookup via xarray */
+ rcu_read_lock();
+ sub = xa_load(&req->ctx->ipc_subscribers, fd);
+ if (sub) {
+ channel = sub->channel;
+ if (!refcount_inc_not_zero(&channel->ref_count)) {
+ rcu_read_unlock();
+ ret = -ENOENT;
+ goto fail;
+ }
+ sub_flags = sub->flags;
+ rcu_read_unlock();
+ need_put = true;
+
+ if (!(sub_flags & IOIPC_SUB_SEND)) {
+ ret = -EACCES;
+ goto fail_put;
+ }
+ goto found;
+ }
+ rcu_read_unlock();
+
+ channel = io_ipc_channel_get(fd);
+ if (!channel) {
+ ret = -ENOENT;
+ goto fail;
+ }
+
+ need_put = true;
+
+ ret = ipc_check_permission(channel, IOIPC_SUB_SEND);
+ if (ret)
+ goto fail_put;
+
+found:
+ ring = channel->ring;
+
+ if (unlikely(ipc->len > channel->msg_max_size)) {
+ ret = -EMSGSIZE;
+ goto fail_put;
+ }
+
+ /* Lock-free slot reservation via CAS on producer tail */
+retry:
+ do {
+ tail = READ_ONCE(ring->producer.tail);
+ next_tail = tail + 1;
+ head = READ_ONCE(ring->consumer.head);
+
+ if (unlikely(next_tail - head > ring->ring_entries)) {
+ /*
+ * Ring full. For broadcast channels, try to
+ * advance consumer.head once by scanning the
+ * min local_head across all receivers.
+ */
+ if ((channel->flags & IOIPC_F_BROADCAST) &&
+ !retried_advance) {
+ ipc_advance_consumer_head(channel);
+ retried_advance = true;
+ goto retry;
+ }
+ ret = -ENOBUFS;
+ goto fail_put;
+ }
+ idx = tail & ring->ring_mask;
+ } while (cmpxchg(&ring->producer.tail, tail, next_tail) != tail);
+
+ /* Slot exclusively claimed — write data and descriptor */
+ dest = channel->data_region + (idx * channel->msg_max_size);
+ desc = &channel->desc_array[idx];
+
+ prefetchw(desc);
+
+ user_buf = u64_to_user_ptr(ipc->addr);
+ if (unlikely(__copy_from_user_inatomic(dest, user_buf, ipc->len))) {
+ if (unlikely(copy_from_user(dest, user_buf, ipc->len))) {
+ /*
+ * Slot already claimed via CAS; mark it ready with
+ * zero length so consumers can advance past it.
+ */
+ desc->offset = idx * channel->msg_max_size;
+ desc->len = 0;
+ desc->msg_id = 0;
+ desc->sender_data = 0;
+ smp_wmb();
+ WRITE_ONCE(desc->seq, tail + 1);
+ ret = -EFAULT;
+ goto fail_put;
+ }
+ }
+
+ desc->offset = idx * channel->msg_max_size;
+ desc->len = ipc->len;
+ desc->msg_id = atomic_inc_return(&channel->next_msg_id);
+ desc->sender_data = req->cqe.user_data;
+
+ /* Ensure descriptor + data visible before marking slot ready */
+ smp_wmb();
+ WRITE_ONCE(desc->seq, tail + 1);
+
+ ret = ipc_wake_receivers(channel, ipc->target);
+ if (ret)
+ goto fail_put;
+
+ if (need_put)
+ io_ipc_channel_put(channel);
+
+ io_req_set_res(req, ipc->len, 0);
+ return IOU_COMPLETE;
+
+fail_put:
+ if (need_put)
+ io_ipc_channel_put(channel);
+fail:
+ req_set_fail(req);
+ io_req_set_res(req, ret, 0);
+ return IOU_COMPLETE;
+}
+
+
+int io_ipc_recv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+ struct io_ipc_recv *ipc = io_kiocb_to_cmd(req, struct io_ipc_recv);
+
+ if (sqe->buf_index || sqe->personality)
+ return -EINVAL;
+
+ if (sqe->ioprio || sqe->rw_flags)
+ return -EINVAL;
+
+ ipc->channel_id = READ_ONCE(sqe->fd);
+ ipc->addr = READ_ONCE(sqe->addr);
+ ipc->len = READ_ONCE(sqe->len);
+
+ return 0;
+}
+
+int io_ipc_recv(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_ipc_recv *ipc = io_kiocb_to_cmd(req, struct io_ipc_recv);
+ struct io_ipc_subscriber *sub = NULL;
+ struct io_ipc_channel *channel;
+ struct io_ipc_ring *ring;
+ struct io_ipc_msg_desc *desc;
+ void __user *user_buf;
+ void *src;
+ u32 head, tail, idx;
+ u32 sub_flags, sub_id;
+ size_t copy_len;
+ int ret;
+
+ /* O(1) subscriber lookup via xarray */
+ rcu_read_lock();
+ sub = xa_load(&req->ctx->ipc_subscribers, ipc->channel_id);
+ if (sub) {
+ channel = sub->channel;
+ if (!refcount_inc_not_zero(&channel->ref_count)) {
+ rcu_read_unlock();
+ ret = -ENOENT;
+ goto fail;
+ }
+ sub_flags = sub->flags;
+ sub_id = sub->subscriber_id;
+ head = READ_ONCE(sub->local_head);
+ rcu_read_unlock();
+ goto found;
+ }
+ rcu_read_unlock();
+ ret = -ENOENT;
+ goto fail;
+
+found:
+ ring = channel->ring;
+
+ if (!(sub_flags & IOIPC_SUB_RECV)) {
+ ret = -EACCES;
+ goto fail_put;
+ }
+
+ /*
+ * For multicast, competing consumers share consumer.head directly
+ * instead of using per-subscriber local_head.
+ */
+ if (channel->flags & IOIPC_F_MULTICAST)
+ head = READ_ONCE(ring->consumer.head);
+
+ /* Check if there are messages available - punt to io-wq for retry */
+ tail = READ_ONCE(ring->producer.tail);
+ if (unlikely(head == tail)) {
+ io_ipc_channel_put(channel);
+ return -EAGAIN;
+ }
+
+ idx = head & ring->ring_mask;
+ desc = &channel->desc_array[idx];
+
+ /*
+ * Verify slot is fully written by the producer. With lock-free
+ * CAS-based send, tail advances before data is written; the
+ * per-slot seq number signals completion.
+ * Pairs with smp_wmb() + WRITE_ONCE(desc->seq) in send path.
+ */
+ if (READ_ONCE(desc->seq) != head + 1) {
+ io_ipc_channel_put(channel);
+ return -EAGAIN;
+ }
+
+ smp_rmb();
+
+ src = channel->data_region + desc->offset;
+ prefetch(src);
+
+ copy_len = min_t(u32, desc->len, ipc->len);
+ user_buf = u64_to_user_ptr(ipc->addr);
+
+ if (unlikely(__copy_to_user_inatomic(user_buf, src, copy_len))) {
+ if (unlikely(copy_to_user(user_buf, src, copy_len))) {
+ ret = -EFAULT;
+ goto fail_put;
+ }
+ }
+
+ if (channel->flags & IOIPC_F_MULTICAST) {
+ /*
+ * Multicast: atomically advance the shared consumer head.
+ * Losers retry via -EAGAIN / io-wq.
+ */
+ if (cmpxchg(&ring->consumer.head, head, head + 1) != head) {
+ io_ipc_channel_put(channel);
+ return -EAGAIN;
+ }
+ } else {
+ /*
+ * Re-find subscriber under RCU for atomic local_head update.
+ * O(1) xarray lookup; subscriber is stable while channel
+ * ref is held and we're under RCU.
+ */
+ rcu_read_lock();
+ sub = xa_load(&req->ctx->ipc_subscribers, sub_id);
+ if (!sub) {
+ rcu_read_unlock();
+ goto done;
+ }
+
+ if (cmpxchg(&sub->local_head, head, head + 1) != head) {
+ rcu_read_unlock();
+ io_ipc_channel_put(channel);
+ return -EAGAIN;
+ }
+
+ if (channel->flags & IOIPC_F_BROADCAST) {
+ /*
+ * Lazy consumer.head advancement: only scan min-head
+ * every 16 messages to amortize the O(N) walk.
+ * The send path also triggers this on ring-full.
+ */
+ if ((head & 0xf) == 0)
+ ipc_advance_consumer_head(channel);
+ } else {
+ /* Unicast: single consumer, update directly */
+ WRITE_ONCE(ring->consumer.head, head + 1);
+ }
+ rcu_read_unlock();
+ }
+
+done:
+ io_ipc_channel_put(channel);
+ io_req_set_res(req, copy_len, 0);
+ return IOU_COMPLETE;
+
+fail_put:
+ io_ipc_channel_put(channel);
+fail:
+ req_set_fail(req);
+ io_req_set_res(req, ret, 0);
+ return IOU_COMPLETE;
+}
+
+int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 channel_id)
+{
+ struct io_ipc_channel *channel;
+ struct file *f;
+
+ /*
+ * Atomically remove from global lookup. This prevents a second
+ * destroy call from finding the channel and draining refcount
+ * below the number of outstanding references.
+ */
+ channel = xa_erase(&channel_xa, channel_id);
+ if (!channel)
+ return -ENOENT;
+
+ spin_lock_bh(&channel_hash_lock);
+ if (!hlist_unhashed(&channel->hash_node))
+ hash_del_rcu(&channel->hash_node);
+ spin_unlock_bh(&channel_hash_lock);
+
+ /*
+ * Break the reference cycle between channel and file.
+ * The channel holds a file reference (channel->file) and the
+ * file's release callback holds a channel reference via
+ * private_data. Detach the file here so that the release
+ * callback becomes a no-op and the cycle is broken.
+ */
+ f = channel->file;
+ if (f) {
+ WRITE_ONCE(f->private_data, NULL);
+ channel->file = NULL;
+ fput(f);
+ io_ipc_channel_put(channel); /* Drop file's reference */
+ }
+
+ /* Drop the creator's initial reference */
+ io_ipc_channel_put(channel);
+
+ return 0;
+}
+
+void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx)
+{
+ struct io_ipc_subscriber *sub;
+ struct io_ipc_channel *channel;
+ unsigned long index;
+ void *entry;
+
+ xa_for_each(&ctx->ipc_subscribers, index, sub) {
+ channel = sub->channel;
+
+ xa_erase(&ctx->ipc_subscribers, index);
+ xa_erase(&channel->subscribers, sub->subscriber_id);
+
+ if (sub->flags & IOIPC_SUB_RECV)
+ atomic_dec(&channel->recv_count);
+
+ io_ipc_channel_put(channel);
+ kfree_rcu(sub, rcu);
+ }
+ xa_destroy(&ctx->ipc_subscribers);
+
+ /*
+ * Destroy any channels created by this ring that were not
+ * explicitly destroyed. io_ipc_channel_destroy() is
+ * idempotent — it returns -ENOENT if the channel was
+ * already destroyed.
+ */
+ xa_for_each(&ctx->ipc_channels, index, entry) {
+ io_ipc_channel_destroy(ctx, index);
+ }
+ xa_destroy(&ctx->ipc_channels);
+}
+
+#endif /* CONFIG_IO_URING_IPC */
diff --git a/io_uring/ipc.h b/io_uring/ipc.h
new file mode 100644
index 000000000000..65d143c3e520
--- /dev/null
+++ b/io_uring/ipc.h
@@ -0,0 +1,161 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef IO_URING_IPC_H
+#define IO_URING_IPC_H
+
+#include <linux/io_uring_types.h>
+
+#ifdef CONFIG_IO_URING_IPC
+
+/*
+ * Internal kernel structures for io_uring IPC
+ */
+
+/* Shared ring structure - lives in mmap'd memory region */
+struct io_ipc_ring {
+ /* Cache-aligned producer/consumer positions */
+ struct {
+ u32 head __aligned(64);
+ u32 tail;
+ } producer;
+
+ struct {
+ u32 head __aligned(64);
+ } consumer;
+
+ /* Ring parameters */
+ u32 ring_mask;
+ u32 ring_entries;
+ u32 max_msg_size;
+
+ /* Message descriptors follow inline */
+};
+
+/* Message descriptor in the ring */
+struct io_ipc_msg_desc {
+ u64 offset; /* Offset in data region for message payload */
+ u32 len; /* Message length */
+ u32 msg_id; /* Unique message ID */
+ u64 sender_data; /* Sender's user_data for context */
+ u32 seq; /* Lock-free completion sequence number */
+};
+
+/* Per-subscriber attachment to a channel */
+struct io_ipc_subscriber {
+ u32 local_head __aligned(64); /* Cache-aligned to prevent false sharing */
+ u32 subscriber_id; /* Unique subscriber ID */
+ u32 flags; /* IOIPC_SUB_SEND, IOIPC_SUB_RECV */
+
+ struct io_ring_ctx *ctx; /* io_uring context */
+ struct io_ipc_channel *channel; /* Channel this subscriber is attached to */
+ struct rcu_head rcu; /* For kfree_rcu deferred freeing */
+};
+
+/* IPC channel connecting two or more io_uring instances */
+struct io_ipc_channel {
+ struct io_mapped_region *region; /* Shared memory region */
+ struct io_ipc_ring *ring; /* Shared ring structure in mmap'd region */
+ void *data_region; /* Data storage area for messages */
+ struct io_ipc_msg_desc *desc_array; /* Cached descriptor array base */
+ struct file *file; /* Anonymous file for mmap support */
+
+ /* Subscribers to this channel */
+ struct xarray subscribers; /* All subscribers */
+ atomic_t recv_count; /* Cached count of IOIPC_SUB_RECV subscribers */
+
+ /* Channel metadata */
+ refcount_t ref_count;
+ u32 channel_id;
+ u32 flags; /* IOIPC_F_BROADCAST, IOIPC_F_MULTICAST */
+ u64 key; /* Unique key for lookup */
+
+ /* Ring buffer configuration */
+ u32 msg_max_size;
+
+ /* Access control */
+ kuid_t owner_uid;
+ kgid_t owner_gid;
+ u16 mode; /* Permission bits */
+
+ /* Next message ID */
+ atomic_t next_msg_id;
+
+ /* For multicast round-robin */
+ atomic_t next_receiver_idx;
+
+ /* Channel lifecycle */
+ struct rcu_head rcu;
+ struct hlist_node hash_node; /* For global channel hash table */
+};
+
+/* Request state for IPC operations */
+struct io_ipc_send {
+ struct file *file;
+ u64 addr;
+ u32 channel_id;
+ size_t len;
+ u32 target;
+};
+
+struct io_ipc_recv {
+ struct file *file;
+ u64 addr;
+ u32 channel_id;
+ size_t len;
+};
+
+/* Function declarations */
+
+/* Registration operations */
+int io_ipc_channel_create(struct io_ring_ctx *ctx,
+ const struct io_uring_ipc_channel_create __user *arg);
+int io_ipc_channel_attach(struct io_ring_ctx *ctx,
+ const struct io_uring_ipc_channel_attach __user *arg);
+int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 channel_id);
+
+/* Operation prep and execution */
+int io_ipc_send_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_ipc_send(struct io_kiocb *req, unsigned int issue_flags);
+
+int io_ipc_recv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_ipc_recv(struct io_kiocb *req, unsigned int issue_flags);
+
+/* Channel lifecycle */
+int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 channel_id);
+void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx);
+
+/* Channel lookup and management */
+struct io_ipc_channel *io_ipc_channel_get(u32 channel_id);
+struct io_ipc_channel *io_ipc_channel_get_by_key(u64 key);
+void io_ipc_channel_put(struct io_ipc_channel *channel);
+
+#else /* !CONFIG_IO_URING_IPC */
+
+static inline int io_ipc_channel_create(struct io_ring_ctx *ctx,
+ const struct io_uring_ipc_channel_create __user *arg)
+{
+ return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_channel_attach(struct io_ring_ctx *ctx,
+ const struct io_uring_ipc_channel_attach __user *arg)
+{
+ return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 channel_id)
+{
+ return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 channel_id)
+{
+ return -EOPNOTSUPP;
+}
+
+static inline void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx)
+{
+}
+
+#endif /* CONFIG_IO_URING_IPC */
+
+#endif /* IO_URING_IPC_H */
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index 645980fa4651..658aa36efda2 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -38,6 +38,7 @@
#include "futex.h"
#include "truncate.h"
#include "zcrx.h"
+#include "ipc.h"
static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
{
@@ -599,6 +600,18 @@ const struct io_issue_def io_issue_defs[] = {
.prep = io_uring_cmd_prep,
.issue = io_uring_cmd,
},
+ [IORING_OP_IPC_SEND] = {
+ .audit_skip = 1,
+ .async_size = sizeof(struct io_ipc_send),
+ .prep = io_ipc_send_prep,
+ .issue = io_ipc_send,
+ },
+ [IORING_OP_IPC_RECV] = {
+ .audit_skip = 1,
+ .async_size = sizeof(struct io_ipc_recv),
+ .prep = io_ipc_recv_prep,
+ .issue = io_ipc_recv,
+ },
};
const struct io_cold_def io_cold_defs[] = {
@@ -857,6 +870,12 @@ const struct io_cold_def io_cold_defs[] = {
.sqe_copy = io_uring_cmd_sqe_copy,
.cleanup = io_uring_cmd_cleanup,
},
+ [IORING_OP_IPC_SEND] = {
+ .name = "IPC_SEND",
+ },
+ [IORING_OP_IPC_RECV] = {
+ .name = "IPC_RECV",
+ },
};
const char *io_uring_get_opcode(u8 opcode)
diff --git a/io_uring/register.c b/io_uring/register.c
index 0148735f7711..7646dbb2d572 100644
--- a/io_uring/register.c
+++ b/io_uring/register.c
@@ -34,6 +34,7 @@
#include "zcrx.h"
#include "query.h"
#include "bpf_filter.h"
+#include "ipc.h"
#define IORING_MAX_RESTRICTIONS (IORING_RESTRICTION_LAST + \
IORING_REGISTER_LAST + IORING_OP_LAST)
@@ -930,6 +931,30 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
WRITE_ONCE(ctx->bpf_filters,
ctx->restrictions.bpf_filters->filters);
break;
+ case IORING_REGISTER_IPC_CHANNEL_CREATE:
+ ret = -EINVAL;
+ if (!arg || nr_args != 1)
+ break;
+ ret = io_ipc_channel_create(ctx, arg);
+ break;
+ case IORING_REGISTER_IPC_CHANNEL_ATTACH:
+ ret = -EINVAL;
+ if (!arg || nr_args != 1)
+ break;
+ ret = io_ipc_channel_attach(ctx, arg);
+ break;
+ case IORING_REGISTER_IPC_CHANNEL_DETACH:
+ ret = -EINVAL;
+ if (arg || !nr_args)
+ break;
+ ret = io_ipc_channel_detach(ctx, nr_args);
+ break;
+ case IORING_REGISTER_IPC_CHANNEL_DESTROY:
+ ret = -EINVAL;
+ if (arg || !nr_args)
+ break;
+ ret = io_ipc_channel_destroy(ctx, nr_args);
+ break;
default:
ret = -EINVAL;
break;
--
2.52.0
^ permalink raw reply related [flat|nested] 6+ messages in thread* [RFC PATCH 2/2] selftests/ipc: Add io_uring IPC selftest
2026-03-13 13:07 [RFC PATCH 0/2] io_uring: add IPC channel infrastructure Daniel Hodges
2026-03-13 13:07 ` [RFC PATCH 1/2] io_uring: add high-performance " Daniel Hodges
@ 2026-03-13 13:07 ` Daniel Hodges
2026-03-14 13:50 ` [RFC PATCH 0/2] io_uring: add IPC channel infrastructure Daniel Hodges
2 siblings, 0 replies; 6+ messages in thread
From: Daniel Hodges @ 2026-03-13 13:07 UTC (permalink / raw)
To: Jens Axboe; +Cc: Daniel Hodges, Pavel Begunkov, io-uring, linux-kernel
Add selftests for io_uring IPC channels. Tests cover send/recv,
broadcast, detach, permission enforcement, ring full, message
truncation, slot reuse, and cross-process operation.
Signed-off-by: Daniel Hodges <git@danielhodges.dev>
---
MAINTAINERS | 1 +
tools/testing/selftests/ipc/Makefile | 2 +-
tools/testing/selftests/ipc/io_uring_ipc.c | 1265 ++++++++++++++++++++
3 files changed, 1267 insertions(+), 1 deletion(-)
create mode 100644 tools/testing/selftests/ipc/io_uring_ipc.c
diff --git a/MAINTAINERS b/MAINTAINERS
index 837db4f7bcca..d43f59e31f03 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -13432,6 +13432,7 @@ F: include/trace/events/io_uring.h
F: include/uapi/linux/io_uring.h
F: include/uapi/linux/io_uring/
F: io_uring/
+F: tools/testing/selftests/ipc/io_uring_ipc*
IO_URING ZCRX
M: Pavel Begunkov <asml.silence@gmail.com>
diff --git a/tools/testing/selftests/ipc/Makefile b/tools/testing/selftests/ipc/Makefile
index 50e9c299fc4a..74bc45b555f8 100644
--- a/tools/testing/selftests/ipc/Makefile
+++ b/tools/testing/selftests/ipc/Makefile
@@ -12,7 +12,7 @@ endif
CFLAGS += $(KHDR_INCLUDES)
-TEST_GEN_PROGS := msgque
+TEST_GEN_PROGS := msgque io_uring_ipc
include ../lib.mk
diff --git a/tools/testing/selftests/ipc/io_uring_ipc.c b/tools/testing/selftests/ipc/io_uring_ipc.c
new file mode 100644
index 000000000000..a82988351e02
--- /dev/null
+++ b/tools/testing/selftests/ipc/io_uring_ipc.c
@@ -0,0 +1,1265 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * io_uring IPC selftest
+ *
+ * Tests the io_uring IPC channel functionality including:
+ * - Channel creation and attachment
+ * - Message send and receive (broadcast and non-broadcast)
+ * - Broadcast delivery to multiple receivers
+ * - Channel detach
+ * - Permission enforcement (send-only, recv-only)
+ * - Ring full and message size limits
+ * - Multiple message ordering
+ * - Invalid parameter rejection
+ * - Cross-process communication
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <linux/io_uring.h>
+
+/* Check if IO_URING_IPC is supported */
+#ifndef IORING_OP_IPC_SEND
+#define IORING_OP_IPC_SEND 65
+#define IORING_OP_IPC_RECV 66
+
+#define IORING_REGISTER_IPC_CHANNEL_CREATE 38
+#define IORING_REGISTER_IPC_CHANNEL_ATTACH 39
+#define IORING_REGISTER_IPC_CHANNEL_DETACH 40
+#define IORING_REGISTER_IPC_CHANNEL_DESTROY 41
+
+/* Flags for IPC channel creation */
+#define IOIPC_F_BROADCAST (1U << 0)
+#define IOIPC_F_MULTICAST (1U << 1)
+#define IOIPC_F_PRIVATE (1U << 2)
+
+/* Flags for subscriber attachment */
+#define IOIPC_SUB_SEND (1U << 0)
+#define IOIPC_SUB_RECV (1U << 1)
+#define IOIPC_SUB_BOTH (IOIPC_SUB_SEND | IOIPC_SUB_RECV)
+
+/* Create IPC channel */
+struct io_uring_ipc_channel_create {
+ __u32 flags;
+ __u32 ring_entries;
+ __u32 max_msg_size;
+ __u32 mode;
+ __u64 key;
+ __u32 channel_id_out;
+ __u32 reserved[3];
+};
+
+/* Attach to existing channel */
+struct io_uring_ipc_channel_attach {
+ __u32 channel_id;
+ __u32 flags;
+ __u64 key;
+ __s32 channel_fd;
+ __u32 local_id_out;
+ __u64 mmap_offset_out;
+ __u32 region_size;
+ __u32 reserved[3];
+};
+#endif
+
+#ifndef __NR_io_uring_setup
+#define __NR_io_uring_setup 425
+#endif
+
+#ifndef __NR_io_uring_enter
+#define __NR_io_uring_enter 426
+#endif
+
+#ifndef __NR_io_uring_register
+#define __NR_io_uring_register 427
+#endif
+
+#define QUEUE_DEPTH 32
+#define TEST_MSG "Hello from io_uring IPC!"
+#define TEST_KEY 0x12345678ULL
+#define KSFT_SKIP 4
+
+static int io_uring_setup(unsigned int entries, struct io_uring_params *p)
+{
+ return syscall(__NR_io_uring_setup, entries, p);
+}
+
+static int io_uring_enter(int fd, unsigned int to_submit, unsigned int min_complete,
+ unsigned int flags, sigset_t *sig)
+{
+ return syscall(__NR_io_uring_enter, fd, to_submit, min_complete,
+ flags, sig);
+}
+
+static int io_uring_register_syscall(int fd, unsigned int opcode, void *arg,
+ unsigned int nr_args)
+{
+ return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
+}
+
+struct io_uring {
+ int ring_fd;
+ struct io_uring_sqe *sqes;
+ struct io_uring_cqe *cqes;
+ unsigned int *sq_head;
+ unsigned int *sq_tail;
+ unsigned int *cq_head;
+ unsigned int *cq_tail;
+ unsigned int sq_ring_mask;
+ unsigned int cq_ring_mask;
+ unsigned int *sq_array;
+ void *sq_ring_ptr;
+ void *cq_ring_ptr;
+};
+
+static int setup_io_uring(struct io_uring *ring, unsigned int entries)
+{
+ struct io_uring_params p;
+ void *sq_ptr, *cq_ptr;
+ int ret;
+
+ memset(&p, 0, sizeof(p));
+ ret = io_uring_setup(entries, &p);
+ if (ret < 0)
+ return -errno;
+
+ ring->ring_fd = ret;
+ ring->sq_ring_mask = p.sq_entries - 1;
+ ring->cq_ring_mask = p.cq_entries - 1;
+
+ sq_ptr = mmap(NULL, p.sq_off.array + p.sq_entries * sizeof(unsigned int),
+ PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
+ ring->ring_fd, IORING_OFF_SQ_RING);
+ if (sq_ptr == MAP_FAILED) {
+ close(ring->ring_fd);
+ return -errno;
+ }
+ ring->sq_ring_ptr = sq_ptr;
+
+ ring->sqes = mmap(NULL, p.sq_entries * sizeof(struct io_uring_sqe),
+ PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
+ ring->ring_fd, IORING_OFF_SQES);
+ if (ring->sqes == MAP_FAILED) {
+ munmap(sq_ptr, p.sq_off.array + p.sq_entries * sizeof(unsigned int));
+ close(ring->ring_fd);
+ return -errno;
+ }
+
+ cq_ptr = mmap(NULL, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
+ PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
+ ring->ring_fd, IORING_OFF_CQ_RING);
+ if (cq_ptr == MAP_FAILED) {
+ munmap(ring->sqes, p.sq_entries * sizeof(struct io_uring_sqe));
+ munmap(sq_ptr, p.sq_off.array + p.sq_entries * sizeof(unsigned int));
+ close(ring->ring_fd);
+ return -errno;
+ }
+ ring->cq_ring_ptr = cq_ptr;
+
+ ring->sq_head = sq_ptr + p.sq_off.head;
+ ring->sq_tail = sq_ptr + p.sq_off.tail;
+ ring->sq_array = sq_ptr + p.sq_off.array;
+ ring->cq_head = cq_ptr + p.cq_off.head;
+ ring->cq_tail = cq_ptr + p.cq_off.tail;
+ ring->cqes = cq_ptr + p.cq_off.cqes;
+
+ return 0;
+}
+
+static void cleanup_io_uring(struct io_uring *ring)
+{
+ close(ring->ring_fd);
+}
+
+static struct io_uring_sqe *get_sqe(struct io_uring *ring)
+{
+ unsigned int tail = *ring->sq_tail;
+ unsigned int index = tail & ring->sq_ring_mask;
+ struct io_uring_sqe *sqe = &ring->sqes[index];
+
+ tail++;
+ *ring->sq_tail = tail;
+ ring->sq_array[index] = index;
+
+ memset(sqe, 0, sizeof(*sqe));
+ return sqe;
+}
+
+static int submit_and_wait(struct io_uring *ring, struct io_uring_cqe **cqe_ptr)
+{
+ unsigned int to_submit = *ring->sq_tail - *ring->sq_head;
+ unsigned int head;
+ int ret;
+
+ if (to_submit) {
+ ret = io_uring_enter(ring->ring_fd, to_submit, 0, 0, NULL);
+ if (ret < 0)
+ return -errno;
+ }
+
+ ret = io_uring_enter(ring->ring_fd, 0, 1, IORING_ENTER_GETEVENTS, NULL);
+ if (ret < 0)
+ return -errno;
+
+ head = *ring->cq_head;
+ if (head == *ring->cq_tail)
+ return -EAGAIN;
+
+ *cqe_ptr = &ring->cqes[head & ring->cq_ring_mask];
+ return 0;
+}
+
+static void cqe_seen(struct io_uring *ring)
+{
+ (*ring->cq_head)++;
+}
+
+static int create_channel(struct io_uring *ring, __u32 flags, __u32 ring_entries,
+ __u32 max_msg_size, __u64 key, unsigned int *id_out)
+{
+ struct io_uring_ipc_channel_create create;
+ int ret;
+
+ memset(&create, 0, sizeof(create));
+ create.flags = flags;
+ create.ring_entries = ring_entries;
+ create.max_msg_size = max_msg_size;
+ create.mode = 0666;
+ create.key = key;
+
+ ret = io_uring_register_syscall(ring->ring_fd,
+ IORING_REGISTER_IPC_CHANNEL_CREATE,
+ &create, 1);
+ if (ret < 0)
+ return -errno;
+
+ *id_out = create.channel_id_out;
+ return 0;
+}
+
+static int attach_channel(struct io_uring *ring, __u64 key, __u32 sub_flags,
+ unsigned int *local_id_out)
+{
+ struct io_uring_ipc_channel_attach attach;
+ int ret;
+
+ memset(&attach, 0, sizeof(attach));
+ attach.key = key;
+ attach.flags = sub_flags;
+
+ ret = io_uring_register_syscall(ring->ring_fd,
+ IORING_REGISTER_IPC_CHANNEL_ATTACH,
+ &attach, 1);
+ if (ret < 0)
+ return -errno;
+
+ *local_id_out = attach.local_id_out;
+ return 0;
+}
+
+static int attach_channel_by_id(struct io_uring *ring, __u32 channel_id,
+ __u32 sub_flags, unsigned int *local_id_out)
+{
+ struct io_uring_ipc_channel_attach attach;
+ int ret;
+
+ memset(&attach, 0, sizeof(attach));
+ attach.channel_id = channel_id;
+ attach.key = 0;
+ attach.flags = sub_flags;
+
+ ret = io_uring_register_syscall(ring->ring_fd,
+ IORING_REGISTER_IPC_CHANNEL_ATTACH,
+ &attach, 1);
+ if (ret < 0)
+ return -errno;
+
+ *local_id_out = attach.local_id_out;
+ return 0;
+}
+
+static int detach_channel(struct io_uring *ring, unsigned int subscriber_id)
+{
+ int ret;
+
+ ret = io_uring_register_syscall(ring->ring_fd,
+ IORING_REGISTER_IPC_CHANNEL_DETACH,
+ NULL, subscriber_id);
+ if (ret < 0)
+ return -errno;
+
+ return 0;
+}
+
+static int destroy_channel(struct io_uring *ring, unsigned int channel_id)
+{
+ int ret;
+
+ ret = io_uring_register_syscall(ring->ring_fd,
+ IORING_REGISTER_IPC_CHANNEL_DESTROY,
+ NULL, channel_id);
+ if (ret < 0)
+ return -errno;
+
+ return 0;
+}
+
+static int do_send(struct io_uring *ring, unsigned int fd,
+ const void *msg, size_t len)
+{
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+ int ret, res;
+
+ sqe = get_sqe(ring);
+ sqe->opcode = IORING_OP_IPC_SEND;
+ sqe->fd = fd;
+ sqe->addr = (unsigned long)msg;
+ sqe->len = len;
+ sqe->user_data = 1;
+
+ ret = submit_and_wait(ring, &cqe);
+ if (ret < 0)
+ return ret;
+
+ res = cqe->res;
+ cqe_seen(ring);
+ return res;
+}
+
+static int do_recv(struct io_uring *ring, unsigned int fd,
+ void *buf, size_t len)
+{
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+ int ret, res;
+
+ sqe = get_sqe(ring);
+ sqe->opcode = IORING_OP_IPC_RECV;
+ sqe->fd = fd;
+ sqe->addr = (unsigned long)buf;
+ sqe->len = len;
+ sqe->user_data = 2;
+
+ ret = submit_and_wait(ring, &cqe);
+ if (ret < 0)
+ return ret;
+
+ res = cqe->res;
+ cqe_seen(ring);
+ return res;
+}
+
+static int do_send_targeted(struct io_uring *ring, unsigned int fd,
+ const void *msg, size_t len, __u32 target)
+{
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+ int ret, res;
+
+ sqe = get_sqe(ring);
+ sqe->opcode = IORING_OP_IPC_SEND;
+ sqe->fd = fd;
+ sqe->addr = (unsigned long)msg;
+ sqe->len = len;
+ sqe->user_data = 1;
+ sqe->file_index = target;
+
+ ret = submit_and_wait(ring, &cqe);
+ if (ret < 0)
+ return ret;
+
+ res = cqe->res;
+ cqe_seen(ring);
+ return res;
+}
+
+static int test_nonbroadcast(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ char recv_buf[256];
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 100, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 100, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = do_send(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1);
+ if (ret < 0)
+ goto fail;
+
+ memset(recv_buf, 0, sizeof(recv_buf));
+ ret = do_recv(&ring, sub_id, recv_buf, sizeof(recv_buf));
+ if (ret < 0)
+ goto fail;
+
+ if (strcmp(recv_buf, TEST_MSG) != 0)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_broadcast_multi(void)
+{
+ struct io_uring ring1, ring2;
+ unsigned int channel_id, sub1_id, sub2_id;
+ char buf1[256], buf2[256];
+ int ret;
+
+ ret = setup_io_uring(&ring1, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = setup_io_uring(&ring2, QUEUE_DEPTH);
+ if (ret < 0) {
+ cleanup_io_uring(&ring1);
+ return 1;
+ }
+
+ ret = create_channel(&ring1, IOIPC_F_BROADCAST, 16, 4096,
+ TEST_KEY + 200, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring1, TEST_KEY + 200, IOIPC_SUB_BOTH, &sub1_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring2, TEST_KEY + 200, IOIPC_SUB_RECV, &sub2_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = do_send(&ring1, sub1_id, TEST_MSG, strlen(TEST_MSG) + 1);
+ if (ret < 0)
+ goto fail;
+
+ /* Both subscribers must receive the same message */
+ memset(buf1, 0, sizeof(buf1));
+ ret = do_recv(&ring1, sub1_id, buf1, sizeof(buf1));
+ if (ret < 0)
+ goto fail;
+ if (strcmp(buf1, TEST_MSG) != 0)
+ goto fail;
+
+ memset(buf2, 0, sizeof(buf2));
+ ret = do_recv(&ring2, sub2_id, buf2, sizeof(buf2));
+ if (ret < 0)
+ goto fail;
+ if (strcmp(buf2, TEST_MSG) != 0)
+ goto fail;
+
+ cleanup_io_uring(&ring1);
+ cleanup_io_uring(&ring2);
+ return 0;
+fail:
+ cleanup_io_uring(&ring1);
+ cleanup_io_uring(&ring2);
+ return 1;
+}
+
+static int test_detach(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ char buf[256];
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 300, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 300, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = detach_channel(&ring, sub_id);
+ if (ret < 0)
+ goto fail;
+
+ /* After detach, recv should fail with ENOENT */
+ ret = do_recv(&ring, sub_id, buf, sizeof(buf));
+ if (ret != -ENOENT)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_recv_only_cannot_send(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 400, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 400, IOIPC_SUB_RECV, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = do_send(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1);
+ if (ret != -EACCES)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_send_only_cannot_recv(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ char buf[256];
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 500, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 500, IOIPC_SUB_SEND, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Send first so there's a message in the ring */
+ ret = do_send(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1);
+ if (ret < 0)
+ goto fail;
+
+ /* Recv should fail with EACCES */
+ ret = do_recv(&ring, sub_id, buf, sizeof(buf));
+ if (ret != -EACCES)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_ring_full(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ const char msg[] = "X";
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ /* ring_entries=2: can hold 2 messages before full */
+ ret = create_channel(&ring, 0, 2, 64, TEST_KEY + 600, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 600, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Fill the 2 slots */
+ ret = do_send(&ring, sub_id, msg, sizeof(msg));
+ if (ret < 0)
+ goto fail;
+
+ ret = do_send(&ring, sub_id, msg, sizeof(msg));
+ if (ret < 0)
+ goto fail;
+
+ /* Third send must fail */
+ ret = do_send(&ring, sub_id, msg, sizeof(msg));
+ if (ret != -ENOBUFS)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_msg_too_large(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ char big_msg[128];
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ /* max_msg_size=64 */
+ ret = create_channel(&ring, 0, 16, 64, TEST_KEY + 700, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 700, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ memset(big_msg, 'A', sizeof(big_msg));
+ ret = do_send(&ring, sub_id, big_msg, sizeof(big_msg));
+ if (ret != -EMSGSIZE)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+#define NUM_MULTI_MSGS 8
+
+static int test_multiple_messages(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ char send_buf[64], recv_buf[64];
+ int ret, i;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 64, TEST_KEY + 800, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 800, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ for (i = 0; i < NUM_MULTI_MSGS; i++) {
+ snprintf(send_buf, sizeof(send_buf), "msg-%d", i);
+ ret = do_send(&ring, sub_id, send_buf, strlen(send_buf) + 1);
+ if (ret < 0)
+ goto fail;
+ }
+
+ for (i = 0; i < NUM_MULTI_MSGS; i++) {
+ memset(recv_buf, 0, sizeof(recv_buf));
+ ret = do_recv(&ring, sub_id, recv_buf, sizeof(recv_buf));
+ if (ret < 0)
+ goto fail;
+ snprintf(send_buf, sizeof(send_buf), "msg-%d", i);
+ if (strcmp(recv_buf, send_buf) != 0)
+ goto fail;
+ }
+
+ /* Ring should be empty now */
+ ret = do_recv(&ring, sub_id, recv_buf, sizeof(recv_buf));
+ if (ret != -EAGAIN)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_invalid_params(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id;
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ /* Non-power-of-2 ring_entries */
+ ret = create_channel(&ring, 0, 3, 4096, TEST_KEY + 900, &channel_id);
+ if (ret != -EINVAL)
+ goto fail;
+
+ /* Zero ring_entries */
+ ret = create_channel(&ring, 0, 0, 4096, TEST_KEY + 901, &channel_id);
+ if (ret != -EINVAL)
+ goto fail;
+
+ /* Zero max_msg_size */
+ ret = create_channel(&ring, 0, 16, 0, TEST_KEY + 902, &channel_id);
+ if (ret != -EINVAL)
+ goto fail;
+
+ /* BROADCAST | MULTICAST together */
+ ret = create_channel(&ring, IOIPC_F_BROADCAST | IOIPC_F_MULTICAST,
+ 16, 4096, TEST_KEY + 903, &channel_id);
+ if (ret != -EINVAL)
+ goto fail;
+
+ /* Unsupported flags */
+ ret = create_channel(&ring, 0xFF00, 16, 4096, TEST_KEY + 904,
+ &channel_id);
+ if (ret != -EINVAL)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_attach_by_id(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ char recv_buf[256];
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 1000, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Attach using channel_id (key=0) instead of key */
+ ret = attach_channel_by_id(&ring, channel_id, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = do_send(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1);
+ if (ret < 0)
+ goto fail;
+
+ memset(recv_buf, 0, sizeof(recv_buf));
+ ret = do_recv(&ring, sub_id, recv_buf, sizeof(recv_buf));
+ if (ret < 0)
+ goto fail;
+
+ if (strcmp(recv_buf, TEST_MSG) != 0)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_recv_truncation(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ const char long_msg[] = "This message is longer than the receive buffer";
+ char small_buf[8];
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 1100, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 1100, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = do_send(&ring, sub_id, long_msg, sizeof(long_msg));
+ if (ret < 0)
+ goto fail;
+
+ memset(small_buf, 0, sizeof(small_buf));
+ ret = do_recv(&ring, sub_id, small_buf, sizeof(small_buf));
+ /* Should return truncated length, not full message length */
+ if (ret != sizeof(small_buf))
+ goto fail;
+
+ /* Verify we got the first bytes */
+ if (memcmp(small_buf, long_msg, sizeof(small_buf)) != 0)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_broadcast_slot_reuse(void)
+{
+ struct io_uring ring1, ring2;
+ unsigned int channel_id, sub1_id, sub2_id;
+ char buf[256];
+ const char msg1[] = "first";
+ const char msg2[] = "second";
+ const char msg3[] = "third";
+ int ret;
+
+ ret = setup_io_uring(&ring1, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = setup_io_uring(&ring2, QUEUE_DEPTH);
+ if (ret < 0) {
+ cleanup_io_uring(&ring1);
+ return 1;
+ }
+
+ /* ring_entries=2: only 2 slots available */
+ ret = create_channel(&ring1, IOIPC_F_BROADCAST, 2, 256,
+ TEST_KEY + 1200, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring1, TEST_KEY + 1200, IOIPC_SUB_BOTH, &sub1_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring2, TEST_KEY + 1200, IOIPC_SUB_RECV, &sub2_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Fill both slots */
+ ret = do_send(&ring1, sub1_id, msg1, sizeof(msg1));
+ if (ret < 0)
+ goto fail;
+
+ ret = do_send(&ring1, sub1_id, msg2, sizeof(msg2));
+ if (ret < 0)
+ goto fail;
+
+ /* Ring is full now -- third send should fail */
+ ret = do_send(&ring1, sub1_id, msg3, sizeof(msg3));
+ if (ret != -ENOBUFS)
+ goto fail;
+
+ /* sub1 consumes both messages */
+ ret = do_recv(&ring1, sub1_id, buf, sizeof(buf));
+ if (ret < 0)
+ goto fail;
+
+ ret = do_recv(&ring1, sub1_id, buf, sizeof(buf));
+ if (ret < 0)
+ goto fail;
+
+ /*
+ * Ring should still be full from the producer's perspective because
+ * sub2 hasn't consumed yet -- min_head stays at 0.
+ */
+ ret = do_send(&ring1, sub1_id, msg3, sizeof(msg3));
+ if (ret != -ENOBUFS)
+ goto fail;
+
+ /* sub2 consumes both messages -- now min_head advances */
+ ret = do_recv(&ring2, sub2_id, buf, sizeof(buf));
+ if (ret < 0)
+ goto fail;
+
+ ret = do_recv(&ring2, sub2_id, buf, sizeof(buf));
+ if (ret < 0)
+ goto fail;
+
+ /* Now the slots should be reusable */
+ ret = do_send(&ring1, sub1_id, msg3, sizeof(msg3));
+ if (ret < 0)
+ goto fail;
+
+ memset(buf, 0, sizeof(buf));
+ ret = do_recv(&ring1, sub1_id, buf, sizeof(buf));
+ if (ret < 0)
+ goto fail;
+
+ if (strcmp(buf, msg3) != 0)
+ goto fail;
+
+ cleanup_io_uring(&ring1);
+ cleanup_io_uring(&ring2);
+ return 0;
+fail:
+ cleanup_io_uring(&ring1);
+ cleanup_io_uring(&ring2);
+ return 1;
+}
+
+static int test_cross_process(void)
+{
+ struct io_uring ring1, ring2;
+ unsigned int channel_id, local_id;
+ char recv_buf[256];
+ int ret;
+ pid_t pid;
+
+ ret = setup_io_uring(&ring1, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring1, IOIPC_F_BROADCAST, 16, 4096,
+ TEST_KEY, &channel_id);
+ if (ret < 0) {
+ cleanup_io_uring(&ring1);
+ return 1;
+ }
+
+ pid = fork();
+ if (pid < 0) {
+ cleanup_io_uring(&ring1);
+ return 1;
+ }
+
+ if (pid == 0) {
+ cleanup_io_uring(&ring1);
+
+ ret = setup_io_uring(&ring2, QUEUE_DEPTH);
+ if (ret < 0)
+ exit(1);
+
+ usleep(100000);
+
+ ret = attach_channel(&ring2, TEST_KEY, IOIPC_SUB_BOTH,
+ &local_id);
+ if (ret < 0)
+ exit(1);
+
+ usleep(250000);
+
+ memset(recv_buf, 0, sizeof(recv_buf));
+ ret = do_recv(&ring2, local_id, recv_buf, sizeof(recv_buf));
+ if (ret < 0)
+ exit(1);
+
+ if (strcmp(recv_buf, TEST_MSG) != 0)
+ exit(1);
+
+ cleanup_io_uring(&ring2);
+ exit(0);
+ }
+
+ /* Parent process - producer */
+ usleep(200000);
+
+ ret = do_send(&ring1, channel_id, TEST_MSG, strlen(TEST_MSG) + 1);
+ if (ret < 0) {
+ waitpid(pid, NULL, 0);
+ cleanup_io_uring(&ring1);
+ return 1;
+ }
+
+ int status;
+
+ waitpid(pid, &status, 0);
+ cleanup_io_uring(&ring1);
+
+ if (!WIFEXITED(status) || WEXITSTATUS(status) != 0)
+ return 1;
+
+ return 0;
+}
+
+static int test_multicast_roundrobin(void)
+{
+ struct io_uring ring1, ring2;
+ unsigned int channel_id, sub1_id, sub2_id;
+ char buf1[256], buf2[256];
+ int ret;
+
+ ret = setup_io_uring(&ring1, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = setup_io_uring(&ring2, QUEUE_DEPTH);
+ if (ret < 0) {
+ cleanup_io_uring(&ring1);
+ return 1;
+ }
+
+ ret = create_channel(&ring1, IOIPC_F_MULTICAST, 16, 4096,
+ TEST_KEY + 1300, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring1, TEST_KEY + 1300, IOIPC_SUB_BOTH, &sub1_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring2, TEST_KEY + 1300, IOIPC_SUB_BOTH, &sub2_id);
+ if (ret < 0)
+ goto fail;
+
+ /*
+ * Send two messages. With multicast round-robin waking, different
+ * subscribers get woken for each message. Both use the shared
+ * consumer head, so both can recv any available message.
+ */
+ ret = do_send(&ring1, sub1_id, "msg-0", 6);
+ if (ret < 0)
+ goto fail;
+
+ ret = do_send(&ring1, sub1_id, "msg-1", 6);
+ if (ret < 0)
+ goto fail;
+
+ /* Both subscribers should be able to recv one message each */
+ memset(buf1, 0, sizeof(buf1));
+ ret = do_recv(&ring1, sub1_id, buf1, sizeof(buf1));
+ if (ret < 0)
+ goto fail;
+
+ memset(buf2, 0, sizeof(buf2));
+ ret = do_recv(&ring2, sub2_id, buf2, sizeof(buf2));
+ if (ret < 0)
+ goto fail;
+
+ /* Verify we got both messages (order may vary) */
+ if (strcmp(buf1, "msg-0") != 0 && strcmp(buf1, "msg-1") != 0)
+ goto fail;
+ if (strcmp(buf2, "msg-0") != 0 && strcmp(buf2, "msg-1") != 0)
+ goto fail;
+ /* They must be different messages */
+ if (strcmp(buf1, buf2) == 0)
+ goto fail;
+
+ cleanup_io_uring(&ring1);
+ cleanup_io_uring(&ring2);
+ return 0;
+fail:
+ cleanup_io_uring(&ring1);
+ cleanup_io_uring(&ring2);
+ return 1;
+}
+
+static int test_channel_destroy(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 1400, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 1400, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Destroy the channel (drops creator's reference) */
+ ret = destroy_channel(&ring, channel_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Double destroy should fail (channel refcount already dropped) */
+ ret = destroy_channel(&ring, channel_id);
+ /*
+ * May succeed if subscriber still holds a ref, or fail with
+ * ENOENT if the channel was already freed. Either way the
+ * first destroy must have succeeded.
+ */
+
+ /* Detach the subscriber */
+ ret = detach_channel(&ring, sub_id);
+ if (ret < 0)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+static int test_unicast_targeted(void)
+{
+ struct io_uring ring1, ring2;
+ unsigned int channel_id, sub1_id, sub2_id;
+ char buf1[256], buf2[256];
+ struct io_uring_sqe *sqe;
+ struct io_uring_cqe *cqe;
+ int ret;
+
+ ret = setup_io_uring(&ring1, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = setup_io_uring(&ring2, QUEUE_DEPTH);
+ if (ret < 0) {
+ cleanup_io_uring(&ring1);
+ return 1;
+ }
+
+ /* Create a unicast channel (no flags) */
+ ret = create_channel(&ring1, 0, 16, 4096, TEST_KEY + 1500, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Attach sender+receiver on ring1, receiver-only on ring2 */
+ ret = attach_channel(&ring1, TEST_KEY + 1500, IOIPC_SUB_BOTH, &sub1_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring2, TEST_KEY + 1500, IOIPC_SUB_RECV, &sub2_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Send targeting subscriber 2 specifically */
+ ret = do_send_targeted(&ring1, sub1_id, TEST_MSG, strlen(TEST_MSG) + 1,
+ sub2_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Receiver 2 should get the message */
+ memset(buf2, 0, sizeof(buf2));
+ ret = do_recv(&ring2, sub2_id, buf2, sizeof(buf2));
+ if (ret < 0)
+ goto fail;
+
+ if (strcmp(buf2, TEST_MSG) != 0)
+ goto fail;
+
+ cleanup_io_uring(&ring1);
+ cleanup_io_uring(&ring2);
+ return 0;
+fail:
+ cleanup_io_uring(&ring1);
+ cleanup_io_uring(&ring2);
+ return 1;
+}
+
+static int test_unicast_targeted_invalid(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id, sub_id;
+ int ret;
+
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0)
+ return 1;
+
+ ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 1600, &channel_id);
+ if (ret < 0)
+ goto fail;
+
+ ret = attach_channel(&ring, TEST_KEY + 1600, IOIPC_SUB_BOTH, &sub_id);
+ if (ret < 0)
+ goto fail;
+
+ /* Send targeting a non-existent subscriber ID */
+ ret = do_send_targeted(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1,
+ 9999);
+ if (ret != -ENOENT)
+ goto fail;
+
+ cleanup_io_uring(&ring);
+ return 0;
+fail:
+ cleanup_io_uring(&ring);
+ return 1;
+}
+
+struct test_case {
+ const char *name;
+ int (*func)(void);
+};
+
+static struct test_case tests[] = {
+ { "Non-broadcast send/recv", test_nonbroadcast },
+ { "Broadcast multi-receiver", test_broadcast_multi },
+ { "Channel detach", test_detach },
+ { "Recv-only cannot send", test_recv_only_cannot_send },
+ { "Send-only cannot recv", test_send_only_cannot_recv },
+ { "Ring full", test_ring_full },
+ { "Message too large", test_msg_too_large },
+ { "Multiple messages", test_multiple_messages },
+ { "Invalid parameters", test_invalid_params },
+ { "Attach by channel ID", test_attach_by_id },
+ { "Recv truncation", test_recv_truncation },
+ { "Broadcast slot reuse", test_broadcast_slot_reuse },
+ { "Cross-process send/recv", test_cross_process },
+ { "Multicast round-robin", test_multicast_roundrobin },
+ { "Channel destroy", test_channel_destroy },
+ { "Unicast targeted delivery", test_unicast_targeted },
+ { "Unicast targeted invalid", test_unicast_targeted_invalid },
+};
+
+int main(void)
+{
+ struct io_uring ring;
+ unsigned int channel_id;
+ int i, passed = 0, failed = 0;
+ int total = sizeof(tests) / sizeof(tests[0]);
+ int ret;
+
+ printf("=== io_uring IPC Selftest ===\n\n");
+
+ /* Check if IPC is supported before running any tests */
+ ret = setup_io_uring(&ring, QUEUE_DEPTH);
+ if (ret < 0) {
+ fprintf(stderr, "Failed to setup io_uring\n");
+ return 1;
+ }
+
+ ret = create_channel(&ring, 0, 16, 4096, 0xDEAD0000ULL, &channel_id);
+ cleanup_io_uring(&ring);
+ if (ret == -EINVAL || ret == -ENOSYS) {
+ printf("SKIP: IO_URING_IPC not supported by kernel\n");
+ return KSFT_SKIP;
+ }
+
+ for (i = 0; i < total; i++) {
+ printf(" [%2d/%d] %-30s ", i + 1, total, tests[i].name);
+ fflush(stdout);
+ ret = tests[i].func();
+ if (ret == 0) {
+ printf("PASS\n");
+ passed++;
+ } else {
+ printf("FAIL\n");
+ failed++;
+ }
+ }
+
+ printf("\n=== Results: %d passed, %d failed (of %d) ===\n",
+ passed, failed, total);
+
+ return failed ? 1 : 0;
+}
--
2.52.0
^ permalink raw reply related [flat|nested] 6+ messages in thread