qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
* [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path.
@ 2024-07-11 22:04 Yichen Wang
  2024-07-11 22:04 ` [PATCH v5 11/13] migration/multifd: Add migration option set packet size Yichen Wang
                   ` (3 more replies)
  0 siblings, 4 replies; 8+ messages in thread
From: Yichen Wang @ 2024-07-11 22:04 UTC (permalink / raw)
  To: Paolo Bonzini, Marc-André Lureau, Daniel P. Berrangé,
	Thomas Huth, Philippe Mathieu-Daudé, Peter Xu, Fabiano Rosas,
	Eric Blake, Markus Armbruster, Michael S. Tsirkin, Cornelia Huck,
	qemu-devel
  Cc: Hao Xiang, Liu, Yuan1, Shivam Kumar, Ho-Ren (Jack) Chuang,
	Yichen Wang

From: Hao Xiang <hao.xiang@linux.dev>

Multifd sender path gets an array of pages queued by the migration
thread. It performs zero page checking on every page in the array.
The pages are classfied as either a zero page or a normal page. This
change uses Intel DSA to offload the zero page checking from CPU to
the DSA accelerator. The sender thread submits a batch of pages to DSA
hardware and waits for the DSA completion thread to signal for work
completion.

Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
---
 include/qemu/dsa.h            |   4 +-
 migration/migration.c         |   2 +-
 migration/multifd-zero-page.c | 100 ++++++++++++++++++++++++++++++++--
 migration/multifd.c           |  43 ++++++++++++++-
 migration/multifd.h           |   2 +-
 util/dsa.c                    |  23 ++++----
 6 files changed, 150 insertions(+), 24 deletions(-)

diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
index fd0305a7c7..a3b502ee41 100644
--- a/include/qemu/dsa.h
+++ b/include/qemu/dsa.h
@@ -83,7 +83,7 @@ typedef struct QemuDsaBatchTask {
  *
  * @return int Zero if successful, otherwise non zero.
  */
-int qemu_dsa_init(const char *dsa_parameter, Error **errp);
+int qemu_dsa_init(const strList *dsa_parameter, Error **errp);
 
 /**
  * @brief Start logic to enable using DSA.
@@ -146,7 +146,7 @@ static inline bool qemu_dsa_is_running(void)
     return false;
 }
 
-static inline int qemu_dsa_init(const char *dsa_parameter, Error **errp)
+static inline int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
 {
     error_setg(errp, "DSA accelerator is not enabled.");
     return -1;
diff --git a/migration/migration.c b/migration/migration.c
index 3dea06d577..085395b900 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -3469,7 +3469,7 @@ static void *migration_thread(void *opaque)
     object_ref(OBJECT(s));
     update_iteration_initial_status(s);
 
-    if (!multifd_send_setup()) {
+    if (!multifd_send_setup(&local_err)) {
         goto out;
     }
 
diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
index e1b8370f88..ffb5611d44 100644
--- a/migration/multifd-zero-page.c
+++ b/migration/multifd-zero-page.c
@@ -37,25 +37,84 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
 }
 
 /**
- * multifd_send_zero_page_detect: Perform zero page detection on all pages.
+ * zero_page_detect_cpu: Perform zero page detection using CPU.
  *
  * Sorts normal pages before zero pages in p->pages->offset and updates
  * p->pages->normal_num.
  *
  * @param p A pointer to the send params.
  */
-void multifd_send_zero_page_detect(MultiFDSendParams *p)
+static void zero_page_detect_cpu(MultiFDSendParams *p)
 {
     MultiFDPages_t *pages = p->pages;
     RAMBlock *rb = pages->block;
     int i = 0;
     int j = pages->num - 1;
 
-    if (!multifd_zero_page_enabled()) {
-        pages->normal_num = pages->num;
+    /*
+     * Sort the page offset array by moving all normal pages to
+     * the left and all zero pages to the right of the array.
+     */
+    while (i <= j) {
+        uint64_t offset = pages->offset[i];
+
+        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
+            i++;
+            continue;
+        }
+
+        swap_page_offset(pages->offset, i, j);
+        ram_release_page(rb->idstr, offset);
+        j--;
+    }
+
+    pages->normal_num = i;
+}
+
+
+#ifdef CONFIG_DSA_OPT
+
+static void swap_result(bool *results, int a, int b)
+{
+    bool temp;
+
+    if (a == b) {
         return;
     }
 
+    temp = results[a];
+    results[a] = results[b];
+    results[b] = temp;
+}
+
+/**
+ * zero_page_detect_dsa: Perform zero page detection using
+ * Intel Data Streaming Accelerator (DSA).
+ *
+ * Sorts normal pages before zero pages in p->pages->offset and updates
+ * p->pages->normal_num.
+ *
+ * @param p A pointer to the send params.
+ */
+static void zero_page_detect_dsa(MultiFDSendParams *p)
+{
+    MultiFDPages_t *pages = p->pages;
+    RAMBlock *rb = pages->block;
+    bool *results = p->dsa_batch_task->results;
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->dsa_batch_task->addr[i] =
+            (ram_addr_t)(rb->host + p->pages->offset[i]);
+    }
+
+    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
+                                  (const void **)p->dsa_batch_task->addr,
+                                  p->pages->num,
+                                  p->page_size);
+
+    int i = 0;
+    int j = pages->num - 1;
+
     /*
      * Sort the page offset array by moving all normal pages to
      * the left and all zero pages to the right of the array.
@@ -63,11 +122,12 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
     while (i <= j) {
         uint64_t offset = pages->offset[i];
 
-        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
+        if (!results[i]) {
             i++;
             continue;
         }
 
+        swap_result(results, i, j);
         swap_page_offset(pages->offset, i, j);
         ram_release_page(rb->idstr, offset);
         j--;
@@ -76,6 +136,15 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
     pages->normal_num = i;
 }
 
+#else
+
+static void zero_page_detect_dsa(MultiFDSendParams *p)
+{
+    exit(1);
+}
+
+#endif
+
 void multifd_recv_zero_page_process(MultiFDRecvParams *p)
 {
     for (int i = 0; i < p->zero_num; i++) {
@@ -87,3 +156,24 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
         }
     }
 }
+
+/**
+ * multifd_send_zero_page_detect: Perform zero page detection on all pages.
+ *
+ * @param p A pointer to the send params.
+ */
+void multifd_send_zero_page_detect(MultiFDSendParams *p)
+{
+    MultiFDPages_t *pages = p->pages;
+
+    if (!multifd_zero_page_enabled()) {
+        pages->normal_num = pages->num;
+        return;
+    }
+
+    if (qemu_dsa_is_running()) {
+        zero_page_detect_dsa(p);
+    } else {
+        zero_page_detect_cpu(p);
+    }
+}
diff --git a/migration/multifd.c b/migration/multifd.c
index 6f8edd4b6a..014fee757a 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -817,6 +817,32 @@ static void multifd_send_cleanup_state(void)
     multifd_send_state = NULL;
 }
 
+static bool multifd_dsa_setup(MigrationState *s, const char *role, Error **errp)
+{
+    /*
+     * Only setup DSA when needed. Currently, DSA is only used for zero page
+     * detection, which is only needed on sender side.
+     */
+    if (!s ||
+        s->parameters.zero_page_detection != ZERO_PAGE_DETECTION_DSA_ACCEL) {
+        return true;
+    }
+
+    const strList *dsa_parameter = migrate_dsa_accel_path();
+    if (qemu_dsa_init(dsa_parameter, errp)) {
+        error_setg(errp, "multifd: %s failed to initialize DSA.", role);
+        return false;
+    }
+    qemu_dsa_start();
+
+    return true;
+}
+
+static void multifd_dsa_cleanup(void)
+{
+    qemu_dsa_cleanup();
+}
+
 void multifd_send_shutdown(void)
 {
     int i;
@@ -827,6 +853,8 @@ void multifd_send_shutdown(void)
 
     multifd_send_terminate_threads();
 
+    multifd_dsa_cleanup();
+
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
@@ -1156,7 +1184,7 @@ static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
     return true;
 }
 
-bool multifd_send_setup(void)
+bool multifd_send_setup(Error **errp)
 {
     MigrationState *s = migrate_get_current();
     Error *local_err = NULL;
@@ -1169,6 +1197,10 @@ bool multifd_send_setup(void)
         return true;
     }
 
+    if (!multifd_dsa_setup(s, "Sender", errp)) {
+        return false;
+    }
+
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
@@ -1395,6 +1427,7 @@ void multifd_recv_cleanup(void)
             qemu_thread_join(&p->thread);
         }
     }
+    multifd_dsa_cleanup();
     for (i = 0; i < migrate_multifd_channels(); i++) {
         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
     }
@@ -1570,6 +1603,7 @@ int multifd_recv_setup(Error **errp)
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     bool use_packets = multifd_use_packets();
     uint8_t i;
+    int ret;
 
     /*
      * Return successfully if multiFD recv state is already initialised
@@ -1579,6 +1613,10 @@ int multifd_recv_setup(Error **errp)
         return 0;
     }
 
+    if (!multifd_dsa_setup(NULL, "Receiver", errp)) {
+        return -1;
+    }
+
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
@@ -1617,13 +1655,12 @@ int multifd_recv_setup(Error **errp)
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
-        int ret;
-
         ret = multifd_recv_state->ops->recv_setup(p, errp);
         if (ret) {
             return ret;
         }
     }
+
     return 0;
 }
 
diff --git a/migration/multifd.h b/migration/multifd.h
index 027f57bf4e..871e3aa063 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -18,7 +18,7 @@
 
 typedef struct MultiFDRecvData MultiFDRecvData;
 
-bool multifd_send_setup(void);
+bool multifd_send_setup(Error **errp);
 void multifd_send_shutdown(void);
 void multifd_send_channel_created(void);
 int multifd_recv_setup(Error **errp);
diff --git a/util/dsa.c b/util/dsa.c
index 5aba1ae23a..44b1130a51 100644
--- a/util/dsa.c
+++ b/util/dsa.c
@@ -116,27 +116,27 @@ dsa_device_cleanup(QemuDsaDevice *instance)
  */
 static int
 dsa_device_group_init(QemuDsaDeviceGroup *group,
-                      const char *dsa_parameter,
+                      const strList *dsa_parameter,
                       Error **errp)
 {
-    if (dsa_parameter == NULL || strlen(dsa_parameter) == 0) {
-        return 0;
+    if (dsa_parameter == NULL) {
+        /* HACKING ALERT. */
+        /* return 0; */
+        dsa_parameter = &(strList) {
+            .value = (char *)"/dev/dsa/wq0.0", .next = NULL
+        };
     }
 
     int ret = 0;
-    char *local_dsa_parameter = g_strdup(dsa_parameter);
     const char *dsa_path[MAX_DSA_DEVICES];
     int num_dsa_devices = 0;
-    char delim[2] = " ";
 
-    char *current_dsa_path = strtok(local_dsa_parameter, delim);
-
-    while (current_dsa_path != NULL) {
-        dsa_path[num_dsa_devices++] = current_dsa_path;
+    while (dsa_parameter) {
+        dsa_path[num_dsa_devices++] = dsa_parameter->value;
         if (num_dsa_devices == MAX_DSA_DEVICES) {
             break;
         }
-        current_dsa_path = strtok(NULL, delim);
+        dsa_parameter = dsa_parameter->next;
     }
 
     group->dsa_devices =
@@ -161,7 +161,6 @@ dsa_device_group_init(QemuDsaDeviceGroup *group,
     }
 
 exit:
-    g_free(local_dsa_parameter);
     return ret;
 }
 
@@ -718,7 +717,7 @@ dsa_globals_init(void)
  *
  * @return int Zero if successful, otherwise non zero.
  */
-int qemu_dsa_init(const char *dsa_parameter, Error **errp)
+int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
 {
     dsa_globals_init();
 
-- 
Yichen Wang



^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v5 11/13] migration/multifd: Add migration option set packet size.
  2024-07-11 22:04 [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Yichen Wang
@ 2024-07-11 22:04 ` Yichen Wang
  2024-07-17 14:59   ` Fabiano Rosas
  2024-07-11 22:04 ` [PATCH v5 12/13] util/dsa: Add unit test coverage for Intel DSA task submission and completion Yichen Wang
                   ` (2 subsequent siblings)
  3 siblings, 1 reply; 8+ messages in thread
From: Yichen Wang @ 2024-07-11 22:04 UTC (permalink / raw)
  To: Paolo Bonzini, Marc-André Lureau, Daniel P. Berrangé,
	Thomas Huth, Philippe Mathieu-Daudé, Peter Xu, Fabiano Rosas,
	Eric Blake, Markus Armbruster, Michael S. Tsirkin, Cornelia Huck,
	qemu-devel
  Cc: Hao Xiang, Liu, Yuan1, Shivam Kumar, Ho-Ren (Jack) Chuang,
	Yichen Wang

From: Hao Xiang <hao.xiang@linux.dev>

During live migration, if the latency between sender and receiver is
high and bandwidth is also high (a long and fat pipe), using a bigger
packet size can help reduce migration total time. The current multifd
packet size is 128 * 4kb. In addition, Intel DSA offloading performs
better with a large batch task.

This change adds an option to set the packet size, which is also useful
for performance tunin. Both sender and receiver needs to set the same
packet size for things to work.

Set the option:
migrate_set_parameter multifd-packet-size 4190208

Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
---
 migration/migration-hmp-cmds.c |  7 +++++++
 migration/multifd-zlib.c       |  6 ++++--
 migration/multifd-zstd.c       |  6 ++++--
 migration/multifd.c            |  6 ++++--
 migration/multifd.h            |  3 ---
 migration/options.c            | 38 ++++++++++++++++++++++++++++++++++
 migration/options.h            |  1 +
 qapi/migration.json            | 21 ++++++++++++++++---
 8 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/migration/migration-hmp-cmds.c b/migration/migration-hmp-cmds.c
index c422db4ecd..27ba0ce79a 100644
--- a/migration/migration-hmp-cmds.c
+++ b/migration/migration-hmp-cmds.c
@@ -292,6 +292,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
         monitor_printf(mon, "%s: %u ms\n",
             MigrationParameter_str(MIGRATION_PARAMETER_X_CHECKPOINT_DELAY),
             params->x_checkpoint_delay);
+        monitor_printf(mon, "%s: %" PRIu64 "\n",
+            MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_PACKET_SIZE),
+            params->multifd_packet_size);
         monitor_printf(mon, "%s: %u\n",
             MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_CHANNELS),
             params->multifd_channels);
@@ -576,6 +579,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
         p->has_dsa_accel_path = true;
         visit_type_strList(v, param, &p->dsa_accel_path, &err);
         break;
+    case MIGRATION_PARAMETER_MULTIFD_PACKET_SIZE:
+        p->has_multifd_packet_size = true;
+        visit_type_size(v, param, &p->multifd_packet_size, &err);
+        break;
     case MIGRATION_PARAMETER_MULTIFD_CHANNELS:
         p->has_multifd_channels = true;
         visit_type_uint8(v, param, &p->multifd_channels, &err);
diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 2ced69487e..bd900fe575 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -49,6 +49,7 @@ static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
     struct zlib_data *z = g_new0(struct zlib_data, 1);
     z_stream *zs = &z->zs;
     const char *err_msg;
+    uint64_t multifd_packet_size = migrate_multifd_packet_size();
 
     zs->zalloc = Z_NULL;
     zs->zfree = Z_NULL;
@@ -58,7 +59,7 @@ static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
         goto err_free_z;
     }
     /* This is the maximum size of the compressed buffer */
-    z->zbuff_len = compressBound(MULTIFD_PACKET_SIZE);
+    z->zbuff_len = compressBound(multifd_packet_size);
     z->zbuff = g_try_malloc(z->zbuff_len);
     if (!z->zbuff) {
         err_msg = "out of memory for zbuff";
@@ -200,6 +201,7 @@ out:
  */
 static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
 {
+    uint64_t multifd_packet_size = migrate_multifd_packet_size();
     struct zlib_data *z = g_new0(struct zlib_data, 1);
     z_stream *zs = &z->zs;
 
@@ -214,7 +216,7 @@ static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
         return -1;
     }
     /* To be safe, we reserve twice the size of the packet */
-    z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
+    z->zbuff_len = multifd_packet_size * 2;
     z->zbuff = g_try_malloc(z->zbuff_len);
     if (!z->zbuff) {
         inflateEnd(zs);
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index ca17b7e310..8aaa7363be 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -49,6 +49,7 @@ struct zstd_data {
  */
 static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
 {
+    uint64_t multifd_packet_size = migrate_multifd_packet_size();
     struct zstd_data *z = g_new0(struct zstd_data, 1);
     int res;
 
@@ -68,7 +69,7 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
         return -1;
     }
     /* This is the maximum size of the compressed buffer */
-    z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE);
+    z->zbuff_len = ZSTD_compressBound(multifd_packet_size);
     z->zbuff = g_try_malloc(z->zbuff_len);
     if (!z->zbuff) {
         ZSTD_freeCStream(z->zcs);
@@ -188,6 +189,7 @@ out:
  */
 static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
 {
+    uint64_t multifd_packet_size = migrate_multifd_packet_size();
     struct zstd_data *z = g_new0(struct zstd_data, 1);
     int ret;
 
@@ -209,7 +211,7 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
     }
 
     /* To be safe, we reserve twice the size of the packet */
-    z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
+    z->zbuff_len = multifd_packet_size * 2;
     z->zbuff = g_try_malloc(z->zbuff_len);
     if (!z->zbuff) {
         ZSTD_freeDStream(z->zds);
diff --git a/migration/multifd.c b/migration/multifd.c
index 014fee757a..87ed421364 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -1189,7 +1189,8 @@ bool multifd_send_setup(Error **errp)
     MigrationState *s = migrate_get_current();
     Error *local_err = NULL;
     int thread_count, ret = 0;
-    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    uint32_t page_count =
+        migrate_multifd_packet_size() / qemu_target_page_size();
     bool use_packets = multifd_use_packets();
     uint8_t i;
 
@@ -1600,7 +1601,8 @@ static void *multifd_recv_thread(void *opaque)
 int multifd_recv_setup(Error **errp)
 {
     int thread_count;
-    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    uint32_t page_count =
+        migrate_multifd_packet_size() / qemu_target_page_size();
     bool use_packets = multifd_use_packets();
     uint8_t i;
     int ret;
diff --git a/migration/multifd.h b/migration/multifd.h
index 871e3aa063..63cec33c61 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -44,9 +44,6 @@ MultiFDRecvData *multifd_get_recv_data(void);
 #define MULTIFD_FLAG_QPL (4 << 1)
 #define MULTIFD_FLAG_UADK (8 << 1)
 
-/* This value needs to be a multiple of qemu_target_page_size() */
-#define MULTIFD_PACKET_SIZE (512 * 1024)
-
 typedef struct {
     uint32_t magic;
     uint32_t version;
diff --git a/migration/options.c b/migration/options.c
index f839493016..1417fa6ab0 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -73,6 +73,12 @@
 #define DEFAULT_MIGRATE_ANNOUNCE_ROUNDS    5
 #define DEFAULT_MIGRATE_ANNOUNCE_STEP    100
 
+/*
+ * Parameter for multifd packet size.
+ */
+#define DEFAULT_MIGRATE_MULTIFD_PACKET_SIZE (128 * 4 * 1024)
+#define MAX_MIGRATE_MULTIFD_PACKET_SIZE (1023 * 4 * 1024)
+
 #define DEFINE_PROP_MIG_CAP(name, x)             \
     DEFINE_PROP_BOOL(name, MigrationState, capabilities[x], false)
 
@@ -167,6 +173,9 @@ Property migration_properties[] = {
     /*                    parameters.dsa_accel_path, qdev_prop_string, char *), */
     /* DEFINE_PROP_STRING("dsa-accel-path", MigrationState, */
     /*                    parameters.dsa_accel_path), */
+    DEFINE_PROP_SIZE("multifd-packet-size", MigrationState,
+                     parameters.multifd_packet_size,
+                     DEFAULT_MIGRATE_MULTIFD_PACKET_SIZE),
 
     /* Migration capabilities */
     DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
@@ -777,6 +786,13 @@ int migrate_multifd_channels(void)
     return s->parameters.multifd_channels;
 }
 
+uint64_t migrate_multifd_packet_size(void)
+{
+    MigrationState *s = migrate_get_current();
+
+    return s->parameters.multifd_packet_size;
+}
+
 MultiFDCompression migrate_multifd_compression(void)
 {
     MigrationState *s = migrate_get_current();
@@ -898,6 +914,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
     params->downtime_limit = s->parameters.downtime_limit;
     params->has_x_checkpoint_delay = true;
     params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
+    params->has_multifd_packet_size = true;
+    params->multifd_packet_size = s->parameters.multifd_packet_size;
     params->has_multifd_channels = true;
     params->multifd_channels = s->parameters.multifd_channels;
     params->has_multifd_compression = true;
@@ -957,6 +975,7 @@ void migrate_params_init(MigrationParameters *params)
     params->has_max_bandwidth = true;
     params->has_downtime_limit = true;
     params->has_x_checkpoint_delay = true;
+    params->has_multifd_packet_size = true;
     params->has_multifd_channels = true;
     params->has_multifd_compression = true;
     params->has_multifd_zlib_level = true;
@@ -1038,6 +1057,19 @@ bool migrate_params_check(MigrationParameters *params, Error **errp)
 
     /* x_checkpoint_delay is now always positive */
 
+    if (params->has_multifd_packet_size &&
+        ((params->multifd_packet_size < DEFAULT_MIGRATE_MULTIFD_PACKET_SIZE) ||
+            (params->multifd_packet_size >  MAX_MIGRATE_MULTIFD_PACKET_SIZE) ||
+            (params->multifd_packet_size % qemu_target_page_size() != 0))) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+                    "multifd_packet_size",
+                    "an integer in the range of "
+                    stringify(DEFAULT_MIGRATE_MULTIFD_PACKET_SIZE)
+                    " to "stringify(MAX_MIGRATE_MULTIFD_PACKET_SIZE)", "
+                    "and must be a multiple of guest VM's page size.");
+        return false;
+    }
+
     if (params->has_multifd_channels && (params->multifd_channels < 1)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                    "multifd_channels",
@@ -1219,6 +1251,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
         dest->x_checkpoint_delay = params->x_checkpoint_delay;
     }
 
+    if (params->has_multifd_packet_size) {
+        dest->multifd_packet_size = params->multifd_packet_size;
+    }
     if (params->has_multifd_channels) {
         dest->multifd_channels = params->multifd_channels;
     }
@@ -1344,6 +1379,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
         colo_checkpoint_delay_set();
     }
 
+    if (params->has_multifd_packet_size) {
+        s->parameters.multifd_packet_size = params->multifd_packet_size;
+    }
     if (params->has_multifd_channels) {
         s->parameters.multifd_channels = params->multifd_channels;
     }
diff --git a/migration/options.h b/migration/options.h
index 78b9e4080b..b37cffc887 100644
--- a/migration/options.h
+++ b/migration/options.h
@@ -86,6 +86,7 @@ const char *migrate_tls_hostname(void);
 uint64_t migrate_xbzrle_cache_size(void);
 ZeroPageDetection migrate_zero_page_detection(void);
 const strList *migrate_dsa_accel_path(void);
+uint64_t migrate_multifd_packet_size(void);
 
 /* parameters helpers */
 
diff --git a/qapi/migration.json b/qapi/migration.json
index ff41780347..1a9dc5d74c 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -839,6 +839,10 @@
 #     only has effect if the @mapped-ram capability is enabled.
 #     (Since 9.1)
 #
+# @multifd-packet-size: Packet size in bytes used to migrate data.
+#     The value needs to be a multiple of guest page size.
+#     The default value is 524288 and max value is 4190208. (Since 9.2)
+#
 # Features:
 #
 # @unstable: Members @x-checkpoint-delay and
@@ -864,7 +868,8 @@
            'vcpu-dirty-limit',
            'mode',
            'zero-page-detection',
-           'direct-io'] }
+           'direct-io',
+           'multifd-packet-size'] }
 
 ##
 # @MigrateSetParameters:
@@ -1020,6 +1025,10 @@
 #     only has effect if the @mapped-ram capability is enabled.
 #     (Since 9.1)
 #
+# @multifd-packet-size: Packet size in bytes used to migrate data.
+#     The value needs to be a multiple of guest page size.
+#     The default value is 524288 and max value is 4190208. (Since 9.2)
+#
 # Features:
 #
 # @unstable: Members @x-checkpoint-delay and
@@ -1061,7 +1070,8 @@
             '*mode': 'MigMode',
             '*zero-page-detection': 'ZeroPageDetection',
             '*direct-io': 'bool',
-            '*dsa-accel-path': ['str'] } }
+            '*dsa-accel-path': ['str'],
+            '*multifd-packet-size' : 'uint64'} }
 
 ##
 # @migrate-set-parameters:
@@ -1231,6 +1241,10 @@
 #     only has effect if the @mapped-ram capability is enabled.
 #     (Since 9.1)
 #
+# @multifd-packet-size: Packet size in bytes used to migrate data.
+#     The value needs to be a multiple of guest page size.
+#     The default value is 524288 and max value is 4190208. (Since 9.2)
+#
 # Features:
 #
 # @unstable: Members @x-checkpoint-delay and
@@ -1269,7 +1283,8 @@
             '*mode': 'MigMode',
             '*zero-page-detection': 'ZeroPageDetection',
             '*direct-io': 'bool',
-            '*dsa-accel-path': ['str'] } }
+            '*dsa-accel-path': ['str'],
+            '*multifd-packet-size': 'uint64'} }
 
 ##
 # @query-migrate-parameters:
-- 
Yichen Wang



^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v5 12/13] util/dsa: Add unit test coverage for Intel DSA task submission and completion.
  2024-07-11 22:04 [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Yichen Wang
  2024-07-11 22:04 ` [PATCH v5 11/13] migration/multifd: Add migration option set packet size Yichen Wang
@ 2024-07-11 22:04 ` Yichen Wang
  2024-07-11 22:04 ` [PATCH v5 13/13] migration/multifd: Add integration tests for multifd with Intel DSA offloading Yichen Wang
  2024-07-17 14:41 ` [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Fabiano Rosas
  3 siblings, 0 replies; 8+ messages in thread
From: Yichen Wang @ 2024-07-11 22:04 UTC (permalink / raw)
  To: Paolo Bonzini, Marc-André Lureau, Daniel P. Berrangé,
	Thomas Huth, Philippe Mathieu-Daudé, Peter Xu, Fabiano Rosas,
	Eric Blake, Markus Armbruster, Michael S. Tsirkin, Cornelia Huck,
	qemu-devel
  Cc: Hao Xiang, Liu, Yuan1, Shivam Kumar, Ho-Ren (Jack) Chuang,
	Yichen Wang, Bryan Zhang

From: Hao Xiang <hao.xiang@linux.dev>

* Test DSA start and stop path.
* Test DSA configure and cleanup path.
* Test DSA task submission and completion path.

Signed-off-by: Bryan Zhang <bryan.zhang@bytedance.com>
Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
---
 tests/unit/meson.build |   6 +
 tests/unit/test-dsa.c  | 503 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 509 insertions(+)
 create mode 100644 tests/unit/test-dsa.c

diff --git a/tests/unit/meson.build b/tests/unit/meson.build
index 26c109c968..1d4d48898b 100644
--- a/tests/unit/meson.build
+++ b/tests/unit/meson.build
@@ -49,6 +49,12 @@ tests = {
   'test-interval-tree': [],
 }
 
+if config_host_data.get('CONFIG_DSA_OPT')
+  tests += {
+    'test-dsa': [],
+  }
+endif
+
 if have_system or have_tools
   tests += {
     'test-qmp-event': [testqapi],
diff --git a/tests/unit/test-dsa.c b/tests/unit/test-dsa.c
new file mode 100644
index 0000000000..181a547528
--- /dev/null
+++ b/tests/unit/test-dsa.c
@@ -0,0 +1,503 @@
+/*
+ * Test DSA functions.
+ *
+ * Copyright (C) Bytedance Ltd.
+ *
+ * Authors:
+ *  Hao Xiang <hao.xiang@bytedance.com>
+ *  Bryan Zhang <bryan.zhang@bytedance.com>
+ *  Yichen Wang <yichen.wang@bytedance.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/host-utils.h"
+
+#include "qemu/cutils.h"
+#include "qemu/memalign.h"
+#include "qemu/dsa.h"
+
+/*
+ * TODO Communicate that DSA must be configured to support this batch size.
+ * TODO Alternatively, poke the DSA device to figure out batch size.
+ */
+#define batch_size 128
+#define page_size 4096
+
+#define oversized_batch_size (batch_size + 1)
+#define num_devices 2
+#define max_buffer_size (64 * 1024)
+
+/* TODO Make these not-hardcoded. */
+static const strList path1[] = {
+    {.value = (char *)"/dev/dsa/wq4.0", .next = NULL}
+};
+static const strList path2[] = {
+    {.value = (char *)"/dev/dsa/wq4.0", .next = (strList*)&path2[1]},
+    {.value = (char *)"/dev/dsa/wq4.1", .next = NULL}
+};
+
+static Error **errp;
+
+static QemuDsaBatchTask *task;
+
+/* A helper for running a single task and checking for correctness. */
+static void do_single_task(void)
+{
+    task = buffer_zero_batch_task_init(batch_size);
+    char buf[page_size];
+    char *ptr = buf;
+
+    buffer_is_zero_dsa_batch_sync(task,
+                                  (const void **)&ptr,
+                                  1,
+                                  page_size);
+    g_assert(task->results[0] == buffer_is_zero(buf, page_size));
+
+    buffer_zero_batch_task_destroy(task);
+}
+
+static void test_single_zero(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+    qemu_dsa_start();
+
+    task = buffer_zero_batch_task_init(batch_size);
+
+    char buf[page_size];
+    char *ptr = buf;
+
+    memset(buf, 0x0, page_size);
+    buffer_is_zero_dsa_batch_sync(task,
+                                  (const void **)&ptr,
+                                  1, page_size);
+    g_assert(task->results[0]);
+
+    buffer_zero_batch_task_destroy(task);
+
+    qemu_dsa_cleanup();
+}
+
+static void test_single_zero_async(void)
+{
+    test_single_zero();
+}
+
+static void test_single_nonzero(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+    qemu_dsa_start();
+
+    task = buffer_zero_batch_task_init(batch_size);
+
+    char buf[page_size];
+    char *ptr = buf;
+
+    memset(buf, 0x1, page_size);
+    buffer_is_zero_dsa_batch_sync(task,
+                                  (const void **)&ptr,
+                                  1, page_size);
+    g_assert(!task->results[0]);
+
+    buffer_zero_batch_task_destroy(task);
+
+    qemu_dsa_cleanup();
+}
+
+static void test_single_nonzero_async(void)
+{
+    test_single_nonzero();
+}
+
+/* count == 0 should return quickly without calling into DSA. */
+static void test_zero_count_async(void)
+{
+    char buf[page_size];
+    buffer_is_zero_dsa_batch_sync(task,
+                                  (const void **)&buf,
+                                  0,
+                                  page_size);
+}
+
+static void test_null_task_async(void)
+{
+    if (g_test_subprocess()) {
+        g_assert(!qemu_dsa_init(path1, errp));
+
+        char buf[page_size * batch_size];
+        char *addrs[batch_size];
+        for (int i = 0; i < batch_size; i++) {
+            addrs[i] = buf + (page_size * i);
+        }
+
+        buffer_is_zero_dsa_batch_sync(NULL, (const void **)addrs,
+                                      batch_size,
+                                      page_size);
+    } else {
+        g_test_trap_subprocess(NULL, 0, 0);
+        g_test_trap_assert_failed();
+    }
+}
+
+static void test_oversized_batch(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+    qemu_dsa_start();
+
+    task = buffer_zero_batch_task_init(batch_size);
+
+    char buf[page_size * oversized_batch_size];
+    char *addrs[batch_size];
+    for (int i = 0; i < oversized_batch_size; i++) {
+        addrs[i] = buf + (page_size * i);
+    }
+
+    int ret = buffer_is_zero_dsa_batch_sync(task,
+                                            (const void **)addrs,
+                                            oversized_batch_size,
+                                            page_size);
+    g_assert(ret != 0);
+
+    buffer_zero_batch_task_destroy(task);
+
+    qemu_dsa_cleanup();
+}
+
+static void test_oversized_batch_async(void)
+{
+    test_oversized_batch();
+}
+
+static void test_zero_len_async(void)
+{
+    if (g_test_subprocess()) {
+        g_assert(!qemu_dsa_init(path1, errp));
+
+        task = buffer_zero_batch_task_init(batch_size);
+
+        char buf[page_size];
+
+        buffer_is_zero_dsa_batch_sync(task,
+                                      (const void **)&buf,
+                                      1,
+                                      0);
+
+        buffer_zero_batch_task_destroy(task);
+    } else {
+        g_test_trap_subprocess(NULL, 0, 0);
+        g_test_trap_assert_failed();
+    }
+}
+
+static void test_null_buf_async(void)
+{
+    if (g_test_subprocess()) {
+        g_assert(!qemu_dsa_init(path1, errp));
+
+        task = buffer_zero_batch_task_init(batch_size);
+
+        buffer_is_zero_dsa_batch_sync(task, NULL, 1, page_size);
+
+        buffer_zero_batch_task_destroy(task);
+    } else {
+        g_test_trap_subprocess(NULL, 0, 0);
+        g_test_trap_assert_failed();
+    }
+}
+
+static void test_batch(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+    qemu_dsa_start();
+
+    task = buffer_zero_batch_task_init(batch_size);
+
+    char buf[page_size * batch_size];
+    char *addrs[batch_size];
+    for (int i = 0; i < batch_size; i++) {
+        addrs[i] = buf + (page_size * i);
+    }
+
+    /*
+     * Using whatever is on the stack is somewhat random.
+     * Manually set some pages to zero and some to nonzero.
+     */
+    memset(buf + 0, 0, page_size * 10);
+    memset(buf + (10 * page_size), 0xff, page_size * 10);
+
+    buffer_is_zero_dsa_batch_sync(task,
+                                  (const void **)addrs,
+                                  batch_size,
+                                  page_size);
+
+    bool is_zero;
+    for (int i = 0; i < batch_size; i++) {
+        is_zero = buffer_is_zero((const void *)&buf[page_size * i], page_size);
+        g_assert(task->results[i] == is_zero);
+    }
+
+    buffer_zero_batch_task_destroy(task);
+
+    qemu_dsa_cleanup();
+}
+
+static void test_batch_async(void)
+{
+    test_batch();
+}
+
+static void test_page_fault(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+    qemu_dsa_start();
+
+    char *buf[2];
+    int prot = PROT_READ | PROT_WRITE;
+    int flags = MAP_SHARED | MAP_ANON;
+    buf[0] = (char *)mmap(NULL, page_size * batch_size, prot, flags, -1, 0);
+    assert(buf[0] != MAP_FAILED);
+    buf[1] = (char *)malloc(page_size * batch_size);
+    assert(buf[1] != NULL);
+
+    for (int j = 0; j < 2; j++) {
+        task = buffer_zero_batch_task_init(batch_size);
+
+        char *addrs[batch_size];
+        for (int i = 0; i < batch_size; i++) {
+            addrs[i] = buf[j] + (page_size * i);
+        }
+
+        buffer_is_zero_dsa_batch_sync(task,
+                                      (const void **)addrs,
+                                      batch_size,
+                                      page_size);
+
+        bool is_zero;
+        for (int i = 0; i < batch_size; i++) {
+            is_zero = buffer_is_zero((const void *)&buf[j][page_size * i],
+                                      page_size);
+            g_assert(task->results[i] == is_zero);
+        }
+        buffer_zero_batch_task_destroy(task);
+    }
+
+    assert(!munmap(buf[0], page_size * batch_size));
+    free(buf[1]);
+    qemu_dsa_cleanup();
+}
+
+static void test_various_buffer_sizes(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+    qemu_dsa_start();
+
+    char *buf = malloc(max_buffer_size * batch_size);
+    char *addrs[batch_size];
+
+    for (int len = 16; len <= max_buffer_size; len *= 2) {
+        task = buffer_zero_batch_task_init(batch_size);
+
+        for (int i = 0; i < batch_size; i++) {
+            addrs[i] = buf + (len * i);
+        }
+
+        buffer_is_zero_dsa_batch_sync(task,
+                                      (const void **)addrs,
+                                      batch_size,
+                                      len);
+
+        bool is_zero;
+        for (int j = 0; j < batch_size; j++) {
+            is_zero = buffer_is_zero((const void *)&buf[len * j], len);
+            g_assert(task->results[j] == is_zero);
+        }
+
+        buffer_zero_batch_task_destroy(task);
+    }
+
+    free(buf);
+
+    qemu_dsa_cleanup();
+}
+
+static void test_various_buffer_sizes_async(void)
+{
+    test_various_buffer_sizes();
+}
+
+static void test_double_start_stop(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+    /* Double start */
+    qemu_dsa_start();
+    qemu_dsa_start();
+    g_assert(qemu_dsa_is_running());
+    do_single_task();
+
+    /* Double stop */
+    qemu_dsa_stop();
+    g_assert(!qemu_dsa_is_running());
+    qemu_dsa_stop();
+    g_assert(!qemu_dsa_is_running());
+
+    /* Restart */
+    qemu_dsa_start();
+    g_assert(qemu_dsa_is_running());
+    do_single_task();
+    qemu_dsa_cleanup();
+}
+
+static void test_is_running(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+
+    g_assert(!qemu_dsa_is_running());
+    qemu_dsa_start();
+    g_assert(qemu_dsa_is_running());
+    qemu_dsa_stop();
+    g_assert(!qemu_dsa_is_running());
+    qemu_dsa_cleanup();
+}
+
+static void test_multiple_engines(void)
+{
+    g_assert(!qemu_dsa_init(path2, errp));
+    qemu_dsa_start();
+
+    QemuDsaBatchTask *tasks[num_devices];
+    char bufs[num_devices][page_size * batch_size];
+    char *addrs[num_devices][batch_size];
+
+    /*
+     *  This is a somewhat implementation-specific way
+     *  of testing that the tasks have unique engines
+     *  assigned to them.
+     */
+    tasks[0] = buffer_zero_batch_task_init(batch_size);
+    tasks[1] = buffer_zero_batch_task_init(batch_size);
+    g_assert(tasks[0]->device != tasks[1]->device);
+
+    for (int i = 0; i < num_devices; i++) {
+        for (int j = 0; j < batch_size; j++) {
+            addrs[i][j] = bufs[i] + (page_size * j);
+        }
+
+        buffer_is_zero_dsa_batch_sync(tasks[i],
+                                      (const void **)addrs[i],
+                                      batch_size, page_size);
+
+        bool is_zero;
+        for (int j = 0; j < batch_size; j++) {
+            is_zero = buffer_is_zero((const void *)&bufs[i][page_size * j],
+                                     page_size);
+            g_assert(tasks[i]->results[j] == is_zero);
+        }
+    }
+
+    buffer_zero_batch_task_destroy(tasks[0]);
+    buffer_zero_batch_task_destroy(tasks[1]);
+
+    qemu_dsa_cleanup();
+}
+
+static void test_configure_dsa_twice(void)
+{
+    g_assert(!qemu_dsa_init(path2, errp));
+    g_assert(!qemu_dsa_init(path2, errp));
+    qemu_dsa_start();
+    do_single_task();
+    qemu_dsa_cleanup();
+}
+
+static void test_configure_dsa_bad_path(void)
+{
+    const strList *bad_path = &(strList) {
+        .value = (char *)"/not/a/real/path", .next = NULL
+    };
+    g_assert(qemu_dsa_init(bad_path, errp));
+}
+
+static void test_cleanup_before_configure(void)
+{
+    qemu_dsa_cleanup();
+    g_assert(!qemu_dsa_init(path2, errp));
+}
+
+static void test_configure_dsa_num_devices(void)
+{
+    g_assert(!qemu_dsa_init(path1, errp));
+    qemu_dsa_start();
+
+    do_single_task();
+    qemu_dsa_stop();
+    qemu_dsa_cleanup();
+}
+
+static void test_cleanup_twice(void)
+{
+    g_assert(!qemu_dsa_init(path2, errp));
+    qemu_dsa_cleanup();
+    qemu_dsa_cleanup();
+
+    g_assert(!qemu_dsa_init(path2, errp));
+    qemu_dsa_start();
+    do_single_task();
+    qemu_dsa_cleanup();
+}
+
+static int check_test_setup(void)
+{
+    const strList *path[2] = {path1, path2};
+    for (int i = 0; i < sizeof(path) / sizeof(strList *); i++) {
+        if (qemu_dsa_init(path[i], errp)) {
+            return -1;
+        }
+        qemu_dsa_cleanup();
+    }
+    return 0;
+}
+
+int main(int argc, char **argv)
+{
+    g_test_init(&argc, &argv, NULL);
+
+    if (check_test_setup() != 0) {
+        /*
+         * This test requires extra setup. The current
+         * setup is not correct. Just skip this test
+         * for now.
+         */
+        exit(0);
+    }
+
+    if (num_devices > 1) {
+        g_test_add_func("/dsa/multiple_engines", test_multiple_engines);
+    }
+
+    g_test_add_func("/dsa/async/batch", test_batch_async);
+    g_test_add_func("/dsa/async/various_buffer_sizes",
+                    test_various_buffer_sizes_async);
+    g_test_add_func("/dsa/async/null_buf", test_null_buf_async);
+    g_test_add_func("/dsa/async/zero_len", test_zero_len_async);
+    g_test_add_func("/dsa/async/oversized_batch", test_oversized_batch_async);
+    g_test_add_func("/dsa/async/zero_count", test_zero_count_async);
+    g_test_add_func("/dsa/async/single_zero", test_single_zero_async);
+    g_test_add_func("/dsa/async/single_nonzero", test_single_nonzero_async);
+    g_test_add_func("/dsa/async/null_task", test_null_task_async);
+    g_test_add_func("/dsa/async/page_fault", test_page_fault);
+
+    g_test_add_func("/dsa/double_start_stop", test_double_start_stop);
+    g_test_add_func("/dsa/is_running", test_is_running);
+
+    g_test_add_func("/dsa/configure_dsa_twice", test_configure_dsa_twice);
+    g_test_add_func("/dsa/configure_dsa_bad_path", test_configure_dsa_bad_path);
+    g_test_add_func("/dsa/cleanup_before_configure",
+                    test_cleanup_before_configure);
+    g_test_add_func("/dsa/configure_dsa_num_devices",
+                    test_configure_dsa_num_devices);
+    g_test_add_func("/dsa/cleanup_twice", test_cleanup_twice);
+
+    return g_test_run();
+}
-- 
Yichen Wang



^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH v5 13/13] migration/multifd: Add integration tests for multifd with Intel DSA offloading.
  2024-07-11 22:04 [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Yichen Wang
  2024-07-11 22:04 ` [PATCH v5 11/13] migration/multifd: Add migration option set packet size Yichen Wang
  2024-07-11 22:04 ` [PATCH v5 12/13] util/dsa: Add unit test coverage for Intel DSA task submission and completion Yichen Wang
@ 2024-07-11 22:04 ` Yichen Wang
  2024-07-17 14:41 ` [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Fabiano Rosas
  3 siblings, 0 replies; 8+ messages in thread
From: Yichen Wang @ 2024-07-11 22:04 UTC (permalink / raw)
  To: Paolo Bonzini, Marc-André Lureau, Daniel P. Berrangé,
	Thomas Huth, Philippe Mathieu-Daudé, Peter Xu, Fabiano Rosas,
	Eric Blake, Markus Armbruster, Michael S. Tsirkin, Cornelia Huck,
	qemu-devel
  Cc: Hao Xiang, Liu, Yuan1, Shivam Kumar, Ho-Ren (Jack) Chuang,
	Yichen Wang, Bryan Zhang

From: Hao Xiang <hao.xiang@linux.dev>

* Add test case to start and complete multifd live migration with DSA
offloading enabled.
* Add test case to start and cancel multifd live migration with DSA
offloading enabled.

Signed-off-by: Bryan Zhang <bryan.zhang@bytedance.com>
Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
---
 tests/qtest/migration-test.c | 80 +++++++++++++++++++++++++++++++++++-
 1 file changed, 79 insertions(+), 1 deletion(-)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 70b606b888..67cd976705 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -608,6 +608,13 @@ typedef struct {
     bool suspend_me;
 } MigrateStart;
 
+/*
+ * It requires separate steps to configure and enable DSA device.
+ * This test assumes that the configuration is done already.
+ */
+static const char *dsa_dev_path_p = "['/dev/dsa/wq4.0']";
+static const char *dsa_dev_path = "/dev/dsa/wq4.0";
+
 /*
  * A hook that runs after the src and dst QEMUs have been
  * created, but before the migration is started. This can
@@ -3279,7 +3286,7 @@ static void test_multifd_tcp_tls_x509_reject_anon_client(void)
  *
  *  And see that it works
  */
-static void test_multifd_tcp_cancel(void)
+static void test_multifd_tcp_cancel_common(bool use_dsa)
 {
     MigrateStart args = {
         .hide_stderr = true,
@@ -3299,6 +3306,11 @@ static void test_multifd_tcp_cancel(void)
     migrate_set_capability(from, "multifd", true);
     migrate_set_capability(to, "multifd", true);
 
+    if (use_dsa) {
+        migrate_set_parameter_str(from, "zero-page-detection", "dsa-accel");
+        migrate_set_parameter_str(from, "dsa-accel-path", dsa_dev_path_p);
+    }
+
     /* Start incoming migration from the 1st socket */
     migrate_incoming_qmp(to, "tcp:127.0.0.1:0", "{}");
 
@@ -3348,6 +3360,49 @@ static void test_multifd_tcp_cancel(void)
     test_migrate_end(from, to2, true);
 }
 
+/*
+ * This test does:
+ *  source               target
+ *                       migrate_incoming
+ *     migrate
+ *     migrate_cancel
+ *                       launch another target
+ *     migrate
+ *
+ *  And see that it works
+ */
+static void test_multifd_tcp_cancel(void)
+{
+    test_multifd_tcp_cancel_common(false);
+}
+
+#ifdef CONFIG_DSA_OPT
+
+static void *test_migrate_precopy_tcp_multifd_start_dsa(QTestState *from,
+                                                        QTestState *to)
+{
+    migrate_set_parameter_str(from, "zero-page-detection", "dsa-accel");
+    migrate_set_parameter_str(from, "dsa-accel-path", dsa_dev_path_p);
+    return test_migrate_precopy_tcp_multifd_start_common(from, to, "none");
+}
+
+static void test_multifd_tcp_zero_page_dsa(void)
+{
+    MigrateCommon args = {
+        .listen_uri = "defer",
+        .start_hook = test_migrate_precopy_tcp_multifd_start_dsa,
+    };
+
+    test_precopy_common(&args);
+}
+
+static void test_multifd_tcp_cancel_dsa(void)
+{
+    test_multifd_tcp_cancel_common(true);
+}
+
+#endif
+
 static void calc_dirty_rate(QTestState *who, uint64_t calc_time)
 {
     qtest_qmp_assert_success(who,
@@ -3772,6 +3827,19 @@ static bool kvm_dirty_ring_supported(void)
 #endif
 }
 
+#ifdef CONFIG_DSA_OPT
+static int test_dsa_setup(void)
+{
+    int fd;
+    fd = open(dsa_dev_path, O_RDWR);
+    if (fd < 0) {
+        return -1;
+    }
+    close(fd);
+    return 0;
+}
+#endif
+
 int main(int argc, char **argv)
 {
     bool has_kvm, has_tcg;
@@ -3984,6 +4052,16 @@ int main(int argc, char **argv)
                        test_multifd_tcp_zero_page_legacy);
     migration_test_add("/migration/multifd/tcp/plain/zero-page/none",
                        test_multifd_tcp_no_zero_page);
+
+#ifdef CONFIG_DSA_OPT
+    if (g_str_equal(arch, "x86_64") && test_dsa_setup() == 0) {
+        migration_test_add("/migration/multifd/tcp/plain/zero-page/dsa",
+                       test_multifd_tcp_zero_page_dsa);
+        migration_test_add("/migration/multifd/tcp/plain/cancel/dsa",
+                       test_multifd_tcp_cancel_dsa);
+    }
+#endif
+
     migration_test_add("/migration/multifd/tcp/plain/cancel",
                        test_multifd_tcp_cancel);
     migration_test_add("/migration/multifd/tcp/plain/zlib",
-- 
Yichen Wang



^ permalink raw reply related	[flat|nested] 8+ messages in thread

* Re: [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path.
  2024-07-11 22:04 [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Yichen Wang
                   ` (2 preceding siblings ...)
  2024-07-11 22:04 ` [PATCH v5 13/13] migration/multifd: Add integration tests for multifd with Intel DSA offloading Yichen Wang
@ 2024-07-17 14:41 ` Fabiano Rosas
  2024-09-09 23:31   ` [External] " Yichen Wang
  3 siblings, 1 reply; 8+ messages in thread
From: Fabiano Rosas @ 2024-07-17 14:41 UTC (permalink / raw)
  To: Yichen Wang, Paolo Bonzini, Marc-André Lureau,
	Daniel P. Berrangé, Thomas Huth, Philippe Mathieu-Daudé,
	Peter Xu, Eric Blake, Markus Armbruster, Michael S. Tsirkin,
	Cornelia Huck, qemu-devel
  Cc: Hao Xiang, Liu, Yuan1, Shivam Kumar, Ho-Ren (Jack) Chuang,
	Yichen Wang

Yichen Wang <yichen.wang@bytedance.com> writes:

> From: Hao Xiang <hao.xiang@linux.dev>
>
> Multifd sender path gets an array of pages queued by the migration
> thread. It performs zero page checking on every page in the array.
> The pages are classfied as either a zero page or a normal page. This
> change uses Intel DSA to offload the zero page checking from CPU to
> the DSA accelerator. The sender thread submits a batch of pages to DSA
> hardware and waits for the DSA completion thread to signal for work
> completion.
>
> Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> ---
>  include/qemu/dsa.h            |   4 +-

This patch should have no changes to dsa code. Put them in the patches
that introduce them.

>  migration/migration.c         |   2 +-
>  migration/multifd-zero-page.c | 100 ++++++++++++++++++++++++++++++++--
>  migration/multifd.c           |  43 ++++++++++++++-
>  migration/multifd.h           |   2 +-
>  util/dsa.c                    |  23 ++++----

Same with these.

>  6 files changed, 150 insertions(+), 24 deletions(-)
>
> diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
> index fd0305a7c7..a3b502ee41 100644
> --- a/include/qemu/dsa.h
> +++ b/include/qemu/dsa.h
> @@ -83,7 +83,7 @@ typedef struct QemuDsaBatchTask {
>   *
>   * @return int Zero if successful, otherwise non zero.
>   */
> -int qemu_dsa_init(const char *dsa_parameter, Error **errp);
> +int qemu_dsa_init(const strList *dsa_parameter, Error **errp);
>  
>  /**
>   * @brief Start logic to enable using DSA.
> @@ -146,7 +146,7 @@ static inline bool qemu_dsa_is_running(void)
>      return false;
>  }
>  
> -static inline int qemu_dsa_init(const char *dsa_parameter, Error **errp)
> +static inline int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
>  {
>      error_setg(errp, "DSA accelerator is not enabled.");
>      return -1;
> diff --git a/migration/migration.c b/migration/migration.c
> index 3dea06d577..085395b900 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -3469,7 +3469,7 @@ static void *migration_thread(void *opaque)
>      object_ref(OBJECT(s));
>      update_iteration_initial_status(s);
>  
> -    if (!multifd_send_setup()) {
> +    if (!multifd_send_setup(&local_err)) {

This is interesting, probably more correct than what we're doing
today. But you need to hoist the error handling out of
multifd_send_setup into here. And put this in a separate patch because
it is an improvement on its own.

>          goto out;
>      }
>  
> diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c

The way git generated this diff makes it hard to review. When this
happens, you can use a different algorithm such as --patience when
generating the patches. Compare git show vs. git show --patience to see
the difference.

> index e1b8370f88..ffb5611d44 100644
> --- a/migration/multifd-zero-page.c
> +++ b/migration/multifd-zero-page.c
> @@ -37,25 +37,84 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
>  }
>  
>  /**
> - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> + * zero_page_detect_cpu: Perform zero page detection using CPU.
>   *
>   * Sorts normal pages before zero pages in p->pages->offset and updates
>   * p->pages->normal_num.

Probably best to carry this part along as well. This is the public
function that people will most likely look at first.

>   *
>   * @param p A pointer to the send params.
>   */
> -void multifd_send_zero_page_detect(MultiFDSendParams *p)
> +static void zero_page_detect_cpu(MultiFDSendParams *p)
> {
>      MultiFDPages_t *pages = p->pages;
>      RAMBlock *rb = pages->block;
>      int i = 0;
>      int j = pages->num - 1;
>  
> -    if (!multifd_zero_page_enabled()) {
> -        pages->normal_num = pages->num;
> +    /*
> +     * Sort the page offset array by moving all normal pages to
> +     * the left and all zero pages to the right of the array.
> +     */
> +    while (i <= j) {
> +        uint64_t offset = pages->offset[i];
> +
> +        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> +            i++;
> +            continue;
> +        }
> +
> +        swap_page_offset(pages->offset, i, j);
> +        ram_release_page(rb->idstr, offset);
> +        j--;
> +    }
> +
> +    pages->normal_num = i;
> +}
> +
> +
> +#ifdef CONFIG_DSA_OPT
> +
> +static void swap_result(bool *results, int a, int b)
> +{
> +    bool temp;
> +
> +    if (a == b) {
>          return;
>      }
>  
> +    temp = results[a];
> +    results[a] = results[b];
> +    results[b] = temp;
> +}
> +
> +/**
> + * zero_page_detect_dsa: Perform zero page detection using
> + * Intel Data Streaming Accelerator (DSA).
> + *
> + * Sorts normal pages before zero pages in p->pages->offset and updates
> + * p->pages->normal_num.
> + *
> + * @param p A pointer to the send params.
> + */
> +static void zero_page_detect_dsa(MultiFDSendParams *p)
> +{
> +    MultiFDPages_t *pages = p->pages;

Actually use the pages variable all over instead of dereferencing
p->pages again.

> +    RAMBlock *rb = pages->block;
> +    bool *results = p->dsa_batch_task->results;

I think we had a suggestion from Peter to not carry the batch task in
the channel parameters, no?

> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        p->dsa_batch_task->addr[i] =
> +            (ram_addr_t)(rb->host + p->pages->offset[i]);
> +    }
> +
> +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
> +                                  (const void **)p->dsa_batch_task->addr,
> +                                  p->pages->num,
> +                                  p->page_size);
> +
> +    int i = 0;
> +    int j = pages->num - 1;
> +
>      /*
>       * Sort the page offset array by moving all normal pages to
>       * the left and all zero pages to the right of the array.
> @@ -63,11 +122,12 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
>      while (i <= j) {
>          uint64_t offset = pages->offset[i];
>  
> -        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> +        if (!results[i]) {
>              i++;
>              continue;
>          }
>  
> +        swap_result(results, i, j);
>          swap_page_offset(pages->offset, i, j);
>          ram_release_page(rb->idstr, offset);
>          j--;
> @@ -76,6 +136,15 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
>      pages->normal_num = i;
>  }
>  
> +#else
> +
> +static void zero_page_detect_dsa(MultiFDSendParams *p)
> +{
> +    exit(1);

g_assert_not_reached();

> +}
> +
> +#endif
> +
>  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>  {
>      for (int i = 0; i < p->zero_num; i++) {
> @@ -87,3 +156,24 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>          }
>      }
>  }
> +
> +/**
> + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> + *
> + * @param p A pointer to the send params.
> + */
> +void multifd_send_zero_page_detect(MultiFDSendParams *p)
> +{
> +    MultiFDPages_t *pages = p->pages;
> +
> +    if (!multifd_zero_page_enabled()) {
> +        pages->normal_num = pages->num;
> +        return;
> +    }
> +
> +    if (qemu_dsa_is_running()) {
> +        zero_page_detect_dsa(p);
> +    } else {
> +        zero_page_detect_cpu(p);
> +    }
> +}
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 6f8edd4b6a..014fee757a 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -817,6 +817,32 @@ static void multifd_send_cleanup_state(void)
>      multifd_send_state = NULL;
>  }
>  
> +static bool multifd_dsa_setup(MigrationState *s, const char *role, Error **errp)

You don't need MigrationState here. You can call the function only from
multifd_send_setup() and use migrate_zero_page_detection() to check for
DSA.

> +{
> +    /*
> +     * Only setup DSA when needed. Currently, DSA is only used for zero page
> +     * detection, which is only needed on sender side.
> +     */
> +    if (!s ||
> +        s->parameters.zero_page_detection != ZERO_PAGE_DETECTION_DSA_ACCEL) {
> +        return true;
> +    }
> +
> +    const strList *dsa_parameter = migrate_dsa_accel_path();
> +    if (qemu_dsa_init(dsa_parameter, errp)) {
> +        error_setg(errp, "multifd: %s failed to initialize DSA.", role);
> +        return false;
> +    }
> +    qemu_dsa_start();
> +
> +    return true;
> +}
> +
> +static void multifd_dsa_cleanup(void)
> +{
> +    qemu_dsa_cleanup();
> +}

Hmm, these two functions seem to fit better in multifd-zero-page.c.

> +
>  void multifd_send_shutdown(void)
>  {
>      int i;
> @@ -827,6 +853,8 @@ void multifd_send_shutdown(void)
>  
>      multifd_send_terminate_threads();
>  
> +    multifd_dsa_cleanup();
> +
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>          Error *local_err = NULL;
> @@ -1156,7 +1184,7 @@ static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
>      return true;
>  }
>  
> -bool multifd_send_setup(void)
> +bool multifd_send_setup(Error **errp)
>  {
>      MigrationState *s = migrate_get_current();
>      Error *local_err = NULL;

Remove this and use errp instead everywhere.

> @@ -1169,6 +1197,10 @@ bool multifd_send_setup(void)
>          return true;
>      }
>  
> +    if (!multifd_dsa_setup(s, "Sender", errp)) {
> +        return false;
> +    }
> +
>      thread_count = migrate_multifd_channels();
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> @@ -1395,6 +1427,7 @@ void multifd_recv_cleanup(void)
>              qemu_thread_join(&p->thread);
>          }
>      }
> +    multifd_dsa_cleanup();
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
>      }
> @@ -1570,6 +1603,7 @@ int multifd_recv_setup(Error **errp)
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>      bool use_packets = multifd_use_packets();
>      uint8_t i;
> +    int ret;
>  
>      /*
>       * Return successfully if multiFD recv state is already initialised
> @@ -1579,6 +1613,10 @@ int multifd_recv_setup(Error **errp)
>          return 0;
>      }
>  
> +    if (!multifd_dsa_setup(NULL, "Receiver", errp)) {
> +        return -1;
> +    }

Is there a reason to call this here?

> +
>      thread_count = migrate_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> @@ -1617,13 +1655,12 @@ int multifd_recv_setup(Error **errp)
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -        int ret;
> -

This is a separate cleanup patch.

>          ret = multifd_recv_state->ops->recv_setup(p, errp);
>          if (ret) {
>              return ret;
>          }
>      }
> +

Avoid introducing extra lines for no reason, this leads to git conflicts
sometimes.

>      return 0;
>  }
>  
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 027f57bf4e..871e3aa063 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -18,7 +18,7 @@
>  
>  typedef struct MultiFDRecvData MultiFDRecvData;
>  
> -bool multifd_send_setup(void);
> +bool multifd_send_setup(Error **errp);
>  void multifd_send_shutdown(void);
>  void multifd_send_channel_created(void);
>  int multifd_recv_setup(Error **errp);
> diff --git a/util/dsa.c b/util/dsa.c
> index 5aba1ae23a..44b1130a51 100644
> --- a/util/dsa.c
> +++ b/util/dsa.c
> @@ -116,27 +116,27 @@ dsa_device_cleanup(QemuDsaDevice *instance)
>   */
>  static int
>  dsa_device_group_init(QemuDsaDeviceGroup *group,
> -                      const char *dsa_parameter,
> +                      const strList *dsa_parameter,
>                        Error **errp)
>  {
> -    if (dsa_parameter == NULL || strlen(dsa_parameter) == 0) {
> -        return 0;
> +    if (dsa_parameter == NULL) {
> +        /* HACKING ALERT. */
> +        /* return 0; */
> +        dsa_parameter = &(strList) {
> +            .value = (char *)"/dev/dsa/wq0.0", .next = NULL
> +        };
>      }
>  
>      int ret = 0;
> -    char *local_dsa_parameter = g_strdup(dsa_parameter);
>      const char *dsa_path[MAX_DSA_DEVICES];
>      int num_dsa_devices = 0;
> -    char delim[2] = " ";
>  
> -    char *current_dsa_path = strtok(local_dsa_parameter, delim);
> -
> -    while (current_dsa_path != NULL) {
> -        dsa_path[num_dsa_devices++] = current_dsa_path;
> +    while (dsa_parameter) {
> +        dsa_path[num_dsa_devices++] = dsa_parameter->value;
>          if (num_dsa_devices == MAX_DSA_DEVICES) {
>              break;
>          }
> -        current_dsa_path = strtok(NULL, delim);
> +        dsa_parameter = dsa_parameter->next;
>      }
>  
>      group->dsa_devices =
> @@ -161,7 +161,6 @@ dsa_device_group_init(QemuDsaDeviceGroup *group,
>      }
>  
>  exit:
> -    g_free(local_dsa_parameter);
>      return ret;
>  }
>  
> @@ -718,7 +717,7 @@ dsa_globals_init(void)
>   *
>   * @return int Zero if successful, otherwise non zero.
>   */
> -int qemu_dsa_init(const char *dsa_parameter, Error **errp)
> +int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
>  {
>      dsa_globals_init();


^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [PATCH v5 11/13] migration/multifd: Add migration option set packet size.
  2024-07-11 22:04 ` [PATCH v5 11/13] migration/multifd: Add migration option set packet size Yichen Wang
@ 2024-07-17 14:59   ` Fabiano Rosas
  2024-08-21 21:16     ` Peter Xu
  0 siblings, 1 reply; 8+ messages in thread
From: Fabiano Rosas @ 2024-07-17 14:59 UTC (permalink / raw)
  To: Yichen Wang, Paolo Bonzini, Marc-André Lureau,
	Daniel P. Berrangé, Thomas Huth, Philippe Mathieu-Daudé,
	Peter Xu, Eric Blake, Markus Armbruster, Michael S. Tsirkin,
	Cornelia Huck, qemu-devel
  Cc: Hao Xiang, Liu, Yuan1, Shivam Kumar, Ho-Ren (Jack) Chuang,
	Yichen Wang

Yichen Wang <yichen.wang@bytedance.com> writes:

> From: Hao Xiang <hao.xiang@linux.dev>
>
> During live migration, if the latency between sender and receiver is
> high and bandwidth is also high (a long and fat pipe), using a bigger
> packet size can help reduce migration total time. The current multifd
> packet size is 128 * 4kb. In addition, Intel DSA offloading performs
> better with a large batch task.

Last time we measured, mapped-ram also performed slightly better with a
larger packet size:

        2 MiB   1 MiB   512 KiB  256 KiB  128 KiB
AVG(10) 50814   50396     48732    46423    34574
DEV       736     552       619      473     1430

>
> This change adds an option to set the packet size, which is also useful
> for performance tunin. Both sender and receiver needs to set the same
> packet size for things to work.
>
> Set the option:
> migrate_set_parameter multifd-packet-size 4190208
>
> Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> ---
>  migration/migration-hmp-cmds.c |  7 +++++++
>  migration/multifd-zlib.c       |  6 ++++--
>  migration/multifd-zstd.c       |  6 ++++--
>  migration/multifd.c            |  6 ++++--
>  migration/multifd.h            |  3 ---
>  migration/options.c            | 38 ++++++++++++++++++++++++++++++++++
>  migration/options.h            |  1 +
>  qapi/migration.json            | 21 ++++++++++++++++---
>  8 files changed, 76 insertions(+), 12 deletions(-)
>
> diff --git a/migration/migration-hmp-cmds.c b/migration/migration-hmp-cmds.c
> index c422db4ecd..27ba0ce79a 100644
> --- a/migration/migration-hmp-cmds.c
> +++ b/migration/migration-hmp-cmds.c
> @@ -292,6 +292,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
>          monitor_printf(mon, "%s: %u ms\n",
>              MigrationParameter_str(MIGRATION_PARAMETER_X_CHECKPOINT_DELAY),
>              params->x_checkpoint_delay);
> +        monitor_printf(mon, "%s: %" PRIu64 "\n",
> +            MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_PACKET_SIZE),
> +            params->multifd_packet_size);
>          monitor_printf(mon, "%s: %u\n",
>              MigrationParameter_str(MIGRATION_PARAMETER_MULTIFD_CHANNELS),
>              params->multifd_channels);
> @@ -576,6 +579,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>          p->has_dsa_accel_path = true;
>          visit_type_strList(v, param, &p->dsa_accel_path, &err);
>          break;
> +    case MIGRATION_PARAMETER_MULTIFD_PACKET_SIZE:
> +        p->has_multifd_packet_size = true;
> +        visit_type_size(v, param, &p->multifd_packet_size, &err);
> +        break;
>      case MIGRATION_PARAMETER_MULTIFD_CHANNELS:
>          p->has_multifd_channels = true;
>          visit_type_uint8(v, param, &p->multifd_channels, &err);
> diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
> index 2ced69487e..bd900fe575 100644
> --- a/migration/multifd-zlib.c
> +++ b/migration/multifd-zlib.c
> @@ -49,6 +49,7 @@ static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
>      struct zlib_data *z = g_new0(struct zlib_data, 1);
>      z_stream *zs = &z->zs;
>      const char *err_msg;
> +    uint64_t multifd_packet_size = migrate_multifd_packet_size();
>  
>      zs->zalloc = Z_NULL;
>      zs->zfree = Z_NULL;
> @@ -58,7 +59,7 @@ static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
>          goto err_free_z;
>      }
>      /* This is the maximum size of the compressed buffer */
> -    z->zbuff_len = compressBound(MULTIFD_PACKET_SIZE);
> +    z->zbuff_len = compressBound(multifd_packet_size);
>      z->zbuff = g_try_malloc(z->zbuff_len);
>      if (!z->zbuff) {
>          err_msg = "out of memory for zbuff";
> @@ -200,6 +201,7 @@ out:
>   */
>  static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
>  {
> +    uint64_t multifd_packet_size = migrate_multifd_packet_size();
>      struct zlib_data *z = g_new0(struct zlib_data, 1);
>      z_stream *zs = &z->zs;
>  
> @@ -214,7 +216,7 @@ static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
>          return -1;
>      }
>      /* To be safe, we reserve twice the size of the packet */
> -    z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
> +    z->zbuff_len = multifd_packet_size * 2;
>      z->zbuff = g_try_malloc(z->zbuff_len);
>      if (!z->zbuff) {
>          inflateEnd(zs);
> diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
> index ca17b7e310..8aaa7363be 100644
> --- a/migration/multifd-zstd.c
> +++ b/migration/multifd-zstd.c
> @@ -49,6 +49,7 @@ struct zstd_data {
>   */
>  static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
>  {
> +    uint64_t multifd_packet_size = migrate_multifd_packet_size();
>      struct zstd_data *z = g_new0(struct zstd_data, 1);
>      int res;
>  
> @@ -68,7 +69,7 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
>          return -1;
>      }
>      /* This is the maximum size of the compressed buffer */
> -    z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE);
> +    z->zbuff_len = ZSTD_compressBound(multifd_packet_size);
>      z->zbuff = g_try_malloc(z->zbuff_len);
>      if (!z->zbuff) {
>          ZSTD_freeCStream(z->zcs);
> @@ -188,6 +189,7 @@ out:
>   */
>  static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
>  {
> +    uint64_t multifd_packet_size = migrate_multifd_packet_size();
>      struct zstd_data *z = g_new0(struct zstd_data, 1);
>      int ret;
>  
> @@ -209,7 +211,7 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
>      }
>  
>      /* To be safe, we reserve twice the size of the packet */
> -    z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
> +    z->zbuff_len = multifd_packet_size * 2;
>      z->zbuff = g_try_malloc(z->zbuff_len);
>      if (!z->zbuff) {
>          ZSTD_freeDStream(z->zds);
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 014fee757a..87ed421364 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -1189,7 +1189,8 @@ bool multifd_send_setup(Error **errp)
>      MigrationState *s = migrate_get_current();
>      Error *local_err = NULL;
>      int thread_count, ret = 0;
> -    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    uint32_t page_count =
> +        migrate_multifd_packet_size() / qemu_target_page_size();
>      bool use_packets = multifd_use_packets();
>      uint8_t i;
>  
> @@ -1600,7 +1601,8 @@ static void *multifd_recv_thread(void *opaque)
>  int multifd_recv_setup(Error **errp)
>  {
>      int thread_count;
> -    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> +    uint32_t page_count =
> +        migrate_multifd_packet_size() / qemu_target_page_size();
>      bool use_packets = multifd_use_packets();
>      uint8_t i;
>      int ret;
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 871e3aa063..63cec33c61 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -44,9 +44,6 @@ MultiFDRecvData *multifd_get_recv_data(void);
>  #define MULTIFD_FLAG_QPL (4 << 1)
>  #define MULTIFD_FLAG_UADK (8 << 1)
>  
> -/* This value needs to be a multiple of qemu_target_page_size() */
> -#define MULTIFD_PACKET_SIZE (512 * 1024)
> -
>  typedef struct {
>      uint32_t magic;
>      uint32_t version;
> diff --git a/migration/options.c b/migration/options.c
> index f839493016..1417fa6ab0 100644
> --- a/migration/options.c
> +++ b/migration/options.c
> @@ -73,6 +73,12 @@
>  #define DEFAULT_MIGRATE_ANNOUNCE_ROUNDS    5
>  #define DEFAULT_MIGRATE_ANNOUNCE_STEP    100
>  
> +/*
> + * Parameter for multifd packet size.
> + */
> +#define DEFAULT_MIGRATE_MULTIFD_PACKET_SIZE (128 * 4 * 1024)
> +#define MAX_MIGRATE_MULTIFD_PACKET_SIZE (1023 * 4 * 1024)

Why 1023?

> +
>  #define DEFINE_PROP_MIG_CAP(name, x)             \
>      DEFINE_PROP_BOOL(name, MigrationState, capabilities[x], false)
>  
> @@ -167,6 +173,9 @@ Property migration_properties[] = {
>      /*                    parameters.dsa_accel_path, qdev_prop_string, char *), */
>      /* DEFINE_PROP_STRING("dsa-accel-path", MigrationState, */
>      /*                    parameters.dsa_accel_path), */
> +    DEFINE_PROP_SIZE("multifd-packet-size", MigrationState,
> +                     parameters.multifd_packet_size,
> +                     DEFAULT_MIGRATE_MULTIFD_PACKET_SIZE),
>  
>      /* Migration capabilities */
>      DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
> @@ -777,6 +786,13 @@ int migrate_multifd_channels(void)
>      return s->parameters.multifd_channels;
>  }
>  
> +uint64_t migrate_multifd_packet_size(void)
> +{
> +    MigrationState *s = migrate_get_current();
> +
> +    return s->parameters.multifd_packet_size;
> +}
> +
>  MultiFDCompression migrate_multifd_compression(void)
>  {
>      MigrationState *s = migrate_get_current();
> @@ -898,6 +914,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
>      params->downtime_limit = s->parameters.downtime_limit;
>      params->has_x_checkpoint_delay = true;
>      params->x_checkpoint_delay = s->parameters.x_checkpoint_delay;
> +    params->has_multifd_packet_size = true;
> +    params->multifd_packet_size = s->parameters.multifd_packet_size;
>      params->has_multifd_channels = true;
>      params->multifd_channels = s->parameters.multifd_channels;
>      params->has_multifd_compression = true;
> @@ -957,6 +975,7 @@ void migrate_params_init(MigrationParameters *params)
>      params->has_max_bandwidth = true;
>      params->has_downtime_limit = true;
>      params->has_x_checkpoint_delay = true;
> +    params->has_multifd_packet_size = true;
>      params->has_multifd_channels = true;
>      params->has_multifd_compression = true;
>      params->has_multifd_zlib_level = true;
> @@ -1038,6 +1057,19 @@ bool migrate_params_check(MigrationParameters *params, Error **errp)
>  
>      /* x_checkpoint_delay is now always positive */
>  
> +    if (params->has_multifd_packet_size &&
> +        ((params->multifd_packet_size < DEFAULT_MIGRATE_MULTIFD_PACKET_SIZE) ||
> +            (params->multifd_packet_size >  MAX_MIGRATE_MULTIFD_PACKET_SIZE) ||
> +            (params->multifd_packet_size % qemu_target_page_size() != 0))) {
> +        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
> +                    "multifd_packet_size",
> +                    "an integer in the range of "
> +                    stringify(DEFAULT_MIGRATE_MULTIFD_PACKET_SIZE)
> +                    " to "stringify(MAX_MIGRATE_MULTIFD_PACKET_SIZE)", "
> +                    "and must be a multiple of guest VM's page size.");
> +        return false;
> +    }
> +
>      if (params->has_multifd_channels && (params->multifd_channels < 1)) {
>          error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
>                     "multifd_channels",
> @@ -1219,6 +1251,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
>          dest->x_checkpoint_delay = params->x_checkpoint_delay;
>      }
>  
> +    if (params->has_multifd_packet_size) {
> +        dest->multifd_packet_size = params->multifd_packet_size;
> +    }
>      if (params->has_multifd_channels) {
>          dest->multifd_channels = params->multifd_channels;
>      }
> @@ -1344,6 +1379,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
>          colo_checkpoint_delay_set();
>      }
>  
> +    if (params->has_multifd_packet_size) {
> +        s->parameters.multifd_packet_size = params->multifd_packet_size;
> +    }
>      if (params->has_multifd_channels) {
>          s->parameters.multifd_channels = params->multifd_channels;
>      }
> diff --git a/migration/options.h b/migration/options.h
> index 78b9e4080b..b37cffc887 100644
> --- a/migration/options.h
> +++ b/migration/options.h
> @@ -86,6 +86,7 @@ const char *migrate_tls_hostname(void);
>  uint64_t migrate_xbzrle_cache_size(void);
>  ZeroPageDetection migrate_zero_page_detection(void);
>  const strList *migrate_dsa_accel_path(void);
> +uint64_t migrate_multifd_packet_size(void);
>  
>  /* parameters helpers */
>  
> diff --git a/qapi/migration.json b/qapi/migration.json
> index ff41780347..1a9dc5d74c 100644
> --- a/qapi/migration.json
> +++ b/qapi/migration.json
> @@ -839,6 +839,10 @@
>  #     only has effect if the @mapped-ram capability is enabled.
>  #     (Since 9.1)
>  #
> +# @multifd-packet-size: Packet size in bytes used to migrate data.
> +#     The value needs to be a multiple of guest page size.
> +#     The default value is 524288 and max value is 4190208. (Since 9.2)
> +#
>  # Features:
>  #
>  # @unstable: Members @x-checkpoint-delay and
> @@ -864,7 +868,8 @@
>             'vcpu-dirty-limit',
>             'mode',
>             'zero-page-detection',
> -           'direct-io'] }
> +           'direct-io',
> +           'multifd-packet-size'] }
>  
>  ##
>  # @MigrateSetParameters:
> @@ -1020,6 +1025,10 @@
>  #     only has effect if the @mapped-ram capability is enabled.
>  #     (Since 9.1)
>  #
> +# @multifd-packet-size: Packet size in bytes used to migrate data.
> +#     The value needs to be a multiple of guest page size.
> +#     The default value is 524288 and max value is 4190208. (Since 9.2)
> +#
>  # Features:
>  #
>  # @unstable: Members @x-checkpoint-delay and
> @@ -1061,7 +1070,8 @@
>              '*mode': 'MigMode',
>              '*zero-page-detection': 'ZeroPageDetection',
>              '*direct-io': 'bool',
> -            '*dsa-accel-path': ['str'] } }
> +            '*dsa-accel-path': ['str'],
> +            '*multifd-packet-size' : 'uint64'} }
>  
>  ##
>  # @migrate-set-parameters:
> @@ -1231,6 +1241,10 @@
>  #     only has effect if the @mapped-ram capability is enabled.
>  #     (Since 9.1)
>  #
> +# @multifd-packet-size: Packet size in bytes used to migrate data.
> +#     The value needs to be a multiple of guest page size.
> +#     The default value is 524288 and max value is 4190208. (Since 9.2)
> +#
>  # Features:
>  #
>  # @unstable: Members @x-checkpoint-delay and
> @@ -1269,7 +1283,8 @@
>              '*mode': 'MigMode',
>              '*zero-page-detection': 'ZeroPageDetection',
>              '*direct-io': 'bool',
> -            '*dsa-accel-path': ['str'] } }
> +            '*dsa-accel-path': ['str'],
> +            '*multifd-packet-size': 'uint64'} }
>  
>  ##
>  # @query-migrate-parameters:


^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [PATCH v5 11/13] migration/multifd: Add migration option set packet size.
  2024-07-17 14:59   ` Fabiano Rosas
@ 2024-08-21 21:16     ` Peter Xu
  0 siblings, 0 replies; 8+ messages in thread
From: Peter Xu @ 2024-08-21 21:16 UTC (permalink / raw)
  To: Fabiano Rosas
  Cc: Yichen Wang, Paolo Bonzini, Marc-André Lureau,
	Daniel P. Berrangé, Thomas Huth, Philippe Mathieu-Daudé,
	Eric Blake, Markus Armbruster, Michael S. Tsirkin, Cornelia Huck,
	qemu-devel, Hao Xiang, Liu, Yuan1, Shivam Kumar,
	Ho-Ren (Jack) Chuang

On Wed, Jul 17, 2024 at 11:59:50AM -0300, Fabiano Rosas wrote:
> Yichen Wang <yichen.wang@bytedance.com> writes:
> 
> > From: Hao Xiang <hao.xiang@linux.dev>
> >
> > During live migration, if the latency between sender and receiver is
> > high and bandwidth is also high (a long and fat pipe), using a bigger
> > packet size can help reduce migration total time. The current multifd
> > packet size is 128 * 4kb. In addition, Intel DSA offloading performs
> > better with a large batch task.
> 
> Last time we measured, mapped-ram also performed slightly better with a
> larger packet size:
> 
>         2 MiB   1 MiB   512 KiB  256 KiB  128 KiB
> AVG(10) 50814   50396     48732    46423    34574
> DEV       736     552       619      473     1430

I wonder whether we could make the new parameter to be pages-per-packet,
rather than in the form of packet-size, just to make our lifes easier for a
possibly static offset[] buffer in the future for the MultiFDPages_t.

With that, we throttle it with MAX_N_PAGES, we can have MultiFDPages_t
statically allocated always with the max buffer. After all, it won't
consume a lot of memory anyway; for MAX_N_PAGES=1K pages it's 8KB per
channel.

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [External] Re: [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path.
  2024-07-17 14:41 ` [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Fabiano Rosas
@ 2024-09-09 23:31   ` Yichen Wang
  0 siblings, 0 replies; 8+ messages in thread
From: Yichen Wang @ 2024-09-09 23:31 UTC (permalink / raw)
  To: Fabiano Rosas
  Cc: Paolo Bonzini, Marc-André Lureau, Daniel P. Berrangé,
	Thomas Huth, Philippe Mathieu-Daudé, Peter Xu, Eric Blake,
	Markus Armbruster, Michael S. Tsirkin, Cornelia Huck, qemu-devel,
	Hao Xiang, Liu, Yuan1, Shivam Kumar, Ho-Ren (Jack) Chuang

On Wed, Jul 17, 2024 at 7:41 AM Fabiano Rosas <farosas@suse.de> wrote:
>
> Yichen Wang <yichen.wang@bytedance.com> writes:
>
> > From: Hao Xiang <hao.xiang@linux.dev>
> >
> > Multifd sender path gets an array of pages queued by the migration
> > thread. It performs zero page checking on every page in the array.
> > The pages are classfied as either a zero page or a normal page. This
> > change uses Intel DSA to offload the zero page checking from CPU to
> > the DSA accelerator. The sender thread submits a batch of pages to DSA
> > hardware and waits for the DSA completion thread to signal for work
> > completion.
> >
> > Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> > Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> > ---
> >  include/qemu/dsa.h            |   4 +-
>
> This patch should have no changes to dsa code. Put them in the patches
> that introduce them.
>
> >  migration/migration.c         |   2 +-
> >  migration/multifd-zero-page.c | 100 ++++++++++++++++++++++++++++++++--
> >  migration/multifd.c           |  43 ++++++++++++++-
> >  migration/multifd.h           |   2 +-
> >  util/dsa.c                    |  23 ++++----
>
> Same with these.
>
> >  6 files changed, 150 insertions(+), 24 deletions(-)
> >
> > diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
> > index fd0305a7c7..a3b502ee41 100644
> > --- a/include/qemu/dsa.h
> > +++ b/include/qemu/dsa.h
> > @@ -83,7 +83,7 @@ typedef struct QemuDsaBatchTask {
> >   *
> >   * @return int Zero if successful, otherwise non zero.
> >   */
> > -int qemu_dsa_init(const char *dsa_parameter, Error **errp);
> > +int qemu_dsa_init(const strList *dsa_parameter, Error **errp);
> >
> >  /**
> >   * @brief Start logic to enable using DSA.
> > @@ -146,7 +146,7 @@ static inline bool qemu_dsa_is_running(void)
> >      return false;
> >  }
> >
> > -static inline int qemu_dsa_init(const char *dsa_parameter, Error **errp)
> > +static inline int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
> >  {
> >      error_setg(errp, "DSA accelerator is not enabled.");
> >      return -1;
> > diff --git a/migration/migration.c b/migration/migration.c
> > index 3dea06d577..085395b900 100644
> > --- a/migration/migration.c
> > +++ b/migration/migration.c
> > @@ -3469,7 +3469,7 @@ static void *migration_thread(void *opaque)
> >      object_ref(OBJECT(s));
> >      update_iteration_initial_status(s);
> >
> > -    if (!multifd_send_setup()) {
> > +    if (!multifd_send_setup(&local_err)) {
>
> This is interesting, probably more correct than what we're doing
> today. But you need to hoist the error handling out of
> multifd_send_setup into here. And put this in a separate patch because
> it is an improvement on its own.
>
> >          goto out;
> >      }
> >
> > diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
>
> The way git generated this diff makes it hard to review. When this
> happens, you can use a different algorithm such as --patience when
> generating the patches. Compare git show vs. git show --patience to see
> the difference.
>

I tried with both --patience and default, it looks the same. So the
code is basically to implement a new multifd_send_zero_page_detect()
which calls zero_page_detect_cpu() or zero_page_detect_dsa() based on
the configuration.

> > index e1b8370f88..ffb5611d44 100644
> > --- a/migration/multifd-zero-page.c
> > +++ b/migration/multifd-zero-page.c
> > @@ -37,25 +37,84 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
> >  }
> >
> >  /**
> > - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> > + * zero_page_detect_cpu: Perform zero page detection using CPU.
> >   *
> >   * Sorts normal pages before zero pages in p->pages->offset and updates
> >   * p->pages->normal_num.
>
> Probably best to carry this part along as well. This is the public
> function that people will most likely look at first.
>
> >   *
> >   * @param p A pointer to the send params.
> >   */
> > -void multifd_send_zero_page_detect(MultiFDSendParams *p)
> > +static void zero_page_detect_cpu(MultiFDSendParams *p)
> > {
> >      MultiFDPages_t *pages = p->pages;
> >      RAMBlock *rb = pages->block;
> >      int i = 0;
> >      int j = pages->num - 1;
> >
> > -    if (!multifd_zero_page_enabled()) {
> > -        pages->normal_num = pages->num;
> > +    /*
> > +     * Sort the page offset array by moving all normal pages to
> > +     * the left and all zero pages to the right of the array.
> > +     */
> > +    while (i <= j) {
> > +        uint64_t offset = pages->offset[i];
> > +
> > +        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> > +            i++;
> > +            continue;
> > +        }
> > +
> > +        swap_page_offset(pages->offset, i, j);
> > +        ram_release_page(rb->idstr, offset);
> > +        j--;
> > +    }
> > +
> > +    pages->normal_num = i;
> > +}
> > +
> > +
> > +#ifdef CONFIG_DSA_OPT
> > +
> > +static void swap_result(bool *results, int a, int b)
> > +{
> > +    bool temp;
> > +
> > +    if (a == b) {
> >          return;
> >      }
> >
> > +    temp = results[a];
> > +    results[a] = results[b];
> > +    results[b] = temp;
> > +}
> > +
> > +/**
> > + * zero_page_detect_dsa: Perform zero page detection using
> > + * Intel Data Streaming Accelerator (DSA).
> > + *
> > + * Sorts normal pages before zero pages in p->pages->offset and updates
> > + * p->pages->normal_num.
> > + *
> > + * @param p A pointer to the send params.
> > + */
> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
> > +{
> > +    MultiFDPages_t *pages = p->pages;
>
> Actually use the pages variable all over instead of dereferencing
> p->pages again.
>
> > +    RAMBlock *rb = pages->block;
> > +    bool *results = p->dsa_batch_task->results;
>
> I think we had a suggestion from Peter to not carry the batch task in
> the channel parameters, no?
>

Yes, I saw that. I followed his idea and used a much more concise data
structure. We will still need to carry the task data structure in
MultiFDSendParams, but that is a pointer to a single data structure.
And I also moved some DSA specific function to their own .c files.
Will post the in my next version.

> > +
> > +    for (int i = 0; i < p->pages->num; i++) {
> > +        p->dsa_batch_task->addr[i] =
> > +            (ram_addr_t)(rb->host + p->pages->offset[i]);
> > +    }
> > +
> > +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
> > +                                  (const void **)p->dsa_batch_task->addr,
> > +                                  p->pages->num,
> > +                                  p->page_size);
> > +
> > +    int i = 0;
> > +    int j = pages->num - 1;
> > +
> >      /*
> >       * Sort the page offset array by moving all normal pages to
> >       * the left and all zero pages to the right of the array.
> > @@ -63,11 +122,12 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
> >      while (i <= j) {
> >          uint64_t offset = pages->offset[i];
> >
> > -        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> > +        if (!results[i]) {
> >              i++;
> >              continue;
> >          }
> >
> > +        swap_result(results, i, j);
> >          swap_page_offset(pages->offset, i, j);
> >          ram_release_page(rb->idstr, offset);
> >          j--;
> > @@ -76,6 +136,15 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
> >      pages->normal_num = i;
> >  }
> >
> > +#else
> > +
> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
> > +{
> > +    exit(1);
>
> g_assert_not_reached();
>
> > +}
> > +
> > +#endif
> > +
> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
> >  {
> >      for (int i = 0; i < p->zero_num; i++) {
> > @@ -87,3 +156,24 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
> >          }
> >      }
> >  }
> > +
> > +/**
> > + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> > + *
> > + * @param p A pointer to the send params.
> > + */
> > +void multifd_send_zero_page_detect(MultiFDSendParams *p)
> > +{
> > +    MultiFDPages_t *pages = p->pages;
> > +
> > +    if (!multifd_zero_page_enabled()) {
> > +        pages->normal_num = pages->num;
> > +        return;
> > +    }
> > +
> > +    if (qemu_dsa_is_running()) {
> > +        zero_page_detect_dsa(p);
> > +    } else {
> > +        zero_page_detect_cpu(p);
> > +    }
> > +}
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index 6f8edd4b6a..014fee757a 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -817,6 +817,32 @@ static void multifd_send_cleanup_state(void)
> >      multifd_send_state = NULL;
> >  }
> >
> > +static bool multifd_dsa_setup(MigrationState *s, const char *role, Error **errp)
>
> You don't need MigrationState here. You can call the function only from
> multifd_send_setup() and use migrate_zero_page_detection() to check for
> DSA.
>

Removed and refactored this part.

> > +{
> > +    /*
> > +     * Only setup DSA when needed. Currently, DSA is only used for zero page
> > +     * detection, which is only needed on sender side.
> > +     */
> > +    if (!s ||
> > +        s->parameters.zero_page_detection != ZERO_PAGE_DETECTION_DSA_ACCEL) {
> > +        return true;
> > +    }
> > +
> > +    const strList *dsa_parameter = migrate_dsa_accel_path();
> > +    if (qemu_dsa_init(dsa_parameter, errp)) {
> > +        error_setg(errp, "multifd: %s failed to initialize DSA.", role);
> > +        return false;
> > +    }
> > +    qemu_dsa_start();
> > +
> > +    return true;
> > +}
> > +
> > +static void multifd_dsa_cleanup(void)
> > +{
> > +    qemu_dsa_cleanup();
> > +}
>
> Hmm, these two functions seem to fit better in multifd-zero-page.c.
>
> > +
> >  void multifd_send_shutdown(void)
> >  {
> >      int i;
> > @@ -827,6 +853,8 @@ void multifd_send_shutdown(void)
> >
> >      multifd_send_terminate_threads();
> >
> > +    multifd_dsa_cleanup();
> > +
> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >          Error *local_err = NULL;
> > @@ -1156,7 +1184,7 @@ static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
> >      return true;
> >  }
> >
> > -bool multifd_send_setup(void)
> > +bool multifd_send_setup(Error **errp)
> >  {
> >      MigrationState *s = migrate_get_current();
> >      Error *local_err = NULL;
>
> Remove this and use errp instead everywhere.
>
> > @@ -1169,6 +1197,10 @@ bool multifd_send_setup(void)
> >          return true;
> >      }
> >
> > +    if (!multifd_dsa_setup(s, "Sender", errp)) {
> > +        return false;
> > +    }
> > +
> >      thread_count = migrate_multifd_channels();
> >      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
> >      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> > @@ -1395,6 +1427,7 @@ void multifd_recv_cleanup(void)
> >              qemu_thread_join(&p->thread);
> >          }
> >      }
> > +    multifd_dsa_cleanup();
> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
> >      }
> > @@ -1570,6 +1603,7 @@ int multifd_recv_setup(Error **errp)
> >      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> >      bool use_packets = multifd_use_packets();
> >      uint8_t i;
> > +    int ret;
> >
> >      /*
> >       * Return successfully if multiFD recv state is already initialised
> > @@ -1579,6 +1613,10 @@ int multifd_recv_setup(Error **errp)
> >          return 0;
> >      }
> >
> > +    if (!multifd_dsa_setup(NULL, "Receiver", errp)) {
> > +        return -1;
> > +    }
>
> Is there a reason to call this here?
>

Removed.

> > +
> >      thread_count = migrate_multifd_channels();
> >      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> >      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> > @@ -1617,13 +1655,12 @@ int multifd_recv_setup(Error **errp)
> >
> >      for (i = 0; i < thread_count; i++) {
> >          MultiFDRecvParams *p = &multifd_recv_state->params[i];
> > -        int ret;
> > -
>
> This is a separate cleanup patch.
>
> >          ret = multifd_recv_state->ops->recv_setup(p, errp);
> >          if (ret) {
> >              return ret;
> >          }
> >      }
> > +
>
> Avoid introducing extra lines for no reason, this leads to git conflicts
> sometimes.
>
> >      return 0;
> >  }
> >
> > diff --git a/migration/multifd.h b/migration/multifd.h
> > index 027f57bf4e..871e3aa063 100644
> > --- a/migration/multifd.h
> > +++ b/migration/multifd.h
> > @@ -18,7 +18,7 @@
> >
> >  typedef struct MultiFDRecvData MultiFDRecvData;
> >
> > -bool multifd_send_setup(void);
> > +bool multifd_send_setup(Error **errp);
> >  void multifd_send_shutdown(void);
> >  void multifd_send_channel_created(void);
> >  int multifd_recv_setup(Error **errp);
> > diff --git a/util/dsa.c b/util/dsa.c
> > index 5aba1ae23a..44b1130a51 100644
> > --- a/util/dsa.c
> > +++ b/util/dsa.c
> > @@ -116,27 +116,27 @@ dsa_device_cleanup(QemuDsaDevice *instance)
> >   */
> >  static int
> >  dsa_device_group_init(QemuDsaDeviceGroup *group,
> > -                      const char *dsa_parameter,
> > +                      const strList *dsa_parameter,
> >                        Error **errp)
> >  {
> > -    if (dsa_parameter == NULL || strlen(dsa_parameter) == 0) {
> > -        return 0;
> > +    if (dsa_parameter == NULL) {
> > +        /* HACKING ALERT. */
> > +        /* return 0; */
> > +        dsa_parameter = &(strList) {
> > +            .value = (char *)"/dev/dsa/wq0.0", .next = NULL
> > +        };
> >      }
> >
> >      int ret = 0;
> > -    char *local_dsa_parameter = g_strdup(dsa_parameter);
> >      const char *dsa_path[MAX_DSA_DEVICES];
> >      int num_dsa_devices = 0;
> > -    char delim[2] = " ";
> >
> > -    char *current_dsa_path = strtok(local_dsa_parameter, delim);
> > -
> > -    while (current_dsa_path != NULL) {
> > -        dsa_path[num_dsa_devices++] = current_dsa_path;
> > +    while (dsa_parameter) {
> > +        dsa_path[num_dsa_devices++] = dsa_parameter->value;
> >          if (num_dsa_devices == MAX_DSA_DEVICES) {
> >              break;
> >          }
> > -        current_dsa_path = strtok(NULL, delim);
> > +        dsa_parameter = dsa_parameter->next;
> >      }
> >
> >      group->dsa_devices =
> > @@ -161,7 +161,6 @@ dsa_device_group_init(QemuDsaDeviceGroup *group,
> >      }
> >
> >  exit:
> > -    g_free(local_dsa_parameter);
> >      return ret;
> >  }
> >
> > @@ -718,7 +717,7 @@ dsa_globals_init(void)
> >   *
> >   * @return int Zero if successful, otherwise non zero.
> >   */
> > -int qemu_dsa_init(const char *dsa_parameter, Error **errp)
> > +int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
> >  {
> >      dsa_globals_init();


^ permalink raw reply	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2024-09-09 23:32 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2024-07-11 22:04 [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Yichen Wang
2024-07-11 22:04 ` [PATCH v5 11/13] migration/multifd: Add migration option set packet size Yichen Wang
2024-07-17 14:59   ` Fabiano Rosas
2024-08-21 21:16     ` Peter Xu
2024-07-11 22:04 ` [PATCH v5 12/13] util/dsa: Add unit test coverage for Intel DSA task submission and completion Yichen Wang
2024-07-11 22:04 ` [PATCH v5 13/13] migration/multifd: Add integration tests for multifd with Intel DSA offloading Yichen Wang
2024-07-17 14:41 ` [PATCH v5 10/13] migration/multifd: Enable DSA offloading in multifd sender path Fabiano Rosas
2024-09-09 23:31   ` [External] " Yichen Wang

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).