diff options
Diffstat (limited to 'kernel/rcu/srcu.c')
-rw-r--r-- | kernel/rcu/srcu.c | 347 |
1 files changed, 196 insertions, 151 deletions
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c index 3cfcc59bddf3..584d8a983883 100644 --- a/kernel/rcu/srcu.c +++ b/kernel/rcu/srcu.c @@ -36,16 +36,75 @@ #include <linux/delay.h> #include <linux/srcu.h> -#include <linux/rcu_node_tree.h> #include "rcu.h" +/* + * Initialize an rcu_batch structure to empty. + */ +static inline void rcu_batch_init(struct rcu_batch *b) +{ + b->head = NULL; + b->tail = &b->head; +} + +/* + * Enqueue a callback onto the tail of the specified rcu_batch structure. + */ +static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head) +{ + *b->tail = head; + b->tail = &head->next; +} + +/* + * Is the specified rcu_batch structure empty? + */ +static inline bool rcu_batch_empty(struct rcu_batch *b) +{ + return b->tail == &b->head; +} + +/* + * Remove the callback at the head of the specified rcu_batch structure + * and return a pointer to it, or return NULL if the structure is empty. + */ +static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b) +{ + struct rcu_head *head; + + if (rcu_batch_empty(b)) + return NULL; + + head = b->head; + b->head = head->next; + if (b->tail == &head->next) + rcu_batch_init(b); + + return head; +} + +/* + * Move all callbacks from the rcu_batch structure specified by "from" to + * the structure specified by "to". + */ +static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from) +{ + if (!rcu_batch_empty(from)) { + *to->tail = from->head; + to->tail = from->tail; + rcu_batch_init(from); + } +} + static int init_srcu_struct_fields(struct srcu_struct *sp) { sp->completed = 0; - sp->srcu_gp_seq = 0; - atomic_set(&sp->srcu_exp_cnt, 0); spin_lock_init(&sp->queue_lock); - rcu_segcblist_init(&sp->srcu_cblist); + sp->running = false; + rcu_batch_init(&sp->batch_queue); + rcu_batch_init(&sp->batch_check0); + rcu_batch_init(&sp->batch_check1); + rcu_batch_init(&sp->batch_done); INIT_DELAYED_WORK(&sp->work, process_srcu); sp->per_cpu_ref = alloc_percpu(struct srcu_array); return sp->per_cpu_ref ? 0 : -ENOMEM; @@ -180,8 +239,6 @@ static bool srcu_readers_active(struct srcu_struct *sp) return sum; } -#define SRCU_INTERVAL 1 - /** * cleanup_srcu_struct - deconstruct a sleep-RCU structure * @sp: structure to clean up. @@ -197,16 +254,8 @@ static bool srcu_readers_active(struct srcu_struct *sp) */ void cleanup_srcu_struct(struct srcu_struct *sp) { - WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt)); if (WARN_ON(srcu_readers_active(sp))) return; /* Leakage unless caller handles error. */ - if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist))) - return; /* Leakage unless caller handles error. */ - flush_delayed_work(&sp->work); - if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) { - pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq))); - return; /* Caller forgot to stop doing call_srcu()? */ - } free_percpu(sp->per_cpu_ref); sp->per_cpu_ref = NULL; } @@ -245,36 +294,26 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * We use an adaptive strategy for synchronize_srcu() and especially for * synchronize_srcu_expedited(). We spin for a fixed time period * (defined below) to allow SRCU readers to exit their read-side critical - * sections. If there are still some readers after a few microseconds, - * we repeatedly block for 1-millisecond time periods. + * sections. If there are still some readers after 10 microseconds, + * we repeatedly block for 1-millisecond time periods. This approach + * has done well in testing, so there is no need for a config parameter. */ #define SRCU_RETRY_CHECK_DELAY 5 +#define SYNCHRONIZE_SRCU_TRYCOUNT 2 +#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12 /* - * Start an SRCU grace period. - */ -static void srcu_gp_start(struct srcu_struct *sp) -{ - int state; - - rcu_segcblist_accelerate(&sp->srcu_cblist, - rcu_seq_snap(&sp->srcu_gp_seq)); - rcu_seq_start(&sp->srcu_gp_seq); - state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); - WARN_ON_ONCE(state != SRCU_STATE_SCAN1); -} - -/* - * Wait until all readers counted by array index idx complete, but - * loop an additional time if there is an expedited grace period pending. - * The caller must ensure that ->completed is not changed while checking. + * @@@ Wait until all pre-existing readers complete. Such readers + * will have used the index specified by "idx". + * the caller should ensures the ->completed is not changed while checking + * and idx = (->completed & 1) ^ 1 */ static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) { for (;;) { if (srcu_readers_active_idx_check(sp, idx)) return true; - if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0) + if (--trycount <= 0) return false; udelay(SRCU_RETRY_CHECK_DELAY); } @@ -300,19 +339,6 @@ static void srcu_flip(struct srcu_struct *sp) } /* - * End an SRCU grace period. - */ -static void srcu_gp_end(struct srcu_struct *sp) -{ - rcu_seq_end(&sp->srcu_gp_seq); - - spin_lock_irq(&sp->queue_lock); - rcu_segcblist_advance(&sp->srcu_cblist, - rcu_seq_current(&sp->srcu_gp_seq)); - spin_unlock_irq(&sp->queue_lock); -} - -/* * Enqueue an SRCU callback on the specified srcu_struct structure, * initiating grace-period processing if it is not already running. * @@ -348,24 +374,26 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head, head->func = func; spin_lock_irqsave(&sp->queue_lock, flags); smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ - rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); - if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { - srcu_gp_start(sp); + rcu_batch_queue(&sp->batch_queue, head); + if (!sp->running) { + sp->running = true; queue_delayed_work(system_power_efficient_wq, &sp->work, 0); } spin_unlock_irqrestore(&sp->queue_lock, flags); } EXPORT_SYMBOL_GPL(call_srcu); -static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay); +static void srcu_advance_batches(struct srcu_struct *sp, int trycount); +static void srcu_reschedule(struct srcu_struct *sp); /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp) +static void __synchronize_srcu(struct srcu_struct *sp, int trycount) { struct rcu_synchronize rcu; struct rcu_head *head = &rcu.head; + bool done = false; RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) || lock_is_held(&rcu_bh_lock_map) || @@ -373,8 +401,6 @@ static void __synchronize_srcu(struct srcu_struct *sp) lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section"); - if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE) - return; might_sleep(); init_completion(&rcu.completion); @@ -382,47 +408,31 @@ static void __synchronize_srcu(struct srcu_struct *sp) head->func = wakeme_after_rcu; spin_lock_irq(&sp->queue_lock); smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ - if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { + if (!sp->running) { /* steal the processing owner */ - rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); - srcu_gp_start(sp); + sp->running = true; + rcu_batch_queue(&sp->batch_check0, head); spin_unlock_irq(&sp->queue_lock); + + srcu_advance_batches(sp, trycount); + if (!rcu_batch_empty(&sp->batch_done)) { + BUG_ON(sp->batch_done.head != head); + rcu_batch_dequeue(&sp->batch_done); + done = true; + } /* give the processing owner to work_struct */ - srcu_reschedule(sp, 0); + srcu_reschedule(sp); } else { - rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); + rcu_batch_queue(&sp->batch_queue, head); spin_unlock_irq(&sp->queue_lock); } - wait_for_completion(&rcu.completion); - smp_mb(); /* Caller's later accesses after GP. */ -} - -/** - * synchronize_srcu_expedited - Brute-force SRCU grace period - * @sp: srcu_struct with which to synchronize. - * - * Wait for an SRCU grace period to elapse, but be more aggressive about - * spinning rather than blocking when waiting. - * - * Note that synchronize_srcu_expedited() has the same deadlock and - * memory-ordering properties as does synchronize_srcu(). - */ -void synchronize_srcu_expedited(struct srcu_struct *sp) -{ - bool do_norm = rcu_gp_is_normal(); - - if (!do_norm) { - atomic_inc(&sp->srcu_exp_cnt); - smp_mb__after_atomic(); /* increment before GP. */ - } - __synchronize_srcu(sp); - if (!do_norm) { - smp_mb__before_atomic(); /* GP before decrement. */ - atomic_dec(&sp->srcu_exp_cnt); + if (!done) { + wait_for_completion(&rcu.completion); + smp_mb(); /* Caller's later accesses after GP. */ } + } -EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); /** * synchronize_srcu - wait for prior SRCU read-side critical-section completion @@ -465,14 +475,29 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); */ void synchronize_srcu(struct srcu_struct *sp) { - if (rcu_gp_is_expedited()) - synchronize_srcu_expedited(sp); - else - __synchronize_srcu(sp); + __synchronize_srcu(sp, (rcu_gp_is_expedited() && !rcu_gp_is_normal()) + ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT + : SYNCHRONIZE_SRCU_TRYCOUNT); } EXPORT_SYMBOL_GPL(synchronize_srcu); /** + * synchronize_srcu_expedited - Brute-force SRCU grace period + * @sp: srcu_struct with which to synchronize. + * + * Wait for an SRCU grace period to elapse, but be more aggressive about + * spinning rather than blocking when waiting. + * + * Note that synchronize_srcu_expedited() has the same deadlock and + * memory-ordering properties as does synchronize_srcu(). + */ +void synchronize_srcu_expedited(struct srcu_struct *sp) +{ + __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT); +} +EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); + +/** * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete. * @sp: srcu_struct on which to wait for in-flight callbacks. */ @@ -495,13 +520,29 @@ unsigned long srcu_batches_completed(struct srcu_struct *sp) } EXPORT_SYMBOL_GPL(srcu_batches_completed); +#define SRCU_CALLBACK_BATCH 10 +#define SRCU_INTERVAL 1 + +/* + * Move any new SRCU callbacks to the first stage of the SRCU grace + * period pipeline. + */ +static void srcu_collect_new(struct srcu_struct *sp) +{ + if (!rcu_batch_empty(&sp->batch_queue)) { + spin_lock_irq(&sp->queue_lock); + rcu_batch_move(&sp->batch_check0, &sp->batch_queue); + spin_unlock_irq(&sp->queue_lock); + } +} + /* * Core SRCU state machine. Advance callbacks from ->batch_check0 to * ->batch_check1 and then to ->batch_done as readers drain. */ -static void srcu_advance_batches(struct srcu_struct *sp) +static void srcu_advance_batches(struct srcu_struct *sp, int trycount) { - int idx; + int idx = 1 ^ (sp->completed & 1); /* * Because readers might be delayed for an extended period after @@ -509,44 +550,50 @@ static void srcu_advance_batches(struct srcu_struct *sp) * might well be readers using both idx=0 and idx=1. We therefore * need to wait for readers to clear from both index values before * invoking a callback. - * - * The load-acquire ensures that we see the accesses performed - * by the prior grace period. */ - idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */ - if (idx == SRCU_STATE_IDLE) { - spin_lock_irq(&sp->queue_lock); - if (rcu_segcblist_empty(&sp->srcu_cblist)) { - spin_unlock_irq(&sp->queue_lock); - return; - } - idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); - if (idx == SRCU_STATE_IDLE) - srcu_gp_start(sp); - spin_unlock_irq(&sp->queue_lock); - if (idx != SRCU_STATE_IDLE) - return; /* Someone else started the grace period. */ - } - if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) { - idx = 1 ^ (sp->completed & 1); - if (!try_check_zero(sp, idx, 1)) - return; /* readers present, retry later. */ - srcu_flip(sp); - rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2); - } + if (rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_check1)) + return; /* no callbacks need to be advanced */ - if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) { + if (!try_check_zero(sp, idx, trycount)) + return; /* failed to advance, will try after SRCU_INTERVAL */ - /* - * SRCU read-side critical sections are normally short, - * so check at least twice in quick succession after a flip. - */ - idx = 1 ^ (sp->completed & 1); - if (!try_check_zero(sp, idx, 2)) - return; /* readers present, retry after later. */ - srcu_gp_end(sp); - } + /* + * The callbacks in ->batch_check1 have already done with their + * first zero check and flip back when they were enqueued on + * ->batch_check0 in a previous invocation of srcu_advance_batches(). + * (Presumably try_check_zero() returned false during that + * invocation, leaving the callbacks stranded on ->batch_check1.) + * They are therefore ready to invoke, so move them to ->batch_done. + */ + rcu_batch_move(&sp->batch_done, &sp->batch_check1); + + if (rcu_batch_empty(&sp->batch_check0)) + return; /* no callbacks need to be advanced */ + srcu_flip(sp); + + /* + * The callbacks in ->batch_check0 just finished their + * first check zero and flip, so move them to ->batch_check1 + * for future checking on the other idx. + */ + rcu_batch_move(&sp->batch_check1, &sp->batch_check0); + + /* + * SRCU read-side critical sections are normally short, so check + * at least twice in quick succession after a flip. + */ + trycount = trycount < 2 ? 2 : trycount; + if (!try_check_zero(sp, idx^1, trycount)) + return; /* failed to advance, will try after SRCU_INTERVAL */ + + /* + * The callbacks in ->batch_check1 have now waited for all + * pre-existing readers using both idx values. They are therefore + * ready to invoke, so move them to ->batch_done. + */ + rcu_batch_move(&sp->batch_done, &sp->batch_check1); } /* @@ -557,48 +604,45 @@ static void srcu_advance_batches(struct srcu_struct *sp) */ static void srcu_invoke_callbacks(struct srcu_struct *sp) { - struct rcu_cblist ready_cbs; - struct rcu_head *rhp; + int i; + struct rcu_head *head; - spin_lock_irq(&sp->queue_lock); - if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) { - spin_unlock_irq(&sp->queue_lock); - return; - } - rcu_cblist_init(&ready_cbs); - rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs); - spin_unlock_irq(&sp->queue_lock); - rhp = rcu_cblist_dequeue(&ready_cbs); - for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) { + for (i = 0; i < SRCU_CALLBACK_BATCH; i++) { + head = rcu_batch_dequeue(&sp->batch_done); + if (!head) + break; local_bh_disable(); - rhp->func(rhp); + head->func(head); local_bh_enable(); } - spin_lock_irq(&sp->queue_lock); - rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs); - spin_unlock_irq(&sp->queue_lock); } /* * Finished one round of SRCU grace period. Start another if there are * more SRCU callbacks queued, otherwise put SRCU into not-running state. */ -static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) +static void srcu_reschedule(struct srcu_struct *sp) { bool pending = true; - int state; - if (rcu_segcblist_empty(&sp->srcu_cblist)) { + if (rcu_batch_empty(&sp->batch_done) && + rcu_batch_empty(&sp->batch_check1) && + rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_queue)) { spin_lock_irq(&sp->queue_lock); - state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); - if (rcu_segcblist_empty(&sp->srcu_cblist) && - state == SRCU_STATE_IDLE) + if (rcu_batch_empty(&sp->batch_done) && + rcu_batch_empty(&sp->batch_check1) && + rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_queue)) { + sp->running = false; pending = false; + } spin_unlock_irq(&sp->queue_lock); } if (pending) - queue_delayed_work(system_power_efficient_wq, &sp->work, delay); + queue_delayed_work(system_power_efficient_wq, + &sp->work, SRCU_INTERVAL); } /* @@ -610,8 +654,9 @@ void process_srcu(struct work_struct *work) sp = container_of(work, struct srcu_struct, work.work); - srcu_advance_batches(sp); + srcu_collect_new(sp); + srcu_advance_batches(sp, 1); srcu_invoke_callbacks(sp); - srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); + srcu_reschedule(sp); } EXPORT_SYMBOL_GPL(process_srcu); |