diff options
Diffstat (limited to 'net/tipc/socket.c')
| -rw-r--r-- | net/tipc/socket.c | 386 | 
1 files changed, 195 insertions, 191 deletions
| diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 46b6ed534ef2..1060d52ff23e 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -248,6 +248,22 @@ static void tsk_advance_rx_queue(struct sock *sk)  	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));  } +/* tipc_sk_respond() : send response message back to sender + */ +static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err) +{ +	u32 selector; +	u32 dnode; +	u32 onode = tipc_own_addr(sock_net(sk)); + +	if (!tipc_msg_reverse(onode, &skb, err)) +		return; + +	dnode = msg_destnode(buf_msg(skb)); +	selector = msg_origport(buf_msg(skb)); +	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); +} +  /**   * tsk_rej_rx_queue - reject all buffers in socket receive queue   * @@ -256,13 +272,9 @@ static void tsk_advance_rx_queue(struct sock *sk)  static void tsk_rej_rx_queue(struct sock *sk)  {  	struct sk_buff *skb; -	u32 dnode; -	u32 own_node = tsk_own_node(tipc_sk(sk)); -	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { -		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT)) -			tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0); -	} +	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) +		tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);  }  /* tsk_peer_msg - verify if message was sent by connected port's peer @@ -441,9 +453,7 @@ static int tipc_release(struct socket *sock)  				tsk->connected = 0;  				tipc_node_remove_conn(net, dnode, tsk->portid);  			} -			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -					     TIPC_ERR_NO_PORT)) -				tipc_link_xmit_skb(net, skb, dnode, 0); +			tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);  		}  	} @@ -456,7 +466,7 @@ static int tipc_release(struct socket *sock)  				      tsk_own_node(tsk), tsk_peer_port(tsk),  				      tsk->portid, TIPC_ERR_NO_PORT);  		if (skb) -			tipc_link_xmit_skb(net, skb, dnode, tsk->portid); +			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);  		tipc_node_remove_conn(net, dnode, tsk->portid);  	} @@ -686,21 +696,22 @@ new_mtu:  	do {  		rc = tipc_bclink_xmit(net, pktchain); -		if (likely(rc >= 0)) { -			rc = dsz; -			break; +		if (likely(!rc)) +			return dsz; + +		if (rc == -ELINKCONG) { +			tsk->link_cong = 1; +			rc = tipc_wait_for_sndmsg(sock, &timeo); +			if (!rc) +				continue;  		} +		__skb_queue_purge(pktchain);  		if (rc == -EMSGSIZE) {  			msg->msg_iter = save;  			goto new_mtu;  		} -		if (rc != -ELINKCONG) -			break; -		tipc_sk(sk)->link_cong = 1; -		rc = tipc_wait_for_sndmsg(sock, &timeo); -		if (rc) -			__skb_queue_purge(pktchain); -	} while (!rc); +		break; +	} while (1);  	return rc;  } @@ -763,35 +774,35 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  /**   * tipc_sk_proto_rcv - receive a connection mng protocol message   * @tsk: receiving socket - * @skb: pointer to message buffer. Set to NULL if buffer is consumed. + * @skb: pointer to message buffer.   */ -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb) +static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)  { -	struct tipc_msg *msg = buf_msg(*skb); +	struct sock *sk = &tsk->sk; +	struct tipc_msg *hdr = buf_msg(skb); +	int mtyp = msg_type(hdr);  	int conn_cong; -	u32 dnode; -	u32 own_node = tsk_own_node(tsk); +  	/* Ignore if connection cannot be validated: */ -	if (!tsk_peer_msg(tsk, msg)) +	if (!tsk_peer_msg(tsk, hdr))  		goto exit;  	tsk->probing_state = TIPC_CONN_OK; -	if (msg_type(msg) == CONN_ACK) { +	if (mtyp == CONN_PROBE) { +		msg_set_type(hdr, CONN_PROBE_REPLY); +		tipc_sk_respond(sk, skb, TIPC_OK); +		return; +	} else if (mtyp == CONN_ACK) {  		conn_cong = tsk_conn_cong(tsk); -		tsk->sent_unacked -= msg_msgcnt(msg); +		tsk->sent_unacked -= msg_msgcnt(hdr);  		if (conn_cong) -			tsk->sk.sk_write_space(&tsk->sk); -	} else if (msg_type(msg) == CONN_PROBE) { -		if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) { -			msg_set_type(msg, CONN_PROBE_REPLY); -			return; -		} +			sk->sk_write_space(sk); +	} else if (mtyp != CONN_PROBE_REPLY) { +		pr_warn("Received unknown CONN_PROTO msg\n");  	} -	/* Do nothing if msg_type() == CONN_PROBE_REPLY */  exit: -	kfree_skb(*skb); -	*skb = NULL; +	kfree_skb(skb);  }  static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) @@ -924,24 +935,25 @@ new_mtu:  	do {  		skb = skb_peek(pktchain);  		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; -		rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid); -		if (likely(rc >= 0)) { +		rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid); +		if (likely(!rc)) {  			if (sock->state != SS_READY)  				sock->state = SS_CONNECTING; -			rc = dsz; -			break; +			return dsz;  		} +		if (rc == -ELINKCONG) { +			tsk->link_cong = 1; +			rc = tipc_wait_for_sndmsg(sock, &timeo); +			if (!rc) +				continue; +		} +		__skb_queue_purge(pktchain);  		if (rc == -EMSGSIZE) {  			m->msg_iter = save;  			goto new_mtu;  		} -		if (rc != -ELINKCONG) -			break; -		tsk->link_cong = 1; -		rc = tipc_wait_for_sndmsg(sock, &timeo); -		if (rc) -			__skb_queue_purge(pktchain); -	} while (!rc); +		break; +	} while (1);  	return rc;  } @@ -1043,15 +1055,16 @@ next:  		return rc;  	do {  		if (likely(!tsk_conn_cong(tsk))) { -			rc = tipc_link_xmit(net, pktchain, dnode, portid); +			rc = tipc_node_xmit(net, pktchain, dnode, portid);  			if (likely(!rc)) {  				tsk->sent_unacked++;  				sent += send;  				if (sent == dsz) -					break; +					return dsz;  				goto next;  			}  			if (rc == -EMSGSIZE) { +				__skb_queue_purge(pktchain);  				tsk->max_pkt = tipc_node_get_mtu(net, dnode,  								 portid);  				m->msg_iter = save; @@ -1059,13 +1072,13 @@ next:  			}  			if (rc != -ELINKCONG)  				break; +  			tsk->link_cong = 1;  		}  		rc = tipc_wait_for_sndpkt(sock, &timeo); -		if (rc) -			__skb_queue_purge(pktchain);  	} while (!rc); +	__skb_queue_purge(pktchain);  	return sent ? sent : rc;  } @@ -1221,7 +1234,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)  		return;  	msg = buf_msg(skb);  	msg_set_msgcnt(msg, ack); -	tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg)); +	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));  }  static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) @@ -1507,82 +1520,81 @@ static void tipc_data_ready(struct sock *sk)   * @tsk: TIPC socket   * @skb: pointer to message buffer. Set to NULL if buffer is consumed   * - * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise + * Returns true if everything ok, false otherwise   */ -static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb) +static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)  {  	struct sock *sk = &tsk->sk;  	struct net *net = sock_net(sk);  	struct socket *sock = sk->sk_socket; -	struct tipc_msg *msg = buf_msg(*skb); -	int retval = -TIPC_ERR_NO_PORT; +	struct tipc_msg *hdr = buf_msg(skb); -	if (msg_mcast(msg)) -		return retval; +	if (unlikely(msg_mcast(hdr))) +		return false;  	switch ((int)sock->state) {  	case SS_CONNECTED: +  		/* Accept only connection-based messages sent by peer */ -		if (tsk_peer_msg(tsk, msg)) { -			if (unlikely(msg_errcode(msg))) { -				sock->state = SS_DISCONNECTING; -				tsk->connected = 0; -				/* let timer expire on it's own */ -				tipc_node_remove_conn(net, tsk_peer_node(tsk), -						      tsk->portid); -			} -			retval = TIPC_OK; +		if (unlikely(!tsk_peer_msg(tsk, hdr))) +			return false; + +		if (unlikely(msg_errcode(hdr))) { +			sock->state = SS_DISCONNECTING; +			tsk->connected = 0; +			/* Let timer expire on it's own */ +			tipc_node_remove_conn(net, tsk_peer_node(tsk), +					      tsk->portid);  		} -		break; +		return true; +  	case SS_CONNECTING: -		/* Accept only ACK or NACK message */ -		if (unlikely(!msg_connected(msg))) -			break; +		/* Accept only ACK or NACK message */ +		if (unlikely(!msg_connected(hdr))) +			return false; -		if (unlikely(msg_errcode(msg))) { +		if (unlikely(msg_errcode(hdr))) {  			sock->state = SS_DISCONNECTING;  			sk->sk_err = ECONNREFUSED; -			retval = TIPC_OK; -			break; +			return true;  		} -		if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) { +		if (unlikely(!msg_isdata(hdr))) {  			sock->state = SS_DISCONNECTING;  			sk->sk_err = EINVAL; -			retval = TIPC_OK; -			break; +			return true;  		} -		tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg)); -		msg_set_importance(&tsk->phdr, msg_importance(msg)); +		tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr)); +		msg_set_importance(&tsk->phdr, msg_importance(hdr));  		sock->state = SS_CONNECTED; -		/* If an incoming message is an 'ACK-', it should be -		 * discarded here because it doesn't contain useful -		 * data. In addition, we should try to wake up -		 * connect() routine if sleeping. -		 */ -		if (msg_data_sz(msg) == 0) { -			kfree_skb(*skb); -			*skb = NULL; -			if (waitqueue_active(sk_sleep(sk))) -				wake_up_interruptible(sk_sleep(sk)); -		} -		retval = TIPC_OK; -		break; +		/* If 'ACK+' message, add to socket receive queue */ +		if (msg_data_sz(hdr)) +			return true; + +		/* If empty 'ACK-' message, wake up sleeping connect() */ +		if (waitqueue_active(sk_sleep(sk))) +			wake_up_interruptible(sk_sleep(sk)); + +		/* 'ACK-' message is neither accepted nor rejected: */ +		msg_set_dest_droppable(hdr, 1); +		return false; +  	case SS_LISTENING:  	case SS_UNCONNECTED: +  		/* Accept only SYN message */ -		if (!msg_connected(msg) && !(msg_errcode(msg))) -			retval = TIPC_OK; +		if (!msg_connected(hdr) && !(msg_errcode(hdr))) +			return true;  		break;  	case SS_DISCONNECTING:  		break;  	default:  		pr_err("Unknown socket state %u\n", sock->state);  	} -	return retval; +	return false;  }  /** @@ -1617,61 +1629,70 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)  /**   * filter_rcv - validate incoming message   * @sk: socket - * @skb: pointer to message. Set to NULL if buffer is consumed. + * @skb: pointer to message.   *   * Enqueues message on receive queue if acceptable; optionally handles   * disconnect indication for a connected socket.   *   * Called with socket lock already taken   * - * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected + * Returns true if message was added to socket receive queue, otherwise false   */ -static int filter_rcv(struct sock *sk, struct sk_buff **skb) +static bool filter_rcv(struct sock *sk, struct sk_buff *skb)  {  	struct socket *sock = sk->sk_socket;  	struct tipc_sock *tsk = tipc_sk(sk); -	struct tipc_msg *msg = buf_msg(*skb); -	unsigned int limit = rcvbuf_limit(sk, *skb); -	int rc = TIPC_OK; +	struct tipc_msg *hdr = buf_msg(skb); +	unsigned int limit = rcvbuf_limit(sk, skb); +	int err = TIPC_OK; +	int usr = msg_user(hdr); -	if (unlikely(msg_user(msg) == CONN_MANAGER)) { +	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {  		tipc_sk_proto_rcv(tsk, skb); -		return TIPC_OK; +		return false;  	} -	if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { -		kfree_skb(*skb); +	if (unlikely(usr == SOCK_WAKEUP)) { +		kfree_skb(skb);  		tsk->link_cong = 0;  		sk->sk_write_space(sk); -		*skb = NULL; -		return TIPC_OK; +		return false;  	} -	/* Reject message if it is wrong sort of message for socket */ -	if (msg_type(msg) > TIPC_DIRECT_MSG) -		return -TIPC_ERR_NO_PORT; +	/* Drop if illegal message type */ +	if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { +		kfree_skb(skb); +		return false; +	} -	if (sock->state == SS_READY) { -		if (msg_connected(msg)) -			return -TIPC_ERR_NO_PORT; -	} else { -		rc = filter_connect(tsk, skb); -		if (rc != TIPC_OK || !*skb) -			return rc; +	/* Reject if wrong message type for current socket state */ +	if (unlikely(sock->state == SS_READY)) { +		if (msg_connected(hdr)) { +			err = TIPC_ERR_NO_PORT; +			goto reject; +		} +	} else if (unlikely(!filter_connect(tsk, skb))) { +		err = TIPC_ERR_NO_PORT; +		goto reject;  	}  	/* Reject message if there isn't room to queue it */ -	if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit) -		return -TIPC_ERR_OVERLOAD; +	if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { +		err = TIPC_ERR_OVERLOAD; +		goto reject; +	}  	/* Enqueue message */ -	TIPC_SKB_CB(*skb)->handle = NULL; -	__skb_queue_tail(&sk->sk_receive_queue, *skb); -	skb_set_owner_r(*skb, sk); +	TIPC_SKB_CB(skb)->handle = NULL; +	__skb_queue_tail(&sk->sk_receive_queue, skb); +	skb_set_owner_r(skb, sk);  	sk->sk_data_ready(sk); -	*skb = NULL; -	return TIPC_OK; +	return true; + +reject: +	tipc_sk_respond(sk, skb, err); +	return false;  }  /** @@ -1685,22 +1706,10 @@ static int filter_rcv(struct sock *sk, struct sk_buff **skb)   */  static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)  { -	int err; -	atomic_t *dcnt; -	u32 dnode; -	struct tipc_sock *tsk = tipc_sk(sk); -	struct net *net = sock_net(sk); -	uint truesize = skb->truesize; +	unsigned int truesize = skb->truesize; -	err = filter_rcv(sk, &skb); -	if (likely(!skb)) { -		dcnt = &tsk->dupl_rcvcnt; -		if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT) -			atomic_add(truesize, dcnt); -		return 0; -	} -	if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err)) -		tipc_link_xmit_skb(net, skb, dnode, tsk->portid); +	if (likely(filter_rcv(sk, skb))) +		atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);  	return 0;  } @@ -1710,45 +1719,43 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)   * @inputq: list of incoming buffers with potentially different destinations   * @sk: socket where the buffers should be enqueued   * @dport: port number for the socket - * @_skb: returned buffer to be forwarded or rejected, if applicable   *   * Caller must hold socket lock - * - * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD - * or -TIPC_ERR_NO_PORT   */ -static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, -			   u32 dport, struct sk_buff **_skb) +static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, +			    u32 dport)  {  	unsigned int lim;  	atomic_t *dcnt; -	int err;  	struct sk_buff *skb;  	unsigned long time_limit = jiffies + 2;  	while (skb_queue_len(inputq)) {  		if (unlikely(time_after_eq(jiffies, time_limit))) -			return TIPC_OK; +			return; +  		skb = tipc_skb_dequeue(inputq, dport);  		if (unlikely(!skb)) -			return TIPC_OK; +			return; + +		/* Add message directly to receive queue if possible */  		if (!sock_owned_by_user(sk)) { -			err = filter_rcv(sk, &skb); -			if (likely(!skb)) -				continue; -			*_skb = skb; -			return err; +			filter_rcv(sk, skb); +			continue;  		} + +		/* Try backlog, compensating for double-counted bytes */  		dcnt = &tipc_sk(sk)->dupl_rcvcnt;  		if (sk->sk_backlog.len)  			atomic_set(dcnt, 0);  		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);  		if (likely(!sk_add_backlog(sk, skb, lim)))  			continue; -		*_skb = skb; -		return -TIPC_ERR_OVERLOAD; + +		/* Overload => reject message back to sender */ +		tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD); +		break;  	} -	return TIPC_OK;  }  /** @@ -1756,49 +1763,46 @@ static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,   * @inputq: buffer list containing the buffers   * Consumes all buffers in list until inputq is empty   * Note: may be called in multiple threads referring to the same queue - * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH - * Only node local calls check the return value, sending single-buffer queues   */ -int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) +void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)  {  	u32 dnode, dport = 0;  	int err; -	struct sk_buff *skb;  	struct tipc_sock *tsk; -	struct tipc_net *tn;  	struct sock *sk; +	struct sk_buff *skb;  	while (skb_queue_len(inputq)) { -		err = -TIPC_ERR_NO_PORT; -		skb = NULL;  		dport = tipc_skb_peek_port(inputq, dport);  		tsk = tipc_sk_lookup(net, dport); +  		if (likely(tsk)) {  			sk = &tsk->sk;  			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { -				err = tipc_sk_enqueue(inputq, sk, dport, &skb); +				tipc_sk_enqueue(inputq, sk, dport);  				spin_unlock_bh(&sk->sk_lock.slock); -				dport = 0;  			}  			sock_put(sk); -		} else { -			skb = tipc_skb_dequeue(inputq, dport); -		} -		if (likely(!skb))  			continue; -		if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) -			goto xmit; -		if (!err) { -			dnode = msg_destnode(buf_msg(skb)); -			goto xmit;  		} -		tn = net_generic(net, tipc_net_id); -		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) + +		/* No destination socket => dequeue skb if still there */ +		skb = tipc_skb_dequeue(inputq, dport); +		if (!skb) +			return; + +		/* Try secondary lookup if unresolved named message */ +		err = TIPC_ERR_NO_PORT; +		if (tipc_msg_lookup_dest(net, skb, &err)) +			goto xmit; + +		/* Prepare for message rejection */ +		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))  			continue;  xmit: -		tipc_link_xmit_skb(net, skb, dnode, dport); +		dnode = msg_destnode(buf_msg(skb)); +		tipc_node_xmit_skb(net, skb, dnode, dport);  	} -	return err ? -EHOSTUNREACH : 0;  }  static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -2007,6 +2011,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)  	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);  	if (res)  		goto exit; +	security_sk_clone(sock->sk, new_sock->sk);  	new_sk = new_sock->sk;  	new_tsock = tipc_sk(new_sk); @@ -2066,7 +2071,10 @@ static int tipc_shutdown(struct socket *sock, int how)  	struct net *net = sock_net(sk);  	struct tipc_sock *tsk = tipc_sk(sk);  	struct sk_buff *skb; -	u32 dnode; +	u32 dnode = tsk_peer_node(tsk); +	u32 dport = tsk_peer_port(tsk); +	u32 onode = tipc_own_addr(net); +	u32 oport = tsk->portid;  	int res;  	if (how != SHUT_RDWR) @@ -2079,6 +2087,8 @@ static int tipc_shutdown(struct socket *sock, int how)  	case SS_CONNECTED:  restart: +		dnode = tsk_peer_node(tsk); +  		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */  		skb = __skb_dequeue(&sk->sk_receive_queue);  		if (skb) { @@ -2086,19 +2096,13 @@ restart:  				kfree_skb(skb);  				goto restart;  			} -			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -					     TIPC_CONN_SHUTDOWN)) -				tipc_link_xmit_skb(net, skb, dnode, -						   tsk->portid); +			tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);  		} else { -			dnode = tsk_peer_node(tsk); -  			skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,  					      TIPC_CONN_MSG, SHORT_H_SIZE, -					      0, dnode, tsk_own_node(tsk), -					      tsk_peer_port(tsk), -					      tsk->portid, TIPC_CONN_SHUTDOWN); -			tipc_link_xmit_skb(net, skb, dnode, tsk->portid); +					      0, dnode, onode, dport, oport, +					      TIPC_CONN_SHUTDOWN); +			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);  		}  		tsk->connected = 0;  		sock->state = SS_DISCONNECTING; @@ -2160,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data)  	}  	bh_unlock_sock(sk);  	if (skb) -		tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); +		tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);  exit:  	sock_put(sk);  } | 
