diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2021-08-31 10:57:06 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2021-08-31 10:57:06 -0700 |
commit | 8bda95577627dc0633c48d581ea3605c27efe829 (patch) | |
tree | 6fbf3fb021e9ee67d4f14eb19f3ff2059912e506 /net/sunrpc | |
parent | 4529fb1546b9cd3f5dbd8a36595aa4159331c963 (diff) | |
parent | 0bcc7ca40bd823193224e9f38bafbd8325aaf566 (diff) |
Merge tag 'nfsd-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux
Pull nfsd updates from Chuck Lever:
"New features:
- Support for server-side disconnect injection via debugfs
- Protocol definitions for new RPC_AUTH_TLS authentication flavor
Performance improvements:
- Reduce page allocator traffic in the NFSD splice read actor
- Reduce CPU utilization in svcrdma's Send completion handler
Notable bug fixes:
- Stabilize lockd operation when re-exporting NFS mounts
- Fix the use of %.*s in NFSD tracepoints
- Fix /proc/sys/fs/nfs/nsm_use_hostnames"
* tag 'nfsd-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux: (31 commits)
nfsd: fix crash on LOCKT on reexported NFSv3
nfs: don't allow reexport reclaims
lockd: don't attempt blocking locks on nfs reexports
nfs: don't atempt blocking locks on nfs reexports
Keep read and write fds with each nlm_file
lockd: update nlm_lookup_file reexport comment
nlm: minor refactoring
nlm: minor nlm_lookup_file argument change
lockd: lockd server-side shouldn't set fl_ops
SUNRPC: Add documentation for the fail_sunrpc/ directory
SUNRPC: Server-side disconnect injection
SUNRPC: Move client-side disconnect injection
SUNRPC: Add a /sys/kernel/debug/fail_sunrpc/ directory
svcrdma: xpt_bc_xprt is already clear in __svc_rdma_free()
nfsd4: Fix forced-expiry locking
rpc: fix gss_svc_init cleanup on failure
SUNRPC: Add RPC_AUTH_TLS protocol numbers
lockd: change the proc_handler for nsm_use_hostnames
sysctl: introduce new proc handler proc_dobool
SUNRPC: Fix a NULL pointer deref in trace_svc_stats_latency()
...
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth_gss/svcauth_gss.c | 2 | ||||
-rw-r--r-- | net/sunrpc/debugfs.c | 73 | ||||
-rw-r--r-- | net/sunrpc/fail.h | 25 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 44 | ||||
-rw-r--r-- | net/sunrpc/svc_xprt.c | 3 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 14 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_rw.c | 56 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 41 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 11 |
9 files changed, 169 insertions, 100 deletions
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c index a81be45f40d9..3d685fe328fa 100644 --- a/net/sunrpc/auth_gss/svcauth_gss.c +++ b/net/sunrpc/auth_gss/svcauth_gss.c @@ -1980,7 +1980,7 @@ gss_svc_init_net(struct net *net) goto out2; return 0; out2: - destroy_use_gss_proxy_proc_entry(net); + rsi_cache_destroy_net(net); out1: rsc_cache_destroy_net(net); return rv; diff --git a/net/sunrpc/debugfs.c b/net/sunrpc/debugfs.c index 56029e3af6ff..827bf3a28178 100644 --- a/net/sunrpc/debugfs.c +++ b/net/sunrpc/debugfs.c @@ -8,14 +8,14 @@ #include <linux/debugfs.h> #include <linux/sunrpc/sched.h> #include <linux/sunrpc/clnt.h> + #include "netns.h" +#include "fail.h" static struct dentry *topdir; static struct dentry *rpc_clnt_dir; static struct dentry *rpc_xprt_dir; -unsigned int rpc_inject_disconnect; - static int tasks_show(struct seq_file *f, void *v) { @@ -235,8 +235,6 @@ rpc_xprt_debugfs_register(struct rpc_xprt *xprt) /* make tasks file */ debugfs_create_file("info", S_IFREG | 0400, xprt->debugfs, xprt, &xprt_info_fops); - - atomic_set(&xprt->inject_disconnect, rpc_inject_disconnect); } void @@ -246,56 +244,30 @@ rpc_xprt_debugfs_unregister(struct rpc_xprt *xprt) xprt->debugfs = NULL; } -static int -fault_open(struct inode *inode, struct file *filp) -{ - filp->private_data = kmalloc(128, GFP_KERNEL); - if (!filp->private_data) - return -ENOMEM; - return 0; -} +#if IS_ENABLED(CONFIG_FAIL_SUNRPC) +struct fail_sunrpc_attr fail_sunrpc = { + .attr = FAULT_ATTR_INITIALIZER, +}; +EXPORT_SYMBOL_GPL(fail_sunrpc); -static int -fault_release(struct inode *inode, struct file *filp) +static void fail_sunrpc_init(void) { - kfree(filp->private_data); - return 0; -} + struct dentry *dir; -static ssize_t -fault_disconnect_read(struct file *filp, char __user *user_buf, - size_t len, loff_t *offset) -{ - char *buffer = (char *)filp->private_data; - size_t size; + dir = fault_create_debugfs_attr("fail_sunrpc", NULL, + &fail_sunrpc.attr); - size = sprintf(buffer, "%u\n", rpc_inject_disconnect); - return simple_read_from_buffer(user_buf, len, offset, buffer, size); -} + debugfs_create_bool("ignore-client-disconnect", S_IFREG | 0600, dir, + &fail_sunrpc.ignore_client_disconnect); -static ssize_t -fault_disconnect_write(struct file *filp, const char __user *user_buf, - size_t len, loff_t *offset) + debugfs_create_bool("ignore-server-disconnect", S_IFREG | 0600, dir, + &fail_sunrpc.ignore_server_disconnect); +} +#else +static void fail_sunrpc_init(void) { - char buffer[16]; - - if (len >= sizeof(buffer)) - len = sizeof(buffer) - 1; - if (copy_from_user(buffer, user_buf, len)) - return -EFAULT; - buffer[len] = '\0'; - if (kstrtouint(buffer, 10, &rpc_inject_disconnect)) - return -EINVAL; - return len; } - -static const struct file_operations fault_disconnect_fops = { - .owner = THIS_MODULE, - .open = fault_open, - .read = fault_disconnect_read, - .write = fault_disconnect_write, - .release = fault_release, -}; +#endif void __exit sunrpc_debugfs_exit(void) @@ -309,16 +281,11 @@ sunrpc_debugfs_exit(void) void __init sunrpc_debugfs_init(void) { - struct dentry *rpc_fault_dir; - topdir = debugfs_create_dir("sunrpc", NULL); rpc_clnt_dir = debugfs_create_dir("rpc_clnt", topdir); rpc_xprt_dir = debugfs_create_dir("rpc_xprt", topdir); - rpc_fault_dir = debugfs_create_dir("inject_fault", topdir); - - debugfs_create_file("disconnect", S_IFREG | 0400, rpc_fault_dir, NULL, - &fault_disconnect_fops); + fail_sunrpc_init(); } diff --git a/net/sunrpc/fail.h b/net/sunrpc/fail.h new file mode 100644 index 000000000000..69dc30cc44b8 --- /dev/null +++ b/net/sunrpc/fail.h @@ -0,0 +1,25 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Copyright (C) 2021, Oracle. All rights reserved. + */ + +#ifndef _NET_SUNRPC_FAIL_H_ +#define _NET_SUNRPC_FAIL_H_ + +#include <linux/fault-inject.h> + +#if IS_ENABLED(CONFIG_FAULT_INJECTION) + +struct fail_sunrpc_attr { + struct fault_attr attr; + + bool ignore_client_disconnect; + + bool ignore_server_disconnect; +}; + +extern struct fail_sunrpc_attr fail_sunrpc; + +#endif /* CONFIG_FAULT_INJECTION */ + +#endif /* _NET_SUNRPC_FAIL_H_ */ diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 0de918cb3d90..bfcbaf7b3822 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -31,6 +31,8 @@ #include <trace/events/sunrpc.h> +#include "fail.h" + #define RPCDBG_FACILITY RPCDBG_SVCDSP static void svc_unregister(const struct svc_serv *serv, struct net *net); @@ -838,6 +840,27 @@ svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrser } EXPORT_SYMBOL_GPL(svc_set_num_threads_sync); +/** + * svc_rqst_replace_page - Replace one page in rq_pages[] + * @rqstp: svc_rqst with pages to replace + * @page: replacement page + * + * When replacing a page in rq_pages, batch the release of the + * replaced pages to avoid hammering the page allocator. + */ +void svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page) +{ + if (*rqstp->rq_next_page) { + if (!pagevec_space(&rqstp->rq_pvec)) + __pagevec_release(&rqstp->rq_pvec); + pagevec_add(&rqstp->rq_pvec, *rqstp->rq_next_page); + } + + get_page(page); + *(rqstp->rq_next_page++) = page; +} +EXPORT_SYMBOL_GPL(svc_rqst_replace_page); + /* * Called from a server thread as it's exiting. Caller must hold the "service * mutex" for the service. @@ -1503,6 +1526,12 @@ svc_process(struct svc_rqst *rqstp) struct svc_serv *serv = rqstp->rq_server; u32 dir; +#if IS_ENABLED(CONFIG_FAIL_SUNRPC) + if (!fail_sunrpc.ignore_server_disconnect && + should_fail(&fail_sunrpc.attr, 1)) + svc_xprt_deferred_close(rqstp->rq_xprt); +#endif + /* * Setup response xdr_buf. * Initially it has just one page @@ -1630,6 +1659,21 @@ u32 svc_max_payload(const struct svc_rqst *rqstp) EXPORT_SYMBOL_GPL(svc_max_payload); /** + * svc_proc_name - Return RPC procedure name in string form + * @rqstp: svc_rqst to operate on + * + * Return value: + * Pointer to a NUL-terminated string + */ +const char *svc_proc_name(const struct svc_rqst *rqstp) +{ + if (rqstp && rqstp->rq_procinfo) + return rqstp->rq_procinfo->pc_name; + return "unknown"; +} + + +/** * svc_encode_result_payload - mark a range of bytes as a result payload * @rqstp: svc_rqst to operate on * @offset: payload's byte offset in rqstp->rq_res diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index dbb41821b1b8..e1153cba9cc6 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -539,6 +539,7 @@ static void svc_xprt_release(struct svc_rqst *rqstp) kfree(rqstp->rq_deferred); rqstp->rq_deferred = NULL; + pagevec_release(&rqstp->rq_pvec); svc_free_res_pages(rqstp); rqstp->rq_res.page_len = 0; rqstp->rq_res.page_base = 0; @@ -664,6 +665,8 @@ static int svc_alloc_arg(struct svc_rqst *rqstp) struct xdr_buf *arg = &rqstp->rq_arg; unsigned long pages, filled; + pagevec_init(&rqstp->rq_pvec); + pages = (serv->sv_max_mesg + 2 * PAGE_SIZE) >> PAGE_SHIFT; if (pages > RPCSVC_MAXPAGES) { pr_warn_once("svc: warning: pages=%lu > RPCSVC_MAXPAGES=%lu\n", diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index fb6db09725c7..05abe344a269 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -56,6 +56,7 @@ #include "sunrpc.h" #include "sysfs.h" +#include "fail.h" /* * Local variables @@ -855,6 +856,19 @@ xprt_init_autodisconnect(struct timer_list *t) queue_work(xprtiod_workqueue, &xprt->task_cleanup); } +#if IS_ENABLED(CONFIG_FAIL_SUNRPC) +static void xprt_inject_disconnect(struct rpc_xprt *xprt) +{ + if (!fail_sunrpc.ignore_client_disconnect && + should_fail(&fail_sunrpc.attr, 1)) + xprt->ops->inject_disconnect(xprt); +} +#else +static inline void xprt_inject_disconnect(struct rpc_xprt *xprt) +{ +} +#endif + bool xprt_lock_connect(struct rpc_xprt *xprt, struct rpc_task *task, void *cookie) diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index 1e651447dc4e..e27433f08ca7 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -35,6 +35,7 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc); * controlling svcxprt_rdma is destroyed. */ struct svc_rdma_rw_ctxt { + struct llist_node rw_node; struct list_head rw_list; struct rdma_rw_ctx rw_ctx; unsigned int rw_nents; @@ -53,19 +54,19 @@ static struct svc_rdma_rw_ctxt * svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges) { struct svc_rdma_rw_ctxt *ctxt; + struct llist_node *node; spin_lock(&rdma->sc_rw_ctxt_lock); - - ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts); - if (ctxt) { - list_del(&ctxt->rw_list); - spin_unlock(&rdma->sc_rw_ctxt_lock); + node = llist_del_first(&rdma->sc_rw_ctxts); + spin_unlock(&rdma->sc_rw_ctxt_lock); + if (node) { + ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node); } else { - spin_unlock(&rdma->sc_rw_ctxt_lock); ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE), GFP_KERNEL); if (!ctxt) goto out_noctx; + INIT_LIST_HEAD(&ctxt->rw_list); } @@ -83,14 +84,18 @@ out_noctx: return NULL; } -static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma, - struct svc_rdma_rw_ctxt *ctxt) +static void __svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma, + struct svc_rdma_rw_ctxt *ctxt, + struct llist_head *list) { sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE); + llist_add(&ctxt->rw_node, list); +} - spin_lock(&rdma->sc_rw_ctxt_lock); - list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts); - spin_unlock(&rdma->sc_rw_ctxt_lock); +static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma, + struct svc_rdma_rw_ctxt *ctxt) +{ + __svc_rdma_put_rw_ctxt(rdma, ctxt, &rdma->sc_rw_ctxts); } /** @@ -101,9 +106,10 @@ static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma, void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma) { struct svc_rdma_rw_ctxt *ctxt; + struct llist_node *node; - while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) { - list_del(&ctxt->rw_list); + while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) { + ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node); kfree(ctxt); } } @@ -171,20 +177,35 @@ static void svc_rdma_cc_init(struct svcxprt_rdma *rdma, cc->cc_sqecount = 0; } +/* + * The consumed rw_ctx's are cleaned and placed on a local llist so + * that only one atomic llist operation is needed to put them all + * back on the free list. + */ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc, enum dma_data_direction dir) { struct svcxprt_rdma *rdma = cc->cc_rdma; + struct llist_node *first, *last; struct svc_rdma_rw_ctxt *ctxt; + LLIST_HEAD(free); + first = last = NULL; while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) { list_del(&ctxt->rw_list); rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num, ctxt->rw_sg_table.sgl, ctxt->rw_nents, dir); - svc_rdma_put_rw_ctxt(rdma, ctxt); + __svc_rdma_put_rw_ctxt(rdma, ctxt, &free); + + ctxt->rw_node.next = first; + first = &ctxt->rw_node; + if (!last) + last = first; } + if (first) + llist_add_batch(first, last, &rdma->sc_rw_ctxts); } /* State for sending a Write or Reply chunk. @@ -248,8 +269,7 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) trace_svcrdma_wc_write(wc, &cc->cc_cid); - atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); - wake_up(&rdma->sc_send_wait); + svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount); if (unlikely(wc->status != IB_WC_SUCCESS)) svc_xprt_deferred_close(&rdma->sc_xprt); @@ -304,9 +324,7 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) trace_svcrdma_wc_read(wc, &cc->cc_cid); - atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); - wake_up(&rdma->sc_send_wait); - + svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount); cc->cc_status = wc->status; complete(&cc->cc_done); return; diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index d6bbafb773e1..599021b2391d 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -113,13 +113,6 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); -static inline struct svc_rdma_send_ctxt * -svc_rdma_next_send_ctxt(struct list_head *list) -{ - return list_first_entry_or_null(list, struct svc_rdma_send_ctxt, - sc_list); -} - static void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma, struct rpc_rdma_cid *cid) { @@ -182,9 +175,10 @@ fail0: void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) { struct svc_rdma_send_ctxt *ctxt; + struct llist_node *node; - while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) { - list_del(&ctxt->sc_list); + while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) { + ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node); ib_dma_unmap_single(rdma->sc_pd->device, ctxt->sc_sges[0].addr, rdma->sc_max_req_size, @@ -204,12 +198,13 @@ void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) { struct svc_rdma_send_ctxt *ctxt; + struct llist_node *node; spin_lock(&rdma->sc_send_lock); - ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts); - if (!ctxt) + node = llist_del_first(&rdma->sc_send_ctxts); + if (!node) goto out_empty; - list_del(&ctxt->sc_list); + ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node); spin_unlock(&rdma->sc_send_lock); out: @@ -253,9 +248,21 @@ void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, ctxt->sc_sges[i].length); } - spin_lock(&rdma->sc_send_lock); - list_add(&ctxt->sc_list, &rdma->sc_send_ctxts); - spin_unlock(&rdma->sc_send_lock); + llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts); +} + +/** + * svc_rdma_wake_send_waiters - manage Send Queue accounting + * @rdma: controlling transport + * @avail: Number of additional SQEs that are now available + * + */ +void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail) +{ + atomic_add(avail, &rdma->sc_sq_avail); + smp_mb__after_atomic(); + if (unlikely(waitqueue_active(&rdma->sc_send_wait))) + wake_up(&rdma->sc_send_wait); } /** @@ -275,11 +282,9 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) trace_svcrdma_wc_send(wc, &ctxt->sc_cid); + svc_rdma_wake_send_waiters(rdma, 1); complete(&ctxt->sc_done); - atomic_inc(&rdma->sc_sq_avail); - wake_up(&rdma->sc_send_wait); - if (unlikely(wc->status != IB_WC_SUCCESS)) svc_xprt_deferred_close(&rdma->sc_xprt); } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index d94b7759ada1..94b20fb47135 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -136,9 +136,9 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv); INIT_LIST_HEAD(&cma_xprt->sc_accept_q); INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); - INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts); + init_llist_head(&cma_xprt->sc_send_ctxts); init_llist_head(&cma_xprt->sc_recv_ctxts); - INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); + init_llist_head(&cma_xprt->sc_rw_ctxts); init_waitqueue_head(&cma_xprt->sc_send_wait); spin_lock_init(&cma_xprt->sc_lock); @@ -545,7 +545,6 @@ static void __svc_rdma_free(struct work_struct *work) { struct svcxprt_rdma *rdma = container_of(work, struct svcxprt_rdma, sc_work); - struct svc_xprt *xprt = &rdma->sc_xprt; /* This blocks until the Completion Queues are empty */ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) @@ -553,12 +552,6 @@ static void __svc_rdma_free(struct work_struct *work) svc_rdma_flush_recv_queues(rdma); - /* Final put of backchannel client transport */ - if (xprt->xpt_bc_xprt) { - xprt_put(xprt->xpt_bc_xprt); - xprt->xpt_bc_xprt = NULL; - } - svc_rdma_destroy_rw_ctxts(rdma); svc_rdma_send_ctxts_destroy(rdma); svc_rdma_recv_ctxts_destroy(rdma); |