summaryrefslogtreecommitdiff
path: root/io_uring
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring')
-rw-r--r--io_uring/io_uring.c63
-rw-r--r--io_uring/register.c8
-rw-r--r--io_uring/rsrc.h14
-rw-r--r--io_uring/rw.c48
4 files changed, 86 insertions, 47 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 86761ec623f9..cd9a137ad6ce 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -137,6 +137,14 @@ struct io_defer_entry {
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
+/*
+ * No waiters. It's larger than any valid value of the tw counter
+ * so that tests against ->cq_wait_nr would fail and skip wake_up().
+ */
+#define IO_CQ_WAKE_INIT (-1U)
+/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
+#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)
+
static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
struct task_struct *task,
bool cancel_all);
@@ -303,6 +311,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
goto err;
ctx->flags = p->flags;
+ atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
init_waitqueue_head(&ctx->sqo_sq_wait);
INIT_LIST_HEAD(&ctx->sqd_list);
INIT_LIST_HEAD(&ctx->cq_overflow_list);
@@ -1304,16 +1313,23 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
struct io_ring_ctx *ctx = req->ctx;
unsigned nr_wait, nr_tw, nr_tw_prev;
- struct llist_node *first;
+ struct llist_node *head;
+
+ /* See comment above IO_CQ_WAKE_INIT */
+ BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
+ /*
+ * We don't know how many reuqests is there in the link and whether
+ * they can even be queued lazily, fall back to non-lazy.
+ */
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
flags &= ~IOU_F_TWQ_LAZY_WAKE;
- first = READ_ONCE(ctx->work_llist.first);
+ head = READ_ONCE(ctx->work_llist.first);
do {
nr_tw_prev = 0;
- if (first) {
- struct io_kiocb *first_req = container_of(first,
+ if (head) {
+ struct io_kiocb *first_req = container_of(head,
struct io_kiocb,
io_task_work.node);
/*
@@ -1322,17 +1338,29 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
*/
nr_tw_prev = READ_ONCE(first_req->nr_tw);
}
+
+ /*
+ * Theoretically, it can overflow, but that's fine as one of
+ * previous adds should've tried to wake the task.
+ */
nr_tw = nr_tw_prev + 1;
- /* Large enough to fail the nr_wait comparison below */
if (!(flags & IOU_F_TWQ_LAZY_WAKE))
- nr_tw = -1U;
+ nr_tw = IO_CQ_WAKE_FORCE;
req->nr_tw = nr_tw;
- req->io_task_work.node.next = first;
- } while (!try_cmpxchg(&ctx->work_llist.first, &first,
+ req->io_task_work.node.next = head;
+ } while (!try_cmpxchg(&ctx->work_llist.first, &head,
&req->io_task_work.node));
- if (!first) {
+ /*
+ * cmpxchg implies a full barrier, which pairs with the barrier
+ * in set_current_state() on the io_cqring_wait() side. It's used
+ * to ensure that either we see updated ->cq_wait_nr, or waiters
+ * going to sleep will observe the work added to the list, which
+ * is similar to the wait/wawke task state sync.
+ */
+
+ if (!head) {
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
if (ctx->has_evfd)
@@ -1340,14 +1368,12 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
}
nr_wait = atomic_read(&ctx->cq_wait_nr);
- /* no one is waiting */
- if (!nr_wait)
+ /* not enough or no one is waiting */
+ if (nr_tw < nr_wait)
return;
- /* either not enough or the previous add has already woken it up */
- if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+ /* the previous add has already woken it up */
+ if (nr_tw_prev >= nr_wait)
return;
- /* pairs with set_current_state() in io_cqring_wait() */
- smp_mb__after_atomic();
wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}
@@ -2000,9 +2026,10 @@ inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
goto out;
fd = array_index_nospec(fd, ctx->nr_user_files);
slot = io_fixed_file_slot(&ctx->file_table, fd);
- file = io_slot_file(slot);
+ if (!req->rsrc_node)
+ __io_req_set_rsrc_node(req, ctx);
req->flags |= io_slot_flags(slot);
- io_req_set_rsrc_node(req, ctx, 0);
+ file = io_slot_file(slot);
out:
io_ring_submit_unlock(ctx, issue_flags);
return file;
@@ -2613,7 +2640,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
ret = io_cqring_wait_schedule(ctx, &iowq);
__set_current_state(TASK_RUNNING);
- atomic_set(&ctx->cq_wait_nr, 0);
+ atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
/*
* Run task_work after scheduling and before io_should_wake().
diff --git a/io_uring/register.c b/io_uring/register.c
index 708dd1d89add..5e62c1208996 100644
--- a/io_uring/register.c
+++ b/io_uring/register.c
@@ -14,6 +14,7 @@
#include <linux/slab.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
+#include <linux/compat.h>
#include <linux/io_uring.h>
#include <linux/io_uring_types.h>
@@ -278,13 +279,14 @@ static __cold int io_register_iowq_aff(struct io_ring_ctx *ctx,
if (len > cpumask_size())
len = cpumask_size();
- if (in_compat_syscall()) {
+#ifdef CONFIG_COMPAT
+ if (in_compat_syscall())
ret = compat_get_bitmap(cpumask_bits(new_mask),
(const compat_ulong_t __user *)arg,
len * 8 /* CHAR_BIT */);
- } else {
+ else
+#endif
ret = copy_from_user(new_mask, arg, len);
- }
if (ret) {
free_cpumask_var(new_mask);
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index 7238b9cfe33b..c6f199bbee28 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -102,17 +102,21 @@ static inline void io_charge_rsrc_node(struct io_ring_ctx *ctx,
node->refs++;
}
+static inline void __io_req_set_rsrc_node(struct io_kiocb *req,
+ struct io_ring_ctx *ctx)
+{
+ lockdep_assert_held(&ctx->uring_lock);
+ req->rsrc_node = ctx->rsrc_node;
+ io_charge_rsrc_node(ctx, ctx->rsrc_node);
+}
+
static inline void io_req_set_rsrc_node(struct io_kiocb *req,
struct io_ring_ctx *ctx,
unsigned int issue_flags)
{
if (!req->rsrc_node) {
io_ring_submit_lock(ctx, issue_flags);
-
- lockdep_assert_held(&ctx->uring_lock);
-
- req->rsrc_node = ctx->rsrc_node;
- io_charge_rsrc_node(ctx, ctx->rsrc_node);
+ __io_req_set_rsrc_node(req, ctx);
io_ring_submit_unlock(ctx, issue_flags);
}
}
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 0c856726b15d..118cc9f1cf16 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -168,27 +168,6 @@ void io_readv_writev_cleanup(struct io_kiocb *req)
kfree(io->free_iovec);
}
-static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
-{
- switch (ret) {
- case -EIOCBQUEUED:
- break;
- case -ERESTARTSYS:
- case -ERESTARTNOINTR:
- case -ERESTARTNOHAND:
- case -ERESTART_RESTARTBLOCK:
- /*
- * We can't just restart the syscall, since previously
- * submitted sqes may already be in progress. Just fail this
- * IO with EINTR.
- */
- ret = -EINTR;
- fallthrough;
- default:
- kiocb->ki_complete(kiocb, ret);
- }
-}
-
static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
{
struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
@@ -371,6 +350,33 @@ static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
smp_store_release(&req->iopoll_completed, 1);
}
+static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
+{
+ /* IO was queued async, completion will happen later */
+ if (ret == -EIOCBQUEUED)
+ return;
+
+ /* transform internal restart error codes */
+ if (unlikely(ret < 0)) {
+ switch (ret) {
+ case -ERESTARTSYS:
+ case -ERESTARTNOINTR:
+ case -ERESTARTNOHAND:
+ case -ERESTART_RESTARTBLOCK:
+ /*
+ * We can't just restart the syscall, since previously
+ * submitted sqes may already be in progress. Just fail
+ * this IO with EINTR.
+ */
+ ret = -EINTR;
+ break;
+ }
+ }
+
+ INDIRECT_CALL_2(kiocb->ki_complete, io_complete_rw_iopoll,
+ io_complete_rw, kiocb, ret);
+}
+
static int kiocb_done(struct io_kiocb *req, ssize_t ret,
unsigned int issue_flags)
{