From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Liang Li <liang.z.li@intel.com>
Cc: quintela@redhat.com, armbru@redhat.com, qemu-devel@nongnu.org,
lcapitulino@redhat.com, Yang Zhang <yang.z.zhang@intel.com>,
amit.shah@redhat.com
Subject: Re: [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression
Date: Fri, 6 Feb 2015 12:12:27 +0000 [thread overview]
Message-ID: <20150206121226.GH2364@work-vm> (raw)
In-Reply-To: <1422875149-13198-9-git-send-email-liang.z.li@intel.com>
* Liang Li (liang.z.li@intel.com) wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
> arch_init.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
> 1 file changed, 159 insertions(+), 8 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index eae082b..b8bdb16 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -364,16 +364,31 @@ static QemuCond *comp_done_cond;
> /* The empty QEMUFileOps will be used by file in CompressParam */
> static const QEMUFileOps empty_ops = { };
> static bool quit_thread;
> +static int one_byte_count;
Please add a comment here about what one_byte_count is; it's
not obvious, but I can't think of a better name
> static DecompressParam *decomp_param;
> static QemuThread *decompress_threads;
> static uint8_t *compressed_data_buf;
>
> +static int do_compress_ram_page(CompressParam *param);
> +
> static void *do_data_compress(void *opaque)
> {
> - while (!quit_thread) {
> -
> - /* To be done */
> + CompressParam *param = opaque;
>
> + while (!quit_thread) {
This is something I missed on 02/ - can you rename 'quit_thread'
to comp_quit_thread or something, so it's obvious which thread.
> + qemu_mutex_lock(¶m->mutex);
> + while (!param->busy) {
> + qemu_cond_wait(¶m->cond, ¶m->mutex);
> + if (quit_thread) {
> + break;
> + }
> + }
> + qemu_mutex_unlock(¶m->mutex);
> + do_compress_ram_page(param);
> + qemu_mutex_lock(comp_done_lock);
> + param->busy = false;
> + qemu_cond_signal(comp_done_cond);
> + qemu_mutex_unlock(comp_done_lock);
This is interestingly different from your previous version; param->mutex
used to be held all of the time except during the cond_wait itself.
I'm also worried about the quit_thread behaviour; is there
any guarantee that 'terminate_compression_threads' is called
while this code is in the param->cond cond_wait? If terminate_compression_threads
was called while the thread was busy, then the cond_signal on param->cond
would be too early. I'm thinking perhaps you need to check quit_thread before
the cond_wait as well? (It's mostly error cases and migrate_cancel I'm worried
about here).
> }
>
> return NULL;
> @@ -381,9 +396,13 @@ static void *do_data_compress(void *opaque)
>
> static inline void terminate_compression_threads(void)
> {
> - quit_thread = true;
> + int idx, thread_count;
>
> - /* To be done */
> + thread_count = migrate_compress_threads();
> + quit_thread = true;
> + for (idx = 0; idx < thread_count; idx++) {
> + qemu_cond_signal(&comp_param[idx].cond);
> + }
> }
>
> void migrate_compress_threads_join(MigrationState *s)
> @@ -764,12 +783,144 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
> return bytes_sent;
> }
>
> +static int do_compress_ram_page(CompressParam *param)
> +{
> + int bytes_sent, cont;
> + int blen;
> + uint8_t *p;
> + RAMBlock *block = param->block;
> + ram_addr_t offset = param->offset;
> +
> + cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> + p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> + bytes_sent = save_block_hdr(param->file, block, offset, cont,
> + RAM_SAVE_FLAG_COMPRESS_PAGE);
> + blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> + migrate_compress_level());
> + bytes_sent += blen;
> + atomic_inc(&acct_info.norm_pages);
> +
> + return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> + qemu_mutex_lock(¶m->mutex);
> + param->busy = true;
> + qemu_cond_signal(¶m->cond);
> + qemu_mutex_unlock(¶m->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> + int idx, len, thread_count;
> +
> + if (!migrate_use_compression()) {
> + return;
> + }
> + thread_count = migrate_compress_threads();
> + for (idx = 0; idx < thread_count; idx++) {
> + if (comp_param[idx].busy) {
> + qemu_mutex_lock(comp_done_lock);
> + while (comp_param[idx].busy) {
> + qemu_cond_wait(comp_done_cond, comp_done_lock);
> + }
> + qemu_mutex_unlock(comp_done_lock);
> + }
> + len = qemu_put_qemu_file(f, comp_param[idx].file);
> + bytes_transferred += len;
> + }
> + if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
> + bytes_transferred -= one_byte_count;
> + one_byte_count = 0;
> + }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> + ram_addr_t offset)
> +{
> + param->block = block;
> + param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> + ram_addr_t offset)
> +{
> + int idx, thread_count, bytes_sent = 0;
> +
> + thread_count = migrate_compress_threads();
> + qemu_mutex_lock(comp_done_lock);
> + while (true) {
> + for (idx = 0; idx < thread_count; idx++) {
> + if (!comp_param[idx].busy) {
> + bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> + set_compress_params(&comp_param[idx], block, offset);
> + start_compression(&comp_param[idx]);
> + if (bytes_sent == 0) {
> + /* set bytes_sent to 1 in this case to prevent migration
> + * from terminating, this 1 byte whill be added to
> + * bytes_transferred later, minus 1 to keep the
> + * bytes_transferred accurate */
> + bytes_sent = 1;
> + if (bytes_transferred <= 0) {
> + one_byte_count++;
> + } else {
> + bytes_transferred -= 1;
> + }
> + }
> + break;
> + }
> + }
> + if (bytes_sent > 0) {
> + break;
> + } else {
> + qemu_cond_wait(comp_done_cond, comp_done_lock);
> + }
> + }
> + qemu_mutex_unlock(comp_done_lock);
> +
> + return bytes_sent;
> +}
> +
> static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
> ram_addr_t offset, bool last_stage)
> {
> int bytes_sent = 0;
> + MemoryRegion *mr = block->mr;
> + uint8_t *p;
>
> - /* To be done*/
> + p = memory_region_get_ram_ptr(mr) + offset;
> + /* When starting the process of a new block, the first page of
> + * the block should be sent out before other pages in the same
> + * block, and all the pages in last block should have been sent
> + * out, keeping this order is important, because the 'cont' flag
> + * is used to avoid resending the block name.
> + */
> + if (block != last_sent_block) {
> + flush_compressed_data(f);
> + bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> + last_stage, NULL);
> + if (bytes_sent == -1) {
> + set_compress_params(&comp_param[0], block, offset);
> + /* Use the qemu thread to compress the data to make sure the
> + * first page is sent out before other pages
> + */
> + bytes_sent = do_compress_ram_page(&comp_param[0]);
> + if (bytes_sent > 0) {
> + qemu_put_qemu_file(f, comp_param[0].file);
> + }
> + }
> + } else {
> + bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> + last_stage, NULL);
> + if (bytes_sent == -1) {
> + bytes_sent = compress_page_with_multi_thread(f, block, offset);
> + }
> + }
>
> return bytes_sent;
> }
> @@ -828,8 +979,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
> return bytes_sent;
> }
>
> -static uint64_t bytes_transferred;
> -
> void acct_update_position(QEMUFile *f, size_t size, bool zero)
> {
> uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1037,6 +1186,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> i++;
> }
>
> + flush_compressed_data(f);
> qemu_mutex_unlock_ramlist();
>
> /*
> @@ -1083,6 +1233,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> bytes_transferred += bytes_sent;
> }
>
> + flush_compressed_data(f);
> ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> migration_end();
>
> --
> 1.9.1
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
next prev parent reply other threads:[~2015-02-06 12:12 UTC|newest]
Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-02-02 11:05 [Qemu-devel] [PATCH v4 0/13] migration: Add a new feature to do live migration Liang Li
2015-02-02 11:05 ` [Qemu-devel] [v4 01/13] docs: Add a doc about multiple thread compression Liang Li
2015-02-02 11:05 ` [Qemu-devel] [v4 02/13] migration: Add the framework of multi-thread compression Liang Li
2015-02-06 10:11 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 03/13] migration: Add the framework of multi-thread decompression Liang Li
2015-02-06 10:16 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 04/13] qemu-file: Add compression functions to QEMUFile Liang Li
2015-02-06 10:33 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 05/13] arch_init: Alloc and free data struct for compression Liang Li
2015-02-06 10:45 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 06/13] arch_init: Add and free data struct for decompression Liang Li
2015-02-06 10:46 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 07/13] migration: Split the function ram_save_page Liang Li
2015-02-06 11:01 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 08/13] migration: Add the core code of multi-thread compression Liang Li
2015-02-06 12:12 ` Dr. David Alan Gilbert [this message]
2015-02-02 11:05 ` [Qemu-devel] [v4 09/13] migration: Make compression co-work with xbzrle Liang Li
2015-02-06 12:15 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 10/13] migration: Add the core code for decompression Liang Li
2015-02-06 12:27 ` Dr. David Alan Gilbert
2015-02-02 11:05 ` [Qemu-devel] [v4 11/13] migration: Add interface to control compression Liang Li
2015-02-03 22:17 ` Eric Blake
2015-02-02 11:05 ` [Qemu-devel] [v4 12/13] migration: Add command to set migration parameter Liang Li
2015-02-03 23:28 ` Eric Blake
2015-02-04 1:26 ` Li, Liang Z
2015-02-04 2:27 ` Eric Blake
2015-02-02 11:05 ` [Qemu-devel] [v4 13/13] migration: Add command to query " Liang Li
2015-02-03 23:30 ` Eric Blake
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20150206121226.GH2364@work-vm \
--to=dgilbert@redhat.com \
--cc=amit.shah@redhat.com \
--cc=armbru@redhat.com \
--cc=lcapitulino@redhat.com \
--cc=liang.z.li@intel.com \
--cc=qemu-devel@nongnu.org \
--cc=quintela@redhat.com \
--cc=yang.z.zhang@intel.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.