From c901d26d4a8137f3ad0e5865d331f7c63c42d9f9 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:43 +0100 Subject: tipc: remove unnecessary function pointers Interaction between the functionality in server.c and subscr.c is done via function pointers installed in struct server. This makes the code harder to follow, and doesn't serve any obvious purpose. Here, we replace the function pointers with direct function calls. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/subscr.h | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index f3edca775d9f..a736f29ba9ab 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -67,6 +67,10 @@ struct tipc_subscription { struct tipc_event evt; }; +struct tipc_subscriber *tipc_subscrb_create(int conid); +void tipc_subscrb_delete(struct tipc_subscriber *subscriber); +int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, + void *buf, size_t len); int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); void tipc_subscrp_report_overlap(struct tipc_subscription *sub, -- cgit v1.2.3-70-g09d2 From df79d040dcd7d7e580c50edf40b82e677fe84801 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:44 +0100 Subject: tipc: eliminate struct tipc_subscriber It is unnecessary to keep two structures, struct tipc_conn and struct tipc_subscriber, with a one-to-one relationship and still with different life cycles. The fact that the two often run in different contexts, and still may access each other via direct pointers constitutes an additional hazard, something we have experienced at several occasions, and still see happening. We have identified at least two remaining problems that are easier to fix if we simplify the topology server data structure somewhat. - When there is a race between a subscription up/down event and a timeout event, it is fully possible that the former might be delivered after the latter, leading to confusion for the receiver. - The function tipc_subcrp_timeout() is executing in interrupt context, while the following call chain is at least theoretically possible: tipc_subscrp_timeout() tipc_subscrp_send_event() tipc_conn_sendmsg() conn_put() tipc_conn_kref_release() sock_release(sock) I.e., we end up calling a function that might try to sleep in interrupt context. To eliminate this, we need to ensure that the tipc_conn structure and the socket, as well as the subscription instances, only are deleted in work queue context, i.e., after the timeout event really has been sent out. We now remove this unnecessary complexity, by merging data and functionality of the subscriber structure into struct tipc_conn and the associated file server.c. We thereafter add a spinlock and a new 'inactive' state to the subscription structure. Using those, both problems described above can be easily solved. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/server.c | 161 ++++++++++++++++++++++++++++++++------------------ net/tipc/server.h | 2 +- net/tipc/subscr.c | 173 ++++++++++-------------------------------------------- net/tipc/subscr.h | 17 +++--- 4 files changed, 146 insertions(+), 207 deletions(-) (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/server.c b/net/tipc/server.c index 8aa2a33b1e48..b8268c0882a7 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -2,6 +2,7 @@ * net/tipc/server.c: TIPC server infrastructure * * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017, Ericsson AB * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -57,12 +58,13 @@ * @sock: socket handler associated with connection * @flags: indicates connection state * @server: pointer to connected server + * @sub_list: lsit to all pertaing subscriptions + * @sub_lock: lock protecting the subscription list + * @outqueue_lock: control access to the outqueue * @rwork: receive work item - * @usr_data: user-specified field * @rx_action: what to do when connection socket is active * @outqueue: pointer to first outbound message in queue * @outqueue_lock: control access to the outqueue - * @outqueue: list of connection objects for its server * @swork: send work item */ struct tipc_conn { @@ -71,9 +73,10 @@ struct tipc_conn { struct socket *sock; unsigned long flags; struct tipc_server *server; + struct list_head sub_list; + spinlock_t sub_lock; /* for subscription list */ struct work_struct rwork; int (*rx_action) (struct tipc_conn *con); - void *usr_data; struct list_head outqueue; spinlock_t outqueue_lock; struct work_struct swork; @@ -81,6 +84,7 @@ struct tipc_conn { /* An entry waiting to be sent */ struct outqueue_entry { + u32 evt; struct list_head list; struct kvec iov; }; @@ -89,18 +93,33 @@ static void tipc_recv_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work); static void tipc_clean_outqueues(struct tipc_conn *con); +static bool connected(struct tipc_conn *con) +{ + return con && test_bit(CF_CONNECTED, &con->flags); +} + +/** + * htohl - convert value to endianness used by destination + * @in: value to convert + * @swap: non-zero if endianness must be reversed + * + * Returns converted value + */ +static u32 htohl(u32 in, int swap) +{ + return swap ? swab32(in) : in; +} + static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); struct tipc_server *s = con->server; struct socket *sock = con->sock; - struct sock *sk; if (sock) { - sk = sock->sk; if (test_bit(CF_SERVER, &con->flags)) { __module_get(sock->ops->owner); - __module_get(sk->sk_prot_creator->owner); + __module_get(sock->sk->sk_prot_creator->owner); } sock_release(sock); con->sock = NULL; @@ -129,11 +148,8 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) spin_lock_bh(&s->idr_lock); con = idr_find(&s->conn_idr, conid); - if (con) { - if (!test_bit(CF_CONNECTED, &con->flags) || - !kref_get_unless_zero(&con->kref)) - con = NULL; - } + if (!connected(con) || !kref_get_unless_zero(&con->kref)) + con = NULL; spin_unlock_bh(&s->idr_lock); return con; } @@ -144,7 +160,7 @@ static void sock_data_ready(struct sock *sk) read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); - if (con && test_bit(CF_CONNECTED, &con->flags)) { + if (connected(con)) { conn_get(con); if (!queue_work(con->server->rcv_wq, &con->rwork)) conn_put(con); @@ -158,7 +174,7 @@ static void sock_write_space(struct sock *sk) read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); - if (con && test_bit(CF_CONNECTED, &con->flags)) { + if (connected(con)) { conn_get(con); if (!queue_work(con->server->send_wq, &con->swork)) conn_put(con); @@ -181,6 +197,24 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) write_unlock_bh(&sk->sk_callback_lock); } +/* tipc_con_delete_sub - delete a specific or all subscriptions + * for a given subscriber + */ +static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) +{ + struct list_head *sub_list = &con->sub_list; + struct tipc_subscription *sub, *tmp; + + spin_lock_bh(&con->sub_lock); + list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) { + if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) + tipc_sub_delete(sub); + else if (s) + break; + } + spin_unlock_bh(&con->sub_lock); +} + static void tipc_close_conn(struct tipc_conn *con) { struct sock *sk = con->sock->sk; @@ -188,10 +222,11 @@ static void tipc_close_conn(struct tipc_conn *con) write_lock_bh(&sk->sk_callback_lock); disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); + if (disconnect) { sk->sk_user_data = NULL; if (con->conid) - tipc_subscrb_delete(con->usr_data); + tipc_con_delete_sub(con, NULL); } write_unlock_bh(&sk->sk_callback_lock); @@ -215,7 +250,9 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) kref_init(&con->kref); INIT_LIST_HEAD(&con->outqueue); + INIT_LIST_HEAD(&con->sub_list); spin_lock_init(&con->outqueue_lock); + spin_lock_init(&con->sub_lock); INIT_WORK(&con->swork, tipc_send_work); INIT_WORK(&con->rwork, tipc_recv_work); @@ -236,6 +273,35 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) return con; } +int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, + void *buf, size_t len) +{ + struct tipc_subscr *s = (struct tipc_subscr *)buf; + struct tipc_subscription *sub; + bool status; + int swap; + + /* Determine subscriber's endianness */ + swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | + TIPC_SUB_CANCEL)); + + /* Detect & process a subscription cancellation request */ + if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { + s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); + tipc_con_delete_sub(con, s); + return 0; + } + status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); + sub = tipc_subscrp_subscribe(net, s, conid, swap, status); + if (!sub) + return -1; + + spin_lock_bh(&con->sub_lock); + list_add(&sub->subscrp_list, &con->sub_list); + spin_unlock_bh(&con->sub_lock); + return 0; +} + static int tipc_receive_from_sock(struct tipc_conn *con) { struct tipc_server *s = con->server; @@ -262,9 +328,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con) } read_lock_bh(&sk->sk_callback_lock); - if (test_bit(CF_CONNECTED, &con->flags)) - ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid, - con->usr_data, buf, ret); + ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret); read_unlock_bh(&sk->sk_callback_lock); kmem_cache_free(s->rcvbuf_cache, buf); if (ret < 0) @@ -302,15 +366,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con) newcon->rx_action = tipc_receive_from_sock; tipc_register_callbacks(newsock, newcon); - /* Notify that new connection is incoming */ - newcon->usr_data = tipc_subscrb_create(newcon->conid); - - if (!newcon->usr_data) { - sock_release(newsock); - conn_put(newcon); - return -ENOMEM; - } - /* Wake up receive process in case of 'SYN+' message */ newsock->sk->sk_data_ready(newsock->sk); return ret; @@ -427,7 +482,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con) } int tipc_conn_sendmsg(struct tipc_server *s, int conid, - void *data, size_t len) + u32 evt, void *data, size_t len) { struct outqueue_entry *e; struct tipc_conn *con; @@ -436,7 +491,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, if (!con) return -EINVAL; - if (!test_bit(CF_CONNECTED, &con->flags)) { + if (!connected(con)) { conn_put(con); return 0; } @@ -446,7 +501,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, conn_put(con); return -ENOMEM; } - + e->evt = evt; spin_lock_bh(&con->outqueue_lock); list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); @@ -470,10 +525,9 @@ void tipc_conn_terminate(struct tipc_server *s, int conid) bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid) { - struct tipc_subscriber *scbr; struct tipc_subscr sub; - struct tipc_server *s; struct tipc_conn *con; + int rc; sub.seq.type = type; sub.seq.lower = lower; @@ -487,32 +541,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, return false; *conid = con->conid; - s = con->server; - scbr = tipc_subscrb_create(*conid); - if (!scbr) { - conn_put(con); - return false; - } - - con->usr_data = scbr; con->sock = NULL; - tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub)); - return true; + rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub)); + if (rc < 0) + tipc_close_conn(con); + return !rc; } void tipc_topsrv_kern_unsubscr(struct net *net, int conid) { struct tipc_conn *con; - struct tipc_server *srv; con = tipc_conn_lookup(tipc_topsrv(net), conid); if (!con) return; test_and_clear_bit(CF_CONNECTED, &con->flags); - srv = con->server; - if (con->conid) - tipc_subscrb_delete(con->usr_data); + tipc_con_delete_sub(con, NULL); conn_put(con); conn_put(con); } @@ -537,7 +582,8 @@ static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) static void tipc_send_to_sock(struct tipc_conn *con) { - struct tipc_server *s = con->server; + struct list_head *queue = &con->outqueue; + struct tipc_server *srv = con->server; struct outqueue_entry *e; struct tipc_event *evt; struct msghdr msg; @@ -545,16 +591,20 @@ static void tipc_send_to_sock(struct tipc_conn *con) int ret; spin_lock_bh(&con->outqueue_lock); - while (test_bit(CF_CONNECTED, &con->flags)) { - e = list_entry(con->outqueue.next, struct outqueue_entry, list); - if ((struct list_head *) e == &con->outqueue) - break; + + while (!list_empty(queue)) { + e = list_first_entry(queue, struct outqueue_entry, list); spin_unlock_bh(&con->outqueue_lock); + if (e->evt == TIPC_SUBSCR_TIMEOUT) { + evt = (struct tipc_event *)e->iov.iov_base; + tipc_con_delete_sub(con, &evt->s); + } + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + if (con->sock) { - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT; ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, e->iov.iov_len); if (ret == -EWOULDBLOCK || ret == 0) { @@ -565,7 +615,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) } } else { evt = e->iov.iov_base; - tipc_send_kern_top_evt(s->net, evt); + tipc_send_kern_top_evt(srv->net, evt); } /* Don't starve users filling buffers */ @@ -573,7 +623,6 @@ static void tipc_send_to_sock(struct tipc_conn *con) cond_resched(); count = 0; } - spin_lock_bh(&con->outqueue_lock); list_del(&e->list); tipc_free_entry(e); @@ -591,7 +640,7 @@ static void tipc_recv_work(struct work_struct *work) struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); int count = 0; - while (test_bit(CF_CONNECTED, &con->flags)) { + while (connected(con)) { if (con->rx_action(con)) break; @@ -608,7 +657,7 @@ static void tipc_send_work(struct work_struct *work) { struct tipc_conn *con = container_of(work, struct tipc_conn, swork); - if (test_bit(CF_CONNECTED, &con->flags)) + if (connected(con)) tipc_send_to_sock(con); conn_put(con); diff --git a/net/tipc/server.h b/net/tipc/server.h index b4b83bdc8b20..fcc72321362d 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -77,7 +77,7 @@ struct tipc_server { }; int tipc_conn_sendmsg(struct tipc_server *s, int conid, - void *data, size_t len); + u32 evt, void *data, size_t len); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index b86fbbf7a0b9..c6de1452db69 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -1,7 +1,7 @@ /* * net/tipc/subscr.c: TIPC network topology service * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2017, Ericsson AB * Copyright (c) 2005-2007, 2010-2013, Wind River Systems * All rights reserved. * @@ -38,22 +38,6 @@ #include "name_table.h" #include "subscr.h" -/** - * struct tipc_subscriber - TIPC network topology subscriber - * @kref: reference counter to tipc_subscription object - * @conid: connection identifier to server connecting to subscriber - * @lock: control access to subscriber - * @subscrp_list: list of subscription objects for this subscriber - */ -struct tipc_subscriber { - struct kref kref; - int conid; - spinlock_t lock; - struct list_head subscrp_list; -}; - -static void tipc_subscrb_put(struct tipc_subscriber *subscriber); - /** * htohl - convert value to endianness used by destination * @in: value to convert @@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, u32 event, u32 port_ref, u32 node) { struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct tipc_subscriber *subscriber = sub->subscriber; struct kvec msg_sect; + if (sub->inactive) + return; msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_len = sizeof(struct tipc_event); sub->evt.event = htohl(event, sub->swap); @@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.node = htohl(node, sub->swap); - tipc_conn_sendmsg(tn->topsrv, subscriber->conid, + tipc_conn_sendmsg(tn->topsrv, sub->conid, event, msg_sect.iov_base, msg_sect.iov_len); } @@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, return; if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) return; - - tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, - node); + spin_lock(&sub->lock); + tipc_subscrp_send_event(sub, found_lower, found_upper, + event, port_ref, node); + spin_unlock(&sub->lock); } static void tipc_subscrp_timeout(struct timer_list *t) { struct tipc_subscription *sub = from_timer(sub, t, timer); - struct tipc_subscriber *subscriber = sub->subscriber; - - spin_lock_bh(&subscriber->lock); - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscrp_list); - spin_unlock_bh(&subscriber->lock); + struct tipc_subscr *s = &sub->evt.s; - /* Notify subscriber of timeout */ - tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, + spin_lock(&sub->lock); + tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper, TIPC_SUBSCR_TIMEOUT, 0, 0); - - tipc_subscrp_put(sub); -} - -static void tipc_subscrb_kref_release(struct kref *kref) -{ - kfree(container_of(kref,struct tipc_subscriber, kref)); -} - -static void tipc_subscrb_put(struct tipc_subscriber *subscriber) -{ - kref_put(&subscriber->kref, tipc_subscrb_kref_release); -} - -static void tipc_subscrb_get(struct tipc_subscriber *subscriber) -{ - kref_get(&subscriber->kref); + sub->inactive = true; + spin_unlock(&sub->lock); } static void tipc_subscrp_kref_release(struct kref *kref) @@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref) struct tipc_subscription, kref); struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct tipc_subscriber *subscriber = sub->subscriber; atomic_dec(&tn->subscription_count); kfree(sub); - tipc_subscrb_put(subscriber); } void tipc_subscrp_put(struct tipc_subscription *subscription) @@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -/* tipc_subscrb_subscrp_delete - delete a specific subscription or all - * subscriptions for a given subscriber. - */ -static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber, - struct tipc_subscr *s) -{ - struct list_head *subscription_list = &subscriber->subscrp_list; - struct tipc_subscription *sub, *temp; - u32 timeout; - - spin_lock_bh(&subscriber->lock); - list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) { - if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) - continue; - - timeout = htohl(sub->evt.s.timeout, sub->swap); - if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) { - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscrp_list); - tipc_subscrp_put(sub); - } - - if (s) - break; - } - spin_unlock_bh(&subscriber->lock); -} - -struct tipc_subscriber *tipc_subscrb_create(int conid) -{ - struct tipc_subscriber *subscriber; - - subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC); - if (!subscriber) { - pr_warn("Subscriber rejected, no memory\n"); - return NULL; - } - INIT_LIST_HEAD(&subscriber->subscrp_list); - kref_init(&subscriber->kref); - subscriber->conid = conid; - spin_lock_init(&subscriber->lock); - - return subscriber; -} - -void tipc_subscrb_delete(struct tipc_subscriber *subscriber) -{ - tipc_subscrb_subscrp_delete(subscriber, NULL); - tipc_subscrb_put(subscriber); -} - -static void tipc_subscrp_cancel(struct tipc_subscr *s, - struct tipc_subscriber *subscriber) -{ - tipc_subscrb_get(subscriber); - tipc_subscrb_subscrp_delete(subscriber, s); - tipc_subscrb_put(subscriber); -} - static struct tipc_subscription *tipc_subscrp_create(struct net *net, struct tipc_subscr *s, - int swap) + int conid, bool swap) { struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_subscription *sub; @@ -275,6 +180,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, /* Initialize subscription object */ sub->net = net; + sub->conid = conid; + sub->inactive = false; if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { pr_warn("Subscription rejected, illegal request\n"); @@ -284,59 +191,39 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(*s)); + spin_lock_init(&sub->lock); atomic_inc(&tn->subscription_count); kref_init(&sub->kref); return sub; } -static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, - struct tipc_subscriber *subscriber, int swap, - bool status) +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, + struct tipc_subscr *s, + int conid, bool swap, + bool status) { struct tipc_subscription *sub = NULL; u32 timeout; - sub = tipc_subscrp_create(net, s, swap); + sub = tipc_subscrp_create(net, s, conid, swap); if (!sub) - return -1; + return NULL; - spin_lock_bh(&subscriber->lock); - list_add(&sub->subscrp_list, &subscriber->subscrp_list); - sub->subscriber = subscriber; tipc_nametbl_subscribe(sub, status); - tipc_subscrb_get(subscriber); - spin_unlock_bh(&subscriber->lock); - timer_setup(&sub->timer, tipc_subscrp_timeout, 0); timeout = htohl(sub->evt.s.timeout, swap); - if (timeout != TIPC_WAIT_FOREVER) mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); - return 0; + return sub; } -/* Handle one request to create a new subscription for the subscriber - */ -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, - void *buf, size_t len) +void tipc_sub_delete(struct tipc_subscription *sub) { - struct tipc_subscriber *subscriber = usr_data; - struct tipc_subscr *s = (struct tipc_subscr *)buf; - bool status; - int swap; - - /* Determine subscriber's endianness */ - swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | - TIPC_SUB_CANCEL)); - - /* Detect & process a subscription cancellation request */ - if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { - s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); - tipc_subscrp_cancel(s, subscriber); - return 0; - } - status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); - return tipc_subscrp_subscribe(net, s, subscriber, swap, status); + tipc_nametbl_unsubscribe(sub); + if (sub->evt.s.timeout != TIPC_WAIT_FOREVER) + del_timer_sync(&sub->timer); + list_del(&sub->subscrp_list); + tipc_subscrp_put(sub); } int tipc_topsrv_start(struct net *net) diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index a736f29ba9ab..cfd0cb3a1da8 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -43,7 +43,7 @@ #define TIPC_MAX_PUBLICATIONS 65535 struct tipc_subscription; -struct tipc_subscriber; +struct tipc_conn; /** * struct tipc_subscription - TIPC network topology subscription object @@ -58,19 +58,22 @@ struct tipc_subscriber; */ struct tipc_subscription { struct kref kref; - struct tipc_subscriber *subscriber; struct net *net; struct timer_list timer; struct list_head nameseq_list; struct list_head subscrp_list; - int swap; struct tipc_event evt; + int conid; + bool swap; + bool inactive; + spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscriber *tipc_subscrb_create(int conid); -void tipc_subscrb_delete(struct tipc_subscriber *subscriber); -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, - void *buf, size_t len); +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, + struct tipc_subscr *s, + int conid, bool swap, + bool status); +void tipc_sub_delete(struct tipc_subscription *sub); int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); void tipc_subscrp_report_overlap(struct tipc_subscription *sub, -- cgit v1.2.3-70-g09d2 From 414574a0af36d329f560f542e650cc4a81cc1d69 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:45 +0100 Subject: tipc: simplify interaction between subscription and topology connection The message transmission and reception in the topology server is more generic than is currently necessary. By basing the funtionality on the fact that we only send items of type struct tipc_event and always receive items of struct tipc_subcr we can make several simplifications, and also get rid of some unnecessary dynamic memory allocations. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/name_table.c | 10 +-- net/tipc/server.c | 170 +++++++++++++++++--------------------------------- net/tipc/server.h | 12 +--- net/tipc/subscr.c | 40 ++++++------ net/tipc/subscr.h | 5 +- 5 files changed, 88 insertions(+), 149 deletions(-) (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index ed0457cc99d6..c0ca7be46f7a 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -810,14 +810,15 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, */ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) { - struct tipc_net *tn = net_generic(s->net, tipc_net_id); + struct tipc_server *srv = s->server; + struct tipc_net *tn = tipc_net(srv->net); u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); int index = hash(type); struct name_seq *seq; struct tipc_name_seq ns; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(s->net, type); + seq = nametbl_find_seq(srv->net, type); if (!seq) seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); if (seq) { @@ -837,12 +838,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) */ void tipc_nametbl_unsubscribe(struct tipc_subscription *s) { - struct tipc_net *tn = net_generic(s->net, tipc_net_id); + struct tipc_server *srv = s->server; + struct tipc_net *tn = tipc_net(srv->net); struct name_seq *seq; u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(s->net, type); + seq = nametbl_find_seq(srv->net, type); if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&s->nameseq_list); diff --git a/net/tipc/server.c b/net/tipc/server.c index b8268c0882a7..7933fb92d852 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -84,9 +84,9 @@ struct tipc_conn { /* An entry waiting to be sent */ struct outqueue_entry { - u32 evt; + bool inactive; + struct tipc_event evt; struct list_head list; - struct kvec iov; }; static void tipc_recv_work(struct work_struct *work); @@ -154,6 +154,9 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) return con; } +/* sock_data_ready - interrupt callback indicating the socket has data to read + * The queued job is launched in tipc_recv_from_sock() + */ static void sock_data_ready(struct sock *sk) { struct tipc_conn *con; @@ -168,6 +171,10 @@ static void sock_data_ready(struct sock *sk) read_unlock_bh(&sk->sk_callback_lock); } +/* sock_write_space - interrupt callback after a sendmsg EAGAIN + * Indicates that there now is more is space in the send buffer + * The queued job is launched in tipc_send_to_sock() + */ static void sock_write_space(struct sock *sk) { struct tipc_conn *con; @@ -273,10 +280,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) return con; } -int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, - void *buf, size_t len) +static int tipc_con_rcv_sub(struct tipc_server *srv, + struct tipc_conn *con, + struct tipc_subscr *s) { - struct tipc_subscr *s = (struct tipc_subscr *)buf; struct tipc_subscription *sub; bool status; int swap; @@ -292,7 +299,7 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, return 0; } status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); - sub = tipc_subscrp_subscribe(net, s, conid, swap, status); + sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status); if (!sub) return -1; @@ -304,43 +311,27 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, static int tipc_receive_from_sock(struct tipc_conn *con) { - struct tipc_server *s = con->server; + struct tipc_server *srv = con->server; struct sock *sk = con->sock->sk; struct msghdr msg = {}; + struct tipc_subscr s; struct kvec iov; - void *buf; int ret; - buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); - if (!buf) { - ret = -ENOMEM; - goto out_close; - } - - iov.iov_base = buf; - iov.iov_len = s->max_rcvbuf_size; + iov.iov_base = &s; + iov.iov_len = sizeof(s); msg.msg_name = NULL; iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); - if (ret <= 0) { - kmem_cache_free(s->rcvbuf_cache, buf); - goto out_close; + if (ret == -EWOULDBLOCK) + return -EWOULDBLOCK; + if (ret > 0) { + read_lock_bh(&sk->sk_callback_lock); + ret = tipc_con_rcv_sub(srv, con, &s); + read_unlock_bh(&sk->sk_callback_lock); } - - read_lock_bh(&sk->sk_callback_lock); - ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret); - read_unlock_bh(&sk->sk_callback_lock); - kmem_cache_free(s->rcvbuf_cache, buf); if (ret < 0) - tipc_conn_terminate(s, con->conid); - return ret; - -out_close: - if (ret != -EWOULDBLOCK) tipc_close_conn(con); - else if (ret == 0) - /* Don't return success if we really got EOF */ - ret = -EAGAIN; return ret; } @@ -442,33 +433,6 @@ static int tipc_open_listening_sock(struct tipc_server *s) return 0; } -static struct outqueue_entry *tipc_alloc_entry(void *data, int len) -{ - struct outqueue_entry *entry; - void *buf; - - entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); - if (!entry) - return NULL; - - buf = kmemdup(data, len, GFP_ATOMIC); - if (!buf) { - kfree(entry); - return NULL; - } - - entry->iov.iov_base = buf; - entry->iov.iov_len = len; - - return entry; -} - -static void tipc_free_entry(struct outqueue_entry *e) -{ - kfree(e->iov.iov_base); - kfree(e); -} - static void tipc_clean_outqueues(struct tipc_conn *con) { struct outqueue_entry *e, *safe; @@ -476,50 +440,40 @@ static void tipc_clean_outqueues(struct tipc_conn *con) spin_lock_bh(&con->outqueue_lock); list_for_each_entry_safe(e, safe, &con->outqueue, list) { list_del(&e->list); - tipc_free_entry(e); + kfree(e); } spin_unlock_bh(&con->outqueue_lock); } -int tipc_conn_sendmsg(struct tipc_server *s, int conid, - u32 evt, void *data, size_t len) +/* tipc_conn_queue_evt - interrupt level call from a subscription instance + * The queued job is launched in tipc_send_to_sock() + */ +void tipc_conn_queue_evt(struct tipc_server *s, int conid, + u32 event, struct tipc_event *evt) { struct outqueue_entry *e; struct tipc_conn *con; con = tipc_conn_lookup(s, conid); if (!con) - return -EINVAL; + return; - if (!connected(con)) { - conn_put(con); - return 0; - } + if (!connected(con)) + goto err; - e = tipc_alloc_entry(data, len); - if (!e) { - conn_put(con); - return -ENOMEM; - } - e->evt = evt; + e = kmalloc(sizeof(*e), GFP_ATOMIC); + if (!e) + goto err; + e->inactive = (event == TIPC_SUBSCR_TIMEOUT); + memcpy(&e->evt, evt, sizeof(*evt)); spin_lock_bh(&con->outqueue_lock); list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); - if (!queue_work(s->send_wq, &con->swork)) - conn_put(con); - return 0; -} - -void tipc_conn_terminate(struct tipc_server *s, int conid) -{ - struct tipc_conn *con; - - con = tipc_conn_lookup(s, conid); - if (con) { - tipc_close_conn(con); - conn_put(con); - } + if (queue_work(s->send_wq, &con->swork)) + return; +err: + conn_put(con); } bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, @@ -542,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, *conid = con->conid; con->sock = NULL; - rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub)); + rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub); if (rc < 0) tipc_close_conn(con); return !rc; @@ -587,6 +541,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) struct outqueue_entry *e; struct tipc_event *evt; struct msghdr msg; + struct kvec iov; int count = 0; int ret; @@ -594,27 +549,28 @@ static void tipc_send_to_sock(struct tipc_conn *con) while (!list_empty(queue)) { e = list_first_entry(queue, struct outqueue_entry, list); - + evt = &e->evt; spin_unlock_bh(&con->outqueue_lock); - if (e->evt == TIPC_SUBSCR_TIMEOUT) { - evt = (struct tipc_event *)e->iov.iov_base; + if (e->inactive) tipc_con_delete_sub(con, &evt->s); - } + memset(&msg, 0, sizeof(msg)); msg.msg_flags = MSG_DONTWAIT; + iov.iov_base = evt; + iov.iov_len = sizeof(*evt); + msg.msg_name = NULL; if (con->sock) { - ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, - e->iov.iov_len); + ret = kernel_sendmsg(con->sock, &msg, &iov, + 1, sizeof(*evt)); if (ret == -EWOULDBLOCK || ret == 0) { cond_resched(); goto out; } else if (ret < 0) { - goto send_err; + goto err; } } else { - evt = e->iov.iov_base; tipc_send_kern_top_evt(srv->net, evt); } @@ -625,13 +581,12 @@ static void tipc_send_to_sock(struct tipc_conn *con) } spin_lock_bh(&con->outqueue_lock); list_del(&e->list); - tipc_free_entry(e); + kfree(e); } spin_unlock_bh(&con->outqueue_lock); out: return; - -send_err: +err: tipc_close_conn(con); } @@ -695,22 +650,14 @@ int tipc_server_start(struct tipc_server *s) idr_init(&s->conn_idr); s->idr_in_use = 0; - s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size, - 0, SLAB_HWCACHE_ALIGN, NULL); - if (!s->rcvbuf_cache) - return -ENOMEM; - ret = tipc_work_start(s); - if (ret < 0) { - kmem_cache_destroy(s->rcvbuf_cache); + if (ret < 0) return ret; - } + ret = tipc_open_listening_sock(s); - if (ret < 0) { + if (ret < 0) tipc_work_stop(s); - kmem_cache_destroy(s->rcvbuf_cache); - return ret; - } + return ret; } @@ -731,6 +678,5 @@ void tipc_server_stop(struct tipc_server *s) spin_unlock_bh(&s->idr_lock); tipc_work_stop(s); - kmem_cache_destroy(s->rcvbuf_cache); idr_destroy(&s->conn_idr); } diff --git a/net/tipc/server.h b/net/tipc/server.h index fcc72321362d..2de8709b467d 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -36,6 +36,7 @@ #ifndef _TIPC_SERVER_H #define _TIPC_SERVER_H +#include "core.h" #include #include #include @@ -68,7 +69,6 @@ struct tipc_server { spinlock_t idr_lock; int idr_in_use; struct net *net; - struct kmem_cache *rcvbuf_cache; struct workqueue_struct *rcv_wq; struct workqueue_struct *send_wq; int max_rcvbuf_size; @@ -76,19 +76,13 @@ struct tipc_server { char name[TIPC_SERVER_NAME_LEN]; }; -int tipc_conn_sendmsg(struct tipc_server *s, int conid, - u32 evt, void *data, size_t len); +void tipc_conn_queue_evt(struct tipc_server *s, int conid, + u32 event, struct tipc_event *evt); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); void tipc_topsrv_kern_unsubscr(struct net *net, int conid); -/** - * tipc_conn_terminate - terminate connection with server - * - * Note: Must call it in process context since it might sleep - */ -void tipc_conn_terminate(struct tipc_server *s, int conid); int tipc_server_start(struct tipc_server *s); void tipc_server_stop(struct tipc_server *s); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index c6de1452db69..c3e0b924e8c2 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -52,22 +52,19 @@ static u32 htohl(u32 in, int swap) static void tipc_subscrp_send_event(struct tipc_subscription *sub, u32 found_lower, u32 found_upper, - u32 event, u32 port_ref, u32 node) + u32 event, u32 port, u32 node) { - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct kvec msg_sect; + struct tipc_event *evt = &sub->evt; + bool swap = sub->swap; if (sub->inactive) return; - msg_sect.iov_base = (void *)&sub->evt; - msg_sect.iov_len = sizeof(struct tipc_event); - sub->evt.event = htohl(event, sub->swap); - sub->evt.found_lower = htohl(found_lower, sub->swap); - sub->evt.found_upper = htohl(found_upper, sub->swap); - sub->evt.port.ref = htohl(port_ref, sub->swap); - sub->evt.port.node = htohl(node, sub->swap); - tipc_conn_sendmsg(tn->topsrv, sub->conid, event, - msg_sect.iov_base, msg_sect.iov_len); + evt->event = htohl(event, swap); + evt->found_lower = htohl(found_lower, swap); + evt->found_upper = htohl(found_upper, swap); + evt->port.ref = htohl(port, swap); + evt->port.node = htohl(node, swap); + tipc_conn_queue_evt(sub->server, sub->conid, event, evt); } /** @@ -137,10 +134,11 @@ static void tipc_subscrp_timeout(struct timer_list *t) static void tipc_subscrp_kref_release(struct kref *kref) { - struct tipc_subscription *sub = container_of(kref, - struct tipc_subscription, - kref); - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); + struct tipc_subscription *sub; + struct tipc_net *tn; + + sub = container_of(kref, struct tipc_subscription, kref); + tn = tipc_net(sub->server->net); atomic_dec(&tn->subscription_count); kfree(sub); @@ -156,11 +154,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -static struct tipc_subscription *tipc_subscrp_create(struct net *net, +static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap) { - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; u32 filter = htohl(s->filter, swap); @@ -179,7 +177,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, } /* Initialize subscription object */ - sub->net = net; + sub->server = srv; sub->conid = conid; sub->inactive = false; if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || @@ -197,7 +195,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, return sub; } -struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, +struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap, bool status) @@ -205,7 +203,7 @@ struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, struct tipc_subscription *sub = NULL; u32 timeout; - sub = tipc_subscrp_create(net, s, conid, swap); + sub = tipc_subscrp_create(srv, s, conid, swap); if (!sub) return NULL; diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index cfd0cb3a1da8..64ffd32d15e6 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -49,7 +49,6 @@ struct tipc_conn; * struct tipc_subscription - TIPC network topology subscription object * @subscriber: pointer to its subscriber * @seq: name sequence associated with subscription - * @net: point to network namespace * @timer: timer governing subscription duration (optional) * @nameseq_list: adjacent subscriptions in name sequence's subscription list * @subscrp_list: adjacent subscriptions in subscriber's subscription list @@ -58,7 +57,7 @@ struct tipc_conn; */ struct tipc_subscription { struct kref kref; - struct net *net; + struct tipc_server *server; struct timer_list timer; struct list_head nameseq_list; struct list_head subscrp_list; @@ -69,7 +68,7 @@ struct tipc_subscription { spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, +struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap, bool status); -- cgit v1.2.3-70-g09d2 From 8985ecc7c1e07c42acc1e44ac56fa224f8a5c62f Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:46 +0100 Subject: tipc: simplify endianness handling in topology subscriber Because of the requirement for total distribution transparency, users send subscriptions and receive topology events in their own host format. It is up to the topology server to determine this format and do the correct conversions to and from its own host format when needed. Until now, this has been handled in a rather non-transparent way inside the topology server and subscriber code, leading to unnecessary complexity when creating subscriptions and issuing events. We now improve this situation by adding two new macros, tipc_sub_read() and tipc_evt_write(). Both those functions calculate the need for conversion internally before performing their respective operations. Hence, all handling of such conversions become transparent to the rest of the code. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/name_table.c | 42 +++++++++++++++----------- net/tipc/name_table.h | 2 +- net/tipc/server.c | 25 ++-------------- net/tipc/subscr.c | 83 +++++++++++++++++++-------------------------------- net/tipc/subscr.h | 36 ++++++++++++++++------ 5 files changed, 86 insertions(+), 102 deletions(-) (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index c0ca7be46f7a..2fbd0a20a958 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -412,18 +412,22 @@ found: * sequence overlapping with the requested sequence */ static void tipc_nameseq_subscribe(struct name_seq *nseq, - struct tipc_subscription *s, - bool status) + struct tipc_subscription *sub) { struct sub_seq *sseq = nseq->sseqs; struct tipc_name_seq ns; + struct tipc_subscr *s = &sub->evt.s; + bool no_status; - tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); + ns.type = tipc_sub_read(s, seq.type); + ns.lower = tipc_sub_read(s, seq.lower); + ns.upper = tipc_sub_read(s, seq.upper); + no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS; - tipc_subscrp_get(s); - list_add(&s->nameseq_list, &nseq->subscriptions); + tipc_subscrp_get(sub); + list_add(&sub->nameseq_list, &nseq->subscriptions); - if (!status || !sseq) + if (no_status || !sseq) return; while (sseq != &nseq->sseqs[nseq->first_free]) { @@ -433,7 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq, int must_report = 1; list_for_each_entry(crs, &info->zone_list, zone_list) { - tipc_subscrp_report_overlap(s, sseq->lower, + tipc_subscrp_report_overlap(sub, sseq->lower, sseq->upper, TIPC_PUBLISHED, crs->ref, crs->node, @@ -808,11 +812,12 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, /** * tipc_nametbl_subscribe - add a subscription object to the name table */ -void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) +void tipc_nametbl_subscribe(struct tipc_subscription *sub) { - struct tipc_server *srv = s->server; + struct tipc_server *srv = sub->server; struct tipc_net *tn = tipc_net(srv->net); - u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); + struct tipc_subscr *s = &sub->evt.s; + u32 type = tipc_sub_read(s, seq.type); int index = hash(type); struct name_seq *seq; struct tipc_name_seq ns; @@ -823,10 +828,12 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); if (seq) { spin_lock_bh(&seq->lock); - tipc_nameseq_subscribe(seq, s, status); + tipc_nameseq_subscribe(seq, sub); spin_unlock_bh(&seq->lock); } else { - tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); + ns.type = tipc_sub_read(s, seq.type); + ns.lower = tipc_sub_read(s, seq.lower); + ns.upper = tipc_sub_read(s, seq.upper); pr_warn("Failed to create subscription for {%u,%u,%u}\n", ns.type, ns.lower, ns.upper); } @@ -836,19 +843,20 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) /** * tipc_nametbl_unsubscribe - remove a subscription object from name table */ -void tipc_nametbl_unsubscribe(struct tipc_subscription *s) +void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) { - struct tipc_server *srv = s->server; + struct tipc_server *srv = sub->server; + struct tipc_subscr *s = &sub->evt.s; struct tipc_net *tn = tipc_net(srv->net); struct name_seq *seq; - u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); + u32 type = tipc_sub_read(s, seq.type); spin_lock_bh(&tn->nametbl_lock); seq = nametbl_find_seq(srv->net, type); if (seq != NULL) { spin_lock_bh(&seq->lock); - list_del_init(&s->nameseq_list); - tipc_subscrp_put(s); + list_del_init(&sub->nameseq_list); + tipc_subscrp_put(sub); if (!seq->first_free && list_empty(&seq->subscriptions)) { hlist_del_init_rcu(&seq->ns_list); kfree(seq->sseqs); diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index f56e7cb3d436..17652602d5e2 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -120,7 +120,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type, struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, u32 lower, u32 node, u32 ref, u32 key); -void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status); +void tipc_nametbl_subscribe(struct tipc_subscription *s); void tipc_nametbl_unsubscribe(struct tipc_subscription *s); int tipc_nametbl_init(struct net *net); void tipc_nametbl_stop(struct net *net); diff --git a/net/tipc/server.c b/net/tipc/server.c index 7933fb92d852..5d231fa8e4b5 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -98,18 +98,6 @@ static bool connected(struct tipc_conn *con) return con && test_bit(CF_CONNECTED, &con->flags); } -/** - * htohl - convert value to endianness used by destination - * @in: value to convert - * @swap: non-zero if endianness must be reversed - * - * Returns converted value - */ -static u32 htohl(u32 in, int swap) -{ - return swap ? swab32(in) : in; -} - static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); @@ -285,21 +273,12 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, struct tipc_subscr *s) { struct tipc_subscription *sub; - bool status; - int swap; - - /* Determine subscriber's endianness */ - swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | - TIPC_SUB_CANCEL)); - /* Detect & process a subscription cancellation request */ - if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { - s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); + if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { tipc_con_delete_sub(con, s); return 0; } - status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); - sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status); + sub = tipc_subscrp_subscribe(srv, s, con->conid); if (!sub) return -1; diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index c3e0b924e8c2..406b09fc227b 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -38,32 +38,19 @@ #include "name_table.h" #include "subscr.h" -/** - * htohl - convert value to endianness used by destination - * @in: value to convert - * @swap: non-zero if endianness must be reversed - * - * Returns converted value - */ -static u32 htohl(u32 in, int swap) -{ - return swap ? swab32(in) : in; -} - static void tipc_subscrp_send_event(struct tipc_subscription *sub, u32 found_lower, u32 found_upper, u32 event, u32 port, u32 node) { struct tipc_event *evt = &sub->evt; - bool swap = sub->swap; if (sub->inactive) return; - evt->event = htohl(event, swap); - evt->found_lower = htohl(found_lower, swap); - evt->found_upper = htohl(found_upper, swap); - evt->port.ref = htohl(port, swap); - evt->port.node = htohl(node, swap); + tipc_evt_write(evt, event, event); + tipc_evt_write(evt, found_lower, found_lower); + tipc_evt_write(evt, found_upper, found_upper); + tipc_evt_write(evt, port.ref, port); + tipc_evt_write(evt, port.node, node); tipc_conn_queue_evt(sub->server, sub->conid, event, evt); } @@ -85,29 +72,22 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, return 1; } -u32 tipc_subscrp_convert_seq_type(u32 type, int swap) +void tipc_subscrp_report_overlap(struct tipc_subscription *sub, + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node, + u32 scope, int must) { - return htohl(type, swap); -} - -void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, - struct tipc_name_seq *out) -{ - out->type = htohl(in->type, swap); - out->lower = htohl(in->lower, swap); - out->upper = htohl(in->upper, swap); -} - -void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, - u32 found_upper, u32 event, u32 port_ref, - u32 node, u32 scope, int must) -{ - u32 filter = htohl(sub->evt.s.filter, sub->swap); struct tipc_name_seq seq; + struct tipc_subscr *s = &sub->evt.s; + u32 filter = tipc_sub_read(s, filter); + + seq.type = tipc_sub_read(s, seq.type); + seq.lower = tipc_sub_read(s, seq.lower); + seq.upper = tipc_sub_read(s, seq.upper); - tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq); if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) return; + if (!must && !(filter & TIPC_SUB_PORTS)) return; if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE) @@ -116,7 +96,7 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, return; spin_lock(&sub->lock); tipc_subscrp_send_event(sub, found_lower, found_upper, - event, port_ref, node); + event, port, node); spin_unlock(&sub->lock); } @@ -156,11 +136,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, struct tipc_subscr *s, - int conid, bool swap) + int conid) { struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; - u32 filter = htohl(s->filter, swap); + u32 filter = tipc_sub_read(s, filter); /* Refuse subscription if global limit exceeded */ if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { @@ -177,39 +157,38 @@ static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, } /* Initialize subscription object */ + if (filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) + goto err; + if (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper)) + goto err; sub->server = srv; sub->conid = conid; sub->inactive = false; - if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || - (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { - pr_warn("Subscription rejected, illegal request\n"); - kfree(sub); - return NULL; - } - - sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(*s)); spin_lock_init(&sub->lock); atomic_inc(&tn->subscription_count); kref_init(&sub->kref); return sub; +err: + pr_warn("Subscription rejected, illegal request\n"); + kfree(sub); + return NULL; } struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, - int conid, bool swap, - bool status) + int conid) { struct tipc_subscription *sub = NULL; u32 timeout; - sub = tipc_subscrp_create(srv, s, conid, swap); + sub = tipc_subscrp_create(srv, s, conid); if (!sub) return NULL; - tipc_nametbl_subscribe(sub, status); + tipc_nametbl_subscribe(sub); timer_setup(&sub->timer, tipc_subscrp_timeout, 0); - timeout = htohl(sub->evt.s.timeout, swap); + timeout = tipc_sub_read(&sub->evt.s, timeout); if (timeout != TIPC_WAIT_FOREVER) mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); return sub; diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 64ffd32d15e6..db80e41bba08 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -52,7 +52,6 @@ struct tipc_conn; * @timer: timer governing subscription duration (optional) * @nameseq_list: adjacent subscriptions in name sequence's subscription list * @subscrp_list: adjacent subscriptions in subscriber's subscription list - * @swap: indicates if subscriber uses opposite endianness in its messages * @evt: template for events generated by subscription */ struct tipc_subscription { @@ -63,28 +62,47 @@ struct tipc_subscription { struct list_head subscrp_list; struct tipc_event evt; int conid; - bool swap; bool inactive; spinlock_t lock; /* serialize up/down and timer events */ }; struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, - int conid, bool swap, - bool status); + int conid); void tipc_sub_delete(struct tipc_subscription *sub); + int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); void tipc_subscrp_report_overlap(struct tipc_subscription *sub, - u32 found_lower, u32 found_upper, u32 event, - u32 port_ref, u32 node, u32 scope, int must); -void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, - struct tipc_name_seq *out); -u32 tipc_subscrp_convert_seq_type(u32 type, int swap); + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node, + u32 scope, int must); int tipc_topsrv_start(struct net *net); void tipc_topsrv_stop(struct net *net); void tipc_subscrp_put(struct tipc_subscription *subscription); void tipc_subscrp_get(struct tipc_subscription *subscription); +#define TIPC_FILTER_MASK (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | TIPC_SUB_CANCEL) + +/* tipc_sub_read - return field_ of struct sub_ in host endian format + */ +#define tipc_sub_read(sub_, field_) \ + ({ \ + struct tipc_subscr *sub__ = sub_; \ + u32 val__ = (sub__)->field_; \ + int swap_ = !((sub__)->filter & TIPC_FILTER_MASK); \ + (swap_ ? swab32(val__) : val__); \ + }) + +/* tipc_evt_write - write val_ to field_ of struct evt_ in user endian format + */ +#define tipc_evt_write(evt_, field_, val_) \ + ({ \ + struct tipc_event *evt__ = evt_; \ + u32 val__ = val_; \ + int swap_ = !((evt__)->s.filter & (TIPC_FILTER_MASK)); \ + (evt__)->field_ = swap_ ? swab32(val__) : val__; \ + }) + #endif -- cgit v1.2.3-70-g09d2 From 242e82cc95f6b4e83e1771f9915edcb2a63708e1 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:47 +0100 Subject: tipc: collapse subscription creation functions After the previous changes it becomes logical to collapse the two-level creation of subscription instances into one. We do that here. We also rename the creation and deletion functions for more consistency. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/server.c | 4 ++-- net/tipc/server.h | 1 + net/tipc/subscr.c | 46 ++++++++++++---------------------------------- net/tipc/subscr.h | 14 +++++++------- 4 files changed, 22 insertions(+), 43 deletions(-) (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/server.c b/net/tipc/server.c index 5d231fa8e4b5..6a18b10ac271 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -203,7 +203,7 @@ static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) spin_lock_bh(&con->sub_lock); list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) { if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) - tipc_sub_delete(sub); + tipc_sub_unsubscribe(sub); else if (s) break; } @@ -278,7 +278,7 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, tipc_con_delete_sub(con, s); return 0; } - sub = tipc_subscrp_subscribe(srv, s, con->conid); + sub = tipc_sub_subscribe(srv, s, con->conid); if (!sub) return -1; diff --git a/net/tipc/server.h b/net/tipc/server.h index 2de8709b467d..995b79591ffe 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -2,6 +2,7 @@ * net/tipc/server.h: Include file for TIPC server code * * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017, Ericsson AB * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 406b09fc227b..8d37b61e3163 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -134,33 +134,29 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, - struct tipc_subscr *s, - int conid) +struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, + struct tipc_subscr *s, + int conid) { struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; u32 filter = tipc_sub_read(s, filter); + u32 timeout; - /* Refuse subscription if global limit exceeded */ - if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { - pr_warn("Subscription rejected, limit reached (%u)\n", - TIPC_MAX_SUBSCRIPTIONS); + if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { + pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); + return NULL; + } + if ((filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) || + (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper))) { + pr_warn("Subscription rejected, illegal request\n"); return NULL; } - - /* Allocate subscription object */ sub = kmalloc(sizeof(*sub), GFP_ATOMIC); if (!sub) { pr_warn("Subscription rejected, no memory\n"); return NULL; } - - /* Initialize subscription object */ - if (filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) - goto err; - if (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper)) - goto err; sub->server = srv; sub->conid = conid; sub->inactive = false; @@ -168,24 +164,6 @@ static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, spin_lock_init(&sub->lock); atomic_inc(&tn->subscription_count); kref_init(&sub->kref); - return sub; -err: - pr_warn("Subscription rejected, illegal request\n"); - kfree(sub); - return NULL; -} - -struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, - struct tipc_subscr *s, - int conid) -{ - struct tipc_subscription *sub = NULL; - u32 timeout; - - sub = tipc_subscrp_create(srv, s, conid); - if (!sub) - return NULL; - tipc_nametbl_subscribe(sub); timer_setup(&sub->timer, tipc_subscrp_timeout, 0); timeout = tipc_sub_read(&sub->evt.s, timeout); @@ -194,7 +172,7 @@ struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, return sub; } -void tipc_sub_delete(struct tipc_subscription *sub) +void tipc_sub_unsubscribe(struct tipc_subscription *sub) { tipc_nametbl_unsubscribe(sub); if (sub->evt.s.timeout != TIPC_WAIT_FOREVER) diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index db80e41bba08..2d35f1046754 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -1,7 +1,7 @@ /* * net/tipc/subscr.h: Include file for TIPC network topology service * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2017, Ericsson AB * Copyright (c) 2005-2007, 2012-2013, Wind River Systems * All rights reserved. * @@ -39,8 +39,8 @@ #include "server.h" -#define TIPC_MAX_SUBSCRIPTIONS 65535 -#define TIPC_MAX_PUBLICATIONS 65535 +#define TIPC_MAX_SUBSCR 65535 +#define TIPC_MAX_PUBLICATIONS 65535 struct tipc_subscription; struct tipc_conn; @@ -66,10 +66,10 @@ struct tipc_subscription { spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, - struct tipc_subscr *s, - int conid); -void tipc_sub_delete(struct tipc_subscription *sub); +struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, + struct tipc_subscr *s, + int conid); +void tipc_sub_unsubscribe(struct tipc_subscription *sub); int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); -- cgit v1.2.3-70-g09d2 From da0a75e86ae230f92743c073843d3ea35bd061af Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:48 +0100 Subject: tipc: some prefix changes Since we now have removed struct tipc_subscriber from the code, and only struct tipc_subscription remains, there is no longer need for long and awkward prefixes to distinguish between their pertaining functions. We now change all tipc_subscrp_* prefixes to tipc_sub_*. This is a purely cosmetic change. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/name_table.c | 33 ++++++++++++++++---------------- net/tipc/server.c | 5 ++--- net/tipc/subscr.c | 52 +++++++++++++++++++++++++-------------------------- net/tipc/subscr.h | 20 ++++++++++---------- 4 files changed, 54 insertions(+), 56 deletions(-) (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 2fbd0a20a958..b234b7ee0b84 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -326,10 +326,10 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net, /* Any subscriptions waiting for notification? */ list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { - tipc_subscrp_report_overlap(s, publ->lower, publ->upper, - TIPC_PUBLISHED, publ->ref, - publ->node, publ->scope, - created_subseq); + tipc_sub_report_overlap(s, publ->lower, publ->upper, + TIPC_PUBLISHED, publ->ref, + publ->node, publ->scope, + created_subseq); } return publ; } @@ -397,10 +397,9 @@ found: /* Notify any waiting subscriptions */ list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { - tipc_subscrp_report_overlap(s, publ->lower, publ->upper, - TIPC_WITHDRAWN, publ->ref, - publ->node, publ->scope, - removed_subseq); + tipc_sub_report_overlap(s, publ->lower, publ->upper, + TIPC_WITHDRAWN, publ->ref, publ->node, + publ->scope, removed_subseq); } return publ; @@ -424,25 +423,25 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq, ns.upper = tipc_sub_read(s, seq.upper); no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS; - tipc_subscrp_get(sub); + tipc_sub_get(sub); list_add(&sub->nameseq_list, &nseq->subscriptions); if (no_status || !sseq) return; while (sseq != &nseq->sseqs[nseq->first_free]) { - if (tipc_subscrp_check_overlap(&ns, sseq->lower, sseq->upper)) { + if (tipc_sub_check_overlap(&ns, sseq->lower, sseq->upper)) { struct publication *crs; struct name_info *info = sseq->info; int must_report = 1; list_for_each_entry(crs, &info->zone_list, zone_list) { - tipc_subscrp_report_overlap(sub, sseq->lower, - sseq->upper, - TIPC_PUBLISHED, - crs->ref, crs->node, - crs->scope, - must_report); + tipc_sub_report_overlap(sub, sseq->lower, + sseq->upper, + TIPC_PUBLISHED, + crs->ref, crs->node, + crs->scope, + must_report); must_report = 0; } } @@ -856,7 +855,7 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&sub->nameseq_list); - tipc_subscrp_put(sub); + tipc_sub_put(sub); if (!seq->first_free && list_empty(&seq->subscriptions)) { hlist_del_init_rcu(&seq->ns_list); kfree(seq->sseqs); diff --git a/net/tipc/server.c b/net/tipc/server.c index 6a18b10ac271..a5c112ed3bbe 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -201,7 +201,7 @@ static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) struct tipc_subscription *sub, *tmp; spin_lock_bh(&con->sub_lock); - list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) { + list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) tipc_sub_unsubscribe(sub); else if (s) @@ -281,9 +281,8 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, sub = tipc_sub_subscribe(srv, s, con->conid); if (!sub) return -1; - spin_lock_bh(&con->sub_lock); - list_add(&sub->subscrp_list, &con->sub_list); + list_add(&sub->sub_list, &con->sub_list); spin_unlock_bh(&con->sub_lock); return 0; } diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 8d37b61e3163..3be1e4b6cbfa 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -38,9 +38,9 @@ #include "name_table.h" #include "subscr.h" -static void tipc_subscrp_send_event(struct tipc_subscription *sub, - u32 found_lower, u32 found_upper, - u32 event, u32 port, u32 node) +static void tipc_sub_send_event(struct tipc_subscription *sub, + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node) { struct tipc_event *evt = &sub->evt; @@ -55,13 +55,13 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, } /** - * tipc_subscrp_check_overlap - test for subscription overlap with the + * tipc_sub_check_overlap - test for subscription overlap with the * given values * * Returns 1 if there is overlap, otherwise 0. */ -int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, - u32 found_upper) +int tipc_sub_check_overlap(struct tipc_name_seq *seq, u32 found_lower, + u32 found_upper) { if (found_lower < seq->lower) found_lower = seq->lower; @@ -72,20 +72,20 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, return 1; } -void tipc_subscrp_report_overlap(struct tipc_subscription *sub, - u32 found_lower, u32 found_upper, - u32 event, u32 port, u32 node, - u32 scope, int must) +void tipc_sub_report_overlap(struct tipc_subscription *sub, + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node, + u32 scope, int must) { - struct tipc_name_seq seq; struct tipc_subscr *s = &sub->evt.s; u32 filter = tipc_sub_read(s, filter); + struct tipc_name_seq seq; seq.type = tipc_sub_read(s, seq.type); seq.lower = tipc_sub_read(s, seq.lower); seq.upper = tipc_sub_read(s, seq.upper); - if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) + if (!tipc_sub_check_overlap(&seq, found_lower, found_upper)) return; if (!must && !(filter & TIPC_SUB_PORTS)) @@ -95,24 +95,24 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) return; spin_lock(&sub->lock); - tipc_subscrp_send_event(sub, found_lower, found_upper, - event, port, node); + tipc_sub_send_event(sub, found_lower, found_upper, + event, port, node); spin_unlock(&sub->lock); } -static void tipc_subscrp_timeout(struct timer_list *t) +static void tipc_sub_timeout(struct timer_list *t) { struct tipc_subscription *sub = from_timer(sub, t, timer); struct tipc_subscr *s = &sub->evt.s; spin_lock(&sub->lock); - tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper, - TIPC_SUBSCR_TIMEOUT, 0, 0); + tipc_sub_send_event(sub, s->seq.lower, s->seq.upper, + TIPC_SUBSCR_TIMEOUT, 0, 0); sub->inactive = true; spin_unlock(&sub->lock); } -static void tipc_subscrp_kref_release(struct kref *kref) +static void tipc_sub_kref_release(struct kref *kref) { struct tipc_subscription *sub; struct tipc_net *tn; @@ -124,12 +124,12 @@ static void tipc_subscrp_kref_release(struct kref *kref) kfree(sub); } -void tipc_subscrp_put(struct tipc_subscription *subscription) +void tipc_sub_put(struct tipc_subscription *subscription) { - kref_put(&subscription->kref, tipc_subscrp_kref_release); + kref_put(&subscription->kref, tipc_sub_kref_release); } -void tipc_subscrp_get(struct tipc_subscription *subscription) +void tipc_sub_get(struct tipc_subscription *subscription) { kref_get(&subscription->kref); } @@ -139,8 +139,8 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, int conid) { struct tipc_net *tn = tipc_net(srv->net); - struct tipc_subscription *sub; u32 filter = tipc_sub_read(s, filter); + struct tipc_subscription *sub; u32 timeout; if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { @@ -165,7 +165,7 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, atomic_inc(&tn->subscription_count); kref_init(&sub->kref); tipc_nametbl_subscribe(sub); - timer_setup(&sub->timer, tipc_subscrp_timeout, 0); + timer_setup(&sub->timer, tipc_sub_timeout, 0); timeout = tipc_sub_read(&sub->evt.s, timeout); if (timeout != TIPC_WAIT_FOREVER) mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); @@ -177,16 +177,16 @@ void tipc_sub_unsubscribe(struct tipc_subscription *sub) tipc_nametbl_unsubscribe(sub); if (sub->evt.s.timeout != TIPC_WAIT_FOREVER) del_timer_sync(&sub->timer); - list_del(&sub->subscrp_list); - tipc_subscrp_put(sub); + list_del(&sub->sub_list); + tipc_sub_put(sub); } int tipc_topsrv_start(struct net *net) { struct tipc_net *tn = net_generic(net, tipc_net_id); const char name[] = "topology_server"; - struct tipc_server *topsrv; struct sockaddr_tipc *saddr; + struct tipc_server *topsrv; saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); if (!saddr) diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 2d35f1046754..720932896ba6 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -51,7 +51,7 @@ struct tipc_conn; * @seq: name sequence associated with subscription * @timer: timer governing subscription duration (optional) * @nameseq_list: adjacent subscriptions in name sequence's subscription list - * @subscrp_list: adjacent subscriptions in subscriber's subscription list + * @sub_list: adjacent subscriptions in subscriber's subscription list * @evt: template for events generated by subscription */ struct tipc_subscription { @@ -59,7 +59,7 @@ struct tipc_subscription { struct tipc_server *server; struct timer_list timer; struct list_head nameseq_list; - struct list_head subscrp_list; + struct list_head sub_list; struct tipc_event evt; int conid; bool inactive; @@ -71,17 +71,17 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, int conid); void tipc_sub_unsubscribe(struct tipc_subscription *sub); -int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, - u32 found_upper); -void tipc_subscrp_report_overlap(struct tipc_subscription *sub, - u32 found_lower, u32 found_upper, - u32 event, u32 port, u32 node, - u32 scope, int must); +int tipc_sub_check_overlap(struct tipc_name_seq *seq, u32 found_lower, + u32 found_upper); +void tipc_sub_report_overlap(struct tipc_subscription *sub, + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node, + u32 scope, int must); int tipc_topsrv_start(struct net *net); void tipc_topsrv_stop(struct net *net); -void tipc_subscrp_put(struct tipc_subscription *subscription); -void tipc_subscrp_get(struct tipc_subscription *subscription); +void tipc_sub_put(struct tipc_subscription *subscription); +void tipc_sub_get(struct tipc_subscription *subscription); #define TIPC_FILTER_MASK (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | TIPC_SUB_CANCEL) -- cgit v1.2.3-70-g09d2 From 5c45ab24ac77ea32eae7d3576cf37c3ddb259f80 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:49 +0100 Subject: tipc: make struct tipc_server private for server.c In order to narrow the interface and dependencies between the topology server and the subscription/binding table functionality we move struct tipc_server inside the file server.c. This requires some code adaptations in other files, but those are mostly minor. The most important change is that we have to move the start/stop functions for the topology server to server.c, where they logically belong anyway. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/name_table.c | 10 ++-- net/tipc/server.c | 124 ++++++++++++++++++++++++++++++++++++++++---------- net/tipc/server.h | 36 +-------------- net/tipc/subscr.c | 64 ++------------------------ net/tipc/subscr.h | 4 +- 5 files changed, 110 insertions(+), 128 deletions(-) (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index b234b7ee0b84..e01c9c691ba2 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -813,8 +813,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, */ void tipc_nametbl_subscribe(struct tipc_subscription *sub) { - struct tipc_server *srv = sub->server; - struct tipc_net *tn = tipc_net(srv->net); + struct tipc_net *tn = tipc_net(sub->net); struct tipc_subscr *s = &sub->evt.s; u32 type = tipc_sub_read(s, seq.type); int index = hash(type); @@ -822,7 +821,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) struct tipc_name_seq ns; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(srv->net, type); + seq = nametbl_find_seq(sub->net, type); if (!seq) seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); if (seq) { @@ -844,14 +843,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) */ void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) { - struct tipc_server *srv = sub->server; struct tipc_subscr *s = &sub->evt.s; - struct tipc_net *tn = tipc_net(srv->net); + struct tipc_net *tn = tipc_net(sub->net); struct name_seq *seq; u32 type = tipc_sub_read(s, seq.type); spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(srv->net, type); + seq = nametbl_find_seq(sub->net, type); if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&sub->nameseq_list); diff --git a/net/tipc/server.c b/net/tipc/server.c index a5c112ed3bbe..0abbdd698662 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -49,7 +49,37 @@ #define CF_CONNECTED 1 #define CF_SERVER 2 -#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) +#define TIPC_SERVER_NAME_LEN 32 + +/** + * struct tipc_server - TIPC server structure + * @conn_idr: identifier set of connection + * @idr_lock: protect the connection identifier set + * @idr_in_use: amount of allocated identifier entry + * @net: network namspace instance + * @rcvbuf_cache: memory cache of server receive buffer + * @rcv_wq: receive workqueue + * @send_wq: send workqueue + * @max_rcvbuf_size: maximum permitted receive message length + * @tipc_conn_new: callback will be called when new connection is incoming + * @tipc_conn_release: callback will be called before releasing the connection + * @tipc_conn_recvmsg: callback will be called when message arrives + * @saddr: TIPC server address + * @name: server name + * @imp: message importance + * @type: socket type + */ +struct tipc_server { + struct idr conn_idr; + spinlock_t idr_lock; /* for idr list */ + int idr_in_use; + struct net *net; + struct workqueue_struct *rcv_wq; + struct workqueue_struct *send_wq; + int max_rcvbuf_size; + struct sockaddr_tipc *saddr; + char name[TIPC_SERVER_NAME_LEN]; +}; /** * struct tipc_conn - TIPC connection structure @@ -93,6 +123,11 @@ static void tipc_recv_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work); static void tipc_clean_outqueues(struct tipc_conn *con); +static struct tipc_conn *sock2con(struct sock *sk) +{ + return sk->sk_user_data; +} + static bool connected(struct tipc_conn *con) { return con && test_bit(CF_CONNECTED, &con->flags); @@ -198,14 +233,17 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) { struct list_head *sub_list = &con->sub_list; + struct tipc_net *tn = tipc_net(con->server->net); struct tipc_subscription *sub, *tmp; spin_lock_bh(&con->sub_lock); list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { - if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) + if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) { tipc_sub_unsubscribe(sub); - else if (s) + atomic_dec(&tn->subscription_count); + } else if (s) { break; + } } spin_unlock_bh(&con->sub_lock); } @@ -220,8 +258,7 @@ static void tipc_close_conn(struct tipc_conn *con) if (disconnect) { sk->sk_user_data = NULL; - if (con->conid) - tipc_con_delete_sub(con, NULL); + tipc_con_delete_sub(con, NULL); } write_unlock_bh(&sk->sk_callback_lock); @@ -272,15 +309,21 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, struct tipc_conn *con, struct tipc_subscr *s) { + struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { tipc_con_delete_sub(con, s); return 0; } - sub = tipc_sub_subscribe(srv, s, con->conid); + if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { + pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); + return -1; + } + sub = tipc_sub_subscribe(srv->net, s, con->conid); if (!sub) return -1; + atomic_inc(&tn->subscription_count); spin_lock_bh(&con->sub_lock); list_add(&sub->sub_list, &con->sub_list); spin_unlock_bh(&con->sub_lock); @@ -426,13 +469,14 @@ static void tipc_clean_outqueues(struct tipc_conn *con) /* tipc_conn_queue_evt - interrupt level call from a subscription instance * The queued job is launched in tipc_send_to_sock() */ -void tipc_conn_queue_evt(struct tipc_server *s, int conid, +void tipc_conn_queue_evt(struct net *net, int conid, u32 event, struct tipc_event *evt) { + struct tipc_server *srv = tipc_topsrv(net); struct outqueue_entry *e; struct tipc_conn *con; - con = tipc_conn_lookup(s, conid); + con = tipc_conn_lookup(srv, conid); if (!con) return; @@ -448,7 +492,7 @@ void tipc_conn_queue_evt(struct tipc_server *s, int conid, list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); - if (queue_work(s->send_wq, &con->swork)) + if (queue_work(srv->send_wq, &con->swork)) return; err: conn_put(con); @@ -620,41 +664,71 @@ static int tipc_work_start(struct tipc_server *s) return 0; } -int tipc_server_start(struct tipc_server *s) +int tipc_topsrv_start(struct net *net) { + struct tipc_net *tn = tipc_net(net); + const char name[] = "topology_server"; + struct sockaddr_tipc *saddr; + struct tipc_server *srv; int ret; - spin_lock_init(&s->idr_lock); - idr_init(&s->conn_idr); - s->idr_in_use = 0; + saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); + if (!saddr) + return -ENOMEM; + saddr->family = AF_TIPC; + saddr->addrtype = TIPC_ADDR_NAMESEQ; + saddr->addr.nameseq.type = TIPC_TOP_SRV; + saddr->addr.nameseq.lower = TIPC_TOP_SRV; + saddr->addr.nameseq.upper = TIPC_TOP_SRV; + saddr->scope = TIPC_NODE_SCOPE; + + srv = kzalloc(sizeof(*srv), GFP_ATOMIC); + if (!srv) { + kfree(saddr); + return -ENOMEM; + } + srv->net = net; + srv->saddr = saddr; + srv->max_rcvbuf_size = sizeof(struct tipc_subscr); + + strncpy(srv->name, name, strlen(name) + 1); + tn->topsrv = srv; + atomic_set(&tn->subscription_count, 0); + + spin_lock_init(&srv->idr_lock); + idr_init(&srv->conn_idr); + srv->idr_in_use = 0; - ret = tipc_work_start(s); + ret = tipc_work_start(srv); if (ret < 0) return ret; - ret = tipc_open_listening_sock(s); + ret = tipc_open_listening_sock(srv); if (ret < 0) - tipc_work_stop(s); + tipc_work_stop(srv); return ret; } -void tipc_server_stop(struct tipc_server *s) +void tipc_topsrv_stop(struct net *net) { + struct tipc_server *srv = tipc_topsrv(net); struct tipc_conn *con; int id; - spin_lock_bh(&s->idr_lock); - for (id = 0; s->idr_in_use; id++) { - con = idr_find(&s->conn_idr, id); + spin_lock_bh(&srv->idr_lock); + for (id = 0; srv->idr_in_use; id++) { + con = idr_find(&srv->conn_idr, id); if (con) { - spin_unlock_bh(&s->idr_lock); + spin_unlock_bh(&srv->idr_lock); tipc_close_conn(con); - spin_lock_bh(&s->idr_lock); + spin_lock_bh(&srv->idr_lock); } } - spin_unlock_bh(&s->idr_lock); + spin_unlock_bh(&srv->idr_lock); - tipc_work_stop(s); - idr_destroy(&s->conn_idr); + tipc_work_stop(srv); + idr_destroy(&srv->conn_idr); + kfree(srv->saddr); + kfree(srv); } diff --git a/net/tipc/server.h b/net/tipc/server.h index 995b79591ffe..ce93b6d68e41 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -47,45 +47,11 @@ #define TIPC_SUB_NODE_SCOPE 0x40 #define TIPC_SUB_NO_STATUS 0x80 -/** - * struct tipc_server - TIPC server structure - * @conn_idr: identifier set of connection - * @idr_lock: protect the connection identifier set - * @idr_in_use: amount of allocated identifier entry - * @net: network namspace instance - * @rcvbuf_cache: memory cache of server receive buffer - * @rcv_wq: receive workqueue - * @send_wq: send workqueue - * @max_rcvbuf_size: maximum permitted receive message length - * @tipc_conn_new: callback will be called when new connection is incoming - * @tipc_conn_release: callback will be called before releasing the connection - * @tipc_conn_recvmsg: callback will be called when message arrives - * @saddr: TIPC server address - * @name: server name - * @imp: message importance - * @type: socket type - */ -struct tipc_server { - struct idr conn_idr; - spinlock_t idr_lock; - int idr_in_use; - struct net *net; - struct workqueue_struct *rcv_wq; - struct workqueue_struct *send_wq; - int max_rcvbuf_size; - struct sockaddr_tipc *saddr; - char name[TIPC_SERVER_NAME_LEN]; -}; - -void tipc_conn_queue_evt(struct tipc_server *s, int conid, +void tipc_conn_queue_evt(struct net *net, int conid, u32 event, struct tipc_event *evt); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); void tipc_topsrv_kern_unsubscr(struct net *net, int conid); -int tipc_server_start(struct tipc_server *s); - -void tipc_server_stop(struct tipc_server *s); - #endif diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 3be1e4b6cbfa..c8146568d04e 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -51,7 +51,7 @@ static void tipc_sub_send_event(struct tipc_subscription *sub, tipc_evt_write(evt, found_upper, found_upper); tipc_evt_write(evt, port.ref, port); tipc_evt_write(evt, port.node, node); - tipc_conn_queue_evt(sub->server, sub->conid, event, evt); + tipc_conn_queue_evt(sub->net, sub->conid, event, evt); } /** @@ -114,14 +114,7 @@ static void tipc_sub_timeout(struct timer_list *t) static void tipc_sub_kref_release(struct kref *kref) { - struct tipc_subscription *sub; - struct tipc_net *tn; - - sub = container_of(kref, struct tipc_subscription, kref); - tn = tipc_net(sub->server->net); - - atomic_dec(&tn->subscription_count); - kfree(sub); + kfree(container_of(kref, struct tipc_subscription, kref)); } void tipc_sub_put(struct tipc_subscription *subscription) @@ -134,19 +127,14 @@ void tipc_sub_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, +struct tipc_subscription *tipc_sub_subscribe(struct net *net, struct tipc_subscr *s, int conid) { - struct tipc_net *tn = tipc_net(srv->net); u32 filter = tipc_sub_read(s, filter); struct tipc_subscription *sub; u32 timeout; - if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { - pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); - return NULL; - } if ((filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) || (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper))) { pr_warn("Subscription rejected, illegal request\n"); @@ -157,12 +145,11 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, pr_warn("Subscription rejected, no memory\n"); return NULL; } - sub->server = srv; + sub->net = net; sub->conid = conid; sub->inactive = false; memcpy(&sub->evt.s, s, sizeof(*s)); spin_lock_init(&sub->lock); - atomic_inc(&tn->subscription_count); kref_init(&sub->kref); tipc_nametbl_subscribe(sub); timer_setup(&sub->timer, tipc_sub_timeout, 0); @@ -180,46 +167,3 @@ void tipc_sub_unsubscribe(struct tipc_subscription *sub) list_del(&sub->sub_list); tipc_sub_put(sub); } - -int tipc_topsrv_start(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - const char name[] = "topology_server"; - struct sockaddr_tipc *saddr; - struct tipc_server *topsrv; - - saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); - if (!saddr) - return -ENOMEM; - saddr->family = AF_TIPC; - saddr->addrtype = TIPC_ADDR_NAMESEQ; - saddr->addr.nameseq.type = TIPC_TOP_SRV; - saddr->addr.nameseq.lower = TIPC_TOP_SRV; - saddr->addr.nameseq.upper = TIPC_TOP_SRV; - saddr->scope = TIPC_NODE_SCOPE; - - topsrv = kzalloc(sizeof(*topsrv), GFP_ATOMIC); - if (!topsrv) { - kfree(saddr); - return -ENOMEM; - } - topsrv->net = net; - topsrv->saddr = saddr; - topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr); - - strncpy(topsrv->name, name, strlen(name) + 1); - tn->topsrv = topsrv; - atomic_set(&tn->subscription_count, 0); - - return tipc_server_start(topsrv); -} - -void tipc_topsrv_stop(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_server *topsrv = tn->topsrv; - - tipc_server_stop(topsrv); - kfree(topsrv->saddr); - kfree(topsrv); -} diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 720932896ba6..82ba61afe638 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -56,7 +56,7 @@ struct tipc_conn; */ struct tipc_subscription { struct kref kref; - struct tipc_server *server; + struct net *net; struct timer_list timer; struct list_head nameseq_list; struct list_head sub_list; @@ -66,7 +66,7 @@ struct tipc_subscription { spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, +struct tipc_subscription *tipc_sub_subscribe(struct net *net, struct tipc_subscr *s, int conid); void tipc_sub_unsubscribe(struct tipc_subscription *sub); -- cgit v1.2.3-70-g09d2 From 026321c6d056a54b4145522492245d2b5913ee1d Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:51 +0100 Subject: tipc: rename tipc_server to tipc_topsrv We rename struct tipc_server to struct tipc_topsrv. This reflect its now specialized role as topology server. Accoringly, we change or add function prefixes to make it clearer which functionality those belong to. There are no functional changes in this commit. Acked-by: Ying.Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/Makefile | 2 +- net/tipc/core.h | 6 +- net/tipc/group.c | 2 +- net/tipc/server.c | 700 ----------------------------------------------------- net/tipc/server.h | 57 ----- net/tipc/subscr.c | 2 +- net/tipc/subscr.h | 2 +- net/tipc/topsrv.c | 702 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ net/tipc/topsrv.h | 54 +++++ 9 files changed, 763 insertions(+), 764 deletions(-) delete mode 100644 net/tipc/server.c delete mode 100644 net/tipc/server.h create mode 100644 net/tipc/topsrv.c create mode 100644 net/tipc/topsrv.h (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 37bb0bfbd936..1edb7192aa2f 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -9,7 +9,7 @@ tipc-y += addr.o bcast.o bearer.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o monitor.o name_table.o net.o \ netlink.o netlink_compat.o node.o socket.o eth_media.o \ - server.o socket.o group.o + topsrv.o socket.o group.o tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o diff --git a/net/tipc/core.h b/net/tipc/core.h index 20b21af2ff14..ff8b071654f5 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -64,7 +64,7 @@ struct tipc_bearer; struct tipc_bc_base; struct tipc_link; struct tipc_name_table; -struct tipc_server; +struct tipc_topsrv; struct tipc_monitor; #define TIPC_MOD_VER "2.0.0" @@ -112,7 +112,7 @@ struct tipc_net { struct list_head dist_queue; /* Topology subscription server */ - struct tipc_server *topsrv; + struct tipc_topsrv *topsrv; atomic_t subscription_count; }; @@ -131,7 +131,7 @@ static inline struct list_head *tipc_nodes(struct net *net) return &tipc_net(net)->node_list; } -static inline struct tipc_server *tipc_topsrv(struct net *net) +static inline struct tipc_topsrv *tipc_topsrv(struct net *net) { return tipc_net(net)->topsrv; } diff --git a/net/tipc/group.c b/net/tipc/group.c index 122162a31816..03086ccb7746 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -37,7 +37,7 @@ #include "addr.h" #include "group.h" #include "bcast.h" -#include "server.h" +#include "topsrv.h" #include "msg.h" #include "socket.h" #include "node.h" diff --git a/net/tipc/server.c b/net/tipc/server.c deleted file mode 100644 index 0e351e81690e..000000000000 --- a/net/tipc/server.c +++ /dev/null @@ -1,700 +0,0 @@ -/* - * net/tipc/server.c: TIPC server infrastructure - * - * Copyright (c) 2012-2013, Wind River Systems - * Copyright (c) 2017, Ericsson AB - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "subscr.h" -#include "server.h" -#include "core.h" -#include "socket.h" -#include "addr.h" -#include "msg.h" -#include -#include - -/* Number of messages to send before rescheduling */ -#define MAX_SEND_MSG_COUNT 25 -#define MAX_RECV_MSG_COUNT 25 -#define CF_CONNECTED 1 -#define CF_SERVER 2 - -#define TIPC_SERVER_NAME_LEN 32 - -/** - * struct tipc_server - TIPC server structure - * @conn_idr: identifier set of connection - * @idr_lock: protect the connection identifier set - * @idr_in_use: amount of allocated identifier entry - * @net: network namspace instance - * @rcvbuf_cache: memory cache of server receive buffer - * @rcv_wq: receive workqueue - * @send_wq: send workqueue - * @max_rcvbuf_size: maximum permitted receive message length - * @tipc_conn_new: callback will be called when new connection is incoming - * @tipc_conn_release: callback will be called before releasing the connection - * @tipc_conn_recvmsg: callback will be called when message arrives - * @name: server name - * @imp: message importance - * @type: socket type - */ -struct tipc_server { - struct idr conn_idr; - spinlock_t idr_lock; /* for idr list */ - int idr_in_use; - struct net *net; - struct work_struct awork; - struct workqueue_struct *rcv_wq; - struct workqueue_struct *send_wq; - int max_rcvbuf_size; - struct socket *listener; - char name[TIPC_SERVER_NAME_LEN]; -}; - -/** - * struct tipc_conn - TIPC connection structure - * @kref: reference counter to connection object - * @conid: connection identifier - * @sock: socket handler associated with connection - * @flags: indicates connection state - * @server: pointer to connected server - * @sub_list: lsit to all pertaing subscriptions - * @sub_lock: lock protecting the subscription list - * @outqueue_lock: control access to the outqueue - * @rwork: receive work item - * @rx_action: what to do when connection socket is active - * @outqueue: pointer to first outbound message in queue - * @outqueue_lock: control access to the outqueue - * @swork: send work item - */ -struct tipc_conn { - struct kref kref; - int conid; - struct socket *sock; - unsigned long flags; - struct tipc_server *server; - struct list_head sub_list; - spinlock_t sub_lock; /* for subscription list */ - struct work_struct rwork; - struct list_head outqueue; - spinlock_t outqueue_lock; - struct work_struct swork; -}; - -/* An entry waiting to be sent */ -struct outqueue_entry { - bool inactive; - struct tipc_event evt; - struct list_head list; -}; - -static void tipc_recv_work(struct work_struct *work); -static void tipc_send_work(struct work_struct *work); - -static bool connected(struct tipc_conn *con) -{ - return con && test_bit(CF_CONNECTED, &con->flags); -} - -static void tipc_conn_kref_release(struct kref *kref) -{ - struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); - struct tipc_server *s = con->server; - struct outqueue_entry *e, *safe; - - spin_lock_bh(&s->idr_lock); - idr_remove(&s->conn_idr, con->conid); - s->idr_in_use--; - spin_unlock_bh(&s->idr_lock); - if (con->sock) - sock_release(con->sock); - - spin_lock_bh(&con->outqueue_lock); - list_for_each_entry_safe(e, safe, &con->outqueue, list) { - list_del(&e->list); - kfree(e); - } - spin_unlock_bh(&con->outqueue_lock); - kfree(con); -} - -static void conn_put(struct tipc_conn *con) -{ - kref_put(&con->kref, tipc_conn_kref_release); -} - -static void conn_get(struct tipc_conn *con) -{ - kref_get(&con->kref); -} - -static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) -{ - struct tipc_conn *con; - - spin_lock_bh(&s->idr_lock); - con = idr_find(&s->conn_idr, conid); - if (!connected(con) || !kref_get_unless_zero(&con->kref)) - con = NULL; - spin_unlock_bh(&s->idr_lock); - return con; -} - -/* sock_data_ready - interrupt callback indicating the socket has data to read - * The queued work is launched into tipc_recv_work()->tipc_recv_from_sock() - */ -static void sock_data_ready(struct sock *sk) -{ - struct tipc_conn *con; - - read_lock_bh(&sk->sk_callback_lock); - con = sk->sk_user_data; - if (connected(con)) { - conn_get(con); - if (!queue_work(con->server->rcv_wq, &con->rwork)) - conn_put(con); - } - read_unlock_bh(&sk->sk_callback_lock); -} - -/* sock_write_space - interrupt callback after a sendmsg EAGAIN - * Indicates that there now is more space in the send buffer - * The queued work is launched into tipc_send_work()->tipc_send_to_sock() - */ -static void sock_write_space(struct sock *sk) -{ - struct tipc_conn *con; - - read_lock_bh(&sk->sk_callback_lock); - con = sk->sk_user_data; - if (connected(con)) { - conn_get(con); - if (!queue_work(con->server->send_wq, &con->swork)) - conn_put(con); - } - read_unlock_bh(&sk->sk_callback_lock); -} - -/* tipc_con_delete_sub - delete a specific or all subscriptions - * for a given subscriber - */ -static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) -{ - struct list_head *sub_list = &con->sub_list; - struct tipc_net *tn = tipc_net(con->server->net); - struct tipc_subscription *sub, *tmp; - - spin_lock_bh(&con->sub_lock); - list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { - if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) { - tipc_sub_unsubscribe(sub); - atomic_dec(&tn->subscription_count); - } else if (s) { - break; - } - } - spin_unlock_bh(&con->sub_lock); -} - -static void tipc_close_conn(struct tipc_conn *con) -{ - struct sock *sk = con->sock->sk; - bool disconnect = false; - - write_lock_bh(&sk->sk_callback_lock); - disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); - - if (disconnect) { - sk->sk_user_data = NULL; - tipc_con_delete_sub(con, NULL); - } - write_unlock_bh(&sk->sk_callback_lock); - - /* Handle concurrent calls from sending and receiving threads */ - if (!disconnect) - return; - - /* Don't flush pending works, -just let them expire */ - kernel_sock_shutdown(con->sock, SHUT_RDWR); - - conn_put(con); -} - -static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) -{ - struct tipc_conn *con; - int ret; - - con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); - if (!con) - return ERR_PTR(-ENOMEM); - - kref_init(&con->kref); - INIT_LIST_HEAD(&con->outqueue); - INIT_LIST_HEAD(&con->sub_list); - spin_lock_init(&con->outqueue_lock); - spin_lock_init(&con->sub_lock); - INIT_WORK(&con->swork, tipc_send_work); - INIT_WORK(&con->rwork, tipc_recv_work); - - spin_lock_bh(&s->idr_lock); - ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC); - if (ret < 0) { - kfree(con); - spin_unlock_bh(&s->idr_lock); - return ERR_PTR(-ENOMEM); - } - con->conid = ret; - s->idr_in_use++; - spin_unlock_bh(&s->idr_lock); - - set_bit(CF_CONNECTED, &con->flags); - con->server = s; - - return con; -} - -static int tipc_con_rcv_sub(struct tipc_server *srv, - struct tipc_conn *con, - struct tipc_subscr *s) -{ - struct tipc_net *tn = tipc_net(srv->net); - struct tipc_subscription *sub; - - if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { - tipc_con_delete_sub(con, s); - return 0; - } - if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { - pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); - return -1; - } - sub = tipc_sub_subscribe(srv->net, s, con->conid); - if (!sub) - return -1; - atomic_inc(&tn->subscription_count); - spin_lock_bh(&con->sub_lock); - list_add(&sub->sub_list, &con->sub_list); - spin_unlock_bh(&con->sub_lock); - return 0; -} - -static int tipc_receive_from_sock(struct tipc_conn *con) -{ - struct tipc_server *srv = con->server; - struct sock *sk = con->sock->sk; - struct msghdr msg = {}; - struct tipc_subscr s; - struct kvec iov; - int ret; - - iov.iov_base = &s; - iov.iov_len = sizeof(s); - msg.msg_name = NULL; - iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); - ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); - if (ret == -EWOULDBLOCK) - return -EWOULDBLOCK; - if (ret > 0) { - read_lock_bh(&sk->sk_callback_lock); - ret = tipc_con_rcv_sub(srv, con, &s); - read_unlock_bh(&sk->sk_callback_lock); - } - if (ret < 0) - tipc_close_conn(con); - - return ret; -} - -/* tipc_conn_queue_evt() - interrupt level call from a subscription instance - * The queued work is launched into tipc_send_work()->tipc_send_to_sock() - */ -void tipc_conn_queue_evt(struct net *net, int conid, - u32 event, struct tipc_event *evt) -{ - struct tipc_server *srv = tipc_topsrv(net); - struct outqueue_entry *e; - struct tipc_conn *con; - - con = tipc_conn_lookup(srv, conid); - if (!con) - return; - - if (!connected(con)) - goto err; - - e = kmalloc(sizeof(*e), GFP_ATOMIC); - if (!e) - goto err; - e->inactive = (event == TIPC_SUBSCR_TIMEOUT); - memcpy(&e->evt, evt, sizeof(*evt)); - spin_lock_bh(&con->outqueue_lock); - list_add_tail(&e->list, &con->outqueue); - spin_unlock_bh(&con->outqueue_lock); - - if (queue_work(srv->send_wq, &con->swork)) - return; -err: - conn_put(con); -} - -bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, - u32 upper, u32 filter, int *conid) -{ - struct tipc_subscr sub; - struct tipc_conn *con; - int rc; - - sub.seq.type = type; - sub.seq.lower = lower; - sub.seq.upper = upper; - sub.timeout = TIPC_WAIT_FOREVER; - sub.filter = filter; - *(u32 *)&sub.usr_handle = port; - - con = tipc_alloc_conn(tipc_topsrv(net)); - if (IS_ERR(con)) - return false; - - *conid = con->conid; - con->sock = NULL; - rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub); - if (rc < 0) - tipc_close_conn(con); - return !rc; -} - -void tipc_topsrv_kern_unsubscr(struct net *net, int conid) -{ - struct tipc_conn *con; - - con = tipc_conn_lookup(tipc_topsrv(net), conid); - if (!con) - return; - - test_and_clear_bit(CF_CONNECTED, &con->flags); - tipc_con_delete_sub(con, NULL); - conn_put(con); - conn_put(con); -} - -static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) -{ - u32 port = *(u32 *)&evt->s.usr_handle; - u32 self = tipc_own_addr(net); - struct sk_buff_head evtq; - struct sk_buff *skb; - - skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), - self, self, port, port, 0); - if (!skb) - return; - msg_set_dest_droppable(buf_msg(skb), true); - memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); - skb_queue_head_init(&evtq); - __skb_queue_tail(&evtq, skb); - tipc_sk_rcv(net, &evtq); -} - -static void tipc_send_to_sock(struct tipc_conn *con) -{ - struct list_head *queue = &con->outqueue; - struct tipc_server *srv = con->server; - struct outqueue_entry *e; - struct tipc_event *evt; - struct msghdr msg; - struct kvec iov; - int count = 0; - int ret; - - spin_lock_bh(&con->outqueue_lock); - - while (!list_empty(queue)) { - e = list_first_entry(queue, struct outqueue_entry, list); - evt = &e->evt; - spin_unlock_bh(&con->outqueue_lock); - - if (e->inactive) - tipc_con_delete_sub(con, &evt->s); - - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT; - iov.iov_base = evt; - iov.iov_len = sizeof(*evt); - msg.msg_name = NULL; - - if (con->sock) { - ret = kernel_sendmsg(con->sock, &msg, &iov, - 1, sizeof(*evt)); - if (ret == -EWOULDBLOCK || ret == 0) { - cond_resched(); - return; - } else if (ret < 0) { - return tipc_close_conn(con); - } - } else { - tipc_send_kern_top_evt(srv->net, evt); - } - - /* Don't starve users filling buffers */ - if (++count >= MAX_SEND_MSG_COUNT) { - cond_resched(); - count = 0; - } - spin_lock_bh(&con->outqueue_lock); - list_del(&e->list); - kfree(e); - } - spin_unlock_bh(&con->outqueue_lock); -} - -static void tipc_recv_work(struct work_struct *work) -{ - struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); - int count = 0; - - while (connected(con)) { - if (tipc_receive_from_sock(con)) - break; - - /* Don't flood Rx machine */ - if (++count >= MAX_RECV_MSG_COUNT) { - cond_resched(); - count = 0; - } - } - conn_put(con); -} - -static void tipc_send_work(struct work_struct *work) -{ - struct tipc_conn *con = container_of(work, struct tipc_conn, swork); - - if (connected(con)) - tipc_send_to_sock(con); - - conn_put(con); -} - -static void tipc_accept_from_sock(struct work_struct *work) -{ - struct tipc_server *srv = container_of(work, struct tipc_server, awork); - struct socket *lsock = srv->listener; - struct socket *newsock; - struct tipc_conn *con; - struct sock *newsk; - int ret; - - while (1) { - ret = kernel_accept(lsock, &newsock, O_NONBLOCK); - if (ret < 0) - return; - con = tipc_alloc_conn(srv); - if (IS_ERR(con)) { - ret = PTR_ERR(con); - sock_release(newsock); - return; - } - /* Register callbacks */ - newsk = newsock->sk; - write_lock_bh(&newsk->sk_callback_lock); - newsk->sk_data_ready = sock_data_ready; - newsk->sk_write_space = sock_write_space; - newsk->sk_user_data = con; - con->sock = newsock; - write_unlock_bh(&newsk->sk_callback_lock); - - /* Wake up receive process in case of 'SYN+' message */ - newsk->sk_data_ready(newsk); - } -} - -/* listener_sock_data_ready - interrupt callback indicating new connection - * The queued job is launched into tipc_accept_from_sock() - */ -static void listener_sock_data_ready(struct sock *sk) -{ - struct tipc_server *srv; - - read_lock_bh(&sk->sk_callback_lock); - srv = sk->sk_user_data; - if (srv->listener) - queue_work(srv->rcv_wq, &srv->awork); - read_unlock_bh(&sk->sk_callback_lock); -} - -static int tipc_create_listener_sock(struct tipc_server *srv) -{ - int imp = TIPC_CRITICAL_IMPORTANCE; - struct socket *lsock = NULL; - struct sockaddr_tipc saddr; - struct sock *sk; - int rc; - - rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock); - if (rc < 0) - return rc; - - srv->listener = lsock; - sk = lsock->sk; - write_lock_bh(&sk->sk_callback_lock); - sk->sk_data_ready = listener_sock_data_ready; - sk->sk_user_data = srv; - write_unlock_bh(&sk->sk_callback_lock); - - rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE, - (char *)&imp, sizeof(imp)); - if (rc < 0) - goto err; - - saddr.family = AF_TIPC; - saddr.addrtype = TIPC_ADDR_NAMESEQ; - saddr.addr.nameseq.type = TIPC_TOP_SRV; - saddr.addr.nameseq.lower = TIPC_TOP_SRV; - saddr.addr.nameseq.upper = TIPC_TOP_SRV; - saddr.scope = TIPC_NODE_SCOPE; - - rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr)); - if (rc < 0) - goto err; - rc = kernel_listen(lsock, 0); - if (rc < 0) - goto err; - - /* As server's listening socket owner and creator is the same module, - * we have to decrease TIPC module reference count to guarantee that - * it remains zero after the server socket is created, otherwise, - * executing "rmmod" command is unable to make TIPC module deleted - * after TIPC module is inserted successfully. - * - * However, the reference count is ever increased twice in - * sock_create_kern(): one is to increase the reference count of owner - * of TIPC socket's proto_ops struct; another is to increment the - * reference count of owner of TIPC proto struct. Therefore, we must - * decrement the module reference count twice to ensure that it keeps - * zero after server's listening socket is created. Of course, we - * must bump the module reference count twice as well before the socket - * is closed. - */ - module_put(lsock->ops->owner); - module_put(sk->sk_prot_creator->owner); - - return 0; -err: - sock_release(lsock); - return -EINVAL; -} - -static int tipc_work_start(struct tipc_server *s) -{ - s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0); - if (!s->rcv_wq) { - pr_err("can't start tipc receive workqueue\n"); - return -ENOMEM; - } - - s->send_wq = alloc_ordered_workqueue("tipc_send", 0); - if (!s->send_wq) { - pr_err("can't start tipc send workqueue\n"); - destroy_workqueue(s->rcv_wq); - return -ENOMEM; - } - - return 0; -} - -static void tipc_work_stop(struct tipc_server *s) -{ - destroy_workqueue(s->rcv_wq); - destroy_workqueue(s->send_wq); -} - -int tipc_topsrv_start(struct net *net) -{ - struct tipc_net *tn = tipc_net(net); - const char name[] = "topology_server"; - struct tipc_server *srv; - int ret; - - srv = kzalloc(sizeof(*srv), GFP_ATOMIC); - if (!srv) - return -ENOMEM; - - srv->net = net; - srv->max_rcvbuf_size = sizeof(struct tipc_subscr); - INIT_WORK(&srv->awork, tipc_accept_from_sock); - - strncpy(srv->name, name, strlen(name) + 1); - tn->topsrv = srv; - atomic_set(&tn->subscription_count, 0); - - spin_lock_init(&srv->idr_lock); - idr_init(&srv->conn_idr); - srv->idr_in_use = 0; - - ret = tipc_work_start(srv); - if (ret < 0) - return ret; - - ret = tipc_create_listener_sock(srv); - if (ret < 0) - tipc_work_stop(srv); - - return ret; -} - -void tipc_topsrv_stop(struct net *net) -{ - struct tipc_server *srv = tipc_topsrv(net); - struct socket *lsock = srv->listener; - struct tipc_conn *con; - int id; - - spin_lock_bh(&srv->idr_lock); - for (id = 0; srv->idr_in_use; id++) { - con = idr_find(&srv->conn_idr, id); - if (con) { - spin_unlock_bh(&srv->idr_lock); - tipc_close_conn(con); - spin_lock_bh(&srv->idr_lock); - } - } - __module_get(lsock->ops->owner); - __module_get(lsock->sk->sk_prot_creator->owner); - sock_release(lsock); - srv->listener = NULL; - spin_unlock_bh(&srv->idr_lock); - tipc_work_stop(srv); - idr_destroy(&srv->conn_idr); - kfree(srv); -} diff --git a/net/tipc/server.h b/net/tipc/server.h deleted file mode 100644 index ce93b6d68e41..000000000000 --- a/net/tipc/server.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * net/tipc/server.h: Include file for TIPC server code - * - * Copyright (c) 2012-2013, Wind River Systems - * Copyright (c) 2017, Ericsson AB - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _TIPC_SERVER_H -#define _TIPC_SERVER_H - -#include "core.h" -#include -#include -#include - -#define TIPC_SERVER_NAME_LEN 32 -#define TIPC_SUB_CLUSTER_SCOPE 0x20 -#define TIPC_SUB_NODE_SCOPE 0x40 -#define TIPC_SUB_NO_STATUS 0x80 - -void tipc_conn_queue_evt(struct net *net, int conid, - u32 event, struct tipc_event *evt); - -bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, - u32 upper, u32 filter, int *conid); -void tipc_topsrv_kern_unsubscr(struct net *net, int conid); - -#endif diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index c8146568d04e..6925a989569b 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -51,7 +51,7 @@ static void tipc_sub_send_event(struct tipc_subscription *sub, tipc_evt_write(evt, found_upper, found_upper); tipc_evt_write(evt, port.ref, port); tipc_evt_write(evt, port.node, node); - tipc_conn_queue_evt(sub->net, sub->conid, event, evt); + tipc_topsrv_queue_evt(sub->net, sub->conid, event, evt); } /** diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 82ba61afe638..8b2d22b18f22 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -37,7 +37,7 @@ #ifndef _TIPC_SUBSCR_H #define _TIPC_SUBSCR_H -#include "server.h" +#include "topsrv.h" #define TIPC_MAX_SUBSCR 65535 #define TIPC_MAX_PUBLICATIONS 65535 diff --git a/net/tipc/topsrv.c b/net/tipc/topsrv.c new file mode 100644 index 000000000000..02013e00f287 --- /dev/null +++ b/net/tipc/topsrv.c @@ -0,0 +1,702 @@ +/* + * net/tipc/server.c: TIPC server infrastructure + * + * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017-2018, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "subscr.h" +#include "topsrv.h" +#include "core.h" +#include "socket.h" +#include "addr.h" +#include "msg.h" +#include +#include + +/* Number of messages to send before rescheduling */ +#define MAX_SEND_MSG_COUNT 25 +#define MAX_RECV_MSG_COUNT 25 +#define CF_CONNECTED 1 +#define CF_SERVER 2 + +#define TIPC_SERVER_NAME_LEN 32 + +/** + * struct tipc_topsrv - TIPC server structure + * @conn_idr: identifier set of connection + * @idr_lock: protect the connection identifier set + * @idr_in_use: amount of allocated identifier entry + * @net: network namspace instance + * @rcvbuf_cache: memory cache of server receive buffer + * @rcv_wq: receive workqueue + * @send_wq: send workqueue + * @max_rcvbuf_size: maximum permitted receive message length + * @tipc_conn_new: callback will be called when new connection is incoming + * @tipc_conn_release: callback will be called before releasing the connection + * @tipc_conn_recvmsg: callback will be called when message arrives + * @name: server name + * @imp: message importance + * @type: socket type + */ +struct tipc_topsrv { + struct idr conn_idr; + spinlock_t idr_lock; /* for idr list */ + int idr_in_use; + struct net *net; + struct work_struct awork; + struct workqueue_struct *rcv_wq; + struct workqueue_struct *send_wq; + int max_rcvbuf_size; + struct socket *listener; + char name[TIPC_SERVER_NAME_LEN]; +}; + +/** + * struct tipc_conn - TIPC connection structure + * @kref: reference counter to connection object + * @conid: connection identifier + * @sock: socket handler associated with connection + * @flags: indicates connection state + * @server: pointer to connected server + * @sub_list: lsit to all pertaing subscriptions + * @sub_lock: lock protecting the subscription list + * @outqueue_lock: control access to the outqueue + * @rwork: receive work item + * @rx_action: what to do when connection socket is active + * @outqueue: pointer to first outbound message in queue + * @outqueue_lock: control access to the outqueue + * @swork: send work item + */ +struct tipc_conn { + struct kref kref; + int conid; + struct socket *sock; + unsigned long flags; + struct tipc_topsrv *server; + struct list_head sub_list; + spinlock_t sub_lock; /* for subscription list */ + struct work_struct rwork; + struct list_head outqueue; + spinlock_t outqueue_lock; /* for outqueue */ + struct work_struct swork; +}; + +/* An entry waiting to be sent */ +struct outqueue_entry { + bool inactive; + struct tipc_event evt; + struct list_head list; +}; + +static void tipc_conn_recv_work(struct work_struct *work); +static void tipc_conn_send_work(struct work_struct *work); +static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt); +static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s); + +static bool connected(struct tipc_conn *con) +{ + return con && test_bit(CF_CONNECTED, &con->flags); +} + +static void tipc_conn_kref_release(struct kref *kref) +{ + struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); + struct tipc_topsrv *s = con->server; + struct outqueue_entry *e, *safe; + + spin_lock_bh(&s->idr_lock); + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + spin_unlock_bh(&s->idr_lock); + if (con->sock) + sock_release(con->sock); + + spin_lock_bh(&con->outqueue_lock); + list_for_each_entry_safe(e, safe, &con->outqueue, list) { + list_del(&e->list); + kfree(e); + } + spin_unlock_bh(&con->outqueue_lock); + kfree(con); +} + +static void conn_put(struct tipc_conn *con) +{ + kref_put(&con->kref, tipc_conn_kref_release); +} + +static void conn_get(struct tipc_conn *con) +{ + kref_get(&con->kref); +} + +static void tipc_conn_close(struct tipc_conn *con) +{ + struct sock *sk = con->sock->sk; + bool disconnect = false; + + write_lock_bh(&sk->sk_callback_lock); + disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); + + if (disconnect) { + sk->sk_user_data = NULL; + tipc_conn_delete_sub(con, NULL); + } + write_unlock_bh(&sk->sk_callback_lock); + + /* Handle concurrent calls from sending and receiving threads */ + if (!disconnect) + return; + + /* Don't flush pending works, -just let them expire */ + kernel_sock_shutdown(con->sock, SHUT_RDWR); + + conn_put(con); +} + +static struct tipc_conn *tipc_conn_alloc(struct tipc_topsrv *s) +{ + struct tipc_conn *con; + int ret; + + con = kzalloc(sizeof(*con), GFP_ATOMIC); + if (!con) + return ERR_PTR(-ENOMEM); + + kref_init(&con->kref); + INIT_LIST_HEAD(&con->outqueue); + INIT_LIST_HEAD(&con->sub_list); + spin_lock_init(&con->outqueue_lock); + spin_lock_init(&con->sub_lock); + INIT_WORK(&con->swork, tipc_conn_send_work); + INIT_WORK(&con->rwork, tipc_conn_recv_work); + + spin_lock_bh(&s->idr_lock); + ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC); + if (ret < 0) { + kfree(con); + spin_unlock_bh(&s->idr_lock); + return ERR_PTR(-ENOMEM); + } + con->conid = ret; + s->idr_in_use++; + spin_unlock_bh(&s->idr_lock); + + set_bit(CF_CONNECTED, &con->flags); + con->server = s; + + return con; +} + +static struct tipc_conn *tipc_conn_lookup(struct tipc_topsrv *s, int conid) +{ + struct tipc_conn *con; + + spin_lock_bh(&s->idr_lock); + con = idr_find(&s->conn_idr, conid); + if (!connected(con) || !kref_get_unless_zero(&con->kref)) + con = NULL; + spin_unlock_bh(&s->idr_lock); + return con; +} + +/* tipc_conn_delete_sub - delete a specific or all subscriptions + * for a given subscriber + */ +static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) +{ + struct tipc_net *tn = tipc_net(con->server->net); + struct list_head *sub_list = &con->sub_list; + struct tipc_subscription *sub, *tmp; + + spin_lock_bh(&con->sub_lock); + list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { + if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) { + tipc_sub_unsubscribe(sub); + atomic_dec(&tn->subscription_count); + } else if (s) { + break; + } + } + spin_unlock_bh(&con->sub_lock); +} + +static void tipc_conn_send_to_sock(struct tipc_conn *con) +{ + struct list_head *queue = &con->outqueue; + struct tipc_topsrv *srv = con->server; + struct outqueue_entry *e; + struct tipc_event *evt; + struct msghdr msg; + struct kvec iov; + int count = 0; + int ret; + + spin_lock_bh(&con->outqueue_lock); + + while (!list_empty(queue)) { + e = list_first_entry(queue, struct outqueue_entry, list); + evt = &e->evt; + spin_unlock_bh(&con->outqueue_lock); + + if (e->inactive) + tipc_conn_delete_sub(con, &evt->s); + + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + iov.iov_base = evt; + iov.iov_len = sizeof(*evt); + msg.msg_name = NULL; + + if (con->sock) { + ret = kernel_sendmsg(con->sock, &msg, &iov, + 1, sizeof(*evt)); + if (ret == -EWOULDBLOCK || ret == 0) { + cond_resched(); + return; + } else if (ret < 0) { + return tipc_conn_close(con); + } + } else { + tipc_topsrv_kern_evt(srv->net, evt); + } + + /* Don't starve users filling buffers */ + if (++count >= MAX_SEND_MSG_COUNT) { + cond_resched(); + count = 0; + } + spin_lock_bh(&con->outqueue_lock); + list_del(&e->list); + kfree(e); + } + spin_unlock_bh(&con->outqueue_lock); +} + +static void tipc_conn_send_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, swork); + + if (connected(con)) + tipc_conn_send_to_sock(con); + + conn_put(con); +} + +/* tipc_conn_queue_evt() - interrupt level call from a subscription instance + * The queued work is launched into tipc_send_work()->tipc_send_to_sock() + */ +void tipc_topsrv_queue_evt(struct net *net, int conid, + u32 event, struct tipc_event *evt) +{ + struct tipc_topsrv *srv = tipc_topsrv(net); + struct outqueue_entry *e; + struct tipc_conn *con; + + con = tipc_conn_lookup(srv, conid); + if (!con) + return; + + if (!connected(con)) + goto err; + + e = kmalloc(sizeof(*e), GFP_ATOMIC); + if (!e) + goto err; + e->inactive = (event == TIPC_SUBSCR_TIMEOUT); + memcpy(&e->evt, evt, sizeof(*evt)); + spin_lock_bh(&con->outqueue_lock); + list_add_tail(&e->list, &con->outqueue); + spin_unlock_bh(&con->outqueue_lock); + + if (queue_work(srv->send_wq, &con->swork)) + return; +err: + conn_put(con); +} + +/* tipc_conn_write_space - interrupt callback after a sendmsg EAGAIN + * Indicates that there now is more space in the send buffer + * The queued work is launched into tipc_send_work()->tipc_conn_send_to_sock() + */ +static void tipc_conn_write_space(struct sock *sk) +{ + struct tipc_conn *con; + + read_lock_bh(&sk->sk_callback_lock); + con = sk->sk_user_data; + if (connected(con)) { + conn_get(con); + if (!queue_work(con->server->send_wq, &con->swork)) + conn_put(con); + } + read_unlock_bh(&sk->sk_callback_lock); +} + +static int tipc_conn_rcv_sub(struct tipc_topsrv *srv, + struct tipc_conn *con, + struct tipc_subscr *s) +{ + struct tipc_net *tn = tipc_net(srv->net); + struct tipc_subscription *sub; + + if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { + tipc_conn_delete_sub(con, s); + return 0; + } + if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { + pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); + return -1; + } + sub = tipc_sub_subscribe(srv->net, s, con->conid); + if (!sub) + return -1; + atomic_inc(&tn->subscription_count); + spin_lock_bh(&con->sub_lock); + list_add(&sub->sub_list, &con->sub_list); + spin_unlock_bh(&con->sub_lock); + return 0; +} + +static int tipc_conn_rcv_from_sock(struct tipc_conn *con) +{ + struct tipc_topsrv *srv = con->server; + struct sock *sk = con->sock->sk; + struct msghdr msg = {}; + struct tipc_subscr s; + struct kvec iov; + int ret; + + iov.iov_base = &s; + iov.iov_len = sizeof(s); + msg.msg_name = NULL; + iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); + ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); + if (ret == -EWOULDBLOCK) + return -EWOULDBLOCK; + if (ret > 0) { + read_lock_bh(&sk->sk_callback_lock); + ret = tipc_conn_rcv_sub(srv, con, &s); + read_unlock_bh(&sk->sk_callback_lock); + } + if (ret < 0) + tipc_conn_close(con); + + return ret; +} + +static void tipc_conn_recv_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); + int count = 0; + + while (connected(con)) { + if (tipc_conn_rcv_from_sock(con)) + break; + + /* Don't flood Rx machine */ + if (++count >= MAX_RECV_MSG_COUNT) { + cond_resched(); + count = 0; + } + } + conn_put(con); +} + +/* tipc_conn_data_ready - interrupt callback indicating the socket has data + * The queued work is launched into tipc_recv_work()->tipc_conn_rcv_from_sock() + */ +static void tipc_conn_data_ready(struct sock *sk) +{ + struct tipc_conn *con; + + read_lock_bh(&sk->sk_callback_lock); + con = sk->sk_user_data; + if (connected(con)) { + conn_get(con); + if (!queue_work(con->server->rcv_wq, &con->rwork)) + conn_put(con); + } + read_unlock_bh(&sk->sk_callback_lock); +} + +static void tipc_topsrv_accept(struct work_struct *work) +{ + struct tipc_topsrv *srv = container_of(work, struct tipc_topsrv, awork); + struct socket *lsock = srv->listener; + struct socket *newsock; + struct tipc_conn *con; + struct sock *newsk; + int ret; + + while (1) { + ret = kernel_accept(lsock, &newsock, O_NONBLOCK); + if (ret < 0) + return; + con = tipc_conn_alloc(srv); + if (IS_ERR(con)) { + ret = PTR_ERR(con); + sock_release(newsock); + return; + } + /* Register callbacks */ + newsk = newsock->sk; + write_lock_bh(&newsk->sk_callback_lock); + newsk->sk_data_ready = tipc_conn_data_ready; + newsk->sk_write_space = tipc_conn_write_space; + newsk->sk_user_data = con; + con->sock = newsock; + write_unlock_bh(&newsk->sk_callback_lock); + + /* Wake up receive process in case of 'SYN+' message */ + newsk->sk_data_ready(newsk); + } +} + +/* tipc_toprsv_listener_data_ready - interrupt callback with connection request + * The queued job is launched into tipc_topsrv_accept() + */ +static void tipc_topsrv_listener_data_ready(struct sock *sk) +{ + struct tipc_topsrv *srv; + + read_lock_bh(&sk->sk_callback_lock); + srv = sk->sk_user_data; + if (srv->listener) + queue_work(srv->rcv_wq, &srv->awork); + read_unlock_bh(&sk->sk_callback_lock); +} + +static int tipc_topsrv_create_listener(struct tipc_topsrv *srv) +{ + int imp = TIPC_CRITICAL_IMPORTANCE; + struct socket *lsock = NULL; + struct sockaddr_tipc saddr; + struct sock *sk; + int rc; + + rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock); + if (rc < 0) + return rc; + + srv->listener = lsock; + sk = lsock->sk; + write_lock_bh(&sk->sk_callback_lock); + sk->sk_data_ready = tipc_topsrv_listener_data_ready; + sk->sk_user_data = srv; + write_unlock_bh(&sk->sk_callback_lock); + + rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE, + (char *)&imp, sizeof(imp)); + if (rc < 0) + goto err; + + saddr.family = AF_TIPC; + saddr.addrtype = TIPC_ADDR_NAMESEQ; + saddr.addr.nameseq.type = TIPC_TOP_SRV; + saddr.addr.nameseq.lower = TIPC_TOP_SRV; + saddr.addr.nameseq.upper = TIPC_TOP_SRV; + saddr.scope = TIPC_NODE_SCOPE; + + rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr)); + if (rc < 0) + goto err; + rc = kernel_listen(lsock, 0); + if (rc < 0) + goto err; + + /* As server's listening socket owner and creator is the same module, + * we have to decrease TIPC module reference count to guarantee that + * it remains zero after the server socket is created, otherwise, + * executing "rmmod" command is unable to make TIPC module deleted + * after TIPC module is inserted successfully. + * + * However, the reference count is ever increased twice in + * sock_create_kern(): one is to increase the reference count of owner + * of TIPC socket's proto_ops struct; another is to increment the + * reference count of owner of TIPC proto struct. Therefore, we must + * decrement the module reference count twice to ensure that it keeps + * zero after server's listening socket is created. Of course, we + * must bump the module reference count twice as well before the socket + * is closed. + */ + module_put(lsock->ops->owner); + module_put(sk->sk_prot_creator->owner); + + return 0; +err: + sock_release(lsock); + return -EINVAL; +} + +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, + u32 upper, u32 filter, int *conid) +{ + struct tipc_subscr sub; + struct tipc_conn *con; + int rc; + + sub.seq.type = type; + sub.seq.lower = lower; + sub.seq.upper = upper; + sub.timeout = TIPC_WAIT_FOREVER; + sub.filter = filter; + *(u32 *)&sub.usr_handle = port; + + con = tipc_conn_alloc(tipc_topsrv(net)); + if (IS_ERR(con)) + return false; + + *conid = con->conid; + con->sock = NULL; + rc = tipc_conn_rcv_sub(tipc_topsrv(net), con, &sub); + if (rc < 0) + tipc_conn_close(con); + return !rc; +} + +void tipc_topsrv_kern_unsubscr(struct net *net, int conid) +{ + struct tipc_conn *con; + + con = tipc_conn_lookup(tipc_topsrv(net), conid); + if (!con) + return; + + test_and_clear_bit(CF_CONNECTED, &con->flags); + tipc_conn_delete_sub(con, NULL); + conn_put(con); + conn_put(con); +} + +static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt) +{ + u32 port = *(u32 *)&evt->s.usr_handle; + u32 self = tipc_own_addr(net); + struct sk_buff_head evtq; + struct sk_buff *skb; + + skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), + self, self, port, port, 0); + if (!skb) + return; + msg_set_dest_droppable(buf_msg(skb), true); + memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); + skb_queue_head_init(&evtq); + __skb_queue_tail(&evtq, skb); + tipc_sk_rcv(net, &evtq); +} + +static int tipc_topsrv_work_start(struct tipc_topsrv *s) +{ + s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0); + if (!s->rcv_wq) { + pr_err("can't start tipc receive workqueue\n"); + return -ENOMEM; + } + + s->send_wq = alloc_ordered_workqueue("tipc_send", 0); + if (!s->send_wq) { + pr_err("can't start tipc send workqueue\n"); + destroy_workqueue(s->rcv_wq); + return -ENOMEM; + } + + return 0; +} + +static void tipc_topsrv_work_stop(struct tipc_topsrv *s) +{ + destroy_workqueue(s->rcv_wq); + destroy_workqueue(s->send_wq); +} + +int tipc_topsrv_start(struct net *net) +{ + struct tipc_net *tn = tipc_net(net); + const char name[] = "topology_server"; + struct tipc_topsrv *srv; + int ret; + + srv = kzalloc(sizeof(*srv), GFP_ATOMIC); + if (!srv) + return -ENOMEM; + + srv->net = net; + srv->max_rcvbuf_size = sizeof(struct tipc_subscr); + INIT_WORK(&srv->awork, tipc_topsrv_accept); + + strncpy(srv->name, name, strlen(name) + 1); + tn->topsrv = srv; + atomic_set(&tn->subscription_count, 0); + + spin_lock_init(&srv->idr_lock); + idr_init(&srv->conn_idr); + srv->idr_in_use = 0; + + ret = tipc_topsrv_work_start(srv); + if (ret < 0) + return ret; + + ret = tipc_topsrv_create_listener(srv); + if (ret < 0) + tipc_topsrv_work_stop(srv); + + return ret; +} + +void tipc_topsrv_stop(struct net *net) +{ + struct tipc_topsrv *srv = tipc_topsrv(net); + struct socket *lsock = srv->listener; + struct tipc_conn *con; + int id; + + spin_lock_bh(&srv->idr_lock); + for (id = 0; srv->idr_in_use; id++) { + con = idr_find(&srv->conn_idr, id); + if (con) { + spin_unlock_bh(&srv->idr_lock); + tipc_conn_close(con); + spin_lock_bh(&srv->idr_lock); + } + } + __module_get(lsock->ops->owner); + __module_get(lsock->sk->sk_prot_creator->owner); + sock_release(lsock); + srv->listener = NULL; + spin_unlock_bh(&srv->idr_lock); + tipc_topsrv_work_stop(srv); + idr_destroy(&srv->conn_idr); + kfree(srv); +} diff --git a/net/tipc/topsrv.h b/net/tipc/topsrv.h new file mode 100644 index 000000000000..c7ea71293748 --- /dev/null +++ b/net/tipc/topsrv.h @@ -0,0 +1,54 @@ +/* + * net/tipc/server.h: Include file for TIPC server code + * + * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_SERVER_H +#define _TIPC_SERVER_H + +#include "core.h" + +#define TIPC_SERVER_NAME_LEN 32 +#define TIPC_SUB_CLUSTER_SCOPE 0x20 +#define TIPC_SUB_NODE_SCOPE 0x40 +#define TIPC_SUB_NO_STATUS 0x80 + +void tipc_topsrv_queue_evt(struct net *net, int conid, + u32 event, struct tipc_event *evt); + +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, + u32 upper, u32 filter, int *conid); +void tipc_topsrv_kern_unsubscr(struct net *net, int conid); + +#endif -- cgit v1.2.3-70-g09d2 From 218527fe27adaebeb81eb770459eb335517e90ee Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 29 Mar 2018 23:20:41 +0200 Subject: tipc: replace name table service range array with rb tree The current design of the binding table has an unnecessary memory consuming and complex data structure. It aggregates the service range items into an array, which is expanded by a factor two every time it becomes too small to hold a new item. Furthermore, the arrays never shrink when the number of ranges diminishes. We now replace this array with an RB tree that is holding the range items as tree nodes, each range directly holding a list of bindings. This, along with a few name changes, improves both readability and volume of the code, as well as reducing memory consumption and hopefully improving cache hit rate. Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/core.h | 1 + net/tipc/link.c | 2 +- net/tipc/name_table.c | 1032 ++++++++++++++++++++++--------------------------- net/tipc/name_table.h | 2 +- net/tipc/node.c | 4 +- net/tipc/subscr.h | 4 +- 6 files changed, 477 insertions(+), 568 deletions(-) (limited to 'net/tipc/subscr.h') diff --git a/net/tipc/core.h b/net/tipc/core.h index d0f64ca62d02..8020a6c360ff 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -58,6 +58,7 @@ #include #include #include +#include struct tipc_node; struct tipc_bearer; diff --git a/net/tipc/link.c b/net/tipc/link.c index 1289b4ba404f..8f2a9496439b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1810,7 +1810,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) { - int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); + int max_bulk = TIPC_MAX_PUBL / (l->mtu / ITEM_SIZE); l->window = win; l->backlog[TIPC_LOW_IMPORTANCE].limit = max_t(u16, 50, win); diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 4359605b1bec..e06c7a8907aa 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -44,52 +44,40 @@ #include "addr.h" #include "node.h" #include "group.h" -#include - -#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ /** - * struct name_info - name sequence publication info - * @node_list: list of publications on own node of this - * @all_publ: list of all publications of this + * struct service_range - container for all bindings of a service range + * @lower: service range lower bound + * @upper: service range upper bound + * @tree_node: member of service range RB tree + * @local_publ: list of identical publications made from this node + * Used by closest_first lookup and multicast lookup algorithm + * @all_publ: all publications identical to this one, whatever node and scope + * Used by round-robin lookup algorithm */ -struct name_info { - struct list_head local_publ; - struct list_head all_publ; -}; - -/** - * struct sub_seq - container for all published instances of a name sequence - * @lower: name sequence lower bound - * @upper: name sequence upper bound - * @info: pointer to name sequence publication info - */ -struct sub_seq { +struct service_range { u32 lower; u32 upper; - struct name_info *info; + struct rb_node tree_node; + struct list_head local_publ; + struct list_head all_publ; }; /** - * struct name_seq - container for all published instances of a name type - * @type: 32 bit 'type' value for name sequence - * @sseq: pointer to dynamically-sized array of sub-sequences of this 'type'; - * sub-sequences are sorted in ascending order - * @alloc: number of sub-sequences currently in array - * @first_free: array index of first unused sub-sequence entry - * @ns_list: links to adjacent name sequences in hash chain - * @subscriptions: list of subscriptions for this 'type' - * @lock: spinlock controlling access to publication lists of all sub-sequences + * struct tipc_service - container for all published instances of a service type + * @type: 32 bit 'type' value for service + * @ranges: rb tree containing all service ranges for this service + * @service_list: links to adjacent name ranges in hash chain + * @subscriptions: list of subscriptions for this service type + * @lock: spinlock controlling access to pertaining service ranges/publications * @rcu: RCU callback head used for deferred freeing */ -struct name_seq { +struct tipc_service { u32 type; - struct sub_seq *sseqs; - u32 alloc; - u32 first_free; - struct hlist_node ns_list; + struct rb_root ranges; + struct hlist_node service_list; struct list_head subscriptions; - spinlock_t lock; + spinlock_t lock; /* Covers service range list */ struct rcu_head rcu; }; @@ -99,17 +87,16 @@ static int hash(int x) } /** - * publ_create - create a publication structure + * tipc_publ_create - create a publication structure */ -static struct publication *publ_create(u32 type, u32 lower, u32 upper, - u32 scope, u32 node, u32 port, - u32 key) +static struct publication *tipc_publ_create(u32 type, u32 lower, u32 upper, + u32 scope, u32 node, u32 port, + u32 key) { struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC); - if (publ == NULL) { - pr_warn("Publication creation failure, no memory\n"); + + if (!publ) return NULL; - } publ->type = type; publ->lower = lower; @@ -119,372 +106,298 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper, publ->port = port; publ->key = key; INIT_LIST_HEAD(&publ->binding_sock); + INIT_LIST_HEAD(&publ->binding_node); + INIT_LIST_HEAD(&publ->local_publ); + INIT_LIST_HEAD(&publ->all_publ); return publ; } /** - * tipc_subseq_alloc - allocate a specified number of sub-sequence structures - */ -static struct sub_seq *tipc_subseq_alloc(u32 cnt) -{ - return kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC); -} - -/** - * tipc_nameseq_create - create a name sequence structure for the specified 'type' + * tipc_service_create - create a service structure for the specified 'type' * - * Allocates a single sub-sequence structure and sets it to all 0's. + * Allocates a single range structure and sets it to all 0's. */ -static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_head) +static struct tipc_service *tipc_service_create(u32 type, struct hlist_head *hd) { - struct name_seq *nseq = kzalloc(sizeof(*nseq), GFP_ATOMIC); - struct sub_seq *sseq = tipc_subseq_alloc(1); + struct tipc_service *service = kzalloc(sizeof(*service), GFP_ATOMIC); - if (!nseq || !sseq) { - pr_warn("Name sequence creation failed, no memory\n"); - kfree(nseq); - kfree(sseq); + if (!service) { + pr_warn("Service creation failed, no memory\n"); return NULL; } - spin_lock_init(&nseq->lock); - nseq->type = type; - nseq->sseqs = sseq; - nseq->alloc = 1; - INIT_HLIST_NODE(&nseq->ns_list); - INIT_LIST_HEAD(&nseq->subscriptions); - hlist_add_head_rcu(&nseq->ns_list, seq_head); - return nseq; + spin_lock_init(&service->lock); + service->type = type; + service->ranges = RB_ROOT; + INIT_HLIST_NODE(&service->service_list); + INIT_LIST_HEAD(&service->subscriptions); + hlist_add_head_rcu(&service->service_list, hd); + return service; } /** - * nameseq_find_subseq - find sub-sequence (if any) matching a name instance + * tipc_service_find_range - find service range matching a service instance * - * Very time-critical, so binary searches through sub-sequence array. + * Very time-critical, so binary search through range rb tree */ -static struct sub_seq *nameseq_find_subseq(struct name_seq *nseq, - u32 instance) +static struct service_range *tipc_service_find_range(struct tipc_service *sc, + u32 instance) { - struct sub_seq *sseqs = nseq->sseqs; - int low = 0; - int high = nseq->first_free - 1; - int mid; - - while (low <= high) { - mid = (low + high) / 2; - if (instance < sseqs[mid].lower) - high = mid - 1; - else if (instance > sseqs[mid].upper) - low = mid + 1; + struct rb_node *n = sc->ranges.rb_node; + struct service_range *sr; + + while (n) { + sr = container_of(n, struct service_range, tree_node); + if (sr->lower > instance) + n = n->rb_left; + else if (sr->upper < instance) + n = n->rb_right; else - return &sseqs[mid]; + return sr; } return NULL; } -/** - * nameseq_locate_subseq - determine position of name instance in sub-sequence - * - * Returns index in sub-sequence array of the entry that contains the specified - * instance value; if no entry contains that value, returns the position - * where a new entry for it would be inserted in the array. - * - * Note: Similar to binary search code for locating a sub-sequence. - */ -static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance) +static struct service_range *tipc_service_create_range(struct tipc_service *sc, + u32 lower, u32 upper) { - struct sub_seq *sseqs = nseq->sseqs; - int low = 0; - int high = nseq->first_free - 1; - int mid; - - while (low <= high) { - mid = (low + high) / 2; - if (instance < sseqs[mid].lower) - high = mid - 1; - else if (instance > sseqs[mid].upper) - low = mid + 1; + struct rb_node **n, *parent = NULL; + struct service_range *sr, *tmp; + + n = &sc->ranges.rb_node; + while (*n) { + tmp = container_of(*n, struct service_range, tree_node); + parent = *n; + tmp = container_of(parent, struct service_range, tree_node); + if (lower < tmp->lower) + n = &(*n)->rb_left; + else if (upper > tmp->upper) + n = &(*n)->rb_right; else - return mid; + return NULL; } - return low; + sr = kzalloc(sizeof(*sr), GFP_ATOMIC); + if (!sr) + return NULL; + sr->lower = lower; + sr->upper = upper; + INIT_LIST_HEAD(&sr->local_publ); + INIT_LIST_HEAD(&sr->all_publ); + rb_link_node(&sr->tree_node, parent, n); + rb_insert_color(&sr->tree_node, &sc->ranges); + return sr; } -/** - * tipc_nameseq_insert_publ - */ -static struct publication *tipc_nameseq_insert_publ(struct net *net, - struct name_seq *nseq, +static struct publication *tipc_service_insert_publ(struct net *net, + struct tipc_service *sc, u32 type, u32 lower, u32 upper, u32 scope, - u32 node, u32 port, u32 key) + u32 node, u32 port, + u32 key) { - struct tipc_subscription *s; - struct tipc_subscription *st; - struct publication *publ; - struct sub_seq *sseq; - struct name_info *info; - int created_subseq = 0; - - sseq = nameseq_find_subseq(nseq, lower); - if (sseq) { - - /* Lower end overlaps existing entry => need an exact match */ - if ((sseq->lower != lower) || (sseq->upper != upper)) { - return NULL; - } - - info = sseq->info; - - /* Check if an identical publication already exists */ - list_for_each_entry(publ, &info->all_publ, all_publ) { - if (publ->port == port && publ->key == key && - (!publ->node || publ->node == node)) - return NULL; - } - } else { - u32 inspos; - struct sub_seq *freesseq; - - /* Find where lower end should be inserted */ - inspos = nameseq_locate_subseq(nseq, lower); - - /* Fail if upper end overlaps into an existing entry */ - if ((inspos < nseq->first_free) && - (upper >= nseq->sseqs[inspos].lower)) { - return NULL; - } - - /* Ensure there is space for new sub-sequence */ - if (nseq->first_free == nseq->alloc) { - struct sub_seq *sseqs = tipc_subseq_alloc(nseq->alloc * 2); - - if (!sseqs) { - pr_warn("Cannot publish {%u,%u,%u}, no memory\n", - type, lower, upper); - return NULL; - } - memcpy(sseqs, nseq->sseqs, - nseq->alloc * sizeof(struct sub_seq)); - kfree(nseq->sseqs); - nseq->sseqs = sseqs; - nseq->alloc *= 2; - } - - info = kzalloc(sizeof(*info), GFP_ATOMIC); - if (!info) { - pr_warn("Cannot publish {%u,%u,%u}, no memory\n", - type, lower, upper); - return NULL; - } - - INIT_LIST_HEAD(&info->local_publ); - INIT_LIST_HEAD(&info->all_publ); - - /* Insert new sub-sequence */ - sseq = &nseq->sseqs[inspos]; - freesseq = &nseq->sseqs[nseq->first_free]; - memmove(sseq + 1, sseq, (freesseq - sseq) * sizeof(*sseq)); - memset(sseq, 0, sizeof(*sseq)); - nseq->first_free++; - sseq->lower = lower; - sseq->upper = upper; - sseq->info = info; - created_subseq = 1; + struct tipc_subscription *sub, *tmp; + struct service_range *sr; + struct publication *p; + bool first = false; + + sr = tipc_service_find_range(sc, lower); + if (!sr) { + sr = tipc_service_create_range(sc, lower, upper); + if (!sr) + goto err; + first = true; } - /* Insert a publication */ - publ = publ_create(type, lower, upper, scope, node, port, key); - if (!publ) + /* Lower end overlaps existing entry, but we need an exact match */ + if (sr->lower != lower || sr->upper != upper) return NULL; - list_add(&publ->all_publ, &info->all_publ); + /* Return if the publication already exists */ + list_for_each_entry(p, &sr->all_publ, all_publ) { + if (p->key == key && (!p->node || p->node == node)) + return NULL; + } + /* Create and insert publication */ + p = tipc_publ_create(type, lower, upper, scope, node, port, key); + if (!p) + goto err; if (in_own_node(net, node)) - list_add(&publ->local_publ, &info->local_publ); + list_add(&p->local_publ, &sr->local_publ); + list_add(&p->all_publ, &sr->all_publ); /* Any subscriptions waiting for notification? */ - list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { - tipc_sub_report_overlap(s, publ->lower, publ->upper, - TIPC_PUBLISHED, publ->port, - publ->node, publ->scope, - created_subseq); + list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) { + tipc_sub_report_overlap(sub, p->lower, p->upper, TIPC_PUBLISHED, + p->port, p->node, p->scope, first); } - return publ; + return p; +err: + pr_warn("Failed to bind to %u,%u,%u, no memory\n", type, lower, upper); + return NULL; } /** - * tipc_nameseq_remove_publ + * tipc_service_remove_publ - remove a publication from a service * * NOTE: There may be cases where TIPC is asked to remove a publication * that is not in the name table. For example, if another node issues a - * publication for a name sequence that overlaps an existing name sequence + * publication for a name range that overlaps an existing name range * the publication will not be recorded, which means the publication won't - * be found when the name sequence is later withdrawn by that node. + * be found when the name range is later withdrawn by that node. * A failed withdraw request simply returns a failure indication and lets the * caller issue any error or warning messages associated with such a problem. */ -static struct publication *tipc_nameseq_remove_publ(struct net *net, - struct name_seq *nseq, +static struct publication *tipc_service_remove_publ(struct net *net, + struct tipc_service *sc, u32 inst, u32 node, u32 port, u32 key) { - struct publication *publ; - struct sub_seq *sseq = nameseq_find_subseq(nseq, inst); - struct name_info *info; - struct sub_seq *free; - struct tipc_subscription *s, *st; - int removed_subseq = 0; - - if (!sseq) - return NULL; + struct tipc_subscription *sub, *tmp; + struct service_range *sr; + struct publication *p; + bool found = false; + bool last = false; - info = sseq->info; + sr = tipc_service_find_range(sc, inst); + if (!sr) + return NULL; - /* Locate publication, if it exists */ - list_for_each_entry(publ, &info->all_publ, all_publ) { - if (publ->key == key && publ->port == port && - (!publ->node || publ->node == node)) - goto found; + /* Find publication, if it exists */ + list_for_each_entry(p, &sr->all_publ, all_publ) { + if (p->key != key || (node && node != p->node)) + continue; + found = true; + break; } - return NULL; + if (!found) + return NULL; -found: - list_del(&publ->all_publ); - if (in_own_node(net, node)) - list_del(&publ->local_publ); - - /* Contract subseq list if no more publications for that subseq */ - if (list_empty(&info->all_publ)) { - kfree(info); - free = &nseq->sseqs[nseq->first_free--]; - memmove(sseq, sseq + 1, (free - (sseq + 1)) * sizeof(*sseq)); - removed_subseq = 1; + list_del(&p->all_publ); + list_del(&p->local_publ); + + /* Remove service range item if this was its last publication */ + if (list_empty(&sr->all_publ)) { + last = true; + rb_erase(&sr->tree_node, &sc->ranges); + kfree(sr); } /* Notify any waiting subscriptions */ - list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { - tipc_sub_report_overlap(s, publ->lower, publ->upper, - TIPC_WITHDRAWN, publ->port, - publ->node, publ->scope, - removed_subseq); + list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) { + tipc_sub_report_overlap(sub, p->lower, p->upper, TIPC_WITHDRAWN, + p->port, p->node, p->scope, last); } - - return publ; + return p; } /** - * tipc_nameseq_subscribe - attach a subscription, and optionally - * issue the prescribed number of events if there is any sub- - * sequence overlapping with the requested sequence + * tipc_service_subscribe - attach a subscription, and optionally + * issue the prescribed number of events if there is any service + * range overlapping with the requested range */ -static void tipc_nameseq_subscribe(struct name_seq *nseq, +static void tipc_service_subscribe(struct tipc_service *service, struct tipc_subscription *sub) { - struct sub_seq *sseq = nseq->sseqs; + struct tipc_subscr *sb = &sub->evt.s; + struct service_range *sr; struct tipc_name_seq ns; - struct tipc_subscr *s = &sub->evt.s; - bool no_status; + struct publication *p; + struct rb_node *n; + bool first; - ns.type = tipc_sub_read(s, seq.type); - ns.lower = tipc_sub_read(s, seq.lower); - ns.upper = tipc_sub_read(s, seq.upper); - no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS; + ns.type = tipc_sub_read(sb, seq.type); + ns.lower = tipc_sub_read(sb, seq.lower); + ns.upper = tipc_sub_read(sb, seq.upper); tipc_sub_get(sub); - list_add(&sub->nameseq_list, &nseq->subscriptions); + list_add(&sub->service_list, &service->subscriptions); - if (no_status || !sseq) + if (tipc_sub_read(sb, filter) & TIPC_SUB_NO_STATUS) return; - while (sseq != &nseq->sseqs[nseq->first_free]) { - if (tipc_sub_check_overlap(&ns, sseq->lower, sseq->upper)) { - struct publication *crs; - struct name_info *info = sseq->info; - int must_report = 1; - - list_for_each_entry(crs, &info->all_publ, all_publ) { - tipc_sub_report_overlap(sub, sseq->lower, - sseq->upper, - TIPC_PUBLISHED, - crs->port, - crs->node, - crs->scope, - must_report); - must_report = 0; - } + for (n = rb_first(&service->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->lower > ns.upper) + break; + if (!tipc_sub_check_overlap(&ns, sr->lower, sr->upper)) + continue; + first = true; + + list_for_each_entry(p, &sr->all_publ, all_publ) { + tipc_sub_report_overlap(sub, sr->lower, sr->upper, + TIPC_PUBLISHED, p->port, + p->node, p->scope, first); + first = false; } - sseq++; } } -static struct name_seq *nametbl_find_seq(struct net *net, u32 type) +static struct tipc_service *tipc_service_find(struct net *net, u32 type) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct hlist_head *seq_head; - struct name_seq *ns; - - seq_head = &tn->nametbl->seq_hlist[hash(type)]; - hlist_for_each_entry_rcu(ns, seq_head, ns_list) { - if (ns->type == type) - return ns; + struct name_table *nt = tipc_name_table(net); + struct hlist_head *service_head; + struct tipc_service *service; + + service_head = &nt->services[hash(type)]; + hlist_for_each_entry_rcu(service, service_head, service_list) { + if (service->type == type) + return service; } - return NULL; }; struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type, - u32 lower, u32 upper, u32 scope, - u32 node, u32 port, u32 key) + u32 lower, u32 upper, + u32 scope, u32 node, + u32 port, u32 key) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct publication *publ; - struct name_seq *seq = nametbl_find_seq(net, type); - int index = hash(type); + struct name_table *nt = tipc_name_table(net); + struct tipc_service *sc; + struct publication *p; if (scope > TIPC_NODE_SCOPE || lower > upper) { - pr_debug("Failed to publish illegal {%u,%u,%u} with scope %u\n", + pr_debug("Failed to bind illegal {%u,%u,%u} with scope %u\n", type, lower, upper, scope); return NULL; } - - if (!seq) - seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); - if (!seq) + sc = tipc_service_find(net, type); + if (!sc) + sc = tipc_service_create(type, &nt->services[hash(type)]); + if (!sc) return NULL; - spin_lock_bh(&seq->lock); - publ = tipc_nameseq_insert_publ(net, seq, type, lower, upper, - scope, node, port, key); - spin_unlock_bh(&seq->lock); - return publ; + spin_lock_bh(&sc->lock); + p = tipc_service_insert_publ(net, sc, type, lower, upper, + scope, node, port, key); + spin_unlock_bh(&sc->lock); + return p; } struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, - u32 lower, u32 node, u32 port, - u32 key) + u32 lower, u32 node, u32 port, + u32 key) { - struct publication *publ; - struct name_seq *seq = nametbl_find_seq(net, type); + struct tipc_service *sc = tipc_service_find(net, type); + struct publication *p = NULL; - if (!seq) + if (!sc) return NULL; - spin_lock_bh(&seq->lock); - publ = tipc_nameseq_remove_publ(net, seq, lower, node, port, key); - if (!seq->first_free && list_empty(&seq->subscriptions)) { - hlist_del_init_rcu(&seq->ns_list); - kfree(seq->sseqs); - spin_unlock_bh(&seq->lock); - kfree_rcu(seq, rcu); - return publ; + spin_lock_bh(&sc->lock); + p = tipc_service_remove_publ(net, sc, lower, node, port, key); + + /* Delete service item if this no more publications and subscriptions */ + if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) { + hlist_del_init_rcu(&sc->service_list); + kfree_rcu(sc, rcu); } - spin_unlock_bh(&seq->lock); - return publ; + spin_unlock_bh(&sc->lock); + return p; } /** - * tipc_nametbl_translate - perform name translation + * tipc_nametbl_translate - perform service instance to socket translation * * On entry, 'destnode' is the search domain used during translation. * @@ -492,7 +405,7 @@ struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, * - if name translation is deferred to another node/cluster/zone, * leaves 'destnode' unchanged (will be non-zero) and returns 0 * - if name translation is attempted and succeeds, sets 'destnode' - * to publishing node and returns port reference (will be non-zero) + * to publication node and returns port reference (will be non-zero) * - if name translation is attempted and fails, sets 'destnode' to 0 * and returns 0 */ @@ -502,10 +415,9 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, struct tipc_net *tn = tipc_net(net); bool legacy = tn->legacy_addr_format; u32 self = tipc_own_addr(net); - struct sub_seq *sseq; - struct name_info *info; - struct publication *publ; - struct name_seq *seq; + struct service_range *sr; + struct tipc_service *sc; + struct publication *p; u32 port = 0; u32 node = 0; @@ -513,49 +425,49 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, return 0; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (unlikely(!seq)) + sc = tipc_service_find(net, type); + if (unlikely(!sc)) goto not_found; - spin_lock_bh(&seq->lock); - sseq = nameseq_find_subseq(seq, instance); - if (unlikely(!sseq)) + + spin_lock_bh(&sc->lock); + sr = tipc_service_find_range(sc, instance); + if (unlikely(!sr)) goto no_match; - info = sseq->info; /* Closest-First Algorithm */ if (legacy && !*destnode) { - if (!list_empty(&info->local_publ)) { - publ = list_first_entry(&info->local_publ, - struct publication, - local_publ); - list_move_tail(&publ->local_publ, - &info->local_publ); + if (!list_empty(&sr->local_publ)) { + p = list_first_entry(&sr->local_publ, + struct publication, + local_publ); + list_move_tail(&p->local_publ, + &sr->local_publ); } else { - publ = list_first_entry(&info->all_publ, - struct publication, - all_publ); - list_move_tail(&publ->all_publ, - &info->all_publ); + p = list_first_entry(&sr->all_publ, + struct publication, + all_publ); + list_move_tail(&p->all_publ, + &sr->all_publ); } } /* Round-Robin Algorithm */ - else if (*destnode == tipc_own_addr(net)) { - if (list_empty(&info->local_publ)) + else if (*destnode == self) { + if (list_empty(&sr->local_publ)) goto no_match; - publ = list_first_entry(&info->local_publ, struct publication, - local_publ); - list_move_tail(&publ->local_publ, &info->local_publ); + p = list_first_entry(&sr->local_publ, struct publication, + local_publ); + list_move_tail(&p->local_publ, &sr->local_publ); } else { - publ = list_first_entry(&info->all_publ, struct publication, - all_publ); - list_move_tail(&publ->all_publ, &info->all_publ); + p = list_first_entry(&sr->all_publ, struct publication, + all_publ); + list_move_tail(&p->all_publ, &sr->all_publ); } - port = publ->port; - node = publ->node; + port = p->port; + node = p->node; no_match: - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&sc->lock); not_found: rcu_read_unlock(); *destnode = node; @@ -567,34 +479,36 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope, bool all) { u32 self = tipc_own_addr(net); - struct publication *publ; - struct name_info *info; - struct name_seq *seq; - struct sub_seq *sseq; + struct service_range *sr; + struct tipc_service *sc; + struct publication *p; *dstcnt = 0; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (unlikely(!seq)) + sc = tipc_service_find(net, type); + if (unlikely(!sc)) goto exit; - spin_lock_bh(&seq->lock); - sseq = nameseq_find_subseq(seq, instance); - if (likely(sseq)) { - info = sseq->info; - list_for_each_entry(publ, &info->all_publ, all_publ) { - if (publ->scope != scope) - continue; - if (publ->port == exclude && publ->node == self) - continue; - tipc_dest_push(dsts, publ->node, publ->port); - (*dstcnt)++; - if (all) - continue; - list_move_tail(&publ->all_publ, &info->all_publ); - break; - } + + spin_lock_bh(&sc->lock); + + sr = tipc_service_find_range(sc, instance); + if (!sr) + goto no_match; + + list_for_each_entry(p, &sr->all_publ, all_publ) { + if (p->scope != scope) + continue; + if (p->port == exclude && p->node == self) + continue; + tipc_dest_push(dsts, p->node, p->port); + (*dstcnt)++; + if (all) + continue; + list_move_tail(&p->all_publ, &sr->all_publ); + break; } - spin_unlock_bh(&seq->lock); +no_match: + spin_unlock_bh(&sc->lock); exit: rcu_read_unlock(); return !list_empty(dsts); @@ -603,61 +517,64 @@ exit: void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper, u32 scope, bool exact, struct list_head *dports) { - struct sub_seq *sseq_stop; - struct name_info *info; + struct service_range *sr; + struct tipc_service *sc; struct publication *p; - struct name_seq *seq; - struct sub_seq *sseq; + struct rb_node *n; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (!seq) + sc = tipc_service_find(net, type); + if (!sc) goto exit; - spin_lock_bh(&seq->lock); - sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); - sseq_stop = seq->sseqs + seq->first_free; - for (; sseq != sseq_stop; sseq++) { - if (sseq->lower > upper) + spin_lock_bh(&sc->lock); + + for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->upper < lower) + continue; + if (sr->lower > upper) break; - info = sseq->info; - list_for_each_entry(p, &info->local_publ, local_publ) { + list_for_each_entry(p, &sr->local_publ, local_publ) { if (p->scope == scope || (!exact && p->scope < scope)) tipc_dest_push(dports, 0, p->port); } } - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&sc->lock); exit: rcu_read_unlock(); } /* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes * - Creates list of nodes that overlap the given multicast address - * - Determines if any node local ports overlap + * - Determines if any node local destinations overlap */ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, u32 upper, struct tipc_nlist *nodes) { - struct sub_seq *sseq, *stop; - struct publication *publ; - struct name_info *info; - struct name_seq *seq; + struct service_range *sr; + struct tipc_service *sc; + struct publication *p; + struct rb_node *n; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (!seq) + sc = tipc_service_find(net, type); + if (!sc) goto exit; - spin_lock_bh(&seq->lock); - sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); - stop = seq->sseqs + seq->first_free; - for (; sseq != stop && sseq->lower <= upper; sseq++) { - info = sseq->info; - list_for_each_entry(publ, &info->all_publ, all_publ) { - tipc_nlist_add(nodes, publ->node); + spin_lock_bh(&sc->lock); + + for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->upper < lower) + continue; + if (sr->lower > upper) + break; + list_for_each_entry(p, &sr->all_publ, all_publ) { + tipc_nlist_add(nodes, p->node); } } - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&sc->lock); exit: rcu_read_unlock(); } @@ -667,89 +584,88 @@ exit: void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, u32 type, u32 scope) { - struct sub_seq *sseq, *stop; - struct name_info *info; + struct service_range *sr; + struct tipc_service *sc; struct publication *p; - struct name_seq *seq; + struct rb_node *n; rcu_read_lock(); - seq = nametbl_find_seq(net, type); - if (!seq) + sc = tipc_service_find(net, type); + if (!sc) goto exit; - spin_lock_bh(&seq->lock); - sseq = seq->sseqs; - stop = seq->sseqs + seq->first_free; - for (; sseq != stop; sseq++) { - info = sseq->info; - list_for_each_entry(p, &info->all_publ, all_publ) { + spin_lock_bh(&sc->lock); + for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + list_for_each_entry(p, &sr->all_publ, all_publ) { if (p->scope != scope) continue; tipc_group_add_member(grp, p->node, p->port, p->lower); } } - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&sc->lock); exit: rcu_read_unlock(); } -/* - * tipc_nametbl_publish - add name publication to network name tables +/* tipc_nametbl_publish - add service binding to name table */ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, - u32 upper, u32 scope, u32 port_ref, + u32 upper, u32 scope, u32 port, u32 key) { - struct publication *publ; - struct sk_buff *buf = NULL; - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct name_table *nt = tipc_name_table(net); + struct tipc_net *tn = tipc_net(net); + struct publication *p = NULL; + struct sk_buff *skb = NULL; spin_lock_bh(&tn->nametbl_lock); - if (tn->nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) { - pr_warn("Publication failed, local publication limit reached (%u)\n", - TIPC_MAX_PUBLICATIONS); - spin_unlock_bh(&tn->nametbl_lock); - return NULL; + + if (nt->local_publ_count >= TIPC_MAX_PUBL) { + pr_warn("Bind failed, max limit %u reached\n", TIPC_MAX_PUBL); + goto exit; } - publ = tipc_nametbl_insert_publ(net, type, lower, upper, scope, - tipc_own_addr(net), port_ref, key); - if (likely(publ)) { - tn->nametbl->local_publ_count++; - buf = tipc_named_publish(net, publ); + p = tipc_nametbl_insert_publ(net, type, lower, upper, scope, + tipc_own_addr(net), port, key); + if (p) { + nt->local_publ_count++; + skb = tipc_named_publish(net, p); /* Any pending external events? */ tipc_named_process_backlog(net); } +exit: spin_unlock_bh(&tn->nametbl_lock); - if (buf) - tipc_node_broadcast(net, buf); - return publ; + if (skb) + tipc_node_broadcast(net, skb); + return p; } /** - * tipc_nametbl_withdraw - withdraw name publication from network name tables + * tipc_nametbl_withdraw - withdraw a service binding */ -int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port, - u32 key) +int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, + u32 port, u32 key) { - struct publication *publ; + struct name_table *nt = tipc_name_table(net); + struct tipc_net *tn = tipc_net(net); + u32 self = tipc_own_addr(net); struct sk_buff *skb = NULL; - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct publication *p; spin_lock_bh(&tn->nametbl_lock); - publ = tipc_nametbl_remove_publ(net, type, lower, tipc_own_addr(net), - port, key); - if (likely(publ)) { - tn->nametbl->local_publ_count--; - skb = tipc_named_withdraw(net, publ); + + p = tipc_nametbl_remove_publ(net, type, lower, self, port, key); + if (p) { + nt->local_publ_count--; + skb = tipc_named_withdraw(net, p); /* Any pending external events? */ tipc_named_process_backlog(net); - list_del_init(&publ->binding_sock); - kfree_rcu(publ, rcu); + list_del_init(&p->binding_sock); + kfree_rcu(p, rcu); } else { - pr_err("Unable to remove local publication\n" - "(type=%u, lower=%u, port=%u, key=%u)\n", + pr_err("Failed to remove local publication {%u,%u,%u}/%u\n", type, lower, port, key); } spin_unlock_bh(&tn->nametbl_lock); @@ -766,27 +682,24 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port, */ void tipc_nametbl_subscribe(struct tipc_subscription *sub) { + struct name_table *nt = tipc_name_table(sub->net); struct tipc_net *tn = tipc_net(sub->net); struct tipc_subscr *s = &sub->evt.s; u32 type = tipc_sub_read(s, seq.type); - int index = hash(type); - struct name_seq *seq; - struct tipc_name_seq ns; + struct tipc_service *sc; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(sub->net, type); - if (!seq) - seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); - if (seq) { - spin_lock_bh(&seq->lock); - tipc_nameseq_subscribe(seq, sub); - spin_unlock_bh(&seq->lock); + sc = tipc_service_find(sub->net, type); + if (!sc) + sc = tipc_service_create(type, &nt->services[hash(type)]); + if (sc) { + spin_lock_bh(&sc->lock); + tipc_service_subscribe(sc, sub); + spin_unlock_bh(&sc->lock); } else { - ns.type = tipc_sub_read(s, seq.type); - ns.lower = tipc_sub_read(s, seq.lower); - ns.upper = tipc_sub_read(s, seq.upper); - pr_warn("Failed to create subscription for {%u,%u,%u}\n", - ns.type, ns.lower, ns.upper); + pr_warn("Failed to subscribe for {%u,%u,%u}\n", type, + tipc_sub_read(s, seq.lower), + tipc_sub_read(s, seq.upper)); } spin_unlock_bh(&tn->nametbl_lock); } @@ -796,124 +709,122 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) */ void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) { - struct tipc_subscr *s = &sub->evt.s; struct tipc_net *tn = tipc_net(sub->net); - struct name_seq *seq; + struct tipc_subscr *s = &sub->evt.s; u32 type = tipc_sub_read(s, seq.type); + struct tipc_service *sc; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(sub->net, type); - if (seq != NULL) { - spin_lock_bh(&seq->lock); - list_del_init(&sub->nameseq_list); - tipc_sub_put(sub); - if (!seq->first_free && list_empty(&seq->subscriptions)) { - hlist_del_init_rcu(&seq->ns_list); - kfree(seq->sseqs); - spin_unlock_bh(&seq->lock); - kfree_rcu(seq, rcu); - } else { - spin_unlock_bh(&seq->lock); - } + sc = tipc_service_find(sub->net, type); + if (!sc) + goto exit; + + spin_lock_bh(&sc->lock); + list_del_init(&sub->service_list); + tipc_sub_put(sub); + + /* Delete service item if no more publications and subscriptions */ + if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) { + hlist_del_init_rcu(&sc->service_list); + kfree_rcu(sc, rcu); } + spin_unlock_bh(&sc->lock); +exit: spin_unlock_bh(&tn->nametbl_lock); } int tipc_nametbl_init(struct net *net) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct name_table *tipc_nametbl; + struct tipc_net *tn = tipc_net(net); + struct name_table *nt; int i; - tipc_nametbl = kzalloc(sizeof(*tipc_nametbl), GFP_ATOMIC); - if (!tipc_nametbl) + nt = kzalloc(sizeof(*nt), GFP_ATOMIC); + if (!nt) return -ENOMEM; for (i = 0; i < TIPC_NAMETBL_SIZE; i++) - INIT_HLIST_HEAD(&tipc_nametbl->seq_hlist[i]); + INIT_HLIST_HEAD(&nt->services[i]); - INIT_LIST_HEAD(&tipc_nametbl->node_scope); - INIT_LIST_HEAD(&tipc_nametbl->cluster_scope); - tn->nametbl = tipc_nametbl; + INIT_LIST_HEAD(&nt->node_scope); + INIT_LIST_HEAD(&nt->cluster_scope); + tn->nametbl = nt; spin_lock_init(&tn->nametbl_lock); return 0; } /** - * tipc_purge_publications - remove all publications for a given type - * - * tipc_nametbl_lock must be held when calling this function + * tipc_service_delete - purge all publications for a service and delete it */ -static void tipc_purge_publications(struct net *net, struct name_seq *seq) +static void tipc_service_delete(struct net *net, struct tipc_service *sc) { - struct publication *publ, *safe; - struct sub_seq *sseq; - struct name_info *info; - - spin_lock_bh(&seq->lock); - sseq = seq->sseqs; - info = sseq->info; - list_for_each_entry_safe(publ, safe, &info->all_publ, all_publ) { - tipc_nameseq_remove_publ(net, seq, publ->lower, publ->node, - publ->port, publ->key); - kfree_rcu(publ, rcu); + struct service_range *sr, *tmpr; + struct publication *p, *tmpb; + + spin_lock_bh(&sc->lock); + rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) { + list_for_each_entry_safe(p, tmpb, + &sr->all_publ, all_publ) { + tipc_service_remove_publ(net, sc, p->lower, p->node, + p->port, p->key); + kfree_rcu(p, rcu); + } } - hlist_del_init_rcu(&seq->ns_list); - kfree(seq->sseqs); - spin_unlock_bh(&seq->lock); - - kfree_rcu(seq, rcu); + hlist_del_init_rcu(&sc->service_list); + spin_unlock_bh(&sc->lock); + kfree_rcu(sc, rcu); } void tipc_nametbl_stop(struct net *net) { + struct name_table *nt = tipc_name_table(net); + struct tipc_net *tn = tipc_net(net); + struct hlist_head *service_head; + struct tipc_service *service; u32 i; - struct name_seq *seq; - struct hlist_head *seq_head; - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct name_table *tipc_nametbl = tn->nametbl; /* Verify name table is empty and purge any lingering * publications, then release the name table */ spin_lock_bh(&tn->nametbl_lock); for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { - if (hlist_empty(&tipc_nametbl->seq_hlist[i])) + if (hlist_empty(&nt->services[i])) continue; - seq_head = &tipc_nametbl->seq_hlist[i]; - hlist_for_each_entry_rcu(seq, seq_head, ns_list) { - tipc_purge_publications(net, seq); + service_head = &nt->services[i]; + hlist_for_each_entry_rcu(service, service_head, service_list) { + tipc_service_delete(net, service); } } spin_unlock_bh(&tn->nametbl_lock); synchronize_net(); - kfree(tipc_nametbl); - + kfree(nt); } static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, - struct name_seq *seq, - struct sub_seq *sseq, u32 *last_publ) + struct tipc_service *service, + struct service_range *sr, + u32 *last_key) { - void *hdr; - struct nlattr *attrs; - struct nlattr *publ; struct publication *p; + struct nlattr *attrs; + struct nlattr *b; + void *hdr; - if (*last_publ) { - list_for_each_entry(p, &sseq->info->all_publ, all_publ) - if (p->key == *last_publ) + if (*last_key) { + list_for_each_entry(p, &sr->all_publ, all_publ) + if (p->key == *last_key) break; - if (p->key != *last_publ) + if (p->key != *last_key) return -EPIPE; } else { - p = list_first_entry(&sseq->info->all_publ, struct publication, + p = list_first_entry(&sr->all_publ, + struct publication, all_publ); } - list_for_each_entry_from(p, &sseq->info->all_publ, all_publ) { - *last_publ = p->key; + list_for_each_entry_from(p, &sr->all_publ, all_publ) { + *last_key = p->key; hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, NLM_F_MULTI, @@ -925,15 +836,15 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, if (!attrs) goto msg_full; - publ = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL); - if (!publ) + b = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL); + if (!b) goto attr_msg_full; - if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, seq->type)) + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, service->type)) goto publ_msg_full; - if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sseq->lower)) + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sr->lower)) goto publ_msg_full; - if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sseq->upper)) + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sr->upper)) goto publ_msg_full; if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope)) goto publ_msg_full; @@ -944,16 +855,16 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key)) goto publ_msg_full; - nla_nest_end(msg->skb, publ); + nla_nest_end(msg->skb, b); nla_nest_end(msg->skb, attrs); genlmsg_end(msg->skb, hdr); } - *last_publ = 0; + *last_key = 0; return 0; publ_msg_full: - nla_nest_cancel(msg->skb, publ); + nla_nest_cancel(msg->skb, b); attr_msg_full: nla_nest_cancel(msg->skb, attrs); msg_full: @@ -962,39 +873,34 @@ msg_full: return -EMSGSIZE; } -static int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq, - u32 *last_lower, u32 *last_publ) +static int __tipc_nl_service_range_list(struct tipc_nl_msg *msg, + struct tipc_service *sc, + u32 *last_lower, u32 *last_key) { - struct sub_seq *sseq; - struct sub_seq *sseq_start; + struct service_range *sr; + struct rb_node *n; int err; - if (*last_lower) { - sseq_start = nameseq_find_subseq(seq, *last_lower); - if (!sseq_start) - return -EPIPE; - } else { - sseq_start = seq->sseqs; - } - - for (sseq = sseq_start; sseq != &seq->sseqs[seq->first_free]; sseq++) { - err = __tipc_nl_add_nametable_publ(msg, seq, sseq, last_publ); + for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->lower < *last_lower) + continue; + err = __tipc_nl_add_nametable_publ(msg, sc, sr, last_key); if (err) { - *last_lower = sseq->lower; + *last_lower = sr->lower; return err; } } *last_lower = 0; - return 0; } -static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, - u32 *last_type, u32 *last_lower, u32 *last_publ) +static int tipc_nl_service_list(struct net *net, struct tipc_nl_msg *msg, + u32 *last_type, u32 *last_lower, u32 *last_key) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct hlist_head *seq_head; - struct name_seq *seq = NULL; + struct tipc_net *tn = tipc_net(net); + struct tipc_service *service = NULL; + struct hlist_head *head; int err; int i; @@ -1004,30 +910,31 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, i = 0; for (; i < TIPC_NAMETBL_SIZE; i++) { - seq_head = &tn->nametbl->seq_hlist[i]; + head = &tn->nametbl->services[i]; if (*last_type) { - seq = nametbl_find_seq(net, *last_type); - if (!seq) + service = tipc_service_find(net, *last_type); + if (!service) return -EPIPE; } else { - hlist_for_each_entry_rcu(seq, seq_head, ns_list) + hlist_for_each_entry_rcu(service, head, service_list) break; - if (!seq) + if (!service) continue; } - hlist_for_each_entry_from_rcu(seq, ns_list) { - spin_lock_bh(&seq->lock); - err = __tipc_nl_subseq_list(msg, seq, last_lower, - last_publ); + hlist_for_each_entry_from_rcu(service, service_list) { + spin_lock_bh(&service->lock); + err = __tipc_nl_service_range_list(msg, service, + last_lower, + last_key); if (err) { - *last_type = seq->type; - spin_unlock_bh(&seq->lock); + *last_type = service->type; + spin_unlock_bh(&service->lock); return err; } - spin_unlock_bh(&seq->lock); + spin_unlock_bh(&service->lock); } *last_type = 0; } @@ -1036,13 +943,13 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) { - int err; - int done = cb->args[3]; + struct net *net = sock_net(skb->sk); u32 last_type = cb->args[0]; u32 last_lower = cb->args[1]; - u32 last_publ = cb->args[2]; - struct net *net = sock_net(skb->sk); + u32 last_key = cb->args[2]; + int done = cb->args[3]; struct tipc_nl_msg msg; + int err; if (done) return 0; @@ -1052,7 +959,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) msg.seq = cb->nlh->nlmsg_seq; rcu_read_lock(); - err = tipc_nl_seq_list(net, &msg, &last_type, &last_lower, &last_publ); + err = tipc_nl_service_list(net, &msg, &last_type, + &last_lower, &last_key); if (!err) { done = 1; } else if (err != -EMSGSIZE) { @@ -1068,7 +976,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) cb->args[0] = last_type; cb->args[1] = last_lower; - cb->args[2] = last_publ; + cb->args[2] = last_key; cb->args[3] = done; return skb->len; diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 34a4ccb907aa..1b03b8751707 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -97,7 +97,7 @@ struct publication { * @local_publ_count: number of publications issued by this node */ struct name_table { - struct hlist_head seq_hlist[TIPC_NAMETBL_SIZE]; + struct hlist_head services[TIPC_NAMETBL_SIZE]; struct list_head node_scope; struct list_head cluster_scope; u32 local_publ_count; diff --git a/net/tipc/node.c b/net/tipc/node.c index 4fb4327311bb..85e777e00ec9 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -324,12 +324,12 @@ static void tipc_node_write_unlock(struct tipc_node *n) if (flags & TIPC_NOTIFY_LINK_UP) { tipc_mon_peer_up(net, addr, bearer_id); tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr, - TIPC_NODE_SCOPE, link_id, addr); + TIPC_NODE_SCOPE, link_id, link_id); } if (flags & TIPC_NOTIFY_LINK_DOWN) { tipc_mon_peer_down(net, addr, bearer_id); tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, - link_id, addr); + link_id, link_id); } } diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 8b2d22b18f22..d793b4343885 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -40,7 +40,7 @@ #include "topsrv.h" #define TIPC_MAX_SUBSCR 65535 -#define TIPC_MAX_PUBLICATIONS 65535 +#define TIPC_MAX_PUBL 65535 struct tipc_subscription; struct tipc_conn; @@ -58,7 +58,7 @@ struct tipc_subscription { struct kref kref; struct net *net; struct timer_list timer; - struct list_head nameseq_list; + struct list_head service_list; struct list_head sub_list; struct tipc_event evt; int conid; -- cgit v1.2.3-70-g09d2