qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: Liang Li <liang.z.li@intel.com>
To: qemu-devel@nongnu.org
Cc: quintela@redhat.com, Liang Li <liang.z.li@intel.com>,
	armbru@redhat.com, lcapitulino@redhat.com,
	Yang Zhang <yang.z.zhang@intel.com>,
	amit.shah@redhat.com, dgilbert@redhat.com
Subject: [Qemu-devel] [v5 08/12] migration: Add the core code of multi-thread compression
Date: Wed, 11 Feb 2015 11:06:22 +0800	[thread overview]
Message-ID: <1423623986-590-9-git-send-email-liang.z.li@intel.com> (raw)
In-Reply-To: <1423623986-590-1-git-send-email-liang.z.li@intel.com>

Implement the core logic of the multiple thread compression. At this
point, multiple thread compression can't co-work with xbzrle yet.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 193 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 185 insertions(+), 8 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index fe062db..17b7f15 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;
 static QemuCond *comp_done_cond;
 /* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
+
+/* one_byte_count is used to count the bytes that is added to
+ * bytes_transferred but not actually transferred, at the proper
+ * time, we should sub one_byte_count from bytes_transferred to
+ * make bytes_transferred accurate.
+ */
+static int one_byte_count;
 static bool quit_comp_thread;
 static bool quit_decomp_thread;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static uint8_t *compressed_data_buf;
 
+static int do_compress_ram_page(CompressParam *param);
+
 static void *do_data_compress(void *opaque)
 {
-    while (!quit_comp_thread) {
-
-    /* To be done */
+    CompressParam *param = opaque;
 
+    while (!quit_comp_thread) {
+        qemu_mutex_lock(&param->mutex);
+        /* Re-check the quit_comp_thread in case of
+         * terminate_compression_threads is called just before
+         * qemu_mutex_lock(&param->mutex) and after
+         * while(!quit_comp_thread), re-check it here can make
+         * sure the compression thread terminate as expected.
+         */
+        while (!param->busy && !quit_comp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        qemu_mutex_unlock(&param->mutex);
+        if (!quit_comp_thread) {
+            do_compress_ram_page(param);
+        }
+        qemu_mutex_lock(comp_done_lock);
+        param->busy = false;
+        qemu_cond_signal(comp_done_cond);
+        qemu_mutex_unlock(comp_done_lock);
     }
 
     return NULL;
@@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
 
 static inline void terminate_compression_threads(void)
 {
-    quit_comp_thread = true;
+    int idx, thread_count;
 
-    /* To be done */
+    thread_count = migrate_compress_threads();
+    quit_comp_thread = true;
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(&comp_param[idx].mutex);
+        qemu_cond_signal(&comp_param[idx].cond);
+        qemu_mutex_unlock(&comp_param[idx].mutex);
+    }
 }
 
 void migrate_compress_threads_join(MigrationState *s)
@@ -770,12 +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
     return bytes_sent;
 }
 
+static int do_compress_ram_page(CompressParam *param)
+{
+    int bytes_sent, cont;
+    int blen;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+    p = memory_region_get_ram_ptr(block->mr) + offset;
+
+    bytes_sent = save_block_hdr(param->file, block, offset, cont,
+                                RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+                                     migrate_compress_level());
+    bytes_sent += blen;
+    atomic_inc(&acct_info.norm_pages);
+
+    return bytes_sent;
+}
+
+static inline void start_compression(CompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->busy = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        if (comp_param[idx].busy) {
+            qemu_mutex_lock(comp_done_lock);
+            while (comp_param[idx].busy && !quit_comp_thread) {
+                qemu_cond_wait(comp_done_cond, comp_done_lock);
+            }
+            qemu_mutex_unlock(comp_done_lock);
+        }
+        len = qemu_put_qemu_file(f, comp_param[idx].file);
+        bytes_transferred += len;
+    }
+    if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
+        bytes_transferred -= one_byte_count;
+        one_byte_count = 0;
+    }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset)
+{
+    int idx, thread_count, bytes_sent = 0;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(comp_done_lock);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (!comp_param[idx].busy) {
+                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx], block, offset);
+                start_compression(&comp_param[idx]);
+                if (bytes_sent == 0) {
+                    /* set bytes_sent to 1 in this case to prevent migration
+                     * from terminating, this 1 byte will be added to
+                     * bytes_transferred later, minus 1 to keep the
+                     * bytes_transferred accurate */
+                    bytes_sent = 1;
+                    if (bytes_transferred <= 0) {
+                        one_byte_count++;
+                    } else {
+                        bytes_transferred -= 1;
+                    }
+                }
+                break;
+            }
+        }
+        if (bytes_sent > 0) {
+            break;
+        } else {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    return bytes_sent;
+}
+
 static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
                                     ram_addr_t offset, bool last_stage)
 {
     int bytes_sent = -1;
+    MemoryRegion *mr = block->mr;
+    uint8_t *p;
+    int ret;
+    int cont;
 
-    /* To be done*/
+    p = memory_region_get_ram_ptr(mr) + offset;
+    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+    ret = ram_control_save_page(f, block->offset,
+                                offset, TARGET_PAGE_SIZE, &bytes_sent);
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_sent > 0) {
+                acct_info.norm_pages++;
+            } else if (bytes_sent == 0) {
+                acct_info.dup_pages++;
+            }
+        }
+    } else {
+        /* When starting the process of a new block, the first page of
+         * the block should be sent out before other pages in the same
+         * block, and all the pages in last block should have been sent
+         * out, keeping this order is important, because the 'cont' flag
+         * is used to avoid resending the block name.
+         */
+        if (block != last_sent_block) {
+            flush_compressed_data(f);
+            bytes_sent = save_zero_page(f, block, offset, p, cont);
+            if (bytes_sent == -1) {
+                set_compress_params(&comp_param[0], block, offset);
+                /* Use the qemu thread to compress the data to make sure the
+                 * first page is sent out before other pages
+                 */
+                bytes_sent = do_compress_ram_page(&comp_param[0]);
+                if (bytes_sent > 0) {
+                    qemu_put_qemu_file(f, comp_param[0].file);
+                }
+            }
+        } else {
+            bytes_sent = save_zero_page(f, block, offset, p, cont);
+            if (bytes_sent == -1) {
+                bytes_sent = compress_page_with_multi_thread(f, block, offset);
+            }
+        }
+    }
 
     return bytes_sent;
 }
@@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     return bytes_sent;
 }
 
-static uint64_t bytes_transferred;
-
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
     uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -1043,6 +1218,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
+    flush_compressed_data(f);
     qemu_mutex_unlock_ramlist();
 
     /*
@@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         bytes_transferred += bytes_sent;
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
-- 
1.9.1

  parent reply	other threads:[~2015-02-11  3:15 UTC|newest]

Thread overview: 43+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-02-11  3:06 [Qemu-devel] [PATCH v5 0/12] migration: Add a new feature to do live migration Liang Li
2015-02-11  3:06 ` [Qemu-devel] [v5 01/12] docs: Add a doc about multiple thread compression Liang Li
2015-02-11  8:46   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 02/12] migration: Add the framework of multi-thread compression Liang Li
2015-02-11  8:52   ` Juan Quintela
2015-02-11  8:55   ` Juan Quintela
2015-02-11 11:10   ` Juan Quintela
2015-02-12  7:24     ` Li, Liang Z
2015-02-12 12:31       ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 03/12] migration: Add the framework of multi-thread decompression Liang Li
2015-02-11  8:52   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 04/12] qemu-file: Add compression functions to QEMUFile Liang Li
2015-02-12 12:06   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 05/12] arch_init: Alloc and free data struct for compression Liang Li
2015-02-11  9:03   ` Juan Quintela
2015-02-12  5:32     ` Li, Liang Z
2015-02-12 12:05       ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 06/12] arch_init: Add and free data struct for decompression Liang Li
2015-02-11  9:04   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 07/12] migration: Split the function ram_save_page Liang Li
2015-02-11  9:08   ` Juan Quintela
2015-02-11 11:02   ` Juan Quintela
2015-02-11  3:06 ` Liang Li [this message]
2015-02-11 11:44   ` [Qemu-devel] [v5 08/12] migration: Add the core code of multi-thread compression Juan Quintela
2015-02-12  7:43     ` Li, Liang Z
2015-03-02  2:46     ` Li, Liang Z
2015-02-11  3:06 ` [Qemu-devel] [v5 09/12] migration: Make compression co-work with xbzrle Liang Li
2015-02-11 11:46   ` Juan Quintela
2015-02-12  2:24     ` Li, Liang Z
2015-02-12 12:22       ` Juan Quintela
2015-02-12 15:10         ` Li, Liang Z
2015-02-11  3:06 ` [Qemu-devel] [v5 10/12] migration: Add the core code for decompression Liang Li
2015-02-11 11:48   ` Juan Quintela
2015-02-11  3:06 ` [Qemu-devel] [v5 11/12] migration: Add interface to control compression Liang Li
2015-02-11  3:06 ` [Qemu-devel] [v5 12/12] migration: Add commands to set and query parameter Liang Li
2015-02-11 11:53   ` Juan Quintela
2015-03-11 16:57   ` Dr. David Alan Gilbert
2015-03-12 10:30   ` Markus Armbruster
2015-03-19  2:30     ` Li, Liang Z
2015-03-19  7:49       ` Markus Armbruster
2015-03-19 14:21         ` Li, Liang Z
2015-03-20  5:16           ` Li, Liang Z
2015-03-19 14:33       ` Eric Blake

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1423623986-590-9-git-send-email-liang.z.li@intel.com \
    --to=liang.z.li@intel.com \
    --cc=amit.shah@redhat.com \
    --cc=armbru@redhat.com \
    --cc=dgilbert@redhat.com \
    --cc=lcapitulino@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    --cc=yang.z.zhang@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).