From: Lukas Straub <lukasstraub2@web.de>
To: qemu-devel <qemu-devel@nongnu.org>
Cc: "Kevin Wolf" <kwolf@redhat.com>,
"Daniel P. Berrangé" <berrange@redhat.com>,
qemu-block <qemu-block@nongnu.org>,
"Juan Quintela" <quintela@redhat.com>,
"Markus Armbruster" <armbru@redhat.com>,
"Dr. David Alan Gilbert" <dgilbert@redhat.com>,
"Max Reitz" <mreitz@redhat.com>,
"Paolo Bonzini" <pbonzini@redhat.com>,
"Marc-André Lureau" <marcandre.lureau@redhat.com>
Subject: [PATCH v10 2/8] block/nbd.c: Add yank feature
Date: Fri, 30 Oct 2020 17:41:12 +0100 [thread overview]
Message-ID: <20201030174112.78a52061@luklap> (raw)
In-Reply-To: <cover.1604075469.git.lukasstraub2@web.de>
[-- Attachment #1: Type: text/plain, Size: 17440 bytes --]
Register a yank function which shuts down the socket and sets
s->state = NBD_CLIENT_QUIT. This is the same behaviour as if an
error occurred.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
---
block/nbd.c | 154 +++++++++++++++++++++++++++++++---------------------
1 file changed, 93 insertions(+), 61 deletions(-)
diff --git a/block/nbd.c b/block/nbd.c
index 4548046cd7..d66c84ee40 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -35,6 +35,7 @@
#include "qemu/option.h"
#include "qemu/cutils.h"
#include "qemu/main-loop.h"
+#include "qemu/atomic.h"
#include "qapi/qapi-visit-sockets.h"
#include "qapi/qmp/qstring.h"
@@ -44,6 +45,8 @@
#include "block/nbd.h"
#include "block/block_int.h"
+#include "qemu/yank.h"
+
#define EN_OPTSTR ":exportname="
#define MAX_NBD_REQUESTS 16
@@ -140,14 +143,13 @@ typedef struct BDRVNBDState {
NBDConnectThread *connect_thread;
} BDRVNBDState;
-static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
- Error **errp);
-static QIOChannelSocket *nbd_co_establish_connection(BlockDriverState *bs,
- Error **errp);
+static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
+ Error **errp);
+static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp);
static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
bool detach);
-static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
- Error **errp);
+static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
+static void nbd_yank(void *opaque);
static void nbd_clear_bdrvstate(BDRVNBDState *s)
{
@@ -165,12 +167,12 @@ static void nbd_clear_bdrvstate(BDRVNBDState *s)
static void nbd_channel_error(BDRVNBDState *s, int ret)
{
if (ret == -EIO) {
- if (s->state == NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
NBD_CLIENT_CONNECTING_NOWAIT;
}
} else {
- if (s->state == NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
s->state = NBD_CLIENT_QUIT;
@@ -203,7 +205,7 @@ static void reconnect_delay_timer_cb(void *opaque)
{
BDRVNBDState *s = opaque;
- if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
while (qemu_co_enter_next(&s->free_sema, NULL)) {
/* Resume all queued requests */
@@ -215,7 +217,7 @@ static void reconnect_delay_timer_cb(void *opaque)
static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
{
- if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
return;
}
@@ -260,7 +262,7 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
* s->connection_co is either yielded from nbd_receive_reply or from
* nbd_co_reconnect_loop()
*/
- if (s->state == NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
}
@@ -286,7 +288,7 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
reconnect_delay_timer_del(s);
- if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
qemu_co_queue_restart_all(&s->free_sema);
}
@@ -337,13 +339,14 @@ static void nbd_teardown_connection(BlockDriverState *bs)
static bool nbd_client_connecting(BDRVNBDState *s)
{
- return s->state == NBD_CLIENT_CONNECTING_WAIT ||
- s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+ NBDClientState state = qatomic_load_acquire(&s->state);
+ return state == NBD_CLIENT_CONNECTING_WAIT ||
+ state == NBD_CLIENT_CONNECTING_NOWAIT;
}
static bool nbd_client_connecting_wait(BDRVNBDState *s)
{
- return s->state == NBD_CLIENT_CONNECTING_WAIT;
+ return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
}
static void connect_bh(void *opaque)
@@ -423,12 +426,12 @@ static void *connect_thread_func(void *opaque)
return NULL;
}
-static QIOChannelSocket *coroutine_fn
+static int coroutine_fn
nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
{
+ int ret;
QemuThread thread;
BDRVNBDState *s = bs->opaque;
- QIOChannelSocket *res;
NBDConnectThread *thr = s->connect_thread;
qemu_mutex_lock(&thr->mutex);
@@ -445,10 +448,12 @@ nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
case CONNECT_THREAD_SUCCESS:
/* Previous attempt finally succeeded in background */
thr->state = CONNECT_THREAD_NONE;
- res = thr->sioc;
+ s->sioc = thr->sioc;
thr->sioc = NULL;
+ yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+ nbd_yank, bs);
qemu_mutex_unlock(&thr->mutex);
- return res;
+ return 0;
case CONNECT_THREAD_RUNNING:
/* Already running, will wait */
break;
@@ -480,8 +485,13 @@ nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
thr->state = CONNECT_THREAD_NONE;
error_propagate(errp, thr->err);
thr->err = NULL;
- res = thr->sioc;
+ s->sioc = thr->sioc;
thr->sioc = NULL;
+ if (s->sioc) {
+ yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+ nbd_yank, bs);
+ }
+ ret = (s->sioc ? 0 : -1);
break;
case CONNECT_THREAD_RUNNING:
case CONNECT_THREAD_RUNNING_DETACHED:
@@ -490,7 +500,7 @@ nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
* failed. Still connect thread is executing in background, and its
* result may be used for next connection attempt.
*/
- res = NULL;
+ ret = -1;
error_setg(errp, "Connection attempt cancelled by other operation");
break;
@@ -507,7 +517,7 @@ nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
qemu_mutex_unlock(&thr->mutex);
- return res;
+ return ret;
}
/*
@@ -560,7 +570,6 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
int ret;
Error *local_err = NULL;
- QIOChannelSocket *sioc;
if (!nbd_client_connecting(s)) {
return;
@@ -593,21 +602,22 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
/* Finalize previous connection if any */
if (s->ioc) {
qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+ nbd_yank, s->bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
}
- sioc = nbd_co_establish_connection(s->bs, &local_err);
- if (!sioc) {
+ if (nbd_co_establish_connection(s->bs, &local_err) < 0) {
ret = -ECONNREFUSED;
goto out;
}
bdrv_dec_in_flight(s->bs);
- ret = nbd_client_handshake(s->bs, sioc, &local_err);
+ ret = nbd_client_handshake(s->bs, &local_err);
if (s->drained) {
s->wait_drained_end = true;
@@ -639,7 +649,7 @@ static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
- if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
s->reconnect_delay * NANOSECONDS_PER_SECOND);
}
@@ -682,7 +692,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
int ret = 0;
Error *local_err = NULL;
- while (s->state != NBD_CLIENT_QUIT) {
+ while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
/*
* The NBD client can only really be considered idle when it has
* yielded from qio_channel_readv_all_eof(), waiting for data. This is
@@ -697,7 +707,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
nbd_co_reconnect_loop(s);
}
- if (s->state != NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
continue;
}
@@ -752,6 +762,8 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
s->connection_co = NULL;
if (s->ioc) {
qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+ nbd_yank, s->bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
object_unref(OBJECT(s->ioc));
@@ -776,7 +788,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
}
- if (s->state != NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
rc = -EIO;
goto err;
}
@@ -803,7 +815,8 @@ static int nbd_co_send_request(BlockDriverState *bs,
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
- if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED &&
+ rc >= 0) {
if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
NULL) < 0) {
rc = -EIO;
@@ -1118,7 +1131,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
s->requests[i].receiving = true;
qemu_coroutine_yield();
s->requests[i].receiving = false;
- if (s->state != NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
error_setg(errp, "Connection closed");
return -EIO;
}
@@ -1277,7 +1290,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
NBDReply local_reply;
NBDStructuredReplyChunk *chunk;
Error *local_err = NULL;
- if (s->state != NBD_CLIENT_CONNECTED) {
+ if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
error_setg(&local_err, "Connection closed");
nbd_iter_channel_error(iter, -EIO, &local_err);
goto break_loop;
@@ -1302,7 +1315,8 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
}
/* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
- if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) {
+ if (nbd_reply_is_simple(reply) ||
+ qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
goto break_loop;
}
@@ -1734,6 +1748,15 @@ static int nbd_client_reopen_prepare(BDRVReopenState *state,
return 0;
}
+static void nbd_yank(void *opaque)
+{
+ BlockDriverState *bs = opaque;
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+ qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
+ qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+}
+
static void nbd_client_close(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
@@ -1746,52 +1769,53 @@ static void nbd_client_close(BlockDriverState *bs)
nbd_teardown_connection(bs);
}
-static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
- Error **errp)
+static int nbd_establish_connection(BlockDriverState *bs,
+ SocketAddress *saddr,
+ Error **errp)
{
ERRP_GUARD();
- QIOChannelSocket *sioc;
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- sioc = qio_channel_socket_new();
- qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
+ s->sioc = qio_channel_socket_new();
+ qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client");
- qio_channel_socket_connect_sync(sioc, saddr, errp);
+ qio_channel_socket_connect_sync(s->sioc, saddr, errp);
if (*errp) {
- object_unref(OBJECT(sioc));
- return NULL;
+ object_unref(OBJECT(s->sioc));
+ s->sioc = NULL;
+ return -1;
}
- qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+ yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank, bs);
+ qio_channel_set_delay(QIO_CHANNEL(s->sioc), false);
- return sioc;
+ return 0;
}
-/* nbd_client_handshake takes ownership on sioc. On failure it is unref'ed. */
-static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
- Error **errp)
+/* nbd_client_handshake takes ownership on s->sioc. On failure it's unref'ed. */
+static int nbd_client_handshake(BlockDriverState *bs, Error **errp)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
AioContext *aio_context = bdrv_get_aio_context(bs);
int ret;
trace_nbd_client_handshake(s->export);
-
- s->sioc = sioc;
-
- qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
- qio_channel_attach_aio_context(QIO_CHANNEL(sioc), aio_context);
+ qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL);
+ qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context);
s->info.request_sizes = true;
s->info.structured_reply = true;
s->info.base_allocation = true;
s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
s->info.name = g_strdup(s->export ?: "");
- ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(sioc), s->tlscreds,
+ ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds,
s->hostname, &s->ioc, &s->info, errp);
g_free(s->info.x_dirty_bitmap);
g_free(s->info.name);
if (ret < 0) {
- object_unref(OBJECT(sioc));
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+ nbd_yank, bs);
+ object_unref(OBJECT(s->sioc));
s->sioc = NULL;
return ret;
}
@@ -1819,7 +1843,7 @@ static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
}
if (!s->ioc) {
- s->ioc = QIO_CHANNEL(sioc);
+ s->ioc = QIO_CHANNEL(s->sioc);
object_ref(OBJECT(s->ioc));
}
@@ -1835,9 +1859,11 @@ static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
{
NBDRequest request = { .type = NBD_CMD_DISC };
- nbd_send_request(s->ioc ?: QIO_CHANNEL(sioc), &request);
+ nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request);
- object_unref(OBJECT(sioc));
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
+ nbd_yank, bs);
+ object_unref(OBJECT(s->sioc));
s->sioc = NULL;
return ret;
@@ -2229,7 +2255,6 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
{
int ret;
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- QIOChannelSocket *sioc;
ret = nbd_process_options(bs, options, errp);
if (ret < 0) {
@@ -2240,17 +2265,23 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
qemu_co_mutex_init(&s->send_mutex);
qemu_co_queue_init(&s->free_sema);
+ yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp);
+ if (*errp) {
+ return -EEXIST;
+ }
+
/*
* establish TCP connection, return error if it fails
* TODO: Configurable retry-until-timeout behaviour.
*/
- sioc = nbd_establish_connection(s->saddr, errp);
- if (!sioc) {
+ if (nbd_establish_connection(bs, s->saddr, errp) < 0) {
+ yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
return -ECONNREFUSED;
}
- ret = nbd_client_handshake(bs, sioc, errp);
+ ret = nbd_client_handshake(bs, errp);
if (ret < 0) {
+ yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
nbd_clear_bdrvstate(s);
return ret;
}
@@ -2310,6 +2341,7 @@ static void nbd_close(BlockDriverState *bs)
BDRVNBDState *s = bs->opaque;
nbd_client_close(bs);
+ yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
nbd_clear_bdrvstate(s);
}
--
2.20.1
[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 833 bytes --]
next prev parent reply other threads:[~2020-10-30 16:43 UTC|newest]
Thread overview: 12+ messages / expand[flat|nested] mbox.gz Atom feed top
[not found] <cover.1604075469.git.lukasstraub2@web.de>
2020-10-30 16:41 ` [PATCH v10 1/8] Introduce yank feature Lukas Straub
2020-11-02 6:32 ` Markus Armbruster
2020-11-15 8:28 ` Lukas Straub
2020-10-30 16:41 ` Lukas Straub [this message]
2020-10-30 16:41 ` [PATCH v10 3/8] chardev/char-socket.c: Add " Lukas Straub
2020-10-30 16:41 ` [PATCH v10 4/8] migration: " Lukas Straub
2020-10-30 16:41 ` [PATCH v10 5/8] io/channel-tls.c: make qio_channel_tls_shutdown thread-safe Lukas Straub
2020-10-30 16:41 ` [PATCH v10 6/8] io: Document qmp oob suitability of qio_channel_shutdown and io_shutdown Lukas Straub
2020-10-30 16:41 ` [PATCH v10 7/8] MAINTAINERS: Add myself as maintainer for yank feature Lukas Straub
2020-11-02 6:33 ` Markus Armbruster
2020-11-15 8:29 ` Lukas Straub
2020-10-30 16:41 ` [PATCH v10 8/8] tests/test-char.c: Wait for the chardev to connect in char_socket_client_dupid_test Lukas Straub
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20201030174112.78a52061@luklap \
--to=lukasstraub2@web.de \
--cc=armbru@redhat.com \
--cc=berrange@redhat.com \
--cc=dgilbert@redhat.com \
--cc=kwolf@redhat.com \
--cc=marcandre.lureau@redhat.com \
--cc=mreitz@redhat.com \
--cc=pbonzini@redhat.com \
--cc=qemu-block@nongnu.org \
--cc=qemu-devel@nongnu.org \
--cc=quintela@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.