qemu-devel.nongnu.org archive mirror
From: Xiao Guangrong <guangrong.xiao@gmail.com>
To: "Michael S. Tsirkin" <mst@redhat.com>
Cc: pbonzini@redhat.com, mtosatti@redhat.com, qemu-devel@nongnu.org,
	kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com,
	jiang.biao2@zte.com.cn, wei.w.wang@intel.com,
	Xiao Guangrong <xiaoguangrong@tencent.com>
Subject: Re: [Qemu-devel] [PATCH 09/12] ring: introduce lockless ring buffer
Date: Fri, 29 Jun 2018 15:30:44 +0800
Message-ID: <a90a7cd1-3362-fdd6-0caa-148fd5d3acff@gmail.com>
In-Reply-To: <20180620152407-mutt-send-email-mst@kernel.org>

[-- Attachment #1: Type: text/plain, Size: 3859 bytes --]


Hi Michael,

On 06/20/2018 08:38 PM, Michael S. Tsirkin wrote:
> On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>

>>
>>
>> (1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
>> (2) http://dpdk.org/doc/api/rte__ring_8h.html
>>
>> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> So instead of all this super-optimized trickiness, how about
> a simple port of ptr_ring from linux?
> 
> That one isn't lockless but it's known to outperform
> most others for a single producer/single consumer case.
> And with a ton of networking going on,
> who said it's such a hot spot? OTOH this implementation
> has more barriers which slows down each individual thread.
> It's also a source of bugs.
> 

Thank you for pointing it out.

I just went quickly through the ptr_ring code; it is very nice and
really impressive. I will consider porting it to QEMU.
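
To make the idea concrete, here is a rough sketch of the ptr_ring approach:
an empty slot is marked by NULL, so the producer only looks at its own index
and the slot it is about to fill, and the consumer does the same on its side.
The names (SimpleRing, simple_ring_put, simple_ring_get) and the fixed size
are hypothetical, and the producer/consumer locks and memory barriers of the
real ptr_ring are deliberately left out, so this is only valid single-threaded.

```c
#include <assert.h>
#include <stddef.h>

#define SIMPLE_RING_SIZE 4   /* hypothetical fixed capacity */

typedef struct SimpleRing {
    void *queue[SIMPLE_RING_SIZE]; /* NULL means the slot is empty */
    unsigned int producer;         /* next slot to fill */
    unsigned int consumer;         /* next slot to drain */
} SimpleRing;

/* Returns 0 on success, -1 if the ring is full. NULL cannot be queued. */
static int simple_ring_put(SimpleRing *r, void *ptr)
{
    assert(ptr != NULL);
    if (r->queue[r->producer]) {
        return -1;                  /* slot not yet drained by the consumer */
    }
    r->queue[r->producer] = ptr;
    r->producer = (r->producer + 1) % SIMPLE_RING_SIZE;
    return 0;
}

/* Returns the oldest entry, or NULL if the ring is empty. */
static void *simple_ring_get(SimpleRing *r)
{
    void *ptr = r->queue[r->consumer];

    if (ptr) {
        r->queue[r->consumer] = NULL;   /* hand the slot back */
        r->consumer = (r->consumer + 1) % SIMPLE_RING_SIZE;
    }
    return ptr;
}
```

Since fullness and emptiness are both read from the slot itself, neither side
needs to read the other side's index.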

> Further, atomic tricks this one uses are not fair so some threads can get
> completely starved while others make progress. There's also no
> chance to mix aggressive polling and sleeping with this
> kind of scheme, so the starved thread will consume lots of
> CPU.
> 
> So I'd like to see a simple ring used, and then a patch on top
> switching to this tricky one with performance comparison
> along with that.
> 

I agree with you. I will make a version that uses a lock for multiple
producers and then do incremental optimizations on top of it.
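
As a rough sketch of what such a starting point could look like (not the code
that will be posted): producers serialize on a mutex, while the single
consumer stays lock-free and synchronizes with them only through the two
indexes. All names here (LockedRing, locked_ring_put, locked_ring_get) are
hypothetical, and C11 atomics plus a pthread mutex stand in for QEMU's own
atomic and QemuMutex helpers.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

#define LOCKED_RING_SIZE 8   /* hypothetical capacity, must be a power of two */

typedef struct LockedRing {
    pthread_mutex_t producer_lock;  /* serializes all producers */
    atomic_uint in;                 /* written by producers only */
    atomic_uint out;                /* written by the consumer only */
    void *data[LOCKED_RING_SIZE];
} LockedRing;

static void locked_ring_init(LockedRing *r)
{
    assert((LOCKED_RING_SIZE & (LOCKED_RING_SIZE - 1)) == 0);
    pthread_mutex_init(&r->producer_lock, NULL);
    atomic_init(&r->in, 0);
    atomic_init(&r->out, 0);
}

/* Any thread may call this. Returns 0 on success, -1 if the ring is full. */
static int locked_ring_put(LockedRing *r, void *ptr)
{
    unsigned int in, out;
    int ret = -1;

    pthread_mutex_lock(&r->producer_lock);
    in = atomic_load_explicit(&r->in, memory_order_relaxed);
    out = atomic_load_explicit(&r->out, memory_order_acquire);
    if (in - out < LOCKED_RING_SIZE) {
        r->data[in % LOCKED_RING_SIZE] = ptr;
        /* Publish the entry only after it has been written. */
        atomic_store_explicit(&r->in, in + 1, memory_order_release);
        ret = 0;
    }
    pthread_mutex_unlock(&r->producer_lock);
    return ret;
}

/* Only the single consumer thread may call this. NULL means empty. */
static void *locked_ring_get(LockedRing *r)
{
    unsigned int out = atomic_load_explicit(&r->out, memory_order_relaxed);
    unsigned int in = atomic_load_explicit(&r->in, memory_order_acquire);
    void *ptr;

    if (in == out) {
        return NULL;
    }
    ptr = r->data[out % LOCKED_RING_SIZE];
    /* Release the slot only after the entry has been read. */
    atomic_store_explicit(&r->out, out + 1, memory_order_release);
    return ptr;
}
```

A lockless multi-producer put could later replace only locked_ring_put(),
which would keep the performance comparison between the two versions simple.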

>> ---
>>   migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   1 file changed, 265 insertions(+)
>>   create mode 100644 migration/ring.h
>>
>> diff --git a/migration/ring.h b/migration/ring.h
>> new file mode 100644
>> index 0000000000..da9b8bdcbb
>> --- /dev/null
>> +++ b/migration/ring.h
>> @@ -0,0 +1,265 @@
>> +/*
>> + * Ring Buffer
>> + *
>> + * Multiple producers and single consumer are supported with lock free.
>> + *
>> + * Copyright (c) 2018 Tencent Inc
>> + *
>> + * Authors:
>> + *  Xiao Guangrong <xiaoguangrong@tencent.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef _RING__
>> +#define _RING__
> 
> Prefix Ring is too short.
> 

Okay, will improve it.

>> +    atomic_set(&ring->data[index], NULL);
>> +
>> +    /*
>> +     * (B) smp_mb() is needed as we should read the entry out before
>> +     * updating ring->out as we did in __ring_get().
>> +     *
>> +     * (A) smp_wmb() is needed as we should make the entry be NULL before
>> +     * updating ring->out (which will make the entry be visible and usable).
>> +     */
> 
> I can't say I understand this all.
> And the interaction of acquire/release semantics with smp_*
> barriers is even scarier.
> 

Hmm... the parallel accesses to these two indexes and to the data stored
in the ring are subtle indeed. :(
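
The ordering that comment (A) above describes can be restated with plain C11
atomics for the single producer/single consumer case. This is only an
approximation: the SketchRing type and its functions are hypothetical, C11
memory orders are not identical to QEMU's smp_*() and atomic_*() macros, and
the multi-producer path is not covered. The consumer reads the entry, clears
the slot, and only then release-stores the new 'out'; the producer
acquire-loads 'out' before reusing a slot.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define SKETCH_RING_SIZE 4   /* hypothetical capacity, must be a power of two */

typedef struct SketchRing {
    atomic_uint in;                          /* advanced by the producer */
    atomic_uint out;                         /* advanced by the consumer */
    _Atomic(void *) data[SKETCH_RING_SIZE];  /* NULL means empty */
} SketchRing;

static void sketch_init(SketchRing *r)
{
    atomic_init(&r->in, 0);
    atomic_init(&r->out, 0);
    for (int i = 0; i < SKETCH_RING_SIZE; i++) {
        atomic_init(&r->data[i], NULL);
    }
}

/* Producer side: returns 0 on success, -1 if the ring is full. */
static int sketch_put(SketchRing *r, void *ptr)
{
    unsigned int in = atomic_load_explicit(&r->in, memory_order_relaxed);
    /*
     * Pairs with the release store in sketch_get(): once the new value of
     * 'out' is observed, the slot it freed is guaranteed to read as NULL.
     */
    unsigned int out = atomic_load_explicit(&r->out, memory_order_acquire);
    unsigned int index = in % SKETCH_RING_SIZE;

    if (in - out >= SKETCH_RING_SIZE) {
        return -1;
    }
    assert(atomic_load_explicit(&r->data[index],
                                memory_order_relaxed) == NULL);
    atomic_store_explicit(&r->data[index], ptr, memory_order_relaxed);
    atomic_store_explicit(&r->in, in + 1, memory_order_release);
    return 0;
}

/* Consumer side: returns the oldest entry, or NULL if the ring is empty. */
static void *sketch_get(SketchRing *r)
{
    unsigned int out = atomic_load_explicit(&r->out, memory_order_relaxed);
    unsigned int in = atomic_load_explicit(&r->in, memory_order_acquire);
    unsigned int index = out % SKETCH_RING_SIZE;
    void *ptr;

    if (in == out) {
        return NULL;
    }
    /* Read the entry out, then mark the slot empty ... */
    ptr = atomic_load_explicit(&r->data[index], memory_order_relaxed);
    atomic_store_explicit(&r->data[index], NULL, memory_order_relaxed);
    /*
     * ... and only then publish the new 'out'. The release ordering keeps
     * both accesses above from being reordered after this store.
     */
    atomic_store_explicit(&r->out, out + 1, memory_order_release);
    return ptr;
}
```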

>> +    atomic_store_release(&ring->out, ring->out + 1);
>> +
>> +    return data;
>> +}
>> +
>> +static inline int ring_put(Ring *ring, void *data)
>> +{
>> +    if (ring->flags & RING_MULTI_PRODUCER) {
>> +        return ring_mp_put(ring, data);
>> +    }
>> +    return __ring_put(ring, data);
>> +}
>> +
>> +static inline void *ring_get(Ring *ring)
>> +{
>> +    if (ring->flags & RING_MULTI_PRODUCER) {
>> +        return ring_mp_get(ring);
>> +    }
>> +    return __ring_get(ring);
>> +}
>> +#endif
> 
> 
> A bunch of tricky barriers retries etc all over the place.  This sorely
> needs *a lot of* unit tests. Where are they?

I used the code attached to this mail to test and benchmark the patches
during development. It is not dedicated to Ring; instead, it is based on
the compression framework.

Yes, test cases are useful and really needed; I will add them... :)
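
As a possible starting point for such tests, a single-threaded FIFO check
could look roughly like the sketch below. Everything in it is hypothetical:
the put/get function-pointer interface, check_fifo_order(), and the ToyRing
used to exercise it are illustration only and are not part of the patch. It
checks ordering and wraparound but says nothing about the barriers, which
would still need multi-threaded stress tests.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical interface a ring under test is assumed to expose. */
typedef int (*ring_put_fn)(void *ring, void *data);   /* 0 on success */
typedef void *(*ring_get_fn)(void *ring);             /* NULL when empty */

/*
 * Fill the ring until put fails, drain it completely, and repeat.
 * Entries are small integers cast to pointers so ordering can be checked.
 * Returns the number of entries that passed through the ring, or -1 if
 * an entry was lost or came out in the wrong order.
 */
static long check_fifo_order(void *ring, ring_put_fn put, ring_get_fn get,
                             int rounds)
{
    uintptr_t next_in = 1, next_out = 1;   /* start at 1: 0 would be NULL */

    assert(rounds > 0);
    for (int i = 0; i < rounds; i++) {
        void *p;

        while (put(ring, (void *)next_in) == 0) {
            next_in++;
        }
        while ((p = get(ring)) != NULL) {
            if ((uintptr_t)p != next_out) {
                return -1;                 /* out of order */
            }
            next_out++;
        }
        if (next_in != next_out) {
            return -1;                     /* an entry was lost */
        }
    }
    return (long)(next_out - 1);
}

/* Toy ring used only to exercise the checker; odd size forces wraparound. */
#define TOY_RING_SIZE 3

typedef struct ToyRing {
    void *slot[TOY_RING_SIZE];
    unsigned int head, tail, count;
} ToyRing;

static int toy_put(void *opaque, void *data)
{
    ToyRing *r = opaque;

    if (r->count == TOY_RING_SIZE) {
        return -1;
    }
    r->slot[r->tail] = data;
    r->tail = (r->tail + 1) % TOY_RING_SIZE;
    r->count++;
    return 0;
}

static void *toy_get(void *opaque)
{
    ToyRing *r = opaque;
    void *data;

    if (r->count == 0) {
        return NULL;
    }
    data = r->slot[r->head];
    r->head = (r->head + 1) % TOY_RING_SIZE;
    r->count--;
    return data;
}
```

The same checker could be pointed at thin wrappers around ring_put() and
ring_get() once the ring lives in its own testable unit.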


[-- Attachment #2: migration-threads-test.c --]
[-- Type: text/x-csrc, Size: 7400 bytes --]

#include "qemu/osdep.h"

#include "libqtest.h"
#include <zlib.h>

#include "qemu/cutils.h"
#include "qemu/bitops.h"
#include "qemu/bitmap.h"
#include "qemu/main-loop.h"
#include "migration/ram.h"
#include "migration/migration.h"
#include "migration/register.h"
#include "migration/misc.h"
#include "migration/page_cache.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "qapi/qapi-events-migration.h"
#include "qapi/qmp/qerror.h"
#include "trace.h"
//#include "exec/ram_addr.h"
#include "exec/target_page.h"
#include "qemu/rcu_queue.h"
#include "migration/colo.h"
#include "migration/block.h"
#include "migration/threads.h"

#include "migration/qemu-file.h"

CompressionStats compression_counters;

#define PAGE_SIZE 4096
#define PAGE_MASK (~(PAGE_SIZE - 1))

static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
                                   int64_t pos)
{
    int i, size = 0;

    for (i = 0; i < iovcnt; i++) {
        size += iov[i].iov_len;
    }
    return size;
}

static int test_fclose(void *opaque)
{
    return 0;
}

static const QEMUFileOps test_write_ops = {
    .writev_buffer  = test_writev_buffer,
    .close          = test_fclose
};

QEMUFile *dest_file;

static const QEMUFileOps empty_ops = { };

static int do_compress_ram_page(QEMUFile *f, z_stream *stream, uint8_t *ram_addr,
                                ram_addr_t offset, uint8_t *source_buf)
{
    int bytes_sent = 0, blen;
    uint8_t *p = ram_addr;

    /*
     * Copy the page to an internal buffer so that the VM cannot modify it
     * while it is being compressed; this lets us catch errors during
     * compression and decompression.
     */
    memcpy(source_buf, p, PAGE_SIZE);
    blen = qemu_put_compression_data(f, stream, source_buf, PAGE_SIZE);
    if (blen < 0) {
        bytes_sent = 0;
        qemu_file_set_error(dest_file, blen);
        error_report("compressed data failed!");
    } else {
        printf("Compressed size %d.\n", blen);
        bytes_sent += blen;
    }

    return bytes_sent;
}

struct CompressData {
    /* filled by migration thread.*/
    uint8_t *ram_addr;
    ram_addr_t offset;

    /* filled by compress thread. */
    QEMUFile *file;
    z_stream stream;
    uint8_t *originbuf;

    ThreadRequest data;
};
typedef struct CompressData CompressData;

static ThreadRequest *compress_thread_data_init(void)
{
    CompressData *cd = g_new0(CompressData, 1);

    cd->originbuf = g_try_malloc(PAGE_SIZE);
    if (!cd->originbuf) {
        goto exit;
    }

    if (deflateInit(&cd->stream, 1) != Z_OK) {
        g_free(cd->originbuf);
        goto exit;
    }

    cd->file = qemu_fopen_ops(NULL, &empty_ops);
    return &cd->data;

exit:
    g_free(cd);
    return NULL;
}

static void compress_thread_data_fini(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);

    qemu_fclose(cd->file);
    deflateEnd(&cd->stream);
    g_free(cd->originbuf);
    g_free(cd);
}

static void compress_thread_data_handler(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);

    /*
     * If compression fails, do_compress_ram_page() records the error
     * on dest_file.
     */
    do_compress_ram_page(cd->file, &cd->stream, cd->ram_addr, cd->offset,
                         cd->originbuf);
}

static void compress_thread_data_done(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);
    int bytes_xmit;

    bytes_xmit = qemu_put_qemu_file(dest_file, cd->file);
    /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
    compression_counters.reduced_size += PAGE_SIZE - bytes_xmit + 8;
    compression_counters.pages++;
}

static Threads *compress_threads;

static void flush_compressed_data(void)
{
    threads_wait_done(compress_threads);
}

static void compress_threads_save_cleanup(void)
{
    if (!compress_threads) {
        return;
    }

    threads_destroy(compress_threads);
    compress_threads = NULL;
    qemu_fclose(dest_file);
    dest_file = NULL;
}

static int compress_threads_save_setup(void)
{
    dest_file = qemu_fopen_ops(NULL, &test_write_ops);
    compress_threads = threads_create(16,
                                      "compress",
                                      compress_thread_data_init,
                                      compress_thread_data_fini,
                                      compress_thread_data_handler,
                                      compress_thread_data_done);
    assert(compress_threads);
    return 0;
}

static int compress_page_with_multi_thread(uint8_t *addr)
{
    CompressData *cd;
    ThreadRequest *thread_data;
    thread_data = threads_submit_request_prepare(compress_threads);
    if (!thread_data) {
        compression_counters.busy++;
        return -1;
    }

    cd = container_of(thread_data, CompressData, data);
    cd->ram_addr = addr;
    threads_submit_request_commit(compress_threads, thread_data);
    return 1;
}

#define MEM_SIZE (30ULL << 30)
#define COUNT    5 

static void run(void)
{
    uint8_t *mem = qemu_memalign(PAGE_SIZE, MEM_SIZE);
    uint8_t *ptr, *end = mem + MEM_SIZE;
    uint64_t start_time, total_time = 0, spend, total_busy = 0;
    int i;

    memset(mem, 0, MEM_SIZE);

    for (i = 0; i < COUNT; i++) {
        ptr = mem;
        start_time = g_get_monotonic_time();
        while (ptr < end) {
            /* Touch the first byte of every page before submitting it. */
            *ptr = 0x10;
            compress_page_with_multi_thread(ptr);
            ptr += PAGE_SIZE;
        }
        flush_compressed_data();
        spend = g_get_monotonic_time() - start_time;
        total_time += spend;
        printf("RUN %d: BUSY %" PRIu64 " Time Cost %" PRIu64 ".\n", i,
               (uint64_t)compression_counters.busy, spend);
        total_busy += compression_counters.busy;
        compression_counters.busy = 0;
    }

    printf("AVG: BUSY %" PRIu64 " Time Cost %" PRIu64 ".\n",
           total_busy / COUNT, total_time / COUNT);
    qemu_vfree(mem);
}

static void compare_zero_and_compression(void)
{
    ThreadRequest *data = compress_thread_data_init();
    CompressData *cd;
    uint64_t start_time, zero_time, compress_time;
    char page[PAGE_SIZE];

    if (!data) {
        printf("Init compression failed.\n");
        return;
    }

    cd = container_of(data, CompressData, data);
    cd->ram_addr = (uint8_t *)page;

    memset(page, 0, sizeof(page));
    dest_file = qemu_fopen_ops(NULL, &test_write_ops);

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    buffer_is_zero(page, PAGE_SIZE);
    zero_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    compress_thread_data_handler(data);
    compress_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;

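    printf("Zero %" PRIu64 " ns Compression: %" PRIu64 " ns.\n",
           zero_time, compress_time);
    compress_thread_data_fini(data);
    /* Close the file opened above so repeated calls do not leak it. */
    qemu_fclose(dest_file);
    dest_file = NULL;
}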

static void migration_threads(void)
{
    int i;

    printf("Zero Test vs. compression.\n");
    for (i = 0; i < 10; i++) {
        compare_zero_and_compression();
    }

    printf("test migration threads.\n");
    compress_threads_save_setup();
    run();
    compress_threads_save_cleanup();
}

int main(int argc, char **argv)
{
    g_test_init(&argc, &argv, NULL);

    qtest_add_func("/migration/threads", migration_threads);
    return g_test_run();
}

