public inbox for io-uring@vger.kernel.org
 help / color / mirror / Atom feed
From: Daniel Hodges <git@danielhodges.dev>
To: Jens Axboe <axboe@kernel.dk>
Cc: Daniel Hodges <git@danielhodges.dev>,
	Pavel Begunkov <asml.silence@gmail.com>,
	io-uring@vger.kernel.org, linux-kernel@vger.kernel.org
Subject: [RFC PATCH 1/2] io_uring: add high-performance IPC channel infrastructure
Date: Fri, 13 Mar 2026 09:07:38 -0400	[thread overview]
Message-ID: <20260313130739.23265-2-git@danielhodges.dev> (raw)
In-Reply-To: <20260313130739.23265-1-git@danielhodges.dev>

Add a new IPC mechanism built on top of io_uring for message passing
between processes.

Features:
- Shared memory ring buffer for message data transfer
- RCU-based subscriber lookup on send/recv paths
- Broadcast mode (all subscribers receive each message)
- Channel-based design supporting multiple subscribers
- Permission checking modeled on Unix file permissions
- Anonymous file per channel for mmap support

Architecture:
- Two new io_uring opcodes: IPC_SEND and IPC_RECV
- Four new registration commands for channel lifecycle:
  CREATE, ATTACH, DETACH, and DESTROY
- Channels are identified by numeric ID or 64-bit key
- Data is copied between userspace and a kernel ring buffer
  via copy_from_user/copy_to_user
- Multicast (round-robin) mode delivers each message to one receiver,
  chosen via round-robin wakeups and a shared consumer head

Send/recv hot path optimizations:
- Cache the descriptor array base pointer in struct io_ipc_channel
  to avoid recomputing it on every send/recv
- Use __copy_from_user_inatomic/__copy_to_user_inatomic with fallback
  to the standard copy variants
- Add prefetch/prefetchw hints for descriptor and data access
- In non-broadcast mode, wake only the first receiver instead of
  iterating all subscribers
- Simplify error handling in io_ipc_send() using inline returns
- Remove channel pointer caching in request structs, emptying the
  send and recv cleanup handlers

Signed-off-by: Daniel Hodges <git@danielhodges.dev>
---
 include/linux/io_uring_types.h |    7 +
 include/uapi/linux/io_uring.h  |   74 +++
 io_uring/Kconfig               |   14 +
 io_uring/Makefile              |    1 +
 io_uring/io_uring.c            |    6 +
 io_uring/ipc.c                 | 1002 ++++++++++++++++++++++++++++++++
 io_uring/ipc.h                 |  161 +++++
 io_uring/opdef.c               |   19 +
 io_uring/register.c            |   25 +
 9 files changed, 1309 insertions(+)
 create mode 100644 io_uring/ipc.c
 create mode 100644 io_uring/ipc.h

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index dd1420bfcb73..fbbe51aaca68 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -435,6 +435,13 @@ struct io_ring_ctx {
 	u32			pers_next;
 	struct xarray		personalities;
 
+#ifdef CONFIG_IO_URING_IPC
+	/* IPC subscriber tracking - keyed by subscriber_id */
+	struct xarray		ipc_subscribers;
+	/* IPC channels created by this ring - keyed by channel_id */
+	struct xarray		ipc_channels;
+#endif
+
 	/* hashed buffered write serialization */
 	struct io_wq_hash		*hash_map;
 
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 17ac1b785440..a5b68bd1a047 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -318,6 +318,8 @@ enum io_uring_op {
 	IORING_OP_PIPE,
 	IORING_OP_NOP128,
 	IORING_OP_URING_CMD128,
+	IORING_OP_IPC_SEND,
+	IORING_OP_IPC_RECV,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
@@ -723,6 +725,12 @@ enum io_uring_register_op {
 	/* register bpf filtering programs */
 	IORING_REGISTER_BPF_FILTER		= 37,
 
+	/* IPC channel operations */
+	IORING_REGISTER_IPC_CHANNEL_CREATE	= 38,
+	IORING_REGISTER_IPC_CHANNEL_ATTACH	= 39,
+	IORING_REGISTER_IPC_CHANNEL_DETACH	= 40,
+	IORING_REGISTER_IPC_CHANNEL_DESTROY	= 41,
+
 	/* this goes last */
 	IORING_REGISTER_LAST,
 
@@ -1057,6 +1065,72 @@ struct io_timespec {
 	__u64		tv_nsec;
 };
 
+/*
+ * IPC channel support
+ */
+
+/* Flags for IPC channel creation */
+#define IOIPC_F_BROADCAST	(1U << 0)  /* Broadcast mode (all subscribers receive) */
+#define IOIPC_F_MULTICAST	(1U << 1)  /* Multicast mode (round-robin delivery) */
+#define IOIPC_F_PRIVATE		(1U << 2)  /* Private (permissions enforced strictly) */
+
+/* Flags for subscriber attachment */
+#define IOIPC_SUB_SEND		(1U << 0)  /* Can send to channel */
+#define IOIPC_SUB_RECV		(1U << 1)  /* Can receive from channel */
+#define IOIPC_SUB_BOTH		(IOIPC_SUB_SEND | IOIPC_SUB_RECV)
+
+/* Create IPC channel */
+struct io_uring_ipc_channel_create {
+	__u32	flags;			/* IOIPC_F_BROADCAST, IOIPC_F_PRIVATE */
+	__u32	ring_entries;		/* Number of message slots */
+	__u32	max_msg_size;		/* Maximum message size */
+	__u32	mode;			/* Permission bits (like chmod) */
+	__u64	key;			/* Unique key for channel (like ftok()) */
+	__u32	channel_id_out;		/* Returned channel ID */
+	__u32	reserved[3];
+};
+
+/* Attach to existing channel */
+struct io_uring_ipc_channel_attach {
+	__u32	channel_id;		/* Attach by channel ID (when key == 0) */
+	__u32	flags;			/* IOIPC_SUB_SEND, IOIPC_SUB_RECV, IOIPC_SUB_BOTH */
+	__u64	key;			/* Non-zero: attach by key instead of channel_id */
+	__s32	channel_fd;		/* Output: fd for mmap */
+	__u32	local_id_out;		/* Output: local subscriber ID */
+	__u64	mmap_offset_out;	/* Output: offset for mmap() */
+	__u32	region_size;		/* Output: size of shared region */
+	__u32	reserved[3];
+};
+
+/*
+ * Message descriptor in the ring.
+ *
+ * This must match the kernel-side struct io_ipc_msg_desc exactly:
+ * kernel and userspace index the same descriptor array in the shared
+ * region, so field offsets and the array stride have to agree.  'seq'
+ * is written last by the sender to signal that the slot is fully
+ * populated.
+ */
+struct io_uring_ipc_msg_desc {
+	__u64	offset;			/* Offset in data region for message payload */
+	__u32	len;			/* Message length */
+	__u32	msg_id;			/* Unique message ID */
+	__u64	sender_data;		/* Sender's user_data for context */
+	__u32	seq;			/* Slot-ready sequence, written last by sender */
+	__u32	__resv;			/* Keep the stride a multiple of 8 bytes */
+};
+
+/* Shared ring structure (mmap'd to userspace) */
+struct io_uring_ipc_ring {
+	/* Cache-aligned producer/consumer positions */
+	struct {
+		__u32	head __attribute__((aligned(64)));
+		__u32	tail;
+	} producer;
+
+	struct {
+		__u32	head __attribute__((aligned(64)));
+	} consumer;
+
+	/* Ring parameters */
+	__u32	ring_mask;
+	__u32	ring_entries;
+	__u32	max_msg_size;
+
+	/*
+	 * Array of message descriptors follows.  The kernel locates the
+	 * descriptor array at (base + sizeof(struct io_ipc_ring)); the
+	 * struct's size is padded to its 64-byte alignment, so force the
+	 * flexible array member to that same alignment.  Without this,
+	 * offsetof(msgs) would land inside the tail padding and
+	 * userspace would read descriptors at the wrong offset.
+	 */
+	struct io_uring_ipc_msg_desc msgs[] __attribute__((aligned(64)));
+};
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/io_uring/Kconfig b/io_uring/Kconfig
index a7ae23cf1035..d7d7f858b46c 100644
--- a/io_uring/Kconfig
+++ b/io_uring/Kconfig
@@ -14,3 +14,17 @@ config IO_URING_BPF
 	def_bool y
 	depends on BPF
 	depends on NET
+
+config IO_URING_IPC
+	bool "io_uring IPC support"
+	depends on IO_URING
+	default y
+	help
+	  Enable io_uring-based inter-process communication.
+
+	  This provides high-performance IPC channels that leverage
+	  io_uring's shared memory infrastructure and async notification
+	  model. Supports broadcast and multicast delivery patterns;
+	  message data is copied through a kernel ring buffer.
+
+	  If unsure, say Y.
diff --git a/io_uring/Makefile b/io_uring/Makefile
index 931f9156132a..380fe8a19174 100644
--- a/io_uring/Makefile
+++ b/io_uring/Makefile
@@ -17,6 +17,7 @@ obj-$(CONFIG_IO_URING)		+= io_uring.o opdef.o kbuf.o rsrc.o notif.o \
 					query.o
 
 obj-$(CONFIG_IO_URING_ZCRX)	+= zcrx.o
+obj-$(CONFIG_IO_URING_IPC)	+= ipc.o
 obj-$(CONFIG_IO_WQ)		+= io-wq.o
 obj-$(CONFIG_FUTEX)		+= futex.o
 obj-$(CONFIG_EPOLL)		+= epoll.o
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 9a37035e76c0..6dccc005aaca 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -87,6 +87,7 @@
 #include "msg_ring.h"
 #include "memmap.h"
 #include "zcrx.h"
+#include "ipc.h"
 
 #include "timeout.h"
 #include "poll.h"
@@ -251,6 +252,10 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	init_waitqueue_head(&ctx->sqo_sq_wait);
 	INIT_LIST_HEAD(&ctx->sqd_list);
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
+#ifdef CONFIG_IO_URING_IPC
+	xa_init(&ctx->ipc_subscribers);
+	xa_init(&ctx->ipc_channels);
+#endif
 	ret = io_alloc_cache_init(&ctx->apoll_cache, IO_POLL_ALLOC_CACHE_MAX,
 			    sizeof(struct async_poll), 0);
 	ret |= io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
@@ -2154,6 +2159,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_sqe_buffers_unregister(ctx);
 	io_sqe_files_unregister(ctx);
 	io_unregister_zcrx_ifqs(ctx);
+	io_ipc_ctx_cleanup(ctx);
 	io_cqring_overflow_kill(ctx);
 	io_eventfd_unregister(ctx);
 	io_free_alloc_caches(ctx);
diff --git a/io_uring/ipc.c b/io_uring/ipc.c
new file mode 100644
index 000000000000..3f4daf75c843
--- /dev/null
+++ b/io_uring/ipc.c
@@ -0,0 +1,1002 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * io_uring IPC channel implementation
+ *
+ * High-performance inter-process communication using io_uring infrastructure.
+ * Provides broadcast and multicast channels; message payloads are
+ * copied between userspace and a shared kernel ring buffer.
+ */
+#include <linux/kernel.h>
+#include <linux/errno.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/mm.h>
+#include <linux/slab.h>
+#include <linux/uaccess.h>
+#include <linux/io_uring.h>
+#include <linux/io_uring/cmd.h>
+#include <linux/hashtable.h>
+#include <linux/anon_inodes.h>
+#include <linux/mman.h>
+#include <linux/vmalloc.h>
+
+#include <uapi/linux/io_uring.h>
+
+#include "io_uring.h"
+#include "rsrc.h"
+#include "memmap.h"
+#include "ipc.h"
+
+#ifdef CONFIG_IO_URING_IPC
+
+/*
+ * Global channel registry
+ * Protected by RCU and spinlock
+ */
+static DEFINE_HASHTABLE(channel_hash, 8);
+static DEFINE_SPINLOCK(channel_hash_lock);
+static DEFINE_XARRAY_ALLOC(channel_xa);
+static atomic_t ipc_global_sub_id = ATOMIC_INIT(0);
+
+/*
+ * mmap() handler for the per-channel anonymous file.  Maps the shared
+ * ring + data region read-only into the caller's address space.
+ */
+static int ipc_channel_mmap(struct file *file, struct vm_area_struct *vma)
+{
+	struct io_ipc_channel *channel;
+	int ret;
+
+	/*
+	 * Safely acquire a channel reference.  A concurrent
+	 * io_ipc_channel_destroy() can NULL private_data and drop
+	 * all references; the channel is freed via call_rcu(), so
+	 * it stays accessible under rcu_read_lock().
+	 */
+	rcu_read_lock();
+	channel = READ_ONCE(file->private_data);
+	if (!channel || !refcount_inc_not_zero(&channel->ref_count)) {
+		rcu_read_unlock();
+		return -EINVAL;
+	}
+	rcu_read_unlock();
+
+	/* The shared region is read-only for userspace */
+	if (vma->vm_flags & VM_WRITE) {
+		ret = -EACCES;
+		goto out_put;
+	}
+
+	/* Prevent mprotect() from later adding write permission */
+	vm_flags_clear(vma, VM_MAYWRITE);
+
+	/*
+	 * The region was allocated with vmalloc_user(), so use
+	 * remap_vmalloc_range(): it validates vm_pgoff and the VMA size
+	 * against the allocation and sets VM_DONTEXPAND | VM_DONTDUMP
+	 * itself.  remap_pfn_range() must not be used here — it builds
+	 * a VM_PFNMAP mapping over pages that have struct page backing,
+	 * which confuses the core MM (fork, rmap, etc.).
+	 */
+	ret = remap_vmalloc_range(vma, channel->region->ptr, vma->vm_pgoff);
+
+out_put:
+	io_ipc_channel_put(channel);
+	return ret;
+}
+
+/* Final fput() callback: drop the file's channel reference, if any */
+static int ipc_channel_release(struct inode *inode, struct file *file)
+{
+	struct io_ipc_channel *channel = file->private_data;
+
+	/*
+	 * private_data may already have been cleared by
+	 * io_ipc_channel_destroy()/io_ipc_channel_free() to break the
+	 * file<->channel reference cycle; nothing to drop in that case.
+	 */
+	if (channel)
+		io_ipc_channel_put(channel);
+
+	return 0;
+}
+
+static const struct file_operations ipc_channel_fops = {
+	.mmap		= ipc_channel_mmap,
+	.release	= ipc_channel_release,
+	.llseek		= noop_llseek,
+};
+
+
+/*
+ * Compute the page-aligned sizes of the descriptor ring and the data
+ * area, and their sum.  Returns -EINVAL if the total exceeds INT_MAX.
+ *
+ * NOTE(review): ring_entries * sizeof(struct io_ipc_msg_desc) is
+ * computed in size_t before any overflow check; on 32-bit this relies
+ * on callers capping ring_entries (create caps at IORING_MAX_ENTRIES).
+ * Confirm if new callers are added.
+ */
+static int ipc_calc_region_size(u32 ring_entries, u32 max_msg_size,
+				size_t *ring_size_out, size_t *data_size_out,
+				size_t *total_size_out)
+{
+	size_t ring_size, data_size, total_size;
+
+	/* Ring header plus one descriptor per slot */
+	ring_size = sizeof(struct io_ipc_ring) +
+		    ring_entries * sizeof(struct io_ipc_msg_desc);
+	ring_size = ALIGN(ring_size, PAGE_SIZE);
+
+	/* Data area: one max-size payload slot per ring entry */
+	if ((u64)ring_entries * max_msg_size > INT_MAX)
+		return -EINVAL;
+
+	data_size = (size_t)ring_entries * max_msg_size;
+	data_size = ALIGN(data_size, PAGE_SIZE);
+
+	total_size = ring_size + data_size;
+
+	if (total_size > INT_MAX)
+		return -EINVAL;
+
+	*ring_size_out = ring_size;
+	*data_size_out = data_size;
+	*total_size_out = total_size;
+
+	return 0;
+}
+
+/* Allocate the shared ring + data region and cache pointers into it */
+static int ipc_region_alloc(struct io_ipc_channel *channel, u32 ring_entries,
+			    u32 max_msg_size)
+{
+	struct io_mapped_region *region;
+	size_t ring_size, data_size, total_size;
+	void *ptr;
+	int ret;
+
+	ret = ipc_calc_region_size(ring_entries, max_msg_size, &ring_size,
+				   &data_size, &total_size);
+	if (ret)
+		return ret;
+
+	region = kzalloc(sizeof(*region), GFP_KERNEL);
+	if (!region)
+		return -ENOMEM;
+
+	/* vmalloc_user() zeroes the memory and makes it mmap-safe */
+	ptr = vmalloc_user(total_size);
+	if (!ptr) {
+		kfree(region);
+		return -ENOMEM;
+	}
+
+	region->ptr = ptr;
+	region->nr_pages = (total_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	region->flags = 0;
+
+	/* Cache frequently used offsets for the send/recv hot paths */
+	channel->region = region;
+	channel->ring = (struct io_ipc_ring *)ptr;
+	channel->desc_array = (struct io_ipc_msg_desc *)((u8 *)ptr + sizeof(struct io_ipc_ring));
+	channel->data_region = ptr + ring_size;
+
+	/* ring_entries is validated as a power of two by the create path */
+	channel->ring->ring_mask = ring_entries - 1;
+	channel->ring->ring_entries = ring_entries;
+	channel->ring->max_msg_size = max_msg_size;
+
+	return 0;
+}
+
+/* Free the shared region and clear every cached pointer into it */
+static void ipc_region_free(struct io_ipc_channel *channel)
+{
+	struct io_mapped_region *region = channel->region;
+
+	if (!region)
+		return;
+
+	/* vfree(NULL) is a no-op — no need for a separate ptr check */
+	vfree(region->ptr);
+	kfree(region);
+
+	/* Also clear desc_array: it pointed into the freed region */
+	channel->region = NULL;
+	channel->ring = NULL;
+	channel->desc_array = NULL;
+	channel->data_region = NULL;
+}
+
+/* Final teardown, called from the RCU callback after the last put */
+static void io_ipc_channel_free(struct io_ipc_channel *channel)
+{
+	struct io_ipc_subscriber *sub;
+	unsigned long index;
+
+	/*
+	 * Attached subscribers each hold a channel reference, so they
+	 * should all be gone by the time refcount hit zero; free any
+	 * stragglers so the xarray can be destroyed.
+	 */
+	xa_for_each(&channel->subscribers, index, sub)
+		kfree(sub);
+	xa_destroy(&channel->subscribers);
+
+	if (channel->file) {
+		/*
+		 * Prevent the deferred fput release callback from
+		 * accessing the channel after it has been freed.
+		 */
+		WRITE_ONCE(channel->file->private_data, NULL);
+		fput(channel->file);
+	}
+
+	ipc_region_free(channel);
+
+	kfree(channel);
+}
+
+/* RCU callback: free the channel after a grace period */
+static void io_ipc_channel_free_rcu(struct rcu_head *rcu)
+{
+	struct io_ipc_channel *channel;
+
+	channel = container_of(rcu, struct io_ipc_channel, rcu);
+	io_ipc_channel_free(channel);
+}
+
+/* Drop one reference; the final put unlinks the channel and frees it via RCU */
+void io_ipc_channel_put(struct io_ipc_channel *channel)
+{
+	if (refcount_dec_and_test(&channel->ref_count)) {
+		/*
+		 * Remove from global data structures before scheduling
+		 * the RCU callback.  Lookups use refcount_inc_not_zero()
+		 * so no new references can appear.  Removing here avoids
+		 * taking xa_lock (a plain spinlock) inside the RCU
+		 * callback which runs in softirq context — that would
+		 * deadlock against xa_alloc() in process context.
+		 */
+		spin_lock_bh(&channel_hash_lock);
+		if (!hlist_unhashed(&channel->hash_node))
+			hash_del_rcu(&channel->hash_node);
+		spin_unlock_bh(&channel_hash_lock);
+
+		xa_erase(&channel_xa, channel->channel_id);
+
+		call_rcu(&channel->rcu, io_ipc_channel_free_rcu);
+	}
+}
+
+/* Look up a channel by ID, taking a reference; NULL if absent or dying */
+struct io_ipc_channel *io_ipc_channel_get(u32 channel_id)
+{
+	struct io_ipc_channel *channel;
+
+	rcu_read_lock();
+	channel = xa_load(&channel_xa, channel_id);
+	/* refcount_inc_not_zero() fails if a final put already ran */
+	if (channel && !refcount_inc_not_zero(&channel->ref_count))
+		channel = NULL;
+	rcu_read_unlock();
+
+	return channel;
+}
+
+/*
+ * Look up a channel by user-supplied key, taking a reference.
+ * Keys are not forced to be unique at create time; the first live
+ * match in the hash chain wins.
+ */
+struct io_ipc_channel *io_ipc_channel_get_by_key(u64 key)
+{
+	struct io_ipc_channel *channel;
+	u32 hash = hash_64(key, HASH_BITS(channel_hash));
+
+	rcu_read_lock();
+	hash_for_each_possible_rcu(channel_hash, channel, hash_node, hash) {
+		if (channel->key == key &&
+		    refcount_inc_not_zero(&channel->ref_count)) {
+			rcu_read_unlock();
+			return channel;
+		}
+	}
+	rcu_read_unlock();
+
+	return NULL;
+}
+
+/*
+ * Unix-style permission check for channel access.
+ * recv maps to the read bit (4), send to the write bit (2); the
+ * owner/group/other triplet is selected most-specific-first, exactly
+ * like classic file permission evaluation.
+ */
+static int ipc_check_permission(struct io_ipc_channel *channel, u32 access)
+{
+	const struct cred *cred = current_cred();
+	u16 needed = 0;
+	u16 bits;
+
+	if (access & IOIPC_SUB_RECV)
+		needed |= 4;	/* read */
+	if (access & IOIPC_SUB_SEND)
+		needed |= 2;	/* write */
+
+	if (uid_eq(cred->fsuid, channel->owner_uid))
+		bits = (channel->mode >> 6) & 7;	/* owner triplet */
+	else if (gid_eq(cred->fsgid, channel->owner_gid))
+		bits = (channel->mode >> 3) & 7;	/* group triplet */
+	else
+		bits = channel->mode & 7;		/* other triplet */
+
+	return (bits & needed) == needed ? 0 : -EACCES;
+}
+
+/*
+ * IORING_REGISTER_IPC_CHANNEL_CREATE: allocate a channel, its shared
+ * region and its anonymous file, and publish it in the global
+ * registries.  Returns the new channel ID via channel_id_out.
+ *
+ * NOTE(review): keys are not checked for uniqueness here; two channels
+ * may share a key and lookups return the first hash match — confirm
+ * that is intended.
+ */
+int io_ipc_channel_create(struct io_ring_ctx *ctx,
+			   const struct io_uring_ipc_channel_create __user *arg)
+{
+	struct io_uring_ipc_channel_create create;
+	struct io_ipc_channel *channel;
+	u32 hash;
+	int ret;
+
+	if (copy_from_user(&create, arg, sizeof(create)))
+		return -EFAULT;
+
+	/* Reserved fields must be zero for future extensibility */
+	if (!mem_is_zero(create.reserved, sizeof(create.reserved)))
+		return -EINVAL;
+
+	if (!create.ring_entries || create.ring_entries > IORING_MAX_ENTRIES)
+		return -EINVAL;
+
+	/* ring_mask arithmetic relies on a power-of-two entry count */
+	if (!is_power_of_2(create.ring_entries))
+		return -EINVAL;
+
+	if (!create.max_msg_size || create.max_msg_size > SZ_1M)
+		return -EINVAL;
+
+	if (create.flags & ~(IOIPC_F_BROADCAST | IOIPC_F_MULTICAST |
+			     IOIPC_F_PRIVATE))
+		return -EINVAL;
+
+	/* Broadcast and multicast delivery are mutually exclusive */
+	if ((create.flags & IOIPC_F_BROADCAST) &&
+	    (create.flags & IOIPC_F_MULTICAST))
+		return -EINVAL;
+
+	channel = kzalloc(sizeof(*channel), GFP_KERNEL);
+	if (!channel)
+		return -ENOMEM;
+
+	refcount_set(&channel->ref_count, 1);	/* creator's reference */
+	channel->flags = create.flags;
+	channel->key = create.key;
+	channel->msg_max_size = create.max_msg_size;
+	channel->owner_uid = current_fsuid();
+	channel->owner_gid = current_fsgid();
+	channel->mode = create.mode & 0666;	/* rw bits only, no exec */
+	atomic_set(&channel->next_msg_id, 1);
+	atomic_set(&channel->next_receiver_idx, 0);
+	atomic_set(&channel->recv_count, 0);
+	xa_init(&channel->subscribers);
+
+	ret = ipc_region_alloc(channel, create.ring_entries, create.max_msg_size);
+	if (ret)
+		goto err_free_channel;
+
+	refcount_inc(&channel->ref_count); /* File holds a reference */
+	channel->file = anon_inode_getfile("[io_uring_ipc]", &ipc_channel_fops,
+					   channel, O_RDWR);
+	if (IS_ERR(channel->file)) {
+		ret = PTR_ERR(channel->file);
+		channel->file = NULL;
+		refcount_dec(&channel->ref_count);
+		goto err_free_region;
+	}
+
+	ret = xa_alloc(&channel_xa, &channel->channel_id, channel,
+		       XA_LIMIT(1, INT_MAX), GFP_KERNEL);
+	if (ret < 0)
+		goto err_put_file;
+
+	hash = hash_64(create.key, HASH_BITS(channel_hash));
+	spin_lock_bh(&channel_hash_lock);
+	hash_add_rcu(channel_hash, &channel->hash_node, hash);
+	spin_unlock_bh(&channel_hash_lock);
+
+	/*
+	 * Track the channel in the creating ring so that
+	 * io_ipc_ctx_cleanup() can break the channel-file reference
+	 * cycle if the ring is torn down without an explicit destroy.
+	 */
+	ret = xa_insert(&ctx->ipc_channels, channel->channel_id,
+			xa_mk_value(0), GFP_KERNEL);
+	if (ret)
+		goto err_remove_channel;
+
+	create.channel_id_out = channel->channel_id;
+	if (copy_to_user((void __user *)arg, &create, sizeof(create))) {
+		ret = -EFAULT;
+		goto err_remove_ctx;
+	}
+
+	return 0;
+
+err_remove_ctx:
+	xa_erase(&ctx->ipc_channels, channel->channel_id);
+err_remove_channel:
+	spin_lock_bh(&channel_hash_lock);
+	hash_del_rcu(&channel->hash_node);
+	spin_unlock_bh(&channel_hash_lock);
+	xa_erase(&channel_xa, channel->channel_id);
+err_put_file:
+	/*
+	 * fput() is deferred — the release callback will call
+	 * io_ipc_channel_put() to drop the file's reference.
+	 * Drop the initial reference here; it can't reach zero
+	 * yet because the file still holds one.
+	 */
+	fput(channel->file);
+	channel->file = NULL;
+	io_ipc_channel_put(channel);
+	return ret;
+err_free_region:
+	ipc_region_free(channel);
+err_free_channel:
+	kfree(channel);
+	return ret;
+}
+
+/*
+ * IORING_REGISTER_IPC_CHANNEL_ATTACH: subscribe the calling ring to a
+ * channel (by key or ID), returning a subscriber ID and an fd for
+ * mmap()ing the shared region.
+ */
+int io_ipc_channel_attach(struct io_ring_ctx *ctx,
+			   const struct io_uring_ipc_channel_attach __user *arg)
+{
+	struct io_uring_ipc_channel_attach attach;
+	struct io_ipc_channel *channel = NULL;
+	struct io_ipc_subscriber *sub;
+	int ret;
+
+	if (copy_from_user(&attach, arg, sizeof(attach)))
+		return -EFAULT;
+
+	if (!mem_is_zero(attach.reserved, sizeof(attach.reserved)))
+		return -EINVAL;
+
+	/* Must request at least one direction, and nothing else */
+	if (!attach.flags || (attach.flags & ~IOIPC_SUB_BOTH))
+		return -EINVAL;
+
+	/* A non-zero key takes precedence over channel_id lookup */
+	if (attach.key)
+		channel = io_ipc_channel_get_by_key(attach.key);
+	else
+		channel = io_ipc_channel_get(attach.channel_id);
+
+	if (!channel)
+		return -ENOENT;
+
+	ret = ipc_check_permission(channel, attach.flags);
+	if (ret)
+		goto err_put_channel;
+
+	sub = kzalloc(sizeof(*sub), GFP_KERNEL);
+	if (!sub) {
+		ret = -ENOMEM;
+		goto err_put_channel;
+	}
+
+	sub->ctx = ctx;
+	sub->channel = channel;
+	sub->flags = attach.flags;
+	sub->local_head = 0;
+	/* Globally unique; doubles as the send/recv "fd" handle */
+	sub->subscriber_id = atomic_inc_return(&ipc_global_sub_id);
+
+	ret = xa_insert(&channel->subscribers, sub->subscriber_id, sub,
+			GFP_KERNEL);
+	if (ret) {
+		kfree(sub);
+		goto err_put_channel;
+	}
+
+	refcount_inc(&channel->ref_count);	/* subscriber's reference */
+	if (attach.flags & IOIPC_SUB_RECV)
+		atomic_inc(&channel->recv_count);
+
+	ret = xa_insert(&ctx->ipc_subscribers, sub->subscriber_id, sub,
+			GFP_KERNEL);
+	if (ret)
+		goto err_remove_chan_sub;
+
+	ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
+	if (ret < 0)
+		goto err_remove_sub;
+
+	attach.local_id_out = sub->subscriber_id;
+	attach.region_size = io_region_size(channel->region);
+	attach.channel_fd = ret;
+	attach.mmap_offset_out = 0;
+
+	/*
+	 * NOTE(review): the fd number is published to userspace here,
+	 * before fd_install() below; if the install path then fails,
+	 * userspace holds a never-installed fd number.  Harmless, but
+	 * confirm the intended contract.
+	 */
+	if (copy_to_user((void __user *)arg, &attach, sizeof(attach))) {
+		put_unused_fd(ret);
+		ret = -EFAULT;
+		goto err_remove_sub;
+	}
+
+	{
+		/* Channel may be dying: get_file_active() fails on zero f_count */
+		struct file *f = get_file_active(&channel->file);
+
+		if (!f) {
+			put_unused_fd(attach.channel_fd);
+			ret = -EINVAL;
+			goto err_remove_sub;
+		}
+		fd_install(attach.channel_fd, f);
+	}
+
+	io_ipc_channel_put(channel);	/* drop the lookup reference */
+	return 0;
+
+err_remove_sub:
+	xa_erase(&ctx->ipc_subscribers, sub->subscriber_id);
+err_remove_chan_sub:
+	xa_erase(&channel->subscribers, sub->subscriber_id);
+	if (attach.flags & IOIPC_SUB_RECV)
+		atomic_dec(&channel->recv_count);
+	kfree_rcu(sub, rcu);
+	io_ipc_channel_put(channel); /* Drop subscriber's reference */
+err_put_channel:
+	io_ipc_channel_put(channel); /* Drop lookup reference */
+	return ret;
+}
+
+/* IORING_REGISTER_IPC_CHANNEL_DETACH: drop a subscription of this ring */
+int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 subscriber_id)
+{
+	struct io_ipc_subscriber *sub;
+	struct io_ipc_channel *channel;
+
+	/* xa_erase() is atomic, so only one of two racing detaches wins */
+	sub = xa_erase(&ctx->ipc_subscribers, subscriber_id);
+	if (!sub)
+		return -ENOENT;
+
+	channel = sub->channel;
+	xa_erase(&channel->subscribers, subscriber_id);
+
+	if (sub->flags & IOIPC_SUB_RECV)
+		atomic_dec(&channel->recv_count);
+
+	/*
+	 * Drop the subscriber's channel reference; defer freeing sub
+	 * past a grace period since send/recv may still hold an
+	 * RCU-protected pointer to it.
+	 */
+	io_ipc_channel_put(channel);
+	kfree_rcu(sub, rcu);
+	return 0;
+}
+
+/*
+ * Recompute consumer.head as the minimum local_head across all receive
+ * subscribers.  Called lazily from the broadcast recv path (every 16
+ * messages) and from the send path when the ring appears full.
+ *
+ * NOTE(review): two concurrent callers can race and the later
+ * WRITE_ONCE() may publish a staler (smaller) head.  That only makes
+ * the ring look fuller than it is — senders can see a spurious
+ * -ENOBUFS but never overwrite an unconsumed slot.  Confirm this is
+ * acceptable.
+ */
+static void ipc_advance_consumer_head(struct io_ipc_channel *channel)
+{
+	struct io_ipc_subscriber *s;
+	struct io_ipc_ring *ring = channel->ring;
+	unsigned long index;
+	u32 min_head = READ_ONCE(ring->producer.tail);
+
+	rcu_read_lock();
+	xa_for_each(&channel->subscribers, index, s) {
+		if (s->flags & IOIPC_SUB_RECV) {
+			u32 sh = READ_ONCE(s->local_head);
+
+			/* Signed delta is wraparound-safe for u32 indices */
+			if ((s32)(sh - min_head) < 0)
+				min_head = sh;
+		}
+	}
+	rcu_read_unlock();
+
+	WRITE_ONCE(ring->consumer.head, min_head);
+}
+
+/*
+ * Wake receivers after a message has been published.
+ *
+ *   target != 0 : wake only that subscriber (directed send)
+ *   multicast   : wake one receiver, chosen round-robin
+ *   broadcast   : wake every receiver
+ *   otherwise   : wake only the first receiver found
+ */
+static int ipc_wake_receivers(struct io_ipc_channel *channel, u32 target)
+{
+	struct io_ipc_subscriber *sub;
+	unsigned long index;
+
+	rcu_read_lock();
+	if (target) {
+		sub = xa_load(&channel->subscribers, target);
+		if (!sub || !(sub->flags & IOIPC_SUB_RECV)) {
+			rcu_read_unlock();
+			return -ENOENT;
+		}
+		io_cqring_wake(sub->ctx);
+		rcu_read_unlock();
+		return 0;
+	}
+
+	if (channel->flags & IOIPC_F_MULTICAST) {
+		u32 recv_cnt = atomic_read(&channel->recv_count);
+
+		if (recv_cnt) {
+			/*
+			 * NOTE(review): recv_cnt may change between the
+			 * atomic_read() and the walk below, so the pick can
+			 * fall past the last receiver and wake nobody.
+			 * Confirm receivers tolerate a missed wakeup (they
+			 * retry via -EAGAIN / io-wq).
+			 */
+			u32 rr = (u32)atomic_inc_return(&channel->next_receiver_idx);
+			u32 target_idx = rr % recv_cnt;
+			u32 i = 0;
+
+			xa_for_each(&channel->subscribers, index, sub) {
+				if (sub->flags & IOIPC_SUB_RECV) {
+					if (i == target_idx) {
+						io_cqring_wake(sub->ctx);
+						break;
+					}
+					i++;
+				}
+			}
+		}
+	} else {
+		xa_for_each(&channel->subscribers, index, sub) {
+			if (sub->flags & IOIPC_SUB_RECV) {
+				io_cqring_wake(sub->ctx);
+				/* Non-broadcast: wake only the first receiver */
+				if (!(channel->flags & IOIPC_F_BROADCAST))
+					break;
+			}
+		}
+	}
+	rcu_read_unlock();
+	return 0;
+}
+
+/* Prep for IORING_OP_IPC_SEND: validate and stash SQE fields */
+int io_ipc_send_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	struct io_ipc_send *ipc = io_kiocb_to_cmd(req, struct io_ipc_send);
+
+	if (sqe->buf_index || sqe->personality)
+		return -EINVAL;
+
+	if (sqe->ioprio || sqe->rw_flags)
+		return -EINVAL;
+
+	/*
+	 * Reject the remaining unused SQE fields so they stay available
+	 * for future extensions — standard practice for new io_uring
+	 * opcodes.  (file_index is consumed below as the wake target.)
+	 */
+	if (sqe->off || sqe->addr3)
+		return -EINVAL;
+
+	ipc->channel_id = READ_ONCE(sqe->fd);
+	ipc->addr = READ_ONCE(sqe->addr);
+	ipc->len = READ_ONCE(sqe->len);
+	ipc->target = READ_ONCE(sqe->file_index);
+
+	return 0;
+}
+
+/*
+ * IORING_OP_IPC_SEND: copy a message from userspace into a lock-free
+ * claimed ring slot and wake receivers.  The channel is resolved
+ * either through a subscriber ID owned by this ring (fast path) or a
+ * raw channel ID (permission-checked).
+ */
+int io_ipc_send(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_ipc_send *ipc = io_kiocb_to_cmd(req, struct io_ipc_send);
+	struct io_ipc_subscriber *sub = NULL;
+	struct io_ipc_channel *channel = NULL;
+	struct io_ipc_ring *ring;
+	struct io_ipc_msg_desc *desc;
+	void __user *user_buf;
+	void *dest;
+	u32 head, tail, next_tail, idx;
+	u32 sub_flags;
+	int ret;
+	u32 fd = ipc->channel_id;
+	bool need_put = false;
+	bool retried_advance = false;
+
+	/* O(1) subscriber lookup via xarray */
+	rcu_read_lock();
+	sub = xa_load(&req->ctx->ipc_subscribers, fd);
+	if (sub) {
+		channel = sub->channel;
+		if (!refcount_inc_not_zero(&channel->ref_count)) {
+			rcu_read_unlock();
+			ret = -ENOENT;
+			goto fail;
+		}
+		sub_flags = sub->flags;
+		rcu_read_unlock();
+		need_put = true;
+
+		if (!(sub_flags & IOIPC_SUB_SEND)) {
+			ret = -EACCES;
+			goto fail_put;
+		}
+		goto found;
+	}
+	rcu_read_unlock();
+
+	channel = io_ipc_channel_get(fd);
+	if (!channel) {
+		ret = -ENOENT;
+		goto fail;
+	}
+
+	need_put = true;
+
+	ret = ipc_check_permission(channel, IOIPC_SUB_SEND);
+	if (ret)
+		goto fail_put;
+
+found:
+	ring = channel->ring;
+
+	if (unlikely(ipc->len > channel->msg_max_size)) {
+		ret = -EMSGSIZE;
+		goto fail_put;
+	}
+
+	/* Lock-free slot reservation via CAS on producer tail */
+retry:
+	do {
+		tail = READ_ONCE(ring->producer.tail);
+		next_tail = tail + 1;
+		head = READ_ONCE(ring->consumer.head);
+
+		if (unlikely(next_tail - head > ring->ring_entries)) {
+			/*
+			 * Ring full.  For broadcast channels, try to
+			 * advance consumer.head once by scanning the
+			 * min local_head across all receivers.
+			 */
+			if ((channel->flags & IOIPC_F_BROADCAST) &&
+			    !retried_advance) {
+				ipc_advance_consumer_head(channel);
+				retried_advance = true;
+				goto retry;
+			}
+			ret = -ENOBUFS;
+			goto fail_put;
+		}
+		idx = tail & ring->ring_mask;
+	} while (cmpxchg(&ring->producer.tail, tail, next_tail) != tail);
+
+	/* Slot exclusively claimed — write data and descriptor */
+	dest = channel->data_region + (idx * channel->msg_max_size);
+	desc = &channel->desc_array[idx];
+
+	prefetchw(desc);
+
+	user_buf = u64_to_user_ptr(ipc->addr);
+	/*
+	 * copy_from_user() performs the access_ok() range check on the
+	 * raw pointer taken from the SQE.  The double-underscore
+	 * __copy_from_user_inatomic() variant skips access_ok() and
+	 * must never be handed an unvalidated user pointer — doing so
+	 * lets userspace make the kernel read from kernel addresses.
+	 */
+	if (unlikely(copy_from_user(dest, user_buf, ipc->len))) {
+		/*
+		 * Slot already claimed via CAS; mark it ready with
+		 * zero length so consumers can advance past it.
+		 */
+		desc->offset = idx * channel->msg_max_size;
+		desc->len = 0;
+		desc->msg_id = 0;
+		desc->sender_data = 0;
+		smp_wmb();
+		WRITE_ONCE(desc->seq, tail + 1);
+		ret = -EFAULT;
+		goto fail_put;
+	}
+
+	desc->offset = idx * channel->msg_max_size;
+	desc->len = ipc->len;
+	desc->msg_id = atomic_inc_return(&channel->next_msg_id);
+	desc->sender_data = req->cqe.user_data;
+
+	/* Ensure descriptor + data visible before marking slot ready */
+	smp_wmb();
+	WRITE_ONCE(desc->seq, tail + 1);
+
+	ret = ipc_wake_receivers(channel, ipc->target);
+	if (ret)
+		goto fail_put;
+
+	if (need_put)
+		io_ipc_channel_put(channel);
+
+	io_req_set_res(req, ipc->len, 0);
+	return IOU_COMPLETE;
+
+fail_put:
+	if (need_put)
+		io_ipc_channel_put(channel);
+fail:
+	req_set_fail(req);
+	io_req_set_res(req, ret, 0);
+	return IOU_COMPLETE;
+}
+
+
+/* Prep for IORING_OP_IPC_RECV: validate and stash SQE fields */
+int io_ipc_recv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	struct io_ipc_recv *ipc = io_kiocb_to_cmd(req, struct io_ipc_recv);
+
+	if (sqe->buf_index || sqe->personality)
+		return -EINVAL;
+
+	if (sqe->ioprio || sqe->rw_flags)
+		return -EINVAL;
+
+	/*
+	 * Recv takes no wake target, so file_index must be unset; also
+	 * reject the other unused SQE fields to keep them free for
+	 * future extensions (standard practice for new opcodes).
+	 */
+	if (sqe->off || sqe->file_index || sqe->addr3)
+		return -EINVAL;
+
+	ipc->channel_id = READ_ONCE(sqe->fd);
+	ipc->addr = READ_ONCE(sqe->addr);
+	ipc->len = READ_ONCE(sqe->len);
+
+	return 0;
+}
+
+/*
+ * IORING_OP_IPC_RECV: copy the next available message for this
+ * subscriber out to userspace.  Returns -EAGAIN (punt to io-wq) when
+ * no message is ready or a competing consumer won the slot.
+ */
+int io_ipc_recv(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_ipc_recv *ipc = io_kiocb_to_cmd(req, struct io_ipc_recv);
+	struct io_ipc_subscriber *sub = NULL;
+	struct io_ipc_channel *channel;
+	struct io_ipc_ring *ring;
+	struct io_ipc_msg_desc *desc;
+	void __user *user_buf;
+	void *src;
+	u32 head, tail, idx;
+	u32 sub_flags, sub_id;
+	size_t copy_len;
+	int ret;
+
+	/* O(1) subscriber lookup via xarray */
+	rcu_read_lock();
+	sub = xa_load(&req->ctx->ipc_subscribers, ipc->channel_id);
+	if (sub) {
+		channel = sub->channel;
+		if (!refcount_inc_not_zero(&channel->ref_count)) {
+			rcu_read_unlock();
+			ret = -ENOENT;
+			goto fail;
+		}
+		sub_flags = sub->flags;
+		sub_id = sub->subscriber_id;
+		head = READ_ONCE(sub->local_head);
+		rcu_read_unlock();
+		goto found;
+	}
+	rcu_read_unlock();
+	ret = -ENOENT;
+	goto fail;
+
+found:
+	ring = channel->ring;
+
+	if (!(sub_flags & IOIPC_SUB_RECV)) {
+		ret = -EACCES;
+		goto fail_put;
+	}
+
+	/*
+	 * For multicast, competing consumers share consumer.head directly
+	 * instead of using per-subscriber local_head.
+	 */
+	if (channel->flags & IOIPC_F_MULTICAST)
+		head = READ_ONCE(ring->consumer.head);
+
+	/* Check if there are messages available - punt to io-wq for retry */
+	tail = READ_ONCE(ring->producer.tail);
+	if (unlikely(head == tail)) {
+		io_ipc_channel_put(channel);
+		return -EAGAIN;
+	}
+
+	idx = head & ring->ring_mask;
+	desc = &channel->desc_array[idx];
+
+	/*
+	 * Verify slot is fully written by the producer.  With lock-free
+	 * CAS-based send, tail advances before data is written; the
+	 * per-slot seq number signals completion.
+	 * Pairs with smp_wmb() + WRITE_ONCE(desc->seq) in send path.
+	 */
+	if (READ_ONCE(desc->seq) != head + 1) {
+		io_ipc_channel_put(channel);
+		return -EAGAIN;
+	}
+
+	smp_rmb();
+
+	src = channel->data_region + desc->offset;
+	prefetch(src);
+
+	copy_len = min_t(u32, desc->len, ipc->len);
+	user_buf = u64_to_user_ptr(ipc->addr);
+
+	/*
+	 * copy_to_user() performs the access_ok() range check on the
+	 * raw pointer taken from the SQE.  The double-underscore
+	 * __copy_to_user_inatomic() variant skips access_ok() and must
+	 * never be handed an unvalidated user pointer — doing so lets
+	 * userspace direct a kernel-memory write to a kernel address.
+	 */
+	if (unlikely(copy_to_user(user_buf, src, copy_len))) {
+		ret = -EFAULT;
+		goto fail_put;
+	}
+
+	if (channel->flags & IOIPC_F_MULTICAST) {
+		/*
+		 * Multicast: atomically advance the shared consumer head.
+		 * Losers retry via -EAGAIN / io-wq.
+		 */
+		if (cmpxchg(&ring->consumer.head, head, head + 1) != head) {
+			io_ipc_channel_put(channel);
+			return -EAGAIN;
+		}
+	} else {
+		/*
+		 * Re-find subscriber under RCU for atomic local_head update.
+		 * O(1) xarray lookup; subscriber is stable while channel
+		 * ref is held and we're under RCU.
+		 */
+		rcu_read_lock();
+		sub = xa_load(&req->ctx->ipc_subscribers, sub_id);
+		if (!sub) {
+			rcu_read_unlock();
+			goto done;
+		}
+
+		if (cmpxchg(&sub->local_head, head, head + 1) != head) {
+			rcu_read_unlock();
+			io_ipc_channel_put(channel);
+			return -EAGAIN;
+		}
+
+		if (channel->flags & IOIPC_F_BROADCAST) {
+			/*
+			 * Lazy consumer.head advancement: only scan min-head
+			 * every 16 messages to amortize the O(N) walk.
+			 * The send path also triggers this on ring-full.
+			 */
+			if ((head & 0xf) == 0)
+				ipc_advance_consumer_head(channel);
+		} else {
+			/* Unicast: single consumer, update directly */
+			WRITE_ONCE(ring->consumer.head, head + 1);
+		}
+		rcu_read_unlock();
+	}
+
+done:
+	io_ipc_channel_put(channel);
+	io_req_set_res(req, copy_len, 0);
+	return IOU_COMPLETE;
+
+fail_put:
+	io_ipc_channel_put(channel);
+fail:
+	req_set_fail(req);
+	io_req_set_res(req, ret, 0);
+	return IOU_COMPLETE;
+}
+
+/*
+ * Tear down a channel: unlink it from the global registries and drop
+ * the file's and the creator's references.
+ *
+ * NOTE(review): there is no ownership or permission check — any ring
+ * that learns a channel_id can destroy the channel.  Confirm whether
+ * destroy should be restricted to the creating ring or owner uid.
+ */
+int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 channel_id)
+{
+	struct io_ipc_channel *channel;
+	struct file *f;
+
+	/*
+	 * Atomically remove from global lookup.  This prevents a second
+	 * destroy call from finding the channel and draining refcount
+	 * below the number of outstanding references.
+	 */
+	channel = xa_erase(&channel_xa, channel_id);
+	if (!channel)
+		return -ENOENT;
+
+	spin_lock_bh(&channel_hash_lock);
+	if (!hlist_unhashed(&channel->hash_node))
+		hash_del_rcu(&channel->hash_node);
+	spin_unlock_bh(&channel_hash_lock);
+
+	/*
+	 * Break the reference cycle between channel and file.
+	 * The channel holds a file reference (channel->file) and the
+	 * file's release callback holds a channel reference via
+	 * private_data.  Detach the file here so that the release
+	 * callback becomes a no-op and the cycle is broken.
+	 */
+	f = channel->file;
+	if (f) {
+		WRITE_ONCE(f->private_data, NULL);
+		channel->file = NULL;
+		fput(f);
+		io_ipc_channel_put(channel); /* Drop file's reference */
+	}
+
+	/* Drop the creator's initial reference */
+	io_ipc_channel_put(channel);
+
+	return 0;
+}
+
+/*
+ * Ring teardown: detach every subscription held by this ring and
+ * destroy any channels it created but never explicitly destroyed.
+ * Called from io_ring_ctx_free().
+ */
+void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx)
+{
+	struct io_ipc_subscriber *sub;
+	struct io_ipc_channel *channel;
+	unsigned long index;
+	void *entry;
+
+	/* xa_for_each() tolerates erasing the current entry while walking */
+	xa_for_each(&ctx->ipc_subscribers, index, sub) {
+		channel = sub->channel;
+
+		xa_erase(&ctx->ipc_subscribers, index);
+		xa_erase(&channel->subscribers, sub->subscriber_id);
+
+		if (sub->flags & IOIPC_SUB_RECV)
+			atomic_dec(&channel->recv_count);
+
+		io_ipc_channel_put(channel);
+		kfree_rcu(sub, rcu);
+	}
+	xa_destroy(&ctx->ipc_subscribers);
+
+	/*
+	 * Destroy any channels created by this ring that were not
+	 * explicitly destroyed.  io_ipc_channel_destroy() is
+	 * idempotent — it returns -ENOENT if the channel was
+	 * already destroyed.
+	 */
+	xa_for_each(&ctx->ipc_channels, index, entry) {
+		io_ipc_channel_destroy(ctx, index);
+	}
+	xa_destroy(&ctx->ipc_channels);
+}
+
+#endif /* CONFIG_IO_URING_IPC */
diff --git a/io_uring/ipc.h b/io_uring/ipc.h
new file mode 100644
index 000000000000..65d143c3e520
--- /dev/null
+++ b/io_uring/ipc.h
@@ -0,0 +1,161 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef IO_URING_IPC_H
+#define IO_URING_IPC_H
+
+#include <linux/io_uring_types.h>
+
+#ifdef CONFIG_IO_URING_IPC
+
+/*
+ * Internal kernel structures for io_uring IPC
+ */
+
+/*
+ * Shared ring structure - lives in the mmap'd memory region, so this
+ * layout is visible to userspace via the channel's anonymous file and
+ * should be treated as ABI once merged.
+ */
+struct io_ipc_ring {
+	/* Cache-aligned producer/consumer positions to limit false sharing */
+	struct {
+		u32	head __aligned(64);
+		u32	tail;
+	} producer;
+
+	struct {
+		u32	head __aligned(64);
+	} consumer;
+
+	/* Ring parameters */
+	u32	ring_mask;	/* presumably ring_entries - 1 (power-of-2) — confirm */
+	u32	ring_entries;	/* number of message descriptor slots */
+	u32	max_msg_size;	/* upper bound on a single message payload */
+
+	/* Message descriptors follow inline */
+};
+
+/*
+ * Message descriptor in the ring.
+ * NOTE(review): on 64-bit this layout has 4 bytes of implicit tail
+ * padding after @seq (sizeof == 32); for a userspace-visible ring,
+ * consider making the padding explicit so the layout is unambiguous.
+ */
+struct io_ipc_msg_desc {
+	u64	offset;			/* Offset in data region for message payload */
+	u32	len;			/* Message length */
+	u32	msg_id;			/* Unique message ID */
+	u64	sender_data;		/* Sender's user_data for context */
+	u32	seq;			/* Lock-free completion sequence number */
+};
+
+/*
+ * Per-subscriber attachment to a channel.  Looked up under RCU on the
+ * send/recv paths and freed via kfree_rcu() (see @rcu), so the struct
+ * must remain valid for a grace period after being unpublished.
+ */
+struct io_ipc_subscriber {
+	u32	local_head __aligned(64);	/* Cache-aligned to prevent false sharing */
+	u32	subscriber_id;			/* Unique subscriber ID */
+	u32	flags;				/* IOIPC_SUB_SEND, IOIPC_SUB_RECV */
+
+	struct io_ring_ctx	*ctx;		/* io_uring context */
+	struct io_ipc_channel	*channel;	/* Attached channel; holds a channel reference */
+	struct rcu_head rcu;			/* For kfree_rcu deferred freeing */
+};
+
+/*
+ * IPC channel connecting two or more io_uring instances.
+ *
+ * Lifetime: @ref_count carries one reference for the creator and one
+ * for the backing anonymous file (via file->private_data); destroy
+ * breaks that file<->channel cycle before dropping both references.
+ * Each attached subscriber also holds a reference.  The struct is
+ * freed after an RCU grace period (@rcu).
+ */
+struct io_ipc_channel {
+	struct io_mapped_region	*region;	/* Shared memory region */
+	struct io_ipc_ring	*ring;		/* Shared ring structure in mmap'd region */
+	void			*data_region;	/* Data storage area for messages */
+	struct io_ipc_msg_desc	*desc_array;	/* Cached descriptor array base */
+	struct file		*file;		/* Anonymous file for mmap support */
+
+	/* Subscribers to this channel */
+	struct xarray		subscribers;	/* All subscribers */
+	atomic_t		recv_count;	/* Cached count of IOIPC_SUB_RECV subscribers */
+
+	/* Channel metadata */
+	refcount_t		ref_count;
+	u32			channel_id;
+	u32			flags;		/* IOIPC_F_BROADCAST, IOIPC_F_MULTICAST */
+	u64			key;		/* Unique key for lookup */
+
+	/* Ring buffer configuration */
+	u32			msg_max_size;
+
+	/* Access control - Unix-style owner/group/mode checks */
+	kuid_t			owner_uid;
+	kgid_t			owner_gid;
+	u16			mode;		/* Permission bits */
+
+	/* Next message ID */
+	atomic_t		next_msg_id;
+
+	/* For multicast round-robin (multicast not yet implemented) */
+	atomic_t		next_receiver_idx;
+
+	/* Channel lifecycle */
+	struct rcu_head		rcu;
+	struct hlist_node	hash_node;	/* For global channel hash table (keyed lookup) */
+};
+
+/*
+ * Request state for IPC operations; sized into the request's async data
+ * via .async_size in opdef.c.
+ */
+struct io_ipc_send {
+	struct file		*file;	/* request file (io_uring op-state convention) */
+	u64			addr;	/* userspace address of payload to copy in */
+	u32			channel_id;	/* destination channel */
+	size_t			len;	/* payload length in bytes */
+	u32			target;	/* presumably a directed-send target id — confirm; multicast unimplemented */
+};
+
+/* Request state for IORING_OP_IPC_RECV (see .async_size in opdef.c) */
+struct io_ipc_recv {
+	struct file		*file;	/* request file (io_uring op-state convention) */
+	u64			addr;	/* userspace buffer to copy the message into */
+	u32			channel_id;	/* channel to receive from */
+	size_t			len;	/* userspace buffer capacity */
+};
+
+/* Function declarations */
+
+/* Registration operations */
+int io_ipc_channel_create(struct io_ring_ctx *ctx,
+			   const struct io_uring_ipc_channel_create __user *arg);
+int io_ipc_channel_attach(struct io_ring_ctx *ctx,
+			   const struct io_uring_ipc_channel_attach __user *arg);
+int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 channel_id);
+
+/* Operation prep and execution */
+int io_ipc_send_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_ipc_send(struct io_kiocb *req, unsigned int issue_flags);
+
+int io_ipc_recv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_ipc_recv(struct io_kiocb *req, unsigned int issue_flags);
+
+/* Channel lifecycle */
+int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 channel_id);
+void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx);
+
+/* Channel lookup and management */
+struct io_ipc_channel *io_ipc_channel_get(u32 channel_id);
+struct io_ipc_channel *io_ipc_channel_get_by_key(u64 key);
+void io_ipc_channel_put(struct io_ipc_channel *channel);
+
+#else /* !CONFIG_IO_URING_IPC */
+
+static inline int io_ipc_channel_create(struct io_ring_ctx *ctx,
+			const struct io_uring_ipc_channel_create __user *arg)
+{
+	return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_channel_attach(struct io_ring_ctx *ctx,
+			const struct io_uring_ipc_channel_attach __user *arg)
+{
+	return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 channel_id)
+{
+	return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 channel_id)
+{
+	return -EOPNOTSUPP;
+}
+
+static inline void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx)
+{
+}
+
+/*
+ * opdef.c wires the prep/issue handlers into io_issue_defs[]
+ * unconditionally, so stubs are required here as well or the kernel
+ * fails to link with CONFIG_IO_URING_IPC=n.
+ *
+ * NOTE(review): opdef.c also uses sizeof(struct io_ipc_send) and
+ * sizeof(struct io_ipc_recv) for .async_size, but those structs are
+ * only defined in the CONFIG_IO_URING_IPC branch above — either move
+ * them out of the #ifdef or guard the opdef.c entries.
+ */
+static inline int io_ipc_send_prep(struct io_kiocb *req,
+				   const struct io_uring_sqe *sqe)
+{
+	return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_send(struct io_kiocb *req, unsigned int issue_flags)
+{
+	return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_recv_prep(struct io_kiocb *req,
+				   const struct io_uring_sqe *sqe)
+{
+	return -EOPNOTSUPP;
+}
+
+static inline int io_ipc_recv(struct io_kiocb *req, unsigned int issue_flags)
+{
+	return -EOPNOTSUPP;
+}
+
+#endif /* CONFIG_IO_URING_IPC */
+
+#endif /* IO_URING_IPC_H */
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index 645980fa4651..658aa36efda2 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -38,6 +38,7 @@
 #include "futex.h"
 #include "truncate.h"
 #include "zcrx.h"
+#include "ipc.h"
 
 static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
 {
@@ -599,6 +600,18 @@ const struct io_issue_def io_issue_defs[] = {
 		.prep			= io_uring_cmd_prep,
 		.issue			= io_uring_cmd,
 	},
+	[IORING_OP_IPC_SEND] = {
+		.audit_skip		= 1,
+		.async_size		= sizeof(struct io_ipc_send),
+		.prep			= io_ipc_send_prep,
+		.issue			= io_ipc_send,
+	},
+	[IORING_OP_IPC_RECV] = {
+		.audit_skip		= 1,
+		.async_size		= sizeof(struct io_ipc_recv),
+		.prep			= io_ipc_recv_prep,
+		.issue			= io_ipc_recv,
+	},
 };
 
 const struct io_cold_def io_cold_defs[] = {
@@ -857,6 +870,12 @@ const struct io_cold_def io_cold_defs[] = {
 		.sqe_copy		= io_uring_cmd_sqe_copy,
 		.cleanup		= io_uring_cmd_cleanup,
 	},
+	[IORING_OP_IPC_SEND] = {
+		.name			= "IPC_SEND",
+	},
+	[IORING_OP_IPC_RECV] = {
+		.name			= "IPC_RECV",
+	},
 };
 
 const char *io_uring_get_opcode(u8 opcode)
diff --git a/io_uring/register.c b/io_uring/register.c
index 0148735f7711..7646dbb2d572 100644
--- a/io_uring/register.c
+++ b/io_uring/register.c
@@ -34,6 +34,7 @@
 #include "zcrx.h"
 #include "query.h"
 #include "bpf_filter.h"
+#include "ipc.h"
 
 #define IORING_MAX_RESTRICTIONS	(IORING_RESTRICTION_LAST + \
 				 IORING_REGISTER_LAST + IORING_OP_LAST)
@@ -930,6 +931,30 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			WRITE_ONCE(ctx->bpf_filters,
 				   ctx->restrictions.bpf_filters->filters);
 		break;
+	case IORING_REGISTER_IPC_CHANNEL_CREATE:
+		ret = -EINVAL;
+		if (!arg || nr_args != 1)
+			break;
+		ret = io_ipc_channel_create(ctx, arg);
+		break;
+	case IORING_REGISTER_IPC_CHANNEL_ATTACH:
+		ret = -EINVAL;
+		if (!arg || nr_args != 1)
+			break;
+		ret = io_ipc_channel_attach(ctx, arg);
+		break;
+	case IORING_REGISTER_IPC_CHANNEL_DETACH:
+		ret = -EINVAL;
+		if (arg || !nr_args)
+			break;
+		ret = io_ipc_channel_detach(ctx, nr_args);
+		break;
+	case IORING_REGISTER_IPC_CHANNEL_DESTROY:
+		ret = -EINVAL;
+		if (arg || !nr_args)
+			break;
+		ret = io_ipc_channel_destroy(ctx, nr_args);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
-- 
2.52.0


  reply	other threads:[~2026-03-13 13:09 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-03-13 13:07 [RFC PATCH 0/2] io_uring: add IPC channel infrastructure Daniel Hodges
2026-03-13 13:07 ` Daniel Hodges [this message]
2026-03-13 13:07 ` [RFC PATCH 2/2] selftests/ipc: Add io_uring IPC selftest Daniel Hodges
2026-03-14 13:50 ` [RFC PATCH 0/2] io_uring: add IPC channel infrastructure Daniel Hodges
2026-03-14 16:54   ` Jens Axboe
2026-03-14 17:09     ` Jens Axboe

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260313130739.23265-2-git@danielhodges.dev \
    --to=git@danielhodges.dev \
    --cc=asml.silence@gmail.com \
    --cc=axboe@kernel.dk \
    --cc=io-uring@vger.kernel.org \
    --cc=linux-kernel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox