All of lore.kernel.org
 help / color / mirror / Atom feed
From: <wang.yong155@zte.com.cn>
To: jasowang@redhat.com, zhang.zhanghailiang@huawei.com
Cc: zhangchen.fnst@cn.fujitsu.com, lizhijian@cn.fujitsu.com,
	qemu-devel@nongnu.org, wang.guang55@zte.com.cn,
	yang.bin18@zte.com.cn, liu.dayu@zte.com.cn
Subject: [Qemu-devel] 答复: Re: 答复: Re: [PATCHv2 02/04] colo-compare: Process pactkets in the IOThread ofthe primary
Date: Tue, 13 Jun 2017 08:48:25 +0800 (CST)	[thread overview]
Message-ID: <201706130848249945419@zte.com.cn> (raw)

>> >> From: Wang Yong <wang.yong155@zte.com.cn>

>>

>> >>

>>

>> >> Process packets in the IOThread which arrived over the socket.

>>

>> >> we use qio_channel_set_aio_fd_handler to set the handlers on the

>>

>> >> IOThread AioContext. Then the packets from the primary and the 

>> secondary

>>

>> >> are processed in the IOThread.

>>

>> >> Finally remove the colo-compare thread using the IOThread instead.

>>

>> >>

>>

>> >> Signed-off-by: Wang Yong<wang.yong155@zte.com.cn>

>>

>> >> Signed-off-by: Wang Guang<wang.guang55@zte.com.cn>

>>

>> >> ---

>>

>> >>   net/colo-compare.c | 133 

>> ++++++++++++++++++++++++++++++++++++-----------------

>>

>> >>   net/colo.h         |   1 +

>>

>> >>   2 files changed, 91 insertions(+), 43 deletions(-)

>>

>> >>

>>

>> >> diff --git a/net/colo-compare.c b/net/colo-compare.c

>>

>> >> index b0942a4..e3af791 100644

>>

>> >> --- a/net/colo-compare.c

>>

>> >> +++ b/net/colo-compare.c

>>

>> >> @@ -29,6 +29,7 @@

>>

>> >>   #include "qemu/sockets.h"

>>

>> >>   #include "qapi-visit.h"

>>

>> >>   #include "net/colo.h"

>>

>> >> +#include "io/channel.h"

>>

>> >>   #include "sysemu/iothread.h"

>>

>> >>

>>

>> >>   #define TYPE_COLO_COMPARE "colo-compare"

>>

>> >> @@ -82,11 +83,6 @@ typedef struct CompareState {

>>

>> >>       GQueue conn_list

>>

>> >>       /* hashtable to save connection */

>>

>> >>       GHashTable *connection_track_table

>>

>> >> -    /* compare thread, a thread for each NIC */

>>

>> >> -    QemuThread thread

>>

>> >> -

>>

>> >> -    GMainContext *worker_context

>>

>> >> -    GMainLoop *compare_loop

>>

>> >>

>>

>> >>       /*compare iothread*/

>>

>> >>       IOThread *iothread

>>

>> >> @@ -95,6 +91,14 @@ typedef struct CompareState {

>>

>> >>       QEMUTimer *packet_check_timer

>>

>> >>   } CompareState

>>

>> >>

>>

>> >> +typedef struct {

>>

>> >> +    Chardev parent

>>

>> >> +    QIOChannel *ioc /*I/O channel */

>>

>>

>> >We probably don't want to manipulate char backend's internal io 

>> channel.

>>

>> >All need here is to access the frontend API (char-fe.c) I believe, and

>>

>> >hide the internal implementation.

>>

>> char-fd.c ?

>>




>Char-fe.c for sure which means frontend of chardev.




>> These API can only watch events in the qemu main thread, not in the 

>> IOThread.

>>

>> I had to use the qio_channel_socket_set_aio_fd_handler function to

>>

>> monitor the char event in the IOThread, so the io channel is used here

>>




>The point is not touching the internal structure of chardev like ioc, 

>instead extend its helper like e.g qemu_chr_fe_set_handlers() and let it 

>set aio handlers,




Currently, character devices are tied to the GSource API. However, I'll try to submit a patch first.




Thanks




>> ->qio_channel_socket_set_aio_fd_handler

>>

>>    ->aio_set_fd_handler

>>

>>

>> Thanks

>>

>>

>> >> +} CompareChardev

>>

>> >> +

>>

>> >> +#define COMPARE_CHARDEV(obj)         \

>>

>> >> +    OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET)

>>

>> >> +

>>

>> >>   typedef struct CompareClass {

>>

>> >>       ObjectClass parent_class

>>

>> >>   } CompareClass

>>

>> >> @@ -107,6 +111,12 @@ enum {

>>

>> >>   static int compare_chr_send(CharBackend *out,

>>

>> >>                               const uint8_t *buf,

>>

>> >>                               uint32_t size)

>>

>> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,

>>

>> >> +                                    AioContext *ctx,

>>

>> >> +                                    IOCanReadHandler *fd_can_read,

>>

>> >> +                                    IOReadHandler *fd_read,

>>

>> >> +                                    IOEventHandler *fd_event,

>>

>> >> +                                    void *opaque)

>>

>> >>

>>

>> >>   static gint seq_sorter(Packet *a, Packet *b, gpointer data)

>>

>> >>   {

>>

>> >> @@ -534,6 +544,30 @@ err:

>>

>> >>       return ret < 0 ? ret : -EIO

>>

>> >>   }

>>

>> >>

>>

>> >> +static void compare_chr_read(void *opaque)

>>

>> >> +{

>>

>> >> +    Chardev *chr = opaque

>>

>> >> +    uint8_t buf[CHR_READ_BUF_LEN]

>>

>> >> +    int len, size

>>

>> >> +    int max_size

>>

>> >> +

>>

>> >> +    max_size = qemu_chr_be_can_write(chr)

>>

>> >> +    if (max_size <= 0) {

>>

>> >> +        return

>>

>> >> +    }

>>

>> >> +

>>

>> >> +    len = sizeof(buf)

>>

>> >> +    if (len > max_size) {

>>

>> >> +        len = max_size

>>

>> >> +    }

>>

>> >> +    size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf, 

>> len)

>>

>> >> +    if (size == 0) {

>>

>> >> +        return

>>

>> >> +    } else if (size > 0) {

>>

>> >> +        qemu_chr_be_write(chr, buf, size)

>>

>> >> +    }

>>

>> >> +}

>>

>> >> +

>>

>> >>   static int compare_chr_can_read(void *opaque)

>>

>> >>   {

>>

>> >>       return COMPARE_READ_LEN_MAX

>>

>> >> @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque, 

>> const uint8_t *buf, int size)

>>

>> >>

>>

>> >>       ret = net_fill_rstate(&s->pri_rs, buf, size)

>>

>> >>       if (ret == -1) {

>>

>> >> -        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,

>>

>> >> -                                 NULL, NULL, true)

>>

>> >> +  compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,

>>

>> >> +                                    NULL, NULL, NULL, NULL)

>>

>> >>           error_report("colo-compare primary_in error")

>>

>> >>       }

>>

>> >>   }

>>

>> >> @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque, 

>> const uint8_t *buf, int size)

>>

>> >>

>>

>> >>       ret = net_fill_rstate(&s->sec_rs, buf, size)

>>

>> >>       if (ret == -1) {

>>

>> >> -        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,

>>

>> >> -                                 NULL, NULL, true)

>>

>> >> +  compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,

>>

>> >> +                                    NULL, NULL, NULL, NULL)

>>

>> >>           error_report("colo-compare secondary_in error")

>>

>> >>       }

>>

>> >>   }

>>

>> >> @@ -605,34 +639,57 @@ static void 

>> colo_compare_timer_del(CompareState *s)

>>

>> >>       }

>>

>> >>   }

>>

>> >>

>>

>> >> -static void *colo_compare_thread(void *opaque)

>>

>> >> -{

>>

>> >> -    CompareState *s = opaque

>>

>> >> -

>>

>>>> -    s->worker_context = g_main_context_new()

>>

>> >> -

>>

>> >> -    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,

>>

>> >> -                          compare_pri_chr_in, NULL, s, 

>> s->worker_context, true)

>>

>> >> -    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,

>>

>> >> -                          compare_sec_chr_in, NULL, s, 

>> s->worker_context, true)

>>

>> >> -

>>

>> >> -    s->compare_loop = g_main_loop_new(s->worker_context, FALSE)

>>

>> >> -

>>

>> >> -    g_main_loop_run(s->compare_loop)

>>

>> >> -

>>

>> >> -    g_main_loop_unref(s->compare_loop)

>>

>> >> -    g_main_context_unref(s->worker_context)

>>

>> >> -    return NULL

>>

>> >> -}

>>

>> >>

>>

>> >>   static void colo_compare_iothread(CompareState *s)

>>

>> >>   {

>>

>> >>       object_ref(OBJECT(s->iothread))

>>

>> >>       s->ctx = iothread_get_aio_context(s->iothread)

>>

>> >>

>>

>> >> +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,

>>

>> >> +                    compare_chr_can_read,

>>

>> >> +                    compare_pri_chr_in,

>>

>> >> +                    NULL,

>>

>> >> +                    s)

>>

>> >> +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,

>>

>> >> +                    compare_chr_can_read,

>>

>> >> +                    compare_sec_chr_in,

>>

>> >> +                    NULL,

>>

>> >> +                    s)

>>

>> >> +

>>

>> >>       colo_compare_timer_init(s)

>>

>> >>   }

>>

>> >>

>>

>> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,

>>

>> >> +                                    AioContext *ctx,

>>

>> >> +                                    IOCanReadHandler *fd_can_read,

>>

>> >> +                                    IOReadHandler *fd_read,

>>

>> >> +                                    IOEventHandler *fd_event,

>>

>> >> +                                    void *opaque)

>>

>> >> +{

>>

>> >> +    CompareChardev *s

>>

>> >> +

>>

>> >> +    if (!b->chr) {

>>

>> >> +        return

>>

>> >> +    }

>>

>> >> +    s = COMPARE_CHARDEV(b->chr)

>>

>> >> +    if (!s->ioc) {

>>

>> >> +        return

>>

>> >> +    }

>>

>>

>> >So this is hacky, you can refer how vhost-user validate udp socket char

>>

>> >backend.

>>

>> I will investigate.

>>

>>

>> Thanks

>>

>>

>> >> +

>>

>> >> +    b->chr_can_read = fd_can_read

>>

>> >> +    b->chr_read = fd_read

>>

>> >> +    b->chr_event = fd_event

>>

>> >> +    b->opaque = opaque

>>

>> >> +    remove_fd_in_watch(b->chr)

>>

>> >> +

>>

>> >> +    if (b->chr_read) {

>>

>> >> +        qio_channel_set_aio_fd_handler(s->ioc, ctx,

>>

>> >> +                                compare_chr_read, NULL, b->chr)

>>

>> >> +    } else {

>>

>> >> +        qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL, 

>> NULL)

>>

>>

>> >So instead of doing such hack, how about passing a AioContext * instead

>>

>> >of GMainContext * to qemu_chr_fe_set_handlers?

>>

>> IOThread AioContext ->GSource -> GMainContext  is NULL

>>

>> if we still use the qemu_chr_fe_set_handlers, it will use the qemu 

>> main thread's GMainContext,

>>

>> then io will still be processed in the qemu main thread.

>>

>> so I encapsulate a function(compare_chr_set_aio_fd_handlers) to 

>> monitor char fd in the IOThread.

>>

>>>




>As above, we should do this inside qemu-fe.c not here.

>

>Thanks

>

>> Thanks

>>

>>

>> >Thanks

>>

>>

>> >> +    }

>>

>> >> +}

>>

>> >> +

>>

>> >>   static char *compare_get_pri_indev(Object *obj, Error **errp)

>>

>> >>   {

>>

>> >>       CompareState *s = COLO_COMPARE(obj)

>>

>> >> @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable 

>> *uc, Error **errp)

>>

>> >>   {

>>

>> >>       CompareState *s = COLO_COMPARE(uc)

>>

>> >>       Chardev *chr

>>

>> >> -    char thread_name[64]

>>

>> >> -    static int compare_id

>>

>> >>

>>

>> >>       if (!s->pri_indev || !s->sec_indev || !s->outdev || 

>> !s->iothread) {

>>

>> >>           error_setg(errp, "colo compare needs 'primary_in' ,"

>>

>> >> @@ -776,12 +831,6 @@ static void 

>> colo_compare_complete(UserCreatable *uc, Error **errp)

>>

>> >> g_free,

>>

>> >> connection_destroy)

>>

>> >>

>>

>> >> -    sprintf(thread_name, "colo-compare %d", compare_id)

>>

>> >> -    qemu_thread_create(&s->thread, thread_name,

>>

>> >> -                       colo_compare_thread, s,

>>

>> >> -                       QEMU_THREAD_JOINABLE)

>>

>> >> -    compare_id++

>>

>> >> -

>>

>> >>       colo_compare_iothread(s)

>>

>> >>

>>

>> >>       return

>>

>> >> @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj)

>>

>> >>   {

>>

>> >>       CompareState *s = COLO_COMPARE(obj)

>>

>> >>

>>

>> >> -    qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, 

>> NULL,

>>

>> >> -                             s->worker_context, true)

>>

>> >> -    qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, 

>> NULL,

>>

>> >> -                             s->worker_context, true)

>>

>> >> +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,

>>

>> >> +                                    NULL, NULL, NULL, NULL)

>>

>> >> +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,

>>

>> >> +                                    NULL, NULL, NULL, NULL)

>>

>> >> +

>>

>> >>       qemu_chr_fe_deinit(&s->chr_out)

>>

>> >>       colo_compare_timer_del(s)

>>

>> >>

>>

>> >> -    g_main_loop_quit(s->compare_loop)

>>

>> >> -    qemu_thread_join(&s->thread)

>>

>> >> -

>>

>> >>       /* Release all unhandled packets after compare thead exited */

>>>

>> >>       g_queue_foreach(&s->conn_list, colo_flush_packets, s)

>>

>> >>

>>

>> >> diff --git a/net/colo.h b/net/colo.h

>>

>> >> index 7c524f3..936dea1 100644

>>

>> >> --- a/net/colo.h

>>

>> >> +++ b/net/colo.h

>>

>> >> @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable 

>> *connection_track_table,

>>

>> >>   void connection_hashtable_reset(GHashTable 

>> *connection_track_table)

>>

>> >>   Packet *packet_new(const void *data, int size)

>>

>> >>   void packet_destroy(void *opaque, void *user_data)

>>

>> >> +void remove_fd_in_watch(Chardev *chr)

>>

>> >>

>>

>> >>   #endif /* QEMU_COLO_PROXY_H */















原始邮件



发件人: <jasowang@redhat.com>
收件人:王勇10170530 <zhang.zhanghailiang@huawei.com>
抄送人: <zhangchen.fnst@cn.fujitsu.com> <lizhijian@cn.fujitsu.com> <qemu-devel@nongnu.org>王广10165992
日 期 :2017年06月09日 12:20
主 题 :Re: 答复: Re: [PATCHv2 02/04] colo-compare: Process pactkets in the IOThread ofthe primary







On 2017年06月08日 17:16, wang.yong155@zte.com.cn wrote:
>
> >> From: Wang Yong <wang.yong155@zte.com.cn>
>
> >>
>
> >> Process packets in the IOThread which arrived over the socket.
>
> >> we use qio_channel_set_aio_fd_handler to set the handlers on the
>
> >> IOThread AioContext.then the packets from the primary and the 
> secondary
>
> >> are processed in the IOThread.
>
> >> Finally remove the colo-compare thread using the IOThread instead.
>
> >>
>
> >> Signed-off-by: Wang Yong<wang.yong155@zte.com.cn>
>
> >> Signed-off-by: Wang Guang<wang.guang55@zte.com.cn>
>
> >> ---
>
> >>   net/colo-compare.c | 133 
> ++++++++++++++++++++++++++++++++++++-----------------
>
> >>   net/colo.h         |   1 +
>
> >>   2 files changed, 91 insertions(+), 43 deletions(-)
>
> >>
>
> >> diff --git a/net/colo-compare.c b/net/colo-compare.c
>
> >> index b0942a4..e3af791 100644
>
> >> --- a/net/colo-compare.c
>
> >> +++ b/net/colo-compare.c
>
> >> @@ -29,6 +29,7 @@
>
> >>   #include "qemu/sockets.h"
>
> >>   #include "qapi-visit.h"
>
> >>   #include "net/colo.h"
>
> >> +#include "io/channel.h"
>
> >>   #include "sysemu/iothread.h"
>
> >>
>
> >>   #define TYPE_COLO_COMPARE "colo-compare"
>
> >> @@ -82,11 +83,6 @@ typedef struct CompareState {
>
> >>       GQueue conn_list
>
> >>       /* hashtable to save connection */
>
> >>       GHashTable *connection_track_table
>
> >> -    /* compare thread, a thread for each NIC */
>
> >> -    QemuThread thread
>
> >> -
>
> >> -    GMainContext *worker_context
>
> >> -    GMainLoop *compare_loop
>
> >>
>
> >>       /*compare iothread*/
>
> >>       IOThread *iothread
>
> >> @@ -95,6 +91,14 @@ typedef struct CompareState {
>
> >>       QEMUTimer *packet_check_timer
>
> >>   } CompareState
>
> >>
>
> >> +typedef struct {
>
> >> +    Chardev parent
>
> >> +    QIOChannel *ioc /*I/O channel */
>
>
> >We probably don't want to manipulate char backend's internal io 
> channel.
>
> >All need here is to access the frontend API (char-fe.c) I believe, and
>
> >hide the internal implementation.
>
> char-fd.c ?
>

Char-fe.c for sure which means frontend of chardev.

> These API can only watch events in the qemu main thread, not in the 
> IOThread.
>
> I had to use the qio_channel_socket_set_aio_fd_handler function to
>
> monitor the char event in the IOThread, so the io channel is used here
>

The point is not touching the internal structure of chardev like ioc, 
instead extend its helper like e.g qemu_chr_fe_set_handlers() and let it 
set aio handlers,

> ->qio_channel_socket_set_aio_fd_handler
>
>    ->aio_set_fd_handler
>
>
> Thanks
>
>
> >> +} CompareChardev
>
> >> +
>
> >> +#define COMPARE_CHARDEV(obj)         \
>
> >> +    OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET)
>
> >> +
>
> >>   typedef struct CompareClass {
>
> >>       ObjectClass parent_class
>
> >>   } CompareClass
>
> >> @@ -107,6 +111,12 @@ enum {
>
> >>   static int compare_chr_send(CharBackend *out,
>
> >>                               const uint8_t *buf,
>
> >>                               uint32_t size)
>
> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
>
> >> +                                    AioContext *ctx,
>
> >> +                                    IOCanReadHandler *fd_can_read,
>
> >> +                                    IOReadHandler *fd_read,
>
> >> +                                    IOEventHandler *fd_event,
>
> >> +                                    void *opaque)
>
> >>
>
> >>   static gint seq_sorter(Packet *a, Packet *b, gpointer data)
>
> >>   {
>
> >> @@ -534,6 +544,30 @@ err:
>
> >>       return ret < 0 ? ret : -EIO
>
> >>   }
>
> >>
>
> >> +static void compare_chr_read(void *opaque)
>
> >> +{
>
> >> +    Chardev *chr = opaque
>
> >> +    uint8_t buf[CHR_READ_BUF_LEN]
>
> >> +    int len, size
>
> >> +    int max_size
>
> >> +
>
> >> +    max_size = qemu_chr_be_can_write(chr)
>
> >> +    if (max_size <= 0) {
>
> >> +        return
>
> >> +    }
>
> >> +
>
> >> +    len = sizeof(buf)
>
> >> +    if (len > max_size) {
>
> >> +        len = max_size
>
> >> +    }
>
> >> +    size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf, 
> len)
>
> >> +    if (size == 0) {
>
> >> +        return
>
> >> +    } else if (size > 0) {
>
> >> +        qemu_chr_be_write(chr, buf, size)
>
> >> +    }
>
> >> +}
>
> >> +
>
> >>   static int compare_chr_can_read(void *opaque)
>
> >>   {
>
> >>       return COMPARE_READ_LEN_MAX
>
> >> @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque, 
> const uint8_t *buf, int size)
>
> >>
>
> >>       ret = net_fill_rstate(&s->pri_rs, buf, size)
>
> >>       if (ret == -1) {
>
> >> -        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
>
> >> -                                 NULL, NULL, true)
>
> >> +  compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>
> >> +                                    NULL, NULL, NULL, NULL)
>
> >>           error_report("colo-compare primary_in error")
>
> >>       }
>
> >>   }
>
> >> @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque, 
> const uint8_t *buf, int size)
>
> >>
>
> >>       ret = net_fill_rstate(&s->sec_rs, buf, size)
>
> >>       if (ret == -1) {
>
> >> -        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
>
> >> -                                 NULL, NULL, true)
>
> >> +  compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>
> >> +                                    NULL, NULL, NULL, NULL)
>
> >>           error_report("colo-compare secondary_in error")
>
> >>       }
>
> >>   }
>
> >> @@ -605,34 +639,57 @@ static void 
> colo_compare_timer_del(CompareState *s)
>
> >>       }
>
> >>   }
>
> >>
>
> >> -static void *colo_compare_thread(void *opaque)
>
> >> -{
>
> >> -    CompareState *s = opaque
>
> >> -
>
> >> -    s->worker_context = g_main_context_new()
>
> >> -
>
> >> -    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
>
> >> -                          compare_pri_chr_in, NULL, s, 
> s->worker_context, true)
>
> >> -    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
>
> >> -                          compare_sec_chr_in, NULL, s, 
> s->worker_context, true)
>
> >> -
>
> >> -    s->compare_loop = g_main_loop_new(s->worker_context, FALSE)
>
> >> -
>
> >> -    g_main_loop_run(s->compare_loop)
>
> >> -
>
> >> -    g_main_loop_unref(s->compare_loop)
>
> >> -    g_main_context_unref(s->worker_context)
>
> >> -    return NULL
>
> >> -}
>
> >>
>
> >>   static void colo_compare_iothread(CompareState *s)
>
> >>   {
>
> >>       object_ref(OBJECT(s->iothread))
>
> >>       s->ctx = iothread_get_aio_context(s->iothread)
>
> >>
>
> >> +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>
> >> +                    compare_chr_can_read,
>
> >> +                    compare_pri_chr_in,
>
> >> +                    NULL,
>
> >> +                    s)
>
> >> +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>
> >> +                    compare_chr_can_read,
>
> >> +                    compare_sec_chr_in,
>
> >> +                    NULL,
>
> >> +                    s)
>
> >> +
>
> >>       colo_compare_timer_init(s)
>
> >>   }
>
> >>
>
> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
>
> >> +                                    AioContext *ctx,
>
> >> +                                    IOCanReadHandler *fd_can_read,
>
> >> +                                    IOReadHandler *fd_read,
>
> >> +                                    IOEventHandler *fd_event,
>
> >> +                                    void *opaque)
>
> >> +{
>
> >> +    CompareChardev *s
>
> >> +
>
> >> +    if (!b->chr) {
>
> >> +        return
>
> >> +    }
>
> >> +    s = COMPARE_CHARDEV(b->chr)
>
> >> +    if (!s->ioc) {
>
> >> +        return
>
> >> +    }
>
>
> >So this is hacky, you can refer how vhost-user validate udp socket char
>
> >backend.
>
> I will investigate.
>
>
> Thanks
>
>
> >> +
>
> >> +    b->chr_can_read = fd_can_read
>
> >> +    b->chr_read = fd_read
>
> >> +    b->chr_event = fd_event
>
> >> +    b->opaque = opaque
>
> >> +    remove_fd_in_watch(b->chr)
>
> >> +
>
> >> +    if (b->chr_read) {
>
> >> +        qio_channel_set_aio_fd_handler(s->ioc, ctx,
>
> >> +                                compare_chr_read, NULL, b->chr)
>
> >> +    } else {
>
> >> +        qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL, 
> NULL)
>
>
> >So instead of doing such hack, how about passing a AioContext * instead
>
> >of GMainContext * to qemu_chr_fe_set_handlers?
>
> IOThread AioContext ->GSource -> GMainContext  is NULL
>
> if we still use the qemu_chr_fe_set_handlers, it will use the qemu 
> main thread's GMainContext,
>
> then io will still be processed in the qemu main thread.
>
> so I encapsulate a function(compare_chr_set_aio_fd_handlers) to 
> monitor char fd in the IOThread.
>
>

As above, we should do this inside qemu-fe.c not here.

Thanks

> Thanks
>
>
> >Thanks
>
>
> >> +    }
>
> >> +}
>
> >> +
>
> >>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>
> >>   {
>
> >>       CompareState *s = COLO_COMPARE(obj)
>
> >> @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable 
> *uc, Error **errp)
>
> >>   {
>
> >>       CompareState *s = COLO_COMPARE(uc)
>
> >>       Chardev *chr
>
> >> -    char thread_name[64]
>
> >> -    static int compare_id
>
> >>
>
> >>       if (!s->pri_indev || !s->sec_indev || !s->outdev || 
> !s->iothread) {
>
> >>           error_setg(errp, "colo compare needs 'primary_in' ,"
>
> >> @@ -776,12 +831,6 @@ static void 
> colo_compare_complete(UserCreatable *uc, Error **errp)
>
> >> g_free,
>
> >> connection_destroy)
>
> >>
>
> >> -    sprintf(thread_name, "colo-compare %d", compare_id)
>
> >> -    qemu_thread_create(&s->thread, thread_name,
>
> >> -                       colo_compare_thread, s,
>
> >> -                       QEMU_THREAD_JOINABLE)
>
> >> -    compare_id++
>
> >> -
>
> >>       colo_compare_iothread(s)
>
> >>
>
> >>       return
>
> >> @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj)
>
> >>   {
>
> >>       CompareState *s = COLO_COMPARE(obj)
>
> >>
>
> >> -    qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, 
> NULL,
>
> >> -                             s->worker_context, true)
>
> >> -    qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, 
> NULL,
>
> >> -                             s->worker_context, true)
>
> >> +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
>
> >> +                                    NULL, NULL, NULL, NULL)
>
> >> +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
>
> >> +                                    NULL, NULL, NULL, NULL)
>
> >> +
>
> >>       qemu_chr_fe_deinit(&s->chr_out)
>
> >>       colo_compare_timer_del(s)
>
> >>
>
> >> -    g_main_loop_quit(s->compare_loop)
>
> >> -    qemu_thread_join(&s->thread)
>
> >> -
>
> >>       /* Release all unhandled packets after compare thead exited */
>
> >>       g_queue_foreach(&s->conn_list, colo_flush_packets, s)
>
> >>
>
> >> diff --git a/net/colo.h b/net/colo.h
>
> >> index 7c524f3..936dea1 100644
>
> >> --- a/net/colo.h
>
> >> +++ b/net/colo.h
>
> >> @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable 
> *connection_track_table,
>
> >>   void connection_hashtable_reset(GHashTable 
> *connection_track_table)
>
> >>   Packet *packet_new(const void *data, int size)
>
> >>   void packet_destroy(void *opaque, void *user_data)
>
> >> +void remove_fd_in_watch(Chardev *chr)
>
> >>
>
> >>   #endif /* QEMU_COLO_PROXY_H */
>
>
>
>
>
> 原始邮件
> *发件人:*<jasowang@redhat.com>
> *收件人:*王勇10170530<zhang.zhanghailiang@huawei.com><zhangchen.fnst@cn.fujitsu.com>
> *抄送人:*<lizhijian@cn.fujitsu.com><qemu-devel@nongnu.org>王广10165992
> *日 期 :*2017年06月07日 16:35
> *主 题 :**Re: [PATCHv2 02/04] colo-compare: Process pactkets in the 
> IOThread ofthe primary*
>
>
>
>
> On 2017年06月05日 18:44, Yong Wang wrote:
> > From: Wang Yong <wang.yong155@zte.com.cn>
> >
> > Process packets in the IOThread which arrived over the socket.
> > we use qio_channel_set_aio_fd_handler to set the handlers on the
> > IOThread AioContext. Then the packets from the primary and the secondary
> > are processed in the IOThread.
> > Finally remove the colo-compare thread using the IOThread instead.
> >
> > Signed-off-by: Wang Yong<wang.yong155@zte.com.cn>
> > Signed-off-by: Wang Guang<wang.guang55@zte.com.cn>
> > ---
> >   net/colo-compare.c | 133 ++++++++++++++++++++++++++++++++++++-----------------
> >   net/colo.h         |   1 +
> >   2 files changed, 91 insertions(+), 43 deletions(-)
> >
> > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > index b0942a4..e3af791 100644
> > --- a/net/colo-compare.c
> > +++ b/net/colo-compare.c
> > @@ -29,6 +29,7 @@
> >   #include "qemu/sockets.h"
> >   #include "qapi-visit.h"
> >   #include "net/colo.h"
> > +#include "io/channel.h"
> >   #include "sysemu/iothread.h"
> >
> >   #define TYPE_COLO_COMPARE "colo-compare"
> > @@ -82,11 +83,6 @@ typedef struct CompareState {
> >       GQueue conn_list
> >       /* hashtable to save connection */
> >       GHashTable *connection_track_table
> > -    /* compare thread, a thread for each NIC */
> > -    QemuThread thread
> > -
> > -    GMainContext *worker_context
> > -    GMainLoop *compare_loop
> >
> >       /*compare iothread*/
> >       IOThread *iothread
> > @@ -95,6 +91,14 @@ typedef struct CompareState {
> >       QEMUTimer *packet_check_timer
> >   } CompareState
> >
> > +typedef struct {
> > +    Chardev parent
> > +    QIOChannel *ioc /*I/O channel */
>
> We probably don't want to manipulate char backend's internal io channel.
> All need here is to access the frontend API (char-fe.c) I believe, and
> hide the internal implementation.
>
> > +} CompareChardev
> > +
> > +#define COMPARE_CHARDEV(obj)                                     \
> > +    OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET)
> > +
> >   typedef struct CompareClass {
> >       ObjectClass parent_class
> >   } CompareClass
> > @@ -107,6 +111,12 @@ enum {
> >   static int compare_chr_send(CharBackend *out,
> >                               const uint8_t *buf,
> >                               uint32_t size)
> > +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
> > +                                    AioContext *ctx,
> > +                                    IOCanReadHandler *fd_can_read,
> > +                                    IOReadHandler *fd_read,
> > +                                    IOEventHandler *fd_event,
> > +                                    void *opaque)
> >
> >   static gint seq_sorter(Packet *a, Packet *b, gpointer data)
> >   {
> > @@ -534,6 +544,30 @@ err:
> >       return ret < 0 ? ret : -EIO
> >   }
> >
> > +static void compare_chr_read(void *opaque)
> > +{
> > +    Chardev *chr = opaque
> > +    uint8_t buf[CHR_READ_BUF_LEN]
> > +    int len, size
> > +    int max_size
> > +
> > +    max_size = qemu_chr_be_can_write(chr)
> > +    if (max_size <= 0) {
> > +        return
> > +    }
> > +
> > +    len = sizeof(buf)
> > +    if (len > max_size) {
> > +        len = max_size
> > +    }
> > +    size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf, len)
> > +    if (size == 0) {
> > +        return
> > +    } else if (size > 0) {
> > +        qemu_chr_be_write(chr, buf, size)
> > +    }
> > +}
> > +
> >   static int compare_chr_can_read(void *opaque)
> >   {
> >       return COMPARE_READ_LEN_MAX
> > @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> >
> >       ret = net_fill_rstate(&s->pri_rs, buf, size)
> >       if (ret == -1) {
> > -        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
> > -                                 NULL, NULL, true)
> > +        compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> > +                                    NULL, NULL, NULL, NULL)
> >           error_report("colo-compare primary_in error")
> >       }
> >   }
> > @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
> >
> >       ret = net_fill_rstate(&s->sec_rs, buf, size)
> >       if (ret == -1) {
> > -        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
> > -                                 NULL, NULL, true)
> > +        compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> > +                                    NULL, NULL, NULL, NULL)
> >           error_report("colo-compare secondary_in error")
> >       }
> >   }
> > @@ -605,34 +639,57 @@ static void colo_compare_timer_del(CompareState *s)
> >       }
> >   }
> >
> > -static void *colo_compare_thread(void *opaque)
> > -{
> > -    CompareState *s = opaque
> > -
> > -    s->worker_context = g_main_context_new()
> > -
> > -    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
> > -                          compare_pri_chr_in, NULL, s, s->worker_context, true)
> > -    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
> > -                          compare_sec_chr_in, NULL, s, s->worker_context, true)
> > -
> > -    s->compare_loop = g_main_loop_new(s->worker_context, FALSE)
> > -
> > -    g_main_loop_run(s->compare_loop)
> > -
> > -    g_main_loop_unref(s->compare_loop)
> > -    g_main_context_unref(s->worker_context)
> > -    return NULL
> > -}
> >
> >   static void colo_compare_iothread(CompareState *s)
> >   {
> >       object_ref(OBJECT(s->iothread))
> >       s->ctx = iothread_get_aio_context(s->iothread)
> >
> > +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> > +                    compare_chr_can_read,
> > +                    compare_pri_chr_in,
> > +                    NULL,
> > +                    s)
> > +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> > +                    compare_chr_can_read,
> > +                    compare_sec_chr_in,
> > +                    NULL,
> > +                    s)
> > +
> >       colo_compare_timer_init(s)
> >   }
> >
> > +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
> > +                                    AioContext *ctx,
> > +                                    IOCanReadHandler *fd_can_read,
> > +                                    IOReadHandler *fd_read,
> > +                                    IOEventHandler *fd_event,
> > +                                    void *opaque)
> > +{
> > +    CompareChardev *s
> > +
> > +    if (!b->chr) {
> > +        return
> > +    }
> > +    s = COMPARE_CHARDEV(b->chr)
> > +    if (!s->ioc) {
> > +        return
> > +    }
>
> So this is hacky, you can refer how vhost-user validate udp socket char
> backend.
>
> > +
> > +    b->chr_can_read = fd_can_read
> > +    b->chr_read = fd_read
> > +    b->chr_event = fd_event
> > +    b->opaque = opaque
> > +    remove_fd_in_watch(b->chr)
> > +
> > +    if (b->chr_read) {
> > +        qio_channel_set_aio_fd_handler(s->ioc, ctx,
> > +                                compare_chr_read, NULL, b->chr)
> > +    } else {
> > +        qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL, NULL)
>
> So instead of doing such hack, how about passing a AioContext * instead
> of GMainContext * to qemu_chr_fe_set_handlers?
>
> Thanks
>
> > +    }
> > +}
> > +
> >   static char *compare_get_pri_indev(Object *obj, Error **errp)
> >   {
> >       CompareState *s = COLO_COMPARE(obj)
> > @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
> >   {
> >       CompareState *s = COLO_COMPARE(uc)
> >       Chardev *chr
> > -    char thread_name[64]
> > -    static int compare_id
> >
> >       if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
> >           error_setg(errp, "colo compare needs 'primary_in' ,"
> > @@ -776,12 +831,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
> >                                                         g_free,
> >                                                         connection_destroy)
> >
> > -    sprintf(thread_name, "colo-compare %d", compare_id)
> > -    qemu_thread_create(&s->thread, thread_name,
> > -                       colo_compare_thread, s,
> > -                       QEMU_THREAD_JOINABLE)
> > -    compare_id++
> > -
> >       colo_compare_iothread(s)
> >
> >       return
> > @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj)
> >   {
> >       CompareState *s = COLO_COMPARE(obj)
> >
> > -    qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
> > -                             s->worker_context, true)
> > -    qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
> > -                             s->worker_context, true)
> > +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> > +                                    NULL, NULL, NULL, NULL)
> > +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> > +                                    NULL, NULL, NULL, NULL)
> > +
> >       qemu_chr_fe_deinit(&s->chr_out)
> >       colo_compare_timer_del(s)
> >
> > -    g_main_loop_quit(s->compare_loop)
> > -    qemu_thread_join(&s->thread)
> > -
> >       /* Release all unhandled packets after compare thead exited */
> >       g_queue_foreach(&s->conn_list, colo_flush_packets, s)
> >
> > diff --git a/net/colo.h b/net/colo.h
> > index 7c524f3..936dea1 100644
> > --- a/net/colo.h
> > +++ b/net/colo.h
> > @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable *connection_track_table,
> >   void connection_hashtable_reset(GHashTable *connection_track_table)
> >   Packet *packet_new(const void *data, int size)
> >   void packet_destroy(void *opaque, void *user_data)
> > +void remove_fd_in_watch(Chardev *chr)
> >
> >   #endif /* QEMU_COLO_PROXY_H */
>
>
>

             reply	other threads:[~2017-06-13  0:49 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-06-13  0:48 wang.yong155 [this message]
  -- strict thread matches above, loose matches on Subject: below --
2017-06-13 11:24 [Qemu-devel] 答复: Re: 答复: Re: [PATCHv2 02/04] colo-compare: Process pactkets in the IOThread ofthe primary wang.yong155
2017-06-15  4:23 ` Jason Wang
2017-06-16  9:02   ` Stefan Hajnoczi
2017-06-16  9:20   ` Stefan Hajnoczi

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=201706130848249945419@zte.com.cn \
    --to=wang.yong155@zte.com.cn \
    --cc=jasowang@redhat.com \
    --cc=liu.dayu@zte.com.cn \
    --cc=lizhijian@cn.fujitsu.com \
    --cc=qemu-devel@nongnu.org \
    --cc=wang.guang55@zte.com.cn \
    --cc=yang.bin18@zte.com.cn \
    --cc=zhang.zhanghailiang@huawei.com \
    --cc=zhangchen.fnst@cn.fujitsu.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.