qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: Fabiano Rosas <farosas@suse.de>
To: Hao Xiang <hao.xiang@bytedance.com>,
	peter.maydell@linaro.org, quintela@redhat.com, peterx@redhat.com,
	marcandre.lureau@redhat.com, bryan.zhang@bytedance.com,
	qemu-devel@nongnu.org
Cc: Hao Xiang <hao.xiang@bytedance.com>
Subject: Re: [PATCH v2 09/20] util/dsa: Implement DSA task asynchronous completion thread model.
Date: Tue, 12 Dec 2023 16:36:13 -0300	[thread overview]
Message-ID: <87fs072oya.fsf@suse.de> (raw)
In-Reply-To: <20231114054032.1192027-10-hao.xiang@bytedance.com>

Hao Xiang <hao.xiang@bytedance.com> writes:

> * Create a dedicated thread for DSA task completion.
> * DSA completion thread runs a loop and poll for completed tasks.
> * Start and stop DSA completion thread during DSA device start stop.
>
> User space application can directly submit task to Intel DSA
> accelerator by writing to DSA's device memory (mapped in user space).
> Once a task is submitted, the device starts processing it and write
> the completion status back to the task. A user space application can
> poll the task's completion status to check for completion. This change
> uses a dedicated thread to perform DSA task completion checking.
>
> Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
> ---
>  util/dsa.c | 243 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 242 insertions(+), 1 deletion(-)
>
> diff --git a/util/dsa.c b/util/dsa.c
> index f82282ce99..0e68013ffb 100644
> --- a/util/dsa.c
> +++ b/util/dsa.c
> @@ -44,6 +44,7 @@
>  
>  #define DSA_WQ_SIZE 4096
>  #define MAX_DSA_DEVICES 16
> +#define DSA_COMPLETION_THREAD "dsa_completion"
>  
>  typedef QSIMPLEQ_HEAD(dsa_task_queue, buffer_zero_batch_task) dsa_task_queue;
>  
> @@ -61,8 +62,18 @@ struct dsa_device_group {
>      dsa_task_queue task_queue;
>  };
>  
> +struct dsa_completion_thread {
> +    bool stopping;
> +    bool running;
> +    QemuThread thread;
> +    int thread_id;
> +    QemuSemaphore sem_init_done;
> +    struct dsa_device_group *group;
> +};
> +
>  uint64_t max_retry_count;
>  static struct dsa_device_group dsa_group;
> +static struct dsa_completion_thread completion_thread;
>  
>  
>  /**
> @@ -439,6 +450,234 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
>      return dsa_task_enqueue(device_group, batch_task);
>  }
>  
> +/**
> + * @brief Poll for the DSA work item completion.
> + *
> + * @param completion A pointer to the DSA work item completion record.
> + * @param opcode The DSA opcode.
> + *
> + * @return Zero if successful, non-zero otherwise.
> + */
> +static int
> +poll_completion(struct dsa_completion_record *completion,
> +                enum dsa_opcode opcode)
> +{
> +    uint8_t status;
> +    uint64_t retry = 0;
> +
> +    while (true) {
> +        // The DSA operation completes successfully or fails.
> +        status = completion->status;
> +        if (status == DSA_COMP_SUCCESS ||

Should we read directly from completion->status, or can we rely on the
compiler not hoisting the load into 'status' out of the loop?

> +            status == DSA_COMP_PAGE_FAULT_NOBOF ||
> +            status == DSA_COMP_BATCH_PAGE_FAULT ||
> +            status == DSA_COMP_BATCH_FAIL) {
> +            break;
> +        } else if (status != DSA_COMP_NONE) {
> +            /* TODO: Error handling here on unexpected failure. */

Let's make sure this is dealt with before merging.

> +            fprintf(stderr, "DSA opcode %d failed with status = %d.\n",
> +                    opcode, status);
> +            exit(1);

Return an error instead of exiting.

> +        }
> +        retry++;
> +        if (retry > max_retry_count) {
> +            fprintf(stderr, "Wait for completion retry %lu times.\n", retry);
> +            exit(1);

Same here: return an error instead of exiting.

> +        }
> +        _mm_pause();
> +    }
> +
> +    return 0;
> +}
> +
> +/**
> + * @brief Complete a single DSA task in the batch task.
> + *
> + * @param task A pointer to the batch task structure.
> + */
> +static void
> +poll_task_completion(struct buffer_zero_batch_task *task)
> +{
> +    assert(task->task_type == DSA_TASK);
> +
> +    struct dsa_completion_record *completion = &task->completions[0];
> +    uint8_t status;
> +
> +    poll_completion(completion, task->descriptors[0].opcode);
> +
> +    status = completion->status;
> +    if (status == DSA_COMP_SUCCESS) {
> +        task->results[0] = (completion->result == 0);
> +        return;
> +    }
> +
> +    assert(status == DSA_COMP_PAGE_FAULT_NOBOF);
> +}
> +
> +/**
> + * @brief Poll a batch task status until it completes. If DSA task doesn't
> + *        complete properly, use CPU to complete the task.
> + *
> + * @param batch_task A pointer to the DSA batch task.
> + */
> +static void
> +poll_batch_task_completion(struct buffer_zero_batch_task *batch_task)
> +{
> +    struct dsa_completion_record *batch_completion = &batch_task->batch_completion;
> +    struct dsa_completion_record *completion;
> +    uint8_t batch_status;
> +    uint8_t status;
> +    bool *results = batch_task->results;
> +    uint32_t count = batch_task->batch_descriptor.desc_count;
> +
> +    poll_completion(batch_completion,
> +                    batch_task->batch_descriptor.opcode);
> +
> +    batch_status = batch_completion->status;
> +
> +    if (batch_status == DSA_COMP_SUCCESS) {
> +        if (batch_completion->bytes_completed == count) {
> +            // Let's skip checking for each descriptors' completion status
> +            // if the batch descriptor says all succedded.
> +            for (int i = 0; i < count; i++) {
> +                assert(batch_task->completions[i].status == DSA_COMP_SUCCESS);
> +                results[i] = (batch_task->completions[i].result == 0);
> +            }
> +            return;
> +        }
> +    } else {
> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
> +    }
> +
> +    for (int i = 0; i < count; i++) {
> +

Extra blank line here.

> +        completion = &batch_task->completions[i];
> +        status = completion->status;
> +
> +        if (status == DSA_COMP_SUCCESS) {
> +            results[i] = (completion->result == 0);
> +            continue;
> +        }
> +
> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> +            fprintf(stderr,
> +                    "Unexpected completion status = %u.\n", status);
> +            assert(false);

Return here instead of asserting.

> +        }
> +    }
> +}
> +
> +/**
> + * @brief Handles an asynchronous DSA batch task completion.
> + *
> + * @param task A pointer to the batch buffer zero task structure.
> + */
> +static void
> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> +{
> +    batch_task->status = DSA_TASK_COMPLETION;
> +    batch_task->completion_callback(batch_task);
> +}
> +
> +/**
> + * @brief The function entry point called by a dedicated DSA
> + *        work item completion thread.
> + *
> + * @param opaque A pointer to the thread context.
> + *
> + * @return void* Not used.
> + */
> +static void *
> +dsa_completion_loop(void *opaque)
> +{
> +    struct dsa_completion_thread *thread_context =
> +        (struct dsa_completion_thread *)opaque;
> +    struct buffer_zero_batch_task *batch_task;
> +    struct dsa_device_group *group = thread_context->group;
> +
> +    rcu_register_thread();
> +
> +    thread_context->thread_id = qemu_get_thread_id();
> +    qemu_sem_post(&thread_context->sem_init_done);
> +
> +    while (thread_context->running) {
> +        batch_task = dsa_task_dequeue(group);
> +        assert(batch_task != NULL || !group->running);
> +        if (!group->running) {
> +            assert(!thread_context->running);

This is racy if the compiler reorders "thread_context->running = false"
and "group->running = false". I'd put this under the task_queue_lock or
add a compiler barrier at dsa_completion_thread_stop().

> +            break;
> +        }
> +        if (batch_task->task_type == DSA_TASK) {
> +            poll_task_completion(batch_task);
> +        } else {
> +            assert(batch_task->task_type == DSA_BATCH_TASK);
> +            poll_batch_task_completion(batch_task);
> +        }
> +
> +        dsa_batch_task_complete(batch_task);
> +    }
> +
> +    rcu_unregister_thread();
> +    return NULL;
> +}
> +
> +/**
> + * @brief Initializes a DSA completion thread.
> + *
> + * @param completion_thread A pointer to the completion thread context.
> + * @param group A pointer to the DSA device group.
> + */
> +static void
> +dsa_completion_thread_init(
> +    struct dsa_completion_thread *completion_thread,
> +    struct dsa_device_group *group)
> +{
> +    completion_thread->stopping = false;
> +    completion_thread->running = true;
> +    completion_thread->thread_id = -1;
> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> +    completion_thread->group = group;
> +
> +    qemu_thread_create(&completion_thread->thread,
> +                       DSA_COMPLETION_THREAD,
> +                       dsa_completion_loop,
> +                       completion_thread,
> +                       QEMU_THREAD_JOINABLE);
> +
> +    /* Wait for initialization to complete */
> +    while (completion_thread->thread_id == -1) {
> +        qemu_sem_wait(&completion_thread->sem_init_done);
> +    }

This is racy: the thread can set 'thread_id' before this enters the
loop, and the semaphore post will then be left unmatched. Not a huge
deal, but it might cause confusion when debugging the initialization.

> +}
> +
> +/**
> + * @brief Stops the completion thread (and implicitly, the device group).
> + *
> + * @param opaque A pointer to the completion thread.
> + */
> +static void dsa_completion_thread_stop(void *opaque)
> +{
> +    struct dsa_completion_thread *thread_context =
> +        (struct dsa_completion_thread *)opaque;
> +
> +    struct dsa_device_group *group = thread_context->group;
> +
> +    qemu_mutex_lock(&group->task_queue_lock);
> +
> +    thread_context->stopping = true;
> +    thread_context->running = false;
> +
> +    dsa_device_group_stop(group);
> +
> +    qemu_cond_signal(&group->task_queue_cond);
> +    qemu_mutex_unlock(&group->task_queue_lock);
> +
> +    qemu_thread_join(&thread_context->thread);
> +
> +    qemu_sem_destroy(&thread_context->sem_init_done);
> +}
> +
>  /**
>   * @brief Check if DSA is running.
>   *
> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
>   */
>  bool dsa_is_running(void)
>  {
> -    return false;
> +    return completion_thread.running;
>  }
>  
>  static void
> @@ -481,6 +720,7 @@ void dsa_start(void)
>          return;
>      }
>      dsa_device_group_start(&dsa_group);
> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
>  }
>  
>  /**
> @@ -496,6 +736,7 @@ void dsa_stop(void)
>          return;
>      }
>  
> +    dsa_completion_thread_stop(&completion_thread);
>      dsa_empty_task_queue(group);
>  }


  reply	other threads:[~2023-12-12 19:36 UTC|newest]

Thread overview: 51+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-11-14  5:40 [PATCH v2 00/20] Use Intel DSA accelerator to offload zero page checking in multifd live migration Hao Xiang
2023-11-14  5:40 ` [PATCH v2 01/20] multifd: Add capability to enable/disable zero_page Hao Xiang
2023-11-16 15:15   ` Fabiano Rosas
2023-11-14  5:40 ` [PATCH v2 02/20] multifd: Support for zero pages transmission Hao Xiang
2023-11-14  5:40 ` [PATCH v2 03/20] multifd: Zero " Hao Xiang
2023-12-18  2:43   ` Wang, Lei
2023-11-14  5:40 ` [PATCH v2 04/20] So we use multifd to transmit zero pages Hao Xiang
2023-11-16 15:14   ` Fabiano Rosas
2024-01-23  4:28     ` [External] " Hao Xiang
2024-01-25 21:55       ` Hao Xiang
2024-01-25 23:14         ` Fabiano Rosas
2024-01-25 23:46           ` Hao Xiang
2023-11-14  5:40 ` [PATCH v2 05/20] meson: Introduce new instruction set enqcmd to the build system Hao Xiang
2023-12-11 15:41   ` Fabiano Rosas
2023-12-16  0:26     ` [External] " Hao Xiang
2023-11-14  5:40 ` [PATCH v2 06/20] util/dsa: Add dependency idxd Hao Xiang
2023-11-14  5:40 ` [PATCH v2 07/20] util/dsa: Implement DSA device start and stop logic Hao Xiang
2023-12-11 21:28   ` Fabiano Rosas
2023-12-19  6:41     ` [External] " Hao Xiang
2023-12-19 13:18       ` Fabiano Rosas
2023-12-27  6:00         ` Hao Xiang
2023-11-14  5:40 ` [PATCH v2 08/20] util/dsa: Implement DSA task enqueue and dequeue Hao Xiang
2023-12-12 16:10   ` Fabiano Rosas
2023-12-27  0:07     ` [External] " Hao Xiang
2023-11-14  5:40 ` [PATCH v2 09/20] util/dsa: Implement DSA task asynchronous completion thread model Hao Xiang
2023-12-12 19:36   ` Fabiano Rosas [this message]
2023-12-18  3:11   ` Wang, Lei
2023-12-18 18:57     ` [External] " Hao Xiang
2023-12-19  1:33       ` Wang, Lei
2023-12-19  5:12         ` Hao Xiang
2023-11-14  5:40 ` [PATCH v2 10/20] util/dsa: Implement zero page checking in DSA task Hao Xiang
2023-11-14  5:40 ` [PATCH v2 11/20] util/dsa: Implement DSA task asynchronous submission and wait for completion Hao Xiang
2023-12-13 14:01   ` Fabiano Rosas
2023-12-27  6:26     ` [External] " Hao Xiang
2023-11-14  5:40 ` [PATCH v2 12/20] migration/multifd: Add new migration option for multifd DSA offloading Hao Xiang
2023-12-11 19:44   ` Fabiano Rosas
2023-12-18 18:34     ` [External] " Hao Xiang
2023-12-18  3:12   ` Wang, Lei
2023-11-14  5:40 ` [PATCH v2 13/20] migration/multifd: Prepare to introduce DSA acceleration on the multifd path Hao Xiang
2023-12-18  3:20   ` Wang, Lei
2023-11-14  5:40 ` [PATCH v2 14/20] migration/multifd: Enable DSA offloading in multifd sender path Hao Xiang
2023-11-14  5:40 ` [PATCH v2 15/20] migration/multifd: Add test hook to set normal page ratio Hao Xiang
2023-11-14  5:40 ` [PATCH v2 16/20] migration/multifd: Enable set normal page ratio test hook in multifd Hao Xiang
2023-11-14  5:40 ` [PATCH v2 17/20] migration/multifd: Add migration option set packet size Hao Xiang
2023-11-14  5:40 ` [PATCH v2 18/20] migration/multifd: Enable set packet size migration option Hao Xiang
2023-12-13 17:33   ` Fabiano Rosas
2024-01-03 20:04     ` [External] " Hao Xiang
2023-11-14  5:40 ` [PATCH v2 19/20] util/dsa: Add unit test coverage for Intel DSA task submission and completion Hao Xiang
2023-11-14  5:40 ` [PATCH v2 20/20] migration/multifd: Add integration tests for multifd with Intel DSA offloading Hao Xiang
2023-11-15 17:43 ` [PATCH v2 00/20] Use Intel DSA accelerator to offload zero page checking in multifd live migration Elena Ufimtseva
2023-11-15 19:37   ` [External] " Hao Xiang

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=87fs072oya.fsf@suse.de \
    --to=farosas@suse.de \
    --cc=bryan.zhang@bytedance.com \
    --cc=hao.xiang@bytedance.com \
    --cc=marcandre.lureau@redhat.com \
    --cc=peter.maydell@linaro.org \
    --cc=peterx@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).