From: Paolo Bonzini <pbonzini@redhat.com>
To: "Dr. David Alan Gilbert" <dgilbert@redhat.com>, guangrong.xiao@gmail.com
Cc: mst@redhat.com, mtosatti@redhat.com, qemu-devel@nongnu.org,
kvm@vger.kernel.org, peterx@redhat.com, wei.w.wang@intel.com,
jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com,
cota@braap.org, Xiao Guangrong <xiaoguangrong@tencent.com>
Subject: Re: [Qemu-devel] [PATCH v3 3/5] migration: use threaded workqueue for compression
Date: Fri, 23 Nov 2018 19:22:44 +0100 [thread overview]
Message-ID: <ec42ec4b-652d-807e-2d88-94a2d52e9cec@redhat.com> (raw)
In-Reply-To: <20181123181717.GH2373@work-vm>
On 23/11/18 19:17, Dr. David Alan Gilbert wrote:
> * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> Adapt the compression code to the threaded workqueue
>>
>> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
>> ---
>> migration/ram.c | 308 ++++++++++++++++++++------------------------------------
>> 1 file changed, 110 insertions(+), 198 deletions(-)
>>
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 7e7deec4d8..254c08f27b 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -57,6 +57,7 @@
>> #include "qemu/uuid.h"
>> #include "savevm.h"
>> #include "qemu/iov.h"
>> +#include "qemu/threaded-workqueue.h"
>>
>> /***********************************************************/
>> /* ram save/restore */
>> @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
>>
>> CompressionStats compression_counters;
>>
>> -struct CompressParam {
>> - bool done;
>> - bool quit;
>> - bool zero_page;
>> - QEMUFile *file;
>> - QemuMutex mutex;
>> - QemuCond cond;
>> - RAMBlock *block;
>> - ram_addr_t offset;
>> -
>> - /* internally used fields */
>> - z_stream stream;
>> - uint8_t *originbuf;
>> -};
>> -typedef struct CompressParam CompressParam;
>> -
>> struct DecompressParam {
>> bool done;
>> bool quit;
>> @@ -377,15 +362,6 @@ struct DecompressParam {
>> };
>> typedef struct DecompressParam DecompressParam;
>>
>> -static CompressParam *comp_param;
>> -static QemuThread *compress_threads;
>> -/* comp_done_cond is used to wake up the migration thread when
>> - * one of the compression threads has finished the compression.
>> - * comp_done_lock is used to co-work with comp_done_cond.
>> - */
>> -static QemuMutex comp_done_lock;
>> -static QemuCond comp_done_cond;
>> -/* The empty QEMUFileOps will be used by file in CompressParam */
>> static const QEMUFileOps empty_ops = { };
>>
>> static QEMUFile *decomp_file;
>> @@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
>> static QemuMutex decomp_done_lock;
>> static QemuCond decomp_done_cond;
>>
>> -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
>> - ram_addr_t offset, uint8_t *source_buf);
>> -
>> -static void *do_data_compress(void *opaque)
>> -{
>> - CompressParam *param = opaque;
>> - RAMBlock *block;
>> - ram_addr_t offset;
>> - bool zero_page;
>> -
>> - qemu_mutex_lock(&param->mutex);
>> - while (!param->quit) {
>> - if (param->block) {
>> - block = param->block;
>> - offset = param->offset;
>> - param->block = NULL;
>> - qemu_mutex_unlock(&param->mutex);
>> -
>> - zero_page = do_compress_ram_page(param->file, &param->stream,
>> - block, offset, param->originbuf);
>> -
>> - qemu_mutex_lock(&comp_done_lock);
>> - param->done = true;
>> - param->zero_page = zero_page;
>> - qemu_cond_signal(&comp_done_cond);
>> - qemu_mutex_unlock(&comp_done_lock);
>> -
>> - qemu_mutex_lock(&param->mutex);
>> - } else {
>> - qemu_cond_wait(&param->cond, &param->mutex);
>> - }
>> - }
>> - qemu_mutex_unlock(&param->mutex);
>> -
>> - return NULL;
>> -}
>> -
>> -static void compress_threads_save_cleanup(void)
>> -{
>> - int i, thread_count;
>> -
>> - if (!migrate_use_compression() || !comp_param) {
>> - return;
>> - }
>> -
>> - thread_count = migrate_compress_threads();
>> - for (i = 0; i < thread_count; i++) {
>> - /*
>> - * we use it as a indicator which shows if the thread is
>> - * properly init'd or not
>> - */
>> - if (!comp_param[i].file) {
>> - break;
>> - }
>> -
>> - qemu_mutex_lock(&comp_param[i].mutex);
>> - comp_param[i].quit = true;
>> - qemu_cond_signal(&comp_param[i].cond);
>> - qemu_mutex_unlock(&comp_param[i].mutex);
>> -
>> - qemu_thread_join(compress_threads + i);
>> - qemu_mutex_destroy(&comp_param[i].mutex);
>> - qemu_cond_destroy(&comp_param[i].cond);
>> - deflateEnd(&comp_param[i].stream);
>> - g_free(comp_param[i].originbuf);
>> - qemu_fclose(comp_param[i].file);
>> - comp_param[i].file = NULL;
>> - }
>> - qemu_mutex_destroy(&comp_done_lock);
>> - qemu_cond_destroy(&comp_done_cond);
>> - g_free(compress_threads);
>> - g_free(comp_param);
>> - compress_threads = NULL;
>> - comp_param = NULL;
>> -}
>> -
>> -static int compress_threads_save_setup(void)
>> -{
>> - int i, thread_count;
>> -
>> - if (!migrate_use_compression()) {
>> - return 0;
>> - }
>> - thread_count = migrate_compress_threads();
>> - compress_threads = g_new0(QemuThread, thread_count);
>> - comp_param = g_new0(CompressParam, thread_count);
>> - qemu_cond_init(&comp_done_cond);
>> - qemu_mutex_init(&comp_done_lock);
>> - for (i = 0; i < thread_count; i++) {
>> - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
>> - if (!comp_param[i].originbuf) {
>> - goto exit;
>> - }
>> -
>> - if (deflateInit(&comp_param[i].stream,
>> - migrate_compress_level()) != Z_OK) {
>> - g_free(comp_param[i].originbuf);
>> - goto exit;
>> - }
>> -
>> - /* comp_param[i].file is just used as a dummy buffer to save data,
>> - * set its ops to empty.
>> - */
>> - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
>> - comp_param[i].done = true;
>> - comp_param[i].quit = false;
>> - qemu_mutex_init(&comp_param[i].mutex);
>> - qemu_cond_init(&comp_param[i].cond);
>> - qemu_thread_create(compress_threads + i, "compress",
>> - do_data_compress, comp_param + i,
>> - QEMU_THREAD_JOINABLE);
>> - }
>> - return 0;
>> -
>> -exit:
>> - compress_threads_save_cleanup();
>> - return -1;
>> -}
>> -
>> /* Multiple fd's */
>>
>> #define MULTIFD_MAGIC 0x11223344U
>> @@ -1909,12 +1766,25 @@ exit:
>> return zero_page;
>> }
>>
>> +struct CompressData {
>> + /* filled by migration thread.*/
>> + RAMBlock *block;
>> + ram_addr_t offset;
>> +
>> + /* filled by compress thread. */
>> + QEMUFile *file;
>> + z_stream stream;
>> + uint8_t *originbuf;
>> + bool zero_page;
>> +};
>> +typedef struct CompressData CompressData;
>> +
>> static void
>> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
>> +update_compress_thread_counts(CompressData *cd, int bytes_xmit)
>
> Keep the const?
>> {
>> ram_counters.transferred += bytes_xmit;
>>
>> - if (param->zero_page) {
>> + if (cd->zero_page) {
>> ram_counters.duplicate++;
>> return;
>> }
>> @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
>> compression_counters.pages++;
>> }
>>
>> +static int compress_thread_data_init(void *request)
>> +{
>> + CompressData *cd = request;
>> +
>> + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
>> + if (!cd->originbuf) {
>> + return -1;
>> + }
>> +
>> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
>> + g_free(cd->originbuf);
>> + return -1;
>> + }
>
> Please print errors if you fail in any case so we can easily tell what
> happened.
>
>> + cd->file = qemu_fopen_ops(NULL, &empty_ops);
>> + return 0;
>> +}
>> +
>> +static void compress_thread_data_fini(void *request)
>> +{
>> + CompressData *cd = request;
>> +
>> + qemu_fclose(cd->file);
>> + deflateEnd(&cd->stream);
>> + g_free(cd->originbuf);
>> +}
>> +
>> +static void compress_thread_data_handler(void *request)
>> +{
>> + CompressData *cd = request;
>> +
>> + /*
>> + * if compression fails, it will be indicated by
>> + * migrate_get_current()->to_dst_file.
>> + */
>> + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
>> + cd->offset, cd->originbuf);
>> +}
>> +
>> +static void compress_thread_data_done(void *request)
>> +{
>> + CompressData *cd = request;
>> + RAMState *rs = ram_state;
>> + int bytes_xmit;
>> +
>> + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
>> + update_compress_thread_counts(cd, bytes_xmit);
>> +}
>> +
>> +static const ThreadedWorkqueueOps compress_ops = {
>> + .thread_request_init = compress_thread_data_init,
>> + .thread_request_uninit = compress_thread_data_fini,
>> + .thread_request_handler = compress_thread_data_handler,
>> + .thread_request_done = compress_thread_data_done,
>> + .request_size = sizeof(CompressData),
>> +};
>> +
>> +static Threads *compress_threads;
>> +
>> static bool save_page_use_compression(RAMState *rs);
>>
>> static void flush_compressed_data(RAMState *rs)
>> {
>> - int idx, len, thread_count;
>> -
>> if (!save_page_use_compression(rs)) {
>> return;
>> }
>> - thread_count = migrate_compress_threads();
>>
>> - qemu_mutex_lock(&comp_done_lock);
>> - for (idx = 0; idx < thread_count; idx++) {
>> - while (!comp_param[idx].done) {
>> - qemu_cond_wait(&comp_done_cond, &comp_done_lock);
>> - }
>> - }
>> - qemu_mutex_unlock(&comp_done_lock);
>> + threaded_workqueue_wait_for_requests(compress_threads);
>> +}
>>
>> - for (idx = 0; idx < thread_count; idx++) {
>> - qemu_mutex_lock(&comp_param[idx].mutex);
>> - if (!comp_param[idx].quit) {
>> - len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
>> - /*
>> - * it's safe to fetch zero_page without holding comp_done_lock
>> - * as there is no further request submitted to the thread,
>> - * i.e, the thread should be waiting for a request at this point.
>> - */
>> - update_compress_thread_counts(&comp_param[idx], len);
>> - }
>> - qemu_mutex_unlock(&comp_param[idx].mutex);
>> +static void compress_threads_save_cleanup(void)
>> +{
>> + if (!compress_threads) {
>> + return;
>> }
>> +
>> + threaded_workqueue_destroy(compress_threads);
>> + compress_threads = NULL;
>> }
>>
>> -static inline void set_compress_params(CompressParam *param, RAMBlock *block,
>> - ram_addr_t offset)
>> +static int compress_threads_save_setup(void)
>> {
>> - param->block = block;
>> - param->offset = offset;
>> + if (!migrate_use_compression()) {
>> + return 0;
>> + }
>> +
>> + compress_threads = threaded_workqueue_create("compress",
>> + migrate_compress_threads(),
>> + DEFAULT_THREAD_REQUEST_NR, &compress_ops);
>> + return compress_threads ? 0 : -1;
>> }
>>
>> static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
>> ram_addr_t offset)
>> {
>> - int idx, thread_count, bytes_xmit = -1, pages = -1;
>> + CompressData *cd;
>> bool wait = migrate_compress_wait_thread();
>>
>> - thread_count = migrate_compress_threads();
>> - qemu_mutex_lock(&comp_done_lock);
>> retry:
>> - for (idx = 0; idx < thread_count; idx++) {
>> - if (comp_param[idx].done) {
>> - comp_param[idx].done = false;
>> - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
>> - qemu_mutex_lock(&comp_param[idx].mutex);
>> - set_compress_params(&comp_param[idx], block, offset);
>> - qemu_cond_signal(&comp_param[idx].cond);
>> - qemu_mutex_unlock(&comp_param[idx].mutex);
>> - pages = 1;
>> - update_compress_thread_counts(&comp_param[idx], bytes_xmit);
>> - break;
>> + cd = threaded_workqueue_get_request(compress_threads);
>> + if (!cd) {
>> + /*
>> + * wait for the free thread if the user specifies
>> + * 'compress-wait-thread', otherwise we will post
>> + * the page out in the main thread as normal page.
>> + */
>> + if (wait) {
>> + cpu_relax();
>> + goto retry;
>
> Is there nothing better we can use to wait without eating CPU time?
There is a mechanism to wait without eating CPU time in the data
structure, but it makes sense to busy wait. There are 4 threads in the
workqueue, so you have to compare 1/4th of the time spent compressing a
page, with the trip into the kernel to wake you up. You're adding 20%
CPU usage, but I'm not surprised it's worthwhile.
Paolo
next prev parent reply other threads:[~2018-11-23 18:29 UTC|newest]
Thread overview: 35+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-11-22 7:20 [Qemu-devel] [PATCH v3 0/5] migration: improve multithreads guangrong.xiao
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 1/5] bitops: introduce change_bit_atomic guangrong.xiao
2018-11-23 10:23 ` Dr. David Alan Gilbert
2018-11-28 9:35 ` Juan Quintela
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 2/5] util: introduce threaded workqueue guangrong.xiao
2018-11-23 11:02 ` Dr. David Alan Gilbert
2018-11-26 7:57 ` Xiao Guangrong
2018-11-26 10:56 ` Dr. David Alan Gilbert
2018-11-27 7:17 ` Xiao Guangrong
2018-11-26 18:55 ` Emilio G. Cota
2018-11-27 8:30 ` Xiao Guangrong
2018-11-24 0:12 ` Emilio G. Cota
2018-11-26 8:06 ` Xiao Guangrong
2018-11-26 18:49 ` Emilio G. Cota
2018-11-27 8:29 ` Xiao Guangrong
2018-11-24 0:17 ` Emilio G. Cota
2018-11-26 8:18 ` Xiao Guangrong
2018-11-26 10:28 ` Paolo Bonzini
2018-11-27 8:31 ` Xiao Guangrong
2018-11-27 12:49 ` Christophe de Dinechin
2018-11-27 13:51 ` Paolo Bonzini
2018-12-04 15:49 ` Christophe de Dinechin
2018-12-04 17:16 ` Paolo Bonzini
2018-12-10 3:23 ` Xiao Guangrong
2018-11-27 17:39 ` Emilio G. Cota
2018-11-28 8:55 ` Xiao Guangrong
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 3/5] migration: use threaded workqueue for compression guangrong.xiao
2018-11-23 18:17 ` Dr. David Alan Gilbert
2018-11-23 18:22 ` Paolo Bonzini [this message]
2018-11-23 18:29 ` Dr. David Alan Gilbert
2018-11-26 8:00 ` Xiao Guangrong
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 4/5] migration: use threaded workqueue for decompression guangrong.xiao
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 5/5] tests: add threaded-workqueue-bench guangrong.xiao
2018-11-22 21:25 ` [Qemu-devel] [PATCH v3 0/5] migration: improve multithreads no-reply
2018-11-22 21:35 ` no-reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=ec42ec4b-652d-807e-2d88-94a2d52e9cec@redhat.com \
--to=pbonzini@redhat.com \
--cc=cota@braap.org \
--cc=dgilbert@redhat.com \
--cc=eblake@redhat.com \
--cc=guangrong.xiao@gmail.com \
--cc=jiang.biao2@zte.com.cn \
--cc=kvm@vger.kernel.org \
--cc=mst@redhat.com \
--cc=mtosatti@redhat.com \
--cc=peterx@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=quintela@redhat.com \
--cc=wei.w.wang@intel.com \
--cc=xiaoguangrong@tencent.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).