linux-scsi.vger.kernel.org archive mirror
* [PATCH 4/7] [SCSI] scst: Add SRP target driver
@ 2010-12-20 17:49 Bart Van Assche
From: Bart Van Assche @ 2010-12-20 17:49 UTC (permalink / raw)
  To: linux-scsi, scst-devel, linux-rdma, James.Bottomley
  Cc: Vu Pham, Roland Dreier, David Dillow

This patch adds the kernel module ib_srpt, which is a SCSI RDMA Protocol (SRP)
target implementation. This driver uses the InfiniBand stack and the SCST core.

It is a high-performance driver capable of handling more than 600K 4 KB
random-write IOPS on a single target, as well as 2.5+ GB/s of sequential
throughput over a single QDR IB port.

This driver was originally developed by Vu Pham (Mellanox) and has been
optimized by Bart Van Assche.

Signed-off-by: Bart Van Assche <bvanassche@acm.org>
Signed-off-by: Vladislav Bolkhovitin <vst@vlnb.net>
Cc: Vu Pham <vu@mellanox.com>
Cc: Roland Dreier <rdreier@cisco.com>
Cc: David Dillow <dillowda@ornl.gov>
---
 Documentation/scst/README.srpt |  112 ++
 drivers/scst/Kconfig           |    1 +
 drivers/scst/Makefile          |    2 +-
 drivers/scst/srpt/Kconfig      |   12 +
 drivers/scst/srpt/Makefile     |    1 +
 drivers/scst/srpt/ib_dm_mad.h  |  139 ++
 drivers/scst/srpt/ib_srpt.c    | 3613 ++++++++++++++++++++++++++++++++++++++++
 drivers/scst/srpt/ib_srpt.h    |  352 ++++
 8 files changed, 4231 insertions(+), 1 deletions(-)
 create mode 100644 Documentation/scst/README.srpt
 create mode 100644 drivers/scst/srpt/Kconfig
 create mode 100644 drivers/scst/srpt/Makefile
 create mode 100644 drivers/scst/srpt/ib_dm_mad.h
 create mode 100644 drivers/scst/srpt/ib_srpt.c
 create mode 100644 drivers/scst/srpt/ib_srpt.h

diff --git a/Documentation/scst/README.srpt b/Documentation/scst/README.srpt
new file mode 100644
index 0000000..6f8b3ca
--- /dev/null
+++ b/Documentation/scst/README.srpt
@@ -0,0 +1,112 @@
+SCSI RDMA Protocol (SRP) Target driver for Linux
+=================================================
+
+The SRP Target driver is designed to work directly on top of the
+OpenFabrics OFED-1.x software stack (http://www.openfabrics.org) or
+the InfiniBand drivers in the Linux kernel tree
+(http://www.kernel.org). The SRP target driver also interfaces with
+the generic SCSI target mid-level driver called SCST
+(http://scst.sourceforge.net).
+
+How-to run
+-----------
+
+A. On the SRP target machine
+1. Please refer to SCST's README for how to load the scst driver and its
+dev_handlers (scst_disk, scst_vdisk in block or file I/O mode, nullio, ...)
+
+Example 1: working with real back-end SCSI disks
+a. modprobe scst
+b. modprobe scst_disk
+c. cat /proc/scsi_tgt/scsi_tgt
+
+ibstor00:~ # cat /proc/scsi_tgt/scsi_tgt
+Device (host:ch:id:lun or name)                             Device handler
+0:0:0:0                                                     dev_disk
+4:0:0:0                                                     dev_disk
+5:0:0:0                                                     dev_disk
+6:0:0:0                                                     dev_disk
+7:0:0:0                                                     dev_disk
+
+Now, to exclude the first SCSI disk and expose the last four SCSI disks as
+IB/SRP LUNs for I/O:
+echo "add 4:0:0:0 0" >/proc/scsi_tgt/groups/Default/devices
+echo "add 5:0:0:0 1" >/proc/scsi_tgt/groups/Default/devices
+echo "add 6:0:0:0 2" >/proc/scsi_tgt/groups/Default/devices
+echo "add 7:0:0:0 3" >/proc/scsi_tgt/groups/Default/devices
+
+Example 2: working with VDISK FILEIO mode (using md0 device and file 10G-file)
+a. modprobe scst
+b. modprobe scst_vdisk
+c. echo "open vdisk0 /dev/md0" > /proc/scsi_tgt/vdisk/vdisk
+d. echo "open vdisk1 /10G-file" > /proc/scsi_tgt/vdisk/vdisk
+e. echo "add vdisk0 0" >/proc/scsi_tgt/groups/Default/devices
+f. echo "add vdisk1 1" >/proc/scsi_tgt/groups/Default/devices
+
+Example 3: working with VDISK BLOCKIO mode (using md0 device, sda, and cciss/c1d0)
+a. modprobe scst
+b. modprobe scst_vdisk
+c. echo "open vdisk0 /dev/md0 BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
+d. echo "open vdisk1 /dev/sda BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
+e. echo "open vdisk2 /dev/cciss/c1d0 BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
+f. echo "add vdisk0 0" >/proc/scsi_tgt/groups/Default/devices
+g. echo "add vdisk1 1" >/proc/scsi_tgt/groups/Default/devices
+h. echo "add vdisk2 2" >/proc/scsi_tgt/groups/Default/devices
+
+2. modprobe ib_srpt
+
+
+B. On initiator machines you can manually do the following steps:
+1. modprobe ib_srp
+2. ibsrpdm -c (to discover new SRP targets)
+3. echo <new target info> > /sys/class/infiniband_srp/srp-mthca0-1/add_target
+4. fdisk -l (shows the newly discovered SCSI disks)
+
+Example:
+Assume that you use port 1 of the first HCA in the system, i.e. mthca0.
+
+[root@lab104 ~]# ibsrpdm -c -d /dev/infiniband/umad0
+id_ext=0002c90200226cf4,ioc_guid=0002c90200226cf4,
+dgid=fe800000000000000002c90200226cf5,pkey=ffff,service_id=0002c90200226cf4
+[root@lab104 ~]# echo id_ext=0002c90200226cf4,ioc_guid=0002c90200226cf4,
+dgid=fe800000000000000002c90200226cf5,pkey=ffff,service_id=0002c90200226cf4 >
+/sys/class/infiniband_srp/srp-mthca0-1/add_target
+
+OR
+
++ You can edit /etc/infiniband/openib.conf to load the SRP driver and SRP HA
+daemon automatically, i.e. set SRP_LOAD=yes and SRPHA_ENABLE=yes (see the
+example below)
++ To set up and use the high availability feature you need the dm-multipath
+driver and the multipath tool
++ Please refer to the OFED-1.x SRP user manual for more detailed instructions
+on how to enable/use the HA feature
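+
+Example /etc/infiniband/openib.conf settings (assuming OFED's init scripts
+read this file on your distribution):
+  SRP_LOAD=yes
+  SRPHA_ENABLE=yes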
+
+To minimize QUEUE_FULL conditions, you can apply the scst_increase_max_tgt_cmds
+patch from the SRPT package at http://sourceforge.net/project/showfiles.php?group_id=110471
+
+
+Performance notes
+-----------------
+
+In some cases, e.g. with SSD devices whose internal data-transfer threads
+consume 100% of a single CPU, it may be necessary for maximum IOPS to
+assign dedicated CPUs to those threads using the Linux CPU affinity
+facilities. No IRQ processing should be done on those CPUs; check that
+using /proc/interrupts. See the taskset command and
+Documentation/IRQ-affinity.txt in your kernel's source tree for how to
+assign CPU affinity to tasks and IRQs.
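+
+Example (the PID and IRQ number below are hypothetical): to pin a device's
+I/O thread to CPU 2 and to route IRQ 42 to CPU 0 only:
+  taskset -pc 2 <pid>
+  echo 1 > /proc/irq/42/smp_affinity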
+
+The reason is that incoming commands may be processed in SIRQ context on the
+same CPUs on which the SSD devices' data-transfer threads are running. As a
+result, those threads do not get all the available CPU time and perform
+worse.
+
+As an alternative to CPU affinity assignment, you can enable the SRP
+target's internal thread, which allows the Linux CPU scheduler to distribute
+the load better among the available CPUs. To enable the SRP target driver's
+internal thread, load the ib_srpt module with the parameter "thread=1", as
+shown below.
+
+
+Send questions about this driver to scst-devel@lists.sourceforge.net, CC:
+Vu Pham <vuhuong@mellanox.com> and Bart Van Assche <bart.vanassche@gmail.com>.
diff --git a/drivers/scst/Kconfig b/drivers/scst/Kconfig
index 0263931..08772ad 100644
--- a/drivers/scst/Kconfig
+++ b/drivers/scst/Kconfig
@@ -249,5 +249,6 @@ config SCST_MEASURE_LATENCY
 	  If unsure, say "N".
 
 source "drivers/scst/scst_local/Kconfig"
+source "drivers/scst/srpt/Kconfig"
 
 endmenu
diff --git a/drivers/scst/Makefile b/drivers/scst/Makefile
index ac87362..139df8b 100644
--- a/drivers/scst/Makefile
+++ b/drivers/scst/Makefile
@@ -8,4 +8,4 @@ scst-y        += scst_sysfs.o
 scst-y        += scst_mem.o
 scst-y        += scst_debug.o
 
-obj-$(CONFIG_SCST)   += scst.o dev_handlers/ scst_local/
+obj-$(CONFIG_SCST)   += scst.o dev_handlers/ scst_local/ srpt/
diff --git a/drivers/scst/srpt/Kconfig b/drivers/scst/srpt/Kconfig
new file mode 100644
index 0000000..58de814
--- /dev/null
+++ b/drivers/scst/srpt/Kconfig
@@ -0,0 +1,12 @@
+config SCST_SRPT
+	tristate "InfiniBand SCSI RDMA Protocol target support"
+	depends on INFINIBAND && SCST
+	---help---
+
+	  Support for the SCSI RDMA Protocol (SRP) target driver. SRP
+	  allows an initiator to access a block storage device on another
+	  host (the target) over a network that supports the RDMA protocol.
+	  Currently the RDMA protocol is supported by InfiniBand and by
+	  iWARP network hardware. More information about the SRP protocol
+	  can be found on the website of the INCITS T10 technical committee
+	  (http://www.t10.org/).
diff --git a/drivers/scst/srpt/Makefile b/drivers/scst/srpt/Makefile
new file mode 100644
index 0000000..39390dc
--- /dev/null
+++ b/drivers/scst/srpt/Makefile
@@ -0,0 +1 @@
+obj-$(CONFIG_SCST_SRPT)			+= ib_srpt.o
diff --git a/drivers/scst/srpt/ib_dm_mad.h b/drivers/scst/srpt/ib_dm_mad.h
new file mode 100644
index 0000000..fb1de1f
--- /dev/null
+++ b/drivers/scst/srpt/ib_dm_mad.h
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2006 - 2009 Mellanox Technology Inc.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+
+#ifndef IB_DM_MAD_H
+#define IB_DM_MAD_H
+
+#include <linux/types.h>
+
+#include <rdma/ib_mad.h>
+
+enum {
+	/*
+	 * See also section 13.4.7 Status Field, table 115 MAD Common Status
+	 * Field Bit Values and also section 16.3.1.1 Status Field in the
+	 * InfiniBand Architecture Specification.
+	 */
+	DM_MAD_STATUS_UNSUP_METHOD = 0x0008,
+	DM_MAD_STATUS_UNSUP_METHOD_ATTR = 0x000c,
+	DM_MAD_STATUS_INVALID_FIELD = 0x001c,
+	DM_MAD_STATUS_NO_IOC = 0x0100,
+
+	/*
+	 * See also the Device Management chapter, section 16.3.3 Attributes,
+	 * table 279 Device Management Attributes in the InfiniBand
+	 * Architecture Specification.
+	 */
+	DM_ATTR_CLASS_PORT_INFO = 0x01,
+	DM_ATTR_IOU_INFO = 0x10,
+	DM_ATTR_IOC_PROFILE = 0x11,
+	DM_ATTR_SVC_ENTRIES = 0x12
+};
+
+struct ib_dm_hdr {
+	u8 reserved[28];
+};
+
+/*
+ * Structure of management datagram sent by the SRP target implementation.
+ * Contains a management datagram header, reliable multi-packet transaction
+ * protocol (RMPP) header and ib_dm_hdr. Notes:
+ * - The SRP target implementation does not use RMPP or ib_dm_hdr when sending
+ *   management datagrams.
+ * - The header size must be exactly 64 bytes (IB_MGMT_DEVICE_HDR), since this
+ *   is the header size that is passed to ib_create_send_mad() in ib_srpt.c.
+ * - The maximum supported size for a management datagram when not using RMPP
+ *   is 256 bytes -- 64 bytes header and 192 (IB_MGMT_DEVICE_DATA) bytes data.
+ */
+struct ib_dm_mad {
+	struct ib_mad_hdr mad_hdr;
+	struct ib_rmpp_hdr rmpp_hdr;
+	struct ib_dm_hdr dm_hdr;
+	u8 data[IB_MGMT_DEVICE_DATA];
+};
+
+/*
+ * IOUnitInfo as defined in section 16.3.3.3 IOUnitInfo of the InfiniBand
+ * Architecture Specification.
+ */
+struct ib_dm_iou_info {
+	__be16 change_id;
+	u8 max_controllers;
+	u8 op_rom;
+	u8 controller_list[128];
+};
+
+/*
+ * IOControllerProfile as defined in section 16.3.3.4 IOControllerProfile of
+ * the InfiniBand Architecture Specification.
+ */
+struct ib_dm_ioc_profile {
+	__be64 guid;
+	__be32 vendor_id;
+	__be32 device_id;
+	__be16 device_version;
+	__be16 reserved1;
+	__be32 subsys_vendor_id;
+	__be32 subsys_device_id;
+	__be16 io_class;
+	__be16 io_subclass;
+	__be16 protocol;
+	__be16 protocol_version;
+	__be16 service_conn;
+	__be16 initiators_supported;
+	__be16 send_queue_depth;
+	u8 reserved2;
+	u8 rdma_read_depth;
+	__be32 send_size;
+	__be32 rdma_size;
+	u8 op_cap_mask;
+	u8 svc_cap_mask;
+	u8 num_svc_entries;
+	u8 reserved3[9];
+	u8 id_string[64];
+};
+
+struct ib_dm_svc_entry {
+	u8 name[40];
+	__be64 id;
+};
+
+/*
+ * See also section 16.3.3.5 ServiceEntries in the InfiniBand Architecture
+ * Specification. See also section B.7, table B.8 in the T10 SRP r16a document.
+ */
+struct ib_dm_svc_entries {
+	struct ib_dm_svc_entry service_entries[4];
+};
+
+#endif
diff --git a/drivers/scst/srpt/ib_srpt.c b/drivers/scst/srpt/ib_srpt.c
new file mode 100644
index 0000000..6d45cf4
--- /dev/null
+++ b/drivers/scst/srpt/ib_srpt.c
@@ -0,0 +1,3613 @@
+/*
+ * Copyright (c) 2006 - 2009 Mellanox Technology Inc.  All rights reserved.
+ * Copyright (C) 2008 Vladislav Bolkhovitin <vst@vlnb.net>
+ * Copyright (C) 2008 - 2010 Bart Van Assche <bart.vanassche@gmail.com>
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/err.h>
+#include <linux/ctype.h>
+#include <linux/kthread.h>
+#include <linux/string.h>
+#include <linux/delay.h>
+#include <asm/atomic.h>
+#include "ib_srpt.h"
+#define LOG_PREFIX "ib_srpt" /* Prefix for SCST tracing macros. */
+#include <scst/scst_debug.h>
+
+/* Name of this kernel module. */
+#define DRV_NAME		"ib_srpt"
+#define DRV_VERSION		"2.1.0-pre"
+#define DRV_RELDATE		"(not yet released)"
+#if defined(CONFIG_SCST_DEBUG) || defined(CONFIG_SCST_TRACING)
+/* Flags to be used in SCST debug tracing statements. */
+#define DEFAULT_SRPT_TRACE_FLAGS (TRACE_OUT_OF_MEM | TRACE_MINOR \
+				  | TRACE_MGMT | TRACE_SPECIAL)
+/* Name of the entry that will be created under /proc/scsi_tgt/ib_srpt. */
+#define SRPT_PROC_TRACE_LEVEL_NAME	"trace_level"
+#endif
+
+#define MELLANOX_SRPT_ID_STRING	"SCST SRP target"
+
+MODULE_AUTHOR("Vu Pham");
+MODULE_DESCRIPTION("InfiniBand SCSI RDMA Protocol target "
+		   "v" DRV_VERSION " (" DRV_RELDATE ")");
+MODULE_LICENSE("Dual BSD/GPL");
+
+/*
+ * Local data types.
+ */
+
+enum threading_mode {
+	MODE_ALL_IN_SIRQ             = 0,
+	MODE_IB_COMPLETION_IN_THREAD = 1,
+	MODE_IB_COMPLETION_IN_SIRQ   = 2,
+};
+
+/*
+ * Global Variables
+ */
+
+static u64 srpt_service_guid;
+/* Number of srpt_device structures. */
+static atomic_t srpt_device_count;
+#if defined(CONFIG_SCST_DEBUG) || defined(CONFIG_SCST_TRACING)
+static unsigned long trace_flag = DEFAULT_SRPT_TRACE_FLAGS;
+module_param(trace_flag, long, 0644);
+MODULE_PARM_DESC(trace_flag, "SCST trace flags.");
+#endif
+
+static int thread = 1;
+module_param(thread, int, 0444);
+MODULE_PARM_DESC(thread,
+		 "IB completion and SCSI command processing context. Defaults"
+		 " to one, i.e. process IB completions and SCSI commands in"
+		 " kernel thread context. 0 means soft IRQ whenever possible"
+		 " and 2 means process IB completions in soft IRQ context and"
+		 " SCSI commands in kernel thread context.");
+
+static unsigned srp_max_rdma_size = DEFAULT_MAX_RDMA_SIZE;
+module_param(srp_max_rdma_size, int, 0744);
+MODULE_PARM_DESC(srp_max_rdma_size,
+		 "Maximum size of SRP RDMA transfers for new connections.");
+
+static unsigned srp_max_req_size = DEFAULT_MAX_REQ_SIZE;
+module_param(srp_max_req_size, int, 0444);
+MODULE_PARM_DESC(srp_max_req_size,
+		 "Maximum size of SRP request messages in bytes.");
+
+static unsigned int srp_max_rsp_size = DEFAULT_MAX_RSP_SIZE;
+module_param(srp_max_rsp_size, int, 0444);
+MODULE_PARM_DESC(srp_max_rsp_size,
+		 "Maximum size of SRP response messages in bytes.");
+
+static int srpt_srq_size = DEFAULT_SRPT_SRQ_SIZE;
+module_param(srpt_srq_size, int, 0444);
+MODULE_PARM_DESC(srpt_srq_size,
+		 "Shared receive queue (SRQ) size.");
+
+static int srpt_sq_size = DEF_SRPT_SQ_SIZE;
+module_param(srpt_sq_size, int, 0444);
+MODULE_PARM_DESC(srpt_sq_size,
+		 "Per-channel send queue (SQ) size.");
+
+static bool use_port_guid_in_session_name;
+module_param(use_port_guid_in_session_name, bool, 0444);
+MODULE_PARM_DESC(use_port_guid_in_session_name,
+		 "Use target port ID in the SCST session name such that"
+		 " redundant paths between multiport systems can be masked.");
+
+static int srpt_get_u64_x(char *buffer, struct kernel_param *kp)
+{
+	return sprintf(buffer, "0x%016llx", *(u64 *)kp->arg);
+}
+module_param_call(srpt_service_guid, NULL, srpt_get_u64_x, &srpt_service_guid,
+		  0444);
+MODULE_PARM_DESC(srpt_service_guid,
+		 "Using this value for ioc_guid, id_ext, and cm_listen_id"
+		 " instead of using the node_guid of the first HCA.");
+
+static void srpt_add_one(struct ib_device *device);
+static void srpt_remove_one(struct ib_device *device);
+static void srpt_unregister_mad_agent(struct srpt_device *sdev);
+static void srpt_unmap_sg_to_ib_sge(struct srpt_rdma_ch *ch,
+				    struct srpt_send_ioctx *ioctx);
+static void srpt_release_channel(struct scst_session *scst_sess);
+
+static struct ib_client srpt_client = {
+	.name = DRV_NAME,
+	.add = srpt_add_one,
+	.remove = srpt_remove_one
+};
+
+/**
+ * srpt_test_and_set_channel_state() - Test and set the channel state.
+ *
+ * @ch: RDMA channel.
+ * @old: channel state to compare with.
+ * @new: state to change the channel state to if the current state matches the
+ *       argument 'old'.
+ *
+ * Returns the previous channel state.
+ */
+static enum rdma_ch_state
+srpt_test_and_set_channel_state(struct srpt_rdma_ch *ch,
+				enum rdma_ch_state old,
+				enum rdma_ch_state new)
+{
+	return atomic_cmpxchg(&ch->state, old, new);
+}
+
+/**
+ * srpt_event_handler() - Asynchronous IB event callback function.
+ *
+ * Callback function called by the InfiniBand core when an asynchronous IB
+ * event occurs. This callback may occur in interrupt context. See also
+ * section 11.5.2, Set Asynchronous Event Handler in the InfiniBand
+ * Architecture Specification.
+ */
+static void srpt_event_handler(struct ib_event_handler *handler,
+			       struct ib_event *event)
+{
+	struct srpt_device *sdev;
+	struct srpt_port *sport;
+
+	sdev = ib_get_client_data(event->device, &srpt_client);
+	if (!sdev || sdev->device != event->device)
+		return;
+
+	TRACE_DBG("ASYNC event= %d on device= %s",
+		  event->event, sdev->device->name);
+
+	switch (event->event) {
+	case IB_EVENT_PORT_ERR:
+		if (event->element.port_num <= sdev->device->phys_port_cnt) {
+			sport = &sdev->port[event->element.port_num - 1];
+			sport->lid = 0;
+			sport->sm_lid = 0;
+		}
+		break;
+	case IB_EVENT_PORT_ACTIVE:
+	case IB_EVENT_LID_CHANGE:
+	case IB_EVENT_PKEY_CHANGE:
+	case IB_EVENT_SM_CHANGE:
+	case IB_EVENT_CLIENT_REREGISTER:
+		/*
+		 * Refresh port data asynchronously. Note: it is safe to call
+		 * schedule_work() even if &sport->work is already on the
+		 * global workqueue because schedule_work() tests for the
+		 * work_pending() condition before adding &sport->work to the
+		 * global work queue.
+		 */
+		if (event->element.port_num <= sdev->device->phys_port_cnt) {
+			sport = &sdev->port[event->element.port_num - 1];
+			if (!sport->lid && !sport->sm_lid)
+				schedule_work(&sport->work);
+		}
+		break;
+	default:
+		PRINT_ERROR("received unrecognized IB event %d", event->event);
+		break;
+	}
+}
+
+/**
+ * srpt_srq_event() - SRQ event callback function.
+ */
+static void srpt_srq_event(struct ib_event *event, void *ctx)
+{
+	PRINT_INFO("SRQ event %d", event->event);
+}
+
+/**
+ * srpt_qp_event() - QP event callback function.
+ */
+static void srpt_qp_event(struct ib_event *event, struct srpt_rdma_ch *ch)
+{
+	TRACE_DBG("QP event %d on cm_id=%p sess_name=%s state=%d",
+		  event->event, ch->cm_id, ch->sess_name,
+		  atomic_read(&ch->state));
+
+	switch (event->event) {
+	case IB_EVENT_COMM_EST:
+		ib_cm_notify(ch->cm_id, event->event);
+		break;
+	case IB_EVENT_QP_LAST_WQE_REACHED:
+		if (srpt_test_and_set_channel_state(ch, RDMA_CHANNEL_LIVE,
+			RDMA_CHANNEL_DISCONNECTING) == RDMA_CHANNEL_LIVE) {
+			PRINT_INFO("disconnected session %s.", ch->sess_name);
+			ib_send_cm_dreq(ch->cm_id, NULL, 0);
+		}
+		break;
+	default:
+		PRINT_ERROR("received unrecognized IB QP event %d",
+			    event->event);
+		break;
+	}
+}
+
+/**
+ * srpt_set_ioc() - Helper function for initializing an IOUnitInfo structure.
+ *
+ * @slot: one-based slot number.
+ * @value: four-bit value.
+ *
+ * Copies the lowest four bits of value into element slot of the array of
+ * four-bit elements called c_list (controller list). The index slot is
+ * one-based.
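+ *
+ * Example: starting from an all-zero c_list, srpt_set_ioc(c_list, 1, 1)
+ * stores 1 in the upper nibble of c_list[0] (giving 0x10), and a subsequent
+ * srpt_set_ioc(c_list, 2, 1) stores 1 in the lower nibble (giving 0x11).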
+ */
+static void srpt_set_ioc(u8 *c_list, u32 slot, u8 value)
+{
+	u16 id;
+	u8 tmp;
+
+	id = (slot - 1) / 2;
+	if (slot & 0x1) {
+		tmp = c_list[id] & 0xf;
+		c_list[id] = (value << 4) | tmp;
+	} else {
+		tmp = c_list[id] & 0xf0;
+		c_list[id] = (value & 0xf) | tmp;
+	}
+}
+
+/**
+ * srpt_get_class_port_info() - Copy ClassPortInfo to a management datagram.
+ *
+ * See also section 16.3.3.1 ClassPortInfo in the InfiniBand Architecture
+ * Specification.
+ */
+static void srpt_get_class_port_info(struct ib_dm_mad *mad)
+{
+	struct ib_class_port_info *cif;
+
+	cif = (struct ib_class_port_info *)mad->data;
+	memset(cif, 0, sizeof *cif);
+	cif->base_version = 1;
+	cif->class_version = 1;
+	cif->resp_time_value = 20;
+
+	mad->mad_hdr.status = 0;
+}
+
+/**
+ * srpt_get_iou() - Write IOUnitInfo to a management datagram.
+ *
+ * See also section 16.3.3.3 IOUnitInfo in the InfiniBand Architecture
+ * Specification. See also section B.7, table B.6 in the SRP r16a document.
+ */
+static void srpt_get_iou(struct ib_dm_mad *mad)
+{
+	struct ib_dm_iou_info *ioui;
+	u8 slot;
+	int i;
+
+	ioui = (struct ib_dm_iou_info *)mad->data;
+	ioui->change_id = __constant_cpu_to_be16(1);
+	ioui->max_controllers = 16;
+
+	/* set present for slot 1 and empty for the rest */
+	srpt_set_ioc(ioui->controller_list, 1, 1);
+	for (i = 1, slot = 2; i < 16; i++, slot++)
+		srpt_set_ioc(ioui->controller_list, slot, 0);
+
+	mad->mad_hdr.status = 0;
+}
+
+/**
+ * srpt_get_ioc() - Write IOControllerProfile to a management datagram.
+ *
+ * See also section 16.3.3.4 IOControllerProfile in the InfiniBand
+ * Architecture Specification. See also section B.7, table B.7 in the SRP
+ * r16a document.
+ */
+static void srpt_get_ioc(struct srpt_device *sdev, u32 slot,
+			 struct ib_dm_mad *mad)
+{
+	struct ib_dm_ioc_profile *iocp;
+
+	iocp = (struct ib_dm_ioc_profile *)mad->data;
+
+	if (!slot || slot > 16) {
+		mad->mad_hdr.status
+			= __constant_cpu_to_be16(DM_MAD_STATUS_INVALID_FIELD);
+		return;
+	}
+
+	if (slot > 2) {
+		mad->mad_hdr.status
+			= __constant_cpu_to_be16(DM_MAD_STATUS_NO_IOC);
+		return;
+	}
+
+	memset(iocp, 0, sizeof *iocp);
+	strcpy(iocp->id_string, MELLANOX_SRPT_ID_STRING);
+	iocp->guid = cpu_to_be64(srpt_service_guid);
+	iocp->vendor_id = cpu_to_be32(sdev->dev_attr.vendor_id);
+	iocp->device_id = cpu_to_be32(sdev->dev_attr.vendor_part_id);
+	iocp->device_version = cpu_to_be16(sdev->dev_attr.hw_ver);
+	iocp->subsys_vendor_id = cpu_to_be32(sdev->dev_attr.vendor_id);
+	iocp->subsys_device_id = 0x0;
+	iocp->io_class = __constant_cpu_to_be16(SRP_REV16A_IB_IO_CLASS);
+	iocp->io_subclass = __constant_cpu_to_be16(SRP_IO_SUBCLASS);
+	iocp->protocol = __constant_cpu_to_be16(SRP_PROTOCOL);
+	iocp->protocol_version = __constant_cpu_to_be16(SRP_PROTOCOL_VERSION);
+	iocp->send_queue_depth = cpu_to_be16(sdev->srq_size);
+	iocp->rdma_read_depth = 4;
+	iocp->send_size = cpu_to_be32(srp_max_req_size);
+	iocp->rdma_size = cpu_to_be32(min(max(srp_max_rdma_size, 256U),
+					  1U << 24));
+	iocp->num_svc_entries = 1;
+	iocp->op_cap_mask = SRP_SEND_TO_IOC | SRP_SEND_FROM_IOC |
+		SRP_RDMA_READ_FROM_IOC | SRP_RDMA_WRITE_FROM_IOC;
+
+	mad->mad_hdr.status = 0;
+}
+
+/**
+ * srpt_get_svc_entries() - Write ServiceEntries to a management datagram.
+ *
+ * See also section 16.3.3.5 ServiceEntries in the InfiniBand Architecture
+ * Specification. See also section B.7, table B.8 in the SRP r16a document.
+ */
+static void srpt_get_svc_entries(u64 ioc_guid,
+				 u16 slot, u8 hi, u8 lo, struct ib_dm_mad *mad)
+{
+	struct ib_dm_svc_entries *svc_entries;
+
+	WARN_ON(!ioc_guid);
+
+	if (!slot || slot > 16) {
+		mad->mad_hdr.status
+			= __constant_cpu_to_be16(DM_MAD_STATUS_INVALID_FIELD);
+		return;
+	}
+
+	if (slot > 2 || lo > hi || hi > 1) {
+		mad->mad_hdr.status
+			= __constant_cpu_to_be16(DM_MAD_STATUS_NO_IOC);
+		return;
+	}
+
+	svc_entries = (struct ib_dm_svc_entries *)mad->data;
+	memset(svc_entries, 0, sizeof *svc_entries);
+	svc_entries->service_entries[0].id = cpu_to_be64(ioc_guid);
+	snprintf(svc_entries->service_entries[0].name,
+		 sizeof(svc_entries->service_entries[0].name),
+		 "%s%016llx",
+		 SRP_SERVICE_NAME_PREFIX,
+		 ioc_guid);
+
+	mad->mad_hdr.status = 0;
+}
+
+/**
+ * srpt_mgmt_method_get() - Process a received management datagram.
+ * @sp:      source port through which the MAD has been received.
+ * @rq_mad:  received MAD.
+ * @rsp_mad: response MAD.
+ */
+static void srpt_mgmt_method_get(struct srpt_port *sp, struct ib_mad *rq_mad,
+				 struct ib_dm_mad *rsp_mad)
+{
+	u16 attr_id;
+	u32 slot;
+	u8 hi, lo;
+
+	attr_id = be16_to_cpu(rq_mad->mad_hdr.attr_id);
+	switch (attr_id) {
+	case DM_ATTR_CLASS_PORT_INFO:
+		srpt_get_class_port_info(rsp_mad);
+		break;
+	case DM_ATTR_IOU_INFO:
+		srpt_get_iou(rsp_mad);
+		break;
+	case DM_ATTR_IOC_PROFILE:
+		slot = be32_to_cpu(rq_mad->mad_hdr.attr_mod);
+		srpt_get_ioc(sp->sdev, slot, rsp_mad);
+		break;
+	case DM_ATTR_SVC_ENTRIES:
+		slot = be32_to_cpu(rq_mad->mad_hdr.attr_mod);
+		hi = (u8) ((slot >> 8) & 0xff);
+		lo = (u8) (slot & 0xff);
+		slot = (u16) ((slot >> 16) & 0xffff);
+		srpt_get_svc_entries(srpt_service_guid,
+				     slot, hi, lo, rsp_mad);
+		break;
+	default:
+		rsp_mad->mad_hdr.status =
+		    __constant_cpu_to_be16(DM_MAD_STATUS_UNSUP_METHOD_ATTR);
+		break;
+	}
+}
+
+/**
+ * srpt_mad_send_handler() - Post MAD-send callback function.
+ */
+static void srpt_mad_send_handler(struct ib_mad_agent *mad_agent,
+				  struct ib_mad_send_wc *mad_wc)
+{
+	ib_destroy_ah(mad_wc->send_buf->ah);
+	ib_free_send_mad(mad_wc->send_buf);
+}
+
+/**
+ * srpt_mad_recv_handler() - MAD reception callback function.
+ */
+static void srpt_mad_recv_handler(struct ib_mad_agent *mad_agent,
+				  struct ib_mad_recv_wc *mad_wc)
+{
+	struct srpt_port *sport = (struct srpt_port *)mad_agent->context;
+	struct ib_ah *ah;
+	struct ib_mad_send_buf *rsp;
+	struct ib_dm_mad *dm_mad;
+
+	if (!mad_wc || !mad_wc->recv_buf.mad)
+		return;
+
+	ah = ib_create_ah_from_wc(mad_agent->qp->pd, mad_wc->wc,
+				  mad_wc->recv_buf.grh, mad_agent->port_num);
+	if (IS_ERR(ah))
+		goto err;
+
+	BUILD_BUG_ON(offsetof(struct ib_dm_mad, data) != IB_MGMT_DEVICE_HDR);
+
+	rsp = ib_create_send_mad(mad_agent, mad_wc->wc->src_qp,
+				 mad_wc->wc->pkey_index, 0,
+				 IB_MGMT_DEVICE_HDR, IB_MGMT_DEVICE_DATA,
+				 GFP_KERNEL);
+	if (IS_ERR(rsp))
+		goto err_rsp;
+
+	rsp->ah = ah;
+
+	dm_mad = rsp->mad;
+	memcpy(dm_mad, mad_wc->recv_buf.mad, sizeof *dm_mad);
+	dm_mad->mad_hdr.method = IB_MGMT_METHOD_GET_RESP;
+	dm_mad->mad_hdr.status = 0;
+
+	switch (mad_wc->recv_buf.mad->mad_hdr.method) {
+	case IB_MGMT_METHOD_GET:
+		srpt_mgmt_method_get(sport, mad_wc->recv_buf.mad, dm_mad);
+		break;
+	case IB_MGMT_METHOD_SET:
+		dm_mad->mad_hdr.status =
+		    __constant_cpu_to_be16(DM_MAD_STATUS_UNSUP_METHOD_ATTR);
+		break;
+	default:
+		dm_mad->mad_hdr.status =
+		    __constant_cpu_to_be16(DM_MAD_STATUS_UNSUP_METHOD);
+		break;
+	}
+
+	if (!ib_post_send_mad(rsp, NULL)) {
+		ib_free_recv_mad(mad_wc);
+		/* will destroy_ah & free_send_mad in send completion */
+		return;
+	}
+
+	ib_free_send_mad(rsp);
+
+err_rsp:
+	ib_destroy_ah(ah);
+err:
+	ib_free_recv_mad(mad_wc);
+}
+
+/**
+ * srpt_refresh_port() - Configure a HCA port.
+ *
+ * Enable InfiniBand management datagram processing, update the cached sm_lid,
+ * lid and gid values, and register a callback function for processing MADs
+ * on the specified port.
+ *
+ * Note: It is safe to call this function more than once for the same port.
+ */
+static int srpt_refresh_port(struct srpt_port *sport)
+{
+	struct ib_mad_reg_req reg_req;
+	struct ib_port_modify port_modify;
+	struct ib_port_attr port_attr;
+	int ret;
+
+	memset(&port_modify, 0, sizeof port_modify);
+	port_modify.set_port_cap_mask = IB_PORT_DEVICE_MGMT_SUP;
+	port_modify.clr_port_cap_mask = 0;
+
+	ret = ib_modify_port(sport->sdev->device, sport->port, 0, &port_modify);
+	if (ret)
+		goto err_mod_port;
+
+	ret = ib_query_port(sport->sdev->device, sport->port, &port_attr);
+	if (ret)
+		goto err_query_port;
+
+	sport->sm_lid = port_attr.sm_lid;
+	sport->lid = port_attr.lid;
+
+	ret = ib_query_gid(sport->sdev->device, sport->port, 0, &sport->gid);
+	if (ret)
+		goto err_query_port;
+
+	if (!sport->mad_agent) {
+		memset(&reg_req, 0, sizeof reg_req);
+		reg_req.mgmt_class = IB_MGMT_CLASS_DEVICE_MGMT;
+		reg_req.mgmt_class_version = IB_MGMT_BASE_VERSION;
+		set_bit(IB_MGMT_METHOD_GET, reg_req.method_mask);
+		set_bit(IB_MGMT_METHOD_SET, reg_req.method_mask);
+
+		sport->mad_agent = ib_register_mad_agent(sport->sdev->device,
+							 sport->port,
+							 IB_QPT_GSI,
+							 &reg_req, 0,
+							 srpt_mad_send_handler,
+							 srpt_mad_recv_handler,
+							 sport);
+		if (IS_ERR(sport->mad_agent)) {
+			ret = PTR_ERR(sport->mad_agent);
+			sport->mad_agent = NULL;
+			goto err_query_port;
+		}
+	}
+
+	return 0;
+
+err_query_port:
+
+	port_modify.set_port_cap_mask = 0;
+	port_modify.clr_port_cap_mask = IB_PORT_DEVICE_MGMT_SUP;
+	ib_modify_port(sport->sdev->device, sport->port, 0, &port_modify);
+
+err_mod_port:
+
+	return ret;
+}
+
+/**
+ * srpt_unregister_mad_agent() - Unregister MAD callback functions.
+ *
+ * Note: It is safe to call this function more than once for the same device.
+ */
+static void srpt_unregister_mad_agent(struct srpt_device *sdev)
+{
+	struct ib_port_modify port_modify = {
+		.clr_port_cap_mask = IB_PORT_DEVICE_MGMT_SUP,
+	};
+	struct srpt_port *sport;
+	int i;
+
+	for (i = 1; i <= sdev->device->phys_port_cnt; i++) {
+		sport = &sdev->port[i - 1];
+		WARN_ON(sport->port != i);
+		if (ib_modify_port(sdev->device, i, 0, &port_modify) < 0)
+			PRINT_ERROR("%s", "disabling MAD processing failed.");
+		if (sport->mad_agent) {
+			ib_unregister_mad_agent(sport->mad_agent);
+			sport->mad_agent = NULL;
+		}
+	}
+}
+
+/**
+ * srpt_alloc_ioctx() - Allocate an SRPT I/O context structure.
+ */
+static struct srpt_ioctx *srpt_alloc_ioctx(struct srpt_device *sdev,
+					   int ioctx_size, int dma_size,
+					   enum dma_data_direction dir)
+{
+	struct srpt_ioctx *ioctx;
+
+	ioctx = kmalloc(ioctx_size, GFP_KERNEL);
+	if (!ioctx)
+		goto err;
+
+	ioctx->buf = kmalloc(dma_size, GFP_KERNEL);
+	if (!ioctx->buf)
+		goto err_free_ioctx;
+
+	ioctx->dma = ib_dma_map_single(sdev->device, ioctx->buf, dma_size, dir);
+	if (ib_dma_mapping_error(sdev->device, ioctx->dma))
+		goto err_free_buf;
+
+	return ioctx;
+
+err_free_buf:
+	kfree(ioctx->buf);
+err_free_ioctx:
+	kfree(ioctx);
+err:
+	return NULL;
+}
+
+/**
+ * srpt_free_ioctx() - Free an SRPT I/O context structure.
+ */
+static void srpt_free_ioctx(struct srpt_device *sdev, struct srpt_ioctx *ioctx,
+			    int dma_size, enum dma_data_direction dir)
+{
+	if (!ioctx)
+		return;
+
+	ib_dma_unmap_single(sdev->device, ioctx->dma, dma_size, dir);
+	kfree(ioctx->buf);
+	kfree(ioctx);
+}
+
+/**
+ * srpt_alloc_ioctx_ring() - Allocate a ring of SRPT I/O context structures.
+ * @sdev:       Device to allocate the I/O context ring for.
+ * @ring_size:  Number of elements in the I/O context ring.
+ * @ioctx_size: I/O context size.
+ * @dma_size:   DMA buffer size.
+ * @dir:        DMA data direction.
+ */
+static struct srpt_ioctx **srpt_alloc_ioctx_ring(struct srpt_device *sdev,
+				int ring_size, int ioctx_size,
+				int dma_size, enum dma_data_direction dir)
+{
+	struct srpt_ioctx **ring;
+	int i;
+
+	WARN_ON(ioctx_size != sizeof(struct srpt_recv_ioctx)
+		&& ioctx_size != sizeof(struct srpt_send_ioctx));
+	WARN_ON(dma_size != srp_max_req_size && dma_size != srp_max_rsp_size);
+
+	ring = kmalloc(ring_size * sizeof(ring[0]), GFP_KERNEL);
+	if (!ring)
+		goto out;
+	for (i = 0; i < ring_size; ++i) {
+		ring[i] = srpt_alloc_ioctx(sdev, ioctx_size, dma_size, dir);
+		if (!ring[i])
+			goto err;
+		ring[i]->index = i;
+	}
+	goto out;
+
+err:
+	while (--i >= 0)
+		srpt_free_ioctx(sdev, ring[i], dma_size, dir);
+	kfree(ring);
+	ring = NULL;
+out:
+	return ring;
+}
+
+/**
+ * srpt_free_ioctx_ring() - Free the ring of SRPT I/O context structures.
+ */
+static void srpt_free_ioctx_ring(struct srpt_ioctx **ioctx_ring,
+				 struct srpt_device *sdev, int ring_size,
+				 int dma_size, enum dma_data_direction dir)
+{
+	int i;
+
+	WARN_ON(dma_size != srp_max_req_size && dma_size != srp_max_rsp_size);
+
+	for (i = 0; i < ring_size; ++i)
+		srpt_free_ioctx(sdev, ioctx_ring[i], dma_size, dir);
+	kfree(ioctx_ring);
+}
+
+/**
+ * srpt_get_cmd_state() - Get the state of a SCSI command.
+ */
+static enum srpt_command_state srpt_get_cmd_state(struct srpt_send_ioctx *ioctx)
+{
+	BUG_ON(!ioctx);
+
+	return atomic_read(&ioctx->state);
+}
+
+/**
+ * srpt_set_cmd_state() - Set the state of a SCSI command.
+ * @new: New state to be set.
+ *
+ * Does not modify the state of aborted commands. Returns the previous command
+ * state.
+ */
+static enum srpt_command_state srpt_set_cmd_state(struct srpt_send_ioctx *ioctx,
+						  enum srpt_command_state new)
+{
+	enum srpt_command_state previous;
+
+	BUG_ON(!ioctx);
+
+	do {
+		previous = atomic_read(&ioctx->state);
+	} while (previous != SRPT_STATE_DONE
+	       && atomic_cmpxchg(&ioctx->state, previous, new) != previous);
+
+	return previous;
+}
+
+/**
+ * srpt_test_and_set_cmd_state() - Test and set the state of a command.
+ * @old: State to compare against.
+ * @new: New state to be set if the current state matches 'old'.
+ *
+ * Returns the previous command state.
+ */
+static enum srpt_command_state
+srpt_test_and_set_cmd_state(struct srpt_send_ioctx *ioctx,
+			    enum srpt_command_state old,
+			    enum srpt_command_state new)
+{
+	WARN_ON(!ioctx);
+	WARN_ON(old == SRPT_STATE_DONE);
+	WARN_ON(new == SRPT_STATE_NEW);
+
+	return atomic_cmpxchg(&ioctx->state, old, new);
+}
+
+/**
+ * srpt_post_recv() - Post an IB receive request.
+ */
+static int srpt_post_recv(struct srpt_device *sdev,
+			  struct srpt_recv_ioctx *ioctx)
+{
+	struct ib_sge list;
+	struct ib_recv_wr wr, *bad_wr;
+
+	BUG_ON(!sdev);
+	wr.wr_id = encode_wr_id(IB_WC_RECV, ioctx->ioctx.index);
+
+	list.addr = ioctx->ioctx.dma;
+	list.length = srp_max_req_size;
+	list.lkey = sdev->mr->lkey;
+
+	wr.next = NULL;
+	wr.sg_list = &list;
+	wr.num_sge = 1;
+
+	return ib_post_srq_recv(sdev->srq, &wr, &bad_wr);
+}
+
+/**
+ * srpt_post_send() - Post an IB send request.
+ * @ch: RDMA channel to post the send request on.
+ * @ioctx: I/O context of the send request.
+ * @len: length of the request to be sent in bytes.
+ *
+ * Returns zero upon success and a non-zero value upon failure.
+ */
+static int srpt_post_send(struct srpt_rdma_ch *ch,
+			  struct srpt_send_ioctx *ioctx, int len)
+{
+	struct ib_sge list;
+	struct ib_send_wr wr, *bad_wr;
+	struct srpt_device *sdev = ch->sport->sdev;
+	int ret;
+
+	ret = -ENOMEM;
+	if (atomic_dec_return(&ch->sq_wr_avail) < 0) {
+		PRINT_WARNING("%s", "IB send queue full (needed 1)");
+		goto out;
+	}
+
+	ib_dma_sync_single_for_device(sdev->device, ioctx->ioctx.dma, len,
+				      DMA_TO_DEVICE);
+
+	list.addr = ioctx->ioctx.dma;
+	list.length = len;
+	list.lkey = sdev->mr->lkey;
+
+	wr.next = NULL;
+	wr.wr_id = encode_wr_id(IB_WC_SEND, ioctx->ioctx.index);
+	wr.sg_list = &list;
+	wr.num_sge = 1;
+	wr.opcode = IB_WR_SEND;
+	wr.send_flags = IB_SEND_SIGNALED;
+
+	ret = ib_post_send(ch->qp, &wr, &bad_wr);
+
+out:
+	if (ret < 0)
+		atomic_inc(&ch->sq_wr_avail);
+	return ret;
+}
+
+/**
+ * srpt_get_desc_tbl() - Parse the data descriptors of an SRP_CMD request.
+ * @ioctx: Pointer to the I/O context associated with the request.
+ * @srp_cmd: Pointer to the SRP_CMD request data.
+ * @dir: Pointer to the variable to which the transfer direction will be
+ *   written.
+ * @data_len: Pointer to the variable to which the total data length of all
+ *   descriptors in the SRP_CMD request will be written.
+ *
+ * This function initializes ioctx->n_rbuf and ioctx->rbufs.
+ *
+ * Returns -EINVAL when the SRP_CMD request contains inconsistent descriptors;
+ * -ENOMEM when memory allocation fails and zero upon success.
+ */
+static int srpt_get_desc_tbl(struct srpt_send_ioctx *ioctx,
+			     struct srp_cmd *srp_cmd,
+			     scst_data_direction *dir, u64 *data_len)
+{
+	struct srp_indirect_buf *idb;
+	struct srp_direct_buf *db;
+	unsigned add_cdb_offset;
+	int ret;
+
+	/*
+	 * The pointer computations below will only be compiled correctly
+	 * if srp_cmd::add_data is declared as s8*, u8*, s8[] or u8[], so check
+	 * whether srp_cmd::add_data has been declared as a byte pointer.
+	 */
+	BUILD_BUG_ON(!__same_type(srp_cmd->add_data[0], (s8)0)
+		     && !__same_type(srp_cmd->add_data[0], (u8)0));
+
+	BUG_ON(!dir);
+	BUG_ON(!data_len);
+
+	ret = 0;
+	*data_len = 0;
+
+	/*
+	 * The lower four bits of the buffer format field contain the DATA-IN
+	 * buffer descriptor format, and the highest four bits contain the
+	 * DATA-OUT buffer descriptor format.
+	 */
+	*dir = SCST_DATA_NONE;
+	if (srp_cmd->buf_fmt & 0xf)
+		/* DATA-IN: transfer data from target to initiator. */
+		*dir = SCST_DATA_READ;
+	else if (srp_cmd->buf_fmt >> 4)
+		/* DATA-OUT: transfer data from initiator to target. */
+		*dir = SCST_DATA_WRITE;
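+	/*
+	 * Example: buf_fmt == 0x01 denotes a direct DATA-IN descriptor and
+	 * buf_fmt == 0x20 an indirect DATA-OUT descriptor.
+	 */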
+
+	/*
+	 * According to the SRP spec, the lower two bits of the 'ADDITIONAL
+	 * CDB LENGTH' field are reserved and the size in bytes of this field
+	 * is four times the value specified in bits 2..7. Hence the "& ~3".
+	 */
+	add_cdb_offset = srp_cmd->add_cdb_len & ~3;
+	if (((srp_cmd->buf_fmt & 0xf) == SRP_DATA_DESC_DIRECT) ||
+	    ((srp_cmd->buf_fmt >> 4) == SRP_DATA_DESC_DIRECT)) {
+		ioctx->n_rbuf = 1;
+		ioctx->rbufs = &ioctx->single_rbuf;
+
+		db = (struct srp_direct_buf *)(srp_cmd->add_data
+					       + add_cdb_offset);
+		memcpy(ioctx->rbufs, db, sizeof *db);
+		*data_len = be32_to_cpu(db->len);
+	} else if (((srp_cmd->buf_fmt & 0xf) == SRP_DATA_DESC_INDIRECT) ||
+		   ((srp_cmd->buf_fmt >> 4) == SRP_DATA_DESC_INDIRECT)) {
+		idb = (struct srp_indirect_buf *)(srp_cmd->add_data
+						  + add_cdb_offset);
+
+		ioctx->n_rbuf = be32_to_cpu(idb->table_desc.len) / sizeof *db;
+
+		if (ioctx->n_rbuf >
+		    (srp_cmd->data_out_desc_cnt + srp_cmd->data_in_desc_cnt)) {
+			PRINT_ERROR("received unsupported SRP_CMD request type"
+				    " (%u out + %u in != %u / %zu)",
+				    srp_cmd->data_out_desc_cnt,
+				    srp_cmd->data_in_desc_cnt,
+				    be32_to_cpu(idb->table_desc.len),
+				    sizeof(*db));
+			ioctx->n_rbuf = 0;
+			ret = -EINVAL;
+			goto out;
+		}
+
+		if (ioctx->n_rbuf == 1)
+			ioctx->rbufs = &ioctx->single_rbuf;
+		else {
+			ioctx->rbufs =
+				kmalloc(ioctx->n_rbuf * sizeof *db, GFP_ATOMIC);
+			if (!ioctx->rbufs) {
+				ioctx->n_rbuf = 0;
+				ret = -ENOMEM;
+				goto out;
+			}
+		}
+
+		db = idb->desc_list;
+		memcpy(ioctx->rbufs, db, ioctx->n_rbuf * sizeof *db);
+		*data_len = be32_to_cpu(idb->len);
+	}
+out:
+	return ret;
+}
+
+/**
+ * srpt_init_ch_qp() - Initialize queue pair attributes.
+ *
+ * Initializes the attributes of queue pair 'qp' by allowing local write,
+ * remote read and remote write. Also transitions 'qp' to state IB_QPS_INIT.
+ */
+static int srpt_init_ch_qp(struct srpt_rdma_ch *ch, struct ib_qp *qp)
+{
+	struct ib_qp_attr *attr;
+	int ret;
+
+	attr = kzalloc(sizeof *attr, GFP_KERNEL);
+	if (!attr)
+		return -ENOMEM;
+
+	attr->qp_state = IB_QPS_INIT;
+	attr->qp_access_flags = IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_READ |
+	    IB_ACCESS_REMOTE_WRITE;
+	attr->port_num = ch->sport->port;
+	attr->pkey_index = 0;
+
+	ret = ib_modify_qp(qp, attr,
+			   IB_QP_STATE | IB_QP_ACCESS_FLAGS | IB_QP_PORT |
+			   IB_QP_PKEY_INDEX);
+
+	kfree(attr);
+	return ret;
+}
+
+/**
+ * srpt_ch_qp_rtr() - Change the state of a channel to 'ready to receive' (RTR).
+ * @ch: channel of the queue pair.
+ * @qp: queue pair to change the state of.
+ *
+ * Returns zero upon success and a negative value upon failure.
+ *
+ * Note: currently a struct ib_qp_attr takes 136 bytes on a 64-bit system.
+ * If this structure ever becomes larger, it might be necessary to allocate
+ * it dynamically instead of on the stack.
+ */
+static int srpt_ch_qp_rtr(struct srpt_rdma_ch *ch, struct ib_qp *qp)
+{
+	struct ib_qp_attr qp_attr;
+	int attr_mask;
+	int ret;
+
+	qp_attr.qp_state = IB_QPS_RTR;
+	ret = ib_cm_init_qp_attr(ch->cm_id, &qp_attr, &attr_mask);
+	if (ret)
+		goto out;
+
+	qp_attr.max_dest_rd_atomic = 4;
+
+	ret = ib_modify_qp(qp, &qp_attr, attr_mask);
+
+out:
+	return ret;
+}
+
+/**
+ * srpt_ch_qp_rts() - Change the state of a channel to 'ready to send' (RTS).
+ * @ch: channel of the queue pair.
+ * @qp: queue pair to change the state of.
+ *
+ * Returns zero upon success and a negative value upon failure.
+ *
+ * Note: currently a struct ib_qp_attr takes 136 bytes on a 64-bit system.
+ * If this structure ever becomes larger, it might be necessary to allocate
+ * it dynamically instead of on the stack.
+ */
+static int srpt_ch_qp_rts(struct srpt_rdma_ch *ch, struct ib_qp *qp)
+{
+	struct ib_qp_attr qp_attr;
+	int attr_mask;
+	int ret;
+
+	qp_attr.qp_state = IB_QPS_RTS;
+	ret = ib_cm_init_qp_attr(ch->cm_id, &qp_attr, &attr_mask);
+	if (ret)
+		goto out;
+
+	qp_attr.max_rd_atomic = 4;
+
+	ret = ib_modify_qp(qp, &qp_attr, attr_mask);
+
+out:
+	return ret;
+}
+
+/**
+ * srpt_get_send_ioctx() - Obtain an I/O context for sending to the initiator.
+ */
+static struct srpt_send_ioctx *srpt_get_send_ioctx(struct srpt_rdma_ch *ch)
+{
+	struct srpt_send_ioctx *ioctx;
+	unsigned long flags;
+
+	BUG_ON(!ch);
+
+	ioctx = NULL;
+	spin_lock_irqsave(&ch->spinlock, flags);
+	if (!list_empty(&ch->free_list)) {
+		ioctx = list_first_entry(&ch->free_list,
+					 struct srpt_send_ioctx, free_list);
+		list_del(&ioctx->free_list);
+	}
+	spin_unlock_irqrestore(&ch->spinlock, flags);
+
+	if (!ioctx)
+		return ioctx;
+
+	BUG_ON(ioctx->ch != ch);
+	atomic_set(&ioctx->state, SRPT_STATE_NEW);
+	ioctx->n_rbuf = 0;
+	ioctx->rbufs = NULL;
+	ioctx->n_rdma = 0;
+	ioctx->n_rdma_ius = 0;
+	ioctx->rdma_ius = NULL;
+	ioctx->mapped_sg_count = 0;
+	ioctx->scmnd = NULL;
+
+	return ioctx;
+}
+
+/**
+ * srpt_put_send_ioctx() - Free up resources.
+ */
+static void srpt_put_send_ioctx(struct srpt_send_ioctx *ioctx)
+{
+	struct srpt_rdma_ch *ch;
+	unsigned long flags;
+
+	BUG_ON(!ioctx);
+	ch = ioctx->ch;
+	BUG_ON(!ch);
+
+	WARN_ON(srpt_get_cmd_state(ioctx) != SRPT_STATE_DONE);
+
+	ioctx->scmnd = NULL;
+
+	/*
+	 * If the WARN_ON() below gets triggered this means that
+	 * srpt_unmap_sg_to_ib_sge() has not been called before
+	 * scst_tgt_cmd_done().
+	 */
+	WARN_ON(ioctx->mapped_sg_count);
+
+	if (ioctx->n_rbuf > 1) {
+		kfree(ioctx->rbufs);
+		ioctx->rbufs = NULL;
+		ioctx->n_rbuf = 0;
+	}
+
+	spin_lock_irqsave(&ch->spinlock, flags);
+	list_add(&ioctx->free_list, &ch->free_list);
+	spin_unlock_irqrestore(&ch->spinlock, flags);
+}
+
+/**
+ * srpt_abort_scst_cmd() - Abort a SCSI command.
+ * @ioctx:   I/O context associated with the SCSI command.
+ * @context: Preferred execution context.
+ */
+static void srpt_abort_scst_cmd(struct srpt_send_ioctx *ioctx,
+				enum scst_exec_context context)
+{
+	struct scst_cmd *scmnd;
+	enum srpt_command_state state;
+
+	BUG_ON(!ioctx);
+
+	/*
+	 * If the command is in a state where the SCST core is waiting for the
+	 * ib_srpt driver, change the state to the next state. Changing the
+	 * state of the command from SRPT_STATE_NEED_DATA to SRPT_STATE_DATA_IN
+	 * ensures that srpt_xmit_response() will call this function a second
+	 * time.
+	 */
+	state = srpt_test_and_set_cmd_state(ioctx, SRPT_STATE_NEED_DATA,
+					    SRPT_STATE_DATA_IN);
+	if (state != SRPT_STATE_NEED_DATA) {
+		state = srpt_test_and_set_cmd_state(ioctx, SRPT_STATE_DATA_IN,
+						    SRPT_STATE_DONE);
+		if (state != SRPT_STATE_DATA_IN) {
+			state = srpt_test_and_set_cmd_state(ioctx,
+				    SRPT_STATE_CMD_RSP_SENT, SRPT_STATE_DONE);
+		}
+	}
+	if (state == SRPT_STATE_DONE)
+		goto out;
+
+	scmnd = ioctx->scmnd;
+	WARN_ON(!scmnd);
+	if (!scmnd)
+		goto out;
+
+	WARN_ON(ioctx != scst_cmd_get_tgt_priv(scmnd));
+
+	TRACE_DBG("Aborting cmd with state %d and tag %lld",
+		  state, scst_cmd_get_tag(scmnd));
+
+	switch (state) {
+	case SRPT_STATE_NEW:
+	case SRPT_STATE_DATA_IN:
+		/*
+		 * Do nothing - defer abort processing until
+		 * srpt_xmit_response() is invoked.
+		 */
+		WARN_ON(!scst_cmd_aborted(scmnd));
+		break;
+	case SRPT_STATE_NEED_DATA:
+		/* SCST_DATA_WRITE - RDMA read error or RDMA read timeout. */
+		scst_rx_data(ioctx->scmnd, SCST_RX_STATUS_ERROR, context);
+		break;
+	case SRPT_STATE_CMD_RSP_SENT:
+		/*
+		 * SRP_RSP sending failed or the SRP_RSP send completion has
+		 * not been received in time.
+		 */
+		srpt_unmap_sg_to_ib_sge(ioctx->ch, ioctx);
+		srpt_put_send_ioctx(ioctx);
+		scst_set_delivery_status(scmnd, SCST_CMD_DELIVERY_ABORTED);
+		scst_tgt_cmd_done(scmnd, context);
+		break;
+	case SRPT_STATE_MGMT_RSP_SENT:
+		/*
+		 * Management command response sending failed. This state is
+		 * never reached since there is no scmnd associated with
+		 * management commands. Note: the SCST core frees these
+		 * commands immediately after srpt_tsk_mgmt_done() returned.
+		 */
+		WARN_ON("ERROR: unexpected command state");
+		break;
+	default:
+		WARN_ON("ERROR: unexpected command state");
+		break;
+	}
+
+out:
+	;
+}
+
+/**
+ * srpt_handle_send_err_comp() - Process an IB_WC_SEND error completion.
+ */
+static void srpt_handle_send_err_comp(struct srpt_rdma_ch *ch, u64 wr_id,
+				      enum scst_exec_context context)
+{
+	struct srpt_send_ioctx *ioctx;
+	enum srpt_command_state state;
+	struct scst_cmd *scmnd;
+	u32 index;
+
+	atomic_inc(&ch->sq_wr_avail);
+
+	index = idx_from_wr_id(wr_id);
+	ioctx = ch->ioctx_ring[index];
+	state = srpt_get_cmd_state(ioctx);
+	scmnd = ioctx->scmnd;
+
+	EXTRACHECKS_WARN_ON(state != SRPT_STATE_CMD_RSP_SENT
+			    && state != SRPT_STATE_MGMT_RSP_SENT
+			    && state != SRPT_STATE_NEED_DATA
+			    && state != SRPT_STATE_DONE);
+
+	/* If SRP_RSP sending failed, undo the ch->req_lim change. */
+	if (state == SRPT_STATE_CMD_RSP_SENT
+	    || state == SRPT_STATE_MGMT_RSP_SENT)
+		atomic_dec(&ch->req_lim);
+	if (state != SRPT_STATE_DONE) {
+		if (scmnd)
+			srpt_abort_scst_cmd(ioctx, context);
+		else {
+			srpt_set_cmd_state(ioctx, SRPT_STATE_DONE);
+			srpt_put_send_ioctx(ioctx);
+		}
+	} else
+		PRINT_ERROR("Received more than one IB error completion"
+			    " for wr_id = %u.", (unsigned)index);
+}
+
+/**
+ * srpt_handle_send_comp() - Process an IB send completion notification.
+ */
+static void srpt_handle_send_comp(struct srpt_rdma_ch *ch,
+				  struct srpt_send_ioctx *ioctx,
+				  enum scst_exec_context context)
+{
+	enum srpt_command_state state;
+
+	atomic_inc(&ch->sq_wr_avail);
+
+	state = srpt_set_cmd_state(ioctx, SRPT_STATE_DONE);
+
+	EXTRACHECKS_WARN_ON(state != SRPT_STATE_CMD_RSP_SENT
+			    && state != SRPT_STATE_MGMT_RSP_SENT
+			    && state != SRPT_STATE_DONE);
+
+	if (state != SRPT_STATE_DONE) {
+		struct scst_cmd *scmnd;
+
+		scmnd = ioctx->scmnd;
+		EXTRACHECKS_WARN_ON((state == SRPT_STATE_MGMT_RSP_SENT)
+				    != (scmnd == NULL));
+		if (scmnd) {
+			srpt_unmap_sg_to_ib_sge(ch, ioctx);
+			srpt_put_send_ioctx(ioctx);
+			scst_tgt_cmd_done(scmnd, context);
+		} else
+			srpt_put_send_ioctx(ioctx);
+	} else {
+		PRINT_ERROR("IB completion has been received too late for"
+			    " wr_id = %u.", ioctx->ioctx.index);
+	}
+}
+
+/**
+ * srpt_handle_rdma_comp() - Process an IB RDMA completion notification.
+ */
+static void srpt_handle_rdma_comp(struct srpt_rdma_ch *ch,
+				  struct srpt_send_ioctx *ioctx,
+				  enum scst_exec_context context)
+{
+	enum srpt_command_state state;
+	struct scst_cmd *scmnd;
+
+	EXTRACHECKS_WARN_ON(ioctx->n_rdma <= 0);
+	atomic_add(ioctx->n_rdma, &ch->sq_wr_avail);
+
+	scmnd = ioctx->scmnd;
+	if (scmnd) {
+		state = srpt_test_and_set_cmd_state(ioctx, SRPT_STATE_NEED_DATA,
+						    SRPT_STATE_DATA_IN);
+		if (state == SRPT_STATE_NEED_DATA)
+			scst_rx_data(ioctx->scmnd, SCST_RX_STATUS_SUCCESS,
+				     context);
+		else
+			PRINT_ERROR("%s[%d]: wrong state = %d", __func__,
+				    __LINE__, state);
+	} else
+		PRINT_ERROR("%s[%d]: scmnd == NULL", __func__, __LINE__);
+}
+
+/**
+ * srpt_handle_rdma_err_comp() - Process an IB RDMA error completion.
+ */
+static void srpt_handle_rdma_err_comp(struct srpt_rdma_ch *ch,
+				      struct srpt_send_ioctx *ioctx,
+				      u8 opcode,
+				      enum scst_exec_context context)
+{
+	struct scst_cmd *scmnd;
+	enum srpt_command_state state;
+
+	scmnd = ioctx->scmnd;
+	state = srpt_get_cmd_state(ioctx);
+	if (scmnd) {
+		switch (opcode) {
+		case IB_WC_RDMA_READ:
+			if (ioctx->n_rdma <= 0) {
+				PRINT_ERROR("Received invalid RDMA read error"
+					    " completion with idx %d",
+					    ioctx->ioctx.index);
+				break;
+			}
+			atomic_add(ioctx->n_rdma, &ch->sq_wr_avail);
+			if (state == SRPT_STATE_NEED_DATA)
+				srpt_abort_scst_cmd(ioctx, context);
+			else
+				PRINT_ERROR("%s[%d]: wrong state = %d",
+					    __func__, __LINE__, state);
+			break;
+		case IB_WC_RDMA_WRITE:
+			scst_set_delivery_status(scmnd,
+						 SCST_CMD_DELIVERY_ABORTED);
+			break;
+		default:
+			PRINT_ERROR("%s[%d]: opcode = %u", __func__, __LINE__,
+				    opcode);
+			break;
+		}
+	} else
+		PRINT_ERROR("%s[%d]: scmnd == NULL", __func__, __LINE__);
+}
+
+/**
+ * srpt_build_cmd_rsp() - Build an SRP_RSP response.
+ * @ch: RDMA channel through which the request has been received.
+ * @ioctx: I/O context associated with the SRP_CMD request. The response will
+ *   be built in the buffer ioctx->buf points at and hence this function will
+ *   overwrite the request data.
+ * @tag: tag of the request for which this response is being generated.
+ * @status: value for the STATUS field of the SRP_RSP information unit.
+ * @sense_data: pointer to sense data to be included in the response.
+ * @sense_data_len: length in bytes of the sense data.
+ *
+ * Returns the size in bytes of the SRP_RSP response.
+ *
+ * An SRP_RSP response contains a SCSI status or service response. See also
+ * section 6.9 in the SRP r16a document for the format of an SRP_RSP
+ * response. See also SPC-2 for more information about sense data.
+ */
+static int srpt_build_cmd_rsp(struct srpt_rdma_ch *ch,
+			      struct srpt_send_ioctx *ioctx, u64 tag,
+			      int status, const u8 *sense_data,
+			      int sense_data_len)
+{
+	struct srp_rsp *srp_rsp;
+	int max_sense_len;
+
+	/*
+	 * The lowest bit of all SAM-3 status codes is zero (see also
+	 * paragraph 5.3 in SAM-3).
+	 */
+	EXTRACHECKS_WARN_ON(status & 1);
+
+	srp_rsp = ioctx->ioctx.buf;
+	BUG_ON(!srp_rsp);
+	memset(srp_rsp, 0, sizeof *srp_rsp);
+
+	srp_rsp->opcode = SRP_RSP;
+	srp_rsp->req_lim_delta = __constant_cpu_to_be32(1
+				    + atomic_xchg(&ch->req_lim_delta, 0));
+	srp_rsp->tag = tag;
+	srp_rsp->status = status;
+
+	if (!SCST_SENSE_VALID(sense_data))
+		sense_data_len = 0;
+	else {
+		BUILD_BUG_ON(MIN_MAX_RSP_SIZE <= sizeof(*srp_rsp));
+		max_sense_len = ch->max_ti_iu_len - sizeof(*srp_rsp);
+		if (sense_data_len > max_sense_len) {
+			PRINT_WARNING("truncated sense data from %d to %d"
+				" bytes", sense_data_len, max_sense_len);
+			sense_data_len = max_sense_len;
+		}
+
+		srp_rsp->flags |= SRP_RSP_FLAG_SNSVALID;
+		srp_rsp->sense_data_len = cpu_to_be32(sense_data_len);
+		memcpy(srp_rsp + 1, sense_data, sense_data_len);
+	}
+
+	return sizeof(*srp_rsp) + sense_data_len;
+}
+
+/**
+ * srpt_build_tskmgmt_rsp() - Build a task management response.
+ * @ch:       RDMA channel through which the request has been received.
+ * @ioctx:    I/O context in which the SRP_RSP response will be built.
+ * @rsp_code: RSP_CODE that will be stored in the response.
+ * @tag:      Tag of the request for which this response is being generated.
+ *
+ * Returns the size in bytes of the SRP_RSP response.
+ *
+ * An SRP_RSP response contains a SCSI status or service response. See also
+ * section 6.9 in the SRP r16a document for the format of an SRP_RSP
+ * response.
+ */
+static int srpt_build_tskmgmt_rsp(struct srpt_rdma_ch *ch,
+				  struct srpt_send_ioctx *ioctx,
+				  u8 rsp_code, u64 tag)
+{
+	struct srp_rsp *srp_rsp;
+	int resp_data_len;
+	int resp_len;
+
+	resp_data_len = (rsp_code == SRP_TSK_MGMT_SUCCESS) ? 0 : 4;
+	resp_len = sizeof(*srp_rsp) + resp_data_len;
+
+	srp_rsp = ioctx->ioctx.buf;
+	BUG_ON(!srp_rsp);
+	memset(srp_rsp, 0, sizeof *srp_rsp);
+
+	srp_rsp->opcode = SRP_RSP;
+	srp_rsp->req_lim_delta = __constant_cpu_to_be32(1
+				    + atomic_xchg(&ch->req_lim_delta, 0));
+	srp_rsp->tag = tag;
+
+	if (rsp_code != SRP_TSK_MGMT_SUCCESS) {
+		srp_rsp->flags |= SRP_RSP_FLAG_RSPVALID;
+		srp_rsp->resp_data_len = cpu_to_be32(resp_data_len);
+		srp_rsp->data[3] = rsp_code;
+	}
+
+	return resp_len;
+}
+
+/**
+ * srpt_handle_cmd() - Process SRP_CMD.
+ */
+static int srpt_handle_cmd(struct srpt_rdma_ch *ch,
+			   struct srpt_recv_ioctx *recv_ioctx,
+			   struct srpt_send_ioctx *send_ioctx,
+			   enum scst_exec_context context)
+{
+	struct scst_cmd *scmnd;
+	struct srp_cmd *srp_cmd;
+	scst_data_direction dir;
+	u64 data_len;
+	int ret;
+	int atomic;
+
+	BUG_ON(!send_ioctx);
+
+	srp_cmd = recv_ioctx->ioctx.buf;
+
+	atomic = context == SCST_CONTEXT_TASKLET ? SCST_ATOMIC
+		 : SCST_NON_ATOMIC;
+	scmnd = scst_rx_cmd(ch->scst_sess, (u8 *) &srp_cmd->lun,
+			    sizeof srp_cmd->lun, srp_cmd->cdb,
+			    sizeof srp_cmd->cdb, atomic);
+	if (!scmnd) {
+		PRINT_ERROR("0x%llx: allocation of an SCST command failed",
+			    srp_cmd->tag);
+		goto err;
+	}
+
+	send_ioctx->scmnd = scmnd;
+
+	ret = srpt_get_desc_tbl(send_ioctx, srp_cmd, &dir, &data_len);
+	if (ret) {
+		PRINT_ERROR("0x%llx: parsing SRP descriptor table failed.",
+			    srp_cmd->tag);
+		scst_set_cmd_error(scmnd,
+			SCST_LOAD_SENSE(scst_sense_invalid_field_in_cdb));
+	}
+
+	switch (srp_cmd->task_attr) {
+	case SRP_CMD_HEAD_OF_Q:
+		scst_cmd_set_queue_type(scmnd, SCST_CMD_QUEUE_HEAD_OF_QUEUE);
+		break;
+	case SRP_CMD_ORDERED_Q:
+		scst_cmd_set_queue_type(scmnd, SCST_CMD_QUEUE_ORDERED);
+		break;
+	case SRP_CMD_SIMPLE_Q:
+		scst_cmd_set_queue_type(scmnd, SCST_CMD_QUEUE_SIMPLE);
+		break;
+	case SRP_CMD_ACA:
+		scst_cmd_set_queue_type(scmnd, SCST_CMD_QUEUE_ACA);
+		break;
+	default:
+		scst_cmd_set_queue_type(scmnd, SCST_CMD_QUEUE_ORDERED);
+		break;
+	}
+
+	scst_cmd_set_tag(scmnd, srp_cmd->tag);
+	scst_cmd_set_tgt_priv(scmnd, send_ioctx);
+	scst_cmd_set_expected(scmnd, dir, data_len);
+	scst_cmd_init_done(scmnd, context);
+
+	return 0;
+
+err:
+	srpt_put_send_ioctx(send_ioctx);
+	return -1;
+}
+
+/**
+ * srpt_handle_tsk_mgmt() - Process an SRP_TSK_MGMT information unit.
+ *
+ * Returns SCST_MGMT_STATUS_SUCCESS upon success.
+ *
+ * Each task management function is performed by calling one of the
+ * scst_rx_mgmt_fn*() functions. These functions will either report failure
+ * or process the task management function asynchronously. The function
+ * srpt_tsk_mgmt_done() will be called by the SCST core upon completion of the
+ * task management function. When srpt_handle_tsk_mgmt() reports failure
+ * (i.e. returns a value other than SCST_MGMT_STATUS_SUCCESS) a response
+ * will have been built in ioctx->buf. This
+ * information unit has to be sent back by the caller.
+ *
+ * For more information about SRP_TSK_MGMT information units, see also section
+ * 6.7 in the SRP r16a document.
+ */
+static u8 srpt_handle_tsk_mgmt(struct srpt_rdma_ch *ch,
+			       struct srpt_recv_ioctx *recv_ioctx,
+			       struct srpt_send_ioctx *send_ioctx)
+{
+	struct srp_tsk_mgmt *srp_tsk;
+	struct srpt_mgmt_ioctx *mgmt_ioctx;
+	int ret;
+
+	ret = SCST_MGMT_STATUS_FAILED;
+
+	BUG_ON(!send_ioctx);
+
+	srp_tsk = recv_ioctx->ioctx.buf;
+
+	TRACE_DBG("recv_tsk_mgmt= %d for task_tag= %lld"
+		  " using tag= %lld cm_id= %p sess= %p",
+		  srp_tsk->tsk_mgmt_func, srp_tsk->task_tag, srp_tsk->tag,
+		  ch->cm_id, ch->scst_sess);
+
+	mgmt_ioctx = kmalloc(sizeof *mgmt_ioctx, GFP_ATOMIC);
+	if (!mgmt_ioctx) {
+		PRINT_ERROR("tag 0x%llx: memory allocation for task management"
+			    " function failed. Ignoring task management request"
+			    " (func %d).", srp_tsk->task_tag,
+			    srp_tsk->tsk_mgmt_func);
+		goto err;
+	}
+
+	mgmt_ioctx->ioctx = send_ioctx;
+	BUG_ON(mgmt_ioctx->ioctx->ch != ch);
+	mgmt_ioctx->tag = srp_tsk->tag;
+
+	switch (srp_tsk->tsk_mgmt_func) {
+	case SRP_TSK_ABORT_TASK:
+		TRACE_DBG("%s", "Processing SRP_TSK_ABORT_TASK");
+		ret = scst_rx_mgmt_fn_tag(ch->scst_sess,
+					  SCST_ABORT_TASK,
+					  srp_tsk->task_tag,
+					  SCST_ATOMIC, mgmt_ioctx);
+		break;
+	case SRP_TSK_ABORT_TASK_SET:
+		TRACE_DBG("%s", "Processing SRP_TSK_ABORT_TASK_SET");
+		ret = scst_rx_mgmt_fn_lun(ch->scst_sess,
+					  SCST_ABORT_TASK_SET,
+					  (u8 *) &srp_tsk->lun,
+					  sizeof srp_tsk->lun,
+					  SCST_ATOMIC, mgmt_ioctx);
+		break;
+	case SRP_TSK_CLEAR_TASK_SET:
+		TRACE_DBG("%s", "Processing SRP_TSK_CLEAR_TASK_SET");
+		ret = scst_rx_mgmt_fn_lun(ch->scst_sess,
+					  SCST_CLEAR_TASK_SET,
+					  (u8 *) &srp_tsk->lun,
+					  sizeof srp_tsk->lun,
+					  SCST_ATOMIC, mgmt_ioctx);
+		break;
+	case SRP_TSK_LUN_RESET:
+		TRACE_DBG("%s", "Processing SRP_TSK_LUN_RESET");
+		ret = scst_rx_mgmt_fn_lun(ch->scst_sess,
+					  SCST_LUN_RESET,
+					  (u8 *) &srp_tsk->lun,
+					  sizeof srp_tsk->lun,
+					  SCST_ATOMIC, mgmt_ioctx);
+		break;
+	case SRP_TSK_CLEAR_ACA:
+		TRACE_DBG("%s", "Processing SRP_TSK_CLEAR_ACA");
+		ret = scst_rx_mgmt_fn_lun(ch->scst_sess,
+					  SCST_CLEAR_ACA,
+					  (u8 *) &srp_tsk->lun,
+					  sizeof srp_tsk->lun,
+					  SCST_ATOMIC, mgmt_ioctx);
+		break;
+	default:
+		TRACE_DBG("%s", "Unsupported task management function.");
+		ret = SCST_MGMT_STATUS_FN_NOT_SUPPORTED;
+	}
+
+	if (ret != SCST_MGMT_STATUS_SUCCESS)
+		goto err;
+	return ret;
+
+err:
+	kfree(mgmt_ioctx);
+	return ret;
+}
+
+static u8 scst_to_srp_tsk_mgmt_status(const int scst_mgmt_status)
+{
+	switch (scst_mgmt_status) {
+	case SCST_MGMT_STATUS_SUCCESS:
+		return SRP_TSK_MGMT_SUCCESS;
+	case SCST_MGMT_STATUS_FN_NOT_SUPPORTED:
+		return SRP_TSK_MGMT_FUNC_NOT_SUPP;
+	case SCST_MGMT_STATUS_TASK_NOT_EXIST:
+	case SCST_MGMT_STATUS_LUN_NOT_EXIST:
+	case SCST_MGMT_STATUS_REJECTED:
+	case SCST_MGMT_STATUS_FAILED:
+	default:
+		break;
+	}
+	return SRP_TSK_MGMT_FAILED;
+}
+
+/**
+ * srpt_handle_new_iu() - Process a newly received information unit.
+ * @ch:         RDMA channel through which the information unit has been
+ *              received.
+ * @recv_ioctx: Receive I/O context associated with the information unit.
+ * @send_ioctx: Send I/O context to use for the response, or NULL to allocate
+ *              one from the channel.
+ * @context:    SCST execution context.
+ */
+static void srpt_handle_new_iu(struct srpt_rdma_ch *ch,
+			       struct srpt_recv_ioctx *recv_ioctx,
+			       struct srpt_send_ioctx *send_ioctx,
+			       enum scst_exec_context context)
+{
+	struct srp_cmd *srp_cmd;
+	enum rdma_ch_state ch_state;
+
+	BUG_ON(!ch);
+	BUG_ON(!recv_ioctx);
+
+	ib_dma_sync_single_for_cpu(ch->sport->sdev->device,
+				   recv_ioctx->ioctx.dma, srp_max_req_size,
+				   DMA_FROM_DEVICE);
+
+	ch_state = atomic_read(&ch->state);
+	srp_cmd = recv_ioctx->ioctx.buf;
+	if (unlikely(ch_state == RDMA_CHANNEL_CONNECTING)) {
+		list_add_tail(&recv_ioctx->wait_list, &ch->cmd_wait_list);
+		goto out;
+	}
+
+	if (unlikely(ch_state == RDMA_CHANNEL_DISCONNECTING))
+		goto post_recv;
+
+	if (srp_cmd->opcode == SRP_CMD || srp_cmd->opcode == SRP_TSK_MGMT) {
+		if (!send_ioctx)
+			send_ioctx = srpt_get_send_ioctx(ch);
+		if (unlikely(!send_ioctx)) {
+			list_add_tail(&recv_ioctx->wait_list,
+				      &ch->cmd_wait_list);
+			goto out;
+		}
+	}
+
+	WARN_ON(ch_state != RDMA_CHANNEL_LIVE);
+
+	switch (srp_cmd->opcode) {
+	case SRP_CMD:
+		srpt_handle_cmd(ch, recv_ioctx, send_ioctx, context);
+		break;
+	case SRP_TSK_MGMT:
+		srpt_handle_tsk_mgmt(ch, recv_ioctx, send_ioctx);
+		break;
+	case SRP_I_LOGOUT:
+		PRINT_ERROR("%s", "Not yet implemented: SRP_I_LOGOUT");
+		break;
+	case SRP_CRED_RSP:
+		TRACE_DBG("%s", "received SRP_CRED_RSP");
+		break;
+	case SRP_AER_RSP:
+		TRACE_DBG("%s", "received SRP_AER_RSP");
+		break;
+	case SRP_RSP:
+		PRINT_ERROR("%s", "Received SRP_RSP");
+		break;
+	default:
+		PRINT_ERROR("received IU with unknown opcode 0x%x",
+			    srp_cmd->opcode);
+		break;
+	}
+
+post_recv:
+	srpt_post_recv(ch->sport->sdev, recv_ioctx);
+out:
+	return;
+}
+
+static void srpt_process_rcv_completion(struct ib_cq *cq,
+					struct srpt_rdma_ch *ch,
+					enum scst_exec_context context,
+					struct ib_wc *wc)
+{
+	struct srpt_device *sdev = ch->sport->sdev;
+	struct srpt_recv_ioctx *ioctx;
+	u32 index;
+
+	index = idx_from_wr_id(wc->wr_id);
+	if (wc->status == IB_WC_SUCCESS) {
+		int req_lim;
+
+		req_lim = atomic_dec_return(&ch->req_lim);
+		if (unlikely(req_lim < 0))
+			PRINT_ERROR("req_lim = %d < 0", req_lim);
+		ioctx = sdev->ioctx_ring[index];
+		srpt_handle_new_iu(ch, ioctx, NULL, context);
+	} else {
+		PRINT_INFO("receiving failed for idx %u with status %d",
+			   index, wc->status);
+	}
+}
+
+/**
+ * srpt_process_send_completion() - Process an IB send completion.
+ *
+ * Note: Although this has not yet been observed during tests, at least in
+ * theory it is possible that the srpt_get_send_ioctx() call invoked by
+ * srpt_handle_new_iu() fails. This is possible because the req_lim_delta
+ * value in each response is set to one, and it is possible that this response
+ * makes the initiator send a new request before the send completion for that
+ * response has been processed. This could e.g. happen if the call to
+ * srpt_put_send_ioctx() is delayed because of a higher priority interrupt or
+ * if IB retransmission causes generation of the send completion to be
+ * delayed. Incoming information units for which srpt_get_send_ioctx() fails
+ * are queued on cmd_wait_list. The code below processes these delayed
+ * requests one at a time.
+ */
+static void srpt_process_send_completion(struct ib_cq *cq,
+					 struct srpt_rdma_ch *ch,
+					 enum scst_exec_context context,
+					 struct ib_wc *wc)
+{
+	struct srpt_send_ioctx *send_ioctx;
+	uint32_t index;
+	u8 opcode;
+
+	index = idx_from_wr_id(wc->wr_id);
+	opcode = opcode_from_wr_id(wc->wr_id);
+	send_ioctx = ch->ioctx_ring[index];
+	if (wc->status == IB_WC_SUCCESS) {
+		if (opcode == IB_WC_SEND)
+			srpt_handle_send_comp(ch, send_ioctx, context);
+		else {
+			EXTRACHECKS_WARN_ON(wc->opcode != IB_WC_RDMA_READ);
+			srpt_handle_rdma_comp(ch, send_ioctx, context);
+		}
+	} else {
+		if (opcode == IB_WC_SEND) {
+			PRINT_INFO("sending response for idx %u failed with"
+				   " status %d", index, wc->status);
+			srpt_handle_send_err_comp(ch, wc->wr_id, context);
+		} else {
+			PRINT_INFO("RDMA %s for idx %u failed with status %d",
+				   opcode == IB_WC_RDMA_READ ? "read"
+				   : opcode == IB_WC_RDMA_WRITE ? "write"
+				   : "???", index, wc->status);
+			srpt_handle_rdma_err_comp(ch, send_ioctx, opcode,
+						  context);
+		}
+	}
+
+	while (unlikely(opcode == IB_WC_SEND
+			&& !list_empty(&ch->cmd_wait_list)
+			&& atomic_read(&ch->state) == RDMA_CHANNEL_LIVE
+			&& (send_ioctx = srpt_get_send_ioctx(ch)) != NULL)) {
+		struct srpt_recv_ioctx *recv_ioctx;
+
+		recv_ioctx = list_first_entry(&ch->cmd_wait_list,
+					      struct srpt_recv_ioctx,
+					      wait_list);
+		list_del(&recv_ioctx->wait_list);
+		srpt_handle_new_iu(ch, recv_ioctx, send_ioctx, context);
+	}
+}
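+
+/*
+ * Illustration only, not part of this driver: the req_lim credit flow seen
+ * from an initiator. srpt_process_rcv_completion() decrements ch->req_lim
+ * for every information unit received, and srpt_build_cmd_rsp() /
+ * srpt_build_tskmgmt_rsp() return the accumulated credits through the
+ * req_lim_delta field of each response. An initiator is expected to account
+ * for these credits roughly as follows (hypothetical helper name):
+ *
+ *	req_lim += be32_to_cpu(rsp->req_lim_delta);	// credits granted
+ *	if (req_lim > 0) {
+ *		req_lim--;				// spend one credit
+ *		example_send_next_request();
+ *	}
+ */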
+
+static void srpt_process_completion(struct ib_cq *cq,
+				    struct srpt_rdma_ch *ch,
+				    enum scst_exec_context context)
+{
+	struct ib_wc *const wc = ch->wc;
+	int i, n;
+
+	EXTRACHECKS_WARN_ON(cq != ch->cq);
+
+	ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+	while ((n = ib_poll_cq(cq, ARRAY_SIZE(ch->wc), wc)) > 0) {
+		for (i = 0; i < n; i++) {
+			if (opcode_from_wr_id(wc[i].wr_id) & IB_WC_RECV)
+				srpt_process_rcv_completion(cq, ch, context,
+							    &wc[i]);
+			else
+				srpt_process_send_completion(cq, ch, context,
+							     &wc[i]);
+		}
+	}
+}
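+
+/*
+ * The idx_from_wr_id(), opcode_from_wr_id() and encode_wr_id() helpers used
+ * above are defined elsewhere in this patch. A minimal sketch of one
+ * plausible encoding, assuming the IB opcode fits in the upper 32 bits of
+ * the 64-bit wr_id; the dispatch above additionally relies on IB_WC_RECV
+ * being a flag bit in enum ib_wc_opcode:
+ *
+ *	static inline u64 example_encode_wr_id(u8 opcode, u32 idx)
+ *	{
+ *		return ((u64)opcode << 32) | idx;
+ *	}
+ *
+ *	static inline u32 example_opcode_from_wr_id(u64 wr_id)
+ *	{
+ *		return wr_id >> 32;
+ *	}
+ *
+ *	static inline u32 example_idx_from_wr_id(u64 wr_id)
+ *	{
+ *		return (u32)wr_id;
+ *	}
+ */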
+
+/**
+ * srpt_completion() - IB completion queue callback function.
+ *
+ * Notes:
+ * - It is guaranteed that a completion handler will never be invoked
+ *   concurrently on two different CPUs for the same completion queue. See also
+ *   Documentation/infiniband/core_locking.txt and the implementation of
+ *   handle_edge_irq() in kernel/irq/chip.c.
+ * - When threaded IRQs are enabled, completion handlers are invoked in thread
+ *   context instead of interrupt context.
+ */
+static void srpt_completion(struct ib_cq *cq, void *ctx)
+{
+	struct srpt_rdma_ch *ch = ctx;
+
+	BUG_ON(!ch);
+	atomic_inc(&ch->processing_compl);
+	switch (thread) {
+	case MODE_IB_COMPLETION_IN_THREAD:
+		wake_up_interruptible(&ch->wait_queue);
+		break;
+	case MODE_IB_COMPLETION_IN_SIRQ:
+		srpt_process_completion(cq, ch, SCST_CONTEXT_THREAD);
+		break;
+	case MODE_ALL_IN_SIRQ:
+		srpt_process_completion(cq, ch, SCST_CONTEXT_TASKLET);
+		break;
+	}
+	atomic_dec(&ch->processing_compl);
+}
+
+static int srpt_compl_thread(void *arg)
+{
+	struct srpt_rdma_ch *ch;
+
+	/* Hibernation / freezing of the SRPT kernel thread is not supported. */
+	current->flags |= PF_NOFREEZE;
+
+	ch = arg;
+	BUG_ON(!ch);
+	PRINT_INFO("Session %s: kernel thread %s (PID %d) started",
+		   ch->sess_name, ch->thread->comm, current->pid);
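+	/*
+	 * The comma operator in the condition below makes
+	 * wait_event_interruptible() invoke srpt_process_completion() upon
+	 * every wakeup and use only kthread_should_stop() as the actual
+	 * wait condition.
+	 */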
+	while (!kthread_should_stop()) {
+		wait_event_interruptible(ch->wait_queue,
+			(srpt_process_completion(ch->cq, ch,
+						 SCST_CONTEXT_THREAD),
+			 kthread_should_stop()));
+	}
+	PRINT_INFO("Session %s: kernel thread %s (PID %d) stopped",
+		   ch->sess_name, ch->thread->comm, current->pid);
+	return 0;
+}
+
+/**
+ * srpt_create_ch_ib() - Create receive and send completion queues.
+ */
+static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
+{
+	struct ib_qp_init_attr *qp_init;
+	struct srpt_device *sdev = ch->sport->sdev;
+	int ret;
+
+	EXTRACHECKS_WARN_ON(ch->rq_size < 1);
+
+	ret = -ENOMEM;
+	qp_init = kzalloc(sizeof *qp_init, GFP_KERNEL);
+	if (!qp_init)
+		goto out;
+
+	ch->cq = ib_create_cq(sdev->device, srpt_completion, NULL, ch,
+			      ch->rq_size + srpt_sq_size, 0);
+	if (IS_ERR(ch->cq)) {
+		ret = PTR_ERR(ch->cq);
+		PRINT_ERROR("failed to create CQ cqe= %d ret= %d",
+			    ch->rq_size + srpt_sq_size, ret);
+		goto out;
+	}
+
+	qp_init->qp_context = (void *)ch;
+	qp_init->event_handler
+		= (void(*)(struct ib_event *, void*))srpt_qp_event;
+	qp_init->send_cq = ch->cq;
+	qp_init->recv_cq = ch->cq;
+	qp_init->srq = sdev->srq;
+	qp_init->sq_sig_type = IB_SIGNAL_REQ_WR;
+	qp_init->qp_type = IB_QPT_RC;
+	qp_init->cap.max_send_wr = srpt_sq_size;
+	qp_init->cap.max_send_sge = SRPT_DEF_SG_PER_WQE;
+
+	ch->qp = ib_create_qp(sdev->pd, qp_init);
+	if (IS_ERR(ch->qp)) {
+		ret = PTR_ERR(ch->qp);
+		PRINT_ERROR("failed to create_qp ret= %d", ret);
+		goto err_destroy_cq;
+	}
+
+	atomic_set(&ch->sq_wr_avail, qp_init->cap.max_send_wr);
+
+	TRACE_DBG("%s: max_cqe= %d max_sge= %d sq_size = %d"
+		  " cm_id= %p", __func__, ch->cq->cqe,
+		  qp_init->cap.max_send_sge, qp_init->cap.max_send_wr,
+		  ch->cm_id);
+
+	ret = srpt_init_ch_qp(ch, ch->qp);
+	if (ret)
+		goto err_destroy_qp;
+
+	if (thread == MODE_IB_COMPLETION_IN_THREAD) {
+		init_waitqueue_head(&ch->wait_queue);
+
+		TRACE_DBG("creating IB completion thread for session %s",
+			  ch->sess_name);
+
+		ch->thread = kthread_run(srpt_compl_thread, ch,
+					 "ib_srpt_compl");
+		if (IS_ERR(ch->thread)) {
+			PRINT_ERROR("failed to create kernel thread %ld",
+				    PTR_ERR(ch->thread));
+			ch->thread = NULL;
+			goto err_destroy_qp;
+		}
+	} else
+		ib_req_notify_cq(ch->cq, IB_CQ_NEXT_COMP);
+
+out:
+	kfree(qp_init);
+	return ret;
+
+err_destroy_qp:
+	ib_destroy_qp(ch->qp);
+err_destroy_cq:
+	ib_destroy_cq(ch->cq);
+	goto out;
+}
+
+static void srpt_destroy_ch_ib(struct srpt_rdma_ch *ch)
+{
+	if (ch->thread)
+		kthread_stop(ch->thread);
+
+	ib_destroy_qp(ch->qp);
+	ib_destroy_cq(ch->cq);
+}
+
+/**
+ * srpt_unregister_channel() - Start RDMA channel disconnection.
+ *
+ * Note: The caller must hold ch->sport->sdev->spinlock.
+ */
+static void srpt_unregister_channel(struct srpt_rdma_ch *ch)
+	__acquires(&ch->sport->sdev->spinlock)
+	__releases(&ch->sport->sdev->spinlock)
+{
+	struct srpt_device *sdev;
+	struct ib_qp_attr qp_attr;
+	int ret;
+
+	sdev = ch->sport->sdev;
+	list_del(&ch->list);
+	atomic_set(&ch->state, RDMA_CHANNEL_DISCONNECTING);
+	spin_unlock_irq(&sdev->spinlock);
+
+	qp_attr.qp_state = IB_QPS_ERR;
+	ret = ib_modify_qp(ch->qp, &qp_attr, IB_QP_STATE);
+	if (ret < 0)
+		PRINT_ERROR("Setting queue pair in error state failed: %d",
+			    ret);
+
+	while (atomic_read(&ch->processing_compl))
+		;
+
+	/*
+	 * At this point it is guaranteed that no new commands will be sent to
+	 * the SCST core for channel ch, which is a requirement for
+	 * scst_unregister_session().
+	 */
+
+	TRACE_DBG("unregistering session %p", ch->scst_sess);
+	scst_unregister_session(ch->scst_sess, 0, srpt_release_channel);
+	spin_lock_irq(&sdev->spinlock);
+}
+
+/**
+ * srpt_release_channel_by_cmid() - Release a channel.
+ * @cm_id: Pointer to the CM ID of the channel to be released.
+ *
+ * Note: Must be called from inside srpt_cm_handler to avoid a race between
+ * accessing sdev->spinlock and the call to kfree(sdev) in srpt_remove_one()
+ * (the caller of srpt_cm_handler holds the cm_id spinlock; srpt_remove_one()
+ * waits until all SCST sessions for the associated IB device have been
+ * unregistered and SCST session registration involves a call to
+ * ib_destroy_cm_id(), which locks the cm_id spinlock and hence waits until
+ * this function has finished).
+ */
+static void srpt_release_channel_by_cmid(struct ib_cm_id *cm_id)
+{
+	struct srpt_device *sdev;
+	struct srpt_rdma_ch *ch;
+
+	EXTRACHECKS_WARN_ON_ONCE(irqs_disabled());
+
+	sdev = cm_id->context;
+	BUG_ON(!sdev);
+	spin_lock_irq(&sdev->spinlock);
+	list_for_each_entry(ch, &sdev->rch_list, list) {
+		if (ch->cm_id == cm_id) {
+			srpt_unregister_channel(ch);
+			break;
+		}
+	}
+	spin_unlock_irq(&sdev->spinlock);
+}
+
+/**
+ * srpt_find_channel() - Look up an RDMA channel.
+ * @cm_id: Pointer to the CM ID of the channel to be looked up.
+ *
+ * Returns NULL if no matching RDMA channel has been found.
+ */
+static struct srpt_rdma_ch *srpt_find_channel(struct srpt_device *sdev,
+					      struct ib_cm_id *cm_id)
+{
+	struct srpt_rdma_ch *ch;
+	bool found;
+
+	EXTRACHECKS_WARN_ON_ONCE(irqs_disabled());
+	BUG_ON(!sdev);
+
+	found = false;
+	spin_lock_irq(&sdev->spinlock);
+	list_for_each_entry(ch, &sdev->rch_list, list) {
+		if (ch->cm_id == cm_id) {
+			found = true;
+			break;
+		}
+	}
+	spin_unlock_irq(&sdev->spinlock);
+
+	return found ? ch : NULL;
+}
+
+/**
+ * srpt_release_channel() - Release all resources associated with an RDMA channel.
+ *
+ * Notes:
+ * - The caller must have removed the channel from the channel list before
+ *   calling this function.
+ * - Must be called as a callback function via scst_unregister_session(). Never
+ *   call this function directly because doing so would trigger several race
+ *   conditions.
+ * - Do not access ch->sport or ch->sport->sdev in this function because the
+ *   memory that was allocated for the sport and/or sdev data structures may
+ *   already have been freed at the time this function is called.
+ */
+static void srpt_release_channel(struct scst_session *scst_sess)
+{
+	struct srpt_rdma_ch *ch;
+
+	ch = scst_sess_get_tgt_priv(scst_sess);
+	BUG_ON(!ch);
+	WARN_ON(atomic_read(&ch->state) != RDMA_CHANNEL_DISCONNECTING);
+
+	TRACE_DBG("destroying cm_id %p", ch->cm_id);
+	BUG_ON(!ch->cm_id);
+	ib_destroy_cm_id(ch->cm_id);
+
+	srpt_destroy_ch_ib(ch);
+
+	srpt_free_ioctx_ring((struct srpt_ioctx **)ch->ioctx_ring,
+			     ch->sport->sdev, ch->rq_size,
+			     srp_max_rsp_size, DMA_TO_DEVICE);
+
+	kfree(ch);
+}
+
+/**
+ * srpt_enable_target() - Allows enabling or disabling a target via sysfs.
+ */
+static int srpt_enable_target(struct scst_tgt *scst_tgt, bool enable)
+{
+	struct srpt_device *sdev = scst_tgt_get_tgt_priv(scst_tgt);
+
+	EXTRACHECKS_WARN_ON_ONCE(irqs_disabled());
+
+	if (!sdev)
+		return -ENOENT;
+
+	TRACE_DBG("%s target %s", enable ? "Enabling" : "Disabling",
+		  sdev->device->name);
+
+	spin_lock_irq(&sdev->spinlock);
+	sdev->enabled = enable;
+	spin_unlock_irq(&sdev->spinlock);
+
+	return 0;
+}
+
+/**
+ * srpt_is_target_enabled() - Allows querying a target's status via sysfs.
+ */
+static bool srpt_is_target_enabled(struct scst_tgt *scst_tgt)
+{
+	struct srpt_device *sdev = scst_tgt_get_tgt_priv(scst_tgt);
+	bool res;
+
+	EXTRACHECKS_WARN_ON_ONCE(irqs_disabled());
+
+	if (!sdev)
+		return false;
+
+	spin_lock_irq(&sdev->spinlock);
+	res = sdev->enabled;
+	spin_unlock_irq(&sdev->spinlock);
+	return res;
+}
+
+/**
+ * srpt_cm_req_recv() - Process the event IB_CM_REQ_RECEIVED.
+ *
+ * Ownership of the cm_id is transferred to the SCST session if this function
+ * returns zero. Otherwise the caller remains the owner of cm_id.
+ */
+static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
+			    struct ib_cm_req_event_param *param,
+			    void *private_data)
+{
+	struct srpt_device *sdev = cm_id->context;
+	struct srp_login_req *req;
+	struct srp_login_rsp *rsp;
+	struct srp_login_rej *rej;
+	struct ib_cm_rep_param *rep_param;
+	struct srpt_rdma_ch *ch, *tmp_ch;
+	u32 it_iu_len;
+	int i;
+	int ret = 0;
+
+	EXTRACHECKS_WARN_ON_ONCE(irqs_disabled());
+
+	if (WARN_ON(!sdev || !private_data))
+		return -EINVAL;
+
+	req = (struct srp_login_req *)private_data;
+
+	it_iu_len = be32_to_cpu(req->req_it_iu_len);
+
+	PRINT_INFO("Received SRP_LOGIN_REQ with"
+	    " i_port_id 0x%llx:0x%llx, t_port_id 0x%llx:0x%llx and it_iu_len %d"
+	    " on port %d (guid=0x%llx:0x%llx)",
+	    be64_to_cpu(*(__be64 *)&req->initiator_port_id[0]),
+	    be64_to_cpu(*(__be64 *)&req->initiator_port_id[8]),
+	    be64_to_cpu(*(__be64 *)&req->target_port_id[0]),
+	    be64_to_cpu(*(__be64 *)&req->target_port_id[8]),
+	    it_iu_len,
+	    param->port,
+	    be64_to_cpu(*(__be64 *)&sdev->port[param->port - 1].gid.raw[0]),
+	    be64_to_cpu(*(__be64 *)&sdev->port[param->port - 1].gid.raw[8]));
+
+	rsp = kzalloc(sizeof *rsp, GFP_KERNEL);
+	rej = kzalloc(sizeof *rej, GFP_KERNEL);
+	rep_param = kzalloc(sizeof *rep_param, GFP_KERNEL);
+
+	if (!rsp || !rej || !rep_param) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	if (it_iu_len > srp_max_req_size || it_iu_len < 64) {
+		rej->reason = __constant_cpu_to_be32(
+				SRP_LOGIN_REJ_REQ_IT_IU_LENGTH_TOO_LARGE);
+		ret = -EINVAL;
+		PRINT_ERROR("rejected SRP_LOGIN_REQ because its"
+			    " length (%d bytes) is out of range (%d .. %d)",
+			    it_iu_len, 64, srp_max_req_size);
+		goto reject;
+	}
+
+	if (!srpt_is_target_enabled(sdev->scst_tgt)) {
+		rej->reason = __constant_cpu_to_be32(
+				SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
+		ret = -EINVAL;
+		PRINT_ERROR("rejected SRP_LOGIN_REQ because the target %s"
+			    " has not yet been enabled", sdev->device->name);
+		goto reject;
+	}
+
+	if ((req->req_flags & SRP_MTCH_ACTION) == SRP_MULTICHAN_SINGLE) {
+		rsp->rsp_flags = SRP_LOGIN_RSP_MULTICHAN_NO_CHAN;
+
+		spin_lock_irq(&sdev->spinlock);
+
+		list_for_each_entry_safe(ch, tmp_ch, &sdev->rch_list, list) {
+			if (!memcmp(ch->i_port_id, req->initiator_port_id, 16)
+			    && !memcmp(ch->t_port_id, req->target_port_id, 16)
+			    && param->port == ch->sport->port
+			    && param->listen_id == ch->sport->sdev->cm_id
+			    && ch->cm_id) {
+				enum rdma_ch_state prev_state;
+
+				/* found an existing channel */
+				TRACE_DBG("Found existing channel name= %s"
+					  " cm_id= %p state= %d",
+					  ch->sess_name, ch->cm_id,
+					  atomic_read(&ch->state));
+
+				prev_state = atomic_xchg(&ch->state,
+						RDMA_CHANNEL_DISCONNECTING);
+				if (prev_state == RDMA_CHANNEL_CONNECTING)
+					srpt_unregister_channel(ch);
+
+				spin_unlock_irq(&sdev->spinlock);
+
+				rsp->rsp_flags =
+					SRP_LOGIN_RSP_MULTICHAN_TERMINATED;
+
+				if (prev_state == RDMA_CHANNEL_LIVE) {
+					ib_send_cm_dreq(ch->cm_id, NULL, 0);
+					PRINT_INFO("disconnected"
+					  " session %s because a new"
+					  " SRP_LOGIN_REQ has been received.",
+					  ch->sess_name);
+				} else if (prev_state ==
+					 RDMA_CHANNEL_CONNECTING) {
+					PRINT_ERROR("%s", "rejected"
+					  " SRP_LOGIN_REQ because another login"
+					  " request is being processed.");
+					ib_send_cm_rej(ch->cm_id,
+						       IB_CM_REJ_NO_RESOURCES,
+						       NULL, 0, NULL, 0);
+				}
+
+				spin_lock_irq(&sdev->spinlock);
+			}
+		}
+
+		spin_unlock_irq(&sdev->spinlock);
+
+	} else
+		rsp->rsp_flags = SRP_LOGIN_RSP_MULTICHAN_MAINTAINED;
+
+	if (*(__be64 *)req->target_port_id != cpu_to_be64(srpt_service_guid)
+	    || *(__be64 *)(req->target_port_id + 8) !=
+	       cpu_to_be64(srpt_service_guid)) {
+		rej->reason = __constant_cpu_to_be32(
+				SRP_LOGIN_REJ_UNABLE_ASSOCIATE_CHANNEL);
+		ret = -ENOMEM;
+		PRINT_ERROR("%s", "rejected SRP_LOGIN_REQ because it"
+		       " has an invalid target port identifier.");
+		goto reject;
+	}
+
+	ch = kzalloc(sizeof *ch, GFP_KERNEL);
+	if (!ch) {
+		rej->reason = __constant_cpu_to_be32(
+					SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
+		PRINT_ERROR("%s",
+			    "rejected SRP_LOGIN_REQ because out of memory.");
+		ret = -ENOMEM;
+		goto reject;
+	}
+
+	memcpy(ch->i_port_id, req->initiator_port_id, 16);
+	memcpy(ch->t_port_id, req->target_port_id, 16);
+	ch->sport = &sdev->port[param->port - 1];
+	ch->cm_id = cm_id;
+	/*
+	 * Avoid QUEUE_FULL conditions by limiting the number of buffers used
+	 * for the SRP protocol to the SCST SCSI command queue size.
+	 */
+	ch->rq_size = min(SRPT_RQ_SIZE, scst_get_max_lun_commands(NULL, 0));
+	atomic_set(&ch->processing_compl, 0);
+	atomic_set(&ch->state, RDMA_CHANNEL_CONNECTING);
+	INIT_LIST_HEAD(&ch->cmd_wait_list);
+
+	spin_lock_init(&ch->spinlock);
+	ch->ioctx_ring = (struct srpt_send_ioctx **)
+		srpt_alloc_ioctx_ring(ch->sport->sdev, ch->rq_size,
+				      sizeof(*ch->ioctx_ring[0]),
+				      srp_max_rsp_size, DMA_TO_DEVICE);
+	if (!ch->ioctx_ring)
+		goto free_ch;
+
+	INIT_LIST_HEAD(&ch->free_list);
+	for (i = 0; i < ch->rq_size; i++) {
+		ch->ioctx_ring[i]->ch = ch;
+		list_add_tail(&ch->ioctx_ring[i]->free_list, &ch->free_list);
+	}
+
+	ret = srpt_create_ch_ib(ch);
+	if (ret) {
+		rej->reason = __constant_cpu_to_be32(
+				SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
+		PRINT_ERROR("%s", "rejected SRP_LOGIN_REQ because creating"
+			    " a new RDMA channel failed.");
+		goto free_ring;
+	}
+
+	ret = srpt_ch_qp_rtr(ch, ch->qp);
+	if (ret) {
+		rej->reason = __constant_cpu_to_be32(
+				SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
+		PRINT_ERROR("rejected SRP_LOGIN_REQ because enabling"
+		       " RTR failed (error code = %d)", ret);
+		goto destroy_ib;
+	}
+
+	if (use_port_guid_in_session_name) {
+		/*
+		 * If the kernel module parameter use_port_guid_in_session_name
+		 * has been specified, use a combination of the target port
+		 * GUID and the initiator port ID as the session name. This
+		 * was the original behavior of the SRP target implementation
+		 * (i.e. before the SRPT was included in OFED 1.3).
+		 */
+		snprintf(ch->sess_name, sizeof(ch->sess_name),
+			 "0x%016llx%016llx",
+			 be64_to_cpu(*(__be64 *)
+				&sdev->port[param->port - 1].gid.raw[8]),
+			 be64_to_cpu(*(__be64 *)(ch->i_port_id + 8)));
+	} else {
+		/*
+		 * Default behavior: use the initiator port identifier as the
+		 * session name.
+		 */
+		snprintf(ch->sess_name, sizeof(ch->sess_name),
+			 "0x%016llx%016llx",
+			 be64_to_cpu(*(__be64 *)ch->i_port_id),
+			 be64_to_cpu(*(__be64 *)(ch->i_port_id + 8)));
+	}
+
+	TRACE_DBG("registering session %s", ch->sess_name);
+
+	BUG_ON(!sdev->scst_tgt);
+	ch->scst_sess = scst_register_session(sdev->scst_tgt, 0, ch->sess_name,
+					      ch, NULL, NULL);
+	if (!ch->scst_sess) {
+		rej->reason = __constant_cpu_to_be32(
+				SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
+		TRACE_DBG("%s", "Failed to create SCST session");
+		goto release_channel;
+	}
+
+	TRACE_DBG("Establish connection sess=%p name=%s cm_id=%p",
+		  ch->scst_sess, ch->sess_name, ch->cm_id);
+
+	/* create srp_login_response */
+	rsp->opcode = SRP_LOGIN_RSP;
+	rsp->tag = req->tag;
+	rsp->max_it_iu_len = req->req_it_iu_len;
+	rsp->max_ti_iu_len = req->req_it_iu_len;
+	ch->max_ti_iu_len = it_iu_len;
+	rsp->buf_fmt = __constant_cpu_to_be16(SRP_BUF_FORMAT_DIRECT
+					      | SRP_BUF_FORMAT_INDIRECT);
+	rsp->req_lim_delta = cpu_to_be32(ch->rq_size);
+	atomic_set(&ch->req_lim, ch->rq_size);
+	atomic_set(&ch->req_lim_delta, 0);
+
+	/* create cm reply */
+	rep_param->qp_num = ch->qp->qp_num;
+	rep_param->private_data = (void *)rsp;
+	rep_param->private_data_len = sizeof *rsp;
+	rep_param->rnr_retry_count = 7;
+	rep_param->flow_control = 1;
+	rep_param->failover_accepted = 0;
+	rep_param->srq = 1;
+	rep_param->responder_resources = 4;
+	rep_param->initiator_depth = 4;
+
+	ret = ib_send_cm_rep(cm_id, rep_param);
+	if (ret) {
+		PRINT_ERROR("sending SRP_LOGIN_REQ response failed"
+			    " (error code = %d)", ret);
+		goto release_channel;
+	}
+
+	spin_lock_irq(&sdev->spinlock);
+	list_add_tail(&ch->list, &sdev->rch_list);
+	spin_unlock_irq(&sdev->spinlock);
+
+	goto out;
+
+release_channel:
+	atomic_set(&ch->state, RDMA_CHANNEL_DISCONNECTING);
+	scst_unregister_session(ch->scst_sess, 0, NULL);
+	ch->scst_sess = NULL;
+
+destroy_ib:
+	srpt_destroy_ch_ib(ch);
+
+free_ring:
+	srpt_free_ioctx_ring((struct srpt_ioctx **)ch->ioctx_ring,
+			     ch->sport->sdev, ch->rq_size,
+			     srp_max_rsp_size, DMA_TO_DEVICE);
+
+free_ch:
+	kfree(ch);
+
+reject:
+	rej->opcode = SRP_LOGIN_REJ;
+	rej->tag = req->tag;
+	rej->buf_fmt = __constant_cpu_to_be16(SRP_BUF_FORMAT_DIRECT
+					      | SRP_BUF_FORMAT_INDIRECT);
+
+	ib_send_cm_rej(cm_id, IB_CM_REJ_CONSUMER_DEFINED, NULL, 0,
+			     (void *)rej, sizeof *rej);
+
+out:
+	kfree(rep_param);
+	kfree(rsp);
+	kfree(rej);
+
+	return ret;
+}
+
+static void srpt_cm_rej_recv(struct ib_cm_id *cm_id)
+{
+	PRINT_INFO("Received InfiniBand REJ packet for cm_id %p.", cm_id);
+	srpt_release_channel_by_cmid(cm_id);
+}
+
+/**
+ * srpt_cm_rtu_recv() - Process an IB_CM_RTU_RECEIVED or IB_CM_USER_ESTABLISHED event.
+ *
+ * An IB_CM_RTU_RECEIVED message indicates that the connection is established
+ * and that the recipient may begin transmitting (RTU = ready to use).
+ */
+static void srpt_cm_rtu_recv(struct ib_cm_id *cm_id)
+{
+	struct srpt_rdma_ch *ch;
+	int ret;
+
+	ch = srpt_find_channel(cm_id->context, cm_id);
+	WARN_ON(!ch);
+	if (!ch)
+		goto out;
+
+	if (srpt_test_and_set_channel_state(ch, RDMA_CHANNEL_CONNECTING,
+			RDMA_CHANNEL_LIVE) == RDMA_CHANNEL_CONNECTING) {
+		struct srpt_recv_ioctx *ioctx, *ioctx_tmp;
+
+		ret = srpt_ch_qp_rts(ch, ch->qp);
+
+		list_for_each_entry_safe(ioctx, ioctx_tmp, &ch->cmd_wait_list,
+					 wait_list) {
+			list_del(&ioctx->wait_list);
+			srpt_handle_new_iu(ch, ioctx, NULL,
+					   SCST_CONTEXT_THREAD);
+		}
+		if (ret && srpt_test_and_set_channel_state(ch,
+			RDMA_CHANNEL_LIVE,
+			RDMA_CHANNEL_DISCONNECTING) == RDMA_CHANNEL_LIVE) {
+			TRACE_DBG("cm_id=%p sess_name=%s state=%d",
+				  cm_id, ch->sess_name,
+				  atomic_read(&ch->state));
+			ib_send_cm_dreq(ch->cm_id, NULL, 0);
+		}
+	}
+
+out:
+	;
+}
+
+static void srpt_cm_timewait_exit(struct ib_cm_id *cm_id)
+{
+	PRINT_INFO("Received InfiniBand TimeWait exit for cm_id %p.", cm_id);
+	srpt_release_channel_by_cmid(cm_id);
+}
+
+static void srpt_cm_rep_error(struct ib_cm_id *cm_id)
+{
+	PRINT_INFO("Received InfiniBand REP error for cm_id %p.", cm_id);
+	srpt_release_channel_by_cmid(cm_id);
+}
+
+/**
+ * srpt_cm_dreq_recv() - Process reception of a DREQ message.
+ */
+static void srpt_cm_dreq_recv(struct ib_cm_id *cm_id)
+{
+	struct srpt_rdma_ch *ch;
+
+	ch = srpt_find_channel(cm_id->context, cm_id);
+	if (!ch) {
+		TRACE_DBG("Received DREQ for channel %p which is already"
+			  " being unregistered.", cm_id);
+		goto out;
+	}
+
+	TRACE_DBG("cm_id= %p ch->state= %d", cm_id, atomic_read(&ch->state));
+
+	switch (atomic_read(&ch->state)) {
+	case RDMA_CHANNEL_LIVE:
+	case RDMA_CHANNEL_CONNECTING:
+		ib_send_cm_drep(ch->cm_id, NULL, 0);
+		PRINT_INFO("Received DREQ and sent DREP for session %s.",
+			   ch->sess_name);
+		break;
+	case RDMA_CHANNEL_DISCONNECTING:
+	default:
+		break;
+	}
+
+out:
+	;
+}
+
+/**
+ * srpt_cm_drep_recv() - Process reception of a DREP message.
+ */
+static void srpt_cm_drep_recv(struct ib_cm_id *cm_id)
+{
+	PRINT_INFO("Received InfiniBand DREP message for cm_id %p.", cm_id);
+	srpt_release_channel_by_cmid(cm_id);
+}
+
+/**
+ * srpt_cm_handler() - IB connection manager callback function.
+ *
+ * A non-zero return value will cause the caller to destroy the CM ID.
+ *
+ * Note: srpt_cm_handler() must only return a non-zero value when the transfer
+ * of cm_id ownership to a channel by srpt_cm_req_recv() has failed. Returning
+ * a non-zero value in any other case will trigger a race with the
+ * ib_destroy_cm_id() call in srpt_release_channel().
+ */
+static int srpt_cm_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event)
+{
+	int ret;
+
+	ret = 0;
+	switch (event->event) {
+	case IB_CM_REQ_RECEIVED:
+		ret = srpt_cm_req_recv(cm_id, &event->param.req_rcvd,
+				       event->private_data);
+		break;
+	case IB_CM_REJ_RECEIVED:
+		srpt_cm_rej_recv(cm_id);
+		break;
+	case IB_CM_RTU_RECEIVED:
+	case IB_CM_USER_ESTABLISHED:
+		srpt_cm_rtu_recv(cm_id);
+		break;
+	case IB_CM_DREQ_RECEIVED:
+		srpt_cm_dreq_recv(cm_id);
+		break;
+	case IB_CM_DREP_RECEIVED:
+		srpt_cm_drep_recv(cm_id);
+		break;
+	case IB_CM_TIMEWAIT_EXIT:
+		srpt_cm_timewait_exit(cm_id);
+		break;
+	case IB_CM_REP_ERROR:
+		srpt_cm_rep_error(cm_id);
+		break;
+	case IB_CM_DREQ_ERROR:
+		PRINT_INFO("%s", "Received IB DREQ ERROR event.");
+		break;
+	case IB_CM_MRA_RECEIVED:
+		PRINT_INFO("%s", "Received IB MRA event");
+		break;
+	default:
+		PRINT_ERROR("received unrecognized IB CM event %d",
+			    event->event);
+		break;
+	}
+
+	return ret;
+}
+
+/**
+ * srpt_map_sg_to_ib_sge() - Map an SG list to an IB SGE list.
+ */
+static int srpt_map_sg_to_ib_sge(struct srpt_rdma_ch *ch,
+				 struct srpt_send_ioctx *ioctx,
+				 struct scst_cmd *scmnd)
+{
+	struct scatterlist *sg;
+	int sg_cnt;
+	scst_data_direction dir;
+	struct rdma_iu *riu;
+	struct srp_direct_buf *db;
+	dma_addr_t dma_addr;
+	struct ib_sge *sge;
+	u64 raddr;
+	u32 rsize;
+	u32 tsize;
+	u32 dma_len;
+	int count, nrdma;
+	int i, j, k;
+
+	BUG_ON(!ch);
+	BUG_ON(!ioctx);
+	BUG_ON(!scmnd);
+	dir = scst_cmd_get_data_direction(scmnd);
+	BUG_ON(dir == SCST_DATA_NONE);
+	/*
+	 * Cache 'dir' because it is needed in srpt_unmap_sg_to_ib_sge()
+	 * and because scst_set_cmd_error_status() resets scmnd->data_direction.
+	 */
+	ioctx->dir = dir;
+	if (dir == SCST_DATA_WRITE) {
+		scst_cmd_get_write_fields(scmnd, &sg, &sg_cnt);
+		WARN_ON(!sg);
+	} else {
+		sg = scst_cmd_get_sg(scmnd);
+		sg_cnt = scst_cmd_get_sg_cnt(scmnd);
+		WARN_ON(!sg);
+	}
+	ioctx->sg = sg;
+	ioctx->sg_cnt = sg_cnt;
+	count = ib_dma_map_sg(ch->sport->sdev->device, sg, sg_cnt,
+			      scst_to_tgt_dma_dir(dir));
+	if (unlikely(!count))
+		return -EBUSY;
+
+	ioctx->mapped_sg_count = count;
+
+	if (ioctx->rdma_ius && ioctx->n_rdma_ius)
+		nrdma = ioctx->n_rdma_ius;
+	else {
+		nrdma = count / SRPT_DEF_SG_PER_WQE + ioctx->n_rbuf;
+
+		ioctx->rdma_ius = kzalloc(nrdma * sizeof *riu,
+					  scst_cmd_atomic(scmnd)
+					  ? GFP_ATOMIC : GFP_KERNEL);
+		if (!ioctx->rdma_ius)
+			goto free_mem;
+
+		ioctx->n_rdma_ius = nrdma;
+	}
+
+	db = ioctx->rbufs;
+	tsize = (dir == SCST_DATA_READ)
+		? scst_cmd_get_adjusted_resp_data_len(scmnd)
+		: scst_cmd_get_bufflen(scmnd);
+	dma_len = sg_dma_len(&sg[0]);
+	riu = ioctx->rdma_ius;
+
+	/*
+	 * For each remote descriptor, calculate the number of ib_sge entries
+	 * needed. If an RDMA operation needs at most SRPT_DEF_SG_PER_WQE
+	 * ib_sge entries, a single rdma_iu and hence a single RDMA WR
+	 * suffices per remote descriptor; otherwise extra rdma_iu structures
+	 * are allocated to carry the remaining ib_sge entries in additional
+	 * RDMA WRs.
+	 */
+	for (i = 0, j = 0;
+	     j < count && i < ioctx->n_rbuf && tsize > 0; ++i, ++riu, ++db) {
+		rsize = be32_to_cpu(db->len);
+		raddr = be64_to_cpu(db->va);
+		riu->raddr = raddr;
+		riu->rkey = be32_to_cpu(db->key);
+		riu->sge_cnt = 0;
+
+		/* calculate how many sge required for this remote_buf */
+		while (rsize > 0 && tsize > 0) {
+
+			if (rsize >= dma_len) {
+				tsize -= dma_len;
+				rsize -= dma_len;
+				raddr += dma_len;
+
+				if (tsize > 0) {
+					++j;
+					if (j < count)
+						dma_len = sg_dma_len(&sg[j]);
+				}
+			} else {
+				tsize -= rsize;
+				dma_len -= rsize;
+				rsize = 0;
+			}
+
+			++riu->sge_cnt;
+
+			if (rsize > 0 && riu->sge_cnt == SRPT_DEF_SG_PER_WQE) {
+				++ioctx->n_rdma;
+				riu->sge =
+				    kmalloc(riu->sge_cnt * sizeof *riu->sge,
+					    scst_cmd_atomic(scmnd)
+					    ? GFP_ATOMIC : GFP_KERNEL);
+				if (!riu->sge)
+					goto free_mem;
+
+				++riu;
+				riu->sge_cnt = 0;
+				riu->raddr = raddr;
+				riu->rkey = be32_to_cpu(db->key);
+			}
+		}
+
+		++ioctx->n_rdma;
+		riu->sge = kmalloc(riu->sge_cnt * sizeof *riu->sge,
+				   scst_cmd_atomic(scmnd)
+				   ? GFP_ATOMIC : GFP_KERNEL);
+		if (!riu->sge)
+			goto free_mem;
+	}
+
+	db = ioctx->rbufs;
+	tsize = (dir == SCST_DATA_READ)
+		? scst_cmd_get_adjusted_resp_data_len(scmnd)
+		: scst_cmd_get_bufflen(scmnd);
+	riu = ioctx->rdma_ius;
+	dma_len = sg_dma_len(&sg[0]);
+	dma_addr = sg_dma_address(&sg[0]);
+
+	/*
+	 * The second loop fills the rdma_iu sge arrays with the DMA
+	 * addresses and lengths of the scatterlist elements.
+	 */
+	for (i = 0, j = 0;
+	     j < count && i < ioctx->n_rbuf && tsize > 0; ++i, ++riu, ++db) {
+		rsize = be32_to_cpu(db->len);
+		sge = riu->sge;
+		k = 0;
+
+		while (rsize > 0 && tsize > 0) {
+			sge->addr = dma_addr;
+			sge->lkey = ch->sport->sdev->mr->lkey;
+
+			if (rsize >= dma_len) {
+				sge->length =
+					(tsize < dma_len) ? tsize : dma_len;
+				tsize -= dma_len;
+				rsize -= dma_len;
+
+				if (tsize > 0) {
+					++j;
+					if (j < count) {
+						dma_len = sg_dma_len(&sg[j]);
+						dma_addr =
+						    sg_dma_address(&sg[j]);
+					}
+				}
+			} else {
+				sge->length = (tsize < rsize) ? tsize : rsize;
+				tsize -= rsize;
+				dma_len -= rsize;
+				dma_addr += rsize;
+				rsize = 0;
+			}
+
+			++k;
+			if (k == riu->sge_cnt && rsize > 0) {
+				++riu;
+				sge = riu->sge;
+				k = 0;
+			} else if (rsize > 0)
+				++sge;
+		}
+	}
+
+	return 0;
+
+free_mem:
+	srpt_unmap_sg_to_ib_sge(ch, ioctx);
+
+	return -ENOMEM;
+}
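+
+/*
+ * Worked example for the rdma_iu sizing in srpt_map_sg_to_ib_sge() above
+ * (illustration only, assuming SRPT_DEF_SG_PER_WQE == 16): a command whose
+ * data buffer maps to count == 40 DMA segments and that is described by
+ * n_rbuf == 2 remote buffers needs at most
+ *
+ *	nrdma = count / SRPT_DEF_SG_PER_WQE + n_rbuf = 40 / 16 + 2 = 4
+ *
+ * rdma_iu entries: each remote buffer needs at least one RDMA WR, and each
+ * group of SRPT_DEF_SG_PER_WQE additional DMA segments forces an extra WR
+ * because a single WR carries at most that many scatter/gather entries.
+ */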
+
+/**
+ * srpt_unmap_sg_to_ib_sge() - Unmap an IB SGE list.
+ */
+static void srpt_unmap_sg_to_ib_sge(struct srpt_rdma_ch *ch,
+				    struct srpt_send_ioctx *ioctx)
+{
+	struct scst_cmd *scmnd;
+	struct scatterlist *sg;
+	scst_data_direction dir;
+
+	EXTRACHECKS_BUG_ON(!ch);
+	EXTRACHECKS_BUG_ON(!ioctx);
+	EXTRACHECKS_BUG_ON(ioctx->n_rdma && !ioctx->rdma_ius);
+
+	while (ioctx->n_rdma)
+		kfree(ioctx->rdma_ius[--ioctx->n_rdma].sge);
+
+	kfree(ioctx->rdma_ius);
+	ioctx->rdma_ius = NULL;
+
+	if (ioctx->mapped_sg_count) {
+		scmnd = ioctx->scmnd;
+		EXTRACHECKS_BUG_ON(!scmnd);
+		EXTRACHECKS_WARN_ON(ioctx->scmnd != scmnd);
+		EXTRACHECKS_WARN_ON(ioctx != scst_cmd_get_tgt_priv(scmnd));
+		sg = ioctx->sg;
+		EXTRACHECKS_WARN_ON(!sg);
+		dir = ioctx->dir;
+		EXTRACHECKS_BUG_ON(dir == SCST_DATA_NONE);
+		ib_dma_unmap_sg(ch->sport->sdev->device, sg, ioctx->sg_cnt,
+				scst_to_tgt_dma_dir(dir));
+		ioctx->mapped_sg_count = 0;
+	}
+}
+
+/**
+ * srpt_perform_rdmas() - Perform IB RDMA.
+ *
+ * Returns zero upon success or a negative number upon failure.
+ */
+static int srpt_perform_rdmas(struct srpt_rdma_ch *ch,
+			      struct srpt_send_ioctx *ioctx,
+			      scst_data_direction dir)
+{
+	struct ib_send_wr wr;
+	struct ib_send_wr *bad_wr;
+	struct rdma_iu *riu;
+	int i;
+	int ret;
+	int sq_wr_avail;
+
+	if (dir == SCST_DATA_WRITE) {
+		ret = -ENOMEM;
+		sq_wr_avail = atomic_sub_return(ioctx->n_rdma,
+						 &ch->sq_wr_avail);
+		if (sq_wr_avail < 0) {
+			PRINT_WARNING("IB send queue full (needed %d)",
+				      ioctx->n_rdma);
+			goto out;
+		}
+	}
+
+	ret = 0;
+	riu = ioctx->rdma_ius;
+	memset(&wr, 0, sizeof wr);
+
+	for (i = 0; i < ioctx->n_rdma; ++i, ++riu) {
+		if (dir == SCST_DATA_READ) {
+			wr.opcode = IB_WR_RDMA_WRITE;
+			wr.wr_id = encode_wr_id(IB_WC_RDMA_WRITE,
+						ioctx->ioctx.index);
+		} else {
+			wr.opcode = IB_WR_RDMA_READ;
+			wr.wr_id = encode_wr_id(IB_WC_RDMA_READ,
+						ioctx->ioctx.index);
+		}
+		wr.next = NULL;
+		wr.wr.rdma.remote_addr = riu->raddr;
+		wr.wr.rdma.rkey = riu->rkey;
+		wr.num_sge = riu->sge_cnt;
+		wr.sg_list = riu->sge;
+
+		/*
+		 * Request a completion event only for the last RDMA read WR:
+		 * on a reliable connection WRs complete in order, so the
+		 * last completion implies that all previous reads finished.
+		 * RDMA writes are left unsignaled since the response SEND
+		 * posted afterwards is ordered behind them.
+		 */
+		if (i == (ioctx->n_rdma - 1) && dir == SCST_DATA_WRITE)
+			wr.send_flags = IB_SEND_SIGNALED;
+
+		ret = ib_post_send(ch->qp, &wr, &bad_wr);
+		if (ret)
+			goto out;
+	}
+
+out:
+	if (unlikely(dir == SCST_DATA_WRITE && ret < 0))
+		atomic_add(ioctx->n_rdma, &ch->sq_wr_avail);
+	return ret;
+}
+
+/**
+ * srpt_xfer_data() - Start a data transfer between initiator and target.
+ *
+ * Returns an SCST_TGT_RES_... status code.
+ *
+ * Note: Must not block.
+ */
+static int srpt_xfer_data(struct srpt_rdma_ch *ch,
+			  struct srpt_send_ioctx *ioctx,
+			  struct scst_cmd *scmnd)
+{
+	int ret;
+
+	ret = srpt_map_sg_to_ib_sge(ch, ioctx, scmnd);
+	if (ret) {
+		PRINT_ERROR("%s[%d] ret=%d", __func__, __LINE__, ret);
+		ret = SCST_TGT_RES_QUEUE_FULL;
+		goto out;
+	}
+
+	ret = srpt_perform_rdmas(ch, ioctx, scst_cmd_get_data_direction(scmnd));
+	if (ret) {
+		if (ret == -EAGAIN || ret == -ENOMEM) {
+			PRINT_INFO("%s[%d] queue full -- ret=%d",
+				   __func__, __LINE__, ret);
+			ret = SCST_TGT_RES_QUEUE_FULL;
+		} else {
+			PRINT_ERROR("%s[%d] fatal error -- ret=%d",
+				    __func__, __LINE__, ret);
+			ret = SCST_TGT_RES_FATAL_ERROR;
+		}
+		goto out_unmap;
+	}
+
+	ret = SCST_TGT_RES_SUCCESS;
+
+out:
+	return ret;
+out_unmap:
+	srpt_unmap_sg_to_ib_sge(ch, ioctx);
+	goto out;
+}
+
+/**
+ * srpt_pending_cmd_timeout() - SCST command HCA processing timeout callback.
+ *
+ * Called by the SCST core if no IB completion notification has been received
+ * within max_hw_pending_time seconds.
+ */
+static void srpt_pending_cmd_timeout(struct scst_cmd *scmnd)
+{
+	struct srpt_send_ioctx *ioctx;
+	enum srpt_command_state state;
+
+	ioctx = scst_cmd_get_tgt_priv(scmnd);
+	BUG_ON(!ioctx);
+
+	state = srpt_get_cmd_state(ioctx);
+	switch (state) {
+	case SRPT_STATE_NEW:
+	case SRPT_STATE_DATA_IN:
+	case SRPT_STATE_DONE:
+		/*
+		 * srpt_pending_cmd_timeout() should never be invoked for
+		 * commands in this state.
+		 */
+		PRINT_ERROR("Processing SCST command %p (SRPT state %d) took"
+			    " too long -- aborting", scmnd, state);
+		break;
+	case SRPT_STATE_NEED_DATA:
+	case SRPT_STATE_CMD_RSP_SENT:
+	case SRPT_STATE_MGMT_RSP_SENT:
+	default:
+		PRINT_ERROR("Command %p: IB completion for idx %u has not"
+			    " been received in time (SRPT command state %d)",
+			    scmnd, ioctx->ioctx.index, state);
+		break;
+	}
+
+	srpt_abort_scst_cmd(ioctx, SCST_CONTEXT_SAME);
+}
+
+/**
+ * srpt_rdy_to_xfer() - Start a data transfer from initiator to target.
+ *
+ * Called by the SCST core to transfer data from the initiator to the target
+ * (SCST_DATA_WRITE). Must not block.
+ */
+static int srpt_rdy_to_xfer(struct scst_cmd *scmnd)
+{
+	struct srpt_rdma_ch *ch;
+	struct srpt_send_ioctx *ioctx;
+	enum srpt_command_state new_state;
+	enum rdma_ch_state ch_state;
+	int ret;
+
+	ioctx = scst_cmd_get_tgt_priv(scmnd);
+	BUG_ON(!ioctx);
+
+	new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_NEED_DATA);
+	WARN_ON(new_state == SRPT_STATE_DONE);
+
+	ch = ioctx->ch;
+	WARN_ON(ch != scst_sess_get_tgt_priv(scst_cmd_get_session(scmnd)));
+	BUG_ON(!ch);
+
+	ch_state = atomic_read(&ch->state);
+	if (ch_state == RDMA_CHANNEL_DISCONNECTING) {
+		TRACE_DBG("cmd with tag %lld: channel disconnecting",
+			  scst_cmd_get_tag(scmnd));
+		srpt_set_cmd_state(ioctx, SRPT_STATE_DATA_IN);
+		ret = SCST_TGT_RES_FATAL_ERROR;
+		goto out;
+	} else if (ch_state == RDMA_CHANNEL_CONNECTING) {
+		ret = SCST_TGT_RES_QUEUE_FULL;
+		goto out;
+	}
+	ret = srpt_xfer_data(ch, ioctx, scmnd);
+
+out:
+	return ret;
+}
+
+/**
+ * srpt_xmit_response() - Transmits the response to a SCSI command.
+ *
+ * Callback function called by the SCST core. Must not block. Must ensure that
+ * scst_tgt_cmd_done() will get invoked when returning SCST_TGT_RES_SUCCESS.
+ */
+static int srpt_xmit_response(struct scst_cmd *scmnd)
+{
+	struct srpt_rdma_ch *ch;
+	struct srpt_send_ioctx *ioctx;
+	enum srpt_command_state state;
+	int ret;
+	scst_data_direction dir;
+	int resp_len;
+
+	ret = SCST_TGT_RES_SUCCESS;
+
+	ioctx = scst_cmd_get_tgt_priv(scmnd);
+	BUG_ON(!ioctx);
+
+	ch = scst_sess_get_tgt_priv(scst_cmd_get_session(scmnd));
+	BUG_ON(!ch);
+
+	state = srpt_test_and_set_cmd_state(ioctx, SRPT_STATE_NEW,
+					    SRPT_STATE_CMD_RSP_SENT);
+	if (state != SRPT_STATE_NEW) {
+		state = srpt_test_and_set_cmd_state(ioctx, SRPT_STATE_DATA_IN,
+						    SRPT_STATE_CMD_RSP_SENT);
+		if (state != SRPT_STATE_DATA_IN)
+			PRINT_ERROR("Unexpected command state %d",
+				    srpt_get_cmd_state(ioctx));
+	}
+
+	if (unlikely(scst_cmd_aborted(scmnd))) {
+		atomic_inc(&ch->req_lim_delta);
+		srpt_abort_scst_cmd(ioctx, SCST_CONTEXT_SAME);
+		goto out;
+	}
+
+	EXTRACHECKS_BUG_ON(scst_cmd_atomic(scmnd));
+
+	dir = scst_cmd_get_data_direction(scmnd);
+
+	/* For read commands, transfer the data to the initiator. */
+	if (dir == SCST_DATA_READ
+	    && scst_cmd_get_adjusted_resp_data_len(scmnd)) {
+		ret = srpt_xfer_data(ch, ioctx, scmnd);
+		if (ret == SCST_TGT_RES_QUEUE_FULL) {
+			srpt_set_cmd_state(ioctx, state);
+			PRINT_WARNING("xfer_data failed for tag %llu"
+				      " - retrying", scst_cmd_get_tag(scmnd));
+			goto out;
+		} else if (ret != SCST_TGT_RES_SUCCESS) {
+			PRINT_ERROR("xfer_data failed for tag %llu",
+				    scst_cmd_get_tag(scmnd));
+			goto out;
+		}
+	}
+
+	atomic_inc(&ch->req_lim);
+
+	resp_len = srpt_build_cmd_rsp(ch, ioctx,
+				      scst_cmd_get_tag(scmnd),
+				      scst_cmd_get_status(scmnd),
+				      scst_cmd_get_sense_buffer(scmnd),
+				      scst_cmd_get_sense_buffer_len(scmnd));
+
+	if (srpt_post_send(ch, ioctx, resp_len)) {
+		srpt_unmap_sg_to_ib_sge(ch, ioctx);
+		srpt_set_cmd_state(ioctx, state);
+		atomic_dec(&ch->req_lim);
+		PRINT_WARNING("sending response failed for tag %llu - retrying",
+			      scst_cmd_get_tag(scmnd));
+		ret = SCST_TGT_RES_QUEUE_FULL;
+	}
+
+out:
+	return ret;
+}
+
+/**
+ * srpt_tsk_mgmt_done() - SCST callback function that sends back the response
+ * for a task management request.
+ *
+ * Must not block.
+ */
+static void srpt_tsk_mgmt_done(struct scst_mgmt_cmd *mcmnd)
+{
+	struct srpt_rdma_ch *ch;
+	struct srpt_mgmt_ioctx *mgmt_ioctx;
+	struct srpt_send_ioctx *ioctx;
+	enum srpt_command_state new_state;
+	int rsp_len;
+
+	mgmt_ioctx = scst_mgmt_cmd_get_tgt_priv(mcmnd);
+	BUG_ON(!mgmt_ioctx);
+
+	ioctx = mgmt_ioctx->ioctx;
+	BUG_ON(!ioctx);
+
+	ch = ioctx->ch;
+	BUG_ON(!ch);
+
+	TRACE_DBG("%s: tsk_mgmt_done for tag= %lld status=%d",
+		  __func__, mgmt_ioctx->tag, scst_mgmt_cmd_get_status(mcmnd));
+
+	WARN_ON(in_irq());
+
+	new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_MGMT_RSP_SENT);
+	WARN_ON(new_state == SRPT_STATE_DONE);
+
+	atomic_inc(&ch->req_lim);
+
+	rsp_len = srpt_build_tskmgmt_rsp(ch, ioctx,
+					 scst_to_srp_tsk_mgmt_status(
+					 scst_mgmt_cmd_get_status(mcmnd)),
+					 mgmt_ioctx->tag);
+	/*
+	 * Note: the srpt_post_send() call below sends the task management
+	 * response asynchronously. It is possible that the SCST core has
+	 * already freed the struct scst_mgmt_cmd structure before the
+	 * response is sent. This is fine however.
+	 */
+	if (srpt_post_send(ch, ioctx, rsp_len)) {
+		PRINT_ERROR("%s", "Sending SRP_RSP response failed.");
+		srpt_set_cmd_state(ioctx, SRPT_STATE_DONE);
+		srpt_put_send_ioctx(ioctx);
+		atomic_dec(&ch->req_lim);
+	}
+
+	scst_mgmt_cmd_set_tgt_priv(mcmnd, NULL);
+
+	kfree(mgmt_ioctx);
+}
+
+/**
+ * srpt_get_initiator_port_transport_id() - SCST TransportID callback function.
+ *
+ * See also SPC-3, section 7.5.4.5, TransportID for initiator ports using SRP.
+ */
+static int srpt_get_initiator_port_transport_id(struct scst_session *scst_sess,
+						uint8_t **transport_id)
+{
+	struct srpt_rdma_ch *ch;
+	struct spc_rdma_transport_id {
+		uint8_t protocol_identifier;
+		uint8_t reserved[7];
+		uint8_t i_port_id[16];
+	};
+	struct spc_rdma_transport_id *tr_id;
+	int res;
+
+	if (!scst_sess) {
+		res = SCSI_TRANSPORTID_PROTOCOLID_SRP;
+		goto out;
+	}
+
+	ch = scst_sess_get_tgt_priv(scst_sess);
+	BUG_ON(!ch);
+
+	BUILD_BUG_ON(sizeof(*tr_id) != 24);
+
+	tr_id = kzalloc(sizeof(struct spc_rdma_transport_id), GFP_KERNEL);
+	if (!tr_id) {
+		PRINT_ERROR("%s", "Allocation of TransportID failed");
+		res = -ENOMEM;
+		goto out;
+	}
+
+	res = 0;
+	tr_id->protocol_identifier = SCSI_TRANSPORTID_PROTOCOLID_SRP;
+	memcpy(tr_id->i_port_id, ch->i_port_id, sizeof(ch->i_port_id));
+
+	*transport_id = (uint8_t *)tr_id;
+
+out:
+	return res;
+}
+
+/**
+ * srpt_on_free_cmd() - Free command-private data.
+ *
+ * Called by the SCST core. May be called in IRQ context.
+ */
+static void srpt_on_free_cmd(struct scst_cmd *scmnd)
+{
+}
+
+static void srpt_refresh_port_work(struct work_struct *work)
+{
+	struct srpt_port *sport = container_of(work, struct srpt_port, work);
+
+	srpt_refresh_port(sport);
+}
+
+/**
+ * srpt_detect() - Returns the number of target adapters.
+ *
+ * Callback function called by the SCST core.
+ */
+static int srpt_detect(struct scst_tgt_template *tp)
+{
+	int device_count;
+
+	device_count = atomic_read(&srpt_device_count);
+
+	return device_count;
+}
+
+/**
+ * srpt_release() - Free the resources associated with an SCST target.
+ *
+ * Callback function called by the SCST core from scst_unregister_target().
+ */
+static int srpt_release(struct scst_tgt *scst_tgt)
+{
+	struct srpt_device *sdev = scst_tgt_get_tgt_priv(scst_tgt);
+	struct srpt_rdma_ch *ch;
+
+	EXTRACHECKS_WARN_ON_ONCE(irqs_disabled());
+
+	BUG_ON(!scst_tgt);
+	if (WARN_ON(!sdev))
+		return -ENODEV;
+
+	spin_lock_irq(&sdev->spinlock);
+	while (!list_empty(&sdev->rch_list)) {
+		ch = list_first_entry(&sdev->rch_list, typeof(*ch), list);
+		srpt_unregister_channel(ch);
+	}
+	spin_unlock_irq(&sdev->spinlock);
+
+	scst_tgt_set_tgt_priv(scst_tgt, NULL);
+
+	return 0;
+}
+
+/**
+ * srpt_get_scsi_transport_version() - Returns the SCSI transport version.
+ *
+ * This function is called from scst_pres.c, the code that implements
+ * persistent reservation support.
+ */
+static uint16_t srpt_get_scsi_transport_version(struct scst_tgt *scst_tgt)
+{
+	return 0x0940; /* SRP */
+}
+
+static ssize_t show_login_info(struct device *dev,
+			       struct device_attribute *attr, char *buf)
+{
+	struct scst_tgt *scst_tgt;
+	struct srpt_device *sdev;
+	struct srpt_port *sport;
+	int i;
+	int len;
+
+	scst_tgt = scst_dev_to_tgt(dev);
+	sdev = scst_tgt_get_tgt_priv(scst_tgt);
+	len = 0;
+	for (i = 0; i < sdev->device->phys_port_cnt; i++) {
+		sport = &sdev->port[i];
+
+		len += sprintf(buf + len,
+			       "tid_ext=%016llx,ioc_guid=%016llx,pkey=ffff,"
+			       "dgid=%04x%04x%04x%04x%04x%04x%04x%04x,"
+			       "service_id=%016llx\n",
+			       srpt_service_guid,
+			       srpt_service_guid,
+			       be16_to_cpu(((__be16 *) sport->gid.raw)[0]),
+			       be16_to_cpu(((__be16 *) sport->gid.raw)[1]),
+			       be16_to_cpu(((__be16 *) sport->gid.raw)[2]),
+			       be16_to_cpu(((__be16 *) sport->gid.raw)[3]),
+			       be16_to_cpu(((__be16 *) sport->gid.raw)[4]),
+			       be16_to_cpu(((__be16 *) sport->gid.raw)[5]),
+			       be16_to_cpu(((__be16 *) sport->gid.raw)[6]),
+			       be16_to_cpu(((__be16 *) sport->gid.raw)[7]),
+			       srpt_service_guid);
+	}
+
+	return len;
+}
+
+static struct device_attribute srpt_show_login_info_attr =
+	__ATTR(login_info, S_IRUGO, show_login_info, NULL);
+
+static const struct device_attribute *srpt_tgt_attrs[] = {
+	&srpt_show_login_info_attr,
+	NULL
+};
+
+static ssize_t show_req_lim(struct kobject *kobj,
+			    struct kobj_attribute *attr, char *buf)
+{
+	struct scst_session *scst_sess;
+	struct srpt_rdma_ch *ch;
+
+	scst_sess = scst_kobj_to_sess(kobj);
+	ch = scst_sess_get_tgt_priv(scst_sess);
+	if (!ch)
+		return -ENOENT;
+	return sprintf(buf, "%d\n", atomic_read(&ch->req_lim));
+}
+
+static ssize_t show_req_lim_delta(struct kobject *kobj,
+				  struct kobj_attribute *attr, char *buf)
+{
+	struct scst_session *scst_sess;
+	struct srpt_rdma_ch *ch;
+
+	scst_sess = scst_kobj_to_sess(kobj);
+	ch = scst_sess_get_tgt_priv(scst_sess);
+	if (!ch)
+		return -ENOENT;
+	return sprintf(buf, "%d\n", atomic_read(&ch->req_lim_delta));
+}
+
+static const struct kobj_attribute srpt_req_lim_attr =
+	__ATTR(req_lim,       S_IRUGO, show_req_lim,       NULL);
+static const struct kobj_attribute srpt_req_lim_delta_attr =
+	__ATTR(req_lim_delta, S_IRUGO, show_req_lim_delta, NULL);
+
+static const struct attribute *srpt_sess_attrs[] = {
+	&srpt_req_lim_attr.attr,
+	&srpt_req_lim_delta_attr.attr,
+	NULL
+};
+
+/* SCST target template for the SRP target implementation. */
+static struct scst_tgt_template srpt_template = {
+	.name				 = DRV_NAME,
+	.owner				 = THIS_MODULE,
+	.sg_tablesize			 = SRPT_DEF_SG_TABLESIZE,
+	.max_hw_pending_time		 = 60/*seconds*/,
+	.enable_target			 = srpt_enable_target,
+	.is_target_enabled		 = srpt_is_target_enabled,
+	.tgt_attrs			 = srpt_tgt_attrs,
+	.sess_attrs			 = srpt_sess_attrs,
+#if defined(CONFIG_SCST_DEBUG) || defined(CONFIG_SCST_TRACING)
+	.default_trace_flags		 = DEFAULT_SRPT_TRACE_FLAGS,
+	.trace_flags			 = &trace_flag,
+#endif
+	.detect				 = srpt_detect,
+	.release			 = srpt_release,
+	.xmit_response			 = srpt_xmit_response,
+	.rdy_to_xfer			 = srpt_rdy_to_xfer,
+	.on_hw_pending_cmd_timeout	 = srpt_pending_cmd_timeout,
+	.on_free_cmd			 = srpt_on_free_cmd,
+	.task_mgmt_fn_done		 = srpt_tsk_mgmt_done,
+	.get_initiator_port_transport_id = srpt_get_initiator_port_transport_id,
+	.get_scsi_transport_version	 = srpt_get_scsi_transport_version,
+};
+
+/**
+ * srpt_add_one() - Infiniband device addition callback function.
+ */
+static void srpt_add_one(struct ib_device *device)
+{
+	struct srpt_device *sdev;
+	struct srpt_port *sport;
+	struct ib_srq_init_attr srq_attr;
+	int i;
+
+	TRACE_DBG("device = %p, device->dma_ops = %p", device, device->dma_ops);
+
+	sdev = kzalloc(sizeof *sdev, GFP_KERNEL);
+	if (!sdev)
+		goto err;
+
+	sdev->device = device;
+	INIT_LIST_HEAD(&sdev->rch_list);
+	spin_lock_init(&sdev->spinlock);
+
+	sdev->scst_tgt = scst_register_target(&srpt_template, NULL);
+	if (!sdev->scst_tgt) {
+		PRINT_ERROR("SCST registration failed for %s.",
+			    sdev->device->name);
+		goto free_dev;
+	}
+
+	scst_tgt_set_tgt_priv(sdev->scst_tgt, sdev);
+
+	if (ib_query_device(device, &sdev->dev_attr))
+		goto unregister_tgt;
+
+	sdev->pd = ib_alloc_pd(device);
+	if (IS_ERR(sdev->pd))
+		goto unregister_tgt;
+
+	sdev->mr = ib_get_dma_mr(sdev->pd, IB_ACCESS_LOCAL_WRITE);
+	if (IS_ERR(sdev->mr))
+		goto err_pd;
+
+	sdev->srq_size = min(srpt_srq_size, sdev->dev_attr.max_srq_wr);
+
+	srq_attr.event_handler = srpt_srq_event;
+	srq_attr.srq_context = (void *)sdev;
+	srq_attr.attr.max_wr = sdev->srq_size;
+	srq_attr.attr.max_sge = 1;
+	srq_attr.attr.srq_limit = 0;
+
+	sdev->srq = ib_create_srq(sdev->pd, &srq_attr);
+	if (IS_ERR(sdev->srq))
+		goto err_mr;
+
+	TRACE_DBG("%s: create SRQ #wr= %d max_allow=%d dev= %s", __func__,
+		  sdev->srq_size, sdev->dev_attr.max_srq_wr, device->name);
+
+	if (!srpt_service_guid)
+		srpt_service_guid = be64_to_cpu(device->node_guid);
+
+	sdev->cm_id = ib_create_cm_id(device, srpt_cm_handler, sdev);
+	if (IS_ERR(sdev->cm_id))
+		goto err_srq;
+
+	/* print out target login information */
+	TRACE_DBG("Target login info: id_ext=%016llx,"
+		  "ioc_guid=%016llx,pkey=ffff,service_id=%016llx",
+		  srpt_service_guid, srpt_service_guid, srpt_service_guid);
+
+	/*
+	 * We do not have a consistent service_id (i.e. also the id_ext of the
+	 * target_id) to identify this target. We currently use the GUID of
+	 * the first HCA in the system as the service_id; therefore the
+	 * target_id will change if this HCA goes bad and is replaced by a
+	 * different HCA.
+	 */
+	if (ib_cm_listen(sdev->cm_id, cpu_to_be64(srpt_service_guid), 0, NULL))
+		goto err_cm;
+
+	INIT_IB_EVENT_HANDLER(&sdev->event_handler, sdev->device,
+			      srpt_event_handler);
+	if (ib_register_event_handler(&sdev->event_handler))
+		goto err_cm;
+
+	sdev->ioctx_ring = (struct srpt_recv_ioctx **)
+		srpt_alloc_ioctx_ring(sdev, sdev->srq_size,
+				      sizeof(*sdev->ioctx_ring[0]),
+				      srp_max_req_size, DMA_FROM_DEVICE);
+	if (!sdev->ioctx_ring)
+		goto err_event;
+
+	for (i = 0; i < sdev->srq_size; ++i)
+		srpt_post_recv(sdev, sdev->ioctx_ring[i]);
+
+	WARN_ON(sdev->device->phys_port_cnt > ARRAY_SIZE(sdev->port));
+
+	for (i = 1; i <= sdev->device->phys_port_cnt; i++) {
+		sport = &sdev->port[i - 1];
+		sport->sdev = sdev;
+		sport->port = i;
+		INIT_WORK(&sport->work, srpt_refresh_port_work);
+		if (srpt_refresh_port(sport)) {
+			PRINT_ERROR("MAD registration failed for %s-%d.",
+				    sdev->device->name, i);
+			goto err_ring;
+		}
+	}
+
+	atomic_inc(&srpt_device_count);
+out:
+	ib_set_client_data(device, &srpt_client, sdev);
+	return;
+
+err_ring:
+	srpt_free_ioctx_ring((struct srpt_ioctx **)sdev->ioctx_ring, sdev,
+			     sdev->srq_size, srp_max_req_size,
+			     DMA_FROM_DEVICE);
+err_event:
+	ib_unregister_event_handler(&sdev->event_handler);
+err_cm:
+	ib_destroy_cm_id(sdev->cm_id);
+err_srq:
+	ib_destroy_srq(sdev->srq);
+err_mr:
+	ib_dereg_mr(sdev->mr);
+err_pd:
+	ib_dealloc_pd(sdev->pd);
+unregister_tgt:
+	scst_unregister_target(sdev->scst_tgt);
+free_dev:
+	kfree(sdev);
+err:
+	sdev = NULL;
+	PRINT_INFO("%s(%s) failed.", __func__, device->name);
+	goto out;
+}
+
+/**
+ * srpt_remove_one() - InfiniBand device removal callback function.
+ */
+static void srpt_remove_one(struct ib_device *device)
+{
+	int i;
+	struct srpt_device *sdev;
+
+	sdev = ib_get_client_data(device, &srpt_client);
+	if (!sdev) {
+		PRINT_INFO("%s(%s): nothing to do.", __func__, device->name);
+		return;
+	}
+
+	srpt_unregister_mad_agent(sdev);
+
+	ib_unregister_event_handler(&sdev->event_handler);
+
+	/* Cancel any work queued by the just unregistered IB event handler. */
+	for (i = 0; i < sdev->device->phys_port_cnt; i++)
+		cancel_work_sync(&sdev->port[i].work);
+
+	ib_destroy_cm_id(sdev->cm_id);
+	ib_destroy_srq(sdev->srq);
+	ib_dereg_mr(sdev->mr);
+	ib_dealloc_pd(sdev->pd);
+
+	/*
+	 * Unregistering an SCST target must happen after destroying sdev->cm_id
+	 * such that no new SRP_LOGIN_REQ information units can arrive while
+	 * destroying the SCST target.
+	 */
+	scst_unregister_target(sdev->scst_tgt);
+	sdev->scst_tgt = NULL;
+
+	srpt_free_ioctx_ring((struct srpt_ioctx **)sdev->ioctx_ring, sdev,
+			     sdev->srq_size, srp_max_req_size, DMA_FROM_DEVICE);
+	sdev->ioctx_ring = NULL;
+	kfree(sdev);
+}
+
+/**
+ * srpt_init_module() - Kernel module initialization.
+ *
+ * Note: Since ib_register_client() registers callback functions, and since at
+ * least one of these callback functions (srpt_add_one()) calls SCST functions,
+ * the SCST target template must be registered before ib_register_client() is
+ * called.
+ */
+static int __init srpt_init_module(void)
+{
+	int ret;
+
+	ret = -EINVAL;
+	if (srp_max_req_size < MIN_MAX_REQ_SIZE) {
+		PRINT_ERROR("invalid value %d for kernel module parameter"
+			    " srp_max_req_size -- must be at least %d.",
+			    srp_max_req_size,
+			    MIN_MAX_REQ_SIZE);
+		goto out;
+	}
+
+	if (srp_max_rsp_size < MIN_MAX_RSP_SIZE) {
+		PRINT_ERROR("invalid value %d for kernel module parameter"
+			    " srp_max_rsp_size -- must be at least %d.",
+			    srp_max_rsp_size,
+			    MIN_MAX_RSP_SIZE);
+		goto out;
+	}
+
+	if (srpt_srq_size < MIN_SRPT_SRQ_SIZE
+	    || srpt_srq_size > MAX_SRPT_SRQ_SIZE) {
+		PRINT_ERROR("invalid value %d for kernel module parameter"
+			    " srpt_srq_size -- must be in the range [%d..%d].",
+			    srpt_srq_size, MIN_SRPT_SRQ_SIZE,
+			    MAX_SRPT_SRQ_SIZE);
+		goto out;
+	}
+
+	if (srpt_sq_size < MIN_SRPT_SQ_SIZE) {
+		PRINT_ERROR("invalid value %d for kernel module parameter"
+			    " srpt_sq_size -- must be at least %d.",
+			    srpt_sq_size, MIN_SRPT_SQ_SIZE);
+		goto out;
+	}
+
+	switch (thread) {
+	case MODE_ALL_IN_SIRQ:
+		/*
+		 * Process both IB completions and SCST commands in SIRQ
+		 * context. May lead to soft lockups and other scary behavior
+		 * under sufficient load.
+		 */
+		srpt_template.rdy_to_xfer_atomic = true;
+		break;
+	case MODE_IB_COMPLETION_IN_THREAD:
+		/*
+		 * Process IB completions in the kernel thread associated with
+		 * the RDMA channel, and process SCST commands in the kernel
+		 * threads created by the SCST core.
+		 */
+		srpt_template.rdy_to_xfer_atomic = false;
+		break;
+	case MODE_IB_COMPLETION_IN_SIRQ:
+	default:
+		/*
+		 * Process IB completions in SIRQ context and SCST commands in
+		 * the kernel threads created by the SCST core.
+		 */
+		srpt_template.rdy_to_xfer_atomic = false;
+		break;
+	}
+
+	ret = scst_register_target_template(&srpt_template);
+	if (ret < 0) {
+		PRINT_ERROR("%s", "couldn't register with scst");
+		ret = -ENODEV;
+		goto out;
+	}
+
+	ret = ib_register_client(&srpt_client);
+	if (ret) {
+		PRINT_ERROR("%s", "couldn't register IB client");
+		goto out_unregister_target;
+	}
+
+	return 0;
+
+out_unregister_target:
+	scst_unregister_target_template(&srpt_template);
+out:
+	return ret;
+}
+
+static void __exit srpt_cleanup_module(void)
+{
+	ib_unregister_client(&srpt_client);
+	scst_unregister_target_template(&srpt_template);
+}
+
+module_init(srpt_init_module);
+module_exit(srpt_cleanup_module);
+
+/*
+ * Local variables:
+ * c-basic-offset:   8
+ * indent-tabs-mode: t
+ * End:
+ */
diff --git a/drivers/scst/srpt/ib_srpt.h b/drivers/scst/srpt/ib_srpt.h
new file mode 100644
index 0000000..1541add
--- /dev/null
+++ b/drivers/scst/srpt/ib_srpt.h
@@ -0,0 +1,352 @@
+/*
+ * Copyright (c) 2006 - 2009 Mellanox Technology Inc.  All rights reserved.
+ * Copyright (C) 2009 - 2010 Bart Van Assche <bart.vanassche@gmail.com>
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+
+#ifndef IB_SRPT_H
+#define IB_SRPT_H
+
+#include <linux/version.h>
+#include <linux/types.h>
+#include <linux/list.h>
+
+#include <rdma/ib_verbs.h>
+#include <rdma/ib_sa.h>
+#include <rdma/ib_cm.h>
+
+#include <scsi/srp.h>
+
+#include <scst/scst.h>
+
+#include "ib_dm_mad.h"
+
+/*
+ * The prefix that the ServiceName field must start with in the device
+ * management ServiceEntries attribute pair. See also the SRP r16a document.
+ */
+#define SRP_SERVICE_NAME_PREFIX		"SRP.T10:"
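+/* Example ServiceName value (assumed format): "SRP.T10:0002c90200226cf4". */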
+
+enum {
+	/*
+	 * SRP IOControllerProfile attributes for SRP target ports that have
+	 * not been defined in <scsi/srp.h>. Source: section B.7, table B.7
+	 * in the SRP r16a document.
+	 */
+	SRP_PROTOCOL = 0x0108,
+	SRP_PROTOCOL_VERSION = 0x0001,
+	SRP_IO_SUBCLASS = 0x609e,
+	SRP_SEND_TO_IOC = 0x01,
+	SRP_SEND_FROM_IOC = 0x02,
+	SRP_RDMA_READ_FROM_IOC = 0x08,
+	SRP_RDMA_WRITE_FROM_IOC = 0x20,
+
+	/*
+	 * srp_login_cmd.req_flags bitmasks. See also table 9 in the SRP r16a
+	 * document.
+	 */
+	SRP_MTCH_ACTION = 0x03, /* MULTI-CHANNEL ACTION */
+	SRP_LOSOLNT = 0x10, /* logout solicited notification */
+	SRP_CRSOLNT = 0x20, /* credit request solicited notification */
+	SRP_AESOLNT = 0x40, /* asynchronous event solicited notification */
+
+	/*
+	 * srp_cmd.sol_not / srp_tsk_mgmt.sol_not bitmasks. See also tables
+	 * 18 and 20 in the T10 r16a document.
+	 */
+	SRP_SCSOLNT = 0x02, /* SCSOLNT = successful solicited notification */
+	SRP_UCSOLNT = 0x04, /* UCSOLNT = unsuccessful solicited notification */
+
+	/*
+	 * srp_rsp.sol_not / srp_t_logout.sol_not bitmasks. See also tables
+	 * 16 and 22 in the T10 r16a document.
+	 */
+	SRP_SOLNT = 0x01, /* SOLNT = solicited notification */
+
+	/* See also table 24 in the T10 r16a document. */
+	SRP_TSK_MGMT_SUCCESS = 0x00,
+	SRP_TSK_MGMT_FUNC_NOT_SUPP = 0x04,
+	SRP_TSK_MGMT_FAILED = 0x05,
+
+	/* See also table 21 in the T10 r16a document. */
+	SRP_CMD_SIMPLE_Q = 0x0,
+	SRP_CMD_HEAD_OF_Q = 0x1,
+	SRP_CMD_ORDERED_Q = 0x2,
+	SRP_CMD_ACA = 0x4,
+
+	SRP_LOGIN_RSP_MULTICHAN_NO_CHAN = 0x0,
+	SRP_LOGIN_RSP_MULTICHAN_TERMINATED = 0x1,
+	SRP_LOGIN_RSP_MULTICHAN_MAINTAINED = 0x2,
+
+	SRPT_DEF_SG_TABLESIZE = 128,
+	SRPT_DEF_SG_PER_WQE = 16,
+
+	MIN_SRPT_SQ_SIZE = 16,
+	DEF_SRPT_SQ_SIZE = 4096,
+	SRPT_RQ_SIZE = 128,
+	MIN_SRPT_SRQ_SIZE = 4,
+	DEFAULT_SRPT_SRQ_SIZE = 4095,
+	MAX_SRPT_SRQ_SIZE = 65535,
+
+	MIN_MAX_REQ_SIZE = 996,
+	DEFAULT_MAX_REQ_SIZE
+		= sizeof(struct srp_cmd)/*48*/
+		+ sizeof(struct srp_indirect_buf)/*20*/
+		+ 128 * sizeof(struct srp_direct_buf)/*16*/,
+
+	MIN_MAX_RSP_SIZE = sizeof(struct srp_rsp)/*36*/ + 4,
+	DEFAULT_MAX_RSP_SIZE = 256, /* leaves 220 bytes for sense data */
+
+	DEFAULT_MAX_RDMA_SIZE = 65536,
+};
+
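+/*
+ * A 64-bit work request ID as used by this driver: the SRPT opcode is
+ * stored in the upper 32 bits and the index of the I/O context in its
+ * ioctx_ring array in the lower 32 bits.
+ */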
+static inline u64 encode_wr_id(u8 opcode, u32 idx)
+{ return ((u64)opcode << 32) | idx; }
+static inline u8 opcode_from_wr_id(u64 wr_id)
+{ return wr_id >> 32; }
+static inline u32 idx_from_wr_id(u64 wr_id)
+{ return (u32)wr_id; }
+
+struct rdma_iu {
+	u64 raddr;
+	u32 rkey;
+	struct ib_sge *sge;
+	u32 sge_cnt;
+	int mem_id;
+};
+
+/**
+ * enum srpt_command_state - SCSI command state managed by SRPT.
+ * @SRPT_STATE_NEW:           New command arrived and is being processed.
+ * @SRPT_STATE_NEED_DATA:     Processing a write or bidir command and waiting
+ *                            for data arrival.
+ * @SRPT_STATE_DATA_IN:       Data for the write or bidir command arrived and is
+ *                            being processed.
+ * @SRPT_STATE_CMD_RSP_SENT:  SRP_RSP for SRP_CMD has been sent.
+ * @SRPT_STATE_MGMT_RSP_SENT: SRP_RSP for SRP_TSK_MGMT has been sent.
+ * @SRPT_STATE_DONE:          Command processing finished successfully, command
+ *                            processing has been aborted or command processing
+ *                            failed.
+ */
+enum srpt_command_state {
+	SRPT_STATE_NEW = 0,
+	SRPT_STATE_NEED_DATA = 1,
+	SRPT_STATE_DATA_IN = 2,
+	SRPT_STATE_CMD_RSP_SENT = 3,
+	SRPT_STATE_MGMT_RSP_SENT = 4,
+	SRPT_STATE_DONE = 5,
+};
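+
+/*
+ * Typical progression, inferred from the state descriptions above: a write
+ * or bidir command moves NEW -> NEED_DATA -> DATA_IN -> CMD_RSP_SENT -> DONE,
+ * a read command skips the NEED_DATA and DATA_IN states, and a task
+ * management function moves NEW -> MGMT_RSP_SENT -> DONE. An aborted command
+ * may move to DONE from any state.
+ */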
+
+/**
+ * struct srpt_ioctx - Shared SRPT I/O context information.
+ * @buf:   Pointer to the buffer.
+ * @dma:   DMA address of the buffer.
+ * @index: Index of the I/O context in its ioctx_ring array.
+ */
+struct srpt_ioctx {
+	void			*buf;
+	dma_addr_t		dma;
+	uint32_t		index;
+};
+
+/**
+ * struct srpt_recv_ioctx - SRPT receive I/O context.
+ * @ioctx:     See above.
+ * @wait_list: Node for insertion in srpt_rdma_ch.cmd_wait_list.
+ */
+struct srpt_recv_ioctx {
+	struct srpt_ioctx	ioctx;
+	struct list_head	wait_list;
+};
+
+/**
+ * struct srpt_send_ioctx - SRPT send I/O context.
+ * @ioctx:     See above.
+ * @ch:        Channel on which this send I/O context is used.
+ * @free_list: Node that allows this struct to be linked into
+ *             srpt_rdma_ch.free_list.
+ * @scmnd:     SCST command associated with this I/O context.
+ * @dir:       Data direction of @scmnd.
+ * @state:     I/O context state. See also enum srpt_command_state.
+ */
+struct srpt_send_ioctx {
+	struct srpt_ioctx	ioctx;
+	struct srpt_rdma_ch	*ch;
+	struct rdma_iu		*rdma_ius;
+	struct srp_direct_buf	*rbufs;
+	struct srp_direct_buf	single_rbuf;
+	struct scatterlist	*sg;
+	struct list_head	free_list;
+	int			sg_cnt;
+	int			mapped_sg_count;
+	u16			n_rdma_ius;
+	u8			n_rdma;
+	u8			n_rbuf;
+
+	struct scst_cmd		*scmnd;
+	scst_data_direction	dir;
+	atomic_t		state;
+};
+
+/**
+ * struct srpt_mgmt_ioctx - SCST management command context information.
+ * @ioctx: SRPT I/O context associated with the management command.
+ * @tag:   SCSI tag of the management command.
+ */
+struct srpt_mgmt_ioctx {
+	struct srpt_send_ioctx	*ioctx;
+	u64			tag;
+};
+
+/**
+ * enum rdma_ch_state - SRP channel state.
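+ * @RDMA_CHANNEL_CONNECTING:    SRP_LOGIN_REQ has been received but the RTU
+ *                              event has not yet occurred (assumed; see also
+ *                              srpt_rdma_ch.cmd_wait_list).
+ * @RDMA_CHANNEL_LIVE:          The channel is fully operational.
+ * @RDMA_CHANNEL_DISCONNECTING: The channel is being torn down.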
+ */
+enum rdma_ch_state {
+	RDMA_CHANNEL_CONNECTING,
+	RDMA_CHANNEL_LIVE,
+	RDMA_CHANNEL_DISCONNECTING
+};
+
+/**
+ * struct srpt_rdma_ch - RDMA channel.
+ * @wait_queue:    Allows the kernel thread to wait for more work.
+ * @thread:        Kernel thread that processes the IB queues associated with
+ *                 the channel.
+ * @cm_id:         IB CM ID associated with the channel.
+ * @rq_size:       IB receive queue size.
+ * @processing_compl: number of IB completion handler invocations that are
+ *                 currently in progress.
+ * @qp:            IB queue pair used for communicating over this channel.
+ * @sq_wr_avail:   number of work requests available in the send queue.
+ * @cq:            IB completion queue for this channel.
+ * @sport:         pointer to the information of the HCA port used by this
+ *                 channel.
+ * @i_port_id:     128-bit initiator port identifier copied from SRP_LOGIN_REQ.
+ * @t_port_id:     128-bit target port identifier copied from SRP_LOGIN_REQ.
+ * @max_ti_iu_len: maximum target-to-initiator information unit length.
+ * @supports_cred_req: whether or not the initiator supports SRP_CRED_REQ.
+ * @req_lim:       request limit: maximum number of requests that may be sent
+ *                 by the initiator without having received a response.
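+ * @req_lim_delta: number of credits that have not yet been granted back to
+ *                 the initiator (assumed from its use together with @req_lim).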
+ * @state:         channel state. See also enum rdma_ch_state.
+ * @list:          node for insertion in the srpt_device.rch_list list.
+ * @cmd_wait_list: list of SCST commands that arrived before the RTU event. This
+ *                 list contains struct srpt_recv_ioctx elements and is
+ *                 protected against concurrent modification by the cm_id
+ *                 spinlock.
+ * @spinlock:      Protects free_list.
+ * @free_list:     Head of list with free send I/O contexts.
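+ * @ioctx_ring:    Ring of send I/O contexts available on this channel.
+ * @wc:            IB work completion array used when polling the completion
+ *                 queue.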
+ * @scst_sess:     SCST session information associated with this SRP channel.
+ * @sess_name:     SCST session name.
+ */
+struct srpt_rdma_ch {
+	wait_queue_head_t	wait_queue;
+	struct task_struct	*thread;
+	struct ib_cm_id		*cm_id;
+	struct ib_qp		*qp;
+	int			rq_size;
+	atomic_t		processing_compl;
+	struct ib_cq		*cq;
+	atomic_t		sq_wr_avail;
+	struct srpt_port	*sport;
+	u8			i_port_id[16];
+	u8			t_port_id[16];
+	int			max_ti_iu_len;
+	atomic_t		req_lim;
+	atomic_t		req_lim_delta;
+	spinlock_t		spinlock;
+	struct list_head	free_list;
+	struct srpt_send_ioctx	**ioctx_ring;
+	struct ib_wc		wc[16];
+	atomic_t		state;
+	struct list_head	list;
+	struct list_head	cmd_wait_list;
+
+	struct scst_session	*scst_sess;
+	u8			sess_name[36];
+};
+
+/**
+ * struct srpt_port - Information associated by SRPT with a single IB port.
+ * @sdev:      backpointer to the HCA information.
+ * @mad_agent: per-port management datagram processing information.
+ * @port:      one-based port number.
+ * @sm_lid:    cached value of the port's sm_lid.
+ * @lid:       cached value of the port's lid.
+ * @gid:       cached value of the port's gid.
+ * @work:      work structure for refreshing the aforementioned cached values.
+ */
+struct srpt_port {
+	struct srpt_device *sdev;
+	struct ib_mad_agent *mad_agent;
+	u8 port;
+	u16 sm_lid;
+	u16 lid;
+	union ib_gid gid;
+	struct work_struct work;
+};
+
+/**
+ * struct srpt_device - Information associated by SRPT with a single HCA.
+ * @device:        backpointer to the struct ib_device managed by the IB core.
+ * @pd:            IB protection domain.
+ * @mr:            L_Key (local key) with write access to all local memory.
+ * @srq:           Per-HCA SRQ (shared receive queue).
+ * @cm_id:         connection identifier.
+ * @dev_attr:      attributes of the InfiniBand device as obtained during the
+ *                 ib_client.add() callback.
+ * @srq_size:      Size of the per-HCA SRQ.
+ * @ioctx_ring:    Per-HCA receive I/O context ring.
+ * @rch_list:      per-device channel list -- see also srpt_rdma_ch.list.
+ * @spinlock:      protects rch_list.
+ * @port:          information about the ports owned by this HCA.
+ * @event_handler: per-HCA asynchronous IB event handler.
+ * @scst_tgt:      SCST target information associated with this HCA.
+ * @enabled:       Whether or not this SCST target is enabled.
+ */
+struct srpt_device {
+	struct ib_device *device;
+	struct ib_pd *pd;
+	struct ib_mr *mr;
+	struct ib_srq *srq;
+	struct ib_cm_id *cm_id;
+	struct ib_device_attr dev_attr;
+	int srq_size;
+	struct srpt_recv_ioctx **ioctx_ring;
+	struct list_head rch_list;
+	spinlock_t spinlock;
+	struct srpt_port port[2];
+	struct ib_event_handler event_handler;
+	struct scst_tgt *scst_tgt;
+	bool enabled;
+};
+
+#endif				/* IB_SRPT_H */
+
+/*
+ * Local variables:
+ * c-basic-offset:   8
+ * indent-tabs-mode: t
+ * End:
+ */
-- 
1.7.1



* RE: [PATCH 4/7] [SCSI] scst: Add SRP target driver
  2010-12-20 17:49 [PATCH 4/7] [SCSI] scst: Add SRP target driver Bart Van Assche
@ 2010-12-21  1:50 ` Jack Wang
       [not found]   ` <544F03C80B314396A5517F4CD1A848B6-aA8Fybl81pBv1O+Z8WTAqQ@public.gmane.org>
       [not found] ` <201012201849.27639.bvanassche-HInyCGIudOg@public.gmane.org>
  1 sibling, 1 reply; 6+ messages in thread
From: Jack Wang @ 2010-12-21  1:50 UTC (permalink / raw)
  To: 'Bart Van Assche', linux-scsi, scst-devel, linux-rdma,
	James.Bottomley
  Cc: 'Vu Pham', 'Roland Dreier',
	'David Dillow'

> 
> This patch adds the kernel module ib_srpt, which is a SCSI RDMA Protocol (SRP)
> target implementation. This driver uses the InfiniBand stack and the SCST core.
> 
> It is a high performance driver capable of handling 600K+ 4K random write
> IOPS by a single target as well as 2.5+ GB/s sequential throughput over
> a single QDR IB port.
> 
> This driver was originally developed by Vu Pham (Mellanox) and has been
> optimized by Bart Van Assche.
> 
> Signed-off-by: Bart Van Assche <bvanassche@acm.org>
> Signed-off-by: Vladislav Bolkhovitin <vst@vlnb.net>
> Cc: Vu Pham <vu@mellanox.com>
> Cc: Roland Dreier <rdreier@cisco.com>
> Cc: David Dillow <dillowda@ornl.gov>
> ---
>  Documentation/scst/README.srpt |  112 ++
>  drivers/scst/Kconfig           |    1 +
>  drivers/scst/Makefile          |    2 +-
>  drivers/scst/srpt/Kconfig      |   12 +
>  drivers/scst/srpt/Makefile     |    1 +
>  drivers/scst/srpt/ib_dm_mad.h  |  139 ++
>  drivers/scst/srpt/ib_srpt.c    | 3613 ++++++++++++++++++++++++++++++++++++++++
>  drivers/scst/srpt/ib_srpt.h    |  352 ++++
>  8 files changed, 4231 insertions(+), 1 deletions(-)
>  create mode 100644 Documentation/scst/README.srpt
>  create mode 100644 drivers/scst/srpt/Kconfig
>  create mode 100644 drivers/scst/srpt/Makefile
>  create mode 100644 drivers/scst/srpt/ib_dm_mad.h
>  create mode 100644 drivers/scst/srpt/ib_srpt.c
>  create mode 100644 drivers/scst/srpt/ib_srpt.h
> 
> diff --git a/Documentation/scst/README.srpt b/Documentation/scst/README.srpt
> new file mode 100644
> index 0000000..6f8b3ca
> --- /dev/null
> +++ b/Documentation/scst/README.srpt
> @@ -0,0 +1,112 @@
> +SCSI RDMA Protocol (SRP) Target driver for Linux
> +=================================================
> +
> +The SRP Target driver is designed to work directly on top of the
> +OpenFabrics OFED-1.x software stack (http://www.openfabrics.org) or
> +the Infiniband drivers in the Linux kernel tree
> +(http://www.kernel.org). The SRP target driver also interfaces with
> +the generic SCSI target mid-level driver called SCST
> +(http://scst.sourceforge.net).
> +
> +How-to run
> +-----------
> +
> +A. On srp target machine
> +1. Please refer to SCST's README for loading scst driver and its
> +dev_handlers drivers (scst_disk, scst_vdisk block or file IO mode, nullio, ...)
> +
> +Example 1: working with real back-end scsi disks
> +a. modprobe scst
> +b. modprobe scst_disk
> +c. cat /proc/scsi_tgt/scsi_tgt
> +
> +ibstor00:~ # cat /proc/scsi_tgt/scsi_tgt
> +Device (host:ch:id:lun or name)                             Device handler
> +0:0:0:0                                                     dev_disk
> +4:0:0:0                                                     dev_disk
> +5:0:0:0                                                     dev_disk
> +6:0:0:0                                                     dev_disk
> +7:0:0:0                                                     dev_disk
> +
> +Now you want to exclude the first scsi disk and expose the last 4 scsi disks as
> +IB/SRP luns for I/O
> +echo "add 4:0:0:0 0" >/proc/scsi_tgt/groups/Default/devices
> +echo "add 5:0:0:0 1" >/proc/scsi_tgt/groups/Default/devices
> +echo "add 6:0:0:0 2" >/proc/scsi_tgt/groups/Default/devices
> +echo "add 7:0:0:0 3" >/proc/scsi_tgt/groups/Default/devices
> +
> +Example 2: working with VDISK FILEIO mode (using md0 device and file 10G-file)
> +a. modprobe scst
> +b. modprobe scst_vdisk
> +c. echo "open vdisk0 /dev/md0" > /proc/scsi_tgt/vdisk/vdisk
> +d. echo "open vdisk1 /10G-file" > /proc/scsi_tgt/vdisk/vdisk
> +e. echo "add vdisk0 0" >/proc/scsi_tgt/groups/Default/devices
> +f. echo "add vdisk1 1" >/proc/scsi_tgt/groups/Default/devices
> +
> +Example 3: working with VDISK BLOCKIO mode (using md0 device, sda, and cciss/c1d0)
> +a. modprobe scst
> +b. modprobe scst_vdisk
> +c. echo "open vdisk0 /dev/md0 BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
> +d. echo "open vdisk1 /dev/sda BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
> +e. echo "open vdisk2 /dev/cciss/c1d0 BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
> +f. echo "add vdisk0 0" >/proc/scsi_tgt/groups/Default/devices
> +g. echo "add vdisk1 1" >/proc/scsi_tgt/groups/Default/devices
> +h. echo "add vdisk2 2" >/proc/scsi_tgt/groups/Default/devices
> +

[Jack] It looks like this README should also be updated for the new sysfs interface.




* Re: [PATCH 4/7] [SCSI] scst: Add SRP target driver
       [not found]   ` <544F03C80B314396A5517F4CD1A848B6-aA8Fybl81pBv1O+Z8WTAqQ@public.gmane.org>
@ 2010-12-21  9:50     ` Bart Van Assche
  0 siblings, 0 replies; 6+ messages in thread
From: Bart Van Assche @ 2010-12-21  9:50 UTC (permalink / raw)
  To: Jack Wang
  Cc: linux-scsi-u79uwXL29TY76Z2rM5mHXA,
	scst-devel-5NWGOfrQmneRv+LV9MX5uipxlwaOVQ5f,
	linux-rdma-u79uwXL29TY76Z2rM5mHXA, James.Bottomley-l3A5Bk7waGM,
	Vu Pham, Roland Dreier, David Dillow

On Tue, Dec 21, 2010 at 2:50 AM, Jack Wang <jack_wang-aA8Fybl81pAAvxtiuMwx3w@public.gmane.org> wrote:
>
> >
> > This patch adds the kernel module ib_srpt, which is a SCSI RDMA Protocol
> (SRP)
> > target implementation. This driver uses the InfiniBand stack and the SCST
> core.
> [ ... ]
>
> [Jack] This README looks should update to new sysfs interface too.

That's correct - thanks for the feedback. If I do not receive further
feedback, I will apply the patch below:

Subject: [PATCH] [SCSI] scst/ib_srpt: Updated documentation

Make the documentation more clear, update it for the new sysfs interface and
add detailed information about the ib_srpt kernel module parameters.

Signed-off-by: Bart Van Assche <bvanassche-HInyCGIudOg@public.gmane.org>
---
 Documentation/scst/README.srpt |  235 +++++++++++++++++++++++-----------------
 1 files changed, 136 insertions(+), 99 deletions(-)

diff --git a/Documentation/scst/README.srpt b/Documentation/scst/README.srpt
index 6f8b3ca..c1a1136 100644
--- a/Documentation/scst/README.srpt
+++ b/Documentation/scst/README.srpt
@@ -1,112 +1,149 @@
-SCSI RDMA Protocol (SRP) Target driver for Linux
+SCSI RDMA Protocol (SRP) Target Driver for Linux
 =================================================

-The SRP Target driver is designed to work directly on top of the
-OpenFabrics OFED-1.x software stack (http://www.openfabrics.org) or
-the Infiniband drivers in the Linux kernel tree
-(http://www.kernel.org). The SRP target driver also interfaces with
-the generic SCSI target mid-level driver called SCST
-(http://scst.sourceforge.net).
-
-How-to run
------------
-
-A. On srp target machine
-1. Please refer to SCST's README for loading scst driver and its
-dev_handlers drivers (scst_disk, scst_vdisk block or file IO mode, nullio, ...)
-
-Example 1: working with real back-end scsi disks
-a. modprobe scst
-b. modprobe scst_disk
-c. cat /proc/scsi_tgt/scsi_tgt
-
-ibstor00:~ # cat /proc/scsi_tgt/scsi_tgt
-Device (host:ch:id:lun or name)                             Device handler
-0:0:0:0                                                     dev_disk
-4:0:0:0                                                     dev_disk
-5:0:0:0                                                     dev_disk
-6:0:0:0                                                     dev_disk
-7:0:0:0                                                     dev_disk
-
-Now you want to exclude the first scsi disk and expose the last 4 scsi disks as
-IB/SRP luns for I/O
-echo "add 4:0:0:0 0" >/proc/scsi_tgt/groups/Default/devices
-echo "add 5:0:0:0 1" >/proc/scsi_tgt/groups/Default/devices
-echo "add 6:0:0:0 2" >/proc/scsi_tgt/groups/Default/devices
-echo "add 7:0:0:0 3" >/proc/scsi_tgt/groups/Default/devices
-
-Example 2: working with VDISK FILEIO mode (using md0 device and file 10G-file)
-a. modprobe scst
-b. modprobe scst_vdisk
-c. echo "open vdisk0 /dev/md0" > /proc/scsi_tgt/vdisk/vdisk
-d. echo "open vdisk1 /10G-file" > /proc/scsi_tgt/vdisk/vdisk
-e. echo "add vdisk0 0" >/proc/scsi_tgt/groups/Default/devices
-f. echo "add vdisk1 1" >/proc/scsi_tgt/groups/Default/devices
-
-Example 3: working with VDISK BLOCKIO mode (using md0 device, sda, and cciss/c1d0)
-a. modprobe scst
-b. modprobe scst_vdisk
-c. echo "open vdisk0 /dev/md0 BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
-d. echo "open vdisk1 /dev/sda BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
-e. echo "open vdisk2 /dev/cciss/c1d0 BLOCKIO" > /proc/scsi_tgt/vdisk/vdisk
-f. echo "add vdisk0 0" >/proc/scsi_tgt/groups/Default/devices
-g. echo "add vdisk1 1" >/proc/scsi_tgt/groups/Default/devices
-h. echo "add vdisk2 2" >/proc/scsi_tgt/groups/Default/devices
-
-2. modprobe ib_srpt
-
-
-B. On initiator machines you can manualy do the following steps:
-1. modprobe ib_srp
-2. ibsrpdm -c (to discover new SRP target)
-3. echo <new target info> > /sys/class/infiniband_srp/srp-mthca0-1/add_target
-4. fdisk -l (will show new discovered scsi disks)
-
-Example:
-Assume that you use port 1 of first HCA in the system ie. mthca0
+The SRP target driver ib_srpt is based on the generic SCSI target
+infrastructure called SCST. It supports both the InfiniBand drivers included
+with the Linux kernel and the OpenFabrics InfiniBand software stack.

-[root@lab104 ~]# ibsrpdm -c -d /dev/infiniband/umad0
-id_ext=0002c90200226cf4,ioc_guid=0002c90200226cf4,
-dgid=fe800000000000000002c90200226cf5,pkey=ffff,service_id=0002c90200226cf4
-[root@lab104 ~]# echo id_ext=0002c90200226cf4,ioc_guid=0002c90200226cf4,
-dgid=fe800000000000000002c90200226cf5,pkey=ffff,service_id=0002c90200226cf4 >
-/sys/class/infiniband_srp/srp-mthca0-1/add_target
+Installation
+------------
+
+A. SRP target configuration
+
+1. Load the ib_srpt kernel module
+
+Add ib_srpt to the SCST_MODULES variable in /etc/init.d/scst such that ib_srpt
+is loaded automatically upon startup. Next, load the ib_srpt kernel module
+e.g. as follows:
+
+  touch /etc/scst.conf
+  /etc/init.d/scst start
+
+2. Configure SCST
+
+How to configure SCST is explained in detail in Documentation/scst/README.scst.
+Once you have finished configuring SCST, save the new configuration to
+/etc/scst.conf:

-OR
+  scstadmin -write_config /etc/scst.conf

-+ You can edit /etc/infiniband/openib.conf to load srp driver and srp HA daemon
-automatically ie. set SRP_LOAD=yes, and SRPHA_ENABLE=yes
-+ To set up and use high availability feature you need dm-multipath driver
-and multipath tool
-+ Please refer to OFED-1.x SRP's user manual for more in-details instructions
-on how-to enable/use HA feature

-To minimize QUEUE_FULL conditions, you can apply scst_increase_max_tgt_cmds
-patch from SRPT package from http://sourceforge.net/project/showfiles.php?group_id=110471
+B. SRP initiator configuration

+Configure the initiator as follows:

-Performance notes
------------------
+1. Verify whether the InfiniBand subnet manager is operational, e.g. as
+   follows:
+     ping <IPoIB address of SRP target>

-In some cases, for instance working with SSD devices, which consume 100%
-of a single CPU load for data transfers in their internal threads, to
-maximize IOPS it can be needed to assign for those threads dedicated
-CPUs using Linux CPU affinity facilities. No IRQ processing should be
-done on those CPUs. Check that using /proc/interrupts. See taskset
-command and Documentation/IRQ-affinity.txt in your kernel's source tree
-for how to assign CPU affinity to tasks and IRQs.
+2. Load the SRP initiator kernel module.
+     modprobe ib_srp

-The reason for that is that processing of coming commands in SIRQ context
-can be done on the same CPUs as SSD devices' threads doing data
-transfers. As the result, those threads won't receive all the CPU power
-and perform worse.
+3. Run ibsrpdm to obtain a list of available SRP target systems.
+     ibsrpdm -c

-Alternatively to CPU affinity assignment, you can try to enable SRP
-target's internal thread. It will allows Linux CPU scheduler to better
-distribute load among available CPUs. To enable SRP target driver's
-internal thread you should load ib_srpt module with parameter
-"thread=1".
+4. Tell the SRP initiator to log in to the SRP target.
+     echo <target info> > /sys/class/infiniband_srp/${SRP_HCA_NAME}/add_target
+
+5. Verify whether login succeeded, e.g. as follows:
+     lsscsi
+
+   SRP targets can be recognized in the lsscsi output by looking for
+   the disk names assigned to the SCST target ("disk01" in the example below):
+
+     [8:0:0:0]    disk    SCST_FIO disk01            102  /dev/sdb
+
+An example:
+
+[root@lab104 ~]# ibsrpdm -c -d /dev/infiniband/umad0
+id_ext=0002c90200226cf4,ioc_guid=0002c90200226cf4,
+dgid=fe800000000000000002c90200226cf5,pkey=ffff,service_id=0002c90200226cf4
+[root@lab104 ~]# echo id_ext=0002c90200226cf4,ioc_guid=0002c90200226cf4,
+dgid=fe800000000000000002c90200226cf5,pkey=ffff,service_id=0002c90200226cf4 >
+/sys/class/infiniband_srp/srp-mthca0-1/add_target


-Send questions about this driver to scst-devel-5NWGOfrQmneRv+LV9MX5uipxlwaOVQ5f@public.gmane.org, CC:
-Vu Pham <vuhuong-VPRAkNaXOzVWk0Htik3J/w@public.gmane.org> and Bart Van Assche <bart.vanassche-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>.
+C. High Availability
+
+If there are redundant paths in the IB network between initiator and target,
+automatic path failover can be set up on the initiator as follows:
+* Edit /etc/infiniband/openib.conf to load the SRP driver and SRP HA daemon
+  automatically: set SRP_LOAD=yes and SRPHA_ENABLE=yes.
+* To set up and use the high availability feature you need the dm-multipath
+  driver and multipath tool.
+* Please refer to the OFED-1.x user manual for more detailed instructions
+  on how to enable and how to use the HA feature. See e.g.
+  http://www.mellanox.com/related-docs/prod_software/Mellanox_OFED%20_Linux_user_manual_1_5_1_2.pdf.
+
+A setup with automatic failover between redundant targets is possible by
+installing and configuring replication software such as DRBD on both
+targets. If the initiator system supports mirroring (e.g. Linux), you can use
+the following approach:
+* Configure the replication software in Active/Active mode.
+* Configure the initiator(s) for mirroring between the redundant targets.
+
+If the initiator system does not support mirroring (e.g. VMware ESX), you
+can use the following approach:
+* Configure DRBD in Active/Passive mode and enable STONITH mode in the
+  Heartbeat software.
+
+
+D. Notes
+
+For workloads with large I/O depths, increasing the SCST_MAX_TGT_DEV_COMMANDS
+constant in drivers/scst/scst_priv.h may improve performance.
+
+For latency sensitive applications, using the noop scheduler at the initiator
+side can give significantly better results than with other schedulers.
+
+The following initiator-side parameters have a small but measurable impact on
+SRP performance:
+  * /sys/class/block/${dev}/queue/rotational
+  * /sys/class/block/${dev}/queue/rq_affinity
+  * /proc/irq/${ib_int_no}/smp_affinity
+
+The ib_srpt kernel module supports the following parameters:
+* srp_max_req_size (number)
+  Maximum size of an SRP control message in bytes. Examples of SRP control
+  messages are: login request, logout request, data transfer request, ...
+  The larger this parameter, the more scatter/gather list elements can be
+  sent at once. Use the following formula to compute an appropriate value
+  for this parameter: 68 + 16 * (sg_tablesize). The default value of
+  this parameter is 2116, which corresponds to an sg table size of 128.
+* srp_max_rsp_size (number)
+  Maximum size of an SRP response message in bytes. Sense data is sent back
+  via these messages towards the initiator. The default size is 256 bytes.
+  With this value there remains (256-36) = 220 bytes for sense data.
+* srp_max_rdma_size (number)
+  Maximum number of bytes that may be transferred at once via RDMA. Defaults
+  to 65536 bytes, which is sufficient to use the full bandwidth of low-latency
+  HCAs. Increasing this value may decrease latency for applications
+  transferring large amounts of data at once.
+* srpt_srq_size (number, default 4095)
+  ib_srpt uses a shared receive queue (SRQ) for processing incoming SRP
+  requests. This number may have to be increased when a large number of
+  initiator systems is accessing a single SRP target system.
+* srpt_sq_size (number, default 4096)
+  Per-channel InfiniBand send queue size. The default setting is sufficient
+  for a credit limit of 128. Changing this parameter to a smaller value may
+  cause RDMA requests to be retried and hence may slow down data transfer
+  severely.
+* thread (0, 1 or 2, default 1)
+  Defines the context on which SRP requests are processed:
+  * thread=0: do as much processing in IRQ context as possible. Results in
+    lower latency than the other two modes but may trigger soft lockup
+    complaints when multiple initiators are simultaneously processing
+    workloads with large I/O depths. Scalability of this mode is limited
+    - it exploits only a fraction of the power available on multiprocessor
+    systems.
+  * thread=1: dedicates one kernel thread per initiator. Scales well on
+    multiprocessor systems. This is the recommended mode when multiple
+    initiator systems are accessing the same target system simultaneously.
+  * thread=2: makes one CPU process all IB completions and defer further
+    processing to kernel thread context. Scales better than mode thread=0 but
+    not as well as mode thread=1. May trigger soft lockup complaints when
+    multiple initiators are simultaneously processing workloads with large I/O
+    depths.
+* trace_flag (unsigned integer, only available in debug builds)
+  The individual bits of the trace_flag parameter define which categories of
+  trace messages should be sent to the kernel log and which ones not.
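+
+An example of loading ib_srpt with non-default parameter values (illustrative
+only): with the formula above, an sg table size of 256 corresponds to
+68 + 16 * 256 = 4164, so:
+
+  modprobe ib_srpt srp_max_req_size=4164 thread=1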
-- 
1.7.1


* Re: [PATCH 4/7] [SCSI] scst: Add SRP target driver
       [not found] ` <201012201849.27639.bvanassche-HInyCGIudOg@public.gmane.org>
@ 2011-01-05 20:21   ` Roland Dreier
       [not found]     ` <adaaajfdrtr.fsf-FYB4Gu1CFyUAvxtiuMwx3w@public.gmane.org>
  0 siblings, 1 reply; 6+ messages in thread
From: Roland Dreier @ 2011-01-05 20:21 UTC (permalink / raw)
  To: Bart Van Assche
  Cc: linux-scsi-u79uwXL29TY76Z2rM5mHXA,
	scst-devel-5NWGOfrQmneRv+LV9MX5uipxlwaOVQ5f,
	linux-rdma-u79uwXL29TY76Z2rM5mHXA, James.Bottomley-l3A5Bk7waGM,
	Vu Pham, Roland Dreier, David Dillow

So, just looking over some of the atomic_t usage here (which is usually
one of the first places I review):

 > +struct srpt_send_ioctx {
 > ...
 > +	atomic_t		state;

this seems to be accessed only with atomic_read(), atomic_set() and
atomic_cmpxchg() (without any memory barriers).  It's rather hard to see
that this is correct (and indeed without memory barriers I suspect it's
not; also since atomic_read() does not protect the state from changing
immediately afterwards there may be other races).

I think it might be better to handle this in a simpler, more naive
way -- just have a lock (probably an existing one) that protects the
contents of state and not use cmpxchg().

In any case since no "real" atomic operations are used, I suspect it
would be better to just code this in terms of unsigned int and regular
cmpxchg().
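
For illustration, a minimal sketch of what I mean (the srpt_set_cmd_state()
helper is hypothetical, and it assumes ioctx->state is converted from
atomic_t to a plain enum protected by the existing ch->spinlock):

	/*
	 * Hypothetical sketch: serialize state changes with the channel
	 * spinlock instead of atomic_cmpxchg(). Returning the previous
	 * state lets callers detect that e.g. a command already reached
	 * SRPT_STATE_DONE (abort) before the update could be applied.
	 */
	static enum srpt_command_state
	srpt_set_cmd_state(struct srpt_send_ioctx *ioctx,
			   enum srpt_command_state new)
	{
		struct srpt_rdma_ch *ch = ioctx->ch;
		enum srpt_command_state previous;
		unsigned long flags;

		spin_lock_irqsave(&ch->spinlock, flags);
		previous = ioctx->state;	/* plain enum, not atomic_t */
		if (previous != SRPT_STATE_DONE)
			ioctx->state = new;
		spin_unlock_irqrestore(&ch->spinlock, flags);

		return previous;
	}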

Also, there is processing_compl:

 > +static void srpt_completion(struct ib_cq *cq, void *ctx)
 > +{
 > +	struct srpt_rdma_ch *ch = ctx;
 > +
 > +	BUG_ON(!ch);
 > +	atomic_inc(&ch->processing_compl);

and

 > +static void srpt_unregister_channel(struct srpt_rdma_ch *ch)
 > ...
 > +	while (atomic_read(&ch->processing_compl))
 > +		;

this seems racy to me -- I don't see any reason why we couldn't have:

	srpt_completion()

					srpt_unregister_channel()
					  processing_compl == 0,
					  continue

	  atomic_inc(&ch->processing_compl);

					  finish unregistering channel

	  use unregistered channel

 - R.



* Re: [PATCH 4/7] [SCSI] scst: Add SRP target driver
       [not found]     ` <adaaajfdrtr.fsf-FYB4Gu1CFyUAvxtiuMwx3w@public.gmane.org>
@ 2011-01-06 17:41       ` Bart Van Assche
  2011-01-06 19:00         ` Roland Dreier
  0 siblings, 1 reply; 6+ messages in thread
From: Bart Van Assche @ 2011-01-06 17:41 UTC (permalink / raw)
  To: Roland Dreier
  Cc: linux-scsi-u79uwXL29TY76Z2rM5mHXA,
	scst-devel-5NWGOfrQmneRv+LV9MX5uipxlwaOVQ5f,
	linux-rdma-u79uwXL29TY76Z2rM5mHXA, James.Bottomley-l3A5Bk7waGM,
	Vu Pham, Roland Dreier, David Dillow

On Wed, Jan 5, 2011 at 9:21 PM, Roland Dreier <rdreier-FYB4Gu1CFyUAvxtiuMwx3w@public.gmane.org> wrote:
>
> So, just looking over some of the atomic_t usage here (which is usually
> one of the first places I review):
>
>  > +struct srpt_send_ioctx {
>  > ...
>  > +    atomic_t                state;
>
> this seems to be accessed only with atomic_read(), atomic_set() and
> atomic_cmpxchg() (without any memory barriers).  It's rather hard to see
> that this is correct (and indeed without memory barriers I suspect it's
> not; also since atomic_read() does not protect the state from changing
> immediately afterwards there may be other races).
>
> I think it might be better to handle this in a simpler, more naive
> way -- just have a lock (probably an existing one) that protects the
> contents of state and not use cmpxchg().
>
> In any case since no "real" atomic operations are used, I suspect it
> would be better to just code this in terms of unsigned int and regular
> cmpxchg().

Hello Roland,

Thanks for the feedback.

I'll have a look at converting these atomic operations into regular
locking. The current implementation should be fine though.
Manipulating I/O context state only happens inside callback functions.
These callback functions are either invoked by the SCST core or from
inside an IB completion callback. For each I/O context at most one
such callback function is active at any time. Even if different
callback functions would be invoked on different CPU cores, it is the
responsibility of either the SCST core or the IB core / driver to
provide proper synchronization (memory barrier).

There is one exception though: the function srpt_pending_cmd_timeout()
can in theory proceed concurrently with an IB completion callback.
This is why I/O context state manipulation was implemented via atomic
operations. That race is unlikely to cause any harm though: the
callback srpt_pending_cmd_timeout() is only invoked if 60 seconds
(srpt_template.max_hw_pending_time) after an IB send was posted no IB
completion has been received. This timeout should be a magnitude
larger than the time needed to receive an IB completion indicating
either success or failure.

Please note that with IB firmware that complies to the IB specs the
function srpt_pending_cmd_timeout() will never be invoked because
before that function gets invoked an IB completion will already have
been received.

> Also, there is processing_compl:
>
>  > +static void srpt_completion(struct ib_cq *cq, void *ctx)
>  > +{
>  > +    struct srpt_rdma_ch *ch = ctx;
>  > +
>  > +    BUG_ON(!ch);
>  > +    atomic_inc(&ch->processing_compl);
>
> and
>
>  > +static void srpt_unregister_channel(struct srpt_rdma_ch *ch)
>  > ...
>  > +    while (atomic_read(&ch->processing_compl))
>  > +            ;
>
> this seems racy to me -- I don't see any reason why we couldn't have:
>
>        srpt_completion()
>
>                                        srpt_unregister_channel()
>                                          processing_compl == 0,
>                                          continue
>
>          atomic_inc(&ch->processing_compl);
>
>                                          finish unregistering channel
>
>          use unregistered channel

I'm not sure what the above pseudo-code is meant to show?

Regarding the current implementation: there is a hard requirement in
SCST that no new commands are queued for a given session after
scst_unregister_session() has been invoked. So
scst_unregister_session() must only be invoked after the IB queue pair
has been reset *and* srpt_completion() has finished. It would be great
if that could be implemented without using one or another kind of
counter. I'm not sure however whether it is possible to eliminate the
"processing_compl" counter entirely.

Bart.


* Re: [PATCH 4/7] [SCSI] scst: Add SRP target driver
  2011-01-06 17:41       ` Bart Van Assche
@ 2011-01-06 19:00         ` Roland Dreier
  0 siblings, 0 replies; 6+ messages in thread
From: Roland Dreier @ 2011-01-06 19:00 UTC (permalink / raw)
  To: Bart Van Assche
  Cc: linux-scsi, scst-devel, linux-rdma, James.Bottomley, Vu Pham,
	Roland Dreier, David Dillow

 > I'll have a look at converting these atomic operations into regular
 > locking. The current implementation should be fine though.

I believe that the current implementation is correct.  However it is
much harder for someone naive like me to understand, since using cmpxchg
is much subtler than just using a lock to protect data.  If this isn't
on the hottest of hot paths then I think cmpxchg is over-optimization --
much better to have less fancy code.

 > > Also, there is processing_compl:
 > >
 > >  > +static void srpt_completion(struct ib_cq *cq, void *ctx)
 > >  > +{
 > >  > +    struct srpt_rdma_ch *ch = ctx;
 > >  > +
 > >  > +    BUG_ON(!ch);
 > >  > +    atomic_inc(&ch->processing_compl);
 > >
 > > and
 > >
 > >  > +static void srpt_unregister_channel(struct srpt_rdma_ch *ch)
 > >  > ...
 > >  > +    while (atomic_read(&ch->processing_compl))
 > >  > +            ;
 > >
 > > this seems racy to me -- I don't see any reason why we couldn't have:
 > >
 > >        srpt_completion()
 > >
 > >                                        srpt_unregister_channel()
 > >                                          processing_compl == 0,
 > >                                          continue
 > >
 > >          atomic_inc(&ch->processing_compl);
 > >
 > >                                          finish unregistering channel
 > >
 > >          use unregistered channel
 > 
 > I'm not sure what the above pseudo-code should do ?

I'm trying to illustrate the race by showing what two different contexts
might be doing -- the left column is one context, the right column is
the other context.

In this specific case, if the while loop testing processing_compl
actually ever does anything then I don't see how it could be safe -- if
srpt_completion() could be executing during the while loop, then I don't
see anything that prevents the while loop from finishing before
srpt_completion does its atomic_inc().

 > Regarding the current implementation: there is a hard requirement in
 > SCST that no new commands are queued for a given session after
 > scst_unregister_session() has been invoked. So
 > scst_unregister_session() must only be invoked after the IB queue pair
 > has been reset *and* srpt_completion() has finished. It would be great
 > if that could be implemented without using one or another kind of
 > counter. I'm not sure however whether it is possible to eliminate the
 > "processing_compl" counter entirely.

I don't pretend to understand the flow of code here.  But it seems the
only safe way to implement this cleanup code is to stop posting work
requests and then wait until all the requests you've posted are
complete.  Trying to wait for the completion handler to stop executing
does not seem possible to implement in a safe way.
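
Something like the following rough sketch (the posted_wr_cnt counter and the
helper name are hypothetical; ib_modify_qp() and ch->wait_queue do exist):

	/*
	 * Rough sketch of the drain pattern: force the QP into the error
	 * state so that no new work requests are accepted and every
	 * outstanding work request completes with a flush error, then
	 * sleep until the completion handler has reaped all of them.
	 */
	static void srpt_drain_channel(struct srpt_rdma_ch *ch)
	{
		struct ib_qp_attr attr = { .qp_state = IB_QPS_ERR };

		ib_modify_qp(ch->qp, &attr, IB_QP_STATE);

		/*
		 * posted_wr_cnt: hypothetical count of posted but unreaped
		 * WRs; the completion handler would decrement it and call
		 * wake_up(&ch->wait_queue) when it reaches zero.
		 */
		wait_event(ch->wait_queue,
			   atomic_read(&ch->posted_wr_cnt) == 0);

		/* Only now is it safe to call scst_unregister_session(). */
	}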

 - R.

