From: guangrong.xiao@gmail.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com,
peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn,
eblake@redhat.com, quintela@redhat.com, cota@braap.org,
Xiao Guangrong <xiaoguangrong@tencent.com>
Subject: [Qemu-devel] [PATCH v3 4/5] migration: use threaded workqueue for decompression
Date: Thu, 22 Nov 2018 15:20:27 +0800 [thread overview]
Message-ID: <20181122072028.22819-5-xiaoguangrong@tencent.com> (raw)
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
From: Xiao Guangrong <xiaoguangrong@tencent.com>
Adapt the decompression code to the threaded workqueue
Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
migration/ram.c | 222 ++++++++++++++++++++------------------------------------
1 file changed, 77 insertions(+), 145 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 254c08f27b..ccec59c35e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -350,25 +350,9 @@ typedef struct PageSearchStatus PageSearchStatus;
CompressionStats compression_counters;
-struct DecompressParam {
- bool done;
- bool quit;
- QemuMutex mutex;
- QemuCond cond;
- void *des;
- uint8_t *compbuf;
- int len;
- z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
static const QEMUFileOps empty_ops = { };
static QEMUFile *decomp_file;
-static DecompressParam *decomp_param;
-static QemuThread *decompress_threads;
-static QemuMutex decomp_done_lock;
-static QemuCond decomp_done_cond;
/* Multiple fd's */
@@ -3399,6 +3383,7 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}
+
/* return the size after decompression, or negative value on error */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
@@ -3424,166 +3409,113 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
return stream->total_out;
}
-static void *do_data_decompress(void *opaque)
-{
- DecompressParam *param = opaque;
- unsigned long pagesize;
- uint8_t *des;
- int len, ret;
-
- qemu_mutex_lock(&param->mutex);
- while (!param->quit) {
- if (param->des) {
- des = param->des;
- len = param->len;
- param->des = 0;
- qemu_mutex_unlock(&param->mutex);
-
- pagesize = TARGET_PAGE_SIZE;
-
- ret = qemu_uncompress_data(&param->stream, des, pagesize,
- param->compbuf, len);
- if (ret < 0 && migrate_get_current()->decompress_error_check) {
- error_report("decompress data failed");
- qemu_file_set_error(decomp_file, ret);
- }
+struct DecompressData {
+ /* filled by migration thread.*/
+ void *des;
+ uint8_t *compbuf;
+ size_t len;
- qemu_mutex_lock(&decomp_done_lock);
- param->done = true;
- qemu_cond_signal(&decomp_done_cond);
- qemu_mutex_unlock(&decomp_done_lock);
+ z_stream stream;
+};
+typedef struct DecompressData DecompressData;
- qemu_mutex_lock(&param->mutex);
- } else {
- qemu_cond_wait(&param->cond, &param->mutex);
- }
+static Threads *decompress_threads;
+
+static int decompress_thread_data_init(void *request)
+{
+ DecompressData *dd = request;
+
+ if (inflateInit(&dd->stream) != Z_OK) {
+ return -1;
}
- qemu_mutex_unlock(&param->mutex);
- return NULL;
+ dd->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+ return 0;
}
-static int wait_for_decompress_done(void)
+static void decompress_thread_data_fini(void *request)
{
- int idx, thread_count;
+ DecompressData *dd = request;
- if (!migrate_use_compression()) {
- return 0;
- }
+ inflateEnd(&dd->stream);
+ g_free(dd->compbuf);
+}
- thread_count = migrate_decompress_threads();
- qemu_mutex_lock(&decomp_done_lock);
- for (idx = 0; idx < thread_count; idx++) {
- while (!decomp_param[idx].done) {
- qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
- }
+static void decompress_thread_data_handler(void *request)
+{
+ DecompressData *dd = request;
+ unsigned long pagesize = TARGET_PAGE_SIZE;
+ int ret;
+
+ ret = qemu_uncompress_data(&dd->stream, dd->des, pagesize,
+ dd->compbuf, dd->len);
+ if (ret < 0 && migrate_get_current()->decompress_error_check) {
+ error_report("decompress data failed");
+ qemu_file_set_error(decomp_file, ret);
}
- qemu_mutex_unlock(&decomp_done_lock);
- return qemu_file_get_error(decomp_file);
}
-static void compress_threads_load_cleanup(void)
+static void decompress_thread_data_done(void *request)
{
- int i, thread_count;
+}
+
+static const ThreadedWorkqueueOps decompress_ops = {
+ .thread_request_init = decompress_thread_data_init,
+ .thread_request_uninit = decompress_thread_data_fini,
+ .thread_request_handler = decompress_thread_data_handler,
+ .thread_request_done = decompress_thread_data_done,
+ .request_size = sizeof(DecompressData),
+};
+static int decompress_init(QEMUFile *f)
+{
if (!migrate_use_compression()) {
- return;
+ return 0;
}
- thread_count = migrate_decompress_threads();
- for (i = 0; i < thread_count; i++) {
- /*
- * we use it as a indicator which shows if the thread is
- * properly init'd or not
- */
- if (!decomp_param[i].compbuf) {
- break;
- }
- qemu_mutex_lock(&decomp_param[i].mutex);
- decomp_param[i].quit = true;
- qemu_cond_signal(&decomp_param[i].cond);
- qemu_mutex_unlock(&decomp_param[i].mutex);
- }
- for (i = 0; i < thread_count; i++) {
- if (!decomp_param[i].compbuf) {
- break;
- }
+ decomp_file = f;
+ decompress_threads = threaded_workqueue_create("decompress",
+ migrate_decompress_threads(),
+ DEFAULT_THREAD_REQUEST_NR, &decompress_ops);
+ return decompress_threads ? 0 : -1;
+}
- qemu_thread_join(decompress_threads + i);
- qemu_mutex_destroy(&decomp_param[i].mutex);
- qemu_cond_destroy(&decomp_param[i].cond);
- inflateEnd(&decomp_param[i].stream);
- g_free(decomp_param[i].compbuf);
- decomp_param[i].compbuf = NULL;
+static void decompress_fini(void)
+{
+ if (!decompress_threads) {
+ return;
}
- g_free(decompress_threads);
- g_free(decomp_param);
+
+ threaded_workqueue_destroy(decompress_threads);
decompress_threads = NULL;
- decomp_param = NULL;
decomp_file = NULL;
}
-static int compress_threads_load_setup(QEMUFile *f)
+static int flush_decompressed_data(void)
{
- int i, thread_count;
-
if (!migrate_use_compression()) {
return 0;
}
- thread_count = migrate_decompress_threads();
- decompress_threads = g_new0(QemuThread, thread_count);
- decomp_param = g_new0(DecompressParam, thread_count);
- qemu_mutex_init(&decomp_done_lock);
- qemu_cond_init(&decomp_done_cond);
- decomp_file = f;
- for (i = 0; i < thread_count; i++) {
- if (inflateInit(&decomp_param[i].stream) != Z_OK) {
- goto exit;
- }
-
- decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
- qemu_mutex_init(&decomp_param[i].mutex);
- qemu_cond_init(&decomp_param[i].cond);
- decomp_param[i].done = true;
- decomp_param[i].quit = false;
- qemu_thread_create(decompress_threads + i, "decompress",
- do_data_decompress, decomp_param + i,
- QEMU_THREAD_JOINABLE);
- }
- return 0;
-exit:
- compress_threads_load_cleanup();
- return -1;
+ threaded_workqueue_wait_for_requests(decompress_threads);
+ return qemu_file_get_error(decomp_file);
}
static void decompress_data_with_multi_threads(QEMUFile *f,
- void *host, int len)
+ void *host, size_t len)
{
- int idx, thread_count;
+ DecompressData *dd;
- thread_count = migrate_decompress_threads();
- qemu_mutex_lock(&decomp_done_lock);
- while (true) {
- for (idx = 0; idx < thread_count; idx++) {
- if (decomp_param[idx].done) {
- decomp_param[idx].done = false;
- qemu_mutex_lock(&decomp_param[idx].mutex);
- qemu_get_buffer(f, decomp_param[idx].compbuf, len);
- decomp_param[idx].des = host;
- decomp_param[idx].len = len;
- qemu_cond_signal(&decomp_param[idx].cond);
- qemu_mutex_unlock(&decomp_param[idx].mutex);
- break;
- }
- }
- if (idx < thread_count) {
- break;
- } else {
- qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
- }
+retry:
+ dd = threaded_workqueue_get_request(decompress_threads);
+ if (!dd) {
+ goto retry;
}
- qemu_mutex_unlock(&decomp_done_lock);
+
+ dd->des = host;
+ dd->len = len;
+ qemu_get_buffer(f, dd->compbuf, len);
+ threaded_workqueue_submit_request(decompress_threads, dd);
}
/*
@@ -3678,7 +3610,7 @@ void colo_release_ram_cache(void)
*/
static int ram_load_setup(QEMUFile *f, void *opaque)
{
- if (compress_threads_load_setup(f)) {
+ if (decompress_init(f)) {
return -1;
}
@@ -3699,7 +3631,7 @@ static int ram_load_cleanup(void *opaque)
}
xbzrle_load_cleanup();
- compress_threads_load_cleanup();
+ decompress_fini();
RAMBLOCK_FOREACH_MIGRATABLE(rb) {
g_free(rb->receivedmap);
@@ -4101,7 +4033,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
}
}
- ret |= wait_for_decompress_done();
+ ret |= flush_decompressed_data();
rcu_read_unlock();
trace_ram_load_complete(ret, seq_iter);
--
2.14.5
next prev parent reply other threads:[~2018-11-22 7:21 UTC|newest]
Thread overview: 35+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-11-22 7:20 [Qemu-devel] [PATCH v3 0/5] migration: improve multithreads guangrong.xiao
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 1/5] bitops: introduce change_bit_atomic guangrong.xiao
2018-11-23 10:23 ` Dr. David Alan Gilbert
2018-11-28 9:35 ` Juan Quintela
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 2/5] util: introduce threaded workqueue guangrong.xiao
2018-11-23 11:02 ` Dr. David Alan Gilbert
2018-11-26 7:57 ` Xiao Guangrong
2018-11-26 10:56 ` Dr. David Alan Gilbert
2018-11-27 7:17 ` Xiao Guangrong
2018-11-26 18:55 ` Emilio G. Cota
2018-11-27 8:30 ` Xiao Guangrong
2018-11-24 0:12 ` Emilio G. Cota
2018-11-26 8:06 ` Xiao Guangrong
2018-11-26 18:49 ` Emilio G. Cota
2018-11-27 8:29 ` Xiao Guangrong
2018-11-24 0:17 ` Emilio G. Cota
2018-11-26 8:18 ` Xiao Guangrong
2018-11-26 10:28 ` Paolo Bonzini
2018-11-27 8:31 ` Xiao Guangrong
2018-11-27 12:49 ` Christophe de Dinechin
2018-11-27 13:51 ` Paolo Bonzini
2018-12-04 15:49 ` Christophe de Dinechin
2018-12-04 17:16 ` Paolo Bonzini
2018-12-10 3:23 ` Xiao Guangrong
2018-11-27 17:39 ` Emilio G. Cota
2018-11-28 8:55 ` Xiao Guangrong
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 3/5] migration: use threaded workqueue for compression guangrong.xiao
2018-11-23 18:17 ` Dr. David Alan Gilbert
2018-11-23 18:22 ` Paolo Bonzini
2018-11-23 18:29 ` Dr. David Alan Gilbert
2018-11-26 8:00 ` Xiao Guangrong
2018-11-22 7:20 ` guangrong.xiao [this message]
2018-11-22 7:20 ` [Qemu-devel] [PATCH v3 5/5] tests: add threaded-workqueue-bench guangrong.xiao
2018-11-22 21:25 ` [Qemu-devel] [PATCH v3 0/5] migration: improve multithreads no-reply
2018-11-22 21:35 ` no-reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20181122072028.22819-5-xiaoguangrong@tencent.com \
--to=guangrong.xiao@gmail.com \
--cc=cota@braap.org \
--cc=dgilbert@redhat.com \
--cc=eblake@redhat.com \
--cc=jiang.biao2@zte.com.cn \
--cc=kvm@vger.kernel.org \
--cc=mst@redhat.com \
--cc=mtosatti@redhat.com \
--cc=pbonzini@redhat.com \
--cc=peterx@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=quintela@redhat.com \
--cc=wei.w.wang@intel.com \
--cc=xiaoguangrong@tencent.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).