diff options
| author | Linus Torvalds <torvalds@linux-foundation.org> | 2018-04-03 14:04:18 -0700 | 
|---|---|---|
| committer | Linus Torvalds <torvalds@linux-foundation.org> | 2018-04-03 14:04:18 -0700 | 
| commit | 5bb053bef82523a8fd78d650bca81c9f114fa276 (patch) | |
| tree | 58c2fe47f60bb69230bb05d57a6c9e3f47f7b1fe /net/tipc/server.c | |
| parent | bb2407a7219760926760f0448fddf00d625e5aec (diff) | |
| parent | 159f02977b2feb18a4bece5e586c838a6d26d44b (diff) | |
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next
Pull networking updates from David Miller:
 1) Support offloading wireless authentication to userspace via
    NL80211_CMD_EXTERNAL_AUTH, from Srinivas Dasari.
 2) A lot of work on network namespace setup/teardown from Kirill Tkhai.
    Setup and cleanup of namespaces now all run asynchronously and thus
    performance is significantly increased.
 3) Add rx/tx timestamping support to mv88e6xxx driver, from Brandon
    Streiff.
 4) Support zerocopy on RDS sockets, from Sowmini Varadhan.
 5) Use denser instruction encoding in x86 eBPF JIT, from Daniel
    Borkmann.
 6) Support hw offload of vlan filtering in mvpp2 dreiver, from Maxime
    Chevallier.
 7) Support grafting of child qdiscs in mlxsw driver, from Nogah
    Frankel.
 8) Add packet forwarding tests to selftests, from Ido Schimmel.
 9) Deal with sub-optimal GSO packets better in BBR congestion control,
    from Eric Dumazet.
10) Support 5-tuple hashing in ipv6 multipath routing, from David Ahern.
11) Add path MTU tests to selftests, from Stefano Brivio.
12) Various bits of IPSEC offloading support for mlx5, from Aviad
    Yehezkel, Yossi Kuperman, and Saeed Mahameed.
13) Support RSS spreading on ntuple filters in SFC driver, from Edward
    Cree.
14) Lots of sockmap work from John Fastabend. Applications can use eBPF
    to filter sendmsg and sendpage operations.
15) In-kernel receive TLS support, from Dave Watson.
16) Add XDP support to ixgbevf, this is significant because it should
    allow optimized XDP usage in various cloud environments. From Tony
    Nguyen.
17) Add new Intel E800 series "ice" ethernet driver, from Anirudh
    Venkataramanan et al.
18) IP fragmentation match offload support in nfp driver, from Pieter
    Jansen van Vuuren.
19) Support XDP redirect in i40e driver, from Björn Töpel.
20) Add BPF_RAW_TRACEPOINT program type for accessing the arguments of
    tracepoints in their raw form, from Alexei Starovoitov.
21) Lots of striding RQ improvements to mlx5 driver with many
    performance improvements, from Tariq Toukan.
22) Use rhashtable for inet frag reassembly, from Eric Dumazet.
* git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next: (1678 commits)
  net: mvneta: improve suspend/resume
  net: mvneta: split rxq/txq init and txq deinit into SW and HW parts
  ipv6: frags: fix /proc/sys/net/ipv6/ip6frag_low_thresh
  net: bgmac: Fix endian access in bgmac_dma_tx_ring_free()
  net: bgmac: Correctly annotate register space
  route: check sysctl_fib_multipath_use_neigh earlier than hash
  fix typo in command value in drivers/net/phy/mdio-bitbang.
  sky2: Increase D3 delay to sky2 stops working after suspend
  net/mlx5e: Set EQE based as default TX interrupt moderation mode
  ibmvnic: Disable irqs before exiting reset from closed state
  net: sched: do not emit messages while holding spinlock
  vlan: also check phy_driver ts_info for vlan's real device
  Bluetooth: Mark expected switch fall-throughs
  Bluetooth: Set HCI_QUIRK_SIMULTANEOUS_DISCOVERY for BTUSB_QCA_ROME
  Bluetooth: btrsi: remove unused including <linux/version.h>
  Bluetooth: hci_bcm: Remove DMI quirk for the MINIX Z83-4
  sh_eth: kill useless check in __sh_eth_get_regs()
  sh_eth: add sh_eth_cpu_data::no_xdfar flag
  ipv6: factorize sk_wmem_alloc updates done by __ip6_append_data()
  ipv4: factorize sk_wmem_alloc updates done by __ip_append_data()
  ...
Diffstat (limited to 'net/tipc/server.c')
| -rw-r--r-- | net/tipc/server.c | 710 | 
1 files changed, 0 insertions, 710 deletions
| diff --git a/net/tipc/server.c b/net/tipc/server.c deleted file mode 100644 index df0c563c90cd..000000000000 --- a/net/tipc/server.c +++ /dev/null @@ -1,710 +0,0 @@ -/* - * net/tipc/server.c: TIPC server infrastructure - * - * Copyright (c) 2012-2013, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - *    notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - *    notice, this list of conditions and the following disclaimer in the - *    documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - *    contributors may be used to endorse or promote products derived from - *    this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "server.h" -#include "core.h" -#include "socket.h" -#include "addr.h" -#include "msg.h" -#include <net/sock.h> -#include <linux/module.h> - -/* Number of messages to send before rescheduling */ -#define MAX_SEND_MSG_COUNT	25 -#define MAX_RECV_MSG_COUNT	25 -#define CF_CONNECTED		1 -#define CF_SERVER		2 - -#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) - -/** - * struct tipc_conn - TIPC connection structure - * @kref: reference counter to connection object - * @conid: connection identifier - * @sock: socket handler associated with connection - * @flags: indicates connection state - * @server: pointer to connected server - * @rwork: receive work item - * @usr_data: user-specified field - * @rx_action: what to do when connection socket is active - * @outqueue: pointer to first outbound message in queue - * @outqueue_lock: control access to the outqueue - * @outqueue: list of connection objects for its server - * @swork: send work item - */ -struct tipc_conn { -	struct kref kref; -	int conid; -	struct socket *sock; -	unsigned long flags; -	struct tipc_server *server; -	struct work_struct rwork; -	int (*rx_action) (struct tipc_conn *con); -	void *usr_data; -	struct list_head outqueue; -	spinlock_t outqueue_lock; -	struct work_struct swork; -}; - -/* An entry waiting to be sent */ -struct outqueue_entry { -	struct list_head list; -	struct kvec iov; -	struct sockaddr_tipc dest; -}; - -static void tipc_recv_work(struct work_struct *work); -static void tipc_send_work(struct work_struct *work); -static void tipc_clean_outqueues(struct tipc_conn *con); - -static void tipc_conn_kref_release(struct kref *kref) -{ -	struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); -	struct tipc_server *s = con->server; -	struct sockaddr_tipc *saddr = s->saddr; -	struct socket *sock = con->sock; -	struct sock *sk; - -	if (sock) { -		sk = sock->sk; -		if (test_bit(CF_SERVER, &con->flags)) { -			__module_get(sock->ops->owner); -			__module_get(sk->sk_prot_creator->owner); -		} -		saddr->scope = -TIPC_NODE_SCOPE; -		kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); -		sock_release(sock); -		con->sock = NULL; -	} -	spin_lock_bh(&s->idr_lock); -	idr_remove(&s->conn_idr, con->conid); -	s->idr_in_use--; -	spin_unlock_bh(&s->idr_lock); -	tipc_clean_outqueues(con); -	kfree(con); -} - -static void conn_put(struct tipc_conn *con) -{ -	kref_put(&con->kref, tipc_conn_kref_release); -} - -static void conn_get(struct tipc_conn *con) -{ -	kref_get(&con->kref); -} - -static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) -{ -	struct tipc_conn *con; - -	spin_lock_bh(&s->idr_lock); -	con = idr_find(&s->conn_idr, conid); -	if (con) { -		if (!test_bit(CF_CONNECTED, &con->flags) || -		    !kref_get_unless_zero(&con->kref)) -			con = NULL; -	} -	spin_unlock_bh(&s->idr_lock); -	return con; -} - -static void sock_data_ready(struct sock *sk) -{ -	struct tipc_conn *con; - -	read_lock_bh(&sk->sk_callback_lock); -	con = sock2con(sk); -	if (con && test_bit(CF_CONNECTED, &con->flags)) { -		conn_get(con); -		if (!queue_work(con->server->rcv_wq, &con->rwork)) -			conn_put(con); -	} -	read_unlock_bh(&sk->sk_callback_lock); -} - -static void sock_write_space(struct sock *sk) -{ -	struct tipc_conn *con; - -	read_lock_bh(&sk->sk_callback_lock); -	con = sock2con(sk); -	if (con && test_bit(CF_CONNECTED, &con->flags)) { -		conn_get(con); -		if (!queue_work(con->server->send_wq, &con->swork)) -			conn_put(con); -	} -	read_unlock_bh(&sk->sk_callback_lock); -} - -static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) -{ -	struct sock *sk = sock->sk; - -	write_lock_bh(&sk->sk_callback_lock); - -	sk->sk_data_ready = sock_data_ready; -	sk->sk_write_space = sock_write_space; -	sk->sk_user_data = con; - -	con->sock = sock; - -	write_unlock_bh(&sk->sk_callback_lock); -} - -static void tipc_close_conn(struct tipc_conn *con) -{ -	struct tipc_server *s = con->server; -	struct sock *sk = con->sock->sk; -	bool disconnect = false; - -	write_lock_bh(&sk->sk_callback_lock); -	disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); -	if (disconnect) { -		sk->sk_user_data = NULL; -		if (con->conid) -			s->tipc_conn_release(con->conid, con->usr_data); -	} -	write_unlock_bh(&sk->sk_callback_lock); - -	/* Handle concurrent calls from sending and receiving threads */ -	if (!disconnect) -		return; - -	/* Don't flush pending works, -just let them expire */ -	kernel_sock_shutdown(con->sock, SHUT_RDWR); -	conn_put(con); -} - -static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) -{ -	struct tipc_conn *con; -	int ret; - -	con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); -	if (!con) -		return ERR_PTR(-ENOMEM); - -	kref_init(&con->kref); -	INIT_LIST_HEAD(&con->outqueue); -	spin_lock_init(&con->outqueue_lock); -	INIT_WORK(&con->swork, tipc_send_work); -	INIT_WORK(&con->rwork, tipc_recv_work); - -	spin_lock_bh(&s->idr_lock); -	ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC); -	if (ret < 0) { -		kfree(con); -		spin_unlock_bh(&s->idr_lock); -		return ERR_PTR(-ENOMEM); -	} -	con->conid = ret; -	s->idr_in_use++; -	spin_unlock_bh(&s->idr_lock); - -	set_bit(CF_CONNECTED, &con->flags); -	con->server = s; - -	return con; -} - -static int tipc_receive_from_sock(struct tipc_conn *con) -{ -	struct tipc_server *s = con->server; -	struct sock *sk = con->sock->sk; -	struct sockaddr_tipc addr; -	struct msghdr msg = {}; -	struct kvec iov; -	void *buf; -	int ret; - -	buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); -	if (!buf) { -		ret = -ENOMEM; -		goto out_close; -	} - -	iov.iov_base = buf; -	iov.iov_len = s->max_rcvbuf_size; -	msg.msg_name = &addr; -	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); -	ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); -	if (ret <= 0) { -		kmem_cache_free(s->rcvbuf_cache, buf); -		goto out_close; -	} - -	read_lock_bh(&sk->sk_callback_lock); -	if (test_bit(CF_CONNECTED, &con->flags)) -		ret = s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid, -					   &addr, con->usr_data, buf, ret); -	read_unlock_bh(&sk->sk_callback_lock); -	kmem_cache_free(s->rcvbuf_cache, buf); -	if (ret < 0) -		tipc_conn_terminate(s, con->conid); -	return ret; - -out_close: -	if (ret != -EWOULDBLOCK) -		tipc_close_conn(con); -	else if (ret == 0) -		/* Don't return success if we really got EOF */ -		ret = -EAGAIN; - -	return ret; -} - -static int tipc_accept_from_sock(struct tipc_conn *con) -{ -	struct tipc_server *s = con->server; -	struct socket *sock = con->sock; -	struct socket *newsock; -	struct tipc_conn *newcon; -	int ret; - -	ret = kernel_accept(sock, &newsock, O_NONBLOCK); -	if (ret < 0) -		return ret; - -	newcon = tipc_alloc_conn(con->server); -	if (IS_ERR(newcon)) { -		ret = PTR_ERR(newcon); -		sock_release(newsock); -		return ret; -	} - -	newcon->rx_action = tipc_receive_from_sock; -	tipc_register_callbacks(newsock, newcon); - -	/* Notify that new connection is incoming */ -	newcon->usr_data = s->tipc_conn_new(newcon->conid); -	if (!newcon->usr_data) { -		sock_release(newsock); -		conn_put(newcon); -		return -ENOMEM; -	} - -	/* Wake up receive process in case of 'SYN+' message */ -	newsock->sk->sk_data_ready(newsock->sk); -	return ret; -} - -static struct socket *tipc_create_listen_sock(struct tipc_conn *con) -{ -	struct tipc_server *s = con->server; -	struct socket *sock = NULL; -	int ret; - -	ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock); -	if (ret < 0) -		return NULL; -	ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, -				(char *)&s->imp, sizeof(s->imp)); -	if (ret < 0) -		goto create_err; -	ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr)); -	if (ret < 0) -		goto create_err; - -	switch (s->type) { -	case SOCK_STREAM: -	case SOCK_SEQPACKET: -		con->rx_action = tipc_accept_from_sock; - -		ret = kernel_listen(sock, 0); -		if (ret < 0) -			goto create_err; -		break; -	case SOCK_DGRAM: -	case SOCK_RDM: -		con->rx_action = tipc_receive_from_sock; -		break; -	default: -		pr_err("Unknown socket type %d\n", s->type); -		goto create_err; -	} - -	/* As server's listening socket owner and creator is the same module, -	 * we have to decrease TIPC module reference count to guarantee that -	 * it remains zero after the server socket is created, otherwise, -	 * executing "rmmod" command is unable to make TIPC module deleted -	 * after TIPC module is inserted successfully. -	 * -	 * However, the reference count is ever increased twice in -	 * sock_create_kern(): one is to increase the reference count of owner -	 * of TIPC socket's proto_ops struct; another is to increment the -	 * reference count of owner of TIPC proto struct. Therefore, we must -	 * decrement the module reference count twice to ensure that it keeps -	 * zero after server's listening socket is created. Of course, we -	 * must bump the module reference count twice as well before the socket -	 * is closed. -	 */ -	module_put(sock->ops->owner); -	module_put(sock->sk->sk_prot_creator->owner); -	set_bit(CF_SERVER, &con->flags); - -	return sock; - -create_err: -	kernel_sock_shutdown(sock, SHUT_RDWR); -	sock_release(sock); -	return NULL; -} - -static int tipc_open_listening_sock(struct tipc_server *s) -{ -	struct socket *sock; -	struct tipc_conn *con; - -	con = tipc_alloc_conn(s); -	if (IS_ERR(con)) -		return PTR_ERR(con); - -	sock = tipc_create_listen_sock(con); -	if (!sock) { -		idr_remove(&s->conn_idr, con->conid); -		s->idr_in_use--; -		kfree(con); -		return -EINVAL; -	} - -	tipc_register_callbacks(sock, con); -	return 0; -} - -static struct outqueue_entry *tipc_alloc_entry(void *data, int len) -{ -	struct outqueue_entry *entry; -	void *buf; - -	entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); -	if (!entry) -		return NULL; - -	buf = kmemdup(data, len, GFP_ATOMIC); -	if (!buf) { -		kfree(entry); -		return NULL; -	} - -	entry->iov.iov_base = buf; -	entry->iov.iov_len = len; - -	return entry; -} - -static void tipc_free_entry(struct outqueue_entry *e) -{ -	kfree(e->iov.iov_base); -	kfree(e); -} - -static void tipc_clean_outqueues(struct tipc_conn *con) -{ -	struct outqueue_entry *e, *safe; - -	spin_lock_bh(&con->outqueue_lock); -	list_for_each_entry_safe(e, safe, &con->outqueue, list) { -		list_del(&e->list); -		tipc_free_entry(e); -	} -	spin_unlock_bh(&con->outqueue_lock); -} - -int tipc_conn_sendmsg(struct tipc_server *s, int conid, -		      struct sockaddr_tipc *addr, void *data, size_t len) -{ -	struct outqueue_entry *e; -	struct tipc_conn *con; - -	con = tipc_conn_lookup(s, conid); -	if (!con) -		return -EINVAL; - -	if (!test_bit(CF_CONNECTED, &con->flags)) { -		conn_put(con); -		return 0; -	} - -	e = tipc_alloc_entry(data, len); -	if (!e) { -		conn_put(con); -		return -ENOMEM; -	} - -	if (addr) -		memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); - -	spin_lock_bh(&con->outqueue_lock); -	list_add_tail(&e->list, &con->outqueue); -	spin_unlock_bh(&con->outqueue_lock); - -	if (!queue_work(s->send_wq, &con->swork)) -		conn_put(con); -	return 0; -} - -void tipc_conn_terminate(struct tipc_server *s, int conid) -{ -	struct tipc_conn *con; - -	con = tipc_conn_lookup(s, conid); -	if (con) { -		tipc_close_conn(con); -		conn_put(con); -	} -} - -bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, -			     u32 upper, u32 filter, int *conid) -{ -	struct tipc_subscriber *scbr; -	struct tipc_subscr sub; -	struct tipc_server *s; -	struct tipc_conn *con; - -	sub.seq.type = type; -	sub.seq.lower = lower; -	sub.seq.upper = upper; -	sub.timeout = TIPC_WAIT_FOREVER; -	sub.filter = filter; -	*(u32 *)&sub.usr_handle = port; - -	con = tipc_alloc_conn(tipc_topsrv(net)); -	if (IS_ERR(con)) -		return false; - -	*conid = con->conid; -	s = con->server; -	scbr = s->tipc_conn_new(*conid); -	if (!scbr) { -		conn_put(con); -		return false; -	} - -	con->usr_data = scbr; -	con->sock = NULL; -	s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub)); -	return true; -} - -void tipc_topsrv_kern_unsubscr(struct net *net, int conid) -{ -	struct tipc_conn *con; -	struct tipc_server *srv; - -	con = tipc_conn_lookup(tipc_topsrv(net), conid); -	if (!con) -		return; - -	test_and_clear_bit(CF_CONNECTED, &con->flags); -	srv = con->server; -	if (con->conid) -		srv->tipc_conn_release(con->conid, con->usr_data); -	conn_put(con); -	conn_put(con); -} - -static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) -{ -	u32 port = *(u32 *)&evt->s.usr_handle; -	u32 self = tipc_own_addr(net); -	struct sk_buff_head evtq; -	struct sk_buff *skb; - -	skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), -			      self, self, port, port, 0); -	if (!skb) -		return; -	msg_set_dest_droppable(buf_msg(skb), true); -	memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); -	skb_queue_head_init(&evtq); -	__skb_queue_tail(&evtq, skb); -	tipc_sk_rcv(net, &evtq); -} - -static void tipc_send_to_sock(struct tipc_conn *con) -{ -	struct tipc_server *s = con->server; -	struct outqueue_entry *e; -	struct tipc_event *evt; -	struct msghdr msg; -	int count = 0; -	int ret; - -	spin_lock_bh(&con->outqueue_lock); -	while (test_bit(CF_CONNECTED, &con->flags)) { -		e = list_entry(con->outqueue.next, struct outqueue_entry, list); -		if ((struct list_head *) e == &con->outqueue) -			break; - -		spin_unlock_bh(&con->outqueue_lock); - -		if (con->sock) { -			memset(&msg, 0, sizeof(msg)); -			msg.msg_flags = MSG_DONTWAIT; -			if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { -				msg.msg_name = &e->dest; -				msg.msg_namelen = sizeof(struct sockaddr_tipc); -			} -			ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, -					     e->iov.iov_len); -			if (ret == -EWOULDBLOCK || ret == 0) { -				cond_resched(); -				goto out; -			} else if (ret < 0) { -				goto send_err; -			} -		} else { -			evt = e->iov.iov_base; -			tipc_send_kern_top_evt(s->net, evt); -		} -		/* Don't starve users filling buffers */ -		if (++count >= MAX_SEND_MSG_COUNT) { -			cond_resched(); -			count = 0; -		} - -		spin_lock_bh(&con->outqueue_lock); -		list_del(&e->list); -		tipc_free_entry(e); -	} -	spin_unlock_bh(&con->outqueue_lock); -out: -	return; - -send_err: -	tipc_close_conn(con); -} - -static void tipc_recv_work(struct work_struct *work) -{ -	struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); -	int count = 0; - -	while (test_bit(CF_CONNECTED, &con->flags)) { -		if (con->rx_action(con)) -			break; - -		/* Don't flood Rx machine */ -		if (++count >= MAX_RECV_MSG_COUNT) { -			cond_resched(); -			count = 0; -		} -	} -	conn_put(con); -} - -static void tipc_send_work(struct work_struct *work) -{ -	struct tipc_conn *con = container_of(work, struct tipc_conn, swork); - -	if (test_bit(CF_CONNECTED, &con->flags)) -		tipc_send_to_sock(con); - -	conn_put(con); -} - -static void tipc_work_stop(struct tipc_server *s) -{ -	destroy_workqueue(s->rcv_wq); -	destroy_workqueue(s->send_wq); -} - -static int tipc_work_start(struct tipc_server *s) -{ -	s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0); -	if (!s->rcv_wq) { -		pr_err("can't start tipc receive workqueue\n"); -		return -ENOMEM; -	} - -	s->send_wq = alloc_ordered_workqueue("tipc_send", 0); -	if (!s->send_wq) { -		pr_err("can't start tipc send workqueue\n"); -		destroy_workqueue(s->rcv_wq); -		return -ENOMEM; -	} - -	return 0; -} - -int tipc_server_start(struct tipc_server *s) -{ -	int ret; - -	spin_lock_init(&s->idr_lock); -	idr_init(&s->conn_idr); -	s->idr_in_use = 0; - -	s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size, -					    0, SLAB_HWCACHE_ALIGN, NULL); -	if (!s->rcvbuf_cache) -		return -ENOMEM; - -	ret = tipc_work_start(s); -	if (ret < 0) { -		kmem_cache_destroy(s->rcvbuf_cache); -		return ret; -	} -	ret = tipc_open_listening_sock(s); -	if (ret < 0) { -		tipc_work_stop(s); -		kmem_cache_destroy(s->rcvbuf_cache); -		return ret; -	} -	return ret; -} - -void tipc_server_stop(struct tipc_server *s) -{ -	struct tipc_conn *con; -	int id; - -	spin_lock_bh(&s->idr_lock); -	for (id = 0; s->idr_in_use; id++) { -		con = idr_find(&s->conn_idr, id); -		if (con) { -			spin_unlock_bh(&s->idr_lock); -			tipc_close_conn(con); -			spin_lock_bh(&s->idr_lock); -		} -	} -	spin_unlock_bh(&s->idr_lock); - -	tipc_work_stop(s); -	kmem_cache_destroy(s->rcvbuf_cache); -	idr_destroy(&s->conn_idr); -} | 
