diff options
Diffstat (limited to 'net/rds')
-rw-r--r-- | net/rds/bind.c | 6 | ||||
-rw-r--r-- | net/rds/cong.c | 3 | ||||
-rw-r--r-- | net/rds/connection.c | 329 | ||||
-rw-r--r-- | net/rds/ib.c | 9 | ||||
-rw-r--r-- | net/rds/ib.h | 8 | ||||
-rw-r--r-- | net/rds/ib_cm.c | 9 | ||||
-rw-r--r-- | net/rds/ib_rdma.c | 3 | ||||
-rw-r--r-- | net/rds/ib_recv.c | 4 | ||||
-rw-r--r-- | net/rds/ib_send.c | 4 | ||||
-rw-r--r-- | net/rds/loop.c | 15 | ||||
-rw-r--r-- | net/rds/message.c | 1 | ||||
-rw-r--r-- | net/rds/rdma_transport.c | 1 | ||||
-rw-r--r-- | net/rds/rds.h | 178 | ||||
-rw-r--r-- | net/rds/rds_single_path.h | 30 | ||||
-rw-r--r-- | net/rds/recv.c | 106 | ||||
-rw-r--r-- | net/rds/send.c | 356 | ||||
-rw-r--r-- | net/rds/tcp.c | 160 | ||||
-rw-r--r-- | net/rds/tcp.h | 23 | ||||
-rw-r--r-- | net/rds/tcp_connect.c | 43 | ||||
-rw-r--r-- | net/rds/tcp_listen.c | 76 | ||||
-rw-r--r-- | net/rds/tcp_recv.c | 38 | ||||
-rw-r--r-- | net/rds/tcp_send.c | 39 | ||||
-rw-r--r-- | net/rds/threads.c | 105 |
23 files changed, 1035 insertions, 511 deletions
diff --git a/net/rds/bind.c b/net/rds/bind.c index b22ea956522b..095f6ce583fe 100644 --- a/net/rds/bind.c +++ b/net/rds/bind.c @@ -81,6 +81,8 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port) if (*port != 0) { rover = be16_to_cpu(*port); + if (rover == RDS_FLAG_PROBE_PORT) + return -EINVAL; last = rover; } else { rover = max_t(u16, prandom_u32(), 2); @@ -91,12 +93,16 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port) if (rover == 0) rover++; + if (rover == RDS_FLAG_PROBE_PORT) + continue; key = ((u64)addr << 32) | cpu_to_be16(rover); if (rhashtable_lookup_fast(&bind_hash_table, &key, ht_parms)) continue; rs->rs_bound_key = key; rs->rs_bound_addr = addr; + net_get_random_once(&rs->rs_hash_initval, + sizeof(rs->rs_hash_initval)); rs->rs_bound_port = cpu_to_be16(rover); rs->rs_bound_node.next = NULL; rds_sock_addref(rs); diff --git a/net/rds/cong.c b/net/rds/cong.c index 6641bcf7c185..8398fee7c866 100644 --- a/net/rds/cong.c +++ b/net/rds/cong.c @@ -235,7 +235,8 @@ void rds_cong_queue_updates(struct rds_cong_map *map) * therefore trigger warnings. * Defer the xmit to rds_send_worker() instead. */ - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + queue_delayed_work(rds_wq, + &conn->c_path[0].cp_send_w, 0); } } diff --git a/net/rds/connection.c b/net/rds/connection.c index e3b118cae81d..f5058559bb08 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -95,14 +95,16 @@ static struct rds_connection *rds_conn_lookup(struct net *net, * and receiving over this connection again in the future. It is up to * the transport to have serialized this call with its send and recv. */ -static void rds_conn_reset(struct rds_connection *conn) +static void rds_conn_path_reset(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; + rdsdebug("connection %pI4 to %pI4 reset\n", &conn->c_laddr, &conn->c_faddr); rds_stats_inc(s_conn_reset); - rds_send_reset(conn); - conn->c_flags = 0; + rds_send_path_reset(cp); + cp->cp_flags = 0; /* Do not clear next_rx_seq here, else we cannot distinguish * retransmitted packets from new packets, and will hand all @@ -110,6 +112,32 @@ static void rds_conn_reset(struct rds_connection *conn) * reliability guarantees of RDS. */ } +static void __rds_conn_path_init(struct rds_connection *conn, + struct rds_conn_path *cp, bool is_outgoing) +{ + spin_lock_init(&cp->cp_lock); + cp->cp_next_tx_seq = 1; + init_waitqueue_head(&cp->cp_waitq); + INIT_LIST_HEAD(&cp->cp_send_queue); + INIT_LIST_HEAD(&cp->cp_retrans); + + cp->cp_conn = conn; + atomic_set(&cp->cp_state, RDS_CONN_DOWN); + cp->cp_send_gen = 0; + /* cp_outgoing is per-path. So we can only set it here + * for the single-path transports. + */ + if (!conn->c_trans->t_mp_capable) + cp->cp_outgoing = (is_outgoing ? 1 : 0); + cp->cp_reconnect_jiffies = 0; + INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker); + INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker); + INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker); + INIT_WORK(&cp->cp_down_w, rds_shutdown_worker); + mutex_init(&cp->cp_cm_lock); + cp->cp_flags = 0; +} + /* * There is only every one 'conn' for a given pair of addresses in the * system at a time. They contain messages to be retransmitted and so @@ -127,7 +155,7 @@ static struct rds_connection *__rds_conn_create(struct net *net, struct hlist_head *head = rds_conn_bucket(laddr, faddr); struct rds_transport *loop_trans; unsigned long flags; - int ret; + int ret, i; rcu_read_lock(); conn = rds_conn_lookup(net, head, laddr, faddr, trans); @@ -153,13 +181,8 @@ static struct rds_connection *__rds_conn_create(struct net *net, INIT_HLIST_NODE(&conn->c_hash_node); conn->c_laddr = laddr; conn->c_faddr = faddr; - spin_lock_init(&conn->c_lock); - conn->c_next_tx_seq = 1; - rds_conn_net_set(conn, net); - init_waitqueue_head(&conn->c_waitq); - INIT_LIST_HEAD(&conn->c_send_queue); - INIT_LIST_HEAD(&conn->c_retrans); + rds_conn_net_set(conn, net); ret = rds_cong_get_maps(conn); if (ret) { @@ -188,6 +211,12 @@ static struct rds_connection *__rds_conn_create(struct net *net, conn->c_trans = trans; + init_waitqueue_head(&conn->c_hs_waitq); + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + __rds_conn_path_init(conn, &conn->c_path[i], + is_outgoing); + conn->c_path[i].cp_index = i; + } ret = trans->conn_alloc(conn, gfp); if (ret) { kmem_cache_free(rds_conn_slab, conn); @@ -195,17 +224,6 @@ static struct rds_connection *__rds_conn_create(struct net *net, goto out; } - atomic_set(&conn->c_state, RDS_CONN_DOWN); - conn->c_send_gen = 0; - conn->c_outgoing = (is_outgoing ? 1 : 0); - conn->c_reconnect_jiffies = 0; - INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker); - INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker); - INIT_DELAYED_WORK(&conn->c_conn_w, rds_connect_worker); - INIT_WORK(&conn->c_down_w, rds_shutdown_worker); - mutex_init(&conn->c_cm_lock); - conn->c_flags = 0; - rdsdebug("allocated conn %p for %pI4 -> %pI4 over %s %s\n", conn, &laddr, &faddr, trans->t_name ? trans->t_name : "[unknown]", @@ -222,7 +240,7 @@ static struct rds_connection *__rds_conn_create(struct net *net, if (parent) { /* Creating passive conn */ if (parent->c_passive) { - trans->conn_free(conn->c_transport_data); + trans->conn_free(conn->c_path[0].cp_transport_data); kmem_cache_free(rds_conn_slab, conn); conn = parent->c_passive; } else { @@ -236,7 +254,18 @@ static struct rds_connection *__rds_conn_create(struct net *net, found = rds_conn_lookup(net, head, laddr, faddr, trans); if (found) { - trans->conn_free(conn->c_transport_data); + struct rds_conn_path *cp; + int i; + + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + cp = &conn->c_path[i]; + /* The ->conn_alloc invocation may have + * allocated resource for all paths, so all + * of them may have to be freed here. + */ + if (cp->cp_transport_data) + trans->conn_free(cp->cp_transport_data); + } kmem_cache_free(rds_conn_slab, conn); conn = found; } else { @@ -267,10 +296,12 @@ struct rds_connection *rds_conn_create_outgoing(struct net *net, } EXPORT_SYMBOL_GPL(rds_conn_create_outgoing); -void rds_conn_shutdown(struct rds_connection *conn) +void rds_conn_shutdown(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; + /* shut it down unless it's down already */ - if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) { + if (!rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_DOWN)) { /* * Quiesce the connection mgmt handlers before we start tearing * things down. We don't hold the mutex for the entire @@ -278,35 +309,38 @@ void rds_conn_shutdown(struct rds_connection *conn) * deadlocking with the CM handler. Instead, the CM event * handler is supposed to check for state DISCONNECTING */ - mutex_lock(&conn->c_cm_lock); - if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING) - && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) { - rds_conn_error(conn, "shutdown called in state %d\n", - atomic_read(&conn->c_state)); - mutex_unlock(&conn->c_cm_lock); + mutex_lock(&cp->cp_cm_lock); + if (!rds_conn_path_transition(cp, RDS_CONN_UP, + RDS_CONN_DISCONNECTING) && + !rds_conn_path_transition(cp, RDS_CONN_ERROR, + RDS_CONN_DISCONNECTING)) { + rds_conn_path_error(cp, + "shutdown called in state %d\n", + atomic_read(&cp->cp_state)); + mutex_unlock(&cp->cp_cm_lock); return; } - mutex_unlock(&conn->c_cm_lock); + mutex_unlock(&cp->cp_cm_lock); - wait_event(conn->c_waitq, - !test_bit(RDS_IN_XMIT, &conn->c_flags)); - wait_event(conn->c_waitq, - !test_bit(RDS_RECV_REFILL, &conn->c_flags)); + wait_event(cp->cp_waitq, + !test_bit(RDS_IN_XMIT, &cp->cp_flags)); + wait_event(cp->cp_waitq, + !test_bit(RDS_RECV_REFILL, &cp->cp_flags)); - conn->c_trans->conn_shutdown(conn); - rds_conn_reset(conn); + conn->c_trans->conn_path_shutdown(cp); + rds_conn_path_reset(cp); - if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) { + if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING, + RDS_CONN_DOWN)) { /* This can happen - eg when we're in the middle of tearing * down the connection, and someone unloads the rds module. * Quite reproduceable with loopback connections. * Mostly harmless. */ - rds_conn_error(conn, - "%s: failed to transition to state DOWN, " - "current state is %d\n", - __func__, - atomic_read(&conn->c_state)); + rds_conn_path_error(cp, "%s: failed to transition " + "to state DOWN, current state " + "is %d\n", __func__, + atomic_read(&cp->cp_state)); return; } } @@ -315,18 +349,47 @@ void rds_conn_shutdown(struct rds_connection *conn) * The passive side of an IB loopback connection is never added * to the conn hash, so we never trigger a reconnect on this * conn - the reconnect is always triggered by the active peer. */ - cancel_delayed_work_sync(&conn->c_conn_w); + cancel_delayed_work_sync(&cp->cp_conn_w); rcu_read_lock(); if (!hlist_unhashed(&conn->c_hash_node)) { rcu_read_unlock(); - if (conn->c_trans->t_type != RDS_TRANS_TCP || - conn->c_outgoing == 1) - rds_queue_reconnect(conn); + rds_queue_reconnect(cp); } else { rcu_read_unlock(); } } +/* destroy a single rds_conn_path. rds_conn_destroy() iterates over + * all paths using rds_conn_path_destroy() + */ +static void rds_conn_path_destroy(struct rds_conn_path *cp) +{ + struct rds_message *rm, *rtmp; + + if (!cp->cp_transport_data) + return; + + rds_conn_path_drop(cp); + flush_work(&cp->cp_down_w); + + /* make sure lingering queued work won't try to ref the conn */ + cancel_delayed_work_sync(&cp->cp_send_w); + cancel_delayed_work_sync(&cp->cp_recv_w); + + /* tear down queued messages */ + list_for_each_entry_safe(rm, rtmp, + &cp->cp_send_queue, + m_conn_item) { + list_del_init(&rm->m_conn_item); + BUG_ON(!list_empty(&rm->m_sock_item)); + rds_message_put(rm); + } + if (cp->cp_xmit_rm) + rds_message_put(cp->cp_xmit_rm); + + cp->cp_conn->c_trans->conn_free(cp->cp_transport_data); +} + /* * Stop and free a connection. * @@ -336,8 +399,9 @@ void rds_conn_shutdown(struct rds_connection *conn) */ void rds_conn_destroy(struct rds_connection *conn) { - struct rds_message *rm, *rtmp; unsigned long flags; + int i; + struct rds_conn_path *cp; rdsdebug("freeing conn %p for %pI4 -> " "%pI4\n", conn, &conn->c_laddr, @@ -350,25 +414,11 @@ void rds_conn_destroy(struct rds_connection *conn) synchronize_rcu(); /* shut the connection down */ - rds_conn_drop(conn); - flush_work(&conn->c_down_w); - - /* make sure lingering queued work won't try to ref the conn */ - cancel_delayed_work_sync(&conn->c_send_w); - cancel_delayed_work_sync(&conn->c_recv_w); - - /* tear down queued messages */ - list_for_each_entry_safe(rm, rtmp, - &conn->c_send_queue, - m_conn_item) { - list_del_init(&rm->m_conn_item); - BUG_ON(!list_empty(&rm->m_sock_item)); - rds_message_put(rm); + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + cp = &conn->c_path[i]; + rds_conn_path_destroy(cp); + BUG_ON(!list_empty(&cp->cp_retrans)); } - if (conn->c_xmit_rm) - rds_message_put(conn->c_xmit_rm); - - conn->c_trans->conn_free(conn->c_transport_data); /* * The congestion maps aren't freed up here. They're @@ -377,7 +427,6 @@ void rds_conn_destroy(struct rds_connection *conn) */ rds_cong_remove_conn(conn); - BUG_ON(!list_empty(&conn->c_retrans)); kmem_cache_free(rds_conn_slab, conn); spin_lock_irqsave(&rds_conn_lock, flags); @@ -398,6 +447,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len, unsigned int total = 0; unsigned long flags; size_t i; + int j; len /= sizeof(struct rds_info_message); @@ -406,23 +456,32 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len, for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash); i++, head++) { hlist_for_each_entry_rcu(conn, head, c_hash_node) { - if (want_send) - list = &conn->c_send_queue; - else - list = &conn->c_retrans; - - spin_lock_irqsave(&conn->c_lock, flags); - - /* XXX too lazy to maintain counts.. */ - list_for_each_entry(rm, list, m_conn_item) { - total++; - if (total <= len) - rds_inc_info_copy(&rm->m_inc, iter, - conn->c_laddr, - conn->c_faddr, 0); + struct rds_conn_path *cp; + + for (j = 0; j < RDS_MPATH_WORKERS; j++) { + cp = &conn->c_path[j]; + if (want_send) + list = &cp->cp_send_queue; + else + list = &cp->cp_retrans; + + spin_lock_irqsave(&cp->cp_lock, flags); + + /* XXX too lazy to maintain counts.. */ + list_for_each_entry(rm, list, m_conn_item) { + total++; + if (total <= len) + rds_inc_info_copy(&rm->m_inc, + iter, + conn->c_laddr, + conn->c_faddr, + 0); + } + + spin_unlock_irqrestore(&cp->cp_lock, flags); + if (!conn->c_trans->t_mp_capable) + break; } - - spin_unlock_irqrestore(&conn->c_lock, flags); } } rcu_read_unlock(); @@ -484,27 +543,72 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len, } EXPORT_SYMBOL_GPL(rds_for_each_conn_info); -static int rds_conn_info_visitor(struct rds_connection *conn, - void *buffer) +void rds_walk_conn_path_info(struct socket *sock, unsigned int len, + struct rds_info_iterator *iter, + struct rds_info_lengths *lens, + int (*visitor)(struct rds_conn_path *, void *), + size_t item_len) +{ + u64 buffer[(item_len + 7) / 8]; + struct hlist_head *head; + struct rds_connection *conn; + size_t i; + int j; + + rcu_read_lock(); + + lens->nr = 0; + lens->each = item_len; + + for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash); + i++, head++) { + hlist_for_each_entry_rcu(conn, head, c_hash_node) { + struct rds_conn_path *cp; + + for (j = 0; j < RDS_MPATH_WORKERS; j++) { + cp = &conn->c_path[j]; + + /* XXX no cp_lock usage.. */ + if (!visitor(cp, buffer)) + continue; + if (!conn->c_trans->t_mp_capable) + break; + } + + /* We copy as much as we can fit in the buffer, + * but we count all items so that the caller + * can resize the buffer. + */ + if (len >= item_len) { + rds_info_copy(iter, buffer, item_len); + len -= item_len; + } + lens->nr++; + } + } + rcu_read_unlock(); +} + +static int rds_conn_info_visitor(struct rds_conn_path *cp, void *buffer) { struct rds_info_connection *cinfo = buffer; - cinfo->next_tx_seq = conn->c_next_tx_seq; - cinfo->next_rx_seq = conn->c_next_rx_seq; - cinfo->laddr = conn->c_laddr; - cinfo->faddr = conn->c_faddr; - strncpy(cinfo->transport, conn->c_trans->t_name, + cinfo->next_tx_seq = cp->cp_next_tx_seq; + cinfo->next_rx_seq = cp->cp_next_rx_seq; + cinfo->laddr = cp->cp_conn->c_laddr; + cinfo->faddr = cp->cp_conn->c_faddr; + strncpy(cinfo->transport, cp->cp_conn->c_trans->t_name, sizeof(cinfo->transport)); cinfo->flags = 0; - rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags), + rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags), SENDING); /* XXX Future: return the state rather than these funky bits */ rds_conn_info_set(cinfo->flags, - atomic_read(&conn->c_state) == RDS_CONN_CONNECTING, + atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING, CONNECTING); rds_conn_info_set(cinfo->flags, - atomic_read(&conn->c_state) == RDS_CONN_UP, + atomic_read(&cp->cp_state) == RDS_CONN_UP, CONNECTED); return 1; } @@ -513,7 +617,7 @@ static void rds_conn_info(struct socket *sock, unsigned int len, struct rds_info_iterator *iter, struct rds_info_lengths *lens) { - rds_for_each_conn_info(sock, len, iter, lens, + rds_walk_conn_path_info(sock, len, iter, lens, rds_conn_info_visitor, sizeof(struct rds_info_connection)); } @@ -553,10 +657,17 @@ void rds_conn_exit(void) /* * Force a disconnect */ +void rds_conn_path_drop(struct rds_conn_path *cp) +{ + atomic_set(&cp->cp_state, RDS_CONN_ERROR); + queue_work(rds_wq, &cp->cp_down_w); +} +EXPORT_SYMBOL_GPL(rds_conn_path_drop); + void rds_conn_drop(struct rds_connection *conn) { - atomic_set(&conn->c_state, RDS_CONN_ERROR); - queue_work(rds_wq, &conn->c_down_w); + WARN_ON(conn->c_trans->t_mp_capable); + rds_conn_path_drop(&conn->c_path[0]); } EXPORT_SYMBOL_GPL(rds_conn_drop); @@ -564,11 +675,17 @@ EXPORT_SYMBOL_GPL(rds_conn_drop); * If the connection is down, trigger a connect. We may have scheduled a * delayed reconnect however - in this case we should not interfere. */ +void rds_conn_path_connect_if_down(struct rds_conn_path *cp) +{ + if (rds_conn_path_state(cp) == RDS_CONN_DOWN && + !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags)) + queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); +} + void rds_conn_connect_if_down(struct rds_connection *conn) { - if (rds_conn_state(conn) == RDS_CONN_DOWN && - !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_conn_w, 0); + WARN_ON(conn->c_trans->t_mp_capable); + rds_conn_path_connect_if_down(&conn->c_path[0]); } EXPORT_SYMBOL_GPL(rds_conn_connect_if_down); @@ -586,3 +703,15 @@ __rds_conn_error(struct rds_connection *conn, const char *fmt, ...) rds_conn_drop(conn); } + +void +__rds_conn_path_error(struct rds_conn_path *cp, const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + vprintk(fmt, ap); + va_end(ap); + + rds_conn_path_drop(cp); +} diff --git a/net/rds/ib.c b/net/rds/ib.c index b5342fddaf98..7eaf887e46f8 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -40,6 +40,7 @@ #include <linux/slab.h> #include <linux/module.h> +#include "rds_single_path.h" #include "rds.h" #include "ib.h" #include "ib_mr.h" @@ -380,15 +381,15 @@ void rds_ib_exit(void) struct rds_transport rds_ib_transport = { .laddr_check = rds_ib_laddr_check, - .xmit_complete = rds_ib_xmit_complete, + .xmit_path_complete = rds_ib_xmit_path_complete, .xmit = rds_ib_xmit, .xmit_rdma = rds_ib_xmit_rdma, .xmit_atomic = rds_ib_xmit_atomic, - .recv = rds_ib_recv, + .recv_path = rds_ib_recv_path, .conn_alloc = rds_ib_conn_alloc, .conn_free = rds_ib_conn_free, - .conn_connect = rds_ib_conn_connect, - .conn_shutdown = rds_ib_conn_shutdown, + .conn_path_connect = rds_ib_conn_path_connect, + .conn_path_shutdown = rds_ib_conn_path_shutdown, .inc_copy_to_user = rds_ib_inc_copy_to_user, .inc_free = rds_ib_inc_free, .cm_initiate_connect = rds_ib_cm_initiate_connect, diff --git a/net/rds/ib.h b/net/rds/ib.h index 627fb79aee65..046f7508c06b 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -328,8 +328,8 @@ extern struct list_head ib_nodev_conns; /* ib_cm.c */ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp); void rds_ib_conn_free(void *arg); -int rds_ib_conn_connect(struct rds_connection *conn); -void rds_ib_conn_shutdown(struct rds_connection *conn); +int rds_ib_conn_path_connect(struct rds_conn_path *cp); +void rds_ib_conn_path_shutdown(struct rds_conn_path *cp); void rds_ib_state_change(struct sock *sk); int rds_ib_listen_init(void); void rds_ib_listen_stop(void); @@ -354,7 +354,7 @@ void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc); /* ib_recv.c */ int rds_ib_recv_init(void); void rds_ib_recv_exit(void); -int rds_ib_recv(struct rds_connection *conn); +int rds_ib_recv_path(struct rds_conn_path *conn); int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic); void rds_ib_recv_free_caches(struct rds_ib_connection *ic); void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp); @@ -384,7 +384,7 @@ u32 rds_ib_ring_completed(struct rds_ib_work_ring *ring, u32 wr_id, u32 oldest); extern wait_queue_head_t rds_ib_ring_empty_wait; /* ib_send.c */ -void rds_ib_xmit_complete(struct rds_connection *conn); +void rds_ib_xmit_path_complete(struct rds_conn_path *cp); int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc); diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 7c2a65a6af5c..5b2ab95afa07 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -36,6 +36,7 @@ #include <linux/vmalloc.h> #include <linux/ratelimit.h> +#include "rds_single_path.h" #include "rds.h" #include "ib.h" @@ -273,7 +274,7 @@ static void rds_ib_tasklet_fn_send(unsigned long data) if (rds_conn_up(conn) && (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) || test_bit(0, &conn->c_map_queued))) - rds_send_xmit(ic->conn); + rds_send_xmit(&ic->conn->c_path[0]); } static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq, @@ -684,8 +685,9 @@ out: return ret; } -int rds_ib_conn_connect(struct rds_connection *conn) +int rds_ib_conn_path_connect(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; struct sockaddr_in src, dest; int ret; @@ -730,8 +732,9 @@ out: * so that it can be called at any point during startup. In fact it * can be called multiple times for a given connection. */ -void rds_ib_conn_shutdown(struct rds_connection *conn) +void rds_ib_conn_path_shutdown(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; int err = 0; diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c index f7164ac1ffc1..977f69886c00 100644 --- a/net/rds/ib_rdma.c +++ b/net/rds/ib_rdma.c @@ -35,6 +35,7 @@ #include <linux/rculist.h> #include <linux/llist.h> +#include "rds_single_path.h" #include "ib_mr.h" struct workqueue_struct *rds_ib_mr_wq; @@ -618,7 +619,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev, int rds_ib_mr_init(void) { - rds_ib_mr_wq = create_workqueue("rds_mr_flushd"); + rds_ib_mr_wq = alloc_workqueue("rds_mr_flushd", WQ_MEM_RECLAIM, 0); if (!rds_ib_mr_wq) return -ENOMEM; return 0; diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index abc8cc805e8d..606a11f681d2 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -36,6 +36,7 @@ #include <linux/dma-mapping.h> #include <rdma/rdma_cm.h> +#include "rds_single_path.h" #include "rds.h" #include "ib.h" @@ -1008,8 +1009,9 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, rds_ib_recv_refill(conn, 0, GFP_NOWAIT); } -int rds_ib_recv(struct rds_connection *conn) +int rds_ib_recv_path(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; int ret = 0; diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index f27d2c82b036..84d90c97332f 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -36,6 +36,7 @@ #include <linux/dmapool.h> #include <linux/ratelimit.h> +#include "rds_single_path.h" #include "rds.h" #include "ib.h" @@ -979,8 +980,9 @@ out: return ret; } -void rds_ib_xmit_complete(struct rds_connection *conn) +void rds_ib_xmit_path_complete(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; /* We may have a pending ACK or window update we were unable diff --git a/net/rds/loop.c b/net/rds/loop.c index 814173b466d9..f2bf78de5688 100644 --- a/net/rds/loop.c +++ b/net/rds/loop.c @@ -34,6 +34,7 @@ #include <linux/slab.h> #include <linux/in.h> +#include "rds_single_path.h" #include "rds.h" #include "loop.h" @@ -101,7 +102,7 @@ static void rds_loop_inc_free(struct rds_incoming *inc) } /* we need to at least give the thread something to succeed */ -static int rds_loop_recv(struct rds_connection *conn) +static int rds_loop_recv_path(struct rds_conn_path *cp) { return 0; } @@ -149,13 +150,13 @@ static void rds_loop_conn_free(void *arg) kfree(lc); } -static int rds_loop_conn_connect(struct rds_connection *conn) +static int rds_loop_conn_path_connect(struct rds_conn_path *cp) { - rds_connect_complete(conn); + rds_connect_complete(cp->cp_conn); return 0; } -static void rds_loop_conn_shutdown(struct rds_connection *conn) +static void rds_loop_conn_path_shutdown(struct rds_conn_path *cp) { } @@ -184,11 +185,11 @@ void rds_loop_exit(void) */ struct rds_transport rds_loop_transport = { .xmit = rds_loop_xmit, - .recv = rds_loop_recv, + .recv_path = rds_loop_recv_path, .conn_alloc = rds_loop_conn_alloc, .conn_free = rds_loop_conn_free, - .conn_connect = rds_loop_conn_connect, - .conn_shutdown = rds_loop_conn_shutdown, + .conn_path_connect = rds_loop_conn_path_connect, + .conn_path_shutdown = rds_loop_conn_path_shutdown, .inc_copy_to_user = rds_message_inc_copy_to_user, .inc_free = rds_loop_inc_free, .t_name = "loopback", diff --git a/net/rds/message.c b/net/rds/message.c index 756c73729126..6cb91061556a 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -41,6 +41,7 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = { [RDS_EXTHDR_VERSION] = sizeof(struct rds_ext_header_version), [RDS_EXTHDR_RDMA] = sizeof(struct rds_ext_header_rdma), [RDS_EXTHDR_RDMA_DEST] = sizeof(struct rds_ext_header_rdma_dest), +[RDS_EXTHDR_NPATHS] = sizeof(u16), }; diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c index 7220bebcf558..345f09059e9f 100644 --- a/net/rds/rdma_transport.c +++ b/net/rds/rdma_transport.c @@ -33,6 +33,7 @@ #include <linux/module.h> #include <rdma/rdma_cm.h> +#include "rds_single_path.h" #include "rdma_transport.h" #include "ib.h" diff --git a/net/rds/rds.h b/net/rds/rds.h index 387df5f32e49..b2d17f0fafa8 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -84,56 +84,73 @@ enum { #define RDS_IN_XMIT 2 #define RDS_RECV_REFILL 3 +/* Max number of multipaths per RDS connection. Must be a power of 2 */ +#define RDS_MPATH_WORKERS 8 +#define RDS_MPATH_HASH(rs, n) (jhash_1word((rs)->rs_bound_port, \ + (rs)->rs_hash_initval) & ((n) - 1)) + +/* Per mpath connection state */ +struct rds_conn_path { + struct rds_connection *cp_conn; + struct rds_message *cp_xmit_rm; + unsigned long cp_xmit_sg; + unsigned int cp_xmit_hdr_off; + unsigned int cp_xmit_data_off; + unsigned int cp_xmit_atomic_sent; + unsigned int cp_xmit_rdma_sent; + unsigned int cp_xmit_data_sent; + + spinlock_t cp_lock; /* protect msg queues */ + u64 cp_next_tx_seq; + struct list_head cp_send_queue; + struct list_head cp_retrans; + + u64 cp_next_rx_seq; + + void *cp_transport_data; + + atomic_t cp_state; + unsigned long cp_send_gen; + unsigned long cp_flags; + unsigned long cp_reconnect_jiffies; + struct delayed_work cp_send_w; + struct delayed_work cp_recv_w; + struct delayed_work cp_conn_w; + struct work_struct cp_down_w; + struct mutex cp_cm_lock; /* protect cp_state & cm */ + wait_queue_head_t cp_waitq; + + unsigned int cp_unacked_packets; + unsigned int cp_unacked_bytes; + unsigned int cp_outgoing:1, + cp_pad_to_32:31; + unsigned int cp_index; +}; + +/* One rds_connection per RDS address pair */ struct rds_connection { struct hlist_node c_hash_node; __be32 c_laddr; __be32 c_faddr; unsigned int c_loopback:1, - c_outgoing:1, + c_ping_triggered:1, c_pad_to_32:30; + int c_npaths; struct rds_connection *c_passive; + struct rds_transport *c_trans; struct rds_cong_map *c_lcong; struct rds_cong_map *c_fcong; - struct rds_message *c_xmit_rm; - unsigned long c_xmit_sg; - unsigned int c_xmit_hdr_off; - unsigned int c_xmit_data_off; - unsigned int c_xmit_atomic_sent; - unsigned int c_xmit_rdma_sent; - unsigned int c_xmit_data_sent; - - spinlock_t c_lock; /* protect msg queues */ - u64 c_next_tx_seq; - struct list_head c_send_queue; - struct list_head c_retrans; - - u64 c_next_rx_seq; - - struct rds_transport *c_trans; - void *c_transport_data; - - atomic_t c_state; - unsigned long c_send_gen; - unsigned long c_flags; - unsigned long c_reconnect_jiffies; - struct delayed_work c_send_w; - struct delayed_work c_recv_w; - struct delayed_work c_conn_w; - struct work_struct c_down_w; - struct mutex c_cm_lock; /* protect conn state & cm */ - wait_queue_head_t c_waitq; + /* Protocol version */ + unsigned int c_version; + possible_net_t c_net; struct list_head c_map_item; unsigned long c_map_queued; - unsigned int c_unacked_packets; - unsigned int c_unacked_bytes; - - /* Protocol version */ - unsigned int c_version; - possible_net_t c_net; + struct rds_conn_path c_path[RDS_MPATH_WORKERS]; + wait_queue_head_t c_hs_waitq; /* handshake waitq */ }; static inline @@ -153,6 +170,17 @@ void rds_conn_net_set(struct rds_connection *conn, struct net *net) #define RDS_FLAG_RETRANSMITTED 0x04 #define RDS_MAX_ADV_CREDIT 255 +/* RDS_FLAG_PROBE_PORT is the reserved sport used for sending a ping + * probe to exchange control information before establishing a connection. + * Currently the control information that is exchanged is the number of + * supported paths. If the peer is a legacy (older kernel revision) peer, + * it would return a pong message without additional control information + * that would then alert the sender that the peer was an older rev. + */ +#define RDS_FLAG_PROBE_PORT 1 +#define RDS_HS_PROBE(sport, dport) \ + ((sport == RDS_FLAG_PROBE_PORT && dport == 0) || \ + (sport == 0 && dport == RDS_FLAG_PROBE_PORT)) /* * Maximum space available for extension headers. */ @@ -212,12 +240,18 @@ struct rds_ext_header_rdma_dest { __be32 h_rdma_offset; }; +/* Extension header announcing number of paths. + * Implicit length = 2 bytes. + */ +#define RDS_EXTHDR_NPATHS 4 + #define __RDS_EXTHDR_MAX 16 /* for now */ struct rds_incoming { atomic_t i_refcount; struct list_head i_item; struct rds_connection *i_conn; + struct rds_conn_path *i_conn_path; struct rds_header i_hdr; unsigned long i_rx_jiffies; __be32 i_saddr; @@ -433,21 +467,22 @@ struct rds_transport { char t_name[TRANSNAMSIZ]; struct list_head t_item; struct module *t_owner; - unsigned int t_prefer_loopback:1; + unsigned int t_prefer_loopback:1, + t_mp_capable:1; unsigned int t_type; int (*laddr_check)(struct net *net, __be32 addr); int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp); void (*conn_free)(void *data); - int (*conn_connect)(struct rds_connection *conn); - void (*conn_shutdown)(struct rds_connection *conn); - void (*xmit_prepare)(struct rds_connection *conn); - void (*xmit_complete)(struct rds_connection *conn); + int (*conn_path_connect)(struct rds_conn_path *cp); + void (*conn_path_shutdown)(struct rds_conn_path *conn); + void (*xmit_path_prepare)(struct rds_conn_path *cp); + void (*xmit_path_complete)(struct rds_conn_path *cp); int (*xmit)(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op); int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op); - int (*recv)(struct rds_connection *conn); + int (*recv_path)(struct rds_conn_path *cp); int (*inc_copy_to_user)(struct rds_incoming *inc, struct iov_iter *to); void (*inc_free)(struct rds_incoming *inc); @@ -530,6 +565,7 @@ struct rds_sock { /* Socket options - in case there will be more */ unsigned char rs_recverr, rs_cong_monitor; + u32 rs_hash_initval; }; static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk) @@ -636,10 +672,12 @@ struct rds_connection *rds_conn_create(struct net *net, struct rds_connection *rds_conn_create_outgoing(struct net *net, __be32 laddr, __be32 faddr, struct rds_transport *trans, gfp_t gfp); -void rds_conn_shutdown(struct rds_connection *conn); +void rds_conn_shutdown(struct rds_conn_path *cpath); void rds_conn_destroy(struct rds_connection *conn); void rds_conn_drop(struct rds_connection *conn); +void rds_conn_path_drop(struct rds_conn_path *cpath); void rds_conn_connect_if_down(struct rds_connection *conn); +void rds_conn_path_connect_if_down(struct rds_conn_path *cp); void rds_for_each_conn_info(struct socket *sock, unsigned int len, struct rds_info_iterator *iter, struct rds_info_lengths *lens, @@ -650,28 +688,60 @@ void __rds_conn_error(struct rds_connection *conn, const char *, ...); #define rds_conn_error(conn, fmt...) \ __rds_conn_error(conn, KERN_WARNING "RDS: " fmt) +void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...); +#define rds_conn_path_error(cp, fmt...) \ + __rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt) + +static inline int +rds_conn_path_transition(struct rds_conn_path *cp, int old, int new) +{ + return atomic_cmpxchg(&cp->cp_state, old, new) == old; +} + static inline int rds_conn_transition(struct rds_connection *conn, int old, int new) { - return atomic_cmpxchg(&conn->c_state, old, new) == old; + WARN_ON(conn->c_trans->t_mp_capable); + return rds_conn_path_transition(&conn->c_path[0], old, new); +} + +static inline int +rds_conn_path_state(struct rds_conn_path *cp) +{ + return atomic_read(&cp->cp_state); } static inline int rds_conn_state(struct rds_connection *conn) { - return atomic_read(&conn->c_state); + WARN_ON(conn->c_trans->t_mp_capable); + return rds_conn_path_state(&conn->c_path[0]); +} + +static inline int +rds_conn_path_up(struct rds_conn_path *cp) +{ + return atomic_read(&cp->cp_state) == RDS_CONN_UP; } static inline int rds_conn_up(struct rds_connection *conn) { - return atomic_read(&conn->c_state) == RDS_CONN_UP; + WARN_ON(conn->c_trans->t_mp_capable); + return rds_conn_path_up(&conn->c_path[0]); +} + +static inline int +rds_conn_path_connecting(struct rds_conn_path *cp) +{ + return atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING; } static inline int rds_conn_connecting(struct rds_connection *conn) { - return atomic_read(&conn->c_state) == RDS_CONN_CONNECTING; + WARN_ON(conn->c_trans->t_mp_capable); + return rds_conn_path_connecting(&conn->c_path[0]); } /* message.c */ @@ -720,6 +790,8 @@ void rds_page_exit(void); /* recv.c */ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, __be32 saddr); +void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *conn, + __be32 saddr); void rds_inc_put(struct rds_incoming *inc); void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, struct rds_incoming *inc, gfp_t gfp); @@ -733,16 +805,16 @@ void rds_inc_info_copy(struct rds_incoming *inc, /* send.c */ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len); -void rds_send_reset(struct rds_connection *conn); -int rds_send_xmit(struct rds_connection *conn); +void rds_send_path_reset(struct rds_conn_path *conn); +int rds_send_xmit(struct rds_conn_path *cp); struct sockaddr_in; void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest); typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack); void rds_send_drop_acked(struct rds_connection *conn, u64 ack, is_acked_func is_acked); -int rds_send_pong(struct rds_connection *conn, __be16 dport); -struct rds_message *rds_send_get_message(struct rds_connection *, - struct rm_rdma_op *); +void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, + is_acked_func is_acked); +int rds_send_pong(struct rds_conn_path *cp, __be16 dport); /* rdma.c */ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force); @@ -809,12 +881,12 @@ extern unsigned int rds_sysctl_trace_level; int rds_threads_init(void); void rds_threads_exit(void); extern struct workqueue_struct *rds_wq; -void rds_queue_reconnect(struct rds_connection *conn); +void rds_queue_reconnect(struct rds_conn_path *cp); void rds_connect_worker(struct work_struct *); void rds_shutdown_worker(struct work_struct *); void rds_send_worker(struct work_struct *); void rds_recv_worker(struct work_struct *); -void rds_connect_path_complete(struct rds_connection *conn, int curr); +void rds_connect_path_complete(struct rds_conn_path *conn, int curr); void rds_connect_complete(struct rds_connection *conn); /* transport.c */ diff --git a/net/rds/rds_single_path.h b/net/rds/rds_single_path.h new file mode 100644 index 000000000000..e1241af7c1ad --- /dev/null +++ b/net/rds/rds_single_path.h @@ -0,0 +1,30 @@ +#ifndef _RDS_RDS_SINGLE_H +#define _RDS_RDS_SINGLE_H + +#define c_xmit_rm c_path[0].cp_xmit_rm +#define c_xmit_sg c_path[0].cp_xmit_sg +#define c_xmit_hdr_off c_path[0].cp_xmit_hdr_off +#define c_xmit_data_off c_path[0].cp_xmit_data_off +#define c_xmit_atomic_sent c_path[0].cp_xmit_atomic_sent +#define c_xmit_rdma_sent c_path[0].cp_xmit_rdma_sent +#define c_xmit_data_sent c_path[0].cp_xmit_data_sent +#define c_lock c_path[0].cp_lock +#define c_next_tx_seq c_path[0].cp_next_tx_seq +#define c_send_queue c_path[0].cp_send_queue +#define c_retrans c_path[0].cp_retrans +#define c_next_rx_seq c_path[0].cp_next_rx_seq +#define c_transport_data c_path[0].cp_transport_data +#define c_state c_path[0].cp_state +#define c_send_gen c_path[0].cp_send_gen +#define c_flags c_path[0].cp_flags +#define c_reconnect_jiffies c_path[0].cp_reconnect_jiffies +#define c_send_w c_path[0].cp_send_w +#define c_recv_w c_path[0].cp_recv_w +#define c_conn_w c_path[0].cp_conn_w +#define c_down_w c_path[0].cp_down_w +#define c_cm_lock c_path[0].cp_cm_lock +#define c_waitq c_path[0].cp_waitq +#define c_unacked_packets c_path[0].cp_unacked_packets +#define c_unacked_bytes c_path[0].cp_unacked_bytes + +#endif /* _RDS_RDS_SINGLE_H */ diff --git a/net/rds/recv.c b/net/rds/recv.c index 8413f6c99e13..cbfabdf3ff48 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -53,6 +53,20 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, } EXPORT_SYMBOL_GPL(rds_inc_init); +void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp, + __be32 saddr) +{ + atomic_set(&inc->i_refcount, 1); + INIT_LIST_HEAD(&inc->i_item); + inc->i_conn = cp->cp_conn; + inc->i_conn_path = cp; + inc->i_saddr = saddr; + inc->i_rdma_cookie = 0; + inc->i_rx_tstamp.tv_sec = 0; + inc->i_rx_tstamp.tv_usec = 0; +} +EXPORT_SYMBOL_GPL(rds_inc_path_init); + static void rds_inc_addref(struct rds_incoming *inc) { rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount)); @@ -142,6 +156,67 @@ static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock } } +static void rds_recv_hs_exthdrs(struct rds_header *hdr, + struct rds_connection *conn) +{ + unsigned int pos = 0, type, len; + union { + struct rds_ext_header_version version; + u16 rds_npaths; + } buffer; + + while (1) { + len = sizeof(buffer); + type = rds_message_next_extension(hdr, &pos, &buffer, &len); + if (type == RDS_EXTHDR_NONE) + break; + /* Process extension header here */ + switch (type) { + case RDS_EXTHDR_NPATHS: + conn->c_npaths = min_t(int, RDS_MPATH_WORKERS, + buffer.rds_npaths); + break; + default: + pr_warn_ratelimited("ignoring unknown exthdr type " + "0x%x\n", type); + } + } + /* if RDS_EXTHDR_NPATHS was not found, default to a single-path */ + conn->c_npaths = max_t(int, conn->c_npaths, 1); +} + +/* rds_start_mprds() will synchronously start multiple paths when appropriate. + * The scheme is based on the following rules: + * + * 1. rds_sendmsg on first connect attempt sends the probe ping, with the + * sender's npaths (s_npaths) + * 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It + * sends back a probe-pong with r_npaths. After that, if rcvr is the + * smaller ip addr, it starts rds_conn_path_connect_if_down on all + * mprds_paths. + * 3. sender gets woken up, and can move to rds_conn_path_connect_if_down. + * If it is the smaller ipaddr, rds_conn_path_connect_if_down can be + * called after reception of the probe-pong on all mprds_paths. + * Otherwise (sender of probe-ping is not the smaller ip addr): just call + * rds_conn_path_connect_if_down on the hashed path. (see rule 4) + * 4. when cp_index > 0, rds_connect_worker must only trigger + * a connection if laddr < faddr. + * 5. sender may end up queuing the packet on the cp. will get sent out later. + * when connection is completed. + */ +static void rds_start_mprds(struct rds_connection *conn) +{ + int i; + struct rds_conn_path *cp; + + if (conn->c_npaths > 1 && conn->c_laddr < conn->c_faddr) { + for (i = 1; i < conn->c_npaths; i++) { + cp = &conn->c_path[i]; + rds_conn_path_connect_if_down(cp); + } + } +} + /* * The transport must make sure that this is serialized against other * rx and conn reset on this specific conn. @@ -164,13 +239,18 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, struct rds_sock *rs = NULL; struct sock *sk; unsigned long flags; + struct rds_conn_path *cp; inc->i_conn = conn; inc->i_rx_jiffies = jiffies; + if (conn->c_trans->t_mp_capable) + cp = inc->i_conn_path; + else + cp = &conn->c_path[0]; rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u " "flags 0x%x rx_jiffies %lu\n", conn, - (unsigned long long)conn->c_next_rx_seq, + (unsigned long long)cp->cp_next_rx_seq, inc, (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence), be32_to_cpu(inc->i_hdr.h_len), @@ -199,16 +279,34 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, * XXX we could spend more on the wire to get more robust failure * detection, arguably worth it to avoid data corruption. */ - if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq && + if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq && (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) { rds_stats_inc(s_recv_drop_old_seq); goto out; } - conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1; + cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1; if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) { + if (inc->i_hdr.h_sport == 0) { + rdsdebug("ignore ping with 0 sport from 0x%x\n", saddr); + goto out; + } rds_stats_inc(s_recv_ping); - rds_send_pong(conn, inc->i_hdr.h_sport); + rds_send_pong(cp, inc->i_hdr.h_sport); + /* if this is a handshake ping, start multipath if necessary */ + if (RDS_HS_PROBE(inc->i_hdr.h_sport, inc->i_hdr.h_dport)) { + rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn); + rds_start_mprds(cp->cp_conn); + } + goto out; + } + + if (inc->i_hdr.h_dport == RDS_FLAG_PROBE_PORT && + inc->i_hdr.h_sport == 0) { + rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn); + /* if this is a handshake pong, start multipath if necessary */ + rds_start_mprds(cp->cp_conn); + wake_up(&cp->cp_conn->c_hs_waitq); goto out; } diff --git a/net/rds/send.c b/net/rds/send.c index b1962f8e30f7..896626b9a0ef 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -62,14 +62,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status); * Reset the send state. Callers must ensure that this doesn't race with * rds_send_xmit(). */ -void rds_send_reset(struct rds_connection *conn) +void rds_send_path_reset(struct rds_conn_path *cp) { struct rds_message *rm, *tmp; unsigned long flags; - if (conn->c_xmit_rm) { - rm = conn->c_xmit_rm; - conn->c_xmit_rm = NULL; + if (cp->cp_xmit_rm) { + rm = cp->cp_xmit_rm; + cp->cp_xmit_rm = NULL; /* Tell the user the RDMA op is no longer mapped by the * transport. This isn't entirely true (it's flushed out * independently) but as the connection is down, there's @@ -78,37 +78,37 @@ void rds_send_reset(struct rds_connection *conn) rds_message_put(rm); } - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_data_sent = 0; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_data_sent = 0; - conn->c_map_queued = 0; + cp->cp_conn->c_map_queued = 0; - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes; /* Mark messages as retransmissions, and move them to the send q */ - spin_lock_irqsave(&conn->c_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + spin_lock_irqsave(&cp->cp_lock, flags); + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags); } - list_splice_init(&conn->c_retrans, &conn->c_send_queue); - spin_unlock_irqrestore(&conn->c_lock, flags); + list_splice_init(&cp->cp_retrans, &cp->cp_send_queue); + spin_unlock_irqrestore(&cp->cp_lock, flags); } -EXPORT_SYMBOL_GPL(rds_send_reset); +EXPORT_SYMBOL_GPL(rds_send_path_reset); -static int acquire_in_xmit(struct rds_connection *conn) +static int acquire_in_xmit(struct rds_conn_path *cp) { - return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0; + return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0; } -static void release_in_xmit(struct rds_connection *conn) +static void release_in_xmit(struct rds_conn_path *cp) { - clear_bit(RDS_IN_XMIT, &conn->c_flags); + clear_bit(RDS_IN_XMIT, &cp->cp_flags); smp_mb__after_atomic(); /* * We don't use wait_on_bit()/wake_up_bit() because our waking is in a @@ -116,8 +116,8 @@ static void release_in_xmit(struct rds_connection *conn) * the system-wide hashed waitqueue buckets in the fast path only to * almost never find waiters. */ - if (waitqueue_active(&conn->c_waitq)) - wake_up_all(&conn->c_waitq); + if (waitqueue_active(&cp->cp_waitq)) + wake_up_all(&cp->cp_waitq); } /* @@ -134,8 +134,9 @@ static void release_in_xmit(struct rds_connection *conn) * - small message latency is higher behind queued large messages * - large message latency isn't starved by intervening small sends */ -int rds_send_xmit(struct rds_connection *conn) +int rds_send_xmit(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_message *rm; unsigned long flags; unsigned int tmp; @@ -155,7 +156,7 @@ restart: * avoids blocking the caller and trading per-connection data between * caches per message. */ - if (!acquire_in_xmit(conn)) { + if (!acquire_in_xmit(cp)) { rds_stats_inc(s_send_lock_contention); ret = -ENOMEM; goto out; @@ -169,21 +170,21 @@ restart: * The acquire_in_xmit() check above ensures that only one * caller can increment c_send_gen at any time. */ - conn->c_send_gen++; - send_gen = conn->c_send_gen; + cp->cp_send_gen++; + send_gen = cp->cp_send_gen; /* * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, * we do the opposite to avoid races. */ - if (!rds_conn_up(conn)) { - release_in_xmit(conn); + if (!rds_conn_path_up(cp)) { + release_in_xmit(cp); ret = 0; goto out; } - if (conn->c_trans->xmit_prepare) - conn->c_trans->xmit_prepare(conn); + if (conn->c_trans->xmit_path_prepare) + conn->c_trans->xmit_path_prepare(cp); /* * spin trying to push headers and data down the connection until @@ -191,7 +192,7 @@ restart: */ while (1) { - rm = conn->c_xmit_rm; + rm = cp->cp_xmit_rm; /* * If between sending messages, we can send a pending congestion @@ -204,14 +205,16 @@ restart: break; } rm->data.op_active = 1; + rm->m_inc.i_conn_path = cp; + rm->m_inc.i_conn = cp->cp_conn; - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* * If not already working on one, grab the next message. * - * c_xmit_rm holds a ref while we're sending this message down + * cp_xmit_rm holds a ref while we're sending this message down * the connction. We can use this ref while holding the * send_sem.. rds_send_reset() is serialized with it. */ @@ -228,10 +231,10 @@ restart: if (batch_count >= send_batch_count) goto over_batch; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - if (!list_empty(&conn->c_send_queue)) { - rm = list_entry(conn->c_send_queue.next, + if (!list_empty(&cp->cp_send_queue)) { + rm = list_entry(cp->cp_send_queue.next, struct rds_message, m_conn_item); rds_message_addref(rm); @@ -240,10 +243,11 @@ restart: * Move the message from the send queue to the retransmit * list right away. */ - list_move_tail(&rm->m_conn_item, &conn->c_retrans); + list_move_tail(&rm->m_conn_item, + &cp->cp_retrans); } - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); if (!rm) break; @@ -257,32 +261,34 @@ restart: */ if (rm->rdma.op_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) list_move(&rm->m_conn_item, &to_be_dropped); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); continue; } /* Require an ACK every once in a while */ len = ntohl(rm->m_inc.i_hdr.h_len); - if (conn->c_unacked_packets == 0 || - conn->c_unacked_bytes < len) { + if (cp->cp_unacked_packets == 0 || + cp->cp_unacked_bytes < len) { __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = + rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = + rds_sysctl_max_unacked_bytes; rds_stats_inc(s_send_ack_required); } else { - conn->c_unacked_bytes -= len; - conn->c_unacked_packets--; + cp->cp_unacked_bytes -= len; + cp->cp_unacked_packets--; } - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* The transport either sends the whole rdma or none of it */ - if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) { rm->m_final_op = &rm->rdma; /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue @@ -294,11 +300,11 @@ restart: wake_up_interruptible(&rm->m_flush_wait); break; } - conn->c_xmit_rdma_sent = 1; + cp->cp_xmit_rdma_sent = 1; } - if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) { rm->m_final_op = &rm->atomic; /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue @@ -310,7 +316,7 @@ restart: wake_up_interruptible(&rm->m_flush_wait); break; } - conn->c_xmit_atomic_sent = 1; + cp->cp_xmit_atomic_sent = 1; } @@ -336,41 +342,42 @@ restart: rm->data.op_active = 0; } - if (rm->data.op_active && !conn->c_xmit_data_sent) { + if (rm->data.op_active && !cp->cp_xmit_data_sent) { rm->m_final_op = &rm->data; + ret = conn->c_trans->xmit(conn, rm, - conn->c_xmit_hdr_off, - conn->c_xmit_sg, - conn->c_xmit_data_off); + cp->cp_xmit_hdr_off, + cp->cp_xmit_sg, + cp->cp_xmit_data_off); if (ret <= 0) break; - if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) { + if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) { tmp = min_t(int, ret, sizeof(struct rds_header) - - conn->c_xmit_hdr_off); - conn->c_xmit_hdr_off += tmp; + cp->cp_xmit_hdr_off); + cp->cp_xmit_hdr_off += tmp; ret -= tmp; } - sg = &rm->data.op_sg[conn->c_xmit_sg]; + sg = &rm->data.op_sg[cp->cp_xmit_sg]; while (ret) { tmp = min_t(int, ret, sg->length - - conn->c_xmit_data_off); - conn->c_xmit_data_off += tmp; + cp->cp_xmit_data_off); + cp->cp_xmit_data_off += tmp; ret -= tmp; - if (conn->c_xmit_data_off == sg->length) { - conn->c_xmit_data_off = 0; + if (cp->cp_xmit_data_off == sg->length) { + cp->cp_xmit_data_off = 0; sg++; - conn->c_xmit_sg++; - BUG_ON(ret != 0 && - conn->c_xmit_sg == rm->data.op_nents); + cp->cp_xmit_sg++; + BUG_ON(ret != 0 && cp->cp_xmit_sg == + rm->data.op_nents); } } - if (conn->c_xmit_hdr_off == sizeof(struct rds_header) && - (conn->c_xmit_sg == rm->data.op_nents)) - conn->c_xmit_data_sent = 1; + if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) && + (cp->cp_xmit_sg == rm->data.op_nents)) + cp->cp_xmit_data_sent = 1; } /* @@ -378,23 +385,23 @@ restart: * if there is a data op. Thus, if the data is sent (or there was * none), then we're done with the rm. */ - if (!rm->data.op_active || conn->c_xmit_data_sent) { - conn->c_xmit_rm = NULL; - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_data_sent = 0; + if (!rm->data.op_active || cp->cp_xmit_data_sent) { + cp->cp_xmit_rm = NULL; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_data_sent = 0; rds_message_put(rm); } } over_batch: - if (conn->c_trans->xmit_complete) - conn->c_trans->xmit_complete(conn); - release_in_xmit(conn); + if (conn->c_trans->xmit_path_complete) + conn->c_trans->xmit_path_complete(cp); + release_in_xmit(cp); /* Nuke any messages we decided not to retransmit. */ if (!list_empty(&to_be_dropped)) { @@ -422,12 +429,12 @@ over_batch: if (ret == 0) { smp_mb(); if ((test_bit(0, &conn->c_map_queued) || - !list_empty(&conn->c_send_queue)) && - send_gen == conn->c_send_gen) { + !list_empty(&cp->cp_send_queue)) && + send_gen == cp->cp_send_gen) { rds_stats_inc(s_send_lock_queue_raced); if (batch_count < send_batch_count) goto restart; - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cp->cp_send_w, 1); } } out: @@ -560,42 +567,6 @@ __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) } /* - * This is called from the IB send completion when we detect - * a RDMA operation that failed with remote access error. - * So speed is not an issue here. - */ -struct rds_message *rds_send_get_message(struct rds_connection *conn, - struct rm_rdma_op *op) -{ - struct rds_message *rm, *tmp, *found = NULL; - unsigned long flags; - - spin_lock_irqsave(&conn->c_lock, flags); - - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - goto out; - } - } - - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - break; - } - } - -out: - spin_unlock_irqrestore(&conn->c_lock, flags); - - return found; -} -EXPORT_SYMBOL_GPL(rds_send_get_message); - -/* * This removes messages from the socket's list if they're on it. The list * argument must be private to the caller, we must be able to modify it * without locks. The messages must have a reference held for their @@ -685,16 +656,16 @@ unlock_and_drop: * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked * checks the RDS_MSG_HAS_ACK_SEQ bit. */ -void rds_send_drop_acked(struct rds_connection *conn, u64 ack, - is_acked_func is_acked) +void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, + is_acked_func is_acked) { struct rds_message *rm, *tmp; unsigned long flags; LIST_HEAD(list); - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { if (!rds_send_is_acked(rm, ack, is_acked)) break; @@ -706,17 +677,26 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack, if (!list_empty(&list)) smp_mb__after_atomic(); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* now remove the messages from the sock list as needed */ rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS); } +EXPORT_SYMBOL_GPL(rds_send_path_drop_acked); + +void rds_send_drop_acked(struct rds_connection *conn, u64 ack, + is_acked_func is_acked) +{ + WARN_ON(conn->c_trans->t_mp_capable); + rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked); +} EXPORT_SYMBOL_GPL(rds_send_drop_acked); void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) { struct rds_message *rm, *tmp; struct rds_connection *conn; + struct rds_conn_path *cp; unsigned long flags; LIST_HEAD(list); @@ -745,22 +725,26 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) list_for_each_entry(rm, &list, m_sock_item) { conn = rm->m_inc.i_conn; + if (conn->c_trans->t_mp_capable) + cp = rm->m_inc.i_conn_path; + else + cp = &conn->c_path[0]; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); /* * Maybe someone else beat us to removing rm from the conn. * If we race with their flag update we'll get the lock and * then really see that the flag has been cleared. */ if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); spin_lock_irqsave(&rm->m_rs_lock, flags); rm->m_rs = NULL; spin_unlock_irqrestore(&rm->m_rs_lock, flags); continue; } list_del_init(&rm->m_conn_item); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* * Couldn't grab m_rs_lock in top loop (lock ordering), @@ -809,6 +793,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) * message from the flow with RDS_CANCEL_SENT_TO. */ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, + struct rds_conn_path *cp, struct rds_message *rm, __be16 sport, __be16 dport, int *queued) { @@ -852,13 +837,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, trying to minimize the time we hold c_lock */ rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0); rm->m_inc.i_conn = conn; + rm->m_inc.i_conn_path = cp; rds_message_addref(rm); - spin_lock(&conn->c_lock); - rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock(&cp->cp_lock); + rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); - spin_unlock(&conn->c_lock); + spin_unlock(&cp->cp_lock); rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n", rm, len, rs, rs->rs_snd_bytes, @@ -977,6 +963,29 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, return ret; } +static void rds_send_ping(struct rds_connection *conn); + +static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn) +{ + int hash; + + if (conn->c_npaths == 0) + hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS); + else + hash = RDS_MPATH_HASH(rs, conn->c_npaths); + if (conn->c_npaths == 0 && hash != 0) { + rds_send_ping(conn); + + if (conn->c_npaths == 0) { + wait_event_interruptible(conn->c_hs_waitq, + (conn->c_npaths != 0)); + } + if (conn->c_npaths == 1) + hash = 0; + } + return hash; +} + int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) { struct sock *sk = sock->sk; @@ -990,6 +999,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) int queued = 0, allocated_mr = 0; int nonblock = msg->msg_flags & MSG_DONTWAIT; long timeo = sock_sndtimeo(sk, nonblock); + struct rds_conn_path *cpath; /* Mirror Linux UDP mirror of BSD error message compatibility */ /* XXX: Perhaps MSG_MORE someday */ @@ -1088,15 +1098,19 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - rds_conn_connect_if_down(conn); + if (conn->c_trans->t_mp_capable) + cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)]; + else + cpath = &conn->c_path[0]; + + rds_conn_path_connect_if_down(cpath); ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); if (ret) { rs->rs_seen_congestion = 1; goto out; } - - while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, + while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued)) { rds_stats_inc(s_send_queue_full); @@ -1106,7 +1120,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) } timeo = wait_event_interruptible_timeout(*sk_sleep(sk), - rds_send_queue_rm(rs, conn, rm, + rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued), @@ -1127,9 +1141,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) */ rds_stats_inc(s_send_queued); - ret = rds_send_xmit(conn); + ret = rds_send_xmit(cpath); if (ret == -ENOMEM || ret == -EAGAIN) - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cpath->cp_send_w, 1); rds_message_put(rm); return payload_len; @@ -1147,10 +1161,16 @@ out: } /* - * Reply to a ping packet. + * send out a probe. Can be shared by rds_send_ping, + * rds_send_pong, rds_send_hb. + * rds_send_hb should use h_flags + * RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED + * or + * RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED */ int -rds_send_pong(struct rds_connection *conn, __be16 dport) +rds_send_probe(struct rds_conn_path *cp, __be16 sport, + __be16 dport, u8 h_flags) { struct rds_message *rm; unsigned long flags; @@ -1162,31 +1182,41 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) goto out; } - rm->m_daddr = conn->c_faddr; + rm->m_daddr = cp->cp_conn->c_faddr; rm->data.op_active = 1; - rds_conn_connect_if_down(conn); + rds_conn_path_connect_if_down(cp); - ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL); + ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL); if (ret) goto out; - spin_lock_irqsave(&conn->c_lock, flags); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock_irqsave(&cp->cp_lock, flags); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); rds_message_addref(rm); - rm->m_inc.i_conn = conn; + rm->m_inc.i_conn = cp->cp_conn; + rm->m_inc.i_conn_path = cp; + + rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, + cp->cp_next_tx_seq); + rm->m_inc.i_hdr.h_flags |= h_flags; + cp->cp_next_tx_seq++; + + if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) { + u16 npaths = RDS_MPATH_WORKERS; - rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport, - conn->c_next_tx_seq); - conn->c_next_tx_seq++; - spin_unlock_irqrestore(&conn->c_lock, flags); + rds_message_add_extension(&rm->m_inc.i_hdr, + RDS_EXTHDR_NPATHS, &npaths, + sizeof(npaths)); + } + spin_unlock_irqrestore(&cp->cp_lock, flags); rds_stats_inc(s_send_queued); rds_stats_inc(s_send_pong); /* schedule the send work on rds_wq */ - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cp->cp_send_w, 1); rds_message_put(rm); return 0; @@ -1196,3 +1226,25 @@ out: rds_message_put(rm); return ret; } + +int +rds_send_pong(struct rds_conn_path *cp, __be16 dport) +{ + return rds_send_probe(cp, 0, dport, 0); +} + +void +rds_send_ping(struct rds_connection *conn) +{ + unsigned long flags; + struct rds_conn_path *cp = &conn->c_path[0]; + + spin_lock_irqsave(&cp->cp_lock, flags); + if (conn->c_ping_triggered) { + spin_unlock_irqrestore(&cp->cp_lock, flags); + return; + } + conn->c_ping_triggered = 1; + spin_unlock_irqrestore(&cp->cp_lock, flags); + rds_send_probe(&conn->c_path[0], RDS_FLAG_PROBE_PORT, 0, 0); +} diff --git a/net/rds/tcp.c b/net/rds/tcp.c index c8a7b4c90190..fcddacc92e01 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -56,8 +56,8 @@ static int rds_tcp_skbuf_handler(struct ctl_table *ctl, int write, void __user *buffer, size_t *lenp, loff_t *fpos); -int rds_tcp_min_sndbuf = SOCK_MIN_SNDBUF; -int rds_tcp_min_rcvbuf = SOCK_MIN_RCVBUF; +static int rds_tcp_min_sndbuf = SOCK_MIN_SNDBUF; +static int rds_tcp_min_rcvbuf = SOCK_MIN_RCVBUF; static struct ctl_table rds_tcp_sysctl_table[] = { #define RDS_TCP_SNDBUF 0 @@ -135,9 +135,9 @@ void rds_tcp_restore_callbacks(struct socket *sock, * from being called while it isn't set. */ void rds_tcp_reset_callbacks(struct socket *sock, - struct rds_connection *conn) + struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *osock = tc->t_sock; if (!osock) @@ -147,8 +147,8 @@ void rds_tcp_reset_callbacks(struct socket *sock, * We have an outstanding SYN to this peer, which may * potentially have transitioned to the RDS_CONN_UP state, * so we must quiesce any send threads before resetting - * c_transport_data. We quiesce these threads by setting - * c_state to something other than RDS_CONN_UP, and then + * cp_transport_data. We quiesce these threads by setting + * cp_state to something other than RDS_CONN_UP, and then * waiting for any existing threads in rds_send_xmit to * complete release_in_xmit(). (Subsequent threads entering * rds_send_xmit() will bail on !rds_conn_up(). @@ -163,38 +163,25 @@ void rds_tcp_reset_callbacks(struct socket *sock, * RDS_CONN_RESETTTING, to ensure that rds_tcp_state_change * cannot mark rds_conn_path_up() in the window before lock_sock() */ - atomic_set(&conn->c_state, RDS_CONN_RESETTING); - wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags)); + atomic_set(&cp->cp_state, RDS_CONN_RESETTING); + wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags)); lock_sock(osock->sk); /* reset receive side state for rds_tcp_data_recv() for osock */ + cancel_delayed_work_sync(&cp->cp_send_w); + cancel_delayed_work_sync(&cp->cp_recv_w); if (tc->t_tinc) { rds_inc_put(&tc->t_tinc->ti_inc); tc->t_tinc = NULL; } tc->t_tinc_hdr_rem = sizeof(struct rds_header); tc->t_tinc_data_rem = 0; - tc->t_sock = NULL; - - write_lock_bh(&osock->sk->sk_callback_lock); - - osock->sk->sk_user_data = NULL; - osock->sk->sk_data_ready = tc->t_orig_data_ready; - osock->sk->sk_write_space = tc->t_orig_write_space; - osock->sk->sk_state_change = tc->t_orig_state_change; - write_unlock_bh(&osock->sk->sk_callback_lock); + rds_tcp_restore_callbacks(osock, tc); release_sock(osock->sk); sock_release(osock); newsock: - rds_send_reset(conn); + rds_send_path_reset(cp); lock_sock(sock->sk); - write_lock_bh(&sock->sk->sk_callback_lock); - tc->t_sock = sock; - sock->sk->sk_user_data = conn; - sock->sk->sk_data_ready = rds_tcp_data_ready; - sock->sk->sk_write_space = rds_tcp_write_space; - sock->sk->sk_state_change = rds_tcp_state_change; - - write_unlock_bh(&sock->sk->sk_callback_lock); + rds_tcp_set_callbacks(sock, cp); release_sock(sock->sk); } @@ -202,9 +189,9 @@ newsock: * above rds_tcp_reset_callbacks for notes about synchronization * with data path */ -void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn) +void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rdsdebug("setting sock %p callbacks to tc %p\n", sock, tc); write_lock_bh(&sock->sk->sk_callback_lock); @@ -220,12 +207,12 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn) sock->sk->sk_data_ready = sock->sk->sk_user_data; tc->t_sock = sock; - tc->conn = conn; + tc->t_cpath = cp; tc->t_orig_data_ready = sock->sk->sk_data_ready; tc->t_orig_write_space = sock->sk->sk_write_space; tc->t_orig_state_change = sock->sk->sk_state_change; - sock->sk->sk_user_data = conn; + sock->sk->sk_user_data = cp; sock->sk->sk_data_ready = rds_tcp_data_ready; sock->sk->sk_write_space = rds_tcp_write_space; sock->sk->sk_state_change = rds_tcp_state_change; @@ -283,24 +270,29 @@ static int rds_tcp_laddr_check(struct net *net, __be32 addr) static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp) { struct rds_tcp_connection *tc; + int i; - tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp); - if (!tc) - return -ENOMEM; + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp); + if (!tc) + return -ENOMEM; - mutex_init(&tc->t_conn_lock); - tc->t_sock = NULL; - tc->t_tinc = NULL; - tc->t_tinc_hdr_rem = sizeof(struct rds_header); - tc->t_tinc_data_rem = 0; + mutex_init(&tc->t_conn_path_lock); + tc->t_sock = NULL; + tc->t_tinc = NULL; + tc->t_tinc_hdr_rem = sizeof(struct rds_header); + tc->t_tinc_data_rem = 0; - conn->c_transport_data = tc; + conn->c_path[i].cp_transport_data = tc; + tc->t_cpath = &conn->c_path[i]; - spin_lock_irq(&rds_tcp_conn_lock); - list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list); - spin_unlock_irq(&rds_tcp_conn_lock); + spin_lock_irq(&rds_tcp_conn_lock); + list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list); + spin_unlock_irq(&rds_tcp_conn_lock); + rdsdebug("rds_conn_path [%d] tc %p\n", i, + conn->c_path[i].cp_transport_data); + } - rdsdebug("alloced tc %p\n", conn->c_transport_data); return 0; } @@ -317,6 +309,17 @@ static void rds_tcp_conn_free(void *arg) kmem_cache_free(rds_tcp_conn_slab, tc); } +static bool list_has_conn(struct list_head *list, struct rds_connection *conn) +{ + struct rds_tcp_connection *tc, *_tc; + + list_for_each_entry_safe(tc, _tc, list, t_tcp_node) { + if (tc->t_cpath->cp_conn == conn) + return true; + } + return false; +} + static void rds_tcp_destroy_conns(void) { struct rds_tcp_connection *tc, *_tc; @@ -324,29 +327,28 @@ static void rds_tcp_destroy_conns(void) /* avoid calling conn_destroy with irqs off */ spin_lock_irq(&rds_tcp_conn_lock); - list_splice(&rds_tcp_conn_list, &tmp_list); - INIT_LIST_HEAD(&rds_tcp_conn_list); + list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { + if (!list_has_conn(&tmp_list, tc->t_cpath->cp_conn)) + list_move_tail(&tc->t_tcp_node, &tmp_list); + } spin_unlock_irq(&rds_tcp_conn_lock); - list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) { - if (tc->conn->c_passive) - rds_conn_destroy(tc->conn->c_passive); - rds_conn_destroy(tc->conn); - } + list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) + rds_conn_destroy(tc->t_cpath->cp_conn); } static void rds_tcp_exit(void); struct rds_transport rds_tcp_transport = { .laddr_check = rds_tcp_laddr_check, - .xmit_prepare = rds_tcp_xmit_prepare, - .xmit_complete = rds_tcp_xmit_complete, + .xmit_path_prepare = rds_tcp_xmit_path_prepare, + .xmit_path_complete = rds_tcp_xmit_path_complete, .xmit = rds_tcp_xmit, - .recv = rds_tcp_recv, + .recv_path = rds_tcp_recv_path, .conn_alloc = rds_tcp_conn_alloc, .conn_free = rds_tcp_conn_free, - .conn_connect = rds_tcp_conn_connect, - .conn_shutdown = rds_tcp_conn_shutdown, + .conn_path_connect = rds_tcp_conn_path_connect, + .conn_path_shutdown = rds_tcp_conn_path_shutdown, .inc_copy_to_user = rds_tcp_inc_copy_to_user, .inc_free = rds_tcp_inc_free, .stats_info_copy = rds_tcp_stats_info_copy, @@ -355,6 +357,7 @@ struct rds_transport rds_tcp_transport = { .t_name = "tcp", .t_type = RDS_TRANS_TCP, .t_prefer_loopback = 1, + .t_mp_capable = 1, }; static int rds_tcp_netid; @@ -488,10 +491,30 @@ static struct pernet_operations rds_tcp_net_ops = { .size = sizeof(struct rds_tcp_net), }; +/* explicitly send a RST on each socket, thereby releasing any socket refcnts + * that may otherwise hold up netns deletion. + */ +static void rds_tcp_conn_paths_destroy(struct rds_connection *conn) +{ + struct rds_conn_path *cp; + struct rds_tcp_connection *tc; + int i; + struct sock *sk; + + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + cp = &conn->c_path[i]; + tc = cp->cp_transport_data; + if (!tc->t_sock) + continue; + sk = tc->t_sock->sk; + sk->sk_prot->disconnect(sk, 0); + tcp_done(sk); + } +} + static void rds_tcp_kill_sock(struct net *net) { struct rds_tcp_connection *tc, *_tc; - struct sock *sk; LIST_HEAD(tmp_list); struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); @@ -500,23 +523,27 @@ static void rds_tcp_kill_sock(struct net *net) flush_work(&rtn->rds_tcp_accept_w); spin_lock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { - struct net *c_net = read_pnet(&tc->conn->c_net); + struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net); if (net != c_net || !tc->t_sock) continue; - list_move_tail(&tc->t_tcp_node, &tmp_list); + if (!list_has_conn(&tmp_list, tc->t_cpath->cp_conn)) + list_move_tail(&tc->t_tcp_node, &tmp_list); } spin_unlock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) { - sk = tc->t_sock->sk; - sk->sk_prot->disconnect(sk, 0); - tcp_done(sk); - if (tc->conn->c_passive) - rds_conn_destroy(tc->conn->c_passive); - rds_conn_destroy(tc->conn); + rds_tcp_conn_paths_destroy(tc->t_cpath->cp_conn); + rds_conn_destroy(tc->t_cpath->cp_conn); } } +void *rds_tcp_listen_sock_def_readable(struct net *net) +{ + struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); + + return rtn->rds_tcp_listen_sock->sk->sk_user_data; +} + static int rds_tcp_dev_event(struct notifier_block *this, unsigned long event, void *ptr) { @@ -551,12 +578,13 @@ static void rds_tcp_sysctl_reset(struct net *net) spin_lock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { - struct net *c_net = read_pnet(&tc->conn->c_net); + struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net); if (net != c_net || !tc->t_sock) continue; - rds_conn_drop(tc->conn); /* reconnect with new parameters */ + /* reconnect with new parameters */ + rds_conn_path_drop(tc->t_cpath); } spin_unlock_irq(&rds_tcp_conn_lock); } diff --git a/net/rds/tcp.h b/net/rds/tcp.h index 7940babf6c71..9a1cc8906576 100644 --- a/net/rds/tcp.h +++ b/net/rds/tcp.h @@ -11,11 +11,11 @@ struct rds_tcp_incoming { struct rds_tcp_connection { struct list_head t_tcp_node; - struct rds_connection *conn; - /* t_conn_lock synchronizes the connection establishment between - * rds_tcp_accept_one and rds_tcp_conn_connect + struct rds_conn_path *t_cpath; + /* t_conn_path_lock synchronizes the connection establishment between + * rds_tcp_accept_one and rds_tcp_conn_path_connect */ - struct mutex t_conn_lock; + struct mutex t_conn_path_lock; struct socket *t_sock; void *t_orig_write_space; void *t_orig_data_ready; @@ -49,8 +49,8 @@ struct rds_tcp_statistics { /* tcp.c */ void rds_tcp_tune(struct socket *sock); void rds_tcp_nonagle(struct socket *sock); -void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn); -void rds_tcp_reset_callbacks(struct socket *sock, struct rds_connection *conn); +void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp); +void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp); void rds_tcp_restore_callbacks(struct socket *sock, struct rds_tcp_connection *tc); u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc); @@ -60,8 +60,8 @@ extern struct rds_transport rds_tcp_transport; void rds_tcp_accept_work(struct sock *sk); /* tcp_connect.c */ -int rds_tcp_conn_connect(struct rds_connection *conn); -void rds_tcp_conn_shutdown(struct rds_connection *conn); +int rds_tcp_conn_path_connect(struct rds_conn_path *cp); +void rds_tcp_conn_path_shutdown(struct rds_conn_path *conn); void rds_tcp_state_change(struct sock *sk); /* tcp_listen.c */ @@ -70,18 +70,19 @@ void rds_tcp_listen_stop(struct socket *); void rds_tcp_listen_data_ready(struct sock *sk); int rds_tcp_accept_one(struct socket *sock); int rds_tcp_keepalive(struct socket *sock); +void *rds_tcp_listen_sock_def_readable(struct net *net); /* tcp_recv.c */ int rds_tcp_recv_init(void); void rds_tcp_recv_exit(void); void rds_tcp_data_ready(struct sock *sk); -int rds_tcp_recv(struct rds_connection *conn); +int rds_tcp_recv_path(struct rds_conn_path *cp); void rds_tcp_inc_free(struct rds_incoming *inc); int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to); /* tcp_send.c */ -void rds_tcp_xmit_prepare(struct rds_connection *conn); -void rds_tcp_xmit_complete(struct rds_connection *conn); +void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp); +void rds_tcp_xmit_path_complete(struct rds_conn_path *cp); int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); void rds_tcp_write_space(struct sock *sk); diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c index f6e95d60db54..05f61c533ed3 100644 --- a/net/rds/tcp_connect.c +++ b/net/rds/tcp_connect.c @@ -40,16 +40,16 @@ void rds_tcp_state_change(struct sock *sk) { void (*state_change)(struct sock *sk); - struct rds_connection *conn; + struct rds_conn_path *cp; struct rds_tcp_connection *tc; read_lock_bh(&sk->sk_callback_lock); - conn = sk->sk_user_data; - if (!conn) { + cp = sk->sk_user_data; + if (!cp) { state_change = sk->sk_state_change; goto out; } - tc = conn->c_transport_data; + tc = cp->cp_transport_data; state_change = tc->t_orig_state_change; rdsdebug("sock %p state_change to %d\n", tc->t_sock, sk->sk_state); @@ -60,11 +60,11 @@ void rds_tcp_state_change(struct sock *sk) case TCP_SYN_RECV: break; case TCP_ESTABLISHED: - rds_connect_path_complete(conn, RDS_CONN_CONNECTING); + rds_connect_path_complete(cp, RDS_CONN_CONNECTING); break; case TCP_CLOSE_WAIT: case TCP_CLOSE: - rds_conn_drop(conn); + rds_conn_path_drop(cp); default: break; } @@ -73,17 +73,24 @@ out: state_change(sk); } -int rds_tcp_conn_connect(struct rds_connection *conn) +int rds_tcp_conn_path_connect(struct rds_conn_path *cp) { struct socket *sock = NULL; struct sockaddr_in src, dest; int ret; - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_connection *conn = cp->cp_conn; + struct rds_tcp_connection *tc = cp->cp_transport_data; - mutex_lock(&tc->t_conn_lock); + /* for multipath rds,we only trigger the connection after + * the handshake probe has determined the number of paths. + */ + if (cp->cp_index > 0 && cp->cp_conn->c_npaths < 2) + return -EAGAIN; + + mutex_lock(&tc->t_conn_path_lock); - if (rds_conn_up(conn)) { - mutex_unlock(&tc->t_conn_lock); + if (rds_conn_path_up(cp)) { + mutex_unlock(&tc->t_conn_path_lock); return 0; } ret = sock_create_kern(rds_conn_net(conn), PF_INET, @@ -112,10 +119,11 @@ int rds_tcp_conn_connect(struct rds_connection *conn) * once we call connect() we can start getting callbacks and they * own the socket */ - rds_tcp_set_callbacks(sock, conn); + rds_tcp_set_callbacks(sock, cp); ret = sock->ops->connect(sock, (struct sockaddr *)&dest, sizeof(dest), O_NONBLOCK); + cp->cp_outgoing = 1; rdsdebug("connect to address %pI4 returned %d\n", &conn->c_faddr, ret); if (ret == -EINPROGRESS) ret = 0; @@ -123,11 +131,11 @@ int rds_tcp_conn_connect(struct rds_connection *conn) rds_tcp_keepalive(sock); sock = NULL; } else { - rds_tcp_restore_callbacks(sock, conn->c_transport_data); + rds_tcp_restore_callbacks(sock, cp->cp_transport_data); } out: - mutex_unlock(&tc->t_conn_lock); + mutex_unlock(&tc->t_conn_path_lock); if (sock) sock_release(sock); return ret; @@ -142,12 +150,13 @@ out: * callbacks to those set by TCP. Our callbacks won't execute again once we * hold the sock lock. */ -void rds_tcp_conn_shutdown(struct rds_connection *conn) +void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; - rdsdebug("shutting down conn %p tc %p sock %p\n", conn, tc, sock); + rdsdebug("shutting down conn %p tc %p sock %p\n", + cp->cp_conn, tc, sock); if (sock) { sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN); diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c index 245542ca4718..e0b23fb5b8d5 100644 --- a/net/rds/tcp_listen.c +++ b/net/rds/tcp_listen.c @@ -70,6 +70,52 @@ bail: return ret; } +/* rds_tcp_accept_one_path(): if accepting on cp_index > 0, make sure the + * client's ipaddr < server's ipaddr. Otherwise, close the accepted + * socket and force a reconneect from smaller -> larger ip addr. The reason + * we special case cp_index 0 is to allow the rds probe ping itself to itself + * get through efficiently. + * Since reconnects are only initiated from the node with the numerically + * smaller ip address, we recycle conns in RDS_CONN_ERROR on the passive side + * by moving them to CONNECTING in this function. + */ +struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn) +{ + int i; + bool peer_is_smaller = (conn->c_faddr < conn->c_laddr); + int npaths = conn->c_npaths; + + if (npaths <= 1) { + struct rds_conn_path *cp = &conn->c_path[0]; + int ret; + + ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, + RDS_CONN_CONNECTING); + if (!ret) + rds_conn_path_transition(cp, RDS_CONN_ERROR, + RDS_CONN_CONNECTING); + return cp->cp_transport_data; + } + + /* for mprds, paths with cp_index > 0 MUST be initiated by the peer + * with the smaller address. + */ + if (!peer_is_smaller) + return NULL; + + for (i = 1; i < npaths; i++) { + struct rds_conn_path *cp = &conn->c_path[i]; + + if (rds_conn_path_transition(cp, RDS_CONN_DOWN, + RDS_CONN_CONNECTING) || + rds_conn_path_transition(cp, RDS_CONN_ERROR, + RDS_CONN_CONNECTING)) { + return cp->cp_transport_data; + } + } + return NULL; +} + int rds_tcp_accept_one(struct socket *sock) { struct socket *new_sock = NULL; @@ -78,6 +124,7 @@ int rds_tcp_accept_one(struct socket *sock) struct inet_sock *inet; struct rds_tcp_connection *rs_tcp = NULL; int conn_state; + struct rds_conn_path *cp; if (!sock) /* module unload or netns delete in progress */ return -ENETUNREACH; @@ -118,11 +165,14 @@ int rds_tcp_accept_one(struct socket *sock) * If the client reboots, this conn will need to be cleaned up. * rds_tcp_state_change() will do that cleanup */ - rs_tcp = (struct rds_tcp_connection *)conn->c_transport_data; - rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING); - mutex_lock(&rs_tcp->t_conn_lock); - conn_state = rds_conn_state(conn); - if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_UP) + rs_tcp = rds_tcp_accept_one_path(conn); + if (!rs_tcp) + goto rst_nsk; + mutex_lock(&rs_tcp->t_conn_path_lock); + cp = rs_tcp->t_cpath; + conn_state = rds_conn_path_state(cp); + if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_UP && + conn_state != RDS_CONN_ERROR) goto rst_nsk; if (rs_tcp->t_sock) { /* Need to resolve a duelling SYN between peers. @@ -132,17 +182,17 @@ int rds_tcp_accept_one(struct socket *sock) * c_transport_data. */ if (ntohl(inet->inet_saddr) < ntohl(inet->inet_daddr) || - !conn->c_outgoing) { + !cp->cp_outgoing) { goto rst_nsk; } else { - rds_tcp_reset_callbacks(new_sock, conn); - conn->c_outgoing = 0; + rds_tcp_reset_callbacks(new_sock, cp); + cp->cp_outgoing = 0; /* rds_connect_path_complete() marks RDS_CONN_UP */ - rds_connect_path_complete(conn, RDS_CONN_RESETTING); + rds_connect_path_complete(cp, RDS_CONN_RESETTING); } } else { - rds_tcp_set_callbacks(new_sock, conn); - rds_connect_path_complete(conn, RDS_CONN_CONNECTING); + rds_tcp_set_callbacks(new_sock, cp); + rds_connect_path_complete(cp, RDS_CONN_CONNECTING); } new_sock = NULL; ret = 0; @@ -153,7 +203,7 @@ rst_nsk: ret = 0; out: if (rs_tcp) - mutex_unlock(&rs_tcp->t_conn_lock); + mutex_unlock(&rs_tcp->t_conn_path_lock); if (new_sock) sock_release(new_sock); return ret; @@ -180,6 +230,8 @@ void rds_tcp_listen_data_ready(struct sock *sk) */ if (sk->sk_state == TCP_LISTEN) rds_tcp_accept_work(sk); + else + ready = rds_tcp_listen_sock_def_readable(sock_net(sk)); out: read_unlock_bh(&sk->sk_callback_lock); diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c index 6e6a7111a034..ad4892e97f91 100644 --- a/net/rds/tcp_recv.c +++ b/net/rds/tcp_recv.c @@ -147,7 +147,7 @@ static void rds_tcp_cong_recv(struct rds_connection *conn, } struct rds_tcp_desc_arg { - struct rds_connection *conn; + struct rds_conn_path *conn_path; gfp_t gfp; }; @@ -155,8 +155,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, unsigned int offset, size_t len) { struct rds_tcp_desc_arg *arg = desc->arg.data; - struct rds_connection *conn = arg->conn; - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_conn_path *cp = arg->conn_path; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct rds_tcp_incoming *tinc = tc->t_tinc; struct sk_buff *clone; size_t left = len, to_copy; @@ -178,7 +178,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, } tc->t_tinc = tinc; rdsdebug("alloced tinc %p\n", tinc); - rds_inc_init(&tinc->ti_inc, conn, conn->c_faddr); + rds_inc_path_init(&tinc->ti_inc, cp, + cp->cp_conn->c_faddr); /* * XXX * we might be able to use the __ variants when * we've already serialized at a higher level. @@ -228,6 +229,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, } if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) { + struct rds_connection *conn = cp->cp_conn; + if (tinc->ti_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP) rds_tcp_cong_recv(conn, tinc); else @@ -250,15 +253,15 @@ out: } /* the caller has to hold the sock lock */ -static int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp) +static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; read_descriptor_t desc; struct rds_tcp_desc_arg arg; /* It's like glib in the kernel! */ - arg.conn = conn; + arg.conn_path = cp; arg.gfp = gfp; desc.arg.data = &arg; desc.error = 0; @@ -278,16 +281,17 @@ static int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp) * if we fail to allocate we're in trouble.. blindly wait some time before * trying again to see if the VM can free up something for us. */ -int rds_tcp_recv(struct rds_connection *conn) +int rds_tcp_recv_path(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; int ret = 0; - rdsdebug("recv worker conn %p tc %p sock %p\n", conn, tc, sock); + rdsdebug("recv worker path [%d] tc %p sock %p\n", + cp->cp_index, tc, sock); lock_sock(sock->sk); - ret = rds_tcp_read_sock(conn, GFP_KERNEL); + ret = rds_tcp_read_sock(cp, GFP_KERNEL); release_sock(sock->sk); return ret; @@ -296,24 +300,24 @@ int rds_tcp_recv(struct rds_connection *conn) void rds_tcp_data_ready(struct sock *sk) { void (*ready)(struct sock *sk); - struct rds_connection *conn; + struct rds_conn_path *cp; struct rds_tcp_connection *tc; rdsdebug("data ready sk %p\n", sk); read_lock_bh(&sk->sk_callback_lock); - conn = sk->sk_user_data; - if (!conn) { /* check for teardown race */ + cp = sk->sk_user_data; + if (!cp) { /* check for teardown race */ ready = sk->sk_data_ready; goto out; } - tc = conn->c_transport_data; + tc = cp->cp_transport_data; ready = tc->t_orig_data_ready; rds_tcp_stats_inc(s_tcp_data_ready_calls); - if (rds_tcp_read_sock(conn, GFP_ATOMIC) == -ENOMEM) - queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) + queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); out: read_unlock_bh(&sk->sk_callback_lock); ready(sk); diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c index 618be69c9c3b..89d09b481f47 100644 --- a/net/rds/tcp_send.c +++ b/net/rds/tcp_send.c @@ -34,6 +34,7 @@ #include <linux/in.h> #include <net/tcp.h> +#include "rds_single_path.h" #include "rds.h" #include "tcp.h" @@ -48,16 +49,16 @@ static void rds_tcp_cork(struct socket *sock, int val) set_fs(oldfs); } -void rds_tcp_xmit_prepare(struct rds_connection *conn) +void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rds_tcp_cork(tc->t_sock, 1); } -void rds_tcp_xmit_complete(struct rds_connection *conn) +void rds_tcp_xmit_path_complete(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rds_tcp_cork(tc->t_sock, 0); } @@ -80,7 +81,8 @@ static int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len) int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_conn_path *cp = rm->m_inc.i_conn_path; + struct rds_tcp_connection *tc = cp->cp_transport_data; int done = 0; int ret = 0; int more; @@ -149,10 +151,17 @@ out: rds_tcp_stats_inc(s_tcp_sndbuf_full); ret = 0; } else { - printk(KERN_WARNING "RDS/tcp: send to %pI4 " - "returned %d, disconnecting and reconnecting\n", - &conn->c_faddr, ret); - rds_conn_drop(conn); + /* No need to disconnect/reconnect if path_drop + * has already been triggered, because, e.g., of + * an incoming RST. + */ + if (rds_conn_path_up(cp)) { + pr_warn("RDS/tcp: send to %pI4 on cp [%d]" + "returned %d, " + "disconnecting and reconnecting\n", + &conn->c_faddr, cp->cp_index, ret); + rds_conn_path_drop(cp); + } } } if (done == 0) @@ -177,27 +186,27 @@ static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack) void rds_tcp_write_space(struct sock *sk) { void (*write_space)(struct sock *sk); - struct rds_connection *conn; + struct rds_conn_path *cp; struct rds_tcp_connection *tc; read_lock_bh(&sk->sk_callback_lock); - conn = sk->sk_user_data; - if (!conn) { + cp = sk->sk_user_data; + if (!cp) { write_space = sk->sk_write_space; goto out; } - tc = conn->c_transport_data; + tc = cp->cp_transport_data; rdsdebug("write_space for tc %p\n", tc); write_space = tc->t_orig_write_space; rds_tcp_stats_inc(s_tcp_write_space_calls); rdsdebug("tcp una %u\n", rds_tcp_snd_una(tc)); tc->t_last_seen_una = rds_tcp_snd_una(tc); - rds_send_drop_acked(conn, rds_tcp_snd_una(tc), rds_tcp_is_acked); + rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked); if ((atomic_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf) - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + queue_delayed_work(rds_wq, &cp->cp_send_w, 0); out: read_unlock_bh(&sk->sk_callback_lock); diff --git a/net/rds/threads.c b/net/rds/threads.c index 4a323045719b..e42df11bf30a 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -71,30 +71,30 @@ struct workqueue_struct *rds_wq; EXPORT_SYMBOL_GPL(rds_wq); -void rds_connect_path_complete(struct rds_connection *conn, int curr) +void rds_connect_path_complete(struct rds_conn_path *cp, int curr) { - if (!rds_conn_transition(conn, curr, RDS_CONN_UP)) { + if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) { printk(KERN_WARNING "%s: Cannot transition to state UP, " "current state is %d\n", __func__, - atomic_read(&conn->c_state)); - rds_conn_drop(conn); + atomic_read(&cp->cp_state)); + rds_conn_path_drop(cp); return; } rdsdebug("conn %p for %pI4 to %pI4 complete\n", - conn, &conn->c_laddr, &conn->c_faddr); + cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr); - conn->c_reconnect_jiffies = 0; - set_bit(0, &conn->c_map_queued); - queue_delayed_work(rds_wq, &conn->c_send_w, 0); - queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + cp->cp_reconnect_jiffies = 0; + set_bit(0, &cp->cp_conn->c_map_queued); + queue_delayed_work(rds_wq, &cp->cp_send_w, 0); + queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); } EXPORT_SYMBOL_GPL(rds_connect_path_complete); void rds_connect_complete(struct rds_connection *conn) { - rds_connect_path_complete(conn, RDS_CONN_CONNECTING); + rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING); } EXPORT_SYMBOL_GPL(rds_connect_complete); @@ -116,70 +116,87 @@ EXPORT_SYMBOL_GPL(rds_connect_complete); * We should *always* start with a random backoff; otherwise a broken connection * will always take several iterations to be re-established. */ -void rds_queue_reconnect(struct rds_connection *conn) +void rds_queue_reconnect(struct rds_conn_path *cp) { unsigned long rand; + struct rds_connection *conn = cp->cp_conn; rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n", conn, &conn->c_laddr, &conn->c_faddr, - conn->c_reconnect_jiffies); + cp->cp_reconnect_jiffies); - set_bit(RDS_RECONNECT_PENDING, &conn->c_flags); - if (conn->c_reconnect_jiffies == 0) { - conn->c_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies; - queue_delayed_work(rds_wq, &conn->c_conn_w, 0); + /* let peer with smaller addr initiate reconnect, to avoid duels */ + if (conn->c_trans->t_type == RDS_TRANS_TCP && + conn->c_laddr > conn->c_faddr) + return; + + set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); + if (cp->cp_reconnect_jiffies == 0) { + cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies; + queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); return; } get_random_bytes(&rand, sizeof(rand)); rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n", - rand % conn->c_reconnect_jiffies, conn->c_reconnect_jiffies, + rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies, conn, &conn->c_laddr, &conn->c_faddr); - queue_delayed_work(rds_wq, &conn->c_conn_w, - rand % conn->c_reconnect_jiffies); + queue_delayed_work(rds_wq, &cp->cp_conn_w, + rand % cp->cp_reconnect_jiffies); - conn->c_reconnect_jiffies = min(conn->c_reconnect_jiffies * 2, + cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2, rds_sysctl_reconnect_max_jiffies); } void rds_connect_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_conn_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_conn_w.work); + struct rds_connection *conn = cp->cp_conn; int ret; - clear_bit(RDS_RECONNECT_PENDING, &conn->c_flags); - if (rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) { - ret = conn->c_trans->conn_connect(conn); + if (cp->cp_index > 1 && cp->cp_conn->c_laddr > cp->cp_conn->c_faddr) + return; + clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); + ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING); + if (ret) { + ret = conn->c_trans->conn_path_connect(cp); rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n", conn, &conn->c_laddr, &conn->c_faddr, ret); if (ret) { - if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN)) - rds_queue_reconnect(conn); + if (rds_conn_path_transition(cp, + RDS_CONN_CONNECTING, + RDS_CONN_DOWN)) + rds_queue_reconnect(cp); else - rds_conn_error(conn, "RDS: connect failed\n"); + rds_conn_path_error(cp, + "RDS: connect failed\n"); } } } void rds_send_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_send_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_send_w.work); int ret; - if (rds_conn_state(conn) == RDS_CONN_UP) { - clear_bit(RDS_LL_SEND_FULL, &conn->c_flags); - ret = rds_send_xmit(conn); + if (rds_conn_path_state(cp) == RDS_CONN_UP) { + clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags); + ret = rds_send_xmit(cp); cond_resched(); - rdsdebug("conn %p ret %d\n", conn, ret); + rdsdebug("conn %p ret %d\n", cp->cp_conn, ret); switch (ret) { case -EAGAIN: rds_stats_inc(s_send_immediate_retry); - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + queue_delayed_work(rds_wq, &cp->cp_send_w, 0); break; case -ENOMEM: rds_stats_inc(s_send_delayed_retry); - queue_delayed_work(rds_wq, &conn->c_send_w, 2); + queue_delayed_work(rds_wq, &cp->cp_send_w, 2); default: break; } @@ -188,20 +205,22 @@ void rds_send_worker(struct work_struct *work) void rds_recv_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_recv_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_recv_w.work); int ret; - if (rds_conn_state(conn) == RDS_CONN_UP) { - ret = conn->c_trans->recv(conn); - rdsdebug("conn %p ret %d\n", conn, ret); + if (rds_conn_path_state(cp) == RDS_CONN_UP) { + ret = cp->cp_conn->c_trans->recv_path(cp); + rdsdebug("conn %p ret %d\n", cp->cp_conn, ret); switch (ret) { case -EAGAIN: rds_stats_inc(s_recv_immediate_retry); - queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); break; case -ENOMEM: rds_stats_inc(s_recv_delayed_retry); - queue_delayed_work(rds_wq, &conn->c_recv_w, 2); + queue_delayed_work(rds_wq, &cp->cp_recv_w, 2); default: break; } @@ -210,9 +229,11 @@ void rds_recv_worker(struct work_struct *work) void rds_shutdown_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_down_w); - rds_conn_shutdown(conn); + rds_conn_shutdown(cp); } void rds_threads_exit(void) |