qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: Fam Zheng <famz@redhat.com>
To: qemu-devel@nongnu.org
Cc: Kevin Wolf <kwolf@redhat.com>,
	Paolo Bonzini <pbonzini@redhat.com>,
	qemu-block@nongnu.org
Subject: [Qemu-devel] [PATCH v3 3/3] nbd-server: Coroutine based negotiation
Date: Thu, 14 Jan 2016 16:41:03 +0800	[thread overview]
Message-ID: <1452760863-25350-4-git-send-email-famz@redhat.com> (raw)
In-Reply-To: <1452760863-25350-1-git-send-email-famz@redhat.com>

Create a coroutine in nbd_client_new, so that nbd_send_negotiate doesn't
need qemu_set_block().

Fd handlers need to be set temporarily on the csock fd in case the
coroutine yields during I/O.

With this, if the other end disappears in the middle of the negotiation,
we don't block the whole event loop.

To make the code clearer, give all functions that belong to negotiation
a common nbd_negotiate_ prefix, so they are less likely to be misused.
This is important because we rely on negotiation staying in the main
loop, as commented in nbd_negotiate_read/write().

Signed-off-by: Fam Zheng <famz@redhat.com>
---
 nbd/server.c | 150 ++++++++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 103 insertions(+), 47 deletions(-)

diff --git a/nbd/server.c b/nbd/server.c
index 7780ff2..b51642a 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -93,13 +93,45 @@ static void nbd_set_handlers(NBDClient *client);
 static void nbd_unset_handlers(NBDClient *client);
 static void nbd_update_can_read(NBDClient *client);
 
-static ssize_t drop_sync(int fd, size_t size)
+static void nbd_negotiate_continue(void *opaque)
+{
+    qemu_coroutine_enter(opaque, NULL);
+}
+
+static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
+{
+    ssize_t ret;
+
+    assert(qemu_in_coroutine());
+    /* Negotiation are always in main loop. */
+    qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
+                        qemu_coroutine_self());
+    ret = read_sync(fd, buffer, size);
+    qemu_set_fd_handler(fd, NULL, NULL, NULL);
+    return ret;
+
+}
+
+static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
+{
+    ssize_t ret;
+
+    assert(qemu_in_coroutine());
+    /* Negotiation are always in main loop. */
+    qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
+                        qemu_coroutine_self());
+    ret = write_sync(fd, buffer, size);
+    qemu_set_fd_handler(fd, NULL, NULL, NULL);
+    return ret;
+}
+
+static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
 {
     ssize_t ret, dropped = size;
     uint8_t *buffer = g_malloc(MIN(65536, size));
 
     while (size > 0) {
-        ret = read_sync(fd, buffer, MIN(65536, size));
+        ret = nbd_negotiate_read(fd, buffer, MIN(65536, size));
         if (ret < 0) {
             g_free(buffer);
             return ret;
@@ -140,96 +172,96 @@ static ssize_t drop_sync(int fd, size_t size)
 
 */
 
-static int nbd_send_rep(int csock, uint32_t type, uint32_t opt)
+static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
 {
     uint64_t magic;
     uint32_t len;
 
     magic = cpu_to_be64(NBD_REP_MAGIC);
-    if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
         LOG("write failed (rep magic)");
         return -EINVAL;
     }
     opt = cpu_to_be32(opt);
-    if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+    if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
         LOG("write failed (rep opt)");
         return -EINVAL;
     }
     type = cpu_to_be32(type);
-    if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) {
+    if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
         LOG("write failed (rep type)");
         return -EINVAL;
     }
     len = cpu_to_be32(0);
-    if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (rep data length)");
         return -EINVAL;
     }
     return 0;
 }
 
-static int nbd_send_rep_list(int csock, NBDExport *exp)
+static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
 {
     uint64_t magic, name_len;
     uint32_t opt, type, len;
 
     name_len = strlen(exp->name);
     magic = cpu_to_be64(NBD_REP_MAGIC);
-    if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
         LOG("write failed (magic)");
         return -EINVAL;
      }
     opt = cpu_to_be32(NBD_OPT_LIST);
-    if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+    if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
         LOG("write failed (opt)");
         return -EINVAL;
     }
     type = cpu_to_be32(NBD_REP_SERVER);
-    if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) {
+    if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
         LOG("write failed (reply type)");
         return -EINVAL;
     }
     len = cpu_to_be32(name_len + sizeof(len));
-    if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (length)");
         return -EINVAL;
     }
     len = cpu_to_be32(name_len);
-    if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (length)");
         return -EINVAL;
     }
-    if (write_sync(csock, exp->name, name_len) != name_len) {
+    if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
         LOG("write failed (buffer)");
         return -EINVAL;
     }
     return 0;
 }
 
-static int nbd_handle_list(NBDClient *client, uint32_t length)
+static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
 {
     int csock;
     NBDExport *exp;
 
     csock = client->sock;
     if (length) {
-        if (drop_sync(csock, length) != length) {
+        if (nbd_negotiate_drop_sync(csock, length) != length) {
             return -EIO;
         }
-        return nbd_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
+        return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
     }
 
     /* For each export, send a NBD_REP_SERVER reply. */
     QTAILQ_FOREACH(exp, &exports, next) {
-        if (nbd_send_rep_list(csock, exp)) {
+        if (nbd_negotiate_send_rep_list(csock, exp)) {
             return -EINVAL;
         }
     }
     /* Finish with a NBD_REP_ACK. */
-    return nbd_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
+    return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
 }
 
-static int nbd_handle_export_name(NBDClient *client, uint32_t length)
+static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
 {
     int rc = -EINVAL, csock = client->sock;
     char name[256];
@@ -242,7 +274,7 @@ static int nbd_handle_export_name(NBDClient *client, uint32_t length)
         LOG("Bad length received");
         goto fail;
     }
-    if (read_sync(csock, name, length) != length) {
+    if (nbd_negotiate_read(csock, name, length) != length) {
         LOG("read failed");
         goto fail;
     }
@@ -261,7 +293,7 @@ fail:
     return rc;
 }
 
-static int nbd_receive_options(NBDClient *client)
+static int nbd_negotiate_options(NBDClient *client)
 {
     int csock = client->sock;
     uint32_t flags;
@@ -280,7 +312,7 @@ static int nbd_receive_options(NBDClient *client)
         ...           Rest of request
     */
 
-    if (read_sync(csock, &flags, sizeof(flags)) != sizeof(flags)) {
+    if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
         LOG("read failed");
         return -EIO;
     }
@@ -296,7 +328,7 @@ static int nbd_receive_options(NBDClient *client)
         uint32_t tmp, length;
         uint64_t magic;
 
-        if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+        if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
             LOG("read failed");
             return -EINVAL;
         }
@@ -306,12 +338,13 @@ static int nbd_receive_options(NBDClient *client)
             return -EINVAL;
         }
 
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             LOG("read failed");
             return -EINVAL;
         }
 
-        if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) {
+        if (nbd_negotiate_read(csock, &length,
+                               sizeof(length)) != sizeof(length)) {
             LOG("read failed");
             return -EINVAL;
         }
@@ -320,7 +353,7 @@ static int nbd_receive_options(NBDClient *client)
         TRACE("Checking option");
         switch (be32_to_cpu(tmp)) {
         case NBD_OPT_LIST:
-            ret = nbd_handle_list(client, length);
+            ret = nbd_negotiate_handle_list(client, length);
             if (ret < 0) {
                 return ret;
             }
@@ -330,19 +363,25 @@ static int nbd_receive_options(NBDClient *client)
             return -EINVAL;
 
         case NBD_OPT_EXPORT_NAME:
-            return nbd_handle_export_name(client, length);
+            return nbd_negotiate_handle_export_name(client, length);
 
         default:
             tmp = be32_to_cpu(tmp);
             LOG("Unsupported option 0x%x", tmp);
-            nbd_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
+            nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
             return -EINVAL;
         }
     }
 }
 
-static int nbd_send_negotiate(NBDClient *client)
+typedef struct {
+    NBDClient *client;
+    Coroutine *co;
+} NBDClientNewData;
+
+static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
 {
+    NBDClient *client = data->client;
     int csock = client->sock;
     char buf[8 + 8 + 8 + 128];
     int rc;
@@ -368,7 +407,6 @@ static int nbd_send_negotiate(NBDClient *client)
         [28 .. 151]   reserved     (0)
      */
 
-    qemu_set_block(csock);
     rc = -EINVAL;
 
     TRACE("Beginning negotiation.");
@@ -385,16 +423,16 @@ static int nbd_send_negotiate(NBDClient *client)
     }
 
     if (client->exp) {
-        if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
+        if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
             LOG("write failed");
             goto fail;
         }
     } else {
-        if (write_sync(csock, buf, 18) != 18) {
+        if (nbd_negotiate_write(csock, buf, 18) != 18) {
             LOG("write failed");
             goto fail;
         }
-        rc = nbd_receive_options(client);
+        rc = nbd_negotiate_options(client);
         if (rc != 0) {
             LOG("option negotiation failed");
             goto fail;
@@ -403,7 +441,8 @@ static int nbd_send_negotiate(NBDClient *client)
         assert ((client->exp->nbdflags & ~65535) == 0);
         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
-        if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) {
+        if (nbd_negotiate_write(csock, buf + 18,
+                                sizeof(buf) - 18) != sizeof(buf) - 18) {
             LOG("write failed");
             goto fail;
         }
@@ -412,7 +451,6 @@ static int nbd_send_negotiate(NBDClient *client)
     TRACE("Negotiation succeeded.");
     rc = 0;
 fail:
-    qemu_set_nonblock(csock);
     return rc;
 }
 
@@ -1028,25 +1066,43 @@ static void nbd_update_can_read(NBDClient *client)
     }
 }
 
+static coroutine_fn void nbd_co_client_start(void *opaque)
+{
+    NBDClientNewData *data = opaque;
+    NBDClient *client = data->client;
+    NBDExport *exp = client->exp;
+
+    if (exp) {
+        nbd_export_get(exp);
+    }
+    if (nbd_negotiate(data)) {
+        shutdown(client->sock, 2);
+        client->close(client);
+        goto out;
+    }
+    qemu_co_mutex_init(&client->send_lock);
+    nbd_set_handlers(client);
+
+    if (exp) {
+        QTAILQ_INSERT_TAIL(&exp->clients, client, next);
+    }
+out:
+    g_free(data);
+}
+
 void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
 {
     NBDClient *client;
+    NBDClientNewData *data = g_new(NBDClientNewData, 1);
+
     client = g_malloc0(sizeof(NBDClient));
     client->refcount = 1;
     client->exp = exp;
     client->sock = csock;
     client->can_read = true;
-    if (nbd_send_negotiate(client)) {
-        shutdown(client->sock, 2);
-        close_fn(client);
-        return;
-    }
     client->close = close_fn;
-    qemu_co_mutex_init(&client->send_lock);
-    nbd_set_handlers(client);
 
-    if (exp) {
-        QTAILQ_INSERT_TAIL(&exp->clients, client, next);
-        nbd_export_get(exp);
-    }
+    data->client = client;
+    data->co = qemu_coroutine_create(nbd_co_client_start);
+    qemu_coroutine_enter(data->co, data);
 }
-- 
2.4.3

  parent reply	other threads:[~2016-01-14  8:41 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2016-01-14  8:41 [Qemu-devel] [PATCH v3 0/3] nbd: Async built-in server negotiation Fam Zheng
2016-01-14  8:41 ` [Qemu-devel] [PATCH v3 1/3] nbd: Always call "close_fn" in nbd_client_new Fam Zheng
2016-01-14 17:23   ` Daniel P. Berrange
2016-01-14  8:41 ` [Qemu-devel] [PATCH v3 2/3] nbd: Split nbd.c Fam Zheng
2016-01-14 17:26   ` Daniel P. Berrange
2016-01-14 20:31     ` Paolo Bonzini
2016-01-14  8:41 ` Fam Zheng [this message]
2016-01-14 17:29   ` [Qemu-devel] [PATCH v3 3/3] nbd-server: Coroutine based negotiation Daniel P. Berrange

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1452760863-25350-4-git-send-email-famz@redhat.com \
    --to=famz@redhat.com \
    --cc=kwolf@redhat.com \
    --cc=pbonzini@redhat.com \
    --cc=qemu-block@nongnu.org \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).