From mboxrd@z Thu Jan 1 00:00:00 1970
Message-ID: <521D6366.6090903@linux.vnet.ibm.com>
Date: Wed, 28 Aug 2013 10:41:42 +0800
From: Wenchao Xia
MIME-Version: 1.0
References: <1377614385-20466-1-git-send-email-stefanha@redhat.com> <521CC1D7.6040903@redhat.com>
In-Reply-To: <521CC1D7.6040903@redhat.com>
Content-Type: text/plain; charset=UTF-8; format=flowed
Content-Transfer-Encoding: 8bit
Subject: Re: [Qemu-devel] [RFC] aio: add aio_context_acquire() and aio_context_release()
To: Paolo Bonzini
Cc: Kevin Wolf , qemu-devel@nongnu.org, Stefan Hajnoczi

On 2013-08-27 23:12, Paolo Bonzini wrote:
> On 27/08/2013 16:39, Stefan Hajnoczi wrote:
>> It can be useful to run an AioContext from a thread which normally does
>> not "own" the AioContext. For example, request draining can be
>> implemented by acquiring the AioContext and looping aio_poll() until all
>> requests have been completed.
>>
>> The following pattern should work:
>>
>>   /* Event loop thread */
>>   while (running) {
>>       aio_context_acquire(ctx);
>>       aio_poll(ctx, true);
>>       aio_context_release(ctx);
>>   }
>>
>>   /* Another thread */
>>   aio_context_acquire(ctx);
>>   bdrv_read(bs, 0x1000, buf, 1);
>>   aio_context_release(ctx);
>>
>> This patch implements aio_context_acquire() and aio_context_release().
>> Note that existing aio_poll() callers do not need to worry about
>> acquiring and releasing - it is only needed when multiple threads will
>> call aio_poll() on the same AioContext.
>>
>> Signed-off-by: Stefan Hajnoczi
>
> Really, really nice! The "kick owner thread" technique is a very
> interesting way to avoid dropping the lock around aio_poll's ppoll
> system call.
>
> On top of this, I think it would be useful to make aio_context_acquire
> support recursive acquisition by returning a bool if the current thread
> is already the owner. Recursive acquisition != recursive locking! :)
> In fact, acquisition and release could perhaps be done directly by the
> synchronous block I/O functions?
>
> One comment: ctx->owner is really "ctx->owned", if you replace the
> argument of qemu_thread_is_self(ctx->owner) with &ctx->owner_thread. It
> is probably a bit clearer that way.
>

That would also avoid having two QemuThread members, which may be a
little strange.

> Paolo
>
>
>> ---
>> I previously sent patches that implement bdrv_drain_all() by stopping
>> dataplane threads. AioContext acquire()/release() is a more general
>> solution than temporarily stopping dataplane threads. This solution is
>> less hacky and is also supported by other event loops like GMainContext.
>>
>> No need to commit this patch yet, I still want to build things on top
>> of it before submitting a final version.
>>
>>  async.c             | 27 +++++++++++++++++++++++++
>>  include/block/aio.h | 13 ++++++++++++
>>  tests/test-aio.c    | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  3 files changed, 98 insertions(+)
>>
>> diff --git a/include/block/aio.h b/include/block/aio.h
>> index 5743bf1..9035e87 100644
>> --- a/include/block/aio.h
>> +++ b/include/block/aio.h
>> @@ -45,6 +45,11 @@ typedef void IOHandler(void *opaque);
>>  typedef struct AioContext {
>>      GSource source;
>>
>> +    QemuMutex acquire_lock;
>> +    QemuCond acquire_cond;
>> +    QemuThread owner_thread;
>> +    QemuThread *owner;
>> +
>>      /* The list of registered AIO handlers */
>>      QLIST_HEAD(, AioHandler) aio_handlers;
>>
>> @@ -99,6 +104,14 @@ void aio_context_ref(AioContext *ctx);
>>   */
>>  void aio_context_unref(AioContext *ctx);
>>
>> +/* Take ownership of the AioContext. If the AioContext will be shared
>> + * between threads, a thread must have ownership when calling
>> + * aio_poll().
>> + */
>> +void aio_context_acquire(AioContext *ctx);
>> +
>> +/* Relinquish ownership of the AioContext. */
>> +void aio_context_release(AioContext *ctx);
>> +
>>  /**
>>   * aio_bh_new: Allocate a new bottom half structure.
>>   *
>> diff --git a/async.c b/async.c
>> index 9791d8e..9fec07c 100644
>> --- a/async.c
>> +++ b/async.c
>> @@ -203,6 +203,8 @@ aio_ctx_finalize(GSource *source)
>>      thread_pool_free(ctx->thread_pool);
>>      aio_set_event_notifier(ctx, &ctx->notifier, NULL);
>>      event_notifier_cleanup(&ctx->notifier);
>> +    qemu_cond_destroy(&ctx->acquire_cond);
>> +    qemu_mutex_destroy(&ctx->acquire_lock);
>>      qemu_mutex_destroy(&ctx->bh_lock);
>>      g_array_free(ctx->pollfds, TRUE);
>>  }
>> @@ -240,6 +242,9 @@ AioContext *aio_context_new(void)
>>      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>>      ctx->thread_pool = NULL;
>>      qemu_mutex_init(&ctx->bh_lock);
>> +    qemu_mutex_init(&ctx->acquire_lock);
>> +    qemu_cond_init(&ctx->acquire_cond);
>> +    ctx->owner = NULL;
>>      event_notifier_init(&ctx->notifier, false);
>>      aio_set_event_notifier(ctx, &ctx->notifier,
>>                             (EventNotifierHandler *)
>> @@ -257,3 +262,25 @@ void aio_context_unref(AioContext *ctx)
>>  {
>>      g_source_unref(&ctx->source);
>>  }
>> +
>> +void aio_context_acquire(AioContext *ctx)
>> +{
>> +    qemu_mutex_lock(&ctx->acquire_lock);
>> +    while (ctx->owner) {
>> +        assert(!qemu_thread_is_self(ctx->owner));
>> +        aio_notify(ctx); /* kick current owner */
>> +        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
>> +    }
>> +    qemu_thread_get_self(&ctx->owner_thread);
>> +    ctx->owner = &ctx->owner_thread;
>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>> +}
>> +
>> +void aio_context_release(AioContext *ctx)
>> +{
>> +    qemu_mutex_lock(&ctx->acquire_lock);
>> +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
>> +    ctx->owner = NULL;
>> +    qemu_cond_signal(&ctx->acquire_cond);
>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>> +}
>> diff --git a/tests/test-aio.c b/tests/test-aio.c
>> index 1ab5637..324c099 100644
>> --- a/tests/test-aio.c
>> +++ b/tests/test-aio.c
>> @@ -88,6 +88,63 @@ static void test_notify(void)
>>      g_assert(!aio_poll(ctx, false));
>>  }
>>
>> +typedef struct {
>> +    QemuMutex start_lock;
>> +    bool thread_acquired;
>> +} AcquireTestData;
>> +
>> +static void *test_acquire_thread(void *opaque)
>> +{
>> +    AcquireTestData *data = opaque;
>> +
>> +    /* Wait for other thread to let us start */
>> +    qemu_mutex_lock(&data->start_lock);
>> +    qemu_mutex_unlock(&data->start_lock);
>> +
>> +    aio_context_acquire(ctx);
>> +    aio_context_release(ctx);
>> +
>> +    data->thread_acquired = true; /* success, we got here */
>> +
>> +    return NULL;
>> +}
>> +
>> +static void dummy_notifier_read(EventNotifier *unused)
>> +{
>> +    g_assert(false); /* should never be invoked */
>> +}
>> +
>> +static void test_acquire(void)
>> +{
>> +    QemuThread thread;
>> +    EventNotifier notifier;
>> +    AcquireTestData data;
>> +
>> +    /* Dummy event notifier ensures aio_poll() will block */
>> +    event_notifier_init(&notifier, false);
>> +    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
>> +    g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
>> +
>> +    qemu_mutex_init(&data.start_lock);
>> +    qemu_mutex_lock(&data.start_lock);
>> +    data.thread_acquired = false;
>> +
>> +    qemu_thread_create(&thread, test_acquire_thread,
>> +                       &data, QEMU_THREAD_JOINABLE);
>> +
>> +    /* Block in aio_poll(), let other thread kick us and acquire context */
>> +    aio_context_acquire(ctx);
>> +    qemu_mutex_unlock(&data.start_lock); /* let the thread run */
>> +    g_assert(!aio_poll(ctx, true));
>> +    aio_context_release(ctx);
>> +
>> +    qemu_thread_join(&thread);
>> +    aio_set_event_notifier(ctx, &notifier, NULL);
>> +    event_notifier_cleanup(&notifier);
>> +
>> +    g_assert(data.thread_acquired);
>> +}
>> +
>>  static void test_bh_schedule(void)
>>  {
>>      BHTestData data = { .n = 0 };
>> @@ -639,6 +696,7 @@ int main(int argc, char **argv)
>>
>>      g_test_init(&argc, &argv, NULL);
>>      g_test_add_func("/aio/notify", test_notify);
>> +    g_test_add_func("/aio/acquire", test_acquire);
>>      g_test_add_func("/aio/bh/schedule", test_bh_schedule);
>>      g_test_add_func("/aio/bh/schedule10", test_bh_schedule10);
>>      g_test_add_func("/aio/bh/cancel", test_bh_cancel);
>>
>

-- 
Best Regards

Wenchao Xia