From: "Steve Wise" <swise@opengridcomputing.com>
To: "'Devesh Sharma'" <Devesh.Sharma@Emulex.Com>, <bfields@fieldses.org>
Cc: <linux-nfs@vger.kernel.org>, <linux-rdma@vger.kernel.org>,
<tom@opengridcomputing.com>
Subject: RE: [PATCH V3] svcrdma: refactor marshalling logic
Date: Fri, 30 May 2014 08:02:45 -0500 [thread overview]
Message-ID: <000001cf7c07$72e613d0$58b23b70$@opengridcomputing.com> (raw)
In-Reply-To: <EE7902D3F51F404C82415C4803930ACD3FE01265@CMEXMB1.ad.emulex.com>
>
> Hi Steve
>
> I am testing this patch. I have found that when the server initiates an RDMA_READ on an ocrdma
> device, the read fails with a completion error because the FENCE bit is not set for a
> non-iWARP device that is using FRMRs.
> This bug was present in v1 and v2 as well.
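>
> For context, the FRMR read path posts a single WR chain, roughly (a sketch of the flow in
> rdma_read_chunk_frmr() when READ_W_INV is not supported):
>
>     fastreg_wr.next = &read_wr;       /* register the data sink, then read */
>     read_wr.next = &inv_wr;           /* after the read, invalidate the FRMR */
>     inv_wr.opcode = IB_WR_LOCAL_INV;
>
> Without a fence on the LOCAL_INV, the invalidate can be processed before the RDMA_READ
> completes, which is what I see failing on ocrdma.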
>
Why would the FENCE bit not be required for mlx4, mthca, cxgb4, and yet be required for ocrdma?
> Check inline for the exact location of the change.
>
> The rest is okay from my side; iozone passes with this patch, though of course only after
> adding the FENCE indicator.
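>
> For reference, the change I tested locally is just adding the fence flag on the LOCAL_INV
> work request (a sketch; the exact spot is marked inline below):
>
>     /* Prepare invalidate */
>     memset(&inv_wr, 0, sizeof(inv_wr));
>     inv_wr.wr_id = (unsigned long)ctxt;
>     inv_wr.opcode = IB_WR_LOCAL_INV;
>     /* FENCE makes the invalidate wait for the preceding RDMA_READ */
>     inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
>     inv_wr.ex.invalidate_rkey = frmr->mr->lkey;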
>
> -Regards
> Devesh
>
> > -----Original Message-----
> > From: linux-rdma-owner@vger.kernel.org [mailto:linux-rdma-
> > owner@vger.kernel.org] On Behalf Of Steve Wise
> > Sent: Thursday, May 29, 2014 10:26 PM
> > To: bfields@fieldses.org
> > Cc: linux-nfs@vger.kernel.org; linux-rdma@vger.kernel.org;
> > tom@opengridcomputing.com
> > Subject: [PATCH V3] svcrdma: refactor marshalling logic
> >
> > This patch refactors the NFSRDMA server marshalling logic to remove the
> > intermediary map structures. It also fixes an existing bug where the
> > NFSRDMA server was not minding the device fast register page list length
> > limitations.
> >
> > I've also made a git repo available with these patches on top of 3.15-rc7:
> >
> > git://git.linux-nfs.org/projects/swise/linux.git svcrdma-refactor-v3
> >
> > Changes since V2:
> >
> > - fixed logic bug in rdma_read_chunk_frmr() and rdma_read_chunk_lcl()
> >
> > - in rdma_read_chunks(), set the reader function pointer only once since
> > it doesn't change
> >
> > - squashed the patch back into one patch since the previous split wasn't
> > bisectable
> >
> > Changes since V1:
> >
> > - fixed regression for devices that don't support FRMRs (see
> > rdma_read_chunk_lcl())
> >
> > - split patch up for closer review. However I request it be squashed
> > before merging as the split is not bisectable, and I think these changes
> > should all be a single commit anyway.
> >
> > Please review, and test if you can. I'd like this to hit 3.16.
> >
> > Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
> > Signed-off-by: Steve Wise <swise@opengridcomputing.com>
> > ---
> >
> > include/linux/sunrpc/svc_rdma.h | 3
> > net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 643 +++++++++++++-----------------
> > net/sunrpc/xprtrdma/svc_rdma_sendto.c | 230 +----------
> > net/sunrpc/xprtrdma/svc_rdma_transport.c | 62 ++-
> > 4 files changed, 332 insertions(+), 606 deletions(-)
> >
> > diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> > index 0b8e3e6..5cf99a0 100644
> > --- a/include/linux/sunrpc/svc_rdma.h
> > +++ b/include/linux/sunrpc/svc_rdma.h
> > @@ -115,14 +115,13 @@ struct svc_rdma_fastreg_mr {
> > struct list_head frmr_list;
> > };
> > struct svc_rdma_req_map {
> > - struct svc_rdma_fastreg_mr *frmr;
> > unsigned long count;
> > union {
> > struct kvec sge[RPCSVC_MAXPAGES];
> > struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES];
> > + unsigned long lkey[RPCSVC_MAXPAGES];
> > };
> > };
> > -#define RDMACTXT_F_FAST_UNREG 1
> > #define RDMACTXT_F_LAST_CTXT 2
> >
> > #define SVCRDMA_DEVCAP_FAST_REG 1 /*
> > fast mr registration */
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > index 8d904e4..52d9f2c 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > @@ -1,4 +1,5 @@
> > /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> > * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> > *
> > * This software is available to you under a choice of one of two
> > @@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> >
> > /* Set up the XDR head */
> > rqstp->rq_arg.head[0].iov_base = page_address(page);
> > - rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
> > + rqstp->rq_arg.head[0].iov_len =
> > + min_t(size_t, byte_count, ctxt->sge[0].length);
> > rqstp->rq_arg.len = byte_count;
> > rqstp->rq_arg.buflen = byte_count;
> >
> > @@ -85,7 +87,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> > page = ctxt->pages[sge_no];
> > put_page(rqstp->rq_pages[sge_no]);
> > rqstp->rq_pages[sge_no] = page;
> > - bc -= min(bc, ctxt->sge[sge_no].length);
> > + bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
> > rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
> > sge_no++;
> > }
> > @@ -113,291 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> > rqstp->rq_arg.tail[0].iov_len = 0;
> > }
> >
> > -/* Encode a read-chunk-list as an array of IB SGE
> > - *
> > - * Assumptions:
> > - * - chunk[0]->position points to pages[0] at an offset of 0
> > - * - pages[] is not physically or virtually contiguous and consists of
> > - * PAGE_SIZE elements.
> > - *
> > - * Output:
> > - * - sge array pointing into pages[] array.
> > - * - chunk_sge array specifying sge index and count for each
> > - * chunk in the read list
> > - *
> > - */
> > -static int map_read_chunks(struct svcxprt_rdma *xprt,
> > - struct svc_rqst *rqstp,
> > - struct svc_rdma_op_ctxt *head,
> > - struct rpcrdma_msg *rmsgp,
> > - struct svc_rdma_req_map *rpl_map,
> > - struct svc_rdma_req_map *chl_map,
> > - int ch_count,
> > - int byte_count)
> > +static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
> > {
> > - int sge_no;
> > - int sge_bytes;
> > - int page_off;
> > - int page_no;
> > - int ch_bytes;
> > - int ch_no;
> > - struct rpcrdma_read_chunk *ch;
> > + if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
> > + RDMA_TRANSPORT_IWARP)
> > + return 1;
> > + else
> > + return min_t(int, sge_count, xprt->sc_max_sge);
> > +}
> >
> > - sge_no = 0;
> > - page_no = 0;
> > - page_off = 0;
> > - ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > - ch_no = 0;
> > - ch_bytes = ntohl(ch->rc_target.rs_length);
> > - head->arg.head[0] = rqstp->rq_arg.head[0];
> > - head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > - head->arg.pages = &head->pages[head->count];
> > - head->hdr_count = head->count; /* save count of hdr pages */
> > - head->arg.page_base = 0;
> > - head->arg.page_len = ch_bytes;
> > - head->arg.len = rqstp->rq_arg.len + ch_bytes;
> > - head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
> > - head->count++;
> > - chl_map->ch[0].start = 0;
> > - while (byte_count) {
> > - rpl_map->sge[sge_no].iov_base =
> > - page_address(rqstp->rq_arg.pages[page_no]) + page_off;
> > - sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
> > - rpl_map->sge[sge_no].iov_len = sge_bytes;
> > - /*
> > - * Don't bump head->count here because the same page
> > - * may be used by multiple SGE.
> > - */
> > - head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
> > - rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
> > +typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
> > + struct svc_rqst *rqstp,
> > + struct svc_rdma_op_ctxt *head,
> > + int *page_no,
> > + u32 *page_offset,
> > + u32 rs_handle,
> > + u32 rs_length,
> > + u64 rs_offset,
> > + int last);
> > +
> > +/* Issue an RDMA_READ using the local lkey to map the data sink */
> > +static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
> > + struct svc_rqst *rqstp,
> > + struct svc_rdma_op_ctxt *head,
> > + int *page_no,
> > + u32 *page_offset,
> > + u32 rs_handle,
> > + u32 rs_length,
> > + u64 rs_offset,
> > + int last)
> > +{
> > + struct ib_send_wr read_wr;
> > + int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
> > + struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
> > + int ret, read, pno;
> > + u32 pg_off = *page_offset;
> > + u32 pg_no = *page_no;
> > +
> > + ctxt->direction = DMA_FROM_DEVICE;
> > + ctxt->read_hdr = head;
> > + pages_needed =
> > + min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
> > + read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
> > +
> > + for (pno = 0; pno < pages_needed; pno++) {
> > + int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
> > +
> > + head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
> > + head->arg.page_len += len;
> > + head->arg.len += len;
> > + if (!pg_off)
> > + head->count++;
> > + rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
> > rqstp->rq_next_page = rqstp->rq_respages + 1;
> > + ctxt->sge[pno].addr =
> > + ib_dma_map_page(xprt->sc_cm_id->device,
> > + head->arg.pages[pg_no], pg_off,
> > + PAGE_SIZE - pg_off,
> > + DMA_FROM_DEVICE);
> > + ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
> > + ctxt->sge[pno].addr);
> > + if (ret)
> > + goto err;
> > + atomic_inc(&xprt->sc_dma_used);
> >
> > - byte_count -= sge_bytes;
> > - ch_bytes -= sge_bytes;
> > - sge_no++;
> > - /*
> > - * If all bytes for this chunk have been mapped to an
> > - * SGE, move to the next SGE
> > - */
> > - if (ch_bytes == 0) {
> > - chl_map->ch[ch_no].count =
> > - sge_no - chl_map->ch[ch_no].start;
> > - ch_no++;
> > - ch++;
> > - chl_map->ch[ch_no].start = sge_no;
> > - ch_bytes = ntohl(ch->rc_target.rs_length);
> > - /* If bytes remaining account for next chunk */
> > - if (byte_count) {
> > - head->arg.page_len += ch_bytes;
> > - head->arg.len += ch_bytes;
> > - head->arg.buflen += ch_bytes;
> > - }
> > + /* The lkey here is either a local dma lkey or a dma_mr lkey */
> > + ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
> > + ctxt->sge[pno].length = len;
> > + ctxt->count++;
> > +
> > + /* adjust offset and wrap to next page if needed */
> > + pg_off += len;
> > + if (pg_off == PAGE_SIZE) {
> > + pg_off = 0;
> > + pg_no++;
> > }
> > - /*
> > - * If this SGE consumed all of the page, move to the
> > - * next page
> > - */
> > - if ((sge_bytes + page_off) == PAGE_SIZE) {
> > - page_no++;
> > - page_off = 0;
> > - /*
> > - * If there are still bytes left to map, bump
> > - * the page count
> > - */
> > - if (byte_count)
> > - head->count++;
> > - } else
> > - page_off += sge_bytes;
> > + rs_length -= len;
> > }
> > - BUG_ON(byte_count != 0);
> > - return sge_no;
> > +
> > + if (last && rs_length == 0)
> > + set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > + else
> > + clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > +
> > + memset(&read_wr, 0, sizeof(read_wr));
> > + read_wr.wr_id = (unsigned long)ctxt;
> > + read_wr.opcode = IB_WR_RDMA_READ;
> > + ctxt->wr_op = read_wr.opcode;
> > + read_wr.send_flags = IB_SEND_SIGNALED;
> > + read_wr.wr.rdma.rkey = rs_handle;
> > + read_wr.wr.rdma.remote_addr = rs_offset;
> > + read_wr.sg_list = ctxt->sge;
> > + read_wr.num_sge = pages_needed;
> > +
> > + ret = svc_rdma_send(xprt, &read_wr);
> > + if (ret) {
> > + pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
> > + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > + goto err;
> > + }
> > +
> > + /* return current location in page array */
> > + *page_no = pg_no;
> > + *page_offset = pg_off;
> > + ret = read;
> > + atomic_inc(&rdma_stat_read);
> > + return ret;
> > + err:
> > + svc_rdma_unmap_dma(ctxt);
> > + svc_rdma_put_context(ctxt, 0);
> > + return ret;
> > }
> >
> > -/* Map a read-chunk-list to an XDR and fast register the page-list.
> > - *
> > - * Assumptions:
> > - * - chunk[0] position points to pages[0] at an offset of 0
> > - * - pages[] will be made physically contiguous by creating a one-off
> > memory
> > - * region using the fastreg verb.
> > - * - byte_count is # of bytes in read-chunk-list
> > - * - ch_count is # of chunks in read-chunk-list
> > - *
> > - * Output:
> > - * - sge array pointing into pages[] array.
> > - * - chunk_sge array specifying sge index and count for each
> > - * chunk in the read list
> > - */
> > -static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
> > +/* Issue an RDMA_READ using an FRMR to map the data sink */
> > +static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
> > struct svc_rqst *rqstp,
> > struct svc_rdma_op_ctxt *head,
> > - struct rpcrdma_msg *rmsgp,
> > - struct svc_rdma_req_map *rpl_map,
> > - struct svc_rdma_req_map *chl_map,
> > - int ch_count,
> > - int byte_count)
> > + int *page_no,
> > + u32 *page_offset,
> > + u32 rs_handle,
> > + u32 rs_length,
> > + u64 rs_offset,
> > + int last)
> > {
> > - int page_no;
> > - int ch_no;
> > - u32 offset;
> > - struct rpcrdma_read_chunk *ch;
> > - struct svc_rdma_fastreg_mr *frmr;
> > - int ret = 0;
> > + struct ib_send_wr read_wr;
> > + struct ib_send_wr inv_wr;
> > + struct ib_send_wr fastreg_wr;
> > + u8 key;
> > + int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
> > + struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
> > + struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
> > + int ret, read, pno;
> > + u32 pg_off = *page_offset;
> > + u32 pg_no = *page_no;
> >
> > - frmr = svc_rdma_get_frmr(xprt);
> > if (IS_ERR(frmr))
> > return -ENOMEM;
> >
> > - head->frmr = frmr;
> > - head->arg.head[0] = rqstp->rq_arg.head[0];
> > - head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > - head->arg.pages = &head->pages[head->count];
> > - head->hdr_count = head->count; /* save count of hdr pages */
> > - head->arg.page_base = 0;
> > - head->arg.page_len = byte_count;
> > - head->arg.len = rqstp->rq_arg.len + byte_count;
> > - head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
> > + ctxt->direction = DMA_FROM_DEVICE;
> > + ctxt->frmr = frmr;
> > + pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
> > + read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
> >
> > - /* Fast register the page list */
> > - frmr->kva = page_address(rqstp->rq_arg.pages[0]);
> > + frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
> > frmr->direction = DMA_FROM_DEVICE;
> > frmr->access_flags =
> > (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
> > - frmr->map_len = byte_count;
> > - frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
> > - for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
> > - frmr->page_list->page_list[page_no] =
> > + frmr->map_len = pages_needed << PAGE_SHIFT;
> > + frmr->page_list_len = pages_needed;
> > +
> > + for (pno = 0; pno < pages_needed; pno++) {
> > + int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
> > +
> > + head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
> > + head->arg.page_len += len;
> > + head->arg.len += len;
> > + if (!pg_off)
> > + head->count++;
> > + rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
> > + rqstp->rq_next_page = rqstp->rq_respages + 1;
> > + frmr->page_list->page_list[pno] =
> > ib_dma_map_page(xprt->sc_cm_id->device,
> > - rqstp->rq_arg.pages[page_no], 0,
> > + head->arg.pages[pg_no], 0,
> > PAGE_SIZE, DMA_FROM_DEVICE);
> > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > - frmr->page_list->page_list[page_no]))
> > - goto fatal_err;
> > + ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
> > + frmr->page_list->page_list[pno]);
> > + if (ret)
> > + goto err;
> > atomic_inc(&xprt->sc_dma_used);
> > - head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
> > - }
> > - head->count += page_no;
> > -
> > - /* rq_respages points one past arg pages */
> > - rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
> > - rqstp->rq_next_page = rqstp->rq_respages + 1;
> >
> > - /* Create the reply and chunk maps */
> > - offset = 0;
> > - ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > - for (ch_no = 0; ch_no < ch_count; ch_no++) {
> > - int len = ntohl(ch->rc_target.rs_length);
> > - rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
> > - rpl_map->sge[ch_no].iov_len = len;
> > - chl_map->ch[ch_no].count = 1;
> > - chl_map->ch[ch_no].start = ch_no;
> > - offset += len;
> > - ch++;
> > + /* adjust offset and wrap to next page if needed */
> > + pg_off += len;
> > + if (pg_off == PAGE_SIZE) {
> > + pg_off = 0;
> > + pg_no++;
> > + }
> > + rs_length -= len;
> > }
> >
> > - ret = svc_rdma_fastreg(xprt, frmr);
> > - if (ret)
> > - goto fatal_err;
> > -
> > - return ch_no;
> > -
> > - fatal_err:
> > - printk("svcrdma: error fast registering xdr for xprt %p", xprt);
> > - svc_rdma_put_frmr(xprt, frmr);
> > - return -EIO;
> > -}
> > -
> > -static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
> > - struct svc_rdma_op_ctxt *ctxt,
> > - struct svc_rdma_fastreg_mr *frmr,
> > - struct kvec *vec,
> > - u64 *sgl_offset,
> > - int count)
> > -{
> > - int i;
> > - unsigned long off;
> > + if (last && rs_length == 0)
> > + set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > + else
> > + clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> >
> > - ctxt->count = count;
> > - ctxt->direction = DMA_FROM_DEVICE;
> > - for (i = 0; i < count; i++) {
> > - ctxt->sge[i].length = 0; /* in case map fails */
> > - if (!frmr) {
> > - BUG_ON(!virt_to_page(vec[i].iov_base));
> > - off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
> > - ctxt->sge[i].addr =
> > - ib_dma_map_page(xprt->sc_cm_id->device,
> > - virt_to_page(vec[i].iov_base),
> > - off,
> > - vec[i].iov_len,
> > - DMA_FROM_DEVICE);
> > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > - ctxt->sge[i].addr))
> > - return -EINVAL;
> > - ctxt->sge[i].lkey = xprt->sc_dma_lkey;
> > - atomic_inc(&xprt->sc_dma_used);
> > - } else {
> > - ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
> > - ctxt->sge[i].lkey = frmr->mr->lkey;
> > - }
> > - ctxt->sge[i].length = vec[i].iov_len;
> > - *sgl_offset = *sgl_offset + vec[i].iov_len;
> > + /* Bump the key */
> > + key = (u8)(frmr->mr->lkey & 0x000000FF);
> > + ib_update_fast_reg_key(frmr->mr, ++key);
> > +
> > + ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
> > + ctxt->sge[0].lkey = frmr->mr->lkey;
> > + ctxt->sge[0].length = read;
> > + ctxt->count = 1;
> > + ctxt->read_hdr = head;
> > +
> > + /* Prepare FASTREG WR */
> > + memset(&fastreg_wr, 0, sizeof(fastreg_wr));
> > + fastreg_wr.opcode = IB_WR_FAST_REG_MR;
> > + fastreg_wr.send_flags = IB_SEND_SIGNALED;
> > + fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
> > + fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
> > + fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
> > + fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
> > + fastreg_wr.wr.fast_reg.length = frmr->map_len;
> > + fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
> > + fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
> > + fastreg_wr.next = &read_wr;
> > +
> > + /* Prepare RDMA_READ */
> > + memset(&read_wr, 0, sizeof(read_wr));
> > + read_wr.send_flags = IB_SEND_SIGNALED;
> > + read_wr.wr.rdma.rkey = rs_handle;
> > + read_wr.wr.rdma.remote_addr = rs_offset;
> > + read_wr.sg_list = ctxt->sge;
> > + read_wr.num_sge = 1;
> > + if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
> > + read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
> > + read_wr.wr_id = (unsigned long)ctxt;
> > + read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
> > + } else {
> > + read_wr.opcode = IB_WR_RDMA_READ;
> > + read_wr.next = &inv_wr;
> > + /* Prepare invalidate */
> > + memset(&inv_wr, 0, sizeof(inv_wr));
> > + inv_wr.wr_id = (unsigned long)ctxt;
> > + inv_wr.opcode = IB_WR_LOCAL_INV;
> > + inv_wr.send_flags = IB_SEND_SIGNALED;
>
> Change this to inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
>
> > + inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
> > + }
> > + ctxt->wr_op = read_wr.opcode;
> > +
> > + /* Post the chain */
> > + ret = svc_rdma_send(xprt, &fastreg_wr);
> > + if (ret) {
> > + pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
> > + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > + goto err;
> > }
> > - return 0;
> > -}
> >
> > -static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
> > -{
> > - if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
> > - RDMA_TRANSPORT_IWARP) &&
> > - sge_count > 1)
> > - return 1;
> > - else
> > - return min_t(int, sge_count, xprt->sc_max_sge);
> > + /* return current location in page array */
> > + *page_no = pg_no;
> > + *page_offset = pg_off;
> > + ret = read;
> > + atomic_inc(&rdma_stat_read);
> > + return ret;
> > + err:
> > + svc_rdma_unmap_dma(ctxt);
> > + svc_rdma_put_context(ctxt, 0);
> > + svc_rdma_put_frmr(xprt, frmr);
> > + return ret;
> > }
> >
> > -/*
> > - * Use RDMA_READ to read data from the advertised client buffer into the
> > - * XDR stream starting at rq_arg.head[0].iov_base.
> > - * Each chunk in the array
> > - * contains the following fields:
> > - * discrim - '1', This isn't used for data placement
> > - * position - The xdr stream offset (the same for every chunk)
> > - * handle - RMR for client memory region
> > - * length - data transfer length
> > - * offset - 64 bit tagged offset in remote memory region
> > - *
> > - * On our side, we need to read into a pagelist. The first page immediately
> > - * follows the RPC header.
> > - *
> > - * This function returns:
> > - * 0 - No error and no read-list found.
> > - *
> > - * 1 - Successful read-list processing. The data is not yet in
> > - * the pagelist and therefore the RPC request must be deferred. The
> > - * I/O completion will enqueue the transport again and
> > - * svc_rdma_recvfrom will complete the request.
> > - *
> > - * <0 - Error processing/posting read-list.
> > - *
> > - * NOTE: The ctxt must not be touched after the last WR has been posted
> > - * because the I/O completion processing may occur on another
> > - * processor and free / modify the context. Ne touche pas!
> > - */
> > -static int rdma_read_xdr(struct svcxprt_rdma *xprt,
> > - struct rpcrdma_msg *rmsgp,
> > - struct svc_rqst *rqstp,
> > - struct svc_rdma_op_ctxt *hdr_ctxt)
> > +static int rdma_read_chunks(struct svcxprt_rdma *xprt,
> > + struct rpcrdma_msg *rmsgp,
> > + struct svc_rqst *rqstp,
> > + struct svc_rdma_op_ctxt *head)
> > {
> > - struct ib_send_wr read_wr;
> > - struct ib_send_wr inv_wr;
> > - int err = 0;
> > - int ch_no;
> > - int ch_count;
> > - int byte_count;
> > - int sge_count;
> > - u64 sgl_offset;
> > + int page_no, ch_count, ret;
> > struct rpcrdma_read_chunk *ch;
> > - struct svc_rdma_op_ctxt *ctxt = NULL;
> > - struct svc_rdma_req_map *rpl_map;
> > - struct svc_rdma_req_map *chl_map;
> > + u32 page_offset, byte_count;
> > + u64 rs_offset;
> > + rdma_reader_fn reader;
> >
> > /* If no read list is present, return 0 */
> > ch = svc_rdma_get_read_chunk(rmsgp);
> > @@ -408,122 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
> > if (ch_count > RPCSVC_MAXPAGES)
> > return -EINVAL;
> >
> > - /* Allocate temporary reply and chunk maps */
> > - rpl_map = svc_rdma_get_req_map();
> > - chl_map = svc_rdma_get_req_map();
> > + /* The request is completed when the RDMA_READs complete. The
> > + * head context keeps all the pages that comprise the
> > + * request.
> > + */
> > + head->arg.head[0] = rqstp->rq_arg.head[0];
> > + head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > + head->arg.pages = &head->pages[head->count];
> > + head->hdr_count = head->count;
> > + head->arg.page_base = 0;
> > + head->arg.page_len = 0;
> > + head->arg.len = rqstp->rq_arg.len;
> > + head->arg.buflen = rqstp->rq_arg.buflen;
> >
> > - if (!xprt->sc_frmr_pg_list_len)
> > - sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
> > - rpl_map, chl_map, ch_count,
> > - byte_count);
> > + /* Use FRMR if supported */
> > + if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
> > + reader = rdma_read_chunk_frmr;
> > else
> > - sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
> > - rpl_map, chl_map, ch_count,
> > - byte_count);
> > - if (sge_count < 0) {
> > - err = -EIO;
> > - goto out;
> > - }
> > -
> > - sgl_offset = 0;
> > - ch_no = 0;
> > + reader = rdma_read_chunk_lcl;
> >
> > + page_no = 0; page_offset = 0;
> > for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > - ch->rc_discrim != 0; ch++, ch_no++) {
> > - u64 rs_offset;
> > -next_sge:
> > - ctxt = svc_rdma_get_context(xprt);
> > - ctxt->direction = DMA_FROM_DEVICE;
> > - ctxt->frmr = hdr_ctxt->frmr;
> > - ctxt->read_hdr = NULL;
> > - clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > - clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > + ch->rc_discrim != 0; ch++) {
> >
> > - /* Prepare READ WR */
> > - memset(&read_wr, 0, sizeof read_wr);
> > - read_wr.wr_id = (unsigned long)ctxt;
> > - read_wr.opcode = IB_WR_RDMA_READ;
> > - ctxt->wr_op = read_wr.opcode;
> > - read_wr.send_flags = IB_SEND_SIGNALED;
> > - read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
> > xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
> > &rs_offset);
> > - read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
> > - read_wr.sg_list = ctxt->sge;
> > - read_wr.num_sge =
> > - rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
> > - err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
> > - &rpl_map->sge[chl_map->ch[ch_no].start],
> > - &sgl_offset,
> > - read_wr.num_sge);
> > - if (err) {
> > - svc_rdma_unmap_dma(ctxt);
> > - svc_rdma_put_context(ctxt, 0);
> > - goto out;
> > - }
> > - if (((ch+1)->rc_discrim == 0) &&
> > - (read_wr.num_sge == chl_map->ch[ch_no].count)) {
> > - /*
> > - * Mark the last RDMA_READ with a bit to
> > - * indicate all RPC data has been fetched from
> > - * the client and the RPC needs to be enqueued.
> > - */
> > - set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > - if (hdr_ctxt->frmr) {
> > - set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > - /*
> > - * Invalidate the local MR used to map the data
> > - * sink.
> > - */
> > - if (xprt->sc_dev_caps &
> > - SVCRDMA_DEVCAP_READ_W_INV) {
> > - read_wr.opcode =
> > - IB_WR_RDMA_READ_WITH_INV;
> > - ctxt->wr_op = read_wr.opcode;
> > - read_wr.ex.invalidate_rkey =
> > - ctxt->frmr->mr->lkey;
> > - } else {
> > - /* Prepare INVALIDATE WR */
> > - memset(&inv_wr, 0, sizeof inv_wr);
> > - inv_wr.opcode = IB_WR_LOCAL_INV;
> > - inv_wr.send_flags = IB_SEND_SIGNALED;
> > - inv_wr.ex.invalidate_rkey =
> > - hdr_ctxt->frmr->mr->lkey;
> > - read_wr.next = &inv_wr;
> > - }
> > - }
> > - ctxt->read_hdr = hdr_ctxt;
> > - }
> > - /* Post the read */
> > - err = svc_rdma_send(xprt, &read_wr);
> > - if (err) {
> > - printk(KERN_ERR "svcrdma: Error %d posting
> > RDMA_READ\n",
> > - err);
> > - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > - svc_rdma_unmap_dma(ctxt);
> > - svc_rdma_put_context(ctxt, 0);
> > - goto out;
> > + byte_count = ntohl(ch->rc_target.rs_length);
> > +
> > + while (byte_count > 0) {
> > + ret = reader(xprt, rqstp, head,
> > + &page_no, &page_offset,
> > + ntohl(ch->rc_target.rs_handle),
> > + byte_count, rs_offset,
> > + ((ch+1)->rc_discrim == 0) /* last */
> > + );
> > + if (ret < 0)
> > + goto err;
> > + byte_count -= ret;
> > + rs_offset += ret;
> > + head->arg.buflen += ret;
> > }
> > - atomic_inc(&rdma_stat_read);
> > -
> > - if (read_wr.num_sge < chl_map->ch[ch_no].count) {
> > - chl_map->ch[ch_no].count -= read_wr.num_sge;
> > - chl_map->ch[ch_no].start += read_wr.num_sge;
> > - goto next_sge;
> > - }
> > - sgl_offset = 0;
> > - err = 1;
> > }
> > -
> > - out:
> > - svc_rdma_put_req_map(rpl_map);
> > - svc_rdma_put_req_map(chl_map);
> > -
> > + ret = 1;
> > + err:
> > /* Detach arg pages. svc_recv will replenish them */
> > - for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
> > - rqstp->rq_pages[ch_no] = NULL;
> > + for (page_no = 0;
> > + &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
> > + rqstp->rq_pages[page_no] = NULL;
> >
> > - return err;
> > + return ret;
> > }
> >
> > static int rdma_read_complete(struct svc_rqst *rqstp,
> > @@ -595,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> > struct svc_rdma_op_ctxt,
> > dto_q);
> > list_del_init(&ctxt->dto_q);
> > - }
> > - if (ctxt) {
> > spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
> > return rdma_read_complete(rqstp, ctxt);
> > - }
> > -
> > - if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
> > + } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
> > ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
> > struct svc_rdma_op_ctxt,
> > dto_q);
> > @@ -621,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> > if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
> > goto close_out;
> >
> > - BUG_ON(ret);
> > goto out;
> > }
> > dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p,
> > status=%d\n", @@ -644,12 +548,11 @@ int svc_rdma_recvfrom(struct
> > svc_rqst *rqstp)
> > }
> >
> > /* Read read-list data. */
> > - ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
> > + ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
> > if (ret > 0) {
> > /* read-list posted, defer until data received from client. */
> > goto defer;
> > - }
> > - if (ret < 0) {
> > + } else if (ret < 0) {
> > /* Post of read-list failed, free context. */
> > svc_rdma_put_context(ctxt, 1);
> > return 0;
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > index 7e024a5..49fd21a 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > @@ -1,4 +1,5 @@
> > /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> > * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> > *
> > * This software is available to you under a choice of one of two
> > @@ -49,152 +50,6 @@
> >
> > #define RPCDBG_FACILITY RPCDBG_SVCXPRT
> >
> > -/* Encode an XDR as an array of IB SGE
> > - *
> > - * Assumptions:
> > - * - head[0] is physically contiguous.
> > - * - tail[0] is physically contiguous.
> > - * - pages[] is not physically or virtually contiguous and consists of
> > - * PAGE_SIZE elements.
> > - *
> > - * Output:
> > - * SGE[0] reserved for RCPRDMA header
> > - * SGE[1] data from xdr->head[]
> > - * SGE[2..sge_count-2] data from xdr->pages[]
> > - * SGE[sge_count-1] data from xdr->tail.
> > - *
> > - * The max SGE we need is the length of the XDR / pagesize + one for
> > - * head + one for tail + one for RPCRDMA header. Since
> > RPCSVC_MAXPAGES
> > - * reserves a page for both the request and the reply header, and this
> > - * array is only concerned with the reply we are assured that we have
> > - * on extra page for the RPCRMDA header.
> > - */
> > -static int fast_reg_xdr(struct svcxprt_rdma *xprt,
> > - struct xdr_buf *xdr,
> > - struct svc_rdma_req_map *vec)
> > -{
> > - int sge_no;
> > - u32 sge_bytes;
> > - u32 page_bytes;
> > - u32 page_off;
> > - int page_no = 0;
> > - u8 *frva;
> > - struct svc_rdma_fastreg_mr *frmr;
> > -
> > - frmr = svc_rdma_get_frmr(xprt);
> > - if (IS_ERR(frmr))
> > - return -ENOMEM;
> > - vec->frmr = frmr;
> > -
> > - /* Skip the RPCRDMA header */
> > - sge_no = 1;
> > -
> > - /* Map the head. */
> > - frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
> > - vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
> > - vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
> > - vec->count = 2;
> > - sge_no++;
> > -
> > - /* Map the XDR head */
> > - frmr->kva = frva;
> > - frmr->direction = DMA_TO_DEVICE;
> > - frmr->access_flags = 0;
> > - frmr->map_len = PAGE_SIZE;
> > - frmr->page_list_len = 1;
> > - page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
> > - frmr->page_list->page_list[page_no] =
> > - ib_dma_map_page(xprt->sc_cm_id->device,
> > - virt_to_page(xdr->head[0].iov_base),
> > - page_off,
> > - PAGE_SIZE - page_off,
> > - DMA_TO_DEVICE);
> > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > - frmr->page_list->page_list[page_no]))
> > - goto fatal_err;
> > - atomic_inc(&xprt->sc_dma_used);
> > -
> > - /* Map the XDR page list */
> > - page_off = xdr->page_base;
> > - page_bytes = xdr->page_len + page_off;
> > - if (!page_bytes)
> > - goto encode_tail;
> > -
> > - /* Map the pages */
> > - vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
> > - vec->sge[sge_no].iov_len = page_bytes;
> > - sge_no++;
> > - while (page_bytes) {
> > - struct page *page;
> > -
> > - page = xdr->pages[page_no++];
> > - sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
> > - page_bytes -= sge_bytes;
> > -
> > - frmr->page_list->page_list[page_no] =
> > - ib_dma_map_page(xprt->sc_cm_id->device,
> > - page, page_off,
> > - sge_bytes, DMA_TO_DEVICE);
> > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > - frmr->page_list->page_list[page_no]))
> > - goto fatal_err;
> > -
> > - atomic_inc(&xprt->sc_dma_used);
> > - page_off = 0; /* reset for next time through loop */
> > - frmr->map_len += PAGE_SIZE;
> > - frmr->page_list_len++;
> > - }
> > - vec->count++;
> > -
> > - encode_tail:
> > - /* Map tail */
> > - if (0 == xdr->tail[0].iov_len)
> > - goto done;
> > -
> > - vec->count++;
> > - vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
> > -
> > - if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
> > - ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
> > - /*
> > - * If head and tail use the same page, we don't need
> > - * to map it again.
> > - */
> > - vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
> > - } else {
> > - void *va;
> > -
> > - /* Map another page for the tail */
> > - page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
> > - va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
> > - vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
> > -
> > - frmr->page_list->page_list[page_no] =
> > - ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va),
> > - page_off,
> > - PAGE_SIZE,
> > - DMA_TO_DEVICE);
> > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > - frmr->page_list->page_list[page_no]))
> > - goto fatal_err;
> > - atomic_inc(&xprt->sc_dma_used);
> > - frmr->map_len += PAGE_SIZE;
> > - frmr->page_list_len++;
> > - }
> > -
> > - done:
> > - if (svc_rdma_fastreg(xprt, frmr))
> > - goto fatal_err;
> > -
> > - return 0;
> > -
> > - fatal_err:
> > - printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
> > - vec->frmr = NULL;
> > - svc_rdma_put_frmr(xprt, frmr);
> > - return -EIO;
> > -}
> > -
> > static int map_xdr(struct svcxprt_rdma *xprt,
> > struct xdr_buf *xdr,
> > struct svc_rdma_req_map *vec)
> > @@ -208,9 +63,6 @@ static int map_xdr(struct svcxprt_rdma *xprt,
> > BUG_ON(xdr->len !=
> > (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
> >
> > - if (xprt->sc_frmr_pg_list_len)
> > - return fast_reg_xdr(xprt, xdr, vec);
> > -
> > /* Skip the first sge, this is for the RPCRDMA header */
> > sge_no = 1;
> >
> > @@ -282,8 +134,6 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
> > }
> >
> > /* Assumptions:
> > - * - We are using FRMR
> > - * - or -
> > * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
> > */
> > static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> > @@ -327,23 +177,16 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> > sge_bytes = min_t(size_t,
> > bc, vec->sge[xdr_sge_no].iov_len-sge_off);
> > sge[sge_no].length = sge_bytes;
> > - if (!vec->frmr) {
> > - sge[sge_no].addr =
> > - dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
> > - sge_bytes, DMA_TO_DEVICE);
> > - xdr_off += sge_bytes;
> > - if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > - sge[sge_no].addr))
> > - goto err;
> > - atomic_inc(&xprt->sc_dma_used);
> > - sge[sge_no].lkey = xprt->sc_dma_lkey;
> > - } else {
> > - sge[sge_no].addr = (unsigned long)
> > - vec->sge[xdr_sge_no].iov_base + sge_off;
> > - sge[sge_no].lkey = vec->frmr->mr->lkey;
> > - }
> > + sge[sge_no].addr =
> > + dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
> > + sge_bytes, DMA_TO_DEVICE);
> > + xdr_off += sge_bytes;
> > + if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > + sge[sge_no].addr))
> > + goto err;
> > + atomic_inc(&xprt->sc_dma_used);
> > + sge[sge_no].lkey = xprt->sc_dma_lkey;
> > ctxt->count++;
> > - ctxt->frmr = vec->frmr;
> > sge_off = 0;
> > sge_no++;
> > xdr_sge_no++;
> > @@ -369,7 +212,6 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> > return 0;
> > err:
> > svc_rdma_unmap_dma(ctxt);
> > - svc_rdma_put_frmr(xprt, vec->frmr);
> > svc_rdma_put_context(ctxt, 0);
> > /* Fatal error, close transport */
> > return -EIO;
> > @@ -397,10 +239,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
> > res_ary = (struct rpcrdma_write_array *)
> > &rdma_resp->rm_body.rm_chunks[1];
> >
> > - if (vec->frmr)
> > - max_write = vec->frmr->map_len;
> > - else
> > - max_write = xprt->sc_max_sge * PAGE_SIZE;
> > + max_write = xprt->sc_max_sge * PAGE_SIZE;
> >
> > /* Write chunks start at the pagelist */
> > for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0; @@ -
> > 472,10 +311,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
> > res_ary = (struct rpcrdma_write_array *)
> > &rdma_resp->rm_body.rm_chunks[2];
> >
> > - if (vec->frmr)
> > - max_write = vec->frmr->map_len;
> > - else
> > - max_write = xprt->sc_max_sge * PAGE_SIZE;
> > + max_write = xprt->sc_max_sge * PAGE_SIZE;
> >
> > /* xdr offset starts at RPC message */
> > nchunks = ntohl(arg_ary->wc_nchunks);
> > @@ -545,7 +381,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > int byte_count)
> > {
> > struct ib_send_wr send_wr;
> > - struct ib_send_wr inv_wr;
> > int sge_no;
> > int sge_bytes;
> > int page_no;
> > @@ -559,7 +394,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > "svcrdma: could not post a receive buffer, err=%d."
> > "Closing transport %p.\n", ret, rdma);
> > set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
> > - svc_rdma_put_frmr(rdma, vec->frmr);
> > svc_rdma_put_context(ctxt, 0);
> > return -ENOTCONN;
> > }
> > @@ -567,11 +401,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > /* Prepare the context */
> > ctxt->pages[0] = page;
> > ctxt->count = 1;
> > - ctxt->frmr = vec->frmr;
> > - if (vec->frmr)
> > - set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > - else
> > - clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> >
> > /* Prepare the SGE for the RPCRDMA Header */
> > ctxt->sge[0].lkey = rdma->sc_dma_lkey;
> > @@ -590,21 +419,15 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > int xdr_off = 0;
> > sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
> > byte_count -= sge_bytes;
> > - if (!vec->frmr) {
> > - ctxt->sge[sge_no].addr =
> > - dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
> > - sge_bytes, DMA_TO_DEVICE);
> > - xdr_off += sge_bytes;
> > - if (ib_dma_mapping_error(rdma->sc_cm_id->device,
> > - ctxt->sge[sge_no].addr))
> > - goto err;
> > - atomic_inc(&rdma->sc_dma_used);
> > - ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
> > - } else {
> > - ctxt->sge[sge_no].addr = (unsigned long)
> > - vec->sge[sge_no].iov_base;
> > - ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
> > - }
> > + ctxt->sge[sge_no].addr =
> > + dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
> > + sge_bytes, DMA_TO_DEVICE);
> > + xdr_off += sge_bytes;
> > + if (ib_dma_mapping_error(rdma->sc_cm_id->device,
> > + ctxt->sge[sge_no].addr))
> > + goto err;
> > + atomic_inc(&rdma->sc_dma_used);
> > + ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
> > ctxt->sge[sge_no].length = sge_bytes;
> > }
> > BUG_ON(byte_count != 0);
> > @@ -627,6 +450,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > ctxt->sge[page_no+1].length = 0;
> > }
> > rqstp->rq_next_page = rqstp->rq_respages + 1;
> > +
> > BUG_ON(sge_no > rdma->sc_max_sge);
> > memset(&send_wr, 0, sizeof send_wr);
> > ctxt->wr_op = IB_WR_SEND;
> > @@ -635,15 +459,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> > send_wr.num_sge = sge_no;
> > send_wr.opcode = IB_WR_SEND;
> > send_wr.send_flags = IB_SEND_SIGNALED;
> > - if (vec->frmr) {
> > - /* Prepare INVALIDATE WR */
> > - memset(&inv_wr, 0, sizeof inv_wr);
> > - inv_wr.opcode = IB_WR_LOCAL_INV;
> > - inv_wr.send_flags = IB_SEND_SIGNALED;
> > - inv_wr.ex.invalidate_rkey =
> > - vec->frmr->mr->lkey;
> > - send_wr.next = &inv_wr;
> > - }
> >
> > ret = svc_rdma_send(rdma, &send_wr);
> > if (ret)
> > @@ -653,7 +468,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >
> > err:
> > svc_rdma_unmap_dma(ctxt);
> > - svc_rdma_put_frmr(rdma, vec->frmr);
> > svc_rdma_put_context(ctxt, 1);
> > return -EIO;
> > }
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > index 25688fa..2c5b201 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > @@ -1,4 +1,5 @@
> > /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> > * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
> > *
> > * This software is available to you under a choice of one of two
> > @@ -160,7 +161,6 @@ struct svc_rdma_req_map *svc_rdma_get_req_map(void)
> > schedule_timeout_uninterruptible(msecs_to_jiffies(500));
> > }
> > map->count = 0;
> > - map->frmr = NULL;
> > return map;
> > }
> >
> > @@ -336,22 +336,21 @@ static void process_context(struct svcxprt_rdma *xprt,
> >
> > switch (ctxt->wr_op) {
> > case IB_WR_SEND:
> > - if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
> > - svc_rdma_put_frmr(xprt, ctxt->frmr);
> > + BUG_ON(ctxt->frmr);
> > svc_rdma_put_context(ctxt, 1);
> > break;
> >
> > case IB_WR_RDMA_WRITE:
> > + BUG_ON(ctxt->frmr);
> > svc_rdma_put_context(ctxt, 0);
> > break;
> >
> > case IB_WR_RDMA_READ:
> > case IB_WR_RDMA_READ_WITH_INV:
> > + svc_rdma_put_frmr(xprt, ctxt->frmr);
> > if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
> > struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
> > BUG_ON(!read_hdr);
> > - if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
> > - svc_rdma_put_frmr(xprt, ctxt->frmr);
> > spin_lock_bh(&xprt->sc_rq_dto_lock);
> > set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
> > list_add_tail(&read_hdr->dto_q,
> > @@ -363,6 +362,7 @@ static void process_context(struct svcxprt_rdma *xprt,
> > break;
> >
> > default:
> > + BUG_ON(1);
> > printk(KERN_ERR "svcrdma: unexpected completion type, "
> > "opcode=%d\n",
> > ctxt->wr_op);
> > @@ -378,29 +378,42 @@ static void process_context(struct svcxprt_rdma *xprt,
> > static void sq_cq_reap(struct svcxprt_rdma *xprt)
> > {
> > struct svc_rdma_op_ctxt *ctxt = NULL;
> > - struct ib_wc wc;
> > + struct ib_wc wc_a[6];
> > + struct ib_wc *wc;
> > struct ib_cq *cq = xprt->sc_sq_cq;
> > int ret;
> >
> > + memset(wc_a, 0, sizeof(wc_a));
> > +
> > if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
> > return;
> >
> > ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> > atomic_inc(&rdma_stat_sq_poll);
> > - while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
> > - if (wc.status != IB_WC_SUCCESS)
> > - /* Close the transport */
> > - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > + while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
> > + int i;
> >
> > - /* Decrement used SQ WR count */
> > - atomic_dec(&xprt->sc_sq_count);
> > - wake_up(&xprt->sc_send_wait);
> > + for (i = 0; i < ret; i++) {
> > + wc = &wc_a[i];
> > + if (wc->status != IB_WC_SUCCESS) {
> > + dprintk("svcrdma: sq wc err status %d\n",
> > + wc->status);
> >
> > - ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
> > - if (ctxt)
> > - process_context(xprt, ctxt);
> > + /* Close the transport */
> > + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > + }
> >
> > - svc_xprt_put(&xprt->sc_xprt);
> > + /* Decrement used SQ WR count */
> > + atomic_dec(&xprt->sc_sq_count);
> > + wake_up(&xprt->sc_send_wait);
> > +
> > + ctxt = (struct svc_rdma_op_ctxt *)
> > + (unsigned long)wc->wr_id;
> > + if (ctxt)
> > + process_context(xprt, ctxt);
> > +
> > + svc_xprt_put(&xprt->sc_xprt);
> > + }
> > }
> >
> > if (ctxt)
> > @@ -993,7 +1006,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> > need_dma_mr = 0;
> > break;
> > case RDMA_TRANSPORT_IB:
> > - if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
> > + if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
> > + need_dma_mr = 1;
> > + dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
> > + } else if (!(devattr.device_cap_flags &
> > + IB_DEVICE_LOCAL_DMA_LKEY)) {
> > need_dma_mr = 1;
> > dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
> > } else
> > @@ -1190,14 +1207,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
> > container_of(xprt, struct svcxprt_rdma, sc_xprt);
> >
> > /*
> > - * If there are fewer SQ WR available than required to send a
> > - * simple response, return false.
> > - */
> > - if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
> > - return 0;
> > -
> > - /*
> > - * ...or there are already waiters on the SQ,
> > + * If there are already waiters on the SQ,
> > * return false.
> > */
> > if (waitqueue_active(&rdma->sc_send_wait))
> >
> > --
> > To unsubscribe from this list: send the line "unsubscribe linux-rdma" in the
> > body of a message to majordomo@vger.kernel.org More majordomo info at
> > http://vger.kernel.org/majordomo-info.html