From 00bc00a9384c306cdd48611a53b955d936349bf6 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 4 May 2015 10:36:46 +0800 Subject: tipc: involve reference counter for subscriber At present subscriber's lock is used to protect the subscription list of subscriber as well as subscriptions linked into the list. While one or all subscriptions are deleted through iterating the list, the subscriber's lock must be held. Meanwhile, as deletion of subscription may happen in subscription timer's handler, the lock must be grabbed in the function as well. When subscription's timer is terminated with del_timer_sync() during above iteration, subscriber's lock has to be temporarily released, otherwise, deadlock may occur. However, the temporary release may cause the double free of a subscription as the subscription is not disconnected from the subscription list. Now if a reference counter is introduced to subscriber, subscription's timer can be asynchronously stopped with del_timer(). As a result, the issue is not only able to be fixed, but also relevant code is pretty readable and understandable. Signed-off-by: Ying Xue Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/subscr.c | 120 +++++++++++++++++++++++------------------------------- 1 file changed, 52 insertions(+), 68 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index d0dbde420540..9b24c9c7e05a 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -40,16 +40,21 @@ /** * struct tipc_subscriber - TIPC network topology subscriber + * @kref: reference counter to tipc_subscription object * @conid: connection identifier to server connecting to subscriber * @lock: control access to subscriber * @subscrp_list: list of subscription objects for this subscriber */ struct tipc_subscriber { + struct kref kref; int conid; spinlock_t lock; struct list_head subscrp_list; }; +static void tipc_subscrp_delete(struct tipc_subscription *sub); +static void tipc_subscrb_put(struct tipc_subscriber *subscriber); + /** * htohl - convert value to endianness used by destination * @in: value to convert @@ -116,42 +121,34 @@ static void tipc_subscrp_timeout(unsigned long data) { struct tipc_subscription *sub = (struct tipc_subscription *)data; struct tipc_subscriber *subscriber = sub->subscriber; - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - - /* The spin lock per subscriber is used to protect its members */ - spin_lock_bh(&subscriber->lock); - - /* Validate timeout (in case subscription is being cancelled) */ - if (sub->timeout == TIPC_WAIT_FOREVER) { - spin_unlock_bh(&subscriber->lock); - return; - } - - /* Unlink subscription from name table */ - tipc_nametbl_unsubscribe(sub); - - /* Unlink subscription from subscriber */ - list_del(&sub->subscrp_list); - - spin_unlock_bh(&subscriber->lock); /* Notify subscriber of timeout */ tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, TIPC_SUBSCR_TIMEOUT, 0, 0); - /* Now destroy subscription */ - kfree(sub); - atomic_dec(&tn->subscription_count); + spin_lock_bh(&subscriber->lock); + tipc_subscrp_delete(sub); + spin_unlock_bh(&subscriber->lock); + + tipc_subscrb_put(subscriber); } -static void tipc_subscrp_delete(struct tipc_subscription *sub) +static void tipc_subscrb_kref_release(struct kref *kref) { - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); + struct tipc_subscriber *subcriber = container_of(kref, + struct tipc_subscriber, kref); - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscrp_list); - kfree(sub); - atomic_dec(&tn->subscription_count); + kfree(subcriber); +} + +static void tipc_subscrb_put(struct tipc_subscriber *subscriber) +{ + kref_put(&subscriber->kref, tipc_subscrb_kref_release); +} + +static void tipc_subscrb_get(struct tipc_subscriber *subscriber) +{ + kref_get(&subscriber->kref); } static struct tipc_subscriber *tipc_subscrb_create(int conid) @@ -163,6 +160,7 @@ static struct tipc_subscriber *tipc_subscrb_create(int conid) pr_warn("Subscriber rejected, no memory\n"); return NULL; } + kref_init(&subscriber->kref); INIT_LIST_HEAD(&subscriber->subscrp_list); subscriber->conid = conid; spin_lock_init(&subscriber->lock); @@ -172,62 +170,48 @@ static struct tipc_subscriber *tipc_subscrb_create(int conid) static void tipc_subscrb_delete(struct tipc_subscriber *subscriber) { - struct tipc_subscription *sub; - struct tipc_subscription *sub_temp; + struct tipc_subscription *sub, *temp; spin_lock_bh(&subscriber->lock); - /* Destroy any existing subscriptions for subscriber */ - list_for_each_entry_safe(sub, sub_temp, &subscriber->subscrp_list, + list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, subscrp_list) { - if (sub->timeout != TIPC_WAIT_FOREVER) { - spin_unlock_bh(&subscriber->lock); - del_timer_sync(&sub->timer); - spin_lock_bh(&subscriber->lock); + if (del_timer(&sub->timer)) { + tipc_subscrp_delete(sub); + tipc_subscrb_put(subscriber); } - tipc_subscrp_delete(sub); } spin_unlock_bh(&subscriber->lock); - /* Now destroy subscriber */ - kfree(subscriber); + tipc_subscrb_put(subscriber); +} + +static void tipc_subscrp_delete(struct tipc_subscription *sub) +{ + struct tipc_net *tn = net_generic(sub->net, tipc_net_id); + + tipc_nametbl_unsubscribe(sub); + list_del(&sub->subscrp_list); + kfree(sub); + atomic_dec(&tn->subscription_count); } -/** - * tipc_subscrp_cancel - handle subscription cancellation request - * - * Called with subscriber lock held. Routine must temporarily release lock - * to enable the subscription timeout routine to finish without deadlocking; - * the lock is then reclaimed to allow caller to release it upon return. - * - * Note that fields of 's' use subscriber's endianness! - */ static void tipc_subscrp_cancel(struct tipc_subscr *s, struct tipc_subscriber *subscriber) { - struct tipc_subscription *sub; - struct tipc_subscription *sub_temp; - int found = 0; + struct tipc_subscription *sub, *temp; /* Find first matching subscription, exit if not found */ - list_for_each_entry_safe(sub, sub_temp, &subscriber->subscrp_list, + list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, subscrp_list) { if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { - found = 1; + if (del_timer(&sub->timer)) { + tipc_subscrp_delete(sub); + tipc_subscrb_put(subscriber); + } break; } } - if (!found) - return; - - /* Cancel subscription timer (if used), then delete subscription */ - if (sub->timeout != TIPC_WAIT_FOREVER) { - sub->timeout = TIPC_WAIT_FOREVER; - spin_unlock_bh(&subscriber->lock); - del_timer_sync(&sub->timer); - spin_lock_bh(&subscriber->lock); - } - tipc_subscrp_delete(sub); } static int tipc_subscrp_create(struct net *net, struct tipc_subscr *s, @@ -281,11 +265,11 @@ static int tipc_subscrp_create(struct net *net, struct tipc_subscr *s, sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(*s)); atomic_inc(&tn->subscription_count); - if (sub->timeout != TIPC_WAIT_FOREVER) { - setup_timer(&sub->timer, tipc_subscrp_timeout, - (unsigned long)sub); - mod_timer(&sub->timer, jiffies + sub->timeout); - } + setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); + if (sub->timeout != TIPC_WAIT_FOREVER) + sub->timeout += jiffies; + if (!mod_timer(&sub->timer, sub->timeout)) + tipc_subscrb_get(subscriber); *sub_p = sub; return 0; } -- cgit v1.2.3-70-g09d2