* [Qemu-devel] [PATCH 00/12] Multifd v4
@ 2017-02-13 17:19 Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 01/12] migration: Test for disabled features on reception Juan Quintela
` (13 more replies)
0 siblings, 14 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
Hi
[v4]
- Address reviews
- moved synchronization to semaphores (faster), as Paolo suggested
- improvements overall (see individual patches)
- fix all the checkpatch warnings
- fix all [HACKS] except for one
Please review.
[v3]
This is the 3rd version of multifd. Changes:
- comments for previous version addressed
- lots of bugs fixed
- remove DPRINTF from ram.c
- add multifd-group parameter; it sets how many pages we send each
time to the worker threads. I am open to better names.
- Better flush support.
- with migrate_set_speed 2G it is able to migrate "stress -vm 2
-vm-bytes 512M" over loopback.
Please review.
Thanks, Juan.
[v2]
This is a version against current code. It is based on top of QIO
work. It improves the thread synchronization and fixes the problem
when we could have two threads handling the same page.
Please comment, Juan.
The following changes since commit df96bfab49dab2d0373e49b51bbb51ce72e1601e:
Merge remote-tracking branch 'remotes/kraxel/tags/pull-vga-20170213-1' into staging (2017-02-13 10:54:49 +0000)
are available in the git repository at:
git://github.com/juanquintela/qemu.git tags/multifd/20170213
for you to fetch changes up to 733be4d2d815c6331b86364f68f6980699e9bb48:
migration: Test new fd infrastructure (2017-02-13 18:14:06 +0100)
----------------------------------------------------------------
multifd/next for 20170213
----------------------------------------------------------------
Juan Quintela (12):
migration: Test for disabled features on reception
migration: Don't create decompression threads if not enabled
migration: Add multifd capability
migration: Create x-multifd-threads parameter
migration: Create x-multifd-group parameter
migration: Create multifd migration threads
migration: Start of multiple fd work
migration: Create ram_multifd_page
migration: Create thread infrastructure for multifd send side
migration: Really use multiple pages at a time
migration: Send the fd number which we are going to use for this page
migration: Test new fd infrastructure
hmp.c | 18 ++
include/migration/migration.h | 17 ++
migration/migration.c | 77 ++++++-
migration/ram.c | 499 +++++++++++++++++++++++++++++++++++++++++-
migration/socket.c | 64 +++++-
qapi-schema.json | 29 ++-
6 files changed, 690 insertions(+), 14 deletions(-)
* [Qemu-devel] [PULL 01/12] migration: Test for disabled features on reception
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-15 13:12 ` Dr. David Alan Gilbert
2017-02-13 17:19 ` [Qemu-devel] [PULL 02/12] migration: Don't create decompression threads if not enabled Juan Quintela
` (12 subsequent siblings)
13 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
Right now, if we receive a compressed page while these features are
disabled, Bad Things (TM) can happen. Just add a test for them.
Signed-off-by: Juan Quintela <quintela@redhat.com>
--
I had XBZRLE here also, but it doesn't need extra resources on the
destination, only on the source. Additionally, libvirt doesn't enable
it on the destination, so don't test for it here.
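A sketch of where the test hooks in (condensed, not a verbatim
excerpt): the RAM stream packs the per-page flags into the low bits of
the page-address word, so ram_load() can mask them off and compare them
against the invalid_flags set built in this patch.

    uint64_t header = qemu_get_be64(f);            /* page address | flags */
    int flags       = header & ~TARGET_PAGE_MASK;  /* flags in the low bits */
    ram_addr_t addr = header & TARGET_PAGE_MASK;

    if (flags & invalid_flags) {                   /* feature disabled here */
        ret = -EINVAL;                             /* refuse the stream */
    }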
---
migration/ram.c | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/migration/ram.c b/migration/ram.c
index ef8fadf..5817f8c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -2455,7 +2455,7 @@ static int ram_load_postcopy(QEMUFile *f)
static int ram_load(QEMUFile *f, void *opaque, int version_id)
{
- int flags = 0, ret = 0;
+ int flags = 0, ret = 0, invalid_flags;
static uint64_t seq_iter;
int len = 0;
/*
@@ -2470,6 +2470,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
ret = -EINVAL;
}
+ invalid_flags = 0;
+
+ if (!migrate_use_compression()) {
+ invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
+ }
/* This RCU critical section can be very long running.
* When RCU reclaims in the code start to become numerous,
* it will be necessary to reduce the granularity of this
@@ -2490,6 +2495,15 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
flags = addr & ~TARGET_PAGE_MASK;
addr &= TARGET_PAGE_MASK;
+ if (flags & invalid_flags) {
+ if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
+ error_report("Received an unexpected compressed page");
+ }
+
+ ret = -EINVAL;
+ break;
+ }
+
if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
RAMBlock *block = ram_block_from_stream(f, flags);
--
2.7.4
* [Qemu-devel] [PULL 02/12] migration: Don't create decompression threads if not enabled
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 01/12] migration: Test for disabled features on reception Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-15 13:17 ` Dr. David Alan Gilbert
2017-02-13 17:19 ` [Qemu-devel] [PULL 03/12] migration: Add multifd capability Juan Quintela
` (11 subsequent siblings)
13 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
Signed-off-by: Juan Quintela <quintela@redhat.com>
--
I removed the [HACK] part because the previous patch already checks
that compressed pages are not received.
---
migration/ram.c | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/migration/ram.c b/migration/ram.c
index 5817f8c..4422ee8 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -2251,6 +2251,9 @@ void migrate_decompress_threads_create(void)
{
int i, thread_count;
+ if (!migrate_use_compression()) {
+ return;
+ }
thread_count = migrate_decompress_threads();
decompress_threads = g_new0(QemuThread, thread_count);
decomp_param = g_new0(DecompressParam, thread_count);
@@ -2272,6 +2275,9 @@ void migrate_decompress_threads_join(void)
{
int i, thread_count;
+ if (!migrate_use_compression()) {
+ return;
+ }
thread_count = migrate_decompress_threads();
for (i = 0; i < thread_count; i++) {
qemu_mutex_lock(&decomp_param[i].mutex);
--
2.7.4
* [Qemu-devel] [PULL 03/12] migration: Add multifd capability
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 01/12] migration: Test for disabled features on reception Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 02/12] migration: Don't create decompression threads if not enabled Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-15 13:04 ` Dr. David Alan Gilbert
2017-02-13 17:19 ` [Qemu-devel] [PULL 04/12] migration: Create x-multifd-threads parameter Juan Quintela
` (10 subsequent siblings)
13 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
include/migration/migration.h | 1 +
migration/migration.c | 9 +++++++++
qapi-schema.json | 5 +++--
3 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 7528cc2..e8bba55 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -308,6 +308,7 @@ bool migrate_postcopy_ram(void);
bool migrate_zero_blocks(void);
bool migrate_auto_converge(void);
+bool migrate_use_multifd(void);
int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
uint8_t *dst, int dlen);
diff --git a/migration/migration.c b/migration/migration.c
index 2b179c6..37af7a4 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1369,6 +1369,15 @@ bool migrate_use_events(void)
return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
}
+bool migrate_use_multifd(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
+}
+
int migrate_use_xbzrle(void)
{
MigrationState *s;
diff --git a/qapi-schema.json b/qapi-schema.json
index 61151f3..ff7579e 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -865,12 +865,13 @@
# side, this process is called COarse-Grain LOck Stepping (COLO) for
# Non-stop Service. (since 2.8)
#
+# @x-multifd: Use more than one fd for migration (since 2.9)
+#
# Since: 1.2
##
{ 'enum': 'MigrationCapability',
'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
- 'compress', 'events', 'postcopy-ram', 'x-colo'] }
-
+ 'compress', 'events', 'postcopy-ram', 'x-colo', 'x-multifd'] }
##
# @MigrationCapabilityStatus:
#
--
2.7.4
* [Qemu-devel] [PULL 04/12] migration: Create x-multifd-threads parameter
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (2 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 03/12] migration: Add multifd capability Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 05/12] migration: Create x-multifd-group parameter Juan Quintela
` (9 subsequent siblings)
13 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
Indicates the number of threads that we will create. By default we
create 2 threads.
Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
--
Catch inconsistent defaults.
Thanks Eric
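At run time the parameter is set with the HMP command wired up below,
e.g. "migrate_set_parameter x-multifd-threads 4". A minimal sketch of
the intended consumer (this mirrors the thread-creation patch later in
the series):

    int thread_count = migrate_multifd_threads();  /* 1..255, default 2 */
    multifd_send = g_new0(MultiFDSendParams, thread_count);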
---
hmp.c | 8 ++++++++
include/migration/migration.h | 2 ++
migration/migration.c | 23 +++++++++++++++++++++++
qapi-schema.json | 13 +++++++++++--
4 files changed, 44 insertions(+), 2 deletions(-)
diff --git a/hmp.c b/hmp.c
index 2bc4f06..71e3d8c 100644
--- a/hmp.c
+++ b/hmp.c
@@ -323,6 +323,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, " %s: %" PRId64,
MigrationParameter_lookup[MIGRATION_PARAMETER_X_CHECKPOINT_DELAY],
params->x_checkpoint_delay);
+ monitor_printf(mon, " %s: %" PRId64,
+ MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_THREADS],
+ params->x_multifd_threads);
monitor_printf(mon, "\n");
}
@@ -1395,6 +1398,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p.has_x_checkpoint_delay = true;
use_int_value = true;
break;
+ case MIGRATION_PARAMETER_X_MULTIFD_THREADS:
+ p.has_x_multifd_threads = true;
+ use_int_value = true;
+ break;
}
if (use_int_value) {
@@ -1412,6 +1419,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p.cpu_throttle_increment = valueint;
p.downtime_limit = valueint;
p.x_checkpoint_delay = valueint;
+ p.x_multifd_threads = valueint;
}
qmp_migrate_set_parameters(&p, &err);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index e8bba55..de987da 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -254,6 +254,8 @@ bool migration_in_postcopy(MigrationState *);
bool migration_in_postcopy_after_devices(MigrationState *);
MigrationState *migrate_get_current(void);
+int migrate_multifd_threads(void);
+
void migrate_compress_threads_create(void);
void migrate_compress_threads_join(void);
void migrate_decompress_threads_create(void);
diff --git a/migration/migration.c b/migration/migration.c
index 37af7a4..141eac6 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -67,6 +67,7 @@
* Note: Please change this default value to 10000 when we support hybrid mode.
*/
#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
+#define DEFAULT_MIGRATE_MULTIFD_THREADS 2
static NotifierList migration_state_notifiers =
NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -101,6 +102,7 @@ MigrationState *migrate_get_current(void)
.max_bandwidth = MAX_THROTTLE,
.downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
.x_checkpoint_delay = DEFAULT_MIGRATE_X_CHECKPOINT_DELAY,
+ .x_multifd_threads = DEFAULT_MIGRATE_MULTIFD_THREADS,
},
};
@@ -591,6 +593,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->downtime_limit = s->parameters.downtime_limit;
params->has_x_checkpoint_delay = true;
params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
+ params->has_x_multifd_threads = true;
+ params->x_multifd_threads = s->parameters.x_multifd_threads;
return params;
}
@@ -854,6 +858,13 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
"x_checkpoint_delay",
"is invalid, it should be positive");
}
+ if (params->has_x_multifd_threads &&
+ (params->x_multifd_threads < 1 || params->x_multifd_threads > 255)) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "multifd_threads",
+ "is invalid, it should be in the range of 1 to 255");
+ return;
+ }
if (params->has_compress_level) {
s->parameters.compress_level = params->compress_level;
@@ -892,6 +903,9 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
if (params->has_x_checkpoint_delay) {
s->parameters.x_checkpoint_delay = params->x_checkpoint_delay;
}
+ if (params->has_x_multifd_threads) {
+ s->parameters.x_multifd_threads = params->x_multifd_threads;
+ }
}
@@ -1378,6 +1392,15 @@ bool migrate_use_multifd(void)
return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
}
+int migrate_multifd_threads(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.x_multifd_threads;
+}
+
int migrate_use_xbzrle(void)
{
MigrationState *s;
diff --git a/qapi-schema.json b/qapi-schema.json
index ff7579e..5c18d54 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -981,13 +981,17 @@
# @x-checkpoint-delay: The delay time (in ms) between two COLO checkpoints in
# periodic mode. (Since 2.8)
#
+# @x-multifd-threads: Number of threads used to migrate data in parallel
+# The default value is 2 (since 2.9)
+#
# Since: 2.4
##
{ 'enum': 'MigrationParameter',
'data': ['compress-level', 'compress-threads', 'decompress-threads',
'cpu-throttle-initial', 'cpu-throttle-increment',
'tls-creds', 'tls-hostname', 'max-bandwidth',
- 'downtime-limit', 'x-checkpoint-delay' ] }
+ 'downtime-limit', 'x-checkpoint-delay',
+ 'x-multifd-threads'] }
##
# @migrate-set-parameters:
@@ -1050,6 +1054,10 @@
#
# @x-checkpoint-delay: the delay time between two COLO checkpoints. (Since 2.8)
#
+#
+# @x-multifd-threads: Number of threads used to migrate data in parallel
+# The default value is 2 (since 2.9)
+#
# Since: 2.4
##
{ 'struct': 'MigrationParameters',
@@ -1062,7 +1070,8 @@
'*tls-hostname': 'str',
'*max-bandwidth': 'int',
'*downtime-limit': 'int',
- '*x-checkpoint-delay': 'int'} }
+ '*x-checkpoint-delay': 'int',
+ '*x-multifd-threads': 'int'} }
##
# @query-migrate-parameters:
--
2.7.4
* [Qemu-devel] [PULL 05/12] migration: Create x-multifd-group parameter
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (3 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 04/12] migration: Create x-multifd-threads parameter Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 06/12] migration: Create multifd migration threads Juan Quintela
` (8 subsequent siblings)
13 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
Indicates how many pages we are going to send in each batch to a multifd
thread.
Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
--
Be consistent with defaults and documentation
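A minimal sketch of the intended effect (the real consumer arrives with
the batching patch later in the series): the group size bounds how many
pages the main thread queues before waking a worker once for the whole
batch.

    pages.address[pages.num++] = p;            /* queue on the main thread */
    if (pages.num < migrate_multifd_group()) { /* 1..10000, default 16 */
        return;                                /* keep batching */
    }
    /* group full: hand the whole batch to one worker, one wakeup */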
---
hmp.c | 8 ++++++++
include/migration/migration.h | 1 +
migration/migration.c | 23 +++++++++++++++++++++++
qapi-schema.json | 11 +++++++++--
4 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/hmp.c b/hmp.c
index 71e3d8c..fc24dad 100644
--- a/hmp.c
+++ b/hmp.c
@@ -326,6 +326,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, " %s: %" PRId64,
MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_THREADS],
params->x_multifd_threads);
+ monitor_printf(mon, " %s: %" PRId64,
+ MigrationParameter_lookup[MIGRATION_PARAMETER_X_MULTIFD_GROUP],
+ params->x_multifd_group);
monitor_printf(mon, "\n");
}
@@ -1402,6 +1405,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p.has_x_multifd_threads = true;
use_int_value = true;
break;
+ case MIGRATION_PARAMETER_X_MULTIFD_GROUP:
+ p.has_x_multifd_group = true;
+ use_int_value = true;
+ break;
}
if (use_int_value) {
@@ -1420,6 +1427,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p.downtime_limit = valueint;
p.x_checkpoint_delay = valueint;
p.x_multifd_threads = valueint;
+ p.x_multifd_group = valueint;
}
qmp_migrate_set_parameters(&p, &err);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index de987da..3c7f165 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -255,6 +255,7 @@ bool migration_in_postcopy_after_devices(MigrationState *);
MigrationState *migrate_get_current(void);
int migrate_multifd_threads(void);
+int migrate_multifd_group(void);
void migrate_compress_threads_create(void);
void migrate_compress_threads_join(void);
diff --git a/migration/migration.c b/migration/migration.c
index 141eac6..2b2d0a8 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -68,6 +68,7 @@
*/
#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
#define DEFAULT_MIGRATE_MULTIFD_THREADS 2
+#define DEFAULT_MIGRATE_MULTIFD_GROUP 16
static NotifierList migration_state_notifiers =
NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -103,6 +104,7 @@ MigrationState *migrate_get_current(void)
.downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
.x_checkpoint_delay = DEFAULT_MIGRATE_X_CHECKPOINT_DELAY,
.x_multifd_threads = DEFAULT_MIGRATE_MULTIFD_THREADS,
+ .x_multifd_group = DEFAULT_MIGRATE_MULTIFD_GROUP,
},
};
@@ -595,6 +597,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
params->has_x_multifd_threads = true;
params->x_multifd_threads = s->parameters.x_multifd_threads;
+ params->has_x_multifd_group = true;
+ params->x_multifd_group = s->parameters.x_multifd_group;
return params;
}
@@ -865,6 +869,13 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
"is invalid, it should be in the range of 1 to 255");
return;
}
+ if (params->has_x_multifd_group &&
+ (params->x_multifd_group < 1 || params->x_multifd_group > 10000)) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "multifd_group",
+ "is invalid, it should be in the range of 1 to 10000");
+ return;
+ }
if (params->has_compress_level) {
s->parameters.compress_level = params->compress_level;
@@ -906,6 +917,9 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
if (params->has_x_multifd_threads) {
s->parameters.x_multifd_threads = params->x_multifd_threads;
}
+ if (params->has_x_multifd_group) {
+ s->parameters.x_multifd_group = params->x_multifd_group;
+ }
}
@@ -1401,6 +1415,15 @@ int migrate_multifd_threads(void)
return s->parameters.x_multifd_threads;
}
+int migrate_multifd_group(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.x_multifd_group;
+}
+
int migrate_use_xbzrle(void)
{
MigrationState *s;
diff --git a/qapi-schema.json b/qapi-schema.json
index 5c18d54..4fdea1a 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -984,6 +984,9 @@
# @x-multifd-threads: Number of threads used to migrate data in parallel
# The default value is 2 (since 2.9)
#
+# @x-multifd-group: Number of pages sent together to a thread
+# The default value is 16 (since 2.9)
+#
# Since: 2.4
##
{ 'enum': 'MigrationParameter',
@@ -991,7 +994,7 @@
'cpu-throttle-initial', 'cpu-throttle-increment',
'tls-creds', 'tls-hostname', 'max-bandwidth',
'downtime-limit', 'x-checkpoint-delay',
- 'x-multifd-threads'] }
+ 'x-multifd-threads', 'x-multifd-group'] }
##
# @migrate-set-parameters:
@@ -1058,6 +1061,9 @@
# @x-multifd-threads: Number of threads used to migrate data in parallel
# The default value is 2 (since 2.9)
#
+# @x-multifd-group: Number of pages sent together in a bunch
+# The default value is 16 (since 2.9)
+#
# Since: 2.4
##
{ 'struct': 'MigrationParameters',
@@ -1071,7 +1077,8 @@
'*max-bandwidth': 'int',
'*downtime-limit': 'int',
'*x-checkpoint-delay': 'int',
- '*x-multifd-threads': 'int'} }
+ '*x-multifd-threads': 'int',
+ '*x-multifd-group': 'int'} }
##
# @query-migrate-parameters:
--
2.7.4
* [Qemu-devel] [PULL 06/12] migration: Create multifd migration threads
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (4 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 05/12] migration: Create x-multifd-group parameter Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-14 13:02 ` Paolo Bonzini
2017-02-13 17:19 ` [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work Juan Quintela
` (7 subsequent siblings)
13 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
Creation of the threads, nothing inside yet.
Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
--
Use pointers instead of long array names.
Moved to semaphores instead of condition variables, as Paolo suggested.
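Both thread types share the same shutdown handshake, sketched here: the
joiner flips quit under the mutex and posts the semaphore, so a worker
parked in qemu_sem_wait() wakes up, re-reads quit under the mutex, and
leaves its loop.

    qemu_mutex_lock(&p->mutex);
    p->quit = true;
    qemu_sem_post(&p->sem);      /* wake the worker so it can see quit */
    qemu_mutex_unlock(&p->mutex);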
---
include/migration/migration.h | 4 +
migration/migration.c | 6 ++
migration/ram.c | 168 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 178 insertions(+)
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3c7f165..13fac75 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -256,6 +256,10 @@ MigrationState *migrate_get_current(void);
int migrate_multifd_threads(void);
int migrate_multifd_group(void);
+void migrate_multifd_send_threads_create(void);
+void migrate_multifd_send_threads_join(void);
+void migrate_multifd_recv_threads_create(void);
+void migrate_multifd_recv_threads_join(void);
void migrate_compress_threads_create(void);
void migrate_compress_threads_join(void);
diff --git a/migration/migration.c b/migration/migration.c
index 2b2d0a8..d2705d7 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -344,6 +344,7 @@ static void process_incoming_migration_bh(void *opaque)
MIGRATION_STATUS_FAILED);
error_report_err(local_err);
migrate_decompress_threads_join();
+ migrate_multifd_recv_threads_join();
exit(EXIT_FAILURE);
}
@@ -368,6 +369,7 @@ static void process_incoming_migration_bh(void *opaque)
runstate_set(global_state_get_runstate());
}
migrate_decompress_threads_join();
+ migrate_multifd_recv_threads_join();
/*
* This must happen after any state changes since as soon as an external
* observer sees this event they might start to prod at the VM assuming
@@ -433,6 +435,7 @@ static void process_incoming_migration_co(void *opaque)
MIGRATION_STATUS_FAILED);
error_report("load of migration failed: %s", strerror(-ret));
migrate_decompress_threads_join();
+ migrate_multifd_recv_threads_join();
exit(EXIT_FAILURE);
}
@@ -445,6 +448,7 @@ void migration_fd_process_incoming(QEMUFile *f)
Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);
migrate_decompress_threads_create();
+ migrate_multifd_recv_threads_create();
qemu_file_set_blocking(f, false);
qemu_coroutine_enter(co);
}
@@ -974,6 +978,7 @@ static void migrate_fd_cleanup(void *opaque)
qemu_mutex_lock_iothread();
migrate_compress_threads_join();
+ migrate_multifd_send_threads_join();
qemu_fclose(s->to_dst_file);
s->to_dst_file = NULL;
}
@@ -2100,6 +2105,7 @@ void migrate_fd_connect(MigrationState *s)
}
migrate_compress_threads_create();
+ migrate_multifd_send_threads_create();
qemu_thread_create(&s->thread, "live_migration", migration_thread, s,
QEMU_THREAD_JOINABLE);
s->migration_thread_running = true;
diff --git a/migration/ram.c b/migration/ram.c
index 4422ee8..0cb19cf 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -382,6 +382,174 @@ void migrate_compress_threads_create(void)
}
}
+/* Multiple fd's */
+
+struct MultiFDSendParams {
+ QemuThread thread;
+ QemuSemaphore sem;
+ QemuMutex mutex;
+ bool quit;
+};
+typedef struct MultiFDSendParams MultiFDSendParams;
+
+static MultiFDSendParams *multifd_send;
+
+static void *multifd_send_thread(void *opaque)
+{
+ MultiFDSendParams *params = opaque;
+
+ while (true) {
+ qemu_mutex_lock(¶ms->mutex);
+ if (params->quit) {
+ qemu_mutex_unlock(¶ms->mutex);
+ break;
+ }
+ qemu_mutex_unlock(¶ms->mutex);
+ qemu_sem_wait(¶ms->sem);
+ }
+
+ return NULL;
+}
+
+static void terminate_multifd_send_threads(void)
+{
+ int i, thread_count;
+
+ thread_count = migrate_multifd_threads();
+ for (i = 0; i < thread_count; i++) {
+ MultiFDSendParams *p = &multifd_send[i];
+
+ qemu_mutex_lock(&p->mutex);
+ p->quit = true;
+ qemu_sem_post(&p->sem);
+ qemu_mutex_unlock(&p->mutex);
+ }
+}
+
+void migrate_multifd_send_threads_join(void)
+{
+ int i, thread_count;
+
+ if (!migrate_use_multifd()) {
+ return;
+ }
+ terminate_multifd_send_threads();
+ thread_count = migrate_multifd_threads();
+ for (i = 0; i < thread_count; i++) {
+ MultiFDSendParams *p = &multifd_send[i];
+
+ qemu_thread_join(&p->thread);
+ qemu_mutex_destroy(&p->mutex);
+ qemu_sem_destroy(&p->sem);
+ }
+ g_free(multifd_send);
+ multifd_send = NULL;
+}
+
+void migrate_multifd_send_threads_create(void)
+{
+ int i, thread_count;
+
+ if (!migrate_use_multifd()) {
+ return;
+ }
+ thread_count = migrate_multifd_threads();
+ multifd_send = g_new0(MultiFDSendParams, thread_count);
+ for (i = 0; i < thread_count; i++) {
+ char thread_name[15];
+ MultiFDSendParams *p = &multifd_send[i];
+
+ qemu_mutex_init(&p->mutex);
+ qemu_sem_init(&p->sem, 0);
+ p->quit = false;
+ snprintf(thread_name, 15, "multifd_send_%d", i);
+ qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
+ QEMU_THREAD_JOINABLE);
+ }
+}
+
+struct MultiFDRecvParams {
+ QemuThread thread;
+ QemuSemaphore sem;
+ QemuMutex mutex;
+ bool quit;
+};
+typedef struct MultiFDRecvParams MultiFDRecvParams;
+
+static MultiFDRecvParams *multifd_recv;
+
+static void *multifd_recv_thread(void *opaque)
+{
+ MultiFDRecvParams *params = opaque;
+
+ while (true) {
+ qemu_mutex_lock(¶ms->mutex);
+ if (params->quit) {
+ qemu_mutex_unlock(¶ms->mutex);
+ break;
+ }
+ qemu_mutex_unlock(¶ms->mutex);
+ qemu_sem_wait(¶ms->sem);
+ }
+
+ return NULL;
+}
+
+static void terminate_multifd_recv_threads(void)
+{
+ int i, thread_count;
+
+ thread_count = migrate_multifd_threads();
+ for (i = 0; i < thread_count; i++) {
+ MultiFDRecvParams *p = &multifd_recv[i];
+
+ qemu_mutex_lock(&p->mutex);
+ p->quit = true;
+ qemu_sem_post(&p->sem);
+ qemu_mutex_unlock(&p->mutex);
+ }
+}
+
+void migrate_multifd_recv_threads_join(void)
+{
+ int i, thread_count;
+
+ if (!migrate_use_multifd()) {
+ return;
+ }
+ terminate_multifd_recv_threads();
+ thread_count = migrate_multifd_threads();
+ for (i = 0; i < thread_count; i++) {
+ MultiFDRecvParams *p = &multifd_recv[i];
+
+ qemu_thread_join(&p->thread);
+ qemu_mutex_destroy(&p->mutex);
+ qemu_sem_destroy(&p->sem);
+ }
+ g_free(multifd_recv);
+ multifd_recv = NULL;
+}
+
+void migrate_multifd_recv_threads_create(void)
+{
+ int i, thread_count;
+
+ if (!migrate_use_multifd()) {
+ return;
+ }
+ thread_count = migrate_multifd_threads();
+ multifd_recv = g_new0(MultiFDRecvParams, thread_count);
+ for (i = 0; i < thread_count; i++) {
+ MultiFDRecvParams *p = &multifd_recv[i];
+
+ qemu_mutex_init(&p->mutex);
+ qemu_sem_init(&p->sem, 0);
+ p->quit = false;
+ qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
+ QEMU_THREAD_JOINABLE);
+ }
+}
+
/**
* save_page_header: Write page header to wire
*
--
2.7.4
* [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (5 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 06/12] migration: Create multifd migration threads Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-14 11:17 ` Daniel P. Berrange
2017-02-14 12:57 ` Paolo Bonzini
2017-02-13 17:19 ` [Qemu-devel] [PULL 08/12] migration: Create ram_multifd_page Juan Quintela
` (6 subsequent siblings)
13 siblings, 2 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
We create a new channel for each new thread. For now, we only send one
character through each channel, to make sure that we are creating the
channels in the right order.
Signed-off-by: Juan Quintela <quintela@redhat.com>
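The one-byte write/read also acts as a sequencing barrier; a sketch of
the resulting ordering:

    /*
     *  source main thread            destination main thread
     *  ------------------            -----------------------
     *  connect channel 0             accept channel 0
     *  worker 0 writes 's'           worker 0 reads one byte
     *  sem_wait(init) returns        sem_wait(init) returns
     *  connect channel 1             accept channel 1
     *  ...
     *
     * Each side waits on the per-thread init semaphore before creating
     * the next channel, so the i-th connect always pairs with the i-th
     * accept on the destination.
     */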
---
include/migration/migration.h | 7 +++++
migration/ram.c | 33 ++++++++++++++++++++++
migration/socket.c | 64 +++++++++++++++++++++++++++++++++++++++++--
3 files changed, 101 insertions(+), 3 deletions(-)
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 13fac75..ff890b5 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -22,6 +22,7 @@
#include "qapi-types.h"
#include "exec/cpu-common.h"
#include "qemu/coroutine_int.h"
+#include "io/channel.h"
#define QEMU_VM_FILE_MAGIC 0x5145564d
#define QEMU_VM_FILE_VERSION_COMPAT 0x00000002
@@ -224,6 +225,12 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);
void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+int socket_recv_channel_close_listening(void);
+QIOChannel *socket_send_channel_create(void);
+int socket_send_channel_destroy(QIOChannel *send);
+
void unix_start_incoming_migration(const char *path, Error **errp);
void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
diff --git a/migration/ram.c b/migration/ram.c
index 0cb19cf..b101a59 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -386,7 +386,9 @@ void migrate_compress_threads_create(void)
struct MultiFDSendParams {
QemuThread thread;
+ QIOChannel *c;
QemuSemaphore sem;
+ QemuSemaphore init;
QemuMutex mutex;
bool quit;
};
@@ -397,6 +399,10 @@ static MultiFDSendParams *multifd_send;
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *params = opaque;
+ char start = 's';
+
+ qio_channel_write(params->c, &start, 1, &error_abort);
+ qemu_sem_post(¶ms->init);
while (true) {
qemu_mutex_lock(¶ms->mutex);
@@ -441,7 +447,10 @@ void migrate_multifd_send_threads_join(void)
qemu_thread_join(&p->thread);
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
+ qemu_sem_destroy(&p->init);
+ socket_send_channel_destroy(p->c);
}
+
g_free(multifd_send);
multifd_send = NULL;
}
@@ -461,15 +470,24 @@ void migrate_multifd_send_threads_create(void)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
+ qemu_sem_init(&p->init, 0);
p->quit = false;
+ p->c = socket_send_channel_create();
+ if (!p->c) {
+ error_report("Error creating a send channel");
+ exit(EXIT_FAILURE);
+ }
snprintf(thread_name, 15, "multifd_send_%d", i);
qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
QEMU_THREAD_JOINABLE);
+ qemu_sem_wait(&p->init);
}
}
struct MultiFDRecvParams {
QemuThread thread;
+ QIOChannel *c;
+ QemuSemaphore init;
QemuSemaphore sem;
QemuMutex mutex;
bool quit;
@@ -481,6 +499,10 @@ static MultiFDRecvParams *multifd_recv;
static void *multifd_recv_thread(void *opaque)
{
MultiFDRecvParams *params = opaque;
+ char start;
+
+ qio_channel_read(params->c, &start, 1, &error_abort);
+ qemu_sem_post(¶ms->init);
while (true) {
qemu_mutex_lock(¶ms->mutex);
@@ -525,6 +547,8 @@ void migrate_multifd_recv_threads_join(void)
qemu_thread_join(&p->thread);
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
+ qemu_sem_destroy(&p->init);
+ socket_recv_channel_destroy(multifd_recv[i].c);
}
g_free(multifd_recv);
multifd_recv = NULL;
@@ -544,10 +568,19 @@ void migrate_multifd_recv_threads_create(void)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
+ qemu_sem_init(&p->init, 0);
p->quit = false;
+ p->c = socket_recv_channel_create();
+
+ if (!p->c) {
+ error_report("Error creating a recv channel");
+ exit(EXIT_FAILURE);
+ }
qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
QEMU_THREAD_JOINABLE);
+ qemu_sem_wait(&p->init);
}
+ socket_recv_channel_close_listening();
}
/**
diff --git a/migration/socket.c b/migration/socket.c
index 13966f1..1c764f1 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -24,6 +24,62 @@
#include "io/channel-socket.h"
#include "trace.h"
+struct SocketArgs {
+ QIOChannelSocket *ioc;
+ SocketAddress *saddr;
+ Error **errp;
+} socket_args;
+
+QIOChannel *socket_recv_channel_create(void)
+{
+ QIOChannelSocket *sioc;
+ Error *err = NULL;
+
+ sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
+ &err);
+ if (!sioc) {
+ error_report("could not accept migration connection (%s)",
+ error_get_pretty(err));
+ return NULL;
+ }
+ return QIO_CHANNEL(sioc);
+}
+
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+ /* Remove channel */
+ object_unref(OBJECT(recv));
+ return 0;
+}
+
+/* we have created all the recv channels, we can close the main one */
+int socket_recv_channel_close_listening(void)
+{
+ /* Close listening socket as it's no longer needed */
+ qio_channel_close(QIO_CHANNEL(socket_args.ioc), NULL);
+ return 0;
+}
+
+QIOChannel *socket_send_channel_create(void)
+{
+ QIOChannelSocket *sioc = qio_channel_socket_new();
+
+ qio_channel_socket_connect_sync(sioc, socket_args.saddr,
+ socket_args.errp);
+ qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+ return QIO_CHANNEL(sioc);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+ /* Remove channel */
+ object_unref(OBJECT(send));
+ if (socket_args.saddr) {
+ qapi_free_SocketAddress(socket_args.saddr);
+ socket_args.saddr = NULL;
+ }
+ return 0;
+}
static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
{
@@ -97,6 +153,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
data->s = s;
+
+ socket_args.saddr = saddr;
+ socket_args.errp = errp;
+
if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
data->hostname = g_strdup(saddr->u.inet.data->host);
}
@@ -107,7 +167,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
socket_outgoing_migration,
data,
socket_connect_data_free);
- qapi_free_SocketAddress(saddr);
}
void tcp_start_outgoing_migration(MigrationState *s,
@@ -154,8 +213,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
object_unref(OBJECT(sioc));
out:
- /* Close listening socket as its no longer needed */
- qio_channel_close(ioc, NULL);
return FALSE; /* unregister */
}
@@ -164,6 +221,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
Error **errp)
{
QIOChannelSocket *listen_ioc = qio_channel_socket_new();
+ socket_args.ioc = listen_ioc;
qio_channel_set_name(QIO_CHANNEL(listen_ioc),
"migration-socket-listener");
--
2.7.4
* [Qemu-devel] [PULL 08/12] migration: Create ram_multifd_page
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (6 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 09/12] migration: Create thread infrastructure for multifd send side Juan Quintela
` (5 subsequent siblings)
13 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
The function still doesn't use multifd, but we have simplified
ram_save_page: the xbzrle and RDMA stuff is gone. We have added a new
counter and a new flag for this type of page.
Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
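The stream layout for one multifd page at this stage, reconstructed
from ram_multifd_page() and ram_load() below (everything still travels
on the main channel):

    /*
     *  be64   offset | RAM_SAVE_FLAG_MULTIFD_PAGE [| RAM_SAVE_FLAG_CONTINUE]
     *  idstr  RAM block name (omitted when CONTINUE is set)
     *  data   TARGET_PAGE_SIZE raw bytes
     */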
---
hmp.c | 2 ++
include/migration/migration.h | 1 +
migration/migration.c | 1 +
migration/ram.c | 51 ++++++++++++++++++++++++++++++++++++++++++-
qapi-schema.json | 4 +++-
5 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/hmp.c b/hmp.c
index fc24dad..eec0ffd 100644
--- a/hmp.c
+++ b/hmp.c
@@ -223,6 +223,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
monitor_printf(mon, "postcopy request count: %" PRIu64 "\n",
info->ram->postcopy_requests);
}
+ monitor_printf(mon, "multifd: %" PRIu64 " pages\n",
+ info->ram->multifd);
}
if (info->has_disk) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index ff890b5..cad03ab 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -290,6 +290,7 @@ uint64_t xbzrle_mig_pages_transferred(void);
uint64_t xbzrle_mig_pages_overflow(void);
uint64_t xbzrle_mig_pages_cache_miss(void);
double xbzrle_mig_cache_miss_rate(void);
+uint64_t multifd_mig_pages_transferred(void);
void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
void ram_debug_dump_bitmap(unsigned long *todump, bool expected);
diff --git a/migration/migration.c b/migration/migration.c
index d2705d7..2e3b357 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -652,6 +652,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
info->ram->mbps = s->mbps;
info->ram->dirty_sync_count = s->dirty_sync_count;
info->ram->postcopy_requests = s->postcopy_requests;
+ info->ram->multifd = multifd_mig_pages_transferred();
if (s->state != MIGRATION_STATUS_COMPLETED) {
info->ram->remaining = ram_bytes_remaining();
diff --git a/migration/ram.c b/migration/ram.c
index b101a59..45c46cb 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -61,6 +61,7 @@ static uint64_t bitmap_sync_count;
#define RAM_SAVE_FLAG_XBZRLE 0x40
/* 0x80 is reserved in migration.h start with 0x100 next */
#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100
+#define RAM_SAVE_FLAG_MULTIFD_PAGE 0x200
static uint8_t *ZERO_TARGET_PAGE;
@@ -141,6 +142,7 @@ typedef struct AccountingInfo {
uint64_t dup_pages;
uint64_t skipped_pages;
uint64_t norm_pages;
+ uint64_t multifd_pages;
uint64_t iterations;
uint64_t xbzrle_bytes;
uint64_t xbzrle_pages;
@@ -211,6 +213,11 @@ uint64_t xbzrle_mig_pages_overflow(void)
return acct_info.xbzrle_overflows;
}
+uint64_t multifd_mig_pages_transferred(void)
+{
+ return acct_info.multifd_pages;
+}
+
/* This is the last block that we have visited serching for dirty pages
*/
static RAMBlock *last_seen_block;
@@ -998,6 +1005,33 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss,
return pages;
}
+static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
+ bool last_stage, uint64_t *bytes_transferred)
+{
+ int pages;
+ uint8_t *p;
+ RAMBlock *block = pss->block;
+ ram_addr_t offset = pss->offset;
+
+ p = block->host + offset;
+
+ if (block == last_sent_block) {
+ offset |= RAM_SAVE_FLAG_CONTINUE;
+ }
+ pages = save_zero_page(f, block, offset, p, bytes_transferred);
+ if (pages == -1) {
+ *bytes_transferred +=
+ save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+ qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+ *bytes_transferred += TARGET_PAGE_SIZE;
+ pages = 1;
+ acct_info.norm_pages++;
+ acct_info.multifd_pages++;
+ }
+
+ return pages;
+}
+
static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
ram_addr_t offset)
{
@@ -1435,6 +1469,8 @@ static int ram_save_target_page(MigrationState *ms, QEMUFile *f,
res = ram_save_compressed_page(f, pss,
last_stage,
bytes_transferred);
+ } else if (migrate_use_multifd()) {
+ res = ram_multifd_page(f, pss, last_stage, bytes_transferred);
} else {
res = ram_save_page(f, pss, last_stage,
bytes_transferred);
@@ -2682,6 +2718,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
if (!migrate_use_compression()) {
invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
}
+
+ if (!migrate_use_multifd()) {
+ invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
+ }
/* This RCU critical section can be very long running.
* When RCU reclaims in the code start to become numerous,
* it will be necessary to reduce the granularity of this
@@ -2706,13 +2746,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
error_report("Received an unexpected compressed page");
}
+ if (flags & invalid_flags & RAM_SAVE_FLAG_MULTIFD_PAGE) {
+ error_report("Received an unexpected multifd page");
+ }
ret = -EINVAL;
break;
}
if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
- RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
+ RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
+ RAM_SAVE_FLAG_MULTIFD_PAGE)) {
RAMBlock *block = ram_block_from_stream(f, flags);
host = host_from_ram_block_offset(block, addr);
@@ -2787,6 +2831,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
break;
}
break;
+
+ case RAM_SAVE_FLAG_MULTIFD_PAGE:
+ qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
+ break;
+
case RAM_SAVE_FLAG_EOS:
/* normal exit */
break;
diff --git a/qapi-schema.json b/qapi-schema.json
index 4fdea1a..03a0383 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -574,6 +574,7 @@
#
# @postcopy-requests: The number of page requests received from the destination
# (since 2.7)
+# @multifd: number of pages sent with multifd (since 2.9)
#
# Since: 0.14.0
##
@@ -582,7 +583,8 @@
'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
'mbps' : 'number', 'dirty-sync-count' : 'int',
- 'postcopy-requests' : 'int' } }
+ 'postcopy-requests' : 'int',
+ 'multifd' : 'int'} }
##
# @XBZRLECacheStats:
--
2.7.4
* [Qemu-devel] [PULL 09/12] migration: Create thread infrastructure for multifd send side
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (7 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 08/12] migration: Create ram_multifd_page Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 10/12] migration: Really use multiple pages at a time Juan Quintela
` (4 subsequent siblings)
13 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
We make the locking and the transfer of information explicit, even if
we are still transmitting everything through the main thread.
Signed-off-by: Juan Quintela <quintela@redhat.com>
--
Moved synchronization to semaphores, as Paolo suggested.
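The handoff protocol, as a sketch: multifd_send_sem counts free
workers, each p->sem signals "work ready", multifd_send_mutex guards
the done flags, and each p->mutex guards that worker's work item.

    qemu_sem_wait(&multifd_send_sem);      /* block until a worker is free */
    qemu_mutex_lock(&multifd_send_mutex);
    /* ... pick the first worker with done == true, clear its done ... */
    qemu_mutex_unlock(&multifd_send_mutex);

    qemu_mutex_lock(&multifd_send[i].mutex);
    multifd_send[i].address = address;     /* publish the work item */
    qemu_mutex_unlock(&multifd_send[i].mutex);
    qemu_sem_post(&multifd_send[i].sem);   /* wake that worker */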
---
migration/ram.c | 45 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 45 insertions(+)
diff --git a/migration/ram.c b/migration/ram.c
index 45c46cb..f7df6cb 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
/* Multiple fd's */
struct MultiFDSendParams {
+ /* not changed */
QemuThread thread;
QIOChannel *c;
QemuSemaphore sem;
QemuSemaphore init;
QemuMutex mutex;
+ /* protected by param mutex */
bool quit;
+ uint8_t *address;
+ /* protected by multifd mutex */
+ bool done;
};
typedef struct MultiFDSendParams MultiFDSendParams;
static MultiFDSendParams *multifd_send;
+QemuMutex multifd_send_mutex;
+QemuSemaphore multifd_send_sem;
+
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *params = opaque;
@@ -410,6 +418,7 @@ static void *multifd_send_thread(void *opaque)
qio_channel_write(params->c, &start, 1, &error_abort);
qemu_sem_post(¶ms->init);
+ qemu_sem_post(&multifd_send_sem);
while (true) {
qemu_mutex_lock(¶ms->mutex);
@@ -417,6 +426,15 @@ static void *multifd_send_thread(void *opaque)
qemu_mutex_unlock(¶ms->mutex);
break;
}
+ if (params->address) {
+ params->address = 0;
+ qemu_mutex_unlock(¶ms->mutex);
+ qemu_mutex_lock(&multifd_send_mutex);
+ params->done = true;
+ qemu_mutex_unlock(&multifd_send_mutex);
+ qemu_sem_post(&multifd_send_sem);
+ continue;
+ }
qemu_mutex_unlock(¶ms->mutex);
qemu_sem_wait(¶ms->sem);
}
@@ -471,6 +489,8 @@ void migrate_multifd_send_threads_create(void)
}
thread_count = migrate_multifd_threads();
multifd_send = g_new0(MultiFDSendParams, thread_count);
+ qemu_mutex_init(&multifd_send_mutex);
+ qemu_sem_init(&multifd_send_sem, 0);
for (i = 0; i < thread_count; i++) {
char thread_name[15];
MultiFDSendParams *p = &multifd_send[i];
@@ -479,6 +499,8 @@ void migrate_multifd_send_threads_create(void)
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->init, 0);
p->quit = false;
+ p->done = true;
+ p->address = 0;
p->c = socket_send_channel_create();
if (!p->c) {
error_report("Error creating a send channel");
@@ -491,6 +513,28 @@ void migrate_multifd_send_threads_create(void)
}
}
+static int multifd_send_page(uint8_t *address)
+{
+ int i, thread_count;
+
+ thread_count = migrate_multifd_threads();
+ qemu_sem_wait(&multifd_send_sem);
+ qemu_mutex_lock(&multifd_send_mutex);
+ for (i = 0; i < thread_count; i++) {
+ if (multifd_send[i].done) {
+ multifd_send[i].done = false;
+ break;
+ }
+ }
+ qemu_mutex_unlock(&multifd_send_mutex);
+ qemu_mutex_lock(&multifd_send[i].mutex);
+ multifd_send[i].address = address;
+ qemu_mutex_unlock(&multifd_send[i].mutex);
+ qemu_sem_post(&multifd_send[i].sem);
+
+ return 0;
+}
+
struct MultiFDRecvParams {
QemuThread thread;
QIOChannel *c;
@@ -1023,6 +1067,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
*bytes_transferred +=
save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+ multifd_send_page(p);
*bytes_transferred += TARGET_PAGE_SIZE;
pages = 1;
acct_info.norm_pages++;
--
2.7.4
* [Qemu-devel] [PULL 10/12] migration: Really use multiple pages at a time
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (8 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 09/12] migration: Create thread infrastructure for multifd send side Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 11/12] migration: Send the fd number which we are going to use for this page Juan Quintela
` (3 subsequent siblings)
13 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
We now send several pages at a time each time that we wakeup a thread.
Signed-off-by: Juan Quintela <quintela@redhat.com>
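A sketch of the handoff once a group fills (condensed from the hunk
below): the batch is copied into the chosen worker's per-thread array
under that worker's mutex, and its semaphore is posted exactly once for
the whole group instead of once per page.

    qemu_mutex_lock(&multifd_send[i].mutex);
    multifd_send[i].pages.num = pages.num;
    for (j = 0; j < pages.num; j++) {
        multifd_send[i].pages.address[j] = pages.address[j];
    }
    pages.num = 0;                         /* batch handed over */
    qemu_mutex_unlock(&multifd_send[i].mutex);
    qemu_sem_post(&multifd_send[i].sem);   /* one wakeup per group */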
---
migration/ram.c | 44 ++++++++++++++++++++++++++++++++++++++------
1 file changed, 38 insertions(+), 6 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index f7df6cb..8d85c49 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -391,6 +391,13 @@ void migrate_compress_threads_create(void)
/* Multiple fd's */
+
+typedef struct {
+ int num;
+ int size;
+ uint8_t **address;
+} MultiFDPages;
+
struct MultiFDSendParams {
/* not changed */
QemuThread thread;
@@ -400,7 +407,7 @@ struct MultiFDSendParams {
QemuMutex mutex;
/* protected by param mutex */
bool quit;
- uint8_t *address;
+ MultiFDPages pages;
/* protected by multifd mutex */
bool done;
};
@@ -426,8 +433,8 @@ static void *multifd_send_thread(void *opaque)
qemu_mutex_unlock(¶ms->mutex);
break;
}
- if (params->address) {
- params->address = 0;
+ if (params->pages.num) {
+ params->pages.num = 0;
qemu_mutex_unlock(¶ms->mutex);
qemu_mutex_lock(&multifd_send_mutex);
params->done = true;
@@ -480,6 +487,13 @@ void migrate_multifd_send_threads_join(void)
multifd_send = NULL;
}
+static void multifd_init_group(MultiFDPages *pages)
+{
+ pages->num = 0;
+ pages->size = migrate_multifd_group();
+ pages->address = g_new0(uint8_t *, pages->size);
+}
+
void migrate_multifd_send_threads_create(void)
{
int i, thread_count;
@@ -500,7 +514,7 @@ void migrate_multifd_send_threads_create(void)
qemu_sem_init(&p->init, 0);
p->quit = false;
p->done = true;
- p->address = 0;
+ multifd_init_group(&multifd_send[i].pages);
p->c = socket_send_channel_create();
if (!p->c) {
error_report("Error creating a send channel");
@@ -515,7 +529,21 @@ void migrate_multifd_send_threads_create(void)
static int multifd_send_page(uint8_t *address)
{
- int i, thread_count;
+ int i, j, thread_count;
+ static MultiFDPages pages;
+ static bool once;
+
+ if (!once) {
+ multifd_init_group(&pages);
+ once = true;
+ }
+
+ pages.address[pages.num] = address;
+ pages.num++;
+
+ if (pages.num < (pages.size - 1)) {
+ return UINT16_MAX;
+ }
thread_count = migrate_multifd_threads();
qemu_sem_wait(&multifd_send_sem);
@@ -528,7 +556,11 @@ static int multifd_send_page(uint8_t *address)
}
qemu_mutex_unlock(&multifd_send_mutex);
qemu_mutex_lock(&multifd_send[i].mutex);
- multifd_send[i].address = address;
+ multifd_send[i].pages.num = pages.num;
+ for (j = 0; j < pages.size; j++) {
+ multifd_send[i].pages.address[j] = pages.address[j];
+ }
+ pages.num = 0;
qemu_mutex_unlock(&multifd_send[i].mutex);
qemu_sem_post(&multifd_send[i].sem);
--
2.7.4
* [Qemu-devel] [PULL 11/12] migration: Send the fd number which we are going to use for this page
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (9 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 10/12] migration: Really use multiple pages at a time Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-14 13:02 ` Paolo Bonzini
2017-02-13 17:19 ` [Qemu-devel] [PULL 12/12] migration: Test new fd infrastructure Juan Quintela
` (2 subsequent siblings)
13 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
We are still sending the page through the main channel; that will
change later in the series.
Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
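The per-page layout grows a two-byte channel index, reconstructed from
the hunks below:

    /*
     *  be64   offset | RAM_SAVE_FLAG_MULTIFD_PAGE ...
     *  be16   fd_num: UINT16_MAX means the page was only queued on the
     *                 source (its batch is not full yet); any other value
     *                 is the index of the worker/channel that handles the
     *                 batch ending with this page
     *  data   TARGET_PAGE_SIZE raw bytes (still on the main channel)
     */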
---
migration/ram.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++----
1 file changed, 66 insertions(+), 5 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 8d85c49..38789c8 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -527,7 +527,8 @@ void migrate_multifd_send_threads_create(void)
}
}
-static int multifd_send_page(uint8_t *address)
+
+static uint16_t multifd_send_page(uint8_t *address, bool last_page)
{
int i, j, thread_count;
static MultiFDPages pages;
@@ -541,8 +542,10 @@ static int multifd_send_page(uint8_t *address)
pages.address[pages.num] = address;
pages.num++;
- if (pages.num < (pages.size - 1)) {
- return UINT16_MAX;
+ if (!last_page) {
+ if (pages.num < (pages.size - 1)) {
+ return UINT16_MAX;
+ }
}
thread_count = migrate_multifd_threads();
@@ -564,16 +567,21 @@ static int multifd_send_page(uint8_t *address)
qemu_mutex_unlock(&multifd_send[i].mutex);
qemu_sem_post(&multifd_send[i].sem);
- return 0;
+ return i;
}
struct MultiFDRecvParams {
+ /* not changed */
QemuThread thread;
QIOChannel *c;
QemuSemaphore init;
+ QemuSemaphore ready;
QemuSemaphore sem;
QemuMutex mutex;
+ /* protected by param mutex */
bool quit;
+ MultiFDPages pages;
+ bool done;
};
typedef struct MultiFDRecvParams MultiFDRecvParams;
@@ -586,6 +594,7 @@ static void *multifd_recv_thread(void *opaque)
qio_channel_read(params->c, &start, 1, &error_abort);
qemu_sem_post(¶ms->init);
+ qemu_sem_post(¶ms->ready);
while (true) {
qemu_mutex_lock(¶ms->mutex);
@@ -593,6 +602,13 @@ static void *multifd_recv_thread(void *opaque)
qemu_mutex_unlock(¶ms->mutex);
break;
}
+ if (params->pages.num) {
+ params->pages.num = 0;
+ params->done = true;
+ qemu_mutex_unlock(¶ms->mutex);
+ qemu_sem_post(¶ms->ready);
+ continue;
+ }
qemu_mutex_unlock(¶ms->mutex);
qemu_sem_wait(¶ms->sem);
}
@@ -652,7 +668,10 @@ void migrate_multifd_recv_threads_create(void)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->init, 0);
+ qemu_sem_init(&p->ready, 0);
p->quit = false;
+ p->done = false;
+ multifd_init_group(&p->pages);
p->c = socket_recv_channel_create();
if (!p->c) {
@@ -666,6 +685,42 @@ void migrate_multifd_recv_threads_create(void)
socket_recv_channel_close_listening();
}
+static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
+{
+ int i, thread_count;
+ MultiFDRecvParams *params;
+ static MultiFDPages pages;
+ static bool once;
+
+ if (!once) {
+ multifd_init_group(&pages);
+ once = true;
+ }
+
+ pages.address[pages.num] = address;
+ pages.num++;
+
+ if (fd_num == UINT16_MAX) {
+ return;
+ }
+
+ thread_count = migrate_multifd_threads();
+ assert(fd_num < thread_count);
+ params = &multifd_recv[fd_num];
+
+ qemu_sem_wait(¶ms->ready);
+
+ qemu_mutex_lock(¶ms->mutex);
+ params->done = false;
+ for (i = 0; i < pages.num; i++) {
+ params->pages.address[i] = pages.address[i];
+ }
+ params->pages.num = pages.num;
+ pages.num = 0;
+ qemu_mutex_unlock(¶ms->mutex);
+ qemu_sem_post(¶ms->sem);
+}
+
/**
* save_page_header: Write page header to wire
*
@@ -1085,6 +1140,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
bool last_stage, uint64_t *bytes_transferred)
{
int pages;
+ uint16_t fd_num;
uint8_t *p;
RAMBlock *block = pss->block;
ram_addr_t offset = pss->offset;
@@ -1098,8 +1154,10 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
if (pages == -1) {
*bytes_transferred +=
save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+ fd_num = multifd_send_page(p, migration_dirty_pages == 1);
+ qemu_put_be16(f, fd_num);
+ *bytes_transferred += 2; /* size of fd_num */
qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
- multifd_send_page(p);
*bytes_transferred += TARGET_PAGE_SIZE;
pages = 1;
acct_info.norm_pages++;
@@ -2813,6 +2871,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
ram_addr_t addr, total_ram_bytes;
void *host = NULL;
+ uint16_t fd_num;
uint8_t ch;
addr = qemu_get_be64(f);
@@ -2910,6 +2969,8 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
break;
case RAM_SAVE_FLAG_MULTIFD_PAGE:
+ fd_num = qemu_get_be16(f);
+ multifd_recv_page(host, fd_num);
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
--
2.7.4
* [Qemu-devel] [PULL 12/12] migration: Test new fd infrastructure
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (10 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 11/12] migration: Send the fd number which we are going to use for this page Juan Quintela
@ 2017-02-13 17:19 ` Juan Quintela
2017-02-14 9:55 ` [Qemu-devel] [PATCH 00/12] Multifd v4 Peter Maydell
2017-02-14 13:03 ` Paolo Bonzini
13 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-13 17:19 UTC (permalink / raw)
To: qemu-devel; +Cc: amit.shah, dgilbert
For now we just send the page contents through the alternate channels
and test that everything works.
Signed-off-by: Juan Quintela <quintela@redhat.com>
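With page data also flowing over the side channels, qemu_ftell() on the
main file no longer measures the full transfer, so the bandwidth
calculation below adds the multifd pages back in. A sketch of the
accounting (assuming 4 KiB target pages, as the hack in the hunk does):

    uint64_t transferred_bytes =
        (qemu_file_bytes_now - qemu_file_bytes) +        /* main channel */
        (multifd_pages_now - multifd_pages) * 4096;      /* side channels */
    double bandwidth = (double)transferred_bytes / time_spent;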
---
include/migration/migration.h | 1 +
migration/migration.c | 15 +++++--
migration/ram.c | 91 ++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 101 insertions(+), 6 deletions(-)
diff --git a/include/migration/migration.h b/include/migration/migration.h
index cad03ab..5ec5c62 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -267,6 +267,7 @@ void migrate_multifd_send_threads_create(void);
void migrate_multifd_send_threads_join(void);
void migrate_multifd_recv_threads_create(void);
void migrate_multifd_recv_threads_join(void);
+void qemu_savevm_send_multifd_flush(QEMUFile *f);
void migrate_compress_threads_create(void);
void migrate_compress_threads_join(void);
diff --git a/migration/migration.c b/migration/migration.c
index 2e3b357..10ed934 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1919,7 +1919,8 @@ static void *migration_thread(void *opaque)
/* Used by the bandwidth calcs, updated later */
int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
- int64_t initial_bytes = 0;
+ int64_t qemu_file_bytes = 0;
+ int64_t multifd_pages = 0;
int64_t max_size = 0;
int64_t start_time = initial_time;
int64_t end_time;
@@ -2003,9 +2004,14 @@ static void *migration_thread(void *opaque)
}
current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
if (current_time >= initial_time + BUFFER_DELAY) {
- uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
- initial_bytes;
uint64_t time_spent = current_time - initial_time;
+ uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+ uint64_t multifd_pages_now = multifd_mig_pages_transferred();
+ /* Hack ahead: why the hell don't we have a function to know the
 target_page_size? Hard-coding it to 4096 */
+ uint64_t transferred_bytes =
+ (qemu_file_bytes_now - qemu_file_bytes) +
+ (multifd_pages_now - multifd_pages) * 4096;
double bandwidth = (double)transferred_bytes / time_spent;
max_size = bandwidth * s->parameters.downtime_limit;
@@ -2022,7 +2028,8 @@ static void *migration_thread(void *opaque)
qemu_file_reset_rate_limit(s->to_dst_file);
initial_time = current_time;
- initial_bytes = qemu_ftell(s->to_dst_file);
+ qemu_file_bytes = qemu_file_bytes_now;
+ multifd_pages = multifd_pages_now;
}
if (qemu_file_rate_limit(s->to_dst_file)) {
/* usleep expects microseconds */
diff --git a/migration/ram.c b/migration/ram.c
index 38789c8..6167a27 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -63,6 +63,13 @@ static uint64_t bitmap_sync_count;
#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100
#define RAM_SAVE_FLAG_MULTIFD_PAGE 0x200
+/* We are getting low on page flags, so we start using combinations.
 When we need to flush, we send the page header as
 RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS,
 a combination that is not allowed otherwise.
*/
+
+
static uint8_t *ZERO_TARGET_PAGE;
static inline bool is_zero_range(uint8_t *p, uint64_t size)
@@ -391,6 +398,9 @@ void migrate_compress_threads_create(void)
/* Multiple fd's */
+/* Indicates that we have synced the bitmap and need to ensure that
 the target has processed all previous pages */
+bool multifd_needs_flush;
typedef struct {
int num;
@@ -434,8 +444,22 @@ static void *multifd_send_thread(void *opaque)
break;
}
if (params->pages.num) {
+ int i;
+ int num;
+
+ num = params->pages.num;
params->pages.num = 0;
qemu_mutex_unlock(¶ms->mutex);
+
+ for (i = 0; i < num; i++) {
+ if (qio_channel_write(params->c,
+ (const char *)params->pages.address[i],
+ TARGET_PAGE_SIZE, &error_abort)
+ != TARGET_PAGE_SIZE) {
+ /* Shouldn't ever happen */
+ exit(-1);
+ }
+ }
qemu_mutex_lock(&multifd_send_mutex);
params->done = true;
qemu_mutex_unlock(&multifd_send_mutex);
@@ -577,9 +601,11 @@ struct MultiFDRecvParams {
QemuSemaphore init;
QemuSemaphore ready;
QemuSemaphore sem;
+ QemuCond cond_sync;
QemuMutex mutex;
/* protected by param mutex */
bool quit;
+ bool sync;
MultiFDPages pages;
bool done;
};
@@ -603,8 +629,26 @@ static void *multifd_recv_thread(void *opaque)
break;
}
if (params->pages.num) {
+ int i;
+ int num;
+
+ num = params->pages.num;
params->pages.num = 0;
+
+ for (i = 0; i < num; i++) {
+ if (qio_channel_read(params->c,
+ (char *)params->pages.address[i],
+ TARGET_PAGE_SIZE, &error_abort)
+ != TARGET_PAGE_SIZE) {
+ /* shouldn't ever happen */
+ exit(-1);
+ }
+ }
params->done = true;
+ if (params->sync) {
+ qemu_cond_signal(¶ms->cond_sync);
+ params->sync = false;
+ }
qemu_mutex_unlock(¶ms->mutex);
qemu_sem_post(¶ms->ready);
continue;
@@ -647,6 +691,7 @@ void migrate_multifd_recv_threads_join(void)
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
qemu_sem_destroy(&p->init);
+ qemu_cond_destroy(&p->cond_sync);
socket_send_channel_destroy(multifd_recv[i].c);
}
g_free(multifd_recv);
@@ -669,8 +714,10 @@ void migrate_multifd_recv_threads_create(void)
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->init, 0);
qemu_sem_init(&p->ready, 0);
+ qemu_cond_init(&p->cond_sync);
p->quit = false;
p->done = false;
+ p->sync = false;
multifd_init_group(&p->pages);
p->c = socket_recv_channel_create();
@@ -721,6 +768,28 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
qemu_sem_post(¶ms->sem);
}
+
+static int multifd_flush(void)
+{
+ int i, thread_count;
+
+ if (!migrate_use_multifd()) {
+ return 0;
+ }
+ thread_count = migrate_multifd_threads();
+ for (i = 0; i < thread_count; i++) {
+ MultiFDRecvParams *p = &multifd_recv[i];
+
+ qemu_mutex_lock(&p->mutex);
+ while (!p->done) {
+ p->sync = true;
+ qemu_cond_wait(&p->cond_sync, &p->mutex);
+ }
+ qemu_mutex_unlock(&p->mutex);
+ }
+ return 0;
+}
+
/**
* save_page_header: Write page header to wire
*
@@ -737,6 +806,12 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
{
size_t size, len;
+ if (multifd_needs_flush &&
+ (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
+ offset |= RAM_SAVE_FLAG_COMPRESS;
+ multifd_needs_flush = false;
+ }
+
qemu_put_be64(f, offset);
size = 8;
@@ -1156,8 +1231,10 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
fd_num = multifd_send_page(p, migration_dirty_pages == 1);
qemu_put_be16(f, fd_num);
+ if (fd_num != UINT16_MAX) {
+ qemu_fflush(f);
+ }
*bytes_transferred += 2; /* size of fd_num */
- qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
*bytes_transferred += TARGET_PAGE_SIZE;
pages = 1;
acct_info.norm_pages++;
@@ -2417,6 +2494,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
if (!migration_in_postcopy(migrate_get_current())) {
migration_bitmap_sync();
+ if (migrate_use_multifd()) {
+ multifd_needs_flush = true;
+ }
}
ram_control_before_iterate(f, RAM_CONTROL_FINISH);
@@ -2458,6 +2538,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
qemu_mutex_lock_iothread();
rcu_read_lock();
migration_bitmap_sync();
+ if (migrate_use_multifd()) {
+ multifd_needs_flush = true;
+ }
rcu_read_unlock();
qemu_mutex_unlock_iothread();
remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
@@ -2890,6 +2973,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
break;
}
+ if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS))
+ == (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS)) {
+ multifd_flush();
+ flags = flags & ~RAM_SAVE_FLAG_COMPRESS;
+ }
if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
RAM_SAVE_FLAG_MULTIFD_PAGE)) {
@@ -2971,7 +3059,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
case RAM_SAVE_FLAG_MULTIFD_PAGE:
fd_num = qemu_get_be16(f);
multifd_recv_page(host, fd_num);
- qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
case RAM_SAVE_FLAG_EOS:
--
2.7.4
^ permalink raw reply related [flat|nested] 28+ messages in thread
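[Editor's note] The done/sync/cond_sync handshake added in this patch is the subtle part of the flush. The following is a minimal standalone model of it, using POSIX threads in place of QEMU's qemu_mutex/qemu_cond wrappers and a single worker in place of the thread array; an editor's sketch of the protocol, not the patch itself. The flusher sets p->sync and sleeps on cond_sync until the worker has finished its reads and marked itself done, which is what multifd_flush() above waits for per thread.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond_sync;
    bool done;      /* worker has no page in flight */
    bool sync;      /* a flusher is waiting on cond_sync */
} RecvParams;

/* Worker: simulate reading one page from its channel, then mark
 * ourselves done and wake any flusher, as multifd_recv_thread() does. */
static void *recv_thread(void *opaque)
{
    RecvParams *p = opaque;

    usleep(10 * 1000);          /* stand-in for qio_channel_read() */
    pthread_mutex_lock(&p->mutex);
    p->done = true;
    if (p->sync) {
        p->sync = false;
        pthread_cond_signal(&p->cond_sync);
    }
    pthread_mutex_unlock(&p->mutex);
    return NULL;
}

/* Flusher: block until the worker reports done, mirroring the loop
 * in multifd_flush(). */
static void flush_one(RecvParams *p)
{
    pthread_mutex_lock(&p->mutex);
    while (!p->done) {
        p->sync = true;
        pthread_cond_wait(&p->cond_sync, &p->mutex);
    }
    pthread_mutex_unlock(&p->mutex);
}

int main(void)
{
    RecvParams p = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
                     false, false };
    pthread_t t;

    pthread_create(&t, NULL, recv_thread, &p);
    flush_one(&p);              /* returns once the page has landed */
    printf("flushed: worker is idle\n");
    pthread_join(&t, NULL);
    return 0;
}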
* Re: [Qemu-devel] [PATCH 00/12] Multifd v4
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (11 preceding siblings ...)
2017-02-13 17:19 ` [Qemu-devel] [PULL 12/12] migration: Test new fd infrastructure Juan Quintela
@ 2017-02-14 9:55 ` Peter Maydell
2017-02-14 12:38 ` Juan Quintela
2017-02-14 13:03 ` Paolo Bonzini
13 siblings, 1 reply; 28+ messages in thread
From: Peter Maydell @ 2017-02-14 9:55 UTC (permalink / raw)
To: Juan Quintela; +Cc: QEMU Developers, Amit Shah, Dr. David Alan Gilbert
On 13 February 2017 at 17:19, Juan Quintela <quintela@redhat.com> wrote:
> Hi
>
> [v4]
>
> - Address reviews
> - move synchronization to semaphores (faster). Paolo suggestion
> - improvements overall (see individual patches)
> - fix all the checkpatch warnings
> - fix all [HACKS] except for one
> Please review.
Is this a patch series or a pull request? The subject line
says "PATCH" and the body says 'please review' but it's in
the form of a pull request email...
thanks
-- PMM
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work
2017-02-13 17:19 ` [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work Juan Quintela
@ 2017-02-14 11:17 ` Daniel P. Berrange
2017-02-14 12:57 ` Paolo Bonzini
1 sibling, 0 replies; 28+ messages in thread
From: Daniel P. Berrange @ 2017-02-14 11:17 UTC (permalink / raw)
To: Juan Quintela; +Cc: qemu-devel, amit.shah, dgilbert
On Mon, Feb 13, 2017 at 06:19:43PM +0100, Juan Quintela wrote:
> We create new channels for each new thread created. We only send through
> them a character to be sure that we are creating the channels in the
> right order.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
> include/migration/migration.h | 7 +++++
> migration/ram.c | 33 ++++++++++++++++++++++
> migration/socket.c | 64 +++++++++++++++++++++++++++++++++++++++++--
> 3 files changed, 101 insertions(+), 3 deletions(-)
[snip]
> diff --git a/migration/socket.c b/migration/socket.c
> index 13966f1..1c764f1 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -24,6 +24,62 @@
> #include "io/channel-socket.h"
> #include "trace.h"
>
> +struct SocketArgs {
> + QIOChannelSocket *ioc;
> + SocketAddress *saddr;
> + Error **errp;
> +} socket_args;
> +
> +QIOChannel *socket_recv_channel_create(void)
> +{
> + QIOChannelSocket *sioc;
> + Error *err = NULL;
> +
> + sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
> + &err);
> + if (!sioc) {
> + error_report("could not accept migration connection (%s)",
> + error_get_pretty(err));
> + return NULL;
> + }
> + return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> + /* Remove channel */
> + object_unref(OBJECT(recv));
> + return 0;
> +}
> +
> +/* We have created all the recv channels; we can close the main one */
> +int socket_recv_channel_close_listening(void)
> +{
> + /* Close listening socket as it's no longer needed */
> + qio_channel_close(QIO_CHANNEL(socket_args.ioc), NULL);
> + return 0;
> +}
> +
> +QIOChannel *socket_send_channel_create(void)
> +{
> + QIOChannelSocket *sioc = qio_channel_socket_new();
> +
> + qio_channel_socket_connect_sync(sioc, socket_args.saddr,
> + socket_args.errp);
> + qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> + return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> + /* Remove channel */
> + object_unref(OBJECT(send));
> + if (socket_args.saddr) {
> + qapi_free_SocketAddress(socket_args.saddr);
> + socket_args.saddr = NULL;
> + }
> + return 0;
> +}
>
> static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
> {
> @@ -97,6 +153,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
> struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>
> data->s = s;
> +
> + socket_args.saddr = saddr;
> + socket_args.errp = errp;
> +
> if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
> data->hostname = g_strdup(saddr->u.inet.data->host);
> }
> @@ -107,7 +167,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
> socket_outgoing_migration,
> data,
> socket_connect_data_free);
> - qapi_free_SocketAddress(saddr);
> }
>
> void tcp_start_outgoing_migration(MigrationState *s,
> @@ -154,8 +213,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
> object_unref(OBJECT(sioc));
>
> out:
> - /* Close listening socket as it's no longer needed */
> - qio_channel_close(ioc, NULL);
> return FALSE; /* unregister */
> }
>
> @@ -164,6 +221,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
> Error **errp)
> {
> QIOChannelSocket *listen_ioc = qio_channel_socket_new();
> + socket_args.ioc = listen_ioc;
>
> qio_channel_set_name(QIO_CHANNEL(listen_ioc),
> "migration-socket-listener");
FYI I put some comments against v3 on this patch just as you sent this v4,
as I don't think the changes here are desirable in this format.
Regards,
Daniel
--
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://entangle-photo.org -o- http://search.cpan.org/~danberr/ :|
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PATCH 00/12] Multifd v4
2017-02-14 9:55 ` [Qemu-devel] [PATCH 00/12] Multifd v4 Peter Maydell
@ 2017-02-14 12:38 ` Juan Quintela
0 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-14 12:38 UTC (permalink / raw)
To: Peter Maydell; +Cc: QEMU Developers, Amit Shah, Dr. David Alan Gilbert
Peter Maydell <peter.maydell@linaro.org> wrote:
> On 13 February 2017 at 17:19, Juan Quintela <quintela@redhat.com> wrote:
>> Hi
>>
>> [v4]
>>
>> - Address reviews
>> - move synchronization to semaphores (faster). Paolo suggestion
>> - improvements overall (see individual patches)
>> - fix all the checkpatch warnings
>> - fix all [HACKS] except for one
>> Please review.
>
>
> Is this a patch series or a pull request? The subject line
> says "PATCH" and the body says 'please review' but it's in
> the form of a pull request email...
patch, sorry.
I have to upgrade my scripts.
Sorry again, Juan.
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work
2017-02-13 17:19 ` [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work Juan Quintela
2017-02-14 11:17 ` Daniel P. Berrange
@ 2017-02-14 12:57 ` Paolo Bonzini
2017-02-14 13:12 ` Juan Quintela
1 sibling, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2017-02-14 12:57 UTC (permalink / raw)
To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert
On 13/02/2017 18:19, Juan Quintela wrote:
> + qemu_sem_init(&p->init, 0);
> p->quit = false;
> + p->c = socket_send_channel_create();
> + if (!p->c) {
> + error_report("Error creating a send channel");
> + exit(0);
> + }
> snprintf(thread_name, 15, "multifd_send_%d", i);
> qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
> QEMU_THREAD_JOINABLE);
> + qemu_sem_wait(&p->init);
Why do you need p->init here? Could initialization proceed in parallel
for all the threads?
Paolo
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 11/12] migration: Send the fd number which we are going to use for this page
2017-02-13 17:19 ` [Qemu-devel] [PULL 11/12] migration: Send the fd number which we are going to use for this page Juan Quintela
@ 2017-02-14 13:02 ` Paolo Bonzini
2017-02-14 13:16 ` Juan Quintela
0 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2017-02-14 13:02 UTC (permalink / raw)
To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert
On 13/02/2017 18:19, Juan Quintela wrote:
> case RAM_SAVE_FLAG_MULTIFD_PAGE:
> + fd_num = qemu_get_be16(f);
> + multifd_recv_page(host, fd_num);
> qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> break;
Why do you need RAM_SAVE_FLAG_MULTIFD_PAGE? I understand the
orchestration of sent pages from a single thread, but could the receive
threads proceed independently, each reading its own socket? They do not
even need to tell the central thread "I'm done" (they can do so just by
exiting, and the central thread does qemu_thread_join when it sees the
marker for end of live data).
Paolo
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 06/12] migration: Create multifd migration threads
2017-02-13 17:19 ` [Qemu-devel] [PULL 06/12] migration: Create multifd migration threads Juan Quintela
@ 2017-02-14 13:02 ` Paolo Bonzini
0 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2017-02-14 13:02 UTC (permalink / raw)
To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert
On 13/02/2017 18:19, Juan Quintela wrote:
> Creation of the threads, nothing inside yet.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
>
> --
>
> Use pointers instead of long array names
> Move to use semaphores instead of conditions, as Paolo suggested
> ---
> include/migration/migration.h | 4 +
> migration/migration.c | 6 ++
> migration/ram.c | 168 ++++++++++++++++++++++++++++++++++++++++++
> 3 files changed, 178 insertions(+)
>
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 3c7f165..13fac75 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -256,6 +256,10 @@ MigrationState *migrate_get_current(void);
>
> int migrate_multifd_threads(void);
> int migrate_multifd_group(void);
> +void migrate_multifd_send_threads_create(void);
> +void migrate_multifd_send_threads_join(void);
> +void migrate_multifd_recv_threads_create(void);
> +void migrate_multifd_recv_threads_join(void);
>
> void migrate_compress_threads_create(void);
> void migrate_compress_threads_join(void);
> diff --git a/migration/migration.c b/migration/migration.c
> index 2b2d0a8..d2705d7 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -344,6 +344,7 @@ static void process_incoming_migration_bh(void *opaque)
> MIGRATION_STATUS_FAILED);
> error_report_err(local_err);
> migrate_decompress_threads_join();
> + migrate_multifd_recv_threads_join();
> exit(EXIT_FAILURE);
> }
>
> @@ -368,6 +369,7 @@ static void process_incoming_migration_bh(void *opaque)
> runstate_set(global_state_get_runstate());
> }
> migrate_decompress_threads_join();
> + migrate_multifd_recv_threads_join();
> /*
> * This must happen after any state changes since as soon as an external
> * observer sees this event they might start to prod at the VM assuming
> @@ -433,6 +435,7 @@ static void process_incoming_migration_co(void *opaque)
> MIGRATION_STATUS_FAILED);
> error_report("load of migration failed: %s", strerror(-ret));
> migrate_decompress_threads_join();
> + migrate_multifd_recv_threads_join();
> exit(EXIT_FAILURE);
> }
>
> @@ -445,6 +448,7 @@ void migration_fd_process_incoming(QEMUFile *f)
> Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);
>
> migrate_decompress_threads_create();
> + migrate_multifd_recv_threads_create();
> qemu_file_set_blocking(f, false);
> qemu_coroutine_enter(co);
> }
> @@ -974,6 +978,7 @@ static void migrate_fd_cleanup(void *opaque)
> qemu_mutex_lock_iothread();
>
> migrate_compress_threads_join();
> + migrate_multifd_send_threads_join();
> qemu_fclose(s->to_dst_file);
> s->to_dst_file = NULL;
> }
> @@ -2100,6 +2105,7 @@ void migrate_fd_connect(MigrationState *s)
> }
>
> migrate_compress_threads_create();
> + migrate_multifd_send_threads_create();
> qemu_thread_create(&s->thread, "live_migration", migration_thread, s,
> QEMU_THREAD_JOINABLE);
> s->migration_thread_running = true;
> diff --git a/migration/ram.c b/migration/ram.c
> index 4422ee8..0cb19cf 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -382,6 +382,174 @@ void migrate_compress_threads_create(void)
> }
> }
>
> +/* Multiple fd's */
> +
> +struct MultiFDSendParams {
> + QemuThread thread;
> + QemuSemaphore sem;
> + QemuMutex mutex;
> + bool quit;
> +};
> +typedef struct MultiFDSendParams MultiFDSendParams;
> +
> +static MultiFDSendParams *multifd_send;
> +
> +static void *multifd_send_thread(void *opaque)
> +{
> + MultiFDSendParams *params = opaque;
> +
> + while (true) {
> + qemu_mutex_lock(¶ms->mutex);
> + if (params->quit) {
> + qemu_mutex_unlock(¶ms->mutex);
> + break;
> + }
> + qemu_mutex_unlock(¶ms->mutex);
> + qemu_sem_wait(¶ms->sem);
> + }
You can use
while (true) {
qemu_sem_wait(¶ms->sem);
if (atomic_read(¶ms->quit)) {
break;
}
if (atomic_read(¶ms->address)) {
params->address = 0;
params->done = true;
qemu_sem_post(&multifd_send_sem);
}
}
...
atomic_set(&p->quit, true);
qemu_sem_post(&p->sem);
This avoids params->mutex, and it works because wait and post are
acquire and release operations respectively: the read certainly happens
after the wait, and the write certainly happens before the post.
Likewise for ->done in patch 9; ->address and later ->pages.num don't
even need atomic_read/atomic_set because there are two semaphores
involved and they act as the synchronization point.
Paolo
> +
> + return NULL;
> +}
> +
> +static void terminate_multifd_send_threads(void)
> +{
> + int i, thread_count;
> +
> + thread_count = migrate_multifd_threads();
> + for (i = 0; i < thread_count; i++) {
> + MultiFDSendParams *p = &multifd_send[i];
> +
> + qemu_mutex_lock(&p->mutex);
> + p->quit = true;
> + qemu_sem_post(&p->sem);
> + qemu_mutex_unlock(&p->mutex);
> + }
> +}
> +
> +void migrate_multifd_send_threads_join(void)
> +{
> + int i, thread_count;
> +
> + if (!migrate_use_multifd()) {
> + return;
> + }
> + terminate_multifd_send_threads();
> + thread_count = migrate_multifd_threads();
> + for (i = 0; i < thread_count; i++) {
> + MultiFDSendParams *p = &multifd_send[i];
> +
> + qemu_thread_join(&p->thread);
> + qemu_mutex_destroy(&p->mutex);
> + qemu_sem_destroy(&p->sem);
> + }
> + g_free(multifd_send);
> + multifd_send = NULL;
> +}
> +
> +void migrate_multifd_send_threads_create(void)
> +{
> + int i, thread_count;
> +
> + if (!migrate_use_multifd()) {
> + return;
> + }
> + thread_count = migrate_multifd_threads();
> + multifd_send = g_new0(MultiFDSendParams, thread_count);
> + for (i = 0; i < thread_count; i++) {
> + char thread_name[15];
> + MultiFDSendParams *p = &multifd_send[i];
> +
> + qemu_mutex_init(&p->mutex);
> + qemu_sem_init(&p->sem, 0);
> + p->quit = false;
> + snprintf(thread_name, 15, "multifd_send_%d", i);
> + qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
> + QEMU_THREAD_JOINABLE);
> + }
> +}
> +
> +struct MultiFDRecvParams {
> + QemuThread thread;
> + QemuSemaphore sem;
> + QemuMutex mutex;
> + bool quit;
> +};
> +typedef struct MultiFDRecvParams MultiFDRecvParams;
> +
> +static MultiFDRecvParams *multifd_recv;
> +
> +static void *multifd_recv_thread(void *opaque)
> +{
> + MultiFDRecvParams *params = opaque;
> +
> + while (true) {
> + qemu_mutex_lock(¶ms->mutex);
> + if (params->quit) {
> + qemu_mutex_unlock(¶ms->mutex);
> + break;
> + }
> + qemu_mutex_unlock(¶ms->mutex);
> + qemu_sem_wait(¶ms->sem);
> + }
> +
> + return NULL;
> +}
> +
> +static void terminate_multifd_recv_threads(void)
> +{
> + int i, thread_count;
> +
> + thread_count = migrate_multifd_threads();
> + for (i = 0; i < thread_count; i++) {
> + MultiFDRecvParams *p = &multifd_recv[i];
> +
> + qemu_mutex_lock(&p->mutex);
> + p->quit = true;
> + qemu_sem_post(&p->sem);
> + qemu_mutex_unlock(&p->mutex);
> + }
> +}
> +
> +void migrate_multifd_recv_threads_join(void)
> +{
> + int i, thread_count;
> +
> + if (!migrate_use_multifd()) {
> + return;
> + }
> + terminate_multifd_recv_threads();
> + thread_count = migrate_multifd_threads();
> + for (i = 0; i < thread_count; i++) {
> + MultiFDRecvParams *p = &multifd_recv[i];
> +
> + qemu_thread_join(&p->thread);
> + qemu_mutex_destroy(&p->mutex);
> + qemu_sem_destroy(&p->sem);
> + }
> + g_free(multifd_recv);
> + multifd_recv = NULL;
> +}
> +
> +void migrate_multifd_recv_threads_create(void)
> +{
> + int i, thread_count;
> +
> + if (!migrate_use_multifd()) {
> + return;
> + }
> + thread_count = migrate_multifd_threads();
> + multifd_recv = g_new0(MultiFDRecvParams, thread_count);
> + for (i = 0; i < thread_count; i++) {
> + MultiFDRecvParams *p = &multifd_recv[i];
> +
> + qemu_mutex_init(&p->mutex);
> + qemu_sem_init(&p->sem, 0);
> + p->quit = false;
> + qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
> + QEMU_THREAD_JOINABLE);
> + }
> +}
> +
> /**
> * save_page_header: Write page header to wire
> *
>
^ permalink raw reply [flat|nested] 28+ messages in thread
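[Editor's note] To make Paolo's suggestion above concrete, here is a standalone model of the mutex-free loop, with POSIX semaphores and C11 atomics standing in for qemu_sem_* and QEMU's atomic helpers, and done_sem modelling multifd_send_sem; an editor's sketch under those assumptions, not QEMU code. The point is the pairing he describes: sem_wait is an acquire and sem_post a release, so the worker's loads are ordered against the producer's stores without ever taking params->mutex.

#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
    pthread_t thread;
    sem_t sem;                  /* posted when work (or quit) is pending */
    atomic_bool quit;
    atomic_uintptr_t address;   /* page to send; 0 means none */
} SendParams;

static sem_t done_sem;          /* models multifd_send_sem */

/* sem_wait is an acquire and sem_post a release, so the loads below
 * are guaranteed to see the stores made before the matching post. */
static void *send_thread(void *opaque)
{
    SendParams *p = opaque;

    while (1) {
        sem_wait(&p->sem);
        if (atomic_load(&p->quit)) {
            break;
        }
        uintptr_t addr = atomic_exchange(&p->address, 0);
        if (addr) {
            printf("worker: sending page at %p\n", (void *)addr);
            sem_post(&done_sem);        /* tell main we are idle again */
        }
    }
    return NULL;
}

int main(void)
{
    static char page[4096];
    SendParams p;

    atomic_init(&p.quit, false);
    atomic_init(&p.address, 0);
    sem_init(&p.sem, 0, 0);
    sem_init(&done_sem, 0, 0);
    pthread_create(&p.thread, NULL, send_thread, &p);

    atomic_store(&p.address, (uintptr_t)page);   /* hand over one page */
    sem_post(&p.sem);
    sem_wait(&done_sem);                         /* page was consumed */

    atomic_store(&p.quit, true);                 /* shut the worker down */
    sem_post(&p.sem);
    pthread_join(p.thread, NULL);
    return 0;
}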
* Re: [Qemu-devel] [PATCH 00/12] Multifd v4
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
` (12 preceding siblings ...)
2017-02-14 9:55 ` [Qemu-devel] [PATCH 00/12] Multifd v4 Peter Maydell
@ 2017-02-14 13:03 ` Paolo Bonzini
13 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2017-02-14 13:03 UTC (permalink / raw)
To: Juan Quintela, qemu-devel; +Cc: amit.shah, dgilbert
On 13/02/2017 18:19, Juan Quintela wrote:
> [v4]
>
> - Address reviews
> - move synchronization to semaphores (faster). Paolo suggestion
> - improvements overall (see individual patches)
> - fix all the checkpatch warnings
> - fix all [HACKS] except for one
>
> Please review.
I think you can simplify the receive threads much more. I left comments
on patch 11.
Paolo
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work
2017-02-14 12:57 ` Paolo Bonzini
@ 2017-02-14 13:12 ` Juan Quintela
2017-02-14 13:37 ` Paolo Bonzini
0 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2017-02-14 13:12 UTC (permalink / raw)
To: Paolo Bonzini; +Cc: qemu-devel, amit.shah, dgilbert
Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 13/02/2017 18:19, Juan Quintela wrote:
>> + qemu_sem_init(&p->init, 0);
>> p->quit = false;
>> + p->c = socket_send_channel_create();
>> + if (!p->c) {
>> + error_report("Error creating a send channel");
>> + exit(0);
>> + }
>> snprintf(thread_name, 15, "multifd_send_%d", i);
>> qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
>> QEMU_THREAD_JOINABLE);
>> + qemu_sem_wait(&p->init);
>
> Why do you need p->init here? Could initialization proceed in parallel
> for all the threads?
We need to make sure that send thread number 2 goes to thread number
2 on the destination. Yes, we could use a more complicated algorithm,
but do we really care that much about this initialization time?
Later, Juan.
^ permalink raw reply [flat|nested] 28+ messages in thread
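[Editor's note] A standalone model of the ordering guarantee Juan describes, with pthreads and POSIX semaphores in place of QEMU primitives and the channel connect only simulated by a printf; an editor's sketch, not the patch. The main loop blocks on init before starting thread i+1, so connection i on the source always pairs with accept i on the destination.

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

typedef struct {
    pthread_t thread;
    sem_t init;     /* posted once this thread's channel is up */
    int id;
} SendParams;

static void *send_thread(void *opaque)
{
    SendParams *p = opaque;

    /* the real code connects its channel and sends an initial byte here */
    printf("thread %d: channel connected\n", p->id);
    sem_post(&p->init);         /* let main start the next thread */
    return NULL;
}

int main(void)
{
    enum { NTHREADS = 4 };
    SendParams p[NTHREADS];
    int i;

    for (i = 0; i < NTHREADS; i++) {
        p[i].id = i;
        sem_init(&p[i].init, 0, 0);
        pthread_create(&p[i].thread, NULL, send_thread, &p[i]);
        sem_wait(&p[i].init);   /* serialize: thread i before thread i+1 */
    }
    for (i = 0; i < NTHREADS; i++) {
        pthread_join(p[i].thread, NULL);
        sem_destroy(&p[i].init);
    }
    return 0;
}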
* Re: [Qemu-devel] [PULL 11/12] migration: Send the fd number which we are going to use for this page
2017-02-14 13:02 ` Paolo Bonzini
@ 2017-02-14 13:16 ` Juan Quintela
0 siblings, 0 replies; 28+ messages in thread
From: Juan Quintela @ 2017-02-14 13:16 UTC (permalink / raw)
To: Paolo Bonzini; +Cc: qemu-devel, amit.shah, dgilbert
Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 13/02/2017 18:19, Juan Quintela wrote:
>> case RAM_SAVE_FLAG_MULTIFD_PAGE:
>> + fd_num = qemu_get_be16(f);
>> + multifd_recv_page(host, fd_num);
>> qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>> break;
>
> Why do you need RAM_SAVE_FLAG_MULTIFD_PAGE? I understand the
> orchestration of sent pages from a single thread, but could the receive
> threads proceed independently, each reading its own socket? They do not
> even need to tell the central thread "I'm done" (they can do so just by
> exiting, and the central thread does qemu_thread_join when it sees the
> marker for end of live data).
We can batch multiple sends in one go; the whole idea was to send the
pages "aligned", and to be able to read them in place on the
destination. But this showed that we still have bottlenecks in the
code that searches for pages :-(
Later, Juan.
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work
2017-02-14 13:12 ` Juan Quintela
@ 2017-02-14 13:37 ` Paolo Bonzini
2017-02-14 13:52 ` Juan Quintela
0 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2017-02-14 13:37 UTC (permalink / raw)
To: quintela; +Cc: qemu-devel, amit.shah, dgilbert
On 14/02/2017 14:12, Juan Quintela wrote:
>> On 13/02/2017 18:19, Juan Quintela wrote:
>>> + qemu_sem_init(&p->init, 0);
>>> p->quit = false;
>>> + p->c = socket_send_channel_create();
>>> + if (!p->c) {
>>> + error_report("Error creating a send channel");
>>> + exit(0);
>>> + }
>>> snprintf(thread_name, 15, "multifd_send_%d", i);
>>> qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
>>> QEMU_THREAD_JOINABLE);
>>> + qemu_sem_wait(&p->init);
>> Why do you need p->init here? Could initialization proceed in parallel
>> for all the threads?
>
> We need to make sure that send thread number 2 goes to thread number
> 2 on the destination. Yes, we could use a more complicated algorithm,
> but do we really care that much about this initialization time?
I was wondering whether p->init was needed at all, since dropping it
would simplify things. But without a design document I cannot really
understand the logic; as I said, I don't really grok the need for
RAM_SAVE_FLAG_MULTIFD_PAGE.
Paolo
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work
2017-02-14 13:37 ` Paolo Bonzini
@ 2017-02-14 13:52 ` Juan Quintela
2017-02-14 14:08 ` Paolo Bonzini
0 siblings, 1 reply; 28+ messages in thread
From: Juan Quintela @ 2017-02-14 13:52 UTC (permalink / raw)
To: Paolo Bonzini; +Cc: qemu-devel, amit.shah, dgilbert
Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 14/02/2017 14:12, Juan Quintela wrote:
>>> On 13/02/2017 18:19, Juan Quintela wrote:
>>>> + qemu_sem_init(&p->init, 0);
>>>> p->quit = false;
>>>> + p->c = socket_send_channel_create();
>>>> + if (!p->c) {
>>>> + error_report("Error creating a send channel");
>>>> + exit(0);
>>>> + }
>>>> snprintf(thread_name, 15, "multifd_send_%d", i);
>>>> qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
>>>> QEMU_THREAD_JOINABLE);
>>>> + qemu_sem_wait(&p->init);
>>> Why do you need p->init here? Could initialization proceed in parallel
>>> for all the threads?
>>
>> We need to make sure that send thread number 2 goes to thread number
>> 2 on the destination. Yes, we could use a more complicated algorithm,
>> but do we really care that much about this initialization time?
>
> I was wondering if p->init was needed in general, so it would simplify.
> But without a design document I cannot really understand the logic---as
> I said, I don't really grok the need for RAM_SAVE_FLAG_MULTIFD_PAGE.
I will put some general documentation in doc/.
Basically, what we had on the single stream was:
{[page header][page]}+
And we moved to:
{[page header]+[where to receive]}: on the principal stream
[page]+: on the rest of the multifd channels
All nicely aligned and so on.
My understanding is that we could optimize receiving with splice() so
that we never even touch userspace (that part is not done). That was
the reason I didn't want to put headers or footers there. As the
headers are so small compared with the page payload, transmitting them
should be lost in the noise, no?
Later, Juan.
^ permalink raw reply [flat|nested] 28+ messages in thread
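[Editor's note] Schematically, the split Juan describes looks like the sketch below. The struct and its field names are the editor's assumptions for illustration; only the stream shapes come from his message.

#include <stdint.h>

/*
 * Before (single stream):
 *   [header][page][header][page]...            one fd carries everything
 *
 * After (multifd):
 *   main stream : [header][fd_num][header][fd_num]...   metadata only
 *   channel k   : [page][page][page]...        raw, page-sized payloads
 */

typedef struct {                /* one record on the main stream */
    uint64_t offset_and_flags;  /* be64: RAM offset | RAM_SAVE_FLAG_* */
    uint16_t fd_num;            /* be16: which side channel has the page */
} MainStreamRecord;

/* A side channel carries nothing but TARGET_PAGE_SIZE payloads, which
 * is what would make a splice()-style zero-copy receive conceivable. */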
* Re: [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work
2017-02-14 13:52 ` Juan Quintela
@ 2017-02-14 14:08 ` Paolo Bonzini
0 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2017-02-14 14:08 UTC (permalink / raw)
To: quintela; +Cc: qemu-devel, amit.shah, dgilbert
On 14/02/2017 14:52, Juan Quintela wrote:
> I will put some general documentation in doc/.
>
> Basically, what we had on the single stream was:
>
> {[page header][page]}+
>
> And we moved to:
>
> {[page header]+[where to receive]}: on the principal stream
>
> [page]+: on the rest of the multifd channels
>
> All nicely aligned and so on.
>
> My understanding is that we could optimize receiving with splice() so
> that we never even touch userspace (that part is not done).
The frames are not going to be aligned (MTU is usually 1500 or 9000),
and the extra synchronization cost might nullify any speedup.
Even the send side can in principle be made completely independent, by
scanning the bitmap in each thread.
> That was the reason
> I didn't want to put headers or footers there. As the headers are so
> small compared with the page payload, transmitting them should
> be lost in the noise, no?
The transmission may be, but the cost of having one more active socket +
possibly the cost of Nagle's algorithm on that one socket + the cost of
synchronization can be nontrivial.
Paolo
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 03/12] migration: Add multifd capability
2017-02-13 17:19 ` [Qemu-devel] [PULL 03/12] migration: Add multifd capability Juan Quintela
@ 2017-02-15 13:04 ` Dr. David Alan Gilbert
0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-15 13:04 UTC (permalink / raw)
To: Juan Quintela; +Cc: qemu-devel, amit.shah
* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> ---
> include/migration/migration.h | 1 +
> migration/migration.c | 9 +++++++++
> qapi-schema.json | 5 +++--
> 3 files changed, 13 insertions(+), 2 deletions(-)
>
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 7528cc2..e8bba55 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -308,6 +308,7 @@ bool migrate_postcopy_ram(void);
> bool migrate_zero_blocks(void);
>
> bool migrate_auto_converge(void);
> +bool migrate_use_multifd(void);
>
> int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
> uint8_t *dst, int dlen);
> diff --git a/migration/migration.c b/migration/migration.c
> index 2b179c6..37af7a4 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1369,6 +1369,15 @@ bool migrate_use_events(void)
> return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
> }
>
> +bool migrate_use_multifd(void)
> +{
> + MigrationState *s;
> +
> + s = migrate_get_current();
> +
> + return s->enabled_capabilities[MIGRATION_CAPABILITY_X_MULTIFD];
> +}
> +
> int migrate_use_xbzrle(void)
> {
> MigrationState *s;
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 61151f3..ff7579e 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -865,12 +865,13 @@
> # side, this process is called COarse-Grain LOck Stepping (COLO) for
> # Non-stop Service. (since 2.8)
> #
> +# @x-multifd: Use more than one fd for migration (since 2.9)
> +#
> # Since: 1.2
> ##
> { 'enum': 'MigrationCapability',
> 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
> - 'compress', 'events', 'postcopy-ram', 'x-colo'] }
> -
> + 'compress', 'events', 'postcopy-ram', 'x-colo', 'x-multifd'] }
That needs a minor remerge after 'release-ram' just went in.
Dave
> ##
> # @MigrationCapabilityStatus:
> #
> --
> 2.7.4
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
^ permalink raw reply [flat|nested] 28+ messages in thread
* Re: [Qemu-devel] [PULL 01/12] migration: Test for disabled features on reception
2017-02-13 17:19 ` [Qemu-devel] [PULL 01/12] migration: Test for disabled features on reception Juan Quintela
@ 2017-02-15 13:12 ` Dr. David Alan Gilbert
0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-15 13:12 UTC (permalink / raw)
To: Juan Quintela; +Cc: qemu-devel
* Juan Quintela (quintela@redhat.com) wrote:
> Right now, if we receive a compressed page while these features are
> disabled, Bad Things (TM) can happen. Just add a test for them.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
This could go in separately.
Dave
>
> --
>
> I had XBZRLE here also, but it doesn't need extra resources on the
> destination, only on the source. Additionally, libvirt doesn't enable
> it on the destination, so it isn't checked here.
> ---
> migration/ram.c | 16 +++++++++++++++-
> 1 file changed, 15 insertions(+), 1 deletion(-)
>
> diff --git a/migration/ram.c b/migration/ram.c
> index ef8fadf..5817f8c 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -2455,7 +2455,7 @@ static int ram_load_postcopy(QEMUFile *f)
>
> static int ram_load(QEMUFile *f, void *opaque, int version_id)
> {
> - int flags = 0, ret = 0;
> + int flags = 0, ret = 0, invalid_flags;
> static uint64_t seq_iter;
> int len = 0;
> /*
> @@ -2470,6 +2470,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> ret = -EINVAL;
> }
>
> + invalid_flags = 0;
> +
> + if (!migrate_use_compression()) {
> + invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
> + }
> /* This RCU critical section can be very long running.
> * When RCU reclaims in the code start to become numerous,
> * it will be necessary to reduce the granularity of this
> @@ -2490,6 +2495,15 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> flags = addr & ~TARGET_PAGE_MASK;
> addr &= TARGET_PAGE_MASK;
>
> + if (flags & invalid_flags) {
> + if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
> + error_report("Received an unexpected compressed page");
> + }
> +
> + ret = -EINVAL;
> + break;
> + }
> +
> if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
> RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
> RAMBlock *block = ram_block_from_stream(f, flags);
> --
> 2.7.4
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
^ permalink raw reply [flat|nested] 28+ messages in thread
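[Editor's note] The pattern this patch introduces lifts out cleanly: build a mask of flags whose feature is disabled on this side, then reject any incoming record carrying one of them. A minimal standalone sketch follows; check_flags() is a hypothetical name, and the real logic lives inline in ram_load().

#include <stdio.h>

#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100

/* Returns 0 if the flags are acceptable, -1 (for -EINVAL) if the
 * sender used a feature that is disabled on the receiving side. */
static int check_flags(int flags, int use_compression)
{
    int invalid_flags = 0;

    if (!use_compression) {
        invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
    }
    if (flags & invalid_flags) {
        if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
            fprintf(stderr, "Received an unexpected compressed page\n");
        }
        return -1;
    }
    return 0;
}

int main(void)
{
    /* compression disabled here: a compressed page must be rejected */
    printf("%d\n", check_flags(RAM_SAVE_FLAG_COMPRESS_PAGE, 0));  /* -1 */
    printf("%d\n", check_flags(0, 0));                            /*  0 */
    return 0;
}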
* Re: [Qemu-devel] [PULL 02/12] migration: Don't create decompression threads if not enabled
2017-02-13 17:19 ` [Qemu-devel] [PULL 02/12] migration: Don't create decompression threads if not enabled Juan Quintela
@ 2017-02-15 13:17 ` Dr. David Alan Gilbert
0 siblings, 0 replies; 28+ messages in thread
From: Dr. David Alan Gilbert @ 2017-02-15 13:17 UTC (permalink / raw)
To: Juan Quintela; +Cc: qemu-devel
* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
This can go in separately.
> --
>
> I removed the [HACK] part because the previous patch checks that
> compressed pages are not received.
> ---
> migration/ram.c | 6 ++++++
> 1 file changed, 6 insertions(+)
>
> diff --git a/migration/ram.c b/migration/ram.c
> index 5817f8c..4422ee8 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -2251,6 +2251,9 @@ void migrate_decompress_threads_create(void)
> {
> int i, thread_count;
>
> + if (!migrate_use_compression()) {
> + return;
> + }
> thread_count = migrate_decompress_threads();
> decompress_threads = g_new0(QemuThread, thread_count);
> decomp_param = g_new0(DecompressParam, thread_count);
> @@ -2272,6 +2275,9 @@ void migrate_decompress_threads_join(void)
> {
> int i, thread_count;
>
> + if (!migrate_use_compression()) {
> + return;
> + }
> thread_count = migrate_decompress_threads();
> for (i = 0; i < thread_count; i++) {
> qemu_mutex_lock(&decomp_param[i].mutex);
> --
> 2.7.4
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
^ permalink raw reply [flat|nested] 28+ messages in thread
End of thread.
Thread overview: 28+ messages:
2017-02-13 17:19 [Qemu-devel] [PATCH 00/12] Multifd v4 Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 01/12] migration: Test for disabled features on reception Juan Quintela
2017-02-15 13:12 ` Dr. David Alan Gilbert
2017-02-13 17:19 ` [Qemu-devel] [PULL 02/12] migration: Don't create decompression threads if not enabled Juan Quintela
2017-02-15 13:17 ` Dr. David Alan Gilbert
2017-02-13 17:19 ` [Qemu-devel] [PULL 03/12] migration: Add multifd capability Juan Quintela
2017-02-15 13:04 ` Dr. David Alan Gilbert
2017-02-13 17:19 ` [Qemu-devel] [PULL 04/12] migration: Create x-multifd-threads parameter Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 05/12] migration: Create x-multifd-group parameter Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 06/12] migration: Create multifd migration threads Juan Quintela
2017-02-14 13:02 ` Paolo Bonzini
2017-02-13 17:19 ` [Qemu-devel] [PULL 07/12] migration: Start of multiple fd work Juan Quintela
2017-02-14 11:17 ` Daniel P. Berrange
2017-02-14 12:57 ` Paolo Bonzini
2017-02-14 13:12 ` Juan Quintela
2017-02-14 13:37 ` Paolo Bonzini
2017-02-14 13:52 ` Juan Quintela
2017-02-14 14:08 ` Paolo Bonzini
2017-02-13 17:19 ` [Qemu-devel] [PULL 08/12] migration: Create ram_multifd_page Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 09/12] migration: Create thread infrastructure for multifd send side Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 10/12] migration: Really use multiple pages at a time Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 11/12] migration: Send the fd number which we are going to use for this page Juan Quintela
2017-02-14 13:02 ` Paolo Bonzini
2017-02-14 13:16 ` Juan Quintela
2017-02-13 17:19 ` [Qemu-devel] [PULL 12/12] migration: Test new fd infrastructure Juan Quintela
2017-02-14 9:55 ` [Qemu-devel] [PATCH 00/12] Multifd v4 Peter Maydell
2017-02-14 12:38 ` Juan Quintela
2017-02-14 13:03 ` Paolo Bonzini