diff options
-rw-r--r-- | include/linux/io_uring_types.h | 2 | ||||
-rw-r--r-- | include/uapi/linux/io_uring.h | 7 | ||||
-rw-r--r-- | io_uring/cancel.c | 2 | ||||
-rw-r--r-- | io_uring/io_uring.c | 147 | ||||
-rw-r--r-- | io_uring/io_uring.h | 29 | ||||
-rw-r--r-- | io_uring/rsrc.c | 2 |
6 files changed, 168 insertions, 21 deletions
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 677a25d44d7f..d56ff2185168 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -301,6 +301,8 @@ struct io_ring_ctx { struct io_hash_table cancel_table; bool poll_multi_queue; + struct llist_head work_llist; + struct list_head io_buffers_comp; } ____cacheline_aligned_in_smp; diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 6b83177fd41d..972b179bc07a 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -157,6 +157,13 @@ enum { */ #define IORING_SETUP_SINGLE_ISSUER (1U << 12) +/* + * Defer running task work to get events. + * Rather than running bits of task work whenever the task transitions + * try to do it just before it is needed. + */ +#define IORING_SETUP_DEFER_TASKRUN (1U << 13) + enum io_uring_op { IORING_OP_NOP, IORING_OP_READV, diff --git a/io_uring/cancel.c b/io_uring/cancel.c index 5fc5d3e80fcb..2291a53cdabd 100644 --- a/io_uring/cancel.c +++ b/io_uring/cancel.c @@ -292,7 +292,7 @@ int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg) break; mutex_unlock(&ctx->uring_lock); - ret = io_run_task_work_sig(); + ret = io_run_task_work_sig(ctx); if (ret < 0) { mutex_lock(&ctx->uring_lock); break; diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index edf7381b0215..1f0df14c3062 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -142,7 +142,7 @@ static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx, static void io_dismantle_req(struct io_kiocb *req); static void io_clean_op(struct io_kiocb *req); static void io_queue_sqe(struct io_kiocb *req); - +static void io_move_task_work_from_local(struct io_ring_ctx *ctx); static void __io_submit_flush_completions(struct io_ring_ctx *ctx); static struct kmem_cache *req_cachep; @@ -316,6 +316,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_LIST_HEAD(&ctx->rsrc_ref_list); INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work); init_llist_head(&ctx->rsrc_put_llist); + init_llist_head(&ctx->work_llist); INIT_LIST_HEAD(&ctx->tctx_list); ctx->submit_state.free_list.next = NULL; INIT_WQ_LIST(&ctx->locked_free_list); @@ -1047,12 +1048,36 @@ void tctx_task_work(struct callback_head *cb) trace_io_uring_task_work_run(tctx, count, loops); } -void io_req_task_work_add(struct io_kiocb *req) +static void io_req_local_work_add(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->ctx; + + if (!llist_add(&req->io_task_work.node, &ctx->work_llist)) + return; + + if (unlikely(atomic_read(&req->task->io_uring->in_idle))) { + io_move_task_work_from_local(ctx); + return; + } + + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + + io_cqring_wake(ctx); + +} + +static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local) { struct io_uring_task *tctx = req->task->io_uring; struct io_ring_ctx *ctx = req->ctx; struct llist_node *node; + if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + io_req_local_work_add(req); + return; + } + /* task_work already pending, we're done */ if (!llist_add(&req->io_task_work.node, &tctx->task_list)) return; @@ -1074,6 +1099,73 @@ void io_req_task_work_add(struct io_kiocb *req) } } +void io_req_task_work_add(struct io_kiocb *req) +{ + __io_req_task_work_add(req, true); +} + +static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) +{ + struct llist_node *node; + + node = llist_del_all(&ctx->work_llist); + while (node) { + struct io_kiocb *req = container_of(node, struct io_kiocb, + io_task_work.node); + + node = node->next; + __io_req_task_work_add(req, false); + } +} + +int io_run_local_work(struct io_ring_ctx *ctx) +{ + bool locked; + struct llist_node *node; + struct llist_node fake; + struct llist_node *current_final = NULL; + int ret; + + if (unlikely(ctx->submitter_task != current)) { + /* maybe this is before any submissions */ + if (!ctx->submitter_task) + return 0; + + return -EEXIST; + } + + locked = mutex_trylock(&ctx->uring_lock); + + node = io_llist_xchg(&ctx->work_llist, &fake); + ret = 0; +again: + while (node != current_final) { + struct llist_node *next = node->next; + struct io_kiocb *req = container_of(node, struct io_kiocb, + io_task_work.node); + prefetch(container_of(next, struct io_kiocb, io_task_work.node)); + req->io_task_work.func(req, &locked); + ret++; + node = next; + } + + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + + node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL); + if (node != &fake) { + current_final = &fake; + node = io_llist_xchg(&ctx->work_llist, &fake); + goto again; + } + + if (locked) { + io_submit_flush_completions(ctx); + mutex_unlock(&ctx->uring_lock); + } + return ret; +} + static void io_req_tw_post(struct io_kiocb *req, bool *locked) { io_req_complete_post(req); @@ -1285,8 +1377,10 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) u32 tail = ctx->cached_cq_tail; mutex_unlock(&ctx->uring_lock); - io_run_task_work(); + ret = io_run_task_work_ctx(ctx); mutex_lock(&ctx->uring_lock); + if (ret < 0) + break; /* some requests don't go through iopoll_list */ if (tail != ctx->cached_cq_tail || @@ -2148,7 +2242,9 @@ struct io_wait_queue { static inline bool io_has_work(struct io_ring_ctx *ctx) { - return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); + return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) || + ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && + !llist_empty(&ctx->work_llist)); } static inline bool io_should_wake(struct io_wait_queue *iowq) @@ -2180,9 +2276,9 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, return -1; } -int io_run_task_work_sig(void) +int io_run_task_work_sig(struct io_ring_ctx *ctx) { - if (io_run_task_work()) + if (io_run_task_work_ctx(ctx) > 0) return 1; if (task_sigpending(current)) return -EINTR; @@ -2198,7 +2294,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, unsigned long check_cq; /* make sure we run task_work before checking for signals */ - ret = io_run_task_work_sig(); + ret = io_run_task_work_sig(ctx); if (ret || io_should_wake(iowq)) return ret; @@ -2229,12 +2325,14 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, int ret; do { + /* always run at least 1 task work to process local work */ + ret = io_run_task_work_ctx(ctx); + if (ret < 0) + return ret; io_cqring_overflow_flush(ctx); if (io_cqring_events(ctx) >= min_events) return 0; - if (!io_run_task_work()) - break; - } while (1); + } while (ret > 0); if (sig) { #ifdef CONFIG_COMPAT @@ -2575,6 +2673,9 @@ static __cold void io_ring_exit_work(struct work_struct *work) * as nobody else will be looking for them. */ do { + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) + io_move_task_work_from_local(ctx); + while (io_uring_try_cancel_requests(ctx, NULL, true)) cond_resched(); @@ -2769,13 +2870,15 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx, } } + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) + ret |= io_run_local_work(ctx) > 0; ret |= io_cancel_defer_files(ctx, task, cancel_all); mutex_lock(&ctx->uring_lock); ret |= io_poll_remove_all(ctx, task, cancel_all); mutex_unlock(&ctx->uring_lock); ret |= io_kill_timeouts(ctx, task, cancel_all); if (task) - ret |= io_run_task_work(); + ret |= io_run_task_work() > 0; return ret; } @@ -3060,8 +3163,10 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, goto iopoll_locked; mutex_unlock(&ctx->uring_lock); } + if (flags & IORING_ENTER_GETEVENTS) { int ret2; + if (ctx->syscall_iopoll) { /* * We disallow the app entering submit/complete with @@ -3290,18 +3395,30 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (ctx->flags & IORING_SETUP_SQPOLL) { /* IPI related flags don't make sense with SQPOLL */ if (ctx->flags & (IORING_SETUP_COOP_TASKRUN | - IORING_SETUP_TASKRUN_FLAG)) + IORING_SETUP_TASKRUN_FLAG | + IORING_SETUP_DEFER_TASKRUN)) goto err; ctx->notify_method = TWA_SIGNAL_NO_IPI; } else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) { ctx->notify_method = TWA_SIGNAL_NO_IPI; } else { - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG && + !(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) goto err; ctx->notify_method = TWA_SIGNAL; } /* + * For DEFER_TASKRUN we require the completion task to be the same as the + * submission task. This implies that there is only one submitter, so enforce + * that. + */ + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN && + !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) { + goto err; + } + + /* * This is just grabbed for accounting purposes. When a process exits, * the mm is exited and dropped before the files, hence we need to hang * on to this mm purely for the purposes of being able to unaccount @@ -3401,7 +3518,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params) IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL | IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_SQE128 | IORING_SETUP_CQE32 | - IORING_SETUP_SINGLE_ISSUER)) + IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN)) return -EINVAL; return io_uring_create(entries, &p, params); @@ -3864,7 +3981,7 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, ctx = f.file->private_data; - io_run_task_work(); + io_run_task_work_ctx(ctx); mutex_lock(&ctx->uring_lock); ret = __io_uring_register(ctx, opcode, arg, nr_args); diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 2f73f83af960..f417d75d7bc1 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -26,7 +26,8 @@ enum { struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx); bool io_req_cqe_overflow(struct io_kiocb *req); -int io_run_task_work_sig(void); +int io_run_task_work_sig(struct io_ring_ctx *ctx); +int io_run_local_work(struct io_ring_ctx *ctx); void io_req_complete_failed(struct io_kiocb *req, s32 res); void __io_req_complete(struct io_kiocb *req, unsigned issue_flags); void io_req_complete_post(struct io_kiocb *req); @@ -221,17 +222,37 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head; } -static inline bool io_run_task_work(void) +static inline int io_run_task_work(void) { if (test_thread_flag(TIF_NOTIFY_SIGNAL)) { __set_current_state(TASK_RUNNING); clear_notify_signal(); if (task_work_pending(current)) task_work_run(); - return true; + return 1; } - return false; + return 0; +} + +static inline int io_run_task_work_ctx(struct io_ring_ctx *ctx) +{ + int ret = 0; + int ret2; + + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) + ret = io_run_local_work(ctx); + + /* want to run this after in case more is added */ + ret2 = io_run_task_work(); + + /* Try propagate error in favour of if tasks were run, + * but still make sure to run them if requested + */ + if (ret >= 0) + ret += ret2; + + return ret; } static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked) diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index cf3272113214..6f88ded0e7e5 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -341,7 +341,7 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data, flush_delayed_work(&ctx->rsrc_put_work); reinit_completion(&data->done); - ret = io_run_task_work_sig(); + ret = io_run_task_work_sig(ctx); mutex_lock(&ctx->uring_lock); } while (ret >= 0); data->quiesce = false; |