* [PATCH v2 01/12] xprtrdma: rpcrdma_bc_receive_call() should init rq_private_buf.len
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
@ 2016-02-18 17:30 ` Chuck Lever
2016-02-18 17:30 ` [PATCH v2 02/12] xprtrdma: Clean up unused RPCRDMA_INLINE_PAD_THRESH macro Chuck Lever
` (10 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:30 UTC (permalink / raw)
To: linux-rdma, linux-nfs
Some NFSv4.1 OPEN requests were hanging waiting for the NFS server
to finish recalling delegations. Turns out that each NFSv4.1 CB
request on RDMA gets a GARBAGE_ARGS reply from the Linux client.
Commit 756b9b37cfb2e3dc added a line in bc_svc_process that
overwrites the incoming rq_rcv_buf's length with the value in
rq_private_buf.len. But rpcrdma_bc_receive_call() does not invoke
xprt_complete_bc_request(), thus rq_private_buf.len is not
initialized. svc_process_common() is invoked with a zero-length
RPC message, and fails.
Fixes: 756b9b37cfb2e3dc ('SUNRPC: Fix callback channel')
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
net/sunrpc/xprtrdma/backchannel.c | 2 ++
1 file changed, 2 insertions(+)
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index cc1251d..2dcd764 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -341,6 +341,8 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
rqst->rq_reply_bytes_recvd = 0;
rqst->rq_bytes_sent = 0;
rqst->rq_xid = headerp->rm_xid;
+
+ rqst->rq_private_buf.len = size;
set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
buf = &rqst->rq_rcv_buf;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 02/12] xprtrdma: Clean up unused RPCRDMA_INLINE_PAD_THRESH macro
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
2016-02-18 17:30 ` [PATCH v2 01/12] xprtrdma: rpcrdma_bc_receive_call() should init rq_private_buf.len Chuck Lever
@ 2016-02-18 17:30 ` Chuck Lever
2016-02-18 17:30 ` [PATCH v2 03/12] xprtrdma: Clean up physical_op_map() Chuck Lever
` (9 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:30 UTC (permalink / raw)
To: linux-rdma, linux-nfs
Fixes: b3221d6a53c4 ('xprtrdma: Remove logic that constructs...')
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
include/linux/sunrpc/xprtrdma.h | 2 --
1 file changed, 2 deletions(-)
diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index b7b279b..767190b 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -54,8 +54,6 @@
#define RPCRDMA_DEF_INLINE (1024) /* default inline max */
-#define RPCRDMA_INLINE_PAD_THRESH (512)/* payload threshold to pad (bytes) */
-
/* Memory registration strategies, by number.
* This is part of a kernel / user space API. Do not remove. */
enum rpcrdma_memreg {
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 03/12] xprtrdma: Clean up physical_op_map()
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
2016-02-18 17:30 ` [PATCH v2 01/12] xprtrdma: rpcrdma_bc_receive_call() should init rq_private_buf.len Chuck Lever
2016-02-18 17:30 ` [PATCH v2 02/12] xprtrdma: Clean up unused RPCRDMA_INLINE_PAD_THRESH macro Chuck Lever
@ 2016-02-18 17:30 ` Chuck Lever
2016-02-18 17:30 ` [PATCH v2 04/12] xprtrdma: Clean up dprintk format string containing a newline Chuck Lever
` (8 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:30 UTC (permalink / raw)
To: linux-rdma, linux-nfs
physical_op_unmap{_sync} don't use mr_nsegs, so don't bother to set
it in physical_op_map.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
net/sunrpc/xprtrdma/physical_ops.c | 1 -
1 file changed, 1 deletion(-)
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index dbb302e..481b9b6 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -68,7 +68,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
seg->mr_rkey = ia->ri_dma_mr->rkey;
seg->mr_base = seg->mr_dma;
- seg->mr_nsegs = 1;
return 1;
}
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 04/12] xprtrdma: Clean up dprintk format string containing a newline
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (2 preceding siblings ...)
2016-02-18 17:30 ` [PATCH v2 03/12] xprtrdma: Clean up physical_op_map() Chuck Lever
@ 2016-02-18 17:30 ` Chuck Lever
2016-02-18 17:30 ` [PATCH v2 05/12] xprtrdma: Segment head and tail XDR buffers on page boundaries Chuck Lever
` (7 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:30 UTC (permalink / raw)
To: linux-rdma, linux-nfs
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 0f28f2d..e9dfd6a 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -809,10 +809,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
*/
list_del_init(&rqst->rq_list);
spin_unlock_bh(&xprt->transport_lock);
- dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
- " RPC request 0x%p xid 0x%08x\n",
- __func__, rep, req, rqst,
- be32_to_cpu(headerp->rm_xid));
+ dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
+ __func__, rep, req, be32_to_cpu(headerp->rm_xid));
/* from here on, the reply is no longer an orphan */
req->rl_reply = rep;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 05/12] xprtrdma: Segment head and tail XDR buffers on page boundaries
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (3 preceding siblings ...)
2016-02-18 17:30 ` [PATCH v2 04/12] xprtrdma: Clean up dprintk format string containing a newline Chuck Lever
@ 2016-02-18 17:30 ` Chuck Lever
2016-02-18 17:30 ` [PATCH v2 06/12] xprtrdma: Do not wait if ib_post_send() fails Chuck Lever
` (6 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:30 UTC (permalink / raw)
To: linux-rdma, linux-nfs
A single memory allocation is used for the pair of buffers wherein
the RPC client builds an RPC call message and decodes its matching
reply. These buffers are sized based on the maximum possible size
of the RPC call and reply messages for the operation in progress.
This means that as the call buffer increases in size, the start of
the reply buffer is pushed farther into the memory allocation.
RPC requests are growing in size. It used to be that both the call
and reply buffers fit inside a single page.
But these days, thanks to NFSv4 (and especially security labels in
NFSv4.2) the maximum call and reply sizes are large. NFSv4.0 OPEN,
for example, now requires a 6KB allocation for a pair of call and
reply buffers, and NFSv4 LOOKUP is not far behind.
As the maximum size of a call increases, the reply buffer is pushed
far enough into the buffer's memory allocation that a page boundary
can appear in the middle of it.
When the maximum possible reply size is larger than the client's
RDMA receive buffers (currently 1KB), the client has to register a
Reply chunk for the server to RDMA Write the reply into.
The logic in rpcrdma_convert_iovs() assumes that xdr_buf head and
tail buffers would always be contained on a single page. It supplies
just one segment for the head and one for the tail.
FMR, for example, registers up to a page boundary (only a portion of
the reply buffer in the OPEN case above). But without additional
segments, it doesn't register the rest of the buffer.
When the server tries to write the OPEN reply, the RDMA Write fails
with a remote access error since the client registered only part of
the Reply chunk.
rpcrdma_convert_iovs() must split the XDR buffer into multiple
segments, each of which are guaranteed not to contain a page
boundary. That way fmr_op_map is given the proper number of segments
to register the whole reply buffer.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Devesh Sharma <devesh.sharma@broadcom.com>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 42 ++++++++++++++++++++++++++++++----------
1 file changed, 32 insertions(+), 10 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index e9dfd6a..0607391 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
return tlen;
}
+/* Split "vec" on page boundaries into segments. FMR registers pages,
+ * not a byte range. Other modes coalesce these segments into a single
+ * MR when they can.
+ */
+static int
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+ int n, int nsegs)
+{
+ size_t page_offset;
+ u32 remaining;
+ char *base;
+
+ base = vec->iov_base;
+ page_offset = offset_in_page(base);
+ remaining = vec->iov_len;
+ while (remaining && n < nsegs) {
+ seg[n].mr_page = NULL;
+ seg[n].mr_offset = base;
+ seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+ remaining -= seg[n].mr_len;
+ base += seg[n].mr_len;
+ ++n;
+ page_offset = 0;
+ }
+ return n;
+}
+
/*
* Chunk assembly from upper layer xdr_buf.
*
@@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
int page_base;
struct page **ppages;
- if (pos == 0 && xdrbuf->head[0].iov_len) {
- seg[n].mr_page = NULL;
- seg[n].mr_offset = xdrbuf->head[0].iov_base;
- seg[n].mr_len = xdrbuf->head[0].iov_len;
- ++n;
+ if (pos == 0) {
+ n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
+ if (n == nsegs)
+ return -EIO;
}
len = xdrbuf->page_len;
@@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
* xdr pad bytes, saving the server an RDMA operation. */
if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
return n;
+ n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
if (n == nsegs)
- /* Tail remains, but we're out of segments */
return -EIO;
- seg[n].mr_page = NULL;
- seg[n].mr_offset = xdrbuf->tail[0].iov_base;
- seg[n].mr_len = xdrbuf->tail[0].iov_len;
- ++n;
}
return n;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 06/12] xprtrdma: Do not wait if ib_post_send() fails
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (4 preceding siblings ...)
2016-02-18 17:30 ` [PATCH v2 05/12] xprtrdma: Segment head and tail XDR buffers on page boundaries Chuck Lever
@ 2016-02-18 17:30 ` Chuck Lever
2016-02-18 17:30 ` [PATCH v2 07/12] rpcrdma: Add RPCRDMA_HDRLEN_ERR Chuck Lever
` (5 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:30 UTC (permalink / raw)
To: linux-rdma, linux-nfs
If ib_post_send() in ro_unmap_sync() fails, the WRs have not been
posted, no completions will fire, and wait_for_completion() will
wait forever. Skip the wait in that case.
To ensure the MRs are invalid, disconnect.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
net/sunrpc/xprtrdma/frwr_ops.c | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index e165673..ecb005f 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -520,14 +520,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* unless ri_id->qp is a valid pointer.
*/
rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
- if (rc)
+ if (rc) {
pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
+ rdma_disconnect(ia->ri_id);
+ goto unmap;
+ }
wait_for_completion(&f->fr_linv_done);
/* ORDER: Now DMA unmap all of the req's MRs, and return
* them to the free MW list.
*/
+unmap:
for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
seg = &req->rl_segments[i];
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 07/12] rpcrdma: Add RPCRDMA_HDRLEN_ERR
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (5 preceding siblings ...)
2016-02-18 17:30 ` [PATCH v2 06/12] xprtrdma: Do not wait if ib_post_send() fails Chuck Lever
@ 2016-02-18 17:30 ` Chuck Lever
2016-02-18 17:30 ` [PATCH v2 08/12] xprtrdma: Properly handle RDMA_ERROR replies Chuck Lever
` (4 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:30 UTC (permalink / raw)
To: linux-rdma, linux-nfs
Error headers are shorter than either RDMA_MSG or RDMA_NOMSG.
Since HDRLEN_MIN is already used in several other places that would
be annoying to change, add RPCRDMA_HDRLEN_ERR for the one or two
spots where the shorter length is needed.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Devesh Sharma <devesh.sharma@broadcom.com>
---
include/linux/sunrpc/rpc_rdma.h | 1 +
1 file changed, 1 insertion(+)
diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index f33c5a4..8c6d23c 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -102,6 +102,7 @@ struct rpcrdma_msg {
* Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
*/
#define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)
+#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5)
enum rpcrdma_errcode {
ERR_VERS = 1,
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 08/12] xprtrdma: Properly handle RDMA_ERROR replies
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (6 preceding siblings ...)
2016-02-18 17:30 ` [PATCH v2 07/12] rpcrdma: Add RPCRDMA_HDRLEN_ERR Chuck Lever
@ 2016-02-18 17:30 ` Chuck Lever
2016-02-18 17:31 ` [PATCH v2 09/12] xprtrdma: Serialize credit accounting again Chuck Lever
` (3 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:30 UTC (permalink / raw)
To: linux-rdma, linux-nfs
These are shorter than RPCRDMA_HDRLEN_MIN, and they need to
complete the waiting RPC.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
include/linux/sunrpc/rpc_rdma.h | 11 +++++---
net/sunrpc/xprtrdma/rpc_rdma.c | 51 +++++++++++++++++++++++++++++++++------
2 files changed, 49 insertions(+), 13 deletions(-)
diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index 8c6d23c..3b1ff38 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -93,6 +93,12 @@ struct rpcrdma_msg {
__be32 rm_pempty[3]; /* 3 empty chunk lists */
} rm_padded;
+ struct {
+ __be32 rm_err;
+ __be32 rm_vers_low;
+ __be32 rm_vers_high;
+ } rm_error;
+
__be32 rm_chunks[0]; /* read, write and reply chunks */
} rm_body;
@@ -109,11 +115,6 @@ enum rpcrdma_errcode {
ERR_CHUNK = 2
};
-struct rpcrdma_err_vers {
- uint32_t rdma_vers_low; /* Version range supported by peer */
- uint32_t rdma_vers_high;
-};
-
enum rpcrdma_proc {
RDMA_MSG = 0, /* An RPC call or reply msg */
RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 0607391..35f8108 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -795,7 +795,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
- int rdmalen, status;
+ int rdmalen, status, rmerr;
unsigned long cwnd;
u32 credits;
@@ -803,12 +803,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
if (rep->rr_len == RPCRDMA_BAD_LEN)
goto out_badstatus;
- if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+ if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
goto out_shortreply;
headerp = rdmab_to_msg(rep->rr_rdmabuf);
- if (headerp->rm_vers != rpcrdma_version)
- goto out_badversion;
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
if (rpcrdma_is_bcall(headerp))
goto out_bcall;
@@ -838,6 +836,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
req->rl_reply = rep;
xprt->reestablish_timeout = 0;
+ if (headerp->rm_vers != rpcrdma_version)
+ goto out_badversion;
+
/* check for expected message types */
/* The order of some of these tests is important. */
switch (headerp->rm_type) {
@@ -898,6 +899,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
status = rdmalen;
break;
+ case rdma_error:
+ goto out_rdmaerr;
+
badheader:
default:
dprintk("%s: invalid rpcrdma reply header (type %d):"
@@ -913,6 +917,7 @@ badheader:
break;
}
+out:
/* Invalidate and flush the data payloads before waking the
* waiting application. This guarantees the memory region is
* properly fenced from the server before the application
@@ -955,13 +960,43 @@ out_bcall:
return;
#endif
-out_shortreply:
- dprintk("RPC: %s: short/invalid reply\n", __func__);
- goto repost;
-
+/* If the incoming reply terminated a pending RPC, the next
+ * RPC call will post a replacement receive buffer as it is
+ * being marshaled.
+ */
out_badversion:
dprintk("RPC: %s: invalid version %d\n",
__func__, be32_to_cpu(headerp->rm_vers));
+ status = -EIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ goto out;
+
+out_rdmaerr:
+ rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
+ switch (rmerr) {
+ case ERR_VERS:
+ pr_err("%s: server reports header version error (%u-%u)\n",
+ __func__,
+ be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
+ be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
+ break;
+ case ERR_CHUNK:
+ pr_err("%s: server reports header decoding error\n",
+ __func__);
+ break;
+ default:
+ pr_err("%s: server reports unknown error %d\n",
+ __func__, rmerr);
+ }
+ status = -EREMOTEIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ goto out;
+
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
+out_shortreply:
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
out_nomatch:
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 09/12] xprtrdma: Serialize credit accounting again
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (7 preceding siblings ...)
2016-02-18 17:30 ` [PATCH v2 08/12] xprtrdma: Properly handle RDMA_ERROR replies Chuck Lever
@ 2016-02-18 17:31 ` Chuck Lever
2016-02-18 17:31 ` [PATCH v2 10/12] xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs Chuck Lever
` (2 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:31 UTC (permalink / raw)
To: linux-rdma, linux-nfs
Commit fe97b47cd623 ("xprtrdma: Use workqueue to process RPC/RDMA
replies") replaced the reply tasklet with a workqueue that allows
RPC replies to be processed in parallel. Thus the credit values in
RPC-over-RDMA replies can be applied in a different order than in
which the server sent them.
To fix this, revert commit eba8ff660b2d ("xprtrdma: Move credit
update to RPC reply handler"). Reverting is done by hand to
accommodate code changes that have occurred since then.
Fixes: fe97b47cd623 ("xprtrdma: Use workqueue to process . . .")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 9 +--------
net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++++++++-
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
3 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 35f8108..888823b 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -797,7 +797,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
__be32 *iptr;
int rdmalen, status, rmerr;
unsigned long cwnd;
- u32 credits;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
@@ -928,15 +927,9 @@ out:
if (req->rl_nchunks)
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
- credits = be32_to_cpu(headerp->rm_credit);
- if (credits == 0)
- credits = 1; /* don't deadlock */
- else if (credits > r_xprt->rx_buf.rb_max_requests)
- credits = r_xprt->rx_buf.rb_max_requests;
-
spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
- xprt->cwnd = credits << RPC_CWNDSHIFT;
+ xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 878f1bf..fc1ef5f 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -190,6 +190,28 @@ rpcrdma_receive_worker(struct work_struct *work)
rpcrdma_reply_handler(rep);
}
+/* Perform basic sanity checking to avoid using garbage
+ * to update the credit grant value.
+ */
+static void
+rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
+ struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
+ u32 credits;
+
+ if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
+ return;
+
+ credits = be32_to_cpu(rmsgp->rm_credit);
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > buffer->rb_max_requests)
+ credits = buffer->rb_max_requests;
+
+ atomic_set(&buffer->rb_credits, credits);
+}
+
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
@@ -211,7 +233,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
ib_dma_sync_single_for_cpu(rep->rr_device,
rdmab_addr(rep->rr_rdmabuf),
rep->rr_len, DMA_FROM_DEVICE);
- prefetch(rdmab_to_msg(rep->rr_rdmabuf));
+
+ rpcrdma_update_granted_credits(rep);
out_schedule:
queue_work(rpcrdma_receive_wq, &rep->rr_work);
@@ -330,6 +353,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
connected:
dprintk("RPC: %s: %sconnected\n",
__func__, connstate > 0 ? "" : "dis");
+ atomic_set(&xprt->rx_buf.rb_credits, 1);
ep->rep_connected = connstate;
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
@@ -943,6 +967,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
+ atomic_set(&buf->rb_credits, 1);
rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 38fe11b..7bf6f43 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -311,6 +311,7 @@ struct rpcrdma_buffer {
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
u32 rb_max_requests;
+ atomic_t rb_credits; /* most recent credit grant */
u32 rb_bc_srv_max_requests;
spinlock_t rb_reqslock; /* protect rb_allreqs */
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 10/12] xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (8 preceding siblings ...)
2016-02-18 17:31 ` [PATCH v2 09/12] xprtrdma: Serialize credit accounting again Chuck Lever
@ 2016-02-18 17:31 ` Chuck Lever
2016-02-18 17:31 ` [PATCH v2 11/12] xprtrdma: Use an anonymous union in struct rpcrdma_mw Chuck Lever
2016-02-18 17:31 ` [PATCH v2 12/12] xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs Chuck Lever
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:31 UTC (permalink / raw)
To: linux-rdma, linux-nfs
Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.
By converting to this new API, xprtrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.
Because each ib_cqe carries a pointer to a completion method, the
core can now post its own operations on a consumer's QP, and handle
the completions itself, without changes to the consumer.
xprtrdma's reply processing is already handled in a work queue, but
there is some initial order-dependent processing that is done in the
soft IRQ context before a work item is scheduled.
IB_POLL_SOFTIRQ is a direct replacement for the current xprtrdma
receive code path.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Devesh Sharma <devesh.sharma@broadcom.com>
---
net/sunrpc/xprtrdma/verbs.c | 78 ++++++++++-----------------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
2 files changed, 21 insertions(+), 58 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index fc1ef5f..9533b5f 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -212,11 +212,18 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
atomic_set(&buffer->rb_credits, credits);
}
+/**
+ * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc)
+rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
- struct rpcrdma_rep *rep =
- (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
+ rr_cqe);
/* WARNING: Only wr_id and status are reliable at this point */
if (wc->status != IB_WC_SUCCESS)
@@ -242,55 +249,20 @@ out_schedule:
out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("RPC: %s: rep %p: %s\n",
- __func__, rep, ib_wc_status_msg(wc->status));
+ pr_err("RPC: %s: Recv: %s (%u, vendor %u)\n",
+ __func__, ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
rep->rr_len = RPCRDMA_BAD_LEN;
goto out_schedule;
}
-/* The wc array is on stack: automatic memory is always CPU-local.
- *
- * struct ib_wc is 64 bytes, making the poll array potentially
- * large. But this is at the bottom of the call chain. Further
- * substantial work is done in another thread.
- */
-static void
-rpcrdma_recvcq_poll(struct ib_cq *cq)
-{
- struct ib_wc *pos, wcs[4];
- int count, rc;
-
- do {
- pos = wcs;
-
- rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
- if (rc < 0)
- break;
-
- count = rc;
- while (count-- > 0)
- rpcrdma_recvcq_process_wc(pos++);
- } while (rc == ARRAY_SIZE(wcs));
-}
-
-/* Handle provider receive completion upcalls.
- */
-static void
-rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
-{
- do {
- rpcrdma_recvcq_poll(cq);
- } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
- IB_CQ_REPORT_MISSED_EVENTS) > 0);
-}
-
static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
struct ib_wc wc;
while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
- rpcrdma_recvcq_process_wc(&wc);
+ rpcrdma_receive_wc(NULL, &wc);
while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
rpcrdma_sendcq_process_wc(&wc);
}
@@ -655,9 +627,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out2;
}
- cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
- recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
- rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+ recvcq = ib_alloc_cq(ia->ri_device, NULL,
+ ep->rep_attr.cap.max_recv_wr + 1,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -665,14 +637,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out2;
}
- rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
- if (rc) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- ib_destroy_cq(recvcq);
- goto out2;
- }
-
ep->rep_attr.send_cq = sendcq;
ep->rep_attr.recv_cq = recvcq;
@@ -735,10 +699,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ia->ri_id->qp = NULL;
}
- rc = ib_destroy_cq(ep->rep_attr.recv_cq);
- if (rc)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, rc);
+ ib_free_cq(ep->rep_attr.recv_cq);
rc = ib_destroy_cq(ep->rep_attr.send_cq);
if (rc)
@@ -947,6 +908,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
}
rep->rr_device = ia->ri_device;
+ rep->rr_cqe.done = rpcrdma_receive_wc;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
return rep;
@@ -1322,7 +1284,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
int rc;
recv_wr.next = NULL;
- recv_wr.wr_id = (u64) (unsigned long) rep;
+ recv_wr.wr_cqe = &rep->rr_cqe;
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 7bf6f43..d60feb9 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -171,6 +171,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
struct rpcrdma_buffer;
struct rpcrdma_rep {
+ struct ib_cqe rr_cqe;
unsigned int rr_len;
struct ib_device *rr_device;
struct rpcrdma_xprt *rr_rxprt;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 11/12] xprtrdma: Use an anonymous union in struct rpcrdma_mw
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (9 preceding siblings ...)
2016-02-18 17:31 ` [PATCH v2 10/12] xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs Chuck Lever
@ 2016-02-18 17:31 ` Chuck Lever
2016-02-18 17:31 ` [PATCH v2 12/12] xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs Chuck Lever
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:31 UTC (permalink / raw)
To: linux-rdma, linux-nfs
Clean up: Make code more readable.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Devesh Sharma <devesh.sharma@broadcom.com>
---
net/sunrpc/xprtrdma/fmr_ops.c | 28 +++++++++++++-------------
net/sunrpc/xprtrdma/frwr_ops.c | 42 ++++++++++++++++++++-------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +-
3 files changed, 36 insertions(+), 36 deletions(-)
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index c14f3a4..b289e10 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -80,13 +80,13 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
if (!r)
goto out;
- r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
- sizeof(u64), GFP_KERNEL);
- if (!r->r.fmr.physaddrs)
+ r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
+ sizeof(u64), GFP_KERNEL);
+ if (!r->fmr.physaddrs)
goto out_free;
- r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
- if (IS_ERR(r->r.fmr.fmr))
+ r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+ if (IS_ERR(r->fmr.fmr))
goto out_fmr_err;
list_add(&r->mw_list, &buf->rb_mws);
@@ -95,9 +95,9 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
return 0;
out_fmr_err:
- rc = PTR_ERR(r->r.fmr.fmr);
+ rc = PTR_ERR(r->fmr.fmr);
dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
- kfree(r->r.fmr.physaddrs);
+ kfree(r->fmr.physaddrs);
out_free:
kfree(r);
out:
@@ -109,7 +109,7 @@ __fmr_unmap(struct rpcrdma_mw *r)
{
LIST_HEAD(l);
- list_add(&r->r.fmr.fmr->list, &l);
+ list_add(&r->fmr.fmr->list, &l);
return ib_unmap_fmr(&l);
}
@@ -148,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
nsegs = RPCRDMA_MAX_FMR_SGES;
for (i = 0; i < nsegs;) {
rpcrdma_map_one(device, seg, direction);
- mw->r.fmr.physaddrs[i] = seg->mr_dma;
+ mw->fmr.physaddrs[i] = seg->mr_dma;
len += seg->mr_len;
++seg;
++i;
@@ -158,13 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
break;
}
- rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
+ rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
i, seg1->mr_dma);
if (rc)
goto out_maperr;
seg1->rl_mw = mw;
- seg1->mr_rkey = mw->r.fmr.fmr->rkey;
+ seg1->mr_rkey = mw->fmr.fmr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
seg1->mr_len = len;
@@ -219,7 +219,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
seg = &req->rl_segments[i];
mw = seg->rl_mw;
- list_add(&mw->r.fmr.fmr->list, &unmap_list);
+ list_add(&mw->fmr.fmr->list, &unmap_list);
i += seg->mr_nsegs;
}
@@ -281,9 +281,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
- kfree(r->r.fmr.physaddrs);
+ kfree(r->fmr.physaddrs);
- rc = ib_dealloc_fmr(r->r.fmr.fmr);
+ rc = ib_dealloc_fmr(r->fmr.fmr);
if (rc)
dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
__func__, rc);
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index ecb005f..0cb9efa 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -109,20 +109,20 @@ static void
__frwr_recovery_worker(struct work_struct *work)
{
struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
- r.frmr.fr_work);
- struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
+ frmr.fr_work);
+ struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
- if (ib_dereg_mr(r->r.frmr.fr_mr))
+ if (ib_dereg_mr(r->frmr.fr_mr))
goto out_fail;
- r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
- if (IS_ERR(r->r.frmr.fr_mr))
+ r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
+ if (IS_ERR(r->frmr.fr_mr))
goto out_fail;
dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
- r->r.frmr.fr_state = FRMR_IS_INVALID;
+ r->frmr.fr_state = FRMR_IS_INVALID;
rpcrdma_put_mw(r_xprt, r);
return;
@@ -137,15 +137,15 @@ out_fail:
static void
__frwr_queue_recovery(struct rpcrdma_mw *r)
{
- INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
- queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+ INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
+ queue_work(frwr_recovery_wq, &r->frmr.fr_work);
}
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
unsigned int depth)
{
- struct rpcrdma_frmr *f = &r->r.frmr;
+ struct rpcrdma_frmr *f = &r->frmr;
int rc;
f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
@@ -179,11 +179,11 @@ __frwr_release(struct rpcrdma_mw *r)
{
int rc;
- rc = ib_dereg_mr(r->r.frmr.fr_mr);
+ rc = ib_dereg_mr(r->frmr.fr_mr);
if (rc)
dprintk("RPC: %s: ib_dereg_mr status %i\n",
__func__, rc);
- kfree(r->r.frmr.sg);
+ kfree(r->frmr.sg);
}
static int
@@ -263,14 +263,14 @@ __frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
__func__, r, ib_wc_status_msg(wc->status), wc->status);
- r->r.frmr.fr_state = FRMR_IS_STALE;
+ r->frmr.fr_state = FRMR_IS_STALE;
}
static void
frwr_sendcompletion(struct ib_wc *wc)
{
struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- struct rpcrdma_frmr *f = &r->r.frmr;
+ struct rpcrdma_frmr *f = &r->frmr;
if (unlikely(wc->status != IB_WC_SUCCESS))
__frwr_sendcompletion_flush(wc, r);
@@ -314,7 +314,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
r->mw_sendcompletion = frwr_sendcompletion;
- r->r.frmr.fr_xprt = r_xprt;
+ r->frmr.fr_xprt = r_xprt;
}
return 0;
@@ -347,8 +347,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
mw = rpcrdma_get_mw(r_xprt);
if (!mw)
return -ENOMEM;
- } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
- frmr = &mw->r.frmr;
+ } while (mw->frmr.fr_state != FRMR_IS_INVALID);
+ frmr = &mw->frmr;
frmr->fr_state = FRMR_IS_VALID;
frmr->fr_waiter = false;
mr = frmr->fr_mr;
@@ -434,7 +434,7 @@ static struct ib_send_wr *
__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_mw *mw = seg->rl_mw;
- struct rpcrdma_frmr *f = &mw->r.frmr;
+ struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;
f->fr_waiter = false;
@@ -455,7 +455,7 @@ __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
{
struct ib_device *device = r_xprt->rx_ia.ri_device;
struct rpcrdma_mw *mw = seg->rl_mw;
- struct rpcrdma_frmr *f = &mw->r.frmr;
+ struct rpcrdma_frmr *f = &mw->frmr;
seg->rl_mw = NULL;
@@ -504,7 +504,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
i += seg->mr_nsegs;
}
- f = &seg->rl_mw->r.frmr;
+ f = &seg->rl_mw->frmr;
/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
@@ -553,7 +553,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mw *mw = seg1->rl_mw;
- struct rpcrdma_frmr *frmr = &mw->r.frmr;
+ struct rpcrdma_frmr *frmr = &mw->frmr;
struct ib_send_wr *invalidate_wr, *bad_wr;
int rc, nsegs = seg->mr_nsegs;
@@ -561,7 +561,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
seg1->rl_mw = NULL;
frmr->fr_state = FRMR_IS_INVALID;
- invalidate_wr = &mw->r.frmr.fr_invwr;
+ invalidate_wr = &mw->frmr.fr_invwr;
memset(invalidate_wr, 0, sizeof(*invalidate_wr));
invalidate_wr->wr_id = (uintptr_t)mw;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index d60feb9..b3c4472 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -225,7 +225,7 @@ struct rpcrdma_mw {
union {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
- } r;
+ };
void (*mw_sendcompletion)(struct ib_wc *);
struct list_head mw_list;
struct list_head mw_all;
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH v2 12/12] xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs
2016-02-18 17:29 [PATCH v2 00/12] NFS/RDMA client patches for v4.6 Chuck Lever
` (10 preceding siblings ...)
2016-02-18 17:31 ` [PATCH v2 11/12] xprtrdma: Use an anonymous union in struct rpcrdma_mw Chuck Lever
@ 2016-02-18 17:31 ` Chuck Lever
11 siblings, 0 replies; 13+ messages in thread
From: Chuck Lever @ 2016-02-18 17:31 UTC (permalink / raw)
To: linux-rdma, linux-nfs
Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.
By converting to this new API, xprtrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.
Because each ib_cqe carries a pointer to a completion method, the
core can now post its own operations on a consumer's QP, and handle
the completions itself, without changes to the consumer.
Send completions were previously handled entirely in the completion
upcall handler (ie, deferring to a process context is unneeded).
Thus IB_POLL_SOFTIRQ is a direct replacement for the current
xprtrdma send code path.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Devesh Sharma <devesh.sharma@broadcom.com>
---
net/sunrpc/xprtrdma/frwr_ops.c | 99 +++++++++++++++++++++++++-----------
net/sunrpc/xprtrdma/verbs.c | 107 +++++++--------------------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 10 +---
3 files changed, 91 insertions(+), 125 deletions(-)
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 0cb9efa..83c4015 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -158,6 +158,8 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
sg_init_table(f->sg, depth);
+ init_completion(&f->fr_linv_done);
+
return 0;
out_mr_err:
@@ -244,39 +246,76 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
}
-/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs
- * to be reset.
+static void
+__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
+ const char *wr)
+{
+ frmr->fr_state = FRMR_IS_STALE;
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: %s %s (%u, vendor %u)\n",
+ __func__, ib_wc_status_msg(wc->status),
+ wr, wc->status, wc->vendor_err);
+}
+
+/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
*
- * WARNING: Only wr_id and status are reliable at this point
*/
static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
+frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
- if (likely(wc->status == IB_WC_SUCCESS))
- return;
-
- /* WARNING: Only wr_id and status are reliable at this point */
- r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- if (wc->status == IB_WC_WR_FLUSH_ERR)
- dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
- else
- pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
- __func__, r, ib_wc_status_msg(wc->status), wc->status);
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;
- r->frmr.fr_state = FRMR_IS_STALE;
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS) {
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ __frwr_sendcompletion_flush(wc, frmr, "fastreg");
+ }
}
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
static void
-frwr_sendcompletion(struct ib_wc *wc)
+frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
- struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- struct rpcrdma_frmr *f = &r->frmr;
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;
- if (unlikely(wc->status != IB_WC_SUCCESS))
- __frwr_sendcompletion_flush(wc, r);
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS) {
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ __frwr_sendcompletion_flush(wc, frmr, "localinv");
+ }
+}
- if (f->fr_waiter)
- complete(&f->fr_linv_done);
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void
+frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ if (wc->status != IB_WC_SUCCESS)
+ __frwr_sendcompletion_flush(wc, frmr, "localinv");
+ complete_all(&frmr->fr_linv_done);
}
static int
@@ -313,7 +352,6 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
- r->mw_sendcompletion = frwr_sendcompletion;
r->frmr.fr_xprt = r_xprt;
}
@@ -350,7 +388,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
} while (mw->frmr.fr_state != FRMR_IS_INVALID);
frmr = &mw->frmr;
frmr->fr_state = FRMR_IS_VALID;
- frmr->fr_waiter = false;
mr = frmr->fr_mr;
reg_wr = &frmr->fr_regwr;
@@ -400,7 +437,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
reg_wr->wr.next = NULL;
reg_wr->wr.opcode = IB_WR_REG_MR;
- reg_wr->wr.wr_id = (uintptr_t)mw;
+ frmr->fr_cqe.done = frwr_wc_fastreg;
+ reg_wr->wr.wr_cqe = &frmr->fr_cqe;
reg_wr->wr.num_sge = 0;
reg_wr->wr.send_flags = 0;
reg_wr->mr = mr;
@@ -437,12 +475,12 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;
- f->fr_waiter = false;
f->fr_state = FRMR_IS_INVALID;
invalidate_wr = &f->fr_invwr;
memset(invalidate_wr, 0, sizeof(*invalidate_wr));
- invalidate_wr->wr_id = (unsigned long)(void *)mw;
+ f->fr_cqe.done = frwr_wc_localinv;
+ invalidate_wr->wr_cqe = &f->fr_cqe;
invalidate_wr->opcode = IB_WR_LOCAL_INV;
invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;
@@ -511,8 +549,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* are complete.
*/
f->fr_invwr.send_flags = IB_SEND_SIGNALED;
- f->fr_waiter = true;
- init_completion(&f->fr_linv_done);
+ f->fr_cqe.done = frwr_wc_localinv_wake;
+ reinit_completion(&f->fr_linv_done);
INIT_CQCOUNT(&r_xprt->rx_ep);
/* Transport disconnect drains the receive CQ before it
@@ -564,7 +602,8 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
invalidate_wr = &mw->frmr.fr_invwr;
memset(invalidate_wr, 0, sizeof(*invalidate_wr));
- invalidate_wr->wr_id = (uintptr_t)mw;
+ frmr->fr_cqe.done = frwr_wc_localinv;
+ invalidate_wr->wr_cqe = &frmr->fr_cqe;
invalidate_wr->opcode = IB_WR_LOCAL_INV;
invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 9533b5f..6a8d4d3 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -112,73 +112,20 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
}
}
-static void
-rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
-{
- struct rpcrdma_ep *ep = context;
-
- pr_err("RPC: %s: %s on device %s ep %p\n",
- __func__, ib_event_msg(event->event),
- event->device->name, context);
- if (ep->rep_connected == 1) {
- ep->rep_connected = -EIO;
- rpcrdma_conn_func(ep);
- wake_up_all(&ep->rep_connect_wait);
- }
-}
-
-static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
-{
- /* WARNING: Only wr_id and status are reliable at this point */
- if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
- if (wc->status != IB_WC_SUCCESS &&
- wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("RPC: %s: SEND: %s\n",
- __func__, ib_wc_status_msg(wc->status));
- } else {
- struct rpcrdma_mw *r;
-
- r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- r->mw_sendcompletion(wc);
- }
-}
-
-/* The common case is a single send completion is waiting. By
- * passing two WC entries to ib_poll_cq, a return code of 1
- * means there is exactly one WC waiting and no more. We don't
- * have to invoke ib_poll_cq again to know that the CQ has been
- * properly drained.
- */
-static void
-rpcrdma_sendcq_poll(struct ib_cq *cq)
-{
- struct ib_wc *pos, wcs[2];
- int count, rc;
-
- do {
- pos = wcs;
-
- rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
- if (rc < 0)
- break;
-
- count = rc;
- while (count-- > 0)
- rpcrdma_sendcq_process_wc(pos++);
- } while (rc == ARRAY_SIZE(wcs));
- return;
-}
-
-/* Handle provider send completion upcalls.
+/**
+ * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
*/
static void
-rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
+rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
- do {
- rpcrdma_sendcq_poll(cq);
- } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
- IB_CQ_REPORT_MISSED_EVENTS) > 0);
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: Send: %s (%u, vendor %u)\n",
+ __func__, ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
}
static void
@@ -263,8 +210,6 @@ rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
rpcrdma_receive_wc(NULL, &wc);
- while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
- rpcrdma_sendcq_process_wc(&wc);
}
static int
@@ -556,9 +501,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct rpcrdma_create_data_internal *cdata)
{
struct ib_cq *sendcq, *recvcq;
- struct ib_cq_init_attr cq_attr = {};
unsigned int max_qp_wr;
- int rc, err;
+ int rc;
if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
dprintk("RPC: %s: insufficient sge's available\n",
@@ -610,9 +554,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
- cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
- sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
- rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+ sendcq = ib_alloc_cq(ia->ri_device, NULL,
+ ep->rep_attr.cap.max_send_wr + 1,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -620,13 +564,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out1;
}
- rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
- if (rc) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- goto out2;
- }
-
recvcq = ib_alloc_cq(ia->ri_device, NULL,
ep->rep_attr.cap.max_recv_wr + 1,
0, IB_POLL_SOFTIRQ);
@@ -661,10 +598,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
return 0;
out2:
- err = ib_destroy_cq(sendcq);
- if (err)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, err);
+ ib_free_cq(sendcq);
out1:
if (ia->ri_dma_mr)
ib_dereg_mr(ia->ri_dma_mr);
@@ -700,11 +634,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
}
ib_free_cq(ep->rep_attr.recv_cq);
-
- rc = ib_destroy_cq(ep->rep_attr.send_cq);
- if (rc)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, rc);
+ ib_free_cq(ep->rep_attr.send_cq);
if (ia->ri_dma_mr) {
rc = ib_dereg_mr(ia->ri_dma_mr);
@@ -883,6 +813,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
spin_lock(&buffer->rb_reqslock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_reqslock);
+ req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
return req;
}
@@ -1246,7 +1177,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
}
send_wr.next = NULL;
- send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
+ send_wr.wr_cqe = &req->rl_cqe;
send_wr.sg_list = iov;
send_wr.num_sge = req->rl_niovs;
send_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index b3c4472..2ebc743 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -95,10 +95,6 @@ struct rpcrdma_ep {
#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
-/* Force completion handler to ignore the signal
- */
-#define RPCRDMA_IGNORE_COMPLETION (0ULL)
-
/* Pre-allocate extra Work Requests for handling backward receives
* and sends. This is a fixed value because the Work Queues are
* allocated when the forward channel is set up.
@@ -205,11 +201,11 @@ struct rpcrdma_frmr {
struct scatterlist *sg;
int sg_nents;
struct ib_mr *fr_mr;
+ struct ib_cqe fr_cqe;
enum rpcrdma_frmr_state fr_state;
+ struct completion fr_linv_done;
struct work_struct fr_work;
struct rpcrdma_xprt *fr_xprt;
- bool fr_waiter;
- struct completion fr_linv_done;;
union {
struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr;
@@ -226,7 +222,6 @@ struct rpcrdma_mw {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
};
- void (*mw_sendcompletion)(struct ib_wc *);
struct list_head mw_list;
struct list_head mw_all;
};
@@ -282,6 +277,7 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
+ struct ib_cqe rl_cqe;
struct list_head rl_all;
bool rl_backchannel;
};
^ permalink raw reply related [flat|nested] 13+ messages in thread