From: "Steve Wise" <swise@opengridcomputing.com>
To: "'Devesh Sharma'" <Devesh.Sharma@Emulex.Com>, <bfields@fieldses.org>
Cc: <linux-nfs@vger.kernel.org>, <linux-rdma@vger.kernel.org>,
<tom@opengridcomputing.com>
Subject: RE: [PATCH V3] svcrdma: refactor marshalling logic
Date: Mon, 2 Jun 2014 11:47:58 -0500 [thread overview]
Message-ID: <004101cf7e82$68b58ad0$3a20a070$@opengridcomputing.com> (raw)
In-Reply-To: <EE7902D3F51F404C82415C4803930ACD3FE01355@CMEXMB1.ad.emulex.com>
> Steve
>
> I have not checked the code because I am away from my laptop
> But I assume mlx and mthca are using fmr and cxgb4 is using
> rdma-read-with-invalidate, which has an implicit fence.
>
> If the invalidate is issued before the read completes, it's a problem.
>
You're correct, and this bug appears to be in the current upstream code as well. If an
IB_WR_LOCAL_INV WR is used, it must include IB_SEND_FENCE so that it is fenced until the
prior RDMA_READ completes.
Good catch! I'll post V4 soon.
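
Something like this in rdma_read_chunk_frmr() is what I have in mind (untested sketch;
the read-with-invalidate branch stays as-is since its invalidate is implicitly fenced,
and the final V4 patch may differ):

	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
		read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
		read_wr.wr_id = (unsigned long)ctxt;
		read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
	} else {
		read_wr.opcode = IB_WR_RDMA_READ;
		read_wr.next = &inv_wr;
		/* Prepare invalidate. IB_SEND_FENCE holds the LOCAL_INV back
		 * until the preceding RDMA_READ on this QP has completed.
		 */
		memset(&inv_wr, 0, sizeof(inv_wr));
		inv_wr.wr_id = (unsigned long)ctxt;
		inv_wr.opcode = IB_WR_LOCAL_INV;
		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
	}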
> Regards
> Devesh
> ____________________________________
> From: Steve Wise [swise@opengridcomputing.com]
> Sent: Friday, May 30, 2014 6:32 PM
> To: Devesh Sharma; bfields@fieldses.org
> Cc: linux-nfs@vger.kernel.org; linux-rdma@vger.kernel.org; tom@opengridcomputing.com
> Subject: RE: [PATCH V3] svcrdma: refactor marshalling logic
>
> >
> > Hi Steve
> >
> > I am testing this patch. I have found that when the server tries to initiate an
> > RDMA_READ on an ocrdma device, the posting fails because the FENCE bit is not set
> > for a non-iWARP device that is using FRMR. Because of this, whenever the server
> > tries to initiate an RDMA_READ operation, it fails with a completion error.
> > This bug was there in v1 and v2 as well.
> >
>
> Why would the FENCE bit not be required for mlx4, mthca, cxgb4, and yet be required for
> ocrdma?
>
>
> > Check inline for the exact location of the change.
> >
> > The rest is okay from my side; iozone is passing with this patch, of course after
> > putting in a FENCE indicator.
> >
> > -Regards
> > Devesh
> >
> > > -----Original Message-----
> > > From: linux-rdma-owner@vger.kernel.org [mailto:linux-rdma-
> > > owner@vger.kernel.org] On Behalf Of Steve Wise
> > > Sent: Thursday, May 29, 2014 10:26 PM
> > > To: bfields@fieldses.org
> > > Cc: linux-nfs@vger.kernel.org; linux-rdma@vger.kernel.org;
> > > tom@opengridcomputing.com
> > > Subject: [PATCH V3] svcrdma: refactor marshalling logic
> > >
> > > This patch refactors the NFSRDMA server marshalling logic to remove the
> > > intermediary map structures. It also fixes an existing bug where the
> > > NFSRDMA server was not minding the device fast register page list length
> > > limitations.
> > >
> > > I've also made a git repo available with these patches on top of 3.15-rc7:
> > >
> > > git://git.linux-nfs.org/projects/swise/linux.git svcrdma-refactor-v3
> > >
> > > Changes since V2:
> > >
> > > - fixed logic bug in rdma_read_chunk_frmr() and rdma_read_chunk_lcl()
> > >
> > > - in rdma_read_chunks(), set the reader function pointer only once since
> > > it doesn't change
> > >
> > > - squashed the patch back into one patch since the previous split wasn't
> > > bisectable
> > >
> > > Changes since V1:
> > >
> > > - fixed regression for devices that don't support FRMRs (see
> > > rdma_read_chunk_lcl())
> > >
> > > - split patch up for closer review. However I request it be squashed
> > > before merging, as they are not bisectable, and I think these changes
> > > should all be a single commit anyway.
> > >
> > > Please review, and test if you can. I'd like this to hit 3.16.
> > >
> > > Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
> > > Signed-off-by: Steve Wise <swise@opengridcomputing.com>
> > > ---
> > >
> > > include/linux/sunrpc/svc_rdma.h | 3
> > > net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 643 +++++++++++++----------
> > > -------
> > > net/sunrpc/xprtrdma/svc_rdma_sendto.c | 230 +----------
> > > net/sunrpc/xprtrdma/svc_rdma_transport.c | 62 ++-
> > > 4 files changed, 332 insertions(+), 606 deletions(-)
> > >
> > > diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> > > index 0b8e3e6..5cf99a0 100644
> > > --- a/include/linux/sunrpc/svc_rdma.h
> > > +++ b/include/linux/sunrpc/svc_rdma.h
> > > @@ -115,14 +115,13 @@ struct svc_rdma_fastreg_mr {
> > > struct list_head frmr_list;
> > > };
> > > struct svc_rdma_req_map {
> > > - struct svc_rdma_fastreg_mr *frmr;
> > > unsigned long count;
> > > union {
> > > struct kvec sge[RPCSVC_MAXPAGES];
> > > struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES];
> > > + unsigned long lkey[RPCSVC_MAXPAGES];
> > > };
> > > };
> > > -#define RDMACTXT_F_FAST_UNREG 1
> > > #define RDMACTXT_F_LAST_CTXT 2
> > >
> > > #define SVCRDMA_DEVCAP_FAST_REG 1 /*
> > > fast mr registration */
> > > diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > > b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > > index 8d904e4..52d9f2c 100644
> > > --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > > +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > > @@ -1,4 +1,5 @@
> > > /*
> > > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> > > * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> > > *
> > > * This software is available to you under a choice of one of two
> > > @@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> > >
> > > /* Set up the XDR head */
> > > rqstp->rq_arg.head[0].iov_base = page_address(page);
> > > - rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
> > > + rqstp->rq_arg.head[0].iov_len =
> > > + min_t(size_t, byte_count, ctxt->sge[0].length);
> > > rqstp->rq_arg.len = byte_count;
> > > rqstp->rq_arg.buflen = byte_count;
> > >
> > > @@ -85,7 +87,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> > > page = ctxt->pages[sge_no];
> > > put_page(rqstp->rq_pages[sge_no]);
> > > rqstp->rq_pages[sge_no] = page;
> > > - bc -= min(bc, ctxt->sge[sge_no].length);
> > > + bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
> > > rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
> > > sge_no++;
> > > }
> > > @@ -113,291 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> > > rqstp->rq_arg.tail[0].iov_len = 0;
> > > }
> > >
> > > -/* Encode a read-chunk-list as an array of IB SGE
> > > - *
> > > - * Assumptions:
> > > - * - chunk[0]->position points to pages[0] at an offset of 0
> > > - * - pages[] is not physically or virtually contiguous and consists of
> > > - * PAGE_SIZE elements.
> > > - *
> > > - * Output:
> > > - * - sge array pointing into pages[] array.
> > > - * - chunk_sge array specifying sge index and count for each
> > > - * chunk in the read list
> > > - *
> > > - */
> > > -static int map_read_chunks(struct svcxprt_rdma *xprt,
> > > - struct svc_rqst *rqstp,
> > > - struct svc_rdma_op_ctxt *head,
> > > - struct rpcrdma_msg *rmsgp,
> > > - struct svc_rdma_req_map *rpl_map,
> > > - struct svc_rdma_req_map *chl_map,
> > > - int ch_count,
> > > - int byte_count)
> > > +static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
> > > {
> > > - int sge_no;
> > > - int sge_bytes;
> > > - int page_off;
> > > - int page_no;
> > > - int ch_bytes;
> > > - int ch_no;
> > > - struct rpcrdma_read_chunk *ch;
> > > + if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
> > > + RDMA_TRANSPORT_IWARP)
> > > + return 1;
> > > + else
> > > + return min_t(int, sge_count, xprt->sc_max_sge);
> > > +}
> > >
> > > - sge_no = 0;
> > > - page_no = 0;
> > > - page_off = 0;
> > > - ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > > - ch_no = 0;
> > > - ch_bytes = ntohl(ch->rc_target.rs_length);
> > > - head->arg.head[0] = rqstp->rq_arg.head[0];
> > > - head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > > - head->arg.pages = &head->pages[head->count];
> > > - head->hdr_count = head->count; /* save count of hdr pages */
> > > - head->arg.page_base = 0;
> > > - head->arg.page_len = ch_bytes;
> > > - head->arg.len = rqstp->rq_arg.len + ch_bytes;
> > > - head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
> > > - head->count++;
> > > - chl_map->ch[0].start = 0;
> > > - while (byte_count) {
> > > - rpl_map->sge[sge_no].iov_base =
> > > - page_address(rqstp->rq_arg.pages[page_no]) +
> > > page_off;
> > > - sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
> > > - rpl_map->sge[sge_no].iov_len = sge_bytes;
> > > - /*
> > > - * Don't bump head->count here because the same page
> > > - * may be used by multiple SGE.
> > > - */
> > > - head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
> > > - rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
> > > +typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
> > > + struct svc_rqst *rqstp,
> > > + struct svc_rdma_op_ctxt *head,
> > > + int *page_no,
> > > + u32 *page_offset,
> > > + u32 rs_handle,
> > > + u32 rs_length,
> > > + u64 rs_offset,
> > > + int last);
> > > +
> > > +/* Issue an RDMA_READ using the local lkey to map the data sink */
> > > +static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
> > > + struct svc_rqst *rqstp,
> > > + struct svc_rdma_op_ctxt *head,
> > > + int *page_no,
> > > + u32 *page_offset,
> > > + u32 rs_handle,
> > > + u32 rs_length,
> > > + u64 rs_offset,
> > > + int last)
> > > +{
> > > + struct ib_send_wr read_wr;
> > > + int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >>
> > > PAGE_SHIFT;
> > > + struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
> > > + int ret, read, pno;
> > > + u32 pg_off = *page_offset;
> > > + u32 pg_no = *page_no;
> > > +
> > > + ctxt->direction = DMA_FROM_DEVICE;
> > > + ctxt->read_hdr = head;
> > > + pages_needed =
> > > + min_t(int, pages_needed, rdma_read_max_sge(xprt,
> > > pages_needed));
> > > + read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
> > > +
> > > + for (pno = 0; pno < pages_needed; pno++) {
> > > + int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
> > > +
> > > + head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
> > > + head->arg.page_len += len;
> > > + head->arg.len += len;
> > > + if (!pg_off)
> > > + head->count++;
> > > + rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
> > > rqstp->rq_next_page = rqstp->rq_respages + 1;
> > > + ctxt->sge[pno].addr =
> > > + ib_dma_map_page(xprt->sc_cm_id->device,
> > > + head->arg.pages[pg_no], pg_off,
> > > + PAGE_SIZE - pg_off,
> > > + DMA_FROM_DEVICE);
> > > + ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > + ctxt->sge[pno].addr);
> > > + if (ret)
> > > + goto err;
> > > + atomic_inc(&xprt->sc_dma_used);
> > >
> > > - byte_count -= sge_bytes;
> > > - ch_bytes -= sge_bytes;
> > > - sge_no++;
> > > - /*
> > > - * If all bytes for this chunk have been mapped to an
> > > - * SGE, move to the next SGE
> > > - */
> > > - if (ch_bytes == 0) {
> > > - chl_map->ch[ch_no].count =
> > > - sge_no - chl_map->ch[ch_no].start;
> > > - ch_no++;
> > > - ch++;
> > > - chl_map->ch[ch_no].start = sge_no;
> > > - ch_bytes = ntohl(ch->rc_target.rs_length);
> > > - /* If bytes remaining account for next chunk */
> > > - if (byte_count) {
> > > - head->arg.page_len += ch_bytes;
> > > - head->arg.len += ch_bytes;
> > > - head->arg.buflen += ch_bytes;
> > > - }
> > > + /* The lkey here is either a local dma lkey or a dma_mr lkey
> > > */
> > > + ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
> > > + ctxt->sge[pno].length = len;
> > > + ctxt->count++;
> > > +
> > > + /* adjust offset and wrap to next page if needed */
> > > + pg_off += len;
> > > + if (pg_off == PAGE_SIZE) {
> > > + pg_off = 0;
> > > + pg_no++;
> > > }
> > > - /*
> > > - * If this SGE consumed all of the page, move to the
> > > - * next page
> > > - */
> > > - if ((sge_bytes + page_off) == PAGE_SIZE) {
> > > - page_no++;
> > > - page_off = 0;
> > > - /*
> > > - * If there are still bytes left to map, bump
> > > - * the page count
> > > - */
> > > - if (byte_count)
> > > - head->count++;
> > > - } else
> > > - page_off += sge_bytes;
> > > + rs_length -= len;
> > > }
> > > - BUG_ON(byte_count != 0);
> > > - return sge_no;
> > > +
> > > + if (last && rs_length == 0)
> > > + set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > > + else
> > > + clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > > +
> > > + memset(&read_wr, 0, sizeof(read_wr));
> > > + read_wr.wr_id = (unsigned long)ctxt;
> > > + read_wr.opcode = IB_WR_RDMA_READ;
> > > + ctxt->wr_op = read_wr.opcode;
> > > + read_wr.send_flags = IB_SEND_SIGNALED;
> > > + read_wr.wr.rdma.rkey = rs_handle;
> > > + read_wr.wr.rdma.remote_addr = rs_offset;
> > > + read_wr.sg_list = ctxt->sge;
> > > + read_wr.num_sge = pages_needed;
> > > +
> > > + ret = svc_rdma_send(xprt, &read_wr);
> > > + if (ret) {
> > > + pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
> > > + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > > + goto err;
> > > + }
> > > +
> > > + /* return current location in page array */
> > > + *page_no = pg_no;
> > > + *page_offset = pg_off;
> > > + ret = read;
> > > + atomic_inc(&rdma_stat_read);
> > > + return ret;
> > > + err:
> > > + svc_rdma_unmap_dma(ctxt);
> > > + svc_rdma_put_context(ctxt, 0);
> > > + return ret;
> > > }
> > >
> > > -/* Map a read-chunk-list to an XDR and fast register the page-list.
> > > - *
> > > - * Assumptions:
> > > - * - chunk[0] position points to pages[0] at an offset of 0
> > > - * - pages[] will be made physically contiguous by creating a one-off
> > > memory
> > > - * region using the fastreg verb.
> > > - * - byte_count is # of bytes in read-chunk-list
> > > - * - ch_count is # of chunks in read-chunk-list
> > > - *
> > > - * Output:
> > > - * - sge array pointing into pages[] array.
> > > - * - chunk_sge array specifying sge index and count for each
> > > - * chunk in the read list
> > > - */
> > > -static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
> > > +/* Issue an RDMA_READ using an FRMR to map the data sink */
> > > +static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
> > > struct svc_rqst *rqstp,
> > > struct svc_rdma_op_ctxt *head,
> > > - struct rpcrdma_msg *rmsgp,
> > > - struct svc_rdma_req_map *rpl_map,
> > > - struct svc_rdma_req_map *chl_map,
> > > - int ch_count,
> > > - int byte_count)
> > > + int *page_no,
> > > + u32 *page_offset,
> > > + u32 rs_handle,
> > > + u32 rs_length,
> > > + u64 rs_offset,
> > > + int last)
> > > {
> > > - int page_no;
> > > - int ch_no;
> > > - u32 offset;
> > > - struct rpcrdma_read_chunk *ch;
> > > - struct svc_rdma_fastreg_mr *frmr;
> > > - int ret = 0;
> > > + struct ib_send_wr read_wr;
> > > + struct ib_send_wr inv_wr;
> > > + struct ib_send_wr fastreg_wr;
> > > + u8 key;
> > > + int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >>
> > > PAGE_SHIFT;
> > > + struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
> > > + struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
> > > + int ret, read, pno;
> > > + u32 pg_off = *page_offset;
> > > + u32 pg_no = *page_no;
> > >
> > > - frmr = svc_rdma_get_frmr(xprt);
> > > if (IS_ERR(frmr))
> > > return -ENOMEM;
> > >
> > > - head->frmr = frmr;
> > > - head->arg.head[0] = rqstp->rq_arg.head[0];
> > > - head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > > - head->arg.pages = &head->pages[head->count];
> > > - head->hdr_count = head->count; /* save count of hdr pages */
> > > - head->arg.page_base = 0;
> > > - head->arg.page_len = byte_count;
> > > - head->arg.len = rqstp->rq_arg.len + byte_count;
> > > - head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
> > > + ctxt->direction = DMA_FROM_DEVICE;
> > > + ctxt->frmr = frmr;
> > > + pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
> > > + read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
> > >
> > > - /* Fast register the page list */
> > > - frmr->kva = page_address(rqstp->rq_arg.pages[0]);
> > > + frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
> > > frmr->direction = DMA_FROM_DEVICE;
> > > frmr->access_flags =
> > > (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
> > > - frmr->map_len = byte_count;
> > > - frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
> > > - for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
> > > - frmr->page_list->page_list[page_no] =
> > > + frmr->map_len = pages_needed << PAGE_SHIFT;
> > > + frmr->page_list_len = pages_needed;
> > > +
> > > + for (pno = 0; pno < pages_needed; pno++) {
> > > + int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
> > > +
> > > + head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
> > > + head->arg.page_len += len;
> > > + head->arg.len += len;
> > > + if (!pg_off)
> > > + head->count++;
> > > + rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
> > > + rqstp->rq_next_page = rqstp->rq_respages + 1;
> > > + frmr->page_list->page_list[pno] =
> > > ib_dma_map_page(xprt->sc_cm_id->device,
> > > - rqstp->rq_arg.pages[page_no], 0,
> > > + head->arg.pages[pg_no], 0,
> > > PAGE_SIZE, DMA_FROM_DEVICE);
> > > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > - frmr->page_list->page_list[page_no]))
> > > - goto fatal_err;
> > > + ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > + frmr->page_list->page_list[pno]);
> > > + if (ret)
> > > + goto err;
> > > atomic_inc(&xprt->sc_dma_used);
> > > - head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
> > > - }
> > > - head->count += page_no;
> > > -
> > > - /* rq_respages points one past arg pages */
> > > - rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
> > > - rqstp->rq_next_page = rqstp->rq_respages + 1;
> > >
> > > - /* Create the reply and chunk maps */
> > > - offset = 0;
> > > - ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > > - for (ch_no = 0; ch_no < ch_count; ch_no++) {
> > > - int len = ntohl(ch->rc_target.rs_length);
> > > - rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
> > > - rpl_map->sge[ch_no].iov_len = len;
> > > - chl_map->ch[ch_no].count = 1;
> > > - chl_map->ch[ch_no].start = ch_no;
> > > - offset += len;
> > > - ch++;
> > > + /* adjust offset and wrap to next page if needed */
> > > + pg_off += len;
> > > + if (pg_off == PAGE_SIZE) {
> > > + pg_off = 0;
> > > + pg_no++;
> > > + }
> > > + rs_length -= len;
> > > }
> > >
> > > - ret = svc_rdma_fastreg(xprt, frmr);
> > > - if (ret)
> > > - goto fatal_err;
> > > -
> > > - return ch_no;
> > > -
> > > - fatal_err:
> > > - printk("svcrdma: error fast registering xdr for xprt %p", xprt);
> > > - svc_rdma_put_frmr(xprt, frmr);
> > > - return -EIO;
> > > -}
> > > -
> > > -static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
> > > - struct svc_rdma_op_ctxt *ctxt,
> > > - struct svc_rdma_fastreg_mr *frmr,
> > > - struct kvec *vec,
> > > - u64 *sgl_offset,
> > > - int count)
> > > -{
> > > - int i;
> > > - unsigned long off;
> > > + if (last && rs_length == 0)
> > > + set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > > + else
> > > + clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > >
> > > - ctxt->count = count;
> > > - ctxt->direction = DMA_FROM_DEVICE;
> > > - for (i = 0; i < count; i++) {
> > > - ctxt->sge[i].length = 0; /* in case map fails */
> > > - if (!frmr) {
> > > - BUG_ON(!virt_to_page(vec[i].iov_base));
> > > - off = (unsigned long)vec[i].iov_base &
> > > ~PAGE_MASK;
> > > - ctxt->sge[i].addr =
> > > - ib_dma_map_page(xprt->sc_cm_id->device,
> > > -
> > > virt_to_page(vec[i].iov_base),
> > > - off,
> > > - vec[i].iov_len,
> > > - DMA_FROM_DEVICE);
> > > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > - ctxt->sge[i].addr))
> > > - return -EINVAL;
> > > - ctxt->sge[i].lkey = xprt->sc_dma_lkey;
> > > - atomic_inc(&xprt->sc_dma_used);
> > > - } else {
> > > - ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
> > > - ctxt->sge[i].lkey = frmr->mr->lkey;
> > > - }
> > > - ctxt->sge[i].length = vec[i].iov_len;
> > > - *sgl_offset = *sgl_offset + vec[i].iov_len;
> > > + /* Bump the key */
> > > + key = (u8)(frmr->mr->lkey & 0x000000FF);
> > > + ib_update_fast_reg_key(frmr->mr, ++key);
> > > +
> > > + ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
> > > + ctxt->sge[0].lkey = frmr->mr->lkey;
> > > + ctxt->sge[0].length = read;
> > > + ctxt->count = 1;
> > > + ctxt->read_hdr = head;
> > > +
> > > + /* Prepare FASTREG WR */
> > > + memset(&fastreg_wr, 0, sizeof(fastreg_wr));
> > > + fastreg_wr.opcode = IB_WR_FAST_REG_MR;
> > > + fastreg_wr.send_flags = IB_SEND_SIGNALED;
> > > + fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
> > > + fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
> > > + fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
> > > + fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
> > > + fastreg_wr.wr.fast_reg.length = frmr->map_len;
> > > + fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
> > > + fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
> > > + fastreg_wr.next = &read_wr;
> > > +
> > > + /* Prepare RDMA_READ */
> > > + memset(&read_wr, 0, sizeof(read_wr));
> > > + read_wr.send_flags = IB_SEND_SIGNALED;
> > > + read_wr.wr.rdma.rkey = rs_handle;
> > > + read_wr.wr.rdma.remote_addr = rs_offset;
> > > + read_wr.sg_list = ctxt->sge;
> > > + read_wr.num_sge = 1;
> > > + if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
> > > + read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
> > > + read_wr.wr_id = (unsigned long)ctxt;
> > > + read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
> > > + } else {
> > > + read_wr.opcode = IB_WR_RDMA_READ;
> > > + read_wr.next = &inv_wr;
> > > + /* Prepare invalidate */
> > > + memset(&inv_wr, 0, sizeof(inv_wr));
> > > + inv_wr.wr_id = (unsigned long)ctxt;
> > > + inv_wr.opcode = IB_WR_LOCAL_INV;
> > > + inv_wr.send_flags = IB_SEND_SIGNALED;
> >
> > Change this to inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
> >
> > > + inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
> > > + }
> > > + ctxt->wr_op = read_wr.opcode;
> > > +
> > > + /* Post the chain */
> > > + ret = svc_rdma_send(xprt, &fastreg_wr);
> > > + if (ret) {
> > > + pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
> > > + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > > + goto err;
> > > }
> > > - return 0;
> > > -}
> > >
> > > -static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
> > > -{
> > > - if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
> > > - RDMA_TRANSPORT_IWARP) &&
> > > - sge_count > 1)
> > > - return 1;
> > > - else
> > > - return min_t(int, sge_count, xprt->sc_max_sge);
> > > + /* return current location in page array */
> > > + *page_no = pg_no;
> > > + *page_offset = pg_off;
> > > + ret = read;
> > > + atomic_inc(&rdma_stat_read);
> > > + return ret;
> > > + err:
> > > + svc_rdma_unmap_dma(ctxt);
> > > + svc_rdma_put_context(ctxt, 0);
> > > + svc_rdma_put_frmr(xprt, frmr);
> > > + return ret;
> > > }
> > >
> > > -/*
> > > - * Use RDMA_READ to read data from the advertised client buffer into the
> > > - * XDR stream starting at rq_arg.head[0].iov_base.
> > > - * Each chunk in the array
> > > - * contains the following fields:
> > > - * discrim - '1', This isn't used for data placement
> > > - * position - The xdr stream offset (the same for every chunk)
> > > - * handle - RMR for client memory region
> > > - * length - data transfer length
> > > - * offset - 64 bit tagged offset in remote memory region
> > > - *
> > > - * On our side, we need to read into a pagelist. The first page immediately
> > > - * follows the RPC header.
> > > - *
> > > - * This function returns:
> > > - * 0 - No error and no read-list found.
> > > - *
> > > - * 1 - Successful read-list processing. The data is not yet in
> > > - * the pagelist and therefore the RPC request must be deferred. The
> > > - * I/O completion will enqueue the transport again and
> > > - * svc_rdma_recvfrom will complete the request.
> > > - *
> > > - * <0 - Error processing/posting read-list.
> > > - *
> > > - * NOTE: The ctxt must not be touched after the last WR has been posted
> > > - * because the I/O completion processing may occur on another
> > > - * processor and free / modify the context. Ne touche pas!
> > > - */
> > > -static int rdma_read_xdr(struct svcxprt_rdma *xprt,
> > > - struct rpcrdma_msg *rmsgp,
> > > - struct svc_rqst *rqstp,
> > > - struct svc_rdma_op_ctxt *hdr_ctxt)
> > > +static int rdma_read_chunks(struct svcxprt_rdma *xprt,
> > > + struct rpcrdma_msg *rmsgp,
> > > + struct svc_rqst *rqstp,
> > > + struct svc_rdma_op_ctxt *head)
> > > {
> > > - struct ib_send_wr read_wr;
> > > - struct ib_send_wr inv_wr;
> > > - int err = 0;
> > > - int ch_no;
> > > - int ch_count;
> > > - int byte_count;
> > > - int sge_count;
> > > - u64 sgl_offset;
> > > + int page_no, ch_count, ret;
> > > struct rpcrdma_read_chunk *ch;
> > > - struct svc_rdma_op_ctxt *ctxt = NULL;
> > > - struct svc_rdma_req_map *rpl_map;
> > > - struct svc_rdma_req_map *chl_map;
> > > + u32 page_offset, byte_count;
> > > + u64 rs_offset;
> > > + rdma_reader_fn reader;
> > >
> > > /* If no read list is present, return 0 */
> > > ch = svc_rdma_get_read_chunk(rmsgp);
> > > @@ -408,122 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
> > > if (ch_count > RPCSVC_MAXPAGES)
> > > return -EINVAL;
> > >
> > > - /* Allocate temporary reply and chunk maps */
> > > - rpl_map = svc_rdma_get_req_map();
> > > - chl_map = svc_rdma_get_req_map();
> > > + /* The request is completed when the RDMA_READs complete. The
> > > + * head context keeps all the pages that comprise the
> > > + * request.
> > > + */
> > > + head->arg.head[0] = rqstp->rq_arg.head[0];
> > > + head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > > + head->arg.pages = &head->pages[head->count];
> > > + head->hdr_count = head->count;
> > > + head->arg.page_base = 0;
> > > + head->arg.page_len = 0;
> > > + head->arg.len = rqstp->rq_arg.len;
> > > + head->arg.buflen = rqstp->rq_arg.buflen;
> > >
> > > - if (!xprt->sc_frmr_pg_list_len)
> > > - sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
> > > - rpl_map, chl_map, ch_count,
> > > - byte_count);
> > > + /* Use FRMR if supported */
> > > + if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
> > > + reader = rdma_read_chunk_frmr;
> > > else
> > > - sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt,
> > > rmsgp,
> > > - rpl_map, chl_map, ch_count,
> > > - byte_count);
> > > - if (sge_count < 0) {
> > > - err = -EIO;
> > > - goto out;
> > > - }
> > > -
> > > - sgl_offset = 0;
> > > - ch_no = 0;
> > > + reader = rdma_read_chunk_lcl;
> > >
> > > + page_no = 0; page_offset = 0;
> > > for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > > - ch->rc_discrim != 0; ch++, ch_no++) {
> > > - u64 rs_offset;
> > > -next_sge:
> > > - ctxt = svc_rdma_get_context(xprt);
> > > - ctxt->direction = DMA_FROM_DEVICE;
> > > - ctxt->frmr = hdr_ctxt->frmr;
> > > - ctxt->read_hdr = NULL;
> > > - clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > > - clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > > + ch->rc_discrim != 0; ch++) {
> > >
> > > - /* Prepare READ WR */
> > > - memset(&read_wr, 0, sizeof read_wr);
> > > - read_wr.wr_id = (unsigned long)ctxt;
> > > - read_wr.opcode = IB_WR_RDMA_READ;
> > > - ctxt->wr_op = read_wr.opcode;
> > > - read_wr.send_flags = IB_SEND_SIGNALED;
> > > - read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
> > > xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
> > > &rs_offset);
> > > - read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
> > > - read_wr.sg_list = ctxt->sge;
> > > - read_wr.num_sge =
> > > - rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
> > > - err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
> > > - &rpl_map->sge[chl_map->ch[ch_no].start],
> > > - &sgl_offset,
> > > - read_wr.num_sge);
> > > - if (err) {
> > > - svc_rdma_unmap_dma(ctxt);
> > > - svc_rdma_put_context(ctxt, 0);
> > > - goto out;
> > > - }
> > > - if (((ch+1)->rc_discrim == 0) &&
> > > - (read_wr.num_sge == chl_map->ch[ch_no].count)) {
> > > - /*
> > > - * Mark the last RDMA_READ with a bit to
> > > - * indicate all RPC data has been fetched from
> > > - * the client and the RPC needs to be enqueued.
> > > - */
> > > - set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > > - if (hdr_ctxt->frmr) {
> > > - set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > > - /*
> > > - * Invalidate the local MR used to map the
> > > data
> > > - * sink.
> > > - */
> > > - if (xprt->sc_dev_caps &
> > > - SVCRDMA_DEVCAP_READ_W_INV) {
> > > - read_wr.opcode =
> > > -
> > > IB_WR_RDMA_READ_WITH_INV;
> > > - ctxt->wr_op = read_wr.opcode;
> > > - read_wr.ex.invalidate_rkey =
> > > - ctxt->frmr->mr->lkey;
> > > - } else {
> > > - /* Prepare INVALIDATE WR */
> > > - memset(&inv_wr, 0, sizeof inv_wr);
> > > - inv_wr.opcode = IB_WR_LOCAL_INV;
> > > - inv_wr.send_flags =
> > > IB_SEND_SIGNALED;
> > > - inv_wr.ex.invalidate_rkey =
> > > - hdr_ctxt->frmr->mr->lkey;
> > > - read_wr.next = &inv_wr;
> > > - }
> > > - }
> > > - ctxt->read_hdr = hdr_ctxt;
> > > - }
> > > - /* Post the read */
> > > - err = svc_rdma_send(xprt, &read_wr);
> > > - if (err) {
> > > - printk(KERN_ERR "svcrdma: Error %d posting
> > > RDMA_READ\n",
> > > - err);
> > > - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > > - svc_rdma_unmap_dma(ctxt);
> > > - svc_rdma_put_context(ctxt, 0);
> > > - goto out;
> > > + byte_count = ntohl(ch->rc_target.rs_length);
> > > +
> > > + while (byte_count > 0) {
> > > + ret = reader(xprt, rqstp, head,
> > > + &page_no, &page_offset,
> > > + ntohl(ch->rc_target.rs_handle),
> > > + byte_count, rs_offset,
> > > + ((ch+1)->rc_discrim == 0) /* last */
> > > + );
> > > + if (ret < 0)
> > > + goto err;
> > > + byte_count -= ret;
> > > + rs_offset += ret;
> > > + head->arg.buflen += ret;
> > > }
> > > - atomic_inc(&rdma_stat_read);
> > > -
> > > - if (read_wr.num_sge < chl_map->ch[ch_no].count) {
> > > - chl_map->ch[ch_no].count -= read_wr.num_sge;
> > > - chl_map->ch[ch_no].start += read_wr.num_sge;
> > > - goto next_sge;
> > > - }
> > > - sgl_offset = 0;
> > > - err = 1;
> > > }
> > > -
> > > - out:
> > > - svc_rdma_put_req_map(rpl_map);
> > > - svc_rdma_put_req_map(chl_map);
> > > -
> > > + ret = 1;
> > > + err:
> > > /* Detach arg pages. svc_recv will replenish them */
> > > - for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages;
> > > ch_no++)
> > > - rqstp->rq_pages[ch_no] = NULL;
> > > + for (page_no = 0;
> > > + &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
> > > + rqstp->rq_pages[page_no] = NULL;
> > >
> > > - return err;
> > > + return ret;
> > > }
> > >
> > > static int rdma_read_complete(struct svc_rqst *rqstp,
> > > @@ -595,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> > > struct svc_rdma_op_ctxt,
> > > dto_q);
> > > list_del_init(&ctxt->dto_q);
> > > - }
> > > - if (ctxt) {
> > > spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
> > > return rdma_read_complete(rqstp, ctxt);
> > > - }
> > > -
> > > - if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
> > > + } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
> > > ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
> > > struct svc_rdma_op_ctxt,
> > > dto_q);
> > > @@ -621,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> > > if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
> > > goto close_out;
> > >
> > > - BUG_ON(ret);
> > > goto out;
> > > }
> > > dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p,
> > > status=%d\n", @@ -644,12 +548,11 @@ int svc_rdma_recvfrom(struct
> > > svc_rqst *rqstp)
> > > }
> > >
> > > /* Read read-list data. */
> > > - ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
> > > + ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
> > > if (ret > 0) {
> > > /* read-list posted, defer until data received from client. */
> > > goto defer;
> > > - }
> > > - if (ret < 0) {
> > > + } else if (ret < 0) {
> > > /* Post of read-list failed, free context. */
> > > svc_rdma_put_context(ctxt, 1);
> > > return 0;
> > > diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > > b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > > index 7e024a5..49fd21a 100644
> > > --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > > +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > > @@ -1,4 +1,5 @@
> > > /*
> > > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> > > * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> > > *
> > > * This software is available to you under a choice of one of two
> > > @@ -49,152 +50,6 @@
> > >
> > > #define RPCDBG_FACILITY RPCDBG_SVCXPRT
> > >
> > > -/* Encode an XDR as an array of IB SGE
> > > - *
> > > - * Assumptions:
> > > - * - head[0] is physically contiguous.
> > > - * - tail[0] is physically contiguous.
> > > - * - pages[] is not physically or virtually contiguous and consists of
> > > - * PAGE_SIZE elements.
> > > - *
> > > - * Output:
> > > - * SGE[0] reserved for RCPRDMA header
> > > - * SGE[1] data from xdr->head[]
> > > - * SGE[2..sge_count-2] data from xdr->pages[]
> > > - * SGE[sge_count-1] data from xdr->tail.
> > > - *
> > > - * The max SGE we need is the length of the XDR / pagesize + one for
> > > - * head + one for tail + one for RPCRDMA header. Since
> > > RPCSVC_MAXPAGES
> > > - * reserves a page for both the request and the reply header, and this
> > > - * array is only concerned with the reply we are assured that we have
> > > - * on extra page for the RPCRMDA header.
> > > - */
> > > -static int fast_reg_xdr(struct svcxprt_rdma *xprt,
> > > - struct xdr_buf *xdr,
> > > - struct svc_rdma_req_map *vec)
> > > -{
> > > - int sge_no;
> > > - u32 sge_bytes;
> > > - u32 page_bytes;
> > > - u32 page_off;
> > > - int page_no = 0;
> > > - u8 *frva;
> > > - struct svc_rdma_fastreg_mr *frmr;
> > > -
> > > - frmr = svc_rdma_get_frmr(xprt);
> > > - if (IS_ERR(frmr))
> > > - return -ENOMEM;
> > > - vec->frmr = frmr;
> > > -
> > > - /* Skip the RPCRDMA header */
> > > - sge_no = 1;
> > > -
> > > - /* Map the head. */
> > > - frva = (void *)((unsigned long)(xdr->head[0].iov_base) &
> > > PAGE_MASK);
> > > - vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
> > > - vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
> > > - vec->count = 2;
> > > - sge_no++;
> > > -
> > > - /* Map the XDR head */
> > > - frmr->kva = frva;
> > > - frmr->direction = DMA_TO_DEVICE;
> > > - frmr->access_flags = 0;
> > > - frmr->map_len = PAGE_SIZE;
> > > - frmr->page_list_len = 1;
> > > - page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
> > > - frmr->page_list->page_list[page_no] =
> > > - ib_dma_map_page(xprt->sc_cm_id->device,
> > > - virt_to_page(xdr->head[0].iov_base),
> > > - page_off,
> > > - PAGE_SIZE - page_off,
> > > - DMA_TO_DEVICE);
> > > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > - frmr->page_list->page_list[page_no]))
> > > - goto fatal_err;
> > > - atomic_inc(&xprt->sc_dma_used);
> > > -
> > > - /* Map the XDR page list */
> > > - page_off = xdr->page_base;
> > > - page_bytes = xdr->page_len + page_off;
> > > - if (!page_bytes)
> > > - goto encode_tail;
> > > -
> > > - /* Map the pages */
> > > - vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
> > > - vec->sge[sge_no].iov_len = page_bytes;
> > > - sge_no++;
> > > - while (page_bytes) {
> > > - struct page *page;
> > > -
> > > - page = xdr->pages[page_no++];
> > > - sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE -
> > > page_off));
> > > - page_bytes -= sge_bytes;
> > > -
> > > - frmr->page_list->page_list[page_no] =
> > > - ib_dma_map_page(xprt->sc_cm_id->device,
> > > - page, page_off,
> > > - sge_bytes, DMA_TO_DEVICE);
> > > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > - frmr->page_list->page_list[page_no]))
> > > - goto fatal_err;
> > > -
> > > - atomic_inc(&xprt->sc_dma_used);
> > > - page_off = 0; /* reset for next time through loop */
> > > - frmr->map_len += PAGE_SIZE;
> > > - frmr->page_list_len++;
> > > - }
> > > - vec->count++;
> > > -
> > > - encode_tail:
> > > - /* Map tail */
> > > - if (0 == xdr->tail[0].iov_len)
> > > - goto done;
> > > -
> > > - vec->count++;
> > > - vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
> > > -
> > > - if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
> > > - ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
> > > - /*
> > > - * If head and tail use the same page, we don't need
> > > - * to map it again.
> > > - */
> > > - vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
> > > - } else {
> > > - void *va;
> > > -
> > > - /* Map another page for the tail */
> > > - page_off = (unsigned long)xdr->tail[0].iov_base &
> > > ~PAGE_MASK;
> > > - va = (void *)((unsigned long)xdr->tail[0].iov_base &
> > > PAGE_MASK);
> > > - vec->sge[sge_no].iov_base = frva + frmr->map_len +
> > > page_off;
> > > -
> > > - frmr->page_list->page_list[page_no] =
> > > - ib_dma_map_page(xprt->sc_cm_id->device,
> > > virt_to_page(va),
> > > - page_off,
> > > - PAGE_SIZE,
> > > - DMA_TO_DEVICE);
> > > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > - frmr->page_list->page_list[page_no]))
> > > - goto fatal_err;
> > > - atomic_inc(&xprt->sc_dma_used);
> > > - frmr->map_len += PAGE_SIZE;
> > > - frmr->page_list_len++;
> > > - }
> > > -
> > > - done:
> > > - if (svc_rdma_fastreg(xprt, frmr))
> > > - goto fatal_err;
> > > -
> > > - return 0;
> > > -
> > > - fatal_err:
> > > - printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
> > > - vec->frmr = NULL;
> > > - svc_rdma_put_frmr(xprt, frmr);
> > > - return -EIO;
> > > -}
> > > -
> > > static int map_xdr(struct svcxprt_rdma *xprt,
> > > struct xdr_buf *xdr,
> > > struct svc_rdma_req_map *vec)
> > > @@ -208,9 +63,6 @@ static int map_xdr(struct svcxprt_rdma *xprt,
> > > BUG_ON(xdr->len !=
> > > (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
> > >
> > > - if (xprt->sc_frmr_pg_list_len)
> > > - return fast_reg_xdr(xprt, xdr, vec);
> > > -
> > > /* Skip the first sge, this is for the RPCRDMA header */
> > > sge_no = 1;
> > >
> > > @@ -282,8 +134,6 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
> > > }
> > >
> > > /* Assumptions:
> > > - * - We are using FRMR
> > > - * - or -
> > > * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
> > > */
> > > static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> > > @@ -327,23 +177,16 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> > > sge_bytes = min_t(size_t,
> > > bc, vec->sge[xdr_sge_no].iov_len-sge_off);
> > > sge[sge_no].length = sge_bytes;
> > > - if (!vec->frmr) {
> > > - sge[sge_no].addr =
> > > - dma_map_xdr(xprt, &rqstp->rq_res,
> > > xdr_off,
> > > - sge_bytes, DMA_TO_DEVICE);
> > > - xdr_off += sge_bytes;
> > > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > - sge[sge_no].addr))
> > > - goto err;
> > > - atomic_inc(&xprt->sc_dma_used);
> > > - sge[sge_no].lkey = xprt->sc_dma_lkey;
> > > - } else {
> > > - sge[sge_no].addr = (unsigned long)
> > > - vec->sge[xdr_sge_no].iov_base + sge_off;
> > > - sge[sge_no].lkey = vec->frmr->mr->lkey;
> > > - }
> > > + sge[sge_no].addr =
> > > + dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
> > > + sge_bytes, DMA_TO_DEVICE);
> > > + xdr_off += sge_bytes;
> > > + if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > > + sge[sge_no].addr))
> > > + goto err;
> > > + atomic_inc(&xprt->sc_dma_used);
> > > + sge[sge_no].lkey = xprt->sc_dma_lkey;
> > > ctxt->count++;
> > > - ctxt->frmr = vec->frmr;
> > > sge_off = 0;
> > > sge_no++;
> > > xdr_sge_no++;
> > > @@ -369,7 +212,6 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> > > return 0;
> > > err:
> > > svc_rdma_unmap_dma(ctxt);
> > > - svc_rdma_put_frmr(xprt, vec->frmr);
> > > svc_rdma_put_context(ctxt, 0);
> > > /* Fatal error, close transport */
> > > return -EIO;
> > > @@ -397,10 +239,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
> > > res_ary = (struct rpcrdma_write_array *)
> > > &rdma_resp->rm_body.rm_chunks[1];
> > >
> > > - if (vec->frmr)
> > > - max_write = vec->frmr->map_len;
> > > - else
> > > - max_write = xprt->sc_max_sge * PAGE_SIZE;
> > > + max_write = xprt->sc_max_sge * PAGE_SIZE;
> > >
> > > /* Write chunks start at the pagelist */
> > > for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
> > > @@ -472,10 +311,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
> > > res_ary = (struct rpcrdma_write_array *)
> > > &rdma_resp->rm_body.rm_chunks[2];
> > >
> > > - if (vec->frmr)
> > > - max_write = vec->frmr->map_len;
> > > - else
> > > - max_write = xprt->sc_max_sge * PAGE_SIZE;
> > > + max_write = xprt->sc_max_sge * PAGE_SIZE;
> > >
> > > /* xdr offset starts at RPC message */
> > > nchunks = ntohl(arg_ary->wc_nchunks);
> > > @@ -545,7 +381,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > > int byte_count)
> > > {
> > > struct ib_send_wr send_wr;
> > > - struct ib_send_wr inv_wr;
> > > int sge_no;
> > > int sge_bytes;
> > > int page_no;
> > > @@ -559,7 +394,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > > "svcrdma: could not post a receive buffer, err=%d."
> > > "Closing transport %p.\n", ret, rdma);
> > > set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
> > > - svc_rdma_put_frmr(rdma, vec->frmr);
> > > svc_rdma_put_context(ctxt, 0);
> > > return -ENOTCONN;
> > > }
> > > @@ -567,11 +401,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > > /* Prepare the context */
> > > ctxt->pages[0] = page;
> > > ctxt->count = 1;
> > > - ctxt->frmr = vec->frmr;
> > > - if (vec->frmr)
> > > - set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > > - else
> > > - clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > >
> > > /* Prepare the SGE for the RPCRDMA Header */
> > > ctxt->sge[0].lkey = rdma->sc_dma_lkey;
> > > @@ -590,21 +419,15 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > > int xdr_off = 0;
> > > sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len,
> > > byte_count);
> > > byte_count -= sge_bytes;
> > > - if (!vec->frmr) {
> > > - ctxt->sge[sge_no].addr =
> > > - dma_map_xdr(rdma, &rqstp->rq_res,
> > > xdr_off,
> > > - sge_bytes, DMA_TO_DEVICE);
> > > - xdr_off += sge_bytes;
> > > - if (ib_dma_mapping_error(rdma->sc_cm_id->device,
> > > - ctxt->sge[sge_no].addr))
> > > - goto err;
> > > - atomic_inc(&rdma->sc_dma_used);
> > > - ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
> > > - } else {
> > > - ctxt->sge[sge_no].addr = (unsigned long)
> > > - vec->sge[sge_no].iov_base;
> > > - ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
> > > - }
> > > + ctxt->sge[sge_no].addr =
> > > + dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
> > > + sge_bytes, DMA_TO_DEVICE);
> > > + xdr_off += sge_bytes;
> > > + if (ib_dma_mapping_error(rdma->sc_cm_id->device,
> > > + ctxt->sge[sge_no].addr))
> > > + goto err;
> > > + atomic_inc(&rdma->sc_dma_used);
> > > + ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
> > > ctxt->sge[sge_no].length = sge_bytes;
> > > }
> > > BUG_ON(byte_count != 0);
> > > @@ -627,6 +450,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > > ctxt->sge[page_no+1].length = 0;
> > > }
> > > rqstp->rq_next_page = rqstp->rq_respages + 1;
> > > +
> > > BUG_ON(sge_no > rdma->sc_max_sge);
> > > memset(&send_wr, 0, sizeof send_wr);
> > > ctxt->wr_op = IB_WR_SEND;
> > > @@ -635,15 +459,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > > send_wr.num_sge = sge_no;
> > > send_wr.opcode = IB_WR_SEND;
> > > send_wr.send_flags = IB_SEND_SIGNALED;
> > > - if (vec->frmr) {
> > > - /* Prepare INVALIDATE WR */
> > > - memset(&inv_wr, 0, sizeof inv_wr);
> > > - inv_wr.opcode = IB_WR_LOCAL_INV;
> > > - inv_wr.send_flags = IB_SEND_SIGNALED;
> > > - inv_wr.ex.invalidate_rkey =
> > > - vec->frmr->mr->lkey;
> > > - send_wr.next = &inv_wr;
> > > - }
> > >
> > > ret = svc_rdma_send(rdma, &send_wr);
> > > if (ret)
> > > @@ -653,7 +468,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > >
> > > err:
> > > svc_rdma_unmap_dma(ctxt);
> > > - svc_rdma_put_frmr(rdma, vec->frmr);
> > > svc_rdma_put_context(ctxt, 1);
> > > return -EIO;
> > > }
> > > diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > > b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > > index 25688fa..2c5b201 100644
> > > --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > > +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > > @@ -1,4 +1,5 @@
> > > /*
> > > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> > > * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
> > > *
> > > * This software is available to you under a choice of one of two
> > > @@ -160,7 +161,6 @@ struct svc_rdma_req_map *svc_rdma_get_req_map(void)
> > > schedule_timeout_uninterruptible(msecs_to_jiffies(500));
> > > }
> > > map->count = 0;
> > > - map->frmr = NULL;
> > > return map;
> > > }
> > >
> > > @@ -336,22 +336,21 @@ static void process_context(struct svcxprt_rdma *xprt,
> > >
> > > switch (ctxt->wr_op) {
> > > case IB_WR_SEND:
> > > - if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
> > > - svc_rdma_put_frmr(xprt, ctxt->frmr);
> > > + BUG_ON(ctxt->frmr);
> > > svc_rdma_put_context(ctxt, 1);
> > > break;
> > >
> > > case IB_WR_RDMA_WRITE:
> > > + BUG_ON(ctxt->frmr);
> > > svc_rdma_put_context(ctxt, 0);
> > > break;
> > >
> > > case IB_WR_RDMA_READ:
> > > case IB_WR_RDMA_READ_WITH_INV:
> > > + svc_rdma_put_frmr(xprt, ctxt->frmr);
> > > if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
> > > struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
> > > BUG_ON(!read_hdr);
> > > - if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
> > > - svc_rdma_put_frmr(xprt, ctxt->frmr);
> > > spin_lock_bh(&xprt->sc_rq_dto_lock);
> > > set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
> > > list_add_tail(&read_hdr->dto_q,
> > > @@ -363,6 +362,7 @@ static void process_context(struct svcxprt_rdma *xprt,
> > > break;
> > >
> > > default:
> > > + BUG_ON(1);
> > > printk(KERN_ERR "svcrdma: unexpected completion type, "
> > > "opcode=%d\n",
> > > ctxt->wr_op);
> > > @@ -378,29 +378,42 @@ static void process_context(struct svcxprt_rdma *xprt,
> > > static void sq_cq_reap(struct svcxprt_rdma *xprt)
> > > {
> > > struct svc_rdma_op_ctxt *ctxt = NULL;
> > > - struct ib_wc wc;
> > > + struct ib_wc wc_a[6];
> > > + struct ib_wc *wc;
> > > struct ib_cq *cq = xprt->sc_sq_cq;
> > > int ret;
> > >
> > > + memset(wc_a, 0, sizeof(wc_a));
> > > +
> > > if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
> > > return;
> > >
> > > ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> > > atomic_inc(&rdma_stat_sq_poll);
> > > - while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
> > > - if (wc.status != IB_WC_SUCCESS)
> > > - /* Close the transport */
> > > - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > > + while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
> > > + int i;
> > >
> > > - /* Decrement used SQ WR count */
> > > - atomic_dec(&xprt->sc_sq_count);
> > > - wake_up(&xprt->sc_send_wait);
> > > + for (i = 0; i < ret; i++) {
> > > + wc = &wc_a[i];
> > > + if (wc->status != IB_WC_SUCCESS) {
> > > + dprintk("svcrdma: sq wc err status %d\n",
> > > + wc->status);
> > >
> > > - ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
> > > - if (ctxt)
> > > - process_context(xprt, ctxt);
> > > + /* Close the transport */
> > > + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > > + }
> > >
> > > - svc_xprt_put(&xprt->sc_xprt);
> > > + /* Decrement used SQ WR count */
> > > + atomic_dec(&xprt->sc_sq_count);
> > > + wake_up(&xprt->sc_send_wait);
> > > +
> > > + ctxt = (struct svc_rdma_op_ctxt *)
> > > + (unsigned long)wc->wr_id;
> > > + if (ctxt)
> > > + process_context(xprt, ctxt);
> > > +
> > > + svc_xprt_put(&xprt->sc_xprt);
> > > + }
> > > }
> > >
> > > if (ctxt)
> > > @@ -993,7 +1006,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> > > need_dma_mr = 0;
> > > break;
> > > case RDMA_TRANSPORT_IB:
> > > - if (!(devattr.device_cap_flags &
> > > IB_DEVICE_LOCAL_DMA_LKEY)) {
> > > + if (!(newxprt->sc_dev_caps &
> > > SVCRDMA_DEVCAP_FAST_REG)) {
> > > + need_dma_mr = 1;
> > > + dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
> > > + } else if (!(devattr.device_cap_flags &
> > > + IB_DEVICE_LOCAL_DMA_LKEY)) {
> > > need_dma_mr = 1;
> > > dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
> > > } else
> > > @@ -1190,14 +1207,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
> > > container_of(xprt, struct svcxprt_rdma, sc_xprt);
> > >
> > > /*
> > > - * If there are fewer SQ WR available than required to send a
> > > - * simple response, return false.
> > > - */
> > > - if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
> > > - return 0;
> > > -
> > > - /*
> > > - * ...or there are already waiters on the SQ,
> > > + * If there are already waiters on the SQ,
> > > * return false.
> > > */
> > > if (waitqueue_active(&rdma->sc_send_wait))
> > >