diff options
Diffstat (limited to 'net/tipc/socket.c')
| -rw-r--r-- | net/tipc/socket.c | 525 | 
1 files changed, 259 insertions, 266 deletions
| diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 800caaa699a1..43e4045e72bc 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -35,6 +35,8 @@   */  #include <linux/rhashtable.h> +#include <linux/sched/signal.h> +  #include "core.h"  #include "name_table.h"  #include "node.h" @@ -67,16 +69,19 @@ enum {   * @max_pkt: maximum packet size "hint" used when building messages sent by port   * @portid: unique port identity in TIPC socket hash table   * @phdr: preformatted message header used when sending messages + * #cong_links: list of congested links   * @publications: list of publications for port + * @blocking_link: address of the congested link we are currently sleeping on   * @pub_count: total # of publications port has made during its lifetime   * @probing_state:   * @conn_timeout: the time we can wait for an unresponded setup request   * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue - * @link_cong: non-zero if owner must sleep because of link congestion + * @cong_link_cnt: number of congested links   * @sent_unacked: # messages sent by socket, and not yet acked by peer   * @rcv_unacked: # messages read by user, but not yet acked back to peer   * @peer: 'connected' peer for dgram/rdm   * @node: hash table node + * @mc_method: cookie for use between socket and broadcast layer   * @rcu: rcu struct for tipc_sock   */  struct tipc_sock { @@ -87,13 +92,13 @@ struct tipc_sock {  	u32 max_pkt;  	u32 portid;  	struct tipc_msg phdr; -	struct list_head sock_list; +	struct list_head cong_links;  	struct list_head publications;  	u32 pub_count;  	uint conn_timeout;  	atomic_t dupl_rcvcnt;  	bool probe_unacked; -	bool link_cong; +	u16 cong_link_cnt;  	u16 snt_unacked;  	u16 snd_win;  	u16 peer_caps; @@ -101,6 +106,7 @@ struct tipc_sock {  	u16 rcv_win;  	struct sockaddr_tipc peer;  	struct rhash_head node; +	struct tipc_mc_method mc_method;  	struct rcu_head rcu;  }; @@ -110,7 +116,6 @@ static void tipc_write_space(struct sock *sk);  static void tipc_sock_destruct(struct sock *sk);  static int tipc_release(struct socket *sock);  static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);  static void tipc_sk_timeout(unsigned long data);  static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,  			   struct tipc_name_seq const *seq); @@ -119,8 +124,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);  static int tipc_sk_insert(struct tipc_sock *tsk);  static void tipc_sk_remove(struct tipc_sock *tsk); -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, -			      size_t dsz); +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);  static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);  static const struct proto_ops packet_ops; @@ -334,6 +338,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)  	return res;  } +static int tipc_sk_sock_err(struct socket *sock, long *timeout) +{ +	struct sock *sk = sock->sk; +	int err = sock_error(sk); +	int typ = sock->type; + +	if (err) +		return err; +	if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) { +		if (sk->sk_state == TIPC_DISCONNECTING) +			return -EPIPE; +		else if (!tipc_sk_connected(sk)) +			return -ENOTCONN; +	} +	if (!*timeout) +		return -EAGAIN; +	if (signal_pending(current)) +		return sock_intr_errno(*timeout); + +	return 0; +} + +#define tipc_wait_for_cond(sock_, timeout_, condition_)			\ +({								        \ +	int rc_ = 0;							\ +	int done_ = 0;							\ +									\ +	while (!(condition_) && !done_) {				\ +		struct sock *sk_ = sock->sk;				\ +		DEFINE_WAIT_FUNC(wait_, woken_wake_function);		\ +									\ +		rc_ = tipc_sk_sock_err(sock_, timeout_);		\ +		if (rc_)						\ +			break;						\ +		prepare_to_wait(sk_sleep(sk_), &wait_,			\ +				TASK_INTERRUPTIBLE);			\ +		done_ = sk_wait_event(sk_, timeout_,			\ +				      (condition_), &wait_);		\ +		remove_wait_queue(sk_sleep(sk_), &wait_);		\ +	}								\ +	rc_;								\ +}) +  /**   * tipc_sk_create - create a TIPC socket   * @net: network namespace (must be default network) @@ -382,10 +429,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  	tsk = tipc_sk(sk);  	tsk->max_pkt = MAX_PKT_DEFAULT;  	INIT_LIST_HEAD(&tsk->publications); +	INIT_LIST_HEAD(&tsk->cong_links);  	msg = &tsk->phdr;  	tn = net_generic(sock_net(sk), tipc_net_id); -	tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, -		      NAMED_H_SIZE, 0);  	/* Finish initializing socket data structures */  	sock->ops = ops; @@ -395,6 +441,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  		pr_warn("Socket create failed; port number exhausted\n");  		return -EINVAL;  	} + +	/* Ensure tsk is visible before we read own_addr. */ +	smp_mb(); + +	tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, +		      NAMED_H_SIZE, 0); +  	msg_set_origport(msg, tsk->portid);  	setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);  	sk->sk_shutdown = 0; @@ -432,9 +485,14 @@ static void __tipc_shutdown(struct socket *sock, int error)  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk);  	struct net *net = sock_net(sk); +	long timeout = CONN_TIMEOUT_DEFAULT;  	u32 dnode = tsk_peer_node(tsk);  	struct sk_buff *skb; +	/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */ +	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt && +					    !tsk_conn_cong(tsk))); +  	/* Reject all unreceived messages, except on an active connection  	 * (which disconnects locally & sends a 'FIN+' to peer).  	 */ @@ -505,7 +563,8 @@ static int tipc_release(struct socket *sock)  	/* Reject any messages that accumulated in backlog queue */  	release_sock(sk); - +	u32_list_purge(&tsk->cong_links); +	tsk->cong_link_cnt = 0;  	call_rcu(&tsk->rcu, tipc_sk_callback);  	sock->sk = NULL; @@ -648,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,  	switch (sk->sk_state) {  	case TIPC_ESTABLISHED: -		if (!tsk->link_cong && !tsk_conn_cong(tsk)) +		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))  			mask |= POLLOUT;  		/* fall thru' */  	case TIPC_LISTEN: @@ -657,7 +716,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,  			mask |= (POLLIN | POLLRDNORM);  		break;  	case TIPC_OPEN: -		if (!tsk->link_cong) +		if (!tsk->cong_link_cnt)  			mask |= POLLOUT;  		if (tipc_sk_type_connectionless(sk) &&  		    (!skb_queue_empty(&sk->sk_receive_queue))) @@ -676,63 +735,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,   * @sock: socket structure   * @seq: destination address   * @msg: message to send - * @dsz: total length of message data - * @timeo: timeout to wait for wakeup + * @dlen: length of data to send + * @timeout: timeout to wait for wakeup   *   * Called from function tipc_sendmsg(), which has done all sanity checks   * Returns the number of bytes sent on success, or errno   */  static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq, -			  struct msghdr *msg, size_t dsz, long timeo) +			  struct msghdr *msg, size_t dlen, long timeout)  {  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_msg *hdr = &tsk->phdr;  	struct net *net = sock_net(sk); -	struct tipc_msg *mhdr = &tsk->phdr; -	struct sk_buff_head pktchain; -	struct iov_iter save = msg->msg_iter; -	uint mtu; +	int mtu = tipc_bcast_get_mtu(net); +	struct tipc_mc_method *method = &tsk->mc_method; +	u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE); +	struct sk_buff_head pkts; +	struct tipc_nlist dsts;  	int rc; -	if (!timeo && tsk->link_cong) -		return -ELINKCONG; +	/* Block or return if any destination link is congested */ +	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); +	if (unlikely(rc)) +		return rc; -	msg_set_type(mhdr, TIPC_MCAST_MSG); -	msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); -	msg_set_destport(mhdr, 0); -	msg_set_destnode(mhdr, 0); -	msg_set_nametype(mhdr, seq->type); -	msg_set_namelower(mhdr, seq->lower); -	msg_set_nameupper(mhdr, seq->upper); -	msg_set_hdr_sz(mhdr, MCAST_H_SIZE); +	/* Lookup destination nodes */ +	tipc_nlist_init(&dsts, tipc_own_addr(net)); +	tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower, +				      seq->upper, domain, &dsts); +	if (!dsts.local && !dsts.remote) +		return -EHOSTUNREACH; -	skb_queue_head_init(&pktchain); +	/* Build message header */ +	msg_set_type(hdr, TIPC_MCAST_MSG); +	msg_set_hdr_sz(hdr, MCAST_H_SIZE); +	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); +	msg_set_destport(hdr, 0); +	msg_set_destnode(hdr, 0); +	msg_set_nametype(hdr, seq->type); +	msg_set_namelower(hdr, seq->lower); +	msg_set_nameupper(hdr, seq->upper); -new_mtu: -	mtu = tipc_bcast_get_mtu(net); -	rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); -	if (unlikely(rc < 0)) -		return rc; +	/* Build message as chain of buffers */ +	skb_queue_head_init(&pkts); +	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); -	do { -		rc = tipc_bcast_xmit(net, &pktchain); -		if (likely(!rc)) -			return dsz; +	/* Send message if build was successful */ +	if (unlikely(rc == dlen)) +		rc = tipc_mcast_xmit(net, &pkts, method, &dsts, +				     &tsk->cong_link_cnt); -		if (rc == -ELINKCONG) { -			tsk->link_cong = 1; -			rc = tipc_wait_for_sndmsg(sock, &timeo); -			if (!rc) -				continue; -		} -		__skb_queue_purge(&pktchain); -		if (rc == -EMSGSIZE) { -			msg->msg_iter = save; -			goto new_mtu; -		} -		break; -	} while (1); -	return rc; +	tipc_nlist_purge(&dsts); + +	return rc ? rc : dlen;  }  /** @@ -746,7 +802,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  		       struct sk_buff_head *inputq)  {  	struct tipc_msg *msg; -	struct tipc_plist dports; +	struct list_head dports;  	u32 portid;  	u32 scope = TIPC_CLUSTER_SCOPE;  	struct sk_buff_head tmpq; @@ -754,7 +810,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  	struct sk_buff *skb, *_skb;  	__skb_queue_head_init(&tmpq); -	tipc_plist_init(&dports); +	INIT_LIST_HEAD(&dports);  	skb = tipc_skb_peek(arrvq, &inputq->lock);  	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { @@ -768,8 +824,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  		tipc_nametbl_mc_translate(net,  					  msg_nametype(msg), msg_namelower(msg),  					  msg_nameupper(msg), scope, &dports); -		portid = tipc_plist_pop(&dports); -		for (; portid; portid = tipc_plist_pop(&dports)) { +		portid = u32_pop(&dports); +		for (; portid; portid = u32_pop(&dports)) {  			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);  			if (_skb) {  				msg_set_destport(buf_msg(_skb), portid); @@ -830,31 +886,6 @@ exit:  	kfree_skb(skb);  } -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) -{ -	DEFINE_WAIT_FUNC(wait, woken_wake_function); -	struct sock *sk = sock->sk; -	struct tipc_sock *tsk = tipc_sk(sk); -	int done; - -	do { -		int err = sock_error(sk); -		if (err) -			return err; -		if (sk->sk_shutdown & SEND_SHUTDOWN) -			return -EPIPE; -		if (!*timeo_p) -			return -EAGAIN; -		if (signal_pending(current)) -			return sock_intr_errno(*timeo_p); - -		add_wait_queue(sk_sleep(sk), &wait); -		done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait); -		remove_wait_queue(sk_sleep(sk), &wait); -	} while (!done); -	return 0; -} -  /**   * tipc_sendmsg - send message in connectionless manner   * @sock: socket structure @@ -881,35 +912,38 @@ static int tipc_sendmsg(struct socket *sock,  	return ret;  } -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)  { -	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);  	struct sock *sk = sock->sk; -	struct tipc_sock *tsk = tipc_sk(sk);  	struct net *net = sock_net(sk); -	struct tipc_msg *mhdr = &tsk->phdr; -	u32 dnode, dport; -	struct sk_buff_head pktchain; -	bool is_connectionless = tipc_sk_type_connectionless(sk); -	struct sk_buff *skb; +	struct tipc_sock *tsk = tipc_sk(sk); +	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); +	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); +	struct list_head *clinks = &tsk->cong_links; +	bool syn = !tipc_sk_type_connectionless(sk); +	struct tipc_msg *hdr = &tsk->phdr;  	struct tipc_name_seq *seq; -	struct iov_iter save; -	u32 mtu; -	long timeo; -	int rc; +	struct sk_buff_head pkts; +	u32 type, inst, domain; +	u32 dnode, dport; +	int mtu, rc; -	if (dsz > TIPC_MAX_USER_MSG_SIZE) +	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))  		return -EMSGSIZE; +  	if (unlikely(!dest)) { -		if (is_connectionless && tsk->peer.family == AF_TIPC) -			dest = &tsk->peer; -		else +		dest = &tsk->peer; +		if (!syn || dest->family != AF_TIPC)  			return -EDESTADDRREQ; -	} else if (unlikely(m->msg_namelen < sizeof(*dest)) || -		   dest->family != AF_TIPC) { -		return -EINVAL;  	} -	if (!is_connectionless) { + +	if (unlikely(m->msg_namelen < sizeof(*dest))) +		return -EINVAL; + +	if (unlikely(dest->family != AF_TIPC)) +		return -EINVAL; + +	if (unlikely(syn)) {  		if (sk->sk_state == TIPC_LISTEN)  			return -EPIPE;  		if (sk->sk_state != TIPC_OPEN) @@ -921,102 +955,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)  			tsk->conn_instance = dest->addr.name.name.instance;  		}  	} -	seq = &dest->addr.nameseq; -	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); -	if (dest->addrtype == TIPC_ADDR_MCAST) { -		return tipc_sendmcast(sock, seq, m, dsz, timeo); -	} else if (dest->addrtype == TIPC_ADDR_NAME) { -		u32 type = dest->addr.name.name.type; -		u32 inst = dest->addr.name.name.instance; -		u32 domain = dest->addr.name.domain; +	seq = &dest->addr.nameseq; +	if (dest->addrtype == TIPC_ADDR_MCAST) +		return tipc_sendmcast(sock, seq, m, dlen, timeout); +	if (dest->addrtype == TIPC_ADDR_NAME) { +		type = dest->addr.name.name.type; +		inst = dest->addr.name.name.instance; +		domain = dest->addr.name.domain;  		dnode = domain; -		msg_set_type(mhdr, TIPC_NAMED_MSG); -		msg_set_hdr_sz(mhdr, NAMED_H_SIZE); -		msg_set_nametype(mhdr, type); -		msg_set_nameinst(mhdr, inst); -		msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); +		msg_set_type(hdr, TIPC_NAMED_MSG); +		msg_set_hdr_sz(hdr, NAMED_H_SIZE); +		msg_set_nametype(hdr, type); +		msg_set_nameinst(hdr, inst); +		msg_set_lookup_scope(hdr, tipc_addr_scope(domain));  		dport = tipc_nametbl_translate(net, type, inst, &dnode); -		msg_set_destnode(mhdr, dnode); -		msg_set_destport(mhdr, dport); +		msg_set_destnode(hdr, dnode); +		msg_set_destport(hdr, dport);  		if (unlikely(!dport && !dnode))  			return -EHOSTUNREACH; +  	} else if (dest->addrtype == TIPC_ADDR_ID) {  		dnode = dest->addr.id.node; -		msg_set_type(mhdr, TIPC_DIRECT_MSG); -		msg_set_lookup_scope(mhdr, 0); -		msg_set_destnode(mhdr, dnode); -		msg_set_destport(mhdr, dest->addr.id.ref); -		msg_set_hdr_sz(mhdr, BASIC_H_SIZE); +		msg_set_type(hdr, TIPC_DIRECT_MSG); +		msg_set_lookup_scope(hdr, 0); +		msg_set_destnode(hdr, dnode); +		msg_set_destport(hdr, dest->addr.id.ref); +		msg_set_hdr_sz(hdr, BASIC_H_SIZE);  	} -	skb_queue_head_init(&pktchain); -	save = m->msg_iter; -new_mtu: -	mtu = tipc_node_get_mtu(net, dnode, tsk->portid); -	rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain); -	if (rc < 0) +	/* Block or return if destination link is congested */ +	rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); +	if (unlikely(rc))  		return rc; -	do { -		skb = skb_peek(&pktchain); -		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; -		rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); -		if (likely(!rc)) { -			if (!is_connectionless) -				tipc_set_sk_state(sk, TIPC_CONNECTING); -			return dsz; -		} -		if (rc == -ELINKCONG) { -			tsk->link_cong = 1; -			rc = tipc_wait_for_sndmsg(sock, &timeo); -			if (!rc) -				continue; -		} -		__skb_queue_purge(&pktchain); -		if (rc == -EMSGSIZE) { -			m->msg_iter = save; -			goto new_mtu; -		} -		break; -	} while (1); - -	return rc; -} +	skb_queue_head_init(&pkts); +	mtu = tipc_node_get_mtu(net, dnode, tsk->portid); +	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); +	if (unlikely(rc != dlen)) +		return rc; -static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) -{ -	DEFINE_WAIT_FUNC(wait, woken_wake_function); -	struct sock *sk = sock->sk; -	struct tipc_sock *tsk = tipc_sk(sk); -	int done; +	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); +	if (unlikely(rc == -ELINKCONG)) { +		u32_push(clinks, dnode); +		tsk->cong_link_cnt++; +		rc = 0; +	} -	do { -		int err = sock_error(sk); -		if (err) -			return err; -		if (sk->sk_state == TIPC_DISCONNECTING) -			return -EPIPE; -		else if (!tipc_sk_connected(sk)) -			return -ENOTCONN; -		if (!*timeo_p) -			return -EAGAIN; -		if (signal_pending(current)) -			return sock_intr_errno(*timeo_p); +	if (unlikely(syn && !rc)) +		tipc_set_sk_state(sk, TIPC_CONNECTING); -		add_wait_queue(sk_sleep(sk), &wait); -		done = sk_wait_event(sk, timeo_p, -				     (!tsk->link_cong && -				      !tsk_conn_cong(tsk)) || -				      !tipc_sk_connected(sk), &wait); -		remove_wait_queue(sk_sleep(sk), &wait); -	} while (!done); -	return 0; +	return rc ? rc : dlen;  }  /** - * tipc_send_stream - send stream-oriented data + * tipc_sendstream - send stream-oriented data   * @sock: socket structure   * @m: data to send   * @dsz: total length of data to be transmitted @@ -1026,94 +1020,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)   * Returns the number of bytes sent on success (or partial success),   * or errno if no data sent   */ -static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)  {  	struct sock *sk = sock->sk;  	int ret;  	lock_sock(sk); -	ret = __tipc_send_stream(sock, m, dsz); +	ret = __tipc_sendstream(sock, m, dsz);  	release_sock(sk);  	return ret;  } -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)  {  	struct sock *sk = sock->sk; -	struct net *net = sock_net(sk); -	struct tipc_sock *tsk = tipc_sk(sk); -	struct tipc_msg *mhdr = &tsk->phdr; -	struct sk_buff_head pktchain;  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); -	u32 portid = tsk->portid; -	int rc = -EINVAL; -	long timeo; -	u32 dnode; -	uint mtu, send, sent = 0; -	struct iov_iter save; -	int hlen = MIN_H_SIZE; - -	/* Handle implied connection establishment */ -	if (unlikely(dest)) { -		rc = __tipc_sendmsg(sock, m, dsz); -		hlen = msg_hdr_sz(mhdr); -		if (dsz && (dsz == rc)) -			tsk->snt_unacked = tsk_inc(tsk, dsz + hlen); -		return rc; -	} -	if (dsz > (uint)INT_MAX) -		return -EMSGSIZE; - -	if (unlikely(!tipc_sk_connected(sk))) { -		if (sk->sk_state == TIPC_DISCONNECTING) -			return -EPIPE; -		else -			return -ENOTCONN; -	} +	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_msg *hdr = &tsk->phdr; +	struct net *net = sock_net(sk); +	struct sk_buff_head pkts; +	u32 dnode = tsk_peer_node(tsk); +	int send, sent = 0; +	int rc = 0; -	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); -	if (!timeo && tsk->link_cong) -		return -ELINKCONG; +	skb_queue_head_init(&pkts); -	dnode = tsk_peer_node(tsk); -	skb_queue_head_init(&pktchain); +	if (unlikely(dlen > INT_MAX)) +		return -EMSGSIZE; -next: -	save = m->msg_iter; -	mtu = tsk->max_pkt; -	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); -	rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); -	if (unlikely(rc < 0)) +	/* Handle implicit connection setup */ +	if (unlikely(dest)) { +		rc = __tipc_sendmsg(sock, m, dlen); +		if (dlen && (dlen == rc)) +			tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));  		return rc; +	}  	do { -		if (likely(!tsk_conn_cong(tsk))) { -			rc = tipc_node_xmit(net, &pktchain, dnode, portid); -			if (likely(!rc)) { -				tsk->snt_unacked += tsk_inc(tsk, send + hlen); -				sent += send; -				if (sent == dsz) -					return dsz; -				goto next; -			} -			if (rc == -EMSGSIZE) { -				__skb_queue_purge(&pktchain); -				tsk->max_pkt = tipc_node_get_mtu(net, dnode, -								 portid); -				m->msg_iter = save; -				goto next; -			} -			if (rc != -ELINKCONG) -				break; +		rc = tipc_wait_for_cond(sock, &timeout, +					(!tsk->cong_link_cnt && +					 !tsk_conn_cong(tsk) && +					 tipc_sk_connected(sk))); +		if (unlikely(rc)) +			break; -			tsk->link_cong = 1; +		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); +		rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); +		if (unlikely(rc != send)) +			break; + +		rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); +		if (unlikely(rc == -ELINKCONG)) { +			tsk->cong_link_cnt = 1; +			rc = 0;  		} -		rc = tipc_wait_for_sndpkt(sock, &timeo); -	} while (!rc); +		if (likely(!rc)) { +			tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); +			sent += send; +		} +	} while (sent < dlen && !rc); -	__skb_queue_purge(&pktchain); -	return sent ? sent : rc; +	return rc ? rc : sent;  }  /** @@ -1131,7 +1100,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)  	if (dsz > TIPC_MAX_USER_MSG_SIZE)  		return -EMSGSIZE; -	return tipc_send_stream(sock, m, dsz); +	return tipc_sendstream(sock, m, dsz);  }  /* tipc_sk_finish_conn - complete the setup of a connection @@ -1698,6 +1667,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,  	unsigned int limit = rcvbuf_limit(sk, skb);  	int err = TIPC_OK;  	int usr = msg_user(hdr); +	u32 onode;  	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {  		tipc_sk_proto_rcv(tsk, skb, xmitq); @@ -1705,8 +1675,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,  	}  	if (unlikely(usr == SOCK_WAKEUP)) { +		onode = msg_orignode(hdr);  		kfree_skb(skb); -		tsk->link_cong = 0; +		u32_del(&tsk->cong_links, onode); +		tsk->cong_link_cnt--;  		sk->sk_write_space(sk);  		return false;  	} @@ -2114,7 +2086,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)  		struct msghdr m = {NULL,};  		tsk_advance_rx_queue(sk); -		__tipc_send_stream(new_sock, &m, 0); +		__tipc_sendstream(new_sock, &m, 0);  	} else {  		__skb_dequeue(&sk->sk_receive_queue);  		__skb_queue_head(&new_sk->sk_receive_queue, buf); @@ -2269,24 +2241,27 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  void tipc_sk_reinit(struct net *net)  {  	struct tipc_net *tn = net_generic(net, tipc_net_id); -	const struct bucket_table *tbl; -	struct rhash_head *pos; +	struct rhashtable_iter iter;  	struct tipc_sock *tsk;  	struct tipc_msg *msg; -	int i; -	rcu_read_lock(); -	tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht); -	for (i = 0; i < tbl->size; i++) { -		rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { +	rhashtable_walk_enter(&tn->sk_rht, &iter); + +	do { +		tsk = ERR_PTR(rhashtable_walk_start(&iter)); +		if (tsk) +			continue; + +		while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {  			spin_lock_bh(&tsk->sk.sk_lock.slock);  			msg = &tsk->phdr;  			msg_set_prevnode(msg, tn->own_addr);  			msg_set_orignode(msg, tn->own_addr);  			spin_unlock_bh(&tsk->sk.sk_lock.slock);  		} -	} -	rcu_read_unlock(); + +		rhashtable_walk_stop(&iter); +	} while (tsk == ERR_PTR(-EAGAIN));  }  static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid) @@ -2382,18 +2357,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,  {  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); -	u32 value; -	int res; +	u32 value = 0; +	int res = 0;  	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))  		return 0;  	if (lvl != SOL_TIPC)  		return -ENOPROTOOPT; -	if (ol < sizeof(value)) -		return -EINVAL; -	res = get_user(value, (u32 __user *)ov); -	if (res) -		return res; + +	switch (opt) { +	case TIPC_IMPORTANCE: +	case TIPC_SRC_DROPPABLE: +	case TIPC_DEST_DROPPABLE: +	case TIPC_CONN_TIMEOUT: +		if (ol < sizeof(value)) +			return -EINVAL; +		res = get_user(value, (u32 __user *)ov); +		if (res) +			return res; +		break; +	default: +		if (ov || ol) +			return -EINVAL; +	}  	lock_sock(sk); @@ -2412,7 +2398,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,  		break;  	case TIPC_CONN_TIMEOUT:  		tipc_sk(sk)->conn_timeout = value; -		/* no need to set "res", since already 0 at this point */ +		break; +	case TIPC_MCAST_BROADCAST: +		tsk->mc_method.rcast = false; +		tsk->mc_method.mandatory = true; +		break; +	case TIPC_MCAST_REPLICAST: +		tsk->mc_method.rcast = true; +		tsk->mc_method.mandatory = true;  		break;  	default:  		res = -EINVAL; @@ -2575,7 +2568,7 @@ static const struct proto_ops stream_ops = {  	.shutdown	= tipc_shutdown,  	.setsockopt	= tipc_setsockopt,  	.getsockopt	= tipc_getsockopt, -	.sendmsg	= tipc_send_stream, +	.sendmsg	= tipc_sendstream,  	.recvmsg	= tipc_recv_stream,  	.mmap		= sock_no_mmap,  	.sendpage	= sock_no_sendpage | 
