qemu-devel.nongnu.org archive mirror
* [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO
@ 2012-07-16 10:42 Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 01/12] event_notifier: enable it to use pipes Paolo Bonzini
                   ` (11 more replies)
  0 siblings, 12 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

This patch series is part 2 in my EventNotifier/AIO improvements
for QEMU 1.2.  It extends use of EventNotifier to the main loop
and AIO subsystems.  A new API using EventNotifier is added to aio.c
and a new portable thread pool is introduced (based on code from
posix-aio-compat.c, mostly) that uses this API.  raw-posix.c is
converted to use the new thread pool, and support for asynchronous
I/O is finally added to Win32 as well.

The network drivers (curl, libiscsi, nbd) have to be disabled
under Windows.

I tested this under Wine, with a RHEL virtual machine booting just as
glacially as before.  However, "info blockstats" does show a slightly
higher overhead, so I would like this to be tested on real Windows hosts.
Even if the result is negative, I would prefer to keep the early
parts (i.e. drop only the last patch) since they are a prerequisite for
more improvements to block/raw-posix.c (such as asynchronous discard
support).


Paolo Bonzini (12):
  event_notifier: enable it to use pipes
  event_notifier: add Win32 implementation
  main-loop: use event notifiers
  aio: provide platform-independent API
  aio: add Win32 implementation
  linux-aio: use event notifiers
  qemu-thread: add QemuSemaphore
  aio: add generic thread-pool facility
  block: switch posix-aio-compat to threadpool
  raw: merge posix-aio-compat.c into block/raw-posix.c
  raw-posix: rename raw-posix-aio.h, hide unavailable prototypes
  raw-win32: add emulated AIO support

 Makefile.objs                        |   13 +-
 aio.c => aio-posix.c                 |    9 +
 aio-win32.c                          |  177 +++++++++
 block/Makefile.objs                  |    6 +-
 block/{raw-posix-aio.h => raw-aio.h} |   19 +-
 block/raw-posix.c                    |  301 ++++++++++++++-
 block/raw-win32.c                    |  189 +++++++---
 event_notifier-posix.c               |  118 ++++++
 event_notifier-win32.c               |   59 +++
 event_notifier.c                     |   67 ----
 event_notifier.h                     |   20 +-
 linux-aio.c                          |   51 +--
 main-loop.c                          |  106 +-----
 oslib-posix.c                        |   31 --
 posix-aio-compat.c                   |  681 ----------------------------------
 qemu-aio.h                           |   19 +-
 qemu-common.h                        |    1 -
 qemu-thread-posix.c                  |   74 ++++
 qemu-thread-posix.h                  |    5 +
 qemu-thread-win32.c                  |   35 ++
 qemu-thread-win32.h                  |    4 +
 qemu-thread.h                        |    7 +
 thread-pool.c                        |  279 ++++++++++++++
 thread-pool.h                        |   34 ++
 trace-events                         |    5 +
 25 files changed, 1329 insertions(+), 981 deletions(-)
 rename aio.c => aio-posix.c (92%)
 create mode 100644 aio-win32.c
 rename block/{raw-posix-aio.h => raw-aio.h} (62%)
 create mode 100644 event_notifier-posix.c
 create mode 100644 event_notifier-win32.c
 delete mode 100644 event_notifier.c
 delete mode 100644 posix-aio-compat.c
 create mode 100644 thread-pool.c
 create mode 100644 thread-pool.h

-- 
1.7.10.4


* [Qemu-devel] [PATCH 01/12] event_notifier: enable it to use pipes
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-19 18:58   ` Anthony Liguori
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 02/12] event_notifier: add Win32 implementation Paolo Bonzini
                   ` (10 subsequent siblings)
  11 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

This takes the eventfd emulation code from the main loop and adds it
to EventNotifier.  When the EventNotifier is used for the main loop too,
we need this compatibility code.
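
As a rough illustration of the pipe fallback (not the code in this patch),
here is a minimal standalone sketch.  PipeNotifier, set_nonblock and the
pipe_notifier_* names are hypothetical and only mirror the shape of the
EventNotifier functions; QEMU helpers such as qemu_pipe and fcntl_setfl are
replaced by plain POSIX calls.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>

/* Hypothetical stand-in for EventNotifier when eventfd is unavailable. */
typedef struct PipeNotifier {
    int rfd;    /* read end: the only fd suitable for select()/poll() */
    int wfd;    /* write end: used only to signal */
} PipeNotifier;

static int set_nonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags < 0) {
        return -errno;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        return -errno;
    }
    return 0;
}

int pipe_notifier_init(PipeNotifier *n)
{
    int fds[2];
    int ret;

    assert(n != NULL);
    if (pipe(fds) < 0) {
        return -errno;
    }
    ret = set_nonblock(fds[0]);
    if (ret == 0) {
        ret = set_nonblock(fds[1]);
    }
    if (ret < 0) {
        close(fds[0]);
        close(fds[1]);
        return ret;
    }
    n->rfd = fds[0];
    n->wfd = fds[1];
    return 0;
}

int pipe_notifier_set(PipeNotifier *n)
{
    /* Write 8 bytes so the same code path would also work on an eventfd. */
    static const uint64_t value = 1;
    ssize_t ret;

    do {
        ret = write(n->wfd, &value, sizeof(value));
    } while (ret < 0 && errno == EINTR);

    /* EAGAIN means the pipe is full, so a wakeup is already pending. */
    return (ret < 0 && errno != EAGAIN) ? -errno : 0;
}

int pipe_notifier_test_and_clear(PipeNotifier *n)
{
    char buf[512];
    ssize_t len;
    int seen = 0;

    /* Drain everything that was written; report whether anything was there. */
    do {
        len = read(n->rfd, buf, sizeof(buf));
        seen |= (len > 0);
    } while ((len < 0 && errno == EINTR) || len == (ssize_t)sizeof(buf));

    return seen;
}

void pipe_notifier_cleanup(PipeNotifier *n)
{
    close(n->rfd);
    close(n->wfd);
}
```

Several calls to pipe_notifier_set() before a single
pipe_notifier_test_and_clear() collapse into one notification, which is the
behaviour the main loop relies on.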

Without CONFIG_EVENTFD, event_notifier_get_fd is only usable for the
"read" side of the notifier, for example to set a select() handler.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 event_notifier.c |   83 +++++++++++++++++++++++++++++++++++++++++++-----------
 event_notifier.h |    3 +-
 2 files changed, 69 insertions(+), 17 deletions(-)

diff --git a/event_notifier.c b/event_notifier.c
index 2c207e1..dde2d32 100644
--- a/event_notifier.c
+++ b/event_notifier.c
@@ -20,48 +20,99 @@
 
 void event_notifier_init_fd(EventNotifier *e, int fd)
 {
-    e->fd = fd;
+    e->rfd = fd;
+    e->wfd = fd;
 }
 
 int event_notifier_init(EventNotifier *e, int active)
 {
+    int fds[2];
+    int ret;
+
 #ifdef CONFIG_EVENTFD
-    int fd = eventfd(!!active, EFD_NONBLOCK | EFD_CLOEXEC);
-    if (fd < 0)
-        return -errno;
-    e->fd = fd;
-    return 0;
+    ret = eventfd(0, O_NONBLOCK);
 #else
-    return -ENOSYS;
+    ret = -1;
+    errno = ENOSYS;
 #endif
+    if (ret >= 0) {
+        e->rfd = e->wfd = ret;
+        qemu_set_cloexec(ret);
+    } else {
+        if (errno != ENOSYS) {
+            return -errno;
+        }
+        if (qemu_pipe(fds) < 0) {
+            return -errno;
+        }
+        ret = fcntl_setfl(fds[0], O_NONBLOCK);
+        if (ret < 0) {
+            goto fail;
+        }
+        ret = fcntl_setfl(fds[1], O_NONBLOCK);
+        if (ret < 0) {
+            goto fail;
+        }
+        e->rfd = fds[0];
+        e->wfd = fds[1];
+    }
+    if (active)
+        event_notifier_set(e);
+    return 0;
+
+fail:
+    close(fds[0]);
+    close(fds[1]);
+    return ret;
 }
 
 void event_notifier_cleanup(EventNotifier *e)
 {
-    close(e->fd);
+    if (e->rfd != e->wfd) {
+        close(e->rfd);
+    }
+    close(e->wfd);
 }
 
 int event_notifier_get_fd(EventNotifier *e)
 {
-    return e->fd;
+    return e->rfd;
 }
 
 int event_notifier_set_handler(EventNotifier *e,
                                EventNotifierHandler *handler)
 {
-    return qemu_set_fd_handler(e->fd, (IOHandler *)handler, NULL, e);
+    return qemu_set_fd_handler(e->rfd, (IOHandler *)handler, NULL, e);
 }
 
 int event_notifier_set(EventNotifier *e)
 {
-    uint64_t value = 1;
-    int r = write(e->fd, &value, sizeof(value));
-    return r == sizeof(value);
+    static const uint64_t value = 1;
+    ssize_t ret;
+
+    do {
+        ret = write(e->wfd, &value, sizeof(value));
+    } while (ret < 0 && errno == EINTR);
+
+    /* EAGAIN is fine, a read must be pending.  */
+    if (ret < 0 && errno != EAGAIN) {
+        return -1;
+    }
+    return 0;
 }
 
 int event_notifier_test_and_clear(EventNotifier *e)
 {
-    uint64_t value;
-    int r = read(e->fd, &value, sizeof(value));
-    return r == sizeof(value);
+    int value;
+    ssize_t len;
+    char buffer[512];
+
+    /* Drain the notify pipe.  For eventfd, only 8 bytes will be read.  */
+    value = 0;
+    do {
+        len = read(e->rfd, buffer, sizeof(buffer));
+        value |= (len > 0);
+    } while ((len == -1 && errno == EINTR) || len == sizeof(buffer));
+
+    return value;
 }
diff --git a/event_notifier.h b/event_notifier.h
index f0ec2f2..f04d12d 100644
--- a/event_notifier.h
+++ b/event_notifier.h
@@ -16,7 +16,8 @@
 #include "qemu-common.h"
 
 struct EventNotifier {
-    int fd;
+    int rfd;
+    int wfd;
 };
 
 typedef void EventNotifierHandler(EventNotifier *);
-- 
1.7.10.4


* [Qemu-devel] [PATCH 02/12] event_notifier: add Win32 implementation
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 01/12] event_notifier: enable it to use pipes Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 03/12] main-loop: use event notifiers Paolo Bonzini
                   ` (9 subsequent siblings)
  11 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

The Win32 implementation of EventNotifier is a trivial wrapper
around manual-reset events.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 Makefile.objs                              |    4 +-
 event_notifier.c => event_notifier-posix.c |    0
 event_notifier-win32.c                     |   59 ++++++++++++++++++++++++++++
 event_notifier.h                           |   17 +++++++-
 4 files changed, 77 insertions(+), 3 deletions(-)
 rename event_notifier.c => event_notifier-posix.c (100%)
 create mode 100644 event_notifier-win32.c

diff --git a/Makefile.objs b/Makefile.objs
index 625c4d5..ecdfaf9 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -45,6 +45,8 @@ block-obj-y = cutils.o cache-utils.o qemu-option.o module.o async.o
 block-obj-y += nbd.o block.o aio.o aes.o qemu-config.o qemu-progress.o qemu-sockets.o
 block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
+block-obj-$(CONFIG_POSIX) += event_notifier-posix.o
+block-obj-$(CONFIG_WIN32) += event_notifier-win32.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 block-obj-y += block/
 
@@ -90,7 +92,7 @@ common-obj-y += bt-host.o bt-vhci.o
 
 common-obj-y += iov.o acl.o
 common-obj-$(CONFIG_POSIX) += compatfd.o
-common-obj-y += notify.o event_notifier.o
+common-obj-y += notify.o
 common-obj-y += qemu-timer.o qemu-timer-common.o
 
 common-obj-$(CONFIG_SLIRP) += slirp/
diff --git a/event_notifier.c b/event_notifier-posix.c
similarity index 100%
rename from event_notifier.c
rename to event_notifier-posix.c
diff --git a/event_notifier-win32.c b/event_notifier-win32.c
new file mode 100644
index 0000000..c723dad
--- /dev/null
+++ b/event_notifier-win32.c
@@ -0,0 +1,59 @@
+/*
+ * event notifier support for Win32
+ *
+ * Copyright Red Hat, Inc. 2012
+ *
+ * Authors:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu-common.h"
+#include "event_notifier.h"
+#include "main-loop.h"
+
+int event_notifier_init(EventNotifier *e, int active)
+{
+    e->event = CreateEvent(NULL, TRUE, FALSE, NULL);
+    assert(e->event);
+    return 0;
+}
+
+void event_notifier_cleanup(EventNotifier *e)
+{
+    CloseHandle(e->event);
+}
+
+HANDLE event_notifier_get_handle(EventNotifier *e)
+{
+    return e->event;
+}
+
+int event_notifier_set_handler(EventNotifier *e,
+                               EventNotifierHandler *handler)
+{
+    if (handler) {
+        return qemu_add_wait_object(e->event, (IOHandler *)handler, e);
+    } else {
+        qemu_del_wait_object(e->event, (IOHandler *)handler, e);
+        return 0;
+    }
+}
+
+int event_notifier_set(EventNotifier *e)
+{
+    SetEvent(e->event);
+    return 0;
+}
+
+int event_notifier_test_and_clear(EventNotifier *e)
+{
+    int ret = WaitForSingleObject(e->event, 0);
+    if (ret == WAIT_OBJECT_0) {
+        ResetEvent(e->event);
+        return true;
+    }
+    return false;
+}
diff --git a/event_notifier.h b/event_notifier.h
index f04d12d..88b57af 100644
--- a/event_notifier.h
+++ b/event_notifier.h
@@ -15,19 +15,32 @@
 
 #include "qemu-common.h"
 
+#ifdef _WIN32
+#include <windows.h>
+#endif
+
 struct EventNotifier {
+#ifdef _WIN32
+    HANDLE event;
+#else
     int rfd;
     int wfd;
+#endif
 };
 
 typedef void EventNotifierHandler(EventNotifier *);
 
-void event_notifier_init_fd(EventNotifier *, int fd);
 int event_notifier_init(EventNotifier *, int active);
 void event_notifier_cleanup(EventNotifier *);
-int event_notifier_get_fd(EventNotifier *);
 int event_notifier_set(EventNotifier *);
 int event_notifier_test_and_clear(EventNotifier *);
 int event_notifier_set_handler(EventNotifier *, EventNotifierHandler *);
 
+#ifdef CONFIG_POSIX
+void event_notifier_init_fd(EventNotifier *, int fd);
+int event_notifier_get_fd(EventNotifier *);
+#else
+HANDLE event_notifier_get_handle(EventNotifier *);
+#endif
+
 #endif
-- 
1.7.10.4


* [Qemu-devel] [PATCH 03/12] main-loop: use event notifiers
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 01/12] event_notifier: enable it to use pipes Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 02/12] event_notifier: add Win32 implementation Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-19 19:04   ` Anthony Liguori
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 04/12] aio: provide platform-independent API Paolo Bonzini
                   ` (8 subsequent siblings)
  11 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 Makefile.objs |    4 +--
 main-loop.c   |  106 ++++++++-------------------------------------------------
 oslib-posix.c |   31 -----------------
 qemu-common.h |    1 -
 4 files changed, 17 insertions(+), 125 deletions(-)

diff --git a/Makefile.objs b/Makefile.objs
index ecdfaf9..6ed1981 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -20,8 +20,8 @@ universal-obj-y += $(qom-obj-y)
 #######################################################################
 # oslib-obj-y is code depending on the OS (win32 vs posix)
 oslib-obj-y = osdep.o
-oslib-obj-$(CONFIG_WIN32) += oslib-win32.o qemu-thread-win32.o
-oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o
+oslib-obj-$(CONFIG_WIN32) += oslib-win32.o qemu-thread-win32.o event_notifier-win32.o
+oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o event_notifier-posix.o
 
 #######################################################################
 # coroutines
diff --git a/main-loop.c b/main-loop.c
index eb3b6e6..81f49b3 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -26,75 +26,12 @@
 #include "qemu-timer.h"
 #include "slirp/slirp.h"
 #include "main-loop.h"
+#include "event_notifier.h"
 
 #ifndef _WIN32
 
 #include "compatfd.h"
 
-static int io_thread_fd = -1;
-
-void qemu_notify_event(void)
-{
-    /* Write 8 bytes to be compatible with eventfd.  */
-    static const uint64_t val = 1;
-    ssize_t ret;
-
-    if (io_thread_fd == -1) {
-        return;
-    }
-    do {
-        ret = write(io_thread_fd, &val, sizeof(val));
-    } while (ret < 0 && errno == EINTR);
-
-    /* EAGAIN is fine, a read must be pending.  */
-    if (ret < 0 && errno != EAGAIN) {
-        fprintf(stderr, "qemu_notify_event: write() failed: %s\n",
-                strerror(errno));
-        exit(1);
-    }
-}
-
-static void qemu_event_read(void *opaque)
-{
-    int fd = (intptr_t)opaque;
-    ssize_t len;
-    char buffer[512];
-
-    /* Drain the notify pipe.  For eventfd, only 8 bytes will be read.  */
-    do {
-        len = read(fd, buffer, sizeof(buffer));
-    } while ((len == -1 && errno == EINTR) || len == sizeof(buffer));
-}
-
-static int qemu_event_init(void)
-{
-    int err;
-    int fds[2];
-
-    err = qemu_eventfd(fds);
-    if (err == -1) {
-        return -errno;
-    }
-    err = fcntl_setfl(fds[0], O_NONBLOCK);
-    if (err < 0) {
-        goto fail;
-    }
-    err = fcntl_setfl(fds[1], O_NONBLOCK);
-    if (err < 0) {
-        goto fail;
-    }
-    qemu_set_fd_handler2(fds[0], NULL, qemu_event_read, NULL,
-                         (void *)(intptr_t)fds[0]);
-
-    io_thread_fd = fds[1];
-    return 0;
-
-fail:
-    close(fds[0]);
-    close(fds[1]);
-    return err;
-}
-
 /* If we have signalfd, we mask out the signals we want to handle and then
  * use signalfd to listen for them.  We rely on whatever the current signal
  * handler is to dispatch the signals when we receive them.
@@ -164,40 +101,22 @@ static int qemu_signal_init(void)
 
 #else /* _WIN32 */
 
-static HANDLE qemu_event_handle = NULL;
-
-static void dummy_event_handler(void *opaque)
-{
-}
-
-static int qemu_event_init(void)
+static int qemu_signal_init(void)
 {
-    qemu_event_handle = CreateEvent(NULL, FALSE, FALSE, NULL);
-    if (!qemu_event_handle) {
-        fprintf(stderr, "Failed CreateEvent: %ld\n", GetLastError());
-        return -1;
-    }
-    qemu_add_wait_object(qemu_event_handle, dummy_event_handler, NULL);
     return 0;
 }
+#endif
+
+static EventNotifier io_thread_notifier;
+static int io_thread_initialized;
 
 void qemu_notify_event(void)
 {
-    if (!qemu_event_handle) {
+    if (!io_thread_initialized) {
         return;
     }
-    if (!SetEvent(qemu_event_handle)) {
-        fprintf(stderr, "qemu_notify_event: SetEvent failed: %ld\n",
-                GetLastError());
-        exit(1);
-    }
-}
-
-static int qemu_signal_init(void)
-{
-    return 0;
+    event_notifier_set(&io_thread_notifier);
 }
-#endif
 
 int main_loop_init(void)
 {
@@ -210,11 +129,15 @@ int main_loop_init(void)
     }
 
     /* Note eventfd must be drained before signalfd handlers run */
-    ret = qemu_event_init();
+    ret = event_notifier_init(&io_thread_notifier, 0);
     if (ret) {
         return ret;
     }
 
+    io_thread_initialized = true;
+    event_notifier_set_handler(&io_thread_notifier,
+                               (EventNotifierHandler *)
+                               event_notifier_test_and_clear);
     return 0;
 }
 
@@ -400,7 +323,8 @@ void qemu_del_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque)
 
 void qemu_fd_register(int fd)
 {
-    WSAEventSelect(fd, qemu_event_handle, FD_READ | FD_ACCEPT | FD_CLOSE |
+    WSAEventSelect(fd, event_notifier_get_handle(&io_thread_notifier),
+                   FD_READ | FD_ACCEPT | FD_CLOSE |
                    FD_CONNECT | FD_WRITE | FD_OOB);
 }
 
diff --git a/oslib-posix.c b/oslib-posix.c
index 6b7ba64..2c6b044 100644
--- a/oslib-posix.c
+++ b/oslib-posix.c
@@ -58,9 +58,6 @@ static int running_on_valgrind = -1;
 #ifdef CONFIG_LINUX
 #include <sys/syscall.h>
 #endif
-#ifdef CONFIG_EVENTFD
-#include <sys/eventfd.h>
-#endif
 
 int qemu_get_thread_id(void)
 {
@@ -180,34 +177,6 @@ int qemu_pipe(int pipefd[2])
     return ret;
 }
 
-/*
- * Creates an eventfd that looks like a pipe and has EFD_CLOEXEC set.
- */
-int qemu_eventfd(int fds[2])
-{
-#ifdef CONFIG_EVENTFD
-    int ret;
-
-    ret = eventfd(0, 0);
-    if (ret >= 0) {
-        fds[0] = ret;
-        fds[1] = dup(ret);
-        if (fds[1] == -1) {
-            close(ret);
-            return -1;
-        }
-        qemu_set_cloexec(ret);
-        qemu_set_cloexec(fds[1]);
-        return 0;
-    }
-    if (errno != ENOSYS) {
-        return -1;
-    }
-#endif
-
-    return qemu_pipe(fds);
-}
-
 int qemu_utimens(const char *path, const struct timespec *times)
 {
     struct timeval tv[2], tv_now;
diff --git a/qemu-common.h b/qemu-common.h
index 9d9e603..036d254 100644
--- a/qemu-common.h
+++ b/qemu-common.h
@@ -195,7 +195,6 @@ ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags)
     QEMU_WARN_UNUSED_RESULT;
 
 #ifndef _WIN32
-int qemu_eventfd(int pipefd[2]);
 int qemu_pipe(int pipefd[2]);
 #endif
 
-- 
1.7.10.4


* [Qemu-devel] [PATCH 04/12] aio: provide platform-independent API
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (2 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 03/12] main-loop: use event notifiers Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 05/12] aio: add Win32 implementation Paolo Bonzini
                   ` (7 subsequent siblings)
  11 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

This adds a platform-independent API based on EventNotifiers to aio.c,
for use by the portable thread pool.
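
To illustrate how the two callbacks are meant to cooperate, here is a
simplified, non-blocking sketch.  All names in it (Notifier, aio_set_notifier,
aio_wait_once, the demo_* helpers) are hypothetical; the real qemu_aio_wait()
blocks in select() or WaitForMultipleObjects() instead of polling a flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical miniature of EventNotifier: just a "signalled" flag. */
typedef struct Notifier {
    int pending;
} Notifier;

typedef void NotifyFn(Notifier *);
typedef int FlushFn(Notifier *);    /* returns 1 while requests are outstanding */

#define MAX_HANDLERS 8

static struct {
    Notifier *n;
    NotifyFn *notify;
    FlushFn *flush;
} handlers[MAX_HANDLERS];
static int nhandlers;

void aio_set_notifier(Notifier *n, NotifyFn *notify, FlushFn *flush)
{
    assert(nhandlers < MAX_HANDLERS);
    handlers[nhandlers].n = n;
    handlers[nhandlers].notify = notify;
    handlers[nhandlers].flush = flush;
    nhandlers++;
}

/* One pass over the handlers.  Returns true if any handler reported
 * outstanding requests, so "while (aio_wait_once());" acts as a flush. */
bool aio_wait_once(void)
{
    bool busy = false;

    for (int i = 0; i < nhandlers; i++) {
        if (handlers[i].flush) {
            /* Idle handlers are skipped so the loop can terminate. */
            if (handlers[i].flush(handlers[i].n) == 0) {
                continue;
            }
            busy = true;
        }
        if (handlers[i].n->pending) {
            handlers[i].n->pending = 0;
            handlers[i].notify(handlers[i].n);
        }
    }
    return busy;
}

/* Demo client: counts outstanding requests in a global. */
static int demo_outstanding;

static void demo_notify(Notifier *n)
{
    (void)n;
    demo_outstanding--;     /* one request completed */
}

static int demo_flush(Notifier *n)
{
    (void)n;
    return demo_outstanding > 0;
}
```

The io_flush callback is what lets the wait loop stop: once every handler
reports no outstanding requests, the function returns false.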

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio.c      |    9 +++++++++
 qemu-aio.h |   19 ++++++++++++++++++-
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/aio.c b/aio.c
index 0a9eb10..417ceb4 100644
--- a/aio.c
+++ b/aio.c
@@ -97,6 +97,15 @@ int qemu_aio_set_fd_handler(int fd,
     return 0;
 }
 
+void qemu_aio_set_event_notifier(EventNotifier *notifier,
+                                 EventNotifierHandler *io_read,
+                                 AioFlushEventNotifierHandler *io_flush)
+{
+    qemu_aio_set_fd_handler(event_notifier_get_fd(notifier),
+                            (IOHandler *)io_read, NULL,
+                            (AioFlushHandler *)io_flush, notifier);
+}
+
 void qemu_aio_flush(void)
 {
     while (qemu_aio_wait());
diff --git a/qemu-aio.h b/qemu-aio.h
index bfdd35f..89c766c 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -16,6 +16,7 @@
 
 #include "qemu-common.h"
 #include "qemu-char.h"
+#include "event_notifier.h"
 
 typedef struct BlockDriverAIOCB BlockDriverAIOCB;
 typedef void BlockDriverCompletionFunc(void *opaque, int ret);
@@ -39,7 +40,7 @@ void *qemu_aio_get(AIOPool *pool, BlockDriverState *bs,
 void qemu_aio_release(void *p);
 
 /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
-typedef int (AioFlushHandler)(void *opaque);
+typedef int (AioFlushEventNotifierHandler)(EventNotifier *e);
 
 /* Flush any pending AIO operation. This function will block until all
  * outstanding AIO operations have been completed or cancelled. */
@@ -53,6 +54,10 @@ void qemu_aio_flush(void);
  * Return whether there is still any pending AIO operation.  */
 bool qemu_aio_wait(void);
 
+#ifdef CONFIG_POSIX
+/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
+typedef int (AioFlushHandler)(void *opaque);
+
 /* Register a file descriptor and associated callbacks.  Behaves very similarly
  * to qemu_set_fd_handler2.  Unlike qemu_set_fd_handler2, these callbacks will
  * be invoked when using either qemu_aio_wait() or qemu_aio_flush().
@@ -65,5 +70,17 @@ int qemu_aio_set_fd_handler(int fd,
                             IOHandler *io_write,
                             AioFlushHandler *io_flush,
                             void *opaque);
+#endif
+
+/* Register an event notifier and associated callbacks.  Behaves very similarly
+ * to event_notifier_set_handler.  Unlike event_notifier_set_handler, these callbacks
+ * will be invoked when using either qemu_aio_wait() or qemu_aio_flush().
+ *
+ * Code that invokes AIO completion functions should rely on this function
+ * instead of event_notifier_set_handler.
+ */
+void qemu_aio_set_event_notifier(EventNotifier *notifier,
+                                 EventNotifierHandler *io_read,
+                                 AioFlushEventNotifierHandler *io_flush);
 
 #endif
-- 
1.7.10.4


* [Qemu-devel] [PATCH 05/12] aio: add Win32 implementation
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (3 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 04/12] aio: provide platform-independent API Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 06/12] linux-aio: use event notifiers Paolo Bonzini
                   ` (6 subsequent siblings)
  11 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

The Win32 implementation only accepts EventNotifiers, so a few
drivers are disabled under Windows.  It is possible to use the
same techniques used in main-loop.c and re-enable them; alternatively,
the drivers can be changed to use threads instead of non-blocking
I/O.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 Makefile.objs        |    6 +-
 aio.c => aio-posix.c |    0
 aio-win32.c          |  177 ++++++++++++++++++++++++++++++++++++++++++++++++++
 block/Makefile.objs  |    6 +-
 4 files changed, 185 insertions(+), 4 deletions(-)
 rename aio.c => aio-posix.c (100%)
 create mode 100644 aio-win32.c

diff --git a/Makefile.objs b/Makefile.objs
index 6ed1981..96d0e68 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -42,11 +42,11 @@ coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o
 # block-obj-y is code used by both qemu system emulation and qemu-img
 
 block-obj-y = cutils.o cache-utils.o qemu-option.o module.o async.o
-block-obj-y += nbd.o block.o aio.o aes.o qemu-config.o qemu-progress.o qemu-sockets.o
+block-obj-y += nbd.o block.o aes.o qemu-config.o qemu-progress.o qemu-sockets.o
 block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
-block-obj-$(CONFIG_POSIX) += event_notifier-posix.o
-block-obj-$(CONFIG_WIN32) += event_notifier-win32.o
+block-obj-$(CONFIG_POSIX) += event_notifier-posix.o aio-posix.o
+block-obj-$(CONFIG_WIN32) += event_notifier-win32.o aio-win32.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 block-obj-y += block/
 
diff --git a/aio.c b/aio-posix.c
similarity index 100%
rename from aio.c
rename to aio-posix.c
diff --git a/aio-win32.c b/aio-win32.c
new file mode 100644
index 0000000..0936f7f
--- /dev/null
+++ b/aio-win32.c
@@ -0,0 +1,177 @@
+/*
+ * QEMU aio implementation for Win32
+ *
+ * Copyright IBM Corp., 2008
+ * Copyright Red Hat Inc., 2012
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
+ */
+
+#include "qemu-common.h"
+#include "block.h"
+#include "qemu-queue.h"
+#include "qemu_socket.h"
+
+typedef struct AioHandler AioHandler;
+
+/* The list of registered AIO handlers */
+static QLIST_HEAD(, AioHandler) aio_handlers;
+
+/* This is a simple lock used to protect the aio_handlers list.  Specifically,
+ * it's used to ensure that no callbacks are removed while we're walking and
+ * dispatching callbacks.
+ */
+static int walking_handlers;
+
+struct AioHandler
+{
+    EventNotifier *e;
+    EventNotifierHandler *io_notify;
+    AioFlushEventNotifierHandler *io_flush;
+    int deleted;
+    QLIST_ENTRY(AioHandler) node;
+};
+
+void qemu_aio_set_event_notifier(EventNotifier *e,
+                                 EventNotifierHandler *io_notify,
+                                 AioFlushEventNotifierHandler *io_flush)
+{
+    AioHandler *node;
+
+    QLIST_FOREACH(node, &aio_handlers, node) {
+        if (node->e == e && !node->deleted) {
+            break;
+        }
+    }
+
+    /* Are we deleting the fd handler? */
+    if (!io_notify) {
+        if (node) {
+            qemu_del_wait_object(event_notifier_get_handle(e),
+                                 (WaitObjectFunc *) node->io_notify, e);
+
+            /* If the lock is held, just mark the node as deleted */
+            if (walking_handlers) {
+                node->deleted = 1;
+            } else {
+                /* Otherwise, delete it for real.  We can't just mark it as
+                 * deleted because deleted nodes are only cleaned up after
+                 * releasing the walking_handlers lock.
+                 */
+                QLIST_REMOVE(node, node);
+                g_free(node);
+            }
+        }
+    } else {
+        if (node == NULL) {
+            /* Alloc and insert if it's not already there */
+            node = g_malloc0(sizeof(AioHandler));
+            node->e = e;
+            QLIST_INSERT_HEAD(&aio_handlers, node, node);
+        }
+        /* Update handler with latest information */
+        node->io_notify = io_notify;
+        node->io_flush = io_flush;
+        qemu_add_wait_object(event_notifier_get_handle(e),
+                             (WaitObjectFunc *) io_notify, e);
+    }
+}
+
+void qemu_aio_flush(void)
+{
+    while (qemu_aio_wait());
+}
+
+bool qemu_aio_wait(void)
+{
+    AioHandler *node;
+    HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
+    bool busy;
+    int count;
+    int ret;
+    int timeout;
+
+    /*
+     * If there are callbacks left that have been queued, we need to call them.
+     * Do not call select in this case, because it is possible that the caller
+     * does not need a complete flush (as is the case for qemu_aio_wait loops).
+     */
+    if (qemu_bh_poll()) {
+        return true;
+    }
+
+    walking_handlers = 1;
+
+    /* fill fd sets */
+    busy = false;
+    count = 0;
+    QLIST_FOREACH(node, &aio_handlers, node) {
+        /* If there aren't pending AIO operations, don't invoke callbacks.
+         * Otherwise, if there are no AIO requests, qemu_aio_wait() would
+         * wait indefinitely.
+         */
+        if (node->io_flush) {
+            if (node->io_flush(node->e) == 0) {
+                continue;
+            }
+            busy = true;
+        }
+        if (!node->deleted && node->io_notify) {
+            events[count++] = event_notifier_get_handle(node->e);
+        }
+    }
+
+    walking_handlers = 0;
+
+    /* No AIO operations?  Get us out of here */
+    if (!busy) {
+        return false;
+    }
+
+    /* wait until next event */
+    timeout = INFINITE;
+    for (;;) {
+        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
+        if ((DWORD) (ret - WAIT_OBJECT_0) >= count) {
+            break;
+        }
+
+        timeout = 0;
+
+        /* if we have any signaled events, dispatch event */
+        walking_handlers = 1;
+
+        /* we have to walk very carefully in case
+         * qemu_aio_set_event_notifier is called while we're walking */
+        node = QLIST_FIRST(&aio_handlers);
+        while (node) {
+            AioHandler *tmp;
+
+            if (!node->deleted &&
+                event_notifier_get_handle(node->e) == events[ret - WAIT_OBJECT_0] &&
+                node->io_notify) {
+                node->io_notify(node->e);
+            }
+
+            tmp = node;
+            node = QLIST_NEXT(node, node);
+
+            if (tmp->deleted) {
+                QLIST_REMOVE(tmp, node);
+                g_free(tmp);
+            }
+        }
+
+        walking_handlers = 0;
+    }
+
+    return true;
+}
diff --git a/block/Makefile.objs b/block/Makefile.objs
index b5754d3..65d4dc6 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -2,10 +2,14 @@ block-obj-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat
 block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
-block-obj-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
+block-obj-y += parallels.o blkdebug.o blkverify.o
 block-obj-y += stream.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o
 block-obj-$(CONFIG_POSIX) += raw-posix.o
+
+ifeq ($(CONFIG_POSIX),y)
+block-obj-y += nbd.o sheepdog.o
 block-obj-$(CONFIG_LIBISCSI) += iscsi.o
 block-obj-$(CONFIG_CURL) += curl.o
 block-obj-$(CONFIG_RBD) += rbd.o
+endif
-- 
1.7.10.4


* [Qemu-devel] [PATCH 06/12] linux-aio: use event notifiers
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (4 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 05/12] aio: add Win32 implementation Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-19 19:10   ` Anthony Liguori
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore Paolo Bonzini
                   ` (5 subsequent siblings)
  11 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

Since linux-aio already uses an eventfd, converting it to use the
EventNotifier-based API simplifies the code even though it is not
meant to be portable.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
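Note for readers: the callbacks now receive only the EventNotifier, and
the state is recovered from it because the notifier is embedded in the
state struct.  A minimal standalone sketch of that pattern follows.  The
EventNotifier and laio_state definitions here are simplified stand-ins
(assumptions for illustration), and the container_of macro is a plain
offsetof-based version, not necessarily identical to QEMU's.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for QEMU's EventNotifier. */
typedef struct EventNotifier {
    int fd;
} EventNotifier;

/* Step back from a pointer to a member to the struct that contains it. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

struct laio_state {
    int ctx;            /* placeholder for io_context_t */
    EventNotifier e;    /* embedded, so its address identifies the state */
    int count;          /* requests in flight */
};

/* Shaped like the new flush callback: it is handed only the notifier. */
int flush_cb(EventNotifier *e)
{
    struct laio_state *s;

    assert(e != NULL);
    s = container_of(e, struct laio_state, e);
    return s->count > 0;
}
```

This only works when the notifier is a member of the state struct rather
than a pointer to a separately allocated object, which is why the patch
changes "int efd" into an embedded "EventNotifier e".
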
 linux-aio.c |   49 +++++++++++++++++++------------------------------
 1 file changed, 19 insertions(+), 30 deletions(-)

diff --git a/linux-aio.c b/linux-aio.c
index fa0fbf3..779f793 100644
--- a/linux-aio.c
+++ b/linux-aio.c
@@ -10,8 +10,8 @@
 #include "qemu-common.h"
 #include "qemu-aio.h"
 #include "block/raw-posix-aio.h"
+#include "event_notifier.h"
 
-#include <sys/eventfd.h>
 #include <libaio.h>
 
 /*
@@ -37,7 +37,7 @@ struct qemu_laiocb {
 
 struct qemu_laio_state {
     io_context_t ctx;
-    int efd;
+    EventNotifier e;
     int count;
 };
 
@@ -76,29 +76,17 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
     qemu_aio_release(laiocb);
 }
 
-static void qemu_laio_completion_cb(void *opaque)
+static void qemu_laio_completion_cb(EventNotifier *e)
 {
-    struct qemu_laio_state *s = opaque;
+    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
 
-    while (1) {
+    while (event_notifier_test_and_clear(&s->e)) {
         struct io_event events[MAX_EVENTS];
-        uint64_t val;
-        ssize_t ret;
         struct timespec ts = { 0 };
         int nevents, i;
 
         do {
-            ret = read(s->efd, &val, sizeof(val));
-        } while (ret == -1 && errno == EINTR);
-
-        if (ret == -1 && errno == EAGAIN)
-            break;
-
-        if (ret != 8)
-            break;
-
-        do {
-            nevents = io_getevents(s->ctx, val, MAX_EVENTS, events, &ts);
+            nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
         } while (nevents == -EINTR);
 
         for (i = 0; i < nevents; i++) {
@@ -112,9 +100,9 @@ static void qemu_laio_completion_cb(void *opaque)
     }
 }
 
-static int qemu_laio_flush_cb(void *opaque)
+static int qemu_laio_flush_cb(EventNotifier *e)
 {
-    struct qemu_laio_state *s = opaque;
+    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
 
     return (s->count > 0) ? 1 : 0;
 }
@@ -146,8 +134,9 @@ static void laio_cancel(BlockDriverAIOCB *blockacb)
      * We might be able to do this slightly more optimal by removing the
      * O_NONBLOCK flag.
      */
-    while (laiocb->ret == -EINPROGRESS)
-        qemu_laio_completion_cb(laiocb->ctx);
+    while (laiocb->ret == -EINPROGRESS) {
+        qemu_laio_completion_cb(&laiocb->ctx->e);
+    }
 }
 
 static AIOPool laio_pool = {
@@ -186,7 +175,7 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
                         __func__, type);
         goto out_free_aiocb;
     }
-    io_set_eventfd(&laiocb->iocb, s->efd);
+    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
     s->count++;
 
     if (io_submit(s->ctx, 1, &iocbs) < 0)
@@ -205,21 +194,21 @@ void *laio_init(void)
     struct qemu_laio_state *s;
 
     s = g_malloc0(sizeof(*s));
-    s->efd = eventfd(0, 0);
-    if (s->efd == -1)
+    if (event_notifier_init(&s->e, false) < 0) {
         goto out_free_state;
-    fcntl(s->efd, F_SETFL, O_NONBLOCK);
+    }
 
-    if (io_setup(MAX_EVENTS, &s->ctx) != 0)
+    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
         goto out_close_efd;
+    }
 
-    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL,
-        qemu_laio_flush_cb, s);
+    qemu_aio_set_event_notifier(&s->e, qemu_laio_completion_cb,
+                                qemu_laio_flush_cb);
 
     return s;
 
 out_close_efd:
-    close(s->efd);
+    event_notifier_cleanup(&s->e);
 out_free_state:
     g_free(s);
     return NULL;
-- 
1.7.10.4


* [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (5 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 06/12] linux-aio: use event notifiers Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-16 12:00   ` Jan Kiszka
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 08/12] aio: add generic thread-pool facility Paolo Bonzini
                   ` (4 subsequent siblings)
  11 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

The new thread pool will use semaphores instead of condition
variables, because QemuCond does not have qemu_cond_timedwait.
(I also prefer it this way, since semaphores model the
producer-consumer problem well.)

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
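Note for readers: sem_timedwait() takes an absolute deadline, so the
POSIX qemu_sem_timedwait() has to turn the relative millisecond timeout
into "now + ms" and carry nanoseconds into seconds.  The arithmetic can be
sketched in isolation as below.  deadline_after_ms is a hypothetical
helper name, and it takes the current time as a parameter only to keep
the sketch deterministic; the patch calls gettimeofday() inline.

```c
#include <assert.h>
#include <sys/time.h>
#include <time.h>

/* Hypothetical helper: absolute deadline "ms" milliseconds after "now". */
struct timespec deadline_after_ms(struct timeval now, int ms)
{
    struct timespec ts;

    assert(ms >= 0);
    ts.tv_sec = now.tv_sec + ms / 1000;
    ts.tv_nsec = now.tv_usec * 1000 + (ms % 1000) * 1000000L;
    if (ts.tv_nsec >= 1000000000L) {
        /* At most one carry: both addends are below one second. */
        ts.tv_sec++;
        ts.tv_nsec -= 1000000000L;
    }
    return ts;
}
```

For example, starting from 100.9 s with a 1250 ms timeout, the helper
yields a deadline of 102 s and 150000000 ns.
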
 qemu-thread-posix.c |   74 +++++++++++++++++++++++++++++++++++++++++++++++++++
 qemu-thread-posix.h |    5 ++++
 qemu-thread-win32.c |   35 ++++++++++++++++++++++++
 qemu-thread-win32.h |    4 +++
 qemu-thread.h       |    7 +++++
 5 files changed, 125 insertions(+)

diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c
index 9e1b5fb..251fef0 100644
--- a/qemu-thread-posix.c
+++ b/qemu-thread-posix.c
@@ -17,6 +17,9 @@
 #include <signal.h>
 #include <stdint.h>
 #include <string.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/time.h>
 #include "qemu-thread.h"
 
 static void error_exit(int err, const char *msg)
@@ -115,6 +118,77 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
         error_exit(err, __func__);
 }
 
+void qemu_sem_init(QemuSemaphore *sem, int init)
+{
+    int rc;
+
+    rc = sem_init(&sem->sem, 0, init);
+    if (rc < 0) {
+        error_exit(errno, __func__);
+    }
+}
+
+void qemu_sem_destroy(QemuSemaphore *sem)
+{
+    int rc;
+
+    rc = sem_destroy(&sem->sem);
+    if (rc < 0) {
+        error_exit(errno, __func__);
+    }
+}
+
+void qemu_sem_post(QemuSemaphore *sem)
+{
+    int rc;
+
+    rc = sem_post(&sem->sem);
+    if (rc < 0) {
+        error_exit(errno, __func__);
+    }
+}
+
+int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
+{
+    int rc;
+
+    if (ms <= 0) {
+        /* This is cheaper than sem_timedwait.  */
+        rc = sem_trywait(&sem->sem);
+        if (rc == -1 && errno == EAGAIN) {
+            return -1;
+        }
+    } else {
+        struct timeval tv;
+        struct timespec ts;
+        gettimeofday(&tv, NULL);
+        ts.tv_nsec = tv.tv_usec * 1000 + (ms % 1000) * 1000000;
+        ts.tv_sec = tv.tv_sec + ms / 1000;
+        if (ts.tv_nsec >= 1000000000) {
+            ts.tv_sec++;
+            ts.tv_nsec -= 1000000000;
+        }
+        rc = sem_timedwait(&sem->sem, &ts);
+        if (rc == -1 && errno == ETIMEDOUT) {
+            return -1;
+        }
+    }
+    if (rc < 0) {
+        error_exit(errno, __func__);
+    }
+    return 0;
+}
+
+void qemu_sem_wait(QemuSemaphore *sem)
+{
+    int rc;
+
+    rc = sem_wait(&sem->sem);
+    if (rc < 0) {
+        error_exit(errno, __func__);
+    }
+}
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg, int mode)
diff --git a/qemu-thread-posix.h b/qemu-thread-posix.h
index ee4618e..2542c15 100644
--- a/qemu-thread-posix.h
+++ b/qemu-thread-posix.h
@@ -1,6 +1,7 @@
 #ifndef __QEMU_THREAD_POSIX_H
 #define __QEMU_THREAD_POSIX_H 1
 #include "pthread.h"
+#include <semaphore.h>
 
 struct QemuMutex {
     pthread_mutex_t lock;
@@ -10,6 +11,10 @@ struct QemuCond {
     pthread_cond_t cond;
 };
 
+struct QemuSemaphore {
+    sem_t sem;
+};
+
 struct QemuThread {
     pthread_t thread;
 };
diff --git a/qemu-thread-win32.c b/qemu-thread-win32.c
index 3524c8b..78602d2 100644
--- a/qemu-thread-win32.c
+++ b/qemu-thread-win32.c
@@ -192,6 +192,41 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
     qemu_mutex_lock(mutex);
 }
 
+void qemu_sem_init(QemuSemaphore *sem, int init)
+{
+    /* Counting semaphore; the maximum count is LONG_MAX.  */
+    sem->sema = CreateSemaphore(NULL, init, LONG_MAX, NULL);
+}
+
+void qemu_sem_destroy(QemuSemaphore *sem)
+{
+    CloseHandle(sem->sema);
+}
+
+void qemu_sem_post(QemuSemaphore *sem)
+{
+    ReleaseSemaphore(sem->sema, 1, NULL);
+}
+
+int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
+{
+    int rc = WaitForSingleObject(sem->sema, ms);
+    if (rc == WAIT_OBJECT_0) {
+        return 0;
+    }
+    if (rc != WAIT_TIMEOUT) {
+        error_exit(GetLastError(), __func__);
+    }
+    return -1;
+}
+
+void qemu_sem_wait(QemuSemaphore *sem)
+{
+    if (WaitForSingleObject(sem->sema, INFINITE) != WAIT_OBJECT_0) {
+        error_exit(GetLastError(), __func__);
+    }
+}
+
 struct QemuThreadData {
     /* Passed to win32_start_routine.  */
     void             *(*start_routine)(void *);
diff --git a/qemu-thread-win32.h b/qemu-thread-win32.h
index b9d1be8..13adb95 100644
--- a/qemu-thread-win32.h
+++ b/qemu-thread-win32.h
@@ -13,6 +13,10 @@ struct QemuCond {
     HANDLE continue_event;
 };
 
+struct QemuSemaphore {
+    HANDLE sema;
+};
+
 typedef struct QemuThreadData QemuThreadData;
 struct QemuThread {
     QemuThreadData *data;
diff --git a/qemu-thread.h b/qemu-thread.h
index a78a8f2..c3f960e 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -5,6 +5,7 @@
 
 typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
+typedef struct QemuSemaphore QemuSemaphore;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -37,6 +38,12 @@ void qemu_cond_signal(QemuCond *cond);
 void qemu_cond_broadcast(QemuCond *cond);
 void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex);
 
+void qemu_sem_init(QemuSemaphore *sem, int init);
+void qemu_sem_post(QemuSemaphore *sem);
+void qemu_sem_wait(QemuSemaphore *sem);
+int qemu_sem_timedwait(QemuSemaphore *sem, int ms);
+void qemu_sem_destroy(QemuSemaphore *sem);
+
 void qemu_thread_create(QemuThread *thread,
                         void *(*start_routine)(void *),
                         void *arg, int mode);
-- 
1.7.10.4


* [Qemu-devel] [PATCH 08/12] aio: add generic thread-pool facility
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (6 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 09/12] block: switch posix-aio-compat to threadpool Paolo Bonzini
                   ` (3 subsequent siblings)
  11 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

Add a generic thread-pool.  The code is roughly based on posix-aio-compat.c,
with some changes, especially the following:

- use QemuSemaphore instead of QemuCond;

- separate the state of the thread from the return code of the worker
function.  The return code is totally opaque for the thread pool;

- do not busy wait when doing cancellation.

A more generic threadpool (but still specific to I/O so that in the future
it can use special scheduling classes or PI mutexes) can have many uses:
it allows more flexibility in raw-posix.c and can more easily be extended
to Win32, and it will also be used to do an msync of the persistent bitmap.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
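Note for readers: the points above about request state and cancellation
can be illustrated with a small single-threaded model.  This is only a
sketch under simplifying assumptions: there are no real threads or locks,
"tokens" stands in for the semaphore count, and all names (pool_model,
model_submit, model_try_cancel, and so on) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

enum req_state { REQ_QUEUED, REQ_ACTIVE, REQ_DONE, REQ_CANCELED };

struct request {
    enum req_state state;   /* owned by the pool */
    int ret;                /* opaque worker result, valid once REQ_DONE */
};

/* "tokens" models the semaphore: queued requests no worker has claimed. */
struct pool_model {
    int tokens;
};

void model_submit(struct pool_model *p, struct request *r)
{
    r->state = REQ_QUEUED;
    p->tokens++;                    /* qemu_sem_post() in the patch */
}

/* A worker claims a token, then takes a queued request and runs it. */
bool model_worker_claim(struct pool_model *p, struct request *r)
{
    if (p->tokens == 0) {
        return false;
    }
    p->tokens--;
    r->state = REQ_ACTIVE;
    return true;
}

/* The result is stored next to the state, never encoded in it. */
void model_worker_finish(struct request *r, int ret)
{
    assert(r->state == REQ_ACTIVE);
    r->ret = ret;
    r->state = REQ_DONE;
}

/* Cancel: steal a token if the request is still queued.  Returns false
 * when the caller would instead have to wait for the worker to finish. */
bool model_try_cancel(struct pool_model *p, struct request *r)
{
    if (r->state == REQ_QUEUED && p->tokens > 0) {
        p->tokens--;                /* qemu_sem_timedwait(&sem, 0) == 0 */
        r->state = REQ_CANCELED;
        return true;
    }
    return false;
}
```

Stealing the token keeps the semaphore count equal to the number of
requests still on the queue, so no worker wakes up to an empty list.  In
the patch the "false" case blocks on the check_cancel condition variable
instead of polling.
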
 Makefile.objs |    2 +-
 thread-pool.c |  279 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 thread-pool.h |   34 +++++++
 trace-events  |    5 ++
 4 files changed, 319 insertions(+), 1 deletion(-)
 create mode 100644 thread-pool.c
 create mode 100644 thread-pool.h

diff --git a/Makefile.objs b/Makefile.objs
index 96d0e68..9a30cb5 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -41,7 +41,7 @@ coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o
 #######################################################################
 # block-obj-y is code used by both qemu system emulation and qemu-img
 
-block-obj-y = cutils.o cache-utils.o qemu-option.o module.o async.o
+block-obj-y = cutils.o cache-utils.o qemu-option.o module.o async.o thread-pool.o
 block-obj-y += nbd.o block.o aes.o qemu-config.o qemu-progress.o qemu-sockets.o
 block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
diff --git a/thread-pool.c b/thread-pool.c
new file mode 100644
index 0000000..7895544
--- /dev/null
+++ b/thread-pool.c
@@ -0,0 +1,279 @@
+/*
+ * QEMU block layer thread pool
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright Red Hat, Inc. 2012
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
+ */
+#include "qemu-common.h"
+#include "qemu-queue.h"
+#include "qemu-thread.h"
+#include "osdep.h"
+#include "qemu-coroutine.h"
+#include "trace.h"
+#include "block_int.h"
+#include "event_notifier.h"
+#include "thread-pool.h"
+
+static void do_spawn_thread(void);
+
+typedef struct ThreadPoolElement ThreadPoolElement;
+
+enum ThreadState {
+    THREAD_QUEUED,
+    THREAD_ACTIVE,
+    THREAD_DONE,
+    THREAD_CANCELED,
+};
+
+struct ThreadPoolElement {
+    BlockDriverAIOCB common;
+    ThreadPoolFunc *func;
+    void *arg;
+    enum ThreadState state;
+    int ret;
+
+    QTAILQ_ENTRY(ThreadPoolElement) reqs;
+    QLIST_ENTRY(ThreadPoolElement) all;
+};
+
+static EventNotifier notifier;
+static QemuMutex lock;
+static QemuCond check_cancel;
+static QemuSemaphore sem;
+static int max_threads = 64;
+static int cur_threads = 0;
+static int idle_threads = 0;
+static int new_threads = 0;     /* backlog of threads we need to create */
+static int pending_threads = 0; /* threads created but not running yet */
+static int pending_cancellations = 0; /* whether we need a cond_broadcast */
+static QEMUBH *new_thread_bh;
+static QLIST_HEAD(, ThreadPoolElement) head;
+static QTAILQ_HEAD(, ThreadPoolElement) request_list;
+
+static void *worker_thread(void *unused)
+{
+    qemu_mutex_lock(&lock);
+    pending_threads--;
+    qemu_mutex_unlock(&lock);
+    do_spawn_thread();
+
+    while (1) {
+        ThreadPoolElement *req;
+        int ret;
+
+        qemu_mutex_lock(&lock);
+        idle_threads++;
+        qemu_mutex_unlock(&lock);
+        ret = qemu_sem_timedwait(&sem, 10000);
+        qemu_mutex_lock(&lock);
+        idle_threads--;
+        if (ret == -1) {
+            if (QTAILQ_EMPTY(&request_list)) {
+                break;
+            }
+            qemu_mutex_unlock(&lock);
+            continue;
+        }
+
+        req = QTAILQ_FIRST(&request_list);
+        QTAILQ_REMOVE(&request_list, req, reqs);
+        req->state = THREAD_ACTIVE;
+        qemu_mutex_unlock(&lock);
+
+        ret = req->func(req->arg);
+
+        qemu_mutex_lock(&lock);
+        req->state = THREAD_DONE;
+        req->ret = ret;
+        if (pending_cancellations) {
+            qemu_cond_broadcast(&check_cancel);
+        }
+        qemu_mutex_unlock(&lock);
+
+        event_notifier_set(&notifier);
+    }
+
+    cur_threads--;
+    qemu_mutex_unlock(&lock);
+
+    return NULL;
+}
+
+static void do_spawn_thread(void)
+{
+    QemuThread t;
+
+    qemu_mutex_lock(&lock);
+    if (!new_threads) {
+        qemu_mutex_unlock(&lock);
+        return;
+    }
+
+    new_threads--;
+    pending_threads++;
+
+    qemu_mutex_unlock(&lock);
+
+    qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
+}
+
+static void spawn_thread_bh_fn(void *opaque)
+{
+    do_spawn_thread();
+}
+
+static void spawn_thread(void)
+{
+    cur_threads++;
+    new_threads++;
+    /* If there are threads being created, they will spawn new workers, so
+     * we don't spend time creating many threads in a loop holding a mutex or
+     * starving the current vcpu.
+     *
+     * If there are no idle threads, ask the main thread to create one, so we
+     * inherit the correct affinity instead of the vcpu affinity.
+     */
+    if (!pending_threads) {
+        qemu_bh_schedule(new_thread_bh);
+    }
+}
+
+static void event_notifier_ready(EventNotifier *notifier)
+{
+    ThreadPoolElement *elem, *next;
+
+    event_notifier_test_and_clear(notifier);
+restart:
+    QLIST_FOREACH_SAFE(elem, &head, all, next) {
+        if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
+            continue;
+        }
+        if (elem->state == THREAD_DONE) {
+            trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
+        }
+        if (elem->state == THREAD_DONE && elem->common.cb) {
+            QLIST_REMOVE(elem, all);
+            elem->common.cb(elem->common.opaque, elem->ret);
+            qemu_aio_release(elem);
+            goto restart;
+        } else {
+            /* remove the request */
+            QLIST_REMOVE(elem, all);
+            qemu_aio_release(elem);
+        }
+    }
+}
+
+static int thread_pool_active(EventNotifier *notifier)
+{
+    return !QLIST_EMPTY(&head);
+}
+
+static void thread_pool_cancel(BlockDriverAIOCB *acb)
+{
+    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
+
+    trace_thread_pool_cancel(elem, elem->common.opaque);
+
+    qemu_mutex_lock(&lock);
+    if (elem->state == THREAD_QUEUED &&
+        /* No thread has yet started working on elem. We can try to "steal"
+         * the item from the worker if we can get a signal from the
+         * semaphore.  Because this is non-blocking, we can do it with
+         * the lock taken and ensure that elem will remain THREAD_QUEUED.
+         */
+        qemu_sem_timedwait(&sem, 0) == 0) {
+        QTAILQ_REMOVE(&request_list, elem, reqs);
+        elem->state = THREAD_CANCELED;
+        event_notifier_set(&notifier);
+    } else {
+        pending_cancellations++;
+        while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
+            qemu_cond_wait(&check_cancel, &lock);
+        }
+        pending_cancellations--;
+    }
+    qemu_mutex_unlock(&lock);
+}
+
+static AIOPool thread_pool_cb_pool = {
+    .aiocb_size         = sizeof(ThreadPoolElement),
+    .cancel             = thread_pool_cancel,
+};
+
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    ThreadPoolElement *req;
+
+    req = qemu_aio_get(&thread_pool_cb_pool, NULL, cb, opaque);
+    req->func = func;
+    req->arg = arg;
+    req->state = THREAD_QUEUED;
+
+    QLIST_INSERT_HEAD(&head, req, all);
+
+    trace_thread_pool_submit(req, arg);
+
+    qemu_mutex_lock(&lock);
+    if (idle_threads == 0 && cur_threads < max_threads) {
+        spawn_thread();
+    }
+    QTAILQ_INSERT_TAIL(&request_list, req, reqs);
+    qemu_mutex_unlock(&lock);
+    qemu_sem_post(&sem);
+    return &req->common;
+}
+
+typedef struct ThreadPoolCo {
+    Coroutine *co;
+    int ret;
+} ThreadPoolCo;
+
+static void thread_pool_co_cb(void *opaque, int ret)
+{
+    ThreadPoolCo *co = opaque;
+
+    co->ret = ret;
+    qemu_coroutine_enter(co->co, NULL);
+}
+
+int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
+{
+    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
+    assert(qemu_in_coroutine());
+    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
+    qemu_coroutine_yield();
+    return tpc.ret;
+}
+
+void thread_pool_submit(ThreadPoolFunc *func, void *arg)
+{
+    thread_pool_submit_aio(func, arg, NULL, NULL);
+}
+
+static void thread_pool_init(void)
+{
+    QLIST_INIT(&head);
+    event_notifier_init(&notifier, false);
+    qemu_mutex_init(&lock);
+    qemu_cond_init(&check_cancel);
+    qemu_sem_init(&sem, 0);
+    qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
+                                thread_pool_active);
+
+    QTAILQ_INIT(&request_list);
+    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
+}
+
+block_init(thread_pool_init)
diff --git a/thread-pool.h b/thread-pool.h
new file mode 100644
index 0000000..378a4ac
--- /dev/null
+++ b/thread-pool.h
@@ -0,0 +1,34 @@
+/*
+ * QEMU block layer thread pool
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright Red Hat, Inc. 2012
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
+ */
+
+#ifndef QEMU_THREAD_POOL_H
+#define QEMU_THREAD_POOL_H 1
+
+#include "qemu-common.h"
+#include "qemu-queue.h"
+#include "qemu-thread.h"
+#include "qemu-coroutine.h"
+#include "block_int.h"
+
+typedef int ThreadPoolFunc(void *opaque);
+
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+     BlockDriverCompletionFunc *cb, void *opaque);
+int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
+void thread_pool_submit(ThreadPoolFunc *func, void *arg);
+
+#endif
diff --git a/trace-events b/trace-events
index c935ba2..42f6592 100644
--- a/trace-events
+++ b/trace-events
@@ -86,6 +86,11 @@ virtio_blk_rw_complete(void *req, int ret) "req %p ret %d"
 virtio_blk_handle_write(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
 virtio_blk_handle_read(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
 
+# thread-pool.c
+thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
+thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
+thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
+
 # posix-aio-compat.c
 paio_submit(void *acb, void *opaque, int64_t sector_num, int nb_sectors, int type) "acb %p opaque %p sector_num %"PRId64" nb_sectors %d type %d"
 paio_complete(void *acb, void *opaque, int ret) "acb %p opaque %p ret %d"
-- 
1.7.10.4


* [Qemu-devel] [PATCH 09/12] block: switch posix-aio-compat to threadpool
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (7 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 08/12] aio: add generic thread-pool facility Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 10/12] raw: merge posix-aio-compat.c into block/raw-posix.c Paolo Bonzini
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

This is not meant for portability, but to remove code duplication.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
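Note for readers: since the thread pool treats the worker's return code
as opaque, the byte-count check that posix_aio_read used to do at
completion time moves into the worker.  For the read case visible in the
diff, the mapping can be sketched as follows.  rw_result_to_status is a
hypothetical helper name; the patch does this inline in aio_worker.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical helper: map a byte count or negative errno to a status. */
int rw_result_to_status(ssize_t ret, size_t nbytes)
{
    /* A transfer is assumed never to exceed the requested length. */
    assert(ret < 0 || (size_t)ret <= nbytes);

    if (ret < 0) {
        return (int)ret;            /* already a negative errno */
    }
    if ((size_t)ret == nbytes) {
        return 0;                   /* full transfer */
    }
    return -EINVAL;                 /* short transfer */
}
```

The completion callback then sees 0 on success or a negative errno, the
same values the old completion path produced.
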
 block/raw-posix-aio.h |    1 -
 block/raw-posix.c     |    5 -
 posix-aio-compat.c    |  433 +++++--------------------------------------------
 3 files changed, 42 insertions(+), 397 deletions(-)

diff --git a/block/raw-posix-aio.h b/block/raw-posix-aio.h
index ba118f6..6725135 100644
--- a/block/raw-posix-aio.h
+++ b/block/raw-posix-aio.h
@@ -28,7 +28,6 @@
 
 
 /* posix-aio-compat.c - thread pool based implementation */
-int paio_init(void);
 BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque, int type);
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 0dce089..e5faccd 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -234,11 +234,6 @@ static int raw_open_common(BlockDriverState *bs, const char *filename,
         }
     }
 
-    /* We're falling back to POSIX AIO in some cases so init always */
-    if (paio_init() < 0) {
-        goto out_free_buf;
-    }
-
 #ifdef CONFIG_LINUX_AIO
     /*
      * Currently Linux do AIO only for files opened with O_DIRECT
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 68361f5..cf716dc 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -28,12 +28,11 @@
 #include "sysemu.h"
 #include "qemu-common.h"
 #include "trace.h"
+#include "thread-pool.h"
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
 
-static void do_spawn_thread(void);
-
 struct qemu_paiocb {
     BlockDriverAIOCB common;
     int aio_fildes;
@@ -45,82 +44,15 @@ struct qemu_paiocb {
     size_t aio_nbytes;
 #define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
     off_t aio_offset;
-
-    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
-    ssize_t ret;
-    int active;
-    struct qemu_paiocb *next;
 };
 
-typedef struct PosixAioState {
-    int rfd, wfd;
-    struct qemu_paiocb *first_aio;
-} PosixAioState;
-
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static int new_threads = 0;     /* backlog of threads we need to create */
-static int pending_threads = 0; /* threads created but not running yet */
-static QEMUBH *new_thread_bh;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
 static int preadv_present = 0;
 #endif
 
-static void die2(int err, const char *what)
-{
-    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
-    abort();
-}
-
-static void die(const char *what)
-{
-    die2(errno, what);
-}
-
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -309,289 +241,57 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void posix_aio_notify_event(void);
-
-static void *aio_thread(void *unused)
+static int aio_worker(void *arg)
 {
-    mutex_lock(&lock);
-    pending_threads--;
-    mutex_unlock(&lock);
-    do_spawn_thread();
-
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            idle_threads++;
-            ret = cond_timedwait(&cond, &lock, &ts);
-            idle_threads--;
+    struct qemu_paiocb *aiocb = arg;
+    ssize_t ret = 0;
+
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+        ret = handle_aiocb_rw(aiocb);
+        if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
+            /* A short read means that we have reached EOF. Pad the buffer
+             * with zeros for bytes after EOF. */
+            QEMUIOVector qiov;
+
+            qemu_iovec_init_external(&qiov, aiocb->aio_iov,
+                                     aiocb->aio_niov);
+            qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
+
+            ret = aiocb->aio_nbytes;
         }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-            ret = handle_aiocb_rw(aiocb);
-            if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
-                /* A short read means that we have reached EOF. Pad the buffer
-                 * with zeros for bytes after EOF. */
-                QEMUIOVector qiov;
-
-                qemu_iovec_init_external(&qiov, aiocb->aio_iov,
-                                         aiocb->aio_niov);
-                qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
-
-                ret = aiocb->aio_nbytes;
-            }
-            break;
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        if (ret == aiocb->aio_nbytes) {
+            ret = 0;
+        } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
             ret = -EINVAL;
-            break;
         }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        mutex_unlock(&lock);
-
-        posix_aio_notify_event();
-    }
-
-    cur_threads--;
-    mutex_unlock(&lock);
-
-    return NULL;
-}
-
-static void do_spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    mutex_lock(&lock);
-    if (!new_threads) {
-        mutex_unlock(&lock);
-        return;
-    }
-
-    new_threads--;
-    pending_threads++;
-
-    mutex_unlock(&lock);
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
-}
-
-static void spawn_thread_bh_fn(void *opaque)
-{
-    do_spawn_thread();
-}
-
-static void spawn_thread(void)
-{
-    cur_threads++;
-    new_threads++;
-    /* If there are threads being created, they will spawn new workers, so
-     * we don't spend time creating many threads in a loop holding a mutex or
-     * starving the current vcpu.
-     *
-     * If there are no idle threads, ask the main thread to create one, so we
-     * inherit the correct affinity instead of the vcpu affinity.
-     */
-    if (!pending_threads) {
-        qemu_bh_schedule(new_thread_bh);
-    }
-}
-
-static void qemu_paio_submit(struct qemu_paiocb *aiocb)
-{
-    aiocb->ret = -EINPROGRESS;
-    aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
-}
-
-static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
-{
-    ssize_t ret;
-
-    mutex_lock(&lock);
-    ret = aiocb->ret;
-    mutex_unlock(&lock);
-
-    return ret;
-}
-
-static int qemu_paio_error(struct qemu_paiocb *aiocb)
-{
-    ssize_t ret = qemu_paio_return(aiocb);
-
-    if (ret < 0)
-        ret = -ret;
-    else
-        ret = 0;
-
-    return ret;
-}
-
-static void posix_aio_read(void *opaque)
-{
-    PosixAioState *s = opaque;
-    struct qemu_paiocb *acb, **pacb;
-    int ret;
-    ssize_t len;
-
-    /* read all bytes from signal pipe */
-    for (;;) {
-        char bytes[16];
-
-        len = read(s->rfd, bytes, sizeof(bytes));
-        if (len == -1 && errno == EINTR)
-            continue; /* try again */
-        if (len == sizeof(bytes))
-            continue; /* more to read */
         break;
-    }
-
-    for(;;) {
-        pacb = &s->first_aio;
-        for(;;) {
-            acb = *pacb;
-            if (!acb)
-                return;
-
-            ret = qemu_paio_error(acb);
-            if (ret == ECANCELED) {
-                /* remove the request */
-                *pacb = acb->next;
-                qemu_aio_release(acb);
-            } else if (ret != EINPROGRESS) {
-                /* end of aio */
-                if (ret == 0) {
-                    ret = qemu_paio_return(acb);
-                    if (ret == acb->aio_nbytes)
-                        ret = 0;
-                    else
-                        ret = -EINVAL;
-                } else {
-                    ret = -ret;
-                }
-
-                trace_paio_complete(acb, acb->common.opaque, ret);
-
-                /* remove the request */
-                *pacb = acb->next;
-                /* call the callback */
-                acb->common.cb(acb->common.opaque, ret);
-                qemu_aio_release(acb);
-                break;
-            } else {
-                pacb = &acb->next;
-            }
-        }
-    }
-}
-
-static int posix_aio_flush(void *opaque)
-{
-    PosixAioState *s = opaque;
-    return !!s->first_aio;
-}
-
-static PosixAioState *posix_aio_state;
-
-static void posix_aio_notify_event(void)
-{
-    char byte = 0;
-    ssize_t ret;
-
-    ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
-    if (ret < 0 && errno != EAGAIN)
-        die("write()");
-}
-
-static void paio_remove(struct qemu_paiocb *acb)
-{
-    struct qemu_paiocb **pacb;
-
-    /* remove the callback from the queue */
-    pacb = &posix_aio_state->first_aio;
-    for(;;) {
-        if (*pacb == NULL) {
-            fprintf(stderr, "paio_remove: aio request not found!\n");
-            break;
-        } else if (*pacb == acb) {
-            *pacb = acb->next;
-            qemu_aio_release(acb);
-            break;
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        if (ret == aiocb->aio_nbytes) {
+            ret = 0;
+        } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
+            ret = -EINVAL;
         }
-        pacb = &(*pacb)->next;
-    }
-}
-
-static void paio_cancel(BlockDriverAIOCB *blockacb)
-{
-    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
-    int active = 0;
-
-    trace_paio_cancel(acb, acb->common.opaque);
-
-    mutex_lock(&lock);
-    if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
-    } else if (acb->ret == -EINPROGRESS) {
-        active = 1;
-    }
-    mutex_unlock(&lock);
-
-    if (active) {
-        /* fail safe: if the aio could not be canceled, we wait for
-           it */
-        while (qemu_paio_error(acb) == EINPROGRESS)
-            ;
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    paio_remove(acb);
+    qemu_aio_release(aiocb);
+    return ret;
 }
 
 static AIOPool raw_aio_pool = {
     .aiocb_size         = sizeof(struct qemu_paiocb),
-    .cancel             = paio_cancel,
 };
 
 BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
@@ -611,12 +311,8 @@ BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
     acb->aio_nbytes = nb_sectors * 512;
     acb->aio_offset = sector_num * 512;
 
-    acb->next = posix_aio_state->first_aio;
-    posix_aio_state->first_aio = acb;
-
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    qemu_paio_submit(acb);
-    return &acb->common;
+    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
 }
 
 BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
@@ -632,50 +328,5 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
     acb->aio_ioctl_buf = buf;
     acb->aio_ioctl_cmd = req;
 
-    acb->next = posix_aio_state->first_aio;
-    posix_aio_state->first_aio = acb;
-
-    qemu_paio_submit(acb);
-    return &acb->common;
-}
-
-int paio_init(void)
-{
-    PosixAioState *s;
-    int fds[2];
-    int ret;
-
-    if (posix_aio_state)
-        return 0;
-
-    s = g_malloc(sizeof(PosixAioState));
-
-    s->first_aio = NULL;
-    if (qemu_pipe(fds) == -1) {
-        fprintf(stderr, "failed to create pipe\n");
-        g_free(s);
-        return -1;
-    }
-
-    s->rfd = fds[0];
-    s->wfd = fds[1];
-
-    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
-    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
-
-    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
-
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
-
-    posix_aio_state = s;
-    return 0;
+    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
 }
-- 
1.7.10.4

* [Qemu-devel] [PATCH 10/12] raw: merge posix-aio-compat.c into block/raw-posix.c
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (8 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 09/12] block: switch posix-aio-compat to threadpool Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 11/12] raw-posix: rename raw-posix-aio.h, hide unavailable prototypes Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 12/12] raw-win32: add emulated AIO support Paolo Bonzini
  11 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

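Making struct qemu_paiocb private to block/raw-posix.c will let the worker
code access members of BDRVRawState directly.  The structure no longer
embeds a BlockDriverAIOCB; it is allocated with g_slice_new() by the
submitter and freed by the worker.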

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
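
For readers skimming the diff, the request lifecycle after this move can be
sketched as follows.  This is a minimal, hedged illustration and not the
code in the patch: struct demo_req, demo_read_full() and demo_worker() are
hypothetical names, malloc()/free() stand in for g_slice_new() and
g_slice_free(), and only the read path is shown.  It mirrors the retry loop
of handle_aiocb_rw_linear() and the mapping of byte counts to 0 or -EINVAL
done in aio_worker().

```c
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for struct qemu_paiocb; not QEMU's definition. */
struct demo_req {
    int fd;          /* file to read from */
    char *buf;       /* destination buffer, at least nbytes long */
    size_t nbytes;   /* number of bytes requested */
    off_t offset;    /* starting file offset */
};

/*
 * Read until nbytes have been read, EOF is hit, or an error occurs.
 * EINTR is retried.  Returns the byte count or -errno.
 */
static ssize_t demo_read_full(const struct demo_req *req)
{
    size_t done = 0;

    while (done < req->nbytes) {
        ssize_t len = pread(req->fd, req->buf + done,
                            req->nbytes - done, req->offset + (off_t)done);

        if (len == -1 && errno == EINTR) {
            continue;           /* interrupted, try again */
        }
        if (len == -1) {
            return -errno;      /* hard error */
        }
        if (len == 0) {
            break;              /* EOF: short read */
        }
        done += (size_t)len;
    }
    return (ssize_t)done;
}

/*
 * Worker callback: takes ownership of the request, frees it, and returns
 * 0 on a complete transfer, -EINVAL on a short one, or -errno on error.
 */
static int demo_worker(void *arg)
{
    struct demo_req *req = arg;
    ssize_t ret = demo_read_full(req);

    if (ret >= 0) {
        ret = ((size_t)ret == req->nbytes) ? 0 : -EINVAL;
    }
    free(req);
    return (int)ret;
}
```

The point of the design is that the submitter only allocates and fills in
the request; once it is handed to the pool, the worker is the sole owner and
must free it on every exit path.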
 Makefile.objs         |    1 -
 block/raw-posix-aio.h |    8 --
 block/raw-posix.c     |  294 +++++++++++++++++++++++++++++++++++++++++++
 posix-aio-compat.c    |  332 -------------------------------------------------
 4 files changed, 294 insertions(+), 341 deletions(-)
 delete mode 100644 posix-aio-compat.c

diff --git a/Makefile.objs b/Makefile.objs
index 9a30cb5..3debcba 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -44,7 +44,6 @@ coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o
 block-obj-y = cutils.o cache-utils.o qemu-option.o module.o async.o thread-pool.o
 block-obj-y += nbd.o block.o aes.o qemu-config.o qemu-progress.o qemu-sockets.o
 block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
-block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_POSIX) += event_notifier-posix.o aio-posix.o
 block-obj-$(CONFIG_WIN32) += event_notifier-win32.o aio-win32.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
diff --git a/block/raw-posix-aio.h b/block/raw-posix-aio.h
index 6725135..c714367 100644
--- a/block/raw-posix-aio.h
+++ b/block/raw-posix-aio.h
@@ -27,14 +27,6 @@
 #define QEMU_AIO_MISALIGNED   0x1000
 
 
-/* posix-aio-compat.c - thread pool based implementation */
-BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
-        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
-        BlockDriverCompletionFunc *cb, void *opaque, int type);
-BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
-        unsigned long int req, void *buf,
-        BlockDriverCompletionFunc *cb, void *opaque);
-
 /* linux-aio.c - Linux native implementation */
 void *laio_init(void);
 BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
diff --git a/block/raw-posix.c b/block/raw-posix.c
index e5faccd..3e2e822 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -27,6 +27,8 @@
 #include "qemu-log.h"
 #include "block_int.h"
 #include "module.h"
+#include "trace.h"
+#include "thread-pool.h"
 #include "block/raw-posix-aio.h"
 
 #if defined(__APPLE__) && (__MACH__)
@@ -143,6 +145,20 @@ typedef struct BDRVRawState {
 static int fd_open(BlockDriverState *bs);
 static int64_t raw_getlength(BlockDriverState *bs);
 
+struct qemu_paiocb {
+    BlockDriverState *bs;
+    int aio_fildes;
+    union {
+        struct iovec *aio_iov;
+        void *aio_ioctl_buf;
+    };
+    int aio_niov;
+    size_t aio_nbytes;
+#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
+    off_t aio_offset;
+    int aio_type;
+};
+
 #if defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
 static int cdrom_reopen(BlockDriverState *bs);
 #endif
@@ -311,6 +327,284 @@ static int qiov_is_aligned(BlockDriverState *bs, QEMUIOVector *qiov)
     return 1;
 }
 
+static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
+{
+    int ret;
+
+    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
+    if (ret == -1)
+        return -errno;
+
+    /*
+     * This looks weird, but the aio code only considers a request
+     * successful if it has written the full number of bytes.
+     *
+     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
+     * so in fact we return the ioctl command here to make posix_aio_read()
+     * happy..
+     */
+    return aiocb->aio_nbytes;
+}
+
+static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
+{
+    int ret;
+
+    ret = qemu_fdatasync(aiocb->aio_fildes);
+    if (ret == -1)
+        return -errno;
+    return 0;
+}
+
+#ifdef CONFIG_PREADV
+
+static int preadv_present = 1;
+
+static ssize_t
+qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+    return preadv(fd, iov, nr_iov, offset);
+}
+
+static ssize_t
+qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+    return pwritev(fd, iov, nr_iov, offset);
+}
+
+#else
+
+static int preadv_present = 0;
+
+static ssize_t
+qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+    return -ENOSYS;
+}
+
+static ssize_t
+qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+    return -ENOSYS;
+}
+
+#endif
+
+static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
+{
+    ssize_t len;
+
+    do {
+        if (aiocb->aio_type & QEMU_AIO_WRITE)
+            len = qemu_pwritev(aiocb->aio_fildes,
+                               aiocb->aio_iov,
+                               aiocb->aio_niov,
+                               aiocb->aio_offset);
+         else
+            len = qemu_preadv(aiocb->aio_fildes,
+                              aiocb->aio_iov,
+                              aiocb->aio_niov,
+                              aiocb->aio_offset);
+    } while (len == -1 && errno == EINTR);
+
+    if (len == -1)
+        return -errno;
+    return len;
+}
+
+/*
+ * Reads/writes the data to/from a given linear buffer.
+ *
+ * Returns the number of bytes handled or -errno in case of an error. Short
+ * reads are only returned if the end of the file is reached.
+ */
+static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
+{
+    ssize_t offset = 0;
+    ssize_t len;
+
+    while (offset < aiocb->aio_nbytes) {
+         if (aiocb->aio_type & QEMU_AIO_WRITE)
+             len = pwrite(aiocb->aio_fildes,
+                          (const char *)buf + offset,
+                          aiocb->aio_nbytes - offset,
+                          aiocb->aio_offset + offset);
+         else
+             len = pread(aiocb->aio_fildes,
+                         buf + offset,
+                         aiocb->aio_nbytes - offset,
+                         aiocb->aio_offset + offset);
+
+         if (len == -1 && errno == EINTR)
+             continue;
+         else if (len == -1) {
+             offset = -errno;
+             break;
+         } else if (len == 0)
+             break;
+
+         offset += len;
+    }
+
+    return offset;
+}
+
+static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
+{
+    ssize_t nbytes;
+    char *buf;
+
+    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
+        /*
+         * If there is just a single buffer, and it is properly aligned
+         * we can just use plain pread/pwrite without any problems.
+         */
+        if (aiocb->aio_niov == 1)
+             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
+
+        /*
+         * We have more than one iovec, and all are properly aligned.
+         *
+         * Try preadv/pwritev first and fall back to linearizing the
+         * buffer if it's not supported.
+         */
+        if (preadv_present) {
+            nbytes = handle_aiocb_rw_vector(aiocb);
+            if (nbytes == aiocb->aio_nbytes)
+                return nbytes;
+            if (nbytes < 0 && nbytes != -ENOSYS)
+                return nbytes;
+            preadv_present = 0;
+        }
+
+        /*
+         * XXX(hch): short read/write.  no easy way to handle the remainder
+         * using these interfaces.  For now retry using plain
+         * pread/pwrite?
+         */
+    }
+
+    /*
+     * Ok, we have to do it the hard way, copy all segments into
+     * a single aligned buffer.
+     */
+    buf = qemu_blockalign(aiocb->bs, aiocb->aio_nbytes);
+    if (aiocb->aio_type & QEMU_AIO_WRITE) {
+        char *p = buf;
+        int i;
+
+        for (i = 0; i < aiocb->aio_niov; ++i) {
+            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
+            p += aiocb->aio_iov[i].iov_len;
+        }
+    }
+
+    nbytes = handle_aiocb_rw_linear(aiocb, buf);
+    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
+        char *p = buf;
+        size_t count = aiocb->aio_nbytes, copy;
+        int i;
+
+        for (i = 0; i < aiocb->aio_niov && count; ++i) {
+            copy = count;
+            if (copy > aiocb->aio_iov[i].iov_len)
+                copy = aiocb->aio_iov[i].iov_len;
+            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
+            p     += copy;
+            count -= copy;
+        }
+    }
+    qemu_vfree(buf);
+
+    return nbytes;
+}
+
+static int aio_worker(void *arg)
+{
+    struct qemu_paiocb *aiocb = arg;
+    ssize_t ret = 0;
+
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+        ret = handle_aiocb_rw(aiocb);
+        if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->bs->growable) {
+            /* A short read means that we have reached EOF. Pad the buffer
+             * with zeros for bytes after EOF. */
+            QEMUIOVector qiov;
+
+            qemu_iovec_init_external(&qiov, aiocb->aio_iov,
+                                     aiocb->aio_niov);
+            qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
+
+            ret = aiocb->aio_nbytes;
+        }
+        if (ret == aiocb->aio_nbytes) {
+            ret = 0;
+        } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
+            ret = -EINVAL;
+        }
+        break;
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        if (ret == aiocb->aio_nbytes) {
+            ret = 0;
+        } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
+            ret = -EINVAL;
+        }
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
+    }
+
+    g_slice_free(struct qemu_paiocb, aiocb);
+    return ret;
+}
+
+static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int type)
+{
+    struct qemu_paiocb *acb = g_slice_new(struct qemu_paiocb);
+
+    acb->bs = bs;
+    acb->aio_type = type;
+    acb->aio_fildes = fd;
+
+    if (qiov) {
+        acb->aio_iov = qiov->iov;
+        acb->aio_niov = qiov->niov;
+    }
+    acb->aio_nbytes = nb_sectors * 512;
+    acb->aio_offset = sector_num * 512;
+
+    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
+    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+}
+
+static BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
+        unsigned long int req, void *buf,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    struct qemu_paiocb *acb = g_slice_new(struct qemu_paiocb);
+
+    acb->bs = bs;
+    acb->aio_type = QEMU_AIO_IOCTL;
+    acb->aio_fildes = fd;
+    acb->aio_offset = 0;
+    acb->aio_ioctl_buf = buf;
+    acb->aio_ioctl_cmd = req;
+
+    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+}
+
 static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque, int type)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
deleted file mode 100644
index cf716dc..0000000
--- a/posix-aio-compat.c
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * QEMU posix-aio emulation
- *
- * Copyright IBM, Corp. 2008
- *
- * Authors:
- *  Anthony Liguori   <aliguori@us.ibm.com>
- *
- * This work is licensed under the terms of the GNU GPL, version 2.  See
- * the COPYING file in the top-level directory.
- *
- * Contributions after 2012-01-13 are licensed under the terms of the
- * GNU GPL, version 2 or (at your option) any later version.
- */
-
-#include <sys/ioctl.h>
-#include <sys/types.h>
-#include <pthread.h>
-#include <unistd.h>
-#include <errno.h>
-#include <time.h>
-#include <string.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "qemu-queue.h"
-#include "osdep.h"
-#include "sysemu.h"
-#include "qemu-common.h"
-#include "trace.h"
-#include "thread-pool.h"
-#include "block_int.h"
-
-#include "block/raw-posix-aio.h"
-
-struct qemu_paiocb {
-    BlockDriverAIOCB common;
-    int aio_fildes;
-    union {
-        struct iovec *aio_iov;
-        void *aio_ioctl_buf;
-    };
-    int aio_niov;
-    size_t aio_nbytes;
-#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
-    off_t aio_offset;
-    int aio_type;
-};
-
-#ifdef CONFIG_PREADV
-static int preadv_present = 1;
-#else
-static int preadv_present = 0;
-#endif
-
-static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
-{
-    int ret;
-
-    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
-    if (ret == -1)
-        return -errno;
-
-    /*
-     * This looks weird, but the aio code only considers a request
-     * successful if it has written the full number of bytes.
-     *
-     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
-     * so in fact we return the ioctl command here to make posix_aio_read()
-     * happy..
-     */
-    return aiocb->aio_nbytes;
-}
-
-static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
-{
-    int ret;
-
-    ret = qemu_fdatasync(aiocb->aio_fildes);
-    if (ret == -1)
-        return -errno;
-    return 0;
-}
-
-#ifdef CONFIG_PREADV
-
-static ssize_t
-qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
-{
-    return preadv(fd, iov, nr_iov, offset);
-}
-
-static ssize_t
-qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
-{
-    return pwritev(fd, iov, nr_iov, offset);
-}
-
-#else
-
-static ssize_t
-qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
-{
-    return -ENOSYS;
-}
-
-static ssize_t
-qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
-{
-    return -ENOSYS;
-}
-
-#endif
-
-static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
-{
-    ssize_t len;
-
-    do {
-        if (aiocb->aio_type & QEMU_AIO_WRITE)
-            len = qemu_pwritev(aiocb->aio_fildes,
-                               aiocb->aio_iov,
-                               aiocb->aio_niov,
-                               aiocb->aio_offset);
-         else
-            len = qemu_preadv(aiocb->aio_fildes,
-                              aiocb->aio_iov,
-                              aiocb->aio_niov,
-                              aiocb->aio_offset);
-    } while (len == -1 && errno == EINTR);
-
-    if (len == -1)
-        return -errno;
-    return len;
-}
-
-/*
- * Read/writes the data to/from a given linear buffer.
- *
- * Returns the number of bytes handles or -errno in case of an error. Short
- * reads are only returned if the end of the file is reached.
- */
-static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
-{
-    ssize_t offset = 0;
-    ssize_t len;
-
-    while (offset < aiocb->aio_nbytes) {
-         if (aiocb->aio_type & QEMU_AIO_WRITE)
-             len = pwrite(aiocb->aio_fildes,
-                          (const char *)buf + offset,
-                          aiocb->aio_nbytes - offset,
-                          aiocb->aio_offset + offset);
-         else
-             len = pread(aiocb->aio_fildes,
-                         buf + offset,
-                         aiocb->aio_nbytes - offset,
-                         aiocb->aio_offset + offset);
-
-         if (len == -1 && errno == EINTR)
-             continue;
-         else if (len == -1) {
-             offset = -errno;
-             break;
-         } else if (len == 0)
-             break;
-
-         offset += len;
-    }
-
-    return offset;
-}
-
-static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
-{
-    ssize_t nbytes;
-    char *buf;
-
-    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
-        /*
-         * If there is just a single buffer, and it is properly aligned
-         * we can just use plain pread/pwrite without any problems.
-         */
-        if (aiocb->aio_niov == 1)
-             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
-
-        /*
-         * We have more than one iovec, and all are properly aligned.
-         *
-         * Try preadv/pwritev first and fall back to linearizing the
-         * buffer if it's not supported.
-         */
-        if (preadv_present) {
-            nbytes = handle_aiocb_rw_vector(aiocb);
-            if (nbytes == aiocb->aio_nbytes)
-                return nbytes;
-            if (nbytes < 0 && nbytes != -ENOSYS)
-                return nbytes;
-            preadv_present = 0;
-        }
-
-        /*
-         * XXX(hch): short read/write.  no easy way to handle the reminder
-         * using these interfaces.  For now retry using plain
-         * pread/pwrite?
-         */
-    }
-
-    /*
-     * Ok, we have to do it the hard way, copy all segments into
-     * a single aligned buffer.
-     */
-    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
-    if (aiocb->aio_type & QEMU_AIO_WRITE) {
-        char *p = buf;
-        int i;
-
-        for (i = 0; i < aiocb->aio_niov; ++i) {
-            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
-            p += aiocb->aio_iov[i].iov_len;
-        }
-    }
-
-    nbytes = handle_aiocb_rw_linear(aiocb, buf);
-    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
-        char *p = buf;
-        size_t count = aiocb->aio_nbytes, copy;
-        int i;
-
-        for (i = 0; i < aiocb->aio_niov && count; ++i) {
-            copy = count;
-            if (copy > aiocb->aio_iov[i].iov_len)
-                copy = aiocb->aio_iov[i].iov_len;
-            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
-            p     += copy;
-            count -= copy;
-        }
-    }
-    qemu_vfree(buf);
-
-    return nbytes;
-}
-
-static int aio_worker(void *arg)
-{
-    struct qemu_paiocb *aiocb = arg;
-    ssize_t ret = 0;
-
-    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-    case QEMU_AIO_READ:
-        ret = handle_aiocb_rw(aiocb);
-        if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
-            /* A short read means that we have reached EOF. Pad the buffer
-             * with zeros for bytes after EOF. */
-            QEMUIOVector qiov;
-
-            qemu_iovec_init_external(&qiov, aiocb->aio_iov,
-                                     aiocb->aio_niov);
-            qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
-
-            ret = aiocb->aio_nbytes;
-        }
-        if (ret == aiocb->aio_nbytes) {
-            ret = 0;
-        } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
-            ret = -EINVAL;
-        }
-        break;
-    case QEMU_AIO_WRITE:
-        ret = handle_aiocb_rw(aiocb);
-        if (ret == aiocb->aio_nbytes) {
-            ret = 0;
-        } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
-            ret = -EINVAL;
-        }
-        break;
-    case QEMU_AIO_FLUSH:
-        ret = handle_aiocb_flush(aiocb);
-        break;
-    case QEMU_AIO_IOCTL:
-        ret = handle_aiocb_ioctl(aiocb);
-        break;
-    default:
-        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-        ret = -EINVAL;
-        break;
-    }
-
-    qemu_aio_release(aiocb);
-    return ret;
-}
-
-static AIOPool raw_aio_pool = {
-    .aiocb_size         = sizeof(struct qemu_paiocb),
-};
-
-BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
-        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
-        BlockDriverCompletionFunc *cb, void *opaque, int type)
-{
-    struct qemu_paiocb *acb;
-
-    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
-    acb->aio_type = type;
-    acb->aio_fildes = fd;
-
-    if (qiov) {
-        acb->aio_iov = qiov->iov;
-        acb->aio_niov = qiov->niov;
-    }
-    acb->aio_nbytes = nb_sectors * 512;
-    acb->aio_offset = sector_num * 512;
-
-    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
-}
-
-BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
-        unsigned long int req, void *buf,
-        BlockDriverCompletionFunc *cb, void *opaque)
-{
-    struct qemu_paiocb *acb;
-
-    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
-    acb->aio_type = QEMU_AIO_IOCTL;
-    acb->aio_fildes = fd;
-    acb->aio_offset = 0;
-    acb->aio_ioctl_buf = buf;
-    acb->aio_ioctl_cmd = req;
-
-    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
-}
-- 
1.7.10.4

* [Qemu-devel] [PATCH 11/12] raw-posix: rename raw-posix-aio.h, hide unavailable prototypes
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (9 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 10/12] raw: merge posix-aio-compat.c into block/raw-posix.c Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 12/12] raw-win32: add emulated AIO support Paolo Bonzini
  11 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/{raw-posix-aio.h => raw-aio.h} |   10 ++++++----
 block/raw-posix.c                    |    2 +-
 linux-aio.c                          |    2 +-
 3 files changed, 8 insertions(+), 6 deletions(-)
 rename block/{raw-posix-aio.h => raw-aio.h} (86%)

diff --git a/block/raw-posix-aio.h b/block/raw-aio.h
similarity index 86%
rename from block/raw-posix-aio.h
rename to block/raw-aio.h
index c714367..b3bb073 100644
--- a/block/raw-posix-aio.h
+++ b/block/raw-aio.h
@@ -1,5 +1,5 @@
 /*
- * QEMU Posix block I/O backend AIO support
+ * Declarations for AIO in the raw protocol
  *
  * Copyright IBM, Corp. 2008
  *
@@ -12,8 +12,8 @@
  * Contributions after 2012-01-13 are licensed under the terms of the
  * GNU GPL, version 2 or (at your option) any later version.
  */
-#ifndef QEMU_RAW_POSIX_AIO_H
-#define QEMU_RAW_POSIX_AIO_H
+#ifndef QEMU_RAW_AIO_H
+#define QEMU_RAW_AIO_H
 
 /* AIO request types */
 #define QEMU_AIO_READ         0x0001
@@ -28,9 +28,11 @@
 
 
 /* linux-aio.c - Linux native implementation */
+#ifdef CONFIG_LINUX_AIO
 void *laio_init(void);
 BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque, int type);
+#endif
 
-#endif /* QEMU_RAW_POSIX_AIO_H */
+#endif /* QEMU_RAW_AIO_H */
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 3e2e822..64296a6 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -29,7 +29,7 @@
 #include "module.h"
 #include "trace.h"
 #include "thread-pool.h"
-#include "block/raw-posix-aio.h"
+#include "block/raw-aio.h"
 
 #if defined(__APPLE__) && (__MACH__)
 #include <paths.h>
diff --git a/linux-aio.c b/linux-aio.c
index 779f793..67c49af 100644
--- a/linux-aio.c
+++ b/linux-aio.c
@@ -9,7 +9,7 @@
  */
 #include "qemu-common.h"
 #include "qemu-aio.h"
-#include "block/raw-posix-aio.h"
+#include "block/raw-aio.h"
 #include "event_notifier.h"
 
 #include <libaio.h>
-- 
1.7.10.4

* [Qemu-devel] [PATCH 12/12] raw-win32: add emulated AIO support
  2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
                   ` (10 preceding siblings ...)
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 11/12] raw-posix: rename raw-posix-aio.h, hide unavailable prototypes Paolo Bonzini
@ 2012-07-16 10:42 ` Paolo Bonzini
  2012-07-23 16:35   ` Blue Swirl
  11 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 10:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

The thread pool can be used under Win32 in the same way as in raw-posix.c.
Move the existing synchronous code into a thread-pool worker callback, and
pass its return code (0 or a negative errno value) back to the caller's
completion callback.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
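
As a hedged illustration of "pass the return code back": the worker turns a
byte count into the value handed to the completion callback.  The sketch
below is not the code in this patch; demo_complete_read() and
demo_complete_write() are hypothetical helpers, and a flat buffer is assumed
instead of the I/O vector that the patch pads with qemu_iovec_memset_skip().

```c
#include <errno.h>
#include <stddef.h>
#include <string.h>

/*
 * Map the number of bytes actually read to a completion code.
 * For a growable image a short read means EOF, so the tail of the
 * buffer is zero-filled and the request counts as complete.
 * Returns 0 on success, -EINVAL otherwise.
 */
static int demo_complete_read(char *buf, size_t nbytes, size_t count,
                              int growable)
{
    if (count < nbytes && growable) {
        memset(buf + count, 0, nbytes - count);  /* pad bytes after EOF */
        count = nbytes;
    }
    return count == nbytes ? 0 : -EINVAL;
}

/* A write is successful only if every byte was written. */
static int demo_complete_write(size_t nbytes, size_t count)
{
    return count == nbytes ? 0 : -EINVAL;
}
```

Note that the result has to end up in the variable the worker returns;
storing it in the byte counter would report success for a failed write.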
 block/raw-win32.c |  189 +++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 140 insertions(+), 49 deletions(-)

diff --git a/block/raw-win32.c b/block/raw-win32.c
index e4b0b75..a50d636 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -25,6 +25,9 @@
 #include "qemu-timer.h"
 #include "block_int.h"
 #include "module.h"
+#include "raw-aio.h"
+#include "trace.h"
+#include "thread-pool.h"
 #include <windows.h>
 #include <winioctl.h>
 
@@ -32,12 +35,130 @@
 #define FTYPE_CD     1
 #define FTYPE_HARDDISK 2
 
+struct qemu_paiocb {
+    BlockDriverState *bs;
+    HANDLE hfile;
+    struct iovec *aio_iov;
+    int aio_niov;
+    size_t aio_nbytes;
+    off_t aio_offset;
+    int aio_type;
+};
+
 typedef struct BDRVRawState {
     HANDLE hfile;
     int type;
     char drive_path[16]; /* format: "d:\" */
 } BDRVRawState;
 
+/*
+ * Reads/writes the data to/from each element of the request's I/O vector.
+ *
+ * Returns the number of bytes handled. There is no -errno return: the loop
+ * stops at the first failed or short transfer and reports a short count.
+ */
+static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
+{
+    size_t offset = 0;
+    int i;
+
+    for (i = 0; i < aiocb->aio_niov; i++) {
+        OVERLAPPED ov;
+        DWORD ret, ret_count, len;
+
+        memset(&ov, 0, sizeof(ov));
+        ov.Offset = (aiocb->aio_offset + offset);
+        ov.OffsetHigh = (aiocb->aio_offset + offset) >> 32;
+        len = aiocb->aio_iov[i].iov_len;
+        if (aiocb->aio_type & QEMU_AIO_WRITE) {
+            ret = WriteFile(aiocb->hfile, aiocb->aio_iov[i].iov_base,
+                            len, &ret_count, &ov);
+        } else {
+            ret = ReadFile(aiocb->hfile, aiocb->aio_iov[i].iov_base,
+                           len, &ret_count, &ov);
+        }
+        if (!ret) {
+            ret_count = 0;
+        }
+        if (ret_count != len) {
+            break;
+        }
+        offset += len;
+    }
+
+    return offset;
+}
+
+static int aio_worker(void *arg)
+{
+    struct qemu_paiocb *aiocb = arg;
+    ssize_t ret = 0;
+    size_t count;
+
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+        count = handle_aiocb_rw(aiocb);
+        if (count < aiocb->aio_nbytes && aiocb->bs->growable) {
+            /* A short read means that we have reached EOF. Pad the buffer
+             * with zeros for bytes after EOF. */
+            QEMUIOVector qiov;
+
+            qemu_iovec_init_external(&qiov, aiocb->aio_iov,
+                                     aiocb->aio_niov);
+            qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - count, count);
+
+            count = aiocb->aio_nbytes;
+        }
+        if (count == aiocb->aio_nbytes) {
+            ret = 0;
+        } else {
+            ret = -EINVAL;
+        }
+        break;
+    case QEMU_AIO_WRITE:
+        count = handle_aiocb_rw(aiocb);
+        if (count == aiocb->aio_nbytes) {
+            ret = 0;
+        } else {
+            ret = -EINVAL;
+        }
+        break;
+    case QEMU_AIO_FLUSH:
+        if (!FlushFileBuffers(aiocb->hfile)) {
+            ret = -EIO;
+        }
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
+    }
+
+    g_slice_free(struct qemu_paiocb, aiocb);
+    return ret;
+}
+
+static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int type)
+{
+    struct qemu_paiocb *acb = g_slice_new(struct qemu_paiocb);
+
+    acb->bs = bs;
+    acb->hfile = hfile;
+    acb->aio_type = type;
+
+    if (qiov) {
+        acb->aio_iov = qiov->iov;
+        acb->aio_niov = qiov->niov;
+    }
+    acb->aio_nbytes = nb_sectors * 512;
+    acb->aio_offset = sector_num * 512;
+
+    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
+    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+}
+
 int qemu_ftruncate64(int fd, int64_t length)
 {
     LARGE_INTEGER li;
@@ -109,59 +230,29 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
     return 0;
 }
 
-static int raw_read(BlockDriverState *bs, int64_t sector_num,
-                    uint8_t *buf, int nb_sectors)
+static BlockDriverAIOCB *raw_aio_readv(BlockDriverState *bs,
+                         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+                         BlockDriverCompletionFunc *cb, void *opaque)
 {
     BDRVRawState *s = bs->opaque;
-    OVERLAPPED ov;
-    DWORD ret_count;
-    int ret;
-    int64_t offset = sector_num * 512;
-    int count = nb_sectors * 512;
-
-    memset(&ov, 0, sizeof(ov));
-    ov.Offset = offset;
-    ov.OffsetHigh = offset >> 32;
-    ret = ReadFile(s->hfile, buf, count, &ret_count, &ov);
-    if (!ret)
-        return ret_count;
-    if (ret_count == count)
-        ret_count = 0;
-    return ret_count;
+    return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
+                       cb, opaque, QEMU_AIO_READ);
 }
 
-static int raw_write(BlockDriverState *bs, int64_t sector_num,
-                     const uint8_t *buf, int nb_sectors)
+static BlockDriverAIOCB *raw_aio_writev(BlockDriverState *bs,
+                          int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+                          BlockDriverCompletionFunc *cb, void *opaque)
 {
     BDRVRawState *s = bs->opaque;
-    OVERLAPPED ov;
-    DWORD ret_count;
-    int ret;
-    int64_t offset = sector_num * 512;
-    int count = nb_sectors * 512;
-
-    memset(&ov, 0, sizeof(ov));
-    ov.Offset = offset;
-    ov.OffsetHigh = offset >> 32;
-    ret = WriteFile(s->hfile, buf, count, &ret_count, &ov);
-    if (!ret)
-        return ret_count;
-    if (ret_count == count)
-        ret_count = 0;
-    return ret_count;
+    return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
+                       cb, opaque, QEMU_AIO_WRITE);
 }
 
-static int raw_flush(BlockDriverState *bs)
+static BlockDriverAIOCB *raw_aio_flush(BlockDriverState *bs,
+                         BlockDriverCompletionFunc *cb, void *opaque)
 {
     BDRVRawState *s = bs->opaque;
-    int ret;
-
-    ret = FlushFileBuffers(s->hfile);
-    if (ret == 0) {
-        return -EIO;
-    }
-
-    return 0;
+    return paio_submit(bs, s->hfile, 0, NULL, 0, cb, opaque, QEMU_AIO_FLUSH);
 }
 
 static void raw_close(BlockDriverState *bs)
@@ -282,9 +373,9 @@ static BlockDriver bdrv_file = {
     .bdrv_close		= raw_close,
     .bdrv_create	= raw_create,
 
-    .bdrv_read              = raw_read,
-    .bdrv_write             = raw_write,
-    .bdrv_co_flush_to_disk  = raw_flush,
+    .bdrv_aio_readv     = raw_aio_readv,
+    .bdrv_aio_writev    = raw_aio_writev,
+    .bdrv_aio_flush     = raw_aio_flush,
 
     .bdrv_truncate	= raw_truncate,
     .bdrv_getlength	= raw_getlength,
@@ -413,9 +504,9 @@ static BlockDriver bdrv_host_device = {
     .bdrv_close		= raw_close,
     .bdrv_has_zero_init = hdev_has_zero_init,
 
-    .bdrv_read              = raw_read,
-    .bdrv_write             = raw_write,
-    .bdrv_co_flush_to_disk  = raw_flush,
+    .bdrv_aio_readv     = raw_aio_readv,
+    .bdrv_aio_writev    = raw_aio_writev,
+    .bdrv_aio_flush     = raw_aio_flush,
 
     .bdrv_getlength	= raw_getlength,
     .bdrv_get_allocated_file_size
-- 
1.7.10.4

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore Paolo Bonzini
@ 2012-07-16 12:00   ` Jan Kiszka
  2012-07-16 12:01     ` [Qemu-devel] [PATCH] qemu-thread: Introduce qemu_cond_timedwait for POSIX Jan Kiszka
  2012-07-16 13:20     ` [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore Paolo Bonzini
  0 siblings, 2 replies; 28+ messages in thread
From: Jan Kiszka @ 2012-07-16 12:00 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: kwolf, aliguori, sw, qemu-devel, stefanha

On 2012-07-16 12:42, Paolo Bonzini wrote:
> The new thread pool will use semaphores instead of condition
> variables, because QemuCond does not have qemu_cond_timedwait.

I'll post an updated patch (according to last round's review comments)
that adds this service for POSIX. I bet you'll find a way to extend it
to Win32 if that is required. ;)

> (I also like it more this way, since semaphores model well the
> producer-consumer problem).

Let's not introduce another synchronization mechanism unless there is a
real need. Semaphores tend to be misused for things they don't fit, so
better keep them out of reach.

Also, if you do producer-consumer this way, you need a down() for every
entity you dequeue. In contrast, you only interact with condition
variables if the consumer queue is empty - less atomic ops.
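
A minimal sketch of the condition-variable pattern described here, written
against plain pthreads rather than the QemuMutex/QemuCond wrappers. The
names work_queue, wq_init, wq_push and wq_pop are hypothetical. The
consumer calls pthread_cond_wait() only when it finds the queue empty;
otherwise it just takes the mutex and dequeues.

```c
#include <pthread.h>
#include <stddef.h>

#define WQ_CAP 16

/* Hypothetical fixed-size ring buffer protected by a mutex and a condvar. */
struct work_queue {
    pthread_mutex_t lock;
    pthread_cond_t nonempty;
    int items[WQ_CAP];
    size_t head;
    size_t count;
};

static void wq_init(struct work_queue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->nonempty, NULL);
    q->head = 0;
    q->count = 0;
}

/* Producer side: returns 0 on success, -1 if the queue is full. */
static int wq_push(struct work_queue *q, int item)
{
    int ret = -1;

    pthread_mutex_lock(&q->lock);
    if (q->count < WQ_CAP) {
        q->items[(q->head + q->count) % WQ_CAP] = item;
        q->count++;
        ret = 0;
        pthread_cond_signal(&q->nonempty);
    }
    pthread_mutex_unlock(&q->lock);
    return ret;
}

/* Consumer side: touches the condvar only while the queue is empty. */
static int wq_pop(struct work_queue *q)
{
    int item;

    pthread_mutex_lock(&q->lock);
    while (q->count == 0) {
        pthread_cond_wait(&q->nonempty, &q->lock);
    }
    item = q->items[q->head];
    q->head = (q->head + 1) % WQ_CAP;
    q->count--;
    pthread_mutex_unlock(&q->lock);
    return item;
}
```

With two items already queued, two successive wq_pop() calls return them
in FIFO order without ever waiting on the condition variable.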

Jan

-- 
Siemens AG, Corporate Technology, CT RTC ITP SDP-DE
Corporate Competence Center Embedded Linux

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [Qemu-devel] [PATCH] qemu-thread: Introduce qemu_cond_timedwait for POSIX
  2012-07-16 12:00   ` Jan Kiszka
@ 2012-07-16 12:01     ` Jan Kiszka
  2012-07-16 13:20     ` [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore Paolo Bonzini
  1 sibling, 0 replies; 28+ messages in thread
From: Jan Kiszka @ 2012-07-16 12:01 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

First user will be POSIX compat aio. Windows use cases aren't in sight,
so this remains a POSIX-only service for now.

This version uses CLOCK_MONOTONIC for the timeout to avoid jumps on wall
clock adjustments, provided the host supports pthread_condattr_setclock.

Signed-off-by: Jan Kiszka <jan.kiszka@siemens.com>
---
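Note on the deadline arithmetic: pthread_cond_timedwait() takes an
absolute timespec whose tv_nsec must stay below one second, so a value of
exactly 1000000000 has to be carried into tv_sec as well. A minimal
sketch of that normalization, using a hypothetical helper named
timespec_add_ms that is not part of this patch:

```c
#include <time.h>

#define NSEC_PER_SEC 1000000000L

/*
 * Add a relative timeout in milliseconds to a normalized timespec and
 * keep the result normalized (0 <= tv_nsec < NSEC_PER_SEC).
 */
static struct timespec timespec_add_ms(struct timespec ts,
                                       unsigned int timeout_ms)
{
    ts.tv_sec += timeout_ms / 1000;
    ts.tv_nsec += (long)(timeout_ms % 1000) * 1000000L;
    if (ts.tv_nsec >= NSEC_PER_SEC) {
        ts.tv_sec++;
        ts.tv_nsec -= NSEC_PER_SEC;
    }
    return ts;
}
```

A single carry is enough because both addends are below one second, so
their sum is below two seconds.
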
 configure           |   21 +++++++++++++++++++++
 qemu-thread-posix.c |   46 +++++++++++++++++++++++++++++++++++++++++++++-
 qemu-thread-posix.h |    5 +++++
 3 files changed, 71 insertions(+), 1 deletions(-)

diff --git a/configure b/configure
index 0a3896e..dec39c8 100755
--- a/configure
+++ b/configure
@@ -2135,6 +2135,23 @@ if test "$mingw32" != yes -a "$pthread" = no; then
 fi
 
 ##########################################
+# pthread_condattr_setclock probe
+condattr_setclock="no"
+if test "$pthread" != "no" ; then
+  cat > $TMPC << EOF
+#include <pthread.h>
+int main(void) {
+  pthread_condattr_t attr;
+  pthread_condattr_init(&attr);
+  return pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
+}
+EOF
+  if compile_prog "" "$LIBS" && $TMPE; then
+    condattr_setclock="yes"
+  fi
+fi
+
+##########################################
 # rbd probe
 if test "$rbd" != "no" ; then
   cat > $TMPC <<EOF
@@ -3040,6 +3057,7 @@ echo "preadv support    $preadv"
 echo "fdatasync         $fdatasync"
 echo "madvise           $madvise"
 echo "posix_madvise     $posix_madvise"
+echo "condattr_setclock $condattr_setclock"
 echo "uuid support      $uuid"
 echo "libcap-ng support $cap_ng"
 echo "vhost-net support $vhost_net"
@@ -3324,6 +3342,9 @@ fi
 if test "$posix_madvise" = "yes" ; then
   echo "CONFIG_POSIX_MADVISE=y" >> $config_host_mak
 fi
+if test "$condattr_setclock" = "yes" ; then
+  echo "CONFIG_CONDATTR_SETCLOCK=y" >> $config_host_mak
+fi
 
 if test "$spice" = "yes" ; then
   echo "CONFIG_SPICE=y" >> $config_host_mak
diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c
index 9e1b5fb..ed6ab06 100644
--- a/qemu-thread-posix.c
+++ b/qemu-thread-posix.c
@@ -17,6 +17,8 @@
 #include <signal.h>
 #include <stdint.h>
 #include <string.h>
+#include <sys/time.h>
+#include "config-host.h"
 #include "qemu-thread.h"
 
 static void error_exit(int err, const char *msg)
@@ -73,8 +75,20 @@ void qemu_mutex_unlock(QemuMutex *mutex)
 void qemu_cond_init(QemuCond *cond)
 {
     int err;
+    pthread_condattr_t attr;
 
-    err = pthread_cond_init(&cond->cond, NULL);
+    err = pthread_condattr_init(&attr);
+    if (err) {
+        error_exit(err, __func__);
+    }
+#ifdef CONFIG_CONDATTR_SETCLOCK
+    err = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
+    if (err) {
+        error_exit(err, __func__);
+    }
+#endif
+
+    err = pthread_cond_init(&cond->cond, &attr);
     if (err)
         error_exit(err, __func__);
 }
@@ -115,6 +129,36 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
         error_exit(err, __func__);
 }
 
+/* Returns true if the condition was signaled, false if timed out. */
+bool qemu_cond_timedwait(QemuCond *cond, QemuMutex *mutex,
+                         unsigned int timeout_ms)
+{
+    struct timespec ts;
+    int err;
+
+#ifdef CONFIG_CONDATTR_SETCLOCK
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+#else
+    struct timeval tv;
+
+    gettimeofday(&tv, NULL);
+    ts.tv_sec = tv.tv_sec;
+    ts.tv_nsec = tv.tv_usec * 1000;
+#endif
+
+    ts.tv_sec += timeout_ms / 1000;
+    ts.tv_nsec += (timeout_ms % 1000) * 1000000;
+    if (ts.tv_nsec >= 1000000000) {
+        ts.tv_sec++;
+        ts.tv_nsec -= 1000000000;
+    }
+    err = pthread_cond_timedwait(&cond->cond, &mutex->lock, &ts);
+    if (err && err != ETIMEDOUT) {
+        error_exit(err, __func__);
+    }
+    return err == 0;
+}
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg, int mode)
diff --git a/qemu-thread-posix.h b/qemu-thread-posix.h
index ee4618e..9f00524 100644
--- a/qemu-thread-posix.h
+++ b/qemu-thread-posix.h
@@ -1,5 +1,6 @@
 #ifndef __QEMU_THREAD_POSIX_H
 #define __QEMU_THREAD_POSIX_H 1
+#include <stdbool.h>
 #include "pthread.h"
 
 struct QemuMutex {
@@ -14,4 +15,8 @@ struct QemuThread {
     pthread_t thread;
 };
 
+/* only provided for posix so far */
+bool qemu_cond_timedwait(QemuCond *cond, QemuMutex *mutex,
+                         unsigned int timeout_ms);
+
 #endif
-- 
1.7.3.4

^ permalink raw reply related	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 12:00   ` Jan Kiszka
  2012-07-16 12:01     ` [Qemu-devel] [PATCH] qemu-thread: Introduce qemu_cond_timedwait for POSIX Jan Kiszka
@ 2012-07-16 13:20     ` Paolo Bonzini
  2012-07-16 13:34       ` Jan Kiszka
  1 sibling, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 13:20 UTC (permalink / raw)
  To: Jan Kiszka; +Cc: kwolf, aliguori, sw, qemu-devel, stefanha

Il 16/07/2012 14:00, Jan Kiszka ha scritto:
> On 2012-07-16 12:42, Paolo Bonzini wrote:
>> The new thread pool will use semaphores instead of condition
>> variables, because QemuCond does not have qemu_cond_timedwait.
> 
> I'll post an updated patch (according to last round's review comments)
> that adds this service for POSIX. I bet you'll find a way to extend it
> to Win32 if that is required. ;)

I can do that (or just use pthreads-win32), but only at the cost of
making cond_wait() slower and more complex.

>> (I also like it more this way, since semaphores model well the
>> producer-consumer problem).
> 
> Let's not introduce another synchronization mechanism unless there is a
> real need. Semaphores tend to be misused for things they don't fit, so
> better keep them out of reach.

That's what patch review is for...

> Also, if you do producer-consumer this way, you need a down() for every
> entity you dequeue. In contrast, you only interact with condition
> variables if the consumer queue is empty - less atomic ops.

It doesn't really matter.  You want the thread pool to service requests
as fast as possible, which means you'll have always at least one free
thread waiting on the semaphore or cv.  So, with either semaphores or
cvs, the slow path is actually the normal case.
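
A minimal sketch of the semaphore variant, assuming POSIX unnamed
semaphores (sem_init/sem_post/sem_wait) rather than the QemuSemaphore
wrapper from this series. The names sem_queue, sq_init, sq_push and sq_pop
are hypothetical. Every dequeue performs one sem_wait(), which is the
per-item down() mentioned above.

```c
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define SQ_CAP 16

/* Hypothetical ring buffer; the semaphore counts the queued items. */
struct sem_queue {
    pthread_mutex_t lock;
    sem_t available;
    int items[SQ_CAP];
    size_t head;
    size_t count;
};

static void sq_init(struct sem_queue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    sem_init(&q->available, 0, 0);
    q->head = 0;
    q->count = 0;
}

/* Producer side: returns 0 on success, -1 if the queue is full. */
static int sq_push(struct sem_queue *q, int item)
{
    int ret = -1;

    pthread_mutex_lock(&q->lock);
    if (q->count < SQ_CAP) {
        q->items[(q->head + q->count) % SQ_CAP] = item;
        q->count++;
        ret = 0;
    }
    pthread_mutex_unlock(&q->lock);
    if (ret == 0) {
        sem_post(&q->available);   /* one up() per queued item */
    }
    return ret;
}

/* Consumer side: one down() per dequeued item, retried on EINTR. */
static int sq_pop(struct sem_queue *q)
{
    int item;

    while (sem_wait(&q->available) < 0 && errno == EINTR) {
        /* interrupted by a signal, try again */
    }
    pthread_mutex_lock(&q->lock);
    item = q->items[q->head];
    q->head = (q->head + 1) % SQ_CAP;
    q->count--;
    pthread_mutex_unlock(&q->lock);
    return item;
}
```

The semaphore count always equals the number of queued items, so a worker
that returns from sem_wait() is guaranteed to find something to dequeue.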

Paolo

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 13:20     ` [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore Paolo Bonzini
@ 2012-07-16 13:34       ` Jan Kiszka
  2012-07-16 13:35         ` Paolo Bonzini
  0 siblings, 1 reply; 28+ messages in thread
From: Jan Kiszka @ 2012-07-16 13:34 UTC (permalink / raw)
  To: Paolo Bonzini
  Cc: kwolf@redhat.com, aliguori@linux.vnet.ibm.com, sw@weilnetz.de,
	qemu-devel@nongnu.org, stefanha@linux.vnet.ibm.com

On 2012-07-16 15:20, Paolo Bonzini wrote:
> Il 16/07/2012 14:00, Jan Kiszka ha scritto:
>> On 2012-07-16 12:42, Paolo Bonzini wrote:
>>> The new thread pool will use semaphores instead of condition
>>> variables, because QemuCond does not have qemu_cond_timedwait.
>>
>> I'll post an updated patch (according to last round's review comments)
>> that adds this service for POSIX. I bet you'll find a way to extend it
>> to Win32 if that is required. ;)
> 
> I can do that (or just use pthreads-win32), but only at the cost of
> making cond_wait() slower and more complex.

Why will it affect cond_wait? WaitForSingleObject can time out as well.

> 
>>> (I also like it more this way, since semaphores model well the
>>> producer-consumer problem).
>>
>> Let's not introduce another synchronization mechanism unless there is a
>> real need. Semaphores tend to be misused for things they don't fit, so
>> better keep them out of reach.
> 
> That's what patch review is for...

But even better is avoiding the temptation.

Jan

-- 
Siemens AG, Corporate Technology, CT RTC ITP SDP-DE
Corporate Competence Center Embedded Linux

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 13:34       ` Jan Kiszka
@ 2012-07-16 13:35         ` Paolo Bonzini
  2012-07-16 13:53           ` Jan Kiszka
  0 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 13:35 UTC (permalink / raw)
  To: Jan Kiszka
  Cc: kwolf@redhat.com, aliguori@linux.vnet.ibm.com, sw@weilnetz.de,
	qemu-devel@nongnu.org, stefanha@linux.vnet.ibm.com

Il 16/07/2012 15:34, Jan Kiszka ha scritto:
> On 2012-07-16 15:20, Paolo Bonzini wrote:
>> Il 16/07/2012 14:00, Jan Kiszka ha scritto:
>>> On 2012-07-16 12:42, Paolo Bonzini wrote:
>>>> The new thread pool will use semaphores instead of condition
>>>> variables, because QemuCond does not have qemu_cond_timedwait.
>>>
>>> I'll post an updated patch (according to last round's review comments)
>>> that adds this service for POSIX. I bet you'll find a way to extend it
>>> to Win32 if that is required. ;)
>>
>> I can do that (or just use pthreads-win32), but only at the cost of
>> making cond_wait() slower and more complex.
> 
> Why will it affect cond_wait? WaitForSingleObject can time out as well.

qemu_cond_wait only uses WaitForSingleObject with INFINITE timeout, and
the algorithm relies on that.

Paolo

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 13:35         ` Paolo Bonzini
@ 2012-07-16 13:53           ` Jan Kiszka
  2012-07-16 14:03             ` Paolo Bonzini
  0 siblings, 1 reply; 28+ messages in thread
From: Jan Kiszka @ 2012-07-16 13:53 UTC (permalink / raw)
  To: Paolo Bonzini
  Cc: kwolf@redhat.com, aliguori@linux.vnet.ibm.com, sw@weilnetz.de,
	qemu-devel@nongnu.org, stefanha@linux.vnet.ibm.com

On 2012-07-16 15:35, Paolo Bonzini wrote:
> Il 16/07/2012 15:34, Jan Kiszka ha scritto:
>> On 2012-07-16 15:20, Paolo Bonzini wrote:
>>> Il 16/07/2012 14:00, Jan Kiszka ha scritto:
>>>> On 2012-07-16 12:42, Paolo Bonzini wrote:
>>>>> The new thread pool will use semaphores instead of condition
>>>>> variables, because QemuCond does not have qemu_cond_timedwait.
>>>>
>>>> I'll post an updated patch (according to last round's review comments)
>>>> that adds this service for POSIX. I bet you'll find a way to extend it
>>>> to Win32 if that is required. ;)
>>>
>>> I can do that (or just use pthreads-win32), but only at the cost of
>>> making cond_wait() slower and more complex.
>>
>> Why will it affect cond_wait? WaitForSingleObject can time out as well.
> 
> qemu_cond_wait only uses WaitForSingleObject with INFINITE timeout, and
> the algorithm relies on that.

I see. But this doesn't look awfully complex. Just move the waker
signaling from within cond_wait under the mutex as well, maybe add
another flag if there is really someone waiting, and that's it. The
costs should be hard to measure, even in lines of code.

Jan

-- 
Siemens AG, Corporate Technology, CT RTC ITP SDP-DE
Corporate Competence Center Embedded Linux

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 13:53           ` Jan Kiszka
@ 2012-07-16 14:03             ` Paolo Bonzini
  2012-07-16 14:09               ` Jan Kiszka
  0 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 14:03 UTC (permalink / raw)
  To: Jan Kiszka
  Cc: kwolf@redhat.com, aliguori@linux.vnet.ibm.com, sw@weilnetz.de,
	qemu-devel@nongnu.org, stefanha@linux.vnet.ibm.com

Il 16/07/2012 15:53, Jan Kiszka ha scritto:
>>
>> qemu_cond_wait only uses WaitForSingleObject with INFINITE timeout, and
>> the algorithm relies on that.
> I see. But this doesn't look complex awfully. Just move the waker
> signaling from within cond_wait under the mutex as well, maybe add
> another flag if there is really someone waiting, and that's it. The
> costs should be hard to measure, even in line of code.

There is still a race after WaitForSingleObject times out.  You need to
catch the mutex before decreasing the number of waiters, and during that
window somebody can broadcast the condition variable.

I'm not saying it's impossible, just that it's hard and I dislike
qemu_cond_timedwait as much as you dislike semaphores. :)

Paolo

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 14:03             ` Paolo Bonzini
@ 2012-07-16 14:09               ` Jan Kiszka
  2012-07-16 14:20                 ` Paolo Bonzini
  0 siblings, 1 reply; 28+ messages in thread
From: Jan Kiszka @ 2012-07-16 14:09 UTC (permalink / raw)
  To: Paolo Bonzini
  Cc: kwolf@redhat.com, aliguori@linux.vnet.ibm.com, sw@weilnetz.de,
	qemu-devel@nongnu.org, stefanha@linux.vnet.ibm.com

On 2012-07-16 16:03, Paolo Bonzini wrote:
> Il 16/07/2012 15:53, Jan Kiszka ha scritto:
>>>>
>>>> qemu_cond_wait only uses WaitForSingleObject with INFINITE timeout, and
>>>> the algorithm relies on that.
>> I see. But this doesn't look complex awfully. Just move the waker
>> signaling from within cond_wait under the mutex as well, maybe add
>> another flag if there is really someone waiting, and that's it. The
>> costs should be hard to measure, even in line of code.
> 
> There is still a race after WaitForSingleObject times out.  You need to
> catch the mutex before decreasing the number of waiters, and during that
> window somebody can broadcast the condition variable.

...and that's why you check what needs to be done to handle this race
after grabbing the mutex. IOW, replicate the state information that the
Windows semaphore contains into the emulated condition variable object.

> 
> I'm not saying it's impossible, just that it's hard and I dislike
> qemu_cond_timedwait as much as you dislike semaphores. :)

As you cannot eliminate our use cases for condition variables, let's
focus on the existing synchronization patterns instead of introducing
new ones + new mechanisms.

Jan

-- 
Siemens AG, Corporate Technology, CT RTC ITP SDP-DE
Corporate Competence Center Embedded Linux

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 14:09               ` Jan Kiszka
@ 2012-07-16 14:20                 ` Paolo Bonzini
  2012-07-24 16:55                   ` Paolo Bonzini
  0 siblings, 1 reply; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-16 14:20 UTC (permalink / raw)
  To: Jan Kiszka
  Cc: kwolf@redhat.com, aliguori@linux.vnet.ibm.com, sw@weilnetz.de,
	qemu-devel@nongnu.org, stefanha@linux.vnet.ibm.com

Il 16/07/2012 16:09, Jan Kiszka ha scritto:
> On 2012-07-16 16:03, Paolo Bonzini wrote:
>> Il 16/07/2012 15:53, Jan Kiszka ha scritto:
>>>>>
>>>>> qemu_cond_wait only uses WaitForSingleObject with INFINITE timeout, and
>>>>> the algorithm relies on that.
>>> I see. But this doesn't look complex awfully. Just move the waker
>>> signaling from within cond_wait under the mutex as well, maybe add
>>> another flag if there is really someone waiting, and that's it. The
>>> costs should be hard to measure, even in line of code.
>>
>> There is still a race after WaitForSingleObject times out.  You need to
>> catch the mutex before decreasing the number of waiters, and during that
>> window somebody can broadcast the condition variable.
> 
> ...and that's why you check what needs to be done to handle this race
> after grabbing the mutex. IOW, replicate the state information that the
> Windows semaphore contains into the emulated condition variable object.

It is already there (cv->waiters), but it is accessed atomically.  To do
what you suggest I would need to add a mutex.

>> I'm not saying it's impossible, just that it's hard and I dislike
>> qemu_cond_timedwait as much as you dislike semaphores. :)
> 
> As you cannot eliminate our use cases for condition variables, let's
> focus on the existing synchronization patterns instead of introducing
> new ones + new mechanisms.

I'm not introducing <fancy new primitive of the day>... semaphores have
the advantage of having better support under Windows (at least older
versions; Microsoft saw the light starting with Vista---yes I know the
oxymoron---and added condvars).

Paolo

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 01/12] event_notifier: enable it to use pipes
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 01/12] event_notifier: enable it to use pipes Paolo Bonzini
@ 2012-07-19 18:58   ` Anthony Liguori
  0 siblings, 0 replies; 28+ messages in thread
From: Anthony Liguori @ 2012-07-19 18:58 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

Paolo Bonzini <pbonzini@redhat.com> writes:

> This takes the eventfd emulation code from the main loop and adds it
> to EventNotifier.  When the EventNotifier is used for the main loop too,
> we need this compatibility code.
>
> Without CONFIG_EVENTFD, event_notifier_get_fd is only usable for the
> "read" side of the notifier, for example to set a select() handler.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  event_notifier.c |   83 +++++++++++++++++++++++++++++++++++++++++++-----------
>  event_notifier.h |    3 +-
>  2 files changed, 69 insertions(+), 17 deletions(-)
>
> diff --git a/event_notifier.c b/event_notifier.c
> index 2c207e1..dde2d32 100644
> --- a/event_notifier.c
> +++ b/event_notifier.c
> @@ -20,48 +20,99 @@
>  
>  void event_notifier_init_fd(EventNotifier *e, int fd)
>  {
> -    e->fd = fd;
> +    e->rfd = fd;
> +    e->wfd = fd;
>  }
>  
>  int event_notifier_init(EventNotifier *e, int active)
>  {
> +    int fds[2];
> +    int ret;
> +
>  #ifdef CONFIG_EVENTFD
> -    int fd = eventfd(!!active, EFD_NONBLOCK | EFD_CLOEXEC);
> -    if (fd < 0)
> -        return -errno;
> -    e->fd = fd;
> -    return 0;
> +    ret = eventfd(0, O_NONBLOCK);
>  #else
> -    return -ENOSYS;
> +    ret = -1;
> +    errno = ENOSYS;
>  #endif
> +    if (ret >= 0) {
> +        e->rfd = e->wfd = ret;
> +        qemu_set_cloexec(ret);

This is kind of redundant with EFD_CLOEXEC, no?

> +    } else {
> +        if (errno != ENOSYS) {
> +            return -errno;
> +        }
> +        if (qemu_pipe(fds) < 0) {
> +            return -errno;
> +        }
> +        ret = fcntl_setfl(fds[0], O_NONBLOCK);
> +        if (ret < 0) {
> +            goto fail;
> +        }
> +        ret = fcntl_setfl(fds[1], O_NONBLOCK);
> +        if (ret < 0) {
> +            goto fail;
> +        }
> +        e->rfd = fds[0];
> +        e->wfd = fds[1];
> +    }
> +    if (active)
> +        event_notifier_set(e);

Missing curly braces here.

The rest looks good.

Regards,

Anthony Liguori

> +    return 0;
> +
> +fail:
> +    close(fds[0]);
> +    close(fds[1]);
> +    return ret;
>  }
>  
>  void event_notifier_cleanup(EventNotifier *e)
>  {
> -    close(e->fd);
> +    if (e->rfd != e->wfd) {
> +        close(e->rfd);
> +    }
> +    close(e->wfd);
>  }
>  
>  int event_notifier_get_fd(EventNotifier *e)
>  {
> -    return e->fd;
> +    return e->rfd;
>  }
>  
>  int event_notifier_set_handler(EventNotifier *e,
>                                 EventNotifierHandler *handler)
>  {
> -    return qemu_set_fd_handler(e->fd, (IOHandler *)handler, NULL, e);
> +    return qemu_set_fd_handler(e->rfd, (IOHandler *)handler, NULL, e);
>  }
>  
>  int event_notifier_set(EventNotifier *e)
>  {
> -    uint64_t value = 1;
> -    int r = write(e->fd, &value, sizeof(value));
> -    return r == sizeof(value);
> +    static const uint64_t value = 1;
> +    ssize_t ret;
> +
> +    do {
> +        ret = write(e->wfd, &value, sizeof(value));
> +    } while (ret < 0 && errno == EINTR);
> +
> +    /* EAGAIN is fine, a read must be pending.  */
> +    if (ret < 0 && errno != EAGAIN) {
> +        return -1;
> +    }
> +    return 0;
>  }
>  
>  int event_notifier_test_and_clear(EventNotifier *e)
>  {
> -    uint64_t value;
> -    int r = read(e->fd, &value, sizeof(value));
> -    return r == sizeof(value);
> +    int value;
> +    ssize_t len;
> +    char buffer[512];
> +
> +    /* Drain the notify pipe.  For eventfd, only 8 bytes will be read.  */
> +    value = 0;
> +    do {
> +        len = read(e->rfd, buffer, sizeof(buffer));
> +        value |= (len > 0);
> +    } while ((len == -1 && errno == EINTR) || len == sizeof(buffer));
> +
> +    return value;
>  }
> diff --git a/event_notifier.h b/event_notifier.h
> index f0ec2f2..f04d12d 100644
> --- a/event_notifier.h
> +++ b/event_notifier.h
> @@ -16,7 +16,8 @@
>  #include "qemu-common.h"
>  
>  struct EventNotifier {
> -    int fd;
> +    int rfd;
> +    int wfd;
>  };
>  
>  typedef void EventNotifierHandler(EventNotifier *);
> -- 
> 1.7.10.4

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [Qemu-devel] [PATCH 03/12] main-loop: use event notifiers
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 03/12] main-loop: use event notifiers Paolo Bonzini
@ 2012-07-19 19:04   ` Anthony Liguori
  0 siblings, 0 replies; 28+ messages in thread
From: Anthony Liguori @ 2012-07-19 19:04 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

Paolo Bonzini <pbonzini@redhat.com> writes:

Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>

Regards,

Anthony Liguori

> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  Makefile.objs |    4 +--
>  main-loop.c   |  106 ++++++++-------------------------------------------------
>  oslib-posix.c |   31 -----------------
>  qemu-common.h |    1 -
>  4 files changed, 17 insertions(+), 125 deletions(-)
>
> diff --git a/Makefile.objs b/Makefile.objs
> index ecdfaf9..6ed1981 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -20,8 +20,8 @@ universal-obj-y += $(qom-obj-y)
>  #######################################################################
>  # oslib-obj-y is code depending on the OS (win32 vs posix)
>  oslib-obj-y = osdep.o
> -oslib-obj-$(CONFIG_WIN32) += oslib-win32.o qemu-thread-win32.o
> -oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o
> +oslib-obj-$(CONFIG_WIN32) += oslib-win32.o qemu-thread-win32.o event_notifier-win32.o
> +oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o event_notifier-posix.o
>  
>  #######################################################################
>  # coroutines
> diff --git a/main-loop.c b/main-loop.c
> index eb3b6e6..81f49b3 100644
> --- a/main-loop.c
> +++ b/main-loop.c
> @@ -26,75 +26,12 @@
>  #include "qemu-timer.h"
>  #include "slirp/slirp.h"
>  #include "main-loop.h"
> +#include "event_notifier.h"
>  
>  #ifndef _WIN32
>  
>  #include "compatfd.h"
>  
> -static int io_thread_fd = -1;
> -
> -void qemu_notify_event(void)
> -{
> -    /* Write 8 bytes to be compatible with eventfd.  */
> -    static const uint64_t val = 1;
> -    ssize_t ret;
> -
> -    if (io_thread_fd == -1) {
> -        return;
> -    }
> -    do {
> -        ret = write(io_thread_fd, &val, sizeof(val));
> -    } while (ret < 0 && errno == EINTR);
> -
> -    /* EAGAIN is fine, a read must be pending.  */
> -    if (ret < 0 && errno != EAGAIN) {
> -        fprintf(stderr, "qemu_notify_event: write() failed: %s\n",
> -                strerror(errno));
> -        exit(1);
> -    }
> -}
> -
> -static void qemu_event_read(void *opaque)
> -{
> -    int fd = (intptr_t)opaque;
> -    ssize_t len;
> -    char buffer[512];
> -
> -    /* Drain the notify pipe.  For eventfd, only 8 bytes will be read.  */
> -    do {
> -        len = read(fd, buffer, sizeof(buffer));
> -    } while ((len == -1 && errno == EINTR) || len == sizeof(buffer));
> -}
> -
> -static int qemu_event_init(void)
> -{
> -    int err;
> -    int fds[2];
> -
> -    err = qemu_eventfd(fds);
> -    if (err == -1) {
> -        return -errno;
> -    }
> -    err = fcntl_setfl(fds[0], O_NONBLOCK);
> -    if (err < 0) {
> -        goto fail;
> -    }
> -    err = fcntl_setfl(fds[1], O_NONBLOCK);
> -    if (err < 0) {
> -        goto fail;
> -    }
> -    qemu_set_fd_handler2(fds[0], NULL, qemu_event_read, NULL,
> -                         (void *)(intptr_t)fds[0]);
> -
> -    io_thread_fd = fds[1];
> -    return 0;
> -
> -fail:
> -    close(fds[0]);
> -    close(fds[1]);
> -    return err;
> -}
> -
>  /* If we have signalfd, we mask out the signals we want to handle and then
>   * use signalfd to listen for them.  We rely on whatever the current signal
>   * handler is to dispatch the signals when we receive them.
> @@ -164,40 +101,22 @@ static int qemu_signal_init(void)
>  
>  #else /* _WIN32 */
>  
> -static HANDLE qemu_event_handle = NULL;
> -
> -static void dummy_event_handler(void *opaque)
> -{
> -}
> -
> -static int qemu_event_init(void)
> +static int qemu_signal_init(void)
>  {
> -    qemu_event_handle = CreateEvent(NULL, FALSE, FALSE, NULL);
> -    if (!qemu_event_handle) {
> -        fprintf(stderr, "Failed CreateEvent: %ld\n", GetLastError());
> -        return -1;
> -    }
> -    qemu_add_wait_object(qemu_event_handle, dummy_event_handler, NULL);
>      return 0;
>  }
> +#endif
> +
> +static EventNotifier io_thread_notifier;
> +static int io_thread_initialized;
>  
>  void qemu_notify_event(void)
>  {
> -    if (!qemu_event_handle) {
> +    if (!io_thread_initialized) {
>          return;
>      }
> -    if (!SetEvent(qemu_event_handle)) {
> -        fprintf(stderr, "qemu_notify_event: SetEvent failed: %ld\n",
> -                GetLastError());
> -        exit(1);
> -    }
> -}
> -
> -static int qemu_signal_init(void)
> -{
> -    return 0;
> +    event_notifier_set(&io_thread_notifier);
>  }
> -#endif
>  
>  int main_loop_init(void)
>  {
> @@ -210,11 +129,15 @@ int main_loop_init(void)
>      }
>  
>      /* Note eventfd must be drained before signalfd handlers run */
> -    ret = qemu_event_init();
> +    ret = event_notifier_init(&io_thread_notifier, 0);
>      if (ret) {
>          return ret;
>      }
>  
> +    io_thread_initialized = true;
> +    event_notifier_set_handler(&io_thread_notifier,
> +                               (EventNotifierHandler *)
> +                               event_notifier_test_and_clear);
>      return 0;
>  }
>  
> @@ -400,7 +323,8 @@ void qemu_del_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque)
>  
>  void qemu_fd_register(int fd)
>  {
> -    WSAEventSelect(fd, qemu_event_handle, FD_READ | FD_ACCEPT | FD_CLOSE |
> +    WSAEventSelect(fd, event_notifier_get_handle(&io_thread_notifier),
> +                   FD_READ | FD_ACCEPT | FD_CLOSE |
>                     FD_CONNECT | FD_WRITE | FD_OOB);
>  }
>  
> diff --git a/oslib-posix.c b/oslib-posix.c
> index 6b7ba64..2c6b044 100644
> --- a/oslib-posix.c
> +++ b/oslib-posix.c
> @@ -58,9 +58,6 @@ static int running_on_valgrind = -1;
>  #ifdef CONFIG_LINUX
>  #include <sys/syscall.h>
>  #endif
> -#ifdef CONFIG_EVENTFD
> -#include <sys/eventfd.h>
> -#endif
>  
>  int qemu_get_thread_id(void)
>  {
> @@ -180,34 +177,6 @@ int qemu_pipe(int pipefd[2])
>      return ret;
>  }
>  
> -/*
> - * Creates an eventfd that looks like a pipe and has EFD_CLOEXEC set.
> - */
> -int qemu_eventfd(int fds[2])
> -{
> -#ifdef CONFIG_EVENTFD
> -    int ret;
> -
> -    ret = eventfd(0, 0);
> -    if (ret >= 0) {
> -        fds[0] = ret;
> -        fds[1] = dup(ret);
> -        if (fds[1] == -1) {
> -            close(ret);
> -            return -1;
> -        }
> -        qemu_set_cloexec(ret);
> -        qemu_set_cloexec(fds[1]);
> -        return 0;
> -    }
> -    if (errno != ENOSYS) {
> -        return -1;
> -    }
> -#endif
> -
> -    return qemu_pipe(fds);
> -}
> -
>  int qemu_utimens(const char *path, const struct timespec *times)
>  {
>      struct timeval tv[2], tv_now;
> diff --git a/qemu-common.h b/qemu-common.h
> index 9d9e603..036d254 100644
> --- a/qemu-common.h
> +++ b/qemu-common.h
> @@ -195,7 +195,6 @@ ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags)
>      QEMU_WARN_UNUSED_RESULT;
>  
>  #ifndef _WIN32
> -int qemu_eventfd(int pipefd[2]);
>  int qemu_pipe(int pipefd[2]);
>  #endif
>  
> -- 
> 1.7.10.4
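
For readers who want the removed helpers in one place: the code deleted
above wrote 8 bytes to a non-blocking descriptor to wake the main loop,
treated EAGAIN as "a wakeup is already pending", and drained the
descriptor in the read handler.  The sketch below reproduces that
behaviour on a plain pipe.  It is only an illustration: PipeNotifier and
the pipe_notifier_* names are hypothetical and are not the EventNotifier
API that this series uses.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>

/* Hypothetical pipe-backed notifier; not QEMU's EventNotifier. */
typedef struct PipeNotifier {
    int rfd;    /* read end, polled by the main loop */
    int wfd;    /* write end, used to post a wakeup */
} PipeNotifier;

static int set_nonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL);

    if (flags == -1) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

static int pipe_notifier_init(PipeNotifier *n)
{
    int fds[2];

    assert(n != NULL);
    if (pipe(fds) == -1) {
        return -errno;
    }
    if (set_nonblock(fds[0]) == -1 || set_nonblock(fds[1]) == -1) {
        int err = -errno;   /* save errno before close() can change it */
        close(fds[0]);
        close(fds[1]);
        return err;
    }
    n->rfd = fds[0];
    n->wfd = fds[1];
    return 0;
}

/* Post a wakeup.  EAGAIN means the pipe is full, so one is already pending. */
static int pipe_notifier_set(PipeNotifier *n)
{
    static const uint64_t val = 1;  /* 8 bytes, as the removed code wrote */
    ssize_t ret;

    do {
        ret = write(n->wfd, &val, sizeof(val));
    } while (ret < 0 && errno == EINTR);

    return (ret < 0 && errno != EAGAIN) ? -errno : 0;
}

/* Drain every pending wakeup; returns 1 if at least one was pending. */
static int pipe_notifier_test_and_clear(PipeNotifier *n)
{
    char buffer[512];
    ssize_t len;
    int seen = 0;

    do {
        len = read(n->rfd, buffer, sizeof(buffer));
        if (len > 0) {
            seen = 1;
        }
    } while ((len == -1 && errno == EINTR) || len == (ssize_t)sizeof(buffer));

    return seen;
}
```

Several posts before a drain collapse into a single observed wakeup,
which is all that a main-loop kick needs.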

* Re: [Qemu-devel] [PATCH 06/12] linux-aio: use event notifiers
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 06/12] linux-aio: use event notifiers Paolo Bonzini
@ 2012-07-19 19:10   ` Anthony Liguori
  0 siblings, 0 replies; 28+ messages in thread
From: Anthony Liguori @ 2012-07-19 19:10 UTC (permalink / raw)
  To: Paolo Bonzini, qemu-devel; +Cc: kwolf, aliguori, stefanha, sw

Paolo Bonzini <pbonzini@redhat.com> writes:

> Since linux-aio already uses an eventfd, converting it to use the
> EventNotifier-based API simplifies the code even though it is not
> meant to be portable.
>

Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>

Regards,

Anthony Liguori

> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  linux-aio.c |   49 +++++++++++++++++++------------------------------
>  1 file changed, 19 insertions(+), 30 deletions(-)
>
> diff --git a/linux-aio.c b/linux-aio.c
> index fa0fbf3..779f793 100644
> --- a/linux-aio.c
> +++ b/linux-aio.c
> @@ -10,8 +10,8 @@
>  #include "qemu-common.h"
>  #include "qemu-aio.h"
>  #include "block/raw-posix-aio.h"
> +#include "event_notifier.h"
>  
> -#include <sys/eventfd.h>
>  #include <libaio.h>
>  
>  /*
> @@ -37,7 +37,7 @@ struct qemu_laiocb {
>  
>  struct qemu_laio_state {
>      io_context_t ctx;
> -    int efd;
> +    EventNotifier e;
>      int count;
>  };
>  
> @@ -76,29 +76,17 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
>      qemu_aio_release(laiocb);
>  }
>  
> -static void qemu_laio_completion_cb(void *opaque)
> +static void qemu_laio_completion_cb(EventNotifier *e)
>  {
> -    struct qemu_laio_state *s = opaque;
> +    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
>  
> -    while (1) {
> +    while (event_notifier_test_and_clear(&s->e)) {
>          struct io_event events[MAX_EVENTS];
> -        uint64_t val;
> -        ssize_t ret;
>          struct timespec ts = { 0 };
>          int nevents, i;
>  
>          do {
> -            ret = read(s->efd, &val, sizeof(val));
> -        } while (ret == -1 && errno == EINTR);
> -
> -        if (ret == -1 && errno == EAGAIN)
> -            break;
> -
> -        if (ret != 8)
> -            break;
> -
> -        do {
> -            nevents = io_getevents(s->ctx, val, MAX_EVENTS, events, &ts);
> +            nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
>          } while (nevents == -EINTR);
>  
>          for (i = 0; i < nevents; i++) {
> @@ -112,9 +100,9 @@ static void qemu_laio_completion_cb(void *opaque)
>      }
>  }
>  
> -static int qemu_laio_flush_cb(void *opaque)
> +static int qemu_laio_flush_cb(EventNotifier *e)
>  {
> -    struct qemu_laio_state *s = opaque;
> +    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
>  
>      return (s->count > 0) ? 1 : 0;
>  }
> @@ -146,8 +134,9 @@ static void laio_cancel(BlockDriverAIOCB *blockacb)
>       * We might be able to do this slightly more optimal by removing the
>       * O_NONBLOCK flag.
>       */
> -    while (laiocb->ret == -EINPROGRESS)
> -        qemu_laio_completion_cb(laiocb->ctx);
> +    while (laiocb->ret == -EINPROGRESS) {
> +        qemu_laio_completion_cb(&laiocb->ctx->e);
> +    }
>  }
>  
>  static AIOPool laio_pool = {
> @@ -186,7 +175,7 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
>                          __func__, type);
>          goto out_free_aiocb;
>      }
> -    io_set_eventfd(&laiocb->iocb, s->efd);
> +    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
>      s->count++;
>  
>      if (io_submit(s->ctx, 1, &iocbs) < 0)
> @@ -205,21 +194,21 @@ void *laio_init(void)
>      struct qemu_laio_state *s;
>  
>      s = g_malloc0(sizeof(*s));
> -    s->efd = eventfd(0, 0);
> -    if (s->efd == -1)
> +    if (event_notifier_init(&s->e, false) < 0) {
>          goto out_free_state;
> -    fcntl(s->efd, F_SETFL, O_NONBLOCK);
> +    }
>  
> -    if (io_setup(MAX_EVENTS, &s->ctx) != 0)
> +    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
>          goto out_close_efd;
> +    }
>  
> -    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL,
> -        qemu_laio_flush_cb, s);
> +    qemu_aio_set_event_notifier(&s->e, qemu_laio_completion_cb,
> +                                qemu_laio_flush_cb);
>  
>      return s;
>  
>  out_close_efd:
> -    close(s->efd);
> +    event_notifier_cleanup(&s->e);
>  out_free_state:
>      g_free(s);
>      return NULL;
> -- 
> 1.7.10.4
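
A note on the callback change in this patch: the handlers now receive
the EventNotifier pointer rather than an opaque pointer, and recover the
enclosing state with container_of.  A minimal sketch of that pattern
follows.  The Notifier and LaioState types and the completion_cb name
are stand-ins made up for illustration; they are not QEMU's definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for QEMU's EventNotifier; illustration only. */
typedef struct Notifier {
    int pending;
} Notifier;

/* Hypothetical stand-in for struct qemu_laio_state. */
typedef struct LaioState {
    int count;
    Notifier e;     /* embedded by value, so its address locates the state */
} LaioState;

/* Recover a pointer to the enclosing struct from a pointer to a member. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Callback that is handed only the notifier, as in the converted code. */
static int completion_cb(Notifier *e)
{
    LaioState *s;

    assert(e != NULL);
    s = container_of(e, LaioState, e);
    e->pending = 0;         /* acknowledge the notification */
    return s->count;        /* state reached without an opaque pointer */
}
```

This only works because the notifier is stored inside the state struct;
a notifier held through a pointer could not be mapped back this way.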

* Re: [Qemu-devel] [PATCH 12/12] raw-win32: add emulated AIO support
  2012-07-16 10:42 ` [Qemu-devel] [PATCH 12/12] raw-win32: add emulated AIO support Paolo Bonzini
@ 2012-07-23 16:35   ` Blue Swirl
  2012-07-23 16:59     ` Paolo Bonzini
  0 siblings, 1 reply; 28+ messages in thread
From: Blue Swirl @ 2012-07-23 16:35 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: kwolf, aliguori, sw, qemu-devel, stefanha

On Mon, Jul 16, 2012 at 10:42 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> The thread pool can be used under Win32 in the same way as in raw-posix.c.
> Move the existing synchronous code into callbacks, and pass the return
> code back.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/raw-win32.c |  189 +++++++++++++++++++++++++++++++++++++++--------------
>  1 file changed, 140 insertions(+), 49 deletions(-)
>
> diff --git a/block/raw-win32.c b/block/raw-win32.c
> index e4b0b75..a50d636 100644
> --- a/block/raw-win32.c
> +++ b/block/raw-win32.c
> @@ -25,6 +25,9 @@
>  #include "qemu-timer.h"
>  #include "block_int.h"
>  #include "module.h"
> +#include "raw-aio.h"
> +#include "trace.h"
> +#include "thread-pool.h"
>  #include <windows.h>
>  #include <winioctl.h>
>
> @@ -32,12 +35,130 @@
>  #define FTYPE_CD     1
>  #define FTYPE_HARDDISK 2
>
> +struct qemu_paiocb {

QEMUPAIOCB

> +    BlockDriverState *bs;
> +    HANDLE hfile;
> +    struct iovec *aio_iov;
> +    int aio_niov;
> +    size_t aio_nbytes;
> +    off_t aio_offset;
> +    int aio_type;
> +};
> +
>  typedef struct BDRVRawState {
>      HANDLE hfile;
>      int type;
>      char drive_path[16]; /* format: "d:\" */
>  } BDRVRawState;
>
> +/*
> + * Read/writes the data to/from a given linear buffer.
> + *
> + * Returns the number of bytes handles or -errno in case of an error. Short
> + * reads are only returned if the end of the file is reached.
> + */
> +static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> +{
> +    size_t offset = 0;
> +    int i;
> +
> +    for (i = 0; i < aiocb->aio_niov; i++) {
> +        OVERLAPPED ov;
> +        DWORD ret, ret_count, len;
> +
> +        memset(&ov, 0, sizeof(ov));
> +        ov.Offset = (aiocb->aio_offset + offset);
> +        ov.OffsetHigh = (aiocb->aio_offset + offset) >> 32;
> +        len = aiocb->aio_iov[i].iov_len;
> +        if (aiocb->aio_type & QEMU_AIO_WRITE) {
> +            ret = WriteFile(aiocb->hfile, aiocb->aio_iov[i].iov_base,
> +                            len, &ret_count, &ov);
> +        } else {
> +            ret = ReadFile(aiocb->hfile, aiocb->aio_iov[i].iov_base,
> +                           len, &ret_count, &ov);
> +        }
> +        if (!ret) {
> +            ret_count = 0;
> +        }
> +        if (ret_count != len) {
> +            break;
> +        }
> +        offset += len;
> +    }
> +
> +    return offset;
> +}
> +
> +static int aio_worker(void *arg)
> +{
> +    struct qemu_paiocb *aiocb = arg;
> +    ssize_t ret = 0;
> +    size_t count;
> +
> +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> +    case QEMU_AIO_READ:
> +        count = handle_aiocb_rw(aiocb);
> +        if (count < aiocb->aio_nbytes && aiocb->bs->growable) {
> +            /* A short read means that we have reached EOF. Pad the buffer
> +             * with zeros for bytes after EOF. */
> +            QEMUIOVector qiov;
> +
> +            qemu_iovec_init_external(&qiov, aiocb->aio_iov,
> +                                     aiocb->aio_niov);
> +            qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - count, count);
> +
> +            count = aiocb->aio_nbytes;
> +        }
> +        if (count == aiocb->aio_nbytes) {
> +            ret = 0;
> +        } else {
> +            ret = -EINVAL;
> +        }
> +        break;
> +    case QEMU_AIO_WRITE:
> +        count = handle_aiocb_rw(aiocb);
> +        if (count == aiocb->aio_nbytes) {
> +            count = 0;
> +        } else {
> +            count = -EINVAL;
> +        }
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        if (!FlushFileBuffers(aiocb->hfile)) {
> +            return -EIO;
> +        }
> +        break;
> +    default:
> +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);

Assert instead?

> +        ret = -EINVAL;
> +        break;
> +    }
> +
> +    g_slice_free(struct qemu_paiocb, aiocb);
> +    return ret;
> +}
> +
> +static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque, int type)
> +{
> +    struct qemu_paiocb *acb = g_slice_new(struct qemu_paiocb);
> +
> +    acb->bs = bs;
> +    acb->hfile = hfile;
> +    acb->aio_type = type;
> +
> +    if (qiov) {
> +        acb->aio_iov = qiov->iov;
> +        acb->aio_niov = qiov->niov;
> +    }
> +    acb->aio_nbytes = nb_sectors * 512;
> +    acb->aio_offset = sector_num * 512;
> +
> +    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
> +    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
> +}
> +
>  int qemu_ftruncate64(int fd, int64_t length)
>  {
>      LARGE_INTEGER li;
> @@ -109,59 +230,29 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
>      return 0;
>  }
>
> -static int raw_read(BlockDriverState *bs, int64_t sector_num,
> -                    uint8_t *buf, int nb_sectors)
> +static BlockDriverAIOCB *raw_aio_readv(BlockDriverState *bs,
> +                         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +                         BlockDriverCompletionFunc *cb, void *opaque)
>  {
>      BDRVRawState *s = bs->opaque;
> -    OVERLAPPED ov;
> -    DWORD ret_count;
> -    int ret;
> -    int64_t offset = sector_num * 512;
> -    int count = nb_sectors * 512;
> -
> -    memset(&ov, 0, sizeof(ov));
> -    ov.Offset = offset;
> -    ov.OffsetHigh = offset >> 32;
> -    ret = ReadFile(s->hfile, buf, count, &ret_count, &ov);
> -    if (!ret)
> -        return ret_count;
> -    if (ret_count == count)
> -        ret_count = 0;
> -    return ret_count;
> +    return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
> +                       cb, opaque, QEMU_AIO_READ);
>  }
>
> -static int raw_write(BlockDriverState *bs, int64_t sector_num,
> -                     const uint8_t *buf, int nb_sectors)
> +static BlockDriverAIOCB *raw_aio_writev(BlockDriverState *bs,
> +                          int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +                          BlockDriverCompletionFunc *cb, void *opaque)
>  {
>      BDRVRawState *s = bs->opaque;
> -    OVERLAPPED ov;
> -    DWORD ret_count;
> -    int ret;
> -    int64_t offset = sector_num * 512;
> -    int count = nb_sectors * 512;
> -
> -    memset(&ov, 0, sizeof(ov));
> -    ov.Offset = offset;
> -    ov.OffsetHigh = offset >> 32;
> -    ret = WriteFile(s->hfile, buf, count, &ret_count, &ov);
> -    if (!ret)
> -        return ret_count;
> -    if (ret_count == count)
> -        ret_count = 0;
> -    return ret_count;
> +    return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
> +                       cb, opaque, QEMU_AIO_WRITE);
>  }
>
> -static int raw_flush(BlockDriverState *bs)
> +static BlockDriverAIOCB *raw_aio_flush(BlockDriverState *bs,
> +                         BlockDriverCompletionFunc *cb, void *opaque)
>  {
>      BDRVRawState *s = bs->opaque;
> -    int ret;
> -
> -    ret = FlushFileBuffers(s->hfile);
> -    if (ret == 0) {
> -        return -EIO;
> -    }
> -
> -    return 0;
> +    return paio_submit(bs, s->hfile, 0, NULL, 0, cb, opaque, QEMU_AIO_FLUSH);
>  }
>
>  static void raw_close(BlockDriverState *bs)
> @@ -282,9 +373,9 @@ static BlockDriver bdrv_file = {
>      .bdrv_close                = raw_close,
>      .bdrv_create       = raw_create,
>
> -    .bdrv_read              = raw_read,
> -    .bdrv_write             = raw_write,
> -    .bdrv_co_flush_to_disk  = raw_flush,
> +    .bdrv_aio_readv     = raw_aio_readv,
> +    .bdrv_aio_writev    = raw_aio_writev,
> +    .bdrv_aio_flush     = raw_aio_flush,
>
>      .bdrv_truncate     = raw_truncate,
>      .bdrv_getlength    = raw_getlength,
> @@ -413,9 +504,9 @@ static BlockDriver bdrv_host_device = {
>      .bdrv_close                = raw_close,
>      .bdrv_has_zero_init = hdev_has_zero_init,
>
> -    .bdrv_read              = raw_read,
> -    .bdrv_write             = raw_write,
> -    .bdrv_co_flush_to_disk  = raw_flush,
> +    .bdrv_aio_readv     = raw_aio_readv,
> +    .bdrv_aio_writev    = raw_aio_writev,
> +    .bdrv_aio_flush     = raw_aio_flush,
>
>      .bdrv_getlength    = raw_getlength,
>      .bdrv_get_allocated_file_size
> --
> 1.7.10.4
>
>

* Re: [Qemu-devel] [PATCH 12/12] raw-win32: add emulated AIO support
  2012-07-23 16:35   ` Blue Swirl
@ 2012-07-23 16:59     ` Paolo Bonzini
  0 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-23 16:59 UTC (permalink / raw)
  To: Blue Swirl; +Cc: kwolf, aliguori, qemu-devel, stefanha, sw

On 23/07/2012 18:35, Blue Swirl wrote:
>> > +struct qemu_paiocb {
> QEMUPAIOCB

RawWin32AIOData. :)

>> > +    BlockDriverState *bs;
>> > +    HANDLE hfile;
>> > +    struct iovec *aio_iov;
>> > +    int aio_niov;
>> > +    size_t aio_nbytes;
>> > +    off_t aio_offset;
>> > +    int aio_type;
>> > +};
>> > +
>> >  typedef struct BDRVRawState {
>> >      HANDLE hfile;
>> >      int type;
>> >      char drive_path[16]; /* format: "d:\" */
>> >  } BDRVRawState;
>> >
>> > +/*
>> > + * Read/writes the data to/from a given linear buffer.
>> > + *
>> > + * Returns the number of bytes handles or -errno in case of an error. Short
>> > + * reads are only returned if the end of the file is reached.
>> > + */
>> > +static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>> > +{
>> > +    size_t offset = 0;
>> > +    int i;
>> > +
>> > +    for (i = 0; i < aiocb->aio_niov; i++) {
>> > +        OVERLAPPED ov;
>> > +        DWORD ret, ret_count, len;
>> > +
>> > +        memset(&ov, 0, sizeof(ov));
>> > +        ov.Offset = (aiocb->aio_offset + offset);
>> > +        ov.OffsetHigh = (aiocb->aio_offset + offset) >> 32;
>> > +        len = aiocb->aio_iov[i].iov_len;
>> > +        if (aiocb->aio_type & QEMU_AIO_WRITE) {
>> > +            ret = WriteFile(aiocb->hfile, aiocb->aio_iov[i].iov_base,
>> > +                            len, &ret_count, &ov);
>> > +        } else {
>> > +            ret = ReadFile(aiocb->hfile, aiocb->aio_iov[i].iov_base,
>> > +                           len, &ret_count, &ov);
>> > +        }
>> > +        if (!ret) {
>> > +            ret_count = 0;
>> > +        }
>> > +        if (ret_count != len) {
>> > +            break;
>> > +        }
>> > +        offset += len;
>> > +    }
>> > +
>> > +    return offset;
>> > +}
>> > +
>> > +static int aio_worker(void *arg)
>> > +{
>> > +    struct qemu_paiocb *aiocb = arg;
>> > +    ssize_t ret = 0;
>> > +    size_t count;
>> > +
>> > +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
>> > +    case QEMU_AIO_READ:
>> > +        count = handle_aiocb_rw(aiocb);
>> > +        if (count < aiocb->aio_nbytes && aiocb->bs->growable) {
>> > +            /* A short read means that we have reached EOF. Pad the buffer
>> > +             * with zeros for bytes after EOF. */
>> > +            QEMUIOVector qiov;
>> > +
>> > +            qemu_iovec_init_external(&qiov, aiocb->aio_iov,
>> > +                                     aiocb->aio_niov);
>> > +            qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - count, count);
>> > +
>> > +            count = aiocb->aio_nbytes;
>> > +        }
>> > +        if (count == aiocb->aio_nbytes) {
>> > +            ret = 0;
>> > +        } else {
>> > +            ret = -EINVAL;
>> > +        }
>> > +        break;
>> > +    case QEMU_AIO_WRITE:
>> > +        count = handle_aiocb_rw(aiocb);
>> > +        if (count == aiocb->aio_nbytes) {
>> > +            count = 0;
>> > +        } else {
>> > +            count = -EINVAL;
>> > +        }
>> > +        break;
>> > +    case QEMU_AIO_FLUSH:
>> > +        if (!FlushFileBuffers(aiocb->hfile)) {
>> > +            return -EIO;
>> > +        }
>> > +        break;
>> > +    default:
>> > +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> Assert instead?

Yeah, this is cut-and-pasted from posix-aio-compat.c, I'll fix both.

Paolo
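
For illustration, here is roughly what the "assert instead" suggestion
could look like in a dispatch function of this shape.  This is a sketch
under assumptions, not the follow-up patch: the REQ_* constants and
dispatch_request are hypothetical names, and the real request types are
the QEMU_AIO_* values used in the quoted code.

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical request types standing in for the QEMU_AIO_* constants. */
enum {
    REQ_READ      = 0x1,
    REQ_WRITE     = 0x2,
    REQ_FLUSH     = 0x4,
    REQ_TYPE_MASK = REQ_READ | REQ_WRITE | REQ_FLUSH
};

/*
 * Dispatch on the request type.  io_ok stands in for the outcome of the
 * actual I/O.  An unknown type can only come from a bug in the caller,
 * so it trips an assertion instead of printing a message.
 */
static int dispatch_request(int type, int io_ok)
{
    int ret = 0;

    switch (type & REQ_TYPE_MASK) {
    case REQ_READ:
    case REQ_WRITE:
        ret = io_ok ? 0 : -EINVAL;
        break;
    case REQ_FLUSH:
        ret = io_ok ? 0 : -EIO;
        break;
    default:
        /* Aborts unless NDEBUG is defined; -EINVAL is the fallback then. */
        assert(0 && "invalid aio request");
        ret = -EINVAL;
        break;
    }

    return ret;
}
```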

* Re: [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore
  2012-07-16 14:20                 ` Paolo Bonzini
@ 2012-07-24 16:55                   ` Paolo Bonzini
  0 siblings, 0 replies; 28+ messages in thread
From: Paolo Bonzini @ 2012-07-24 16:55 UTC (permalink / raw)
  Cc: kwolf@redhat.com, stefanha@linux.vnet.ibm.com, Jan Kiszka,
	qemu-devel@nongnu.org, aliguori@linux.vnet.ibm.com,
	sw@weilnetz.de

On 16/07/2012 16:20, Paolo Bonzini wrote:
>> > ...and that's why you check what needs to be done to handle this race
>> > after grabbing the mutex. IOW, replicate the state information that the
>> > Windows semaphore contains into the emulated condition variable object.
> It is already there (cv->waiters), but it is accessed atomically.  To do
> what you suggest I would need to add a mutex.

FWIW, I found a good condvar implementation in Chromium, but I really
don't have the time to port it over to QEMU right now.  I still would
like to get the semaphore version in 1.2.

Also, the attached pseudo-patch is an example of using semaphores to
limit the size of the critical sections, and also decrease the number of
threads created.  I'm not proposing to include it now, it's just an
example of things that are harder with condition variables than with
semaphores.

Paolo

[-- Attachment #2: thread-pool-sem.patch --]
[-- Type: text/x-patch, Size: 1400 bytes --]

diff --git a/thread-pool.c b/thread-pool.c
index 7895544..72be971 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -71,20 +72,16 @@ static void *worker_thread(void *unused)
         ThreadPoolElement *req;
         int ret;
 
-        qemu_mutex_lock(&lock);
-        idle_threads++;
-        qemu_mutex_unlock(&lock);
-        ret = qemu_sem_timedwait(&sem, 10000);
-        qemu_mutex_lock(&lock);
-        idle_threads--;
+        atomic_inc(&idle_threads);
+        do {
+            ret = qemu_sem_timedwait(&sem, 10000);
+        } while (ret == -1 && atomic_read(&QTAILQ_FIRST(&request_list)) != NULL);
+        atomic_dec(&idle_threads);
         if (ret == -1) {
-            if (QTAILQ_EMPTY(&request_list)) {
-                break;
-            }
-            qemu_mutex_unlock(&lock);
-            continue;
+            break;
         }
 
+        qemu_mutex_lock(&lock);
         req = QTAILQ_FIRST(&request_list);
         QTAILQ_REMOVE(&request_list, req, reqs);
         req->state = THREAD_ACTIVE;
@@ -226,7 +223,7 @@ BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
     trace_thread_pool_submit(req, arg);
 
     qemu_mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads) {
+    if (atomic_read(&idle_threads) == 0 && cur_threads < max_threads) {
         spawn_thread();
     }
     QTAILQ_INSERT_TAIL(&request_list, req, reqs);

Thread overview: 28+ messages
2012-07-16 10:42 [Qemu-devel] [PATCH 00/12] Portable thread-pool/AIO, Win32 emulated AIO Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 01/12] event_notifier: enable it to use pipes Paolo Bonzini
2012-07-19 18:58   ` Anthony Liguori
2012-07-16 10:42 ` [Qemu-devel] [PATCH 02/12] event_notifier: add Win32 implementation Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 03/12] main-loop: use event notifiers Paolo Bonzini
2012-07-19 19:04   ` Anthony Liguori
2012-07-16 10:42 ` [Qemu-devel] [PATCH 04/12] aio: provide platform-independent API Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 05/12] aio: add Win32 implementation Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 06/12] linux-aio: use event notifiers Paolo Bonzini
2012-07-19 19:10   ` Anthony Liguori
2012-07-16 10:42 ` [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore Paolo Bonzini
2012-07-16 12:00   ` Jan Kiszka
2012-07-16 12:01     ` [Qemu-devel] [PATCH] qemu-thread: Introduce qemu_cond_timedwait for POSIX Jan Kiszka
2012-07-16 13:20     ` [Qemu-devel] [PATCH 07/12] qemu-thread: add QemuSemaphore Paolo Bonzini
2012-07-16 13:34       ` Jan Kiszka
2012-07-16 13:35         ` Paolo Bonzini
2012-07-16 13:53           ` Jan Kiszka
2012-07-16 14:03             ` Paolo Bonzini
2012-07-16 14:09               ` Jan Kiszka
2012-07-16 14:20                 ` Paolo Bonzini
2012-07-24 16:55                   ` Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 08/12] aio: add generic thread-pool facility Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 09/12] block: switch posix-aio-compat to threadpool Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 10/12] raw: merge posix-aio-compat.c into block/raw-posix.c Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 11/12] raw-posix: rename raw-posix-aio.h, hide unavailable prototypes Paolo Bonzini
2012-07-16 10:42 ` [Qemu-devel] [PATCH 12/12] raw-win32: add emulated AIO support Paolo Bonzini
2012-07-23 16:35   ` Blue Swirl
2012-07-23 16:59     ` Paolo Bonzini
