All of lore.kernel.org
 help / color / mirror / Atom feed
From: Chuck Lever <chuck.lever@oracle.com>
To: "Talpey, Thomas" <Thomas.Talpey@netapp.com>
Cc: nfs@lists.sourceforge.net
Subject: Re: [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling
Date: Fri, 13 Jul 2007 12:35:42 -0400	[thread overview]
Message-ID: <4697A9DE.50703@oracle.com> (raw)
In-Reply-To: <EXNANE01jOuF1Qp4hZD00000c67@exnane01.hq.netapp.com>

[-- Attachment #1: Type: text/plain, Size: 39853 bytes --]

Hi Tom!


Talpey, Thomas wrote:
> RPCRDMA: rpc rdma protocol implementation
> 
> This implements the marshaling and unmarshaling of the rpcrdma transport
> headers. Connection management is also addressed.
> 
> Signed-off-by: Tom Talpey <talpey@netapp.com>
> 
> ---
> 
> Index: linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
> ===================================================================
> --- linux-2.6.22.orig/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ linux-2.6.22/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -1,9 +1,882 @@
>  /*
> - * Placeholders for subsequent patches
> + * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
> + *
> + * This software is available to you under a choice of one of two
> + * licenses.  You may choose to be licensed under the terms of the GNU
> + * General Public License (GPL) Version 2, available from the file
> + * COPYING in the main directory of this source tree, or the BSD-type
> + * license below:
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions
> + * are met:
> + *
> + *      Redistributions of source code must retain the above copyright
> + *      notice, this list of conditions and the following disclaimer.
> + *
> + *      Redistributions in binary form must reproduce the above
> + *      copyright notice, this list of conditions and the following
> + *      disclaimer in the documentation and/or other materials provided
> + *      with the distribution.
> + *
> + *      Neither the name of the Network Appliance, Inc. nor the names of
> + *      its contributors may be used to endorse or promote products
> + *      derived from this software without specific prior written
> + *      permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +/*
> + * rpc_rdma.c
> + *
> + * This file contains the guts of the RPC RDMA protocol, and
> + * does marshaling/unmarshaling, etc. It is also where interfacing
> + * to the Linux RPC framework lives.
>   */
>  
>  #include "xprt_rdma.h"
>  
> -void rpcrdma_conn_func(struct rpcrdma_ep *a) { }
> -void rpcrdma_reply_handler(struct rpcrdma_rep *a) { }
> -int rpcrdma_marshal_req(struct rpc_rqst *a) { return EINVAL; }
> +#include <linux/nfs2.h>
> +#include <linux/nfs3.h>
> +#include <linux/nfs4.h>

I haven't looked closely at this yet, but is there really a dependency 
in here on NFS?  I don't see NFS dependencies in other parts of the RPC 
client or server, save the legacy debugging interface 
(/proc/sys/sunrpc/nfs_debug and friends).


> +#include <linux/highmem.h>
> +
> +#ifdef RPC_DEBUG
> +# define RPCDBG_FACILITY       RPCDBG_TRANS
> +#endif
> +
> +enum rpcrdma_chunktype {
> +       rpcrdma_noch = 0,
> +       rpcrdma_readch,
> +       rpcrdma_areadch,
> +       rpcrdma_writech,
> +       rpcrdma_replych
> +};
> +
> +#ifdef RPC_DEBUG
> +static const char transfertypes[][12] = {
> +       "pure inline",  /* no chunks */
> +       " read chunk",  /* some argument via rdma read */
> +       "*read chunk",  /* entire request via rdma read */
> +       "write chunk",  /* some result via rdma write */
> +       "reply chunk"   /* entire reply via rdma write */
> +};
> +#endif
> +
> +/*
> + * Chunk assembly from upper layer xdr_buf.
> + *
> + * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
> + * elements. Segments are then coalesced when registered, if possible
> + * within the selected memreg mode.
> + *
> + * Note, this routine is never called if the connection's memory
> + * registration strategy is 0 (bounce buffers).
> + */
> +
> +static int
> +rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, int first,
> +       enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
> +{
> +       int len, n = 0, p;
> +
> +       if (first == 0 && xdrbuf->head[0].iov_len) {
> +               seg[n].mr_page = NULL;
> +               seg[n].mr_offset = xdrbuf->head[0].iov_base;
> +               seg[n].mr_len = xdrbuf->head[0].iov_len;
> +               ++n;
> +       }
> +
> +       if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) {
> +               if (n == nsegs)
> +                       return 0;
> +               seg[n].mr_page = xdrbuf->pages[0];
> +               seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base;
> +               seg[n].mr_len = PAGE_SIZE - xdrbuf->page_base;
> +               len = xdrbuf->page_len - seg[n].mr_len;
> +               ++n;
> +               p = 1;
> +               while (len > 0) {
> +                       if (n == nsegs)
> +                               return 0;
> +                       seg[n].mr_page = xdrbuf->pages[p];
> +                       seg[n].mr_offset = NULL;
> +                       seg[n].mr_len = ((len > PAGE_SIZE) ? PAGE_SIZE : len);
> +                       len -= seg[n].mr_len;
> +                       ++n;
> +                       ++p;
> +               }
> +       }
> +
> +       if (xdrbuf->tail[0].iov_len && type != rpcrdma_writech) {
> +               if (n == nsegs)
> +                       return 0;
> +               seg[n].mr_page = NULL;
> +               seg[n].mr_offset = xdrbuf->tail[0].iov_base;
> +               seg[n].mr_len = xdrbuf->tail[0].iov_len;
> +               ++n;
> +       }
> +       return n;
> +}
> +
> +/*
> + * Create read/write chunk lists, and reply chunks, for RDMA
> + *
> + *   Assume check against THRESHOLD has been done, and chunks are required.
> + *   Assume only encoding one list entry for read|write chunks. The NFSv3
> + *     protocol is simple enough to allow this as it only has a single "bulk
> + *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
> + *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
> + *
> + * When used for a single reply chunk (which is a special write
> + * chunk used for the entire reply, rather than just the data), it
> + * is used primarily for READDIR and READLINK which would otherwise
> + * be severely size-limited by a small rdma inline read max. The server
> + * response will come back as an RDMA Write, followed by a message
> + * of type RDMA_NOMSG carrying the xid and length. As a result, reply
> + * chunks do not provide data alignment, however they do not require
> + * "fixup" (moving the response to the upper layer buffer) either.
> + *
> + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
> + *
> + *  Read chunklist (a linked list):
> + *   N elements, position P (same P for all chunks of same arg!):
> + *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
> + *
> + *  Write chunklist (a list of (one) counted array):
> + *   N elements:
> + *    1 - N - HLOO - HLOO - ... - HLOO - 0
> + *
> + *  Reply chunk (a counted array):
> + *   N elements:
> + *    1 - N - HLOO - HLOO - ... - HLOO
> + */
> +
> +static unsigned int
> +rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
> +               struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
> +{
> +       struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
> +       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
> +       int nsegs, nchunks = 0;
> +       int pos;
> +       struct rpcrdma_mr_seg *seg = req->rl_segments;
> +       struct rpcrdma_read_chunk *cur_rchunk = NULL;
> +       struct rpcrdma_write_array *warray = NULL;
> +       struct rpcrdma_write_chunk *cur_wchunk = NULL;
> +       u32 *iptr = headerp->rm_body.rm_chunks;
> +
> +       if (type == rpcrdma_readch || type == rpcrdma_areadch) {
> +               /* a read chunk - server will RDMA Read our memory */
> +               cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
> +       } else {
> +               /* a write or reply chunk - server will RDMA Write our memory */
> +               *iptr++ = xdr_zero;     /* encode a NULL read chunk list */
> +               if (type == rpcrdma_replych)
> +                       *iptr++ = xdr_zero;     /* a NULL write chunk list */
> +               warray = (struct rpcrdma_write_array *) iptr;
> +               cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
> +       }
> +
> +       if (type == rpcrdma_replych || type == rpcrdma_areadch)
> +               pos = 0;
> +       else
> +               pos = target->head[0].iov_len;
> +
> +       nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
> +       if (nsegs == 0)
> +               return 0;
> +
> +       do {
> +               /* bind/register the memory, then build chunk from result. */
> +               int n = rpcrdma_register_external(seg, nsegs,
> +                                               cur_wchunk != NULL, r_xprt);
> +               if (n <= 0)
> +                       goto out;
> +               if (cur_rchunk) {       /* read */
> +                       cur_rchunk->rc_discrim = xdr_one;
> +                       /* all read chunks have the same "position" */
> +                       cur_rchunk->rc_position = htonl(pos);
> +                       cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
> +                       cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
> +                       xdr_encode_hyper((u32 *)&cur_rchunk->rc_target.rs_offset,
> +                                       seg->mr_base);
> +                       dprintk("RPC:       %s: read chunk "
> +                               "elem %d@0x%llx:0x%x pos %d (%s)\n", __func__,
> +                               seg->mr_len, seg->mr_base, seg->mr_rkey, pos,
> +                               n < nsegs ? "more" : "last");
> +                       cur_rchunk++;
> +                       r_xprt->rx_stats.read_chunk_count++;
> +               } else {                /* write/reply */
> +                       cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
> +                       cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
> +                       xdr_encode_hyper((u32 *)&cur_wchunk->wc_target.rs_offset,
> +                                       seg->mr_base);
> +                       dprintk("RPC:       %s: %s chunk "
> +                               "elem %d@0x%llx:0x%x (%s)\n", __func__,
> +                               (type == rpcrdma_replych) ? "reply" : "write",
> +                               seg->mr_len, seg->mr_base, seg->mr_rkey,
> +                               n < nsegs ? "more" : "last");
> +                       cur_wchunk++;
> +                       if (type == rpcrdma_replych)
> +                               r_xprt->rx_stats.reply_chunk_count++;
> +                       else
> +                               r_xprt->rx_stats.write_chunk_count++;
> +                       r_xprt->rx_stats.total_rdma_request += seg->mr_len;
> +               }
> +               nchunks++;
> +               seg   += n;
> +               nsegs -= n;
> +       } while (nsegs);
> +
> +       /* success. all failures return above */
> +       req->rl_nchunks = nchunks;
> +
> +       BUG_ON(nchunks == 0);
> +
> +       /*
> +       * finish off header. If write, marshal discrim and nchunks.
> +       */
> +       if (cur_rchunk) {
> +               iptr = (u32 *) cur_rchunk;
> +               *iptr++ = xdr_zero;     /* finish the read chunk list */
> +               *iptr++ = xdr_zero;     /* encode a NULL write chunk list */
> +               *iptr++ = xdr_zero;     /* encode a NULL reply chunk */
> +       } else {
> +               warray->wc_discrim = xdr_one;
> +               warray->wc_nchunks = htonl(nchunks);
> +               iptr = (u32 *) cur_wchunk;
> +               if (type == rpcrdma_writech) {
> +                       *iptr++ = xdr_zero; /* finish the write chunk list */
> +                       *iptr++ = xdr_zero; /* encode a NULL reply chunk */
> +               }
> +       }
> +
> +       /*
> +       * Return header size.
> +       */
> +       return (unsigned char *)iptr - (unsigned char *)headerp;
> +
> +out:
> +       for (pos = 0; nchunks--; )
> +               pos += rpcrdma_deregister_external(
> +                               &req->rl_segments[pos], r_xprt, NULL);
> +       return 0;
> +}
> +
> +/*
> + * Copy write data inline.
> + * This function is used for "small" requests. Data which is passed
> + * to RPC via iovecs (or page list) is copied directly into the
> + * pre-registered memory buffer for this request. For small amounts
> + * of data, this is efficient. The cutoff value is tunable.
> + */
> +static int
> +rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
> +{
> +       int i, npages, curlen;
> +       int copy_len;
> +       unsigned char *srcp, *destp;
> +
> +       destp = rqst->rq_svec[0].iov_base;
> +       curlen = rqst->rq_svec[0].iov_len;
> +       destp += curlen;
> +       /*
> +       * Do optional padding where it makes sense. Alignment of write
> +       * payload can help the server, if our setting is accurate.
> +       */
> +       pad -= (curlen + 36 /*sizeof(struct rpcrdma_msg_padded)*/);
> +       if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
> +               pad = 0;        /* don't pad this request */
> +
> +       dprintk("RPC:       %s: pad %d destp 0x%p len %d hdrlen %d\n",
> +               __func__, pad, destp, rqst->rq_slen, curlen);
> +
> +       copy_len = rqst->rq_snd_buf.page_len;
> +       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.pullup_copy_count += copy_len;
> +       npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base + copy_len) >> PAGE_SHIFT;
> +       for (i = 0; copy_len && i < npages; i++) {
> +               if (i == 0)
> +                       curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base;
> +               else
> +                       curlen = PAGE_SIZE;
> +               if (curlen > copy_len)
> +                       curlen = copy_len;
> +               dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
> +                       __func__, i, destp, copy_len, curlen);
> +               srcp = kmap_atomic(rqst->rq_snd_buf.pages[i],
> +                                       KM_SKB_SUNRPC_DATA);
> +               if (i == 0)
> +                       memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen);
> +               else
> +                       memcpy(destp, srcp, curlen);
> +               kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
> +               rqst->rq_svec[0].iov_len += curlen;
> +               destp += curlen;
> +               copy_len -= curlen;
> +       }
> +       if (rqst->rq_snd_buf.tail[0].iov_len) {
> +               curlen = rqst->rq_snd_buf.tail[0].iov_len;
> +               if (destp != rqst->rq_snd_buf.tail[0].iov_base) {
> +                       memcpy(destp, rqst->rq_snd_buf.tail[0].iov_base, curlen);
> +                       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.pullup_copy_count += curlen;
> +               }
> +               dprintk("RPC:       %s: tail destp 0x%p len %d curlen %d\n",
> +                       __func__, destp, copy_len, curlen);
> +               rqst->rq_svec[0].iov_len += curlen;
> +       }
> +       /* header now contains entire send message */
> +       return pad;
> +}
> +
> +/*
> + * Totally imperfect, temporary attempt to detect nfs reads...
> + * e.g. establish a hint via xdr_inline_pages, etc.
> + */
> +static int
> +is_nfs_read(struct rpc_rqst *rqst)
> +{
> +       u32 *p;
> +
> +       if (rqst->rq_task->tk_client->cl_prog != NFS_PROGRAM)
> +               return 0;
> +       switch (rqst->rq_task->tk_client->cl_vers) {
> +       case 4:
> +               /* Must dig into the COMPOUND. */
> +               /* Back up from the end of what a read request would be */
> +               /* PUTFH, fh, OP_READ, stateid(16), offset(8), count(4) */
> +               p = (u32 *)(rqst->rq_snd_buf.head[0].iov_base +
> +                           rqst->rq_snd_buf.head[0].iov_len);
> +               /* test read and count */
> +               return (rqst->rq_snd_buf.head[0].iov_len > 40 &&
> +                       p[-8] == __constant_htonl(OP_READ) &&
> +
> +                       p[-1] == htonl(rqst->rq_rcv_buf.page_len));
> +       case 3:
> +               return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFS3PROC_READ;
> +       case 2:
> +               return rqst->rq_task->tk_msg.rpc_proc->p_proc == NFSPROC_READ;
> +       }
> +       return 0;
> +}
> +
> +/*
> + * Marshal a request: the primary job of this routine is to choose
> + * the transfer modes. See comments below.
> + *
> + * Uses multiple RDMA IOVs for a request:
> + *  [0] -- RPC RDMA header, which uses memory from the *start* of the
> + *         preregistered buffer that already holds the RPC data in
> + *         its middle.
> + *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
> + *  [2] -- optional padding.
> + *  [3] -- if padded, header only in [1] and data here.
> + */
> +
> +int
> +rpcrdma_marshal_req(struct rpc_rqst *rqst)
> +{
> +       struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
> +       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> +       struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
> +       char *base;
> +       size_t hdrlen, rpclen, padlen;
> +       enum rpcrdma_chunktype rtype, wtype;
> +       struct rpcrdma_msg *headerp;
> +
> +       /*
> +       * rpclen gets amount of data in first buffer, which is the
> +       * pre-registered buffer.
> +       */
> +       base = rqst->rq_svec[0].iov_base;
> +       rpclen = rqst->rq_svec[0].iov_len;
> +
> +       /* build RDMA header in private area at front */
> +       headerp = (struct rpcrdma_msg *) req->rl_base;
> +       /* don't htonl XID, it's already done in request */
> +       headerp->rm_xid = rqst->rq_xid;
> +       headerp->rm_vers = xdr_one;
> +       headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
> +       headerp->rm_type = __constant_htonl(RDMA_MSG);
> +
> +       /*
> +       * Chunks needed for results?
> +       *
> +       * o If the expected result is under the inline threshold, all ops
> +       *   return as inline (but see later).
> +       * o Large non-read ops return as a single reply chunk.
> +       * o Large read ops return data as write chunk(s), header as inline.
> +       *
> +       * Note: the NFS code sending down multiple result segments implies
> +       * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
> +       */
> +
> +       /*
> +       * This code can handle read chunks, write chunks OR reply
> +       * chunks -- only one type. If the request is too big to fit
> +       * inline, then we will choose read chunks. If the request is
> +       * a READ, then use write chunks to separate the file data
> +       * into pages; otherwise use reply chunks.
> +       */
> +       if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
> +               wtype = rpcrdma_noch;
> +       else if (rqst->rq_rcv_buf.page_len == 0)
> +               wtype = rpcrdma_replych;
> +       else if (is_nfs_read(rqst))
> +               wtype = rpcrdma_writech;
> +       else
> +               wtype = rpcrdma_replych;
> +
> +       /*
> +       * Chunks needed for arguments?
> +       *
> +       * o If the total request is under the inline threshold, all ops
> +       *   are sent as inline.
> +       * o Large non-write ops are sent with the entire message as a
> +       *   single read chunk (protocol 0-position special case).
> +       * o Large write ops transmit data as read chunk(s), header as
> +       *   inline.
> +       *
> +       * Note: the NFS code sending down multiple argument segments
> +       * implies the op is a write.
> +       * TBD check NFSv4 setacl
> +       */
> +       if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
> +               rtype = rpcrdma_noch;
> +       else if (rqst->rq_snd_buf.page_len == 0)
> +               rtype = rpcrdma_areadch;
> +       else
> +               rtype = rpcrdma_readch;
> +
> +       /* The following simplification is not true forever */
> +       if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
> +               wtype = rpcrdma_noch;
> +       BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
> +
> +       if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
> +           (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
> +               /* forced to "pure inline"? */
> +               dprintk("RPC:       %s: too much data (%d/%d) for inline\n",
> +                       __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
> +               return -1;
> +       }
> +
> +       hdrlen = 28; /*sizeof *headerp;*/
> +       padlen = 0;
> +
> +       /*
> +       * Pull up any extra send data into the preregistered buffer.
> +       * When padding is in use and applies to the transfer, insert
> +       * it and change the message type.
> +       */
> +       if (rtype == rpcrdma_noch) {
> +
> +               padlen = rpcrdma_inline_pullup(rqst, RPCRDMA_INLINE_PAD_VALUE(rqst));
> +
> +               if (padlen) {
> +                       headerp->rm_type = __constant_htonl(RDMA_MSGP);
> +                       headerp->rm_body.rm_padded.rm_align =
> +                               htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
> +                       headerp->rm_body.rm_padded.rm_thresh =
> +                               __constant_htonl(RPCRDMA_INLINE_PAD_THRESH);
> +                       headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
> +                       headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
> +                       headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
> +                       hdrlen += 2 * sizeof (u32);     /* extra words in padhdr */
> +                       BUG_ON(wtype != rpcrdma_noch);
> +
> +               } else {
> +                       headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
> +                       headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
> +                       headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
> +                       /* new length after pullup */
> +                       rpclen = rqst->rq_svec[0].iov_len;
> +                       /*
> +                       * Currently we try to not actually use read inline.
> +                       * Reply chunks have the desirable property that
> +                       * they land, packed, directly in the target buffers
> +                       * without headers, so they require no fixup. The
> +                       * additional RDMA Write op sends the same amount
> +                       * of data, streams on-the-wire and adds no overhead
> +                       * on receive. Therefore, we request a reply chunk
> +                       * for non-writes wherever feasible and efficient.
> +                       */
> +                       if (wtype == rpcrdma_noch &&
> +                           r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
> +                               wtype = rpcrdma_replych;
> +               }
> +       }
> +
> +       /*
> +       * Marshal chunks. This routine will return the header length
> +       * consumed by marshaling.
> +       */
> +       if (rtype != rpcrdma_noch) {
> +               hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, headerp, rtype);
> +               wtype = rtype;  /* simplify dprintk */
> +
> +       } else if (wtype != rpcrdma_noch) {
> +               hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, headerp, wtype);
> +       }
> +
> +       if (hdrlen == 0)
> +               return -1;
> +
> +       dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd\n"
> +               "                   headerp 0x%p base 0x%p lkey 0x%x\n",
> +               __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
> +               headerp, base, req->rl_iov.lkey);
> +
> +       /*
> +       * initialize send_iov's - normally only two: rdma chunk header and
> +       * single preregistered RPC header buffer, but if padding is present,
> +       * then use a preregistered (and zeroed) pad buffer between the RPC
> +       * header and any write data. In all non-rdma cases, any following
> +       * data has been copied into the RPC header buffer.
> +       */
> +       req->rl_send_iov[0].addr = req->rl_iov.addr;
> +       req->rl_send_iov[0].length = hdrlen;
> +       req->rl_send_iov[0].lkey = req->rl_iov.lkey;
> +
> +       req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
> +       req->rl_send_iov[1].length = rpclen;
> +       req->rl_send_iov[1].lkey = req->rl_iov.lkey;
> +
> +       req->rl_niovs = 2;
> +
> +       if (padlen) {
> +               struct rpcrdma_ep *ep = &r_xprt->rx_ep;
> +
> +               req->rl_send_iov[2].addr = ep->rep_pad.addr;
> +               req->rl_send_iov[2].length = padlen;
> +               req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
> +
> +               req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
> +               req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
> +               req->rl_send_iov[3].lkey = req->rl_iov.lkey;
> +
> +               req->rl_niovs = 4;
> +       }
> +
> +       return 0;
> +}
> +
> +/*
> + * Chase down a received write or reply chunklist to get length
> + * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
> + */
> +static int
> +rpcrdma_count_chunks(struct rpcrdma_rep *rep, int max, int wrchunk, u32 **iptrp)
> +{
> +       unsigned int i, total_len;
> +       struct rpcrdma_write_chunk *cur_wchunk;
> +
> +       i = ntohl(**iptrp);     /* get array count */
> +       if (i > max) {
> +               return -1;
> +       }
> +       cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
> +       total_len = 0;
> +       while (i--) {
> +               ifdebug(FACILITY) {
> +                       u64 off;
> +                       xdr_decode_hyper((u32 *)&cur_wchunk->wc_target.rs_offset, &off);
> +                       dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
> +                               __func__, ntohl(cur_wchunk->wc_target.rs_length),
> +                               off, ntohl(cur_wchunk->wc_target.rs_handle));
> +               }
> +               total_len += ntohl(cur_wchunk->wc_target.rs_length);
> +               ++cur_wchunk;
> +       }
> +       /* check and adjust for properly terminated write chunk */
> +       if (wrchunk) {
> +               u32 *w = (u32 *) cur_wchunk;
> +               if (*w++ != xdr_zero)
> +                       return -1;
> +               cur_wchunk = (struct rpcrdma_write_chunk *) w;
> +       }
> +       if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
> +               return -1;
> +
> +       *iptrp = (u32 *) cur_wchunk;
> +       return total_len;
> +}
> +
> +/*
> + * Scatter inline received data back into provided iov's.
> + */
> +static void
> +rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len)
> +{
> +       int i, npages, curlen, olen;
> +       char *destp;
> +
> +       curlen = rqst->rq_rcv_buf.head[0].iov_len;
> +       if (curlen > copy_len) {        /* write chunk header fixup */
> +               curlen = copy_len;
> +               rqst->rq_rcv_buf.head[0].iov_len = curlen;
> +       }
> +
> +       dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d\n",
> +               __func__, srcp, copy_len, curlen);
> +
> +       /* Shift pointer for first receive segment only */
> +       rqst->rq_rcv_buf.head[0].iov_base = srcp;
> +       srcp += curlen;
> +       copy_len -= curlen;
> +
> +       olen = copy_len;
> +       i = 0;
> +       rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
> +       if (copy_len && rqst->rq_rcv_buf.page_len) {
> +               npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base +
> +                       rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
> +               for ( ; i < npages; i++) {
> +                       if (i == 0)
> +                               curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base;
> +                       else
> +                               curlen = PAGE_SIZE;
> +                       if (curlen > copy_len)
> +                               curlen = copy_len;
> +                       dprintk("RPC:       %s: page %d srcp 0x%p len %d curlen %d\n",
> +                               __func__, i, srcp, copy_len, curlen);
> +                       destp = kmap_atomic(rqst->rq_rcv_buf.pages[i],
> +                                               KM_SKB_SUNRPC_DATA);
> +                       if (i == 0)
> +                               memcpy(destp + rqst->rq_rcv_buf.page_base,
> +                                               srcp, curlen);
> +                       else
> +                               memcpy(destp, srcp, curlen);
> +                       flush_dcache_page(rqst->rq_rcv_buf.pages[i]);
> +                       kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
> +                       srcp += curlen;
> +                       if ((copy_len -= curlen) == 0)
> +                               break;
> +               }
> +               rqst->rq_rcv_buf.page_len = olen - copy_len;
> +       } else
> +               rqst->rq_rcv_buf.page_len = 0;
> +
> +       if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
> +               curlen = copy_len;
> +               if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
> +                       curlen = rqst->rq_rcv_buf.tail[0].iov_len;
> +               if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
> +                       memcpy(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
> +               dprintk("RPC:       %s: tail srcp 0x%p len %d curlen %d\n",
> +                       __func__, srcp, copy_len, curlen);
> +               rqst->rq_rcv_buf.tail[0].iov_len = curlen;
> +               copy_len -= curlen; ++i;
> +       } else
> +               rqst->rq_rcv_buf.tail[0].iov_len = 0;
> +
> +       if (copy_len)
> +               dprintk("RPC:       %s: %d bytes in %d extra segments (%d lost)\n",
> +                       __func__, olen, i, copy_len);
> +
> +       /* TBD avoid a warning from call_decode() */
> +       rqst->rq_private_buf = rqst->rq_rcv_buf;
> +}
> +
> +/*
> + * This function is called when an async event is posted to
> + * the connection which changes the connection state. All it
> + * does at this point is mark the connection up/down, the rpc
> + * timers do the rest.
> + */
> +void
> +rpcrdma_conn_func(struct rpcrdma_ep *ep)
> +{
> +       struct rpc_xprt *xprt = ep->rep_xprt;
> +
> +       spin_lock_bh(&xprt->transport_lock);
> +       if (ep->rep_connected > 0) {
> +               if (!xprt_test_and_set_connected(xprt))
> +                       xprt_wake_pending_tasks(xprt, 0);
> +       } else {
> +               if (xprt_test_and_clear_connected(xprt))
> +                       xprt_wake_pending_tasks(xprt, ep->rep_connected);
> +       }
> +       spin_unlock_bh(&xprt->transport_lock);
> +}
> +
> +/*
> + * This function is called when memory window unbind which we are waiting
> + * for completes. Just use rr_func (zeroed by upcall) to signal completion.
> + */
> +static void
> +rpcrdma_unbind_func(struct rpcrdma_rep *rep)
> +{
> +       wake_up(&rep->rr_unbind);
> +}
> +
> +/*
> + * Called as a tasklet to do req/reply match and complete a request
> + * Errors must result in the RPC task either being awakened, or
> + * allowed to timeout, to discover the errors at that time.
> + */
> +void
> +rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> +{
> +       struct rpcrdma_msg *headerp;
> +       struct rpcrdma_req *req;
> +       struct rpc_rqst *rqst;
> +       struct rpc_xprt *xprt = rep->rr_xprt;
> +       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> +       u32 *iptr;
> +       int i, rdmalen, status;
> +
> +       /* Check status. If bad, signal disconnect and return rep to pool */
> +       if (rep->rr_len == ~0U) {
> +               rpcrdma_recv_buffer_put(rep);
> +               if (r_xprt->rx_ep.rep_connected == 1) {
> +                       r_xprt->rx_ep.rep_connected = -EIO;
> +                       rpcrdma_conn_func(&r_xprt->rx_ep);
> +               }
> +               return;
> +       }
> +       if (rep->rr_len < 28) {
> +               dprintk("RPC:       %s: short/invalid reply\n", __func__);
> +               goto repost;
> +       }
> +       headerp = (struct rpcrdma_msg *) rep->rr_base;
> +       if (headerp->rm_vers != xdr_one) {
> +               dprintk("RPC:       %s: invalid version %d\n",
> +                       __func__, ntohl(headerp->rm_vers));
> +               goto repost;
> +       }
> +
> +       /* Get XID and try for a match. */
> +       spin_lock(&xprt->transport_lock);
> +       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
> +       if (rqst == NULL) {
> +               spin_unlock(&xprt->transport_lock);
> +               dprintk("RPC:       %s: reply 0x%p failed "
> +                       "to match any request xid 0x%08x len %d\n",
> +                       __func__, rep, headerp->rm_xid, rep->rr_len);
> +repost:
> +               rep->rr_func = rpcrdma_reply_handler;
> +               if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
> +                       rpcrdma_recv_buffer_put(rep);
> +
> +               return;
> +       }
> +
> +       /* get request object */
> +       req = rpcr_to_rdmar(rqst);
> +
> +       dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
> +               "                   RPC request 0x%p xid 0x%08x\n",
> +                       __func__, rep, req, rqst, headerp->rm_xid);
> +
> +       BUG_ON(!req || req->rl_reply);
> +
> +       /* from here on, the reply is no longer an orphan */
> +       req->rl_reply = rep;
> +
> +       /* check for expected message types */
> +       /* The order of some of these tests is important. */
> +       switch (headerp->rm_type) {
> +       case __constant_htonl(RDMA_MSG):
> +               /* never expect read chunks */
> +               /* never expect reply chunks (two ways to check) */
> +               /* never expect write chunks without having offered RDMA */
> +               if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
> +                   (headerp->rm_body.rm_chunks[1] == xdr_zero &&
> +                    headerp->rm_body.rm_chunks[2] != xdr_zero) ||
> +                   (headerp->rm_body.rm_chunks[1] != xdr_zero &&
> +                    req->rl_nchunks == 0)) {
> +                       goto badheader;
> +               }
> +               if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
> +                       /* count any expected write chunks in read reply */
> +                       /* start at write chunk array count */
> +                       iptr = &headerp->rm_body.rm_chunks[2];
> +                       rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 1, &iptr);
> +                       /* check for validity, and no reply chunk after */
> +                       if (rdmalen < 0 || *iptr++ != xdr_zero) {
> +                               goto badheader;
> +                       }
> +                       rep->rr_len -=
> +                           ((unsigned char *)iptr - (unsigned char *)headerp);
> +                       status = rep->rr_len + rdmalen;
> +                       r_xprt->rx_stats.total_rdma_reply += rdmalen;
> +               } else {
> +                       /* else ordinary inline */
> +                       iptr = (u32 *)((unsigned char *)headerp + 28);
> +                       rep->rr_len -= 28; /*sizeof *headerp;*/
> +                       status = rep->rr_len;
> +               }
> +               /* Fix up the rpc results for upper layer */
> +               rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len);
> +               break;
> +
> +       case __constant_htonl(RDMA_NOMSG):
> +               /* never expect read or write chunks, always reply chunks */
> +               if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
> +                   headerp->rm_body.rm_chunks[1] != xdr_zero ||
> +                   headerp->rm_body.rm_chunks[2] != xdr_one ||
> +                   req->rl_nchunks == 0) {
> +                       goto badheader;
> +               }
> +               iptr = (u32 *)((unsigned char *)headerp + 28);
> +               rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
> +               if (rdmalen < 0) {
> +                       goto badheader;
> +               }
> +               r_xprt->rx_stats.total_rdma_reply += rdmalen;
> +               /* Reply chunk buffer already is the reply vector - no fixup. */
> +               status = rdmalen;
> +               break;
> +
> +       default:
> +       badheader:
> +               dprintk("%s: invalid rpcrdma reply header (type %d):"
> +                               " chunks[012] == %d %d %d expected chunks <= %d\n",
> +                               __func__, ntohl(headerp->rm_type),
> +                               headerp->rm_body.rm_chunks[0],
> +                               headerp->rm_body.rm_chunks[1],
> +                               headerp->rm_body.rm_chunks[2],
> +                               req->rl_nchunks);
> +               status = -EIO;
> +               r_xprt->rx_stats.bad_reply_count++;
> +               break;
> +       }
> +
> +       /* If using mw bind, start the deregister process now. */
> +       /* (Note: if mr_free(), cannot perform it here, in tasklet context) */
> +       if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
> +       case RPCRDMA_MEMWINDOWS:
> +               for (i = 0; req->rl_nchunks-- > 1; )
> +                       i += rpcrdma_deregister_external(
> +                               &req->rl_segments[i], r_xprt, NULL);
> +               /* Optionally wait (not here) for unbinds to complete */
> +               rep->rr_func = rpcrdma_unbind_func;
> +               (void) rpcrdma_deregister_external(&req->rl_segments[i], r_xprt, rep);
> +               break;
> +       case RPCRDMA_MEMWINDOWS_ASYNC:
> +               for (i = 0; req->rl_nchunks--; )
> +                       i += rpcrdma_deregister_external(
> +                               &req->rl_segments[i], r_xprt, NULL);
> +               break;
> +       default:
> +               break;
> +       }
> +
> +       dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
> +                       __func__, xprt, rqst, status);
> +       xprt_complete_rqst(rqst->rq_task, status);
> +       spin_unlock(&xprt->transport_lock);
> +}
> 
> 
> -------------------------------------------------------------------------
> This SF.net email is sponsored by DB2 Express
> Download DB2 Express C - the FREE version of DB2 express and take
> control of your XML. No limits. Just data. Click to get it now.
> http://sourceforge.net/powerbar/db2/
> _______________________________________________
> NFS maillist  -  NFS@lists.sourceforge.net
> https://lists.sourceforge.net/lists/listinfo/nfs


[-- Attachment #2: chuck.lever.vcf --]
[-- Type: text/x-vcard, Size: 315 bytes --]

begin:vcard
fn:Chuck Lever
n:Lever;Chuck
org:Oracle Corporation;Corporate Architecture: Linux Projects Group
adr:;;1015 Granger Avenue;Ann Arbor;MI;48104;USA
email;internet:chuck dot lever at nospam oracle dot com
title:Principal Member of Staff
tel;work:+1 248 614 5091
x-mozilla-html:FALSE
version:2.1
end:vcard


[-- Attachment #3: Type: text/plain, Size: 286 bytes --]

-------------------------------------------------------------------------
This SF.net email is sponsored by DB2 Express
Download DB2 Express C - the FREE version of DB2 express and take
control of your XML. No limits. Just data. Click to get it now.
http://sourceforge.net/powerbar/db2/

[-- Attachment #4: Type: text/plain, Size: 140 bytes --]

_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

  reply	other threads:[~2007-07-13 16:36 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2007-07-11 21:08 [RFC Patch 08/09] NFS/RDMA client - rpcrdma protocol handling Talpey, Thomas
2007-07-13 16:35 ` Chuck Lever [this message]
2007-07-13 16:50   ` Talpey, Thomas
2007-07-13 17:11     ` Chuck Lever
2007-07-13 17:28       ` Talpey, Thomas
2007-08-24 17:12       ` Talpey, Thomas
2007-08-24 18:44         ` Chuck Lever
2007-08-24 19:38           ` Talpey, Thomas

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4697A9DE.50703@oracle.com \
    --to=chuck.lever@oracle.com \
    --cc=Thomas.Talpey@netapp.com \
    --cc=nfs@lists.sourceforge.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.