From d999297c3dbbe7fdd832f7fa4ec84301e170b3e6 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 16 Jul 2015 16:54:31 -0400 Subject: tipc: reduce locking scope during packet reception We convert packet/message reception according to the same principle we have been using for message sending and timeout handling: We move the function tipc_rcv() to node.c, hence handling the initial packet reception at the link aggregation level. The function grabs the node lock, selects the receiving link, and accesses it via a new call tipc_link_rcv(). This function appends buffers to the input queue for delivery upwards, but it may also append outgoing packets to the xmit queue, just as we do during regular message sending. The latter will happen when buffers are forwarded from the link backlog, or when retransmission is requested. Upon return of this function, and after having released the node lock, tipc_rcv() delivers/tranmsits the contents of those queues, but it may also perform actions such as link activation or reset, as indicated by the return flags from the link. This reduces the number of cpu cycles spent inside the node spinlock, and reduces contention on that lock. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/msg.h | 50 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 4 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 4dc66d9f69cc..2f1563b47e24 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -38,6 +38,7 @@ #define _TIPC_MSG_H #include +#include "core.h" /* * Constants and routines used to read and write TIPC payload message headers @@ -658,12 +659,12 @@ static inline void msg_set_link_selector(struct tipc_msg *m, u32 n) /* * Word 5 */ -static inline u32 msg_session(struct tipc_msg *m) +static inline u16 msg_session(struct tipc_msg *m) { return msg_bits(m, 5, 16, 0xffff); } -static inline void msg_set_session(struct tipc_msg *m, u32 n) +static inline void msg_set_session(struct tipc_msg *m, u16 n) { msg_set_bits(m, 5, 16, 0xffff, n); } @@ -766,10 +767,19 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } -static inline bool msg_peer_is_up(struct tipc_msg *m) +static inline bool msg_is_traffic(struct tipc_msg *m) { - if (likely(msg_user(m) != LINK_PROTOCOL) || (msg_type(m) == STATE_MSG)) + if (likely(msg_user(m) != LINK_PROTOCOL)) return true; + if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG)) + return false; + return true; +} + +static inline bool msg_peer_is_up(struct tipc_msg *m) +{ + if (likely(msg_is_traffic(m))) + return false; return msg_redundant_link(m); } @@ -886,4 +896,36 @@ static inline bool tipc_skb_queue_tail(struct sk_buff_head *list, return rv; } +/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number + * @list: list to be appended to + * @skb: buffer to add + * Returns true if queue should treated further, otherwise false + */ +static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list, + struct sk_buff *skb) +{ + struct sk_buff *_skb, *tmp; + struct tipc_msg *hdr = buf_msg(skb); + u16 seqno = msg_seqno(hdr); + + if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) { + __skb_queue_head(list, skb); + return true; + } + if (likely(less(seqno, buf_seqno(skb_peek(list))))) { + __skb_queue_head(list, skb); + return true; + } + if (!more(seqno, buf_seqno(skb_peek_tail(list)))) { + skb_queue_walk_safe(list, _skb, tmp) { + if (likely(less(seqno, buf_seqno(_skb)))) { + __skb_queue_before(list, _skb, skb); + return true; + } + } + } + __skb_queue_tail(list, skb); + return false; +} + #endif -- cgit v1.2.3-70-g09d2