From: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
To: Jason Wang <jasowang@redhat.com>, qemu devel <qemu-devel@nongnu.org>
Cc: Li Zhijian <lizhijian@cn.fujitsu.com>,
Wen Congyang <wency@cn.fujitsu.com>,
zhanghailiang <zhang.zhanghailiang@huawei.com>,
"eddie . dong" <eddie.dong@intel.com>,
"Dr . David Alan Gilbert" <dgilbert@redhat.com>
Subject: Re: [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread
Date: Mon, 11 Jul 2016 15:17:15 +0800 [thread overview]
Message-ID: <578347FB.9020806@cn.fujitsu.com> (raw)
In-Reply-To: <577F2AD6.40004@redhat.com>
On 07/08/2016 12:23 PM, Jason Wang wrote:
>
>
> On 2016年06月23日 19:34, Zhang Chen wrote:
>> if packets are same, we send primary packet and drop secondary
>> packet, otherwise notify COLO do checkpoint.
>
> More verbose please, e.g how to handle each case of exception (or
> maybe comment in the code).
>
OK.
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>> net/colo-base.c | 1 +
>> net/colo-base.h | 3 +
>> net/colo-compare.c | 214
>> +++++++++++++++++++++++++++++++++++++++++++++++++++++
>> trace-events | 2 +
>> 4 files changed, 220 insertions(+)
>>
>> diff --git a/net/colo-base.c b/net/colo-base.c
>> index 7e263e8..9673661 100644
>> --- a/net/colo-base.c
>> +++ b/net/colo-base.c
>> @@ -146,6 +146,7 @@ Packet *packet_new(const void *data, int size)
>> pkt->data = g_memdup(data, size);
>> pkt->size = size;
>> + pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>> return pkt;
>> }
>> diff --git a/net/colo-base.h b/net/colo-base.h
>> index 01c1a5d..8bb1043 100644
>> --- a/net/colo-base.h
>> +++ b/net/colo-base.h
>> @@ -18,6 +18,7 @@
>> #include "slirp/slirp.h"
>> #include "qemu/jhash.h"
>> #include "qemu/rcu.h"
>> +#include "qemu/timer.h"
>> #define HASHTABLE_MAX_SIZE 16384
>> @@ -47,6 +48,8 @@ typedef struct Packet {
>> };
>> uint8_t *transport_layer;
>> int size;
>> + /* Time of packet creation, in wall clock ms */
>> + int64_t creation_ms;
>> } Packet;
>> typedef struct ConnectionKey {
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index 4231fe7..928d729 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -35,6 +35,8 @@
>> OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>> #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>> +/* TODO: Should be configurable */
>> +#define REGULAR_CHECK_MS 400
>
> "REGULAR" seems to generic, need a better name.
Like "REGULAR_PACKET_CHECK_MS" ?
>
>> static QTAILQ_HEAD(, CompareState) net_compares =
>> QTAILQ_HEAD_INITIALIZER(net_compares);
>> @@ -86,6 +88,11 @@ typedef struct CompareState {
>> GQueue unprocessed_connections;
>> /* proxy current hash size */
>> uint32_t hashtable_size;
>> + /* compare thread, a thread for each NIC */
>> + QemuThread thread;
>> + int thread_status;
>> + /* Timer used on the primary to find packets that are never
>> matched */
>> + QEMUTimer *timer;
>> } CompareState;
>> typedef struct CompareClass {
>> @@ -97,6 +104,15 @@ enum {
>> SECONDARY_IN,
>> };
>> +enum {
>> + /* compare thread isn't started */
>> + COMPARE_THREAD_NONE,
>> + /* compare thread is running */
>> + COMPARE_THREAD_RUNNING,
>> + /* compare thread exit */
>> + COMPARE_THREAD_EXIT,
>> +};
>> +
>> static int compare_chr_send(CharDriverState *out,
>> const uint8_t *buf,
>> uint32_t size);
>> @@ -143,6 +159,98 @@ static int packet_enqueue(CompareState *s, int
>> mode)
>> return 0;
>> }
>> +/*
>> + * The IP packets sent by primary and secondary
>> + * will be compared in here
>> + * TODO support ip fragment, Out-Of-Order
>> + * return: 0 means packet same
>> + * > 0 || < 0 means packet different
>> + */
>> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
>> +{
>> + trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
>> + inet_ntoa(ppkt->ip->ip_dst), spkt->size,
>> + inet_ntoa(spkt->ip->ip_src),
>> + inet_ntoa(spkt->ip->ip_dst));
>> +
>> + if (ppkt->size == spkt->size) {
>> + return memcmp(ppkt->data, spkt->data, spkt->size);
>> + } else {
>> + return -1;
>> + }
>> +}
>> +
>> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
>> +{
>> + trace_colo_compare_main("compare all");
>> + return colo_packet_compare(ppkt, spkt);
>> +}
>> +
>> +static void colo_old_packet_check(void *opaque_packet, void
>> *opaque_found)
>> +{
>> + int64_t now;
>> + bool *found_old = (bool *)opaque_found;
>> + Packet *ppkt = (Packet *)opaque_packet;
>> +
>> + if (*found_old) {
>> + /* Someone found an old packet earlier in the queue */
>> + return;
>> + }
>> +
>> + now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>> + if ((ppkt->creation_ms < now) &&
>
> Any case that ppkt->creation_ms >= now?
No, will remove it.
>
>> + ((now - ppkt->creation_ms) > REGULAR_CHECK_MS)) {
>> + trace_colo_old_packet_check_found(ppkt->creation_ms);
>> + *found_old = true;
>> + }
>> +}
>> +
>> +/*
>> + * called from the compare thread on the primary
>> + * for compare connection
>> + */
>> +static void colo_compare_connection(void *opaque, void *user_data)
>> +{
>> + CompareState *s = user_data;
>> + Connection *conn = opaque;
>> + Packet *pkt = NULL;
>> + GList *result = NULL;
>> + bool found_old;
>> + int ret;
>> +
>> + while (!g_queue_is_empty(&conn->primary_list) &&
>> + !g_queue_is_empty(&conn->secondary_list)) {
>> + pkt = g_queue_pop_tail(&conn->primary_list);
>> + result = g_queue_find_custom(&conn->secondary_list,
>> + pkt,
>> (GCompareFunc)colo_packet_compare_all);
>> +
>> + if (result) {
>> + ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
>> + if (ret < 0) {
>> + error_report("colo_send_primary_packet failed");
>> + }
>> + trace_colo_compare_main("packet same and release packet");
>> + g_queue_remove(&conn->secondary_list, result->data);
>> + } else {
>
> A question I forget the answer, so may ask again. What if secondary
> packet comes late?
If secondary packet comes late, primary queue has some primary packet.
we use timer to regular call colo_compare_connection(), In here,
we foreach conn->primary_list, if have old primary packet(secondary
packet comes late),
will call colo_notify_checkpoint() to do a checkpoint,that can make primary
and secondary be same.
>
>> + trace_colo_compare_main("packet different");
>> + g_queue_push_tail(&conn->primary_list, pkt);
>> + /* TODO: colo_notify_checkpoint();*/
>> + break;
>> + }
>> + }
>> +
>> + /*
>> + * Look for old packets that the secondary hasn't matched,
>> + * if we have some then we have to checkpoint to wake
>> + * the secondary up.
>> + */
>> + found_old = false;
>> + g_queue_foreach(&conn->primary_list, colo_old_packet_check,
>> &found_old);
>> + if (found_old) {
>> + /* TODO: colo_notify_checkpoint();*/
>
> Shouldn't we need to remove all "old" packets here?
yes,will add remove func.
>
>> + }
>> +}
>> +
>> static int compare_chr_send(CharDriverState *out,
>> const uint8_t *buf,
>> uint32_t size)
>> @@ -170,6 +278,69 @@ err:
>> return ret < 0 ? ret : -EIO;
>> }
>> +static int compare_chr_can_read(void *opaque)
>> +{
>> + return COMPARE_READ_LEN_MAX;
>> +}
>> +
>> +/*
>> + * called from the main thread on the primary for packets
>> + * arriving over the socket from the primary.
>> + */
>> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int
>> size)
>> +{
>> + CompareState *s = COLO_COMPARE(opaque);
>> + int ret;
>> +
>> + ret = net_fill_rstate(&s->pri_rs, buf, size);
>> + if (ret == -1) {
>> + qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> + error_report("colo-compare primary_in error");
>> + }
>> +}
>> +
>> +/*
>> + * called from the main thread on the primary for packets
>> + * arriving over the socket from the secondary.
>> + */
>> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int
>> size)
>> +{
>> + CompareState *s = COLO_COMPARE(opaque);
>> + int ret;
>> +
>> + ret = net_fill_rstate(&s->sec_rs, buf, size);
>> + if (ret == -1) {
>> + qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> + error_report("colo-compare secondary_in error");
>> + }
>> +}
>> +
>> +static void *colo_compare_thread(void *opaque)
>> +{
>> + GMainContext *worker_context;
>> + GMainLoop *compare_loop;
>> + CompareState *s = opaque;
>> +
>> + worker_context = g_main_context_new();
>> + g_assert(g_main_context_get_thread_default() == NULL);
>> + g_main_context_push_thread_default(worker_context);
>> + g_assert(g_main_context_get_thread_default() == worker_context);
>> +
>> + qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
>> + compare_pri_chr_in, NULL, s);
>> + qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
>> + compare_sec_chr_in, NULL, s);
>> +
>> + compare_loop = g_main_loop_new(worker_context, FALSE);
>> +
>> + g_main_loop_run(compare_loop);
>> +
>> + g_main_loop_unref(compare_loop);
>> + g_main_context_pop_thread_default(worker_context);
>> + g_main_context_unref(worker_context);
>> + return NULL;
>> +}
>> +
>> static char *compare_get_pri_indev(Object *obj, Error **errp)
>> {
>> CompareState *s = COLO_COMPARE(obj);
>> @@ -222,6 +393,9 @@ static void
>> compare_pri_rs_finalize(SocketReadState *pri_rs)
>> if (packet_enqueue(s, PRIMARY_IN)) {
>> trace_colo_compare_main("primary: unsupported packet in");
>> compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
>> + } else {
>> + /* compare connection */
>> + g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>> }
>> }
>> @@ -231,16 +405,35 @@ static void
>> compare_sec_rs_finalize(SocketReadState *sec_rs)
>> if (packet_enqueue(s, SECONDARY_IN)) {
>> trace_colo_compare_main("secondary: unsupported packet in");
>> + } else {
>> + /* compare connection */
>> + g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>> }
>> }
>> /*
>> + * Prod the compare thread regularly so it can watch for any packets
>> + * that the secondary hasn't produced equivalents of.
>> + */
>> +static void colo_compare_regular(void *opaque)
>> +{
>> + CompareState *s = opaque;
>> +
>> + timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
>> + REGULAR_CHECK_MS);
>> + /* compare connection */
>> + g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>> +}
>
> We need make sure this function was called from colo thread, but it
> looks not?
Yes, In next version I will make old_packet_check related codes
independent with
colo_compare_connection().
>
>> +
>> +/*
>> * called from the main thread on the primary
>> * to setup colo-compare.
>> */
>> static void colo_compare_complete(UserCreatable *uc, Error **errp)
>> {
>> CompareState *s = COLO_COMPARE(uc);
>> + char thread_name[64];
>> + static int compare_id;
>> if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>> error_setg(errp, "colo compare needs 'primary_in' ,"
>> @@ -293,6 +486,19 @@ static void colo_compare_complete(UserCreatable
>> *uc, Error **errp)
>> g_free,
>> connection_destroy);
>> + s->thread_status = COMPARE_THREAD_RUNNING;
>> + sprintf(thread_name, "compare %d", compare_id);
>> + qemu_thread_create(&s->thread, thread_name,
>> + colo_compare_thread, s,
>> + QEMU_THREAD_JOINABLE);
>> + compare_id++;
>> +
>> + /* A regular timer to kick any packets that the secondary
>> doesn't match */
>> + s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest
>> runs */
>> + colo_compare_regular, s);
>> + timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
>> + REGULAR_CHECK_MS);
>> +
>> return;
>> }
>> @@ -338,6 +544,14 @@ static void colo_compare_finalize(Object *obj)
>> qemu_mutex_destroy(&s->conn_list_lock);
>> g_queue_free(&s->conn_list);
>> + if (s->thread.thread) {
>> + s->thread_status = COMPARE_THREAD_EXIT;
>
> Looks like there's not any code that depends on the status, so why
> need to this>
Currently we needn't this, will move it to "work with colo-frame"
related patch.
Thanks
Zhang Chen
>
>> + /* compare connection */
>> + g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>> + qemu_thread_join(&s->thread);
>> + }
>> + timer_del(s->timer);
>> +
>> g_free(s->pri_indev);
>> g_free(s->sec_indev);
>> g_free(s->outdev);
>> diff --git a/trace-events b/trace-events
>> index 703de1a..1537e91 100644
>> --- a/trace-events
>> +++ b/trace-events
>> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned
>> size, uint32_t data) "To 0x%" PRIx64
>> # net/colo-compare.c
>> colo_compare_main(const char *chr) ": %s"
>> +colo_compare_ip_info(int psize, const char *sta, const char *stb,
>> int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src
>> = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
>> +colo_old_packet_check_found(int64_t old_time) "%" PRId64
>
>
>
> .
>
--
Thanks
zhangchen
next prev parent reply other threads:[~2016-07-11 7:16 UTC|newest]
Thread overview: 21+ messages / expand[flat|nested] mbox.gz Atom feed top
2016-06-23 11:34 [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization Zhang Chen
2016-07-08 3:40 ` Jason Wang
2016-07-08 8:21 ` Zhang Chen
2016-07-08 9:12 ` Jason Wang
2016-07-11 5:14 ` Zhang Chen
2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet Zhang Chen
2016-07-08 4:07 ` Jason Wang
2016-07-08 9:56 ` Zhang Chen
2016-07-11 5:41 ` Jason Wang
2016-07-12 5:42 ` Zhang Chen
2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread Zhang Chen
2016-07-08 4:23 ` Jason Wang
2016-07-11 7:17 ` Zhang Chen [this message]
2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison Zhang Chen
2016-07-08 8:59 ` Jason Wang
2016-07-11 10:02 ` Zhang Chen
2016-07-13 2:54 ` Jason Wang
2016-07-13 5:10 ` Zhang Chen
2016-07-07 7:47 ` [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
2016-07-07 8:41 ` Jason Wang
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=578347FB.9020806@cn.fujitsu.com \
--to=zhangchen.fnst@cn.fujitsu.com \
--cc=dgilbert@redhat.com \
--cc=eddie.dong@intel.com \
--cc=jasowang@redhat.com \
--cc=lizhijian@cn.fujitsu.com \
--cc=qemu-devel@nongnu.org \
--cc=wency@cn.fujitsu.com \
--cc=zhang.zhanghailiang@huawei.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.