[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[dm-devel] [PATCH 3/6] dm-replicator: replication log ringbuffer handler module



From: Heinz Mauelshagen <heinzm redhat com>

This is the "ringbuffer" type replication log
plugging into the main replicator module.

It abstracts the handling of the log from the main module
allowing it to be log type agnostic and uses the abstracted
device access logic of the site link module.

Signed-off-by: Heinz Mauelshagen <heinzm redhat com>
Signed-off-by: Zdenek Kabelac <zkabelac redhat com>
---
 drivers/md/dm-repl-log-ringbuffer.c | 4872 +++++++++++++++++++++++++++++++++++
 drivers/md/dm-repl-log.h            |  120 +
 2 files changed, 4992 insertions(+), 0 deletions(-)
 create mode 100644 drivers/md/dm-repl-log-ringbuffer.c
 create mode 100644 drivers/md/dm-repl-log.h

diff --git a/drivers/md/dm-repl-log-ringbuffer.c b/drivers/md/dm-repl-log-ringbuffer.c
new file mode 100644
index 0000000..ff248fb
--- /dev/null
+++ b/drivers/md/dm-repl-log-ringbuffer.c
@@ -0,0 +1,4872 @@
+/*
+ * Copyright (C) 2008,2009  Red Hat, Inc. All rights reserved.
+ *
+ * Module Authors: Jeff Moyer (jmoyer redhat com)
+ *		   Heinz Mauelshagen (heinzm redhat com)
+ *
+ * This file is released under the GPL.
+ *
+ * "default" device-mapper replication log type implementing a ring buffer
+ * for write IOs, which will be copied across site links to devices.
+ *
+ * A log like this allows for write coalescing enhancements in order
+ * to reduce network traffic at the cost of larger fallbehind windows.
+ */
+
+/*
+ * Locking:
+ * l->io.lock for io (de)queueing / slink manipulation
+ * l->lists.lock for copy contexts moved around lists
+ *
+ * The ring_buffer lock does not need to be held in order to take the io.lock,
+ * but if they are both acquired, the ordering must be as indicated above.
+ */
+
+#include "dm-repl.h"
+#include "dm-registry.h"
+#include "dm-repl-log.h"
+#include "dm-repl-slink.h"
+
+#include <linux/crc32.h>
+#include <linux/dm-io.h>
+#include <linux/kernel.h>
+#include <linux/version.h>
+
+static const char version[] = "v0.026";
+static struct dm_repl_log_type ringbuffer_type;
+
+static struct mutex list_mutex;
+
+#define	DM_MSG_PREFIX	"dm-repl-log-ringbuffer"
+#define	DAEMON		DM_MSG_PREFIX	"d"
+
+/* Maximum number of site links supported. */
+#define MAX_DEFAULT_SLINKS 	2048
+
+#define DEFAULT_BIOS	16 /* Default number of max bios -> ring buffer */
+
+#define	LOG_SIZE_MIN	(2 * BIO_MAX_SECTORS)
+#define	REGIONS_MAX	32768
+
+/* Later kernels have this macro in bitops.h */
+#ifndef for_each_bit
+#define for_each_bit(bit, addr, size) \
+	for ((bit) = find_first_bit((void *)(addr), (size)); \
+	     (bit) < (size); \
+	     (bit) = find_next_bit((void *)(addr), (size), (bit) + 1))
+#endif
+
+#define	_BUG_ON_SLINK_NR(l, slink_nr) \
+	do { /* Catch site link numbers outside 0 .. (l)->slink.max - 1. */ \
+		BUG_ON((slink_nr) < 0); \
+		BUG_ON((slink_nr) >= (l)->slink.max); \
+	} while (0) /* No trailing ';': keeps the macro safe in if/else. */
+
+/* Replicator log metadata version. */
+struct repl_log_version {
+	unsigned major;
+	unsigned minor;
+	unsigned subminor;
+};
+
+/*
+ *  Each version of the log code may get a separate source module, so
+ *  we store the version information in the .c file.
+ */
+#define DM_REPL_LOG_MAJOR	0
+#define DM_REPL_LOG_MINOR	0
+#define DM_REPL_LOG_MICRO	1
+
+#define DM_REPL_LOG_VERSION			\
+	{ DM_REPL_LOG_MAJOR,			\
+	  DM_REPL_LOG_MINOR,			\
+	  DM_REPL_LOG_MICRO, }
+
+struct version {
+	uint16_t	major;
+	uint16_t	minor;
+	uint16_t	subminor;
+} my_version = DM_REPL_LOG_VERSION;
+
+/* 1 */
+/* Shall be 16 bytes long */
+static const char log_header_magic[] = "dm-replicatorHJM";
+#define	MAGIC_LEN	(sizeof(log_header_magic) - 1)
+#define	HANDLER_LEN	MAGIC_LEN
+
+/* Header format on disk */
+struct log_header_disk {
+	uint8_t			magic[MAGIC_LEN];
+	uint32_t		crc;
+	struct version		version;
+	uint64_t		size;
+	uint64_t		buffer_header; /* sector of first
+						* buffer_header_disk */
+	uint8_t			handler_name[HANDLER_LEN];
+	/* Free space. */
+} __attribute__((__packed__));
+
+/* Macros for bitmap access. */
+#define	BITMAP_SIZE(l)	((l)->slink.bitmap_size)
+#define	BITMAP_ELEMS(l)	((l)->slink.bitmap_elems)
+#define	BITMAP_ELEMS_MAX	32
+
+/* Header format in core (only one of these per log device). */
+struct log_header {
+	struct repl_log_version version;
+	sector_t size;
+	sector_t buffer_header;	/* Sector of the first buffer header. */
+
+	/* Bitarray of configured slinks to copy across and those to I/O to. */
+	struct {
+		uint64_t slinks[BITMAP_ELEMS_MAX];	/* Configured slinks. */
+		uint64_t ios[BITMAP_ELEMS_MAX];		/* slinks to I/O to. */
+		uint64_t inaccessible[BITMAP_ELEMS_MAX];
+	} slink_bits;
+};
+#define LOG_SLINKS(l) ((void *) (l)->header.log->slink_bits.slinks)
+#define LOG_SLINKS_IOS(l) ((void *) (l)->header.log->slink_bits.ios)
+#define LOG_SLINKS_INACCESSIBLE(l) \
+	((void *)(l)->header.log->slink_bits.inaccessible)
+
+/* Convert in-core log header @c_ptr to its on-disk format at @d_ptr. */
+static void log_header_to_disk(unsigned slinks, void *d_ptr, void *c_ptr)
+{
+	struct log_header_disk *d = d_ptr;
+	struct log_header *c = c_ptr;
+
+	strncpy((char *) d->magic, log_header_magic, MAGIC_LEN);
+	strncpy((char *) d->handler_name,
+			 ringbuffer_type.type.name, HANDLER_LEN);
+	d->version.major = cpu_to_le16(c->version.major);
+	d->version.minor = cpu_to_le16(c->version.minor);
+	d->version.subminor = cpu_to_le16(c->version.subminor);
+	d->size = cpu_to_le64(c->size);
+	d->buffer_header = cpu_to_le64(c->buffer_header);
+	d->crc = 0; /* The CRC is calculated with a zeroed crc member. */
+	d->crc = cpu_to_le32(crc32(~0, d, sizeof(*d)));
+}
+
+/* Validate on-disk log header @d_ptr and convert it to in-core @c_ptr. */
+static int log_header_to_core(unsigned slinks, void *c_ptr, void *d_ptr)
+{
+	int r;
+	uint32_t crc;
+	struct log_header *c = c_ptr;
+	struct log_header_disk *d = d_ptr;
+
+	r = strncmp((char *) d->magic, log_header_magic, MAGIC_LEN);
+	if (r)
+		return -EINVAL;
+
+	/* Check, if acceptable to this replication log handler. */
+	r = strncmp((char *) d->handler_name, ringbuffer_type.type.name,
+		    HANDLER_LEN);
+	if (r)
+		return -EINVAL;
+
+	c->version.major = le16_to_cpu(d->version.major);
+	c->version.minor = le16_to_cpu(d->version.minor);
+	c->version.subminor = le16_to_cpu(d->version.subminor);
+	c->size = le64_to_cpu(d->size);
+	c->buffer_header = le64_to_cpu(d->buffer_header);
+	crc = le32_to_cpu(d->crc);
+	d->crc = 0; /* Must match the zeroed member used when writing. */
+
+	return likely(crc == crc32(~0, d, sizeof(*d))) ? 0 : -EINVAL;
+}
+
+/* 1a */
+static const char *buffer_header_magic = "dm-replbufferHJM";
+
+/*
+ * meta-data for the ring buffer, one per replog:
+ *
+ *   start: location on disk
+ *   head:  ring buffer head, first data item to be replicated
+ *   tail:  points to one after the last data item to be replicated
+ *
+ * The ring buffer is full of data_header(_disk) entries.
+ */
+struct buffer_header_disk {
+	uint8_t			magic[MAGIC_LEN];
+	uint32_t		crc;
+	struct buffer_disk {
+		uint64_t	start;
+		uint64_t	head;
+		uint64_t	tail;
+	} buffer;
+
+	uint64_t	flags;
+	/* Free space. */
+} __attribute__((__packed__));
+
+/*
+ * In-core format of the buffer_header_disk structure
+ *
+ * start, head, and tail are as described above for buffer_header_disk.
+ *
+ * next_avail points to the next available sector for placing a log entry.
+ *   It is important to distinguish this from tail, as we can issue I/O to
+ *   multiple log entries at a time.
+ *
+ * end is the end sector of the log device
+ *
+ * len is the total length of the log device, handy to keep around for maths
+ *   free represents the amount of free space in the log. This number
+ *   reflects the free space in the log given the current outstanding I/O's.
+ *   In other words, it is the distance between next_avail and head.
+ */
+/*
+ *  My guess is that this should be subsumed by the repl_log structure, as
+ *  much of the data is copied from there, anyway.  The question is just
+ *  how to organize it in a readable and efficient way.
+ */
+/* Ring state flag(s). */
+enum ring_status_type {
+	RING_BLOCKED,
+	RING_BUFFER_ERROR,
+	RING_BUFFER_DATA_ERROR,
+	RING_BUFFER_HEADER_ERROR,
+	RING_BUFFER_HEAD_ERROR,
+	RING_BUFFER_TAIL_ERROR,
+	RING_BUFFER_FULL,
+	RING_BUFFER_IO_QUEUED,
+	RING_SUSPENDED
+};
+
+/*
+ * Pools types for:
+ * o ring buffer entries
+ * o data headers.
+ * o disk data headers.
+ * o slink copy contexts
+ */
+enum ring_pool_type {
+	ENTRY,			/* Ring buffer entries. */
+	DATA_HEADER,		/* Ring buffer data headers. */
+	DATA_HEADER_DISK,	/* Ring buffer ondisk data headers. */
+	COPY_CONTEXT,		/* Context for any single slink copy. */
+	NR_RING_POOLS,
+};
+
+struct sector_range {
+	sector_t start;
+	sector_t end;
+} range;
+
+struct ring_buffer {
+	sector_t	start;	/* Start sector of the log space on disk. */
+	sector_t	head;	/* Sector of the first log entry. */
+	sector_t	tail;	/* Sector of the last valid log entry. */
+
+	struct mutex	mutex;	/* Mutex hold on member updates below. */
+
+	/* The following fields are useful to keep track of in-core state. */
+	sector_t	next_avail;	/* In-memory tail of the log. */
+	sector_t	end;		/* 1st sector past end of log device. */
+	sector_t	free;		/* Free space left in the log. */
+	sector_t	pending;	/* sectors queued but not allocated */
+
+	struct {
+		unsigned long flags;	/* Buffer state flags. */
+	} io;
+
+	/* Dirty sectors for slink0. */
+	struct sector_hash {
+		struct list_head *hash;
+		unsigned buckets;
+		unsigned mask;
+	} busy_sectors;
+
+	/* Waiting for all I/O to be flushed. */
+	wait_queue_head_t flushq;
+	mempool_t *pools[NR_RING_POOLS];
+};
+
+BITOPS(RingBlocked, ring_buffer, RING_BLOCKED)
+BITOPS(RingBufferError, ring_buffer, RING_BUFFER_ERROR)
+BITOPS(RingBufferDataError, ring_buffer, RING_BUFFER_DATA_ERROR)
+BITOPS(RingBufferHeaderError, ring_buffer, RING_BUFFER_HEADER_ERROR)
+BITOPS(RingBufferHeadError, ring_buffer, RING_BUFFER_HEAD_ERROR)
+BITOPS(RingBufferTailError, ring_buffer, RING_BUFFER_TAIL_ERROR)
+BITOPS(RingBufferFull, ring_buffer, RING_BUFFER_FULL)
+BITOPS(RingBufferIOQueued, ring_buffer, RING_BUFFER_IO_QUEUED)
+BITOPS(RingSuspended, ring_buffer, RING_SUSPENDED)
+
+#define CC_POOL_MIN 4
+#define HEADER_POOL_MIN 32
+#define ENTRY_POOL_MIN 32
+
+/* Convert in-core ring buffer @c_ptr to on-disk buffer header @d_ptr. */
+static void buffer_header_to_disk(unsigned slinks, void *d_ptr, void *c_ptr)
+{
+	struct buffer_header_disk *d = d_ptr;
+	struct ring_buffer *c = c_ptr;
+
+	strncpy((char *) d->magic, buffer_header_magic, MAGIC_LEN);
+	d->buffer.start = cpu_to_le64(to_bytes(c->start));
+	d->buffer.head = cpu_to_le64(to_bytes(c->head));
+	d->buffer.tail = cpu_to_le64(to_bytes(c->tail));
+	d->flags = cpu_to_le64(c->io.flags);
+	d->crc = 0; /* The CRC is calculated with a zeroed crc member. */
+	d->crc = cpu_to_le32(crc32(~0, d, sizeof(*d)));
+}
+
+/* Validate on-disk buffer header @d_ptr and convert it to in-core @c_ptr. */
+static int buffer_header_to_core(unsigned slinks, void *c_ptr, void *d_ptr)
+{
+	int r;
+	uint32_t crc;
+	struct ring_buffer *c = c_ptr;
+	struct buffer_header_disk *d = d_ptr;
+
+	r = strncmp((char *) d->magic, buffer_header_magic, MAGIC_LEN);
+	if (r)
+		return -EINVAL;
+
+	c->start = to_sector(le64_to_cpu(d->buffer.start));
+	c->head = to_sector(le64_to_cpu(d->buffer.head));
+	c->tail = to_sector(le64_to_cpu(d->buffer.tail));
+	c->io.flags = le64_to_cpu(d->flags);
+	crc = le32_to_cpu(d->crc);
+	d->crc = 0; /* Must match the zeroed member used when writing. */
+	return likely(crc == crc32(~0, d, sizeof(*d))) ? 0 : -EINVAL;
+}
+
+/* 3 */
+/* The requirement is to support devices with 4k sectors. */
+#define HEADER_SECTORS	to_sector(4096)
+
+static const char *data_header_magic = "dm-replicdataHJM";
+
+/* FIXME: XXX adjust for larger sector size! */
+#define	DATA_HEADER_DISK_SIZE	512
+enum entry_wrap_type { WRAP_NONE, WRAP_DATA, WRAP_NEXT };
+struct data_header_disk {
+	uint8_t	 magic[MAGIC_LEN];
+	uint32_t crc;
+	uint32_t filler;
+
+	struct {
+		/* Internal namespace to get rid of major/minor. -HJM */
+		uint64_t dev;
+		uint64_t offset;
+		uint64_t size;
+	} region;
+
+	/* Position of header and data on disk in bytes. */
+	struct {
+		uint64_t header; /* Offset of this header */
+		uint64_t data; /* Offset of data (ie. the bio). */
+	} pos;
+
+	uint8_t valid; /* FIXME: XXX this needs to be in memory copy, too */
+	uint8_t wrap;  /* Above enum entry_wrap_type. */
+	uint8_t barrier;/* Be prepared for write barrier support. */
+
+	/*
+	 * Free space: fill up to offset 256.
+	 */
+	uint8_t	filler1[189];
+
+	/* Offset 256! */
+	/* Bitmap, bit position set to 0 for uptodate slink */
+	uint64_t slink_bits[BITMAP_ELEMS_MAX];
+
+	/* Free space. */
+} __attribute__((__packed__));
+
+struct data_header {
+	struct list_head list;
+
+	/* Bitmap, bit position set to 0 for uptodate slink. */
+	uint64_t slink_bits[BITMAP_ELEMS_MAX];
+
+	/*
+	 * Reference count indicating the number of endios
+	 * expected while writing the header and bitmap.
+	 */
+	atomic_t cnt;
+
+	struct data_header_region {
+		/* dev, sector, and size are taken from the initial bio. */
+		unsigned long dev;
+		sector_t sector;
+		unsigned size;
+	} region;
+
+	/* Position of header and data on disk and size in sectors. */
+	struct {
+		sector_t header; /* sector of this header on disk */
+		sector_t data; /* Offset of data (ie. the bio). */
+		unsigned data_sectors; /* Useful for sector calculation. */
+	} pos;
+
+	/* Next data or complete entry wraps. */
+	enum entry_wrap_type wrap;
+};
+
+/* Round @sectors up to HEADER_SECTORS units; FULL_SECTORS adds one unit. */
+enum distance_type { FULL_SECTORS, DATA_SECTORS };
+static inline sector_t
+_roundup_sectors(unsigned sectors, enum distance_type type)
+{
+	return HEADER_SECTORS * (dm_div_up(sectors, HEADER_SECTORS) +
+				 (type == FULL_SECTORS ? 1 : 0));
+}
+
+/* Header + data. */
+static inline sector_t
+roundup_sectors(unsigned sectors)
+{
+	return _roundup_sectors(sectors, FULL_SECTORS);
+}
+
+/* Data only. */
+static inline sector_t
+roundup_data_sectors(unsigned sectors)
+{
+	return _roundup_sectors(sectors, DATA_SECTORS);
+}
+
+/* Convert in-core data header @c_ptr to on-disk format @d_ptr (with CRC). */
+static void data_header_to_disk(unsigned bitmap_elems, void *d_ptr, void *c_ptr)
+{
+	unsigned i = bitmap_elems;
+	struct data_header_disk *d = d_ptr;
+	struct data_header *c = c_ptr;
+
+	BUG_ON(!i);
+
+	strncpy((char *) d->magic, data_header_magic, MAGIC_LEN);
+	d->region.dev = cpu_to_le64(c->region.dev);
+	d->region.offset = cpu_to_le64(to_bytes(c->region.sector));
+	d->region.size = cpu_to_le64(c->region.size);
+	while (i--)
+		d->slink_bits[i] = cpu_to_le64(c->slink_bits[i]);
+
+	d->valid = 1;
+	d->wrap = c->wrap;
+	d->pos.header = cpu_to_le64(to_bytes(c->pos.header));
+	d->pos.data = cpu_to_le64(to_bytes(c->pos.data));
+	d->crc = 0; /* The CRC is calculated with a zeroed crc member. */
+	d->crc = cpu_to_le32(crc32(~0, d, sizeof(*d)));
+}
+
+/* Validate on-disk data header @d_ptr and convert it to in-core @c_ptr. */
+static int data_header_to_core(unsigned bitmap_elems, void *c_ptr, void *d_ptr)
+{
+	int r;
+	unsigned i = bitmap_elems;
+	uint32_t crc;
+	struct data_header *c = c_ptr;
+	struct data_header_disk *d = d_ptr;
+
+	BUG_ON(!i);
+
+	r = strncmp((char *) d->magic, data_header_magic, MAGIC_LEN);
+	if (r)
+		return -EINVAL;
+
+	c->region.dev = le64_to_cpu(d->region.dev);
+	c->region.sector = to_sector(le64_to_cpu(d->region.offset));
+	c->region.size = le64_to_cpu(d->region.size);
+	while (i--)
+		c->slink_bits[i] = le64_to_cpu(d->slink_bits[i]);
+
+	c->pos.header = to_sector(le64_to_cpu(d->pos.header));
+	c->pos.data = to_sector(le64_to_cpu(d->pos.data));
+	c->pos.data_sectors = roundup_data_sectors(to_sector(c->region.size));
+	c->wrap = d->wrap;
+
+	/* Headers not flagged valid or describing no data are rejected. */
+	if (unlikely(!d->valid || !c->region.size))
+		return -EINVAL;
+
+	crc = le32_to_cpu(d->crc);
+	d->crc = 0; /* Must match the zeroed member used when writing. */
+	return likely(crc == crc32(~0, d, sizeof(*d))) ? 0 : -EINVAL;
+}
+
+static inline void
+slink_clear_bit(int bit, uint64_t *ptr)
+{
+	clear_bit(bit, (unsigned long *)ptr);
+}
+
+static inline int
+slink_test_bit(int bit, uint64_t *ptr)
+{
+	return test_bit(bit, (unsigned long *)ptr);
+}
+
+static inline void
+slink_set_bit(int bit, uint64_t *ptr)
+{
+	set_bit(bit, (unsigned long *)ptr);
+}
+
+/* entry list types and access macros. */
+enum entry_list_type {
+	E_BUSY_HASH,	/* Busy entries hash. */
+	E_COPY_CONTEXT,	/* Copies across slinks in progress for entry. */
+	E_ORDERED,	/* Ordered for advancing the ring buffer head. */
+	E_WRITE_OR_COPY,/* Add to l->lists.l[L_ENTRY_RING_WRITE/L_SLINK_COPY] */
+	E_NR_LISTS,
+};
+#define	E_BUSY_HASH_LIST(entry)		(entry->lists.l + E_BUSY_HASH)
+#define	E_COPY_CONTEXT_LIST(entry)	(entry->lists.l + E_COPY_CONTEXT)
+#define	E_ORDERED_LIST(entry)		(entry->lists.l + E_ORDERED)
+#define	E_WRITE_OR_COPY_LIST(entry)	(entry->lists.l + E_WRITE_OR_COPY)
+
+/* Container for the data_header and the associated data pages.
+ * NOTE(review): ENTRY_SLINKS() below ignores its argument and expands
+ * 'entry' from the caller's scope -- confirm that this is intended. */
+struct ring_buffer_entry {
+	struct {
+		struct list_head l[E_NR_LISTS];
+	} lists;
+
+	struct ring_buffer *ring; /* Back pointer. */
+
+	/* Reference count. */
+	atomic_t ref;
+
+	/*
+	 * Reference count indicating the number of endios expected
+	 * while writing its header and data to the ring buffer log
+	 * -or- future use:
+	 * how many copies across site links are active and how many
+	 * reads are being satisfied from the entry.
+	 */
+	atomic_t endios;
+
+	struct entry_data {
+		struct data_header *header;
+		struct data_header_disk *disk_header;
+		struct {
+			unsigned long data;
+			unsigned long header;
+		} error;
+	} data;
+
+	struct {
+		struct bio *read;	/* bio to read. */
+		struct bio *write;	/* Original bio to write. */
+	} bios;
+
+	int being_copied;
+
+	struct {
+		/* Bitmask of slinks the entry has active copies across. */
+		uint64_t ios[BITMAP_ELEMS_MAX];
+		/* Bitmask of synchronous slinks for endio. */
+		uint64_t sync[BITMAP_ELEMS_MAX];
+		/* Bitmask of slinks with errors. */
+		uint64_t error[BITMAP_ELEMS_MAX];
+	} slink_bits;
+};
+#define ENTRY_SLINKS(l) ((void *) (entry)->data.header->slink_bits)
+#define ENTRY_IOS(entry) ((void *) (entry)->slink_bits.ios)
+#define ENTRY_SYNC(entry) ((entry)->slink_bits.sync)
+#define ENTRY_ERROR(entry) ((entry)->slink_bits.error)
+
+/* FIXME: XXX
+ * For now, the copy context has a backpointer to the ring buffer entry.
+ * This means that a ring buffer entry has to remain in memory until all
+ * of the slink copies have finished.  Heinz, you mentioned that this was
+ * not a good idea.  I'm open to suggestions on how better to organize this.
+ */
+enum error_type { ERR_DISK, ERR_RAM, NR_ERR_TYPES };
+struct slink_copy_error {
+	int read;
+	int write;
+};
+
+struct slink_copy_context {
+	/*
+	 * List first points to the copy context list in the ring buffer
+	 * entry.  Then, upon completion it gets moved to the slink endios
+	 * list.
+	 */
+	struct list_head list;
+	atomic_t cnt;
+	struct ring_buffer_entry *entry;
+	struct dm_repl_slink *slink;
+	struct slink_copy_error error[NR_ERR_TYPES];
+	unsigned long start_jiffies;
+};
+
+/* Development statistics. */
+struct stats {
+	atomic_t io[2];
+	atomic_t writes_pending;
+	atomic_t hash_elem;
+
+	unsigned copy[2];
+	unsigned wrap;
+	unsigned hash_insert;
+	unsigned hash_insert_max;
+	unsigned stall;
+};
+
+/* Per site link measure/state. */
+enum slink_status_type {
+	SS_SYNC,	/* slink fell behind an I/O threshold. */
+	SS_TEARDOWN,	/* Flag site link teardown. */
+};
+struct slink_state {
+	unsigned slink_nr;
+	struct repl_log *l;
+
+	struct {
+
+		/*
+		 * Difference of time (measured in jiffies) between the
+		 * first outstanding copy for this slink and the last
+		 * outstanding copy.
+		 */
+		unsigned long head_jiffies;
+
+		/* Number of ios/sectors currently copy() to this slink. */
+		struct {
+			sector_t sectors;
+			uint64_t ios;
+		} outstanding;
+	} fb;
+
+	struct {
+		unsigned long flags; /* slink_state flags._*/
+
+		/* slink+I/O teardown synchronization. */
+		wait_queue_head_t waiters;
+		atomic_t in_flight;
+	} io;
+};
+BITOPS(SsSync, slink_state, SS_SYNC)
+BITOPS(SsTeardown, slink_state, SS_TEARDOWN)
+
+enum open_type { OT_AUTO, OT_OPEN, OT_CREATE };
+enum replog_status_type { LOG_DEVEL_STATS, LOG_INITIALIZED };
+
+/* repl_log list types and access macros. */
+enum replog_list_type {
+	L_REPLOG,		/* Linked list of replogs. */
+	L_SLINK_COPY,		/* Entries to copy accross slinks. */
+	L_SLINK_ENDIO,		/* Entries to endio process. */
+	L_ENTRY_RING_WRITE,	/* Entries to write to ring buffer */
+	L_ENTRY_ORDERED,	/* Ordered list of all entries. */
+	L_NR_LISTS,
+};
+#define	L_REPLOG_LIST(l)		(l->lists.l + L_REPLOG)
+#define	L_SLINK_COPY_LIST(l)		(l->lists.l + L_SLINK_COPY)
+#define	L_SLINK_ENDIO_LIST(l)		(l->lists.l + L_SLINK_ENDIO)
+#define	L_ENTRY_RING_WRITE_LIST(l)	(l->lists.l + L_ENTRY_RING_WRITE)
+#define	L_ENTRY_ORDERED_LIST(l)		(l->lists.l + L_ENTRY_ORDERED)
+
+/* The replication log in core. */
+struct repl_log {
+	struct dm_repl_log *log;
+
+	struct kref ref;	/* Pin count. */
+
+	struct dm_repl_log *replog;
+	struct dm_repl_slink *slink0;
+
+	struct stats stats;	/* Development statistics. */
+
+	struct repl_params {
+		enum open_type open_type;
+		unsigned count;
+		struct repl_dev {
+			struct dm_dev *dm_dev;
+			sector_t start;
+			sector_t size;
+		} dev;
+	} params;
+
+	struct {
+		spinlock_t lock; /* Lock on pending list below. */
+		struct bio_list in; /* pending list of bios */
+		struct dm_io_client *io_client;
+		struct workqueue_struct *wq;
+		struct work_struct ws;
+		unsigned long flags;	/* State flags. */
+		/* Preallocated header. We only need one at a time.*/
+		struct buffer_header_disk *buffer_header_disk;
+	} io;
+
+	struct ring_buffer ring_buffer;
+
+	/* Useful for runtime performance on bitmap accesses. */
+	struct {
+		int count;	/* Actual # of slinks in this replog. */
+		unsigned max;	/* Actual maximum added site link #. */
+		unsigned bitmap_elems;	/* Actual used elements in bitmaps. */
+		unsigned bitmap_size;	/* Actual bitmap size (for memcpy). */
+	} slink;
+
+	struct {
+		struct log_header *log;
+	} header;
+
+	struct {
+		/* List of site links. */
+		struct dm_repl_log_slink_list slinks;
+
+		/*
+		 * A single lock for all of these lists should be sufficient
+		 * given that each list is processed in-turn (see do_log()).
+		 *
+		 * The lock has to protect the L_SLINK_ENDIO list
+		 * and the entry ring write lists below.
+		 *
+		 * We got to streamline these lists vs. the lock. -HJM
+		 * The others are accessed by one thread only. -HJM
+		 */
+		rwlock_t	lock;
+
+		/*
+		 * Lists for entry slink copies, entry endios,
+		 * ring buffer writes and ordered entries.
+		 */
+		struct list_head l[L_NR_LISTS];
+	} lists;
+
+	/* Caller callback function and context. */
+	struct replog_notify {
+		dm_repl_notify_fn fn;
+		void *context;
+	} notify;
+};
+#define _SET_AND_BUG_ON_L(l, log) \
+	do { /* Set (l) from (log)->context; both go through _BUG_ON_PTR(). */ \
+		_BUG_ON_PTR(log); \
+		(l) = (log)->context; \
+		_BUG_ON_PTR(l); \
+	} while (0) /* No trailing ';': keeps the macro safe in if/else. */
+
+/* Define log bitops. */
+BITOPS(LogDevelStats, repl_log, LOG_DEVEL_STATS);
+BITOPS(LogInitialized, repl_log, LOG_INITIALIZED);
+
+/* True if any of the first l->slink.max bits of @bits is set. */
+static inline int
+entry_busy(struct repl_log *l, void *bits)
+{
+	return l->slink.max > find_first_bit(bits, l->slink.max);
+}
+
+static inline int
+ss_io(struct slink_state *ss)
+{
+	_BUG_ON_PTR(ss);
+	return atomic_read(&ss->io.in_flight);
+}
+
+static void
+ss_io_get(const char *caller, struct slink_state *ss)
+{
+	if (!ss || IS_ERR(ss)) {
+		DMERR("%s", caller);
+		BUG();
+	}
+
+	atomic_inc(&ss->io.in_flight);
+}
+
+static void
+ss_io_put(const char *caller, struct slink_state *ss)
+{
+	_BUG_ON_PTR(ss);
+	if (atomic_dec_and_test((&ss->io.in_flight)))
+		wake_up(&ss->io.waiters);
+	else if (ss_io(ss) < 0) {
+		DMERR("%s", caller);
+		BUG();
+	}
+}
+
+static void
+ss_wait_on_io(struct slink_state *ss)
+{
+	_BUG_ON_PTR(ss);
+	while (ss_io(ss)) {
+		flush_workqueue(ss->l->io.wq);
+		wait_event(ss->io.waiters, !ss_io(ss));
+	}
+}
+
+/* Block until every configured site link has no I/O in flight. */
+static inline void
+ss_all_wait_on_ios(struct repl_log *l)
+{
+	unsigned long nr;
+	struct dm_repl_slink *slink;
+
+	for_each_bit(nr, LOG_SLINKS(l), l->slink.max) {
+		struct slink_state *ss;
+
+		slink = l->slink0->ops->slink(l->replog, nr);
+		if (IS_ERR(slink)) {
+			/* Skip site links which cannot be looked up. */
+			DMERR_LIMIT("%s slink error", __func__);
+		} else {
+			ss = slink->caller;
+			_BUG_ON_PTR(ss);
+			ss_wait_on_io(ss);
+		}
+	}
+}
+
+static inline struct repl_log *
+ring_buffer_repl_log(struct ring_buffer *ring)
+{
+	return container_of(ring, struct repl_log, ring_buffer);
+}
+
+static inline struct block_device *
+repl_log_bdev(struct repl_log *l)
+{
+	return l->params.dev.dm_dev->bdev;
+}
+
+static inline struct block_device *
+ring_buffer_bdev(struct ring_buffer *ring)
+{
+	return repl_log_bdev(ring_buffer_repl_log(ring));
+}
+
+static inline struct dm_io_client *
+replog_io_client(struct repl_log *l)
+{
+	return l->io.io_client;
+}
+
+static inline struct repl_log *
+dev_repl_log(struct repl_dev *dev)
+{
+	return container_of(dev, struct repl_log, params.dev);
+}
+
+/* Define mempool_{alloc,free}() functions for the ring buffer pools. */
+#define	ALLOC_FREE_ELEM(name, type) \
+static void *\
+alloc_ ## name(struct ring_buffer *ring) \
+{ \
+	return mempool_alloc(ring->pools[(type)], GFP_KERNEL); \
+} \
+\
+static inline void \
+free_ ## name(void *ptr, struct ring_buffer *ring) \
+{ \
+	_BUG_ON_PTR(ptr); \
+	mempool_free(ptr, ring->pools[(type)]); \
+}
+
+ALLOC_FREE_ELEM(entry, ENTRY)
+ALLOC_FREE_ELEM(header, DATA_HEADER)
+ALLOC_FREE_ELEM(data_header_disk, DATA_HEADER_DISK)
+ALLOC_FREE_ELEM(copy_context, COPY_CONTEXT)
+#undef ALLOC_FREE_ELEM
+
+/* Additional alloc/free functions for header_io() abstraction. */
+/* No need to allocate bitmaps, because they are transient. */
+static void *
+alloc_log_header_disk(struct ring_buffer *ring)
+{
+	return kmalloc(to_bytes(1), GFP_KERNEL);
+}
+
+static void
+free_log_header_disk(void *ptr, struct ring_buffer *ring)
+{
+	kfree(ptr);
+}
+
+/* Dummies to allow for abstraction. */
+static void *
+alloc_buffer_header_disk(struct ring_buffer *ring)
+{
+	return ring_buffer_repl_log(ring)->io.buffer_header_disk;
+}
+
+static void
+free_buffer_header_disk(void *ptr, struct ring_buffer *ring)
+{
+}
+
+/*********************************************************************
+ * Busy entries hash.
+ */
+/*
+ * Allocate and initialize a sector hash sized from @size (in sectors).
+ */
+static int
+sector_hash_init(struct sector_hash *hash, sector_t size)
+{
+	unsigned i, nr = roundup_pow_of_two(size / BIO_MAX_SECTORS);
+
+	/* Cap at REGIONS_MAX, then keep a quarter; tiny hashes stay as is. */
+	if (nr > 4)
+		nr = (nr > REGIONS_MAX ? REGIONS_MAX : nr) / 4;
+
+	/* Allocate the array of bucket list heads. */
+	hash->hash = vmalloc(nr * sizeof(*hash->hash));
+	if (!hash->hash)
+		return -ENOMEM;
+
+	/* The mask is the bucket count minus one. */
+	hash->buckets = nr;
+	hash->mask = nr - 1;
+
+	/* Start out with all buckets empty. */
+	for (i = 0; i < nr; i++)
+		INIT_LIST_HEAD(hash->hash + i);
+
+	return 0;
+}
+
+static void
+sector_hash_exit(struct sector_hash *hash)
+{
+	if (hash->hash) {
+		vfree(hash->hash);
+		hash->hash = NULL;
+	}
+}
+
+/* Map @sector to its bucket: (sector / BIO_MAX_SECTORS) & hash->mask. */
+static inline struct list_head *
+hash_bucket(struct sector_hash *hash, sector_t sector)
+{
+	sector_div(sector, BIO_MAX_SECTORS);
+	return &hash->hash[(unsigned) (sector & hash->mask)];
+}
+
+/* Append @entry to the hash bucket of its region start sector. */
+static inline void
+sector_hash_elem_insert(struct sector_hash *hash,
+			struct ring_buffer_entry *entry)
+{
+	struct stats *stats;
+	struct list_head *busy = E_BUSY_HASH_LIST(entry);
+	struct list_head *head =
+		hash_bucket(hash, entry->data.header->region.sector);
+
+	BUG_ON(!head);
+	_BUG_ON_PTR(entry->ring);
+	stats = &ring_buffer_repl_log(entry->ring)->stats;
+
+	BUG_ON(!list_empty(busy));
+	list_add_tail(busy, head);
+
+	/* REMOVEME: statistics. */
+	atomic_inc(&stats->hash_elem);
+	stats->hash_insert++;
+	if (stats->hash_insert > stats->hash_insert_max)
+		stats->hash_insert_max = stats->hash_insert;
+}
+
+/* Return first sector # of bio. */
+static inline sector_t
+bio_begin(struct bio *bio)
+{
+	return bio->bi_sector;
+}
+
+/* Return last sector # of bio. */
+static inline sector_t bio_end(struct bio *bio)
+{
+	return bio_begin(bio) + bio_sectors(bio);
+}
+
+/* Return bio vector. */
+static inline struct bio_vec *
+bio_vec(struct bio *bio)
+{
+	return bio->bi_io_vec + bio->bi_idx;
+}
+
+/* Roundup size to sectors. */
+static inline sector_t round_up_to_sector(unsigned size)
+{
+	return to_sector(dm_round_up(size, to_bytes(1)));
+}
+
+/* True if the start of @r1 lies within the half-open range @r2. */
+static inline int
+_ranges_overlap(struct sector_range *r1, struct sector_range *r2)
+{
+	return !(r1->start < r2->start || r1->start >= r2->end);
+}
+
+/* True if either of the two ranges starts within the other one. */
+static inline int
+ranges_overlap(struct sector_range *elem_range, struct sector_range *bio_range)
+{
+	return _ranges_overlap(elem_range, bio_range) ||
+	       _ranges_overlap(bio_range, elem_range);
+}
+
+/* Take an entry ref reference out. */
+static inline void
+entry_get(struct ring_buffer_entry *entry)
+{
+	atomic_inc(&entry->ref);
+}
+
+/*
+ * Check if bio's address range overlaps busy entries (writes pending).
+ *
+ * Returns 0 on no overlap, else -EBUSY if the first overlapping entry
+ * found has endios outstanding or 1 if not. @buckets receives the hash
+ * bucket(s) searched. Must be called with the read hash lock held.
+ */
+static int
+ring_buffer_writes_pending(struct sector_hash *hash, struct bio *bio,
+			   struct list_head *buckets[2])
+{
+	unsigned b, nr_buckets = 2;
+	struct ring_buffer_entry *entry;
+	/* Sector range covered by the bio. */
+	struct sector_range bio_range = {
+		.start = bio_begin(bio),
+		.end = bio_end(bio),
+	}, entry_range;
+
+	buckets[0] = hash_bucket(hash, bio_range.start);
+	buckets[1] = hash_bucket(hash, bio_range.end);
+	if (buckets[0] == buckets[1]) {
+		nr_buckets = 1;
+		buckets[1] = NULL;
+	}
+
+	for (b = 0; b < nr_buckets; b++) {
+		/* Walk the entries checking for overlaps. */
+		list_for_each_entry_reverse(entry, buckets[b],
+					    lists.l[E_BUSY_HASH]) {
+			entry_range.start = entry->data.header->region.sector;
+			entry_range.end = entry_range.start +
+			round_up_to_sector(entry->data.header->region.size);
+
+			if (ranges_overlap(&entry_range, &bio_range))
+				return atomic_read(&entry->endios) ? -EBUSY : 1;
+		}
+	}
+
+	return 0;
+}
+
+/* Drop an entry reference; the last put removes it from the busy hash. */
+static void
+entry_put(struct ring_buffer_entry *entry)
+{
+	struct stats *stats;
+
+	_BUG_ON_PTR(entry);
+
+	if (!atomic_dec_and_test(&entry->ref)) {
+		/* Not the last reference: just catch underflows. */
+		BUG_ON(atomic_read(&entry->ref) < 0);
+		return;
+	}
+
+	_BUG_ON_PTR(entry->ring);
+	stats = &ring_buffer_repl_log(entry->ring)->stats;
+
+	/*
+	 * We don't need locking here because the last
+	 * put is carried out in daemon context.
+	 */
+	BUG_ON(list_empty(E_BUSY_HASH_LIST(entry)));
+	list_del_init(E_BUSY_HASH_LIST(entry));
+
+	/* REMOVEME: statistics. */
+	atomic_dec(&stats->hash_elem);
+	stats->hash_insert--;
+}
+
+static inline void
+sector_range_clear_busy(struct ring_buffer_entry *entry)
+{
+	entry_put(entry);
+}
+
+/*
+ * Mark a sector range start and length busy.
+ *
+ * Caller has to serialize calls.
+ */
+static void
+sector_range_mark_busy(struct ring_buffer_entry *entry)
+{
+	_BUG_ON_PTR(entry);
+	entry_get(entry);
+
+	/* Insert new element into hash. */
+	sector_hash_elem_insert(&entry->ring->busy_sectors, entry);
+}
+
+/* Reset the development statistics of a replication log. */
+static void stats_init(struct repl_log *l)
+{
+	unsigned i;
+	struct stats *stats = &l->stats;
+
+	memset(stats, 0, sizeof(*stats));
+
+	for (i = 0; i < ARRAY_SIZE(stats->io); i++)
+		atomic_set(&stats->io[i], 0);
+
+	atomic_set(&stats->writes_pending, 0);
+	atomic_set(&stats->hash_elem, 0);
+}
+
+/* Global replicator log list. */
+LIST_HEAD(replog_list);
+
+/* Wake worker. */
+static void
+wake_do_log(struct repl_log *l)
+{
+	queue_work(l->io.wq, &l->io.ws);
+}
+
+/* Look up site link @slink_nr; ERR_PTR(-ENOENT) if slink0 is not set. */
+struct dm_repl_slink *slink_find(struct repl_log *l, int slink_nr)
+{
+	struct dm_repl_slink *first = l->slink0;
+
+	if (!first)
+		return ERR_PTR(-ENOENT);
+
+	_BUG_ON_SLINK_NR(l, slink_nr);
+	return !slink_nr ? first : first->ops->slink(l->replog, slink_nr);
+}
+
+/*
+ * If an slink is asynchronous, check to see if it needs to fall
+ * back to synchronous mode due to falling too far behind.
+ *
+ * Declare a bunch of fallbehind specific small functions in order
+ * to avoid conditions in the fast path by accessing them via
+ * function pointers.
+ */
+/* Account @amount for one more io; true if slink exceeds its threshold. */
+static int
+slink_fallbehind_exceeded(struct repl_log *l, struct slink_state *ss,
+			  struct dm_repl_slink_fallbehind *fallbehind,
+			  unsigned amount)
+{
+	sector_t sectors;
+	uint64_t ios;
+	unsigned long flags, *head_jiffies;
+
+	_BUG_ON_PTR(l);
+	_BUG_ON_PTR(ss);
+	_BUG_ON_PTR(fallbehind);
+
+	/* irqsave: the lock is also taken from interrupt context. */
+	spin_lock_irqsave(&l->io.lock, flags);
+	ios = ++ss->fb.outstanding.ios;
+	ss->fb.outstanding.sectors += amount;
+	sectors = ss->fb.outstanding.sectors;
+	spin_unlock_irqrestore(&l->io.lock, flags);
+
+	if (!fallbehind->value)
+		return 0;
+
+	switch (fallbehind->type) {
+	case DM_REPL_SLINK_FB_IOS:
+		return ios > fallbehind->value;
+
+	case DM_REPL_SLINK_FB_SIZE:
+		return sectors > fallbehind->value;
+
+	case DM_REPL_SLINK_FB_TIMEOUT:
+		head_jiffies = &ss->fb.head_jiffies;
+		if (unlikely(!*head_jiffies))
+			*head_jiffies = jiffies;
+
+		return time_after(jiffies, *head_jiffies +
+				  msecs_to_jiffies(fallbehind->value));
+
+	default:
+		BUG();
+	}
+
+	return 0;
+}
+
+/*
+ * Account one outstanding I/O of @amount less on the slink state and
+ * return true if the slink is at or below its fallbehind threshold.
+ *
+ * The counters are updated under l->io.lock and the values used for the
+ * threshold comparison are snapshotted under that same lock.
+ *
+ * A fallbehind value of 0 means "no threshold": always returns 0.
+ *
+ * Can be called from interrupt context.
+ */
+static int
+slink_fallbehind_recovered(struct repl_log *l, struct slink_state *ss,
+			   struct dm_repl_slink_fallbehind *fallbehind,
+			   unsigned amount)
+{
+	sector_t sectors;
+	uint64_t ios;
+
+	_BUG_ON_PTR(l);
+	_BUG_ON_PTR(ss);
+	_BUG_ON_PTR(fallbehind);
+
+	/* Need the non-irq versions here, because IRQs are already disabled. */
+	spin_lock(&l->io.lock);
+	ios = --ss->fb.outstanding.ios;
+	ss->fb.outstanding.sectors -= amount;
+	sectors = ss->fb.outstanding.sectors;
+	spin_unlock(&l->io.lock);
+
+	if (!fallbehind->value)
+		return 0;
+
+	switch (fallbehind->type) {
+	case DM_REPL_SLINK_FB_IOS:
+		return ios <= fallbehind->value;
+
+	case DM_REPL_SLINK_FB_SIZE:
+		return sectors <= fallbehind->value;
+
+	case DM_REPL_SLINK_FB_TIMEOUT:
+		return time_before(jiffies, ss->fb.head_jiffies +
+				   msecs_to_jiffies(fallbehind->value));
+	default:
+		BUG();
+	}
+
+	return 0;
+}
+
+/*
+ * Update fallbehind account.
+ *
+ * UPD_INC: account the entry's region size as outstanding on the slink
+ *	    and switch the slink state to sync once the threshold is
+ *	    exceeded.
+ * UPD_DEC: refresh the timeout base from the first outstanding copy for
+ *	    this slink, take the entry's region size off the account and
+ *	    release sync again (waking the worker) once the slink recovered.
+ *
+ * Does nothing if the slink has no caller state attached.
+ *
+ * Has to be called with rw lock held.
+ */
+/* FIXME: account for resynchronization. */
+enum fb_update_type { UPD_INC, UPD_DEC };
+static void
+slink_fallbehind_update(enum fb_update_type type,
+			struct dm_repl_slink *slink,
+			struct ring_buffer_entry *entry)
+{
+	int found = 0, slink_nr, sync;
+	struct repl_log *l;
+	struct slink_state *ss;
+	struct data_header_region *region;
+	struct dm_repl_slink_fallbehind *fallbehind;
+	struct ring_buffer_entry *pos;
+
+	_BUG_ON_PTR(slink);
+	fallbehind = slink->ops->fallbehind(slink);
+	_BUG_ON_PTR(fallbehind);
+	_BUG_ON_PTR(entry);
+	l = ring_buffer_repl_log(entry->ring);
+	_BUG_ON_PTR(l);
+	slink_nr = slink->ops->slink_number(slink);
+	_BUG_ON_SLINK_NR(l, slink_nr);
+	region = &entry->data.header->region;
+	_BUG_ON_PTR(region);
+
+	/*
+	 * We can access ss w/o a lock, because it's referenced by
+	 * inflight I/Os and by the running worker which processes
+	 * this function.
+	 */
+	ss = slink->caller;
+	if (!ss)
+		return;
+
+	_BUG_ON_PTR(ss);
+	sync = SsSync(ss);
+
+	switch (type) {
+	case UPD_INC:
+		if (slink_fallbehind_exceeded(l, ss, fallbehind,
+					      region->size) &&
+		    !TestSetSsSync(ss) &&
+		    !sync)
+			DMINFO("enforcing fallbehind sync on slink=%d at %u",
+			       slink_nr, jiffies_to_msecs(jiffies));
+		break;
+
+	case UPD_DEC:
+		/*
+		 * Walk the list of outstanding copy I/Os and update the
+		 * start_jiffies value with the first entry found.
+		 *
+		 * A plain 'break' only leaves the inner loop, so the
+		 * outer walk has to be terminated explicitly; otherwise
+		 * the last match would win instead of the first one.
+		 */
+		list_for_each_entry(pos, L_SLINK_COPY_LIST(l),
+				    lists.l[E_WRITE_OR_COPY]) {
+			struct slink_copy_context *cc;
+
+			list_for_each_entry(cc, E_COPY_CONTEXT_LIST(pos),
+					    list) {
+				if (cc->slink->ops->slink_number(cc->slink) ==
+				    slink_nr) {
+					ss->fb.head_jiffies = cc->start_jiffies;
+					found = 1;
+					break;
+				}
+			}
+
+			if (found)
+				break;
+		}
+
+		if (slink_fallbehind_recovered(l, ss, fallbehind,
+					       region->size)) {
+			ss->fb.head_jiffies = 0;
+
+			if (TestClearSsSync(ss) && sync) {
+				DMINFO("releasing fallbehind sync on slink=%d"
+				       " at %u",
+				       slink_nr, jiffies_to_msecs(jiffies));
+				wake_do_log(l);
+			}
+		}
+
+		break;
+
+	default:
+		BUG();
+	}
+}
+
+/* Add an entry to the fallbehind account of an slink. */
+static inline void slink_fallbehind_inc(struct dm_repl_slink *slink,
+					struct ring_buffer_entry *entry)
+{
+	slink_fallbehind_update(UPD_INC, slink, entry);
+}
+
+/* Take an entry off the fallbehind account of an slink. */
+static inline void slink_fallbehind_dec(struct dm_repl_slink *slink,
+					struct ring_buffer_entry *entry)
+{
+	slink_fallbehind_update(UPD_DEC, slink, entry);
+}
+
+/*
+ * Caller properties definition for dev_io().
+ *
+ * Users in this file initialize the first three members positionally,
+ * so the member order must not be changed.
+ */
+struct dev_io_params {
+	struct repl_dev *dev;		/* Device to perform the io on. */
+	sector_t sector;		/* Start sector of the io. */
+	unsigned size;			/* Size of the io (checked against
+					 * BIO_MAX_SIZE and rounded up to
+					 * sectors by dev_io()). */
+	struct dm_io_memory mem;	/* Memory description passed to dm_io(). */
+	struct dm_io_notify notify;	/* Completion callback passed to dm_io();
+					 * notify.fn != NULL flags the ring
+					 * as having io queued. */
+	unsigned long flags;		/* Not referenced by dev_io(). */
+};
+
+/*
+ * Read/write device items.
+ *
+ * @rw:   io direction, handed to dm_io() as bi_rw
+ * @ring: ring buffer which gets flagged 'io queued' on callback requests
+ * @dio:  device, start sector, size, memory and notify properties
+ *
+ * In case dio->notify.fn is set, the ring buffer is flagged as having io
+ * queued and the callback is passed on to dm_io() (asynchronous call),
+ * else dm_io() is called without a callback (synchronous call).
+ *
+ * Returns the result of dm_io().
+ */
+static int
+dev_io(int rw, struct ring_buffer *ring, struct dev_io_params *dio)
+{
+	BUG_ON(dio->size > BIO_MAX_SIZE);
+	DBG_DMINFO("%s: rw: %d, %llu sectors at sector %llu, dev %p",
+		   __func__, rw,
+		   (unsigned long long) round_up_to_sector(dio->size),
+		   (unsigned long long) dio->sector,
+		   dio->dev->dm_dev->bdev);
+
+	/* Flag IO queued on asynchronous calls. */
+	if (dio->notify.fn)
+		SetRingBufferIOQueued(ring);
+
+	return dm_io(
+		&(struct dm_io_request) {
+			.bi_rw = rw,
+			.mem = dio->mem,
+			.notify = dio->notify,
+			.client = replog_io_client(dev_repl_log(dio->dev))
+		}, 1 /* 1 region following */,
+		&(struct dm_io_region) {
+			.bdev = dio->dev->dm_dev->bdev,
+			.sector = dio->sector,
+			.count = round_up_to_sector(dio->size),
+		},
+		NULL
+	);
+}
+
+/*
+ * Definition of properties/helper functions for header IO.
+ *
+ * header_io() keeps one of these per header type and uses it to
+ * allocate the on-disk buffer, convert between the in-core and the
+ * on-disk representation and free the buffer again.
+ */
+struct header_io_spec {
+	const char *name;	/* Header identifier (eg. 'data'). */
+	unsigned size;		/* Size of ondisk structure. */
+	/* Disk structure allocation helper. */
+	void *(*alloc_disk)(struct ring_buffer *);
+	/* Disk structure deallocation helper. */
+	void (*free_disk)(void *, struct ring_buffer *);
+	/*
+	 * Disk structure to core structure xfer helper; called by
+	 * header_io() as (bitmap_elems, core, disk), !0 return = invalid.
+	 */
+	int (*to_core_fn)(unsigned bitmap_elems, void *, void *);
+	/*
+	 * Core structure to disk structure xfer helper; called by
+	 * header_io() as (bitmap_elems, disk, core).
+	 */
+	void (*to_disk_fn)(unsigned bitmap_elems, void *, void *);
+};
+/*
+ * Macro to initialize type specific header_io_spec structure.
+ *
+ * All members are derived from the header name by token pasting, eg.
+ * IO_SPEC(log) refers to struct log_header_disk, alloc_log_header_disk(),
+ * free_log_header_disk(), log_header_to_core() and log_header_to_disk()
+ * and uses "log" as the name string.
+ */
+#define	IO_SPEC(header) \
+	{ .name = # header, \
+	  .size = sizeof(struct header ## _header_disk), \
+	  .alloc_disk = alloc_ ## header ## _header_disk, \
+	  .free_disk = free_ ## header ## _header_disk, \
+	  .to_core_fn = header ## _header_to_core, \
+	  .to_disk_fn = header ## _header_to_disk }
+
+/*
+ * Header types; used by header_io() as index into its io_specs[] array,
+ * hence the order has to match the order of that array.
+ */
+enum header_type { IO_LOG, IO_BUFFER, IO_DATA };
+/*
+ * Arguments of header_io().
+ *
+ * Callers in this file initialize the members positionally,
+ * so the member order must not be changed.
+ */
+struct header_io_params {
+	enum header_type type;	/* Which kind of header to read/write. */
+	struct repl_log *l;	/* Replication log the header belongs to. */
+	void *core_header;	/* In-core header (source on write,
+				 * destination on read); must not be NULL. */
+	sector_t sector;	/* Device sector of the on-disk header. */
+	/* Optional hook to adjust the on-disk header before a write. */
+	void (*disk_header_fn)(void *);
+};
+
+/*
+ * Read/write a {log,buffer,data} header from/to disk synchronously.
+ *
+ * @rw:  READ or WRITE
+ * @hio: header type, log, in-core header, device sector and an optional
+ *	 hook to adjust the on-disk header before it is written
+ *
+ * On WRITE the in-core header is converted to its on-disk format (and
+ * optionally adjusted by hio->disk_header_fn) before the io; on READ the
+ * on-disk header is converted to the in-core format after a successful io.
+ *
+ * Returns 0 on success, the dev_io() error (with the ring buffer error
+ * flag set) or the result of the to_core conversion on an invalid header.
+ */
+static int
+header_io(int rw, struct header_io_params *hio)
+{
+	int r;
+	struct repl_log *l = hio->l;
+	struct ring_buffer *ring = &l->ring_buffer;
+	/* Specs of all log headers. Must be in 'enum header_type' order! */
+	static const struct header_io_spec io_specs[] = {
+		IO_SPEC(log),
+		IO_SPEC(buffer),
+		IO_SPEC(data),
+	};
+	const struct header_io_spec *io;
+	void *disk_header;
+	struct dev_io_params dio;
+
+	/* Validate the arguments *before* using the spec they select. */
+	BUG_ON((unsigned) hio->type >= ARRAY_SIZE(io_specs));
+	BUG_ON(!hio->core_header);
+	io = io_specs + hio->type;
+
+	disk_header = io->alloc_disk(ring);
+	BUG_ON(!disk_header);
+	memset(disk_header, 0, io->size);
+
+	/* No notify function -> synchronous io. */
+	dio = (struct dev_io_params) {
+		&l->params.dev, hio->sector, io->size,
+		.mem = { DM_IO_KMEM, { .addr = disk_header }, 0 },
+		.notify = { NULL, NULL}
+	};
+
+	if (rw == WRITE) {
+		io->to_disk_fn(BITMAP_ELEMS(l), disk_header, hio->core_header);
+
+		/*  If disk header needs special handling before write. */
+		if (hio->disk_header_fn)
+			hio->disk_header_fn(disk_header);
+	}
+
+	r = dev_io(rw, ring, &dio);
+	if (unlikely(r)) {
+		SetRingBufferError(ring);
+		DMERR("Failed to %s %s header!",
+		      rw == WRITE ? "write" : "read", io->name);
+	} else if (rw == READ) {
+		r = io->to_core_fn(BITMAP_ELEMS(l), hio->core_header,
+				   disk_header);
+		if (unlikely(r))
+			DMERR("invalid %s header/sector=%llu",
+			      io->name, (unsigned long long) hio->sector);
+	}
+
+	io->free_disk(disk_header, ring);
+	return r;
+}
+
+/* Synchronous io on the log header at the start of the log device. */
+static inline int log_header_io(int rw, struct repl_log *l)
+{
+	struct header_io_params hio = {
+		.type = IO_LOG,
+		.l = l,
+		.core_header = l->header.log,
+		.sector = l->params.dev.start,
+		.disk_header_fn = NULL,
+	};
+
+	return header_io(rw, &hio);
+}
+
+/*
+ * Synchronous io on the ring buffer header, which lives at the
+ * sector recorded in the log header.
+ */
+static inline int buffer_header_io(int rw, struct repl_log *l)
+{
+	struct header_io_params hio = {
+		.type = IO_BUFFER,
+		.l = l,
+		.core_header = &l->ring_buffer,
+		.sector = l->header.log->buffer_header,
+		.disk_header_fn = NULL,
+	};
+
+	return header_io(rw, &hio);
+}
+
+/* Synchronous io on a data header at the given ring buffer sector. */
+static inline int data_header_io(int rw, struct repl_log *l,
+				 struct data_header *header, sector_t sector)
+{
+	struct header_io_params hio = {
+		.type = IO_DATA,
+		.l = l,
+		.core_header = header,
+		.sector = sector,
+		.disk_header_fn = NULL,
+	};
+
+	return header_io(rw, &hio);
+}
+
+/*
+ * Notify dm-repl.c to submit more IO.
+ *
+ * The registered callback is copied under l->io.lock and invoked
+ * without the lock held; the error is passed in the first argument
+ * for reads and in the second one for writes.
+ */
+static void notify_caller(struct repl_log *l, int rw, int error)
+{
+	struct replog_notify snap;
+
+	_BUG_ON_PTR(l);
+
+	spin_lock(&l->io.lock);
+	snap = l->notify;
+	spin_unlock(&l->io.lock);
+
+	/* Nobody registered for notification. */
+	if (unlikely(!snap.fn))
+		return;
+
+	if (rw == READ)
+		snap.fn(error, 0, snap.context);
+	else
+		snap.fn(0, error, snap.context);
+}
+
+/*
+ * Ring buffer routines.
+ *
+ * The ring buffer needs to keep track of arbitrarily-sized data items.
+ * HEAD points to the first data header that needs to be replicated.  This
+ * can mean it has been partially replicated or not replicated at all.
+ * The ring buffer is empty if HEAD == TAIL.
+ * The ring buffer is full if HEAD == TAIL + len(TAIL) modulo device size.
+ *
+ * An entry in the buffer is not valid until both the data header and the
+ * associated data items are on disk.  Multiple data headers and data items
+ * may be written in parallel.  This means that, in addition to the
+ * traditional HEAD and TAIL pointers, we need to keep track of an in-core
+ * variable reflecting the next area in the log that is unallocated.  We also
+ * need to keep an ordered list of pending and completed buffer entry writes.
+ */
+/*
+ * Check and wrap a ring buffer offset around ring buffer end.
+ *
+ * There are three cases to distinguish here:
+ * 1. header and data fit before ring->end
+ * 2. header fits before ring->end, data doesn't -> remap data to ring->start
+ * 3. header doesn't fit before ring->end -> remap both to ring->start
+ *
+ * Function returns the next rounded offset *after* any
+ * conditional remapping of the actual header.
+ *
+ */
+/* Sectors left between first_free and ring->end (0 if already past it). */
+static sector_t sectors_unused(struct ring_buffer *ring, sector_t first_free)
+{
+	if (first_free > ring->end)
+		return 0;
+
+	return ring->end - first_free;
+}
+
+/*
+ * First sector following the header of an entry, ie. where
+ * the data starts unless it got wrapped.
+ */
+static inline sector_t data_start(struct data_header *header)
+{
+	sector_t header_sector = header->pos.header;
+
+	return header_sector + HEADER_SECTORS;
+}
+
+/*
+ * First sector following the data of an entry,
+ * ie. the first unused sector.
+ */
+static inline sector_t next_start(struct data_header *header)
+{
+	sector_t data_sector = header->pos.data;
+
+	return data_sector + header->pos.data_sectors;
+}
+
+/*
+ * Return the sector the next entry starts at.
+ *
+ * That is the first sector past the data of this entry, unless less than
+ * HEADER_SECTORS are left before ring->end; in that case the next entry
+ * wraps to ring->start. Wrapping is the exceptional case, so the branch
+ * is hinted as unlikely.
+ */
+static inline sector_t
+next_start_adjust(struct ring_buffer *ring, struct data_header *header)
+{
+	sector_t next_sector = next_start(header);
+
+	return unlikely(sectors_unused(ring, next_sector) < HEADER_SECTORS) ?
+	       ring->start : next_sector;
+}
+
+/* True if the header is flagged WRAP_NONE. */
+static inline int not_wrapped(struct data_header *header)
+{
+	return header->wrap == WRAP_NONE;
+}
+
+/*
+ * True if the header is flagged WRAP_DATA, ie. the header
+ * sits at the ring end and its data wrapped to the ring start.
+ */
+static inline int data_wrapped(struct data_header *header)
+{
+	return header->wrap == WRAP_DATA;
+}
+
+/*
+ * True if the header is flagged WRAP_NEXT, ie. the
+ * entry following this one wraps to the ring start.
+ */
+static inline int next_entry_wraps(struct data_header *header)
+{
+	return header->wrap == WRAP_NEXT;
+}
+
+/*
+ * Amount of sectors left unused at the ring end because of wrapping:
+ * those after the header if the data wrapped, those after the data
+ * if the next entry wraps and none otherwise.
+ */
+static unsigned
+sectors_skipped(struct ring_buffer *ring, struct data_header *header)
+{
+	/* Common case: nothing wrapped, nothing skipped. */
+	if (likely(not_wrapped(header)))
+		return 0;
+
+	if (data_wrapped(header))
+		return sectors_unused(ring, data_start(header));
+
+	if (next_entry_wraps(header))
+		return sectors_unused(ring, next_start(header));
+
+	return 0;
+}
+
+/*
+ * Flag a ring buffer error of the given type.
+ *
+ * The log message for an error type is emitted only once, ie. only if
+ * the type specific error flag was not already set; the general ring
+ * buffer error flag is set on every call.
+ *
+ * BUG()s on an unknown error type.
+ */
+static void
+ring_buffer_error(enum ring_status_type type,
+		  struct ring_buffer *ring, int error)
+{
+	struct error {
+		enum ring_status_type type;
+		int (*f)(struct ring_buffer *);
+		const char *msg;
+	};
+	static const struct error errors[] = {
+		{ RING_BUFFER_DATA_ERROR, TestSetRingBufferDataError, "data" },
+		{ RING_BUFFER_HEAD_ERROR, TestSetRingBufferHeadError, "head" },
+		{ RING_BUFFER_HEADER_ERROR, TestSetRingBufferHeaderError,
+		  "header" },
+		{ RING_BUFFER_TAIL_ERROR, TestSetRingBufferTailError, "tail" },
+	};
+	const struct error *e = ARRAY_END(errors);
+
+	while (e-- > errors) {
+		if (type == e->type) {
+			/* Test-and-set returns the old flag state. */
+			if (!e->f(ring))
+				DMERR("ring buffer %s I/O error %d",
+				      e->msg, error);
+
+			/* No 'return <expr>;' in a void function. */
+			SetRingBufferError(ring);
+			return;
+		}
+	}
+
+	BUG();
+}
+
+/*
+ * Allocate space for a data item in the ring buffer.
+ *
+ * header->pos is filled in with the sectors for the header and data in
+ * the ring buffer. The free space in the ring buffer is decremented to
+ * account for this entry. The return value is the sector address for the
+ * next data_header_disk.
+ */
+
+/*
+ * Increment buffer offset past actual header, optionally wrapping data.
+ *
+ * Fills in header->pos.header and header->pos.data based on
+ * ring->next_avail, sets header->wrap where wrapping applies, takes the
+ * rounded up data sectors off ring->pending and returns the sector
+ * the next entry is to start at.
+ *
+ * Callers in this file hold ring->mutex around the call.
+ */
+static sector_t
+ring_buffer_inc(struct ring_buffer *ring, struct data_header *header)
+{
+	sector_t sectors;
+
+	/* Initialize the header with the common case */
+	header->pos.header = ring->next_avail;
+	header->pos.data = data_start(header);
+
+	/*
+	 * Header doesn't fit before ring->end.
+	 *
+	 * This can only happen when we are started with an empty ring
+	 * buffer that has its tail near the end of the device.
+	 */
+	if (unlikely(data_start(header) > ring->end)) {
+		/*
+		 * Wrap an entire entry (header + data) to the beginning of
+		 * the log device.
+		 *
+		 * NOTE(review): neither ring->free nor header->wrap is
+		 * touched in this branch; header->wrap keeps the value
+		 * the caller initialized it with.
+		 */
+		header->pos.header = ring->start;
+		header->pos.data = data_start(header);
+	/* Data doesn't fit before ring->end. */
+	} else if (unlikely(next_start(header) > ring->end)) {
+		/*
+		 * Wrap the data portion of a ring buffer entry to the
+		 * beginning of the log device.
+		 *
+		 * NOTE(review): ring->free is not adjusted here; the
+		 * skipped sectors are presumably accounted for by
+		 * ring_buffer_reserve_space() -- confirm.
+		 */
+		header->pos.data = ring->start;
+		header->wrap = WRAP_DATA;
+
+		/* REMOVEME: statistics. */
+		ring_buffer_repl_log(ring)->stats.wrap++;
+	} else
+		header->wrap = WRAP_NONE;
+
+	/* The entry isn't pending any more: take its sectors off. */
+	sectors = roundup_sectors(header->pos.data_sectors);
+	BUG_ON(sectors > ring->pending);
+	ring->pending -= sectors;
+
+	/*
+	 * Start sector of the next entry; ring->start means the next
+	 * entry wraps, which overrides any wrap flag set above.
+	 */
+	sectors = next_start_adjust(ring, header);
+	if (sectors == ring->start) {
+		header->wrap = WRAP_NEXT;
+
+		/* REMOVEME: statistics. */
+		ring_buffer_repl_log(ring)->stats.wrap++;
+	}
+
+	return sectors;
+}
+
+/*
+ * Slab and mempool definition.
+ *
+ * The cache_defs[] table initializes the members positionally,
+ * so the member order must not be changed.
+ */
+struct cache_defs {
+	const enum ring_pool_type type;	/* Index into ring->pools[]. */
+	const int min;			/* Minimum mempool elements. */
+	const size_t size;		/* Object size; 0 = no slab pool. */
+	struct kmem_cache *slab_pool;	/* Slab cache backing the mempool. */
+	const char *slab_name;		/* Name used in messages. */
+	const size_t align;		/* Alignment (not used in the code
+					 * visible here). */
+};
+
+/*
+ * Slab and mempool declarations.
+ *
+ * One element per pool type. The slab_pool members are NULL here;
+ * NOTE(review): they are presumably filled in when the slab caches
+ * get created elsewhere in this file -- confirm.
+ */
+static struct cache_defs cache_defs[] = {
+	{ ENTRY, ENTRY_POOL_MIN, sizeof(struct ring_buffer_entry),
+	  NULL, "dm_repl_log_entry", 0 },
+	{ DATA_HEADER, HEADER_POOL_MIN, sizeof(struct data_header),
+	  NULL, "dm_repl_log_header", 0 },
+	{ DATA_HEADER_DISK, HEADER_POOL_MIN, DATA_HEADER_DISK_SIZE,
+	  NULL, "dm_repl_log_disk_header", DATA_HEADER_DISK_SIZE },
+	{ COPY_CONTEXT, CC_POOL_MIN, sizeof(struct slink_copy_context),
+	  NULL, "dm_repl_log_copy", 0 },
+};
+
+/*
+ * Tear down a ring buffer: release the busy sector hash and
+ * destroy every mempool which got created for it.
+ */
+static void ring_buffer_exit(struct ring_buffer *ring)
+{
+	mempool_t **pool;
+
+	sector_hash_exit(&ring->busy_sectors);
+
+	/* Walk the pool array backwards, skipping unset slots. */
+	for (pool = ARRAY_END(ring->pools); pool-- > ring->pools; ) {
+		if (unlikely(!*pool))
+			continue;
+
+		DMINFO("Destroying mempool %p", *pool);
+		mempool_destroy(*pool);
+		*pool = NULL;
+	}
+}
+
+/*
+ * Set up a ring buffer: mutex, flush wait queue, one mempool per
+ * cache definition and the busy sector hash.
+ *
+ * Returns 0 on success or -ENOMEM on any failure, in which
+ * case ring_buffer_exit() has been called to clean up.
+ */
+static int ring_buffer_init(struct ring_buffer *ring)
+{
+	int r;
+	struct repl_log *l = ring_buffer_repl_log(ring);
+	struct cache_defs *pd;
+
+	mutex_init(&l->ring_buffer.mutex);
+	init_waitqueue_head(&ring->flushq);
+
+	/* Create slab pools. */
+	for (pd = ARRAY_END(cache_defs); pd-- > cache_defs; ) {
+		mempool_t *pool;
+
+		/* Bitmap is not a slab pool. */
+		if (!pd->size)
+			continue;
+
+		pool = mempool_create_slab_pool(pd->min, pd->slab_pool);
+		ring->pools[pd->type] = pool;
+		if (unlikely(!pool)) {
+			DMERR("Error creating mempool %s", pd->slab_name);
+			goto bad;
+		}
+
+		DMINFO("Created mempool %s [%p]", pd->slab_name, pool);
+	}
+
+	/* Initialize busy sector hash. */
+	r = sector_hash_init(&ring->busy_sectors, l->params.dev.size);
+	if (r < 0) {
+		DMERR("Failed to allocate sector busy hash!");
+		goto bad;
+	}
+
+	return 0;
+
+bad:
+	ring_buffer_exit(ring);
+	return -ENOMEM;
+}
+
+/*
+ * Reserve space in the ring buffer for the
+ * given bio data and associated header.
+ *
+ * Correct ring->free by any skipped sectors at the end of the ring buffer.
+ *
+ * Has to be called with ring->mutex held (asserted below).
+ *
+ * Returns 0 with ring->free decremented and ring->pending incremented
+ * by the rounded up bio sectors, or -EBUSY if the ring lacks space.
+ */
+static int
+ring_buffer_reserve_space(struct ring_buffer *ring, struct bio *bio)
+{
+	unsigned nsectors = roundup_sectors(bio_sectors(bio));
+	sector_t end_space, start_sector;
+
+	BUG_ON(!nsectors);
+	BUG_ON(!mutex_is_locked(&ring->mutex));
+
+	/* Not enough free sectors at all -> flag the ring full. */
+	if (unlikely(ring->free < nsectors)) {
+		SetRingBufferFull(ring);
+		return -EBUSY;
+	}
+
+	/*
+	 * Account for the sectors that are queued for do_log()
+	 * but have not been accounted for on the disk.  We need this
+	 * calculation to see if any sectors will be lost from our
+	 * free pool at the end of ring buffer.
+	 */
+	start_sector = ring->next_avail + ring->pending;
+	end_space = sectors_unused(ring, start_sector);
+
+	/* if the whole I/O won't fit before the end of the disk. */
+	if (unlikely(end_space && end_space < nsectors)) {
+		/*
+		 * If a header still fits, only the sectors behind it are
+		 * lost (data wraps); otherwise the whole end space is.
+		 */
+		sector_t skipped = end_space >= HEADER_SECTORS ?
+			sectors_unused(ring, start_sector + HEADER_SECTORS) :
+			end_space;
+
+		/*
+		 * Don't subtract skipped sectors in case the bio won't fit.
+		 *
+		 * NOTE(review): unlike the check above, this path does
+		 * not set the ring buffer full flag -- confirm intended.
+		 */
+		if (ring->free - skipped < nsectors)
+			return -EBUSY;
+
+		/*
+		 * We subtract the amount of skipped sectors
+		 * from ring->free here..
+		 *
+		 * ring_buffer_advance_head() will add them back on.
+		 */
+		ring->free -= skipped;
+	}
+
+	ring->free -= nsectors;
+	ring->pending += nsectors;
+	return 0;
+}
+
+/*
+ * True if the ring buffer holds no entries, ie. head equals tail
+ * while the ring isn't flagged full. Takes ring->mutex.
+ */
+static int ring_buffer_empty(struct ring_buffer *ring)
+{
+	int empty;
+
+	mutex_lock(&ring->mutex);
+	empty = ring->head == ring->tail && !RingBufferFull(ring);
+	mutex_unlock(&ring->mutex);
+
+	return empty;
+}
+
+/*
+ * Set up the bitmask of slinks an entry needs synchronous completion on.
+ *
+ * For each slink set in the entry's slink mask, the respective bit in
+ * the entry's sync mask is set if the slink is configured synchronous
+ * or if its state is flagged sync (fallbehind threshold exceeded).
+ *
+ * Unconfigured slinks are skipped, and slinks without caller state are
+ * only checked for their synchronous policy (slink_fallbehind_update()
+ * tolerates a NULL slink->caller as well).
+ */
+static void
+set_sync_mask(struct repl_log *l, struct ring_buffer_entry *entry)
+{
+	unsigned long slink_nr;
+
+	/* Bitmask of slinks with synchronous I/O completion policy. */
+	for_each_bit(slink_nr, ENTRY_SLINKS(entry), l->slink.max) {
+		struct slink_state *ss;
+		struct dm_repl_slink *slink = slink_find(l, slink_nr);
+
+		/* Slink not configured. */
+		if (unlikely(IS_ERR(slink)))
+			continue;
+
+		ss = slink->caller;
+
+		/* If an slink has fallen behind an I/O threshold, it
+		 * must be marked for synchronous I/O completion. */
+		if (slink_synchronous(slink) ||
+		    (ss && SsSync(ss)))
+			slink_set_bit(slink_nr, ENTRY_SYNC(entry));
+	}
+}
+
+/*
+ * Always returns an initialized write entry,
+ * unless fatal memory allocation happens.
+ *
+ * Allocates an entry plus its data header, zeroes and initializes both
+ * and adds the entry to the tail of the log's ordered entry list.
+ *
+ * With a bio, the header region and ring buffer position are set up
+ * for that bio, the sector range is marked busy and the slink and
+ * sync masks are initialized; without one, only the bare entry is set up.
+ */
+static struct ring_buffer_entry *
+ring_buffer_alloc_entry(struct ring_buffer *ring, struct bio *bio)
+{
+	int dev_number, i;
+	struct repl_log *l = ring_buffer_repl_log(ring);
+	struct ring_buffer_entry *entry = alloc_entry(ring);
+	struct data_header *header = alloc_header(ring);
+	struct data_header_region *region;
+
+	/* Allocation failure is fatal. */
+	BUG_ON(!entry);
+	BUG_ON(!header);
+	memset(entry, 0, sizeof(*entry));
+	memset(header, 0, sizeof(*header));
+
+	atomic_set(&entry->endios, 0);
+	atomic_set(&entry->ref, 0);
+	entry->ring = ring;
+	entry->data.header = header;
+	header->wrap = WRAP_NONE;
+
+	/* Now setup the ring_buffer_entry. */
+	i = ARRAY_SIZE(entry->lists.l);
+	while (i--)
+		INIT_LIST_HEAD(entry->lists.l + i);
+
+	/*
+	 * In case we're called with a bio, we're creating a new entry;
+	 * without one, we're allocating the entry for reading a header
+	 * in during init.
+	 */
+	if (bio) {
+		struct dm_repl_slink *slink0 = slink_find(l, 0);
+
+		_BUG_ON_PTR(slink0);
+
+		/* Setup the header region. */
+		dev_number = slink0->ops->dev_number(slink0, bio->bi_bdev);
+		BUG_ON(dev_number < 0);
+		region = &header->region;
+		region->dev = dev_number;
+		region->sector = bio_begin(bio);
+		region->size = bio->bi_size;
+		BUG_ON(!region->size);
+		header->pos.data_sectors =
+			roundup_data_sectors(bio_sectors(bio));
+
+		/* Takes a reference on the entry and hashes it in. */
+		entry->bios.write = bio;
+		sector_range_mark_busy(entry);
+
+		/*
+		 * Successfully allocated space in the ring buffer
+		 * for this entry. Advance our in-memory tail pointer.
+		 * Round up to HEADER_SECTORS boundary for supporting
+		 * up to 4k sector sizes.
+		 */
+		mutex_lock(&ring->mutex);
+		ring->next_avail = ring_buffer_inc(ring, header);
+		mutex_unlock(&ring->mutex);
+
+		/* Bitmask of slinks to initiate copies across. */
+		memcpy(ENTRY_SLINKS(entry), LOG_SLINKS(l), BITMAP_SIZE(l));
+
+		/* Set synchronous I/O policy mask. */
+		set_sync_mask(l, entry);
+	}
+
+	/* Add header to the ordered list of headers. */
+	list_add_tail(E_ORDERED_LIST(entry), L_ENTRY_ORDERED_LIST(l));
+
+	DBG_DMINFO("%s header->pos.header=%llu header->pos.data=%llu "
+		   "advancing ring->next_avail=%llu", __func__,
+		   (unsigned long long) header->pos.header,
+		   (unsigned long long) header->pos.data,
+		   (unsigned long long) ring->next_avail);
+	return entry;
+}
+
+/*
+ * Free a ring buffer entry and the data header hanging off it.
+ *
+ * @caller: name of the calling function, used in the error message
+ * @entry:  entry to free; must have a data header and a ring set
+ *
+ * The entry is taken off the write/copy and the ordered list if it is
+ * on them. BUG()s if the entry still has copy contexts attached.
+ */
+static void
+ring_buffer_free_entry(const char *caller, struct ring_buffer_entry *entry)
+{
+	struct ring_buffer *ring;
+
+	_BUG_ON_PTR(entry);
+	_BUG_ON_PTR(entry->data.header);
+
+	ring = entry->ring;
+	_BUG_ON_PTR(ring);
+
+	/*
+	 * Will need to change once ring_buffer_entry is
+	 * not kept around as long as the data header.
+	 */
+	if (!list_empty(E_COPY_CONTEXT_LIST(entry))) {
+		DMERR("%s %s E_COPY_CONTEXT_LIST not empty!",
+		      __func__, caller);
+		BUG();
+	}
+
+	if (!list_empty(E_WRITE_OR_COPY_LIST(entry)))
+		list_del(E_WRITE_OR_COPY_LIST(entry));
+
+	if (!list_empty(E_ORDERED_LIST(entry)))
+		list_del(E_ORDERED_LIST(entry));
+
+	free_header(entry->data.header, ring);
+	free_entry(entry, ring);
+}
+
+/*
+ * header_io() hook: clear the valid member of an on-disk
+ * data header right before it gets written.
+ */
+static void disk_header_set_invalid(void *ptr)
+{
+	struct data_header_disk *disk_header = ptr;
+
+	disk_header->valid = 0;
+}
+
+/*
+ * Mark a ring buffer entry invalid on the backing store device by
+ * synchronously rewriting its data header with the valid member cleared.
+ *
+ * The header type has to be IO_DATA (enum header_type), as used by
+ * data_header_io(); DATA_HEADER is a mempool type and would make
+ * header_io() pick the wrong header spec.
+ *
+ * Returns the result of header_io().
+ */
+static int
+ring_buffer_mark_entry_invalid(struct ring_buffer *ring,
+			       struct ring_buffer_entry *entry)
+{
+	struct data_header *header = entry->data.header;
+
+	return header_io(WRITE, &(struct header_io_params) {
+			 IO_DATA, ring_buffer_repl_log(ring),
+			 header, header->pos.header, disk_header_set_invalid });
+}
+
+/*
+ * Common completion handling for the header and the data write of an
+ * entry: record the error for the respective io and wake the worker
+ * once the last outstanding io of the entry completed.
+ */
+enum endio_type { HEADER_ENDIO, DATA_ENDIO };
+static void endio(struct ring_buffer_entry *entry,
+		  enum endio_type type, unsigned long error)
+{
+	struct repl_log *l;
+
+	if (type == DATA_ENDIO)
+		entry->data.error.data = error;
+	else
+		entry->data.error.header = error;
+
+	/* More endios to come for this entry. */
+	if (!atomic_dec_and_test(&entry->endios)) {
+		BUG_ON(atomic_read(&entry->endios) < 0);
+		return;
+	}
+
+	/*
+	 * Endio processing requires disk writes to advance the log
+	 * tail pointer. So, we need to defer this to process context.
+	 * The endios are processed from the l->lists.entry.io list,
+	 * and the entry is already on that list.
+	 */
+	l = ring_buffer_repl_log(entry->ring);
+	wake_do_log(l);
+}
+
+/* dm_io() completion callback for the data header write of an entry. */
+static void header_endio(unsigned long error, void *context)
+{
+	struct ring_buffer_entry *entry = context;
+
+	endio(entry, HEADER_ENDIO, error);
+}
+
+/* dm_io() completion callback for the bio data write of an entry. */
+static void data_endio(unsigned long error, void *context)
+{
+	struct ring_buffer_entry *entry = context;
+
+	endio(entry, DATA_ENDIO, error);
+}
+
+/*
+ * Place the data contained in bio asynchronously
+ * into the replog's ring buffer.
+ *
+ * Allocates an entry for the bio, converts its data header to the
+ * on-disk format, takes an io reference on slink0's state, queues the
+ * entry on the ring write list and submits the header and the data
+ * write, each of which completes via endio().
+ *
+ * This can be void, because any allocation failure is fatal and any
+ * IO errors will be reported asynchronously via dm_io() callbacks.
+ */
+static void
+ring_buffer_write_entry(struct repl_log *l, struct bio *bio)
+{
+	int i, r;
+	struct ring_buffer *ring = &l->ring_buffer;
+	/*
+	 * ring_buffer_alloc_entry returns an entry,
+	 * including an initialized data_header.
+	 */
+	struct ring_buffer_entry *entry = ring_buffer_alloc_entry(ring, bio);
+	struct data_header_disk *disk_header = alloc_data_header_disk(ring);
+	struct data_header *header = entry->data.header;
+	struct dev_io_params dio[] = {
+		{ /* Data IO specs. */
+		  &l->params.dev, header->pos.data, bio->bi_size,
+		  .mem = { DM_IO_BVEC, { .bvec = bio_vec(bio) }, 0 },
+		  .notify = { data_endio, entry }
+		},
+		{ /* Header IO specs. */
+		  &l->params.dev, header->pos.header, DATA_HEADER_DISK_SIZE,
+		  .mem = { DM_IO_KMEM, { .addr = disk_header }, 0 },
+		  .notify = { header_endio, entry }
+		},
+	};
+
+	DBG_DMINFO("in  %s %u", __func__, jiffies_to_msecs(jiffies));
+
+	BUG_ON(!disk_header);
+	entry->data.disk_header = disk_header;
+	data_header_to_disk(BITMAP_ELEMS(l), disk_header, header);
+
+	/* Take ring_buffer IO reference out vs. slink0. */
+	ss_io_get(__func__, l->slink0->caller);
+
+	/* Add to ordered list of active entries. */
+	list_add_tail(E_WRITE_OR_COPY_LIST(entry), L_ENTRY_RING_WRITE_LIST(l));
+
+	/* The size reported is the one of the bio data io (dio[0]). */
+	DBG_DMINFO("%s writing header to offset=%llu and bio for "
+		   "sector=%llu to sector=%llu/size=%llu", __func__,
+		   (unsigned long long) entry->data.header->pos.header,
+		   (unsigned long long) bio_begin(bio),
+		   (unsigned long long) entry->data.header->pos.data,
+		   (unsigned long long) to_sector(dio[0].size));
+
+	/*
+	 * Submit the writes.
+	 *
+	 * 1 I/O count for header + 1 for data
+	 */
+	i = ARRAY_SIZE(dio);
+	atomic_set(&entry->endios, i);
+	while (i--) {
+		/* Keep the io submission out of the assertion. */
+		r = dev_io(WRITE, ring, dio + i);
+		BUG_ON(r);
+	}
+
+	DBG_DMINFO("out %s %u", __func__, jiffies_to_msecs(jiffies));
+}
+
+/*
+ * dm_io() completion callback for bio data reads off the ring buffer.
+ *
+ * Ends the read bio hanging off the entry (with -EIO on error), drops
+ * the entry reference, wakes the worker and releases the io reference
+ * on slink0's state.
+ */
+static void read_bio_vec_endio(unsigned long error, void *context)
+{
+	struct ring_buffer_entry *entry = context;
+	struct repl_log *l = ring_buffer_repl_log(entry->ring);
+	struct bio *bio;
+
+	atomic_dec(&entry->endios);
+
+	bio = entry->bios.read;
+	BUG_ON(!bio);
+	bio_endio(bio, error ? -EIO : 0);
+	entry->bios.read = NULL;
+
+	entry_put(entry);
+	wake_do_log(l);
+
+	/* Release IO reference on slink0. */
+	ss_io_put(__func__, l->slink0->caller);
+}
+
+/*
+ * Read bio data off the ring buffer asynchronously.
+ *
+ * @l:          replication log to read from
+ * @entry:      entry whose data is read
+ * @offset:     sector offset into the entry's data
+ * @bio_io_vec: io vector to read into
+ * @size:       size of the read
+ *
+ * Takes a reference and an endio count on the entry plus an io
+ * reference on slink0's state; read_bio_vec_endio() drops them again.
+ */
+static void
+ring_buffer_read_bio_vec(struct repl_log *l,
+			 struct ring_buffer_entry *entry, sector_t offset,
+			 struct bio_vec *bio_io_vec, unsigned size)
+{
+	struct dev_io_params dio;
+
+	DBG_DMINFO("in  %s %u", __func__, jiffies_to_msecs(jiffies));
+
+	/* Check the entry before it gets dereferenced below. */
+	_BUG_ON_PTR(entry);
+
+	/* Data IO specs. */
+	dio = (struct dev_io_params) {
+		&l->params.dev,
+		entry->data.header->pos.data + offset, size,
+		.mem = { DM_IO_BVEC, { .bvec = bio_io_vec }, 0 },
+		.notify = { read_bio_vec_endio, entry }
+	};
+
+	entry_get(entry);
+	atomic_inc(&entry->endios);
+
+	/* Take IO reference out vs. slink0. */
+	ss_io_get(__func__, l->slink0->caller);
+
+	DBG_DMINFO("%s reading bio data from ring sector=%llu/size=%llu",
+		   __func__, (unsigned long long) dio.sector,
+		   (unsigned long long) to_sector(dio.size));
+
+	/*
+	 * Submit the read.
+	 */
+	BUG_ON(dev_io(READ, &l->ring_buffer, &dio));
+	DBG_DMINFO("out %s %u", __func__, jiffies_to_msecs(jiffies));
+}
+
+/*
+ * Advances the ring buffer head pointer, updating the in-core data
+ * and writing it to the backing store device, but only if there are
+ * inactive entries (ie. those with copies to all slinks) at the head.
+ *
+ * Returns -ve errno on failure, otherwise the number of entries freed.
+ */
+/* Result of entry_busy() on the entry's ENTRY_IOS() mask. */
+static inline int entry_endios_pending(struct ring_buffer_entry *entry)
+{
+	struct repl_log *l = ring_buffer_repl_log(entry->ring);
+
+	return entry_busy(l, ENTRY_IOS(entry));
+}
+
+/*
+ * Advance the ring buffer head past all inactive entries at the head of
+ * the ordered entry list, free those entries and write the updated
+ * buffer header to the backing store device.
+ *
+ * @ring:  ring buffer to advance the head of
+ * @force: advance even if the regular conditions for doing so aren't met
+ *
+ * Without @force, nothing is done (and 0 returned) if there are no
+ * freeable entries or if an active entry follows them while the ring
+ * isn't blocked.
+ *
+ * NOTE(review): with @force set and no freeable entry at the head of
+ * the list, BUG_ON(!entry_last) fires; callers presumably guarantee at
+ * least one freeable entry in that case -- confirm.
+ *
+ * Returns -ve errno on failure, otherwise the number of entries freed.
+ */
+static int
+ring_buffer_advance_head(struct ring_buffer *ring, int force)
+{
+	int active = 0, r;
+	unsigned entries_freed = 0;
+	sector_t sectors_freed = 0;
+	struct repl_log *l = ring_buffer_repl_log(ring);
+	struct ring_buffer_entry *entry, *entry_last = NULL, *n;
+
+	/* Count any freeable entries and remember last one. */
+	mutex_lock(&ring->mutex);
+	list_for_each_entry_safe(entry, n, L_ENTRY_ORDERED_LIST(l),
+				 lists.l[E_ORDERED]) {
+		/* Can't advance past dirty entry. */
+		if (atomic_read(&entry->endios) ||
+		    entry_busy(l, ENTRY_SLINKS(entry))) {
+			active = 1;
+			break;
+		}
+
+		BUG_ON(entry_endios_pending(entry));
+
+		/* Go via the entry; no 'header' variable in this scope. */
+		DBG_DMINFO("%s header->pos.header=%llu header->pos.data=%llu "
+			   "ring->head=%llu", __func__,
+			   (unsigned long long) entry->data.header->pos.header,
+			   (unsigned long long) entry->data.header->pos.data,
+			   (unsigned long long) ring->head);
+		entry_last = entry;
+		entries_freed++;
+	}
+
+	/* FIXME: we don't want to wait for the whole ring to fill. */
+	if ((!entries_freed ||
+	     (active && !RingBlocked(ring))) &&
+	    !force) {
+		mutex_unlock(&ring->mutex);
+		return 0;
+	}
+
+	BUG_ON(!entry_last);
+
+	/* Free all entries up to and including the last freeable one. */
+	list_for_each_entry_safe(entry, n, L_ENTRY_ORDERED_LIST(l),
+				 lists.l[E_ORDERED]) {
+		struct data_header *header = entry->data.header;
+
+		BUG_ON(entry_endios_pending(entry) ||
+		       atomic_read(&entry->endios) ||
+		       entry_busy(l, ENTRY_SLINKS(entry)));
+
+		/*
+		 * If the entry wrapped around between the header and
+		 * the data or if the next entry wraps, free the
+		 * unused sectors at the end of the device.
+		 */
+		sectors_freed += roundup_sectors(header->pos.data_sectors)
+				 + sectors_skipped(ring, header);
+		if (likely(ring->head != ring->tail))
+			ring->head = next_start_adjust(ring, header);
+		BUG_ON(ring->head >= ring->end);
+
+		/* Don't access entry after this call! */
+		ring_buffer_free_entry(__func__, entry);
+
+		/* Pointer comparison only; entry is freed by now. */
+		if (entry == entry_last)
+			break;
+	}
+
+	mutex_unlock(&ring->mutex);
+
+	DMINFO("%s advancing ring buffer head for %u entries to %llu",
+	       __func__, entries_freed, (unsigned long long) ring->head);
+
+	/* Update ring buffer pointers in buffer header. */
+	r = buffer_header_io(WRITE, l);
+	if (likely(!r)) {
+		/* Buffer header written... */
+		mutex_lock(&ring->mutex);
+		ring->free += sectors_freed;
+		mutex_unlock(&ring->mutex);
+	}
+
+	/* Inform caller, that we're willing to receive more I/Os. */
+	ClearRingBlocked(ring);
+	ClearRingBufferFull(ring);
+	notify_caller(l, WRITE, 0);
+	if (unlikely(r < 0))
+		ring_buffer_error(RING_BUFFER_HEAD_ERROR, ring, r);
+
+	return r ? r : entries_freed;
+}
+
+/*
+ * Advances the tail pointer after a successful
+ * write of an entry to the log.
+ *
+ * The in-core tail is moved to the start of the next entry under
+ * ring->mutex and the buffer header is written out afterwards. If that
+ * write fails, the in-core tail is rolled back to its previous value
+ * and a ring buffer tail error is flagged.
+ *
+ * Returns the result of the buffer header write.
+ */
+static int
+ring_buffer_advance_tail(struct ring_buffer_entry *entry)
+{
+	int r;
+	sector_t new_tail, old_tail;
+	struct ring_buffer *ring = entry->ring;
+	struct repl_log *l = ring_buffer_repl_log(ring);
+	struct data_header *header = entry->data.header;
+
+/*
+	if (unlikely(ring->tail != header->pos.header)) {
+		DMERR("ring->tail %llu header->pos.header %llu",
+		      (unsigned long long) ring->tail,
+		      (unsigned long long) header->pos.header);
+		BUG();
+	}
+*/
+
+	mutex_lock(&ring->mutex);
+	/* Remember the old tail for a rollback on write failure. */
+	old_tail = ring->tail;
+	/* Should we let this get out of sync? */
+	new_tail = ring->tail = next_start_adjust(ring, header);
+	BUG_ON(ring->tail >= ring->end);
+
+	mutex_unlock(&ring->mutex);
+
+	DBG_DMINFO("%s header->pos.header=%llu header->pos.data=%llu "
+		   "ring->tail=%llu", __func__,
+		   (unsigned long long) header->pos.header,
+		   (unsigned long long) header->pos.data,
+		   (unsigned long long) ring->tail);
+	DBG_DMINFO("Advancing ring tail pointer to %llu",
+			(unsigned long long) ring->tail);
+
+	/* Write the buffer header without holding the mutex. */
+	r = buffer_header_io(WRITE, l);
+	if (unlikely(r < 0)) {
+		/* Header write failed: roll the in-core tail back. */
+		mutex_lock(&ring->mutex);
+		/* Make sure it wasn't changed. */
+		BUG_ON(ring->tail != new_tail);
+		ring->tail = old_tail;
+		mutex_unlock(&ring->mutex);
+
+		ring_buffer_error(RING_BUFFER_TAIL_ERROR, ring, r);
+	}
+
+	return r;
+}
+
+/*
+ * Open type <-> name mapping.
+ *
+ * Looked up by _open_type() (name -> type) and _open_str() (type -> name).
+ */
+static const struct dm_str_descr open_types[] = {
+	{ OT_AUTO, "auto" },
+	{ OT_OPEN, "open" },
+	{ OT_CREATE, "create" },
+};
+
+/* Look up an open type by its name in the open_types table. */
+static inline int
+_open_type(const char *name)
+{
+	const int type = dm_descr_type(open_types, ARRAY_SIZE(open_types),
+				       name);
+
+	return type;
+}
+
+/* Look up the name of an open type in the open_types table. */
+static inline const char *
+_open_str(const int type)
+{
+	const char *name = dm_descr_name(open_types, ARRAY_SIZE(open_types),
+					 type);
+
+	return name;
+}
+
+/*
+ * Amount of free sectors in ring buffer.  This function does not take
+ * into account unused sectors at the end of the log device.
+ */
+static sector_t
+ring_free(struct ring_buffer *ring)
+{
+	/* Head equals next_avail: report the whole [start, end) area. */
+	if (unlikely(ring->head == ring->next_avail))
+		return ring->end - ring->start;
+
+	/* Head is in front of the tail: the gap in between is free. */
+	if (ring->head > ring->tail)
+		return ring->head - ring->tail;
+
+	/* Otherwise sum up [start, head) and [tail, end). */
+	return (ring->head - ring->start) + (ring->end - ring->tail);
+}
+
+/*
+ * Allocate a zeroed log header and, on success, store it in
+ * l->header.log.  Returns the header or NULL on allocation failure.
+ */
+static struct log_header *
+alloc_log_header(struct repl_log *l)
+{
+	struct log_header *hdr = kzalloc(sizeof(*hdr), GFP_KERNEL);
+
+	if (unlikely(!hdr))
+		return NULL;
+
+	l->header.log = hdr;
+	return hdr;
+}
+
+/* Free a log header; the ring argument is not used. */
+static void
+free_log_header(struct log_header *log_header, struct ring_buffer *ring)
+{
+	kfree(log_header);
+}
+
+/*
+ * Create a new log.
+ *
+ * Fills in the preallocated in-core log header, writes it out, resets
+ * the ring buffer to empty and writes the buffer header.
+ *
+ * Returns 0 on success.  On a header write failure the log header is
+ * freed, l->header.log is reset to NULL and the I/O error is returned.
+ */
+static int
+log_create(struct repl_log *l)
+{
+	int r;
+	struct repl_params *params = &l->params;
+	struct repl_dev *dev = &params->dev;
+	struct ring_buffer *ring = &l->ring_buffer;
+	struct log_header *log_header = l->header.log;
+
+	DMINFO("write_log: creating new log");
+	_BUG_ON_PTR(log_header);
+
+	/* In-memory representation first... */
+	log_header->version.major = DM_REPL_LOG_MAJOR;
+	log_header->version.minor = DM_REPL_LOG_MINOR;
+	log_header->version.subminor = DM_REPL_LOG_MICRO;
+	log_header->size = dev->size;
+	log_header->buffer_header = dev->start + HEADER_SECTORS;
+
+	/* ...then put it on the device. */
+	r = log_header_io(WRITE, l);
+	if (unlikely(r < 0))
+		goto err_free;
+
+	/*
+	 * Initialize the ring buffer: the data area starts behind the
+	 * buffer header, which in turn follows the log header.
+	 */
+	ring->end = dev->start + dev->size;
+	ring->start = dev->start + 2 * HEADER_SECTORS;
+	ring->next_avail = ring->start;
+	ring->tail = ring->start;
+	ring->head = ring->start;
+	ring->free = ring_free(ring);
+
+	DMINFO("%s start=%llu end=%llu free=%llu", __func__,
+	       (unsigned long long) ring->start,
+	       (unsigned long long) ring->end, (unsigned long long) ring->free);
+
+	r = buffer_header_io(WRITE, l);
+	if (unlikely(r < 0))
+		goto err_free;
+
+	DMINFO("%s: returning initialized log header", __func__);
+	return 0;
+
+err_free:
+	free_log_header(log_header, ring);
+	l->header.log = NULL;
+	return r;
+}
+
+/*
+ * Read the log header and the buffer header in from disk.
+ *
+ * The log header itself must already be allocated (l->header.log is
+ * asserted here, not allocated).  After both headers have been read,
+ * ring->end, ring->next_avail and ring->free are derived.
+ *
+ * Returns 0 on success or the negative result of log_header_io() /
+ * buffer_header_io().  On a version mismatch DM_EINVAL() is invoked
+ * (presumably logging and returning -EINVAL, which log_init() handles
+ * as "no valid log on disk").
+ */
+static int
+log_read(struct repl_log *l)
+{
+	int r;
+	struct log_header *log_header = l->header.log;
+	struct repl_dev *dev;
+	struct ring_buffer *ring;
+
+	_BUG_ON_PTR(log_header);
+	r = log_header_io(READ, l);
+	if (unlikely(r < 0))
+		return r;
+
+	/*
+	 * Make sure that we can handle this version of the log:
+	 * memcmp() returns 0 when the on-disk version equals ours.
+	 */
+	if (!memcmp(&log_header->version, &my_version, sizeof(my_version)))
+		DMINFO("Found valid log header on disk");
+	else
+		DM_EINVAL("On-disk version (%d.%d.%d) is "
+			  "not supported by this module.",
+			  log_header->version.major, log_header->version.minor,
+			  log_header->version.subminor);
+
+	/*
+	 * Read in the buffer_header_disk
+	 */
+	r = buffer_header_io(READ, l);
+	if (unlikely(r < 0))
+		return r;
+
+	dev = &l->params.dev;
+	ring = &l->ring_buffer;
+	ring->end = dev->start + dev->size;
+	ring->next_avail = ring->tail;
+
+	/*
+	 * The following call to ring_free is incorrect as the free
+	 * space in the ring has to take into account the potential
+	 * for unused sectors at the end of the device.  However, once
+	 * do_log_init is called, any discrepancies are fixed there.
+	 */
+	ring->free = ring_free(ring);
+	return 0;
+}
+
+/*
+ * Open and read/initialize a replicator log backing store device:
+ * try to read an existing log or create a new one.
+ *
+ * Must be called with dm_io client set up, because we dm_io to the device.
+ *
+ * Returns 0 on success, -ENOMEM if the log header can't be allocated,
+ * or the negative result of log_read()/log_create() otherwise.  With
+ * open type OT_OPEN no log is created and log_read()'s -EINVAL is
+ * passed back.
+ */
+static int
+log_init(struct repl_log *l)
+{
+	int r;
+	struct repl_params *p = &l->params;
+
+	/* Fail the open instead of crashing the kernel on OOM. */
+	if (unlikely(!alloc_log_header(l))) {
+		DMERR("failed to allocate log header");
+		return -ENOMEM;
+	}
+
+	/* Read the log header in from disk. */
+	r = log_read(l);
+	switch (r) {
+	case 0:
+		/* Successfully read in the log. */
+		if (p->open_type != OT_CREATE)
+			break;
+
+		DMERR("OT_CREATE requested: initializing "
+		      "existing log!");
+		/* Fall through to (re)create the log. */
+	case -EINVAL:
+		/*
+		 * Most likely this is the initial create of the log.
+		 * But, if this is an open, return failure.
+		 */
+		if (p->open_type == OT_OPEN)
+			DMWARN("Can't create new replog on open!");
+		else
+			/* Try to create a new log. */
+			r = log_create(l);
+
+		break;
+	case -EIO:
+		DMERR("log_read IO error!");
+		break;
+	default:
+		DMERR("log_read failed with %d?", r);
+	}
+
+	return r;
+}
+
+/*
+ * Find a replog on the global list checking for bdev and start offset.
+ *
+ * Returns the replog, ERR_PTR(-EINVAL) if the first replog on that
+ * device has a different start offset, or ERR_PTR(-ENOENT) if no
+ * replog on that device is listed.
+ */
+struct repl_log *
+replog_find(dev_t dev, sector_t dev_start)
+{
+	struct repl_log *replog;
+
+	list_for_each_entry(replog, &replog_list, lists.l[L_REPLOG]) {
+		if (replog->params.dev.dm_dev->bdev->bd_dev != dev)
+			continue;
+
+		/* Same device: the start offset decides. */
+		if (unlikely(replog->params.dev.start != dev_start))
+			return ERR_PTR(-EINVAL);
+
+		return replog;
+	}
+
+	return ERR_PTR(-ENOENT);
+}
+
+/* Free all entries left on the ordered list in case of busy teardown. */
+static void
+ring_buffer_free_entries(struct ring_buffer *ring)
+{
+	struct repl_log *l = ring_buffer_repl_log(ring);
+	struct ring_buffer_entry *entry, *tmp;
+
+	list_for_each_entry_safe(entry, tmp, L_ENTRY_ORDERED_LIST(l),
+				 lists.l[E_ORDERED]) {
+		/* Still referenced: drop its busy sector range first. */
+		if (atomic_read(&entry->ref) != 0)
+			sector_range_clear_busy(entry);
+
+		free_header(entry->data.header, ring);
+		free_entry(entry, ring);
+	}
+}
+
+/* kref release callback: free the replog, which must be off the list. */
+static void
+replog_release(struct kref *ref)
+{
+	struct repl_log *l;
+
+	l = container_of(ref, struct repl_log, ref);
+	BUG_ON(!list_empty(L_REPLOG_LIST(l)));
+
+	kfree(l);
+	DMINFO("%s replog released", __func__);
+}
+
+/*
+ * Destroy replication log.
+ *
+ * Releases the resources hanging off of the replog (workqueue, log
+ * header, ring buffer entries, ring buffer, preallocated disk buffer
+ * header and dm_io client).  The replog structure itself is not freed
+ * here and the log device reference is not dropped here.
+ */
+static void
+replog_destroy(struct repl_log *l)
+{
+	_BUG_ON_PTR(l);
+
+	/* Workqueue and io client are optional: they may not be set up yet. */
+	if (l->io.wq)
+		destroy_workqueue(l->io.wq);
+
+	free_log_header(l->header.log, &l->ring_buffer);
+	ring_buffer_free_entries(&l->ring_buffer);
+	ring_buffer_exit(&l->ring_buffer);
+	kfree(l->io.buffer_header_disk);
+
+	if (l->io.io_client)
+		dm_io_client_destroy(l->io.io_client);
+}
+
+/*
+ * Release a reference on a replog freeing its resources on last drop.
+ *
+ * Returns the result of kref_put().
+ */
+static int
+replog_put(struct dm_repl_log *log, struct dm_target *ti)
+{
+	int released;
+	struct repl_log *l;
+
+	_SET_AND_BUG_ON_L(l, log);
+
+	/* Device first: its handle is stored in the replog being put. */
+	dm_put_device(ti, l->params.dev.dm_dev);
+	released = kref_put(&l->ref, replog_release);
+
+	return released;
+}
+
+/*
+ * Get a reference on a replicator log.
+ *
+ * Opens the log device and either
+ * o takes a reference on an already listed replog for that device
+ *   (refused with -EPERM for OT_CREATE, -EINVAL for a differing
+ *   start offset), or
+ * o allocates and initializes a new replog, reads or creates the
+ *   on-disk log via log_init() and links it into the global list.
+ *
+ * On success log->context points to the replog, which is returned.
+ * On failure an ERR_PTR() is returned and everything acquired here,
+ * including the device reference and the replog itself, is released.
+ */
+static void do_log(struct work_struct *ws);
+static struct repl_log *
+replog_get(struct dm_repl_log *log, struct dm_target *ti,
+	   const char *path, struct repl_params *params)
+{
+	int i, r;
+	dev_t dev;
+	struct dm_dev *dm_dev;
+	char buf[BDEVNAME_SIZE];
+	struct repl_log *l;
+	struct dm_io_client *io_client;
+
+	/* Get device with major:minor or device path. */
+	r = dm_get_device(ti, path, params->dev.start, params->dev.size,
+			  FMODE_WRITE, &dm_dev);
+	if (r) {
+		DMERR("Failed to open replicator log device \"%s\" [%d]",
+		      path, r);
+		return ERR_PTR(r);
+	}
+
+	dev = dm_dev->bdev->bd_dev;
+
+	/* Check if we already have a handle to this device. */
+	mutex_lock(&list_mutex);
+	l = replog_find(dev, params->dev.start);
+	if (IS_ERR(l)) {
+		mutex_unlock(&list_mutex);
+
+		if (unlikely(l == ERR_PTR(-EINVAL))) {
+			DMERR("Device open with different start offset!");
+			dm_put_device(ti, dm_dev);
+			return l;
+		}
+
+		/* -ENOENT: no open log, carry on and set one up below. */
+	} else {
+		/* Cannot create if there is an open reference. */
+		if (params->open_type == OT_CREATE) {
+			mutex_unlock(&list_mutex);
+			DMERR("OT_CREATE requested, but existing log found!");
+			dm_put_device(ti, dm_dev);
+			return ERR_PTR(-EPERM);
+		}
+
+		/* Take reference on replication log out. */
+		kref_get(&l->ref);
+		mutex_unlock(&list_mutex);
+
+		DMINFO("Found existing replog=%s", format_dev_t(buf, dev));
+
+		/* Found one, return it. */
+		log->context = l;
+		return l;
+	}
+
+	/*
+	 * There is no open log, so time to look for one on disk.
+	 */
+	l = kzalloc(sizeof(*l), GFP_KERNEL);
+	if (unlikely(!l)) {
+		DMERR("failed to allocate replicator log context");
+		dm_put_device(ti, dm_dev);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	/* Preserve constructor parameters. */
+	l->params = *params;
+	l->params.dev.dm_dev = dm_dev;
+
+	log->context = l;
+	l->replog = log;
+
+	/* Init basic members. */
+	rwlock_init(&l->lists.lock);
+	rwlock_init(&l->lists.slinks.lock);
+	INIT_LIST_HEAD(&l->lists.slinks.list);
+
+	i = L_NR_LISTS;
+	while (i--)
+		INIT_LIST_HEAD(l->lists.l + i);
+
+	spin_lock_init(&l->io.lock);
+	bio_list_init(&l->io.in);
+
+	/* Take first reference out. */
+	kref_init(&l->ref);
+
+	/* Initialize ring buffer. */
+	r = ring_buffer_init(&l->ring_buffer);
+	if (unlikely(r < 0)) {
+		DMERR("failed to initialize ring buffer %d", r);
+		goto bad;
+	}
+
+	/*
+	 * Preallocate to avoid stalling on OOM.
+	 *
+	 * Size the buffer after the header object rather than after the
+	 * pointer to it, rounded up to a full sector.
+	 */
+	l->io.buffer_header_disk =
+		kmalloc(dm_round_up(sizeof(*l->io.buffer_header_disk),
+			to_bytes(1)), GFP_KERNEL);
+	if (unlikely(!l->io.buffer_header_disk)) {
+		DMERR("failed to allocate ring buffer disk header");
+		r = -ENOMEM;
+		goto bad;
+	}
+
+	/*
+	 * ringbuffer_io will only be called with I/O sizes of ti->split_io
+	 * or fewer bytes, which are boundary checked too.
+	 *
+	 * The io_client needs to be setup before we can call log_init below.
+	 */
+	io_client = dm_io_client_create(DEFAULT_BIOS * (1 + BIO_MAX_PAGES));
+	if (unlikely(IS_ERR(io_client))) {
+		DMERR("dm_io_client_create failed!");
+		r = PTR_ERR(io_client);
+		goto bad;
+	} else
+		l->io.io_client = io_client;
+
+	/* Create one worker per replog. */
+	l->io.wq = create_singlethread_workqueue(DAEMON);
+	if (unlikely(!l->io.wq)) {
+		DMERR("failed to create workqueue");
+		r = -ENOMEM;
+		goto bad;
+	} else
+		INIT_WORK(&l->io.ws, do_log);
+
+	/* Try to read an existing log or create a new one. */
+	r = log_init(l);
+	if (unlikely(r < 0))
+		goto bad;
+
+	DMINFO("Created replog=%s", format_dev_t(buf, dev));
+
+	stats_init(l);
+	ClearLogDevelStats(l);
+
+	/* Start out suspended, dm core will resume us. */
+	SetRingSuspended(&l->ring_buffer);
+	SetRingBlocked(&l->ring_buffer);
+
+	/* Link the new replog into the global list */
+	mutex_lock(&list_mutex);
+	list_add_tail(L_REPLOG_LIST(l), &replog_list);
+	mutex_unlock(&list_mutex);
+
+	return l;
+
+bad:
+	/*
+	 * The replog never made it onto the global list and we hold the
+	 * only reference, so tear it down completely: like the earlier
+	 * error paths, give the device back, and don't leak the context.
+	 */
+	replog_destroy(l);
+	dm_put_device(ti, dm_dev);
+	log->context = NULL;
+	kfree(l);
+	return ERR_PTR(r);
+}
+
+/* Account an entry for fallbehind and put it on the slink copy list. */
+static void
+entry_account_and_copy(struct ring_buffer_entry *entry)
+{
+	struct repl_log *l;
+	unsigned long slink_nr;
+
+	_BUG_ON_PTR(entry);
+	l = ring_buffer_repl_log(entry->ring);
+	_BUG_ON_PTR(l);
+
+	/* No outstanding copies for this entry -> bail out. */
+	if (!entry_busy(l, ENTRY_SLINKS(entry)))
+		return;
+
+	/* Already handed over for copying -> bail out as well. */
+	if (entry->being_copied)
+		return;
+
+	entry->being_copied = 1;
+	_BUG_ON_PTR(entry->ring);
+
+	/* Account for fallbehind on every slink set for the entry. */
+	for_each_bit(slink_nr, ENTRY_SLINKS(entry), l->slink.max) {
+		struct dm_repl_slink *slink = slink_find(l, slink_nr);
+
+		if (IS_ERR(slink))
+			continue;
+
+		slink_fallbehind_inc(slink, entry);
+	}
+
+	/*
+	 * Moving the entry to the copy list in order initiates the
+	 * copies across all SLINKS. No wake_do_log call is needed,
+	 * because we are already running in do_log before
+	 * do_slink_ios().
+	 */
+	list_move_tail(E_WRITE_OR_COPY_LIST(entry), L_SLINK_COPY_LIST(l));
+}
+
+/*
+ * Initialize logs incore metadata.
+ *
+ * Runs once per replog (guarded by TestSetLogInitialized()).  Unless
+ * the ring buffer is empty, all data headers between ring head and
+ * tail are read back in, each entry is accounted and queued for slink
+ * copies as needed, and the head is advanced past entries that have
+ * been copied already.  Finally the ring is unblocked and the caller
+ * is notified.
+ */
+static void
+do_log_init(struct repl_log *l)
+{
+	int r;
+	sector_t sector;
+	struct ring_buffer *ring = &l->ring_buffer;
+	struct ring_buffer_entry *entry;
+
+	/* NOOP in case we're initialized already. */
+	if (TestSetLogInitialized(l))
+		return;
+
+	DMINFO("%s", __func__);
+
+	/* Nothing to do if the log is empty */
+	if (ring_buffer_empty(ring)) {
+		DMINFO("empty ring buffer");
+		goto out;
+	}
+
+	/*
+	 * Start at head and walk to tail, queuing I/O to slinks.
+	 */
+	for (sector = ring->head; sector != ring->tail;) {
+		struct data_header *header;
+
+		/*
+		 * NOTE(review): the result of ring_buffer_alloc_entry()
+		 * is used unchecked; confirm it can't fail with a NULL bio.
+		 */
+		entry = ring_buffer_alloc_entry(ring, NULL);
+		header = entry->data.header;
+		r = data_header_io(READ, l, header, sector);
+		if (unlikely(r < 0)) {
+			/*
+			 * FIXME: as written, this is not recoverable.
+			 *	  All ios have to be errored because
+			 *	  of RingBufferError().
+			 *
+			 * Report the I/O error itself; the entry is a
+			 * valid pointer and carries no error code.
+			 */
+			ring_buffer_error(RING_BUFFER_HEADER_ERROR, ring, r);
+			ring_buffer_free_entry(__func__, entry);
+			break;
+		}
+
+		/* Set synchronous I/O policy mask. */
+		set_sync_mask(l, entry);
+
+		/* Adjust ring->free for any skipped sectors. */
+		ring->free -= sectors_skipped(ring, header);
+
+		/*
+		 * Mark sector range busy in case the
+		 * entry hasn't been copied to slink0 yet.
+		 */
+		if (slink_test_bit(0, ENTRY_SLINKS(entry)))
+			sector_range_mark_busy(entry);
+
+		/*
+		 * Account entry for fallbehind and
+		 * put on slink copy list if needed.
+		 */
+		entry_account_and_copy(entry);
+
+		/* Advance past this entry. */
+		sector = unlikely(next_entry_wraps(header)) ?
+			 ring->start : next_start(header);
+	}
+
+	/* Advance head past any already copied entries. */
+	r = ring_buffer_advance_head(ring, 1);
+	if (r > 0)
+		DMINFO("%d entries freed", r);
+
+out:
+	ClearRingBlocked(ring);
+	notify_caller(l, READ, 0);
+	DMINFO("init complete");
+}
+
+/*
+ * Conditionally endio a bio, when no copies on sync slinks are pending.
+ *
+ * In case an error on site link 0 occurred, the bio will be errored!
+ *
+ * @caller: name of the calling function, used for debug output only.
+ * @entry:  ring buffer entry whose write bio (if any) is to be ended;
+ *	    entry->bios.write is reset to NULL once it has been ended.
+ */
+static void
+entry_nosync_endio(const char *caller, struct ring_buffer_entry *entry)
+{
+	struct bio *bio = entry->bios.write;
+
+	/* If all sync slinks processed. */
+	if (bio && !entry_busy(ring_buffer_repl_log(entry->ring),
+			       ENTRY_SYNC(entry))) {
+		/* Take the size from the entry's own data header. */
+		DBG_DMINFO("%s Calling bio_endio with %u, bi_endio %p",
+			   caller,
+			   (unsigned) entry->data.header->region.size,
+			   bio->bi_end_io);
+
+		/* Only error in case of site link 0 errors. */
+		bio_endio(bio,
+			  slink_test_bit(0, ENTRY_ERROR(entry)) ? -EIO : 0);
+		entry->bios.write = NULL;
+	}
+}
+
+/*
+ * Error endio the entries bio, mark the ring
+ * buffer entry invalid and advance the tail.
+ *
+ * @caller: name of the calling function, handed on to
+ *	    ring_buffer_free_entry().
+ * @l:      replog owning the ring buffer.
+ * @entry:  failed entry; must still carry its write bio.  The entry
+ *	    is handed to ring_buffer_free_entry() before returning.
+ */
+static void
+entry_endio_invalid(const char *caller,
+		    struct repl_log *l, struct ring_buffer_entry *entry)
+{
+	int r;
+
+	DBG_DMINFO("entry %p header_err %lu, data_err %lu", entry,
+			entry->data.error.header,
+			entry->data.error.data);
+	BUG_ON(!entry->bios.write);
+
+	bio_endio(entry->bios.write, -EIO);
+
+	/* Mark the header as invalid so it is not queued for slink copies. */
+	r = ring_buffer_mark_entry_invalid(&l->ring_buffer, entry);
+	if (unlikely(r < 0)) {
+		/* FIXME: XXX
+		 * Take the device offline?
+		 *
+		 * NOTE(review): the message below announces taking the
+		 * device off-line, but nothing in this function does so.
+		 */
+		DMERR("%s: I/O to sector %llu of log device "
+				"failed, and failed to mark header "
+				"invalid.  Taking device off-line.",
+				__func__,
+				(unsigned long long)
+				entry->data.header->region.sector);
+	}
+
+	ring_buffer_free_entry(caller, entry);
+}
+
+static inline int
+cc_error_read(struct slink_copy_context *cc)
+{
+	return cc->error[ERR_DISK].read ||
+	       cc->error[ERR_RAM].read;
+}
+
+static inline int
+cc_error_write(struct slink_copy_context *cc)
+{
+	return cc->error[ERR_DISK].write ||
+	       cc->error[ERR_RAM].write;
+}
+
+static inline int
+cc_error(struct slink_copy_context *cc)
+{
+	return cc_error_read(cc) ||
+	       cc_error_write(cc);
+}
+
+/*
+ * Set state of slink_copy_context to completion.
+ *
+ * Clears the I/O bits of the slink in the entry and the log, updates
+ * the fallbehind accounting and either flags the entry as errored for
+ * that slink or as copied to it.  The copy context is freed and the
+ * slink state I/O reference is dropped, hence the caller must not
+ * touch cc afterwards.
+ *
+ * Called with list lock held.
+ */
+static void
+slink_copy_complete(struct slink_copy_context *cc)
+{
+	int slink_nr;
+	struct dm_repl_slink *slink = cc->slink;
+	struct ring_buffer_entry *entry = cc->entry;
+	/* NOTE(review): entry is dereferenced here, ahead of its check below. */
+	struct repl_log *l = ring_buffer_repl_log(entry->ring);
+
+	_BUG_ON_PTR(slink);
+	_BUG_ON_PTR(slink->caller);
+	_BUG_ON_PTR(entry);
+	_BUG_ON_PTR(entry->data.header);
+	_BUG_ON_PTR(l);
+	slink_nr = slink->ops->slink_number(slink);
+	_BUG_ON_SLINK_NR(l, slink_nr);
+
+	/* The entry is no longer under I/O across this slink. */
+	slink_clear_bit(slink_nr, ENTRY_IOS(entry));
+
+	/* The slink is no longer under I/O. */
+	slink_clear_bit(slink_nr, LOG_SLINKS_IOS(l));
+
+	/* Update the I/O threshold counters */
+	slink_fallbehind_dec(slink, entry);
+
+	DBG_DMINFO("processing I/O completion for slink%d", slink_nr);
+
+	/* An error only counts while the slink's bit is set in LOG_SLINKS(l). */
+	if (unlikely(cc_error(cc)) &&
+		     slink_test_bit(slink_nr, LOG_SLINKS(l))) {
+		slink_set_bit(slink_nr, ENTRY_ERROR(entry));
+		DMERR_LIMIT("copy on slink%d failed", slink_nr);
+	} else {
+		/* Flag entry copied to slink_nr. */
+		slink_clear_bit(slink_nr, ENTRY_SLINKS(entry));
+
+		/* Reset any sync copy request to slink_nr. */
+		slink_clear_bit(slink_nr, ENTRY_SYNC(entry));
+	}
+
+	free_copy_context(cc, entry->ring);
+
+	/* Release slink state reference after completion. */
+	ss_io_put(__func__, slink->caller);
+}
+
+/*
+ * Check for entry with endios pending at ring buffer head.
+ *
+ * Returns entry_endios_pending() of the first entry on the ordered
+ * list, or 0 (after logging an error) if that list is empty.
+ */
+static int
+ring_buffer_head_busy(struct repl_log *l)
+{
+	int busy = 0, empty;
+	struct ring_buffer_entry *entry;
+
+	mutex_lock(&l->ring_buffer.mutex);
+
+	empty = list_empty(L_ENTRY_ORDERED_LIST(l));
+	if (likely(!empty)) {
+		/* The first entry on this list is the ring head. */
+		entry = list_first_entry(L_ENTRY_ORDERED_LIST(l),
+					 struct ring_buffer_entry,
+					 lists.l[E_ORDERED]);
+		busy = entry_endios_pending(entry);
+	}
+
+	mutex_unlock(&l->ring_buffer.mutex);
+
+	/*
+	 * This shouldn't happen.  Presumably this function is called
+	 * when the ring buffer is overflowing, so at least one entry
+	 * is expected on the list!
+	 */
+	if (unlikely(empty))
+		DMERR_LIMIT("%s called with an empty ring!", __func__);
+
+	return busy;
+}
+
+/*
+ * Find the first ring buffer entry with outstanding copies
+ * and record each slink that hasn't completed the copy I/O.
+ *
+ * NOTE(review): as written, the code walks *all* entries on the slink
+ * copy list and records only the slink of the *first* copy context of
+ * each entry (the inner loop breaks after one iteration, the outer
+ * loop never breaks).  Confirm which behaviour is intended.
+ *
+ * @slow_slinks: bitmap the slow slinks get recorded in; it is cleared
+ *		 again whenever 0 is returned after slinks were recorded.
+ *
+ * Returns 1 if slow slinks were recorded, 0 if there are none, if
+ * slink0 is among them, or if the recorded set compares equal to
+ * LOG_SLINKS(l).
+ */
+static int
+find_slow_slinks(struct repl_log *l, uint64_t *slow_slinks)
+{
+	int r = 0;
+	struct ring_buffer_entry *entry;
+
+	/*
+	 * Needed for E_COPY_CONTEXT_LIST() access.
+	 * NOTE(review): no lock is actually taken in this function;
+	 * presumably the caller is expected to serialize - confirm.
+	 */
+	list_for_each_entry(entry, L_SLINK_COPY_LIST(l),
+			    lists.l[E_WRITE_OR_COPY]) {
+		int slink_nr;
+		struct slink_copy_context *cc;
+
+		/*
+		 * There may or may not be slink copy contexts hanging
+		 * off of the entry. If there aren't any, it means the
+		 * copy has already completed.
+		 */
+		list_for_each_entry(cc, E_COPY_CONTEXT_LIST(entry), list) {
+			struct dm_repl_slink *slink = cc->slink;
+
+			slink_nr = slink->ops->slink_number(slink);
+			_BUG_ON_SLINK_NR(l, slink_nr);
+			slink_set_bit(slink_nr, slow_slinks);
+			r = 1;
+			break;
+		}
+
+	}
+
+	if (r) {
+		/*
+		 * Check to see if all slinks are slow!  slink0 should
+		 * not be slow, one would hope!  But, we need to deal
+		 * with that case.
+		 */
+		if (slink_test_bit(0, slow_slinks)) {
+			struct slink_state *ss;
+
+			_BUG_ON_PTR(l->slink0);
+			ss = l->slink0->caller;
+			_BUG_ON_PTR(ss);
+
+			/*
+			 * If slink0 is slow, there is
+			 * obviously some other problem!
+			 */
+			DMWARN("%s: slink0 copy taking a long time "
+			       "(%u ms)", __func__,
+			       jiffies_to_msecs(jiffies) -
+			       jiffies_to_msecs(ss->fb.head_jiffies));
+			r = 0;
+		/*
+		 * NOTE(review): the compare length is sizeof(LOG_SLINKS(l))
+		 * while the memset below uses BITMAP_SIZE(l); verify that
+		 * both cover the same number of bytes.
+		 */
+		} else if (!memcmp(slow_slinks, LOG_SLINKS(l),
+				   sizeof(LOG_SLINKS(l))))
+			r = 0;
+
+		/* Don't hand back a partial result when reporting "none". */
+		if (!r)
+			memset(slow_slinks, 0, BITMAP_SIZE(l));
+	}
+
+	return r;
+}
+
+/*
+ * Check if entry has ios scheduled on slow slinks.
+ *
+ * Returns 1 if any bit set in ENTRY_IOS(entry) is set in the
+ * slow_slinks bitmap as well, 0 otherwise.
+ */
+static int
+entry_is_slow(struct ring_buffer_entry *entry, uint64_t *slow_slinks)
+{
+	unsigned long slink_nr;
+	struct repl_log *l = ring_buffer_repl_log(entry->ring);
+
+	for_each_bit(slink_nr, ENTRY_IOS(entry), l->slink.max) {
+		if (!test_bit(slink_nr, (void *) slow_slinks))
+			continue;
+
+		return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Cancel slink_copies to the slinks specified in the slow_slinks bitmask.
+ *
+ * This function starts at the beginning of the ordered slink copy list
+ * and frees up ring buffer entries which are waiting only for the slow
+ * slinks.  This is accomplished by marking the regions under I/O as
+ * dirty in the slink dirty logs and advancing the ring head pointer.
+ * Once a ring buffer entry is encountered that is waiting for more
+ * than just the slinks specified, the function terminates.
+ *
+ * Works in three passes over the slink copy list:
+ * 1. set the regions of the affected entries nosync on their slinks,
+ *    remembering slinks to flush and slinks with stall policy;
+ * 2. flush the dirty logs of the remembered slinks and trigger resync;
+ * 3. complete the copy contexts, except for stalled slinks and slinks
+ *    whose flush failed.
+ * Finally, the ring head is advanced.
+ *
+ * NOTE(review): the three scratch bitmaps are function-static, so
+ * this function must not run concurrently for different replogs -
+ * confirm callers guarantee that.
+ */
+static void
+repl_log_cancel_copies(struct repl_log *l, uint64_t *slow_slinks)
+{
+	int r;
+	unsigned long slink_nr;
+	struct ring_buffer *ring = &l->ring_buffer;
+	struct ring_buffer_entry *entry;
+	struct dm_repl_slink *slink;
+	struct data_header_region *region;
+	struct slink_copy_context *cc, *n;
+	static uint64_t flush_slinks[BITMAP_ELEMS_MAX],
+			flush_error[BITMAP_ELEMS_MAX],
+			stall_slinks[BITMAP_ELEMS_MAX];
+
+	memset(flush_slinks, 0, BITMAP_SIZE(l));
+	memset(flush_error, 0, BITMAP_SIZE(l));
+	memset(stall_slinks, 0, BITMAP_SIZE(l));
+
+	/* First walk the entry list setting region nosync state. */
+	list_for_each_entry(entry, L_SLINK_COPY_LIST(l),
+			    lists.l[E_WRITE_OR_COPY]) {
+		/* Stop at the first entry not (only) held up by slow slinks. */
+		if (!entry_is_slow(entry, slow_slinks) ||
+		    entry_endios_pending(entry))
+			break;
+
+		region = &entry->data.header->region;
+
+		/* Needed for E_COPY_CONTEXT_LIST() access. */
+		read_lock_irq(&l->lists.lock);
+
+		/* Walk the copy context list. */
+		list_for_each_entry_safe(cc, n, E_COPY_CONTEXT_LIST(entry),
+					 list) {
+			slink = cc->slink;
+			_BUG_ON_PTR(slink);
+			slink_nr = slink->ops->slink_number(slink);
+			_BUG_ON_SLINK_NR(l, slink_nr);
+
+			/* Stall IO policy set. */
+			if (slink_stall(slink)) {
+				DMINFO_LIMIT("slink=%lu stall", slink_nr);
+				/*
+				 * Keep stall policy in bitarray
+				 * to avoid policy change race.
+				 */
+				slink_set_bit(slink_nr, stall_slinks);
+				l->stats.stall++;
+				continue;
+			}
+
+			/* Only slinks with a region still in sync need a flush. */
+			r = slink->ops->in_sync(slink,
+						region->dev, region->sector);
+			if (r)
+				slink_set_bit(slink_nr, flush_slinks);
+
+			r = slink->ops->set_sync(slink, region->dev,
+						 region->sector, 0);
+			BUG_ON(r);
+		}
+
+		read_unlock_irq(&l->lists.lock);
+	}
+
+	/*
+	 * The dirty logs of all devices on this slink must be flushed in
+	 * this second step for performance reasons before advancing the
+	 * ring head.
+	 */
+	for_each_bit(slink_nr, (void *) flush_slinks, l->slink.max) {
+		/*
+		 * NOTE(review): the slink_find() result is used without an
+		 * IS_ERR() check here, unlike in entry_account_and_copy().
+		 */
+		slink = slink_find(l, slink_nr);
+		r = slink->ops->flush_sync(slink);
+
+		if (unlikely(r)) {
+			/*
+			 * What happens when the region is
+			 * marked but not flushed? Will we
+			 * still get an endio?
+			 * This code assumes not. -JEM
+			 *
+			 * If a region is marked sync, the slink
+			 * code won't select it for resync,
+			 * Hence we got to keep the buffer entries,
+			 * because we can't assume resync is
+			 * ever going to happen. -HJM
+			 */
+			DMERR_LIMIT("error flushing dirty logs "
+				    "on slink=%d",
+				    slink->ops->slink_number(slink));
+			slink_set_bit(slink_nr, flush_error);
+		} else {
+			/* Trigger resynchronization on slink. */
+			r = slink->ops->resync(slink, 1);
+			BUG_ON(r);
+		}
+	}
+
+	/* Now release copy contexts, declaring copy completion. */
+	list_for_each_entry(entry, L_SLINK_COPY_LIST(l),
+			    lists.l[E_WRITE_OR_COPY]) {
+		/* Same stop condition as in the first pass. */
+		if (!entry_is_slow(entry, slow_slinks) ||
+		    entry_endios_pending(entry))
+			break;
+
+		/* Needed for E_COPY_CONTEXT_LIST() access. */
+		write_lock_irq(&l->lists.lock);
+
+		/* Walk the copy context list. */
+		list_for_each_entry_safe(cc, n, E_COPY_CONTEXT_LIST(entry),
+					 list) {
+			slink = cc->slink;
+			slink_nr = slink->ops->slink_number(slink);
+
+			/* Stall IO policy set. */
+			if (slink_test_bit(slink_nr, stall_slinks))
+				continue;
+
+			/* Error flushing dirty log, keep entry. */
+			if (unlikely(slink_test_bit(slink_nr, flush_error)))
+				continue;
+
+			BUG_ON(list_empty(&cc->list));
+			list_del_init(&cc->list);
+
+			/* Do not reference cc after this call. */
+			slink_copy_complete(cc);
+		}
+
+		write_unlock_irq(&l->lists.lock);
+	}
+
+	/*
+	 * Now advance the head pointer to free up room in the ring buffer.
+	 * In case we fail here, we've got both entries in the ring buffer
+	 * *and* nosync regions to recover.
+	 */
+	ring_buffer_advance_head(ring, 1);
+}
+
+/*
+ * Called to free up some ring buffer space when a full condition is
+ * encountered.  The basic idea is to walk through the list of
+ * outstanding copies and see which slinks are slow to respond.
+ * Then as many of the entries as possible are freed and the ring
+ * head is advanced.
+ */
+static void
+ring_check_fallback(struct ring_buffer *ring)
+{
+	struct repl_log *l = ring_buffer_repl_log(ring);
+	static uint64_t slow_slinks[BITMAP_ELEMS_MAX];
+
+	/* Cheapest option first: simply free entries at the ring head. */
+	if (ring_buffer_advance_head(ring, 1) > 0) {
+		DMINFO_LIMIT("%s: able to advance head", __func__);
+		return;
+	}
+
+	/*
+	 * Entries at the head of the ring buffer currently queued for
+	 * completion?  Then do nothing here and simply allow the I/O
+	 * completion to proceed.
+	 */
+	if (ring_buffer_head_busy(l)) {
+		DMINFO_LIMIT("%s: endios pending.", __func__);
+		return;
+	}
+
+	/*
+	 * Look at the copy list entries with outstanding I/O and
+	 * figure out which slinks are holding up progress.
+	 */
+	memset(slow_slinks, 0, BITMAP_SIZE(l));
+	if (!find_slow_slinks(l, slow_slinks)) {
+		DMINFO_LIMIT("%s: no slow slinks found.", __func__);
+		return;
+	}
+
+	DMINFO_LIMIT("%s: slow slinks found.", __func__);
+
+	/*
+	 * Walk the copy list from the beginning and free any entry
+	 * which is awaiting copy completion from the slow slinks.
+	 * That stops at the first entry which is awaiting completion
+	 * from an slink other than the slow ones.
+	 */
+	repl_log_cancel_copies(l, slow_slinks);
+}
+
+/*
+ * Report header and/or data errors recorded for an entry to the ring
+ * buffer.  Returns -EIO if either error is set, 0 otherwise.
+ */
+static int
+entry_error(struct ring_buffer_entry *entry)
+{
+	int r = 0;
+	struct entry_data *data = &entry->data;
+
+	if (unlikely(data->error.header)) {
+		ring_buffer_error(RING_BUFFER_HEADER_ERROR,
+				  entry->ring, -EIO);
+		r = -EIO;
+	}
+
+	if (unlikely(data->error.data)) {
+		ring_buffer_error(RING_BUFFER_DATA_ERROR,
+				  entry->ring, -EIO);
+		r = -EIO;
+	}
+
+	return r;
+}
+
+/*
+ *  Ring buffer endio processing.  The ring buffer tail cannot be
+ *  advanced until both the data and data_header portions are written
+ *  to the log, AND all of the buffer I/O's preceding this one are in
+ *  the log have completed.
+ *
+ *  While entries with pending endios remain on the write list, the
+ *  tail is only advanced once at least MIN_ENTRIES_INACTIVE completed
+ *  entries have piled up in front of the first pending one.
+ */
+#define	MIN_ENTRIES_INACTIVE	128
+static void
+do_ring_buffer_endios(struct repl_log *l)
+{
+	int active = 0, r;
+	unsigned count = 0;
+	struct ring_buffer *ring = &l->ring_buffer;
+	struct ring_buffer_entry *entry, *entry_last = NULL, *n;
+
+	DBG_DMINFO("%s", __func__);
+
+	/*
+	 * The l->lists.entry.io list is sorted by on-disk order. The first
+	 * entry in the list will correspond to the current ring buffer tail
+	 * plus the size of the last valid entry.  We process endios in
+	 * order so that the tail is not advanced past unfinished entries.
+	 */
+
+	/* Count the completed entries up to the first one still under I/O. */
+	list_for_each_entry_safe(entry, n, L_ENTRY_RING_WRITE_LIST(l),
+				 lists.l[E_WRITE_OR_COPY]) {
+		if (atomic_read(&entry->endios)) {
+			active = 1;
+			break;
+		}
+
+		count++;
+		entry_last = entry;
+	}
+
+	/* Optimize advancing ringbuffer tail pointer. */
+	if (!entry_last || entry_last->being_copied ||
+	    (active && count < MIN_ENTRIES_INACTIVE))
+		return;
+
+	/* Update the tail pointer once for a list of entries. */
+	DMINFO_LIMIT("%s advancing ring buffer tail %u entries",
+		     __func__, count);
+	r = ring_buffer_advance_tail(entry_last);
+
+	/* Now check for any errored entries. */
+	list_for_each_entry_safe(entry, n, L_ENTRY_RING_WRITE_LIST(l),
+				 lists.l[E_WRITE_OR_COPY]) {
+		struct entry_data *data = &entry->data;
+
+		/* The on-disk header copy is no longer needed. */
+		_BUG_ON_PTR(data->disk_header);
+		free_data_header_disk(data->disk_header, ring);
+		data->disk_header = NULL;
+
+		ss_io_put(__func__, l->slink0->caller);
+
+		/*
+		 * Tail update error before or header/data
+		 * ring buffer write error -> error bio.
+		 */
+		if (unlikely(r || entry_error(entry)))
+			entry_endio_invalid(__func__, l, entry);
+		else {
+			/*
+			 * Handle the slink policy for sync vs. async here.
+			 *
+			 * Synchronous link means, that endio needs to be
+			 * reported *after* the slink copy of the entry
+			 * succeeded and *not* after the entry got stored
+			 * in the ring buffer. -HJM
+			 */
+			entry_nosync_endio(__func__, entry);
+
+			/*
+			 * Account entry for fallbehind
+			 * and put on slink copy list.
+			 */
+			entry_account_and_copy(entry);
+		}
+
+		/* Only the entries covered by the tail update get processed. */
+		if (entry == entry_last)
+			break;
+	}
+
+	/* On ring full, check if we need to fall back to bitmap mode. */
+	if (RingBufferFull(ring))
+		ring_check_fallback(ring);
+
+	/* Wake up any waiters. */
+	wake_up(&ring->flushq);
+}
+
+/*
+ * Write the data header of an entry to its position in the log and
+ * flag a ring buffer header error if that fails.
+ */
+static inline void
+header_write(struct repl_log *l, struct ring_buffer_entry *entry)
+{
+	int r;
+
+	_BUG_ON_PTR(l);
+	_BUG_ON_PTR(entry);
+
+	r = data_header_io(WRITE, l, entry->data.header,
+			   entry->data.header->pos.header);
+	if (likely(r >= 0))
+		return;
+
+	/*
+	 * If the header update fails, we suspend I/O
+	 * to the ring device.  We need a way to recover
+	 * from this.  It's not clear what the most likely
+	 * reasons for this failure would be.
+	 * If it's transient (SAN storage cable pulled), then
+	 * we should do the same sort of liveness checks
+	 * that are implemented in the slink code.
+	 *
+	 * For now, we are going to keep the endio on
+	 * the list for processing it later.
+	 *
+	 * We presume the log backing device to be HA. -HJM
+	 */
+	ring_buffer_error(RING_BUFFER_HEADER_ERROR, &l->ring_buffer, r);
+}
+
+/*
+ * Pop the first slink copy context off of the slink endio list
+ * (helper for working all site link endios).
+ *
+ * Returns the copy context or NULL if the list is empty.
+ */
+static struct slink_copy_context *
+cc_pop(struct repl_log *l)
+{
+	struct slink_copy_context *cc;
+
+	if (list_empty(L_SLINK_ENDIO_LIST(l)))
+		return NULL;
+
+	cc = list_first_entry(L_SLINK_ENDIO_LIST(l),
+			      struct slink_copy_context, list);
+	list_del(&cc->list);
+
+	return cc;
+}
+
+/*
+ * Work all site link endios (i.e. all slink copy contexts on the
+ * slink endio list): complete each copy, release the busy sector
+ * range once the copy to slink0 is done, endio the entry's bio when
+ * no synchronous copies are pending any more and finally try to
+ * advance the ring buffer head.
+ */
+static void
+do_slink_endios(struct repl_log *l)
+{
+	struct ring_buffer *ring = &l->ring_buffer;
+
+	DBG_DMINFO("%s", __func__);
+
+	while (1) {
+		int slink_nr;
+		struct slink_copy_context *cc;
+		struct dm_repl_slink *slink;
+		struct ring_buffer_entry *entry;
+
+		/* Pop copy_context from copy contexts list. */
+		write_lock_irq(&l->lists.lock);
+		cc = cc_pop(l);
+		if (!cc) {
+			write_unlock_irq(&l->lists.lock);
+			break;
+		}
+
+		/* No active copy on endios list! */
+		BUG_ON(atomic_read(&cc->cnt));
+
+		/* Fetch what we need from cc before it gets freed. */
+		slink = cc->slink;
+		entry = cc->entry;
+
+		/* Do not reference cc after this call. */
+		slink_copy_complete(cc);
+
+		write_unlock_irq(&l->lists.lock);
+
+		_BUG_ON_PTR(slink);
+		_BUG_ON_PTR(slink->ops);
+		_BUG_ON_PTR(entry);
+
+		/*
+		 * All reads are serviced from slink0 (for now), so mark
+		 * sectors as no longer under I/O once the copy to slink0
+		 * is complete.
+		 */
+		slink_nr = slink->ops->slink_number(slink);
+		_BUG_ON_SLINK_NR(l, slink_nr);
+		if (!slink_nr)
+			sector_range_clear_busy(entry);
+
+		/* If all synchronous site links processed, endio here. */
+		entry_nosync_endio(__func__, entry);
+	}
+
+	/*
+	 * If all slinks are up-to-date, then we can advance
+	 * the ring buffer head pointer and remove the entry
+	 * from the slink copy list.
+	 */
+	ring_buffer_advance_head(ring, 0);
+}
+
+/*
+ * Read a bio (partially) of off log:
+ *
+ * o check if bio's data is completely in the log
+ *   -> redirect N reads to the log
+ *   (N = 1 for simple cases to N > 1)
+ * o check if bio's data is split between log and LD
+ *   -> redirect N parts to the log
+ *   -> redirect 1 part to the LD
+ * o if bio'data is on the LD
+ */
+/*
+ * Debug helpers for bio_read(): log the bio and entry sector ranges.
+ * Both expand to a complete statement and expect variables named
+ * bio_range and entry_range in the scope they are used in.
+ */
+#define DO_INFO1 \
+DBG_DMINFO("%s overlap for bio_range.start=%llu bio_range.end=%llu " \
+       "entry_range.start=%llu entry_range.end=%llu", __func__, \
+       (unsigned long long) bio_range.start, \
+       (unsigned long long) bio_range.end, \
+       (unsigned long long) entry_range.start, \
+       (unsigned long long) entry_range.end);
+#define DO_INFO2 \
+DBG_DMINFO("%s NO overlap for bio_range.start=%llu bio_range.end=%llu " \
+       "entry_range.start=%llu entry_range.end=%llu", __func__, \
+       (unsigned long long) bio_range.start, \
+       (unsigned long long) bio_range.end, \
+       (unsigned long long) entry_range.start, \
+       (unsigned long long) entry_range.end);
+static inline int
+bio_read(struct repl_log *l, struct bio *bio, struct list_head *buckets[2])
+{
+	int r;
+	unsigned i;
+	struct ring_buffer_entry *entry;
+	struct sector_range bio_range = {
+		.start = bio_begin(bio),
+		.end = bio_end(bio),
+	}, entry_range;
+
+	/* Figure overlapping areas. */
+	r = 0;
+	for (i = 0; buckets[i] && i < 2; i++) {
+		/* Find entry from end of bucket. */
+		list_for_each_entry_reverse(entry, buckets[i],
+					    lists.l[E_BUSY_HASH]) {
+			entry_range.start = entry->data.header->region.sector;
+			entry_range.end = entry_range.start +
+			round_up_to_sector(entry->data.header->region.size);
+
+			if (ranges_overlap(&bio_range, &entry_range)) {
+				if (bio_range.start >= entry_range.start &&
+				    bio_range.end <= entry_range.end) {
+					sector_t off;
+
+					entry->bios.read = bio;
+					DO_INFO1;
+					off = bio_range.start -
+					      entry_range.start;
+					ring_buffer_read_bio_vec(l, entry, off,
+								 bio_vec(bio),
+								 bio->bi_size);
+					return 0;
+				} else
+					DO_INFO2;
+			} else
+				goto out;
+		}
+	}
+
+	/*
+	 * slink->ops->io() will check if region is in sync
+	 * and return -EAGAIN in case the I/O needs
+	 * to be delayed. Returning -ENODEV etc. is fatal.
+	 *
+	 * WARNING: bio->bi_bdev changed after return!
+	 */
+	/*
+	 * Reading of off log:
+	 * o check if bio's data is completely in the log
+	 *   -> redirect N reads to the log
+	 *   (N = 1 for simple cases to N > 1)
+	 * o check if bio's data is split between log and LD
+	 *   -> redirect N parts to the log
+	 *   -> redirect 1 part to the LD
+	 * o if bio'data is on the LD
+	 */
+out:
+	return -EAGAIN;
+}
+#undef DO_INFO1
+#undef DO_INFO2
+
+/*
+ * Service a read bio either from the ring buffer (when writes to the
+ * requested area are pending in the log) or via slink0.
+ *
+ * Returns the error of slink_find() if slink0 can't be found, a negative
+ * value from ring_buffer_writes_pending() if an entry is busy, else the
+ * result of bio_read() or of the slink0 io method.
+ */
+static int
+ring_buffer_read_bio(struct repl_log *l, struct bio *bio)
+{
+	int r;
+	struct list_head *buckets[2];
+	struct dm_repl_slink *slink0 = slink_find(l, 0);
+
+	if (IS_ERR(slink0))
+		return PTR_ERR(slink0);
+
+	/* Any writes pending to the area the bio intends to read? */
+	r = ring_buffer_writes_pending(&l->ring_buffer.busy_sectors,
+				       bio, buckets);
+	if (r < 0) /* Entry busy. */
+		return r;
+
+	if (!r) {
+		/*
+		 * Simple case: no writes in the log for this bio.
+		 *
+		 * slink->ops->io() will check if region is in sync
+		 * and return -EAGAIN in case the I/O needs
+		 * to be delayed. Returning -ENODEV etc. is fatal.
+		 *
+		 * WARNING: bio->bi_bdev changed after return!
+		 */
+		r = slink0->ops->io(slink0, bio, 0);
+
+		/* No retry possibility is fatal. */
+		BUG_ON(unlikely(r < 0 && r != -EAGAIN));
+		return r;
+	}
+
+	/* We've got writes in the log for this bio. */
+	/* REMOVEME: statistics. */
+	atomic_inc(&l->stats.writes_pending);
+	return bio_read(l, bio, buckets);
+}
+
+/*
+ * Work on any I/Os queued into the ring buffer.
+ *
+ * The input list is grabbed under l->io.lock and processed without it:
+ * o all bios are errored with -EIO if the ring buffer is in error state
+ * o reads go through ring_buffer_read_bio(); a negative return stops
+ *   processing and the read is put back for a later run
+ * o writes get inserted into the ring buffer
+ *
+ * Any bios left unprocessed are merged back to the head of l->io.in.
+ */
+static void
+do_ring_buffer_ios(struct repl_log *l)
+{
+	int r;
+	struct bio *bio;
+	struct bio_list ios_in;
+	struct ring_buffer *ring;
+
+	DBG_DMINFO("%s %u start", __func__, jiffies_to_msecs(jiffies));
+
+	bio_list_init(&ios_in);
+	ring = &l->ring_buffer;
+
+	/* Quickly grab the bio input list. */
+	spin_lock(&l->io.lock);
+	bio_list_merge(&ios_in, &l->io.in);
+	bio_list_init(&l->io.in);
+	spin_unlock(&l->io.lock);
+
+	while ((bio = bio_list_pop(&ios_in))) {
+		/* FATAL: ring buffer I/O error occurred! */
+		if (unlikely(RingBufferError(ring)))
+			bio_endio(bio, -EIO);
+		else if (bio_data_dir(bio) == READ) {
+			r = ring_buffer_read_bio(l, bio);
+			/* We have to wait. */
+			if (r < 0) {
+				bio_list_push(&ios_in, bio);
+				break;
+			}
+		} else
+			/* Insert new write bio into ring buffer. */
+			ring_buffer_write_entry(l, bio);
+	}
+
+	/* Only 2 conversions in the format, so only 2 arguments. */
+	DBG_DMINFO("%s %u end ", __func__, jiffies_to_msecs(jiffies));
+
+	/* Requeue what we couldn't process in front of any new input. */
+	if (!bio_list_empty(&ios_in)) {
+		spin_lock(&l->io.lock);
+		bio_list_merge_head(&l->io.in, &ios_in);
+		spin_unlock(&l->io.lock);
+	}
+}
+
+/*
+ * Drop a reference on a copy context.
+ *
+ * The last drop moves the context to the log's slink endio
+ * list and wakes the worker up to process it.
+ */
+static void
+slink_copy_context_put(struct slink_copy_context *cc)
+{
+	int slink_nr;
+	unsigned long flags;
+	struct repl_log *l;
+	struct dm_repl_slink *slink;
+
+	DBG_DMINFO("%s", __func__);
+
+	/* Still referenced: just sanity check the counter. */
+	if (!atomic_dec_and_test(&cc->cnt)) {
+		BUG_ON(atomic_read(&cc->cnt) < 0);
+		return;
+	}
+
+	l = ring_buffer_repl_log(cc->entry->ring);
+	slink = cc->slink;
+
+	/* last put, schedule completion */
+	DBG_DMINFO("last put, scheduling do_log");
+
+	_BUG_ON_PTR(l);
+	_BUG_ON_PTR(slink);
+	slink_nr = slink->ops->slink_number(slink);
+	_BUG_ON_SLINK_NR(l, slink_nr);
+
+	/* Hand the context over to the endio list. */
+	write_lock_irqsave(&l->lists.lock, flags);
+	BUG_ON(list_empty(&cc->list));
+	list_move_tail(&cc->list, L_SLINK_ENDIO_LIST(l));
+	write_unlock_irqrestore(&l->lists.lock, flags);
+
+	wake_do_log(l);
+}
+
+enum slink_endio_type { SLINK_ENDIO_RAM, SLINK_ENDIO_DISK };
+/*
+ * Common part of the RAM and disk copy callbacks.
+ *
+ * Stores the read/write error codes in the error slot belonging
+ * to the callback type and drops one reference on the copy context.
+ */
+static void
+_slink_copy_endio(enum slink_endio_type type, int read_err, int write_err,
+		  void *context)
+{
+	struct slink_copy_context *cc = context;
+	struct slink_copy_error *error;
+
+	DBG_DMINFO("%s", __func__);
+	_BUG_ON_PTR(cc);
+
+	/* Select the error slot by callback type. */
+	error = cc->error + (type == SLINK_ENDIO_RAM ? ERR_RAM : ERR_DISK);
+
+	/* On RAM error, no disk callback will be performed. */
+	if (type == SLINK_ENDIO_RAM && unlikely(read_err || write_err))
+		atomic_dec(&cc->cnt);
+
+	error->read = read_err;
+	error->write = write_err;
+	slink_copy_context_put(cc);
+}
+
+/*
+ * Callback for copy in RAM.
+ *
+ * Forwards its arguments to _slink_copy_endio() tagged SLINK_ENDIO_RAM;
+ * context is the slink copy context set up for the copy.
+ */
+static void
+slink_copy_ram_endio(int read_err, int write_err, void *context)
+{
+	_slink_copy_endio(SLINK_ENDIO_RAM, read_err, write_err, context);
+}
+
+/*
+ * Callback for copy on disk.
+ *
+ * Forwards its arguments to _slink_copy_endio() tagged SLINK_ENDIO_DISK;
+ * context is the slink copy context set up for the copy.
+ */
+static void
+slink_copy_disk_endio(int read_err, int write_err, void *context)
+{
+	_slink_copy_endio(SLINK_ENDIO_DISK, read_err, write_err, context);
+}
+
+/*
+ * Called back when site link recovered from failure or recovered a region.
+ *
+ * If we're being called on device recovery, read_err > 0.
+ *
+ * context is the slink state registered together with this callback.
+ * Without any error, the site link is flagged accessible again, else
+ * inaccessible. In both cases the caller gets notified and the worker
+ * is woken up.
+ */
+static void
+slink_recover_callback(int read_err, int write_err, void *context)
+{
+	unsigned slink_nr;
+	struct repl_log *l;
+	struct slink_state *ss = context;
+
+	_BUG_ON_PTR(ss);
+	l = ss->l;
+	_BUG_ON_PTR(l);
+	slink_nr = ss->slink_nr;
+	_BUG_ON_SLINK_NR(l, slink_nr);
+
+	/* slink_nr is unsigned, hence %u. */
+	DBG_DMINFO_LIMIT("%s slink=%u", __func__, slink_nr);
+
+	spin_lock(&l->io.lock);
+	if (!read_err && !write_err)
+		slink_clear_bit(slink_nr, LOG_SLINKS_INACCESSIBLE(l));
+	else
+		slink_set_bit(slink_nr, LOG_SLINKS_INACCESSIBLE(l));
+	spin_unlock(&l->io.lock);
+
+	/* Inform caller, that we're willing to receive more I/Os. */
+	notify_caller(l, WRITE, 0);
+
+	/* Wakeup worker to allow for further IO. */
+	wake_do_log(l);
+}
+
+/*
+ * Set up those members of a slink copy order, which
+ * are the same for every ring buffer entry.
+ */
+static inline void
+slink_copy_init(struct repl_log *l, struct dm_repl_slink_copy *slink_copy)
+{
+	/* RAM and disk completion callbacks. */
+	slink_copy->disk.fn = slink_copy_disk_endio;
+	slink_copy->ram.fn = slink_copy_ram_endio;
+
+	/* Destinations are addressed by slink and device number. */
+	slink_copy->dst.type = DM_REPL_SLINK_DEV_NUMBER;
+
+	/*
+	 * All copies are sourced from one and the same
+	 * block device: the ring buffer device.
+	 */
+	slink_copy->src.dev.bdev = repl_log_bdev(l);
+	slink_copy->src.type = DM_REPL_SLINK_BLOCK_DEVICE;
+}
+
+/*
+ * Set up those members of a slink copy order, which depend on the
+ * ring buffer entry: source sector, destination device/sector and size.
+ */
+static inline void
+slink_copy_addr(struct ring_buffer_entry *entry,
+		struct dm_repl_slink_copy *slink_copy)
+{
+	struct data_header_region *region;
+	struct data_header *header = entry->data.header;
+
+	_BUG_ON_PTR(header);
+	region = &header->region;
+	_BUG_ON_PTR(region);
+
+	/* Destination and size are common to all slinks. */
+	slink_copy->size = region->size;
+	slink_copy->dst.sector = region->sector;
+	slink_copy->dst.dev.number.dev = region->dev;
+
+	/* The entry's data position in the log is the copy source. */
+	slink_copy->src.sector = header->pos.data;
+}
+
+/*
+ * Allocate a slink_copy_context structure and initialize it
+ * for a copy of @entry across @slink.
+ */
+static inline struct slink_copy_context *
+slink_copy_context_alloc(struct ring_buffer_entry *entry,
+			 struct dm_repl_slink *slink)
+{
+	struct slink_copy_context *cc;
+
+	cc = alloc_copy_context(entry->ring);
+	BUG_ON(!cc);
+	memset(cc, 0, sizeof(*cc));
+
+	cc->start_jiffies = jiffies;
+	cc->slink = slink;
+	cc->entry = entry;
+
+	/* One reference per callback of a copy (RAM + disk). */
+	atomic_set(&cc->cnt, 2);
+	return cc;
+}
+
+/* Switch resynchronization on/off on all site links of the log. */
+enum resync_switch { RESYNC_OFF = 0, RESYNC_ON };
+static void
+resync_on_off(struct repl_log *l, enum resync_switch resync)
+{
+	unsigned long slink_nr;
+
+	for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+		struct dm_repl_slink *slink = slink_find(l, slink_nr);
+
+		/* Skip site links we can't find. */
+		if (IS_ERR(slink))
+			continue;
+
+		slink->ops->resync(slink, resync);
+	}
+}
+
+/*
+ * Work all site link copy orders.
+ *
+ * Walks the entries on the slink copy list and, for each site link
+ * still flagged in the entry, starts a copy from the log to that
+ * site link unless a copy to it is already active or the site link
+ * is flagged inaccessible.
+ *
+ * Resynchronization is allowed on all site links if the copy list
+ * is empty and prohibited otherwise.
+ */
+static void
+do_slink_ios(struct repl_log *l)
+{
+	unsigned slinks;
+	unsigned long slink_nr;
+	struct ring_buffer_entry *entry;
+	struct dm_repl_slink *slink;
+	/* NOTE(review): static, hence shared between all logs - confirm. */
+	static struct dm_repl_slink_copy slink_copy;
+
+	/* Check, if all slinks active. */
+	slinks = 0;
+	for_each_bit(slink_nr, LOG_SLINKS_IOS(l), l->slink.max)
+		slinks++;
+
+	/* Every site link has a copy in flight -> nothing to start. */
+	if (slinks >= l->slink.count)
+		return;
+
+	/* If there's no entries on the copy list, allow resync. */
+	if (list_empty(L_SLINK_COPY_LIST(l)))
+		return resync_on_off(l, RESYNC_ON);
+
+	/* ...else prohibit resync. */
+	resync_on_off(l, RESYNC_OFF);
+
+	/*
+	 * This list is ordered, how do we keep it so that endio processing
+	 * is ordered?  we need this so that head pointer advances in order.
+	 *
+	 * We do that by changing ring_buffer_advance_head() to check
+	 * for entry_busy(l, ENTRY_SLINKS(entry))) and stop processing. -HJM
+	 */
+
+	/* Initialize global properties, which are independent of the entry. */
+	slink_copy_init(l, &slink_copy);
+
+	/* Walk all entries on the slink copy list. */
+	slinks = 0;
+	list_for_each_entry(entry, L_SLINK_COPY_LIST(l),
+			    lists.l[E_WRITE_OR_COPY]) {
+		/* Number of copies started for this entry (statistics). */
+		unsigned copies = 0;
+
+		/* Set up the common parts, which are independent of slink. */
+		slink_copy_addr(entry, &slink_copy);
+
+		/* Walk all slinks, which still need this entry. */
+		for_each_bit(slink_nr, ENTRY_SLINKS(entry), l->slink.max) {
+			int r, teardown;
+			struct slink_copy_context *cc;
+			struct slink_state *ss;
+
+			/* If we worked on all slinks -> bail out. */
+			if (slinks >= l->slink.count)
+				return;
+
+			/*
+			 * One maximum write pending to slink already
+			 * -or-
+			 * slink is recovering this region.
+			 */
+			if (slink_test_bit(slink_nr, LOG_SLINKS_IOS(l)) ||
+			    slink_test_bit(slink_nr,
+					   LOG_SLINKS_INACCESSIBLE(l))) {
+				slinks++;
+				continue;
+			}
+
+			/* Check for deleted site link. */
+			slink = slink_find(l, slink_nr);
+			if (unlikely(IS_ERR(slink))) {
+				DMERR_LIMIT("%s no slink!", __func__);
+				ss = NULL;
+				teardown = 0;
+			} else {
+				ss = slink->caller;
+				_BUG_ON_PTR(ss);
+				teardown = SsTeardown(ss);
+			}
+
+			/*
+			 * Drop the copy to this site link if it can't be
+			 * found, is being torn down or is not flagged in
+			 * the log's site link bitmap any more.
+			 *
+			 * drop_copy is also jumped to from the copy
+			 * failure path below (-ENODEV/-EPERM).
+			 */
+			if (unlikely(IS_ERR(slink) ||
+				     teardown ||
+				     !slink_test_bit(slink_nr,
+						     LOG_SLINKS(l)))) {
+drop_copy:
+				slinks++;
+				if (IS_ERR(slink))
+					DMERR_LIMIT("%s: slink %lu not "
+						    "configured!",
+						    __func__, slink_nr);
+				else
+					/* Correct fallbehind account. */
+					slink_fallbehind_dec(slink, entry);
+
+				/* Flag entry copied to slink_nr. */
+				slink_clear_bit(slink_nr, ENTRY_SLINKS(entry));
+
+				/* Reset any sync copy request to slink_nr. */
+				slink_clear_bit(slink_nr, ENTRY_SYNC(entry));
+
+				/* slink0 services reads -> range not busy. */
+				if (!slink_nr)
+					sector_range_clear_busy(entry);
+
+				continue;
+			}
+
+			/* Take slink reference out. */
+			ss_io_get(__func__, ss);
+
+			/* Fill in the destination slink number. */
+			slink_copy.dst.dev.number.slink = slink_nr;
+
+			DBG_DMINFO("slink0->ops->copy() from log, sector=%llu, "
+				   "size=%u to dev_number=%d, sector=%llu "
+				   "on slink=%u",
+				   (unsigned long long) slink_copy.src.sector,
+				   slink_copy.size,
+				   slink_copy.dst.dev.number.dev,
+				   (unsigned long long) slink_copy.dst.sector,
+				   slink_copy.dst.dev.number.slink);
+
+			/* Flag active copy to slink+entry. */
+			slink_set_bit(slink_nr, LOG_SLINKS_IOS(l));
+			slink_set_bit(slink_nr, ENTRY_IOS(entry));
+
+			/* Setup the callback data. */
+			cc = slink_copy_context_alloc(entry, slink);
+			slink_copy.ram.context = slink_copy.disk.context = cc;
+
+			/*
+			 * Add to the entry's copy list of active copies in
+			 * order to avoid race with ->copy() endio function
+			 * accessing cc->list.
+			 */
+			write_lock_irq(&l->lists.lock);
+			list_add_tail(&cc->list, E_COPY_CONTEXT_LIST(entry));
+			write_unlock_irq(&l->lists.lock);
+
+			/*
+			 * slink->ops->copy() may return:
+			 *
+			 * o -EAGAIN in case of prohibiting I/O because
+			 *    of device inaccessibility/suspension
+			 *    or device I/O errors
+			 *    (i.e. link temporarily down) ->
+			 *    caller is allowed to retry the I/O later once
+			 *    he'll have received a callback.
+			 *
+			 * o -EACCES in case a region is being resynchronized
+			 *    and the source region is being read to copy data
+			 *    across to the same region of the replica (RD) ->
+			 *    caller is allowed to retry the I/O later once
+			 *    he'll have received a callback.
+			 *
+			 * o -ENODEV in case a device is not configured
+			 *    caller must drop the I/O to the device/slink pair.
+			 *
+			 * o -EPERM in case a region is out of sync ->
+			 *    caller must drop the I/O to the device/slink pair.
+			 */
+			r = slink->ops->copy(slink, &slink_copy, 0);
+			if (unlikely(r < 0)) {
+				/* Failed -> take off the entry's copies list. */
+				write_lock_irq(&l->lists.lock);
+				list_del_init(&cc->list);
+				write_unlock_irq(&l->lists.lock);
+
+				DMINFO_LIMIT("Copy to slink%d/dev%d/"
+					     "sector=%llu failed with %d.",
+					     slink_copy.dst.dev.number.slink,
+					     slink_copy.dst.dev.number.dev,
+					     (unsigned long long)
+					     slink_copy.dst.sector,
+					     r);
+
+				/* Reset active I/O on slink+entry. */
+				slink_clear_bit(slink_nr, LOG_SLINKS_IOS(l));
+				slink_clear_bit(slink_nr, ENTRY_IOS(entry));
+				free_copy_context(cc, entry->ring);
+
+				/* Release slink reference. */
+				ss_io_put(__func__, ss);
+
+				/*
+				 * Source region is being read for recovery
+				 * or device is temporarily inaccessible.
+				 */
+				if (r == -EACCES ||
+				    r == -EAGAIN) {
+					slink_set_bit(slink_nr,
+						LOG_SLINKS_INACCESSIBLE(l));
+
+				/*
+				 * Device not on slink
+				 * -or-
+				 * region not in sync -> avoid copy.
+				 */
+				} else if (r == -ENODEV ||
+					 r == -EPERM)
+					goto drop_copy;
+				else
+					/* Any other error code is unexpected. */
+					BUG();
+			} else
+				copies++;
+
+			slinks++;
+		}
+
+		/* Account single (index 0) vs. multiple (index 1) copies. */
+		if (copies)
+			l->stats.copy[copies > 1]++;
+	}
+}
+/* NOTE(review): no DO_ERR_LIMIT definition is visible here - confirm. */
+#undef DO_ERR_LIMIT
+
+/*
+ * Unplug the ring buffer device queue (if I/O got queued to it)
+ * and the device queues on all site links of the log.
+ */
+static void
+do_unplug(struct repl_log *l)
+{
+	unsigned long slink_nr;
+	struct ring_buffer *ring = &l->ring_buffer;
+
+	/* Conditionally unplug ring buffer. */
+	if (TestClearRingBufferIOQueued(ring))
+		blk_unplug(bdev_get_queue(ring_buffer_bdev(ring)));
+
+	/* Unplug any devices with queued IO on site links. */
+	for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+		struct dm_repl_slink *slink = slink_find(l, slink_nr);
+
+		if (IS_ERR(slink))
+			continue;
+
+		slink->ops->unplug(slink);
+	}
+}
+
+/*
+ * Take out/drop slink state references on all site links
+ * of the log to synchronize with slink deletion.
+ *
+ * No-op as long as the log has no slink0.
+ */
+enum reference_type { REF_GET, REF_PUT };
+static inline void
+ss_ref(enum reference_type type, struct repl_log *l)
+{
+	unsigned long slink_nr;
+
+	if (!l->slink0)
+		return;
+
+	for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+		struct dm_repl_slink *slink;
+
+		/* Site links are looked up through slink0's methods. */
+		slink = l->slink0->ops->slink(l->replog, slink_nr);
+		_BUG_ON_PTR(slink);
+
+		if (type == REF_GET)
+			ss_io_get(__func__, slink->caller);
+		else
+			ss_io_put(__func__, slink->caller);
+	}
+}
+
+/*
+ * Worker thread.
+ *
+ * Belabour any:
+ * o replicator log ring buffer initialization
+ * o endios on the ring buffer
+ * o endios on any site links
+ * o I/O on site links (copies of buffer entries via site links to [LR]Ds)
+ * o I/O to the ring buffer
+ *
+ * ws is the work struct embedded in the repl_log as io.ws.
+ *
+ * Endio processing is always done; initialization, new ring buffer
+ * and site link I/O and unplugging are skipped while the ring is
+ * suspended.
+ */
+static void
+do_log(struct work_struct *ws)
+{
+	struct repl_log *l = container_of(ws, struct repl_log, io.ws);
+	struct ring_buffer *ring = &l->ring_buffer;
+
+	/* Take out references vs. removal races. */
+	spin_lock(&l->io.lock);
+	ss_ref(REF_GET, l);
+	spin_unlock(&l->io.lock);
+
+	if (!RingSuspended(ring))
+		do_log_init(l);
+
+	/* Allow for endios at any time, even while suspended. */
+	do_ring_buffer_endios(l); /* Must be called before do_slink_ios */
+	do_slink_endios(l);
+
+	/* Don't allow for new I/Os while suspended. */
+	if (!RingSuspended(ring)) {
+		do_ring_buffer_ios(l);
+		do_slink_ios(l);
+		do_unplug(l);
+	}
+
+	/*
+	 * Drop the references taken above.
+	 * NOTE(review): dropped w/o l->io.lock, unlike the get - confirm.
+	 */
+	ss_ref(REF_PUT, l);
+}
+
+/*
+ * Start methods of "default" type
+ */
+/*
+ * Destroy a replicator log context.
+ *
+ * Takes the log off the global list of replogs (under list_mutex),
+ * destroys it and drops the reference on it via replog_put().
+ */
+static void
+ringbuffer_dtr(struct dm_repl_log *log, struct dm_target *ti)
+{
+	struct repl_log *l;
+
+	DMINFO("%s: log %p", __func__, log);
+	_SET_AND_BUG_ON_L(l, log);
+
+	/* Remove from the global list of replogs. */
+	mutex_lock(&list_mutex);
+	list_del_init(L_REPLOG_LIST(l));
+	mutex_unlock(&list_mutex);
+
+	replog_destroy(l);
+	/* A false/zero return from replog_put() is treated as fatal. */
+	BUG_ON(!replog_put(log, ti));
+}
+
+/*
+ * Construct a replicator log context.
+ *
+ * Arguments:
+ * 	#replog_params dev_path dev_start [auto/create/open [size]]
+ *
+ * dev_path = device path of replication log (REPLOG) backing store
+ * dev_start = offset in sectors to REPLOG header
+ *
+ * auto = causes open of an REPLOG with a valid header or
+ *        creation of a new REPLOG in case the header's invalid.
+ * <#replog_params> = 2 or (3 and 'open')
+ *      -> the cache device must be initialized or the constructor will fail.
+ * <#replog_params> = 4 and 'auto'
+ * 	-> if not already initialized, the log device will get initialized
+ * 	   and sized to "size", otherwise it'll be opened.
+ * <#replog_params> = 4 and 'create'
+ * 	-> the log device will get initialized if not active and sized to
+ *         "size"; if the REPLOG is active 'create' will fail.
+ *
+ * The above roughly translates to:
+ *  argv[0] == #params
+ *  argv[1] == dev_name
+ *  argv[2] == dev_start
+ *  argv[3] == OT_OPEN|OT_CREATE|OT_AUTO
+ *  argv[4] == size in sectors
+ *
+ * #params counts the parameters following argv[0], so at least
+ * #params + 1 arguments have to be handed in.
+ *
+ * Returns 0 on success, -EINVAL (via DM_EINVAL) on invalid
+ * arguments or the error of replog_get().
+ */
+#define	MIN_ARGS	3
+static int
+ringbuffer_ctr(struct dm_repl_log *log, struct dm_target *ti,
+	       unsigned argc, char **argv)
+{
+	int open_type, params;
+	unsigned long long tmp;
+	struct repl_log *l;
+	struct repl_params p;
+
+	SHOW_ARGV;
+
+	if (unlikely(argc < MIN_ARGS))
+		DM_EINVAL("%s: at least 3 args required, only got %d\n",
+			  __func__, argc);
+
+	memset(&p, 0, sizeof(p));
+
+	/* Get # of parameters. */
+	if (unlikely(sscanf(argv[0], "%d", &params) != 1 ||
+	    params < 2 ||
+	    params > 5))
+		DM_EINVAL("invalid replicator log parameter count");
+
+	/*
+	 * argv[3] and argv[4] get accessed below depending on params,
+	 * so make sure the announced parameters have been handed in.
+	 */
+	if (unlikely(argc < (unsigned) params + 1))
+		DM_EINVAL("%d replicator log parameters announced, "
+			  "only %u given.", params, argc - 1);
+
+	p.count = params;
+
+	if (params == 2)
+		open_type = OT_OPEN;
+	else {
+		open_type = _open_type(argv[3]);
+		if (unlikely(open_type < 0))
+			return -EINVAL;
+		else if (unlikely(open_type == OT_OPEN && params > 3))
+			DM_EINVAL("3 arguments required for open, %d given.",
+				  params);
+	}
+
+	p.open_type = open_type;
+
+	if (params > 3) {
+		/* Get device size argument. */
+		if (unlikely(sscanf(argv[4], "%llu", &tmp) != 1 ||
+		    tmp < LOG_SIZE_MIN)) {
+			DM_EINVAL("invalid replicator log device size");
+		} else
+			p.dev.size = tmp;
+
+	} else
+		p.dev.size = LOG_SIZE_MIN;
+
+	if (unlikely((open_type == OT_AUTO || open_type == OT_CREATE) &&
+		     params < 4))
+		DM_EINVAL("4 arguments required for auto and create");
+
+	/* Get device start argument. */
+	if (unlikely(sscanf(argv[2], "%llu", &tmp) != 1))
+		DM_EINVAL("invalid replicator log device start");
+	else
+		p.dev.start = tmp;
+
+	/* Get a reference on the replog. */
+	l = replog_get(log, ti, argv[1], &p);
+	if (unlikely(IS_ERR(l)))
+		return PTR_ERR(l);
+
+	DMINFO("%s: successfully opened/created log %p", __func__, log);
+	return 0;
+}
+
+/*
+ * Flush the current log contents: kick the worker and
+ * wait until the ring buffer is empty. May block.
+ *
+ * Always returns 0.
+ */
+static int
+ringbuffer_flush(struct dm_repl_log *log)
+{
+	struct repl_log *l;
+
+	DMINFO("%s", __func__);
+	_SET_AND_BUG_ON_L(l, log);
+
+	wake_do_log(l);
+	wait_event(l->ring_buffer.flushq, ring_buffer_empty(&l->ring_buffer));
+	return 0;
+}
+
+/*
+ * Suspend method.
+ *
+ * Flags the ring buffer suspended, flushes the worker's
+ * workqueue and waits for I/O on all site links.
+ *
+ * FIXME: we're suspending/resuming the whole ring buffer,
+ *	  not just the device requested. Avoiding this complete
+ *	  suspension would afford knowledge on the reason for the suspension.
+ *	  E.g. in case of device removal, we could avoid suspending completely.
+ *	  Don't know how we can optimize this w/o a bitmap
+ *	  for the devices, hence limiting dev_numbers. -HJM
+ */
+static int
+ringbuffer_postsuspend(struct dm_repl_log *log, int dev_number)
+{
+	struct repl_log *l;
+
+	_SET_AND_BUG_ON_L(l, log);
+	flush_workqueue(l->io.wq);
+
+	if (likely(!TestSetRingSuspended(&l->ring_buffer)))
+		DMINFO("%s suspended ring buffer", __func__);
+	else
+		DMWARN("%s ring buffer already suspended", __func__);
+
+	/* Flush again now that the suspended flag is set. */
+	flush_workqueue(l->io.wq);
+	ss_all_wait_on_ios(l);
+	return 0;
+}
+
+/*
+ * Resume method.
+ *
+ * Clears the suspended and blocked flags of the ring buffer,
+ * notifies the caller and wakes the worker up. Always returns 0.
+ */
+static int
+ringbuffer_resume(struct dm_repl_log *log, int dev_number)
+{
+	struct repl_log *l;
+
+	_SET_AND_BUG_ON_L(l, log);
+
+	if (unlikely(!TestClearRingSuspended(&l->ring_buffer)))
+		DMWARN("%s ring buffer already resumed", __func__);
+	else
+		DMINFO("%s resuming ring buffer", __func__);
+
+	ClearRingBlocked(&l->ring_buffer);
+	notify_caller(l, WRITE, 0);
+	wake_do_log(l);
+	return 0;
+}
+
+/*
+ * Queue a bio to the worker thread. For writes, space is
+ * reserved in the ring buffer first.
+ *
+ * Returns 0 on success or the negative result of the
+ * space reservation in case the ring buffer is full.
+ */
+static inline int
+queue_bio(struct repl_log *l, struct bio *bio)
+{
+	int r = 0, rw = bio_data_dir(bio);
+	struct ring_buffer *ring = &l->ring_buffer;
+
+	/*
+	 * Writes only: try reserving space for the bio in
+	 * the buffer and mark the sector range busy.
+	 */
+	if (rw == WRITE) {
+		mutex_lock(&ring->mutex);
+		r = ring_buffer_reserve_space(ring, bio);
+		mutex_unlock(&ring->mutex);
+	}
+
+	/* Ring buffer full. */
+	if (r < 0)
+		return r;
+
+	/* Hand the bio over to the worker. */
+	spin_lock(&l->io.lock);
+	bio_list_add(&l->io.in, bio);
+	spin_unlock(&l->io.lock);
+
+	/* REMOVEME: statistics. */
+	atomic_inc(l->stats.io + !!rw);
+	wake_do_log(l);
+	return 0;
+}
+
+/*
+ * I/O method of the replicator log.
+ *
+ * Reads get serviced either from the ring buffer or from the
+ * replicated device if there's no buffer entry; writes go to the
+ * ring buffer (incrementing the buffer tail), which includes buffer
+ * allocation and initiation of copies across one/multiple SLINK(s).
+ *
+ * In case of a read with (partial) writes in the buffer,
+ * the replog may postpone the read until the buffer content has
+ * been copied across the local SLINK *or* optimize by reading
+ * (parts of) the bio off the buffer.
+ *
+ * Returns 0 if the bio got queued to the worker, else -EWOULDBLOCK:
+ * the ring is blocked, the log isn't initialized yet, the ring is
+ * suspended or queue_bio() failed. The latter two flag the ring blocked.
+ */
+static int
+ringbuffer_io(struct dm_repl_log *log, struct bio *bio, unsigned long long tag)
+{
+	struct repl_log *l;
+	struct ring_buffer *ring;
+
+	_SET_AND_BUG_ON_L(l, log);
+	ring = &l->ring_buffer;
+
+	if (!RingBlocked(ring) && LogInitialized(l)) {
+		/*
+		 * Queue bios to the daemon in order to avoid sleeping
+		 * on allocations. queue_bio() checks to see if there is
+		 * enough space in the log for this bio and all of the
+		 * other bios currently queued for the daemon.
+		 */
+		if (likely(!RingSuspended(ring)) && !queue_bio(l, bio))
+			return 0;
+
+		/* Suspended or no space -> block the ring. */
+		SetRingBlocked(ring);
+	}
+
+	DBG_DMWARN("%s Ring blocked", __func__);
+	return -EWOULDBLOCK;
+}
+
+/*
+ * Set maximum slink # for bitarray access optimization.
+ *
+ * l->slink.max is set to the highest bit number set in the log's
+ * site link bitmap plus one (1 if no bit is set) and the bitmap
+ * element count and size are derived from it.
+ *
+ * The maximum is calculated in a local variable, so that a stale
+ * l->slink.max doesn't get incremented if no bit is set and that
+ * no intermediate values are stored in l->slink.max.
+ */
+static void replog_set_slink_max(struct repl_log *l)
+{
+	unsigned long bit_nr, max = 0;
+
+	for_each_bit(bit_nr, LOG_SLINKS(l), MAX_DEFAULT_SLINKS)
+		max = bit_nr;
+
+	l->slink.max = max + 1;
+	l->slink.bitmap_elems =
+		dm_div_up(dm_div_up(l->slink.max, BITS_PER_BYTE),
+			  sizeof(uint64_t));
+	l->slink.bitmap_size = l->slink.bitmap_elems * sizeof(uint64_t);
+}
+
+/*
+ * Register the replog global I/O notification function
+ * together with its context (both stored under l->io.lock).
+ */
+static void
+ringbuffer_io_notify_fn_set(struct dm_repl_log *log,
+			 dm_repl_notify_fn fn, void *notify_context)
+{
+	struct repl_log *l;
+
+	_SET_AND_BUG_ON_L(l, log);
+
+	spin_lock(&l->io.lock);
+	l->notify.context = notify_context;
+	l->notify.fn = fn;
+	spin_unlock(&l->io.lock);
+}
+
+/*
+ * Add (tie) a site link to a replication log for SLINK copy processing.
+ *
+ * Allocates a slink state for the site link, hooks it up as
+ * slink->caller, registers the recovery callback and flags the site
+ * link in the log's site link bitmap.
+ *
+ * Returns 0 on success, -EINVAL (via DM_EINVAL) on an invalid site
+ * link number, -ENOMEM on allocation failure or -EEXIST if the site
+ * link has a caller set already.
+ */
+static int
+ringbuffer_slink_add(struct dm_repl_log *log, struct dm_repl_slink *slink)
+{
+	int slink_nr;
+	struct repl_log *l;
+	struct slink_state *ss;
+
+	/* FIXME: XXX lock the repl_log */
+	DMINFO("ringbuffer_slink_add");
+	_BUG_ON_PTR(slink);
+	_SET_AND_BUG_ON_L(l, log);
+
+	/*
+	 * Validate the site link number, because it is used as a bit
+	 * index below: a negative number must never get that far.
+	 */
+	slink_nr = slink->ops->slink_number(slink);
+	if (slink_nr < 0)
+		DM_EINVAL("invalid negative slink number.");
+
+	if (slink_nr >= MAX_DEFAULT_SLINKS)
+		DM_EINVAL("slink number larger than maximum "
+			  "for 'default' replication log.");
+
+	DMINFO("%s: attempting to add slink%d", __func__, slink_nr);
+
+	/* Allocate outside the lock, because GFP_KERNEL may sleep. */
+	ss = kzalloc(sizeof(*ss), GFP_KERNEL);
+	if (unlikely(!ss))
+		return -ENOMEM;
+
+	ss->slink_nr = slink_nr;
+	ss->l = l;
+	atomic_set(&ss->io.in_flight, 0);
+	init_waitqueue_head(&ss->io.waiters);
+
+	/* See if slink was already added. */
+	spin_lock(&l->io.lock);
+	if (unlikely(slink->caller)) {
+		spin_unlock(&l->io.lock);
+		kfree(ss);
+		DMERR("slink already exists.");
+		return -EEXIST;
+	}
+
+	ClearSsTeardown(ss);
+
+	/* Keep slink state reference. */
+	slink->caller = ss;
+
+	if (!slink_nr)
+		l->slink0 = slink;
+
+	l->slink.count++;
+
+	/* Set site link recovery notification. */
+	slink->ops->recover_notify_fn_set(slink, slink_recover_callback, ss);
+
+	/* Update log_header->slinks bit mask before setting max slink #! */
+	slink_set_bit(slink_nr, LOG_SLINKS(l));
+
+	/* Set maximum slink # for bitarray access optimization. */
+	replog_set_slink_max(l);
+
+	spin_unlock(&l->io.lock);
+	return 0;
+}
+
+/*
+ * Remove (untie) a site link from a replication log.
+ *
+ * How do we tell if this is a configuration change or just a shutdown?
+ * After _repl_ctr, the RDs on the site link are either there or not.
+ *
+ * Returns 0 on success, -EPERM if the site link is being torn
+ * down already or -EINVAL if the site link has no slink state.
+ * The worker gets woken up on the 0 and -EINVAL paths.
+ */
+static int
+ringbuffer_slink_del(struct dm_repl_log *log, struct dm_repl_slink *slink)
+{
+	int r, slink_nr;
+	struct repl_log *l;
+	struct ring_buffer *ring;
+	struct slink_state *ss;
+
+	DMINFO("%s", __func__);
+	_BUG_ON_PTR(slink);
+	_SET_AND_BUG_ON_L(l, log);
+	/* NOTE(review): ring is assigned, but not used any further here. */
+	ring = &l->ring_buffer;
+
+	/* Find entry to be deleted. */
+	slink_nr = slink->ops->slink_number(slink);
+	DMINFO("%s slink_nr=%d", __func__, slink_nr);
+
+	spin_lock(&l->io.lock);
+	ss = slink->caller;
+	if (likely(ss)) {
+		BUG_ON(atomic_read(&ss->io.in_flight));
+
+		/* No new I/Os on this slink and no duplicate deletion calls. */
+		if (TestSetSsTeardown(ss)) {
+			spin_unlock(&l->io.lock);
+			return -EPERM;
+		}
+
+		/*
+		 * Wait on worker and any async I/O to finish on site link.
+		 *
+		 * The lock is dropped while waiting; the site link is
+		 * only untied if there's still no I/O on it after the
+		 * lock has been retaken, else we wait again.
+		 */
+		do {
+			spin_unlock(&l->io.lock);
+			ss_wait_on_io(ss);
+			spin_lock(&l->io.lock);
+
+			if (!ss_io(ss)) {
+				slink_clear_bit(slink_nr, LOG_SLINKS(l));
+				/* Terminates the loop. */
+				slink->caller = NULL;
+				slink->ops->recover_notify_fn_set(slink,
+								  NULL, NULL);
+				if (!slink_nr)
+					l->slink0 = NULL;
+
+				l->slink.count--;
+				/* Set l->slink.max. */
+				replog_set_slink_max(l);
+			}
+		} while (slink->caller);
+
+		spin_unlock(&l->io.lock);
+
+		BUG_ON(l->slink.count < 0);
+		kfree(ss);
+		DMINFO("%s removed slink=%u", __func__, slink_nr);
+		r = 0;
+	} else {
+		spin_unlock(&l->io.lock);
+		r = -EINVAL;
+	}
+
+	wake_do_log(l);
+	return r;
+}
+
+/*
+ * Return head of the list of site links for this replicator log,
+ * i.e. the address of the slinks list embedded in the log's lists.
+ */
+static struct dm_repl_log_slink_list
+*ringbuffer_slinks(struct dm_repl_log *log)
+{
+	struct repl_log *l;
+
+	_SET_AND_BUG_ON_L(l, log);
+	return &l->lists.slinks;
+}
+
+/*
+ * Return maximum number of supported site links.
+ *
+ * The value is the constant MAX_DEFAULT_SLINKS; log isn't accessed.
+ */
+static int
+ringbuffer_slink_max(struct dm_repl_log *log)
+{
+	return MAX_DEFAULT_SLINKS;
+}
+
+/*
+ * Message interface
+ *
+ * 'sta[tistics] {on,of[f],r[eset]}'		# e.g. 'stat of'
+ *
+ * Exactly 2 arguments are required. 'on'/'off' set/clear the
+ * development statistics flag of the log, 'reset' reinitializes
+ * the statistics.
+ *
+ * Returns 0 on success, -EINVAL (via DM_EINVAL) else.
+ */
+static int
+ringbuffer_message(struct dm_repl_log *log, unsigned argc, char **argv)
+{
+	static const char stat[] = "statistics";
+	struct repl_log *l;
+
+	_SET_AND_BUG_ON_L(l, log);
+	if (argc != 2)
+		DM_EINVAL("Invalid number of arguments.");
+
+	/* "statistics" is the only message we know about. */
+	if (strnicmp(STR_LEN(argv[0], stat)))
+		DM_EINVAL("Invalid argument.");
+
+	if (!strnicmp(STR_LEN(argv[1], "on")))
+		set_bit(LOG_DEVEL_STATS, &l->io.flags);
+	else if (!strnicmp(STR_LEN(argv[1], "off")))
+		clear_bit(LOG_DEVEL_STATS, &l->io.flags);
+	else if (!strnicmp(STR_LEN(argv[1], "reset")))
+		stats_init(l);
+	else
+		DM_EINVAL("Invalid '%s' arguments.", stat);
+
+	return 0;
+}
+
+/*
+ * Support function for replicator log status requests.
+ *
+ * STATUSTYPE_INFO emits per site link " <F|->,<ios>,<sectors>" of
+ * outstanding fallbehind I/O followed by " <sectors>/<log size>" and,
+ * with development statistics enabled, ring buffer and I/O counters.
+ *
+ * STATUSTYPE_TABLE emits the log type name, the parameter count,
+ * the log device and start and, depending on the parameter count,
+ * the open type and the log size.
+ *
+ * Output is emitted into result via DMEMIT. Always returns 0.
+ */
+static int
+ringbuffer_status(struct dm_repl_log *log, int dev_number,
+		  status_type_t type, char *result, unsigned int maxlen)
+{
+	unsigned long slink_nr;
+	size_t sz = 0;
+	sector_t ios, sectors;
+	/*
+	 * Scratch buffer for format_dev_t(); on the stack rather than
+	 * static, so that concurrent status calls don't share it.
+	 */
+	char buf[BDEVNAME_SIZE];
+	struct repl_log *l;
+	struct stats *s;
+	struct ring_buffer *ring;
+	struct repl_params *p;
+
+	_SET_AND_BUG_ON_L(l, log);
+	s = &l->stats;
+	ring = &l->ring_buffer;
+	p = &l->params;
+
+	switch (type) {
+	case STATUSTYPE_INFO:
+		ios = sectors = 0;
+
+		/* Output ios/sectors stats. */
+		spin_lock(&l->io.lock);
+		for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+			struct dm_repl_slink *slink = slink_find(l, slink_nr);
+			struct slink_state *ss;
+
+			_BUG_ON_PTR(slink);
+			ss = slink->caller;
+			_BUG_ON_PTR(ss);
+
+			DMEMIT(" %s,%llu,%llu",
+			       SsSync(ss) ? "F" : "-",
+			       (unsigned long long) ss->fb.outstanding.ios,
+			       (unsigned long long) ss->fb.outstanding.sectors);
+			ios += ss->fb.outstanding.ios;
+			sectors += ss->fb.outstanding.sectors;
+		}
+
+		/* Total of outstanding sectors vs. size of the log. */
+		DMEMIT(" %llu/%llu",
+		       (unsigned long long) sectors,
+		       (unsigned long long) l->params.dev.size);
+
+		spin_unlock(&l->io.lock);
+
+		if (LogDevelStats(l))
+			DMEMIT(" ring->start=%llu "
+			       "ring->head=%llu ring->tail=%llu "
+			       "ring->next_avail=%llu ring->end=%llu "
+			       "ring_free=%llu wrap=%d r=%d w=%d wp=%d he=%d "
+			       "hash_insert=%u hash_insert_max=%u "
+			       "single=%u multi=%u stall=%u",
+			       (unsigned long long) ring->start,
+			       (unsigned long long) ring->head,
+			       (unsigned long long) ring->tail,
+			       (unsigned long long) ring->next_avail,
+			       (unsigned long long) ring->end,
+			       (unsigned long long) ring_free(ring),
+			       s->wrap,
+			       atomic_read(s->io + 0), atomic_read(s->io + 1),
+			       atomic_read(&s->writes_pending),
+			       atomic_read(&s->hash_elem),
+			       s->hash_insert, s->hash_insert_max,
+			       s->copy[0], s->copy[1],
+			       s->stall);
+
+		break;
+
+	case STATUSTYPE_TABLE:
+		DMEMIT("%s %d %s %llu", ringbuffer_type.type.name, p->count,
+		       format_dev_t(buf, p->dev.dm_dev->bdev->bd_dev),
+		       (unsigned long long) p->dev.start);
+
+		/* Optional parameters, as counted by p->count. */
+		if (p->count > 2) {
+			DMEMIT(" %s", _open_str(p->open_type));
+
+			if (p->count > 3)
+				DMEMIT(" %llu",
+				       (unsigned long long) p->dev.size);
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * End methods of "ringbuffer" type
+ */
+
+/*
+ * "ringbuffer" replication log type.
+ *
+ * Method table, which gets registered with and
+ * unregistered from the registry as a DM_REPLOG type.
+ */
+static struct dm_repl_log_type ringbuffer_type = {
+	.type.name = "ringbuffer",
+	.type.module = THIS_MODULE,
+
+	/* Construct/destruct. */
+	.ctr = ringbuffer_ctr,
+	.dtr = ringbuffer_dtr,
+
+	/* Suspend/resume, flush and I/O. */
+	.postsuspend = ringbuffer_postsuspend,
+	.resume = ringbuffer_resume,
+	.flush = ringbuffer_flush,
+	.io = ringbuffer_io,
+	.io_notify_fn_set = ringbuffer_io_notify_fn_set,
+
+	/* Site link handling. */
+	.slink_add = ringbuffer_slink_add,
+	.slink_del = ringbuffer_slink_del,
+	.slinks = ringbuffer_slinks,
+	.slink_max = ringbuffer_slink_max,
+
+	/* Message and status interface. */
+	.message = ringbuffer_message,
+	.status = ringbuffer_status,
+};
+
+/*
+ * Destroy all existing kmem caches, walking the cache
+ * definitions from last to first. Always returns 0.
+ */
+static int
+replog_kmem_caches_exit(void)
+{
+	struct cache_defs *pd;
+
+	for (pd = ARRAY_END(cache_defs); pd-- > cache_defs; ) {
+		/* Only caches which got created need destroying. */
+		if (likely(pd->slab_pool)) {
+			DMINFO("Destroying kmem_cache %p", pd->slab_pool);
+			kmem_cache_destroy(pd->slab_pool);
+			pd->slab_pool = NULL;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * Create the kmem caches for all cache definitions with a size.
+ *
+ * Returns 0 on success. On failure, all caches created
+ * so far are destroyed again and -ENOMEM is returned.
+ */
+static int
+replog_kmem_caches_init(void)
+{
+	struct cache_defs *pd;
+
+	for (pd = ARRAY_END(cache_defs); pd-- > cache_defs; ) {
+		BUG_ON(pd->slab_pool);
+
+		/* No slab pool. */
+		if (!pd->size)
+			continue;
+
+		pd->slab_pool = kmem_cache_create(pd->slab_name, pd->size,
+						  pd->align, 0, NULL);
+		if (unlikely(!pd->slab_pool)) {
+			DMERR("failed to create slab %s for replication log "
+			      " handler %s %s",
+			      pd->slab_name, ringbuffer_type.type.name,
+			      version);
+			replog_kmem_caches_exit();
+			return -ENOMEM;
+		}
+
+		DMINFO("Created kmem_cache %p", pd->slab_pool);
+	}
+
+	return 0;
+}
+
+/*
+ * Module init: check the on-disk data header size, create the
+ * kmem caches and register the "ringbuffer" replication log type.
+ *
+ * Returns the result of the registration or a negative error.
+ */
+int __init
+dm_repl_log_init(void)
+{
+	int r;
+
+	if (sizeof(struct data_header_disk) != DATA_HEADER_DISK_SIZE)
+		DM_EINVAL("invalid size of 'struct data_header_disk' for %s %s",
+			  ringbuffer_type.type.name, version);
+
+	mutex_init(&list_mutex);
+
+	r = replog_kmem_caches_init();
+	if (r < 0) {
+		DMERR("failed to init %s %s kmem caches",
+		      ringbuffer_type.type.name, version);
+		return r;
+	}
+
+	r = dm_register_type(&ringbuffer_type, DM_REPLOG);
+	if (r >= 0) {
+		DMINFO("registered replication log handler %s %s",
+		       ringbuffer_type.type.name, version);
+		return r;
+	}
+
+	/* Registration failed -> get rid of the caches again. */
+	DMERR("failed to register replication log handler %s %s [%d]",
+	      ringbuffer_type.type.name, version, r);
+	replog_kmem_caches_exit();
+	return r;
+}
+
+/*
+ * Module exit: unregister the "ringbuffer" replication
+ * log type and destroy the kmem caches.
+ */
+void __exit
+dm_repl_log_exit(void)
+{
+	int r = dm_unregister_type(&ringbuffer_type, DM_REPLOG);
+
+	replog_kmem_caches_exit();
+
+	if (!r)
+		DMINFO("unregistered replication log handler %s %s",
+		       ringbuffer_type.type.name, version);
+	else
+		DMERR("failed to unregister replication log handler %s %s [%d]",
+		       ringbuffer_type.type.name, version, r);
+}
+
+/* Module hooks: entry/exit points and module information. */
+module_init(dm_repl_log_init);
+module_exit(dm_repl_log_exit);
+
+MODULE_DESCRIPTION(DM_NAME " remote replication target \"ringbuffer\" "
+			   "log handler");
+/* Each author's address is enclosed in a matching pair of <>. */
+MODULE_AUTHOR("Jeff Moyer <jmoyer redhat com>, "
+	      "Heinz Mauelshagen <heinzm redhat com>");
+MODULE_LICENSE("GPL");
diff --git a/drivers/md/dm-repl-log.h b/drivers/md/dm-repl-log.h
new file mode 100644
index 0000000..bb14095
--- /dev/null
+++ b/drivers/md/dm-repl-log.h
@@ -0,0 +1,120 @@
+/*
+ * Copyright (C) 2008,2009  Red Hat, Inc. All rights reserved.
+ *
+ * Module Author: Heinz Mauelshagen (Mauelshagen RedHat com)
+ *
+ * This file is released under the GPL.
+ */
+
+/*
+ * API calling convention to create a replication mapping:
+ *
+ * 1. get a replicator log handle, hence creating a new persistent
+ *    log or accessing an existing one
+ * 2. get an slink handle, hence creating a new transient
+ *    slink or accessing an existing one
+ * 2(cont). repeat the previous step for multiple slinks (eg. one for
+ *    local and one for remote device access)
+ * 3. bind a (remote) device to a particular slink created in a previous step
+ * 3(cont). repeat the device binding for any additional devices on that slink
+ * 4. bind the created slink which has device(s) bound to it to the replog
+ * 4(cont). repeat the slink binding to the replog for all created slinks
+ * 5. call the replog io function for each IO.
+ *
+ * Reverse steps 1-4 to tear a replication mapping down, hence freeing all
+ * transient resources allocated to it.
+ */
+
+#ifndef _DM_REPL_LOG_H
+#define _DM_REPL_LOG_H
+
+#include "dm-repl.h"
+#include "dm-registry.h"
+#include "dm-repl-slink.h"
+
+/*
+ * Handle to access a replicator log.
+ *
+ * 'ops' points to the struct dm_repl_log_type holding the log methods;
+ * 'context' is an opaque pointer (presumably the log handler's private
+ * per-log state - TODO confirm against the handler implementation).
+ */
+struct dm_repl_log {
+	struct dm_repl_log_type *ops;
+	void *context;
+};
+
+/*
+ * List of site links hanging off of each replicator log.
+ *
+ * 'lock' is an rwlock_t stored next to the list (presumably guarding
+ * it - TODO confirm against the users of this structure).
+ */
+struct dm_repl_log_slink_list {
+	rwlock_t lock;
+	struct list_head list; /* List of site links hanging off of this log. */
+	void *context; /* Caller context. */
+};
+
+struct dm_repl_log_type { /* Methods of a replication log handler. */
+	struct dm_registry_type type;
+
+	/* Construct/destruct a replicator log. */
+	int (*ctr)(struct dm_repl_log *, struct dm_target *,
+		   unsigned argc, char **argv);
+	void (*dtr)(struct dm_repl_log *, struct dm_target *);
+
+	/*
+	 * There are times when we want the log to be quiet.
+	 * Ie. no entries of the log will be copied across site links.
+	 */
+	int (*postsuspend)(struct dm_repl_log *log, int dev_number);
+	int (*resume)(struct dm_repl_log *log, int dev_number);
+
+	/* Flush the current log contents. This function may block. */
+	int (*flush)(struct dm_repl_log *log);
+
+	/*
+	 * Read a bio either from a replicator log's backing store
+	 * (if supported) or from the replicated device if no buffer entry.
+	 * - or -
+	 * write a bio to a replicator log's backing store buffer.
+	 *
+	 * This includes buffer allocation in case of a write and
+	 * initiation of copies across one/multiple site link(s).
+	 *
+	 * In case of a read with (partial) writes in the buffer,
+	 * the replog may postpone the read until the buffer content has
+	 * been copied across the local site link *or* optimize by reading
+	 * (parts of) the bio off the buffer.
+	 *
+	 * Tag is a unique tag identifying a data set.
+	 */
+	int (*io)(struct dm_repl_log *, struct bio *, unsigned long long tag);
+
+	/* Endio function to call from dm_repl core on bio endio processing. */
+	int (*endio) (struct dm_repl_log *, struct bio *bio, int error,
+		      union map_info *map_context);
+
+	/* Set global I/O completion notification function and context. */
+	void (*io_notify_fn_set)(struct dm_repl_log *,
+				 dm_repl_notify_fn, void *context);
+
+	/*
+	 * Add (tie) a site link to a replication
+	 * log for site link copy processing.
+	 */
+	int (*slink_add)(struct dm_repl_log *, struct dm_repl_slink *);
+
+	/* Remove (untie) a site link from a replication log. */
+	int (*slink_del)(struct dm_repl_log *, struct dm_repl_slink *);
+
+	/*
+	 * Return list of site links added to a replication log.
+	 *
+	 * This method eases slink handler coding to
+	 * keep such replication log site link list.
+	 */
+	struct dm_repl_log_slink_list *(*slinks)(struct dm_repl_log *);
+
+	/* Return maximum number of supported site links. */
+	int (*slink_max)(struct dm_repl_log *);
+
+	/* REPLOG messages. */
+	int (*message)(struct dm_repl_log *, unsigned argc, char **argv);
+
+	/* Support function for replicator log status requests. */
+	int (*status)(struct dm_repl_log *, int dev_number, status_type_t,
+		      char *result, unsigned maxlen);
+};
+
+#endif /* #ifndef _DM_REPL_LOG_H */
-- 
1.6.2.5


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]