summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChuck Lever <chuck.lever@oracle.com>2023-07-19 14:31:03 -0400
committerChuck Lever <chuck.lever@oracle.com>2023-08-29 17:45:22 -0400
commit2eb2b93581813b74c7174961126f6ec38eadb5a7 (patch)
treefb853ebade59659620ed52b25bf3cc8bf58e0d18
parentd424797032c6e24b44037e6c7a2d32fd958300f0 (diff)
SUNRPC: Convert svc_tcp_sendmsg to use bio_vecs directly
Add a helper to convert a whole xdr_buf directly into an array of bio_vecs, then send this array instead of iterating piecemeal over the xdr_buf containing the outbound RPC message. Reviewed-by: David Howells <dhowells@redhat.com> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
-rw-r--r--include/linux/sunrpc/xdr.h2
-rw-r--r--net/sunrpc/svcsock.c64
-rw-r--r--net/sunrpc/xdr.c50
3 files changed, 72 insertions, 44 deletions
diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
index f89ec4b5ea16..42f9d7eb9a1a 100644
--- a/include/linux/sunrpc/xdr.h
+++ b/include/linux/sunrpc/xdr.h
@@ -139,6 +139,8 @@ void xdr_terminate_string(const struct xdr_buf *, const u32);
size_t xdr_buf_pagecount(const struct xdr_buf *buf);
int xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp);
void xdr_free_bvec(struct xdr_buf *buf);
+unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
+ const struct xdr_buf *xdr);
static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int len)
{
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 589020ed909d..90b1ab95c223 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -36,6 +36,8 @@
#include <linux/skbuff.h>
#include <linux/file.h>
#include <linux/freezer.h>
+#include <linux/bvec.h>
+
#include <net/sock.h>
#include <net/checksum.h>
#include <net/ip.h>
@@ -1194,77 +1196,52 @@ err_noclose:
return 0; /* record not complete */
}
-static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
- int flags)
-{
- struct msghdr msg = { .msg_flags = MSG_SPLICE_PAGES | flags, };
-
- iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
- return sock_sendmsg(sock, &msg);
-}
-
/*
* MSG_SPLICE_PAGES is used exclusively to reduce the number of
* copy operations in this path. Therefore the caller must ensure
* that the pages backing @xdr are unchanging.
*
- * In addition, the logic assumes that * .bv_len is never larger
- * than PAGE_SIZE.
+ * Note that the send is non-blocking. The caller has incremented
+ * the reference count on each page backing the RPC message, and
+ * the network layer will "put" these pages when transmission is
+ * complete.
+ *
+ * This is safe for our RPC services because the memory backing
+ * the head and tail components is never kmalloc'd. These always
+ * come from pages in the svc_rqst::rq_pages array.
*/
-static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
+static int svc_tcp_sendmsg(struct svc_sock *svsk, struct svc_rqst *rqstp,
rpc_fraghdr marker, unsigned int *sentp)
{
- const struct kvec *head = xdr->head;
- const struct kvec *tail = xdr->tail;
struct kvec rm = {
.iov_base = &marker,
.iov_len = sizeof(marker),
};
struct msghdr msg = {
- .msg_flags = 0,
+ .msg_flags = MSG_MORE,
};
+ unsigned int count;
int ret;
*sentp = 0;
- ret = xdr_alloc_bvec(xdr, GFP_KERNEL);
- if (ret < 0)
- return ret;
- ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
+ ret = kernel_sendmsg(svsk->sk_sock, &msg, &rm, 1, rm.iov_len);
if (ret < 0)
return ret;
*sentp += ret;
if (ret != rm.iov_len)
return -EAGAIN;
- ret = svc_tcp_send_kvec(sock, head, 0);
- if (ret < 0)
- return ret;
- *sentp += ret;
- if (ret != head->iov_len)
- goto out;
-
- if (xdr_buf_pagecount(xdr)) {
- xdr->bvec[0].bv_offset = offset_in_page(xdr->page_base);
- xdr->bvec[0].bv_len -= offset_in_page(xdr->page_base);
- }
+ count = xdr_buf_to_bvec(rqstp->rq_bvec, ARRAY_SIZE(rqstp->rq_bvec),
+ &rqstp->rq_res);
msg.msg_flags = MSG_SPLICE_PAGES;
- iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, xdr->bvec,
- xdr_buf_pagecount(xdr), xdr->page_len);
- ret = sock_sendmsg(sock, &msg);
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
+ count, rqstp->rq_res.len);
+ ret = sock_sendmsg(svsk->sk_sock, &msg);
if (ret < 0)
return ret;
*sentp += ret;
-
- if (tail->iov_len) {
- ret = svc_tcp_send_kvec(sock, tail, 0);
- if (ret < 0)
- return ret;
- *sentp += ret;
- }
-
-out:
return 0;
}
@@ -1295,8 +1272,7 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
if (svc_xprt_is_dead(xprt))
goto out_notconn;
tcp_sock_set_cork(svsk->sk_sk, true);
- err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
- xdr_free_bvec(xdr);
+ err = svc_tcp_sendmsg(svsk, rqstp, marker, &sent);
trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
if (err < 0 || sent != (xdr->len + sizeof(marker)))
goto out_close;
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 2a22e78af116..358e6de91775 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -165,6 +165,56 @@ xdr_free_bvec(struct xdr_buf *buf)
}
/**
+ * xdr_buf_to_bvec - Copy components of an xdr_buf into a bio_vec array
+ * @bvec: bio_vec array to populate
+ * @bvec_size: element count of @bio_vec
+ * @xdr: xdr_buf to be copied
+ *
+ * Returns the number of entries consumed in @bvec.
+ */
+unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
+ const struct xdr_buf *xdr)
+{
+ const struct kvec *head = xdr->head;
+ const struct kvec *tail = xdr->tail;
+ unsigned int count = 0;
+
+ if (head->iov_len) {
+ bvec_set_virt(bvec++, head->iov_base, head->iov_len);
+ ++count;
+ }
+
+ if (xdr->page_len) {
+ unsigned int offset, len, remaining;
+ struct page **pages = xdr->pages;
+
+ offset = offset_in_page(xdr->page_base);
+ remaining = xdr->page_len;
+ while (remaining > 0) {
+ len = min_t(unsigned int, remaining,
+ PAGE_SIZE - offset);
+ bvec_set_page(bvec++, *pages++, len, offset);
+ remaining -= len;
+ offset = 0;
+ if (unlikely(++count > bvec_size))
+ goto bvec_overflow;
+ }
+ }
+
+ if (tail->iov_len) {
+ bvec_set_virt(bvec, tail->iov_base, tail->iov_len);
+ if (unlikely(++count > bvec_size))
+ goto bvec_overflow;
+ }
+
+ return count;
+
+bvec_overflow:
+ pr_warn_once("%s: bio_vec array overflow\n", __func__);
+ return count - 1;
+}
+
+/**
* xdr_inline_pages - Prepare receive buffer for a large reply
* @xdr: xdr_buf into which reply will be placed
* @offset: expected offset where data payload will start, in bytes