From: Peter Xu <peterx@redhat.com>
To: qemu-devel@nongnu.org
Cc: Juan Quintela <quintela@redhat.com>,
	Fabiano Rosas <farosas@suse.de>,
	peterx@redhat.com
Subject: [PATCH RFC 5/7] migration: Modularize multifd send threads with a few helpers
Date: Sun, 22 Oct 2023 16:12:09 -0400	[thread overview]
Message-ID: <20231022201211.452861-6-peterx@redhat.com> (raw)
In-Reply-To: <20231022201211.452861-1-peterx@redhat.com>

Abstract the multifd send packet logic into two phases:

  - multifd_send_prepare(): prepare the packet headers, with the mutex held
  - multifd_do_send(): perform the actual send, without the mutex held
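
In the send thread the two phases pair up around the mutex roughly as
below (a condensed sketch of the loop in this patch; the sync handling
is left out here):

    qemu_mutex_lock(&p->mutex);
    if (p->pending_job) {
        /* Phase 1: build the packet and headers under the lock */
        if (!multifd_send_prepare(p, &local_err)) {
            qemu_mutex_unlock(&p->mutex);
            goto out;
        }
        /* Phase 2: the (potentially slow) I/O runs unlocked */
        qemu_mutex_unlock(&p->mutex);
        if (!multifd_do_send(p, &local_err)) {
            goto out;
        }
        qemu_mutex_lock(&p->mutex);
        p->pending_job--;
    }
    qemu_mutex_unlock(&p->mutex);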

While at it, make the send thread always use Error* for detecting errors,
dropping the local "int ret" altogether.
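
Each helper follows the usual QEMU bool/Error** convention: returning
false guarantees *errp was set, so the thread can assert on local_err
instead of tracking a separate return code.  A minimal sketch of that
convention (do_step() and its body are hypothetical; only error_setg()
is the real API):

    /* Hypothetical helper; returning false always implies *errp is set */
    static bool do_step(MultiFDSendParams *p, Error **errp)
    {
        if (some_operation(p) < 0) {    /* some_operation() is made up */
            error_setg(errp, "multifd: step failed");
            return false;
        }
        return true;
    }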

One trivial change is that the send thread now kicks sem_sync within the
mutex critical section, but that shouldn't be a problem.
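
Concretely, the sem_sync post now sits between re-taking the mutex and
the final unlock (condensed from the hunk below; need_sync is sampled
under the lock before the send starts):

    qemu_mutex_lock(&p->mutex);
    p->pending_job--;
    if (need_sync) {
        qemu_sem_post(&p->sem_sync);
    }
    qemu_mutex_unlock(&p->mutex);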

Signed-off-by: Peter Xu <peterx@redhat.com>
---
 migration/multifd.c | 160 ++++++++++++++++++++++++++------------------
 1 file changed, 96 insertions(+), 64 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 9d458914a9..8140520843 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -640,13 +640,89 @@ int multifd_send_sync_main(QEMUFile *f)
     return 0;
 }
 
+/*
+ * Returns true on success, false otherwise (with errp set).  The caller
+ * must hold p->mutex.
+ */
+static bool multifd_send_prepare(MultiFDSendParams *p, Error **errp)
+{
+    bool use_zero_copy_send = migrate_zero_copy_send();
+    uint64_t packet_num = p->packet_num;
+    uint32_t flags;
+    int ret;
+
+    p->normal_num = 0;
+
+    if (use_zero_copy_send) {
+        p->iovs_num = 0;
+    } else {
+        p->iovs_num = 1;
+    }
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->normal[p->normal_num] = p->pages->offset[i];
+        p->normal_num++;
+    }
+
+    if (p->normal_num) {
+        ret = multifd_send_state->ops->send_prepare(p, errp);
+        if (ret != 0) {
+            return false;
+        }
+    }
+    multifd_send_fill_packet(p);
+    flags = p->flags;
+    p->flags = 0;
+    p->num_packets++;
+    p->total_normal_pages += p->normal_num;
+    p->pages->num = 0;
+    p->pages->block = NULL;
+
+    trace_multifd_send(p->id, packet_num, p->normal_num, flags,
+                       p->next_packet_size);
+
+    return true;
+}
+
+/* Returns true on success, false otherwise (with errp set) */
+static bool multifd_do_send(MultiFDSendParams *p, Error **errp)
+{
+    bool use_zero_copy_send = migrate_zero_copy_send();
+    int ret;
+
+    if (use_zero_copy_send) {
+        /* Send header first, without zerocopy */
+        ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                    p->packet_len, errp);
+        if (ret != 0) {
+            return false;
+        }
+    } else {
+        /* Send header using the same writev call */
+        p->iov[0].iov_len = p->packet_len;
+        p->iov[0].iov_base = p->packet;
+    }
+
+    ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
+                                      0, p->write_flags, errp);
+    if (ret != 0) {
+        return false;
+    }
+
+    stat64_add(&mig_stats.multifd_bytes,
+               p->next_packet_size + p->packet_len);
+    stat64_add(&mig_stats.transferred,
+               p->next_packet_size + p->packet_len);
+    p->next_packet_size = 0;
+
+    return true;
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     MigrationThread *thread = NULL;
     Error *local_err = NULL;
-    int ret = 0;
-    bool use_zero_copy_send = migrate_zero_copy_send();
 
     thread = migration_threads_add(p->name, qemu_get_thread_id());
 
@@ -654,9 +730,10 @@ static void *multifd_send_thread(void *opaque)
     rcu_register_thread();
 
     if (multifd_send_initial_packet(p, &local_err) < 0) {
-        ret = -1;
+        assert(local_err);
         goto out;
     }
+
     /* initial packet */
     p->num_packets = 1;
 
@@ -667,83 +744,38 @@ static void *multifd_send_thread(void *opaque)
         if (qatomic_read(&multifd_send_state->exiting)) {
             break;
         }
-        qemu_mutex_lock(&p->mutex);
 
+        qemu_mutex_lock(&p->mutex);
         if (p->pending_job) {
-            uint64_t packet_num = p->packet_num;
-            uint32_t flags;
-            p->normal_num = 0;
-
-            if (use_zero_copy_send) {
-                p->iovs_num = 0;
-            } else {
-                p->iovs_num = 1;
-            }
+            bool need_sync = p->flags & MULTIFD_FLAG_SYNC;
 
-            for (int i = 0; i < p->pages->num; i++) {
-                p->normal[p->normal_num] = p->pages->offset[i];
-                p->normal_num++;
+            if (!multifd_send_prepare(p, &local_err)) {
+                assert(local_err);
+                qemu_mutex_unlock(&p->mutex);
+                goto out;
             }
 
-            if (p->normal_num) {
-                ret = multifd_send_state->ops->send_prepare(p, &local_err);
-                if (ret != 0) {
-                    qemu_mutex_unlock(&p->mutex);
-                    break;
-                }
-            }
-            multifd_send_fill_packet(p);
-            flags = p->flags;
-            p->flags = 0;
-            p->num_packets++;
-            p->total_normal_pages += p->normal_num;
-            p->pages->num = 0;
-            p->pages->block = NULL;
+            /* Send the packet without holding the mutex */
             qemu_mutex_unlock(&p->mutex);
-
-            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
-                               p->next_packet_size);
-
-            if (use_zero_copy_send) {
-                /* Send header first, without zerocopy */
-                ret = qio_channel_write_all(p->c, (void *)p->packet,
-                                            p->packet_len, &local_err);
-                if (ret != 0) {
-                    break;
-                }
-            } else {
-                /* Send header using the same writev call */
-                p->iov[0].iov_len = p->packet_len;
-                p->iov[0].iov_base = p->packet;
+            if (!multifd_do_send(p, &local_err)) {
+                assert(local_err);
+                goto out;
             }
-
-            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
-                                              0, p->write_flags, &local_err);
-            if (ret != 0) {
-                break;
-            }
-
-            stat64_add(&mig_stats.multifd_bytes,
-                       p->next_packet_size + p->packet_len);
-            stat64_add(&mig_stats.transferred,
-                       p->next_packet_size + p->packet_len);
-            p->next_packet_size = 0;
             qemu_mutex_lock(&p->mutex);
+
+            /* Send successful, mark the task completed */
             p->pending_job--;
-            qemu_mutex_unlock(&p->mutex);
 
-            if (flags & MULTIFD_FLAG_SYNC) {
+            /* If this is a sync task, we need one more kick */
+            if (need_sync) {
                 qemu_sem_post(&p->sem_sync);
             }
-        } else {
-            qemu_mutex_unlock(&p->mutex);
-            /* sometimes there are spurious wakeups */
         }
+        qemu_mutex_unlock(&p->mutex);
     }
 
 out:
-    if (ret) {
-        assert(local_err);
+    if (local_err) {
         trace_multifd_send_error(p->id);
         multifd_send_terminate_threads(local_err);
         multifd_send_kick_main(p);
-- 
2.41.0



