* [PATCH v2 00/13] migration/ram.c: Refactor compress code
@ 2023-04-20 9:47 Lukas Straub
2023-04-20 9:47 ` [PATCH v2 01/13] qtest/migration-test.c: Add postcopy tests with compress enabled Lukas Straub
` (13 more replies)
0 siblings, 14 replies; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:47 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 1933 bytes --]
This series refactors the ram compress code.
It first removes ram.c dependencies from the core compress code, then
moves it out to its own file. Finally, on the migration destination side
the initialisation and cleanup of compress threads is moved out of ram.c
to migration.c. This allows using COLO with compress enabled.
Changes since v1:
- Add postcopy tests with compress enabled
- Use page_size variable in "ram-compress.c: Make target independent"
- Squash "ram.c: Remove unused include after moving out code"
with "ram.c: Move core compression code into its own file"
- Add Reviewed-by: Tags
Lukas Straub (13):
qtest/migration-test.c: Add postcopy tests with compress enabled
ram.c: Let the compress threads return a CompressResult enum
ram.c: Dont change param->block in the compress thread
ram.c: Reset result after sending queued data
ram.c: Do not call save_page_header() from compress threads
ram.c: Call update_compress_thread_counts from
compress_send_queued_data
ram.c: Remove last ram.c dependency from the core compress code
ram.c: Introduce whitespace (squash with next patch)
ram.c: Move core compression code into its own file
ram.c: Remove whitespace (squash with previous patch)
ram.c: Move core decompression code into its own file
ram compress: Assert that the file buffer matches the result
ram-compress.c: Make target independent
migration/meson.build | 5 +-
migration/qemu-file.c | 11 +
migration/qemu-file.h | 1 +
migration/ram-compress.c | 484 ++++++++++++++++++++++++++++++++++
migration/ram-compress.h | 70 +++++
migration/ram.c | 485 +++--------------------------------
tests/qtest/migration-test.c | 83 +++---
7 files changed, 660 insertions(+), 479 deletions(-)
create mode 100644 migration/ram-compress.c
create mode 100644 migration/ram-compress.h
--
2.40.0
* [PATCH v2 01/13] qtest/migration-test.c: Add postcopy tests with compress enabled
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
@ 2023-04-20 9:47 ` Lukas Straub
2023-04-20 10:20 ` Juan Quintela
2023-04-20 9:47 ` [PATCH v2 02/13] ram.c: Let the compress threads return a CompressResult enum Lukas Straub
` (12 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:47 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 5026 bytes --]
Add postcopy tests with compress enabled to ensure nothing breaks
with the refactoring in the next commits.
preempt+compress is blocked, so no test needed for that case.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
tests/qtest/migration-test.c | 83 +++++++++++++++++++++++-------------
1 file changed, 53 insertions(+), 30 deletions(-)
diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 1f2a019ce0..930cb4f29d 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -1127,6 +1127,36 @@ test_migrate_tls_x509_finish(QTestState *from,
#endif /* CONFIG_TASN1 */
#endif /* CONFIG_GNUTLS */
+static void *
+test_migrate_compress_start(QTestState *from,
+ QTestState *to)
+{
+ migrate_set_parameter_int(from, "compress-level", 1);
+ migrate_set_parameter_int(from, "compress-threads", 4);
+ migrate_set_parameter_bool(from, "compress-wait-thread", true);
+ migrate_set_parameter_int(to, "decompress-threads", 4);
+
+ migrate_set_capability(from, "compress", true);
+ migrate_set_capability(to, "compress", true);
+
+ return NULL;
+}
+
+static void *
+test_migrate_compress_nowait_start(QTestState *from,
+ QTestState *to)
+{
+ migrate_set_parameter_int(from, "compress-level", 9);
+ migrate_set_parameter_int(from, "compress-threads", 1);
+ migrate_set_parameter_bool(from, "compress-wait-thread", false);
+ migrate_set_parameter_int(to, "decompress-threads", 1);
+
+ migrate_set_capability(from, "compress", true);
+ migrate_set_capability(to, "compress", true);
+
+ return NULL;
+}
+
static int migrate_postcopy_prepare(QTestState **from_ptr,
QTestState **to_ptr,
MigrateCommon *args)
@@ -1204,6 +1234,15 @@ static void test_postcopy(void)
test_postcopy_common(&args);
}
+static void test_postcopy_compress(void)
+{
+ MigrateCommon args = {
+ .start_hook = test_migrate_compress_start
+ };
+
+ test_postcopy_common(&args);
+}
+
static void test_postcopy_preempt(void)
{
MigrateCommon args = {
@@ -1305,6 +1344,15 @@ static void test_postcopy_recovery(void)
test_postcopy_recovery_common(&args);
}
+static void test_postcopy_recovery_compress(void)
+{
+ MigrateCommon args = {
+ .start_hook = test_migrate_compress_start
+ };
+
+ test_postcopy_recovery_common(&args);
+}
+
#ifdef CONFIG_GNUTLS
static void test_postcopy_recovery_tls_psk(void)
{
@@ -1338,6 +1386,7 @@ static void test_postcopy_preempt_all(void)
test_postcopy_recovery_common(&args);
}
+
#endif
static void test_baddest(void)
@@ -1559,21 +1608,6 @@ static void test_precopy_unix_xbzrle(void)
test_precopy_common(&args);
}
-static void *
-test_migrate_compress_start(QTestState *from,
- QTestState *to)
-{
- migrate_set_parameter_int(from, "compress-level", 1);
- migrate_set_parameter_int(from, "compress-threads", 4);
- migrate_set_parameter_bool(from, "compress-wait-thread", true);
- migrate_set_parameter_int(to, "decompress-threads", 4);
-
- migrate_set_capability(from, "compress", true);
- migrate_set_capability(to, "compress", true);
-
- return NULL;
-}
-
static void test_precopy_unix_compress(void)
{
g_autofree char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
@@ -1591,21 +1625,6 @@ static void test_precopy_unix_compress(void)
test_precopy_common(&args);
}
-static void *
-test_migrate_compress_nowait_start(QTestState *from,
- QTestState *to)
-{
- migrate_set_parameter_int(from, "compress-level", 9);
- migrate_set_parameter_int(from, "compress-threads", 1);
- migrate_set_parameter_bool(from, "compress-wait-thread", false);
- migrate_set_parameter_int(to, "decompress-threads", 1);
-
- migrate_set_capability(from, "compress", true);
- migrate_set_capability(to, "compress", true);
-
- return NULL;
-}
-
static void test_precopy_unix_compress_nowait(void)
{
g_autofree char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
@@ -2604,8 +2623,12 @@ int main(int argc, char **argv)
if (has_uffd) {
qtest_add_func("/migration/postcopy/plain", test_postcopy);
+ qtest_add_func("/migration/postcopy/compress/plain",
+ test_postcopy_compress);
qtest_add_func("/migration/postcopy/recovery/plain",
test_postcopy_recovery);
+ qtest_add_func("/migration/postcopy/recovery/compress/plain",
+ test_postcopy_recovery_compress);
qtest_add_func("/migration/postcopy/preempt/plain", test_postcopy_preempt);
qtest_add_func("/migration/postcopy/preempt/recovery/plain",
test_postcopy_preempt_recovery);
--
2.40.0
* [PATCH v2 02/13] ram.c: Let the compress threads return a CompressResult enum
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
2023-04-20 9:47 ` [PATCH v2 01/13] qtest/migration-test.c: Add postcopy tests with compress enabled Lukas Straub
@ 2023-04-20 9:47 ` Lukas Straub
2023-04-20 21:13 ` Juan Quintela
2023-04-20 9:47 ` [PATCH v2 03/13] ram.c: Dont change param->block in the compress thread Lukas Straub
` (11 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:47 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 4012 bytes --]
This will be used in the next commits to move save_page_header()
out of compress code.
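As an illustration of why a three-valued result is more useful than the old bool, here is a minimal standalone sketch. The enum mirrors the one added below; sketch_page_is_zero(), sketch_compress_page() and the compress_ok flag are hypothetical stand-ins and are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Same three values as the enum added by this patch. */
typedef enum CompressResult {
    RES_NONE = 0,      /* nothing usable was produced, e.g. compression failed */
    RES_ZEROPAGE = 1,  /* page was all zeroes, no payload needed */
    RES_COMPRESS = 2   /* compressed payload is waiting in the thread's buffer */
} CompressResult;

/* Hypothetical stand-in for the real zero-page check. */
static bool sketch_page_is_zero(const uint8_t *page, size_t len)
{
    for (size_t i = 0; i < len; i++) {
        if (page[i] != 0) {
            return false;
        }
    }
    return true;
}

/*
 * Hypothetical worker body: compress_ok simulates whether the
 * compressor succeeded, so no real zlib call is needed here.
 */
static CompressResult sketch_compress_page(const uint8_t *page, size_t len,
                                           bool compress_ok)
{
    if (sketch_page_is_zero(page, len)) {
        return RES_ZEROPAGE;
    }
    if (!compress_ok) {
        return RES_NONE;
    }
    return RES_COMPRESS;
}
```

With a bool, the failure case and the "compressed" case were indistinguishable to the caller; the enum lets the caller tell all three apart.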
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/ram.c | 34 ++++++++++++++++++++++------------
1 file changed, 22 insertions(+), 12 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 79d881f735..ade6638a96 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -493,10 +493,17 @@ MigrationOps *migration_ops;
CompressionStats compression_counters;
+enum CompressResult {
+ RES_NONE = 0,
+ RES_ZEROPAGE = 1,
+ RES_COMPRESS = 2
+};
+typedef enum CompressResult CompressResult;
+
struct CompressParam {
bool done;
bool quit;
- bool zero_page;
+ CompressResult result;
QEMUFile *file;
QemuMutex mutex;
QemuCond cond;
@@ -538,8 +545,9 @@ static QemuCond decomp_done_cond;
static int ram_save_host_page_urgent(PageSearchStatus *pss);
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset, uint8_t *source_buf);
+static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
+ RAMBlock *block, ram_addr_t offset,
+ uint8_t *source_buf);
/* NOTE: page is the PFN not real ram_addr_t. */
static void pss_init(PageSearchStatus *pss, RAMBlock *rb, ram_addr_t page)
@@ -564,7 +572,7 @@ static void *do_data_compress(void *opaque)
CompressParam *param = opaque;
RAMBlock *block;
ram_addr_t offset;
- bool zero_page;
+ CompressResult result;
qemu_mutex_lock(&param->mutex);
while (!param->quit) {
@@ -574,12 +582,12 @@ static void *do_data_compress(void *opaque)
param->block = NULL;
qemu_mutex_unlock(&param->mutex);
- zero_page = do_compress_ram_page(param->file, &param->stream,
- block, offset, param->originbuf);
+ result = do_compress_ram_page(param->file, &param->stream,
+ block, offset, param->originbuf);
qemu_mutex_lock(&comp_done_lock);
param->done = true;
- param->zero_page = zero_page;
+ param->result = result;
qemu_cond_signal(&comp_done_cond);
qemu_mutex_unlock(&comp_done_lock);
@@ -1463,8 +1471,9 @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block,
return 1;
}
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset, uint8_t *source_buf)
+static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
+ RAMBlock *block, ram_addr_t offset,
+ uint8_t *source_buf)
{
RAMState *rs = ram_state;
PageSearchStatus *pss = &rs->pss[RAM_CHANNEL_PRECOPY];
@@ -1472,7 +1481,7 @@ static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
int ret;
if (save_zero_page_to_file(pss, f, block, offset)) {
- return true;
+ return RES_ZEROPAGE;
}
save_page_header(pss, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
@@ -1487,8 +1496,9 @@ static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
if (ret < 0) {
qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
error_report("compressed data failed!");
+ return RES_NONE;
}
- return false;
+ return RES_COMPRESS;
}
static void
@@ -1496,7 +1506,7 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
{
ram_transferred_add(bytes_xmit);
- if (param->zero_page) {
+ if (param->result == RES_ZEROPAGE) {
stat64_add(&ram_atomic_counters.duplicate, 1);
return;
}
--
2.40.0
* [PATCH v2 03/13] ram.c: Dont change param->block in the compress thread
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
2023-04-20 9:47 ` [PATCH v2 01/13] qtest/migration-test.c: Add postcopy tests with compress enabled Lukas Straub
2023-04-20 9:47 ` [PATCH v2 02/13] ram.c: Let the compress threads return a CompressResult enum Lukas Straub
@ 2023-04-20 9:47 ` Lukas Straub
2023-04-20 21:13 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 04/13] ram.c: Reset result after sending queued data Lukas Straub
` (10 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:47 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 1500 bytes --]
Instead, introduce an extra parameter to trigger the compress thread.
Now, when the compress thread is done, we know which RAMBlock and
offset it compressed.
This will be used in the next commits to move save_page_header()
out of compress code.
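A minimal sketch of the idea, assuming a simplified parameter struct: SketchParam, sketch_submit() and sketch_take_request() are hypothetical names, the block is represented by a string instead of a RAMBlock pointer, and the locking around param->mutex is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, reduced version of the per-thread parameters. */
typedef struct SketchParam {
    bool trigger;        /* set by the submitter, cleared by the worker */
    const char *block;   /* stands in for RAMBlock * */
    uint64_t offset;
} SketchParam;

/* Submitter side: record the request and raise the trigger. */
static void sketch_submit(SketchParam *p, const char *block, uint64_t offset)
{
    p->block = block;
    p->offset = offset;
    p->trigger = true;
}

/*
 * Worker side: pick up a pending request. Only the trigger is
 * cleared, so block and offset stay readable after the work is done.
 */
static bool sketch_take_request(SketchParam *p, const char **block,
                                uint64_t *offset)
{
    if (!p->trigger) {
        return false;
    }
    *block = p->block;
    *offset = p->offset;
    p->trigger = false;
    return true;
}
```

Because the worker no longer overwrites block with NULL, the submitter can later read back which page a finished result belongs to.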
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/ram.c | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index ade6638a96..820b4ebaeb 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -503,6 +503,7 @@ typedef enum CompressResult CompressResult;
struct CompressParam {
bool done;
bool quit;
+ bool trigger;
CompressResult result;
QEMUFile *file;
QemuMutex mutex;
@@ -576,10 +577,10 @@ static void *do_data_compress(void *opaque)
qemu_mutex_lock(&param->mutex);
while (!param->quit) {
- if (param->block) {
+ if (param->trigger) {
block = param->block;
offset = param->offset;
- param->block = NULL;
+ param->trigger = false;
qemu_mutex_unlock(&param->mutex);
result = do_compress_ram_page(param->file, &param->stream,
@@ -1556,6 +1557,7 @@ static inline void set_compress_params(CompressParam *param, RAMBlock *block,
{
param->block = block;
param->offset = offset;
+ param->trigger = true;
}
static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
--
2.40.0
* [PATCH v2 04/13] ram.c: Reset result after sending queued data
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (2 preceding siblings ...)
2023-04-20 9:47 ` [PATCH v2 03/13] ram.c: Dont change param->block in the compress thread Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 11:59 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 05/13] ram.c: Do not call save_page_header() from compress threads Lukas Straub
` (9 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 3236 bytes --]
And take the param->mutex lock for the whole section to ensure
thread-safety.
Now, it is explicitly clear if there is no queued data to send.
Before, this was handled by param->file stream being empty and thus
qemu_put_qemu_file() not sending anything.
This will be used in the next commits to move save_page_header()
out of compress code.
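A rough sketch of the "explicit no queued data" state, under stated assumptions: SketchParam, its queued_bytes field and sketch_send_and_reset() are hypothetical, and the check on the result is done here for illustration only (in this patch the send itself is still unconditional). Locking is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum CompressResult {
    RES_NONE = 0,
    RES_ZEROPAGE = 1,
    RES_COMPRESS = 2
} CompressResult;

/* Hypothetical, reduced version of the per-thread parameters. */
typedef struct SketchParam {
    CompressResult result;
    const char *block;   /* stands in for RAMBlock * */
    uint64_t offset;
    int queued_bytes;    /* hypothetical: bytes waiting in the buffer */
} SketchParam;

/* Same three assignments as compress_reset_result() in the patch. */
static void sketch_reset_result(SketchParam *p)
{
    p->result = RES_NONE;
    p->block = NULL;
    p->offset = 0;
}

/*
 * Send whatever is queued, then mark the slot as empty so that a
 * later flush can see there is nothing left to send.
 */
static int sketch_send_and_reset(SketchParam *p)
{
    int sent = 0;

    if (p->result != RES_NONE) {
        sent = p->queued_bytes;
        p->queued_bytes = 0;
    }
    sketch_reset_result(p);
    return sent;
}
```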
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/ram.c | 32 ++++++++++++++++++++++----------
1 file changed, 22 insertions(+), 10 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 820b4ebaeb..5ca0f115cf 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1519,6 +1519,13 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
static bool save_page_use_compression(RAMState *rs);
+static inline void compress_reset_result(CompressParam *param)
+{
+ param->result = RES_NONE;
+ param->block = NULL;
+ param->offset = 0;
+}
+
static void flush_compressed_data(RAMState *rs)
{
MigrationState *ms = migrate_get_current();
@@ -1540,13 +1547,16 @@ static void flush_compressed_data(RAMState *rs)
for (idx = 0; idx < thread_count; idx++) {
qemu_mutex_lock(&comp_param[idx].mutex);
if (!comp_param[idx].quit) {
- len = qemu_put_qemu_file(ms->to_dst_file, comp_param[idx].file);
+ CompressParam *param = &comp_param[idx];
+ len = qemu_put_qemu_file(ms->to_dst_file, param->file);
+ compress_reset_result(param);
+
/*
* it's safe to fetch zero_page without holding comp_done_lock
* as there is no further request submitted to the thread,
* i.e, the thread should be waiting for a request at this point.
*/
- update_compress_thread_counts(&comp_param[idx], len);
+ update_compress_thread_counts(param, len);
}
qemu_mutex_unlock(&comp_param[idx].mutex);
}
@@ -1571,15 +1581,17 @@ static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
retry:
for (idx = 0; idx < thread_count; idx++) {
if (comp_param[idx].done) {
- comp_param[idx].done = false;
- bytes_xmit = qemu_put_qemu_file(ms->to_dst_file,
- comp_param[idx].file);
- qemu_mutex_lock(&comp_param[idx].mutex);
- set_compress_params(&comp_param[idx], block, offset);
- qemu_cond_signal(&comp_param[idx].cond);
- qemu_mutex_unlock(&comp_param[idx].mutex);
+ CompressParam *param = &comp_param[idx];
+ qemu_mutex_lock(&param->mutex);
+ param->done = false;
+ bytes_xmit = qemu_put_qemu_file(ms->to_dst_file, param->file);
+ compress_reset_result(param);
+ set_compress_params(param, block, offset);
+
+ update_compress_thread_counts(param, bytes_xmit);
+ qemu_cond_signal(&param->cond);
+ qemu_mutex_unlock(&param->mutex);
pages = 1;
- update_compress_thread_counts(&comp_param[idx], bytes_xmit);
break;
}
}
--
2.40.0
* [PATCH v2 05/13] ram.c: Do not call save_page_header() from compress threads
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (3 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 04/13] ram.c: Reset result after sending queued data Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 12:02 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 06/13] ram.c: Call update_compress_thread_counts from compress_send_queued_data Lukas Straub
` (8 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 3820 bytes --]
save_page_header() accesses several global variables, so calling it
from multiple threads is pretty ugly.
Instead, call save_page_header() before writing out the compressed
data from the compress buffer to the migration stream.
This also makes the core compress code more independent from ram.c.
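The split of responsibilities can be sketched roughly as follows. This is not the QEMUFile API: SketchStream, sketch_put() and the one-byte 'Z' and 'C' headers are hypothetical placeholders for the migration stream, the file helpers and the real page headers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef enum CompressResult {
    RES_NONE = 0,
    RES_ZEROPAGE = 1,
    RES_COMPRESS = 2
} CompressResult;

/* Hypothetical stand-in for the migration stream. */
typedef struct SketchStream {
    uint8_t buf[64];
    size_t len;
} SketchStream;

/* Append n bytes to the stream and return how many were written. */
static size_t sketch_put(SketchStream *s, const uint8_t *data, size_t n)
{
    assert(s->len + n <= sizeof(s->buf));
    memcpy(s->buf + s->len, data, n);
    s->len += n;
    return n;
}

/*
 * Runs in the sending thread only: the header is written here, right
 * before the payload that a worker left behind in its private buffer.
 */
static size_t sketch_send_queued(SketchStream *out, CompressResult result,
                                 const uint8_t *payload, size_t payload_len)
{
    const uint8_t zero_header = 'Z';      /* placeholder header */
    const uint8_t compress_header = 'C';  /* placeholder header */
    const uint8_t zero_byte = 0;
    size_t len = 0;

    switch (result) {
    case RES_NONE:
        return 0;                         /* nothing queued */
    case RES_ZEROPAGE:
        len += sketch_put(out, &zero_header, 1);
        len += sketch_put(out, &zero_byte, 1);
        break;
    case RES_COMPRESS:
        len += sketch_put(out, &compress_header, 1);
        len += sketch_put(out, payload, payload_len);
        break;
    }
    return len;
}
```

The workers never touch the shared stream or its header state in this arrangement; they only report a result and leave a payload behind.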
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/ram.c | 44 +++++++++++++++++++++++++++++++++++---------
1 file changed, 35 insertions(+), 9 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 5ca0f115cf..d248e1f062 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1476,17 +1476,13 @@ static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
RAMBlock *block, ram_addr_t offset,
uint8_t *source_buf)
{
- RAMState *rs = ram_state;
- PageSearchStatus *pss = &rs->pss[RAM_CHANNEL_PRECOPY];
uint8_t *p = block->host + offset;
int ret;
- if (save_zero_page_to_file(pss, f, block, offset)) {
+ if (buffer_is_zero(p, TARGET_PAGE_SIZE)) {
return RES_ZEROPAGE;
}
- save_page_header(pss, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
-
/*
* copy it to a internal buffer to avoid it being modified by VM
* so that we can catch up the error during compression and
@@ -1526,9 +1522,40 @@ static inline void compress_reset_result(CompressParam *param)
param->offset = 0;
}
-static void flush_compressed_data(RAMState *rs)
+static int send_queued_data(CompressParam *param)
{
+ PageSearchStatus *pss = &ram_state->pss[RAM_CHANNEL_PRECOPY];
MigrationState *ms = migrate_get_current();
+ QEMUFile *file = ms->to_dst_file;
+ int len = 0;
+
+ RAMBlock *block = param->block;
+ ram_addr_t offset = param->offset;
+
+ if (param->result == RES_NONE) {
+ return 0;
+ }
+
+ assert(block == pss->last_sent_block);
+
+ if (param->result == RES_ZEROPAGE) {
+ len += save_page_header(pss, file, block, offset | RAM_SAVE_FLAG_ZERO);
+ qemu_put_byte(file, 0);
+ len += 1;
+ ram_release_page(block->idstr, offset);
+ } else if (param->result == RES_COMPRESS) {
+ len += save_page_header(pss, file, block,
+ offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
+ len += qemu_put_qemu_file(file, param->file);
+ } else {
+ abort();
+ }
+
+ return len;
+}
+
+static void flush_compressed_data(RAMState *rs)
+{
int idx, len, thread_count;
if (!save_page_use_compression(rs)) {
@@ -1548,7 +1575,7 @@ static void flush_compressed_data(RAMState *rs)
qemu_mutex_lock(&comp_param[idx].mutex);
if (!comp_param[idx].quit) {
CompressParam *param = &comp_param[idx];
- len = qemu_put_qemu_file(ms->to_dst_file, param->file);
+ len = send_queued_data(param);
compress_reset_result(param);
/*
@@ -1574,7 +1601,6 @@ static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
{
int idx, thread_count, bytes_xmit = -1, pages = -1;
bool wait = migrate_compress_wait_thread();
- MigrationState *ms = migrate_get_current();
thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
@@ -1584,7 +1610,7 @@ retry:
CompressParam *param = &comp_param[idx];
qemu_mutex_lock(&param->mutex);
param->done = false;
- bytes_xmit = qemu_put_qemu_file(ms->to_dst_file, param->file);
+ bytes_xmit = send_queued_data(param);
compress_reset_result(param);
set_compress_params(param, block, offset);
--
2.40.0
* [PATCH v2 06/13] ram.c: Call update_compress_thread_counts from compress_send_queued_data
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (4 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 05/13] ram.c: Do not call save_page_header() from compress threads Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 12:10 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 07/13] ram.c: Remove last ram.c dependency from the core compress code Lukas Straub
` (7 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 2399 bytes --]
This makes the core compress code more independent from ram.c.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/ram.c | 18 ++++++------------
1 file changed, 6 insertions(+), 12 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index d248e1f062..3894d0ae79 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1551,12 +1551,14 @@ static int send_queued_data(CompressParam *param)
abort();
}
+ update_compress_thread_counts(param, len);
+
return len;
}
static void flush_compressed_data(RAMState *rs)
{
- int idx, len, thread_count;
+ int idx, thread_count;
if (!save_page_use_compression(rs)) {
return;
@@ -1575,15 +1577,8 @@ static void flush_compressed_data(RAMState *rs)
qemu_mutex_lock(&comp_param[idx].mutex);
if (!comp_param[idx].quit) {
CompressParam *param = &comp_param[idx];
- len = send_queued_data(param);
+ send_queued_data(param);
compress_reset_result(param);
-
- /*
- * it's safe to fetch zero_page without holding comp_done_lock
- * as there is no further request submitted to the thread,
- * i.e, the thread should be waiting for a request at this point.
- */
- update_compress_thread_counts(param, len);
}
qemu_mutex_unlock(&comp_param[idx].mutex);
}
@@ -1599,7 +1594,7 @@ static inline void set_compress_params(CompressParam *param, RAMBlock *block,
static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
{
- int idx, thread_count, bytes_xmit = -1, pages = -1;
+ int idx, thread_count, pages = -1;
bool wait = migrate_compress_wait_thread();
thread_count = migrate_compress_threads();
@@ -1610,11 +1605,10 @@ retry:
CompressParam *param = &comp_param[idx];
qemu_mutex_lock(&param->mutex);
param->done = false;
- bytes_xmit = send_queued_data(param);
+ send_queued_data(param);
compress_reset_result(param);
set_compress_params(param, block, offset);
- update_compress_thread_counts(param, bytes_xmit);
qemu_cond_signal(&param->cond);
qemu_mutex_unlock(&param->mutex);
pages = 1;
--
2.40.0
* [PATCH v2 07/13] ram.c: Remove last ram.c dependency from the core compress code
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (5 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 06/13] ram.c: Call update_compress_thread_counts from compress_send_queued_data Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 12:14 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 08/13] ram.c: Introduce whitespace (squash with next patch) Lukas Straub
` (6 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 3339 bytes --]
Make compression interfaces take send_queued_data() as an argument.
Remove save_page_use_compression() from flush_compressed_data().
This removes the last ram.c dependency from the core compress code.
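The callback pattern can be sketched in isolation like this. SketchParam, SketchSendFn, sketch_flush() and sketch_send() are hypothetical names, and the flush returns a byte total only to make the sketch easy to check (the real function returns void). The patch declares the parameter with a function type, which C adjusts to a pointer to function; the sketch spells the pointer out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, reduced version of the per-thread parameters. */
typedef struct SketchParam {
    int queued_bytes;
    bool quit;
} SketchParam;

/* The generic code only knows this signature, not who implements it. */
typedef int (*SketchSendFn)(SketchParam *param);

/*
 * Generic flush: walks all slots and hands each live one to the
 * caller-supplied send function.
 */
static int sketch_flush(SketchParam *params, int count, SketchSendFn send)
{
    int total = 0;

    for (int i = 0; i < count; i++) {
        if (!params[i].quit) {
            total += send(&params[i]);
        }
    }
    return total;
}

/* Caller-specific implementation, playing the role of the ram.c side. */
static int sketch_send(SketchParam *param)
{
    int n = param->queued_bytes;

    param->queued_bytes = 0;
    return n;
}
```

The generic flush has no compile-time dependency on the sender, which is what allows the two halves to live in separate files.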
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/ram.c | 27 +++++++++++++++++----------
1 file changed, 17 insertions(+), 10 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 3894d0ae79..bd3773d4c4 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1556,13 +1556,10 @@ static int send_queued_data(CompressParam *param)
return len;
}
-static void flush_compressed_data(RAMState *rs)
+static void flush_compressed_data(int (send_queued_data(CompressParam *)))
{
int idx, thread_count;
- if (!save_page_use_compression(rs)) {
- return;
- }
thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
@@ -1584,6 +1581,15 @@ static void flush_compressed_data(RAMState *rs)
}
}
+static void ram_flush_compressed_data(RAMState *rs)
+{
+ if (!save_page_use_compression(rs)) {
+ return;
+ }
+
+ flush_compressed_data(send_queued_data);
+}
+
static inline void set_compress_params(CompressParam *param, RAMBlock *block,
ram_addr_t offset)
{
@@ -1592,7 +1598,8 @@ static inline void set_compress_params(CompressParam *param, RAMBlock *block,
param->trigger = true;
}
-static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
+static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
+ int (send_queued_data(CompressParam *)))
{
int idx, thread_count, pages = -1;
bool wait = migrate_compress_wait_thread();
@@ -1673,7 +1680,7 @@ static int find_dirty_block(RAMState *rs, PageSearchStatus *pss)
* Also If xbzrle is on, stop using the data compression at this
* point. In theory, xbzrle can do better than compression.
*/
- flush_compressed_data(rs);
+ ram_flush_compressed_data(rs);
/* Hit the end of the list */
pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
@@ -2363,11 +2370,11 @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss,
* much CPU resource.
*/
if (block != pss->last_sent_block) {
- flush_compressed_data(rs);
+ ram_flush_compressed_data(rs);
return false;
}
- if (compress_page_with_multi_thread(block, offset) > 0) {
+ if (compress_page_with_multi_thread(block, offset, send_queued_data) > 0) {
return true;
}
@@ -3419,7 +3426,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
* page is sent in one chunk.
*/
if (migrate_postcopy_ram()) {
- flush_compressed_data(rs);
+ ram_flush_compressed_data(rs);
}
/*
@@ -3512,7 +3519,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
}
qemu_mutex_unlock(&rs->bitmap_mutex);
- flush_compressed_data(rs);
+ ram_flush_compressed_data(rs);
ram_control_after_iterate(f, RAM_CONTROL_FINISH);
}
--
2.40.0
* [PATCH v2 08/13] ram.c: Introduce whitespace (squash with next patch)
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (6 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 07/13] ram.c: Remove last ram.c dependency from the core compress code Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 16:13 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 09/13] ram.c: Move core compression code into its own file Lukas Straub
` (5 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 747 bytes --]
Introduce whitespace to make it easier to reroll the series.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/ram.c | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/migration/ram.c b/migration/ram.c
index bd3773d4c4..b95c5c720d 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -602,6 +602,12 @@ static void *do_data_compress(void *opaque)
return NULL;
}
+
+
+/* split */
+
+
+
static void compress_threads_save_cleanup(void)
{
int i, thread_count;
@@ -641,6 +647,12 @@ static void compress_threads_save_cleanup(void)
comp_param = NULL;
}
+
+
+/* split */
+
+
+
static int compress_threads_save_setup(void)
{
int i, thread_count;
--
2.40.0
* [PATCH v2 09/13] ram.c: Move core compression code into its own file
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (7 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 08/13] ram.c: Introduce whitespace (squash with next patch) Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-20 9:48 ` [PATCH v2 10/13] ram.c: Remove whitespace (squash with previous patch) Lukas Straub
` (4 subsequent siblings)
13 siblings, 0 replies; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 23121 bytes --]
No functional changes intended.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/meson.build | 5 +-
migration/ram-compress.c | 273 +++++++++++++++++++++++++++++++++++++++
migration/ram-compress.h | 65 ++++++++++
migration/ram.c | 256 +-----------------------------------
4 files changed, 343 insertions(+), 256 deletions(-)
create mode 100644 migration/ram-compress.c
create mode 100644 migration/ram-compress.h
diff --git a/migration/meson.build b/migration/meson.build
index 0d1bb9f96e..262e3c9754 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -36,4 +36,7 @@ endif
softmmu_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
specific_ss.add(when: 'CONFIG_SOFTMMU',
- if_true: files('dirtyrate.c', 'ram.c', 'target.c'))
+ if_true: files('dirtyrate.c',
+ 'ram.c',
+ 'ram-compress.c',
+ 'target.c'))
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
new file mode 100644
index 0000000000..77902a1d65
--- /dev/null
+++ b/migration/ram-compress.c
@@ -0,0 +1,273 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2011-2015 Red Hat Inc
+ *
+ * Authors:
+ * Juan Quintela <quintela@redhat.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/cutils.h"
+
+#include "ram-compress.h"
+
+#include "qemu/error-report.h"
+#include "migration.h"
+#include "io/channel-null.h"
+#include "exec/ram_addr.h"
+
+CompressionStats compression_counters;
+
+static CompressParam *comp_param;
+static QemuThread *compress_threads;
+/* comp_done_cond is used to wake up the migration thread when
+ * one of the compression threads has finished the compression.
+ * comp_done_lock is used to co-work with comp_done_cond.
+ */
+static QemuMutex comp_done_lock;
+static QemuCond comp_done_cond;
+
+static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
+ RAMBlock *block, ram_addr_t offset,
+ uint8_t *source_buf);
+
+static void *do_data_compress(void *opaque)
+{
+ CompressParam *param = opaque;
+ RAMBlock *block;
+ ram_addr_t offset;
+ CompressResult result;
+
+ qemu_mutex_lock(&param->mutex);
+ while (!param->quit) {
+ if (param->trigger) {
+ block = param->block;
+ offset = param->offset;
+ param->trigger = false;
+ qemu_mutex_unlock(&param->mutex);
+
+ result = do_compress_ram_page(param->file, &param->stream,
+ block, offset, param->originbuf);
+
+ qemu_mutex_lock(&comp_done_lock);
+ param->done = true;
+ param->result = result;
+ qemu_cond_signal(&comp_done_cond);
+ qemu_mutex_unlock(&comp_done_lock);
+
+ qemu_mutex_lock(&param->mutex);
+ } else {
+ qemu_cond_wait(&param->cond, &param->mutex);
+ }
+ }
+ qemu_mutex_unlock(&param->mutex);
+
+ return NULL;
+}
+
+void compress_threads_save_cleanup(void)
+{
+ int i, thread_count;
+
+ if (!migrate_use_compression() || !comp_param) {
+ return;
+ }
+
+ thread_count = migrate_compress_threads();
+ for (i = 0; i < thread_count; i++) {
+ /*
+ * we use it as a indicator which shows if the thread is
+ * properly init'd or not
+ */
+ if (!comp_param[i].file) {
+ break;
+ }
+
+ qemu_mutex_lock(&comp_param[i].mutex);
+ comp_param[i].quit = true;
+ qemu_cond_signal(&comp_param[i].cond);
+ qemu_mutex_unlock(&comp_param[i].mutex);
+
+ qemu_thread_join(compress_threads + i);
+ qemu_mutex_destroy(&comp_param[i].mutex);
+ qemu_cond_destroy(&comp_param[i].cond);
+ deflateEnd(&comp_param[i].stream);
+ g_free(comp_param[i].originbuf);
+ qemu_fclose(comp_param[i].file);
+ comp_param[i].file = NULL;
+ }
+ qemu_mutex_destroy(&comp_done_lock);
+ qemu_cond_destroy(&comp_done_cond);
+ g_free(compress_threads);
+ g_free(comp_param);
+ compress_threads = NULL;
+ comp_param = NULL;
+}
+
+int compress_threads_save_setup(void)
+{
+ int i, thread_count;
+
+ if (!migrate_use_compression()) {
+ return 0;
+ }
+ thread_count = migrate_compress_threads();
+ compress_threads = g_new0(QemuThread, thread_count);
+ comp_param = g_new0(CompressParam, thread_count);
+ qemu_cond_init(&comp_done_cond);
+ qemu_mutex_init(&comp_done_lock);
+ for (i = 0; i < thread_count; i++) {
+ comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+ if (!comp_param[i].originbuf) {
+ goto exit;
+ }
+
+ if (deflateInit(&comp_param[i].stream,
+ migrate_compress_level()) != Z_OK) {
+ g_free(comp_param[i].originbuf);
+ goto exit;
+ }
+
+ /* comp_param[i].file is just used as a dummy buffer to save data,
+ * set its ops to empty.
+ */
+ comp_param[i].file = qemu_file_new_output(
+ QIO_CHANNEL(qio_channel_null_new()));
+ comp_param[i].done = true;
+ comp_param[i].quit = false;
+ qemu_mutex_init(&comp_param[i].mutex);
+ qemu_cond_init(&comp_param[i].cond);
+ qemu_thread_create(compress_threads + i, "compress",
+ do_data_compress, comp_param + i,
+ QEMU_THREAD_JOINABLE);
+ }
+ return 0;
+
+exit:
+ compress_threads_save_cleanup();
+ return -1;
+}
+
+static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
+ RAMBlock *block, ram_addr_t offset,
+ uint8_t *source_buf)
+{
+ uint8_t *p = block->host + offset;
+ int ret;
+
+ if (buffer_is_zero(p, TARGET_PAGE_SIZE)) {
+ return RES_ZEROPAGE;
+ }
+
+ /*
+ * copy it to a internal buffer to avoid it being modified by VM
+ * so that we can catch up the error during compression and
+ * decompression
+ */
+ memcpy(source_buf, p, TARGET_PAGE_SIZE);
+ ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
+ if (ret < 0) {
+ qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
+ error_report("compressed data failed!");
+ return RES_NONE;
+ }
+ return RES_COMPRESS;
+}
+
+static inline void compress_reset_result(CompressParam *param)
+{
+ param->result = RES_NONE;
+ param->block = NULL;
+ param->offset = 0;
+}
+
+void flush_compressed_data(int (send_queued_data(CompressParam *)))
+{
+ int idx, thread_count;
+
+ thread_count = migrate_compress_threads();
+
+ qemu_mutex_lock(&comp_done_lock);
+ for (idx = 0; idx < thread_count; idx++) {
+ while (!comp_param[idx].done) {
+ qemu_cond_wait(&comp_done_cond, &comp_done_lock);
+ }
+ }
+ qemu_mutex_unlock(&comp_done_lock);
+
+ for (idx = 0; idx < thread_count; idx++) {
+ qemu_mutex_lock(&comp_param[idx].mutex);
+ if (!comp_param[idx].quit) {
+ CompressParam *param = &comp_param[idx];
+ send_queued_data(param);
+ compress_reset_result(param);
+ }
+ qemu_mutex_unlock(&comp_param[idx].mutex);
+ }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+ ram_addr_t offset)
+{
+ param->block = block;
+ param->offset = offset;
+ param->trigger = true;
+}
+
+int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
+ int (send_queued_data(CompressParam *)))
+{
+ int idx, thread_count, pages = -1;
+ bool wait = migrate_compress_wait_thread();
+
+ thread_count = migrate_compress_threads();
+ qemu_mutex_lock(&comp_done_lock);
+retry:
+ for (idx = 0; idx < thread_count; idx++) {
+ if (comp_param[idx].done) {
+ CompressParam *param = &comp_param[idx];
+ qemu_mutex_lock(&param->mutex);
+ param->done = false;
+ send_queued_data(param);
+ compress_reset_result(param);
+ set_compress_params(param, block, offset);
+
+ qemu_cond_signal(&param->cond);
+ qemu_mutex_unlock(&param->mutex);
+ pages = 1;
+ break;
+ }
+ }
+
+ /*
+ * wait for the free thread if the user specifies 'compress-wait-thread',
+ * otherwise we will post the page out in the main thread as normal page.
+ */
+ if (pages < 0 && wait) {
+ qemu_cond_wait(&comp_done_cond, &comp_done_lock);
+ goto retry;
+ }
+ qemu_mutex_unlock(&comp_done_lock);
+
+ return pages;
+}
diff --git a/migration/ram-compress.h b/migration/ram-compress.h
new file mode 100644
index 0000000000..06570a799c
--- /dev/null
+++ b/migration/ram-compress.h
@@ -0,0 +1,65 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2011-2015 Red Hat Inc
+ *
+ * Authors:
+ * Juan Quintela <quintela@redhat.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef QEMU_MIGRATION_COMPRESS_H
+#define QEMU_MIGRATION_COMPRESS_H
+
+#include "qemu-file.h"
+
+enum CompressResult {
+ RES_NONE = 0,
+ RES_ZEROPAGE = 1,
+ RES_COMPRESS = 2
+};
+typedef enum CompressResult CompressResult;
+
+struct CompressParam {
+ bool done;
+ bool quit;
+ bool trigger;
+ CompressResult result;
+ QEMUFile *file;
+ QemuMutex mutex;
+ QemuCond cond;
+ RAMBlock *block;
+ ram_addr_t offset;
+
+ /* internally used fields */
+ z_stream stream;
+ uint8_t *originbuf;
+};
+typedef struct CompressParam CompressParam;
+
+void compress_threads_save_cleanup(void);
+int compress_threads_save_setup(void);
+
+void flush_compressed_data(int (send_queued_data(CompressParam *)));
+int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
+ int (send_queued_data(CompressParam *)));
+
+#endif
diff --git a/migration/ram.c b/migration/ram.c
index b95c5c720d..42d6a54132 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -32,8 +32,8 @@
#include "qemu/bitmap.h"
#include "qemu/madvise.h"
#include "qemu/main-loop.h"
-#include "io/channel-null.h"
#include "xbzrle.h"
+#include "ram-compress.h"
#include "ram.h"
#include "migration.h"
#include "migration/register.h"
@@ -491,32 +491,6 @@ typedef struct MigrationOps MigrationOps;
MigrationOps *migration_ops;
-CompressionStats compression_counters;
-
-enum CompressResult {
- RES_NONE = 0,
- RES_ZEROPAGE = 1,
- RES_COMPRESS = 2
-};
-typedef enum CompressResult CompressResult;
-
-struct CompressParam {
- bool done;
- bool quit;
- bool trigger;
- CompressResult result;
- QEMUFile *file;
- QemuMutex mutex;
- QemuCond cond;
- RAMBlock *block;
- ram_addr_t offset;
-
- /* internally used fields */
- z_stream stream;
- uint8_t *originbuf;
-};
-typedef struct CompressParam CompressParam;
-
struct DecompressParam {
bool done;
bool quit;
@@ -529,15 +503,6 @@ struct DecompressParam {
};
typedef struct DecompressParam DecompressParam;
-static CompressParam *comp_param;
-static QemuThread *compress_threads;
-/* comp_done_cond is used to wake up the migration thread when
- * one of the compression threads has finished the compression.
- * comp_done_lock is used to co-work with comp_done_cond.
- */
-static QemuMutex comp_done_lock;
-static QemuCond comp_done_cond;
-
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
@@ -546,10 +511,6 @@ static QemuCond decomp_done_cond;
static int ram_save_host_page_urgent(PageSearchStatus *pss);
-static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
- RAMBlock *block, ram_addr_t offset,
- uint8_t *source_buf);
-
/* NOTE: page is the PFN not real ram_addr_t. */
static void pss_init(PageSearchStatus *pss, RAMBlock *rb, ram_addr_t page)
{
@@ -568,39 +529,7 @@ static bool pss_overlap(PageSearchStatus *pss1, PageSearchStatus *pss2)
(pss1->host_page_start == pss2->host_page_start);
}
-static void *do_data_compress(void *opaque)
-{
- CompressParam *param = opaque;
- RAMBlock *block;
- ram_addr_t offset;
- CompressResult result;
- qemu_mutex_lock(&param->mutex);
- while (!param->quit) {
- if (param->trigger) {
- block = param->block;
- offset = param->offset;
- param->trigger = false;
- qemu_mutex_unlock(&param->mutex);
-
- result = do_compress_ram_page(param->file, &param->stream,
- block, offset, param->originbuf);
-
- qemu_mutex_lock(&comp_done_lock);
- param->done = true;
- param->result = result;
- qemu_cond_signal(&comp_done_cond);
- qemu_mutex_unlock(&comp_done_lock);
-
- qemu_mutex_lock(&param->mutex);
- } else {
- qemu_cond_wait(&param->cond, &param->mutex);
- }
- }
- qemu_mutex_unlock(&param->mutex);
-
- return NULL;
-}
@@ -608,44 +537,7 @@ static void *do_data_compress(void *opaque)
-static void compress_threads_save_cleanup(void)
-{
- int i, thread_count;
-
- if (!migrate_use_compression() || !comp_param) {
- return;
- }
-
- thread_count = migrate_compress_threads();
- for (i = 0; i < thread_count; i++) {
- /*
- * we use it as a indicator which shows if the thread is
- * properly init'd or not
- */
- if (!comp_param[i].file) {
- break;
- }
-
- qemu_mutex_lock(&comp_param[i].mutex);
- comp_param[i].quit = true;
- qemu_cond_signal(&comp_param[i].cond);
- qemu_mutex_unlock(&comp_param[i].mutex);
- qemu_thread_join(compress_threads + i);
- qemu_mutex_destroy(&comp_param[i].mutex);
- qemu_cond_destroy(&comp_param[i].cond);
- deflateEnd(&comp_param[i].stream);
- g_free(comp_param[i].originbuf);
- qemu_fclose(comp_param[i].file);
- comp_param[i].file = NULL;
- }
- qemu_mutex_destroy(&comp_done_lock);
- qemu_cond_destroy(&comp_done_cond);
- g_free(compress_threads);
- g_free(comp_param);
- compress_threads = NULL;
- comp_param = NULL;
-}
@@ -653,49 +545,7 @@ static void compress_threads_save_cleanup(void)
-static int compress_threads_save_setup(void)
-{
- int i, thread_count;
- if (!migrate_use_compression()) {
- return 0;
- }
- thread_count = migrate_compress_threads();
- compress_threads = g_new0(QemuThread, thread_count);
- comp_param = g_new0(CompressParam, thread_count);
- qemu_cond_init(&comp_done_cond);
- qemu_mutex_init(&comp_done_lock);
- for (i = 0; i < thread_count; i++) {
- comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
- if (!comp_param[i].originbuf) {
- goto exit;
- }
-
- if (deflateInit(&comp_param[i].stream,
- migrate_compress_level()) != Z_OK) {
- g_free(comp_param[i].originbuf);
- goto exit;
- }
-
- /* comp_param[i].file is just used as a dummy buffer to save data,
- * set its ops to empty.
- */
- comp_param[i].file = qemu_file_new_output(
- QIO_CHANNEL(qio_channel_null_new()));
- comp_param[i].done = true;
- comp_param[i].quit = false;
- qemu_mutex_init(&comp_param[i].mutex);
- qemu_cond_init(&comp_param[i].cond);
- qemu_thread_create(compress_threads + i, "compress",
- do_data_compress, comp_param + i,
- QEMU_THREAD_JOINABLE);
- }
- return 0;
-
-exit:
- compress_threads_save_cleanup();
- return -1;
-}
/**
* save_page_header: write page header to wire
@@ -1484,32 +1334,6 @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block,
return 1;
}
-static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
- RAMBlock *block, ram_addr_t offset,
- uint8_t *source_buf)
-{
- uint8_t *p = block->host + offset;
- int ret;
-
- if (buffer_is_zero(p, TARGET_PAGE_SIZE)) {
- return RES_ZEROPAGE;
- }
-
- /*
- * copy it to a internal buffer to avoid it being modified by VM
- * so that we can catch up the error during compression and
- * decompression
- */
- memcpy(source_buf, p, TARGET_PAGE_SIZE);
- ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
- if (ret < 0) {
- qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
- error_report("compressed data failed!");
- return RES_NONE;
- }
- return RES_COMPRESS;
-}
-
static void
update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
{
@@ -1527,13 +1351,6 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
static bool save_page_use_compression(RAMState *rs);
-static inline void compress_reset_result(CompressParam *param)
-{
- param->result = RES_NONE;
- param->block = NULL;
- param->offset = 0;
-}
-
static int send_queued_data(CompressParam *param)
{
PageSearchStatus *pss = &ram_state->pss[RAM_CHANNEL_PRECOPY];
@@ -1568,31 +1385,6 @@ static int send_queued_data(CompressParam *param)
return len;
}
-static void flush_compressed_data(int (send_queued_data(CompressParam *)))
-{
- int idx, thread_count;
-
- thread_count = migrate_compress_threads();
-
- qemu_mutex_lock(&comp_done_lock);
- for (idx = 0; idx < thread_count; idx++) {
- while (!comp_param[idx].done) {
- qemu_cond_wait(&comp_done_cond, &comp_done_lock);
- }
- }
- qemu_mutex_unlock(&comp_done_lock);
-
- for (idx = 0; idx < thread_count; idx++) {
- qemu_mutex_lock(&comp_param[idx].mutex);
- if (!comp_param[idx].quit) {
- CompressParam *param = &comp_param[idx];
- send_queued_data(param);
- compress_reset_result(param);
- }
- qemu_mutex_unlock(&comp_param[idx].mutex);
- }
-}
-
static void ram_flush_compressed_data(RAMState *rs)
{
if (!save_page_use_compression(rs)) {
@@ -1602,52 +1394,6 @@ static void ram_flush_compressed_data(RAMState *rs)
flush_compressed_data(send_queued_data);
}
-static inline void set_compress_params(CompressParam *param, RAMBlock *block,
- ram_addr_t offset)
-{
- param->block = block;
- param->offset = offset;
- param->trigger = true;
-}
-
-static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
- int (send_queued_data(CompressParam *)))
-{
- int idx, thread_count, pages = -1;
- bool wait = migrate_compress_wait_thread();
-
- thread_count = migrate_compress_threads();
- qemu_mutex_lock(&comp_done_lock);
-retry:
- for (idx = 0; idx < thread_count; idx++) {
- if (comp_param[idx].done) {
- CompressParam *param = &comp_param[idx];
- qemu_mutex_lock(&param->mutex);
- param->done = false;
- send_queued_data(param);
- compress_reset_result(param);
- set_compress_params(param, block, offset);
-
- qemu_cond_signal(&param->cond);
- qemu_mutex_unlock(&param->mutex);
- pages = 1;
- break;
- }
- }
-
- /*
- * wait for the free thread if the user specifies 'compress-wait-thread',
- * otherwise we will post the page out in the main thread as normal page.
- */
- if (pages < 0 && wait) {
- qemu_cond_wait(&comp_done_cond, &comp_done_lock);
- goto retry;
- }
- qemu_mutex_unlock(&comp_done_lock);
-
- return pages;
-}
-
#define PAGE_ALL_CLEAN 0
#define PAGE_TRY_AGAIN 1
#define PAGE_DIRTY_FOUND 2
--
2.40.0
[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]
* [PATCH v2 10/13] ram.c: Remove whitespace (squash with previous patch)
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (8 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 09/13] ram.c: Move core compression code into its own file Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 16:15 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 11/13] ram.c: Move core decompression code into its own file Lukas Straub
` (3 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 575 bytes --]
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/ram.c | 18 ------------------
1 file changed, 18 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 42d6a54132..7be09c18e3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -529,24 +529,6 @@ static bool pss_overlap(PageSearchStatus *pss1, PageSearchStatus *pss2)
(pss1->host_page_start == pss2->host_page_start);
}
-
-
-
-
-/* split */
-
-
-
-
-
-
-
-/* split */
-
-
-
-
-
/**
* save_page_header: write page header to wire
*
--
2.40.0
[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]
* [PATCH v2 11/13] ram.c: Move core decompression code into its own file
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (9 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 10/13] ram.c: Remove whitespace (squash with previous patch) Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 17:22 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 12/13] ram compress: Assert that the file buffer matches the result Lukas Straub
` (2 subsequent siblings)
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 14329 bytes --]
No functional changes intended.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Philippe Mathieu-Daudé <philmd@linaro.org>
---
migration/ram-compress.c | 203 ++++++++++++++++++++++++++++++++++++++
migration/ram-compress.h | 5 +
migration/ram.c | 204 ---------------------------------------
3 files changed, 208 insertions(+), 204 deletions(-)
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index 77902a1d65..f75b8c3079 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -47,6 +47,24 @@ static QemuThread *compress_threads;
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
+struct DecompressParam {
+ bool done;
+ bool quit;
+ QemuMutex mutex;
+ QemuCond cond;
+ void *des;
+ uint8_t *compbuf;
+ int len;
+ z_stream stream;
+};
+typedef struct DecompressParam DecompressParam;
+
+static QEMUFile *decomp_file;
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+static QemuMutex decomp_done_lock;
+static QemuCond decomp_done_cond;
+
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
RAMBlock *block, ram_addr_t offset,
uint8_t *source_buf);
@@ -271,3 +289,188 @@ retry:
return pages;
}
+
+/* return the size after decompression, or negative value on error */
+static int
+qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+ const uint8_t *source, size_t source_len)
+{
+ int err;
+
+ err = inflateReset(stream);
+ if (err != Z_OK) {
+ return -1;
+ }
+
+ stream->avail_in = source_len;
+ stream->next_in = (uint8_t *)source;
+ stream->avail_out = dest_len;
+ stream->next_out = dest;
+
+ err = inflate(stream, Z_NO_FLUSH);
+ if (err != Z_STREAM_END) {
+ return -1;
+ }
+
+ return stream->total_out;
+}
+
+static void *do_data_decompress(void *opaque)
+{
+ DecompressParam *param = opaque;
+ unsigned long pagesize;
+ uint8_t *des;
+ int len, ret;
+
+ qemu_mutex_lock(&param->mutex);
+ while (!param->quit) {
+ if (param->des) {
+ des = param->des;
+ len = param->len;
+ param->des = 0;
+ qemu_mutex_unlock(&param->mutex);
+
+ pagesize = TARGET_PAGE_SIZE;
+
+ ret = qemu_uncompress_data(&param->stream, des, pagesize,
+ param->compbuf, len);
+ if (ret < 0 && migrate_get_current()->decompress_error_check) {
+ error_report("decompress data failed");
+ qemu_file_set_error(decomp_file, ret);
+ }
+
+ qemu_mutex_lock(&decomp_done_lock);
+ param->done = true;
+ qemu_cond_signal(&decomp_done_cond);
+ qemu_mutex_unlock(&decomp_done_lock);
+
+ qemu_mutex_lock(&param->mutex);
+ } else {
+ qemu_cond_wait(&param->cond, &param->mutex);
+ }
+ }
+ qemu_mutex_unlock(&param->mutex);
+
+ return NULL;
+}
+
+int wait_for_decompress_done(void)
+{
+ int idx, thread_count;
+
+ if (!migrate_use_compression()) {
+ return 0;
+ }
+
+ thread_count = migrate_decompress_threads();
+ qemu_mutex_lock(&decomp_done_lock);
+ for (idx = 0; idx < thread_count; idx++) {
+ while (!decomp_param[idx].done) {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
+ }
+ }
+ qemu_mutex_unlock(&decomp_done_lock);
+ return qemu_file_get_error(decomp_file);
+}
+
+void compress_threads_load_cleanup(void)
+{
+ int i, thread_count;
+
+ if (!migrate_use_compression()) {
+ return;
+ }
+ thread_count = migrate_decompress_threads();
+ for (i = 0; i < thread_count; i++) {
+ /*
+ * we use it as a indicator which shows if the thread is
+ * properly init'd or not
+ */
+ if (!decomp_param[i].compbuf) {
+ break;
+ }
+
+ qemu_mutex_lock(&decomp_param[i].mutex);
+ decomp_param[i].quit = true;
+ qemu_cond_signal(&decomp_param[i].cond);
+ qemu_mutex_unlock(&decomp_param[i].mutex);
+ }
+ for (i = 0; i < thread_count; i++) {
+ if (!decomp_param[i].compbuf) {
+ break;
+ }
+
+ qemu_thread_join(decompress_threads + i);
+ qemu_mutex_destroy(&decomp_param[i].mutex);
+ qemu_cond_destroy(&decomp_param[i].cond);
+ inflateEnd(&decomp_param[i].stream);
+ g_free(decomp_param[i].compbuf);
+ decomp_param[i].compbuf = NULL;
+ }
+ g_free(decompress_threads);
+ g_free(decomp_param);
+ decompress_threads = NULL;
+ decomp_param = NULL;
+ decomp_file = NULL;
+}
+
+int compress_threads_load_setup(QEMUFile *f)
+{
+ int i, thread_count;
+
+ if (!migrate_use_compression()) {
+ return 0;
+ }
+
+ thread_count = migrate_decompress_threads();
+ decompress_threads = g_new0(QemuThread, thread_count);
+ decomp_param = g_new0(DecompressParam, thread_count);
+ qemu_mutex_init(&decomp_done_lock);
+ qemu_cond_init(&decomp_done_cond);
+ decomp_file = f;
+ for (i = 0; i < thread_count; i++) {
+ if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+ goto exit;
+ }
+
+ decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+ qemu_mutex_init(&decomp_param[i].mutex);
+ qemu_cond_init(&decomp_param[i].cond);
+ decomp_param[i].done = true;
+ decomp_param[i].quit = false;
+ qemu_thread_create(decompress_threads + i, "decompress",
+ do_data_decompress, decomp_param + i,
+ QEMU_THREAD_JOINABLE);
+ }
+ return 0;
+exit:
+ compress_threads_load_cleanup();
+ return -1;
+}
+
+void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
+{
+ int idx, thread_count;
+
+ thread_count = migrate_decompress_threads();
+ QEMU_LOCK_GUARD(&decomp_done_lock);
+ while (true) {
+ for (idx = 0; idx < thread_count; idx++) {
+ if (decomp_param[idx].done) {
+ decomp_param[idx].done = false;
+ qemu_mutex_lock(&decomp_param[idx].mutex);
+ qemu_get_buffer(f, decomp_param[idx].compbuf, len);
+ decomp_param[idx].des = host;
+ decomp_param[idx].len = len;
+ qemu_cond_signal(&decomp_param[idx].cond);
+ qemu_mutex_unlock(&decomp_param[idx].mutex);
+ break;
+ }
+ }
+ if (idx < thread_count) {
+ break;
+ } else {
+ qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
+ }
+ }
+}
diff --git a/migration/ram-compress.h b/migration/ram-compress.h
index 06570a799c..6f7fe2f472 100644
--- a/migration/ram-compress.h
+++ b/migration/ram-compress.h
@@ -62,4 +62,9 @@ void flush_compressed_data(int (send_queued_data(CompressParam *)));
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
int (send_queued_data(CompressParam *)));
+int wait_for_decompress_done(void);
+void compress_threads_load_cleanup(void);
+int compress_threads_load_setup(QEMUFile *f);
+void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len);
+
#endif
diff --git a/migration/ram.c b/migration/ram.c
index 7be09c18e3..1e5dede6f0 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -491,24 +491,6 @@ typedef struct MigrationOps MigrationOps;
MigrationOps *migration_ops;
-struct DecompressParam {
- bool done;
- bool quit;
- QemuMutex mutex;
- QemuCond cond;
- void *des;
- uint8_t *compbuf;
- int len;
- z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
-static QEMUFile *decomp_file;
-static DecompressParam *decomp_param;
-static QemuThread *decompress_threads;
-static QemuMutex decomp_done_lock;
-static QemuCond decomp_done_cond;
-
static int ram_save_host_page_urgent(PageSearchStatus *pss);
/* NOTE: page is the PFN not real ram_addr_t. */
@@ -3467,192 +3449,6 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}
-/* return the size after decompression, or negative value on error */
-static int
-qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
- const uint8_t *source, size_t source_len)
-{
- int err;
-
- err = inflateReset(stream);
- if (err != Z_OK) {
- return -1;
- }
-
- stream->avail_in = source_len;
- stream->next_in = (uint8_t *)source;
- stream->avail_out = dest_len;
- stream->next_out = dest;
-
- err = inflate(stream, Z_NO_FLUSH);
- if (err != Z_STREAM_END) {
- return -1;
- }
-
- return stream->total_out;
-}
-
-static void *do_data_decompress(void *opaque)
-{
- DecompressParam *param = opaque;
- unsigned long pagesize;
- uint8_t *des;
- int len, ret;
-
- qemu_mutex_lock(&param->mutex);
- while (!param->quit) {
- if (param->des) {
- des = param->des;
- len = param->len;
- param->des = 0;
- qemu_mutex_unlock(&param->mutex);
-
- pagesize = TARGET_PAGE_SIZE;
-
- ret = qemu_uncompress_data(&param->stream, des, pagesize,
- param->compbuf, len);
- if (ret < 0 && migrate_get_current()->decompress_error_check) {
- error_report("decompress data failed");
- qemu_file_set_error(decomp_file, ret);
- }
-
- qemu_mutex_lock(&decomp_done_lock);
- param->done = true;
- qemu_cond_signal(&decomp_done_cond);
- qemu_mutex_unlock(&decomp_done_lock);
-
- qemu_mutex_lock(&param->mutex);
- } else {
- qemu_cond_wait(&param->cond, &param->mutex);
- }
- }
- qemu_mutex_unlock(&param->mutex);
-
- return NULL;
-}
-
-static int wait_for_decompress_done(void)
-{
- int idx, thread_count;
-
- if (!migrate_use_compression()) {
- return 0;
- }
-
- thread_count = migrate_decompress_threads();
- qemu_mutex_lock(&decomp_done_lock);
- for (idx = 0; idx < thread_count; idx++) {
- while (!decomp_param[idx].done) {
- qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
- }
- }
- qemu_mutex_unlock(&decomp_done_lock);
- return qemu_file_get_error(decomp_file);
-}
-
-static void compress_threads_load_cleanup(void)
-{
- int i, thread_count;
-
- if (!migrate_use_compression()) {
- return;
- }
- thread_count = migrate_decompress_threads();
- for (i = 0; i < thread_count; i++) {
- /*
- * we use it as a indicator which shows if the thread is
- * properly init'd or not
- */
- if (!decomp_param[i].compbuf) {
- break;
- }
-
- qemu_mutex_lock(&decomp_param[i].mutex);
- decomp_param[i].quit = true;
- qemu_cond_signal(&decomp_param[i].cond);
- qemu_mutex_unlock(&decomp_param[i].mutex);
- }
- for (i = 0; i < thread_count; i++) {
- if (!decomp_param[i].compbuf) {
- break;
- }
-
- qemu_thread_join(decompress_threads + i);
- qemu_mutex_destroy(&decomp_param[i].mutex);
- qemu_cond_destroy(&decomp_param[i].cond);
- inflateEnd(&decomp_param[i].stream);
- g_free(decomp_param[i].compbuf);
- decomp_param[i].compbuf = NULL;
- }
- g_free(decompress_threads);
- g_free(decomp_param);
- decompress_threads = NULL;
- decomp_param = NULL;
- decomp_file = NULL;
-}
-
-static int compress_threads_load_setup(QEMUFile *f)
-{
- int i, thread_count;
-
- if (!migrate_use_compression()) {
- return 0;
- }
-
- thread_count = migrate_decompress_threads();
- decompress_threads = g_new0(QemuThread, thread_count);
- decomp_param = g_new0(DecompressParam, thread_count);
- qemu_mutex_init(&decomp_done_lock);
- qemu_cond_init(&decomp_done_cond);
- decomp_file = f;
- for (i = 0; i < thread_count; i++) {
- if (inflateInit(&decomp_param[i].stream) != Z_OK) {
- goto exit;
- }
-
- decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
- qemu_mutex_init(&decomp_param[i].mutex);
- qemu_cond_init(&decomp_param[i].cond);
- decomp_param[i].done = true;
- decomp_param[i].quit = false;
- qemu_thread_create(decompress_threads + i, "decompress",
- do_data_decompress, decomp_param + i,
- QEMU_THREAD_JOINABLE);
- }
- return 0;
-exit:
- compress_threads_load_cleanup();
- return -1;
-}
-
-static void decompress_data_with_multi_threads(QEMUFile *f,
- void *host, int len)
-{
- int idx, thread_count;
-
- thread_count = migrate_decompress_threads();
- QEMU_LOCK_GUARD(&decomp_done_lock);
- while (true) {
- for (idx = 0; idx < thread_count; idx++) {
- if (decomp_param[idx].done) {
- decomp_param[idx].done = false;
- qemu_mutex_lock(&decomp_param[idx].mutex);
- qemu_get_buffer(f, decomp_param[idx].compbuf, len);
- decomp_param[idx].des = host;
- decomp_param[idx].len = len;
- qemu_cond_signal(&decomp_param[idx].cond);
- qemu_mutex_unlock(&decomp_param[idx].mutex);
- break;
- }
- }
- if (idx < thread_count) {
- break;
- } else {
- qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
- }
- }
-}
-
static void colo_init_ram_state(void)
{
ram_state_init(&ram_state);
--
2.40.0
[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]
* [PATCH v2 12/13] ram compress: Assert that the file buffer matches the result
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (10 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 11/13] ram.c: Move core decompression code into its own file Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 17:24 ` Juan Quintela
2023-04-20 9:48 ` [PATCH v2 13/13] ram-compress.c: Make target independent Lukas Straub
2023-04-20 9:59 ` [PATCH v2 14/13] migration: Initialize and cleanup decompression in migration.c Lukas Straub
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 3900 bytes --]
Before this series, "nothing to send" was handled by the file buffer
being empty. Now it is tracked via param->result.
Assert that the file buffer state matches the result.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/qemu-file.c | 11 +++++++++++
migration/qemu-file.h | 1 +
migration/ram-compress.c | 5 +++++
migration/ram.c | 2 ++
4 files changed, 19 insertions(+)
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 102ab3b439..2b3f3f8549 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -887,6 +887,17 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
return len;
}
+/*
+ * Check if the writable buffer is empty
+ */
+
+bool qemu_file_buffer_empty(QEMUFile *file)
+{
+ assert(qemu_file_is_writable(file));
+
+ return !file->iovcnt;
+}
+
/*
* Get a string whose length is determined by a single preceding byte
* A preallocated 256 byte buffer must be passed in.
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index 9d0155a2a1..15e5f189f0 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -113,6 +113,7 @@ size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size);
ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
const uint8_t *p, size_t size);
int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
+bool qemu_file_buffer_empty(QEMUFile *file);
/*
* Note that you can only peek continuous bytes from where the current pointer
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index f75b8c3079..b75a9d2b9a 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -193,6 +193,8 @@ static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
uint8_t *p = block->host + offset;
int ret;
+ assert(qemu_file_buffer_empty(f));
+
if (buffer_is_zero(p, TARGET_PAGE_SIZE)) {
return RES_ZEROPAGE;
}
@@ -207,6 +209,7 @@ static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
if (ret < 0) {
qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
error_report("compressed data failed!");
+ qemu_fflush(f);
return RES_NONE;
}
return RES_COMPRESS;
@@ -238,6 +241,7 @@ void flush_compressed_data(int (send_queued_data(CompressParam *)))
if (!comp_param[idx].quit) {
CompressParam *param = &comp_param[idx];
send_queued_data(param);
+ assert(qemu_file_buffer_empty(param->file));
compress_reset_result(param);
}
qemu_mutex_unlock(&comp_param[idx].mutex);
@@ -267,6 +271,7 @@ retry:
qemu_mutex_lock(&param->mutex);
param->done = false;
send_queued_data(param);
+ assert(qemu_file_buffer_empty(param->file));
compress_reset_result(param);
set_compress_params(param, block, offset);
diff --git a/migration/ram.c b/migration/ram.c
index 1e5dede6f0..ccfedf4fb5 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1332,11 +1332,13 @@ static int send_queued_data(CompressParam *param)
assert(block == pss->last_sent_block);
if (param->result == RES_ZEROPAGE) {
+ assert(qemu_file_buffer_empty(param->file));
len += save_page_header(pss, file, block, offset | RAM_SAVE_FLAG_ZERO);
qemu_put_byte(file, 0);
len += 1;
ram_release_page(block->idstr, offset);
} else if (param->result == RES_COMPRESS) {
+ assert(!qemu_file_buffer_empty(param->file));
len += save_page_header(pss, file, block,
offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
len += qemu_put_qemu_file(file, param->file);
--
2.40.0
^ permalink raw reply related [flat|nested] 30+ messages in thread
* [PATCH v2 13/13] ram-compress.c: Make target independent
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (11 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 12/13] ram compress: Assert that the file buffer matches the result Lukas Straub
@ 2023-04-20 9:48 ` Lukas Straub
2023-04-28 17:29 ` Juan Quintela
2023-04-20 9:59 ` [PATCH v2 14/13] migration: Initialize and cleanup decompression in migration.c Lukas Straub
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:48 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 3564 bytes --]
Make ram-compress.c target independent.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
migration/meson.build | 2 +-
migration/ram-compress.c | 17 ++++++++++-------
2 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/migration/meson.build b/migration/meson.build
index 262e3c9754..16f642031c 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -22,6 +22,7 @@ softmmu_ss.add(files(
'migration.c',
'multifd.c',
'multifd-zlib.c',
+ 'ram-compress.c',
'postcopy-ram.c',
'savevm.c',
'socket.c',
@@ -38,5 +39,4 @@ softmmu_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
specific_ss.add(when: 'CONFIG_SOFTMMU',
if_true: files('dirtyrate.c',
'ram.c',
- 'ram-compress.c',
'target.c'))
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index b75a9d2b9a..9f03c3cc66 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -34,7 +34,8 @@
#include "qemu/error-report.h"
#include "migration.h"
#include "io/channel-null.h"
-#include "exec/ram_addr.h"
+#include "exec/target_page.h"
+#include "exec/ramblock.h"
CompressionStats compression_counters;
@@ -155,7 +156,7 @@ int compress_threads_save_setup(void)
qemu_cond_init(&comp_done_cond);
qemu_mutex_init(&comp_done_lock);
for (i = 0; i < thread_count; i++) {
- comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+ comp_param[i].originbuf = g_try_malloc(qemu_target_page_size());
if (!comp_param[i].originbuf) {
goto exit;
}
@@ -191,11 +192,12 @@ static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
uint8_t *source_buf)
{
uint8_t *p = block->host + offset;
+ size_t page_size = qemu_target_page_size();
int ret;
assert(qemu_file_buffer_empty(f));
- if (buffer_is_zero(p, TARGET_PAGE_SIZE)) {
+ if (buffer_is_zero(p, page_size)) {
return RES_ZEROPAGE;
}
@@ -204,8 +206,8 @@ static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
* so that we can catch up the error during compression and
* decompression
*/
- memcpy(source_buf, p, TARGET_PAGE_SIZE);
- ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
+ memcpy(source_buf, p, page_size);
+ ret = qemu_put_compression_data(f, stream, source_buf, page_size);
if (ret < 0) {
qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
error_report("compressed data failed!");
@@ -335,7 +337,7 @@ static void *do_data_decompress(void *opaque)
param->des = 0;
qemu_mutex_unlock(&param->mutex);
- pagesize = TARGET_PAGE_SIZE;
+ pagesize = qemu_target_page_size();
ret = qemu_uncompress_data(&param->stream, des, pagesize,
param->compbuf, len);
@@ -438,7 +440,8 @@ int compress_threads_load_setup(QEMUFile *f)
goto exit;
}
- decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+ size_t compbuf_size = compressBound(qemu_target_page_size());
+ decomp_param[i].compbuf = g_malloc0(compbuf_size);
qemu_mutex_init(&decomp_param[i].mutex);
qemu_cond_init(&decomp_param[i].cond);
decomp_param[i].done = true;
--
2.40.0
^ permalink raw reply related [flat|nested] 30+ messages in thread
* [PATCH v2 14/13] migration: Initialize and cleanup decompression in migration.c
2023-04-20 9:47 [PATCH v2 00/13] migration/ram.c: Refactor compress code Lukas Straub
` (12 preceding siblings ...)
2023-04-20 9:48 ` [PATCH v2 13/13] ram-compress.c: Make target independent Lukas Straub
@ 2023-04-20 9:59 ` Lukas Straub
2023-04-28 17:33 ` Juan Quintela
13 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 9:59 UTC (permalink / raw)
To: qemu-devel
Cc: Juan Quintela, Peter Xu, Thomas Huth, Laurent Vivier,
Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 2170 bytes --]
This fixes compress with colo.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
Oops, this one slipped through
migration/migration.c | 9 +++++++++
migration/ram.c | 5 -----
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/migration/migration.c b/migration/migration.c
index bda4789193..e7d082a208 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -26,6 +26,7 @@
#include "sysemu/cpu-throttle.h"
#include "rdma.h"
#include "ram.h"
+#include "ram-compress.h"
#include "migration/global_state.h"
#include "migration/misc.h"
#include "migration.h"
@@ -316,6 +317,7 @@ void migration_incoming_state_destroy(void)
struct MigrationIncomingState *mis = migration_incoming_get_current();
multifd_load_cleanup();
+ compress_threads_load_cleanup();
if (mis->to_src_file) {
/* Tell source that we are done */
@@ -598,6 +600,12 @@ process_incoming_migration_co(void *opaque)
Error *local_err = NULL;
assert(mis->from_src_file);
+
+ if (compress_threads_load_setup(mis->from_src_file)) {
+ error_report("Failed to setup decompress threads");
+ goto fail;
+ }
+
mis->migration_incoming_co = qemu_coroutine_self();
mis->largest_page_size = qemu_ram_pagesize_largest();
postcopy_state_set(POSTCOPY_INCOMING_NONE);
@@ -663,6 +671,7 @@ fail:
qemu_fclose(mis->from_src_file);
multifd_load_cleanup();
+ compress_threads_load_cleanup();
exit(EXIT_FAILURE);
}
diff --git a/migration/ram.c b/migration/ram.c
index ccfedf4fb5..344f326065 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -3560,10 +3560,6 @@ void colo_release_ram_cache(void)
*/
static int ram_load_setup(QEMUFile *f, void *opaque)
{
- if (compress_threads_load_setup(f)) {
- return -1;
- }
-
xbzrle_load_setup();
ramblock_recv_map_init();
@@ -3579,7 +3575,6 @@ static int ram_load_cleanup(void *opaque)
}
xbzrle_load_cleanup();
- compress_threads_load_cleanup();
RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
g_free(rb->receivedmap);
--
2.40.0
^ permalink raw reply related [flat|nested] 30+ messages in thread
* Re: [PATCH v2 01/13] qtest/migration-test.c: Add postcopy tests with compress enabled
2023-04-20 9:47 ` [PATCH v2 01/13] qtest/migration-test.c: Add postcopy tests with compress enabled Lukas Straub
@ 2023-04-20 10:20 ` Juan Quintela
2023-04-20 10:37 ` Lukas Straub
0 siblings, 1 reply; 30+ messages in thread
From: Juan Quintela @ 2023-04-20 10:20 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> Add postcopy tests with compress enabled to ensure nothing breaks
> with the refactoring in the next commits.
>
> preempt+compress is blocked, so no test needed for that case.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
And I wanted to remove the old compression code, and it gets new users. Sniff.
> ---
> tests/qtest/migration-test.c | 83 +++++++++++++++++++++++-------------
> 1 file changed, 53 insertions(+), 30 deletions(-)
>
> diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
> index 1f2a019ce0..930cb4f29d 100644
> --- a/tests/qtest/migration-test.c
> +++ b/tests/qtest/migration-test.c
> @@ -1127,6 +1127,36 @@ test_migrate_tls_x509_finish(QTestState *from,
> #endif /* CONFIG_TASN1 */
> #endif /* CONFIG_GNUTLS */
>
> +static void *
> +test_migrate_compress_start(QTestState *from,
> + QTestState *to)
> +{
> + migrate_set_parameter_int(from, "compress-level", 1);
> + migrate_set_parameter_int(from, "compress-threads", 4);
> + migrate_set_parameter_bool(from, "compress-wait-thread", true);
> + migrate_set_parameter_int(to, "decompress-threads", 4);
> +
> + migrate_set_capability(from, "compress", true);
> + migrate_set_capability(to, "compress", true);
> +
> + return NULL;
> +}
Independently of this patch, we need to change this test to use 4
compression tests and 3 decompression or anything that is not the same
number in both sides.
I was complaining about this, and when I arrived at the end of the patch
I found that this was code movement.
Later, Juan.
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 01/13] qtest/migration-test.c: Add postcopy tests with compress enabled
2023-04-20 10:20 ` Juan Quintela
@ 2023-04-20 10:37 ` Lukas Straub
2023-04-20 21:12 ` Juan Quintela
0 siblings, 1 reply; 30+ messages in thread
From: Lukas Straub @ 2023-04-20 10:37 UTC (permalink / raw)
To: Juan Quintela
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 2398 bytes --]
On Thu, 20 Apr 2023 12:20:25 +0200
Juan Quintela <quintela@redhat.com> wrote:
> Lukas Straub <lukasstraub2@web.de> wrote:
> > Add postcopy tests with compress enabled to ensure nothing breaks
> > with the refactoring in the next commits.
> >
> > preempt+compress is blocked, so no test needed for that case.
> >
> > Signed-off-by: Lukas Straub <lukasstraub2@web.de>
>
> Reviewed-by: Juan Quintela <quintela@redhat.com>
>
> And I wanted to remove the old compression code, and it gets new users. Sniff.
Who knows how many compress threads users are out there...
By the way, I'm not against deprecating compress threads in the long
run. I'm already working on (cleanly :)) adding colo support with
multifd.
> > ---
> > tests/qtest/migration-test.c | 83 +++++++++++++++++++++++-------------
> > 1 file changed, 53 insertions(+), 30 deletions(-)
> >
> > diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
> > index 1f2a019ce0..930cb4f29d 100644
> > --- a/tests/qtest/migration-test.c
> > +++ b/tests/qtest/migration-test.c
> > @@ -1127,6 +1127,36 @@ test_migrate_tls_x509_finish(QTestState *from,
> > #endif /* CONFIG_TASN1 */
> > #endif /* CONFIG_GNUTLS */
> >
> > +static void *
> > +test_migrate_compress_start(QTestState *from,
> > + QTestState *to)
> > +{
> > + migrate_set_parameter_int(from, "compress-level", 1);
> > + migrate_set_parameter_int(from, "compress-threads", 4);
> > + migrate_set_parameter_bool(from, "compress-wait-thread", true);
> > + migrate_set_parameter_int(to, "decompress-threads", 4);
> > +
> > + migrate_set_capability(from, "compress", true);
> > + migrate_set_capability(to, "compress", true);
> > +
> > + return NULL;
> > +}
>
> Independently of this patch, we need to change this test to use 4
> compression tests and 3 decompression or anything that is not the same
> number in both sides.
>
> I was complaining about this, and when I arrived at the end of the patch
> I found that this was code movement.
>
> Later, Juan.
>
Oops, forgot to mention, the test is based on this patch
https://lore.kernel.org/qemu-devel/2f4abb67cf5f3e1591b2666676462a93bdd20bbc.1680618040.git.lukasstraub2@web.de/
Will probably carry the patch with this series then. So you mean 4
compress _threads_ and 3 decompress _threads_?
--
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 01/13] qtest/migration-test.c: Add postcopy tests with compress enabled
2023-04-20 10:37 ` Lukas Straub
@ 2023-04-20 21:12 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-20 21:12 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> On Thu, 20 Apr 2023 12:20:25 +0200
> Juan Quintela <quintela@redhat.com> wrote:
>
>> Lukas Straub <lukasstraub2@web.de> wrote:
>> > Add postcopy tests with compress enabled to ensure nothing breaks
>> > with the refactoring in the next commits.
>> >
>> > preempt+compress is blocked, so no test needed for that case.
>> >
>> > Signed-off-by: Lukas Straub <lukasstraub2@web.de>
>>
>> Reviewed-by: Juan Quintela <quintela@redhat.com>
>>
>> And I wanted to remove the old compression code, and it gets new users. Sniff.
>
> Who knows how many compress threads users are out there...
Not too many.
We broke it during development and nobody found it.
And the reason that I wrote the multifd-zlib compression code was
because I was not able to get a migration-test working with compression,
so ....
> By the way, I'm not against deprecating compress threads in the long
> run. I'm already working on (cleanly :)) adding colo support with
> multifd.
Ok, then I will still put the deprecation comment there.
>> > ---
>> > tests/qtest/migration-test.c | 83 +++++++++++++++++++++++-------------
>> > 1 file changed, 53 insertions(+), 30 deletions(-)
>> >
>> > diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
>> > index 1f2a019ce0..930cb4f29d 100644
>> > --- a/tests/qtest/migration-test.c
>> > +++ b/tests/qtest/migration-test.c
>> > @@ -1127,6 +1127,36 @@ test_migrate_tls_x509_finish(QTestState *from,
>> > #endif /* CONFIG_TASN1 */
>> > #endif /* CONFIG_GNUTLS */
>> >
>> > +static void *
>> > +test_migrate_compress_start(QTestState *from,
>> > + QTestState *to)
>> > +{
>> > + migrate_set_parameter_int(from, "compress-level", 1);
>> > + migrate_set_parameter_int(from, "compress-threads", 4);
>> > + migrate_set_parameter_bool(from, "compress-wait-thread", true);
>> > + migrate_set_parameter_int(to, "decompress-threads", 4);
>> > +
>> > + migrate_set_capability(from, "compress", true);
>> > + migrate_set_capability(to, "compress", true);
>> > +
>> > + return NULL;
>> > +}
>>
>> Independently of this patch, we need to change this test to use 4
>> compression tests and 3 decompression or anything that is not the same
>> number in both sides.
>>
>> I was complaining about this, and when I arrived at the end of the patch
>> I found that this was code movement.
>>
>> Later, Juan.
>>
>
> Oops, forgot to mention, the test is based on this patch
> https://lore.kernel.org/qemu-devel/2f4abb67cf5f3e1591b2666676462a93bdd20bbc.1680618040.git.lukasstraub2@web.de/
>
> Will probably carry the patch with this series then. So you mean 4
> compress _threads_ and 3 decompress _threads_?
Yeap.
Later, Juan.
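The asymmetric setup agreed on above (4 compress threads on the source, 3 decompress threads on the destination) could look roughly like the sketch below. `QTestState` and `migrate_set_parameter_int()` are reduced stand-ins here that only record the values, so that the block compiles on its own; they are assumptions for illustration, not the real qtest helpers.

```c
#include <assert.h>
#include <string.h>

/* Stand-in for the opaque qtest handle: records only what this sketch sets. */
typedef struct QTestState {
    int compress_threads;
    int decompress_threads;
} QTestState;

/* Stub shaped like the helper used in the patch; it just stores values. */
static void migrate_set_parameter_int(QTestState *who, const char *name,
                                      int value)
{
    if (strcmp(name, "compress-threads") == 0) {
        who->compress_threads = value;
    } else if (strcmp(name, "decompress-threads") == 0) {
        who->decompress_threads = value;
    }
}

/* Start hook with different thread counts on source and destination. */
static void *test_migrate_compress_start(QTestState *from, QTestState *to)
{
    migrate_set_parameter_int(from, "compress-threads", 4);
    migrate_set_parameter_int(to, "decompress-threads", 3);

    /* The point of the change: the two sides must not use the same count. */
    assert(from->compress_threads != to->decompress_threads);
    return NULL;
}
```

Using unequal counts means a bug that silently assumes both sides have the same number of threads has a chance to show up in the test.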
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 02/13] ram.c: Let the compress threads return a CompressResult enum
2023-04-20 9:47 ` [PATCH v2 02/13] ram.c: Let the compress threads return a CompressResult enum Lukas Straub
@ 2023-04-20 21:13 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-20 21:13 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> This will be used in the next commits to move save_page_header()
> out of compress code.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 03/13] ram.c: Dont change param->block in the compress thread
2023-04-20 9:47 ` [PATCH v2 03/13] ram.c: Dont change param->block in the compress thread Lukas Straub
@ 2023-04-20 21:13 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-20 21:13 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> Instead introduce an extra parameter to trigger the compress thread.
> Now, when the compress thread is done, we know what RAMBlock and
> offset it did compress.
>
> This will be used in the next commits to move save_page_header()
> out of compress code.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Much better, thanks.
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 04/13] ram.c: Reset result after sending queued data
2023-04-20 9:48 ` [PATCH v2 04/13] ram.c: Reset result after sending queued data Lukas Straub
@ 2023-04-28 11:59 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 11:59 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> And take the param->mutex lock for the whole section to ensure
> thread-safety.
> Now, it is explicitly clear if there is no queued data to send.
> Before, this was handled by param->file stream being empty and thus
> qemu_put_qemu_file() not sending anything.
>
> This will be used in the next commits to move save_page_header()
> out of compress code.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
If you do more changes around here, please consider:
> @@ -1540,13 +1547,16 @@ static void flush_compressed_data(RAMState *rs)
> for (idx = 0; idx < thread_count; idx++) {
Move
CompressParam *param = &comp_param[idx];
to here, and use it also for the locks.
I will even think about calling the variable just p.
And once there, everything under the sun except this uses i as a for
variable, not idx O:-)
> qemu_mutex_lock(&comp_param[idx].mutex);
> if (!comp_param[idx].quit) {
> - len = qemu_put_qemu_file(ms->to_dst_file, comp_param[idx].file);
> + CompressParam *param = &comp_param[idx];
Move this declaration
> + len = qemu_put_qemu_file(ms->to_dst_file, param->file);
> + compress_reset_result(param);
> +
> /*
> * it's safe to fetch zero_page without holding comp_done_lock
> * as there is no further request submitted to the thread,
> * i.e, the thread should be waiting for a request at this point.
> */
> - update_compress_thread_counts(&comp_param[idx], len);
> + update_compress_thread_counts(param, len);
> }
> qemu_mutex_unlock(&comp_param[idx].mutex);
> }
> @@ -1571,15 +1581,17 @@ static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
> retry:
> for (idx = 0; idx < thread_count; idx++) {
> if (comp_param[idx].done) {
> - comp_param[idx].done = false;
Same here.
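The style suggested in this review (declare the per-thread pointer once at the top of the loop body, use it for the locks as well, and name the loop variable `i`) might look like the sketch below. It is a reduced illustration, not the QEMU code: `CompressParam` is cut down to a hypothetical three-field struct, pthread mutexes stand in for `QemuMutex`, and a byte counter stands in for the queued data.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, reduced stand-in for QEMU's CompressParam. */
typedef struct {
    pthread_mutex_t mutex;
    bool quit;
    size_t queued_len;   /* bytes waiting in the per-thread buffer */
} CompressParam;

/* Flush every thread that has not been told to quit; returns bytes flushed. */
static size_t flush_all(CompressParam *comp_param, int thread_count)
{
    size_t total = 0;

    assert(comp_param != NULL);
    for (int i = 0; i < thread_count; i++) {
        CompressParam *p = &comp_param[i];   /* declared once, used for locks too */

        pthread_mutex_lock(&p->mutex);
        if (!p->quit) {
            total += p->queued_len;   /* stands in for sending the queued data */
            p->queued_len = 0;        /* stands in for resetting the result */
        }
        pthread_mutex_unlock(&p->mutex);
    }
    return total;
}
```

With the pointer hoisted, the lock, the work and the unlock all name the same object, which makes it harder to lock one entry and touch another by accident.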
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 05/13] ram.c: Do not call save_page_header() from compress threads
2023-04-20 9:48 ` [PATCH v2 05/13] ram.c: Do not call save_page_header() from compress threads Lukas Straub
@ 2023-04-28 12:02 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 12:02 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> save_page_header() accesses several global variables, so calling it
> from multiple threads is pretty ugly.
>
> Instead, call save_page_header() before writing out the compressed
> data from the compress buffer to the migration stream.
>
> This also makes the core compress code more independent from ram.c.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 06/13] ram.c: Call update_compress_thread_counts from compress_send_queued_data
2023-04-20 9:48 ` [PATCH v2 06/13] ram.c: Call update_compress_thread_counts from compress_send_queued_data Lukas Straub
@ 2023-04-28 12:10 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 12:10 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> This makes the core compress code more independent from ram.c.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
I haven't looked at the whole series yet
> @@ -1575,15 +1577,8 @@ static void flush_compressed_data(RAMState *rs)
> qemu_mutex_lock(&comp_param[idx].mutex);
> if (!comp_param[idx].quit) {
> CompressParam *param = &comp_param[idx];
> - len = send_queued_data(param);
> + send_queued_data(param);
but it appears that send_queued_data() doesn't need to return anything now.
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 07/13] ram.c: Remove last ram.c dependency from the core compress code
2023-04-20 9:48 ` [PATCH v2 07/13] ram.c: Remove last ram.c dependency from the core compress code Lukas Straub
@ 2023-04-28 12:14 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 12:14 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> Make compression interfaces take send_queued_data() as an argument.
> Remove save_page_use_compression() from flush_compressed_data().
>
> This removes the last ram.c dependency from the core compress code.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 08/13] ram.c: Introduce whitespace (squash with next patch)
2023-04-20 9:48 ` [PATCH v2 08/13] ram.c: Introduce whitespace (squash with next patch) Lukas Straub
@ 2023-04-28 16:13 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 16:13 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> Introduce whitespace to make it easier to reroll the series.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
obviously skipped.
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 10/13] ram.c: Remove whitespace (squash with previous patch)
2023-04-20 9:48 ` [PATCH v2 10/13] ram.c: Remove whitespace (squash with previous patch) Lukas Straub
@ 2023-04-28 16:15 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 16:15 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
obviously skipped
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 11/13] ram.c: Move core decompression code into its own file
2023-04-20 9:48 ` [PATCH v2 11/13] ram.c: Move core decompression code into its own file Lukas Straub
@ 2023-04-28 17:22 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 17:22 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> No functional changes intended.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> Reviewed-by: Philippe Mathieu-Daudé <philmd@linaro.org>
Reviewed-by: Juan Quintela <quintela@redhat.com>
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 12/13] ram compress: Assert that the file buffer matches the result
2023-04-20 9:48 ` [PATCH v2 12/13] ram compress: Assert that the file buffer matches the result Lukas Straub
@ 2023-04-28 17:24 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 17:24 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> Before this series, "nothing to send" was handled by the file buffer
> being empty. Now it is tracked via param->result.
>
> Assert that the file buffer state matches the result.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Even when:
> +/*
> + * Check if the writable buffer is empty
> + */
> +
> +bool qemu_file_buffer_empty(QEMUFile *file)
> +{
> + assert(qemu_file_is_writable(file));
A function that is basically an assert
> + assert(qemu_file_buffer_empty(f));
Is always called as an assert O:-)
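The invariant that the new asserts check can be sketched in isolation as follows. This is a minimal model under stated assumptions: `FakeFile` is a hypothetical stand-in for a writable `QEMUFile` that only counts queued bytes, and the numeric order of the `CompressResult` values is assumed. Only the names `RES_NONE`, `RES_ZEROPAGE` and `RES_COMPRESS` come from the series.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Result values named in the series; their numeric order is an assumption. */
typedef enum { RES_NONE, RES_ZEROPAGE, RES_COMPRESS } CompressResult;

/* Hypothetical stand-in for a writable QEMUFile: tracks queued bytes only. */
typedef struct {
    size_t queued;
} FakeFile;

static bool fake_file_buffer_empty(const FakeFile *f)
{
    return f->queued == 0;
}

/* True when the buffer state agrees with the recorded result. */
static bool result_matches_buffer(CompressResult res, const FakeFile *f)
{
    if (res == RES_COMPRESS) {
        return !fake_file_buffer_empty(f);   /* compressed bytes must be queued */
    }
    return fake_file_buffer_empty(f);        /* RES_NONE, RES_ZEROPAGE: nothing queued */
}

/* Send step: check the invariant, drain the buffer, return bytes sent. */
static size_t send_queued(CompressResult res, FakeFile *f)
{
    size_t sent = f->queued;

    assert(result_matches_buffer(res, f));
    f->queued = 0;
    assert(fake_file_buffer_empty(f));       /* always empty after sending */
    return sent;
}
```

Keeping the check as a boolean helper and asserting on it at the call sites mirrors how the patch uses `qemu_file_buffer_empty()`.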
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 13/13] ram-compress.c: Make target independent
2023-04-20 9:48 ` [PATCH v2 13/13] ram-compress.c: Make target independent Lukas Straub
@ 2023-04-28 17:29 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 17:29 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> Make ram-compress.c target independent.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
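The core of this change is replacing a compile-time page-size constant with a value queried at run time, read once into a local variable and reused. A minimal sketch of that pattern, assuming a hypothetical `target_page_size()` in place of `qemu_target_page_size()` and plain `malloc()` in place of the GLib allocators:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for qemu_target_page_size(): known only at run time. */
static size_t runtime_page_size = 4096;

static size_t target_page_size(void)
{
    return runtime_page_size;
}

/* Copy one page into a buffer sized at run time. The caller frees the result. */
static unsigned char *snapshot_page(const unsigned char *page)
{
    size_t page_size = target_page_size();   /* query once, reuse below */
    unsigned char *copy;

    assert(page != NULL);
    copy = malloc(page_size);
    if (copy) {
        memcpy(copy, page, page_size);
    }
    return copy;
}
```

Because nothing in the function depends on a per-target macro, the same object file works for every target, which is what allows the file to move out of the target-specific source set in meson.build.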
^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [PATCH v2 14/13] migration: Initialize and cleanup decompression in migration.c
2023-04-20 9:59 ` [PATCH v2 14/13] migration: Initialize and cleanup decompression in migration.c Lukas Straub
@ 2023-04-28 17:33 ` Juan Quintela
0 siblings, 0 replies; 30+ messages in thread
From: Juan Quintela @ 2023-04-28 17:33 UTC (permalink / raw)
To: Lukas Straub
Cc: qemu-devel, Peter Xu, Thomas Huth, Laurent Vivier, Paolo Bonzini
Lukas Straub <lukasstraub2@web.de> wrote:
> This fixes compress with colo.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
> ---
>
> Oops, this one slipped through
As the saying goes, it happens in the best families.
Yeap, it is a literal translation of a Spanish saying O:-)
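The pairing that patch 14/13 moves into migration.c (set up decompression at the start of the incoming path, clean it up on both the normal and the failure path) can be sketched as below. This is an illustration under assumptions, not the QEMU code: `decomp_state` is a hypothetical array standing in for the `DecompressParam` entries, and making the cleanup safe to call repeatedly is a design choice of the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical per-thread state; the real code holds DecompressParam entries. */
static int *decomp_state;

static int decompress_setup(int thread_count)
{
    assert(thread_count > 0);
    decomp_state = calloc((size_t)thread_count, sizeof(*decomp_state));
    return decomp_state ? 0 : -1;
}

/* Safe to call more than once, and also when setup never ran. */
static void decompress_cleanup(void)
{
    free(decomp_state);      /* free(NULL) is a no-op */
    decomp_state = NULL;
}

/* Incoming side: set up first, clean up on both success and failure. */
static int incoming_migration(int thread_count, bool load_fails)
{
    int ret = -1;

    if (decompress_setup(thread_count)) {
        goto out;
    }
    if (!load_fails) {
        ret = 0;             /* stands in for loading the migration stream */
    }
out:
    decompress_cleanup();
    return ret;
}
```

An idempotent cleanup keeps the sketch simple when more than one exit path calls it.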
^ permalink raw reply [flat|nested] 30+ messages in thread