On Tue, Jul 11, 2023 at 06:47:01PM -0600, Jens Axboe wrote:
> Add support for FUTEX_WAKE/WAIT primitives.
>
> IORING_OP_FUTEX_WAKE is mix of FUTEX_WAKE and FUTEX_WAKE_BITSET, as
> it does support passing in a bitset.
>
> Similary, IORING_OP_FUTEX_WAIT is a mix of FUTEX_WAIT and
> FUTEX_WAIT_BITSET.
>
> FUTEX_WAKE is straight forward, as we can always just do those inline.
> FUTEX_WAIT will queue the futex with an appropriate callback, and
> that callback will in turn post a CQE when it has triggered.
>
> Cancelations are supported, both from the application point-of-view,
> but also to be able to cancel pending waits if the ring exits before
> all events have occurred.
>
> This is just the barebones wait/wake support. PI or REQUEUE support is
> not added at this point, unclear if we might look into that later.
>
> Likewise, explicit timeouts are not supported either. It is expected
> that users that need timeouts would do so via the usual io_uring
> mechanism to do that using linked timeouts.
>
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
I'm not sure I'm qualified to review this :/ I really don't know
anything about how io-uring works. And the above doesn't really begin to
explain things.
> +static void io_futex_wake_fn(struct wake_q_head *wake_q, struct futex_q *q)
> +{
> + struct io_futex_data *ifd = container_of(q, struct io_futex_data, q);
> + struct io_kiocb *req = ifd->req;
> +
> + __futex_unqueue(q);
> + smp_store_release(&q->lock_ptr, NULL);
> +
> + io_req_set_res(req, 0, 0);
> + req->io_task_work.func = io_futex_complete;
> + io_req_task_work_add(req);
> +}
I'm noting the WARN from futex_wake_mark() went walk-about.
Perhaps something like so?
diff --git a/kernel/futex/waitwake.c b/kernel/futex/waitwake.c
index ba01b9408203..07758d48d5db 100644
--- a/kernel/futex/waitwake.c
+++ b/kernel/futex/waitwake.c
@@ -106,20 +106,11 @@
* double_lock_hb() and double_unlock_hb(), respectively.
*/
-/*
- * The hash bucket lock must be held when this is called.
- * Afterwards, the futex_q must not be accessed. Callers
- * must ensure to later call wake_up_q() for the actual
- * wakeups to occur.
- */
-void futex_wake_mark(struct wake_q_head *wake_q, struct futex_q *q)
+bool __futex_wake_mark(struct futex_q *q)
{
- struct task_struct *p = q->task;
-
if (WARN(q->pi_state || q->rt_waiter, "refusing to wake PI futex\n"))
- return;
+ return false;
- get_task_struct(p);
__futex_unqueue(q);
/*
* The waiting task can free the futex_q as soon as q->lock_ptr = NULL
@@ -130,6 +121,26 @@ void futex_wake_mark(struct wake_q_head *wake_q, struct futex_q *q)
*/
smp_store_release(&q->lock_ptr, NULL);
+ return true;
+}
+
+/*
+ * The hash bucket lock must be held when this is called.
+ * Afterwards, the futex_q must not be accessed. Callers
+ * must ensure to later call wake_up_q() for the actual
+ * wakeups to occur.
+ */
+void futex_wake_mark(struct wake_q_head *wake_q, struct futex_q *q)
+{
+ struct task_struct *p = q->task;
+
+ get_task_struct(p);
+
+ if (!__futex_wake_mark(q)) {
+ put_task_struct(p);
+ return;
+ }
+
/*
* Queue the task for later wakeup for after we've released
* the hb->lock.
@@ -273,6 +273,9 @@ struct io_ring_ctx {
struct io_wq_work_list locked_free_list;
unsigned int locked_free_nr;
+ struct hlist_head futex_list;
+ struct io_alloc_cache futex_cache;
+
const struct cred *sq_creds; /* cred used for __io_sq_thread() */
struct io_sq_data *sq_data; /* if using sq thread polling */
@@ -65,6 +65,7 @@ struct io_uring_sqe {
__u32 xattr_flags;
__u32 msg_ring_flags;
__u32 uring_cmd_flags;
+ __u32 futex_flags;
};
__u64 user_data; /* data to be passed back at completion time */
/* pack this to avoid bogus arm OABI complaints */
@@ -235,6 +236,8 @@ enum io_uring_op {
IORING_OP_URING_CMD,
IORING_OP_SEND_ZC,
IORING_OP_SENDMSG_ZC,
+ IORING_OP_FUTEX_WAIT,
+ IORING_OP_FUTEX_WAKE,
/* this goes last, obviously */
IORING_OP_LAST,
@@ -7,5 +7,7 @@ obj-$(CONFIG_IO_URING) += io_uring.o xattr.o nop.o fs.o splice.o \
openclose.o uring_cmd.o epoll.o \
statx.o net.o msg_ring.o timeout.o \
sqpoll.o fdinfo.o tctx.o poll.o \
- cancel.o kbuf.o rsrc.o rw.o opdef.o notif.o
+ cancel.o kbuf.o rsrc.o rw.o opdef.o \
+ notif.o
obj-$(CONFIG_IO_WQ) += io-wq.o
+obj-$(CONFIG_FUTEX) += futex.o
@@ -15,6 +15,7 @@
#include "tctx.h"
#include "poll.h"
#include "timeout.h"
+#include "futex.h"
#include "cancel.h"
struct io_cancel {
@@ -119,6 +120,10 @@ int io_try_cancel(struct io_uring_task *tctx, struct io_cancel_data *cd,
if (ret != -ENOENT)
return ret;
+ ret = io_futex_cancel(ctx, cd, issue_flags);
+ if (ret != -ENOENT)
+ return ret;
+
spin_lock(&ctx->completion_lock);
if (!(cd->flags & IORING_ASYNC_CANCEL_FD))
ret = io_timeout_cancel(ctx, cd);
@@ -1,4 +1,6 @@
// SPDX-License-Identifier: GPL-2.0
+#ifndef IORING_CANCEL_H
+#define IORING_CANCEL_H
#include <linux/io_uring_types.h>
@@ -22,3 +24,5 @@ void init_hash_table(struct io_hash_table *table, unsigned size);
int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg);
bool io_cancel_req_match(struct io_kiocb *req, struct io_cancel_data *cd);
+
+#endif
new file mode 100644
@@ -0,0 +1,232 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/kernel.h>
+#include <linux/errno.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/io_uring.h>
+
+#include <uapi/linux/io_uring.h>
+
+#include "../kernel/futex/futex.h"
+#include "io_uring.h"
+#include "rsrc.h"
+#include "futex.h"
+
+struct io_futex {
+ struct file *file;
+ u32 __user *uaddr;
+ int futex_op;
+ unsigned int futex_val;
+ unsigned int futex_flags;
+ unsigned int futex_mask;
+};
+
+struct io_futex_data {
+ union {
+ struct futex_q q;
+ struct io_cache_entry cache;
+ };
+ struct io_kiocb *req;
+};
+
+void io_futex_cache_init(struct io_ring_ctx *ctx)
+{
+ io_alloc_cache_init(&ctx->futex_cache, IO_NODE_ALLOC_CACHE_MAX,
+ sizeof(struct io_futex_data));
+}
+
+static void io_futex_cache_entry_free(struct io_cache_entry *entry)
+{
+ kfree(container_of(entry, struct io_futex_data, cache));
+}
+
+void io_futex_cache_free(struct io_ring_ctx *ctx)
+{
+ io_alloc_cache_free(&ctx->futex_cache, io_futex_cache_entry_free);
+}
+
+static void io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts)
+{
+ struct io_futex_data *ifd = req->async_data;
+ struct io_ring_ctx *ctx = req->ctx;
+
+ io_tw_lock(ctx, ts);
+ if (!io_alloc_cache_put(&ctx->futex_cache, &ifd->cache))
+ kfree(ifd);
+ req->async_data = NULL;
+ hlist_del_init(&req->hash_node);
+ io_req_task_complete(req, ts);
+}
+
+static bool __io_futex_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+ struct io_futex_data *ifd = req->async_data;
+
+ /* futex wake already done or in progress */
+ if (!futex_unqueue(&ifd->q))
+ return false;
+
+ hlist_del_init(&req->hash_node);
+ io_req_set_res(req, -ECANCELED, 0);
+ req->io_task_work.func = io_futex_complete;
+ io_req_task_work_add(req);
+ return true;
+}
+
+int io_futex_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
+ unsigned int issue_flags)
+{
+ struct hlist_node *tmp;
+ struct io_kiocb *req;
+ int nr = 0;
+
+ if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_FD_FIXED))
+ return -ENOENT;
+
+ io_ring_submit_lock(ctx, issue_flags);
+ hlist_for_each_entry_safe(req, tmp, &ctx->futex_list, hash_node) {
+ if (req->cqe.user_data != cd->data &&
+ !(cd->flags & IORING_ASYNC_CANCEL_ANY))
+ continue;
+ if (__io_futex_cancel(ctx, req))
+ nr++;
+ if (!(cd->flags & IORING_ASYNC_CANCEL_ALL))
+ break;
+ }
+ io_ring_submit_unlock(ctx, issue_flags);
+
+ if (nr)
+ return nr;
+
+ return -ENOENT;
+}
+
+bool io_futex_remove_all(struct io_ring_ctx *ctx, struct task_struct *task,
+ bool cancel_all)
+{
+ struct hlist_node *tmp;
+ struct io_kiocb *req;
+ bool found = false;
+
+ lockdep_assert_held(&ctx->uring_lock);
+
+ hlist_for_each_entry_safe(req, tmp, &ctx->futex_list, hash_node) {
+ if (!io_match_task_safe(req, task, cancel_all))
+ continue;
+ __io_futex_cancel(ctx, req);
+ found = true;
+ }
+
+ return found;
+}
+
+int io_futex_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+ struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
+
+ if (unlikely(sqe->addr2 || sqe->buf_index || sqe->addr3))
+ return -EINVAL;
+
+ iof->futex_op = READ_ONCE(sqe->fd);
+ iof->uaddr = u64_to_user_ptr(READ_ONCE(sqe->addr));
+ iof->futex_val = READ_ONCE(sqe->len);
+ iof->futex_mask = READ_ONCE(sqe->file_index);
+ iof->futex_flags = READ_ONCE(sqe->futex_flags);
+ if (iof->futex_flags & FUTEX_CMD_MASK)
+ return -EINVAL;
+
+ return 0;
+}
+
+static void io_futex_wake_fn(struct wake_q_head *wake_q, struct futex_q *q)
+{
+ struct io_futex_data *ifd = container_of(q, struct io_futex_data, q);
+ struct io_kiocb *req = ifd->req;
+
+ __futex_unqueue(q);
+ smp_store_release(&q->lock_ptr, NULL);
+
+ io_req_set_res(req, 0, 0);
+ req->io_task_work.func = io_futex_complete;
+ io_req_task_work_add(req);
+}
+
+static struct io_futex_data *io_alloc_ifd(struct io_ring_ctx *ctx)
+{
+ struct io_cache_entry *entry;
+
+ entry = io_alloc_cache_get(&ctx->futex_cache);
+ if (entry)
+ return container_of(entry, struct io_futex_data, cache);
+
+ return kmalloc(sizeof(struct io_futex_data), GFP_NOWAIT);
+}
+
+int io_futex_wait(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
+ struct io_ring_ctx *ctx = req->ctx;
+ struct io_futex_data *ifd = NULL;
+ struct futex_hash_bucket *hb;
+ unsigned int flags = 0;
+ int ret;
+
+ if (!iof->futex_mask) {
+ ret = -EINVAL;
+ goto done;
+ }
+ if (!futex_op_to_flags(FUTEX_WAIT, iof->futex_flags, &flags)) {
+ ret = -ENOSYS;
+ goto done;
+ }
+
+ io_ring_submit_lock(ctx, issue_flags);
+ ifd = io_alloc_ifd(ctx);
+ if (!ifd) {
+ ret = -ENOMEM;
+ goto done_unlock;
+ }
+
+ req->async_data = ifd;
+ ifd->q = futex_q_init;
+ ifd->q.bitset = iof->futex_mask;
+ ifd->q.wake = io_futex_wake_fn;
+ ifd->req = req;
+
+ ret = futex_wait_setup(iof->uaddr, iof->futex_val, flags, &ifd->q, &hb);
+ if (!ret) {
+ hlist_add_head(&req->hash_node, &ctx->futex_list);
+ io_ring_submit_unlock(ctx, issue_flags);
+
+ futex_queue(&ifd->q, hb);
+ return IOU_ISSUE_SKIP_COMPLETE;
+ }
+
+done_unlock:
+ io_ring_submit_unlock(ctx, issue_flags);
+done:
+ if (ret < 0)
+ req_set_fail(req);
+ io_req_set_res(req, ret, 0);
+ kfree(ifd);
+ return IOU_OK;
+}
+
+int io_futex_wake(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
+ unsigned int flags = 0;
+ int ret;
+
+ if (!futex_op_to_flags(FUTEX_WAKE, iof->futex_flags, &flags)) {
+ ret = -ENOSYS;
+ goto done;
+ }
+
+ ret = futex_wake(iof->uaddr, flags, iof->futex_val, iof->futex_mask);
+done:
+ if (ret < 0)
+ req_set_fail(req);
+ io_req_set_res(req, ret, 0);
+ return IOU_OK;
+}
new file mode 100644
@@ -0,0 +1,34 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include "cancel.h"
+
+int io_futex_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_futex_wait(struct io_kiocb *req, unsigned int issue_flags);
+int io_futex_wake(struct io_kiocb *req, unsigned int issue_flags);
+
+#if defined(CONFIG_FUTEX)
+int io_futex_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
+ unsigned int issue_flags);
+bool io_futex_remove_all(struct io_ring_ctx *ctx, struct task_struct *task,
+ bool cancel_all);
+void io_futex_cache_init(struct io_ring_ctx *ctx);
+void io_futex_cache_free(struct io_ring_ctx *ctx);
+#else
+static inline int io_futex_cancel(struct io_ring_ctx *ctx,
+ struct io_cancel_data *cd,
+ unsigned int issue_flags)
+{
+ return 0;
+}
+static inline bool io_futex_remove_all(struct io_ring_ctx *ctx,
+ struct task_struct *task, bool cancel_all)
+{
+ return false;
+}
+static inline void io_futex_cache_init(struct io_ring_ctx *ctx)
+{
+}
+static inline void io_futex_cache_free(struct io_ring_ctx *ctx)
+{
+}
+#endif
@@ -92,6 +92,7 @@
#include "cancel.h"
#include "net.h"
#include "notif.h"
+#include "futex.h"
#include "timeout.h"
#include "poll.h"
@@ -314,6 +315,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
sizeof(struct async_poll));
io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
sizeof(struct io_async_msghdr));
+ io_futex_cache_init(ctx);
init_completion(&ctx->ref_comp);
xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
mutex_init(&ctx->uring_lock);
@@ -333,6 +335,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->tctx_list);
ctx->submit_state.free_list.next = NULL;
INIT_WQ_LIST(&ctx->locked_free_list);
+ INIT_HLIST_HEAD(&ctx->futex_list);
INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
return ctx;
@@ -2842,6 +2845,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_eventfd_unregister(ctx);
io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
+ io_futex_cache_free(ctx);
io_destroy_buffers(ctx);
mutex_unlock(&ctx->uring_lock);
if (ctx->sq_creds)
@@ -3254,6 +3258,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
ret |= io_cancel_defer_files(ctx, task, cancel_all);
mutex_lock(&ctx->uring_lock);
ret |= io_poll_remove_all(ctx, task, cancel_all);
+ ret |= io_futex_remove_all(ctx, task, cancel_all);
mutex_unlock(&ctx->uring_lock);
ret |= io_kill_timeouts(ctx, task, cancel_all);
if (task)
@@ -33,6 +33,7 @@
#include "poll.h"
#include "cancel.h"
#include "rw.h"
+#include "futex.h"
static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
{
@@ -426,11 +427,26 @@ const struct io_issue_def io_issue_defs[] = {
.issue = io_sendmsg_zc,
#else
.prep = io_eopnotsupp_prep,
+#endif
+ },
+ [IORING_OP_FUTEX_WAIT] = {
+#if defined(CONFIG_FUTEX)
+ .prep = io_futex_prep,
+ .issue = io_futex_wait,
+#else
+ .prep = io_eopnotsupp_prep,
+#endif
+ },
+ [IORING_OP_FUTEX_WAKE] = {
+#if defined(CONFIG_FUTEX)
+ .prep = io_futex_prep,
+ .issue = io_futex_wake,
+#else
+ .prep = io_eopnotsupp_prep,
#endif
},
};
-
const struct io_cold_def io_cold_defs[] = {
[IORING_OP_NOP] = {
.name = "NOP",
@@ -648,6 +664,12 @@ const struct io_cold_def io_cold_defs[] = {
.fail = io_sendrecv_fail,
#endif
},
+ [IORING_OP_FUTEX_WAIT] = {
+ .name = "FUTEX_WAIT",
+ },
+ [IORING_OP_FUTEX_WAKE] = {
+ .name = "FUTEX_WAKE",
+ },
};
const char *io_uring_get_opcode(u8 opcode)