* Distributed storage.
From: Evgeniy Polyakov @ 2007-07-31 17:13 UTC
  To: netdev; +Cc: linux-kernel, linux-fsdevel

Hi.

I'm pleased to announce the first release of the distributed storage
subsystem, which allows a storage to be formed on top of remote and local
nodes. Such a storage can in turn be exported to another storage as a node,
forming tree-like storages.

The main features this device supports are:
    * zero additional allocations in the common fast path (only one per node if
	the network queue is full), not counting network allocations
    * zero-copy sending (except for the header) using sendpage(), if supported
	by the device
    * ability to use any implemented algorithm (a linear algorithm is
	implemented)
    * pluggable mapping algorithms
    * failover recovery in case of a broken link (reconnection if the remote
	node is down)
    * ability to suspend a remote node for maintenance without breaking
	dataflow to other nodes (if supported by the algorithm and block
	layer) and without taking down the main node
    * initial autoconfiguration (ability to request the remote node size and
	use that dynamic data at array setup time)
    * non-blocking network data processing without busy loops checking the
	return value of processing functions. Headers are the exception: they
	are sent/received in blocking mode, but this can easily be changed to
	non-blocking too by increasing the request size to store state.
	Non-blocking data processing is based on a ->poll() state machine
	with only one working thread per storage.
    * support for any kind of network media (not limited to TCP or inet
	protocols) above the MAC layer (socket layer); data consistency must
	be part of the protocol (i.e. data will be lost with UDP in favour of
	performance)
    * no need for any special tools for data processing (like special 
	userspace applications) except for configuration
    * userspace and kernelspace targets. The userspace target can work on top
	of usual files. (Windows or any other OS userspace target support can
	be trivially added on request)
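
To illustrate the linear algorithm mentioned in the list above, here is a
minimal userspace sketch (it is not code from the patch). It models how nodes
are laid end to end and how a request crossing a node boundary gets split.
The names struct node, struct chunk, linear_add_node(), linear_find() and
linear_split() are hypothetical and exist only for this example; the patch
itself works on struct bio in dst_linear_remap() and finds nodes through an
rb-tree rather than a linear scan.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model of a storage node; all values are in sectors. */
struct node {
	uint64_t start;	/* first sector of this node inside the storage */
	uint64_t size;	/* node size */
};

/* One piece of a request after it has been mapped to a single node. */
struct chunk {
	int node;	/* index of the node that serves this piece */
	uint64_t start;	/* first sector relative to the node start */
	uint64_t len;	/* number of sectors */
};

/* Append a node: it starts where the storage currently ends. */
static void linear_add_node(struct node *n, uint64_t size, uint64_t *disk_size)
{
	assert(size > 0);
	n->start = *disk_size;
	n->size = size;
	*disk_size += size;
}

/* Return the index of the node covering @sector, or -1 if there is none. */
static int linear_find(const struct node *nodes, size_t num, uint64_t sector)
{
	for (size_t i = 0; i < num; ++i)
		if (sector >= nodes[i].start &&
		    sector < nodes[i].start + nodes[i].size)
			return (int)i;
	return -1;
}

/*
 * Split the request [sector, sector + len) into per-node chunks.
 * Returns the number of chunks written to @out, or -1 if part of the
 * request is not covered by any node or @out is too small.
 */
static int linear_split(const struct node *nodes, size_t num,
		uint64_t sector, uint64_t len,
		struct chunk *out, size_t max_out)
{
	size_t cnt = 0;

	while (len) {
		int idx = linear_find(nodes, num, sector);
		uint64_t off, rest, part;

		if (idx < 0 || cnt == max_out)
			return -1;

		off = sector - nodes[idx].start;
		rest = nodes[idx].size - off;	/* sectors left in this node */
		part = len < rest ? len : rest;

		out[cnt].node = idx;
		out[cnt].start = off;
		out[cnt].len = part;
		cnt++;

		sector += part;
		len -= part;
	}
	return (int)cnt;
}
```

With two 800-sector nodes, an 8-sector request starting at sector 796 comes
out as two chunks: sectors 796-799 of the first node and sectors 0-3 of the
second one. A request that fits inside one node yields a single chunk, which
corresponds to the common fast path.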

Compared to other similar approaches, namely iSCSI and NBD,
there are the following advantages:
    * non-blocking processing without busy loops (compared to both above)
    * small, pluggable architecture
    * failover recovery (reconnect to remote target)
    * autoconfiguration (completely absent in NBD and/or device mapper on
	top of it)
    * no additional allocations (not including the network part); device
	mapper needs at least two in its fast path
    * very simple - just compare it with iSCSI
    * works with different network protocols
    * storage can be formed on top of remote nodes and be exported
	simultaneously (iSCSI is peer-to-peer only, NBD requires device
	mapper and is synchronous)
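
The "non-blocking processing without busy loops" item can be sketched in
userspace as follows. This is only an analogy built on poll(2) and write(2),
not the in-kernel ->poll() state machine from kst.c; struct request,
set_nonblocking(), request_push() and request_process() are hypothetical
names. The idea shown is that a request keeps its own progress, the worker
returns as soon as the descriptor would block, and it sleeps in poll() until
more progress is possible.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical per-request state: what to send and how far we got. */
struct request {
	const char *buf;
	size_t size;
	size_t done;
};

/* Switch a descriptor to non-blocking mode. Returns 0 on success. */
static int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL, 0);

	if (flags < 0)
		return -1;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Push as much of the request as possible without blocking.
 * Returns 1 when the request is complete, 0 when the descriptor
 * would block (progress is kept in req->done), -1 on error.
 */
static int request_push(int fd, struct request *req)
{
	assert(req->done <= req->size);

	while (req->done < req->size) {
		ssize_t n = write(fd, req->buf + req->done,
				req->size - req->done);

		if (n < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				return 0;
			return -1;
		}
		req->done += (size_t)n;
	}
	return 1;
}

/*
 * Worker loop for one request: no spinning on return values,
 * the thread sleeps in poll() until the descriptor is writable.
 * Returns 0 on success, -1 on error.
 */
static int request_process(int fd, struct request *req)
{
	struct pollfd pfd = { .fd = fd, .events = POLLOUT };
	int ret;

	while ((ret = request_push(fd, req)) == 0) {
		if (poll(&pfd, 1, -1) < 0 && errno != EINTR)
			return -1;
	}
	return ret < 0 ? -1 : 0;
}
```

A real worker would keep many such requests and poll all of their descriptors
at once; a single request is used here to keep the sketch short.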

The TODO list currently includes the following main items:
    * redundancy algorithm (drop me a request of your own, but it is highly
	unlikely that a Reed-Solomon based one will ever be used - it is too
	slow for distributed RAID; I am considering WEAVER codes)
    * extended autoconfiguration
    * move away from ioctl-based configuration

The patch, userspace configuration utility and userspace target can be found
on the project homepage:

http://tservice.net.ru/~s0mbre/old/?section=projects&item=dst

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>

 drivers/block/Kconfig          |    2 +
 drivers/block/Makefile         |    1 +
 drivers/block/dst/Kconfig      |   12 +
 drivers/block/dst/Makefile     |    5 +
 drivers/block/dst/alg_linear.c |  348 ++++++++++
 drivers/block/dst/dcore.c      | 1222 ++++++++++++++++++++++++++++++++++
 drivers/block/dst/kst.c        | 1437 ++++++++++++++++++++++++++++++++++++++++
 include/linux/dst.h            |  282 ++++++++
 8 files changed, 3309 insertions(+), 0 deletions(-)

diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
index b4c8319..ca6592d 100644
--- a/drivers/block/Kconfig
+++ b/drivers/block/Kconfig
@@ -451,6 +451,8 @@ config ATA_OVER_ETH
 	This driver provides Support for ATA over Ethernet block
 	devices like the Coraid EtherDrive (R) Storage Blade.
 
+source "drivers/block/dst/Kconfig"
+
 source "drivers/s390/block/Kconfig"
 
 endmenu
diff --git a/drivers/block/Makefile b/drivers/block/Makefile
index dd88e33..fcf042d 100644
--- a/drivers/block/Makefile
+++ b/drivers/block/Makefile
@@ -29,3 +29,4 @@ obj-$(CONFIG_VIODASD)		+= viodasd.o
 obj-$(CONFIG_BLK_DEV_SX8)	+= sx8.o
 obj-$(CONFIG_BLK_DEV_UB)	+= ub.o
 
+obj-$(CONFIG_DST)		+= dst/
diff --git a/drivers/block/dst/Kconfig b/drivers/block/dst/Kconfig
new file mode 100644
index 0000000..874d2e4
--- /dev/null
+++ b/drivers/block/dst/Kconfig
@@ -0,0 +1,12 @@
+config DST
+	tristate "Distributed storage"
+	depends on NET
+	---help---
+	This driver allows creating a distributed storage.
+
+config DST_ALG_LINEAR
+	tristate "Linear distribution algorithm"
+	depends on DST
+	---help---
+	This module allows creating a linear mapping of the nodes
+	in the distributed storage.
diff --git a/drivers/block/dst/Makefile b/drivers/block/dst/Makefile
new file mode 100644
index 0000000..48b7777
--- /dev/null
+++ b/drivers/block/dst/Makefile
@@ -0,0 +1,5 @@
+obj-$(CONFIG_DST) += dst.o
+
+dst-y := dcore.o kst.o
+
+obj-$(CONFIG_DST_ALG_LINEAR) += alg_linear.o
diff --git a/drivers/block/dst/alg_linear.c b/drivers/block/dst/alg_linear.c
new file mode 100644
index 0000000..9a134fc
--- /dev/null
+++ b/drivers/block/dst/alg_linear.c
@@ -0,0 +1,348 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/dst.h>
+
+static struct dst_alg *alg_linear;
+static struct bio_set *dst_linear_bio_set;
+
+/*
+ * This callback is invoked when node is removed from storage.
+ */
+static void dst_linear_del_node(struct dst_node *n)
+{
+}
+
+/*
+ * This callback is invoked when node is added to storage.
+ */
+static int dst_linear_add_node(struct dst_node *n)
+{
+	struct dst_storage *st = n->st;
+
+	n->start = st->disk_size;
+	st->disk_size += n->size;
+
+	return 0;
+}
+
+/*
+ * Internal callback for local requests (i.e. for local disk),
+ * which are split between nodes (the part with a local node destination
+ * ends up with this ->bi_end_io() callback).
+ */
+static int dst_linear_end_io(struct bio *bio, unsigned int size, int err)
+{
+	struct bio *orig_bio = bio->bi_private;
+
+	if (err)
+		printk("%s: bio: %p, orig_bio: %p, size: %u, orig_size: %u.\n", 
+			__func__, bio, orig_bio, size, orig_bio->bi_size);
+
+	bio_endio(orig_bio, size, err);
+	bio_put(bio);
+	return 0;
+}
+
+static void dst_linear_destructor(struct bio *bio)
+{
+	bio_free(bio, dst_linear_bio_set);
+}
+
+/*
+ * This function sends processing request down to block layer (for local node)
+ * or to network state machine (for remote node).
+ */
+static int dst_linear_node_push(struct dst_request *req)
+{
+	int err = 0;
+
+	if (req->state->node->bdev) {
+		struct bio *bio = req->bio;
+
+		dprintk("%s: start: %llu, num: %d, idx: %d, offset: %u, "
+				"size: %llu, bi_idx: %d, bi_vcnt: %d.\n",
+			__func__, req->start, req->num, req->idx, 
+			req->offset, req->size,	bio->bi_idx, bio->bi_vcnt);
+
+		if (likely(bio->bi_idx == req->idx && 
+					bio->bi_vcnt == req->num)) {
+			bio->bi_bdev = req->state->node->bdev;
+			bio->bi_sector = req->start;
+			generic_make_request(bio);
+			goto out_put;
+		} else {
+			struct bio *clone = bio_alloc_bioset(GFP_NOIO, 
+					bio->bi_max_vecs, dst_linear_bio_set);
+			struct bio_vec *bv;
+
+			err = -ENOMEM;
+			if (!clone)
+				goto out_put;
+
+			dprintk("%s: start: %llu, num: %d, idx: %d, "
+					"offset: %u, size: %llu, "
+					"bi_idx: %d, bi_vcnt: %d.\n",
+				__func__, req->start, req->num, req->idx, 
+				req->offset, req->size, 
+				bio->bi_idx, bio->bi_vcnt);
+
+			__bio_clone(clone, bio);
+
+			bv = bio_iovec_idx(clone, req->idx);
+			bv->bv_offset += req->offset;
+			clone->bi_idx = req->idx;
+			clone->bi_vcnt = req->num;
+			clone->bi_bdev = req->state->node->bdev;
+			clone->bi_sector = req->start;
+			clone->bi_destructor = dst_linear_destructor;
+			clone->bi_private = bio;
+			clone->bi_size = req->orig_size;
+			clone->bi_end_io = &dst_linear_end_io;
+
+			generic_make_request(clone);
+			err = 0;
+			goto out_put;
+		}
+	}
+
+	err = req->state->node->state->ops->push(req);
+
+out_put:
+	dst_node_put(req->state->node);
+	return err;
+}
+
+/*
+ * This callback is invoked from block layer request processing function,
+ * its task is to remap block request to different nodes.
+ */
+static int dst_linear_remap(struct dst_storage *st, struct bio *bio)
+{
+	struct dst_node *n;
+	int err = -EINVAL, i, cnt;
+	unsigned int bio_sectors = bio->bi_size>>9;
+	struct bio_vec *bv;
+	struct dst_request req;
+	u64 rest_in_node, start, total_size;
+
+	mutex_lock(&st->tree_lock);
+	n = dst_storage_tree_search(st, bio->bi_sector);
+	mutex_unlock(&st->tree_lock);
+
+	if (!n) {
+		dprintk("%s: failed to find a node for bio: %p, "
+				"sector: %llu.\n", 
+				__func__, bio, bio->bi_sector);
+		return -ENODEV;
+	}
+
+	dprintk("%s: bio: %llu-%llu, dev: %llu-%llu, in sectors.\n",
+			__func__, bio->bi_sector, bio->bi_sector+bio_sectors, 
+			n->start, n->start+n->size);
+
+	memset(&req, 0, sizeof(struct dst_request));
+
+	start = bio->bi_sector;
+	total_size = bio->bi_size;
+
+	req.flags = (test_bit(DST_NODE_FROZEN, &n->flags))?
+				DST_REQ_ALWAYS_QUEUE:0;
+	req.start = start - n->start;
+	req.offset = 0;
+	req.state = n->state;
+	req.bio = bio;
+
+	req.size = bio->bi_size;
+	req.orig_size = bio->bi_size;
+	req.idx = 0;
+	req.num = bio->bi_vcnt;
+
+	/*
+	 * Common fast path - block request does not cross
+	 * boundaries between nodes.
+	 */
+	if (likely(bio->bi_sector + bio_sectors <= n->start + n->size))
+		return dst_linear_node_push(&req);
+
+	req.size = 0;
+	req.idx = 0;
+	req.num = 1;
+
+	cnt = bio->bi_vcnt;
+
+	rest_in_node = to_bytes(n->size - req.start);
+
+	for (i=0; i<cnt; ++i) {
+		bv = bio_iovec_idx(bio, i);
+
+		if (req.size + bv->bv_len >= rest_in_node) {
+			unsigned int diff = req.size + bv->bv_len - 
+				rest_in_node;
+
+			req.size += bv->bv_len - diff;
+			req.start = start - n->start;
+			req.orig_size = req.size;
+
+			dprintk("%s: split: start: %llu/%llu, size: %llu, "
+					"total_size: %llu, diff: %u, idx: %d, "
+					"num: %d, bv_len: %u, bv_offset: %u.\n",
+					__func__, start, req.start, req.size, 
+					total_size, diff, req.idx, req.num,
+					bv->bv_len, bv->bv_offset);
+
+			err = dst_linear_node_push(&req);
+			if (err)
+				break;
+
+			total_size -= req.orig_size;
+
+			if (!total_size)
+				break;
+
+			start += to_sector(req.orig_size);
+
+			req.flags = (test_bit(DST_NODE_FROZEN, &n->flags))?
+				DST_REQ_ALWAYS_QUEUE:0;
+			req.orig_size = req.size = diff;
+
+			if (diff) {
+				req.offset = bv->bv_len - diff;
+				req.idx = req.num - 1;
+			} else {
+				req.idx = req.num;
+				req.offset = 0;
+			}
+
+			dprintk("%s: next: start: %llu, size: %llu, "
+				"total_size: %llu, diff: %u, idx: %d, "
+				"num: %d, offset: %u, bv_len: %u, "
+				"bv_offset: %u.\n",
+				__func__, start, req.size, total_size, diff, 
+				req.idx, req.num, req.offset,
+				bv->bv_len, bv->bv_offset);
+
+			mutex_lock(&st->tree_lock);
+			n = dst_storage_tree_search(st, start);
+			mutex_unlock(&st->tree_lock);
+
+			if (!n) {
+				err = -ENODEV;
+				dprintk("%s: failed to find a split node for "
+				  "bio: %p, sector: %llu, start: %llu.\n", 
+						__func__, bio, bio->bi_sector, 
+						req.start);
+				break;
+			}
+
+			req.state = n->state;
+			req.start = start - n->start;
+			rest_in_node = to_bytes(n->size - req.start);
+
+			dprintk("%s: req.start: %llu, start: %llu, "
+					"dev_start: %llu, dev_size: %llu, "
+					"rest_in_node: %llu.\n",
+				__func__, req.start, start, n->start, 
+				n->size, rest_in_node);
+		} else {
+			req.size += bv->bv_len;
+			req.num++;
+		}
+	}
+
+	dprintk("%s: last request: start: %llu, size: %llu, "
+			"total_size: %llu.\n", __func__, 
+			req.start, req.size, total_size);
+	if (total_size) {
+		req.orig_size = req.size;
+
+		dprintk("%s: last: start: %llu/%llu, size: %llu, "
+				"total_size: %llu, idx: %d, num: %d.\n",
+			__func__, start, req.start, req.size, 
+			total_size, req.idx, req.num);
+
+		err = dst_linear_node_push(&req);
+		if (!err) {
+			total_size -= req.orig_size;
+
+			BUG_ON(total_size != 0);
+		}
+			
+	}
+	
+	dprintk("%s: end bio: %p, err: %d.\n", __func__, bio, err);
+	return err;
+}
+
+/*
+ * Failover callback - invoked each time an error happens during
+ * request processing.
+ */
+static int dst_linear_error(struct kst_state *st, int err)
+{
+	if (!err)
+		return 0;
+
+	if (err == -ECONNRESET || err == -EPIPE) {
+		if (st->ops->recovery) {
+			err = st->ops->recovery(st, err);
+			if (err) {
+				set_bit(DST_NODE_FROZEN, &st->node->flags);
+			} else {
+				clear_bit(DST_NODE_FROZEN, &st->node->flags);
+			}
+			err = 0;
+		}
+	}
+
+	return err;
+}
+
+static struct dst_alg_ops alg_linear_ops = {
+	.remap		= dst_linear_remap,
+	.add_node 	= dst_linear_add_node,
+	.del_node 	= dst_linear_del_node,
+	.error		= dst_linear_error,
+	.owner		= THIS_MODULE,
+};
+
+static int __devinit alg_linear_init(void)
+{
+	dst_linear_bio_set = bioset_create(32, 32);
+	if (!dst_linear_bio_set)
+		return -ENOMEM;
+
+	alg_linear = dst_alloc_alg("alg_linear", &alg_linear_ops);
+	if (!alg_linear)
+		return -ENOMEM;
+
+	return 0;
+}
+
+static void __devexit alg_linear_exit(void)
+{
+	dst_remove_alg(alg_linear);
+	bioset_free(dst_linear_bio_set);
+}
+
+module_init(alg_linear_init);
+module_exit(alg_linear_exit);
+
+MODULE_LICENSE("GPL");
+MODULE_AUTHOR("Evgeniy Polyakov <johnpol@2ka.mipt.ru>");
+MODULE_DESCRIPTION("Linear distributed algorithm.");
diff --git a/drivers/block/dst/dcore.c b/drivers/block/dst/dcore.c
new file mode 100644
index 0000000..fd11f86
--- /dev/null
+++ b/drivers/block/dst/dcore.c
@@ -0,0 +1,1222 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/slab.h>
+#include <linux/miscdevice.h>
+#include <linux/socket.h>
+#include <linux/dst.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/buffer_head.h>
+
+#include <net/sock.h>
+
+static LIST_HEAD(dst_storage_list);
+static LIST_HEAD(dst_alg_list);
+static DEFINE_MUTEX(dst_storage_lock);
+static DEFINE_MUTEX(dst_alg_lock);
+static int dst_major;
+static struct kst_worker *kst_main_worker;
+
+struct kmem_cache *dst_request_cache;
+
+/*
+ * DST sysfs tree. For device called 'storage' which is formed
+ * on top of two nodes this looks like this:
+ *
+ * /sys/devices/storage/
+ * /sys/devices/storage/alg : alg_linear
+ * /sys/devices/storage/n-800/type : R: 192.168.4.80:1025
+ * /sys/devices/storage/n-800/size : 800
+ * /sys/devices/storage/n-800/start : 800
+ * /sys/devices/storage/n-0/type : R: 192.168.4.81:1025
+ * /sys/devices/storage/n-0/size : 800
+ * /sys/devices/storage/n-0/start : 0
+ * /sys/devices/storage/remove_all_nodes
+ * /sys/devices/storage/nodes : sectors (start [size]): 0 [800] | 800 [800]
+ * /sys/devices/storage/name : storage
+ */
+
+static int dst_dev_match(struct device *dev, struct device_driver *drv)
+{
+	return 1;
+}
+
+static void dst_dev_release(struct device *dev)
+{
+}
+
+static struct bus_type dst_dev_bus_type = {
+	.name 		= "dst",
+	.match 		= &dst_dev_match,
+};
+
+static struct device dst_dev = {
+	.bus 		= &dst_dev_bus_type,
+	.release 	= &dst_dev_release
+};
+
+static void dst_node_release(struct device *dev)
+{
+}
+
+static struct device dst_node_dev = {
+	.release 	= &dst_node_release
+};
+
+/*
+ * Distributed storage request processing function.
+ * It calls algorithm-specific remapping code only.
+ */
+static int dst_request(request_queue_t *q, struct bio *bio)
+{
+	struct dst_storage *st = q->queuedata;
+	int err;
+
+	dprintk("\n%s: start: st: %p, bio: %p, cnt: %u.\n", 
+			__func__, st, bio, bio->bi_vcnt);
+
+	err = st->alg->ops->remap(st, bio);
+
+	dprintk("%s: end: st: %p, bio: %p, err: %d.\n", 
+			__func__, st, bio, err);
+
+	if (err) {
+		printk("%s: remap failed: bio: %p, err: %d.\n", 
+				__func__, bio, err);
+		bio_endio(bio, bio->bi_size, -EIO);
+	}
+	return 0;
+}
+
+static void dst_unplug(request_queue_t *q)
+{
+}
+
+static int dst_flush(request_queue_t *q, struct gendisk *disk, sector_t *sec)
+{
+	return 0;
+}
+
+static struct block_device_operations dst_blk_ops = {
+	.owner =	THIS_MODULE,
+};
+
+/*
+ * Block layer binding - disk is created when array is fully configured
+ * by userspace request.
+ */
+static int dst_create_disk(struct dst_storage *st)
+{
+	int err;
+
+	err = -ENOMEM;
+	st->queue = blk_alloc_queue(GFP_KERNEL);
+	if (!st->queue)
+		goto err_out_exit;
+
+	st->queue->queuedata = st;
+	blk_queue_make_request(st->queue, dst_request);
+	blk_queue_bounce_limit(st->queue, BLK_BOUNCE_ANY);
+	st->queue->unplug_fn = dst_unplug;
+	st->queue->issue_flush_fn = dst_flush;
+	
+	err = -EINVAL;
+	st->disk = alloc_disk(1);
+	if (!st->disk)
+		goto err_out_free_queue;
+
+	st->disk->major = dst_major;
+	st->disk->first_minor = 0;
+	st->disk->fops = &dst_blk_ops;
+	st->disk->queue = st->queue;
+	st->disk->private_data = st;
+	snprintf(st->disk->disk_name, sizeof(st->disk->disk_name), 
+			"dst-%s-%d", st->name, st->disk->first_minor);
+
+	return 0;
+
+err_out_free_queue:
+	blk_cleanup_queue(st->queue);
+err_out_exit:
+	return err;
+}
+
+static void dst_remove_disk(struct dst_storage *st)
+{
+	del_gendisk(st->disk);
+	put_disk(st->disk);
+	blk_cleanup_queue(st->queue);
+}
+
+/*
+ * Shows node name in sysfs.
+ */
+static ssize_t dst_name_show(struct device *dev, 
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_storage *st = container_of(dev, struct dst_storage, device);
+
+	return sprintf(buf, "%s\n", st->name);
+}
+
+static void dst_remove_all_nodes(struct dst_storage *st)
+{
+	struct dst_node *n;
+	struct rb_node *rb_node;
+
+	mutex_lock(&st->tree_lock);
+	while ((rb_node = rb_first(&st->tree_root)) != NULL) {
+		n = rb_entry(rb_node, struct dst_node, tree_node);
+		dprintk("%s: n: %p, start: %llu, size: %llu.\n", 
+				__func__, n, n->start, n->size);
+		rb_erase(&n->tree_node, &st->tree_root);
+		dst_node_put(n);
+	}
+	mutex_unlock(&st->tree_lock);
+}
+
+/*
+ * Shows node layout in sysfs.
+ */
+static ssize_t dst_nodes_show(struct device *dev, 
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_storage *st = container_of(dev, struct dst_storage, device);
+	int size = PAGE_CACHE_SIZE, sz;
+	struct dst_node *n;
+	struct rb_node *rb_node;
+
+	sz = sprintf(buf, "sectors (start [size]): ");
+	size -= sz;
+	buf += sz;
+
+	mutex_lock(&st->tree_lock);
+	for (rb_node = rb_first(&st->tree_root); rb_node; 
+			rb_node = rb_next(rb_node)) {
+		n = rb_entry(rb_node, struct dst_node, tree_node);
+		if (size < 32)
+			break;
+		sz = sprintf(buf, "%llu [%llu]", n->start, n->size);
+		buf += sz;
+		size -= sz;
+
+		if (!rb_next(rb_node))
+			break;
+		
+		sz = sprintf(buf, " | ");
+		buf += sz;
+		size -= sz;
+	}
+	mutex_unlock(&st->tree_lock);
+	size -= sprintf(buf, "\n");
+	return PAGE_CACHE_SIZE - size;
+}
+
+/*
+ * Algorithm currently being used by given storage.
+ */
+static ssize_t dst_alg_show(struct device *dev, 
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_storage *st = container_of(dev, struct dst_storage, device);
+	return sprintf(buf, "%s\n", st->alg->name);
+}
+
+/*
+ * Writing to this sysfs file removes all nodes
+ * and the storage itself automatically.
+ */
+static ssize_t dst_remove_nodes(struct device *dev, 
+		struct device_attribute *attr, 
+		const char *buf, size_t count)
+{
+	struct dst_storage *st = container_of(dev, struct dst_storage, device);
+	dst_remove_all_nodes(st);
+	return count;
+}
+
+static DEVICE_ATTR(name, 0444, dst_name_show, NULL);
+static DEVICE_ATTR(nodes, 0444, dst_nodes_show, NULL);
+static DEVICE_ATTR(alg, 0444, dst_alg_show, NULL);
+static DEVICE_ATTR(remove_all_nodes, 0644, NULL, dst_remove_nodes);
+
+static int dst_create_storage_attributes(struct dst_storage *st)
+{
+	int err;
+
+	err = device_create_file(&st->device, &dev_attr_name);
+	err = device_create_file(&st->device, &dev_attr_nodes);
+	err = device_create_file(&st->device, &dev_attr_alg);
+	err = device_create_file(&st->device, &dev_attr_remove_all_nodes);
+	return 0;
+}
+
+static void dst_remove_storage_attributes(struct dst_storage *st)
+{
+	device_remove_file(&st->device, &dev_attr_name);
+	device_remove_file(&st->device, &dev_attr_nodes);
+	device_remove_file(&st->device, &dev_attr_alg);
+	device_remove_file(&st->device, &dev_attr_remove_all_nodes);
+}
+
+static void dst_storage_sysfs_exit(struct dst_storage *st)
+{
+	dst_remove_storage_attributes(st);
+	device_unregister(&st->device);
+}
+
+static int dst_storage_sysfs_init(struct dst_storage *st)
+{
+	int err;
+
+	memcpy(&st->device, &dst_dev, sizeof(struct device));
+	snprintf(st->device.bus_id, sizeof(st->device.bus_id), "%s", st->name);
+
+	err = device_register(&st->device);
+	if (err) {
+		dprintk(KERN_ERR "Failed to register dst device %s, err: %d.\n",
+			st->name, err);
+		goto err_out_exit;
+	}
+
+	dst_create_storage_attributes(st);
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+/*
+ * These functions show the size and start of the corresponding node.
+ * Both are in sectors.
+ */
+static ssize_t dst_show_start(struct device *dev, 
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+
+	return sprintf(buf, "%llu\n", n->start);
+}
+
+static ssize_t dst_show_size(struct device *dev, 
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+
+	return sprintf(buf, "%llu\n", n->size);
+}
+
+/*
+ * Shows type of the node - device major/minor number
+ * for local nodes and address (af_inet ipv4/ipv6 only) for remote nodes.
+ */
+static ssize_t dst_show_type(struct device *dev, 
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+	struct sockaddr_storage addr;	/* large enough for sockaddr_in6 */
+	struct socket *sock;
+	int addrlen;
+
+	if (!n->state && !n->bdev)
+		return 0;
+
+	if (n->bdev)
+		return sprintf(buf, "L: %d:%d\n",
+				MAJOR(n->bdev->bd_dev), MINOR(n->bdev->bd_dev));
+
+	sock = n->state->socket;
+	if (sock->ops->getname(sock, (struct sockaddr *)&addr, &addrlen, 2))
+		return 0;
+
+	if (sock->ops->family == AF_INET) {
+		struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+		return sprintf(buf, "R: %u.%u.%u.%u:%d\n", 
+			NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+	} else if (sock->ops->family == AF_INET6) {
+		struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr;
+		return sprintf(buf, 
+			"R: %04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d\n", 
+			NIP6(sin->sin6_addr), ntohs(sin->sin6_port));
+	}
+	return 0;
+}
+
+static DEVICE_ATTR(start, 0444, dst_show_start, NULL);
+static DEVICE_ATTR(size, 0444, dst_show_size, NULL);
+static DEVICE_ATTR(type, 0444, dst_show_type, NULL);
+
+static int dst_create_node_attributes(struct dst_node *n)
+{
+	int err;
+
+	err = device_create_file(&n->device, &dev_attr_start);
+	err = device_create_file(&n->device, &dev_attr_size);
+	err = device_create_file(&n->device, &dev_attr_type);
+	return 0;
+}
+
+static void dst_remove_node_attributes(struct dst_node *n)
+{
+	device_remove_file(&n->device, &dev_attr_start);
+	device_remove_file(&n->device, &dev_attr_size);
+	device_remove_file(&n->device, &dev_attr_type);
+}
+
+static void dst_node_sysfs_exit(struct dst_node *n)
+{
+	if (n->device.parent == &n->st->device) {
+		dst_remove_node_attributes(n);
+		device_unregister(&n->device);
+		n->device.parent = NULL;
+	}
+}
+
+static int dst_node_sysfs_init(struct dst_node *n)
+{
+	int err;
+
+	memcpy(&n->device, &dst_node_dev, sizeof(struct device));
+
+	n->device.parent = &n->st->device;
+	
+	snprintf(n->device.bus_id, sizeof(n->device.bus_id), 
+			"n-%llu", n->start);
+	err = device_register(&n->device);
+	if (err) {
+		dprintk(KERN_ERR "Failed to register node, err: %d.\n", err);
+		goto err_out_exit;
+	}
+
+	dst_create_node_attributes(n);
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+/*
+ * Gets a reference to the given storage. If no storage with the
+ * given name and algorithm exists and @alloc is set,
+ * it is created.
+ */
+static struct dst_storage *dst_get_storage(char *name, char *aname, int alloc)
+{
+	struct dst_storage *st, *rst = NULL;
+	int err;
+	struct dst_alg *alg;
+
+	mutex_lock(&dst_storage_lock);
+	list_for_each_entry(st, &dst_storage_list, entry) {
+		if (!strcmp(name, st->name) && !strcmp(st->alg->name, aname)) {
+			rst = st;
+			atomic_inc(&st->refcnt);
+			break;
+		}
+	}
+	mutex_unlock(&dst_storage_lock);
+
+	if (rst || !alloc)
+		return rst;
+
+	st = kzalloc(sizeof(struct dst_storage), GFP_KERNEL);
+	if (!st)
+		return NULL;
+
+	mutex_init(&st->tree_lock);
+	/* 
+	 * One for storage itself, 
+	 * another one for attached node below.
+	 */
+	atomic_set(&st->refcnt, 2);
+	snprintf(st->name, DST_NAMELEN, "%s", name);
+	st->tree_root.rb_node = NULL;
+
+	err = dst_storage_sysfs_init(st);
+	if (err)
+		goto err_out_free;
+
+	err = dst_create_disk(st);
+	if (err)
+		goto err_out_sysfs_exit;
+
+	mutex_lock(&dst_alg_lock);
+	list_for_each_entry(alg, &dst_alg_list, entry) {
+		if (!strcmp(alg->name, aname)) {
+			atomic_inc(&alg->refcnt);
+			try_module_get(alg->ops->owner);
+			st->alg = alg;
+			break;
+		}
+	}
+	mutex_unlock(&dst_alg_lock);
+
+	if (!st->alg)
+		goto err_out_disk_remove;
+
+	mutex_lock(&dst_storage_lock);
+	list_add_tail(&st->entry, &dst_storage_list);
+	mutex_unlock(&dst_storage_lock);
+	
+	return st;
+
+err_out_disk_remove:
+	dst_remove_disk(st);
+err_out_sysfs_exit:
+	dst_storage_sysfs_exit(st);
+err_out_free:
+	kfree(st);
+	return NULL;
+}
+
+/*
+ * Allows external modules to allocate and register a new algorithm.
+ */
+struct dst_alg *dst_alloc_alg(char *name, struct dst_alg_ops *ops)
+{
+	struct dst_alg *alg;
+
+	alg = kzalloc(sizeof(struct dst_alg), GFP_KERNEL);
+	if (!alg)
+		return NULL;
+	snprintf(alg->name, DST_NAMELEN, "%s", name);
+	atomic_set(&alg->refcnt, 1);
+	alg->ops = ops;
+
+	mutex_lock(&dst_alg_lock);
+	list_add_tail(&alg->entry, &dst_alg_list);
+	mutex_unlock(&dst_alg_lock);
+
+	return alg;
+}
+EXPORT_SYMBOL_GPL(dst_alloc_alg);
+
+static void dst_free_alg(struct dst_alg *alg)
+{
+	dprintk("%s: alg: %p.\n", __func__, alg);
+	kfree(alg);
+}
+
+/*
+ * Algorithm is never freed directly,
+ * since its module reference counter is increased
+ * by storage when it is created - just like network protocols.
+ */
+static inline void dst_put_alg(struct dst_alg *alg)
+{
+	dprintk("%s: alg: %p, refcnt: %d.\n", 
+			__func__, alg, atomic_read(&alg->refcnt));
+	module_put(alg->ops->owner);
+	if (atomic_dec_and_test(&alg->refcnt))
+		dst_free_alg(alg);
+}
+
+/*
+ * Removing algorithm from main list of supported algorithms.
+ */
+void dst_remove_alg(struct dst_alg *alg)
+{
+	mutex_lock(&dst_alg_lock);
+	list_del_init(&alg->entry);
+	mutex_unlock(&dst_alg_lock);
+
+	dst_put_alg(alg);
+}
+
+EXPORT_SYMBOL_GPL(dst_remove_alg);
+
+static void dst_cleanup_node(struct dst_node *n)
+{
+	dprintk("%s: node: %p.\n", __func__, n);
+	n->st->alg->ops->del_node(n);
+	if (n->cleanup)
+		n->cleanup(n);
+	dst_node_sysfs_exit(n);
+	kfree(n);
+}
+
+static void dst_free_storage(struct dst_storage *st)
+{
+	dprintk("%s: st: %p.\n", __func__, st);
+
+	BUG_ON(rb_first(&st->tree_root) != NULL);
+
+	dst_put_alg(st->alg);
+	kfree(st);
+}
+
+static inline void dst_put_storage(struct dst_storage *st)
+{
+	dprintk("%s: st: %p, refcnt: %d.\n", 
+			__func__, st, atomic_read(&st->refcnt));
+	if (atomic_dec_and_test(&st->refcnt))
+		dst_free_storage(st);
+}
+
+void dst_node_put(struct dst_node *n)
+{
+	dprintk("%s: node: %p, start: %llu, size: %llu, refcnt: %d.\n", 
+			__func__, n, n->start, n->size, 
+			atomic_read(&n->refcnt));
+	
+	if (atomic_dec_and_test(&n->refcnt)) {
+		struct dst_storage *st = n->st;
+	
+		dprintk("%s: freeing node: %p, start: %llu, size: %llu, "
+				"refcnt: %d.\n", 
+				__func__, n, n->start, n->size, 
+				atomic_read(&n->refcnt));
+
+		dst_cleanup_node(n);
+		dst_put_storage(st);
+	}
+}
+EXPORT_SYMBOL_GPL(dst_node_put);
+
+static inline int dst_compare_id(struct dst_node *old, u64 new)
+{
+	if (old->start + old->size <= new)
+		return 1;
+	if (old->start > new)
+		return -1;
+	return 0;
+}
+
+/*
+ * Tree of the nodes, which form the storage.
+ * Tree is indexed via start of the node and its size.
+ * Comparison function above.
+ */
+struct dst_node *dst_storage_tree_search(struct dst_storage *st, u64 start)
+{
+	struct rb_node *n = st->tree_root.rb_node;
+	struct dst_node *dn;
+	int cmp;
+
+	while (n) {
+		dn = rb_entry(n, struct dst_node, tree_node);
+
+		cmp = dst_compare_id(dn, start);
+		dprintk("%s: tree: %llu-%llu, new: %llu.\n", 
+			__func__, dn->start, dn->start+dn->size, start);
+		if (cmp < 0)
+			n = n->rb_left;
+		else if (cmp > 0)
+			n = n->rb_right;
+		else {
+			atomic_inc(&dn->refcnt);
+			return dn;
+		}
+	}
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(dst_storage_tree_search);
+
+/*
+ * This function removes the node with the given start address
+ * from the storage.
+ */
+static struct dst_node *dst_storage_tree_del(struct dst_storage *st, u64 start)
+{
+	struct dst_node *n = dst_storage_tree_search(st, start);
+
+	if (!n)
+		return NULL;
+
+	rb_erase(&n->tree_node, &st->tree_root);
+	dst_node_put(n);
+	return n;
+}
+
+/*
+ * This function adds the given node to the storage.
+ * Returns -EEXIST if the same area is already covered by another node.
+ * This return value must be checked by redundancy algorithms.
+ */
+static int dst_storage_tree_add(struct dst_node *new, struct dst_storage *st)
+{
+	struct rb_node **n = &st->tree_root.rb_node, *parent = NULL;
+	struct dst_node *dn;
+	int cmp;
+
+	while (*n) {
+		parent = *n;
+		dn = rb_entry(parent, struct dst_node, tree_node);
+
+		cmp = dst_compare_id(dn, new->start);
+		dprintk("%s: tree: %llu-%llu, new: %llu.\n", 
+				__func__, dn->start, dn->start+dn->size, 
+				new->start);
+		if (cmp < 0)
+			n = &parent->rb_left;
+		else if (cmp > 0)
+			n = &parent->rb_right;
+		else
+			return -EEXIST;
+	}
+
+	rb_link_node(&new->tree_node, parent, n);
+	rb_insert_color(&new->tree_node, &st->tree_root);
+
+	return 0;
+}
+
+/*
+ * This function finds the device major/minor numbers for the given pathname.
+ */
+static int dst_lookup_device(const char *path, dev_t *dev)
+{
+	int err;
+	struct nameidata nd;
+	struct inode *inode;
+
+	err = path_lookup(path, LOOKUP_FOLLOW, &nd);
+	if (err)
+		return err;
+
+	inode = nd.dentry->d_inode;
+	if (!inode) {
+		err = -ENOENT;
+		goto out;
+	}
+
+	if (!S_ISBLK(inode->i_mode)) {
+		err = -ENOTBLK;
+		goto out;
+	}
+
+	*dev = inode->i_rdev;
+
+out:
+	path_release(&nd);
+	return err;
+}
+
+/*
+ * Cleanup routines for local, local exporting and remote nodes.
+ */
+static void dst_cleanup_remote(struct dst_node *n)
+{
+	if (n->state) {
+		kst_state_exit(n->state);
+		n->state = NULL;
+	}
+}
+
+static void dst_cleanup_local(struct dst_node *n)
+{
+	if (n->bdev) {
+		sync_blockdev(n->bdev);
+		blkdev_put(n->bdev);
+		n->bdev = NULL;
+	}
+}
+
+static void dst_cleanup_local_export(struct dst_node *n)
+{
+	dst_cleanup_local(n);
+	dst_cleanup_remote(n);
+}
+
+/*
+ * Setup routines for local, local exporting and remote nodes.
+ */
+static int dst_setup_local(struct dst_node *n, struct dst_ctl *ctl, 
+		struct dst_local_ctl *l)
+{
+	dev_t dev;
+	int err;
+
+	err = dst_lookup_device(l->name, &dev);
+	if (err)
+		return err;
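+	/* open_by_devnum() returns ERR_PTR() on failure, never NULL. */
+	n->bdev = open_by_devnum(dev, FMODE_READ|FMODE_WRITE);
+	if (IS_ERR(n->bdev)) {
+		err = PTR_ERR(n->bdev);
+		n->bdev = NULL;
+		return err;
+	}
+	if (!n->size)
+		n->size = get_capacity(n->bdev->bd_disk);
+	return 0;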
+}
+
+static int dst_setup_local_export(struct dst_node *n, struct dst_ctl *ctl, 
+		struct dst_local_export_ctl *le)
+{
+	int err;
+
+	err = dst_setup_local(n, ctl, &le->lctl);
+	if (err)
+		goto err_out_exit;
+
+	n->state = kst_listener_state_init(kst_main_worker, n, le);
+	if (IS_ERR(n->state)) {
+		err = PTR_ERR(n->state);
+		n->state = NULL;
+		goto err_out_cleanup;
+	}
+	return 0;
+
+err_out_cleanup:
+	dst_cleanup_local(n);
+err_out_exit:
+	return err;
+}
+
+static int dst_request_remote_config(struct dst_node *n, struct socket *sock)
+{
+	struct dst_remote_request cfg;
+	struct msghdr msg;
+	struct kvec iov;
+	int err;
+
+	memset(&cfg, 0, sizeof(struct dst_remote_request));
+	cfg.cmd = cpu_to_be32(DST_REMOTE_CFG);
+
+	iov.iov_base = &cfg;
+	iov.iov_len = sizeof(struct dst_remote_request);
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_WAITALL;
+
+	err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
+	if (err <= 0) {
+		if (err == 0)
+			err = -ECONNRESET;
+		return err;
+	}
+
+	iov.iov_base = &cfg;
+	iov.iov_len = sizeof(struct dst_remote_request);
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_WAITALL;
+
+	err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
+	if (err <= 0) {
+		if (err == 0)
+			err = -ECONNRESET;
+		return err;
+	}
+
+	n->size = be64_to_cpu(cfg.sector);
+
+	return 0;
+}
+
+static int dst_setup_remote(struct dst_node *n, struct dst_ctl *ctl, 
+		struct dst_remote_ctl *r)
+{
+	int err;
+	struct socket *sock;
+
+	err = sock_create(r->addr.sa_family, r->type, r->proto, &sock);
+	if (err < 0)
+		goto err_out_exit;
+
+	sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo = 
+		msecs_to_jiffies(DST_DEFAULT_TIMEO);
+
+	err = sock->ops->connect(sock, (struct sockaddr *)&r->addr, 
+			r->addr.sa_data_len, 0);
+	if (err)
+		goto err_out_destroy;
+
+	if (!n->size) {
+		err = dst_request_remote_config(n, sock);
+		if (err)
+			goto err_out_destroy;
+	}
+
+	n->state = kst_data_state_init(kst_main_worker, n, sock);
+	if (IS_ERR(n->state)) {
+		err = PTR_ERR(n->state);
+		n->state = NULL;
+		goto err_out_destroy;
+	}
+	return 0;
+
+err_out_destroy:
+	sock_release(sock);
+err_out_exit:
+	return err;
+}
+
+/*
+ * This function inserts node into storage.
+ */
+static int dst_insert_node(struct dst_node *n)
+{
+	int err;
+	struct dst_storage *st = n->st;
+	
+	err = st->alg->ops->add_node(n);
+	if (err)
+		return err;
+
+	err = dst_node_sysfs_init(n);
+	if (err)
+		goto err_out_remove_node;
+
+	mutex_lock(&st->tree_lock);
+	err = dst_storage_tree_add(n, st);
+	mutex_unlock(&st->tree_lock);
+	if (err)
+		goto err_out_sysfs_exit;
+
+	return 0;
+
+err_out_sysfs_exit:
+	dst_node_sysfs_exit(n);
+err_out_remove_node:
+	st->alg->ops->del_node(n);
+	return err;
+}
+
+static struct dst_node *dst_alloc_node(struct dst_ctl *ctl, 
+		void (*cleanup)(struct dst_node *))
+{
+	struct dst_storage *st;
+	struct dst_node *n;
+
+	st = dst_get_storage(ctl->st, ctl->alg, 1);
+	if (!st)
+		goto err_out_exit;
+
+	n = kzalloc(sizeof(struct dst_node), GFP_KERNEL);
+	if (!n)
+		goto err_out_put_storage;
+
+	n->st = st;
+	n->cleanup = cleanup;
+	n->start = ctl->start;
+	n->size = ctl->size;
+	atomic_set(&n->refcnt, 1);
+
+	return n;
+
+err_out_put_storage:
+	mutex_lock(&dst_storage_lock);
+	list_del_init(&st->entry);
+	mutex_unlock(&dst_storage_lock);
+
+	dst_put_storage(st);
+err_out_exit:
+	return NULL;
+}
+
+/*
+ * Control callbacks for userspace commands that set up
+ * different nodes and start/stop the array.
+ */
+static int dst_add_remote(struct dst_ctl *ctl, void __user *data)
+{
+	struct dst_node *n;
+	int err;
+	struct dst_remote_ctl rctl;
+
+	if (copy_from_user(&rctl, data, sizeof(struct dst_remote_ctl)))
+		return -EFAULT;
+
+	n = dst_alloc_node(ctl, &dst_cleanup_remote);
+	if (!n)
+		return -ENOMEM;
+
+	err = dst_setup_remote(n, ctl, &rctl);
+	if (err < 0)
+		goto err_out_free;
+
+	err = dst_insert_node(n);
+	if (err)
+		goto err_out_free;
+
+	return 0;
+
+err_out_free:
+	dst_node_put(n);
+	return err;
+}
+
+static int dst_add_local_export(struct dst_ctl *ctl, void __user *data)
+{
+	struct dst_node *n;
+	int err;
+	struct dst_local_export_ctl le;
+
+	if (copy_from_user(&le, data, sizeof(struct dst_local_export_ctl)))
+		return -EFAULT;
+
+	n = dst_alloc_node(ctl, &dst_cleanup_local_export);
+	if (!n)
+		return -EINVAL;
+
+	err = dst_setup_local_export(n, ctl, &le);
+	if (err < 0)
+		goto err_out_free;
+
+	err = dst_insert_node(n);
+	if (err)
+		goto err_out_free;
+
+
+	return 0;
+
+err_out_free:
+	dst_node_put(n);
+	return err;
+}
+
+static int dst_add_local(struct dst_ctl *ctl, void __user *data)
+{
+	struct dst_node *n;
+	int err;
+	struct dst_local_ctl lctl;
+
+	if (copy_from_user(&lctl, data, sizeof(struct dst_local_ctl)))
+		return -EFAULT;
+
+	n = dst_alloc_node(ctl, &dst_cleanup_local);
+	if (!n)
+		return -EINVAL;
+
+	err = dst_setup_local(n, ctl, &lctl);
+	if (err < 0)
+		goto err_out_free;
+
+	err = dst_insert_node(n);
+	if (err)
+		goto err_out_free;
+
+	return 0;
+
+err_out_free:
+	dst_node_put(n);
+	return err;
+}
+
+static int dst_del_node(struct dst_ctl *ctl, void __user *data)
+{
+	struct dst_node *n;
+	struct dst_storage *st;
+	int err = -ENODEV;
+
+	st = dst_get_storage(ctl->st, ctl->alg, 0);
+	if (!st)
+		goto err_out_exit;
+
+	mutex_lock(&st->tree_lock);
+	n = dst_storage_tree_del(st, ctl->start);
+	mutex_unlock(&st->tree_lock);
+	if (!n)
+		goto err_out_put;
+
+	dst_node_put(n);
+	dst_put_storage(st);
+
+	return 0;
+
+err_out_put:
+	dst_put_storage(st);
+err_out_exit:
+	return err;
+}
+
+static int dst_start_storage(struct dst_ctl *ctl, void __user *data)
+{
+	struct dst_storage *st;
+
+	st = dst_get_storage(ctl->st, ctl->alg, 0);
+	if (!st)
+		return -ENODEV;
+
+	mutex_lock(&st->tree_lock);
+	if (!(st->flags & DST_ST_STARTED)) {
+		set_capacity(st->disk, st->disk_size);
+		add_disk(st->disk);
+		st->flags |= DST_ST_STARTED;
+		dprintk("%s: STARTED st: %p, disk_size: %llu.\n", 
+				__func__, st, st->disk_size);
+	}
+	mutex_unlock(&st->tree_lock);
+
+	dst_put_storage(st);
+
+	return 0;
+}
+
+static int dst_stop_storage(struct dst_ctl *ctl, void __user *data)
+{
+	struct dst_storage *st;
+
+	st = dst_get_storage(ctl->st, ctl->alg, 0);
+	if (!st)
+		return -ENODEV;
+
+	dprintk("%s: STOPPED storage: %s.\n", __func__, st->name);
+
+	dst_storage_sysfs_exit(st);
+
+	mutex_lock(&dst_storage_lock);
+	list_del_init(&st->entry);
+	mutex_unlock(&dst_storage_lock);
+
+	if (st->flags & DST_ST_STARTED)
+		dst_remove_disk(st);
+
+	dst_remove_all_nodes(st);
+	dst_put_storage(st); /* One reference got above */
+	dst_put_storage(st); /* Another reference set during initialization */
+
+	return 0;
+}
+
+typedef int (*dst_command_func)(struct dst_ctl *ctl, void __user *data);
+
+/*
+ * List of userspace commands.
+ */
+static dst_command_func dst_commands[] = {
+	[DST_ADD_REMOTE] = &dst_add_remote,
+	[DST_ADD_LOCAL] = &dst_add_local,
+	[DST_ADD_LOCAL_EXPORT] = &dst_add_local_export,
+	[DST_DEL_NODE] = &dst_del_node,
+	[DST_START_STORAGE] = &dst_start_storage,
+	[DST_STOP_STORAGE] = &dst_stop_storage,
+};
+
+/*
+ * Moving configuration to the connector is on the TODO list.
+ */
+static int dst_ioctl(struct inode *inode, struct file *file, 
+		unsigned int command, unsigned long data)
+{
+	struct dst_ctl ctl;
+	unsigned int cmd = _IOC_NR(command);
+
+	if (!capable(CAP_SYS_ADMIN))
+		return -EACCES;
+	
+	if (_IOC_TYPE(command) != DST_IOCTL)
+		return -ENOTTY;
+
+	if (cmd >= DST_CMD_MAX)
+		return -EINVAL;
+
+	if (copy_from_user(&ctl, (void __user *)data, sizeof(struct dst_ctl)))
+		return -EFAULT;
+
+	data += sizeof(struct dst_ctl);
+
+	return dst_commands[cmd](&ctl, (void __user *)data);
+}
+
+static const struct file_operations dst_fops = {
+	.ioctl	 = dst_ioctl,
+	.owner	 = THIS_MODULE,
+};
+
+static struct miscdevice dst_misc = {
+	.minor 		= MISC_DYNAMIC_MINOR,
+	.name  		= DST_NAME,
+	.fops  		= &dst_fops
+};
+
+static int dst_sysfs_init(void)
+{
+	return bus_register(&dst_dev_bus_type);
+}
+
+static void dst_sysfs_exit(void)
+{
+	bus_unregister(&dst_dev_bus_type);
+}
+
+static int __init dst_sys_init(void)
+{
+	int err;
+	
+	dst_request_cache = kmem_cache_create("dst", sizeof(struct dst_request),
+				       0, 0, NULL, NULL);
+	if (!dst_request_cache)
+		return -ENOMEM;
+
+	err = register_blkdev(dst_major, DST_NAME);
+	if (err < 0)
+		goto err_out_destroy;
+	if (err)
+		dst_major = err;
+
+	err = dst_sysfs_init();
+	if (err)
+		goto err_out_unregister;
+
+	kst_main_worker = kst_worker_init(0);
+	if (IS_ERR(kst_main_worker)) {
+		err = PTR_ERR(kst_main_worker);
+		goto err_out_sysfs_exit;
+	}
+
+	err = misc_register(&dst_misc);
+	if (err)
+		goto err_out_worker_exit;
+
+	return 0;
+
+err_out_worker_exit:
+	kst_worker_exit(kst_main_worker);
+err_out_sysfs_exit:
+	dst_sysfs_exit();
+err_out_unregister:
+	unregister_blkdev(dst_major, DST_NAME);
+err_out_destroy:
+	kmem_cache_destroy(dst_request_cache);
+	return err;
+}
+
+static void __exit dst_sys_exit(void)
+{
+	misc_deregister(&dst_misc);
+	dst_sysfs_exit();
+	unregister_blkdev(dst_major, DST_NAME);
+	kst_exit_all();
+	kmem_cache_destroy(dst_request_cache);
+}
+
+module_init(dst_sys_init);
+module_exit(dst_sys_exit);
+
+MODULE_DESCRIPTION("Distributed storage");
+MODULE_AUTHOR("Evgeniy Polyakov <johnpol@2ka.mipt.ru>");
+MODULE_LICENSE("GPL");
diff --git a/drivers/block/dst/kst.c b/drivers/block/dst/kst.c
new file mode 100644
index 0000000..7193d4c
--- /dev/null
+++ b/drivers/block/dst/kst.c
@@ -0,0 +1,1437 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/socket.h>
+#include <linux/kthread.h>
+#include <linux/net.h>
+#include <linux/in.h>
+#include <linux/poll.h>
+#include <linux/bio.h>
+#include <linux/dst.h>
+
+#include <net/sock.h>
+
+struct kst_poll_helper
+{
+	poll_table 		pt;
+	struct kst_state	*st;
+};
+
+static LIST_HEAD(kst_worker_list);
+static DEFINE_MUTEX(kst_worker_mutex);
+
+/*
+ * This function creates bound socket for local export node.
+ */
+static int kst_sock_create(struct kst_state *st, struct saddr *addr, 
+		int type, int proto, int backlog)
+{
+	int err;
+
+	err = sock_create(addr->sa_family, type, proto, &st->socket);
+	if (err)
+		goto err_out_exit;
+
+	err = st->socket->ops->bind(st->socket, (struct sockaddr *)addr,
+			addr->sa_data_len);
+	if (!err)
+		err = st->socket->ops->listen(st->socket, backlog);
+	if (err)
+		goto err_out_release;
+
+	st->socket->sk->sk_allocation = GFP_NOIO;
+
+	return 0;
+
+err_out_release:
+	sock_release(st->socket);
+err_out_exit:
+	return err;
+}
+
+static void kst_sock_release(struct kst_state *st)
+{
+	if (st->socket) {
+		sock_release(st->socket);
+		st->socket = NULL;
+	}
+}
+
+static void kst_wake(struct kst_state *st)
+{
+	struct kst_worker *w = st->w;
+	unsigned long flags;
+
+	spin_lock_irqsave(&w->ready_lock, flags);
+	if (list_empty(&st->ready_entry))
+		list_add_tail(&st->ready_entry, &w->ready_list);
+	spin_unlock_irqrestore(&w->ready_lock, flags);
+
+	wake_up(&w->wait);
+}
+
+/*
+ * Polling machinery.
+ */
+static int kst_state_wake_callback(wait_queue_t *wait, unsigned mode, 
+		int sync, void *key)
+{
+	struct kst_state *st = container_of(wait, struct kst_state, wait);
+	kst_wake(st);
+	return 1;
+}
+
+static void kst_queue_func(struct file *file, wait_queue_head_t *whead,
+				 poll_table *pt)
+{
+	struct kst_state *st = container_of(pt, struct kst_poll_helper, pt)->st;
+
+	st->whead = whead;
+	init_waitqueue_func_entry(&st->wait, kst_state_wake_callback);
+	add_wait_queue(whead, &st->wait);
+}
+
+static void kst_poll_exit(struct kst_state *st)
+{
+	if (st->whead) {
+		remove_wait_queue(st->whead, &st->wait);
+		st->whead = NULL;
+	}
+}
+
+/*
+ * This function removes request from state tree and ordering list.
+ */
+static void kst_del_req(struct dst_request *req)
+{
+	struct kst_state *st = req->state;
+
+	rb_erase(&req->request_entry, &st->request_root);
+	RB_CLEAR_NODE(&req->request_entry);
+	list_del_init(&req->request_list_entry);
+}
+
+static struct dst_request *kst_req_first(struct kst_state *st)
+{
+	struct dst_request *req = NULL;
+	
+	if (!list_empty(&st->request_list))
+		req = list_entry(st->request_list.next, struct dst_request, 
+				request_list_entry);
+	return req;
+}
+
+/*
+ * This function dequeues first request from the queue and tree.
+ */
+static struct dst_request *kst_dequeue_req(struct kst_state *st)
+{
+	struct dst_request *req;
+
+	mutex_lock(&st->request_lock);
+	req = kst_req_first(st);
+	if (req)
+		kst_del_req(req);
+	mutex_unlock(&st->request_lock);
+	return req;
+}
+
+static inline int dst_compare_request_id(struct dst_request *old, 
+		struct dst_request *new)
+{
+	int cmp = 0;
+
+	if (old->start + to_sector(old->orig_size) <= new->start)
+		cmp = 1;
+	if (old->start >= new->start + to_sector(new->orig_size))
+		cmp = -1;
+
+	dprintk("%s: old: op: %lu, start: %llu, size: %llu, off: %u, "
+		"new: op: %lu, start: %llu, size: %llu, off: %u, cmp: %d.\n",
+		__func__, bio_rw(old->bio), old->start, old->orig_size,
+		old->offset,
+		bio_rw(new->bio), new->start, new->orig_size,
+		new->offset, cmp);
+
+	return cmp;
+}
+
+/*
+ * This function enqueues request into tree, indexed by start of the request,
+ * and also puts request into ordered queue.
+ */
+static int kst_enqueue_req(struct kst_state *st, struct dst_request *req)
+{
+	struct rb_node **n = &st->request_root.rb_node, *parent = NULL;
+	struct dst_request *old = NULL;
+	int cmp;
+
+	while (*n) {
+		parent = *n;
+		old = rb_entry(parent, struct dst_request, request_entry);
+
+		cmp = dst_compare_request_id(old, req);
+		if (cmp < 0)
+			n = &parent->rb_left;
+		else if (cmp > 0)
+			n = &parent->rb_right;
+		else
+			return -EEXIST;
+	}
+
+	rb_link_node(&req->request_entry, parent, n);
+	rb_insert_color(&req->request_entry, &st->request_root);
+
+	if (req->size != req->orig_size)
+		list_add(&req->request_list_entry, &st->request_list);
+	else
+		list_add_tail(&req->request_list_entry, &st->request_list);
+	return 0;
+}
+
+/*
+ * BIOs for local exporting node are freed via this function.
+ */
+static void kst_export_put_bio(struct bio *bio)
+{
+	int i;
+	struct bio_vec *bv;
+
+	dprintk("%s: bio: %p, size: %u, idx: %d, num: %d.\n", 
+			__func__, bio, bio->bi_size, bio->bi_idx, 
+			bio->bi_vcnt);
+
+	bio_for_each_segment(bv, bio, i)
+		__free_page(bv->bv_page);
+	bio_put(bio);
+}
+
+/*
+ * This is a generic request completion function.
+ * If it is local export node, state machine is different,
+ * see details below.
+ */
+static void kst_complete_req(struct dst_request *req, int err)
+{
+	if (err)
+		printk(KERN_ERR "%s: freeing bio: %p, req: %p, size: %llu, "
+			"orig_size: %llu, bi_size: %u, err: %d, flags: %u.\n", 
+			__func__, req->bio, req, req->size, req->orig_size, 
+			req->bio->bi_size, err, req->flags);
+
+	if (req->flags & DST_REQ_EXPORT) {
+		if (req->flags & DST_REQ_EXPORT_WRITE) {
+			req->bio->bi_rw = WRITE;
+			generic_make_request(req->bio);
+		} else
+			kst_export_put_bio(req->bio);
+	} else {
+		bio_endio(req->bio, req->orig_size, (err)?-EIO:0);
+	}
+	dprintk("%s: free req: %p, pool: %p.\n", 
+			__func__, req, req->state->w->req_pool);
+	mempool_free(req, req->state->w->req_pool);
+}
+
+static void kst_flush_requests(struct kst_state *st)
+{
+	struct dst_request *req;
+
+	while ((req = kst_dequeue_req(st)) != NULL)
+		kst_complete_req(req, -EIO);
+}
+
+static int kst_poll_init(struct kst_state *st)
+{
+	struct kst_poll_helper ph;
+
+	ph.st = st;
+	init_poll_funcptr(&ph.pt, &kst_queue_func);
+
+	st->socket->ops->poll(NULL, st->socket, &ph.pt);
+	return 0;
+}
+
+/*
+ * Main state creation function.
+ * It creates new state according to given operations
+ * and links it into worker structure and node.
+ */
+struct kst_state *kst_state_init(struct kst_worker *w, struct dst_node *node, 
+		struct kst_state_ops *ops, void *data)
+{
+	struct kst_state *st;
+	int err;
+
+	st = kzalloc(sizeof(struct kst_state), GFP_KERNEL);
+	if (!st)
+		return ERR_PTR(-ENOMEM);
+
+	st->node = node;
+	st->ops = ops;
+	st->w = w;
+	INIT_LIST_HEAD(&st->ready_entry);
+	INIT_LIST_HEAD(&st->entry);
+	st->request_root.rb_node = NULL;
+	INIT_LIST_HEAD(&st->request_list);
+	mutex_init(&st->request_lock);
+
+	err = st->ops->init(st, data);
+	if (err)
+		goto err_out_free;
+	mutex_lock(&w->state_mutex);
+	list_add_tail(&st->entry, &w->state_list);
+	mutex_unlock(&w->state_mutex);
+
+	kst_wake(st);
+
+	return st;
+
+err_out_free:
+	kfree(st);
+	return ERR_PTR(err);
+}
+
+/*
+ * This function is called when a node is removed,
+ * or when the state of a client connected to a local exporting
+ * node is destroyed.
+ */
+void kst_state_exit(struct kst_state *st)
+{
+	struct kst_worker *w = st->w;
+
+	dprintk("%s: st: %p.\n", __func__, st);
+
+	mutex_lock(&w->state_mutex);
+	list_del_init(&st->entry);
+	mutex_unlock(&w->state_mutex);
+
+	st->ops->exit(st);
+	kfree(st);
+}
+
+/*
+ * This is the main state processing function.
+ * It tries to complete requests and invokes the appropriate
+ * callbacks on error or on successful completion.
+ */
+static int kst_thread_process_state(struct kst_state *st)
+{
+	int err, empty;
+	unsigned int revents;
+	struct dst_request *req, *tmp;
+
+	mutex_lock(&st->request_lock);
+	if (st->ops->ready) {
+		err = st->ops->ready(st);
+		if (err) {
+			mutex_unlock(&st->request_lock);
+			if (err < 0)
+				kst_state_exit(st);
+			return err;
+		}
+	}
+
+	err = 0;
+	empty = 1;
+	req = NULL;
+	list_for_each_entry_safe(req, tmp, &st->request_list, 
+			request_list_entry) {
+		empty = 0;
+		revents = st->socket->ops->poll(st->socket->file, 
+				st->socket, NULL);
+		dprintk("\n%s: st: %p, revents: %x.\n", __func__, st, revents);
+		if (!revents)
+			break;
+		err = req->callback(req, revents);
+		dprintk("%s: callback returned, st: %p, err: %d.\n", 
+				__func__, st, err);
+		if (err)
+			break;
+	}
+	mutex_unlock(&st->request_lock);
+
+	dprintk("%s: req: %p, err: %d.\n", __func__, req, err);
+	if (err < 0) {
+		err = st->node->st->alg->ops->error(st, err);
+		if (err && (st != st->node->state)) {
+			dprintk("%s: err: %d, st: %p, node->state: %p.\n", 
+					__func__, err, st, st->node->state);
+			/*
+			 * Accepted client has state not related to storage
+			 * node, so it must be freed explicitly.
+			 */
+
+			kst_state_exit(st);
+			return err;
+		}
+
+		kst_wake(st);
+	}
+
+	if (list_empty(&st->request_list) && !empty)
+		kst_wake(st);
+
+	return err;
+}
+
+/*
+ * Main worker thread - one per storage.
+ */
+static int kst_thread_func(void *data)
+{
+	struct kst_worker *w = data;
+	struct kst_state *st;
+	unsigned long flags;
+	int err = 0;
+
+	while (!kthread_should_stop()) {
+		wait_event_interruptible_timeout(w->wait, 
+				!list_empty(&w->ready_list) || 
+				kthread_should_stop(), 
+				HZ);
+
+		st = NULL;
+		spin_lock_irqsave(&w->ready_lock, flags);
+		if (!list_empty(&w->ready_list)) {
+			st = list_entry(w->ready_list.next, struct kst_state, 
+					ready_entry);
+			list_del_init(&st->ready_entry);
+		}
+		spin_unlock_irqrestore(&w->ready_lock, flags);
+
+		if (!st)
+			continue;
+
+		err = kst_thread_process_state(st);
+	}
+
+	return err;
+}
+
+/*
+ * Worker initialization - this object will host and process all states,
+ * which in turn host requests for remote targets.
+ */
+struct kst_worker *kst_worker_init(int id)
+{
+	struct kst_worker *w;
+	int err;
+
+	w = kzalloc(sizeof(struct kst_worker), GFP_KERNEL);
+	if (!w)
+		return ERR_PTR(-ENOMEM);
+
+	w->id = id;
+	init_waitqueue_head(&w->wait);
+	spin_lock_init(&w->ready_lock);
+	mutex_init(&w->state_mutex);
+
+	INIT_LIST_HEAD(&w->ready_list);
+	INIT_LIST_HEAD(&w->state_list);
+
+	w->req_pool = mempool_create_slab_pool(256, dst_request_cache);
+	if (!w->req_pool) {
+		err = -ENOMEM;
+		goto err_out_free;
+	}
+
+	w->thread = kthread_run(&kst_thread_func, w, "kst%d", w->id);
+	if (IS_ERR(w->thread)) {
+		err = PTR_ERR(w->thread);
+		goto err_out_destroy;
+	}
+
+	mutex_lock(&kst_worker_mutex);
+	list_add_tail(&w->entry, &kst_worker_list);
+	mutex_unlock(&kst_worker_mutex);
+
+	return w;
+
+err_out_destroy:
+	mempool_destroy(w->req_pool);
+err_out_free:
+	kfree(w);
+	return ERR_PTR(err);
+}
+
+void kst_worker_exit(struct kst_worker *w)
+{
+	struct kst_state *st, *n;
+
+	mutex_lock(&kst_worker_mutex);
+	list_del(&w->entry);
+	mutex_unlock(&kst_worker_mutex);
+
+	kthread_stop(w->thread);
+
+	list_for_each_entry_safe(st, n, &w->state_list, entry) {
+		kst_state_exit(st);
+	}
+
+	mempool_destroy(w->req_pool);
+	kfree(w);
+}
+
+/*
+ * Common state exit callback.
+ * Removes the state from the poll wait queue and the worker's ready list,
+ * releases the socket and flushes all requests.
+ */
+static void kst_common_exit(struct kst_state *st)
+{
+	unsigned long flags;
+
+	dprintk("%s: st: %p.\n", __func__, st);
+	kst_poll_exit(st);
+
+	spin_lock_irqsave(&st->w->ready_lock, flags);
+	list_del_init(&st->ready_entry);
+	spin_unlock_irqrestore(&st->w->ready_lock, flags);
+
+	kst_sock_release(st);
+	kst_flush_requests(st);
+}
+
+/*
+ * Header sending function - may block.
+ */
+static int kst_data_send_header(struct kst_state *st, 
+		struct dst_remote_request *r)
+{
+	struct msghdr msg;
+	struct kvec iov;
+
+	iov.iov_base = r;
+	iov.iov_len = sizeof(struct dst_remote_request);
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_WAITALL | MSG_NOSIGNAL;
+
+	return kernel_sendmsg(st->socket, &msg, &iov, 1, iov.iov_len);
+}
+
+/*
+ * BIO vector receiving function - does not block, but may sleep because
+ * of scheduling policy.
+ */
+static int kst_data_recv_bio_vec(struct kst_state *st, struct bio_vec *bv, 
+		unsigned int offset, unsigned int size)
+{
+	struct msghdr msg;
+	struct kvec iov;
+	void *kaddr;
+	int err;
+
+	kaddr = kmap(bv->bv_page);
+
+	iov.iov_base = kaddr + bv->bv_offset + offset;
+	iov.iov_len = size;
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+
+	err = kernel_recvmsg(st->socket, &msg, &iov, 1, iov.iov_len, 
+			msg.msg_flags);
+	kunmap(bv->bv_page);
+
+	return err;
+}
+
+/*
+ * BIO vector sending function - does not block, but may sleep because
+ * of scheduling policy.
+ */
+static int kst_data_send_bio_vec(struct kst_state *st, struct bio_vec *bv, 
+		unsigned int offset, unsigned int size)
+{
+	return kernel_sendpage(st->socket, bv->bv_page, 
+			bv->bv_offset + offset, size, 
+			MSG_DONTWAIT | MSG_NOSIGNAL);
+}
+
+typedef int (*kst_data_process_bio_vec_t)(struct kst_state *st, 
+		struct bio_vec *bv, unsigned int offset, unsigned int size);
+
+/*
+ * @req: request being processed.
+ * Contains the BIO and all information related to its processing.
+ *
+ * This function sends or receives requested number of pages from given BIO.
+ *
+ * On error a negative value is returned and @req->size, @req->idx
+ * and @req->offset are set to:
+ * - number of bytes not yet processed (i.e. the rest of the bytes to be
+ *   processed).
+ * - index of the last bio_vec started to be processed (header sent).
+ * - offset of the first byte to be processed in the bio_vec.
+ *
+ * If there are no errors, zero is returned.
+ * -EAGAIN is not an error and is transformed into zero return value,
+ * caller must check if @req->size is zero, in that case whole BIO is
+ * processed and thus bio_endio() can be called, otherwise new request
+ * must be allocated to be processed later.
+ */
+static int kst_data_process_bio(struct dst_request *req)
+{
+	int err = -ENOSPC, partial = (req->size != req->orig_size);
+	struct dst_remote_request r;
+	kst_data_process_bio_vec_t func;
+	unsigned int cur_size;
+
+	r.flags = cpu_to_be32(((unsigned long)req->bio) & 0xffffffff);
+
+	if (bio_rw(req->bio) == WRITE) {
+		r.cmd = cpu_to_be32(DST_WRITE);
+		func = kst_data_send_bio_vec;
+	} else {
+		r.cmd = cpu_to_be32(DST_READ);
+		func = kst_data_recv_bio_vec;
+	}
+
+	dprintk("%s: start: [%c], start: %llu, idx: %d, num: %d, "
+			"size: %llu, offset: %u.\n", 
+			__func__, (bio_rw(req->bio) == WRITE)?'W':'R',
+			req->start, req->idx, req->num, req->size, req->offset);
+
+	while (req->idx < req->num) {
+		struct bio_vec *bv = bio_iovec_idx(req->bio, req->idx);
+
+		cur_size = min_t(u64, bv->bv_len - req->offset, req->size);
+
+		BUG_ON(cur_size == 0);
+
+		if (!(req->flags & DST_REQ_HEADER_SENT)) {
+			r.sector = cpu_to_be64(req->start);
+			r.offset = cpu_to_be32(bv->bv_offset + req->offset);
+			r.size = cpu_to_be32(cur_size);
+
+			err = kst_data_send_header(req->state, &r);
+			if (err != sizeof(struct dst_remote_request)) {
+				dprintk("%s: %d/%d: header: start: %llu, "
+					"bv_offset: %u, bv_len: %u, "
+					"a offset: %u, offset: %u, "
+					"cur_size: %u, err: %d.\n", 
+					__func__, req->idx, req->num, 
+					req->start, bv->bv_offset, bv->bv_len, 
+					bv->bv_offset + req->offset, 
+					req->offset, cur_size, err);
+				if (err >= 0)
+					err = -EINVAL;
+				break;
+			}
+
+			req->flags |= DST_REQ_HEADER_SENT;
+		}
+
+		err = func(req->state, bv, req->offset, cur_size);
+		if (err <= 0)
+			break;
+
+		req->offset += err;
+		req->size -= err;
+		req->start += to_sector(err);
+
+		if (req->offset != bv->bv_len) {
+			dprintk("%s: %d/%d: this: start: %llu, bv_offset: %u, "
+				"bv_len: %u, a offset: %u, offset: %u, "
+				"cur_size: %u, err: %d.\n", 
+				__func__, req->idx, req->num, req->start,
+				bv->bv_offset, bv->bv_len, 
+				bv->bv_offset + req->offset,
+				req->offset, cur_size, err);
+			err = -EAGAIN;
+			break;
+		}
+		req->offset = 0;
+		req->idx++;
+		req->flags &= ~DST_REQ_HEADER_SENT;
+	}
+
+	if (err <= 0 && err != -EAGAIN) {
+		if (err == 0)
+			err = -ECONNRESET;
+	} else
+		err = 0;
+
+	if (req->size) {
+		req->state->flags |= KST_FLAG_PARTIAL;
+	} else if (partial) {
+		req->state->flags &= ~KST_FLAG_PARTIAL;
+	}
+
+	if (err < 0 || (req->idx == req->num && req->size)) {
+		dprintk("%s: return: idx: %d, num: %d, offset: %u, "
+				"size: %llu, err: %d.\n", 
+			__func__, req->idx, req->num, req->offset, 
+			req->size, err);
+	}
+	dprintk("%s: end: start: %llu, idx: %d, num: %d, "
+			"size: %llu, offset: %u.\n", 
+		__func__, req->start, req->idx, req->num, 
+		req->size, req->offset);
+
+
+	return err;
+}
+
+/*
+ * This callback is invoked by worker thread to process given request.
+ */
+static int kst_data_callback(struct dst_request *req, unsigned int revents)
+{
+	int err;
+	
+	dprintk("%s: req: %p, num: %d, idx: %d, bio: %p, "
+			"revents: %x, flags: %x.\n", 
+			__func__, req, req->num, req->idx, req->bio, 
+			revents, req->flags);
+
+	if (req->flags & DST_REQ_EXPORT_READ)
+		return 1;
+
+	err = kst_data_process_bio(req);
+	if (err < 0)
+		goto err_out;
+
+	if (!req->size) {
+		dprintk("%s: complete: req: %p, bio: %p.\n", 
+				__func__, req, req->bio);
+		kst_del_req(req);
+		kst_complete_req(req, 0);
+		return 0;
+	}
+
+	if (revents & (POLLERR | POLLHUP | POLLRDHUP)) {
+		err = -EPIPE;
+		goto err_out;
+	}
+
+	return 1;
+
+err_out:
+	return err;
+}
+
+#define KST_CONG_COMPLETED		(0)
+#define KST_CONG_NOT_FOUND		(1)
+#define KST_CONG_QUEUE			(-1)
+
+/*
+ * kst_congestion - checks for data congestion, i.e. the case when the given
+ * 	block request overlaps the area of another block request which
+ * 	is not yet sent to the remote node.
+ *
+ * @req: dst request containing block io related information.
+ *
+ * Return value:
+ * %KST_CONG_COMPLETED  - congestion was found and processed, 
+ * 	bio must be ended, request is completed.
+ * %KST_CONG_NOT_FOUND  - no congestion found, 
+ * 	request must be processed as usual
+ * %KST_CONG_QUEUE - congestion has been found, but bio is not completed, 
+ * 	new request must be allocated and processed.
+ */
+static int kst_congestion(struct dst_request *req)
+{
+	int cmp, i;
+	struct kst_state *st = req->state;
+	struct rb_node *n = st->request_root.rb_node;
+	struct dst_request *old = NULL, *dst_req, *src_req;
+
+	while (n) {
+		src_req = rb_entry(n, struct dst_request, request_entry);
+		cmp = dst_compare_request_id(src_req, req);
+
+		if (cmp < 0)
+			n = n->rb_left;
+		else if (cmp > 0)
+			n = n->rb_right;
+		else {
+			old = src_req;
+			break;
+		}
+	}
+
+	if (likely(!old))
+		return KST_CONG_NOT_FOUND;
+	
+	dprintk("%s: old: op: %lu, start: %llu, size: %llu, off: %u, "
+			"new: op: %lu, start: %llu, size: %llu, off: %u.\n",
+		__func__, bio_rw(old->bio), old->start, old->orig_size, 
+		old->offset,
+		bio_rw(req->bio), req->start, req->orig_size, req->offset);
+
+	if ((bio_rw(old->bio) != WRITE) && (bio_rw(req->bio) != WRITE)) {
+		return KST_CONG_QUEUE;
+	}
+
+	if (unlikely(req->offset != old->offset))
+		return KST_CONG_QUEUE;
+
+	src_req = old;
+	dst_req = req;
+	if (bio_rw(req->bio) == WRITE) {
+		dst_req = old;
+		src_req = req;
+	}
+
+	/* Actually we could partially complete new request by copying
+	 * part of the first one, but not now, consider this as a 
+	 * (low-priority) todo item.
+	 */
+	if (src_req->start + to_sector(src_req->orig_size) <
+			dst_req->start + to_sector(dst_req->orig_size))
+		return KST_CONG_QUEUE;
+
+	/*
+	 * So, only process if new request is different from old one,
+	 * or subsequent write, i.e.:
+	 * - not completed write and request to read
+	 * - not completed read and request to write
+	 * - not completed write and request to (over)write
+	 */
+	for (i=old->idx; i<old->num; ++i) {
+		struct bio_vec *bv_src, *bv_dst;
+		void *src, *dst;
+		u64 len;
+
+		bv_src = bio_iovec_idx(src_req->bio, i);
+		bv_dst = bio_iovec_idx(dst_req->bio, i);
+
+		if (unlikely(bv_dst->bv_offset != bv_src->bv_offset))
+			return KST_CONG_QUEUE;
+		
+		if (unlikely(bv_dst->bv_len != bv_src->bv_len))
+			return KST_CONG_QUEUE;
+
+		src = kmap_atomic(bv_src->bv_page, KM_USER0);
+		dst = kmap_atomic(bv_dst->bv_page, KM_USER1);
+
+		len = min_t(u64, bv_dst->bv_len, dst_req->size);
+
+		memcpy(dst + bv_dst->bv_offset, src + bv_src->bv_offset, len);
+
+		kunmap_atomic(src, KM_USER0);
+		kunmap_atomic(dst, KM_USER1);
+
+		dst_req->idx++;
+		dst_req->size -= len;
+		dst_req->offset = 0;
+		dst_req->start += to_sector(len);
+
+		if (!dst_req->size)
+			break;
+	}
+
+	if (req == dst_req)
+		return KST_CONG_COMPLETED;
+
+	kst_del_req(dst_req);
+	kst_complete_req(dst_req, 0);
+
+	return KST_CONG_NOT_FOUND;
+}
+
+static struct dst_request *dst_clone_request(struct dst_request *req)
+{
+	struct dst_request *new_req;
+
+	new_req = mempool_alloc(req->state->w->req_pool, GFP_NOIO);
+	if (!new_req)
+		return NULL;
+
+	dprintk("%s: req: %p, new_req: %p, bio: %p.\n", 
+			__func__, req, new_req, req->bio);
+
+	RB_CLEAR_NODE(&new_req->request_entry);
+
+	new_req->bio = req->bio;
+	new_req->state = req->state;
+	new_req->idx = req->idx;
+	new_req->num = req->num;
+	new_req->size = req->size;
+	new_req->orig_size = req->orig_size;
+	new_req->offset = req->offset;
+	new_req->start = req->start;
+	new_req->flags = req->flags;
+
+	return new_req;
+}
+
+/*
+ * Main data processing function, eventually invoked from the block layer.
+ * It tries to complete the request, but if it is about to block, it allocates
+ * a new request and queues it to the main worker to process when events allow.
+ */
+static int kst_data_push(struct dst_request *req)
+{
+	struct kst_state *st = req->state;
+	struct dst_request *new_req;
+	unsigned int revents;
+	int err, locked = 0;
+
+	dprintk("%s: start: %llu, size: %llu, bio: %p.\n", 
+			__func__, req->start, req->size, req->bio);
+
+	if (mutex_trylock(&st->request_lock)) {
+		locked = 1;
+
+		if (st->flags & (KST_FLAG_PARTIAL | DST_REQ_ALWAYS_QUEUE))
+			goto alloc_new_req;
+
+		err = kst_congestion(req);
+		if (err == KST_CONG_COMPLETED)
+			goto out_bio_endio;
+
+		if (err == KST_CONG_NOT_FOUND) {
+			revents = st->socket->ops->poll(NULL, st->socket, NULL);
+			dprintk("%s: st: %p, bio: %p, revents: %x.\n", 
+					__func__, st, req->bio, revents);
+			if (revents & POLLOUT) {
+				err = kst_data_process_bio(req);
+				if (err < 0)
+					goto out_unlock;
+
+				if (!req->size) {
+					err = 0;
+					goto out_bio_endio;
+				}
+			}
+		}
+	}
+
+alloc_new_req:
+	if (!locked)
+		mutex_lock(&st->request_lock);
+	/* request_lock is held here, so out_unlock below is always balanced. */
+	err = -ENOMEM;
+	new_req = dst_clone_request(req);
+	if (!new_req)
+		goto out_unlock;
+
+	new_req->callback = &kst_data_callback;
+
+	err = kst_enqueue_req(st, new_req);
+	mutex_unlock(&st->request_lock);
+	locked = 0;
+	if (err) {
+		printk("%s: free req: %p, pool: %p.\n", 
+				__func__, new_req, st->w->req_pool);
+		printk("%s: free [%c], start: %llu, idx: %d, "
+				"num: %d, size: %llu, offset: %u, err: %d.\n", 
+			__func__, (bio_rw(req->bio) == WRITE)?'W':'R',
+			req->start, req->idx, req->num, req->size, 
+			req->offset, err);
+		mempool_free(new_req, st->w->req_pool);
+		goto err_out;
+	}
+
+	kst_wake(st);
+
+	return 0;
+
+out_bio_endio:
+	if (err)
+		printk("%s: freeing bio: %p, bi_size: %u, orig_size: %llu.\n", 
+			__func__, req->bio, req->bio->bi_size, req->orig_size);
+	bio_endio(req->bio, req->orig_size, err);
+out_unlock:
+	mutex_unlock(&st->request_lock);
+	locked = 0;
+err_out:
+	if (err) {
+		err = st->node->st->alg->ops->error(st, err);
+		if (!err)
+			goto alloc_new_req;
+	}
+
+	if (err)
+		printk("%s: [%c], start: %llu, idx: %d, num: %d, "
+				"size: %llu, offset: %u, err: %d.\n", 
+			__func__, (bio_rw(req->bio) == WRITE)?'W':'R',
+			req->start, req->idx, req->num, req->size, 
+			req->offset, err);
+	kst_wake(st);
+	return err;
+}
+
+/* 
+ * Remote node initialization callback.
+ */
+static int kst_data_init(struct kst_state *st, void *data)
+{
+	int err;
+
+	st->socket = data;
+	st->socket->sk->sk_allocation = GFP_NOIO;
+	/*
+	 * Use large (10 MB) send and receive buffers. Why not?
+	 */
+	st->socket->sk->sk_rcvbuf = st->socket->sk->sk_sndbuf = 1024*1024*10;
+
+	err = kst_poll_init(st);
+	if (err)
+		return err;
+
+	return 0;
+}
+
+/*
+ * Remote node recovery function - tries to reconnect to given target.
+ */
+static int kst_data_recovery(struct kst_state *st, int err)
+{
+	struct socket *sock;
+	struct saddr addr;	/* large enough for any address family */
+	int addrlen;
+	struct dst_request *req;
+
+	if (err != -ECONNRESET && err != -EPIPE) {
+		dprintk("%s: state %p does not know how "
+				"to recover from error %d.\n",
+				__func__, st, err);
+		return err;
+	}
+
+	err = sock_create(st->socket->ops->family, st->socket->type,
+			st->socket->sk->sk_protocol, &sock);
+	if (err < 0)
+		goto err_out_exit;
+
+	sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo =
+		msecs_to_jiffies(DST_DEFAULT_TIMEO);
+
+	err = sock->ops->getname(st->socket, (struct sockaddr *)&addr,
+			&addrlen, 2);
+	if (err)
+		goto err_out_destroy;
+	err = sock->ops->connect(sock, (struct sockaddr *)&addr, addrlen, 0);
+	if (err)
+		goto err_out_destroy;
+
+	kst_poll_exit(st);
+	kst_sock_release(st);
+
+	mutex_lock(&st->request_lock);
+	err = st->ops->init(st, sock);
+	if (!err) {
+		/*
+		 * After reconnection is completed all requests
+		 * must be resent from the state they were finished previously,
+		 * but with new headers.
+		 */
+		list_for_each_entry(req, &st->request_list, request_list_entry)
+			req->flags &= ~DST_REQ_HEADER_SENT;
+	}
+	mutex_unlock(&st->request_lock);
+	if (err < 0)
+		goto err_out_destroy;
+
+	kst_wake(st);
+	printk("%s: recovery completed.\n", __func__);
+
+	return 0;
+
+err_out_destroy:
+	sock_release(sock);
+err_out_exit:
+	dprintk("%s: recovery failed: st: %p, err: %d.\n", __func__, st, err);
+	return err;
+}
+
+static inline void kst_convert_header(struct dst_remote_request *r)
+{
+	r->cmd = be32_to_cpu(r->cmd);
+	r->sector = be64_to_cpu(r->sector);
+	r->offset = be32_to_cpu(r->offset);
+	r->size = be32_to_cpu(r->size);
+	r->flags = be32_to_cpu(r->flags);
+}
+
+/*
+ * Local exporting node end IO callbacks.
+ */
+static int kst_export_write_end_io(struct bio *bio, unsigned int size, int err)
+{
+	dprintk("%s: bio: %p, size: %u, idx: %d, num: %d, err: %d.\n", 
+		__func__, bio, bio->bi_size, bio->bi_idx, bio->bi_vcnt, err);
+
+	if (bio->bi_size)
+		return 1;
+
+	kst_export_put_bio(bio);
+	return 0;
+}
+
+static int kst_export_read_end_io(struct bio *bio, unsigned int size, int err)
+{
+	struct dst_request *req = bio->bi_private;
+	struct kst_state *st = req->state;
+
+	dprintk("%s: bio: %p, req: %p, size: %u, idx: %d, num: %d, err: %d.\n", 
+		__func__, bio, req, bio->bi_size, bio->bi_idx, 
+		bio->bi_vcnt, err);
+
+	if (bio->bi_size)
+		return 1;
+
+	bio->bi_size = req->size = req->orig_size;
+	bio->bi_rw = WRITE;
+	req->flags &= ~DST_REQ_EXPORT_READ;
+	kst_wake(st);
+	return 0;
+}
+
+/*
+ * This callback is invoked each time a new request from a remote
+ * node to the given local export node is received.
+ * It allocates a new block IO request and queues it for processing.
+ */
+static int kst_export_ready(struct kst_state *st)
+{
+	struct dst_remote_request r;
+	struct msghdr msg;
+	struct kvec iov;
+	struct bio *bio;
+	int err, nr, i;
+	struct dst_request *req;
+	sector_t data_size;
+	unsigned int revents = st->socket->ops->poll(NULL, st->socket, NULL);
+
+	if (revents & (POLLERR | POLLHUP)) {
+		err = -EPIPE;
+		goto err_out_exit;
+	}
+
+	if (!(revents & POLLIN) || !list_empty(&st->request_list))
+		return 0;
+
+	iov.iov_base = &r;
+	iov.iov_len = sizeof(struct dst_remote_request);
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_WAITALL | MSG_NOSIGNAL;
+
+	err = kernel_recvmsg(st->socket, &msg, &iov, 1, 
+			iov.iov_len, msg.msg_flags);
+	if (err != sizeof(struct dst_remote_request)) {
+		err = -EINVAL;
+		goto err_out_exit;
+	}
+
+	kst_convert_header(&r);
+
+	dprintk("\n%s: cmd: %u, sector: %llu, size: %u, "
+			"flags: %x, offset: %u.\n",
+			__func__, r.cmd, r.sector, r.size, r.flags, r.offset);
+
+	/*
+	 * Does not support autoconfig yet.
+	 */
+	err = -EINVAL;
+	if (r.cmd != DST_READ && r.cmd != DST_WRITE)
+		goto err_out_exit;
+
+	/* Reject requests which do not fit into the exported device. */
+	data_size = get_capacity(st->node->bdev->bd_disk);
+	if (r.sector > data_size ||
+			to_sector(r.size) > data_size - r.sector)
+		goto err_out_exit;
+
+	nr = r.size/PAGE_SIZE + 1;
+
+	while (r.size) {
+		int nr_pages = min(BIO_MAX_PAGES, nr);
+		unsigned int size;
+		struct page *page;
+
+		err = -ENOMEM;
+		req = mempool_alloc(st->w->req_pool, GFP_NOIO);
+		if (!req)
+			goto err_out_exit;
+
+		dprintk("%s: alloc req: %p, pool: %p.\n", 
+				__func__, req, st->w->req_pool);
+
+		bio = bio_alloc(GFP_NOIO, nr_pages);
+		if (!bio)
+			goto err_out_free_req;
+
+		req->flags = DST_REQ_EXPORT | DST_REQ_HEADER_SENT;
+		req->bio = bio;
+		req->state = st;
+		req->callback = &kst_data_callback;
+
+		/*
+		 * Yes, looks a bit weird.
+		 * Logic is simple - for local exporting node all operations
+		 * are reversed compared to usual nodes, since usual nodes 
+		 * process remote data and local export node processes remote
+		 * requests, so that writing data means sending data to 
+		 * remote node and receiving on the local export one.
+		 *
+		 * So, to process writing to the exported node we need first to 
+		 * receive data from the net (i.e. to perform READ operation
+		 * in terms of usual node), and then put it to the storage
+		 * (WRITE command, so it will be changed before calling 
+		 * generic_make_request()).
+		 *
+		 * To process read request from the exported node we need
+		 * first to read it from storage (READ command for BIO)
+		 * and then send it over the net (perform WRITE operation
+		 * in terms of network).
+		 */
+		if (r.cmd == DST_WRITE) {
+			req->flags |= DST_REQ_EXPORT_WRITE;
+			bio->bi_end_io = kst_export_write_end_io;
+		} else {
+			req->flags |= DST_REQ_EXPORT_READ;
+			bio->bi_end_io = kst_export_read_end_io;
+		}
+		bio->bi_rw = READ;
+		bio->bi_private = req;
+		bio->bi_sector = r.sector;
+		bio->bi_bdev = st->node->bdev;
+
+		for (i=0; i<nr_pages; ++i) {
+			page = alloc_page(GFP_NOIO);
+			if (!page)
+				break;
+
+			size = min_t(u32, PAGE_SIZE, r.size);
+
+			err = bio_add_page(bio, page, size, r.offset);
+			dprintk("%s: %d/%d: page: %p, size: %u, offset: %u, "
+					"err: %d.\n", 
+					__func__, i, nr_pages, page, size, 
+					r.offset, err);
+			if (err <= 0) {
+				__free_page(page);
+				break;
+			}
+			if (err == size) {
+				r.offset = 0;
+				nr--;
+			} else
+				r.offset += err;
+
+			r.size -= err;
+			r.sector += to_sector(err);
+
+			if (!r.size)
+				break;
+		}
+
+		if (!bio->bi_vcnt) {
+			err = -ENOMEM;
+			goto err_out_put;
+		}
+
+		req->size = req->orig_size = bio->bi_size;
+		req->start = bio->bi_sector;
+		req->idx = 0;
+		req->num = bio->bi_vcnt;
+
+		dprintk("%s: submitting: bio: %p, req: %p, start: %llu, "
+			"size: %llu, idx: %d, num: %d, offset: %u, err: %d.\n",
+			__func__, bio, req, req->start, req->size, 
+			req->idx, req->num, req->offset, err);
+
+		err = kst_enqueue_req(st, req);
+		if (err)
+			goto err_out_put;
+
+		if (r.cmd == DST_READ) {
+			generic_make_request(bio);
+		}
+	}
+
+	kst_wake(st);
+	return 0;
+
+err_out_put:
+	bio_put(bio);
+err_out_free_req:
+	dprintk("%s: free req: %p, pool: %p.\n", 
+			__func__, req, st->w->req_pool);
+	mempool_free(req, st->w->req_pool);
+err_out_exit:
+	dprintk("%s: error: %d.\n", __func__, err);
+	return err;
+}
+
+static void kst_export_exit(struct kst_state *st)
+{
+	struct dst_node *n = st->node;
+
+	dprintk("%s: st: %p.\n", __func__, st);
+
+	kst_common_exit(st);
+	dst_node_put(n);
+}
+
+static struct kst_state_ops kst_data_export_ops = {
+	.init = &kst_data_init,
+	.push = &kst_data_push,
+	.exit = &kst_export_exit,
+	.ready = &kst_export_ready,
+};
+
+/*
+ * This callback is invoked each time listening socket for
+ * given local export node becomes ready.
+ * It creates a new state for the connected client and queues it for processing.
+ */
+static int kst_listen_ready(struct kst_state *st)
+{
+	struct socket *newsock;
+	struct saddr addr;
+	struct kst_state *newst;
+	int err, addrlen;
+	unsigned int revents;
+
+	revents = st->socket->ops->poll(NULL, st->socket, NULL);
+	if (!(revents & POLLIN))
+		return 1;
+
+	err = sock_create(st->socket->ops->family, st->socket->type,
+			st->socket->sk->sk_protocol, &newsock);
+	if (err)
+		goto err_out_exit;
+
+	atomic_inc(&st->node->refcnt); /* dropped by err_out_put on failure */
+	err = st->socket->ops->accept(st->socket, newsock, 0);
+	if (err)
+		goto err_out_put;
+
+	if (newsock->ops->getname(newsock, (struct sockaddr *)&addr,
+				  &addrlen, 2) < 0) {
+		err = -ECONNABORTED;
+		goto err_out_put;
+	}
+
+	if (st->socket->ops->family == AF_INET) {
+		struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+		printk("%s: Client: %u.%u.%u.%u:%d.\n", __func__,
+			NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+	} else if (st->socket->ops->family == AF_INET6) {
+		struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr;
+		printk("%s: Client: %04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d.\n",
+			__func__, NIP6(sin->sin6_addr), ntohs(sin->sin6_port));
+	}
+
+	newst = kst_state_init(st->w, st->node, &kst_data_export_ops, newsock);
+	if (IS_ERR(newst)) {
+		err = PTR_ERR(newst);
+		goto err_out_put;
+	}
+
+	return 0;
+
+err_out_put:
+	dst_node_put(st->node);
+	sock_release(newsock);
+err_out_exit:
+	return err;
+}
+
+static int kst_listen_init(struct kst_state *st, void *data)
+{
+	int err;
+	struct dst_local_export_ctl *le = data;
+
+	err = kst_sock_create(st, &le->rctl.addr, le->rctl.type, 
+			le->rctl.proto, le->backlog);
+	if (err)
+		goto err_out_exit;
+	
+	err = kst_poll_init(st);
+	if (err)
+		goto err_out_release;
+
+	return 0;
+
+err_out_release:
+	kst_sock_release(st);
+err_out_exit:
+	return err;
+}
+
+/*
+ * Operations for different types of states.
+ * There are three:
+ * data state - created for remote node, when distributed storage connects
+ * 	to remote node, which contains data.
+ * listen state - created for local export node, when remote distributed
+ * 	storage's node connects to given node to get/put data.
+ * data export state - created for each client connected to above listen
+ * 	state.
+ */
+static struct kst_state_ops kst_listen_ops = {
+	.init = &kst_listen_init,
+	.exit = &kst_common_exit,
+	.ready = &kst_listen_ready,
+};
+static struct kst_state_ops kst_data_ops = {
+	.init = &kst_data_init,
+	.push = &kst_data_push,
+	.exit = &kst_common_exit,
+	.recovery = &kst_data_recovery,
+};
+
+struct kst_state *kst_listener_state_init(struct kst_worker *w, 
+		struct dst_node *node, struct dst_local_export_ctl *le)
+{
+	return kst_state_init(w, node, &kst_listen_ops, le);
+}
+
+struct kst_state *kst_data_state_init(struct kst_worker *w, 
+		struct dst_node *node, struct socket *newsock)
+{
+	return kst_state_init(w, node, &kst_data_ops, newsock);
+}
+
+/*
+ * Remove all workers and associated states.
+ */
+void kst_exit_all(void)
+{
+	struct kst_worker *w, *n;
+
+	list_for_each_entry_safe(w, n, &kst_worker_list, entry) {
+		kst_worker_exit(w);
+	}
+}
diff --git a/include/linux/dst.h b/include/linux/dst.h
new file mode 100644
index 0000000..b92fb55
--- /dev/null
+++ b/include/linux/dst.h
@@ -0,0 +1,282 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef __DST_H
+#define __DST_H
+
+#include <linux/types.h>
+
+#define DST_NAMELEN		32
+#define DST_NAME		"dst"
+#define DST_IOCTL		0xba
+
+enum {
+	DST_DEL_NODE	= 0,		/* Remove node with given id from storage */
+	DST_ADD_REMOTE,			/* Add remote node with given id to the storage */
+	DST_ADD_LOCAL,			/* Add local node with given id to the storage */
+	DST_ADD_LOCAL_EXPORT,		/* Add local node with given id to the storage to be exported and used by remote peers */
+	DST_START_STORAGE,		/* Array is ready and storage can be started, if there will be new nodes
+					 * added to the storage, they will be checked against existing size and
+					 * probably be dropped (for example in mirror format when new node has smaller
+					 * size than array created) or inserted.
+					 */
+	DST_STOP_STORAGE,		/* Remove array and all nodes. */
+	DST_CMD_MAX
+};
+
+#define DST_CTL_FLAGS_REMOTE	(1<<0)
+#define DST_CTL_FLAGS_EXPORT	(1<<1)
+
+struct dst_ctl
+{
+	char			st[DST_NAMELEN];
+	char			alg[DST_NAMELEN];
+	__u32			flags;
+	__u64			start, size;
+};
+
+struct dst_local_ctl
+{
+	char			name[DST_NAMELEN];
+};
+
+#define SADDR_MAX_DATA	128
+
+struct saddr {
+	unsigned short		sa_family;			/* address family, AF_xxx	*/
+	char			sa_data[SADDR_MAX_DATA];	/* up to SADDR_MAX_DATA bytes of protocol address */
+	unsigned short		sa_data_len;			/* Number of bytes used in sa_data */
+};
+
+struct dst_remote_ctl
+{
+	__u16			type;
+	__u16			proto;
+	struct saddr		addr;
+};
+
+struct dst_local_export_ctl
+{
+	__u32			backlog;
+	struct dst_local_ctl	lctl;
+	struct dst_remote_ctl	rctl;
+};
+
+
+enum {
+	DST_REMOTE_CFG		= 1, 		/* Request remote configuration */
+	DST_WRITE,				/* Writing */
+	DST_READ,				/* Reading */
+	DST_NCMD_MAX,
+};
+
+struct dst_remote_request
+{
+	__u32			cmd;
+	__u32			flags;
+	__u64			sector;
+	__u32			offset;
+	__u32			size;
+};
+
+#ifdef __KERNEL__
+
+#include <linux/rbtree.h>
+#include <linux/net.h>
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/mempool.h>
+#include <linux/device.h>
+
+//#define DST_DEBUG
+
+#ifdef DST_DEBUG
+#define dprintk(f, a...) printk(f, ##a)
+#else
+#define dprintk(f, a...) do {} while (0)
+#endif
+
+struct kst_worker
+{
+	struct list_head	entry;
+
+	struct list_head	state_list;
+	struct mutex		state_mutex;
+	
+	struct list_head	ready_list;
+	spinlock_t		ready_lock;
+
+	mempool_t		*req_pool;
+	
+	struct task_struct	*thread;
+
+	wait_queue_head_t 	wait;
+	
+	int			id;
+};
+
+struct kst_state;
+struct dst_node;
+
+#define DST_REQ_HEADER_SENT	(1<<0)
+#define DST_REQ_EXPORT		(1<<1)
+#define DST_REQ_EXPORT_WRITE	(1<<2)
+#define DST_REQ_EXPORT_READ	(1<<3)
+#define DST_REQ_ALWAYS_QUEUE	(1<<4)
+
+struct dst_request
+{
+	struct rb_node		request_entry;
+	struct list_head	request_list_entry;
+	struct bio		*bio;
+	struct kst_state	*state;
+
+	u32			flags;
+
+	int 			(*callback)(struct dst_request *, unsigned int);
+
+	u64			size, orig_size, start;
+	int			idx, num;
+	u32			offset;
+};
+
+struct kst_state_ops
+{
+	int 		(*init)(struct kst_state *, void *);
+	int 		(*push)(struct dst_request *req);
+	int		(*ready)(struct kst_state *);
+	int		(*recovery)(struct kst_state *, int err);
+	void 		(*exit)(struct kst_state *);
+};
+
+#define KST_FLAG_PARTIAL		(1<<0)
+
+struct kst_state
+{
+	struct list_head	entry;
+	struct list_head	ready_entry;
+
+	wait_queue_t 		wait;
+	wait_queue_head_t 	*whead;
+
+	struct dst_node		*node;
+	struct kst_worker	*w;
+	struct socket		*socket;
+
+	u32			flags;
+
+	struct rb_root		request_root;
+	struct mutex		request_lock;
+	struct list_head	request_list;
+
+	struct kst_state_ops	*ops;
+};
+
+#define DST_DEFAULT_TIMEO	2000
+
+struct dst_storage;
+
+struct dst_alg_ops
+{
+	int			(*add_node)(struct dst_node *n);
+	void			(*del_node)(struct dst_node *n);
+	int 			(*remap)(struct dst_storage *st, struct bio *bio);
+	int			(*error)(struct kst_state *state, int err);
+	struct module 		*owner;
+};
+
+struct dst_alg
+{
+	struct list_head	entry;
+	char			name[DST_NAMELEN];
+	atomic_t		refcnt;
+	struct dst_alg_ops	*ops;
+};
+
+#define DST_ST_STARTED		(1<<0)
+
+struct dst_storage
+{
+	struct list_head	entry;
+	char			name[DST_NAMELEN];
+	struct dst_alg		*alg;
+	atomic_t		refcnt;
+	struct mutex		tree_lock;
+	struct rb_root		tree_root;
+
+	request_queue_t		*queue;
+	struct gendisk		*disk;
+
+	long			flags;
+	u64			disk_size;
+
+	struct device		device;
+};
+
+#define DST_NODE_FROZEN		0
+
+struct dst_node
+{
+	struct rb_node		tree_node;
+	struct block_device 	*bdev;
+	struct dst_storage	*st;
+	struct kst_state	*state;
+
+	atomic_t		refcnt;
+
+	void			(*cleanup)(struct dst_node *);
+
+	long			flags;
+
+	u64			start, size;
+
+	struct device		device;
+};
+
+struct kst_state *kst_state_init(struct kst_worker *w, struct dst_node *node, 
+		struct kst_state_ops *ops, void *data);
+void kst_state_exit(struct kst_state *st);
+
+struct kst_worker *kst_worker_init(int id);
+void kst_worker_exit(struct kst_worker *w);
+
+struct kst_state *kst_listener_state_init(struct kst_worker *w, struct dst_node *node, 
+		struct dst_local_export_ctl *le);
+struct kst_state *kst_data_state_init(struct kst_worker *w, struct dst_node *node, 
+		struct socket *newsock);
+
+void kst_exit_all(void);
+
+struct dst_alg *dst_alloc_alg(char *name, struct dst_alg_ops *ops);
+void dst_remove_alg(struct dst_alg *alg);
+
+struct dst_node *dst_storage_tree_search(struct dst_storage *st, u64 start);
+
+void dst_node_put(struct dst_node *n);
+
+extern struct kmem_cache *dst_request_cache;
+
+static inline sector_t to_sector(u64 n)
+{
+	return (n >> 9);
+}
+
+static inline u64 to_bytes(sector_t n)
+{
+	return ((u64)n << 9);
+}
+
+#endif /* __KERNEL__ */
+#endif /* __DST_H */
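
The request header above is converted field by field with be32_to_cpu()
and be64_to_cpu() in kst_convert_header(), so it is big-endian on the wire.
A minimal userspace sketch of a matching encoder and decoder follows. It
assumes the fields are sent in declaration order without padding (24 bytes),
which the patch does not state explicitly. struct wire_request and the
put_be*/get_be*/wire_request_* helpers are hypothetical names used only for
illustration, they are not part of DST.

```c
#include <assert.h>
#include <stdint.h>

/* Userspace mirror of struct dst_remote_request (same field order). */
struct wire_request {
	uint32_t cmd;
	uint32_t flags;
	uint64_t sector;
	uint32_t offset;
	uint32_t size;
};

enum { WIRE_REQUEST_SIZE = 24 };

/* Assumption: no padding, so the in-memory size equals the wire size. */
static_assert(sizeof(struct wire_request) == WIRE_REQUEST_SIZE,
	      "unexpected padding in struct wire_request");

static void put_be32(uint8_t *p, uint32_t v)
{
	p[0] = (uint8_t)(v >> 24);
	p[1] = (uint8_t)(v >> 16);
	p[2] = (uint8_t)(v >> 8);
	p[3] = (uint8_t)v;
}

static uint32_t get_be32(const uint8_t *p)
{
	return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
	       ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

static void put_be64(uint8_t *p, uint64_t v)
{
	put_be32(p, (uint32_t)(v >> 32));
	put_be32(p + 4, (uint32_t)v);
}

static uint64_t get_be64(const uint8_t *p)
{
	return ((uint64_t)get_be32(p) << 32) | get_be32(p + 4);
}

/* Serialize a host-order request into a big-endian wire buffer. */
static void wire_request_encode(uint8_t *buf, const struct wire_request *r)
{
	put_be32(buf, r->cmd);
	put_be32(buf + 4, r->flags);
	put_be64(buf + 8, r->sector);
	put_be32(buf + 16, r->offset);
	put_be32(buf + 20, r->size);
}

/* Userspace counterpart of kst_convert_header(): wire to host order. */
static void wire_request_decode(struct wire_request *r, const uint8_t *buf)
{
	r->cmd = get_be32(buf);
	r->flags = get_be32(buf + 4);
	r->sector = get_be64(buf + 8);
	r->offset = get_be32(buf + 16);
	r->size = get_be32(buf + 20);
}
```

With the enum values from dst.h, cmd would be 2 for DST_WRITE and 3 for
DST_READ. Explicit byte shifts are used instead of htonl() so the sketch
does not depend on the host byte order or on a 64-bit conversion helper.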


-- 
	Evgeniy Polyakov

* Distributed STorage.
@ 2008-08-27 16:07 Evgeniy Polyakov
  0 siblings, 0 replies; 81+ messages in thread
From: Evgeniy Polyakov @ 2008-08-27 16:07 UTC (permalink / raw)
  To: linux-kernel; +Cc: netdev, linux-fsdevel

Hi.

I am pleased to announce a new Distributed STorage (DST) project release.

Distributed storage was recently rewritten completely from scratch.
I dropped the features that essentially mirrored the device mapper in
favour of more robust block IO processing and a more effective protocol.


DST is a block layer network device which, among others, has the
following features:

 * Kernel-side client and server. No need for any special tools for
	data processing (like special userspace applications) except
	for configuration.
 * Bullet-proof memory allocations via memory pools for all temporary
 	objects (transactions and so on).
 * Zero-copy sending (except header) if supported by device
 	using sendpage().
 * Failover recovery in case of broken link
 	(reconnection if remote node is down).
 * Full transaction support (resending of the failed transactions
 	on timeout or after reconnecting to the failed node).
 * Dynamically resizeable pool of threads used for data receiving
 	and crypto processing.
 * Initial autoconfiguration. Ability to extend it with additional
 	attributes if needed.
 * Support for any kind of network media (not limited to tcp or inet
 	protocols) above the MAC layer (socket layer). Out of the box
	kernel-side IPv6 support (needs to extend configuration utility,
	check how it was done in POHMELFS [1]).
 * Security attributes for local export nodes (list of addresses allowed
 	to connect, with permissions). Not used currently though.
 * Ability to use any supported cryptographically strong checksums.
 	Ability to encrypt data channel.
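
The resend behaviour from the list above can be pictured with a small
userspace sketch. It is not the driver's code: struct sketch_trans, the
tick-based clock, the retry limit and the function names are all assumptions
made for illustration. Only the per-transaction generation number (t->gen,
as used by dst_gen_iv() in the patch) is taken from DST itself.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical record for one in-flight transaction. */
struct sketch_trans {
	uint64_t gen;		/* generation number identifying the transaction */
	uint64_t sent_at;	/* time of the last (re)send, in arbitrary ticks */
	unsigned int retries;	/* how many times it was resent on timeout */
	int completed;		/* non-zero once the remote node has replied */
};

enum sketch_action { SKETCH_WAIT, SKETCH_RESEND, SKETCH_FAIL };

/*
 * Timeout scan step: decide what to do with one transaction at time 'now'.
 * A transaction is resent once 'timeout' ticks have passed without a reply,
 * and is failed after 'max_retries' resends.
 */
static enum sketch_action sketch_trans_timeout(struct sketch_trans *t,
		uint64_t now, uint64_t timeout, unsigned int max_retries)
{
	assert(now >= t->sent_at);

	if (t->completed || now - t->sent_at < timeout)
		return SKETCH_WAIT;
	if (t->retries >= max_retries)
		return SKETCH_FAIL;

	t->retries++;
	t->sent_at = now;
	return SKETCH_RESEND;
}

/*
 * Reconnect step: every transaction that has not completed is resent to the
 * new connection immediately. Returns the number of resent transactions.
 */
static size_t sketch_trans_reconnect(struct sketch_trans *ts, size_t num,
		uint64_t now)
{
	size_t i, resent = 0;

	for (i = 0; i < num; ++i) {
		if (ts[i].completed)
			continue;
		ts[i].sent_at = now;
		resent++;
	}

	return resent;
}
```

In this sketch a reconnect does not count against the retry limit, since the
failure was on the link rather than in the transaction. Whether DST makes the
same choice is not stated in this announcement.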

One can grab sources (various configuration examples can be found
in 'userspace' dir) from the archive [3], or via the kernel and userspace
GIT trees [4].

1. POHMELFS homepage.
http://tservice.net.ru/~s0mbre/old/?section=projects&item=pohmelfs

2. DST homepage.
http://tservice.net.ru/~s0mbre/old/?section=projects&item=dst

3. DST archive.
http://tservice.net.ru/~s0mbre/archive/dst/

4. DST git trees.
http://tservice.net.ru/~s0mbre/archive/dst/dst.git/
http://tservice.net.ru/~s0mbre/archive/dst/dst-userspace.git/

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>

diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
index 0d1d213..56a64fe 100644
--- a/drivers/block/Kconfig
+++ b/drivers/block/Kconfig
@@ -433,4 +433,6 @@ config VIRTIO_BLK
 	  This is the virtual block driver for virtio.  It can be used with
           lguest or QEMU based VMMs (like KVM or Xen).  Say Y or M.
 
+source "drivers/block/dst/Kconfig"
+
 endif # BLK_DEV
diff --git a/drivers/block/Makefile b/drivers/block/Makefile
index 5e58430..26bcf8a 100644
--- a/drivers/block/Makefile
+++ b/drivers/block/Makefile
@@ -31,3 +31,5 @@ obj-$(CONFIG_BLK_DEV_SX8)	+= sx8.o
 obj-$(CONFIG_BLK_DEV_UB)	+= ub.o
 
 obj-$(CONFIG_XEN_BLKDEV_FRONTEND)	+= xen-blkfront.o
+
+obj-$(CONFIG_DST)	+= dst/
diff --git a/drivers/block/dst/Kconfig b/drivers/block/dst/Kconfig
new file mode 100644
index 0000000..0b641f0
--- /dev/null
+++ b/drivers/block/dst/Kconfig
@@ -0,0 +1,18 @@
+menuconfig DST
+	tristate "Distributed storage"
+	depends on NET
+	select CONNECTOR
+	select LIBCRC32C
+	---help---
+	This driver allows one to create a distributed storage block device.
+
+if DST
+
+config DST_DEBUG
+	bool "DST debug"
+	depends on DST
+	---help---
+	This option will turn on HEAVY debugging of the DST.
+	Turn it on ONLY if you have to debug some really obscure problem.
+
+endif # DST
diff --git a/drivers/block/dst/Makefile b/drivers/block/dst/Makefile
new file mode 100644
index 0000000..526fc62
--- /dev/null
+++ b/drivers/block/dst/Makefile
@@ -0,0 +1,3 @@
+obj-$(CONFIG_DST) += dst.o
+
+dst-y := dcore.o state.o export.o thread_pool.o crypto.o trans.o
diff --git a/drivers/block/dst/crypto.c b/drivers/block/dst/crypto.c
new file mode 100644
index 0000000..7d86fae
--- /dev/null
+++ b/drivers/block/dst/crypto.c
@@ -0,0 +1,767 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/bio.h>
+#include <linux/crypto.h>
+#include <linux/dst.h>
+#include <linux/kernel.h>
+#include <linux/scatterlist.h>
+#include <linux/slab.h>
+
+static inline u64 dst_gen_iv(struct dst_trans *t)
+{
+	return t->gen;
+}
+
+static struct crypto_hash *dst_init_hash(struct dst_crypto_ctl *ctl, u8 *key)
+{
+	int err;
+	struct crypto_hash *hash;
+
+	hash = crypto_alloc_hash(ctl->hash_algo, 0, CRYPTO_ALG_ASYNC);
+	if (IS_ERR(hash)) {
+		err = PTR_ERR(hash);
+		dprintk("%s: failed to allocate hash '%s', err: %d.\n",
+				__func__, ctl->hash_algo, err);
+		goto err_out_exit;
+	}
+
+	ctl->crypto_attached_size = crypto_hash_digestsize(hash);
+
+	dprintk("%s: keysize: %u, key: ", __func__, ctl->hash_keysize);
+	for (err = 0; err < ctl->hash_keysize; ++err)
+		dprintk("%02x ", key[err]);
+	dprintk("\n");
+
+	if (!ctl->hash_keysize)
+		return hash;
+
+	err = crypto_hash_setkey(hash, key, ctl->hash_keysize);
+	if (err) {
+		dprintk("%s: failed to set key for hash '%s', err: %d.\n",
+				__func__, ctl->hash_algo, err);
+		goto err_out_free;
+	}
+
+	return hash;
+
+err_out_free:
+	crypto_free_hash(hash);
+err_out_exit:
+	return ERR_PTR(err);
+}
+
+static struct crypto_ablkcipher *dst_init_cipher(struct dst_crypto_ctl *ctl, u8 *key)
+{
+	int err = -EINVAL;
+	struct crypto_ablkcipher *cipher;
+
+	if (!ctl->cipher_keysize)
+		goto err_out_exit;
+
+	cipher = crypto_alloc_ablkcipher(ctl->cipher_algo, 0, 0);
+	if (IS_ERR(cipher)) {
+		err = PTR_ERR(cipher);
+		dprintk("%s: failed to allocate cipher '%s', err: %d.\n",
+				__func__, ctl->cipher_algo, err);
+		goto err_out_exit;
+	}
+
+	crypto_ablkcipher_clear_flags(cipher, ~0);
+
+	err = crypto_ablkcipher_setkey(cipher, key, ctl->cipher_keysize);
+	if (err) {
+		dprintk("%s: failed to set key for cipher '%s', err: %d.\n",
+				__func__, ctl->cipher_algo, err);
+		goto err_out_free;
+	}
+
+	return cipher;
+
+err_out_free:
+	crypto_free_ablkcipher(cipher);
+err_out_exit:
+	return ERR_PTR(err);
+}
+
+static void dst_crypto_pages_free(struct dst_crypto_engine *e)
+{
+	unsigned int i;
+
+	for (i=0; i<e->page_num; ++i)
+		__free_page(e->pages[i]);
+	kfree(e->pages);
+}
+
+static int dst_crypto_pages_alloc(struct dst_crypto_engine *e, int num)
+{
+	int i;
+
+	e->pages = kmalloc(num * sizeof(struct page *), GFP_KERNEL);
+	if (!e->pages)
+		return -ENOMEM;
+
+	for (i=0; i<num; ++i) {
+		e->pages[i] = alloc_page(GFP_KERNEL);
+		if (!e->pages[i])
+			goto err_out_free_pages;
+	}
+
+	e->page_num = num;
+	return 0;
+
+err_out_free_pages:
+	while (--i >= 0)
+		__free_page(e->pages[i]);
+
+	kfree(e->pages);
+	return -ENOMEM;
+}
+
+static int dst_crypto_engine_init(struct dst_crypto_engine *e, struct dst_node *n)
+{
+	int err;
+	struct dst_crypto_ctl *ctl = &n->crypto;
+
+	err = dst_crypto_pages_alloc(e, n->max_pages);
+	if (err)
+		goto err_out_exit;
+
+	e->size = PAGE_SIZE;
+	e->data = kmalloc(e->size, GFP_KERNEL);
+	if (!e->data) {
+		err = -ENOMEM;
+		goto err_out_free_pages;
+	}
+
+	if (ctl->hash_algo[0]) {
+		e->hash = dst_init_hash(ctl, n->hash_key);
+		if (IS_ERR(e->hash)) {
+			err = PTR_ERR(e->hash);
+			e->hash = NULL;
+			goto err_out_free;
+		}
+	}
+
+	if (ctl->cipher_algo[0]) {
+		e->cipher = dst_init_cipher(ctl, n->cipher_key);
+		if (IS_ERR(e->cipher)) {
+			err = PTR_ERR(e->cipher);
+			e->cipher = NULL;
+			goto err_out_free_hash;
+		}
+	}
+
+	return 0;
+
+err_out_free_hash:
+	crypto_free_hash(e->hash);
+err_out_free:
+	kfree(e->data);
+err_out_free_pages:
+	dst_crypto_pages_free(e);
+err_out_exit:
+	return err;
+}
+
+static void dst_crypto_engine_exit(struct dst_crypto_engine *e)
+{
+	if (e->hash)
+		crypto_free_hash(e->hash);
+	if (e->cipher)
+		crypto_free_ablkcipher(e->cipher);
+	dst_crypto_pages_free(e);
+	kfree(e->data);
+}
+
+struct dst_crypto_completion
+{
+	struct completion		complete;
+	int				error;
+};
+
+static void dst_crypto_complete(struct crypto_async_request *req, int err)
+{
+	struct dst_crypto_completion *c = req->data;
+
+	if (err == -EINPROGRESS)
+		return;
+
+	dprintk("%s: req: %p, err: %d.\n", __func__, req, err);
+	c->error = err;
+	complete(&c->complete);
+}
+
+static int dst_crypto_process(struct ablkcipher_request *req,
+		struct scatterlist *sg_dst, struct scatterlist *sg_src,
+		void *iv, int enc, unsigned long timeout)
+{
+	struct dst_crypto_completion c;
+	int err;
+
+	init_completion(&c.complete);
+	c.error = -EINPROGRESS;
+
+	ablkcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_BACKLOG,
+					dst_crypto_complete, &c);
+
+	ablkcipher_request_set_crypt(req, sg_src, sg_dst, sg_src->length, iv);
+
+	if (enc)
+		err = crypto_ablkcipher_encrypt(req);
+	else
+		err = crypto_ablkcipher_decrypt(req);
+
+	switch (err) {
+		case -EINPROGRESS:
+		case -EBUSY:
+			err = wait_for_completion_interruptible_timeout(&c.complete,
+					timeout);
+			if (!err)
+				err = -ETIMEDOUT;
+			else if (err > 0)
+				err = c.error;
+			break;
+		default:
+			break;
+	}
+
+	return err;
+}
+
+static int dst_trans_iter_out(struct bio *bio, struct dst_crypto_engine *e,
+		int (* iterator) (struct dst_crypto_engine *e,
+				  struct scatterlist *dst,
+				  struct scatterlist *src))
+{
+	struct bio_vec *bv;
+	int err, i;
+	
+	sg_init_table(e->src, bio->bi_vcnt);
+	sg_init_table(e->dst, bio->bi_vcnt);
+
+	bio_for_each_segment(bv, bio, i) {
+		sg_set_page(&e->src[i], bv->bv_page, bv->bv_len, bv->bv_offset);
+		sg_set_page(&e->dst[i], e->pages[i], bv->bv_len, bv->bv_offset);
+
+		err = iterator(e, &e->dst[i], &e->src[i]);
+		if (err)
+			return err;
+	}
+
+	return 0;
+}
+
+static int dst_trans_iter_in(struct bio *bio, struct dst_crypto_engine *e,
+		int (* iterator) (struct dst_crypto_engine *e,
+				  struct scatterlist *dst,
+				  struct scatterlist *src))
+{
+	struct bio_vec *bv;
+	int err, i;
+	
+	sg_init_table(e->src, bio->bi_vcnt);
+	sg_init_table(e->dst, bio->bi_vcnt);
+
+	bio_for_each_segment(bv, bio, i) {
+		sg_set_page(&e->src[i], bv->bv_page, bv->bv_len, bv->bv_offset);
+		sg_set_page(&e->dst[i], bv->bv_page, bv->bv_len, bv->bv_offset);
+
+		err = iterator(e, &e->dst[i], &e->src[i]);
+		if (err)
+			return err;
+	}
+
+	return 0;
+}
+
+static int dst_crypt_iterator(struct dst_crypto_engine *e,
+		struct scatterlist *sg_dst, struct scatterlist *sg_src)
+{
+	struct ablkcipher_request *req = e->data;
+	u8 iv[32];
+
+	memset(iv, 0, sizeof(iv));
+
+	memcpy(iv, &e->iv, sizeof(e->iv));
+
+	return dst_crypto_process(req, sg_dst, sg_src, iv, e->enc, e->timeout);
+}
+
+static int dst_crypt(struct dst_crypto_engine *e, struct bio *bio)
+{
+	struct ablkcipher_request *req = e->data;
+
+	memset(req, 0, sizeof(struct ablkcipher_request));
+	ablkcipher_request_set_tfm(req, e->cipher);
+
+	if (e->enc)
+		return dst_trans_iter_out(bio, e, dst_crypt_iterator);
+	else
+		return dst_trans_iter_in(bio, e, dst_crypt_iterator);
+}
+
+static int dst_hash_iterator(struct dst_crypto_engine *e,
+		struct scatterlist *sg_dst, struct scatterlist *sg_src)
+{
+	return crypto_hash_update(e->data, sg_src, sg_src->length);
+}
+
+static int dst_hash(struct dst_crypto_engine *e, struct bio *bio, void *dst)
+{
+	struct hash_desc *desc = e->data;
+	int err;
+
+	desc->tfm = e->hash;
+	desc->flags = 0;
+
+	err = crypto_hash_init(desc);
+	if (err)
+		return err;
+
+	err = dst_trans_iter_in(bio, e, dst_hash_iterator);
+	if (err)
+		return err;
+
+	err = crypto_hash_final(desc, dst);
+	if (err)
+		return err;
+
+	return 0;
+}
+
+static void *dst_crypto_thread_init(void *data)
+{
+	struct dst_node *n = data;
+	struct dst_crypto_engine *e;
+	int err = -ENOMEM;
+
+	e = kzalloc(sizeof(struct dst_crypto_engine), GFP_KERNEL);
+	if (!e)
+		return ERR_PTR(err);
+	e->src = kzalloc(sizeof(struct scatterlist) * 2 * n->max_pages, GFP_KERNEL);
+	if (!e->src)
+		goto err_out_free;
+
+	e->dst = e->src + n->max_pages;
+
+	err = dst_crypto_engine_init(e, n);
+	if (err)
+		goto err_out_free;
+
+	return e;
+
+err_out_free:
+	kfree(e->src);
+	kfree(e);
+	return ERR_PTR(err);
+}
+
+static void dst_crypto_thread_cleanup(void *private)
+{
+	struct dst_crypto_engine *e = private;
+
+	dst_crypto_engine_exit(e);
+	kfree(e->src);
+	kfree(e);
+}
+
+int dst_node_crypto_init(struct dst_node *n, struct dst_crypto_ctl *ctl)
+{
+	void *key = (ctl + 1);
+	int err = -ENOMEM, i;
+	char name[32];
+
+	if (ctl->hash_keysize) {
+		n->hash_key = kmalloc(ctl->hash_keysize, GFP_KERNEL);
+		if (!n->hash_key)
+			goto err_out_exit;
+		memcpy(n->hash_key, key, ctl->hash_keysize);
+	}
+
+	if (ctl->cipher_keysize) {
+		n->cipher_key = kmalloc(ctl->cipher_keysize, GFP_KERNEL);
+		if (!n->cipher_key)
+			goto err_out_free_hash;
+		memcpy(n->cipher_key, key + ctl->hash_keysize, ctl->cipher_keysize);
+	}
+	memcpy(&n->crypto, ctl, sizeof(struct dst_crypto_ctl));
+
+	for (i=0; i<ctl->thread_num; ++i) {
+		snprintf(name, sizeof(name), "%s-crypto-%d", n->name, i);
+		/* Unique ids... */
+		err = thread_pool_add_worker(n->pool, name, i+10,
+			dst_crypto_thread_init, dst_crypto_thread_cleanup, n);
+		if (err)
+			goto err_out_free_threads;
+	}
+
+	return 0;
+
+err_out_free_threads:
+	while (--i >= 0)
+		thread_pool_del_worker_id(n->pool, i+10);
+
+	if (ctl->cipher_keysize)
+		kfree(n->cipher_key);
+	n->cipher_key = NULL;
+err_out_free_hash:
+	if (ctl->hash_keysize)
+		kfree(n->hash_key);
+	n->hash_key = NULL;
+err_out_exit:
+	return err;
+}
+
+void dst_node_crypto_exit(struct dst_node *n)
+{
+	struct dst_crypto_ctl *ctl = &n->crypto;
+
+	if (ctl->cipher_algo[0] || ctl->hash_algo[0]) {
+		kfree(n->hash_key);
+		kfree(n->cipher_key);
+	}
+}
+
+static int dst_trans_crypto_setup(void *crypto_engine, void *trans)
+{
+	struct dst_crypto_engine *e = crypto_engine;
+
+	e->private = trans;
+	return 0;
+}
+
+#if 0
+static void dst_dump_bio(struct bio *bio)
+{
+	u8 *p;
+	struct bio_vec *bv;
+	int i;
+
+	bio_for_each_segment(bv, bio, i) {
+		printk("%s: %llu/%u: size: %u, offset: %u, data: ",
+				__func__, bio->bi_sector, bio->bi_size,
+				bv->bv_len, bv->bv_offset);
+
+		p = kmap(bv->bv_page) + bv->bv_offset;
+		for (i=0; i<bv->bv_len; ++i)
+			printk("%02x ", p[i]);
+		kunmap(bv->bv_page);
+		printk("\n");
+	}
+}
+#endif
+
+static int dst_crypto_process_sending(struct dst_crypto_engine *e, struct bio *bio, u8 *hash)
+{
+	int err;
+
+	if (e->cipher) {
+		err = dst_crypt(e, bio);
+		if (err)
+			goto err_out_exit;
+	}
+
+	if (e->hash) {
+		err = dst_hash(e, bio, hash);
+		if (err)
+			goto err_out_exit;
+
+#if defined CONFIG_DST_DEBUG
+		{
+			unsigned int i;
+
+			//dst_dump_bio(bio);
+
+			printk("%s: bio: %llu/%u, rw: %lu, hash: ",
+				__func__, bio->bi_sector, bio->bi_size, bio_data_dir(bio));
+			for (i=0; i<crypto_hash_digestsize(e->hash); ++i)
+					printk("%02x ", hash[i]);
+			printk("\n");
+		}
+#endif
+	}
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+static int dst_crypto_process_receiving(struct dst_crypto_engine *e, struct bio *bio, u8 *hash, u8 *recv_hash)
+{
+	int err;
+
+	if (e->hash) {
+		int mismatch;
+
+		err = dst_hash(e, bio, hash);
+		if (err)
+			goto err_out_exit;
+
+		mismatch = !!memcmp(recv_hash, hash, crypto_hash_digestsize(e->hash));
+#if defined CONFIG_DST_DEBUG
+		//dst_dump_bio(bio);
+
+		printk("%s: bio: %llu/%u, rw: %lu, hash mismatch: %d",
+			__func__, bio->bi_sector, bio->bi_size, bio_data_dir(bio), mismatch);
+		if (mismatch) {
+			unsigned int i;
+
+			printk(", recv/calc: ");
+			for (i=0; i<crypto_hash_digestsize(e->hash); ++i) {
+				printk("%02x/%02x ", recv_hash[i], hash[i]);
+			}
+		}
+		printk("\n");
+#endif
+		err = -1;
+		if (mismatch)
+			goto err_out_exit;
+	}
+
+	if (e->cipher) {
+		err = dst_crypt(e, bio);
+		if (err)
+			goto err_out_exit;
+	}
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+static int dst_trans_crypto_action(void *crypto_engine, void *schedule_data)
+{
+	struct dst_crypto_engine *e = crypto_engine;
+	struct dst_trans *t = schedule_data;
+	struct bio *bio = t->bio;
+	int err;
+
+	dprintk("%s: t: %p, gen: %llu, cipher: %p, hash: %p.\n", __func__, t, t->gen, e->cipher, e->hash);
+
+	e->enc = t->enc;
+	e->iv = dst_gen_iv(t);
+
+	if (bio_data_dir(bio) == WRITE) {
+		err = dst_crypto_process_sending(e, bio, t->cmd.hash);
+		if (err)
+			goto err_out_exit;
+
+		if (e->hash) {
+			t->cmd.csize = crypto_hash_digestsize(e->hash);
+			t->cmd.size += t->cmd.csize;
+		}
+
+		return dst_trans_send(t);
+	} else {
+		u8 *hash = e->data + e->size/2;
+
+		err = dst_crypto_process_receiving(e, bio, hash, t->cmd.hash);
+		if (err)
+			goto err_out_exit;
+
+		dst_trans_remove(t);
+		dst_trans_put(t);
+	}
+
+	return 0;
+
+err_out_exit:
+	t->error = err;
+	dst_trans_put(t);
+	return err;
+}
+
+int dst_trans_crypto(struct dst_trans *t)
+{
+	struct dst_node *n = t->n;
+	int err;
+
+	err = thread_pool_schedule(n->pool,
+		dst_trans_crypto_setup, dst_trans_crypto_action,
+		t, MAX_SCHEDULE_TIMEOUT);
+	if (err)
+		goto err_out_exit;
+
+	return 0;
+
+err_out_exit:
+	dst_trans_put(t);
+	return err;
+}
+
+static int dst_export_crypto_setup(void *crypto_engine, void *bio)
+{
+	struct dst_crypto_engine *e = crypto_engine;
+
+	e->private = bio;
+	return 0;
+}
+
+static int dst_export_crypto_action(void *crypto_engine, void *schedule_data)
+{
+	struct dst_crypto_engine *e = crypto_engine;
+	struct bio *bio = schedule_data;
+	struct dst_export_priv *p = bio->bi_private;
+	int err;
+
+	dprintk("%s: e: %p, data: %p, bio: %llu/%u, dir: %lu.\n", __func__,
+		e, e->data, bio->bi_sector, bio->bi_size, bio_data_dir(bio));
+
+	e->enc = (bio_data_dir(bio) == READ);
+	e->iv = p->cmd.id;
+
+	if (bio_data_dir(bio) == WRITE) {
+		u8 *hash = e->data + e->size/2;
+
+		err = dst_crypto_process_receiving(e, bio, hash, p->cmd.hash);
+		if (err)
+			goto err_out_exit;
+
+		generic_make_request(bio);
+	} else {
+		err = dst_crypto_process_sending(e, bio, p->cmd.hash);
+		if (err)
+			goto err_out_exit;
+
+		if (e->hash) {
+			p->cmd.csize = crypto_hash_digestsize(e->hash);
+			p->cmd.size += p->cmd.csize;
+		}
+
+		err = dst_export_send_bio(bio);
+	}
+	return err;
+
+err_out_exit:
+	bio_put(bio);
+	return err;
+}
+
+int dst_export_crypto(struct dst_node *n, struct bio *bio)
+{
+	int err;
+
+	err = thread_pool_schedule(n->pool,
+		dst_export_crypto_setup, dst_export_crypto_action,
+		bio, MAX_SCHEDULE_TIMEOUT);
+	if (err)
+		goto err_out_exit;
+
+	return 0;
+
+err_out_exit:
+	bio_put(bio);
+	return err;
+}
+
+#if 0
+static int pohmelfs_crypto_init_handshake(struct pohmelfs_sb *psb)
+{
+	struct netfs_trans *t;
+	struct netfs_capabilities *cap;
+	struct netfs_cmd *cmd;
+	char *str;
+	int err = -ENOMEM, size;
+
+	size = sizeof(struct netfs_capabilities) +
+		psb->cipher_strlen + psb->hash_strlen + 2; /* 0 bytes */
+
+	t = netfs_trans_alloc(psb, size, 0, 0);
+	if (!t)
+		goto err_out_exit;
+
+	t->complete = pohmelfs_crypt_init_complete;
+	t->private = psb;
+
+	cmd = netfs_trans_current(t);
+	cap = (struct netfs_capabilities *)(cmd + 1);
+	str = (char *)(cap + 1);
+
+	cmd->cmd = NETFS_CAPABILITIES;
+	cmd->id = psb->idx;
+	cmd->size = size;
+	cmd->start = 0;
+	cmd->ext = 0;
+	cmd->csize = 0;
+
+	netfs_convert_cmd(cmd);
+	netfs_trans_update(cmd, t, size);
+
+	cap->hash_strlen = psb->hash_strlen;
+	if (cap->hash_strlen) {
+		sprintf(str, "%s", psb->hash_algo);
+		str += cap->hash_strlen;
+	}
+
+	cap->cipher_strlen = psb->cipher_strlen;
+	cap->cipher_keysize = psb->cipher_keysize;
+	if (cap->cipher_strlen)
+		sprintf(str, "%s", psb->cipher_algo);
+
+	netfs_convert_capabilities(cap);
+
+	psb->flags = ~0;
+	err = netfs_trans_finish(t, psb);
+	if (err)
+		goto err_out_exit;
+
+	err = wait_event_interruptible_timeout(psb->wait, (psb->flags != ~0),
+			psb->wait_on_page_timeout);
+	if (!err)
+		err = -ETIMEDOUT;
+	else
+		err = -psb->flags;
+
+	if (!err)
+		psb->perform_crypto = 1;
+	psb->flags = 0;
+
+	/*
+	 * At this point the NETFS_CAPABILITIES response command
+	 * should have set up the superblock in a way that is acceptable
+	 * to both client and server; if the server refuses the connection,
+	 * it will send an error in the transaction response.
+	 */
+
+	if (err)
+		goto err_out_exit;
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+int pohmelfs_crypto_init(struct pohmelfs_sb *psb)
+{
+	int err;
+
+	if (!psb->cipher_algo && !psb->hash_algo)
+		return 0;
+
+	err = pohmelfs_crypto_init_handshake(psb);
+	if (err)
+		return err;
+
+	err = pohmelfs_sys_crypto_init(psb);
+	if (err)
+		return err;
+
+	return 0;
+}
+#endif
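
For reference, the key handling in dst_node_crypto_init() can be tried out
in userspace. The sketch below is not part of the patch: every demo_* name
is hypothetical, and it assumes the hash key and the cipher key are laid
out back to back right after the control structure, which is what the
length check in dst_crypto_init() suggests.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for struct dst_crypto_ctl; only the key sizes matter. */
struct demo_crypto_ctl {
	unsigned int hash_keysize;
	unsigned int cipher_keysize;
};

struct demo_keys {
	unsigned char *hash_key;
	unsigned char *cipher_key;
};

/*
 * Validate the message length, then copy both keys out of the payload.
 * Returns 0 on success and -1 on a malformed message or failed allocation.
 */
int demo_split_keys(const void *msg, size_t size, struct demo_keys *out)
{
	struct demo_crypto_ctl ctl;
	const unsigned char *key;
	size_t payload;

	assert(msg && out);
	out->hash_key = NULL;
	out->cipher_key = NULL;

	/* The header must fit before any of its fields can be trusted. */
	if (size < sizeof(ctl))
		return -1;
	memcpy(&ctl, msg, sizeof(ctl));

	/* The payload must be exactly hash key plus cipher key. */
	payload = size - sizeof(ctl);
	if (ctl.hash_keysize > payload ||
	    ctl.cipher_keysize != payload - ctl.hash_keysize)
		return -1;

	key = (const unsigned char *)msg + sizeof(ctl);

	if (ctl.hash_keysize) {
		out->hash_key = malloc(ctl.hash_keysize);
		if (!out->hash_key)
			return -1;
		memcpy(out->hash_key, key, ctl.hash_keysize);
	}

	if (ctl.cipher_keysize) {
		out->cipher_key = malloc(ctl.cipher_keysize);
		if (!out->cipher_key) {
			free(out->hash_key);
			out->hash_key = NULL;
			return -1;
		}
		/* Assumed layout: the cipher key starts where the hash key ends. */
		memcpy(out->cipher_key, key + ctl.hash_keysize, ctl.cipher_keysize);
	}

	return 0;
}
```

Checking that the header fits before reading the key sizes out of it keeps
a short message from being interpreted as a control structure.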
diff --git a/drivers/block/dst/dcore.c b/drivers/block/dst/dcore.c
new file mode 100644
index 0000000..302b284
--- /dev/null
+++ b/drivers/block/dst/dcore.c
@@ -0,0 +1,793 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/buffer_head.h>
+#include <linux/connector.h>
+#include <linux/dst.h>
+#include <linux/device.h>
+#include <linux/jhash.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/socket.h>
+
+#include <linux/in.h>
+#include <linux/in6.h>
+
+#include <net/sock.h>
+
+static int dst_major;
+
+static DEFINE_MUTEX(dst_hash_lock);
+static struct list_head *dst_hashtable;
+static unsigned int dst_hashtable_size = 128;
+module_param(dst_hashtable_size, uint, 0644);
+
+static char dst_name[] = "Succumbed to live ant.";
+
+/*
+ * DST sysfs tree for device called 'storage':
+ *
+ * /sys/bus/dst/devices/storage/
+ * /sys/bus/dst/devices/storage/type : 192.168.4.80:1025
+ * /sys/bus/dst/devices/storage/size : 800
+ * /sys/bus/dst/devices/storage/name : storage
+ */
+
+static int dst_dev_match(struct device *dev, struct device_driver *drv)
+{
+	return 1;
+}
+
+static struct bus_type dst_dev_bus_type = {
+	.name 		= "dst",
+	.match 		= &dst_dev_match,
+};
+
+static void dst_node_release(struct device *dev)
+{
+}
+
+static struct device dst_node_dev = {
+	.bus 		= &dst_dev_bus_type,
+	.release 	= &dst_node_release
+};
+
+static void dst_node_set_size(struct dst_node *n)
+{
+	struct block_device *bdev;
+
+	set_capacity(n->disk, n->size >> 9);
+
+	bdev = bdget_disk(n->disk, 0);
+	if (bdev) {
+		mutex_lock(&bdev->bd_inode->i_mutex);
+		i_size_write(bdev->bd_inode, n->size);
+		mutex_unlock(&bdev->bd_inode->i_mutex);
+		bdput(bdev);
+	}
+}
+
+/*
+ * Distributed storage request processing function.
+ */
+static int dst_request(struct request_queue *q, struct bio *bio)
+{
+	struct dst_node *n = q->queuedata;
+
+	bio_get(bio);
+
+	return dst_process_bio(n, bio);
+}
+
+static int dst_bdev_open(struct inode *inode, struct file *filp)
+{
+	struct dst_node *n = inode->i_bdev->bd_disk->private_data;
+
+	dst_node_get(n);
+	return 0;
+}
+
+static int dst_bdev_release(struct inode *inode, struct file *filp)
+{
+	struct dst_node *n = inode->i_bdev->bd_disk->private_data;
+
+	dst_node_put(n);
+	return 0;
+}
+
+static struct block_device_operations dst_blk_ops = {
+	.open		= dst_bdev_open,
+	.release	= dst_bdev_release,
+	.owner		= THIS_MODULE,
+};
+
+/*
+ * Block layer binding - disk is created when array is fully configured
+ * by userspace request.
+ */
+static int dst_node_create_disk(struct dst_node *n)
+{
+	int err = -ENOMEM;
+
+	n->queue = blk_alloc_queue(GFP_KERNEL);
+	if (!n->queue)
+		goto err_out_exit;
+
+	n->queue->queuedata = n;
+	blk_queue_max_phys_segments(n->queue, n->max_pages);
+	blk_queue_make_request(n->queue, dst_request);
+	blk_queue_bounce_limit(n->queue, BLK_BOUNCE_ANY);
+
+	n->disk = alloc_disk(1);
+	if (!n->disk)
+		goto err_out_free_queue;
+
+	n->disk->major = dst_major;
+	n->disk->first_minor = (((unsigned long)n->disk) ^
+		(((unsigned long)n->disk) >> 31)) & 0xff;
+	n->disk->fops = &dst_blk_ops;
+	n->disk->queue = n->queue;
+	n->disk->private_data = n;
+	snprintf(n->disk->disk_name, sizeof(n->disk->disk_name),
+			"dst-%s-%d", n->name, n->disk->first_minor);
+
+	return 0;
+
+err_out_free_queue:
+	blk_cleanup_queue(n->queue);
+	n->queue = NULL;
+err_out_exit:
+	return err;
+}
+
+static ssize_t dst_show_size(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+
+	return sprintf(buf, "%llu\n", n->size);
+}
+
+/*
+ * Shows the peer address of the node's socket: address and port for
+ * AF_INET/AF_INET6 sockets, a raw hex dump for any other family.
+ */
+static ssize_t dst_show_type(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+	struct sockaddr_storage addr;
+	struct socket *sock;
+	int addrlen;
+
+	sock = n->state->socket;
+	if (sock->ops->getname(sock, (struct sockaddr *)&addr, &addrlen, 2))
+		return 0;
+
+	if (sock->ops->family == AF_INET) {
+		struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+		return sprintf(buf, "%u.%u.%u.%u:%d\n",
+			NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+	} else if (sock->ops->family == AF_INET6) {
+		struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr;
+		return sprintf(buf,
+			"%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d\n",
+			NIP6(sin->sin6_addr), ntohs(sin->sin6_port));
+	} else {
+		int i, sz = PAGE_SIZE - 2; /* 0 symbol and '\n' below */
+		int size;
+		unsigned char *a = (unsigned char *)&addr;
+		char *buf_orig = buf;
+
+		size = snprintf(buf, sz, "family: %d, addrlen: %d, addr: ",
+				addr.ss_family, addrlen);
+		sz -= size;
+		buf += size;
+
+		for (i=0; i<addrlen; ++i) {
+			if (sz < 3)
+				break;
+
+			size = snprintf(buf, sz, "%02x ", a[i]);
+			sz -= size;
+			buf += size;
+		}
+		buf += sprintf(buf, "\n");
+
+		return buf - buf_orig;
+	}
+	return 0;
+}
+
+static struct device_attribute dst_node_attrs[] = {
+	__ATTR(size, 0444, dst_show_size, NULL),
+	__ATTR(type, 0444, dst_show_type, NULL),
+};
+
+static int dst_create_node_attributes(struct dst_node *n)
+{
+	int err = 0, i;
+
+	for (i=0; !err && i<ARRAY_SIZE(dst_node_attrs); ++i)
+		err = device_create_file(&n->device,
+				&dst_node_attrs[i]);
+	return err;
+}
+
+static void dst_remove_node_attributes(struct dst_node *n)
+{
+	int i;
+
+	for (i=0; i<ARRAY_SIZE(dst_node_attrs); ++i)
+		device_remove_file(&n->device,
+				&dst_node_attrs[i]);
+}
+
+static void dst_node_sysfs_exit(struct dst_node *n)
+{
+	dst_remove_node_attributes(n);
+	device_unregister(&n->device);
+}
+
+static int dst_node_sysfs_init(struct dst_node *n)
+{
+	int err;
+
+	memcpy(&n->device, &dst_node_dev, sizeof(struct device));
+
+	snprintf(n->device.bus_id, sizeof(n->device.bus_id), "dst-%s", n->name);
+	err = device_register(&n->device);
+	if (err) {
+		dprintk(KERN_ERR "Failed to register node '%s', err: %d.\n", n->name, err);
+		goto err_out_exit;
+	}
+
+	dst_create_node_attributes(n);
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+static inline unsigned int dst_hash(char *str, unsigned int size)
+{
+	return (jhash(str, size, 0) % dst_hashtable_size);
+}
+
+static void dst_node_remove(struct dst_node *n)
+{
+	mutex_lock(&dst_hash_lock);
+	list_del_init(&n->node_entry);
+	mutex_unlock(&dst_hash_lock);
+}
+
+static void dst_node_add(struct dst_node *n)
+{
+	unsigned hash = dst_hash(n->name, sizeof(n->name));
+
+	mutex_lock(&dst_hash_lock);
+	list_add_tail(&n->node_entry, &dst_hashtable[hash]);
+	mutex_unlock(&dst_hash_lock);
+}
+
+static void dst_node_cleanup(struct dst_node *n)
+{
+	struct dst_state *st = n->state;
+
+	if (!st)
+		return;
+
+	if (n->queue) {
+		blk_cleanup_queue(n->queue);
+		put_disk(n->disk);
+	}
+
+	if (n->bdev) {
+		sync_blockdev(n->bdev);
+		blkdev_put(n->bdev);
+	}
+
+	dst_state_lock(st);
+	st->need_exit = 1;
+	dst_state_exit_connected(st);
+	dst_state_unlock(st);
+
+	wake_up(&st->thread_wait);
+
+	dst_state_put(st);
+	n->state = NULL;
+}
+
+void dst_node_put(struct dst_node *n)
+{
+	if (unlikely(!n))
+		return;
+
+	dprintk("%s: n: %p, refcnt: %d.\n", __func__, n, atomic_read(&n->refcnt));
+
+	if (atomic_dec_and_test(&n->refcnt)) {
+		dst_node_remove(n);
+		n->trans_scan_timeout = 0;
+		dst_node_cleanup(n);
+		thread_pool_destroy(n->pool);
+		dst_node_sysfs_exit(n);
+		dst_node_crypto_exit(n);
+		dst_node_trans_exit(n);
+
+		kfree(n);
+
+		dprintk("%s: freed n: %p.\n", __func__, n);
+	}
+}
+
+/*
+ * Finds the device major/minor numbers for the given pathname.
+ */
+static int dst_lookup_device(const char *path, dev_t *dev)
+{
+	int err;
+	struct nameidata nd;
+	struct inode *inode;
+
+	err = path_lookup(path, LOOKUP_FOLLOW, &nd);
+	if (err)
+		return err;
+
+	inode = nd.path.dentry->d_inode;
+	if (!inode) {
+		err = -ENOENT;
+		goto out;
+	}
+
+	if (!S_ISBLK(inode->i_mode)) {
+		err = -ENOTBLK;
+		goto out;
+	}
+
+	*dev = inode->i_rdev;
+
+out:
+	path_put(&nd.path);
+	return err;
+}
+
+static int dst_setup_export(struct dst_node *n, struct dst_ctl *ctl,
+		struct dst_export_ctl *le)
+{
+	int err;
+	dev_t dev;
+
+	err = dst_lookup_device(le->device, &dev);
+	if (err)
+		return err;
+
+	n->bdev = open_by_devnum(dev, FMODE_READ|FMODE_WRITE);
+	if (!n->bdev)
+		return -ENODEV;
+
+	if (n->size != 0)
+		n->size = min_t(loff_t, n->bdev->bd_inode->i_size, n->size);
+	else
+		n->size = n->bdev->bd_inode->i_size;
+
+	err = dst_node_init_listened(n, le);
+	if (err)
+		goto err_out_cleanup;
+
+	return 0;
+
+err_out_cleanup:
+	sync_blockdev(n->bdev);
+	blkdev_put(n->bdev);
+	n->bdev = NULL;
+
+	return err;
+}
+
+static inline void *dst_thread_network_init(void *data)
+{
+	dprintk("%s: data: %p.\n", __func__, data);
+	return data;
+}
+
+static inline void dst_thread_network_cleanup(void *data)
+{
+	dprintk("%s: data: %p.\n", __func__, data);
+}
+
+static struct dst_node *dst_alloc_node(struct dst_ctl *ctl,
+		int (*start)(struct dst_node *),
+		int num)
+{
+	struct dst_node *n;
+	int err;
+
+	n = kzalloc(sizeof(struct dst_node), GFP_KERNEL);
+	if (!n)
+		return NULL;
+
+	INIT_LIST_HEAD(&n->node_entry);
+
+	INIT_LIST_HEAD(&n->security_list);
+	mutex_init(&n->security_lock);
+
+	n->trans_scan_timeout = msecs_to_jiffies(ctl->trans_scan_timeout);
+	if (!n->trans_scan_timeout)
+		n->trans_scan_timeout = HZ;
+
+	n->trans_max_retries = ctl->trans_max_retries;
+	if (!n->trans_max_retries)
+		n->trans_max_retries = 10;
+
+	n->max_pages = ctl->max_pages;
+	if (!n->max_pages)
+		n->max_pages = 30;
+
+	n->start = start;
+	n->size = ctl->size;
+
+	atomic_set(&n->refcnt, 1);
+	atomic_long_set(&n->gen, 0);
+	snprintf(n->name, sizeof(n->name), "%s", ctl->name);
+
+	err = dst_node_sysfs_init(n);
+	if (err)
+		goto err_out_free;
+
+	n->pool = thread_pool_create(num, n->name, dst_thread_network_init,
+			dst_thread_network_cleanup, n);
+	if (IS_ERR(n->pool)) {
+		err = PTR_ERR(n->pool);
+		goto err_out_sysfs_exit;
+	}
+
+	dprintk("%s: n: %p, name: %s.\n", __func__, n, n->name);
+
+	return n;
+
+err_out_sysfs_exit:
+	dst_node_sysfs_exit(n);
+err_out_free:
+	kfree(n);
+	return NULL;
+}
+
+static int dst_start_remote(struct dst_node *n)
+{
+	int err;
+
+	err = dst_node_trans_init(n, sizeof(struct dst_trans));
+	if (err)
+		return err;
+
+	err = dst_node_create_disk(n);
+	if (err)
+		return err;
+
+	dst_node_set_size(n);
+	add_disk(n->disk);
+	return 0;
+}
+
+/*
+ * Control callbacks for userspace commands that set up
+ * the different node types and start/stop the array.
+ */
+static int dst_add_remote(struct dst_node *n, struct dst_ctl *ctl, void *data, unsigned int size)
+{
+	int err;
+	struct dst_network_ctl *rctl = data;
+
+	if (n)
+		return -EEXIST;
+
+	if (size != sizeof(struct dst_network_ctl))
+		return -EINVAL;
+
+	n = dst_alloc_node(ctl, dst_start_remote, 1);
+	if (!n)
+		return -ENOMEM;
+
+	err = dst_node_init_connected(n, rctl);
+	if (err)
+		goto err_out_free;
+
+	dst_node_add(n);
+
+	return 0;
+
+err_out_free:
+	dst_node_put(n);
+	return err;
+}
+
+static int dst_add_export(struct dst_node *n, struct dst_ctl *ctl, void *data, unsigned int size)
+{
+	int err;
+	struct dst_export_ctl *le = data;
+
+	if (n)
+		return -EEXIST;
+
+	if (size != sizeof(struct dst_export_ctl))
+		return -EINVAL;
+
+	n = dst_alloc_node(ctl, dst_start_export, 2);
+	if (!n)
+		return -ENOMEM;
+
+	err = dst_setup_export(n, ctl, le);
+	if (err)
+		goto err_out_free;
+
+	dst_node_add(n);
+
+	return 0;
+
+err_out_free:
+	dst_node_put(n);
+	return err;
+}
+
+static int dst_node_remove_unload(struct dst_node *n)
+{
+	printk(KERN_INFO "STOPPED name: '%s', size: %llu.\n", n->name, n->size);
+
+	if (n->disk)
+		del_gendisk(n->disk);
+
+	dst_node_put(n);
+	return 0;
+}
+
+static int dst_del_node(struct dst_node *n, struct dst_ctl *ctl, void *data, unsigned int size)
+{
+	if (!n)
+		return -ENODEV;
+
+	return dst_node_remove_unload(n);
+}
+
+static int dst_crypto_init(struct dst_node *n, struct dst_ctl *ctl, void *data, unsigned int size)
+{
+	struct dst_crypto_ctl *crypto = data;
+
+	if (!n)
+		return -ENODEV;
+	if (size < sizeof(struct dst_crypto_ctl) ||
+	    size != sizeof(struct dst_crypto_ctl) + crypto->hash_keysize + crypto->cipher_keysize)
+		return -EINVAL;
+
+	if (n->trans_cache)
+		return -EEXIST;
+
+	return dst_node_crypto_init(n, crypto);
+}
+
+static int dst_security_init(struct dst_node *n, struct dst_ctl *ctl, void *data, unsigned int size)
+{
+	struct dst_secure *s;
+
+	if (!n)
+		return -ENODEV;
+
+	if (size != sizeof(struct dst_secure_user))
+		return -EINVAL;
+
+	s = kmalloc(sizeof(struct dst_secure), GFP_KERNEL);
+	if (!s)
+		return -ENOMEM;
+
+	memcpy(&s->sec, data, size);
+
+	mutex_lock(&n->security_lock);
+	list_add_tail(&s->sec_entry, &n->security_list);
+	mutex_unlock(&n->security_lock);
+
+	return 0;
+}
+
+static int dst_start_node(struct dst_node *n, struct dst_ctl *ctl, void *data, unsigned int size)
+{
+	int err;
+
+	if (!n)
+		return -ENODEV;
+
+	if (n->trans_cache)
+		return 0;
+
+	err = n->start(n);
+	if (err)
+		return err;
+
+	printk(KERN_INFO "STARTED name: '%s', size: %llu.\n", n->name, n->size);
+	return 0;
+}
+
+
+typedef int (*dst_command_func)(struct dst_node *n, struct dst_ctl *ctl, void *data, unsigned int size);
+
+/*
+ * List of userspace commands.
+ */
+static dst_command_func dst_commands[] = {
+	[DST_ADD_REMOTE] = &dst_add_remote,
+	[DST_ADD_EXPORT] = &dst_add_export,
+	[DST_DEL_NODE] = &dst_del_node,
+	[DST_CRYPTO] = &dst_crypto_init,
+	[DST_SECURITY] = &dst_security_init,
+	[DST_START] = &dst_start_node,
+};
+
+/*
+ * Configuration parser.
+ */
+static void cn_dst_callback(void *data)
+{
+	struct dst_ctl *ctl;
+	struct cn_msg *msg = data;
+	int err;
+	struct dst_ctl_ack *ack;
+	struct dst_node *n = NULL, *tmp;
+	unsigned int hash;
+
+
+	if (msg->len < sizeof(struct dst_ctl)) {
+		err = -EBADMSG;
+		goto out;
+	}
+
+	ctl = (struct dst_ctl *)msg->data;
+
+	if (ctl->cmd >= DST_CMD_MAX) {
+		err = -EINVAL;
+		goto out;
+	}
+	hash = dst_hash(ctl->name, sizeof(ctl->name));
+
+	mutex_lock(&dst_hash_lock);
+	list_for_each_entry(tmp, &dst_hashtable[hash], node_entry) {
+		if (!memcmp(tmp->name, ctl->name, sizeof(tmp->name))) {
+			n = tmp;
+			dst_node_get(n);
+			break;
+		}
+	}
+	mutex_unlock(&dst_hash_lock);
+
+	err = dst_commands[ctl->cmd](n, ctl, msg->data + sizeof(struct dst_ctl),
+			msg->len - sizeof(struct dst_ctl));
+
+	dst_node_put(n);
+out:
+	ack = kmalloc(sizeof(struct dst_ctl_ack), GFP_KERNEL);
+	if (!ack)
+		return;
+
+	memcpy(&ack->msg, msg, sizeof(struct cn_msg));
+
+	ack->msg.ack = msg->ack + 1;
+	ack->msg.len = sizeof(struct dst_ctl_ack) - sizeof(struct cn_msg);
+
+	ack->error = err;
+
+	cn_netlink_send(&ack->msg, 0, GFP_KERNEL);
+	kfree(ack);
+}
+
+static int dst_sysfs_init(void)
+{
+	return bus_register(&dst_dev_bus_type);
+}
+
+static void dst_sysfs_exit(void)
+{
+	bus_unregister(&dst_dev_bus_type);
+}
+
+static int __init dst_hashtable_init(void)
+{
+	unsigned int i;
+
+	dst_hashtable = kzalloc(sizeof(struct list_head) * dst_hashtable_size, GFP_KERNEL);
+	if (!dst_hashtable)
+		return -ENOMEM;
+
+	for (i=0; i<dst_hashtable_size; ++i)
+		INIT_LIST_HEAD(&dst_hashtable[i]);
+
+	return 0;
+}
+
+static void dst_hashtable_exit(void)
+{
+	unsigned int i;
+	struct dst_node *n, *tmp;
+
+	for (i=0; i<dst_hashtable_size; ++i) {
+		list_for_each_entry_safe(n, tmp, &dst_hashtable[i], node_entry) {
+			dst_node_remove_unload(n);
+		}
+	}
+
+	kfree(dst_hashtable);
+}
+
+static int __init dst_sys_init(void)
+{
+	int err = -ENOMEM;
+	struct cb_id cn_dst_id = { CN_DST_IDX, CN_DST_VAL };
+
+	err = dst_hashtable_init();
+	if (err)
+		goto err_out_exit;
+
+	err = dst_export_init();
+	if (err)
+		goto err_out_hashtable_exit;
+
+	err = register_blkdev(dst_major, DST_NAME);
+	if (err < 0)
+		goto err_out_export_exit;
+	if (err)
+		dst_major = err;
+
+	err = dst_sysfs_init();
+	if (err)
+		goto err_out_unregister;
+
+	err = cn_add_callback(&cn_dst_id, "DST", cn_dst_callback);
+	if (err)
+		goto err_out_sysfs_exit;
+
+	printk(KERN_INFO "Distributed storage, '%s' release.\n", dst_name);
+
+	return 0;
+
+err_out_sysfs_exit:
+	dst_sysfs_exit();
+err_out_unregister:
+	unregister_blkdev(dst_major, DST_NAME);
+err_out_export_exit:
+	dst_export_exit();
+err_out_hashtable_exit:
+	dst_hashtable_exit();
+err_out_exit:
+	return err;
+}
+
+static void __exit dst_sys_exit(void)
+{
+	struct cb_id cn_dst_id = { CN_DST_IDX, CN_DST_VAL };
+
+	cn_del_callback(&cn_dst_id);
+	unregister_blkdev(dst_major, DST_NAME);
+	dst_hashtable_exit();
+	dst_sysfs_exit();
+	dst_export_exit();
+}
+
+module_init(dst_sys_init);
+module_exit(dst_sys_exit);
+
+MODULE_DESCRIPTION("Distributed storage");
+MODULE_AUTHOR("Evgeniy Polyakov <johnpol@2ka.mipt.ru>");
+MODULE_LICENSE("GPL");
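
As a side note on cn_dst_callback(): it bounds-checks ctl->cmd against
DST_CMD_MAX and then indexes dst_commands[] directly. Below is a userspace
sketch of the same table dispatch with one extra guard for slots that have
no handler. It is only an illustration: all demo_* names are hypothetical,
and whether the real DST_* command enum has such gaps is not visible in
this patch.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical command numbers; the real DST_* values live in <linux/dst.h>. */
enum demo_cmd {
	DEMO_ADD,
	DEMO_DEL,
	DEMO_RESERVED,	/* deliberately left without a handler */
	DEMO_START,
	DEMO_CMD_MAX,
};

typedef int (*demo_command_func)(int *state, int arg);

static int demo_add(int *state, int arg)
{
	*state += arg;
	return 0;
}

static int demo_del(int *state, int arg)
{
	*state -= arg;
	return 0;
}

static int demo_start(int *state, int arg)
{
	(void)arg;
	/* Refuse to start until something has been added. */
	return *state > 0 ? 0 : -ENODEV;
}

/* Designated initializers leave every unlisted slot NULL. */
static const demo_command_func demo_commands[DEMO_CMD_MAX] = {
	[DEMO_ADD]	= demo_add,
	[DEMO_DEL]	= demo_del,
	[DEMO_START]	= demo_start,
};

int demo_dispatch(unsigned int cmd, int *state, int arg)
{
	assert(state);

	/* Reject numbers outside the table. */
	if (cmd >= DEMO_CMD_MAX)
		return -EINVAL;

	/* Reject numbers inside the table that have no handler. */
	if (!demo_commands[cmd])
		return -ENOSYS;

	return demo_commands[cmd](state, arg);
}
```

The NULL check costs one comparison per command and makes it safe to
reserve a command number before its handler exists.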
diff --git a/drivers/block/dst/export.c b/drivers/block/dst/export.c
new file mode 100644
index 0000000..0dd8dfb
--- /dev/null
+++ b/drivers/block/dst/export.c
@@ -0,0 +1,543 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/dst.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/poll.h>
+#include <linux/slab.h>
+#include <linux/socket.h>
+
+#include <net/sock.h>
+
+static struct bio_set *dst_bio_set;
+
+int __init dst_export_init(void)
+{
+	int err = -ENOMEM;
+
+	dst_bio_set = bioset_create(32, 32);
+	if (!dst_bio_set)
+		goto err_out_exit;
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+void dst_export_exit(void)
+{
+	bioset_free(dst_bio_set);
+}
+
+/*
+ * Checks state's permissions.
+ * Returns -EPERM if check failed.
+ */
+static inline int dst_check_permissions(struct dst_state *st, struct bio *bio)
+{
+	if ((bio_rw(bio) == WRITE) && !(st->permissions & DST_PERM_WRITE))
+		return -EPERM;
+
+	return 0;
+}
+
+static struct dst_state *dst_accept_client(struct dst_state *st)
+{
+	unsigned int revents = 0;
+	unsigned int err_mask = POLLERR | POLLHUP | POLLRDHUP;
+	unsigned int mask = err_mask | POLLIN;
+	struct dst_node *n = st->node;
+	int err = 0;
+	struct socket *sock = NULL;
+	struct dst_state *new;
+
+	while (!err && !sock) {
+		revents = dst_state_poll(st);
+
+		if (!(revents & mask)) {
+			DEFINE_WAIT(wait);
+
+			for (;;) {
+				prepare_to_wait(&st->thread_wait, &wait, TASK_INTERRUPTIBLE);
+				if (!n->trans_scan_timeout || st->need_exit)
+					break;
+
+				revents = dst_state_poll(st);
+
+				if (revents & mask)
+					break;
+
+				if (signal_pending(current))
+					break;
+
+				schedule_timeout(HZ);
+			}
+			finish_wait(&st->thread_wait, &wait);
+		}
+
+		err = -ECONNRESET;
+		dst_state_lock(st);
+
+		dprintk("%s: st: %p, revents: %x [err: %d, in: %d].\n",
+			__func__, st, revents, revents & err_mask, revents & POLLIN);
+
+		if (revents & err_mask) {
+			printk(KERN_ERR "%s: revents: %x, socket: %p, err: %d.\n",
+					__func__, revents, st->socket, err);
+			err = -ECONNRESET;
+		}
+
+		if (!n->trans_scan_timeout || st->need_exit)
+			err = -ENODEV;
+
+		if (st->socket && (revents & POLLIN))
+			err = kernel_accept(st->socket, &sock, 0);
+
+		dst_state_unlock(st);
+	}
+
+	if (err)
+		goto err_out_exit;
+
+	new = dst_state_alloc(st->node);
+	if (!new) {
+		err = -ENOMEM;
+		goto err_out_release;
+	}
+	new->socket = sock;
+
+	new->ctl.addr.sa_data_len = sizeof(struct sockaddr);
+	err = kernel_getpeername(sock, (struct sockaddr *)&new->ctl.addr,
+			(int *)&new->ctl.addr.sa_data_len);
+	if (err)
+		goto err_out_put;
+
+	err = dst_poll_init(new);
+	if (err)
+		goto err_out_put;
+
+	dst_dump_addr(sock, (struct sockaddr *)&new->ctl.addr, "Connected client");
+
+	return new;
+
+err_out_put:
+	dst_state_put(new);
+err_out_release:
+	sock_release(sock);
+err_out_exit:
+	return ERR_PTR(err);
+}
+
+static int dst_export_process_request_queue(struct dst_state *st)
+{
+	unsigned long flags;
+	struct dst_export_priv *p;
+	struct bio *bio;
+	int err = 0;
+
+	while (!list_empty(&st->request_list)) {
+		p = NULL;
+		spin_lock_irqsave(&st->request_lock, flags);
+		if (!list_empty(&st->request_list)) {
+			p = list_first_entry(&st->request_list, struct dst_export_priv, request_entry);
+			list_del(&p->request_entry);
+		}
+		spin_unlock_irqrestore(&st->request_lock, flags);
+		if (!p)
+			break;
+
+		bio = p->bio;
+
+		if (dst_need_crypto(st->node) && (bio_data_dir(bio) == READ))
+			err = dst_export_crypto(st->node, bio);
+		else
+			err = dst_export_send_bio(bio);
+
+		if (err)
+			break;
+	}
+
+	return err;
+}
+
+static void dst_state_cleanup_export(struct dst_state *st)
+{
+	struct dst_export_priv *p;
+	unsigned long flags;
+
+	/*
+	 * This loop waits for all pending bios to be completed and freed.
+	 */
+	while (atomic_read(&st->refcnt) > 1) {
+		dprintk("%s: st: %p, refcnt: %d, list_empty: %d.\n",
+				__func__, st, atomic_read(&st->refcnt),
+				list_empty(&st->request_list));
+		wait_event_timeout(st->thread_wait,
+				(atomic_read(&st->refcnt) == 1) ||
+				!list_empty(&st->request_list),
+				HZ/2);
+
+		while (!list_empty(&st->request_list)) {
+			p = NULL;
+			spin_lock_irqsave(&st->request_lock, flags);
+			if (!list_empty(&st->request_list)) {
+				p = list_first_entry(&st->request_list,
+					struct dst_export_priv, request_entry);
+				list_del(&p->request_entry);
+			}
+			spin_unlock_irqrestore(&st->request_lock, flags);
+
+			if (p)
+				bio_put(p->bio);
+
+			dprintk("%s: st: %p, refcnt: %d, list_empty: %d, p: %p.\n",
+				__func__, st, atomic_read(&st->refcnt),
+				list_empty(&st->request_list), p);
+		}
+	}
+
+	dst_state_put(st);
+}
+
+static int dst_accept(void *init_data, void *schedule_data)
+{
+	struct dst_state *main = schedule_data;
+	struct dst_node *n = init_data;
+	struct dst_state *st;
+	int err;
+
+	while (n->trans_scan_timeout && !main->need_exit) {
+		dprintk("%s: main: %p, n: %p.\n", __func__, main, n);
+		st = dst_accept_client(main);
+		if (IS_ERR(st))
+			continue;
+
+		err = dst_state_schedule_receiver(st);
+		if (!err) {
+			while (n->trans_scan_timeout) {
+				err = wait_event_interruptible_timeout(st->thread_wait,
+						!list_empty(&st->request_list) ||
+						!n->trans_scan_timeout ||
+						st->need_exit,
+					HZ);
+
+				if (!n->trans_scan_timeout || st->need_exit)
+					break;
+
+				if (list_empty(&st->request_list))
+					continue;
+
+				err = dst_export_process_request_queue(st);
+				if (err)
+					break;
+			}
+
+			st->need_exit = 1;
+			wake_up(&st->thread_wait);
+		}
+
+		dst_state_cleanup_export(st);
+	}
+
+	dprintk("%s: freeing listening socket st: %p.\n", __func__, main);
+
+	dst_state_lock(main);
+	dst_poll_exit(main);
+	dst_state_socket_release(main);
+	dst_state_unlock(main);
+	dst_state_put(main);
+	dprintk("%s: freed listening socket st: %p.\n", __func__, main);
+
+	return 0;
+}
+
+int dst_start_export(struct dst_node *n)
+{
+	return dst_node_trans_init(n, sizeof(struct dst_export_priv));
+}
+
+int dst_node_init_listened(struct dst_node *n, struct dst_export_ctl *le)
+{
+	struct dst_state *st;
+	int err = -ENOMEM;
+	struct dst_network_ctl *ctl = &le->ctl;
+
+	st = dst_state_alloc(n);
+	if (IS_ERR(st)) {
+		err = PTR_ERR(st);
+		goto err_out_exit;
+	}
+	memcpy(&st->ctl, ctl, sizeof(struct dst_network_ctl));
+
+	err = dst_state_socket_create(st);
+	if (err)
+		goto err_out_put;
+
+	err = kernel_bind(st->socket, (struct sockaddr *)&ctl->addr, ctl->addr.sa_data_len);
+	if (err)
+		goto err_out_socket_release;
+
+	err = kernel_listen(st->socket, 1024);
+	if (err)
+		goto err_out_socket_release;
+	n->state = st;
+
+	err = dst_poll_init(st);
+	if (err)
+		goto err_out_socket_release;
+
+	dst_state_get(st);
+
+	err = thread_pool_schedule(n->pool, dst_thread_setup,
+			dst_accept, st, MAX_SCHEDULE_TIMEOUT);
+	if (err)
+		goto err_out_poll_exit;
+
+	return 0;
+
+err_out_poll_exit:
+	dst_poll_exit(st);
+err_out_socket_release:
+	dst_state_socket_release(st);
+err_out_put:
+	dst_state_put(st);
+err_out_exit:
+	n->state = NULL;
+	return err;
+}
+
+static void dst_bio_destructor(struct bio *bio)
+{
+	struct bio_vec *bv;
+	struct dst_export_priv *priv = bio->bi_private;
+	int i;
+
+	bio_for_each_segment(bv, bio, i) {
+		if (!bv->bv_page)
+			break;
+
+		__free_page(bv->bv_page);
+	}
+
+	if (priv) {
+		struct dst_node *n = priv->state->node;
+
+		dst_state_put(priv->state);
+		mempool_free(priv, n->trans_pool);
+	}
+	bio_free(bio, dst_bio_set);
+}
+
+static void dst_bio_end_io(struct bio *bio, int err)
+{
+	struct dst_export_priv *p = bio->bi_private;
+	struct dst_state *st = p->state;
+	unsigned long flags;
+
+	spin_lock_irqsave(&st->request_lock, flags);
+	list_add_tail(&p->request_entry, &st->request_list);
+	spin_unlock_irqrestore(&st->request_lock, flags);
+
+	wake_up(&st->thread_wait);
+}
+
+static int dst_export_read_request(struct bio *bio, unsigned int total_size)
+{
+	unsigned int size;
+	struct page *page;
+	int err;
+
+	while (total_size) {
+		err = -ENOMEM;
+		page = alloc_page(GFP_KERNEL);
+		if (!page)
+			goto err_out_exit;
+
+		size = min_t(unsigned int, PAGE_SIZE, total_size);
+
+		/* bio_add_page() returns the number of bytes added, 0 on failure. */
+		if (bio_add_page(bio, page, size, 0) != size)
+			goto err_out_free_page;
+		dprintk("%s: bio: %llu/%u, size: %u.\n",
+				__func__, bio->bi_sector, bio->bi_size, size);
+
+		total_size -= size;
+	}
+
+	return 0;
+
+err_out_free_page:
+	__free_page(page);
+err_out_exit:
+	return err;
+}
+
+static int dst_export_write_request(struct dst_state *st, struct bio *bio, unsigned int total_size)
+{
+	unsigned int size;
+	struct page *page;
+	void *data;
+	int err;
+
+	while (total_size) {
+		err = -ENOMEM;
+		page = alloc_page(GFP_KERNEL);
+		if (!page)
+			goto err_out_exit;
+
+		data = kmap(page);
+		if (!data)
+			goto err_out_free_page;
+
+		size = min_t(unsigned int, PAGE_SIZE, total_size);
+
+		err = dst_data_recv(st, data, size);
+		if (err)
+			goto err_out_unmap_page;
+
+		err = -ENOMEM;
+		if (bio_add_page(bio, page, size, 0) != size)
+			goto err_out_unmap_page;
+
+		kunmap(page);
+
+		total_size -= size;
+	}
+
+	return 0;
+
+err_out_unmap_page:
+	kunmap(page);
+err_out_free_page:
+	__free_page(page);
+err_out_exit:
+	return err;
+}
+
+int dst_process_io(struct dst_state *st)
+{
+	struct dst_node *n = st->node;
+	struct dst_cmd *cmd = st->data;
+	struct bio *bio;
+	struct dst_export_priv *priv;
+	int err = -ENOMEM;
+
+	if (unlikely(!n->bdev)) {
+		err = -EINVAL;
+		goto err_out_exit;
+	}
+
+	bio = bio_alloc_bioset(GFP_KERNEL, PAGE_ALIGN(cmd->size) >> PAGE_SHIFT, dst_bio_set);
+	if (!bio)
+		goto err_out_exit;
+	bio->bi_private = NULL;
+	bio->bi_destructor = dst_bio_destructor;
+
+	priv = mempool_alloc(st->node->trans_pool, GFP_KERNEL);
+	if (!priv)
+		goto err_out_free;
+
+	priv->state = dst_state_get(st);
+	priv->bio = bio;
+
+	bio->bi_private = priv;
+	bio->bi_end_io = dst_bio_end_io;
+	bio->bi_bdev = n->bdev;
+	bio->bi_flags |= cmd->flags;
+	bio->bi_rw = cmd->rw;
+	bio->bi_size = 0;
+	bio->bi_sector = cmd->sector;
+
+	dst_bio_to_cmd(bio, &priv->cmd, DST_IO_RESPONSE, cmd->id);
+
+	priv->cmd.flags = 0;
+	priv->cmd.size = cmd->size;
+
+	if (bio_data_dir(bio) == WRITE) {
+		err = dst_recv_cdata(st, priv->cmd.hash);
+		if (err)
+			goto err_out_free;
+
+		err = dst_export_write_request(st, bio, cmd->size);
+		if (err)
+			goto err_out_free;
+
+		if (dst_need_crypto(n))
+			return dst_export_crypto(n, bio);
+	} else {
+		err = dst_export_read_request(bio, cmd->size);
+		if (err)
+			goto err_out_free;
+	}
+
+	dprintk("%s: bio: %llu/%u, rw: %lu, dir: %lu.\n",
+			__func__, bio->bi_sector, bio->bi_size,
+			bio->bi_rw, bio_data_dir(bio));
+
+	generic_make_request(bio);
+
+	return 0;
+
+err_out_free:
+	bio_put(bio);
+err_out_exit:
+	return err;
+}
+
+int dst_export_send_bio(struct bio *bio)
+{
+	struct dst_export_priv *p = bio->bi_private;
+	struct dst_state *st = p->state;
+	struct dst_cmd *cmd = &p->cmd;
+	int err;
+
+	dprintk("%s: id: %llu, bio: %llu/%u, csize: %u, flags: %lu, rw: %lu.\n",
+			__func__, cmd->id, bio->bi_sector, bio->bi_size,
+			cmd->csize, bio->bi_flags, bio->bi_rw);
+
+	dst_convert_cmd(cmd);
+
+	dst_state_lock(st);
+	if (!st->socket) {
+		err = -ECONNRESET;
+		goto err_out_unlock;
+	}
+
+	if (bio_data_dir(bio) == WRITE) {
+		cmd->size = cmd->csize = 0;
+		err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
+		if (err)
+			goto err_out_unlock;
+	} else {
+		err = dst_send_bio(st, cmd, bio);
+		if (err)
+			goto err_out_unlock;
+	}
+
+	dst_state_unlock(st);
+
+	bio_put(bio);
+	return 0;
+
+err_out_unlock:
+	dst_state_unlock(st);
+
+	bio_put(bio);
+	return err;
+}
diff --git a/drivers/block/dst/state.c b/drivers/block/dst/state.c
new file mode 100644
index 0000000..71faa20
--- /dev/null
+++ b/drivers/block/dst/state.c
@@ -0,0 +1,738 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/buffer_head.h>
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/connector.h>
+#include <linux/dst.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/socket.h>
+#include <linux/slab.h>
+
+#include <net/sock.h>
+
+/*
+ * Polling machinery.
+ */
+
+struct dst_poll_helper
+{
+	poll_table		pt;
+	struct dst_state	*st;
+};
+
+static int dst_queue_wake(wait_queue_t *wait, unsigned mode, int sync, void *key)
+{
+	struct dst_state *st = container_of(wait, struct dst_state, wait);
+
+	wake_up(&st->thread_wait);
+	return 1;
+}
+
+static void dst_queue_func(struct file *file, wait_queue_head_t *whead,
+				 poll_table *pt)
+{
+	struct dst_state *st = container_of(pt, struct dst_poll_helper, pt)->st;
+
+	st->whead = whead;
+	init_waitqueue_func_entry(&st->wait, dst_queue_wake);
+	add_wait_queue(whead, &st->wait);
+}
+
+void dst_poll_exit(struct dst_state *st)
+{
+	if (st->whead) {
+		remove_wait_queue(st->whead, &st->wait);
+		st->whead = NULL;
+	}
+}
+
+int dst_poll_init(struct dst_state *st)
+{
+	struct dst_poll_helper ph;
+
+	ph.st = st;
+	init_poll_funcptr(&ph.pt, &dst_queue_func);
+
+	st->socket->ops->poll(NULL, st->socket, &ph.pt);
+	return 0;
+}
+
+/*
+ * Header receiving function - may block.
+ */
+static int dst_data_recv_header(struct socket *sock,
+		void *data, unsigned int size, int block)
+{
+	struct msghdr msg;
+	struct kvec iov;
+	int err;
+
+	iov.iov_base = data;
+	iov.iov_len = size;
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = (block)?MSG_WAITALL:MSG_DONTWAIT;
+
+	err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len,
+			msg.msg_flags);
+	if (err != size)
+		return (err < 0) ? err : -EIO;
+
+	return 0;
+}
+
+/*
+ * Header sending function - may block.
+ */
+int dst_data_send_header(struct socket *sock,
+		void *data, unsigned int size, int more)
+{
+	struct msghdr msg;
+	struct kvec iov;
+	int err;
+
+	iov.iov_base = data;
+	iov.iov_len = size;
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_WAITALL | (more ? MSG_MORE : 0);
+
+	err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
+	if (err != size) {
+		dprintk("%s: size: %u, more: %d, err: %d.\n", __func__, size, more, err);
+		return (err < 0) ? err : -EIO;
+	}
+
+	return 0;
+}
+
+static int dst_request_remote_config(struct dst_state *st)
+{
+	struct dst_node *n = st->node;
+	int err = -EINVAL;
+	struct dst_cmd *cmd = st->data;
+
+	memset(cmd, 0, sizeof(struct dst_cmd));
+	cmd->cmd = DST_CFG;
+
+	dst_convert_cmd(cmd);
+
+	err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
+	if (err)
+		goto out;
+
+	err = dst_data_recv_header(st->socket, cmd, sizeof(struct dst_cmd), 1);
+	if (err)
+		goto out;
+
+	dst_convert_cmd(cmd);
+
+	if (cmd->cmd != DST_CFG) {
+		err = -EINVAL;
+		printk("%s: checking result: cmd: %d, size reported: %llu.\n",
+			__func__, cmd->cmd, cmd->sector);
+		goto out;
+	}
+
+	if (n->size != 0)
+		n->size = min_t(loff_t, n->size, cmd->sector);
+	else
+		n->size = cmd->sector;
+
+out:
+	dprintk("%s: n: %p, err: %d.\n", __func__, n, err);
+	return err;
+}
+
+#define DST_DEFAULT_TIMEO	20000
+
+int dst_state_socket_create(struct dst_state *st)
+{
+	int err;
+	struct socket *sock;
+	struct dst_network_ctl *ctl = &st->ctl;
+
+	err = sock_create(ctl->addr.sa_family, ctl->type, ctl->proto, &sock);
+	if (err < 0)
+		return err;
+
+	sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo = msecs_to_jiffies(DST_DEFAULT_TIMEO);
+	sock->sk->sk_allocation = GFP_NOIO;
+
+	st->socket = st->read_socket = sock;
+	return 0;
+}
+
+void dst_state_socket_release(struct dst_state *st)
+{
+	dprintk("%s: st: %p, socket: %p, n: %p.\n", __func__, st, st->socket, st->node);
+	if (st->socket) {
+		sock_release(st->socket);
+		st->socket = NULL;
+		st->read_socket = NULL;
+	}
+}
+
+void dst_dump_addr(struct socket *sk, struct sockaddr *sa, char *str)
+{
+	if (sk->ops->family == AF_INET) {
+		struct sockaddr_in *sin = (struct sockaddr_in *)sa;
+		printk(KERN_INFO "%s %u.%u.%u.%u:%d.\n",
+			str, NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+	} else if (sk->ops->family == AF_INET6) {
+		struct sockaddr_in6 *sin = (struct sockaddr_in6 *)sa;
+		printk(KERN_INFO "%s %04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d.\n",
+			str, NIP6(sin->sin6_addr), ntohs(sin->sin6_port));
+	}
+}
+
+void dst_state_exit_connected(struct dst_state *st)
+{
+	if (st->socket) {
+		dst_poll_exit(st);
+		st->socket->ops->shutdown(st->socket, 2);
+
+		dst_dump_addr(st->socket, (struct sockaddr *)&st->ctl.addr, "Disconnected peer");
+		dst_state_socket_release(st);
+	}
+}
+
+static int dst_state_init_connected(struct dst_state *st)
+{
+	int err;
+	struct dst_network_ctl *ctl = &st->ctl;
+
+	err = dst_state_socket_create(st);
+	if (err)
+		goto err_out_exit;
+
+	err = kernel_connect(st->socket, (struct sockaddr *)&st->ctl.addr,
+			st->ctl.addr.sa_data_len, 0);
+	if (err)
+		goto err_out_release;
+
+	err = dst_poll_init(st);
+	if (err)
+		goto err_out_release;
+
+	dst_dump_addr(st->socket, (struct sockaddr *)&ctl->addr, "Connected to peer");
+
+	return 0;
+
+err_out_release:
+	dst_state_socket_release(st);
+err_out_exit:
+	return err;
+}
+
+static inline void dst_state_reset_nolock(struct dst_state *st)
+{
+	dst_state_exit_connected(st);
+	dst_state_init_connected(st);
+}
+
+static inline void dst_state_reset(struct dst_state *st)
+{
+	dst_state_lock(st);
+	dst_state_reset_nolock(st);
+	dst_state_unlock(st);
+}
+
+/*
+ * Basic network sending/receiving functions.
+ * Data is received in non-blocking mode; dst_data_recv() waits for readiness.
+ */
+static int dst_data_recv_raw(struct dst_state *st, void *buf, u64 size)
+{
+	struct msghdr msg;
+	struct kvec iov;
+	int err;
+
+	BUG_ON(!size);
+
+	iov.iov_base = buf;
+	iov.iov_len = size;
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_DONTWAIT;
+
+	err = kernel_recvmsg(st->socket, &msg, &iov, 1, iov.iov_len,
+			msg.msg_flags);
+	if (err <= 0) {
+		printk("%s: failed to recv data: size: %llu, err: %d.\n", __func__, size, err);
+		if (err == 0)
+			err = -ECONNRESET;
+
+		dst_state_exit_connected(st);
+	}
+
+	return err;
+}
+
+int dst_data_recv(struct dst_state *st, void *data, unsigned int size)
+{
+	unsigned int revents = 0;
+	unsigned int err_mask = POLLERR | POLLHUP | POLLRDHUP;
+	unsigned int mask = err_mask | POLLIN;
+	struct dst_node *n = st->node;
+	int err = 0;
+
+	while (size && !err) {
+		revents = dst_state_poll(st);
+
+		if (!(revents & mask)) {
+			DEFINE_WAIT(wait);
+
+			for (;;) {
+				prepare_to_wait(&st->thread_wait, &wait, TASK_INTERRUPTIBLE);
+				if (!n->trans_scan_timeout || st->need_exit)
+					break;
+
+				revents = dst_state_poll(st);
+
+				if (revents & mask)
+					break;
+
+				if (signal_pending(current))
+					break;
+
+				schedule_timeout(HZ);
+				continue;
+			}
+			finish_wait(&st->thread_wait, &wait);
+		}
+
+		err = -ECONNRESET;
+		dst_state_lock(st);
+
+		if (st->socket && (st->read_socket == st->socket) && (revents & POLLIN)) {
+			err = dst_data_recv_raw(st, data, size);
+			if (err > 0) {
+				data += err;
+				size -= err;
+				err = 0;
+			}
+		}
+
+		if (revents & err_mask || !st->socket) {
+			printk("%s: revents: %x, socket: %p, size: %u, err: %d.\n",
+					__func__, revents, st->socket, size, err);
+			err = -ECONNRESET;
+		}
+
+		dst_state_unlock(st);
+
+		if (!n->trans_scan_timeout)
+			err = -ENODEV;
+	}
+
+	return err;
+}
+
+static int dst_process_cfg(struct dst_state *st)
+{
+	struct dst_node *n = st->node;
+	struct dst_cmd *cmd = st->data;
+	int err;
+
+	cmd->sector = n->size;
+
+	dst_convert_cmd(cmd);
+
+	dst_state_lock(st);
+	err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
+	dst_state_unlock(st);
+
+	return err;
+}
+
+static int dst_recv_bio(struct dst_state *st, struct bio *bio, unsigned int total_size)
+{
+	struct bio_vec *bv;
+	int i, err;
+	void *data;
+	unsigned int sz;
+
+	bio_for_each_segment(bv, bio, i) {
+		sz = min(total_size, bv->bv_len);
+
+		dprintk("%s: bio: %llu/%u, total: %u, len: %u, sz: %u, off: %u.\n",
+			__func__, bio->bi_sector, bio->bi_size, total_size,
+			bv->bv_len, sz, bv->bv_offset);
+
+		data = kmap(bv->bv_page) + bv->bv_offset;
+		err = dst_data_recv(st, data, sz);
+		kunmap(bv->bv_page);
+
+		bv->bv_len = sz;
+
+		if (err)
+			return err;
+
+		total_size -= sz;
+		if (total_size == 0)
+			break;
+	}
+
+	return 0;
+}
+
+static int dst_process_io_response(struct dst_state *st)
+{
+	struct dst_node *n = st->node;
+	struct dst_cmd *cmd = st->data;
+	struct dst_trans *t;
+	int err = 0;
+	struct bio *bio;
+
+	mutex_lock(&n->trans_lock);
+	t = dst_trans_search(n, cmd->id);
+	mutex_unlock(&n->trans_lock);
+
+	if (!t)
+		goto err_out_exit;
+
+	bio = t->bio;
+
+	dprintk("%s: bio: %llu/%u, cmd_size: %u, csize: %u, dir: %lu.\n",
+		__func__, bio->bi_sector, bio->bi_size, cmd->size,
+		cmd->csize, bio_data_dir(bio));
+
+	if (bio_data_dir(bio) == READ) {
+		if (bio->bi_size != cmd->size - cmd->csize)
+			goto err_out_exit;
+
+		if (dst_need_crypto(n)) {
+			err = dst_recv_cdata(st, t->cmd.hash);
+			if (err)
+				goto err_out_exit;
+		}
+
+		err = dst_recv_bio(st, t->bio, bio->bi_size);
+		if (err)
+			goto err_out_exit;
+
+		if (dst_need_crypto(n))
+			return dst_trans_crypto(t);
+	} else {
+		err = -EBADMSG;
+		if (cmd->size || cmd->csize)
+			goto err_out_exit;
+	}
+
+	dst_trans_remove(t);
+	dst_trans_put(t);
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+int dst_recv_cdata(struct dst_state *st, void *cdata)
+{
+	struct dst_cmd *cmd = st->data;
+	struct dst_node *n = st->node;
+	struct dst_crypto_ctl *c = &n->crypto;
+	int err;
+
+	if (cmd->csize != c->crypto_attached_size) {
+		dprintk("%s: cmd: cmd: %u, sector: %llu, size: %u, "
+				"csize: %u != digest size %u.\n",
+				__func__, cmd->cmd, cmd->sector, cmd->size,
+				cmd->csize, c->crypto_attached_size);
+		err = -EINVAL;
+		goto err_out_exit;
+	}
+
+	err = dst_data_recv(st, cdata, cmd->csize);
+	if (err)
+		goto err_out_exit;
+
+	cmd->size -= cmd->csize;
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+static int dst_recv_processing(struct dst_state *st)
+{
+	int err = -EINTR;
+	struct dst_cmd *cmd = st->data;
+
+	/*
+	 * If the socket is reset after this statement,
+	 * dst_data_recv() will simply fail and the loop will
+	 * start again, so this can be done without any locks.
+	 *
+	 * st->read_socket is needed to keep the state machine
+	 * consistent between this read and the subsequent ones
+	 * in protocol-specific functions during a connection reset.
+	 * After a reset we have to read the next command and must
+	 * not expect data for the old command to magically appear
+	 * on the new connection.
+	 */
+	st->read_socket = st->socket;
+	err = dst_data_recv(st, cmd, sizeof(struct dst_cmd));
+	if (err)
+		goto out_exit;
+
+	dst_convert_cmd(cmd);
+
+	dprintk("%s: cmd: %u, size: %u, csize: %u, id: %llu, sector: %llu, flags: %llx, rw: %llx.\n",
+			__func__, cmd->cmd, cmd->size,
+			cmd->csize, cmd->id, cmd->sector,
+			cmd->flags, cmd->rw);
+
+	/*
+	 * This should catch protocol breakage and random garbage instead of commands.
+	 */
+	if (unlikely(cmd->csize > st->size - sizeof(struct dst_cmd))) {
+		err = -EBADMSG;
+		goto out_exit;
+	}
+
+	err = -EPROTO;
+	switch (cmd->cmd) {
+		case DST_IO_RESPONSE:
+			err = dst_process_io_response(st);
+			break;
+		case DST_IO:
+			err = dst_process_io(st);
+			break;
+		case DST_CFG:
+			err = dst_process_cfg(st);
+			break;
+		default:
+			break;
+	}
+
+out_exit:
+	return err;
+}
+
+static int dst_recv(void *init_data, void *schedule_data)
+{
+	struct dst_state *st = schedule_data;
+	struct dst_node *n = init_data;
+	int err = 0;
+
+	dprintk("%s: start st: %p, n: %p, scan: %lu, need_exit: %d.\n",
+			__func__, st, n, n->trans_scan_timeout, st->need_exit);
+
+	while (n->trans_scan_timeout && !st->need_exit) {
+		err = dst_recv_processing(st);
+		if (err < 0) {
+			if (!st->ctl.type)
+				break;
+
+			if (!n->trans_scan_timeout || st->need_exit)
+				break;
+
+			dst_state_reset(st);
+			msleep(1000);
+		}
+	}
+
+	st->need_exit = 1;
+	wake_up(&st->thread_wait);
+
+	dprintk("%s: freeing receiving socket st: %p.\n", __func__, st);
+	dst_state_exit_connected(st);
+	dst_state_put(st);
+
+	dprintk("%s: freed receiving socket st: %p.\n", __func__, st);
+
+	return err;
+}
+
+static void dst_state_free(struct dst_state *st)
+{
+	dprintk("%s: st: %p.\n", __func__, st);
+	if (st->cleanup)
+		st->cleanup(st);
+	kfree(st->data);
+	kfree(st);
+}
+
+struct dst_state *dst_state_alloc(struct dst_node *n)
+{
+	struct dst_state *st;
+	int err = -ENOMEM;
+
+	st = kzalloc(sizeof(struct dst_state), GFP_KERNEL);
+	if (!st)
+		goto err_out_exit;
+
+	st->node = n;
+	st->need_exit = 0;
+
+	st->size = PAGE_SIZE;
+	st->data = kmalloc(st->size, GFP_KERNEL);
+	if (!st->data)
+		goto err_out_free;
+
+	spin_lock_init(&st->request_lock);
+	INIT_LIST_HEAD(&st->request_list);
+
+	mutex_init(&st->state_lock);
+	init_waitqueue_head(&st->thread_wait);
+
+	/*
+	 * One for processing thread, another one for node itself.
+	 */
+	atomic_set(&st->refcnt, 2);
+
+	dprintk("%s: st: %p, n: %p.\n", __func__, st, st->node);
+
+	return st;
+
+err_out_free:
+	kfree(st);
+err_out_exit:
+	return ERR_PTR(err);
+}
+
+int dst_state_schedule_receiver(struct dst_state *st)
+{
+	return thread_pool_schedule_private(st->node->pool, dst_thread_setup,
+			dst_recv, st, MAX_SCHEDULE_TIMEOUT, st->node);
+}
+
+int dst_node_init_connected(struct dst_node *n, struct dst_network_ctl *r)
+{
+	struct dst_state *st;
+	int err = -ENOMEM;
+
+	st = dst_state_alloc(n);
+	if (IS_ERR(st)) {
+		err = PTR_ERR(st);
+		goto err_out_exit;
+	}
+	memcpy(&st->ctl, r, sizeof(struct dst_network_ctl));
+
+	err = dst_state_init_connected(st);
+	if (err)
+		goto err_out_free_data;
+
+	err = dst_request_remote_config(st);
+	if (err)
+		goto err_out_exit_connected;
+	n->state = st;
+
+	err = dst_state_schedule_receiver(st);
+	if (err)
+		goto err_out_exit_connected;
+
+	return 0;
+
+err_out_exit_connected:
+	dst_state_exit_connected(st);
+err_out_free_data:
+	dst_state_free(st);
+err_out_exit:
+	n->state = NULL;
+	return err;
+}
+
+void dst_state_put(struct dst_state *st)
+{
+	dprintk("%s: st: %p, refcnt: %d.\n", __func__, st, atomic_read(&st->refcnt));
+	if (atomic_dec_and_test(&st->refcnt))
+		dst_state_free(st);
+}
+
+int dst_send_bio(struct dst_state *st, struct dst_cmd *cmd, struct bio *bio)
+{
+	struct bio_vec *bv;
+	struct dst_crypto_ctl *c = &st->node->crypto;
+	int err, i = 0;
+	int flags = MSG_WAITALL;
+
+	err = dst_data_send_header(st->socket, cmd,
+		sizeof(struct dst_cmd) + c->crypto_attached_size, bio->bi_vcnt);
+	if (err)
+		goto err_out_exit;
+
+	bio_for_each_segment(bv, bio, i) {
+		flags = MSG_WAITALL |
+			((i < bio->bi_vcnt - 1) ? MSG_MORE : 0);
+
+		err = kernel_sendpage(st->socket, bv->bv_page, bv->bv_offset,
+				bv->bv_len, flags);
+		if (err <= 0)
+			goto err_out_exit;
+	}
+
+	return 0;
+
+err_out_exit:
+	dprintk("%s: %d/%d, flags: %x, err: %d.\n", __func__, i, bio->bi_vcnt, flags, err);
+	return err;
+}
+
+int dst_trans_send(struct dst_trans *t)
+{
+	int err;
+	struct dst_state *st = t->n->state;
+	struct bio *bio = t->bio;
+
+	dst_convert_cmd(&t->cmd);
+
+	dst_state_lock(st);
+	if (!st->socket) {
+		err = dst_state_init_connected(st);
+		if (err)
+			goto err_out_unlock;
+	}
+
+	if (bio_data_dir(bio) == WRITE) {
+		err = dst_send_bio(st, &t->cmd, t->bio);
+	} else {
+		err = dst_data_send_header(st->socket, &t->cmd, sizeof(struct dst_cmd), 0);
+	}
+	if (err)
+		goto err_out_reset;
+
+	dst_state_unlock(st);
+	return 0;
+
+err_out_reset:
+	dst_state_reset_nolock(st);
+err_out_unlock:
+	dst_state_unlock(st);
+
+	return err;
+}
diff --git a/drivers/block/dst/thread_pool.c b/drivers/block/dst/thread_pool.c
new file mode 100644
index 0000000..6a14cb5
--- /dev/null
+++ b/drivers/block/dst/thread_pool.c
@@ -0,0 +1,302 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/kernel.h>
+#include <linux/dst.h>
+#include <linux/kthread.h>
+#include <linux/slab.h>
+
+struct thread_pool_worker
+{
+	struct list_head	worker_entry;
+
+	struct task_struct	*thread;
+
+	struct thread_pool	*pool;
+
+	int			error;
+	int			has_data;
+	int			need_exit;
+	unsigned int		id;
+
+	wait_queue_head_t	wait;
+
+	void			*private;
+	void			*schedule_data;
+
+	int			(* action)(void *private, void *schedule_data);
+	void			(* cleanup)(void *private);
+};
+
+static void thread_pool_exit_worker(struct thread_pool_worker *w)
+{
+	dprintk("%s: 1 w: %p, need_exit: %d.\n", __func__, w, w->need_exit);
+	kthread_stop(w->thread);
+	dprintk("%s: 2 w: %p, need_exit: %d.\n", __func__, w, w->need_exit);
+
+	w->cleanup(w->private);
+	kfree(w);
+}
+
+static void thread_pool_worker_make_ready(struct thread_pool_worker *w)
+{
+	struct thread_pool *p = w->pool;
+
+	dprintk("%s: w: %p, need_exit: %d.\n", __func__, w, w->need_exit);
+
+	mutex_lock(&p->thread_lock);
+
+	if (!w->need_exit) {
+		list_move_tail(&w->worker_entry, &p->ready_list);
+		w->has_data = 0;
+		mutex_unlock(&p->thread_lock);
+
+		wake_up(&p->wait);
+	} else {
+		p->thread_num--;
+		list_del(&w->worker_entry);
+		mutex_unlock(&p->thread_lock);
+
+		thread_pool_exit_worker(w);
+	}
+
+	dprintk("%s: w: %p.\n", __func__, w);
+}
+
+static int thread_pool_worker_func(void *data)
+{
+	struct thread_pool_worker *w = data;
+
+	while (!kthread_should_stop()) {
+		wait_event_interruptible(w->wait, kthread_should_stop() || w->has_data);
+
+		if (kthread_should_stop())
+			break;
+
+		if (!w->has_data)
+			continue;
+
+		w->action(w->private, w->schedule_data);
+		thread_pool_worker_make_ready(w);
+	}
+
+	return 0;
+}
+
+void thread_pool_del_worker(struct thread_pool *p)
+{
+	struct thread_pool_worker *w = NULL;
+
+	while (!w) {
+		wait_event(p->wait, !list_empty(&p->ready_list) || !p->thread_num);
+
+		dprintk("%s: locking list_empty: %d, thread_num: %d.\n",
+				__func__, list_empty(&p->ready_list), p->thread_num);
+
+		mutex_lock(&p->thread_lock);
+		if (!list_empty(&p->ready_list)) {
+			w = list_first_entry(&p->ready_list,
+					struct thread_pool_worker,
+					worker_entry);
+
+			dprintk("%s: deleting w: %p, thread_num: %d, list: %p [%p.%p].\n",
+					__func__, w, p->thread_num, &p->ready_list,
+					p->ready_list.prev, p->ready_list.next);
+
+			p->thread_num--;
+			list_del(&w->worker_entry);
+		}
+		mutex_unlock(&p->thread_lock);
+	}
+
+	if (w)
+		thread_pool_exit_worker(w);
+	dprintk("%s: deleted w: %p, thread_num: %d.\n", __func__, w, p->thread_num);
+}
+
+void thread_pool_del_worker_id(struct thread_pool *p, unsigned int id)
+{
+	struct thread_pool_worker *w, *tmp;
+	int found = 0;
+
+	mutex_lock(&p->thread_lock);
+	list_for_each_entry_safe(w, tmp, &p->ready_list, worker_entry) {
+		if (w->id == id) {
+			found = 1;
+			p->thread_num--;
+			list_del(&w->worker_entry);
+			break;
+		}
+	}
+
+	if (!found) {
+		list_for_each_entry_safe(w, tmp, &p->active_list, worker_entry) {
+			if (w->id == id) {
+				w->need_exit = 1;
+				break;
+			}
+		}
+	}
+	mutex_unlock(&p->thread_lock);
+
+	if (found)
+		thread_pool_exit_worker(w);
+}
+
+int thread_pool_add_worker(struct thread_pool *p,
+		char *name,
+		unsigned int id,
+		void *(* init)(void *private),
+		void (* cleanup)(void *private),
+		void *private)
+{
+	struct thread_pool_worker *w;
+	int err = -ENOMEM;
+
+	w = kzalloc(sizeof(struct thread_pool_worker), GFP_KERNEL);
+	if (!w)
+		goto err_out_exit;
+
+	w->pool = p;
+	init_waitqueue_head(&w->wait);
+	w->cleanup = cleanup;
+	w->id = id;
+
+	w->thread = kthread_run(thread_pool_worker_func, w, "%s", name);
+	if (IS_ERR(w->thread)) {
+		err = PTR_ERR(w->thread);
+		goto err_out_free;
+	}
+
+	w->private = init(private);
+	if (IS_ERR(w->private)) {
+		err = PTR_ERR(w->private);
+		goto err_out_stop_thread;
+	}
+
+	mutex_lock(&p->thread_lock);
+	list_add_tail(&w->worker_entry, &p->ready_list);
+	p->thread_num++;
+	mutex_unlock(&p->thread_lock);
+
+	return 0;
+
+err_out_stop_thread:
+	kthread_stop(w->thread);
+err_out_free:
+	kfree(w);
+err_out_exit:
+	return err;
+}
+
+void thread_pool_destroy(struct thread_pool *p)
+{
+	while (p->thread_num) {
+		dprintk("%s: num: %d.\n", __func__, p->thread_num);
+		thread_pool_del_worker(p);
+	}
+}
+
+struct thread_pool *thread_pool_create(int num, char *name,
+		void *(* init)(void *private),
+		void (* cleanup)(void *private),
+		void *private)
+{
+	struct thread_pool_worker *w, *tmp;
+	struct thread_pool *p;
+	int err = -ENOMEM;
+	int i;
+
+	p = kzalloc(sizeof(struct thread_pool), GFP_KERNEL);
+	if (!p)
+		goto err_out_exit;
+
+	init_waitqueue_head(&p->wait);
+	mutex_init(&p->thread_lock);
+	INIT_LIST_HEAD(&p->ready_list);
+	INIT_LIST_HEAD(&p->active_list);
+	p->thread_num = 0;
+
+	for (i = 0; i < num; ++i) {
+		err = thread_pool_add_worker(p, name, i, init, cleanup, private);
+		if (err)
+			goto err_out_free_all;
+	}
+
+	return p;
+
+err_out_free_all:
+	list_for_each_entry_safe(w, tmp, &p->ready_list, worker_entry) {
+		list_del(&w->worker_entry);
+		thread_pool_exit_worker(w);
+	}
+	kfree(p);
+err_out_exit:
+	return ERR_PTR(err);
+}
+
+int thread_pool_schedule_private(struct thread_pool *p,
+		int (* setup)(void *private, void *data),
+		int (* action)(void *private, void *data),
+		void *data, long timeout, void *id)
+{
+	struct thread_pool_worker *w, *tmp, *worker = NULL;
+	int err = 0;
+
+	while (!worker && !err) {
+		timeout = wait_event_interruptible_timeout(p->wait,
+				!list_empty(&p->ready_list),
+				timeout);
+
+		if (timeout <= 0) {
+			err = timeout ? timeout : -ETIMEDOUT;
+			break;
+		}
+
+		worker = NULL;
+		mutex_lock(&p->thread_lock);
+		list_for_each_entry_safe(w, tmp, &p->ready_list, worker_entry) {
+			if (id && id != w->private)
+				continue;
+
+			worker = w;
+
+			list_move_tail(&w->worker_entry, &p->active_list);
+
+			err = setup(w->private, data);
+			if (!err) {
+				w->schedule_data = data;
+				w->action = action;
+				w->has_data = 1;
+				wake_up(&w->wait);
+			} else {
+				list_move_tail(&w->worker_entry, &p->ready_list);
+			}
+
+			break;
+		}
+		mutex_unlock(&p->thread_lock);
+	}
+
+	return err;
+}
+
+int thread_pool_schedule(struct thread_pool *p,
+		int (* setup)(void *private, void *data),
+		int (* action)(void *private, void *data),
+		void *data, long timeout)
+{
+	return thread_pool_schedule_private(p, setup, action, data, timeout, NULL);
+}
diff --git a/drivers/block/dst/trans.c b/drivers/block/dst/trans.c
new file mode 100644
index 0000000..6122a9f
--- /dev/null
+++ b/drivers/block/dst/trans.c
@@ -0,0 +1,288 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/bio.h>
+#include <linux/dst.h>
+#include <linux/slab.h>
+#include <linux/mempool.h>
+
+static int dst_mempool_num = 32;
+module_param(dst_mempool_num, int, 0644);
+
+static inline int dst_trans_cmp(dst_gen_t gen, dst_gen_t new)
+{
+	if (gen < new)
+		return 1;
+	if (gen > new)
+		return -1;
+	return 0;
+}
+
+struct dst_trans *dst_trans_search(struct dst_node *node, dst_gen_t gen)
+{
+	struct rb_root *root = &node->trans_root;
+	struct rb_node *n = root->rb_node;
+	struct dst_trans *t, *ret = NULL;
+	int cmp;
+
+	while (n) {
+		t = rb_entry(n, struct dst_trans, trans_entry);
+
+		cmp = dst_trans_cmp(t->gen, gen);
+		if (cmp < 0)
+			n = n->rb_left;
+		else if (cmp > 0)
+			n = n->rb_right;
+		else {
+			ret = t;
+			break;
+		}
+	}
+
+	dprintk("%s: %s transaction: id: %llu.\n", __func__, ret ? "found" : "not found", gen);
+
+	return ret;
+}
+
+static int dst_trans_insert(struct dst_trans *new)
+{
+	struct rb_root *root = &new->n->trans_root;
+	struct rb_node **n = &root->rb_node, *parent = NULL;
+	struct dst_trans *ret = NULL, *t;
+	int cmp;
+
+	while (*n) {
+		parent = *n;
+
+		t = rb_entry(parent, struct dst_trans, trans_entry);
+
+		cmp = dst_trans_cmp(t->gen, new->gen);
+		if (cmp < 0)
+			n = &parent->rb_left;
+		else if (cmp > 0)
+			n = &parent->rb_right;
+		else {
+			ret = t;
+			break;
+		}
+	}
+
+	new->send_time = jiffies;
+	if (ret) {
+		printk(KERN_ERR "%s: exist: old: gen: %llu, bio: %llu/%u, send_time: %lu, "
+				"new: gen: %llu, bio: %llu/%u, send_time: %lu.\n",
+			__func__, ret->gen, ret->bio->bi_sector, ret->bio->bi_size, ret->send_time,
+			new->gen, new->bio->bi_sector, new->bio->bi_size, new->send_time);
+		return -EEXIST;
+	}
+
+	rb_link_node(&new->trans_entry, parent, n);
+	rb_insert_color(&new->trans_entry, root);
+
+	dprintk("%s: inserted: gen: %llu, bio: %llu/%u, send_time: %lu.\n",
+		__func__, new->gen, new->bio->bi_sector, new->bio->bi_size, new->send_time);
+
+	return 0;
+}
+
+int dst_trans_remove_nolock(struct dst_trans *t)
+{
+	struct dst_node *n = t->n;
+
+	rb_erase(&t->trans_entry, &n->trans_root);
+	return 0;
+}
+
+int dst_trans_remove(struct dst_trans *t)
+{
+	int ret;
+	struct dst_node *n = t->n;
+
+	mutex_lock(&n->trans_lock);
+	ret = dst_trans_remove_nolock(t);
+	mutex_unlock(&n->trans_lock);
+
+	return ret;
+}
+
+void dst_trans_put(struct dst_trans *t)
+{
+	if (atomic_dec_and_test(&t->refcnt)) {
+		struct bio *bio = t->bio;
+		struct dst_node *n = t->n;
+
+		dprintk("%s: completed t: %p, gen: %llu, bio: %p.\n", __func__, t, t->gen, bio);
+
+		bio_endio(bio, t->error);
+		bio_put(bio);
+		mempool_free(t, n->trans_pool);
+		dst_node_put(n);
+	}
+}
+
+int dst_process_bio(struct dst_node *n, struct bio *bio)
+{
+	struct dst_trans *t;
+	int err = -ENOMEM;
+
+	t = mempool_alloc(n->trans_pool, GFP_NOFS);
+	if (!t)
+		goto err_out_exit;
+
+	t->n = dst_node_get(n);
+	t->bio = bio;
+	t->error = 0;
+	t->retries = 0;
+	atomic_set(&t->refcnt, 1);
+	t->gen = atomic_long_inc_return(&n->gen);
+
+	t->enc = bio_data_dir(bio);
+	dst_bio_to_cmd(bio, &t->cmd, DST_IO, t->gen);
+
+	err = dst_trans_insert(t);
+	if (err)
+		goto err_out_free;
+
+	dprintk("%s: gen: %llu, bio: %llu/%u, dir/enc: %d, need_crypto: %d.\n",
+			__func__, t->gen, bio->bi_sector, bio->bi_size, t->enc,
+			dst_need_crypto(n));
+
+	if (dst_need_crypto(n) && t->enc)
+		dst_trans_crypto(t);
+	else
+		dst_trans_send(t);
+
+	return 0;
+
+err_out_free:
+	mempool_free(t, n->trans_pool);
+	dst_node_put(n);
+err_out_exit:
+	bio_endio(bio, err);
+	bio_put(bio);
+	return err;
+}
+
+static void dst_trans_scan(struct work_struct *work)
+{
+	struct dst_node *n = container_of(work, struct dst_node, trans_work.work);
+	struct rb_node *rb_node;
+	struct dst_trans *t;
+	unsigned long timeout = n->trans_scan_timeout;
+	int num = 10 * n->trans_max_retries;
+
+	mutex_lock(&n->trans_lock);
+
+	for (rb_node = rb_first(&n->trans_root); rb_node; ) {
+		t = rb_entry(rb_node, struct dst_trans, trans_entry);
+
+		if (timeout && time_after(t->send_time + timeout, jiffies)
+				&& t->retries == 0)
+			break;
+#if 0
+		dprintk("%s: t: %p, gen: %llu, n: %s, retries: %u, max: %u.\n",
+			__func__, t, t->gen, n->name, t->retries, n->trans_max_retries);
+#endif
+		if (--num == 0)
+			break;
+
+		dst_trans_get(t);
+
+		rb_node = rb_next(rb_node);
+
+		if (timeout && (++t->retries < n->trans_max_retries)) {
+			dst_trans_send(t);
+		} else {
+			t->error = -ETIMEDOUT;
+			dst_trans_remove_nolock(t);
+			dst_trans_put(t);
+		}
+
+		dst_trans_put(t);
+	}
+
+	mutex_unlock(&n->trans_lock);
+
+	//dprintk("%s: processed: %d.\n", __func__, 10 * n->trans_max_retries - num);
+
+	/*
+	 * If no timeout is specified, the system is in the middle of exiting,
+	 * so there is no need to reschedule the scan.
+	 */
+	if (timeout) {
+		if (!num)
+			timeout = HZ;
+		schedule_delayed_work(&n->trans_work, timeout);
+	}
+}
+
+void dst_node_trans_exit(struct dst_node *n)
+{
+	struct dst_trans *t;
+	struct rb_node *rb_node;
+
+	if (!n->trans_cache)
+		return;
+
+	dprintk("%s: n: %p, cancelling the work.\n", __func__, n);
+	cancel_delayed_work_sync(&n->trans_work);
+	flush_scheduled_work();
+	dprintk("%s: n: %p, work has been cancelled.\n", __func__, n);
+
+	for (rb_node = rb_first(&n->trans_root); rb_node; ) {
+		t = rb_entry(rb_node, struct dst_trans, trans_entry);
+
+		dprintk("%s: t: %p, gen: %llu, n: %s.\n",
+			__func__, t, t->gen, n->name);
+
+		rb_node = rb_next(rb_node);
+
+		t->error = -ETIMEDOUT;
+		dst_trans_remove_nolock(t);
+		dst_trans_put(t);
+	}
+
+	mempool_destroy(n->trans_pool);
+	kmem_cache_destroy(n->trans_cache);
+}
+
+int dst_node_trans_init(struct dst_node *n, unsigned int size)
+{
+	n->trans_cache = kmem_cache_create(n->name,
+			size + n->crypto.crypto_attached_size,
+			0, 0, NULL);
+	if (!n->trans_cache)
+		goto err_out_exit;
+
+	n->trans_pool = mempool_create_slab_pool(dst_mempool_num, n->trans_cache);
+	if (!n->trans_pool)
+		goto err_out_cache_destroy;
+
+	mutex_init(&n->trans_lock);
+	n->trans_root = RB_ROOT;
+
+	INIT_DELAYED_WORK(&n->trans_work, dst_trans_scan);
+	schedule_delayed_work(&n->trans_work, n->trans_scan_timeout);
+
+	dprintk("%s: n: %p, size: %u, crypto: %u.\n",
+		__func__, n, size, n->crypto.crypto_attached_size);
+
+	return 0;
+
+err_out_cache_destroy:
+	kmem_cache_destroy(n->trans_cache);
+err_out_exit:
+	return -ENOMEM;
+}
diff --git a/include/linux/connector.h b/include/linux/connector.h
index 96a89d3..cfc5ce7 100644
--- a/include/linux/connector.h
+++ b/include/linux/connector.h
@@ -38,8 +38,10 @@
 #define CN_W1_VAL			0x1
 #define CN_IDX_V86D			0x4
 #define CN_VAL_V86D_UVESAFB		0x1
+#define CN_DST_IDX			0x5
+#define CN_DST_VAL			0x1
 
-#define CN_NETLINK_USERS		5
+#define CN_NETLINK_USERS		6
 
 /*
  * Maximum connector's message size.
diff --git a/include/linux/dst.h b/include/linux/dst.h
new file mode 100644
index 0000000..1609cd1
--- /dev/null
+++ b/include/linux/dst.h
@@ -0,0 +1,431 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef __DST_H
+#define __DST_H
+
+#include <linux/types.h>
+#include <linux/connector.h>
+
+#define DST_NAMELEN		32
+#define DST_NAME		"dst"
+
+enum {
+	DST_DEL_NODE	= 0,	/* Remove node with given id from storage */
+	DST_ADD_REMOTE,		/* Add remote node with given id to the storage */
+	DST_ADD_EXPORT,		/* Add local node with given id to the storage to be exported and used by remote peers */
+	DST_CRYPTO,		/* Crypto initialization command (hash/cipher used to protect the connection) */
+	DST_SECURITY,		/* Security attributes for given connection (permissions for example) */
+	DST_START,		/* Register given node in the block layer subsystem */
+	DST_CMD_MAX
+};
+
+struct dst_ctl
+{
+	char			name[DST_NAMELEN];
+	__u32			flags;
+	__u32			cmd;
+	__u32			max_pages;
+	__u32			trans_scan_timeout;
+	__u32			trans_max_retries;
+	__u64			size;
+};
+
+struct dst_ctl_ack
+{
+	struct cn_msg		msg;
+	int			error;
+	int			unused[3];
+};
+
+#define SADDR_MAX_DATA	128
+
+struct saddr {
+	unsigned short		sa_family;			/* address family, AF_xxx	*/
+	char			sa_data[SADDR_MAX_DATA];	/* protocol address, up to SADDR_MAX_DATA bytes */
+	unsigned short		sa_data_len;			/* Number of bytes used in sa_data */
+};
+
+struct dst_network_ctl
+{
+	unsigned int		type;
+	unsigned int		proto;
+	struct saddr		addr;
+};
+
+struct dst_crypto_ctl
+{
+	char			cipher_algo[DST_NAMELEN];
+	char			hash_algo[DST_NAMELEN];
+
+	unsigned int		cipher_keysize, hash_keysize;
+	unsigned int		crypto_attached_size;
+	int			thread_num;
+};
+
+#define DST_PERM_READ		(1<<0)
+#define DST_PERM_WRITE		(1<<1)
+
+/*
+ * For now this is a simple model: each remote address is assigned
+ * the set of operations it is allowed to perform.
+ * A block device only knows about reading and writing,
+ * so these two permission bits should be more than enough.
+ */
+struct dst_secure_user
+{
+	unsigned int		permissions;
+	struct saddr		addr;
+};
+
+struct dst_export_ctl
+{
+	char			device[DST_NAMELEN];
+	struct dst_network_ctl	ctl;
+};
+
+enum {
+	DST_CFG	= 1, 		/* Request remote configuration */
+	DST_IO,			/* IO command */
+	DST_IO_RESPONSE,	/* IO response */
+	DST_NCMD_MAX,
+};
+
+struct dst_cmd
+{
+	__u32			cmd;
+	__u32			size;
+	__u32			csize;
+	__u64			rw;
+	__u64			flags;
+	__u64			id;
+	__u64			sector;
+	__u8			hash[0];
+};
+
+static inline void dst_convert_cmd(struct dst_cmd *c)
+{
+	c->cmd = __cpu_to_be32(c->cmd);
+	c->csize = __cpu_to_be32(c->csize);
+	c->size = __cpu_to_be32(c->size);
+	c->sector = __cpu_to_be64(c->sector);
+	c->id = __cpu_to_be64(c->id);
+	c->flags = __cpu_to_be64(c->flags);
+	c->rw = __cpu_to_be64(c->rw);
+}
+
+typedef __u64 dst_gen_t;
+
+#ifdef __KERNEL__
+
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/device.h>
+#include <linux/mempool.h>
+#include <linux/net.h>
+#include <linux/poll.h>
+#include <linux/rbtree.h>
+
+//#define CONFIG_DST_DEBUG
+
+#ifdef CONFIG_DST_DEBUG
+#define dprintk(f, a...) printk(KERN_NOTICE f, ##a)
+#else
+static inline void __attribute__ ((format (printf, 1, 2))) dprintk(const char * fmt, ...) {}
+#endif
+
+struct dst_node;
+
+struct dst_trans
+{
+	struct dst_node		*n;
+
+	struct rb_node		trans_entry;
+
+	atomic_t		refcnt;
+
+	short			enc;
+	short			retries;
+	int			error;
+
+	unsigned long		send_time;
+
+	dst_gen_t		gen;
+
+	struct bio		*bio;
+
+	struct dst_cmd		cmd;
+};
+
+struct dst_crypto_engine
+{
+	struct crypto_hash	*hash;
+	struct crypto_ablkcipher	*cipher;
+
+	int			page_num;
+	struct page		**pages;
+
+	int			enc;
+	struct scatterlist	*src, *dst;
+
+	long			timeout;
+	u64			iv;
+
+	void			*private;
+
+	int			size;
+	void			*data;
+};
+
+struct dst_state
+{
+	struct mutex		state_lock;
+
+	wait_queue_t 		wait;
+	wait_queue_head_t 	*whead;
+	wait_queue_head_t 	thread_wait;
+
+	struct dst_node		*node;
+
+	struct dst_network_ctl	ctl;
+
+	u32			permissions;
+
+	void			(* cleanup)(struct dst_state *st);
+
+	struct list_head	request_list;
+	spinlock_t		request_lock;
+
+	atomic_t		refcnt;
+
+	int			need_exit;
+
+	struct socket		*socket, *read_socket;
+
+	void			*data;
+	unsigned int		size;
+
+	struct dst_cmd		cmd;
+};
+
+struct dst_node
+{
+	struct list_head	node_entry;
+
+	char			name[DST_NAMELEN];
+
+	struct block_device 	*bdev;
+	struct dst_state	*state;
+
+	struct request_queue	*queue;
+	struct gendisk		*disk;
+
+	int			thread_num;
+	int			max_pages;
+
+	loff_t			size;
+
+	struct device		device;
+
+	struct list_head	security_list;
+	struct mutex		security_lock;
+
+	atomic_t		refcnt;
+
+	int 			(*start)(struct dst_node *);
+
+	struct dst_crypto_ctl	crypto;
+	u8			*hash_key;
+	u8			*cipher_key;
+
+	struct thread_pool	*pool;
+
+	atomic_long_t		gen;
+
+	long			trans_scan_timeout;
+	int			trans_max_retries;
+
+	struct rb_root		trans_root;
+	struct mutex		trans_lock;
+
+	struct kmem_cache	*trans_cache;
+	mempool_t		*trans_pool;
+	struct delayed_work 	trans_work;
+};
+
+struct dst_secure
+{
+	struct list_head	sec_entry;
+	struct dst_secure_user	sec;
+};
+
+int dst_process_bio(struct dst_node *n, struct bio *bio);
+
+int dst_node_init_connected(struct dst_node *n, struct dst_network_ctl *r);
+int dst_node_init_listened(struct dst_node *n, struct dst_export_ctl *le);
+
+static inline struct dst_state *dst_state_get(struct dst_state *st)
+{
+	BUG_ON(atomic_read(&st->refcnt) == 0);
+	atomic_inc(&st->refcnt);
+	return st;
+}
+
+void dst_state_put(struct dst_state *st);
+
+struct dst_state *dst_state_alloc(struct dst_node *n);
+int dst_state_socket_create(struct dst_state *st);
+void dst_state_socket_release(struct dst_state *st);
+
+void dst_state_exit_connected(struct dst_state *st);
+
+int dst_state_schedule_receiver(struct dst_state *st);
+
+void dst_dump_addr(struct socket *sk, struct sockaddr *sa, char *str);
+
+static inline void dst_state_lock(struct dst_state *st)
+{
+	mutex_lock(&st->state_lock);
+}
+
+static inline void dst_state_unlock(struct dst_state *st)
+{
+	BUG_ON(!mutex_is_locked(&st->state_lock));
+
+	mutex_unlock(&st->state_lock);
+}
+
+void dst_poll_exit(struct dst_state *st);
+int dst_poll_init(struct dst_state *st);
+
+static inline unsigned int dst_state_poll(struct dst_state *st)
+{
+	unsigned int revents = POLLHUP | POLLERR;
+
+	dst_state_lock(st);
+	if (st->socket)
+		revents = st->socket->ops->poll(NULL, st->socket, NULL);
+	dst_state_unlock(st);
+
+	return revents;
+}
+
+static inline int dst_thread_setup(void *private, void *data)
+{
+	return 0;
+}
+
+void dst_node_put(struct dst_node *n);
+
+static inline struct dst_node *dst_node_get(struct dst_node *n)
+{
+	atomic_inc(&n->refcnt);
+	return n;
+}
+
+int dst_data_recv(struct dst_state *st, void *data, unsigned int size);
+int dst_recv_cdata(struct dst_state *st, void *cdata);
+int dst_data_send_header(struct socket *sock,
+		void *data, unsigned int size, int more);
+
+int dst_send_bio(struct dst_state *st, struct dst_cmd *cmd, struct bio *bio);
+
+int dst_process_io(struct dst_state *st);
+int dst_export_crypto(struct dst_node *n, struct bio *bio);
+int dst_export_send_bio(struct bio *bio);
+int dst_start_export(struct dst_node *n);
+
+int __init dst_export_init(void);
+void dst_export_exit(void);
+
+struct dst_export_priv
+{
+	struct list_head		request_entry;
+	struct dst_state		*state;
+	struct bio			*bio;
+	struct dst_cmd			cmd;
+};
+
+static inline void dst_trans_get(struct dst_trans *t)
+{
+	atomic_inc(&t->refcnt);
+}
+
+struct dst_trans *dst_trans_search(struct dst_node *node, dst_gen_t gen);
+int dst_trans_remove(struct dst_trans *t);
+int dst_trans_remove_nolock(struct dst_trans *t);
+void dst_trans_put(struct dst_trans *t);
+
+static inline void dst_bio_to_cmd(struct bio *bio, struct dst_cmd *cmd, u32 command, u64 id)
+{
+	cmd->cmd = command;
+	cmd->flags = (bio->bi_flags << BIO_POOL_BITS) >> BIO_POOL_BITS;
+	cmd->rw = bio->bi_rw;
+	cmd->size = bio->bi_size;
+	cmd->csize = 0;
+	cmd->id = id;
+	cmd->sector = bio->bi_sector;
+}
+
+int dst_trans_send(struct dst_trans *t);
+int dst_trans_crypto(struct dst_trans *t);
+
+int dst_node_crypto_init(struct dst_node *n, struct dst_crypto_ctl *ctl);
+void dst_node_crypto_exit(struct dst_node *n);
+
+static inline int dst_need_crypto(struct dst_node *n)
+{
+	struct dst_crypto_ctl *c = &n->crypto;
+	return (c->hash_algo[0] || c->cipher_algo[0]);
+}
+
+int dst_node_trans_init(struct dst_node *n, unsigned int size);
+void dst_node_trans_exit(struct dst_node *n);
+
+struct thread_pool
+{
+	int			thread_num;
+	struct mutex		thread_lock;
+	struct list_head	ready_list, active_list;
+
+	wait_queue_head_t	wait;
+};
+
+void thread_pool_del_worker(struct thread_pool *p);
+void thread_pool_del_worker_id(struct thread_pool *p, unsigned int id);
+int thread_pool_add_worker(struct thread_pool *p,
+		char *name,
+		unsigned int id,
+		void *(* init)(void *data),
+		void (* cleanup)(void *data),
+		void *data);
+
+void thread_pool_destroy(struct thread_pool *p);
+struct thread_pool *thread_pool_create(int num, char *name,
+		void *(* init)(void *data),
+		void (* cleanup)(void *data),
+		void *data);
+
+int thread_pool_schedule(struct thread_pool *p,
+		int (* setup)(void *stored_private, void *setup_data),
+		int (* action)(void *stored_private, void *setup_data),
+		void *setup_data, long timeout);
+int thread_pool_schedule_private(struct thread_pool *p,
+		int (* setup)(void *private, void *data),
+		int (* action)(void *private, void *data),
+		void *data, long timeout, void *id);
+
+#endif /* __KERNEL__ */
+#endif /* __DST_H */
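
A note on dst_convert_cmd() in the header above: it uses the __cpu_to_be*()
helpers for both directions. That works because the conversion is either a
byte swap or a no-op, so applying it twice restores the host-order values.
Below is a minimal userspace sketch of that property, not the kernel code.
The names cmd_sketch, to_be32_sketch(), to_be64_sketch() and
convert_cmd_sketch() are made up for illustration, and the helpers are
written portably instead of using the kernel macros.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical userspace stand-in for struct dst_cmd (without hash[]). */
struct cmd_sketch {
	uint32_t cmd, size, csize;
	uint64_t rw, flags, id, sector;
};

/* Return a value whose in-memory bytes are the big-endian form of v. */
static uint32_t to_be32_sketch(uint32_t v)
{
	unsigned char b[4];
	uint32_t out;
	int i;

	for (i = 0; i < 4; i++)
		b[i] = (unsigned char)(v >> (24 - 8 * i));
	memcpy(&out, b, sizeof(out));
	return out;
}

/* Same as above for 64-bit fields. */
static uint64_t to_be64_sketch(uint64_t v)
{
	unsigned char b[8];
	uint64_t out;
	int i;

	for (i = 0; i < 8; i++)
		b[i] = (unsigned char)(v >> (56 - 8 * i));
	memcpy(&out, b, sizeof(out));
	return out;
}

/*
 * In-place conversion of every field, mirroring dst_convert_cmd().
 * Calling it a second time converts back to host order.
 */
static void convert_cmd_sketch(struct cmd_sketch *c)
{
	assert(c != NULL);

	c->cmd = to_be32_sketch(c->cmd);
	c->csize = to_be32_sketch(c->csize);
	c->size = to_be32_sketch(c->size);
	c->sector = to_be64_sketch(c->sector);
	c->id = to_be64_sketch(c->id);
	c->flags = to_be64_sketch(c->flags);
	c->rw = to_be64_sketch(c->rw);
}
```

With id set to 0x0102030405060708, one call leaves 0x01 as the first byte of
id in memory on any host, and a second call makes id compare equal to the
original value again.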

-- 
	Evgeniy Polyakov

Thread overview: 81+ messages
2007-07-31 17:13 Distributed storage Evgeniy Polyakov
2007-08-02 21:08 ` Daniel Phillips
2007-08-03 10:26   ` Evgeniy Polyakov
2007-08-03 10:57     ` Evgeniy Polyakov
2007-08-03 12:27       ` Peter Zijlstra
2007-08-03 13:49         ` Evgeniy Polyakov
2007-08-03 14:53           ` Peter Zijlstra
2007-08-03 19:48             ` Daniel Phillips
2007-08-03 19:41           ` Daniel Phillips
2007-08-04  1:19     ` Daniel Phillips
2007-08-04 16:37       ` Evgeniy Polyakov
2007-08-05  8:04         ` Daniel Phillips
2007-08-05 15:08           ` Evgeniy Polyakov
2007-08-05 21:23             ` Daniel Phillips
2007-08-06  8:25               ` Evgeniy Polyakov
2007-08-07 12:05               ` Jens Axboe
2007-08-07 18:24                 ` Daniel Phillips
2007-08-07 20:55                   ` Jens Axboe
2007-08-08  9:54                     ` Block device throttling [Re: Distributed storage.] Evgeniy Polyakov
2007-08-08 10:17                       ` [1/1] " Evgeniy Polyakov
2007-08-08 13:28                         ` Evgeniy Polyakov
2007-08-12 23:16                         ` Daniel Phillips
2007-08-13  8:18                           ` Evgeniy Polyakov
2007-08-27 21:57                         ` Daniel Phillips
2007-08-13  5:22                       ` Daniel Phillips
2007-08-13  5:36                       ` Daniel Phillips
2007-08-13  6:44                         ` Daniel Phillips
2007-08-13  8:14                           ` Evgeniy Polyakov
2007-08-13 11:04                             ` Daniel Phillips
2007-08-13 12:04                               ` Evgeniy Polyakov
2007-08-13 12:18                                 ` Daniel Phillips
2007-08-13 12:24                                   ` Evgeniy Polyakov
2007-08-13  8:23                         ` Evgeniy Polyakov
2007-08-13 11:18                           ` Daniel Phillips
2007-08-13 12:18                             ` Evgeniy Polyakov
2007-08-13 13:04                               ` Daniel Phillips
2007-08-14  8:46                                 ` Evgeniy Polyakov
2007-08-14 11:13                                   ` Daniel Phillips
2007-08-14 11:30                                     ` Evgeniy Polyakov
2007-08-14 11:35                                       ` Daniel Phillips
2007-08-14 11:50                                         ` Evgeniy Polyakov
2007-08-14 12:32                                           ` Daniel Phillips
2007-08-14 12:46                                             ` Evgeniy Polyakov
2007-08-14 12:54                                               ` Daniel Phillips
2007-08-12 23:36                     ` Distributed storage Daniel Phillips
2007-08-13  7:28                       ` Jens Axboe
2007-08-13  7:45                         ` Jens Axboe
2007-08-13  9:08                           ` Daniel Phillips
2007-08-13  9:13                             ` Jens Axboe
2007-08-13  9:55                               ` Daniel Phillips
2007-08-13 10:06                                 ` Jens Axboe
2007-08-13 10:15                                   ` Daniel Phillips
2007-08-13 10:22                                     ` Jens Axboe
2007-08-13 10:32                                       ` Daniel Phillips
2007-08-13  9:18                             ` Evgeniy Polyakov
2007-08-13 10:12                               ` Daniel Phillips
2007-08-13 11:03                                 ` Evgeniy Polyakov
2007-08-13 11:45                                   ` Daniel Phillips
2007-08-13  8:59                         ` Daniel Phillips
2007-08-13  9:12                           ` Jens Axboe
2007-08-13 23:27                             ` Daniel Phillips
2007-08-03  4:09 ` Mike Snitzer
2007-08-03 10:42   ` Evgeniy Polyakov
2007-08-04  0:49   ` Daniel Phillips
2007-08-03  5:04 ` Manu Abraham
2007-08-03 10:44   ` Evgeniy Polyakov
2007-08-04  2:51   ` Dave Dillow
2007-08-04  3:44     ` Manu Abraham
2007-08-04 17:03   ` Evgeniy Polyakov
2007-08-04  0:41 ` Daniel Phillips
2007-08-04 16:44   ` Evgeniy Polyakov
2007-08-05  8:06     ` Daniel Phillips
2007-08-05 15:01       ` Evgeniy Polyakov
2007-08-05 21:35         ` Daniel Phillips
2007-08-06  8:28           ` Evgeniy Polyakov
     [not found] ` <200708281027.59528.phillips@phunq.net>
     [not found]   ` <20070828175403.GA28440@2ka.mipt.ru>
     [not found]     ` <200708281408.06618.phillips@phunq.net>
2007-08-29  8:53       ` [1/1] Block device throttling [Re: Distributed storage.] Evgeniy Polyakov
2007-08-30 23:20         ` Daniel Phillips
2007-08-31 17:33           ` Evgeniy Polyakov
2007-08-31 21:41           ` Alasdair G Kergon
2007-09-02  4:42             ` Daniel Phillips
  -- strict thread matches above, loose matches on Subject: below --
2008-08-27 16:07 Distributed STorage Evgeniy Polyakov