From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:35864) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1bwlru-0003AE-LY for qemu-devel@nongnu.org; Wed, 19 Oct 2016 04:05:00 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1bwlrr-0004hp-DB for qemu-devel@nongnu.org; Wed, 19 Oct 2016 04:04:58 -0400 Received: from mx1.redhat.com ([209.132.183.28]:44288) by eggs.gnu.org with esmtps (TLS1.0:DHE_RSA_AES_256_CBC_SHA1:32) (Exim 4.71) (envelope-from ) id 1bwlrr-0004h2-4y for qemu-devel@nongnu.org; Wed, 19 Oct 2016 04:04:55 -0400 Received: from int-mx10.intmail.prod.int.phx2.redhat.com (int-mx10.intmail.prod.int.phx2.redhat.com [10.5.11.23]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by mx1.redhat.com (Postfix) with ESMTPS id CB15465403 for ; Wed, 19 Oct 2016 08:04:52 +0000 (UTC) From: Fam Zheng Date: Wed, 19 Oct 2016 16:04:50 +0800 Message-Id: <1476864290-6129-1-git-send-email-famz@redhat.com> Subject: [Qemu-devel] [PATCH RFC] iothread: Add "spawns" property List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-devel@nongnu.org Cc: pbonzini@redhat.com The option specifies how many threads to spawn under the iothread object. All threads share the same AioContext so they can safely run (contend) together. With AioContext going away, the spawns will natually enable the block multi-queue work. Signed-off-by: Fam Zheng --- Based on v2 of Paolo's RFifoLock removal series, with which the symmetric contention on the single AioContext is no longer a busy preempt loop. --- include/sysemu/iothread.h | 19 ++++-- iothread.c | 148 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 125 insertions(+), 42 deletions(-) diff --git a/include/sysemu/iothread.h b/include/sysemu/iothread.h index 68ac2de..b50e76c 100644 --- a/include/sysemu/iothread.h +++ b/include/sysemu/iothread.h @@ -19,16 +19,25 @@ #define TYPE_IOTHREAD "iothread" -typedef struct { - Object parent_obj; +typedef struct IOThread IOThread; +typedef struct { QemuThread thread; - AioContext *ctx; + int thread_id; + IOThread *iothread; QemuMutex init_done_lock; QemuCond init_done_cond; /* is thread initialization done? */ + bool running; +} IOThreadSpawn; + +struct IOThread { + Object parent_obj; + + uint32_t nspawns; + IOThreadSpawn *spawns; + AioContext *ctx; bool stopping; - int thread_id; -} IOThread; +}; #define IOTHREAD(obj) \ OBJECT_CHECK(IOThread, obj, TYPE_IOTHREAD) diff --git a/iothread.c b/iothread.c index bd70344..bad00fb 100644 --- a/iothread.c +++ b/iothread.c @@ -39,49 +39,66 @@ AioContext *qemu_get_current_aio_context(void) static void *iothread_run(void *opaque) { - IOThread *iothread = opaque; + IOThreadSpawn *s = opaque; + + s->running = true; rcu_register_thread(); - my_iothread = iothread; - qemu_mutex_lock(&iothread->init_done_lock); - iothread->thread_id = qemu_get_thread_id(); - qemu_cond_signal(&iothread->init_done_cond); - qemu_mutex_unlock(&iothread->init_done_lock); + my_iothread = s->iothread; + qemu_mutex_lock(&s->init_done_lock); + s->thread_id = qemu_get_thread_id(); + qemu_cond_signal(&s->init_done_cond); + qemu_mutex_unlock(&s->init_done_lock); - while (!atomic_read(&iothread->stopping)) { - aio_poll(iothread->ctx, true); + while (!atomic_read(&s->iothread->stopping)) { + aio_poll(s->iothread->ctx, true); } rcu_unregister_thread(); + s->running = false; return NULL; } static int iothread_stop(Object *object, void *opaque) { + int i; IOThread *iothread; + bool has_running = true; iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD); if (!iothread || !iothread->ctx) { return 0; } iothread->stopping = true; - aio_notify(iothread->ctx); - qemu_thread_join(&iothread->thread); + while (has_running) { + has_running = false; + aio_notify(iothread->ctx); + for (i = 0; i < iothread->nspawns; ++i) { + has_running |= iothread->spawns[i].running; + } + } + for (i = 0; i < iothread->nspawns; i++) { + qemu_thread_join(&iothread->spawns[i].thread); + } return 0; } static void iothread_instance_finalize(Object *obj) { + int i; IOThread *iothread = IOTHREAD(obj); iothread_stop(obj, NULL); - qemu_cond_destroy(&iothread->init_done_cond); - qemu_mutex_destroy(&iothread->init_done_lock); - if (!iothread->ctx) { - return; + for (i = 0; i < iothread->nspawns; ++i) { + IOThreadSpawn *s = &iothread->spawns[i]; + qemu_cond_destroy(&s->init_done_cond); + qemu_mutex_destroy(&s->init_done_lock); + if (iothread->ctx) { + aio_context_unref(s->iothread->ctx); + } } - aio_context_unref(iothread->ctx); + g_free(iothread->spawns); } static void iothread_complete(UserCreatable *obj, Error **errp) @@ -89,35 +106,81 @@ static void iothread_complete(UserCreatable *obj, Error **errp) Error *local_error = NULL; IOThread *iothread = IOTHREAD(obj); char *name, *thread_name; + int i; iothread->stopping = false; - iothread->thread_id = -1; iothread->ctx = aio_context_new(&local_error); if (!iothread->ctx) { error_propagate(errp, local_error); return; } - - qemu_mutex_init(&iothread->init_done_lock); - qemu_cond_init(&iothread->init_done_cond); + if (!iothread->nspawns) { + iothread->nspawns = 1; + } + iothread->spawns = g_new0(IOThreadSpawn, iothread->nspawns); /* This assumes we are called from a thread with useful CPU affinity for us * to inherit. */ name = object_get_canonical_path_component(OBJECT(obj)); - thread_name = g_strdup_printf("IO %s", name); - qemu_thread_create(&iothread->thread, thread_name, iothread_run, - iothread, QEMU_THREAD_JOINABLE); - g_free(thread_name); + for (i = 0; i < iothread->nspawns; ++i) { + IOThreadSpawn *s = &iothread->spawns[i]; + s->thread_id = -1; + s->iothread = iothread; + qemu_mutex_init(&s->init_done_lock); + qemu_cond_init(&s->init_done_cond); + thread_name = g_strdup_printf("IO %s[%d]", name, i); + qemu_thread_create(&s->thread, thread_name, iothread_run, + s, QEMU_THREAD_JOINABLE); + g_free(thread_name); + } g_free(name); - /* Wait for initialization to complete */ - qemu_mutex_lock(&iothread->init_done_lock); - while (iothread->thread_id == -1) { - qemu_cond_wait(&iothread->init_done_cond, - &iothread->init_done_lock); + for (i = 0; i < iothread->nspawns; ++i) { + IOThreadSpawn *s = &iothread->spawns[i]; + /* Wait for initialization to complete */ + qemu_mutex_lock(&s->init_done_lock); + while (s->thread_id == -1) { + qemu_cond_wait(&s->init_done_cond, + &s->init_done_lock); + } + qemu_mutex_unlock(&s->init_done_lock); } - qemu_mutex_unlock(&iothread->init_done_lock); +} + +static void iothread_set_spawns(Object *obj, Visitor *v, + const char *name, void *opaque, + Error **errp) +{ + IOThread *iothread = IOTHREAD(obj); + + if (iothread->nspawns) { + error_setg(errp, + "Modifying iothread spawns is not supported"); + return; + } + visit_type_uint32(v, name, &iothread->nspawns, errp); + if (!iothread->nspawns) { + error_setg(errp, "Invalid iothread spawn number: %u", + iothread->nspawns); + } +} + +static void iothread_get_spawns(Object *obj, Visitor *v, + const char *name, void *opaque, + Error **errp) +{ + IOThread *iothread = IOTHREAD(obj); + visit_type_uint32(v, name, &iothread->nspawns, errp); +} + + +static void iothread_instance_init(Object *obj) +{ + object_property_add(obj, "spawns", "uint32", + iothread_get_spawns, + iothread_set_spawns, + NULL, NULL, &error_abort); } static void iothread_class_init(ObjectClass *klass, void *class_data) @@ -131,6 +194,7 @@ static const TypeInfo iothread_info = { .parent = TYPE_OBJECT, .class_init = iothread_class_init, .instance_size = sizeof(IOThread), + .instance_init = iothread_instance_init, .instance_finalize = iothread_instance_finalize, .interfaces = (InterfaceInfo[]) { {TYPE_USER_CREATABLE}, @@ -161,22 +225,32 @@ static int query_one_iothread(Object *object, void *opaque) IOThreadInfoList *elem; IOThreadInfo *info; IOThread *iothread; + char *id; + int i; iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD); if (!iothread) { return 0; } - info = g_new0(IOThreadInfo, 1); - info->id = iothread_get_id(iothread); - info->thread_id = iothread->thread_id; + id = iothread_get_id(iothread); + for (i = 0; i < iothread->nspawns; ++i) { + info = g_new0(IOThreadInfo, 1); + info->thread_id = iothread->spawns[i].thread_id; + if (iothread->nspawns > 1) { + info->id = g_strdup_printf("%s[%d]", id, i); + } else { + info->id = g_strdup(id); + } - elem = g_new0(IOThreadInfoList, 1); - elem->value = info; - elem->next = NULL; + elem = g_new0(IOThreadInfoList, 1); + elem->value = info; + elem->next = NULL; - **prev = elem; - *prev = &elem->next; + **prev = elem; + *prev = &elem->next; + } + g_free(id); return 0; } -- 2.7.4