[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[dm-devel] [PATCH v2 2/5] dm-replicator: add main control module



From: Heinz Mauelshagen <heinzm redhat com>

This is the main replicator module plugging into the dm core interface
to construct/destruct/... replication control ("replicator" target)
and data devices ("replicator-dev" target).

The "replicator" control target handles the replication log and
the site link properties (e.g. log size or asynchronous replication),
while the "replicator-dev" target handles all local and remote device
properties (e.g. their paths and dirty log parameters).


Signed-off-by: Heinz Mauelshagen <heinzm redhat com>
Reviewed-by: Jon Brassow <jbrassow redhat com>
Tested-by: Jon Brassow <jbrassow redhat com>
---
 drivers/md/dm-repl.c | 1993 ++++++++++++++++++++++++++++++++++++++++++++++++++
 drivers/md/dm-repl.h |  124 ++++
 2 files changed, 2117 insertions(+), 0 deletions(-)
 create mode 100644 drivers/md/dm-repl.c
 create mode 100644 drivers/md/dm-repl.h

diff --git linux-2.6.orig/drivers/md/dm-repl.c linux-2.6/drivers/md/dm-repl.c
new file mode 100644
index 0000000..bb122b5
--- /dev/null
+++ linux-2.6/drivers/md/dm-repl.c
@@ -0,0 +1,1993 @@
+/*
+ * Copyright (C) 2008,2009  Red Hat, Inc. All rights reserved.
+ *
+ * Module Author: Heinz Mauelshagen <HeinzM redhat com>
+ *
+ * This file is released under the GPL.
+ *
+ * Remote Replication target.
+ *
+ * Features:
+ * o Logs writes to circular buffer keeping persistent state metadata.
+ * o Writes data from log synchronously or asynchronously
+ *   to multiple (1-N) remote replicas.
+ * o stores CRCs with metadata for integrity checks
+ * o stores versions with metadata to support future metadata migration
+ *
+ *
+ * For disk layout of backing store see dm-repl-log implementation.
+ */
+
+static const char version[] = "v0.028";
+
+#include "dm.h"
+#include "dm-repl.h"
+#include "dm-repl-log.h"
+#include "dm-repl-slink.h"
+
+#include <stdarg.h>
+#include <linux/dm-dirty-log.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
+#include <linux/crc32.h>
+#include <linux/init.h>
+#include <linux/module.h>
+#include <linux/namei.h>
+#include <linux/types.h>
+#include <linux/vmalloc.h>
+#include <linux/workqueue.h>
+
+#define	DM_MSG_PREFIX	"dm-repl"
+#define	DAEMON	DM_MSG_PREFIX	"d"
+
+/* Default local device read ahead pages. */
+#define	LD_RA_PAGES_DEFAULT	8
+
+/* Factor out to dm.[ch] */
+/*
+ * Look up the type belonging to @name in a descriptor table.
+ *
+ * @descr: array of name/type descriptors
+ * @len:   number of elements in @descr
+ * @name:  string compared against each descriptor name through
+ *	   strncmp(STR_LEN(...))
+ *
+ * The table is scanned from its last element down to its first.
+ * Returns the type of the first hit or -ENOENT if nothing matches.
+ */
+int
+dm_descr_type(const struct dm_str_descr *descr, unsigned len, const char *name)
+{
+	unsigned i;
+
+	for (i = len; i > 0; i--) {
+		const struct dm_str_descr *d = descr + i - 1;
+
+		if (strncmp(STR_LEN(name, d->name)) == 0)
+			return d->type;
+	}
+
+	return -ENOENT;
+}
+EXPORT_SYMBOL_GPL(dm_descr_type);
+
+/*
+ * Look up the name belonging to @type in a descriptor table.
+ *
+ * @descr: array of name/type descriptors
+ * @len:   number of elements in @descr
+ * @type:  type value to search for
+ *
+ * The table is scanned from its last element down to its first.
+ * Returns the name of the first hit or NULL if @type is not listed.
+ */
+const char *
+dm_descr_name(const struct dm_str_descr *descr, unsigned len, const int type)
+{
+	unsigned i;
+
+	for (i = len; i > 0; i--) {
+		const struct dm_str_descr *d = descr + i - 1;
+
+		if (d->type == type)
+			return d->name;
+	}
+
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(dm_descr_name);
+/* END Factor out to dm.[ch] */
+
+/*
+ * Global list of replication log contexts (struct replog_c, linked via
+ * lists.replog_c) and the mutex taken around lookups/insertions on it
+ * and on the per replog_c slink_c lists.
+ *
+ * NOTE(review): the mutex has no static initializer here; presumably it
+ * is set up with mutex_init() at module load - confirm.
+ */
+LIST_HEAD(replog_c_list);
+static struct mutex replog_c_list_mutex;
+
+/* REMOVEME: */
+/* Development statistics. */
+struct stats {
+	/* Bios queued by queue_bio(): [0] = non-writes, [1] = writes. */
+	atomic_t io[2];
+	/*
+	 * NOTE(review): only reset in this part of the file; presumably
+	 * counts submitted bios per direction - confirm.
+	 */
+	atomic_t submitted_io[2];
+	/* repl_congested() calls: [0] = not blocked, [1] = blocked. */
+	atomic_t congested_fn[2];
+};
+
+/* Zero every counter pair in @stats. */
+static void
+stats_reset(struct stats *stats)
+{
+	int idx;
+
+	for (idx = 1; idx >= 0; idx--) {
+		atomic_set(&stats->io[idx], 0);
+		atomic_set(&stats->submitted_io[idx], 0);
+		atomic_set(&stats->congested_fn[idx], 0);
+	}
+}
+
+/*
+ * Per site link context.
+ *
+ * Created by slink_c_create(), linked onto its replog_c and
+ * reference counted via @ref.
+ */
+struct slink_c {
+	struct {
+		/* To add to the replog_c list of site link contexts. */
+		struct list_head slink_c;
+		struct list_head dc; /* List of replication device contexts. */
+	} lists;
+
+	/* Reference count (ie. number of devices on this site link) */
+	struct kref ref;
+
+	/* Slink handle. */
+	struct dm_repl_slink *slink;
+
+	/* Replog context. */
+	struct replog_c *replog_c;
+};
+
+/* Global context kept with replicator log. */
+enum replog_c_flags {
+	/*
+	 * Set by do_repl() when the replog io method returns -EWOULDBLOCK,
+	 * cleared by io_callback().
+	 */
+	REPLOG_C_BLOCKED,
+	/* Cleared at allocation time; not used otherwise in this part. */
+	REPLOG_C_DEVEL_STATS,
+	/*
+	 * Set by replog_c_io_get(), cleared by replog_c_io_put()
+	 * when the in flight counter drops to zero.
+	 */
+	REPLOG_C_IO_INFLIGHT
+};
+struct replog_c {
+	struct {
+		struct list_head replog_c;/* To add to global replog_c list. */
+		struct list_head slink_c; /* Site link context elements. */
+	} lists;
+
+	/* Target this context got created for (see replog_c_create()). */
+	struct dm_target *ti;
+
+	/* Reference count (ie. # of slinks * # of devices on this replog) */
+	struct kref ref;
+
+	/* Back pointer to replication log. */
+	struct dm_repl_log *replog;
+	dev_t dev;	/* Replicator control device major:minor. */
+
+	/* Global io housekeeping on site link 0. */
+	struct repl_io {
+		unsigned long flags;	/* I/O state flags. */
+
+		struct bio_list in;	/* Pending bios (central input list).*/
+		spinlock_t in_lock;	/* Protects central input list.*/
+		atomic_t in_flight;	/* In flight io counter. */
+
+		/* IO workqueue. */
+		struct workqueue_struct *wq;
+		struct work_struct ws;	/* Runs do_repl(). */
+
+		/* Statistics. */
+		struct stats stats;
+
+		/* slink+I/O teardown synchronization. */
+		wait_queue_head_t waiters;
+	} io;
+};
+/* Flag accessors (ReplBlocked(), SetReplBlocked(), ClearReplBlocked(), ...). */
+BITOPS(ReplBlocked, replog_c, REPLOG_C_BLOCKED);
+BITOPS(ReplDevelStats, replog_c, REPLOG_C_DEVEL_STATS);
+BITOPS(ReplIoInflight, replog_c, REPLOG_C_IO_INFLIGHT);
+
+/*
+ * Per device replication context kept with any mapped device and
+ * any associated remote device, which doesn't have a local mapping.
+ */
+struct device_c {
+	struct list_head list; /* To add to slink_c dc list. */
+
+	/* Local device ti (i.e. head). */
+	struct dm_target *ti;
+
+	/* replicator control device reference. */
+	struct dm_dev *replicator_dev;
+
+	/* SLINK handle. */
+	struct slink_c *slink_c;
+
+	/* This device's number (as returned by the slink dev_add method). */
+	int number;
+};
+
+/*
+ * IO in flight wait queue handling during suspension.
+ *
+ * Take an io in flight reference: flag the replog_c as having
+ * io in flight and bump the in flight counter.
+ */
+static void
+replog_c_io_get(struct replog_c *replog_c)
+{
+	SetReplIoInflight(replog_c);
+	atomic_inc(&replog_c->io.in_flight);
+}
+
+/*
+ * Drop an io in flight reference.
+ *
+ * Once the counter reaches zero, the in flight flag is cleared
+ * and anybody sleeping on the io wait queue is woken up.
+ */
+static void
+replog_c_io_put(struct replog_c *replog_c)
+{
+	/* Still io in flight -> nothing to signal yet. */
+	if (!atomic_dec_and_test(&replog_c->io.in_flight))
+		return;
+
+	ClearReplIoInflight(replog_c);
+	wake_up(&replog_c->io.waiters);
+}
+
+/*
+ * Get a handle on a replicator log.
+ *
+ * @name: replication log type name (module "dm-repl-log-<name>")
+ * @ti:   target handed through to the log type constructor
+ * @argc/@argv: constructor arguments handed through to the log type
+ *
+ * Returns the new log handle or an ERR_PTR() on failure.
+ */
+static struct dm_repl_log *
+repl_log_ctr(const char *name, struct dm_target *ti,
+	     unsigned int argc, char **argv)
+{
+	int rc;
+	struct dm_repl_log_type *log_type;
+	struct dm_repl_log *handle, *err;
+
+	handle = kzalloc(sizeof(*handle), GFP_KERNEL);
+	if (unlikely(!handle))
+		return ERR_PTR(-ENOMEM);
+
+	/* Load requested replication log module. */
+	if (request_module("dm-repl-log-%s", name) < 0) {
+		DMERR("replication log module for \"%s\" not found", name);
+		err = ERR_PTR(-ENOENT);
+		goto free_handle;
+	}
+
+	log_type = dm_get_type(name, DM_REPLOG);
+	if (unlikely(IS_ERR(log_type))) {
+		DMERR("replication log registry type not found");
+		err = (struct dm_repl_log *) log_type;
+		goto free_handle;
+	}
+
+	/* Type operations are in place before the constructor runs. */
+	handle->ops = log_type;
+	rc = log_type->ctr(handle, ti, argc, argv);
+	if (unlikely(rc < 0)) {
+		DMERR("%s: constructor failed", __func__);
+		dm_put_type(log_type, DM_REPLOG);
+		err = ERR_PTR(rc);
+		goto free_handle;
+	}
+
+	return handle;
+
+free_handle:
+	kfree(handle);
+	return err;
+}
+
+/*
+ * Put a handle on a replicator log.
+ *
+ * Runs the log type destructor, puts the log type (counterpart to the
+ * dm_get_type() in repl_log_ctr()) and frees the handle.
+ */
+static void
+repl_log_dtr(struct dm_repl_log *log, struct dm_target *ti)
+{
+	/* Frees log on last drop. */
+	log->ops->dtr(log, ti);
+	dm_put_type(log->ops, DM_REPLOG);
+	kfree(log);
+}
+
+/*
+ * Create a transient replicator site link handle.
+ *
+ * @name:   site link type name (module "dm-repl-slink-<name>")
+ * @replog: replication log handed through to the slink constructor
+ * @argc/@argv: constructor arguments handed through to the slink type
+ *
+ * Returns the new site link handle or an ERR_PTR() on failure.
+ */
+static struct dm_repl_slink *
+repl_slink_ctr(char *name, struct dm_repl_log *replog,
+	       unsigned argc, char **argv)
+{
+	int rc;
+	struct dm_repl_slink_type *slink_type;
+	struct dm_repl_slink *handle, *err;
+
+	handle = kzalloc(sizeof(*handle), GFP_KERNEL);
+	if (unlikely(!handle))
+		return ERR_PTR(-ENOMEM);
+
+	/* Load requested replication site link module. */
+	if (request_module("dm-repl-slink-%s", name) < 0) {
+		DMERR("replication slink module for \"%s\" not found", name);
+		err = ERR_PTR(-ENOENT);
+		goto free_handle;
+	}
+
+	slink_type = dm_get_type(name, DM_SLINK);
+	if (unlikely(IS_ERR(slink_type))) {
+		DMERR("replication slink registry type not found");
+		err = (struct dm_repl_slink *) slink_type;
+		goto free_handle;
+	}
+
+	rc = slink_type->ctr(handle, replog, argc, argv);
+	if (unlikely(rc < 0)) {
+		DMERR("%s: constructor failed", __func__);
+		dm_put_type(slink_type, DM_SLINK);
+		err = ERR_PTR(rc);
+		goto free_handle;
+	}
+
+	/* Type operations get attached after a successful constructor. */
+	handle->ops = slink_type;
+	return handle;
+
+free_handle:
+	kfree(handle);
+	return err;
+}
+
+/*
+ * Destroy a site link handle: run the slink type destructor, put the
+ * slink type (counterpart to dm_get_type() in repl_slink_ctr()) and
+ * free the handle.
+ */
+static void
+slink_destroy(struct dm_repl_slink *slink)
+{
+	/* Frees slink on last reference drop. */
+	slink->ops->dtr(slink);
+	dm_put_type(slink->ops, DM_SLINK);
+	kfree(slink);
+}
+
+
+/* Wake worker, ie. queue do_repl() on this replog_c's io workqueue. */
+static void do_repl(struct work_struct *ws);
+static void
+wake_do_repl(struct replog_c *replog_c)
+{
+	queue_work(replog_c->io.wq, &replog_c->io.ws);
+}
+
+/*
+ * Called from the replog in case we can queue more bios.
+ *
+ * @read_err/@write_err: not evaluated here
+ * @context: the replog_c registered via io_notify_fn_set()
+ *
+ * Clears the blocked state and kicks the worker.
+ */
+static void
+io_callback(int read_err, int write_err, void *context)
+{
+	struct replog_c *replog_c = context;
+
+	DMDEBUG_LIMIT("%s", __func__);
+	_BUG_ON_PTR(replog_c);
+	ClearReplBlocked(replog_c);
+	wake_do_repl(replog_c);
+}
+
+/*
+ * Get a reference on a replog_c.
+ *
+ * Returns @replog_c to allow for call chaining.
+ */
+static struct replog_c *
+replog_c_get(struct replog_c *replog_c)
+{
+	kref_get(&replog_c->ref);
+	return replog_c;
+}
+
+/*
+ * Destroy replog_c object (kref release callback of replog_c_put()).
+ *
+ * The object must be off the global list and must not
+ * carry site link contexts any more when this runs.
+ */
+static int slink_c_put(struct slink_c *slink_c);
+static void
+replog_c_release(struct kref *ref)
+{
+	struct replog_c *replog_c = container_of(ref, struct replog_c, ref);
+
+	BUG_ON(!list_empty(&replog_c->lists.replog_c));
+	BUG_ON(!list_empty(&replog_c->lists.slink_c));
+	kfree(replog_c);
+}
+
+/*
+ * Release reference on replog_c, releasing resources on last drop.
+ *
+ * Returns the result of kref_put().
+ */
+static int
+replog_c_put(struct replog_c *replog_c)
+{
+	_BUG_ON_PTR(replog_c);
+	return kref_put(&replog_c->ref, replog_c_release);
+}
+
+/*
+ * Look up the replog_c of control device @dev on the global replog
+ * context list and take out a reference on it.
+ *
+ * Call with replog_c_list_mutex held.
+ *
+ * Returns the referenced replog_c or ERR_PTR(-ENOENT) if not listed.
+ */
+static struct replog_c *
+replog_c_get_by_dev(dev_t dev)
+{
+	struct replog_c *cur, *found = NULL;
+
+	list_for_each_entry(cur, &replog_c_list, lists.replog_c) {
+		if (cur->dev == dev) {
+			found = cur;
+			break;
+		}
+	}
+
+	if (!found)
+		return ERR_PTR(-ENOENT);
+
+	return replog_c_get(found);
+}
+
+/*
+ * Get replicator control device major:minor.
+ *
+ * The device number is read straight from the gendisk of the mapped
+ * device owning @ti's table. This avoids taking a temporary
+ * block_device reference via bdget_disk(), whose result can be NULL
+ * and was dereferenced unchecked before.
+ */
+static dev_t
+get_ctrl_dev(struct dm_target *ti)
+{
+	dev_t dev;
+	struct mapped_device *md = dm_table_get_md(ti->table);
+
+	dev = disk_devt(dm_disk(md));
+	dm_put(md);	/* Pairs with dm_table_get_md() above. */
+	return dev;
+}
+
+/*
+ * Allocate a replication control context.
+ *
+ * Sets up the io workqueue, the reference count (initial reference
+ * belongs to the caller), both list heads, the input bio list with
+ * its lock, the worker, the statistics and the teardown wait queue.
+ *
+ * Returns the new context or ERR_PTR(-ENOMEM).
+ */
+static struct replog_c *
+replog_c_alloc(void)
+{
+	struct replog_c *replog_c = kzalloc(sizeof(*replog_c), GFP_KERNEL);
+	struct repl_io *io;
+
+	if (unlikely(!replog_c))
+		return ERR_PTR(-ENOMEM);
+
+	io = &replog_c->io;
+
+	/* Create singlethread workqueue for this replog's io. */
+	io->wq = create_singlethread_workqueue(DAEMON);
+	if (unlikely(!io->wq)) {
+		kfree(replog_c);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	kref_init(&replog_c->ref);
+	/*
+	 * Initialize the global list linkage as well, so that the
+	 * list_empty() check in replog_c_release() is well defined
+	 * for a context which never made it onto the global list.
+	 */
+	INIT_LIST_HEAD(&replog_c->lists.replog_c);
+	INIT_LIST_HEAD(&replog_c->lists.slink_c);
+	ClearReplDevelStats(replog_c);
+	ClearReplBlocked(replog_c);
+	spin_lock_init(&io->in_lock);
+	bio_list_init(&io->in);
+	atomic_set(&io->in_flight, 0);
+	INIT_WORK(&io->ws, do_repl);
+	stats_reset(&io->stats);
+	init_waitqueue_head(&io->waiters);
+	return replog_c;
+}
+
+/*
+ * Create replog_c context.
+ *
+ * Allocates a new context and adds it to the global list unless one
+ * for the same replicator control device is already listed, in which
+ * case the new one is thrown away and the listed one (with a reference
+ * taken by replog_c_get_by_dev()) is returned instead.
+ *
+ * Returns the context or an ERR_PTR() on allocation failure.
+ */
+static struct replog_c *
+replog_c_create(struct dm_target *ti, struct dm_repl_log *replog)
+{
+	dev_t replicator_dev;
+	struct replog_c *replog_c, *replog_c_tmp;
+
+	/* Get replicator control device major:minor. */
+	replicator_dev = get_ctrl_dev(ti);
+
+	/* Allocate and init replog_c object. */
+	replog_c = replog_c_alloc();
+	if (IS_ERR(replog_c))
+		return replog_c;
+
+	/* Add to global replog_c list. */
+	mutex_lock(&replog_c_list_mutex);
+	replog_c_tmp = replog_c_get_by_dev(replicator_dev);
+	if (likely(IS_ERR(replog_c_tmp))) {
+		/* We won any potential race. */
+		/* Set replog global I/O callback and context. */
+		replog->ops->io_notify_fn_set(replog, io_callback,
+					      replog_c);
+		replog_c->dev = replicator_dev;
+		replog_c->ti = ti;
+		replog_c->replog = replog;
+		list_add_tail(&replog_c->lists.replog_c,
+			      &replog_c_list);
+		mutex_unlock(&replog_c_list_mutex);
+	} else {
+		/* Lost a potential race. */
+		mutex_unlock(&replog_c_list_mutex);
+
+		/* Our own object was never listed -> free it directly. */
+		destroy_workqueue(replog_c->io.wq);
+		kfree(replog_c);
+		replog_c = replog_c_tmp;
+	}
+
+	return replog_c;
+}
+
+/*
+ * Search the device list of @slink_c for the device numbered @dev_nr.
+ *
+ * No reference is taken. Returns the device context or
+ * ERR_PTR(-ENOENT) if the site link doesn't carry such a device.
+ */
+static struct device_c *
+device_c_find(struct slink_c *slink_c, unsigned dev_nr)
+{
+	struct device_c *cur, *found = ERR_PTR(-ENOENT);
+
+	list_for_each_entry(cur, &slink_c->lists.dc, list) {
+		if (dev_nr == cur->number) {
+			found = cur;
+			break;
+		}
+	}
+
+	return found;
+}
+
+/*
+ * Get a reference on an slink_c.
+ *
+ * Returns @slink_c to allow for call chaining.
+ */
+static struct slink_c *
+slink_c_get(struct slink_c *slink_c)
+{
+	kref_get(&slink_c->ref);
+	return slink_c;
+}
+
+/*
+ * Look up the site link context numbered @slink_nr on the slink list
+ * of @replog_c and take out a reference on it.
+ *
+ * Returns the referenced slink_c or ERR_PTR(-ENOENT) if not listed.
+ */
+static struct slink_c *
+slink_c_get_by_number(struct replog_c *replog_c, int slink_nr)
+{
+	struct slink_c *cur;
+
+	list_for_each_entry(cur, &replog_c->lists.slink_c, lists.slink_c) {
+		struct dm_repl_slink *slink = cur->slink;
+
+		if (slink->ops->slink_number(slink) != slink_nr)
+			continue;
+
+		return slink_c_get(cur);
+	}
+
+	return ERR_PTR(-ENOENT);
+}
+
+/*
+ * Create a site link context for @slink on @replog_c.
+ *
+ * Adds @slink to the replication log and links the new context onto
+ * the replog_c slink list unless a context with the same site link
+ * number is already listed; in that case the new context is freed and
+ * the listed one (referenced by slink_c_get_by_number()) is returned.
+ *
+ * Returns the context or an ERR_PTR() on failure.
+ *
+ * NOTE(review): on a lost race, the slink_add() done before is not
+ * undone here - confirm this is handled by the caller.
+ */
+static struct slink_c *
+slink_c_create(struct replog_c *replog_c, struct dm_repl_slink *slink)
+{
+	int r, slink_nr = slink->ops->slink_number(slink);
+	struct slink_c *slink_c, *slink_c_tmp;
+	struct dm_repl_log *replog = replog_c->replog;
+
+	BUG_ON(slink_nr < 0);
+	DMDEBUG("%s creating slink_c for site link=%d", __func__, slink_nr);
+
+	slink_c = kzalloc(sizeof(*slink_c), GFP_KERNEL);
+	if (unlikely(!slink_c))
+		return ERR_PTR(-ENOMEM);
+
+	r = replog->ops->slink_add(replog, slink);
+	if (unlikely(r < 0)) {
+		kfree(slink_c);
+		return ERR_PTR(r);
+	}
+
+	DMDEBUG("%s added site link=%d", __func__,
+		slink->ops->slink_number(slink));
+
+	kref_init(&slink_c->ref);
+	INIT_LIST_HEAD(&slink_c->lists.dc);
+	slink_c->replog_c = replog_c;
+	slink_c->slink = slink;
+
+	/* Check creation race and add to per replog_c slink_c list. */
+	mutex_lock(&replog_c_list_mutex);
+	slink_c_tmp = slink_c_get_by_number(replog_c, slink_nr);
+	if (likely(IS_ERR(slink_c_tmp)))
+		list_add_tail(&slink_c->lists.slink_c,
+			      &replog_c->lists.slink_c);
+	else {
+		/* Lost the race: hand out the already listed context. */
+		kfree(slink_c);
+		slink_c = slink_c_tmp;
+	}
+
+	mutex_unlock(&replog_c_list_mutex);
+	return slink_c;
+}
+
+/*
+ * Destroy slink_c object (kref release callback of slink_c_put()).
+ *
+ * All devices must have been removed from the
+ * site link context's device list by now.
+ */
+static void
+slink_c_release(struct kref *ref)
+{
+	struct slink_c *slink_c = container_of(ref, struct slink_c, ref);
+
+	BUG_ON(!list_empty(&slink_c->lists.dc));
+	kfree(slink_c);
+}
+
+/*
+ * Release reference on slink_c, releasing resources on last drop.
+ *
+ * Returns the result of kref_put().
+ */
+static int
+slink_c_put(struct slink_c *slink_c)
+{
+	return kref_put(&slink_c->ref, slink_c_release);
+}
+
+/*
+ * Report an error depending on the way the constructor got called:
+ * store @msg in ti->error for table constructor calls and
+ * log it via DMERR() otherwise.
+ */
+enum ctr_call_type { CTR_CALL, MESSAGE_CALL };
+static void
+ti_or_dmerr(enum ctr_call_type call_type, struct dm_target *ti, char *msg)
+{
+	switch (call_type) {
+	case CTR_CALL:
+		ti->error = msg;
+		break;
+	default:
+		DMERR("%s", msg);
+		break;
+	}
+}
+
+/*
+ * Check, if @str is listed on variable (const char *) list of strings.
+ *
+ * The argument list has to be terminated by a NULL pointer.
+ * Only the first strlen(@str) characters get compared, ie. @str
+ * matches any list member it is a prefix of.
+ *
+ * Returns 1 for found on list and 0 for failure.
+ */
+static int
+str_listed(const char *str, ...)
+{
+	int found = 0;
+	const char *candidate;
+	va_list ap;
+
+	va_start(ap, str);
+
+	for (candidate = va_arg(ap, const char *); candidate;
+	     candidate = va_arg(ap, const char *)) {
+		if (strncmp(str, candidate, strlen(str)) == 0) {
+			found = 1;
+			break;
+		}
+	}
+
+	va_end(ap);
+	return found;
+}
+
+/*
+ * Worker thread.
+ *
+ * o work on all new queued bios io'ing them to the REPLOG
+ * o break out if replog reports -EWOULDBLOCK until called back
+ *
+ * Does nothing while the replog_c is flagged blocked; io_callback()
+ * clears that flag and requeues this worker.
+ */
+static void
+do_repl(struct work_struct *ws)
+{
+	struct replog_c *replog_c = container_of(ws, struct replog_c, io.ws);
+	struct dm_repl_log *replog = replog_c->replog;
+	struct bio *bio;
+	struct bio_list ios;
+
+	_BUG_ON_PTR(replog);
+
+	if (ReplBlocked(replog_c))
+		return;
+
+	bio_list_init(&ios);
+
+	/* Quickly grab all (new) input bios queued. */
+	spin_lock(&replog_c->io.in_lock);
+	bio_list_merge(&ios, &replog_c->io.in);
+	bio_list_init(&replog_c->io.in);
+	spin_unlock(&replog_c->io.in_lock);
+
+	/* Work all deferred or new bios on work list. */
+	while ((bio = bio_list_pop(&ios))) {
+		int r = replog->ops->io(replog, bio, 0);
+
+		if (r == -EWOULDBLOCK) {
+			SetReplBlocked(replog_c);
+			/* REMOVEME: */
+			DMDEBUG_LIMIT("%s SetReplBlocked", __func__);
+
+			/* Push non-processed bio back to the work list. */
+			bio_list_push(&ios, bio);
+
+			/*
+			 * Merge non-processed bios
+			 * back to the input list head.
+			 */
+			spin_lock(&replog_c->io.in_lock);
+			bio_list_merge_head(&replog_c->io.in, &ios);
+			spin_unlock(&replog_c->io.in_lock);
+
+			break;
+		} else
+			/* Any other non-zero result is treated as fatal. */
+			BUG_ON(r);
+	}
+}
+
+/*
+ * Replication congested function.
+ *
+ * @congested_data: the device context set up by dc_set_bdi()
+ * @bdi_bits: not evaluated here
+ *
+ * Returns 1 if the replog_c is flagged blocked, else 0.
+ */
+static int
+repl_congested(void *congested_data, int bdi_bits)
+{
+	int r;
+	struct device_c *dc = congested_data;
+	struct replog_c *replog_c;
+
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	r = !!ReplBlocked(replog_c);
+
+	/* REMOVEME: statistics. */
+	atomic_inc(&replog_c->io.stats.congested_fn[r]);
+	return r;
+}
+
+/*
+ * Set backing device congested function of a local replicated device.
+ *
+ * Installs repl_congested() with @dc as its data on the request
+ * queue's backing_dev_info of the mapped device owning dc->ti.
+ */
+static void
+dc_set_bdi(struct device_c *dc)
+{
+	struct mapped_device *md = dm_table_get_md(dc->ti->table);
+	struct backing_dev_info *bdi = &dm_disk(md)->queue->backing_dev_info;
+
+	/* Set congested function and data. */
+	bdi->congested_fn = repl_congested;
+	bdi->congested_data = dc;
+	dm_put(md);
+}
+
+/*
+ * Get device on slink and unlink it from the list of devices.
+ *
+ * @dc:       device context supplying the device number and the replog
+ * @slink_nr: number of the site link to pull the device out of
+ * @dc_list:  list the device context found gets moved to
+ *
+ * Called with replog_c_list_mutex held by _replicator_dev_dtr().
+ *
+ * Returns the device context moved to @dc_list or an ERR_PTR().
+ */
+static struct device_c *
+dev_get_del(struct device_c *dc, int slink_nr, struct list_head *dc_list)
+{
+	int dev_nr;
+	struct slink_c *slink_c;
+	struct dm_repl_slink *slink;
+	struct dm_repl_log *replog;
+	struct replog_c *replog_c;
+
+	/* REMOVEME: */
+	_BUG_ON_PTR(dc);
+	dev_nr = dc->number;
+	BUG_ON(dev_nr < 0);
+	slink_c = dc->slink_c;
+	_BUG_ON_PTR(slink_c);
+	slink = slink_c->slink;
+	_BUG_ON_PTR(slink);
+	replog_c = slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* Get the slink by number. */
+	slink = slink->ops->slink(replog, slink_nr);
+	if (IS_ERR(slink))
+		return (struct device_c *) slink;
+
+	/* Takes a temporary reference, which gets dropped below. */
+	slink_c = slink_c_get_by_number(replog_c, slink_nr);
+	if (IS_ERR(slink_c))
+		return (struct device_c *) slink_c;
+
+	dc = device_c_find(slink_c, dev_nr);
+	if (IS_ERR(dc))
+		DMERR("No device %d on slink %d", dev_nr, slink_nr);
+	else
+		list_move(&dc->list, dc_list);
+
+	/* The temporary reference must never be the last one. */
+	BUG_ON(slink_c_put(slink_c));
+	return dc;
+}
+
+/*
+ * Free device and put references.
+ *
+ * @dc:       device context already unlinked from its slink_c list
+ * @slink_nr: site link number, used for the error message only
+ *
+ * Returns the result of the slink dev_del method.
+ *
+ * NOTE(review): @dc is freed unconditionally, but the replicator
+ * control device and slink_c references are only dropped if dev_del
+ * succeeded; on failure they seem to be lost - confirm intent.
+ */
+static int
+dev_free_put(struct device_c *dc, int slink_nr)
+{
+	int r;
+	struct slink_c *slink_c;
+	struct dm_repl_slink *slink;
+
+	/* REMOVEME: */
+	_BUG_ON_PTR(dc);
+	BUG_ON(dc->number < 0);
+	BUG_ON(slink_nr < 0);
+	slink_c = dc->slink_c;
+	_BUG_ON_PTR(slink_c);
+	slink = slink_c->slink;
+	_BUG_ON_PTR(slink);
+
+	/* Delete device from slink. */
+	r = slink->ops->dev_del(slink, dc->number);
+	if (r < 0) {
+		DMERR("Error %d deleting device %d from "
+		      "site link %d", r, dc->number, slink_nr);
+	} else
+		/* Drop reference on replicator control device. */
+		dm_put_device(dc->ti, dc->replicator_dev);
+
+	kfree(dc);
+
+	if (!r)
+		/* Drop reference on slink_c, freeing it on last one. */
+		BUG_ON(slink_c_put(slink_c));
+
+	return r;
+}
+
+/*
+ * Replication device "replicator-dev" destructor method.
+ *
+ * Either on slink0 in case slink_nr == 0 for mapped devices;
+ * the whole chain of LD + its RDs will be deleted
+ * -or-
+ * on slink > 0 in case of message interface calls (just one RD)
+ *
+ * Works in two steps: device contexts get pulled off their site link
+ * lists onto a private list with replog_c_list_mutex held and are
+ * freed afterwards without the lock.
+ *
+ * Expects ti->private to hold a valid device context.
+ *
+ * Returns 1 if any device got pulled out, else 0.
+ */
+static int
+_replicator_dev_dtr(struct dm_target *ti, int slink_nr)
+{
+	int r;
+	struct device_c *dc = ti->private, *dc_tmp, *dc_n;
+	struct slink_c *slink_c, *slink_c_n;
+	struct replog_c *replog_c;
+	struct dm_repl_slink *slink;
+	struct list_head dc_list;
+
+	BUG_ON(slink_nr < 0);
+	_BUG_ON_PTR(dc);
+	INIT_LIST_HEAD(&dc_list);
+	slink_c = dc->slink_c;
+	_BUG_ON_PTR(slink_c);
+	replog_c = slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+
+	/* First pull device out on all slinks holding lock. */
+	mutex_lock(&replog_c_list_mutex);
+	/* Call from message interface with slink_nr > 0. */
+	if (slink_nr)
+		dev_get_del(dc, slink_nr, &dc_list);
+	else {
+		/* slink number 0 -> delete LD and any RDs. */
+		list_for_each_entry_safe(slink_c, slink_c_n,
+					 &replog_c->lists.slink_c,
+					 lists.slink_c) {
+			slink = slink_c->slink;
+			_BUG_ON_PTR(slink);
+			slink_nr = slink->ops->slink_number(slink);
+			BUG_ON(slink_nr < 0);
+			dev_get_del(dc, slink_nr, &dc_list);
+		}
+	}
+
+	mutex_unlock(&replog_c_list_mutex);
+
+	r = !list_empty(&dc_list);
+
+	/* Now delete devices on pulled out list. */
+	list_for_each_entry_safe(dc_tmp, dc_n, &dc_list, list) {
+		slink = dc_tmp->slink_c->slink;
+		dev_free_put(dc_tmp, slink->ops->slink_number(slink));
+	}
+
+	ti->private = NULL;
+	return r;
+}
+
+/*
+ * Replicator device destructor. Autodestructs devices on slink > 0.
+ *
+ * The result of _replicator_dev_dtr() is ignored.
+ */
+static void
+replicator_dev_dtr(struct dm_target *ti)
+{
+	_replicator_dev_dtr(ti, 0); /* Slink 0 device destruction. */
+}
+
+/* Construct a local/remote device. */
+/*
+ * slink_nr #dev_params dev_path dirty_log_type #dirty_log_params ...
+ *
+ * [0 1 /dev/mapper/local_device \	# local device being replicated
+ * nolog 0]{1..N}			# no dirty log with local devices
+ *
+ * @call_type:       selects error reporting (see ti_or_dmerr())
+ * @ti:              target the device belongs to
+ * @replog_c:        replication log context to look up the site link on
+ * @replicator_path: path of the replicator control device
+ * @dev_nr:          device number handed to the slink dev_add method
+ * @argc/@argv:      remaining arguments, starting at the slink number
+ * @args_used:       set to the number of arguments consumed on success
+ *
+ * On success, the new device context holds a reference on the site
+ * link context and on the replicator control device.
+ *
+ * Returns 0 on success or a negative errno.
+ */
+#define	MIN_DEV_ARGS	5
+static int
+device_ctr(enum ctr_call_type call_type, struct dm_target *ti,
+	   struct replog_c *replog_c,
+	   const char *replicator_path, unsigned dev_nr,
+	   unsigned argc, char **argv, unsigned *args_used)
+{
+	int dev_params, dirtylog_params, params, r, slink_nr;
+	struct dm_repl_slink *slink;	/* Site link handle. */
+	struct slink_c *slink_c;	/* Site link context. */
+	struct device_c *dc;		/* Replication device context. */
+
+	SHOW_ARGV;
+
+	if (argc < MIN_DEV_ARGS) {
+		ti_or_dmerr(call_type, ti, "Not enough device arguments");
+		return -EINVAL;
+	}
+
+	/* Get slink number. */
+	params = 0;
+	if (unlikely(sscanf(argv[params], "%d", &slink_nr) != 1 ||
+		     slink_nr < 0)) {
+		ti_or_dmerr(call_type, ti,
+			    "Invalid site link number argument");
+		return -EINVAL;
+	}
+
+	/* Get #dev_params. */
+	params++;
+	if (unlikely(sscanf(argv[params], "%d", &dev_params) != 1 ||
+		     dev_params < 0 ||
+		     dev_params  + 4 > argc)) {
+		ti_or_dmerr(call_type, ti,
+			    "Invalid device parameter number argument");
+		return -EINVAL;
+	}
+
+	/* Get #dirtylog_params. */
+	/* Skip the device parameters and the dirty log type name. */
+	params += dev_params + 2;
+	if (unlikely(sscanf(argv[params], "%d", &dirtylog_params) != 1 ||
+		     dirtylog_params < 0 ||
+		     params + dirtylog_params + 1 > argc)) {
+		ti_or_dmerr(call_type, ti,
+			    "Invalid dirtylog parameter number argument");
+		return -EINVAL;
+	}
+
+	/* Check that all parameters are sane. */
+	/* Argument count as passed to dev_add, ie. without slink number. */
+	params = dev_params + dirtylog_params + 3;
+	if (params > argc) {
+		ti_or_dmerr(call_type, ti,
+			    "Invalid device/dirtylog argument count");
+		return -EINVAL;
+	}
+
+	/* Get SLINK handle. */
+	mutex_lock(&replog_c_list_mutex);
+	slink_c = slink_c_get_by_number(replog_c, slink_nr);
+	mutex_unlock(&replog_c_list_mutex);
+
+	if (unlikely(IS_ERR(slink_c))) {
+		ti_or_dmerr(call_type, ti, "Cannot find site link context");
+		return -ENOENT;
+	}
+
+	slink = slink_c->slink;
+	_BUG_ON_PTR(slink);
+
+	/* Allocate replication context for new device. */
+	dc = kzalloc(sizeof(*dc), GFP_KERNEL);
+	if (unlikely(!dc)) {
+		ti_or_dmerr(call_type, ti, "Cannot allocate device context");
+		BUG_ON(slink_c_put(slink_c));
+		return -ENOMEM;
+	}
+
+	INIT_LIST_HEAD(&dc->list);
+	dc->slink_c = slink_c;
+	dc->ti = ti;
+
+	/*
+	 * Get reference on replicator control device.
+	 *
+	 * Dummy start/size sufficient here.
+	 */
+	r = dm_get_device(ti, replicator_path, 0, 1,
+			  FMODE_WRITE, &dc->replicator_dev);
+	if (unlikely(r < 0)) {
+		ti_or_dmerr(call_type, ti,
+			    "Can't access replicator control device");
+		goto err_slink_put;
+	}
+
+	/* Add device to slink. */
+	/*
+	 * ti->split_io for all local devices must be set
+	 * to the unique region_size of the remote devices.
+	 */
+	r = slink->ops->dev_add(slink, dev_nr, ti, params, argv + 1);
+	if (unlikely(r < 0)) {
+		ti_or_dmerr(call_type, ti, r == -EEXIST ?
+			"device already in use on site link" :
+			"Failed to add device to site link");
+		goto err_device_put;
+	}
+
+	/* A non-negative dev_add result is kept as the device number. */
+	dc->number = r;
+
+	/* Only set bdi properties on local devices. */
+	if (!slink_nr) {
+		/* Preset, will be set to region size in the slink code. */
+		ti->split_io = DM_REPL_MIN_SPLIT_IO;
+
+		/*
+		 * Init ti reference on slink0 devices only,
+		 * because they only have a local mapping!
+		 */
+		ti->private = dc;
+		dc_set_bdi(dc);
+	}
+
+	/* Add dc to slink_c list. */
+	mutex_lock(&replog_c_list_mutex);
+	list_add_tail(&dc->list, &slink_c->lists.dc);
+	mutex_unlock(&replog_c_list_mutex);
+
+	/* dev_add argument count plus the leading slink number. */
+	*args_used = dev_params + dirtylog_params + 4;
+	DMDEBUG("%s added device=%d to site link=%u", __func__,
+		r, slink->ops->slink_number(slink));
+	return 0;
+
+err_device_put:
+	dm_put_device(ti, dc->replicator_dev);
+err_slink_put:
+	BUG_ON(slink_c_put(slink_c));
+	kfree(dc);
+	return r;
+}
+
+/*
+ * Replication device "replicator-dev" constructor method.
+ *
+ * <start> <length> replicator-dev
+ *         <replicator_device> <dev_nr>		\
+ *         [<slink_nr> <#dev_params> <dev_params>
+ *          <dlog_type> <#dlog_params> <dlog_params>]{1..N}
+ *
+ * <replicator_device> = device previously constructed via "replication" target
+ * <dev_nr>	    = An integer that is used to 'tag' write requests as
+ *		      belonging to a particular set of devices - specifically,
+ *		      the devices that follow this argument (i.e. the site
+ *		      link devices).
+ * <slink_nr>	    = This number identifies the site/location where the next
+ *		      device to be specified comes from.  It is exactly the
+ *		      same number used to identify the site/location (and its
+ *		      policies) in the "replicator" target.  Interestingly,
+ *		      while one might normally expect a "dev_type" argument
+ *		      here, it can be deduced from the site link number and
+ *		      the 'slink_type' given in the "replication" target.
+ * <#dev_params>    = '1'  (The number of allowed parameters actually depends
+ *		      on the 'slink_type' given in the "replication" target.
+ *		      Since our only option there is "blockdev", the only
+ *		      allowable number here is currently '1'.)
+ * <dev_params>	    = 'dev_path'  (Again, since "blockdev" is the only
+ *		      'slink_type' available, the only allowable argument here
+ *		      is the path to the device.)
+ * <dlog_type>	    = Not to be confused with the "replicator log", this is
+ *		      the type of dirty log associated with this particular
+ *		      device.  Dirty logs are used for synchronization, during
+ *		      initialization or fall behind conditions, to bring devices
+ *		      into a coherent state with its peers - analogous to
+ *		      rebuilding a RAID1 (mirror) device.  Available dirty
+ *		      log types include: 'nolog', 'core', and 'disk'
+ * <#dlog_params>   = The number of arguments required for a particular log
+ *		      type - 'nolog' = 0, 'core' = 1/2, 'disk' = 2/3.
+ * <dlog_params>    = 'nolog' => ~no arguments~
+ *		      'core'  => <region_size> [sync | nosync]
+ *		      'disk'  => <dlog_dev_path> <region_size> [sync | nosync]
+ *	<region_size>   = This sets the granularity at which the dirty log
+ *			  tracks what areas of the device is in-sync.
+ *	[sync | nosync] = Optionally specify whether the sync should be forced
+ *			  or avoided initially.
+ */
+#define LOG_ARGS 2
+#define DEV_MIN_ARGS 5
+/*
+ * Common "replicator-dev" constructor for table and message calls.
+ *
+ * @call_type:  CTR_CALL for table construction, MESSAGE_CALL otherwise;
+ *		selects error reporting and error teardown
+ * @ti:         target being constructed
+ * @argc/@argv: <replicator_device> <dev_nr> followed by 1..N
+ *		slink/device/dirty log tuples
+ *
+ * Returns 0 on success or a negative errno.
+ */
+static int
+_replicator_dev_ctr(enum ctr_call_type call_type, struct dm_target *ti,
+		    unsigned argc, char **argv)
+{
+	int r, tmp;
+	unsigned args_used, dev_nr;	/* args_used matches device_ctr(). */
+	char *replicator_path;
+	struct dm_dev *ctrl_dev;
+	struct replog_c *replog_c;
+
+	SHOW_ARGV;
+
+	if (argc < LOG_ARGS + DEV_MIN_ARGS)
+		goto err_args;
+
+	/* Only touch argv after the argument count got checked. */
+	replicator_path = argv[0];
+
+	/*
+	 * Get reference on replicator control device.
+	 *
+	 * Dummy start/size sufficient here.
+	 */
+	r = dm_get_device(ti, replicator_path, 0, 1, FMODE_WRITE, &ctrl_dev);
+	if (unlikely(r < 0)) {
+		/* Report according to call type like all other errors. */
+		ti_or_dmerr(call_type, ti,
+			    "Can't access replicator control device");
+		return r;
+	}
+
+	if (sscanf(argv[1], "%d", &tmp) != 1 ||
+	    tmp < 0) {
+		dm_put_device(ti, ctrl_dev);
+		ti_or_dmerr(call_type, ti, "Invalid device number argument");
+		return -EINVAL;
+	}
+
+	dev_nr = tmp;
+
+	/* Find precreated replog context by device, taking out a reference. */
+	mutex_lock(&replog_c_list_mutex);
+	replog_c = replog_c_get_by_dev(ctrl_dev->bdev->bd_dev);
+	mutex_unlock(&replog_c_list_mutex);
+
+	if (unlikely(IS_ERR(replog_c))) {
+		dm_put_device(ti, ctrl_dev);
+		ti_or_dmerr(call_type, ti, "Failed to find replication log");
+		return PTR_ERR(replog_c);
+	}
+
+	_BUG_ON_PTR(replog_c->replog);
+	argc -= LOG_ARGS;
+	argv += LOG_ARGS;
+
+	/*
+	 * Iterate all slinks/rds if multiple device/dirty
+	 * log tuples present on mapping table line.
+	 */
+	while (argc >= DEV_MIN_ARGS) {
+		/* Create slink+device context. */
+		r = device_ctr(call_type, ti, replog_c, replicator_path,
+			       dev_nr, argc, argv, &args_used);
+		if (unlikely(r))
+			goto device_ctr_err;
+
+		BUG_ON(args_used > argc);
+		argc -= args_used;
+		argv += args_used;
+	}
+
+	/* All arguments consumed? */
+	if (argc) {
+		r = -EINVAL;
+		goto invalid_args;
+	}
+
+	/* Drop initially taken replog reference. */
+	BUG_ON(replog_c_put(replog_c));
+	dm_put_device(ti, ctrl_dev);
+	return 0;
+
+invalid_args:
+	ti_or_dmerr(call_type, ti, "Invalid device arguments");
+device_ctr_err:
+	/* Drop the initially taken replog reference. */
+	BUG_ON(replog_c_put(replog_c));
+	dm_put_device(ti, ctrl_dev);
+
+	/*
+	 * If we get an error in ctr -> tear down.
+	 *
+	 * The destructor dereferences ti->private, which only gets set
+	 * once a slink 0 device was constructed; without one there is
+	 * nothing it could tear down.
+	 */
+	if (call_type == CTR_CALL && ti->private)
+		replicator_dev_dtr(ti);
+
+	return r;
+
+err_args:
+	ti_or_dmerr(call_type, ti, "Not enough device arguments");
+	return -EINVAL;
+}
+
+/*
+ * Constructor method.
+ *
+ * Table constructor entry point, ie. errors get reported via ti->error.
+ */
+static int
+replicator_dev_ctr(struct dm_target *ti, unsigned argc, char **argv)
+{
+	return _replicator_dev_ctr(CTR_CALL, ti, argc, argv);
+}
+
+/*
+ * Device flush method.
+ *
+ * Hands the flush down to the replication log, which
+ * is required to provide a flush method.
+ */
+static void
+replicator_dev_flush(struct dm_target *ti)
+{
+	struct device_c *dc = ti->private;
+	struct dm_repl_log *replog;
+
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	_BUG_ON_PTR(dc->slink_c->replog_c);
+	replog = dc->slink_c->replog_c->replog;
+	_BUG_ON_PTR(replog);
+	BUG_ON(!replog->ops->flush);
+	replog->ops->flush(replog);
+}
+
+/*
+ * Queues bios to the central input list and wakes up worker thread.
+ *
+ * An io in flight reference is taken per bio queued;
+ * replicator_dev_endio() drops it again.
+ */
+static inline void
+queue_bio(struct device_c *dc, struct bio *bio)
+{
+	struct replog_c *replog_c = dc->slink_c->replog_c;
+
+	/* REMOVEME: statistics */
+	/* Index 1 counts writes, index 0 anything else. */
+	atomic_inc(replog_c->io.stats.io + !!(bio_data_dir(bio) == WRITE));
+
+	spin_lock(&replog_c->io.in_lock);
+	bio_list_add(&replog_c->io.in, bio);
+	replog_c_io_get(replog_c);
+	spin_unlock(&replog_c->io.in_lock);
+
+	/* Wakeup worker to deal with bio input list. */
+	wake_do_repl(replog_c);
+}
+
+/*
+ * Map a replicated device io by handling it in the worker
+ * thread in order to avoid delays in the fast path.
+ *
+ * The bio's private pointer is remembered in the map context before
+ * the bio gets queued. Always returns DM_MAPIO_SUBMITTED.
+ */
+static int
+replicator_dev_map(struct dm_target *ti, struct bio *bio,
+		   union map_info *map_context)
+{
+	map_context->ptr = bio->bi_private;
+	bio->bi_sector -= ti->begin;	/* Remap sector to target begin. */
+	queue_bio(ti->private, bio);	/* Queue bio to the worker. */
+	return DM_MAPIO_SUBMITTED;	/* Handle later. */
+}
+
+
+/* Replication device suspend/resume helper. */
+enum suspend_resume_type { POSTSUSPEND, RESUME };
+/*
+ * Call the postsuspend or resume method for this target's
+ * device number on every site link of its replication log.
+ *
+ * @ti:   target whose private device context identifies the device
+ * @type: POSTSUSPEND or RESUME
+ *
+ * Failures on single site links get logged. On resume, the worker
+ * is woken if at least one site link resumed successfully.
+ */
+static void
+_replicator_dev_suspend_resume(struct dm_target *ti,
+			       enum suspend_resume_type type)
+{
+	struct device_c *dc = ti->private;
+	struct replog_c *replog_c;
+	struct slink_c *slink_c, *n;
+	int dev_nr, slinks = 0;
+
+	DMDEBUG("%s %s", __func__,
+		type == RESUME ? "resume" : "postsuspend");
+	_BUG_ON_PTR(dc);
+	/* Only dereference dc after it got validated. */
+	dev_nr = dc->number;
+	_BUG_ON_PTR(dc->slink_c);
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	BUG_ON(dev_nr < 0);
+
+	/* Suspend/resume device on all slinks. */
+	list_for_each_entry_safe(slink_c, n, &replog_c->lists.slink_c,
+				 lists.slink_c) {
+		int r;
+		struct dm_repl_slink *slink = slink_c->slink;
+
+		_BUG_ON_PTR(slink);
+
+		r = type == RESUME ?
+			slink->ops->resume(slink, dev_nr) :
+			slink->ops->postsuspend(slink, dev_nr);
+		if (r < 0)
+			DMERR("Error %d %s device=%d on site link %u",
+			      r, type == RESUME ?
+			      "resuming" : "postsuspending",
+			      dev_nr, slink->ops->slink_number(slink));
+		else
+			slinks++;
+	}
+
+	if (type == RESUME && slinks)
+		wake_do_repl(replog_c);
+}
+
+/* Replication device post suspend method: postsuspend on all site links. */
+static void
+replicator_dev_postsuspend(struct dm_target *ti)
+{
+	const enum suspend_resume_type type = POSTSUSPEND;
+
+	_replicator_dev_suspend_resume(ti, type);
+}
+
+/* Replication device resume method: resume on all site links. */
+static void
+replicator_dev_resume(struct dm_target *ti)
+{
+	const enum suspend_resume_type type = RESUME;
+
+	_replicator_dev_suspend_resume(ti, type);
+}
+
+/*
+ * End io method.
+ *
+ * Passes the call down to the replication log and to the site link
+ * if they implement an endio operation, then drops one io reference
+ * on the replication log context.
+ *
+ * Returns the site link's result if it is negative,
+ * else the replication log's result.
+ */
+static int
+replicator_dev_endio(struct dm_target *ti, struct bio *bio,
+		     int error, union map_info *map_context)
+{
+	int rr = 0, rs = 0;
+	struct device_c *dc = ti->private;
+	struct replog_c *replog_c;
+	struct dm_repl_log *replog;
+	struct dm_repl_slink *slink;
+
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	slink = dc->slink_c->slink;
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* Both endio operations are optional. */
+	if (replog->ops->endio)
+		rr = replog->ops->endio(replog, bio, error, map_context);
+
+	if (slink->ops->endio)
+		rs = slink->ops->endio(slink, bio, error, map_context);
+
+	replog_c_io_put(replog_c);
+
+	if (rs < 0)
+		return rs;
+
+	return rr;
+}
+
+/*
+ * Replication device message method.
+ *
+ * Messages handled:
+ *
+ * device add <constructor arguments>	# handed to the shared device
+ *					# constructor helper, e.g.:
+ *   63:4 0 \		# replication log on 63:4 and device number '0'
+ *   [0 1 /dev/mapper/local_device \	# local device being replicated
+ *   nolog 0]{1..N}			# no dirty log with local devices
+ *
+ * device del <slink_nr>		# <slink_nr> must be > 0
+ *
+ * start/resume				# resume this device
+ * stop/suspend/postsuspend		# postsuspend this device
+ *
+ * Returns 0 or the helper's result on success, -EINVAL on
+ * unknown commands, invalid or too few arguments.
+ */
+static int
+replicator_dev_message(struct dm_target *ti, unsigned argc, char **argv)
+{
+	int slink_nr;
+	struct device_c *dc = ti->private;
+	struct replog_c *replog_c;
+	struct dm_repl_log *replog;
+
+	SHOW_ARGV;
+
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* A command is mandatory. */
+	if (unlikely(argc < 1))
+		goto err_args;
+
+	/* Add/delete a device to/from a site link. */
+	if (str_listed(argv[0], "device", NULL)) {
+		if (argc < 2)
+			goto err_args;
+
+		/* We've got the target index of an SLINK0 device here. */
+		if (str_listed(argv[1], "add", NULL))
+			return _replicator_dev_ctr(MESSAGE_CALL, ti,
+						   argc - 2, argv + 2);
+
+		if (!str_listed(argv[1], "del", NULL))
+			DM_EINVAL("invalid device command argument");
+
+		/* Deletion needs the site link number. */
+		if (argc < 3)
+			goto err_args;
+
+		if (sscanf(argv[2], "%d", &slink_nr) != 1 ||
+		    slink_nr < 1)
+			DM_EINVAL("invalid site link number "
+				  "argument; must be > 0");
+
+		return _replicator_dev_dtr(ti, slink_nr);
+	}
+
+	/* Start replication on single device on all slinks. */
+	if (str_listed(argv[0], "start", "resume", NULL)) {
+		replicator_dev_resume(ti);
+		return 0;
+	}
+
+	/* Stop replication for single device on all slinks. */
+	if (str_listed(argv[0], "stop", "suspend", "postsuspend", NULL)) {
+		replicator_dev_postsuspend(ti);
+		return 0;
+	}
+
+	DM_EINVAL("invalid message command");
+
+err_args:
+	DM_EINVAL("too few message arguments");
+}
+
+/*
+ * Replication device status output method.
+ *
+ * Emits the replication log's device number and this device's number,
+ * followed by whatever the device's site link reports as status.
+ *
+ * The scratch buffer is static in order to keep it off the stack, so
+ * every use of it is serialized by a lock of its own; status calls
+ * for different devices would corrupt each other's output otherwise.
+ *
+ * Always returns 0.
+ */
+static int
+replicator_dev_status(struct dm_target *ti, status_type_t type,
+		      char *result, unsigned maxlen)
+{
+	ssize_t sz = 0;
+	static char buffer[2048];
+	static DEFINE_MUTEX(buffer_lock);
+	struct device_c *dc = ti->private;
+	struct replog_c *replog_c;
+	struct dm_repl_slink *slink;
+
+	mutex_lock(&buffer_lock);
+
+	mutex_lock(&replog_c_list_mutex);
+	_BUG_ON_PTR(dc);
+	_BUG_ON_PTR(dc->slink_c);
+	slink = dc->slink_c->slink;
+	_BUG_ON_PTR(slink);
+	replog_c = dc->slink_c->replog_c;
+	_BUG_ON_PTR(replog_c);
+
+	DMEMIT("%s %d ", format_dev_t(buffer, replog_c->dev), dc->number);
+	mutex_unlock(&replog_c_list_mutex);
+
+	/* Don't emit the device string again if slink reports nothing. */
+	*buffer = 0;
+	slink->ops->status(slink, dc->number, type, buffer, sizeof(buffer));
+	DMEMIT("%s", buffer);
+
+	mutex_unlock(&buffer_lock);
+	return 0;
+}
+
+/*
+ * Replicator device interface.
+ *
+ * Method table of the "replicator-dev" target, which gets
+ * registered with the dm core in dm_repl_init().
+ */
+static struct target_type replicator_dev_target = {
+	.name = "replicator-dev",
+	.version = {1, 0, 0},
+	.module = THIS_MODULE,
+	.ctr = replicator_dev_ctr,
+	.dtr = replicator_dev_dtr,
+	.flush = replicator_dev_flush,
+	.map = replicator_dev_map,
+	.postsuspend = replicator_dev_postsuspend,
+	.resume = replicator_dev_resume,
+	.end_io = replicator_dev_endio,
+	.message = replicator_dev_message,
+	.status = replicator_dev_status,
+};
+
+
+/*
+ * Replication log destructor.
+ *
+ * Takes the replication log context off the global list, tears down
+ * all of its site links, destroys the work queue, releases the
+ * replication log and finally drops the context itself.
+ *
+ * NOTE(review): the BUG_ON()s below presumably rely on the *_put()
+ * helpers returning non-zero when the last reference got dropped;
+ * confirm against their definitions.
+ */
+static void
+replicator_dtr(struct dm_target *ti)
+{
+	int r, slink_nr;
+	struct replog_c *replog_c = ti->private;
+	struct dm_repl_log *replog;
+	struct slink_c *slink_c, *n;
+	struct dm_repl_slink *slink;
+
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* Pull out replog_c to process destruction cleanly. */
+	mutex_lock(&replog_c_list_mutex);
+	list_del_init(&replog_c->lists.replog_c);
+	mutex_unlock(&replog_c_list_mutex);
+
+	/* Put all replog's slink contexts. */
+	list_for_each_entry_safe(slink_c, n, &replog_c->lists.slink_c,
+				 lists.slink_c) {
+		list_del_init(&slink_c->lists.slink_c);
+		slink = slink_c->slink;
+		_BUG_ON_PTR(slink);
+		/* slink_nr is assigned here, but not used any further. */
+		slink_nr = slink->ops->slink_number(slink);
+		/* Unbind the site link from the log before destroying it. */
+		r = replog->ops->slink_del(replog, slink);
+		BUG_ON(r < 0);
+		slink_destroy(slink);
+		/* The context must survive dropping a per site link reference. */
+		BUG_ON(replog_c_put(replog_c));
+		/* The site link context is expected to go away here. */
+		BUG_ON(!slink_c_put(slink_c));
+	}
+
+	/* Drop work queue. */
+	destroy_workqueue(replog_c->io.wq);
+
+	/* Drop reference on replog. */
+	repl_log_dtr(replog_c->replog, replog_c->ti);
+
+	/* This is expected to be the final reference on the context. */
+	BUG_ON(!replog_c_put(replog_c));
+}
+
+/*
+ * Replication constructor helpers.
+ */
+/*
+ * Create a site link tying it to the replication log.
+ *
+ * E.g.: "local 4 1 async ios 10000"
+ *
+ * argv[0] is the site link type, argv[1] the number of site link
+ * parameters following it and argv[2] the site link number.
+ *
+ * @call_type: constructor or message call; selects how errors get reported
+ * @ti:        target the site link is created for
+ * @replog_c:  replication log context the site link gets tied to
+ * @argc:      number of arguments available in @argv
+ * @argv:      site link arguments
+ * @args_used: set to the number of arguments consumed on success
+ *
+ * Returns 0 on success, -EINVAL on invalid arguments, -EPERM if the
+ * site link exists already or the error from creating the site link
+ * or its context.
+ */
+#define	MIN_SLINK_ARGS	3
+static int
+_replicator_slink_ctr(enum ctr_call_type call_type, struct dm_target *ti,
+		      struct replog_c *replog_c,
+		      unsigned argc, char **argv, unsigned *args_used)
+{
+	int first_slink, slink_nr, slink_params;
+	struct dm_repl_slink *slink;	/* Site link handle. */
+	struct slink_c *slink_c;	/* Site link context. */
+
+	SHOW_ARGV;
+
+	if (argc < MIN_SLINK_ARGS)
+		return -EINVAL;
+
+	/* Get #slink_params. */
+	if (unlikely(sscanf(argv[1], "%d", &slink_params) != 1 ||
+		     slink_params < 0 ||
+		     slink_params + 2 > argc)) {
+		ti_or_dmerr(call_type, ti,
+			   "Invalid site link parameter number argument");
+		return -EINVAL;
+	}
+
+	/* Get slink #. */
+	if (unlikely(sscanf(argv[2], "%d", &slink_nr) != 1 ||
+		     slink_nr < 0)) {
+		ti_or_dmerr(call_type, ti,
+			   "Invalid site link number argument");
+		return -EINVAL;
+	}
+
+	/*
+	 * Check first slink is slink 0.
+	 *
+	 * list_first_entry() never returns NULL, hence an empty
+	 * list is what identifies the first site link being added.
+	 */
+	mutex_lock(&replog_c_list_mutex);
+	first_slink = list_empty(&replog_c->lists.slink_c);
+	if (first_slink && slink_nr) {
+		mutex_unlock(&replog_c_list_mutex);
+		ti_or_dmerr(call_type, ti, "First site link must be 0");
+		return -EINVAL;
+	}
+
+	slink_c = slink_c_get_by_number(replog_c, slink_nr);
+	mutex_unlock(&replog_c_list_mutex);
+
+	/* A successful lookup took a reference, which needs dropping. */
+	if (!IS_ERR(slink_c)) {
+		ti_or_dmerr(call_type, ti, "slink already existing");
+		BUG_ON(slink_c_put(slink_c));
+		return -EPERM;
+	}
+
+	/* Get SLINK handle. */
+	slink = repl_slink_ctr(argv[0], replog_c->replog,
+			       slink_params + 1, argv + 1);
+	if (unlikely(IS_ERR(slink))) {
+		ti_or_dmerr(call_type, ti, "Cannot create site link context");
+		return PTR_ERR(slink);
+	}
+
+	slink_c = slink_c_create(replog_c, slink);
+	if (unlikely(IS_ERR(slink_c))) {
+		ti_or_dmerr(call_type, ti, "Cannot allocate site link context");
+		slink_destroy(slink);
+		return PTR_ERR(slink_c);
+	}
+
+	/* Type and parameter count plus the parameters themselves. */
+	*args_used = slink_params + 2;
+	DMDEBUG("%s added site link=%d", __func__, slink_nr);
+	return 0;
+}
+
+/*
+ * Construct a replicator mapping to log writes of one or more local mapped
+ * devices in a local replicator log (REPLOG) in order to replicate them to
+ * one or multiple site links (SLINKs) while ensuring write order fidelity.
+ *
+ *******************************
+ *
+ * "replicator" constructor table:
+ *
+ * <start> <length> replicator \
+ *	<replog_type> <#replog_params> <replog_params> \
+ *	[<slink_type_0> <#slink_params_0> <slink_params_0>]{1..N}
+ *
+ * <replog_type>    = "ringbuffer" is currently the only available type
+ * <#replog_params> = # of args intended for the replog (2 or 4)
+ * <replog_params>  = <dev_path> <dev_start> [auto/create/open <size>]
+ *	<dev_path>  = device path of replication log (REPLOG) backing store
+ *	<dev_start> = offset to REPLOG header
+ *	create	    = The replication log will be initialized if not active
+ *		      and sized to "size".  (If already active, the create
+ *		      will fail.)  Size is always in sectors.
+ *	open	    = The replication log must be initialized and valid or
+ *		      the constructor will fail.
+ *	auto        = If a valid replication log header is found on the
+ *		      replication device, this will behave like 'open'.
+ *		      Otherwise, this option behaves like 'create'.
+ *
+ * <slink_type>    = "blockdev" is currently the only available type
+ * <#slink_params> = 1/2/4
+ * <slink_params>  = <slink_nr> [<slink_policy> [<fall_behind> <N>]]
+ *	<slink_nr>     = This is a unique number that is used to identify a
+ *			 particular site/location.  '0' is always used to
+ *			 identify the local site, while increasing integers
+ *			 are used to identify remote sites.
+ *	<slink_policy> = The policy can be either 'sync' or 'async'.
+ *			 'sync' means write requests will not return until
+ *			 the data is on the storage device.  'async' allows
+ *			 a device to "fall behind"; that is, outstanding
+ *			 write requests are waiting in the replication log
+ *			 to be processed for this site, but it is not delaying
+ *			 the writes of other sites.
+ *	<fall_behind>  = This field is used to specify how far the user is
+ *			 willing to allow write requests to this specific site
+ *			 to "fall behind" in processing before switching to
+ *			 a 'sync' policy.  This "fall behind" threshold can
+ *			 be specified in three ways: ios, size, or timeout.
+ *			 'ios' is the number of pending I/Os allowed (e.g.
+ *			 "ios 10000").  'size' is the amount of pending data
+ *			 allowed (e.g. "size 200m").  Size labels include:
+ *			 s (sectors), k, m, g, t, p, and e.  'timeout' is
+ *			 the amount of time allowed for writes to be
+ *			 outstanding.  Time labels include: s, m, h, and d.
+ *
+ * Returns 0 on success or a negative errno, in which case
+ * everything set up until the failure has been released again.
+ */
+#define	MIN_CONTROL_ARGS	3
+static int
+replicator_ctr(struct dm_target *ti, unsigned argc, char **argv)
+{
+	int params, r;
+	unsigned args_used = 0;	/* Type has to match the slink ctr helper. */
+	struct dm_dev *backing_dev;
+	struct dm_repl_log *replog;	/* Replicator log handle. */
+	struct replog_c *replog_c;	/* Replication log context. */
+
+	SHOW_ARGV;
+
+	if (unlikely(argc < MIN_CONTROL_ARGS)) {
+		ti->error = "Invalid argument count";
+		return -EINVAL;
+	}
+
+	/* Get # of replog params. */
+	if (unlikely(sscanf(argv[1], "%d", &params) != 1 ||
+		     params < 2 ||
+		     params + 3 > argc)) {
+		ti->error = "Invalid replicator log parameter number";
+		return -EINVAL;
+	}
+
+	/* Check for site link 0 parameter count. */
+	if (params + 4 > argc) {
+		ti->error = "Invalid replicator site link parameter number";
+		return -EINVAL;
+	}
+
+	/*
+	 * Get reference on replicator control device.
+	 *
+	 * Dummy start/size sufficient here.
+	 */
+	r = dm_get_device(ti, argv[2], 0, 1, FMODE_WRITE, &backing_dev);
+	if (unlikely(r < 0)) {
+		ti_or_dmerr(CTR_CALL, ti,
+			    "Can't access replicator control device");
+		return r;
+	}
+
+
+	/* Lookup replog_c by dev_t. */
+	mutex_lock(&replog_c_list_mutex);
+	replog_c = replog_c_get_by_dev(backing_dev->bdev->bd_dev);
+	mutex_unlock(&replog_c_list_mutex);
+
+	/* A successful lookup means the log is in use already. */
+	if (unlikely(!IS_ERR(replog_c))) {
+		BUG_ON(replog_c_put(replog_c));
+		dm_put_device(ti, backing_dev);
+		ti->error = "Recreating replication log prohibited";
+		return -EPERM;
+	}
+
+	/* Get a reference on the replication log. */
+	replog = repl_log_ctr(argv[0], ti, params, argv + 1);
+	dm_put_device(ti, backing_dev);
+	if (unlikely(IS_ERR(replog))) {
+		ti->error = "Cannot create replication log context";
+		return PTR_ERR(replog);
+	}
+
+	_BUG_ON_PTR(replog->ops->postsuspend);
+	_BUG_ON_PTR(replog->ops->resume);
+
+	/* Create global replication control context. */
+	replog_c = replog_c_create(ti, replog);
+	if (unlikely(IS_ERR(replog_c))) {
+		ti->error = "Cannot allocate replication device context";
+		/* Don't leak the replication log we just got. */
+		repl_log_dtr(replog, ti);
+		return PTR_ERR(replog_c);
+	}
+
+	ti->private = replog_c;
+
+	/* Work any slink parameter tuples. */
+	params += 2;
+	BUG_ON(argc < params);
+	argc -= params;
+	argv += params;
+
+	while (argc > 0) {
+		r = _replicator_slink_ctr(CTR_CALL, ti, replog_c,
+					  argc, argv, &args_used);
+		if (r)
+			goto err_dtr;
+
+		/* Take per site link reference out. */
+		replog_c_get(replog_c);
+
+		BUG_ON(argc < args_used);
+		argc -= args_used;
+		argv += args_used;
+	}
+
+	return 0;
+
+err_dtr:
+	/*
+	 * If we get an error in ctr -> tear down, because the destructor
+	 * is not called by the dm core for a failed constructor.  Any
+	 * site link created so far has taken its reference out above,
+	 * which is what the destructor drops per site link.
+	 */
+	replicator_dtr(ti);
+	return r;
+}
+
+/*
+ * Replication log map function.
+ *
+ * No io to the replication log device is allowed: reads return
+ * zeroes, writes are silently ignored and readahead is failed.
+ */
+static int
+replicator_map(struct dm_target *ti, struct bio *bio,
+	       union map_info *map_context)
+{
+	switch (bio_rw(bio)) {
+	case READA:
+		/* Readahead of null bytes only wastes buffer cache. */
+		return -EIO;
+
+	case READ:
+		zero_fill_bio(bio);
+		break;
+
+	default:
+		/* Writes are dropped on the floor. */
+		break;
+	}
+
+	/* Complete the bio here; no new request gets made. */
+	bio_endio(bio, 0);
+	return DM_MAPIO_SUBMITTED;
+}
+
+/*
+ * Replication log suspend/resume helper.
+ *
+ * POSTSUSPEND: clears the blocked flag, flushes the work queue, waits
+ *		until no io is in flight and calls the log's postsuspend.
+ * RESUME:	calls the log's resume, clears the blocked flag and
+ *		wakes the worker.
+ */
+static void
+_replicator_suspend_resume(struct replog_c *replog_c,
+			   enum suspend_resume_type type)
+{
+	struct dm_repl_log *replog;
+
+	DMDEBUG("%s %s", __func__, type == RESUME ? "resume" : "postsuspend");
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* FIXME: device number not utilized yet. */
+	switch (type) {
+	case POSTSUSPEND:
+		/*
+		 * NOTE(review): presumably unblocks the worker so
+		 * that pending io can drain before waiting; confirm.
+		 */
+		ClearReplBlocked(replog_c);
+		flush_workqueue(replog_c->io.wq);
+		wait_event(replog_c->io.waiters, !ReplIoInflight(replog_c));
+		replog->ops->postsuspend(replog, -1);
+		break;
+	case RESUME:
+		replog->ops->resume(replog, -1);
+		ClearReplBlocked(replog_c);
+		wake_do_repl(replog_c);
+		break;
+	default:
+		BUG();
+	}
+}
+
+
+/*
+ * Suspend/Resume all.
+ *
+ * Processes the replication log together with all devices on site
+ * link 0.  On resume, the devices get resumed before the log; on
+ * postsuspend, the log gets suspended before the devices.
+ */
+static void
+_replicator_suspend_resume_all(struct replog_c *replog_c,
+			       enum suspend_resume_type type)
+{
+	struct device_c *dc;
+	struct slink_c *slink_c0;
+
+	_BUG_ON_PTR(replog_c);
+
+	/*
+	 * First entry on replog_c slink_c list is slink0.
+	 *
+	 * list_first_entry() yields a bogus pointer rather than NULL
+	 * on an empty list, so the list itself needs checking.
+	 */
+	BUG_ON(list_empty(&replog_c->lists.slink_c));
+	slink_c0 = list_first_entry(&replog_c->lists.slink_c,
+				    struct slink_c, lists.slink_c);
+
+	/* Walk all slink device_c dc and resume slinks. */
+	if (type == RESUME)
+		list_for_each_entry(dc, &slink_c0->lists.dc, list)
+			_replicator_dev_suspend_resume(dc->ti, type);
+
+	_replicator_suspend_resume(replog_c, type);
+
+	/* Walk all slink device_c dc and postsuspend slinks. */
+	if (type == POSTSUSPEND)
+		list_for_each_entry(dc, &slink_c0->lists.dc, list)
+			_replicator_dev_suspend_resume(dc->ti, type);
+}
+
+/* Replication control post suspend method: suspends the log only. */
+static void
+replicator_postsuspend(struct dm_target *ti)
+{
+	struct replog_c *replog_c = ti->private;
+
+	_replicator_suspend_resume(replog_c, POSTSUSPEND);
+}
+
+/* Replication control resume method: resumes the log only. */
+static void
+replicator_resume(struct dm_target *ti)
+{
+	struct replog_c *replog_c = ti->private;
+
+	_replicator_suspend_resume(replog_c, RESUME);
+}
+
+/*
+ * Replication log message method.
+ *
+ * Messages handled:
+ *
+ * start/resume [all]			# resume the log (and all devices)
+ * stop/suspend/postsuspend [all]	# suspend the log (and all devices)
+ * slink add/del/message <slink_nr> ...	# site link handling
+ * statistics on/off/reset		# development statistics
+ * replog ...				# passed on to the replication log
+ *
+ * Returns 0 or the result of the handler called on success, -EINVAL
+ * on invalid messages or arguments, -EPERM on requests which are not
+ * permitted and -EBUSY if a site link can't be deleted because it is
+ * still referenced.
+ */
+static int
+replicator_message(struct dm_target *ti, unsigned argc, char **argv)
+{
+	int r, resume, suspend;
+	struct replog_c *replog_c = ti->private;
+	struct dm_repl_log *replog;
+
+	SHOW_ARGV;
+
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	/* Check minimum arguments. */
+	if (unlikely(argc < 1))
+		goto err_args;
+
+	/* str_listed() needs its string list to be NULL terminated. */
+	resume  = str_listed(argv[0], "resume", "start", NULL);
+	suspend = !resume &&
+		  str_listed(argv[0], "suspend", "postsuspend", "stop", NULL);
+
+	/*
+	 * Start/resume (or stop/suspend) the replication log
+	 * on its own or together with all slinks+devices.
+	 */
+	if (suspend || resume) {
+		int all;
+		enum suspend_resume_type type = resume ? RESUME : POSTSUSPEND;
+
+		if (!range_ok(argc, 1, 2)) {
+			DMERR("Invalid suspend/resume argument count");
+			return -EINVAL;
+		}
+
+		all = (argc == 2 && str_listed(argv[1], "all", NULL));
+
+		if (all)
+			_replicator_suspend_resume_all(replog_c, type);
+		else
+			_replicator_suspend_resume(replog_c, type);
+
+	/* Site link message. */
+	} else if (str_listed(argv[0], "slink", NULL)) {
+		/* E.g.: "local 4 1 async ios 10000" */
+		int tmp;
+		unsigned args_used, slink_nr;
+		struct dm_repl_slink *slink;
+		struct slink_c *slink_c;
+
+		/* Check minimum arguments. */
+		if (unlikely(argc < 3))
+			goto err_args;
+
+		/* Site link 0 is valid here; add/del reject it below. */
+		if (sscanf(argv[2], "%d", &tmp) != 1 || tmp < 0)
+			DM_EINVAL("site link number invalid");
+
+		slink_nr = tmp;
+
+		if (str_listed(argv[1], "add", "del", NULL) &&
+		    !slink_nr)
+			DM_EPERM("Can't add/delete site link 0 via message");
+
+		/* A successful lookup takes a reference on the context. */
+		mutex_lock(&replog_c_list_mutex);
+		slink_c = slink_c_get_by_number(replog_c, slink_nr);
+		mutex_unlock(&replog_c_list_mutex);
+
+		if (str_listed(argv[1], "add", NULL)) {
+			if (IS_ERR(slink_c)) {
+				/*
+				 * NOTE(review): argv[2] got parsed as the
+				 * site link number above, while the helper
+				 * expects the site link type in its argv[0];
+				 * also, the constructor takes a replog_c
+				 * reference per site link, which isn't taken
+				 * here.  Confirm both are intended.
+				 */
+				r = _replicator_slink_ctr(MESSAGE_CALL, ti,
+							  replog_c,
+							  argc - 2, argv + 2,
+							  &args_used);
+				if (r)
+					DMERR("Error creating site link");
+
+				return r;
+			} else {
+				BUG_ON(slink_c_put(slink_c));
+				DM_EPERM("site link already exists");
+			}
+		} else if (str_listed(argv[1], "del", NULL)) {
+			if (IS_ERR(slink_c))
+				DM_EPERM("site link doesn't exist");
+
+			if (!list_empty(&slink_c->lists.dc)) {
+				slink_c_put(slink_c);
+				DM_EPERM("site link still has devices");
+			}
+
+			/*
+			 * Drop the lookup reference first; the second put
+			 * returns non-zero if it released the context.
+			 */
+			slink_c_put(slink_c);
+			if (!slink_c_put(slink_c)) {
+				DMERR("site link still exists (race)!");
+				return -EBUSY;
+			}
+
+			return 0;
+		} else if (str_listed(argv[1], "message", NULL)) {
+			/* Never dereference a failed lookup. */
+			if (IS_ERR(slink_c))
+				DM_EPERM("site link doesn't exist");
+
+			slink = slink_c->slink;
+			_BUG_ON_PTR(slink);
+
+			if (slink->ops->message)
+				r = slink->ops->message(slink,
+							argc - 2, argv + 2);
+			else {
+				DMERR("no site link message interface");
+				r = -EPERM;
+			}
+
+			/* Drop the lookup reference. */
+			slink_c_put(slink_c);
+			return r;
+		} else if (!IS_ERR(slink_c))
+			/* Unknown request: just drop the lookup reference. */
+			slink_c_put(slink_c);
+
+	/* Statistics. */
+	} else if (str_listed(argv[0], "statistics", NULL)) {
+		if (argc < 2)
+			goto err_args;
+
+		if (argc > 2)
+			DM_EINVAL("too many message arguments");
+
+		if (str_listed(argv[1], "on", NULL))
+			SetReplDevelStats(replog_c);
+		else if (str_listed(argv[1], "off", NULL))
+			ClearReplDevelStats(replog_c);
+		else if (str_listed(argv[1], "reset", NULL))
+			stats_reset(&replog_c->io.stats);
+
+	/* Replication log message. */
+	} else if (str_listed(argv[0], "replog", NULL)) {
+		if (argc < 2)
+			goto err_args;
+
+		if (replog->ops->message)
+			return replog->ops->message(replog, argc - 1, argv + 1);
+		else
+			DM_EPERM("no replication log message interface");
+	} else
+		DM_EINVAL("invalid message received");
+
+	return 0;
+
+err_args:
+	DM_EINVAL("too few message arguments");
+}
+
+/*
+ * Replication log status output method.
+ *
+ * Emits the development statistics (info status with statistics
+ * switched on only), followed by the status of the replication
+ * log and the status of the first site link, if there is one.
+ *
+ * The scratch buffer is static in order to keep it off the stack, so
+ * every use of it is serialized by a lock of its own; status calls
+ * for different targets would corrupt each other's output otherwise.
+ *
+ * Always returns 0.
+ */
+static int
+replicator_status(struct dm_target *ti, status_type_t type,
+		    char *result, unsigned maxlen)
+{
+	unsigned dev_nr = 0;
+	ssize_t sz = 0;
+	static char buffer[2048];
+	static DEFINE_MUTEX(buffer_lock);
+	struct replog_c *replog_c = ti->private;
+	struct dm_repl_log *replog;
+	struct slink_c *slink_c0;
+	struct dm_repl_slink *slink;
+
+	mutex_lock(&buffer_lock);
+
+	mutex_lock(&replog_c_list_mutex);
+	_BUG_ON_PTR(replog_c);
+	replog = replog_c->replog;
+	_BUG_ON_PTR(replog);
+
+	if (type == STATUSTYPE_INFO) {
+		if (ReplDevelStats(replog_c)) {
+			struct stats *s = &replog_c->io.stats;
+
+			DMEMIT("v=%s r=%u w=%u rs=%u "
+			       "ws=%u nc=%u c=%u ",
+			       version,
+			       atomic_read(s->io), atomic_read(s->io + 1),
+			       atomic_read(s->submitted_io),
+			       atomic_read(s->submitted_io + 1),
+			       atomic_read(s->congested_fn),
+			       atomic_read(s->congested_fn + 1));
+		}
+	}
+
+	mutex_unlock(&replog_c_list_mutex);
+
+	/* Get status from replog. */
+	/* FIXME: dev_nr superfluous? */
+	*buffer = 0;	/* Never emit stale content. */
+	replog->ops->status(replog, dev_nr, type, buffer, sizeof(buffer));
+	DMEMIT("%s", buffer);
+
+	/* list_first_entry() is only valid on a non-empty list. */
+	if (!list_empty(&replog_c->lists.slink_c)) {
+		slink_c0 = list_first_entry(&replog_c->lists.slink_c,
+					    struct slink_c, lists.slink_c);
+		slink = slink_c0->slink;
+		_BUG_ON_PTR(slink);
+
+		/* Get status from slink. */
+		*buffer = 0;
+		slink->ops->status(slink, -1, type, buffer, sizeof(buffer));
+		DMEMIT(" %s", buffer);
+	}
+
+	mutex_unlock(&buffer_lock);
+	return 0;
+}
+
+/*
+ * Replicator control interface.
+ *
+ * Method table of the "replicator" target, which gets
+ * registered with the dm core in dm_repl_init().
+ */
+static struct target_type replicator_target = {
+	.name = "replicator",
+	.version = {1, 0, 0},
+	.module = THIS_MODULE,
+	.ctr = replicator_ctr,
+	.dtr = replicator_dtr,
+	.map = replicator_map,
+	.postsuspend = replicator_postsuspend,
+	.resume = replicator_resume,
+	.message = replicator_message,
+	.status = replicator_status,
+};
+
+/*
+ * Module init: set up the global replication log context list and
+ * register the "replicator" and "replicator-dev" targets.
+ *
+ * If the second registration fails, the first one is undone.
+ * Returns the result of the last registration attempted.
+ */
+int __init dm_repl_init(void)
+{
+	int r;
+
+	INIT_LIST_HEAD(&replog_c_list);
+	mutex_init(&replog_c_list_mutex);
+
+	r = dm_register_target(&replicator_target);
+	if (r < 0) {
+		DMERR("failed to register %s %s [%d]",
+		      replicator_target.name, version, r);
+		return r;
+	}
+
+	DMINFO("registered %s target %s",
+	       replicator_target.name, version);
+
+	r = dm_register_target(&replicator_dev_target);
+	if (r < 0) {
+		DMERR("Failed to register %s %s [%d]",
+		      replicator_dev_target.name, version, r);
+		/* Don't leave the control target registered on its own. */
+		dm_unregister_target(&replicator_target);
+		return r;
+	}
+
+	DMINFO("registered %s target %s",
+	       replicator_dev_target.name, version);
+	return r;
+}
+
+/*
+ * Module exit: unregister both targets in reverse
+ * order of their registration, logging each one.
+ */
+void __exit
+dm_repl_exit(void)
+{
+	unsigned i;
+	struct target_type *targets[] = {
+		&replicator_dev_target,
+		&replicator_target,
+	};
+
+	for (i = 0; i < ARRAY_SIZE(targets); i++) {
+		dm_unregister_target(targets[i]);
+		DMINFO("unregistered target %s %s",
+		       targets[i]->name, version);
+	}
+}
+
+/* Module hooks: init/exit entry points and module information. */
+module_init(dm_repl_init);
+module_exit(dm_repl_exit);
+
+MODULE_DESCRIPTION(DM_NAME " remote replication target");
+MODULE_AUTHOR("Heinz Mauelshagen <heinzm redhat com>");
+MODULE_LICENSE("GPL");
diff --git linux-2.6.orig/drivers/md/dm-repl.h linux-2.6/drivers/md/dm-repl.h
new file mode 100644
index 0000000..f0101e0
--- /dev/null
+++ linux-2.6/drivers/md/dm-repl.h
@@ -0,0 +1,124 @@
+/*
+ * Copyright (C) 2008,2009  Red Hat, Inc. All rights reserved.
+ *
+ * Module Author: Heinz Mauelshagen (Mauelshagen RedHat com)
+ *
+ * This file is released under the GPL.
+ */
+
+/*
+ * API calling convention to create a replication mapping:
+ *
+ * 1. get a replicator log handle, hence creating a new persistent
+ *    log or accessing an existing one
+ * 2. get an slink handle, hence creating a new transient
+ *    slink or accessing an existing one
+ * 2(cont). repeat the previous step for multiple slinks (eg. one for
+ *    local and one for remote device access)
+ * 3. bind a (remote) device to a particular slink created in a previous step
+ * 3(cont). repeat the device binding for any additional devices on that slink
+ * 4. bind the created slink which has device(s) bound to it to the replog
+ * 4(cont). repeat the slink binding to the replog for all created slinks
+ * 5. call the replog io function for each IO.
+ *
+ * Reverse steps 1-4 to tear a replication mapping down, hence freeing all
+ * transient resources allocated to it.
+ */
+
+#ifndef _DM_REPL_H
+#define _DM_REPL_H
+
+#include <linux/device-mapper.h>
+
+/* FIXME: factor these macros out to dm.h */
+/* Expands to three arguments: ptr, str and the length of ptr. */
+#define	STR_LEN(ptr, str)	ptr, str, strlen(ptr)
+/* Pointer just past the last element of array a. */
+#define ARRAY_END(a)    ((a) + ARRAY_SIZE(a))
+/*
+ * True if min <= i <= max.  Arguments are parenthesized so that
+ * expressions can be passed in safely; i is evaluated twice though.
+ */
+#define	range_ok(i, min, max)   ((i) >= (min) && (i) <= (max))
+
+/*
+ * Set the target's error string (prefixed with DM_MSG_PREFIX) and
+ * return ret from the calling function; needs a "ti" in scope.
+ */
+#define	TI_ERR_RET(str, ret) \
+do { \
+	ti->error = DM_MSG_PREFIX ": " str; \
+	return ret; } \
+while (0)
+/* Same as TI_ERR_RET, returning -EINVAL. */
+#define	TI_ERR(str)	TI_ERR_RET(str, -EINVAL)
+
+/*
+ * Log an error via DMERR and return from the calling function:
+ * DM_ERR_RET returns ret, DM_EINVAL -EINVAL and DM_EPERM -EPERM.
+ */
+#define	DM_ERR_RET(ret, x...)	do { DMERR(x); return ret; } while (0)
+#define	DM_EINVAL(x...)	DM_ERR_RET(-EINVAL, x)
+#define	DM_EPERM(x...)	DM_ERR_RET(-EPERM, x)
+
+/*
+ * Minimum split_io of target to preset for local devices in repl_ctr().
+ * Will be adjusted while constructing (a) remote device(s).
+ */
+#define	DM_REPL_MIN_SPLIT_IO	BIO_MAX_SECTORS
+
+/*
+ * REMOVEME: development testing.
+ *
+ * Logs the arguments a function got called with; needs "argc" and
+ * "argv" in scope.  Compiled out (expands to nothing) by default.
+ */
+#if	0
+#define	SHOW_ARGV \
+	do { \
+		int i; \
+\
+		DMINFO("%s: called with the following args:", __func__); \
+		for (i = 0; i < argc; i++) \
+			DMINFO("%d: %s", i, argv[i]); \
+	} while (0)
+#else
+#define	SHOW_ARGV
+#endif
+
+
+/*
+ * Factor out to dm-bio-list.h
+ *
+ * Add a bio at the head of a bio list; it becomes
+ * the tail as well if the list has no tail yet.
+ */
+static inline void
+bio_list_push(struct bio_list *bl, struct bio *bio)
+{
+	struct bio *old_head = bl->head;
+
+	bio->bi_next = old_head;
+	bl->head = bio;
+
+	if (bl->tail)
+		return;
+
+	bl->tail = bio;
+}
+
+/*
+ * REMOVEME: development
+ *
+ * BUG if ptr is NULL or an error pointer.  The argument is
+ * parenthesized so that any pointer expression can be passed in;
+ * it is evaluated twice though.
+ */
+#define	_BUG_ON_PTR(ptr) \
+	do { \
+		BUG_ON(!(ptr)); \
+		BUG_ON(IS_ERR(ptr)); \
+	} while (0)
+
+/*
+ * Callback function type, taking a read error, a write
+ * error and an opaque context pointer.
+ */
+typedef void
+(*dm_repl_notify_fn)(int read_err, int write_err, void *context);
+
+/*
+ * Macros to access bitfields.
+ *
+ * BITOPS(name, var, flag) defines five inline functions operating on
+ * bit "flag" of the io.flags member of a "struct var":
+ *
+ *	TestClear<name>()  - test_and_clear_bit()
+ *	TestSet<name>()    - test_and_set_bit()
+ *	Clear<name>()      - clear_bit()
+ *	Set<name>()        - set_bit()
+ *	<name>()           - test_bit()
+ */
+#define	BITOPS(name, var, flag) \
+static inline int \
+TestClear ## name(struct var *v) \
+{ return test_and_clear_bit(flag, &v->io.flags); } \
+static inline int \
+TestSet ## name(struct var *v) \
+{ return test_and_set_bit(flag, &v->io.flags); } \
+static inline void \
+Clear ## name(struct var *v) \
+{ clear_bit(flag, &v->io.flags); } \
+static inline void \
+Set ## name(struct var *v) \
+{ set_bit(flag, &v->io.flags); } \
+static inline int \
+name(struct var *v) \
+{ return test_bit(flag, &v->io.flags); }
+
+/* FIXME: move to dm core. */
+/*
+ * Search routines for descriptor arrays.
+ *
+ * A descriptor ties an integer type to its name.
+ */
+struct dm_str_descr {
+	const int type;
+	const char *name;
+};
+
+/*
+ * Return type for name.
+ *
+ * NOTE(review): len is presumably the number of entries in
+ * descr; confirm against the implementation.
+ */
+extern int
+dm_descr_type(const struct dm_str_descr *descr, unsigned len, const char *name);
+/* Return name for type. */
+extern const char *
+dm_descr_name(const struct dm_str_descr *descr, unsigned len, const int type);
+
+#endif
-- 
1.6.2.5


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]