[7/7] io_uring: add futex waitv

Message ID 20230712004705.316157-8-axboe@kernel.dk
State New
Headers
Series Add io_uring futex/futexv support |

Commit Message

Jens Axboe July 12, 2023, 12:47 a.m. UTC
  Needs a bit of splitting and a few hunks should go further back (like
the wake handler typedef).

WIP, adds IORING_OP_FUTEX_WAITV - pass in an array of futex addresses,
and wait on all of them until one of them triggers.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/uapi/linux/io_uring.h |   1 +
 io_uring/futex.c              | 165 +++++++++++++++++++++++++++++++---
 io_uring/futex.h              |   2 +
 io_uring/opdef.c              |  11 +++
 4 files changed, 169 insertions(+), 10 deletions(-)
  

Comments

Peter Zijlstra July 12, 2023, 9:31 a.m. UTC | #1
On Tue, Jul 11, 2023 at 06:47:05PM -0600, Jens Axboe wrote:
> Needs a bit of splitting and a few hunks should go further back (like
> the wake handler typedef).
> 
> WIP, adds IORING_OP_FUTEX_WAITV - pass in an array of futex addresses,
> and wait on all of them until one of them triggers.
> 

So I'm once again not following. FUTEX_WAITV is to wait on multiple
futexes and get a notification when any one of them wakes up with an
index to indicate which one.

How exactly is that different from multiple FUTEX_WAIT entries in the
io_uring thing itself? Admittedly I don't actually know much of anything
when it comes to io_uring, but isn't the idea that queue multiple
'syscall' like things and get individual completions back?

So how does WAITV make sense here?
  
Jens Axboe July 12, 2023, 3:10 p.m. UTC | #2
On 7/12/23 3:31?AM, Peter Zijlstra wrote:
> On Tue, Jul 11, 2023 at 06:47:05PM -0600, Jens Axboe wrote:
>> Needs a bit of splitting and a few hunks should go further back (like
>> the wake handler typedef).
>>
>> WIP, adds IORING_OP_FUTEX_WAITV - pass in an array of futex addresses,
>> and wait on all of them until one of them triggers.
>>
> 
> So I'm once again not following. FUTEX_WAITV is to wait on multiple
> futexes and get a notification when any one of them wakes up with an
> index to indicate which one.

Right

> How exactly is that different from multiple FUTEX_WAIT entries in the
> io_uring thing itself? Admittedly I don't actually know much of anything
> when it comes to io_uring, but isn't the idea that queue multiple
> 'syscall' like things and get individual completions back?
> 
> So how does WAITV make sense here?

You most certainly could just queue N FUTEX_WAIT operations rather than
a single FUTEX_WAITV, but it becomes pretty cumbersome to deal with.
First of all, you'd now get N completions you have to deal with. That's
obviously doable, but you'd probably also need to care about
cancelations of the N-1 FUTEX_WAIT that weren't triggered.

For those reasons, I do think having a separate FUTEX_WAITV makes a lot
more sense. It's a single request and there's no cleanup or cancelation
work to run when just one futex triggers. Tongue in cheek, but you could
also argue that why would you need futex waitv support in the kernel,
when you could just have N processes wait on N futexes? We can certainly
do that a LOT more efficiently with io_uring even without FUTEX_WAITV,
but from an efficiency and usability point of view, having FUTEX_WAITV
makes this a lot easier than dealing with N requests and cancelations on
completion.
  

Patch

diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 3bd2d765f593..420f38675769 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -238,6 +238,7 @@  enum io_uring_op {
 	IORING_OP_SENDMSG_ZC,
 	IORING_OP_FUTEX_WAIT,
 	IORING_OP_FUTEX_WAKE,
+	IORING_OP_FUTEX_WAITV,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
diff --git a/io_uring/futex.c b/io_uring/futex.c
index ff0f6b394756..b22120545d31 100644
--- a/io_uring/futex.c
+++ b/io_uring/futex.c
@@ -14,11 +14,16 @@ 
 
 struct io_futex {
 	struct file	*file;
-	u32 __user	*uaddr;
+	union {
+		u32 __user			*uaddr;
+		struct futex_waitv __user	*uwaitv;
+	};
 	int		futex_op;
 	unsigned int	futex_val;
 	unsigned int	futex_flags;
 	unsigned int	futex_mask;
+	unsigned int	futex_nr;
+	unsigned long	futexv_owned;
 };
 
 struct io_futex_data {
@@ -45,6 +50,13 @@  void io_futex_cache_free(struct io_ring_ctx *ctx)
 	io_alloc_cache_free(&ctx->futex_cache, io_futex_cache_entry_free);
 }
 
+static void __io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts)
+{
+	req->async_data = NULL;
+	hlist_del_init(&req->hash_node);
+	io_req_task_complete(req, ts);
+}
+
 static void io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts)
 {
 	struct io_futex_data *ifd = req->async_data;
@@ -53,22 +65,59 @@  static void io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts)
 	io_tw_lock(ctx, ts);
 	if (!io_alloc_cache_put(&ctx->futex_cache, &ifd->cache))
 		kfree(ifd);
-	req->async_data = NULL;
-	hlist_del_init(&req->hash_node);
-	io_req_task_complete(req, ts);
+	__io_futex_complete(req, ts);
 }
 
-static bool __io_futex_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req)
+static void io_futexv_complete(struct io_kiocb *req, struct io_tw_state *ts)
 {
-	struct io_futex_data *ifd = req->async_data;
+	struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
+	struct futex_vector *futexv = req->async_data;
+	struct io_ring_ctx *ctx = req->ctx;
+	int res = 0;
 
-	/* futex wake already done or in progress */
-	if (!futex_unqueue(&ifd->q))
+	io_tw_lock(ctx, ts);
+
+	res = futex_unqueue_multiple(futexv, iof->futex_nr);
+	if (res != -1)
+		io_req_set_res(req, res, 0);
+
+	kfree(req->async_data);
+	req->flags &= ~REQ_F_ASYNC_DATA;
+	__io_futex_complete(req, ts);
+}
+
+static bool io_futexv_claimed(struct io_futex *iof)
+{
+	return test_bit(0, &iof->futexv_owned);
+}
+
+static bool io_futexv_claim(struct io_futex *iof)
+{
+	if (test_bit(0, &iof->futexv_owned) ||
+	    test_and_set_bit(0, &iof->futexv_owned))
 		return false;
+	return true;
+}
+
+static bool __io_futex_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+	/* futex wake already done or in progress */
+	if (req->opcode == IORING_OP_FUTEX_WAIT) {
+		struct io_futex_data *ifd = req->async_data;
+
+		if (!futex_unqueue(&ifd->q))
+			return false;
+		req->io_task_work.func = io_futex_complete;
+	} else {
+		struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
+
+		if (!io_futexv_claim(iof))
+			return false;
+		req->io_task_work.func = io_futexv_complete;
+	}
 
 	hlist_del_init(&req->hash_node);
 	io_req_set_res(req, -ECANCELED, 0);
-	req->io_task_work.func = io_futex_complete;
 	io_req_task_work_add(req);
 	return true;
 }
@@ -124,7 +173,7 @@  int io_futex_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
 
-	if (unlikely(sqe->addr2 || sqe->buf_index || sqe->addr3))
+	if (unlikely(sqe->buf_index || sqe->addr3))
 		return -EINVAL;
 
 	iof->futex_op = READ_ONCE(sqe->fd);
@@ -135,6 +184,53 @@  int io_futex_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	if (iof->futex_flags & FUTEX_CMD_MASK)
 		return -EINVAL;
 
+	iof->futexv_owned = 0;
+	return 0;
+}
+
+static void io_futex_wakev_fn(struct wake_q_head *wake_q, struct futex_q *q)
+{
+	struct io_kiocb *req = q->wake_data;
+	struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
+
+	if (!io_futexv_claim(iof))
+		return;
+
+	__futex_unqueue(q);
+	smp_store_release(&q->lock_ptr, NULL);
+
+	io_req_set_res(req, 0, 0);
+	req->io_task_work.func = io_futexv_complete;
+	io_req_task_work_add(req);
+}
+
+int io_futexv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
+	struct futex_vector *futexv;
+	int ret;
+
+	ret = io_futex_prep(req, sqe);
+	if (ret)
+		return ret;
+
+	iof->futex_nr = READ_ONCE(sqe->off);
+	if (!iof->futex_nr || iof->futex_nr > FUTEX_WAITV_MAX)
+		return -EINVAL;
+
+	futexv = kcalloc(iof->futex_nr, sizeof(*futexv), GFP_KERNEL);
+	if (!futexv)
+		return -ENOMEM;
+
+	ret = futex_parse_waitv(futexv, iof->uwaitv, iof->futex_nr,
+				io_futex_wakev_fn, req);
+	if (ret) {
+		kfree(futexv);
+		return ret;
+	}
+
+	req->flags |= REQ_F_ASYNC_DATA;
+	req->async_data = futexv;
 	return 0;
 }
 
@@ -162,6 +258,55 @@  static struct io_futex_data *io_alloc_ifd(struct io_ring_ctx *ctx)
 	return kmalloc(sizeof(struct io_futex_data), GFP_NOWAIT);
 }
 
+int io_futex_waitv(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
+	struct futex_vector *futexv = req->async_data;
+	struct io_ring_ctx *ctx = req->ctx;
+	int ret, woken = -1;
+
+	io_ring_submit_lock(ctx, issue_flags);
+
+	ret = futex_wait_multiple_setup(futexv, iof->futex_nr, &woken);
+
+	/*
+	 * The above call leaves us potentially non-running. This is fine
+	 * for the sync syscall as it'll be blocking unless we already got
+	 * one of the futexes woken, but it obviously won't work for an async
+	 * invocation. Mark is runnable again.
+	 */
+	__set_current_state(TASK_RUNNING);
+
+	/*
+	 * We got woken while setting up, let that side do the completion
+	 */
+	if (io_futexv_claimed(iof)) {
+skip:
+		io_ring_submit_unlock(ctx, issue_flags);
+		return IOU_ISSUE_SKIP_COMPLETE;
+	}
+
+	/*
+	 * 0 return means that we successfully setup the waiters, and that
+	 * nobody triggered a wakeup while we were doing so. < 0 or 1 return
+	 * is either an error or we got a wakeup while setting up.
+	 */
+	if (!ret) {
+		hlist_add_head(&req->hash_node, &ctx->futex_list);
+		goto skip;
+	}
+
+	io_ring_submit_unlock(ctx, issue_flags);
+	if (ret < 0)
+		req_set_fail(req);
+	else if (woken != -1)
+		ret = woken;
+	io_req_set_res(req, ret, 0);
+	kfree(futexv);
+	req->flags &= ~REQ_F_ASYNC_DATA;
+	return IOU_OK;
+}
+
 int io_futex_wait(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
diff --git a/io_uring/futex.h b/io_uring/futex.h
index ddc9e0d73c52..7828e27e4184 100644
--- a/io_uring/futex.h
+++ b/io_uring/futex.h
@@ -3,7 +3,9 @@ 
 #include "cancel.h"
 
 int io_futex_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_futexv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_futex_wait(struct io_kiocb *req, unsigned int issue_flags);
+int io_futex_waitv(struct io_kiocb *req, unsigned int issue_flags);
 int io_futex_wake(struct io_kiocb *req, unsigned int issue_flags);
 
 #if defined(CONFIG_FUTEX)
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index c9f23c21a031..2034acfe10d0 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -443,6 +443,14 @@  const struct io_issue_def io_issue_defs[] = {
 		.issue			= io_futex_wake,
 #else
 		.prep			= io_eopnotsupp_prep,
+#endif
+	},
+	[IORING_OP_FUTEX_WAITV] = {
+#if defined(CONFIG_FUTEX)
+		.prep			= io_futexv_prep,
+		.issue			= io_futex_waitv,
+#else
+		.prep			= io_eopnotsupp_prep,
 #endif
 	},
 };
@@ -670,6 +678,9 @@  const struct io_cold_def io_cold_defs[] = {
 	[IORING_OP_FUTEX_WAKE] = {
 		.name			= "FUTEX_WAKE",
 	},
+	[IORING_OP_FUTEX_WAITV] = {
+		.name			= "FUTEX_WAITV",
+	},
 };
 
 const char *io_uring_get_opcode(u8 opcode)