From: Xiao Guangrong <guangrong.xiao@gmail.com>
To: "Michael S. Tsirkin" <mst@redhat.com>
Cc: pbonzini@redhat.com, mtosatti@redhat.com, qemu-devel@nongnu.org,
kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com,
jiang.biao2@zte.com.cn, wei.w.wang@intel.com,
Xiao Guangrong <xiaoguangrong@tencent.com>
Subject: Re: [Qemu-devel] [PATCH 09/12] ring: introduce lockless ring buffer
Date: Fri, 29 Jun 2018 15:30:44 +0800 [thread overview]
Message-ID: <a90a7cd1-3362-fdd6-0caa-148fd5d3acff@gmail.com> (raw)
In-Reply-To: <20180620152407-mutt-send-email-mst@kernel.org>
[-- Attachment #1: Type: text/plain, Size: 3859 bytes --]
Hi Michael,
On 06/20/2018 08:38 PM, Michael S. Tsirkin wrote:
> On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>>
>> (1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
>> (2) http://dpdk.org/doc/api/rte__ring_8h.html
>>
>> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
>
> So instead of all this super-optimized trickiness, how about
> a simple port of ptr_ring from linux?
>
> That one isn't lockless but it's known to outperform
> most others for a single producer/single consumer case.
> And with a ton of networking going on,
> who said it's such a hot spot? OTOH this implementation
> has more barriers which slows down each individual thread.
> It's also a source of bugs.
>
Thank you for pointing it out.
I took a quick look through the ptr_ring code; it is very nice and
really impressive. I will consider porting it to QEMU.
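As a reference for that port, the key property of ptr_ring is that the consumer NULLs out entries, so the producer and consumer never read each other's index. A rough single-producer/single-consumer sketch of that idea (illustrative names and sizes, not the kernel code):

```c
#include <stdatomic.h>
#include <stddef.h>

#define SPSC_RING_SIZE 256 /* must stay a power of two for the index mask */

typedef struct {
    void *_Atomic slots[SPSC_RING_SIZE];
    unsigned int head; /* written by the producer only */
    unsigned int tail; /* written by the consumer only */
} SpscRing;

/* Producer: a slot is free iff the consumer has drained (NULLed) it. */
static int spsc_put(SpscRing *r, void *data)
{
    unsigned int i = r->head & (SPSC_RING_SIZE - 1);

    if (atomic_load_explicit(&r->slots[i], memory_order_acquire)) {
        return -1; /* ring full */
    }
    atomic_store_explicit(&r->slots[i], data, memory_order_release);
    r->head++;
    return 0;
}

/* Consumer: take the entry out, then release the slot back to the producer. */
static void *spsc_get(SpscRing *r)
{
    unsigned int i = r->tail & (SPSC_RING_SIZE - 1);
    void *data = atomic_load_explicit(&r->slots[i], memory_order_acquire);

    if (data) {
        atomic_store_explicit(&r->slots[i], NULL, memory_order_release);
        r->tail++;
    }
    return data;
}
```

Since each side only tests the slot itself, there is no shared head/tail cacheline bouncing on the fast path.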
> Further, atomic tricks this one uses are not fair so some threads can get
> completely starved while others make progress. There's also no
> chance to mix aggressive polling and sleeping with this
> kind of scheme, so the starved thread will consume lots of
> CPU.
>
> So I'd like to see a simple ring used, and then a patch on top
> switching to this tricky one with performance comparison
> along with that.
>
I agree with you. I will make a version that uses a lock for multiple
producers and do incremental optimizations on top of it.
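For that first step, a plain mutex is enough. A minimal sketch of such a locked ring (the names and the fixed power-of-two size are illustrative, not the patch's API; for simplicity the consumer takes the lock too):

```c
#include <pthread.h>
#include <stddef.h>

#define MP_RING_SIZE 256 /* power of two, illustrative */

typedef struct {
    pthread_mutex_t lock; /* serializes all producers (and, here, the consumer) */
    void *slots[MP_RING_SIZE];
    unsigned int in, out;
} MpRing;

static int mp_ring_put(MpRing *r, void *data)
{
    int ret = -1;

    pthread_mutex_lock(&r->lock);
    if (r->in - r->out < MP_RING_SIZE) { /* not full */
        r->slots[r->in++ & (MP_RING_SIZE - 1)] = data;
        ret = 0;
    }
    pthread_mutex_unlock(&r->lock);
    return ret;
}

static void *mp_ring_get(MpRing *r)
{
    void *data = NULL;

    pthread_mutex_lock(&r->lock);
    if (r->in != r->out) { /* not empty */
        data = r->slots[r->out++ & (MP_RING_SIZE - 1)];
    }
    pthread_mutex_unlock(&r->lock);
    return data;
}
```

A later patch can then drop the lock on the consumer side (or on both) with a measured performance comparison, as suggested.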
>> ---
>> migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>> 1 file changed, 265 insertions(+)
>> create mode 100644 migration/ring.h
>>
>> diff --git a/migration/ring.h b/migration/ring.h
>> new file mode 100644
>> index 0000000000..da9b8bdcbb
>> --- /dev/null
>> +++ b/migration/ring.h
>> @@ -0,0 +1,265 @@
>> +/*
>> + * Ring Buffer
>> + *
>> + * Multiple producers and single consumer are supported with lock free.
>> + *
>> + * Copyright (c) 2018 Tencent Inc
>> + *
>> + * Authors:
>> + * Xiao Guangrong <xiaoguangrong@tencent.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef _RING__
>> +#define _RING__
>
> Prefix Ring is too short.
>
Okay, will improve it.
>> + atomic_set(&ring->data[index], NULL);
>> +
>> + /*
>> + * (B) smp_mb() is needed as we should read the entry out before
>> + * updating ring->out as we did in __ring_get().
>> + *
>> + * (A) smp_wmb() is needed as we should make the entry be NULL before
>> + * updating ring->out (which will make the entry be visible and usable).
>> + */
>
> I can't say I understand this all.
> And the interaction of acquire/release semantics with smp_*
> barriers is even scarier.
>
Hmm... the parallel accesses to these two indexes and to the data stored
in the ring are indeed subtle. :(
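If it helps, the pairing I rely on can also be written purely with C11 acquire/release, with no extra smp_* barriers (a sketch of the intent, not the patch code): the consumer's release store on `out` publishes the NULLed slot, and the producer's acquire load of `out` is what makes slot reuse safe.

```c
#include <stdatomic.h>
#include <stddef.h>

/* Consumer side: drain the slot, then publish the new `out`. */
static void consumer_release_slot(void *_Atomic *slot, atomic_uint *out)
{
    atomic_store_explicit(slot, NULL, memory_order_relaxed);
    /* release: the NULL store above cannot be reordered after this update */
    atomic_fetch_add_explicit(out, 1, memory_order_release);
}

/*
 * Producer side: a slot may only be reused once the acquire load has
 * observed the consumer's release of `out`; that same acquire also makes
 * the consumer's NULL store to the slot visible.
 */
static int producer_slot_reusable(atomic_uint *out, unsigned int in,
                                  unsigned int ring_size)
{
    unsigned int o = atomic_load_explicit(out, memory_order_acquire);

    return in - o < ring_size; /* not full: the slot at `in` is free */
}
```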
>> + atomic_store_release(&ring->out, ring->out + 1);
>> +
>> + return data;
>> +}
>> +
>> +static inline int ring_put(Ring *ring, void *data)
>> +{
>> + if (ring->flags & RING_MULTI_PRODUCER) {
>> + return ring_mp_put(ring, data);
>> + }
>> + return __ring_put(ring, data);
>> +}
>> +
>> +static inline void *ring_get(Ring *ring)
>> +{
>> + if (ring->flags & RING_MULTI_PRODUCER) {
>> + return ring_mp_get(ring);
>> + }
>> + return __ring_get(ring);
>> +}
>> +#endif
>
>
> A bunch of tricky barriers retries etc all over the place. This sorely
> needs *a lot of* unit tests. Where are they?
I used the code attached to this mail to test & benchmark the patches
during my development. It is not dedicated to the Ring itself; instead it
is based on the compression framework.
Yes, test cases are useful and really needed, I will add them... :)
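The eventual unit test could look roughly like this: a producer thread and a consumer thread hammering the ring while the consumer asserts strict FIFO order. The queue below is a mutex-protected stand-in, since the Ring API may still change; swapping in ring_put()/ring_get() would be the real test.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <stddef.h>

#define Q_SIZE 1024
#define N_ITEMS 100000

/* Stand-in queue: replace with the Ring once its API settles. */
static struct {
    pthread_mutex_t lock;
    uintptr_t slots[Q_SIZE];
    unsigned int in, out;
} q = { .lock = PTHREAD_MUTEX_INITIALIZER };

static int q_put(uintptr_t v)
{
    int ret = -1;
    pthread_mutex_lock(&q.lock);
    if (q.in - q.out < Q_SIZE) {
        q.slots[q.in++ % Q_SIZE] = v;
        ret = 0;
    }
    pthread_mutex_unlock(&q.lock);
    return ret;
}

static int q_get(uintptr_t *v)
{
    int ret = -1;
    pthread_mutex_lock(&q.lock);
    if (q.in != q.out) {
        *v = q.slots[q.out++ % Q_SIZE];
        ret = 0;
    }
    pthread_mutex_unlock(&q.lock);
    return ret;
}

static void *producer(void *arg)
{
    uintptr_t i = 1;
    while (i <= N_ITEMS) {
        if (q_put(i) == 0) { /* spin when full */
            i++;
        }
    }
    return NULL;
}

/* The consumer checks strict FIFO order: each value must be last + 1. */
static uintptr_t consumer_check(void)
{
    uintptr_t v, expect = 1;
    while (expect <= N_ITEMS) {
        if (q_get(&v) == 0) { /* spin when empty */
            assert(v == expect);
            expect++;
        }
    }
    return expect - 1;
}

static uintptr_t run_fifo_test(void)
{
    pthread_t p;
    uintptr_t got;
    pthread_create(&p, NULL, producer, NULL);
    got = consumer_check();
    pthread_join(p, NULL);
    return got;
}
```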
[-- Attachment #2: migration-threads-test.c --]
[-- Type: text/x-csrc, Size: 7400 bytes --]
#include "qemu/osdep.h"
#include "libqtest.h"
#include <zlib.h>
#include "qemu/cutils.h"
#include "qemu/bitops.h"
#include "qemu/bitmap.h"
#include "qemu/main-loop.h"
#include "migration/ram.h"
#include "migration/migration.h"
#include "migration/register.h"
#include "migration/misc.h"
#include "migration/page_cache.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "qapi/qapi-events-migration.h"
#include "qapi/qmp/qerror.h"
#include "trace.h"
#include "exec/target_page.h"
#include "qemu/rcu_queue.h"
#include "migration/colo.h"
#include "migration/block.h"
#include "migration/threads.h"
#include "migration/qemu-file.h"
CompressionStats compression_counters;
#define PAGE_SIZE 4096
#define PAGE_MASK (~(PAGE_SIZE - 1))
static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
int64_t pos)
{
int i, size = 0;
for (i = 0; i < iovcnt; i++) {
size += iov[i].iov_len;
}
return size;
}
static int test_fclose(void *opaque)
{
return 0;
}
static const QEMUFileOps test_write_ops = {
.writev_buffer = test_writev_buffer,
.close = test_fclose
};
QEMUFile *dest_file;
static const QEMUFileOps empty_ops = { };
static int do_compress_ram_page(QEMUFile *f, z_stream *stream, uint8_t *ram_addr,
ram_addr_t offset, uint8_t *source_buf)
{
int bytes_sent = 0, blen;
uint8_t *p = ram_addr;
/*
* copy the page into an internal buffer so the VM cannot modify it
* underneath us; this lets us catch errors during compression and
* decompression
*/
memcpy(source_buf, p, PAGE_SIZE);
blen = qemu_put_compression_data(f, stream, source_buf, PAGE_SIZE);
if (blen < 0) {
bytes_sent = 0;
qemu_file_set_error(dest_file, blen);
error_report("compressed data failed!");
} else {
printf("Compressed size %d.\n", blen);
bytes_sent += blen;
}
return bytes_sent;
}
struct CompressData {
/* filled by migration thread.*/
uint8_t *ram_addr;
ram_addr_t offset;
/* filled by compress thread. */
QEMUFile *file;
z_stream stream;
uint8_t *originbuf;
ThreadRequest data;
};
typedef struct CompressData CompressData;
static ThreadRequest *compress_thread_data_init(void)
{
CompressData *cd = g_new0(CompressData, 1);
cd->originbuf = g_try_malloc(PAGE_SIZE);
if (!cd->originbuf) {
goto exit;
}
if (deflateInit(&cd->stream, 1) != Z_OK) {
g_free(cd->originbuf);
goto exit;
}
cd->file = qemu_fopen_ops(NULL, &empty_ops);
return &cd->data;
exit:
g_free(cd);
return NULL;
}
static void compress_thread_data_fini(ThreadRequest *data)
{
CompressData *cd = container_of(data, CompressData, data);
qemu_fclose(cd->file);
deflateEnd(&cd->stream);
g_free(cd->originbuf);
g_free(cd);
}
static void compress_thread_data_handler(ThreadRequest *data)
{
CompressData *cd = container_of(data, CompressData, data);
/*
* if compression fails, the error is indicated via
* migrate_get_current()->to_dst_file.
*/
do_compress_ram_page(cd->file, &cd->stream, cd->ram_addr, cd->offset,
cd->originbuf);
}
static void compress_thread_data_done(ThreadRequest *data)
{
CompressData *cd = container_of(data, CompressData, data);
int bytes_xmit;
bytes_xmit = qemu_put_qemu_file(dest_file, cd->file);
/* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
compression_counters.reduced_size += PAGE_SIZE - bytes_xmit + 8;
compression_counters.pages++;
}
static Threads *compress_threads;
static void flush_compressed_data(void)
{
threads_wait_done(compress_threads);
}
static void compress_threads_save_cleanup(void)
{
if (!compress_threads) {
return;
}
threads_destroy(compress_threads);
compress_threads = NULL;
qemu_fclose(dest_file);
dest_file = NULL;
}
static int compress_threads_save_setup(void)
{
dest_file = qemu_fopen_ops(NULL, &test_write_ops);
compress_threads = threads_create(16,
"compress",
compress_thread_data_init,
compress_thread_data_fini,
compress_thread_data_handler,
compress_thread_data_done);
assert(compress_threads);
return 0;
}
static int compress_page_with_multi_thread(uint8_t *addr)
{
CompressData *cd;
ThreadRequest *thread_data;
thread_data = threads_submit_request_prepare(compress_threads);
if (!thread_data) {
compression_counters.busy++;
return -1;
}
cd = container_of(thread_data, CompressData, data);
cd->ram_addr = addr;
threads_submit_request_commit(compress_threads, thread_data);
return 1;
}
#define MEM_SIZE (30ULL << 30)
#define COUNT 5
static void run(void)
{
void *mem = qemu_memalign(PAGE_SIZE, MEM_SIZE);
uint8_t *ptr = mem, *end = mem + MEM_SIZE;
uint64_t start_time, total_time = 0, spend, total_busy = 0;
int i;
memset(mem, 0, MEM_SIZE);
for (i = 0; i < COUNT; i++) {
ptr = mem;
start_time = g_get_monotonic_time();
while (ptr < end) {
*ptr = 0x10;
compress_page_with_multi_thread(ptr);
ptr += PAGE_SIZE;
}
flush_compressed_data();
spend = g_get_monotonic_time() - start_time;
total_time += spend;
printf("RUN %d: BUSY %" PRIu64 " Time Cost %" PRIu64 ".\n", i,
(uint64_t)compression_counters.busy, spend);
total_busy += compression_counters.busy;
compression_counters.busy = 0;
}
printf("AVG: BUSY %" PRIu64 " Time Cost %" PRIu64 ".\n", total_busy / COUNT,
total_time / COUNT);
}
static void compare_zero_and_compression(void)
{
ThreadRequest *data = compress_thread_data_init();
CompressData *cd;
uint64_t start_time, zero_time, compress_time;
char page[PAGE_SIZE];
if (!data) {
printf("Init compression failed.\n");
return;
}
cd = container_of(data, CompressData, data);
cd->ram_addr = (uint8_t *)page;
memset(page, 0, sizeof(page));
dest_file = qemu_fopen_ops(NULL, &test_write_ops);
start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
buffer_is_zero(page, PAGE_SIZE);
zero_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
compress_thread_data_handler(data);
compress_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
printf("Zero %" PRIu64 " ns Compression: %" PRIu64 " ns.\n", zero_time,
compress_time);
compress_thread_data_fini(data);
}
static void migration_threads(void)
{
int i;
printf("Zero Test vs. compression.\n");
for (i = 0; i < 10; i++) {
compare_zero_and_compression();
}
printf("test migration threads.\n");
compress_threads_save_setup();
run();
compress_threads_save_cleanup();
}
int main(int argc, char **argv)
{
g_test_init(&argc, &argv, NULL);
qtest_add_func("/migration/threads", migration_threads);
return g_test_run();
}