diff options
Diffstat (limited to 'net/tipc')
-rw-r--r-- | net/tipc/bcast.c | 200 | ||||
-rw-r--r-- | net/tipc/bcast.h | 33 | ||||
-rw-r--r-- | net/tipc/bearer.c | 15 | ||||
-rw-r--r-- | net/tipc/bearer.h | 8 | ||||
-rw-r--r-- | net/tipc/discover.c | 4 | ||||
-rw-r--r-- | net/tipc/link.c | 14 | ||||
-rw-r--r-- | net/tipc/msg.c | 33 | ||||
-rw-r--r-- | net/tipc/msg.h | 11 | ||||
-rw-r--r-- | net/tipc/name_distr.c | 2 | ||||
-rw-r--r-- | net/tipc/name_table.c | 38 | ||||
-rw-r--r-- | net/tipc/name_table.h | 9 | ||||
-rw-r--r-- | net/tipc/net.c | 4 | ||||
-rw-r--r-- | net/tipc/node.c | 36 | ||||
-rw-r--r-- | net/tipc/node.h | 4 | ||||
-rw-r--r-- | net/tipc/server.c | 48 | ||||
-rw-r--r-- | net/tipc/socket.c | 94 | ||||
-rw-r--r-- | net/tipc/subscr.c | 124 | ||||
-rw-r--r-- | net/tipc/subscr.h | 1 | ||||
-rw-r--r-- | net/tipc/udp_media.c | 8 |
19 files changed, 506 insertions, 180 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index c35fad3e08e8..7d99029df342 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -1,7 +1,7 @@ /* * net/tipc/bcast.c: TIPC broadcast code * - * Copyright (c) 2004-2006, 2014-2015, Ericsson AB + * Copyright (c) 2004-2006, 2014-2016, Ericsson AB * Copyright (c) 2004, Intel Corporation. * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. @@ -39,9 +39,8 @@ #include "socket.h" #include "msg.h" #include "bcast.h" -#include "name_distr.h" #include "link.h" -#include "node.h" +#include "name_table.h" #define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ #define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ @@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link"; * @inputq: data input queue; will only carry SOCK_WAKEUP messages * @dest: array keeping number of reachable destinations per bearer * @primary_bearer: a bearer having links to all broadcast destinations, if any + * @bcast_support: indicates if primary bearer, if any, supports broadcast + * @rcast_support: indicates if all peer nodes support replicast + * @rc_ratio: dest count as percentage of cluster size where send method changes + * @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast */ struct tipc_bc_base { struct tipc_link *link; struct sk_buff_head inputq; int dests[MAX_BEARERS]; int primary_bearer; + bool bcast_support; + bool rcast_support; + int rc_ratio; + int bc_threshold; }; static struct tipc_bc_base *tipc_bc_base(struct net *net) @@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net) int tipc_bcast_get_mtu(struct net *net) { - return tipc_link_mtu(tipc_bc_sndlink(net)); + return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE; +} + +void tipc_bcast_disable_rcast(struct net *net) +{ + tipc_bc_base(net)->rcast_support = false; +} + +static void tipc_bcbase_calc_bc_threshold(struct net *net) +{ + struct tipc_bc_base *bb = tipc_bc_base(net); + int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net)); + + bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100); } /* tipc_bcbase_select_primary(): find a bearer with links to all destinations, @@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net) { struct tipc_bc_base *bb = tipc_bc_base(net); int all_dests = tipc_link_bc_peers(bb->link); - int i, mtu; + int i, mtu, prim; bb->primary_bearer = INVALID_BEARER_ID; + bb->bcast_support = true; if (!all_dests) return; @@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net) mtu = tipc_bearer_mtu(net, i); if (mtu < tipc_link_mtu(bb->link)) tipc_link_set_mtu(bb->link, mtu); - + bb->bcast_support &= tipc_bearer_bcast_support(net, i); if (bb->dests[i] < all_dests) continue; @@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net) if ((i ^ tipc_own_addr(net)) & 1) break; } + prim = bb->primary_bearer; + if (prim != INVALID_BEARER_ID) + bb->bcast_support = tipc_bearer_bcast_support(net, prim); } void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) @@ -170,42 +194,128 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) __skb_queue_purge(&_xmitq); } -/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster - * and to identified node local sockets +static void tipc_bcast_select_xmit_method(struct net *net, int dests, + struct tipc_mc_method *method) +{ + struct tipc_bc_base *bb = tipc_bc_base(net); + unsigned long exp = method->expires; + + /* Broadcast supported by used bearer/bearers? */ + if (!bb->bcast_support) { + method->rcast = true; + return; + } + /* Any destinations which don't support replicast ? */ + if (!bb->rcast_support) { + method->rcast = false; + return; + } + /* Can current method be changed ? */ + method->expires = jiffies + TIPC_METHOD_EXPIRE; + if (method->mandatory || time_before(jiffies, exp)) + return; + + /* Determine method to use now */ + method->rcast = dests <= bb->bc_threshold; +} + +/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes * @net: the applicable net namespace - * @list: chain of buffers containing message + * @pkts: chain of buffers containing message + * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0 * Consumes the buffer chain. - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE */ -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) +static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, + u16 *cong_link_cnt) { struct tipc_link *l = tipc_bc_sndlink(net); - struct sk_buff_head xmitq, inputq, rcvq; + struct sk_buff_head xmitq; int rc = 0; - __skb_queue_head_init(&rcvq); __skb_queue_head_init(&xmitq); - skb_queue_head_init(&inputq); - - /* Prepare message clone for local node */ - if (unlikely(!tipc_msg_reassemble(list, &rcvq))) - return -EHOSTUNREACH; - tipc_bcast_lock(net); if (tipc_link_bc_peers(l)) - rc = tipc_link_xmit(l, list, &xmitq); + rc = tipc_link_xmit(l, pkts, &xmitq); tipc_bcast_unlock(net); + tipc_bcbase_xmit(net, &xmitq); + __skb_queue_purge(pkts); + if (rc == -ELINKCONG) { + *cong_link_cnt = 1; + rc = 0; + } + return rc; +} - /* Don't send to local node if adding to link failed */ - if (unlikely(rc && (rc != -ELINKCONG))) { - __skb_queue_purge(&rcvq); - return rc; +/* tipc_rcast_xmit - replicate and send a message to given destination nodes + * @net: the applicable net namespace + * @pkts: chain of buffers containing message + * @dests: list of destination nodes + * @cong_link_cnt: returns number of congested links + * @cong_links: returns identities of congested links + * Returns 0 if success, otherwise errno + */ +static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, + struct tipc_nlist *dests, u16 *cong_link_cnt) +{ + struct sk_buff_head _pkts; + struct u32_item *n, *tmp; + u32 dst, selector; + + selector = msg_link_selector(buf_msg(skb_peek(pkts))); + __skb_queue_head_init(&_pkts); + + list_for_each_entry_safe(n, tmp, &dests->list, list) { + dst = n->value; + if (!tipc_msg_pskb_copy(dst, pkts, &_pkts)) + return -ENOMEM; + + /* Any other return value than -ELINKCONG is ignored */ + if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG) + (*cong_link_cnt)++; } + return 0; +} - /* Broadcast to all nodes, inluding local node */ - tipc_bcbase_xmit(net, &xmitq); - tipc_sk_mcast_rcv(net, &rcvq, &inputq); - __skb_queue_purge(list); +/* tipc_mcast_xmit - deliver message to indicated destination nodes + * and to identified node local sockets + * @net: the applicable net namespace + * @pkts: chain of buffers containing message + * @method: send method to be used + * @dests: destination nodes for message. + * @cong_link_cnt: returns number of encountered congested destination links + * Consumes buffer chain. + * Returns 0 if success, otherwise errno + */ +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, + struct tipc_mc_method *method, struct tipc_nlist *dests, + u16 *cong_link_cnt) +{ + struct sk_buff_head inputq, localq; + int rc = 0; + + skb_queue_head_init(&inputq); + skb_queue_head_init(&localq); + + /* Clone packets before they are consumed by next call */ + if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { + rc = -ENOMEM; + goto exit; + } + /* Send according to determined transmit method */ + if (dests->remote) { + tipc_bcast_select_xmit_method(net, dests->remote, method); + if (method->rcast) + rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); + else + rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); + } + + if (dests->local) + tipc_sk_mcast_rcv(net, &localq, &inputq); +exit: + /* This queue should normally be empty by now */ + __skb_queue_purge(pkts); return rc; } @@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l, tipc_bcast_lock(net); tipc_link_add_bc_peer(snd_l, uc_l, xmitq); tipc_bcbase_select_primary(net); + tipc_bcbase_calc_bc_threshold(net); tipc_bcast_unlock(net); } @@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) tipc_bcast_lock(net); tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); tipc_bcbase_select_primary(net); + tipc_bcbase_calc_bc_threshold(net); tipc_bcast_unlock(net); tipc_bcbase_xmit(net, &xmitq); @@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net) goto enomem; bb->link = l; tn->bcl = l; + bb->rc_ratio = 25; + bb->rcast_support = true; return 0; enomem: kfree(bb); @@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net) kfree(tn->bcbase); kfree(tn->bcl); } + +void tipc_nlist_init(struct tipc_nlist *nl, u32 self) +{ + memset(nl, 0, sizeof(*nl)); + INIT_LIST_HEAD(&nl->list); + nl->self = self; +} + +void tipc_nlist_add(struct tipc_nlist *nl, u32 node) +{ + if (node == nl->self) + nl->local = true; + else if (u32_push(&nl->list, node)) + nl->remote++; +} + +void tipc_nlist_del(struct tipc_nlist *nl, u32 node) +{ + if (node == nl->self) + nl->local = false; + else if (u32_del(&nl->list, node)) + nl->remote--; +} + +void tipc_nlist_purge(struct tipc_nlist *nl) +{ + u32_list_purge(&nl->list); + nl->remote = 0; + nl->local = 0; +} diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 855d53c64ab3..751530ab0c49 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -42,9 +42,35 @@ struct tipc_node; struct tipc_msg; struct tipc_nl_msg; -struct tipc_node_map; +struct tipc_nlist; +struct tipc_nitem; extern const char tipc_bclink_name[]; +#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000) + +struct tipc_nlist { + struct list_head list; + u32 self; + u16 remote; + bool local; +}; + +void tipc_nlist_init(struct tipc_nlist *nl, u32 self); +void tipc_nlist_purge(struct tipc_nlist *nl); +void tipc_nlist_add(struct tipc_nlist *nl, u32 node); +void tipc_nlist_del(struct tipc_nlist *nl, u32 node); + +/* Cookie to be used between socket and broadcast layer + * @rcast: replicast (instead of broadcast) was used at previous xmit + * @mandatory: broadcast/replicast indication was set by user + * @expires: re-evaluate non-mandatory transmit method if we are past this + */ +struct tipc_mc_method { + bool rcast; + bool mandatory; + unsigned long expires; +}; + int tipc_bcast_init(struct net *net); void tipc_bcast_stop(struct net *net); void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, @@ -53,7 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl); void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); int tipc_bcast_get_mtu(struct net *net); -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); +void tipc_bcast_disable_rcast(struct net *net); +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, + struct tipc_mc_method *method, struct tipc_nlist *dests, + u16 *cong_link_cnt); int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, struct tipc_msg *hdr); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 52d74760fb68..33a5bdfbef76 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -431,7 +431,7 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b, memset(&b->bcast_addr, 0, sizeof(b->bcast_addr)); memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len); b->bcast_addr.media_id = b->media->type_id; - b->bcast_addr.broadcast = 1; + b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT; b->mtu = dev->mtu; b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr); rcu_assign_pointer(dev->tipc_ptr, b); @@ -482,6 +482,19 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb, return 0; } +bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id) +{ + bool supp = false; + struct tipc_bearer *b; + + rcu_read_lock(); + b = bearer_get(net, bearer_id); + if (b) + supp = (b->bcast_addr.broadcast == TIPC_BROADCAST_SUPPORT); + rcu_read_unlock(); + return supp; +} + int tipc_bearer_mtu(struct net *net, u32 bearer_id) { int mtu = 0; diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 278ff7f616f9..635c9086e19a 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -60,9 +60,14 @@ #define TIPC_MEDIA_TYPE_IB 2 #define TIPC_MEDIA_TYPE_UDP 3 -/* minimum bearer MTU */ +/* Minimum bearer MTU */ #define TIPC_MIN_BEARER_MTU (MAX_H_SIZE + INT_H_SIZE) +/* Identifiers for distinguishing between broadcast/multicast and replicast + */ +#define TIPC_BROADCAST_SUPPORT 1 +#define TIPC_REPLICAST_SUPPORT 2 + /** * struct tipc_media_addr - destination address used by TIPC bearers * @value: address info (format defined by media) @@ -210,6 +215,7 @@ int tipc_bearer_setup(void); void tipc_bearer_cleanup(void); void tipc_bearer_stop(struct net *net); int tipc_bearer_mtu(struct net *net, u32 bearer_id); +bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id); void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id, struct sk_buff *skb, struct tipc_media_addr *dest); diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 6b109a808d4c..02462d67d191 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -169,7 +169,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *skb, /* Send response, if necessary */ if (respond && (mtyp == DSC_REQ_MSG)) { - rskb = tipc_buf_acquire(MAX_H_SIZE); + rskb = tipc_buf_acquire(MAX_H_SIZE, GFP_ATOMIC); if (!rskb) return; tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer); @@ -278,7 +278,7 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b, req = kmalloc(sizeof(*req), GFP_ATOMIC); if (!req) return -ENOMEM; - req->buf = tipc_buf_acquire(MAX_H_SIZE); + req->buf = tipc_buf_acquire(MAX_H_SIZE, GFP_ATOMIC); if (!req->buf) { kfree(req); return -ENOMEM; diff --git a/net/tipc/link.c b/net/tipc/link.c index b758ca8b2f79..ddd2dd6f77aa 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, if (link_is_bc_sndlink(l)) l->state = LINK_ESTABLISHED; + /* Disable replicast if even a single peer doesn't support it */ + if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST)) + tipc_bcast_disable_rcast(net); + return true; } @@ -1032,11 +1036,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { - switch (msg_user(buf_msg(skb))) { + struct tipc_msg *hdr = buf_msg(skb); + + switch (msg_user(hdr)) { case TIPC_LOW_IMPORTANCE: case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: + if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { + skb_queue_tail(l->bc_rcvlink->inputq, skb); + return true; + } case CONN_MANAGER: skb_queue_tail(inputq, skb); return true; @@ -1384,7 +1394,7 @@ tnl: msg_set_seqno(hdr, seqno++); pktlen = msg_size(hdr); msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); - tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE); + tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC); if (!tnlskb) { pr_warn("%sunable to send packet\n", link_co_err); return; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index a22be502f1bd..312ef7de57d7 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -58,12 +58,12 @@ static unsigned int align(unsigned int i) * NOTE: Headroom is reserved to allow prepending of a data link header. * There may also be unrequested tailroom present at the buffer's end. */ -struct sk_buff *tipc_buf_acquire(u32 size) +struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp) { struct sk_buff *skb; unsigned int buf_size = (BUF_HEADROOM + size + 3) & ~3u; - skb = alloc_skb_fclone(buf_size, GFP_ATOMIC); + skb = alloc_skb_fclone(buf_size, gfp); if (skb) { skb_reserve(skb, BUF_HEADROOM); skb_put(skb, size); @@ -95,7 +95,7 @@ struct sk_buff *tipc_msg_create(uint user, uint type, struct tipc_msg *msg; struct sk_buff *buf; - buf = tipc_buf_acquire(hdr_sz + data_sz); + buf = tipc_buf_acquire(hdr_sz + data_sz, GFP_ATOMIC); if (unlikely(!buf)) return NULL; @@ -261,7 +261,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, /* No fragmentation needed? */ if (likely(msz <= pktmax)) { - skb = tipc_buf_acquire(msz); + skb = tipc_buf_acquire(msz, GFP_KERNEL); if (unlikely(!skb)) return -ENOMEM; skb_orphan(skb); @@ -282,7 +282,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, msg_set_importance(&pkthdr, msg_importance(mhdr)); /* Prepare first fragment */ - skb = tipc_buf_acquire(pktmax); + skb = tipc_buf_acquire(pktmax, GFP_KERNEL); if (!skb) return -ENOMEM; skb_orphan(skb); @@ -313,7 +313,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, pktsz = drem + INT_H_SIZE; else pktsz = pktmax; - skb = tipc_buf_acquire(pktsz); + skb = tipc_buf_acquire(pktsz, GFP_KERNEL); if (!skb) { rc = -ENOMEM; goto error; @@ -448,7 +448,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, if (msz > (max / 2)) return false; - _skb = tipc_buf_acquire(max); + _skb = tipc_buf_acquire(max, GFP_ATOMIC); if (!_skb) return false; @@ -496,7 +496,7 @@ bool tipc_msg_reverse(u32 own_node, struct sk_buff **skb, int err) /* Never return SHORT header; expand by replacing buffer if necessary */ if (msg_short(hdr)) { - *skb = tipc_buf_acquire(BASIC_H_SIZE + dlen); + *skb = tipc_buf_acquire(BASIC_H_SIZE + dlen, GFP_ATOMIC); if (!*skb) goto exit; memcpy((*skb)->data + BASIC_H_SIZE, msg_data(hdr), dlen); @@ -607,6 +607,23 @@ error: return false; } +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, + struct sk_buff_head *cpy) +{ + struct sk_buff *skb, *_skb; + + skb_queue_walk(msg, skb) { + _skb = pskb_copy(skb, GFP_ATOMIC); + if (!_skb) { + __skb_queue_purge(cpy); + return false; + } + msg_set_destnode(buf_msg(_skb), dst); + __skb_queue_tail(cpy, _skb); + } + return true; +} + /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number * @list: list to be appended to * @seqno: sequence number of buffer to add diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 850ae0e469f5..c843fd2bc48d 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -631,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id) static inline u32 msg_link_selector(struct tipc_msg *m) { + if (msg_user(m) == MSG_FRAGMENTER) + m = (void *)msg_data(m); return msg_bits(m, 4, 0, 1); } -static inline void msg_set_link_selector(struct tipc_msg *m, u32 n) -{ - msg_set_bits(m, 4, 0, 1, n); -} - /* * Word 5 */ @@ -818,7 +815,7 @@ static inline bool msg_is_reset(struct tipc_msg *hdr) return (msg_user(hdr) == LINK_PROTOCOL) && (msg_type(hdr) == RESET_MSG); } -struct sk_buff *tipc_buf_acquire(u32 size); +struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp); bool tipc_msg_validate(struct sk_buff *skb); bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err); void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, @@ -835,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, + struct sk_buff_head *cpy); void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, struct sk_buff *skb); diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index c1cfd92de17a..23f8899e0f8c 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -69,7 +69,7 @@ static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size, u32 dest) { struct tipc_net *tn = net_generic(net, tipc_net_id); - struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size); + struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size, GFP_ATOMIC); struct tipc_msg *msg; if (buf != NULL) { diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 5a86df1e5fc2..9be6592e4a6f 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -645,6 +645,39 @@ exit: return res; } +/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes + * - Creates list of nodes that overlap the given multicast address + * - Determines if any node local ports overlap + */ +void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, + u32 upper, u32 domain, + struct tipc_nlist *nodes) +{ + struct sub_seq *sseq, *stop; + struct publication *publ; + struct name_info *info; + struct name_seq *seq; + + rcu_read_lock(); + seq = nametbl_find_seq(net, type); + if (!seq) + goto exit; + + spin_lock_bh(&seq->lock); + sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); + stop = seq->sseqs + seq->first_free; + for (; sseq->lower <= upper && sseq != stop; sseq++) { + info = sseq->info; + list_for_each_entry(publ, &info->zone_list, zone_list) { + if (tipc_in_scope(domain, publ->node)) + tipc_nlist_add(nodes, publ->node); + } + } + spin_unlock_bh(&seq->lock); +exit: + rcu_read_unlock(); +} + /* * tipc_nametbl_publish - add name publication to network name tables */ @@ -1022,11 +1055,6 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) return skb->len; } -struct u32_item { - struct list_head list; - u32 value; -}; - bool u32_find(struct list_head *l, u32 value) { struct u32_item *item; diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index c89bb3f5c364..6ebdeb1d84a5 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -39,6 +39,7 @@ struct tipc_subscription; struct tipc_plist; +struct tipc_nlist; /* * TIPC name types reserved for internal TIPC use (both current and planned) @@ -100,6 +101,9 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports); +void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, + u32 upper, u32 domain, + struct tipc_nlist *nodes); struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, u32 upper, u32 scope, u32 port_ref, u32 key); @@ -116,6 +120,11 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s); int tipc_nametbl_init(struct net *net); void tipc_nametbl_stop(struct net *net); +struct u32_item { + struct list_head list; + u32 value; +}; + bool u32_push(struct list_head *l, u32 value); u32 u32_pop(struct list_head *l); bool u32_find(struct list_head *l, u32 value); diff --git a/net/tipc/net.c b/net/tipc/net.c index 28bf4feeb81c..ab8a2d5d1e32 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -110,6 +110,10 @@ int tipc_net_start(struct net *net, u32 addr) char addr_string[16]; tn->own_addr = addr; + + /* Ensure that the new address is visible before we reinit. */ + smp_mb(); + tipc_named_reinit(net); tipc_sk_reinit(net); diff --git a/net/tipc/node.c b/net/tipc/node.c index 2883f6a0ed98..e9295fa3a554 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -263,6 +263,11 @@ static void tipc_node_write_lock(struct tipc_node *n) write_lock_bh(&n->lock); } +static void tipc_node_write_unlock_fast(struct tipc_node *n) +{ + write_unlock_bh(&n->lock); +} + static void tipc_node_write_unlock(struct tipc_node *n) { struct net *net = n->net; @@ -417,7 +422,7 @@ void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr) } tipc_node_write_lock(n); list_add_tail(subscr, &n->publ_list); - tipc_node_write_unlock(n); + tipc_node_write_unlock_fast(n); tipc_node_put(n); } @@ -435,7 +440,7 @@ void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr) } tipc_node_write_lock(n); list_del_init(subscr); - tipc_node_write_unlock(n); + tipc_node_write_unlock_fast(n); tipc_node_put(n); } @@ -1257,6 +1262,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb) kfree_skb(skb); } +static void tipc_node_mcast_rcv(struct tipc_node *n) +{ + struct tipc_bclink_entry *be = &n->bc_entry; + + /* 'arrvq' is under inputq2's lock protection */ + spin_lock_bh(&be->inputq2.lock); + spin_lock_bh(&be->inputq1.lock); + skb_queue_splice_tail_init(&be->inputq1, &be->arrvq); + spin_unlock_bh(&be->inputq1.lock); + spin_unlock_bh(&be->inputq2.lock); + tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2); +} + static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr, int bearer_id, struct sk_buff_head *xmitq) { @@ -1330,15 +1348,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id if (!skb_queue_empty(&xmitq)) tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); - /* Deliver. 'arrvq' is under inputq2's lock protection */ - if (!skb_queue_empty(&be->inputq1)) { - spin_lock_bh(&be->inputq2.lock); - spin_lock_bh(&be->inputq1.lock); - skb_queue_splice_tail_init(&be->inputq1, &be->arrvq); - spin_unlock_bh(&be->inputq1.lock); - spin_unlock_bh(&be->inputq2.lock); - tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2); - } + if (!skb_queue_empty(&be->inputq1)) + tipc_node_mcast_rcv(n); if (rc & TIPC_LINK_DOWN_EVT) { /* Reception reassembly failure => reset all links to peer */ @@ -1565,6 +1576,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) tipc_named_rcv(net, &n->bc_entry.namedq); + if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1))) + tipc_node_mcast_rcv(n); + if (!skb_queue_empty(&le->inputq)) tipc_sk_rcv(net, &le->inputq); diff --git a/net/tipc/node.h b/net/tipc/node.h index 39ef54c1f2ad..898c22916984 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -47,11 +47,13 @@ enum { TIPC_BCAST_SYNCH = (1 << 1), TIPC_BCAST_STATE_NACK = (1 << 2), - TIPC_BLOCK_FLOWCTL = (1 << 3) + TIPC_BLOCK_FLOWCTL = (1 << 3), + TIPC_BCAST_RCAST = (1 << 4) }; #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ TIPC_BCAST_STATE_NACK | \ + TIPC_BCAST_RCAST | \ TIPC_BLOCK_FLOWCTL) #define INVALID_BEARER_ID -1 diff --git a/net/tipc/server.c b/net/tipc/server.c index 215849ce453d..3cd6402e812c 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -86,12 +86,12 @@ struct outqueue_entry { static void tipc_recv_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work); static void tipc_clean_outqueues(struct tipc_conn *con); -static void tipc_sock_release(struct tipc_conn *con); static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); - struct sockaddr_tipc *saddr = con->server->saddr; + struct tipc_server *s = con->server; + struct sockaddr_tipc *saddr = s->saddr; struct socket *sock = con->sock; struct sock *sk; @@ -103,9 +103,13 @@ static void tipc_conn_kref_release(struct kref *kref) } saddr->scope = -TIPC_NODE_SCOPE; kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); - tipc_sock_release(con); sock_release(sock); con->sock = NULL; + + spin_lock_bh(&s->idr_lock); + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + spin_unlock_bh(&s->idr_lock); } tipc_clean_outqueues(con); @@ -128,8 +132,10 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) spin_lock_bh(&s->idr_lock); con = idr_find(&s->conn_idr, conid); - if (con) + if (con && test_bit(CF_CONNECTED, &con->flags)) conn_get(con); + else + con = NULL; spin_unlock_bh(&s->idr_lock); return con; } @@ -186,26 +192,15 @@ static void tipc_unregister_callbacks(struct tipc_conn *con) write_unlock_bh(&sk->sk_callback_lock); } -static void tipc_sock_release(struct tipc_conn *con) -{ - struct tipc_server *s = con->server; - - if (con->conid) - s->tipc_conn_release(con->conid, con->usr_data); - - tipc_unregister_callbacks(con); -} - static void tipc_close_conn(struct tipc_conn *con) { struct tipc_server *s = con->server; if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { + tipc_unregister_callbacks(con); - spin_lock_bh(&s->idr_lock); - idr_remove(&s->conn_idr, con->conid); - s->idr_in_use--; - spin_unlock_bh(&s->idr_lock); + if (con->conid) + s->tipc_conn_release(con->conid, con->usr_data); /* We shouldn't flush pending works as we may be in the * thread. In fact the races with pending rx/tx work structs @@ -458,6 +453,11 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, if (!con) return -EINVAL; + if (!test_bit(CF_CONNECTED, &con->flags)) { + conn_put(con); + return 0; + } + e = tipc_alloc_entry(data, len); if (!e) { conn_put(con); @@ -471,12 +471,8 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); - if (test_bit(CF_CONNECTED, &con->flags)) { - if (!queue_work(s->send_wq, &con->swork)) - conn_put(con); - } else { + if (!queue_work(s->send_wq, &con->swork)) conn_put(con); - } return 0; } @@ -500,7 +496,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) int ret; spin_lock_bh(&con->outqueue_lock); - while (1) { + while (test_bit(CF_CONNECTED, &con->flags)) { e = list_entry(con->outqueue.next, struct outqueue_entry, list); if ((struct list_head *) e == &con->outqueue) @@ -623,14 +619,12 @@ int tipc_server_start(struct tipc_server *s) void tipc_server_stop(struct tipc_server *s) { struct tipc_conn *con; - int total = 0; int id; spin_lock_bh(&s->idr_lock); - for (id = 0; total < s->idr_in_use; id++) { + for (id = 0; s->idr_in_use; id++) { con = idr_find(&s->conn_idr, id); if (con) { - total++; spin_unlock_bh(&s->idr_lock); tipc_close_conn(con); spin_lock_bh(&s->idr_lock); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d2f353934f82..6b09a778cc71 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -79,6 +79,7 @@ enum { * @rcv_unacked: # messages read by user, but not yet acked back to peer * @peer: 'connected' peer for dgram/rdm * @node: hash table node + * @mc_method: cookie for use between socket and broadcast layer * @rcu: rcu struct for tipc_sock */ struct tipc_sock { @@ -103,6 +104,7 @@ struct tipc_sock { u16 rcv_win; struct sockaddr_tipc peer; struct rhash_head node; + struct tipc_mc_method mc_method; struct rcu_head rcu; }; @@ -428,8 +430,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock, INIT_LIST_HEAD(&tsk->cong_links); msg = &tsk->phdr; tn = net_generic(sock_net(sk), tipc_net_id); - tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, - NAMED_H_SIZE, 0); /* Finish initializing socket data structures */ sock->ops = ops; @@ -439,6 +439,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock, pr_warn("Socket create failed; port number exhausted\n"); return -EINVAL; } + + /* Ensure tsk is visible before we read own_addr. */ + smp_mb(); + + tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, + NAMED_H_SIZE, 0); + msg_set_origport(msg, tsk->portid); setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); sk->sk_shutdown = 0; @@ -740,32 +747,44 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, struct tipc_msg *hdr = &tsk->phdr; struct net *net = sock_net(sk); int mtu = tipc_bcast_get_mtu(net); + struct tipc_mc_method *method = &tsk->mc_method; + u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE); struct sk_buff_head pkts; + struct tipc_nlist dsts; int rc; + /* Block or return if any destination link is congested */ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); if (unlikely(rc)) return rc; + /* Lookup destination nodes */ + tipc_nlist_init(&dsts, tipc_own_addr(net)); + tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower, + seq->upper, domain, &dsts); + if (!dsts.local && !dsts.remote) + return -EHOSTUNREACH; + + /* Build message header */ msg_set_type(hdr, TIPC_MCAST_MSG); + msg_set_hdr_sz(hdr, MCAST_H_SIZE); msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); msg_set_destport(hdr, 0); msg_set_destnode(hdr, 0); msg_set_nametype(hdr, seq->type); msg_set_namelower(hdr, seq->lower); msg_set_nameupper(hdr, seq->upper); - msg_set_hdr_sz(hdr, MCAST_H_SIZE); + /* Build message as chain of buffers */ skb_queue_head_init(&pkts); rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); - if (unlikely(rc != dlen)) - return rc; - rc = tipc_bcast_xmit(net, &pkts); - if (unlikely(rc == -ELINKCONG)) { - tsk->cong_link_cnt = 1; - rc = 0; - } + /* Send message if build was successful */ + if (unlikely(rc == dlen)) + rc = tipc_mcast_xmit(net, &pkts, method, &dsts, + &tsk->cong_link_cnt); + + tipc_nlist_purge(&dsts); return rc ? rc : dlen; } @@ -2220,24 +2239,27 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, void tipc_sk_reinit(struct net *net) { struct tipc_net *tn = net_generic(net, tipc_net_id); - const struct bucket_table *tbl; - struct rhash_head *pos; + struct rhashtable_iter iter; struct tipc_sock *tsk; struct tipc_msg *msg; - int i; - rcu_read_lock(); - tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht); - for (i = 0; i < tbl->size; i++) { - rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { + rhashtable_walk_enter(&tn->sk_rht, &iter); + + do { + tsk = ERR_PTR(rhashtable_walk_start(&iter)); + if (tsk) + continue; + + while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) { spin_lock_bh(&tsk->sk.sk_lock.slock); msg = &tsk->phdr; msg_set_prevnode(msg, tn->own_addr); msg_set_orignode(msg, tn->own_addr); spin_unlock_bh(&tsk->sk.sk_lock.slock); } - } - rcu_read_unlock(); + + rhashtable_walk_stop(&iter); + } while (tsk == ERR_PTR(-EAGAIN)); } static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid) @@ -2333,18 +2355,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - u32 value; - int res; + u32 value = 0; + int res = 0; if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM)) return 0; if (lvl != SOL_TIPC) return -ENOPROTOOPT; - if (ol < sizeof(value)) - return -EINVAL; - res = get_user(value, (u32 __user *)ov); - if (res) - return res; + + switch (opt) { + case TIPC_IMPORTANCE: + case TIPC_SRC_DROPPABLE: + case TIPC_DEST_DROPPABLE: + case TIPC_CONN_TIMEOUT: + if (ol < sizeof(value)) + return -EINVAL; + res = get_user(value, (u32 __user *)ov); + if (res) + return res; + break; + default: + if (ov || ol) + return -EINVAL; + } lock_sock(sk); @@ -2363,7 +2396,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, break; case TIPC_CONN_TIMEOUT: tipc_sk(sk)->conn_timeout = value; - /* no need to set "res", since already 0 at this point */ + break; + case TIPC_MCAST_BROADCAST: + tsk->mc_method.rcast = false; + tsk->mc_method.mandatory = true; + break; + case TIPC_MCAST_REPLICAST: + tsk->mc_method.rcast = true; + tsk->mc_method.mandatory = true; break; default: res = -EINVAL; diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 0dd02244e21d..9d94e65d0894 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -54,6 +54,8 @@ struct tipc_subscriber { static void tipc_subscrp_delete(struct tipc_subscription *sub); static void tipc_subscrb_put(struct tipc_subscriber *subscriber); +static void tipc_subscrp_put(struct tipc_subscription *subscription); +static void tipc_subscrp_get(struct tipc_subscription *subscription); /** * htohl - convert value to endianness used by destination @@ -123,6 +125,7 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, { struct tipc_name_seq seq; + tipc_subscrp_get(sub); tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq); if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) return; @@ -132,30 +135,23 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, node); + tipc_subscrp_put(sub); } static void tipc_subscrp_timeout(unsigned long data) { struct tipc_subscription *sub = (struct tipc_subscription *)data; - struct tipc_subscriber *subscriber = sub->subscriber; /* Notify subscriber of timeout */ tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, TIPC_SUBSCR_TIMEOUT, 0, 0); - spin_lock_bh(&subscriber->lock); - tipc_subscrp_delete(sub); - spin_unlock_bh(&subscriber->lock); - - tipc_subscrb_put(subscriber); + tipc_subscrp_put(sub); } static void tipc_subscrb_kref_release(struct kref *kref) { - struct tipc_subscriber *subcriber = container_of(kref, - struct tipc_subscriber, kref); - - kfree(subcriber); + kfree(container_of(kref,struct tipc_subscriber, kref)); } static void tipc_subscrb_put(struct tipc_subscriber *subscriber) @@ -168,6 +164,59 @@ static void tipc_subscrb_get(struct tipc_subscriber *subscriber) kref_get(&subscriber->kref); } +static void tipc_subscrp_kref_release(struct kref *kref) +{ + struct tipc_subscription *sub = container_of(kref, + struct tipc_subscription, + kref); + struct tipc_net *tn = net_generic(sub->net, tipc_net_id); + struct tipc_subscriber *subscriber = sub->subscriber; + + spin_lock_bh(&subscriber->lock); + tipc_nametbl_unsubscribe(sub); + list_del(&sub->subscrp_list); + atomic_dec(&tn->subscription_count); + spin_unlock_bh(&subscriber->lock); + kfree(sub); + tipc_subscrb_put(subscriber); +} + +static void tipc_subscrp_put(struct tipc_subscription *subscription) +{ + kref_put(&subscription->kref, tipc_subscrp_kref_release); +} + +static void tipc_subscrp_get(struct tipc_subscription *subscription) +{ + kref_get(&subscription->kref); +} + +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all + * subscriptions for a given subscriber. + */ +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber, + struct tipc_subscr *s) +{ + struct list_head *subscription_list = &subscriber->subscrp_list; + struct tipc_subscription *sub, *temp; + + spin_lock_bh(&subscriber->lock); + list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) { + if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) + continue; + + tipc_subscrp_get(sub); + spin_unlock_bh(&subscriber->lock); + tipc_subscrp_delete(sub); + tipc_subscrp_put(sub); + spin_lock_bh(&subscriber->lock); + + if (s) + break; + } + spin_unlock_bh(&subscriber->lock); +} + static struct tipc_subscriber *tipc_subscrb_create(int conid) { struct tipc_subscriber *subscriber; @@ -177,8 +226,8 @@ static struct tipc_subscriber *tipc_subscrb_create(int conid) pr_warn("Subscriber rejected, no memory\n"); return NULL; } - kref_init(&subscriber->kref); INIT_LIST_HEAD(&subscriber->subscrp_list); + kref_init(&subscriber->kref); subscriber->conid = conid; spin_lock_init(&subscriber->lock); @@ -187,55 +236,22 @@ static struct tipc_subscriber *tipc_subscrb_create(int conid) static void tipc_subscrb_delete(struct tipc_subscriber *subscriber) { - struct tipc_subscription *sub, *temp; - u32 timeout; - - spin_lock_bh(&subscriber->lock); - /* Destroy any existing subscriptions for subscriber */ - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, - subscrp_list) { - timeout = htohl(sub->evt.s.timeout, sub->swap); - if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) { - tipc_subscrp_delete(sub); - tipc_subscrb_put(subscriber); - } - } - spin_unlock_bh(&subscriber->lock); - + tipc_subscrb_subscrp_delete(subscriber, NULL); tipc_subscrb_put(subscriber); } static void tipc_subscrp_delete(struct tipc_subscription *sub) { - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); + u32 timeout = htohl(sub->evt.s.timeout, sub->swap); - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscrp_list); - kfree(sub); - atomic_dec(&tn->subscription_count); + if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) + tipc_subscrp_put(sub); } static void tipc_subscrp_cancel(struct tipc_subscr *s, struct tipc_subscriber *subscriber) { - struct tipc_subscription *sub, *temp; - u32 timeout; - - spin_lock_bh(&subscriber->lock); - /* Find first matching subscription, exit if not found */ - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, - subscrp_list) { - if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { - timeout = htohl(sub->evt.s.timeout, sub->swap); - if ((timeout == TIPC_WAIT_FOREVER) || - del_timer(&sub->timer)) { - tipc_subscrp_delete(sub); - tipc_subscrb_put(subscriber); - } - break; - } - } - spin_unlock_bh(&subscriber->lock); + tipc_subscrb_subscrp_delete(subscriber, s); } static struct tipc_subscription *tipc_subscrp_create(struct net *net, @@ -272,6 +288,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(*s)); atomic_inc(&tn->subscription_count); + kref_init(&sub->kref); return sub; } @@ -288,17 +305,16 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, spin_lock_bh(&subscriber->lock); list_add(&sub->subscrp_list, &subscriber->subscrp_list); - tipc_subscrb_get(subscriber); sub->subscriber = subscriber; tipc_nametbl_subscribe(sub); + tipc_subscrb_get(subscriber); spin_unlock_bh(&subscriber->lock); + setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); timeout = htohl(sub->evt.s.timeout, swap); - if (timeout == TIPC_WAIT_FOREVER) - return; - setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); - mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); + if (timeout != TIPC_WAIT_FOREVER) + mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); } /* Handle one termination request for the subscriber */ diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index be60103082c9..ffdc214c117a 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -57,6 +57,7 @@ struct tipc_subscriber; * @evt: template for events generated by subscription */ struct tipc_subscription { + struct kref kref; struct tipc_subscriber *subscriber; struct net *net; struct timer_list timer; diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index b58dc95f3d35..46061cf48cd1 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -113,7 +113,7 @@ static void tipc_udp_media_addr_set(struct tipc_media_addr *addr, memcpy(addr->value, ua, sizeof(struct udp_media_addr)); if (tipc_udp_is_mcast_addr(ua)) - addr->broadcast = 1; + addr->broadcast = TIPC_BROADCAST_SUPPORT; } /* tipc_udp_addr2str - convert ip/udp address to string */ @@ -229,7 +229,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, goto out; } - if (!addr->broadcast || list_empty(&ub->rcast.list)) + if (addr->broadcast != TIPC_REPLICAST_SUPPORT) return tipc_udp_xmit(net, skb, ub, src, dst); /* Replicast, send an skb to each configured IP address */ @@ -296,7 +296,7 @@ static int tipc_udp_rcast_add(struct tipc_bearer *b, else if (ntohs(addr->proto) == ETH_P_IPV6) pr_info("New replicast peer: %pI6\n", &rcast->addr.ipv6); #endif - + b->bcast_addr.broadcast = TIPC_REPLICAST_SUPPORT; list_add_rcu(&rcast->list, &ub->rcast.list); return 0; } @@ -681,7 +681,7 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b, goto err; b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP; - b->bcast_addr.broadcast = 1; + b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT; rcu_assign_pointer(b->media_ptr, ub); rcu_assign_pointer(ub->bearer, b); tipc_udp_media_addr_set(&b->addr, &local); |