Diffstat (limited to 'io_uring')
-rw-r--r-- | io_uring/filetable.c | 2
-rw-r--r-- | io_uring/io_uring.c | 379
-rw-r--r-- | io_uring/io_uring.h | 84
-rw-r--r-- | io_uring/kbuf.c | 16
-rw-r--r-- | io_uring/msg_ring.c | 168
-rw-r--r-- | io_uring/msg_ring.h | 1
-rw-r--r-- | io_uring/net.c | 110
-rw-r--r-- | io_uring/notif.c | 57
-rw-r--r-- | io_uring/notif.h | 15
-rw-r--r-- | io_uring/opdef.c | 8
-rw-r--r-- | io_uring/opdef.h | 2
-rw-r--r-- | io_uring/poll.c | 181
-rw-r--r-- | io_uring/rsrc.c | 71
-rw-r--r-- | io_uring/rsrc.h | 1
-rw-r--r-- | io_uring/rw.c | 16
-rw-r--r-- | io_uring/timeout.c | 10
-rw-r--r-- | io_uring/uring_cmd.c | 2
-rw-r--r-- | io_uring/xattr.c | 8
18 files changed, 737 insertions(+), 394 deletions(-)
diff --git a/io_uring/filetable.c b/io_uring/filetable.c index 7b473259f3f4..68dfc6936aa7 100644 --- a/io_uring/filetable.c +++ b/io_uring/filetable.c @@ -101,8 +101,6 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file, err: if (needs_switch) io_rsrc_node_switch(ctx, ctx->file_data); - if (ret) - fput(file); return ret; } diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 6cc16e39b27f..b521186efa5c 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -149,6 +149,7 @@ static void io_clean_op(struct io_kiocb *req); static void io_queue_sqe(struct io_kiocb *req); static void io_move_task_work_from_local(struct io_ring_ctx *ctx); static void __io_submit_flush_completions(struct io_ring_ctx *ctx); +static __cold void io_fallback_tw(struct io_uring_task *tctx); static struct kmem_cache *req_cachep; @@ -167,7 +168,8 @@ EXPORT_SYMBOL(io_uring_get_socket); static inline void io_submit_flush_completions(struct io_ring_ctx *ctx) { - if (!wq_list_empty(&ctx->submit_state.compl_reqs)) + if (!wq_list_empty(&ctx->submit_state.compl_reqs) || + ctx->submit_state.cqes_count) __io_submit_flush_completions(ctx); } @@ -176,6 +178,11 @@ static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx) return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head); } +static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx) +{ + return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head); +} + static bool io_match_linked(struct io_kiocb *head) { struct io_kiocb *req; @@ -320,6 +327,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) spin_lock_init(&ctx->rsrc_ref_lock); INIT_LIST_HEAD(&ctx->rsrc_ref_list); INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work); + init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw); init_llist_head(&ctx->rsrc_put_llist); init_llist_head(&ctx->work_llist); INIT_LIST_HEAD(&ctx->tctx_list); @@ -490,7 +498,7 @@ static void io_eventfd_ops(struct rcu_head *rcu) int ops = atomic_xchg(&ev_fd->ops, 0); if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT)) - eventfd_signal(ev_fd->cq_ev_fd, 1); + eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE); /* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback * ordering in a race but if references are 0 we know we have to free @@ -526,7 +534,7 @@ static void io_eventfd_signal(struct io_ring_ctx *ctx) goto out; if (likely(eventfd_signal_allowed())) { - eventfd_signal(ev_fd->cq_ev_fd, 1); + eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE); } else { atomic_inc(&ev_fd->refs); if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops)) @@ -576,33 +584,63 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx) io_eventfd_flush_signal(ctx); } -static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx) +static inline void __io_cq_lock(struct io_ring_ctx *ctx) + __acquires(ctx->completion_lock) +{ + if (!ctx->task_complete) + spin_lock(&ctx->completion_lock); +} + +static inline void __io_cq_unlock(struct io_ring_ctx *ctx) +{ + if (!ctx->task_complete) + spin_unlock(&ctx->completion_lock); +} + +/* keep it inlined for io_submit_flush_completions() */ +static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) + __releases(ctx->completion_lock) { + io_commit_cqring(ctx); + __io_cq_unlock(ctx); io_commit_cqring_flush(ctx); io_cqring_wake(ctx); } -static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) +void io_cq_unlock_post(struct io_ring_ctx *ctx) __releases(ctx->completion_lock) { io_commit_cqring(ctx); 
spin_unlock(&ctx->completion_lock); - io_cqring_ev_posted(ctx); + io_commit_cqring_flush(ctx); + io_cqring_wake(ctx); } -void io_cq_unlock_post(struct io_ring_ctx *ctx) +/* Returns true if there are no backlogged entries after the flush */ +static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) { - __io_cq_unlock_post(ctx); + struct io_overflow_cqe *ocqe; + LIST_HEAD(list); + + io_cq_lock(ctx); + list_splice_init(&ctx->cq_overflow_list, &list); + clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); + io_cq_unlock(ctx); + + while (!list_empty(&list)) { + ocqe = list_first_entry(&list, struct io_overflow_cqe, list); + list_del(&ocqe->list); + kfree(ocqe); + } } /* Returns true if there are no backlogged entries after the flush */ -static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) +static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx) { - bool all_flushed; size_t cqe_size = sizeof(struct io_uring_cqe); - if (!force && __io_cqring_events(ctx) == ctx->cq_entries) - return false; + if (__io_cqring_events(ctx) == ctx->cq_entries) + return; if (ctx->flags & IORING_SETUP_CQE32) cqe_size <<= 1; @@ -612,43 +650,32 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true); struct io_overflow_cqe *ocqe; - if (!cqe && !force) + if (!cqe) break; ocqe = list_first_entry(&ctx->cq_overflow_list, struct io_overflow_cqe, list); - if (cqe) - memcpy(cqe, &ocqe->cqe, cqe_size); - else - io_account_cq_overflow(ctx); - + memcpy(cqe, &ocqe->cqe, cqe_size); list_del(&ocqe->list); kfree(ocqe); } - all_flushed = list_empty(&ctx->cq_overflow_list); - if (all_flushed) { + if (list_empty(&ctx->cq_overflow_list)) { clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags); } - io_cq_unlock_post(ctx); - return all_flushed; } -static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx) +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx) { - bool ret = true; - if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { /* iopoll syncs against uring_lock, not completion_lock */ if (ctx->flags & IORING_SETUP_IOPOLL) mutex_lock(&ctx->uring_lock); - ret = __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(ctx); if (ctx->flags & IORING_SETUP_IOPOLL) mutex_unlock(&ctx->uring_lock); } - - return ret; } void __io_put_task(struct task_struct *task, int nr) @@ -773,11 +800,14 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow) return &rings->cqes[off]; } -bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, - bool allow_overflow) +static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, + u32 cflags) { struct io_uring_cqe *cqe; + if (!ctx->task_complete) + lockdep_assert_held(&ctx->completion_lock); + ctx->cq_extra++; /* @@ -799,34 +829,100 @@ bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags } return true; } - - if (allow_overflow) - return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); - return false; } -bool io_post_aux_cqe(struct io_ring_ctx *ctx, - u64 user_data, s32 res, u32 cflags, - bool allow_overflow) +static void __io_flush_post_cqes(struct io_ring_ctx *ctx) + __must_hold(&ctx->uring_lock) +{ + struct io_submit_state *state = &ctx->submit_state; + unsigned int i; + + lockdep_assert_held(&ctx->uring_lock); + for (i = 0; i < state->cqes_count; i++) { + struct io_uring_cqe *cqe = &state->cqes[i]; + + if 
(!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) { + if (ctx->task_complete) { + spin_lock(&ctx->completion_lock); + io_cqring_event_overflow(ctx, cqe->user_data, + cqe->res, cqe->flags, 0, 0); + spin_unlock(&ctx->completion_lock); + } else { + io_cqring_event_overflow(ctx, cqe->user_data, + cqe->res, cqe->flags, 0, 0); + } + } + } + state->cqes_count = 0; +} + +static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, + bool allow_overflow) { bool filled; io_cq_lock(ctx); - filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow); + filled = io_fill_cqe_aux(ctx, user_data, res, cflags); + if (!filled && allow_overflow) + filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); + io_cq_unlock_post(ctx); return filled; } -static void __io_req_complete_put(struct io_kiocb *req) +bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags) +{ + return __io_post_aux_cqe(ctx, user_data, res, cflags, true); +} + +bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 cflags, + bool allow_overflow) +{ + struct io_uring_cqe *cqe; + unsigned int length; + + if (!defer) + return __io_post_aux_cqe(ctx, user_data, res, cflags, allow_overflow); + + length = ARRAY_SIZE(ctx->submit_state.cqes); + + lockdep_assert_held(&ctx->uring_lock); + + if (ctx->submit_state.cqes_count == length) { + __io_cq_lock(ctx); + __io_flush_post_cqes(ctx); + /* no need to flush - flush is deferred */ + __io_cq_unlock_post(ctx); + } + + /* For defered completions this is not as strict as it is otherwise, + * however it's main job is to prevent unbounded posted completions, + * and in that it works just as well. + */ + if (!allow_overflow && test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) + return false; + + cqe = &ctx->submit_state.cqes[ctx->submit_state.cqes_count++]; + cqe->user_data = user_data; + cqe->res = res; + cqe->flags = cflags; + return true; +} + +static void __io_req_complete_post(struct io_kiocb *req) { + struct io_ring_ctx *ctx = req->ctx; + + io_cq_lock(ctx); + if (!(req->flags & REQ_F_CQE_SKIP)) + __io_fill_cqe_req(ctx, req); + /* * If we're the last reference to this request, add to our locked * free_list cache. 
*/ if (req_ref_put_and_test(req)) { - struct io_ring_ctx *ctx = req->ctx; - if (req->flags & IO_REQ_LINK_FLAGS) { if (req->flags & IO_DISARM_MASK) io_disarm_next(req); @@ -847,38 +943,38 @@ static void __io_req_complete_put(struct io_kiocb *req) wq_list_add_head(&req->comp_list, &ctx->locked_free_list); ctx->locked_free_nr++; } -} - -void __io_req_complete_post(struct io_kiocb *req) -{ - if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe_req(req->ctx, req); - __io_req_complete_put(req); -} - -void io_req_complete_post(struct io_kiocb *req) -{ - struct io_ring_ctx *ctx = req->ctx; - - io_cq_lock(ctx); - __io_req_complete_post(req); io_cq_unlock_post(ctx); } -inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags) +void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) { - io_req_complete_post(req); + if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) { + req->io_task_work.func = io_req_task_complete; + io_req_task_work_add(req); + } else if (!(issue_flags & IO_URING_F_UNLOCKED) || + !(req->ctx->flags & IORING_SETUP_IOPOLL)) { + __io_req_complete_post(req); + } else { + struct io_ring_ctx *ctx = req->ctx; + + mutex_lock(&ctx->uring_lock); + __io_req_complete_post(req); + mutex_unlock(&ctx->uring_lock); + } } -void io_req_complete_failed(struct io_kiocb *req, s32 res) +void io_req_defer_failed(struct io_kiocb *req, s32 res) + __must_hold(&ctx->uring_lock) { const struct io_op_def *def = &io_op_defs[req->opcode]; + lockdep_assert_held(&req->ctx->uring_lock); + req_set_fail(req); io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED)); if (def->fail) def->fail(req); - io_req_complete_post(req); + io_req_complete_defer(req); } /* @@ -1079,10 +1175,17 @@ void tctx_task_work(struct callback_head *cb) struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work); struct llist_node fake = {}; - struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake); + struct llist_node *node; unsigned int loops = 1; - unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL); + unsigned int count; + + if (unlikely(current->flags & PF_EXITING)) { + io_fallback_tw(tctx); + return; + } + node = io_llist_xchg(&tctx->task_list, &fake); + count = handle_tw_list(node, &ctx, &uring_locked, NULL); node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL); while (node != &fake) { loops++; @@ -1100,6 +1203,20 @@ void tctx_task_work(struct callback_head *cb) trace_io_uring_task_work_run(tctx, count, loops); } +static __cold void io_fallback_tw(struct io_uring_task *tctx) +{ + struct llist_node *node = llist_del_all(&tctx->task_list); + struct io_kiocb *req; + + while (node) { + req = container_of(node, struct io_kiocb, io_task_work.node); + node = node->next; + if (llist_add(&req->io_task_work.node, + &req->ctx->fallback_llist)) + schedule_delayed_work(&req->ctx->fallback_work, 1); + } +} + static void io_req_local_work_add(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; @@ -1122,11 +1239,10 @@ static void io_req_local_work_add(struct io_kiocb *req) __io_cqring_wake(ctx); } -static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local) +void __io_req_task_work_add(struct io_kiocb *req, bool allow_local) { struct io_uring_task *tctx = req->task->io_uring; struct io_ring_ctx *ctx = req->ctx; - struct llist_node *node; if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) { io_req_local_work_add(req); @@ -1143,20 +1259,7 @@ static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local 
if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method))) return; - node = llist_del_all(&tctx->task_list); - - while (node) { - req = container_of(node, struct io_kiocb, io_task_work.node); - node = node->next; - if (llist_add(&req->io_task_work.node, - &req->ctx->fallback_llist)) - schedule_delayed_work(&req->ctx->fallback_work, 1); - } -} - -void io_req_task_work_add(struct io_kiocb *req) -{ - __io_req_task_work_add(req, true); + io_fallback_tw(tctx); } static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) @@ -1173,7 +1276,7 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) } } -int __io_run_local_work(struct io_ring_ctx *ctx, bool locked) +int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked) { struct llist_node *node; struct llist_node fake; @@ -1192,7 +1295,7 @@ again: struct io_kiocb *req = container_of(node, struct io_kiocb, io_task_work.node); prefetch(container_of(next, struct io_kiocb, io_task_work.node)); - req->io_task_work.func(req, &locked); + req->io_task_work.func(req, locked); ret++; node = next; } @@ -1208,7 +1311,7 @@ again: goto again; } - if (locked) + if (*locked) io_submit_flush_completions(ctx); trace_io_uring_local_work_run(ctx, ret, loops); return ret; @@ -1225,30 +1328,17 @@ int io_run_local_work(struct io_ring_ctx *ctx) __set_current_state(TASK_RUNNING); locked = mutex_trylock(&ctx->uring_lock); - ret = __io_run_local_work(ctx, locked); + ret = __io_run_local_work(ctx, &locked); if (locked) mutex_unlock(&ctx->uring_lock); return ret; } -static void io_req_tw_post(struct io_kiocb *req, bool *locked) -{ - io_req_complete_post(req); -} - -void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags) -{ - io_req_set_res(req, res, cflags); - req->io_task_work.func = io_req_tw_post; - io_req_task_work_add(req); -} - static void io_req_task_cancel(struct io_kiocb *req, bool *locked) { - /* not needed for normal modes, but SQPOLL depends on it */ io_tw_lock(req->ctx, locked); - io_req_complete_failed(req, req->cqe.res); + io_req_defer_failed(req, req->cqe.res); } void io_req_task_submit(struct io_kiocb *req, bool *locked) @@ -1258,7 +1348,7 @@ void io_req_task_submit(struct io_kiocb *req, bool *locked) if (likely(!(req->task->flags & PF_EXITING))) io_queue_sqe(req); else - io_req_complete_failed(req, -EFAULT); + io_req_defer_failed(req, -EFAULT); } void io_req_task_queue_fail(struct io_kiocb *req, int ret) @@ -1338,18 +1428,31 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct io_wq_work_node *node, *prev; struct io_submit_state *state = &ctx->submit_state; - io_cq_lock(ctx); + __io_cq_lock(ctx); + /* must come first to preserve CQE ordering in failure cases */ + if (state->cqes_count) + __io_flush_post_cqes(ctx); wq_list_for_each(node, prev, &state->compl_reqs) { struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); - if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe_req(ctx, req); + if (!(req->flags & REQ_F_CQE_SKIP) && + unlikely(!__io_fill_cqe_req(ctx, req))) { + if (ctx->task_complete) { + spin_lock(&ctx->completion_lock); + io_req_cqe_overflow(req); + spin_unlock(&ctx->completion_lock); + } else { + io_req_cqe_overflow(req); + } + } } __io_cq_unlock_post(ctx); - io_free_batch_list(ctx, state->compl_reqs.first); - INIT_WQ_LIST(&state->compl_reqs); + if (!wq_list_empty(&ctx->submit_state.compl_reqs)) { + io_free_batch_list(ctx, state->compl_reqs.first); + INIT_WQ_LIST(&state->compl_reqs); + } } /* @@ -1415,7 +1518,7 @@ static int 
io_iopoll_check(struct io_ring_ctx *ctx, long min) check_cq = READ_ONCE(ctx->check_cq); if (unlikely(check_cq)) { if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(ctx); /* * Similarly do not spin if we have not informed the user of any * dropped CQE. @@ -1446,8 +1549,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) io_task_work_pending(ctx)) { u32 tail = ctx->cached_cq_tail; - if (!llist_empty(&ctx->work_llist)) - __io_run_local_work(ctx, true); + (void) io_run_local_work_locked(ctx); if (task_work_pending(current) || wq_list_empty(&ctx->iopoll_list)) { @@ -1472,16 +1574,10 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) void io_req_task_complete(struct io_kiocb *req, bool *locked) { - if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) { - unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED; - - req->cqe.flags |= io_put_kbuf(req, issue_flags); - } - if (*locked) io_req_complete_defer(req); else - io_req_complete_post(req); + io_req_complete_post(req, IO_URING_F_UNLOCKED); } /* @@ -1631,6 +1727,7 @@ static u32 io_get_sequence(struct io_kiocb *req) } static __cold void io_drain_req(struct io_kiocb *req) + __must_hold(&ctx->uring_lock) { struct io_ring_ctx *ctx = req->ctx; struct io_defer_entry *de; @@ -1651,7 +1748,7 @@ queue: ret = io_req_prep_async(req); if (ret) { fail: - io_req_complete_failed(req, ret); + io_req_defer_failed(req, ret); return; } io_prep_async_link(req); @@ -1748,12 +1845,12 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) if (issue_flags & IO_URING_F_COMPLETE_DEFER) io_req_complete_defer(req); else - io_req_complete_post(req); + io_req_complete_post(req, issue_flags); } else if (ret != IOU_ISSUE_SKIP_COMPLETE) return ret; /* If the op doesn't have a file, we're not polling for it */ - if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file) + if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue) io_iopoll_req_issued(req, issue_flags); return 0; @@ -1762,9 +1859,8 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) int io_poll_issue(struct io_kiocb *req, bool *locked) { io_tw_lock(req->ctx, locked); - if (unlikely(req->task->flags & PF_EXITING)) - return -EFAULT; - return io_issue_sqe(req, IO_URING_F_NONBLOCK); + return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT| + IO_URING_F_COMPLETE_DEFER); } struct io_wq_work *io_wq_free_work(struct io_wq_work *work) @@ -1779,11 +1875,11 @@ void io_wq_submit_work(struct io_wq_work *work) { struct io_kiocb *req = container_of(work, struct io_kiocb, work); const struct io_op_def *def = &io_op_defs[req->opcode]; - unsigned int issue_flags = IO_URING_F_UNLOCKED; + unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ; bool needs_poll = false; int ret = 0, err = -ECANCELED; - /* one will be dropped by ->io_free_work() after returning to io-wq */ + /* one will be dropped by ->io_wq_free_work() after returning to io-wq */ if (!(req->flags & REQ_F_REFCOUNT)) __io_req_set_refcount(req, 2); else @@ -1881,7 +1977,7 @@ static void io_queue_async(struct io_kiocb *req, int ret) struct io_kiocb *linked_timeout; if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) { - io_req_complete_failed(req, ret); + io_req_defer_failed(req, ret); return; } @@ -1931,14 +2027,14 @@ static void io_queue_sqe_fallback(struct io_kiocb *req) */ req->flags &= ~REQ_F_HARDLINK; req->flags |= REQ_F_LINK; - io_req_complete_failed(req, req->cqe.res); + 
io_req_defer_failed(req, req->cqe.res); } else if (unlikely(req->ctx->drain_active)) { io_drain_req(req); } else { int ret = io_req_prep_async(req); if (unlikely(ret)) - io_req_complete_failed(req, ret); + io_req_defer_failed(req, ret); else io_queue_iowq(req, NULL); } @@ -2316,7 +2412,7 @@ static inline bool io_has_work(struct io_ring_ctx *ctx) static inline bool io_should_wake(struct io_wait_queue *iowq) { struct io_ring_ctx *ctx = iowq->ctx; - int dist = ctx->cached_cq_tail - (int) iowq->cq_tail; + int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail; /* * Wake up if we have enough events, or if a timeout occurred since we @@ -2400,7 +2496,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, return ret; io_cqring_overflow_flush(ctx); - if (io_cqring_events(ctx) >= min_events) + /* if user messes with these they will just get an early return */ + if (__io_cqring_events_user(ctx) >= min_events) return 0; } while (ret > 0); @@ -2434,11 +2531,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, trace_io_uring_cqring_wait(ctx, min_events); do { - /* if we can't even flush overflow, don't wait for more */ - if (!io_cqring_overflow_flush(ctx)) { - ret = -EBUSY; - break; - } + io_cqring_overflow_flush(ctx); prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, TASK_INTERRUPTIBLE); ret = io_cqring_wait_schedule(ctx, &iowq, timeout); @@ -2589,8 +2682,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) __io_sqe_buffers_unregister(ctx); if (ctx->file_data) __io_sqe_files_unregister(ctx); - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); + io_cqring_overflow_kill(ctx); io_eventfd_unregister(ctx); io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free); io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); @@ -2665,7 +2757,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait) * lock(&ep->mtx); * * Users may get EPOLLIN meanwhile seeing nothing in cqring, this - * pushs them to do the flush. + * pushes them to do the flush. */ if (io_cqring_events(ctx) || io_has_work(ctx)) @@ -2702,8 +2794,10 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb) /* * When @in_idle, we're in cancellation and it's racy to remove the * node. It'll be removed by the end of cancellation, just ignore it. + * tctx can be NULL if the queueing of this task_work raced with + * work cancelation off the exec path. */ - if (!atomic_read(&tctx->in_idle)) + if (tctx && !atomic_read(&tctx->in_idle)) io_uring_del_tctx_node((unsigned long)work->ctx); complete(&work->completion); } @@ -2731,6 +2825,12 @@ static __cold void io_ring_exit_work(struct work_struct *work) * as nobody else will be looking for them. 
*/ do { + if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { + mutex_lock(&ctx->uring_lock); + io_cqring_overflow_kill(ctx); + mutex_unlock(&ctx->uring_lock); + } + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) io_move_task_work_from_local(ctx); @@ -2796,8 +2896,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) mutex_lock(&ctx->uring_lock); percpu_ref_kill(&ctx->refs); - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); xa_for_each(&ctx->personalities, index, creds) io_unregister_personality(ctx, index); if (ctx->rings) @@ -2864,7 +2962,7 @@ static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx, while (!list_empty(&list)) { de = list_first_entry(&list, struct io_defer_entry, list); list_del_init(&de->list); - io_req_complete_failed(de->req, -ECANCELED); + io_req_task_queue_fail(de->req, -ECANCELED); kfree(de); } return true; @@ -3439,6 +3537,11 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (!ctx) return -ENOMEM; + if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && + !(ctx->flags & IORING_SETUP_IOPOLL) && + !(ctx->flags & IORING_SETUP_SQPOLL)) + ctx->task_complete = true; + /* * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user * space applications don't need to do io completion events @@ -4057,8 +4160,6 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, ctx = f.file->private_data; - io_run_task_work_ctx(ctx); - mutex_lock(&ctx->uring_lock); ret = __io_uring_register(ctx, opcode, arg, nr_args); mutex_unlock(&ctx->uring_lock); diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index ef77d2aa3172..1b2f0b2cc888 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -4,6 +4,7 @@ #include <linux/errno.h> #include <linux/lockdep.h> #include <linux/io_uring_types.h> +#include <uapi/linux/eventpoll.h> #include "io-wq.h" #include "slist.h" #include "filetable.h" @@ -17,8 +18,8 @@ enum { IOU_ISSUE_SKIP_COMPLETE = -EIOCBQUEUED, /* - * Intended only when both REQ_F_POLLED and REQ_F_APOLL_MULTISHOT - * are set to indicate to the poll runner that multishot should be + * Intended only when both IO_URING_F_MULTISHOT is passed + * to indicate to the poll runner that multishot should be * removed and the result is set on req->cqe.res. 
*/ IOU_STOP_MULTISHOT = -ECANCELED, @@ -27,16 +28,13 @@ enum { struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow); bool io_req_cqe_overflow(struct io_kiocb *req); int io_run_task_work_sig(struct io_ring_ctx *ctx); -int __io_run_local_work(struct io_ring_ctx *ctx, bool locked); +int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked); int io_run_local_work(struct io_ring_ctx *ctx); -void io_req_complete_failed(struct io_kiocb *req, s32 res); -void __io_req_complete(struct io_kiocb *req, unsigned issue_flags); -void io_req_complete_post(struct io_kiocb *req); -void __io_req_complete_post(struct io_kiocb *req); -bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, - bool allow_overflow); -bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, - bool allow_overflow); +void io_req_defer_failed(struct io_kiocb *req, s32 res); +void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags); +bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags); +bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 cflags, + bool allow_overflow); void __io_commit_cqring_flush(struct io_ring_ctx *ctx); struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages); @@ -50,10 +48,9 @@ static inline bool io_req_ffs_set(struct io_kiocb *req) return req->flags & REQ_F_FIXED_FILE; } +void __io_req_task_work_add(struct io_kiocb *req, bool allow_local); bool io_is_uring_fops(struct file *file); bool io_alloc_async_data(struct io_kiocb *req); -void io_req_task_work_add(struct io_kiocb *req); -void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags); void io_req_task_queue(struct io_kiocb *req); void io_queue_iowq(struct io_kiocb *req, bool *dont_use); void io_req_task_complete(struct io_kiocb *req, bool *locked); @@ -82,6 +79,11 @@ bool __io_alloc_req_refill(struct io_ring_ctx *ctx); bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task, bool cancel_all); +static inline void io_req_task_work_add(struct io_kiocb *req) +{ + __io_req_task_work_add(req, true); +} + #define io_for_each_link(pos, head) \ for (pos = (head); pos; pos = pos->link) @@ -91,6 +93,11 @@ static inline void io_cq_lock(struct io_ring_ctx *ctx) spin_lock(&ctx->completion_lock); } +static inline void io_cq_unlock(struct io_ring_ctx *ctx) +{ + spin_unlock(&ctx->completion_lock); +} + void io_cq_unlock_post(struct io_ring_ctx *ctx); static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx, @@ -126,7 +133,7 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx, */ cqe = io_get_cqe(ctx); if (unlikely(!cqe)) - return io_req_cqe_overflow(req); + return false; trace_io_uring_complete(req->ctx, req, req->cqe.user_data, req->cqe.res, req->cqe.flags, @@ -149,6 +156,14 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx, return true; } +static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx, + struct io_kiocb *req) +{ + if (likely(__io_fill_cqe_req(ctx, req))) + return true; + return io_req_cqe_overflow(req); +} + static inline void req_set_fail(struct io_kiocb *req) { req->flags |= REQ_F_FAIL; @@ -207,12 +222,18 @@ static inline void io_commit_cqring(struct io_ring_ctx *ctx) static inline void __io_cqring_wake(struct io_ring_ctx *ctx) { /* - * wake_up_all() may seem excessive, but io_wake_function() and - * io_should_wake() handle the termination of the loop and only - * wake as many waiters as we need to. 
+ * Trigger waitqueue handler on all waiters on our waitqueue. This + * won't necessarily wake up all the tasks, io_should_wake() will make + * that decision. + * + * Pass in EPOLLIN|EPOLL_URING_WAKE as the poll wakeup key. The latter + * set in the mask so that if we recurse back into our own poll + * waitqueue handlers, we know we have a dependency between eventfd or + * epoll and should terminate multishot poll at that point. */ if (waitqueue_active(&ctx->cq_wait)) - wake_up_all(&ctx->cq_wait); + __wake_up(&ctx->cq_wait, TASK_NORMAL, 0, + poll_to_key(EPOLL_URING_WAKE | EPOLLIN)); } static inline void io_cqring_wake(struct io_ring_ctx *ctx) @@ -238,9 +259,14 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) static inline int io_run_task_work(void) { + /* + * Always check-and-clear the task_work notification signal. With how + * signaling works for task_work, we can find it set with nothing to + * run. We need to clear it for that case, like get_signal() does. + */ + if (test_thread_flag(TIF_NOTIFY_SIGNAL)) + clear_notify_signal(); if (task_work_pending(current)) { - if (test_thread_flag(TIF_NOTIFY_SIGNAL)) - clear_notify_signal(); __set_current_state(TASK_RUNNING); task_work_run(); return 1; @@ -277,9 +303,18 @@ static inline int io_run_task_work_ctx(struct io_ring_ctx *ctx) static inline int io_run_local_work_locked(struct io_ring_ctx *ctx) { + bool locked; + int ret; + if (llist_empty(&ctx->work_llist)) return 0; - return __io_run_local_work(ctx, true); + + locked = true; + ret = __io_run_local_work(ctx, &locked); + /* shouldn't happen! */ + if (WARN_ON_ONCE(!locked)) + mutex_lock(&ctx->uring_lock); + return ret; } static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked) @@ -355,4 +390,11 @@ static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx) ctx->submitter_task == current); } +static inline void io_req_queue_tw_complete(struct io_kiocb *req, s32 res) +{ + io_req_set_res(req, res, 0); + req->io_task_work.func = io_req_task_complete; + io_req_task_work_add(req); +} + #endif diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index 25cd724ade18..4a6401080c1f 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -306,14 +306,11 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) if (!bl->buf_nr_pages) ret = __io_remove_buffers(ctx, bl, p->nbufs); } + io_ring_submit_unlock(ctx, issue_flags); if (ret < 0) req_set_fail(req); - - /* complete before unlock, IOPOLL may need the lock */ io_req_set_res(req, ret, 0); - __io_req_complete(req, issue_flags); - io_ring_submit_unlock(ctx, issue_flags); - return IOU_ISSUE_SKIP_COMPLETE; + return IOU_OK; } int io_provide_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) @@ -346,6 +343,8 @@ int io_provide_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe tmp = READ_ONCE(sqe->off); if (tmp > USHRT_MAX) return -E2BIG; + if (tmp + p->nbufs >= USHRT_MAX) + return -EINVAL; p->bid = tmp; return 0; } @@ -456,13 +455,12 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) ret = io_add_buffers(ctx, p, bl); err: + io_ring_submit_unlock(ctx, issue_flags); + if (ret < 0) req_set_fail(req); - /* complete before unlock, IOPOLL may need the lock */ io_req_set_res(req, ret, 0); - __io_req_complete(req, issue_flags); - io_ring_submit_unlock(ctx, issue_flags); - return IOU_ISSUE_SKIP_COMPLETE; + return IOU_OK; } int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c index 
90d2fc6fd80e..2d3cd945a531 100644 --- a/io_uring/msg_ring.c +++ b/io_uring/msg_ring.c @@ -15,6 +15,8 @@ struct io_msg { struct file *file; + struct file *src_file; + struct callback_head tw; u64 user_data; u32 len; u32 cmd; @@ -23,6 +25,34 @@ struct io_msg { u32 flags; }; +void io_msg_ring_cleanup(struct io_kiocb *req) +{ + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); + + if (WARN_ON_ONCE(!msg->src_file)) + return; + + fput(msg->src_file); + msg->src_file = NULL; +} + +static void io_msg_tw_complete(struct callback_head *head) +{ + struct io_msg *msg = container_of(head, struct io_msg, tw); + struct io_kiocb *req = cmd_to_io_kiocb(msg); + struct io_ring_ctx *target_ctx = req->file->private_data; + int ret = 0; + + if (current->flags & PF_EXITING) + ret = -EOWNERDEAD; + else if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) + ret = -EOVERFLOW; + + if (ret < 0) + req_set_fail(req); + io_req_queue_tw_complete(req, ret); +} + static int io_msg_ring_data(struct io_kiocb *req) { struct io_ring_ctx *target_ctx = req->file->private_data; @@ -31,23 +61,29 @@ static int io_msg_ring_data(struct io_kiocb *req) if (msg->src_fd || msg->dst_fd || msg->flags) return -EINVAL; - if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0, true)) + if (target_ctx->task_complete && current != target_ctx->submitter_task) { + init_task_work(&msg->tw, io_msg_tw_complete); + if (task_work_add(target_ctx->submitter_task, &msg->tw, + TWA_SIGNAL_NO_IPI)) + return -EOWNERDEAD; + + atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags); + return IOU_ISSUE_SKIP_COMPLETE; + } + + if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) return 0; return -EOVERFLOW; } -static void io_double_unlock_ctx(struct io_ring_ctx *ctx, - struct io_ring_ctx *octx, +static void io_double_unlock_ctx(struct io_ring_ctx *octx, unsigned int issue_flags) { - if (issue_flags & IO_URING_F_UNLOCKED) - mutex_unlock(&ctx->uring_lock); mutex_unlock(&octx->uring_lock); } -static int io_double_lock_ctx(struct io_ring_ctx *ctx, - struct io_ring_ctx *octx, +static int io_double_lock_ctx(struct io_ring_ctx *octx, unsigned int issue_flags) { /* @@ -60,69 +96,103 @@ static int io_double_lock_ctx(struct io_ring_ctx *ctx, return -EAGAIN; return 0; } - - /* Always grab smallest value ctx first. We know ctx != octx. 
*/ - if (ctx < octx) { - mutex_lock(&ctx->uring_lock); - mutex_lock(&octx->uring_lock); - } else { - mutex_lock(&octx->uring_lock); - mutex_lock(&ctx->uring_lock); - } - + mutex_lock(&octx->uring_lock); return 0; } -static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) +static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags) { - struct io_ring_ctx *target_ctx = req->file->private_data; struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); struct io_ring_ctx *ctx = req->ctx; + struct file *file = NULL; unsigned long file_ptr; - struct file *src_file; - int ret; - - if (target_ctx == ctx) - return -EINVAL; - - ret = io_double_lock_ctx(ctx, target_ctx, issue_flags); - if (unlikely(ret)) - return ret; - - ret = -EBADF; - if (unlikely(msg->src_fd >= ctx->nr_user_files)) - goto out_unlock; + int idx = msg->src_fd; + + io_ring_submit_lock(ctx, issue_flags); + if (likely(idx < ctx->nr_user_files)) { + idx = array_index_nospec(idx, ctx->nr_user_files); + file_ptr = io_fixed_file_slot(&ctx->file_table, idx)->file_ptr; + file = (struct file *) (file_ptr & FFS_MASK); + if (file) + get_file(file); + } + io_ring_submit_unlock(ctx, issue_flags); + return file; +} - msg->src_fd = array_index_nospec(msg->src_fd, ctx->nr_user_files); - file_ptr = io_fixed_file_slot(&ctx->file_table, msg->src_fd)->file_ptr; - if (!file_ptr) - goto out_unlock; +static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags) +{ + struct io_ring_ctx *target_ctx = req->file->private_data; + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); + struct file *src_file = msg->src_file; + int ret; - src_file = (struct file *) (file_ptr & FFS_MASK); - get_file(src_file); + if (unlikely(io_double_lock_ctx(target_ctx, issue_flags))) + return -EAGAIN; ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd); - if (ret < 0) { - fput(src_file); + if (ret < 0) goto out_unlock; - } + + msg->src_file = NULL; + req->flags &= ~REQ_F_NEED_CLEANUP; if (msg->flags & IORING_MSG_RING_CQE_SKIP) goto out_unlock; - /* * If this fails, the target still received the file descriptor but * wasn't notified of the fact. This means that if this request * completes with -EOVERFLOW, then the sender must ensure that a * later IORING_OP_MSG_RING delivers the message. 
*/ - if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0, true)) + if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) ret = -EOVERFLOW; out_unlock: - io_double_unlock_ctx(ctx, target_ctx, issue_flags); + io_double_unlock_ctx(target_ctx, issue_flags); return ret; } +static void io_msg_tw_fd_complete(struct callback_head *head) +{ + struct io_msg *msg = container_of(head, struct io_msg, tw); + struct io_kiocb *req = cmd_to_io_kiocb(msg); + int ret = -EOWNERDEAD; + + if (!(current->flags & PF_EXITING)) + ret = io_msg_install_complete(req, IO_URING_F_UNLOCKED); + if (ret < 0) + req_set_fail(req); + io_req_queue_tw_complete(req, ret); +} + +static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) +{ + struct io_ring_ctx *target_ctx = req->file->private_data; + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); + struct io_ring_ctx *ctx = req->ctx; + struct file *src_file = msg->src_file; + + if (target_ctx == ctx) + return -EINVAL; + if (!src_file) { + src_file = io_msg_grab_file(req, issue_flags); + if (!src_file) + return -EBADF; + msg->src_file = src_file; + req->flags |= REQ_F_NEED_CLEANUP; + } + + if (target_ctx->task_complete && current != target_ctx->submitter_task) { + init_task_work(&msg->tw, io_msg_tw_fd_complete); + if (task_work_add(target_ctx->submitter_task, &msg->tw, + TWA_SIGNAL)) + return -EOWNERDEAD; + + return IOU_ISSUE_SKIP_COMPLETE; + } + return io_msg_install_complete(req, issue_flags); +} + int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); @@ -130,6 +200,7 @@ int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) if (unlikely(sqe->buf_index || sqe->personality)) return -EINVAL; + msg->src_file = NULL; msg->user_data = READ_ONCE(sqe->off); msg->len = READ_ONCE(sqe->len); msg->cmd = READ_ONCE(sqe->addr); @@ -164,12 +235,11 @@ int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags) } done: - if (ret < 0) + if (ret < 0) { + if (ret == -EAGAIN || ret == IOU_ISSUE_SKIP_COMPLETE) + return ret; req_set_fail(req); + } io_req_set_res(req, ret, 0); - /* put file to avoid an attempt to IOPOLL the req */ - if (!(req->flags & REQ_F_FIXED_FILE)) - io_put_file(req->file); - req->file = NULL; return IOU_OK; } diff --git a/io_uring/msg_ring.h b/io_uring/msg_ring.h index fb9601f202d0..3987ee6c0e5f 100644 --- a/io_uring/msg_ring.h +++ b/io_uring/msg_ring.h @@ -2,3 +2,4 @@ int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags); +void io_msg_ring_cleanup(struct io_kiocb *req); diff --git a/io_uring/net.c b/io_uring/net.c index 15dea91625e2..5229976cb582 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -67,7 +67,18 @@ struct io_sr_msg { struct io_kiocb *notif; }; -#define IO_APOLL_MULTI_POLLED (REQ_F_APOLL_MULTISHOT | REQ_F_POLLED) +static inline bool io_check_multishot(struct io_kiocb *req, + unsigned int issue_flags) +{ + /* + * When ->locked_cq is set we only allow to post CQEs from the original + * task context. Usual request completions will be handled in other + * generic paths but multipoll may decide to post extra cqes. 
+ */ + return !(issue_flags & IO_URING_F_IOWQ) || + !(issue_flags & IO_URING_F_MULTISHOT) || + !req->ctx->task_complete; +} int io_shutdown_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { @@ -127,13 +138,15 @@ static struct io_async_msghdr *io_msg_alloc_async(struct io_kiocb *req, struct io_cache_entry *entry; struct io_async_msghdr *hdr; - if (!(issue_flags & IO_URING_F_UNLOCKED) && - (entry = io_alloc_cache_get(&ctx->netmsg_cache)) != NULL) { - hdr = container_of(entry, struct io_async_msghdr, cache); - hdr->free_iov = NULL; - req->flags |= REQ_F_ASYNC_DATA; - req->async_data = hdr; - return hdr; + if (!(issue_flags & IO_URING_F_UNLOCKED)) { + entry = io_alloc_cache_get(&ctx->netmsg_cache); + if (entry) { + hdr = container_of(entry, struct io_async_msghdr, cache); + hdr->free_iov = NULL; + req->flags |= REQ_F_ASYNC_DATA; + req->async_data = hdr; + return hdr; + } } if (!io_alloc_async_data(req)) { @@ -365,7 +378,7 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags) if (unlikely(!sock)) return -ENOTSOCK; - ret = import_single_range(WRITE, sr->buf, sr->len, &iov, &msg.msg_iter); + ret = import_single_range(ITER_SOURCE, sr->buf, sr->len, &iov, &msg.msg_iter); if (unlikely(ret)) return ret; @@ -451,7 +464,7 @@ static int __io_recvmsg_copy_hdr(struct io_kiocb *req, } } else { iomsg->free_iov = iomsg->fast_iov; - ret = __import_iovec(READ, msg.msg_iov, msg.msg_iovlen, UIO_FASTIOV, + ret = __import_iovec(ITER_DEST, msg.msg_iov, msg.msg_iovlen, UIO_FASTIOV, &iomsg->free_iov, &iomsg->msg.msg_iter, false); if (ret > 0) @@ -503,7 +516,7 @@ static int __io_compat_recvmsg_copy_hdr(struct io_kiocb *req, } } else { iomsg->free_iov = iomsg->fast_iov; - ret = __import_iovec(READ, (struct iovec __user *)uiov, msg.msg_iovlen, + ret = __import_iovec(ITER_DEST, (struct iovec __user *)uiov, msg.msg_iovlen, UIO_FASTIOV, &iomsg->free_iov, &iomsg->msg.msg_iter, true); if (ret < 0) @@ -591,7 +604,8 @@ static inline void io_recv_prep_retry(struct io_kiocb *req) * again (for multishot). */ static inline bool io_recv_finish(struct io_kiocb *req, int *ret, - unsigned int cflags, bool mshot_finished) + unsigned int cflags, bool mshot_finished, + unsigned issue_flags) { if (!(req->flags & REQ_F_APOLL_MULTISHOT)) { io_req_set_res(req, *ret, cflags); @@ -600,21 +614,17 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret, } if (!mshot_finished) { - if (io_post_aux_cqe(req->ctx, req->cqe.user_data, *ret, - cflags | IORING_CQE_F_MORE, false)) { + if (io_aux_cqe(req->ctx, issue_flags & IO_URING_F_COMPLETE_DEFER, + req->cqe.user_data, *ret, cflags | IORING_CQE_F_MORE, true)) { io_recv_prep_retry(req); return false; } - /* - * Otherwise stop multishot but use the current result. - * Probably will end up going into overflow, but this means - * we cannot trust the ordering anymore - */ + /* Otherwise stop multishot but use the current result. 
*/ } io_req_set_res(req, *ret, cflags); - if (req->flags & REQ_F_POLLED) + if (issue_flags & IO_URING_F_MULTISHOT) *ret = IOU_STOP_MULTISHOT; else *ret = IOU_OK; @@ -733,6 +743,9 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) (sr->flags & IORING_RECVSEND_POLL_FIRST)) return io_setup_async_msg(req, kmsg, issue_flags); + if (!io_check_multishot(req, issue_flags)) + return io_setup_async_msg(req, kmsg, issue_flags); + retry_multishot: if (io_do_buffer_select(req)) { void __user *buf; @@ -752,7 +765,7 @@ retry_multishot: kmsg->fast_iov[0].iov_base = buf; kmsg->fast_iov[0].iov_len = len; - iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->fast_iov, 1, + iov_iter_init(&kmsg->msg.msg_iter, ITER_DEST, kmsg->fast_iov, 1, len); } @@ -773,8 +786,7 @@ retry_multishot: if (ret < min_ret) { if (ret == -EAGAIN && force_nonblock) { ret = io_setup_async_msg(req, kmsg, issue_flags); - if (ret == -EAGAIN && (req->flags & IO_APOLL_MULTI_POLLED) == - IO_APOLL_MULTI_POLLED) { + if (ret == -EAGAIN && (issue_flags & IO_URING_F_MULTISHOT)) { io_kbuf_recycle(req, issue_flags); return IOU_ISSUE_SKIP_COMPLETE; } @@ -803,7 +815,7 @@ retry_multishot: if (kmsg->msg.msg_inq) cflags |= IORING_CQE_F_SOCK_NONEMPTY; - if (!io_recv_finish(req, &ret, cflags, mshot_finished)) + if (!io_recv_finish(req, &ret, cflags, mshot_finished, issue_flags)) goto retry_multishot; if (mshot_finished) { @@ -833,6 +845,9 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags) (sr->flags & IORING_RECVSEND_POLL_FIRST)) return -EAGAIN; + if (!io_check_multishot(req, issue_flags)) + return -EAGAIN; + sock = sock_from_file(req->file); if (unlikely(!sock)) return -ENOTSOCK; @@ -847,7 +862,7 @@ retry_multishot: sr->buf = buf; } - ret = import_single_range(READ, sr->buf, len, &iov, &msg.msg_iter); + ret = import_single_range(ITER_DEST, sr->buf, len, &iov, &msg.msg_iter); if (unlikely(ret)) goto out_free; @@ -869,7 +884,7 @@ retry_multishot: ret = sock_recvmsg(sock, &msg, flags); if (ret < min_ret) { if (ret == -EAGAIN && force_nonblock) { - if ((req->flags & IO_APOLL_MULTI_POLLED) == IO_APOLL_MULTI_POLLED) { + if (issue_flags & IO_URING_F_MULTISHOT) { io_kbuf_recycle(req, issue_flags); return IOU_ISSUE_SKIP_COMPLETE; } @@ -902,7 +917,7 @@ out_free: if (msg.msg_inq) cflags |= IORING_CQE_F_SOCK_NONEMPTY; - if (!io_recv_finish(req, &ret, cflags, ret <= 0)) + if (!io_recv_finish(req, &ret, cflags, ret <= 0, issue_flags)) goto retry_multishot; return ret; @@ -925,6 +940,9 @@ void io_send_zc_cleanup(struct io_kiocb *req) } } +#define IO_ZC_FLAGS_COMMON (IORING_RECVSEND_POLL_FIRST | IORING_RECVSEND_FIXED_BUF) +#define IO_ZC_FLAGS_VALID (IO_ZC_FLAGS_COMMON | IORING_SEND_ZC_REPORT_USAGE) + int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg); @@ -937,10 +955,6 @@ int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) if (req->flags & REQ_F_CQE_SKIP) return -EINVAL; - zc->flags = READ_ONCE(sqe->ioprio); - if (zc->flags & ~(IORING_RECVSEND_POLL_FIRST | - IORING_RECVSEND_FIXED_BUF)) - return -EINVAL; notif = zc->notif = io_alloc_notif(ctx); if (!notif) return -ENOMEM; @@ -948,6 +962,17 @@ int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) notif->cqe.res = 0; notif->cqe.flags = IORING_CQE_F_NOTIF; req->flags |= REQ_F_NEED_CLEANUP; + + zc->flags = READ_ONCE(sqe->ioprio); + if (unlikely(zc->flags & ~IO_ZC_FLAGS_COMMON)) { + if (zc->flags & ~IO_ZC_FLAGS_VALID) + return -EINVAL; + if (zc->flags & 
IORING_SEND_ZC_REPORT_USAGE) { + io_notif_set_extended(notif); + io_notif_to_data(notif)->zc_report = true; + } + } + if (zc->flags & IORING_RECVSEND_FIXED_BUF) { unsigned idx = READ_ONCE(sqe->buf_index); @@ -1083,13 +1108,14 @@ int io_send_zc(struct io_kiocb *req, unsigned int issue_flags) return io_setup_async_addr(req, &__address, issue_flags); if (zc->flags & IORING_RECVSEND_FIXED_BUF) { - ret = io_import_fixed(WRITE, &msg.msg_iter, req->imu, + ret = io_import_fixed(ITER_SOURCE, &msg.msg_iter, req->imu, (u64)(uintptr_t)zc->buf, zc->len); if (unlikely(ret)) return ret; msg.sg_from_iter = io_sg_from_iter; } else { - ret = import_single_range(WRITE, zc->buf, zc->len, &iov, + io_notif_set_extended(zc->notif); + ret = import_single_range(ITER_SOURCE, zc->buf, zc->len, &iov, &msg.msg_iter); if (unlikely(ret)) return ret; @@ -1150,6 +1176,8 @@ int io_sendmsg_zc(struct io_kiocb *req, unsigned int issue_flags) unsigned flags; int ret, min_ret = 0; + io_notif_set_extended(sr->notif); + sock = sock_from_file(req->file); if (unlikely(!sock)) return -ENOTSOCK; @@ -1271,6 +1299,8 @@ int io_accept(struct io_kiocb *req, unsigned int issue_flags) struct file *file; int ret, fd; + if (!io_check_multishot(req, issue_flags)) + return -EAGAIN; retry: if (!fixed) { fd = __get_unused_fd_flags(accept->flags, accept->nofile); @@ -1289,8 +1319,7 @@ retry: * return EAGAIN to arm the poll infra since it * has already been done */ - if ((req->flags & IO_APOLL_MULTI_POLLED) == - IO_APOLL_MULTI_POLLED) + if (issue_flags & IO_URING_F_MULTISHOT) ret = IOU_ISSUE_SKIP_COMPLETE; return ret; } @@ -1310,14 +1339,13 @@ retry: return IOU_OK; } - if (ret >= 0 && - io_post_aux_cqe(ctx, req->cqe.user_data, ret, IORING_CQE_F_MORE, false)) + if (ret < 0) + return ret; + if (io_aux_cqe(ctx, issue_flags & IO_URING_F_COMPLETE_DEFER, + req->cqe.user_data, ret, IORING_CQE_F_MORE, true)) goto retry; - io_req_set_res(req, ret, 0); - if (req->flags & REQ_F_POLLED) - return IOU_STOP_MULTISHOT; - return IOU_OK; + return -ECANCELED; } int io_socket_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) diff --git a/io_uring/notif.c b/io_uring/notif.c index e37c6569d82e..c4bb793ebf0e 100644 --- a/io_uring/notif.c +++ b/io_uring/notif.c @@ -9,11 +9,14 @@ #include "notif.h" #include "rsrc.h" -static void __io_notif_complete_tw(struct io_kiocb *notif, bool *locked) +static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked) { struct io_notif_data *nd = io_notif_to_data(notif); struct io_ring_ctx *ctx = notif->ctx; + if (nd->zc_report && (nd->zc_copied || !nd->zc_used)) + notif->cqe.res |= IORING_NOTIF_USAGE_ZC_COPIED; + if (nd->account_pages && ctx->user) { __io_unaccount_mem(ctx->user, nd->account_pages); nd->account_pages = 0; @@ -21,16 +24,41 @@ static void __io_notif_complete_tw(struct io_kiocb *notif, bool *locked) io_req_task_complete(notif, locked); } -static void io_uring_tx_zerocopy_callback(struct sk_buff *skb, - struct ubuf_info *uarg, - bool success) +static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg, + bool success) { struct io_notif_data *nd = container_of(uarg, struct io_notif_data, uarg); struct io_kiocb *notif = cmd_to_io_kiocb(nd); - if (refcount_dec_and_test(&uarg->refcnt)) { - notif->io_task_work.func = __io_notif_complete_tw; + if (refcount_dec_and_test(&uarg->refcnt)) io_req_task_work_add(notif); +} + +static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg, + bool success) +{ + struct io_notif_data *nd = container_of(uarg, struct io_notif_data, uarg); + 
+ if (nd->zc_report) { + if (success && !nd->zc_used && skb) + WRITE_ONCE(nd->zc_used, true); + else if (!success && !nd->zc_copied) + WRITE_ONCE(nd->zc_copied, true); + } + io_tx_ubuf_callback(skb, uarg, success); +} + +void io_notif_set_extended(struct io_kiocb *notif) +{ + struct io_notif_data *nd = io_notif_to_data(notif); + + if (nd->uarg.callback != io_tx_ubuf_callback_ext) { + nd->account_pages = 0; + nd->zc_report = false; + nd->zc_used = false; + nd->zc_copied = false; + nd->uarg.callback = io_tx_ubuf_callback_ext; + notif->io_task_work.func = io_notif_complete_tw_ext; } } @@ -49,24 +77,11 @@ struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx) notif->task = current; io_get_task_refs(1); notif->rsrc_node = NULL; - io_req_set_rsrc_node(notif, ctx, 0); + notif->io_task_work.func = io_req_task_complete; nd = io_notif_to_data(notif); - nd->account_pages = 0; nd->uarg.flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN; - nd->uarg.callback = io_uring_tx_zerocopy_callback; + nd->uarg.callback = io_tx_ubuf_callback; refcount_set(&nd->uarg.refcnt, 1); return notif; } - -void io_notif_flush(struct io_kiocb *notif) - __must_hold(&slot->notif->ctx->uring_lock) -{ - struct io_notif_data *nd = io_notif_to_data(notif); - - /* drop slot's master ref */ - if (refcount_dec_and_test(&nd->uarg.refcnt)) { - notif->io_task_work.func = __io_notif_complete_tw; - io_req_task_work_add(notif); - } -} diff --git a/io_uring/notif.h b/io_uring/notif.h index 5b4d710c8ca5..c88c800cd89d 100644 --- a/io_uring/notif.h +++ b/io_uring/notif.h @@ -13,16 +13,29 @@ struct io_notif_data { struct file *file; struct ubuf_info uarg; unsigned long account_pages; + bool zc_report; + bool zc_used; + bool zc_copied; }; -void io_notif_flush(struct io_kiocb *notif); struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx); +void io_notif_set_extended(struct io_kiocb *notif); static inline struct io_notif_data *io_notif_to_data(struct io_kiocb *notif) { return io_kiocb_to_cmd(notif, struct io_notif_data); } +static inline void io_notif_flush(struct io_kiocb *notif) + __must_hold(¬if->ctx->uring_lock) +{ + struct io_notif_data *nd = io_notif_to_data(notif); + + /* drop slot's master ref */ + if (refcount_dec_and_test(&nd->uarg.refcnt)) + io_req_task_work_add(notif); +} + static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len) { struct io_ring_ctx *ctx = notif->ctx; diff --git a/io_uring/opdef.c b/io_uring/opdef.c index 83dc0f9ad3b2..3aa0d65c50e3 100644 --- a/io_uring/opdef.c +++ b/io_uring/opdef.c @@ -63,6 +63,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "READV", .prep = io_prep_rw, @@ -80,6 +81,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "WRITEV", .prep = io_prep_rw, @@ -103,6 +105,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "READ_FIXED", .prep = io_prep_rw, @@ -118,6 +121,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "WRITE_FIXED", .prep = io_prep_rw, @@ -277,6 +281,7 @@ const struct io_op_def io_op_defs[] = { .audit_skip = 1, .ioprio = 1, .iopoll = 1, + .iopoll_queue = 1, .async_size = sizeof(struct io_async_rw), .name = "READ", .prep = io_prep_rw, @@ -292,6 
+297,7 @@ const struct io_op_def io_op_defs[] = {
 		.audit_skip		= 1,
 		.ioprio			= 1,
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= sizeof(struct io_async_rw),
 		.name			= "WRITE",
 		.prep			= io_prep_rw,
@@ -439,6 +445,7 @@ const struct io_op_def io_op_defs[] = {
 		.name			= "MSG_RING",
 		.prep			= io_msg_ring_prep,
 		.issue			= io_msg_ring,
+		.cleanup		= io_msg_ring_cleanup,
 	},
 	[IORING_OP_FSETXATTR] = {
 		.needs_file = 1,
@@ -481,6 +488,7 @@ const struct io_op_def io_op_defs[] = {
 		.plug			= 1,
 		.name			= "URING_CMD",
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= uring_cmd_pdu_size(1),
 		.prep			= io_uring_cmd_prep,
 		.issue			= io_uring_cmd,
diff --git a/io_uring/opdef.h b/io_uring/opdef.h
index 3efe06d25473..df7e13d9bfba 100644
--- a/io_uring/opdef.h
+++ b/io_uring/opdef.h
@@ -25,6 +25,8 @@ struct io_op_def {
 	unsigned		ioprio : 1;
 	/* supports iopoll */
 	unsigned		iopoll : 1;
+	/* have to be put into the iopoll list */
+	unsigned		iopoll_queue : 1;
 	/* opcode specific path will handle ->async_data allocation if needed */
 	unsigned		manual_alloc : 1;
 	/* size of async data needed, if any */
diff --git a/io_uring/poll.c b/io_uring/poll.c
index 0d9f49c575e0..ee7da6150ec4 100644
--- a/io_uring/poll.c
+++ b/io_uring/poll.c
@@ -40,7 +40,14 @@ struct io_poll_table {
 };
 
 #define IO_POLL_CANCEL_FLAG	BIT(31)
-#define IO_POLL_REF_MASK	GENMASK(30, 0)
+#define IO_POLL_RETRY_FLAG	BIT(30)
+#define IO_POLL_REF_MASK	GENMASK(29, 0)
+
+/*
+ * We usually have 1-2 refs taken, 128 is more than enough and we want to
+ * maximise the margin between this amount and the moment when it overflows.
+ */
+#define IO_POLL_REF_BIAS	128
 
 #define IO_WQE_F_DOUBLE		1
 
@@ -58,6 +65,21 @@ static inline bool wqe_is_double(struct wait_queue_entry *wqe)
 	return priv & IO_WQE_F_DOUBLE;
 }
 
+static bool io_poll_get_ownership_slowpath(struct io_kiocb *req)
+{
+	int v;
+
+	/*
+	 * poll_refs are already elevated and we don't have much hope for
+	 * grabbing the ownership. Instead of incrementing set a retry flag
+	 * to notify the loop that there might have been some change.
+	 */
+	v = atomic_fetch_or(IO_POLL_RETRY_FLAG, &req->poll_refs);
+	if (v & IO_POLL_REF_MASK)
+		return false;
+	return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
+}
+
 /*
  * If refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, it's free. We can
  * bump it and acquire ownership. It's disallowed to modify requests while not
@@ -66,6 +88,8 @@ static inline bool wqe_is_double(struct wait_queue_entry *wqe)
  */
 static inline bool io_poll_get_ownership(struct io_kiocb *req)
 {
+	if (unlikely(atomic_read(&req->poll_refs) >= IO_POLL_REF_BIAS))
+		return io_poll_get_ownership_slowpath(req);
 	return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
 }
 
@@ -116,6 +140,8 @@ static void io_poll_req_insert_locked(struct io_kiocb *req)
 	struct io_hash_table *table = &req->ctx->cancel_table_locked;
 	u32 index = hash_long(req->cqe.user_data, table->hash_bits);
 
+	lockdep_assert_held(&req->ctx->uring_lock);
+
 	hlist_add_head(&req->hash_node, &table->hbs[index].list);
 }
 
@@ -211,7 +237,6 @@ enum {
  */
 static int io_poll_check_events(struct io_kiocb *req, bool *locked)
 {
-	struct io_ring_ctx *ctx = req->ctx;
 	int v, ret;
 
 	/* req->task == current here, checking PF_EXITING is safe */
@@ -221,11 +246,31 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
 	do {
 		v = atomic_read(&req->poll_refs);
-		/* tw handler should be the owner, and so have some references */
-		if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
-			return IOU_POLL_DONE;
-		if (v & IO_POLL_CANCEL_FLAG)
-			return -ECANCELED;
+		if (unlikely(v != 1)) {
+			/* tw should be the owner and so have some refs */
+			if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
+				return IOU_POLL_NO_ACTION;
+			if (v & IO_POLL_CANCEL_FLAG)
+				return -ECANCELED;
+			/*
+			 * cqe.res contains only events of the first wake up
+			 * and all others are to be lost. Redo vfs_poll() to get
+			 * up to date state.
+			 */
+			if ((v & IO_POLL_REF_MASK) != 1)
+				req->cqe.res = 0;
+
+			if (v & IO_POLL_RETRY_FLAG) {
+				req->cqe.res = 0;
+				/*
+				 * We won't find new events that came in between
+				 * vfs_poll and the ref put unless we clear the
+				 * flag in advance.
+				 */
+				atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs);
+				v &= ~IO_POLL_RETRY_FLAG;
+			}
+		}
 
 		/* the mask was stashed in __io_poll_execute */
 		if (!req->cqe.res) {
@@ -243,8 +288,8 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
 			__poll_t mask = mangle_poll(req->cqe.res &
 						    req->apoll_events);
 
-			if (!io_post_aux_cqe(ctx, req->cqe.user_data,
-					     mask, IORING_CQE_F_MORE, false)) {
+			if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data,
+					mask, IORING_CQE_F_MORE, false)) {
 				io_req_set_res(req, mask, 0);
 				return IOU_POLL_REMOVE_POLL_USE_RES;
 			}
@@ -256,11 +301,15 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
 			return ret;
 		}
 
+		/* force the next iteration to vfs_poll() */
+		req->cqe.res = 0;
+
 		/*
 		 * Release all references, retry if someone tried to restart
 		 * task_work while we were executing it.
 		 */
-	} while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs));
+	} while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs) &
+					IO_POLL_REF_MASK);
 
 	return IOU_POLL_NO_ACTION;
 }
@@ -272,54 +321,38 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
 	ret = io_poll_check_events(req, locked);
 	if (ret == IOU_POLL_NO_ACTION)
 		return;
-
-	if (ret == IOU_POLL_DONE) {
-		struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll);
-		req->cqe.res = mangle_poll(req->cqe.res & poll->events);
-	} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
-		req->cqe.res = ret;
-		req_set_fail(req);
-	}
-
 	io_poll_remove_entries(req);
 	io_poll_tw_hash_eject(req, locked);
 
-	io_req_set_res(req, req->cqe.res, 0);
-	io_req_task_complete(req, locked);
-}
-
-static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
-{
-	int ret;
-
-	ret = io_poll_check_events(req, locked);
-	if (ret == IOU_POLL_NO_ACTION)
-		return;
+	if (req->opcode == IORING_OP_POLL_ADD) {
+		if (ret == IOU_POLL_DONE) {
+			struct io_poll *poll;
 
-	io_poll_remove_entries(req);
-	io_poll_tw_hash_eject(req, locked);
+			poll = io_kiocb_to_cmd(req, struct io_poll);
+			req->cqe.res = mangle_poll(req->cqe.res & poll->events);
+		} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
+			req->cqe.res = ret;
+			req_set_fail(req);
+		}
 
-	if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
-		io_req_complete_post(req);
-	else if (ret == IOU_POLL_DONE)
-		io_req_task_submit(req, locked);
-	else
-		io_req_complete_failed(req, ret);
+		io_req_set_res(req, req->cqe.res, 0);
+		io_req_task_complete(req, locked);
+	} else {
+		io_tw_lock(req->ctx, locked);
+
+		if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
+			io_req_task_complete(req, locked);
+		else if (ret == IOU_POLL_DONE)
+			io_req_task_submit(req, locked);
+		else
+			io_req_defer_failed(req, ret);
+	}
 }
 
 static void __io_poll_execute(struct io_kiocb *req, int mask)
 {
 	io_req_set_res(req, mask, 0);
-	/*
-	 * This is useful for poll that is armed on behalf of another
-	 * request, and where the wakeup path could be on a different
-	 * CPU. We want to avoid pulling in req->apoll->events for that
-	 * case.
-	 */
-	if (req->opcode == IORING_OP_POLL_ADD)
-		req->io_task_work.func = io_poll_task_func;
-	else
-		req->io_task_work.func = io_apoll_task_func;
+	req->io_task_work.func = io_poll_task_func;
 
 	trace_io_uring_task_add(req, mask);
 	io_req_task_work_add(req);
@@ -380,6 +413,14 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 		return 0;
 
 	if (io_poll_get_ownership(req)) {
+		/*
+		 * If we trigger a multishot poll off our own wakeup path,
+		 * disable multishot as there is a circular dependency between
+		 * CQ posting and triggering the event.
+		 */
+		if (mask & EPOLL_URING_WAKE)
+			poll->events |= EPOLLONESHOT;
+
 		/* optional, saves extra locking for removal in tw handler */
 		if (mask && poll->events & EPOLLONESHOT) {
 			list_del_init(&poll->wait.entry);
@@ -394,7 +435,8 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 	return 1;
 }
 
-static void io_poll_double_prepare(struct io_kiocb *req)
+/* fails only when polling is already completing by the first entry */
+static bool io_poll_double_prepare(struct io_kiocb *req)
 {
 	struct wait_queue_head *head;
 	struct io_poll *poll = io_poll_get_single(req);
@@ -403,20 +445,20 @@ static void io_poll_double_prepare(struct io_kiocb *req)
 	rcu_read_lock();
 	head = smp_load_acquire(&poll->head);
 	/*
-	 * poll arm may not hold ownership and so race with
-	 * io_poll_wake() by modifying req->flags. There is only one
-	 * poll entry queued, serialise with it by taking its head lock.
+	 * poll arm might not hold ownership and so race for req->flags with
+	 * io_poll_wake(). There is only one poll entry queued, serialise with
+	 * it by taking its head lock. As we're still arming the tw hanlder
+	 * is not going to be run, so there are no races with it.
 	 */
-	if (head)
+	if (head) {
 		spin_lock_irq(&head->lock);
-
-	req->flags |= REQ_F_DOUBLE_POLL;
-	if (req->opcode == IORING_OP_POLL_ADD)
-		req->flags |= REQ_F_ASYNC_DATA;
-
-	if (head)
+		req->flags |= REQ_F_DOUBLE_POLL;
+		if (req->opcode == IORING_OP_POLL_ADD)
+			req->flags |= REQ_F_ASYNC_DATA;
 		spin_unlock_irq(&head->lock);
+	}
 	rcu_read_unlock();
+	return !!head;
 }
 
 static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
@@ -454,7 +496,11 @@ static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
 		/* mark as double wq entry */
 		wqe_private |= IO_WQE_F_DOUBLE;
 		io_init_poll_iocb(poll, first->events, first->wait.func);
-		io_poll_double_prepare(req);
+		if (!io_poll_double_prepare(req)) {
+			/* the request is completing, just back off */
+			kfree(poll);
+			return;
+		}
 		*poll_ptr = poll;
 	} else {
 		/* fine to modify, there is no poll queued to race with us */
@@ -499,7 +545,6 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
 				 unsigned issue_flags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
-	int v;
 
 	INIT_HLIST_NODE(&req->hash_node);
 	req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
@@ -567,11 +612,10 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
 
 	if (ipt->owning) {
 		/*
-		 * Release ownership. If someone tried to queue a tw while it was
-		 * locked, kick it off for them.
+		 * Try to release ownership. If we see a change of state, e.g.
+		 * poll was waken up, queue up a tw, it'll deal with it.
 		 */
-		v = atomic_dec_return(&req->poll_refs);
-		if (unlikely(v & IO_POLL_REF_MASK))
+		if (atomic_cmpxchg(&req->poll_refs, 1, 0) != 1)
 			__io_poll_execute(req, 0);
 	}
 	return 0;
@@ -596,10 +640,13 @@ static struct async_poll *io_req_alloc_apoll(struct io_kiocb *req,
 	if (req->flags & REQ_F_POLLED) {
 		apoll = req->apoll;
 		kfree(apoll->double_poll);
-	} else if (!(issue_flags & IO_URING_F_UNLOCKED) &&
-		   (entry = io_alloc_cache_get(&ctx->apoll_cache)) != NULL) {
+	} else if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+		entry = io_alloc_cache_get(&ctx->apoll_cache);
+		if (entry == NULL)
+			goto alloc_apoll;
 		apoll = container_of(entry, struct async_poll, cache);
 	} else {
+alloc_apoll:
 		apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
 		if (unlikely(!apoll))
 			return NULL;
diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c
index 55d4ab96fb92..18de10c68a15 100644
--- a/io_uring/rsrc.c
+++ b/io_uring/rsrc.c
@@ -170,10 +170,10 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 		if (prsrc->tag) {
 			if (ctx->flags & IORING_SETUP_IOPOLL) {
 				mutex_lock(&ctx->uring_lock);
-				io_post_aux_cqe(ctx, prsrc->tag, 0, 0, true);
+				io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
 				mutex_unlock(&ctx->uring_lock);
 			} else {
-				io_post_aux_cqe(ctx, prsrc->tag, 0, 0, true);
+				io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
 			}
 		}
 
@@ -204,6 +204,14 @@ void io_rsrc_put_work(struct work_struct *work)
 	}
 }
 
+void io_rsrc_put_tw(struct callback_head *cb)
+{
+	struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
+					       rsrc_put_tw);
+
+	io_rsrc_put_work(&ctx->rsrc_put_work.work);
+}
+
 void io_wait_rsrc_data(struct io_rsrc_data *data)
 {
 	if (data && !atomic_dec_and_test(&data->refs))
@@ -242,8 +250,15 @@ static __cold void io_rsrc_node_ref_zero(struct percpu_ref *ref)
 	}
 	spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags);
 
-	if (first_add)
-		mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
+	if (!first_add)
+		return;
+
+	if (ctx->submitter_task) {
+		if (!task_work_add(ctx->submitter_task, &ctx->rsrc_put_tw,
+				   ctx->notify_method))
+			return;
+	}
+	mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
 }
 
 static struct io_rsrc_node *io_rsrc_node_alloc(void)
@@ -309,41 +324,41 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 	/* As we may drop ->uring_lock, other task may have started quiesce */
 	if (data->quiesce)
 		return -ENXIO;
+	ret = io_rsrc_node_switch_start(ctx);
+	if (ret)
+		return ret;
+	io_rsrc_node_switch(ctx, data);
+
+	/* kill initial ref, already quiesced if zero */
+	if (atomic_dec_and_test(&data->refs))
+		return 0;
 
 	data->quiesce = true;
+	mutex_unlock(&ctx->uring_lock);
 	do {
-		ret = io_rsrc_node_switch_start(ctx);
-		if (ret)
+		ret = io_run_task_work_sig(ctx);
+		if (ret < 0) {
+			atomic_inc(&data->refs);
+			/* wait for all works potentially completing data->done */
+			flush_delayed_work(&ctx->rsrc_put_work);
+			reinit_completion(&data->done);
+			mutex_lock(&ctx->uring_lock);
 			break;
-		io_rsrc_node_switch(ctx, data);
+		}
 
-		/* kill initial ref, already quiesced if zero */
-		if (atomic_dec_and_test(&data->refs))
-			break;
-		mutex_unlock(&ctx->uring_lock);
 		flush_delayed_work(&ctx->rsrc_put_work);
 		ret = wait_for_completion_interruptible(&data->done);
 		if (!ret) {
 			mutex_lock(&ctx->uring_lock);
-			if (atomic_read(&data->refs) > 0) {
-				/*
-				 * it has been revived by another thread while
-				 * we were unlocked
-				 */
-				mutex_unlock(&ctx->uring_lock);
-			} else {
+			if (atomic_read(&data->refs) <= 0)
 				break;
-			}
+			/*
+			 * it has been revived by another thread while
+			 * we were unlocked
+			 */
+			mutex_unlock(&ctx->uring_lock);
 		}
-
-		atomic_inc(&data->refs);
-		/* wait for all works potentially completing data->done */
-		flush_delayed_work(&ctx->rsrc_put_work);
-		reinit_completion(&data->done);
-
-		ret = io_run_task_work_sig(ctx);
-		mutex_lock(&ctx->uring_lock);
-	} while (ret >= 0);
+	} while (1);
 
 	data->quiesce = false;
 	return ret;
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index 81445a477622..2b8743645efc 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -53,6 +53,7 @@ struct io_mapped_ubuf {
 	struct bio_vec	bvec[];
 };
 
+void io_rsrc_put_tw(struct callback_head *cb);
 void io_rsrc_put_work(struct work_struct *work);
 void io_rsrc_refs_refill(struct io_ring_ctx *ctx);
 void io_wait_rsrc_data(struct io_rsrc_data *data);
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 5c91cc80b348..b9cac5706e8d 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -286,6 +286,12 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
 static void io_req_rw_complete(struct io_kiocb *req, bool *locked)
 {
 	io_req_io_end(req);
+
+	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
+		unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
+
+		req->cqe.flags |= io_put_kbuf(req, issue_flags);
+	}
 	io_req_task_complete(req, locked);
 }
 
@@ -548,12 +554,12 @@ static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
 
 int io_readv_prep_async(struct io_kiocb *req)
 {
-	return io_rw_prep_async(req, READ);
+	return io_rw_prep_async(req, ITER_DEST);
 }
 
 int io_writev_prep_async(struct io_kiocb *req)
 {
-	return io_rw_prep_async(req, WRITE);
+	return io_rw_prep_async(req, ITER_SOURCE);
 }
 
 /*
@@ -705,7 +711,7 @@ int io_read(struct io_kiocb *req, unsigned int issue_flags)
 	loff_t *ppos;
 
 	if (!req_has_async_data(req)) {
-		ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
+		ret = io_import_iovec(ITER_DEST, req, &iovec, s, issue_flags);
 		if (unlikely(ret < 0))
 			return ret;
 	} else {
@@ -717,7 +723,7 @@ int io_read(struct io_kiocb *req, unsigned int issue_flags)
 		 * buffers, as we dropped the selected one before retry.
 		 */
 		if (io_do_buffer_select(req)) {
-			ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
+			ret = io_import_iovec(ITER_DEST, req, &iovec, s, issue_flags);
 			if (unlikely(ret < 0))
 				return ret;
 		}
@@ -852,7 +858,7 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
 	loff_t *ppos;
 
 	if (!req_has_async_data(req)) {
-		ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
+		ret = io_import_iovec(ITER_SOURCE, req, &iovec, s, issue_flags);
 		if (unlikely(ret < 0))
 			return ret;
 	} else {
diff --git a/io_uring/timeout.c b/io_uring/timeout.c
index e8a8c2099480..5b4bc93fd6e0 100644
--- a/io_uring/timeout.c
+++ b/io_uring/timeout.c
@@ -63,7 +63,7 @@ static bool io_kill_timeout(struct io_kiocb *req, int status)
 		atomic_set(&req->ctx->cq_timeouts,
 			atomic_read(&req->ctx->cq_timeouts) + 1);
 		list_del_init(&timeout->list);
-		io_req_tw_post_queue(req, status, 0);
+		io_req_queue_tw_complete(req, status);
 		return true;
 	}
 	return false;
@@ -159,7 +159,7 @@ void io_disarm_next(struct io_kiocb *req)
 		req->flags &= ~REQ_F_ARM_LTIMEOUT;
 		if (link && link->opcode == IORING_OP_LINK_TIMEOUT) {
 			io_remove_next_linked(req);
-			io_req_tw_post_queue(link, -ECANCELED, 0);
+			io_req_queue_tw_complete(link, -ECANCELED);
 		}
 	} else if (req->flags & REQ_F_LINK_TIMEOUT) {
 		struct io_ring_ctx *ctx = req->ctx;
@@ -168,7 +168,7 @@ void io_disarm_next(struct io_kiocb *req)
 		link = io_disarm_linked_timeout(req);
 		spin_unlock_irq(&ctx->timeout_lock);
 		if (link)
-			io_req_tw_post_queue(link, -ECANCELED, 0);
+			io_req_queue_tw_complete(link, -ECANCELED);
 	}
 	if (unlikely((req->flags & REQ_F_FAIL) &&
 		     !(req->flags & REQ_F_HARDLINK)))
@@ -282,11 +282,11 @@ static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
 			ret = io_try_cancel(req->task->io_uring, &cd, issue_flags);
 		}
 		io_req_set_res(req, ret ?: -ETIME, 0);
-		io_req_complete_post(req);
+		io_req_task_complete(req, locked);
 		io_put_req(prev);
 	} else {
 		io_req_set_res(req, -ETIME, 0);
-		io_req_complete_post(req);
+		io_req_task_complete(req, locked);
 	}
 }
 
diff --git a/io_uring/uring_cmd.c b/io_uring/uring_cmd.c
index e50de0b6b9f8..446a189b78b0 100644
--- a/io_uring/uring_cmd.c
+++ b/io_uring/uring_cmd.c
@@ -56,7 +56,7 @@ void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2)
 		/* order with io_iopoll_req_issued() checking ->iopoll_complete */
 		smp_store_release(&req->iopoll_completed, 1);
 	else
-		__io_req_complete(req, 0);
+		io_req_complete_post(req, 0);
 }
 EXPORT_SYMBOL_GPL(io_uring_cmd_done);
 
diff --git a/io_uring/xattr.c b/io_uring/xattr.c
index 99df641594d7..6201a9f442c6 100644
--- a/io_uring/xattr.c
+++ b/io_uring/xattr.c
@@ -112,7 +112,7 @@ int io_fgetxattr(struct io_kiocb *req, unsigned int issue_flags)
 	if (issue_flags & IO_URING_F_NONBLOCK)
 		return -EAGAIN;
 
-	ret = do_getxattr(mnt_user_ns(req->file->f_path.mnt),
+	ret = do_getxattr(mnt_idmap(req->file->f_path.mnt),
 			req->file->f_path.dentry,
 			&ix->ctx);
 
@@ -133,9 +133,7 @@ int io_getxattr(struct io_kiocb *req, unsigned int issue_flags)
 retry:
 	ret = filename_lookup(AT_FDCWD, ix->filename, lookup_flags, &path, NULL);
 	if (!ret) {
-		ret = do_getxattr(mnt_user_ns(path.mnt),
-				path.dentry,
-				&ix->ctx);
+		ret = do_getxattr(mnt_idmap(path.mnt), path.dentry, &ix->ctx);
 
 		path_put(&path);
 		if (retry_estale(ret, lookup_flags)) {
@@ -213,7 +211,7 @@ static int __io_setxattr(struct io_kiocb *req, unsigned int issue_flags,
 
 	ret = mnt_want_write(path->mnt);
 	if (!ret) {
-		ret = do_setxattr(mnt_user_ns(path->mnt), path->dentry, &ix->ctx);
+		ret = do_setxattr(mnt_idmap(path->mnt), path->dentry, &ix->ctx);
 		mnt_drop_write(path->mnt);
 	}
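
The poll.c hunks above all build on one idea, spelled out in the patch's own comments: ->poll_refs is a single atomic word whose low bits (IO_POLL_REF_MASK) count references and whose high bits carry flags; whoever bumps the count from zero owns the request, and once the count reaches IO_POLL_REF_BIAS further wakeups only set IO_POLL_RETRY_FLAG instead of incrementing, so the counter stays far from overflow. The owner then drops every reference it observed in one subtraction and loops again if anything arrived meanwhile. Below is a minimal userspace sketch of that scheme using C11 atomics; it is illustrative only, all demo_* names are invented for this sketch, and it deliberately omits kernel details such as task_work scheduling and cancellation handling.

/* Userspace model (assumed names, not kernel code) of the poll_refs scheme. */
#include <stdatomic.h>
#include <stdbool.h>

#define DEMO_CANCEL_FLAG (1u << 31)        /* shown only for the bit layout */
#define DEMO_RETRY_FLAG  (1u << 30)
#define DEMO_REF_MASK    ((1u << 30) - 1)  /* analogous to GENMASK(29, 0) */
#define DEMO_REF_BIAS    128u              /* take the slowpath well before overflow */

struct demo_poll_req {
	_Atomic unsigned int poll_refs;
};

/* Refs look suspiciously high: record that something changed (retry flag)
 * instead of piling up more references; only claim ownership if nobody holds a ref. */
static bool demo_get_ownership_slowpath(struct demo_poll_req *req)
{
	unsigned int v = atomic_fetch_or(&req->poll_refs, DEMO_RETRY_FLAG);

	if (v & DEMO_REF_MASK)
		return false;
	return !(atomic_fetch_add(&req->poll_refs, 1) & DEMO_REF_MASK);
}

/* Returns true if the caller became the exclusive owner of the request. */
static bool demo_get_ownership(struct demo_poll_req *req)
{
	if (atomic_load(&req->poll_refs) >= DEMO_REF_BIAS)
		return demo_get_ownership_slowpath(req);
	return !(atomic_fetch_add(&req->poll_refs, 1) & DEMO_REF_MASK);
}

/* The owner drops all references it observed ('seen'); if new wakeups arrived
 * in the meantime the masked count stays non-zero and the owner must loop again. */
static bool demo_put_ownership(struct demo_poll_req *req, unsigned int seen)
{
	unsigned int prev = atomic_fetch_sub(&req->poll_refs, seen & DEMO_REF_MASK);
	unsigned int left = prev - (seen & DEMO_REF_MASK);

	return (left & DEMO_REF_MASK) != 0;	/* true: rerun the event check */
}

The bias matters because the reference field only has 30 usable bits: switching to a flag long before the counter can wrap keeps a large safety margin, at the price of the owner having to clear the flag and redo vfs_poll() when a retry was signalled, which is exactly what the io_poll_check_events() loop above does.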