From cc79dd1ba9c1021c2ac6ae200a65ec38ee8db351 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 17 Jun 2013 10:54:37 -0400 Subject: tipc: change socket buffer overflow control to respect sk_rcvbuf As per feedback from the netdev community, we change the buffer overflow protection algorithm in receiving sockets so that it always respects the nominal upper limit set in sk_rcvbuf. Instead of scaling up from a small sk_rcvbuf value, which leads to violation of the configured sk_rcvbuf limit, we now calculate the weighted per-message limit by scaling down from a much bigger value, still in the same field, according to the importance priority of the received message. To allow for administrative tunability of the socket receive buffer size, we create a tipc_rmem sysctl variable to allow the user to configure an even bigger value via sysctl command. It is a size of three (min/default/max) to be consistent with things like tcp_rmem. By default, the value initialized in tipc_rmem[1] is equal to the receive socket size needed by a TIPC_CRITICAL_IMPORTANCE message. This value is also set as the default value of sk_rcvbuf. Originally-by: Jon Maloy Cc: Neil Horman Cc: Jon Maloy [Ying: added sysctl variation to Jon's original patch] Signed-off-by: Ying Xue [PG: don't compile sysctl.c if not config'd; add Documentation] Signed-off-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/socket.c | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) (limited to 'net/tipc/socket.c') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 515ce38e4f4c..aba4255f297b 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -43,8 +43,6 @@ #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ -#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \ - SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ struct tipc_sock { @@ -203,6 +201,7 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, sock_init_data(sock, sk); sk->sk_backlog_rcv = backlog_rcv; + sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; tipc_sk(sk)->p = tp_ptr; @@ -1233,10 +1232,10 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) * For all connectionless messages, by default new queue limits are * as belows: * - * TIPC_LOW_IMPORTANCE (5MB) - * TIPC_MEDIUM_IMPORTANCE (10MB) - * TIPC_HIGH_IMPORTANCE (20MB) - * TIPC_CRITICAL_IMPORTANCE (40MB) + * TIPC_LOW_IMPORTANCE (4 MB) + * TIPC_MEDIUM_IMPORTANCE (8 MB) + * TIPC_HIGH_IMPORTANCE (16 MB) + * TIPC_CRITICAL_IMPORTANCE (32 MB) * * Returns overload limit according to corresponding message importance */ @@ -1246,9 +1245,10 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) unsigned int limit; if (msg_connected(msg)) - limit = CONN_OVERLOAD_LIMIT; + limit = sysctl_tipc_rmem[2]; else - limit = sk->sk_rcvbuf << (msg_importance(msg) + 5); + limit = sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE << + msg_importance(msg); return limit; } @@ -1847,7 +1847,8 @@ static const struct net_proto_family tipc_family_ops = { static struct proto tipc_proto = { .name = "TIPC", .owner = THIS_MODULE, - .obj_size = sizeof(struct tipc_sock) + .obj_size = sizeof(struct tipc_sock), + .sysctl_rmem = sysctl_tipc_rmem }; /** -- cgit v1.2.3-70-g09d2 From 5d21cb70db0122507cd18f58b4a9112583c1e075 Mon Sep 17 00:00:00 2001 From: Erik Hugne Date: Mon, 17 Jun 2013 10:54:38 -0400 Subject: tipc: allow implicit connect for stream sockets TIPC's implied connect feature, aka piggyback connect, allows applications to save one syscall and all SYN/SYN-ACK signalling overhead when setting up a connection. Until now, this has only been supported for SEQPACKET sockets. Here, we make it possible to use this feature even with stream sockets. At the connecting side, the connection is completed when the first data message arrives from the accepting peer. This means that we must allow the connecting user to call blocking recv() before the socket has reached state SS_CONNECTED. So we must must relax the state machine check at recv_stream(), and allow the recv() call even if socket is in state SS_CONNECTING. Signed-off-by: Erik Hugne Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/socket.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'net/tipc/socket.c') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index aba4255f297b..d5fa708f037d 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -518,8 +518,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock, res = -EISCONN; goto exit; } - if ((tport->published) || - ((sock->type == SOCK_STREAM) && (total_len != 0))) { + if (tport->published) { res = -EOPNOTSUPP; goto exit; } @@ -1010,8 +1009,7 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock, lock_sock(sk); - if (unlikely((sock->state == SS_UNCONNECTED) || - (sock->state == SS_CONNECTING))) { + if (unlikely((sock->state == SS_UNCONNECTED))) { res = -ENOTCONN; goto exit; } -- cgit v1.2.3-70-g09d2 From c5fa7b3cf3cb22e4ac60485fc2dc187fe012910f Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 17 Jun 2013 10:54:39 -0400 Subject: tipc: introduce new TIPC server infrastructure TIPC has two internal servers, one providing a subscription service for topology events, and another providing the configuration interface. These servers have previously been running in BH context, accessing the TIPC-port (aka native) API directly. Apart from these servers, even the TIPC socket implementation is partially built on this API. As this API may simultaneously be called via different paths and in different contexts, a complex and costly lock policiy is required in order to protect TIPC internal resources. To eliminate the need for this complex lock policiy, we introduce a new, generic service API that uses kernel sockets for message passing instead of the native API. Once the toplogy and configuration servers are converted to use this new service, all code pertaining to the native API can be removed. This entails a significant reduction in code amount and complexity, and opens up for a complete rework of the locking policy in TIPC. The new service also solves another problem: As the current topology server works in BH context, it cannot easily be blocked when sending of events fails due to congestion. In such cases events may have to be silently dropped, something that is unacceptable. Therefore, the new service keeps a dedicated outbound queue receiving messages from BH context. Once messages are inserted into this queue, we will immediately schedule a work from a special workqueue. This way, messages/events from the topology server are in reality sent in process context, and the server can block if necessary. Analogously, there is a new workqueue for receiving messages. Once a notification about an arriving message is received in BH context, we schedule a work from the receive workqueue to do the job of receiving the message in process context. As both sending and receive messages are now finished in processes, subscribed events cannot be dropped any more. As of this commit, this new server infrastructure is built, but not actually yet called by the existing TIPC code, but since the conversion changes required in order to use it are significant, the addition is kept here as a separate commit. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/Makefile | 2 +- net/tipc/core.h | 8 +- net/tipc/server.c | 596 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ net/tipc/server.h | 94 +++++++++ net/tipc/socket.c | 99 ++++++++- 5 files changed, 789 insertions(+), 10 deletions(-) create mode 100644 net/tipc/server.c create mode 100644 net/tipc/server.h (limited to 'net/tipc/socket.c') diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 02636d0a90cf..b282f7130d2b 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -8,7 +8,7 @@ tipc-y += addr.o bcast.o bearer.o config.o \ core.o handler.o link.o discover.o msg.o \ name_distr.o subscr.o name_table.o net.o \ netlink.o node.o node_subscr.o port.o ref.o \ - socket.o log.o eth_media.o + socket.o log.o eth_media.o server.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o tipc-$(CONFIG_SYSCTL) += sysctl.o diff --git a/net/tipc/core.h b/net/tipc/core.h index fe7f2b7c19fe..be72f8cebc53 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -1,8 +1,8 @@ /* * net/tipc/core.h: Include file for TIPC global declarations * - * Copyright (c) 2005-2006, Ericsson AB - * Copyright (c) 2005-2007, 2010-2011, Wind River Systems + * Copyright (c) 2005-2006, 2013 Ericsson AB + * Copyright (c) 2005-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -97,6 +97,10 @@ extern int tipc_netlink_start(void); extern void tipc_netlink_stop(void); extern int tipc_socket_init(void); extern void tipc_socket_stop(void); +extern int tipc_sock_create_local(int type, struct socket **res); +extern void tipc_sock_release_local(struct socket *sock); +extern int tipc_sock_accept_local(struct socket *sock, + struct socket **newsock, int flags); #ifdef CONFIG_SYSCTL extern int tipc_register_sysctl(void); diff --git a/net/tipc/server.c b/net/tipc/server.c new file mode 100644 index 000000000000..19da5abe0fa6 --- /dev/null +++ b/net/tipc/server.c @@ -0,0 +1,596 @@ +/* + * net/tipc/server.c: TIPC server infrastructure + * + * Copyright (c) 2012-2013, Wind River Systems + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" +#include "core.h" +#include + +/* Number of messages to send before rescheduling */ +#define MAX_SEND_MSG_COUNT 25 +#define MAX_RECV_MSG_COUNT 25 +#define CF_CONNECTED 1 + +#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) + +/** + * struct tipc_conn - TIPC connection structure + * @kref: reference counter to connection object + * @conid: connection identifier + * @sock: socket handler associated with connection + * @flags: indicates connection state + * @server: pointer to connected server + * @rwork: receive work item + * @usr_data: user-specified field + * @rx_action: what to do when connection socket is active + * @outqueue: pointer to first outbound message in queue + * @outqueue_lock: controll access to the outqueue + * @outqueue: list of connection objects for its server + * @swork: send work item + */ +struct tipc_conn { + struct kref kref; + int conid; + struct socket *sock; + unsigned long flags; + struct tipc_server *server; + struct work_struct rwork; + int (*rx_action) (struct tipc_conn *con); + void *usr_data; + struct list_head outqueue; + spinlock_t outqueue_lock; + struct work_struct swork; +}; + +/* An entry waiting to be sent */ +struct outqueue_entry { + struct list_head list; + struct kvec iov; + struct sockaddr_tipc dest; +}; + +static void tipc_recv_work(struct work_struct *work); +static void tipc_send_work(struct work_struct *work); +static void tipc_clean_outqueues(struct tipc_conn *con); + +static void tipc_conn_kref_release(struct kref *kref) +{ + struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); + struct tipc_server *s = con->server; + + if (con->sock) { + tipc_sock_release_local(con->sock); + con->sock = NULL; + } + + tipc_clean_outqueues(con); + + if (con->conid) + s->tipc_conn_shutdown(con->conid, con->usr_data); + + kfree(con); +} + +static void conn_put(struct tipc_conn *con) +{ + kref_put(&con->kref, tipc_conn_kref_release); +} + +static void conn_get(struct tipc_conn *con) +{ + kref_get(&con->kref); +} + +static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) +{ + struct tipc_conn *con; + + spin_lock_bh(&s->idr_lock); + con = idr_find(&s->conn_idr, conid); + if (con) + conn_get(con); + spin_unlock_bh(&s->idr_lock); + return con; +} + +static void sock_data_ready(struct sock *sk, int unused) +{ + struct tipc_conn *con; + + read_lock(&sk->sk_callback_lock); + con = sock2con(sk); + if (con && test_bit(CF_CONNECTED, &con->flags)) { + conn_get(con); + if (!queue_work(con->server->rcv_wq, &con->rwork)) + conn_put(con); + } + read_unlock(&sk->sk_callback_lock); +} + +static void sock_write_space(struct sock *sk) +{ + struct tipc_conn *con; + + read_lock(&sk->sk_callback_lock); + con = sock2con(sk); + if (con && test_bit(CF_CONNECTED, &con->flags)) { + conn_get(con); + if (!queue_work(con->server->send_wq, &con->swork)) + conn_put(con); + } + read_unlock(&sk->sk_callback_lock); +} + +static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) +{ + struct sock *sk = sock->sk; + + write_lock_bh(&sk->sk_callback_lock); + + sk->sk_data_ready = sock_data_ready; + sk->sk_write_space = sock_write_space; + sk->sk_user_data = con; + + con->sock = sock; + + write_unlock_bh(&sk->sk_callback_lock); +} + +static void tipc_unregister_callbacks(struct tipc_conn *con) +{ + struct sock *sk = con->sock->sk; + + write_lock_bh(&sk->sk_callback_lock); + sk->sk_user_data = NULL; + write_unlock_bh(&sk->sk_callback_lock); +} + +static void tipc_close_conn(struct tipc_conn *con) +{ + struct tipc_server *s = con->server; + + if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { + spin_lock_bh(&s->idr_lock); + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + spin_unlock_bh(&s->idr_lock); + + tipc_unregister_callbacks(con); + + /* We shouldn't flush pending works as we may be in the + * thread. In fact the races with pending rx/tx work structs + * are harmless for us here as we have already deleted this + * connection from server connection list and set + * sk->sk_user_data to 0 before releasing connection object. + */ + kernel_sock_shutdown(con->sock, SHUT_RDWR); + + conn_put(con); + } +} + +static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) +{ + struct tipc_conn *con; + int ret; + + con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); + if (!con) + return ERR_PTR(-ENOMEM); + + kref_init(&con->kref); + INIT_LIST_HEAD(&con->outqueue); + spin_lock_init(&con->outqueue_lock); + INIT_WORK(&con->swork, tipc_send_work); + INIT_WORK(&con->rwork, tipc_recv_work); + + spin_lock_bh(&s->idr_lock); + ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC); + if (ret < 0) { + kfree(con); + spin_unlock_bh(&s->idr_lock); + return ERR_PTR(-ENOMEM); + } + con->conid = ret; + s->idr_in_use++; + spin_unlock_bh(&s->idr_lock); + + set_bit(CF_CONNECTED, &con->flags); + con->server = s; + + return con; +} + +static int tipc_receive_from_sock(struct tipc_conn *con) +{ + struct msghdr msg = {}; + struct tipc_server *s = con->server; + struct sockaddr_tipc addr; + struct kvec iov; + void *buf; + int ret; + + buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); + if (!buf) { + ret = -ENOMEM; + goto out_close; + } + + iov.iov_base = buf; + iov.iov_len = s->max_rcvbuf_size; + msg.msg_name = &addr; + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + MSG_DONTWAIT); + if (ret <= 0) { + kmem_cache_free(s->rcvbuf_cache, buf); + goto out_close; + } + + s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret); + + kmem_cache_free(s->rcvbuf_cache, buf); + + return 0; + +out_close: + if (ret != -EWOULDBLOCK) + tipc_close_conn(con); + else if (ret == 0) + /* Don't return success if we really got EOF */ + ret = -EAGAIN; + + return ret; +} + +static int tipc_accept_from_sock(struct tipc_conn *con) +{ + struct tipc_server *s = con->server; + struct socket *sock = con->sock; + struct socket *newsock; + struct tipc_conn *newcon; + int ret; + + ret = tipc_sock_accept_local(sock, &newsock, O_NONBLOCK); + if (ret < 0) + return ret; + + newcon = tipc_alloc_conn(con->server); + if (IS_ERR(newcon)) { + ret = PTR_ERR(newcon); + sock_release(newsock); + return ret; + } + + newcon->rx_action = tipc_receive_from_sock; + tipc_register_callbacks(newsock, newcon); + + /* Notify that new connection is incoming */ + newcon->usr_data = s->tipc_conn_new(newcon->conid); + + /* Wake up receive process in case of 'SYN+' message */ + newsock->sk->sk_data_ready(newsock->sk, 0); + return ret; +} + +static struct socket *tipc_create_listen_sock(struct tipc_conn *con) +{ + struct tipc_server *s = con->server; + struct socket *sock = NULL; + int ret; + + ret = tipc_sock_create_local(s->type, &sock); + if (ret < 0) + return NULL; + ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, + (char *)&s->imp, sizeof(s->imp)); + if (ret < 0) + goto create_err; + ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr)); + if (ret < 0) + goto create_err; + + switch (s->type) { + case SOCK_STREAM: + case SOCK_SEQPACKET: + con->rx_action = tipc_accept_from_sock; + + ret = kernel_listen(sock, 0); + if (ret < 0) + goto create_err; + break; + case SOCK_DGRAM: + case SOCK_RDM: + con->rx_action = tipc_receive_from_sock; + break; + default: + pr_err("Unknown socket type %d\n", s->type); + goto create_err; + } + return sock; + +create_err: + sock_release(sock); + con->sock = NULL; + return NULL; +} + +static int tipc_open_listening_sock(struct tipc_server *s) +{ + struct socket *sock; + struct tipc_conn *con; + + con = tipc_alloc_conn(s); + if (IS_ERR(con)) + return PTR_ERR(con); + + sock = tipc_create_listen_sock(con); + if (!sock) + return -EINVAL; + + tipc_register_callbacks(sock, con); + return 0; +} + +static struct outqueue_entry *tipc_alloc_entry(void *data, int len) +{ + struct outqueue_entry *entry; + void *buf; + + entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); + if (!entry) + return NULL; + + buf = kmalloc(len, GFP_ATOMIC); + if (!buf) { + kfree(entry); + return NULL; + } + + memcpy(buf, data, len); + entry->iov.iov_base = buf; + entry->iov.iov_len = len; + + return entry; +} + +static void tipc_free_entry(struct outqueue_entry *e) +{ + kfree(e->iov.iov_base); + kfree(e); +} + +static void tipc_clean_outqueues(struct tipc_conn *con) +{ + struct outqueue_entry *e, *safe; + + spin_lock_bh(&con->outqueue_lock); + list_for_each_entry_safe(e, safe, &con->outqueue, list) { + list_del(&e->list); + tipc_free_entry(e); + } + spin_unlock_bh(&con->outqueue_lock); +} + +int tipc_conn_sendmsg(struct tipc_server *s, int conid, + struct sockaddr_tipc *addr, void *data, size_t len) +{ + struct outqueue_entry *e; + struct tipc_conn *con; + + con = tipc_conn_lookup(s, conid); + if (!con) + return -EINVAL; + + e = tipc_alloc_entry(data, len); + if (!e) { + conn_put(con); + return -ENOMEM; + } + + if (addr) + memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); + + spin_lock_bh(&con->outqueue_lock); + list_add_tail(&e->list, &con->outqueue); + spin_unlock_bh(&con->outqueue_lock); + + if (test_bit(CF_CONNECTED, &con->flags)) + if (!queue_work(s->send_wq, &con->swork)) + conn_put(con); + + return 0; +} + +void tipc_conn_terminate(struct tipc_server *s, int conid) +{ + struct tipc_conn *con; + + con = tipc_conn_lookup(s, conid); + if (con) { + tipc_close_conn(con); + conn_put(con); + } +} + +static void tipc_send_to_sock(struct tipc_conn *con) +{ + int count = 0; + struct tipc_server *s = con->server; + struct outqueue_entry *e; + struct msghdr msg; + int ret; + + spin_lock_bh(&con->outqueue_lock); + while (1) { + e = list_entry(con->outqueue.next, struct outqueue_entry, + list); + if ((struct list_head *) e == &con->outqueue) + break; + spin_unlock_bh(&con->outqueue_lock); + + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { + msg.msg_name = &e->dest; + msg.msg_namelen = sizeof(struct sockaddr_tipc); + } + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, + e->iov.iov_len); + if (ret == -EWOULDBLOCK || ret == 0) { + cond_resched(); + goto out; + } else if (ret < 0) { + goto send_err; + } + + /* Don't starve users filling buffers */ + if (++count >= MAX_SEND_MSG_COUNT) { + cond_resched(); + count = 0; + } + + spin_lock_bh(&con->outqueue_lock); + list_del(&e->list); + tipc_free_entry(e); + } + spin_unlock_bh(&con->outqueue_lock); +out: + return; + +send_err: + tipc_close_conn(con); +} + +static void tipc_recv_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); + int count = 0; + + while (test_bit(CF_CONNECTED, &con->flags)) { + if (con->rx_action(con)) + break; + + /* Don't flood Rx machine */ + if (++count >= MAX_RECV_MSG_COUNT) { + cond_resched(); + count = 0; + } + } + conn_put(con); +} + +static void tipc_send_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, swork); + + if (test_bit(CF_CONNECTED, &con->flags)) + tipc_send_to_sock(con); + + conn_put(con); +} + +static void tipc_work_stop(struct tipc_server *s) +{ + destroy_workqueue(s->rcv_wq); + destroy_workqueue(s->send_wq); +} + +static int tipc_work_start(struct tipc_server *s) +{ + s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1); + if (!s->rcv_wq) { + pr_err("can't start tipc receive workqueue\n"); + return -ENOMEM; + } + + s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1); + if (!s->send_wq) { + pr_err("can't start tipc send workqueue\n"); + destroy_workqueue(s->rcv_wq); + return -ENOMEM; + } + + return 0; +} + +int tipc_server_start(struct tipc_server *s) +{ + int ret; + + spin_lock_init(&s->idr_lock); + idr_init(&s->conn_idr); + s->idr_in_use = 0; + + s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size, + 0, SLAB_HWCACHE_ALIGN, NULL); + if (!s->rcvbuf_cache) + return -ENOMEM; + + ret = tipc_work_start(s); + if (ret < 0) { + kmem_cache_destroy(s->rcvbuf_cache); + return ret; + } + s->enabled = 1; + + return tipc_open_listening_sock(s); +} + +void tipc_server_stop(struct tipc_server *s) +{ + struct tipc_conn *con; + int total = 0; + int id; + + if (!s->enabled) + return; + + s->enabled = 0; + spin_lock_bh(&s->idr_lock); + for (id = 0; total < s->idr_in_use; id++) { + con = idr_find(&s->conn_idr, id); + if (con) { + total++; + spin_unlock_bh(&s->idr_lock); + tipc_close_conn(con); + spin_lock_bh(&s->idr_lock); + } + } + spin_unlock_bh(&s->idr_lock); + + tipc_work_stop(s); + kmem_cache_destroy(s->rcvbuf_cache); + idr_destroy(&s->conn_idr); +} diff --git a/net/tipc/server.h b/net/tipc/server.h new file mode 100644 index 000000000000..98b23f20bc0f --- /dev/null +++ b/net/tipc/server.h @@ -0,0 +1,94 @@ +/* + * net/tipc/server.h: Include file for TIPC server code + * + * Copyright (c) 2012-2013, Wind River Systems + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_SERVER_H +#define _TIPC_SERVER_H + +#include "core.h" + +#define TIPC_SERVER_NAME_LEN 32 + +/** + * struct tipc_server - TIPC server structure + * @conn_idr: identifier set of connection + * @idr_lock: protect the connection identifier set + * @idr_in_use: amount of allocated identifier entry + * @rcvbuf_cache: memory cache of server receive buffer + * @rcv_wq: receive workqueue + * @send_wq: send workqueue + * @max_rcvbuf_size: maximum permitted receive message length + * @tipc_conn_new: callback will be called when new connection is incoming + * @tipc_conn_shutdown: callback will be called when connection is shut down + * @tipc_conn_recvmsg: callback will be called when message arrives + * @saddr: TIPC server address + * @name: server name + * @imp: message importance + * @type: socket type + * @enabled: identify whether server is launched or not + */ +struct tipc_server { + struct idr conn_idr; + spinlock_t idr_lock; + int idr_in_use; + struct kmem_cache *rcvbuf_cache; + struct workqueue_struct *rcv_wq; + struct workqueue_struct *send_wq; + int max_rcvbuf_size; + void *(*tipc_conn_new) (int conid); + void (*tipc_conn_shutdown) (int conid, void *usr_data); + void (*tipc_conn_recvmsg) (int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len); + struct sockaddr_tipc *saddr; + const char name[TIPC_SERVER_NAME_LEN]; + int imp; + int type; + int enabled; +}; + +int tipc_conn_sendmsg(struct tipc_server *s, int conid, + struct sockaddr_tipc *addr, void *data, size_t len); + +/** + * tipc_conn_terminate - terminate connection with server + * + * Note: Must call it in process context since it might sleep + */ +void tipc_conn_terminate(struct tipc_server *s, int conid); + +int tipc_server_start(struct tipc_server *s); + +void tipc_server_stop(struct tipc_server *s); + +#endif diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d5fa708f037d..bd8e2cdeceef 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2,7 +2,7 @@ * net/tipc/socket.c: TIPC socket API * * Copyright (c) 2001-2007, 2012 Ericsson AB - * Copyright (c) 2004-2008, 2010-2012, Wind River Systems + * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -63,12 +63,15 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); static void wakeupdispatch(struct tipc_port *tport); static void tipc_data_ready(struct sock *sk, int len); static void tipc_write_space(struct sock *sk); +static int release(struct socket *sock); +static int accept(struct socket *sock, struct socket *new_sock, int flags); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; static const struct proto_ops msg_ops; static struct proto tipc_proto; +static struct proto tipc_proto_kern; static int sockets_enabled; @@ -141,7 +144,7 @@ static void reject_rx_queue(struct sock *sk) } /** - * tipc_create - create a TIPC socket + * tipc_sk_create - create a TIPC socket * @net: network namespace (must be default network) * @sock: pre-allocated socket structure * @protocol: protocol indicator (must be 0) @@ -152,8 +155,8 @@ static void reject_rx_queue(struct sock *sk) * * Returns 0 on success, errno otherwise */ -static int tipc_create(struct net *net, struct socket *sock, int protocol, - int kern) +static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, + int kern) { const struct proto_ops *ops; socket_state state; @@ -183,7 +186,11 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, } /* Allocate socket's protocol area */ - sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto); + if (!kern) + sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto); + else + sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern); + if (sk == NULL) return -ENOMEM; @@ -218,6 +225,78 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, return 0; } +/** + * tipc_sock_create_local - create TIPC socket from inside TIPC module + * @type: socket type - SOCK_RDM or SOCK_SEQPACKET + * + * We cannot use sock_creat_kern here because it bumps module user count. + * Since socket owner and creator is the same module we must make sure + * that module count remains zero for module local sockets, otherwise + * we cannot do rmmod. + * + * Returns 0 on success, errno otherwise + */ +int tipc_sock_create_local(int type, struct socket **res) +{ + int rc; + struct sock *sk; + + rc = sock_create_lite(AF_TIPC, type, 0, res); + if (rc < 0) { + pr_err("Failed to create kernel socket\n"); + return rc; + } + tipc_sk_create(&init_net, *res, 0, 1); + + sk = (*res)->sk; + + return 0; +} + +/** + * tipc_sock_release_local - release socket created by tipc_sock_create_local + * @sock: the socket to be released. + * + * Module reference count is not incremented when such sockets are created, + * so we must keep it from being decremented when they are released. + */ +void tipc_sock_release_local(struct socket *sock) +{ + release(sock); + sock->ops = NULL; + sock_release(sock); +} + +/** + * tipc_sock_accept_local - accept a connection on a socket created + * with tipc_sock_create_local. Use this function to avoid that + * module reference count is inadvertently incremented. + * + * @sock: the accepting socket + * @newsock: reference to the new socket to be created + * @flags: socket flags + */ + +int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, + int flags) +{ + struct sock *sk = sock->sk; + int ret; + + ret = sock_create_lite(sk->sk_family, sk->sk_type, + sk->sk_protocol, newsock); + if (ret < 0) + return ret; + + ret = accept(sock, *newsock, flags); + if (ret < 0) { + sock_release(*newsock); + return ret; + } + (*newsock)->ops = sock->ops; + return ret; +} + /** * release - destroy a TIPC socket * @sock: socket to destroy @@ -1529,7 +1608,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) buf = skb_peek(&sk->sk_receive_queue); - res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); + res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1); if (res) goto exit; @@ -1839,7 +1918,7 @@ static const struct proto_ops stream_ops = { static const struct net_proto_family tipc_family_ops = { .owner = THIS_MODULE, .family = AF_TIPC, - .create = tipc_create + .create = tipc_sk_create }; static struct proto tipc_proto = { @@ -1849,6 +1928,12 @@ static struct proto tipc_proto = { .sysctl_rmem = sysctl_tipc_rmem }; +static struct proto tipc_proto_kern = { + .name = "TIPC", + .obj_size = sizeof(struct tipc_sock), + .sysctl_rmem = sysctl_tipc_rmem +}; + /** * tipc_socket_init - initialize TIPC socket interface * -- cgit v1.2.3-70-g09d2 From 13a2e89873506d64d7e52f17b571da371a3e25a4 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 17 Jun 2013 10:54:40 -0400 Subject: tipc: convert topology server to use new server facility As the new TIPC server infrastructure has been introduced, we can now convert the TIPC topology server to it. We get two benefits from doing this: 1) It simplifies the topology server locking policy. In the original locking policy, we placed one spin lock pointer in the tipc_subscriber structure to reuse the lock of the subscriber's server port, controlling access to members of tipc_subscriber instance. That is, we only used one lock to ensure both tipc_port and tipc_subscriber members were safely accessed. Now we introduce another spin lock for tipc_subscriber structure only protecting themselves, to get a finer granularity locking policy. Moreover, the change will allow us to make the topology server code more readable and maintainable. 2) It fixes a bug where sent subscription events may be lost when the topology port is congested. Using the new service, the topology server now queues sent events into an outgoing buffer, and then wakes up a sender process which has been blocked in workqueue context. The process will keep picking events from the buffer and send them to their respective subscribers, using the kernel socket interface, until the buffer is empty. Even if the socket is congested during transmission there is no risk that events may be dropped, since the sender process may block when needed. Some minor reordering of initialization is done, since we now have a scenario where the topology server must be started after socket initialization has taken place, as the former depends on the latter. And overall, we see a simplification of the TIPC subscriber code in making this changeover. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/core.c | 6 +- net/tipc/socket.c | 3 +- net/tipc/subscr.c | 334 +++++++++++++++--------------------------------------- net/tipc/subscr.h | 8 +- 4 files changed, 104 insertions(+), 247 deletions(-) (limited to 'net/tipc/socket.c') diff --git a/net/tipc/core.c b/net/tipc/core.c index b0e42a087291..15bbe99b609d 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -2,7 +2,7 @@ * net/tipc/core.c: TIPC module code * * Copyright (c) 2003-2006, Ericsson AB - * Copyright (c) 2005-2006, 2010-2011, Wind River Systems + * Copyright (c) 2005-2006, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -136,8 +136,6 @@ static int tipc_core_start(void) res = tipc_ref_table_init(tipc_max_ports, tipc_random); if (!res) res = tipc_nametbl_init(); - if (!res) - res = tipc_subscr_start(); if (!res) res = tipc_cfg_init(); if (!res) @@ -146,6 +144,8 @@ static int tipc_core_start(void) res = tipc_socket_init(); if (!res) res = tipc_register_sysctl(); + if (!res) + res = tipc_subscr_start(); if (res) tipc_core_stop(); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index bd8e2cdeceef..d0254157a30d 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -402,7 +402,8 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) else if (addr->addrtype != TIPC_ADDR_NAMESEQ) return -EAFNOSUPPORT; - if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES) + if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) && + (addr->addr.nameseq.type != TIPC_TOP_SRV)) return -EACCES; return (addr->scope > 0) ? diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 6b42d47029af..f6be92a6973a 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -2,7 +2,7 @@ * net/tipc/subscr.c: TIPC network topology service * * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005-2007, 2010-2011, Wind River Systems + * Copyright (c) 2005-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,33 +41,42 @@ /** * struct tipc_subscriber - TIPC network topology subscriber - * @port_ref: object reference to server port connecting to subscriber - * @lock: pointer to spinlock controlling access to subscriber's server port - * @subscriber_list: adjacent subscribers in top. server's list of subscribers + * @conid: connection identifier to server connecting to subscriber + * @lock: controll access to subscriber * @subscription_list: list of subscription objects for this subscriber */ struct tipc_subscriber { - u32 port_ref; - spinlock_t *lock; - struct list_head subscriber_list; + int conid; + spinlock_t lock; struct list_head subscription_list; }; -/** - * struct top_srv - TIPC network topology subscription service - * @setup_port: reference to TIPC port that handles subscription requests - * @subscription_count: number of active subscriptions (not subscribers!) - * @subscriber_list: list of ports subscribing to service - * @lock: spinlock govering access to subscriber list - */ -struct top_srv { - u32 setup_port; - atomic_t subscription_count; - struct list_head subscriber_list; - spinlock_t lock; +static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len); +static void *subscr_named_msg_event(int conid); +static void subscr_conn_shutdown_event(int conid, void *usr_data); + +static atomic_t subscription_count = ATOMIC_INIT(0); + +static struct sockaddr_tipc topsrv_addr __read_mostly = { + .family = AF_TIPC, + .addrtype = TIPC_ADDR_NAMESEQ, + .addr.nameseq.type = TIPC_TOP_SRV, + .addr.nameseq.lower = TIPC_TOP_SRV, + .addr.nameseq.upper = TIPC_TOP_SRV, + .scope = TIPC_NODE_SCOPE }; -static struct top_srv topsrv; +static struct tipc_server topsrv __read_mostly = { + .saddr = &topsrv_addr, + .imp = TIPC_CRITICAL_IMPORTANCE, + .type = SOCK_SEQPACKET, + .max_rcvbuf_size = sizeof(struct tipc_subscr), + .name = "topology_server", + .tipc_conn_recvmsg = subscr_conn_msg_event, + .tipc_conn_new = subscr_named_msg_event, + .tipc_conn_shutdown = subscr_conn_shutdown_event, +}; /** * htohl - convert value to endianness used by destination @@ -81,20 +90,13 @@ static u32 htohl(u32 in, int swap) return swap ? swab32(in) : in; } -/** - * subscr_send_event - send a message containing a tipc_event to the subscriber - * - * Note: Must not hold subscriber's server port lock, since tipc_send() will - * try to take the lock if the message is rejected and returned! - */ -static void subscr_send_event(struct tipc_subscription *sub, - u32 found_lower, - u32 found_upper, - u32 event, - u32 port_ref, +static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower, + u32 found_upper, u32 event, u32 port_ref, u32 node) { - struct iovec msg_sect; + struct tipc_subscriber *subscriber = sub->subscriber; + struct kvec msg_sect; + int ret; msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_len = sizeof(struct tipc_event); @@ -104,7 +106,10 @@ static void subscr_send_event(struct tipc_subscription *sub, sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.node = htohl(node, sub->swap); - tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len); + ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, + msg_sect.iov_base, msg_sect.iov_len); + if (ret < 0) + pr_err("Sending subscription event failed, no memory\n"); } /** @@ -147,21 +152,24 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub, subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); } -/** - * subscr_timeout - subscription timeout has occurred - */ static void subscr_timeout(struct tipc_subscription *sub) { - struct tipc_port *server_port; + struct tipc_subscriber *subscriber = sub->subscriber; - /* Validate server port reference (in case subscriber is terminating) */ - server_port = tipc_port_lock(sub->server_ref); - if (server_port == NULL) + /* The spin lock per subscriber is used to protect its members */ + spin_lock_bh(&subscriber->lock); + + /* Validate if the connection related to the subscriber is + * closed (in case subscriber is terminating) + */ + if (subscriber->conid == 0) { + spin_unlock_bh(&subscriber->lock); return; + } /* Validate timeout (in case subscription is being cancelled) */ if (sub->timeout == TIPC_WAIT_FOREVER) { - tipc_port_unlock(server_port); + spin_unlock_bh(&subscriber->lock); return; } @@ -171,8 +179,7 @@ static void subscr_timeout(struct tipc_subscription *sub) /* Unlink subscription from subscriber */ list_del(&sub->subscription_list); - /* Release subscriber's server port */ - tipc_port_unlock(server_port); + spin_unlock_bh(&subscriber->lock); /* Notify subscriber of timeout */ subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, @@ -181,64 +188,54 @@ static void subscr_timeout(struct tipc_subscription *sub) /* Now destroy subscription */ k_term_timer(&sub->timer); kfree(sub); - atomic_dec(&topsrv.subscription_count); + atomic_dec(&subscription_count); } /** * subscr_del - delete a subscription within a subscription list * - * Called with subscriber port locked. + * Called with subscriber lock held. */ static void subscr_del(struct tipc_subscription *sub) { tipc_nametbl_unsubscribe(sub); list_del(&sub->subscription_list); kfree(sub); - atomic_dec(&topsrv.subscription_count); + atomic_dec(&subscription_count); } /** * subscr_terminate - terminate communication with a subscriber * - * Called with subscriber port locked. Routine must temporarily release lock - * to enable subscription timeout routine(s) to finish without deadlocking; - * the lock is then reclaimed to allow caller to release it upon return. - * (This should work even in the unlikely event some other thread creates - * a new object reference in the interim that uses this lock; this routine will - * simply wait for it to be released, then claim it.) + * Note: Must call it in process context since it might sleep. */ static void subscr_terminate(struct tipc_subscriber *subscriber) { - u32 port_ref; + tipc_conn_terminate(&topsrv, subscriber->conid); +} + +static void subscr_release(struct tipc_subscriber *subscriber) +{ struct tipc_subscription *sub; struct tipc_subscription *sub_temp; - /* Invalidate subscriber reference */ - port_ref = subscriber->port_ref; - subscriber->port_ref = 0; - spin_unlock_bh(subscriber->lock); + spin_lock_bh(&subscriber->lock); - /* Sever connection to subscriber */ - tipc_shutdown(port_ref); - tipc_deleteport(port_ref); + /* Invalidate subscriber reference */ + subscriber->conid = 0; /* Destroy any existing subscriptions for subscriber */ list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, subscription_list) { if (sub->timeout != TIPC_WAIT_FOREVER) { + spin_unlock_bh(&subscriber->lock); k_cancel_timer(&sub->timer); k_term_timer(&sub->timer); + spin_lock_bh(&subscriber->lock); } subscr_del(sub); } - - /* Remove subscriber from topology server's subscriber list */ - spin_lock_bh(&topsrv.lock); - list_del(&subscriber->subscriber_list); - spin_unlock_bh(&topsrv.lock); - - /* Reclaim subscriber lock */ - spin_lock_bh(subscriber->lock); + spin_unlock_bh(&subscriber->lock); /* Now destroy subscriber */ kfree(subscriber); @@ -247,7 +244,7 @@ static void subscr_terminate(struct tipc_subscriber *subscriber) /** * subscr_cancel - handle subscription cancellation request * - * Called with subscriber port locked. Routine must temporarily release lock + * Called with subscriber lock held. Routine must temporarily release lock * to enable the subscription timeout routine to finish without deadlocking; * the lock is then reclaimed to allow caller to release it upon return. * @@ -274,10 +271,10 @@ static void subscr_cancel(struct tipc_subscr *s, /* Cancel subscription timer (if used), then delete subscription */ if (sub->timeout != TIPC_WAIT_FOREVER) { sub->timeout = TIPC_WAIT_FOREVER; - spin_unlock_bh(subscriber->lock); + spin_unlock_bh(&subscriber->lock); k_cancel_timer(&sub->timer); k_term_timer(&sub->timer); - spin_lock_bh(subscriber->lock); + spin_lock_bh(&subscriber->lock); } subscr_del(sub); } @@ -285,7 +282,7 @@ static void subscr_cancel(struct tipc_subscr *s, /** * subscr_subscribe - create subscription for subscriber * - * Called with subscriber port locked. + * Called with subscriber lock held. */ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, struct tipc_subscriber *subscriber) @@ -304,7 +301,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, } /* Refuse subscription if global limit exceeded */ - if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { + if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { pr_warn("Subscription rejected, limit reached (%u)\n", TIPC_MAX_SUBSCRIPTIONS); subscr_terminate(subscriber); @@ -335,10 +332,10 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, } INIT_LIST_HEAD(&sub->nameseq_list); list_add(&sub->subscription_list, &subscriber->subscription_list); - sub->server_ref = subscriber->port_ref; + sub->subscriber = subscriber; sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); - atomic_inc(&topsrv.subscription_count); + atomic_inc(&subscription_count); if (sub->timeout != TIPC_WAIT_FOREVER) { k_init_timer(&sub->timer, (Handler)subscr_timeout, (unsigned long)sub); @@ -348,196 +345,51 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, return sub; } -/** - * subscr_conn_shutdown_event - handle termination request from subscriber - * - * Called with subscriber's server port unlocked. - */ -static void subscr_conn_shutdown_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - unsigned char const *data, - unsigned int size, - int reason) +/* Handle one termination request for the subscriber */ +static void subscr_conn_shutdown_event(int conid, void *usr_data) { - struct tipc_subscriber *subscriber = usr_handle; - spinlock_t *subscriber_lock; - - if (tipc_port_lock(port_ref) == NULL) - return; - - subscriber_lock = subscriber->lock; - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); + subscr_release((struct tipc_subscriber *)usr_data); } -/** - * subscr_conn_msg_event - handle new subscription request from subscriber - * - * Called with subscriber's server port unlocked. - */ -static void subscr_conn_msg_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - const unchar *data, - u32 size) +/* Handle one request to create a new subscription for the subscriber */ +static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len) { - struct tipc_subscriber *subscriber = usr_handle; - spinlock_t *subscriber_lock; + struct tipc_subscriber *subscriber = usr_data; struct tipc_subscription *sub; - /* - * Lock subscriber's server port (& make a local copy of lock pointer, - * in case subscriber is deleted while processing subscription request) - */ - if (tipc_port_lock(port_ref) == NULL) - return; - - subscriber_lock = subscriber->lock; - - if (size != sizeof(struct tipc_subscr)) { - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); - } else { - sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); - spin_unlock_bh(subscriber_lock); - if (sub != NULL) { - - /* - * We must release the server port lock before adding a - * subscription to the name table since TIPC needs to be - * able to (re)acquire the port lock if an event message - * issued by the subscription process is rejected and - * returned. The subscription cannot be deleted while - * it is being added to the name table because: - * a) the single-threading of the native API port code - * ensures the subscription cannot be cancelled and - * the subscriber connection cannot be broken, and - * b) the name table lock ensures the subscription - * timeout code cannot delete the subscription, - * so the subscription object is still protected. - */ - tipc_nametbl_subscribe(sub); - } - } + spin_lock_bh(&subscriber->lock); + sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber); + if (sub) + tipc_nametbl_subscribe(sub); + spin_unlock_bh(&subscriber->lock); } -/** - * subscr_named_msg_event - handle request to establish a new subscriber - */ -static void subscr_named_msg_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - const unchar *data, - u32 size, - u32 importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest) + +/* Handle one request to establish a new subscriber */ +static void *subscr_named_msg_event(int conid) { struct tipc_subscriber *subscriber; - u32 server_port_ref; /* Create subscriber object */ subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC); if (subscriber == NULL) { pr_warn("Subscriber rejected, no memory\n"); - return; + return NULL; } INIT_LIST_HEAD(&subscriber->subscription_list); - INIT_LIST_HEAD(&subscriber->subscriber_list); - - /* Create server port & establish connection to subscriber */ - tipc_createport(subscriber, - importance, - NULL, - NULL, - subscr_conn_shutdown_event, - NULL, - NULL, - subscr_conn_msg_event, - NULL, - &subscriber->port_ref); - if (subscriber->port_ref == 0) { - pr_warn("Subscriber rejected, unable to create port\n"); - kfree(subscriber); - return; - } - tipc_connect(subscriber->port_ref, orig); - - /* Lock server port (& save lock address for future use) */ - subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; - - /* Add subscriber to topology server's subscriber list */ - spin_lock_bh(&topsrv.lock); - list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); - spin_unlock_bh(&topsrv.lock); - - /* Unlock server port */ - server_port_ref = subscriber->port_ref; - spin_unlock_bh(subscriber->lock); - - /* Send an ACK- to complete connection handshaking */ - tipc_send(server_port_ref, 0, NULL, 0); + subscriber->conid = conid; + spin_lock_init(&subscriber->lock); - /* Handle optional subscription request */ - if (size != 0) { - subscr_conn_msg_event(subscriber, server_port_ref, - buf, data, size); - } + return (void *)subscriber; } int tipc_subscr_start(void) { - struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; - int res; - - spin_lock_init(&topsrv.lock); - INIT_LIST_HEAD(&topsrv.subscriber_list); - - res = tipc_createport(NULL, - TIPC_CRITICAL_IMPORTANCE, - NULL, - NULL, - NULL, - NULL, - subscr_named_msg_event, - NULL, - NULL, - &topsrv.setup_port); - if (res) - goto failed; - - res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); - if (res) { - tipc_deleteport(topsrv.setup_port); - topsrv.setup_port = 0; - goto failed; - } - - return 0; - -failed: - pr_err("Failed to create subscription service\n"); - return res; + return tipc_server_start(&topsrv); } void tipc_subscr_stop(void) { - struct tipc_subscriber *subscriber; - struct tipc_subscriber *subscriber_temp; - spinlock_t *subscriber_lock; - - if (topsrv.setup_port) { - tipc_deleteport(topsrv.setup_port); - topsrv.setup_port = 0; - - list_for_each_entry_safe(subscriber, subscriber_temp, - &topsrv.subscriber_list, - subscriber_list) { - subscriber_lock = subscriber->lock; - spin_lock_bh(subscriber_lock); - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); - } - } + tipc_server_stop(&topsrv); } diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 218d2e07f0cc..43e6d6332a02 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -2,7 +2,7 @@ * net/tipc/subscr.h: Include file for TIPC network topology service * * Copyright (c) 2003-2006, Ericsson AB - * Copyright (c) 2005-2007, Wind River Systems + * Copyright (c) 2005-2007, 2012-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,10 +37,14 @@ #ifndef _TIPC_SUBSCR_H #define _TIPC_SUBSCR_H +#include "server.h" + struct tipc_subscription; +struct tipc_subscriber; /** * struct tipc_subscription - TIPC network topology subscription object + * @subscriber: pointer to its subscriber * @seq: name sequence associated with subscription * @timeout: duration of subscription (in ms) * @filter: event filtering to be done for subscription @@ -52,13 +56,13 @@ struct tipc_subscription; * @evt: template for events generated by subscription */ struct tipc_subscription { + struct tipc_subscriber *subscriber; struct tipc_name_seq seq; u32 timeout; u32 filter; struct timer_list timer; struct list_head nameseq_list; struct list_head subscription_list; - u32 server_ref; int swap; struct tipc_event evt; }; -- cgit v1.2.3-70-g09d2 From 7d0ab17b74330e39a68ba33099ccda27f794f519 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 17 Jun 2013 10:54:41 -0400 Subject: tipc: convert configuration server to use new server facility As the new socket-based TIPC server infrastructure has been introduced, we can now convert the configuration server to use it. Then we can take future steps to simplify the configuration server locking policy. Some minor reordering of initialization is done, due to the dependency on having tipc_socket_init completed. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/config.c | 102 ++++++++++++++++++++++++------------------------------ net/tipc/core.c | 4 +-- net/tipc/socket.c | 3 +- 3 files changed, 49 insertions(+), 60 deletions(-) (limited to 'net/tipc/socket.c') diff --git a/net/tipc/config.c b/net/tipc/config.c index f67866c765dd..4887ae04f3a5 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -2,7 +2,7 @@ * net/tipc/config.c: TIPC configuration management code * * Copyright (c) 2002-2006, Ericsson AB - * Copyright (c) 2004-2007, 2010-2012, Wind River Systems + * Copyright (c) 2004-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,12 +38,12 @@ #include "port.h" #include "name_table.h" #include "config.h" +#include "server.h" #define REPLY_TRUNCATED "\n" -static u32 config_port_ref; - static DEFINE_SPINLOCK(config_lock); +static struct tipc_server cfgsrv; static const void *req_tlv_area; /* request message TLV area */ static int req_tlv_space; /* request message TLV area size */ @@ -381,33 +381,27 @@ exit: return rep_tlv_buf; } -static void cfg_named_msg_event(void *userdata, - u32 port_ref, - struct sk_buff **buf, - const unchar *msg, - u32 size, - u32 importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest) +static void cfg_conn_msg_event(int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len) { struct tipc_cfg_msg_hdr *req_hdr; struct tipc_cfg_msg_hdr *rep_hdr; struct sk_buff *rep_buf; + int ret; /* Validate configuration message header (ignore invalid message) */ - req_hdr = (struct tipc_cfg_msg_hdr *)msg; - if ((size < sizeof(*req_hdr)) || - (size != TCM_ALIGN(ntohl(req_hdr->tcm_len))) || + req_hdr = (struct tipc_cfg_msg_hdr *)buf; + if ((len < sizeof(*req_hdr)) || + (len != TCM_ALIGN(ntohl(req_hdr->tcm_len))) || (ntohs(req_hdr->tcm_flags) != TCM_F_REQUEST)) { pr_warn("Invalid configuration message discarded\n"); return; } /* Generate reply for request (if can't, return request) */ - rep_buf = tipc_cfg_do_cmd(orig->node, - ntohs(req_hdr->tcm_type), - msg + sizeof(*req_hdr), - size - sizeof(*req_hdr), + rep_buf = tipc_cfg_do_cmd(addr->addr.id.node, ntohs(req_hdr->tcm_type), + buf + sizeof(*req_hdr), + len - sizeof(*req_hdr), BUF_HEADROOM + MAX_H_SIZE + sizeof(*rep_hdr)); if (rep_buf) { skb_push(rep_buf, sizeof(*rep_hdr)); @@ -415,57 +409,51 @@ static void cfg_named_msg_event(void *userdata, memcpy(rep_hdr, req_hdr, sizeof(*rep_hdr)); rep_hdr->tcm_len = htonl(rep_buf->len); rep_hdr->tcm_flags &= htons(~TCM_F_REQUEST); - } else { - rep_buf = *buf; - *buf = NULL; - } - /* NEED TO ADD CODE TO HANDLE FAILED SEND (SUCH AS CONGESTION) */ - tipc_send_buf2port(port_ref, orig, rep_buf, rep_buf->len); + ret = tipc_conn_sendmsg(&cfgsrv, conid, addr, rep_buf->data, + rep_buf->len); + if (ret < 0) + pr_err("Sending cfg reply message failed, no memory\n"); + + kfree_skb(rep_buf); + } } +static struct sockaddr_tipc cfgsrv_addr __read_mostly = { + .family = AF_TIPC, + .addrtype = TIPC_ADDR_NAMESEQ, + .addr.nameseq.type = TIPC_CFG_SRV, + .addr.nameseq.lower = 0, + .addr.nameseq.upper = 0, + .scope = TIPC_ZONE_SCOPE +}; + +static struct tipc_server cfgsrv __read_mostly = { + .saddr = &cfgsrv_addr, + .imp = TIPC_CRITICAL_IMPORTANCE, + .type = SOCK_RDM, + .max_rcvbuf_size = 64 * 1024, + .name = "cfg_server", + .tipc_conn_recvmsg = cfg_conn_msg_event, + .tipc_conn_new = NULL, + .tipc_conn_shutdown = NULL +}; + int tipc_cfg_init(void) { - struct tipc_name_seq seq; - int res; - - res = tipc_createport(NULL, TIPC_CRITICAL_IMPORTANCE, - NULL, NULL, NULL, - NULL, cfg_named_msg_event, NULL, - NULL, &config_port_ref); - if (res) - goto failed; - - seq.type = TIPC_CFG_SRV; - seq.lower = seq.upper = tipc_own_addr; - res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq); - if (res) - goto failed; - - return 0; - -failed: - pr_err("Unable to create configuration service\n"); - return res; + return tipc_server_start(&cfgsrv); } void tipc_cfg_reinit(void) { - struct tipc_name_seq seq; - int res; - - seq.type = TIPC_CFG_SRV; - seq.lower = seq.upper = 0; - tipc_withdraw(config_port_ref, TIPC_ZONE_SCOPE, &seq); + tipc_server_stop(&cfgsrv); - seq.lower = seq.upper = tipc_own_addr; - res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq); - if (res) - pr_err("Unable to reinitialize configuration service\n"); + cfgsrv_addr.addr.nameseq.lower = tipc_own_addr; + cfgsrv_addr.addr.nameseq.upper = tipc_own_addr; + tipc_server_start(&cfgsrv); } void tipc_cfg_stop(void) { - tipc_deleteport(config_port_ref); - config_port_ref = 0; + tipc_server_stop(&cfgsrv); } diff --git a/net/tipc/core.c b/net/tipc/core.c index 15bbe99b609d..fd4eeeaa972a 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -136,8 +136,6 @@ static int tipc_core_start(void) res = tipc_ref_table_init(tipc_max_ports, tipc_random); if (!res) res = tipc_nametbl_init(); - if (!res) - res = tipc_cfg_init(); if (!res) res = tipc_netlink_start(); if (!res) @@ -146,6 +144,8 @@ static int tipc_core_start(void) res = tipc_register_sysctl(); if (!res) res = tipc_subscr_start(); + if (!res) + res = tipc_cfg_init(); if (res) tipc_core_stop(); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d0254157a30d..9510fe8acf45 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -403,7 +403,8 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) return -EAFNOSUPPORT; if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) && - (addr->addr.nameseq.type != TIPC_TOP_SRV)) + (addr->addr.nameseq.type != TIPC_TOP_SRV) && + (addr->addr.nameseq.type != TIPC_CFG_SRV)) return -EACCES; return (addr->scope > 0) ? -- cgit v1.2.3-70-g09d2 From 3c5db8e4eca36e4f312b49bba99f4c1f6ce0563a Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 17 Jun 2013 10:54:44 -0400 Subject: tipc: rename tipc_createport_raw to tipc_createport After the removal of the native API, there is now only one way to to create a TIPC port instance -- the function tipc_createport_raw(). We make it more readable by renaming it to tipc_createport(). Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/port.c | 4 ++-- net/tipc/port.h | 2 +- net/tipc/socket.c | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'net/tipc/socket.c') diff --git a/net/tipc/port.c b/net/tipc/port.c index f628c84a8f61..84b2a574f161 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -202,11 +202,11 @@ exit: } /** - * tipc_createport_raw - create a generic TIPC port + * tipc_createport - create a generic TIPC port * * Returns pointer to (locked) TIPC port, or NULL if unable to create it */ -struct tipc_port *tipc_createport_raw(void *usr_handle, +struct tipc_port *tipc_createport(void *usr_handle, u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), void (*wakeup)(struct tipc_port *), const u32 importance) diff --git a/net/tipc/port.h b/net/tipc/port.h index 4779f0a82234..45838826f2f8 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -106,7 +106,7 @@ struct tipc_port_list; /* * TIPC port manipulation routines */ -struct tipc_port *tipc_createport_raw(void *usr_handle, +struct tipc_port *tipc_createport(void *usr_handle, u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), void (*wakeup)(struct tipc_port *), const u32 importance); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 9510fe8acf45..67f4e1fbf5a1 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -195,8 +195,8 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, return -ENOMEM; /* Allocate TIPC port for socket to use */ - tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch, - TIPC_LOW_IMPORTANCE); + tp_ptr = tipc_createport(sk, &dispatch, &wakeupdispatch, + TIPC_LOW_IMPORTANCE); if (unlikely(!tp_ptr)) { sk_free(sk); return -ENOMEM; -- cgit v1.2.3-70-g09d2 From c0fee8aca7206264d5e3dcc4e60aaf86501f4ea1 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 17 Jun 2013 10:54:46 -0400 Subject: tipc: save sock structure pointer instead of void pointer to tipc_port Directly save sock structure pointer instead of void pointer to avoid unnecessary cast conversions. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/port.c | 4 ++-- net/tipc/port.h | 6 +++--- net/tipc/socket.c | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) (limited to 'net/tipc/socket.c') diff --git a/net/tipc/port.c b/net/tipc/port.c index 84b2a574f161..0bb185a3ed4a 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -206,7 +206,7 @@ exit: * * Returns pointer to (locked) TIPC port, or NULL if unable to create it */ -struct tipc_port *tipc_createport(void *usr_handle, +struct tipc_port *tipc_createport(struct sock *sk, u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), void (*wakeup)(struct tipc_port *), const u32 importance) @@ -227,7 +227,7 @@ struct tipc_port *tipc_createport(void *usr_handle, return NULL; } - p_ptr->usr_handle = usr_handle; + p_ptr->sk = sk; p_ptr->max_pkt = MAX_PKT_DEFAULT; p_ptr->ref = ref; INIT_LIST_HEAD(&p_ptr->wait_list); diff --git a/net/tipc/port.h b/net/tipc/port.h index 45838826f2f8..241f529db942 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -48,7 +48,7 @@ /** * struct tipc_port - TIPC port structure - * @usr_handle: pointer to additional user-defined information about port + * @sk: pointer to socket handle * @lock: pointer to spinlock for controlling access to port * @connected: non-zero if port is currently connected to a peer port * @conn_type: TIPC type used when connection was established @@ -74,7 +74,7 @@ * @subscription: "node down" subscription used to terminate failed connections */ struct tipc_port { - void *usr_handle; + struct sock *sk; spinlock_t *lock; int connected; u32 conn_type; @@ -106,7 +106,7 @@ struct tipc_port_list; /* * TIPC port manipulation routines */ -struct tipc_port *tipc_createport(void *usr_handle, +struct tipc_port *tipc_createport(struct sock *sk, u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), void (*wakeup)(struct tipc_port *), const u32 importance); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 67f4e1fbf5a1..14ed54e961b6 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1406,7 +1406,7 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf) */ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) { - struct sock *sk = (struct sock *)tport->usr_handle; + struct sock *sk = tport->sk; u32 res; /* @@ -1437,7 +1437,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) */ static void wakeupdispatch(struct tipc_port *tport) { - struct sock *sk = (struct sock *)tport->usr_handle; + struct sock *sk = tport->sk; sk->sk_write_space(sk); } -- cgit v1.2.3-70-g09d2 From ae8509c420122866344bde1241e31858d0aa2fbc Mon Sep 17 00:00:00 2001 From: Paul Gortmaker Date: Mon, 17 Jun 2013 10:54:47 -0400 Subject: tipc: cosmetic realignment of function arguments No runtime code changes here. Just a realign of the function arguments to start where the 1st one was, and fit as many args as can be put in an 80 char line. Signed-off-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/bcast.c | 3 +-- net/tipc/bcast.h | 3 ++- net/tipc/discover.c | 7 +++---- net/tipc/eth_media.c | 2 +- net/tipc/link.c | 18 +++++++----------- net/tipc/msg.c | 4 ++-- net/tipc/msg.h | 4 ++-- net/tipc/name_table.c | 10 +++++----- net/tipc/name_table.h | 11 ++++++----- net/tipc/node_subscr.c | 2 +- net/tipc/port.c | 7 ++++--- net/tipc/port.h | 10 ++++++---- net/tipc/socket.c | 12 ++++++------ net/tipc/subscr.c | 14 ++++---------- net/tipc/subscr.h | 13 ++++--------- 15 files changed, 54 insertions(+), 66 deletions(-) (limited to 'net/tipc/socket.c') diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index e5f3da507823..716de1ac6cb5 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -578,8 +578,7 @@ u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) * Returns 0 (packet sent successfully) under all circumstances, * since the broadcast link's pseudo-bearer never blocks */ -static int tipc_bcbearer_send(struct sk_buff *buf, - struct tipc_bearer *unused1, +static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, struct tipc_media_addr *unused2) { int bp_index; diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index a93306557e00..6ee587b469fd 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -75,7 +75,8 @@ void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); /** * tipc_nmap_equal - test for equality of node maps */ -static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, struct tipc_node_map *nm_b) +static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, + struct tipc_node_map *nm_b) { return !memcmp(nm_a, nm_b, sizeof(*nm_a)); } diff --git a/net/tipc/discover.c b/net/tipc/discover.c index eedff58d0387..ecc758c6eacf 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -70,8 +70,7 @@ struct tipc_link_req { * @dest_domain: network domain of node(s) which should respond to message * @b_ptr: ptr to bearer issuing message */ -static struct sk_buff *tipc_disc_init_msg(u32 type, - u32 dest_domain, +static struct sk_buff *tipc_disc_init_msg(u32 type, u32 dest_domain, struct tipc_bearer *b_ptr) { struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE); @@ -346,8 +345,8 @@ exit: * * Returns 0 if successful, otherwise -errno. */ -int tipc_disc_create(struct tipc_bearer *b_ptr, - struct tipc_media_addr *dest, u32 dest_domain) +int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, + u32 dest_domain) { struct tipc_link_req *req; diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c index fc60bea63169..c1aa37fdca2f 100644 --- a/net/tipc/eth_media.c +++ b/net/tipc/eth_media.c @@ -62,7 +62,7 @@ static struct eth_bearer eth_bearers[MAX_ETH_BEARERS]; static int eth_started; static int recv_notification(struct notifier_block *nb, unsigned long evt, - void *dv); + void *dv); /* * Network device notifier info */ diff --git a/net/tipc/link.c b/net/tipc/link.c index d34429d03c16..b852c94a784e 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -771,8 +771,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) * link_bundle_buf(): Append contents of a buffer to * the tail of an existing one. */ -static int link_bundle_buf(struct tipc_link *l_ptr, - struct sk_buff *bundler, +static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler, struct sk_buff *buf) { struct tipc_msg *bundler_msg = buf_msg(bundler); @@ -1064,8 +1063,7 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf, */ int tipc_link_send_sections_fast(struct tipc_port *sender, struct iovec const *msg_sect, - const u32 num_sect, - unsigned int total_len, + const u32 num_sect, unsigned int total_len, u32 destaddr) { struct tipc_msg *hdr = &sender->phdr; @@ -1155,8 +1153,7 @@ exit: */ static int link_send_sections_long(struct tipc_port *sender, struct iovec const *msg_sect, - u32 num_sect, - unsigned int total_len, + u32 num_sect, unsigned int total_len, u32 destaddr) { struct tipc_link *l_ptr; @@ -1408,7 +1405,7 @@ static void link_reset_all(unsigned long addr) } static void link_retransmit_failure(struct tipc_link *l_ptr, - struct sk_buff *buf) + struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); @@ -1863,8 +1860,8 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, * Send protocol message to the other endpoint. */ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, - int probe_msg, u32 gap, u32 tolerance, - u32 priority, u32 ack_mtu) + int probe_msg, u32 gap, u32 tolerance, + u32 priority, u32 ack_mtu) { struct sk_buff *buf = NULL; struct tipc_msg *msg = l_ptr->pmsg; @@ -2107,8 +2104,7 @@ exit: * another bearer. Owner node is locked. */ static void tipc_link_tunnel(struct tipc_link *l_ptr, - struct tipc_msg *tunnel_hdr, - struct tipc_msg *msg, + struct tipc_msg *tunnel_hdr, struct tipc_msg *msg, u32 selector) { struct tipc_link *tunnel; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index c2a261322515..ced60e2fc4f7 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -51,8 +51,8 @@ u32 tipc_msg_tot_importance(struct tipc_msg *m) } -void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, - u32 hsize, u32 destnode) +void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, + u32 destnode) { memset(m, 0, hsize); msg_set_version(m); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 511019a77c9c..5e4ccf5c27df 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -719,8 +719,8 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) } u32 tipc_msg_tot_importance(struct tipc_msg *m); -void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, - u32 hsize, u32 destnode); +void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, + u32 destnode); int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, u32 num_sect, unsigned int total_len, int max_size, struct sk_buff **buf); diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 24b167914311..09dcd54b04e1 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -440,7 +440,7 @@ found: * sequence overlapping with the requested sequence */ static void tipc_nameseq_subscribe(struct name_seq *nseq, - struct tipc_subscription *s) + struct tipc_subscription *s) { struct sub_seq *sseq = nseq->sseqs; @@ -662,7 +662,7 @@ exit: * tipc_nametbl_publish - add name publication to network name tables */ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, - u32 scope, u32 port_ref, u32 key) + u32 scope, u32 port_ref, u32 key) { struct publication *publ; @@ -753,7 +753,7 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s) * subseq_list - print specified sub-sequence contents into the given buffer */ static int subseq_list(struct sub_seq *sseq, char *buf, int len, u32 depth, - u32 index) + u32 index) { char portIdStr[27]; const char *scope_str[] = {"", " zone", " cluster", " node"}; @@ -792,7 +792,7 @@ static int subseq_list(struct sub_seq *sseq, char *buf, int len, u32 depth, * nameseq_list - print specified name sequence contents into the given buffer */ static int nameseq_list(struct name_seq *seq, char *buf, int len, u32 depth, - u32 type, u32 lowbound, u32 upbound, u32 index) + u32 type, u32 lowbound, u32 upbound, u32 index) { struct sub_seq *sseq; char typearea[11]; @@ -849,7 +849,7 @@ static int nametbl_header(char *buf, int len, u32 depth) * nametbl_list - print specified name table contents into the given buffer */ static int nametbl_list(char *buf, int len, u32 depth_info, - u32 type, u32 lowbound, u32 upbound) + u32 type, u32 lowbound, u32 upbound) { struct hlist_head *seq_head; struct name_seq *seq; diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 71cb4dc712df..f02f48b9a216 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -87,14 +87,15 @@ extern rwlock_t tipc_nametbl_lock; struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space); u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *node); int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit, - struct tipc_port_list *dports); + struct tipc_port_list *dports); struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, - u32 scope, u32 port_ref, u32 key); + u32 scope, u32 port_ref, u32 key); int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key); struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper, - u32 scope, u32 node, u32 ref, u32 key); -struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower, - u32 node, u32 ref, u32 key); + u32 scope, u32 node, u32 ref, + u32 key); +struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower, u32 node, + u32 ref, u32 key); void tipc_nametbl_subscribe(struct tipc_subscription *s); void tipc_nametbl_unsubscribe(struct tipc_subscription *s); int tipc_nametbl_init(void); diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c index 5e34b015da45..8a7384c04add 100644 --- a/net/tipc/node_subscr.c +++ b/net/tipc/node_subscr.c @@ -42,7 +42,7 @@ * tipc_nodesub_subscribe - create "node down" subscription for specified node */ void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, - void *usr_handle, net_ev_handler handle_down) + void *usr_handle, net_ev_handler handle_down) { if (in_own_node(addr)) { node_sub->node = NULL; diff --git a/net/tipc/port.c b/net/tipc/port.c index 0bb185a3ed4a..b3ed2fcab4fb 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -207,9 +207,10 @@ exit: * Returns pointer to (locked) TIPC port, or NULL if unable to create it */ struct tipc_port *tipc_createport(struct sock *sk, - u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), - void (*wakeup)(struct tipc_port *), - const u32 importance) + u32 (*dispatcher)(struct tipc_port *, + struct sk_buff *), + void (*wakeup)(struct tipc_port *), + const u32 importance) { struct tipc_port *p_ptr; struct tipc_msg *msg; diff --git a/net/tipc/port.h b/net/tipc/port.h index 241f529db942..5a7026b9c345 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -107,8 +107,10 @@ struct tipc_port_list; * TIPC port manipulation routines */ struct tipc_port *tipc_createport(struct sock *sk, - u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), - void (*wakeup)(struct tipc_port *), const u32 importance); + u32 (*dispatcher)(struct tipc_port *, + struct sk_buff *), + void (*wakeup)(struct tipc_port *), + const u32 importance); int tipc_reject_msg(struct sk_buff *buf, u32 err); @@ -126,9 +128,9 @@ int tipc_portunreturnable(u32 portref, unsigned int *isunreturnable); int tipc_set_portunreturnable(u32 portref, unsigned int isunreturnable); int tipc_publish(u32 portref, unsigned int scope, - struct tipc_name_seq const *name_seq); + struct tipc_name_seq const *name_seq); int tipc_withdraw(u32 portref, unsigned int scope, - struct tipc_name_seq const *name_seq); + struct tipc_name_seq const *name_seq); int tipc_connect(u32 portref, struct tipc_portid const *port); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 14ed54e961b6..ce8249c76827 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -278,7 +278,7 @@ void tipc_sock_release_local(struct socket *sock) */ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, - int flags) + int flags) { struct sock *sk = sock->sk; int ret; @@ -889,7 +889,7 @@ static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) * Returns 0 if successful, otherwise errno */ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, - struct tipc_port *tport) + struct tipc_port *tport) { u32 anc_data[3]; u32 err; @@ -1736,8 +1736,8 @@ restart: * * Returns 0 on success, errno otherwise */ -static int setsockopt(struct socket *sock, - int lvl, int opt, char __user *ov, unsigned int ol) +static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov, + unsigned int ol) { struct sock *sk = sock->sk; struct tipc_port *tport = tipc_sk_port(sk); @@ -1795,8 +1795,8 @@ static int setsockopt(struct socket *sock, * * Returns 0 on success, errno otherwise */ -static int getsockopt(struct socket *sock, - int lvl, int opt, char __user *ov, int __user *ol) +static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov, + int __user *ol) { struct sock *sk = sock->sk; struct tipc_port *tport = tipc_sk_port(sk); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index f6be92a6973a..d38bb45d82e9 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -117,10 +117,8 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower, * * Returns 1 if there is overlap, otherwise 0. */ -int tipc_subscr_overlap(struct tipc_subscription *sub, - u32 found_lower, +int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower, u32 found_upper) - { if (found_lower < sub->seq.lower) found_lower = sub->seq.lower; @@ -136,13 +134,9 @@ int tipc_subscr_overlap(struct tipc_subscription *sub, * * Protected by nameseq.lock in name_table.c */ -void tipc_subscr_report_overlap(struct tipc_subscription *sub, - u32 found_lower, - u32 found_upper, - u32 event, - u32 port_ref, - u32 node, - int must) +void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower, + u32 found_upper, u32 event, u32 port_ref, + u32 node, int must) { if (!tipc_subscr_overlap(sub, found_lower, found_upper)) return; diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 43e6d6332a02..393e417bee3f 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -67,17 +67,12 @@ struct tipc_subscription { struct tipc_event evt; }; -int tipc_subscr_overlap(struct tipc_subscription *sub, - u32 found_lower, +int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower, u32 found_upper); -void tipc_subscr_report_overlap(struct tipc_subscription *sub, - u32 found_lower, - u32 found_upper, - u32 event, - u32 port_ref, - u32 node, - int must_report); +void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower, + u32 found_upper, u32 event, u32 port_ref, + u32 node, int must); int tipc_subscr_start(void); -- cgit v1.2.3-70-g09d2