* [Qemu-devel] [PATCH 00/25] AioContext & threadpool
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
Since half of the patches are shared between the two series, here
are both of them together. Under Wine I see a performance regression
due to AIO, but I wouldn't be surprised if it is an emulation artifact
(especially since Wine converts attempts to use native AIO into
synchronous I/O plus the overhead of signaling). If testing gives the same
results on native Windows, the change can be reverted later.
Anthony, please let me know if you want to apply this before or after
the rename.
Paolo
Paolo Bonzini (25):
event_notifier: add Win32 implementation
event_notifier: enable it to use pipes
aio: change qemu_aio_set_fd_handler to return void
aio: provide platform-independent API
aio: introduce AioContext, move bottom halves there
aio: add I/O handlers to the AioContext interface
aio: test node->deleted before calling io_flush
aio: add non-blocking variant of aio_wait
aio: prepare for introducing GSource-based dispatch
aio: add Win32 implementation
aio: make AioContexts GSources
aio: add aio_notify
aio: call aio_notify after setting I/O handlers
main-loop: use GSource to poll AIO file descriptors
main-loop: use aio_notify for qemu_notify_event
aio: clean up now-unused functions
linux-aio: use event notifiers
qemu-thread: add QemuSemaphore
aio: add generic thread-pool facility
block: switch posix-aio-compat to threadpool
raw: merge posix-aio-compat.c into block/raw-posix.c
raw-posix: rename raw-posix-aio.h, hide unavailable prototypes
raw-win32: add emulated AIO support
raw-posix: move linux-aio.c to block/
raw-win32: implement native asynchronous I/O
Makefile.objs | 10 +-
aio.c => aio-posix.c | 172 +++++---
aio.c => aio-win32.c | 197 +++++----
async.c | 118 ++++-
block/Makefile.objs | 9 +-
linux-aio.c => block/linux-aio.c | 51 +--
block/{raw-posix-aio.h => raw-aio.h} | 29 +-
block/raw-posix.c | 301 ++++++++++++-
block/raw-win32.c | 221 +++++++---
block/win32-aio.c | 226 ++++++++++
event_notifier-posix.c | 120 +++++
event_notifier.c => event_notifier-win32.c | 48 +-
event_notifier.h | 20 +-
hw/hw.h | 1 +
iohandler.c | 1 +
main-loop.c | 160 +++----
main-loop.h | 56 +--
oslib-posix.c | 31 --
posix-aio-compat.c | 679 -----------------------------
qemu-aio.h | 206 ++++++++-
qemu-char.h | 1 +
qemu-common.h | 2 +-
qemu-coroutine-lock.c | 2 +-
qemu-os-win32.h | 1 -
qemu-thread-posix.c | 74 ++++
qemu-thread-posix.h | 5 +
qemu-thread-win32.c | 35 ++
qemu-thread-win32.h | 4 +
qemu-thread.h | 7 +
thread-pool.c | 279 ++++++++++++
thread-pool.h | 34 ++
trace-events | 5 +
32 files changed, 1926 insertions(+), 1179 deletions(-)
copy aio.c => aio-posix.c (44%)
rename aio.c => aio-win32.c (42%)
rename linux-aio.c => block/linux-aio.c (82%)
rename block/{raw-posix-aio.h => raw-aio.h} (71%)
create mode 100644 block/win32-aio.c
create mode 100644 event_notifier-posix.c
rename event_notifier.c => event_notifier-win32.c (49%)
delete mode 100644 posix-aio-compat.c
create mode 100644 thread-pool.c
create mode 100644 thread-pool.h
--
1.7.12.1
* [Qemu-devel] [PATCH 01/25] event_notifier: add Win32 implementation
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
Makefile.objs | 4 ++-
event_notifier.c => event_notifier-posix.c | 0
event_notifier.c => event_notifier-win32.c | 48 +++++++++++++-----------------
event_notifier.h | 17 +++++++++--
qemu-os-win32.h | 1 -
5 files changed, 38 insertions(+), 32 deletions(-)
copy event_notifier.c => event_notifier-posix.c (100%)
rename event_notifier.c => event_notifier-win32.c (49%)
diff --git a/Makefile.objs b/Makefile.objs
index 74b3542..a0a3543 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -93,7 +93,9 @@ common-obj-y += bt-host.o bt-vhci.o
common-obj-y += dma-helpers.o
common-obj-y += iov.o acl.o
common-obj-$(CONFIG_POSIX) += compatfd.o
-common-obj-y += notify.o event_notifier.o
+common-obj-y += notify.o
+common-obj-$(CONFIG_POSIX) += event_notifier-posix.o
+common-obj-$(CONFIG_WIN32) += event_notifier-win32.o
common-obj-y += qemu-timer.o qemu-timer-common.o
common-obj-y += qtest.o
common-obj-y += vl.o
diff --git a/event_notifier.c b/event_notifier-posix.c
similarity index 100%
copy from event_notifier.c
copy to event_notifier-posix.c
diff --git a/event_notifier.c b/event_notifier-win32.c
similarity index 49%
rename from event_notifier.c
rename to event_notifier-win32.c
index 2c207e1..c723dad 100644
--- a/event_notifier.c
+++ b/event_notifier-win32.c
@@ -12,56 +12,48 @@
#include "qemu-common.h"
#include "event_notifier.h"
-#include "qemu-char.h"
-
-#ifdef CONFIG_EVENTFD
-#include <sys/eventfd.h>
-#endif
-
-void event_notifier_init_fd(EventNotifier *e, int fd)
-{
- e->fd = fd;
-}
+#include "main-loop.h"
int event_notifier_init(EventNotifier *e, int active)
{
-#ifdef CONFIG_EVENTFD
- int fd = eventfd(!!active, EFD_NONBLOCK | EFD_CLOEXEC);
- if (fd < 0)
- return -errno;
- e->fd = fd;
+ e->event = CreateEvent(NULL, FALSE, FALSE, NULL);
+ assert(e->event);
return 0;
-#else
- return -ENOSYS;
-#endif
}
void event_notifier_cleanup(EventNotifier *e)
{
- close(e->fd);
+ CloseHandle(e->event);
}
-int event_notifier_get_fd(EventNotifier *e)
+HANDLE event_notifier_get_handle(EventNotifier *e)
{
- return e->fd;
+ return e->event;
}
int event_notifier_set_handler(EventNotifier *e,
EventNotifierHandler *handler)
{
- return qemu_set_fd_handler(e->fd, (IOHandler *)handler, NULL, e);
+ if (handler) {
+ return qemu_add_wait_object(e->event, (IOHandler *)handler, e);
+ } else {
+ qemu_del_wait_object(e->event, (IOHandler *)handler, e);
+ return 0;
+ }
}
int event_notifier_set(EventNotifier *e)
{
- uint64_t value = 1;
- int r = write(e->fd, &value, sizeof(value));
- return r == sizeof(value);
+ SetEvent(e->event);
+ return 0;
}
int event_notifier_test_and_clear(EventNotifier *e)
{
- uint64_t value;
- int r = read(e->fd, &value, sizeof(value));
- return r == sizeof(value);
+ int ret = WaitForSingleObject(e->event, 0);
+ if (ret == WAIT_OBJECT_0) {
+ ResetEvent(e->event);
+ return true;
+ }
+ return false;
}
diff --git a/event_notifier.h b/event_notifier.h
index f0ec2f2..b283a49 100644
--- a/event_notifier.h
+++ b/event_notifier.h
@@ -15,18 +15,31 @@
#include "qemu-common.h"
+#ifdef _WIN32
+#include <windows.h>
+#endif
+
struct EventNotifier {
+#ifdef _WIN32
+ HANDLE event;
+#else
int fd;
+#endif
};
typedef void EventNotifierHandler(EventNotifier *);
-void event_notifier_init_fd(EventNotifier *, int fd);
int event_notifier_init(EventNotifier *, int active);
void event_notifier_cleanup(EventNotifier *);
-int event_notifier_get_fd(EventNotifier *);
int event_notifier_set(EventNotifier *);
int event_notifier_test_and_clear(EventNotifier *);
int event_notifier_set_handler(EventNotifier *, EventNotifierHandler *);
+#ifdef CONFIG_POSIX
+void event_notifier_init_fd(EventNotifier *, int fd);
+int event_notifier_get_fd(EventNotifier *);
+#else
+HANDLE event_notifier_get_handle(EventNotifier *);
+#endif
+
#endif
diff --git a/qemu-os-win32.h b/qemu-os-win32.h
index 8ba466d..d0e9234 100644
--- a/qemu-os-win32.h
+++ b/qemu-os-win32.h
@@ -28,7 +28,6 @@
#include <windows.h>
#include <winsock2.h>
-#include "main-loop.h"
/* Workaround for older versions of MinGW. */
#ifndef ECONNREFUSED
--
1.7.12.1
* [Qemu-devel] [PATCH 02/25] event_notifier: enable it to use pipes
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
This takes the eventfd emulation code from the main loop. Once the
EventNotifier is used for the main loop too, we will need this
compatibility code there as well.
Without CONFIG_EVENTFD, event_notifier_get_fd is only usable for the
"read" side of the notifier, for example to set a select() handler.
The return value of event_notifier_set changes to the cleaner 0/-errno.
No caller is actually checking the return value.
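As a minimal usage sketch (not part of the patch), the API behaves the
same whether the notifier is backed by an eventfd or by the pipe fallback:

    EventNotifier e;

    if (event_notifier_init(&e, 0) < 0) {
        abort();                              /* no fds left, etc. */
    }
    event_notifier_set(&e);                   /* signal the write side */
    if (event_notifier_test_and_clear(&e)) {
        /* the notifier was signaled and is now drained */
    }
    event_notifier_cleanup(&e);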
Reviewed-by: Anthony Liguori <anthony@codemonkey.ws>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
event_notifier-posix.c | 85 ++++++++++++++++++++++++++++++++++++++++----------
event_notifier.h | 3 +-
2 files changed, 71 insertions(+), 17 deletions(-)
diff --git a/event_notifier-posix.c b/event_notifier-posix.c
index 2c207e1..6f3239a 100644
--- a/event_notifier-posix.c
+++ b/event_notifier-posix.c
@@ -20,48 +20,101 @@
void event_notifier_init_fd(EventNotifier *e, int fd)
{
- e->fd = fd;
+ e->rfd = fd;
+ e->wfd = fd;
}
int event_notifier_init(EventNotifier *e, int active)
{
+ int fds[2];
+ int ret;
+
#ifdef CONFIG_EVENTFD
- int fd = eventfd(!!active, EFD_NONBLOCK | EFD_CLOEXEC);
- if (fd < 0)
- return -errno;
- e->fd = fd;
- return 0;
+ ret = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
#else
- return -ENOSYS;
+ ret = -1;
+ errno = ENOSYS;
#endif
+ if (ret >= 0) {
+ e->rfd = e->wfd = ret;
+ } else {
+ if (errno != ENOSYS) {
+ return -errno;
+ }
+ if (qemu_pipe(fds) < 0) {
+ return -errno;
+ }
+ ret = fcntl_setfl(fds[0], O_NONBLOCK);
+ if (ret < 0) {
+ ret = -errno;
+ goto fail;
+ }
+ ret = fcntl_setfl(fds[1], O_NONBLOCK);
+ if (ret < 0) {
+ ret = -errno;
+ goto fail;
+ }
+ e->rfd = fds[0];
+ e->wfd = fds[1];
+ }
+ if (active) {
+ event_notifier_set(e);
+ }
+ return 0;
+
+fail:
+ close(fds[0]);
+ close(fds[1]);
+ return ret;
}
void event_notifier_cleanup(EventNotifier *e)
{
- close(e->fd);
+ if (e->rfd != e->wfd) {
+ close(e->rfd);
+ }
+ close(e->wfd);
}
int event_notifier_get_fd(EventNotifier *e)
{
- return e->fd;
+ return e->rfd;
}
int event_notifier_set_handler(EventNotifier *e,
EventNotifierHandler *handler)
{
- return qemu_set_fd_handler(e->fd, (IOHandler *)handler, NULL, e);
+ return qemu_set_fd_handler(e->rfd, (IOHandler *)handler, NULL, e);
}
int event_notifier_set(EventNotifier *e)
{
- uint64_t value = 1;
- int r = write(e->fd, &value, sizeof(value));
- return r == sizeof(value);
+ static const uint64_t value = 1;
+ ssize_t ret;
+
+ do {
+ ret = write(e->wfd, &value, sizeof(value));
+ } while (ret < 0 && errno == EINTR);
+
+ /* EAGAIN is fine, a read must be pending. */
+ if (ret < 0 && errno != EAGAIN) {
+ return -errno;
+ }
+ return 0;
}
int event_notifier_test_and_clear(EventNotifier *e)
{
- uint64_t value;
- int r = read(e->fd, &value, sizeof(value));
- return r == sizeof(value);
+ int value;
+ ssize_t len;
+ char buffer[512];
+
+ /* Drain the notify pipe. For eventfd, only 8 bytes will be read. */
+ value = 0;
+ do {
+ len = read(e->rfd, buffer, sizeof(buffer));
+ value |= (len > 0);
+ } while ((len == -1 && errno == EINTR) || len == sizeof(buffer));
+
+ return value;
}
diff --git a/event_notifier.h b/event_notifier.h
index b283a49..88b57af 100644
--- a/event_notifier.h
+++ b/event_notifier.h
@@ -23,7 +23,8 @@ struct EventNotifier {
#ifdef _WIN32
HANDLE event;
#else
- int fd;
+ int rfd;
+ int wfd;
#endif
};
--
1.7.12.1
* [Qemu-devel] [PATCH 03/25] aio: change qemu_aio_set_fd_handler to return void
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
aio.c | 12 +++++-------
qemu-aio.h | 10 +++++-----
2 files changed, 10 insertions(+), 12 deletions(-)
diff --git a/aio.c b/aio.c
index c738a4e..e062aab 100644
--- a/aio.c
+++ b/aio.c
@@ -53,11 +53,11 @@ static AioHandler *find_aio_handler(int fd)
return NULL;
}
-int qemu_aio_set_fd_handler(int fd,
- IOHandler *io_read,
- IOHandler *io_write,
- AioFlushHandler *io_flush,
- void *opaque)
+void qemu_aio_set_fd_handler(int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ AioFlushHandler *io_flush,
+ void *opaque)
{
AioHandler *node;
@@ -93,8 +93,6 @@ int qemu_aio_set_fd_handler(int fd,
}
qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque);
-
- return 0;
}
void qemu_aio_flush(void)
diff --git a/qemu-aio.h b/qemu-aio.h
index bfdd35f..27a7e21 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -60,10 +60,10 @@ bool qemu_aio_wait(void);
* Code that invokes AIO completion functions should rely on this function
* instead of qemu_set_fd_handler[2].
*/
-int qemu_aio_set_fd_handler(int fd,
- IOHandler *io_read,
- IOHandler *io_write,
- AioFlushHandler *io_flush,
- void *opaque);
+void qemu_aio_set_fd_handler(int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ AioFlushHandler *io_flush,
+ void *opaque);
#endif
--
1.7.12.1
* [Qemu-devel] [PATCH 04/25] aio: provide platform-independent API
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
This adds to aio.c a platform-independent API based on EventNotifiers,
which can be used by both POSIX and Win32.
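A hypothetical driver would use it roughly as follows; the handler names
and the in_flight counter are made up for illustration:

    static int in_flight;

    static void my_completion_cb(EventNotifier *e)
    {
        event_notifier_test_and_clear(e);
        /* ... complete finished requests, decrementing in_flight ... */
    }

    /* Returns 1 while requests are outstanding, 0 otherwise. */
    static int my_flush_cb(EventNotifier *e)
    {
        return in_flight > 0;
    }

    qemu_aio_set_event_notifier(&notifier, my_completion_cb, my_flush_cb);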
Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
Makefile.objs | 4 ++--
aio.c | 9 +++++++++
qemu-aio.h | 19 ++++++++++++++++++-
3 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/Makefile.objs b/Makefile.objs
index a0a3543..6f7b40b 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -46,6 +46,8 @@ block-obj-y += nbd.o block.o blockjob.o aio.o aes.o qemu-config.o
block-obj-y += qemu-progress.o qemu-sockets.o uri.o
block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
+block-obj-$(CONFIG_POSIX) += event_notifier-posix.o
+block-obj-$(CONFIG_WIN32) += event_notifier-win32.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
block-obj-y += block/
@@ -94,8 +96,6 @@ common-obj-y += dma-helpers.o
common-obj-y += iov.o acl.o
common-obj-$(CONFIG_POSIX) += compatfd.o
common-obj-y += notify.o
-common-obj-$(CONFIG_POSIX) += event_notifier-posix.o
-common-obj-$(CONFIG_WIN32) += event_notifier-win32.o
common-obj-y += qemu-timer.o qemu-timer-common.o
common-obj-y += qtest.o
common-obj-y += vl.o
diff --git a/aio.c b/aio.c
index e062aab..44214e1 100644
--- a/aio.c
+++ b/aio.c
@@ -95,6 +95,15 @@ void qemu_aio_set_fd_handler(int fd,
qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque);
}
+void qemu_aio_set_event_notifier(EventNotifier *notifier,
+ EventNotifierHandler *io_read,
+ AioFlushEventNotifierHandler *io_flush)
+{
+ qemu_aio_set_fd_handler(event_notifier_get_fd(notifier),
+ (IOHandler *)io_read, NULL,
+ (AioFlushHandler *)io_flush, notifier);
+}
+
void qemu_aio_flush(void)
{
while (qemu_aio_wait());
diff --git a/qemu-aio.h b/qemu-aio.h
index 27a7e21..dc416a5 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -16,6 +16,7 @@
#include "qemu-common.h"
#include "qemu-char.h"
+#include "event_notifier.h"
typedef struct BlockDriverAIOCB BlockDriverAIOCB;
typedef void BlockDriverCompletionFunc(void *opaque, int ret);
@@ -39,7 +40,7 @@ void *qemu_aio_get(AIOPool *pool, BlockDriverState *bs,
void qemu_aio_release(void *p);
/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
-typedef int (AioFlushHandler)(void *opaque);
+typedef int (AioFlushEventNotifierHandler)(EventNotifier *e);
/* Flush any pending AIO operation. This function will block until all
* outstanding AIO operations have been completed or cancelled. */
@@ -53,6 +54,10 @@ void qemu_aio_flush(void);
* Return whether there is still any pending AIO operation. */
bool qemu_aio_wait(void);
+#ifdef CONFIG_POSIX
+/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
+typedef int (AioFlushHandler)(void *opaque);
+
/* Register a file descriptor and associated callbacks. Behaves very similarly
* to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will
* be invoked when using either qemu_aio_wait() or qemu_aio_flush().
@@ -65,5 +70,17 @@ void qemu_aio_set_fd_handler(int fd,
IOHandler *io_write,
AioFlushHandler *io_flush,
void *opaque);
+#endif
+
+/* Register an event notifier and associated callbacks. Behaves very similarly
+ * to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks
+ * will be invoked when using either qemu_aio_wait() or qemu_aio_flush().
+ *
+ * Code that invokes AIO completion functions should rely on this function
+ * instead of event_notifier_set_handler.
+ */
+void qemu_aio_set_event_notifier(EventNotifier *notifier,
+ EventNotifierHandler *io_read,
+ AioFlushEventNotifierHandler *io_flush);
#endif
--
1.7.12.1
* [Qemu-devel] [PATCH 05/25] aio: introduce AioContext, move bottom halves there
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
Start introducing AioContext, which will let us remove globals from
aio.c/async.c, and introduce multiple I/O threads.
The bottom half functions now take an additional AioContext argument.
A bottom half is created with a specific AioContext that remains the
same throughout its lifetime. qemu_bh_new is just a wrapper that
uses the global context.
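For example, creating a bottom half on a private context instead of the
global one looks like this (sketch only; my_bh_cb is a placeholder):

    static void my_bh_cb(void *opaque)
    {
        /* invoked from aio_bh_poll(ctx) */
    }

    AioContext *ctx = aio_context_new();
    QEMUBH *bh = aio_bh_new(ctx, my_bh_cb, NULL);

    qemu_bh_schedule(bh);
    aio_bh_poll(ctx);                         /* runs my_bh_cb once */
    qemu_bh_delete(bh);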
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
async.c | 30 +++++++++----------
hw/hw.h | 1 +
iohandler.c | 1 +
main-loop.c | 18 +++++++++++-
main-loop.h | 55 ++---------------------------------
qemu-aio.h | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++-
qemu-char.h | 1 +
qemu-common.h | 1 +
qemu-coroutine-lock.c | 2 +-
9 files changed, 118 insertions(+), 70 deletions(-)
diff --git a/async.c b/async.c
index 85cc641..189ee1b 100644
--- a/async.c
+++ b/async.c
@@ -26,9 +26,6 @@
#include "qemu-aio.h"
#include "main-loop.h"
-/* Anchor of the list of Bottom Halves belonging to the context */
-static struct QEMUBH *first_bh;
-
/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */
@@ -41,27 +38,26 @@ struct QEMUBH {
bool deleted;
};
-QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque)
+QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
QEMUBH *bh;
bh = g_malloc0(sizeof(QEMUBH));
bh->cb = cb;
bh->opaque = opaque;
- bh->next = first_bh;
- first_bh = bh;
+ bh->next = ctx->first_bh;
+ ctx->first_bh = bh;
return bh;
}
-int qemu_bh_poll(void)
+int aio_bh_poll(AioContext *ctx)
{
QEMUBH *bh, **bhp, *next;
int ret;
- static int nesting = 0;
- nesting++;
+ ctx->walking_bh++;
ret = 0;
- for (bh = first_bh; bh; bh = next) {
+ for (bh = ctx->first_bh; bh; bh = next) {
next = bh->next;
if (!bh->deleted && bh->scheduled) {
bh->scheduled = 0;
@@ -72,11 +68,11 @@ int qemu_bh_poll(void)
}
}
- nesting--;
+ ctx->walking_bh--;
/* remove deleted bhs */
- if (!nesting) {
- bhp = &first_bh;
+ if (!ctx->walking_bh) {
+ bhp = &ctx->first_bh;
while (*bhp) {
bh = *bhp;
if (bh->deleted) {
@@ -120,11 +116,11 @@ void qemu_bh_delete(QEMUBH *bh)
bh->deleted = 1;
}
-void qemu_bh_update_timeout(uint32_t *timeout)
+void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout)
{
QEMUBH *bh;
- for (bh = first_bh; bh; bh = bh->next) {
+ for (bh = ctx->first_bh; bh; bh = bh->next) {
if (!bh->deleted && bh->scheduled) {
if (bh->idle) {
/* idle bottom halves will be polled at least
@@ -140,3 +136,7 @@ void qemu_bh_update_timeout(uint32_t *timeout)
}
}
+AioContext *aio_context_new(void)
+{
+ return g_new0(AioContext, 1);
+}
diff --git a/hw/hw.h b/hw/hw.h
index b337ee3..f530f6f 100644
--- a/hw/hw.h
+++ b/hw/hw.h
@@ -10,6 +10,7 @@
#include "ioport.h"
#include "irq.h"
+#include "qemu-aio.h"
#include "qemu-file.h"
#include "vmstate.h"
#include "qemu-log.h"
diff --git a/iohandler.c b/iohandler.c
index a2d871b..60460a6 100644
--- a/iohandler.c
+++ b/iohandler.c
@@ -26,6 +26,7 @@
#include "qemu-common.h"
#include "qemu-char.h"
#include "qemu-queue.h"
+#include "qemu-aio.h"
#include "main-loop.h"
#ifndef _WIN32
diff --git a/main-loop.c b/main-loop.c
index eb3b6e6..f0bc515 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -26,6 +26,7 @@
#include "qemu-timer.h"
#include "slirp/slirp.h"
#include "main-loop.h"
+#include "qemu-aio.h"
#ifndef _WIN32
@@ -199,6 +200,8 @@ static int qemu_signal_init(void)
}
#endif
+static AioContext *qemu_aio_context;
+
int main_loop_init(void)
{
int ret;
@@ -215,6 +218,7 @@ int main_loop_init(void)
return ret;
}
+ qemu_aio_context = aio_context_new();
return 0;
}
@@ -478,7 +482,7 @@ int main_loop_wait(int nonblocking)
if (nonblocking) {
timeout = 0;
} else {
- qemu_bh_update_timeout(&timeout);
+ aio_bh_update_timeout(qemu_aio_context, &timeout);
}
/* poll any events */
@@ -507,3 +511,15 @@ int main_loop_wait(int nonblocking)
return ret;
}
+
+/* Functions to operate on the main QEMU AioContext. */
+
+QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque)
+{
+ return aio_bh_new(qemu_aio_context, cb, opaque);
+}
+
+int qemu_bh_poll(void)
+{
+ return aio_bh_poll(qemu_aio_context);
+}
diff --git a/main-loop.h b/main-loop.h
index dce1cd9..a337096 100644
--- a/main-loop.h
+++ b/main-loop.h
@@ -25,6 +25,8 @@
#ifndef QEMU_MAIN_LOOP_H
#define QEMU_MAIN_LOOP_H 1
+#include "qemu-aio.h"
+
#define SIG_IPI SIGUSR1
/**
@@ -173,7 +175,6 @@ void qemu_del_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque);
typedef void IOReadHandler(void *opaque, const uint8_t *buf, int size);
typedef int IOCanReadHandler(void *opaque);
-typedef void IOHandler(void *opaque);
/**
* qemu_set_fd_handler2: Register a file descriptor with the main loop
@@ -254,56 +255,6 @@ int qemu_set_fd_handler(int fd,
IOHandler *fd_write,
void *opaque);
-typedef struct QEMUBH QEMUBH;
-typedef void QEMUBHFunc(void *opaque);
-
-/**
- * qemu_bh_new: Allocate a new bottom half structure.
- *
- * Bottom halves are lightweight callbacks whose invocation is guaranteed
- * to be wait-free, thread-safe and signal-safe. The #QEMUBH structure
- * is opaque and must be allocated prior to its use.
- */
-QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque);
-
-/**
- * qemu_bh_schedule: Schedule a bottom half.
- *
- * Scheduling a bottom half interrupts the main loop and causes the
- * execution of the callback that was passed to qemu_bh_new.
- *
- * Bottom halves that are scheduled from a bottom half handler are instantly
- * invoked. This can create an infinite loop if a bottom half handler
- * schedules itself.
- *
- * @bh: The bottom half to be scheduled.
- */
-void qemu_bh_schedule(QEMUBH *bh);
-
-/**
- * qemu_bh_cancel: Cancel execution of a bottom half.
- *
- * Canceling execution of a bottom half undoes the effect of calls to
- * qemu_bh_schedule without freeing its resources yet. While cancellation
- * itself is also wait-free and thread-safe, it can of course race with the
- * loop that executes bottom halves unless you are holding the iothread
- * mutex. This makes it mostly useless if you are not holding the mutex.
- *
- * @bh: The bottom half to be canceled.
- */
-void qemu_bh_cancel(QEMUBH *bh);
-
-/**
- *qemu_bh_delete: Cancel execution of a bottom half and free its resources.
- *
- * Deleting a bottom half frees the memory that was allocated for it by
- * qemu_bh_new. It also implies canceling the bottom half if it was
- * scheduled.
- *
- * @bh: The bottom half to be deleted.
- */
-void qemu_bh_delete(QEMUBH *bh);
-
#ifdef CONFIG_POSIX
/**
* qemu_add_child_watch: Register a child process for reaping.
@@ -359,8 +310,8 @@ void qemu_fd_register(int fd);
void qemu_iohandler_fill(int *pnfds, fd_set *readfds, fd_set *writefds, fd_set *xfds);
void qemu_iohandler_poll(fd_set *readfds, fd_set *writefds, fd_set *xfds, int rc);
+QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque);
void qemu_bh_schedule_idle(QEMUBH *bh);
int qemu_bh_poll(void);
-void qemu_bh_update_timeout(uint32_t *timeout);
#endif
diff --git a/qemu-aio.h b/qemu-aio.h
index dc416a5..2ed6ad3 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -15,7 +15,6 @@
#define QEMU_AIO_H
#include "qemu-common.h"
-#include "qemu-char.h"
#include "event_notifier.h"
typedef struct BlockDriverAIOCB BlockDriverAIOCB;
@@ -39,9 +38,87 @@ void *qemu_aio_get(AIOPool *pool, BlockDriverState *bs,
BlockDriverCompletionFunc *cb, void *opaque);
void qemu_aio_release(void *p);
+typedef struct AioHandler AioHandler;
+typedef void QEMUBHFunc(void *opaque);
+typedef void IOHandler(void *opaque);
+
+typedef struct AioContext {
+ /* Anchor of the list of Bottom Halves belonging to the context */
+ struct QEMUBH *first_bh;
+
+ /* A simple lock used to protect the first_bh list, and ensure that
+ * no callbacks are removed while we're walking and dispatching callbacks.
+ */
+ int walking_bh;
+} AioContext;
+
/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
typedef int (AioFlushEventNotifierHandler)(EventNotifier *e);
+/**
+ * aio_context_new: Allocate a new AioContext.
+ *
+ * AioContexts provide a mini event-loop that can be waited on synchronously.
+ * They also provide bottom halves, a service to execute a piece of code
+ * as soon as possible.
+ */
+AioContext *aio_context_new(void);
+
+/**
+ * aio_bh_new: Allocate a new bottom half structure.
+ *
+ * Bottom halves are lightweight callbacks whose invocation is guaranteed
+ * to be wait-free, thread-safe and signal-safe. The #QEMUBH structure
+ * is opaque and must be allocated prior to its use.
+ */
+QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque);
+
+/**
+ * aio_bh_poll: Poll bottom halves for an AioContext.
+ *
+ * These are internal functions used by the QEMU main loop.
+ */
+int aio_bh_poll(AioContext *ctx);
+void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout);
+
+/**
+ * qemu_bh_schedule: Schedule a bottom half.
+ *
+ * Scheduling a bottom half interrupts the main loop and causes the
+ * execution of the callback that was passed to qemu_bh_new.
+ *
+ * Bottom halves that are scheduled from a bottom half handler are instantly
+ * invoked. This can create an infinite loop if a bottom half handler
+ * schedules itself.
+ *
+ * @bh: The bottom half to be scheduled.
+ */
+void qemu_bh_schedule(QEMUBH *bh);
+
+/**
+ * qemu_bh_cancel: Cancel execution of a bottom half.
+ *
+ * Canceling execution of a bottom half undoes the effect of calls to
+ * qemu_bh_schedule without freeing its resources yet. While cancellation
+ * itself is also wait-free and thread-safe, it can of course race with the
+ * loop that executes bottom halves unless you are holding the iothread
+ * mutex. This makes it mostly useless if you are not holding the mutex.
+ *
+ * @bh: The bottom half to be canceled.
+ */
+void qemu_bh_cancel(QEMUBH *bh);
+
+/**
+ *qemu_bh_delete: Cancel execution of a bottom half and free its resources.
+ *
+ * Deleting a bottom half frees the memory that was allocated for it by
+ * qemu_bh_new. It also implies canceling the bottom half if it was
+ * scheduled.
+ *
+ * @bh: The bottom half to be deleted.
+ */
+void qemu_bh_delete(QEMUBH *bh);
+
/* Flush any pending AIO operation. This function will block until all
* outstanding AIO operations have been completed or cancelled. */
void qemu_aio_flush(void);
diff --git a/qemu-char.h b/qemu-char.h
index 486644b..5087168 100644
--- a/qemu-char.h
+++ b/qemu-char.h
@@ -5,6 +5,7 @@
#include "qemu-queue.h"
#include "qemu-option.h"
#include "qemu-config.h"
+#include "qemu-aio.h"
#include "qobject.h"
#include "qstring.h"
#include "main-loop.h"
diff --git a/qemu-common.h b/qemu-common.h
index b54612b..9e92eb0 100644
--- a/qemu-common.h
+++ b/qemu-common.h
@@ -14,6 +14,7 @@
typedef struct QEMUTimer QEMUTimer;
typedef struct QEMUFile QEMUFile;
+typedef struct QEMUBH QEMUBH;
typedef struct DeviceState DeviceState;
struct Monitor;
diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c
index 26ad76b..9dda3f8 100644
--- a/qemu-coroutine-lock.c
+++ b/qemu-coroutine-lock.c
@@ -26,7 +26,7 @@
#include "qemu-coroutine.h"
#include "qemu-coroutine-int.h"
#include "qemu-queue.h"
-#include "main-loop.h"
+#include "qemu-aio.h"
#include "trace.h"
static QTAILQ_HEAD(, Coroutine) unlock_bh_queue =
--
1.7.12.1
* [Qemu-devel] [PATCH 06/25] aio: add I/O handlers to the AioContext interface
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
With this patch, I/O handlers (including event notifier handlers) can be
attached to a single AioContext.
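A sketch of the per-context registration, POSIX variant (callback names
and the ctx/fd/opaque variables are illustrative):

    static void my_read_cb(void *opaque)
    {
        /* fd became readable */
    }

    static int my_io_flush(void *opaque)
    {
        return 0;                             /* no outstanding requests */
    }

    aio_set_fd_handler(ctx, fd, my_read_cb, NULL, my_io_flush, opaque);

    /* Passing NULL handlers unregisters the fd from this context. */
    aio_set_fd_handler(ctx, fd, NULL, NULL, NULL, NULL);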
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
aio.c | 70 ++++++++++++++++++++++++-------------------------------------
async.c | 6 ++++++
main-loop.c | 33 +++++++++++++++++++++++++++++
qemu-aio.h | 42 ++++++++++++++++++++++++++++++-------
4 files changed, 101 insertions(+), 50 deletions(-)
diff --git a/aio.c b/aio.c
index 44214e1..c89f1e9 100644
--- a/aio.c
+++ b/aio.c
@@ -18,17 +18,6 @@
#include "qemu-queue.h"
#include "qemu_socket.h"
-typedef struct AioHandler AioHandler;
-
-/* The list of registered AIO handlers */
-static QLIST_HEAD(, AioHandler) aio_handlers;
-
-/* This is a simple lock used to protect the aio_handlers list. Specifically,
- * it's used to ensure that no callbacks are removed while we're walking and
- * dispatching callbacks.
- */
-static int walking_handlers;
-
struct AioHandler
{
int fd;
@@ -40,11 +29,11 @@ struct AioHandler
QLIST_ENTRY(AioHandler) node;
};
-static AioHandler *find_aio_handler(int fd)
+static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
AioHandler *node;
- QLIST_FOREACH(node, &aio_handlers, node) {
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (node->fd == fd)
if (!node->deleted)
return node;
@@ -53,21 +42,22 @@ static AioHandler *find_aio_handler(int fd)
return NULL;
}
-void qemu_aio_set_fd_handler(int fd,
- IOHandler *io_read,
- IOHandler *io_write,
- AioFlushHandler *io_flush,
- void *opaque)
+void aio_set_fd_handler(AioContext *ctx,
+ int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ AioFlushHandler *io_flush,
+ void *opaque)
{
AioHandler *node;
- node = find_aio_handler(fd);
+ node = find_aio_handler(ctx, fd);
/* Are we deleting the fd handler? */
if (!io_read && !io_write) {
if (node) {
/* If the lock is held, just mark the node as deleted */
- if (walking_handlers)
+ if (ctx->walking_handlers)
node->deleted = 1;
else {
/* Otherwise, delete it for real. We can't just mark it as
@@ -83,7 +73,7 @@ void qemu_aio_set_fd_handler(int fd,
/* Alloc and insert if it's not already there */
node = g_malloc0(sizeof(AioHandler));
node->fd = fd;
- QLIST_INSERT_HEAD(&aio_handlers, node, node);
+ QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
}
/* Update handler with latest information */
node->io_read = io_read;
@@ -91,25 +81,19 @@ void qemu_aio_set_fd_handler(int fd,
node->io_flush = io_flush;
node->opaque = opaque;
}
-
- qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque);
-}
-
-void qemu_aio_set_event_notifier(EventNotifier *notifier,
- EventNotifierHandler *io_read,
- AioFlushEventNotifierHandler *io_flush)
-{
- qemu_aio_set_fd_handler(event_notifier_get_fd(notifier),
- (IOHandler *)io_read, NULL,
- (AioFlushHandler *)io_flush, notifier);
}
-void qemu_aio_flush(void)
+void aio_set_event_notifier(AioContext *ctx,
+ EventNotifier *notifier,
+ EventNotifierHandler *io_read,
+ AioFlushEventNotifierHandler *io_flush)
{
- while (qemu_aio_wait());
+ aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
+ (IOHandler *)io_read, NULL,
+ (AioFlushHandler *)io_flush, notifier);
}
-bool qemu_aio_wait(void)
+bool aio_wait(AioContext *ctx)
{
AioHandler *node;
fd_set rdfds, wrfds;
@@ -122,18 +106,18 @@ bool qemu_aio_wait(void)
* Do not call select in this case, because it is possible that the caller
* does not need a complete flush (as is the case for qemu_aio_wait loops).
*/
- if (qemu_bh_poll()) {
+ if (aio_bh_poll(ctx)) {
return true;
}
- walking_handlers++;
+ ctx->walking_handlers++;
FD_ZERO(&rdfds);
FD_ZERO(&wrfds);
/* fill fd sets */
busy = false;
- QLIST_FOREACH(node, &aio_handlers, node) {
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
/* If there aren't pending AIO operations, don't invoke callbacks.
* Otherwise, if there are no AIO requests, qemu_aio_wait() would
* wait indefinitely.
@@ -154,7 +138,7 @@ bool qemu_aio_wait(void)
}
}
- walking_handlers--;
+ ctx->walking_handlers--;
/* No AIO operations? Get us out of here */
if (!busy) {
@@ -168,11 +152,11 @@ bool qemu_aio_wait(void)
if (ret > 0) {
/* we have to walk very carefully in case
* qemu_aio_set_fd_handler is called while we're walking */
- node = QLIST_FIRST(&aio_handlers);
+ node = QLIST_FIRST(&ctx->aio_handlers);
while (node) {
AioHandler *tmp;
- walking_handlers++;
+ ctx->walking_handlers++;
if (!node->deleted &&
FD_ISSET(node->fd, &rdfds) &&
@@ -188,9 +172,9 @@ bool qemu_aio_wait(void)
tmp = node;
node = QLIST_NEXT(node, node);
- walking_handlers--;
+ ctx->walking_handlers--;
- if (!walking_handlers && tmp->deleted) {
+ if (!ctx->walking_handlers && tmp->deleted) {
QLIST_REMOVE(tmp, node);
g_free(tmp);
}
diff --git a/async.c b/async.c
index 189ee1b..c99db79 100644
--- a/async.c
+++ b/async.c
@@ -136,7 +136,13 @@ void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout)
}
}
+
AioContext *aio_context_new(void)
{
return g_new0(AioContext, 1);
}
+
+void aio_flush(AioContext *ctx)
+{
+ while (aio_wait(ctx));
+}
diff --git a/main-loop.c b/main-loop.c
index f0bc515..8301fe9 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -523,3 +523,36 @@ int qemu_bh_poll(void)
{
return aio_bh_poll(qemu_aio_context);
}
+
+void qemu_aio_flush(void)
+{
+ aio_flush(qemu_aio_context);
+}
+
+bool qemu_aio_wait(void)
+{
+ return aio_wait(qemu_aio_context);
+}
+
+void qemu_aio_set_fd_handler(int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ AioFlushHandler *io_flush,
+ void *opaque)
+{
+ aio_set_fd_handler(qemu_aio_context, fd, io_read, io_write, io_flush,
+ opaque);
+
+ qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque);
+}
+
+#ifdef CONFIG_POSIX
+void qemu_aio_set_event_notifier(EventNotifier *notifier,
+ EventNotifierHandler *io_read,
+ AioFlushEventNotifierHandler *io_flush)
+{
+ qemu_aio_set_fd_handler(event_notifier_get_fd(notifier),
+ (IOHandler *)io_read, NULL,
+ (AioFlushHandler *)io_flush, notifier);
+}
+#endif
diff --git a/qemu-aio.h b/qemu-aio.h
index 2ed6ad3..f8a93d8 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -15,6 +15,7 @@
#define QEMU_AIO_H
#include "qemu-common.h"
+#include "qemu-queue.h"
#include "event_notifier.h"
typedef struct BlockDriverAIOCB BlockDriverAIOCB;
@@ -43,6 +44,15 @@ typedef void QEMUBHFunc(void *opaque);
typedef void IOHandler(void *opaque);
typedef struct AioContext {
+ /* The list of registered AIO handlers */
+ QLIST_HEAD(, AioHandler) aio_handlers;
+
+ /* This is a simple lock used to protect the aio_handlers list.
+ * Specifically, it's used to ensure that no callbacks are removed while
+ * we're walking and dispatching callbacks.
+ */
+ int walking_handlers;
+
/* Anchor of the list of Bottom Halves belonging to the context */
struct QEMUBH *first_bh;
@@ -121,7 +131,7 @@ void qemu_bh_delete(QEMUBH *bh);
/* Flush any pending AIO operation. This function will block until all
* outstanding AIO operations have been completed or cancelled. */
-void qemu_aio_flush(void);
+void aio_flush(AioContext *ctx);
/* Wait for a single AIO completion to occur. This function will wait
* until a single AIO event has completed and it will ensure something
@@ -129,7 +139,7 @@ void qemu_aio_flush(void);
* result of executing I/O completion or bh callbacks.
*
* Return whether there is still any pending AIO operation. */
-bool qemu_aio_wait(void);
+bool aio_wait(AioContext *ctx);
#ifdef CONFIG_POSIX
/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
@@ -142,11 +152,12 @@ typedef int (AioFlushHandler)(void *opaque);
* Code that invokes AIO completion functions should rely on this function
* instead of qemu_set_fd_handler[2].
*/
-void qemu_aio_set_fd_handler(int fd,
- IOHandler *io_read,
- IOHandler *io_write,
- AioFlushHandler *io_flush,
- void *opaque);
+void aio_set_fd_handler(AioContext *ctx,
+ int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ AioFlushHandler *io_flush,
+ void *opaque);
#endif
/* Register an event notifier and associated callbacks. Behaves very similarly
@@ -156,8 +167,25 @@ void qemu_aio_set_fd_handler(int fd,
* Code that invokes AIO completion functions should rely on this function
* instead of event_notifier_set_handler.
*/
+void aio_set_event_notifier(AioContext *ctx,
+ EventNotifier *notifier,
+ EventNotifierHandler *io_read,
+ AioFlushEventNotifierHandler *io_flush);
+
+/* Functions to operate on the main QEMU AioContext. */
+
+void qemu_aio_flush(void);
+bool qemu_aio_wait(void);
void qemu_aio_set_event_notifier(EventNotifier *notifier,
EventNotifierHandler *io_read,
AioFlushEventNotifierHandler *io_flush);
+#ifdef CONFIG_POSIX
+void qemu_aio_set_fd_handler(int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ AioFlushHandler *io_flush,
+ void *opaque);
+#endif
+
#endif
--
1.7.12.1
* [Qemu-devel] [PATCH 07/25] aio: test node->deleted before calling io_flush
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
Otherwise, io_flush could be called for a node that has already been
deleted, and access freed memory.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
aio.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/aio.c b/aio.c
index c89f1e9..734d2cf 100644
--- a/aio.c
+++ b/aio.c
@@ -122,7 +122,7 @@ bool aio_wait(AioContext *ctx)
* Otherwise, if there are no AIO requests, qemu_aio_wait() would
* wait indefinitely.
*/
- if (node->io_flush) {
+ if (!node->deleted && node->io_flush) {
if (node->io_flush(node->opaque) == 0) {
continue;
}
--
1.7.12.1
* [Qemu-devel] [PATCH 08/25] aio: add non-blocking variant of aio_wait
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
This will be used when polling the GSource attached to an AioContext.
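The two intended call patterns, as a sketch:

    /* Blocking: loop until no pending operation remains; this is
     * exactly what aio_flush does. */
    while (aio_poll(ctx, true)) {
        /* each iteration made progress */
    }

    /* Non-blocking, e.g. from a GSource dispatch: run whatever is
     * ready, return false instead of blocking. */
    if (aio_poll(ctx, false)) {
        /* some callback ran */
    }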
Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
aio.c | 20 +++++++++++++++-----
async.c | 2 +-
main-loop.c | 2 +-
qemu-aio.h | 21 +++++++++++++++------
4 files changed, 32 insertions(+), 13 deletions(-)
diff --git a/aio.c b/aio.c
index 734d2cf..1d5e0c6 100644
--- a/aio.c
+++ b/aio.c
@@ -93,13 +93,16 @@ void aio_set_event_notifier(AioContext *ctx,
(AioFlushHandler *)io_flush, notifier);
}
-bool aio_wait(AioContext *ctx)
+bool aio_poll(AioContext *ctx, bool blocking)
{
+ static struct timeval tv0;
AioHandler *node;
fd_set rdfds, wrfds;
int max_fd = -1;
int ret;
- bool busy;
+ bool busy, progress;
+
+ progress = false;
/*
* If there are callbacks left that have been queued, we need to call them.
@@ -107,6 +110,11 @@ bool aio_wait(AioContext *ctx)
* does not need a complete flush (as is the case for qemu_aio_wait loops).
*/
if (aio_bh_poll(ctx)) {
+ blocking = false;
+ progress = true;
+ }
+
+ if (progress && !blocking) {
return true;
}
@@ -142,11 +150,11 @@ bool aio_wait(AioContext *ctx)
/* No AIO operations? Get us out of here */
if (!busy) {
- return false;
+ return progress;
}
/* wait until next event */
- ret = select(max_fd, &rdfds, &wrfds, NULL, NULL);
+ ret = select(max_fd, &rdfds, &wrfds, NULL, blocking ? NULL : &tv0);
/* if we have any readable fds, dispatch event */
if (ret > 0) {
@@ -161,11 +169,13 @@ bool aio_wait(AioContext *ctx)
if (!node->deleted &&
FD_ISSET(node->fd, &rdfds) &&
node->io_read) {
+ progress = true;
node->io_read(node->opaque);
}
if (!node->deleted &&
FD_ISSET(node->fd, &wrfds) &&
node->io_write) {
+ progress = true;
node->io_write(node->opaque);
}
@@ -181,5 +191,5 @@ bool aio_wait(AioContext *ctx)
}
}
- return true;
+ return progress;
}
diff --git a/async.c b/async.c
index c99db79..513bdd7 100644
--- a/async.c
+++ b/async.c
@@ -144,5 +144,5 @@ AioContext *aio_context_new(void)
void aio_flush(AioContext *ctx)
{
- while (aio_wait(ctx));
+ while (aio_poll(ctx, true));
}
diff --git a/main-loop.c b/main-loop.c
index 8301fe9..67800fe 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -531,7 +531,7 @@ void qemu_aio_flush(void)
bool qemu_aio_wait(void)
{
- return aio_wait(qemu_aio_context);
+ return aio_poll(qemu_aio_context, true);
}
void qemu_aio_set_fd_handler(int fd,
diff --git a/qemu-aio.h b/qemu-aio.h
index f8a93d8..f19201e 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -133,13 +133,22 @@ void qemu_bh_delete(QEMUBH *bh);
* outstanding AIO operations have been completed or cancelled. */
void aio_flush(AioContext *ctx);
-/* Wait for a single AIO completion to occur. This function will wait
- * until a single AIO event has completed and it will ensure something
- * has moved before returning. This can issue new pending aio as
- * result of executing I/O completion or bh callbacks.
+/* Make progress in completing AIO work. This can issue new pending
+ * aio as a result of executing I/O completion or bh callbacks.
*
- * Return whether there is still any pending AIO operation. */
-bool aio_wait(AioContext *ctx);
+ * If there is no pending AIO operation or completion (bottom half),
+ * return false. If there are pending bottom halves, return true.
+ *
+ * If there are no pending bottom halves, but there are pending AIO
+ * operations, it may not be possible to make any progress without
+ * blocking. If @blocking is true, this function will wait until one
+ * or more AIO events have completed, to ensure something has moved
+ * before returning.
+ *
+ * If @blocking is false, this function will also return false if the
+ * function cannot make any progress without blocking.
+ */
+bool aio_poll(AioContext *ctx, bool blocking);
#ifdef CONFIG_POSIX
/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
--
1.7.12.1
* [Qemu-devel] [PATCH 09/25] aio: prepare for introducing GSource-based dispatch
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
This adds a GPollFD to each AioHandler. It will then be possible to
attach these GPollFDs to a GSource, and from there to the main loop.
aio_poll examines the GPollFDs and avoids calling select() if any of
them has pending events (similar to what it does when bottom halves
are available).
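As a rough sketch of where this is heading (the GSource itself only
arrives later in the series, so the 'source' variable here is an
assumption):

    AioHandler *node;

    /* attach: let the main loop poll every handler's GPollFD */
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        g_source_add_poll(source, &node->pfd);
    }

    /* dispatch: glib has filled pfd.revents by now */
    if (aio_pending(ctx)) {
        aio_poll(ctx, false);
    }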
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
aio.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------
qemu-aio.h | 7 +++++
2 files changed, 87 insertions(+), 13 deletions(-)
diff --git a/aio.c b/aio.c
index 1d5e0c6..4424722 100644
--- a/aio.c
+++ b/aio.c
@@ -20,7 +20,7 @@
struct AioHandler
{
- int fd;
+ GPollFD pfd;
IOHandler *io_read;
IOHandler *io_write;
AioFlushHandler *io_flush;
@@ -34,7 +34,7 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd)
AioHandler *node;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- if (node->fd == fd)
+ if (node->pfd.fd == fd)
if (!node->deleted)
return node;
}
@@ -57,9 +57,10 @@ void aio_set_fd_handler(AioContext *ctx,
if (!io_read && !io_write) {
if (node) {
/* If the lock is held, just mark the node as deleted */
- if (ctx->walking_handlers)
+ if (ctx->walking_handlers) {
node->deleted = 1;
- else {
+ node->pfd.revents = 0;
+ } else {
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up after
* releasing the walking_handlers lock.
@@ -72,7 +73,7 @@ void aio_set_fd_handler(AioContext *ctx,
if (node == NULL) {
/* Alloc and insert if it's not already there */
node = g_malloc0(sizeof(AioHandler));
- node->fd = fd;
+ node->pfd.fd = fd;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
}
/* Update handler with latest information */
@@ -80,6 +81,9 @@ void aio_set_fd_handler(AioContext *ctx,
node->io_write = io_write;
node->io_flush = io_flush;
node->opaque = opaque;
+
+ node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP : 0);
+ node->pfd.events |= (io_write ? G_IO_OUT : 0);
}
}
@@ -93,6 +97,32 @@ void aio_set_event_notifier(AioContext *ctx,
(AioFlushHandler *)io_flush, notifier);
}
+bool aio_pending(AioContext *ctx)
+{
+ AioHandler *node;
+
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ int revents;
+
+ /*
+ * FIXME: right now we cannot get G_IO_HUP and G_IO_ERR because
+ * main-loop.c is still select based (due to the slirp legacy).
+ * If main-loop.c ever switches to poll, G_IO_ERR should be
+ * tested too. Dispatching G_IO_ERR to both handlers should be
+ * okay, since handlers need to be ready for spurious wakeups.
+ */
+ revents = node->pfd.revents & node->pfd.events;
+ if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
+ return true;
+ }
+ if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
bool aio_poll(AioContext *ctx, bool blocking)
{
static struct timeval tv0;
@@ -114,6 +144,43 @@ bool aio_poll(AioContext *ctx, bool blocking)
progress = true;
}
+ /*
+ * Then dispatch any pending callbacks from the GSource.
+ *
+ * We have to walk very carefully in case qemu_aio_set_fd_handler is
+ * called while we're walking.
+ */
+ node = QLIST_FIRST(&ctx->aio_handlers);
+ while (node) {
+ AioHandler *tmp;
+ int revents;
+
+ ctx->walking_handlers++;
+
+ revents = node->pfd.revents & node->pfd.events;
+ node->pfd.revents = 0;
+
+ /* See comment in aio_pending. */
+ if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
+ node->io_read(node->opaque);
+ progress = true;
+ }
+ if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
+ node->io_write(node->opaque);
+ progress = true;
+ }
+
+ tmp = node;
+ node = QLIST_NEXT(node, node);
+
+ ctx->walking_handlers--;
+
+ if (!ctx->walking_handlers && tmp->deleted) {
+ QLIST_REMOVE(tmp, node);
+ g_free(tmp);
+ }
+ }
+
if (progress && !blocking) {
return true;
}
@@ -137,12 +204,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
busy = true;
}
if (!node->deleted && node->io_read) {
- FD_SET(node->fd, &rdfds);
- max_fd = MAX(max_fd, node->fd + 1);
+ FD_SET(node->pfd.fd, &rdfds);
+ max_fd = MAX(max_fd, node->pfd.fd + 1);
}
if (!node->deleted && node->io_write) {
- FD_SET(node->fd, &wrfds);
- max_fd = MAX(max_fd, node->fd + 1);
+ FD_SET(node->pfd.fd, &wrfds);
+ max_fd = MAX(max_fd, node->pfd.fd + 1);
}
}
@@ -167,16 +234,16 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers++;
if (!node->deleted &&
- FD_ISSET(node->fd, &rdfds) &&
+ FD_ISSET(node->pfd.fd, &rdfds) &&
node->io_read) {
- progress = true;
node->io_read(node->opaque);
+ progress = true;
}
if (!node->deleted &&
- FD_ISSET(node->fd, &wrfds) &&
+ FD_ISSET(node->pfd.fd, &wrfds) &&
node->io_write) {
- progress = true;
node->io_write(node->opaque);
+ progress = true;
}
tmp = node;
diff --git a/qemu-aio.h b/qemu-aio.h
index f19201e..ac24896 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -133,6 +133,13 @@ void qemu_bh_delete(QEMUBH *bh);
* outstanding AIO operations have been completed or cancelled. */
void aio_flush(AioContext *ctx);
+/* Return whether there are any pending callbacks from the GSource
+ * attached to the AioContext.
+ *
+ * This is used internally in the implementation of the GSource.
+ */
+bool aio_pending(AioContext *ctx);
+
/* Progress in completing AIO work to occur. This can issue new pending
* aio as a result of executing I/O completion or bh callbacks.
*
--
1.7.12.1
* [Qemu-devel] [PATCH 10/25] aio: add Win32 implementation
From: Paolo Bonzini @ 2012-10-26 14:05 UTC
To: qemu-devel; +Cc: aliguori, stefanha
The Win32 implementation will only accept EventNotifiers, so a few
drivers are disabled under Windows. EventNotifiers are a good match
for the GSource implementation, too, because the Win32 port of glib
allows placing their HANDLEs in a GPollFD.
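Usage on Win32 then looks roughly like this (names are illustrative);
aio_poll collects the HANDLEs and waits on them with
WaitForMultipleObjects:

    static void my_notify_cb(EventNotifier *e)
    {
        event_notifier_test_and_clear(e);
        /* ... complete finished requests ... */
    }

    static int my_flush_cb(EventNotifier *e)
    {
        return in_flight > 0;                 /* 1 while requests pending */
    }

    aio_set_event_notifier(ctx, &notifier, my_notify_cb, my_flush_cb);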
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
Makefile.objs | 6 +--
aio.c => aio-posix.c | 0
aio.c => aio-win32.c | 137 ++++++++++++++++-----------------------------------
block/Makefile.objs | 6 ++-
main-loop.c | 2 +-
5 files changed, 51 insertions(+), 100 deletions(-)
copy aio.c => aio-posix.c (100%)
rename aio.c => aio-win32.c (56%)
diff --git a/Makefile.objs b/Makefile.objs
index 6f7b40b..da744e1 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -42,12 +42,12 @@ coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o
# block-obj-y is code used by both qemu system emulation and qemu-img
block-obj-y = cutils.o iov.o cache-utils.o qemu-option.o module.o async.o
-block-obj-y += nbd.o block.o blockjob.o aio.o aes.o qemu-config.o
+block-obj-y += nbd.o block.o blockjob.o aes.o qemu-config.o
block-obj-y += qemu-progress.o qemu-sockets.o uri.o
block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
-block-obj-$(CONFIG_POSIX) += event_notifier-posix.o
-block-obj-$(CONFIG_WIN32) += event_notifier-win32.o
+block-obj-$(CONFIG_POSIX) += event_notifier-posix.o aio-posix.o
+block-obj-$(CONFIG_WIN32) += event_notifier-win32.o aio-win32.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
block-obj-y += block/
diff --git a/aio.c b/aio-posix.c
similarity index 100%
copy from aio.c
copy to aio-posix.c
diff --git a/aio.c b/aio-win32.c
similarity index 56%
rename from aio.c
rename to aio-win32.c
index 4424722..9881fdb 100644
--- a/aio.c
+++ b/aio-win32.c
@@ -1,10 +1,12 @@
/*
* QEMU aio implementation
*
- * Copyright IBM, Corp. 2008
+ * Copyright IBM Corp., 2008
+ * Copyright Red Hat Inc., 2012
*
* Authors:
* Anthony Liguori <aliguori@us.ibm.com>
+ * Paolo Bonzini <pbonzini@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2. See
* the COPYING file in the top-level directory.
@@ -18,43 +20,30 @@
#include "qemu-queue.h"
#include "qemu_socket.h"
-struct AioHandler
-{
+struct AioHandler {
+ EventNotifier *e;
+ EventNotifierHandler *io_notify;
+ AioFlushEventNotifierHandler *io_flush;
GPollFD pfd;
- IOHandler *io_read;
- IOHandler *io_write;
- AioFlushHandler *io_flush;
int deleted;
- void *opaque;
QLIST_ENTRY(AioHandler) node;
};
-static AioHandler *find_aio_handler(AioContext *ctx, int fd)
+void aio_set_event_notifier(AioContext *ctx,
+ EventNotifier *e,
+ EventNotifierHandler *io_notify,
+ AioFlushEventNotifierHandler *io_flush)
{
AioHandler *node;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- if (node->pfd.fd == fd)
- if (!node->deleted)
- return node;
+ if (node->e == e && !node->deleted) {
+ break;
+ }
}
- return NULL;
-}
-
-void aio_set_fd_handler(AioContext *ctx,
- int fd,
- IOHandler *io_read,
- IOHandler *io_write,
- AioFlushHandler *io_flush,
- void *opaque)
-{
- AioHandler *node;
-
- node = find_aio_handler(ctx, fd);
-
/* Are we deleting the fd handler? */
- if (!io_read && !io_write) {
+ if (!io_notify) {
if (node) {
/* If the lock is held, just mark the node as deleted */
if (ctx->walking_handlers) {
@@ -73,49 +62,23 @@ void aio_set_fd_handler(AioContext *ctx,
if (node == NULL) {
/* Alloc and insert if it's not already there */
node = g_malloc0(sizeof(AioHandler));
- node->pfd.fd = fd;
+ node->e = e;
+ node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
+ node->pfd.events = G_IO_IN;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
}
/* Update handler with latest information */
- node->io_read = io_read;
- node->io_write = io_write;
+ node->io_notify = io_notify;
node->io_flush = io_flush;
- node->opaque = opaque;
-
- node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP : 0);
- node->pfd.events |= (io_write ? G_IO_OUT : 0);
}
}
-void aio_set_event_notifier(AioContext *ctx,
- EventNotifier *notifier,
- EventNotifierHandler *io_read,
- AioFlushEventNotifierHandler *io_flush)
-{
- aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
- (IOHandler *)io_read, NULL,
- (AioFlushHandler *)io_flush, notifier);
-}
-
bool aio_pending(AioContext *ctx)
{
AioHandler *node;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- int revents;
-
- /*
- * FIXME: right now we cannot get G_IO_HUP and G_IO_ERR because
- * main-loop.c is still select based (due to the slirp legacy).
- * If main-loop.c ever switches to poll, G_IO_ERR should be
- * tested too. Dispatching G_IO_ERR to both handlers should be
- * okay, since handlers need to be ready for spurious wakeups.
- */
- revents = node->pfd.revents & node->pfd.events;
- if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
- return true;
- }
- if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
+ if (node->pfd.revents && node->io_notify) {
return true;
}
}
@@ -125,12 +88,10 @@ bool aio_pending(AioContext *ctx)
bool aio_poll(AioContext *ctx, bool blocking)
{
- static struct timeval tv0;
AioHandler *node;
- fd_set rdfds, wrfds;
- int max_fd = -1;
- int ret;
+ HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
bool busy, progress;
+ int count;
progress = false;
@@ -153,20 +114,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
node = QLIST_FIRST(&ctx->aio_handlers);
while (node) {
AioHandler *tmp;
- int revents;
ctx->walking_handlers++;
- revents = node->pfd.revents & node->pfd.events;
- node->pfd.revents = 0;
-
- /* See comment in aio_pending. */
- if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
- node->io_read(node->opaque);
- progress = true;
- }
- if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
- node->io_write(node->opaque);
+ if (node->pfd.revents && node->io_notify) {
+ node->pfd.revents = 0;
+ node->io_notify(node->e);
progress = true;
}
@@ -187,29 +140,22 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers++;
- FD_ZERO(&rdfds);
- FD_ZERO(&wrfds);
-
/* fill fd sets */
busy = false;
+ count = 0;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
/* If there aren't pending AIO operations, don't invoke callbacks.
* Otherwise, if there are no AIO requests, qemu_aio_wait() would
* wait indefinitely.
*/
if (!node->deleted && node->io_flush) {
- if (node->io_flush(node->opaque) == 0) {
+ if (node->io_flush(node->e) == 0) {
continue;
}
busy = true;
}
- if (!node->deleted && node->io_read) {
- FD_SET(node->pfd.fd, &rdfds);
- max_fd = MAX(max_fd, node->pfd.fd + 1);
- }
- if (!node->deleted && node->io_write) {
- FD_SET(node->pfd.fd, &wrfds);
- max_fd = MAX(max_fd, node->pfd.fd + 1);
+ if (!node->deleted && node->io_notify) {
+ events[count++] = event_notifier_get_handle(node->e);
}
}
@@ -221,10 +167,17 @@ bool aio_poll(AioContext *ctx, bool blocking)
}
/* wait until next event */
- ret = select(max_fd, &rdfds, &wrfds, NULL, blocking ? NULL : &tv0);
+ for (;;) {
+ int timeout = blocking ? INFINITE : 0;
+ int ret = WaitForMultipleObjects(count, events, FALSE, timeout);
+
+ /* stop dispatching as soon as no event is signaled (timeout or error) */
+ if ((DWORD) (ret - WAIT_OBJECT_0) >= count) {
+ break;
+ }
+
+ blocking = false;
- /* if we have any readable fds, dispatch event */
- if (ret > 0) {
/* we have to walk very carefully in case
* qemu_aio_set_fd_handler is called while we're walking */
node = QLIST_FIRST(&ctx->aio_handlers);
@@ -234,15 +187,9 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers++;
if (!node->deleted &&
- FD_ISSET(node->pfd.fd, &rdfds) &&
- node->io_read) {
- node->io_read(node->opaque);
- progress = true;
- }
- if (!node->deleted &&
- FD_ISSET(node->pfd.fd, &wrfds) &&
- node->io_write) {
- node->io_write(node->opaque);
+ event_notifier_get_handle(node->e) == events[ret - WAIT_OBJECT_0] &&
+ node->io_notify) {
+ node->io_notify(node->e);
progress = true;
}
diff --git a/block/Makefile.objs b/block/Makefile.objs
index 554f429..684765b 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -2,13 +2,17 @@ block-obj-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat
block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
block-obj-y += qed-check.o
-block-obj-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
+block-obj-y += parallels.o blkdebug.o blkverify.o
block-obj-$(CONFIG_WIN32) += raw-win32.o
block-obj-$(CONFIG_POSIX) += raw-posix.o
+
+ifeq ($(CONFIG_POSIX),y)
+block-obj-y += nbd.o sheepdog.o
block-obj-$(CONFIG_LIBISCSI) += iscsi.o
block-obj-$(CONFIG_CURL) += curl.o
block-obj-$(CONFIG_RBD) += rbd.o
block-obj-$(CONFIG_GLUSTERFS) += gluster.o
+endif
common-obj-y += stream.o
common-obj-y += commit.o
diff --git a/main-loop.c b/main-loop.c
index 67800fe..b290c79 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -534,6 +534,7 @@ bool qemu_aio_wait(void)
return aio_poll(qemu_aio_context, true);
}
+#ifdef CONFIG_POSIX
void qemu_aio_set_fd_handler(int fd,
IOHandler *io_read,
IOHandler *io_write,
@@ -546,7 +547,6 @@ void qemu_aio_set_fd_handler(int fd,
qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque);
}
-#ifdef CONFIG_POSIX
void qemu_aio_set_event_notifier(EventNotifier *notifier,
EventNotifierHandler *io_read,
AioFlushEventNotifierHandler *io_flush)
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 11/25] aio: make AioContexts GSources
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (9 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 10/25] aio: add Win32 implementation Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 12/25] aio: add aio_notify Paolo Bonzini
` (14 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
This lets AioContexts be used (optionally) with a glib main loop.
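As a sketch of the intended usage (not part of this patch; "running" is a
placeholder flag), a caller can attach an AioContext to the default glib
main context like this:

    AioContext *ctx = aio_context_new();
    GSource *src = aio_get_g_source(ctx);    /* returns a new reference */
    g_source_attach(src, NULL);              /* NULL = default GMainContext */
    g_source_unref(src);                     /* the main context owns it now */

    while (running) {
        /* polls the attached descriptors and dispatches aio handlers */
        g_main_context_iteration(NULL, TRUE);
    }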
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
aio-posix.c | 4 ++++
aio-win32.c | 4 ++++
async.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
qemu-aio.h | 23 ++++++++++++++++++++++
4 files changed, 95 insertions(+), 1 deletion(-)
diff --git a/aio-posix.c b/aio-posix.c
index 4424722..65b2607 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -56,6 +56,8 @@ void aio_set_fd_handler(AioContext *ctx,
/* Are we deleting the fd handler? */
if (!io_read && !io_write) {
if (node) {
+ g_source_remove_poll(&ctx->source, &node->pfd);
+
/* If the lock is held, just mark the node as deleted */
if (ctx->walking_handlers) {
node->deleted = 1;
@@ -75,6 +77,8 @@ void aio_set_fd_handler(AioContext *ctx,
node = g_malloc0(sizeof(AioHandler));
node->pfd.fd = fd;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+
+ g_source_add_poll(&ctx->source, &node->pfd);
}
/* Update handler with latest information */
node->io_read = io_read;
diff --git a/aio-win32.c b/aio-win32.c
index 9881fdb..e460bd8 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -45,6 +45,8 @@ void aio_set_event_notifier(AioContext *ctx,
/* Are we deleting the fd handler? */
if (!io_notify) {
if (node) {
+ g_source_remove_poll(&ctx->source, &node->pfd);
+
/* If the lock is held, just mark the node as deleted */
if (ctx->walking_handlers) {
node->deleted = 1;
@@ -66,6 +68,8 @@ void aio_set_event_notifier(AioContext *ctx,
node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
node->pfd.events = G_IO_IN;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+
+ g_source_add_poll(&ctx->source, &node->pfd);
}
/* Update handler with latest information */
node->io_notify = io_notify;
diff --git a/async.c b/async.c
index 513bdd7..4ffdd98 100644
--- a/async.c
+++ b/async.c
@@ -136,10 +136,73 @@ void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout)
}
}
+static gboolean
+aio_ctx_prepare(GSource *source, gint *timeout)
+{
+ AioContext *ctx = (AioContext *) source;
+ uint32_t wait = -1;
+ aio_bh_update_timeout(ctx, &wait);
+
+ if (wait != -1) {
+ *timeout = MIN(*timeout, wait);
+ return wait == 0;
+ }
+
+ return false;
+}
+
+static gboolean
+aio_ctx_check(GSource *source)
+{
+ AioContext *ctx = (AioContext *) source;
+ QEMUBH *bh;
+
+ for (bh = ctx->first_bh; bh; bh = bh->next) {
+ if (!bh->deleted && bh->scheduled) {
+ return true;
+ }
+ }
+ return aio_pending(ctx);
+}
+
+static gboolean
+aio_ctx_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ AioContext *ctx = (AioContext *) source;
+
+ assert(callback == NULL);
+ aio_poll(ctx, false);
+ return true;
+}
+
+static GSourceFuncs aio_source_funcs = {
+ aio_ctx_prepare,
+ aio_ctx_check,
+ aio_ctx_dispatch,
+ NULL
+};
+
+GSource *aio_get_g_source(AioContext *ctx)
+{
+ g_source_ref(&ctx->source);
+ return &ctx->source;
+}
AioContext *aio_context_new(void)
{
- return g_new0(AioContext, 1);
+ return (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
+}
+
+void aio_context_ref(AioContext *ctx)
+{
+ g_source_ref(&ctx->source);
+}
+
+void aio_context_unref(AioContext *ctx)
+{
+ g_source_unref(&ctx->source);
}
void aio_flush(AioContext *ctx)
diff --git a/qemu-aio.h b/qemu-aio.h
index ac24896..aedf66c 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -44,6 +44,8 @@ typedef void QEMUBHFunc(void *opaque);
typedef void IOHandler(void *opaque);
typedef struct AioContext {
+ GSource source;
+
/* The list of registered AIO handlers */
QLIST_HEAD(, AioHandler) aio_handlers;
@@ -75,6 +77,22 @@ typedef int (AioFlushEventNotifierHandler)(EventNotifier *e);
AioContext *aio_context_new(void);
/**
+ * aio_context_ref:
+ * @ctx: The AioContext to operate on.
+ *
+ * Add a reference to an AioContext.
+ */
+void aio_context_ref(AioContext *ctx);
+
+/**
+ * aio_context_unref:
+ * @ctx: The AioContext to operate on.
+ *
+ * Drop a reference to an AioContext.
+ */
+void aio_context_unref(AioContext *ctx);
+
+/**
* aio_bh_new: Allocate a new bottom half structure.
*
* Bottom halves are lightweight callbacks whose invocation is guaranteed
@@ -188,6 +206,11 @@ void aio_set_event_notifier(AioContext *ctx,
EventNotifierHandler *io_read,
AioFlushEventNotifierHandler *io_flush);
+/* Return a GSource that lets the main loop poll the file descriptors attached
+ * to this AioContext.
+ */
+GSource *aio_get_g_source(AioContext *ctx);
+
/* Functions to operate on the main QEMU AioContext. */
void qemu_aio_flush(void);
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 12/25] aio: add aio_notify
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (10 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 11/25] aio: make AioContexts GSources Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 13/25] aio: call aio_notify after setting I/O handlers Paolo Bonzini
` (13 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
With this change, async.c no longer relies on any service from
main-loop.c; it is completely self-contained.
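As a sketch (not part of the patch; "ctx", "my_cb" and "my_opaque" are
placeholders), waking up a blocked aio_wait now needs nothing outside the
AioContext itself:

    QEMUBH *bh = aio_bh_new(ctx, my_cb, my_opaque);
    qemu_bh_schedule(bh);        /* internally calls aio_notify(bh->ctx) */

    /* or kick the context explicitly: */
    aio_notify(ctx);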
Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
async.c | 30 ++++++++++++++++++++++++++----
qemu-aio.h | 18 ++++++++++++++++++
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/async.c b/async.c
index 4ffdd98..564526f 100644
--- a/async.c
+++ b/async.c
@@ -30,6 +30,7 @@
/* bottom halves (can be seen as timers which expire ASAP) */
struct QEMUBH {
+ AioContext *ctx;
QEMUBHFunc *cb;
void *opaque;
QEMUBH *next;
@@ -42,6 +43,7 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
QEMUBH *bh;
bh = g_malloc0(sizeof(QEMUBH));
+ bh->ctx = ctx;
bh->cb = cb;
bh->opaque = opaque;
bh->next = ctx->first_bh;
@@ -101,8 +103,7 @@ void qemu_bh_schedule(QEMUBH *bh)
return;
bh->scheduled = 1;
bh->idle = 0;
- /* stop the currently executing CPU to execute the BH ASAP */
- qemu_notify_event();
+ aio_notify(bh->ctx);
}
void qemu_bh_cancel(QEMUBH *bh)
@@ -177,11 +178,20 @@ aio_ctx_dispatch(GSource *source,
return true;
}
+static void
+aio_ctx_finalize(GSource *source)
+{
+ AioContext *ctx = (AioContext *) source;
+
+ aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL);
+ event_notifier_cleanup(&ctx->notifier);
+}
+
static GSourceFuncs aio_source_funcs = {
aio_ctx_prepare,
aio_ctx_check,
aio_ctx_dispatch,
- NULL
+ aio_ctx_finalize
};
GSource *aio_get_g_source(AioContext *ctx)
@@ -190,9 +200,21 @@ GSource *aio_get_g_source(AioContext *ctx)
return &ctx->source;
}
+void aio_notify(AioContext *ctx)
+{
+ event_notifier_set(&ctx->notifier);
+}
+
AioContext *aio_context_new(void)
{
- return (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
+ AioContext *ctx;
+ ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
+ event_notifier_init(&ctx->notifier, false);
+ aio_set_event_notifier(ctx, &ctx->notifier,
+ (EventNotifierHandler *)
+ event_notifier_test_and_clear, NULL);
+
+ return ctx;
}
void aio_context_ref(AioContext *ctx)
diff --git a/qemu-aio.h b/qemu-aio.h
index aedf66c..2354617 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -62,6 +62,9 @@ typedef struct AioContext {
* no callbacks are removed while we're walking and dispatching callbacks.
*/
int walking_bh;
+
+ /* Used for aio_notify. */
+ EventNotifier notifier;
} AioContext;
/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
@@ -102,6 +105,21 @@ void aio_context_unref(AioContext *ctx);
QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque);
/**
+ * aio_notify: Force processing of pending events.
+ *
+ * Similar to signaling a condition variable, aio_notify forces
+ * aio_wait to exit, so that the next call will re-examine pending events.
+ * The caller of aio_notify will usually call aio_wait again very soon,
+ * or go through another iteration of the GLib main loop. Hence, aio_notify
+ * also has the side effect of recalculating the sets of file descriptors
+ * that the main loop waits for.
+ *
+ * Calling aio_notify is rarely necessary, because for example scheduling
+ * a bottom half calls it already.
+ */
+void aio_notify(AioContext *ctx);
+
+/**
* aio_bh_poll: Poll bottom halves for an AioContext.
*
* These are internal functions used by the QEMU main loop.
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 13/25] aio: call aio_notify after setting I/O handlers
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (11 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 12/25] aio: add aio_notify Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 14/25] main-loop: use GSource to poll AIO file descriptors Paolo Bonzini
` (12 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
In the current code, this wakeup is performed by qemu_set_fd_handler2,
which qemu_aio_set_fd_handler calls. We need to keep the same behavior
even after removing the call to qemu_set_fd_handler2.
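As a sketch of the behavior being preserved ("fd", "read_cb", "flush_cb"
and "opaque" are placeholders):

    /* the main loop may be blocked in poll on the old descriptor set */
    aio_set_fd_handler(ctx, fd, read_cb, NULL, flush_cb, opaque);
    /* aio_set_fd_handler now ends with aio_notify(ctx), so the loop
     * wakes up and recomputes the descriptor set before sleeping again */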
Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
aio-posix.c | 2 ++
aio-win32.c | 2 ++
2 files changed, 4 insertions(+)
diff --git a/aio-posix.c b/aio-posix.c
index 65b2607..05cc84e 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -89,6 +89,8 @@ void aio_set_fd_handler(AioContext *ctx,
node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP : 0);
node->pfd.events |= (io_write ? G_IO_OUT : 0);
}
+
+ aio_notify(ctx);
}
void aio_set_event_notifier(AioContext *ctx,
diff --git a/aio-win32.c b/aio-win32.c
index e460bd8..a84eb71 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -75,6 +75,8 @@ void aio_set_event_notifier(AioContext *ctx,
node->io_notify = io_notify;
node->io_flush = io_flush;
}
+
+ aio_notify(ctx);
}
bool aio_pending(AioContext *ctx)
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 14/25] main-loop: use GSource to poll AIO file descriptors
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (12 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 13/25] aio: call aio_notify after setting I/O handlers Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 15/25] main-loop: use aio_notify for qemu_notify_event Paolo Bonzini
` (11 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
This lets us remove the hooks for the main loop in async.c.
Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
main-loop.c | 23 ++++++-----------------
main-loop.h | 1 -
2 files changed, 6 insertions(+), 18 deletions(-)
diff --git a/main-loop.c b/main-loop.c
index b290c79..209f699 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -205,6 +205,7 @@ static AioContext *qemu_aio_context;
int main_loop_init(void)
{
int ret;
+ GSource *src;
qemu_mutex_lock_iothread();
ret = qemu_signal_init();
@@ -219,6 +220,9 @@ int main_loop_init(void)
}
qemu_aio_context = aio_context_new();
+ src = aio_get_g_source(qemu_aio_context);
+ g_source_attach(src, NULL);
+ g_source_unref(src);
return 0;
}
@@ -481,8 +485,6 @@ int main_loop_wait(int nonblocking)
if (nonblocking) {
timeout = 0;
- } else {
- aio_bh_update_timeout(qemu_aio_context, &timeout);
}
/* poll any events */
@@ -505,10 +507,6 @@ int main_loop_wait(int nonblocking)
qemu_run_all_timers();
- /* Check bottom-halves last in case any of the earlier events triggered
- them. */
- qemu_bh_poll();
-
return ret;
}
@@ -519,11 +517,6 @@ QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque)
return aio_bh_new(qemu_aio_context, cb, opaque);
}
-int qemu_bh_poll(void)
-{
- return aio_bh_poll(qemu_aio_context);
-}
-
void qemu_aio_flush(void)
{
aio_flush(qemu_aio_context);
@@ -543,16 +536,12 @@ void qemu_aio_set_fd_handler(int fd,
{
aio_set_fd_handler(qemu_aio_context, fd, io_read, io_write, io_flush,
opaque);
-
- qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque);
}
+#endif
void qemu_aio_set_event_notifier(EventNotifier *notifier,
EventNotifierHandler *io_read,
AioFlushEventNotifierHandler *io_flush)
{
- qemu_aio_set_fd_handler(event_notifier_get_fd(notifier),
- (IOHandler *)io_read, NULL,
- (AioFlushHandler *)io_flush, notifier);
+ aio_set_event_notifier(qemu_aio_context, notifier, io_read, io_flush);
}
-#endif
diff --git a/main-loop.h b/main-loop.h
index a337096..c58f38b 100644
--- a/main-loop.h
+++ b/main-loop.h
@@ -312,6 +312,5 @@ void qemu_iohandler_poll(fd_set *readfds, fd_set *writefds, fd_set *xfds, int rc
QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque);
void qemu_bh_schedule_idle(QEMUBH *bh);
-int qemu_bh_poll(void);
#endif
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 15/25] main-loop: use aio_notify for qemu_notify_event
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (13 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 14/25] main-loop: use GSource to poll AIO file descriptors Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 16/25] aio: clean up now-unused functions Paolo Bonzini
` (10 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
main-loop.c | 110 ++++++------------------------------------------------------
1 file changed, 10 insertions(+), 100 deletions(-)
diff --git a/main-loop.c b/main-loop.c
index 209f699..978050a 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -32,70 +32,6 @@
#include "compatfd.h"
-static int io_thread_fd = -1;
-
-void qemu_notify_event(void)
-{
- /* Write 8 bytes to be compatible with eventfd. */
- static const uint64_t val = 1;
- ssize_t ret;
-
- if (io_thread_fd == -1) {
- return;
- }
- do {
- ret = write(io_thread_fd, &val, sizeof(val));
- } while (ret < 0 && errno == EINTR);
-
- /* EAGAIN is fine, a read must be pending. */
- if (ret < 0 && errno != EAGAIN) {
- fprintf(stderr, "qemu_notify_event: write() failed: %s\n",
- strerror(errno));
- exit(1);
- }
-}
-
-static void qemu_event_read(void *opaque)
-{
- int fd = (intptr_t)opaque;
- ssize_t len;
- char buffer[512];
-
- /* Drain the notify pipe. For eventfd, only 8 bytes will be read. */
- do {
- len = read(fd, buffer, sizeof(buffer));
- } while ((len == -1 && errno == EINTR) || len == sizeof(buffer));
-}
-
-static int qemu_event_init(void)
-{
- int err;
- int fds[2];
-
- err = qemu_eventfd(fds);
- if (err == -1) {
- return -errno;
- }
- err = fcntl_setfl(fds[0], O_NONBLOCK);
- if (err < 0) {
- goto fail;
- }
- err = fcntl_setfl(fds[1], O_NONBLOCK);
- if (err < 0) {
- goto fail;
- }
- qemu_set_fd_handler2(fds[0], NULL, qemu_event_read, NULL,
- (void *)(intptr_t)fds[0]);
-
- io_thread_fd = fds[1];
- return 0;
-
-fail:
- close(fds[0]);
- close(fds[1]);
- return err;
-}
-
/* If we have signalfd, we mask out the signals we want to handle and then
* use signalfd to listen for them. We rely on whatever the current signal
* handler is to dispatch the signals when we receive them.
@@ -165,35 +101,6 @@ static int qemu_signal_init(void)
#else /* _WIN32 */
-static HANDLE qemu_event_handle = NULL;
-
-static void dummy_event_handler(void *opaque)
-{
-}
-
-static int qemu_event_init(void)
-{
- qemu_event_handle = CreateEvent(NULL, FALSE, FALSE, NULL);
- if (!qemu_event_handle) {
- fprintf(stderr, "Failed CreateEvent: %ld\n", GetLastError());
- return -1;
- }
- qemu_add_wait_object(qemu_event_handle, dummy_event_handler, NULL);
- return 0;
-}
-
-void qemu_notify_event(void)
-{
- if (!qemu_event_handle) {
- return;
- }
- if (!SetEvent(qemu_event_handle)) {
- fprintf(stderr, "qemu_notify_event: SetEvent failed: %ld\n",
- GetLastError());
- exit(1);
- }
-}
-
static int qemu_signal_init(void)
{
return 0;
@@ -202,6 +109,14 @@ static int qemu_signal_init(void)
static AioContext *qemu_aio_context;
+void qemu_notify_event(void)
+{
+ if (!qemu_aio_context) {
+ return;
+ }
+ aio_notify(qemu_aio_context);
+}
+
int main_loop_init(void)
{
int ret;
@@ -213,12 +128,6 @@ int main_loop_init(void)
return ret;
}
- /* Note eventfd must be drained before signalfd handlers run */
- ret = qemu_event_init();
- if (ret) {
- return ret;
- }
-
qemu_aio_context = aio_context_new();
src = aio_get_g_source(qemu_aio_context);
g_source_attach(src, NULL);
@@ -408,7 +317,8 @@ void qemu_del_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque)
void qemu_fd_register(int fd)
{
- WSAEventSelect(fd, qemu_event_handle, FD_READ | FD_ACCEPT | FD_CLOSE |
+ WSAEventSelect(fd, event_notifier_get_handle(&qemu_aio_context->notifier),
+ FD_READ | FD_ACCEPT | FD_CLOSE |
FD_CONNECT | FD_WRITE | FD_OOB);
}
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 16/25] aio: clean up now-unused functions
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (14 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 15/25] main-loop: use aio_notify for qemu_notify_event Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 17/25] linux-aio: use event notifiers Paolo Bonzini
` (9 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
Some cleanups can now be made, since the main loop no longer needs
hooks into the bottom half code.
Reviewed-by: Anthony Liguori <aliguori@us.ibm.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
async.c | 23 +++++++----------------
oslib-posix.c | 31 -------------------------------
qemu-aio.h | 1 -
qemu-common.h | 1 -
4 files changed, 7 insertions(+), 49 deletions(-)
diff --git a/async.c b/async.c
index 564526f..04f9dcb 100644
--- a/async.c
+++ b/async.c
@@ -117,16 +117,20 @@ void qemu_bh_delete(QEMUBH *bh)
bh->deleted = 1;
}
-void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout)
+static gboolean
+aio_ctx_prepare(GSource *source, gint *timeout)
{
+ AioContext *ctx = (AioContext *) source;
QEMUBH *bh;
+ bool scheduled = false;
for (bh = ctx->first_bh; bh; bh = bh->next) {
if (!bh->deleted && bh->scheduled) {
+ scheduled = true;
if (bh->idle) {
/* idle bottom halves will be polled at least
* every 10ms */
- *timeout = MIN(10, *timeout);
+ *timeout = 10;
} else {
/* non-idle bottom halves will be executed
* immediately */
@@ -135,21 +139,8 @@ void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout)
}
}
}
-}
-static gboolean
-aio_ctx_prepare(GSource *source, gint *timeout)
-{
- AioContext *ctx = (AioContext *) source;
- uint32_t wait = -1;
- aio_bh_update_timeout(ctx, &wait);
-
- if (wait != -1) {
- *timeout = MIN(*timeout, wait);
- return wait == 0;
- }
-
- return false;
+ return scheduled;
}
static gboolean
diff --git a/oslib-posix.c b/oslib-posix.c
index dbeb627..9db9c3d 100644
--- a/oslib-posix.c
+++ b/oslib-posix.c
@@ -61,9 +61,6 @@ static int running_on_valgrind = -1;
#ifdef CONFIG_LINUX
#include <sys/syscall.h>
#endif
-#ifdef CONFIG_EVENTFD
-#include <sys/eventfd.h>
-#endif
int qemu_get_thread_id(void)
{
@@ -183,34 +180,6 @@ int qemu_pipe(int pipefd[2])
return ret;
}
-/*
- * Creates an eventfd that looks like a pipe and has EFD_CLOEXEC set.
- */
-int qemu_eventfd(int fds[2])
-{
-#ifdef CONFIG_EVENTFD
- int ret;
-
- ret = eventfd(0, 0);
- if (ret >= 0) {
- fds[0] = ret;
- fds[1] = dup(ret);
- if (fds[1] == -1) {
- close(ret);
- return -1;
- }
- qemu_set_cloexec(ret);
- qemu_set_cloexec(fds[1]);
- return 0;
- }
- if (errno != ENOSYS) {
- return -1;
- }
-#endif
-
- return qemu_pipe(fds);
-}
-
int qemu_utimens(const char *path, const struct timespec *times)
{
struct timeval tv[2], tv_now;
diff --git a/qemu-aio.h b/qemu-aio.h
index 2354617..1b7eb6e 100644
--- a/qemu-aio.h
+++ b/qemu-aio.h
@@ -125,7 +125,6 @@ void aio_notify(AioContext *ctx);
* These are internal functions used by the QEMU main loop.
*/
int aio_bh_poll(AioContext *ctx);
-void aio_bh_update_timeout(AioContext *ctx, uint32_t *timeout);
/**
* qemu_bh_schedule: Schedule a bottom half.
diff --git a/qemu-common.h b/qemu-common.h
index 9e92eb0..9b3ee95 100644
--- a/qemu-common.h
+++ b/qemu-common.h
@@ -219,7 +219,6 @@ ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags)
QEMU_WARN_UNUSED_RESULT;
#ifndef _WIN32
-int qemu_eventfd(int pipefd[2]);
int qemu_pipe(int pipefd[2]);
#endif
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 17/25] linux-aio: use event notifiers
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (15 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 16/25] aio: clean up now-unused functions Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 18/25] qemu-thread: add QemuSemaphore Paolo Bonzini
` (8 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
Since linux-aio already uses an eventfd, converting it to the
EventNotifier-based API simplifies the code, even though this Linux-only
code does not need the portability that EventNotifier provides.
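In outline, the pattern adopted below is (a simplified sketch, not the
patch itself):

    /* one EventNotifier per qemu_laio_state wraps the eventfd */
    event_notifier_init(&s->e, false);
    qemu_aio_set_event_notifier(&s->e, qemu_laio_completion_cb,
                                qemu_laio_flush_cb);

    /* each request asks libaio to signal that eventfd on completion */
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    /* the completion handler drains the notifier with
     * event_notifier_test_and_clear() and reaps events with
     * io_getevents(), instead of read()ing a raw file descriptor */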
Reviewed-by: Anthony Liguori <anthony@codemonkey.ws>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
linux-aio.c | 49 +++++++++++++++++++------------------------------
1 file changed, 19 insertions(+), 30 deletions(-)
diff --git a/linux-aio.c b/linux-aio.c
index ce9b5d4..769b558 100644
--- a/linux-aio.c
+++ b/linux-aio.c
@@ -10,8 +10,8 @@
#include "qemu-common.h"
#include "qemu-aio.h"
#include "block/raw-posix-aio.h"
+#include "event_notifier.h"
-#include <sys/eventfd.h>
#include <libaio.h>
/*
@@ -37,7 +37,7 @@ struct qemu_laiocb {
struct qemu_laio_state {
io_context_t ctx;
- int efd;
+ EventNotifier e;
int count;
};
@@ -76,29 +76,17 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
qemu_aio_release(laiocb);
}
-static void qemu_laio_completion_cb(void *opaque)
+static void qemu_laio_completion_cb(EventNotifier *e)
{
- struct qemu_laio_state *s = opaque;
+ struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
- while (1) {
+ while (event_notifier_test_and_clear(&s->e)) {
struct io_event events[MAX_EVENTS];
- uint64_t val;
- ssize_t ret;
struct timespec ts = { 0 };
int nevents, i;
do {
- ret = read(s->efd, &val, sizeof(val));
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1 && errno == EAGAIN)
- break;
-
- if (ret != 8)
- break;
-
- do {
- nevents = io_getevents(s->ctx, val, MAX_EVENTS, events, &ts);
+ nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
} while (nevents == -EINTR);
for (i = 0; i < nevents; i++) {
@@ -112,9 +100,9 @@ static void qemu_laio_completion_cb(void *opaque)
}
}
-static int qemu_laio_flush_cb(void *opaque)
+static int qemu_laio_flush_cb(EventNotifier *e)
{
- struct qemu_laio_state *s = opaque;
+ struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
return (s->count > 0) ? 1 : 0;
}
@@ -146,8 +134,9 @@ static void laio_cancel(BlockDriverAIOCB *blockacb)
* We might be able to do this slightly more optimal by removing the
* O_NONBLOCK flag.
*/
- while (laiocb->ret == -EINPROGRESS)
- qemu_laio_completion_cb(laiocb->ctx);
+ while (laiocb->ret == -EINPROGRESS) {
+ qemu_laio_completion_cb(&laiocb->ctx->e);
+ }
}
static AIOPool laio_pool = {
@@ -186,7 +175,7 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
__func__, type);
goto out_free_aiocb;
}
- io_set_eventfd(&laiocb->iocb, s->efd);
+ io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
s->count++;
if (io_submit(s->ctx, 1, &iocbs) < 0)
@@ -205,21 +194,21 @@ void *laio_init(void)
struct qemu_laio_state *s;
s = g_malloc0(sizeof(*s));
- s->efd = eventfd(0, 0);
- if (s->efd == -1)
+ if (event_notifier_init(&s->e, false) < 0) {
goto out_free_state;
- fcntl(s->efd, F_SETFL, O_NONBLOCK);
+ }
- if (io_setup(MAX_EVENTS, &s->ctx) != 0)
+ if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
goto out_close_efd;
+ }
- qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL,
- qemu_laio_flush_cb, s);
+ qemu_aio_set_event_notifier(&s->e, qemu_laio_completion_cb,
+ qemu_laio_flush_cb);
return s;
out_close_efd:
- close(s->efd);
+ event_notifier_cleanup(&s->e);
out_free_state:
g_free(s);
return NULL;
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 18/25] qemu-thread: add QemuSemaphore
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (16 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 17/25] linux-aio: use event notifiers Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-30 18:48 ` Stefan Hajnoczi
2012-10-26 14:05 ` [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility Paolo Bonzini
` (7 subsequent siblings)
25 siblings, 1 reply; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
The new thread pool will use semaphores instead of condition
variables, because QemuCond does not have qemu_cond_timedwait.
(I also like it more this way).
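A sketch of the producer/consumer pattern the pool will rely on (not part
of this patch):

    QemuSemaphore sem;
    qemu_sem_init(&sem, 0);

    /* producer: announce one unit of work */
    qemu_sem_post(&sem);

    /* consumer: wait up to 10 seconds for work, then give up */
    if (qemu_sem_timedwait(&sem, 10000) < 0) {
        /* timed out, no work arrived */
    }

    qemu_sem_destroy(&sem);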
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
qemu-thread-posix.c | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++
qemu-thread-posix.h | 5 ++++
qemu-thread-win32.c | 35 +++++++++++++++++++++++++
qemu-thread-win32.h | 4 +++
qemu-thread.h | 7 +++++
5 files changed, 125 insertions(+)
diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c
index 8fbabda..dea57d4 100644
--- a/qemu-thread-posix.c
+++ b/qemu-thread-posix.c
@@ -17,6 +17,9 @@
#include <signal.h>
#include <stdint.h>
#include <string.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/time.h>
#include "qemu-thread.h"
static void error_exit(int err, const char *msg)
@@ -115,6 +118,77 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
error_exit(err, __func__);
}
+void qemu_sem_init(QemuSemaphore *sem, int init)
+{
+ int rc;
+
+ rc = sem_init(&sem->sem, 0, init);
+ if (rc < 0) {
+ error_exit(errno, __func__);
+ }
+}
+
+void qemu_sem_destroy(QemuSemaphore *sem)
+{
+ int rc;
+
+ rc = sem_destroy(&sem->sem);
+ if (rc < 0) {
+ error_exit(errno, __func__);
+ }
+}
+
+void qemu_sem_post(QemuSemaphore *sem)
+{
+ int rc;
+
+ rc = sem_post(&sem->sem);
+ if (rc < 0) {
+ error_exit(errno, __func__);
+ }
+}
+
+int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
+{
+ int rc;
+
+ if (ms <= 0) {
+ /* This is cheaper than sem_timedwait. */
+ rc = sem_trywait(&sem->sem);
+ if (rc == -1 && errno == EAGAIN) {
+ return -1;
+ }
+ } else {
+ struct timeval tv;
+ struct timespec ts;
+ gettimeofday(&tv, NULL);
+ ts.tv_nsec = tv.tv_usec * 1000 + (ms % 1000) * 1000000;
+ ts.tv_sec = tv.tv_sec + ms / 1000;
+ if (ts.tv_nsec >= 1000000000) {
+ ts.tv_sec++;
+ ts.tv_nsec -= 1000000000;
+ }
+ rc = sem_timedwait(&sem->sem, &ts);
+ if (rc == -1 && errno == ETIMEDOUT) {
+ return -1;
+ }
+ }
+ if (rc < 0) {
+ error_exit(errno, __func__);
+ }
+ return 0;
+}
+
+void qemu_sem_wait(QemuSemaphore *sem)
+{
+ int rc;
+
+ rc = sem_wait(&sem->sem);
+ if (rc < 0) {
+ error_exit(errno, __func__);
+ }
+}
+
void qemu_thread_create(QemuThread *thread,
void *(*start_routine)(void*),
void *arg, int mode)
diff --git a/qemu-thread-posix.h b/qemu-thread-posix.h
index ee4618e..2542c15 100644
--- a/qemu-thread-posix.h
+++ b/qemu-thread-posix.h
@@ -1,6 +1,7 @@
#ifndef __QEMU_THREAD_POSIX_H
#define __QEMU_THREAD_POSIX_H 1
#include "pthread.h"
+#include <semaphore.h>
struct QemuMutex {
pthread_mutex_t lock;
@@ -10,6 +11,10 @@ struct QemuCond {
pthread_cond_t cond;
};
+struct QemuSemaphore {
+ sem_t sem;
+};
+
struct QemuThread {
pthread_t thread;
};
diff --git a/qemu-thread-win32.c b/qemu-thread-win32.c
index 177b398..4b3db60 100644
--- a/qemu-thread-win32.c
+++ b/qemu-thread-win32.c
@@ -192,6 +192,41 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
qemu_mutex_lock(mutex);
}
+void qemu_sem_init(QemuSemaphore *sem, int init)
+{
+ /* Counting semaphore: initial count "init", effectively no maximum. */
+ sem->sema = CreateSemaphore(NULL, init, LONG_MAX, NULL);
+}
+
+void qemu_sem_destroy(QemuSemaphore *sem)
+{
+ CloseHandle(sem->sema);
+}
+
+void qemu_sem_post(QemuSemaphore *sem)
+{
+ ReleaseSemaphore(sem->sema, 1, NULL);
+}
+
+int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
+{
+ int rc = WaitForSingleObject(sem->sema, ms);
+ if (rc == WAIT_OBJECT_0) {
+ return 0;
+ }
+ if (rc != WAIT_TIMEOUT) {
+ error_exit(GetLastError(), __func__);
+ }
+ return -1;
+}
+
+void qemu_sem_wait(QemuSemaphore *sem)
+{
+ if (WaitForSingleObject(sem->sema, INFINITE) != WAIT_OBJECT_0) {
+ error_exit(GetLastError(), __func__);
+ }
+}
+
struct QemuThreadData {
/* Passed to win32_start_routine. */
void *(*start_routine)(void *);
diff --git a/qemu-thread-win32.h b/qemu-thread-win32.h
index b9d1be8..13adb95 100644
--- a/qemu-thread-win32.h
+++ b/qemu-thread-win32.h
@@ -13,6 +13,10 @@ struct QemuCond {
HANDLE continue_event;
};
+struct QemuSemaphore {
+ HANDLE sema;
+};
+
typedef struct QemuThreadData QemuThreadData;
struct QemuThread {
QemuThreadData *data;
diff --git a/qemu-thread.h b/qemu-thread.h
index 05fdaaf..3ee2f6b 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -6,6 +6,7 @@
typedef struct QemuMutex QemuMutex;
typedef struct QemuCond QemuCond;
+typedef struct QemuSemaphore QemuSemaphore;
typedef struct QemuThread QemuThread;
#ifdef _WIN32
@@ -38,6 +39,12 @@ void qemu_cond_signal(QemuCond *cond);
void qemu_cond_broadcast(QemuCond *cond);
void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex);
+void qemu_sem_init(QemuSemaphore *sem, int init);
+void qemu_sem_post(QemuSemaphore *sem);
+void qemu_sem_wait(QemuSemaphore *sem);
+int qemu_sem_timedwait(QemuSemaphore *sem, int ms);
+void qemu_sem_destroy(QemuSemaphore *sem);
+
void qemu_thread_create(QemuThread *thread,
void *(*start_routine)(void *),
void *arg, int mode);
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (17 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 18/25] qemu-thread: add QemuSemaphore Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-30 19:13 ` Stefan Hajnoczi
2012-10-26 14:05 ` [Qemu-devel] [PATCH 20/25] block: switch posix-aio-compat to threadpool Paolo Bonzini
` (6 subsequent siblings)
25 siblings, 1 reply; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
Add a generic thread-pool. The code is roughly based on posix-aio-compat.c,
with some changes, especially the following:
- use QemuSemaphore instead of QemuCond;
- separate the state of the thread from the return code of the worker
function. The return code is completely opaque to the thread pool;
- do not busy wait when doing cancellation.
A more generic thread pool (still specific to I/O, so that in the future
it can use special scheduling classes or PI mutexes) has many uses:
it allows more flexibility in raw-posix.c, can more easily be extended
to Win32, and will also be used to do an msync of the persistent bitmap.
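A sketch of the intended usage ("my_worker", "arg", "cb" and "opaque" are
placeholders):

    static int my_worker(void *arg)
    {
        /* runs in a pool thread; blocking I/O is fine here.
         * Return 0 on success or a negative errno value. */
        return 0;
    }

    /* callback style: cb(opaque, ret) runs later in the main loop */
    thread_pool_submit_aio(my_worker, arg, cb, opaque);

    /* coroutine style: yields here, resumes when the worker finishes */
    int ret = thread_pool_submit_co(my_worker, arg);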
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
Makefile.objs | 2 +-
thread-pool.c | 279 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
thread-pool.h | 34 +++++++
trace-events | 5 ++
4 files changed, 319 insertions(+), 1 deletion(-)
create mode 100644 thread-pool.c
create mode 100644 thread-pool.h
diff --git a/Makefile.objs b/Makefile.objs
index da744e1..4a6d47e 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -43,7 +43,7 @@ coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o
block-obj-y = cutils.o iov.o cache-utils.o qemu-option.o module.o async.o
block-obj-y += nbd.o block.o blockjob.o aes.o qemu-config.o
-block-obj-y += qemu-progress.o qemu-sockets.o uri.o
+block-obj-y += thread-pool.o qemu-progress.o qemu-sockets.o uri.o
block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_POSIX) += event_notifier-posix.o aio-posix.o
diff --git a/thread-pool.c b/thread-pool.c
new file mode 100644
index 0000000..266f12f
--- /dev/null
+++ b/thread-pool.c
@@ -0,0 +1,279 @@
+/*
+ * QEMU block layer thread pool
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright Red Hat, Inc. 2012
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
+ */
+#include "qemu-common.h"
+#include "qemu-queue.h"
+#include "qemu-thread.h"
+#include "osdep.h"
+#include "qemu-coroutine.h"
+#include "trace.h"
+#include "block_int.h"
+#include "event_notifier.h"
+#include "thread-pool.h"
+
+static void do_spawn_thread(void);
+
+typedef struct ThreadPoolElement ThreadPoolElement;
+
+enum ThreadState {
+ THREAD_QUEUED,
+ THREAD_ACTIVE,
+ THREAD_DONE,
+ THREAD_CANCELED,
+};
+
+struct ThreadPoolElement {
+ BlockDriverAIOCB common;
+ ThreadPoolFunc *func;
+ void *arg;
+ enum ThreadState state;
+ int ret;
+
+ QTAILQ_ENTRY(ThreadPoolElement) reqs;
+ QLIST_ENTRY(ThreadPoolElement) all;
+};
+
+static EventNotifier notifier;
+static QemuMutex lock;
+static QemuCond check_cancel;
+static QemuSemaphore sem;
+static int max_threads = 64;
+static int cur_threads;
+static int idle_threads;
+static int new_threads; /* backlog of threads we need to create */
+static int pending_threads; /* threads created but not running yet */
+static int pending_cancellations; /* whether we need a cond_broadcast */
+static QEMUBH *new_thread_bh;
+static QLIST_HEAD(, ThreadPoolElement) head;
+static QTAILQ_HEAD(, ThreadPoolElement) request_list;
+
+static void *worker_thread(void *unused)
+{
+ qemu_mutex_lock(&lock);
+ pending_threads--;
+ qemu_mutex_unlock(&lock);
+ do_spawn_thread();
+
+ while (1) {
+ ThreadPoolElement *req;
+ int ret;
+
+ qemu_mutex_lock(&lock);
+ idle_threads++;
+ qemu_mutex_unlock(&lock);
+ ret = qemu_sem_timedwait(&sem, 10000);
+ qemu_mutex_lock(&lock);
+ idle_threads--;
+ if (ret == -1) {
+ if (QTAILQ_EMPTY(&request_list)) {
+ break;
+ }
+ qemu_mutex_unlock(&lock);
+ continue;
+ }
+
+ req = QTAILQ_FIRST(&request_list);
+ QTAILQ_REMOVE(&request_list, req, reqs);
+ req->state = THREAD_ACTIVE;
+ qemu_mutex_unlock(&lock);
+
+ ret = req->func(req->arg);
+
+ qemu_mutex_lock(&lock);
+ req->state = THREAD_DONE;
+ req->ret = ret;
+ if (pending_cancellations) {
+ qemu_cond_broadcast(&check_cancel);
+ }
+ qemu_mutex_unlock(&lock);
+
+ event_notifier_set(&notifier);
+ }
+
+ cur_threads--;
+ qemu_mutex_unlock(&lock);
+
+ return NULL;
+}
+
+static void do_spawn_thread(void)
+{
+ QemuThread t;
+
+ qemu_mutex_lock(&lock);
+ if (!new_threads) {
+ qemu_mutex_unlock(&lock);
+ return;
+ }
+
+ new_threads--;
+ pending_threads++;
+
+ qemu_mutex_unlock(&lock);
+
+ qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
+}
+
+static void spawn_thread_bh_fn(void *opaque)
+{
+ do_spawn_thread();
+}
+
+static void spawn_thread(void)
+{
+ cur_threads++;
+ new_threads++;
+ /* If there are threads being created, they will spawn new workers, so
+ * we don't spend time creating many threads in a loop holding a mutex or
+ * starving the current vcpu.
+ *
+ * If there are no idle threads, ask the main thread to create one, so we
+ * inherit the correct affinity instead of the vcpu affinity.
+ */
+ if (!pending_threads) {
+ qemu_bh_schedule(new_thread_bh);
+ }
+}
+
+static void event_notifier_ready(EventNotifier *notifier)
+{
+ ThreadPoolElement *elem, *next;
+
+ event_notifier_test_and_clear(notifier);
+restart:
+ QLIST_FOREACH_SAFE(elem, &head, all, next) {
+ if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
+ continue;
+ }
+ if (elem->state == THREAD_DONE) {
+ trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
+ }
+ if (elem->state == THREAD_DONE && elem->common.cb) {
+ QLIST_REMOVE(elem, all);
+ elem->common.cb(elem->common.opaque, elem->ret);
+ qemu_aio_release(elem);
+ goto restart;
+ } else {
+ /* remove the request */
+ QLIST_REMOVE(elem, all);
+ qemu_aio_release(elem);
+ }
+ }
+}
+
+static int thread_pool_active(EventNotifier *notifier)
+{
+ return !QLIST_EMPTY(&head);
+}
+
+static void thread_pool_cancel(BlockDriverAIOCB *acb)
+{
+ ThreadPoolElement *elem = (ThreadPoolElement *)acb;
+
+ trace_thread_pool_cancel(elem, elem->common.opaque);
+
+ qemu_mutex_lock(&lock);
+ if (elem->state == THREAD_QUEUED &&
+ /* No thread has yet started working on elem. We can try to "steal"
+ * the item from the worker if we can get a signal from the
+ * semaphore. Because this is non-blocking, we can do it with
+ * the lock taken and ensure that elem will remain THREAD_QUEUED.
+ */
+ qemu_sem_timedwait(&sem, 0) == 0) {
+ QTAILQ_REMOVE(&request_list, elem, reqs);
+ elem->state = THREAD_CANCELED;
+ event_notifier_set(&notifier);
+ } else {
+ pending_cancellations++;
+ while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
+ qemu_cond_wait(&check_cancel, &lock);
+ }
+ pending_cancellations--;
+ }
+ qemu_mutex_unlock(&lock);
+}
+
+static AIOPool thread_pool_cb_pool = {
+ .aiocb_size = sizeof(ThreadPoolElement),
+ .cancel = thread_pool_cancel,
+};
+
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+ BlockDriverCompletionFunc *cb, void *opaque)
+{
+ ThreadPoolElement *req;
+
+ req = qemu_aio_get(&thread_pool_cb_pool, NULL, cb, opaque);
+ req->func = func;
+ req->arg = arg;
+ req->state = THREAD_QUEUED;
+
+ QLIST_INSERT_HEAD(&head, req, all);
+
+ trace_thread_pool_submit(req, arg);
+
+ qemu_mutex_lock(&lock);
+ if (idle_threads == 0 && cur_threads < max_threads) {
+ spawn_thread();
+ }
+ QTAILQ_INSERT_TAIL(&request_list, req, reqs);
+ qemu_mutex_unlock(&lock);
+ qemu_sem_post(&sem);
+ return &req->common;
+}
+
+typedef struct ThreadPoolCo {
+ Coroutine *co;
+ int ret;
+} ThreadPoolCo;
+
+static void thread_pool_co_cb(void *opaque, int ret)
+{
+ ThreadPoolCo *co = opaque;
+
+ co->ret = ret;
+ qemu_coroutine_enter(co->co, NULL);
+}
+
+int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
+{
+ ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
+ assert(qemu_in_coroutine());
+ thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
+ qemu_coroutine_yield();
+ return tpc.ret;
+}
+
+void thread_pool_submit(ThreadPoolFunc *func, void *arg)
+{
+ thread_pool_submit_aio(func, arg, NULL, NULL);
+}
+
+static void thread_pool_init(void)
+{
+ QLIST_INIT(&head);
+ event_notifier_init(&notifier, false);
+ qemu_mutex_init(&lock);
+ qemu_cond_init(&check_cancel);
+ qemu_sem_init(&sem, 0);
+ qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
+ thread_pool_active);
+
+ QTAILQ_INIT(&request_list);
+ new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
+}
+
+block_init(thread_pool_init)
diff --git a/thread-pool.h b/thread-pool.h
new file mode 100644
index 0000000..378a4ac
--- /dev/null
+++ b/thread-pool.h
@@ -0,0 +1,34 @@
+/*
+ * QEMU block layer thread pool
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright Red Hat, Inc. 2012
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
+ */
+
+#ifndef QEMU_THREAD_POOL_H
+#define QEMU_THREAD_POOL_H 1
+
+#include "qemu-common.h"
+#include "qemu-queue.h"
+#include "qemu-thread.h"
+#include "qemu-coroutine.h"
+#include "block_int.h"
+
+typedef int ThreadPoolFunc(void *opaque);
+
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+ BlockDriverCompletionFunc *cb, void *opaque);
+int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
+void thread_pool_submit(ThreadPoolFunc *func, void *arg);
+
+#endif
diff --git a/trace-events b/trace-events
index e2d4580..58c18eb 100644
--- a/trace-events
+++ b/trace-events
@@ -90,6 +90,11 @@ virtio_blk_rw_complete(void *req, int ret) "req %p ret %d"
virtio_blk_handle_write(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
virtio_blk_handle_read(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
+# thread-pool.c
+thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
+thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
+thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
+
# posix-aio-compat.c
paio_submit(void *acb, void *opaque, int64_t sector_num, int nb_sectors, int type) "acb %p opaque %p sector_num %"PRId64" nb_sectors %d type %d"
paio_complete(void *acb, void *opaque, int ret) "acb %p opaque %p ret %d"
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 20/25] block: switch posix-aio-compat to threadpool
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (18 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 21/25] raw: merge posix-aio-compat.c into block/raw-posix.c Paolo Bonzini
` (5 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
This is not meant for portability, but to remove code duplication.
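The shape of the conversion, as a simplified sketch (the real aio_worker
below also handles writes, flushes and ioctls):

    static int aio_worker(void *arg)
    {
        struct qemu_paiocb *aiocb = arg;
        ssize_t ret = handle_aiocb_rw(aiocb);  /* plain blocking I/O */
        qemu_aio_release(aiocb);
        return ret < 0 ? ret : 0;              /* 0 or -errno */
    }

    /* submission becomes a one-liner: */
    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);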
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
block/raw-posix-aio.h | 1 -
block/raw-posix.c | 5 -
posix-aio-compat.c | 423 +++++---------------------------------------------
3 files changed, 37 insertions(+), 392 deletions(-)
diff --git a/block/raw-posix-aio.h b/block/raw-posix-aio.h
index ba118f6..6725135 100644
--- a/block/raw-posix-aio.h
+++ b/block/raw-posix-aio.h
@@ -28,7 +28,6 @@
/* posix-aio-compat.c - thread pool based implementation */
-int paio_init(void);
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
BlockDriverCompletionFunc *cb, void *opaque, int type);
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 28d439f..82b1a19 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -266,11 +266,6 @@ static int raw_open_common(BlockDriverState *bs, const char *filename,
}
s->fd = fd;
- /* We're falling back to POSIX AIO in some cases so init always */
- if (paio_init() < 0) {
- goto out_close;
- }
-
#ifdef CONFIG_LINUX_AIO
if (raw_set_aio(&s->aio_ctx, &s->use_aio, bdrv_flags)) {
goto out_close;
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 96e4daf..4a1e3d3 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -28,13 +28,12 @@
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
+#include "thread-pool.h"
#include "block_int.h"
#include "iov.h"
#include "block/raw-posix-aio.h"
-static void do_spawn_thread(void);
-
struct qemu_paiocb {
BlockDriverAIOCB common;
int aio_fildes;
@@ -46,82 +45,15 @@ struct qemu_paiocb {
size_t aio_nbytes;
#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */
off_t aio_offset;
-
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
- ssize_t ret;
- int active;
- struct qemu_paiocb *next;
};
-typedef struct PosixAioState {
- int rfd, wfd;
- struct qemu_paiocb *first_aio;
-} PosixAioState;
-
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static int new_threads = 0; /* backlog of threads we need to create */
-static int pending_threads = 0; /* threads created but not running yet */
-static QEMUBH *new_thread_bh;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif
-static void die2(int err, const char *what)
-{
- fprintf(stderr, "%s failed: %s\n", what, strerror(err));
- abort();
-}
-
-static void die(const char *what)
-{
- die2(errno, what);
-}
-
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -310,286 +242,54 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void posix_aio_notify_event(void);
-
-static void *aio_thread(void *unused)
+static int aio_worker(void *arg)
{
- mutex_lock(&lock);
- pending_threads--;
- mutex_unlock(&lock);
- do_spawn_thread();
-
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
+ struct qemu_paiocb *aiocb = arg;
+ ssize_t ret = 0;
- mutex_lock(&lock);
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ ret = handle_aiocb_rw(aiocb);
+ if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
+ /* A short read means that we have reached EOF. Pad the buffer
+ * with zeros for bytes after EOF. */
+ iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret,
+ 0, aiocb->aio_nbytes - ret);
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- idle_threads++;
- ret = cond_timedwait(&cond, &lock, &ts);
- idle_threads--;
+ ret = aiocb->aio_nbytes;
}
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- ret = handle_aiocb_rw(aiocb);
- if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
- /* A short read means that we have reached EOF. Pad the buffer
- * with zeros for bytes after EOF. */
- iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret,
- 0, aiocb->aio_nbytes - ret);
-
- ret = aiocb->aio_nbytes;
- }
- break;
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ if (ret == aiocb->aio_nbytes) {
+ ret = 0;
+ } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
ret = -EINVAL;
- break;
}
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- mutex_unlock(&lock);
-
- posix_aio_notify_event();
- }
-
- cur_threads--;
- mutex_unlock(&lock);
-
- return NULL;
-}
-
-static void do_spawn_thread(void)
-{
- sigset_t set, oldset;
-
- mutex_lock(&lock);
- if (!new_threads) {
- mutex_unlock(&lock);
- return;
- }
-
- new_threads--;
- pending_threads++;
-
- mutex_unlock(&lock);
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
-}
-
-static void spawn_thread_bh_fn(void *opaque)
-{
- do_spawn_thread();
-}
-
-static void spawn_thread(void)
-{
- cur_threads++;
- new_threads++;
- /* If there are threads being created, they will spawn new workers, so
- * we don't spend time creating many threads in a loop holding a mutex or
- * starving the current vcpu.
- *
- * If there are no idle threads, ask the main thread to create one, so we
- * inherit the correct affinity instead of the vcpu affinity.
- */
- if (!pending_threads) {
- qemu_bh_schedule(new_thread_bh);
- }
-}
-
-static void qemu_paio_submit(struct qemu_paiocb *aiocb)
-{
- aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
-}
-
-static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
-{
- ssize_t ret;
-
- mutex_lock(&lock);
- ret = aiocb->ret;
- mutex_unlock(&lock);
-
- return ret;
-}
-
-static int qemu_paio_error(struct qemu_paiocb *aiocb)
-{
- ssize_t ret = qemu_paio_return(aiocb);
-
- if (ret < 0)
- ret = -ret;
- else
- ret = 0;
-
- return ret;
-}
-
-static void posix_aio_read(void *opaque)
-{
- PosixAioState *s = opaque;
- struct qemu_paiocb *acb, **pacb;
- int ret;
- ssize_t len;
-
- /* read all bytes from signal pipe */
- for (;;) {
- char bytes[16];
-
- len = read(s->rfd, bytes, sizeof(bytes));
- if (len == -1 && errno == EINTR)
- continue; /* try again */
- if (len == sizeof(bytes))
- continue; /* more to read */
break;
- }
-
- for(;;) {
- pacb = &s->first_aio;
- for(;;) {
- acb = *pacb;
- if (!acb)
- return;
-
- ret = qemu_paio_error(acb);
- if (ret == ECANCELED) {
- /* remove the request */
- *pacb = acb->next;
- qemu_aio_release(acb);
- } else if (ret != EINPROGRESS) {
- /* end of aio */
- if (ret == 0) {
- ret = qemu_paio_return(acb);
- if (ret == acb->aio_nbytes)
- ret = 0;
- else
- ret = -EINVAL;
- } else {
- ret = -ret;
- }
-
- trace_paio_complete(acb, acb->common.opaque, ret);
-
- /* remove the request */
- *pacb = acb->next;
- /* call the callback */
- acb->common.cb(acb->common.opaque, ret);
- qemu_aio_release(acb);
- break;
- } else {
- pacb = &acb->next;
- }
- }
- }
-}
-
-static int posix_aio_flush(void *opaque)
-{
- PosixAioState *s = opaque;
- return !!s->first_aio;
-}
-
-static PosixAioState *posix_aio_state;
-
-static void posix_aio_notify_event(void)
-{
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
-}
-
-static void paio_remove(struct qemu_paiocb *acb)
-{
- struct qemu_paiocb **pacb;
-
- /* remove the callback from the queue */
- pacb = &posix_aio_state->first_aio;
- for(;;) {
- if (*pacb == NULL) {
- fprintf(stderr, "paio_remove: aio request not found!\n");
- break;
- } else if (*pacb == acb) {
- *pacb = acb->next;
- qemu_aio_release(acb);
- break;
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ if (ret == aiocb->aio_nbytes) {
+ ret = 0;
+ } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
+ ret = -EINVAL;
}
- pacb = &(*pacb)->next;
- }
-}
-
-static void paio_cancel(BlockDriverAIOCB *blockacb)
-{
- struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- int active = 0;
-
- trace_paio_cancel(acb, acb->common.opaque);
-
- mutex_lock(&lock);
- if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
- } else if (acb->ret == -EINPROGRESS) {
- active = 1;
- }
- mutex_unlock(&lock);
-
- if (active) {
- /* fail safe: if the aio could not be canceled, we wait for
- it */
- while (qemu_paio_error(acb) == EINPROGRESS)
- ;
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- paio_remove(acb);
+ qemu_aio_release(aiocb);
+ return ret;
}
static AIOPool raw_aio_pool = {
.aiocb_size = sizeof(struct qemu_paiocb),
- .cancel = paio_cancel,
};
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
@@ -609,12 +309,8 @@ BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
acb->aio_nbytes = nb_sectors * 512;
acb->aio_offset = sector_num * 512;
- acb->next = posix_aio_state->first_aio;
- posix_aio_state->first_aio = acb;
-
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
- qemu_paio_submit(acb);
- return &acb->common;
+ return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
}
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
@@ -630,50 +326,5 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
acb->aio_ioctl_buf = buf;
acb->aio_ioctl_cmd = req;
- acb->next = posix_aio_state->first_aio;
- posix_aio_state->first_aio = acb;
-
- qemu_paio_submit(acb);
- return &acb->common;
-}
-
-int paio_init(void)
-{
- PosixAioState *s;
- int fds[2];
- int ret;
-
- if (posix_aio_state)
- return 0;
-
- s = g_malloc(sizeof(PosixAioState));
-
- s->first_aio = NULL;
- if (qemu_pipe(fds) == -1) {
- fprintf(stderr, "failed to create pipe\n");
- g_free(s);
- return -1;
- }
-
- s->rfd = fds[0];
- s->wfd = fds[1];
-
- fcntl(s->rfd, F_SETFL, O_NONBLOCK);
- fcntl(s->wfd, F_SETFL, O_NONBLOCK);
-
- qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
-
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
- new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
-
- posix_aio_state = s;
- return 0;
+ return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
}
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 21/25] raw: merge posix-aio-compat.c into block/raw-posix.c
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (19 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 20/25] block: switch posix-aio-compat to threadpool Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 22/25] raw-posix: rename raw-posix-aio.h, hide unavailable prototypes Paolo Bonzini
` (4 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
Making the qemu_paiocb specific to raw devices will let us access members
of the BDRVRawState arbitrarily.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
Makefile.objs | 1 -
block/raw-posix-aio.h | 8 --
block/raw-posix.c | 294 ++++++++++++++++++++++++++++++++++++++++++++
posix-aio-compat.c | 330 --------------------------------------------------
4 files changed, 294 insertions(+), 339 deletions(-)
delete mode 100644 posix-aio-compat.c
diff --git a/Makefile.objs b/Makefile.objs
index 4a6d47e..8219f41 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -45,7 +45,6 @@ block-obj-y = cutils.o iov.o cache-utils.o qemu-option.o module.o async.o
block-obj-y += nbd.o block.o blockjob.o aes.o qemu-config.o
block-obj-y += thread-pool.o qemu-progress.o qemu-sockets.o uri.o
block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
-block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_POSIX) += event_notifier-posix.o aio-posix.o
block-obj-$(CONFIG_WIN32) += event_notifier-win32.o aio-win32.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
diff --git a/block/raw-posix-aio.h b/block/raw-posix-aio.h
index 6725135..c714367 100644
--- a/block/raw-posix-aio.h
+++ b/block/raw-posix-aio.h
@@ -27,14 +27,6 @@
#define QEMU_AIO_MISALIGNED 0x1000
-/* posix-aio-compat.c - thread pool based implementation */
-BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
- int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
- BlockDriverCompletionFunc *cb, void *opaque, int type);
-BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
- unsigned long int req, void *buf,
- BlockDriverCompletionFunc *cb, void *opaque);
-
/* linux-aio.c - Linux native implementation */
void *laio_init(void);
BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 82b1a19..1c98217 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -27,6 +27,9 @@
#include "qemu-log.h"
#include "block_int.h"
#include "module.h"
+#include "trace.h"
+#include "thread-pool.h"
+#include "iov.h"
#include "block/raw-posix-aio.h"
#if defined(__APPLE__) && (__MACH__)
@@ -149,6 +152,20 @@ typedef struct BDRVRawReopenState {
static int fd_open(BlockDriverState *bs);
static int64_t raw_getlength(BlockDriverState *bs);
+typedef struct RawPosixAIOData {
+ BlockDriverState *bs;
+ int aio_fildes;
+ union {
+ struct iovec *aio_iov;
+ void *aio_ioctl_buf;
+ };
+ int aio_niov;
+ size_t aio_nbytes;
+#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */
+ off_t aio_offset;
+ int aio_type;
+} RawPosixAIOData;
+
#if defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
static int cdrom_reopen(BlockDriverState *bs);
#endif
@@ -429,6 +446,283 @@ static int qiov_is_aligned(BlockDriverState *bs, QEMUIOVector *qiov)
return 1;
}
+static ssize_t handle_aiocb_ioctl(RawPosixAIOData *aiocb)
+{
+ int ret;
+
+ ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
+ if (ret == -1) {
+ return -errno;
+ }
+
+ /*
+ * This looks weird, but the aio code only considers a request
+ * successful if it has written the full number of bytes.
+ *
+ * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
+ * so in fact we return the ioctl command here to make posix_aio_read()
+ * happy..
+ */
+ return aiocb->aio_nbytes;
+}
+
+static ssize_t handle_aiocb_flush(RawPosixAIOData *aiocb)
+{
+ int ret;
+
+ ret = qemu_fdatasync(aiocb->aio_fildes);
+ if (ret == -1) {
+ return -errno;
+ }
+ return 0;
+}
+
+#ifdef CONFIG_PREADV
+
+static bool preadv_present = true;
+
+static ssize_t
+qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+ return preadv(fd, iov, nr_iov, offset);
+}
+
+static ssize_t
+qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+ return pwritev(fd, iov, nr_iov, offset);
+}
+
+#else
+
+static bool preadv_present = false;
+
+static ssize_t
+qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+ return -ENOSYS;
+}
+
+static ssize_t
+qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+ return -ENOSYS;
+}
+
+#endif
+
+static ssize_t handle_aiocb_rw_vector(RawPosixAIOData *aiocb)
+{
+ ssize_t len;
+
+ do {
+ if (aiocb->aio_type & QEMU_AIO_WRITE)
+ len = qemu_pwritev(aiocb->aio_fildes,
+ aiocb->aio_iov,
+ aiocb->aio_niov,
+ aiocb->aio_offset);
+ else
+ len = qemu_preadv(aiocb->aio_fildes,
+ aiocb->aio_iov,
+ aiocb->aio_niov,
+ aiocb->aio_offset);
+ } while (len == -1 && errno == EINTR);
+
+ if (len == -1) {
+ return -errno;
+ }
+ return len;
+}
+
+/*
+ * Read/writes the data to/from a given linear buffer.
+ *
+ * Returns the number of bytes handled or -errno in case of an error. Short
+ * reads are only returned if the end of the file is reached.
+ */
+static ssize_t handle_aiocb_rw_linear(RawPosixAIOData *aiocb, char *buf)
+{
+ ssize_t offset = 0;
+ ssize_t len;
+
+ while (offset < aiocb->aio_nbytes) {
+ if (aiocb->aio_type & QEMU_AIO_WRITE) {
+ len = pwrite(aiocb->aio_fildes,
+ (const char *)buf + offset,
+ aiocb->aio_nbytes - offset,
+ aiocb->aio_offset + offset);
+ } else {
+ len = pread(aiocb->aio_fildes,
+ buf + offset,
+ aiocb->aio_nbytes - offset,
+ aiocb->aio_offset + offset);
+ }
+ if (len == -1 && errno == EINTR) {
+ continue;
+ } else if (len == -1) {
+ offset = -errno;
+ break;
+ } else if (len == 0) {
+ break;
+ }
+ offset += len;
+ }
+
+ return offset;
+}
+
+static ssize_t handle_aiocb_rw(RawPosixAIOData *aiocb)
+{
+ ssize_t nbytes;
+ char *buf;
+
+ if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
+ /*
+ * If there is just a single buffer, and it is properly aligned
+ * we can just use plain pread/pwrite without any problems.
+ */
+ if (aiocb->aio_niov == 1) {
+ return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
+ }
+ /*
+ * We have more than one iovec, and all are properly aligned.
+ *
+ * Try preadv/pwritev first and fall back to linearizing the
+ * buffer if it's not supported.
+ */
+ if (preadv_present) {
+ nbytes = handle_aiocb_rw_vector(aiocb);
+ if (nbytes == aiocb->aio_nbytes ||
+ (nbytes < 0 && nbytes != -ENOSYS)) {
+ return nbytes;
+ }
+ preadv_present = false;
+ }
+
+ /*
+ * XXX(hch): short read/write. no easy way to handle the remainder
+ * using these interfaces. For now retry using plain
+ * pread/pwrite?
+ */
+ }
+
+ /*
+ * Ok, we have to do it the hard way, copy all segments into
+ * a single aligned buffer.
+ */
+ buf = qemu_blockalign(aiocb->bs, aiocb->aio_nbytes);
+ if (aiocb->aio_type & QEMU_AIO_WRITE) {
+ char *p = buf;
+ int i;
+
+ for (i = 0; i < aiocb->aio_niov; ++i) {
+ memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
+ p += aiocb->aio_iov[i].iov_len;
+ }
+ }
+
+ nbytes = handle_aiocb_rw_linear(aiocb, buf);
+ if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
+ char *p = buf;
+ size_t count = aiocb->aio_nbytes, copy;
+ int i;
+
+ for (i = 0; i < aiocb->aio_niov && count; ++i) {
+ copy = count;
+ if (copy > aiocb->aio_iov[i].iov_len) {
+ copy = aiocb->aio_iov[i].iov_len;
+ }
+ memcpy(aiocb->aio_iov[i].iov_base, p, copy);
+ p += copy;
+ count -= copy;
+ }
+ }
+ qemu_vfree(buf);
+
+ return nbytes;
+}
+
+static int aio_worker(void *arg)
+{
+ RawPosixAIOData *aiocb = arg;
+ ssize_t ret = 0;
+
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ ret = handle_aiocb_rw(aiocb);
+ if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->bs->growable) {
+ iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret,
+ 0, aiocb->aio_nbytes - ret);
+
+ ret = aiocb->aio_nbytes;
+ }
+ if (ret == aiocb->aio_nbytes) {
+ ret = 0;
+ } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
+ ret = -EINVAL;
+ }
+ break;
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ if (ret == aiocb->aio_nbytes) {
+ ret = 0;
+ } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
+ ret = -EINVAL;
+ }
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
+ }
+
+ g_slice_free(RawPosixAIOData, aiocb);
+ return ret;
+}
+
+static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque, int type)
+{
+ RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
+
+ acb->bs = bs;
+ acb->aio_type = type;
+ acb->aio_fildes = fd;
+
+ if (qiov) {
+ acb->aio_iov = qiov->iov;
+ acb->aio_niov = qiov->niov;
+ }
+ acb->aio_nbytes = nb_sectors * 512;
+ acb->aio_offset = sector_num * 512;
+
+ trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
+ return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+}
+
+static BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
+ unsigned long int req, void *buf,
+ BlockDriverCompletionFunc *cb, void *opaque)
+{
+ RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
+
+ acb->bs = bs;
+ acb->aio_type = QEMU_AIO_IOCTL;
+ acb->aio_fildes = fd;
+ acb->aio_offset = 0;
+ acb->aio_ioctl_buf = buf;
+ acb->aio_ioctl_cmd = req;
+
+ return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+}
+
static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
BlockDriverCompletionFunc *cb, void *opaque, int type)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
deleted file mode 100644
index 4a1e3d3..0000000
--- a/posix-aio-compat.c
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * QEMU posix-aio emulation
- *
- * Copyright IBM, Corp. 2008
- *
- * Authors:
- * Anthony Liguori <aliguori@us.ibm.com>
- *
- * This work is licensed under the terms of the GNU GPL, version 2. See
- * the COPYING file in the top-level directory.
- *
- * Contributions after 2012-01-13 are licensed under the terms of the
- * GNU GPL, version 2 or (at your option) any later version.
- */
-
-#include <sys/ioctl.h>
-#include <sys/types.h>
-#include <pthread.h>
-#include <unistd.h>
-#include <errno.h>
-#include <time.h>
-#include <string.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "qemu-queue.h"
-#include "osdep.h"
-#include "sysemu.h"
-#include "qemu-common.h"
-#include "trace.h"
-#include "thread-pool.h"
-#include "block_int.h"
-#include "iov.h"
-
-#include "block/raw-posix-aio.h"
-
-struct qemu_paiocb {
- BlockDriverAIOCB common;
- int aio_fildes;
- union {
- struct iovec *aio_iov;
- void *aio_ioctl_buf;
- };
- int aio_niov;
- size_t aio_nbytes;
-#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */
- off_t aio_offset;
- int aio_type;
-};
-
-#ifdef CONFIG_PREADV
-static int preadv_present = 1;
-#else
-static int preadv_present = 0;
-#endif
-
-static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
-{
- int ret;
-
- ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
- if (ret == -1)
- return -errno;
-
- /*
- * This looks weird, but the aio code only considers a request
- * successful if it has written the full number of bytes.
- *
- * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
- * so in fact we return the ioctl command here to make posix_aio_read()
- * happy..
- */
- return aiocb->aio_nbytes;
-}
-
-static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
-{
- int ret;
-
- ret = qemu_fdatasync(aiocb->aio_fildes);
- if (ret == -1)
- return -errno;
- return 0;
-}
-
-#ifdef CONFIG_PREADV
-
-static ssize_t
-qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
-{
- return preadv(fd, iov, nr_iov, offset);
-}
-
-static ssize_t
-qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
-{
- return pwritev(fd, iov, nr_iov, offset);
-}
-
-#else
-
-static ssize_t
-qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
-{
- return -ENOSYS;
-}
-
-static ssize_t
-qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
-{
- return -ENOSYS;
-}
-
-#endif
-
-static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
-{
- ssize_t len;
-
- do {
- if (aiocb->aio_type & QEMU_AIO_WRITE)
- len = qemu_pwritev(aiocb->aio_fildes,
- aiocb->aio_iov,
- aiocb->aio_niov,
- aiocb->aio_offset);
- else
- len = qemu_preadv(aiocb->aio_fildes,
- aiocb->aio_iov,
- aiocb->aio_niov,
- aiocb->aio_offset);
- } while (len == -1 && errno == EINTR);
-
- if (len == -1)
- return -errno;
- return len;
-}
-
-/*
- * Read/writes the data to/from a given linear buffer.
- *
- * Returns the number of bytes handles or -errno in case of an error. Short
- * reads are only returned if the end of the file is reached.
- */
-static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
-{
- ssize_t offset = 0;
- ssize_t len;
-
- while (offset < aiocb->aio_nbytes) {
- if (aiocb->aio_type & QEMU_AIO_WRITE)
- len = pwrite(aiocb->aio_fildes,
- (const char *)buf + offset,
- aiocb->aio_nbytes - offset,
- aiocb->aio_offset + offset);
- else
- len = pread(aiocb->aio_fildes,
- buf + offset,
- aiocb->aio_nbytes - offset,
- aiocb->aio_offset + offset);
-
- if (len == -1 && errno == EINTR)
- continue;
- else if (len == -1) {
- offset = -errno;
- break;
- } else if (len == 0)
- break;
-
- offset += len;
- }
-
- return offset;
-}
-
-static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
-{
- ssize_t nbytes;
- char *buf;
-
- if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
- /*
- * If there is just a single buffer, and it is properly aligned
- * we can just use plain pread/pwrite without any problems.
- */
- if (aiocb->aio_niov == 1)
- return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
-
- /*
- * We have more than one iovec, and all are properly aligned.
- *
- * Try preadv/pwritev first and fall back to linearizing the
- * buffer if it's not supported.
- */
- if (preadv_present) {
- nbytes = handle_aiocb_rw_vector(aiocb);
- if (nbytes == aiocb->aio_nbytes)
- return nbytes;
- if (nbytes < 0 && nbytes != -ENOSYS)
- return nbytes;
- preadv_present = 0;
- }
-
- /*
- * XXX(hch): short read/write. no easy way to handle the reminder
- * using these interfaces. For now retry using plain
- * pread/pwrite?
- */
- }
-
- /*
- * Ok, we have to do it the hard way, copy all segments into
- * a single aligned buffer.
- */
- buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
- if (aiocb->aio_type & QEMU_AIO_WRITE) {
- char *p = buf;
- int i;
-
- for (i = 0; i < aiocb->aio_niov; ++i) {
- memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
- p += aiocb->aio_iov[i].iov_len;
- }
- }
-
- nbytes = handle_aiocb_rw_linear(aiocb, buf);
- if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
- char *p = buf;
- size_t count = aiocb->aio_nbytes, copy;
- int i;
-
- for (i = 0; i < aiocb->aio_niov && count; ++i) {
- copy = count;
- if (copy > aiocb->aio_iov[i].iov_len)
- copy = aiocb->aio_iov[i].iov_len;
- memcpy(aiocb->aio_iov[i].iov_base, p, copy);
- p += copy;
- count -= copy;
- }
- }
- qemu_vfree(buf);
-
- return nbytes;
-}
-
-static int aio_worker(void *arg)
-{
- struct qemu_paiocb *aiocb = arg;
- ssize_t ret = 0;
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- ret = handle_aiocb_rw(aiocb);
- if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
- /* A short read means that we have reached EOF. Pad the buffer
- * with zeros for bytes after EOF. */
- iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret,
- 0, aiocb->aio_nbytes - ret);
-
- ret = aiocb->aio_nbytes;
- }
- if (ret == aiocb->aio_nbytes) {
- ret = 0;
- } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
- ret = -EINVAL;
- }
- break;
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- if (ret == aiocb->aio_nbytes) {
- ret = 0;
- } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
- ret = -EINVAL;
- }
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- qemu_aio_release(aiocb);
- return ret;
-}
-
-static AIOPool raw_aio_pool = {
- .aiocb_size = sizeof(struct qemu_paiocb),
-};
-
-BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
- int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
- BlockDriverCompletionFunc *cb, void *opaque, int type)
-{
- struct qemu_paiocb *acb;
-
- acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
- acb->aio_type = type;
- acb->aio_fildes = fd;
-
- if (qiov) {
- acb->aio_iov = qiov->iov;
- acb->aio_niov = qiov->niov;
- }
- acb->aio_nbytes = nb_sectors * 512;
- acb->aio_offset = sector_num * 512;
-
- trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
- return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
-}
-
-BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
- unsigned long int req, void *buf,
- BlockDriverCompletionFunc *cb, void *opaque)
-{
- struct qemu_paiocb *acb;
-
- acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
- acb->aio_type = QEMU_AIO_IOCTL;
- acb->aio_fildes = fd;
- acb->aio_offset = 0;
- acb->aio_ioctl_buf = buf;
- acb->aio_ioctl_cmd = req;
-
- return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
-}
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 22/25] raw-posix: rename raw-posix-aio.h, hide unavailable prototypes
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (20 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 21/25] raw: merge posix-aio-compat.c into block/raw-posix.c Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 23/25] raw-win32: add emulated AIO support Paolo Bonzini
` (3 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
block/{raw-posix-aio.h => raw-aio.h} | 10 ++++++----
block/raw-posix.c | 2 +-
linux-aio.c | 2 +-
3 files changed, 8 insertions(+), 6 deletions(-)
rename block/{raw-posix-aio.h => raw-aio.h} (86%)
diff --git a/block/raw-posix-aio.h b/block/raw-aio.h
similarity index 86%
rename from block/raw-posix-aio.h
rename to block/raw-aio.h
index c714367..b3bb073 100644
--- a/block/raw-posix-aio.h
+++ b/block/raw-aio.h
@@ -1,5 +1,5 @@
/*
- * QEMU Posix block I/O backend AIO support
+ * Declarations for AIO in the raw protocol
*
* Copyright IBM, Corp. 2008
*
@@ -12,8 +12,8 @@
* Contributions after 2012-01-13 are licensed under the terms of the
* GNU GPL, version 2 or (at your option) any later version.
*/
-#ifndef QEMU_RAW_POSIX_AIO_H
-#define QEMU_RAW_POSIX_AIO_H
+#ifndef QEMU_RAW_AIO_H
+#define QEMU_RAW_AIO_H
/* AIO request types */
#define QEMU_AIO_READ 0x0001
@@ -28,9 +28,11 @@
/* linux-aio.c - Linux native implementation */
+#ifdef CONFIG_LINUX_AIO
void *laio_init(void);
BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
BlockDriverCompletionFunc *cb, void *opaque, int type);
+#endif
-#endif /* QEMU_RAW_POSIX_AIO_H */
+#endif /* QEMU_RAW_AIO_H */
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 1c98217..f1a9965 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -30,7 +30,7 @@
#include "trace.h"
#include "thread-pool.h"
#include "iov.h"
-#include "block/raw-posix-aio.h"
+#include "raw-aio.h"
#if defined(__APPLE__) && (__MACH__)
#include <paths.h>
diff --git a/linux-aio.c b/linux-aio.c
index 769b558..06f6790 100644
--- a/linux-aio.c
+++ b/linux-aio.c
@@ -9,7 +9,7 @@
*/
#include "qemu-common.h"
#include "qemu-aio.h"
-#include "block/raw-posix-aio.h"
+#include "block/raw-aio.h"
#include "event_notifier.h"
#include <libaio.h>
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 23/25] raw-win32: add emulated AIO support
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (21 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 22/25] raw-posix: rename raw-posix-aio.h, hide unavailable prototypes Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 24/25] raw-posix: move linux-aio.c to block/ Paolo Bonzini
` (2 subsequent siblings)
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
block/raw-win32.c | 187 ++++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 138 insertions(+), 49 deletions(-)
diff --git a/block/raw-win32.c b/block/raw-win32.c
index 78c8306..ffd86e3 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -25,6 +25,10 @@
#include "qemu-timer.h"
#include "block_int.h"
#include "module.h"
+#include "raw-aio.h"
+#include "trace.h"
+#include "thread-pool.h"
+#include "iov.h"
#include <windows.h>
#include <winioctl.h>
@@ -32,12 +36,127 @@
#define FTYPE_CD 1
#define FTYPE_HARDDISK 2
+typedef struct RawWin32AIOData {
+ BlockDriverState *bs;
+ HANDLE hfile;
+ struct iovec *aio_iov;
+ int aio_niov;
+ size_t aio_nbytes;
+ off64_t aio_offset;
+ int aio_type;
+} RawWin32AIOData;
+
typedef struct BDRVRawState {
HANDLE hfile;
int type;
char drive_path[16]; /* format: "d:\" */
} BDRVRawState;
+/*
+ * Read/writes the data to/from a given linear buffer.
+ *
+ * Returns the number of bytes handled or -errno in case of an error. Short
+ * reads are only returned if the end of the file is reached.
+ */
+static size_t handle_aiocb_rw(RawWin32AIOData *aiocb)
+{
+ size_t offset = 0;
+ int i;
+
+ for (i = 0; i < aiocb->aio_niov; i++) {
+ OVERLAPPED ov;
+ DWORD ret, ret_count, len;
+
+ memset(&ov, 0, sizeof(ov));
+ ov.Offset = (aiocb->aio_offset + offset);
+ ov.OffsetHigh = (aiocb->aio_offset + offset) >> 32;
+ len = aiocb->aio_iov[i].iov_len;
+ if (aiocb->aio_type & QEMU_AIO_WRITE) {
+ ret = WriteFile(aiocb->hfile, aiocb->aio_iov[i].iov_base,
+ len, &ret_count, &ov);
+ } else {
+ ret = ReadFile(aiocb->hfile, aiocb->aio_iov[i].iov_base,
+ len, &ret_count, &ov);
+ }
+ if (!ret) {
+ ret_count = 0;
+ }
+ if (ret_count != len) {
+ break;
+ }
+ offset += len;
+ }
+
+ return offset;
+}
+
+static int aio_worker(void *arg)
+{
+ RawWin32AIOData *aiocb = arg;
+ ssize_t ret = 0;
+ size_t count;
+
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ count = handle_aiocb_rw(aiocb);
+ if (count < aiocb->aio_nbytes && aiocb->bs->growable) {
+ /* A short read means that we have reached EOF. Pad the buffer
+ * with zeros for bytes after EOF. */
+ iov_memset(aiocb->aio_iov, aiocb->aio_niov, count,
+ 0, aiocb->aio_nbytes - count);
+
+ count = aiocb->aio_nbytes;
+ }
+ if (count == aiocb->aio_nbytes) {
+ ret = 0;
+ } else {
+ ret = -EINVAL;
+ }
+ break;
+ case QEMU_AIO_WRITE:
+ count = handle_aiocb_rw(aiocb);
+ if (count == aiocb->aio_nbytes) {
+ ret = 0;
+ } else {
+ ret = -EINVAL;
+ }
+ break;
+ case QEMU_AIO_FLUSH:
+ if (!FlushFileBuffers(aiocb->hfile)) {
+ return -EIO;
+ }
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
+ }
+
+ g_slice_free(RawWin32AIOData, aiocb);
+ return ret;
+}
+
+static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque, int type)
+{
+ RawWin32AIOData *acb = g_slice_new(RawWin32AIOData);
+
+ acb->bs = bs;
+ acb->hfile = hfile;
+ acb->aio_type = type;
+
+ if (qiov) {
+ acb->aio_iov = qiov->iov;
+ acb->aio_niov = qiov->niov;
+ }
+ acb->aio_nbytes = nb_sectors * 512;
+ acb->aio_offset = sector_num * 512;
+
+ trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
+ return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+}
+
int qemu_ftruncate64(int fd, int64_t length)
{
LARGE_INTEGER li;
@@ -117,59 +236,29 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
return 0;
}
-static int raw_read(BlockDriverState *bs, int64_t sector_num,
- uint8_t *buf, int nb_sectors)
+static BlockDriverAIOCB *raw_aio_readv(BlockDriverState *bs,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque)
{
BDRVRawState *s = bs->opaque;
- OVERLAPPED ov;
- DWORD ret_count;
- int ret;
- int64_t offset = sector_num * 512;
- int count = nb_sectors * 512;
-
- memset(&ov, 0, sizeof(ov));
- ov.Offset = offset;
- ov.OffsetHigh = offset >> 32;
- ret = ReadFile(s->hfile, buf, count, &ret_count, &ov);
- if (!ret)
- return ret_count;
- if (ret_count == count)
- ret_count = 0;
- return ret_count;
+ return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
+ cb, opaque, QEMU_AIO_READ);
}
-static int raw_write(BlockDriverState *bs, int64_t sector_num,
- const uint8_t *buf, int nb_sectors)
+static BlockDriverAIOCB *raw_aio_writev(BlockDriverState *bs,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque)
{
BDRVRawState *s = bs->opaque;
- OVERLAPPED ov;
- DWORD ret_count;
- int ret;
- int64_t offset = sector_num * 512;
- int count = nb_sectors * 512;
-
- memset(&ov, 0, sizeof(ov));
- ov.Offset = offset;
- ov.OffsetHigh = offset >> 32;
- ret = WriteFile(s->hfile, buf, count, &ret_count, &ov);
- if (!ret)
- return ret_count;
- if (ret_count == count)
- ret_count = 0;
- return ret_count;
+ return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
+ cb, opaque, QEMU_AIO_WRITE);
}
-static int raw_flush(BlockDriverState *bs)
+static BlockDriverAIOCB *raw_aio_flush(BlockDriverState *bs,
+ BlockDriverCompletionFunc *cb, void *opaque)
{
BDRVRawState *s = bs->opaque;
- int ret;
-
- ret = FlushFileBuffers(s->hfile);
- if (ret == 0) {
- return -EIO;
- }
-
- return 0;
+ return paio_submit(bs, s->hfile, 0, NULL, 0, cb, opaque, QEMU_AIO_FLUSH);
}
static void raw_close(BlockDriverState *bs)
@@ -290,9 +379,9 @@ static BlockDriver bdrv_file = {
.bdrv_close = raw_close,
.bdrv_create = raw_create,
- .bdrv_read = raw_read,
- .bdrv_write = raw_write,
- .bdrv_co_flush_to_disk = raw_flush,
+ .bdrv_aio_readv = raw_aio_readv,
+ .bdrv_aio_writev = raw_aio_writev,
+ .bdrv_aio_flush = raw_aio_flush,
.bdrv_truncate = raw_truncate,
.bdrv_getlength = raw_getlength,
@@ -413,9 +502,9 @@ static BlockDriver bdrv_host_device = {
.bdrv_close = raw_close,
.bdrv_has_zero_init = hdev_has_zero_init,
- .bdrv_read = raw_read,
- .bdrv_write = raw_write,
- .bdrv_co_flush_to_disk = raw_flush,
+ .bdrv_aio_readv = raw_aio_readv,
+ .bdrv_aio_writev = raw_aio_writev,
+ .bdrv_aio_flush = raw_aio_flush,
.bdrv_getlength = raw_getlength,
.bdrv_get_allocated_file_size
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 24/25] raw-posix: move linux-aio.c to block/
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (22 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 23/25] raw-win32: add emulated AIO support Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 25/25] raw-win32: implement native asynchronous I/O Paolo Bonzini
2012-10-30 19:15 ` [Qemu-devel] [PATCH 00/25] AioContext & threadpool Stefan Hajnoczi
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
Makefile.objs | 1 -
block/Makefile.objs | 1 +
linux-aio.c => block/linux-aio.c | 0
3 files changed, 1 insertion(+), 1 deletion(-)
rename linux-aio.c => block/linux-aio.c (100%)
diff --git a/Makefile.objs b/Makefile.objs
index 8219f41..807d9cd 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -47,7 +47,6 @@ block-obj-y += thread-pool.o qemu-progress.o qemu-sockets.o uri.o
block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)
block-obj-$(CONFIG_POSIX) += event_notifier-posix.o aio-posix.o
block-obj-$(CONFIG_WIN32) += event_notifier-win32.o aio-win32.o
-block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
block-obj-y += block/
ifeq ($(CONFIG_VIRTIO)$(CONFIG_VIRTFS)$(CONFIG_PCI),yyy)
diff --git a/block/Makefile.objs b/block/Makefile.objs
index 684765b..771d341 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -5,6 +5,7 @@ block-obj-y += qed-check.o
block-obj-y += parallels.o blkdebug.o blkverify.o
block-obj-$(CONFIG_WIN32) += raw-win32.o
block-obj-$(CONFIG_POSIX) += raw-posix.o
+block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
ifeq ($(CONFIG_POSIX),y)
block-obj-y += nbd.o sheepdog.o
diff --git a/linux-aio.c b/block/linux-aio.c
similarity index 100%
rename from linux-aio.c
rename to block/linux-aio.c
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* [Qemu-devel] [PATCH 25/25] raw-win32: implement native asynchronous I/O
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (23 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 24/25] raw-posix: move linux-aio.c to block/ Paolo Bonzini
@ 2012-10-26 14:05 ` Paolo Bonzini
2012-10-30 19:15 ` [Qemu-devel] [PATCH 00/25] AioContext & threadpool Stefan Hajnoczi
25 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-26 14:05 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha
With the new support for EventNotifiers in the AIO event loop, we
can hook a completion port to every opened file and use asynchronous
I/O on them.
Wine's support is extremely inefficient, also because it really does
the I/O synchronously on regular files. (!) But it works, and it is
good to keep the Win32 and POSIX ports as similar as possible.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
block/Makefile.objs | 2 +-
block/raw-aio.h | 10 +++
block/raw-win32.c | 42 ++++++++--
block/win32-aio.c | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 274 insertions(+), 6 deletions(-)
create mode 100644 block/win32-aio.c
diff --git a/block/Makefile.objs b/block/Makefile.objs
index 771d341..30ef6ae 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
block-obj-y += qed-check.o
block-obj-y += parallels.o blkdebug.o blkverify.o
-block-obj-$(CONFIG_WIN32) += raw-win32.o
+block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
block-obj-$(CONFIG_POSIX) += raw-posix.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
diff --git a/block/raw-aio.h b/block/raw-aio.h
index b3bb073..e77f361 100644
--- a/block/raw-aio.h
+++ b/block/raw-aio.h
@@ -35,4 +35,14 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
BlockDriverCompletionFunc *cb, void *opaque, int type);
#endif
+#ifdef _WIN32
+typedef struct QEMUWin32AIOState QEMUWin32AIOState;
+QEMUWin32AIOState *win32_aio_init(void);
+int win32_aio_attach(QEMUWin32AIOState *aio, HANDLE hfile);
+BlockDriverAIOCB *win32_aio_submit(BlockDriverState *bs,
+ QEMUWin32AIOState *aio, HANDLE hfile,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque, int type);
+#endif
+
#endif /* QEMU_RAW_AIO_H */
diff --git a/block/raw-win32.c b/block/raw-win32.c
index ffd86e3..0c05c58 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -36,6 +36,8 @@
#define FTYPE_CD 1
#define FTYPE_HARDDISK 2
+static QEMUWin32AIOState *aio;
+
typedef struct RawWin32AIOData {
BlockDriverState *bs;
HANDLE hfile;
@@ -50,6 +52,7 @@ typedef struct BDRVRawState {
HANDLE hfile;
int type;
char drive_path[16]; /* format: "d:\" */
+ QEMUWin32AIOState *aio;
} BDRVRawState;
/*
@@ -208,6 +211,9 @@ static void raw_parse_flags(int flags, int *access_flags, DWORD *overlapped)
}
*overlapped = FILE_ATTRIBUTE_NORMAL;
+ if (flags & BDRV_O_NATIVE_AIO) {
+ *overlapped |= FILE_FLAG_OVERLAPPED;
+ }
if (flags & BDRV_O_NOCACHE) {
*overlapped |= FILE_FLAG_NO_BUFFERING;
}
@@ -222,6 +228,13 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
s->type = FTYPE_FILE;
raw_parse_flags(flags, &access_flags, &overlapped);
+
+ if ((flags & BDRV_O_NATIVE_AIO) && aio == NULL) {
+ aio = win32_aio_init();
+ if (aio == NULL) {
+ return -EINVAL;
+ }
+ }
s->hfile = CreateFile(filename, access_flags,
FILE_SHARE_READ, NULL,
@@ -231,7 +244,16 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
if (err == ERROR_ACCESS_DENIED)
return -EACCES;
- return -1;
+ return -EINVAL;
+ }
+
+ if (flags & BDRV_O_NATIVE_AIO) {
+ int ret = win32_aio_attach(aio, s->hfile);
+ if (ret < 0) {
+ CloseHandle(s->hfile);
+ return ret;
+ }
+ s->aio = aio;
}
return 0;
}
@@ -241,8 +263,13 @@ static BlockDriverAIOCB *raw_aio_readv(BlockDriverState *bs,
BlockDriverCompletionFunc *cb, void *opaque)
{
BDRVRawState *s = bs->opaque;
- return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
- cb, opaque, QEMU_AIO_READ);
+ if (s->aio) {
+ return win32_aio_submit(bs, s->aio, s->hfile, sector_num, qiov,
+ nb_sectors, cb, opaque, QEMU_AIO_READ);
+ } else {
+ return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
+ cb, opaque, QEMU_AIO_READ);
+ }
}
static BlockDriverAIOCB *raw_aio_writev(BlockDriverState *bs,
@@ -250,8 +277,13 @@ static BlockDriverAIOCB *raw_aio_writev(BlockDriverState *bs,
BlockDriverCompletionFunc *cb, void *opaque)
{
BDRVRawState *s = bs->opaque;
- return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
- cb, opaque, QEMU_AIO_WRITE);
+ if (s->aio) {
+ return win32_aio_submit(bs, s->aio, s->hfile, sector_num, qiov,
+ nb_sectors, cb, opaque, QEMU_AIO_WRITE);
+ } else {
+ return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors,
+ cb, opaque, QEMU_AIO_WRITE);
+ }
}
static BlockDriverAIOCB *raw_aio_flush(BlockDriverState *bs,
diff --git a/block/win32-aio.c b/block/win32-aio.c
new file mode 100644
index 0000000..c34dc73
--- /dev/null
+++ b/block/win32-aio.c
@@ -0,0 +1,228 @@
+/*
+ * Native AIO support (win32)
+ *
+ * Copyright (c) 2012 Red Hat, Inc.
+ *
+ * Author: Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#include "qemu-common.h"
+#include "qemu-timer.h"
+#include "block_int.h"
+#include "module.h"
+#include "qemu-common.h"
+#include "qemu-aio.h"
+#include "raw-aio.h"
+#include "event_notifier.h"
+#include <windows.h>
+#include <winioctl.h>
+
+#define FTYPE_FILE 0
+#define FTYPE_CD 1
+#define FTYPE_HARDDISK 2
+
+struct QEMUWin32AIOState {
+ HANDLE hIOCP;
+ EventNotifier e;
+ int count;
+};
+
+typedef struct QEMUWin32AIOCB {
+ BlockDriverAIOCB common;
+ struct QEMUWin32AIOState *ctx;
+ int nbytes;
+ OVERLAPPED ov;
+ QEMUIOVector *qiov;
+ void *buf;
+ bool is_read;
+ bool is_linear;
+} QEMUWin32AIOCB;
+
+/*
+ * Completes an AIO request (calls the callback and frees the ACB).
+ */
+static void win32_aio_process_completion(QEMUWin32AIOState *s,
+ QEMUWin32AIOCB *waiocb, DWORD count)
+{
+ int ret;
+ s->count--;
+
+ if (waiocb->ov.Internal != 0) {
+ ret = -EIO;
+ } else {
+ ret = 0;
+ if (count < waiocb->nbytes) {
+ /* Short reads mean EOF, pad with zeros. */
+ if (waiocb->is_read) {
+ qemu_iovec_memset(waiocb->qiov, count, 0,
+ waiocb->qiov->size - count);
+ } else {
+ ret = -EINVAL;
+ }
+ }
+ }
+
+ if (!waiocb->is_linear) {
+ if (ret == 0 && waiocb->is_read) {
+ QEMUIOVector *qiov = waiocb->qiov;
+ char *p = waiocb->buf;
+ int i;
+
+ for (i = 0; i < qiov->niov; ++i) {
+ memcpy(qiov->iov[i].iov_base, p, qiov->iov[i].iov_len);
+ p += qiov->iov[i].iov_len;
+ }
+ }
+ qemu_vfree(waiocb->buf);
+ }
+
+ waiocb->common.cb(waiocb->common.opaque, ret);
+ qemu_aio_release(waiocb);
+}
+
+static void win32_aio_completion_cb(EventNotifier *e)
+{
+ QEMUWin32AIOState *s = container_of(e, QEMUWin32AIOState, e);
+ DWORD count;
+ ULONG_PTR key;
+ OVERLAPPED *ov;
+
+ event_notifier_test_and_clear(&s->e);
+ while (GetQueuedCompletionStatus(s->hIOCP, &count, &key, &ov, 0)) {
+ QEMUWin32AIOCB *waiocb = container_of(ov, QEMUWin32AIOCB, ov);
+
+ win32_aio_process_completion(s, waiocb, count);
+ }
+}
+
+static int win32_aio_flush_cb(EventNotifier *e)
+{
+ QEMUWin32AIOState *s = container_of(e, QEMUWin32AIOState, e);
+
+ return (s->count > 0) ? 1 : 0;
+}
+
+static void win32_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+ QEMUWin32AIOCB *waiocb = (QEMUWin32AIOCB *)blockacb;
+
+ /*
+ * CancelIoEx is only supported in Vista and newer. For now, just
+ * wait for completion.
+ */
+ while (!HasOverlappedIoCompleted(&waiocb->ov)) {
+ qemu_aio_wait();
+ }
+}
+
+static AIOPool win32_aio_pool = {
+ .aiocb_size = sizeof(QEMUWin32AIOCB),
+ .cancel = win32_aio_cancel,
+};
+
+BlockDriverAIOCB *win32_aio_submit(BlockDriverState *bs,
+ QEMUWin32AIOState *aio, HANDLE hfile,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque, int type)
+{
+ struct QEMUWin32AIOCB *waiocb;
+ uint64_t offset = sector_num * 512;
+ DWORD rc;
+
+ waiocb = qemu_aio_get(&win32_aio_pool, bs, cb, opaque);
+ waiocb->nbytes = nb_sectors * 512;
+ waiocb->qiov = qiov;
+ waiocb->is_read = (type == QEMU_AIO_READ);
+
+ if (qiov->niov > 1) {
+ waiocb->buf = qemu_blockalign(bs, qiov->size);
+ if (type & QEMU_AIO_WRITE) {
+ char *p = waiocb->buf;
+ int i;
+
+ for (i = 0; i < qiov->niov; ++i) {
+ memcpy(p, qiov->iov[i].iov_base, qiov->iov[i].iov_len);
+ p += qiov->iov[i].iov_len;
+ }
+ }
+ waiocb->is_linear = false;
+ } else {
+ waiocb->buf = qiov->iov[0].iov_base;
+ waiocb->is_linear = true;
+ }
+
+ waiocb->ov = (OVERLAPPED) {
+ .Offset = (DWORD) offset,
+ .OffsetHigh = (DWORD) (offset >> 32),
+ .hEvent = event_notifier_get_handle(&aio->e)
+ };
+ aio->count++;
+
+ if (type & QEMU_AIO_READ) {
+ rc = ReadFile(hfile, waiocb->buf, waiocb->nbytes, NULL, &waiocb->ov);
+ } else {
+ rc = WriteFile(hfile, waiocb->buf, waiocb->nbytes, NULL, &waiocb->ov);
+ }
+ if (rc == 0 && GetLastError() != ERROR_IO_PENDING) {
+ goto out_dec_count;
+ }
+ return &waiocb->common;
+
+out_dec_count:
+ aio->count--;
+ qemu_aio_release(waiocb);
+ return NULL;
+}
+
+int win32_aio_attach(QEMUWin32AIOState *aio, HANDLE hfile)
+{
+ if (CreateIoCompletionPort(hfile, aio->hIOCP, (ULONG_PTR) 0, 0) == NULL) {
+ return -EINVAL;
+ } else {
+ return 0;
+ }
+}
+
+QEMUWin32AIOState *win32_aio_init(void)
+{
+ QEMUWin32AIOState *s;
+
+ s = g_malloc0(sizeof(*s));
+ if (event_notifier_init(&s->e, false) < 0) {
+ goto out_free_state;
+ }
+
+ s->hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ if (s->hIOCP == NULL) {
+ goto out_close_efd;
+ }
+
+ qemu_aio_set_event_notifier(&s->e, win32_aio_completion_cb,
+ win32_aio_flush_cb);
+
+ return s;
+
+out_close_efd:
+ event_notifier_cleanup(&s->e);
+out_free_state:
+ g_free(s);
+ return NULL;
+}
--
1.7.12.1
^ permalink raw reply related [flat|nested] 32+ messages in thread
* Re: [Qemu-devel] [PATCH 18/25] qemu-thread: add QemuSemaphore
2012-10-26 14:05 ` [Qemu-devel] [PATCH 18/25] qemu-thread: add QemuSemaphore Paolo Bonzini
@ 2012-10-30 18:48 ` Stefan Hajnoczi
2012-10-31 8:46 ` Paolo Bonzini
0 siblings, 1 reply; 32+ messages in thread
From: Stefan Hajnoczi @ 2012-10-30 18:48 UTC (permalink / raw)
To: Paolo Bonzini; +Cc: aliguori, qemu-devel
On Fri, Oct 26, 2012 at 04:05:48PM +0200, Paolo Bonzini wrote:
> +int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
> +{
> + int rc;
> +
> + if (ms <= 0) {
> + /* This is cheaper than sem_timedwait. */
> + rc = sem_trywait(&sem->sem);
> + if (rc == -1 && errno == EAGAIN) {
> + return -1;
> + }
> + } else {
> + struct timeval tv;
> + struct timespec ts;
> + gettimeofday(&tv, NULL);
> + ts.tv_nsec = tv.tv_usec * 1000 + (ms % 1000) * 1000000;
> + ts.tv_sec = tv.tv_sec + ms / 1000;
> + if (ts.tv_nsec >= 1000000000) {
> + ts.tv_sec++;
> + ts.tv_nsec -= 1000000000;
> + }
> + rc = sem_timedwait(&sem->sem, &ts);
> + if (rc == -1 && errno == ETIMEDOUT) {
> + return -1;
> + }
> + }
> + if (rc < 0) {
> + error_exit(errno, __func__);
> + }
Forgot to handle EINTR?
> + return 0;
> +}
> +
> +void qemu_sem_wait(QemuSemaphore *sem)
> +{
> + int rc;
> +
> + rc = sem_wait(&sem->sem);
> + if (rc < 0) {
> + error_exit(errno, __func__);
> + }
EINTR
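
The conventional fix is to retry the call when it is interrupted, along
these lines (a sketch only; Paolo's follow-up later in the thread
squashes essentially this):

    do {
        rc = sem_wait(&sem->sem);
    } while (rc == -1 && errno == EINTR);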
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility
2012-10-26 14:05 ` [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility Paolo Bonzini
@ 2012-10-30 19:13 ` Stefan Hajnoczi
2012-10-31 9:41 ` Paolo Bonzini
0 siblings, 1 reply; 32+ messages in thread
From: Stefan Hajnoczi @ 2012-10-30 19:13 UTC (permalink / raw)
To: Paolo Bonzini; +Cc: aliguori, qemu-devel
On Fri, Oct 26, 2012 at 04:05:49PM +0200, Paolo Bonzini wrote:
> +static void event_notifier_ready(EventNotifier *notifier)
> +{
> + ThreadPoolElement *elem, *next;
> +
> + event_notifier_test_and_clear(notifier);
> +restart:
> + QLIST_FOREACH_SAFE(elem, &head, all, next) {
> + if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
> + continue;
> + }
> + if (elem->state == THREAD_DONE) {
> + trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
> + }
> + if (elem->state == THREAD_DONE && elem->common.cb) {
> + QLIST_REMOVE(elem, all);
> + elem->common.cb(elem->common.opaque, elem->ret);
This function didn't take the lock. First it accessed elem->state and
now it reads elem->ret. We need to take the lock to ensure both
elem->state and elem->ret have been set - otherwise we could read
elem->ret before the return value was stored.
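
For reference, the safe shape of that handoff as a standalone program
(plain pthreads, not QEMU code; the names only mirror the patch):

    #include <pthread.h>
    #include <stdio.h>

    enum { THREAD_QUEUED, THREAD_DONE };

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static int state = THREAD_QUEUED;   /* stands in for elem->state */
    static int ret_value;               /* stands in for elem->ret */

    static void *worker(void *arg)
    {
        (void)arg;
        pthread_mutex_lock(&lock);
        ret_value = 42;                 /* store ret... */
        state = THREAD_DONE;            /* ...then state, both under lock */
        pthread_mutex_unlock(&lock);
        return NULL;
    }

    int main(void)
    {
        pthread_t t;
        int done = 0, ret = 0;

        pthread_create(&t, NULL, worker, NULL);
        while (!done) {
            pthread_mutex_lock(&lock);
            done = (state == THREAD_DONE);
            ret = ret_value;            /* read ret together with state */
            pthread_mutex_unlock(&lock);
        }
        pthread_join(t, NULL);
        printf("ret = %d\n", ret);
        return 0;
    }

Reading ret outside the critical section that publishes state is
exactly the window described above.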
> +typedef struct ThreadPoolCo {
> + Coroutine *co;
> + int ret;
> +} ThreadPoolCo;
> +
> +static void thread_pool_co_cb(void *opaque, int ret)
> +{
> + ThreadPoolCo *co = opaque;
> +
> + co->ret = ret;
> + qemu_coroutine_enter(co->co, NULL);
> +}
> +
> +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
> +{
> + ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
> + assert(qemu_in_coroutine());
> + thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
> + qemu_coroutine_yield();
> + return tpc.ret;
It's important to understand that the submit_aio, yield, return ret
pattern works because we assume this function was called as part of the
main loop.
If thread_pool_submit_co() was called outside the event loop and global
mutex, then there is a race between the submit_aio and yield steps where
thread_pool_co_cb() is called before this coroutine yields!
I see no reason why this is a problem today but I had to think through
this case when reading the code.
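
Concretely, the window sits between the submit and the yield; the same
function with the race marked (annotations added here, they are not
part of the patch):

    int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
    {
        ThreadPoolCo tpc = { .co = qemu_coroutine_self(),
                             .ret = -EINPROGRESS };

        assert(qemu_in_coroutine());
        thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
        /* <-- if thread_pool_co_cb() could run here, i.e. without the
         *     global mutex serializing us against the event loop,
         *     qemu_coroutine_enter() would be called on a coroutine
         *     that has not yielded yet. */
        qemu_coroutine_yield();
        return tpc.ret;
    }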
Stefan
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [Qemu-devel] [PATCH 00/25] AioContext & threadpool
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
` (24 preceding siblings ...)
2012-10-26 14:05 ` [Qemu-devel] [PATCH 25/25] raw-win32: implement native asynchronous I/O Paolo Bonzini
@ 2012-10-30 19:15 ` Stefan Hajnoczi
25 siblings, 0 replies; 32+ messages in thread
From: Stefan Hajnoczi @ 2012-10-30 19:15 UTC (permalink / raw)
To: Paolo Bonzini; +Cc: aliguori, qemu-devel
On Fri, Oct 26, 2012 at 04:05:30PM +0200, Paolo Bonzini wrote:
> Since half of the patches are in common between the two series, here
> are both of them together. Under Wine I see a performance regression
> due to AIO, but I wouldn't be surprised if it is an emulation artifact
> (especially since attempts to use native AIO are converted by Wine to
> synchronous I/O + the overhead of signaling). If testing gives the same
> results on native Windows it can be reverted later.
>
> Anthony, please let me know if you want to apply this before or after
> the rename.
>
> Paolo
>
> Paolo Bonzini (25):
> event_notifier: add Win32 implementation
> event_notifier: enable it to use pipes
> aio: change qemu_aio_set_fd_handler to return void
> aio: provide platform-independent API
> aio: introduce AioContext, move bottom halves there
> aio: add I/O handlers to the AioContext interface
> aio: test node->deleted before calling io_flush
> aio: add non-blocking variant of aio_wait
> aio: prepare for introducing GSource-based dispatch
> aio: add Win32 implementation
> aio: make AioContexts GSources
> aio: add aio_notify
> aio: call aio_notify after setting I/O handlers
> main-loop: use GSource to poll AIO file descriptors
> main-loop: use aio_notify for qemu_notify_event
> aio: clean up now-unused functions
> linux-aio: use event notifiers
> qemu-thread: add QemuSemaphore
> aio: add generic thread-pool facility
> block: switch posix-aio-compat to threadpool
> raw: merge posix-aio-compat.c into block/raw-posix.c
> raw-posix: rename raw-posix-aio.h, hide unavailable prototypes
> raw-win32: add emulated AIO support
> raw-posix: move linux-aio.c to block/
> raw-win32: implement native asynchronous I/O
>
> Makefile.objs | 10 +-
> aio.c => aio-posix.c | 172 +++++---
> aio.c => aio-win32.c | 197 +++++----
> async.c | 118 ++++-
> block/Makefile.objs | 9 +-
> linux-aio.c => block/linux-aio.c | 51 +--
> block/{raw-posix-aio.h => raw-aio.h} | 29 +-
> block/raw-posix.c | 301 ++++++++++++-
> block/raw-win32.c | 221 +++++++---
> block/win32-aio.c | 226 ++++++++++
> event_notifier-posix.c | 120 +++++
> event_notifier.c => event_notifier-win32.c | 48 +-
> event_notifier.h | 20 +-
> hw/hw.h | 1 +
> iohandler.c | 1 +
> main-loop.c | 160 +++----
> main-loop.h | 56 +--
> oslib-posix.c | 31 --
> posix-aio-compat.c | 679 -----------------------------
> qemu-aio.h | 206 ++++++++-
> qemu-char.h | 1 +
> qemu-common.h | 2 +-
> qemu-coroutine-lock.c | 2 +-
> qemu-os-win32.h | 1 -
> qemu-thread-posix.c | 74 ++++
> qemu-thread-posix.h | 5 +
> qemu-thread-win32.c | 35 ++
> qemu-thread-win32.h | 4 +
> qemu-thread.h | 7 +
> thread-pool.c | 279 ++++++++++++
> thread-pool.h | 34 ++
> trace-events | 5 +
> 32 file modificati, 1926 inserzioni(+), 1179 rimozioni(-)
> copy aio.c => aio-posix.c (44%)
> rename aio.c => aio-win32.c (42%)
> rename linux-aio.c => block/linux-aio.c (82%)
> rename block/{raw-posix-aio.h => raw-aio.h} (71%)
> create mode 100644 block/win32-aio.c
> create mode 100644 event_notifier-posix.c
> rename event_notifier.c => event_notifier-win32.c (49%)
> delete mode 100644 posix-aio-compat.c
> create mode 100644 thread-pool.c
> create mode 100644 thread-pool.h
Posted a few small comments. Otherwise looks good.
Stefan
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [Qemu-devel] [PATCH 18/25] qemu-thread: add QemuSemaphore
2012-10-30 18:48 ` Stefan Hajnoczi
@ 2012-10-31 8:46 ` Paolo Bonzini
0 siblings, 0 replies; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-31 8:46 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: aliguori, qemu-devel
On 30/10/2012 19:48, Stefan Hajnoczi wrote:
>> + if (rc < 0) {
>> + error_exit(errno, __func__);
>> + }
>
> Forgot to handle EINTR?
>
>> + return 0;
>> +}
>> +
>> +void qemu_sem_wait(QemuSemaphore *sem)
>> +{
>> + int rc;
>> +
>> + rc = sem_wait(&sem->sem);
>> + if (rc < 0) {
>> + error_exit(errno, __func__);
>> + }
>
> EINTR
Right!
I'm squashing this:
diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c
index dea57d4..6a3d3a1 100644
--- a/qemu-thread-posix.c
+++ b/qemu-thread-posix.c
@@ -154,7 +154,9 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
if (ms <= 0) {
/* This is cheaper than sem_timedwait. */
- rc = sem_trywait(&sem->sem);
+ do {
+ rc = sem_trywait(&sem->sem);
+ } while (rc == -1 && errno == EINTR);
if (rc == -1 && errno == EAGAIN) {
return -1;
}
@@ -168,7 +170,9 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
ts.tv_sec++;
ts.tv_nsec -= 1000000000;
}
- rc = sem_timedwait(&sem->sem, &ts);
+ do {
+ rc = sem_timedwait(&sem->sem, &ts);
+ } while (rc == -1 && errno == EINTR);
if (rc == -1 && errno == ETIMEDOUT) {
return -1;
}
@@ -183,7 +187,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
{
int rc;
- rc = sem_wait(&sem->sem);
+ do {
+ rc = sem_wait(&sem->sem);
+ } while (rc == -1 && errno == EINTR);
if (rc < 0) {
error_exit(errno, __func__);
}
Paolo
^ permalink raw reply related [flat|nested] 32+ messages in thread
* Re: [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility
2012-10-30 19:13 ` Stefan Hajnoczi
@ 2012-10-31 9:41 ` Paolo Bonzini
2012-10-31 13:44 ` Stefan Hajnoczi
0 siblings, 1 reply; 32+ messages in thread
From: Paolo Bonzini @ 2012-10-31 9:41 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: aliguori, qemu-devel
On 30/10/2012 20:13, Stefan Hajnoczi wrote:
> On Fri, Oct 26, 2012 at 04:05:49PM +0200, Paolo Bonzini wrote:
>> +static void event_notifier_ready(EventNotifier *notifier)
>> +{
>> + ThreadPoolElement *elem, *next;
>> +
>> + event_notifier_test_and_clear(notifier);
>> +restart:
>> + QLIST_FOREACH_SAFE(elem, &head, all, next) {
>> + if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
>> + continue;
>> + }
>> + if (elem->state == THREAD_DONE) {
>> + trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
>> + }
>> + if (elem->state == THREAD_DONE && elem->common.cb) {
>> + QLIST_REMOVE(elem, all);
>> + elem->common.cb(elem->common.opaque, elem->ret);
>
> This function didn't take the lock. First it accessed elem->state and
> now it reads elem->ret. We need to take the lock to ensure both
> elem->state and elem->ret have been set - otherwise we could read
> elem->ret before the return value was stored.
Right. posix-aio-compat didn't need this because it only had ret.
Just as important: the locking policy was not documented at all.
I'm applying some changes. Logically (and for ease of review) they are
four patches on top of this one, but they'll be squashed in the next
submission. (Hmm, the fourth should be separate).
>> +typedef struct ThreadPoolCo {
>> + Coroutine *co;
>> + int ret;
>> +} ThreadPoolCo;
>> +
>> +static void thread_pool_co_cb(void *opaque, int ret)
>> +{
>> + ThreadPoolCo *co = opaque;
>> +
>> + co->ret = ret;
>> + qemu_coroutine_enter(co->co, NULL);
>> +}
>> +
>> +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>> +{
>> + ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
>> + assert(qemu_in_coroutine());
>> + thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
>> + qemu_coroutine_yield();
>> + return tpc.ret;
>
> It's important to understand that the submit_aio, yield, return ret
> pattern works because we assume this function was called as part of the
> main loop.
>
> If thread_pool_submit_co() was called outside the event loop and global
> mutex, then there is a race between the submit_aio and yield steps where
> thread_pool_co_cb() is called before this coroutine yields!
Even before that, thread_pool_submit_aio would race on the
non-thread-safe qemu_aio_get. Also, head is protected by the BQL.
event_notifier_ready needs to run under the BQL too, because it
accesses head and also calls qemu_aio_release.
qemu_aio_get and qemu_aio_release should be moved to AioContext, so
that they can use the (upcoming) AioContext lock instead of the BQL.
The thread pool needs to be per-AioContext instead of using globals,
too. However, this can be done later.
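
A per-AioContext pool could then look roughly like this (hypothetical
signatures, sketched for illustration only; nothing in this series
defines them):

    typedef struct ThreadPool ThreadPool;

    /* One pool per AioContext, protected by that context's lock
     * rather than the BQL. */
    ThreadPool *thread_pool_new(AioContext *ctx);
    void thread_pool_free(ThreadPool *pool);

    BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
            ThreadPoolFunc *func, void *arg,
            BlockDriverCompletionFunc *cb, void *opaque);

with qemu_aio_get/qemu_aio_release likewise taking the AioContext lock.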
Paolo
----------------------- >8 ---------------------------
From acf39d76ddf4109fbdbc897afe0d9a23ba8ffba1 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:07:04 +0100
Subject: [PATCH 1/4] fix locking in thread-pool
event_notifier_ready accesses elem->state and then elem->ret. We need
to take the lock to ensure both elem->state and elem->ret have been set -
otherwise we could read elem->ret before the return value was stored.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
thread-pool.c | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/thread-pool.c b/thread-pool.c
index 266f12f..e4bd4f3 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -162,8 +162,11 @@ restart:
trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
}
if (elem->state == THREAD_DONE && elem->common.cb) {
+ qemu_mutex_lock(&lock);
+ int ret = elem->ret;
+ qemu_mutex_unlock(&lock);
QLIST_REMOVE(elem, all);
- elem->common.cb(elem->common.opaque, elem->ret);
+ elem->common.cb(elem->common.opaque, ret);
qemu_aio_release(elem);
goto restart;
} else {
--
1.7.12.1
From 8671e581bb65be4d3cd82a9b99fe46735c6ea76b Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:09:26 +0100
Subject: [PATCH 2/4] document lock policy
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
thread-pool.c | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/thread-pool.c b/thread-pool.c
index e4bd4f3..10bab70 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -42,7 +42,10 @@ struct ThreadPoolElement {
     enum ThreadState state;
     int ret;
 
+    /* Access to this list is protected by lock. */
     QTAILQ_ENTRY(ThreadPoolElement) reqs;
+
+    /* Access to this list is protected by the global mutex. */
     QLIST_ENTRY(ThreadPoolElement) all;
 };
 
@@ -51,14 +54,18 @@ static QemuMutex lock;
 static QemuCond check_cancel;
 static QemuSemaphore sem;
 static int max_threads = 64;
+static QEMUBH *new_thread_bh;
+
+/* The following variables are protected by the global mutex. */
+static QLIST_HEAD(, ThreadPoolElement) head;
+
+/* The following variables are protected by lock. */
+static QTAILQ_HEAD(, ThreadPoolElement) request_list;
 static int cur_threads;
 static int idle_threads;
 static int new_threads;     /* backlog of threads we need to create */
 static int pending_threads; /* threads created but not running yet */
 static int pending_cancellations; /* whether we need a cond_broadcast */
-static QEMUBH *new_thread_bh;
-static QLIST_HEAD(, ThreadPoolElement) head;
-static QTAILQ_HEAD(, ThreadPoolElement) request_list;
 
 static void *worker_thread(void *unused)
 {
--
1.7.12.1
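As an example of the policy in action, cancellation touches both lists: it
runs from the main loop (so the BQL covers head) and takes lock for the
rest. Sketched, lightly abridged, from the thread-pool patch:

    static void thread_pool_cancel(BlockDriverAIOCB *acb)
    {
        ThreadPoolElement *elem = (ThreadPoolElement *)acb;

        qemu_mutex_lock(&lock);
        if (elem->state == THREAD_QUEUED &&
            /* Steal the semaphore signal so no worker picks elem up.
             * Non-blocking, so it is safe with lock taken, and elem
             * is guaranteed to stay THREAD_QUEUED meanwhile.  */
            qemu_sem_timedwait(&sem, 0) == 0) {
            QTAILQ_REMOVE(&request_list, elem, reqs);   /* under lock */
            elem->state = THREAD_CANCELED;
        } else {
            /* A worker already has it; wait for it to finish.  */
            pending_cancellations++;
            while (elem->state != THREAD_CANCELED &&
                   elem->state != THREAD_DONE) {
                qemu_cond_wait(&check_cancel, &lock);
            }
            pending_cancellations--;
        }
        qemu_mutex_unlock(&lock);
    }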
From 1b611e625a1a16c1d1b110e410f082b64c7ba332 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:08:39 +0100
Subject: [PATCH 3/4] simplify locking
Avoid repeated lock/unlock: take the lock once around the whole while loop
rather than inside it.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
thread-pool.c | 11 +++--------
1 file changed, 3 insertions(+), 8 deletions(-)
diff --git a/thread-pool.c b/thread-pool.c
index 10bab70..38ac5b4 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -71,25 +71,21 @@ static void *worker_thread(void *unused)
 {
     qemu_mutex_lock(&lock);
     pending_threads--;
-    qemu_mutex_unlock(&lock);
     do_spawn_thread();
 
     while (1) {
         ThreadPoolElement *req;
         int ret;
 
-        qemu_mutex_lock(&lock);
-        idle_threads++;
-        qemu_mutex_unlock(&lock);
-        ret = qemu_sem_timedwait(&sem, 10000);
-        qemu_mutex_lock(&lock);
-        idle_threads--;
+        do {
+            idle_threads++;
+            qemu_mutex_unlock(&lock);
+            ret = qemu_sem_timedwait(&sem, 10000);
+            qemu_mutex_lock(&lock);
+            idle_threads--;
+        } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
         if (ret == -1) {
-            if (QTAILQ_EMPTY(&request_list)) {
-                break;
-            }
-            qemu_mutex_unlock(&lock);
-            continue;
+            break;
         }
 
         req = QTAILQ_FIRST(&request_list);
@@ -105,14 +103,12 @@ static void *worker_thread(void *unused)
         if (pending_cancellations) {
             qemu_cond_broadcast(&check_cancel);
         }
-        qemu_mutex_unlock(&lock);
 
         event_notifier_set(&notifier);
     }
 
     cur_threads--;
     qemu_mutex_unlock(&lock);
-
     return NULL;
 }
@@ -120,23 +116,22 @@ static void do_spawn_thread(void)
 {
     QemuThread t;
 
-    qemu_mutex_lock(&lock);
+    /* Runs with lock taken. */
     if (!new_threads) {
-        qemu_mutex_unlock(&lock);
         return;
     }
 
     new_threads--;
     pending_threads++;
-    qemu_mutex_unlock(&lock);
-
     qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
 }
 
 static void spawn_thread_bh_fn(void *opaque)
 {
+    qemu_mutex_lock(&lock);
     do_spawn_thread();
+    qemu_mutex_unlock(&lock);
 }
 
 static void spawn_thread(void)
--
1.7.12.1
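In isolation, the resulting wait loop reads as below (annotated; the
10-second timeout is the idle-thread reaping period):

    do {
        idle_threads++;
        qemu_mutex_unlock(&lock);               /* drop lock only here  */
        ret = qemu_sem_timedwait(&sem, 10000);  /* 0: work, -1: timeout */
        qemu_mutex_lock(&lock);
        idle_threads--;
        /* Re-check under the lock: a submission can race with the
         * timeout, and then the thread must not exit.  */
    } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
    if (ret == -1) {
        break;    /* idle for 10s with an empty queue: retire thread */
    }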
From 3478c7db38368804db46924fc22d90cda77f6a48 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:09:11 +0100
Subject: [PATCH 4/4] threadpool: do not take lock in event_notifier_ready
The ordering is:

    worker thread                       consumer thread
    ------------------------------------------------------------------
    write ret                           event_notifier_test_and_clear
    wmb()                               read state
    write state                         rmb()
    event_notifier_set                  read ret
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
thread-pool.c | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/thread-pool.c b/thread-pool.c
index 38ac5b4..883948a 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -39,6 +39,11 @@ struct ThreadPoolElement {
     BlockDriverAIOCB common;
     ThreadPoolFunc *func;
     void *arg;
+
+    /* Moving state out of THREAD_QUEUED is protected by lock. After
+     * that, only the worker thread can write to it. Reads and writes
+     * of state and ret are ordered with memory barriers.
+     */
     enum ThreadState state;
     int ret;
 
@@ -97,9 +102,12 @@ static void *worker_thread(void *unused)
 
         ret = req->func(req->arg);
 
-        qemu_mutex_lock(&lock);
-        req->state = THREAD_DONE;
         req->ret = ret;
+        /* Write ret before state. */
+        smp_wmb();
+        req->state = THREAD_DONE;
+
+        qemu_mutex_lock(&lock);
         if (pending_cancellations) {
             qemu_cond_broadcast(&check_cancel);
         }
@@ -164,11 +172,10 @@ restart:
             trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
         }
         if (elem->state == THREAD_DONE && elem->common.cb) {
-            qemu_mutex_lock(&lock);
-            int ret = elem->ret;
-            qemu_mutex_unlock(&lock);
             QLIST_REMOVE(elem, all);
-            elem->common.cb(elem->common.opaque, ret);
+            /* Read state before ret. */
+            smp_rmb();
+            elem->common.cb(elem->common.opaque, elem->ret);
             qemu_aio_release(elem);
             goto restart;
         } else {
--
1.7.12.1
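For the record, this is the standard message-passing idiom: the writer's
wmb() between payload (ret) and flag (state) pairs with the reader's rmb()
between flag and payload. With C11 atomics the same guarantee would be a
release store and an acquire load on state (illustrative only, assuming
state were declared _Atomic; the series itself uses QEMU's barriers):

    #include <stdatomic.h>

    /* worker */
    req->ret = ret;                                  /* payload      */
    atomic_store_explicit(&req->state, THREAD_DONE,
                          memory_order_release);     /* publish flag */

    /* consumer */
    if (atomic_load_explicit(&req->state,
                             memory_order_acquire) == THREAD_DONE) {
        complete(req->common.opaque, req->ret);      /* ret is fresh */
    }

(complete() stands in for the completion callback.)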
^ permalink raw reply related [flat|nested] 32+ messages in thread
* Re: [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility
2012-10-31 9:41 ` Paolo Bonzini
@ 2012-10-31 13:44 ` Stefan Hajnoczi
0 siblings, 0 replies; 32+ messages in thread
From: Stefan Hajnoczi @ 2012-10-31 13:44 UTC (permalink / raw)
To: Paolo Bonzini; +Cc: Anthony Liguori, qemu-devel, Stefan Hajnoczi
On Wed, Oct 31, 2012 at 10:41 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 30/10/2012 20:13, Stefan Hajnoczi wrote:
>> On Fri, Oct 26, 2012 at 04:05:49PM +0200, Paolo Bonzini wrote:
>>> +static void event_notifier_ready(EventNotifier *notifier)
>>> +{
>>> +    ThreadPoolElement *elem, *next;
>>> +
>>> +    event_notifier_test_and_clear(notifier);
>>> +restart:
>>> +    QLIST_FOREACH_SAFE(elem, &head, all, next) {
>>> +        if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
>>> +            continue;
>>> +        }
>>> +        if (elem->state == THREAD_DONE) {
>>> +            trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
>>> +        }
>>> +        if (elem->state == THREAD_DONE && elem->common.cb) {
>>> +            QLIST_REMOVE(elem, all);
>>> +            elem->common.cb(elem->common.opaque, elem->ret);
>>
>> This function didn't take the lock. First it accesses elem->state and
>> now it reads elem->ret. We need to take the lock to ensure both
>> elem->state and elem->ret have been set - otherwise we could read
>> elem->ret before the return value was stored.
>
> Right. posix-aio-compat didn't need this because it only had ret.
> Just as important: the locking policy was not documented at all.
>
> I'm applying some changes. Logically (and for ease of review) they are
> four patches on top of this one, but they'll be squashed in the next
> submission. (Hmm, the fourth should be separate).
>
>>> +typedef struct ThreadPoolCo {
>>> +    Coroutine *co;
>>> +    int ret;
>>> +} ThreadPoolCo;
>>> +
>>> +static void thread_pool_co_cb(void *opaque, int ret)
>>> +{
>>> +    ThreadPoolCo *co = opaque;
>>> +
>>> +    co->ret = ret;
>>> +    qemu_coroutine_enter(co->co, NULL);
>>> +}
>>> +
>>> +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>>> +{
>>> +    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
>>> +    assert(qemu_in_coroutine());
>>> +    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
>>> +    qemu_coroutine_yield();
>>> +    return tpc.ret;
>>
>> It's important to understand that the submit_aio, yield, return ret
>> pattern works because we assume this function was called as part of the
>> main loop.
>>
>> If thread_pool_submit_co() was called outside the event loop and global
>> mutex, then there is a race between the submit_aio and yield steps where
>> thread_pool_co_cb() is called before this coroutine yields!
>
> Even before that, thread_pool_submit_aio would race on the
> non-thread-safe qemu_aio_get. Also, head is protected by the BQL.
>
> event_notifier_ready needs to run under the BQL too, because it
> accesses head and also calls qemu_aio_release.
>
> qemu_aio_get and qemu_aio_release should be moved to AioContext, so
> that they can use the (upcoming) AioContext lock instead of the BQL.
> The thread pool needs to be per-AioContext instead of using globals,
> too. However, this can be done later.
I just sent a patch to move from a manual freelist to g_slice_alloc().
This does away with manual pooling and is thread-safe. It's possible
that a lock + freelist is more efficient, but I think g_slice_alloc()
is a good start.
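The slice allocator keeps thread-local magazine caches internally, so the
common case allocates without taking any lock; usage is simply (sketch):

    ThreadPoolElement *elem = g_slice_new0(ThreadPoolElement);
    /* ... fill in common.cb, common.opaque, func, arg ... */
    g_slice_free(ThreadPoolElement, elem);    /* on release */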
Your patches look good, looking forward to the next spin.
Stefan
^ permalink raw reply [flat|nested] 32+ messages in thread
end of thread, other threads: [~2012-10-31 13:45 UTC | newest]
Thread overview: 32+ messages
2012-10-26 14:05 [Qemu-devel] [PATCH 00/25] AioContext & threadpool Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 01/25] event_notifier: add Win32 implementation Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 02/25] event_notifier: enable it to use pipes Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 03/25] aio: change qemu_aio_set_fd_handler to return void Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 04/25] aio: provide platform-independent API Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 05/25] aio: introduce AioContext, move bottom halves there Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 06/25] aio: add I/O handlers to the AioContext interface Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 07/25] aio: test node->deleted before calling io_flush Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 08/25] aio: add non-blocking variant of aio_wait Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 09/25] aio: prepare for introducing GSource-based dispatch Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 10/25] aio: add Win32 implementation Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 11/25] aio: make AioContexts GSources Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 12/25] aio: add aio_notify Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 13/25] aio: call aio_notify after setting I/O handlers Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 14/25] main-loop: use GSource to poll AIO file descriptors Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 15/25] main-loop: use aio_notify for qemu_notify_event Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 16/25] aio: clean up now-unused functions Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 17/25] linux-aio: use event notifiers Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 18/25] qemu-thread: add QemuSemaphore Paolo Bonzini
2012-10-30 18:48 ` Stefan Hajnoczi
2012-10-31 8:46 ` Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 19/25] aio: add generic thread-pool facility Paolo Bonzini
2012-10-30 19:13 ` Stefan Hajnoczi
2012-10-31 9:41 ` Paolo Bonzini
2012-10-31 13:44 ` Stefan Hajnoczi
2012-10-26 14:05 ` [Qemu-devel] [PATCH 20/25] block: switch posix-aio-compat to threadpool Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 21/25] raw: merge posix-aio-compat.c into block/raw-posix.c Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 22/25] raw-posix: rename raw-posix-aio.h, hide unavailable prototypes Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 23/25] raw-win32: add emulated AIO support Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 24/25] raw-posix: move linux-aio.c to block/ Paolo Bonzini
2012-10-26 14:05 ` [Qemu-devel] [PATCH 25/25] raw-win32: implement native asynchronous I/O Paolo Bonzini
2012-10-30 19:15 ` [Qemu-devel] [PATCH 00/25] AioContext & threadpool Stefan Hajnoczi