qemu-devel.nongnu.org archive mirror
* [Qemu-devel] [PATCH v2 0/2] migration: Add a new feature to do live migration
@ 2014-11-06 11:08 Li Liang
  2014-11-06 11:08 ` [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads Li Liang
  2014-11-06 11:08 ` [Qemu-devel] [v2 2/2] migration: Implement " Li Liang
  0 siblings, 2 replies; 21+ messages in thread
From: Li Liang @ 2014-11-06 11:08 UTC
  To: qemu-devel; +Cc: yang.z.zhang, Li Liang, armbru, lcapitulino

This feature uses multiple threads to compress guest RAM pages during
live migration. It can reduce the amount of data transferred by about
60%, and the migration time by about 70%.

    Summary of changes from v1 to v2

    - Changed the decompression thread limit from 64 to 255
    - Fixed some spelling mistakes
    - Added test results to the document
    - Fixed the version mistake in qapi-schema.json
    - Added documentation for the 'compress' flag
    - Reordered the series so that the document patch comes first
    - Fixed comments
    
-- 
1.9.1


* [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads
  2014-11-06 11:08 [Qemu-devel] [PATCH v2 0/2] migration: Add a new feature to do live migration Li Liang
@ 2014-11-06 11:08 ` Li Liang
  2014-11-06 11:25   ` Eric Blake
  2014-11-06 13:24   ` Dr. David Alan Gilbert
  2014-11-06 11:08 ` [Qemu-devel] [v2 2/2] migration: Implement " Li Liang
  1 sibling, 2 replies; 21+ messages in thread
From: Li Liang @ 2014-11-06 11:08 UTC
  To: qemu-devel; +Cc: yang.z.zhang, Li Liang, armbru, lcapitulino

Describe the multiple compression threads feature and how to use it
in live migration.

Signed-off-by: Li Liang <liang.z.li@intel.com>
---
 docs/multiple-compression-threads.txt | 128 ++++++++++++++++++++++++++++++++++
 1 file changed, 128 insertions(+)
 create mode 100644 docs/multiple-compression-threads.txt
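
The document below recommends about four compression threads per decompression thread when the source and destination CPUs have equal speed, because zlib decompresses at least four times as fast as it compresses. As a worked example, the hypothetical helper suggest_decompress_threads() rounds compress_threads / speed_ratio up and caps the result at the decompression thread limit of 255 from the v2 change log. With a ratio of 4 it reproduces the thread counts used in both test runs (8 compression threads give 2, 15 give 4). This is a sketch of the rule of thumb, not code from the patch.

```c
#include <assert.h>

/* Decompression thread limit, per the v2 change log. */
#define MAX_DECOMPRESS_THREADS 255

/* Hypothetical helper: decompression threads needed to keep up with
 * `compress_threads` compression threads when decompression is
 * `speed_ratio` times faster than compression (about 4 for zlib). */
static int suggest_decompress_threads(int compress_threads, int speed_ratio)
{
    assert(compress_threads > 0 && speed_ratio > 0);
    /* Round up so the destination does not fall behind the source. */
    int n = (compress_threads + speed_ratio - 1) / speed_ratio;
    return n > MAX_DECOMPRESS_THREADS ? MAX_DECOMPRESS_THREADS : n;
}
```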

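The reduction percentages quoted in the cover letter and in the Introduction can be checked against the two Performance tables below with plain integer arithmetic. A minimal sketch, where reduction_permille() is a hypothetical helper that reports a reduction in tenths of a percent, truncated toward zero:

```c
#include <assert.h>
#include <stdint.h>

/* Reduction from `before` to `after`, in permille (tenths of a percent). */
static uint64_t reduction_permille(uint64_t before, uint64_t after)
{
    assert(before > 0 && after <= before);
    return (before - after) * 1000 / before;
}
```

For the 32MB/s run it gives 702 for transferred RAM (877054 to 260641 kB) and 701 for total time (26561 to 7920 ms); for the run without a speed limit it gives 700 and 620. That is about 70% less data in both runs, and about 70% and 62% less time respectively.
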
diff --git a/docs/multiple-compression-threads.txt b/docs/multiple-compression-threads.txt
new file mode 100644
index 0000000..a5e53de
--- /dev/null
+++ b/docs/multiple-compression-threads.txt
@@ -0,0 +1,128 @@
+Use multiple (de)compression threads in live migration
+=================================================================
+Copyright (C) 2014 Li Liang <liang.z.li@intel.com>
+
+
+Contents:
+=========
+* Introduction
+* When to use
+* Performance
+* Usage
+* TODO
+
+Introduction
+============
+Instead of sending guest memory pages as they are, this solution
+compresses each RAM page before sending it, and the destination
+decompresses the data after receiving it. Using compression in live
+migration can reduce the amount of data transferred by about 60%,
+which is very useful when bandwidth is limited, and can reduce the
+migration time by about 70% in a typical case.
+
+Compression consumes additional CPU cycles, and the extra CPU time
+increases the migration time. On the other hand, compression reduces
+the amount of data transferred, which decreases the migration time.
+If compression is fast enough, the net effect is a shorter total
+migration time; multiple compression threads can be used to
+accelerate the compression process.
+
+zlib decompresses at least 4 times as fast as it compresses. If the
+source and destination CPUs have equal speed, keeping the compression
+thread count at 4 times the decompression thread count avoids wasting
+CPU time.
+
+The compression level controls the trade-off between compression speed
+and compression ratio; a higher ratio takes more time. Level 0 means no
+compression, level 1 gives the best compression speed, and level 9
+gives the best compression ratio. Users can select any level from 0
+to 9.
+
+
+When to use the multiple compression threads in live migration
+==============================================================
+Compressing data consumes a lot of extra CPU cycles, so avoid this
+feature on a system whose CPUs are already heavily loaded. When the
+network bandwidth is very limited and CPU resources are adequate,
+multiple compression threads are very helpful. If both the CPU and the
+network bandwidth are adequate, multiple compression threads can still
+help to reduce the migration time.
+
+Performance
+===========
+Test environment:
+
+CPU: Intel(R) Xeon(R) CPU E5-2680 0 @ 2.70GHz
+Socket Count: 2
+RAM: 128 GB
+NIC: Intel I350 (10/100/1000Mbps)
+Host OS: CentOS 7 64-bit
+Guest OS: Ubuntu 12.10 64-bit
+Command line: qemu-system-x86_64 -enable-kvm -m 1024
+ /share/ia32e_ubuntu12.10.img -monitor stdio
+
+No additional applications were running in the guest during the
+test.
+
+
+Speed limit: 32MB/s
+---------------------------------------------------------------
+                      | without     | compress threads:   8
+                      | compression | decompress threads: 2
+                      |             | compression level:  1
+---------------------------------------------------------------
+total time (ms):      | 26561       | 7920
+---------------------------------------------------------------
+transferred RAM (kB): | 877054      | 260641
+---------------------------------------------------------------
+throughput (Mbps):    | 270.53      | 269.68
+---------------------------------------------------------------
+total RAM (kB):       | 1057604     | 1057604
+---------------------------------------------------------------
+
+
+Speed limit: No
+---------------------------------------------------------------
+                      | without     | compress threads:   15
+                      | compression | decompress threads: 4
+                      |             | compression level:  1
+---------------------------------------------------------------
+total time (ms):      | 7611        | 2888
+---------------------------------------------------------------
+transferred RAM (kB): | 876761      | 262301
+---------------------------------------------------------------
+throughput (Mbps):    | 943.78      | 744.27
+---------------------------------------------------------------
+total RAM (kB):       | 1057604     | 1057604
+---------------------------------------------------------------
+
+Usage
+=====
+1. Verify that the destination QEMU version supports migration with
+multiple compression threads:
+    {qemu} info migrate_capabilities
+    {qemu} ... compress: off ...
+
+2. Activate compression on the source:
+    {qemu} migrate_set_capability compress on
+
+3. Set the compression thread count on the source:
+    {qemu} migrate_set_compress_threads 10
+
+4. Set the compression level on the source:
+    {qemu} migrate_set_compress_level 1
+
+5. Set the decompression thread count on the destination:
+    {qemu} migrate_set_decompress_threads 5
+
+6. Start outgoing migration:
+    {qemu} migrate -d tcp:destination.host:4444
+    {qemu} info migrate
+    Capabilities: ... compress: on
+    ...
+
+TODO
+====
+Faster compression/decompression methods such as lz4 and quicklz
+could reduce the CPU consumption of (de)compression, so that fewer
+(de)compression threads would be needed during migration.
-- 
1.9.1


* [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-06 11:08 [Qemu-devel] [PATCH v2 0/2] migration: Add a new feature to do live migration Li Liang
  2014-11-06 11:08 ` [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads Li Liang
@ 2014-11-06 11:08 ` Li Liang
  2014-11-06 12:57   ` Eric Blake
                     ` (2 more replies)
  1 sibling, 3 replies; 21+ messages in thread
From: Li Liang @ 2014-11-06 11:08 UTC
  To: qemu-devel; +Cc: yang.z.zhang, Li Liang, armbru, lcapitulino

Instead of sending the guest memory directly, this solution compresses
each RAM page before sending it; the destination decompresses the data
after receiving it.
This feature can reduce the amount of data transferred by about 60%,
which is very useful when the network bandwidth is limited, and can
also reduce the migration time by about 70%. The feature is off by
default; see docs/multiple-compression-threads.txt for how to use it.

Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Li Liang <liang.z.li@intel.com>
---
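
The byte layout of a RAM_SAVE_FLAG_COMPRESS_PAGE record can be read off migrate_save_block_hdr() and migrate_qemu_add_compress() in arch_init.c below: a big-endian 64-bit word holding the page offset ORed with the flags, then (only when RAM_SAVE_FLAG_CONTINUE is clear) a length byte followed by the RAMBlock idstr, then a big-endian 32-bit compressed length and the zlib data. The C sketch here round-trips that layout. It assumes 4 KiB target pages, and put_compressed_page(), parse_compressed_page() and PageRecord are hypothetical names, not QEMU APIs. ram_load() in this patch copies `len` bytes into a COMPRESS_BUF_SIZE stack buffer without checking `len`, so this parser rejects oversized lengths before any copy would happen.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TARGET_PAGE_SIZE            4096   /* assumption: 4 KiB pages */
#define RAM_SAVE_FLAG_CONTINUE      0x20   /* from arch_init.c */
#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100  /* added by this patch */
#define COMPRESS_BUF_SIZE           (TARGET_PAGE_SIZE + 16)

/* Big-endian store of the low n bytes of v, like migrate_put_be*(). */
static uint8_t *put_be(uint8_t *p, uint64_t v, int n)
{
    for (int i = 0; i < n; i++) {
        *p++ = (uint8_t)(v >> (8 * (n - 1 - i)));
    }
    return p;
}

static uint64_t get_be(const uint8_t **p, int n)
{
    uint64_t v = 0;
    for (int i = 0; i < n; i++) {
        v = (v << 8) | *(*p)++;
    }
    return v;
}

/* Writes one record in the field order of migrate_save_block_hdr() and
 * migrate_qemu_add_compress(); returns its length. Like MigBuf, this does
 * no bounds checking: `out` needs 8 + 1 + 255 + 4 + dlen bytes. */
static size_t put_compressed_page(uint8_t *out, uint64_t offset, int cont,
                                  const char *idstr,
                                  const uint8_t *data, uint32_t dlen)
{
    uint64_t flags = RAM_SAVE_FLAG_COMPRESS_PAGE |
                     (cont ? RAM_SAVE_FLAG_CONTINUE : 0);
    uint8_t *p = put_be(out, offset | flags, 8);

    if (!cont) {
        size_t idlen = strlen(idstr);   /* assumed to be <= 255 */
        p = put_be(p, idlen, 1);
        memcpy(p, idstr, idlen);
        p += idlen;
    }
    p = put_be(p, dlen, 4);
    memcpy(p, data, dlen);
    return (size_t)(p - out) + dlen;
}

typedef struct {
    uint64_t offset;
    unsigned flags;
    const char *idstr;    /* points into the buffer, not NUL-terminated */
    size_t idlen;
    uint32_t dlen;
    const uint8_t *data;
} PageRecord;

/* Parses one record from `avail` bytes; returns the bytes consumed, or 0
 * if the record is truncated or its payload would not fit in a
 * COMPRESS_BUF_SIZE buffer such as the one ram_load() copies into. */
static size_t parse_compressed_page(const uint8_t *p, size_t avail,
                                    PageRecord *r)
{
    const uint8_t *start = p, *end = p + avail;

    if (avail < 8) {
        return 0;
    }
    uint64_t v = get_be(&p, 8);
    r->offset = v & ~(uint64_t)(TARGET_PAGE_SIZE - 1);
    r->flags = (unsigned)(v & (TARGET_PAGE_SIZE - 1));
    r->idstr = NULL;
    r->idlen = 0;
    if (!(r->flags & RAM_SAVE_FLAG_CONTINUE)) {
        if (p == end || (size_t)(end - p - 1) < *p) {
            return 0;                   /* truncated block name */
        }
        r->idlen = *p++;
        r->idstr = (const char *)p;
        p += r->idlen;
    }
    if (end - p < 4) {
        return 0;
    }
    r->dlen = (uint32_t)get_be(&p, 4);
    if (r->dlen > COMPRESS_BUF_SIZE || (size_t)(end - p) < r->dlen) {
        return 0;                       /* oversized or truncated payload */
    }
    r->data = p;
    return (size_t)(p - start) + r->dlen;
}
```
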
 arch_init.c                   | 435 ++++++++++++++++++++++++++++++++++++++++--
 hmp-commands.hx               |  56 ++++++
 hmp.c                         |  57 ++++++
 hmp.h                         |   6 +
 include/migration/migration.h |  12 +-
 include/migration/qemu-file.h |   1 +
 migration.c                   |  99 ++++++++++
 monitor.c                     |  21 ++
 qapi-schema.json              |  88 ++++++++-
 qmp-commands.hx               | 131 +++++++++++++
 10 files changed, 890 insertions(+), 16 deletions(-)
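
COMPRESS_BUF_SIZE in arch_init.c below reserves TARGET_PAGE_SIZE + 16 bytes of output space per page, and compress2() returns Z_BUF_ERROR when its output buffer is too small. The sketch below checks that fixed margin against zlib's worst case. It assumes the formula used by compressBound() in recent zlib releases (older releases used a slightly different bound), and zlib_compress_bound() and fixed_margin_is_enough() are hypothetical helpers; real code should call compressBound() itself.

```c
#include <assert.h>
#include <stdint.h>

/* Worst-case compressed size, assuming the formula of compressBound()
 * in recent zlib releases; not a substitute for calling compressBound(). */
static uint64_t zlib_compress_bound(uint64_t source_len)
{
    return source_len + (source_len >> 12) + (source_len >> 14) +
           (source_len >> 25) + 13;
}

/* Does a fixed "+ 16" margin, as in COMPRESS_BUF_SIZE, cover that bound? */
static int fixed_margin_is_enough(uint64_t page_size)
{
    return zlib_compress_bound(page_size) <= page_size + 16;
}
```

Under that assumption the margin covers 4 KiB and 8 KiB pages (at most 4110 and 8207 bytes) but not 64 KiB pages (65569 bytes against 65552 available), where compress2() could fail on incompressible data. Sizing the buffer with compressBound(TARGET_PAGE_SIZE) removes the dependency on the page size.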

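The compression and decompression threads in arch_init.c below hand work over through a plain int `state` field (COM_START/COM_DONE) that both sides poll with g_usleep(), with no lock or memory barrier between the thread that fills `migbuf` and the thread that reads it. One conventional alternative is a mutex plus a condition variable per slot, sketched here with POSIX threads rather than QEMU's own thread wrappers. JobSlot and the job_*() functions are hypothetical, and a byte sum stands in for compress2() so the sketch does not need zlib.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-thread slot playing the role of compress_param. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;   /* broadcast on every state change */
    bool start;            /* a job is waiting for the worker */
    bool done;             /* no job in flight; `out` is valid */
    bool quit;
    const uint8_t *in;
    size_t len;
    uint32_t out;
} JobSlot;

static void *job_worker(void *opaque)
{
    JobSlot *s = opaque;

    pthread_mutex_lock(&s->lock);
    for (;;) {
        while (!s->start && !s->quit) {
            pthread_cond_wait(&s->cond, &s->lock);  /* sleep, no polling */
        }
        if (s->quit) {
            break;
        }
        s->start = false;
        const uint8_t *in = s->in;
        size_t len = s->len;
        pthread_mutex_unlock(&s->lock);
        /* Stand-in for compress2(): a byte sum, computed without the lock. */
        uint32_t sum = 0;
        for (size_t i = 0; i < len; i++) {
            sum += in[i];
        }
        pthread_mutex_lock(&s->lock);
        s->out = sum;     /* written under the lock, so job_wait() */
        s->done = true;   /* sees the result together with the flag */
        pthread_cond_broadcast(&s->cond);
    }
    pthread_mutex_unlock(&s->lock);
    return NULL;
}

static int job_slot_start(JobSlot *s, pthread_t *tid)
{
    *s = (JobSlot){ .done = true };   /* idle: no job, no quit request */
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->cond, NULL);
    return pthread_create(tid, NULL, job_worker, s);
}

/* Producer: wait for the previous job, then hand over a new one.
 * `in` must stay valid until job_wait() returns. */
static void job_submit(JobSlot *s, const uint8_t *in, size_t len)
{
    pthread_mutex_lock(&s->lock);
    while (!s->done) {
        pthread_cond_wait(&s->cond, &s->lock);
    }
    s->in = in;
    s->len = len;
    s->done = false;
    s->start = true;
    pthread_cond_broadcast(&s->cond);
    pthread_mutex_unlock(&s->lock);
}

/* Producer: block until the current job's result is ready. */
static uint32_t job_wait(JobSlot *s)
{
    pthread_mutex_lock(&s->lock);
    while (!s->done) {
        pthread_cond_wait(&s->cond, &s->lock);
    }
    uint32_t out = s->out;
    pthread_mutex_unlock(&s->lock);
    return out;
}

static void job_slot_stop(JobSlot *s, pthread_t tid)
{
    pthread_mutex_lock(&s->lock);
    s->quit = true;
    pthread_cond_broadcast(&s->cond);
    pthread_mutex_unlock(&s->lock);
    pthread_join(tid, NULL);
    pthread_cond_destroy(&s->cond);
    pthread_mutex_destroy(&s->lock);
}
```

A caller submits a page, does other work, and collects the result with job_wait(). Because `done` and `out` are only touched under the lock, the producer sees the result whenever it sees the flag, which plain int flags do not guarantee on weakly ordered hosts, and idle threads sleep instead of spinning.
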
diff --git a/arch_init.c b/arch_init.c
index 88a5ba0..a27d87b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -24,6 +24,7 @@
 #include <stdint.h>
 #include <stdarg.h>
 #include <stdlib.h>
+#include <zlib.h>
 #ifndef _WIN32
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_CONTINUE 0x20
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 
 static struct defconfig_file {
     const char *filename;
@@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages;
 static uint32_t last_version;
 static bool ram_bulk_stage;
 
+#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
+#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
+struct MigBuf {
+    int buf_index;
+    uint8_t buf[MIG_BUF_SIZE];
+};
+
+typedef struct MigBuf MigBuf;
+
+static void migrate_put_byte(MigBuf *f, int v)
+{
+    f->buf[f->buf_index] = v;
+    f->buf_index++;
+}
+
+static void migrate_put_be16(MigBuf *f, unsigned int v)
+{
+    migrate_put_byte(f, v >> 8);
+    migrate_put_byte(f, v);
+}
+
+static void migrate_put_be32(MigBuf *f, unsigned int v)
+{
+    migrate_put_byte(f, v >> 24);
+    migrate_put_byte(f, v >> 16);
+    migrate_put_byte(f, v >> 8);
+    migrate_put_byte(f, v);
+}
+
+static void migrate_put_be64(MigBuf *f, uint64_t v)
+{
+    migrate_put_be32(f, v >> 32);
+    migrate_put_be32(f, v);
+}
+
+static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
+{
+    int l;
+
+    while (size > 0) {
+        l = MIG_BUF_SIZE - f->buf_index;
+        if (l > size) {
+            l = size;
+        }
+        memcpy(f->buf + f->buf_index, buf, l);
+        f->buf_index += l;
+        buf += l;
+        size -= l;
+    }
+}
+
+static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
+        ram_addr_t offset, int cont, int flag)
+{
+    size_t size;
+
+    migrate_put_be64(f, offset | cont | flag);
+    size = 8;
+
+    if (!cont) {
+        migrate_put_byte(f, strlen(block->idstr));
+        migrate_put_buffer(f, (uint8_t *)block->idstr,
+                        strlen(block->idstr));
+        size += 1 + strlen(block->idstr);
+    }
+    return size;
+}
+
+static int migrate_qemu_add_compress(MigBuf *f,  const uint8_t *p,
+        int size, int level)
+{
+    uLong  blen = COMPRESS_BUF_SIZE;
+    if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
+            size, level) != Z_OK) {
+        error_report("Compress failed");
+        return 0;
+    }
+    migrate_put_be32(f, blen);
+    f->buf_index += blen;
+    return blen + sizeof(int);
+}
+
+enum {
+    COM_DONE = 0,
+    COM_START,
+};
+
+static int  compress_thread_count;
+static int  decompress_thread_count;
+
+struct compress_param {
+    int state;
+    MigBuf migbuf;
+    RAMBlock *block;
+    ram_addr_t offset;
+    bool last_stage;
+    int ret;
+    int bytes_sent;
+    uint8_t *p;
+    int cont;
+    bool bulk_stage;
+};
+
+typedef struct compress_param compress_param;
+compress_param *comp_param;
+
+struct decompress_param {
+    int state;
+    void *des;
+    uint8_t compbuf[COMPRESS_BUF_SIZE];
+    int len;
+};
+typedef struct decompress_param decompress_param;
+
+static decompress_param *decomp_param;
+bool incomming_migration_done;
+static bool quit_thread;
+
+static int save_compress_ram_page(compress_param *param);
+
+
+static void *do_data_compress(void *opaque)
+{
+    compress_param *param = opaque;
+    while (!quit_thread) {
+        if (param->state == COM_START) {
+            save_compress_ram_page(param);
+            param->state = COM_DONE;
+         } else {
+             g_usleep(1);
+         }
+    }
+
+    return NULL;
+}
+
+
+void migrate_compress_threads_join(MigrationState *s)
+{
+    int i;
+    if (!migrate_use_compress()) {
+        return;
+    }
+    quit_thread = true;
+    for (i = 0; i < compress_thread_count; i++) {
+        qemu_thread_join(s->compress_thread + i);
+    }
+    g_free(s->compress_thread);
+    g_free(comp_param);
+    s->compress_thread = NULL;
+    comp_param = NULL;
+}
+
+void migrate_compress_threads_create(MigrationState *s)
+{
+    int i;
+    if (!migrate_use_compress()) {
+        return;
+    }
+    quit_thread = false;
+    compress_thread_count = s->compress_thread_count;
+    s->compress_thread = g_malloc0(sizeof(QemuThread)
+        * s->compress_thread_count);
+    comp_param = g_malloc0(sizeof(compress_param) * s->compress_thread_count);
+    for (i = 0; i < s->compress_thread_count; i++) {
+        qemu_thread_create(s->compress_thread + i, "compress",
+            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
+
+    }
+}
+
 /* Update the xbzrle cache to reflect a page that's been sent as all 0.
  * The important thing is that a stale (not-yet-0'd) page be replaced
  * by the new data.
@@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t current_addr)
 
 #define ENCODING_FLAG_XBZRLE 0x1
 
-static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
+static int save_xbzrle_page(void *f, uint8_t **current_data,
                             ram_addr_t current_addr, RAMBlock *block,
-                            ram_addr_t offset, int cont, bool last_stage)
+                            ram_addr_t offset, int cont, bool last_stage,
+                            bool save_to_buf)
 {
     int encoded_len = 0, bytes_sent = -1;
     uint8_t *prev_cached_page;
@@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
     }
 
     /* Send XBZRLE based compressed page */
-    bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
-    qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
-    qemu_put_be16(f, encoded_len);
-    qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
+    if (save_to_buf) {
+        bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
+            cont, RAM_SAVE_FLAG_XBZRLE);
+        migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
+        migrate_put_be16((MigBuf *)f, encoded_len);
+        migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
+    } else {
+        bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
+            cont, RAM_SAVE_FLAG_XBZRLE);
+        qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
+        qemu_put_be16((QEMUFile *)f, encoded_len);
+        qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
+    }
     bytes_sent += encoded_len + 1 + 2;
     acct_info.xbzrle_pages++;
     acct_info.xbzrle_bytes += bytes_sent;
@@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
         xbzrle_cache_zero_page(current_addr);
     } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
         bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
-                                      offset, cont, last_stage);
+                                      offset, cont, last_stage, false);
         if (!last_stage) {
             /* Can't send this cached data async, since the cache page
              * might get updated before it gets to the wire
@@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
     return bytes_sent;
 }
 
+static int save_compress_ram_page(compress_param *param)
+{
+    int bytes_sent = param->bytes_sent;
+    int blen = COMPRESS_BUF_SIZE;
+    int cont = param->cont;
+    uint8_t *p = param->p;
+    int ret = param->ret;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+    bool last_stage = param->last_stage;
+    /* In doubt, send the page as normal */
+    XBZRLE_cache_lock();
+    ram_addr_t current_addr = block->offset + offset;
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_sent > 0) {
+                atomic_inc(&acct_info.norm_pages);
+             } else if (bytes_sent == 0) {
+                atomic_inc(&acct_info.dup_pages);
+             }
+        }
+    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+        atomic_inc(&acct_info.dup_pages);
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
+                             RAM_SAVE_FLAG_COMPRESS);
+        migrate_put_byte(&param->migbuf, 0);
+        bytes_sent++;
+        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+         * page would be stale
+         */
+        xbzrle_cache_zero_page(current_addr);
+    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
+        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
+                              offset, cont, last_stage, true);
+    }
+    XBZRLE_cache_unlock();
+    /* XBZRLE overflow or normal page */
+    if (bytes_sent == -1) {
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
+            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
+        blen = migrate_qemu_add_compress(&param->migbuf, p,
+            TARGET_PAGE_SIZE, migrate_compress_level());
+        bytes_sent += blen;
+        atomic_inc(&acct_info.norm_pages);
+    }
+    return bytes_sent;
+}
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx;
+    if (!migrate_use_compress()) {
+        return;
+    }
+
+    for (idx = 0; idx < compress_thread_count; idx++) {
+        while (comp_param[idx].state != COM_DONE) {
+            g_usleep(0);
+        }
+        if (comp_param[idx].migbuf.buf_index > 0) {
+            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
+                comp_param[idx].migbuf.buf_index);
+            bytes_transferred += comp_param[idx].migbuf.buf_index;
+            comp_param[idx].migbuf.buf_index = 0;
+        }
+    }
+}
+
+static inline void set_common_compress_params(compress_param *param,
+    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
+    bool last_stage, int cont, uint8_t *p, bool bulk_stage)
+{
+    param->ret = ret;
+    param->bytes_sent = bytes_sent;
+    param->block = block;
+    param->offset = offset;
+    param->last_stage = last_stage;
+    param->cont = cont;
+    param->p = p;
+    param->bulk_stage = bulk_stage;
+}
+
 /*
  * ram_find_and_save_block: Finds a page to send and sends it to f
  *
@@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     bool complete_round = false;
     int bytes_sent = 0;
     MemoryRegion *mr;
+    int cont, idx, ret, len = -1;
+    uint8_t *p;
 
     if (!block)
         block = QTAILQ_FIRST(&ram_list.blocks);
@@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
                 block = QTAILQ_FIRST(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
+                if (migrate_use_xbzrle()) {
+                    /* Stop the compression threads at this point */
+                    flush_compressed_data(f);
+                    quit_thread = true;
+                }
             }
         } else {
-            bytes_sent = ram_save_page(f, block, offset, last_stage);
-
-            /* if page is unmodified, continue to the next */
-            if (bytes_sent > 0) {
-                last_sent_block = block;
-                break;
+            if (!migrate_use_compress()) {
+                bytes_sent = ram_save_page(f, block, offset, last_stage);
+                /* if page is unmodified, continue to the next */
+                if (bytes_sent > 0) {
+                    last_sent_block = block;
+                    break;
+                }
+            } else {
+                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+                p = memory_region_get_ram_ptr(block->mr) + offset;
+                ret = ram_control_save_page(f, block->offset,
+                           offset, TARGET_PAGE_SIZE, &len);
+                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
+                    if (cont == 0) {
+                        flush_compressed_data(f);
+                    }
+                    set_common_compress_params(&comp_param[0],
+                        ret, len, block, offset, last_stage, cont,
+                        p, ram_bulk_stage);
+                    bytes_sent = save_compress_ram_page(&comp_param[0]);
+                    if (bytes_sent > 0) {
+                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
+                            comp_param[0].migbuf.buf_index);
+                        comp_param[0].migbuf.buf_index = 0;
+                        last_sent_block = block;
+                        break;
+                    }
+                } else {
+retry:
+                    for (idx = 0; idx < compress_thread_count; idx++) {
+                        if (comp_param[idx].state == COM_DONE) {
+                            bytes_sent = comp_param[idx].migbuf.buf_index;
+                            if (bytes_sent == 0) {
+                                set_common_compress_params(&comp_param[idx],
+                                    ret, len, block, offset, last_stage,
+                                    cont, p, ram_bulk_stage);
+                                comp_param[idx].state = COM_START;
+                                bytes_sent = 1;
+                                bytes_transferred -= 1;
+                                break;
+                            } else if (bytes_sent > 0) {
+                                qemu_put_buffer(f, comp_param[idx].migbuf.buf,
+                                    comp_param[idx].migbuf.buf_index);
+                                comp_param[idx].migbuf.buf_index = 0;
+                                set_common_compress_params(&comp_param[idx],
+                                   ret, len, block, offset, last_stage,
+                                   cont, p, ram_bulk_stage);
+                                comp_param[idx].state = COM_START;
+                                break;
+                            }
+                        }
+                    }
+                    if (idx < compress_thread_count) {
+                        last_sent_block = block;
+                        break;
+                    } else {
+                        g_usleep(0);
+                        goto retry;
+                    }
+                }
             }
         }
     }
@@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     return bytes_sent;
 }
 
-static uint64_t bytes_transferred;
 
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
@@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
+    flush_compressed_data(f);
     qemu_mutex_unlock_ramlist();
 
     /*
@@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         bytes_transferred += bytes_sent;
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
@@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+QemuThread *decompress_threads;
+
+static void *do_data_decompress(void *opaque)
+{
+    decompress_param *param = opaque;
+    while (incomming_migration_done == false) {
+        if (param->state == COM_START) {
+            uLong pagesize = TARGET_PAGE_SIZE;
+            if (uncompress((Bytef *)param->des, &pagesize,
+                    (const Bytef *)param->compbuf, param->len) != Z_OK) {
+                error_report("Uncompress failed");
+                break;
+            }
+            param->state = COM_DONE;
+        } else {
+            if (quit_thread) {
+                break;
+            }
+            g_usleep(1);
+        }
+    }
+    return NULL;
+}
+
+void migrate_decompress_threads_create(int count)
+{
+    int i;
+    decompress_thread_count = count;
+    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
+    decomp_param = g_malloc0(sizeof(decompress_param) * count);
+    quit_thread = false;
+    for (i = 0; i < count; i++) {
+        qemu_thread_create(decompress_threads + i, "decompress",
+            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
+    }
+}
+
+void migrate_decompress_threads_join(void)
+{
+    int i;
+    for (i = 0; i < decompress_thread_count; i++) {
+        qemu_thread_join(decompress_threads + i);
+    }
+    g_free(decompress_threads);
+    g_free(decomp_param);
+    decompress_threads = NULL;
+    decomp_param = NULL;
+}
+
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
     int flags = 0, ret = 0;
     static uint64_t seq_iter;
+    int len = 0;
+    uint8_t compbuf[COMPRESS_BUF_SIZE];
 
     seq_iter++;
 
@@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
             break;
         case RAM_SAVE_FLAG_PAGE:
+            quit_thread = true;
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
@@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
+        case RAM_SAVE_FLAG_COMPRESS_PAGE:
+            host = host_from_stream_offset(f, addr, flags);
+            if (!host) {
+                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
+                ret = -EINVAL;
+                break;
+            }
+
+            len = qemu_get_be32(f);
+            qemu_get_buffer(f, compbuf, len);
+            int idx;
+retry:
+            for (idx = 0; idx < decompress_thread_count; idx++) {
+                if (decomp_param[idx].state == COM_DONE)  {
+                    memcpy(decomp_param[idx].compbuf, compbuf, len);
+                    decomp_param[idx].des = host;
+                    decomp_param[idx].len = len;
+                    decomp_param[idx].state = COM_START;
+                    break;
+                }
+            }
+            if (idx == decompress_thread_count) {
+                g_usleep(0);
+                goto retry;
+            }
+            break;
         case RAM_SAVE_FLAG_XBZRLE:
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
diff --git a/hmp-commands.hx b/hmp-commands.hx
index e37bc8b..8b93bed 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle migrations.
 ETEXI
 
     {
+        .name       = "migrate_set_compress_level",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set compress level for compress migrations, "
+                      "the level is a number between 0 and 9: 0 stands "
+                      "for no compression, 1 stands for the fastest "
+                      "compress speed and 9 stands for the highest "
+                      "compress ratio",
+        .mhandler.cmd = hmp_migrate_set_compress_level,
+    },
+
+STEXI
+@item migrate_set_compress_level @var{value}
+@findex migrate_set_compress_level
+Set the compression level to @var{value} for compressed migration.
+ETEXI
+
+    {
+        .name       = "migrate_set_compress_threads",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set compress thread count for migrations. "
+                      "A proper thread count will speed up the migration; "
+                      "it should be between 1 and the number of host CPUs",
+        .mhandler.cmd = hmp_migrate_set_compress_threads,
+    },
+
+STEXI
+@item migrate_set_compress_threads @var{value}
+@findex migrate_set_compress_threads
+Set the number of compression threads to @var{value} for compressed migration.
+ETEXI
+
+    {
+        .name       = "migrate_set_decompress_threads",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set decompress thread count for migrations. "
+                      "A proper thread count will speed up the migration; "
+                      "it should be between 1 and the number of host CPUs",
+        .mhandler.cmd = hmp_migrate_set_decompress_threads,
+    },
+
+STEXI
+@item migrate_set_decompress_threads @var{value}
+@findex migrate_set_decompress_threads
+Set the number of decompression threads to @var{value} for compressed migration.
+ETEXI
+
+    {
         .name       = "migrate_set_speed",
         .args_type  = "value:o",
         .params     = "value",
@@ -1766,6 +1816,12 @@ show migration status
 show current migration capabilities
 @item info migrate_cache_size
 show current migration XBZRLE cache size
+@item info migrate_compress_level
+show current migration compress level
+@item info migrate_compress_threads
+show current migration compress threads
+@item info migrate_decompress_threads
+show current migration decompress threads
 @item info balloon
 show balloon information
 @item info qtree
diff --git a/hmp.c b/hmp.c
index 63d7686..b1936a3 100644
--- a/hmp.c
+++ b/hmp.c
@@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
                    qmp_query_migrate_cache_size(NULL) >> 10);
 }
 
+void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "compress level: %" PRId64 "\n",
+                   qmp_query_migrate_compress_level(NULL));
+}
+
+void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "compress threads: %" PRId64 "\n",
+                   qmp_query_migrate_compress_threads(NULL));
+}
+
+void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
+                   qmp_query_migrate_decompress_threads(NULL));
+}
+
 void hmp_info_cpus(Monitor *mon, const QDict *qdict)
 {
     CpuInfoList *cpu_list, *cpu;
@@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
     }
 }
 
+void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_compress_level(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_compress_threads(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_decompress_threads(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
 void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
 {
     int64_t value = qdict_get_int(qdict, "value");
diff --git a/hmp.h b/hmp.h
index 4bb5dca..b348806 100644
--- a/hmp.h
+++ b/hmp.h
@@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
 void hmp_info_cpus(Monitor *mon, const QDict *qdict);
 void hmp_info_block(Monitor *mon, const QDict *qdict);
 void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
@@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
 void hmp_set_password(Monitor *mon, const QDict *qdict);
 void hmp_expire_password(Monitor *mon, const QDict *qdict);
 void hmp_eject(Monitor *mon, const QDict *qdict);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3cb5ba8..03c8e0d 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -49,6 +49,9 @@ struct MigrationState
     QemuThread thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
+    QemuThread *compress_thread;
+    int compress_thread_count;
+    int compress_level;
 
     int state;
     MigrationParams params;
@@ -64,6 +67,7 @@ struct MigrationState
     int64_t dirty_sync_count;
 };
 
+extern bool incomming_migration_done;
 void process_incoming_migration(QEMUFile *f);
 
 void qemu_start_incoming_migration(const char *uri, Error **errp);
@@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *);
 bool migration_has_failed(MigrationState *);
 MigrationState *migrate_get_current(void);
 
+void migrate_compress_threads_create(MigrationState *s);
+void migrate_compress_threads_join(MigrationState *s);
+void migrate_decompress_threads_create(int count);
+void migrate_decompress_threads_join(void);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_transferred(void);
 uint64_t ram_bytes_total(void);
@@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
 
 bool migrate_rdma_pin_all(void);
 bool migrate_zero_blocks(void);
-
+bool migrate_use_compress(void);
 bool migrate_auto_converge(void);
 
 int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
@@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
+int migrate_compress_level(void);
+int migrate_compress_threads(void);
 
 int64_t xbzrle_cache_resize(int64_t new_size);
 
diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 401676b..431e6cc 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input);
 int qemu_get_fd(QEMUFile *f);
 int qemu_fclose(QEMUFile *f);
 int64_t qemu_ftell(QEMUFile *f);
+uint64_t qemu_add_compress(QEMUFile *f, const uint8_t *p, int size);
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
 void qemu_put_byte(QEMUFile *f, int v);
 /*
diff --git a/migration.c b/migration.c
index c49a05a..716de97 100644
--- a/migration.c
+++ b/migration.c
@@ -46,6 +46,12 @@ enum {
 /* Migration XBZRLE default cache size */
 #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
 
+/* Migration compress default thread count */
+#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
+/* 0: no compression, 1: best speed, ..., 9: best compression ratio */
+#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
+
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
 
@@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
         .bandwidth_limit = MAX_THROTTLE,
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
+        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
     };
 
     return &current_migration;
@@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
         error_report("load of migration failed: %s", strerror(-ret));
         exit(EXIT_FAILURE);
     }
+    incomming_migration_done = true;
     qemu_announce_self();
 
     /* Make sure all file formats flush their mutable metadata */
@@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
     } else {
         runstate_set(RUN_STATE_PAUSED);
     }
+    migrate_decompress_threads_join();
 }
 
+static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
 void process_incoming_migration(QEMUFile *f)
 {
+    incomming_migration_done = false;
+    migrate_decompress_threads_create(uncompress_thread_count);
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
     int fd = qemu_get_fd(f);
 
@@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
         qemu_thread_join(&s->thread);
         qemu_mutex_lock_iothread();
 
+        migrate_compress_threads_join(s);
         qemu_fclose(s->file);
         s->file = NULL;
     }
@@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
     int64_t bandwidth_limit = s->bandwidth_limit;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
+    int compress_level = s->compress_level;
+    int compress_thread_count = s->compress_thread_count;
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
            sizeof(enabled_capabilities));
     s->xbzrle_cache_size = xbzrle_cache_size;
 
+    s->compress_level = compress_level;
+    s->compress_thread_count = compress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIG_STATE_SETUP;
     trace_migrate_set_state(MIG_STATE_SETUP);
@@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
     return migrate_xbzrle_cache_size();
 }
 
+void qmp_migrate_set_compress_level(int64_t value, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    if (value > 9 || value < 0) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
+                  "an integer between 0 and 9");
+        return;
+    }
+
+    s->compress_level = value;
+}
+
+int64_t qmp_query_migrate_compress_level(Error **errp)
+{
+    return migrate_compress_level();
+}
+
+void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    if (value > 255 || value < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
+                  "an integer between 1 and 255");
+        return;
+    }
+
+    s->compress_thread_count = value;
+}
+
+void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
+{
+
+    if (value > 255 || value < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "decompress thread count",
+                  "an integer between 1 and 255");
+        return;
+    }
+
+    uncompress_thread_count = value;
+}
+
+int64_t qmp_query_migrate_compress_threads(Error **errp)
+{
+    return migrate_compress_threads();
+}
+
+int64_t qmp_query_migrate_decompress_threads(Error **errp)
+{
+    return uncompress_thread_count;
+}
+
 void qmp_migrate_set_speed(int64_t value, Error **errp)
 {
     MigrationState *s;
@@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
 }
 
+bool migrate_use_compress(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
+}
+
+int migrate_compress_level(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_level;
+}
+
+int migrate_compress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_thread_count;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
 
     qemu_thread_create(&s->thread, "migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
+    migrate_compress_threads_create(s);
 }
diff --git a/monitor.c b/monitor.c
index 905d8cf..365547e 100644
--- a/monitor.c
+++ b/monitor.c
@@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
         .mhandler.cmd = hmp_info_migrate_cache_size,
     },
     {
+        .name       = "migrate_compress_level",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration compress level",
+        .mhandler.cmd = hmp_info_migrate_compress_level,
+    },
+    {
+        .name       = "migrate_compress_threads",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration compress thread count",
+        .mhandler.cmd = hmp_info_migrate_compress_threads,
+    },
+    {
+        .name       = "migrate_decompress_threads",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration decompress thread count",
+        .mhandler.cmd = hmp_info_migrate_decompress_threads,
+    },
+    {
         .name       = "balloon",
         .args_type  = "",
         .params     = "",
diff --git a/qapi-schema.json b/qapi-schema.json
index 24379ab..71a9e0f 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -491,13 +491,17 @@
 #          to enable the capability on the source VM. The feature is disabled by
 #          default. (since 1.6)
 #
+# @compress: Using the multiple compression threads to accelerate live migration.
+#          This feature can help to reduce the migration traffic, by sending
+#          compressed pages. The feature is disabled by default. (since 2.3)
+#
 # @auto-converge: If enabled, QEMU will automatically throttle down the guest
 #          to speed up convergence of RAM migration. (since 1.6)
 #
 # Since: 1.2
 ##
 { 'enum': 'MigrationCapability',
-  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
+  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
 
 ##
 # @MigrationCapabilityStatus
@@ -1382,6 +1386,88 @@
 { 'command': 'query-migrate-cache-size', 'returns': 'int' }
 
 ##
+# @migrate-set-compress-level
+#
+# Set the compression level
+#
+# @value: the compression level
+#
+# The compression level is an integer between 0 and 9.
+# It can be modified both before and during migration.
+#
+# Returns: nothing on success
+#
+# Since: 2.3
+##
+{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-compress-level
+#
+# Query the compression level
+#
+# Returns: the current compression level (int)
+#
+# Since: 2.3
+##
+{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
+
+##
+# @migrate-set-compress-threads
+#
+# Set the number of compression threads
+#
+# @value: the number of compression threads
+#
+# The compression thread count is an integer between 1 and 255.
+# It can be modified only before migration starts.
+#
+# Returns: nothing on success
+#
+# Since: 2.3
+##
+{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-compress-threads
+#
+# Query the number of compression threads
+#
+# Returns: the current compression thread count (int)
+#
+# Since: 2.3
+##
+{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
+
+##
+##
+# @migrate-set-decompress-threads
+#
+# Set the number of decompression threads
+#
+# @value: the number of decompression threads
+#
+# The decompression thread count is an integer between 1 and 255.
+# It can be modified only before migration starts.
+#
+# Returns: nothing on success
+#
+# Since: 2.3
+##
+{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-decompress-threads
+#
+# Query the number of decompression threads
+#
+# Returns: the current decompression thread count (int)
+#
+# Since: 2.3
+##
+{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
+
+##
 # @ObjectPropertyInfo:
 #
 # @name: the name of the property
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 1abd619..b60fdab 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -705,7 +705,138 @@ Example:
 <- { "return": 67108864 }
 
 EQMP
+{
+        .name       = "migrate-set-compress-level",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
+    },
+
+SQMP
+migrate-set-compress-level
+----------------------
+
+Set compress level to be used by compress migration, the compress level is an integer
+between 0 and 9
+
+Arguments:
+
+- "value": compress level (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-compress-level", "arguments": { "value": 536870912 } }
+<- { "return": {} }
+
+EQMP
+    {
+        .name       = "query-migrate-compress-level",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
+    },
+
+SQMP
+query-migrate-compress-level
+------------------------
+
+Show compress level to be used by compress migration
+
+returns a json-object with the following information:
+- "size" : json-int
+
+Example:
+
+-> { "execute": "query-migrate-compress-level" }
+<- { "return": 67108864 }
+
+EQMP
+{
+        .name       = "migrate-set-compress-threads",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
+    },
+
+SQMP
+migrate-set-compress-threads
+----------------------
+
+Set compress thread count to be used by compress migration, the compress thread count is an integer
+between 1 and 255
+
+Arguments:
+
+- "value": compress threads (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 536870912 } }
+<- { "return": {} }
+
+EQMP
+    {
+        .name       = "query-migrate-compress-threads",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
+    },
+
+SQMP
+query-migrate-compress-threads
+------------------------
+
+Show compress thread count to be used by compress migration
+
+returns a json-object with the following information:
+- "size" : json-int
+
+Example:
+
+-> { "execute": "query-migrate-compress-threads" }
+<- { "return": 67108864 }
+
+EQMP
+{
+        .name       = "migrate-set-decompress-threads",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
+    },
+
+SQMP
+migrate-set-decompress-threads
+----------------------
+
+Set decompress thread count to be used by compress migration, the decompress thread count is an integer
+between 1 and 255
+
+Arguments:
+
+- "value": decompress threads (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": 536870912 } }
+<- { "return": {} }
 
+EQMP
+    {
+        .name       = "query-migrate-decompress-threads",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_decompress_threads,
+    },
+
+SQMP
+query-migrate-decompress-threads
+------------------------
+
+Show decompress thread count to be used by compress migration
+
+returns a json-object with the following information:
+- "size" : json-int
+
+Example:
+
+-> { "execute": "query-migrate-decompress-threads" }
+<- { "return": 67108864 }
+
+EQMP
     {
         .name       = "migrate_set_speed",
         .args_type  = "value:o",
-- 
1.9.1

* Re: [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads
  2014-11-06 11:08 ` [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads Li Liang
@ 2014-11-06 11:25   ` Eric Blake
  2014-11-06 13:24   ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 21+ messages in thread
From: Eric Blake @ 2014-11-06 11:25 UTC (permalink / raw)
  To: Li Liang, qemu-devel; +Cc: yang.z.zhang, armbru, lcapitulino

On 11/06/2014 12:08 PM, Li Liang wrote:
> Give some details about the multiple compression threads and how
> to use it in live migration.
> 
> Signed-off-by: Li Liang <liang.z.li@intel.com>
> ---
>  docs/multiple-compression-threads.txt | 128 ++++++++++++++++++++++++++++++++++
>  1 file changed, 128 insertions(+)
>  create mode 100644 docs/multiple-compression-threads.txt
> 
> diff --git a/docs/multiple-compression-threads.txt b/docs/multiple-compression-threads.txt
> new file mode 100644
> index 0000000..a5e53de
> --- /dev/null
> +++ b/docs/multiple-compression-threads.txt
> @@ -0,0 +1,128 @@
> +Use multiple (de)compression threads in live migration
> +=================================================================
> +Copyright (C) 2014 Li Liang <liang.z.li@intel.com>

Asserting copyright without also mentioning an open license is awkward
in open source (IANAL, but as I understand it, in some areas, asserting
a copyright without also granting disclaimers merely gets the default
non-open status where the file cannot be copied at all; the license is
essential to make it obvious that the copyright holder INTENDS for the
file to be copied in some circumstances).  Thus, you need to explicitly
call out GPLv2+ (even if it can be argued it was implied by the
top-level LICENSE) or some other compatible license to be safe.

> +
> +
> +Contents:
> +=========
> +* Introduction
> +* When to use
> +* Performance
> +* Usage
> +* TODO
> +
> +Introduction
> +============
> +Instead of sending the guest memory directly, this solution will
> +compress the ram page before sending, after receiving, the data will

s/sending,/sending;/

> +be decompressed. Using compression in live migration can help
> +to reduce the data transferred about 60%, this is very useful when the
> +bandwidth is limited, and the migration time can also be reduced about
> +70% in a typical case.
> +
> +The process of compression will consume additional CPU cycles, and the
> +extra CPU cycles will increase the migration time. On the other hand,
> +the amount of data transferred will reduced, this factor can reduce
> +the migration time. If the process of the compression is quick
> +enough, then the total migration time can be reduced, and multiple
> +compression threads can be used to accelerate the compression process.
> +
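
As a rough illustration of this trade-off, the sketch below models one pass over guest RAM as a pipeline in which compression and sending overlap, so the slower stage sets the pace. The model itself, the function names `estimate_migration_secs` and `estimate_uncompressed_secs`, and the sample numbers are assumptions for illustration only; they are not QEMU code or measurements.

```c
#include <assert.h>

/*
 * Hypothetical model of one pass over guest RAM when pages are compressed
 * by `threads` workers and then sent over the link. Assumes compression
 * and sending overlap fully, so the slower stage determines the total.
 *
 *   bytes          - uncompressed RAM to send
 *   ratio          - compressed size / original size (e.g. 0.3)
 *   link_bps       - usable link bandwidth, bytes per second
 *   per_thread_bps - compression throughput of one thread, bytes per second
 */
double estimate_migration_secs(double bytes, double ratio, double link_bps,
                               int threads, double per_thread_bps)
{
    assert(threads >= 1 && link_bps > 0 && per_thread_bps > 0);

    double send_secs = bytes * ratio / link_bps;
    double compress_secs = bytes / (threads * per_thread_bps);

    return send_secs > compress_secs ? send_secs : compress_secs;
}

/* Time to send the same data with no compression at all. */
double estimate_uncompressed_secs(double bytes, double link_bps)
{
    assert(link_bps > 0);
    return bytes / link_bps;
}
```

For example, with 1000 MB of RAM, a 0.3 compression ratio and a 100 MB/s link, sending uncompressed takes 10 s. A single thread compressing at 50 MB/s becomes the bottleneck at 20 s, while 8 such threads finish compressing in 2.5 s, leaving the 3 s transfer as the limit.
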
> +The decompression speed of zlib is at least 4 times as quickly as

s/quickly/quick/

> +compression, if the source and destination CPU have equal speed,
> +keeping the compression thread count 4 times the decompression
> +thread count can avoid CPU waste.
> +
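
The 4:1 rule of thumb above can be written as a small helper that derives a decompression thread count from the compression thread count. This is a sketch that assumes equal CPU speed on both hosts and the roughly 4x zlib speed gap stated above; the function and macro names are hypothetical. It rounds up so the destination is not left short of threads.

```c
#include <assert.h>

/* Assumed ratio of zlib decompression speed to compression speed. */
#define DECOMPRESS_SPEEDUP 4

/*
 * Suggest a decompression thread count for a given compression thread
 * count (1..255), rounding up so the destination keeps pace.
 */
int suggest_decompress_threads(int compress_threads)
{
    assert(compress_threads >= 1 && compress_threads <= 255);
    return (compress_threads + DECOMPRESS_SPEEDUP - 1) / DECOMPRESS_SPEEDUP;
}
```

This reproduces the pairings used in the performance tests: 8 compression threads give 2 decompression threads, and 15 give 4.
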
> +Compression level can be used to control the compression speed and the
> +compression ratio. High compression ratio will take more time, level 0
> +stands for no compression, level 1 stands for the best compression
> +speed, and level 9 stands for the best compression ratio. Users can
> +select a level number between 0 and 9.
> +
> +
> +When to use the multiple compression threads in live migration
> +==============================================================
> +Compression of data will consume lot of extra CPU cycles, in a system

s/lot of//
s/cycles,/cycles; so/

> +with high overhead of CPU, avoid using this feature. When the network
> +bandwidth is very limited and the CPU resource is adequate, use the

s/use the/use of/

> +multiple compression threads will be very helpful. If both the CPU and
> +the network bandwidth are adequate, use multiple compression threads

s/use/use of/

> +can still help to reduce the migration time.
> +
> +Performance
> +===========
> +Test environment:
> +
> +CPU: Intel(R) Xeon(R) CPU E5-2680 0 @ 2.70GHz
> +Socket Count: 2
> +Ram: 128G
> +NIC: Intel I350 (10/100/1000Mbps)
> +Host OS: CentOS 7 64-bit
> +Guest OS: Ubuntu 12.10 64-bit
> +Parameter: qemu-system-x86_64 -enable-kvm -m 1024
> + /share/ia32e_ubuntu12.10.img -monitor stdio
> +
> +There is no additional application is running on the guest when doing
> +the test.
> +
> +
> +Speed limit: 32MB/s
> +---------------------------------------------------------------
> +                    | original  | compress thread: 8
> +                    |   way     | decompress thread: 2
> +                    |           | compression level: 1
> +---------------------------------------------------------------
> +total time(msec):   |  26561    |  7920
> +---------------------------------------------------------------
> +transferred ram(kB):|  877054   | 260641
> +---------------------------------------------------------------
> +throughput(mbps):   |  270.53   | 269.68
> +---------------------------------------------------------------
> +total ram(kB):      |  1057604  | 1057604
> +---------------------------------------------------------------
> +
> +
> +Speed limit: No
> +---------------------------------------------------------------
> +                    | original  | compress thread: 15
> +                    |   way     | decompress thread: 4
> +                    |           | compression level: 1
> +---------------------------------------------------------------
> +total time(msec):   |  7611     |  2888
> +---------------------------------------------------------------
> +transferred ram(kB):|  876761   | 262301
> +---------------------------------------------------------------
> +throughput(mbps):   |  943.78   | 744.27
> +---------------------------------------------------------------
> +total ram(kB):      |  1057604  | 1057604
> +---------------------------------------------------------------
> +
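
The reductions behind the headline percentages can be recomputed from the two tables. The helper below is a hypothetical sketch that simply evaluates 100 * (1 - after / before).

```c
#include <assert.h>

/* Percentage reduction when going from `before` to `after` (before > 0). */
double reduction_pct(double before, double after)
{
    assert(before > 0);
    return 100.0 * (1.0 - after / before);
}
```

Applied to the tables, transferred RAM drops by about 70.3% (877054 to 260641 kB) with the 32MB/s limit and about 70.1% (876761 to 262301 kB) without it, while total time drops by about 70.2% (26561 to 7920 ms) and about 62.1% (7611 to 2888 ms) respectively.
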
> +Usage
> +======
> +1. Verify the destination QEMU version is able to support the multiple
> +compression threads migration:
> +    {qemu} info_migrate_capablilites
> +    {qemu} ... compress: off ...
> +
> +2. Activate compression on the souce:
> +    {qemu} migrate_set_capability compress on
> +
> +3. Set the compression thread count on source:
> +    {qemu} migrate_set_compress_threads 10
> +
> +4. Set the compression level on the source:
> +    {qemu} migrate_set_compress_level 1
> +
> +5. Set the decompression thread count on destination:
> +    {qemu} migrate_set_decompress_threads 5
> +
> +6. Start outgoing migration:
> +    {qemu} migrate -d tcp:destination.host:4444
> +    {qemu} info migrate
> +    Capabilities: ... compress: on
> +    ...
> +
> +TODO
> +====
> +Some faster compression/decompression method such as lz4 and quicklz
> +can help to reduce the CPU consumption when doing (de)compression.
> +Less (de)compression threads are needed when doing the migration.
> 

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-06 11:08 ` [Qemu-devel] [v2 2/2] migration: Implement " Li Liang
@ 2014-11-06 12:57   ` Eric Blake
  2014-11-21  6:18     ` Zhang, Yang Z
                       ` (2 more replies)
  2014-11-06 15:41   ` Dr. David Alan Gilbert
  2014-11-21  7:29   ` ChenLiang
  2 siblings, 3 replies; 21+ messages in thread
From: Eric Blake @ 2014-11-06 12:57 UTC (permalink / raw)
  To: Li Liang, qemu-devel; +Cc: yang.z.zhang, armbru, lcapitulino

On 11/06/2014 12:08 PM, Li Liang wrote:
> Instead of sending the guest memory directly, this solution compress
> the ram page before sending, after receiving, the data will be
> decompressed.
> This feature can help to reduce the data transferred about
> 60%, this is very useful when the network bandwidth is limited,
> and the migration time can also be reduced about 70%. The
> feature is off by default, following the document
> docs/multiple-compression-threads.txt for information to use it.
> 
> Reviewed-by: Eric Blake <eblake@redhat.com>

Please DON'T add this line unless the author spelled it out (or if they
mentioned that it would be okay if you fix minor issues).  I
intentionally omitted a reviewed-by on v1:

https://lists.gnu.org/archive/html/qemu-devel/2014-11/msg00672.html

because I was not happy with the patch as it was presented and did not
think the work to fix it was trivial.  Furthermore, my review of v1 was
just over the interface, and not the entire patch; there are very likely
still bugs lurking in the .c files.  Once again, I'm going to limit my
review of v2 to the interface (at least in this email):

> Signed-off-by: Li Liang <liang.z.li@intel.com>
> ---

> +++ b/qapi-schema.json
> @@ -491,13 +491,17 @@
>  #          to enable the capability on the source VM. The feature is disabled by
>  #          default. (since 1.6)
>  #
> +# @compress: Using the multiple compression threads to accelerate live migration.
> +#          This feature can help to reduce the migration traffic, by sending
> +#          compressed pages. The feature is disabled by default. (since 2.3)
> +#
>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>  #          to speed up convergence of RAM migration. (since 1.6)
>  #
>  # Since: 1.2
>  ##
>  { 'enum': 'MigrationCapability',
> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>  

I'll repeat what I said on v1 (but this time, with some links to back it
up :)

We really need to avoid a proliferation of new commands; two per tunable
does not scale well.  I think now is the time to implement my earlier
suggestion at making MigrationCapability become THE resource for tunables:

https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html
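
One way to picture the direction suggested above: rather than a set/query command pair per tunable, a single table describes every tunable with its valid range, and one setter validates against it. This is a hypothetical C sketch that does not attempt to reproduce the linked proposal; the struct, table and function names are invented, and only the ranges and defaults come from this patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* One entry per migration tunable (hypothetical layout). */
typedef struct {
    const char *name;
    long min;
    long max;
    long value;
} MigrationTunable;

/* Ranges and defaults taken from the patch under review. */
static MigrationTunable tunables[] = {
    { "compress-level",     0,   9, 1 },
    { "compress-threads",   1, 255, 8 },
    { "decompress-threads", 1, 255, 2 },
};

static MigrationTunable *find_tunable(const char *name)
{
    for (size_t i = 0; i < sizeof(tunables) / sizeof(tunables[0]); i++) {
        if (strcmp(tunables[i].name, name) == 0) {
            return &tunables[i];
        }
    }
    return NULL;
}

/* Returns 0 on success, -1 for an unknown name or out-of-range value. */
int set_tunable(const char *name, long value)
{
    MigrationTunable *t = find_tunable(name);

    if (t == NULL || value < t->min || value > t->max) {
        return -1;
    }
    t->value = value;
    return 0;
}

/* Stores the current value in *out and returns 0, or -1 if unknown. */
int get_tunable(const char *name, long *out)
{
    MigrationTunable *t = find_tunable(name);

    assert(out != NULL);
    if (t == NULL) {
        return -1;
    }
    *out = t->value;
    return 0;
}
```

With this shape, adding a tunable means adding one table row rather than two new commands.
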

> +++ b/qmp-commands.hx
> @@ -705,7 +705,138 @@ Example:
>  <- { "return": 67108864 }
>  
>  EQMP
> +{
> +        .name       = "migrate-set-compress-level",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
> +    },
> +
> +SQMP
> +migrate-set-compress-level
> +----------------------

Convention in this file is to have the --- line extended out to the
length of the text it is tied to (you are missing four bytes,
corresponding to the tail "evel")

> +
> +Set compress level to be used by compress migration, the compress level is an integer

s/compress level/the compression level/ (twice)

> +between 0 and 9

s/9/9, where 9 means try harder for smaller compression at the expense
of more CPU time/

> +
> +Arguments:
> +
> +- "value": compress level (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-level", "arguments": { "value": 536870912 } }

Umm, 536870912 is not an integer between 0 and 9.


> +SQMP
> +query-migrate-compress-level
> +------------------------

--- length

> +
> +Show compress level to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-level" }
> +<- { "return": 67108864 }

Ewww. Please no new interfaces that return raw ints.  Rather, return a
dictionary with one key/value pair holding the int.  Raw ints are not as
extensible as dictionaries.  Also, make the example realistic - 67108864
is not a valid compression level.

{ "return": { "level": 9 } }
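
A minimal sketch of the suggested reply shape, formatting the level inside an object rather than as a bare integer. The function name is hypothetical, and real QMP replies are produced by QEMU's QAPI marshalling code rather than hand-formatted like this.

```c
#include <assert.h>
#include <stdio.h>

/*
 * Format a QMP-style reply carrying the compression level inside an
 * object, e.g. { "return": { "level": 9 } }, instead of a bare integer.
 * Returns the snprintf result (length excluding the NUL terminator).
 */
int format_level_reply(char *buf, size_t len, int level)
{
    assert(buf != NULL && len > 0);
    return snprintf(buf, len, "{ \"return\": { \"level\": %d } }", level);
}
```

Because the level sits under a named key, a later version could add further keys to the same object without breaking clients that only read `level`.
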


> +migrate-set-compress-threads
> +----------------------

--- length

> +
> +Set compress thread count to be used by compress migration, the compress thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": compress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 536870912 } }

Value out of range 1-255

> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
> +    },
> +
> +SQMP
> +query-migrate-compress-threads
> +------------------------

--- length

> +
> +Show compress thread count to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 67108864 }

out of range, raw int return

and so on in the rest of the patch (I'll quit calling it out, especially
if we switch over to my enhanced set-capabilities proposal)

* Re: [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads
  2014-11-06 11:08 ` [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads Li Liang
  2014-11-06 11:25   ` Eric Blake
@ 2014-11-06 13:24   ` Dr. David Alan Gilbert
  2014-11-06 13:46     ` Eric Blake
  2014-11-07  2:28     ` Li, Liang Z
  1 sibling, 2 replies; 21+ messages in thread
From: Dr. David Alan Gilbert @ 2014-11-06 13:24 UTC (permalink / raw)
  To: Li Liang; +Cc: quintela, armbru, qemu-devel, lcapitulino, amit.shah,
	yang.z.zhang

* Li Liang (liang.z.li@intel.com) wrote:
> Give some details about the multiple compression threads and how
> to use it in live migration.
> 
> Signed-off-by: Li Liang <liang.z.li@intel.com>
> ---
>  docs/multiple-compression-threads.txt | 128 ++++++++++++++++++++++++++++++++++
>  1 file changed, 128 insertions(+)
>  create mode 100644 docs/multiple-compression-threads.txt
> 
> diff --git a/docs/multiple-compression-threads.txt b/docs/multiple-compression-threads.txt
> new file mode 100644
> index 0000000..a5e53de
> --- /dev/null
> +++ b/docs/multiple-compression-threads.txt

Should probably have migration in the title?

> +Usage
> +======
> +1. Verify the destination QEMU version is able to support the multiple
> +compression threads migration:
> +    {qemu} info_migrate_capablilites
> +    {qemu} ... compress: off ...
> +
> +2. Activate compression on the souce:
> +    {qemu} migrate_set_capability compress on
> +
> +3. Set the compression thread count on source:
> +    {qemu} migrate_set_compress_threads 10
> +
> +4. Set the compression level on the source:
> +    {qemu} migrate_set_compress_level 1
> +
> +5. Set the decompression thread count on destination:
> +    {qemu} migrate_set_decompress_threads 5
> +
> +6. Start outgoing migration:
> +    {qemu} migrate -d tcp:destination.host:4444
> +    {qemu} info migrate
> +    Capabilities: ... compress: on
> +    ...
> +
> +TODO
> +====
> +Some faster compression/decompression method such as lz4 and quicklz
> +can help to reduce the CPU consumption when doing (de)compression.
> +Less (de)compression threads are needed when doing the migration.

OK, some high level questions:
   1) How does the performance compare to running a separate compressor
process in the stream rather than embedding it in the qemu?

   2) Since you're looking at different compression schemes, do we need
something in the settings to select one, and to say what makes sense
for the 'compress_level'?   For example, I don't know whether lz4 or quicklz
use 1-10 for their compression levels.  How do I know which compression
schemes are available on any given host?
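For question 2, one option is for each scheme to declare the level range it accepts, so that both the monitor command and a query of available schemes can be driven from a single table. A minimal sketch with a hypothetical `CompressScheme` table and helpers; only zlib's range is filled in (0 = `Z_NO_COMPRESSION` through 9 = `Z_BEST_COMPRESSION`), and ranges for lz4 or quicklz are deliberately not guessed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical descriptor: one entry per scheme built into this binary. */
typedef struct {
    const char *name;
    int min_level;
    int max_level;
} CompressScheme;

/* Only zlib is listed: levels 0 (Z_NO_COMPRESSION) to 9 (Z_BEST_COMPRESSION).
 * Other schemes would be added only when actually linked in. */
static const CompressScheme compress_schemes[] = {
    { "zlib", 0, 9 },
};

/* Returns NULL when the scheme is not available on this host. */
static const CompressScheme *find_scheme(const char *name)
{
    size_t n = sizeof(compress_schemes) / sizeof(compress_schemes[0]);
    for (size_t i = 0; i < n; i++) {
        if (strcmp(compress_schemes[i].name, name) == 0) {
            return &compress_schemes[i];
        }
    }
    return NULL;
}

/* Validate a (scheme, level) pair before accepting it from the monitor. */
static bool compress_level_valid(const char *scheme, int level)
{
    const CompressScheme *s = find_scheme(scheme);
    return s != NULL && level >= s->min_level && level <= s->max_level;
}
```

With this shape, the level semantics travel with the scheme instead of being hard-coded in the command's help text.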

Dave
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads
  2014-11-06 13:24   ` Dr. David Alan Gilbert
@ 2014-11-06 13:46     ` Eric Blake
  2014-11-07  2:28     ` Li, Liang Z
  1 sibling, 0 replies; 21+ messages in thread
From: Eric Blake @ 2014-11-06 13:46 UTC
  To: Dr. David Alan Gilbert, Li Liang
  Cc: quintela, armbru, qemu-devel, lcapitulino, yang.z.zhang,
	amit.shah

On 11/06/2014 02:24 PM, Dr. David Alan Gilbert wrote:
> * Li Liang (liang.z.li@intel.com) wrote:
>> Give some details about the multiple compression threads and how
>> to use it in live migration.
>>
>> Signed-off-by: Li Liang <liang.z.li@intel.com>
>> ---

>> +TODO
>> +====
>> +Faster compression/decompression methods such as lz4 and quicklz
>> +could reduce the CPU consumption of (de)compression, so fewer
>> +(de)compression threads would be needed during migration.
> 
> OK, some high level questions:
>    1) How does the performance compare to running a separate compressor
> process in the stream rather than embedding it in the qemu?

Interesting question.  I wonder if libvirt should be extended to
optionally insert a compression/decompression filter in the setups it
creates.  Remember, in libvirt tunnelled mode, where libvirt is adding
TLS encryption on top of the migration data stream so that it is not
sniffable from TCP, all data is already going through the path:

source qemu -> source libvirt -> destination libvirt -> destination qemu
          Unix socket/pipe  TCP socket          Unix socket/pipe

Furthermore, libvirt is ALREADY wired up to use external compression
when doing migration to file (such as supporting multiple compression
formats for 'virsh save'), which looks like:

qemu -> compressor -> libvirt I/O helper -> file
     pipe         pipe           O_DIRECT file ops

then restoring that image with:

file -> libvirt I/O helper -> decompressor -> qemu
  O_DIRECT file ops      pipe             pipe

So adding compression in the mix seems like it would be easy for libvirt
to do:

source qemu -> compressor -> source libvirt -> destination libvirt ...
          pipe           pipe            TCP socket
   -> decompressor -> destination qemu
 pipe             pipe


Of course, with an external compressor process, I don't know if you can get
speedups from having multiple compression threads when all input is
coming serially from a single connection, so your approach of folding
parallel compression threads directly into qemu may still have some
speed merits.  On the other hand, I'm not sure how your solution
multiplexes the multiple compression threads into a single migration
stream; if you are still bottlenecked by a single migration stream, what
do you gain by adding multiple (de)compression threads, without some
way in the migration protocol to cleanly signal a fair rotation among
the independent sub-streams of each thread?
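As far as the quoted patch shows, there are no per-thread sub-streams: each worker fills its own buffer with complete records (a block header carrying the page offset, then for compressed pages a be32 length and the zlib data), only the migration thread copies finished buffers into the single QEMUFile, and ram_load() parses records sequentially before handing each payload to an idle decompression thread. A minimal sketch of such self-describing records, assuming hypothetical helpers `put_record`/`get_record` and a simplified header (be64 `offset | flags`, be32 length; the real header may also carry the RAMBlock idstr):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical simplified record: be64 offset|flags, be32 len, payload. */
static size_t put_record(uint8_t *out, uint64_t offset_flags,
                         const uint8_t *payload, uint32_t len)
{
    size_t n = 0;
    for (int shift = 56; shift >= 0; shift -= 8) {
        out[n++] = (uint8_t)(offset_flags >> shift);
    }
    for (int shift = 24; shift >= 0; shift -= 8) {
        out[n++] = (uint8_t)(len >> shift);
    }
    memcpy(out + n, payload, len);
    return n + len;
}

/* Parse one record and return the bytes consumed. The destination page
 * comes from the record itself, not from which worker produced it. */
static size_t get_record(const uint8_t *in, uint64_t *offset_flags,
                         const uint8_t **payload, uint32_t *len)
{
    size_t n = 0;
    uint64_t of = 0;
    uint32_t l = 0;
    for (int i = 0; i < 8; i++) {
        of = (of << 8) | in[n++];
    }
    for (int i = 0; i < 4; i++) {
        l = (l << 8) | in[n++];
    }
    *offset_flags = of;
    *len = l;
    *payload = in + n;
    return n + l;
}
```

Because each record names its own page, records from different workers can be interleaved at record granularity; the patch additionally flushes all workers before any record that lacks RAM_SAVE_FLAG_CONTINUE, since that flag depends on the previously sent block.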

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-06 11:08 ` [Qemu-devel] [v2 2/2] migration: Implement " Li Liang
  2014-11-06 12:57   ` Eric Blake
@ 2014-11-06 15:41   ` Dr. David Alan Gilbert
  2014-11-21  7:01     ` Li, Liang Z
  2014-11-21  7:29   ` ChenLiang
  2 siblings, 1 reply; 21+ messages in thread
From: Dr. David Alan Gilbert @ 2014-11-06 15:41 UTC
  To: Li Liang; +Cc: quintela, armbru, qemu-devel, lcapitulino, amit.shah,
	yang.z.zhang

* Li Liang (liang.z.li@intel.com) wrote:
> Instead of sending the guest memory directly, this solution compresses
> the RAM pages before sending; after receiving, the data is
> decompressed.
> This feature can reduce the amount of data transferred by about
> 60%, which is very useful when the network bandwidth is limited,
> and can also reduce the migration time by about 70%. The
> feature is off by default; see
> docs/multiple-compression-threads.txt for how to use it.

More technical comments below; but could you split the patch up a bit
more please - it's a bit daunting; probably with the commands in a separate
patch, and maybe split the compress stuff into one patch and the decompress
into another.

Another thing; I've not figured out how all of this gets cleaned up in
a migration_cancel or if the migration fails.
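One way to make teardown safe from the completion, cancel and failure paths alike is a single idempotent function that raises the quit flag, wakes every worker, joins them and frees the arrays. A minimal sketch with POSIX threads standing in for QEMU's QemuThread/QemuMutex/QemuCond wrappers; `WorkerPool`, `pool_start` and `pool_stop` are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical pool; POSIX threads stand in for QEMU's wrappers. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t wake;
    bool quit;
    int nthreads;
    pthread_t *threads;   /* NULL when the pool is not running */
} WorkerPool;

static void *pool_worker(void *opaque)
{
    WorkerPool *pool = opaque;

    pthread_mutex_lock(&pool->lock);
    while (!pool->quit) {
        /* A real worker would take a job here; idle workers block, not poll. */
        pthread_cond_wait(&pool->wake, &pool->lock);
    }
    pthread_mutex_unlock(&pool->lock);
    return NULL;
}

static int pool_start(WorkerPool *pool, int n)
{
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->wake, NULL);
    pool->quit = false;
    pool->nthreads = 0;
    pool->threads = calloc((size_t)n, sizeof(pthread_t));
    if (!pool->threads) {
        pthread_mutex_destroy(&pool->lock);
        pthread_cond_destroy(&pool->wake);
        return -1;
    }
    for (int i = 0; i < n; i++) {
        if (pthread_create(&pool->threads[i], NULL, pool_worker, pool) != 0) {
            break;          /* keep the threads that did start; stop joins them */
        }
        pool->nthreads++;
    }
    return pool->nthreads;
}

/* Idempotent: a second call (e.g. cancel after failure) is a no-op. */
static void pool_stop(WorkerPool *pool)
{
    if (!pool->threads) {
        return;             /* already stopped */
    }
    pthread_mutex_lock(&pool->lock);
    pool->quit = true;
    pthread_cond_broadcast(&pool->wake);
    pthread_mutex_unlock(&pool->lock);

    for (int i = 0; i < pool->nthreads; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    free(pool->threads);
    pool->threads = NULL;
    pool->nthreads = 0;
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->wake);
}
```

Setting the quit flag under the same lock the workers wait on means no worker can miss the wakeup, which a bare global flag plus sleeps cannot guarantee.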

> Reviewed-by: Eric Blake <eblake@redhat.com>
> Signed-off-by: Li Liang <liang.z.li@intel.com>
> ---
>  arch_init.c                   | 435 ++++++++++++++++++++++++++++++++++++++++--
>  hmp-commands.hx               |  56 ++++++
>  hmp.c                         |  57 ++++++
>  hmp.h                         |   6 +
>  include/migration/migration.h |  12 +-
>  include/migration/qemu-file.h |   1 +
>  migration.c                   |  99 ++++++++++
>  monitor.c                     |  21 ++
>  qapi-schema.json              |  88 ++++++++-
>  qmp-commands.hx               | 131 +++++++++++++
>  10 files changed, 890 insertions(+), 16 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 88a5ba0..a27d87b 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -24,6 +24,7 @@
>  #include <stdint.h>
>  #include <stdarg.h>
>  #include <stdlib.h>
> +#include <zlib.h>
>  #ifndef _WIN32
>  #include <sys/types.h>
>  #include <sys/mman.h>
> @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_CONTINUE 0x20
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
> +#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  
>  static struct defconfig_file {
>      const char *filename;
> @@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages;
>  static uint32_t last_version;
>  static bool ram_bulk_stage;
>  

Magic 16 constants - why?

> +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
> +#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
> +struct MigBuf {
> +    int buf_index;
> +    uint8_t buf[MIG_BUF_SIZE];
> +};
> +
> +typedef struct MigBuf MigBuf;
> +

These functions look like they're recreating stuff in QEMUFile - is there
no way to share anything?

> +static void migrate_put_byte(MigBuf *f, int v)
> +{
> +    f->buf[f->buf_index] = v;
> +    f->buf_index++;
> +}
> +
> +static void migrate_put_be16(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be32(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 24);
> +    migrate_put_byte(f, v >> 16);
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be64(MigBuf *f, uint64_t v)
> +{
> +    migrate_put_be32(f, v >> 32);
> +    migrate_put_be32(f, v);
> +}
> +

This feels like you're doing something very similar to 
the buffered file code that recently went in; could you
reuse qemu_bufopen or the QEMUSizedBuffer?
I think if you could use the qemu_buf somehow (maybe with
modifications?) then you could avoid a lot of the 'if'd
code below, because you'd always be working with a QEMUFile,
it would just be a different QEMUFile.
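If a private buffer is kept instead, it should at least be bounded: as quoted, `migrate_put_byte()` has no bounds check, and `migrate_put_buffer()` would never terminate once `buf_index` reaches `MIG_BUF_SIZE`, because each pass then copies zero bytes. A minimal sketch of a bounded big-endian writer with a sticky overflow flag; `BoundedBuf` and the `bb_put_*` helpers are hypothetical, not QEMUSizedBuffer's API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct {
    uint8_t *buf;
    size_t cap;
    size_t len;          /* invariant: len <= cap */
    bool overflow;       /* sticky: set once any write did not fit */
} BoundedBuf;

static void bb_put_buffer(BoundedBuf *b, const void *data, size_t size)
{
    assert(b->len <= b->cap);
    if (b->overflow || size > b->cap - b->len) {
        b->overflow = true;   /* refuse instead of looping or overrunning */
        return;
    }
    memcpy(b->buf + b->len, data, size);
    b->len += size;
}

static void bb_put_byte(BoundedBuf *b, uint8_t v)
{
    bb_put_buffer(b, &v, 1);
}

static void bb_put_be32(BoundedBuf *b, uint32_t v)
{
    uint8_t tmp[4] = { (uint8_t)(v >> 24), (uint8_t)(v >> 16),
                       (uint8_t)(v >> 8), (uint8_t)v };
    bb_put_buffer(b, tmp, sizeof(tmp));
}

static void bb_put_be64(BoundedBuf *b, uint64_t v)
{
    bb_put_be32(b, (uint32_t)(v >> 32));
    bb_put_be32(b, (uint32_t)v);
}
```

The caller checks `overflow` once per record and discards the record on failure, rather than checking every individual put.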

> +static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
> +{
> +    int l;
> +
> +    while (size > 0) {
> +        l = MIG_BUF_SIZE - f->buf_index;
> +        if (l > size) {
> +            l = size;
> +        }
> +        memcpy(f->buf + f->buf_index, buf, l);
> +        f->buf_index += l;
> +        buf += l;
> +        size -= l;
> +    }
> +}
> +
> +static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
> +        ram_addr_t offset, int cont, int flag)
> +{
> +    size_t size;
> +
> +    migrate_put_be64(f, offset | cont | flag);
> +    size = 8;
> +
> +    if (!cont) {
> +        migrate_put_byte(f, strlen(block->idstr));
> +        migrate_put_buffer(f, (uint8_t *)block->idstr,
> +                        strlen(block->idstr));
> +        size += 1 + strlen(block->idstr);
> +    }
> +    return size;
> +}
> +


> +static int migrate_qemu_add_compress(MigBuf *f,  const uint8_t *p,
> +        int size, int level)
> +{
> +    uLong  blen = COMPRESS_BUF_SIZE;
> +    if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
> +            size, level) != Z_OK) {
> +        error_report("Compress Failed!\n");
> +        return 0;
> +    }
> +    migrate_put_be32(f, blen);
> +    f->buf_index += blen;
> +    return blen + sizeof(int);
> +}

Please add a comment about what this is doing, and use size_t or
unsigned int for sizes.
Also, error_report doesn't need the trailing \n.

> +enum {
> +    COM_DONE = 0,
> +    COM_START,
> +};
> +
> +static int  compress_thread_count;
> +static int  decompress_thread_count;
> +
> +struct compress_param {
> +    int state;
> +    MigBuf migbuf;
> +    RAMBlock *block;
> +    ram_addr_t offset;
> +    bool last_stage;
> +    int ret;
> +    int bytes_sent;
> +    uint8_t *p;
> +    int cont;
> +    bool bulk_stage;
> +};
> +
> +typedef struct compress_param compress_param;
> +compress_param *comp_param;
> +
> +struct decompress_param {
> +    int state;
> +    void *des;
> +    uint8 compbuf[COMPRESS_BUF_SIZE];
> +    int len;
> +};
> +typedef struct decompress_param decompress_param;
> +
> +static decompress_param *decomp_param;
> +bool incomming_migration_done;
> +static bool quit_thread;
> +
> +static int save_compress_ram_page(compress_param *param);
> +
> +
> +static void *do_data_compress(void *opaque)
> +{
> +    compress_param *param = opaque;
> +    while (!quit_thread) {
> +        if (param->state == COM_START) {
> +            save_compress_ram_page(param);
> +            param->state = COM_DONE;
> +         } else {
> +             g_usleep(1);

There has to be a better way than having your thread spin
with sleeps; qemu_event or semaphore or something?
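A minimal sketch of the handoff without polling, using a POSIX mutex and condition variable as stand-ins for QEMU's QemuMutex/QemuCond (a QemuSemaphore would also work). `Slot`, `slot_submit`, `slot_wait_done` and `slot_quit` are hypothetical names, and doubling an integer is a placeholder for compressing a page.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

typedef enum { SLOT_IDLE, SLOT_BUSY } SlotState;

/* One worker slot; 'cond' is broadcast on every state change. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    SlotState state;
    bool quit;
    int input, output;   /* placeholders for page pointer and result buffer */
} Slot;

static void *slot_thread(void *opaque)
{
    Slot *s = opaque;

    pthread_mutex_lock(&s->lock);
    for (;;) {
        while (s->state != SLOT_BUSY && !s->quit) {
            pthread_cond_wait(&s->cond, &s->lock);   /* sleeps; no g_usleep() */
        }
        if (s->quit) {
            break;
        }
        int in = s->input;
        pthread_mutex_unlock(&s->lock);
        int out = in * 2;                /* placeholder work, done unlocked */
        pthread_mutex_lock(&s->lock);
        s->output = out;
        s->state = SLOT_IDLE;
        pthread_cond_broadcast(&s->cond);            /* wake the waiter */
    }
    pthread_mutex_unlock(&s->lock);
    return NULL;
}

/* Hand a job to the slot, waiting (not spinning) until it is idle. */
static void slot_submit(Slot *s, int input)
{
    pthread_mutex_lock(&s->lock);
    while (s->state != SLOT_IDLE) {
        pthread_cond_wait(&s->cond, &s->lock);
    }
    s->input = input;
    s->state = SLOT_BUSY;
    pthread_cond_broadcast(&s->cond);
    pthread_mutex_unlock(&s->lock);
}

/* Block until the current job has finished and return its result. */
static int slot_wait_done(Slot *s)
{
    pthread_mutex_lock(&s->lock);
    while (s->state != SLOT_IDLE) {
        pthread_cond_wait(&s->cond, &s->lock);
    }
    int out = s->output;
    pthread_mutex_unlock(&s->lock);
    return out;
}

static void slot_quit(Slot *s)
{
    pthread_mutex_lock(&s->lock);
    s->quit = true;
    pthread_cond_broadcast(&s->cond);
    pthread_mutex_unlock(&s->lock);
}
```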

> +         }
> +    }
> +
> +    return NULL;
> +}
> +
> +
> +void migrate_compress_threads_join(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = true;
> +    for (i = 0; i < compress_thread_count; i++) {
> +        qemu_thread_join(s->compress_thread + i);
> +    }
> +    g_free(s->compress_thread);
> +    g_free(comp_param);
> +    s->compress_thread = NULL;
> +    comp_param = NULL;
> +}
> +
> +void migrate_compress_threads_create(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = false;
> +    compress_thread_count = s->compress_thread_count;
> +    s->compress_thread = g_malloc0(sizeof(QemuThread)
> +        * s->compress_thread_count);
> +    comp_param = g_malloc0(sizeof(compress_param) * s->compress_thread_count);

You might need to be careful about how quit_thread and comp_param
are accessed by the migration thread and your individual compression threads,
especially on architectures with weakly ordered memory.
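If such flags are shared without a lock, they need explicit ordering: a worker must not observe `state == COM_START` before it observes the parameters written just before it, and the producer must not observe `COM_DONE` before the result. A minimal sketch using C11 `<stdatomic.h>` release/acquire as a stand-in for QEMU's own atomic helpers; `Job`, `publish_job` and `worker_main` are hypothetical. It still polls (with `sched_yield()`), so it illustrates ordering only, not the blocking handoff requested above.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>

enum { JOB_DONE = 0, JOB_START = 1 };

typedef struct {
    int offset;          /* plain data, published via 'state' */
    int result;          /* plain data, published via 'state' */
    atomic_int state;
    atomic_bool quit;
} Job;

static void *worker_main(void *opaque)
{
    Job *j = opaque;

    while (!atomic_load_explicit(&j->quit, memory_order_acquire)) {
        /* Acquire pairs with the release in publish_job(): 'offset' is visible. */
        if (atomic_load_explicit(&j->state, memory_order_acquire) == JOB_START) {
            j->result = j->offset + 1;                   /* placeholder work */
            /* Release: 'result' is visible before DONE can be observed. */
            atomic_store_explicit(&j->state, JOB_DONE, memory_order_release);
        } else {
            sched_yield();
        }
    }
    return NULL;
}

/* Write the parameters first, then publish them with a release store. */
static void publish_job(Job *j, int offset)
{
    j->offset = offset;
    atomic_store_explicit(&j->state, JOB_START, memory_order_release);
}
```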

> +    for (i = 0; i < s->compress_thread_count; i++) {
> +        qemu_thread_create(s->compress_thread + i, "compress",
> +            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
> +
> +    }
> +}
> +
>  /* Update the xbzrle cache to reflect a page that's been sent as all 0.
>   * The important thing is that a stale (not-yet-0'd) page be replaced
>   * by the new data.
> @@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t current_addr)
>  
>  #define ENCODING_FLAG_XBZRLE 0x1
>  
> -static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
> +static int save_xbzrle_page(void *f, uint8_t **current_data,
>                              ram_addr_t current_addr, RAMBlock *block,
> -                            ram_addr_t offset, int cont, bool last_stage)
> +                            ram_addr_t offset, int cont, bool last_stage,
> +                            bool save_to_buf)
>  {
>      int encoded_len = 0, bytes_sent = -1;
>      uint8_t *prev_cached_page;
> @@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
>      }
>  
>      /* Send XBZRLE based compressed page */
> -    bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
> -    qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
> -    qemu_put_be16(f, encoded_len);
> -    qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
> +    if (save_to_buf) {
> +        bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
> +        migrate_put_be16((MigBuf *)f, encoded_len);
> +        migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
> +    } else {
> +        bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
> +        qemu_put_be16((QEMUFile *)f, encoded_len);
> +        qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
> +    }

So this in particular is where I think using a qemu_buf file/qsb
would help; all those ifs would disappear.
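A minimal sketch of that idea: if the XBZRLE path writes through one small output interface, the same encoder can target either the outgoing stream or a per-thread buffer with no `if (save_to_buf)` branches. `ByteSink`, `MemSink`, `CountSink` and `put_xbzrle_record` are hypothetical stand-ins, not QEMU's QEMUFile/QEMUSizedBuffer API; the counting sink merely stands in for the outgoing QEMUFile.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* One output interface; each backend supplies put_buffer. */
typedef struct ByteSink {
    void (*put_buffer)(struct ByteSink *s, const uint8_t *buf, size_t len);
} ByteSink;

/* Backend 1: fixed-size memory buffer (like the per-thread MigBuf). */
typedef struct {
    ByteSink sink;       /* first member, so ByteSink * can be cast back */
    uint8_t data[256];
    size_t len;
} MemSink;

static void mem_put(ByteSink *s, const uint8_t *buf, size_t len)
{
    MemSink *m = (MemSink *)s;
    assert(len <= sizeof(m->data) - m->len);
    memcpy(m->data + m->len, buf, len);
    m->len += len;
}

/* Backend 2: byte counter standing in for the outgoing stream. */
typedef struct {
    ByteSink sink;
    size_t total;
} CountSink;

static void count_put(ByteSink *s, const uint8_t *buf, size_t len)
{
    (void)buf;
    ((CountSink *)s)->total += len;
}

static void put_be16(ByteSink *s, uint16_t v)
{
    uint8_t b[2] = { (uint8_t)(v >> 8), (uint8_t)v };
    s->put_buffer(s, b, 2);
}

/* Shared encoder, written once: no branch on the destination type. */
static size_t put_xbzrle_record(ByteSink *s, const uint8_t *enc, uint16_t enc_len)
{
    uint8_t flag = 0x1;              /* ENCODING_FLAG_XBZRLE in the patch */
    s->put_buffer(s, &flag, 1);
    put_be16(s, enc_len);
    s->put_buffer(s, enc, enc_len);
    return 1 + 2 + (size_t)enc_len;
}
```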

>      bytes_sent += encoded_len + 1 + 2;
>      acct_info.xbzrle_pages++;
>      acct_info.xbzrle_bytes += bytes_sent;
> @@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>          xbzrle_cache_zero_page(current_addr);
>      } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
>          bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
> -                                      offset, cont, last_stage);
> +                                      offset, cont, last_stage, false);
>          if (!last_stage) {
>              /* Can't send this cached data async, since the cache page
>               * might get updated before it gets to the wire
> @@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int save_compress_ram_page(compress_param *param)
> +{
> +    int bytes_sent = param->bytes_sent;
> +    int blen = COMPRESS_BUF_SIZE;
> +    int cont = param->cont;
> +    uint8_t *p = param->p;
> +    int ret = param->ret;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +    bool last_stage = param->last_stage;
> +    /* In doubt sent page as normal */
> +    XBZRLE_cache_lock();
> +    ram_addr_t current_addr = block->offset + offset;
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                atomic_inc(&acct_info.norm_pages);
> +             } else if (bytes_sent == 0) {
> +                atomic_inc(&acct_info.dup_pages);
> +             }
> +        }
> +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> +        atomic_inc(&acct_info.dup_pages);
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
> +                             RAM_SAVE_FLAG_COMPRESS);
> +        migrate_put_byte(&param->migbuf, 0);
> +        bytes_sent++;
> +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> +         * page would be stale
> +         */
> +        xbzrle_cache_zero_page(current_addr);
> +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
> +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
> +                              offset, cont, last_stage, true);
> +    }
> +    XBZRLE_cache_unlock();
> +    /* XBZRLE overflow or normal page */

I wonder if it's worth the complexity of doing the zero check
and the xbzrle if you're already doing compression?  I assume
zlib is going to handle a zero page reasonably well anyway?

> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
> +    }
> +    return bytes_sent;
> +}
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +
> +    for (idx = 0; idx < compress_thread_count; idx++) {
> +        while (comp_param[idx].state != COM_DONE) {
> +            g_usleep(0);
> +        }

Again, some type of event/semaphore rather than busy sleeping;
and also I don't understand how the different threads keep everything
in order - can you add some comments (or maybe notes in the docs)
that explain how it all works?
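One way to address both points: the flush blocks on a condition variable until no job is in flight, then drains the per-worker buffers in a fixed index order, so the stream contents are deterministic for a given set of jobs. A minimal sketch with POSIX threads standing in for QEMU's QemuMutex/QemuCond; `Pool`, `pool_job_done`, `pool_flush` and `fake_worker` are hypothetical, and the one-byte outputs are placeholders for compressed records.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

#define NWORKERS 3

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t all_idle;
    int busy;                      /* workers with a job in flight */
    char out[NWORKERS][16];        /* per-worker output buffer */
    size_t out_len[NWORKERS];
} Pool;

/* Called by a worker once its output buffer is complete. */
static void pool_job_done(Pool *p)
{
    pthread_mutex_lock(&p->lock);
    if (--p->busy == 0) {
        pthread_cond_broadcast(&p->all_idle);
    }
    pthread_mutex_unlock(&p->lock);
}

/* Wait (no polling) until every worker is idle, then append the
 * buffers to 'dst' in worker index order and reset them. */
static size_t pool_flush(Pool *p, char *dst)
{
    size_t n = 0;

    pthread_mutex_lock(&p->lock);
    while (p->busy > 0) {
        pthread_cond_wait(&p->all_idle, &p->lock);
    }
    for (int i = 0; i < NWORKERS; i++) {
        memcpy(dst + n, p->out[i], p->out_len[i]);
        n += p->out_len[i];
        p->out_len[i] = 0;
    }
    pthread_mutex_unlock(&p->lock);
    return n;
}

typedef struct {
    Pool *pool;
    int idx;
} WorkerArg;

static void *fake_worker(void *opaque)
{
    WorkerArg *a = opaque;
    /* Placeholder for compressing a page into this worker's own buffer. */
    a->pool->out[a->idx][0] = (char)('a' + a->idx);
    a->pool->out_len[a->idx] = 1;
    pool_job_done(a->pool);        /* lock/unlock publishes the writes above */
    return NULL;
}
```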

> +        if (comp_param[idx].migbuf.buf_index > 0) {
> +            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                comp_param[idx].migbuf.buf_index);
> +            bytes_transferred += comp_param[idx].migbuf.buf_index;
> +            comp_param[idx].migbuf.buf_index = 0;
> +        }
> +    }
> +}

> +static inline void set_common_compress_params(compress_param *param,
> +    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
> +    bool last_stage, int cont, uint8_t *p, bool bulk_stage)
> +{
> +    param->ret = ret;
> +    param->bytes_sent = bytes_sent;
> +    param->block = block;
> +    param->offset = offset;
> +    param->last_stage = last_stage;
> +    param->cont = cont;
> +    param->p = p;
> +    param->bulk_stage = bulk_stage;
> +}
> +
>  /*
>   * ram_find_and_save_block: Finds a page to send and sends it to f
>   *
> @@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      bool complete_round = false;
>      int bytes_sent = 0;
>      MemoryRegion *mr;
> +    int cont, idx, ret, len = -1;
> +    uint8_t *p;
>  
>      if (!block)
>          block = QTAILQ_FIRST(&ram_list.blocks);
> @@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>                  block = QTAILQ_FIRST(&ram_list.blocks);
>                  complete_round = true;
>                  ram_bulk_stage = false;
> +                if (migrate_use_xbzrle()) {
> +                    /* terminate the used thread at this point*/
> +                    flush_compressed_data(f);
> +                    quit_thread = true;
> +                }
>              }
>          } else {
> -            bytes_sent = ram_save_page(f, block, offset, last_stage);
> -
> -            /* if page is unmodified, continue to the next */
> -            if (bytes_sent > 0) {
> -                last_sent_block = block;
> -                break;
> +            if (!migrate_use_compress()) {
> +                bytes_sent = ram_save_page(f, block, offset, last_stage);
> +                /* if page is unmodified, continue to the next */
> +                if (bytes_sent > 0) {
> +                    last_sent_block = block;
> +                    break;
> +                }
> +            } else {
> +                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +                p = memory_region_get_ram_ptr(block->mr) + offset;
> +                ret = ram_control_save_page(f, block->offset,
> +                           offset, TARGET_PAGE_SIZE, &len);
> +                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
> +                    if (cont == 0) {
> +                        flush_compressed_data(f);
> +                    }
> +                    set_common_compress_params(&comp_param[0],
> +                        ret, len, block, offset, last_stage, cont,
> +                        p, ram_bulk_stage);
> +                    bytes_sent = save_compress_ram_page(&comp_param[0]);
> +                    if (bytes_sent > 0) {
> +                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
> +                            comp_param[0].migbuf.buf_index);
> +                        comp_param[0].migbuf.buf_index = 0;
> +                        last_sent_block = block;
> +                        break;
> +                    }

Is there no way to move this down into your save_compress_ram_page?
When I split the code into ram_find_and_save_block and ram_save_page
a few months ago, it meant that 'ram_find_and_save_block' only really
did the work of finding what to send, and 'ram_save_page' figured out
everything to do with sending it; it would be nice to keep all
the details of sending it separate still.

Since ram_bulk_stage is a static global in this file, why bother passing
it into the 'compress_params'?  I think you could probably avoid a lot
of things like that.

> +                } else {
> +retry:
> +                    for (idx = 0; idx < compress_thread_count; idx++) {
> +                        if (comp_param[idx].state == COM_DONE) {
> +                            bytes_sent = comp_param[idx].migbuf.buf_index;
> +                            if (bytes_sent == 0) {
> +                                set_common_compress_params(&comp_param[idx],
> +                                    ret, len, block, offset, last_stage,
> +                                    cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                bytes_sent = 1;
> +                                bytes_transferred -= 1;
> +                                break;
> +                            } else if (bytes_sent > 0) {
> +                                qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                                    comp_param[idx].migbuf.buf_index);
> +                                comp_param[idx].migbuf.buf_index = 0;
> +                                set_common_compress_params(&comp_param[idx],
> +                                   ret, len, block, offset, last_stage,
> +                                   cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if (idx < compress_thread_count) {
> +                        last_sent_block = block;
> +                        break;
> +                    } else {
> +                        g_usleep(0);
> +                        goto retry;
> +                    }

No; again this shouldn't be using usleep to coordinate between threads; do
it using proper thread-safe operations, and probably a queue or something
similar that hands work out to the different threads.
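A minimal sketch of such a queue: a bounded ring buffer guarded by a mutex, with `not_empty`/`not_full` condition variables so neither the migration thread nor the workers ever poll, and a `closed` flag for completion, cancel or failure. POSIX threads stand in for QEMU's primitives; `JobQueue`, `queue_push`, `queue_pop`, `queue_close` and `Consumer` are hypothetical, and summing offsets is a placeholder for compressing pages.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define QUEUE_CAP 8

typedef struct {
    long page_offset;              /* placeholder for block/offset/host ptr */
} Job;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t not_empty, not_full;
    Job ring[QUEUE_CAP];
    int head, count;
    bool closed;                   /* set on completion, cancel or failure */
} JobQueue;

static void queue_init(JobQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
    q->head = q->count = 0;
    q->closed = false;
}

/* Producer (migration thread): blocks while full; false once closed. */
static bool queue_push(JobQueue *q, Job j)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == QUEUE_CAP && !q->closed) {
        pthread_cond_wait(&q->not_full, &q->lock);
    }
    if (q->closed) {
        pthread_mutex_unlock(&q->lock);
        return false;
    }
    q->ring[(q->head + q->count) % QUEUE_CAP] = j;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
    return true;
}

/* Consumer (worker): blocks while empty; false once closed and drained. */
static bool queue_pop(JobQueue *q, Job *out)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == 0 && !q->closed) {
        pthread_cond_wait(&q->not_empty, &q->lock);
    }
    if (q->count == 0) {
        pthread_mutex_unlock(&q->lock);
        return false;
    }
    *out = q->ring[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return true;
}

static void queue_close(JobQueue *q)
{
    pthread_mutex_lock(&q->lock);
    q->closed = true;
    pthread_cond_broadcast(&q->not_empty);
    pthread_cond_broadcast(&q->not_full);
    pthread_mutex_unlock(&q->lock);
}

typedef struct {
    JobQueue *q;
    long sum;
} Consumer;

static void *consumer_main(void *opaque)
{
    Consumer *c = opaque;
    Job j;
    while (queue_pop(c->q, &j)) {
        c->sum += j.page_offset;   /* placeholder for compressing the page */
    }
    return NULL;
}
```

Workers finish in arbitrary order here, so this only works if every output record is self-describing (carries its own page offset), as the records in the patch do; results would still be written to the stream by the migration thread alone.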

> +                }
>              }
>          }
>      }
> @@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
>  
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
> @@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
>  
> @@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>      }
>  }
>  
> +QemuThread *decompress_threads;
> +
> +static void *do_data_decompress(void *opaque)
> +{
> +    decompress_param *param = opaque;
> +    while (incomming_migration_done == false) {
> +        if (param->state == COM_START) {
> +            uLong pagesize = TARGET_PAGE_SIZE;
> +            if (uncompress((Bytef *)param->des, &pagesize,
> +                    (const Bytef *)param->compbuf, param->len) != Z_OK) {
> +                error_report("Uncompress Failed!\n");

Again \n on error_report.

> +                break;
> +            }
> +            param->state = COM_DONE;
> +        } else {
> +            if (quit_thread) {
> +                break;
> +            }
> +            g_usleep(1);

and the usleep.

> +        }
> +    }
> +    return NULL;
> +}
> +
> +void migrate_decompress_threads_create(int count)
> +{
> +    int i;
> +    decompress_thread_count = count;
> +    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
> +    decomp_param = g_malloc0(sizeof(decompress_param) * count);
> +    quit_thread = false;
> +    for (i = 0; i < count; i++) {
> +        qemu_thread_create(decompress_threads + i, "decompress",
> +            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +void migrate_decompress_threads_join(void)
> +{
> +    int i;
> +    for (i = 0; i < decompress_thread_count; i++) {
> +        qemu_thread_join(decompress_threads + i);
> +    }
> +    g_free(decompress_threads);
> +    g_free(decomp_param);
> +    decompress_threads = NULL;
> +    decomp_param = NULL;
> +}
> +
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
>      int flags = 0, ret = 0;
>      static uint64_t seq_iter;
> +    int len = 0;
> +    uint8_t compbuf[COMPRESS_BUF_SIZE];
>  
>      seq_iter++;
>  
> @@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
>              break;
>          case RAM_SAVE_FLAG_PAGE:
> +            quit_thread = true;
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
>                  error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> @@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> +        case RAM_SAVE_FLAG_COMPRESS_PAGE:
> +            host = host_from_stream_offset(f, addr, flags);
> +            if (!host) {
> +                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            len = qemu_get_be32(f);
> +            qemu_get_buffer(f, compbuf, len);
> +            int idx;
> +retry:
> +            for (idx = 0; idx < decompress_thread_count; idx++) {
> +                if (decomp_param[idx].state == COM_DONE)  {
> +                    memcpy(decomp_param[idx].compbuf, compbuf, len);
> +                    decomp_param[idx].des = host;
> +                    decomp_param[idx].len = len;
> +                    decomp_param[idx].state = COM_START;
> +                    break;
> +                }
> +            }
> +            if (idx == decompress_thread_count) {
> +                g_usleep(0);
> +                goto retry;
> +            }
> +            break;

Same comments as above.

>          case RAM_SAVE_FLAG_XBZRLE:
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> diff --git a/hmp-commands.hx b/hmp-commands.hx
> index e37bc8b..8b93bed 100644
> --- a/hmp-commands.hx
> +++ b/hmp-commands.hx
> @@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle migrations.
>  ETEXI
>  
>      {
> +        .name       = "migrate_set_compress_level",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set compress level for compress migrations, "
> +                      "the level is a number between 0 and 9, 0 stands for "
> +                      "no compression.\n"
> +                      "1 stands for the fastest compression while 9 stands for "
> +                      "the highest compression ratio.",
> +        .mhandler.cmd = hmp_migrate_set_compress_level,
> +    },
> +
> +STEXI
> +@item migrate_set_compress_level @var{value}
> +@findex migrate_set_compress_level
> +Set compress level to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_compress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set compress thread count for migrations. "
> +                      "a proper thread count will accelerate the migration, "
> +                      "the thread count should be between 1 and the number of CPUs in your system",
> +        .mhandler.cmd = hmp_migrate_set_compress_threads,
> +    },
> +
> +STEXI
> +@item migrate_set_compress_threads @var{value}
> +@findex migrate_set_compress_threads
> +Set compress threads to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_decompress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set decompress thread count for migrations. "
> +                      "a proper thread count will accelerate the migration, "
> +                      "the thread count should be between 1 and the number of CPUs in your system",
> +        .mhandler.cmd = hmp_migrate_set_decompress_threads,
> +    },
> +
> +STEXI
> +@item migrate_set_decompress_threads @var{value}
> +@findex migrate_set_decompress_threads
> +Set decompress threads to @var{value}  for compress migrations.
> +ETEXI
> +
> +    {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",
>          .params     = "value",
> @@ -1766,6 +1816,12 @@ show migration status
>  show current migration capabilities
>  @item info migrate_cache_size
>  show current migration XBZRLE cache size
> +@item info migrate_compress_level
> +show current migration compress level
> +@item info migrate_compress_threads
> +show current migration compress threads
> +@item info migrate_decompress_threads
> +show current migration decompress threads
>  @item info balloon
>  show balloon information
>  @item info qtree
> diff --git a/hmp.c b/hmp.c
> index 63d7686..b1936a3 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
>                     qmp_query_migrate_cache_size(NULL) >> 10);
>  }
>  
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress level: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_level(NULL));
> +}
> +
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_threads(NULL));
> +}
> +
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_decompress_threads(NULL));
> +}
> +
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict)
>  {
>      CpuInfoList *cpu_list, *cpu;
> @@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
>      }
>  }
>  
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_level(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_decompress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
>  {
>      int64_t value = qdict_get_int(qdict, "value");
> diff --git a/hmp.h b/hmp.h
> index 4bb5dca..b348806 100644
> --- a/hmp.h
> +++ b/hmp.h
> @@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict);
>  void hmp_info_block(Monitor *mon, const QDict *qdict);
>  void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
> @@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_set_password(Monitor *mon, const QDict *qdict);
>  void hmp_expire_password(Monitor *mon, const QDict *qdict);
>  void hmp_eject(Monitor *mon, const QDict *qdict);
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3cb5ba8..03c8e0d 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -49,6 +49,9 @@ struct MigrationState
>      QemuThread thread;
>      QEMUBH *cleanup_bh;
>      QEMUFile *file;
> +    QemuThread *compress_thread;
> +    int compress_thread_count;
> +    int compress_level;
>  
>      int state;
>      MigrationParams params;
> @@ -64,6 +67,7 @@ struct MigrationState
>      int64_t dirty_sync_count;
>  };
>  
> +extern bool incomming_migration_done;
>  void process_incoming_migration(QEMUFile *f);
>  
>  void qemu_start_incoming_migration(const char *uri, Error **errp);
> @@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *);
>  bool migration_has_failed(MigrationState *);
>  MigrationState *migrate_get_current(void);
>  
> +void migrate_compress_threads_create(MigrationState *s);
> +void migrate_compress_threads_join(MigrationState *s);
> +void migrate_decompress_threads_create(int count);
> +void migrate_decompress_threads_join(void);
>  uint64_t ram_bytes_remaining(void);
>  uint64_t ram_bytes_transferred(void);
>  uint64_t ram_bytes_total(void);
> @@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
>  
>  bool migrate_rdma_pin_all(void);
>  bool migrate_zero_blocks(void);
> -
> +bool migrate_use_compress(void);
>  bool migrate_auto_converge(void);
>  
>  int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
> @@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen);
>  
>  int migrate_use_xbzrle(void);
>  int64_t migrate_xbzrle_cache_size(void);
> +int migrate_compress_level(void);
> +int migrate_compress_threads(void);
>  
>  int64_t xbzrle_cache_resize(int64_t new_size);
>  
> diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
> index 401676b..431e6cc 100644
> --- a/include/migration/qemu-file.h
> +++ b/include/migration/qemu-file.h
> @@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input);
>  int qemu_get_fd(QEMUFile *f);
>  int qemu_fclose(QEMUFile *f);
>  int64_t qemu_ftell(QEMUFile *f);
> +uint64_t qemu_add_compress(QEMUFile *f,  const uint8_t *p, int size);

Huh? I don't see the code for this anywhere?

>  void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
>  void qemu_put_byte(QEMUFile *f, int v);
>  /*
> diff --git a/migration.c b/migration.c
> index c49a05a..716de97 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -46,6 +46,12 @@ enum {
>  /* Migration XBZRLE default cache size */
>  #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
>  
> +/* Migration compress default thread count */
> +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
> +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
> +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
> +
>  static NotifierList migration_state_notifiers =
>      NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
>  
> @@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
>          .bandwidth_limit = MAX_THROTTLE,
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
> +        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
>      };
>  
>      return &current_migration;
> @@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
>          error_report("load of migration failed: %s", strerror(-ret));
>          exit(EXIT_FAILURE);
>      }
> +    incomming_migration_done = true;
>      qemu_announce_self();
>  
>      /* Make sure all file formats flush their mutable metadata */
> @@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
>      } else {
>          runstate_set(RUN_STATE_PAUSED);
>      }
> +    migrate_decompress_threads_join();
>  }
>  
> +static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
>  void process_incoming_migration(QEMUFile *f)
>  {
> +    incomming_migration_done = false;
> +    migrate_decompress_threads_create(uncompress_thread_count);
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
>      int fd = qemu_get_fd(f);
>  
> @@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
>          qemu_thread_join(&s->thread);
>          qemu_mutex_lock_iothread();
>  
> +        migrate_compress_threads_join(s);
>          qemu_fclose(s->file);
>          s->file = NULL;
>      }
> @@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>      int64_t bandwidth_limit = s->bandwidth_limit;
>      bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
>      int64_t xbzrle_cache_size = s->xbzrle_cache_size;
> +    int compress_level = s->compress_level;
> +    int compress_thread_count = s->compress_thread_count;
>  
>      memcpy(enabled_capabilities, s->enabled_capabilities,
>             sizeof(enabled_capabilities));
> @@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>             sizeof(enabled_capabilities));
>      s->xbzrle_cache_size = xbzrle_cache_size;
>  
> +    s->compress_level = compress_level;
> +    s->compress_thread_count = compress_thread_count;
>      s->bandwidth_limit = bandwidth_limit;
>      s->state = MIG_STATE_SETUP;
>      trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
>      return migrate_xbzrle_cache_size();
>  }
>  
> +void qmp_migrate_set_compress_level(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 9 || value < 0) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
> +                  "is invalid, please input a integer between 0 and 9. ");
> +        return;
> +    }
> +
> +    s->compress_level = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_level(Error **errp)
> +{
> +    return migrate_compress_level();
> +}
> +
> +void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
> +                  "is invalid, please input a integer between 1 and 255. ");
> +        return;
> +    }
> +
> +    s->compress_thread_count = value;
> +}
> +
> +void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
> +{
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
> +                  "is invalid, please input a integer between 1 and 255. ");
> +        return;
> +    }
> +
> +    uncompress_thread_count = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_threads(Error **errp)
> +{
> +    return migrate_compress_threads();
> +}
> +
> +int64_t qmp_query_migrate_decompress_threads(Error **errp)
> +{
> +    return uncompress_thread_count;
> +}
> +
>  void qmp_migrate_set_speed(int64_t value, Error **errp)
>  {
>      MigrationState *s;
> @@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
>      return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
>  }
>  
> +bool migrate_use_compress(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
> +}
> +
> +int migrate_compress_level(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_level;
> +}
> +
> +int migrate_compress_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_thread_count;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>  
>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
> +    migrate_compress_threads_create(s);
>  }
> diff --git a/monitor.c b/monitor.c
> index 905d8cf..365547e 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
>          .mhandler.cmd = hmp_info_migrate_cache_size,
>      },
>      {
> +        .name       = "migrate_compress_level",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress level",
> +        .mhandler.cmd = hmp_info_migrate_compress_level,
> +    },
> +    {
> +        .name       = "migrate_compress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress thread count",
> +        .mhandler.cmd = hmp_info_migrate_compress_threads,
> +    },
> +    {
> +        .name       = "migrate_decompress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration decompress thread count",
> +        .mhandler.cmd = hmp_info_migrate_decompress_threads,
> +    },
> +    {
>          .name       = "balloon",
>          .args_type  = "",
>          .params     = "",
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 24379ab..71a9e0f 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -491,13 +491,17 @@
>  #          to enable the capability on the source VM. The feature is disabled by
>  #          default. (since 1.6)
>  #
> +# @compress: Using the multiple compression threads to accelerate live migration.
> +#          This feature can help to reduce the migration traffic, by sending
> +#          compressed pages. The feature is disabled by default. (since 2.3)
> +#
>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>  #          to speed up convergence of RAM migration. (since 1.6)
>  #
>  # Since: 1.2
>  ##
>  { 'enum': 'MigrationCapability',
> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>  
>  ##
>  # @MigrationCapabilityStatus
> @@ -1382,6 +1386,88 @@
>  { 'command': 'query-migrate-cache-size', 'returns': 'int' }
>  
>  ##
> +# @migrate-set-compress-level
> +#
> +# Set compress level
> +#
> +# @value: compress level int
> +#
> +# The compress level will be an integer between 0 and 9.
> +# The compress level can be modified before and during ongoing migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-level
> +#
> +# query compress level
> +#
> +# Returns: compress level int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
> +
> +##
> +# @migrate-set-compress-threads
> +#
> +# Set compress threads
> +#
> +# @value: compress threads int
> +#
> +# The compress thread count is an integer between 1 and 255.
> +# The compress level can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-threads
> +#
> +# query compress threads
> +#
> +# Returns: compress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
> +
> +##
> +##
> +# @migrate-set-decompress-threads
> +#
> +# Set decompress threads
> +#
> +# @value: decompress threads int
> +#
> +# The decompress thread count is an integer between 1 and 255.
> +# The decompress level can be modified only before migration
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-decompress-threads
> +#
> +# query decompress threads
> +#
> +# Returns: decompress threads int
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
> +
> +##
>  # @ObjectPropertyInfo:
>  #
>  # @name: the name of the property
> diff --git a/qmp-commands.hx b/qmp-commands.hx
> index 1abd619..b60fdab 100644
> --- a/qmp-commands.hx
> +++ b/qmp-commands.hx
> @@ -705,7 +705,138 @@ Example:
>  <- { "return": 67108864 }
>  
>  EQMP
> +{
> +        .name       = "migrate-set-compress-level",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
> +    },
> +
> +SQMP
> +migrate-set-compress-level
> +----------------------
> +
> +Set compress level to be used by compress migration, the compress level is an integer
> +between 0 and 9
> +
> +Arguments:
> +
> +- "value": compress level (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-level", "arguments": { "value": 536870912 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-level",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
> +    },
> +
> +SQMP
> +query-migrate-compress-level
> +------------------------
> +
> +Show compress level to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-level" }
> +<- { "return": 67108864 }
> +
> +EQMP
> +{
> +        .name       = "migrate-set-compress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
> +    },
> +
> +SQMP
> +migrate-set-compress-threads
> +----------------------
> +
> +Set compress thread count to be used by compress migration, the compress thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": compress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 536870912 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
> +    },
> +
> +SQMP
> +query-migrate-compress-threads
> +------------------------
> +
> +Show compress thread count to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 67108864 }
> +
> +EQMP
> +{
> +        .name       = "migrate-set-decompress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
> +    },
> +
> +SQMP
> +migrate-set-decompress-threads
> +----------------------
> +
> +Set decompress thread count to be used by compress migration, the decompress thread count is an integer
> +between 1 and 255
> +
> +Arguments:
> +
> +- "value": decompress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": 536870912 } }
> +<- { "return": {} }
>  
> +EQMP
> +    {
> +        .name       = "query-migrate-decompress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_decompress_threads,
> +    },
> +
> +SQMP
> +query-migrate-decompress-threads
> +------------------------
> +
> +Show decompress thread count to be used by compress migration
> +
> +returns a json-object with the following information:
> +- "size" : json-int
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 67108864 }
> +
> +EQMP
>      {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",
> -- 
> 1.9.1
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads
  2014-11-06 13:24   ` Dr. David Alan Gilbert
  2014-11-06 13:46     ` Eric Blake
@ 2014-11-07  2:28     ` Li, Liang Z
  1 sibling, 0 replies; 21+ messages in thread
From: Li, Liang Z @ 2014-11-07  2:28 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: quintela@redhat.com, armbru@redhat.com, qemu-devel@nongnu.org,
	lcapitulino@redhat.com, amit.shah@redhat.com, Zhang, Yang Z

>OK, some high level questions:
>
> 1) How does the performance compare to running a separate compressor process in the stream rather than embedding it in the qemu?
>

I have not done that test, so I don't know how the performance compares. Maybe I can do it later.

>  2) Since you're looking at different compression schemes do we need something in the settings to select it, and to say what makes sense
>for the 'compress_level'?   For example I don't know if lz4 or quicklz
>have 1-10 for their compression levels?  How do I know which compression schemes are available on any host?
>

Of those, only LZ4HC supports a compression level, with a range of 0 to 16. My implementation does not support selecting different compression schemes; it only supports selecting different compression levels. Using LZ4HC can actually improve performance compared to zlib; on the other hand, it is not as widespread as zlib, and its license is another problem.
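Since the meaningful range of 'compress_level' depends on the scheme, one way to answer the "what makes sense for compress_level" question is a per-scheme range table. The following is a minimal sketch under stated assumptions: the CompScheme enum and the table are hypothetical (the v2 patch supports only zlib), the zlib range 0..9 matches the check in qmp_migrate_set_compress_level(), and the LZ4HC range is the 0 to 16 figure given in the reply.

```c
#include <stdbool.h>

/* Hypothetical scheme identifiers; the v2 patch itself only uses zlib. */
typedef enum {
    COMP_SCHEME_ZLIB,
    COMP_SCHEME_LZ4HC,
} CompScheme;

typedef struct {
    const char *name;
    int min_level;
    int max_level;
} CompLevelRange;

/* zlib: 0 = no compression .. 9 = best ratio, as in the patch's check.
 * LZ4HC: the 0..16 range stated in the reply (an assumption here). */
static const CompLevelRange comp_level_ranges[] = {
    [COMP_SCHEME_ZLIB]  = { "zlib",  0, 9 },
    [COMP_SCHEME_LZ4HC] = { "lz4hc", 0, 16 },
};

/* Return true if 'level' is meaningful for 'scheme'. */
static bool comp_level_valid(CompScheme scheme, int level)
{
    const CompLevelRange *r = &comp_level_ranges[scheme];

    return level >= r->min_level && level <= r->max_level;
}
```

With a table like this, the same level value (say 12) can be accepted for one scheme and rejected for another, which a single global 0..9 check cannot express.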

Liang  

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-06 12:57   ` Eric Blake
@ 2014-11-21  6:18     ` Zhang, Yang Z
  2014-11-24  2:25     ` Li, Liang Z
  2014-12-08  6:34     ` Li, Liang Z
  2 siblings, 0 replies; 21+ messages in thread
From: Zhang, Yang Z @ 2014-11-21  6:18 UTC (permalink / raw)
  To: Eric Blake, Li, Liang Z, qemu-devel@nongnu.org
  Cc: armbru@redhat.com, lcapitulino@redhat.com

Eric Blake wrote on 2014-11-06:

Hi Eric

Thanks for your review and comment.

> On 11/06/2014 12:08 PM, Li Liang wrote:
>> Instead of sending the guest memory directly, this solution compress 
>> the ram page before sending, after receiving, the data will be 
>> decompressed.
>> This feature can help to reduce the data transferred about 60%, this 
>> is very useful when the network bandwidth is limited, and the 
>> migration time can also be reduced about 70%. The feature is off by 
>> default, following the document docs/multiple-compression-threads.txt
>> for information to use it.
>> 
>> Reviewed-by: Eric Blake <eblake@redhat.com>
> 
> Please DON'T add this line unless the author spelled it out (or if 
> they mentioned that it would be okay if you fix minor issues).  I 
> intentionally omitted a reviewed-by on v1:
> 
> https://lists.gnu.org/archive/html/qemu-devel/2014-11/msg00672.html
> 
> because I was not happy with the patch as it was presented and did not 
> think the work to fix it was trivial.  Furthermore, my review of v1 
> was just over the interface, and not the entire patch; there are very 
> likely still bugs lurking in the .c files.  Once again, I'm going to 
> limit my review of v2 to the interface (at least in this email):
> 
>> Signed-off-by: Li Liang <liang.z.li@intel.com>
>> ---
>> 
>> +++ b/qapi-schema.json
>> @@ -491,13 +491,17 @@
>>  #          to enable the capability on the source VM. The feature is disabled by
>>  #          default. (since 1.6)
>>  #
>> +# @compress: Using the multiple compression threads to accelerate live migration.
>> +#          This feature can help to reduce the migration traffic, by sending
>> +#          compressed pages. The feature is disabled by default. (since 2.3)
>> +#
>>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>>  #          to speed up convergence of RAM migration. (since 1.6)
>>  #
>>  # Since: 1.2
>>  ##
>>  { 'enum': 'MigrationCapability',
>> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
>> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>> 
> 
> I'll repeat what I said on v1 (but this time, with some links to back 
> it up :)

Agreed. Additional commands add complexity. 

> 
> We really need to avoid a proliferation of new commands, two per 
> tunable does not scale well.  I think now is the time to implement my 
> earlier suggestion at making MigrationCapability become THE resource for tunables:
> 
> https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html

I see that you said you were trying to clean up this part. Have you done it? It would be helpful if you could provide some draft patches or help with this, since you are more experienced with this part than we are.
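To illustrate why one generic mechanism scales better than two commands per tunable, here is a minimal table-driven sketch. It is not Eric's proposed interface and not a QEMU API: MigTunable, mig_tunable_set() and mig_tunable_get() are hypothetical names, while the ranges and defaults are copied from the v2 patch (compress level 0..9 default 1, compress threads 1..255 default 8, decompress threads 1..255 default 2).

```c
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical descriptor for one migration tunable. */
typedef struct {
    const char *name;
    int64_t min;
    int64_t max;
    int64_t value;
} MigTunable;

/* Ranges and defaults taken from the v2 patch. */
static MigTunable mig_tunables[] = {
    { "compress-level",     0,   9, 1 },
    { "compress-threads",   1, 255, 8 },
    { "decompress-threads", 1, 255, 2 },
};

#define N_TUNABLES (sizeof(mig_tunables) / sizeof(mig_tunables[0]))

static MigTunable *mig_tunable_find(const char *name)
{
    for (size_t i = 0; i < N_TUNABLES; i++) {
        if (strcmp(mig_tunables[i].name, name) == 0) {
            return &mig_tunables[i];
        }
    }
    return NULL;
}

/* One setter for every tunable: rejects unknown names and out-of-range
 * values, leaving the stored value unchanged. */
static bool mig_tunable_set(const char *name, int64_t value)
{
    MigTunable *t = mig_tunable_find(name);

    if (!t || value < t->min || value > t->max) {
        return false;
    }
    t->value = value;
    return true;
}

/* One getter for every tunable; returns -1 for an unknown name. */
static int64_t mig_tunable_get(const char *name)
{
    MigTunable *t = mig_tunable_find(name);

    return t ? t->value : -1;
}
```

Adding a new tunable then means adding a table row rather than a new set/query command pair plus HMP and QMP plumbing.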

Best regards,
Yang



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-06 15:41   ` Dr. David Alan Gilbert
@ 2014-11-21  7:01     ` Li, Liang Z
  0 siblings, 0 replies; 21+ messages in thread
From: Li, Liang Z @ 2014-11-21  7:01 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: quintela@redhat.com, armbru@redhat.com, qemu-devel@nongnu.org,
	lcapitulino@redhat.com, amit.shah@redhat.com, Zhang, Yang Z

> > +static void migrate_put_be32(MigBuf *f, unsigned int v)
> > +{
> > +    migrate_put_byte(f, v >> 24);
> > +    migrate_put_byte(f, v >> 16);
> > +    migrate_put_byte(f, v >> 8);
> > +    migrate_put_byte(f, v);
> > +}
> > +
> > +static void migrate_put_be64(MigBuf *f, uint64_t v)
> > +{
> > +    migrate_put_be32(f, v >> 32);
> > +    migrate_put_be32(f, v);
> > +}
> > +

> This feels like you're doing something very similar to 
> the buffered file code that recently went in; could you
> reuse qemu_bufopen or the QEMUSizedBuffer?
> I think if you could use the qemu_buf somehow (maybe with
> modifications?) then you could avoid a lot of the 'if'd
> code below, because you'd always be working with a QEMUFile,
> it would just be a different QEMUFile.

I will do this in the next version of the patch.  
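One concrete reason to prefer a bounds-aware buffer: in the quoted migrate_put_buffer(), once buf_index reaches MIG_BUF_SIZE the chunk length 'l' becomes 0 and the loop never makes progress, and migrate_put_byte() does no bounds check at all. Below is a minimal sketch of a fixed-size writer that records overflow instead; SketchBuf and the sketch_put_* helpers are hypothetical and do not model QEMU's QEMUSizedBuffer API, and the 64-byte capacity is illustrative only.

```c
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_BUF_SIZE 64  /* illustrative capacity, not MIG_BUF_SIZE */

typedef struct {
    size_t len;
    bool overflow;          /* set once any write did not fit */
    uint8_t data[SKETCH_BUF_SIZE];
} SketchBuf;

/* Append 'size' bytes, or flag overflow and write nothing. */
static void sketch_put_buffer(SketchBuf *b, const uint8_t *src, size_t size)
{
    if (b->overflow || size > SKETCH_BUF_SIZE - b->len) {
        b->overflow = true;
        return;
    }
    memcpy(b->data + b->len, src, size);
    b->len += size;
}

static void sketch_put_byte(SketchBuf *b, uint8_t v)
{
    sketch_put_buffer(b, &v, 1);
}

/* Big-endian encoding, same byte order as migrate_put_be32/be64. */
static void sketch_put_be32(SketchBuf *b, uint32_t v)
{
    uint8_t tmp[4] = {
        (uint8_t)(v >> 24), (uint8_t)(v >> 16), (uint8_t)(v >> 8), (uint8_t)v
    };
    sketch_put_buffer(b, tmp, sizeof(tmp));
}

static void sketch_put_be64(SketchBuf *b, uint64_t v)
{
    sketch_put_be32(b, (uint32_t)(v >> 32));
    sketch_put_be32(b, (uint32_t)v);
}
```

The caller checks 'overflow' once after building a record, which keeps the per-byte helpers simple while making a too-small buffer a detectable error rather than a hang.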

> > +static void *do_data_compress(void *opaque)
> > +{
> > +    compress_param *param = opaque;
> > +    while (!quit_thread) {
> > +        if (param->state == COM_START) {
> > +            save_compress_ram_page(param);
> > +            param->state = COM_DONE;
> > +         } else {
> > +             g_usleep(1);
> 
> > There has to be a better way than having your thread spin
> > with sleeps; qemu_event or semaphore or something?

I will use QemuCond and QemuMutex instead.
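A minimal sketch of the condition-variable handshake that could replace the g_usleep() polling in do_data_compress(). Plain POSIX threads stand in for QEMU's QemuMutex/QemuCond wrappers; the COM_START/COM_DONE states come from the patch, while CompWorker, the comp_worker_* functions and the integer "page" are hypothetical placeholders (the "compression" just doubles the input).

```c
#include <pthread.h>
#include <stdbool.h>

/* States borrowed from the patch's compress_param. */
typedef enum { COM_DONE, COM_START } ComState;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;    /* broadcast on every state change */
    ComState state;
    bool quit;
    int page;               /* placeholder for the page to compress */
    int result;             /* placeholder for the compressed output */
} CompWorker;

static void *comp_worker_thread(void *opaque)
{
    CompWorker *w = opaque;

    pthread_mutex_lock(&w->lock);
    for (;;) {
        /* Sleep until there is work or a quit request: no busy waiting. */
        while (w->state != COM_START && !w->quit) {
            pthread_cond_wait(&w->cond, &w->lock);
        }
        if (w->state != COM_START) {
            break;          /* quit requested and nothing pending */
        }
        int page = w->page;
        pthread_mutex_unlock(&w->lock);
        int result = page * 2;      /* placeholder for real compression */
        pthread_mutex_lock(&w->lock);
        w->result = result;
        w->state = COM_DONE;
        pthread_cond_broadcast(&w->cond);
    }
    pthread_mutex_unlock(&w->lock);
    return NULL;
}

/* Hand one page to the worker; the caller waits for COM_DONE first. */
static void comp_worker_submit(CompWorker *w, int page)
{
    pthread_mutex_lock(&w->lock);
    w->page = page;
    w->state = COM_START;
    pthread_cond_broadcast(&w->cond);
    pthread_mutex_unlock(&w->lock);
}

/* Block until the worker has finished its current page. */
static int comp_worker_wait(CompWorker *w)
{
    int result;

    pthread_mutex_lock(&w->lock);
    while (w->state != COM_DONE) {
        pthread_cond_wait(&w->cond, &w->lock);
    }
    result = w->result;
    pthread_mutex_unlock(&w->lock);
    return result;
}

static void comp_worker_stop(CompWorker *w, pthread_t tid)
{
    pthread_mutex_lock(&w->lock);
    w->quit = true;
    pthread_cond_broadcast(&w->cond);
    pthread_mutex_unlock(&w->lock);
    pthread_join(tid, NULL);
}
```

The lock is dropped while the page is being processed, so the producer is only blocked for the short state transitions, not for the compression itself.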

> > +static int save_compress_ram_page(compress_param *param)
> > +{
> > +    int bytes_sent = param->bytes_sent;
> > +    int blen = COMPRESS_BUF_SIZE;
> > +    int cont = param->cont;
> > +    uint8_t *p = param->p;
> > +    int ret = param->ret;
> > +    RAMBlock *block = param->block;
> > +    ram_addr_t offset = param->offset;
> > +    bool last_stage = param->last_stage;
> > +    /* In doubt sent page as normal */
> > +    XBZRLE_cache_lock();
> > +    ram_addr_t current_addr = block->offset + offset;
> > +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> > +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> > +            if (bytes_sent > 0) {
> > +                atomic_inc(&acct_info.norm_pages);
> > +             } else if (bytes_sent == 0) {
> > +                atomic_inc(&acct_info.dup_pages);
> > +             }
> > +        }
> > +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> > +        atomic_inc(&acct_info.dup_pages);
> > +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
> > +                             RAM_SAVE_FLAG_COMPRESS);
> > +        migrate_put_byte(&param->migbuf, 0);
> > +        bytes_sent++;
> > +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> > +         * page would be stale
> > +         */
> > +        xbzrle_cache_zero_page(current_addr);
> > +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
> > +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
> > +                              offset, cont, last_stage, true);
> > +    }
> > +    XBZRLE_cache_unlock();
> > +    /* XBZRLE overflow or normal page */

> I wonder if it's worth the complexity of doing the zero check
> and the xbzrle if you're already doing compression?  I assume
> zlib is going to handle a zero page reasonably well anyway?

Yes, the tests show it is worth it: with the zero check, the migration time is shorter. The reason for checking xbzrle is that I want compression to work together with xbzrle, since using xbzrle can reduce the amount of data transferred.
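A minimal sketch of the zero-page short-circuit discussed above: a zero page is encoded as a single fill byte and never reaches the compressor, so no compression CPU time is spent on it. The 4096-byte page size, compress_fn, encode_page() and the copying "compressor" are assumptions for illustration (the copy stands in for zlib so the example runs without linking it); page_is_zero() is a simple stand-in for QEMU's is_zero_range(), and the per-page header written by migrate_save_block_hdr() is omitted.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_PAGE_SIZE 4096   /* assumed page size for the sketch */

typedef size_t (*compress_fn)(const uint8_t *page, uint8_t *out);

static unsigned compress_calls; /* how often the compressor actually ran */

/* Placeholder "compressor": copies the page verbatim. */
static size_t copy_compress(const uint8_t *page, uint8_t *out)
{
    compress_calls++;
    memcpy(out, page, SKETCH_PAGE_SIZE);
    return SKETCH_PAGE_SIZE;
}

/* Simple stand-in for is_zero_range(page, TARGET_PAGE_SIZE). */
static bool page_is_zero(const uint8_t *page)
{
    for (size_t i = 0; i < SKETCH_PAGE_SIZE; i++) {
        if (page[i] != 0) {
            return false;
        }
    }
    return true;
}

/* Return the payload size for one page. A zero page costs one byte,
 * like the patch's migrate_put_byte(&param->migbuf, 0). */
static size_t encode_page(const uint8_t *page, uint8_t *out,
                          compress_fn compress)
{
    if (page_is_zero(page)) {
        out[0] = 0;
        return 1;
    }
    return compress(page, out);
}
```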

> > +static uint64_t bytes_transferred;
> > +
> > +static void flush_compressed_data(QEMUFile *f)
> > +{
> > +    int idx;
> > +    if (!migrate_use_compress()) {
> > +        return;
> > +    }
> > +
> > +    for (idx = 0; idx < compress_thread_count; idx++) {
> > +        while (comp_param[idx].state != COM_DONE) {
> > +            g_usleep(0);
> > +        }

> Again, some type of event/semaphore rather than busy sleeping;
> and also I don't understand how the different threads keep everything
> in order - can you add some comments (or maybe notes in the docs)
> that explain how it all works?

I will add some comments in the next version.
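On the ordering question, one possible scheme (a sketch, not necessarily what the patch intends) is to give each worker its own output slot and have the flush wait on slot 0, then slot 1, and so on, appending outputs in slot order. The stream order then depends only on which slot a page was assigned to, not on which thread finishes first. POSIX threads stand in for QemuThread/QemuCond; Job, worker(), start_round() and flush_in_order() are hypothetical, and the "compression" just squares the input.

```c
#include <pthread.h>
#include <stddef.h>

#define N_WORKERS 4

typedef struct {
    int input;
    int output;
    int done;   /* set under pool_lock once 'output' is valid */
} Job;

static Job jobs[N_WORKERS];
static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t pool_cond = PTHREAD_COND_INITIALIZER;

static void *worker(void *opaque)
{
    Job *job = opaque;
    int out = job->input * job->input;  /* placeholder for compression */

    pthread_mutex_lock(&pool_lock);
    job->output = out;
    job->done = 1;
    pthread_cond_broadcast(&pool_cond);
    pthread_mutex_unlock(&pool_lock);
    return NULL;
}

/* Start one round: slot i gets inputs[i]. A thread per job keeps the
 * sketch short; the patch instead keeps long-lived compression threads. */
static void start_round(pthread_t *tids, const int *inputs)
{
    for (int i = 0; i < N_WORKERS; i++) {
        jobs[i].input = inputs[i];
        jobs[i].done = 0;
        pthread_create(&tids[i], NULL, worker, &jobs[i]);
    }
}

/* Wait for every slot in index order and append its output, so the
 * stream order is fixed regardless of thread completion order. */
static size_t flush_in_order(int *stream)
{
    size_t n = 0;

    pthread_mutex_lock(&pool_lock);
    for (int i = 0; i < N_WORKERS; i++) {
        while (!jobs[i].done) {
            pthread_cond_wait(&pool_cond, &pool_lock);
        }
        stream[n++] = jobs[i].output;
    }
    pthread_mutex_unlock(&pool_lock);
    return n;
}
```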

> >              }
> >          } else {
> > -            bytes_sent = ram_save_page(f, block, offset, last_stage);
> > -
> > -            /* if page is unmodified, continue to the next */
> > -            if (bytes_sent > 0) {
> > -                last_sent_block = block;
> > -                break;
> > +            if (!migrate_use_compress()) {
> > +                bytes_sent = ram_save_page(f, block, offset, last_stage);
> > +                /* if page is unmodified, continue to the next */
> > +                if (bytes_sent > 0) {
> > +                    last_sent_block = block;
> > +                    break;
> > +                }
> > +            } else {
> > +                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > +                p = memory_region_get_ram_ptr(block->mr) + offset;
> > +                ret = ram_control_save_page(f, block->offset,
> > +                           offset, TARGET_PAGE_SIZE, &len);
> > +                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
> > +                    if (cont == 0) {
> > +                        flush_compressed_data(f);
> > +                    }
> > +                    set_common_compress_params(&comp_param[0],
> > +                        ret, len, block, offset, last_stage, cont,
> > +                        p, ram_bulk_stage);
> > +                    bytes_sent = save_compress_ram_page(&comp_param[0]);
> > +                    if (bytes_sent > 0) {
> > +                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
> > +                            comp_param[0].migbuf.buf_index);
> > +                        comp_param[0].migbuf.buf_index = 0;
> > +                        last_sent_block = block;
> > +                        break;
> > +                    }

> Is there no way to move this down into your save_compress_ram_page?
> When I split the code into ram_find_and_save_block and ram_save_page
> a few months ago, it meant that 'ram_find_and_save_block' only really
> did the work of finding what to send, and 'ram_save_page' figured out
> everything to do with sending it; it would be nice to keep all
> the details of sending it separate still.

I will rewrite the code.

> Since ram_bulk_stage is a static global in this file, why bother passing
> it into the 'compress_params'?  I think you could probably avoid a lot
> of things like that.

Passing ram_bulk_stage to compress_params is important to make things correct.
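The reply does not say why the copy matters. One plausible reason, stated here as an assumption, is that a compression thread may process a page after the migration thread has already left the bulk stage, so reading the live global would apply the wrong rule to a page queued earlier. A minimal sketch of snapshotting the flag at queue time; bulk_stage, PageJob, queue_page() and worker_may_use_xbzrle() are hypothetical names, and only the "!bulk_stage && migrate_use_xbzrle()" condition comes from the patch.

```c
#include <stdbool.h>

/* Hypothetical global mirroring ram_bulk_stage in arch_init.c. */
static bool bulk_stage = true;

typedef struct {
    int page;
    bool bulk_stage;    /* value of the global when the page was queued */
} PageJob;

/* Producer side: capture the global together with the page. */
static PageJob queue_page(int page)
{
    PageJob job = { .page = page, .bulk_stage = bulk_stage };

    return job;
}

/* Worker side: decide from the snapshot, never from the live global.
 * Mirrors the patch's condition !bulk_stage && migrate_use_xbzrle(). */
static bool worker_may_use_xbzrle(const PageJob *job, bool xbzrle_enabled)
{
    return !job->bulk_stage && xbzrle_enabled;
}
```

A page queued during the bulk stage keeps bulk-stage semantics even if the worker only gets to it after the producer has moved on.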

Liang

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-06 11:08 ` [Qemu-devel] [v2 2/2] migration: Implement " Li Liang
  2014-11-06 12:57   ` Eric Blake
  2014-11-06 15:41   ` Dr. David Alan Gilbert
@ 2014-11-21  7:29   ` ChenLiang
  2014-11-21  7:38     ` Li, Liang Z
  2 siblings, 1 reply; 21+ messages in thread
From: ChenLiang @ 2014-11-21  7:29 UTC (permalink / raw)
  To: Li Liang; +Cc: yang.z.zhang, lcapitulino, qemu-devel, armbru

On 2014/11/6 19:08, Li Liang wrote:

> Instead of sending the guest memory directly, this solution compress
> the ram page before sending, after receiving, the data will be
> decompressed.
> This feature can help to reduce the data transferred about
> 60%, this is very useful when the network bandwidth is limited,
> and the migration time can also be reduced about 70%. The
> feature is off by default, following the document
> docs/multiple-compression-threads.txt for information to use it.
> 
> Reviewed-by: Eric Blake <eblake@redhat.com>
> Signed-off-by: Li Liang <liang.z.li@intel.com>
> ---
>  arch_init.c                   | 435 ++++++++++++++++++++++++++++++++++++++++--
>  hmp-commands.hx               |  56 ++++++
>  hmp.c                         |  57 ++++++
>  hmp.h                         |   6 +
>  include/migration/migration.h |  12 +-
>  include/migration/qemu-file.h |   1 +
>  migration.c                   |  99 ++++++++++
>  monitor.c                     |  21 ++
>  qapi-schema.json              |  88 ++++++++-
>  qmp-commands.hx               | 131 +++++++++++++
>  10 files changed, 890 insertions(+), 16 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 88a5ba0..a27d87b 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -24,6 +24,7 @@
>  #include <stdint.h>
>  #include <stdarg.h>
>  #include <stdlib.h>
> +#include <zlib.h>
>  #ifndef _WIN32
>  #include <sys/types.h>
>  #include <sys/mman.h>
> @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_CONTINUE 0x20
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
> +#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  
>  static struct defconfig_file {
>      const char *filename;
> @@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages;
>  static uint32_t last_version;
>  static bool ram_bulk_stage;
>  
> +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
> +#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
> +struct MigBuf {
> +    int buf_index;
> +    uint8_t buf[MIG_BUF_SIZE];
> +};
> +
> +typedef struct MigBuf MigBuf;
> +
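COMPRESS_BUF_SIZE leaves 16 bytes of slack over TARGET_PAGE_SIZE. To check whether that is enough, the sketch below copies the worst-case formula from compressBound() in recent zlib releases and evaluates it for two page sizes. Treat the formula as an assumption to re-check against the installed zlib; real code should call compressBound() rather than duplicate it. The sketch_* names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Worst-case zlib output size. Mirrors the compressBound() formula in
 * recent zlib releases (assumption: prefer calling compressBound()). */
static uint64_t sketch_compress_bound(uint64_t src_len)
{
    return src_len + (src_len >> 12) + (src_len >> 14) + (src_len >> 25) + 13;
}

/* True if a buffer of page_size + slack bytes can hold any compressed page. */
static bool sketch_slack_is_enough(uint64_t page_size, uint64_t slack)
{
    assert(page_size > 0);
    return sketch_compress_bound(page_size) <= page_size + slack;
}
```

For 4 KiB pages the bound is 4110 bytes, which fits in the 4112-byte buffer. For 64 KiB pages it would be 65569 bytes, which needs more than 16 bytes of slack.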
> +static void migrate_put_byte(MigBuf *f, int v)
> +{
> +    f->buf[f->buf_index] = v;
> +    f->buf_index++;
> +}
> +
> +static void migrate_put_be16(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be32(MigBuf *f, unsigned int v)
> +{
> +    migrate_put_byte(f, v >> 24);
> +    migrate_put_byte(f, v >> 16);
> +    migrate_put_byte(f, v >> 8);
> +    migrate_put_byte(f, v);
> +}
> +
> +static void migrate_put_be64(MigBuf *f, uint64_t v)
> +{
> +    migrate_put_be32(f, v >> 32);
> +    migrate_put_be32(f, v);
> +}
> +
> +static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
> +{
> +    int l;
> +
> +    while (size > 0) {
> +        l = MIG_BUF_SIZE - f->buf_index;
> +        if (l > size) {
> +            l = size;
> +        }
> +        memcpy(f->buf + f->buf_index, buf, l);
> +        f->buf_index += l;
> +        buf += l;
> +        size -= l;
> +    }
> +}
> +
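Neither helper is bounded. migrate_put_byte() writes without checking buf_index. migrate_put_buffer() stops making progress once the buffer is full, because l becomes 0 while size stays positive. An oversized record would therefore either overrun MigBuf or spin forever. The minimal bounds-checked alternative below refuses the write instead. It assumes a hypothetical SketchBuf type with a deliberately small 64-byte capacity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_BUF_SIZE 64  /* hypothetical; the patch uses MIG_BUF_SIZE */

typedef struct {
    size_t index;                   /* next free byte in data[] */
    uint8_t data[SKETCH_BUF_SIZE];
} SketchBuf;

/* Append len bytes, or return false (leaving b untouched) if they don't fit. */
static bool sketch_put_buffer(SketchBuf *b, const uint8_t *src, size_t len)
{
    assert(b->index <= sizeof(b->data));
    if (len > sizeof(b->data) - b->index) {
        return false;
    }
    memcpy(b->data + b->index, src, len);
    b->index += len;
    return true;
}

/* Big-endian writers build the bytes first so a failure is all-or-nothing. */
static bool sketch_put_be32(SketchBuf *b, uint32_t v)
{
    uint8_t tmp[4] = {
        (uint8_t)(v >> 24), (uint8_t)(v >> 16), (uint8_t)(v >> 8), (uint8_t)v
    };
    return sketch_put_buffer(b, tmp, sizeof(tmp));
}

static bool sketch_put_be64(SketchBuf *b, uint64_t v)
{
    uint8_t tmp[8];
    for (int i = 0; i < 8; i++) {
        tmp[i] = (uint8_t)(v >> (56 - 8 * i));
    }
    return sketch_put_buffer(b, tmp, sizeof(tmp));
}
```

Each multi-byte value is built in a temporary array first, so a value that does not fit leaves the buffer unchanged.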
> +static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
> +        ram_addr_t offset, int cont, int flag)
> +{
> +    size_t size;
> +
> +    migrate_put_be64(f, offset | cont | flag);
> +    size = 8;
> +
> +    if (!cont) {
> +        migrate_put_byte(f, strlen(block->idstr));
> +        migrate_put_buffer(f, (uint8_t *)block->idstr,
> +                        strlen(block->idstr));
> +        size += 1 + strlen(block->idstr);
> +    }
> +    return size;
> +}
> +
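For reference while reviewing the stream format: migrate_save_block_hdr() first emits a big-endian 64-bit word. That word holds the page offset, with the flags in its low bits. Unless RAM_SAVE_FLAG_CONTINUE is set, it then emits a one-byte length followed by the block idstr. The sketch below shows the matching reader. It assumes 4 KiB target pages, so the flags occupy the low 12 bits. SketchHdr and sketch_parse_hdr() are hypothetical names, not QEMU API.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_PAGE_SIZE     4096u  /* assumption: 4 KiB target pages */
#define SKETCH_FLAG_CONTINUE 0x20u  /* RAM_SAVE_FLAG_CONTINUE in the patch */

typedef struct {
    uint64_t offset;   /* page-aligned offset within the RAMBlock */
    uint32_t flags;    /* RAM_SAVE_FLAG_* bits carried in the same word */
    char idstr[256];   /* block name; empty when CONTINUE is set */
} SketchHdr;

/* Returns the number of bytes consumed, or 0 if the input is too short. */
static size_t sketch_parse_hdr(const uint8_t *p, size_t len, SketchHdr *h)
{
    uint64_t word = 0;
    size_t used = 8;

    assert(p != NULL && h != NULL);
    if (len < 8) {
        return 0;
    }
    for (int i = 0; i < 8; i++) {
        word = (word << 8) | p[i];
    }
    h->offset = word & ~(uint64_t)(SKETCH_PAGE_SIZE - 1);
    h->flags = (uint32_t)(word & (SKETCH_PAGE_SIZE - 1));
    h->idstr[0] = '\0';

    if (!(h->flags & SKETCH_FLAG_CONTINUE)) {
        if (len < 9) {
            return 0;
        }
        size_t n = p[8];             /* at most 255, fits in idstr */
        if (len < 9 + n) {
            return 0;
        }
        memcpy(h->idstr, p + 9, n);
        h->idstr[n] = '\0';
        used += 1 + n;
    }
    return used;
}
```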
> +static int migrate_qemu_add_compress(MigBuf *f,  const uint8_t *p,
> +        int size, int level)
> +{
> +    uLong  blen = COMPRESS_BUF_SIZE;
> +    if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
> +            size, level) != Z_OK) {
> +        error_report("Compress failed");
> +        return 0;
> +    }
> +    migrate_put_be32(f, blen);
> +    f->buf_index += blen;
> +    return blen + sizeof(int);
> +}
> +
> +enum {
> +    COM_DONE = 0,
> +    COM_START,
> +};
> +
> +static int  compress_thread_count;
> +static int  decompress_thread_count;
> +
> +struct compress_param {
> +    int state;
> +    MigBuf migbuf;
> +    RAMBlock *block;
> +    ram_addr_t offset;
> +    bool last_stage;
> +    int ret;
> +    int bytes_sent;
> +    uint8_t *p;
> +    int cont;
> +    bool bulk_stage;
> +};
> +
> +typedef struct compress_param compress_param;
> +compress_param *comp_param;
> +
> +struct decompress_param {
> +    int state;
> +    void *des;
> +    uint8_t compbuf[COMPRESS_BUF_SIZE];
> +    int len;
> +};
> +typedef struct decompress_param decompress_param;
> +
> +static decompress_param *decomp_param;
> +bool incomming_migration_done;
> +static bool quit_thread;
> +
> +static int save_compress_ram_page(compress_param *param);
> +
> +
> +static void *do_data_compress(void *opaque)
> +{
> +    compress_param *param = opaque;
> +    while (!quit_thread) {
> +        if (param->state == COM_START) {
> +            save_compress_ram_page(param);
> +            param->state = COM_DONE;
> +        } else {
> +            g_usleep(1);
> +        }
> +    }
> +
> +    return NULL;
> +}
> +
> +
> +void migrate_compress_threads_join(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = true;
> +    for (i = 0; i < compress_thread_count; i++) {
> +        qemu_thread_join(s->compress_thread + i);
> +    }
> +    g_free(s->compress_thread);
> +    g_free(comp_param);
> +    s->compress_thread = NULL;
> +    comp_param = NULL;
> +}
> +
> +void migrate_compress_threads_create(MigrationState *s)
> +{
> +    int i;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +    quit_thread = false;
> +    compress_thread_count = s->compress_thread_count;
> +    s->compress_thread = g_malloc0(sizeof(QemuThread)
> +        * s->compress_thread_count);
> +    comp_param = g_malloc0(sizeof(compress_param) * s->compress_thread_count);
> +    for (i = 0; i < s->compress_thread_count; i++) {
> +        qemu_thread_create(s->compress_thread + i, "compress",
> +            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
> +
> +    }
> +}
> +
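do_data_compress() and the migration thread coordinate through a plain int state that both threads poll, with no atomics or barriers. COM_DONE also means two things: "idle" and "output ready", which are told apart only by buf_index. The sketch below shows one way to make the handoff explicit, assuming C11 <stdatomic.h>. It uses three states, release stores when publishing, and acquire loads when consuming. Slot and its functions are hypothetical, and the "work" is a placeholder for compress2().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

enum { SLOT_IDLE, SLOT_QUEUED, SLOT_DONE };

typedef struct {
    atomic_int state;  /* published with release, observed with acquire */
    int input;         /* stands in for block/offset/page pointer */
    int output;        /* stands in for the compressed MigBuf */
} Slot;

/* Migration thread: hand work to an idle slot. */
static bool slot_submit(Slot *s, int input)
{
    if (atomic_load_explicit(&s->state, memory_order_acquire) != SLOT_IDLE) {
        return false;
    }
    s->input = input;  /* made visible to the worker by the release store */
    atomic_store_explicit(&s->state, SLOT_QUEUED, memory_order_release);
    return true;
}

/* Worker thread: one iteration of its loop; returns true if it did work. */
static bool slot_worker_step(Slot *s)
{
    if (atomic_load_explicit(&s->state, memory_order_acquire) != SLOT_QUEUED) {
        return false;
    }
    s->output = s->input * 2;  /* placeholder for the real compression */
    atomic_store_explicit(&s->state, SLOT_DONE, memory_order_release);
    return true;
}

/* Migration thread: take finished output and mark the slot idle again. */
static bool slot_collect(Slot *s, int *out)
{
    if (atomic_load_explicit(&s->state, memory_order_acquire) != SLOT_DONE) {
        return false;
    }
    *out = s->output;
    atomic_store_explicit(&s->state, SLOT_IDLE, memory_order_release);
    return true;
}
```

The test below drives the steps from a single thread to show the protocol. In real use, slot_worker_step() would run inside the worker's loop, ideally waiting on a condition variable instead of sleeping.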
>  /* Update the xbzrle cache to reflect a page that's been sent as all 0.
>   * The important thing is that a stale (not-yet-0'd) page be replaced
>   * by the new data.
> @@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t current_addr)
>  
>  #define ENCODING_FLAG_XBZRLE 0x1
>  
> -static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
> +static int save_xbzrle_page(void *f, uint8_t **current_data,
>                              ram_addr_t current_addr, RAMBlock *block,
> -                            ram_addr_t offset, int cont, bool last_stage)
> +                            ram_addr_t offset, int cont, bool last_stage,
> +                            bool save_to_buf)
>  {
>      int encoded_len = 0, bytes_sent = -1;
>      uint8_t *prev_cached_page;
> @@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
>      }
>  
>      /* Send XBZRLE based compressed page */
> -    bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
> -    qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
> -    qemu_put_be16(f, encoded_len);
> -    qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
> +    if (save_to_buf) {
> +        bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
> +        migrate_put_be16((MigBuf *)f, encoded_len);
> +        migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
> +    } else {
> +        bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
> +            cont, RAM_SAVE_FLAG_XBZRLE);
> +        qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
> +        qemu_put_be16((QEMUFile *)f, encoded_len);
> +        qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
> +    }
>      bytes_sent += encoded_len + 1 + 2;
>      acct_info.xbzrle_pages++;
>      acct_info.xbzrle_bytes += bytes_sent;
> @@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>          xbzrle_cache_zero_page(current_addr);
>      } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
>          bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
> -                                      offset, cont, last_stage);
> +                                      offset, cont, last_stage, false);
>          if (!last_stage) {
>              /* Can't send this cached data async, since the cache page
>               * might get updated before it gets to the wire
> @@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int save_compress_ram_page(compress_param *param)
> +{
> +    int bytes_sent = param->bytes_sent;
> +    int blen = COMPRESS_BUF_SIZE;
> +    int cont = param->cont;
> +    uint8_t *p = param->p;
> +    int ret = param->ret;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +    bool last_stage = param->last_stage;
> +    /* In doubt, send the page as normal */
> +    XBZRLE_cache_lock();
> +    ram_addr_t current_addr = block->offset + offset;
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                atomic_inc(&acct_info.norm_pages);
> +            } else if (bytes_sent == 0) {
> +                atomic_inc(&acct_info.dup_pages);
> +            }
> +        }
> +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
> +        atomic_inc(&acct_info.dup_pages);
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
> +                             RAM_SAVE_FLAG_COMPRESS);
> +        migrate_put_byte(&param->migbuf, 0);
> +        bytes_sent++;
> +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
> +         * page would be stale
> +         */
> +        xbzrle_cache_zero_page(current_addr);
> +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
> +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
> +                              offset, cont, last_stage, true);
> +    }
> +    XBZRLE_cache_unlock();
> +    /* XBZRLE overflow or normal page */
> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
> +    }
> +    return bytes_sent;
> +}
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx;
> +    if (!migrate_use_compress()) {
> +        return;
> +    }
> +
> +    for (idx = 0; idx < compress_thread_count; idx++) {
> +        while (comp_param[idx].state != COM_DONE) {
> +            g_usleep(0);
> +        }
> +        if (comp_param[idx].migbuf.buf_index > 0) {
> +            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                comp_param[idx].migbuf.buf_index);
> +            bytes_transferred += comp_param[idx].migbuf.buf_index;
> +            comp_param[idx].migbuf.buf_index = 0;
> +        }
> +    }
> +}
> +
> +static inline void set_common_compress_params(compress_param *param,
> +    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
> +    bool last_stage, int cont, uint8_t *p, bool bulk_stage)
> +{
> +    param->ret = ret;
> +    param->bytes_sent = bytes_sent;
> +    param->block = block;
> +    param->offset = offset;
> +    param->last_stage = last_stage;
> +    param->cont = cont;
> +    param->p = p;
> +    param->bulk_stage = bulk_stage;
> +}
> +
>  /*
>   * ram_find_and_save_block: Finds a page to send and sends it to f
>   *
> @@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      bool complete_round = false;
>      int bytes_sent = 0;
>      MemoryRegion *mr;
> +    int cont, idx, ret, len = -1;
> +    uint8_t *p;
>  
>      if (!block)
>          block = QTAILQ_FIRST(&ram_list.blocks);
> @@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>                  block = QTAILQ_FIRST(&ram_list.blocks);
>                  complete_round = true;
>                  ram_bulk_stage = false;
> +                if (migrate_use_xbzrle()) {
> +                    /* terminate the used threads at this point */
> +                    flush_compressed_data(f);
> +                    quit_thread = true;
> +                }
>              }
>          } else {
> -            bytes_sent = ram_save_page(f, block, offset, last_stage);
> -
> -            /* if page is unmodified, continue to the next */
> -            if (bytes_sent > 0) {
> -                last_sent_block = block;
> -                break;
> +            if (!migrate_use_compress()) {
> +                bytes_sent = ram_save_page(f, block, offset, last_stage);
> +                /* if page is unmodified, continue to the next */
> +                if (bytes_sent > 0) {
> +                    last_sent_block = block;
> +                    break;
> +                }
> +            } else {
> +                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +                p = memory_region_get_ram_ptr(block->mr) + offset;
> +                ret = ram_control_save_page(f, block->offset,
> +                           offset, TARGET_PAGE_SIZE, &len);
> +                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
> +                    if (cont == 0) {
> +                        flush_compressed_data(f);
> +                    }
> +                    set_common_compress_params(&comp_param[0],
> +                        ret, len, block, offset, last_stage, cont,
> +                        p, ram_bulk_stage);
> +                    bytes_sent = save_compress_ram_page(&comp_param[0]);
> +                    if (bytes_sent > 0) {
> +                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
> +                            comp_param[0].migbuf.buf_index);
> +                        comp_param[0].migbuf.buf_index = 0;
> +                        last_sent_block = block;
> +                        break;
> +                    }
> +                } else {
> +retry:
> +                    for (idx = 0; idx < compress_thread_count; idx++) {
> +                        if (comp_param[idx].state == COM_DONE) {
> +                            bytes_sent = comp_param[idx].migbuf.buf_index;
> +                            if (bytes_sent == 0) {
> +                                set_common_compress_params(&comp_param[idx],
> +                                    ret, len, block, offset, last_stage,
> +                                    cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                bytes_sent = 1;
> +                                bytes_transferred -= 1;
> +                                break;
> +                            } else if (bytes_sent > 0) {
> +                                qemu_put_buffer(f, comp_param[idx].migbuf.buf,
> +                                    comp_param[idx].migbuf.buf_index);
> +                                comp_param[idx].migbuf.buf_index = 0;
> +                                set_common_compress_params(&comp_param[idx],
> +                                   ret, len, block, offset, last_stage,
> +                                   cont, p, ram_bulk_stage);
> +                                comp_param[idx].state = COM_START;
> +                                break;
> +                            }
> +                        }
> +                    }
> +                    if (idx < compress_thread_count) {
> +                        last_sent_block = block;
> +                        break;
> +                    } else {
> +                        g_usleep(0);
> +                        goto retry;
> +                    }
> +                }
>              }
>          }
>      }
> @@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
>  
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
> @@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
>  
> @@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>      }
>  }
>  
> +QemuThread *decompress_threads;
> +
> +static void *do_data_decompress(void *opaque)
> +{
> +    decompress_param *param = opaque;
> +    while (incomming_migration_done == false) {
> +        if (param->state == COM_START) {
> +            uLong pagesize = TARGET_PAGE_SIZE;
> +            if (uncompress((Bytef *)param->des, &pagesize,
> +                    (const Bytef *)param->compbuf, param->len) != Z_OK) {
> +                error_report("Uncompress failed");
> +                break;
> +            }
> +            param->state = COM_DONE;
> +        } else {
> +            if (quit_thread) {
> +                break;
> +            }
> +            g_usleep(1);
> +        }
> +    }
> +    return NULL;
> +}
> +
> +void migrate_decompress_threads_create(int count)
> +{
> +    int i;
> +    decompress_thread_count = count;
> +    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
> +    decomp_param = g_malloc0(sizeof(decompress_param) * count);
> +    quit_thread = false;
> +    for (i = 0; i < count; i++) {
> +        qemu_thread_create(decompress_threads + i, "decompress",
> +            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +void migrate_decompress_threads_join(void)
> +{
> +    int i;
> +    for (i = 0; i < decompress_thread_count; i++) {
> +        qemu_thread_join(decompress_threads + i);
> +    }
> +    g_free(decompress_threads);
> +    g_free(decomp_param);
> +    decompress_threads = NULL;
> +    decomp_param = NULL;
> +}
> +
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
>      int flags = 0, ret = 0;
>      static uint64_t seq_iter;
> +    int len = 0;
> +    uint8_t compbuf[COMPRESS_BUF_SIZE];
>  
>      seq_iter++;
>  
> @@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
>              break;
>          case RAM_SAVE_FLAG_PAGE:
> +            quit_thread = true;
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
>                  error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> @@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> +        case RAM_SAVE_FLAG_COMPRESS_PAGE:
> +            host = host_from_stream_offset(f, addr, flags);
> +            if (!host) {
> +                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            len = qemu_get_be32(f);
> +            qemu_get_buffer(f, compbuf, len);
> +            int idx;
> +retry:
> +            for (idx = 0; idx < decompress_thread_count; idx++) {
> +                if (decomp_param[idx].state == COM_DONE)  {
> +                    memcpy(decomp_param[idx].compbuf, compbuf, len);
> +                    decomp_param[idx].des = host;
> +                    decomp_param[idx].len = len;
> +                    decomp_param[idx].state = COM_START;
> +                    break;
> +                }
> +            }
> +            if (idx == decompress_thread_count) {
> +                g_usleep(0);
> +                goto retry;
> +            }
> +            break;
>          case RAM_SAVE_FLAG_XBZRLE:
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
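On the receiving side, ram_load() reads len straight from the stream with qemu_get_be32(). It then passes len to qemu_get_buffer() on the fixed-size local compbuf[COMPRESS_BUF_SIZE] without checking it first. A corrupted or hostile stream could therefore overrun the stack buffer. The minimal sketch below adds the missing check, assuming the payload framing the patch uses (a be32 length followed by the compressed bytes). sketch_read_compressed() is a hypothetical helper that works on an in-memory buffer rather than a QEMUFile.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Copy one length-prefixed compressed payload into dst (capacity cap).
 * Returns bytes consumed, or 0 if the input is short or the length is
 * zero or larger than dst can hold.
 */
static size_t sketch_read_compressed(const uint8_t *in, size_t avail,
                                     uint8_t *dst, size_t cap,
                                     size_t *len_out)
{
    uint32_t len;

    assert(in != NULL && dst != NULL && len_out != NULL);
    if (avail < 4) {
        return 0;
    }
    len = ((uint32_t)in[0] << 24) | ((uint32_t)in[1] << 16) |
          ((uint32_t)in[2] << 8) | (uint32_t)in[3];
    if (len == 0 || len > cap || avail - 4 < len) {
        return 0;  /* reject rather than overrun dst */
    }
    memcpy(dst, in + 4, len);
    *len_out = len;
    return 4 + (size_t)len;
}
```

Similarly, the worker could check that uncompress() reports exactly TARGET_PAGE_SIZE output bytes before marking the slot done.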
> diff --git a/hmp-commands.hx b/hmp-commands.hx
> index e37bc8b..8b93bed 100644
> --- a/hmp-commands.hx
> +++ b/hmp-commands.hx
> @@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle migrations.
>  ETEXI
>  
>      {
> +        .name       = "migrate_set_compress_level",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set compress level for compress migrations; "
> +                      "the level is a number between 0 and 9, 0 stands for "
> +                      "no compression.\n"
> +                      "1 stands for the fastest compression while 9 stands for "
> +                      "the highest compression ratio.",
> +        .mhandler.cmd = hmp_migrate_set_compress_level,
> +    },
> +
> +STEXI
> +@item migrate_set_compress_level @var{value}
> +@findex migrate_set_compress_level
> +Set compress level to @var{value} for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_compress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set compress thread count for migrations. "
> +                      "A proper thread count will speed up the migration; "
> +                      "the count should be between 1 and the number of host CPUs",
> +        .mhandler.cmd = hmp_migrate_set_compress_threads,
> +    },
> +
> +STEXI
> +@item migrate_set_compress_threads @var{value}
> +@findex migrate_set_compress_threads
> +Set compress thread count to @var{value} for compress migrations.
> +ETEXI
> +
> +    {
> +        .name       = "migrate_set_decompress_threads",
> +        .args_type  = "value:i",
> +        .params     = "value",
> +        .help       = "set decompress thread count for migrations. "
> +                      "A proper thread count will speed up the migration; "
> +                      "the count should be between 1 and the number of host CPUs",
> +        .mhandler.cmd = hmp_migrate_set_decompress_threads,
> +    },
> +
> +STEXI
> +@item migrate_set_decompress_threads @var{value}
> +@findex migrate_set_decompress_threads
> +Set decompress thread count to @var{value} for compress migrations.
> +ETEXI
> +
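A note on the .help strings: C concatenates adjacent string literals with nothing in between. Each fragment of a multi-line help text therefore needs its own trailing (or leading) space, or words run together. Here is a two-line illustration:

```c
#include <assert.h>
#include <string.h>

/* Adjacent literals are joined at compile time with no separator. */
static const char joined_without_space[] = "for compress migrations," "the level is";
static const char joined_with_space[]    = "for compress migrations, " "the level is";
```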
> +    {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",
>          .params     = "value",
> @@ -1766,6 +1816,12 @@ show migration status
>  show current migration capabilities
>  @item info migrate_cache_size
>  show current migration XBZRLE cache size
> +@item info migrate_compress_level
> +show current migration compress level
> +@item info migrate_compress_threads
> +show current migration compress threads
> +@item info migrate_decompress_threads
> +show current migration decompress threads
>  @item info balloon
>  show balloon information
>  @item info qtree
> diff --git a/hmp.c b/hmp.c
> index 63d7686..b1936a3 100644
> --- a/hmp.c
> +++ b/hmp.c
> @@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
>                     qmp_query_migrate_cache_size(NULL) >> 10);
>  }
>  
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress level: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_level(NULL));
> +}
> +
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "compress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_compress_threads(NULL));
> +}
> +
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
> +                   qmp_query_migrate_decompress_threads(NULL));
> +}
> +
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict)
>  {
>      CpuInfoList *cpu_list, *cpu;
> @@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
>      }
>  }
>  
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_level(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_compress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
> +{
> +    int64_t value = qdict_get_int(qdict, "value");
> +    Error *err = NULL;
> +
> +    qmp_migrate_set_decompress_threads(value, &err);
> +    if (err) {
> +        monitor_printf(mon, "%s\n", error_get_pretty(err));
> +        error_free(err);
> +        return;
> +    }
> +}
> +
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
>  {
>      int64_t value = qdict_get_int(qdict, "value");
> diff --git a/hmp.h b/hmp.h
> index 4bb5dca..b348806 100644
> --- a/hmp.h
> +++ b/hmp.h
> @@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
>  void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_info_cpus(Monitor *mon, const QDict *qdict);
>  void hmp_info_block(Monitor *mon, const QDict *qdict);
>  void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
> @@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
>  void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
> +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
>  void hmp_set_password(Monitor *mon, const QDict *qdict);
>  void hmp_expire_password(Monitor *mon, const QDict *qdict);
>  void hmp_eject(Monitor *mon, const QDict *qdict);
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3cb5ba8..03c8e0d 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -49,6 +49,9 @@ struct MigrationState
>      QemuThread thread;
>      QEMUBH *cleanup_bh;
>      QEMUFile *file;
> +    QemuThread *compress_thread;
> +    int compress_thread_count;
> +    int compress_level;
>  
>      int state;
>      MigrationParams params;
> @@ -64,6 +67,7 @@ struct MigrationState
>      int64_t dirty_sync_count;
>  };
>  
> +extern bool incomming_migration_done;
>  void process_incoming_migration(QEMUFile *f);
>  
>  void qemu_start_incoming_migration(const char *uri, Error **errp);
> @@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *);
>  bool migration_has_failed(MigrationState *);
>  MigrationState *migrate_get_current(void);
>  
> +void migrate_compress_threads_create(MigrationState *s);
> +void migrate_compress_threads_join(MigrationState *s);
> +void migrate_decompress_threads_create(int count);
> +void migrate_decompress_threads_join(void);
>  uint64_t ram_bytes_remaining(void);
>  uint64_t ram_bytes_transferred(void);
>  uint64_t ram_bytes_total(void);
> @@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
>  
>  bool migrate_rdma_pin_all(void);
>  bool migrate_zero_blocks(void);
> -
> +bool migrate_use_compress(void);
>  bool migrate_auto_converge(void);
>  
>  int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
> @@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen);
>  
>  int migrate_use_xbzrle(void);
>  int64_t migrate_xbzrle_cache_size(void);
> +int migrate_compress_level(void);
> +int migrate_compress_threads(void);
>  
>  int64_t xbzrle_cache_resize(int64_t new_size);
>  
> diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
> index 401676b..431e6cc 100644
> --- a/include/migration/qemu-file.h
> +++ b/include/migration/qemu-file.h
> @@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input);
>  int qemu_get_fd(QEMUFile *f);
>  int qemu_fclose(QEMUFile *f);
>  int64_t qemu_ftell(QEMUFile *f);
> +uint64_t qemu_add_compress(QEMUFile *f,  const uint8_t *p, int size);
>  void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
>  void qemu_put_byte(QEMUFile *f, int v);
>  /*
> diff --git a/migration.c b/migration.c
> index c49a05a..716de97 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -46,6 +46,12 @@ enum {
>  /* Migration XBZRLE default cache size */
>  #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
>  
> +/* Migration compress default thread count */
> +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
> +/* 0: no compression, 1: best speed, ... 9: best compression ratio */
> +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
> +
>  static NotifierList migration_state_notifiers =
>      NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
>  
> @@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
>          .bandwidth_limit = MAX_THROTTLE,
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
> +        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
>      };
>  
>      return &current_migration;
> @@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
>          error_report("load of migration failed: %s", strerror(-ret));
>          exit(EXIT_FAILURE);
>      }
> +    incomming_migration_done = true;
>      qemu_announce_self();
>  
>      /* Make sure all file formats flush their mutable metadata */
> @@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
>      } else {
>          runstate_set(RUN_STATE_PAUSED);
>      }
> +    migrate_decompress_threads_join();
>  }
>  
> +static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
>  void process_incoming_migration(QEMUFile *f)
>  {
> +    incomming_migration_done = false;
> +    migrate_decompress_threads_create(uncompress_thread_count);
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
>      int fd = qemu_get_fd(f);
>  
> @@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
>          qemu_thread_join(&s->thread);
>          qemu_mutex_lock_iothread();
>  
> +        migrate_compress_threads_join(s);
>          qemu_fclose(s->file);
>          s->file = NULL;
>      }
> @@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>      int64_t bandwidth_limit = s->bandwidth_limit;
>      bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
>      int64_t xbzrle_cache_size = s->xbzrle_cache_size;
> +    int compress_level = s->compress_level;
> +    int compress_thread_count = s->compress_thread_count;
>  
>      memcpy(enabled_capabilities, s->enabled_capabilities,
>             sizeof(enabled_capabilities));
> @@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
>             sizeof(enabled_capabilities));
>      s->xbzrle_cache_size = xbzrle_cache_size;
>  
> +    s->compress_level = compress_level;
> +    s->compress_thread_count = compress_thread_count;
>      s->bandwidth_limit = bandwidth_limit;
>      s->state = MIG_STATE_SETUP;
>      trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
>      return migrate_xbzrle_cache_size();
>  }
>  
> +void qmp_migrate_set_compress_level(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 9 || value < 0) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
> +                  "an integer between 0 and 9");
> +        return;
> +    }
> +
> +    s->compress_level = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_level(Error **errp)
> +{
> +    return migrate_compress_level();
> +}
> +
> +void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
> +                  "an integer between 1 and 255");
> +        return;
> +    }
> +
> +    s->compress_thread_count = value;
> +}
> +
> +void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
> +{
> +
> +    if (value > 255 || value < 1) {
> +        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "decompress thread count",
> +                  "an integer between 1 and 255");
> +        return;
> +    }
> +
> +    uncompress_thread_count = value;
> +}
> +
> +int64_t qmp_query_migrate_compress_threads(Error **errp)
> +{
> +    return migrate_compress_threads();
> +}
> +
> +int64_t qmp_query_migrate_decompress_threads(Error **errp)
> +{
> +    return uncompress_thread_count;
> +}
> +
>  void qmp_migrate_set_speed(int64_t value, Error **errp)
>  {
>      MigrationState *s;
> @@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
>      return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
>  }
>  
> +bool migrate_use_compress(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
> +}
> +
> +int migrate_compress_level(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_level;
> +}
> +
> +int migrate_compress_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->compress_thread_count;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>  
>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
> +    migrate_compress_threads_create(s);


Don't always create compress_threads.
It may be better to do:

if (!migrate_use_xbzrle()) {
    migrate_compress_threads_create(s);
}

BTW, this patch is too big to review. Splitting it into several patches would be welcome.
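A minimal, self-contained C sketch of the gating suggested here, under one assumption: the decision keys off the 'compress' capability that this patch adds rather than off xbzrle, since the author replies that the two features can be combined. `MigCaps` and `compress_threads_to_start()` are hypothetical stand-ins for `MigrationState` and for a check placed in front of `migrate_compress_threads_create()`; they are not QEMU code.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the MigrationState fields that matter here. */
typedef struct {
    bool use_compress;         /* 'compress' capability enabled */
    bool use_xbzrle;           /* 'xbzrle' capability enabled */
    int compress_thread_count; /* set via migrate-set-compress-threads */
} MigCaps;

/*
 * How many compression worker threads migrate_fd_connect() should start.
 * Only the 'compress' capability is consulted; use_xbzrle is deliberately
 * ignored because the patch author argues the two can be combined.
 */
static int compress_threads_to_start(const MigCaps *caps)
{
    if (!caps->use_compress) {
        return 0; /* feature off: create no threads at all */
    }
    /* The v2 setter already restricts the count to 1..255. */
    assert(caps->compress_thread_count >= 1 &&
           caps->compress_thread_count <= 255);
    return caps->compress_thread_count;
}
```

Applied to the patch, this would presumably mean calling `migrate_compress_threads_create(s)` only when `migrate_use_compress()` returns true, and guarding `migrate_compress_threads_join(s)` the same way so that threads that were never started are never joined.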

>  }
> diff --git a/monitor.c b/monitor.c
> index 905d8cf..365547e 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
>          .mhandler.cmd = hmp_info_migrate_cache_size,
>      },
>      {
> +        .name       = "migrate_compress_level",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress level",
> +        .mhandler.cmd = hmp_info_migrate_compress_level,
> +    },
> +    {
> +        .name       = "migrate_compress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration compress thread count",
> +        .mhandler.cmd = hmp_info_migrate_compress_threads,
> +    },
> +    {
> +        .name       = "migrate_decompress_threads",
> +        .args_type  = "",
> +        .params     = "",
> +        .help       = "show current migration decompress thread count",
> +        .mhandler.cmd = hmp_info_migrate_decompress_threads,
> +    },
> +    {
>          .name       = "balloon",
>          .args_type  = "",
>          .params     = "",
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 24379ab..71a9e0f 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -491,13 +491,17 @@
>  #          to enable the capability on the source VM. The feature is disabled by
>  #          default. (since 1.6)
>  #
> +# @compress: Use multiple compression threads to accelerate live migration.
> +#          This feature can help reduce migration traffic by sending
> +#          compressed pages. The feature is disabled by default. (since 2.3)
> +#
>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>  #          to speed up convergence of RAM migration. (since 1.6)
>  #
>  # Since: 1.2
>  ##
>  { 'enum': 'MigrationCapability',
> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>  
>  ##
>  # @MigrationCapabilityStatus
> @@ -1382,6 +1386,88 @@
>  { 'command': 'query-migrate-cache-size', 'returns': 'int' }
>  
>  ##
> +# @migrate-set-compress-level
> +#
> +# Set the compress level
> +#
> +# @value: compress level (int)
> +#
> +# The compress level is an integer between 0 and 9.
> +# The compress level can be modified before and during an ongoing migration.
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-level
> +#
> +# Query the compress level
> +#
> +# Returns: the compress level (int)
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
> +
> +##
> +# @migrate-set-compress-threads
> +#
> +# Set the compress thread count
> +#
> +# @value: compress thread count (int)
> +#
> +# The compress thread count is an integer between 1 and 255.
> +# The compress thread count can be modified only before migration.
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-compress-threads
> +#
> +# Query the compress thread count
> +#
> +# Returns: the compress thread count (int)
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
> +
> +##
> +# @migrate-set-decompress-threads
> +#
> +# Set the decompress thread count
> +#
> +# @value: decompress thread count (int)
> +#
> +# The decompress thread count is an integer between 1 and 255.
> +# The decompress thread count can be modified only before migration.
> +#
> +# Returns: nothing on success
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
> +
> +##
> +# @query-migrate-decompress-threads
> +#
> +# Query the decompress thread count
> +#
> +# Returns: the decompress thread count (int)
> +#
> +# Since: 2.3
> +##
> +{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
> +
> +##
>  # @ObjectPropertyInfo:
>  #
>  # @name: the name of the property
> diff --git a/qmp-commands.hx b/qmp-commands.hx
> index 1abd619..b60fdab 100644
> --- a/qmp-commands.hx
> +++ b/qmp-commands.hx
> @@ -705,7 +705,138 @@ Example:
>  <- { "return": 67108864 }
>  
>  EQMP
> +    {
> +        .name       = "migrate-set-compress-level",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
> +    },
> +
> +SQMP
> +migrate-set-compress-level
> +--------------------------
> +
> +Set the compress level to be used by compressed migration. The compress level
> +is an integer between 0 and 9.
> +
> +Arguments:
> +
> +- "value": compress level (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-level", "arguments": { "value": 1 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-level",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
> +    },
> +
> +SQMP
> +query-migrate-compress-level
> +----------------------------
> +
> +Show the compress level to be used by compressed migration.
> +
> +Returns the compress level as a json-int.
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-level" }
> +<- { "return": 1 }
> +
> +EQMP
> +    {
> +        .name       = "migrate-set-compress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
> +    },
> +
> +SQMP
> +migrate-set-compress-threads
> +----------------------------
> +
> +Set the compress thread count to be used by compressed migration. The compress
> +thread count is an integer between 1 and 255.
> +
> +Arguments:
> +
> +- "value": compress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 8 } }
> +<- { "return": {} }
> +
> +EQMP
> +    {
> +        .name       = "query-migrate-compress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
> +    },
> +
> +SQMP
> +query-migrate-compress-threads
> +------------------------------
> +
> +Show the compress thread count to be used by compressed migration.
> +
> +Returns the compress thread count as a json-int.
> +
> +Example:
> +
> +-> { "execute": "query-migrate-compress-threads" }
> +<- { "return": 8 }
> +
> +EQMP
> +    {
> +        .name       = "migrate-set-decompress-threads",
> +        .args_type  = "value:i",
> +        .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
> +    },
> +
> +SQMP
> +migrate-set-decompress-threads
> +------------------------------
> +
> +Set the decompress thread count to be used by compressed migration. The
> +decompress thread count is an integer between 1 and 255.
> +
> +Arguments:
> +
> +- "value": decompress threads (json-int)
> +
> +Example:
> +
> +-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": 2 } }
> +<- { "return": {} }
>  
> +EQMP
> +    {
> +        .name       = "query-migrate-decompress-threads",
> +        .args_type  = "",
> +        .mhandler.cmd_new = qmp_marshal_input_query_migrate_decompress_threads,
> +    },
> +
> +SQMP
> +query-migrate-decompress-threads
> +--------------------------------
> +
> +Show the decompress thread count to be used by compressed migration.
> +
> +Returns the decompress thread count as a json-int.
> +
> +Example:
> +
> +-> { "execute": "query-migrate-decompress-threads" }
> +<- { "return": 2 }
> +
> +EQMP
>      {
>          .name       = "migrate_set_speed",
>          .args_type  = "value:o",

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-21  7:29   ` ChenLiang
@ 2014-11-21  7:38     ` Li, Liang Z
  2014-11-21  8:17       ` ChenLiang
  0 siblings, 1 reply; 21+ messages in thread
From: Li, Liang Z @ 2014-11-21  7:38 UTC (permalink / raw)
  To: ChenLiang
  Cc: Zhang, Yang Z, lcapitulino@redhat.com, qemu-devel@nongnu.org,
	armbru@redhat.com

> > +int migrate_compress_threads(void)
> > +{
> > +    MigrationState *s;
> > +
> > +    s = migrate_get_current();
> > +
> > +    return s->compress_thread_count;
> > +}
> > +
> >  int migrate_use_xbzrle(void)
> >  {
> >      MigrationState *s;
> > @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
> >  
> >      qemu_thread_create(&s->thread, "migration", migration_thread, s,
> >                         QEMU_THREAD_JOINABLE);
> > +    migrate_compress_threads_create(s);


> Don't always create compress_threads.
> It may be better to do:

> if (!migrate_use_xbzrle()) {
>     migrate_compress_threads_create(s);
> }

Thanks for your comments. In fact, multi-threaded compression can work together with xbzrle, which helps accelerate live migration.

> BTW, this patch is too big to review. Splitting it into several patches would be welcome.

I am doing it.

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-21  7:38     ` Li, Liang Z
@ 2014-11-21  8:17       ` ChenLiang
  2014-11-21  8:35         ` Li, Liang Z
                           ` (2 more replies)
  0 siblings, 3 replies; 21+ messages in thread
From: ChenLiang @ 2014-11-21  8:17 UTC (permalink / raw)
  To: Li, Liang Z
  Cc: Zhang, Yang Z, lcapitulino@redhat.com, qemu-devel@nongnu.org,
	armbru@redhat.com

On 2014/11/21 15:38, Li, Liang Z wrote:

>>> +int migrate_compress_threads(void)
>>> +{
>>> +    MigrationState *s;
>>> +
>>> +    s = migrate_get_current();
>>> +
>>> +    return s->compress_thread_count;
>>> +}
>>> +
>>>  int migrate_use_xbzrle(void)
>>>  {
>>>      MigrationState *s;
>>> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>>>  
>>>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>>>                         QEMU_THREAD_JOINABLE);
>>> +    migrate_compress_threads_create(s);
> 
> 
>> Don't always create compress_threads.
>> It may be better to do:
> 
>> if (!migrate_use_xbzrle()) {
>>     migrate_compress_threads_create(s);
>> }
> 
> Thanks for your comments. In fact, multi-threaded compression can work together with xbzrle, which helps accelerate live migration.


Hmm, multi-threaded compression can't work together with xbzrle. xbzrle needs to
guarantee that the cache at src is the same as at dest, but I don't see that below:

+    /* XBZRLE overflow or normal page */
+    if (bytes_sent == -1) {
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
+            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
+        blen = migrate_qemu_add_compress(&param->migbuf, p,
+            TARGET_PAGE_SIZE, migrate_compress_level());
+        bytes_sent += blen;
+        atomic_inc(&acct_info.norm_pages);

The code doesn't update the xbzrle cache at src.

> 
>> BTW, this patch is too big to review. Splitting it into several patches would be welcome.
> 
> I am doing it.

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-21  8:17       ` ChenLiang
@ 2014-11-21  8:35         ` Li, Liang Z
  2014-11-21  8:38         ` ChenLiang
  2014-11-21  8:39         ` ChenLiang
  2 siblings, 0 replies; 21+ messages in thread
From: Li, Liang Z @ 2014-11-21  8:35 UTC (permalink / raw)
  To: ChenLiang
  Cc: Zhang, Yang Z, lcapitulino@redhat.com, qemu-devel@nongnu.org,
	armbru@redhat.com

> Hmm, multi-threaded compression can't work together with xbzrle. xbzrle needs to guarantee that the cache at src is the same as at dest, but I don't see that below:
>
> +    /* XBZRLE overflow or normal page */
> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
>
> The code doesn't update the xbzrle cache at src.

It's updated here:

 +    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
 +        atomic_inc(&acct_info.dup_pages);
 +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, cont,
 +                             RAM_SAVE_FLAG_COMPRESS);
 +        migrate_put_byte(&param->migbuf, 0);
 +        bytes_sent++;
 +        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
 +         * page would be stale
 +         */
 +        xbzrle_cache_zero_page(current_addr);
 +    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
 +        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
 +                              offset, cont, last_stage, true);
 +    }
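The invariant debated in this exchange can be made concrete with a toy model, written as plain C rather than QEMU code: the source keeps an xbzrle cache, the destination applies XOR deltas to its own copy of guest RAM, and a separate full-page path (standing in for a compression thread) either refreshes the cached copy or leaves it stale. The page size, the plain XOR delta (real XBZRLE additionally run-length encodes the zero runs of the XOR) and every name below are assumptions for illustration. In the patch, the refresh points Li cites are `xbzrle_cache_zero_page()` and `save_xbzrle_page()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define TOY_PAGE_SIZE 8 /* toy page size, not TARGET_PAGE_SIZE */
#define TOY_NPAGES    2

typedef enum { SEND_FULL, SEND_DELTA } SendKind;

/* Source-side xbzrle cache: the contents the source believes dest holds. */
typedef struct {
    unsigned char data[TOY_NPAGES][TOY_PAGE_SIZE];
    bool cached[TOY_NPAGES];
} SrcCache;

/* xbzrle path: XOR delta on a hit; on a miss, insert the page, send it full. */
static SendKind send_xbzrle(SrcCache *c, int idx,
                            const unsigned char page[TOY_PAGE_SIZE],
                            unsigned char rec[TOY_PAGE_SIZE])
{
    assert(idx >= 0 && idx < TOY_NPAGES);
    if (c->cached[idx]) {
        for (int i = 0; i < TOY_PAGE_SIZE; i++) {
            rec[i] = page[i] ^ c->data[idx][i];
        }
        memcpy(c->data[idx], page, TOY_PAGE_SIZE);
        return SEND_DELTA;
    }
    memcpy(c->data[idx], page, TOY_PAGE_SIZE);
    c->cached[idx] = true;
    memcpy(rec, page, TOY_PAGE_SIZE);
    return SEND_FULL;
}

/* Full-page path; refreshing a cached copy keeps the source cache == dest. */
static SendKind send_full(SrcCache *c, int idx,
                          const unsigned char page[TOY_PAGE_SIZE],
                          unsigned char rec[TOY_PAGE_SIZE], bool refresh_cache)
{
    assert(idx >= 0 && idx < TOY_NPAGES);
    if (refresh_cache && c->cached[idx]) {
        memcpy(c->data[idx], page, TOY_PAGE_SIZE);
    }
    memcpy(rec, page, TOY_PAGE_SIZE);
    return SEND_FULL;
}

/* Destination: a delta is XORed into its own copy of the page. */
static void recv_page(unsigned char ram[TOY_NPAGES][TOY_PAGE_SIZE], int idx,
                      SendKind kind, const unsigned char rec[TOY_PAGE_SIZE])
{
    if (kind == SEND_FULL) {
        memcpy(ram[idx], rec, TOY_PAGE_SIZE);
    } else {
        for (int i = 0; i < TOY_PAGE_SIZE; i++) {
            ram[idx][i] ^= rec[i];
        }
    }
}
```

Sending a page through `send_xbzrle()`, then through `send_full()` with `refresh_cache` set to false, then through `send_xbzrle()` again leaves the destination page different from the source page; with `refresh_cache` set to true the two stay identical.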

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-21  8:17       ` ChenLiang
  2014-11-21  8:35         ` Li, Liang Z
@ 2014-11-21  8:38         ` ChenLiang
  2014-11-21  8:39         ` ChenLiang
  2 siblings, 0 replies; 21+ messages in thread
From: ChenLiang @ 2014-11-21  8:38 UTC (permalink / raw)
  To: Li, Liang Z
  Cc: Zhang, Yang Z, armbru@redhat.com, qemu-devel@nongnu.org,
	lcapitulino@redhat.com

On 2014/11/21 16:17, ChenLiang wrote:

> On 2014/11/21 15:38, Li, Liang Z wrote:
> 
>>>> +int migrate_compress_threads(void)
>>>> +{
>>>> +    MigrationState *s;
>>>> +
>>>> +    s = migrate_get_current();
>>>> +
>>>> +    return s->compress_thread_count;
>>>> +}
>>>> +
>>>>  int migrate_use_xbzrle(void)
>>>>  {
>>>>      MigrationState *s;
>>>> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>>>>  
>>>>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>>>>                         QEMU_THREAD_JOINABLE);
>>>> +    migrate_compress_threads_create(s);
>>
>>
>>> Don't always create compress_threads.
>>> It may be better to do:
>>
>>> if (!migrate_use_xbzrle()) {
>>>     migrate_compress_threads_create(s);
>>> }
>>
>> Thanks for your comments. In fact, multi-threaded compression can work together with xbzrle, which helps accelerate live migration.
> 
> 
> Hmm, multi-threaded compression can't work together with xbzrle. xbzrle needs to
> guarantee that the cache at src is the same as at dest, but I don't see that below:
> 
> +    /* XBZRLE overflow or normal page */
> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
> 
> The code doesn't update the xbzrle cache at src.
> 
>>
>>> BTW, this patch is too big to review. Splitting it into several patches would be welcome.
>>
>> I am doing it.

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-21  8:17       ` ChenLiang
  2014-11-21  8:35         ` Li, Liang Z
  2014-11-21  8:38         ` ChenLiang
@ 2014-11-21  8:39         ` ChenLiang
  2 siblings, 0 replies; 21+ messages in thread
From: ChenLiang @ 2014-11-21  8:39 UTC (permalink / raw)
  To: Li, Liang Z
  Cc: Zhang, Yang Z, armbru@redhat.com, qemu-devel@nongnu.org,
	lcapitulino@redhat.com

On 2014/11/21 16:17, ChenLiang wrote:

> On 2014/11/21 15:38, Li, Liang Z wrote:
> 
>>>> +int migrate_compress_threads(void)
>>>> +{
>>>> +    MigrationState *s;
>>>> +
>>>> +    s = migrate_get_current();
>>>> +
>>>> +    return s->compress_thread_count;
>>>> +}
>>>> +
>>>>  int migrate_use_xbzrle(void)
>>>>  {
>>>>      MigrationState *s;
>>>> @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
>>>>  
>>>>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>>>>                         QEMU_THREAD_JOINABLE);
>>>> +    migrate_compress_threads_create(s);
>>
>>
>>> Don't always create compress_threads.
>>> It may be better to do:
>>
>>> if (!migrate_use_xbzrle()) {
>>>     migrate_compress_threads_create(s);
>>> }
>>
>> Thanks for your comments. In fact, multi-threaded compression can work together with xbzrle, which helps accelerate live migration.
> 
> 
> Hmm, multi-threaded compression can't work together with xbzrle. xbzrle needs to
> guarantee that the cache at src is the same as at dest, but I don't see that below:
> 
> +    /* XBZRLE overflow or normal page */
> +    if (bytes_sent == -1) {
> +        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
> +            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
> +        blen = migrate_qemu_add_compress(&param->migbuf, p,
> +            TARGET_PAGE_SIZE, migrate_compress_level());
> +        bytes_sent += blen;
> +        atomic_inc(&acct_info.norm_pages);
> 
> The code doesn't update the xbzrle cache at src.
> 


Sorry, I made a mistake. :)

>>
>>> BTW, this patch is too big to review. Splitting it into several patches would be welcome.
>>
>> I am doing it.

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-06 12:57   ` Eric Blake
  2014-11-21  6:18     ` Zhang, Yang Z
@ 2014-11-24  2:25     ` Li, Liang Z
  2014-11-24 17:16       ` Eric Blake
  2014-12-08  6:34     ` Li, Liang Z
  2 siblings, 1 reply; 21+ messages in thread
From: Li, Liang Z @ 2014-11-24  2:25 UTC (permalink / raw)
  To: Eric Blake, qemu-devel@nongnu.org
  Cc: Zhang, Yang Z, armbru@redhat.com, lcapitulino@redhat.com

> >  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
> >  #          to speed up convergence of RAM migration. (since 1.6)
> >  #
> >  # Since: 1.2
> >  ##
> >  { 'enum': 'MigrationCapability',
> > -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
> > +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
> >  

> I'll repeat what I said on v1 (but this time, with some links to back it up :)

> We really need to avoid a proliferation of new commands, two per tunable does not scale well.  I think now is the time to implement my earlier suggestion at making MigrationCapability become THE resource for tunables:

> https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html

Hi, Eric

       I have read your proposal, and I just want to verify that I understood it correctly. Take the 'compress-level' parameter for example: according to your suggestion, should I implement a command 'set-migrate-capability compress-level 1', or 'set-migrate-parameter compress-level 1'? If it's the former, how do we keep HMP backward compatibility? As you know, the current HMP framework checks the parameter type, and an 'int' is processed differently from a 'bool'. If it's the latter, it seems a 'query-migrate-parameter' command should be provided for consistency, not query-migrate-capability.

Liang


^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-24  2:25     ` Li, Liang Z
@ 2014-11-24 17:16       ` Eric Blake
  0 siblings, 0 replies; 21+ messages in thread
From: Eric Blake @ 2014-11-24 17:16 UTC (permalink / raw)
  To: Li, Liang Z, qemu-devel@nongnu.org
  Cc: Zhang, Yang Z, armbru@redhat.com, lcapitulino@redhat.com

[-- Attachment #1: Type: text/plain, Size: 2275 bytes --]

On 11/23/2014 07:25 PM, Li, Liang Z wrote:
>>>  # @auto-converge: If enabled, QEMU will automatically throttle down the guest
>>>  #          to speed up convergence of RAM migration. (since 1.6)
>>>  #
>>>  # Since: 1.2
>>>  ##
>>>  { 'enum': 'MigrationCapability',
>>> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
>>> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>>>  
> 
>> I'll repeat what I said on v1 (but this time, with some links to back it up :)
> 
>> We really need to avoid a proliferation of new commands, two per tunable does not scale well.  I think now is the time to implement my earlier suggestion at making MigrationCapability become THE resource for tunables:

[please configure your mailer to wrap long lines]

> 
>> https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html
> 
> Hi, Eric
> 
>        I have read your proposal, and I just want to verify that I understood it correctly. Take the 'compress-level' parameter for example: according to your suggestion, should I implement a command 'set-migrate-capability compress-level 1', or 'set-migrate-parameter compress-level 1'? If it's the former, how do we keep HMP backward compatibility? As you know, the current HMP framework checks the parameter type, and an 'int' is processed differently from a 'bool'. If it's the latter, it seems a 'query-migrate-parameter' command should be provided for consistency, not query-migrate-capability.

HMP back-compat is NOT a problem we need to worry about; it's okay to
break the semantics if something else is easier to represent; it is only
QMP where we have to remain backwards compatible.  We already have
set-migrate-capability, so that seems like the command to extend,
instead of adding a new one.  On the other hand, if it is easier for you
to add a new HMP command that maps correctly to the underlying QMP
command, then that is fine, too.  The point of my proposal is that the
QMP command can use a union to provide the correct typing as needed,
without worrying about what HMP has to do to match that.
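The point about letting one QMP command use a union for correct typing can be pictured as a C tagged union: every tunable lives in one list with a type tag, and the single setter rejects values whose tag does not match. This sketch only illustrates the idea and is not what the QAPI generator emits; `TunableValue`, `tunables[]` and `tunable_type_ok()` are hypothetical, and the table entries are examples.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

typedef enum { TUNABLE_BOOL, TUNABLE_INT } TunableKind;

/* A value as it would arrive in one generic "set tunable" request. */
typedef struct {
    const char *name;
    TunableKind kind; /* tag selecting the active union member */
    union {
        bool b;      /* capabilities such as 'compress' */
        long long i; /* integer tunables such as 'compress-level' */
    } u;
} TunableValue;

typedef struct {
    const char *name;
    TunableKind kind;
} TunableDesc;

/* Example table mixing boolean capabilities and integer parameters. */
static const TunableDesc tunables[] = {
    { "xbzrle",         TUNABLE_BOOL },
    { "compress",       TUNABLE_BOOL },
    { "compress-level", TUNABLE_INT  },
};

/* True if 'v' names a known tunable and carries the type that tunable expects. */
static bool tunable_type_ok(const TunableValue *v)
{
    assert(v != NULL && v->name != NULL);
    for (size_t n = 0; n < sizeof(tunables) / sizeof(tunables[0]); n++) {
        if (strcmp(tunables[n].name, v->name) == 0) {
            return tunables[n].kind == v->kind;
        }
    }
    return false; /* unknown tunable */
}
```

The HMP side can then translate its own argument syntax into such a value however is convenient, which matches the point that only the QMP shape needs to stay stable.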

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 539 bytes --]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-11-06 12:57   ` Eric Blake
  2014-11-21  6:18     ` Zhang, Yang Z
  2014-11-24  2:25     ` Li, Liang Z
@ 2014-12-08  6:34     ` Li, Liang Z
  2014-12-10  8:23       ` Li, Liang Z
  2 siblings, 1 reply; 21+ messages in thread
From: Li, Liang Z @ 2014-12-08  6:34 UTC (permalink / raw)
  To: Eric Blake, qemu-devel@nongnu.org
  Cc: Zhang, Yang Z, armbru@redhat.com, lcapitulino@redhat.com

>>  #
>>  # Since: 1.2
>>  ##
>>  { 'enum': 'MigrationCapability',
>> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
>> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>> 
>
>I'll repeat what I said on v1 (but this time, with some links to back it up :)
>
>We really need to avoid a proliferation of new commands, two per tunable does not scale well.  I think now is the time to implement my earlier suggestion at making MigrationCapability become THE resource for tunables:
>
>https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html

Hi Eric,

     I tried to follow your suggestion and implement a backward-compatible way to reuse 'migrate-set-capabilities' and 'query-migrate-capabilities', but I found that I would have to change a lot of the current code to make it work, and I don't know how to deal with the HMP interface. So I added 'migrate-set-parameter' and 'query-migrate-parameter' interfaces to reduce the number of migration-tunable commands. They can handle all 'int'-type parameters; 'compress-threads', 'decompress-threads', and 'compress-level' can now all be set/queried with these two interfaces.

{ 'enum': 'MigrationParameter',
  'data': ['compress-level', 'compress-threads', 'decompress-threads'] }

{ 'type': 'MigrationParameterStatus',
  'data': { 'parameter' : 'MigrationParameter', 'value' : 'int' } }

   I am not sure if it's a good solution, but it's much simpler, and it minimizes changes to the current code. Is that OK?
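A minimal sketch of how the proposed single set/query pair could be backed in C: one table indexed by the `MigrationParameter` enum holds each parameter's range and whether it may change while a migration is running, so the range check is written once instead of in every setter. The ranges (0 to 9, 1 to 255, 1 to 255) and the before/during-migration rules come from the v2 schema documentation; `ParamSlot`, `set_parameter()`, `query_parameter()`, the `migration_active` flag and the default values are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Mirrors the 'MigrationParameter' enum proposed above. */
typedef enum {
    MIGRATION_PARAMETER_COMPRESS_LEVEL,
    MIGRATION_PARAMETER_COMPRESS_THREADS,
    MIGRATION_PARAMETER_DECOMPRESS_THREADS,
    MIGRATION_PARAMETER_MAX
} MigrationParameter;

typedef struct {
    int64_t min;
    int64_t max;
    bool runtime_ok; /* may be changed while a migration is running */
    int64_t value;   /* current value */
} ParamSlot;

/* Ranges and runtime rules follow the v2 schema docs; defaults are placeholders. */
static ParamSlot params[MIGRATION_PARAMETER_MAX] = {
    [MIGRATION_PARAMETER_COMPRESS_LEVEL]     = { 0, 9,   true,  1 },
    [MIGRATION_PARAMETER_COMPRESS_THREADS]   = { 1, 255, false, 8 },
    [MIGRATION_PARAMETER_DECOMPRESS_THREADS] = { 1, 255, false, 2 },
};

/* Backs one 'migrate-set-parameter'; on error returns false, keeps old value. */
static bool set_parameter(MigrationParameter p, int64_t value,
                          bool migration_active)
{
    if ((unsigned)p >= (unsigned)MIGRATION_PARAMETER_MAX) {
        return false; /* unknown parameter */
    }
    ParamSlot *s = &params[p];
    if (value < s->min || value > s->max) {
        return false; /* out of range */
    }
    if (migration_active && !s->runtime_ok) {
        return false; /* only settable before migration starts */
    }
    s->value = value;
    return true;
}

/* Backs one 'query-migrate-parameter'. */
static int64_t query_parameter(MigrationParameter p)
{
    assert((unsigned)p < (unsigned)MIGRATION_PARAMETER_MAX);
    return params[p].value;
}
```

Adding another int tunable then means one enum value and one table row rather than two new commands.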

Liang 

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [v2 2/2] migration: Implement multiple compression threads
  2014-12-08  6:34     ` Li, Liang Z
@ 2014-12-10  8:23       ` Li, Liang Z
  0 siblings, 0 replies; 21+ messages in thread
From: Li, Liang Z @ 2014-12-10  8:23 UTC (permalink / raw)
  To: 'Eric Blake', 'qemu-devel@nongnu.org'
  Cc: Zhang, Yang Z, 'armbru@redhat.com',
	'lcapitulino@redhat.com'

>>>  ##
>>>  { 'enum': 'MigrationCapability',
>>> -  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
>>> +  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress'] }
>>> 
>>
>>I'll repeat what I said on v1 (but this time, with some links to back 
>>it up :)
>>
>>We really need to avoid a proliferation of new commands, two per tunable does not scale well.  I think now is the time to implement my earlier suggestion at making MigrationCapability become THE resource for tunables:
>>
>>https://lists.gnu.org/archive/html/qemu-devel/2014-03/msg02274.html
>
>Hi Eric,
>
>     I tried to follow your suggestion and implement a backward-compatible way to reuse 'migrate-set-capabilities' and 'query-migrate-capabilities', but I found that I would have to change a lot of the current code to make it work, and I don't know how to deal with the HMP interface. So I added 'migrate-set-parameter' and 'query-migrate-parameter' interfaces to reduce the number of migration-tunable commands. They can handle all 'int'-type parameters; 'compress-threads', 'decompress-threads', and 'compress-level' can now all be set/queried with these two interfaces.
>
>{ 'enum': 'MigrationParameter',
>  'data': ['compress-level', 'compress-threads', 'decompress-threads'] }
>
>{ 'type': 'MigrationParameterStatus',
>  'data': { 'parameter' : 'MigrationParameter', 'value' : 'int' } }
>
>   I am not sure if it's a good solution, but it's much simpler, and it minimizes changes to the current code. Is that OK?

Hi Eric,

    What do you think about this solution?

Liang 

^ permalink raw reply	[flat|nested] 21+ messages in thread

end of thread, other threads:[~2014-12-10  8:23 UTC | newest]

Thread overview: 21+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2014-11-06 11:08 [Qemu-devel] [PATCH v2 0/2] migration: Add a new feature to do live migration Li Liang
2014-11-06 11:08 ` [Qemu-devel] [v2 1/2] docs: Add a doc about multiple compression threads Li Liang
2014-11-06 11:25   ` Eric Blake
2014-11-06 13:24   ` Dr. David Alan Gilbert
2014-11-06 13:46     ` Eric Blake
2014-11-07  2:28     ` Li, Liang Z
2014-11-06 11:08 ` [Qemu-devel] [v2 2/2] migration: Implement " Li Liang
2014-11-06 12:57   ` Eric Blake
2014-11-21  6:18     ` Zhang, Yang Z
2014-11-24  2:25     ` Li, Liang Z
2014-11-24 17:16       ` Eric Blake
2014-12-08  6:34     ` Li, Liang Z
2014-12-10  8:23       ` Li, Liang Z
2014-11-06 15:41   ` Dr. David Alan Gilbert
2014-11-21  7:01     ` Li, Liang Z
2014-11-21  7:29   ` ChenLiang
2014-11-21  7:38     ` Li, Liang Z
2014-11-21  8:17       ` ChenLiang
2014-11-21  8:35         ` Li, Liang Z
2014-11-21  8:38         ` ChenLiang
2014-11-21  8:39         ` ChenLiang
