* [Qemu-devel] [PATCH 1/3] net: asynchronous send/receive infrastructure for net/socket.c
2012-08-20 13:26 [Qemu-devel] [PATCH 0/3] net: asynchronous send/receive for net/socket.c Stefan Hajnoczi
@ 2012-08-20 13:26 ` Stefan Hajnoczi
2012-08-20 13:27 ` [Qemu-devel] [PATCH 2/3] net: EAGAIN handling for net/socket.c UDP Stefan Hajnoczi
2012-08-20 13:27 ` [Qemu-devel] [PATCH 3/3] net: EAGAIN handling for net/socket.c TCP Stefan Hajnoczi
2 siblings, 0 replies; 6+ messages in thread
From: Stefan Hajnoczi @ 2012-08-20 13:26 UTC (permalink / raw)
To: qemu-devel; +Cc: Stefan Hajnoczi
The net/socket.c net client is not truly asynchronous. This patch
borrows the qemu_set_fd_handler2() code from net/tap.c as the basis for
proper asynchronous send/receive.
Only read packets from the socket when the peer is able to receive.
This avoids needless queuing.
Later patches implement asynchronous send.
Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
net/socket.c | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 52 insertions(+), 6 deletions(-)
diff --git a/net/socket.c b/net/socket.c
index c172c24..54e32f0 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -42,9 +42,51 @@ typedef struct NetSocketState {
unsigned int packet_len;
uint8_t buf[4096];
struct sockaddr_in dgram_dst; /* contains inet host and port destination iff connectionless (SOCK_DGRAM) */
+ IOHandler *send_fn; /* differs between SOCK_STREAM/SOCK_DGRAM */
+ bool read_poll; /* waiting to receive data? */
+ bool write_poll; /* waiting to transmit data? */
} NetSocketState;
static void net_socket_accept(void *opaque);
+static void net_socket_writable(void *opaque);
+
+/* Only read packets from socket when peer can receive them */
+static int net_socket_can_send(void *opaque)
+{
+ NetSocketState *s = opaque;
+
+ return qemu_can_send_packet(&s->nc);
+}
+
+static void net_socket_update_fd_handler(NetSocketState *s)
+{
+ qemu_set_fd_handler2(s->fd,
+ s->read_poll ? net_socket_can_send : NULL,
+ s->read_poll ? s->send_fn : NULL,
+ s->write_poll ? net_socket_writable : NULL,
+ s);
+}
+
+static void net_socket_read_poll(NetSocketState *s, bool enable)
+{
+ s->read_poll = enable;
+ net_socket_update_fd_handler(s);
+}
+
+static void net_socket_write_poll(NetSocketState *s, bool enable)
+{
+ s->write_poll = enable;
+ net_socket_update_fd_handler(s);
+}
+
+static void net_socket_writable(void *opaque)
+{
+ NetSocketState *s = opaque;
+
+ net_socket_write_poll(s, false);
+
+ qemu_flush_queued_packets(&s->nc);
+}
/* XXX: we consider we can send the whole packet without blocking */
static ssize_t net_socket_receive(NetClientState *nc, const uint8_t *buf, size_t size)
@@ -81,7 +123,8 @@ static void net_socket_send(void *opaque)
} else if (size == 0) {
/* end of connection */
eoc:
- qemu_set_fd_handler(s->fd, NULL, NULL, NULL);
+ net_socket_read_poll(s, false);
+ net_socket_write_poll(s, false);
if (s->listen_fd != -1) {
qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
}
@@ -152,7 +195,8 @@ static void net_socket_send_dgram(void *opaque)
return;
if (size == 0) {
/* end of connection */
- qemu_set_fd_handler(s->fd, NULL, NULL, NULL);
+ net_socket_read_poll(s, false);
+ net_socket_write_poll(s, false);
return;
}
qemu_send_packet(&s->nc, s->buf, size);
@@ -243,7 +287,8 @@ static void net_socket_cleanup(NetClientState *nc)
{
NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
if (s->fd != -1) {
- qemu_set_fd_handler(s->fd, NULL, NULL, NULL);
+ net_socket_read_poll(s, false);
+ net_socket_write_poll(s, false);
close(s->fd);
s->fd = -1;
}
@@ -314,8 +359,8 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
s->fd = fd;
s->listen_fd = -1;
-
- qemu_set_fd_handler(s->fd, net_socket_send_dgram, NULL, s);
+ s->send_fn = net_socket_send_dgram;
+ net_socket_read_poll(s, true);
/* mcast: save bound address as dst */
if (is_connected) {
@@ -332,7 +377,8 @@ err:
static void net_socket_connect(void *opaque)
{
NetSocketState *s = opaque;
- qemu_set_fd_handler(s->fd, net_socket_send, NULL, s);
+ s->send_fn = net_socket_send;
+ net_socket_read_poll(s, true);
}
static NetClientInfo net_socket_info = {
--
1.7.10.4
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [Qemu-devel] [PATCH 2/3] net: EAGAIN handling for net/socket.c UDP
2012-08-20 13:26 [Qemu-devel] [PATCH 0/3] net: asynchronous send/receive for net/socket.c Stefan Hajnoczi
2012-08-20 13:26 ` [Qemu-devel] [PATCH 1/3] net: asynchronous send/receive infrastructure " Stefan Hajnoczi
@ 2012-08-20 13:27 ` Stefan Hajnoczi
2012-08-20 13:27 ` [Qemu-devel] [PATCH 3/3] net: EAGAIN handling for net/socket.c TCP Stefan Hajnoczi
2 siblings, 0 replies; 6+ messages in thread
From: Stefan Hajnoczi @ 2012-08-20 13:27 UTC (permalink / raw)
To: qemu-devel; +Cc: Stefan Hajnoczi
Implement asynchronous send for UDP (or other SOCK_DGRAM) sockets. If
send fails with EAGAIN we wait for the socket to become writable again.
Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
net/socket.c | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/net/socket.c b/net/socket.c
index 54e32f0..e5e4e8d 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -102,9 +102,19 @@ static ssize_t net_socket_receive(NetClientState *nc, const uint8_t *buf, size_t
static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf, size_t size)
{
NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
+ ssize_t ret;
- return sendto(s->fd, (const void *)buf, size, 0,
- (struct sockaddr *)&s->dgram_dst, sizeof(s->dgram_dst));
+ do {
+ ret = sendto(s->fd, buf, size, 0,
+ (struct sockaddr *)&s->dgram_dst,
+ sizeof(s->dgram_dst));
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1 && errno == EAGAIN) {
+ net_socket_write_poll(s, true);
+ return 0;
+ }
+ return ret;
}
static void net_socket_send(void *opaque)
--
1.7.10.4
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [Qemu-devel] [PATCH 3/3] net: EAGAIN handling for net/socket.c TCP
2012-08-20 13:26 [Qemu-devel] [PATCH 0/3] net: asynchronous send/receive for net/socket.c Stefan Hajnoczi
2012-08-20 13:26 ` [Qemu-devel] [PATCH 1/3] net: asynchronous send/receive infrastructure " Stefan Hajnoczi
2012-08-20 13:27 ` [Qemu-devel] [PATCH 2/3] net: EAGAIN handling for net/socket.c UDP Stefan Hajnoczi
@ 2012-08-20 13:27 ` Stefan Hajnoczi
2012-08-20 14:57 ` Peter Maydell
2 siblings, 1 reply; 6+ messages in thread
From: Stefan Hajnoczi @ 2012-08-20 13:27 UTC (permalink / raw)
To: qemu-devel; +Cc: Stefan Hajnoczi
Replace spinning send_all() with a proper non-blocking send. When the
socket write buffer limit is reached, we should stop trying to send and
wait for the socket to become writable again.
Non-blocking TCP sockets can return in two different ways when the write
buffer limit is reached:
1. ret = -1 and errno = EAGAIN/EWOULDBLOCK. No data has been written.
2. ret < total_size. Short write, only part of the message was
transmitted.
Handle both cases and keep track of how many bytes have been written in
s->send_index. (This includes the 'length' header before the actual
payload buffer.)
Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
net/socket.c | 50 +++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 45 insertions(+), 5 deletions(-)
diff --git a/net/socket.c b/net/socket.c
index e5e4e8d..be44105 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -40,6 +40,7 @@ typedef struct NetSocketState {
int state; /* 0 = getting length, 1 = getting data */
unsigned int index;
unsigned int packet_len;
+ unsigned int send_index; /* number of bytes sent (only SOCK_STREAM) */
uint8_t buf[4096];
struct sockaddr_in dgram_dst; /* contains inet host and port destination iff connectionless (SOCK_DGRAM) */
IOHandler *send_fn; /* differs between SOCK_STREAM/SOCK_DGRAM */
@@ -88,15 +89,54 @@ static void net_socket_writable(void *opaque)
qemu_flush_queued_packets(&s->nc);
}
-/* XXX: we consider we can send the whole packet without blocking */
static ssize_t net_socket_receive(NetClientState *nc, const uint8_t *buf, size_t size)
{
NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
- uint32_t len;
- len = htonl(size);
+ struct iovec iov[2];
+ int iovcnt = 0;
+ ssize_t ret;
+ ssize_t total_size = 0;
+ uint32_t len = htonl(size);
+ unsigned int buf_index;
+
+ /* Length header */
+ if (s->send_index < sizeof(len)) {
+ iov[iovcnt].iov_base = (uint8_t *)&len + s->send_index;
+ iov[iovcnt].iov_len = sizeof(len) - s->send_index;
+ total_size += iov[iovcnt].iov_len;
+ iovcnt++;
+
+ buf_index = 0;
+ } else {
+ buf_index = s->send_index - sizeof(len);
+ }
+
+ assert(buf_index < size);
+
+ /* Payload buffer */
+ iov[iovcnt].iov_base = (uint8_t *)buf + buf_index;
+ iov[iovcnt].iov_len = size - buf_index;
+ total_size += iov[iovcnt].iov_len;
+ iovcnt++;
+
+ do {
+ ret = writev(s->fd, iov, iovcnt);
+ } while (ret == -1 && errno == EINTR); /* retry when interrupted by a signal */
- send_all(s->fd, (const uint8_t *)&len, sizeof(len));
- return send_all(s->fd, buf, size);
+ if (ret == -1 && errno == EAGAIN) {
+ ret = 0; /* handled further down */
+ }
+ if (ret == -1) {
+ s->send_index = 0;
+ return -errno;
+ }
+ if (ret < total_size) {
+ s->send_index += ret;
+ net_socket_write_poll(s, true);
+ return 0;
+ }
+ s->send_index = 0;
+ return size;
}
static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf, size_t size)
--
1.7.10.4
^ permalink raw reply related [flat|nested] 6+ messages in thread