From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Liang Li <liang.z.li@intel.com>
Cc: quintela@redhat.com, armbru@redhat.com, qemu-devel@nongnu.org,
lcapitulino@redhat.com, Yang Zhang <yang.z.zhang@intel.com>,
amit.shah@redhat.com
Subject: Re: [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression
Date: Fri, 6 Feb 2015 12:12:27 +0000 [thread overview]
Message-ID: <20150206121226.GH2364@work-vm> (raw)
In-Reply-To: <1422875149-13198-9-git-send-email-liang.z.li@intel.com>
* Liang Li (liang.z.li@intel.com) wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
> arch_init.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
> 1 file changed, 159 insertions(+), 8 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index eae082b..b8bdb16 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -364,16 +364,31 @@ static QemuCond *comp_done_cond;
> /* The empty QEMUFileOps will be used by file in CompressParam */
> static const QEMUFileOps empty_ops = { };
> static bool quit_thread;
> +static int one_byte_count;
Please add a comment here about what one_byte_count is; it's
not obvious, but I can't think of a better name
> static DecompressParam *decomp_param;
> static QemuThread *decompress_threads;
> static uint8_t *compressed_data_buf;
>
> +static int do_compress_ram_page(CompressParam *param);
> +
> static void *do_data_compress(void *opaque)
> {
> - while (!quit_thread) {
> -
> - /* To be done */
> + CompressParam *param = opaque;
>
> + while (!quit_thread) {
This is something I missed on 02/ - can you rename 'quit_thread'
to comp_quit_thread or something, so it's obvious which thread.
> + qemu_mutex_lock(¶m->mutex);
> + while (!param->busy) {
> + qemu_cond_wait(¶m->cond, ¶m->mutex);
> + if (quit_thread) {
> + break;
> + }
> + }
> + qemu_mutex_unlock(¶m->mutex);
> + do_compress_ram_page(param);
> + qemu_mutex_lock(comp_done_lock);
> + param->busy = false;
> + qemu_cond_signal(comp_done_cond);
> + qemu_mutex_unlock(comp_done_lock);
This is interestingly different from your previous version; param->mutex
used to be held all of the time except during the cond_wait itself.
I'm also worried about the quit_thread behaviour; is there
any guarantee that 'terminate_compression_threads' is called
while this code is in the param->cond cond_wait? If terminate_compression_threads
was called while the thread was busy, then the cond_signal on param->cond
would be too early. I'm thinking perhaps you need to check quit_thread before
the cond_wait as well? (It's mostly error cases and migrate_cancel I'm worried
about here).
> }
>
> return NULL;
> @@ -381,9 +396,13 @@ static void *do_data_compress(void *opaque)
>
> static inline void terminate_compression_threads(void)
> {
> - quit_thread = true;
> + int idx, thread_count;
>
> - /* To be done */
> + thread_count = migrate_compress_threads();
> + quit_thread = true;
> + for (idx = 0; idx < thread_count; idx++) {
> + qemu_cond_signal(&comp_param[idx].cond);
> + }
> }
>
> void migrate_compress_threads_join(MigrationState *s)
> @@ -764,12 +783,144 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
> return bytes_sent;
> }
>
> +static int do_compress_ram_page(CompressParam *param)
> +{
> + int bytes_sent, cont;
> + int blen;
> + uint8_t *p;
> + RAMBlock *block = param->block;
> + ram_addr_t offset = param->offset;
> +
> + cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> + p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> + bytes_sent = save_block_hdr(param->file, block, offset, cont,
> + RAM_SAVE_FLAG_COMPRESS_PAGE);
> + blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> + migrate_compress_level());
> + bytes_sent += blen;
> + atomic_inc(&acct_info.norm_pages);
> +
> + return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> + qemu_mutex_lock(¶m->mutex);
> + param->busy = true;
> + qemu_cond_signal(¶m->cond);
> + qemu_mutex_unlock(¶m->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> + int idx, len, thread_count;
> +
> + if (!migrate_use_compression()) {
> + return;
> + }
> + thread_count = migrate_compress_threads();
> + for (idx = 0; idx < thread_count; idx++) {
> + if (comp_param[idx].busy) {
> + qemu_mutex_lock(comp_done_lock);
> + while (comp_param[idx].busy) {
> + qemu_cond_wait(comp_done_cond, comp_done_lock);
> + }
> + qemu_mutex_unlock(comp_done_lock);
> + }
> + len = qemu_put_qemu_file(f, comp_param[idx].file);
> + bytes_transferred += len;
> + }
> + if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
> + bytes_transferred -= one_byte_count;
> + one_byte_count = 0;
> + }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> + ram_addr_t offset)
> +{
> + param->block = block;
> + param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> + ram_addr_t offset)
> +{
> + int idx, thread_count, bytes_sent = 0;
> +
> + thread_count = migrate_compress_threads();
> + qemu_mutex_lock(comp_done_lock);
> + while (true) {
> + for (idx = 0; idx < thread_count; idx++) {
> + if (!comp_param[idx].busy) {
> + bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> + set_compress_params(&comp_param[idx], block, offset);
> + start_compression(&comp_param[idx]);
> + if (bytes_sent == 0) {
> + /* set bytes_sent to 1 in this case to prevent migration
> + * from terminating, this 1 byte whill be added to
> + * bytes_transferred later, minus 1 to keep the
> + * bytes_transferred accurate */
> + bytes_sent = 1;
> + if (bytes_transferred <= 0) {
> + one_byte_count++;
> + } else {
> + bytes_transferred -= 1;
> + }
> + }
> + break;
> + }
> + }
> + if (bytes_sent > 0) {
> + break;
> + } else {
> + qemu_cond_wait(comp_done_cond, comp_done_lock);
> + }
> + }
> + qemu_mutex_unlock(comp_done_lock);
> +
> + return bytes_sent;
> +}
> +
> static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
> ram_addr_t offset, bool last_stage)
> {
> int bytes_sent = 0;
> + MemoryRegion *mr = block->mr;
> + uint8_t *p;
>
> - /* To be done*/
> + p = memory_region_get_ram_ptr(mr) + offset;
> + /* When starting the process of a new block, the first page of
> + * the block should be sent out before other pages in the same
> + * block, and all the pages in last block should have been sent
> + * out, keeping this order is important, because the 'cont' flag
> + * is used to avoid resending the block name.
> + */
> + if (block != last_sent_block) {
> + flush_compressed_data(f);
> + bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> + last_stage, NULL);
> + if (bytes_sent == -1) {
> + set_compress_params(&comp_param[0], block, offset);
> + /* Use the qemu thread to compress the data to make sure the
> + * first page is sent out before other pages
> + */
> + bytes_sent = do_compress_ram_page(&comp_param[0]);
> + if (bytes_sent > 0) {
> + qemu_put_qemu_file(f, comp_param[0].file);
> + }
> + }
> + } else {
> + bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> + last_stage, NULL);
> + if (bytes_sent == -1) {
> + bytes_sent = compress_page_with_multi_thread(f, block, offset);
> + }
> + }
>
> return bytes_sent;
> }
> @@ -828,8 +979,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
> return bytes_sent;
> }
>
> -static uint64_t bytes_transferred;
> -
> void acct_update_position(QEMUFile *f, size_t size, bool zero)
> {
> uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1037,6 +1186,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> i++;
> }
>
> + flush_compressed_data(f);
> qemu_mutex_unlock_ramlist();
>
> /*
> @@ -1083,6 +1233,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> bytes_transferred += bytes_sent;
> }
>
> + flush_compressed_data(f);
> ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> migration_end();
>
> --
> 1.9.1
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
next prev parent reply other threads:[~2015-02-06 12:12 UTC|newest]
Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
2015-02-02 11:05 ` [Qemu-devel] [v4 01/13] docs: Add a doc about multiple thread compression Liang Li
2015-02-02 11:05 ` [Qemu-devel] [v4 02/13] migration: Add the framework of multi-thread compression Liang Li
2015-02-06 10:11 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 03/13] migration: Add the framework of multi-thread decompression Liang Li
2015-02-06 10:16 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 04/13] qemu-file: Add compression functions to QEMUFile Liang Li
2015-02-06 10:33 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 05/13] arch_init: Alloc and free data struct for compression Liang Li
2015-02-06 10:45 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 06/13] arch_init: Add and free data struct for decompression Liang Li
2015-02-06 10:46 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 07/13] migration: Split the function ram_save_page Liang Li
2015-02-06 11:01 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression Liang Li
2015-02-06 12:12 ` Dr. David Alan Gilbert [this message]
2015-02-02 11:05 ` [Qemu-devel] [v4 09/13] migration: Make compression co-work with xbzrle Liang Li
2015-02-06 12:15 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 10/13] migration: Add the core code for decompression Liang Li
2015-02-06 12:27 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 11/13] migration: Add interface to control compression Liang Li
2015-02-03 22:17 ` Eric Blake
2015-02-02 11:05 ` [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter Liang Li
2015-02-03 23:28 ` Eric Blake
2015-02-04 1:26 ` Li, Liang Z
2015-02-04 2:27 ` Eric Blake
2015-02-02 11:05 ` [Qemu-devel] [v4 13/13] migration: Add command to query " Liang Li
2015-02-03 23:30 ` Eric Blake
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20150206121226.GH2364@work-vm \
--to=dgilbert@redhat.com \
--cc=amit.shah@redhat.com \
--cc=armbru@redhat.com \
--cc=lcapitulino@redhat.com \
--cc=liang.z.li@intel.com \
--cc=qemu-devel@nongnu.org \
--cc=quintela@redhat.com \
--cc=yang.z.zhang@intel.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).