From: Akira Hayakawa <ruby.wktk@gmail.com>
To: snitzer@redhat.com
Cc: gregkh@linuxfoundation.org, devel@driverdev.osuosl.org,
linux-kernel@vger.kernel.org, dm-devel@redhat.com,
cesarb@cesarb.net, joe@perches.com, akpm@linux-foundation.org,
agk@redhat.com, m.chehab@samsung.com, ejt@redhat.com,
dan.carpenter@oracle.com, ruby.wktk@gmail.com
Subject: Re: Reworking dm-writeboost [was: Re: staging: Add dm-writeboost]
Date: Thu, 26 Sep 2013 10:47:23 +0900
Message-ID: <5243922B.6070705@gmail.com>
In-Reply-To: <20130925173757.GA24076@redhat.com>
Hi, Mike
The monolithic source code (3.2k)
is now split into almost 20 *.c files
according to functionality and
data structures, in OOP style.
The aim of this posting
is to share what the split looks like.
I believe that
reading at least the *.h files
will convince you that the split is clear.
The code is currently cluttered with
almost 20 version-switch macros
and WB* debug macros,
but I will clean them up
before sending the patch.
Again,
the latest code can be cloned with
git clone https://github.com/akiradeveloper/dm-writeboost.git
I will make a few updates to the source code this weekend,
so please track the repository to follow the latest version.
Below is only a snapshot.
Akira
---------- Summary ----------
33 Makefile
10 bigarray.h
19 cache-alloc.h
10 defer-barrier.h
8 dirty-sync.h
8 flush-daemon.h
10 format-cache.h
24 handle-io.h
16 hashtable.h
18 migrate-daemon.h
7 migrate-modulator.h
12 queue-flush-job.h
8 rambuf.h
13 recover.h
18 segment.h
8 superblock-recorder.h
9 target.h
30 util.h
384 writeboost.h
99 bigarray.c
192 cache-alloc.c
36 defer-barrier.c
33 dirty-sync.c
85 flush-daemon.c
234 format-cache.c
553 handle-io.c
109 hashtable.c
345 migrate-daemon.c
41 migrate-modulator.c
169 queue-flush-job.c
52 rambuf.c
308 recover.c
118 segment.c
61 superblock-recorder.c
376 target.c
126 util.c
---------- Makefile ----------
KERNEL_TREE := /lib/modules/$(shell uname -r)/build
# KERNEL_TREE := $(HOME)/linux-$(KERN_VERSION)
PWD := $(shell pwd)
# EXTRA_CFLAGS += -O0 -DCONFIG_DM_DEBUG -fno-inline #-Wall
# EXTRA_CFLAGS += -O2 -UCONFIG_DM_DEBUG
obj-m := dm-writeboost.o
dm-writeboost-objs := \
target.o \
handle-io.o \
queue-flush-job.o \
flush-daemon.o \
migrate-daemon.o \
migrate-modulator.o \
defer-barrier.o \
superblock-recorder.o \
dirty-sync.o \
bigarray.o \
segment.o \
hashtable.o \
cache-alloc.o \
format-cache.o \
recover.o \
rambuf.o \
util.o
all:
	$(MAKE) -C $(KERNEL_TREE) M=$(PWD) modules
clean:
	$(MAKE) -C $(KERNEL_TREE) M=$(PWD) clean
---------- bigarray.h ----------
#ifndef WRITEBOOST_BIGARRAY_H
#define WRITEBOOST_BIGARRAY_H
#include "writeboost.h"
struct bigarray;
struct bigarray *make_bigarray(size_t elemsize, size_t nr_elems);
void kill_bigarray(struct bigarray *);
void *bigarray_at(struct bigarray *, size_t i);
#endif
---------- cache-alloc.h ----------
#ifndef WRITEBOOST_CACHE_ALLOC_H
#define WRITEBOOST_CACHE_ALLOC_H
#include "writeboost.h"
#include "segment.h"
#include "flush-daemon.h"
#include "migrate-daemon.h"
#include "migrate-modulator.h"
#include "rambuf.h"
#include "hashtable.h"
#include "superblock-recorder.h"
#include "dirty-sync.h"
#include "recover.h"
#include "defer-barrier.h"
#include "handle-io.h"
int __must_check resume_cache(struct wb_cache *, struct dm_dev *);
void free_cache(struct wb_cache *);
#endif
---------- defer-barrier.h ----------
#ifndef WRITEBOOST_DEFER_BARRIER_H
#define WRITEBOOST_DEFER_BARRIER_H
#include "writeboost.h"
#include "queue-flush-job.h"
void queue_barrier_io(struct wb_cache *, struct bio *);
void flush_barrier_ios(struct work_struct *);
void barrier_deadline_proc(unsigned long data);
#endif
---------- dirty-sync.h ----------
#ifndef WRITEBOOST_DIRTY_SYNC_H
#define WRITEBOOST_DIRTY_SYNC_H
#include "writeboost.h"
#include "queue-flush-job.h"
void sync_proc(struct work_struct *);
#endif
---------- flush-daemon.h ----------
#ifndef WRITEBOOST_FLUSH_DAEMON_H
#define WRITEBOOST_FLUSH_DAEMON_H
#include "writeboost.h"
#include "util.h"
void flush_proc(struct work_struct *);
#endif
---------- format-cache.h ----------
#ifndef WRITEBOOST_FORMAT_CACHE_H
#define WRITEBOOST_FORMAT_CACHE_H
#include "writeboost.h"
#include "util.h"
#include "segment.h"
int __must_check audit_cache_device(struct dm_dev *, bool *cache_valid);
int __must_check format_cache_device(struct dm_dev *);
#endif
---------- handle-io.h ----------
#ifndef WRITEBOOST_HANDLE_IO_H
#define WRITEBOOST_HANDLE_IO_H
#include "writeboost.h"
#include "bigarray.h"
#include "util.h"
#include "defer-barrier.h"
#include "hashtable.h"
#include "segment.h"
#include "queue-flush-job.h"
int writeboost_map(struct dm_target *, struct bio *
#if LINUX_VERSION_CODE < PER_BIO_VERSION
, union map_info *
#endif
);
int writeboost_end_io(struct dm_target *, struct bio *, int error
#if LINUX_VERSION_CODE < PER_BIO_VERSION
, union map_info *
#endif
);
void inc_nr_dirty_caches(struct wb_device *);
void clear_stat(struct wb_cache *);
#endif
---------- hashtable.h ----------
#ifndef WRITEBOOST_HASHTABLE_H
#define WRITEBOOST_HASHTABLE_H
#include "writeboost.h"
#include "segment.h"
int __must_check ht_empty_init(struct wb_cache *);
cache_nr ht_hash(struct wb_cache *, struct lookup_key *);
struct metablock *ht_lookup(struct wb_cache *,
struct ht_head *, struct lookup_key *);
void ht_register(struct wb_cache *, struct ht_head *,
struct lookup_key *, struct metablock *);
void ht_del(struct wb_cache *, struct metablock *);
void discard_caches_inseg(struct wb_cache *,
struct segment_header *);
#endif
---------- migrate-daemon.h ----------
#ifndef WRITEBOOST_MIGRATE_DAEMON_H
#define WRITEBOOST_MIGRATE_DAEMON_H
#include "writeboost.h"
#include "util.h"
#include "segment.h"
u8 atomic_read_mb_dirtiness(struct segment_header *,
struct metablock *);
void cleanup_mb_if_dirty(struct wb_cache *,
struct segment_header *,
struct metablock *);
void migrate_proc(struct work_struct *);
void wait_for_migration(struct wb_cache *, size_t id);
#endif
---------- migrate-modulator.h ----------
#ifndef WRITEBOOST_MIGRATE_MODULATOR_H
#define WRITEBOOST_MIGRATE_MODULATOR_H
#include "writeboost.h"
void modulator_proc(struct work_struct *);
#endif
---------- queue-flush-job.h ----------
#ifndef WRITEBOOST_QUEUE_FLUSH_JOB
#define WRITEBOOST_QUEUE_FLUSH_JOB
#include "writeboost.h"
#include "segment.h"
#include "hashtable.h"
#include "util.h"
#include "migrate-daemon.h"
void queue_current_buffer(struct wb_cache *);
void flush_current_buffer(struct wb_cache *);
#endif
---------- rambuf.h ----------
#ifndef WRITEBOOST_RAMBUF_H
#define WRITEBOOST_RAMBUF_H
#include "writeboost.h"
int __must_check init_rambuf_pool(struct wb_cache *);
void free_rambuf_pool(struct wb_cache *);
#endif
---------- recover.h ----------
#ifndef WRITEBOOST_RECOVER_H
#define WRITEBOOST_RECOVER_H
#include "writeboost.h"
#include "util.h"
#include "segment.h"
#include "bigarray.h"
#include "hashtable.h"
#include "migrate-daemon.h"
#include "handle-io.h"
int __must_check recover_cache(struct wb_cache *);
#endif
---------- segment.h ----------
#ifndef WRITEBOOST_SEGMENT_H
#define WRITEBOOST_SEGMENT_H
#include "writeboost.h"
#include "segment.h"
#include "bigarray.h"
#include "util.h"
int __must_check init_segment_header_array(struct wb_cache *);
u64 calc_nr_segments(struct dm_dev *);
struct segment_header *get_segment_header_by_id(struct wb_cache *,
size_t segment_id);
sector_t calc_segment_header_start(size_t segment_idx);
sector_t calc_mb_start_sector(struct segment_header *, cache_nr mb_idx);
u32 calc_segment_lap(struct wb_cache *, size_t segment_id);
struct metablock *mb_at(struct wb_cache *, cache_nr idx);
bool is_on_buffer(struct wb_cache *, cache_nr mb_idx);
#endif
---------- superblock-recorder.h ----------
#ifndef WRITEBOOST_SUPERBLOCK_RECORDER_H
#define WRITEBOOST_SUPERBLOCK_RECORDER_H
#include "writeboost.h"
#include "util.h"
void recorder_proc(struct work_struct *);
#endif
---------- target.h ----------
#ifndef WRITEBOOST_TARGET_H
#define WRITEBOOST_TARGET_H
#include "writeboost.h"
#include "format-cache.h"
#include "cache-alloc.h"
#include "handle-io.h"
#include "util.h"
#endif
---------- util.h ----------
#ifndef WRITEBOOST_UTIL_H
#define WRITEBOOST_UTIL_H
#include "writeboost.h"
extern struct workqueue_struct *safe_io_wq;
extern struct dm_io_client *wb_io_client;
void *do_kmalloc_retry(size_t size, gfp_t flags, int lineno);
#define kmalloc_retry(size, flags) \
do_kmalloc_retry((size), (flags), __LINE__)
int dm_safe_io_internal(
struct dm_io_request *,
unsigned num_regions, struct dm_io_region *,
unsigned long *err_bits, bool thread, int lineno);
#define dm_safe_io(io_req, num_regions, regions, err_bits, thread) \
dm_safe_io_internal((io_req), (num_regions), (regions), \
(err_bits), (thread), __LINE__)
void dm_safe_io_retry_internal(
struct dm_io_request *,
unsigned num_regions, struct dm_io_region *,
bool thread, int lineno);
#define dm_safe_io_retry(io_req, num_regions, regions, thread) \
dm_safe_io_retry_internal((io_req), (num_regions), (regions), \
(thread), __LINE__)
sector_t dm_devsize(struct dm_dev *);
#endif
---------- writeboost.h ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#ifndef DM_WRITEBOOST_H
#define DM_WRITEBOOST_H
#define DM_MSG_PREFIX "writeboost"
#include <linux/module.h>
#include <linux/version.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/timer.h>
#include <linux/device-mapper.h>
#include <linux/dm-io.h>
#define WBERR(f, args...) \
DMERR("err@%d " f, __LINE__, ## args)
#define WBWARN(f, args...) \
DMWARN("warn@%d " f, __LINE__, ## args)
#define WBINFO(f, args...) \
DMINFO("info@%d " f, __LINE__, ## args)
/*
* The segment size is (1 << x) sectors,
* where 4 <= x <= 11.
* dm-writeboost supports segment sizes up to 1MB.
*
* All the comments assume
* the maximum segment size of 1MB.
*/
#define WB_SEGMENTSIZE_ORDER 11
/*
* By default,
* we allocate 64 * 1MB RAM buffers statically.
*/
#define NR_RAMBUF_POOL 64
/*
* The first 4KB (1<<3 sectors) in segment
* is for metadata.
*/
#define NR_CACHES_INSEG ((1 << (WB_SEGMENTSIZE_ORDER - 3)) - 1)
/*
* The Detail of the Disk Format
*
* Whole:
* Superblock(1MB) Segment(1MB) Segment(1MB) ...
* We reserve the first segment (1MB) as the superblock.
*
* Superblock(1MB):
* head <---- ----> tail
* superblock header(512B) ... superblock record(512B)
*
* Segment(1MB):
* segment_header_device(4KB) metablock_device(4KB) * NR_CACHES_INSEG
*/
/*
* Superblock Header
* The first sector of the superblock region.
* The values are fixed once formatted.
*/
/*
* Magic Number
* "WBst"
*/
#define WRITEBOOST_MAGIC 0x57427374
struct superblock_header_device {
__le32 magic;
} __packed;
/*
* Superblock Record (Mutable)
* The last sector of the superblock region.
* Records the current cache status as needed.
*/
struct superblock_record_device {
__le64 last_migrated_segment_id;
} __packed;
/*
* Cache line index.
*
* dm-writeboost can support a cache device
* with size less than 4KB * (1 << 32)
* that is 16TB.
*/
typedef u32 cache_nr;
/*
* Metadata of a 4KB cache line
*
* Dirtiness is defined for each sector
* in this cache line.
*/
struct metablock {
sector_t sector; /* key */
cache_nr idx; /* Const */
struct hlist_node ht_list;
/*
* 8 bit flag for dirtiness
* for each sector in cache line.
*
* The current implementation
* only recovers dirty caches.
* Recovering clean caches would complicate the code
* with little benefit,
* since only a few of the caches are clean.
*/
u8 dirty_bits;
};
/*
* On-disk metablock
*/
struct metablock_device {
__le64 sector;
u8 dirty_bits;
__le32 lap;
} __packed;
#define SZ_MAX (~(size_t)0)
struct segment_header {
struct metablock mb_array[NR_CACHES_INSEG];
/*
* ID uniformly increases.
* ID 0 is used to tell that the segment is invalid
* and valid id >= 1.
*/
u64 global_id;
/*
* A segment can be flushed half-done.
* length is the number of
* metablocks that must be counted
* when resuming.
*/
u8 length;
cache_nr start_idx; /* Const */
sector_t start_sector; /* Const */
struct list_head migrate_list;
/*
* This segment cannot be migrated
* to the backing store
* until it is flushed.
* A flushed segment is on the cache device.
*/
struct completion flush_done;
/*
* This segment can not be overwritten
* until migrated.
*/
struct completion migrate_done;
spinlock_t lock;
atomic_t nr_inflight_ios;
};
/*
* (Locking)
* Locking metablocks at their own granularity
* needs too much memory for the lock structures.
* We lock a metablock only by locking the parent segment
* that includes the metablock.
*/
#define lockseg(seg, flags) spin_lock_irqsave(&(seg)->lock, flags)
#define unlockseg(seg, flags) spin_unlock_irqrestore(&(seg)->lock, flags)
/*
* On-disk segment header.
*
* Must be at most 4KB large.
*/
struct segment_header_device {
/* - FROM - At most 512 bytes for atomicity. --- */
__le64 global_id;
/*
* How many cache lines in this segment
* should be counted when resuming.
*/
u8 length;
/*
* The lap of the rotation over the cache device.
* Used to find the head and tail of the
* segments on the cache device.
*/
__le32 lap;
/* - TO -------------------------------------- */
/* This array must be located at the tail */
struct metablock_device mbarr[NR_CACHES_INSEG];
} __packed;
struct rambuffer {
void *data;
struct completion done;
};
enum STATFLAG {
STAT_WRITE = 0,
STAT_HIT,
STAT_ON_BUFFER,
STAT_FULLSIZE,
};
#define STATLEN (1 << 4)
struct lookup_key {
sector_t sector;
};
struct ht_head {
struct hlist_head ht_list;
};
struct wb_device;
struct wb_cache {
struct wb_device *wb;
struct dm_dev *device;
struct mutex io_lock;
cache_nr nr_caches; /* Const */
u64 nr_segments; /* Const */
struct bigarray *segment_header_array;
/*
* Chained hashtable
*
* Writeboost uses a chained hashtable
* for cache lookup.
* Cache discarding happens often,
* and this structure fits our needs.
*/
struct bigarray *htable;
size_t htsize;
struct ht_head *null_head;
cache_nr cursor; /* Index that was written most recently */
struct segment_header *current_seg;
struct rambuffer *current_rambuf;
struct rambuffer *rambuf_pool;
u64 last_migrated_segment_id;
u64 last_flushed_segment_id;
u64 reserving_segment_id;
/*
* Flush daemon
*
* Writeboost first queues the segment to flush,
* and the flush daemon asynchronously
* flushes it to the cache device.
*/
struct work_struct flush_work;
struct workqueue_struct *flush_wq;
spinlock_t flush_queue_lock;
struct list_head flush_queue;
wait_queue_head_t flush_wait_queue;
/*
* Deferred ACK for barriers.
*/
struct work_struct barrier_deadline_work;
struct timer_list barrier_deadline_timer;
struct bio_list barrier_ios;
unsigned long barrier_deadline_ms; /* param */
/*
* Migration daemon
*
* Migration also works in the background.
*
* If allow_migrate is true,
* the migrate daemon starts migrating
* if there are segments to migrate.
*/
struct work_struct migrate_work;
struct workqueue_struct *migrate_wq;
bool allow_migrate; /* param */
/*
* Batched Migration
*
* Migration is done atomically
* with a number of segments batched.
*/
wait_queue_head_t migrate_wait_queue;
atomic_t migrate_fail_count;
atomic_t migrate_io_count;
struct list_head migrate_list;
u8 *dirtiness_snapshot;
void *migrate_buffer;
size_t nr_cur_batched_migration;
size_t nr_max_batched_migration; /* param */
/*
* Migration modulator
*
* This daemon turns on and off
* the migration
* according to the load of backing store.
*/
struct work_struct modulator_work;
bool enable_migration_modulator; /* param */
/*
* Superblock Recorder
*
* Update the superblock record
* periodically.
*/
struct work_struct recorder_work;
unsigned long update_record_interval; /* param */
/*
* Cache Synchronizer
*
* Sync the dirty writes
* periodically.
*/
struct work_struct sync_work;
unsigned long sync_interval; /* param */
/*
* on_terminate is true
* to notify all the background daemons to
* stop their operations.
*/
bool on_terminate;
atomic64_t stat[STATLEN];
};
struct wb_device {
struct dm_target *ti;
struct dm_dev *device;
struct wb_cache *cache;
u8 migrate_threshold;
atomic64_t nr_dirty_caches;
};
struct flush_job {
struct list_head flush_queue;
struct segment_header *seg;
/*
* The data to flush to cache device.
*/
struct rambuffer *rambuf;
/*
* List of bios with barrier flags.
*/
struct bio_list barrier_ios;
};
#define PER_BIO_VERSION KERNEL_VERSION(3, 8, 0)
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
struct per_bio_data {
void *ptr;
};
#endif
#endif
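
The disk format described in the comments of writeboost.h can be checked with a little user-space arithmetic. The following is only a sketch, not driver code. It assumes the maximum 1MB segment size and that the first 1MB slot of the device is the superblock, so data segment 0 starts one slot later. The helper names segment_start_sector() and mb_start_sector() are hypothetical; they only mirror the role of calc_segment_header_start() and calc_mb_start_sector(), whose real bodies are not shown in writeboost.h.

```c
#include <assert.h>
#include <stdint.h>

#define WB_SEGMENTSIZE_ORDER 11	/* segment = 1 << 11 sectors = 1MB */
/* 256 4KB slots per segment, minus one for the segment header: 255 */
#define NR_CACHES_INSEG ((1 << (WB_SEGMENTSIZE_ORDER - 3)) - 1)

/*
 * Hypothetical helper: first sector of data segment idx (0-based).
 * Assumption: slot 0 of the device is the 1MB superblock region.
 */
static uint64_t segment_start_sector(uint64_t idx)
{
	return (idx + 1) << WB_SEGMENTSIZE_ORDER;
}

/*
 * Hypothetical helper: first sector of a 4KB cache line.
 * mb_idx is the global cache line index; the first 8 sectors (4KB)
 * of every segment hold the segment header, hence the "1 +".
 */
static uint64_t mb_start_sector(uint64_t seg_start, uint32_t mb_idx)
{
	return seg_start + ((uint64_t)(1 + mb_idx % NR_CACHES_INSEG) << 3);
}
```

With these assumptions the last cache line of a segment ends exactly where the next segment begins, which is what the "Segment(1MB)" picture in the header comment implies.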
---------- bigarray.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
/*
* An array-like structure
* that can contain millions of elements.
* The aim of this class is the same as
* that of flex_array.
* The reason we don't use flex_array is
* that it trades performance
* for resizability.
* struct bigarray is fast and lightweight.
*/
#include "bigarray.h"
struct part {
void *memory;
};
struct bigarray {
struct part *parts;
size_t nr_elems;
size_t elemsize;
};
#define ALLOC_SIZE (1 << 16)
static size_t nr_elems_in_part(struct bigarray *arr)
{
return ALLOC_SIZE / arr->elemsize;
}
static size_t nr_parts(struct bigarray *arr)
{
return dm_div_up(arr->nr_elems, nr_elems_in_part(arr));
}
struct bigarray *make_bigarray(size_t elemsize, size_t nr_elems)
{
size_t i, j;
struct part *part;
struct bigarray *arr = kmalloc(sizeof(*arr), GFP_KERNEL);
if (!arr) {
WBERR();
return NULL;
}
arr->elemsize = elemsize;
arr->nr_elems = nr_elems;
arr->parts = kmalloc(sizeof(struct part) * nr_parts(arr), GFP_KERNEL);
if (!arr->parts) {
WBERR();
goto bad_alloc_parts;
}
for (i = 0; i < nr_parts(arr); i++) {
part = arr->parts + i;
part->memory = kmalloc(ALLOC_SIZE, GFP_KERNEL);
if (!part->memory) {
WBERR();
for (j = 0; j < i; j++) {
part = arr->parts + j;
kfree(part->memory);
}
goto bad_alloc_parts_memory;
}
}
return arr;
bad_alloc_parts_memory:
kfree(arr->parts);
bad_alloc_parts:
kfree(arr);
return NULL;
}
void kill_bigarray(struct bigarray *arr)
{
size_t i;
for (i = 0; i < nr_parts(arr); i++) {
struct part *part = arr->parts + i;
kfree(part->memory);
}
kfree(arr->parts);
kfree(arr);
}
void *bigarray_at(struct bigarray *arr, size_t i)
{
size_t n = nr_elems_in_part(arr);
size_t j = i / n;
size_t k = i % n;
struct part *part = arr->parts + j;
return part->memory + (arr->elemsize * k);
}
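
The two-level indexing used by bigarray_at() (part index i / n, offset i % n inside a fixed-size part) can be tried outside the kernel. The sketch below is a user-space stand-in under stated assumptions: malloc()/calloc() replace kmalloc(), and the names bigarray_sketch, sketch_make(), sketch_at() and sketch_free() are made up for illustration. It is not the driver's implementation.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define ALLOC_SIZE (1 << 16)	/* bytes per part, as in bigarray.c */

/* Hypothetical user-space stand-in for struct bigarray. */
struct bigarray_sketch {
	void **parts;		/* one ALLOC_SIZE chunk per entry */
	size_t nr_parts;
	size_t nr_elems;
	size_t elemsize;
};

static size_t elems_per_part(const struct bigarray_sketch *arr)
{
	return ALLOC_SIZE / arr->elemsize;
}

static void sketch_free(struct bigarray_sketch *arr)
{
	size_t i;

	if (!arr)
		return;
	if (arr->parts)
		for (i = 0; i < arr->nr_parts; i++)
			free(arr->parts[i]);	/* free(NULL) is a no-op */
	free(arr->parts);
	free(arr);
}

static struct bigarray_sketch *sketch_make(size_t elemsize, size_t nr_elems)
{
	size_t i, n;
	struct bigarray_sketch *arr;

	/* An element must fit into a single part. */
	if (elemsize == 0 || elemsize > ALLOC_SIZE)
		return NULL;
	arr = calloc(1, sizeof(*arr));
	if (!arr)
		return NULL;
	arr->elemsize = elemsize;
	arr->nr_elems = nr_elems;
	n = elems_per_part(arr);
	arr->nr_parts = (nr_elems + n - 1) / n;	/* round up */
	arr->parts = calloc(arr->nr_parts ? arr->nr_parts : 1, sizeof(void *));
	if (!arr->parts) {
		free(arr);
		return NULL;
	}
	for (i = 0; i < arr->nr_parts; i++) {
		arr->parts[i] = malloc(ALLOC_SIZE);
		if (!arr->parts[i]) {
			sketch_free(arr);	/* frees the parts allocated so far */
			return NULL;
		}
	}
	return arr;
}

/* Same arithmetic as bigarray_at(): pick the part, then the slot in it. */
static void *sketch_at(struct bigarray_sketch *arr, size_t i)
{
	size_t n = elems_per_part(arr);

	return (char *)arr->parts[i / n] + arr->elemsize * (i % n);
}
```

For 8-byte elements each part holds 8192 of them, so element 8192 is the first slot of the second part. No single allocation exceeds 64KB, which is the point of the structure.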
---------- cache-alloc.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
/*
* Cache resume/free operations are provided.
* Resuming a cache is to construct in-core
* metadata structures from the metadata
* region in the cache device.
*/
#include "cache-alloc.h"
int __must_check resume_cache(struct wb_cache *cache, struct dm_dev *dev)
{
int r = 0;
cache->device = dev;
cache->nr_segments = calc_nr_segments(cache->device);
cache->nr_caches = cache->nr_segments * NR_CACHES_INSEG;
cache->on_terminate = false;
cache->allow_migrate = true;
cache->reserving_segment_id = 0;
mutex_init(&cache->io_lock);
cache->enable_migration_modulator = true;
cache->update_record_interval = 60;
cache->sync_interval = 60;
r = init_rambuf_pool(cache);
if (r) {
WBERR();
goto bad_init_rambuf_pool;
}
/*
* Select an arbitrary one as the initial RAM buffer.
*/
cache->current_rambuf = cache->rambuf_pool + 0;
r = init_segment_header_array(cache);
if (r) {
WBERR();
goto bad_alloc_segment_header_array;
}
r = ht_empty_init(cache);
if (r) {
WBERR();
goto bad_alloc_ht;
}
/*
* All in-core structures are allocated and
* initialized.
* Next, read metadata from the cache device.
*/
r = recover_cache(cache);
if (r) {
WBERR();
goto bad_recover;
}
/* Data structures for Migration */
cache->migrate_buffer = vmalloc(NR_CACHES_INSEG << 12);
if (!cache->migrate_buffer) {
WBERR();
r = -ENOMEM; /* r is still 0 here; report the failure */
goto bad_alloc_migrate_buffer;
}
cache->dirtiness_snapshot = kmalloc(
NR_CACHES_INSEG,
GFP_KERNEL);
if (!cache->dirtiness_snapshot) {
WBERR();
r = -ENOMEM;
goto bad_alloc_dirtiness_snapshot;
}
cache->migrate_wq = create_singlethread_workqueue("migratewq");
if (!cache->migrate_wq) {
WBERR();
r = -ENOMEM;
goto bad_migratewq;
}
cache->flush_wq = create_singlethread_workqueue("flushwq");
if (!cache->flush_wq) {
WBERR();
r = -ENOMEM;
goto bad_flushwq;
}
/* Migration Daemon */
INIT_WORK(&cache->migrate_work, migrate_proc);
init_waitqueue_head(&cache->migrate_wait_queue);
INIT_LIST_HEAD(&cache->migrate_list);
atomic_set(&cache->migrate_fail_count, 0);
atomic_set(&cache->migrate_io_count, 0);
cache->nr_max_batched_migration = 1;
cache->nr_cur_batched_migration = 1;
queue_work(cache->migrate_wq, &cache->migrate_work);
/* Deferred ACK for barrier writes */
setup_timer(&cache->barrier_deadline_timer,
barrier_deadline_proc, (unsigned long) cache);
bio_list_init(&cache->barrier_ios);
/*
* The deadline is 3 ms by default.
* Processing one bio takes about 2.5 us,
* so 3 ms is long enough to process 255 bios.
* If the buffer doesn't get full within 3 ms,
* we suspect that writes are starving
* while waiting for a formerly submitted barrier to complete.
*/
cache->barrier_deadline_ms = 3;
INIT_WORK(&cache->barrier_deadline_work, flush_barrier_ios);
/* Flush Daemon */
INIT_WORK(&cache->flush_work, flush_proc);
spin_lock_init(&cache->flush_queue_lock);
INIT_LIST_HEAD(&cache->flush_queue);
init_waitqueue_head(&cache->flush_wait_queue);
queue_work(cache->flush_wq, &cache->flush_work);
/* Migration Modulator */
INIT_WORK(&cache->modulator_work, modulator_proc);
schedule_work(&cache->modulator_work);
/* Superblock Recorder */
INIT_WORK(&cache->recorder_work, recorder_proc);
schedule_work(&cache->recorder_work);
/* Dirty Synchronizer */
INIT_WORK(&cache->sync_work, sync_proc);
schedule_work(&cache->sync_work);
clear_stat(cache);
return 0;
bad_flushwq:
destroy_workqueue(cache->migrate_wq);
bad_migratewq:
kfree(cache->dirtiness_snapshot);
bad_alloc_dirtiness_snapshot:
vfree(cache->migrate_buffer);
bad_alloc_migrate_buffer:
bad_recover:
kill_bigarray(cache->htable);
bad_alloc_ht:
kill_bigarray(cache->segment_header_array);
bad_alloc_segment_header_array:
free_rambuf_pool(cache);
bad_init_rambuf_pool:
kfree(cache);
return r;
}
void free_cache(struct wb_cache *cache)
{
cache->on_terminate = true;
/* Kill in-kernel daemons */
cancel_work_sync(&cache->sync_work);
cancel_work_sync(&cache->recorder_work);
cancel_work_sync(&cache->modulator_work);
cancel_work_sync(&cache->flush_work);
destroy_workqueue(cache->flush_wq);
cancel_work_sync(&cache->barrier_deadline_work);
cancel_work_sync(&cache->migrate_work);
destroy_workqueue(cache->migrate_wq);
kfree(cache->dirtiness_snapshot);
vfree(cache->migrate_buffer);
/* Destroy in-core structures */
kill_bigarray(cache->htable);
kill_bigarray(cache->segment_header_array);
free_rambuf_pool(cache);
}
---------- defer-barrier.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "defer-barrier.h"
void queue_barrier_io(struct wb_cache *cache, struct bio *bio)
{
mutex_lock(&cache->io_lock);
bio_list_add(&cache->barrier_ios, bio);
mutex_unlock(&cache->io_lock);
if (!timer_pending(&cache->barrier_deadline_timer))
/* mod_timer() takes an absolute expiry time in jiffies */
mod_timer(&cache->barrier_deadline_timer,
jiffies + msecs_to_jiffies(cache->barrier_deadline_ms));
}
void barrier_deadline_proc(unsigned long data)
{
struct wb_cache *cache = (struct wb_cache *) data;
schedule_work(&cache->barrier_deadline_work);
}
void flush_barrier_ios(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache,
barrier_deadline_work);
if (bio_list_empty(&cache->barrier_ios))
return;
flush_current_buffer(cache);
}
---------- dirty-sync.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "dirty-sync.h"
void sync_proc(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache, sync_work);
unsigned long intvl;
while (true) {
if (cache->on_terminate)
return;
/* sec -> ms */
intvl = cache->sync_interval * 1000;
if (!intvl) {
schedule_timeout_interruptible(msecs_to_jiffies(1000));
continue;
}
WBINFO();
flush_current_buffer(cache);
blkdev_issue_flush(cache->device->bdev, GFP_NOIO, NULL);
schedule_timeout_interruptible(msecs_to_jiffies(intvl));
}
}
---------- flush-daemon.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "flush-daemon.h"
void flush_proc(struct work_struct *work)
{
unsigned long flags;
struct wb_cache *cache =
container_of(work, struct wb_cache, flush_work);
while (true) {
struct flush_job *job;
struct segment_header *seg;
struct dm_io_request io_req;
struct dm_io_region region;
WBINFO();
spin_lock_irqsave(&cache->flush_queue_lock, flags);
while (list_empty(&cache->flush_queue)) {
spin_unlock_irqrestore(&cache->flush_queue_lock, flags);
wait_event_interruptible_timeout(
cache->flush_wait_queue,
(!list_empty(&cache->flush_queue)),
msecs_to_jiffies(100));
/* Check before re-taking the lock so we never return with it held */
if (cache->on_terminate)
return;
spin_lock_irqsave(&cache->flush_queue_lock, flags);
}
/*
* Pop a flush job from the list
* and flush it.
*/
job = list_first_entry(
&cache->flush_queue, struct flush_job, flush_queue);
list_del(&job->flush_queue);
spin_unlock_irqrestore(&cache->flush_queue_lock, flags);
seg = job->seg;
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = job->rambuf->data,
};
region = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = seg->start_sector,
.count = (seg->length + 1) << 3,
};
dm_safe_io_retry(&io_req, 1, &region, false);
cache->last_flushed_segment_id = seg->global_id;
complete_all(&seg->flush_done);
complete_all(&job->rambuf->done);
/*
* Deferred ACK
*/
if (!bio_list_empty(&job->barrier_ios)) {
struct bio *bio;
blkdev_issue_flush(cache->device->bdev, GFP_NOIO, NULL);
while ((bio = bio_list_pop(&job->barrier_ios)))
bio_endio(bio, 0);
mod_timer(&cache->barrier_deadline_timer,
jiffies + msecs_to_jiffies(cache->barrier_deadline_ms));
}
kfree(job);
}
}
---------- format-cache.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "format-cache.h"
static int read_superblock_header(struct superblock_header_device *sup,
struct dm_dev *dev)
{
int r = 0;
struct dm_io_request io_req_sup;
struct dm_io_region region_sup;
void *buf = kmalloc(1 << SECTOR_SHIFT, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
io_req_sup = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_sup = (struct dm_io_region) {
.bdev = dev->bdev,
.sector = 0,
.count = 1,
};
r = dm_safe_io(&io_req_sup, 1, &region_sup, NULL, false);
if (r) {
WBERR();
kfree(buf);
return r;
}
/* Copy out before freeing the buffer */
memcpy(sup, buf, sizeof(*sup));
kfree(buf);
return 0;
}
static int audit_superblock_header(struct superblock_header_device *sup)
{
u32 magic = le32_to_cpu(sup->magic);
if (magic != WRITEBOOST_MAGIC) {
WBERR();
return -EINVAL;
}
return 0;
}
/*
* Check if the cache device is already formatted.
* Returns 0 iff this routine runs without failure.
* *cache_valid is set to true iff the cache device
* is formatted and need not be re-formatted.
*/
int __must_check audit_cache_device(struct dm_dev *dev,
bool *cache_valid)
{
int r = 0;
struct superblock_header_device sup;
r = read_superblock_header(&sup, dev);
if (r)
return r;
*cache_valid = audit_superblock_header(&sup) ? false : true;
return r;
}
static int format_superblock_header(struct dm_dev *dev)
{
int r = 0;
struct dm_io_request io_req_sup;
struct dm_io_region region_sup;
struct superblock_header_device sup = {
.magic = cpu_to_le32(WRITEBOOST_MAGIC),
};
void *buf = kzalloc(1 << SECTOR_SHIFT, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
memcpy(buf, &sup, sizeof(sup));
io_req_sup = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_sup = (struct dm_io_region) {
.bdev = dev->bdev,
.sector = 0,
.count = 1,
};
r = dm_safe_io(&io_req_sup, 1, &region_sup, NULL, false);
kfree(buf);
if (r) {
WBERR();
return r;
}
return 0;
}
struct format_segmd_context {
int err;
atomic64_t count;
};
static void format_segmd_endio(unsigned long error, void *__context)
{
struct format_segmd_context *context = __context;
if (error)
context->err = 1;
atomic64_dec(&context->count);
}
/*
* Format superblock header and
* all the metadata regions over the cache device.
*/
int __must_check format_cache_device(struct dm_dev *dev)
{
u64 i, nr_segments = calc_nr_segments(dev);
struct format_segmd_context context;
struct dm_io_request io_req_sup;
struct dm_io_region region_sup;
void *buf;
int r = 0;
/*
* Zeroing the full superblock
*/
buf = kzalloc(1 << 20, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
io_req_sup = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_sup = (struct dm_io_region) {
.bdev = dev->bdev,
.sector = 0,
.count = (1 << 11),
};
r = dm_safe_io(&io_req_sup, 1, &region_sup, NULL, false);
kfree(buf);
if (r) {
WBERR();
return r;
}
r = format_superblock_header(dev);
if (r) {
WBERR();
return r;
}
/* Format the metadata regions */
/*
* Count the number of segments
*/
atomic64_set(&context.count, nr_segments);
context.err = 0;
buf = kzalloc(1 << 12, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
/*
* Submit all the writes asynchronously.
*/
for (i = 0; i < nr_segments; i++) {
struct dm_io_request io_req_seg = {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = format_segmd_endio,
.notify.context = &context,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
struct dm_io_region region_seg = {
.bdev = dev->bdev,
.sector = calc_segment_header_start(i),
.count = (1 << 3),
};
r = dm_safe_io(&io_req_seg, 1, &region_seg, NULL, false);
if (r) {
WBERR();
break;
}
}
kfree(buf);
if (r) {
WBERR();
return r;
}
/*
* Wait for all the writes to complete.
*/
while (atomic64_read(&context.count))
schedule_timeout_interruptible(msecs_to_jiffies(100));
if (context.err) {
WBERR();
return -EIO;
}
return blkdev_issue_flush(dev->bdev, GFP_KERNEL, NULL);
}
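
The magic-number audit above depends on the superblock header being stored little-endian on disk. As a rough user-space illustration (not the driver's code), the sketch below encodes WRITEBOOST_MAGIC the way cpu_to_le32() would lay it out in the first sector and checks it back. put_le32(), get_le32() and sector_is_formatted() are hypothetical helper names used only here.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* 0x57 'W', 0x42 'B', 0x73 's', 0x74 't' */
#define WRITEBOOST_MAGIC 0x57427374u

/* Store v least-significant byte first, like cpu_to_le32() + memcpy(). */
static void put_le32(uint8_t *dst, uint32_t v)
{
	dst[0] = (uint8_t)(v & 0xff);
	dst[1] = (uint8_t)((v >> 8) & 0xff);
	dst[2] = (uint8_t)((v >> 16) & 0xff);
	dst[3] = (uint8_t)((v >> 24) & 0xff);
}

/* Read a little-endian 32-bit value, like le32_to_cpu(). */
static uint32_t get_le32(const uint8_t *src)
{
	return (uint32_t)src[0] | ((uint32_t)src[1] << 8) |
	       ((uint32_t)src[2] << 16) | ((uint32_t)src[3] << 24);
}

/* Non-zero iff the sector starts with the magic number. */
static int sector_is_formatted(const uint8_t *sector)
{
	return get_le32(sector) == WRITEBOOST_MAGIC;
}
```

Because the value is stored little-endian, a hex dump of a formatted device shows the bytes in the order "tsBW", not "WBst".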
---------- handle-io.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "handle-io.h"
void inc_nr_dirty_caches(struct wb_device *wb)
{
BUG_ON(!wb);
atomic64_inc(&wb->nr_dirty_caches);
}
static void dec_nr_dirty_caches(struct wb_device *wb)
{
BUG_ON(!wb);
atomic64_dec(&wb->nr_dirty_caches);
}
void cleanup_mb_if_dirty(struct wb_cache *cache,
struct segment_header *seg,
struct metablock *mb)
{
unsigned long flags;
bool b = false;
lockseg(seg, flags);
if (mb->dirty_bits) {
mb->dirty_bits = 0;
b = true;
}
unlockseg(seg, flags);
if (b)
dec_nr_dirty_caches(cache->wb);
}
u8 atomic_read_mb_dirtiness(struct segment_header *seg,
struct metablock *mb)
{
unsigned long flags;
u8 r;
lockseg(seg, flags);
r = mb->dirty_bits;
unlockseg(seg, flags);
return r;
}
static void inc_stat(struct wb_cache *cache,
int rw, bool found, bool on_buffer, bool fullsize)
{
atomic64_t *v;
int i = 0;
if (rw)
i |= (1 << STAT_WRITE);
if (found)
i |= (1 << STAT_HIT);
if (on_buffer)
i |= (1 << STAT_ON_BUFFER);
if (fullsize)
i |= (1 << STAT_FULLSIZE);
v = &cache->stat[i];
atomic64_inc(v);
}
void clear_stat(struct wb_cache *cache)
{
int i;
for (i = 0; i < STATLEN; i++) {
atomic64_t *v = &cache->stat[i];
atomic64_set(v, 0);
}
}
/*
* Migrate data on the cache device
*/
static void migrate_mb(struct wb_cache *cache, struct segment_header *seg,
struct metablock *mb, u8 dirty_bits, bool thread)
{
struct wb_device *wb = cache->wb;
if (!dirty_bits)
return;
if (dirty_bits == 255) {
void *buf = kmalloc_retry(1 << 12, GFP_NOIO);
struct dm_io_request io_req_r, io_req_w;
struct dm_io_region region_r, region_w;
io_req_r = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_r = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = calc_mb_start_sector(seg, mb->idx),
.count = (1 << 3),
};
dm_safe_io_retry(&io_req_r, 1, &region_r, thread);
io_req_w = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_w = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = mb->sector,
.count = (1 << 3),
};
dm_safe_io_retry(&io_req_w, 1, &region_w, thread);
kfree(buf);
} else {
void *buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO);
size_t i;
for (i = 0; i < 8; i++) {
bool bit_on = dirty_bits & (1 << i);
struct dm_io_request io_req_r, io_req_w;
struct dm_io_region region_r, region_w;
sector_t src;
if (!bit_on)
continue;
io_req_r = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
/* A tmp variable just to avoid 80 cols rule */
src = calc_mb_start_sector(seg, mb->idx) + i;
region_r = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = src,
.count = 1,
};
dm_safe_io_retry(&io_req_r, 1, &region_r, thread);
io_req_w = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region_w = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = mb->sector + 1 * i,
.count = 1,
};
dm_safe_io_retry(&io_req_w, 1, &region_w, thread);
}
kfree(buf);
}
}
/*
* Migrate the cache on the RAM buffer.
* Calling this function is really rare.
*/
static void migrate_buffered_mb(struct wb_cache *cache,
struct metablock *mb, u8 dirty_bits)
{
struct wb_device *wb = cache->wb;
u8 i, k = 1 + (mb->idx % NR_CACHES_INSEG);
sector_t offset = (k << 3);
void *buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO);
for (i = 0; i < 8; i++) {
struct dm_io_request io_req;
struct dm_io_region region;
void *src;
sector_t dest;
bool bit_on = dirty_bits & (1 << i);
if (!bit_on)
continue;
src = cache->current_rambuf->data +
((offset + i) << SECTOR_SHIFT);
memcpy(buf, src, 1 << SECTOR_SHIFT);
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
dest = mb->sector + 1 * i;
region = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = dest,
.count = 1,
};
dm_safe_io_retry(&io_req, 1, &region, true);
}
kfree(buf);
}
static void bio_remap(struct bio *bio, struct dm_dev *dev, sector_t sector)
{
bio->bi_bdev = dev->bdev;
bio->bi_sector = sector;
}
static sector_t calc_cache_alignment(struct wb_cache *cache,
sector_t bio_sector)
{
return (bio_sector / (1 << 3)) * (1 << 3);
}
int writeboost_map(struct dm_target *ti, struct bio *bio
#if LINUX_VERSION_CODE < PER_BIO_VERSION
, union map_info *map_context
#endif
)
{
unsigned long flags;
struct segment_header *uninitialized_var(seg);
struct metablock *mb, *new_mb;
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
struct per_bio_data *map_context;
#endif
sector_t bio_count, bio_offset, s;
bool bio_fullsize, found, on_buffer,
refresh_segment, b;
int rw;
struct lookup_key key;
struct ht_head *head;
cache_nr update_mb_idx, idx_inseg, k;
size_t start;
void *data;
struct wb_device *wb = ti->private;
struct wb_cache *cache = wb->cache;
struct dm_dev *orig = wb->device;
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
map_context = dm_per_bio_data(bio, ti->per_bio_data_size);
#endif
map_context->ptr = NULL;
/*
* We discard only the backing store because
* blocks on the cache device are unlikely to be discarded.
*
* A discard is likely to be issued
* long after the write,
* so the block has probably been migrated by then.
* Moreover,
* we discard the segment at the end of migration
* and that is enough for discarding blocks.
*/
if (bio->bi_rw & REQ_DISCARD) {
bio_remap(bio, orig, bio->bi_sector);
return DM_MAPIO_REMAPPED;
}
/*
* Deferred ACK for barrier writes
*
* A bio with REQ_FLUSH is guaranteed
* to have no data.
* So, simply queue it and return.
*/
if (bio->bi_rw & REQ_FLUSH) {
BUG_ON(bio->bi_size);
queue_barrier_io(cache, bio);
return DM_MAPIO_SUBMITTED;
}
bio_count = bio->bi_size >> SECTOR_SHIFT;
bio_fullsize = (bio_count == (1 << 3));
bio_offset = bio->bi_sector % (1 << 3);
rw = bio_data_dir(bio);
key = (struct lookup_key) {
.sector = calc_cache_alignment(cache, bio->bi_sector),
};
k = ht_hash(cache, &key);
head = bigarray_at(cache->htable, k);
/*
* (Locking)
* Why mutex?
*
* We use a mutex instead of an rw_semaphore,
* which would allow truly concurrent read access,
* because a mutex is even lighter than an rw_semaphore.
* Since dm-writeboost is performance-centric software,
* the overhead of an rw_semaphore matters.
* All in all,
* since the exclusive region in the read path is small
* and cheap, using an rw_semaphore to let the reads
* execute concurrently won't improve the performance
* as much as one might expect.
*/
mutex_lock(&cache->io_lock);
mb = ht_lookup(cache, head, &key);
if (mb) {
seg = ((void *) mb) - (mb->idx % NR_CACHES_INSEG) *
sizeof(struct metablock);
atomic_inc(&seg->nr_inflight_ios);
}
found = (mb != NULL);
on_buffer = false;
if (found)
on_buffer = is_on_buffer(cache, mb->idx);
inc_stat(cache, rw, found, on_buffer, bio_fullsize);
if (!rw) {
u8 dirty_bits;
mutex_unlock(&cache->io_lock);
if (!found) {
bio_remap(bio, orig, bio->bi_sector);
return DM_MAPIO_REMAPPED;
}
dirty_bits = atomic_read_mb_dirtiness(seg, mb);
if (unlikely(on_buffer)) {
if (dirty_bits)
migrate_buffered_mb(cache, mb, dirty_bits);
/*
* Cache class
* Live and Stable
*
* Live:
* The cache is on the RAM buffer.
*
* Stable:
* The cache is not on the RAM buffer
* but at least queued in flush_queue.
*/
/*
* (Locking)
* Dirtiness of a live cache
*
* We can assume that the dirtiness of a cache only increases
* while it is on the buffer; we call such a cache live.
* This eases the locking because
* we need not worry that the dirtiness of
* a live cache fluctuates.
*/
atomic_dec(&seg->nr_inflight_ios);
bio_remap(bio, orig, bio->bi_sector);
return DM_MAPIO_REMAPPED;
}
wait_for_completion(&seg->flush_done);
if (likely(dirty_bits == 255)) {
bio_remap(bio,
cache->device,
calc_mb_start_sector(seg, mb->idx)
+ bio_offset);
map_context->ptr = seg;
} else {
/*
* (Locking)
* Dirtiness of a stable cache
*
* Unlike live caches, whose dirtiness
* does not fluctuate,
* stable caches, which are not on the buffer
* but on the cache device,
* may have their dirtiness decreased by processes
* other than the migrate daemon.
* This works fine
* because migrating the same cache twice
* doesn't break the cache consistency.
*/
migrate_mb(cache, seg, mb, dirty_bits, true);
cleanup_mb_if_dirty(cache, seg, mb);
atomic_dec(&seg->nr_inflight_ios);
bio_remap(bio, orig, bio->bi_sector);
}
return DM_MAPIO_REMAPPED;
}
if (found) {
if (unlikely(on_buffer)) {
mutex_unlock(&cache->io_lock);
update_mb_idx = mb->idx;
goto write_on_buffer;
} else {
u8 dirty_bits = atomic_read_mb_dirtiness(seg, mb);
/*
* First clean up the previous cache
* and migrate the cache if needed.
*/
bool needs_cleanup_prev_cache =
!bio_fullsize || !(dirty_bits == 255);
if (unlikely(needs_cleanup_prev_cache)) {
wait_for_completion(&seg->flush_done);
migrate_mb(cache, seg, mb, dirty_bits, true);
}
/*
* Fullsize dirty cache
* can be discarded without migration.
*/
cleanup_mb_if_dirty(cache, seg, mb);
ht_del(cache, mb);
atomic_dec(&seg->nr_inflight_ios);
goto write_not_found;
}
}
write_not_found:
;
/*
* If cache->cursor is 254, 509, ...
* that is the last cache line in the segment.
* We must flush the current segment and
* get the new one.
*/
refresh_segment = !((cache->cursor + 1) % NR_CACHES_INSEG);
if (refresh_segment)
queue_current_buffer(cache);
cache->cursor = (cache->cursor + 1) % cache->nr_caches;
/*
* update_mb_idx is the cache line index to update.
*/
update_mb_idx = cache->cursor;
seg = cache->current_seg;
atomic_inc(&seg->nr_inflight_ios);
new_mb = seg->mb_array + (update_mb_idx % NR_CACHES_INSEG);
new_mb->dirty_bits = 0;
ht_register(cache, head, &key, new_mb);
mutex_unlock(&cache->io_lock);
mb = new_mb;
write_on_buffer:
;
idx_inseg = update_mb_idx % NR_CACHES_INSEG;
s = (idx_inseg + 1) << 3;
b = false;
lockseg(seg, flags);
if (!mb->dirty_bits) {
seg->length++;
BUG_ON(seg->length > NR_CACHES_INSEG);
b = true;
}
if (likely(bio_fullsize)) {
mb->dirty_bits = 255;
} else {
u8 i;
u8 acc_bits = 0;
s += bio_offset;
for (i = bio_offset; i < (bio_offset+bio_count); i++)
acc_bits += (1 << i);
mb->dirty_bits |= acc_bits;
}
BUG_ON(!mb->dirty_bits);
unlockseg(seg, flags);
if (b)
inc_nr_dirty_caches(wb);
start = s << SECTOR_SHIFT;
data = bio_data(bio);
memcpy(cache->current_rambuf->data + start, data, bio->bi_size);
atomic_dec(&seg->nr_inflight_ios);
/*
* deferred ACK for barrier writes
*
* bio with REQ_FUA flag has data.
* So, we run through the path for the
* ordinary bio. And the data is
* now stored in the RAM buffer.
* After that, queue it and return
* to defer completion.
*/
if (bio->bi_rw & REQ_FUA) {
queue_barrier_io(cache, bio);
return DM_MAPIO_SUBMITTED;
}
bio_endio(bio, 0);
return DM_MAPIO_SUBMITTED;
}
int writeboost_end_io(struct dm_target *ti, struct bio *bio, int error
#if LINUX_VERSION_CODE < PER_BIO_VERSION
, union map_info *map_context
#endif
)
{
struct segment_header *seg;
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
struct per_bio_data *map_context =
dm_per_bio_data(bio, ti->per_bio_data_size);
#endif
if (!map_context->ptr)
return 0;
seg = map_context->ptr;
atomic_dec(&seg->nr_inflight_ios);
return 0;
}
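For readers following the partial-write path in writeboost_map(), the dirty_bits bookkeeping can be tried out in userspace. The following is only a minimal sketch, assuming a 4KB cache line made of eight 512-byte sectors as in the code above; the helper names dirty_mask() and align_to_line() are hypothetical and do not exist in the driver.

```c
#include <stdint.h>

/*
 * Hypothetical helper: bitmask of the sectors touched by a write that
 * starts at sector `offset` (0..7) inside a cache line and spans `count`
 * sectors. A full-size write (offset 0, count 8) yields 255.
 */
uint8_t dirty_mask(unsigned offset, unsigned count)
{
	uint8_t mask = 0;
	unsigned i;

	for (i = offset; i < offset + count && i < 8; i++)
		mask |= (uint8_t)(1u << i);
	return mask;
}

/*
 * Hypothetical helper: round a sector down to the first sector of its
 * 8-sector cache line, as calc_cache_alignment() does.
 */
uint64_t align_to_line(uint64_t sector)
{
	return sector & ~(uint64_t)7;
}
```

For example, a 3-sector write starting at sector 2 of a line gives dirty_mask(2, 3) == 0x1c, which writeboost_map() would OR into mb->dirty_bits.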
---------- hashtable.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "hashtable.h"
/*
* Initialize the Hash Table.
*/
int __must_check ht_empty_init(struct wb_cache *cache)
{
cache_nr idx;
size_t i;
size_t nr_heads;
struct bigarray *arr;
cache->htsize = cache->nr_caches;
nr_heads = cache->htsize + 1;
arr = make_bigarray(sizeof(struct ht_head), nr_heads);
if (!arr) {
WBERR();
return -ENOMEM;
}
cache->htable = arr;
for (i = 0; i < nr_heads; i++) {
struct ht_head *hd = bigarray_at(arr, i);
INIT_HLIST_HEAD(&hd->ht_list);
}
/*
* Our hashtable has one special bucket called null head.
* Orphan metablocks are linked to the null head.
*/
cache->null_head = bigarray_at(cache->htable, cache->htsize);
for (idx = 0; idx < cache->nr_caches; idx++) {
struct metablock *mb = mb_at(cache, idx);
hlist_add_head(&mb->ht_list, &cache->null_head->ht_list);
}
return 0;
}
cache_nr ht_hash(struct wb_cache *cache, struct lookup_key *key)
{
return key->sector % cache->htsize;
}
static bool mb_hit(struct metablock *mb, struct lookup_key *key)
{
return mb->sector == key->sector;
}
void ht_del(struct wb_cache *cache, struct metablock *mb)
{
struct ht_head *null_head;
hlist_del(&mb->ht_list);
null_head = cache->null_head;
hlist_add_head(&mb->ht_list, &null_head->ht_list);
}
void ht_register(struct wb_cache *cache, struct ht_head *head,
struct lookup_key *key, struct metablock *mb)
{
hlist_del(&mb->ht_list);
hlist_add_head(&mb->ht_list, &head->ht_list);
mb->sector = key->sector;
}
struct metablock *ht_lookup(struct wb_cache *cache,
struct ht_head *head,
struct lookup_key *key)
{
struct metablock *mb, *found = NULL;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 9, 0)
hlist_for_each_entry(mb, &head->ht_list, ht_list)
#else
struct hlist_node *pos;
hlist_for_each_entry(mb, pos, &head->ht_list, ht_list)
#endif
{
if (mb_hit(mb, key)) {
found = mb;
break;
}
}
return found;
}
/*
* Discard all the metablock in a segment.
*/
void discard_caches_inseg(struct wb_cache *cache,
struct segment_header *seg)
{
u8 i;
for (i = 0; i < NR_CACHES_INSEG; i++) {
struct metablock *mb = seg->mb_array + i;
ht_del(cache, mb);
}
}
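The "null head" idea in hashtable.c (entries are never freed, only parked on one extra bucket) may be easier to see outside the kernel. Below is a minimal userspace sketch under stated assumptions: it uses a plain singly linked list instead of the kernel's hlist, a fixed NR_BUCKETS, and hypothetical names (table_register, table_del, table_lookup) that only mirror ht_register, ht_del and ht_lookup.

```c
#include <stddef.h>
#include <stdint.h>

#define NR_BUCKETS 4 /* hypothetical table size, for illustration only */

struct entry {
	uint64_t sector;
	struct entry *next;
};

/* NR_BUCKETS ordinary buckets plus one trailing "null head" for orphans */
struct table {
	struct entry *heads[NR_BUCKETS + 1];
};

/*
 * Remove e from whichever list holds it. This is a linear scan; the
 * kernel's hlist does the same in O(1) through its pprev pointer.
 */
void table_unlink(struct table *t, struct entry *e)
{
	size_t i;

	for (i = 0; i <= NR_BUCKETS; i++) {
		struct entry **pp = &t->heads[i];

		for (; *pp; pp = &(*pp)->next) {
			if (*pp == e) {
				*pp = e->next;
				e->next = NULL;
				return;
			}
		}
	}
}

void table_push(struct table *t, size_t bucket, struct entry *e)
{
	e->next = t->heads[bucket];
	t->heads[bucket] = e;
}

/* Counterpart of ht_register(): move e into the bucket of its new key */
void table_register(struct table *t, struct entry *e, uint64_t sector)
{
	table_unlink(t, e);
	table_push(t, (size_t)(sector % NR_BUCKETS), e);
	e->sector = sector;
}

/* Counterpart of ht_del(): park e on the null head instead of freeing it */
void table_del(struct table *t, struct entry *e)
{
	table_unlink(t, e);
	table_push(t, NR_BUCKETS, e);
}

struct entry *table_lookup(struct table *t, uint64_t sector)
{
	struct entry *e = t->heads[sector % NR_BUCKETS];

	while (e && e->sector != sector)
		e = e->next;
	return e;
}
```

Because a deleted entry sits on the null head and not in any keyed bucket, a later lookup cannot find it even though its stale sector value is still stored.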
---------- migrate-daemon.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "migrate-daemon.h"
static void migrate_endio(unsigned long error, void *context)
{
struct wb_cache *cache = context;
if (error)
atomic_inc(&cache->migrate_fail_count);
if (atomic_dec_and_test(&cache->migrate_io_count))
wake_up_interruptible(&cache->migrate_wait_queue);
}
/*
* Submit the segment data at position k
* in the migrate buffer.
* Batched migration first gathers all the segments
* to migrate into a migrate buffer.
* So, there are a number of segment data
* in the buffer.
* This function submits the one at position k.
*/
static void submit_migrate_io(struct wb_cache *cache,
struct segment_header *seg, size_t k)
{
u8 i, j;
size_t a = NR_CACHES_INSEG * k;
void *p = cache->migrate_buffer + (NR_CACHES_INSEG << 12) * k;
for (i = 0; i < seg->length; i++) {
struct metablock *mb = seg->mb_array + i;
struct wb_device *wb = cache->wb;
u8 dirty_bits = *(cache->dirtiness_snapshot + (a + i));
unsigned long offset;
void *base, *addr;
struct dm_io_request io_req_w;
struct dm_io_region region_w;
if (!dirty_bits)
continue;
offset = i << 12;
base = p + offset;
if (dirty_bits == 255) {
addr = base;
io_req_w = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = migrate_endio,
.notify.context = cache,
.mem.type = DM_IO_VMA,
.mem.ptr.vma = addr,
};
region_w = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = mb->sector,
.count = (1 << 3),
};
dm_safe_io_retry(&io_req_w, 1, &region_w, false);
} else {
for (j = 0; j < 8; j++) {
bool b = dirty_bits & (1 << j);
if (!b)
continue;
addr = base + (j << SECTOR_SHIFT);
io_req_w = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE,
.notify.fn = migrate_endio,
.notify.context = cache,
.mem.type = DM_IO_VMA,
.mem.ptr.vma = addr,
};
region_w = (struct dm_io_region) {
.bdev = wb->device->bdev,
.sector = mb->sector + j,
.count = 1,
};
dm_safe_io_retry(
&io_req_w, 1, &region_w, false);
}
}
}
}
static void memorize_dirty_state(struct wb_cache *cache,
struct segment_header *seg, size_t k,
size_t *migrate_io_count)
{
u8 i, j;
size_t a = NR_CACHES_INSEG * k;
void *p = cache->migrate_buffer + (NR_CACHES_INSEG << 12) * k;
struct metablock *mb;
struct dm_io_request io_req_r = {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_VMA,
.mem.ptr.vma = p,
};
struct dm_io_region region_r = {
.bdev = cache->device->bdev,
.sector = seg->start_sector + (1 << 3),
.count = seg->length << 3,
};
dm_safe_io_retry(&io_req_r, 1, &region_r, false);
/*
* We take a snapshot of the dirtiness of the segments.
* A segment in the snapshot is at least as dirty as
* it will be at any future moment,
* so we migrate the dirtiest possible
* state of the segments,
* which won't lose any dirty data that was acknowledged.
*/
for (i = 0; i < seg->length; i++) {
mb = seg->mb_array + i;
*(cache->dirtiness_snapshot + (a + i)) =
atomic_read_mb_dirtiness(seg, mb);
}
for (i = 0; i < seg->length; i++) {
u8 dirty_bits;
mb = seg->mb_array + i;
dirty_bits = *(cache->dirtiness_snapshot + (a + i));
if (!dirty_bits)
continue;
if (dirty_bits == 255) {
(*migrate_io_count)++;
} else {
for (j = 0; j < 8; j++) {
if (dirty_bits & (1 << j))
(*migrate_io_count)++;
}
}
}
}
static void cleanup_segment(struct wb_cache *cache, struct segment_header *seg)
{
u8 i;
for (i = 0; i < seg->length; i++) {
struct metablock *mb = seg->mb_array + i;
cleanup_mb_if_dirty(cache, seg, mb);
}
}
static void migrate_linked_segments(struct wb_cache *cache)
{
struct segment_header *seg;
size_t k, migrate_io_count = 0;
/*
* Memorize the dirty state to migrate before going in.
* - How many migration writes should be submitted atomically,
* - Which cache lines are dirty and need to be migrated
* - etc.
*/
k = 0;
list_for_each_entry(seg, &cache->migrate_list, migrate_list) {
memorize_dirty_state(cache, seg, k, &migrate_io_count);
k++;
}
migrate_write:
atomic_set(&cache->migrate_io_count, migrate_io_count);
atomic_set(&cache->migrate_fail_count, 0);
k = 0;
list_for_each_entry(seg, &cache->migrate_list, migrate_list) {
submit_migrate_io(cache, seg, k);
k++;
}
wait_event_interruptible(cache->migrate_wait_queue,
atomic_read(&cache->migrate_io_count) == 0);
if (atomic_read(&cache->migrate_fail_count)) {
WBWARN("%u writebacks failed. retry.",
atomic_read(&cache->migrate_fail_count));
goto migrate_write;
}
BUG_ON(atomic_read(&cache->migrate_io_count));
list_for_each_entry(seg, &cache->migrate_list, migrate_list) {
cleanup_segment(cache, seg);
}
/*
* The segment may have a block
* for which an ACK for a persistent write
* on the cache device was returned.
* Migrating such blocks in a non-persistent way
* betrays the client
* who received the ACK and
* expects the data to be persistent.
* Since it is difficult to know
* whether a cache in a segment
* is in that state,
* we stay on the safe side
* by always
* migrating the data persistently.
*/
blkdev_issue_flush(cache->wb->device->bdev, GFP_NOIO, NULL);
/*
* Discarding the migrated regions
* can avoid unnecessary write amplification in the future.
*
* But note that we should not discard
* the metablock region because
* whether a discarded block
* reads back a certain value
* depends on the vendor,
* and unexpected metablock data
* will corrupt the cache.
*/
list_for_each_entry(seg, &cache->migrate_list, migrate_list) {
blkdev_issue_discard(cache->device->bdev,
seg->start_sector + (1 << 3),
seg->length << 3,
GFP_NOIO, 0);
}
}
void migrate_proc(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache, migrate_work);
while (true) {
bool allow_migrate;
size_t i, nr_mig_candidates, nr_mig;
struct segment_header *seg, *tmp;
WBINFO();
if (cache->on_terminate)
return;
/*
* If reserving_id > 0
* Migration should be immediate.
*/
allow_migrate = cache->reserving_segment_id ||
cache->allow_migrate;
if (!allow_migrate) {
schedule_timeout_interruptible(msecs_to_jiffies(1000));
continue;
}
nr_mig_candidates = cache->last_flushed_segment_id -
cache->last_migrated_segment_id;
if (!nr_mig_candidates) {
schedule_timeout_interruptible(msecs_to_jiffies(1000));
continue;
}
if (cache->nr_cur_batched_migration !=
cache->nr_max_batched_migration){
vfree(cache->migrate_buffer);
kfree(cache->dirtiness_snapshot);
cache->nr_cur_batched_migration =
cache->nr_max_batched_migration;
cache->migrate_buffer =
vmalloc(cache->nr_cur_batched_migration *
(NR_CACHES_INSEG << 12));
cache->dirtiness_snapshot =
kmalloc_retry(cache->nr_cur_batched_migration *
NR_CACHES_INSEG,
GFP_NOIO);
BUG_ON(!cache->migrate_buffer);
BUG_ON(!cache->dirtiness_snapshot);
}
/*
* Batched Migration:
* We will migrate at most nr_max_batched_migration
* segments at a time.
*/
nr_mig = min(nr_mig_candidates,
cache->nr_cur_batched_migration);
/*
* Add segments to migrate atomically.
*/
for (i = 1; i <= nr_mig; i++) {
seg = get_segment_header_by_id(
cache,
cache->last_migrated_segment_id + i);
list_add_tail(&seg->migrate_list, &cache->migrate_list);
}
migrate_linked_segments(cache);
/*
* (Locking)
* This is the only line of code that changes
* last_migrated_segment_id at runtime.
*/
cache->last_migrated_segment_id += nr_mig;
list_for_each_entry_safe(seg, tmp,
&cache->migrate_list,
migrate_list) {
complete_all(&seg->migrate_done);
list_del(&seg->migrate_list);
}
}
}
void wait_for_migration(struct wb_cache *cache, size_t id)
{
struct segment_header *seg = get_segment_header_by_id(cache, id);
/*
* Set reserving_segment_id to non-zero
* to force the migration daemon
* to complete the migration of this segment
* immediately.
*/
cache->reserving_segment_id = id;
wait_for_completion(&seg->migrate_done);
cache->reserving_segment_id = 0;
}
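The way memorize_dirty_state() sizes migrate_io_count can be checked in isolation. This is a small userspace sketch, assuming the same rule as the code above (a fully dirty cache line costs one 4KB write, a partially dirty one costs one write per dirty sector); count_migrate_ios() is a hypothetical name, not a driver function.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: number of writeback I/Os needed for a snapshot of
 * n dirty_bits bytes. 0xff means one full-line write; any other value
 * means one single-sector write per set bit.
 */
size_t count_migrate_ios(const uint8_t *snapshot, size_t n)
{
	size_t i, ios = 0;

	for (i = 0; i < n; i++) {
		uint8_t bits = snapshot[i];

		if (bits == 0xff) {
			ios++;
			continue;
		}
		while (bits) {
			ios += bits & 1;
			bits >>= 1;
		}
	}
	return ios;
}
```

The count has to match the number of dm_safe_io_retry() submissions in submit_migrate_io() exactly; otherwise migrate_endio() would never bring migrate_io_count to zero and the waiter would not be woken.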
---------- migrate-modulator.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "migrate-modulator.h"
void modulator_proc(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache, modulator_work);
struct wb_device *wb = cache->wb;
struct hd_struct *hd = wb->device->bdev->bd_part;
unsigned long old = 0, new, util;
unsigned long intvl = 1000;
while (true) {
if (cache->on_terminate)
return;
new = jiffies_to_msecs(part_stat_read(hd, io_ticks));
if (!cache->enable_migration_modulator)
goto modulator_update;
util = (100 * (new - old)) / 1000;
WBINFO("%u", (unsigned) util);
if (util < wb->migrate_threshold)
cache->allow_migrate = true;
else
cache->allow_migrate = false;
modulator_update:
old = new;
schedule_timeout_interruptible(msecs_to_jiffies(intvl));
}
}
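The modulator's decision reduces to one line of arithmetic. The sketch below restates it in userspace C under the assumption, taken from the code above, that io_ticks is sampled once per interval and converted to milliseconds; util_percent() and may_migrate() are hypothetical names.

```c
/*
 * Hypothetical helper: percentage of the last interval during which the
 * backing device was busy, from two busy-time samples in milliseconds.
 */
unsigned long util_percent(unsigned long old_ms, unsigned long new_ms,
			   unsigned long interval_ms)
{
	return (100 * (new_ms - old_ms)) / interval_ms;
}

/* Migration is allowed only while utilization is below the threshold */
int may_migrate(unsigned long util, unsigned long threshold)
{
	return util < threshold;
}
```

With a 1000 ms interval, 350 ms of busy time gives 35 percent, so migration would be allowed for a threshold of 70 but not for a threshold of 35.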
---------- queue-flush-job.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "queue-flush-job.h"
static u8 count_dirty_caches_remained(struct segment_header *seg)
{
u8 i, count = 0;
struct metablock *mb;
for (i = 0; i < seg->length; i++) {
mb = seg->mb_array + i;
if (mb->dirty_bits)
count++;
}
return count;
}
/*
* Make the metadata part of the segment data to flush.
* @dest The metadata part of the segment to flush
*/
static void prepare_segment_header_device(struct segment_header_device *dest,
struct wb_cache *cache,
struct segment_header *src)
{
cache_nr i;
u8 left, right;
dest->global_id = cpu_to_le64(src->global_id);
dest->length = src->length;
dest->lap = cpu_to_le32(calc_segment_lap(cache, src->global_id));
left = src->length - 1;
right = (cache->cursor) % NR_CACHES_INSEG;
BUG_ON(left != right);
for (i = 0; i < src->length; i++) {
struct metablock *mb = src->mb_array + i;
struct metablock_device *mbdev = &dest->mbarr[i];
mbdev->sector = cpu_to_le64(mb->sector);
mbdev->dirty_bits = mb->dirty_bits;
/* dest->lap is already little-endian; don't convert it twice */
mbdev->lap = dest->lap;
}
}
static void prepare_meta_rambuffer(void *rambuffer,
struct wb_cache *cache,
struct segment_header *seg)
{
prepare_segment_header_device(rambuffer, cache, seg);
}
/*
* Queue the current segment into the queue
* and prepare a new segment.
*/
static void queue_flushing(struct wb_cache *cache)
{
unsigned long flags;
struct segment_header *current_seg = cache->current_seg, *new_seg;
struct flush_job *job;
bool empty;
struct rambuffer *next_rambuf;
size_t n1 = 0, n2 = 0;
u64 next_id;
while (atomic_read(&current_seg->nr_inflight_ios)) {
n1++;
if (n1 == 100)
WBWARN();
schedule_timeout_interruptible(msecs_to_jiffies(1));
}
prepare_meta_rambuffer(cache->current_rambuf->data, cache,
cache->current_seg);
INIT_COMPLETION(current_seg->migrate_done);
INIT_COMPLETION(current_seg->flush_done);
job = kmalloc_retry(sizeof(*job), GFP_NOIO);
INIT_LIST_HEAD(&job->flush_queue);
job->seg = current_seg;
job->rambuf = cache->current_rambuf;
bio_list_init(&job->barrier_ios);
bio_list_merge(&job->barrier_ios, &cache->barrier_ios);
bio_list_init(&cache->barrier_ios);
spin_lock_irqsave(&cache->flush_queue_lock, flags);
empty = list_empty(&cache->flush_queue);
list_add_tail(&job->flush_queue, &cache->flush_queue);
spin_unlock_irqrestore(&cache->flush_queue_lock, flags);
if (empty)
wake_up_interruptible(&cache->flush_wait_queue);
next_id = current_seg->global_id + 1;
new_seg = get_segment_header_by_id(cache, next_id);
new_seg->global_id = next_id;
while (atomic_read(&new_seg->nr_inflight_ios)) {
n2++;
if (n2 == 100)
WBWARN();
schedule_timeout_interruptible(msecs_to_jiffies(1));
}
BUG_ON(count_dirty_caches_remained(new_seg));
discard_caches_inseg(cache, new_seg);
/*
* Set the cursor to the last of the flushed segment.
*/
cache->cursor = current_seg->start_idx + (NR_CACHES_INSEG - 1);
new_seg->length = 0;
next_rambuf = cache->rambuf_pool + (next_id % NR_RAMBUF_POOL);
wait_for_completion(&next_rambuf->done);
INIT_COMPLETION(next_rambuf->done);
cache->current_rambuf = next_rambuf;
cache->current_seg = new_seg;
}
void queue_current_buffer(struct wb_cache *cache)
{
/*
* Before we get the next segment
* we must wait until the segment is all clean.
* A clean segment doesn't have
* log to flush and dirties to migrate.
*/
u64 next_id = cache->current_seg->global_id + 1;
struct segment_header *next_seg =
get_segment_header_by_id(cache, next_id);
wait_for_completion(&next_seg->flush_done);
wait_for_migration(cache, next_id);
queue_flushing(cache);
}
/*
* Flush all the dirty data at this moment
* but NOT persistently.
* Cleaning up the writes before termination
* is an example use case.
*/
void flush_current_buffer(struct wb_cache *cache)
{
struct segment_header *old_seg;
mutex_lock(&cache->io_lock);
old_seg = cache->current_seg;
queue_current_buffer(cache);
cache->cursor = (cache->cursor + 1) % cache->nr_caches;
cache->current_seg->length = 1;
mutex_unlock(&cache->io_lock);
wait_for_completion(&old_seg->flush_done);
}
---------- rambuf.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "rambuf.h"
int __must_check init_rambuf_pool(struct wb_cache *cache)
{
size_t i, j;
struct rambuffer *rambuf;
cache->rambuf_pool = kmalloc(sizeof(struct rambuffer) * NR_RAMBUF_POOL,
GFP_KERNEL);
if (!cache->rambuf_pool) {
WBERR();
return -ENOMEM;
}
for (i = 0; i < NR_RAMBUF_POOL; i++) {
rambuf = cache->rambuf_pool + i;
init_completion(&rambuf->done);
complete_all(&rambuf->done);
rambuf->data = kmalloc(
1 << (WB_SEGMENTSIZE_ORDER + SECTOR_SHIFT),
GFP_KERNEL);
if (!rambuf->data) {
WBERR();
for (j = 0; j < i; j++) {
rambuf = cache->rambuf_pool + j;
kfree(rambuf->data);
}
kfree(cache->rambuf_pool);
return -ENOMEM;
}
}
return 0;
}
void free_rambuf_pool(struct wb_cache *cache)
{
struct rambuffer *rambuf;
size_t i;
for (i = 0; i < NR_RAMBUF_POOL; i++) {
rambuf = cache->rambuf_pool + i;
kfree(rambuf->data);
}
kfree(cache->rambuf_pool);
}
---------- recover.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "recover.h"
static int __must_check
read_superblock_record(struct superblock_record_device *record,
struct wb_cache *cache)
{
int r = 0;
struct dm_io_request io_req;
struct dm_io_region region;
void *buf = kmalloc(1 << SECTOR_SHIFT, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = (1 << 11) - 1,
.count = 1,
};
r = dm_safe_io(&io_req, 1, &region, NULL, true);
if (r) {
WBERR();
kfree(buf);
return r;
}
/* Copy out before freeing buf to avoid a use-after-free */
memcpy(record, buf, sizeof(*record));
kfree(buf);
return r;
}
static int __must_check
read_segment_header_device(struct segment_header_device *dest,
struct wb_cache *cache, size_t segment_idx)
{
int r = 0;
struct dm_io_request io_req;
struct dm_io_region region;
void *buf = kmalloc(1 << 12, GFP_KERNEL);
if (!buf) {
WBERR();
return -ENOMEM;
}
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = READ,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = calc_segment_header_start(segment_idx),
.count = (1 << 3),
};
r = dm_safe_io(&io_req, 1, &region, NULL, false);
if (r) {
WBERR();
kfree(buf);
return r;
}
/* Copy out before freeing buf to avoid a use-after-free */
memcpy(dest, buf, sizeof(*dest));
kfree(buf);
return r;
}
/*
* Read the on-disk metadata of the segment
* and update the in-core cache metadata structure
* like Hash Table.
*/
static void update_by_segment_header_device(struct wb_cache *cache,
struct segment_header_device *src)
{
cache_nr i;
struct segment_header *seg =
get_segment_header_by_id(cache, le64_to_cpu(src->global_id));
seg->length = src->length;
INIT_COMPLETION(seg->migrate_done);
for (i = 0 ; i < src->length; i++) {
cache_nr k;
struct lookup_key key;
struct ht_head *head;
struct metablock *found, *mb = seg->mb_array + i;
struct metablock_device *mbdev = &src->mbarr[i];
if (!mbdev->dirty_bits)
continue;
mb->sector = le64_to_cpu(mbdev->sector);
mb->dirty_bits = mbdev->dirty_bits;
inc_nr_dirty_caches(cache->wb);
key = (struct lookup_key) {
.sector = mb->sector,
};
k = ht_hash(cache, &key);
head = bigarray_at(cache->htable, k);
found = ht_lookup(cache, head, &key);
if (found)
ht_del(cache, found);
ht_register(cache, head, &key, mb);
}
}
/*
* If and only if the lap attributes
* of the header and all the metablocks are the same,
* the segment is judged to have been flushed correctly
* and is then merged into the runtime structure.
* Otherwise, it is ignored.
*/
static bool checkup_atomicity(struct segment_header_device *header)
{
u8 i;
u32 a = le32_to_cpu(header->lap), b;
for (i = 0; i < header->length; i++) {
struct metablock_device *o;
o = header->mbarr + i;
b = le32_to_cpu(o->lap);
if (a != b)
return false;
}
return true;
}
int __must_check recover_cache(struct wb_cache *cache)
{
int r = 0;
struct segment_header_device *header;
struct segment_header *seg;
u64 i, j,
max_id, oldest_id, last_flushed_id, init_segment_id,
oldest_idx, nr_segments = cache->nr_segments,
header_id, record_id;
struct superblock_record_device uninitialized_var(record);
r = read_superblock_record(&record, cache);
if (r) {
WBERR();
return r;
}
WBINFO("%llu", record.last_migrated_segment_id);
record_id = le64_to_cpu(record.last_migrated_segment_id);
WBINFO("%llu", record_id);
header = kmalloc(sizeof(*header), GFP_KERNEL);
if (!header) {
WBERR();
return -ENOMEM;
}
/*
* Finding the oldest, non-zero id and its index.
*/
max_id = SZ_MAX;
oldest_id = max_id;
oldest_idx = 0;
for (i = 0; i < nr_segments; i++) {
r = read_segment_header_device(header, cache, i);
if (r) {
WBERR();
kfree(header);
return r;
}
header_id = le64_to_cpu(header->global_id);
if (header_id < 1)
continue;
if (header_id < oldest_id) {
oldest_idx = i;
oldest_id = header_id;
}
}
last_flushed_id = 0;
/*
* This is an invariant.
* We always start from the segment
* that is right after the last_flushed_id.
*/
init_segment_id = last_flushed_id + 1;
/*
* If no segment was flushed
* then there is nothing to recover.
*/
if (oldest_id == max_id)
goto setup_init_segment;
/*
* What we have to do in the next loop is to
* revive the segments that are
* flushed but yet not migrated.
*/
/*
* Example:
* There are only 5 segments.
* The segments we will consider are of id k+2 and k+3
* because they are dirty but not migrated.
*
* id: [ k+3 ][ k+4 ][ k ][ k+1 ][ k+2 ]
* last_flushed init_seg migrated last_migrated flushed
*/
for (i = oldest_idx; i < (nr_segments + oldest_idx); i++) {
j = i % nr_segments;
r = read_segment_header_device(header, cache, j);
if (r) {
WBERR();
kfree(header);
return r;
}
header_id = le64_to_cpu(header->global_id);
/*
* A valid global_id is > 0.
* If we encounter a header with global_id = 0,
* we can consider
* this header and all the following ones invalid.
*/
if (header_id <= last_flushed_id)
break;
if (!checkup_atomicity(header)) {
WBWARN("header atomicity broken id %llu",
header_id);
break;
}
/*
* Now the header is proven valid.
*/
last_flushed_id = header_id;
init_segment_id = last_flushed_id + 1;
/*
* If the data is already on the backing store,
* we ignore the segment.
*/
if (header_id <= record_id)
continue;
update_by_segment_header_device(cache, header);
}
setup_init_segment:
kfree(header);
seg = get_segment_header_by_id(cache, init_segment_id);
seg->global_id = init_segment_id;
atomic_set(&seg->nr_inflight_ios, 0);
cache->last_flushed_segment_id = seg->global_id - 1;
cache->last_migrated_segment_id =
cache->last_flushed_segment_id > cache->nr_segments ?
cache->last_flushed_segment_id - cache->nr_segments : 0;
if (record_id > cache->last_migrated_segment_id)
cache->last_migrated_segment_id = record_id;
WBINFO("%llu", cache->last_migrated_segment_id);
wait_for_migration(cache, seg->global_id);
discard_caches_inseg(cache, seg);
/*
* cursor is set to the first element of the segment.
* This means that we will not use the element.
*/
cache->cursor = seg->start_idx;
seg->length = 1;
cache->current_seg = seg;
return 0;
}
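The lap comparison used by checkup_atomicity() is what lets recovery detect a segment that was only partly overwritten. The sketch below shows the idea in userspace, assuming the laps have already been converted to CPU byte order; laps_consistent() is a hypothetical name.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: returns 1 when every metablock lap equals the
 * header lap, 0 otherwise. A mismatch means some metablocks still carry
 * the lap of an older pass over the same on-disk segment, i.e. the flush
 * of this segment did not complete.
 */
int laps_consistent(uint32_t header_lap, const uint32_t *mb_laps, size_t n)
{
	size_t i;

	for (i = 0; i < n; i++) {
		if (mb_laps[i] != header_lap)
			return 0;
	}
	return 1;
}
```

In recover_cache() the first segment that fails this check ends the scan, so neither it nor any later segment is merged into the in-core structures.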
---------- segment.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "segment.h"
/*
* Get the in-core metablock of the given index.
*/
struct metablock *mb_at(struct wb_cache *cache, cache_nr idx)
{
u64 seg_idx = idx / NR_CACHES_INSEG;
struct segment_header *seg =
bigarray_at(cache->segment_header_array, seg_idx);
cache_nr idx_inseg = idx % NR_CACHES_INSEG;
return seg->mb_array + idx_inseg;
}
static void mb_array_empty_init(struct wb_cache *cache)
{
size_t i;
for (i = 0; i < cache->nr_caches; i++) {
struct metablock *mb = mb_at(cache, i);
INIT_HLIST_NODE(&mb->ht_list);
mb->idx = i;
mb->dirty_bits = 0;
}
}
int __must_check init_segment_header_array(struct wb_cache *cache)
{
u64 segment_idx, nr_segments = cache->nr_segments;
cache->segment_header_array =
make_bigarray(sizeof(struct segment_header), nr_segments);
if (!cache->segment_header_array) {
WBERR();
return -ENOMEM;
}
for (segment_idx = 0; segment_idx < nr_segments; segment_idx++) {
struct segment_header *seg =
bigarray_at(cache->segment_header_array, segment_idx);
seg->start_idx = NR_CACHES_INSEG * segment_idx;
seg->start_sector =
((segment_idx % nr_segments) + 1) *
(1 << WB_SEGMENTSIZE_ORDER);
seg->length = 0;
atomic_set(&seg->nr_inflight_ios, 0);
spin_lock_init(&seg->lock);
INIT_LIST_HEAD(&seg->migrate_list);
init_completion(&seg->flush_done);
complete_all(&seg->flush_done);
init_completion(&seg->migrate_done);
complete_all(&seg->migrate_done);
}
mb_array_empty_init(cache);
return 0;
}
/*
* Get the segment from the segment id.
* The index of the segment is calculated from the segment id.
*/
struct segment_header *get_segment_header_by_id(struct wb_cache *cache,
size_t segment_id)
{
struct segment_header *r =
bigarray_at(cache->segment_header_array,
(segment_id - 1) % cache->nr_segments);
return r;
}
u32 calc_segment_lap(struct wb_cache *cache, size_t segment_id)
{
u32 a = (segment_id - 1) / cache->nr_segments;
return a + 1;
}
sector_t calc_mb_start_sector(struct segment_header *seg,
cache_nr mb_idx)
{
size_t k = 1 + (mb_idx % NR_CACHES_INSEG);
return seg->start_sector + (k << 3);
}
sector_t calc_segment_header_start(size_t segment_idx)
{
return (1 << WB_SEGMENTSIZE_ORDER) * (segment_idx + 1);
}
u64 calc_nr_segments(struct dm_dev *dev)
{
sector_t devsize = dm_devsize(dev);
return devsize / (1 << WB_SEGMENTSIZE_ORDER) - 1;
}
bool is_on_buffer(struct wb_cache *cache, cache_nr mb_idx)
{
cache_nr start = cache->current_seg->start_idx;
if (mb_idx < start)
return false;
if (mb_idx >= (start + NR_CACHES_INSEG))
return false;
return true;
}
---------- superblock-recorder.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "superblock-recorder.h"
static void update_superblock_record(struct wb_cache *cache)
{
struct superblock_record_device o;
void *buf;
struct dm_io_request io_req;
struct dm_io_region region;
o.last_migrated_segment_id =
cpu_to_le64(cache->last_migrated_segment_id);
buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO | __GFP_ZERO);
memcpy(buf, &o, sizeof(o));
io_req = (struct dm_io_request) {
.client = wb_io_client,
.bi_rw = WRITE_FUA,
.notify.fn = NULL,
.mem.type = DM_IO_KMEM,
.mem.ptr.addr = buf,
};
region = (struct dm_io_region) {
.bdev = cache->device->bdev,
.sector = (1 << 11) - 1,
.count = 1,
};
dm_safe_io_retry(&io_req, 1, &region, true);
kfree(buf);
}
void recorder_proc(struct work_struct *work)
{
struct wb_cache *cache =
container_of(work, struct wb_cache, recorder_work);
unsigned long intvl;
while (true) {
if (cache->on_terminate)
return;
/* sec -> ms */
intvl = cache->update_record_interval * 1000;
if (!intvl) {
schedule_timeout_interruptible(msecs_to_jiffies(1000));
continue;
}
WBINFO();
update_superblock_record(cache);
schedule_timeout_interruptible(msecs_to_jiffies(intvl));
}
}
---------- target.c ----------
/*
* writeboost
* Log-structured Caching for Linux
*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "target.h"
/*
* <backing dev> <cache dev>
*/
static int writeboost_ctr(struct dm_target *ti, unsigned int argc, char **argv)
{
int r = 0;
bool cache_valid;
struct wb_device *wb;
struct wb_cache *cache;
struct dm_dev *origdev, *cachedev;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 6, 0)
r = dm_set_target_max_io_len(ti, (1 << 3));
if (r) {
WBERR();
return r;
}
#else
ti->split_io = (1 << 3);
#endif
wb = kzalloc(sizeof(*wb), GFP_KERNEL);
if (!wb) {
WBERR();
return -ENOMEM;
}
/*
* EMC's textbook on storage systems says
* that storage should keep its disk
* utilization below 70%.
*/
wb->migrate_threshold = 70;
atomic64_set(&wb->nr_dirty_caches, 0);
r = dm_get_device(ti, argv[0], dm_table_get_mode(ti->table),
&origdev);
if (r) {
WBERR("%d", r);
goto bad_get_device_orig;
}
wb->device = origdev;
wb->cache = NULL;
r = dm_get_device(ti, argv[1], dm_table_get_mode(ti->table),
&cachedev);
if (r) {
WBERR("%d", r);
goto bad_get_device_cache;
}
r = audit_cache_device(cachedev, &cache_valid);
if (r) {
WBERR("%d", r);
/*
* If auditing the cache fails, for example
* with a read I/O error, both formatting the
* device and resuming it on the assumption
* that the cache is valid are dangerous.
* So we quit.
*/
goto bad_audit_cache;
}
if (!cache_valid) {
r = format_cache_device(cachedev);
if (r) {
WBERR("%d", r);
goto bad_format_cache;
}
}
cache = kzalloc(sizeof(*cache), GFP_KERNEL);
if (!cache) {
r = -ENOMEM;
WBERR();
goto bad_alloc_cache;
}
wb->cache = cache;
wb->cache->wb = wb;
r = resume_cache(cache, cachedev);
if (r) {
WBERR("%d", r);
goto bad_resume_cache;
}
wb->ti = ti;
ti->private = wb;
#if LINUX_VERSION_CODE >= PER_BIO_VERSION
ti->per_bio_data_size = sizeof(struct per_bio_data);
#endif
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 9, 0)
ti->num_flush_bios = 1;
ti->num_discard_bios = 1;
#else
ti->num_flush_requests = 1;
ti->num_discard_requests = 1;
#endif
ti->discard_zeroes_data_unsupported = true;
return 0;
bad_resume_cache:
kfree(cache);
bad_alloc_cache:
bad_format_cache:
bad_audit_cache:
dm_put_device(ti, cachedev);
bad_get_device_cache:
dm_put_device(ti, origdev);
bad_get_device_orig:
kfree(wb);
return r;
}
static void writeboost_dtr(struct dm_target *ti)
{
struct wb_device *wb = ti->private;
struct wb_cache *cache = wb->cache;
/*
* Synchronize all the dirty writes
* before termination.
*/
cache->sync_interval = 1;
free_cache(cache);
/* Drop the cache device before freeing the struct that points to it. */
dm_put_device(wb->ti, cache->device);
kfree(cache);
dm_put_device(ti, wb->device);
ti->private = NULL;
kfree(wb);
}
static int writeboost_message(struct dm_target *ti, unsigned argc, char **argv)
{
struct wb_device *wb = ti->private;
struct wb_cache *cache = wb->cache;
char *cmd = argv[0];
unsigned long tmp;
if (!strcasecmp(cmd, "clear_stat")) {
clear_stat(cache);
return 0;
}
/* Every other message takes exactly one numeric argument. */
if (argc != 2)
return -EINVAL;
if (kstrtoul(argv[1], 10, &tmp))
return -EINVAL;
if (!strcasecmp(cmd, "allow_migrate")) {
if (tmp > 1)
return -EINVAL;
cache->allow_migrate = tmp;
return 0;
}
if (!strcasecmp(cmd, "enable_migration_modulator")) {
if (tmp > 1)
return -EINVAL;
cache->enable_migration_modulator = tmp;
return 0;
}
if (!strcasecmp(cmd, "barrier_deadline_ms")) {
if (tmp < 1)
return -EINVAL;
cache->barrier_deadline_ms = tmp;
return 0;
}
if (!strcasecmp(cmd, "nr_max_batched_migration")) {
if (tmp < 1)
return -EINVAL;
cache->nr_max_batched_migration = tmp;
return 0;
}
if (!strcasecmp(cmd, "migrate_threshold")) {
wb->migrate_threshold = tmp;
return 0;
}
if (!strcasecmp(cmd, "update_record_interval")) {
cache->update_record_interval = tmp;
return 0;
}
if (!strcasecmp(cmd, "sync_interval")) {
cache->sync_interval = tmp;
return 0;
}
return -EINVAL;
}
static int writeboost_merge(struct dm_target *ti, struct bvec_merge_data *bvm,
struct bio_vec *biovec, int max_size)
{
struct wb_device *wb = ti->private;
struct dm_dev *device = wb->device;
struct request_queue *q = bdev_get_queue(device->bdev);
if (!q->merge_bvec_fn)
return max_size;
bvm->bi_bdev = device->bdev;
return min(max_size, q->merge_bvec_fn(q, bvm, biovec));
}
static int writeboost_iterate_devices(struct dm_target *ti,
iterate_devices_callout_fn fn, void *data)
{
struct wb_device *wb = ti->private;
struct dm_dev *orig = wb->device;
sector_t start = 0;
sector_t len = dm_devsize(orig);
return fn(ti, orig, start, len, data);
}
static void writeboost_io_hints(struct dm_target *ti,
struct queue_limits *limits)
{
blk_limits_io_min(limits, 512);
blk_limits_io_opt(limits, 4096);
}
static
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 8, 0)
void
#else
int
#endif
writeboost_status(
struct dm_target *ti, status_type_t type,
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 6, 0)
unsigned flags,
#endif
char *result,
unsigned maxlen)
{
unsigned int sz = 0;
struct wb_device *wb = ti->private;
struct wb_cache *cache = wb->cache;
size_t i;
switch (type) {
case STATUSTYPE_INFO:
DMEMIT("%llu %llu %llu %llu %llu %u ",
(long long unsigned int)
atomic64_read(&wb->nr_dirty_caches),
(long long unsigned int) cache->nr_segments,
(long long unsigned int) cache->last_migrated_segment_id,
(long long unsigned int) cache->last_flushed_segment_id,
(long long unsigned int) cache->current_seg->global_id,
(unsigned int) cache->cursor);
for (i = 0; i < STATLEN - 1; i++) {
atomic64_t *v = &cache->stat[i];
DMEMIT("%llu ",
(long long unsigned int) atomic64_read(v));
}
DMEMIT("%d ", 7);
DMEMIT("barrier_deadline_ms %lu ",
cache->barrier_deadline_ms);
DMEMIT("allow_migrate %d ",
cache->allow_migrate ? 1 : 0);
DMEMIT("enable_migration_modulator %d ",
cache->enable_migration_modulator ? 1 : 0);
DMEMIT("migrate_threshold %d ", wb->migrate_threshold);
DMEMIT("nr_cur_batched_migration %lu ",
cache->nr_cur_batched_migration);
DMEMIT("sync_interval %lu ",
cache->sync_interval);
DMEMIT("update_record_interval %lu",
cache->update_record_interval);
break;
case STATUSTYPE_TABLE:
DMEMIT("%s %s", wb->device->name, wb->cache->device->name);
break;
}
#if LINUX_VERSION_CODE < KERNEL_VERSION(3, 8, 0)
return 0;
#endif
}
static struct target_type writeboost_target = {
.name = "writeboost",
.version = {0, 1, 0},
.module = THIS_MODULE,
.map = writeboost_map,
.ctr = writeboost_ctr,
.dtr = writeboost_dtr,
.end_io = writeboost_end_io,
.merge = writeboost_merge,
.message = writeboost_message,
.status = writeboost_status,
.io_hints = writeboost_io_hints,
.iterate_devices = writeboost_iterate_devices,
};
struct dm_io_client *wb_io_client;
struct workqueue_struct *safe_io_wq;
static int __init writeboost_module_init(void)
{
int r = 0;
r = dm_register_target(&writeboost_target);
if (r < 0) {
WBERR("%d", r);
return r;
}
r = -ENOMEM;
safe_io_wq = alloc_workqueue("safeiowq",
WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
if (!safe_io_wq) {
WBERR();
goto bad_wq;
}
wb_io_client = dm_io_client_create();
if (IS_ERR(wb_io_client)) {
WBERR();
r = PTR_ERR(wb_io_client);
goto bad_io_client;
}
return 0;
bad_io_client:
destroy_workqueue(safe_io_wq);
bad_wq:
dm_unregister_target(&writeboost_target);
return r;
}
static void __exit writeboost_module_exit(void)
{
dm_io_client_destroy(wb_io_client);
destroy_workqueue(safe_io_wq);
dm_unregister_target(&writeboost_target);
}
module_init(writeboost_module_init);
module_exit(writeboost_module_exit);
MODULE_AUTHOR("Akira Hayakawa <ruby.wktk@gmail.com>");
MODULE_DESCRIPTION(DM_NAME " writeboost target");
MODULE_LICENSE("GPL");
---------- util.c ----------
/*
* Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@gmail.com>
*
* This file is released under the GPL.
*/
#include "util.h"
void *do_kmalloc_retry(size_t size, gfp_t flags, int lineno)
{
size_t count = 0;
void *p;
retry_alloc:
p = kmalloc(size, flags);
if (!p) {
count++;
WBWARN("L%d size:%zu, count:%zu",
lineno, size, count);
schedule_timeout_interruptible(msecs_to_jiffies(1));
goto retry_alloc;
}
return p;
}
struct safe_io {
struct work_struct work;
int err;
unsigned long err_bits;
struct dm_io_request *io_req;
unsigned num_regions;
struct dm_io_region *regions;
};
static void safe_io_proc(struct work_struct *work)
{
struct safe_io *io = container_of(work, struct safe_io, work);
io->err_bits = 0;
io->err = dm_io(io->io_req, io->num_regions, io->regions,
&io->err_bits);
}
/*
* dm_io wrapper.
* @thread: run the operation on a workqueue thread to avoid deadlock.
*/
int dm_safe_io_internal(
struct dm_io_request *io_req,
unsigned num_regions, struct dm_io_region *regions,
unsigned long *err_bits, bool thread, int lineno)
{
int err;
dev_t dev;
if (thread) {
struct safe_io io = {
.io_req = io_req,
.regions = regions,
.num_regions = num_regions,
};
INIT_WORK_ONSTACK(&io.work, safe_io_proc);
queue_work(safe_io_wq, &io.work);
flush_work(&io.work);
destroy_work_on_stack(&io.work);
err = io.err;
if (err_bits)
*err_bits = io.err_bits;
} else {
err = dm_io(io_req, num_regions, regions, err_bits);
}
dev = regions->bdev->bd_dev;
/* dm_io routines permit NULL for the err_bits pointer. */
if (err || (err_bits && *err_bits)) {
unsigned long eb;
if (!err_bits)
eb = (~(unsigned long)0);
else
eb = *err_bits;
WBERR("L%d err(%d, %lu), rw(%d), sector(%llu), dev(%u:%u)",
lineno, err, eb,
io_req->bi_rw, (unsigned long long) regions->sector,
MAJOR(dev), MINOR(dev));
}
return err;
}
void dm_safe_io_retry_internal(
struct dm_io_request *io_req,
unsigned num_regions, struct dm_io_region *regions,
bool thread, int lineno)
{
int err, count = 0;
unsigned long err_bits;
dev_t dev;
retry_io:
err_bits = 0;
err = dm_safe_io_internal(io_req, num_regions, regions, &err_bits,
thread, lineno);
dev = regions->bdev->bd_dev;
if (err || err_bits) {
count++;
WBWARN("L%d count(%d)", lineno, count);
schedule_timeout_interruptible(msecs_to_jiffies(1000));
goto retry_io;
}
if (count) {
WBWARN("L%d rw(%d), sector(%llu), dev(%u:%u)",
lineno,
io_req->bi_rw, (unsigned long long) regions->sector,
MAJOR(dev), MINOR(dev));
}
}
sector_t dm_devsize(struct dm_dev *dev)
{
return i_size_read(dev->bdev->bd_inode) >> SECTOR_SHIFT;
}