diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 2 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 26 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 32 | ||||
-rw-r--r-- | net/sunrpc/xdr.c | 65 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 61 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 4 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 166 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 71 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 15 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 263 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 59 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 8 |
12 files changed, 388 insertions, 384 deletions
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 4ce42c62458e..d75fddca44c9 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1960,7 +1960,7 @@ gss_unwrap_resp_integ(struct rpc_task *task, struct rpc_cred *cred, if (xdr_buf_subsegment(rcv_buf, &integ_buf, data_offset, integ_len)) goto unwrap_failed; - if (xdr_buf_read_netobj(rcv_buf, &mic, mic_offset)) + if (xdr_buf_read_mic(rcv_buf, &mic, mic_offset)) goto unwrap_failed; maj_stat = gss_verify_mic(ctx->gc_gss_ctx, &integ_buf, &mic); if (maj_stat == GSS_S_CONTEXT_EXPIRED) diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index a07b516e503a..f7f78566be46 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1837,7 +1837,7 @@ call_allocate(struct rpc_task *task) return; } - rpc_exit(task, -ERESTARTSYS); + rpc_call_rpcerror(task, -ERESTARTSYS); } static int @@ -1862,6 +1862,7 @@ rpc_xdr_encode(struct rpc_task *task) req->rq_rbuffer, req->rq_rcvsize); + req->rq_reply_bytes_recvd = 0; req->rq_snd_buf.head[0].iov_len = 0; xdr_init_encode(&xdr, &req->rq_snd_buf, req->rq_snd_buf.head[0].iov_base, req); @@ -1881,6 +1882,8 @@ call_encode(struct rpc_task *task) if (!rpc_task_need_encode(task)) goto out; dprint_status(task); + /* Dequeue task from the receive queue while we're encoding */ + xprt_request_dequeue_xprt(task); /* Encode here so that rpcsec_gss can use correct sequence number. */ rpc_xdr_encode(task); /* Did the encode result in an error condition? */ @@ -2479,6 +2482,7 @@ call_decode(struct rpc_task *task) struct rpc_clnt *clnt = task->tk_client; struct rpc_rqst *req = task->tk_rqstp; struct xdr_stream xdr; + int err; dprint_status(task); @@ -2501,6 +2505,15 @@ call_decode(struct rpc_task *task) * before it changed req->rq_reply_bytes_recvd. */ smp_rmb(); + + /* + * Did we ever call xprt_complete_rqst()? If not, we should assume + * the message is incomplete. + */ + err = -EAGAIN; + if (!req->rq_reply_bytes_recvd) + goto out; + req->rq_rcv_buf.len = req->rq_private_buf.len; /* Check that the softirq receive buffer is valid */ @@ -2509,7 +2522,9 @@ call_decode(struct rpc_task *task) xdr_init_decode(&xdr, &req->rq_rcv_buf, req->rq_rcv_buf.head[0].iov_base, req); - switch (rpc_decode_header(task, &xdr)) { + err = rpc_decode_header(task, &xdr); +out: + switch (err) { case 0: task->tk_action = rpc_exit_task; task->tk_status = rpcauth_unwrap_resp(task, &xdr); @@ -2518,9 +2533,6 @@ call_decode(struct rpc_task *task) return; case -EAGAIN: task->tk_status = 0; - xdr_free_bvec(&req->rq_rcv_buf); - req->rq_reply_bytes_recvd = 0; - req->rq_rcv_buf.len = 0; if (task->tk_client->cl_discrtry) xprt_conditional_disconnect(req->rq_xprt, req->rq_connect_cookie); @@ -2561,7 +2573,7 @@ rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr) return 0; out_fail: trace_rpc_bad_callhdr(task); - rpc_exit(task, error); + rpc_call_rpcerror(task, error); return error; } @@ -2628,7 +2640,7 @@ out_garbage: return -EAGAIN; } out_err: - rpc_exit(task, error); + rpc_call_rpcerror(task, error); return error; out_unparsable: diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 1f275aba786f..360afe153193 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -541,33 +541,14 @@ rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq, return NULL; } -static void -rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq, - struct rpc_wait_queue *queue, struct rpc_task *task) -{ - rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL); -} - /* * Wake up a queued task while the queue lock is being held */ -static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) -{ - rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task); -} - -/* - * Wake up a task on a specific queue - */ -void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq, - struct rpc_wait_queue *queue, - struct rpc_task *task) +static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, + struct rpc_task *task) { - if (!RPC_IS_QUEUED(task)) - return; - spin_lock(&queue->lock); - rpc_wake_up_task_on_wq_queue_locked(wq, queue, task); - spin_unlock(&queue->lock); + rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue, + task, NULL, NULL); } /* @@ -930,8 +911,10 @@ static void __rpc_execute(struct rpc_task *task) /* * Signalled tasks should exit rather than sleep. */ - if (RPC_SIGNALLED(task)) + if (RPC_SIGNALLED(task)) { + task->tk_rpc_status = -ERESTARTSYS; rpc_exit(task, -ERESTARTSYS); + } /* * The queue->lock protects against races with @@ -967,6 +950,7 @@ static void __rpc_execute(struct rpc_task *task) */ dprintk("RPC: %5u got signal\n", task->tk_pid); set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate); + task->tk_rpc_status = -ERESTARTSYS; rpc_exit(task, -ERESTARTSYS); } dprintk("RPC: %5u sync task resuming\n", task->tk_pid); diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 48c93b9e525e..14ba9e72a204 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -560,7 +560,7 @@ EXPORT_SYMBOL_GPL(xdr_init_encode); * required at the end of encoding, or any other time when the xdr_buf * data might be read. */ -void xdr_commit_encode(struct xdr_stream *xdr) +inline void xdr_commit_encode(struct xdr_stream *xdr) { int shift = xdr->scratch.iov_len; void *page; @@ -1236,43 +1236,60 @@ xdr_encode_word(struct xdr_buf *buf, unsigned int base, u32 obj) } EXPORT_SYMBOL_GPL(xdr_encode_word); -/* If the netobj starting offset bytes from the start of xdr_buf is contained - * entirely in the head or the tail, set object to point to it; otherwise - * try to find space for it at the end of the tail, copy it there, and - * set obj to point to it. */ -int xdr_buf_read_netobj(struct xdr_buf *buf, struct xdr_netobj *obj, unsigned int offset) +/** + * xdr_buf_read_mic() - obtain the address of the GSS mic from xdr buf + * @buf: pointer to buffer containing a mic + * @mic: on success, returns the address of the mic + * @offset: the offset in buf where mic may be found + * + * This function may modify the xdr buf if the mic is found to be straddling + * a boundary between head, pages, and tail. On success the mic can be read + * from the address returned. There is no need to free the mic. + * + * Return: Success returns 0, otherwise an integer error. + */ +int xdr_buf_read_mic(struct xdr_buf *buf, struct xdr_netobj *mic, unsigned int offset) { struct xdr_buf subbuf; + unsigned int boundary; - if (xdr_decode_word(buf, offset, &obj->len)) + if (xdr_decode_word(buf, offset, &mic->len)) return -EFAULT; - if (xdr_buf_subsegment(buf, &subbuf, offset + 4, obj->len)) + offset += 4; + + /* Is the mic partially in the head? */ + boundary = buf->head[0].iov_len; + if (offset < boundary && (offset + mic->len) > boundary) + xdr_shift_buf(buf, boundary - offset); + + /* Is the mic partially in the pages? */ + boundary += buf->page_len; + if (offset < boundary && (offset + mic->len) > boundary) + xdr_shrink_pagelen(buf, boundary - offset); + + if (xdr_buf_subsegment(buf, &subbuf, offset, mic->len)) return -EFAULT; - /* Is the obj contained entirely in the head? */ - obj->data = subbuf.head[0].iov_base; - if (subbuf.head[0].iov_len == obj->len) + /* Is the mic contained entirely in the head? */ + mic->data = subbuf.head[0].iov_base; + if (subbuf.head[0].iov_len == mic->len) return 0; - /* ..or is the obj contained entirely in the tail? */ - obj->data = subbuf.tail[0].iov_base; - if (subbuf.tail[0].iov_len == obj->len) + /* ..or is the mic contained entirely in the tail? */ + mic->data = subbuf.tail[0].iov_base; + if (subbuf.tail[0].iov_len == mic->len) return 0; - /* use end of tail as storage for obj: - * (We don't copy to the beginning because then we'd have - * to worry about doing a potentially overlapping copy. - * This assumes the object is at most half the length of the - * tail.) */ - if (obj->len > buf->buflen - buf->len) + /* Find a contiguous area in @buf to hold all of @mic */ + if (mic->len > buf->buflen - buf->len) return -ENOMEM; if (buf->tail[0].iov_len != 0) - obj->data = buf->tail[0].iov_base + buf->tail[0].iov_len; + mic->data = buf->tail[0].iov_base + buf->tail[0].iov_len; else - obj->data = buf->head[0].iov_base + buf->head[0].iov_len; - __read_bytes_from_xdr_buf(&subbuf, obj->data, obj->len); + mic->data = buf->head[0].iov_base + buf->head[0].iov_len; + __read_bytes_from_xdr_buf(&subbuf, mic->data, mic->len); return 0; } -EXPORT_SYMBOL_GPL(xdr_buf_read_netobj); +EXPORT_SYMBOL_GPL(xdr_buf_read_mic); /* Returns 0 on success, or else a negative error code. */ static int diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 2e71f5455c6c..8a45b3ccc313 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -456,6 +456,12 @@ void xprt_release_rqst_cong(struct rpc_task *task) } EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); +static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) +{ + if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) + __xprt_lock_write_next_cong(xprt); +} + /* * Clear the congestion window wait flag and wake up the next * entry on xprt->sending @@ -671,6 +677,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt) spin_lock(&xprt->transport_lock); xprt_clear_connected(xprt); xprt_clear_write_space_locked(xprt); + xprt_clear_congestion_window_wait_locked(xprt); xprt_wake_pending_tasks(xprt, -ENOTCONN); spin_unlock(&xprt->transport_lock); } @@ -1324,6 +1331,36 @@ xprt_request_dequeue_transmit(struct rpc_task *task) } /** + * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue + * @task: pointer to rpc_task + * + * Remove a task from the transmit and receive queues, and ensure that + * it is not pinned by the receive work item. + */ +void +xprt_request_dequeue_xprt(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || + test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || + xprt_is_pinned_rqst(req)) { + spin_lock(&xprt->queue_lock); + xprt_request_dequeue_transmit_locked(task); + xprt_request_dequeue_receive_locked(task); + while (xprt_is_pinned_rqst(req)) { + set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); + spin_unlock(&xprt->queue_lock); + xprt_wait_on_pinned_rqst(req); + spin_lock(&xprt->queue_lock); + clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); + } + spin_unlock(&xprt->queue_lock); + } +} + +/** * xprt_request_prepare - prepare an encoded request for transport * @req: pointer to rpc_rqst * @@ -1747,28 +1784,6 @@ void xprt_retry_reserve(struct rpc_task *task) xprt_do_reserve(xprt, task); } -static void -xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req) -{ - struct rpc_xprt *xprt = req->rq_xprt; - - if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || - test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || - xprt_is_pinned_rqst(req)) { - spin_lock(&xprt->queue_lock); - xprt_request_dequeue_transmit_locked(task); - xprt_request_dequeue_receive_locked(task); - while (xprt_is_pinned_rqst(req)) { - set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); - spin_unlock(&xprt->queue_lock); - xprt_wait_on_pinned_rqst(req); - spin_lock(&xprt->queue_lock); - clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); - } - spin_unlock(&xprt->queue_lock); - } -} - /** * xprt_release - release an RPC request slot * @task: task which is finished with the slot @@ -1788,7 +1803,7 @@ void xprt_release(struct rpc_task *task) } xprt = req->rq_xprt; - xprt_request_dequeue_all(task, req); + xprt_request_dequeue_xprt(task); spin_lock(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 59e624b1d7a0..50e075fcdd8f 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -54,9 +54,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt) { - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - - return r_xprt->rx_buf.rb_bc_srv_max_requests; + return RPCRDMA_BACKWARD_WRS >> 1; } static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 0b6dad7580a1..30065a28628c 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -7,67 +7,37 @@ /* Lightweight memory registration using Fast Registration Work * Requests (FRWR). * - * FRWR features ordered asynchronous registration and deregistration - * of arbitrarily sized memory regions. This is the fastest and safest + * FRWR features ordered asynchronous registration and invalidation + * of arbitrarily-sized memory regions. This is the fastest and safest * but most complex memory registration mode. */ /* Normal operation * - * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG + * A Memory Region is prepared for RDMA Read or Write using a FAST_REG * Work Request (frwr_map). When the RDMA operation is finished, this * Memory Region is invalidated using a LOCAL_INV Work Request - * (frwr_unmap_sync). + * (frwr_unmap_async and frwr_unmap_sync). * - * Typically these Work Requests are not signaled, and neither are RDMA - * SEND Work Requests (with the exception of signaling occasionally to - * prevent provider work queue overflows). This greatly reduces HCA + * Typically FAST_REG Work Requests are not signaled, and neither are + * RDMA Send Work Requests (with the exception of signaling occasionally + * to prevent provider work queue overflows). This greatly reduces HCA * interrupt workload. - * - * As an optimization, frwr_unmap marks MRs INVALID before the - * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on - * rb_mrs immediately so that no work (like managing a linked list - * under a spinlock) is needed in the completion upcall. - * - * But this means that frwr_map() can occasionally encounter an MR - * that is INVALID but the LOCAL_INV WR has not completed. Work Queue - * ordering prevents a subsequent FAST_REG WR from executing against - * that MR while it is still being invalidated. */ /* Transport recovery * - * ->op_map and the transport connect worker cannot run at the same - * time, but ->op_unmap can fire while the transport connect worker - * is running. Thus MR recovery is handled in ->op_map, to guarantee - * that recovered MRs are owned by a sending RPC, and not one where - * ->op_unmap could fire at the same time transport reconnect is - * being done. - * - * When the underlying transport disconnects, MRs are left in one of - * four states: - * - * INVALID: The MR was not in use before the QP entered ERROR state. - * - * VALID: The MR was registered before the QP entered ERROR state. - * - * FLUSHED_FR: The MR was being registered when the QP entered ERROR - * state, and the pending WR was flushed. - * - * FLUSHED_LI: The MR was being invalidated when the QP entered ERROR - * state, and the pending WR was flushed. - * - * When frwr_map encounters FLUSHED and VALID MRs, they are recovered - * with ib_dereg_mr and then are re-initialized. Because MR recovery - * allocates fresh resources, it is deferred to a workqueue, and the - * recovered MRs are placed back on the rb_mrs list when recovery is - * complete. frwr_map allocates another MR for the current RPC while - * the broken MR is reset. - * - * To ensure that frwr_map doesn't encounter an MR that is marked - * INVALID but that is about to be flushed due to a previous transport - * disconnect, the transport connect worker attempts to drain all - * pending send queue WRs before the transport is reconnected. + * frwr_map and frwr_unmap_* cannot run at the same time the transport + * connect worker is running. The connect worker holds the transport + * send lock, just as ->send_request does. This prevents frwr_map and + * the connect worker from running concurrently. When a connection is + * closed, the Receive completion queue is drained before the allowing + * the connect worker to get control. This prevents frwr_unmap and the + * connect worker from running concurrently. + * + * When the underlying transport disconnects, MRs that are in flight + * are flushed and are likely unusable. Thus all flushed MRs are + * destroyed. New MRs are created on demand. */ #include <linux/sunrpc/rpc_rdma.h> @@ -118,15 +88,8 @@ void frwr_release_mr(struct rpcrdma_mr *mr) kfree(mr); } -/* MRs are dynamically allocated, so simply clean up and release the MR. - * A replacement MR will subsequently be allocated on demand. - */ -static void -frwr_mr_recycle_worker(struct work_struct *work) +static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) { - struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle); - struct rpcrdma_xprt *r_xprt = mr->mr_xprt; - trace_xprtrdma_mr_recycle(mr); if (mr->mr_dir != DMA_NONE) { @@ -136,14 +99,40 @@ frwr_mr_recycle_worker(struct work_struct *work) mr->mr_dir = DMA_NONE; } - spin_lock(&r_xprt->rx_buf.rb_mrlock); + spin_lock(&r_xprt->rx_buf.rb_lock); list_del(&mr->mr_all); r_xprt->rx_stats.mrs_recycled++; - spin_unlock(&r_xprt->rx_buf.rb_mrlock); + spin_unlock(&r_xprt->rx_buf.rb_lock); frwr_release_mr(mr); } +/* MRs are dynamically allocated, so simply clean up and release the MR. + * A replacement MR will subsequently be allocated on demand. + */ +static void +frwr_mr_recycle_worker(struct work_struct *work) +{ + struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, + mr_recycle); + + frwr_mr_recycle(mr->mr_xprt, mr); +} + +/* frwr_recycle - Discard MRs + * @req: request to reset + * + * Used after a reconnect. These MRs could be in flight, we can't + * tell. Safe thing to do is release them. + */ +void frwr_recycle(struct rpcrdma_req *req) +{ + struct rpcrdma_mr *mr; + + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) + frwr_mr_recycle(mr->mr_xprt, mr); +} + /* frwr_reset - Place MRs back on the free list * @req: request to reset * @@ -156,12 +145,10 @@ frwr_mr_recycle_worker(struct work_struct *work) */ void frwr_reset(struct rpcrdma_req *req) { - while (!list_empty(&req->rl_registered)) { - struct rpcrdma_mr *mr; + struct rpcrdma_mr *mr; - mr = rpcrdma_mr_pop(&req->rl_registered); - rpcrdma_mr_unmap_and_put(mr); - } + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) + rpcrdma_mr_put(mr); } /** @@ -179,11 +166,14 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) struct ib_mr *frmr; int rc; + /* NB: ib_alloc_mr and device drivers typically allocate + * memory with GFP_KERNEL. + */ frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth); if (IS_ERR(frmr)) goto out_mr_err; - sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL); + sg = kcalloc(depth, sizeof(*sg), GFP_NOFS); if (!sg) goto out_list_err; @@ -203,8 +193,6 @@ out_mr_err: return rc; out_list_err: - dprintk("RPC: %s: sg allocation failure\n", - __func__); ib_dereg_mr(frmr); return -ENOMEM; } @@ -290,8 +278,8 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep) ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ - ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS / - ia->ri_max_frwr_depth); + ia->ri_max_segs = + DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth); /* Reply chunks require segments for head and tail buffers */ ia->ri_max_segs += 2; if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS) @@ -323,31 +311,25 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt) * @nsegs: number of segments remaining * @writing: true when RDMA Write will be used * @xid: XID of RPC using the registered memory - * @out: initialized MR + * @mr: MR to fill in * * Prepare a REG_MR Work Request to register a memory region * for remote access via RDMA READ or RDMA WRITE. * * Returns the next segment or a negative errno pointer. - * On success, the prepared MR is planted in @out. + * On success, @mr is filled in. */ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, __be32 xid, - struct rpcrdma_mr **out) + struct rpcrdma_mr *mr) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS; - struct rpcrdma_mr *mr; - struct ib_mr *ibmr; struct ib_reg_wr *reg_wr; + struct ib_mr *ibmr; int i, n; u8 key; - mr = rpcrdma_mr_get(r_xprt); - if (!mr) - goto out_getmr_err; - if (nsegs > ia->ri_max_frwr_depth) nsegs = ia->ri_max_frwr_depth; for (i = 0; i < nsegs;) { @@ -362,7 +344,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, ++seg; ++i; - if (holes_ok) + if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS) continue; if ((i < nsegs && offset_in_page(seg->mr_offset)) || offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) @@ -397,22 +379,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, mr->mr_offset = ibmr->iova; trace_xprtrdma_mr_map(mr); - *out = mr; return seg; -out_getmr_err: - xprt_wait_for_buffer_space(&r_xprt->rx_xprt); - return ERR_PTR(-EAGAIN); - out_dmamap_err: mr->mr_dir = DMA_NONE; trace_xprtrdma_frwr_sgerr(mr, i); - rpcrdma_mr_put(mr); return ERR_PTR(-EIO); out_mapmr_err: trace_xprtrdma_frwr_maperr(mr, n); - rpcrdma_mr_recycle(mr); return ERR_PTR(-EIO); } @@ -485,7 +460,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); trace_xprtrdma_mr_remoteinv(mr); - rpcrdma_mr_unmap_and_put(mr); + rpcrdma_mr_put(mr); break; /* only one invalidated MR per RPC */ } } @@ -495,7 +470,7 @@ static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr) if (wc->status != IB_WC_SUCCESS) rpcrdma_mr_recycle(mr); else - rpcrdma_mr_unmap_and_put(mr); + rpcrdma_mr_put(mr); } /** @@ -532,8 +507,8 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_li_wake(wc, frwr); - complete(&frwr->fr_linv_done); __frwr_release_mr(wc, mr); + complete(&frwr->fr_linv_done); } /** @@ -562,8 +537,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ frwr = NULL; prev = &first; - while (!list_empty(&req->rl_registered)) { - mr = rpcrdma_mr_pop(&req->rl_registered); + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; @@ -632,11 +606,15 @@ static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, fr_cqe); struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_rep *rep = mr->mr_req->rl_reply; /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_li_done(wc, frwr); - rpcrdma_complete_rqst(frwr->fr_req->rl_reply); __frwr_release_mr(wc, mr); + + /* Ensure @rep is generated before __frwr_release_mr */ + smp_rmb(); + rpcrdma_complete_rqst(rep); } /** @@ -662,15 +640,13 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ frwr = NULL; prev = &first; - while (!list_empty(&req->rl_registered)) { - mr = rpcrdma_mr_pop(&req->rl_registered); + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; frwr = &mr->frwr; frwr->fr_cqe.done = frwr_wc_localinv; - frwr->fr_req = req; last = &frwr->fr_invwr; last->next = NULL; last->wr_cqe = &frwr->fr_cqe; diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 4345e6912392..b86b5fd62d9f 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -342,6 +342,32 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, return 0; } +static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpcrdma_mr_seg *seg, + int nsegs, bool writing, + struct rpcrdma_mr **mr) +{ + *mr = rpcrdma_mr_pop(&req->rl_free_mrs); + if (!*mr) { + *mr = rpcrdma_mr_get(r_xprt); + if (!*mr) + goto out_getmr_err; + trace_xprtrdma_mr_get(req); + (*mr)->mr_req = req; + } + + rpcrdma_mr_push(*mr, &req->rl_registered); + return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr); + +out_getmr_err: + trace_xprtrdma_nomrs(req); + xprt_wait_for_buffer_space(&r_xprt->rx_xprt); + if (r_xprt->rx_ep.rep_connected != -ENODEV) + schedule_work(&r_xprt->rx_buf.rb_refresh_worker); + return ERR_PTR(-EAGAIN); +} + /* Register and XDR encode the Read list. Supports encoding a list of read * segments that belong to a single read chunk. * @@ -356,9 +382,10 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, * * Only a single @pos value is currently supported. */ -static noinline int -rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype) +static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype rtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -379,10 +406,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return nsegs; do { - seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_read_segment(xdr, mr, pos) < 0) return -EMSGSIZE; @@ -411,9 +437,10 @@ done: * * Only a single Write chunk is currently supported. */ -static noinline int -rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) +static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype wtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -440,10 +467,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; @@ -474,9 +500,10 @@ done: * Returns zero on success, or a negative errno if a failure occurred. * @xdr is advanced to the next position in the stream. */ -static noinline int -rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) +static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype wtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -501,10 +528,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; @@ -841,12 +867,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) * chunks. Very likely the connection has been replaced, * so these registrations are invalid and unusable. */ - while (unlikely(!list_empty(&req->rl_registered))) { - struct rpcrdma_mr *mr; - - mr = rpcrdma_mr_pop(&req->rl_registered); - rpcrdma_mr_recycle(mr); - } + frwr_recycle(req); /* This implementation supports the following combinations * of chunk lists in one RPC-over-RDMA Call message: @@ -1240,8 +1261,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) struct rpc_rqst *rqst = rep->rr_rqst; int status; - xprt->reestablish_timeout = 0; - switch (rep->rr_proc) { case rdma_msg: status = rpcrdma_decode_msg(r_xprt, rep, rqst); @@ -1300,6 +1319,12 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) u32 credits; __be32 *p; + /* Any data means we had a useful conversation, so + * then we don't need to delay the next reconnect. + */ + if (xprt->reestablish_timeout) + xprt->reestablish_timeout = 0; + /* Fixed transport header fields */ xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf, rep->rr_hdrbuf.head[0].iov_base, NULL); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 2ec349ed4770..160558b4135e 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -423,8 +423,6 @@ void xprt_rdma_close(struct rpc_xprt *xprt) if (ep->rep_connected == -ENODEV) return; - if (ep->rep_connected > 0) - xprt->reestablish_timeout = 0; rpcrdma_ep_disconnect(ep, ia); /* Prepare @xprt for the next connection by reinitializing @@ -434,6 +432,7 @@ void xprt_rdma_close(struct rpc_xprt *xprt) xprt->cwnd = RPC_CWNDSHIFT; out: + xprt->reestablish_timeout = 0; ++xprt->connect_cookie; xprt_disconnect_done(xprt); } @@ -494,9 +493,9 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task) * @reconnect_timeout: reconnect timeout after server disconnects * */ -static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt, - unsigned long connect_timeout, - unsigned long reconnect_timeout) +static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt, + unsigned long connect_timeout, + unsigned long reconnect_timeout) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); @@ -571,6 +570,7 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) return; out_sleep: + set_bit(XPRT_CONGESTED, &xprt->state); rpc_sleep_on(&xprt->backlog, task, NULL); task->tk_status = -EAGAIN; } @@ -589,7 +589,8 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst) memset(rqst, 0, sizeof(*rqst)); rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst)); - rpc_wake_up_next(&xprt->backlog); + if (unlikely(!rpc_wake_up_next(&xprt->backlog))) + clear_bit(XPRT_CONGESTED, &xprt->state); } static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt, @@ -803,7 +804,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = { .send_request = xprt_rdma_send_request, .close = xprt_rdma_close, .destroy = xprt_rdma_destroy, - .set_connect_timeout = xprt_rdma_tcp_set_connect_timeout, + .set_connect_timeout = xprt_rdma_set_connect_timeout, .print_stats = xprt_rdma_print_stats, .enable_swap = xprt_rdma_enable_swap, .disable_swap = xprt_rdma_disable_swap, diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index b10aa16557f0..3a907537e2cf 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -53,6 +53,7 @@ #include <linux/slab.h> #include <linux/sunrpc/addr.h> #include <linux/sunrpc/svc_rdma.h> +#include <linux/log2.h> #include <asm-generic/barrier.h> #include <asm/bitops.h> @@ -74,8 +75,10 @@ * internal functions */ static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); +static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf); static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf); +static void rpcrdma_mr_free(struct rpcrdma_mr *mr); static struct rpcrdma_regbuf * rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction, gfp_t flags); @@ -405,9 +408,8 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) struct rpcrdma_ep *ep = &r_xprt->rx_ep; struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_req *req; - struct rpcrdma_rep *rep; - cancel_delayed_work_sync(&buf->rb_refresh_worker); + cancel_work_sync(&buf->rb_refresh_worker); /* This is similar to rpcrdma_ep_destroy, but: * - Don't cancel the connect worker. @@ -429,8 +431,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) /* The ULP is responsible for ensuring all DMA * mappings and MRs are gone. */ - list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list) - rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf); + rpcrdma_reps_destroy(buf); list_for_each_entry(req, &buf->rb_allreqs, rl_all) { rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf); rpcrdma_regbuf_dma_unmap(req->rl_sendbuf); @@ -604,10 +605,10 @@ void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt) * Unlike a normal reconnection, a fresh PD and a new set * of MRs and buffers is needed. */ -static int -rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) +static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, + struct ib_qp_init_attr *qp_init_attr) { + struct rpcrdma_ia *ia = &r_xprt->rx_ia; int rc, err; trace_xprtrdma_reinsert(r_xprt); @@ -624,7 +625,7 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, } rc = -ENETUNREACH; - err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); + err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr); if (err) { pr_err("rpcrdma: rdma_create_qp returned %d\n", err); goto out3; @@ -641,16 +642,16 @@ out1: return rc; } -static int -rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, - struct rpcrdma_ia *ia) +static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, + struct ib_qp_init_attr *qp_init_attr) { + struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rdma_cm_id *id, *old; int err, rc; trace_xprtrdma_reconnect(r_xprt); - rpcrdma_ep_disconnect(ep, ia); + rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia); rc = -EHOSTUNREACH; id = rpcrdma_create_id(r_xprt, ia); @@ -672,7 +673,7 @@ rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, goto out_destroy; } - err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); + err = rdma_create_qp(id, ia->ri_pd, qp_init_attr); if (err) goto out_destroy; @@ -697,25 +698,27 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct ib_qp_init_attr qp_init_attr; int rc; retry: + memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr)); switch (ep->rep_connected) { case 0: dprintk("RPC: %s: connecting...\n", __func__); - rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); + rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr); if (rc) { rc = -ENETUNREACH; goto out_noupdate; } break; case -ENODEV: - rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia); + rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr); if (rc) goto out_noupdate; break; default: - rc = rpcrdma_ep_reconnect(r_xprt, ep, ia); + rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr); if (rc) goto out; } @@ -729,6 +732,8 @@ retry: if (rc) goto out; + if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) + xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0); if (ep->rep_connected <= 0) { if (ep->rep_connected == -EAGAIN) @@ -942,14 +947,12 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; unsigned int count; - LIST_HEAD(free); - LIST_HEAD(all); for (count = 0; count < ia->ri_max_segs; count++) { struct rpcrdma_mr *mr; int rc; - mr = kzalloc(sizeof(*mr), GFP_KERNEL); + mr = kzalloc(sizeof(*mr), GFP_NOFS); if (!mr) break; @@ -961,15 +964,13 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) mr->mr_xprt = r_xprt; - list_add(&mr->mr_list, &free); - list_add(&mr->mr_all, &all); + spin_lock(&buf->rb_lock); + list_add(&mr->mr_list, &buf->rb_mrs); + list_add(&mr->mr_all, &buf->rb_all_mrs); + spin_unlock(&buf->rb_lock); } - spin_lock(&buf->rb_mrlock); - list_splice(&free, &buf->rb_mrs); - list_splice(&all, &buf->rb_all); r_xprt->rx_stats.mrs_allocated += count; - spin_unlock(&buf->rb_mrlock); trace_xprtrdma_createmrs(r_xprt, count); } @@ -977,7 +978,7 @@ static void rpcrdma_mr_refresh_worker(struct work_struct *work) { struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, - rb_refresh_worker.work); + rb_refresh_worker); struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); @@ -999,12 +1000,18 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; struct rpcrdma_regbuf *rb; struct rpcrdma_req *req; + size_t maxhdrsize; req = kzalloc(sizeof(*req), flags); if (req == NULL) goto out1; - rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags); + /* Compute maximum header buffer size in bytes */ + maxhdrsize = rpcrdma_fixed_maxsz + 3 + + r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz; + maxhdrsize *= sizeof(__be32); + rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize), + DMA_TO_DEVICE, flags); if (!rb) goto out2; req->rl_rdmabuf = rb; @@ -1018,6 +1025,7 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, if (!req->rl_recvbuf) goto out4; + INIT_LIST_HEAD(&req->rl_free_mrs); INIT_LIST_HEAD(&req->rl_registered); spin_lock(&buffer->rb_lock); list_add(&req->rl_all, &buffer->rb_allreqs); @@ -1065,6 +1073,40 @@ out: return NULL; } +static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) +{ + rpcrdma_regbuf_free(rep->rr_rdmabuf); + kfree(rep); +} + +static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf) +{ + struct llist_node *node; + + /* Calls to llist_del_first are required to be serialized */ + node = llist_del_first(&buf->rb_free_reps); + if (!node) + return NULL; + return llist_entry(node, struct rpcrdma_rep, rr_node); +} + +static void rpcrdma_rep_put(struct rpcrdma_buffer *buf, + struct rpcrdma_rep *rep) +{ + if (!rep->rr_temp) + llist_add(&rep->rr_node, &buf->rb_free_reps); + else + rpcrdma_rep_destroy(rep); +} + +static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_rep *rep; + + while ((rep = rpcrdma_rep_get_locked(buf)) != NULL) + rpcrdma_rep_destroy(rep); +} + /** * rpcrdma_buffer_create - Create initial set of req/rep objects * @r_xprt: transport instance to (re)initialize @@ -1078,12 +1120,10 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests; buf->rb_bc_srv_max_requests = 0; - spin_lock_init(&buf->rb_mrlock); spin_lock_init(&buf->rb_lock); INIT_LIST_HEAD(&buf->rb_mrs); - INIT_LIST_HEAD(&buf->rb_all); - INIT_DELAYED_WORK(&buf->rb_refresh_worker, - rpcrdma_mr_refresh_worker); + INIT_LIST_HEAD(&buf->rb_all_mrs); + INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker); rpcrdma_mrs_create(r_xprt); @@ -1102,7 +1142,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) } buf->rb_credits = 1; - INIT_LIST_HEAD(&buf->rb_recv_bufs); + init_llist_head(&buf->rb_free_reps); rc = rpcrdma_sendctxs_create(r_xprt); if (rc) @@ -1114,12 +1154,6 @@ out: return rc; } -static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) -{ - rpcrdma_regbuf_free(rep->rr_rdmabuf); - kfree(rep); -} - /** * rpcrdma_req_destroy - Destroy an rpcrdma_req object * @req: unused object to be destroyed @@ -1127,11 +1161,13 @@ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) * This function assumes that the caller prevents concurrent device * unload and transport tear-down. */ -void -rpcrdma_req_destroy(struct rpcrdma_req *req) +void rpcrdma_req_destroy(struct rpcrdma_req *req) { list_del(&req->rl_all); + while (!list_empty(&req->rl_free_mrs)) + rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs)); + rpcrdma_regbuf_free(req->rl_recvbuf); rpcrdma_regbuf_free(req->rl_sendbuf); rpcrdma_regbuf_free(req->rl_rdmabuf); @@ -1147,25 +1183,19 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) unsigned int count; count = 0; - spin_lock(&buf->rb_mrlock); - while (!list_empty(&buf->rb_all)) { - mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all); + spin_lock(&buf->rb_lock); + while ((mr = list_first_entry_or_null(&buf->rb_all_mrs, + struct rpcrdma_mr, + mr_all)) != NULL) { list_del(&mr->mr_all); - - spin_unlock(&buf->rb_mrlock); - - /* Ensure MW is not on any rl_registered list */ - if (!list_empty(&mr->mr_list)) - list_del(&mr->mr_list); + spin_unlock(&buf->rb_lock); frwr_release_mr(mr); count++; - spin_lock(&buf->rb_mrlock); + spin_lock(&buf->rb_lock); } - spin_unlock(&buf->rb_mrlock); + spin_unlock(&buf->rb_lock); r_xprt->rx_stats.mrs_allocated = 0; - - dprintk("RPC: %s: released %u MRs\n", __func__, count); } /** @@ -1179,18 +1209,10 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { - cancel_delayed_work_sync(&buf->rb_refresh_worker); + cancel_work_sync(&buf->rb_refresh_worker); rpcrdma_sendctxs_destroy(buf); - - while (!list_empty(&buf->rb_recv_bufs)) { - struct rpcrdma_rep *rep; - - rep = list_first_entry(&buf->rb_recv_bufs, - struct rpcrdma_rep, rr_list); - list_del(&rep->rr_list); - rpcrdma_rep_destroy(rep); - } + rpcrdma_reps_destroy(buf); while (!list_empty(&buf->rb_send_bufs)) { struct rpcrdma_req *req; @@ -1215,54 +1237,20 @@ struct rpcrdma_mr * rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_mr *mr = NULL; - - spin_lock(&buf->rb_mrlock); - if (!list_empty(&buf->rb_mrs)) - mr = rpcrdma_mr_pop(&buf->rb_mrs); - spin_unlock(&buf->rb_mrlock); + struct rpcrdma_mr *mr; - if (!mr) - goto out_nomrs; + spin_lock(&buf->rb_lock); + mr = rpcrdma_mr_pop(&buf->rb_mrs); + spin_unlock(&buf->rb_lock); return mr; - -out_nomrs: - trace_xprtrdma_nomrs(r_xprt); - if (r_xprt->rx_ep.rep_connected != -ENODEV) - schedule_delayed_work(&buf->rb_refresh_worker, 0); - - /* Allow the reply handler and refresh worker to run */ - cond_resched(); - - return NULL; -} - -static void -__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr) -{ - spin_lock(&buf->rb_mrlock); - rpcrdma_mr_push(mr, &buf->rb_mrs); - spin_unlock(&buf->rb_mrlock); -} - -/** - * rpcrdma_mr_put - Release an rpcrdma_mr object - * @mr: object to release - * - */ -void -rpcrdma_mr_put(struct rpcrdma_mr *mr) -{ - __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr); } /** - * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it - * @mr: object to release + * rpcrdma_mr_put - DMA unmap an MR and release it + * @mr: MR to release * */ -void -rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) +void rpcrdma_mr_put(struct rpcrdma_mr *mr) { struct rpcrdma_xprt *r_xprt = mr->mr_xprt; @@ -1272,7 +1260,19 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) mr->mr_sg, mr->mr_nents, mr->mr_dir); mr->mr_dir = DMA_NONE; } - __rpcrdma_mr_put(&r_xprt->rx_buf, mr); + + rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs); +} + +static void rpcrdma_mr_free(struct rpcrdma_mr *mr) +{ + struct rpcrdma_xprt *r_xprt = mr->mr_xprt; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + + mr->mr_req = NULL; + spin_lock(&buf->rb_lock); + rpcrdma_mr_push(mr, &buf->rb_mrs); + spin_unlock(&buf->rb_lock); } /** @@ -1303,39 +1303,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) */ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) { - struct rpcrdma_rep *rep = req->rl_reply; - + if (req->rl_reply) + rpcrdma_rep_put(buffers, req->rl_reply); req->rl_reply = NULL; spin_lock(&buffers->rb_lock); list_add(&req->rl_list, &buffers->rb_send_bufs); - if (rep) { - if (!rep->rr_temp) { - list_add(&rep->rr_list, &buffers->rb_recv_bufs); - rep = NULL; - } - } spin_unlock(&buffers->rb_lock); - if (rep) - rpcrdma_rep_destroy(rep); } -/* - * Put reply buffers back into pool when not attached to - * request. This happens in error conditions. +/** + * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list + * @rep: rep to release + * + * Used after error conditions. */ -void -rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) +void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) { - struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; - - if (!rep->rr_temp) { - spin_lock(&buffers->rb_lock); - list_add(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock(&buffers->rb_lock); - } else { - rpcrdma_rep_destroy(rep); - } + rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep); } /* Returns a pointer to a rpcrdma_regbuf object, or NULL. @@ -1483,7 +1468,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) count = 0; needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); - if (ep->rep_receive_count > needed) + if (likely(ep->rep_receive_count > needed)) goto out; needed -= ep->rep_receive_count; if (!temp) @@ -1491,22 +1476,10 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) /* fast path: all needed reps can be found on the free list */ wr = NULL; - spin_lock(&buf->rb_lock); while (needed) { - rep = list_first_entry_or_null(&buf->rb_recv_bufs, - struct rpcrdma_rep, rr_list); + rep = rpcrdma_rep_get_locked(buf); if (!rep) - break; - - list_del(&rep->rr_list); - rep->rr_recv_wr.next = wr; - wr = &rep->rr_recv_wr; - --needed; - } - spin_unlock(&buf->rb_lock); - - while (needed) { - rep = rpcrdma_rep_create(r_xprt, temp); + rep = rpcrdma_rep_create(r_xprt, temp); if (!rep) break; @@ -1523,7 +1496,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) goto release_wrs; - trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe); + trace_xprtrdma_post_recv(rep); ++count; } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 92ce09fcea74..65e6b0eb862e 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -47,6 +47,7 @@ #include <linux/atomic.h> /* atomic_t, etc */ #include <linux/kref.h> /* struct kref */ #include <linux/workqueue.h> /* struct work_struct */ +#include <linux/llist.h> #include <rdma/rdma_cm.h> /* RDMA connection api */ #include <rdma/ib_verbs.h> /* RDMA verbs api */ @@ -117,9 +118,6 @@ struct rpcrdma_ep { #endif /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV - * - * The below structure appears at the front of a large region of kmalloc'd - * memory, which always starts on a good alignment boundary. */ struct rpcrdma_regbuf { @@ -158,25 +156,22 @@ static inline void *rdmab_data(const struct rpcrdma_regbuf *rb) /* To ensure a transport can always make forward progress, * the number of RDMA segments allowed in header chunk lists - * is capped at 8. This prevents less-capable devices and - * memory registrations from overrunning the Send buffer - * while building chunk lists. + * is capped at 16. This prevents less-capable devices from + * overrunning the Send buffer while building chunk lists. * * Elements of the Read list take up more room than the - * Write list or Reply chunk. 8 read segments means the Read - * list (or Write list or Reply chunk) cannot consume more - * than - * - * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes. + * Write list or Reply chunk. 16 read segments means the + * chunk lists cannot consume more than * - * And the fixed part of the header is another 24 bytes. + * ((16 + 2) * read segment size) + 1 XDR words, * - * The smallest inline threshold is 1024 bytes, ensuring that - * at least 750 bytes are available for RPC messages. + * or about 400 bytes. The fixed part of the header is + * another 24 bytes. Thus when the inline threshold is + * 1024 bytes, at least 600 bytes are available for RPC + * message bodies. */ enum { - RPCRDMA_MAX_HDR_SEGS = 8, - RPCRDMA_HDRBUF_SIZE = 256, + RPCRDMA_MAX_HDR_SEGS = 16, }; /* @@ -206,7 +201,7 @@ struct rpcrdma_rep { struct rpc_rqst *rr_rqst; struct xdr_buf rr_hdrbuf; struct xdr_stream rr_stream; - struct list_head rr_list; + struct llist_node rr_node; struct ib_recv_wr rr_recv_wr; }; @@ -240,20 +235,20 @@ struct rpcrdma_sendctx { * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). */ -struct rpcrdma_req; struct rpcrdma_frwr { struct ib_mr *fr_mr; struct ib_cqe fr_cqe; struct completion fr_linv_done; - struct rpcrdma_req *fr_req; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; }; }; +struct rpcrdma_req; struct rpcrdma_mr { struct list_head mr_list; + struct rpcrdma_req *mr_req; struct scatterlist *mr_sg; int mr_nents; enum dma_data_direction mr_dir; @@ -331,7 +326,8 @@ struct rpcrdma_req { struct list_head rl_all; struct kref rl_kref; - struct list_head rl_registered; /* registered segments */ + struct list_head rl_free_mrs; + struct list_head rl_registered; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; @@ -344,7 +340,7 @@ rpcr_to_rdmar(const struct rpc_rqst *rqst) static inline void rpcrdma_mr_push(struct rpcrdma_mr *mr, struct list_head *list) { - list_add_tail(&mr->mr_list, list); + list_add(&mr->mr_list, list); } static inline struct rpcrdma_mr * @@ -352,8 +348,9 @@ rpcrdma_mr_pop(struct list_head *list) { struct rpcrdma_mr *mr; - mr = list_first_entry(list, struct rpcrdma_mr, mr_list); - list_del_init(&mr->mr_list); + mr = list_first_entry_or_null(list, struct rpcrdma_mr, mr_list); + if (mr) + list_del_init(&mr->mr_list); return mr; } @@ -364,19 +361,19 @@ rpcrdma_mr_pop(struct list_head *list) * One of these is associated with a transport instance */ struct rpcrdma_buffer { - spinlock_t rb_mrlock; /* protect rb_mrs list */ + spinlock_t rb_lock; + struct list_head rb_send_bufs; struct list_head rb_mrs; - struct list_head rb_all; unsigned long rb_sc_head; unsigned long rb_sc_tail; unsigned long rb_sc_last; struct rpcrdma_sendctx **rb_sc_ctxs; - spinlock_t rb_lock; /* protect buf lists */ - struct list_head rb_send_bufs; - struct list_head rb_recv_bufs; struct list_head rb_allreqs; + struct list_head rb_all_mrs; + + struct llist_head rb_free_reps; u32 rb_max_requests; u32 rb_credits; /* most recent credit grant */ @@ -384,7 +381,7 @@ struct rpcrdma_buffer { u32 rb_bc_srv_max_requests; u32 rb_bc_max_requests; - struct delayed_work rb_refresh_worker; + struct work_struct rb_refresh_worker; }; /* @@ -490,7 +487,6 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt); struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt); void rpcrdma_mr_put(struct rpcrdma_mr *mr); -void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr); static inline void rpcrdma_mr_recycle(struct rpcrdma_mr *mr) @@ -546,6 +542,7 @@ rpcrdma_data_dir(bool writing) /* Memory registration calls xprtrdma/frwr_ops.c */ bool frwr_is_supported(struct ib_device *device); +void frwr_recycle(struct rpcrdma_req *req); void frwr_reset(struct rpcrdma_req *req); int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep); int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr); @@ -554,7 +551,7 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt); struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, __be32 xid, - struct rpcrdma_mr **mr); + struct rpcrdma_mr *mr); int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req); void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index e2176c167a57..9ac88722fa83 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -562,10 +562,14 @@ xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) printk(KERN_WARNING "Callback slot table overflowed\n"); return -ESHUTDOWN; } + if (transport->recv.copied && !req->rq_private_buf.len) + return -ESHUTDOWN; ret = xs_read_stream_request(transport, msg, flags, req); if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) xprt_complete_bc_request(req, transport->recv.copied); + else + req->rq_private_buf.len = transport->recv.copied; return ret; } @@ -587,7 +591,7 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) /* Look up and lock the request corresponding to the given XID */ spin_lock(&xprt->queue_lock); req = xprt_lookup_rqst(xprt, transport->recv.xid); - if (!req) { + if (!req || (transport->recv.copied && !req->rq_private_buf.len)) { msg->msg_flags |= MSG_TRUNC; goto out; } @@ -599,6 +603,8 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) spin_lock(&xprt->queue_lock); if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) xprt_complete_rqst(req->rq_task, transport->recv.copied); + else + req->rq_private_buf.len = transport->recv.copied; xprt_unpin_rqst(req); out: spin_unlock(&xprt->queue_lock); |