qemu-devel.nongnu.org archive mirror
* [Qemu-devel] [PATCH v9 00/12] Multifd
@ 2017-10-04 10:46 Juan Quintela
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 01/12] qapi: Fix grammar in x-multifd-page-count descriptions Juan Quintela
                   ` (11 more replies)
  0 siblings, 12 replies; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Hi

This series is on top of my migration test series just sent; the only rejects should be in the test system, though.

On v9 series for you:
- qobject_unref() as requested by dan

  Yes, he was right: I had a reference leak for _non_ multifd. I
  *thought* he meant for multifd, and that took a while to understand
  (and then to find when/where).

- multifd page count: it is dropped for good
- uuid handling: we use the default qemu uuid of 0000...
- uuid handling: using a struct and sending the struct
  * idea is to add a size field and add more parameters after that
  * does anyone have a good idea how to "output" info
    migrate_capabilities/parameters json into a string and how to read it back?
- changed how we test that all threads/channels are already created.
  Should be more robust.
- Add multifd tests.  Still not ported on top of the migration-tests series sent earlier;
  waiting for review of the ideas there.
- Rebase and remove all the integrated patches (back at 12)

Please, review.

Later, Juan.

[v8]
Things NOT done yet:

- drop x-multifd-page-count?  We can use performance to set a default value
- paolo suggestion of not having a control channel
  needs yet more cleanups to be able to have more than one ramstate, trying it.
- performance work still not done, but it has been very stable

On v8:
- use connect_async
- rename multifd-group to multifd-page-count (danp suggestion)
- rename multifd-threads to multifd-channels (danp suggestion)
- use new qio*channel functions
- Address rest of comments left


So, please review.

My idea is to pull these changes and continue the performance work
in-tree; basically everything is already reviewed.

Thanks, Juan.

On v7:
- tests fixed as danp wanted
- have to revert danp qio_*_all patches, as they break multifd, I have to investigate why.
- error_abort is gone.  After several tries at getting errors right, I ended up with a single error
  protected by a lock, and the first error wins.
- Addressed basically all reviews (see on ToDo)
- Pointers to struct are done now
- fix lots of leaks
- lots of small fixes
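
The single-error scheme mentioned in the v7 notes above (one error, protected by a lock, first error wins) could be sketched roughly as follows. This is a standalone illustration using pthreads, not the code from the series; DemoState, demo_state_init(), demo_set_error() and demo_state_destroy() are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the migration state; not QEMU's MigrationState. */
typedef struct {
    pthread_mutex_t error_mutex; /* protects 'error' */
    char *error;                 /* first error reported, or NULL */
} DemoState;

static void demo_state_init(DemoState *s)
{
    pthread_mutex_init(&s->error_mutex, NULL);
    s->error = NULL;
}

/* Record a copy of 'msg' only if no error has been recorded yet. */
static void demo_set_error(DemoState *s, const char *msg)
{
    pthread_mutex_lock(&s->error_mutex);
    if (!s->error) {
        size_t len = strlen(msg) + 1;

        s->error = malloc(len);
        if (s->error) {
            memcpy(s->error, msg, len);
        }
    }
    pthread_mutex_unlock(&s->error_mutex);
}

static void demo_state_destroy(DemoState *s)
{
    free(s->error);
    s->error = NULL;
    pthread_mutex_destroy(&s->error_mutex);
}
```

Any number of threads can call demo_set_error(); later calls are ignored once an error is stored, so whoever reports first decides what the user sees.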


[v6]
- Improve migration_ioc_process_incoming
- teach about G_SOURCE_REMOVE/CONTINUE
- Add test for migration_has_all_channels
- use DEFINE_PROP*
- change recv_state to use pointers to parameters
  make easier to receive channels out of order
- use g_strdup_printf()
- improve count of threads to know when we have to finish
- report channel id's on errors
- Use last_page parameter for multifd_send_page() sooner
- Improve comments for address
- use g_new0() instead of g_malloc()
- create MULTIFD_CONTINUE instead of using UINT16_MAX
- clear memory used by group of pages
  once there, pass everything to the global state variables instead of being
  local to the function.  This way it works if we cancel migration and start
  a new one
- Really wait to create the migration_thread until all channels are created
- split initial_bytes setup to make clearer following patches.
- create RAM_SAVE_FLAG_MULTIFD_SYNC macro, to make clear what we are doing
- move setting of need_flush to inside bitmap_sync
- Lots of other small changes & reorderings

Please, comment.


[v5]

- tests from qio functions (a.k.a. make danp happy)
- 1st message from one channel to the other contains:
   <uuid> multifd <channel number>
   This would allow us to create more channels as we want them.
   a.k.a. Making dave happy
- Waiting in reception for new channels using qio listeners
  Getting threads, qio and reference counters working at the same time
  was interesting.
  Another make danp happy.

- Lots and lots of small changes and fixes.  Notice that the last 70 or so
  patches that I merged were meant to make this series easier/smaller.

- NOT DONE: I haven't been working on measuring performance
  differences, this was about getting the creation of the
  threads/channels right.

So, what I want:

- Are people happy with how I have (ab)used qio channels? (yes danp,
  that is you).
- My understanding is th

ToDo:

- Make paolo happy: He wanted to test using control information
  through each channel, not only pages.  This requires yet more
  cleanups to be able to have more than one QEMUFile/RAMState open at
  the same time.

- How I create multiple channels.  Things I know:
  * with current changes, it should work with fd/channels (the multifd bits),
    but we don't have a way to pass multiple fds or exec files.
    Danp, any idea about how to create a UI for it?
  * My idea is that we would split current code to be:
    + channel creation at migration.c
    + rest of bits at ram.c
    + change format to:
      <uuid> main <rest of migration capabilities/parameters> so we can check
      <uuid> postcopy <no clue what parameters are needed>
          Dave wanted a way to create a new fd for postcopy for some time
    + Adding new channels is easy

- Performance data/numbers: Yes, I wanted to get this out at once, I
  would continue with this.


Please, review.


[v4]
This is the 4th version of multifd. Changes:
- XBZRLE doesn't need to be checked for
- Documentation and defaults are consistent
- split socketArgs
- use iovec instead of creating something similar.
- We now use the exported size of target page (another HACK removal)
- created qio_channel_{writev,readv}_all functions.  the _full() name
  was already taken.
  They do the same as the functions without _all(), but if the call
  returns early because it would block, they redo the call.
- it is checkpatch.pl clean now.
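
The "_all" behaviour described in the v4 notes above (keep calling until everything has been transferred, retrying when the call would block) can be illustrated on plain POSIX file descriptors. This is only a sketch of the idea under that assumption; demo_write_all() and demo_read_all() are hypothetical helpers and do not use the QIOChannel API.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <string.h>
#include <unistd.h>

/* Write exactly 'len' bytes to 'fd'.  Returns 0 on success, -1 on error. */
static int demo_write_all(int fd, const void *buf, size_t len)
{
    const char *p = buf;

    while (len > 0) {
        ssize_t n = write(fd, p, len);

        if (n < 0) {
            if (errno == EINTR) {
                continue;               /* interrupted: just retry */
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                /* would block: wait until writable, then redo the call */
                struct pollfd pfd = { .fd = fd, .events = POLLOUT };
                poll(&pfd, 1, -1);
                continue;
            }
            return -1;
        }
        p += n;                         /* short write: advance and loop */
        len -= (size_t)n;
    }
    return 0;
}

/* Read exactly 'len' bytes from 'fd'.  Returns 0 on success, -1 on error/EOF. */
static int demo_read_all(int fd, void *buf, size_t len)
{
    char *p = buf;

    while (len > 0) {
        ssize_t n = read(fd, p, len);

        if (n == 0) {
            return -1;                  /* EOF before all bytes arrived */
        }
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                struct pollfd pfd = { .fd = fd, .events = POLLIN };
                poll(&pfd, 1, -1);
                continue;
            }
            return -1;
        }
        p += n;
        len -= (size_t)n;
    }
    return 0;
}
```

The caller then only has to distinguish "all bytes transferred" from "failed", which is what makes fixed-size messages easy to exchange.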

Please comment, Juan.


Juan Quintela (12):
  qapi: Fix grammar in x-multifd-page-count descriptions
  migration: Improve migration thread error handling
  migration: Make migrate_fd_error() the owner of the Error
  migration: Start of multiple fd work
  migration: Create ram_multifd_page
  migration: Send the fd number which we are going to use for this page
  migration: Create thread infrastructure for multifd recv side
  migration: Test new fd infrastructure
  migration: Rename initial_bytes
  migration: Transfer pages over new channels
  migration: Flush receive queue
  migration: Add multifd test

 migration/channel.c    |   1 -
 migration/migration.c  |  47 +++--
 migration/migration.h  |  10 +-
 migration/ram.c        | 390 +++++++++++++++++++++++++++++++++++--
 migration/ram.h        |   3 +
 migration/socket.c     |  38 +++-
 migration/socket.h     |  10 +
 migration/tls.c        |   1 -
 qapi/migration.json    |   6 +-
 tests/Makefile.include |   3 +
 tests/multifd-test.c   | 511 +++++++++++++++++++++++++++++++++++++++++++++++++
 11 files changed, 985 insertions(+), 35 deletions(-)
 create mode 100644 tests/multifd-test.c

-- 
2.13.5


* [Qemu-devel] [PATCH v9 01/12] qapi: Fix grammar in x-multifd-page-count descriptions
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-16 16:53   ` Dr. David Alan Gilbert
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling Juan Quintela
                   ` (10 subsequent siblings)
  11 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Reported-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 qapi/migration.json | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/qapi/migration.json b/qapi/migration.json
index f8b365e3f5..d94d6c0e46 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -471,7 +471,7 @@
 #                     number of sockets used for migration.  The
 #                     default value is 2 (since 2.11)
 #
-# @x-multifd-page-count: Number of pages sent together to a thread
+# @x-multifd-page-count: Number of pages sent together to a thread.
 #                        The default value is 16 (since 2.11)
 #
 # Since: 2.4
@@ -542,7 +542,7 @@
 #                     number of sockets used for migration.  The
 #                     default value is 2 (since 2.11)
 #
-# @x-multifd-page-count: Number of pages sent together to a thread
+# @x-multifd-page-count: Number of pages sent together to a thread.
 #                        The default value is 16 (since 2.11)
 #
 # Since: 2.4
@@ -638,7 +638,7 @@
 #                     number of sockets used for migration.
 #                     The default value is 2 (since 2.11)
 #
-# @x-multifd-page-count: Number of pages sent together to a thread
+# @x-multifd-page-count: Number of pages sent together to a thread.
 #                        The default value is 16 (since 2.11)
 #
 # Since: 2.4
-- 
2.13.5


* [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 01/12] qapi: Fix grammar in x-multifd-page-count descriptions Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-09  9:28   ` Peter Xu
  2017-10-16 17:34   ` Dr. David Alan Gilbert
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 03/12] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
                   ` (9 subsequent siblings)
  11 siblings, 2 replies; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We now report errors also when we finish migration, not only on info
migrate.  We plan to use this error from several places, and we want
the first error to happen to win, so we add a mutex to order it.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/migration.c | 19 ++++++++++++++++---
 migration/migration.h |  7 ++++++-
 migration/tls.c       |  1 -
 3 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 98429dc843..468f51cfa7 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1071,19 +1071,30 @@ static void migrate_fd_cleanup(void *opaque)
                           MIGRATION_STATUS_CANCELLED);
     }
 
+    if (s->error) {
+        /* It is used on info migrate.  We can't free it */
+        error_report_err(error_copy(s->error));
+    }
     notifier_list_notify(&migration_state_notifiers, s);
     block_cleanup_parameters(s);
 }
 
+void migrate_set_error(MigrationState *s, const Error *error)
+{
+    qemu_mutex_lock(&s->error_mutex);
+    if (!s->error) {
+        s->error = error_copy(error);
+    }
+    qemu_mutex_unlock(&s->error_mutex);
+}
+
 void migrate_fd_error(MigrationState *s, const Error *error)
 {
     trace_migrate_fd_error(error_get_pretty(error));
     assert(s->to_dst_file == NULL);
     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                       MIGRATION_STATUS_FAILED);
-    if (!s->error) {
-        s->error = error_copy(error);
-    }
+    migrate_set_error(s, error);
     notifier_list_notify(&migration_state_notifiers, s);
     block_cleanup_parameters(s);
 }
@@ -2355,6 +2366,7 @@ static void migration_instance_finalize(Object *obj)
     MigrationState *ms = MIGRATION_OBJ(obj);
     MigrationParameters *params = &ms->parameters;
 
+    qemu_mutex_destroy(&ms->error_mutex);
     g_free(params->tls_hostname);
     g_free(params->tls_creds);
 }
@@ -2367,6 +2379,7 @@ static void migration_instance_init(Object *obj)
     ms->state = MIGRATION_STATUS_NONE;
     ms->xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE;
     ms->mbps = -1;
+    qemu_mutex_init(&ms->error_mutex);
 
     params->tls_hostname = g_strdup("");
     params->tls_creds = g_strdup("");
diff --git a/migration/migration.h b/migration/migration.h
index b83cceadc4..51c0ac2e71 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -129,8 +129,12 @@ struct MigrationState
     int64_t colo_checkpoint_time;
     QEMUTimer *colo_delay_timer;
 
-    /* The last error that occurred */
+    /* The first error that has occurred.
+       We used the mutex to be able to return the 1st error message */
     Error *error;
+    /* mutex to protect errp */
+    QemuMutex error_mutex;
+
     /* Do we have to clean up -b/-i from old migrate parameters */
     /* This feature is deprecated and will be removed */
     bool must_remove_block_options;
@@ -159,6 +163,7 @@ bool  migration_has_all_channels(void);
 
 uint64_t migrate_max_downtime(void);
 
+void migrate_set_error(MigrationState *s, const Error *error);
 void migrate_fd_error(MigrationState *s, const Error *error);
 
 void migrate_fd_connect(MigrationState *s);
diff --git a/migration/tls.c b/migration/tls.c
index 596e8790bd..026a008667 100644
--- a/migration/tls.c
+++ b/migration/tls.c
@@ -119,7 +119,6 @@ static void migration_tls_outgoing_handshake(QIOTask *task,
     if (qio_task_propagate_error(task, &err)) {
         trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
         migrate_fd_error(s, err);
-        error_free(err);
     } else {
         trace_migration_tls_outgoing_handshake_complete();
         migration_channel_connect(s, ioc, NULL);
-- 
2.13.5


* [Qemu-devel] [PATCH v9 03/12] migration: Make migrate_fd_error() the owner of the Error
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 01/12] qapi: Fix grammar in x-multifd-page-count descriptions Juan Quintela
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-09  9:34   ` Peter Xu
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work Juan Quintela
                   ` (8 subsequent siblings)
  11 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

So far, every caller had to free the error after the call, so just do
it here.  While there, note that tls.c was leaking the error.
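
To illustrate the ownership rule this patch moves to (the callee takes the error; it either stores it or frees it, so callers must not free it afterwards), here is a small standalone sketch. It only mimics the idea with a hypothetical DemoError type and helpers; it is not QEMU's Error API.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical error object; not QEMU's Error. */
typedef struct {
    char *msg;
} DemoError;

/* Number of DemoError objects currently allocated (illustration only). */
static int demo_live_errors;

static DemoError *demo_error_new(const char *msg)
{
    size_t len = strlen(msg) + 1;
    DemoError *e = malloc(sizeof(*e));

    if (!e) {
        abort();
    }
    e->msg = malloc(len);
    if (!e->msg) {
        abort();
    }
    memcpy(e->msg, msg, len);
    demo_live_errors++;
    return e;
}

static void demo_error_free(DemoError *e)
{
    if (e) {
        free(e->msg);
        free(e);
        demo_live_errors--;
    }
}

/*
 * Takes ownership of 'err': store it if '*dst' is still empty,
 * otherwise free it.  The caller must not touch 'err' afterwards.
 */
static void demo_take_error(DemoError **dst, DemoError *err)
{
    if (*dst == NULL) {
        *dst = err;
    } else {
        demo_error_free(err);
    }
}
```

A caller that still needs the error after handing it over has to pass a copy instead, which is the same reason the qmp_migrate() hunk below passes error_copy(local_err).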

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/channel.c   |  1 -
 migration/migration.c | 10 ++++------
 migration/migration.h |  4 ++--
 migration/socket.c    |  1 -
 4 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/migration/channel.c b/migration/channel.c
index 70ec7ea3b7..1dd2ae1530 100644
--- a/migration/channel.c
+++ b/migration/channel.c
@@ -71,7 +71,6 @@ void migration_channel_connect(MigrationState *s,
         migration_tls_channel_connect(s, ioc, hostname, &local_err);
         if (local_err) {
             migrate_fd_error(s, local_err);
-            error_free(local_err);
         }
     } else {
         QEMUFile *f = qemu_fopen_channel_output(ioc);
diff --git a/migration/migration.c b/migration/migration.c
index 468f51cfa7..61b7e7105a 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1079,16 +1079,14 @@ static void migrate_fd_cleanup(void *opaque)
     block_cleanup_parameters(s);
 }
 
-void migrate_set_error(MigrationState *s, const Error *error)
+void migrate_set_error(MigrationState *s, Error *error)
 {
     qemu_mutex_lock(&s->error_mutex);
-    if (!s->error) {
-        s->error = error_copy(error);
-    }
+    error_propagate(&s->error, error);
     qemu_mutex_unlock(&s->error_mutex);
 }
 
-void migrate_fd_error(MigrationState *s, const Error *error)
+void migrate_fd_error(MigrationState *s, Error *error)
 {
     trace_migrate_fd_error(error_get_pretty(error));
     assert(s->to_dst_file == NULL);
@@ -1362,7 +1360,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
     }
 
     if (local_err) {
-        migrate_fd_error(s, local_err);
+        migrate_fd_error(s, error_copy(local_err));
         error_propagate(errp, local_err);
         return;
     }
diff --git a/migration/migration.h b/migration/migration.h
index 51c0ac2e71..cc196cc87f 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -163,8 +163,8 @@ bool  migration_has_all_channels(void);
 
 uint64_t migrate_max_downtime(void);
 
-void migrate_set_error(MigrationState *s, const Error *error);
-void migrate_fd_error(MigrationState *s, const Error *error);
+void migrate_set_error(MigrationState *s, Error *error);
+void migrate_fd_error(MigrationState *s, Error *error);
 
 void migrate_fd_connect(MigrationState *s);
 
diff --git a/migration/socket.c b/migration/socket.c
index dee869044a..2d70747a1a 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -80,7 +80,6 @@ static void socket_outgoing_migration(QIOTask *task,
     if (qio_task_propagate_error(task, &err)) {
         trace_migration_socket_outgoing_error(error_get_pretty(err));
         migrate_fd_error(data->s, err);
-        error_free(err);
     } else {
         trace_migration_socket_outgoing_connected(data->hostname);
         migration_channel_connect(data->s, sioc, data->hostname);
-- 
2.13.5


* [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (2 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 03/12] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-09 10:05   ` Peter Xu
                     ` (2 more replies)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 05/12] migration: Create ram_multifd_page Juan Quintela
                   ` (7 subsequent siblings)
  11 siblings, 3 replies; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We create new channels for each new thread created. We send through
them a string containing <uuid> multifd <channel number> so we are
sure that we connect the right channels on both sides.
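
As a rough standalone sketch of such an initial handshake: the sender fills a small fixed-size message with a version, its channel id and the uuid, and the receiver checks it before accepting the channel. The layout loosely follows the struct added by this patch, but DemoInitMsg, DEMO_UUID_LEN and both helpers are hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Assumption: 36 characters of textual uuid plus the terminating NUL. */
#define DEMO_UUID_LEN 37

typedef struct {
    uint32_t version;
    uint8_t id;                 /* channel number */
    char uuid[DEMO_UUID_LEN];   /* NUL-terminated textual uuid */
} DemoInitMsg;

/* Sender side: build the first message for channel 'id'. */
static void demo_init_msg_fill(DemoInitMsg *msg, uint8_t id, const char *uuid)
{
    memset(msg, 0, sizeof(*msg));       /* also zeroes padding bytes */
    msg->version = 1;
    msg->id = id;
    snprintf(msg->uuid, sizeof(msg->uuid), "%s", uuid);
}

/*
 * Receiver side: returns the channel id if the message is acceptable,
 * or -1 if the version, uuid or channel number does not match.
 */
static int demo_init_msg_check(const DemoInitMsg *msg,
                               const char *expected_uuid, int channels)
{
    if (msg->version != 1) {
        return -1;
    }
    /* never trust the peer to have NUL-terminated the string */
    if (memchr(msg->uuid, '\0', sizeof(msg->uuid)) == NULL) {
        return -1;
    }
    if (strcmp(msg->uuid, expected_uuid) != 0) {
        return -1;
    }
    if (msg->id >= channels) {
        return -1;
    }
    return msg->id;
}
```

Sending a raw struct like this ties the wire format to the compiler's padding and the host byte order, so a real protocol would likely want explicit serialization.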

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Split SocketArgs into incoming and outgoing args

Use UUID's on the initial message, so we are sure we are connecting to
the right channel.

Remove init semaphore.  Now that we use uuids on the init message, we
know that this is our channel.

Fix recv socket destroy, we were destroying send channels.
This was very interesting, because we were using an unreferred object
without problems.

Move to struct of pointers
init channel sooner.
split recv thread creation.
listen on main thread
We count the number of created threads to know when we need to stop listening
Use g_strdup_printf
report channel id on errors
Add name parameter
Use local_err
Add Error * parameter to socket_send_channel_create()
Use qio_channel_*_all
Use asynchronous connect
Use a struct to send all fields
Use default uuid
---
 migration/migration.c |   5 ++
 migration/ram.c       | 128 +++++++++++++++++++++++++++++++++++++++++++-------
 migration/ram.h       |   3 ++
 migration/socket.c    |  34 +++++++++++++-
 migration/socket.h    |  10 ++++
 5 files changed, 162 insertions(+), 18 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 61b7e7105a..ee98c50d8c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -419,6 +419,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
+    if (migrate_use_multifd()) {
+        int thread_count = migrate_multifd_channels();
+
+        return thread_count == multifd_created_channels();
+    }
     return true;
 }
 
diff --git a/migration/ram.c b/migration/ram.c
index b83f8977c5..b57006594b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,6 +36,7 @@
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
+#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -47,6 +48,8 @@
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"
 #include "migration/block.h"
+#include "sysemu/sysemu.h"
+#include "qemu/uuid.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -363,6 +366,7 @@ struct MultiFDSendParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -379,6 +383,12 @@ static void terminate_multifd_send_threads(Error *errp)
 {
     int i;
 
+    if (errp) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, errp);
+        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
+                          MIGRATION_STATUS_FAILED);
+    }
     for (i = 0; i < multifd_send_state->count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -404,6 +414,7 @@ int multifd_save_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        socket_send_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
     }
@@ -414,9 +425,27 @@ int multifd_save_cleanup(Error **errp)
     return ret;
 }
 
+typedef struct {
+    uint32_t version;
+    uint8_t id;
+    char uuid[UUID_FMT_LEN];
+} MigrateMultiFDInit_t;
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    MigrateMultiFDInit_t msg;
+    Error *local_err = NULL;
+    size_t ret;
+
+    msg.version = 1;
+    msg.id = p->id;
+    qemu_uuid_unparse(&qemu_uuid, (char *)&msg.uuid);
+    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
+    if (ret != 0) {
+        terminate_multifd_send_threads(local_err);
+        return NULL;
+    }
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -431,6 +460,27 @@ static void *multifd_send_thread(void *opaque)
     return NULL;
 }
 
+static void multifd_new_channel_async(QIOTask *task, gpointer opaque)
+{
+    MultiFDSendParams *p = opaque;
+    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+    Error *local_err = NULL;
+
+    if (qio_task_propagate_error(task, &local_err)) {
+        if (multifd_save_cleanup(&local_err) != 0) {
+            migrate_set_error(migrate_get_current(), local_err);
+        }
+    } else {
+        p->c = QIO_CHANNEL(sioc);
+        qio_channel_set_delay(p->c, false);
+
+        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+                           QEMU_THREAD_JOINABLE);
+
+        multifd_send_state->count++;
+    }
+}
+
 int multifd_save_setup(void)
 {
     int thread_count;
@@ -451,10 +501,7 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdsend_%d", i);
-        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
-
-        multifd_send_state->count++;
+        socket_send_channel_create(multifd_new_channel_async, p);
     }
     return 0;
 }
@@ -463,6 +510,7 @@ struct MultiFDRecvParams {
     uint8_t id;
     char *name;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -473,12 +521,22 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* Should we finish */
+    bool quit;
 } *multifd_recv_state;
 
 static void terminate_multifd_recv_threads(Error *errp)
 {
     int i;
 
+    if (errp) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, errp);
+        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
+                          MIGRATION_STATUS_FAILED);
+    }
+    multifd_recv_state->quit = true;
+
     for (i = 0; i < multifd_recv_state->count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -504,6 +562,7 @@ int multifd_load_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        socket_recv_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
     }
@@ -532,10 +591,51 @@ static void *multifd_recv_thread(void *opaque)
     return NULL;
 }
 
+void multifd_new_channel(QIOChannel *ioc)
+{
+    MultiFDRecvParams *p;
+    MigrateMultiFDInit_t msg;
+    Error *local_err = NULL;
+    char *uuid;
+    size_t ret;
+
+    ret = qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err);
+    if (ret != 0) {
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+
+    uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+
+    if (strcmp(msg.uuid, uuid)) {
+        g_free(uuid);
+        error_setg(&local_err, "multifd: received uuid '%s' and expected "
+                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+    g_free(uuid);
+
+    p = &multifd_recv_state->params[msg.id];
+    if (p->id != 0) {
+        error_setg(&local_err, "multifd: received id '%d' already setup'", msg.id);
+        terminate_multifd_recv_threads(local_err);
+        return;
+    }
+    qemu_mutex_init(&p->mutex);
+    qemu_sem_init(&p->sem, 0);
+    p->quit = false;
+    p->id = msg.id;
+    p->c = ioc;
+    multifd_recv_state->count++;
+    p->name = g_strdup_printf("multifdrecv_%d", msg.id);
+    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+}
+
 int multifd_load_setup(void)
 {
     int thread_count;
-    uint8_t i;
 
     if (!migrate_use_multifd()) {
         return 0;
@@ -544,21 +644,15 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     multifd_recv_state->count = 0;
-    for (i = 0; i < thread_count; i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem, 0);
-        p->quit = false;
-        p->id = i;
-        p->name = g_strdup_printf("multifdrecv_%d", i);
-        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        multifd_recv_state->count++;
-    }
+    multifd_recv_state->quit = false;
     return 0;
 }
 
+int multifd_created_channels(void)
+{
+    return multifd_recv_state->count;
+}
+
 /**
  * save_page_header: write page header to wire
  *
diff --git a/migration/ram.h b/migration/ram.h
index 4a72d66503..5221bc9beb 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -31,6 +31,7 @@
 
 #include "qemu-common.h"
 #include "exec/cpu-common.h"
+#include "io/channel.h"
 
 extern MigrationStats ram_counters;
 extern XBZRLECacheStats xbzrle_counters;
@@ -43,6 +44,8 @@ int multifd_save_setup(void);
 int multifd_save_cleanup(Error **errp);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
+void multifd_new_channel(QIOChannel *ioc);
+int multifd_created_channels(void);
 
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
diff --git a/migration/socket.c b/migration/socket.c
index 2d70747a1a..22fb05edc8 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -26,6 +26,34 @@
 #include "io/channel-socket.h"
 #include "trace.h"
 
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+    /* Remove channel */
+    object_unref(OBJECT(recv));
+    return 0;
+}
+
+struct SocketOutgoingArgs {
+    SocketAddress *saddr;
+} outgoing_args;
+
+void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
+                                     f, data, NULL);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    if (outgoing_args.saddr) {
+        qapi_free_SocketAddress(outgoing_args.saddr);
+        outgoing_args.saddr = NULL;
+    }
+    return 0;
+}
 
 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
@@ -95,6 +123,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
 
     data->s = s;
+
+    /* in case previous migration leaked it */
+    qapi_free_SocketAddress(outgoing_args.saddr);
+    outgoing_args.saddr = saddr;
+
     if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
         data->hostname = g_strdup(saddr->u.inet.host);
     }
@@ -105,7 +138,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free);
-    qapi_free_SocketAddress(saddr);
 }
 
 void tcp_start_outgoing_migration(MigrationState *s,
diff --git a/migration/socket.h b/migration/socket.h
index 6b91e9db38..afb0ff0f51 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -16,6 +16,16 @@
 
 #ifndef QEMU_MIGRATION_SOCKET_H
 #define QEMU_MIGRATION_SOCKET_H
+
+#include "io/channel.h"
+#include "io/task.h"
+
+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+
+void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data);
+int socket_send_channel_destroy(QIOChannel *send);
+
 void tcp_start_incoming_migration(const char *host_port, Error **errp);
 
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
-- 
2.13.5


* [Qemu-devel] [PATCH v9 05/12] migration: Create ram_multifd_page
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (3 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-09 13:08   ` Paolo Bonzini
  2017-10-16 19:43   ` Dr. David Alan Gilbert
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 06/12] migration: Send the fd number which we are going to use for this page Juan Quintela
                   ` (6 subsequent siblings)
  11 siblings, 2 replies; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

The function still doesn't use multifd, but we have simplified
ram_save_page; the xbzrle and RDMA stuff is gone.  We have added a new
counter and a new flag for this type of pages.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Add last_page parameter
Add comments for done and address
Remove multifd field, it is the same as normal pages
Merge next patch, now we send multiple pages at a time
Remove counter for multifd pages, it is identical to normal pages
Use iovec's instead of creating the equivalent.
Clear memory used by pages (dave)
Use g_new0(danp)
define MULTIFD_CONTINUE
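
The notes above mention collecting pages in iovecs and clearing the memory used by them. A minimal standalone sketch of that batching idea, assuming a fixed batch capacity, could look like this; DemoPages and the demo_pages_*() helpers are hypothetical and only loosely modelled on the multifd_pages_t struct in the diff below.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <sys/uio.h>

typedef struct {
    int num;            /* entries currently queued */
    size_t size;        /* capacity of 'iov' */
    struct iovec *iov;  /* one entry per queued page */
} DemoPages;

/* Allocate room for 'capacity' pages; 'capacity' must be non-zero. */
static void demo_pages_init(DemoPages *p, size_t capacity)
{
    assert(capacity > 0);
    p->num = 0;
    p->size = capacity;
    p->iov = calloc(capacity, sizeof(struct iovec));
    if (!p->iov) {
        abort();
    }
}

/*
 * Queue one page without copying it.  Returns true once the batch is
 * full, i.e. when the caller should hand it to a sender and reset 'num'.
 */
static bool demo_pages_queue(DemoPages *p, void *addr, size_t page_size)
{
    assert((size_t)p->num < p->size);
    p->iov[p->num].iov_base = addr;
    p->iov[p->num].iov_len = page_size;
    p->num++;
    return (size_t)p->num == p->size;
}

/* Release the array so that a cancelled migration can start over cleanly. */
static void demo_pages_clear(DemoPages *p)
{
    free(p->iov);
    p->iov = NULL;
    p->num = 0;
    p->size = 0;
}
```

Because the entries are ordinary struct iovec, a full batch could be passed directly to a vectored write such as writev(fd, p->iov, p->num).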
---
 migration/ram.c | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 131 insertions(+), 1 deletion(-)

diff --git a/migration/ram.c b/migration/ram.c
index b57006594b..c0af538f5f 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -50,6 +50,7 @@
 #include "migration/block.h"
 #include "sysemu/sysemu.h"
 #include "qemu/uuid.h"
+#include "qemu/iov.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -69,6 +70,7 @@
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
+#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
 
 static inline bool is_zero_range(uint8_t *p, uint64_t size)
 {
@@ -362,14 +364,29 @@ static void compress_threads_save_setup(void)
 
 /* Multiple fd's */
 
+/* used to continue on the same multifd group */
+#define MULTIFD_CONTINUE UINT16_MAX
+
+typedef struct {
+    int num;
+    size_t size;
+    struct iovec *iov;
+} multifd_pages_t;
+
 struct MultiFDSendParams {
+    /* not changed */
     uint8_t id;
     char *name;
     QemuThread thread;
     QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
+    multifd_pages_t pages;
+    /* protected by multifd mutex */
+    /* has the thread finish the last submitted job */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
 
@@ -377,8 +394,26 @@ struct {
     MultiFDSendParams *params;
     /* number of created threads */
     int count;
+    QemuMutex mutex;
+    QemuSemaphore sem;
+    multifd_pages_t pages;
 } *multifd_send_state;
 
+static void multifd_init_pages(multifd_pages_t *pages)
+{
+    pages->num = 0;
+    pages->size = migrate_multifd_page_count();
+    pages->iov = g_new0(struct iovec, pages->size);
+}
+
+static void multifd_clear_pages(multifd_pages_t *pages)
+{
+    pages->num = 0;
+    pages->size = 0;
+    g_free(pages->iov);
+    pages->iov = NULL;
+}
+
 static void terminate_multifd_send_threads(Error *errp)
 {
     int i;
@@ -417,9 +452,11 @@ int multifd_save_cleanup(Error **errp)
         socket_send_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
+        multifd_clear_pages(&p->pages);
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
+    multifd_clear_pages(&multifd_send_state->pages);
     g_free(multifd_send_state);
     multifd_send_state = NULL;
     return ret;
@@ -446,6 +483,7 @@ static void *multifd_send_thread(void *opaque)
         terminate_multifd_send_threads(local_err);
         return NULL;
     }
+    qemu_sem_post(&multifd_send_state->sem);
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -453,6 +491,15 @@ static void *multifd_send_thread(void *opaque)
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+        if (p->pages.num) {
+            p->pages.num = 0;
+            qemu_mutex_unlock(&p->mutex);
+            qemu_mutex_lock(&multifd_send_state->mutex);
+            p->done = true;
+            qemu_mutex_unlock(&multifd_send_state->mutex);
+            qemu_sem_post(&multifd_send_state->sem);
+            continue;
+        }
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_wait(&p->sem);
     }
@@ -493,6 +540,9 @@ int multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     multifd_send_state->count = 0;
+    qemu_mutex_init(&multifd_send_state->mutex);
+    qemu_sem_init(&multifd_send_state->sem, 0);
+    multifd_init_pages(&multifd_send_state->pages);
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -500,12 +550,52 @@ int multifd_save_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        p->done = true;
+        multifd_init_pages(&p->pages);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_channel_async, p);
     }
     return 0;
 }
 
+static uint16_t multifd_send_page(uint8_t *address, bool last_page)
+{
+    int i;
+    MultiFDSendParams *p = NULL; /* make happy gcc */
+    multifd_pages_t *pages = &multifd_send_state->pages;
+
+    pages->iov[pages->num].iov_base = address;
+    pages->iov[pages->num].iov_len = TARGET_PAGE_SIZE;
+    pages->num++;
+
+    if (!last_page) {
+        if (pages->num < (pages->size - 1)) {
+            return MULTIFD_CONTINUE;
+        }
+    }
+
+    qemu_sem_wait(&multifd_send_state->sem);
+    qemu_mutex_lock(&multifd_send_state->mutex);
+    for (i = 0; i < multifd_send_state->count; i++) {
+        p = &multifd_send_state->params[i];
+
+        if (p->done) {
+            p->done = false;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_state->mutex);
+    qemu_mutex_lock(&p->mutex);
+    p->pages.num = pages->num;
+    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
+             iov_size(pages->iov, pages->num));
+    pages->num = 0;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+
+    return 0;
+}
+
 struct MultiFDRecvParams {
     uint8_t id;
     char *name;
@@ -1086,6 +1176,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
+static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
+                            bool last_stage)
+{
+    int pages;
+    uint8_t *p;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+
+    p = block->host + offset;
+
+    pages = save_zero_page(rs, block, offset, p);
+    if (pages == -1) {
+        ram_counters.transferred +=
+            save_page_header(rs, rs->f, block,
+                             offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
+        multifd_send_page(p, rs->migration_dirty_pages == 1);
+        ram_counters.transferred += TARGET_PAGE_SIZE;
+        pages = 1;
+        ram_counters.normal++;
+    }
+
+    return pages;
+}
+
 static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
                                 ram_addr_t offset)
 {
@@ -1514,6 +1629,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
         if (migrate_use_compression() &&
             (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
             res = ram_save_compressed_page(rs, pss, last_stage);
+        } else if (migrate_use_multifd()) {
+            res = ram_multifd_page(rs, pss, last_stage);
         } else {
             res = ram_save_page(rs, pss, last_stage);
         }
@@ -2810,6 +2927,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     if (!migrate_use_compression()) {
         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
     }
+
+    if (!migrate_use_multifd()) {
+        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
+    }
     /* This RCU critical section can be very long running.
      * When RCU reclaims in the code start to become numerous,
      * it will be necessary to reduce the granularity of this
@@ -2834,13 +2955,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
                 error_report("Received an unexpected compressed page");
             }
+            if (flags & invalid_flags  & RAM_SAVE_FLAG_MULTIFD_PAGE) {
+                error_report("Received an unexpected multifd page");
+            }
 
             ret = -EINVAL;
             break;
         }
 
         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
-                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
+                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
+                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
             RAMBlock *block = ram_block_from_stream(f, flags);
 
             host = host_from_ram_block_offset(block, addr);
@@ -2928,6 +3053,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
                 break;
             }
             break;
+
+        case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
+            break;
+
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
             break;
-- 
2.13.5
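
The batching idea in patch 05/12 (queue page addresses in an iovec array
and report "continue" until the batch is handed to a channel) can be
illustrated outside QEMU.  This is only a minimal sketch under stated
assumptions: the sketch_* names, the fixed 4096-byte page size and the
caller-supplied channel number are hypothetical, and the sketch flushes
when the batch is full, whereas the patch compares against size - 1.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/uio.h>

#define SKETCH_PAGE_SIZE 4096        /* assumed page size, illustration only */
#define SKETCH_CONTINUE  UINT16_MAX  /* plays the role of MULTIFD_CONTINUE */

typedef struct {
    int num;            /* pages queued so far */
    size_t size;        /* capacity of iov[] */
    struct iovec *iov;  /* one entry per queued page */
} sketch_pages;

static void sketch_init_pages(sketch_pages *pages, size_t size)
{
    pages->num = 0;
    pages->size = size;
    pages->iov = calloc(size, sizeof(*pages->iov));
    if (!pages->iov) {
        abort();
    }
}

static void sketch_clear_pages(sketch_pages *pages)
{
    pages->num = 0;
    pages->size = 0;
    free(pages->iov);
    pages->iov = NULL;
}

/*
 * Queue one page.  Returns SKETCH_CONTINUE while the batch is still
 * filling; otherwise resets the batch and returns the channel number
 * chosen by the caller (hypothetical; the patch picks an idle thread).
 */
static uint16_t sketch_queue_page(sketch_pages *pages, uint8_t *address,
                                  int last_page, uint16_t channel)
{
    pages->iov[pages->num].iov_base = address;
    pages->iov[pages->num].iov_len = SKETCH_PAGE_SIZE;
    pages->num++;

    if (!last_page && (size_t)pages->num < pages->size) {
        return SKETCH_CONTINUE;
    }

    /* Hand-off point: a real sender would copy iov[] to a worker here. */
    pages->num = 0;
    return channel;
}
```

With a batch size of 4, the first three calls return SKETCH_CONTINUE and
the fourth returns the channel number; a call with last_page set flushes
immediately, whatever the fill level.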

* [Qemu-devel] [PATCH v9 06/12] migration: Send the fd number which we are going to use for this page
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (4 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 05/12] migration: Create ram_multifd_page Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 07/12] migration: Create thread infrastructure for multifd recv side Juan Quintela
                   ` (5 subsequent siblings)
  11 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We are still sending the page through the main channel; that will
change later in the series.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/ram.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index c0af538f5f..288201e360 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -593,7 +593,7 @@ static uint16_t multifd_send_page(uint8_t *address, bool last_page)
     qemu_mutex_unlock(&p->mutex);
     qemu_sem_post(&p->sem);
 
-    return 0;
+    return i;
 }
 
 struct MultiFDRecvParams {
@@ -1180,6 +1180,7 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
                             bool last_stage)
 {
     int pages;
+    uint16_t fd_num;
     uint8_t *p;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
@@ -1191,8 +1192,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
         ram_counters.transferred +=
             save_page_header(rs, rs->f, block,
                              offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+        fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
+        qemu_put_be16(rs->f, fd_num);
+        ram_counters.transferred += 2; /* size of fd_num */
         qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
-        multifd_send_page(p, rs->migration_dirty_pages == 1);
         ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
         ram_counters.normal++;
@@ -2945,6 +2948,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
     while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
         ram_addr_t addr, total_ram_bytes;
         void *host = NULL;
+        uint16_t fd_num;
         uint8_t ch;
 
         addr = qemu_get_be64(f);
@@ -3055,6 +3059,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
 
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
+            fd_num = qemu_get_be16(f);
+            if (fd_num != 0) {
+                /* this is yet an unused variable, changed later */
+                fd_num = fd_num;
+            }
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
-- 
2.13.5
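
Patch 06/12 puts a 16-bit channel number on the wire, big-endian, right
after the page header.  A minimal sketch of that encoding, assuming
nothing beyond plain byte buffers: sketch_put_be16 and sketch_get_be16
are hypothetical stand-ins for what qemu_put_be16()/qemu_get_be16() do
on a QEMUFile, and SKETCH_CONTINUE mirrors MULTIFD_CONTINUE.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_CONTINUE UINT16_MAX  /* "same batch, no channel chosen yet" */

/* Store v in big-endian order; returns the number of bytes written. */
static size_t sketch_put_be16(uint8_t *buf, uint16_t v)
{
    buf[0] = (uint8_t)(v >> 8);
    buf[1] = (uint8_t)(v & 0xff);
    return 2;
}

/* Read back a big-endian 16-bit value. */
static uint16_t sketch_get_be16(const uint8_t *buf)
{
    return (uint16_t)(((uint16_t)buf[0] << 8) | buf[1]);
}

/* A batch is dispatched only when a real channel number arrives. */
static int sketch_is_dispatch(uint16_t fd_num)
{
    return fd_num != SKETCH_CONTINUE;
}
```

The two extra bytes per page are what the patch accounts for with
"ram_counters.transferred += 2".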

* [Qemu-devel] [PATCH v9 07/12] migration: Create thread infrastructure for multifd recv side
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (5 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 06/12] migration: Send the fd number which we are going to use for this page Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-17 11:07   ` Dr. David Alan Gilbert
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 08/12] migration: Test new fd infrastructure Juan Quintela
                   ` (4 subsequent siblings)
  11 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We make the locking and the transfer of information specific, even if we
are still receiving things through the main thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

We split the creation of the main channel from the start of the main
migration thread, so that we can wait for the creation of the other threads.

Use multifd_clear_pages().
Don't remove object_unref()
Use the channel numbers correctly
---
 migration/migration.c |  7 +++---
 migration/migration.h |  1 +
 migration/ram.c       | 60 +++++++++++++++++++++++++++++++++++++++++++++++----
 migration/socket.c    |  3 +++
 4 files changed, 64 insertions(+), 7 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index ee98c50d8c..1e7c537954 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -388,7 +388,7 @@ static void migration_incoming_setup(QEMUFile *f)
     qemu_file_set_blocking(f, false);
 }
 
-static void migration_incoming_process(void)
+void migration_incoming_process(void)
 {
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     qemu_coroutine_enter(co);
@@ -406,9 +406,10 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
 
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
-        migration_fd_process_incoming(f);
+        migration_incoming_setup(f);
+        return;
     }
-    /* We still only have a single channel.  Nothing to do here yet */
+    multifd_new_channel(ioc);
 }
 
 /**
diff --git a/migration/migration.h b/migration/migration.h
index cc196cc87f..a3db60a2a1 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -158,6 +158,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
 
 void migration_fd_process_incoming(QEMUFile *f);
 void migration_ioc_process_incoming(QIOChannel *ioc);
+void migration_incoming_process(void);
 
 bool  migration_has_all_channels(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 288201e360..745da2971d 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -597,13 +597,18 @@ static uint16_t multifd_send_page(uint8_t *address, bool last_page)
 }
 
 struct MultiFDRecvParams {
+    /* not changed */
     uint8_t id;
     char *name;
     QemuThread thread;
     QIOChannel *c;
+    QemuSemaphore ready;
     QemuSemaphore sem;
     QemuMutex mutex;
+    /* proteced by param mutex */
     bool quit;
+    multifd_pages_t pages;
+    bool done;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
 
@@ -613,6 +618,7 @@ struct {
     int count;
     /* Should we finish */
     bool quit;
+    multifd_pages_t pages;
 } *multifd_recv_state;
 
 static void terminate_multifd_recv_threads(Error *errp)
@@ -634,6 +640,7 @@ static void terminate_multifd_recv_threads(Error *errp)
         p->quit = true;
         qemu_sem_post(&p->sem);
         qemu_mutex_unlock(&p->mutex);
+        multifd_clear_pages(&p->pages);
     }
 }
 
@@ -658,6 +665,7 @@ int multifd_load_cleanup(Error **errp)
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
+    multifd_clear_pages(&multifd_recv_state->pages);
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 
@@ -668,12 +676,20 @@ static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
 
+    qemu_sem_post(&p->ready);
     while (true) {
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+        if (p->pages.num) {
+            p->pages.num = 0;
+            p->done = true;
+            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&p->ready);
+            continue;
+        }
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_wait(&p->sem);
     }
@@ -714,13 +730,21 @@ void multifd_new_channel(QIOChannel *ioc)
     }
     qemu_mutex_init(&p->mutex);
     qemu_sem_init(&p->sem, 0);
+    qemu_sem_init(&p->ready, 0);
     p->quit = false;
     p->id = msg.id;
+    p->done = false;
+    multifd_init_pages(&p->pages);
     p->c = ioc;
     multifd_recv_state->count++;
     p->name = g_strdup_printf("multifdrecv_%d", msg.id);
+    object_ref(OBJECT(ioc));
+
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                        QEMU_THREAD_JOINABLE);
+    if (multifd_recv_state->count == migrate_multifd_channels()) {
+        migration_incoming_process();
+    }
 }
 
 int multifd_load_setup(void)
@@ -735,6 +759,7 @@ int multifd_load_setup(void)
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     multifd_recv_state->count = 0;
     multifd_recv_state->quit = false;
+    multifd_init_pages(&multifd_recv_state->pages);
     return 0;
 }
 
@@ -743,6 +768,36 @@ int multifd_created_channels(void)
     return multifd_recv_state->count;
 }
 
+static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
+{
+    int thread_count;
+    MultiFDRecvParams *p;
+    multifd_pages_t *pages = &multifd_recv_state->pages;
+
+    pages->iov[pages->num].iov_base = address;
+    pages->iov[pages->num].iov_len = TARGET_PAGE_SIZE;
+    pages->num++;
+
+    if (fd_num == MULTIFD_CONTINUE) {
+        return;
+    }
+
+    thread_count = migrate_multifd_channels();
+    assert(fd_num < thread_count);
+    p = &multifd_recv_state->params[fd_num];
+
+    qemu_sem_wait(&p->ready);
+
+    qemu_mutex_lock(&p->mutex);
+    p->done = false;
+    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
+             iov_size(pages->iov, pages->num));
+    p->pages.num = pages->num;
+    pages->num = 0;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+}
+
 /**
  * save_page_header: write page header to wire
  *
@@ -3060,10 +3115,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
-            if (fd_num != 0) {
-                /* this is yet an unused variable, changed later */
-                fd_num = fd_num;
-            }
+            multifd_recv_page(host, fd_num);
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
diff --git a/migration/socket.c b/migration/socket.c
index 22fb05edc8..debe972ee8 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -186,6 +186,9 @@ out:
     if (migration_has_all_channels()) {
         /* Close listening socket as its no longer needed */
         qio_channel_close(ioc, NULL);
+        if (!migrate_use_multifd()) {
+            migration_incoming_process();
+        }
         return G_SOURCE_REMOVE;
     } else {
         return G_SOURCE_CONTINUE;
-- 
2.13.5
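
The ordering that patch 07/12 sets up on the incoming side (the first
connection becomes the main channel, later connections become multifd
channels, and processing starts only once all of them exist) can be
modelled as a small state machine.  This is a simplified sketch, not the
QEMU code: the incoming_sketch type and its fields are hypothetical, and
threads, QIOChannel and error handling are left out.

```c
#include <assert.h>

typedef struct {
    int have_main;           /* main channel already set up? */
    int multifd_expected;    /* 0 means multifd is not in use */
    int multifd_count;       /* multifd channels seen so far */
    int processing_started;  /* incoming processing has been kicked off */
} incoming_sketch;

static void incoming_sketch_init(incoming_sketch *s, int multifd_expected)
{
    s->have_main = 0;
    s->multifd_expected = multifd_expected;
    s->multifd_count = 0;
    s->processing_started = 0;
}

/* Called once per accepted connection, in arrival order. */
static void incoming_sketch_new_connection(incoming_sketch *s)
{
    if (!s->have_main) {
        /* First connection: set up the main channel only. */
        s->have_main = 1;
        if (s->multifd_expected == 0) {
            /* Without multifd there is nothing else to wait for. */
            s->processing_started = 1;
        }
        return;
    }

    /* Every later connection is one more multifd channel. */
    s->multifd_count++;
    if (s->multifd_count == s->multifd_expected) {
        s->processing_started = 1;
    }
}
```

With two multifd channels expected, processing_started stays 0 after the
first and second connections and becomes 1 on the third.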

* [Qemu-devel] [PATCH v9 08/12] migration: Test new fd infrastructure
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (6 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 07/12] migration: Create thread infrastructure for multifd recv side Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-17 11:11   ` Dr. David Alan Gilbert
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 09/12] migration: Rename initial_bytes Juan Quintela
                   ` (3 subsequent siblings)
  11 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We just send the address of each page through the alternate channels and
check on the receiving side that it is the one we expected.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

Use qio_channel_*all functions
---
 migration/ram.c | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 745da2971d..4c16d0775b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -492,8 +492,24 @@ static void *multifd_send_thread(void *opaque)
             break;
         }
         if (p->pages.num) {
+            Error *local_err = NULL;
+            size_t ret;
+            int i;
+            int num;
+
+            num = p->pages.num;
             p->pages.num = 0;
             qemu_mutex_unlock(&p->mutex);
+
+            for (i = 0; i < num; i++) {
+                ret = qio_channel_write_all(p->c,
+                         (const char *)&p->pages.iov[i].iov_base,
+                         sizeof(uint8_t *), &local_err);
+                if (ret != 0) {
+                    terminate_multifd_send_threads(local_err);
+                    return NULL;
+                }
+            }
             qemu_mutex_lock(&multifd_send_state->mutex);
             p->done = true;
             qemu_mutex_unlock(&multifd_send_state->mutex);
@@ -675,6 +691,7 @@ int multifd_load_cleanup(Error **errp)
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
+    uint8_t *recv_address;
 
     qemu_sem_post(&p->ready);
     while (true) {
@@ -684,7 +701,29 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
         if (p->pages.num) {
+            Error *local_err = NULL;
+            size_t ret;
+            int i;
+            int num;
+
+            num = p->pages.num;
             p->pages.num = 0;
+
+            for (i = 0; i < num; i++) {
+                ret = qio_channel_read_all(p->c, (char *)&recv_address,
+                                           sizeof(uint8_t *), &local_err);
+                if (ret != 0) {
+                    terminate_multifd_recv_threads(local_err);
+                    return NULL;
+                }
+                if (recv_address != p->pages.iov[i].iov_base) {
+                    error_setg(&local_err, "received %p and expecting %p (%d)",
+                               recv_address, p->pages.iov[i].iov_base, i);
+                    terminate_multifd_recv_threads(local_err);
+                    return NULL;
+                }
+            }
+
             p->done = true;
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
-- 
2.13.5
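
The check in patch 08/12 (write each queued address to the channel, read
it back on the other side and compare it with the expected one) can be
tried with a plain POSIX pipe.  A minimal sketch, assuming both ends live
in one process so that pointer values are comparable; write_all, read_all
and check_addresses are hypothetical helpers, not the qio_channel_*_all
functions.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <unistd.h>

/* Write exactly len bytes, retrying on short writes and EINTR. */
static int write_all(int fd, const void *buf, size_t len)
{
    const char *p = buf;

    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Read exactly len bytes; an early EOF counts as an error. */
static int read_all(int fd, void *buf, size_t len)
{
    char *p = buf;

    while (len > 0) {
        ssize_t n = read(fd, p, len);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (n == 0) {
            return -1;
        }
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Returns how many addresses matched before the first mismatch or error. */
static int check_addresses(int fd, uint8_t *const *expected, int num)
{
    int i;

    for (i = 0; i < num; i++) {
        uint8_t *received;

        if (read_all(fd, &received, sizeof(received)) != 0) {
            return i;
        }
        if (received != expected[i]) {
            return i;
        }
    }
    return num;
}
```

A return value equal to num means every address arrived in the expected
order; anything smaller is the index of the first bad entry.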

* [Qemu-devel] [PATCH v9 09/12] migration: Rename initial_bytes
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (7 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 08/12] migration: Test new fd infrastructure Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 10/12] migration: Transfer pages over new channels Juan Quintela
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

It is now called qemu_file_bytes, which better reflects what it holds,
and we add qemu_file_bytes_now so that we don't have to call qemu_ftell()
twice.

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 migration/migration.c | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 1e7c537954..54ef095d82 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2078,13 +2078,13 @@ static void *migration_thread(void *opaque)
     /* Used by the bandwidth calcs, updated later */
     int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
-    int64_t initial_bytes = 0;
     /*
      * The final stage happens when the remaining data is smaller than
      * this threshold; it's calculated from the requested downtime and
      * measured bandwidth
      */
     int64_t threshold_size = 0;
+    int64_t qemu_file_bytes = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
     bool old_vm_running = false;
@@ -2172,8 +2172,9 @@ static void *migration_thread(void *opaque)
         }
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
-            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
-                                         initial_bytes;
+            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t transferred_bytes =
+                qemu_file_bytes_now - qemu_file_bytes;
             uint64_t time_spent = current_time - initial_time;
             double bandwidth = (double)transferred_bytes / time_spent;
             threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2192,7 +2193,7 @@ static void *migration_thread(void *opaque)
 
             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
-            initial_bytes = qemu_ftell(s->to_dst_file);
+            qemu_file_bytes = qemu_file_bytes_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
-- 
2.13.5

* [Qemu-devel] [PATCH v9 10/12] migration: Transfer pages over new channels
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (8 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 09/12] migration: Rename initial_bytes Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-17 14:18   ` Dr. David Alan Gilbert
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 11/12] migration: Flush receive queue Juan Quintela
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 12/12] migration: Add multifd test Juan Quintela
  11 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We switch from sending the page number to sending real pages.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

Remove the HACK bit, now that the function that calculates the size
of a page is exported.
Rename multifd_pages{_now} to sent_pages{_now}
Remove multifd pages field, it is the same as normal pages
---
 migration/migration.c |  7 ++++++-
 migration/ram.c       | 39 +++++++++++----------------------------
 2 files changed, 17 insertions(+), 29 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 54ef095d82..1bd87a4e44 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2085,6 +2085,7 @@ static void *migration_thread(void *opaque)
      */
     int64_t threshold_size = 0;
     int64_t qemu_file_bytes = 0;
+    int64_t sent_pages = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
     bool old_vm_running = false;
@@ -2173,8 +2174,11 @@ static void *migration_thread(void *opaque)
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
             uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t sent_pages_now = ram_counters.normal;
             uint64_t transferred_bytes =
-                qemu_file_bytes_now - qemu_file_bytes;
+                (qemu_file_bytes_now - qemu_file_bytes) +
+                (sent_pages_now - sent_pages) *
+                qemu_target_page_size();
             uint64_t time_spent = current_time - initial_time;
             double bandwidth = (double)transferred_bytes / time_spent;
             threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2194,6 +2198,7 @@ static void *migration_thread(void *opaque)
             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
             qemu_file_bytes = qemu_file_bytes_now;
+            sent_pages = sent_pages_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
diff --git a/migration/ram.c b/migration/ram.c
index 4c16d0775b..981f345294 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -494,21 +494,15 @@ static void *multifd_send_thread(void *opaque)
         if (p->pages.num) {
             Error *local_err = NULL;
             size_t ret;
-            int i;
             int num;
 
             num = p->pages.num;
             p->pages.num = 0;
             qemu_mutex_unlock(&p->mutex);
-
-            for (i = 0; i < num; i++) {
-                ret = qio_channel_write_all(p->c,
-                         (const char *)&p->pages.iov[i].iov_base,
-                         sizeof(uint8_t *), &local_err);
-                if (ret != 0) {
-                    terminate_multifd_send_threads(local_err);
-                    return NULL;
-                }
+            ret = qio_channel_writev_all(p->c, p->pages.iov, num, &local_err);
+            if (ret != 0) {
+                terminate_multifd_send_threads(local_err);
+                return NULL;
             }
             qemu_mutex_lock(&multifd_send_state->mutex);
             p->done = true;
@@ -691,7 +685,6 @@ int multifd_load_cleanup(Error **errp)
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
-    uint8_t *recv_address;
 
     qemu_sem_post(&p->ready);
     while (true) {
@@ -703,27 +696,16 @@ static void *multifd_recv_thread(void *opaque)
         if (p->pages.num) {
             Error *local_err = NULL;
             size_t ret;
-            int i;
             int num;
 
             num = p->pages.num;
             p->pages.num = 0;
 
-            for (i = 0; i < num; i++) {
-                ret = qio_channel_read_all(p->c, (char *)&recv_address,
-                                           sizeof(uint8_t *), &local_err);
-                if (ret != 0) {
-                    terminate_multifd_recv_threads(local_err);
-                    return NULL;
-                }
-                if (recv_address != p->pages.iov[i].iov_base) {
-                    error_setg(&local_err, "received %p and expecting %p (%d)",
-                               recv_address, p->pages.iov[i].iov_base, i);
-                    terminate_multifd_recv_threads(local_err);
-                    return NULL;
-                }
+            ret = qio_channel_readv_all(p->c, p->pages.iov, num, &local_err);
+            if (ret != 0) {
+                terminate_multifd_recv_threads(local_err);
+                return NULL;
             }
-
             p->done = true;
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
@@ -1288,8 +1270,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
                              offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
         qemu_put_be16(rs->f, fd_num);
+        if (fd_num != MULTIFD_CONTINUE) {
+            qemu_fflush(rs->f);
+        }
         ram_counters.transferred += 2; /* size of fd_num */
-        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
         ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
         ram_counters.normal++;
@@ -3155,7 +3139,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
             multifd_recv_page(host, fd_num);
-            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
         case RAM_SAVE_FLAG_EOS:
-- 
2.13.5
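
Since the pages in patch 10/12 no longer pass through the main QEMUFile,
the bandwidth estimate in migration_thread() adds the pages sent on the
other channels to the bytes counted by qemu_ftell().  The arithmetic can
be written as a small standalone sketch; the function names are
hypothetical, times are taken to be in milliseconds as in the patch, and
rate limiting is left out.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Bytes per millisecond over one measurement window: bytes written to
 * the main file plus pages sent elsewhere, divided by the window length.
 */
static double sketch_bandwidth(uint64_t file_bytes_prev,
                               uint64_t file_bytes_now,
                               uint64_t sent_pages_prev,
                               uint64_t sent_pages_now,
                               uint64_t page_size,
                               uint64_t time_spent_ms)
{
    uint64_t transferred_bytes =
        (file_bytes_now - file_bytes_prev) +
        (sent_pages_now - sent_pages_prev) * page_size;

    return (double)transferred_bytes / (double)time_spent_ms;
}

/* Data that can still be sent within the allowed downtime. */
static int64_t sketch_threshold_size(double bandwidth,
                                     uint64_t downtime_limit_ms)
{
    return (int64_t)(bandwidth * (double)downtime_limit_ms);
}
```

For example, 200 bytes on the main file plus 25 pages of 4096 bytes in a
100 ms window give 102600 / 100 = 1026 bytes/ms; with a 300 ms downtime
limit the threshold is 307800 bytes.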

* [Qemu-devel] [PATCH v9 11/12] migration: Flush receive queue
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (9 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 10/12] migration: Transfer pages over new channels Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-17 14:51   ` Dr. David Alan Gilbert
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 12/12] migration: Add multifd test Juan Quintela
  11 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

Each time that we sync the bitmap, it is a possiblity that we receive
a page that is being processed by a different thread.  We fix this
problem just making sure that we wait for all receiving threads to
finish its work before we procedeed with the next stage.

We are low on page flags, so we use a combination that is otherwise not
valid to emit that message:  MULTIFD_PAGE and ZERO.

I tried to make a migration command for it, but it doesn't work because
we sometimes sync the bitmap when we have already sent the beginning
of the section, so I just added a new page flag.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Create RAM_SAVE_FLAG_MULTIFD_SYNC (dave suggestion)
Move the set of need_flush to inside the bitmap_sync code (peter suggestion)
---
 migration/ram.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/migration/ram.c b/migration/ram.c
index 981f345294..d717776f32 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -72,6 +72,14 @@
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
 
+/* We are getting low on page flags, so we start using combinations.
+   When we need to flush the receive queue, we send the page as
+   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO.
+   That combination is not otherwise allowed.
+*/
+#define RAM_SAVE_FLAG_MULTIFD_SYNC \
+    (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)
+
 static inline bool is_zero_range(uint8_t *p, uint64_t size)
 {
     return buffer_is_zero(p, size);
@@ -194,6 +202,9 @@ struct RAMState {
     uint64_t iterations_prev;
     /* Iterations since start */
     uint64_t iterations;
+    /* Indicates if we have synced the bitmap and we need to ensure that
+       target has processed all previous pages */
+    bool multifd_needs_flush;
     /* number of dirty bits in the bitmap */
     uint64_t migration_dirty_pages;
     /* protects modification of the bitmap */
@@ -614,9 +625,11 @@ struct MultiFDRecvParams {
     QIOChannel *c;
     QemuSemaphore ready;
     QemuSemaphore sem;
+    QemuCond cond_sync;
     QemuMutex mutex;
     /* proteced by param mutex */
     bool quit;
+    bool sync;
     multifd_pages_t pages;
     bool done;
 };
@@ -669,6 +682,7 @@ int multifd_load_cleanup(Error **errp)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_cond_destroy(&p->cond_sync);
         socket_recv_channel_destroy(p->c);
         g_free(p->name);
         p->name = NULL;
@@ -707,6 +721,10 @@ static void *multifd_recv_thread(void *opaque)
                 return NULL;
             }
             p->done = true;
+            if (p->sync) {
+                qemu_cond_signal(&p->cond_sync);
+                p->sync = false;
+            }
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
             continue;
@@ -752,9 +770,11 @@ void multifd_new_channel(QIOChannel *ioc)
     qemu_mutex_init(&p->mutex);
     qemu_sem_init(&p->sem, 0);
     qemu_sem_init(&p->ready, 0);
+    qemu_cond_init(&p->cond_sync);
     p->quit = false;
     p->id = msg.id;
     p->done = false;
+    p->sync = false;
     multifd_init_pages(&p->pages);
     p->c = ioc;
     multifd_recv_state->count++;
@@ -819,6 +839,27 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
     qemu_sem_post(&p->sem);
 }
 
+static int multifd_flush(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_channels();
+    for (i = 0; i < thread_count; i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        while (!p->done) {
+            p->sync = true;
+            qemu_cond_wait(&p->cond_sync, &p->mutex);
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    return 0;
+}
+
 /**
  * save_page_header: write page header to wire
  *
@@ -836,6 +877,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
 {
     size_t size, len;
 
+    if (rs->multifd_needs_flush &&
+        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
+        offset |= RAM_SAVE_FLAG_ZERO;
+        rs->multifd_needs_flush = false;
+    }
+
     if (block == rs->last_sent_block) {
         offset |= RAM_SAVE_FLAG_CONTINUE;
     }
@@ -1124,6 +1171,9 @@ static void migration_bitmap_sync(RAMState *rs)
     if (migrate_use_events()) {
         qapi_event_send_migration_pass(ram_counters.dirty_sync_count, NULL);
     }
+    if (!rs->ram_bulk_stage && migrate_use_multifd()) {
+        rs->multifd_needs_flush = true;
+    }
 }
 
 /**
@@ -3045,6 +3095,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         }
 
+        if ((flags & RAM_SAVE_FLAG_MULTIFD_SYNC)
+            == RAM_SAVE_FLAG_MULTIFD_SYNC) {
+            multifd_flush();
+            flags = flags & ~RAM_SAVE_FLAG_ZERO;
+        }
         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
                      RAM_SAVE_FLAG_MULTIFD_PAGE)) {
-- 
2.13.5
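
For readers who want to try the wait-for-idle handshake outside QEMU, here
is a minimal sketch of the same pattern as multifd_flush() and the
p->sync / p->cond_sync handling in multifd_recv_thread(), written with
plain pthreads instead of QemuMutex/QemuCond.  The Worker type and the
worker_init(), worker_finish(), worker_thread() and flush_all() names are
hypothetical and only mirror the shape of the patch.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond_sync;
    bool done;   /* worker has finished its current batch */
    bool sync;   /* a flusher is blocked on cond_sync */
} Worker;

static void worker_init(Worker *w)
{
    pthread_mutex_init(&w->mutex, NULL);
    pthread_cond_init(&w->cond_sync, NULL);
    w->done = false;
    w->sync = false;
}

/* Worker side: mark the batch as done and wake a waiting flusher. */
static void worker_finish(Worker *w)
{
    pthread_mutex_lock(&w->mutex);
    w->done = true;
    if (w->sync) {
        pthread_cond_signal(&w->cond_sync);
        w->sync = false;
    }
    pthread_mutex_unlock(&w->mutex);
}

static void *worker_thread(void *opaque)
{
    worker_finish(opaque);
    return NULL;
}

/* Flusher side: block until every worker has reported done. */
static void flush_all(Worker *workers, int count)
{
    assert(count >= 0);
    for (int i = 0; i < count; i++) {
        Worker *w = &workers[i];

        pthread_mutex_lock(&w->mutex);
        while (!w->done) {
            w->sync = true;
            pthread_cond_wait(&w->cond_sync, &w->mutex);
        }
        pthread_mutex_unlock(&w->mutex);
    }
}
```

Both orderings are covered: if the flusher arrives first it sets sync and
sleeps until the worker signals; if the worker finishes first, done is
already true and the flusher never waits.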


* [Qemu-devel] [PATCH v9 12/12] migration: Add multifd test
  2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
                   ` (10 preceding siblings ...)
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 11/12] migration: Flush receive queue Juan Quintela
@ 2017-10-04 10:46 ` Juan Quintela
  2017-10-17 15:27   ` Dr. David Alan Gilbert
  11 siblings, 1 reply; 35+ messages in thread
From: Juan Quintela @ 2017-10-04 10:46 UTC (permalink / raw)
  To: qemu-devel; +Cc: dgilbert, lvivier, peterx

We set the x-multifd-page-count and x-multifd-channels.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 tests/Makefile.include |   3 +
 tests/multifd-test.c   | 511 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 514 insertions(+)
 create mode 100644 tests/multifd-test.c

diff --git a/tests/Makefile.include b/tests/Makefile.include
index d23ca1ed1c..69e3fd143d 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -289,6 +289,7 @@ check-qtest-i386-$(CONFIG_POSIX) += tests/test-filter-mirror$(EXESUF)
 check-qtest-i386-$(CONFIG_POSIX) += tests/test-filter-redirector$(EXESUF)
 check-qtest-i386-y += tests/migration-test$(EXESUF)
 check-qtest-i386-y += tests/postcopy-test$(EXESUF)
+check-qtest-i386-y += tests/multifd-test$(EXESUF)
 check-qtest-i386-y += tests/test-x86-cpuid-compat$(EXESUF)
 check-qtest-i386-y += tests/numa-test$(EXESUF)
 check-qtest-x86_64-y += $(check-qtest-i386-y)
@@ -318,6 +319,7 @@ check-qtest-ppc64-y += tests/pnv-xscom-test$(EXESUF)
 check-qtest-ppc64-y += tests/drive_del-test$(EXESUF)
 check-qtest-ppc64-y += tests/postcopy-test$(EXESUF)
 check-qtest-ppc64-y += tests/migration-test$(EXESUF)
+check-qtest-ppc64-y += tests/multifd-test$(EXESUF)
 check-qtest-ppc64-y += tests/boot-serial-test$(EXESUF)
 check-qtest-ppc64-y += tests/rtas-test$(EXESUF)
 check-qtest-ppc64-$(CONFIG_SLIRP) += tests/pxe-test$(EXESUF)
@@ -785,6 +787,7 @@ tests/usb-hcd-xhci-test$(EXESUF): tests/usb-hcd-xhci-test.o $(libqos-usb-obj-y)
 tests/pc-cpu-test$(EXESUF): tests/pc-cpu-test.o
 tests/postcopy-test$(EXESUF): tests/postcopy-test.o
 tests/migration-test$(EXESUF): tests/migration-test.o
+tests/multifd-test$(EXESUF): tests/multifd-test.o
 tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o $(test-util-obj-y) \
 	$(qtest-obj-y) $(test-io-obj-y) $(libqos-virtio-obj-y) $(libqos-pc-obj-y) \
 	$(chardev-obj-y)
diff --git a/tests/multifd-test.c b/tests/multifd-test.c
new file mode 100644
index 0000000000..2b90f8f7bd
--- /dev/null
+++ b/tests/multifd-test.c
@@ -0,0 +1,511 @@
+/*
+ * QTest testcase for multifd
+ *
+ * Copyright (c) 2017 Red Hat, Inc. and/or its affiliates
+ *   based on the postcopy-test.c which is
+ *   based on the vhost-user-test.c that is:
+ *      Copyright (c) 2014 Virtual Open Systems Sarl.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu/osdep.h"
+
+#include "libqtest.h"
+#include "qemu/option.h"
+#include "qemu/range.h"
+#include "qemu/sockets.h"
+#include "chardev/char.h"
+#include "sysemu/sysemu.h"
+#include "hw/nvram/chrp_nvram.h"
+
+#define MIN_NVRAM_SIZE 8192 /* from spapr_nvram.c */
+
+const unsigned start_address = 1024 * 1024;
+const unsigned end_address = 100 * 1024 * 1024;
+bool got_stop;
+
+#if defined(__linux__)
+#include <sys/syscall.h>
+#include <sys/vfs.h>
+#endif
+
+static const char *tmpfs;
+
+/* A simple PC boot sector that modifies memory (1-100MB) quickly,
+ * outputting a 'B' every so often if it's still running.
+ */
+unsigned char bootsect[] = {
+  0xfa, 0x0f, 0x01, 0x16, 0x74, 0x7c, 0x66, 0xb8, 0x01, 0x00, 0x00, 0x00,
+  0x0f, 0x22, 0xc0, 0x66, 0xea, 0x20, 0x7c, 0x00, 0x00, 0x08, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xe4, 0x92, 0x0c, 0x02,
+  0xe6, 0x92, 0xb8, 0x10, 0x00, 0x00, 0x00, 0x8e, 0xd8, 0x66, 0xb8, 0x41,
+  0x00, 0x66, 0xba, 0xf8, 0x03, 0xee, 0xb3, 0x00, 0xb8, 0x00, 0x00, 0x10,
+  0x00, 0xfe, 0x00, 0x05, 0x00, 0x10, 0x00, 0x00, 0x3d, 0x00, 0x00, 0x40,
+  0x06, 0x7c, 0xf2, 0xfe, 0xc3, 0x75, 0xe9, 0x66, 0xb8, 0x42, 0x00, 0x66,
+  0xba, 0xf8, 0x03, 0xee, 0xeb, 0xde, 0x66, 0x90, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x9a, 0xcf, 0x00,
+  0xff, 0xff, 0x00, 0x00, 0x00, 0x92, 0xcf, 0x00, 0x27, 0x00, 0x5c, 0x7c,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0xaa
+};
+
+static void init_bootfile_x86(const char *bootpath)
+{
+    FILE *bootfile = fopen(bootpath, "wb");
+
+    g_assert_cmpint(fwrite(bootsect, 512, 1, bootfile), ==, 1);
+    fclose(bootfile);
+}
+
+static void init_bootfile_ppc(const char *bootpath)
+{
+    FILE *bootfile;
+    char buf[MIN_NVRAM_SIZE];
+    ChrpNvramPartHdr *header = (ChrpNvramPartHdr *)buf;
+
+    memset(buf, 0, MIN_NVRAM_SIZE);
+
+    /* Create a "common" partition in nvram to store boot-command property */
+
+    header->signature = CHRP_NVPART_SYSTEM;
+    memcpy(header->name, "common", 6);
+    chrp_nvram_finish_partition(header, MIN_NVRAM_SIZE);
+
+    /* FW_MAX_SIZE is 4MB, but slof.bin is only 900KB,
+     * so let's modify memory between 1MB and 100MB
+     * to do like PC bootsector
+     */
+
+    sprintf(buf + 16,
+            "boot-command=hex .\" _\" begin %x %x do i c@ 1 + i c! 1000 +loop "
+            ".\" B\" 0 until", end_address, start_address);
+
+    /* Write partition to the NVRAM file */
+
+    bootfile = fopen(bootpath, "wb");
+    g_assert_cmpint(fwrite(buf, MIN_NVRAM_SIZE, 1, bootfile), ==, 1);
+    fclose(bootfile);
+}
+
+/*
+ * Wait for some output in the serial output file,
+ * we get an 'A' followed by an endless string of 'B's
+ * but on the destination we won't have the A.
+ */
+static void wait_for_serial(const char *side)
+{
+    char *serialpath = g_strdup_printf("%s/%s", tmpfs, side);
+    FILE *serialfile = fopen(serialpath, "r");
+    const char *arch = qtest_get_arch();
+    int started = (strcmp(side, "src_serial") == 0 &&
+                   strcmp(arch, "ppc64") == 0) ? 0 : 1;
+
+    g_free(serialpath);
+    do {
+        int readvalue = fgetc(serialfile);
+
+        if (!started) {
+            /* SLOF prints its banner before starting test,
+             * to ignore it, mark the start of the test with '_',
+             * ignore all characters until this marker
+             */
+            switch (readvalue) {
+            case '_':
+                started = 1;
+                break;
+            case EOF:
+                fseek(serialfile, 0, SEEK_SET);
+                usleep(1000);
+                break;
+            }
+            continue;
+        }
+        switch (readvalue) {
+        case 'A':
+            /* Fine */
+            break;
+
+        case 'B':
+            /* It's alive! */
+            fclose(serialfile);
+            return;
+
+        case EOF:
+            started = (strcmp(side, "src_serial") == 0 &&
+                       strcmp(arch, "ppc64") == 0) ? 0 : 1;
+            fseek(serialfile, 0, SEEK_SET);
+            usleep(1000);
+            break;
+
+        default:
+            fprintf(stderr, "Unexpected %d on %s serial\n", readvalue, side);
+            g_assert_not_reached();
+        }
+    } while (true);
+}
+
+/*
+ * Events can get in the way of responses we are actually waiting for.
+ */
+static QDict *return_or_event(QDict *response)
+{
+    const char *event_string;
+    if (!qdict_haskey(response, "event")) {
+        return response;
+    }
+
+    /* OK, it was an event */
+    event_string = qdict_get_str(response, "event");
+    if (!strcmp(event_string, "STOP")) {
+        got_stop = true;
+    }
+    QDECREF(response);
+    return return_or_event(qtest_qmp_receive(global_qtest));
+}
+
+
+/*
+ * It's tricky to use qemu's migration event capability with qtest,
+ * events suddenly appearing confuse the qmp()/hmp() responses.
+ * so wait for a couple of passes to have happened before
+ * going postcopy.
+ */
+
+static uint64_t get_migration_pass(void)
+{
+    QDict *rsp, *rsp_return, *rsp_ram;
+    uint64_t result;
+
+    rsp = return_or_event(qmp("{ 'execute': 'query-migrate' }"));
+    rsp_return = qdict_get_qdict(rsp, "return");
+    if (!qdict_haskey(rsp_return, "ram")) {
+        /* Still in setup */
+        result = 0;
+    } else {
+        rsp_ram = qdict_get_qdict(rsp_return, "ram");
+        result = qdict_get_try_int(rsp_ram, "dirty-sync-count", 0);
+    }
+    QDECREF(rsp);
+    return result;
+}
+
+static void wait_for_migration_complete(void)
+{
+    QDict *rsp, *rsp_return;
+    bool completed;
+
+    do {
+        const char *status;
+
+        rsp = return_or_event(qmp("{ 'execute': 'query-migrate' }"));
+        rsp_return = qdict_get_qdict(rsp, "return");
+        status = qdict_get_str(rsp_return, "status");
+        completed = strcmp(status, "completed") == 0;
+        g_assert_cmpstr(status, !=,  "failed");
+        QDECREF(rsp);
+        usleep(1000 * 100);
+    } while (!completed);
+}
+
+static void wait_for_migration_pass(void)
+{
+    uint64_t initial_pass = get_migration_pass();
+    uint64_t pass;
+
+    /* Wait for the 1st sync */
+    do {
+        initial_pass = get_migration_pass();
+        if (got_stop || initial_pass) {
+            break;
+        }
+        usleep(1000 * 100);
+    } while (true);
+
+    do {
+        usleep(1000 * 100);
+        pass = get_migration_pass();
+    } while (pass == initial_pass && !got_stop);
+}
+
+static void check_guests_ram(void)
+{
+    /* Our ASM test will have been incrementing one byte from each page from
+     * 1MB to <100MB in order.
+     * This gives us a constraint that any page's byte should be equal or less
+     * than the previous page's byte (mod 256); and they should all be equal
+     * except for one transition at the point where we meet the incrementer.
+     * (We're running this with the guest stopped).
+     */
+    unsigned address;
+    uint8_t first_byte;
+    uint8_t last_byte;
+    bool hit_edge = false;
+    bool bad = false;
+
+    qtest_memread(global_qtest, start_address, &first_byte, 1);
+    last_byte = first_byte;
+
+    for (address = start_address + 4096; address < end_address; address += 4096)
+    {
+        uint8_t b;
+        qtest_memread(global_qtest, address, &b, 1);
+        if (b != last_byte) {
+            if (((b + 1) % 256) == last_byte && !hit_edge) {
+                /* This is OK, the guest stopped at the point of
+                 * incrementing the previous page but didn't get
+                 * to us yet.
+                 */
+                hit_edge = true;
+            } else {
+                fprintf(stderr, "Memory content inconsistency at %x"
+                                " first_byte = %x last_byte = %x current = %x"
+                                " hit_edge = %x\n",
+                                address, first_byte, last_byte, b, hit_edge);
+                bad = true;
+            }
+        }
+        last_byte = b;
+    }
+    g_assert_false(bad);
+}
+
+static void cleanup(const char *filename)
+{
+    char *path = g_strdup_printf("%s/%s", tmpfs, filename);
+
+    unlink(path);
+    g_free(path);
+}
+
+static void test_migrate(void)
+{
+    char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
+    QTestState *global = global_qtest, *from, *to;
+    unsigned char dest_byte_a, dest_byte_b, dest_byte_c, dest_byte_d;
+    gchar *cmd, *cmd_src, *cmd_dst;
+    QDict *rsp;
+
+    char *bootpath = g_strdup_printf("%s/bootsect", tmpfs);
+    const char *arch = qtest_get_arch();
+
+    got_stop = false;
+
+    if (strcmp(arch, "i386") == 0 || strcmp(arch, "x86_64") == 0) {
+        init_bootfile_x86(bootpath);
+        cmd_src = g_strdup_printf("-machine accel=kvm:tcg -m 150M"
+                                  " -name pcsource,debug-threads=on"
+                                  " -serial file:%s/src_serial"
+                                  " -drive file=%s,format=raw",
+                                  tmpfs, bootpath);
+        cmd_dst = g_strdup_printf("-machine accel=kvm:tcg -m 150M"
+                                  " -name pcdest,debug-threads=on"
+                                  " -serial file:%s/dest_serial"
+                                  " -drive file=%s,format=raw"
+                                  " -incoming %s",
+                                  tmpfs, bootpath, uri);
+    } else if (strcmp(arch, "ppc64") == 0) {
+        const char *accel;
+
+        /* On ppc64, the test only works with kvm-hv, but not with kvm-pr */
+        accel = access("/sys/module/kvm_hv", F_OK) ? "tcg" : "kvm:tcg";
+        init_bootfile_ppc(bootpath);
+        cmd_src = g_strdup_printf("-machine accel=%s -m 256M"
+                                  " -name pcsource,debug-threads=on"
+                                  " -serial file:%s/src_serial"
+                                  " -drive file=%s,if=pflash,format=raw",
+                                  accel, tmpfs, bootpath);
+        cmd_dst = g_strdup_printf("-machine accel=%s -m 256M"
+                                  " -name pcdest,debug-threads=on"
+                                  " -serial file:%s/dest_serial"
+                                  " -incoming %s",
+                                  accel, tmpfs, uri);
+    } else {
+        g_assert_not_reached();
+    }
+
+    g_free(bootpath);
+
+    from = qtest_start(cmd_src);
+    g_free(cmd_src);
+
+    to = qtest_init(cmd_dst);
+    g_free(cmd_dst);
+
+    global_qtest = from;
+    rsp = qmp("{ 'execute': 'migrate-set-capabilities',"
+                  "'arguments': { "
+                      "'capabilities': [ {"
+                          "'capability': 'x-multifd',"
+                          "'state': true } ] } }");
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+    global_qtest = to;
+    rsp = qmp("{ 'execute': 'migrate-set-capabilities',"
+                  "'arguments': { "
+                      "'capabilities': [ {"
+                          "'capability': 'x-multifd',"
+                          "'state': true } ] } }");
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+    /* We want to pick a speed slow enough that the test completes
+     * quickly, but that it doesn't complete precopy even on a slow
+     * machine, so also set the downtime.
+     */
+    global_qtest = from;
+    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
+              "'arguments': { 'max-bandwidth': 100000000 } }");
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+    /* 300ms downtime */
+    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
+              "'arguments': { 'downtime-limit': 300 } }");
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+    /* set 4 channels */
+    global_qtest = to;
+    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
+              "'arguments': { 'x-multifd-channels': 4 } }");
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
+              "'arguments': { 'x-multifd-page-count': 64 } }");
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+    /* set 4 channels */
+    global_qtest = from;
+    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
+              "'arguments': { 'x-multifd-channels': 4 } }");
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
+              "'arguments': { 'x-multifd-page-count': 64 } }");
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+
+    /* Wait for the first serial output from the source */
+    wait_for_serial("src_serial");
+
+    cmd = g_strdup_printf("{ 'execute': 'migrate',"
+                          "'arguments': { 'uri': '%s' } }",
+                          uri);
+    rsp = qmp(cmd);
+    g_free(cmd);
+    g_assert(qdict_haskey(rsp, "return"));
+    QDECREF(rsp);
+
+    wait_for_migration_pass();
+
+    if (!got_stop) {
+        qmp_eventwait("STOP");
+    }
+
+    global_qtest = to;
+    qmp_eventwait("RESUME");
+
+    wait_for_serial("dest_serial");
+    global_qtest = from;
+    wait_for_migration_complete();
+
+    qtest_quit(from);
+
+    global_qtest = to;
+
+    qtest_memread(to, start_address, &dest_byte_a, 1);
+
+    /* Destination still running, wait for a byte to change */
+    do {
+        qtest_memread(to, start_address, &dest_byte_b, 1);
+        usleep(10 * 1000);
+    } while (dest_byte_a == dest_byte_b);
+
+    qmp_discard_response("{ 'execute' : 'stop'}");
+    /* With it stopped, check nothing changes */
+    qtest_memread(to, start_address, &dest_byte_c, 1);
+    sleep(1);
+    qtest_memread(to, start_address, &dest_byte_d, 1);
+    g_assert_cmpint(dest_byte_c, ==, dest_byte_d);
+
+    check_guests_ram();
+
+    qtest_quit(to);
+    g_free(uri);
+
+    global_qtest = global;
+
+    cleanup("bootsect");
+    cleanup("migsocket");
+    cleanup("src_serial");
+    cleanup("dest_serial");
+}
+
+int main(int argc, char **argv)
+{
+    char template[] = "/tmp/multifd-test-XXXXXX";
+    int ret;
+
+    g_test_init(&argc, &argv, NULL);
+
+    tmpfs = mkdtemp(template);
+    if (!tmpfs) {
+        g_test_message("mkdtemp on path (%s): %s\n", template, strerror(errno));
+    }
+    g_assert(tmpfs);
+
+    module_call_init(MODULE_INIT_QOM);
+
+    qtest_add_func("/multifd", test_migrate);
+
+    ret = g_test_run();
+
+    g_assert_cmpint(ret, ==, 0);
+
+    ret = rmdir(tmpfs);
+    if (ret != 0) {
+        g_test_message("unable to rmdir: path (%s): %s\n",
+                       tmpfs, strerror(errno));
+    }
+
+    return ret;
+}
-- 
2.13.5
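
The consistency rule that check_guests_ram() applies can be tried on a
plain array, without a guest.  The sketch below is an illustration only:
pages_consistent() is a hypothetical helper that takes the first byte of
each page and applies the same test (all bytes equal, except for at most
one step down by one, mod 256, where the incrementer was stopped).  Unlike
the test in the patch, it returns on the first violation instead of
printing every one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* first_bytes[i] is the counter byte read from page i, in address order. */
static bool pages_consistent(const uint8_t *first_bytes, size_t n)
{
    bool hit_edge = false;
    uint8_t last;

    assert(first_bytes != NULL && n > 0);
    last = first_bytes[0];

    for (size_t i = 1; i < n; i++) {
        uint8_t b = first_bytes[i];

        if (b != last) {
            if ((uint8_t)(b + 1) == last && !hit_edge) {
                /* The incrementer reached the previous page but not
                 * this one yet; allowed exactly once. */
                hit_edge = true;
            } else {
                return false;
            }
        }
        last = b;
    }
    return true;
}
```

A second transition, or a page whose byte is larger than its predecessor's,
means some page arrived with stale or corrupted contents.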


* Re: [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling Juan Quintela
@ 2017-10-09  9:28   ` Peter Xu
  2017-10-16 17:34   ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 35+ messages in thread
From: Peter Xu @ 2017-10-09  9:28 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Oct 04, 2017 at 12:46:26PM +0200, Juan Quintela wrote:

[...]

> diff --git a/migration/tls.c b/migration/tls.c
> index 596e8790bd..026a008667 100644
> --- a/migration/tls.c
> +++ b/migration/tls.c
> @@ -119,7 +119,6 @@ static void migration_tls_outgoing_handshake(QIOTask *task,
>      if (qio_task_propagate_error(task, &err)) {
>          trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
>          migrate_fd_error(s, err);
> -        error_free(err);

Would err be leaked if this line is removed?

>      } else {
>          trace_migration_tls_outgoing_handshake_complete();
>          migration_channel_connect(s, ioc, NULL);
> -- 
> 2.13.5
> 

-- 
Peter Xu


* Re: [Qemu-devel] [PATCH v9 03/12] migration: Make migrate_fd_error() the owner of the Error
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 03/12] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
@ 2017-10-09  9:34   ` Peter Xu
  0 siblings, 0 replies; 35+ messages in thread
From: Peter Xu @ 2017-10-09  9:34 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Oct 04, 2017 at 12:46:27PM +0200, Juan Quintela wrote:
> So far, we had to free the error after each caller, so just do it
> here.  Once there, tls.c was leaking the error.

Ah I see the point of my previous question... I think the tls.c leak
was introduced by previous patch?  Shall we squash that line into this
patch instead?

After that, these two patches both look good to me.  Thanks,

> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/channel.c   |  1 -
>  migration/migration.c | 10 ++++------
>  migration/migration.h |  4 ++--
>  migration/socket.c    |  1 -
>  4 files changed, 6 insertions(+), 10 deletions(-)
> 
> diff --git a/migration/channel.c b/migration/channel.c
> index 70ec7ea3b7..1dd2ae1530 100644
> --- a/migration/channel.c
> +++ b/migration/channel.c
> @@ -71,7 +71,6 @@ void migration_channel_connect(MigrationState *s,
>          migration_tls_channel_connect(s, ioc, hostname, &local_err);
>          if (local_err) {
>              migrate_fd_error(s, local_err);
> -            error_free(local_err);
>          }
>      } else {
>          QEMUFile *f = qemu_fopen_channel_output(ioc);
> diff --git a/migration/migration.c b/migration/migration.c
> index 468f51cfa7..61b7e7105a 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1079,16 +1079,14 @@ static void migrate_fd_cleanup(void *opaque)
>      block_cleanup_parameters(s);
>  }
>  
> -void migrate_set_error(MigrationState *s, const Error *error)
> +void migrate_set_error(MigrationState *s, Error *error)
>  {
>      qemu_mutex_lock(&s->error_mutex);
> -    if (!s->error) {
> -        s->error = error_copy(error);
> -    }
> +    error_propagate(&s->error, error);
>      qemu_mutex_unlock(&s->error_mutex);
>  }
>  
> -void migrate_fd_error(MigrationState *s, const Error *error)
> +void migrate_fd_error(MigrationState *s, Error *error)
>  {
>      trace_migrate_fd_error(error_get_pretty(error));
>      assert(s->to_dst_file == NULL);
> @@ -1362,7 +1360,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
>      }
>  
>      if (local_err) {
> -        migrate_fd_error(s, local_err);
> +        migrate_fd_error(s, error_copy(local_err));
>          error_propagate(errp, local_err);
>          return;
>      }
> diff --git a/migration/migration.h b/migration/migration.h
> index 51c0ac2e71..cc196cc87f 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -163,8 +163,8 @@ bool  migration_has_all_channels(void);
>  
>  uint64_t migrate_max_downtime(void);
>  
> -void migrate_set_error(MigrationState *s, const Error *error);
> -void migrate_fd_error(MigrationState *s, const Error *error);
> +void migrate_set_error(MigrationState *s, Error *error);
> +void migrate_fd_error(MigrationState *s, Error *error);
>  
>  void migrate_fd_connect(MigrationState *s);
>  
> diff --git a/migration/socket.c b/migration/socket.c
> index dee869044a..2d70747a1a 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -80,7 +80,6 @@ static void socket_outgoing_migration(QIOTask *task,
>      if (qio_task_propagate_error(task, &err)) {
>          trace_migration_socket_outgoing_error(error_get_pretty(err));
>          migrate_fd_error(data->s, err);
> -        error_free(err);
>      } else {
>          trace_migration_socket_outgoing_connected(data->hostname);
>          migration_channel_connect(data->s, sioc, data->hostname);
> -- 
> 2.13.5
> 

-- 
Peter Xu
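
For illustration, the ownership rule discussed in this reply (the callee
takes over the error object, so callers must not free it afterwards) can
be sketched without QEMU.  Everything here is hypothetical: Err stands in
for QEMU's Error, state_set_error() mimics what migrate_set_error() does
in the quoted hunk via error_propagate(), namely keep the first error and
free any later one, and live_errs exists only so the sketch can show that
nothing leaks.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    char *msg;
} Err;                      /* hypothetical stand-in for Error */

typedef struct {
    Err *error;             /* first error recorded, owned by the state */
} State;

static int live_errs;       /* number of Err objects currently allocated */

static Err *err_new(const char *msg)
{
    Err *e = malloc(sizeof(*e));

    assert(e != NULL);
    e->msg = malloc(strlen(msg) + 1);
    assert(e->msg != NULL);
    strcpy(e->msg, msg);
    live_errs++;
    return e;
}

static void err_free(Err *e)
{
    if (e) {
        free(e->msg);
        free(e);
        live_errs--;
    }
}

/* Takes ownership of err: store it if no error is recorded yet,
 * otherwise free it.  The caller must not touch err afterwards. */
static void state_set_error(State *s, Err *err)
{
    if (!s->error) {
        s->error = err;
    } else {
        err_free(err);
    }
}
```

With this convention a caller that still needs the error after handing it
over has to pass a copy, which is what the qmp_migrate() hunk does with
error_copy().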


* Re: [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work Juan Quintela
@ 2017-10-09 10:05   ` Peter Xu
  2017-10-09 10:15   ` Daniel P. Berrange
  2017-10-16 19:11   ` Dr. David Alan Gilbert
  2 siblings, 0 replies; 35+ messages in thread
From: Peter Xu @ 2017-10-09 10:05 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, dgilbert, lvivier

On Wed, Oct 04, 2017 at 12:46:28PM +0200, Juan Quintela wrote:
> We create new channels for each new thread created. We send through
> them a string containing <uuid> multifd <channel number> so we are

Need to touch-up the commit message to reflect the new protocol?

> sure that we connect the right channels in both sides.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use a struct to send all fields
> Use default uuid
> ---
>  migration/migration.c |   5 ++
>  migration/ram.c       | 128 +++++++++++++++++++++++++++++++++++++++++++-------
>  migration/ram.h       |   3 ++
>  migration/socket.c    |  34 +++++++++++++-
>  migration/socket.h    |  10 ++++
>  5 files changed, 162 insertions(+), 18 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 61b7e7105a..ee98c50d8c 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -419,6 +419,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +    if (migrate_use_multifd()) {
> +        int thread_count = migrate_multifd_channels();
> +
> +        return thread_count == multifd_created_channels();
> +    }
>      return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index b83f8977c5..b57006594b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -47,6 +48,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -363,6 +366,7 @@ struct MultiFDSendParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -379,6 +383,12 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +    }
>      for (i = 0; i < multifd_send_state->count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -404,6 +414,7 @@ int multifd_save_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_send_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -414,9 +425,27 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +typedef struct {
> +    uint32_t version;

Maybe use uint8_t for version as well?  Otherwise we may need to do
proper endianness swapping to make sure BE/LE hosts can migrate between
each other?
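
To make the byte-order concern concrete, here is a minimal standalone
sketch (not QEMU code) that stores a 32-bit version field in a fixed
big-endian order, so BE and LE hosts put the same bytes on the wire.
The helper names put_be32()/get_be32() are hypothetical; inside QEMU
the cpu_to_be32()/be32_to_cpu() helpers from qemu/bswap.h would
presumably be the natural choice.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: store v in big-endian order, whatever the host is. */
static void put_be32(uint8_t *buf, uint32_t v)
{
    buf[0] = (uint8_t)(v >> 24);
    buf[1] = (uint8_t)(v >> 16);
    buf[2] = (uint8_t)(v >> 8);
    buf[3] = (uint8_t)v;
}

/* Hypothetical helper: read back a big-endian 32-bit value. */
static uint32_t get_be32(const uint8_t *buf)
{
    return ((uint32_t)buf[0] << 24) | ((uint32_t)buf[1] << 16) |
           ((uint32_t)buf[2] << 8) | (uint32_t)buf[3];
}
```

With a uint8_t version none of this is needed, which is the point of
the suggestion.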

> +    uint8_t id;
> +    char uuid[UUID_FMT_LEN];
> +} MigrateMultiFDInit_t;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    MigrateMultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    size_t ret;
> +
> +    msg.version = 1;
> +    msg.id = p->id;
> +    qemu_uuid_unparse(&qemu_uuid, (char *)&msg.uuid);

Would it be possible that we send the qemu_uuid.data directly?  Then
we can avoid parse/unparse on both sides?

> +    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_send_threads(local_err);
> +        return NULL;
> +    }
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -431,6 +460,27 @@ static void *multifd_send_thread(void *opaque)
>      return NULL;
>  }
>  
> +static void multifd_new_channel_async(QIOTask *task, gpointer opaque)
> +{
> +    MultiFDSendParams *p = opaque;
> +    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
> +    Error *local_err;
> +
> +    if (qio_task_propagate_error(task, &local_err)) {
> +        if (multifd_save_cleanup(&local_err) != 0) {
> +            migrate_set_error(migrate_get_current(), local_err);
> +        }
> +    } else {
> +        p->c = QIO_CHANNEL(sioc);
> +        qio_channel_set_delay(p->c, false);
> +
> +        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +
> +        multifd_send_state->count++;
> +    }
> +}
> +
>  int multifd_save_setup(void)
>  {
>      int thread_count;
> @@ -451,10 +501,7 @@ int multifd_save_setup(void)
>          p->quit = false;
>          p->id = i;
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -
> -        multifd_send_state->count++;
> +        socket_send_channel_create(multifd_new_channel_async, p);
>      }
>      return 0;
>  }
> @@ -463,6 +510,7 @@ struct MultiFDRecvParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -473,12 +521,22 @@ struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
>      int count;
> +    /* Should we finish */
> +    bool quit;
>  } *multifd_recv_state;
>  
>  static void terminate_multifd_recv_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +    }
> +    multifd_recv_state->quit = true;
> +
>      for (i = 0; i < multifd_recv_state->count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> @@ -504,6 +562,7 @@ int multifd_load_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_recv_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -532,10 +591,51 @@ static void *multifd_recv_thread(void *opaque)
>      return NULL;
>  }
>  
> +void multifd_new_channel(QIOChannel *ioc)

This name looks similar to multifd_new_channel_async().  Would it make
sense to rename it to something like "multifd_new_recv_channel" to
show that it's creating receiving ports?  Also, is this function used
in the current patch?  I see no caller of it.

> +{
> +    MultiFDRecvParams *p;
> +    MigrateMultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    char *uuid;
> +    size_t ret;
> +
> +    ret = qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +
> +    uuid = qemu_uuid_unparse_strdup(&qemu_uuid);

(if we use UUID binary, we can avoid parsing here as well)

> +
> +    if (strcmp(msg.uuid, uuid)) {
> +        g_free(uuid);
> +        error_setg(&local_err, "multifd: received uuid '%s' and expected "
> +                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +    g_free(uuid);
> +
> +    p = &multifd_recv_state->params[msg.id];
> +    if (p->id != 0) {

Maybe init p->id to -1 then check against -1 here?  Since the first
channel seems to always have p->id == 0.
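
A minimal standalone sketch of the sentinel idea, assuming the id
stays a uint8_t (so "-1" is really UINT8_MAX and valid channel ids
are limited to 0..254).  RecvSlot, slots_init() and slot_claim() are
hypothetical names, not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Sentinel for "not set up yet"; this is what (uint8_t)-1 evaluates to. */
#define SLOT_UNUSED UINT8_MAX

typedef struct {
    uint8_t id;
} RecvSlot;

/* Mark every slot unused; zero-filled memory alone cannot express that. */
static void slots_init(RecvSlot *slots, uint8_t count)
{
    for (uint8_t i = 0; i < count; i++) {
        slots[i].id = SLOT_UNUSED;
    }
}

/* Returns true if the slot was claimed, false if the id is out of
 * range or the slot was already set up (including channel 0). */
static bool slot_claim(RecvSlot *slots, uint8_t count, uint8_t id)
{
    if (id >= count || slots[id].id != SLOT_UNUSED) {
        return false;
    }
    slots[id].id = id;
    return true;
}
```

With the zero-initialized array and a "!= 0" check, a second
connection claiming channel 0 would not be detected; with the
sentinel it is rejected like any other duplicate.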

> +        error_setg(&local_err, "multifd: received id '%d' already setup'", msg.id);
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +    qemu_mutex_init(&p->mutex);
> +    qemu_sem_init(&p->sem, 0);
> +    p->quit = false;
> +    p->id = msg.id;
> +    p->c = ioc;
> +    multifd_recv_state->count++;
> +    p->name = g_strdup_printf("multifdrecv_%d", msg.id);
> +    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> +                       QEMU_THREAD_JOINABLE);
> +}
> +
>  int multifd_load_setup(void)
>  {
>      int thread_count;
> -    uint8_t i;
>  
>      if (!migrate_use_multifd()) {
>          return 0;
> @@ -544,21 +644,15 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      multifd_recv_state->count = 0;
> -    for (i = 0; i < thread_count; i++) {
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> -        qemu_mutex_init(&p->mutex);
> -        qemu_sem_init(&p->sem, 0);
> -        p->quit = false;
> -        p->id = i;
> -        p->name = g_strdup_printf("multifdrecv_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -        multifd_recv_state->count++;
> -    }
> +    multifd_recv_state->quit = false;
>      return 0;
>  }
>  
> +int multifd_created_channels(void)
> +{
> +    return multifd_recv_state->count;
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> diff --git a/migration/ram.h b/migration/ram.h
> index 4a72d66503..5221bc9beb 100644
> --- a/migration/ram.h
> +++ b/migration/ram.h
> @@ -31,6 +31,7 @@
>  
>  #include "qemu-common.h"
>  #include "exec/cpu-common.h"
> +#include "io/channel.h"
>  
>  extern MigrationStats ram_counters;
>  extern XBZRLECacheStats xbzrle_counters;
> @@ -43,6 +44,8 @@ int multifd_save_setup(void);
>  int multifd_save_cleanup(Error **errp);
>  int multifd_load_setup(void);
>  int multifd_load_cleanup(Error **errp);
> +void multifd_new_channel(QIOChannel *ioc);
> +int multifd_created_channels(void);
>  
>  uint64_t ram_pagesize_summary(void);
>  int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
> diff --git a/migration/socket.c b/migration/socket.c
> index 2d70747a1a..22fb05edc8 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -26,6 +26,34 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
>  
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(recv));
> +    return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> +    SocketAddress *saddr;
> +} outgoing_args;
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> +                                     f, data, NULL);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    if (outgoing_args.saddr) {

Nitpick: IIUC socket_send_channel_destroy() will be called per
channel?  Would it be nicer to clean up outgoing_args.saddr once
somewhere instead of checking it every time when destroying a channel?

(If we move this out, maybe we can avoid introducing
 socket_send_channel_destroy() in general since then it'll only
 contain one single object_unref.)

Thanks,

> +        qapi_free_SocketAddress(outgoing_args.saddr);
> +        outgoing_args.saddr = NULL;
> +    }
> +    return 0;
> +}
>  
>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>  {
> @@ -95,6 +123,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>  
>      data->s = s;
> +
> +    /* in case previous migration leaked it */
> +    qapi_free_SocketAddress(outgoing_args.saddr);
> +    outgoing_args.saddr = saddr;
> +
>      if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
>          data->hostname = g_strdup(saddr->u.inet.host);
>      }
> @@ -105,7 +138,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free);
> -    qapi_free_SocketAddress(saddr);
>  }
>  
>  void tcp_start_outgoing_migration(MigrationState *s,
> diff --git a/migration/socket.h b/migration/socket.h
> index 6b91e9db38..afb0ff0f51 100644
> --- a/migration/socket.h
> +++ b/migration/socket.h
> @@ -16,6 +16,16 @@
>  
>  #ifndef QEMU_MIGRATION_SOCKET_H
>  #define QEMU_MIGRATION_SOCKET_H
> +
> +#include "io/channel.h"
> +#include "io/task.h"
> +
> +QIOChannel *socket_recv_channel_create(void);
> +int socket_recv_channel_destroy(QIOChannel *recv);
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data);
> +int socket_send_channel_destroy(QIOChannel *send);
> +
>  void tcp_start_incoming_migration(const char *host_port, Error **errp);
>  
>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
> -- 
> 2.13.5
> 

-- 
Peter Xu

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work Juan Quintela
  2017-10-09 10:05   ` Peter Xu
@ 2017-10-09 10:15   ` Daniel P. Berrange
  2017-10-09 12:32     ` Juan Quintela
  2017-10-09 12:32     ` Juan Quintela
  2017-10-16 19:11   ` Dr. David Alan Gilbert
  2 siblings, 2 replies; 35+ messages in thread
From: Daniel P. Berrange @ 2017-10-09 10:15 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, dgilbert, peterx

On Wed, Oct 04, 2017 at 12:46:28PM +0200, Juan Quintela wrote:
> We create new channels for each new thread created. We send through
> them a string containing <uuid> multifd <channel number> so we are
> sure that we connect the right channels in both sides.

This message needs updating now that we send a struct.


> diff --git a/migration/ram.c b/migration/ram.c
> index b83f8977c5..b57006594b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -414,9 +425,27 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +typedef struct {
> +    uint32_t version;
> +    uint8_t id;
> +    char uuid[UUID_FMT_LEN];
> +} MigrateMultiFDInit_t;

We should add an __attribute__((packed)) here since we send it directly
on the wire. Perhaps put the 'uuid' field before 'id' when doing that
so 'uuid' gets a more natural alignment.

If we use 'unsigned char uuid[16]' then you don't need to convert
from string format either...
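
A rough standalone sketch of what that layout could look like,
assuming GCC/Clang attribute syntax and a 16-byte binary UUID.  The
names MultiFDInitSketch, RAW_UUID_LEN and init_msg_fill() are made up
for illustration; in the tree the QEMU_PACKED macro would presumably
be used instead of the raw attribute.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define RAW_UUID_LEN 16 /* assumption: binary UUID is 16 bytes */

/* Hypothetical layout: uuid moved before id, no padding anywhere. */
typedef struct {
    uint32_t version;
    unsigned char uuid[RAW_UUID_LEN];
    uint8_t id;
} __attribute__((packed)) MultiFDInitSketch;

/* Fill the message from a raw UUID; no string conversion involved. */
static void init_msg_fill(MultiFDInitSketch *msg, uint8_t id,
                          const unsigned char raw_uuid[RAW_UUID_LEN])
{
    memset(msg, 0, sizeof(*msg));
    msg->version = 1; /* byte-order handling deliberately not shown */
    memcpy(msg->uuid, raw_uuid, sizeof(msg->uuid));
    msg->id = id;
}
```

Packed, the struct is 4 + 16 + 1 = 21 bytes on the wire; without the
attribute the compiler would typically pad it to 24.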

>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    MigrateMultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    size_t ret;
> +
> +    msg.version = 1;
> +    msg.id = p->id;
> +    qemu_uuid_unparse(&qemu_uuid, (char *)&msg.uuid);

eg this could be   memcpy(msg.uuid, qemu_uuid.data, sizeof(msg.uuid))

> +    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_send_threads(local_err);
> +        return NULL;
> +    }
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -431,6 +460,27 @@ static void *multifd_send_thread(void *opaque)
>      return NULL;
>  }

> +void multifd_new_channel(QIOChannel *ioc)
> +{
> +    MultiFDRecvParams *p;
> +    MigrateMultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    char *uuid;
> +    size_t ret;
> +
> +    ret = qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +
> +    uuid = qemu_uuid_unparse_strdup(&qemu_uuid);

...and here we would avoid need to unparse, instead..

> +
> +    if (strcmp(msg.uuid, uuid)) {

  memcmp(msg.uuid, qemu_uuid.data, sizeof(msg.uuid)) != 0
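
As a standalone illustration of the binary comparison (hypothetical
helper name, 16-byte UUID assumed):

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define RAW_UUID_LEN 16 /* assumption: binary UUID is 16 bytes */

/* Hypothetical helper: true when both raw UUIDs are byte-identical. */
static bool uuid_matches(const unsigned char received[RAW_UUID_LEN],
                         const unsigned char expected[RAW_UUID_LEN])
{
    /* The length is memcmp's third argument; the comparison against
     * zero applies to memcmp's return value, outside the call. */
    return memcmp(received, expected, RAW_UUID_LEN) == 0;
}
```

Parenthesis placement matters here: if the "!= 0" ends up inside the
call, the length argument evaluates to 1 and only the first byte
would be compared.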

> +        g_free(uuid);
> +        error_setg(&local_err, "multifd: received uuid '%s' and expected "
> +                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +    g_free(uuid);
> +

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work
  2017-10-09 10:15   ` Daniel P. Berrange
@ 2017-10-09 12:32     ` Juan Quintela
  2017-10-09 12:32     ` Juan Quintela
  1 sibling, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-10-09 12:32 UTC (permalink / raw)
  To: Daniel P. Berrange; +Cc: qemu-devel, lvivier, dgilbert, peterx

"Daniel P. Berrange" <berrange@redhat.com> wrote:
> On Wed, Oct 04, 2017 at 12:46:28PM +0200, Juan Quintela wrote:
>> We create new channels for each new thread created. We send through
>> them a string containing <uuid> multifd <channel number> so we are
>> sure that we connect the right channels in both sides.
>
> This message needs updating now that we send a struct.
>
>
>> diff --git a/migration/ram.c b/migration/ram.c
>> index b83f8977c5..b57006594b 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -414,9 +425,27 @@ int multifd_save_cleanup(Error **errp)
>>      return ret;
>>  }
>>  
>> +typedef struct {
>> +    uint32_t version;
>> +    uint8_t id;
>> +    char uuid[UUID_FMT_LEN];
>> +} MigrateMultiFDInit_t;
>
> We add an __attribute__((packed)) here since we send it directly
> on the wire. Perhaps put 'uuid' field before 'id' when doing that
> so 'uuid' gets a more natural alignment.

ok.

> If we use 'unsigned char uuid[16]' then you don't need to convert
> from string format either...

I was looking at the "exported" commands in uuid.h, but I can use the
memcpy without problem.  It just feels like relying on a "detail" of
the implementation.



>
>>  static void *multifd_send_thread(void *opaque)
>>  {
>>      MultiFDSendParams *p = opaque;
>> +    MigrateMultiFDInit_t msg;
>> +    Error *local_err = NULL;
>> +    size_t ret;
>> +
>> +    msg.version = 1;
>> +    msg.id = p->id;
>> +    qemu_uuid_unparse(&qemu_uuid, (char *)&msg.uuid);
>
> eg this could be   memcpy(msg.uuid, qemu_uuid.data, sizeof(msg.uuid))
>
>> +    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
>> +    if (ret != 0) {
>> +        terminate_multifd_send_threads(local_err);
>> +        return NULL;
>> +    }
>>  
>>      while (true) {
>>          qemu_mutex_lock(&p->mutex);
>> @@ -431,6 +460,27 @@ static void *multifd_send_thread(void *opaque)
>>      return NULL;
>>  }
>
>> +void multifd_new_channel(QIOChannel *ioc)
>> +{
>> +    MultiFDRecvParams *p;
>> +    MigrateMultiFDInit_t msg;
>> +    Error *local_err = NULL;
>> +    char *uuid;
>> +    size_t ret;
>> +
>> +    ret = qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err);
>> +    if (ret != 0) {
>> +        terminate_multifd_recv_threads(local_err);
>> +        return;
>> +    }
>> +
>> +    uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
>
> ...and here we would avoid need to unparse, instead..
>
>> +
>> +    if (strcmp(msg.uuid, uuid)) {
>
>   memcmp(msg.uuid, qemu_uuid.data, sizeof(msg.uuid)) != 0
>
>> +        g_free(uuid);
>> +        error_setg(&local_err, "multifd: received uuid '%s' and expected "
>> +                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
>> +        terminate_multifd_recv_threads(local_err);
>> +        return;
>> +    }
>> +    g_free(uuid);

Thanks, Juan.

^ permalink raw reply	[flat|nested] 35+ messages in thread

* Re: [Qemu-devel] [PATCH v9 05/12] migration: Create ram_multifd_page
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 05/12] migration: Create ram_multifd_page Juan Quintela
@ 2017-10-09 13:08   ` Paolo Bonzini
  2017-10-16 19:43   ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2017-10-09 13:08 UTC (permalink / raw)
  To: Juan Quintela, qemu-devel; +Cc: lvivier, dgilbert, peterx

On 04/10/2017 12:46, Juan Quintela wrote:
> The function still doesn't use multifd, but we have simplified
> ram_save_page, xbzrle and RDMA stuff is gone.  We have added a new
> counter and a new flag for this type of pages.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

I still disagree with the approach, and still haven't gotten a good
explanation of why it is too difficult to just have multiple migration
streams.

Paolo

> --
> Add last_page parameter
> Add comments for done and address
> Remove multifd field, it is the same as normal pages
> Merge next patch, now we send multiple pages at a time
> Remove counter for multifd pages, it is identical to normal pages
> Use iovec's instead of creating the equivalent.
> Clear memory used by pages (dave)
> Use g_new0(danp)
> define MULTIFD_CONTINUE
> ---
>  migration/ram.c | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 131 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index b57006594b..c0af538f5f 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -50,6 +50,7 @@
>  #include "migration/block.h"
>  #include "sysemu/sysemu.h"
>  #include "qemu/uuid.h"
> +#include "qemu/iov.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -69,6 +70,7 @@
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
> +#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>  
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>  {
> @@ -362,14 +364,29 @@ static void compress_threads_save_setup(void)
>  
>  /* Multiple fd's */
>  
> +/* used to continue on the same multifd group */
> +#define MULTIFD_CONTINUE UINT16_MAX
> +
> +typedef struct {
> +    int num;
> +    size_t size;
> +    struct iovec *iov;
> +} multifd_pages_t;
> +
>  struct MultiFDSendParams {
> +    /* not changed */
>      uint8_t id;
>      char *name;
>      QemuThread thread;
>      QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
> +    multifd_pages_t pages;
> +    /* protected by multifd mutex */
> +    /* has the thread finish the last submitted job */
> +    bool done;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
>  
> @@ -377,8 +394,26 @@ struct {
>      MultiFDSendParams *params;
>      /* number of created threads */
>      int count;
> +    QemuMutex mutex;
> +    QemuSemaphore sem;
> +    multifd_pages_t pages;
>  } *multifd_send_state;
>  
> +static void multifd_init_pages(multifd_pages_t *pages)
> +{
> +    pages->num = 0;
> +    pages->size = migrate_multifd_page_count();
> +    pages->iov = g_new0(struct iovec, pages->size);
> +}
> +
> +static void multifd_clear_pages(multifd_pages_t *pages)
> +{
> +    pages->num = 0;
> +    pages->size = 0;
> +    g_free(pages->iov);
> +    pages->iov = NULL;
> +}
> +
>  static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
> @@ -417,9 +452,11 @@ int multifd_save_cleanup(Error **errp)
>          socket_send_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
> +        multifd_clear_pages(&p->pages);
>      }
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> +    multifd_clear_pages(&multifd_send_state->pages);
>      g_free(multifd_send_state);
>      multifd_send_state = NULL;
>      return ret;
> @@ -446,6 +483,7 @@ static void *multifd_send_thread(void *opaque)
>          terminate_multifd_send_threads(local_err);
>          return NULL;
>      }
> +    qemu_sem_post(&multifd_send_state->sem);
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -453,6 +491,15 @@ static void *multifd_send_thread(void *opaque)
>              qemu_mutex_unlock(&p->mutex);
>              break;
>          }
> +        if (p->pages.num) {
> +            p->pages.num = 0;
> +            qemu_mutex_unlock(&p->mutex);
> +            qemu_mutex_lock(&multifd_send_state->mutex);
> +            p->done = true;
> +            qemu_mutex_unlock(&multifd_send_state->mutex);
> +            qemu_sem_post(&multifd_send_state->sem);
> +            continue;
> +        }
>          qemu_mutex_unlock(&p->mutex);
>          qemu_sem_wait(&p->sem);
>      }
> @@ -493,6 +540,9 @@ int multifd_save_setup(void)
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      multifd_send_state->count = 0;
> +    qemu_mutex_init(&multifd_send_state->mutex);
> +    qemu_sem_init(&multifd_send_state->sem, 0);
> +    multifd_init_pages(&multifd_send_state->pages);
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -500,12 +550,52 @@ int multifd_save_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
>          p->id = i;
> +        p->done = true;
> +        multifd_init_pages(&p->pages);
>          p->name = g_strdup_printf("multifdsend_%d", i);
>          socket_send_channel_create(multifd_new_channel_async, p);
>      }
>      return 0;
>  }
>  
> +static uint16_t multifd_send_page(uint8_t *address, bool last_page)
> +{
> +    int i;
> +    MultiFDSendParams *p = NULL; /* make happy gcc */
> +    multifd_pages_t *pages = &multifd_send_state->pages;
> +
> +    pages->iov[pages->num].iov_base = address;
> +    pages->iov[pages->num].iov_len = TARGET_PAGE_SIZE;
> +    pages->num++;
> +
> +    if (!last_page) {
> +        if (pages->num < (pages->size - 1)) {
> +            return MULTIFD_CONTINUE;
> +        }
> +    }
> +
> +    qemu_sem_wait(&multifd_send_state->sem);
> +    qemu_mutex_lock(&multifd_send_state->mutex);
> +    for (i = 0; i < multifd_send_state->count; i++) {
> +        p = &multifd_send_state->params[i];
> +
> +        if (p->done) {
> +            p->done = false;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_send_state->mutex);
> +    qemu_mutex_lock(&p->mutex);
> +    p->pages.num = pages->num;
> +    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
> +             iov_size(pages->iov, pages->num));
> +    pages->num = 0;
> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +
> +    return 0;
> +}
> +
>  struct MultiFDRecvParams {
>      uint8_t id;
>      char *name;
> @@ -1086,6 +1176,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
>      return pages;
>  }
>  
> +static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
> +                            bool last_stage)
> +{
> +    int pages;
> +    uint8_t *p;
> +    RAMBlock *block = pss->block;
> +    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
> +
> +    p = block->host + offset;
> +
> +    pages = save_zero_page(rs, block, offset, p);
> +    if (pages == -1) {
> +        ram_counters.transferred +=
> +            save_page_header(rs, rs->f, block,
> +                             offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> +        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
> +        multifd_send_page(p, rs->migration_dirty_pages == 1);
> +        ram_counters.transferred += TARGET_PAGE_SIZE;
> +        pages = 1;
> +        ram_counters.normal++;
> +    }
> +
> +    return pages;
> +}
> +
>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
> @@ -1514,6 +1629,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
>          if (migrate_use_compression() &&
>              (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
>              res = ram_save_compressed_page(rs, pss, last_stage);
> +        } else if (migrate_use_multifd()) {
> +            res = ram_multifd_page(rs, pss, last_stage);
>          } else {
>              res = ram_save_page(rs, pss, last_stage);
>          }
> @@ -2810,6 +2927,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      if (!migrate_use_compression()) {
>          invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
>      }
> +
> +    if (!migrate_use_multifd()) {
> +        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
> +    }
>      /* This RCU critical section can be very long running.
>       * When RCU reclaims in the code start to become numerous,
>       * it will be necessary to reduce the granularity of this
> @@ -2834,13 +2955,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
>                  error_report("Received an unexpected compressed page");
>              }
> +            if (flags & invalid_flags  & RAM_SAVE_FLAG_MULTIFD_PAGE) {
> +                error_report("Received an unexpected multifd page");
> +            }
>  
>              ret = -EINVAL;
>              break;
>          }
>  
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
> -                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
> +                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
> +                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
>              RAMBlock *block = ram_block_from_stream(f, flags);
>  
>              host = host_from_ram_block_offset(block, addr);
> @@ -2928,6 +3053,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>                  break;
>              }
>              break;
> +
> +        case RAM_SAVE_FLAG_MULTIFD_PAGE:
> +            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> +            break;
> +
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
>              break;
> 


* Re: [Qemu-devel] [PATCH v9 01/12] qapi: Fix grammar in x-multifd-page-count descriptions
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 01/12] qapi: Fix grammar in x-multifd-page-count descriptions Juan Quintela
@ 2017-10-16 16:53   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-16 16:53 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Reported-by: Eric Blake <eblake@redhat.com>
> Signed-off-by: Juan Quintela <quintela@redhat.com>

This should just go separately to trivial.

> ---
>  qapi/migration.json | 6 +++---
>  1 file changed, 3 insertions(+), 3 deletions(-)
> 
> diff --git a/qapi/migration.json b/qapi/migration.json
> index f8b365e3f5..d94d6c0e46 100644
> --- a/qapi/migration.json
> +++ b/qapi/migration.json
> @@ -471,7 +471,7 @@
>  #                     number of sockets used for migration.  The
>  #                     default value is 2 (since 2.11)

I wonder if we need a '.' after the 2 and 16 in this set as well.
(Personally I'm not that fussy).

>  #
> -# @x-multifd-page-count: Number of pages sent together to a thread
> +# @x-multifd-page-count: Number of pages sent together to a thread.
>  #                        The default value is 16 (since 2.11)
>  #
>  # Since: 2.4
> @@ -542,7 +542,7 @@
>  #                     number of sockets used for migration.  The
>  #                     default value is 2 (since 2.11)
>  #
> -# @x-multifd-page-count: Number of pages sent together to a thread
> +# @x-multifd-page-count: Number of pages sent together to a thread.
>  #                        The default value is 16 (since 2.11)
>  #
>  # Since: 2.4
> @@ -638,7 +638,7 @@
>  #                     number of sockets used for migration.
>  #                     The default value is 2 (since 2.11)
>  #
> -# @x-multifd-page-count: Number of pages sent together to a thread
> +# @x-multifd-page-count: Number of pages sent together to a thread.
>  #                        The default value is 16 (since 2.11)
>  #

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

>  # Since: 2.4
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling Juan Quintela
  2017-10-09  9:28   ` Peter Xu
@ 2017-10-16 17:34   ` Dr. David Alan Gilbert
  2017-10-16 17:48     ` Dr. David Alan Gilbert
  1 sibling, 1 reply; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-16 17:34 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We now report errors also when we finish migration, not only on info
> migrate.  We plan to use this error from several places, and we want
> the first error to happen to win, so we add a mutex to order it.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> ---
>  migration/migration.c | 19 ++++++++++++++++---
>  migration/migration.h |  7 ++++++-
>  migration/tls.c       |  1 -
>  3 files changed, 22 insertions(+), 5 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 98429dc843..468f51cfa7 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1071,19 +1071,30 @@ static void migrate_fd_cleanup(void *opaque)
>                            MIGRATION_STATUS_CANCELLED);
>      }
>  
> +    if (s->error) {
> +        /* It is used on info migrate.  We can't free it */
> +        error_report_err(error_copy(s->error));
> +    }
>      notifier_list_notify(&migration_state_notifiers, s);
>      block_cleanup_parameters(s);
>  }
>  
> +void migrate_set_error(MigrationState *s, const Error *error)

If you find you need to resend this, please add a comment on this
function saying that it takes a copy and that it's up to the caller to
free the error they pass in.
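
A minimal sketch of the ownership contract such a comment would
describe. It assumes a plain C string in place of QEMU's Error and a
pthread mutex in place of QemuMutex; DemoState and demo_set_error are
hypothetical names, not QEMU API:

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for MigrationState; a C string replaces Error. */
typedef struct {
    char *error;                  /* first error recorded; owned by the state */
    pthread_mutex_t error_mutex;  /* serialises writers so the first one wins */
} DemoState;

/*
 * Records a private copy of msg if no error has been stored yet.
 * The caller keeps ownership of msg and must release it itself.
 */
static void demo_set_error(DemoState *s, const char *msg)
{
    assert(msg != NULL);
    pthread_mutex_lock(&s->error_mutex);
    if (!s->error) {
        size_t len = strlen(msg) + 1;

        s->error = malloc(len);
        if (s->error) {
            memcpy(s->error, msg, len);
        }
    }
    pthread_mutex_unlock(&s->error_mutex);
}
```

Because the setter copies, a caller can free or reuse its own buffer
right after the call, and a later error never overwrites the first one.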

Dave

> +{
> +    qemu_mutex_lock(&s->error_mutex);
> +    if (!s->error) {
> +        s->error = error_copy(error);
> +    }
> +    qemu_mutex_unlock(&s->error_mutex);
> +}
> +
>  void migrate_fd_error(MigrationState *s, const Error *error)
>  {
>      trace_migrate_fd_error(error_get_pretty(error));
>      assert(s->to_dst_file == NULL);
>      migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
>                        MIGRATION_STATUS_FAILED);
> -    if (!s->error) {
> -        s->error = error_copy(error);
> -    }
> +    migrate_set_error(s, error);
>      notifier_list_notify(&migration_state_notifiers, s);
>      block_cleanup_parameters(s);
>  }
> @@ -2355,6 +2366,7 @@ static void migration_instance_finalize(Object *obj)
>      MigrationState *ms = MIGRATION_OBJ(obj);
>      MigrationParameters *params = &ms->parameters;
>  
> +    qemu_mutex_destroy(&ms->error_mutex);
>      g_free(params->tls_hostname);
>      g_free(params->tls_creds);
>  }
> @@ -2367,6 +2379,7 @@ static void migration_instance_init(Object *obj)
>      ms->state = MIGRATION_STATUS_NONE;
>      ms->xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE;
>      ms->mbps = -1;
> +    qemu_mutex_init(&ms->error_mutex);
>  
>      params->tls_hostname = g_strdup("");
>      params->tls_creds = g_strdup("");
> diff --git a/migration/migration.h b/migration/migration.h
> index b83cceadc4..51c0ac2e71 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -129,8 +129,12 @@ struct MigrationState
>      int64_t colo_checkpoint_time;
>      QEMUTimer *colo_delay_timer;
>  
> -    /* The last error that occurred */
> +    /* The first error that has occurred.
> +       We used the mutex to be able to return the 1st error message */
>      Error *error;
> +    /* mutex to protect errp */
> +    QemuMutex error_mutex;
> +
>      /* Do we have to clean up -b/-i from old migrate parameters */
>      /* This feature is deprecated and will be removed */
>      bool must_remove_block_options;
> @@ -159,6 +163,7 @@ bool  migration_has_all_channels(void);
>  
>  uint64_t migrate_max_downtime(void);
>  
> +void migrate_set_error(MigrationState *s, const Error *error);
>  void migrate_fd_error(MigrationState *s, const Error *error);
>  
>  void migrate_fd_connect(MigrationState *s);
> diff --git a/migration/tls.c b/migration/tls.c
> index 596e8790bd..026a008667 100644
> --- a/migration/tls.c
> +++ b/migration/tls.c
> @@ -119,7 +119,6 @@ static void migration_tls_outgoing_handshake(QIOTask *task,
>      if (qio_task_propagate_error(task, &err)) {
>          trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
>          migrate_fd_error(s, err);
> -        error_free(err);
>      } else {
>          trace_migration_tls_outgoing_handshake_complete();
>          migration_channel_connect(s, ioc, NULL);
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling
  2017-10-16 17:34   ` Dr. David Alan Gilbert
@ 2017-10-16 17:48     ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-16 17:48 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Dr. David Alan Gilbert (dgilbert@redhat.com) wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
> > We now report errors also when we finish migration, not only on info
> > migrate.  We plan to use this error from several places, and we want
> > the first error to happen to win, so we add a mutex to order it.
> > 
> > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> > ---
> >  migration/migration.c | 19 ++++++++++++++++---
> >  migration/migration.h |  7 ++++++-
> >  migration/tls.c       |  1 -
> >  3 files changed, 22 insertions(+), 5 deletions(-)
> > 
> > diff --git a/migration/migration.c b/migration/migration.c
> > index 98429dc843..468f51cfa7 100644
> > --- a/migration/migration.c
> > +++ b/migration/migration.c
> > @@ -1071,19 +1071,30 @@ static void migrate_fd_cleanup(void *opaque)
> >                            MIGRATION_STATUS_CANCELLED);
> >      }
> >  
> > +    if (s->error) {
> > +        /* It is used on info migrate.  We can't free it */
> > +        error_report_err(error_copy(s->error));
> > +    }
> >      notifier_list_notify(&migration_state_notifiers, s);
> >      block_cleanup_parameters(s);
> >  }
> >  
> > +void migrate_set_error(MigrationState *s, const Error *error)
> 
> If you find you need to resend this, please add a comment on this
> function saying that it takes a copy and that it's up to the caller to
> free the error they pass in.

Oops, ignore that, I see you change it in the next one.

> Dave
> 
> > +{
> > +    qemu_mutex_lock(&s->error_mutex);
> > +    if (!s->error) {
> > +        s->error = error_copy(error);
> > +    }
> > +    qemu_mutex_unlock(&s->error_mutex);
> > +}
> > +
> >  void migrate_fd_error(MigrationState *s, const Error *error)
> >  {
> >      trace_migrate_fd_error(error_get_pretty(error));
> >      assert(s->to_dst_file == NULL);
> >      migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
> >                        MIGRATION_STATUS_FAILED);
> > -    if (!s->error) {
> > -        s->error = error_copy(error);
> > -    }
> > +    migrate_set_error(s, error);
> >      notifier_list_notify(&migration_state_notifiers, s);
> >      block_cleanup_parameters(s);
> >  }
> > @@ -2355,6 +2366,7 @@ static void migration_instance_finalize(Object *obj)
> >      MigrationState *ms = MIGRATION_OBJ(obj);
> >      MigrationParameters *params = &ms->parameters;
> >  
> > +    qemu_mutex_destroy(&ms->error_mutex);
> >      g_free(params->tls_hostname);
> >      g_free(params->tls_creds);
> >  }
> > @@ -2367,6 +2379,7 @@ static void migration_instance_init(Object *obj)
> >      ms->state = MIGRATION_STATUS_NONE;
> >      ms->xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE;
> >      ms->mbps = -1;
> > +    qemu_mutex_init(&ms->error_mutex);
> >  
> >      params->tls_hostname = g_strdup("");
> >      params->tls_creds = g_strdup("");
> > diff --git a/migration/migration.h b/migration/migration.h
> > index b83cceadc4..51c0ac2e71 100644
> > --- a/migration/migration.h
> > +++ b/migration/migration.h
> > @@ -129,8 +129,12 @@ struct MigrationState
> >      int64_t colo_checkpoint_time;
> >      QEMUTimer *colo_delay_timer;
> >  
> > -    /* The last error that occurred */
> > +    /* The first error that has occurred.
> > +       We used the mutex to be able to return the 1st error message */
> >      Error *error;
> > +    /* mutex to protect errp */
> > +    QemuMutex error_mutex;
> > +
> >      /* Do we have to clean up -b/-i from old migrate parameters */
> >      /* This feature is deprecated and will be removed */
> >      bool must_remove_block_options;
> > @@ -159,6 +163,7 @@ bool  migration_has_all_channels(void);
> >  
> >  uint64_t migrate_max_downtime(void);
> >  
> > +void migrate_set_error(MigrationState *s, const Error *error);
> >  void migrate_fd_error(MigrationState *s, const Error *error);
> >  
> >  void migrate_fd_connect(MigrationState *s);
> > diff --git a/migration/tls.c b/migration/tls.c
> > index 596e8790bd..026a008667 100644
> > --- a/migration/tls.c
> > +++ b/migration/tls.c
> > @@ -119,7 +119,6 @@ static void migration_tls_outgoing_handshake(QIOTask *task,
> >      if (qio_task_propagate_error(task, &err)) {
> >          trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
> >          migrate_fd_error(s, err);
> > -        error_free(err);
> >      } else {
> >          trace_migration_tls_outgoing_handshake_complete();
> >          migration_channel_connect(s, ioc, NULL);
> > -- 
> > 2.13.5
> > 
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work Juan Quintela
  2017-10-09 10:05   ` Peter Xu
  2017-10-09 10:15   ` Daniel P. Berrange
@ 2017-10-16 19:11   ` Dr. David Alan Gilbert
  2017-12-09 16:46     ` Juan Quintela
  2 siblings, 1 reply; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-16 19:11 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We create new channels for each new thread created. We send through
> them a string containing <uuid> multifd <channel number> so we are
> sure that we connect the right channels in both sides.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use a struct to send all fields
> Use default uuid
> ---
>  migration/migration.c |   5 ++
>  migration/ram.c       | 128 +++++++++++++++++++++++++++++++++++++++++++-------
>  migration/ram.h       |   3 ++
>  migration/socket.c    |  34 +++++++++++++-
>  migration/socket.h    |  10 ++++
>  5 files changed, 162 insertions(+), 18 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 61b7e7105a..ee98c50d8c 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -419,6 +419,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +    if (migrate_use_multifd()) {
> +        int thread_count = migrate_multifd_channels();
> +
> +        return thread_count == multifd_created_channels();
> +    }
>      return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index b83f8977c5..b57006594b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -47,6 +48,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -363,6 +366,7 @@ struct MultiFDSendParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -379,6 +383,12 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +    }
>      for (i = 0; i < multifd_send_state->count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -404,6 +414,7 @@ int multifd_save_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_send_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -414,9 +425,27 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +typedef struct {
> +    uint32_t version;
> +    uint8_t id;
> +    char uuid[UUID_FMT_LEN];
> +} MigrateMultiFDInit_t;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    MigrateMultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    size_t ret;
> +
> +    msg.version = 1;
> +    msg.id = p->id;
> +    qemu_uuid_unparse(&qemu_uuid, (char *)&msg.uuid);
> +    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_send_threads(local_err);
> +        return NULL;
> +    }
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -431,6 +460,27 @@ static void *multifd_send_thread(void *opaque)
>      return NULL;
>  }
>  
> +static void multifd_new_channel_async(QIOTask *task, gpointer opaque)
> +{
> +    MultiFDSendParams *p = opaque;
> +    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
> +    Error *local_err;

Does that need an = NULL ?
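
To spell out the worry with a sketch: if the propagate helper only
stores into an empty slot (which is my understanding of the usual Error
convention, so treat it as an assumption), an uninitialised local_err
may look "already set" and the real error is dropped. DemoError and
demo_propagate_error below are hypothetical stand-ins, not QEMU API:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for QEMU's Error object. */
typedef struct {
    const char *msg;
} DemoError;

/*
 * Modelled on the "only store into an empty slot" convention:
 * hands src to *dst when *dst is NULL, otherwise leaves *dst untouched.
 * Returns 1 if there was an error to report, 0 otherwise.
 */
static int demo_propagate_error(DemoError **dst, DemoError *src)
{
    if (!src) {
        return 0;
    }
    if (dst && !*dst) {
        *dst = src;
    }
    return 1;
}
```

With `DemoError *local_err = NULL;` the first call stores the error; a
slot holding leftover stack garbage would behave like the "occupied"
case and keep the garbage.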

> +    if (qio_task_propagate_error(task, &local_err)) {
> +        if (multifd_save_cleanup(&local_err) != 0) {
> +            migrate_set_error(migrate_get_current(), local_err);
> +        }
> +    } else {
> +        p->c = QIO_CHANNEL(sioc);
> +        qio_channel_set_delay(p->c, false);
> +
> +        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +
> +        multifd_send_state->count++;
> +    }
> +}
> +
>  int multifd_save_setup(void)
>  {
>      int thread_count;
> @@ -451,10 +501,7 @@ int multifd_save_setup(void)
>          p->quit = false;
>          p->id = i;
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -
> -        multifd_send_state->count++;
> +        socket_send_channel_create(multifd_new_channel_async, p);
>      }
>      return 0;
>  }
> @@ -463,6 +510,7 @@ struct MultiFDRecvParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -473,12 +521,22 @@ struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
>      int count;
> +    /* Should we finish */
> +    bool quit;
>  } *multifd_recv_state;
>  
>  static void terminate_multifd_recv_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);

Are we necessarily in ACTIVE at this point?  I suspect some callers
are still in SETUP, and I wonder if there are other states as well.
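
A sketch of why this matters, assuming migrate_set_state() only moves
the state when the current value matches the old state passed in (that
is my reading, worth double-checking). The enum values and the
demo_* helpers are hypothetical, written with C11 atomics:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical subset of migration states, for illustration only. */
enum { DEMO_SETUP, DEMO_ACTIVE, DEMO_FAILED };

/*
 * Moves *state from old_state to new_state only if it currently equals
 * old_state.  Returns true when the transition happened.
 */
static bool demo_set_state(atomic_int *state, int old_state, int new_state)
{
    return atomic_compare_exchange_strong(state, &old_state, new_state);
}

/* Tries every state a failure might start from (an assumed list). */
static bool demo_fail_from_any(atomic_int *state)
{
    return demo_set_state(state, DEMO_SETUP, DEMO_FAILED) ||
           demo_set_state(state, DEMO_ACTIVE, DEMO_FAILED);
}
```

Under that assumption, a failure raised while still in SETUP would be
recorded as an error but leave the state untouched unless SETUP is also
tried.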

Dave

> +    }
> +    multifd_recv_state->quit = true;
> +
>      for (i = 0; i < multifd_recv_state->count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> @@ -504,6 +562,7 @@ int multifd_load_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_recv_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -532,10 +591,51 @@ static void *multifd_recv_thread(void *opaque)
>      return NULL;
>  }
>  
> +void multifd_new_channel(QIOChannel *ioc)
> +{
> +    MultiFDRecvParams *p;
> +    MigrateMultiFDInit_t msg;
> +    Error *local_err = NULL;
> +    char *uuid;
> +    size_t ret;
> +
> +    ret = qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +
> +    uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +
> +    if (strcmp(msg.uuid, uuid)) {
> +        g_free(uuid);
> +        error_setg(&local_err, "multifd: received uuid '%s' and expected "
> +                   "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +    g_free(uuid);
> +
> +    p = &multifd_recv_state->params[msg.id];
> +    if (p->id != 0) {
> +        error_setg(&local_err, "multifd: received id '%d' already setup'", msg.id);
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +    qemu_mutex_init(&p->mutex);
> +    qemu_sem_init(&p->sem, 0);
> +    p->quit = false;
> +    p->id = msg.id;
> +    p->c = ioc;
> +    multifd_recv_state->count++;
> +    p->name = g_strdup_printf("multifdrecv_%d", msg.id);
> +    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> +                       QEMU_THREAD_JOINABLE);
> +}
> +
>  int multifd_load_setup(void)
>  {
>      int thread_count;
> -    uint8_t i;
>  
>      if (!migrate_use_multifd()) {
>          return 0;
> @@ -544,21 +644,15 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      multifd_recv_state->count = 0;
> -    for (i = 0; i < thread_count; i++) {
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> -        qemu_mutex_init(&p->mutex);
> -        qemu_sem_init(&p->sem, 0);
> -        p->quit = false;
> -        p->id = i;
> -        p->name = g_strdup_printf("multifdrecv_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -        multifd_recv_state->count++;
> -    }
> +    multifd_recv_state->quit = false;
>      return 0;
>  }
>  
> +int multifd_created_channels(void)
> +{
> +    return multifd_recv_state->count;
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> diff --git a/migration/ram.h b/migration/ram.h
> index 4a72d66503..5221bc9beb 100644
> --- a/migration/ram.h
> +++ b/migration/ram.h
> @@ -31,6 +31,7 @@
>  
>  #include "qemu-common.h"
>  #include "exec/cpu-common.h"
> +#include "io/channel.h"
>  
>  extern MigrationStats ram_counters;
>  extern XBZRLECacheStats xbzrle_counters;
> @@ -43,6 +44,8 @@ int multifd_save_setup(void);
>  int multifd_save_cleanup(Error **errp);
>  int multifd_load_setup(void);
>  int multifd_load_cleanup(Error **errp);
> +void multifd_new_channel(QIOChannel *ioc);
> +int multifd_created_channels(void);
>  
>  uint64_t ram_pagesize_summary(void);
>  int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
> diff --git a/migration/socket.c b/migration/socket.c
> index 2d70747a1a..22fb05edc8 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -26,6 +26,34 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
>  
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(recv));
> +    return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> +    SocketAddress *saddr;
> +} outgoing_args;
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> +                                     f, data, NULL);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    if (outgoing_args.saddr) {
> +        qapi_free_SocketAddress(outgoing_args.saddr);
> +        outgoing_args.saddr = NULL;
> +    }
> +    return 0;
> +}
>  
>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>  {
> @@ -95,6 +123,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>  
>      data->s = s;
> +
> +    /* in case previous migration leaked it */
> +    qapi_free_SocketAddress(outgoing_args.saddr);
> +    outgoing_args.saddr = saddr;
> +
>      if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
>          data->hostname = g_strdup(saddr->u.inet.host);
>      }
> @@ -105,7 +138,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free);
> -    qapi_free_SocketAddress(saddr);
>  }
>  
>  void tcp_start_outgoing_migration(MigrationState *s,
> diff --git a/migration/socket.h b/migration/socket.h
> index 6b91e9db38..afb0ff0f51 100644
> --- a/migration/socket.h
> +++ b/migration/socket.h
> @@ -16,6 +16,16 @@
>  
>  #ifndef QEMU_MIGRATION_SOCKET_H
>  #define QEMU_MIGRATION_SOCKET_H
> +
> +#include "io/channel.h"
> +#include "io/task.h"
> +
> +QIOChannel *socket_recv_channel_create(void);
> +int socket_recv_channel_destroy(QIOChannel *recv);
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data);
> +int socket_send_channel_destroy(QIOChannel *send);
> +
>  void tcp_start_incoming_migration(const char *host_port, Error **errp);
>  
>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v9 05/12] migration: Create ram_multifd_page
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 05/12] migration: Create ram_multifd_page Juan Quintela
  2017-10-09 13:08   ` Paolo Bonzini
@ 2017-10-16 19:43   ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-16 19:43 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> The function still doesn't use multifd, but we have simplified
> ram_save_page, xbzrle and RDMA stuff is gone.  We have added a new
> counter and a new flag for this type of pages.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Add last_page parameter
> Add comments for done and address
> Remove multifd field, it is the same as normal pages
> Merge next patch, now we send multiple pages at a time
> Remove counter for multifd pages, it is identical to normal pages
> Use iovec's instead of creating the equivalent.
> Clear memory used by pages (dave)
> Use g_new0(danp)
> define MULTIFD_CONTINUE
> ---
>  migration/ram.c | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 131 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index b57006594b..c0af538f5f 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -50,6 +50,7 @@
>  #include "migration/block.h"
>  #include "sysemu/sysemu.h"
>  #include "qemu/uuid.h"
> +#include "qemu/iov.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -69,6 +70,7 @@
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
> +#define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>  
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>  {
> @@ -362,14 +364,29 @@ static void compress_threads_save_setup(void)
>  
>  /* Multiple fd's */
>  
> +/* used to continue on the same multifd group */
> +#define MULTIFD_CONTINUE UINT16_MAX
> +
> +typedef struct {
> +    int num;
> +    size_t size;
> +    struct iovec *iov;
> +} multifd_pages_t;
> +
>  struct MultiFDSendParams {
> +    /* not changed */
>      uint8_t id;
>      char *name;
>      QemuThread thread;
>      QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
> +    multifd_pages_t pages;
> +    /* protected by multifd mutex */
> +    /* has the thread finish the last submitted job */
> +    bool done;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
>  
> @@ -377,8 +394,26 @@ struct {
>      MultiFDSendParams *params;
>      /* number of created threads */
>      int count;
> +    QemuMutex mutex;
> +    QemuSemaphore sem;
> +    multifd_pages_t pages;
>  } *multifd_send_state;
>  
> +static void multifd_init_pages(multifd_pages_t *pages)
> +{
> +    pages->num = 0;
> +    pages->size = migrate_multifd_page_count();
> +    pages->iov = g_new0(struct iovec, pages->size);
> +}
> +
> +static void multifd_clear_pages(multifd_pages_t *pages)
> +{
> +    pages->num = 0;
> +    pages->size = 0;
> +    g_free(pages->iov);
> +    pages->iov = NULL;
> +}
> +
>  static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
> @@ -417,9 +452,11 @@ int multifd_save_cleanup(Error **errp)
>          socket_send_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
> +        multifd_clear_pages(&p->pages);
>      }
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> +    multifd_clear_pages(&multifd_send_state->pages);
>      g_free(multifd_send_state);
>      multifd_send_state = NULL;
>      return ret;
> @@ -446,6 +483,7 @@ static void *multifd_send_thread(void *opaque)
>          terminate_multifd_send_threads(local_err);
>          return NULL;
>      }
> +    qemu_sem_post(&multifd_send_state->sem);
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -453,6 +491,15 @@ static void *multifd_send_thread(void *opaque)
>              qemu_mutex_unlock(&p->mutex);
>              break;
>          }
> +        if (p->pages.num) {
> +            p->pages.num = 0;
> +            qemu_mutex_unlock(&p->mutex);
> +            qemu_mutex_lock(&multifd_send_state->mutex);
> +            p->done = true;
> +            qemu_mutex_unlock(&multifd_send_state->mutex);

That needs a comment explaining why the lock is needed around that
trivial done = true.
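
My guess at the reason, as a sketch: the sender scans every thread's
done flag under the global mutex, so a worker has to publish its flag
under the same mutex for the scan to see a consistent picture. The
demo_pool structure and helpers are hypothetical, with a pthread mutex
standing in for QemuMutex:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define DEMO_WORKERS 2

/* Hypothetical stand-in for multifd_send_state: one mutex guards every flag. */
static struct {
    pthread_mutex_t mutex;
    bool done[DEMO_WORKERS];
} demo_pool = { PTHREAD_MUTEX_INITIALIZER, { true, true } };

/* Worker side: publish "idle" under the same mutex the scanner holds. */
static void demo_mark_done(int id)
{
    pthread_mutex_lock(&demo_pool.mutex);
    demo_pool.done[id] = true;
    pthread_mutex_unlock(&demo_pool.mutex);
}

/* Producer side: find an idle worker and claim it; -1 if none is idle. */
static int demo_claim_idle(void)
{
    int claimed = -1;

    pthread_mutex_lock(&demo_pool.mutex);
    for (int i = 0; i < DEMO_WORKERS; i++) {
        if (demo_pool.done[i]) {
            demo_pool.done[i] = false;
            claimed = i;
            break;
        }
    }
    pthread_mutex_unlock(&demo_pool.mutex);
    return claimed;
}
```

If that is the intent, a one-line comment next to the assignment saying
which reader the lock pairs with would be enough.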

> +            qemu_sem_post(&multifd_send_state->sem);
> +            continue;
> +        }
>          qemu_mutex_unlock(&p->mutex);
>          qemu_sem_wait(&p->sem);
>      }
> @@ -493,6 +540,9 @@ int multifd_save_setup(void)
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      multifd_send_state->count = 0;
> +    qemu_mutex_init(&multifd_send_state->mutex);
> +    qemu_sem_init(&multifd_send_state->sem, 0);
> +    multifd_init_pages(&multifd_send_state->pages);
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -500,12 +550,52 @@ int multifd_save_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
>          p->id = i;
> +        p->done = true;
> +        multifd_init_pages(&p->pages);
>          p->name = g_strdup_printf("multifdsend_%d", i);
>          socket_send_channel_create(multifd_new_channel_async, p);
>      }
>      return 0;
>  }
>  
> +static uint16_t multifd_send_page(uint8_t *address, bool last_page)
> +{
> +    int i;
> +    MultiFDSendParams *p = NULL; /* make happy gcc */
> +    multifd_pages_t *pages = &multifd_send_state->pages;
> +
> +    pages->iov[pages->num].iov_base = address;
> +    pages->iov[pages->num].iov_len = TARGET_PAGE_SIZE;
> +    pages->num++;
> +
> +    if (!last_page) {
> +        if (pages->num < (pages->size - 1)) {
> +            return MULTIFD_CONTINUE;
> +        }
> +    }
> +
> +    qemu_sem_wait(&multifd_send_state->sem);
> +    qemu_mutex_lock(&multifd_send_state->mutex);
> +    for (i = 0; i < multifd_send_state->count; i++) {
> +        p = &multifd_send_state->params[i];
> +
> +        if (p->done) {
> +            p->done = false;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_send_state->mutex);
> +    qemu_mutex_lock(&p->mutex);
> +    p->pages.num = pages->num;
> +    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
> +             iov_size(pages->iov, pages->num));
> +    pages->num = 0;
> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +
> +    return 0;
> +}
> +
>  struct MultiFDRecvParams {
>      uint8_t id;
>      char *name;
> @@ -1086,6 +1176,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
>      return pages;
>  }
>  
> +static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
> +                            bool last_stage)
> +{
> +    int pages;
> +    uint8_t *p;
> +    RAMBlock *block = pss->block;
> +    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
> +
> +    p = block->host + offset;
> +
> +    pages = save_zero_page(rs, block, offset, p);
> +    if (pages == -1) {
> +        ram_counters.transferred +=
> +            save_page_header(rs, rs->f, block,
> +                             offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> +        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);

That needs commenting - if I read this right this is a temporary
step until multifd_send_page is complete.

> +        multifd_send_page(p, rs->migration_dirty_pages == 1);

Watch out;  I've got at least one bug where that condition never
becomes true (I have some debug code to detect the count being wrong).

> +        ram_counters.transferred += TARGET_PAGE_SIZE;
> +        pages = 1;
> +        ram_counters.normal++;
> +    }
> +
> +    return pages;
> +}
> +
>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
> @@ -1514,6 +1629,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
>          if (migrate_use_compression() &&
>              (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
>              res = ram_save_compressed_page(rs, pss, last_stage);
> +        } else if (migrate_use_multifd()) {
> +            res = ram_multifd_page(rs, pss, last_stage);
>          } else {
>              res = ram_save_page(rs, pss, last_stage);
>          }
> @@ -2810,6 +2927,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      if (!migrate_use_compression()) {
>          invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
>      }
> +
> +    if (!migrate_use_multifd()) {
> +        invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE;
> +    }
>      /* This RCU critical section can be very long running.
>       * When RCU reclaims in the code start to become numerous,
>       * it will be necessary to reduce the granularity of this
> @@ -2834,13 +2955,17 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
>                  error_report("Received an unexpected compressed page");
>              }
> +            if (flags & invalid_flags  & RAM_SAVE_FLAG_MULTIFD_PAGE) {
> +                error_report("Received an unexpected multifd page");
> +            }
>  
>              ret = -EINVAL;
>              break;
>          }
>  
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
> -                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
> +                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
> +                     RAM_SAVE_FLAG_MULTIFD_PAGE)) {
>              RAMBlock *block = ram_block_from_stream(f, flags);
>  
>              host = host_from_ram_block_offset(block, addr);
> @@ -2928,6 +3053,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>                  break;
>              }
>              break;
> +
> +        case RAM_SAVE_FLAG_MULTIFD_PAGE:
> +            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> +            break;
> +
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
>              break;

Dave

> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v9 07/12] migration: Create thread infrastructure for multifd recv side
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 07/12] migration: Create thread infrastructure for multifd recv side Juan Quintela
@ 2017-10-17 11:07   ` Dr. David Alan Gilbert
  2018-01-08  9:24     ` Juan Quintela
  0 siblings, 1 reply; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-17 11:07 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We make the locking and the transfer of information specific, even if we
> are still receiving things through the main thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> 
> We split when we create the main channel and where we start the main
> migration thread, so we wait for the creation of the other threads.
> 
> Use multifd_clear_pages().
> Don't remove object_unref()
> We use the channel numbers correctly
> ---
>  migration/migration.c |  7 +++---
>  migration/migration.h |  1 +
>  migration/ram.c       | 60 +++++++++++++++++++++++++++++++++++++++++++++++----
>  migration/socket.c    |  3 +++
>  4 files changed, 64 insertions(+), 7 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index ee98c50d8c..1e7c537954 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -388,7 +388,7 @@ static void migration_incoming_setup(QEMUFile *f)
>      qemu_file_set_blocking(f, false);
>  }
>  
> -static void migration_incoming_process(void)
> +void migration_incoming_process(void)
>  {
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
>      qemu_coroutine_enter(co);
> @@ -406,9 +406,10 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>  
>      if (!mis->from_src_file) {
>          QEMUFile *f = qemu_fopen_channel_input(ioc);
> -        migration_fd_process_incoming(f);
> +        migration_incoming_setup(f);
> +        return;
>      }
> -    /* We still only have a single channel.  Nothing to do here yet */
> +    multifd_new_channel(ioc);
>  }
>  
>  /**
> diff --git a/migration/migration.h b/migration/migration.h
> index cc196cc87f..a3db60a2a1 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -158,6 +158,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
>  
>  void migration_fd_process_incoming(QEMUFile *f);
>  void migration_ioc_process_incoming(QIOChannel *ioc);
> +void migration_incoming_process(void);
>  
>  bool  migration_has_all_channels(void);
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index 288201e360..745da2971d 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -597,13 +597,18 @@ static uint16_t multifd_send_page(uint8_t *address, bool last_page)
>  }
>  
>  struct MultiFDRecvParams {
> +    /* not changed */
>      uint8_t id;
>      char *name;
>      QemuThread thread;
>      QIOChannel *c;
> +    QemuSemaphore ready;
>      QemuSemaphore sem;
>      QemuMutex mutex;
> +    /* proteced by param mutex */
>      bool quit;
> +    multifd_pages_t pages;
> +    bool done;
>  };
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
>  
> @@ -613,6 +618,7 @@ struct {
>      int count;
>      /* Should we finish */
>      bool quit;
> +    multifd_pages_t pages;
>  } *multifd_recv_state;
>  
>  static void terminate_multifd_recv_threads(Error *errp)
> @@ -634,6 +640,7 @@ static void terminate_multifd_recv_threads(Error *errp)
>          p->quit = true;
>          qemu_sem_post(&p->sem);
>          qemu_mutex_unlock(&p->mutex);
> +        multifd_clear_pages(&p->pages);
>      }
>  }
>  
> @@ -658,6 +665,7 @@ int multifd_load_cleanup(Error **errp)
>      }
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> +    multifd_clear_pages(&multifd_recv_state->pages);
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  
> @@ -668,12 +676,20 @@ static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
>  
> +    qemu_sem_post(&p->ready);
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
>          if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
>          }
> +        if (p->pages.num) {
> +            p->pages.num = 0;

This could do with some TODO comments in - since this code
doesn't do anything useful yet and is confusing, but gets clearer
when you add the filling in the later patches.

Dave

> +            p->done = true;
> +            qemu_mutex_unlock(&p->mutex);
> +            qemu_sem_post(&p->ready);
> +            continue;
> +        }
>          qemu_mutex_unlock(&p->mutex);
>          qemu_sem_wait(&p->sem);
>      }
> @@ -714,13 +730,21 @@ void multifd_new_channel(QIOChannel *ioc)
>      }
>      qemu_mutex_init(&p->mutex);
>      qemu_sem_init(&p->sem, 0);
> +    qemu_sem_init(&p->ready, 0);
>      p->quit = false;
>      p->id = msg.id;
> +    p->done = false;
> +    multifd_init_pages(&p->pages);
>      p->c = ioc;
>      multifd_recv_state->count++;
>      p->name = g_strdup_printf("multifdrecv_%d", msg.id);
> +    object_ref(OBJECT(ioc));

It would be good to comment to say where that gets unref'd.

Dave

> +
>      qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
>                         QEMU_THREAD_JOINABLE);
> +    if (multifd_recv_state->count == migrate_multifd_channels()) {
> +        migration_incoming_process();
> +    }
>  }
>  
>  int multifd_load_setup(void)
> @@ -735,6 +759,7 @@ int multifd_load_setup(void)
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      multifd_recv_state->count = 0;
>      multifd_recv_state->quit = false;
> +    multifd_init_pages(&multifd_recv_state->pages);
>      return 0;
>  }
>  
> @@ -743,6 +768,36 @@ int multifd_created_channels(void)
>      return multifd_recv_state->count;
>  }
>  
> +static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
> +{
> +    int thread_count;
> +    MultiFDRecvParams *p;
> +    multifd_pages_t *pages = &multifd_recv_state->pages;
> +
> +    pages->iov[pages->num].iov_base = address;
> +    pages->iov[pages->num].iov_len = TARGET_PAGE_SIZE;
> +    pages->num++;
> +
> +    if (fd_num == MULTIFD_CONTINUE) {
> +        return;
> +    }
> +
> +    thread_count = migrate_multifd_channels();
> +    assert(fd_num < thread_count);
> +    p = &multifd_recv_state->params[fd_num];
> +
> +    qemu_sem_wait(&p->ready);
> +
> +    qemu_mutex_lock(&p->mutex);
> +    p->done = false;
> +    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
> +             iov_size(pages->iov, pages->num));
> +    p->pages.num = pages->num;
> +    pages->num = 0;
> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> @@ -3060,10 +3115,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
> -            if (fd_num != 0) {
> -                /* this is yet an unused variable, changed later */
> -                fd_num = fd_num;
> -            }
> +            multifd_recv_page(host, fd_num);
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
>  
> diff --git a/migration/socket.c b/migration/socket.c
> index 22fb05edc8..debe972ee8 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -186,6 +186,9 @@ out:
>      if (migration_has_all_channels()) {
>          /* Close listening socket as its no longer needed */
>          qio_channel_close(ioc, NULL);
> +        if (!migrate_use_multifd()) {
> +            migration_incoming_process();
> +        }
>          return G_SOURCE_REMOVE;
>      } else {
>          return G_SOURCE_CONTINUE;
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v9 08/12] migration: Test new fd infrastructure
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 08/12] migration: Test new fd infrastructure Juan Quintela
@ 2017-10-17 11:11   ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-17 11:11 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We just send the address through the alternate channels and test that it
> is ok.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

I remember questions on this patch from last time as well; this is just
a test, isn't it, and all of it gets changed in later patches?  So I'm
not too sure of the point, especially since you could use
qio_channel_writev_all here and make the changes smaller.
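
For readers unfamiliar with the "write all" idea, here is a minimal
sketch of it over plain POSIX writev(). It is not QEMU's
qio_channel_writev_all implementation; writev_all is a hypothetical
helper name, and it assumes a blocking file descriptor.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/*
 * Hypothetical helper: write every byte described by iov[0..niov) to fd.
 * The iov array is modified in place while advancing past partial writes.
 * Returns 0 on success, -1 on error.
 */
static int writev_all(int fd, struct iovec *iov, int niov)
{
    while (niov > 0) {
        ssize_t n = writev(fd, iov, niov);

        if (n < 0) {
            if (errno == EINTR) {
                continue;       /* interrupted, just retry */
            }
            return -1;
        }
        /* Skip the elements that were written completely. */
        while (niov > 0 && (size_t)n >= iov->iov_len) {
            n -= iov->iov_len;
            iov++;
            niov--;
        }
        /* Adjust the first element that was only partly written. */
        if (niov > 0) {
            iov->iov_base = (char *)iov->iov_base + n;
            iov->iov_len -= n;
        }
    }
    return 0;
}
```

A single vectored call replaces the per-element loop in the quoted hunk,
which is why it would make the later change smaller.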

Dave

> --
> 
> Use qio_channel_*all functions
> ---
>  migration/ram.c | 39 +++++++++++++++++++++++++++++++++++++++
>  1 file changed, 39 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 745da2971d..4c16d0775b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -492,8 +492,24 @@ static void *multifd_send_thread(void *opaque)
>              break;
>          }
>          if (p->pages.num) {
> +            Error *local_err = NULL;
> +            size_t ret;
> +            int i;
> +            int num;
> +
> +            num = p->pages.num;
>              p->pages.num = 0;
>              qemu_mutex_unlock(&p->mutex);
> +
> +            for (i = 0; i < num; i++) {
> +                ret = qio_channel_write_all(p->c,
> +                         (const char *)&p->pages.iov[i].iov_base,
> +                         sizeof(uint8_t *), &local_err);
> +                if (ret != 0) {
> +                    terminate_multifd_send_threads(local_err);
> +                    return NULL;
> +                }
> +            }
>              qemu_mutex_lock(&multifd_send_state->mutex);
>              p->done = true;
>              qemu_mutex_unlock(&multifd_send_state->mutex);
> @@ -675,6 +691,7 @@ int multifd_load_cleanup(Error **errp)
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
> +    uint8_t *recv_address;
>  
>      qemu_sem_post(&p->ready);
>      while (true) {
> @@ -684,7 +701,29 @@ static void *multifd_recv_thread(void *opaque)
>              break;
>          }
>          if (p->pages.num) {
> +            Error *local_err = NULL;
> +            size_t ret;
> +            int i;
> +            int num;
> +
> +            num = p->pages.num;
>              p->pages.num = 0;
> +
> +            for (i = 0; i < num; i++) {
> +                ret = qio_channel_read_all(p->c, (char *)&recv_address,
> +                                           sizeof(uint8_t *), &local_err);
> +                if (ret != 0) {
> +                    terminate_multifd_recv_threads(local_err);
> +                    return NULL;
> +                }
> +                if (recv_address != p->pages.iov[i].iov_base) {
> +                    error_setg(&local_err, "received %p and expecting %p (%d)",
> +                               recv_address, p->pages.iov[i].iov_base, i);
> +                    terminate_multifd_recv_threads(local_err);
> +                    return NULL;
> +                }
> +            }
> +
>              p->done = true;
>              qemu_mutex_unlock(&p->mutex);
>              qemu_sem_post(&p->ready);
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v9 10/12] migration: Transfer pages over new channels
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 10/12] migration: Transfer pages over new channels Juan Quintela
@ 2017-10-17 14:18   ` Dr. David Alan Gilbert
  2018-01-08  9:40     ` Juan Quintela
  0 siblings, 1 reply; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-17 14:18 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We switch from sending the page number to sending real pages.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

I think this is OK if squashed with the 'test' patch to remove
the test stuff.

Some minor comments below.

> --
> 
> Remove the HACK bit, now we have the function that calculates the size
> of a page exported.
> Rename multifd_pages{_now} to sent_pages{_now}
> Remove multifd pages field, it is the same as normal pages
> ---
>  migration/migration.c |  7 ++++++-
>  migration/ram.c       | 39 +++++++++++----------------------------
>  2 files changed, 17 insertions(+), 29 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 54ef095d82..1bd87a4e44 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2085,6 +2085,7 @@ static void *migration_thread(void *opaque)
>       */
>      int64_t threshold_size = 0;
>      int64_t qemu_file_bytes = 0;
> +    int64_t sent_pages = 0;
>      int64_t start_time = initial_time;
>      int64_t end_time;
>      bool old_vm_running = false;
> @@ -2173,8 +2174,11 @@ static void *migration_thread(void *opaque)
>          current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>          if (current_time >= initial_time + BUFFER_DELAY) {
>              uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
> +            uint64_t sent_pages_now = ram_counters.normal;
>              uint64_t transferred_bytes =
> -                qemu_file_bytes_now - qemu_file_bytes;
> +                (qemu_file_bytes_now - qemu_file_bytes) +
> +                (sent_pages_now - sent_pages) *
> +                qemu_target_page_size();

This could do with commenting to explain the difference between the
two sets of counts.
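
As the hunk reads, the two counts seem to cover different paths:
qemu_ftell() only sees bytes that went through the main QEMUFile, while
page payloads written by the multifd threads go straight to their own
channels and are accounted for through the page counter. A minimal
sketch of that arithmetic follows; transferred_bytes is a hypothetical
helper, and it assumes every page counted in ram_counters.normal was
sent over a multifd channel.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Bytes moved during one measurement window:
 *   - main-channel bytes, from the QEMUFile position delta
 *   - multifd payload bytes, from the page-count delta times page size
 */
static uint64_t transferred_bytes(uint64_t file_bytes_now,
                                  uint64_t file_bytes_prev,
                                  uint64_t pages_now,
                                  uint64_t pages_prev,
                                  uint64_t page_size)
{
    return (file_bytes_now - file_bytes_prev) +
           (pages_now - pages_prev) * page_size;
}
```

For example, 4000 bytes of headers on the main channel plus 20 pages of
4096 bytes on the multifd channels gives 85920 bytes for the window.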

>              uint64_t time_spent = current_time - initial_time;
>              double bandwidth = (double)transferred_bytes / time_spent;
>              threshold_size = bandwidth * s->parameters.downtime_limit;
> @@ -2194,6 +2198,7 @@ static void *migration_thread(void *opaque)
>              qemu_file_reset_rate_limit(s->to_dst_file);
>              initial_time = current_time;
>              qemu_file_bytes = qemu_file_bytes_now;
> +            sent_pages = sent_pages_now;
>          }
>          if (qemu_file_rate_limit(s->to_dst_file)) {
>              /* usleep expects microseconds */
> diff --git a/migration/ram.c b/migration/ram.c
> index 4c16d0775b..981f345294 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -494,21 +494,15 @@ static void *multifd_send_thread(void *opaque)
>          if (p->pages.num) {
>              Error *local_err = NULL;
>              size_t ret;
> -            int i;
>              int num;
>  
>              num = p->pages.num;
>              p->pages.num = 0;
>              qemu_mutex_unlock(&p->mutex);
> -
> -            for (i = 0; i < num; i++) {
> -                ret = qio_channel_write_all(p->c,
> -                         (const char *)&p->pages.iov[i].iov_base,
> -                         sizeof(uint8_t *), &local_err);
> -                if (ret != 0) {
> -                    terminate_multifd_send_threads(local_err);
> -                    return NULL;
> -                }
> +            ret = qio_channel_writev_all(p->c, p->pages.iov, num, &local_err);
> +            if (ret != 0) {
> +                terminate_multifd_send_threads(local_err);
> +                return NULL;
>              }
>              qemu_mutex_lock(&multifd_send_state->mutex);
>              p->done = true;
> @@ -691,7 +685,6 @@ int multifd_load_cleanup(Error **errp)
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
> -    uint8_t *recv_address;
>  
>      qemu_sem_post(&p->ready);
>      while (true) {
> @@ -703,27 +696,16 @@ static void *multifd_recv_thread(void *opaque)
>          if (p->pages.num) {
>              Error *local_err = NULL;
>              size_t ret;
> -            int i;
>              int num;
>  
>              num = p->pages.num;
>              p->pages.num = 0;
>  
> -            for (i = 0; i < num; i++) {
> -                ret = qio_channel_read_all(p->c, (char *)&recv_address,
> -                                           sizeof(uint8_t *), &local_err);
> -                if (ret != 0) {
> -                    terminate_multifd_recv_threads(local_err);
> -                    return NULL;
> -                }
> -                if (recv_address != p->pages.iov[i].iov_base) {
> -                    error_setg(&local_err, "received %p and expecting %p (%d)",
> -                               recv_address, p->pages.iov[i].iov_base, i);
> -                    terminate_multifd_recv_threads(local_err);
> -                    return NULL;
> -                }
> +            ret = qio_channel_readv_all(p->c, p->pages.iov, num, &local_err);
> +            if (ret != 0) {
> +                terminate_multifd_recv_threads(local_err);
> +                return NULL;
>              }

A trace or two in each of these threads would probably help understand
what's going on.

> -
>              p->done = true;
>              qemu_mutex_unlock(&p->mutex);
>              qemu_sem_post(&p->ready);
> @@ -1288,8 +1270,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
>                               offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>          fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
>          qemu_put_be16(rs->f, fd_num);
> +        if (fd_num != MULTIFD_CONTINUE) {
> +            qemu_fflush(rs->f);
> +        }

Could do with a comment.

Dave

>          ram_counters.transferred += 2; /* size of fd_num */
> -        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
>          ram_counters.transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          ram_counters.normal++;
> @@ -3155,7 +3139,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
>              multifd_recv_page(host, fd_num);
> -            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
>  
>          case RAM_SAVE_FLAG_EOS:
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v9 11/12] migration: Flush receive queue
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 11/12] migration: Flush receive queue Juan Quintela
@ 2017-10-17 14:51   ` Dr. David Alan Gilbert
  2017-12-11  9:40     ` Juan Quintela
  0 siblings, 1 reply; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-17 14:51 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> Each time that we sync the bitmap, there is a possibility that we receive
> a page that is being processed by a different thread.  We fix this
> problem by making sure that we wait for all receiving threads to
> finish their work before we proceed with the next stage.
> 
> We are low on page flags, so we use a combination that is not valid to
> emit that message:  MULTIFD_PAGE and COMPRESSED.
> 
> I tried to make a migration command for it, but it doesn't work because
> we sync the bitmap sometimes when we have already sent the beginning
> of the section, so I just added a new page flag.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Create RAM_SAVE_FLAG_MULTIFD_SYNC (dave suggestion)
> Move the set of need_flush to inside the bitmap_sync code (peter suggestion)
> ---
>  migration/ram.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 55 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 981f345294..d717776f32 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -72,6 +72,14 @@
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>  
> +/* We are getting low on pages flags, so we start using combinations
> +   When we need to flush a page, we sent it as
> +   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE
> +   We don't allow that combination
> +*/
> +#define RAM_SAVE_FLAG_MULTIFD_SYNC \
> +    (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)

Good that's better than last time; note you're using FLAG_ZERO where
the comment says COMPRESS_PAGE and the commit message says COMPRESSED.
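
To make the combination concrete, here is a small stand-alone sketch of
the check that ram_load() does in this patch, using the define as the
code has it (MULTIFD_PAGE | ZERO). The 0x100 and 0x200 values are from
the hunk above; the 0x02 value for RAM_SAVE_FLAG_ZERO is an assumption
taken from QEMU's ram.c of that period, and the two helper names are
hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define RAM_SAVE_FLAG_ZERO          0x02   /* assumed value, see lead-in */
#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100
#define RAM_SAVE_FLAG_MULTIFD_PAGE  0x200

#define RAM_SAVE_FLAG_MULTIFD_SYNC \
    (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)

/* True only when both bits of the combination are set. */
static bool is_multifd_sync(uint64_t flags)
{
    return (flags & RAM_SAVE_FLAG_MULTIFD_SYNC) == RAM_SAVE_FLAG_MULTIFD_SYNC;
}

/* Drop the extra bit so the page is then handled as a plain multifd page. */
static uint64_t strip_multifd_sync(uint64_t flags)
{
    return flags & ~(uint64_t)RAM_SAVE_FLAG_ZERO;
}
```

With these values a header carrying MULTIFD_PAGE | COMPRESS_PAGE, as the
comment and commit message describe, would not be treated as a sync.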

> +
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>  {
>      return buffer_is_zero(p, size);
> @@ -194,6 +202,9 @@ struct RAMState {
>      uint64_t iterations_prev;
>      /* Iterations since start */
>      uint64_t iterations;
> +    /* Indicates if we have synced the bitmap and we need to assure that
> +       target has processeed all previous pages */
> +    bool multifd_needs_flush;
>      /* number of dirty bits in the bitmap */
>      uint64_t migration_dirty_pages;
>      /* protects modification of the bitmap */
> @@ -614,9 +625,11 @@ struct MultiFDRecvParams {
>      QIOChannel *c;
>      QemuSemaphore ready;
>      QemuSemaphore sem;
> +    QemuCond cond_sync;
>      QemuMutex mutex;
>      /* proteced by param mutex */
>      bool quit;
> +    bool sync;
>      multifd_pages_t pages;
>      bool done;
>  };
> @@ -669,6 +682,7 @@ int multifd_load_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_cond_destroy(&p->cond_sync);
>          socket_recv_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
> @@ -707,6 +721,10 @@ static void *multifd_recv_thread(void *opaque)
>                  return NULL;
>              }
>              p->done = true;
> +            if (p->sync) {
> +                qemu_cond_signal(&p->cond_sync);
> +                p->sync = false;
> +            }
>              qemu_mutex_unlock(&p->mutex);
>              qemu_sem_post(&p->ready);
>              continue;
> @@ -752,9 +770,11 @@ void multifd_new_channel(QIOChannel *ioc)
>      qemu_mutex_init(&p->mutex);
>      qemu_sem_init(&p->sem, 0);
>      qemu_sem_init(&p->ready, 0);
> +    qemu_cond_init(&p->cond_sync);
>      p->quit = false;
>      p->id = msg.id;
>      p->done = false;
> +    p->sync = false;
>      multifd_init_pages(&p->pages);
>      p->c = ioc;
>      multifd_recv_state->count++;
> @@ -819,6 +839,27 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
>      qemu_sem_post(&p->sem);
>  }
>  
> +static int multifd_flush(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_multifd()) {
> +        return 0;
> +    }
> +    thread_count = migrate_multifd_channels();
> +    for (i = 0; i < thread_count; i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        while (!p->done) {
> +            p->sync = true;
> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    return 0;
> +}

I wonder if we need some way of terminating this on error
(e.g. if terminate_multifd_recv_threads is called for an error
case).
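
One way that could look, as a minimal sketch with pthreads instead of
QEMU's wrappers: the wait loop also checks a quit flag, and the
terminate path signals the condition variable so a waiter wakes up. The
struct and function names here (recv_chan, recv_chan_flush,
recv_chan_terminate) are hypothetical, not from the patch.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

struct recv_chan {
    pthread_mutex_t mutex;
    pthread_cond_t cond_sync;
    bool done;                  /* channel has no work in flight */
    bool quit;                  /* set on error or shutdown */
    bool sync;                  /* a flusher is waiting */
};

static void recv_chan_init(struct recv_chan *p)
{
    pthread_mutex_init(&p->mutex, NULL);
    pthread_cond_init(&p->cond_sync, NULL);
    p->done = false;
    p->quit = false;
    p->sync = false;
}

/* Wait until the channel is idle; return -1 if it was shut down. */
static int recv_chan_flush(struct recv_chan *p)
{
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    while (!p->done && !p->quit) {
        p->sync = true;
        pthread_cond_wait(&p->cond_sync, &p->mutex);
    }
    if (p->quit) {
        ret = -1;
    }
    pthread_mutex_unlock(&p->mutex);
    return ret;
}

/* Error path: mark the channel as quitting and wake any flusher. */
static void recv_chan_terminate(struct recv_chan *p)
{
    pthread_mutex_lock(&p->mutex);
    p->quit = true;
    pthread_cond_signal(&p->cond_sync);
    pthread_mutex_unlock(&p->mutex);
}
```

The caller can then propagate the -1 instead of blocking forever on a
channel whose thread has already exited.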

Dave

>  /**
>   * save_page_header: write page header to wire
>   *
> @@ -836,6 +877,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
>  {
>      size_t size, len;
>  
> +    if (rs->multifd_needs_flush &&
> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> +        offset |= RAM_SAVE_FLAG_ZERO;
> +        rs->multifd_needs_flush = false;
> +    }
> +
>      if (block == rs->last_sent_block) {
>          offset |= RAM_SAVE_FLAG_CONTINUE;
>      }
> @@ -1124,6 +1171,9 @@ static void migration_bitmap_sync(RAMState *rs)
>      if (migrate_use_events()) {
>          qapi_event_send_migration_pass(ram_counters.dirty_sync_count, NULL);
>      }
> +    if (!rs->ram_bulk_stage && migrate_use_multifd()) {
> +        rs->multifd_needs_flush = true;
> +    }
>  }
>  
>  /**
> @@ -3045,6 +3095,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          }
>  
> +        if ((flags & RAM_SAVE_FLAG_MULTIFD_SYNC)
> +            == RAM_SAVE_FLAG_MULTIFD_SYNC) {
> +            multifd_flush();
> +            flags = flags & ~RAM_SAVE_FLAG_ZERO;
> +        }
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

* Re: [Qemu-devel] [PATCH v9 12/12] migration: Add multifd test
  2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 12/12] migration: Add multifd test Juan Quintela
@ 2017-10-17 15:27   ` Dr. David Alan Gilbert
  2017-12-11  9:40     ` Juan Quintela
  0 siblings, 1 reply; 35+ messages in thread
From: Dr. David Alan Gilbert @ 2017-10-17 15:27 UTC (permalink / raw)
  To: Juan Quintela; +Cc: qemu-devel, lvivier, peterx

* Juan Quintela (quintela@redhat.com) wrote:
> We set the x-multifd-page-count and x-multifd-channels.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

<snip>
We *must* find a way to share all the boilerplate I've snipped
out;  even this test_migrate function is almost identical to the one in
your other test series, with just those few extra parameter sets.

> +static void test_migrate(void)
> +{
> +    char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
> +    QTestState *global = global_qtest, *from, *to;
> +    unsigned char dest_byte_a, dest_byte_b, dest_byte_c, dest_byte_d;
> +    gchar *cmd, *cmd_src, *cmd_dst;
> +    QDict *rsp;
> +
> +    char *bootpath = g_strdup_printf("%s/bootsect", tmpfs);
> +    const char *arch = qtest_get_arch();
> +
> +    got_stop = false;
> +
> +    if (strcmp(arch, "i386") == 0 || strcmp(arch, "x86_64") == 0) {
> +        init_bootfile_x86(bootpath);
> +        cmd_src = g_strdup_printf("-machine accel=kvm:tcg -m 150M"
> +                                  " -name pcsource,debug-threads=on"
> +                                  " -serial file:%s/src_serial"
> +                                  " -drive file=%s,format=raw",
> +                                  tmpfs, bootpath);
> +        cmd_dst = g_strdup_printf("-machine accel=kvm:tcg -m 150M"
> +                                  " -name pcdest,debug-threads=on"
> +                                  " -serial file:%s/dest_serial"
> +                                  " -drive file=%s,format=raw"
> +                                  " -incoming %s",
> +                                  tmpfs, bootpath, uri);
> +    } else if (strcmp(arch, "ppc64") == 0) {
> +        const char *accel;
> +
> +        /* On ppc64, the test only works with kvm-hv, but not with kvm-pr */
> +        accel = access("/sys/module/kvm_hv", F_OK) ? "tcg" : "kvm:tcg";
> +        init_bootfile_ppc(bootpath);
> +        cmd_src = g_strdup_printf("-machine accel=%s -m 256M"
> +                                  " -name pcsource,debug-threads=on"
> +                                  " -serial file:%s/src_serial"
> +                                  " -drive file=%s,if=pflash,format=raw",
> +                                  accel, tmpfs, bootpath);
> +        cmd_dst = g_strdup_printf("-machine accel=%s -m 256M"
> +                                  " -name pcdest,debug-threads=on"
> +                                  " -serial file:%s/dest_serial"
> +                                  " -incoming %s",
> +                                  accel, tmpfs, uri);
> +    } else {
> +        g_assert_not_reached();
> +    }
> +
> +    g_free(bootpath);
> +
> +    from = qtest_start(cmd_src);
> +    g_free(cmd_src);
> +
> +    to = qtest_init(cmd_dst);
> +    g_free(cmd_dst);
> +
> +    global_qtest = from;
> +    rsp = qmp("{ 'execute': 'migrate-set-capabilities',"
> +                  "'arguments': { "
> +                      "'capabilities': [ {"
> +                          "'capability': 'x-multifd',"
> +                          "'state': true } ] } }");
> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +    global_qtest = to;
> +    rsp = qmp("{ 'execute': 'migrate-set-capabilities',"
> +                  "'arguments': { "
> +                      "'capabilities': [ {"
> +                          "'capability': 'x-multifd',"
> +                          "'state': true } ] } }");
> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +    /* We want to pick a speed slow enough that the test completes
> +     * quickly, but that it doesn't complete precopy even on a slow
> +     * machine, so also set the downtime.
> +     */
> +    global_qtest = from;
> +    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
> +              "'arguments': { 'max-bandwidth': 100000000 } }");
> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +    /* 200ms downtime */
> +    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
> +              "'arguments': { 'downtime-limit': 300 } }");

Note 200 vs 300 !

> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +    /* set 4 channels */
> +    global_qtest = to;
> +    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
> +              "'arguments': { 'x-multifd-channels': 4 } }");
> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
> +              "'arguments': { 'x-multifd-page-count': 64 } }");
> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +    /* set 4 channels */
> +    global_qtest = from;
> +    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
> +              "'arguments': { 'x-multifd-channels': 4 } }");
> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
> +              "'arguments': { 'x-multifd-page-count': 64 } }");
> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +
> +    /* Wait for the first serial output from the source */
> +    wait_for_serial("src_serial");
> +
> +    cmd = g_strdup_printf("{ 'execute': 'migrate',"
> +                          "'arguments': { 'uri': '%s' } }",
> +                          uri);
> +    rsp = qmp(cmd);
> +    g_free(cmd);
> +    g_assert(qdict_haskey(rsp, "return"));
> +    QDECREF(rsp);
> +
> +    wait_for_migration_pass();
> +
> +    if (!got_stop) {
> +        qmp_eventwait("STOP");
> +    }
> +
> +    global_qtest = to;
> +    qmp_eventwait("RESUME");
> +
> +    wait_for_serial("dest_serial");
> +    global_qtest = from;
> +    wait_for_migration_complete();
> +
> +    qtest_quit(from);
> +
> +    global_qtest = to;
> +
> +    qtest_memread(to, start_address, &dest_byte_a, 1);
> +
> +    /* Destination still running, wait for a byte to change */
> +    do {
> +        qtest_memread(to, start_address, &dest_byte_b, 1);
> +        usleep(10 * 1000);
> +    } while (dest_byte_a == dest_byte_b);

Are there any multifd stats we can check?  I guess the migration
won't start until all channels are connected, so we know they're all
in use in theory, so there's nothing specific.

Dave

> +    qmp_discard_response("{ 'execute' : 'stop'}");
> +    /* With it stopped, check nothing changes */
> +    qtest_memread(to, start_address, &dest_byte_c, 1);
> +    sleep(1);
> +    qtest_memread(to, start_address, &dest_byte_d, 1);
> +    g_assert_cmpint(dest_byte_c, ==, dest_byte_d);
> +
> +    check_guests_ram();
> +
> +    qtest_quit(to);
> +    g_free(uri);
> +
> +    global_qtest = global;
> +
> +    cleanup("bootsect");
> +    cleanup("migsocket");
> +    cleanup("src_serial");
> +    cleanup("dest_serial");
> +}
> +
> +int main(int argc, char **argv)
> +{
> +    char template[] = "/tmp/multifd-test-XXXXXX";
> +    int ret;
> +
> +    g_test_init(&argc, &argv, NULL);
> +
> +    tmpfs = mkdtemp(template);
> +    if (!tmpfs) {
> +        g_test_message("mkdtemp on path (%s): %s\n", template, strerror(errno));
> +    }
> +    g_assert(tmpfs);
> +
> +    module_call_init(MODULE_INIT_QOM);
> +
> +    qtest_add_func("/multifd", test_migrate);
> +
> +    ret = g_test_run();
> +
> +    g_assert_cmpint(ret, ==, 0);
> +
> +    ret = rmdir(tmpfs);
> +    if (ret != 0) {
> +        g_test_message("unable to rmdir: path (%s): %s\n",
> +                       tmpfs, strerror(errno));
> +    }
> +
> +    return ret;
> +}
> -- 
> 2.13.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


* Re: [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work
  2017-10-16 19:11   ` Dr. David Alan Gilbert
@ 2017-12-09 16:46     ` Juan Quintela
  0 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-12-09 16:46 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:

>> +static void multifd_new_channel_async(QIOTask *task, gpointer opaque)
>> +{
>> +    MultiFDSendParams *p = opaque;
>> +    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
>> +    Error *local_err;
>
> Does that need an = NULL ?


Good catch.  Fixed.
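
A minimal sketch of why the initialiser matters, assuming the usual pattern where a callee writes through the Error ** argument only on failure and the caller then tests the local pointer.  The Error type and both functions below are hypothetical stand-ins, not QEMU's real API.

```c
#include <stddef.h>

/* Hypothetical stand-in for QEMU's Error object; illustration only. */
typedef struct Error {
    const char *msg;
} Error;

static Error connect_failure = { "connection failed" };

/* Hypothetical helper: stores an error through errp only on failure. */
static void connect_channel(int should_fail, Error **errp)
{
    if (should_fail) {
        *errp = &connect_failure;
    }
    /* On success *errp is left untouched. */
}

/* Returns 1 if an error was reported, 0 otherwise. */
static int channel_failed(int should_fail)
{
    /* Without "= NULL" the test below would read an indeterminate pointer
     * whenever connect_channel() succeeds. */
    Error *local_err = NULL;

    connect_channel(should_fail, &local_err);
    return local_err != NULL;
}
```

With the initialiser in place, channel_failed(0) reliably reports no error; without it the result on the success path is undefined.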

>> +    if (errp) {
>> +        MigrationState *s = migrate_get_current();
>> +        migrate_set_error(s, errp);
>> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
>> +                          MIGRATION_STATUS_FAILED);
>
> Are we necessarily in ACTIVE at this point?   I suspect there
> are some SETUP and I wonder if there are others.

We only care about SETUP & ACTIVE.  We could still be in SETUP here as
far as I can see.  Fixed for both the send and recv sides.
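
One way to cover both states, sketched under the assumption that the state setter behaves like a compare-and-swap (it only moves to the new state when the current state equals the expected old one).  The enum and function names are hypothetical and only mirror the names used in this thread.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical subset of the migration states. */
enum mig_status {
    MIG_STATUS_SETUP,
    MIG_STATUS_ACTIVE,
    MIG_STATUS_FAILED,
    MIG_STATUS_COMPLETED,
};

/* Move to new_state only if the current state is old_state. */
static bool set_state(_Atomic int *state, int old_state, int new_state)
{
    int expected = old_state;

    return atomic_compare_exchange_strong(state, &expected, new_state);
}

/* Mark the migration failed whether it is still in SETUP or already ACTIVE.
 * Any other state (for example COMPLETED) is left alone. */
static bool fail_migration(_Atomic int *state)
{
    return set_state(state, MIG_STATUS_SETUP, MIG_STATUS_FAILED) ||
           set_state(state, MIG_STATUS_ACTIVE, MIG_STATUS_FAILED);
}
```

A single transition from ACTIVE alone would silently do nothing if the channel error arrives while the state is still SETUP, which is the case raised above.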

Later, Juan.


* Re: [Qemu-devel] [PATCH v9 12/12] migration: Add multifd test
  2017-10-17 15:27   ` Dr. David Alan Gilbert
@ 2017-12-11  9:40     ` Juan Quintela
  0 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-12-11  9:40 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We set the x-multifd-page-count and x-multifd-channels.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>
> <snip>
> We *must* find a way to share all the boiler plate I've snipped
> out;  even this test_migrate function is almost identical to
> your other test series with just those few extra parameter sets.

See my migration-test series, I changed this a lot.  I think that
without using function pointers, it is not easy to share more code than
what I did there (and I am not sure that using function pointers makes
things easier to understand).
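
For reference, a rough sketch of what the function-pointer variant could look like.  Every name here is hypothetical; the shared body is reduced to comments and the hook only prints what a real test would send over QMP.

```c
#include <stdio.h>

/* Hypothetical hook type: lets each test add its own parameters. */
typedef void (*migrate_setup_fn)(const char *side);

/* Counts hook invocations so the sketch can be checked. */
static int setup_calls;

/* Hypothetical multifd hook; a real test would issue QMP commands here. */
static void multifd_setup(const char *side)
{
    printf("%s: enable x-multifd, set x-multifd-channels=4\n", side);
    setup_calls++;
}

/* Hypothetical shared body: common steps plus an optional per-test hook. */
static void test_migrate_common(migrate_setup_fn setup)
{
    /* ... start source and destination, set bandwidth and downtime ... */
    if (setup) {
        setup("source");
        setup("destination");
    }
    /* ... migrate, wait for completion, check guest RAM ... */
}

static void test_plain(void)
{
    test_migrate_common(NULL);
}

static void test_multifd(void)
{
    test_migrate_common(multifd_setup);
}
```

The trade-off mentioned above is visible even at this size: the boilerplate lives in one place, but a reader has to follow the hook to see what a given test actually configures.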


>> +    /* 200ms downtime */
>> +    rsp = qmp("{ 'execute': 'migrate-set-parameters',"
>> +              "'arguments': { 'downtime-limit': 300 } }");
>
> Note 200 vs 300 !


Fixed when I merged with the previous code.

>> +    /* Destination still running, wait for a byte to change */
>> +    do {
>> +        qtest_memread(to, start_address, &dest_byte_b, 1);
>> +        usleep(10 * 1000);
>> +    } while (dest_byte_a == dest_byte_b);
>
> Are there any multifd stats we can check?  I guess the migration
> won't start until all channels are connected, so we know they're all
> in use in theory, so there's nothing specific.

We can check that all threads have been created, for instance.

Later, Juan.


* Re: [Qemu-devel] [PATCH v9 11/12] migration: Flush receive queue
  2017-10-17 14:51   ` Dr. David Alan Gilbert
@ 2017-12-11  9:40     ` Juan Quintela
  0 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2017-12-11  9:40 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:

>> +/* We are getting low on pages flags, so we start using combinations
>> +   When we need to flush a page, we sent it as
>> +   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE
>> +   We don't allow that combination
>> +*/
>> +#define RAM_SAVE_FLAG_MULTIFD_SYNC \
>> +    (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)
>
> Good that's better than last time; note you're using FLAG_ZERO where
> the comment says COMPRESS_PAGE and the commit message says COMPRESSED.

Fixed.
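
A small sketch of how a combined flag like this has to be tested on the receiving side.  The bit values are hypothetical and chosen only for illustration; the point is that the check must compare against the full combination.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical bit values, for illustration only. */
#define FLAG_ZERO          0x02u
#define FLAG_MULTIFD_PAGE  0x200u
/* A combination that is never sent for a real page, reused as a marker. */
#define FLAG_MULTIFD_SYNC  (FLAG_MULTIFD_PAGE | FLAG_ZERO)

/* True only when both bits are set.  A plain "flags & FLAG_MULTIFD_SYNC"
 * would also match a lone MULTIFD_PAGE or ZERO flag. */
static bool is_multifd_sync(uint32_t flags)
{
    return (flags & FLAG_MULTIFD_SYNC) == FLAG_MULTIFD_SYNC;
}
```

For the same reason the combined value has to be checked before either of the individual flags is handled.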

>
>> +
>>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>>  {
>>      return buffer_is_zero(p, size);
>> @@ -194,6 +202,9 @@ struct RAMState {
>>      uint64_t iterations_prev;
>>      /* Iterations since start */
>>      uint64_t iterations;
>> +    /* Indicates if we have synced the bitmap and we need to assure that
>> +       target has processeed all previous pages */
>> +    bool multifd_needs_flush;
>>      /* number of dirty bits in the bitmap */
>>      uint64_t migration_dirty_pages;
>>      /* protects modification of the bitmap */
>> @@ -614,9 +625,11 @@ struct MultiFDRecvParams {
>>      QIOChannel *c;
>>      QemuSemaphore ready;
>>      QemuSemaphore sem;
>> +    QemuCond cond_sync;
>>      QemuMutex mutex;
>>      /* proteced by param mutex */
>>      bool quit;
>> +    bool sync;
>>      multifd_pages_t pages;
>>      bool done;
>>  };
>> @@ -669,6 +682,7 @@ int multifd_load_cleanup(Error **errp)
>>          qemu_thread_join(&p->thread);
>>          qemu_mutex_destroy(&p->mutex);
>>          qemu_sem_destroy(&p->sem);
>> +        qemu_cond_destroy(&p->cond_sync);
>>          socket_recv_channel_destroy(p->c);
>>          g_free(p->name);
>>          p->name = NULL;
>> @@ -707,6 +721,10 @@ static void *multifd_recv_thread(void *opaque)
>>                  return NULL;
>>              }
>>              p->done = true;
>> +            if (p->sync) {
>> +                qemu_cond_signal(&p->cond_sync);
>> +                p->sync = false;
>> +            }
>>              qemu_mutex_unlock(&p->mutex);
>>              qemu_sem_post(&p->ready);
>>              continue;
>> @@ -752,9 +770,11 @@ void multifd_new_channel(QIOChannel *ioc)
>>      qemu_mutex_init(&p->mutex);
>>      qemu_sem_init(&p->sem, 0);
>>      qemu_sem_init(&p->ready, 0);
>> +    qemu_cond_init(&p->cond_sync);
>>      p->quit = false;
>>      p->id = msg.id;
>>      p->done = false;
>> +    p->sync = false;
>>      multifd_init_pages(&p->pages);
>>      p->c = ioc;
>>      multifd_recv_state->count++;
>> @@ -819,6 +839,27 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
>>      qemu_sem_post(&p->sem);
>>  }
>>  
>> +static int multifd_flush(void)
>> +{
>> +    int i, thread_count;
>> +
>> +    if (!migrate_use_multifd()) {
>> +        return 0;
>> +    }
>> +    thread_count = migrate_multifd_channels();
>> +    for (i = 0; i < thread_count; i++) {
>> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
>> +
>> +        qemu_mutex_lock(&p->mutex);
>> +        while (!p->done) {
>> +            p->sync = true;
>> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
>> +        }
>> +        qemu_mutex_unlock(&p->mutex);
>> +    }
>> +    return 0;
>> +}
>
> I wonder if we need some way of terminating this on error
> (e.g. if terminate_multifd_recev_threads is called for an error
> case).

It could be, I have to think about this.
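
One possible shape for this, sketched with plain pthreads rather than QEMU's wrappers.  The struct is a hypothetical, cut-down version of the per-channel state, and it assumes the termination path can take the same mutex and signal the same condition variable.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical, cut-down version of the per-channel receive state. */
struct recv_params {
    pthread_mutex_t mutex;
    pthread_cond_t cond_sync;
    bool done;   /* worker has drained its pages */
    bool sync;   /* flusher is waiting for the worker */
    bool quit;   /* set when the threads are told to terminate */
};

/* Wait until the channel is drained; return -1 if asked to quit first. */
static int flush_channel(struct recv_params *p)
{
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    while (!p->done && !p->quit) {
        p->sync = true;
        pthread_cond_wait(&p->cond_sync, &p->mutex);
    }
    if (!p->done) {
        ret = -1;   /* woken by termination, not by completion */
    }
    pthread_mutex_unlock(&p->mutex);
    return ret;
}

/* Error path: set quit and wake any flusher blocked on the condition. */
static void terminate_channel(struct recv_params *p)
{
    pthread_mutex_lock(&p->mutex);
    p->quit = true;
    pthread_cond_signal(&p->cond_sync);
    pthread_mutex_unlock(&p->mutex);
}
```

The only change to the wait loop is the extra quit test; the flush caller would then need to propagate the -1 instead of always returning 0.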

Later, Juan.


* Re: [Qemu-devel] [PATCH v9 07/12] migration: Create thread infrastructure for multifd recv side
  2017-10-17 11:07   ` Dr. David Alan Gilbert
@ 2018-01-08  9:24     ` Juan Quintela
  0 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2018-01-08  9:24 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We make the locking and the transfer of information specific, even if we
>> are still receiving things through the main thread.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> 
>> --
>> 
>> We split when we create the main channel and where we start the main
>> migration thread, so we wait for the creation of the other threads.
>> 

>> @@ -668,12 +676,20 @@ static void *multifd_recv_thread(void *opaque)
>>  {
>>      MultiFDRecvParams *p = opaque;
>>  
>> +    qemu_sem_post(&p->ready);
>>      while (true) {
>>          qemu_mutex_lock(&p->mutex);
>>          if (p->quit) {
>>              qemu_mutex_unlock(&p->mutex);
>>              break;
>>          }
>> +        if (p->pages.num) {
>> +            p->pages.num = 0;
>
> This could do with some TODO comments in - since this code
> doesn't do anything useful yet and is confusing, but gets clearer
> when you add the filling in the later patches.

Added.


>> +            p->done = true;
>> +            qemu_mutex_unlock(&p->mutex);
>> +            qemu_sem_post(&p->ready);
>> +            continue;
>> +        }
>>          qemu_mutex_unlock(&p->mutex);
>>          qemu_sem_wait(&p->sem);
>>      }
>> @@ -714,13 +730,21 @@ void multifd_new_channel(QIOChannel *ioc)
>>      }
>>      qemu_mutex_init(&p->mutex);
>>      qemu_sem_init(&p->sem, 0);
>> +    qemu_sem_init(&p->ready, 0);
>>      p->quit = false;
>>      p->id = msg.id;
>> +    p->done = false;
>> +    multifd_init_pages(&p->pages);
>>      p->c = ioc;
>>      multifd_recv_state->count++;
>>      p->name = g_strdup_printf("multifdrecv_%d", msg.id);
>> +    object_ref(OBJECT(ioc));
>
> It would be good to comment to say where that gets unref'd.

Added this in the "Start of multiple fd work" patch.

It belongs there, since that is where the unref is done.

Thanks, Juan.


* Re: [Qemu-devel] [PATCH v9 10/12] migration: Transfer pages over new channels
  2017-10-17 14:18   ` Dr. David Alan Gilbert
@ 2018-01-08  9:40     ` Juan Quintela
  0 siblings, 0 replies; 35+ messages in thread
From: Juan Quintela @ 2018-01-08  9:40 UTC (permalink / raw)
  To: Dr. David Alan Gilbert; +Cc: qemu-devel, lvivier, peterx

"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We switch for sending the page number to send real pages.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>
> I think this is OK if squashed with the 'test' patch to remove
> the test stuff.

Done.

>
> Some minor comments below.
>
>> --
>> 
>> Remove the HACK bit, now we have the function that calculates the size
>> of a page exported.
>> Rename multifd_pages{_now}, to sent pages
>> Remove multifd pages field, it is the same than normal pages
>> ---
>>  migration/migration.c |  7 ++++++-
>>  migration/ram.c       | 39 +++++++++++----------------------------
>>  2 files changed, 17 insertions(+), 29 deletions(-)
>> 
>> diff --git a/migration/migration.c b/migration/migration.c
>> index 54ef095d82..1bd87a4e44 100644
>> --- a/migration/migration.c
>> +++ b/migration/migration.c
>> @@ -2085,6 +2085,7 @@ static void *migration_thread(void *opaque)
>>       */
>>      int64_t threshold_size = 0;
>>      int64_t qemu_file_bytes = 0;
>> +    int64_t sent_pages = 0;
>>      int64_t start_time = initial_time;
>>      int64_t end_time;
>>      bool old_vm_running = false;
>> @@ -2173,8 +2174,11 @@ static void *migration_thread(void *opaque)
>>          current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>>          if (current_time >= initial_time + BUFFER_DELAY) {
>>              uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
>> +            uint64_t sent_pages_now = ram_counters.normal;
>>              uint64_t transferred_bytes =
>> -                qemu_file_bytes_now - qemu_file_bytes;
>> +                (qemu_file_bytes_now - qemu_file_bytes) +
>> +                (sent_pages_now - sent_pages) *
>> +                qemu_target_page_size();
>
> This could do with commenting to explain the difference between the
> two sets of counts.

Reworked it to make clear that multifd data is not sent through the QEMUFile.
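
The accounting in the quoted hunk can be written out as a small helper.  This is a sketch with hypothetical names; it assumes, as stated above, that pages sent on the multifd channels are not counted in the main stream position and so have to be added separately.

```c
#include <stdint.h>

/*
 * Bytes moved since the last sample.  The stream delta covers what went
 * through the main migration stream; pages sent on multifd channels are
 * assumed not to be counted there, so they are added as pages * page size.
 */
static uint64_t transferred_bytes(uint64_t file_bytes_now,
                                  uint64_t file_bytes_prev,
                                  uint64_t pages_now,
                                  uint64_t pages_prev,
                                  uint64_t page_size)
{
    uint64_t stream_bytes = file_bytes_now - file_bytes_prev;
    uint64_t multifd_bytes = (pages_now - pages_prev) * page_size;

    return stream_bytes + multifd_bytes;
}
```

For example, with a 4096-byte page, 10 new pages and 4000 new stream bytes give 4000 + 10 * 4096 = 44960 bytes for the interval.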

>> @@ -1288,8 +1270,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
>>                               offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>>          fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
>>          qemu_put_be16(rs->f, fd_num);
>> +        if (fd_num != MULTIFD_CONTINUE) {
>> +            qemu_fflush(rs->f);
>> +        }
>
> Could do with a comment.

Done.

Later, Juan.


Thread overview: 35+ messages
2017-10-04 10:46 [Qemu-devel] [PATCH v9 00/12] Multifd Juan Quintela
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 01/12] qapi: Fix grammar in x-multifd-page-count descriptions Juan Quintela
2017-10-16 16:53   ` Dr. David Alan Gilbert
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 02/12] migration: Improve migration thread error handling Juan Quintela
2017-10-09  9:28   ` Peter Xu
2017-10-16 17:34   ` Dr. David Alan Gilbert
2017-10-16 17:48     ` Dr. David Alan Gilbert
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 03/12] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
2017-10-09  9:34   ` Peter Xu
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 04/12] migration: Start of multiple fd work Juan Quintela
2017-10-09 10:05   ` Peter Xu
2017-10-09 10:15   ` Daniel P. Berrange
2017-10-09 12:32     ` Juan Quintela
2017-10-09 12:32     ` Juan Quintela
2017-10-16 19:11   ` Dr. David Alan Gilbert
2017-12-09 16:46     ` Juan Quintela
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 05/12] migration: Create ram_multifd_page Juan Quintela
2017-10-09 13:08   ` Paolo Bonzini
2017-10-16 19:43   ` Dr. David Alan Gilbert
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 06/12] migration: Send the fd number which we are going to use for this page Juan Quintela
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 07/12] migration: Create thread infrastructure for multifd recv side Juan Quintela
2017-10-17 11:07   ` Dr. David Alan Gilbert
2018-01-08  9:24     ` Juan Quintela
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 08/12] migration: Test new fd infrastructure Juan Quintela
2017-10-17 11:11   ` Dr. David Alan Gilbert
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 09/12] migration: Rename initial_bytes Juan Quintela
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 10/12] migration: Transfer pages over new channels Juan Quintela
2017-10-17 14:18   ` Dr. David Alan Gilbert
2018-01-08  9:40     ` Juan Quintela
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 11/12] migration: Flush receive queue Juan Quintela
2017-10-17 14:51   ` Dr. David Alan Gilbert
2017-12-11  9:40     ` Juan Quintela
2017-10-04 10:46 ` [Qemu-devel] [PATCH v9 12/12] migration: Add multifd test Juan Quintela
2017-10-17 15:27   ` Dr. David Alan Gilbert
2017-12-11  9:40     ` Juan Quintela
