summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/sched.c1
-rw-r--r--net/sunrpc/xprt.c32
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c327
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c148
-rw-r--r--net/sunrpc/xprtrdma/transport.c83
-rw-r--r--net/sunrpc/xprtrdma/verbs.c115
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h44
-rw-r--r--net/sunrpc/xprtsock.c23
8 files changed, 441 insertions, 332 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index f820780280b5..8a0779e963f9 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -59,6 +59,7 @@ static struct rpc_wait_queue delay_queue;
*/
struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly;
+EXPORT_SYMBOL_GPL(xprtiod_workqueue);
unsigned long
rpc_task_timeout(const struct rpc_task *task)
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 70d6a1f10db9..70a704c44c6d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -846,6 +846,38 @@ void xprt_connect(struct rpc_task *task)
xprt_release_write(xprt, task);
}
+/**
+ * xprt_reconnect_delay - compute the wait before scheduling a connect
+ * @xprt: transport instance
+ *
+ */
+unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
+{
+ unsigned long start, now = jiffies;
+
+ start = xprt->stat.connect_start + xprt->reestablish_timeout;
+ if (time_after(start, now))
+ return start - now;
+ return 0;
+}
+EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
+
+/**
+ * xprt_reconnect_backoff - compute the new re-establish timeout
+ * @xprt: transport instance
+ * @init_to: initial reestablish timeout
+ *
+ */
+void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
+{
+ xprt->reestablish_timeout <<= 1;
+ if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
+ xprt->reestablish_timeout = xprt->max_reconnect_timeout;
+ if (xprt->reestablish_timeout < init_to)
+ xprt->reestablish_timeout = init_to;
+}
+EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
+
enum xprt_xid_rb_cmp {
XID_RB_EQUAL,
XID_RB_LEFT,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 794ba4ca0994..0b6dad7580a1 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -144,6 +144,26 @@ frwr_mr_recycle_worker(struct work_struct *work)
frwr_release_mr(mr);
}
+/* frwr_reset - Place MRs back on the free list
+ * @req: request to reset
+ *
+ * Used after a failed marshal. For FRWR, this means the MRs
+ * don't have to be fully released and recreated.
+ *
+ * NB: This is safe only as long as none of @req's MRs are
+ * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
+ * Work Request.
+ */
+void frwr_reset(struct rpcrdma_req *req)
+{
+ while (!list_empty(&req->rl_registered)) {
+ struct rpcrdma_mr *mr;
+
+ mr = rpcrdma_mr_pop(&req->rl_registered);
+ rpcrdma_mr_unmap_and_put(mr);
+ }
+}
+
/**
* frwr_init_mr - Initialize one MR
* @ia: interface adapter
@@ -168,7 +188,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
goto out_list_err;
mr->frwr.fr_mr = frmr;
- mr->frwr.fr_state = FRWR_IS_INVALID;
mr->mr_dir = DMA_NONE;
INIT_LIST_HEAD(&mr->mr_list);
INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
@@ -298,65 +317,6 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
}
/**
- * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- */
-static void
-frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr =
- container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS)
- frwr->fr_state = FRWR_FLUSHED_FR;
- trace_xprtrdma_wc_fastreg(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- */
-static void
-frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
- fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS)
- frwr->fr_state = FRWR_FLUSHED_LI;
- trace_xprtrdma_wc_li(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- * Awaken anyone waiting for an MR to finish being fenced.
- */
-static void
-frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
- fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS)
- frwr->fr_state = FRWR_FLUSHED_LI;
- trace_xprtrdma_wc_li_wake(wc, frwr);
- complete(&frwr->fr_linv_done);
-}
-
-/**
* frwr_map - Register a memory region
* @r_xprt: controlling transport
* @seg: memory region co-ordinates
@@ -378,23 +338,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
- struct rpcrdma_frwr *frwr;
struct rpcrdma_mr *mr;
struct ib_mr *ibmr;
struct ib_reg_wr *reg_wr;
int i, n;
u8 key;
- mr = NULL;
- do {
- if (mr)
- rpcrdma_mr_recycle(mr);
- mr = rpcrdma_mr_get(r_xprt);
- if (!mr)
- return ERR_PTR(-EAGAIN);
- } while (mr->frwr.fr_state != FRWR_IS_INVALID);
- frwr = &mr->frwr;
- frwr->fr_state = FRWR_IS_VALID;
+ mr = rpcrdma_mr_get(r_xprt);
+ if (!mr)
+ goto out_getmr_err;
if (nsegs > ia->ri_max_frwr_depth)
nsegs = ia->ri_max_frwr_depth;
@@ -423,7 +375,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
if (!mr->mr_nents)
goto out_dmamap_err;
- ibmr = frwr->fr_mr;
+ ibmr = mr->frwr.fr_mr;
n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
if (unlikely(n != mr->mr_nents))
goto out_mapmr_err;
@@ -433,7 +385,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
key = (u8)(ibmr->rkey & 0x000000FF);
ib_update_fast_reg_key(ibmr, ++key);
- reg_wr = &frwr->fr_regwr;
+ reg_wr = &mr->frwr.fr_regwr;
reg_wr->mr = ibmr;
reg_wr->key = ibmr->rkey;
reg_wr->access = writing ?
@@ -448,6 +400,10 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
*out = mr;
return seg;
+out_getmr_err:
+ xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+ return ERR_PTR(-EAGAIN);
+
out_dmamap_err:
mr->mr_dir = DMA_NONE;
trace_xprtrdma_frwr_sgerr(mr, i);
@@ -461,6 +417,23 @@ out_mapmr_err:
}
/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
+static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_fastreg(wc, frwr);
+ /* The MR will get recycled when the associated req is retransmitted */
+}
+
+/**
* frwr_send - post Send WR containing the RPC Call message
* @ia: interface adapter
* @req: Prepared RPC Call
@@ -512,31 +485,75 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
if (mr->mr_handle == rep->rr_inv_rkey) {
list_del_init(&mr->mr_list);
trace_xprtrdma_mr_remoteinv(mr);
- mr->frwr.fr_state = FRWR_IS_INVALID;
rpcrdma_mr_unmap_and_put(mr);
break; /* only one invalidated MR per RPC */
}
}
+static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
+{
+ if (wc->status != IB_WC_SUCCESS)
+ rpcrdma_mr_recycle(mr);
+ else
+ rpcrdma_mr_unmap_and_put(mr);
+}
+
/**
- * frwr_unmap_sync - invalidate memory regions that were registered for @req
- * @r_xprt: controlling transport
- * @mrs: list of MRs to process
+ * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
+static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li(wc, frwr);
+ __frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
*
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li_wake(wc, frwr);
+ complete(&frwr->fr_linv_done);
+ __frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_unmap_sync - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
*
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
+ * Sleeps until it is safe for the host CPU to access the previously mapped
+ * memory regions. This guarantees that registered MRs are properly fenced
+ * from the server before the RPC consumer accesses the data in them. It
+ * also ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
*/
-void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct ib_send_wr *first, **prev, *last;
const struct ib_send_wr *bad_wr;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_frwr *frwr;
struct rpcrdma_mr *mr;
- int count, rc;
+ int rc;
/* ORDER: Invalidate all of the MRs first
*
@@ -544,33 +561,32 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
* a single ib_post_send() call.
*/
frwr = NULL;
- count = 0;
prev = &first;
- list_for_each_entry(mr, mrs, mr_list) {
- mr->frwr.fr_state = FRWR_IS_INVALID;
+ while (!list_empty(&req->rl_registered)) {
+ mr = rpcrdma_mr_pop(&req->rl_registered);
- frwr = &mr->frwr;
trace_xprtrdma_mr_localinv(mr);
+ r_xprt->rx_stats.local_inv_needed++;
+ frwr = &mr->frwr;
frwr->fr_cqe.done = frwr_wc_localinv;
last = &frwr->fr_invwr;
- memset(last, 0, sizeof(*last));
+ last->next = NULL;
last->wr_cqe = &frwr->fr_cqe;
+ last->sg_list = NULL;
+ last->num_sge = 0;
last->opcode = IB_WR_LOCAL_INV;
+ last->send_flags = IB_SEND_SIGNALED;
last->ex.invalidate_rkey = mr->mr_handle;
- count++;
*prev = last;
prev = &last->next;
}
- if (!frwr)
- goto unmap;
/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
* are complete.
*/
- last->send_flags = IB_SEND_SIGNALED;
frwr->fr_cqe.done = frwr_wc_localinv_wake;
reinit_completion(&frwr->fr_linv_done);
@@ -578,37 +594,126 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
* replaces the QP. The RPC reply handler won't call us
* unless ri_id->qp is a valid pointer.
*/
- r_xprt->rx_stats.local_inv_needed++;
bad_wr = NULL;
- rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
+ rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+ trace_xprtrdma_post_send(req, rc);
+
+ /* The final LOCAL_INV WR in the chain is supposed to
+ * do the wake. If it was never posted, the wake will
+ * not happen, so don't wait in that case.
+ */
if (bad_wr != first)
wait_for_completion(&frwr->fr_linv_done);
- if (rc)
- goto out_release;
+ if (!rc)
+ return;
- /* ORDER: Now DMA unmap all of the MRs, and return
- * them to the free MR list.
+ /* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
-unmap:
- while (!list_empty(mrs)) {
- mr = rpcrdma_mr_pop(mrs);
- rpcrdma_mr_unmap_and_put(mr);
+ while (bad_wr) {
+ frwr = container_of(bad_wr, struct rpcrdma_frwr,
+ fr_invwr);
+ mr = container_of(frwr, struct rpcrdma_mr, frwr);
+ bad_wr = bad_wr->next;
+
+ list_del_init(&mr->mr_list);
+ rpcrdma_mr_recycle(mr);
}
- return;
+}
-out_release:
- pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc);
+/**
+ * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
+static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
- /* Unmap and release the MRs in the LOCAL_INV WRs that did not
- * get posted.
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li_done(wc, frwr);
+ rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
+ __frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_unmap_async - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * This guarantees that registered MRs are properly fenced from the
+ * server before the RPC consumer accesses the data in them. It also
+ * ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+ struct ib_send_wr *first, *last, **prev;
+ const struct ib_send_wr *bad_wr;
+ struct rpcrdma_frwr *frwr;
+ struct rpcrdma_mr *mr;
+ int rc;
+
+ /* Chain the LOCAL_INV Work Requests and post them with
+ * a single ib_post_send() call.
+ */
+ frwr = NULL;
+ prev = &first;
+ while (!list_empty(&req->rl_registered)) {
+ mr = rpcrdma_mr_pop(&req->rl_registered);
+
+ trace_xprtrdma_mr_localinv(mr);
+ r_xprt->rx_stats.local_inv_needed++;
+
+ frwr = &mr->frwr;
+ frwr->fr_cqe.done = frwr_wc_localinv;
+ frwr->fr_req = req;
+ last = &frwr->fr_invwr;
+ last->next = NULL;
+ last->wr_cqe = &frwr->fr_cqe;
+ last->sg_list = NULL;
+ last->num_sge = 0;
+ last->opcode = IB_WR_LOCAL_INV;
+ last->send_flags = IB_SEND_SIGNALED;
+ last->ex.invalidate_rkey = mr->mr_handle;
+
+ *prev = last;
+ prev = &last->next;
+ }
+
+ /* Strong send queue ordering guarantees that when the
+ * last WR in the chain completes, all WRs in the chain
+ * are complete. The last completion will wake up the
+ * RPC waiter.
+ */
+ frwr->fr_cqe.done = frwr_wc_localinv_done;
+
+ /* Transport disconnect drains the receive CQ before it
+ * replaces the QP. The RPC reply handler won't call us
+ * unless ri_id->qp is a valid pointer.
+ */
+ bad_wr = NULL;
+ rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+ trace_xprtrdma_post_send(req, rc);
+ if (!rc)
+ return;
+
+ /* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
while (bad_wr) {
- frwr = container_of(bad_wr, struct rpcrdma_frwr,
- fr_invwr);
+ frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
mr = container_of(frwr, struct rpcrdma_mr, frwr);
bad_wr = bad_wr->next;
- list_del_init(&mr->mr_list);
rpcrdma_mr_recycle(mr);
}
+
+ /* The final LOCAL_INV WR in the chain is supposed to
+ * do the wake. If it was never posted, the wake will
+ * not happen, so wake here in that case.
+ */
+ rpcrdma_complete_rqst(req->rl_reply);
}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 7dc62e55f526..4345e6912392 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -366,6 +366,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
unsigned int pos;
int nsegs;
+ if (rtype == rpcrdma_noch)
+ goto done;
+
pos = rqst->rq_snd_buf.head[0].iov_len;
if (rtype == rpcrdma_areadch)
pos = 0;
@@ -389,7 +392,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nsegs -= mr->mr_nents;
} while (nsegs);
- return 0;
+done:
+ return encode_item_not_present(xdr);
}
/* Register and XDR encode the Write list. Supports encoding a list
@@ -417,6 +421,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
int nsegs, nchunks;
__be32 *segcount;
+ if (wtype != rpcrdma_writech)
+ goto done;
+
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len,
@@ -451,7 +458,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
/* Update count of segments in this Write chunk */
*segcount = cpu_to_be32(nchunks);
- return 0;
+done:
+ return encode_item_not_present(xdr);
}
/* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -476,6 +484,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
int nsegs, nchunks;
__be32 *segcount;
+ if (wtype != rpcrdma_replych)
+ return encode_item_not_present(xdr);
+
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
if (nsegs < 0)
@@ -511,6 +522,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return 0;
}
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
+ struct rpcrdma_rep *rep = req->rl_reply;
+
+ rpcrdma_complete_rqst(rep);
+ rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
/**
* rpcrdma_sendctx_unmap - DMA-unmap Send buffer
* @sc: sendctx containing SGEs to unmap
@@ -520,6 +541,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
struct ib_sge *sge;
+ if (!sc->sc_unmap_count)
+ return;
+
/* The first two SGEs contain the transport header and
* the inline buffer. These are always left mapped so
* they can be cheaply re-used.
@@ -529,9 +553,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
DMA_TO_DEVICE);
- if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
- &sc->sc_req->rl_flags))
- wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+ kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}
/* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -666,7 +688,7 @@ map_tail:
out:
sc->sc_wr.num_sge += sge_no;
if (sc->sc_unmap_count)
- __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+ kref_get(&req->rl_kref);
return true;
out_regbuf:
@@ -699,22 +721,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
+ int ret;
+
+ ret = -EAGAIN;
req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx)
- return -EAGAIN;
+ goto err;
req->rl_sendctx->sc_wr.num_sge = 0;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
- __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+ kref_init(&req->rl_kref);
+ ret = -EIO;
if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
- return -EIO;
-
+ goto err;
if (rtype != rpcrdma_areadch)
if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
- return -EIO;
-
+ goto err;
return 0;
+
+err:
+ trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+ return ret;
}
/**
@@ -842,50 +870,28 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
* send a Call message with a Position Zero Read chunk and a
* regular Read chunk at the same time.
*/
- if (rtype != rpcrdma_noch) {
- ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
- if (ret)
- goto out_err;
- }
- ret = encode_item_not_present(xdr);
+ ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
if (ret)
goto out_err;
-
- if (wtype == rpcrdma_writech) {
- ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
- if (ret)
- goto out_err;
- }
- ret = encode_item_not_present(xdr);
+ ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
if (ret)
goto out_err;
-
- if (wtype != rpcrdma_replych)
- ret = encode_item_not_present(xdr);
- else
- ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
+ ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
if (ret)
goto out_err;
- trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
- ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+ ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
&rqst->rq_snd_buf, rtype);
if (ret)
goto out_err;
+
+ trace_xprtrdma_marshal(req, rtype, wtype);
return 0;
out_err:
trace_xprtrdma_marshal_failed(rqst, ret);
- switch (ret) {
- case -EAGAIN:
- xprt_wait_for_buffer_space(rqst->rq_xprt);
- break;
- case -ENOBUFS:
- break;
- default:
- r_xprt->rx_stats.failed_marshal_count++;
- }
+ r_xprt->rx_stats.failed_marshal_count++;
+ frwr_reset(req);
return ret;
}
@@ -1269,51 +1275,17 @@ out_badheader:
goto out;
}
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
-{
- /* Invalidate and unmap the data payloads before waking
- * the waiting application. This guarantees the memory
- * regions are properly fenced from the server before the
- * application accesses the data. It also ensures proper
- * send flow control: waking the next RPC waits until this
- * RPC has relinquished all its Send Queue entries.
- */
- if (!list_empty(&req->rl_registered))
- frwr_unmap_sync(r_xprt, &req->rl_registered);
-
- /* Ensure that any DMA mapped pages associated with
- * the Send of the RPC Call have been unmapped before
- * allowing the RPC to complete. This protects argument
- * memory not controlled by the RPC client from being
- * re-used before we're done with it.
- */
- if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
- r_xprt->rx_stats.reply_waits_for_send++;
- out_of_line_wait_on_bit(&req->rl_flags,
- RPCRDMA_REQ_F_TX_RESOURCES,
- bit_wait,
- TASK_UNINTERRUPTIBLE);
- }
-}
-
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
+static void rpcrdma_reply_done(struct kref *kref)
{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
- struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
- struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
- trace_xprtrdma_defer_cmp(rep);
- if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
- frwr_reminv(rep, &req->rl_registered);
- rpcrdma_release_rqst(r_xprt, req);
- rpcrdma_complete_rqst(rep);
+ rpcrdma_complete_rqst(req->rl_reply);
}
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
*
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
@@ -1373,10 +1345,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
}
req->rl_reply = rep;
rep->rr_rqst = rqst;
- clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
- queue_work(buf->rb_completion_wq, &rep->rr_work);
+
+ if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+ frwr_reminv(rep, &req->rl_registered);
+ if (!list_empty(&req->rl_registered))
+ frwr_unmap_async(r_xprt, req);
+ /* LocalInv completion will complete the RPC */
+ else
+ kref_put(&req->rl_kref, rpcrdma_reply_done);
return;
out_badversion:
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 1f73a6a7e43c..4993aa49ecbe 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -298,6 +298,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
module_put(THIS_MODULE);
}
+/* 60 second timeout, no retries */
static const struct rpc_timeout xprt_rdma_default_timeout = {
.to_initval = 60 * HZ,
.to_maxval = 60 * HZ,
@@ -323,8 +324,9 @@ xprt_setup_rdma(struct xprt_create *args)
if (!xprt)
return ERR_PTR(-ENOMEM);
- /* 60 second timeout, no retries */
xprt->timeout = &xprt_rdma_default_timeout;
+ xprt->connect_timeout = xprt->timeout->to_initval;
+ xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
xprt->bind_timeout = RPCRDMA_BIND_TO;
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
@@ -487,31 +489,64 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
}
/**
- * xprt_rdma_connect - try to establish a transport connection
+ * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
+ * @xprt: controlling transport instance
+ * @connect_timeout: reconnect timeout after client disconnects
+ * @reconnect_timeout: reconnect timeout after server disconnects
+ *
+ */
+static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt,
+ unsigned long connect_timeout,
+ unsigned long reconnect_timeout)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);
+
+ spin_lock(&xprt->transport_lock);
+
+ if (connect_timeout < xprt->connect_timeout) {
+ struct rpc_timeout to;
+ unsigned long initval;
+
+ to = *xprt->timeout;
+ initval = connect_timeout;
+ if (initval < RPCRDMA_INIT_REEST_TO << 1)
+ initval = RPCRDMA_INIT_REEST_TO << 1;
+ to.to_initval = initval;
+ to.to_maxval = initval;
+ r_xprt->rx_timeout = to;
+ xprt->timeout = &r_xprt->rx_timeout;
+ xprt->connect_timeout = connect_timeout;
+ }
+
+ if (reconnect_timeout < xprt->max_reconnect_timeout)
+ xprt->max_reconnect_timeout = reconnect_timeout;
+
+ spin_unlock(&xprt->transport_lock);
+}
+
+/**
+ * xprt_rdma_connect - schedule an attempt to reconnect
* @xprt: transport state
- * @task: RPC scheduler context
+ * @task: RPC scheduler context (unused)
*
*/
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ unsigned long delay;
trace_xprtrdma_op_connect(r_xprt);
+
+ delay = 0;
if (r_xprt->rx_ep.rep_connected != 0) {
- /* Reconnect */
- schedule_delayed_work(&r_xprt->rx_connect_worker,
- xprt->reestablish_timeout);
- xprt->reestablish_timeout <<= 1;
- if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
- xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
- else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
- xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
- } else {
- schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
- if (!RPC_IS_ASYNC(task))
- flush_delayed_work(&r_xprt->rx_connect_worker);
+ delay = xprt_reconnect_delay(xprt);
+ xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
}
+ queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
+ delay);
}
/**
@@ -550,8 +585,11 @@ out_sleep:
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
+ struct rpcrdma_xprt *r_xprt =
+ container_of(xprt, struct rpcrdma_xprt, rx_xprt);
+
memset(rqst, 0, sizeof(*rqst));
- rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
+ rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
rpc_wake_up_next(&xprt->backlog);
}
@@ -618,9 +656,16 @@ xprt_rdma_free(struct rpc_task *task)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
- rpcrdma_release_rqst(r_xprt, req);
trace_xprtrdma_op_free(task, req);
+
+ if (!list_empty(&req->rl_registered))
+ frwr_unmap_sync(r_xprt, req);
+
+ /* XXX: If the RPC is completing because of a signal and
+ * not because a reply was received, we ought to ensure
+ * that the Send completion has fired, so that memory
+ * involved with the Send is not still visible to the NIC.
+ */
}
/**
@@ -667,7 +712,6 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
goto drop_connection;
rqst->rq_xtime = ktime_get();
- __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
goto drop_connection;
@@ -760,6 +804,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
.send_request = xprt_rdma_send_request,
.close = xprt_rdma_close,
.destroy = xprt_rdma_destroy,
+ .set_connect_timeout = xprt_rdma_tcp_set_connect_timeout,
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 84bb37924540..805b1f35e1ca 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -89,14 +89,12 @@ static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
*/
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
/* Flush Receives, then wait for deferred Reply work
* to complete.
*/
ib_drain_rq(ia->ri_id->qp);
- drain_workqueue(buf->rb_completion_wq);
/* Deferred Reply processing might have scheduled
* local invalidations.
@@ -901,7 +899,7 @@ out_emptyq:
* completions recently. This is a sign the Send Queue is
* backing up. Cause the caller to pause and try again.
*/
- set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
+ xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
r_xprt->rx_stats.empty_sendctx_q++;
return NULL;
}
@@ -936,10 +934,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
/* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail);
- if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
- smp_mb__after_atomic();
- xprt_write_space(&sc->sc_xprt->rx_xprt);
- }
+ xprt_write_space(&sc->sc_xprt->rx_xprt);
}
static void
@@ -977,8 +972,6 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
r_xprt->rx_stats.mrs_allocated += count;
spin_unlock(&buf->rb_mrlock);
trace_xprtrdma_createmrs(r_xprt, count);
-
- xprt_write_space(&r_xprt->rx_xprt);
}
static void
@@ -990,6 +983,7 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
rx_buf);
rpcrdma_mrs_create(r_xprt);
+ xprt_write_space(&r_xprt->rx_xprt);
}
/**
@@ -1025,7 +1019,6 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
if (!req->rl_recvbuf)
goto out4;
- req->rl_buffer = buffer;
INIT_LIST_HEAD(&req->rl_registered);
spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
@@ -1042,9 +1035,9 @@ out1:
return NULL;
}
-static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)
+static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
+ bool temp)
{
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_rep *rep;
rep = kzalloc(sizeof(*rep), GFP_KERNEL);
@@ -1055,27 +1048,22 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)
DMA_FROM_DEVICE, GFP_KERNEL);
if (!rep->rr_rdmabuf)
goto out_free;
+
xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
rdmab_length(rep->rr_rdmabuf));
-
rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt;
- INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
rep->rr_recv_wr.next = NULL;
rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
rep->rr_recv_wr.num_sge = 1;
rep->rr_temp = temp;
-
- spin_lock(&buf->rb_lock);
- list_add(&rep->rr_list, &buf->rb_recv_bufs);
- spin_unlock(&buf->rb_lock);
- return true;
+ return rep;
out_free:
kfree(rep);
out:
- return false;
+ return NULL;
}
/**
@@ -1089,7 +1077,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
int i, rc;
- buf->rb_flags = 0;
buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_mrlock);
@@ -1122,15 +1109,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
if (rc)
goto out;
- buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
- WQ_MEM_RECLAIM | WQ_HIGHPRI,
- 0,
- r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
- if (!buf->rb_completion_wq) {
- rc = -ENOMEM;
- goto out;
- }
-
return 0;
out:
rpcrdma_buffer_destroy(buf);
@@ -1204,11 +1182,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
cancel_delayed_work_sync(&buf->rb_refresh_worker);
- if (buf->rb_completion_wq) {
- destroy_workqueue(buf->rb_completion_wq);
- buf->rb_completion_wq = NULL;
- }
-
rpcrdma_sendctxs_destroy(buf);
while (!list_empty(&buf->rb_recv_bufs)) {
@@ -1325,13 +1298,12 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
/**
* rpcrdma_buffer_put - Put request/reply buffers back into pool
+ * @buffers: buffer pool
* @req: object to return
*
*/
-void
-rpcrdma_buffer_put(struct rpcrdma_req *req)
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
- struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_rep *rep = req->rl_reply;
req->rl_reply = NULL;
@@ -1484,8 +1456,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
int rc;
- if (!ep->rep_send_count ||
- test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+ if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
send_wr->send_flags |= IB_SEND_SIGNALED;
ep->rep_send_count = ep->rep_send_batch;
} else {
@@ -1505,11 +1476,13 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- struct ib_recv_wr *wr, *bad_wr;
+ struct ib_recv_wr *i, *wr, *bad_wr;
+ struct rpcrdma_rep *rep;
int needed, count, rc;
rc = 0;
count = 0;
+
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
if (ep->rep_receive_count > needed)
goto out;
@@ -1517,51 +1490,65 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
if (!temp)
needed += RPCRDMA_MAX_RECV_BATCH;
- count = 0;
+ /* fast path: all needed reps can be found on the free list */
wr = NULL;
+ spin_lock(&buf->rb_lock);
while (needed) {
- struct rpcrdma_regbuf *rb;
- struct rpcrdma_rep *rep;
-
- spin_lock(&buf->rb_lock);
rep = list_first_entry_or_null(&buf->rb_recv_bufs,
struct rpcrdma_rep, rr_list);
- if (likely(rep))
- list_del(&rep->rr_list);
- spin_unlock(&buf->rb_lock);
- if (!rep) {
- if (!rpcrdma_rep_create(r_xprt, temp))
- break;
- continue;
- }
+ if (!rep)
+ break;
- rb = rep->rr_rdmabuf;
- if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) {
- rpcrdma_recv_buffer_put(rep);
+ list_del(&rep->rr_list);
+ rep->rr_recv_wr.next = wr;
+ wr = &rep->rr_recv_wr;
+ --needed;
+ }
+ spin_unlock(&buf->rb_lock);
+
+ while (needed) {
+ rep = rpcrdma_rep_create(r_xprt, temp);
+ if (!rep)
break;
- }
- trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
rep->rr_recv_wr.next = wr;
wr = &rep->rr_recv_wr;
- ++count;
--needed;
}
- if (!count)
+ if (!wr)
goto out;
+ for (i = wr; i; i = i->next) {
+ rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
+
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
+ goto release_wrs;
+
+ trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+ ++count;
+ }
+
rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
(const struct ib_recv_wr **)&bad_wr);
+out:
+ trace_xprtrdma_post_recvs(r_xprt, count, rc);
if (rc) {
- for (wr = bad_wr; wr; wr = wr->next) {
+ for (wr = bad_wr; wr;) {
struct rpcrdma_rep *rep;
rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
+ wr = wr->next;
rpcrdma_recv_buffer_put(rep);
--count;
}
}
ep->rep_receive_count += count;
-out:
- trace_xprtrdma_post_recvs(r_xprt, count, rc);
+ return;
+
+release_wrs:
+ for (i = wr; i;) {
+ rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
+ i = i->next;
+ rpcrdma_recv_buffer_put(rep);
+ }
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index d1e0749bcbc4..8378f45d2da7 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -44,7 +44,8 @@
#include <linux/wait.h> /* wait_queue_head_t, etc */
#include <linux/spinlock.h> /* spinlock_t, etc */
-#include <linux/atomic.h> /* atomic_t, etc */
+#include <linux/atomic.h> /* atomic_t, etc */
+#include <linux/kref.h> /* struct kref */
#include <linux/workqueue.h> /* struct work_struct */
#include <rdma/rdma_cm.h> /* RDMA connection api */
@@ -202,10 +203,9 @@ struct rpcrdma_rep {
bool rr_temp;
struct rpcrdma_regbuf *rr_rdmabuf;
struct rpcrdma_xprt *rr_rxprt;
- struct work_struct rr_work;
+ struct rpc_rqst *rr_rqst;
struct xdr_buf rr_hdrbuf;
struct xdr_stream rr_stream;
- struct rpc_rqst *rr_rqst;
struct list_head rr_list;
struct ib_recv_wr rr_recv_wr;
};
@@ -240,18 +240,12 @@ struct rpcrdma_sendctx {
* An external memory region is any buffer or page that is registered
* on the fly (ie, not pre-registered).
*/
-enum rpcrdma_frwr_state {
- FRWR_IS_INVALID, /* ready to be used */
- FRWR_IS_VALID, /* in use */
- FRWR_FLUSHED_FR, /* flushed FASTREG WR */
- FRWR_FLUSHED_LI, /* flushed LOCALINV WR */
-};
-
+struct rpcrdma_req;
struct rpcrdma_frwr {
struct ib_mr *fr_mr;
struct ib_cqe fr_cqe;
- enum rpcrdma_frwr_state fr_state;
struct completion fr_linv_done;
+ struct rpcrdma_req *fr_req;
union {
struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr;
@@ -326,7 +320,6 @@ struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_list;
struct rpc_rqst rl_slot;
- struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;
struct xdr_stream rl_stream;
struct xdr_buf rl_hdrbuf;
@@ -336,18 +329,12 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
struct list_head rl_all;
- unsigned long rl_flags;
+ struct kref rl_kref;
struct list_head rl_registered; /* registered segments */
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};
-/* rl_flags */
-enum {
- RPCRDMA_REQ_F_PENDING = 0,
- RPCRDMA_REQ_F_TX_RESOURCES,
-};
-
static inline struct rpcrdma_req *
rpcr_to_rdmar(const struct rpc_rqst *rqst)
{
@@ -391,22 +378,15 @@ struct rpcrdma_buffer {
struct list_head rb_recv_bufs;
struct list_head rb_allreqs;
- unsigned long rb_flags;
u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */
u32 rb_bc_srv_max_requests;
u32 rb_bc_max_requests;
- struct workqueue_struct *rb_completion_wq;
struct delayed_work rb_refresh_worker;
};
-/* rb_flags */
-enum {
- RPCRDMA_BUF_F_EMPTY_SCQ = 0,
-};
-
/*
* Statistics for RPCRDMA
*/
@@ -452,6 +432,7 @@ struct rpcrdma_xprt {
struct rpcrdma_ep rx_ep;
struct rpcrdma_buffer rx_buf;
struct delayed_work rx_connect_worker;
+ struct rpc_timeout rx_timeout;
struct rpcrdma_stats rx_stats;
};
@@ -518,7 +499,8 @@ rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
}
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
-void rpcrdma_buffer_put(struct rpcrdma_req *);
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
+ struct rpcrdma_req *req);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
@@ -564,6 +546,7 @@ rpcrdma_data_dir(bool writing)
/* Memory registration calls xprtrdma/frwr_ops.c
*/
bool frwr_is_supported(struct ib_device *device);
+void frwr_reset(struct rpcrdma_req *req);
int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
void frwr_release_mr(struct rpcrdma_mr *mr);
@@ -574,8 +557,8 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr **mr);
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
-void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt,
- struct list_head *mrs);
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
@@ -598,9 +581,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_req *req);
-void rpcrdma_deferred_completion(struct work_struct *work);
static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
{
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 97c15d47f343..3c2cc96afcaa 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2414,25 +2414,6 @@ out:
xprt_wake_pending_tasks(xprt, status);
}
-static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt)
-{
- unsigned long start, now = jiffies;
-
- start = xprt->stat.connect_start + xprt->reestablish_timeout;
- if (time_after(start, now))
- return start - now;
- return 0;
-}
-
-static void xs_reconnect_backoff(struct rpc_xprt *xprt)
-{
- xprt->reestablish_timeout <<= 1;
- if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
- xprt->reestablish_timeout = xprt->max_reconnect_timeout;
- if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
- xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
-}
-
/**
* xs_connect - connect a socket to a remote endpoint
* @xprt: pointer to transport structure
@@ -2462,8 +2443,8 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
/* Start by resetting any existing state */
xs_reset_transport(transport);
- delay = xs_reconnect_delay(xprt);
- xs_reconnect_backoff(xprt);
+ delay = xprt_reconnect_delay(xprt);
+ xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO);
} else
dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);