diff options
-rw-r--r-- | fs/io-wq.c | 85 | ||||
-rw-r--r-- | fs/io-wq.h | 14 | ||||
-rw-r--r-- | fs/io_uring.c | 19 |
3 files changed, 107 insertions, 11 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index a53df2b3762a..d28ad66b7f16 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -87,7 +87,6 @@ struct io_wqe { struct { raw_spinlock_t lock; struct io_wq_work_list work_list; - unsigned long hash_map; unsigned flags; } ____cacheline_aligned_in_smp; @@ -97,6 +96,8 @@ struct io_wqe { struct hlist_nulls_head free_list; struct list_head all_list; + struct wait_queue_entry wait; + struct io_wq *wq; struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; }; @@ -113,6 +114,9 @@ struct io_wq { struct task_struct *manager; struct user_struct *user; + + struct io_wq_hash *hash; + refcount_t refs; struct completion done; @@ -328,14 +332,31 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work) return work->flags >> IO_WQ_HASH_SHIFT; } +static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) +{ + struct io_wq *wq = wqe->wq; + + spin_lock(&wq->hash->wait.lock); + if (list_empty(&wqe->wait.entry)) { + __add_wait_queue(&wq->hash->wait, &wqe->wait); + if (!test_bit(hash, &wq->hash->map)) { + __set_current_state(TASK_RUNNING); + list_del_init(&wqe->wait.entry); + } + } + spin_unlock(&wq->hash->wait.lock); +} + static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; - unsigned int hash; + unsigned int stall_hash = -1U; wq_list_for_each(node, prev, &wqe->work_list) { + unsigned int hash; + work = container_of(node, struct io_wq_work, list); /* not hashed, can run anytime */ @@ -344,16 +365,26 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) return work; } - /* hashed, can run if not already running */ hash = io_get_work_hash(work); - if (!(wqe->hash_map & BIT(hash))) { - wqe->hash_map |= BIT(hash); - /* all items with this hash lie in [work, tail] */ - tail = wqe->hash_tail[hash]; + /* all items with this hash lie in [work, tail] */ + tail = wqe->hash_tail[hash]; + + /* hashed, can run if not already running */ + if (!test_and_set_bit(hash, &wqe->wq->hash->map)) { wqe->hash_tail[hash] = NULL; wq_list_cut(&wqe->work_list, &tail->list, prev); return work; } + if (stall_hash == -1U) + stall_hash = hash; + /* fast forward to a next hash, for-each will fix up @prev */ + node = &tail->list; + } + + if (stall_hash != -1U) { + raw_spin_unlock(&wqe->lock); + io_wait_on_hash(wqe, stall_hash); + raw_spin_lock(&wqe->lock); } return NULL; @@ -421,6 +452,7 @@ get_next: if (!work) break; io_assign_current_work(worker, work); + __set_current_state(TASK_RUNNING); /* handle a whole dependent link */ do { @@ -444,8 +476,10 @@ get_next: io_wqe_enqueue(wqe, linked); if (hash != -1U && !next_hashed) { + clear_bit(hash, &wq->hash->map); + if (wq_has_sleeper(&wq->hash->wait)) + wake_up(&wq->hash->wait); raw_spin_lock_irq(&wqe->lock); - wqe->hash_map &= ~BIT_ULL(hash); wqe->flags &= ~IO_WQE_FLAG_STALLED; /* skip unnecessary unlock-lock wqe->lock */ if (!work) @@ -471,7 +505,6 @@ static int io_wqe_worker(void *data) loop: raw_spin_lock_irq(&wqe->lock); if (io_wqe_run_queue(wqe)) { - __set_current_state(TASK_RUNNING); io_worker_handle_work(worker); goto loop; } @@ -928,6 +961,24 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, return IO_WQ_CANCEL_NOTFOUND; } +static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode, + int sync, void *key) +{ + struct io_wqe *wqe = container_of(wait, struct io_wqe, wait); + int ret; + + list_del_init(&wait->entry); + + rcu_read_lock(); + ret = io_wqe_activate_free_worker(wqe); + rcu_read_unlock(); + + if (!ret) + wake_up_process(wqe->wq->manager); + + return 1; +} + struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) { int ret = -ENOMEM, node; @@ -948,6 +999,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) if (ret) goto err_wqes; + refcount_inc(&data->hash->refs); + wq->hash = data->hash; wq->free_work = data->free_work; wq->do_work = data->do_work; @@ -968,6 +1021,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = task_rlimit(current, RLIMIT_NPROC); atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0); + wqe->wait.func = io_wqe_hash_wake; + INIT_LIST_HEAD(&wqe->wait.entry); wqe->wq = wq; raw_spin_lock_init(&wqe->lock); INIT_WQ_LIST(&wqe->work_list); @@ -989,6 +1044,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) if (refcount_dec_and_test(&wq->refs)) complete(&wq->done); + io_wq_put_hash(data->hash); err: cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); for_each_node(node) @@ -1017,8 +1073,15 @@ void io_wq_destroy(struct io_wq *wq) wait_for_completion(&wq->done); - for_each_node(node) - kfree(wq->wqes[node]); + spin_lock_irq(&wq->hash->wait.lock); + for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; + + list_del_init(&wqe->wait.entry); + kfree(wqe); + } + spin_unlock_irq(&wq->hash->wait.lock); + io_wq_put_hash(wq->hash); kfree(wq->wqes); kfree(wq); } diff --git a/fs/io-wq.h b/fs/io-wq.h index 86825673be08..3677b39db015 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -1,6 +1,7 @@ #ifndef INTERNAL_IO_WQ_H #define INTERNAL_IO_WQ_H +#include <linux/refcount.h> #include <linux/io_uring.h> struct io_wq; @@ -93,7 +94,20 @@ static inline struct io_wq_work *wq_next_work(struct io_wq_work *work) typedef struct io_wq_work *(free_work_fn)(struct io_wq_work *); typedef void (io_wq_work_fn)(struct io_wq_work *); +struct io_wq_hash { + refcount_t refs; + unsigned long map; + struct wait_queue_head wait; +}; + +static inline void io_wq_put_hash(struct io_wq_hash *hash) +{ + if (refcount_dec_and_test(&hash->refs)) + kfree(hash); +} + struct io_wq_data { + struct io_wq_hash *hash; io_wq_work_fn *do_work; free_work_fn *free_work; }; diff --git a/fs/io_uring.c b/fs/io_uring.c index 0a435a6f265a..fbc85afa9a87 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -360,6 +360,9 @@ struct io_ring_ctx { unsigned cached_cq_overflow; unsigned long sq_check_overflow; + /* hashed buffered write serialization */ + struct io_wq_hash *hash_map; + struct list_head defer_list; struct list_head timeout_list; struct list_head cq_overflow_list; @@ -454,6 +457,8 @@ struct io_ring_ctx { /* exit task_work */ struct callback_head *exit_task_work; + struct wait_queue_head hash_wait; + /* Keep this last, we don't need it for the fast path */ struct work_struct exit_work; }; @@ -7763,9 +7768,21 @@ static struct io_wq_work *io_free_work(struct io_wq_work *work) static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx) { + struct io_wq_hash *hash; struct io_wq_data data; unsigned int concurrency; + hash = ctx->hash_map; + if (!hash) { + hash = kzalloc(sizeof(*hash), GFP_KERNEL); + if (!hash) + return ERR_PTR(-ENOMEM); + refcount_set(&hash->refs, 1); + init_waitqueue_head(&hash->wait); + ctx->hash_map = hash; + } + + data.hash = hash; data.free_work = io_free_work; data.do_work = io_wq_submit_work; @@ -8405,6 +8422,8 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) percpu_ref_exit(&ctx->refs); free_uid(ctx->user); io_req_caches_free(ctx, NULL); + if (ctx->hash_map) + io_wq_put_hash(ctx->hash_map); kfree(ctx->cancel_hash); kfree(ctx); } |