* [PATCH v2 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks
2021-09-22 5:03 [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras
@ 2021-09-22 5:03 ` Leonardo Bras
2021-09-22 5:03 ` [PATCH v2 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush Leonardo Bras
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Leonardo Bras @ 2021-09-22 5:03 UTC (permalink / raw)
To: Daniel P. Berrangé, Juan Quintela, Dr. David Alan Gilbert,
Peter Xu, Jason Wang
Cc: Leonardo Bras, qemu-devel
Adds io_async_writev and io_async_flush as optional callbacks to QIOChannelClass,
allowing the implementation of asynchronous writes by subclasses.
How to use them:
- Write data using qio_channel_async_writev(),
- Wait write completion with qio_channel_async_flush().
Notes:
Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
recommended to keep the write buffer untouched until the return of
qio_channel_async_flush().
As the new callbacks are optional, if a subclass does not implement them
there will be a fallback to the mandatory synchronous implementation:
- io_async_writev will fallback to io_writev,
- io_async_flush will return without changing anything.
This makes it simpler for the user to make use of the asynchronous implementation.
Also, some functions like qio_channel_writev_full_all() were adapted to
offer an async version, and make better use of the new callbacks.
Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
io/channel.c | 66 ++++++++++++++++++++++++-------
2 files changed, 129 insertions(+), 30 deletions(-)
diff --git a/include/io/channel.h b/include/io/channel.h
index 88988979f8..74f2e3ae8a 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -136,6 +136,14 @@ struct QIOChannelClass {
IOHandler *io_read,
IOHandler *io_write,
void *opaque);
+ ssize_t (*io_async_writev)(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp);
+ void (*io_async_flush)(QIOChannel *ioc,
+ Error **errp);
};
/* General I/O handling functions */
@@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
* or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
* and the channel is non-blocking
*/
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds,
- size_t nfds,
- Error **errp);
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ bool async,
+ Error **errp);
+#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
+ __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
+ __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)
/**
* qio_channel_readv_all_eof:
@@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
*
* Returns: 0 if all bytes were written, or -1 on error
*/
-int qio_channel_writev_all(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- Error **erp);
+int __qio_channel_writev_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ bool async,
+ Error **erp);
+#define qio_channel_writev_all(ioc, iov, niov, erp) \
+ __qio_channel_writev_all(ioc, iov, niov, false, erp)
+#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
+ __qio_channel_writev_all(ioc, iov, niov, true, erp)
/**
* qio_channel_readv:
@@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
* Returns: 0 if all bytes were written, or -1 on error
*/
-int qio_channel_writev_full_all(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds, size_t nfds,
- Error **errp);
+int __qio_channel_writev_full_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds, size_t nfds,
+ bool async, Error **errp);
+#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+ __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+ __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
+
+/**
+ * qio_channel_async_writev:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @fds: an array of file handles to send
+ * @nfds: number of file handles in @fds
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Behaves like qio_channel_writev_full, but will send
+ * data asynchronously, this meaning this function
+ * may return before the data is actually sent.
+ *
+ * If at some point it's necessary wait for all data to be
+ * sent, use qio_channel_async_flush().
+ *
+ * If not implemented, falls back to the default writev
+ */
+
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp);
+
+/**
+ * qio_channel_async_flush:
+ * @ioc: the channel object
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Will lock until every packet queued with qio_channel_async_writev()
+ * is sent.
+ *
+ * If not implemented, returns without changing anything.
+ */
+
+void qio_channel_async_flush(QIOChannel *ioc,
+ Error **errp);
+
#endif /* QIO_CHANNEL_H */
diff --git a/io/channel.c b/io/channel.c
index e8b019dc36..a35109a006 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
}
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds,
- size_t nfds,
- Error **errp)
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ bool async,
+ Error **errp)
{
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
@@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
return -1;
}
+ if (async) {
+ return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+ }
+
return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
}
@@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
return ret;
}
-int qio_channel_writev_all(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- Error **errp)
+int __qio_channel_writev_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ bool async,
+ Error **errp)
{
- return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
+ return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
}
-int qio_channel_writev_full_all(QIOChannel *ioc,
+int __qio_channel_writev_full_all(QIOChannel *ioc,
const struct iovec *iov,
size_t niov,
int *fds, size_t nfds,
- Error **errp)
+ bool async, Error **errp)
{
int ret = -1;
struct iovec *local_iov = g_new(struct iovec, niov);
@@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
while (nlocal_iov > 0) {
ssize_t len;
- len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
- errp);
+ len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
+ async, errp);
if (len == QIO_CHANNEL_ERR_BLOCK) {
if (qemu_in_coroutine()) {
qio_channel_yield(ioc, G_IO_OUT);
@@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
}
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if (!klass->io_async_writev) {
+ return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
+ }
+
+ return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+}
+
+
+void qio_channel_async_flush(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if (!klass->io_async_flush) {
+ return;
+ }
+
+ klass->io_async_flush(ioc, errp);
+}
+
+
static void qio_channel_restart_read(void *opaque)
{
QIOChannel *ioc = opaque;
--
2.33.0
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [PATCH v2 2/3] QIOChannelSocket: Implement io_async_write & io_async_flush
2021-09-22 5:03 [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras
2021-09-22 5:03 ` [PATCH v2 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks Leonardo Bras
@ 2021-09-22 5:03 ` Leonardo Bras
2021-09-22 5:03 ` [PATCH v2 3/3] multifd: Send using asynchronous write on nocomp to send RAM pages Leonardo Bras
2021-09-22 22:28 ` [PATCH v2 0/3] QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd Leonardo Bras Soares Passos
3 siblings, 0 replies; 5+ messages in thread
From: Leonardo Bras @ 2021-09-22 5:03 UTC (permalink / raw)
To: Daniel P. Berrangé, Juan Quintela, Dr. David Alan Gilbert,
Peter Xu, Jason Wang
Cc: Leonardo Bras, qemu-devel
Implement the new optional callbacks io_async_write and io_async_flush on
QIOChannelSocket, enabling them only when the MSG_ZEROCOPY feature is
available in the host kernel and TCP sockets are used.
qio_channel_socket_writev() contents were moved to a helper function
__qio_channel_socket_writev() which accepts an extra 'flag' argument.
This helper function is used to implement qio_channel_socket_writev(), with
flags = 0, keeping its behavior unchanged, and
qio_channel_socket_async_writev() with flags = MSG_ZEROCOPY.
qio_channel_socket_async_flush() was implemented by reading the socket's error
queue, which will have information on MSG_ZEROCOPY send completion.
There is no need to worry about re-sending packets in case any error happens, as
MSG_ZEROCOPY only works with TCP and it will retransmit if any error occurs.
Notes on using async_write():
- As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
some caution is necessary to avoid overwriting any buffer before it's sent.
If something like this happens, a newer version of the buffer may be sent instead.
- If this is a problem, it's recommended to use async_flush() before freeing or
re-using the buffer.
.
Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
include/io/channel-socket.h | 2 +
io/channel-socket.c | 145 ++++++++++++++++++++++++++++++++++--
2 files changed, 140 insertions(+), 7 deletions(-)
diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
index e747e63514..4d1be0637a 100644
--- a/include/io/channel-socket.h
+++ b/include/io/channel-socket.h
@@ -47,6 +47,8 @@ struct QIOChannelSocket {
socklen_t localAddrLen;
struct sockaddr_storage remoteAddr;
socklen_t remoteAddrLen;
+ ssize_t async_queued;
+ ssize_t async_sent;
};
diff --git a/io/channel-socket.c b/io/channel-socket.c
index 606ec97cf7..128fab4cd2 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -26,9 +26,23 @@
#include "io/channel-watch.h"
#include "trace.h"
#include "qapi/clone-visitor.h"
+#ifdef CONFIG_LINUX
+#include <linux/errqueue.h>
+#include <poll.h>
+#endif
#define SOCKET_MAX_FDS 16
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp);
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+ Error **errp);
+
SocketAddress *
qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
Error **errp)
@@ -55,6 +69,8 @@ qio_channel_socket_new(void)
sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
sioc->fd = -1;
+ sioc->async_queued = 0;
+ sioc->async_sent = 0;
ioc = QIO_CHANNEL(sioc);
qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -140,6 +156,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
Error **errp)
{
int fd;
+ int ret, v = 1;
trace_qio_channel_socket_connect_sync(ioc, addr);
fd = socket_connect(addr, errp);
@@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
return -1;
}
+#ifdef CONFIG_LINUX
+ if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
+ return 0;
+ }
+
+ ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
+ if (ret >= 0) {
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+ klass->io_async_writev = qio_channel_socket_async_writev;
+ klass->io_async_flush = qio_channel_socket_async_flush;
+ }
+#endif
+
return 0;
}
@@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
return ret;
}
-static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
- const struct iovec *iov,
- size_t niov,
- int *fds,
- size_t nfds,
- Error **errp)
+static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ int flags,
+ Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
ssize_t ret;
@@ -558,7 +589,7 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
}
retry:
- ret = sendmsg(sioc->fd, &msg, 0);
+ ret = sendmsg(sioc->fd, &msg, flags);
if (ret <= 0) {
if (errno == EAGAIN) {
return QIO_CHANNEL_ERR_BLOCK;
@@ -572,6 +603,106 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
}
return ret;
}
+
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
+}
+
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+
+ sioc->async_queued++;
+
+ return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
+ errp);
+}
+
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ struct msghdr msg = {};
+ struct pollfd pfd;
+ struct sock_extended_err *serr;
+ struct cmsghdr *cm;
+ char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
+ int ret;
+
+ memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
+ msg.msg_control = control;
+ msg.msg_controllen = sizeof(control);
+
+ while (sioc->async_sent < sioc->async_queued) {
+ ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ /* Nothing on errqueue, wait */
+ pfd.fd = sioc->fd;
+ pfd.events = 0;
+ ret = poll(&pfd, 1, 250);
+ if (ret == 0) {
+ /*
+ * Timeout : After 250ms without receiving any zerocopy
+ * notification, consider all data as sent.
+ */
+ break;
+ } else if (ret < 0 ||
+ (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
+ error_setg_errno(errp, errno,
+ "Poll error");
+ break;
+ } else {
+ continue;
+ }
+ }
+ if (errno == EINTR) {
+ continue;
+ }
+
+ error_setg_errno(errp, errno,
+ "Unable to read errqueue");
+ break;
+ }
+
+ cm = CMSG_FIRSTHDR(&msg);
+ if (cm->cmsg_level != SOL_IP &&
+ cm->cmsg_type != IP_RECVERR) {
+ error_setg_errno(errp, EPROTOTYPE,
+ "Wrong cmsg in errqueue");
+ break;
+ }
+
+ serr = (void *) CMSG_DATA(cm);
+ if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
+ error_setg_errno(errp, serr->ee_errno,
+ "Error on socket");
+ break;
+ }
+ if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+ error_setg_errno(errp, serr->ee_origin,
+ "Error not from zerocopy");
+ break;
+ }
+
+ /* No errors, count sent ids*/
+ sioc->async_sent += serr->ee_data - serr->ee_info + 1;
+ }
+}
+
+
#else /* WIN32 */
static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
const struct iovec *iov,
--
2.33.0
^ permalink raw reply related [flat|nested] 5+ messages in thread