All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: qemu-devel@nongnu.org, lvivier@redhat.com, peterx@redhat.com
Subject: Re: [Qemu-devel] [PATCH v14 07/12] migration: Synchronize multifd threads with main thread
Date: Wed, 20 Jun 2018 11:30:15 +0100	[thread overview]
Message-ID: <20180620103014.GG2549@work-vm> (raw)
In-Reply-To: <20180620081524.5751-8-quintela@redhat.com>

* Juan Quintela (quintela@redhat.com) wrote:
> We synchronize all threads on each RAM_SAVE_FLAG_EOS.  Bitmap
> synchronizations don't happen inside a ram section, so we are safe
> from two channels trying to overwrite the same memory.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> --
> seq needs to be atomic now, since it will also be accessed from the main thread.
> Fix the if (true || ...) leftover.
> ---
>  migration/ram.c        | 147 ++++++++++++++++++++++++++++++++---------
>  migration/trace-events |   6 ++
>  2 files changed, 122 insertions(+), 31 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 793f0dc5d3..516f347d24 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -510,6 +510,8 @@ exit:
>  #define MULTIFD_MAGIC 0x11223344U
>  #define MULTIFD_VERSION 1
>  
> +#define MULTIFD_FLAG_SYNC (1 << 0)
> +
>  typedef struct {
>      uint32_t magic;
>      uint32_t version;
> @@ -577,6 +579,8 @@ typedef struct {
>      uint32_t num_packets;
>      /* pages sent through this channel */
>      uint32_t num_pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
>  }  MultiFDSendParams;
>  
>  typedef struct {
> @@ -614,6 +618,8 @@ typedef struct {
>      uint32_t num_packets;
>      /* pages sent through this channel */
>      uint32_t num_pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
>  } MultiFDRecvParams;
>  
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> @@ -801,6 +807,10 @@ struct {
>      int count;
>      /* array of pages to sent */
>      MultiFDPages_t *pages;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
> +    /* global number of generated multifd packets */
> +    uint64_t packet_num;
>  } *multifd_send_state;
>  
>  static void multifd_send_terminate_threads(Error *err)
> @@ -848,6 +858,7 @@ int multifd_save_cleanup(Error **errp)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_sem_destroy(&p->sem_sync);
>          g_free(p->name);
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
> @@ -856,6 +867,7 @@ int multifd_save_cleanup(Error **errp)
>          g_free(p->packet);
>          p->packet = NULL;
>      }
> +    qemu_sem_destroy(&multifd_send_state->sem_sync);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
>      multifd_pages_clear(multifd_send_state->pages);
> @@ -865,6 +877,33 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +static void multifd_send_sync_main(void)
> +{
> +    int i;
> +
> +    if (!migrate_use_multifd()) {
> +        return;
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        trace_multifd_send_sync_main_signal(p->id);
> +
> +        qemu_mutex_lock(&p->mutex);
> +        p->flags |= MULTIFD_FLAG_SYNC;
> +        p->pending_job++;
> +        qemu_mutex_unlock(&p->mutex);
> +        qemu_sem_post(&p->sem);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        trace_multifd_send_sync_main_wait(p->id);
> +        qemu_sem_wait(&multifd_send_state->sem_sync);
> +    }
> +    trace_multifd_send_sync_main(atomic_read(&multifd_send_state->packet_num));
> +}
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> @@ -901,15 +940,17 @@ static void *multifd_send_thread(void *opaque)
>              qemu_mutex_lock(&p->mutex);
>              p->pending_job--;
>              qemu_mutex_unlock(&p->mutex);
> -            continue;
> +
> +            if (flags & MULTIFD_FLAG_SYNC) {
> +                qemu_sem_post(&multifd_send_state->sem_sync);
> +            }
>          } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> +        } else {
> +            qemu_mutex_unlock(&p->mutex);
> +            /* sometimes there are spurious wakeups */
>          }
> -        qemu_mutex_unlock(&p->mutex);
> -        /* this is impossible */
> -        error_setg(&local_err, "multifd_send_thread: Unknown command");
> -        break;
>      }
>  
>  out:
> @@ -961,12 +1002,14 @@ int multifd_save_setup(void)
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      atomic_set(&multifd_send_state->count, 0);
>      multifd_send_state->pages = multifd_pages_init(page_count);
> +    qemu_sem_init(&multifd_send_state->sem_sync, 0);
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
> +        qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
>          p->pending_job = 0;
>          p->id = i;
> @@ -991,6 +1034,10 @@ struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
>      int count;
> +    /* syncs main thread and channels */
> +    QemuSemaphore sem_sync;
> +    /* global number of generated multifd packets */
> +    uint64_t packet_num;
>  } *multifd_recv_state;
>  
>  static void multifd_recv_terminate_threads(Error *err)
> @@ -1036,6 +1083,7 @@ int multifd_load_cleanup(Error **errp)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_sem_destroy(&p->sem_sync);
>          g_free(p->name);
>          p->name = NULL;
>          multifd_pages_clear(p->pages);
> @@ -1044,6 +1092,7 @@ int multifd_load_cleanup(Error **errp)
>          g_free(p->packet);
>          p->packet = NULL;
>      }
> +    qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
>      g_free(multifd_recv_state);
> @@ -1052,6 +1101,42 @@ int multifd_load_cleanup(Error **errp)
>      return ret;
>  }
>  
> +static void multifd_recv_sync_main(void)
> +{
> +    int i;
> +
> +    if (!migrate_use_multifd()) {
> +        return;
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_signal(p->id);
> +        qemu_mutex_lock(&p->mutex);
> +        p->pending_job = true;
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_wait(p->id);
> +        qemu_sem_wait(&multifd_recv_state->sem_sync);
> +        qemu_mutex_lock(&p->mutex);
> +        if (atomic_read(&multifd_recv_state->packet_num) < p->packet_num) {
> +            atomic_set(&multifd_recv_state->packet_num, p->packet_num);
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_signal(p->id);
> +
> +        qemu_sem_post(&p->sem_sync);
> +    }
> +    trace_multifd_recv_sync_main(atomic_read(&multifd_recv_state->packet_num));
> +}
> +
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
> @@ -1061,37 +1146,30 @@ static void *multifd_recv_thread(void *opaque)
>      trace_multifd_recv_thread_start(p->id);
>  
>      while (true) {
> -        qemu_sem_wait(&p->sem);
> +        uint32_t used;
> +        uint32_t flags;
> +
> +        /* ToDo: recv packet here */
> +
>          qemu_mutex_lock(&p->mutex);
> -        if (p->pending_job) {
> -            uint32_t used;
> -            uint32_t flags;
> -            qemu_mutex_unlock(&p->mutex);
> -
> -            /* ToDo: recv packet here */
> -
> -            qemu_mutex_lock(&p->mutex);
> -            ret = multifd_recv_unfill_packet(p, &local_err);
> -            if (ret) {
> -                qemu_mutex_unlock(&p->mutex);
> -                break;
> -            }
> -
> -            used = p->pages->used;
> -            flags = p->flags;
> -            trace_multifd_recv(p->id, p->packet_num, used, flags);
> -            p->pending_job = false;
> -            p->num_packets++;
> -            p->num_pages += used;
> -            qemu_mutex_unlock(&p->mutex);
> -        } else if (p->quit) {
> +        ret = multifd_recv_unfill_packet(p, &local_err);
> +        if (ret) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
>          }
> +
> +        used = p->pages->used;
> +        flags = p->flags;
> +        trace_multifd_recv(p->id, p->packet_num, used, flags);
> +        p->pending_job = false;
> +        p->num_packets++;
> +        p->num_pages += used;
>          qemu_mutex_unlock(&p->mutex);
> -        /* this is impossible */
> -        error_setg(&local_err, "multifd_recv_thread: Unknown command");
> -        break;
> +
> +        if (flags & MULTIFD_FLAG_SYNC) {
> +            qemu_sem_post(&multifd_recv_state->sem_sync);
> +            qemu_sem_wait(&p->sem_sync);
> +        }
>      }
>  
>      if (local_err) {
> @@ -1119,12 +1197,14 @@ int multifd_load_setup(void)
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      atomic_set(&multifd_recv_state->count, 0);
> +    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
> +        qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
>          p->pending_job = false;
>          p->id = i;
> @@ -2882,6 +2962,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>      ram_control_before_iterate(f, RAM_CONTROL_SETUP);
>      ram_control_after_iterate(f, RAM_CONTROL_SETUP);
>  
> +    multifd_send_sync_main();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>  
>      return 0;
> @@ -2962,6 +3043,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>       */
>      ram_control_after_iterate(f, RAM_CONTROL_ROUND);
>  
> +    multifd_send_sync_main();
>  out:
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>      ram_counters.transferred += 8;
> @@ -3015,6 +3097,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>  
>      rcu_read_unlock();
>  
> +    multifd_send_sync_main();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>  
>      return 0;
> @@ -3504,6 +3587,7 @@ static int ram_load_postcopy(QEMUFile *f)
>              break;
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
> +            multifd_recv_sync_main();
>              break;
>          default:
>              error_report("Unknown combination of migration flags: %#x"
> @@ -3692,6 +3776,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          case RAM_SAVE_FLAG_EOS:
>              /* normal exit */
> +            multifd_recv_sync_main();
>              break;
>          default:
>              if (flags & RAM_SAVE_FLAG_HOOK) {
> diff --git a/migration/trace-events b/migration/trace-events
> index c667d98529..ea39bb2bc5 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -77,9 +77,15 @@ migration_bitmap_sync_start(void) ""
>  migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
>  migration_throttle(void) ""
>  multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %ld pages %d flags 0x%x"
> +multifd_recv_sync_main(uint32_t seq) "seq %d"
> +multifd_recv_sync_main_signal(uint8_t id) "channel %d"
> +multifd_recv_sync_main_wait(uint8_t id) "channel %d"
>  multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
>  multifd_recv_thread_start(uint8_t id) "%d"
>  multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %ld pages %d flags 0x%x"
> +multifd_send_sync_main(uint32_t seq) "seq %d"
> +multifd_send_sync_main_signal(uint8_t id) "channel %d"
> +multifd_send_sync_main_wait(uint8_t id) "channel %d"
>  multifd_send_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
>  multifd_send_thread_start(uint8_t id) "%d"
>  ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"
> -- 
> 2.17.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

  reply	other threads:[~2018-06-20 10:30 UTC|newest]

Thread overview: 19+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-06-20  8:15 [Qemu-devel] [PATCH v14 00/12] Multifd Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 01/12] migration: Create multipage support Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 02/12] migration: Create multifd packet Juan Quintela
2018-06-20  9:40   ` Dr. David Alan Gilbert
2018-06-20  9:43     ` Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 03/12] migration: Add multifd traces for start/end thread Juan Quintela
2018-06-20  9:51   ` Dr. David Alan Gilbert
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 04/12] migration: Calculate transferred ram correctly Juan Quintela
2018-06-20  9:58   ` Dr. David Alan Gilbert
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 05/12] migration: Multifd channels always wait on the sem Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 06/12] migration: Add block where to send/receive packets Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 07/12] migration: Synchronize multifd threads with main thread Juan Quintela
2018-06-20 10:30   ` Dr. David Alan Gilbert [this message]
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 08/12] migration: Create ram_save_multifd_page Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 09/12] migration: Start sending messages Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 10/12] migration: Wait for blocking IO Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 11/12] migration: Remove not needed semaphore and quit Juan Quintela
2018-06-20  8:15 ` [Qemu-devel] [PATCH v14 12/12] migration: Stop sending whole pages through main channel Juan Quintela
2018-06-20 10:44 ` [Qemu-devel] [PATCH v14 00/12] Multifd no-reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20180620103014.GG2549@work-vm \
    --to=dgilbert@redhat.com \
    --cc=lvivier@redhat.com \
    --cc=peterx@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes;
see the mirroring instructions on how to clone and mirror
all data and code used by this external index.