qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
* [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration
@ 2015-02-02 11:05 Liang Li
  2015-02-02 11:05 ` [Qemu-devel] [v4 01/13] docs: Add a doc about multiple thread compression Liang Li
                   ` (12 more replies)
  0 siblings, 13 replies; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel; +Cc: quintela, Liang Li, armbru, dgilbert, amit.shah, lcapitulino

This feature can help to reduce the data transferred about 60%, and the
migration time can also be reduced about 70%.

    Summary of changed from v3->v4
    
    -Update the test data in the document
    -Fix some typo errors 
    -Use compressBound instead of MACRO define
    -Optimize the performance when compression co-work with XBZRLE
    -Added some comments
    -Rename some functions and variables
    -Incorrect coding style fix
    -QMP type change for compatibility
    
-- 
1.9.1

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 01/13] docs: Add a doc about multiple thread compression
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-02 11:05 ` [Qemu-devel] [v4 02/13] migration: Add the framework of multi-thread compression Liang Li
                   ` (11 subsequent siblings)
  12 siblings, 0 replies; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Give some details about the multiple thread (de)compression and
how to use it in live migration.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
Reviewed-by: Dr.David Alan Gilbert <dgilbert@redhat.com>
---
 docs/multi-thread-compression.txt | 149 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 149 insertions(+)
 create mode 100644 docs/multi-thread-compression.txt

diff --git a/docs/multi-thread-compression.txt b/docs/multi-thread-compression.txt
new file mode 100644
index 0000000..0d4d212
--- /dev/null
+++ b/docs/multi-thread-compression.txt
@@ -0,0 +1,149 @@
+Use multiple thread (de)compression in live migration
+=====================================================
+Copyright (C) 2015 Intel Corporation
+Author: Liang Li <liang.z.li@intel.com>
+
+This work is licensed under the terms of the GNU GPLv2 or later. See
+the COPYING file in the top-level directory.
+
+Contents:
+=========
+* Introduction
+* When to use
+* Performance
+* Usage
+* TODO
+
+Introduction
+============
+Instead of sending the guest memory directly, this solution will
+compress the RAM page before sending; after receiving, the data will
+be decompressed. Using compression in live migration can help
+to reduce the data transferred about 60%, this is very useful when the
+bandwidth is limited, and the total migration time can also be reduced
+about 70% in a typical case. In addition to this, the VM downtime can be
+reduced about 50%. The benefit depends on data's compressibility in VM.
+
+The process of compression will consume additional CPU cycles, and the
+extra CPU cycles will increase the migration time. On the other hand,
+the amount of data transferred will decrease; this factor can reduce
+the total migration time. If the process of the compression is quick
+enough, then the total migration time can be reduced, and multiple
+thread compression can be used to accelerate the compression process.
+
+The decompression speed of Zlib is at least 4 times as quick as
+compression, if the source and destination CPU have equal speed,
+keeping the compression thread count 4 times the decompression
+thread count can avoid CPU waste.
+
+Compression level can be used to control the compression speed and the
+compression ratio. High compression ratio will take more time, level 0
+stands for no compression, level 1 stands for the best compression
+speed, and level 9 stands for the best compression ratio. Users can
+select a level number between 0 and 9.
+
+
+When to use the multiple thread compression in live migration
+=============================================================
+Compression of data will consume extra CPU cycles; so in a system with
+high overhead of CPU, avoid using this feature. When the network
+bandwidth is very limited and the CPU resource is adequate, use of
+multiple thread compression will be very helpful. If both the CPU and
+the network bandwidth are adequate, use of multiple thread compression
+can still help to reduce the migration time.
+
+Performance
+===========
+Test environment:
+
+CPU: Intel(R) Xeon(R) CPU E5-2680 0 @ 2.70GHz
+Socket Count: 2
+RAM: 128G
+NIC: Intel I350 (10/100/1000Mbps)
+Host OS: CentOS 7 64-bit
+Guest OS: RHEL 6.5 64-bit
+Parameter: qemu-system-x86_64 -enable-kvm -smp 4 -m 4096
+ /share/ia32e_rhel6u5.qcow -monitor stdio
+
+There is no additional application is running on the guest when doing
+the test.
+
+
+Speed limit: 1000Gb/s
+---------------------------------------------------------------
+                    | original  | compress thread: 8
+                    |   way     | decompress thread: 2
+                    |           | compression level: 1
+---------------------------------------------------------------
+total time(msec):   |   3333    |  1833
+---------------------------------------------------------------
+downtime(msec):     |    100    |   27
+---------------------------------------------------------------
+transferred ram(kB):|  363536   | 107819
+---------------------------------------------------------------
+throughput(mbps):   |  893.73   | 482.22
+---------------------------------------------------------------
+total ram(kB):      |  4211524  | 4211524
+---------------------------------------------------------------
+
+There is an application running on the guest which write random numbers
+to RAM block areas periodically.
+
+Speed limit: 1000Gb/s
+---------------------------------------------------------------
+                    | original  | compress thread: 8
+                    |   way     | decompress thread: 2
+                    |           | compression level: 1
+---------------------------------------------------------------
+total time(msec):   |   37369   | 15989
+---------------------------------------------------------------
+downtime(msec):     |    337    |  173
+---------------------------------------------------------------
+transferred ram(kB):|  4274143  | 1699824
+---------------------------------------------------------------
+throughput(mbps):   |  936.99   | 870.95
+---------------------------------------------------------------
+total ram(kB):      |  4211524  | 4211524
+---------------------------------------------------------------
+
+Usage
+=====
+1. Verify both the source and destination QEMU are able
+to support the multiple thread compression migration:
+    {qemu} info_migrate_capabilities
+    {qemu} ... compress: off ...
+
+2. Activate compression on the source:
+    {qemu} migrate_set_capability compress on
+
+3. Set the compression thread count on source:
+    {qemu} migrate_set_parameter compress_threads 12
+
+4. Set the compression level on the source:
+    {qemu} migrate_set_parameter compress_level 1
+
+5. Set the decompression thread count on destination:
+    {qemu} migrate_set_parameter decompress_threads 3
+
+6. Start outgoing migration:
+    {qemu} migrate -d tcp:destination.host:4444
+    {qemu} info migrate
+    Capabilities: ... compress: on
+    ...
+
+The following are the default settings:
+    compress: off
+    compress_threads: 8
+    decompress_threads: 2
+    compress_level: 1 (which means best speed)
+
+So, only the first two steps are required to use the multiple
+thread compression in migration. You can do more if the default
+settings are not appropriate.
+
+TODO
+====
+Some faster (de)compression method such as LZ4 and Quicklz can help
+to reduce the CPU consumption when doing (de)compression. If using
+these faster (de)compression method, less (de)compression threads
+are needed when doing the migration.
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 02/13] migration: Add the framework of multi-thread compression
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
  2015-02-02 11:05 ` [Qemu-devel] [v4 01/13] docs: Add a doc about multiple thread compression Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 10:11   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 03/13] migration: Add the framework of multi-thread decompression Liang Li
                   ` (10 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Add the code to create and destroy the multiple threads those will
be used to do data compression. Left some functions empty to keep
clearness, and the code will be added later.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c                   | 79 ++++++++++++++++++++++++++++++++++++++++++-
 include/migration/migration.h |  9 +++++
 migration/migration.c         | 37 ++++++++++++++++++++
 3 files changed, 124 insertions(+), 1 deletion(-)

diff --git a/arch_init.c b/arch_init.c
index 89c8fa4..1831f1a 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -332,6 +332,68 @@ static uint64_t migration_dirty_pages;
 static uint32_t last_version;
 static bool ram_bulk_stage;
 
+struct CompressParam {
+    /* To be done */
+};
+typedef struct CompressParam CompressParam;
+
+static CompressParam *comp_param;
+static bool quit_thread;
+
+static void *do_data_compress(void *opaque)
+{
+    while (!quit_thread) {
+
+    /* To be done */
+
+    }
+
+    return NULL;
+}
+
+static inline void terminate_compression_threads(void)
+{
+    quit_thread = true;
+
+    /* To be done */
+}
+
+void migrate_compress_threads_join(MigrationState *s)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    terminate_compression_threads();
+    thread_count = migrate_compress_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(s->compress_thread + i);
+    }
+    g_free(s->compress_thread);
+    g_free(comp_param);
+    s->compress_thread = NULL;
+    comp_param = NULL;
+}
+
+void migrate_compress_threads_create(MigrationState *s)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    quit_thread = false;
+    thread_count = migrate_compress_threads();
+    s->compress_thread = g_new0(QemuThread, thread_count);
+    comp_param = g_new0(CompressParam, thread_count);
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_create(s->compress_thread + i, "compress",
+                           do_data_compress, comp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
 /* Update the xbzrle cache to reflect a page that's been sent as all 0.
  * The important thing is that a stale (not-yet-0'd) page be replaced
  * by the new data.
@@ -645,6 +707,16 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
     return bytes_sent;
 }
 
+static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
+                                    ram_addr_t offset, bool last_stage)
+{
+    int bytes_sent = 0;
+
+    /* To be done*/
+
+    return bytes_sent;
+}
+
 /*
  * ram_find_and_save_block: Finds a page to send and sends it to f
  *
@@ -679,7 +751,12 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
                 ram_bulk_stage = false;
             }
         } else {
-            bytes_sent = ram_save_page(f, block, offset, last_stage);
+            if (migrate_use_compression()) {
+                bytes_sent = ram_save_compressed_page(f, block, offset,
+                                                      last_stage);
+            } else {
+                bytes_sent = ram_save_page(f, block, offset, last_stage);
+            }
 
             /* if page is unmodified, continue to the next */
             if (bytes_sent > 0) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3cb5ba8..daf6c81 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -49,6 +49,9 @@ struct MigrationState
     QemuThread thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
+    QemuThread *compress_thread;
+    int compress_thread_count;
+    int compress_level;
 
     int state;
     MigrationParams params;
@@ -107,6 +110,8 @@ bool migration_has_finished(MigrationState *);
 bool migration_has_failed(MigrationState *);
 MigrationState *migrate_get_current(void);
 
+void migrate_compress_threads_create(MigrationState *s);
+void migrate_compress_threads_join(MigrationState *s);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_transferred(void);
 uint64_t ram_bytes_total(void);
@@ -156,6 +161,10 @@ int64_t migrate_xbzrle_cache_size(void);
 
 int64_t xbzrle_cache_resize(int64_t new_size);
 
+bool migrate_use_compression(void);
+int migrate_compress_level(void);
+int migrate_compress_threads(void);
+
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_load_hook(QEMUFile *f, uint64_t flags);
diff --git a/migration/migration.c b/migration/migration.c
index b3adbc6..309443e 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -43,6 +43,11 @@ enum {
 #define BUFFER_DELAY     100
 #define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY)
 
+/* Default compression thread count */
+#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
+#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
+
 /* Migration XBZRLE default cache size */
 #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
 
@@ -60,6 +65,8 @@ MigrationState *migrate_get_current(void)
         .bandwidth_limit = MAX_THROTTLE,
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
+        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
     };
 
     return &current_migration;
@@ -302,6 +309,7 @@ static void migrate_fd_cleanup(void *opaque)
         qemu_thread_join(&s->thread);
         qemu_mutex_lock_iothread();
 
+        migrate_compress_threads_join(s);
         qemu_fclose(s->file);
         s->file = NULL;
     }
@@ -385,6 +393,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
     int64_t bandwidth_limit = s->bandwidth_limit;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
+    int compress_level = s->compress_level;
+    int compress_thread_count = s->compress_thread_count;
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -395,6 +405,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
            sizeof(enabled_capabilities));
     s->xbzrle_cache_size = xbzrle_cache_size;
 
+    s->compress_level = compress_level;
+    s->compress_thread_count = compress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIG_STATE_SETUP;
     trace_migrate_set_state(MIG_STATE_SETUP);
@@ -567,6 +579,30 @@ bool migrate_zero_blocks(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
 }
 
+bool migrate_use_compression(void)
+{
+    /* Disable compression before the patch series are applied */
+    return false;
+}
+
+int migrate_compress_level(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_level;
+}
+
+int migrate_compress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_thread_count;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -707,6 +743,7 @@ void migrate_fd_connect(MigrationState *s)
     /* Notify before starting migration thread */
     notifier_list_notify(&migration_state_notifiers, s);
 
+    migrate_compress_threads_create(s);
     qemu_thread_create(&s->thread, "migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
 }
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 03/13] migration: Add the framework of multi-thread decompression
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
  2015-02-02 11:05 ` [Qemu-devel] [v4 01/13] docs: Add a doc about multiple thread compression Liang Li
  2015-02-02 11:05 ` [Qemu-devel] [v4 02/13] migration: Add the framework of multi-thread compression Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 10:16   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 04/13] qemu-file: Add compression functions to QEMUFile Liang Li
                   ` (9 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Add the code to create and destroy the multiple threads those will be
used to do data decompression. Left some functions empty just to keep
clearness, and the code will be added later.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c                   | 75 +++++++++++++++++++++++++++++++++++++++++++
 include/migration/migration.h |  4 +++
 migration/migration.c         | 16 +++++++++
 3 files changed, 95 insertions(+)

diff --git a/arch_init.c b/arch_init.c
index 1831f1a..ed34eb3 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -24,6 +24,7 @@
 #include <stdint.h>
 #include <stdarg.h>
 #include <stdlib.h>
+#include <zlib.h>
 #ifndef _WIN32
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_CONTINUE 0x20
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 
 static struct defconfig_file {
     const char *filename;
@@ -337,8 +339,16 @@ struct CompressParam {
 };
 typedef struct CompressParam CompressParam;
 
+struct DecompressParam {
+    /* To be done */
+};
+typedef struct DecompressParam DecompressParam;
+
 static CompressParam *comp_param;
 static bool quit_thread;
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+static uint8_t *compressed_data_buf;
 
 static void *do_data_compress(void *opaque)
 {
@@ -1128,10 +1138,58 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+static void *do_data_decompress(void *opaque)
+{
+    while (!quit_thread) {
+        /* To be done */
+    }
+
+    return NULL;
+}
+
+void migrate_decompress_threads_create(int count)
+{
+    int i;
+
+    decompress_threads = g_new0(QemuThread, count);
+    decomp_param = g_new0(DecompressParam, count);
+    compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+    quit_thread = false;
+    for (i = 0; i < count; i++) {
+        qemu_thread_create(decompress_threads + i, "decompress",
+                           do_data_decompress, decomp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+void migrate_decompress_threads_join(void)
+{
+    int i, thread_count;
+
+    quit_thread = true;
+    thread_count = migrate_decompress_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(decompress_threads + i);
+    }
+    g_free(decompress_threads);
+    g_free(decomp_param);
+    g_free(compressed_data_buf);
+    decompress_threads = NULL;
+    decomp_param = NULL;
+    compressed_data_buf = NULL;
+}
+
+static void decompress_data_with_multi_threads(uint8_t *compbuf,
+                                               void *host, int len)
+{
+    /* To be done */
+}
+
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
     int flags = 0, ret = 0;
     static uint64_t seq_iter;
+    int len = 0;
 
     seq_iter++;
 
@@ -1208,6 +1266,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
+        case RAM_SAVE_FLAG_COMPRESS_PAGE:
+            host = host_from_stream_offset(f, addr, flags);
+            if (!host) {
+                error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
+                ret = -EINVAL;
+                break;
+            }
+
+            len = qemu_get_be32(f);
+            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+                error_report("Invalid compressed data length: %d", len);
+                ret = -EINVAL;
+                break;
+            }
+            qemu_get_buffer(f, compressed_data_buf, len);
+            decompress_data_with_multi_threads(compressed_data_buf, host, len);
+            break;
         case RAM_SAVE_FLAG_XBZRLE:
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index daf6c81..0c4f21c 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -51,6 +51,7 @@ struct MigrationState
     QEMUFile *file;
     QemuThread *compress_thread;
     int compress_thread_count;
+    int decompress_thread_count;
     int compress_level;
 
     int state;
@@ -112,6 +113,8 @@ MigrationState *migrate_get_current(void);
 
 void migrate_compress_threads_create(MigrationState *s);
 void migrate_compress_threads_join(MigrationState *s);
+void migrate_decompress_threads_create(int count);
+void migrate_decompress_threads_join(void);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_transferred(void);
 uint64_t ram_bytes_total(void);
@@ -164,6 +167,7 @@ int64_t xbzrle_cache_resize(int64_t new_size);
 bool migrate_use_compression(void);
 int migrate_compress_level(void);
 int migrate_compress_threads(void);
+int migrate_decompress_threads(void);
 
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
diff --git a/migration/migration.c b/migration/migration.c
index 309443e..a6f6e02 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -45,6 +45,7 @@ enum {
 
 /* Default compression thread count */
 #define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
 /*0: means nocompress, 1: best speed, ... 9: best compress ratio */
 #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
 
@@ -66,6 +67,7 @@ MigrationState *migrate_get_current(void)
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
         .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
         .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
     };
 
@@ -123,12 +125,15 @@ static void process_incoming_migration_co(void *opaque)
     } else {
         runstate_set(RUN_STATE_PAUSED);
     }
+    migrate_decompress_threads_join();
 }
 
 void process_incoming_migration(QEMUFile *f)
 {
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
     int fd = qemu_get_fd(f);
+    int thread_count = migrate_decompress_threads();
+    migrate_decompress_threads_create(thread_count);
 
     assert(fd != -1);
     qemu_set_nonblock(fd);
@@ -395,6 +400,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
     int compress_level = s->compress_level;
     int compress_thread_count = s->compress_thread_count;
+    int decompress_thread_count = s->decompress_thread_count;
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -407,6 +413,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
 
     s->compress_level = compress_level;
     s->compress_thread_count = compress_thread_count;
+    s->decompress_thread_count = decompress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIG_STATE_SETUP;
     trace_migrate_set_state(MIG_STATE_SETUP);
@@ -603,6 +610,15 @@ int migrate_compress_threads(void)
     return s->compress_thread_count;
 }
 
+int migrate_decompress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->decompress_thread_count;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 04/13] qemu-file: Add compression functions to QEMUFile
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (2 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 03/13] migration: Add the framework of multi-thread decompression Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 10:33   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 05/13] arch_init: Alloc and free data struct for compression Liang Li
                   ` (8 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

qemu_put_compression_data() compress the data and put it to QEMUFile.
qemu_put_qemu_file() put the data in the buffer of source QEMUFile to
destination QEMUFile.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 include/migration/qemu-file.h |  3 +++
 migration/qemu-file.c         | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index d843c00..2204fb9 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -159,6 +159,9 @@ void qemu_put_be32(QEMUFile *f, unsigned int v);
 void qemu_put_be64(QEMUFile *f, uint64_t v);
 int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset);
 int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size);
+size_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
+                                 int level);
+int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
 /*
  * Note that you can only peek continuous bytes from where the current pointer
  * is; you aren't guaranteed to be able to peak to +n bytes unless you've
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index edc2830..de2da2d 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -21,6 +21,7 @@
  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  * THE SOFTWARE.
  */
+#include <zlib.h>
 #include "qemu-common.h"
 #include "qemu/iov.h"
 #include "qemu/sockets.h"
@@ -529,3 +530,41 @@ uint64_t qemu_get_be64(QEMUFile *f)
     v |= qemu_get_be32(f);
     return v;
 }
+
+/* compress size bytes of data start at p with specific compression
+ * leve and store the compressed data to the buffer of f.
+ */
+
+size_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
+                                 int level)
+{
+    size_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
+
+    if (blen < compressBound(size)) {
+        return 0;
+    }
+    if (compress2(f->buf + f->buf_index + sizeof(int32_t), &blen, (Bytef *)p,
+                  size, level) != Z_OK) {
+        error_report("Compress Failed!");
+        return 0;
+    }
+    qemu_put_be32(f, blen);
+    f->buf_index += blen;
+    return blen + sizeof(int32_t);
+}
+
+/* Put the data in the buffer of f_src to the buffer of f_des, and
+ * then reset the buf_index of f_src to 0.
+ */
+
+int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
+{
+    int len = 0;
+
+    if (f_src->buf_index > 0) {
+        len = f_src->buf_index;
+        qemu_put_buffer(f_des, f_src->buf, f_src->buf_index);
+        f_src->buf_index = 0;
+    }
+    return len;
+}
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 05/13] arch_init: Alloc and free data struct for compression
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (3 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 04/13] qemu-file: Add compression functions to QEMUFile Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 10:45   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 06/13] arch_init: Add and free data struct for decompression Liang Li
                   ` (7 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Define the data structure and variables used to do multiple thread
compression, and add the code to initialize and free them.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 34 +++++++++++++++++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/arch_init.c b/arch_init.c
index ed34eb3..87c4947 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -335,7 +335,12 @@ static uint32_t last_version;
 static bool ram_bulk_stage;
 
 struct CompressParam {
-    /* To be done */
+    bool busy;
+    QEMUFile *file;
+    QemuMutex mutex;
+    QemuCond cond;
+    RAMBlock *block;
+    ram_addr_t offset;
 };
 typedef struct CompressParam CompressParam;
 
@@ -345,6 +350,14 @@ struct DecompressParam {
 typedef struct DecompressParam DecompressParam;
 
 static CompressParam *comp_param;
+/* comp_done_cond is used to wake up the migration thread when
+ * one of the compression threads has finished the compression.
+ * comp_done_lock is used to co-work with comp_done_cond.
+ */
+static QemuMutex *comp_done_lock;
+static QemuCond *comp_done_cond;
+/* The empty QEMUFileOps will be used by file in CompressParam */
+static const QEMUFileOps empty_ops = { };
 static bool quit_thread;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
@@ -379,11 +392,20 @@ void migrate_compress_threads_join(MigrationState *s)
     thread_count = migrate_compress_threads();
     for (i = 0; i < thread_count; i++) {
         qemu_thread_join(s->compress_thread + i);
+        qemu_fclose(comp_param[i].file);
+        qemu_mutex_destroy(&comp_param[i].mutex);
+        qemu_cond_destroy(&comp_param[i].cond);
     }
+    qemu_mutex_destroy(comp_done_lock);
+    qemu_cond_destroy(comp_done_cond);
     g_free(s->compress_thread);
     g_free(comp_param);
+    g_free(comp_done_cond);
+    g_free(comp_done_lock);
     s->compress_thread = NULL;
     comp_param = NULL;
+    comp_done_cond = NULL;
+    comp_done_lock = NULL;
 }
 
 void migrate_compress_threads_create(MigrationState *s)
@@ -397,7 +419,17 @@ void migrate_compress_threads_create(MigrationState *s)
     thread_count = migrate_compress_threads();
     s->compress_thread = g_new0(QemuThread, thread_count);
     comp_param = g_new0(CompressParam, thread_count);
+    comp_done_cond = g_new0(QemuCond, 1);
+    comp_done_lock = g_new0(QemuMutex, 1);
+    qemu_cond_init(comp_done_cond);
+    qemu_mutex_init(comp_done_lock);
     for (i = 0; i < thread_count; i++) {
+        /* com_param[i].file is just used as a dummy buffer to save data, set
+         * it's ops to empty.
+         */
+        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
+        qemu_mutex_init(&comp_param[i].mutex);
+        qemu_cond_init(&comp_param[i].cond);
         qemu_thread_create(s->compress_thread + i, "compress",
                            do_data_compress, comp_param + i,
                            QEMU_THREAD_JOINABLE);
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 06/13] arch_init: Add and free data struct for decompression
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (4 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 05/13] arch_init: Alloc and free data struct for compression Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 10:46   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 07/13] migration: Split the function ram_save_page Liang Li
                   ` (6 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Define the data structure and variables used to do multiple thread
decompression, and add the code to initialize and free them.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/arch_init.c b/arch_init.c
index 87c4947..500f299 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -345,7 +345,12 @@ struct CompressParam {
 typedef struct CompressParam CompressParam;
 
 struct DecompressParam {
-    /* To be done */
+    bool busy;
+    QemuMutex mutex;
+    QemuCond cond;
+    void *des;
+    uint8 *compbuf;
+    int len;
 };
 typedef struct DecompressParam DecompressParam;
 
@@ -1188,6 +1193,9 @@ void migrate_decompress_threads_create(int count)
     compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
     quit_thread = false;
     for (i = 0; i < count; i++) {
+        qemu_mutex_init(&decomp_param[i].mutex);
+        qemu_cond_init(&decomp_param[i].cond);
+        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
         qemu_thread_create(decompress_threads + i, "decompress",
                            do_data_decompress, decomp_param + i,
                            QEMU_THREAD_JOINABLE);
@@ -1202,6 +1210,9 @@ void migrate_decompress_threads_join(void)
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
         qemu_thread_join(decompress_threads + i);
+        qemu_mutex_destroy(&decomp_param[i].mutex);
+        qemu_cond_destroy(&decomp_param[i].cond);
+        g_free(decomp_param[i].compbuf);
     }
     g_free(decompress_threads);
     g_free(decomp_param);
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 07/13] migration: Split the function ram_save_page
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (5 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 06/13] arch_init: Add and free data struct for decompression Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 11:01   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression Liang Li
                   ` (5 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Split the function ram_save_page for code reuse purpose.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 102 +++++++++++++++++++++++++++++++++---------------------------
 1 file changed, 56 insertions(+), 46 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 500f299..eae082b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -595,6 +595,58 @@ static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length)
     }
 }
 
+static int save_zero_and_xbzrle_page(QEMUFile *f, uint8_t **current_data,
+                                     RAMBlock *block, ram_addr_t offset,
+                                     bool last_stage, bool *send_async)
+{
+    int bytes_sent = -1;
+    int cont, ret;
+    ram_addr_t current_addr;
+
+    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+
+    /* In doubt sent page as normal */
+    ret = ram_control_save_page(f, block->offset,
+                                offset, TARGET_PAGE_SIZE, &bytes_sent);
+
+    XBZRLE_cache_lock();
+
+    current_addr = block->offset + offset;
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_sent > 0) {
+                acct_info.norm_pages++;
+            } else if (bytes_sent == 0) {
+                acct_info.dup_pages++;
+            }
+        }
+    } else if (is_zero_range(*current_data, TARGET_PAGE_SIZE)) {
+        acct_info.dup_pages++;
+        bytes_sent = save_block_hdr(f, block, offset, cont,
+                                    RAM_SAVE_FLAG_COMPRESS);
+        qemu_put_byte(f, 0);
+        bytes_sent++;
+        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+         * page would be stale
+         */
+        xbzrle_cache_zero_page(current_addr);
+    } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
+        bytes_sent = save_xbzrle_page(f, current_data, current_addr, block,
+                                      offset, cont, last_stage);
+        if (!last_stage) {
+            /* Can't send this cached data async, since the cache page
+             * might get updated before it gets to the wire
+             */
+            if (send_async != NULL) {
+                *send_async = false;
+            }
+        }
+    }
+
+    XBZRLE_cache_unlock();
+
+    return bytes_sent;
+}
 
 /* Needs iothread lock! */
 /* Fix me: there are too many global variables used in migration process. */
@@ -685,60 +737,20 @@ static void migration_bitmap_sync(void)
  *
  * Returns: Number of bytes written.
  */
-static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
+static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
                          bool last_stage)
 {
     int bytes_sent;
     int cont;
-    ram_addr_t current_addr;
     MemoryRegion *mr = block->mr;
     uint8_t *p;
-    int ret;
     bool send_async = true;
 
-    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
-
     p = memory_region_get_ram_ptr(mr) + offset;
-
-    /* In doubt sent page as normal */
-    bytes_sent = -1;
-    ret = ram_control_save_page(f, block->offset,
-                           offset, TARGET_PAGE_SIZE, &bytes_sent);
-
-    XBZRLE_cache_lock();
-
-    current_addr = block->offset + offset;
-    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
-        if (ret != RAM_SAVE_CONTROL_DELAYED) {
-            if (bytes_sent > 0) {
-                acct_info.norm_pages++;
-            } else if (bytes_sent == 0) {
-                acct_info.dup_pages++;
-            }
-        }
-    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
-        acct_info.dup_pages++;
-        bytes_sent = save_block_hdr(f, block, offset, cont,
-                                    RAM_SAVE_FLAG_COMPRESS);
-        qemu_put_byte(f, 0);
-        bytes_sent++;
-        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
-         * page would be stale
-         */
-        xbzrle_cache_zero_page(current_addr);
-    } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
-        bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
-                                      offset, cont, last_stage);
-        if (!last_stage) {
-            /* Can't send this cached data async, since the cache page
-             * might get updated before it gets to the wire
-             */
-            send_async = false;
-        }
-    }
-
-    /* XBZRLE overflow or normal page */
+    bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
+                                           last_stage, &send_async);
     if (bytes_sent == -1) {
+        cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
         bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_PAGE);
         if (send_async) {
             qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
@@ -749,8 +761,6 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
         acct_info.norm_pages++;
     }
 
-    XBZRLE_cache_unlock();
-
     return bytes_sent;
 }
 
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (6 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 07/13] migration: Split the function ram_save_page Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 12:12   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 09/13] migration: Make compression co-work with xbzrle Liang Li
                   ` (4 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Implement the core logic of the multiple thread compression. At this
point, multiple thread compression can't co-work with xbzrle yet.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 159 insertions(+), 8 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index eae082b..b8bdb16 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -364,16 +364,31 @@ static QemuCond *comp_done_cond;
 /* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
 static bool quit_thread;
+static int one_byte_count;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static uint8_t *compressed_data_buf;
 
+static int do_compress_ram_page(CompressParam *param);
+
 static void *do_data_compress(void *opaque)
 {
-    while (!quit_thread) {
-
-    /* To be done */
+    CompressParam *param = opaque;
 
+    while (!quit_thread) {
+        qemu_mutex_lock(&param->mutex);
+        while (!param->busy) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            if (quit_thread) {
+                break;
+            }
+        }
+        qemu_mutex_unlock(&param->mutex);
+        do_compress_ram_page(param);
+        qemu_mutex_lock(comp_done_lock);
+        param->busy = false;
+        qemu_cond_signal(comp_done_cond);
+        qemu_mutex_unlock(comp_done_lock);
     }
 
     return NULL;
@@ -381,9 +396,13 @@ static void *do_data_compress(void *opaque)
 
 static inline void terminate_compression_threads(void)
 {
-    quit_thread = true;
+    int idx, thread_count;
 
-    /* To be done */
+    thread_count = migrate_compress_threads();
+    quit_thread = true;
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_cond_signal(&comp_param[idx].cond);
+    }
 }
 
 void migrate_compress_threads_join(MigrationState *s)
@@ -764,12 +783,144 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
     return bytes_sent;
 }
 
+static int do_compress_ram_page(CompressParam *param)
+{
+    int bytes_sent, cont;
+    int blen;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+    p = memory_region_get_ram_ptr(block->mr) + offset;
+
+    bytes_sent = save_block_hdr(param->file, block, offset, cont,
+                                RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+                                     migrate_compress_level());
+    bytes_sent += blen;
+    atomic_inc(&acct_info.norm_pages);
+
+    return bytes_sent;
+}
+
+static inline void start_compression(CompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->busy = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        if (comp_param[idx].busy) {
+            qemu_mutex_lock(comp_done_lock);
+            while (comp_param[idx].busy) {
+                qemu_cond_wait(comp_done_cond, comp_done_lock);
+            }
+            qemu_mutex_unlock(comp_done_lock);
+        }
+        len = qemu_put_qemu_file(f, comp_param[idx].file);
+        bytes_transferred += len;
+    }
+    if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
+        bytes_transferred -= one_byte_count;
+        one_byte_count = 0;
+    }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset)
+{
+    int idx, thread_count, bytes_sent = 0;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(comp_done_lock);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (!comp_param[idx].busy) {
+                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx], block, offset);
+                start_compression(&comp_param[idx]);
+                if (bytes_sent == 0) {
+                    /* set bytes_sent to 1 in this case to prevent migration
+                     * from terminating, this 1 byte whill be added to
+                     * bytes_transferred later, minus 1 to keep the
+                     * bytes_transferred accurate */
+                    bytes_sent = 1;
+                    if (bytes_transferred <= 0) {
+                        one_byte_count++;
+                    } else {
+                        bytes_transferred -= 1;
+                    }
+                }
+                break;
+            }
+        }
+        if (bytes_sent > 0) {
+            break;
+        } else {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    return bytes_sent;
+}
+
 static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
                                     ram_addr_t offset, bool last_stage)
 {
     int bytes_sent = 0;
+    MemoryRegion *mr = block->mr;
+    uint8_t *p;
 
-    /* To be done*/
+    p = memory_region_get_ram_ptr(mr) + offset;
+    /* When starting the process of a new block, the first page of
+     * the block should be sent out before other pages in the same
+     * block, and all the pages in last block should have been sent
+     * out, keeping this order is important, because the 'cont' flag
+     * is used to avoid resending the block name.
+     */
+    if (block != last_sent_block) {
+        flush_compressed_data(f);
+        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
+                                               last_stage, NULL);
+        if (bytes_sent == -1) {
+            set_compress_params(&comp_param[0], block, offset);
+            /* Use the qemu thread to compress the data to make sure the
+             * first page is sent out before other pages
+             */
+            bytes_sent = do_compress_ram_page(&comp_param[0]);
+            if (bytes_sent > 0) {
+                qemu_put_qemu_file(f, comp_param[0].file);
+            }
+        }
+    } else {
+        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
+                                               last_stage, NULL);
+        if (bytes_sent == -1) {
+            bytes_sent = compress_page_with_multi_thread(f, block, offset);
+        }
+    }
 
     return bytes_sent;
 }
@@ -828,8 +979,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     return bytes_sent;
 }
 
-static uint64_t bytes_transferred;
-
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
     uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -1037,6 +1186,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
+    flush_compressed_data(f);
     qemu_mutex_unlock_ramlist();
 
     /*
@@ -1083,6 +1233,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         bytes_transferred += bytes_sent;
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 09/13] migration: Make compression co-work with xbzrle
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (7 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 12:15   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 10/13] migration: Add the core code for decompression Liang Li
                   ` (3 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Now, multiple thread compression can co-work with xbzrle. when
xbzrle is on, multiple thread compression will only work at the
first round of RAM data sync.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/arch_init.c b/arch_init.c
index b8bdb16..8ef0315 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -364,6 +364,7 @@ static QemuCond *comp_done_cond;
 /* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
 static bool quit_thread;
+static bool compression_switch_on;
 static int one_byte_count;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
@@ -440,6 +441,7 @@ void migrate_compress_threads_create(MigrationState *s)
         return;
     }
     quit_thread = false;
+    compression_switch_on = true;
     thread_count = migrate_compress_threads();
     s->compress_thread = g_new0(QemuThread, thread_count);
     comp_param = g_new0(CompressParam, thread_count);
@@ -957,9 +959,16 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
                 block = QTAILQ_FIRST(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
+                if (migrate_use_xbzrle()) {
+                    /* If xbzrle is on, stop using the data compression at this
+                     * point. In theory, xbzrle can do better than compression.
+                     */
+                    flush_compressed_data(f);
+                    compression_switch_on = false;
+                }
             }
         } else {
-            if (migrate_use_compression()) {
+            if (compression_switch_on && migrate_use_compression()) {
                 bytes_sent = ram_save_compressed_page(f, block, offset,
                                                       last_stage);
             } else {
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 10/13] migration: Add the core code for decompression
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (8 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 09/13] migration: Make compression co-work with xbzrle Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-06 12:27   ` Dr. David Alan Gilbert
  2015-02-02 11:05 ` [Qemu-devel] [v4 11/13] migration: Add interface to control compression Liang Li
                   ` (2 subsequent siblings)
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Implement the core logic of multiple thread decompression,
the decompression can work now.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 47 insertions(+), 2 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 8ef0315..549fdbb 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -814,6 +814,13 @@ static inline void start_compression(CompressParam *param)
     qemu_mutex_unlock(&param->mutex);
 }
 
+static inline void start_decompression(DecompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->busy = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
 
 static uint64_t bytes_transferred;
 
@@ -1347,8 +1354,27 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
 
 static void *do_data_decompress(void *opaque)
 {
+    DecompressParam *param = opaque;
+    size_t pagesize;
+
     while (!quit_thread) {
-        /* To be done */
+        qemu_mutex_lock(&param->mutex);
+        while (!param->busy) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            if (quit_thread) {
+                break;
+            }
+            pagesize = TARGET_PAGE_SIZE;
+            /* uncompress() will return failed in some case, especially
+             * when the page is dirted when doing the compression, it's
+             * not a problem because the dirty page will be retransferred
+             * and uncompress() won't break the data in other pages.
+             */
+            uncompress((Bytef *)param->des, &pagesize,
+                       (const Bytef *)param->compbuf, param->len);
+            param->busy = false;
+        }
+        qemu_mutex_unlock(&param->mutex);
     }
 
     return NULL;
@@ -1379,6 +1405,9 @@ void migrate_decompress_threads_join(void)
     quit_thread = true;
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        qemu_cond_signal(&decomp_param[i].cond);
+    }
+    for (i = 0; i < thread_count; i++) {
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
@@ -1395,7 +1424,23 @@ void migrate_decompress_threads_join(void)
 static void decompress_data_with_multi_threads(uint8_t *compbuf,
                                                void *host, int len)
 {
-    /* To be done */
+    int idx, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (!decomp_param[idx].busy) {
+                memcpy(decomp_param[idx].compbuf, compbuf, len);
+                decomp_param[idx].des = host;
+                decomp_param[idx].len = len;
+                start_decompression(&decomp_param[idx]);
+                break;
+            }
+        }
+        if (idx < thread_count) {
+            break;
+        }
+    }
 }
 
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 11/13] migration: Add interface to control compression
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (9 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 10/13] migration: Add the core code for decompression Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-03 22:17   ` Eric Blake
  2015-02-02 11:05 ` [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter Liang Li
  2015-02-02 11:05 ` [Qemu-devel] [v4 13/13] migration: Add command to query " Liang Li
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

The multiple compression threads can be turned on/off through
qmp and hmp interface before doing live migration.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
Reviewed-by: Dr.David Alan Gilbert <dgilbert@redhat.com>
---
 migration/migration.c | 7 +++++--
 qapi-schema.json      | 7 ++++++-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index a6f6e02..cbbd455 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -588,8 +588,11 @@ bool migrate_zero_blocks(void)
 
 bool migrate_use_compression(void)
 {
-    /* Disable compression before the patch series are applied */
-    return false;
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
 }
 
 int migrate_compress_level(void)
diff --git a/qapi-schema.json b/qapi-schema.json
index e16f8eb..0dfc4ce 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -491,13 +491,18 @@
 #          to enable the capability on the source VM. The feature is disabled by
 #          default. (since 1.6)
 #
+# @compress: Use multiple compression threads to accelerate live migration.
+#          This feature can help to reduce the migration traffic, by sending
+#          compressed pages. The feature is disabled by default. (since 2.3)
+#
 # @auto-converge: If enabled, QEMU will automatically throttle down the guest
 #          to speed up convergence of RAM migration. (since 1.6)
 #
 # Since: 1.2
 ##
 { 'enum': 'MigrationCapability',
-  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
+  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
+           'compress'] }
 
 ##
 # @MigrationCapabilityStatus
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (10 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 11/13] migration: Add interface to control compression Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-03 23:28   ` Eric Blake
  2015-02-02 11:05 ` [Qemu-devel] [v4 13/13] migration: Add command to query " Liang Li
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Add the qmp and hmp commands to tune the parameters used in live
migration.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 hmp-commands.hx               | 15 ++++++++++
 hmp.c                         | 35 ++++++++++++++++++++++
 hmp.h                         |  3 ++
 include/migration/migration.h |  4 +--
 migration/migration.c         | 69 +++++++++++++++++++++++++++++++++++--------
 monitor.c                     | 18 +++++++++++
 qapi-schema.json              | 52 ++++++++++++++++++++++++++++++++
 qmp-commands.hx               | 25 ++++++++++++++++
 8 files changed, 206 insertions(+), 15 deletions(-)

diff --git a/hmp-commands.hx b/hmp-commands.hx
index e37bc8b..535b5ba 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -985,6 +985,21 @@ Enable/Disable the usage of a capability @var{capability} for migration.
 ETEXI
 
     {
+        .name       = "migrate_set_parameter",
+        .args_type  = "parameter:s,value:i",
+        .params     = "parameter value",
+        .help       = "Set the parameter for migration",
+        .mhandler.cmd = hmp_migrate_set_parameter,
+        .command_completion = migrate_set_parameter_completion,
+    },
+
+STEXI
+@item migrate_set_parameter @var{parameter} @var{value}
+@findex migrate_set_parameter
+Set the parameter @var{parameter} for migration.
+ETEXI
+
+    {
         .name       = "client_migrate_info",
         .args_type  = "protocol:s,hostname:s,port:i?,tls-port:i?,cert-subject:s?",
         .params     = "protocol hostname port tls-port cert-subject",
diff --git a/hmp.c b/hmp.c
index 481be80..faab4b0 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1134,6 +1134,41 @@ void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict)
     }
 }
 
+void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
+{
+    const char *param = qdict_get_str(qdict, "parameter");
+    int value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+    MigrationParameterStatusList *params = g_malloc0(sizeof(*params));
+    MigrationParameterInt *data;
+    int i;
+
+    for (i = 0; i < MIGRATION_PARAMETER_MAX; i++) {
+        if (strcmp(param, MigrationParameter_lookup[i]) == 0) {
+            params->value = g_malloc0(sizeof(*params->value));
+            params->value->kind = i;
+            params->value->data = g_malloc0(sizeof(MigrationParameterInt));
+            data = (MigrationParameterInt *)params->value->data;
+            data->value = value;
+            params->next = NULL;
+            qmp_migrate_set_parameters(params, &err);
+            break;
+        }
+    }
+
+    if (i == MIGRATION_PARAMETER_MAX) {
+        error_set(&err, QERR_INVALID_PARAMETER, param);
+    }
+
+    qapi_free_MigrationParameterStatusList(params);
+
+    if (err) {
+        monitor_printf(mon, "migrate_set_parameter: %s\n",
+                       error_get_pretty(err));
+        error_free(err);
+    }
+}
+
 void hmp_set_password(Monitor *mon, const QDict *qdict)
 {
     const char *protocol  = qdict_get_str(qdict, "protocol");
diff --git a/hmp.h b/hmp.h
index 4bb5dca..429efea 100644
--- a/hmp.h
+++ b/hmp.h
@@ -63,6 +63,7 @@ void hmp_migrate_cancel(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
 void hmp_set_password(Monitor *mon, const QDict *qdict);
 void hmp_expire_password(Monitor *mon, const QDict *qdict);
@@ -111,6 +112,8 @@ void watchdog_action_completion(ReadLineState *rs, int nb_args,
                                 const char *str);
 void migrate_set_capability_completion(ReadLineState *rs, int nb_args,
                                        const char *str);
+void migrate_set_parameter_completion(ReadLineState *rs, int nb_args,
+                                      const char *str);
 void host_net_add_completion(ReadLineState *rs, int nb_args, const char *str);
 void host_net_remove_completion(ReadLineState *rs, int nb_args,
                                 const char *str);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 0c4f21c..8e09b42 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -50,9 +50,7 @@ struct MigrationState
     QEMUBH *cleanup_bh;
     QEMUFile *file;
     QemuThread *compress_thread;
-    int compress_thread_count;
-    int decompress_thread_count;
-    int compress_level;
+    int parameters[MIGRATION_PARAMETER_MAX];
 
     int state;
     MigrationParams params;
diff --git a/migration/migration.c b/migration/migration.c
index cbbd455..b5055dc 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -66,9 +66,12 @@ MigrationState *migrate_get_current(void)
         .bandwidth_limit = MAX_THROTTLE,
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
-        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
-        .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
-        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
+        .parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] =
+                DEFAULT_MIGRATE_COMPRESS_LEVEL,
+        .parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
+                DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
+                DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
     };
 
     return &current_migration;
@@ -292,6 +295,44 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
     }
 }
 
+void qmp_migrate_set_parameters(MigrationParameterStatusList *params,
+                                Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+    MigrationParameterStatusList *p;
+    MigrationParameterInt *data;
+
+    for (p = params; p; p = p->next) {
+        switch (p->value->kind) {
+        case MIGRATION_PARAMETER_COMPRESS_LEVEL:
+            data = (MigrationParameterInt *)p->value->data;
+            if (data->value < 0 || data->value > 9) {
+                error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level",
+                          "is invalid, it should be in the range of 0 to 9");
+                return;
+            }
+            break;
+        case MIGRATION_PARAMETER_COMPRESS_THREADS:
+        case MIGRATION_PARAMETER_DECOMPRESS_THREADS:
+            if (s->state == MIG_STATE_ACTIVE || s->state == MIG_STATE_SETUP) {
+                error_set(errp, QERR_MIGRATION_ACTIVE);
+                return;
+            }
+            data = (MigrationParameterInt *)p->value->data;
+            if (data->value < 1 || data->value > 255) {
+                error_set(errp, QERR_INVALID_PARAMETER_VALUE,
+                          "(de)compress_threads",
+                          "is invalid, it should be in the range of 1 to 255");
+                return;
+            }
+            break;
+        default:
+           return;
+        }
+        s->parameters[p->value->kind] = data->value;
+    }
+}
+
 /* shared migration helpers */
 
 static void migrate_set_state(MigrationState *s, int old_state, int new_state)
@@ -398,9 +439,11 @@ static MigrationState *migrate_init(const MigrationParams *params)
     int64_t bandwidth_limit = s->bandwidth_limit;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
-    int compress_level = s->compress_level;
-    int compress_thread_count = s->compress_thread_count;
-    int decompress_thread_count = s->decompress_thread_count;
+    int compress_level = s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
+    int compress_thread_count =
+            s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
+    int decompress_thread_count =
+            s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -411,9 +454,11 @@ static MigrationState *migrate_init(const MigrationParams *params)
            sizeof(enabled_capabilities));
     s->xbzrle_cache_size = xbzrle_cache_size;
 
-    s->compress_level = compress_level;
-    s->compress_thread_count = compress_thread_count;
-    s->decompress_thread_count = decompress_thread_count;
+    s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level;
+    s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
+               compress_thread_count;
+    s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
+               decompress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIG_STATE_SETUP;
     trace_migrate_set_state(MIG_STATE_SETUP);
@@ -601,7 +646,7 @@ int migrate_compress_level(void)
 
     s = migrate_get_current();
 
-    return s->compress_level;
+    return s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
 }
 
 int migrate_compress_threads(void)
@@ -610,7 +655,7 @@ int migrate_compress_threads(void)
 
     s = migrate_get_current();
 
-    return s->compress_thread_count;
+    return s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
 }
 
 int migrate_decompress_threads(void)
@@ -619,7 +664,7 @@ int migrate_decompress_threads(void)
 
     s = migrate_get_current();
 
-    return s->decompress_thread_count;
+    return s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
 }
 
 int migrate_use_xbzrle(void)
diff --git a/monitor.c b/monitor.c
index 7e4f605..fa8ebde 100644
--- a/monitor.c
+++ b/monitor.c
@@ -4554,6 +4554,24 @@ void migrate_set_capability_completion(ReadLineState *rs, int nb_args,
     }
 }
 
+void migrate_set_parameter_completion(ReadLineState *rs, int nb_args,
+                                      const char *str)
+{
+    size_t len;
+
+    len = strlen(str);
+    readline_set_completion_index(rs, len);
+    if (nb_args == 2) {
+        int i;
+        for (i = 0; i < MIGRATION_PARAMETER_MAX; i++) {
+            const char *name = MigrationParameter_lookup[i];
+            if (!strncmp(str, name, len)) {
+                readline_add_completion(rs, name);
+            }
+        }
+    }
+}
+
 void host_net_add_completion(ReadLineState *rs, int nb_args, const char *str)
 {
     int i;
diff --git a/qapi-schema.json b/qapi-schema.json
index 0dfc4ce..273f991 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -541,6 +541,58 @@
 ##
 { 'command': 'query-migrate-capabilities', 'returns':   ['MigrationCapabilityStatus']}
 
+# @MigrationParameter
+#
+# Migration parameters enumeration
+#
+# @compress-level: Set the compression level to be used in live migration,
+#          the compression level is an integer between 0 and 9, where 0 means
+#          no compression, 1 means the best compression speed, and 9 means best
+#          compression ratio which will consume more CPU.
+#
+# @compress-threads: Set compression thread count to be used in live migration,
+#          the compression thread count is an integer between 1 and 255.
+#
+# @decompress-threads: Set decompression thread count to be used in live migration,
+#          the decompression thread count is an integer between 1 and 255.
+#
+# Since: 2.3
+##
+{ 'enum': 'MigrationParameter',
+  'data': ['compress-level', 'compress-threads', 'decompress-threads'] }
+##
+# @MigrationParameterStatus
+#
+# Migration parameter information
+#
+# @parameter: the parameter of migration
+#
+# @data: pointer to the parameter value
+#
+# Since: 2.3
+##
+{ 'type': 'MigrationParameterBase',
+  'data': {'parameter': 'MigrationParameter'} }
+{ 'type': 'MigrationParameterInt',
+  'data': {'value': 'int'} }
+{ 'union': 'MigrationParameterStatus',
+  'base': 'MigrationParameterBase',
+  'discriminator': 'parameter',
+  'data': { 'compress-level': 'MigrationParameterInt',
+            'compress-threads': 'MigrationParameterInt',
+            'decompress-threads': 'MigrationParameterInt'} }
+#
+# @migrate-set-parameters
+#
+# Set the following migration parameters (like compress-level)
+#
+# @parameters: json array of parameter modifications to make
+#
+# Since: 2.3
+##
+{ 'command': 'migrate-set-parameters',
+  'data': { 'parameters': ['MigrationParameterStatus'] } }
+##
 ##
 # @MouseInfo:
 #
diff --git a/qmp-commands.hx b/qmp-commands.hx
index c5f16dd..9d16386 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -3278,6 +3278,31 @@ EQMP
     },
 
 SQMP
+migrate-set-parameters
+----------------------
+
+Set migration parameters
+
+- "compress-level": set compression level during migration
+- "compress-threads": set compression thread count for migration
+- "decompress-threads": set decompression thread count for migration
+
+Arguments:
+
+Example:
+
+-> { "execute": "migrate-set-parameters" , "arguments":
+     { "parameters": [ { "parameter": "compress-level", "value": 1 } ] } }
+
+EQMP
+
+    {
+        .name       = "migrate-set-parameters",
+        .args_type  = "parameters:O",
+        .params     = "parameter:s,value:O",
+	.mhandler.cmd_new = qmp_marshal_input_migrate_set_parameters,
+    },
+SQMP
 query-balloon
 -------------
 
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* [Qemu-devel] [v4 13/13] migration: Add command to query migration parameter
  2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
                   ` (11 preceding siblings ...)
  2015-02-02 11:05 ` [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter Liang Li
@ 2015-02-02 11:05 ` Liang Li
  2015-02-03 23:30   ` Eric Blake
  12 siblings, 1 reply; 28+ messages in thread
From: Liang Li @ 2015-02-02 11:05 UTC (permalink / raw)
  To: qemu-devel
  Cc: quintela, Liang Li, armbru, dgilbert, Yang Zhang, amit.shah,
	lcapitulino

Add the qmp and hmp commands to query the parameters used in live
migration.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 hmp-commands.hx       |  2 ++
 hmp.c                 | 21 +++++++++++++++++++++
 hmp.h                 |  1 +
 migration/migration.c | 27 +++++++++++++++++++++++++++
 monitor.c             |  7 +++++++
 qapi-schema.json      | 11 +++++++++++
 qmp-commands.hx       | 26 ++++++++++++++++++++++++++
 7 files changed, 95 insertions(+)

diff --git a/hmp-commands.hx b/hmp-commands.hx
index 535b5ba..ed0c06a 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -1779,6 +1779,8 @@ show user network stack connection states
 show migration status
 @item info migrate_capabilities
 show current migration capabilities
+@item info migrate_parameters
+show current migration parameters
 @item info migrate_cache_size
 show current migration XBZRLE cache size
 @item info balloon
diff --git a/hmp.c b/hmp.c
index faab4b0..33c95b3 100644
--- a/hmp.c
+++ b/hmp.c
@@ -246,6 +246,27 @@ void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict)
     qapi_free_MigrationCapabilityStatusList(caps);
 }
 
+void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
+{
+    MigrationParameterStatusList *params, *p;
+    MigrationParameterInt *data;
+
+    params = qmp_query_migrate_parameters(NULL);
+
+    if (params) {
+        monitor_printf(mon, "parameters:");
+        for (p = params; p; p = p->next) {
+            data = (MigrationParameterInt *)p->value->data;
+            monitor_printf(mon, " %s: %" PRId64,
+                           MigrationParameter_lookup[p->value->kind],
+                           data->value);
+        }
+        monitor_printf(mon, "\n");
+    }
+
+    qapi_free_MigrationParameterStatusList(params);
+}
+
 void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
 {
     monitor_printf(mon, "xbzrel cache size: %" PRId64 " kbytes\n",
diff --git a/hmp.h b/hmp.h
index 429efea..b2b2d2c 100644
--- a/hmp.h
+++ b/hmp.h
@@ -28,6 +28,7 @@ void hmp_info_chardev(Monitor *mon, const QDict *qdict);
 void hmp_info_mice(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
 void hmp_info_cpus(Monitor *mon, const QDict *qdict);
 void hmp_info_block(Monitor *mon, const QDict *qdict);
diff --git a/migration/migration.c b/migration/migration.c
index b5055dc..f925130 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -179,6 +179,33 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
     return head;
 }
 
+MigrationParameterStatusList *qmp_query_migrate_parameters(Error **errp)
+{
+    MigrationParameterStatusList *head = NULL;
+    MigrationParameterStatusList *params;
+    MigrationState *s = migrate_get_current();
+    MigrationParameterInt *data;
+    int i;
+
+    params = NULL; /* silence compiler warning */
+    for (i = 0; i < MIGRATION_PARAMETER_MAX; i++) {
+        if (head == NULL) {
+            head = g_malloc0(sizeof(*params));
+            params = head;
+        } else {
+            params->next = g_malloc0(sizeof(*params));
+            params = params->next;
+        }
+        params->value = g_malloc(sizeof(*params->value));
+        params->value->kind = i;
+        params->value->data = g_malloc(sizeof(MigrationParameterInt));
+        data = (MigrationParameterInt *)params->value->data;
+        data->value = s->parameters[i];
+    }
+
+    return head;
+}
+
 static void get_xbzrle_cache_stats(MigrationInfo *info)
 {
     if (migrate_use_xbzrle()) {
diff --git a/monitor.c b/monitor.c
index fa8ebde..58dfa28 100644
--- a/monitor.c
+++ b/monitor.c
@@ -2872,6 +2872,13 @@ static mon_cmd_t info_cmds[] = {
         .mhandler.cmd = hmp_info_migrate_capabilities,
     },
     {
+        .name       = "migrate_parameters",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration parameters",
+        .mhandler.cmd = hmp_info_migrate_parameters,
+    },
+    {
         .name       = "migrate_cache_size",
         .args_type  = "",
         .params     = "",
diff --git a/qapi-schema.json b/qapi-schema.json
index 273f991..af34f7f 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -593,6 +593,17 @@
 { 'command': 'migrate-set-parameters',
   'data': { 'parameters': ['MigrationParameterStatus'] } }
 ##
+# @query-migrate-parameters
+#
+# Returns information about the current migration parameters status
+#
+# Returns: @MigrationParametersStatus
+#
+# Since: 2.3
+##
+{ 'command': 'query-migrate-parameters',
+  'returns': ['MigrationParameterStatus'] }
+##
 ##
 # @MouseInfo:
 #
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 9d16386..bcfe823 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -3303,6 +3303,32 @@ EQMP
 	.mhandler.cmd_new = qmp_marshal_input_migrate_set_parameters,
     },
 SQMP
+query-migrate-parameters
+------------------------
+
+Query current migration parameters
+
+- "parameters": migration parameters value
+         - "compress-level" : compression level value (json-int)
+         - "compress-threads" : compression thread count value (json-int)
+         - "decompress-threads" : decompression thread count value (json-int)
+
+Arguments:
+
+Example:
+
+-> { "execute": "query-migrate-parameters" }
+<- { "return": [ { "value": 1, "parameter": "compress-level" } ] }
+
+EQMP
+
+    {
+        .name       = "query-migrate-parameters",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_parameters,
+    },
+
+SQMP
 query-balloon
 -------------
 
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 11/13] migration: Add interface to control compression
  2015-02-02 11:05 ` [Qemu-devel] [v4 11/13] migration: Add interface to control compression Liang Li
@ 2015-02-03 22:17   ` Eric Blake
  0 siblings, 0 replies; 28+ messages in thread
From: Eric Blake @ 2015-02-03 22:17 UTC (permalink / raw)
  To: Liang Li, qemu-devel
  Cc: quintela, armbru, dgilbert, Yang Zhang, amit.shah, lcapitulino

[-- Attachment #1: Type: text/plain, Size: 623 bytes --]

On 02/02/2015 04:05 AM, Liang Li wrote:
> The multiple compression threads can be turned on/off through
> qmp and hmp interface before doing live migration.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> Reviewed-by: Dr.David Alan Gilbert <dgilbert@redhat.com>
> ---
>  migration/migration.c | 7 +++++--
>  qapi-schema.json      | 7 ++++++-
>  2 files changed, 11 insertions(+), 3 deletions(-)

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 604 bytes --]

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter
  2015-02-02 11:05 ` [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter Liang Li
@ 2015-02-03 23:28   ` Eric Blake
  2015-02-04  1:26     ` Li, Liang Z
  0 siblings, 1 reply; 28+ messages in thread
From: Eric Blake @ 2015-02-03 23:28 UTC (permalink / raw)
  To: Liang Li, qemu-devel
  Cc: quintela, armbru, dgilbert, Yang Zhang, amit.shah, lcapitulino

[-- Attachment #1: Type: text/plain, Size: 4451 bytes --]

On 02/02/2015 04:05 AM, Liang Li wrote:
> Add the qmp and hmp commands to tune the parameters used in live
> migration.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  hmp-commands.hx               | 15 ++++++++++
>  hmp.c                         | 35 ++++++++++++++++++++++
>  hmp.h                         |  3 ++
>  include/migration/migration.h |  4 +--
>  migration/migration.c         | 69 +++++++++++++++++++++++++++++++++++--------
>  monitor.c                     | 18 +++++++++++
>  qapi-schema.json              | 52 ++++++++++++++++++++++++++++++++
>  qmp-commands.hx               | 25 ++++++++++++++++
>  8 files changed, 206 insertions(+), 15 deletions(-)

> +++ b/migration/migration.c
> @@ -66,9 +66,12 @@ MigrationState *migrate_get_current(void)
>          .bandwidth_limit = MAX_THROTTLE,
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
> -        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> -        .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
> -        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
> +        .parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] =
> +                DEFAULT_MIGRATE_COMPRESS_LEVEL,

Looks okay.

> +        .parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
> +                DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
> +                DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,

Hmm - do we really need two parameters here?  Remember, compress threads
is used only on the source, and decompress threads is used only on the
destination.  Having a single parameter, 'threads', which is set to
compression threads on source and decompression threads on destination,
and which need not be equal between the two machines, should still work,
right?

> +++ b/qapi-schema.json
> @@ -541,6 +541,58 @@
>  ##
>  { 'command': 'query-migrate-capabilities', 'returns':   ['MigrationCapabilityStatus']}
>  
> +# @MigrationParameter
> +#
> +# Migration parameters enumeration
> +#
> +# @compress-level: Set the compression level to be used in live migration,
> +#          the compression level is an integer between 0 and 9, where 0 means
> +#          no compression, 1 means the best compression speed, and 9 means best
> +#          compression ratio which will consume more CPU.
> +#
> +# @compress-threads: Set compression thread count to be used in live migration,
> +#          the compression thread count is an integer between 1 and 255.
> +#
> +# @decompress-threads: Set decompression thread count to be used in live migration,
> +#          the decompression thread count is an integer between 1 and 255.
> +#

Again, I think you could get by with just a single parameter
'compress-threads', and maybe document that the value is typically set
higher on destination than on source.

> +# Since: 2.3
> +##
> +{ 'enum': 'MigrationParameter',
> +  'data': ['compress-level', 'compress-threads', 'decompress-threads'] }
> +##
> +# @MigrationParameterStatus
> +#
> +# Migration parameter information
> +#
> +# @parameter: the parameter of migration
> +#
> +# @data: pointer to the parameter value
> +#
> +# Since: 2.3
> +##
> +{ 'type': 'MigrationParameterBase',
> +  'data': {'parameter': 'MigrationParameter'} }
> +{ 'type': 'MigrationParameterInt',
> +  'data': {'value': 'int'} }
> +{ 'union': 'MigrationParameterStatus',

Is it worth having independent docs for each of these, rather than
cramming all three under one doc text?

> +  'base': 'MigrationParameterBase',
> +  'discriminator': 'parameter',
> +  'data': { 'compress-level': 'MigrationParameterInt',
> +            'compress-threads': 'MigrationParameterInt',
> +            'decompress-threads': 'MigrationParameterInt'} }
> +#
> +# @migrate-set-parameters
> +#
> +# Set the following migration parameters (like compress-level)
> +#
> +# @parameters: json array of parameter modifications to make
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-parameters',
> +  'data': { 'parameters': ['MigrationParameterStatus'] } }
> +##

Interface looks reasonable to me (but I'm biased, as I suggested it).

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 604 bytes --]

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 13/13] migration: Add command to query migration parameter
  2015-02-02 11:05 ` [Qemu-devel] [v4 13/13] migration: Add command to query " Liang Li
@ 2015-02-03 23:30   ` Eric Blake
  0 siblings, 0 replies; 28+ messages in thread
From: Eric Blake @ 2015-02-03 23:30 UTC (permalink / raw)
  To: Liang Li, qemu-devel
  Cc: quintela, armbru, dgilbert, Yang Zhang, amit.shah, lcapitulino

[-- Attachment #1: Type: text/plain, Size: 967 bytes --]

On 02/02/2015 04:05 AM, Liang Li wrote:
> Add the qmp and hmp commands to query the parameters used in live
> migration.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---

I'd squash this with 12, if it doesn't make the patch feel too large.


> +Query current migration parameters
> +
> +- "parameters": migration parameters value
> +         - "compress-level" : compression level value (json-int)
> +         - "compress-threads" : compression thread count value (json-int)
> +         - "decompress-threads" : decompression thread count value (json-int)
> +
> +Arguments:
> +
> +Example:
> +
> +-> { "execute": "query-migrate-parameters" }
> +<- { "return": [ { "value": 1, "parameter": "compress-level" } ] }

Should the example also include 'compress-threads'?


-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 604 bytes --]

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter
  2015-02-03 23:28   ` Eric Blake
@ 2015-02-04  1:26     ` Li, Liang Z
  2015-02-04  2:27       ` Eric Blake
  0 siblings, 1 reply; 28+ messages in thread
From: Li, Liang Z @ 2015-02-04  1:26 UTC (permalink / raw)
  To: Eric Blake, qemu-devel@nongnu.org
  Cc: quintela@redhat.com, armbru@redhat.com, dgilbert@redhat.com,
	Zhang, Yang Z, amit.shah@redhat.com, lcapitulino@redhat.com


> > +++ b/migration/migration.c
> > @@ -66,9 +66,12 @@ MigrationState *migrate_get_current(void)
> >          .bandwidth_limit = MAX_THROTTLE,
> >          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
> >          .mbps = -1,
> > -        .compress_thread_count =
> DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> > -        .decompress_thread_count =
> DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
> > -        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
> > +        .parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] =
> > +                DEFAULT_MIGRATE_COMPRESS_LEVEL,
> 
> Looks okay.
> 
> > +        .parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
> > +                DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> > +        .parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
> > +                DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
> 
> Hmm - do we really need two parameters here?  Remember, compress
> threads is used only on the source, and decompress threads is used only on
> the destination.  Having a single parameter, 'threads', which is set to
> compression threads on source and decompression threads on destination,
> and which need not be equal between the two machines, should still work,
> right?
>

Yes, it works. The benefit of using one parameter instead of two can reduce the QMP 
command count, and the side effect of using the same thread count for compression
 and decompression is a little waste if the user just want to use the default settings,
you know, decompression is usually  about 4 times faster than compression.  Use more
decompression threads than needed will waste some RAM which used to save data 
structure related to the decompression thread, about 4K bytes RAM per thread, is it 
acceptable?

Liang

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter
  2015-02-04  1:26     ` Li, Liang Z
@ 2015-02-04  2:27       ` Eric Blake
  0 siblings, 0 replies; 28+ messages in thread
From: Eric Blake @ 2015-02-04  2:27 UTC (permalink / raw)
  To: Li, Liang Z, qemu-devel@nongnu.org
  Cc: quintela@redhat.com, armbru@redhat.com, dgilbert@redhat.com,
	Zhang, Yang Z, amit.shah@redhat.com, lcapitulino@redhat.com

[-- Attachment #1: Type: text/plain, Size: 1295 bytes --]

On 02/03/2015 06:26 PM, Li, Liang Z wrote:

>> Hmm - do we really need two parameters here?  Remember, compress
>> threads is used only on the source, and decompress threads is used only on
>> the destination.  Having a single parameter, 'threads', which is set to
>> compression threads on source and decompression threads on destination,
>> and which need not be equal between the two machines, should still work,
>> right?
>>
> 
> Yes, it works. The benefit of using one parameter instead of two can reduce the QMP 
> command count, and the side effect of using the same thread count for compression
>  and decompression is a little waste if the user just want to use the default settings,
> you know, decompression is usually  about 4 times faster than compression.  Use more
> decompression threads than needed will waste some RAM which used to save data 
> structure related to the decompression thread, about 4K bytes RAM per thread, is it 
> acceptable?

The default setting is no compression.  The user already has to
configure things on both sides to get compression, so it is not a burden
to ask them to configure thread count on both sides correctly.

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 604 bytes --]

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 02/13] migration: Add the framework of multi-thread compression
  2015-02-02 11:05 ` [Qemu-devel] [v4 02/13] migration: Add the framework of multi-thread compression Liang Li
@ 2015-02-06 10:11   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 10:11 UTC (permalink / raw)
  To: Liang Li; +Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah

* Liang Li (liang.z.li@intel.com) wrote:
> Add the code to create and destroy the multiple threads those will
> be used to do data compression. Left some functions empty to keep
> clearness, and the code will be added later.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  arch_init.c                   | 79 ++++++++++++++++++++++++++++++++++++++++++-
>  include/migration/migration.h |  9 +++++
>  migration/migration.c         | 37 ++++++++++++++++++++
>  3 files changed, 124 insertions(+), 1 deletion(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 89c8fa4..1831f1a 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -332,6 +332,68 @@ static uint64_t migration_dirty_pages;
>  static uint32_t last_version;
>  static bool ram_bulk_stage;
>  
> +struct CompressParam {
> +    /* To be done */
> +};
> +typedef struct CompressParam CompressParam;
> +
> +static CompressParam *comp_param;
> +static bool quit_thread;
> +
> +static void *do_data_compress(void *opaque)
> +{
> +    while (!quit_thread) {
> +
> +    /* To be done */
> +
> +    }
> +
> +    return NULL;
> +}
> +
> +static inline void terminate_compression_threads(void)
> +{
> +    quit_thread = true;
> +
> +    /* To be done */
> +}
> +
> +void migrate_compress_threads_join(MigrationState *s)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    terminate_compression_threads();
> +    thread_count = migrate_compress_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_thread_join(s->compress_thread + i);
> +    }
> +    g_free(s->compress_thread);
> +    g_free(comp_param);
> +    s->compress_thread = NULL;
> +    comp_param = NULL;
> +}
> +
> +void migrate_compress_threads_create(MigrationState *s)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    quit_thread = false;
> +    thread_count = migrate_compress_threads();
> +    s->compress_thread = g_new0(QemuThread, thread_count);
> +    comp_param = g_new0(CompressParam, thread_count);
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_thread_create(s->compress_thread + i, "compress",
> +                           do_data_compress, comp_param + i,
> +                           QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
>  /* Update the xbzrle cache to reflect a page that's been sent as all 0.
>   * The important thing is that a stale (not-yet-0'd) page be replaced
>   * by the new data.
> @@ -645,6 +707,16 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
> +                                    ram_addr_t offset, bool last_stage)
> +{
> +    int bytes_sent = 0;
> +
> +    /* To be done*/
> +
> +    return bytes_sent;
> +}
> +
>  /*
>   * ram_find_and_save_block: Finds a page to send and sends it to f
>   *
> @@ -679,7 +751,12 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>                  ram_bulk_stage = false;
>              }
>          } else {
> -            bytes_sent = ram_save_page(f, block, offset, last_stage);
> +            if (migrate_use_compression()) {
> +                bytes_sent = ram_save_compressed_page(f, block, offset,
> +                                                      last_stage);
> +            } else {
> +                bytes_sent = ram_save_page(f, block, offset, last_stage);
> +            }
>  
>              /* if page is unmodified, continue to the next */
>              if (bytes_sent > 0) {
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3cb5ba8..daf6c81 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -49,6 +49,9 @@ struct MigrationState
>      QemuThread thread;
>      QEMUBH *cleanup_bh;
>      QEMUFile *file;
> +    QemuThread *compress_thread;
> +    int compress_thread_count;
> +    int compress_level;
>  
>      int state;
>      MigrationParams params;
> @@ -107,6 +110,8 @@ bool migration_has_finished(MigrationState *);
>  bool migration_has_failed(MigrationState *);
>  MigrationState *migrate_get_current(void);
>  
> +void migrate_compress_threads_create(MigrationState *s);
> +void migrate_compress_threads_join(MigrationState *s);
>  uint64_t ram_bytes_remaining(void);
>  uint64_t ram_bytes_transferred(void);
>  uint64_t ram_bytes_total(void);
> @@ -156,6 +161,10 @@ int64_t migrate_xbzrle_cache_size(void);
>  
>  int64_t xbzrle_cache_resize(int64_t new_size);
>  
> +bool migrate_use_compression(void);
> +int migrate_compress_level(void);
> +int migrate_compress_threads(void);
> +
>  void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
>  void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
>  void ram_control_load_hook(QEMUFile *f, uint64_t flags);
> diff --git a/migration/migration.c b/migration/migration.c
> index b3adbc6..309443e 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -43,6 +43,11 @@ enum {
>  #define BUFFER_DELAY     100
>  #define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY)
>  
> +/* Default compression thread count */
> +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
> +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
> +
>  /* Migration XBZRLE default cache size */
>  #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
>  
> @@ -60,6 +65,8 @@ MigrationState *migrate_get_current(void)
>          .bandwidth_limit = MAX_THROTTLE,
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
> +        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
>      };
>  
>      return &current_migration;
> @@ -302,6 +309,7 @@ static void migrate_fd_cleanup(void *opaque)
>          qemu_thread_join(&s->thread);
>          qemu_mutex_lock_iothread();
>  
> +        migrate_compress_threads_join(s);
>          qemu_fclose(s->file);
>          s->file = NULL;
>      }
> @@ -385,6 +393,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>      int64_t bandwidth_limit = s->bandwidth_limit;
>      bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
>      int64_t xbzrle_cache_size = s->xbzrle_cache_size;
> +    int compress_level = s->compress_level;
> +    int compress_thread_count = s->compress_thread_count;
>  
>      memcpy(enabled_capabilities, s->enabled_capabilities,
>             sizeof(enabled_capabilities));
> @@ -395,6 +405,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>             sizeof(enabled_capabilities));
>      s->xbzrle_cache_size = xbzrle_cache_size;
>  
> +    s->compress_level = compress_level;
> +    s->compress_thread_count = compress_thread_count;
>      s->bandwidth_limit = bandwidth_limit;
>      s->state = MIG_STATE_SETUP;
>      trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -567,6 +579,30 @@ bool migrate_zero_blocks(void)
>      return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
>  }
>  
> +bool migrate_use_compression(void)
> +{
> +    /* Disable compression before the patch series are applied */
> +    return false;
> +}
> +
> +int migrate_compress_level(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_level;
> +}
> +
> +int migrate_compress_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_thread_count;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> @@ -707,6 +743,7 @@ void migrate_fd_connect(MigrationState *s)
>      /* Notify before starting migration thread */
>      notifier_list_notify(&migration_state_notifiers, s);
>  
> +    migrate_compress_threads_create(s);
>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
>  }
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 03/13] migration: Add the framework of multi-thread decompression
  2015-02-02 11:05 ` [Qemu-devel] [v4 03/13] migration: Add the framework of multi-thread decompression Liang Li
@ 2015-02-06 10:16   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 10:16 UTC (permalink / raw)
  To: Liang Li; +Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah

* Liang Li (liang.z.li@intel.com) wrote:
> Add the code to create and destroy the multiple threads those will be
> used to do data decompression. Left some functions empty just to keep
> clearness, and the code will be added later.

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c                   | 75 +++++++++++++++++++++++++++++++++++++++++++
>  include/migration/migration.h |  4 +++
>  migration/migration.c         | 16 +++++++++
>  3 files changed, 95 insertions(+)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 1831f1a..ed34eb3 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -24,6 +24,7 @@
>  #include <stdint.h>
>  #include <stdarg.h>
>  #include <stdlib.h>
> +#include <zlib.h>
>  #ifndef _WIN32
>  #include <sys/types.h>
>  #include <sys/mman.h>
> @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_CONTINUE 0x20
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
> +#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  
>  static struct defconfig_file {
>      const char *filename;
> @@ -337,8 +339,16 @@ struct CompressParam {
>  };
>  typedef struct CompressParam CompressParam;
>  
> +struct DecompressParam {
> +    /* To be done */
> +};
> +typedef struct DecompressParam DecompressParam;
> +
>  static CompressParam *comp_param;
>  static bool quit_thread;
> +static DecompressParam *decomp_param;
> +static QemuThread *decompress_threads;
> +static uint8_t *compressed_data_buf;
>  
>  static void *do_data_compress(void *opaque)
>  {
> @@ -1128,10 +1138,58 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>      }
>  }
>  
> +static void *do_data_decompress(void *opaque)
> +{
> +    while (!quit_thread) {
> +        /* To be done */
> +    }
> +
> +    return NULL;
> +}
> +
> +void migrate_decompress_threads_create(int count)
> +{
> +    int i;
> +
> +    decompress_threads = g_new0(QemuThread, count);
> +    decomp_param = g_new0(DecompressParam, count);
> +    compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
> +    quit_thread = false;
> +    for (i = 0; i < count; i++) {
> +        qemu_thread_create(decompress_threads + i, "decompress",
> +                           do_data_decompress, decomp_param + i,
> +                           QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +void migrate_decompress_threads_join(void)
> +{
> +    int i, thread_count;
> +
> +    quit_thread = true;
> +    thread_count = migrate_decompress_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_thread_join(decompress_threads + i);
> +    }
> +    g_free(decompress_threads);
> +    g_free(decomp_param);
> +    g_free(compressed_data_buf);
> +    decompress_threads = NULL;
> +    decomp_param = NULL;
> +    compressed_data_buf = NULL;
> +}
> +
> +static void decompress_data_with_multi_threads(uint8_t *compbuf,
> +                                               void *host, int len)
> +{
> +    /* To be done */
> +}
> +
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
>      int flags = 0, ret = 0;
>      static uint64_t seq_iter;
> +    int len = 0;
>  
>      seq_iter++;
>  
> @@ -1208,6 +1266,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> +        case RAM_SAVE_FLAG_COMPRESS_PAGE:
> +            host = host_from_stream_offset(f, addr, flags);
> +            if (!host) {
> +                error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            len = qemu_get_be32(f);
> +            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
> +                error_report("Invalid compressed data length: %d", len);
> +                ret = -EINVAL;
> +                break;
> +            }
> +            qemu_get_buffer(f, compressed_data_buf, len);
> +            decompress_data_with_multi_threads(compressed_data_buf, host, len);
> +            break;
>          case RAM_SAVE_FLAG_XBZRLE:
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index daf6c81..0c4f21c 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -51,6 +51,7 @@ struct MigrationState
>      QEMUFile *file;
>      QemuThread *compress_thread;
>      int compress_thread_count;
> +    int decompress_thread_count;
>      int compress_level;
>  
>      int state;
> @@ -112,6 +113,8 @@ MigrationState *migrate_get_current(void);
>  
>  void migrate_compress_threads_create(MigrationState *s);
>  void migrate_compress_threads_join(MigrationState *s);
> +void migrate_decompress_threads_create(int count);
> +void migrate_decompress_threads_join(void);
>  uint64_t ram_bytes_remaining(void);
>  uint64_t ram_bytes_transferred(void);
>  uint64_t ram_bytes_total(void);
> @@ -164,6 +167,7 @@ int64_t xbzrle_cache_resize(int64_t new_size);
>  bool migrate_use_compression(void);
>  int migrate_compress_level(void);
>  int migrate_compress_threads(void);
> +int migrate_decompress_threads(void);
>  
>  void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
>  void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
> diff --git a/migration/migration.c b/migration/migration.c
> index 309443e..a6f6e02 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -45,6 +45,7 @@ enum {
>  
>  /* Default compression thread count */
>  #define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
>  /*0: means nocompress, 1: best speed, ... 9: best compress ratio */
>  #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
>  
> @@ -66,6 +67,7 @@ MigrationState *migrate_get_current(void)
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
>          .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
>          .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
>      };
>  
> @@ -123,12 +125,15 @@ static void process_incoming_migration_co(void *opaque)
>      } else {
>          runstate_set(RUN_STATE_PAUSED);
>      }
> +    migrate_decompress_threads_join();
>  }
>  
>  void process_incoming_migration(QEMUFile *f)
>  {
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
>      int fd = qemu_get_fd(f);
> +    int thread_count = migrate_decompress_threads();
> +    migrate_decompress_threads_create(thread_count);
>  
>      assert(fd != -1);
>      qemu_set_nonblock(fd);
> @@ -395,6 +400,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
>      int64_t xbzrle_cache_size = s->xbzrle_cache_size;
>      int compress_level = s->compress_level;
>      int compress_thread_count = s->compress_thread_count;
> +    int decompress_thread_count = s->decompress_thread_count;
>  
>      memcpy(enabled_capabilities, s->enabled_capabilities,
>             sizeof(enabled_capabilities));
> @@ -407,6 +413,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
>  
>      s->compress_level = compress_level;
>      s->compress_thread_count = compress_thread_count;
> +    s->decompress_thread_count = decompress_thread_count;
>      s->bandwidth_limit = bandwidth_limit;
>      s->state = MIG_STATE_SETUP;
>      trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -603,6 +610,15 @@ int migrate_compress_threads(void)
>      return s->compress_thread_count;
>  }
>  
> +int migrate_decompress_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->decompress_thread_count;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 04/13] qemu-file: Add compression functions to QEMUFile
  2015-02-02 11:05 ` [Qemu-devel] [v4 04/13] qemu-file: Add compression functions to QEMUFile Liang Li
@ 2015-02-06 10:33   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 10:33 UTC (permalink / raw)
  To: Liang Li
  Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah,
	dgilbert

* Liang Li (liang.z.li@intel.com) wrote:
> qemu_put_compression_data() compress the data and put it to QEMUFile.
> qemu_put_qemu_file() put the data in the buffer of source QEMUFile to
> destination QEMUFile.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  include/migration/qemu-file.h |  3 +++
>  migration/qemu-file.c         | 39 +++++++++++++++++++++++++++++++++++++++
>  2 files changed, 42 insertions(+)
> 
> diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
> index d843c00..2204fb9 100644
> --- a/include/migration/qemu-file.h
> +++ b/include/migration/qemu-file.h
> @@ -159,6 +159,9 @@ void qemu_put_be32(QEMUFile *f, unsigned int v);
>  void qemu_put_be64(QEMUFile *f, uint64_t v);
>  int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset);
>  int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size);
> +size_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
> +                                 int level);
> +int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
>  /*
>   * Note that you can only peek continuous bytes from where the current pointer
>   * is; you aren't guaranteed to be able to peak to +n bytes unless you've
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index edc2830..de2da2d 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -21,6 +21,7 @@
>   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>   * THE SOFTWARE.
>   */
> +#include <zlib.h>
>  #include "qemu-common.h"
>  #include "qemu/iov.h"
>  #include "qemu/sockets.h"
> @@ -529,3 +530,41 @@ uint64_t qemu_get_be64(QEMUFile *f)
>      v |= qemu_get_be32(f);
>      return v;
>  }
> +
> +/* compress size bytes of data start at p with specific compression
> + * leve and store the compressed data to the buffer of f.

Typo: 'leve'

> + */
> +
> +size_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
> +                                 int level)
> +{
> +    size_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
> +
> +    if (blen < compressBound(size)) {
> +        return 0;
> +    }

Imagine that there were only 3 bytes space in the buffer; so that
   IO_BUF_SIZE - f->buf_index = 3
then you subtract sizeof(int32_t) and get -1, but size_t is unsigned
so the calculation works out as a very big number, and that comparison
fails and it carries on to call compress2.

> +    if (compress2(f->buf + f->buf_index + sizeof(int32_t), &blen, (Bytef *)p,
> +                  size, level) != Z_OK) {
> +        error_report("Compress Failed!");
> +        return 0;
> +    }
> +    qemu_put_be32(f, blen);
> +    f->buf_index += blen;
> +    return blen + sizeof(int32_t);
> +}
> +
> +/* Put the data in the buffer of f_src to the buffer of f_des, and
> + * then reset the buf_index of f_src to 0.
> + */
> +
> +int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
> +{
> +    int len = 0;
> +
> +    if (f_src->buf_index > 0) {
> +        len = f_src->buf_index;
> +        qemu_put_buffer(f_des, f_src->buf, f_src->buf_index);
> +        f_src->buf_index = 0;
> +    }
> +    return len;
> +}
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 05/13] arch_init: Alloc and free data struct for compression
  2015-02-02 11:05 ` [Qemu-devel] [v4 05/13] arch_init: Alloc and free data struct for compression Liang Li
@ 2015-02-06 10:45   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 10:45 UTC (permalink / raw)
  To: Liang Li; +Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah

* Liang Li (liang.z.li@intel.com) wrote:
> Define the data structure and variables used to do multiple thread
> compression, and add the code to initialize and free them.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  arch_init.c | 34 +++++++++++++++++++++++++++++++++-
>  1 file changed, 33 insertions(+), 1 deletion(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index ed34eb3..87c4947 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -335,7 +335,12 @@ static uint32_t last_version;
>  static bool ram_bulk_stage;
>  
>  struct CompressParam {
> -    /* To be done */
> +    bool busy;
> +    QEMUFile *file;
> +    QemuMutex mutex;
> +    QemuCond cond;
> +    RAMBlock *block;
> +    ram_addr_t offset;
>  };
>  typedef struct CompressParam CompressParam;
>  
> @@ -345,6 +350,14 @@ struct DecompressParam {
>  typedef struct DecompressParam DecompressParam;
>  
>  static CompressParam *comp_param;
> +/* comp_done_cond is used to wake up the migration thread when
> + * one of the compression threads has finished the compression.
> + * comp_done_lock is used to co-work with comp_done_cond.
> + */
> +static QemuMutex *comp_done_lock;
> +static QemuCond *comp_done_cond;
> +/* The empty QEMUFileOps will be used by file in CompressParam */
> +static const QEMUFileOps empty_ops = { };
>  static bool quit_thread;
>  static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
> @@ -379,11 +392,20 @@ void migrate_compress_threads_join(MigrationState *s)
>      thread_count = migrate_compress_threads();
>      for (i = 0; i < thread_count; i++) {
>          qemu_thread_join(s->compress_thread + i);
> +        qemu_fclose(comp_param[i].file);
> +        qemu_mutex_destroy(&comp_param[i].mutex);
> +        qemu_cond_destroy(&comp_param[i].cond);
>      }
> +    qemu_mutex_destroy(comp_done_lock);
> +    qemu_cond_destroy(comp_done_cond);
>      g_free(s->compress_thread);
>      g_free(comp_param);
> +    g_free(comp_done_cond);
> +    g_free(comp_done_lock);
>      s->compress_thread = NULL;
>      comp_param = NULL;
> +    comp_done_cond = NULL;
> +    comp_done_lock = NULL;
>  }
>  
>  void migrate_compress_threads_create(MigrationState *s)
> @@ -397,7 +419,17 @@ void migrate_compress_threads_create(MigrationState *s)
>      thread_count = migrate_compress_threads();
>      s->compress_thread = g_new0(QemuThread, thread_count);
>      comp_param = g_new0(CompressParam, thread_count);
> +    comp_done_cond = g_new0(QemuCond, 1);
> +    comp_done_lock = g_new0(QemuMutex, 1);
> +    qemu_cond_init(comp_done_cond);
> +    qemu_mutex_init(comp_done_lock);
>      for (i = 0; i < thread_count; i++) {
> +        /* com_param[i].file is just used as a dummy buffer to save data, set
> +         * it's ops to empty.
> +         */
> +        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> +        qemu_mutex_init(&comp_param[i].mutex);
> +        qemu_cond_init(&comp_param[i].cond);
>          qemu_thread_create(s->compress_thread + i, "compress",
>                             do_data_compress, comp_param + i,
>                             QEMU_THREAD_JOINABLE);
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 06/13] arch_init: Add and free data struct for decompression
  2015-02-02 11:05 ` [Qemu-devel] [v4 06/13] arch_init: Add and free data struct for decompression Liang Li
@ 2015-02-06 10:46   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 10:46 UTC (permalink / raw)
  To: Liang Li; +Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah

* Liang Li (liang.z.li@intel.com) wrote:
> Define the data structure and variables used to do multiple thread
> decompression, and add the code to initialize and free them.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  arch_init.c | 13 ++++++++++++-
>  1 file changed, 12 insertions(+), 1 deletion(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 87c4947..500f299 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -345,7 +345,12 @@ struct CompressParam {
>  typedef struct CompressParam CompressParam;
>  
>  struct DecompressParam {
> -    /* To be done */
> +    bool busy;
> +    QemuMutex mutex;
> +    QemuCond cond;
> +    void *des;
> +    uint8 *compbuf;
> +    int len;
>  };
>  typedef struct DecompressParam DecompressParam;
>  
> @@ -1188,6 +1193,9 @@ void migrate_decompress_threads_create(int count)
>      compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
>      quit_thread = false;
>      for (i = 0; i < count; i++) {
> +        qemu_mutex_init(&decomp_param[i].mutex);
> +        qemu_cond_init(&decomp_param[i].cond);
> +        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
>          qemu_thread_create(decompress_threads + i, "decompress",
>                             do_data_decompress, decomp_param + i,
>                             QEMU_THREAD_JOINABLE);
> @@ -1202,6 +1210,9 @@ void migrate_decompress_threads_join(void)
>      thread_count = migrate_decompress_threads();
>      for (i = 0; i < thread_count; i++) {
>          qemu_thread_join(decompress_threads + i);
> +        qemu_mutex_destroy(&decomp_param[i].mutex);
> +        qemu_cond_destroy(&decomp_param[i].cond);
> +        g_free(decomp_param[i].compbuf);
>      }
>      g_free(decompress_threads);
>      g_free(decomp_param);
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 07/13] migration: Split the function ram_save_page
  2015-02-02 11:05 ` [Qemu-devel] [v4 07/13] migration: Split the function ram_save_page Liang Li
@ 2015-02-06 11:01   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 11:01 UTC (permalink / raw)
  To: Liang Li; +Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah

* Liang Li (liang.z.li@intel.com) wrote:
> Split the function ram_save_page for code reuse purpose.

That's better, but I still think there is an XBZRLE problem; see below.

> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c | 102 +++++++++++++++++++++++++++++++++---------------------------
>  1 file changed, 56 insertions(+), 46 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 500f299..eae082b 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -595,6 +595,58 @@ static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length)
>      }
>  }
>  
> +static int save_zero_and_xbzrle_page(QEMUFile *f, uint8_t **current_data,
> +                                     RAMBlock *block, ram_addr_t offset,
> +                                     bool last_stage, bool *send_async)
> +{
> +    int bytes_sent = -1;
> +    int cont, ret;
> +    ram_addr_t current_addr;
> +
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +
> +    /* In doubt sent page as normal */
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_sent);
> +
> +    XBZRLE_cache_lock();
> +
> +    current_addr = block->offset + offset;
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_sent == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else if (is_zero_range(*current_data, TARGET_PAGE_SIZE)) {
> +        acct_info.dup_pages++;
> +        bytes_sent = save_block_hdr(f, block, offset, cont,
> +                                    RAM_SAVE_FLAG_COMPRESS);
> +        qemu_put_byte(f, 0);
> +        bytes_sent++;
> +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> +         * page would be stale
> +         */
> +        xbzrle_cache_zero_page(current_addr);
> +    } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
> +        bytes_sent = save_xbzrle_page(f, current_data, current_addr, block,
> +                                      offset, cont, last_stage);
> +        if (!last_stage) {
> +            /* Can't send this cached data async, since the cache page
> +             * might get updated before it gets to the wire
> +             */
> +            if (send_async != NULL) {
> +                *send_async = false;
> +            }
> +        }
> +    }
> +
> +    XBZRLE_cache_unlock();

I think this is too soon; when save_xbzrle_page updates current_data to point
to a page from the cache, the cache data is still in use by this point, so
we must be careful that the cache couldn't get resized until after the qemu_put_buffer
below.  Thus this lock must be held until after that.

Dave

> +    return bytes_sent;
> +}
>  
>  /* Needs iothread lock! */
>  /* Fix me: there are too many global variables used in migration process. */
> @@ -685,60 +737,20 @@ static void migration_bitmap_sync(void)
>   *
>   * Returns: Number of bytes written.
>   */
> -static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
> +static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
>                           bool last_stage)
>  {
>      int bytes_sent;
>      int cont;
> -    ram_addr_t current_addr;
>      MemoryRegion *mr = block->mr;
>      uint8_t *p;
> -    int ret;
>      bool send_async = true;
>  
> -    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> -
>      p = memory_region_get_ram_ptr(mr) + offset;
> -
> -    /* In doubt sent page as normal */
> -    bytes_sent = -1;
> -    ret = ram_control_save_page(f, block->offset,
> -                           offset, TARGET_PAGE_SIZE, &bytes_sent);
> -
> -    XBZRLE_cache_lock();
> -
> -    current_addr = block->offset + offset;
> -    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> -        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> -            if (bytes_sent > 0) {
> -                acct_info.norm_pages++;
> -            } else if (bytes_sent == 0) {
> -                acct_info.dup_pages++;
> -            }
> -        }
> -    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> -        acct_info.dup_pages++;
> -        bytes_sent = save_block_hdr(f, block, offset, cont,
> -                                    RAM_SAVE_FLAG_COMPRESS);
> -        qemu_put_byte(f, 0);
> -        bytes_sent++;
> -        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> -         * page would be stale
> -         */
> -        xbzrle_cache_zero_page(current_addr);
> -    } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
> -        bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
> -                                      offset, cont, last_stage);
> -        if (!last_stage) {
> -            /* Can't send this cached data async, since the cache page
> -             * might get updated before it gets to the wire
> -             */
> -            send_async = false;
> -        }
> -    }
> -
> -    /* XBZRLE overflow or normal page */
> +    bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> +                                           last_stage, &send_async);
>      if (bytes_sent == -1) {
> +        cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
>          bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_PAGE);
>          if (send_async) {
>              qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
> @@ -749,8 +761,6 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>          acct_info.norm_pages++;
>      }
>  
> -    XBZRLE_cache_unlock();
> -
>      return bytes_sent;
>  }
>  
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression
  2015-02-02 11:05 ` [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression Liang Li
@ 2015-02-06 12:12   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 12:12 UTC (permalink / raw)
  To: Liang Li; +Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah

* Liang Li (liang.z.li@intel.com) wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 159 insertions(+), 8 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index eae082b..b8bdb16 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -364,16 +364,31 @@ static QemuCond *comp_done_cond;
>  /* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
>  static bool quit_thread;
> +static int one_byte_count;

Please add a comment here about what one_byte_count is; it's
not obvious, but I can't think of a better name

>  static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
>  static uint8_t *compressed_data_buf;
>  
> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_thread) {
> -
> -    /* To be done */
> +    CompressParam *param = opaque;
>  
> +    while (!quit_thread) {

This is something I missed on 02/   - can you rename 'quit_thread'
to comp_quit_thread or something, so it's obvious which thread.

> +        qemu_mutex_lock(&param->mutex);
> +        while (!param->busy) {
> +            qemu_cond_wait(&param->cond, &param->mutex);
> +            if (quit_thread) {
> +                break;
> +            }
> +        }
> +        qemu_mutex_unlock(&param->mutex);
> +        do_compress_ram_page(param);
> +        qemu_mutex_lock(comp_done_lock);
> +        param->busy = false;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);

This is interestingly different from your previous version; param->mutex
used to be held all of the time except during the cond_wait itself.

I'm also worried about the quit_thread behaviour;  is there
any guarantee that 'terminate_compression_threads' is called
while this code is in the param->cond cond_wait?   If terminate_compression_threads
was called while the thread was busy, then the cond_signal on param->cond
would be too early.  I'm thinking perhaps you need to check quit_thread before
the cond_wait as well?  (It's mostly error cases and migrate_cancel I'm worried
about here).

>      }
>  
>      return NULL;
> @@ -381,9 +396,13 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_thread = true;
> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_cond_signal(&comp_param[idx].cond);
> +    }
>  }
>  
>  void migrate_compress_threads_join(MigrationState *s)
> @@ -764,12 +783,144 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, cont;
> +    int blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->busy = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (comp_param[idx].busy) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (comp_param[idx].busy) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }
> +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> +        bytes_transferred += len;
> +    }
> +    if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
> +        bytes_transferred -= one_byte_count;
> +        one_byte_count = 0;
> +    }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset)
> +{
> +    int idx, thread_count, bytes_sent = 0;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!comp_param[idx].busy) {
> +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);
> +                if (bytes_sent == 0) {
> +                    /* set bytes_sent to 1 in this case to prevent migration
> +                     * from terminating, this 1 byte whill be added to
> +                     * bytes_transferred later, minus 1 to keep the
> +                     * bytes_transferred accurate */
> +                    bytes_sent = 1;
> +                    if (bytes_transferred <= 0) {
> +                        one_byte_count++;
> +                    } else {
> +                        bytes_transferred -= 1;
> +                    }
> +                }
> +                break;
> +            }
> +        }
> +        if (bytes_sent > 0) {
> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return bytes_sent;
> +}
> +
>  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      ram_addr_t offset, bool last_stage)
>  {
>      int bytes_sent = 0;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
>  
> -    /* To be done*/
> +    p = memory_region_get_ram_ptr(mr) + offset;
> +    /* When starting the process of a new block, the first page of
> +     * the block should be sent out before other pages in the same
> +     * block, and all the pages in last block should have been sent
> +     * out, keeping this order is important, because the 'cont' flag
> +     * is used to avoid resending the block name.
> +     */
> +    if (block != last_sent_block) {
> +        flush_compressed_data(f);
> +        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> +                                               last_stage, NULL);
> +        if (bytes_sent == -1) {
> +            set_compress_params(&comp_param[0], block, offset);
> +            /* Use the qemu thread to compress the data to make sure the
> +             * first page is sent out before other pages
> +             */
> +            bytes_sent = do_compress_ram_page(&comp_param[0]);
> +            if (bytes_sent > 0) {
> +                qemu_put_qemu_file(f, comp_param[0].file);
> +            }
> +        }
> +    } else {
> +        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> +                                               last_stage, NULL);
> +        if (bytes_sent == -1) {
> +            bytes_sent = compress_page_with_multi_thread(f, block, offset);
> +        }
> +    }
>  
>      return bytes_sent;
>  }
> @@ -828,8 +979,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1037,6 +1186,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -1083,6 +1233,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
>  
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 09/13] migration: Make compression co-work with xbzrle
  2015-02-02 11:05 ` [Qemu-devel] [v4 09/13] migration: Make compression co-work with xbzrle Liang Li
@ 2015-02-06 12:15   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 12:15 UTC (permalink / raw)
  To: Liang Li; +Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah

* Liang Li (liang.z.li@intel.com) wrote:
> Now, multiple thread compression can co-work with xbzrle. when
> xbzrle is on, multiple thread compression will only work at the
> first round of RAM data sync.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c | 11 ++++++++++-
>  1 file changed, 10 insertions(+), 1 deletion(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index b8bdb16..8ef0315 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -364,6 +364,7 @@ static QemuCond *comp_done_cond;
>  /* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
>  static bool quit_thread;
> +static bool compression_switch_on;


Yes, OK; a bit of an odd name, but I can see what it's trying to do.

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

>  static int one_byte_count;
>  static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
> @@ -440,6 +441,7 @@ void migrate_compress_threads_create(MigrationState *s)
>          return;
>      }
>      quit_thread = false;
> +    compression_switch_on = true;
>      thread_count = migrate_compress_threads();
>      s->compress_thread = g_new0(QemuThread, thread_count);
>      comp_param = g_new0(CompressParam, thread_count);
> @@ -957,9 +959,16 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>                  block = QTAILQ_FIRST(&ram_list.blocks);
>                  complete_round = true;
>                  ram_bulk_stage = false;
> +                if (migrate_use_xbzrle()) {
> +                    /* If xbzrle is on, stop using the data compression at this
> +                     * point. In theory, xbzrle can do better than compression.
> +                     */
> +                    flush_compressed_data(f);
> +                    compression_switch_on = false;
> +                }
>              }
>          } else {
> -            if (migrate_use_compression()) {
> +            if (compression_switch_on && migrate_use_compression()) {
>                  bytes_sent = ram_save_compressed_page(f, block, offset,
>                                                        last_stage);
>              } else {
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [v4 10/13] migration: Add the core code for decompression
  2015-02-02 11:05 ` [Qemu-devel] [v4 10/13] migration: Add the core code for decompression Liang Li
@ 2015-02-06 12:27   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2015-02-06 12:27 UTC (permalink / raw)
  To: Liang Li; +Cc: quintela, armbru, qemu-devel, lcapitulino, Yang Zhang, amit.shah

* Liang Li (liang.z.li@intel.com) wrote:
> Implement the core logic of multiple thread decompression,
> the decompression can work now.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 47 insertions(+), 2 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 8ef0315..549fdbb 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -814,6 +814,13 @@ static inline void start_compression(CompressParam *param)
>      qemu_mutex_unlock(&param->mutex);
>  }
>  
> +static inline void start_decompression(DecompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->busy = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
>  
>  static uint64_t bytes_transferred;
>  
> @@ -1347,8 +1354,27 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>  
>  static void *do_data_decompress(void *opaque)
>  {
> +    DecompressParam *param = opaque;
> +    size_t pagesize;
> +
>      while (!quit_thread) {
> -        /* To be done */
> +        qemu_mutex_lock(&param->mutex);
> +        while (!param->busy) {
> +            qemu_cond_wait(&param->cond, &param->mutex);
> +            if (quit_thread) {
> +                break;
> +            }
> +            pagesize = TARGET_PAGE_SIZE;
> +            /* uncompress() will return failed in some case, especially
> +             * when the page is dirted when doing the compression, it's
> +             * not a problem because the dirty page will be retransferred
> +             * and uncompress() won't break the data in other pages.
> +             */
> +            uncompress((Bytef *)param->des, &pagesize,
> +                       (const Bytef *)param->compbuf, param->len);
> +            param->busy = false;
> +        }
> +        qemu_mutex_unlock(&param->mutex);
>      }

Again, similar question to the compress thread; what ensures that
teh cond_signal for quit_thread happens while this loop is at
the cond_wait? Again mainly in error cases.

Dave

>  
>      return NULL;
> @@ -1379,6 +1405,9 @@ void migrate_decompress_threads_join(void)
>      quit_thread = true;
>      thread_count = migrate_decompress_threads();
>      for (i = 0; i < thread_count; i++) {
> +        qemu_cond_signal(&decomp_param[i].cond);
> +    }
> +    for (i = 0; i < thread_count; i++) {
>          qemu_thread_join(decompress_threads + i);
>          qemu_mutex_destroy(&decomp_param[i].mutex);
>          qemu_cond_destroy(&decomp_param[i].cond);
> @@ -1395,7 +1424,23 @@ void migrate_decompress_threads_join(void)
>  static void decompress_data_with_multi_threads(uint8_t *compbuf,
>                                                 void *host, int len)
>  {
> -    /* To be done */
> +    int idx, thread_count;
> +
> +    thread_count = migrate_decompress_threads();
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!decomp_param[idx].busy) {
> +                memcpy(decomp_param[idx].compbuf, compbuf, len);
> +                decomp_param[idx].des = host;
> +                decomp_param[idx].len = len;
> +                start_decompression(&decomp_param[idx]);
> +                break;
> +            }
> +        }
> +        if (idx < thread_count) {
> +            break;
> +        }
> +    }
>  }
>  
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 28+ messages in thread

end of thread, other threads:[~2015-02-06 12:27 UTC | newest]

Thread overview: 28+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
2015-02-02 11:05 ` [Qemu-devel] [v4 01/13] docs: Add a doc about multiple thread compression Liang Li
2015-02-02 11:05 ` [Qemu-devel] [v4 02/13] migration: Add the framework of multi-thread compression Liang Li
2015-02-06 10:11   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 03/13] migration: Add the framework of multi-thread decompression Liang Li
2015-02-06 10:16   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 04/13] qemu-file: Add compression functions to QEMUFile Liang Li
2015-02-06 10:33   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 05/13] arch_init: Alloc and free data struct for compression Liang Li
2015-02-06 10:45   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 06/13] arch_init: Add and free data struct for decompression Liang Li
2015-02-06 10:46   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 07/13] migration: Split the function ram_save_page Liang Li
2015-02-06 11:01   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression Liang Li
2015-02-06 12:12   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 09/13] migration: Make compression co-work with xbzrle Liang Li
2015-02-06 12:15   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 10/13] migration: Add the core code for decompression Liang Li
2015-02-06 12:27   ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 11/13] migration: Add interface to control compression Liang Li
2015-02-03 22:17   ` Eric Blake
2015-02-02 11:05 ` [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter Liang Li
2015-02-03 23:28   ` Eric Blake
2015-02-04  1:26     ` Li, Liang Z
2015-02-04  2:27       ` Eric Blake
2015-02-02 11:05 ` [Qemu-devel] [v4 13/13] migration: Add command to query " Liang Li
2015-02-03 23:30   ` Eric Blake

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).