From: Paolo Bonzini <pbonzini@redhat.com>
To: qemu-devel@nongnu.org
Subject: [Qemu-devel] [PATCH 24/25] qemu-nbd: asynchronous operation
Date: Tue, 6 Dec 2011 16:27:51 +0100 [thread overview]
Message-ID: <1323185272-2610-25-git-send-email-pbonzini@redhat.com> (raw)
In-Reply-To: <1323185272-2610-1-git-send-email-pbonzini@redhat.com>
Using coroutines enables asynchronous operation on both the network and
the block side. The network can be owned by two coroutines at the same time,
one writing and one reading. On the send side, mutual exclusion is
guaranteed by a CoMutex. On the receive side, mutual exclusion is
guaranteed because new coroutines immediately start receiving data,
and no new coroutines are created as long as the previous one is receiving.
Between receive and send, qemu-nbd can have an arbitrary number of
in-flight block transfers. Throttling is implemented by the next
patch.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
nbd.c | 74 ++++++++++++++++++++++++++++++++++++++++++++++------------------
1 files changed, 53 insertions(+), 21 deletions(-)
diff --git a/nbd.c b/nbd.c
index ee5325b..7eaaf88 100644
--- a/nbd.c
+++ b/nbd.c
@@ -20,6 +20,8 @@
#include "block.h"
#include "block_int.h"
+#include "qemu-coroutine.h"
+
#include <errno.h>
#include <string.h>
#ifndef _WIN32
@@ -607,6 +609,11 @@ struct NBDClient {
NBDExport *exp;
int sock;
+
+ Coroutine *recv_coroutine;
+
+ CoMutex send_lock;
+ Coroutine *send_coroutine;
};
static void nbd_client_get(NBDClient *client)
@@ -681,13 +688,20 @@ void nbd_export_close(NBDExport *exp)
g_free(exp);
}
-static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
+static void nbd_read(void *opaque);
+static void nbd_restart_write(void *opaque);
+
+static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
int len)
{
NBDClient *client = req->client;
int csock = client->sock;
int rc, ret;
+ qemu_co_mutex_lock(&client->send_lock);
+ qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client);
+ client->send_coroutine = qemu_coroutine_self();
+
if (!len) {
rc = nbd_send_reply(csock, reply);
if (rc == -1) {
@@ -697,7 +711,7 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
socket_set_cork(csock, 1);
rc = nbd_send_reply(csock, reply);
if (rc != -1) {
- ret = write_sync(csock, req->data, len);
+ ret = qemu_co_send(csock, req->data, len);
if (ret != len) {
errno = EIO;
rc = -1;
@@ -708,15 +722,20 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
}
socket_set_cork(csock, 0);
}
+
+ client->send_coroutine = NULL;
+ qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
+ qemu_co_mutex_unlock(&client->send_lock);
return rc;
}
-static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
+static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
{
NBDClient *client = req->client;
int csock = client->sock;
int rc;
+ client->recv_coroutine = qemu_coroutine_self();
if (nbd_receive_request(csock, request) == -1) {
rc = -EIO;
goto out;
@@ -741,7 +760,7 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) {
TRACE("Reading %u byte(s)", request->len);
- if (read_sync(csock, req->data, request->len) != request->len) {
+ if (qemu_co_recv(csock, req->data, request->len) != request->len) {
LOG("reading from socket failed");
rc = -EIO;
goto out;
@@ -750,21 +769,22 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
rc = 0;
out:
+ client->recv_coroutine = NULL;
return rc;
}
-static int nbd_trip(NBDClient *client)
+static void nbd_trip(void *opaque)
{
+ NBDClient *client = opaque;
NBDRequest *req = nbd_request_get(client);
NBDExport *exp = client->exp;
struct nbd_request request;
struct nbd_reply reply;
- int rc = -1;
int ret;
TRACE("Reading request.");
- ret = nbd_do_receive_request(req, &request);
+ ret = nbd_co_receive_request(req, &request);
if (ret == -EIO) {
goto out;
}
@@ -799,7 +819,7 @@ static int nbd_trip(NBDClient *client)
}
TRACE("Read %u byte(s)", request.len);
- if (nbd_do_send_reply(req, &reply, request.len) < 0)
+ if (nbd_co_send_reply(req, &reply, request.len) < 0)
goto out;
break;
case NBD_CMD_WRITE:
@@ -822,7 +842,7 @@ static int nbd_trip(NBDClient *client)
}
if (request.type & NBD_CMD_FLAG_FUA) {
- ret = bdrv_flush(exp->bs);
+ ret = bdrv_co_flush(exp->bs);
if (ret < 0) {
LOG("flush failed");
reply.error = -ret;
@@ -830,34 +850,34 @@ static int nbd_trip(NBDClient *client)
}
}
- if (nbd_do_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out;
break;
case NBD_CMD_DISC:
TRACE("Request type is DISCONNECT");
errno = 0;
- return 1;
+ goto out;
case NBD_CMD_FLUSH:
TRACE("Request type is FLUSH");
- ret = bdrv_flush(exp->bs);
+ ret = bdrv_co_flush(exp->bs);
if (ret < 0) {
LOG("flush failed");
reply.error = -ret;
}
- if (nbd_do_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out;
break;
case NBD_CMD_TRIM:
TRACE("Request type is TRIM");
- ret = bdrv_discard(exp->bs, (request.from + exp->dev_offset) / 512,
- request.len / 512);
+ ret = bdrv_co_discard(exp->bs, (request.from + exp->dev_offset) / 512,
+ request.len / 512);
if (ret < 0) {
LOG("discard failed");
reply.error = -ret;
}
- if (nbd_do_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out;
break;
default:
@@ -865,28 +885,39 @@ static int nbd_trip(NBDClient *client)
invalid_request:
reply.error = -EINVAL;
error_reply:
- if (nbd_do_send_reply(req, &reply, 0) == -1)
+ if (nbd_co_send_reply(req, &reply, 0) == -1)
goto out;
break;
}
TRACE("Request/Reply complete");
- rc = 0;
+ nbd_request_put(req);
+ return;
+
out:
nbd_request_put(req);
- return rc;
+ nbd_client_close(client);
}
static void nbd_read(void *opaque)
{
NBDClient *client = opaque;
- if (nbd_trip(client) != 0) {
- nbd_client_close(client);
+ if (client->recv_coroutine) {
+ qemu_coroutine_enter(client->recv_coroutine, NULL);
+ } else {
+ qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
}
}
+static void nbd_restart_write(void *opaque)
+{
+ NBDClient *client = opaque;
+
+ qemu_coroutine_enter(client->send_coroutine, NULL);
+}
+
NBDClient *nbd_client_new(NBDExport *exp, int csock,
void (*close)(NBDClient *))
{
@@ -899,6 +930,7 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock,
client->exp = exp;
client->sock = csock;
client->close = close;
+ qemu_co_mutex_init(&client->send_lock);
qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
return client;
}
--
1.7.7.1
next prev parent reply other threads:[~2011-12-06 15:29 UTC|newest]
Thread overview: 30+ messages / expand[flat|nested] mbox.gz Atom feed top
2011-12-06 15:27 [Qemu-devel] [PATCH 00/25] nbd asynchronous operation Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 01/25] add qemu_send_full and qemu_recv_full Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 02/25] sheepdog: move coroutine send/recv function to generic code Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 03/25] nbd: switch to asynchronous operation Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 04/25] nbd: split requests Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 05/25] nbd: allow multiple in-flight requests Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 06/25] nbd: fix error handling in the server Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 07/25] nbd: add support for NBD_CMD_FLAG_FUA Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 08/25] nbd: add support for NBD_CMD_FLUSH Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 09/25] nbd: add support for NBD_CMD_TRIM Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 10/25] Update ioctl order in nbd_init() to detect EBUSY Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 11/25] qemu-nbd: remove offset argument to nbd_trip Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 12/25] qemu-nbd: remove data_size " Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 13/25] move corking functions to osdep.c Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 14/25] qemu-nbd: simplify nbd_trip Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 15/25] qemu-nbd: introduce nbd_do_send_reply Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 16/25] qemu-nbd: more robust handling of invalid requests Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 17/25] qemu-nbd: introduce nbd_do_receive_request Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 18/25] qemu-nbd: introduce NBDExport Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 19/25] qemu-nbd: introduce NBDRequest Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 20/25] link the main loop and its dependencies into the tools Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 21/25] qemu-nbd: use common main loop Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 22/25] qemu-nbd: move client handling to nbd.c Paolo Bonzini
2011-12-06 15:27 ` [Qemu-devel] [PATCH 23/25] qemu-nbd: add client pointer to NBDRequest Paolo Bonzini
2011-12-06 15:27 ` Paolo Bonzini [this message]
2011-12-06 15:27 ` [Qemu-devel] [PATCH 25/25] qemu-nbd: throttle requests Paolo Bonzini
2011-12-15 10:21 ` [Qemu-devel] [PATCH 00/25] nbd asynchronous operation Paolo Bonzini
2011-12-15 11:09 ` Kevin Wolf
2011-12-21 18:11 ` Paolo Bonzini
2011-12-21 19:37 ` Anthony Liguori
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1323185272-2610-25-git-send-email-pbonzini@redhat.com \
--to=pbonzini@redhat.com \
--cc=qemu-devel@nongnu.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).