diff options
Diffstat (limited to 'net/tipc/bcast.c')
| -rw-r--r-- | net/tipc/bcast.c | 204 | 
1 files changed, 174 insertions, 30 deletions
| diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index aa1babbea385..7d99029df342 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -1,7 +1,7 @@  /*   * net/tipc/bcast.c: TIPC broadcast code   * - * Copyright (c) 2004-2006, 2014-2015, Ericsson AB + * Copyright (c) 2004-2006, 2014-2016, Ericsson AB   * Copyright (c) 2004, Intel Corporation.   * Copyright (c) 2005, 2010-2011, Wind River Systems   * All rights reserved. @@ -39,9 +39,8 @@  #include "socket.h"  #include "msg.h"  #include "bcast.h" -#include "name_distr.h"  #include "link.h" -#include "node.h" +#include "name_table.h"  #define	BCLINK_WIN_DEFAULT	50	/* bcast link window size (default) */  #define	BCLINK_WIN_MIN	        32	/* bcast minimum link window size */ @@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link";   * @inputq: data input queue; will only carry SOCK_WAKEUP messages   * @dest: array keeping number of reachable destinations per bearer   * @primary_bearer: a bearer having links to all broadcast destinations, if any + * @bcast_support: indicates if primary bearer, if any, supports broadcast + * @rcast_support: indicates if all peer nodes support replicast + * @rc_ratio: dest count as percentage of cluster size where send method changes + * @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast   */  struct tipc_bc_base {  	struct tipc_link *link;  	struct sk_buff_head inputq;  	int dests[MAX_BEARERS];  	int primary_bearer; +	bool bcast_support; +	bool rcast_support; +	int rc_ratio; +	int bc_threshold;  };  static struct tipc_bc_base *tipc_bc_base(struct net *net) @@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)  int tipc_bcast_get_mtu(struct net *net)  { -	return tipc_link_mtu(tipc_bc_sndlink(net)); +	return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE; +} + +void tipc_bcast_disable_rcast(struct net *net) +{ +	tipc_bc_base(net)->rcast_support = false; +} + +static void tipc_bcbase_calc_bc_threshold(struct net *net) +{ +	struct tipc_bc_base *bb = tipc_bc_base(net); +	int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net)); + +	bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);  }  /* tipc_bcbase_select_primary(): find a bearer with links to all destinations, @@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net)  {  	struct tipc_bc_base *bb = tipc_bc_base(net);  	int all_dests =  tipc_link_bc_peers(bb->link); -	int i, mtu; +	int i, mtu, prim;  	bb->primary_bearer = INVALID_BEARER_ID; +	bb->bcast_support = true;  	if (!all_dests)  		return; @@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net)  		mtu = tipc_bearer_mtu(net, i);  		if (mtu < tipc_link_mtu(bb->link))  			tipc_link_set_mtu(bb->link, mtu); - +		bb->bcast_support &= tipc_bearer_bcast_support(net, i);  		if (bb->dests[i] < all_dests)  			continue; @@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net)  		if ((i ^ tipc_own_addr(net)) & 1)  			break;  	} +	prim = bb->primary_bearer; +	if (prim != INVALID_BEARER_ID) +		bb->bcast_support = tipc_bearer_bcast_support(net, prim);  }  void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) @@ -170,45 +194,131 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)  	__skb_queue_purge(&_xmitq);  } -/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster - *                    and to identified node local sockets +static void tipc_bcast_select_xmit_method(struct net *net, int dests, +					  struct tipc_mc_method *method) +{ +	struct tipc_bc_base *bb = tipc_bc_base(net); +	unsigned long exp = method->expires; + +	/* Broadcast supported by used bearer/bearers? */ +	if (!bb->bcast_support) { +		method->rcast = true; +		return; +	} +	/* Any destinations which don't support replicast ? */ +	if (!bb->rcast_support) { +		method->rcast = false; +		return; +	} +	/* Can current method be changed ? */ +	method->expires = jiffies + TIPC_METHOD_EXPIRE; +	if (method->mandatory || time_before(jiffies, exp)) +		return; + +	/* Determine method to use now */ +	method->rcast = dests <= bb->bc_threshold; +} + +/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes   * @net: the applicable net namespace - * @list: chain of buffers containing message - * Consumes the buffer chain, except when returning -ELINKCONG - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + * @pkts: chain of buffers containing message + * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0 + * Consumes the buffer chain. + * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE   */ -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) +static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, +			   u16 *cong_link_cnt)  {  	struct tipc_link *l = tipc_bc_sndlink(net); -	struct sk_buff_head xmitq, inputq, rcvq; +	struct sk_buff_head xmitq;  	int rc = 0; -	__skb_queue_head_init(&rcvq);  	__skb_queue_head_init(&xmitq); -	skb_queue_head_init(&inputq); - -	/* Prepare message clone for local node */ -	if (unlikely(!tipc_msg_reassemble(list, &rcvq))) -		return -EHOSTUNREACH; -  	tipc_bcast_lock(net);  	if (tipc_link_bc_peers(l)) -		rc = tipc_link_xmit(l, list, &xmitq); +		rc = tipc_link_xmit(l, pkts, &xmitq);  	tipc_bcast_unlock(net); - -	/* Don't send to local node if adding to link failed */ -	if (unlikely(rc)) { -		__skb_queue_purge(&rcvq); -		return rc; +	tipc_bcbase_xmit(net, &xmitq); +	__skb_queue_purge(pkts); +	if (rc == -ELINKCONG) { +		*cong_link_cnt = 1; +		rc = 0;  	} +	return rc; +} -	/* Broadcast to all nodes, inluding local node */ -	tipc_bcbase_xmit(net, &xmitq); -	tipc_sk_mcast_rcv(net, &rcvq, &inputq); -	__skb_queue_purge(list); +/* tipc_rcast_xmit - replicate and send a message to given destination nodes + * @net: the applicable net namespace + * @pkts: chain of buffers containing message + * @dests: list of destination nodes + * @cong_link_cnt: returns number of congested links + * @cong_links: returns identities of congested links + * Returns 0 if success, otherwise errno + */ +static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, +			   struct tipc_nlist *dests, u16 *cong_link_cnt) +{ +	struct sk_buff_head _pkts; +	struct u32_item *n, *tmp; +	u32 dst, selector; + +	selector = msg_link_selector(buf_msg(skb_peek(pkts))); +	__skb_queue_head_init(&_pkts); + +	list_for_each_entry_safe(n, tmp, &dests->list, list) { +		dst = n->value; +		if (!tipc_msg_pskb_copy(dst, pkts, &_pkts)) +			return -ENOMEM; + +		/* Any other return value than -ELINKCONG is ignored */ +		if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG) +			(*cong_link_cnt)++; +	}  	return 0;  } +/* tipc_mcast_xmit - deliver message to indicated destination nodes + *                   and to identified node local sockets + * @net: the applicable net namespace + * @pkts: chain of buffers containing message + * @method: send method to be used + * @dests: destination nodes for message. + * @cong_link_cnt: returns number of encountered congested destination links + * Consumes buffer chain. + * Returns 0 if success, otherwise errno + */ +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, +		    struct tipc_mc_method *method, struct tipc_nlist *dests, +		    u16 *cong_link_cnt) +{ +	struct sk_buff_head inputq, localq; +	int rc = 0; + +	skb_queue_head_init(&inputq); +	skb_queue_head_init(&localq); + +	/* Clone packets before they are consumed by next call */ +	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { +		rc = -ENOMEM; +		goto exit; +	} +	/* Send according to determined transmit method */ +	if (dests->remote) { +		tipc_bcast_select_xmit_method(net, dests->remote, method); +		if (method->rcast) +			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); +		else +			rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); +	} + +	if (dests->local) +		tipc_sk_mcast_rcv(net, &localq, &inputq); +exit: +	/* This queue should normally be empty by now */ +	__skb_queue_purge(pkts); +	return rc; +} +  /* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link   *   * RCU is locked, no other locks set @@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,  	tipc_bcast_lock(net);  	tipc_link_add_bc_peer(snd_l, uc_l, xmitq);  	tipc_bcbase_select_primary(net); +	tipc_bcbase_calc_bc_threshold(net);  	tipc_bcast_unlock(net);  } @@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)  	tipc_bcast_lock(net);  	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);  	tipc_bcbase_select_primary(net); +	tipc_bcbase_calc_bc_threshold(net);  	tipc_bcast_unlock(net);  	tipc_bcbase_xmit(net, &xmitq); @@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net)  		goto enomem;  	bb->link = l;  	tn->bcl = l; +	bb->rc_ratio = 25; +	bb->rcast_support = true;  	return 0;  enomem:  	kfree(bb); @@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net)  	kfree(tn->bcbase);  	kfree(tn->bcl);  } + +void tipc_nlist_init(struct tipc_nlist *nl, u32 self) +{ +	memset(nl, 0, sizeof(*nl)); +	INIT_LIST_HEAD(&nl->list); +	nl->self = self; +} + +void tipc_nlist_add(struct tipc_nlist *nl, u32 node) +{ +	if (node == nl->self) +		nl->local = true; +	else if (u32_push(&nl->list, node)) +		nl->remote++; +} + +void tipc_nlist_del(struct tipc_nlist *nl, u32 node) +{ +	if (node == nl->self) +		nl->local = false; +	else if (u32_del(&nl->list, node)) +		nl->remote--; +} + +void tipc_nlist_purge(struct tipc_nlist *nl) +{ +	u32_list_purge(&nl->list); +	nl->remote = 0; +	nl->local = 0; +} | 
