All of lore.kernel.org
 help / color / mirror / Atom feed
* [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver
@ 2007-10-19 21:53 Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 01/10] rdma: ONCRPC RDMA Header File Tom Tucker
                   ` (10 more replies)
  0 siblings, 11 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:53 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb

This patchset implements the RDMA Transport Driver for the 
server transport switch. This patchset has been tested with iozone
and Connectathon over NFSv3. 

This patchset along with all of the previous server side
transport switches are also available in this git tree:

git://linux-nfs.org/~tomtucker/nfs-rdma-dev-2.6.git

-- 
Signed-off-by: Tom Tucker <tom@opengridcomputing.com>

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [RFC,PATCH 01/10] rdma: ONCRPC RDMA Header File
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 02/10] rdma: sysctl for SVCRDMA transport module Tom Tucker
                   ` (9 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


These are the core data types that are used to process the ONCRPC protocol
in the NFS-RDMA client and server.

Signed-off-by: Tom Talpey <talpey@netapp.com>
---

 include/linux/sunrpc/rpc_rdma.h |  116 +++++++++++++++++++++++++++++++++++++++
 1 files changed, 116 insertions(+), 0 deletions(-)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
new file mode 100644
index 0000000..0013a0d
--- /dev/null
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _LINUX_SUNRPC_RPC_RDMA_H
+#define _LINUX_SUNRPC_RPC_RDMA_H
+
+struct rpcrdma_segment {
+	uint32_t rs_handle;	/* Registered memory handle */
+	uint32_t rs_length;	/* Length of the chunk in bytes */
+	uint64_t rs_offset;	/* Chunk virtual address or offset */
+};
+
+/*
+ * read chunk(s), encoded as a linked list.
+ */
+struct rpcrdma_read_chunk {
+	uint32_t rc_discrim;	/* 1 indicates presence */
+	uint32_t rc_position;	/* Position in XDR stream */
+	struct rpcrdma_segment rc_target;
+};
+
+/*
+ * write chunk, and reply chunk.
+ */
+struct rpcrdma_write_chunk {
+	struct rpcrdma_segment wc_target;
+};
+
+/*
+ * write chunk(s), encoded as a counted array.
+ */
+struct rpcrdma_write_array {
+	uint32_t wc_discrim;	/* 1 indicates presence */
+	uint32_t wc_nchunks;	/* Array count */
+	struct rpcrdma_write_chunk wc_array[0];
+};
+
+struct rpcrdma_msg {
+	uint32_t rm_xid;	/* Mirrors the RPC header xid */
+	uint32_t rm_vers;	/* Version of this protocol */
+	uint32_t rm_credit;	/* Buffers requested/granted */
+	uint32_t rm_type;	/* Type of message (enum rpcrdma_proc) */
+	union {
+
+		struct {			/* no chunks */
+			uint32_t rm_empty[3];	/* 3 empty chunk lists */
+		} rm_nochunks;
+
+		struct {			/* no chunks and padded */
+			uint32_t rm_align;	/* Padding alignment */
+			uint32_t rm_thresh;	/* Padding threshold */
+			uint32_t rm_pempty[3];	/* 3 empty chunk lists */
+		} rm_padded;
+
+		uint32_t rm_chunks[0];	/* read, write and reply chunks */
+
+	} rm_body;
+};
+
+#define RPCRDMA_HDRLEN_MIN	28
+
+enum rpcrdma_errcode {
+	ERR_VERS = 1,
+	ERR_CHUNK = 2
+};
+
+struct rpcrdma_err_vers {
+	uint32_t rdma_vers_low;	/* Version range supported by peer */
+	uint32_t rdma_vers_high;
+};
+
+enum rpcrdma_proc {
+	RDMA_MSG = 0,		/* An RPC call or reply msg */
+	RDMA_NOMSG = 1,		/* An RPC call or reply msg - separate body */
+	RDMA_MSGP = 2,		/* An RPC call or reply msg with padding */
+	RDMA_DONE = 3,		/* Client signals reply completion */
+	RDMA_ERROR = 4		/* An RPC RDMA encoding error */
+};
+
+#endif				/* _LINUX_SUNRPC_RPC_RDMA_H */

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 02/10] rdma: sysctl for SVCRDMA transport module
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 01/10] rdma: ONCRPC RDMA Header File Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 03/10] rdma: SVCRMDA Header File Tom Tucker
                   ` (8 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


Add sysctl flags for SVCRDMA transport module debug messages.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 include/linux/sunrpc/debug.h |   15 ++++++++++++++-
 1 files changed, 14 insertions(+), 1 deletions(-)

diff --git a/include/linux/sunrpc/debug.h b/include/linux/sunrpc/debug.h
index 89458df..ea7ae15 100644
--- a/include/linux/sunrpc/debug.h
+++ b/include/linux/sunrpc/debug.h
@@ -52,7 +52,7 @@ #endif
 #define dprintk(args...)	dfprintk(FACILITY, ## args)
 
 #undef ifdebug
-#ifdef RPC_DEBUG			
+#ifdef RPC_DEBUG
 # define ifdebug(fac)		if (unlikely(rpc_debug & RPCDBG_##fac))
 # define dfprintk(fac, args...)	do { ifdebug(fac) printk(args); } while(0)
 # define RPC_IFDEBUG(x)		x
@@ -89,6 +89,19 @@ enum {
 	CTL_MIN_RESVPORT,
 	CTL_MAX_RESVPORT,
 	CTL_TRANSPORTS,
+	CTL_SVCRDMA,
+	CTL_RDMA_MAX_REQUESTS,
+	CTL_RDMA_MAX_REQ_SIZE,
+	CTL_RDMA_ORD,
+	CTL_RDMA_STAT_RECV,
+	CTL_RDMA_STAT_READ,
+	CTL_RDMA_STAT_WRITE,
+	CTL_RDMA_STAT_SQ_STARVE,
+	CTL_RDMA_STAT_RQ_STARVE,
+	CTL_RDMA_STAT_RQ_POLL,
+	CTL_RDMA_STAT_RQ_PROD,
+	CTL_RDMA_STAT_SQ_POLL,
+	CTL_RDMA_STAT_SQ_PROD
 };
 
 #endif /* _LINUX_SUNRPC_DEBUG_H_ */

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 03/10] rdma: SVCRMDA Header File
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 01/10] rdma: ONCRPC RDMA Header File Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 02/10] rdma: sysctl for SVCRDMA transport module Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 04/10] rdma: SVCRDMA Transport Module Tom Tucker
                   ` (7 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


This file defines the data types used by the SVCRDMA transport module.
The principle data structure is the transport specific extension to 
the svcxprt structure.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 include/linux/sunrpc/svc_rdma.h |  262 +++++++++++++++++++++++++++++++++++++++
 1 files changed, 262 insertions(+), 0 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
new file mode 100644
index 0000000..9c48fdc
--- /dev/null
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -0,0 +1,262 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#ifndef SVC_RDMA_H
+#define SVC_RDMA_H
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/svcsock.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#define SVCRDMA_DEBUG
+
+/* RPC/RDMA parameters and stats */
+extern unsigned int svcrdma_ord;
+extern unsigned int svcrdma_max_requests;
+extern unsigned int svcrdma_max_req_size;
+
+extern atomic_t rdma_stat_recv;
+extern atomic_t rdma_stat_read;
+extern atomic_t rdma_stat_write;
+extern atomic_t rdma_stat_sq_starve;
+extern atomic_t rdma_stat_rq_starve;
+extern atomic_t rdma_stat_rq_poll;
+extern atomic_t rdma_stat_rq_prod;
+extern atomic_t rdma_stat_sq_poll;
+extern atomic_t rdma_stat_sq_prod;
+
+#define RPCRDMA_VERSION 1
+
+/*
+ * Contexts are built when an RDMA request is created and are a
+ * record of the resources that can be recovered when the request
+ * completes.
+ */
+struct svc_rdma_op_ctxt {
+	struct svc_rdma_op_ctxt *next;
+	struct xdr_buf arg;
+	struct list_head dto_q;
+	enum ib_wr_opcode wr_op;
+	enum ib_wc_status wc_status;
+	u32 byte_len;
+	struct svcxprt_rdma *xprt;
+	unsigned long flags;
+	enum dma_data_direction direction;
+	int count;
+	struct ib_sge sge[RPCSVC_MAXPAGES];
+	struct page *pages[RPCSVC_MAXPAGES];
+};
+
+#define RDMACTXT_F_READ_DONE	1
+#define RDMACTXT_F_LAST_CTXT	2
+
+struct svcxprt_rdma {
+	struct svc_xprt      sc_xprt;		/* SVC transport structure */
+	struct rdma_cm_id    *sc_cm_id;		/* RDMA connection id */
+	struct list_head     sc_accept_q;	/* Conn. waiting accept */
+	int		     sc_ord;		/* RDMA read limit */
+	wait_queue_head_t    sc_read_wait;
+	int                  sc_max_sge;
+
+	int                  sc_sq_depth;	/* Depth of SQ */
+	atomic_t             sc_sq_count;	/* Number of SQ WR on queue */
+
+	int                  sc_max_requests;	/* Depth of RQ */
+	int                  sc_max_req_size;	/* Size of each RQ WR buf */
+
+	struct ib_pd         *sc_pd;
+
+	struct svc_rdma_op_ctxt  *sc_ctxt_head;
+	int		     sc_ctxt_cnt;
+	int		     sc_ctxt_bump;
+	int		     sc_ctxt_max;
+	spinlock_t	     sc_ctxt_lock;
+	struct list_head     sc_rq_dto_q;
+	spinlock_t	     sc_rq_dto_lock;
+	struct ib_qp         *sc_qp;
+	struct ib_cq         *sc_rq_cq;
+	struct ib_cq         *sc_sq_cq;
+	struct ib_mr         *sc_phys_mr;	/* MR for server memory */
+
+	spinlock_t	     sc_lock;		/* transport lock */
+
+	wait_queue_head_t    sc_send_wait;	/* SQ exhaustion waitlist */
+	int		     sc_flags;
+	struct list_head     sc_dto_q;		/* DTO tasklet I/O pending Q */
+	struct list_head     sc_read_complete_q;
+	spinlock_t           sc_read_complete_lock;
+};
+/* sc_flags */
+#define RDMAXPRT_RQ_PENDING	1
+#define RDMAXPRT_SQ_PENDING	2
+#define RDMAXPRT_CONN_PENDING	3
+
+#define RPCRDMA_LISTEN_BACKLOG  10
+/* The default ORD value is based on two outstanding full-size writes with a
+ * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ.  */
+#define RPCRDMA_ORD             (64/4)
+#define RPCRDMA_SQ_DEPTH_MULT   8
+#define RPCRDMA_MAX_THREADS     16
+#define RPCRDMA_MAX_REQUESTS    16
+#define RPCRDMA_MAX_REQ_SIZE    4096
+
+/* svc_rdma_marshal.c */
+extern void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *,
+				      int *, int *);
+extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
+extern int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *);
+extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
+				     struct rpcrdma_msg *,
+				     enum rpcrdma_errcode, u32 *);
+extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int);
+extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int);
+extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int,
+					    u32, u64, u32);
+extern void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *,
+					     struct rpcrdma_msg *,
+					     struct rpcrdma_msg *,
+					     enum rpcrdma_proc);
+extern int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *);
+
+/* svc_rdma_recvfrom.c */
+extern int svc_rdma_recvfrom(struct svc_rqst *);
+
+/* svc_rdma_sendto.c */
+extern int svc_rdma_sendto(struct svc_rqst *);
+
+/* svc_rdma_transport.c */
+extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
+extern int svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
+			       enum rpcrdma_errcode);
+struct page *svc_rdma_get_page(void);
+extern int svc_rdma_post_recv(struct svcxprt_rdma *);
+extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
+extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
+extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
+extern void svc_sq_reap(struct svcxprt_rdma *);
+extern void svc_rq_reap(struct svcxprt_rdma *);
+extern struct svc_xprt_class svc_rdma_class;
+extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
+
+/* svc_rdma.c */
+extern int svc_rdma_init(void);
+extern void svc_rdma_cleanup(void);
+
+/*
+ * Returns the address of the first read chunk or <nul> if no read chunk is
+ * present
+ */
+static inline struct rpcrdma_read_chunk *
+svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
+{
+	struct rpcrdma_read_chunk *ch =
+		(struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+
+	if (ch->rc_discrim == 0)
+		return NULL;
+
+	return ch;
+}
+
+/*
+ * Returns the address of the first read write array element or <nul> if no
+ * write array list is present
+ */
+static inline struct rpcrdma_write_array *
+svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
+{
+	if (rmsgp->rm_body.rm_chunks[0] != 0
+	    || rmsgp->rm_body.rm_chunks[1] == 0)
+		return NULL;
+
+	return (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[1];
+}
+
+/*
+ * Returns the address of the first reply array element or <nul> if no
+ * reply array is present
+ */
+static inline struct rpcrdma_write_array *
+svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
+{
+	struct rpcrdma_read_chunk *rch;
+	struct rpcrdma_write_array *wr_ary;
+	struct rpcrdma_write_array *rp_ary;
+
+	/* XXX: Need to fix when reply list may occur with read-list and/or
+	 * write list */
+	if (rmsgp->rm_body.rm_chunks[0] != 0 ||
+	    rmsgp->rm_body.rm_chunks[1] != 0)
+		return NULL;
+
+	rch = svc_rdma_get_read_chunk(rmsgp);
+	if (rch) {
+		while (rch->rc_discrim)
+			rch++;
+
+		/* The reply list follows an empty write array located
+		 * at 'rc_position' here. The reply array is at rc_target.
+		 */
+		rp_ary = (struct rpcrdma_write_array *)&rch->rc_target;
+
+		goto found_it;
+	}
+
+	wr_ary = svc_rdma_get_write_array(rmsgp);
+	if (wr_ary) {
+		rp_ary = (struct rpcrdma_write_array *)
+			&wr_ary->
+			wc_array[wr_ary->wc_nchunks].wc_target.rs_length;
+
+		goto found_it;
+	}
+
+	/* No read list, no write list */
+	rp_ary = (struct rpcrdma_write_array *)
+		&rmsgp->rm_body.rm_chunks[2];
+
+ found_it:
+	if (rp_ary->wc_discrim == 0)
+		return NULL;
+
+	return rp_ary;
+}
+#endif

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 04/10] rdma: SVCRDMA Transport Module
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
                   ` (2 preceding siblings ...)
  2007-10-19 21:56 ` [RFC,PATCH 03/10] rdma: SVCRMDA Header File Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 05/10] rdma: SVCRDMA Core Transport Services Tom Tucker
                   ` (6 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


This file implements the RDMA transport module initialization and
termination logic and registers the transport sysctl variables.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 net/sunrpc/svc_rdma.c |  280 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 280 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma.c b/net/sunrpc/svc_rdma.c
new file mode 100644
index 0000000..a8b1ed3
--- /dev/null
+++ b/net/sunrpc/svc_rdma.c
@@ -0,0 +1,280 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/sysctl.h>
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/* RPC/RDMA parameters */
+unsigned int svcrdma_ord = RPCRDMA_ORD;
+static unsigned int min_ord = 1;
+static unsigned int max_ord = 4096;
+unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
+static unsigned int min_max_requests = 4;
+static unsigned int max_max_requests = 16384;
+unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
+static unsigned int min_max_inline = 4096;
+static unsigned int max_max_inline = 65536;
+
+#define MAX_RDMA_STAT_LEN	32
+atomic_t rdma_stat_recv;
+atomic_t rdma_stat_read;
+atomic_t rdma_stat_write;
+atomic_t rdma_stat_sq_starve;
+atomic_t rdma_stat_rq_starve;
+atomic_t rdma_stat_rq_poll;
+atomic_t rdma_stat_rq_prod;
+atomic_t rdma_stat_sq_poll;
+atomic_t rdma_stat_sq_prod;
+
+/*
+ * This function implements reading and resetting an atomic_t stat
+ * variable through read/write to a proc file. Any write to the file
+ * resets the associated statistic to zero. Any read returns it's
+ * current value.
+ */
+static int read_reset_stat(ctl_table *table, int write,
+			   struct file *filp, void __user *buffer, size_t *lenp,
+			   loff_t *ppos)
+{
+	atomic_t *stat = (atomic_t *)table->data;
+	if (!stat)
+		return -EINVAL;
+
+	if (write)
+		atomic_set(stat, 0);
+	else {
+		char str_buf[32];
+		char *data;
+		int len = snprintf(str_buf, 32, "%d\n", atomic_read(stat));
+		if (len)
+			return -EFAULT;
+		len = strlen(str_buf);
+		if (*ppos > len) {
+			*lenp = 0;
+			return 0;
+		}
+		data = &str_buf[*ppos];
+		len -= *ppos;
+		if (len > *lenp)
+			len = *lenp;
+		if (len)
+			if (copy_to_user(str_buf, buffer, len))
+				return -EFAULT;
+		*lenp = len;
+		*ppos += len;
+	}
+	return 0;
+}
+
+static struct ctl_table_header *svcrdma_table_header;
+static ctl_table svcrdma_parm_table[] = {
+	{
+		.ctl_name	= CTL_RDMA_MAX_REQUESTS,
+		.procname	= "max_requests",
+		.data		= &svcrdma_max_requests,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_max_requests,
+		.extra2		= &max_max_requests
+	},
+	{
+		.ctl_name	= CTL_RDMA_MAX_REQ_SIZE,
+		.procname	= "max_req_size",
+		.data		= &svcrdma_max_req_size,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_max_inline,
+		.extra2		= &max_max_inline
+	},
+	{
+		.ctl_name	= CTL_RDMA_ORD,
+		.procname	= "max_outbound_read_requests",
+		.data		= &svcrdma_ord,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_ord,
+		.extra2		= &max_ord,
+	},
+
+	{
+		.ctl_name	= CTL_RDMA_STAT_READ,
+		.procname	= "rdma_stat_read",
+		.data		= &rdma_stat_read,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name	= CTL_RDMA_STAT_RECV,
+		.procname	= "rdma_stat_recv",
+		.data		= &rdma_stat_recv,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name	= CTL_RDMA_STAT_WRITE,
+		.procname	= "rdma_stat_write",
+		.data		= &rdma_stat_write,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name	= CTL_RDMA_STAT_SQ_STARVE,
+		.procname	= "rdma_stat_sq_starve",
+		.data		= &rdma_stat_sq_starve,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name	= CTL_RDMA_STAT_RQ_STARVE,
+		.procname	= "rdma_stat_rq_starve",
+		.data		= &rdma_stat_rq_starve,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name	= CTL_RDMA_STAT_RQ_POLL,
+		.procname	= "rdma_stat_rq_poll",
+		.data		= &rdma_stat_rq_poll,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name	= CTL_RDMA_STAT_RQ_PROD,
+		.procname	= "rdma_stat_rq_prod",
+		.data		= &rdma_stat_rq_prod,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name	= CTL_RDMA_STAT_SQ_POLL,
+		.procname	= "rdma_stat_sq_poll",
+		.data		= &rdma_stat_sq_poll,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name	= CTL_RDMA_STAT_SQ_PROD,
+		.procname	= "rdma_stat_sq_prod",
+		.data		= &rdma_stat_sq_prod,
+		.maxlen		= MAX_RDMA_STAT_LEN,
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name = 0,
+	},
+};
+
+static ctl_table svcrdma_table[] = {
+	{
+		.ctl_name	= CTL_SVCRDMA,
+		.procname	= "svc_rdma",
+		.mode		= 0555,
+		.child		= svcrdma_parm_table
+	},
+	{
+		.ctl_name = 0,
+	},
+};
+
+static ctl_table svcrdma_root_table[] = {
+	{
+		.ctl_name	= CTL_SUNRPC,
+		.procname	= "sunrpc",
+		.mode		= 0555,
+		.child		= svcrdma_table
+	},
+	{
+		.ctl_name = 0,
+	},
+};
+
+void svc_rdma_cleanup(void)
+{
+	dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
+	if (svcrdma_table_header) {
+		unregister_sysctl_table(svcrdma_table_header);
+		svcrdma_table_header = NULL;
+	}
+	svc_unreg_xprt_class(&svc_rdma_class);
+}
+
+int svc_rdma_init(void)
+{
+	dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
+	dprintk("\tsvcrdma_ord      : %d\n", svcrdma_ord);
+	dprintk("\tmax_requests     : %d\n", svcrdma_max_requests);
+	dprintk("\tsq_depth         : %d\n",
+		svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
+	dprintk("\tmax_inline       : %d\n", svcrdma_max_req_size);
+	if (!svcrdma_table_header)
+		svcrdma_table_header =
+			register_sysctl_table(svcrdma_root_table);
+
+	/* Register RDMA with the SVC transport switch */
+	svc_reg_xprt_class(&svc_rdma_class);
+	return 0;
+}
+MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
+MODULE_DESCRIPTION("SVC RDMA Transport");
+MODULE_LICENSE("Dual BSD/GPL");
+module_init(svc_rdma_init);
+module_exit(svc_rdma_cleanup);

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 05/10] rdma: SVCRDMA Core Transport Services
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
                   ` (3 preceding siblings ...)
  2007-10-19 21:56 ` [RFC,PATCH 04/10] rdma: SVCRDMA Transport Module Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 06/10] rdma: SVCRDMA recvfrom Tom Tucker
                   ` (5 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


This file implements the core transport data management and I/O
path. The I/O path for RDMA involves receiving callbacks on interrupt
context. Since all the svc transport locks are _bh locks we enqueue the
transport on a list, schedule a tasklet to dequeue data indications from
the RDMA completion queue. The tasklet in turn takes _bh locks to
enqueue receive data indications on a list for the transport. The
svc_rdma_recvfrom transport function dequeues data from this list in an
NFSD thread context.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 net/sunrpc/svc_rdma_transport.c | 1070 +++++++++++++++++++++++++++++++++++++++
 1 files changed, 1070 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma_transport.c b/net/sunrpc/svc_rdma_transport.c
new file mode 100644
index 0000000..6e62c2a
--- /dev/null
+++ b/net/sunrpc/svc_rdma_transport.c
@@ -0,0 +1,1070 @@
+/*
+ * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/svc_xprt.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+					struct sockaddr *sa, int salen,
+					int flags);
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
+static void svc_rdma_release_rqst(struct svc_rqst *);
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
+static void dto_tasklet_func(unsigned long data);
+static void svc_rdma_detach(struct svc_xprt *xprt);
+static void svc_rdma_free(struct svc_xprt *xprt);
+static int svc_rdma_has_wspace(struct svc_xprt *xprt);
+static void rq_cq_reap(struct svcxprt_rdma *xprt);
+static void sq_cq_reap(struct svcxprt_rdma *xprt);
+
+DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
+static spinlock_t dto_lock = SPIN_LOCK_UNLOCKED;
+static LIST_HEAD(dto_xprt_q);
+
+static struct svc_xprt_ops svc_rdma_ops = {
+	.xpo_create = svc_rdma_create,
+	.xpo_recvfrom = svc_rdma_recvfrom,
+	.xpo_sendto = svc_rdma_sendto,
+	.xpo_release_rqst = svc_rdma_release_rqst,
+	.xpo_detach = svc_rdma_detach,
+	.xpo_free = svc_rdma_free,
+	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+	.xpo_has_wspace = svc_rdma_has_wspace,
+	.xpo_accept = svc_rdma_accept,
+};
+
+struct svc_xprt_class svc_rdma_class = {
+	.xcl_name = "rdma",
+	.xcl_owner = THIS_MODULE,
+	.xcl_ops = &svc_rdma_ops,
+	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
+};
+
+static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
+{
+	int target;
+	int at_least_one = 0;
+	struct svc_rdma_op_ctxt *ctxt;
+
+	target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
+		     xprt->sc_ctxt_max);
+
+	spin_lock_bh(&xprt->sc_ctxt_lock);
+	while (xprt->sc_ctxt_cnt < target) {
+		xprt->sc_ctxt_cnt++;
+		spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+
+		spin_lock_bh(&xprt->sc_ctxt_lock);
+		if (ctxt) {
+			at_least_one = 1;
+			ctxt->next = xprt->sc_ctxt_head;
+			xprt->sc_ctxt_head = ctxt;
+		} else {
+			/* kmalloc failed...give up for now */
+			xprt->sc_ctxt_cnt--;
+			break;
+		}
+	}
+	spin_unlock_bh(&xprt->sc_ctxt_lock);
+	dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n",
+		xprt->sc_ctxt_max, xprt->sc_ctxt_cnt);
+	return at_least_one;
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+	struct svc_rdma_op_ctxt *ctxt;
+
+	while (1) {
+		spin_lock_bh(&xprt->sc_ctxt_lock);
+		if (unlikely(xprt->sc_ctxt_head == NULL)) {
+			/* Try to bump my cache. */
+			spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+			if (rdma_bump_context_cache(xprt))
+				continue;
+
+			printk(KERN_INFO "svcrdma: sleeping waiting for "
+			       "context memory on xprt=%p\n",
+			       xprt);
+			schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+			continue;
+		}
+		ctxt = xprt->sc_ctxt_head;
+		xprt->sc_ctxt_head = ctxt->next;
+		spin_unlock_bh(&xprt->sc_ctxt_lock);
+		ctxt->xprt = xprt;
+		INIT_LIST_HEAD(&ctxt->dto_q);
+		ctxt->count = 0;
+		break;
+	}
+	return ctxt;
+}
+
+void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
+{
+	struct svcxprt_rdma *xprt;
+	int i;
+
+	BUG_ON(!ctxt);
+	xprt = ctxt->xprt;
+	if (free_pages)
+		for (i = 0; i < ctxt->count; i++)
+			put_page(ctxt->pages[i]);
+
+	for (i = 0; i < ctxt->count; i++)
+		dma_unmap_single(xprt->sc_cm_id->device->dma_device,
+				 ctxt->sge[i].addr,
+				 ctxt->sge[i].length,
+				 ctxt->direction);
+	spin_lock_bh(&xprt->sc_ctxt_lock);
+	ctxt->next = xprt->sc_ctxt_head;
+	xprt->sc_ctxt_head = ctxt;
+	spin_unlock_bh(&xprt->sc_ctxt_lock);
+}
+
+/* ib_cq event handler */
+static void cq_event_handler(struct ib_event *event, void *context)
+{
+	struct svc_xprt *xprt = context;
+	dprintk("svcrdma: received CQ event id=%d, context=%p\n",
+		event->event, context);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+}
+
+/* QP event handler */
+static void qp_event_handler(struct ib_event *event, void *context)
+{
+	struct svc_xprt *xprt = context;
+
+	switch (event->event) {
+	/* These are considered benign events */
+	case IB_EVENT_PATH_MIG:
+	case IB_EVENT_COMM_EST:
+	case IB_EVENT_SQ_DRAINED:
+	case IB_EVENT_QP_LAST_WQE_REACHED:
+		dprintk("svcrdma: QP event %d received for QP=%p\n",
+			event->event, event->element.qp);
+		break;
+	/* These are considered fatal events */
+	case IB_EVENT_PATH_MIG_ERR:
+	case IB_EVENT_QP_FATAL:
+	case IB_EVENT_QP_REQ_ERR:
+	case IB_EVENT_QP_ACCESS_ERR:
+	case IB_EVENT_DEVICE_FATAL:
+	default:
+		dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
+			"closing transport\n",
+			event->event, event->element.qp);
+		set_bit(XPT_CLOSE, &xprt->xpt_flags);
+		break;
+	}
+}
+
+/*
+ * Data Transfer Operation Tasklet
+ *
+ * Walks a list of transports with I/O pending, removing entries as
+ * they are added to the server's I/O pending list. Two bits indicate
+ * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
+ * spinlock that serializes access to the transport list with the RQ
+ * and SQ interrupt handlers.
+ */
+static void dto_tasklet_func(unsigned long data)
+{
+	struct svcxprt_rdma *xprt;
+	unsigned long flags;
+
+	spin_lock_irqsave(&dto_lock, flags);
+	while (!list_empty(&dto_xprt_q)) {
+		xprt = list_entry(dto_xprt_q.next,
+				  struct svcxprt_rdma, sc_dto_q);
+		list_del_init(&xprt->sc_dto_q);
+		spin_unlock_irqrestore(&dto_lock, flags);
+
+		if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
+			ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+			rq_cq_reap(xprt);
+			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+			/*
+			 * If data arrived before established event,
+			 * don't enqueue. This defers RPC I/O until the
+			 * RDMA connection is complete.
+			 */
+			if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+				svc_xprt_enqueue(&xprt->sc_xprt);
+		}
+
+		if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
+			ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+			sq_cq_reap(xprt);
+		}
+
+		spin_lock_irqsave(&dto_lock, flags);
+	}
+	spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+/*
+ * Receive Queue Completion Handler
+ *
+ * Since an RQ completion handler is called on interrupt context, we
+ * need to defer the handling of the I/O to a tasklet
+ */
+static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+	struct svcxprt_rdma *xprt = cq_context;
+	unsigned long flags;
+
+	/*
+	 * Set the bit regardless of whether or not it's on the list
+	 * because it may be on the list already due to an SQ
+	 * completion.
+	*/
+	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+
+	/*
+	 * If this transport is not already on the DTO transport queue,
+	 * add it
+	 */
+	spin_lock_irqsave(&dto_lock, flags);
+	if (list_empty(&xprt->sc_dto_q))
+		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+	spin_unlock_irqrestore(&dto_lock, flags);
+
+	/* Tasklet does all the work to avoid irqsave locks. */
+	tasklet_schedule(&dto_tasklet);
+}
+
+/*
+ * rq_cq_reap - Process the RQ CQ.
+ *
+ * Take all completing WC off the CQE and enqueue the associated DTO
+ * context on the dto_q for the transport.
+ */
+static void rq_cq_reap(struct svcxprt_rdma *xprt)
+{
+	int ret;
+	struct ib_wc wc;
+	struct svc_rdma_op_ctxt *ctxt = NULL;
+
+	atomic_inc(&rdma_stat_rq_poll);
+
+	spin_lock_bh(&xprt->sc_rq_dto_lock);
+	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
+		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+		ctxt->wc_status = wc.status;
+		ctxt->byte_len = wc.byte_len;
+		if (wc.status != IB_WC_SUCCESS) {
+			/* Close the transport */
+			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+			svc_rdma_put_context(ctxt, 1);
+			continue;
+		}
+		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+	}
+	spin_unlock_bh(&xprt->sc_rq_dto_lock);
+
+	if (ctxt)
+		atomic_inc(&rdma_stat_rq_prod);
+}
+
+/*
+ * Send Queue Completion Handler - potentially called on interrupt context.
+ */
+static void sq_cq_reap(struct svcxprt_rdma *xprt)
+{
+	struct svc_rdma_op_ctxt *ctxt = NULL;
+	struct ib_wc wc;
+	struct ib_cq *cq = xprt->sc_sq_cq;
+	int ret;
+
+	atomic_inc(&rdma_stat_sq_poll);
+	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
+		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+		xprt = ctxt->xprt;
+
+		if (wc.status != IB_WC_SUCCESS)
+			/* Close the transport */
+			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+
+		/* Decrement used SQ WR count */
+		atomic_dec(&xprt->sc_sq_count);
+		wake_up(&xprt->sc_send_wait);
+
+		switch (ctxt->wr_op) {
+		case IB_WR_SEND:
+		case IB_WR_RDMA_WRITE:
+			svc_rdma_put_context(ctxt, 1);
+			break;
+
+		case IB_WR_RDMA_READ:
+			if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+				set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+				set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+				spin_lock_bh(&xprt->sc_read_complete_lock);
+				list_add_tail(&ctxt->dto_q,
+					      &xprt->sc_read_complete_q);
+				spin_unlock_bh(&xprt->sc_read_complete_lock);
+				svc_xprt_enqueue(&xprt->sc_xprt);
+			}
+			break;
+
+		default:
+			printk(KERN_ERR "svcrdma: unexpected completion type, "
+			       "opcode=%d, status=%d\n",
+			       wc.opcode, wc.status);
+			break;
+		}
+	}
+
+	if (ctxt)
+		atomic_inc(&rdma_stat_sq_prod);
+}
+
+static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+	struct svcxprt_rdma *xprt = cq_context;
+	unsigned long flags;
+
+	/*
+	 * Set the bit regardless of whether or not it's on the list
+	 * because it may be on the list already due to an RQ
+	 * completion.
+	*/
+	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
+
+	/*
+	 * If this transport is not already on the DTO transport queue,
+	 * add it
+	 */
+	spin_lock_irqsave(&dto_lock, flags);
+	if (list_empty(&xprt->sc_dto_q))
+		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+	spin_unlock_irqrestore(&dto_lock, flags);
+
+	/* Tasklet does all the work to avoid irqsave locks. */
+	tasklet_schedule(&dto_tasklet);
+}
+
+static void create_context_cache(struct svcxprt_rdma *xprt,
+				 int ctxt_count, int ctxt_bump, int ctxt_max)
+{
+	struct svc_rdma_op_ctxt *ctxt;
+	int i;
+
+	xprt->sc_ctxt_max = ctxt_max;
+	xprt->sc_ctxt_bump = ctxt_bump;
+	xprt->sc_ctxt_cnt = 0;
+	xprt->sc_ctxt_head = NULL;
+	for (i = 0; i < ctxt_count; i++) {
+		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+		if (ctxt) {
+			ctxt->next = xprt->sc_ctxt_head;
+			xprt->sc_ctxt_head = ctxt;
+			xprt->sc_ctxt_cnt++;
+		}
+	}
+}
+
+static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+{
+	struct svc_rdma_op_ctxt *next;
+	if (!ctxt)
+		return;
+
+	do {
+		next = ctxt->next;
+		kfree(ctxt);
+		ctxt = next;
+	} while (next);
+}
+
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
+					     int listener)
+{
+	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
+
+	if (!cma_xprt)
+		return NULL;
+	svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv);
+	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
+	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
+	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
+	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
+	init_waitqueue_head(&cma_xprt->sc_send_wait);
+
+	spin_lock_init(&cma_xprt->sc_lock);
+	spin_lock_init(&cma_xprt->sc_read_complete_lock);
+	spin_lock_init(&cma_xprt->sc_ctxt_lock);
+	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
+
+	cma_xprt->sc_ord = svcrdma_ord;
+
+	cma_xprt->sc_max_req_size = svcrdma_max_req_size;
+	cma_xprt->sc_max_requests = svcrdma_max_requests;
+	cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
+	atomic_set(&cma_xprt->sc_sq_count, 0);
+
+	if (!listener) {
+		int reqs = cma_xprt->sc_max_requests;
+		create_context_cache(cma_xprt,
+				     reqs << 1, /* starting size */
+				     reqs,	/* bump amount */
+				     reqs +
+				     cma_xprt->sc_sq_depth +
+				     RPCRDMA_MAX_THREADS + 1); /* max */
+		if (!cma_xprt->sc_ctxt_head) {
+			kfree(cma_xprt);
+			return NULL;
+		}
+		clear_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+	} else
+		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+
+	return cma_xprt;
+}
+
+struct page *svc_rdma_get_page(void)
+{
+	struct page *page;
+
+	while ((page = alloc_page(GFP_KERNEL)) == NULL) {
+		/* If we can't get memory, wait a bit and try again */
+		printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 "
+		       "jiffies.\n");
+		schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
+	}
+	return page;
+}
+
+int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
+{
+	struct ib_recv_wr recv_wr, *bad_recv_wr;
+	struct svc_rdma_op_ctxt *ctxt;
+	struct page *page;
+	unsigned long pa;
+	int sge_no;
+	int buflen;
+	int ret;
+
+	ctxt = svc_rdma_get_context(xprt);
+	buflen = 0;
+	ctxt->direction = DMA_FROM_DEVICE;
+	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
+		BUG_ON(sge_no >= xprt->sc_max_sge);
+		page = svc_rdma_get_page();
+		ctxt->pages[sge_no] = page;
+		pa = ib_dma_map_page(xprt->sc_cm_id->device,
+				     page, 0, PAGE_SIZE,
+				     DMA_FROM_DEVICE);
+		ctxt->sge[sge_no].addr = pa;
+		ctxt->sge[sge_no].length = PAGE_SIZE;
+		ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+		buflen += PAGE_SIZE;
+	}
+	ctxt->count = sge_no;
+	recv_wr.next = NULL;
+	recv_wr.sg_list = &ctxt->sge[0];
+	recv_wr.num_sge = ctxt->count;
+	recv_wr.wr_id = (u64)(unsigned long)ctxt;
+
+	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+	return ret;
+}
+
+/*
+ * This function handles the CONNECT_REQUEST event on a listening
+ * endpoint. It is passed the cma_id for the _new_ connection. The context in
+ * this cma_id is inherited from the listening cma_id and is the svc_xprt
+ * structure for the listening endpoint.
+ *
+ * This function creates a new xprt for the new connection and enqueues it on
+ * the accept queue for the listent xprt. When the listen thread is kicked, it
+ * will call the recvfrom method on the listen xprt which will accept the new
+ * connection.
+ */
+static void handle_connect_req(struct rdma_cm_id *new_cma_id)
+{
+	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
+	struct svcxprt_rdma *newxprt;
+
+	/* Create a new transport */
+	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
+	if (!newxprt) {
+		dprintk("svcrdma: failed to create new transport\n");
+		return;
+	}
+	newxprt->sc_cm_id = new_cma_id;
+	new_cma_id->context = newxprt;
+	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
+		newxprt, newxprt->sc_cm_id, listen_xprt);
+
+	/*
+	 * Enqueue the new transport on the accept queue of the listening
+	 * transport
+	 */
+	spin_lock_bh(&listen_xprt->sc_lock);
+	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
+	spin_unlock_bh(&listen_xprt->sc_lock);
+
+	/*
+	 * Can't use svc_xprt_received here because we are not on a
+	 * rqstp thread
+	*/
+	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
+	listen_xprt->sc_xprt.xpt_pool = NULL;
+	svc_xprt_enqueue(&listen_xprt->sc_xprt);
+}
+
+/*
+ * Handles events generated on the listening endpoint. These events will be
+ * either be incoming connect requests or adapter removal  events.
+ */
+static int rdma_listen_handler(struct rdma_cm_id *cma_id,
+			       struct rdma_cm_event *event)
+{
+	struct svcxprt_rdma *xprt = cma_id->context;
+	int ret = 0;
+
+	switch (event->event) {
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
+			"event=%d\n", cma_id, cma_id->context, event->event);
+		handle_connect_req(cma_id);
+		break;
+
+	case RDMA_CM_EVENT_ESTABLISHED:
+		/* Accept complete */
+		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
+			"cm_id=%p\n", xprt, cma_id);
+		break;
+
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
+			xprt, cma_id);
+		if (xprt)
+			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+		break;
+
+	default:
+		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
+			"event=%d\n", cma_id, event->event);
+		break;
+	}
+
+	return ret;
+}
+
+static int svc_rdma_copy_addr(struct sockaddr *dst, struct sockaddr *src)
+{
+	switch (src->sa_family) {
+	case AF_INET:
+		*((struct sockaddr_in *)dst) = *((struct sockaddr_in *)src);
+		return sizeof(struct sockaddr_in);
+	case AF_INET6:
+		*((struct sockaddr_in6 *)dst) = *((struct sockaddr_in6 *)src);
+		return sizeof(struct sockaddr_in6);
+	default:
+		return 0;
+	}
+}
+
+static int rdma_cma_handler(struct rdma_cm_id *cma_id,
+			    struct rdma_cm_event *event)
+{
+	struct svc_xprt *xprt = cma_id->context;
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	switch (event->event) {
+	case RDMA_CM_EVENT_ESTABLISHED:
+		/* Accept complete */
+		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
+			"cm_id=%p\n", xprt, cma_id);
+		xprt->xpt_remotelen =
+			svc_rdma_copy_addr((struct sockaddr *)
+					   &xprt->xpt_remote,
+					   &cma_id->route.addr.dst_addr);
+		(void)svc_rdma_copy_addr((struct sockaddr *)
+					 &xprt->xpt_local,
+					 &cma_id->route.addr.src_addr);
+		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
+		svc_xprt_enqueue(xprt);
+		break;
+	case RDMA_CM_EVENT_DISCONNECTED:
+		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
+			xprt, cma_id);
+		if (xprt) {
+			set_bit(XPT_CLOSE, &xprt->xpt_flags);
+			svc_xprt_enqueue(xprt);
+		}
+		break;
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
+			"event=%d\n", cma_id, xprt, event->event);
+		if (xprt) {
+			set_bit(XPT_CLOSE, &xprt->xpt_flags);
+			svc_xprt_enqueue(xprt);
+		}
+		break;
+	default:
+		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
+			"event=%d\n", cma_id, event->event);
+		break;
+	}
+	return 0;
+}
+
+/*
+ * Create a listening RDMA service endpoint.
+ */
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+					struct sockaddr *sa, int salen,
+					int flags)
+{
+	struct rdma_cm_id *listen_id;
+	struct svcxprt_rdma *cma_xprt;
+	struct svc_xprt *xprt;
+	int ret;
+
+	dprintk("svcrdma: Creating RDMA socket\n");
+
+	cma_xprt = rdma_create_xprt(serv, 1);
+	if (!cma_xprt)
+		return ERR_PTR(ENOMEM);
+	xprt = &cma_xprt->sc_xprt;
+
+	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
+	if (IS_ERR(listen_id)) {
+		rdma_destroy_xprt(cma_xprt);
+		dprintk("svcrdma: rdma_create_id failed = %ld\n",
+			PTR_ERR(listen_id));
+		return (void *)listen_id;
+	}
+	ret = rdma_bind_addr(listen_id, sa);
+	if (ret) {
+		rdma_destroy_xprt(cma_xprt);
+		rdma_destroy_id(listen_id);
+		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
+		return ERR_PTR(ret);
+	}
+	cma_xprt->sc_cm_id = listen_id;
+
+	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
+	if (ret) {
+		rdma_destroy_id(listen_id);
+		rdma_destroy_xprt(cma_xprt);
+		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+	}
+
+	(void)svc_rdma_copy_addr((struct sockaddr *)
+				 &cma_xprt->sc_xprt.xpt_local,
+				 &listen_id->route.addr.src_addr);
+	return &cma_xprt->sc_xprt;
+}
+
+/*
+ * This is the xpo_recvfrom function for listening endpoints. Its
+ * purpose is to accept incoming connections. The CMA callback handler
+ * has already created a new transport and attached it to the new CMA
+ * ID.
+ *
+ * There is a queue of pending connections hung on the listening
+ * transport. This queue contains the new svc_xprt structure. This
+ * function takes svc_xprt structures off the accept_q and completes
+ * the connection.
+ */
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *listen_rdma;
+	struct svcxprt_rdma *newxprt = NULL;
+	struct rdma_conn_param conn_param;
+	struct ib_qp_init_attr qp_attr;
+	struct ib_device_attr devattr;
+	int ret;
+	int i;
+
+	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	clear_bit(XPT_CONN, &xprt->xpt_flags);
+	/* Get the next entry off the accept list */
+	spin_lock_bh(&listen_rdma->sc_lock);
+	if (!list_empty(&listen_rdma->sc_accept_q)) {
+		newxprt = list_entry(listen_rdma->sc_accept_q.next,
+				     struct svcxprt_rdma, sc_accept_q);
+		list_del_init(&newxprt->sc_accept_q);
+	}
+	if (!list_empty(&listen_rdma->sc_accept_q))
+		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
+	spin_unlock_bh(&listen_rdma->sc_lock);
+	if (!newxprt)
+		return NULL;
+
+	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
+		newxprt, newxprt->sc_cm_id);
+
+	ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
+	if (ret) {
+		dprintk("svcrdma: could not query device attributes on "
+			"device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
+		goto errout;
+	}
+
+	/* Qualify the transport resource defaults with the
+	 * capabilities of this particular device */
+	newxprt->sc_max_sge = min((size_t)devattr.max_sge,
+				  (size_t)RPCSVC_MAXPAGES);
+	newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
+				   (size_t)svcrdma_max_requests);
+	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+
+	newxprt->sc_ord =  min((size_t)devattr.max_qp_rd_atom,
+			       (size_t)svcrdma_ord);
+
+	newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
+	if (IS_ERR(newxprt->sc_pd)) {
+		dprintk("svcrdma: error creating PD for connect request\n");
+		goto errout;
+	}
+	newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+					 sq_comp_handler,
+					 cq_event_handler,
+					 newxprt,
+					 newxprt->sc_sq_depth,
+					 0);
+	if (IS_ERR(newxprt->sc_sq_cq)) {
+		dprintk("svcrdma: error creating SQ CQ for connect request\n");
+		goto errout;
+	}
+	newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+					 rq_comp_handler,
+					 cq_event_handler,
+					 newxprt,
+					 newxprt->sc_max_requests,
+					 0);
+	if (IS_ERR(newxprt->sc_rq_cq)) {
+		dprintk("svcrdma: error creating RQ CQ for connect request\n");
+		goto errout;
+	}
+
+	memset(&qp_attr, 0, sizeof qp_attr);
+	qp_attr.event_handler = qp_event_handler;
+	qp_attr.qp_context = &newxprt->sc_xprt;
+	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
+	qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
+	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
+	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+	qp_attr.qp_type = IB_QPT_RC;
+	qp_attr.send_cq = newxprt->sc_sq_cq;
+	qp_attr.recv_cq = newxprt->sc_rq_cq;
+	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
+		"    cm_id->device=%p, sc_pd->device=%p\n"
+		"    cap.max_send_wr = %d\n"
+		"    cap.max_recv_wr = %d\n"
+		"    cap.max_send_sge = %d\n"
+		"    cap.max_recv_sge = %d\n",
+		newxprt->sc_cm_id, newxprt->sc_pd,
+		newxprt->sc_cm_id->device, newxprt->sc_pd->device,
+		qp_attr.cap.max_send_wr,
+		qp_attr.cap.max_recv_wr,
+		qp_attr.cap.max_send_sge,
+		qp_attr.cap.max_recv_sge);
+
+	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+	if (ret) {
+		/*
+		 * XXX: This is a hack. We need a xx_request_qp interface
+		 * that will adjust the qp_attr's with a best-effort
+		 * number
+		 */
+		qp_attr.cap.max_send_sge -= 2;
+		qp_attr.cap.max_recv_sge -= 2;
+		ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
+				     &qp_attr);
+		if (ret) {
+			dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
+			goto errout;
+		}
+		newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
+		newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
+		newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
+		newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
+	}
+	newxprt->sc_qp = newxprt->sc_cm_id->qp;
+
+	/* Register all of physical memory */
+	newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
+					    IB_ACCESS_LOCAL_WRITE |
+					    IB_ACCESS_REMOTE_WRITE);
+	if (IS_ERR(newxprt->sc_phys_mr)) {
+		dprintk("svcrdma: Failed to create DMA MR ret=%d\n", ret);
+		goto errout;
+	}
+
+	/* Post receive buffers */
+	for (i = 0; i < newxprt->sc_max_requests; i++) {
+		ret = svc_rdma_post_recv(newxprt);
+		if (ret) {
+			dprintk("svcrdma: failure posting receive buffers\n");
+			goto errout;
+		}
+	}
+
+	/* Swap out the handler */
+	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+
+	/* Accept Connection */
+	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
+	memset(&conn_param, 0, sizeof conn_param);
+	conn_param.responder_resources = 0;
+	conn_param.initiator_depth = newxprt->sc_ord;
+	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
+	if (ret) {
+		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
+		       ret);
+		goto errout;
+	}
+
+	dprintk("svcrdma: new connection %p accepted with the following "
+		"attributes:\n"
+		"    local_ip        : %d.%d.%d.%d\n"
+		"    local_port	     : %d\n"
+		"    remote_ip       : %d.%d.%d.%d\n"
+		"    remote_port     : %d\n"
+		"    max_sge         : %d\n"
+		"    sq_depth        : %d\n"
+		"    max_requests    : %d\n"
+		"    ord             : %d\n",
+		newxprt,
+		NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
+			 route.addr.src_addr)->sin_addr.s_addr),
+		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+		       route.addr.src_addr)->sin_port),
+		NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
+			 route.addr.dst_addr)->sin_addr.s_addr),
+		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+		       route.addr.dst_addr)->sin_port),
+		newxprt->sc_max_sge,
+		newxprt->sc_sq_depth,
+		newxprt->sc_max_requests,
+		newxprt->sc_ord);
+
+	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+	return &newxprt->sc_xprt;
+
+ errout:
+	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
+	rdma_destroy_id(newxprt->sc_cm_id);
+	rdma_destroy_xprt(newxprt);
+	return NULL;
+}
+
+/* No per-request resources cached by the transport */
+static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
+{
+}
+
+/* Disable data ready events for this connection */
+static void svc_rdma_detach(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	unsigned long flags;
+
+	dprintk("svc: svc_rdma_detach(%p)\n", xprt);
+	/*
+	 * Shutdown the connection. This will ensure we don't get any
+	 * more events from the provider.
+	 */
+	rdma_disconnect(rdma->sc_cm_id);
+	rdma_destroy_id(rdma->sc_cm_id);
+
+	/* We may already be on the DTO list */
+	spin_lock_irqsave(&dto_lock, flags);
+	if (!list_empty(&rdma->sc_dto_q))
+		list_del_init(&rdma->sc_dto_q);
+	spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+	dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+	rdma_destroy_xprt(rdma);
+	kfree(rdma);
+}
+
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
+{
+	if (xprt->sc_qp)
+		ib_destroy_qp(xprt->sc_qp);
+
+	if (xprt->sc_sq_cq)
+		ib_destroy_cq(xprt->sc_sq_cq);
+
+	if (xprt->sc_rq_cq)
+		ib_destroy_cq(xprt->sc_rq_cq);
+
+	if (xprt->sc_pd)
+		ib_dealloc_pd(xprt->sc_pd);
+
+	destroy_context_cache(xprt->sc_ctxt_head);
+}
+
+static int svc_rdma_has_wspace(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+	/*
+	 * If there are fewer SQ WR available than required to send a
+	 * simple response, return false.
+	 */
+	if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
+		return 0;
+
+	/*
+	 * ...or there are already waiters on the SQ,
+	 * return false.
+	 */
+	if (waitqueue_active(&rdma->sc_send_wait))
+		return 0;
+
+	/* Otherwise return true. */
+	return 1;
+}
+
+int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
+{
+	struct ib_send_wr *bad_wr;
+	int ret;
+
+	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
+		return 0;
+
+	BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
+	BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
+		wr->opcode);
+	/* If the SQ is full, wait until an SQ entry is available */
+	while (1) {
+		spin_lock_bh(&xprt->sc_lock);
+		if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
+			spin_unlock_bh(&xprt->sc_lock);
+			atomic_inc(&rdma_stat_sq_starve);
+			/* See if we can reap some SQ WR */
+			sq_cq_reap(xprt);
+
+			/* Wait until SQ WR available if SQ still full */
+			wait_event(xprt->sc_send_wait,
+				   atomic_read(&xprt->sc_sq_count) <
+				   xprt->sc_sq_depth);
+			continue;
+		}
+		/* Bumped used SQ WR count and post */
+		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
+		if (!ret)
+			atomic_inc(&xprt->sc_sq_count);
+		else
+			dprintk("svcrdma: failed to post SQ WR rc=%d, "
+			       "sc_sq_count=%d, sc_sq_depth=%d\n",
+			       ret, atomic_read(&xprt->sc_sq_count),
+			       xprt->sc_sq_depth);
+		spin_unlock_bh(&xprt->sc_lock);
+		break;
+	}
+	return ret;
+}
+
+int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+			enum rpcrdma_errcode err)
+{
+	struct ib_send_wr err_wr;
+	struct ib_sge sge;
+	struct page *p;
+	struct svc_rdma_op_ctxt *ctxt;
+	u32 *va;
+	int length;
+	int ret;
+
+	p = svc_rdma_get_page();
+	va = page_address(p);
+
+	/* XDR encode error */
+	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+	/* Prepare SGE for local address */
+	sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
+				   p, 0, PAGE_SIZE, DMA_FROM_DEVICE);
+	sge.lkey = xprt->sc_phys_mr->lkey;
+	sge.length = length;
+
+	ctxt = svc_rdma_get_context(xprt);
+	ctxt->count = 1;
+	ctxt->pages[0] = p;
+
+	/* Prepare SEND WR */
+	memset(&err_wr, 0, sizeof err_wr);
+	ctxt->wr_op = IB_WR_SEND;
+	err_wr.wr_id = (unsigned long)ctxt;
+	err_wr.sg_list = &sge;
+	err_wr.num_sge = 1;
+	err_wr.opcode = IB_WR_SEND;
+	err_wr.send_flags = IB_SEND_SIGNALED;
+
+	/* Post It */
+	ret = svc_rdma_send(xprt, &err_wr);
+	if (ret) {
+		dprintk("svcrdma: Error posting send = %d\n", ret);
+		svc_rdma_put_context(ctxt, 1);
+	}
+
+	return ret;
+}

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 06/10] rdma: SVCRDMA recvfrom
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
                   ` (4 preceding siblings ...)
  2007-10-19 21:56 ` [RFC,PATCH 05/10] rdma: SVCRDMA Core Transport Services Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 07/10] rdma: SVCRDMA sendto Tom Tucker
                   ` (4 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


This file implements the RDMA transport recvfrom function. The function
dequeues work reqeust completion contexts from an I/O list that it shares
with the I/O tasklet in svc_rdma_transport.c. For ONCRPC RDMA, an RPC may
not be complete when it is received. Instead, the RDMA header that precedes
the RPC message informs the transport where to get the RPC data from on
the client and where to place it in the RPC message before it is delivered
to the server. The svc_rdma_recvfrom function therefore, parses this RDMA
header and issues any necessary RDMA operations to fetch the remainder of
the RPC from the client.

Special handling is required when the request involves an RDMA_READ.
In this case, recvfrom submits the RDMA_READ requests to the underlying
transport driver and then returns 0. When the transport
completes the last RDMA_READ for the request, it enqueues it on a
read completion queue and enqueues the transport. The recvfrom code
favors this queue over the regular DTO queue when satisfying reads.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 net/sunrpc/svc_rdma_recvfrom.c |  576 ++++++++++++++++++++++++++++++++++++++++
 1 files changed, 576 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma_recvfrom.c b/net/sunrpc/svc_rdma_recvfrom.c
new file mode 100644
index 0000000..b940bdf
--- /dev/null
+++ b/net/sunrpc/svc_rdma_recvfrom.c
@@ -0,0 +1,576 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/*
+ * Replace the pages in the rq_argpages array with the pages from the SGE in
+ * the RDMA_RECV completion. The SGL should contain full pages up until the
+ * last one.
+ */
+static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
+			       struct svc_rdma_op_ctxt *ctxt,
+			       u32 byte_count)
+{
+	struct page *page;
+	u32 bc;
+	int sge_no;
+
+	/* Swap the page in the SGE with the page in argpages */
+	page = ctxt->pages[0];
+	put_page(rqstp->rq_pages[0]);
+	rqstp->rq_pages[0] = page;
+
+	/* Set up the XDR head */
+	rqstp->rq_arg.head[0].iov_base = page_address(page);
+	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
+	rqstp->rq_arg.len = byte_count;
+	rqstp->rq_arg.buflen = byte_count;
+
+	/* Compute bytes past head in the SGL */
+	bc = byte_count - rqstp->rq_arg.head[0].iov_len;
+
+	/* If data remains, store it in the pagelist */
+	rqstp->rq_arg.page_len = bc;
+	rqstp->rq_arg.page_base = 0;
+	rqstp->rq_arg.pages = &rqstp->rq_pages[1];
+	sge_no = 1;
+	while (bc && sge_no < ctxt->count) {
+		page = ctxt->pages[sge_no];
+		put_page(rqstp->rq_pages[sge_no]);
+		rqstp->rq_pages[sge_no] = page;
+		bc -= min(bc, ctxt->sge[sge_no].length);
+		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
+		sge_no++;
+	}
+	rqstp->rq_respages = &rqstp->rq_pages[sge_no];
+
+	/* We should never run out of SGE because the limit is defined to
+	 * support the max allowed RPC data length
+	 */
+	BUG_ON(bc && (sge_no == ctxt->count));
+	BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
+	       != byte_count);
+	BUG_ON(rqstp->rq_arg.len != byte_count);
+
+	/* If not all pages were used from the SGL, free the remaining ones */
+	bc = sge_no;
+	while (sge_no < ctxt->count) {
+		page = ctxt->pages[sge_no++];
+		put_page(page);
+	}
+	ctxt->count = bc;
+
+	/* Set up tail */
+	rqstp->rq_arg.tail[0].iov_base = NULL;
+	rqstp->rq_arg.tail[0].iov_len = 0;
+}
+
+struct chunk_sge {
+	int start;		/* sge no for this chunk */
+	int count;		/* sge count for this chunk */
+};
+
+/* Encode a read-chunk-list as an array of IB SGE
+ *
+ * Assumptions:
+ * - chunk[0]->position points to pages[0] at an offset of 0
+ * - pages[] is not physically or virtually contigous and consists of
+ *   PAGE_SIZE elements.
+ *
+ * Output:
+ * - sge array pointing into pages[] array.
+ * - chunk_sge array specifying sge index and count for each
+ *   chunk in the read list
+ *
+ */
+static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
+			   struct svc_rqst *rqstp,
+			   struct svc_rdma_op_ctxt *head,
+			   struct rpcrdma_msg *rmsgp,
+			   struct ib_sge *sge,
+			   struct chunk_sge *ch_sge_ary,
+			   int ch_count,
+			   int byte_count)
+{
+	int sge_no;
+	int sge_bytes;
+	int page_off;
+	int page_no;
+	int ch_bytes;
+	int ch_no;
+	struct rpcrdma_read_chunk *ch;
+
+	sge_no = 0;
+	page_no = 0;
+	page_off = 0;
+	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+	ch_no = 0;
+	ch_bytes = ch->rc_target.rs_length;
+	head->arg.head[0] = rqstp->rq_arg.head[0];
+	head->arg.tail[0] = rqstp->rq_arg.tail[0];
+	head->arg.pages = &head->pages[head->count];
+	head->sge[0].length = head->count; /* save count of hdr pages */
+	head->arg.page_base = 0;
+	head->arg.page_len = ch_bytes;
+	head->arg.len = rqstp->rq_arg.len + ch_bytes;
+	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
+	head->count++;
+	ch_sge_ary[0].start = 0;
+	while (byte_count) {
+		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
+		sge[sge_no].addr =
+			ib_dma_map_page(xprt->sc_cm_id->device,
+					rqstp->rq_arg.pages[page_no],
+					page_off, sge_bytes,
+					DMA_FROM_DEVICE);
+		sge[sge_no].length = sge_bytes;
+		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+		/*
+		 * Don't bump head->count here because the same page
+		 * may be used by multiple SGE.
+		 */
+		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
+		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
+
+		byte_count -= sge_bytes;
+		ch_bytes -= sge_bytes;
+		sge_no++;
+		/*
+		 * If all bytes for this chunk have been mapped to an
+		 * SGE, move to the next SGE
+		 */
+		if (ch_bytes == 0) {
+			ch_sge_ary[ch_no].count =
+				sge_no - ch_sge_ary[ch_no].start;
+			ch_no++;
+			ch++;
+			ch_sge_ary[ch_no].start = sge_no;
+			ch_bytes = ch->rc_target.rs_length;
+			/* If bytes remaining account for next chunk */
+			if (byte_count) {
+				head->arg.page_len += ch_bytes;
+				head->arg.len += ch_bytes;
+				head->arg.buflen += ch_bytes;
+			}
+		}
+		/*
+		 * If this SGE consumed all of the page, move to the
+		 * next page
+		 */
+		if ((sge_bytes + page_off) == PAGE_SIZE) {
+			page_no++;
+			page_off = 0;
+			/*
+			 * If there are still bytes left to map, bump
+			 * the page count
+			 */
+			if (byte_count)
+				head->count++;
+		} else
+			page_off += sge_bytes;
+	}
+	BUG_ON(byte_count != 0);
+	return sge_no;
+}
+
+static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
+			      struct ib_sge *sge,
+			      u64 *sgl_offset,
+			      int count)
+{
+	int i;
+
+	ctxt->count = count;
+	for (i = 0; i < count; i++) {
+		ctxt->sge[i].addr = sge[i].addr;
+		ctxt->sge[i].length = sge[i].length;
+		*sgl_offset = *sgl_offset + sge[i].length;
+	}
+}
+
+static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
+{
+#ifdef RDMA_TRANSPORT_IWARP
+	if ((RDMA_TRANSPORT_IWARP ==
+	     rdma_node_get_transport(xprt->sc_cm_id->
+				     device->node_type))
+	    && sge_count > 1)
+		return 1;
+	else
+#endif
+		return min_t(int, sge_count, xprt->sc_max_sge);
+}
+
+/*
+ * Use RDMA_READ to read data from the advertised client buffer into the
+ * XDR stream starting at rq_arg.head[0].iov_base.
+ * Each chunk in the array
+ * contains the following fields:
+ * discrim      - '1', This isn't used for data placement
+ * position     - The xdr stream offset (the same for every chunk)
+ * handle       - RMR for client memory region
+ * length       - data transfer length
+ * offset       - 64 bit tagged offset in remote memory region
+ *
+ * On our side, we need to read into a pagelist. The first page immediately
+ * follows the RPC header.
+ *
+ * This function returns 1 to indicate success. The data is not yet in
+ * the pagelist and therefore the RPC request must be deferred. The
+ * I/O completion will enqueue the transport again and
+ * svc_rdma_recvfrom will complete the request.
+ *
+ * NOTE: The ctxt must not be touched after the last WR has been posted
+ * because the I/O completion processing may occur on another
+ * processor and free / modify the context. Ne touche pas!
+ */
+static int rdma_read_xdr(struct svcxprt_rdma *xprt,
+			 struct rpcrdma_msg *rmsgp,
+			 struct svc_rqst *rqstp,
+			 struct svc_rdma_op_ctxt *hdr_ctxt)
+{
+	struct ib_send_wr read_wr;
+	int err = 0;
+	int ch_no;
+	struct ib_sge *sge;
+	int ch_count;
+	int byte_count;
+	int sge_count;
+	u64 sgl_offset;
+	struct rpcrdma_read_chunk *ch;
+	struct svc_rdma_op_ctxt *ctxt = NULL;
+	struct svc_rdma_op_ctxt *head;
+	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+	struct svc_rdma_op_ctxt *tmp_ch_ctxt;
+	struct chunk_sge *ch_sge_ary;
+
+	/* If no read list is present, return 0 */
+	ch = svc_rdma_get_read_chunk(rmsgp);
+	if (!ch)
+		return 0;
+
+	/* Allocate temporary contexts to keep SGE */
+	BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
+	tmp_sge_ctxt = svc_rdma_get_context(xprt);
+	sge = tmp_sge_ctxt->sge;
+	tmp_ch_ctxt = svc_rdma_get_context(xprt);
+	ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
+
+	svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
+	sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
+				    sge, ch_sge_ary,
+				    ch_count, byte_count);
+	head = svc_rdma_get_context(xprt);
+	sgl_offset = 0;
+	ch_no = 0;
+
+	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+	     ch->rc_discrim != 0; ch++, ch_no++) {
+	next_sge:
+		if (!ctxt)
+			ctxt = head;
+		else {
+			ctxt->next = svc_rdma_get_context(xprt);
+			ctxt = ctxt->next;
+		}
+		ctxt->next = NULL;
+		ctxt->direction = DMA_FROM_DEVICE;
+		clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+		if ((ch+1)->rc_discrim == 0) {
+			/*
+			 * Checked in sq_cq_reap to see if we need to
+			 * be enqueued
+			 */
+			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+			ctxt->next = hdr_ctxt;
+			hdr_ctxt->next = head;
+		}
+
+		/* Prepare READ WR */
+		memset(&read_wr, 0, sizeof read_wr);
+		ctxt->wr_op = IB_WR_RDMA_READ;
+		read_wr.wr_id = (unsigned long)ctxt;
+		read_wr.opcode = IB_WR_RDMA_READ;
+		read_wr.send_flags = IB_SEND_SIGNALED;
+		read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
+		read_wr.wr.rdma.remote_addr =
+			get_unaligned(&(ch->rc_target.rs_offset)) +
+			sgl_offset;
+		read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
+		read_wr.num_sge =
+			rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
+		rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
+				  &sgl_offset,
+				  read_wr.num_sge);
+
+		/* Post the read */
+		err = svc_rdma_send(xprt, &read_wr);
+		if (err) {
+			printk(KERN_ERR "svcrdma: Error posting send = %d\n",
+			       err);
+			/*
+			 * Break the circular list so free knows when
+			 * to stop if the error happened to occur on
+			 * the last read
+			 */
+			ctxt->next = NULL;
+			goto out;
+		}
+		atomic_inc(&rdma_stat_read);
+
+		if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
+			ch_sge_ary[ch_no].count -= read_wr.num_sge;
+			ch_sge_ary[ch_no].start += read_wr.num_sge;
+			goto next_sge;
+		}
+		sgl_offset = 0;
+		err = 0;
+	}
+
+ out:
+	svc_rdma_put_context(tmp_sge_ctxt, 0);
+	svc_rdma_put_context(tmp_ch_ctxt, 0);
+
+	/* Detach arg pages. svc_recv will replenish them */
+	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
+		rqstp->rq_pages[ch_no] = NULL;
+
+	/*
+	 * Detach res pages. svc_release must see a resused count of
+	 * zero or it will attempt to put them.
+	 */
+	while (rqstp->rq_resused)
+		rqstp->rq_respages[--rqstp->rq_resused] = NULL;
+
+	if (err) {
+		printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
+		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+		/* Free the linked list of read contexts */
+		while (head != NULL) {
+			ctxt = head->next;
+			svc_rdma_put_context(head, 1);
+			head = ctxt;
+		}
+		return 0;
+	}
+
+	return 1;
+}
+
+static int rdma_read_complete(struct svc_rqst *rqstp,
+			      struct svc_rdma_op_ctxt *data)
+{
+	struct svc_rdma_op_ctxt *head = data->next;
+	struct svcxprt_rdma *rdma_xprt =
+		container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
+	int page_no;
+	int ret;
+
+	BUG_ON(!head);
+
+	/* Copy RPC pages */
+	for (page_no = 0; page_no < head->count; page_no++) {
+		put_page(rqstp->rq_pages[page_no]);
+		rqstp->rq_pages[page_no] = head->pages[page_no];
+	}
+	/* Point rq_arg.pages past header */
+	rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
+	rqstp->rq_arg.page_len = head->arg.page_len;
+	rqstp->rq_arg.page_base = head->arg.page_base;
+
+	/* rq_respages starts after the last arg page */
+	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
+	rqstp->rq_resused = 0;
+
+	/* Rebuild rq_arg head and tail. */
+	rqstp->rq_arg.head[0] = head->arg.head[0];
+	rqstp->rq_arg.tail[0] = head->arg.tail[0];
+	rqstp->rq_arg.len = head->arg.len;
+	rqstp->rq_arg.buflen = head->arg.buflen;
+
+	rqstp->rq_prot = IPPROTO_MAX;
+	memcpy(&rqstp->rq_addr,
+	       &rdma_xprt->sc_cm_id->route.addr.dst_addr,
+	       sizeof(rqstp->rq_addr));
+	rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
+
+	/*
+	 * Free the contexts we used to build the RDMA_READ. We have
+	 * to be careful here because the context list uses the same
+	 * next pointer used to chain the contexts associated with the
+	 * RDMA_READ
+	 */
+	data->next = NULL;	/* terminate circular list */
+	do {
+		data = head->next;
+		svc_rdma_put_context(head, 0);
+		head = data;
+	} while (head != NULL);
+
+	ret = rqstp->rq_arg.head[0].iov_len
+		+ rqstp->rq_arg.page_len
+		+ rqstp->rq_arg.tail[0].iov_len;
+	dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
+		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+		ret, rqstp->rq_arg.len,	rqstp->rq_arg.head[0].iov_base,
+		rqstp->rq_arg.head[0].iov_len);
+	svc_xprt_received(rqstp->rq_xprt);
+	return ret;
+}
+
+/*
+ * Set up the rqstp thread context to point to the RQ buffer. If
+ * necessary, pull additional data from the client with an RDMA_READ
+ * request.
+ */
+int svc_rdma_recvfrom(struct svc_rqst *rqstp)
+{
+	struct svc_xprt *xprt = rqstp->rq_xprt;
+	struct svcxprt_rdma *rdma_xprt =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	struct svc_rdma_op_ctxt *ctxt = NULL;
+	struct rpcrdma_msg *rmsgp;
+	int ret = 0;
+	int len;
+
+	dprintk("svcrdma: rqstp=%p\n", rqstp);
+	spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
+	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
+		ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
+				  struct svc_rdma_op_ctxt,
+				  dto_q);
+		list_del_init(&ctxt->dto_q);
+	}
+	spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
+	if (ctxt)
+		return rdma_read_complete(rqstp, ctxt);
+
+	spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
+	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
+				  struct svc_rdma_op_ctxt,
+				  dto_q);
+		list_del_init(&ctxt->dto_q);
+	} else {
+		atomic_inc(&rdma_stat_rq_starve);
+		clear_bit(XPT_DATA, &xprt->xpt_flags);
+		ctxt = NULL;
+	}
+	spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+	if (!ctxt) {
+		/* This is the EAGAIN path. The svc_recv routine will
+		 * return -EAGAIN, the nfsd thread will go to call into
+		 * svc_recv again and we shouldn't be on the active
+		 * transport list
+		 */
+		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
+			goto close_out;
+
+		BUG_ON(ret);
+		goto out;
+	}
+	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
+		ctxt, rdma_xprt, rqstp, ctxt->wc_status);
+	BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
+	atomic_inc(&rdma_stat_recv);
+
+	/* rqstp struct expects transport to fill in peer address */
+	rqstp->rq_prot = IPPROTO_MAX;
+	memcpy(&rqstp->rq_addr,
+	       &rdma_xprt->sc_cm_id->route.addr.dst_addr,
+	       sizeof(rqstp->rq_addr));
+	rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
+
+	/* Build up the XDR from the receive buffers. */
+	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
+
+	/* Decode the RDMA header. */
+	len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+	rqstp->rq_xprt_hlen = len;
+
+	/* If the request is invalid, reply with an error */
+	if (len < 0) {
+		if (len == -ENOSYS)
+			(void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+		goto close_out;
+	}
+
+	/* Read read-list data. If we would need to wait, defer it */
+	if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
+		svc_xprt_received(xprt);
+		return 0;
+	}
+
+	ret = rqstp->rq_arg.head[0].iov_len
+		+ rqstp->rq_arg.page_len
+		+ rqstp->rq_arg.tail[0].iov_len;
+	svc_rdma_put_context(ctxt, 0);
+ out:
+	dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
+		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+		ret, rqstp->rq_arg.len,
+		rqstp->rq_arg.head[0].iov_base,
+		rqstp->rq_arg.head[0].iov_len);
+	svc_xprt_received(xprt);
+	return ret;
+
+ close_out:
+	if (ctxt)
+		svc_rdma_put_context(ctxt, 1);
+	dprintk("svcrdma: transport %p is closing\n", xprt);
+	/*
+	 * Set the close bit and enqueue it. svc_recv will see the
+	 * close bit and call svc_xprt_delete
+	 */
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+	svc_xprt_received(xprt);
+	return 0;
+}

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 07/10] rdma: SVCRDMA sendto
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
                   ` (5 preceding siblings ...)
  2007-10-19 21:56 ` [RFC,PATCH 06/10] rdma: SVCRDMA recvfrom Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 08/10] rdma: ONCRPC RDMA protocol marshalling Tom Tucker
                   ` (3 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


This file implements the RDMA transport sendto function. A RPC reply
on an RDMA transport consists of some number of RDMA_WRITE requests 
followed by an RDMA_SEND request. The sendto function parses the 
ONCRPC RDMA reply header to determine how to send the reply back to 
the client. The send queue is sized so as to be able to send complete 
replies for requests in most cases.  In the event that there are not 
enough SQ WR slots to reply, e.g.  big data, the send will block the 
NFSD thread. The I/O callback functions in svc_rdma_transport.c that 
reap WR completions wake any waiters blocked on the SQ. In general,
the goal is not to block NFSD threads and the has_wspace method
stall requests when the SQ is nearly full. 


Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 net/sunrpc/svc_rdma_sendto.c |  515 ++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 515 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma_sendto.c b/net/sunrpc/svc_rdma_sendto.c
new file mode 100644
index 0000000..cbedfd1
--- /dev/null
+++ b/net/sunrpc/svc_rdma_sendto.c
@@ -0,0 +1,515 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/* Encode an XDR as an array of IB SGE
+ *
+ * Assumptions:
+ * - head[0] is physically contiguous.
+ * - tail[0] is physically contiguous.
+ * - pages[] is not physically or virtually contigous and consists of
+ *   PAGE_SIZE elements.
+ *
+ * Output:
+ * SGE[0]              reserved for RCPRDMA header
+ * SGE[1]              data from xdr->head[]
+ * SGE[2..sge_count-2] data from xdr->pages[]
+ * SGE[sge_count-1]    data from xdr->tail.
+ *
+ */
+static struct ib_sge *xdr_to_sge(struct svcxprt_rdma *xprt,
+				 struct xdr_buf *xdr,
+				 struct ib_sge *sge,
+				 int *sge_count)
+{
+	/* Max we need is the length of the XDR / pagesize + one for
+	 * head + one for tail + one for RPCRDMA header
+	 */
+	int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
+	int sge_no;
+	u32 byte_count = xdr->len;
+	u32 sge_bytes;
+	u32 page_bytes;
+	int page_off;
+	int page_no;
+
+	/* Skip the first sge, this is for the RPCRDMA header */
+	sge_no = 1;
+
+	/* Head SGE */
+	sge[sge_no].addr = ib_dma_map_single(xprt->sc_cm_id->device,
+					     xdr->head[0].iov_base,
+					     xdr->head[0].iov_len,
+					     DMA_TO_DEVICE);
+	sge_bytes = min_t(u32, byte_count, xdr->head[0].iov_len);
+	byte_count -= sge_bytes;
+	sge[sge_no].length = sge_bytes;
+	sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+	sge_no++;
+
+	/* pages SGE */
+	page_no = 0;
+	page_bytes = xdr->page_len;
+	page_off = xdr->page_base;
+	while (byte_count && page_bytes) {
+		sge_bytes = min_t(u32, byte_count, (PAGE_SIZE-page_off));
+		sge[sge_no].addr =
+			ib_dma_map_page(xprt->sc_cm_id->device,
+					xdr->pages[page_no], page_off,
+					sge_bytes, DMA_TO_DEVICE);
+		sge_bytes = min(sge_bytes, page_bytes);
+		byte_count -= sge_bytes;
+		page_bytes -= sge_bytes;
+		sge[sge_no].length = sge_bytes;
+		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+
+		sge_no++;
+		page_no++;
+		page_off = 0; /* reset for next time through loop */
+	}
+
+	/* Tail SGE */
+	if (byte_count && xdr->tail[0].iov_len) {
+		sge[sge_no].addr =
+			ib_dma_map_single(xprt->sc_cm_id->device,
+					  xdr->tail[0].iov_base,
+					  xdr->tail[0].iov_len,
+					  DMA_TO_DEVICE);
+		sge_bytes = min_t(u32, byte_count, xdr->tail[0].iov_len);
+		byte_count -= sge_bytes;
+		sge[sge_no].length = sge_bytes;
+		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+		sge_no++;
+	}
+
+	BUG_ON(sge_no > sge_max);
+	BUG_ON(byte_count != 0);
+
+	*sge_count = sge_no;
+	return sge;
+}
+
+
+/* Assumptions:
+ * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
+ */
+static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
+		      u32 rmr, u64 to,
+		      u32 xdr_off, int write_len,
+		      struct ib_sge *xdr_sge, int sge_count)
+{
+	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+	struct ib_send_wr write_wr;
+	struct ib_sge *sge;
+	int xdr_sge_no;
+	int sge_no;
+	int sge_bytes;
+	int sge_off;
+	int bc;
+	struct svc_rdma_op_ctxt *ctxt;
+	int ret = 0;
+
+	BUG_ON(sge_count >= 32);
+	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
+		"write_len=%d, xdr_sge=%p, sge_count=%d\n",
+		rmr, to, xdr_off, write_len, xdr_sge, sge_count);
+
+	ctxt = svc_rdma_get_context(xprt);
+	ctxt->count = 0;
+	tmp_sge_ctxt = svc_rdma_get_context(xprt);
+	sge = tmp_sge_ctxt->sge;
+
+	/* Find the SGE associated with xdr_off */
+	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < sge_count;
+	     xdr_sge_no++) {
+		if (xdr_sge[xdr_sge_no].length > bc)
+			break;
+		bc -= xdr_sge[xdr_sge_no].length;
+	}
+
+	sge_off = bc;
+	bc = write_len;
+	sge_no = 0;
+
+	/* Copy the remaining SGE */
+	while (bc != 0 && xdr_sge_no < sge_count) {
+		sge[sge_no].addr = xdr_sge[xdr_sge_no].addr + sge_off;
+		sge[sge_no].lkey = xdr_sge[xdr_sge_no].lkey;
+		sge_bytes = min((size_t)bc,
+				(size_t)(xdr_sge[xdr_sge_no].length-sge_off));
+		sge[sge_no].length = sge_bytes;
+
+		sge_off = 0;
+		sge_no++;
+		xdr_sge_no++;
+		bc -= sge_bytes;
+	}
+
+	BUG_ON(bc != 0);
+	BUG_ON(xdr_sge_no > sge_count);
+
+	/* Prepare WRITE WR */
+	memset(&write_wr, 0, sizeof write_wr);
+	ctxt->wr_op = IB_WR_RDMA_WRITE;
+	write_wr.wr_id = (unsigned long)ctxt;
+	write_wr.sg_list = &sge[0];
+	write_wr.num_sge = sge_no;
+	write_wr.opcode = IB_WR_RDMA_WRITE;
+	write_wr.send_flags = IB_SEND_SIGNALED;
+	write_wr.wr.rdma.rkey = rmr;
+	write_wr.wr.rdma.remote_addr = to;
+
+	/* Post It */
+	atomic_inc(&rdma_stat_write);
+	if (svc_rdma_send(xprt, &write_wr)) {
+		svc_rdma_put_context(ctxt, 1);
+		/* Fatal error, close transport */
+		ret = -EIO;
+	}
+	svc_rdma_put_context(tmp_sge_ctxt, 0);
+	return ret;
+}
+
+static int send_write_chunks(struct svcxprt_rdma *xprt,
+			     struct rpcrdma_msg *rdma_argp,
+			     struct rpcrdma_msg *rdma_resp,
+			     struct svc_rqst *rqstp,
+			     struct ib_sge *sge,
+			     int sge_count)
+{
+	u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+	int write_len;
+	int max_write;
+	u32 xdr_off;
+	int chunk_off;
+	int chunk_no;
+	struct rpcrdma_write_array *arg_ary;
+	struct rpcrdma_write_array *res_ary;
+	int ret;
+
+	arg_ary = svc_rdma_get_write_array(rdma_argp);
+	if (!arg_ary)
+		return 0;
+	res_ary = (struct rpcrdma_write_array *)
+		&rdma_resp->rm_body.rm_chunks[1];
+
+	max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+	/* Write chunks start at the pagelist */
+	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
+	     xfer_len && chunk_no < arg_ary->wc_nchunks;
+	     chunk_no++) {
+		struct rpcrdma_segment *arg_ch;
+		u64 rs_offset;
+
+		arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+		write_len = min(xfer_len, arg_ch->rs_length);
+
+		/* Prepare the response chunk given the length actually
+		 * written */
+		rs_offset = get_unaligned(&(arg_ch->rs_offset));
+		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+					    arg_ch->rs_handle,
+					    rs_offset,
+					    write_len);
+		chunk_off = 0;
+		while (write_len) {
+			int this_write;
+			this_write = min(write_len, max_write);
+			ret = send_write(xprt, rqstp,
+					 arg_ch->rs_handle,
+					 rs_offset + chunk_off,
+					 xdr_off,
+					 this_write,
+					 sge,
+					 sge_count);
+			if (ret) {
+				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+					ret);
+				return -EIO;
+			}
+			chunk_off += this_write;
+			xdr_off += this_write;
+			xfer_len -= this_write;
+			write_len -= this_write;
+		}
+	}
+	/* Update the req with the number of chunks actually used */
+	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
+
+	return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+}
+
+static int send_reply_chunks(struct svcxprt_rdma *xprt,
+			     struct rpcrdma_msg *rdma_argp,
+			     struct rpcrdma_msg *rdma_resp,
+			     struct svc_rqst *rqstp,
+			     struct ib_sge *sge,
+			     int sge_count)
+{
+	u32 xfer_len = rqstp->rq_res.len;
+	int write_len;
+	int max_write;
+	u32 xdr_off;
+	int chunk_no;
+	int chunk_off;
+	struct rpcrdma_segment *ch;
+	struct rpcrdma_write_array *arg_ary;
+	struct rpcrdma_write_array *res_ary;
+	int ret;
+
+	arg_ary = svc_rdma_get_reply_array(rdma_argp);
+	if (!arg_ary)
+		return 0;
+	/* XXX: need to fix when reply lists occur with read-list and or
+	 * write-list */
+	res_ary = (struct rpcrdma_write_array *)
+		&rdma_resp->rm_body.rm_chunks[2];
+
+	max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+	/* xdr offset starts at RPC message */
+	for (xdr_off = 0, chunk_no = 0;
+	     xfer_len && chunk_no < arg_ary->wc_nchunks;
+	     chunk_no++) {
+		u64 rs_offset;
+		ch = &arg_ary->wc_array[chunk_no].wc_target;
+		write_len = min(xfer_len, ch->rs_length);
+
+
+		/* Prepare the reply chunk given the length actually
+		 * written */
+		rs_offset = get_unaligned(&(ch->rs_offset));
+		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+					    ch->rs_handle, rs_offset,
+					    write_len);
+		chunk_off = 0;
+		while (write_len) {
+			int this_write;
+
+			this_write = min(write_len, max_write);
+			ret = send_write(xprt, rqstp,
+					 ch->rs_handle,
+					 rs_offset + chunk_off,
+					 xdr_off,
+					 this_write,
+					 sge,
+					 sge_count);
+			if (ret) {
+				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+					ret);
+				return -EIO;
+			}
+			chunk_off += this_write;
+			xdr_off += this_write;
+			xfer_len -= this_write;
+			write_len -= this_write;
+		}
+	}
+	/* Update the req with the number of chunks actually used */
+	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+
+	return rqstp->rq_res.len;
+}
+
+/* This function prepares the portion of the RPCRDMA message to be
+ * sent in the RDMA_SEND. This function is called after data sent via
+ * RDMA has already been transmitted. There are three cases:
+ * - The RPCRDMA header, RPC header, and payload are all sent in a
+ *   single RDMA_SEND. This is the "inline" case.
+ * - The RPCRDMA header and some portion of the RPC header and data
+ *   are sent via this RDMA_SEND and another portion of the data is
+ *   sent via RDMA.
+ * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
+ *   header and data are all transmitted via RDMA.
+ * In all three cases, this function prepares the RPCRDMA header in
+ * sge[0], the 'type' parameter indicates the type to place in the
+ * RPCRDMA header, and the 'byte_count' field indicates how much of
+ * the XDR to include in this RDMA_SEND.
+ */
+static int send_reply(struct svcxprt_rdma *rdma,
+		      struct svc_rqst *rqstp,
+		      struct page *page,
+		      struct rpcrdma_msg *rdma_resp,
+		      struct svc_rdma_op_ctxt *ctxt,
+		      int sge_count,
+		      int byte_count)
+{
+	struct ib_send_wr send_wr;
+	int sge_no;
+	int sge_bytes;
+	int page_no;
+	int ret;
+
+	/* Prepare the context */
+	ctxt->pages[0] = page;
+	ctxt->count = 1;
+
+	/* Prepare the SGE for the RPCRDMA Header */
+	ctxt->sge[0].addr =
+		ib_dma_map_page(rdma->sc_cm_id->device,
+				page, 0, PAGE_SIZE, DMA_TO_DEVICE);
+	ctxt->direction = DMA_TO_DEVICE;
+	ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
+	ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
+
+	/* Determine how many of our SGE are to be transmitted */
+	for (sge_no = 1; byte_count && sge_no < sge_count; sge_no++) {
+		sge_bytes = min((size_t)ctxt->sge[sge_no].length,
+				(size_t)byte_count);
+		byte_count -= sge_bytes;
+	}
+	BUG_ON(byte_count != 0);
+
+	/* Save all respages in the ctxt and remove them from the
+	 * respages array. They are our pages until the I/O
+	 * completes.
+	 */
+	for (page_no = 0; page_no < rqstp->rq_resused; page_no++) {
+		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
+		ctxt->count++;
+		rqstp->rq_respages[page_no] = NULL;
+	}
+
+	BUG_ON(sge_no > rdma->sc_max_sge);
+	memset(&send_wr, 0, sizeof send_wr);
+	ctxt->wr_op = IB_WR_SEND;
+	send_wr.wr_id = (unsigned long)ctxt;
+	send_wr.sg_list = ctxt->sge;
+	send_wr.num_sge = sge_no;
+	send_wr.opcode = IB_WR_SEND;
+	send_wr.send_flags =  IB_SEND_SIGNALED;
+
+	ret = svc_rdma_send(rdma, &send_wr);
+	if (ret)
+		svc_rdma_put_context(ctxt, 1);
+
+	return ret;
+}
+
+void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
+{
+	struct svcxprt_rdma *rdma =
+		container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
+	/*
+	 * Return the receive WR to the RQ. Any error posting to the
+	 * RQ will surface on the SQ post below.
+	 */
+	(void)svc_rdma_post_recv(rdma);
+}
+
+int svc_rdma_sendto(struct svc_rqst *rqstp)
+{
+	struct svc_xprt *xprt = rqstp->rq_xprt;
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	struct rpcrdma_msg *rdma_argp;
+	struct rpcrdma_msg *rdma_resp;
+	struct rpcrdma_write_array *reply_ary;
+	enum rpcrdma_proc reply_type;
+	int ret;
+	int inline_bytes;
+	struct ib_sge *sge;
+	int sge_count = 0;
+	struct page *res_page;
+	struct svc_rdma_op_ctxt *ctxt;
+
+	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+
+	/* Get the RDMA request header. */
+	rdma_argp = page_address(rqstp->rq_pages[0]);
+
+	/* Build an SGE for the XDR */
+	ctxt = svc_rdma_get_context(rdma);
+	ctxt->direction = DMA_TO_DEVICE;
+	sge = xdr_to_sge(rdma, &rqstp->rq_res, ctxt->sge, &sge_count);
+
+	inline_bytes = rqstp->rq_res.len;
+
+	/* Create the RDMA response header */
+	res_page = svc_rdma_get_page();
+	rdma_resp = page_address(res_page);
+	reply_ary = svc_rdma_get_reply_array(rdma_argp);
+	if (reply_ary)
+		reply_type = RDMA_NOMSG;
+	else
+		reply_type = RDMA_MSG;
+	svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
+					 rdma_resp, reply_type);
+
+	/* Send any write-chunk data and build resp write-list */
+	ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
+				rqstp, sge, sge_count);
+	if (ret < 0) {
+		printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
+		       ret);
+		goto error;
+	}
+	inline_bytes -= ret;
+
+	/* Send any reply-list data and update resp reply-list */
+	ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
+				rqstp, sge, sge_count);
+	if (ret < 0) {
+		printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
+		       ret);
+		goto error;
+	}
+	inline_bytes -= ret;
+
+	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, sge_count,
+			 inline_bytes);
+	dprintk("svcrdma: send_reply returns %d\n", ret);
+	return ret;
+ error:
+	svc_rdma_put_context(ctxt, 0);
+	put_page(res_page);
+	return ret;
+}

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 08/10] rdma: ONCRPC RDMA protocol marshalling
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
                   ` (6 preceding siblings ...)
  2007-10-19 21:56 ` [RFC,PATCH 07/10] rdma: SVCRDMA sendto Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 09/10] rdma: makefile Tom Tucker
                   ` (2 subsequent siblings)
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


This logic parses the ONCRDMA protocol headers that
precede the actual RPC header. It is placed in a separate
file to keep all protocol aware code in a single place.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 net/sunrpc/svc_rdma_marshal.c |  412 +++++++++++++++++++++++++++++++++++++++++
 1 files changed, 412 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma_marshal.c b/net/sunrpc/svc_rdma_marshal.c
new file mode 100644
index 0000000..9530ef2
--- /dev/null
+++ b/net/sunrpc/svc_rdma_marshal.c
@@ -0,0 +1,412 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/debug.h>
+#include <asm/unaligned.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/*
+ * Decodes a read chunk list. The expected format is as follows:
+ *    descrim  : xdr_one
+ *    position : u32 offset into XDR stream
+ *    handle   : u32 RKEY
+ *    . . .
+ *  end-of-list: xdr_zero
+ */
+static u32 *decode_read_list(u32 *va, u32 *vaend)
+{
+	struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk *)va;
+
+	while (ch->rc_discrim != xdr_zero) {
+		u64 ch_offset;
+
+		if (((unsigned long)ch + sizeof(struct rpcrdma_read_chunk)) >
+		    (unsigned long)vaend) {
+			dprintk("svcrdma: vaend=%p, ch=%p\n", vaend, ch);
+			return NULL;
+		}
+
+		ch->rc_discrim = ntohl(ch->rc_discrim);
+		ch->rc_position = ntohl(ch->rc_position);
+		ch->rc_target.rs_handle = ntohl(ch->rc_target.rs_handle);
+		ch->rc_target.rs_length = ntohl(ch->rc_target.rs_length);
+		va = (u32 *)&ch->rc_target.rs_offset;
+		xdr_decode_hyper(va, &ch_offset);
+		put_unaligned(ch_offset, (u64 *)va);
+		ch++;
+	}
+	return (u32 *)&ch->rc_position;
+}
+
+/*
+ * Determine number of chunks and total bytes in chunk list. The chunk
+ * list has already been verified to fit within the RPCRDMA header.
+ */
+void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,
+			       int *ch_count, int *byte_count)
+{
+	/* compute the number of bytes represented by read chunks */
+	*byte_count = 0;
+	*ch_count = 0;
+	for (; ch->rc_discrim != 0; ch++) {
+		*byte_count = *byte_count + ch->rc_target.rs_length;
+		*ch_count = *ch_count + 1;
+	}
+}
+
+/*
+ * Decodes a write chunk list. The expected format is as follows:
+ *    descrim  : xdr_one
+ *    nchunks  : <count>
+ *       handle   : u32 RKEY              ---+
+ *       length   : u32 <len of segment>     |
+ *       offset   : remove va                + <count>
+ *       . . .                               |
+ *                                        ---+
+ */
+static u32 *decode_write_list(u32 *va, u32 *vaend)
+{
+	int ch_no;
+	struct rpcrdma_write_array *ary =
+		(struct rpcrdma_write_array *)va;
+
+	/* Check for not write-array */
+	if (ary->wc_discrim == xdr_zero)
+		return (u32 *)&ary->wc_nchunks;
+
+	if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+	    (unsigned long)vaend) {
+		dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+		return NULL;
+	}
+	ary->wc_discrim = ntohl(ary->wc_discrim);
+	ary->wc_nchunks = ntohl(ary->wc_nchunks);
+	if (((unsigned long)&ary->wc_array[0] +
+	     (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+	    (unsigned long)vaend) {
+		dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+			ary, ary->wc_nchunks, vaend);
+		return NULL;
+	}
+	for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+		u64 ch_offset;
+
+		ary->wc_array[ch_no].wc_target.rs_handle =
+			ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+		ary->wc_array[ch_no].wc_target.rs_length =
+			ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+		va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset;
+		xdr_decode_hyper(va, &ch_offset);
+		put_unaligned(ch_offset, (u64 *)va);
+	}
+
+	/*
+	 * rs_length is the 2nd 4B field in wc_target and taking its
+	 * address skips the list terminator
+	 */
+	return (u32 *)&ary->wc_array[ch_no].wc_target.rs_length;
+}
+
+static u32 *decode_reply_array(u32 *va, u32 *vaend)
+{
+	int ch_no;
+	struct rpcrdma_write_array *ary =
+		(struct rpcrdma_write_array *)va;
+
+	/* Check for no reply-array */
+	if (ary->wc_discrim == xdr_zero)
+		return (u32 *)&ary->wc_nchunks;
+
+	if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+	    (unsigned long)vaend) {
+		dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+		return NULL;
+	}
+	ary->wc_discrim = ntohl(ary->wc_discrim);
+	ary->wc_nchunks = ntohl(ary->wc_nchunks);
+	if (((unsigned long)&ary->wc_array[0] +
+	     (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+	    (unsigned long)vaend) {
+		dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+			ary, ary->wc_nchunks, vaend);
+		return NULL;
+	}
+	for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+		u64 ch_offset;
+
+		ary->wc_array[ch_no].wc_target.rs_handle =
+			ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+		ary->wc_array[ch_no].wc_target.rs_length =
+			ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+		va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset;
+		xdr_decode_hyper(va, &ch_offset);
+		put_unaligned(ch_offset, (u64 *)va);
+	}
+
+	return (u32 *)&ary->wc_array[ch_no];
+}
+
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
+			    struct svc_rqst *rqstp)
+{
+	struct rpcrdma_msg *rmsgp = NULL;
+	u32 *va;
+	u32 *vaend;
+	u32 hdr_len;
+
+	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+	/* Verify that there's enough bytes for header + something */
+	if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+		dprintk("svcrdma: header too short = %d\n",
+			rqstp->rq_arg.len);
+		return -EINVAL;
+	}
+
+	/* Decode the header */
+	rmsgp->rm_xid = ntohl(rmsgp->rm_xid);
+	rmsgp->rm_vers = ntohl(rmsgp->rm_vers);
+	rmsgp->rm_credit = ntohl(rmsgp->rm_credit);
+	rmsgp->rm_type = ntohl(rmsgp->rm_type);
+
+	if (rmsgp->rm_vers != RPCRDMA_VERSION)
+		return -ENOSYS;
+
+	/* Pull in the extra for the padded case and bump our pointer */
+	if (rmsgp->rm_type == RDMA_MSGP) {
+		int hdrlen;
+		rmsgp->rm_body.rm_padded.rm_align =
+			ntohl(rmsgp->rm_body.rm_padded.rm_align);
+		rmsgp->rm_body.rm_padded.rm_thresh =
+			ntohl(rmsgp->rm_body.rm_padded.rm_thresh);
+
+		va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+		rqstp->rq_arg.head[0].iov_base = va;
+		hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+		rqstp->rq_arg.head[0].iov_len -= hdrlen;
+		if (hdrlen > rqstp->rq_arg.len)
+			return -EINVAL;
+		return hdrlen;
+	}
+
+	/* The chunk list may contain either a read chunk list or a write
+	 * chunk list and a reply chunk list.
+	 */
+	va = &rmsgp->rm_body.rm_chunks[0];
+	vaend = (u32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
+	va = decode_read_list(va, vaend);
+	if (!va)
+		return -EINVAL;
+	va = decode_write_list(va, vaend);
+	if (!va)
+		return -EINVAL;
+	va = decode_reply_array(va, vaend);
+	if (!va)
+		return -EINVAL;
+
+	rqstp->rq_arg.head[0].iov_base = va;
+	hdr_len = (unsigned long)va - (unsigned long)rmsgp;
+	rqstp->rq_arg.head[0].iov_len -= hdr_len;
+
+	*rdma_req = rmsgp;
+	return hdr_len;
+}
+
+int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *rqstp)
+{
+	struct rpcrdma_msg *rmsgp = NULL;
+	struct rpcrdma_read_chunk *ch;
+	struct rpcrdma_write_array *ary;
+	u32 *va;
+	u32 hdrlen;
+
+	dprintk("svcrdma: processing deferred RDMA header on rqstp=%p\n",
+		rqstp);
+	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+	/* Pull in the extra for the padded case and bump our pointer */
+	if (rmsgp->rm_type == RDMA_MSGP) {
+		va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+		rqstp->rq_arg.head[0].iov_base = va;
+		hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+		rqstp->rq_arg.head[0].iov_len -= hdrlen;
+		return hdrlen;
+	}
+
+	/*
+	 * Skip all chunks to find RPC msg. These were previously processed
+	 */
+	va = &rmsgp->rm_body.rm_chunks[0];
+
+	/* Skip read-list */
+	for (ch = (struct rpcrdma_read_chunk *)va;
+	     ch->rc_discrim != xdr_zero; ch++);
+	va = (u32 *)&ch->rc_position;
+
+	/* Skip write-list */
+	ary = (struct rpcrdma_write_array *)va;
+	if (ary->wc_discrim == xdr_zero)
+		va = (u32 *)&ary->wc_nchunks;
+	else
+		/*
+		 * rs_length is the 2nd 4B field in wc_target and taking its
+		 * address skips the list terminator
+		 */
+		va = (u32 *)&ary->wc_array[ary->wc_nchunks].wc_target.rs_length;
+
+	/* Skip reply-array */
+	ary = (struct rpcrdma_write_array *)va;
+	if (ary->wc_discrim == xdr_zero)
+		va = (u32 *)&ary->wc_nchunks;
+	else
+		va = (u32 *)&ary->wc_array[ary->wc_nchunks];
+
+	rqstp->rq_arg.head[0].iov_base = va;
+	hdrlen = (unsigned long)va - (unsigned long)rmsgp;
+	rqstp->rq_arg.head[0].iov_len -= hdrlen;
+
+	return hdrlen;
+}
+
+int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
+			      struct rpcrdma_msg *rmsgp,
+			      enum rpcrdma_errcode err, u32 *va)
+{
+	u32 *startp = va;
+
+	*va++ = htonl(rmsgp->rm_xid);
+	*va++ = htonl(rmsgp->rm_vers);
+	*va++ = htonl(xprt->sc_max_requests);
+	*va++ = htonl(RDMA_ERROR);
+	*va++ = htonl(err);
+	if (err == ERR_VERS) {
+		*va++ = htonl(RPCRDMA_VERSION);
+		*va++ = htonl(RPCRDMA_VERSION);
+	}
+
+	return (int)((unsigned long)va - (unsigned long)startp);
+}
+
+int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
+{
+	struct rpcrdma_write_array *wr_ary;
+
+	/* There is no read-list in a reply */
+
+	/* skip write list */
+	wr_ary = (struct rpcrdma_write_array *)
+		&rmsgp->rm_body.rm_chunks[1];
+	if (wr_ary->wc_discrim)
+		wr_ary = (struct rpcrdma_write_array *)
+			&wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)].
+			wc_target.rs_length;
+	else
+		wr_ary = (struct rpcrdma_write_array *)
+			&wr_ary->wc_nchunks;
+
+	/* skip reply array */
+	if (wr_ary->wc_discrim)
+		wr_ary = (struct rpcrdma_write_array *)
+			&wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)];
+	else
+		wr_ary = (struct rpcrdma_write_array *)
+			&wr_ary->wc_nchunks;
+
+	return (unsigned long) wr_ary - (unsigned long) rmsgp;
+}
+
+void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
+{
+	struct rpcrdma_write_array *ary;
+
+	/* no read-list */
+	rmsgp->rm_body.rm_chunks[0] = xdr_zero;
+
+	/* write-array discrim */
+	ary = (struct rpcrdma_write_array *)
+		&rmsgp->rm_body.rm_chunks[1];
+	ary->wc_discrim = xdr_one;
+	ary->wc_nchunks = htonl(chunks);
+
+	/* write-list terminator */
+	ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
+
+	/* reply-array discriminator */
+	ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
+}
+
+void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
+				 int chunks)
+{
+	ary->wc_discrim = xdr_one;
+	ary->wc_nchunks = htonl(chunks);
+}
+
+void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
+				     int chunk_no,
+				     u32 rs_handle, u64 rs_offset,
+				     u32 write_len)
+{
+	struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
+	seg->rs_handle = htonl(rs_handle);
+	seg->rs_length = htonl(write_len);
+	xdr_encode_hyper((u32 *) &seg->rs_offset, rs_offset);
+}
+
+void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
+				  struct rpcrdma_msg *rdma_argp,
+				  struct rpcrdma_msg *rdma_resp,
+				  enum rpcrdma_proc rdma_type)
+{
+	rdma_resp->rm_xid = htonl(rdma_argp->rm_xid);
+	rdma_resp->rm_vers = htonl(rdma_argp->rm_vers);
+	rdma_resp->rm_credit = htonl(xprt->sc_max_requests);
+	rdma_resp->rm_type = htonl(rdma_type);
+
+	/* Encode <nul> chunks lists */
+	rdma_resp->rm_body.rm_chunks[0] = xdr_zero;
+	rdma_resp->rm_body.rm_chunks[1] = xdr_zero;
+	rdma_resp->rm_body.rm_chunks[2] = xdr_zero;
+}

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 09/10] rdma: makefile
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
                   ` (7 preceding siblings ...)
  2007-10-19 21:56 ` [RFC,PATCH 08/10] rdma: ONCRPC RDMA protocol marshalling Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-19 21:56 ` [RFC,PATCH 10/10] rdma: Kconfig Tom Tucker
  2007-10-22 22:02 ` [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver J. Bruce Fields
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


Add the NFSD_RDMA module to the sunrpc makefile.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 net/sunrpc/Makefile |    4 ++++
 1 files changed, 4 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index e37aa99..73038ac 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -14,3 +14,7 @@ sunrpc-y := clnt.o xprt.o socklib.o xprt
 	    svc_xprt.o
 sunrpc-$(CONFIG_PROC_FS) += stats.o
 sunrpc-$(CONFIG_SYSCTL) += sysctl.o
+
+obj-$(CONFIG_NFSD_RDMA) += svcrdma.o
+svcrdma-y := svc_rdma.o svc_rdma_transport.o \
+	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [RFC,PATCH 10/10] rdma: Kconfig
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
                   ` (8 preceding siblings ...)
  2007-10-19 21:56 ` [RFC,PATCH 09/10] rdma: makefile Tom Tucker
@ 2007-10-19 21:56 ` Tom Tucker
  2007-10-22 22:02 ` [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver J. Bruce Fields
  10 siblings, 0 replies; 15+ messages in thread
From: Tom Tucker @ 2007-10-19 21:56 UTC (permalink / raw)
  To: nfs; +Cc: bfields, neilb, gnb


Add NFS_RDMA as an option to the Kconfig file.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
---

 fs/Kconfig |    7 +++++++
 1 files changed, 7 insertions(+), 0 deletions(-)

diff --git a/fs/Kconfig b/fs/Kconfig
index f9eed6d..b5d67f4 100644
--- a/fs/Kconfig
+++ b/fs/Kconfig
@@ -1649,6 +1649,13 @@ config NFSD
 	  To compile the NFS server support as a module, choose M here: the
 	  module will be called nfsd.  If unsure, say N.
 
+config NFSD_RDMA
+	tristate "Provide NFS server over RDMA support (EXPERIMENTAL)"
+	depends on SUNRPC && NFSD && INFINIBAND && EXPERIMENTAL
+	help
+	  If you want your NFS server to support RDMA connections, 
+	  say M or Y here. If unsure, say N.
+
 config NFSD_V2_ACL
 	bool
 	depends on NFSD

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* Re: [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver
  2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
                   ` (9 preceding siblings ...)
  2007-10-19 21:56 ` [RFC,PATCH 10/10] rdma: Kconfig Tom Tucker
@ 2007-10-22 22:02 ` J. Bruce Fields
  2007-10-22 22:03   ` J. Bruce Fields
  2007-10-23  2:44   ` Tom Tucker
  10 siblings, 2 replies; 15+ messages in thread
From: J. Bruce Fields @ 2007-10-22 22:02 UTC (permalink / raw)
  To: Tom Tucker; +Cc: neilb, nfs, gnb

On Fri, Oct 19, 2007 at 04:53:04PM -0500, Tom Tucker wrote:
> This patchset implements the RDMA Transport Driver for the 
> server transport switch. This patchset has been tested with iozone
> and Connectathon over NFSv3. 

Do you want these applied?

> This patchset along with all of the previous server side
> transport switches are also available in this git tree:
> 
> git://linux-nfs.org/~tomtucker/nfs-rdma-dev-2.6.git

This is missing a couple changes I made in my version: the two I notice
are that I merged both "svc: Don't copy xprt_class data into svc_xprt
instance" and "svc: Rename xpo_release to xpo_release_rqst" in with the
previous patches.

So I've ignored that for now and just applied these new patches on top
of mine.  I also rebased onto the latest linus git.

Also, there's still that small sysctl complaint from Andrew outstanding,
which I haven't had the time to look into.

--b.

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver
  2007-10-22 22:02 ` [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver J. Bruce Fields
@ 2007-10-22 22:03   ` J. Bruce Fields
  2007-10-23  2:44   ` Tom Tucker
  1 sibling, 0 replies; 15+ messages in thread
From: J. Bruce Fields @ 2007-10-22 22:03 UTC (permalink / raw)
  To: Tom Tucker; +Cc: neilb, nfs, gnb

On Mon, Oct 22, 2007 at 06:02:19PM -0400, bfields wrote:
> On Fri, Oct 19, 2007 at 04:53:04PM -0500, Tom Tucker wrote:
> > This patchset implements the RDMA Transport Driver for the 
> > server transport switch. This patchset has been tested with iozone
> > and Connectathon over NFSv3. 
> 
> Do you want these applied?
> 
> > This patchset along with all of the previous server side
> > transport switches are also available in this git tree:
> > 
> > git://linux-nfs.org/~tomtucker/nfs-rdma-dev-2.6.git

(I'm assuming, by the way, that you were referring to the
server-xprt-switch branch.)

> 
> This is missing a couple changes I made in my version: the two I notice
> are that I merged both "svc: Don't copy xprt_class data into svc_xprt
> instance" and "svc: Rename xpo_release to xpo_release_rqst" in with the
> previous patches.
> 
> So I've ignored that for now and just applied these new patches on top
> of mine.  I also rebased onto the latest linus git.
> 
> Also, there's still that small sysctl complaint from Andrew outstanding,
> which I haven't had the time to look into.
> 
> --b.

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver
  2007-10-22 22:02 ` [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver J. Bruce Fields
  2007-10-22 22:03   ` J. Bruce Fields
@ 2007-10-23  2:44   ` Tom Tucker
  2007-10-23 18:05     ` J. Bruce Fields
  1 sibling, 1 reply; 15+ messages in thread
From: Tom Tucker @ 2007-10-23  2:44 UTC (permalink / raw)
  To: J. Bruce Fields; +Cc: neilb, nfs, gnb

J. Bruce Fields wrote:
> On Fri, Oct 19, 2007 at 04:53:04PM -0500, Tom Tucker wrote:
>   
>> This patchset implements the RDMA Transport Driver for the 
>> server transport switch. This patchset has been tested with iozone
>> and Connectathon over NFSv3. 
>>     
>
> Do you want these applied?
>
>   
Yes. I'm assuming that this would include all incremental patches prior 
to the RDMA transport driver as well?
>> This patchset along with all of the previous server side
>> transport switches are also available in this git tree:
>>
>> git://linux-nfs.org/~tomtucker/nfs-rdma-dev-2.6.git
>>     
>
> This is missing a couple changes I made in my version: the two I notice
> are that I merged both "svc: Don't copy xprt_class data into svc_xprt
> instance" and "svc: Rename xpo_release to xpo_release_rqst" in with the
> previous patches.
>
>   
Yes, please. Should I post my subsequent patches relative to your git 
tree? That might make life easier.
> So I've ignored that for now and just applied these new patches on top
> of mine.  I also rebased onto the latest linus git.
>   
I should compare your tot to mine to make sure we're not out of sync. 
I'll do that tomorrow.

FYI, I'm working on a "perfect hindsight" branch as well. I'll let you 
know when I've got that done.
> Also, there's still that small sysctl complaint from Andrew outstanding,
> which I haven't had the time to look into.
>
>   

> --b.
>   



-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver
  2007-10-23  2:44   ` Tom Tucker
@ 2007-10-23 18:05     ` J. Bruce Fields
  0 siblings, 0 replies; 15+ messages in thread
From: J. Bruce Fields @ 2007-10-23 18:05 UTC (permalink / raw)
  To: Tom Tucker; +Cc: neilb, nfs, gnb

On Mon, Oct 22, 2007 at 09:44:13PM -0500, Tom Tucker wrote:
> J. Bruce Fields wrote:
>> On Fri, Oct 19, 2007 at 04:53:04PM -0500, Tom Tucker wrote:
>>   
>>> This patchset implements the RDMA Transport Driver for the server 
>>> transport switch. This patchset has been tested with iozone
>>> and Connectathon over NFSv3.     
>>
>> Do you want these applied?
>>
>>   
> Yes. I'm assuming that this would include all incremental patches prior to 
> the RDMA transport driver as well?

Yes; you can check my server-xprt-switch branch to see if I got it all.

>>> This patchset along with all of the previous server side
>>> transport switches are also available in this git tree:
>>>
>>> git://linux-nfs.org/~tomtucker/nfs-rdma-dev-2.6.git
>>>     
>>
>> This is missing a couple changes I made in my version: the two I notice
>> are that I merged both "svc: Don't copy xprt_class data into svc_xprt
>> instance" and "svc: Rename xpo_release to xpo_release_rqst" in with the
>> previous patches.
>>
>>   
> Yes, please. Should I post my subsequent patches relative to your git tree? 
> That might make life easier.

Sure; thanks.

>> So I've ignored that for now and just applied these new patches on top
>> of mine.  I also rebased onto the latest linus git.
>>   
> I should compare your tot to mine to make sure we're not out of sync. I'll 
> do that tomorrow.
>
> FYI, I'm working on a "perfect hindsight" branch as well. I'll let you know 
> when I've got that done.

Oh, OK, fantastic.  I'll stop monkeying with my copy if that's the case.
(Though only exception being that if I may need to rebase every now and
then.)

--b.

-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems?  Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
NFS maillist  -  NFS@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfs

^ permalink raw reply	[flat|nested] 15+ messages in thread

end of thread, other threads:[~2007-10-23 18:05 UTC | newest]

Thread overview: 15+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2007-10-19 21:53 [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 01/10] rdma: ONCRPC RDMA Header File Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 02/10] rdma: sysctl for SVCRDMA transport module Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 03/10] rdma: SVCRMDA Header File Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 04/10] rdma: SVCRDMA Transport Module Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 05/10] rdma: SVCRDMA Core Transport Services Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 06/10] rdma: SVCRDMA recvfrom Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 07/10] rdma: SVCRDMA sendto Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 08/10] rdma: ONCRPC RDMA protocol marshalling Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 09/10] rdma: makefile Tom Tucker
2007-10-19 21:56 ` [RFC,PATCH 10/10] rdma: Kconfig Tom Tucker
2007-10-22 22:02 ` [RFC,PATCH 00/10] rdma: SVCRDMA Transport Driver J. Bruce Fields
2007-10-22 22:03   ` J. Bruce Fields
2007-10-23  2:44   ` Tom Tucker
2007-10-23 18:05     ` J. Bruce Fields

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.