From: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
To: lkml <linux-kernel@vger.kernel.org>
Cc: netdev@vger.kernel.org, linux-fsdevel@vger.kernel.org
Subject: [4/4] DST: Algorithms used in distributed storage.
Date: Wed, 26 Dec 2007 14:22:51 +0300
Message-ID: <11986681714159@2ka.mipt.ru>
In-Reply-To: <11986681711013@2ka.mipt.ru>
Algorithms used in distributed storage.
Mirror and linear mapping code.
Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
diff --git a/drivers/block/dst/alg_linear.c b/drivers/block/dst/alg_linear.c
new file mode 100644
index 0000000..2f9ed65
--- /dev/null
+++ b/drivers/block/dst/alg_linear.c
@@ -0,0 +1,105 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/dst.h>
+
+/* Handle returned by dst_alloc_alg() in alg_linear_init(); released in alg_linear_exit(). */
+static struct dst_alg *alg_linear;
+
+/*
+ * del_node callback for the linear algorithm.
+ *
+ * dst_linear_add_node() attaches no per-node state (it only assigns
+ * n->start and grows the storage), so there is nothing to release here.
+ */
+static void dst_linear_del_node(struct dst_node *n)
+{
+	/* intentionally empty */
+}
+
+/*
+ * add_node callback: append @n to the end of the linear address space.
+ *
+ * Under the storage tree lock the node is placed at the current end of
+ * the storage, the storage grows by the node's size and the new size is
+ * published via dst_set_disk_size().  Always returns 0.
+ */
+static int dst_linear_add_node(struct dst_node *n)
+{
+	struct dst_storage *storage = n->st;
+
+	dprintk("%s: disk_size: %llu, node_size: %llu.\n",
+			__func__, storage->disk_size, n->size);
+
+	mutex_lock(&storage->tree_lock);
+
+	n->start = storage->disk_size;
+	storage->disk_size += n->size;
+	dst_set_disk_size(storage);
+
+	mutex_unlock(&storage->tree_lock);
+
+	return 0;
+}
+
+/*
+ * remap callback.
+ *
+ * A node with a local block device gets the bio submitted as-is.  For any
+ * other node the bio is checked against the node state's permissions and,
+ * if allowed, the request is pushed through that state.
+ *
+ * Returns 0, the permission-check error, or the push() result.
+ */
+static int dst_linear_remap(struct dst_request *req)
+{
+	int ret;
+
+	if (!req->node->bdev) {
+		ret = kst_check_permissions(req->state, req->bio);
+		if (!ret)
+			ret = req->state->ops->push(req);
+		return ret;
+	}
+
+	generic_make_request(req->bio);
+	return 0;
+}
+
+/*
+ * error callback used during request processing.
+ *
+ * A non-zero @err freezes the node owning @st (DST_NODE_FROZEN is set);
+ * a zero @err unfreezes it.  Always returns 0.
+ */
+static int dst_linear_error(struct kst_state *st, int err)
+{
+	if (!err) {
+		clear_bit(DST_NODE_FROZEN, &st->node->flags);
+		return 0;
+	}
+
+	set_bit(DST_NODE_FROZEN, &st->node->flags);
+	return 0;
+}
+
+/* Callback table handed to the DST core by alg_linear_init(). */
+static struct dst_alg_ops alg_linear_ops = {
+	.owner		= THIS_MODULE,
+	.add_node	= dst_linear_add_node,
+	.del_node	= dst_linear_del_node,
+	.remap		= dst_linear_remap,
+	.error		= dst_linear_error,
+};
+
+/*
+ * Module entry point: register the "alg_linear" algorithm with the DST core.
+ *
+ * Returns 0 on success or -ENOMEM if dst_alloc_alg() fails.
+ *
+ * Module init routines are annotated __init; __devinit is meant for
+ * device probe routines, not for module_init() handlers.
+ */
+static int __init alg_linear_init(void)
+{
+	alg_linear = dst_alloc_alg("alg_linear", &alg_linear_ops);
+	if (!alg_linear)
+		return -ENOMEM;
+
+	return 0;
+}
+
+/*
+ * Module exit point: unregister the algorithm registered by alg_linear_init().
+ *
+ * Module exit routines are annotated __exit; __devexit is meant for device
+ * removal routines, not for module_exit() handlers.
+ */
+static void __exit alg_linear_exit(void)
+{
+	dst_remove_alg(alg_linear);
+}
+
+MODULE_AUTHOR("Evgeniy Polyakov <johnpol@2ka.mipt.ru>");
+MODULE_DESCRIPTION("Linear distributed algorithm.");
+MODULE_LICENSE("GPL");
+
+/* Register/unregister the linear algorithm on module load/unload. */
+module_init(alg_linear_init);
+module_exit(alg_linear_exit);
diff --git a/drivers/block/dst/alg_mirror.c b/drivers/block/dst/alg_mirror.c
new file mode 100644
index 0000000..529b8cb
--- /dev/null
+++ b/drivers/block/dst/alg_mirror.c
@@ -0,0 +1,1614 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/poll.h>
+#include <linux/dst.h>
+#include <linux/vmstat.h>
+
+/*
+ * One write-log record.  dst_mirror_update_write_log() fills the fields
+ * with cpu_to_le32()/cpu_to_le64(), and whole pages of records are moved
+ * to and from disk verbatim by dst_mirror_process_log_on_disk(), so the
+ * member order and sizes are an on-disk format and must not change.
+ */
+struct dst_write_entry
+{
+ int error;	/* error the write completed with; non-zero marks it for resync */
+ u32 size;	/* req->orig_size of the logged write */
+ u64 start;	/* start sector as computed in dst_mirror_update_write_log() */
+};
+/* Number of write-log records in one page-sized chunk of the log. */
+#define DST_LOG_ENTRIES_PER_PAGE (PAGE_SIZE/sizeof(struct dst_write_entry))
+
+/* Value of dst_mirror_node_data.magic once a node's metadata has been initialized. */
+#define DST_MIRROR_COOKIE 0xc47fd0d33274d7c6ULL
+
+/*
+ * Per-node mirror metadata.  It is stored little-endian in the single
+ * sector at priv->ndp_sector (see dst_mirror_process_node_data()), so the
+ * layout is an on-disk format.
+ */
+struct dst_mirror_node_data
+{
+ u64 age;	/* generation compared against the first node's old_age in dst_mirror_ndp_setup() */
+ u32 num, write_idx, resync_idx, unused;	/* log entry count, next slot to write, next slot to resync, padding */
+ u64 magic;	/* DST_MIRROR_COOKIE once initialized */
+};
+
+/* In-memory copy of the write log: an array of nr_pages page-sized record chunks. */
+struct dst_mirror_log
+{
+ unsigned int nr_pages;	/* number of page-sized chunks in entries[] */
+ struct dst_write_entry **entries;	/* entries[page][index within page] */
+};
+
+/* Per-node private state of the mirror algorithm (stored in dst_node->priv). */
+struct dst_mirror_priv
+{
+ /* Resync window checked by dst_mirror_write(). */
+ u64 resync_start, resync_size;
+ /* Outstanding resync pages; see dst_mirror_check_resync_complete(). */
+ atomic_t resync_num;
+ struct completion resync_complete;
+ /* Periodic resync/log flush work, rescheduled every resync_timeout jiffies. */
+ struct delayed_work resync_work;
+ unsigned int resync_timeout;
+
+ /* Not referenced in the code visible in this chunk. */
+ u64 last_start;
+
+ /* Writes queued by dst_mirror_write() while they fall into the resync window. */
+ spinlock_t resync_wait_lock;
+ struct list_head resync_wait_list;
+ int resync_wait_num;
+ /* Set when the write log cannot describe the divergence and a full resync is needed. */
+ int full_resync;
+
+ /* dst_mirror_sync_container entries waiting to be written to this node. */
+ spinlock_t backlog_lock;
+ struct list_head backlog_list;
+
+ struct dst_node *node;
+
+ /* old_age: previous on-disk age; ndp_sector: sector holding node metadata. */
+ u64 old_age, ndp_sector;
+ struct dst_mirror_node_data data;
+
+ /* Protects data.write_idx/resync_idx updates and the log contents. */
+ spinlock_t log_lock;
+ struct dst_mirror_log log;
+};
+
+/*
+ * One in-flight resync chunk: a bio that is first read from the storage
+ * and then, once turned into a write by dst_mirror_sync_endio(), submitted
+ * to @node from the backlog by dst_mirror_sync_requeue().
+ */
+struct dst_mirror_sync_container
+{
+ struct list_head sync_entry;	/* link in dst_mirror_priv.backlog_list */
+ u64 start, size;	/* bio sector and byte size recorded at submission */
+ struct dst_node *node;	/* node being resynchronized */
+ struct bio *bio;
+};
+
+/* Algorithm handle and the bio_set all mirror bios are allocated from. */
+static struct dst_alg *alg_mirror;
+static struct bio_set *dst_mirror_bio_set;
+
+/* Defined later in this file. */
+static int dst_mirror_resync(struct dst_node *n, int ndp);
+static int dst_mirror_process_log_on_disk(struct dst_node *n, int op);
+
+/*
+ * Flag @n as out of sync.
+ *
+ * Only on the transition from synced to not-synced, a notice is printed
+ * and resync bookkeeping restarts at the current write position of the
+ * log.  Returns 1 if this call set the flag, 0 if it was already set.
+ */
+static int dst_mirror_mark_notsync(struct dst_node *n)
+{
+	struct dst_mirror_priv *priv;
+
+	if (test_and_set_bit(DST_NODE_NOTSYNC, &n->flags))
+		return 0;
+
+	priv = n->priv;
+	printk(KERN_NOTICE "%s: not synced node n: %p.\n", __func__, n);
+
+	priv->data.resync_idx = priv->data.write_idx;
+	return 1;
+}
+
+/*
+ * Mark @n as synchronized: clear DST_NODE_NOTSYNC (announcing completion
+ * if it was set), drop any full-resync request and signal resync_complete.
+ */
+static void dst_mirror_mark_node_sync(struct dst_node *n)
+{
+	struct dst_mirror_priv *priv = n->priv;
+	int was_notsync = test_and_clear_bit(DST_NODE_NOTSYNC, &n->flags);
+
+	if (was_notsync)
+		printk(KERN_NOTICE "%s: node: %p, %llu:%llu synchronization "
+				"has been completed.\n",
+				__func__, n, n->start, n->size);
+
+	priv->full_resync = 0;
+	complete(&priv->resync_complete);
+}
+
+/*
+ * sysfs "dirty" store: force a full resync of the node.  The written
+ * value is ignored; the node's age is reset and it is marked not synced.
+ */
+static ssize_t dst_mirror_mark_dirty(struct device *dev, struct device_attribute *attr,
+		const char *buf, size_t count)
+{
+	struct dst_node *node;
+	struct dst_mirror_priv *mp;
+
+	node = container_of(dev, struct dst_node, device);
+	mp = node->priv;
+
+	mp->full_resync = 1;
+	mp->data.age = 0;
+	dst_mirror_mark_notsync(node);
+
+	return count;
+}
+
+/*
+ * sysfs "clean" store: declare the node synchronized.
+ *
+ * The metadata is copied from the shared head (the first mirror) when
+ * there is one, the log and metadata are written to disk, and the node is
+ * marked synced.  The written value is ignored.
+ */
+static ssize_t dst_mirror_mark_clean(struct device *dev, struct device_attribute *attr,
+		const char *buf, size_t count)
+{
+	struct dst_node *node = container_of(dev, struct dst_node, device);
+	struct dst_mirror_priv *mp = node->priv;
+	struct dst_mirror_priv *src = node->shared_head ?
+		node->shared_head->priv : mp;
+
+	mp->data = src->data;
+	mp->full_resync = 0;
+
+	dst_mirror_process_log_on_disk(node, WRITE);
+	dst_mirror_mark_node_sync(node);
+
+	return count;
+}
+
+/* sysfs "state" show: "notsync" while DST_NODE_NOTSYNC is set, "sync" otherwise. */
+static ssize_t dst_mirror_show_state(struct device *dev, struct device_attribute *attr,
+		char *buf)
+{
+	struct dst_node *node = container_of(dev, struct dst_node, device);
+	const char *state = "sync";
+
+	if (test_bit(DST_NODE_NOTSYNC, &node->flags))
+		state = "notsync";
+
+	return sprintf(buf, "%s\n", state);
+}
+
+/* sysfs "resync_timeout" show: delay used when rescheduling the resync work. */
+static ssize_t dst_mirror_show_resync_timeout(struct device *dev, struct device_attribute *attr,
+		char *buf)
+{
+	struct dst_node *node = container_of(dev, struct dst_node, device);
+	struct dst_mirror_priv *mp = node->priv;
+	unsigned int tmo = mp->resync_timeout;
+
+	return sprintf(buf, "%u\n", tmo);
+}
+
+/* sysfs "resync_size" show: current size of the resync window. */
+static ssize_t dst_mirror_show_resync_size(struct device *dev, struct device_attribute *attr,
+		char *buf)
+{
+	struct dst_node *node = container_of(dev, struct dst_node, device);
+	struct dst_mirror_priv *mp = node->priv;
+	u64 window = mp->resync_size;
+
+	return sprintf(buf, "%llu\n", window);
+}
+
+/*
+ * sysfs "resync_size" store: set the size of the resync window.
+ *
+ * The value is compared with the storage's disk_size and rejected with
+ * -E2BIG if larger.  resync_size is a u64, so the input is parsed with
+ * simple_strtoull(); simple_strtoul() would truncate values that do not
+ * fit into an unsigned long on 32-bit kernels.
+ */
+static ssize_t dst_mirror_set_resync_size(struct device *dev, struct device_attribute *attr,
+		const char *buf, size_t count)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+	struct dst_mirror_priv *priv = n->priv;
+	unsigned long long size;
+
+	size = simple_strtoull(buf, NULL, 0);
+
+	if (size > n->st->disk_size)
+		return -E2BIG;
+
+	priv->resync_size = size;
+
+	return count;
+}
+
+/* sysfs "log_num" show: number of entries in the node's write log. */
+static ssize_t dst_mirror_show_log_num(struct device *dev, struct device_attribute *attr,
+		char *buf)
+{
+	struct dst_node *node = container_of(dev, struct dst_node, device);
+	struct dst_mirror_priv *mp = node->priv;
+	u32 entries = mp->data.num;
+
+	return sprintf(buf, "%u\n", entries);
+}
+
+/*
+ * sysfs "resync_timeout" store: accept values in [100, 30000], anything
+ * else is rejected with -EINVAL.
+ */
+static ssize_t dst_mirror_set_resync_timeout(struct device *dev, struct device_attribute *attr,
+		const char *buf, size_t count)
+{
+	struct dst_node *node = container_of(dev, struct dst_node, device);
+	struct dst_mirror_priv *mp = node->priv;
+	unsigned long val = simple_strtoul(buf, NULL, 0);
+
+	if (val >= 100 && val <= 30000) {
+		mp->resync_timeout = (unsigned int)val;
+		return count;
+	}
+
+	return -EINVAL;
+}
+
+/*
+ * Per-node sysfs attributes, created by dst_mirror_handle_priv().
+ * NOTE(review): "state" and "log_num" are owner-readable only while the
+ * resync_* attributes are world-readable - confirm this is intended.
+ */
+static struct device_attribute dst_mirror_attrs[] = {
+ __ATTR(dirty, S_IWUSR, NULL, dst_mirror_mark_dirty),
+ __ATTR(clean, S_IWUSR, NULL, dst_mirror_mark_clean),
+ __ATTR(resync_size, S_IWUSR | S_IRUGO, dst_mirror_show_resync_size,
+ dst_mirror_set_resync_size),
+ __ATTR(resync_timeout, S_IWUSR | S_IRUGO, dst_mirror_show_resync_timeout,
+ dst_mirror_set_resync_timeout),
+ __ATTR(state, S_IRUSR, dst_mirror_show_state, NULL),
+ __ATTR(log_num, S_IRUSR, dst_mirror_show_log_num, NULL),
+};
+
+/*
+ * priv_callback for mirror nodes: when the node has mirror private data,
+ * create the mirror sysfs attributes under its device.
+ *
+ * The callback returns void, so creation stays best-effort, but each
+ * failure is now reported instead of being silently discarded.
+ */
+static void dst_mirror_handle_priv(struct dst_node *n)
+{
+	unsigned int i;
+	int err;
+
+	if (!n->priv)
+		return;
+
+	for (i = 0; i < ARRAY_SIZE(dst_mirror_attrs); ++i) {
+		err = device_create_file(&n->device, &dst_mirror_attrs[i]);
+		if (err)
+			printk(KERN_ERR "%s: failed to create attribute '%s' "
+					"for node %p: %d.\n",
+					__func__, dst_mirror_attrs[i].attr.name,
+					n, err);
+	}
+}
+
+/* bi_destructor for plain bios taken from dst_mirror_bio_set. */
+static void dst_mirror_destructor(struct bio *bio)
+{
+	struct bio_set *bs = dst_mirror_bio_set;
+
+	dprintk("%s: bio: %p.\n", __func__, bio);
+	bio_free(bio, bs);
+}
+
+/*
+ * Completion context for synchronous metadata/log I/O: the submitter waits
+ * on @complete and then reads @err.
+ */
+struct dst_mirror_ndp
+{
+ int err;	/* I/O result set by dst_mirror_ndb_complete() */
+ struct page *page;	/* bounce page used by dst_mirror_process_node_data() */
+ struct completion complete;
+};
+
+/* Store @err in @cmp and wake the thread waiting on cmp->complete. */
+static void dst_mirror_ndb_complete(struct dst_mirror_ndp *cmp, int err)
+{
+	dprintk("%s: completing request: cmp: %p, err: %d.\n",
+			__func__, cmp, err);
+
+	cmp->err = err;
+	complete(&cmp->complete);
+}
+
+/* dst_request completion for metadata/log I/O pushed through a network state. */
+static void dst_mirror_ndp_bio_endio(struct dst_request *req, int err)
+{
+	dst_mirror_ndb_complete(req->bio->bi_private, err);
+}
+
+/*
+ * bi_end_io for metadata/log bios: partial completions are ignored and the
+ * waiter is woken once the whole bio has finished.
+ */
+static int dst_mirror_ndp_end_io(struct bio *bio, unsigned int size, int err)
+{
+	if (!bio->bi_size)
+		dst_mirror_ndb_complete(bio->bi_private, err);
+
+	return 0;
+}
+
+/*
+ * Synchronously read (@op == READ) or write (@op == WRITE) the node's
+ * metadata sector at priv->ndp_sector.
+ *
+ * On disk the fields are little-endian.  For WRITE, @ndata is encoded into
+ * a zeroed page; for a successful READ, @ndata is filled in from the page.
+ * Local nodes get the bio submitted to their block device; other nodes get
+ * it pushed through their network state.
+ *
+ * Returns 0 or a negative errno.
+ *
+ * Fixes compared to the previous version:
+ * - the page was kunmap()ed twice on the success path;
+ * - 32-bit fields were converted with the 64-bit helpers, which loses the
+ *   value on big-endian machines;
+ * - a bio_add_pc_page() failure was reported as success;
+ * - the bio is sized against the queue it is actually submitted to.
+ */
+static int dst_mirror_process_node_data(struct dst_node *n,
+		struct dst_mirror_node_data *ndata, int op)
+{
+	struct bio *bio;
+	int err = -ENOMEM;
+	struct dst_mirror_ndp *cmp;
+	struct dst_mirror_priv *priv = n->priv;
+	struct dst_mirror_node_data *dst;
+	struct request_queue *q = n->st->queue;
+
+	cmp = kzalloc(sizeof(struct dst_mirror_ndp), GFP_KERNEL);
+	if (!cmp)
+		goto err_out_exit;
+
+	cmp->page = alloc_page(GFP_NOIO);
+	if (!cmp->page)
+		goto err_out_free_cmp;
+
+	if (n->bdev) {
+		q = bdev_get_queue(n->bdev);
+		BUG_ON(!q);
+	}
+
+	dst = kmap(cmp->page);
+
+	init_completion(&cmp->complete);
+
+	if (op == WRITE) {
+		memset(dst, 0, PAGE_SIZE);
+
+		dst->age = cpu_to_le64(ndata->age);
+		dst->num = cpu_to_le32(ndata->num);
+		dst->write_idx = cpu_to_le32(ndata->write_idx);
+		dst->resync_idx = cpu_to_le32(ndata->resync_idx);
+		dst->magic = cpu_to_le64(ndata->magic);
+	}
+
+	bio = bio_alloc_bioset(GFP_NOIO, 1, dst_mirror_bio_set);
+	if (!bio)
+		goto err_out_unmap;
+
+	bio->bi_rw = op;
+	bio->bi_private = cmp;
+	bio->bi_sector = priv->ndp_sector;
+	bio->bi_bdev = n->bdev;
+	bio->bi_destructor = dst_mirror_destructor;
+	bio->bi_end_io = dst_mirror_ndp_end_io;
+
+	if (bio_add_pc_page(q, bio, cmp->page, 512, 0) <= 0) {
+		err = -EIO;
+		goto err_out_free_bio;
+	}
+
+	if (n->bdev) {
+		generic_make_request(bio);
+	} else {
+		struct dst_request req;
+
+		memset(&req, 0, sizeof(struct dst_request));
+		dst_fill_request(&req, bio, bio->bi_sector, n,
+				&dst_mirror_ndp_bio_endio);
+
+		err = req.state->ops->push(&req);
+		if (err)
+			req.bio_endio(&req, err);
+	}
+
+	dprintk("%s: waiting for completion: bio: %p, cmp: %p.\n",
+			__func__, bio, cmp);
+
+	wait_for_completion(&cmp->complete);
+
+	err = cmp->err;
+	if (!err && (op != WRITE)) {
+		ndata->age = le64_to_cpu(dst->age);
+		ndata->num = le32_to_cpu(dst->num);
+		ndata->write_idx = le32_to_cpu(dst->write_idx);
+		ndata->resync_idx = le32_to_cpu(dst->resync_idx);
+		ndata->magic = le64_to_cpu(dst->magic);
+	}
+
+	dprintk("%s: freeing bio: %p, err: %d.\n", __func__, bio, err);
+
+err_out_free_bio:
+	bio_put(bio);
+err_out_unmap:
+	kunmap(cmp->page);
+	__free_page(cmp->page);
+err_out_free_cmp:
+	kfree(cmp);
+err_out_exit:
+	return err;
+}
+
+/*
+ * Load the node's metadata sector into @ndata.
+ * Thin READ wrapper around dst_mirror_process_node_data().
+ */
+static int dst_mirror_read_node_data(struct dst_node *n,
+		struct dst_mirror_node_data *ndata)
+{
+	const int op = READ;
+
+	return dst_mirror_process_node_data(n, ndata, op);
+}
+
+/*
+ * Store @ndata to the node's metadata sector.
+ * Thin WRITE wrapper around dst_mirror_process_node_data().
+ */
+static int dst_mirror_write_node_data(struct dst_node *n,
+		struct dst_mirror_node_data *ndata)
+{
+	int ret;
+
+	dprintk("%s: writing new age: %llx, node: %p %llu-%llu.\n",
+			__func__, ndata->age, n, n->start, n->size);
+
+	ret = dst_mirror_process_node_data(n, ndata, WRITE);
+	return ret;
+}
+
+/*
+ * Synchronously transfer the in-memory write log to (@op == WRITE) or
+ * from (@op == READ) the node's log area, one page-sized bio at a time.
+ * The log area starts at sector n->size, directly after the usable data.
+ * When the whole log has been transferred, the node metadata (priv->data)
+ * is written out.
+ *
+ * Any failure marks the node as not synced.  Returns 0 or a negative errno.
+ *
+ * Fixes compared to the previous version:
+ * - an error while transferring the log is no longer overwritten by the
+ *   result of the metadata write (the metadata write is skipped instead);
+ * - a bio_add_pc_page() failure is reported as -EIO instead of success.
+ */
+static int dst_mirror_process_log_on_disk(struct dst_node *n, int op)
+{
+	struct dst_mirror_priv *priv = n->priv;
+	struct dst_mirror_log *log = &priv->log;
+	int err = -ENOMEM;
+	unsigned int i;
+	struct bio *bio;
+	struct dst_mirror_ndp *cmp;
+	struct request_queue *q = n->st->queue;
+
+	cmp = kzalloc(sizeof(struct dst_mirror_ndp), GFP_KERNEL);
+	if (!cmp)
+		goto err_out_exit;
+
+	if (n->bdev) {
+		q = bdev_get_queue(n->bdev);
+		BUG_ON(!q);
+	}
+
+	err = 0;
+	for (i = 0; i < log->nr_pages; ++i) {
+		bio = bio_alloc_bioset(GFP_NOIO, 1, dst_mirror_bio_set);
+		if (!bio) {
+			err = -ENOMEM;
+			break;
+		}
+
+		bio->bi_rw = op;
+		bio->bi_private = cmp;
+		bio->bi_bdev = n->bdev;
+		bio->bi_destructor = dst_mirror_destructor;
+		bio->bi_end_io = dst_mirror_ndp_end_io;
+
+		bio->bi_sector = n->size + to_sector(i*PAGE_SIZE);
+
+		if (bio_add_pc_page(q, bio,
+				virt_to_page(log->entries[i]), PAGE_SIZE,
+				offset_in_page(log->entries[i])) <= 0) {
+			bio_put(bio);
+			err = -EIO;
+			break;
+		}
+
+		init_completion(&cmp->complete);
+
+		if (n->bdev) {
+			generic_make_request(bio);
+		} else {
+			struct dst_request req;
+			int push_err;
+
+			memset(&req, 0, sizeof(struct dst_request));
+			dst_fill_request(&req, bio, bio->bi_sector, n,
+					&dst_mirror_ndp_bio_endio);
+
+			/* A failed push completes the bio with the error, which lands in cmp->err. */
+			push_err = req.state->ops->push(&req);
+			if (push_err)
+				req.bio_endio(&req, push_err);
+		}
+
+		dprintk("%s: waiting for completion: bio: %p, cmp: %p.\n",
+				__func__, bio, cmp);
+
+		wait_for_completion(&cmp->complete);
+		bio_put(bio);
+
+		if (cmp->err) {
+			err = cmp->err;
+			break;
+		}
+	}
+
+	if (!err)
+		err = dst_mirror_write_node_data(n, &priv->data);
+
+	kfree(cmp);
+err_out_exit:
+	if (err)
+		dst_mirror_mark_notsync(n);
+
+	return err;
+}
+
+/*
+ * Decide whether a newly added node is in sync with the storage.
+ *
+ * The first node of a storage gets a fresh age (generation), which is
+ * written to its metadata, and it becomes its own shared_head.
+ *
+ * Any other node joins the shared list of the node found at the same
+ * start.  It is considered synced only if:
+ * - its on-disk age matches the first node's old age;
+ * - its log write position matches the first node's;
+ * - the log entry at its resync position carries no error.
+ * Otherwise it is marked not synced; an age mismatch also requests a full
+ * resync.  A synced node is marked synced only when @clean_on_sync is set.
+ *
+ * Returns 0, -ENODEV if no first node exists, or an I/O error.
+ *
+ * Fixes compared to the previous version:
+ * - the new age is derived from the storage pointer value instead of
+ *   reading 8 bytes through &n->st (past the pointer on 32-bit kernels);
+ * - resync_idx read from disk is bounds-checked before indexing the log;
+ * - the final printk has a log level.
+ */
+static int dst_mirror_ndp_setup(struct dst_node *n, int first_node, int clean_on_sync)
+{
+	struct dst_mirror_priv *p = n->priv;
+	int sync = 1, err;
+	struct dst_mirror_priv *fp = p;
+	struct dst_node *first;
+
+	p->full_resync = 0;
+
+	if (first_node) {
+		/* The storage pointer serves as a per-instance generation value. */
+		u64 new_age = (u64)(unsigned long)n->st;
+
+		p->old_age = p->data.age;
+		printk(KERN_NOTICE "%s: first age: %llx -> %llx. "
+				"Old will be set to new for the first node.\n",
+				__func__, p->old_age, new_age);
+		p->data.age = new_age;
+		n->shared_head = n;
+
+		err = dst_mirror_write_node_data(n, &p->data);
+		if (err)
+			return err;
+	} else {
+		mutex_lock(&n->st->tree_lock);
+		first = dst_storage_tree_search(n->st, n->start);
+		if (!first) {
+			mutex_unlock(&n->st->tree_lock);
+			dprintk("%s: there are no nodes in the storage.\n", __func__);
+			return -ENODEV;
+		}
+
+		fp = first->priv;
+
+		if (fp->old_age != p->data.age) {
+			p->full_resync = 1;
+			sync = 0;
+		} else
+			p->data.age = fp->data.age;
+
+		p->old_age = fp->old_age;
+
+		n->shared_head = first;
+		atomic_inc(&first->shared_num);
+		list_add_tail(&n->shared, &first->shared);
+		mutex_unlock(&n->st->tree_lock);
+
+		if (sync) {
+			unsigned long flags;
+			unsigned int pidx, pnum;
+
+			err = dst_mirror_process_log_on_disk(n, READ);
+			if (err)
+				goto err_out_put;
+
+			spin_lock_irqsave(&fp->log_lock, flags);
+			if (fp->data.write_idx != p->data.write_idx)
+				sync = 0;
+			spin_unlock_irqrestore(&fp->log_lock, flags);
+
+			pnum = p->data.resync_idx / DST_LOG_ENTRIES_PER_PAGE;
+			pidx = p->data.resync_idx % DST_LOG_ENTRIES_PER_PAGE;
+
+			/* resync_idx comes from disk: never index past the allocated log. */
+			if (pnum >= p->log.nr_pages ||
+					p->log.entries[pnum][pidx].error)
+				sync = 0;
+		}
+	}
+
+	if (!sync) {
+		printk(KERN_NOTICE "%s: node %llu:%llu is not synced with the first one: "
+				"first_age: %llx, new_age: %llx, full: %d.\n",
+				__func__, n->start, n->start+n->size,
+				p->data.age, fp->data.age, p->full_resync);
+		dst_mirror_mark_notsync(n);
+	} else {
+		if (clean_on_sync)
+			dst_mirror_mark_node_sync(n);
+		complete(&p->resync_complete);
+
+		printk(KERN_NOTICE "%s: node %llu:%llu is in sync with the first node.\n",
+				__func__, n->start, n->start+n->size);
+	}
+
+	printk(KERN_INFO "%s: n: %p, shared_head: %p, age: old: %llx, new: %llx.\n",
+			__func__, n, n->shared_head, p->old_age, fp->data.age);
+
+	return 0;
+
+err_out_put:
+	first = n->shared_head;
+	atomic_dec(&first->shared_num);
+	mutex_lock(&n->st->tree_lock);
+	list_del(&n->shared);
+	n->shared_head = NULL;
+	mutex_unlock(&n->st->tree_lock);
+	dst_node_put(first);
+
+	return err;
+}
+
+/*
+ * bi_end_io for bios cloned by dst_mirror_process_request_nosync().
+ * Once the clone has fully completed, the owning request's bio_endio()
+ * is called and the clone is released.
+ */
+static int dst_mirror_end_io(struct bio *bio, unsigned int size, int err)
+{
+	struct dst_request *owner;
+
+	if (bio->bi_size)
+		return 0;
+
+	owner = bio->bi_private;
+	dprintk("%s: req: %p, bio: %p, req->bio: %p, err: %d.\n",
+			__func__, owner, bio, owner->bio, err);
+
+	owner->bio_endio(owner, err);
+	bio_put(bio);
+
+	return 0;
+}
+
+/*
+ * Submit @req to the single mirror node @n (without draining its backlog).
+ *
+ * Node with a local block device: the bio is cloned from
+ * dst_mirror_bio_set, redirected to n->bdev and submitted.  Completion
+ * runs dst_mirror_end_io(), which calls req->bio_endio().  @req is stored
+ * in the clone's bi_private, so it must stay valid until the clone
+ * completes.  Returns 1 once submitted, or -ENOMEM if the clone cannot be
+ * allocated (previously a NULL clone was dereferenced).
+ *
+ * Other nodes: a private copy of @req is pushed through the node's state
+ * after a permission check.  Returns that result (0 or a negative errno).
+ */
+static int dst_mirror_process_request_nosync(struct dst_request *req,
+		struct dst_node *n)
+{
+	int err = 0;
+
+	/*
+	 * Block layer requires to clone a bio.
+	 */
+	if (n->bdev) {
+		struct bio *clone = bio_alloc_bioset(GFP_NOIO,
+				req->bio->bi_max_vecs, dst_mirror_bio_set);
+
+		if (!clone)
+			return -ENOMEM;
+
+		__bio_clone(clone, req->bio);
+
+		clone->bi_bdev = n->bdev;
+		clone->bi_destructor = dst_mirror_destructor;
+		clone->bi_private = req;
+		clone->bi_end_io = &dst_mirror_end_io;
+
+		dprintk("%s: clone: %p, bio: %p, req: %p.\n",
+				__func__, clone, req->bio, req);
+
+		generic_make_request(clone);
+		err = 1;
+	} else {
+		struct dst_request nr;
+		/*
+		 * Network state processing engine will clone request
+		 * by itself if needed. We can not use the same structure
+		 * here, since number of its fields will be modified.
+		 */
+		memcpy(&nr, req, sizeof(struct dst_request));
+
+		nr.node = n;
+		nr.state = n->state;
+		nr.priv = req;
+
+		err = kst_check_permissions(n->state, req->bio);
+		if (!err)
+			err = n->state->ops->push(&nr);
+	}
+
+	dprintk("%s: req: %p, n: %p, bdev: %p, err: %d.\n",
+			__func__, req, n, n->bdev, err);
+
+	return err;
+}
+
+/*
+ * Submit the resync writes queued on p->backlog_list to node @n.
+ *
+ * Containers are taken from the head of the backlog only while the head
+ * bio is already a WRITE (dst_mirror_sync_endio() switches it once the
+ * read from the storage has completed); a head that is still a READ stops
+ * the scan.  If the read left no data to write (bi_size == 0) or @exiting
+ * is set, the bio is completed with -EIO instead.
+ *
+ * Returns the number of containers consumed.
+ *
+ * For a node with a local block device the sync bio is submitted directly.
+ * Its bi_end_io (dst_mirror_sync_endio) together with bi_private == @n
+ * already performs the write completion.  Routing it through
+ * dst_mirror_process_request_nosync() would store the address of the
+ * on-stack request below in the clone's bi_private, which
+ * dst_mirror_end_io() dereferences after this function has returned.
+ */
+static int dst_mirror_sync_requeue(struct dst_node *n, int exiting)
+{
+	struct dst_mirror_priv *p = n->priv;
+	struct dst_mirror_sync_container *sc;
+	struct dst_request req;
+	unsigned long flags;
+	int err, num = 0;
+
+	if (!list_empty(&p->backlog_list))
+		dprintk("%s: n: %p, backlog_empty: %d, resync_num: %d.\n",
+				__func__, n, list_empty(&p->backlog_list),
+				atomic_read(&p->resync_num));
+
+	while (!list_empty(&p->backlog_list)) {
+		sc = NULL;
+		spin_lock_irqsave(&p->backlog_lock, flags);
+		if (!list_empty(&p->backlog_list)) {
+			sc = list_entry(p->backlog_list.next,
+					struct dst_mirror_sync_container,
+					sync_entry);
+			if (bio_rw(sc->bio) == WRITE)
+				list_del(&sc->sync_entry);
+			else
+				sc = NULL;
+		}
+		spin_unlock_irqrestore(&p->backlog_lock, flags);
+
+		if (!sc)
+			break;
+
+		sc->bio->bi_private = n;
+		if (sc->bio->bi_size == 0 || exiting) {
+			err = -EIO;
+		} else if (n->bdev) {
+			sc->bio->bi_bdev = n->bdev;
+			generic_make_request(sc->bio);
+			err = 0;
+		} else {
+			/*
+			 * Network path: dst_mirror_process_request_nosync()
+			 * pushes a private copy, and the state engine clones
+			 * the request itself if it needs it after push().
+			 */
+			memset(&req, 0, sizeof(struct dst_request));
+			dst_fill_request(&req, sc->bio, sc->start - n->start,
+					n, &kst_bio_endio);
+
+			err = dst_mirror_process_request_nosync(&req, n);
+		}
+
+		if (err < 0)
+			bio_endio(sc->bio, sc->bio->bi_size, err);
+		kfree(sc);
+		num++;
+	}
+
+	return num;
+}
+
+/*
+ * Periodic resync work.  Each run:
+ * - advances the resync;
+ * - submits completed resync writes from the backlog;
+ * - flushes the write log and metadata to disk;
+ * - re-arms itself after resync_timeout.
+ */
+static void dst_mirror_resync_work(struct work_struct *work)
+{
+	struct dst_mirror_priv *mp;
+	struct dst_node *node;
+
+	mp = container_of(work, struct dst_mirror_priv, resync_work.work);
+	node = mp->node;
+
+	dst_mirror_resync(node, 0);
+	dst_mirror_sync_requeue(node, 0);
+	dst_mirror_process_log_on_disk(node, WRITE);
+
+	schedule_delayed_work(&mp->resync_work, mp->resync_timeout);
+}
+
+/*
+ * Mirroring log is used to store write request information.
+ * It is allocated on disk and in memory (the disk copy is refreshed each
+ * time the resync work runs).  Its size is about 1/256 (~0.4%) of free
+ * RAM or of the node's disk, whichever is smaller.
+ *
+ * Each write updates the log, so when a node goes offline its log records
+ * the failed writes with their error values.  Those entries can be
+ * resynced when the node is back online.  When the number of failed
+ * writes reaches the number of log entries, recovery becomes impossible
+ * (old entries have been overwritten) and a full resync is scheduled.
+ *
+ * This does not work well when there are multiple writes to the same
+ * locations - they are considered different writes and thus will be
+ * resynced multiple times.  The right solution is to check the log for
+ * each write, ideally with the log kept as a tree instead of an array.
+ *
+ * Invariants enforced here:
+ * - pd->num never exceeds the allocated entries
+ *   (log->nr_pages * DST_LOG_ENTRIES_PER_PAGE).  Previously the size
+ *   computation could leave pd->num larger, letting write_idx/resync_idx
+ *   index past the last page.
+ * - At least one page is allocated, so pd->num is never zero (it is used
+ *   as a modulus by the resync code).
+ * - Indexes read from disk that are out of range are reset to 0.
+ */
+static int dst_mirror_log_init(struct dst_node *n)
+{
+	struct dst_mirror_priv *priv = n->priv;
+	struct dst_mirror_log *log = &priv->log;
+	struct dst_mirror_node_data *pd = &priv->data;
+	struct dst_node *first;
+	unsigned int i, max_entries;
+	int err;
+
+	err = dst_mirror_read_node_data(n, pd);
+	if (err)
+		return err;
+
+	mutex_lock(&n->st->tree_lock);
+	first = dst_storage_tree_search(n->st, n->start);
+	mutex_unlock(&n->st->tree_lock);
+
+	if (first) {
+		struct dst_mirror_priv *fp = first->priv;
+
+		pd->num = fp->data.num;
+		log->nr_pages = fp->log.nr_pages;
+		dst_node_put(first);
+	} else if (pd->magic == DST_MIRROR_COOKIE) {
+		log->nr_pages = (pd->num*sizeof(struct dst_write_entry))>>PAGE_SHIFT;
+	} else {
+		unsigned long allowed_ram = DIV_ROUND_UP(global_page_state(NR_FREE_PAGES), 256);
+		unsigned long allowed_disk = DIV_ROUND_UP(to_bytes(n->size), 256);
+
+		allowed_ram <<= PAGE_SHIFT;
+
+		log->nr_pages = min(allowed_disk, allowed_ram) >> PAGE_SHIFT;
+		pd->num = log->nr_pages * DST_LOG_ENTRIES_PER_PAGE;
+		pd->write_idx = pd->resync_idx = 0;
+	}
+
+	if (!log->nr_pages)
+		log->nr_pages = 1;
+
+	max_entries = log->nr_pages * DST_LOG_ENTRIES_PER_PAGE;
+	if (!pd->num || pd->num > max_entries)
+		pd->num = max_entries;
+	if (pd->write_idx >= pd->num)
+		pd->write_idx = 0;
+	if (pd->resync_idx >= pd->num)
+		pd->resync_idx = 0;
+
+	pd->magic = DST_MIRROR_COOKIE;
+
+	log->entries = kcalloc(log->nr_pages, sizeof(void *), GFP_KERNEL);
+	if (!log->entries)
+		return -ENOMEM;
+
+	for (i = 0; i < log->nr_pages; ++i) {
+		log->entries[i] = kzalloc(PAGE_SIZE, GFP_KERNEL);
+		if (!log->entries[i])
+			goto err_out_free;
+	}
+
+	printk(KERN_INFO "%s: mirror write log contains %u entries (%u pages).\n",
+			__func__, pd->num, log->nr_pages);
+
+	return 0;
+
+err_out_free:
+	while (i-- != 0)
+		kfree(log->entries[i]);
+	kfree(log->entries);
+
+	return -ENOMEM;
+}
+
+/* Release every log page and the page pointer array allocated by dst_mirror_log_init(). */
+static void dst_mirror_log_exit(struct dst_node *n)
+{
+	struct dst_mirror_priv *mp = n->priv;
+	struct dst_mirror_log *log = &mp->log;
+	unsigned int idx = log->nr_pages;
+
+	while (idx--)
+		kfree(log->entries[idx]);
+
+	kfree(log->entries);
+}
+
+/*
+ * add_node callback: set up mirror state for @n and join it to the storage.
+ *
+ * Layout of a mirror node, as set up here and used by
+ * dst_mirror_process_log_on_disk() / dst_mirror_process_node_data():
+ *   [ usable data: n->size sectors ][ write log ][ metadata sector ]
+ *
+ * The first node added to an empty storage defines the storage size.
+ * Any later node must expose the same usable size, or it fails with
+ * -EINVAL.
+ *
+ * On failure, n->size, the storage size and the published disk capacity
+ * are restored.  Previously n->size stayed shrunk, and the capacity set by
+ * dst_set_disk_size() was not re-published after st->disk_size was
+ * restored.
+ */
+static int dst_mirror_add_node(struct dst_node *n)
+{
+	struct dst_storage *st = n->st;
+	struct dst_mirror_priv *priv;
+	int err, first_node = 0;
+	u64 disk_size;
+	u64 orig_size = n->size;
+
+	/* The last sector of the node holds struct dst_mirror_node_data. */
+	n->size--;
+
+	priv = kzalloc(sizeof(struct dst_mirror_priv), GFP_KERNEL);
+	if (!priv) {
+		err = -ENOMEM;
+		goto err_out_restore_size;
+	}
+
+	priv->ndp_sector = n->size;
+	priv->node = n;
+	priv->resync_start = 0;
+	priv->resync_size = to_sector(1024*1024*100ULL);
+	init_completion(&priv->resync_complete);
+	atomic_set(&priv->resync_num, 0);
+	INIT_DELAYED_WORK(&priv->resync_work, dst_mirror_resync_work);
+	priv->resync_timeout = 1000;
+
+	spin_lock_init(&priv->resync_wait_lock);
+	INIT_LIST_HEAD(&priv->resync_wait_list);
+	priv->resync_wait_num = 0;
+
+	spin_lock_init(&priv->backlog_lock);
+	INIT_LIST_HEAD(&priv->backlog_list);
+
+	n->priv_callback = &dst_mirror_handle_priv;
+	n->priv = priv;
+
+	spin_lock_init(&priv->log_lock);
+
+	err = dst_mirror_log_init(n);
+	if (err)
+		goto err_out_free;
+
+	/* Carve the write log out of the end of the usable area (u64 avoids shift overflow). */
+	n->size -= to_sector((u64)priv->log.nr_pages << PAGE_SHIFT);
+
+	mutex_lock(&st->tree_lock);
+	disk_size = st->disk_size;
+	if (st->disk_size) {
+		if (st->disk_size != n->size)
+			err = -EINVAL;
+		st->disk_size = min(n->size, st->disk_size);
+	} else {
+		st->disk_size = n->size;
+		first_node = 1;
+	}
+	dst_set_disk_size(st);
+	mutex_unlock(&st->tree_lock);
+
+	if (err)
+		goto err_out_free_log;
+
+	err = dst_mirror_ndp_setup(n, first_node, 1);
+	if (err)
+		goto err_out_free_log;
+
+	schedule_delayed_work(&priv->resync_work, priv->resync_timeout);
+
+	dprintk("%s: n: %p, %llu:%llu, disk_size: %llu.\n",
+			__func__, n, n->start, n->size, st->disk_size);
+
+	return 0;
+
+err_out_free_log:
+	mutex_lock(&st->tree_lock);
+	st->disk_size = disk_size;
+	dst_set_disk_size(st);
+	mutex_unlock(&st->tree_lock);
+	dst_mirror_log_exit(n);
+err_out_free:
+	kfree(priv);
+	n->priv = NULL;
+err_out_restore_size:
+	n->size = orig_size;
+	return err;
+}
+
+/*
+ * bi_destructor for resync bios: the data pages were allocated by
+ * dst_mirror_sync_block(), so free them together with the bio.
+ */
+static void dst_mirror_sync_destructor(struct bio *bio)
+{
+	struct bio_vec *vec;
+	int seg;
+
+	bio_for_each_segment(vec, bio, seg)
+		__free_page(vec->bv_page);
+
+	bio_free(bio, dst_mirror_bio_set);
+}
+
+/*
+ * Account @num_completed resync pages.  When none remain outstanding,
+ * signal resync_complete and, unless a full resync is pending or @err is
+ * set, kick the resync work immediately.
+ */
+static void dst_mirror_check_resync_complete(struct dst_node *n, int num_completed, int err)
+{
+	struct dst_mirror_priv *mp = n->priv;
+
+	if (atomic_sub_return(num_completed, &mp->resync_num) != 0)
+		return;
+
+	dprintk("%s: completing resync request, start: %llu, size: %llu.\n",
+			__func__, mp->resync_start, mp->resync_size);
+	complete(&mp->resync_complete);
+
+	if (!err && !mp->full_resync)
+		schedule_delayed_work(&mp->resync_work, 0);
+}
+
+/*
+ * Find the next failed write in the log, scanning from resync_idx up to
+ * the last allocated entry (no wrap-around).
+ *
+ * If one is found, resync_idx is moved to it and 1 is returned.
+ * Otherwise the node is marked synced and 0 is returned.
+ */
+static int dst_mirror_sync_check(struct dst_node *n)
+{
+	struct dst_mirror_priv *priv = n->priv;
+	struct dst_mirror_node_data *pd = &priv->data;
+	unsigned long pos, end;
+
+	dprintk("%s: n: %p, resync_idx: %u.\n", __func__, n, pd->resync_idx);
+
+	end = priv->log.nr_pages * DST_LOG_ENTRIES_PER_PAGE;
+
+	for (pos = pd->resync_idx; pos < end; ++pos) {
+		struct dst_write_entry *e =
+			&priv->log.entries[pos / DST_LOG_ENTRIES_PER_PAGE]
+					  [pos % DST_LOG_ENTRIES_PER_PAGE];
+
+		if (e->error) {
+			pd->resync_idx = pos;
+			return 1;
+		}
+	}
+
+	dst_mirror_mark_node_sync(n);
+	return 0;
+}
+
+/*
+ * bi_end_io for resync bios.
+ *
+ * Read completion (bi_private is the sync container): on success, restore
+ * the original size and sector; in every case switch the bio to WRITE so
+ * dst_mirror_sync_requeue() picks it up.  A failure marks the node not
+ * synced.
+ *
+ * Write completion (bi_private is the node): on success without a pending
+ * full resync, advance resync_idx and look for the next failed log entry.
+ * The bio is then released and the resync accounting updated.
+ */
+static int dst_mirror_sync_endio(struct bio *bio, unsigned int size, int err)
+{
+	int is_write = (bio_rw(bio) == WRITE);
+	struct dst_mirror_priv *priv;
+	struct dst_node *n;
+
+	dprintk("%s: bio: %p, err: %d, size: %u, op: %s.\n",
+			__func__, bio, err, bio->bi_size,
+			is_write ? "write" : "read");
+
+	if (bio->bi_size)
+		return 1;
+
+	if (!is_write) {
+		struct dst_mirror_sync_container *sc = bio->bi_private;
+
+		n = sc->node;
+		priv = n->priv;
+
+		if (err) {
+			dst_mirror_mark_notsync(sc->node);
+		} else {
+			bio->bi_size = sc->size;
+			bio->bi_sector = sc->start;
+		}
+		bio->bi_rw = WRITE;
+
+		if (!err && !priv->full_resync)
+			schedule_delayed_work(&priv->resync_work, 0);
+		return 0;
+	}
+
+	n = bio->bi_private;
+	priv = n->priv;
+
+	if (err) {
+		dst_mirror_mark_notsync(n);
+	} else if (!priv->full_resync) {
+		struct dst_mirror_node_data *pd = &priv->data;
+		unsigned long flags;
+
+		spin_lock_irqsave(&priv->log_lock, flags);
+		pd->resync_idx = (pd->resync_idx + 1) % pd->num;
+		dst_mirror_sync_check(n);
+		spin_unlock_irqrestore(&priv->log_lock, flags);
+	}
+
+	bio_put(bio);
+	dst_mirror_check_resync_complete(n, 1, err);
+
+	return 0;
+}
+
+/*
+ * Start resynchronisation of @size bytes beginning at byte offset @start.
+ *
+ * The range is read through the storage queue in bios of at most
+ * BIO_MAX_PAGES freshly allocated pages.  Each bio is wrapped in a
+ * dst_mirror_sync_container and queued on priv->backlog_list.
+ * dst_mirror_sync_endio() turns a completed read into a write, which
+ * dst_mirror_sync_requeue() later submits to @n.
+ *
+ * Accounting: priv->resync_num is preset to the total page count.  Here
+ * each bio accounts for all but one of its pages; its write completion
+ * accounts for the last one.
+ *
+ * Returns 0 or a negative errno.
+ *
+ * Fixes compared to the previous version:
+ * - a page rejected by bio_add_pc_page() was leaked (it never became part
+ *   of the bio, so the destructor did not free it);
+ * - a bio allocation failure after the first iteration returned 0.
+ */
+static int dst_mirror_sync_block(struct dst_node *n,
+		u64 start, u32 size)
+{
+	struct bio *bio;
+	unsigned int nr_pages = DIV_ROUND_UP(size, PAGE_SIZE), i, nr;
+	struct page *page;
+	int err = -ENOMEM, added;
+	unsigned long flags;
+	struct dst_mirror_sync_container *sc;
+	struct dst_mirror_priv *priv = n->priv;
+
+	dprintk("%s: [all in sectors] start: %llu, size: %u, nr_pages: %u, disk_size: %llu.\n",
+			__func__, (u64)to_sector(start), (unsigned int)to_sector(size),
+			nr_pages, n->st->disk_size);
+
+	atomic_set(&priv->resync_num, nr_pages);
+
+	while (nr_pages) {
+		nr = min_t(unsigned int, nr_pages, BIO_MAX_PAGES);
+
+		sc = kmalloc(sizeof(struct dst_mirror_sync_container), GFP_KERNEL);
+		if (!sc)
+			return -ENOMEM;
+
+		bio = bio_alloc_bioset(GFP_NOIO, nr, dst_mirror_bio_set);
+		if (!bio) {
+			/* err is 0 after a successful submission in an earlier iteration. */
+			err = -ENOMEM;
+			goto err_out_free_sc;
+		}
+
+		bio->bi_rw = READ;
+		bio->bi_private = sc;
+		bio->bi_sector = to_sector(start);
+		bio->bi_bdev = NULL;
+		bio->bi_destructor = dst_mirror_sync_destructor;
+		bio->bi_end_io = dst_mirror_sync_endio;
+
+		for (i = 0; i < nr; ++i) {
+			page = alloc_page(GFP_NOIO);
+			if (!page)
+				break;
+
+			added = bio_add_pc_page(n->st->queue, bio, page,
+					min_t(u32, PAGE_SIZE, size), 0);
+			if (added <= 0) {
+				/* Not attached to the bio: the destructor would never free it. */
+				__free_page(page);
+				break;
+			}
+			size -= added;
+		}
+
+		if (!bio->bi_vcnt) {
+			err = -ENOMEM;
+			goto err_out_put_bio;
+		}
+
+		sc->node = n;
+		sc->bio = bio;
+		sc->start = bio->bi_sector;
+		sc->size = bio->bi_size;
+
+		dst_mirror_check_resync_complete(n, i-1, 0);
+
+		spin_lock_irqsave(&priv->backlog_lock, flags);
+		list_add_tail(&sc->sync_entry, &priv->backlog_list);
+		spin_unlock_irqrestore(&priv->backlog_lock, flags);
+
+		nr_pages -= bio->bi_vcnt;
+		dprintk("%s: start: %llu, size: %u/%u, bio: %p, rest_pages: %u, rest_bytes: %u.\n",
+				__func__, start, bio->bi_size, nr, bio, nr_pages, size);
+
+		start += bio->bi_size;
+
+		err = n->st->queue->make_request_fn(n->st->queue, bio);
+		if (err)
+			goto err_out_del;
+	}
+
+	return 0;
+
+err_out_del:
+	spin_lock_irqsave(&priv->backlog_lock, flags);
+	list_del(&sc->sync_entry);
+	spin_unlock_irqrestore(&priv->backlog_lock, flags);
+err_out_put_bio:
+	bio_put(bio);
+err_out_free_sc:
+	kfree(sc);
+	return err;
+}
+
+/*
+ * Completion for mirror reads.  A failure marks the node not synced and
+ * wakes its state (if any).  The original bio is completed on success, or
+ * on failure when req->callback is set.
+ */
+static void dst_mirror_read_endio(struct dst_request *req, int err)
+{
+	if (err) {
+		dst_mirror_mark_notsync(req->node);
+		if (req->state)
+			kst_wake(req->state);
+	}
+
+	if (!err || req->callback)
+		kst_bio_endio(req, err);
+}
+
+/*
+ * Record the outcome of a write on req->node in the node's write log.
+ *
+ * The entry at write_idx is filled (little-endian) and write_idx advances,
+ * wrapping at pd->num.  If the node is out of sync and the writer has
+ * caught up with resync_idx, the log can no longer describe the
+ * divergence and a full resync is requested.
+ */
+static void dst_mirror_update_write_log(struct dst_request *req, int err)
+{
+	struct dst_mirror_priv *priv = req->node->priv;
+	struct dst_mirror_node_data *pd = &priv->data;
+	struct dst_write_entry *e;
+	unsigned long flags;
+	u32 size = req->orig_size;
+
+	spin_lock_irqsave(&priv->log_lock, flags);
+
+	e = &priv->log.entries[pd->write_idx / DST_LOG_ENTRIES_PER_PAGE]
+			[pd->write_idx % DST_LOG_ENTRIES_PER_PAGE];
+	e->error = cpu_to_le32(err);
+	e->size = cpu_to_le32(size);
+	e->start = cpu_to_le64(req->start - to_sector(req->orig_size));
+
+	pd->write_idx++;
+	if (pd->write_idx == pd->num)
+		pd->write_idx = 0;
+
+	if (test_bit(DST_NODE_NOTSYNC, &req->node->flags) &&
+			pd->write_idx == pd->resync_idx)
+		priv->full_resync = 1;
+
+	spin_unlock_irqrestore(&priv->log_lock, flags);
+}
+
+/*
+ * Per-mirror completion of a write.
+ *
+ * A failure marks the node not synced and wakes its state.  Every
+ * completion is logged.  The shared request (req->priv) drops one
+ * reference; the last reference completes the original bio (always with
+ * 0) and frees the request.
+ */
+static void dst_mirror_write_endio(struct dst_request *req, int err)
+{
+	struct dst_request *shared;
+
+	if (err) {
+		dst_mirror_mark_notsync(req->node);
+		if (req->state)
+			kst_wake(req->state);
+	}
+	dst_mirror_update_write_log(req, err);
+
+	shared = req->priv;
+
+	dprintk("%s: req: %p, priv: %p err: %d, bio: %p, "
+			"cnt: %d, orig_size: %llu.\n",
+			__func__, shared, shared->priv, err, shared->bio,
+			atomic_read(&shared->refcnt), shared->orig_size);
+
+	if (!atomic_dec_and_test(&shared->refcnt))
+		return;
+
+	bio_endio(shared->bio, shared->orig_size, 0);
+	dst_free_request(shared);
+}
+
+/*
+ * Submit @req to mirror @n after draining @n's resync backlog.
+ *
+ * Positive "submitted" results are folded into 0.  On a negative result
+ * the request is completed immediately with that error on behalf of @n,
+ * and the error is returned.
+ */
+static int dst_mirror_process_request(struct dst_request *req,
+		struct dst_node *n)
+{
+	int ret;
+
+	dst_mirror_sync_requeue(n, 0);
+
+	ret = dst_mirror_process_request_nosync(req, n);
+	if (ret >= 0)
+		return 0;
+
+	req->node = n;
+	req->bio_endio(req, ret);
+	return ret;
+}
+
+/*
+ * Mirror a write request to the node and all of its shared mirrors.
+ *
+ * The request is cloned (unless it already carries dst_mirror_write_endio)
+ * so that every mirror's completion can be counted in req->refcnt.  While
+ * the node is out of sync with a full resync pending, a request matching
+ * the resync-window test is parked on resync_wait_list instead.  Per-node
+ * submission errors are counted only for debugging; the function returns
+ * 0 unless the clone cannot be allocated (-ENOMEM).
+ */
+static int dst_mirror_write(struct dst_request *oreq)
+{
+ struct dst_node *n, *node = oreq->node;
+ struct dst_request *req = oreq;
+ int num, err = 0, err_num = 0, orig_num;
+ struct dst_mirror_priv *priv = node->priv;
+ unsigned long flags;
+
+ /*
+ * Requests that already use dst_mirror_write_endio are reused as-is;
+ * presumably these are requests previously parked in the resync window
+ * and resubmitted once the window moves forward - TODO confirm against
+ * the code that drains resync_wait_list.  Anything else is cloned so
+ * that per-mirror completions can be counted.
+ */
+ if (oreq->bio_endio != &dst_mirror_write_endio) {
+ req = dst_clone_request(oreq, oreq->node->w->req_pool);
+ if (!req) {
+ err = -ENOMEM;
+ goto err_out_exit;
+ }
+
+ req->priv = req;
+ req->bio_endio = &dst_mirror_write_endio;
+ }
+
+ /*
+ * NOTE(review): this compares the request *size* with the window size
+ * rather than checking that the request ends inside
+ * [resync_start, resync_start + resync_size) - confirm the intended
+ * window test.
+ */
+ if (test_bit(DST_NODE_NOTSYNC, &node->flags) &&
+ oreq->start >= priv->resync_start &&
+ to_sector(oreq->orig_size) <= priv->resync_size &&
+ priv->full_resync) {
+ dprintk("%s: queueing request: start: %llu, size: %llu, resync window start: %llu, size: %llu.\n",
+ __func__, oreq->start, (u64)to_sector(oreq->orig_size),
+ priv->resync_start, priv->resync_size);
+ spin_lock_irqsave(&priv->resync_wait_lock, flags);
+ list_add_tail(&req->request_list_entry, &priv->resync_wait_list);
+ priv->resync_wait_num++;
+ spin_unlock_irqrestore(&priv->resync_wait_lock, flags);
+ return 0;
+ }
+
+ /*
+ * req->bio_endio (dst_mirror_write_endio) drops one reference per
+ * mirror and completes the original bio only when all mirrors have
+ * finished, with or without error.
+ *
+ * NOTE(review): refcnt is taken from a snapshot of shared_num, and the
+ * shared list is walked below without tree_lock.  If the list shrinks
+ * in between, fewer mirrors are submitted than references taken and
+ * the request would never complete - confirm callers exclude concurrent
+ * node add/remove.
+ */
+ orig_num = num = atomic_read(&req->node->shared_num) + 1;
+ atomic_set(&req->refcnt, num);
+
+ dprintk("\n%s: req: %p, mirror to %d nodes.\n",
+ __func__, req, num);
+
+ /* The node itself first, then each shared mirror until num runs out. */
+ err = dst_mirror_process_request(req, node);
+ if (err)
+ err_num++;
+
+ if (--num) {
+ list_for_each_entry(n, &node->shared, shared) {
+ dprintk("\n%s: req: %p, start: %llu, size: %llu, "
+ "num: %d, n: %p, state: %p.\n",
+ __func__, req, req->start,
+ req->size, num, n, n->state);
+
+ err = dst_mirror_process_request(req, n);
+ if (err)
+ err_num++;
+
+ if (--num <= 0)
+ break;
+ }
+ }
+
+ /* Even if every mirror failed, the error is only logged (debug builds). */
+ if (err_num == orig_num)
+ dprintk("%s: req: %p, num: %d, err: %d.\n",
+ __func__, req, num, err);
+
+ err = 0;
+
+err_out_exit:
+ return err;
+}
+
+static int dst_mirror_read(struct dst_request *req)
+{
+ struct dst_node *node = req->node, *n, *min_dist_node;
+ struct dst_mirror_priv *priv = node->priv;
+ u64 dist, d;
+ int err;
+
+ req->bio_endio = &dst_mirror_read_endio;
+
+ do {
+ err = -ENODEV;
+ min_dist_node = NULL;
+ dist = -1ULL;
+
+ /*
+ * Reading is never performed from the node under resync.
+ */
+
+ if (!test_bit(DST_NODE_NOTSYNC, &node->flags)) {
+ priv = node->priv;
+ if (req->start > priv->last_start)
+ dist = req->start - priv->last_start;
+ else
+ dist = priv->last_start - req->start;
+ min_dist_node = req->node;
+ }
+
+ list_for_each_entry(n, &node->shared, shared) {
+ if (test_bit(DST_NODE_NOTSYNC, &n->flags))
+ continue;
+
+ priv = n->priv;
+
+ if (req->start > priv->last_start)
+ d = req->start - priv->last_start;
+ else
+ d = priv->last_start - req->start;
+
+ if (d < dist)
+ min_dist_node = n;
+ }
+
+ if (!min_dist_node)
+ break;
+
+ priv = min_dist_node->priv;
+ priv->last_start = req->start;
+
+ req->node = min_dist_node;
+ req->state = req->node->state;
+
+ if (req->node->bdev) {
+ req->bio->bi_bdev = req->node->bdev;
+ generic_make_request(req->bio);
+ err = 0;
+ break;
+ }
+
+ err = req->state->ops->push(req);
+ if (err) {
+ dprintk("%s: req: %p, bio: %p, node: %p, err: %d.\n",
+ __func__, req, req->bio, min_dist_node, err);
+ dst_mirror_mark_notsync(req->node);
+ }
+ } while (err && min_dist_node);
+
+ if (err || !min_dist_node) {
+ dprintk("%s: req: %p, bio: %p, node: %p, err: %d.\n",
+ __func__, req, req->bio, min_dist_node, err);
+ if (!err)
+ err = -ENODEV;
+ }
+ dprintk("%s: req: %p, err: %d.\n", __func__, req, err);
+ return err;
+}
+
+/*
+ * This callback is invoked from block layer request processing function,
+ * its task is to remap block request to different nodes.
+ */
+static int dst_mirror_remap(struct dst_request *req)
+{
+ int (*remap[])(struct dst_request *) =
+ {&dst_mirror_read, &dst_mirror_write};
+
+ return remap[bio_rw(req->bio) == WRITE](req);
+}
+
+static void dst_mirror_write_queued(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+ unsigned long flags;
+ struct dst_request *req;
+ int num = priv->resync_wait_num, err;
+
+ while (!list_empty(&priv->resync_wait_list) && num != 0) {
+ req = NULL;
+ spin_lock_irqsave(&priv->resync_wait_lock, flags);
+ if (!list_empty(&priv->resync_wait_list)) {
+ req = list_entry(priv->resync_wait_list.next,
+ struct dst_request,
+ request_list_entry);
+ list_del_init(&req->request_list_entry);
+ num--;
+ }
+ spin_unlock_irqrestore(&priv->resync_wait_lock, flags);
+
+ if (!req)
+ break;
+
+ dprintk("%s: queued request n: %p, req: %p, start: %llu, size: %llu, num: %d.\n",
+ __func__, n, req, req->start, (u64)to_sector(req->size), num);
+ err = dst_mirror_process_request(req, n);
+ if (err)
+ break;
+ }
+}
+
+/*
+ * Partial resync of not-synced @node, driven by an in-sync node's write log.
+ *
+ * Under st->tree_lock, picks the head node if it is in sync, otherwise the
+ * first in-sync node on the head's shared list, and takes a reference on it.
+ * With both log locks held (sync node's first), it reads the sync node's log
+ * entry at @node's resync_idx and calls dst_mirror_sync_check(node).  It then
+ * drops the reference and requests a resync of that entry's range.
+ *
+ * Returns -ENODEV if no in-sync node exists, otherwise the result of
+ * dst_mirror_sync_block().
+ */
+static int dst_mirror_resync_partial(struct dst_node *node)
+{
+ struct dst_storage *st = node->st;
+ struct dst_node *first = node->shared_head, *n, *sync;
+ struct dst_mirror_priv *p = node->priv, *sp;
+ struct dst_mirror_node_data *pd = &p->data;
+ struct dst_mirror_node_data *spd;
+ struct dst_write_entry *e;
+ unsigned long flags;
+ unsigned int pnum, idx;
+ u64 start;
+ u32 size;
+
+ /* A node without shared_head is itself the head of the mirror. */
+ if (!first)
+ first = node;
+
+ sync = NULL;
+ mutex_lock(&st->tree_lock);
+ dprintk("%s: ", __func__);
+ if (!test_bit(DST_NODE_NOTSYNC, &first->flags)) {
+ sync = first;
+ dst_node_get(sync);
+ } else {
+ list_for_each_entry(n, &first->shared, shared) {
+ dprintk("n: %p, sync: %d; ", n, !test_bit(DST_NODE_NOTSYNC, &n->flags));
+ if (!test_bit(DST_NODE_NOTSYNC, &n->flags)) {
+ sync = n;
+ dst_node_get(sync);
+ break;
+ }
+ }
+ }
+ mutex_unlock(&st->tree_lock);
+ dprintk("node: %p, first: %p, sync: %p.\n", node, first, sync);
+
+ if (!sync)
+ return -ENODEV;
+
+ sp = sync->priv;
+ spd = &sp->data;
+
+ /*
+ * Lock order: sync node's log_lock, then @node's.  This assumes sync
+ * differs from @node, which the NOTSYNC selection above implies.
+ */
+ spin_lock_irqsave(&sp->log_lock, flags);
+ spin_lock(&p->log_lock);
+
+ /* @node's resync index selects the entry in the sync node's log. */
+ pnum = pd->resync_idx / DST_LOG_ENTRIES_PER_PAGE;
+ idx = pd->resync_idx % DST_LOG_ENTRIES_PER_PAGE;
+
+ e = &sp->log.entries[pnum][idx];
+ start = le64_to_cpu(e->start);
+ size = le32_to_cpu(e->size);
+
+ dst_mirror_sync_check(node);
+
+ spin_unlock(&p->log_lock);
+ spin_unlock_irqrestore(&sp->log_lock, flags);
+
+ /* NOTE(review): these printk() calls carry no KERN_ log level. */
+ printk("%s: node write_idx: %u, resync_idx: %u, num: %u, sync write_idx: %u, num: %u.\n",
+ __func__, pd->write_idx, pd->resync_idx, pd->num, spd->write_idx, spd->num);
+ printk("%s: sync request: start: %llu, size: %llu.\n",
+ __func__, start, (u64)to_sector(size));
+
+ dst_node_put(sync);
+
+ /* The log start is converted with to_bytes(); size is passed unconverted. */
+ return dst_mirror_sync_block(node, to_bytes(start), size);
+}
+
+/*
+ * Resync logic - sliding window algorithm.
+ *
+ * At startup the system checks the age (unique cookie) of each node. If it
+ * does not match the first node, all data is resynced from the first node
+ * in the mirror to the others (the non-synced nodes). Each non-synced node
+ * has a window, which slides from the start of the node to its end.
+ * During resync all requests that enter the window are queued, so the
+ * window has to be sufficiently small. When the window has been synced from
+ * the other nodes, the queued requests are written and the window moves
+ * forward. The next resync step therefore starts only after the previous
+ * window is fully completed. When the window reaches the end of the node,
+ * the node is marked as synchronized.
+ *
+ * If the age of a node matches the first one, but its log contains a
+ * different number of write log entries than the first node (the first
+ * node always stands as the clean one), a partial resync is scheduled.
+ * A partial resync is also scheduled when the log entry pointed to by the
+ * node's resync index contains an error.
+ *
+ * This resync type works as follows. The system selects a sync node
+ * (checking each node's flags) and fetches the log entry pointed to by the
+ * resync index of the given node. It then resyncs that data from the other
+ * nodes to the given one. Finally it checks the rest of the write log for
+ * other failed writes, so that the next resync block is fetched for them.
+ *
+ * @n:   node to resync.
+ * @ndp: if non-zero, dst_mirror_ndp_setup(n, 0, 0) is called before the
+ *       next full-resync block is requested.
+ *
+ * Returns 0 when @n is already synced, has just become synced, or the next
+ * block was submitted.  Returns -EAGAIN while earlier resync reads are still
+ * outstanding.  Otherwise it returns the error from the partial resync, the
+ * NDP setup, or the block submission.
+ */
+static int dst_mirror_resync(struct dst_node *n, int ndp)
+{
+ struct dst_mirror_priv *priv = n->priv;
+ struct dst_mirror_priv *fp = priv;
+ u64 total, allowed, size;
+ int err;
+
+ /* fp: head node's priv; its age is copied once resync completes. */
+ if (n->shared_head)
+ fp = n->shared_head->priv;
+
+ if (!test_bit(DST_NODE_NOTSYNC, &n->flags))
+ return 0;
+ /* Only one resync step at a time. */
+ if (atomic_read(&priv->resync_num) != 0) {
+ dprintk("%s: n: %p, resync_num: %d.\n",
+ __func__, n, atomic_read(&priv->resync_num));
+ return -EAGAIN;
+ }
+
+ /* Cap one step at half of (free + file-backed) pages, in sectors. */
+ allowed = global_page_state(NR_FREE_PAGES) +
+ global_page_state(NR_FILE_PAGES);
+ allowed >>= 1;
+ allowed = to_sector(allowed << PAGE_SHIFT);
+
+ /* Remaining part of the window, clipped to the end of the node. */
+ size = min(priv->resync_size, n->size - priv->resync_start);
+
+ total = min(allowed, size);
+
+ printk(KERN_NOTICE "%s: node: %p [%d], %llu:%llu %s synchronization has been started "
+ "from %llu, allowed: %llu, total: %llu.\n",
+ __func__, n, atomic_read(&n->refcnt),
+ n->start, n->size,
+ (!priv->full_resync) ? "partial" : "full",
+ priv->resync_start, allowed, total);
+
+ if (!priv->full_resync)
+ return dst_mirror_resync_partial(n);
+
+ /* Write requests that were parked while the previous window synced. */
+ dst_mirror_write_queued(n);
+
+ /* Window reached the end: mark synced and store the head's age. */
+ if (priv->resync_start == n->size) {
+ dst_mirror_mark_node_sync(n);
+ priv->data.age = fp->data.age;
+ dst_mirror_write_node_data(n, &priv->data);
+ return 0;
+ }
+
+ if (ndp) {
+ err = dst_mirror_ndp_setup(n, 0, 0);
+ if (err)
+ return err;
+ }
+
+ /* Sync the next chunk; advance the window only if it was submitted. */
+ err = dst_mirror_sync_block(n, to_bytes(priv->resync_start),
+ to_bytes(total));
+ if (!err)
+ priv->resync_start += total;
+
+ return err;
+}
+
+/*
+ * Error callback for the network state @st of a mirror node.
+ *
+ * -EEXIST is returned unchanged.  -EPIPE or -ECONNRESET without POLLERR or
+ * POLLHUP on the socket is treated as recoverable: a not-synced node is
+ * advanced with dst_mirror_resync(node, 1), otherwise 0 is returned.
+ * For any other error, the node is marked not-synced (unless it is the
+ * only node in the mirror, in which case @err is returned as is).  Every
+ * request on @st's list is then removed.  Reads are rewound to their
+ * original range and reissued via dst_mirror_read(); writes are completed
+ * with @err.
+ *
+ * NOTE(review): a read that cannot be reissued is freed with
+ * dst_free_request() without its bio being completed.  Confirm the bio is
+ * ended elsewhere, or the submitter may wait forever.
+ */
+static int dst_mirror_error(struct kst_state *st, int err)
+{
+ struct dst_request *req, *tmp;
+ /* NOTE(review): ->poll() is called with a NULL file; confirm it is safe. */
+ unsigned int revents = st->socket->ops->poll(NULL, st->socket, NULL);
+
+ dprintk("%s: err: %d, revents: %x, notsync: %d.\n",
+ __func__, err, revents,
+ test_bit(DST_NODE_NOTSYNC, &st->node->flags));
+
+ if (err == -EEXIST)
+ return err;
+
+ if (!(revents & (POLLERR | POLLHUP)) &&
+ (err == -EPIPE || err == -ECONNRESET)) {
+ if (test_bit(DST_NODE_NOTSYNC, &st->node->flags))
+ return dst_mirror_resync(st->node, 1);
+ return 0;
+ }
+
+ if (atomic_read(&st->node->shared_num) == 0 &&
+ !st->node->shared_head) {
+ dprintk("%s: this node is the only one in the mirror, "
+ "can not mark it notsync.\n", __func__);
+ return err;
+ }
+
+ dst_mirror_mark_notsync(st->node);
+
+ mutex_lock(&st->request_lock);
+ list_for_each_entry_safe(req, tmp, &st->request_list,
+ request_list_entry) {
+ kst_del_req(req);
+ dprintk("%s: requeue [%c], start: %llu, idx: %d,"
+ " num: %d, size: %llu, offset: %u, err: %d.\n",
+ __func__, (bio_rw(req->bio) == WRITE)?'W':'R',
+ req->start, req->idx, req->num, req->size,
+ req->offset, err);
+
+ if (bio_rw(req->bio) != WRITE) {
+ /* Rewind a partially done read to its full range and retry. */
+ req->start -= to_sector(req->orig_size - req->size);
+ req->size = req->orig_size;
+ req->flags &= ~(DST_REQ_HEADER_SENT | DST_REQ_CHEKSUM_RECV);
+ req->idx = 0;
+ if (dst_mirror_read(req))
+ dst_free_request(req);
+ } else {
+ kst_complete_req(req, err);
+ }
+ }
+ mutex_unlock(&st->request_lock);
+ return err;
+}
+
+static void dst_mirror_pre_del_node(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+
+ dprintk("%s: n: %p.\n", __func__, n);
+ priv->full_resync = 1;
+ cancel_rearming_delayed_work(&priv->resync_work);
+ flush_scheduled_work();
+}
+
+/*
+ * This callback is invoked when node is removed from storage.
+ */
+static void dst_mirror_del_node(struct dst_node *n)
+{
+ struct dst_mirror_priv *priv = n->priv;
+
+ dprintk("%s: n: %p, backlog_empty: %d, resync_num: %d.\n",
+ __func__, n, list_empty(&priv->backlog_list),
+ atomic_read(&priv->resync_num));
+
+ /*
+ * This strange-looking loop waits until all resync read requests
+ * are completed, this happens in dst_mirror_sync_requeue().
+ */
+ while (atomic_read(&priv->resync_num)) {
+ dst_mirror_sync_requeue(n, 1);
+ if (printk_ratelimit())
+ dprintk("%s: n: %p, backlog_empty: %d, resync_num: %d.\n",
+ __func__, n, list_empty(&priv->backlog_list),
+ atomic_read(&priv->resync_num));
+ msleep(100);
+ }
+
+ wait_for_completion(&priv->resync_complete);
+ dst_mirror_sync_requeue(n, 1);
+
+ if (priv) {
+ dst_mirror_log_exit(n);
+ kfree(priv);
+ n->priv = NULL;
+ }
+
+ if (n->device.parent == &n->st->device) {
+ int i;
+
+ for (i=0; i<ARRAY_SIZE(dst_mirror_attrs); ++i)
+ device_remove_file(&n->device, &dst_mirror_attrs[i]);
+ }
+}
+
+static struct dst_alg_ops alg_mirror_ops = {
+ .remap = dst_mirror_remap,
+ .add_node = dst_mirror_add_node,
+ .del_node = dst_mirror_del_node,
+ .del_node = dst_mirror_pre_del_node,
+ .error = dst_mirror_error,
+ .owner = THIS_MODULE,
+};
+
+static int __devinit alg_mirror_init(void)
+{
+ int err = -ENOMEM;
+
+ dst_mirror_bio_set = bioset_create(256, 256);
+ if (!dst_mirror_bio_set)
+ return -ENOMEM;
+
+ alg_mirror = dst_alloc_alg("alg_mirror", &alg_mirror_ops);
+ if (!alg_mirror)
+ goto err_out;
+
+ return 0;
+
+err_out:
+ bioset_free(dst_mirror_bio_set);
+ return err;
+}
+
+static void __devexit alg_mirror_exit(void)
+{
+ dst_remove_alg(alg_mirror);
+ bioset_free(dst_mirror_bio_set);
+}
+
+module_init(alg_mirror_init);
+module_exit(alg_mirror_exit);
+
+MODULE_LICENSE("GPL");
+MODULE_AUTHOR("Evgeniy Polyakov <johnpol@2ka.mipt.ru>");
+MODULE_DESCRIPTION("Mirror distributed algorithm.");
next prev parent reply other threads:[~2007-12-26 11:27 UTC|newest]
Thread overview: 12+ messages / expand[flat|nested] mbox.gz Atom feed top
[not found] <20071226004434.GA21861@2ka.mipt.ru>
2007-12-26 11:22 ` [0/4] DST: Distributed storage: Groundhogs strike back: no New Year for humans Evgeniy Polyakov
2007-12-26 11:22 ` [1/4] DST: Distributed storage documentation Evgeniy Polyakov
2007-12-26 11:22 ` [2/4] DST: Core distributed storage files Evgeniy Polyakov
2007-12-26 11:22 ` [3/4] DST: Network state machine Evgeniy Polyakov
2007-12-26 11:22 ` Evgeniy Polyakov [this message]
2008-01-22 19:38 Evgeniy Polyakov
2008-01-22 19:38 ` [4/4] DST: Algorithms used in distributed storage Evgeniy Polyakov
-- strict thread matches above, loose matches on Subject: below --
2007-12-17 15:03 [3/4] DST: Network state machine Evgeniy Polyakov
2007-12-17 15:03 ` [4/4] DST: Algorithms used in distributed storage Evgeniy Polyakov
2007-12-10 11:47 [3/4] DST: Network state machine Evgeniy Polyakov
2007-12-10 11:47 ` [4/4] DST: Algorithms used in distributed storage Evgeniy Polyakov
2007-12-12 9:12 ` Dmitry Monakhov
2007-12-12 10:20 ` Evgeniy Polyakov
2007-12-04 14:37 [3/4] DST: Network state machine Evgeniy Polyakov
2007-12-04 14:37 ` [4/4] DST: Algorithms used in distributed storage Evgeniy Polyakov
2007-11-29 12:53 [3/4] dst: Network state machine Evgeniy Polyakov
2007-11-29 12:53 ` [4/4] dst: Algorithms used in distributed storage Evgeniy Polyakov
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=11986681714159@2ka.mipt.ru \
--to=johnpol@2ka.mipt.ru \
--cc=linux-fsdevel@vger.kernel.org \
--cc=linux-kernel@vger.kernel.org \
--cc=netdev@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.