* [Qemu-devel] [PATCH 01/18] Make QEMUFile buf expandable, and introduce qemu_realloc_buffer() and qemu_clear_buffer().
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState Yoshiaki Tamura
` (16 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Currently buf size is fixed at 32KB. It would be useful if it could
be flexible.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
hw/hw.h | 2 ++
savevm.c | 20 +++++++++++++++++++-
2 files changed, 21 insertions(+), 1 deletions(-)
diff --git a/hw/hw.h b/hw/hw.h
index 5e24329..a168a37 100644
--- a/hw/hw.h
+++ b/hw/hw.h
@@ -58,6 +58,8 @@ void qemu_fflush(QEMUFile *f);
int qemu_fclose(QEMUFile *f);
void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
void qemu_put_byte(QEMUFile *f, int v);
+void *qemu_realloc_buffer(QEMUFile *f, int size);
+void qemu_clear_buffer(QEMUFile *f);
static inline void qemu_put_ubyte(QEMUFile *f, unsigned int v)
{
diff --git a/savevm.c b/savevm.c
index 6d83b0f..6c4c72b 100644
--- a/savevm.c
+++ b/savevm.c
@@ -171,7 +171,8 @@ struct QEMUFile {
when reading */
int buf_index;
int buf_size; /* 0 when writing */
- uint8_t buf[IO_BUF_SIZE];
+ int buf_max_size;
+ uint8_t *buf;
int has_error;
};
@@ -422,6 +423,9 @@ QEMUFile *qemu_fopen_ops(void *opaque, QEMUFilePutBufferFunc *put_buffer,
f->get_rate_limit = get_rate_limit;
f->is_write = 0;
+ f->buf_max_size = IO_BUF_SIZE;
+ f->buf = qemu_malloc(sizeof(uint8_t) * f->buf_max_size);
+
return f;
}
@@ -452,6 +456,19 @@ void qemu_fflush(QEMUFile *f)
}
}
+void *qemu_realloc_buffer(QEMUFile *f, int size)
+{
+ f->buf_max_size = size;
+ f->buf = qemu_realloc(f->buf, f->buf_max_size);
+
+ return f->buf;
+}
+
+void qemu_clear_buffer(QEMUFile *f)
+{
+ f->buf_size = f->buf_index = f->buf_offset = 0;
+}
+
static void qemu_fill_buffer(QEMUFile *f)
{
int len;
@@ -477,6 +494,7 @@ int qemu_fclose(QEMUFile *f)
qemu_fflush(f);
if (f->close)
ret = f->close(f->opaque);
+ qemu_free(f->buf);
qemu_free(f);
return ret;
}
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 01/18] Make QEMUFile buf expandable, and introduce qemu_realloc_buffer() and qemu_clear_buffer() Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:54 ` Anthony Liguori
2011-02-10 9:30 ` [Qemu-devel] [PATCH 03/18] Introduce skip_header parameter to qemu_loadvm_state() Yoshiaki Tamura
` (15 subsequent siblings)
17 siblings, 1 reply; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Currently FdMigrationState doesn't support read(), and this patch
introduces it to get response from the other side.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
migration-tcp.c | 15 +++++++++++++++
migration.c | 13 +++++++++++++
migration.h | 3 +++
3 files changed, 31 insertions(+), 0 deletions(-)
diff --git a/migration-tcp.c b/migration-tcp.c
index b55f419..55777c8 100644
--- a/migration-tcp.c
+++ b/migration-tcp.c
@@ -39,6 +39,20 @@ static int socket_write(FdMigrationState *s, const void * buf, size_t size)
return send(s->fd, buf, size, 0);
}
+static int socket_read(FdMigrationState *s, const void * buf, size_t size)
+{
+ ssize_t len;
+
+ do {
+ len = recv(s->fd, (void *)buf, size, 0);
+ } while (len == -1 && socket_error() == EINTR);
+ if (len == -1) {
+ len = -socket_error();
+ }
+
+ return len;
+}
+
static int tcp_close(FdMigrationState *s)
{
DPRINTF("tcp_close\n");
@@ -94,6 +108,7 @@ MigrationState *tcp_start_outgoing_migration(Monitor *mon,
s->get_error = socket_errno;
s->write = socket_write;
+ s->read = socket_read;
s->close = tcp_close;
s->mig_state.cancel = migrate_fd_cancel;
s->mig_state.get_status = migrate_fd_get_status;
diff --git a/migration.c b/migration.c
index 3612572..f0df5fc 100644
--- a/migration.c
+++ b/migration.c
@@ -340,6 +340,19 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
return ret;
}
+int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, size_t size)
+{
+ FdMigrationState *s = opaque;
+ int ret;
+
+ ret = s->read(s, data, size);
+ if (ret == -1) {
+ ret = -(s->get_error(s));
+ }
+
+ return ret;
+}
+
void migrate_fd_connect(FdMigrationState *s)
{
int ret;
diff --git a/migration.h b/migration.h
index 2170792..88a6987 100644
--- a/migration.h
+++ b/migration.h
@@ -48,6 +48,7 @@ struct FdMigrationState
int (*get_error)(struct FdMigrationState*);
int (*close)(struct FdMigrationState*);
int (*write)(struct FdMigrationState*, const void *, size_t);
+ int (*read)(struct FdMigrationState *, const void *, size_t);
void *opaque;
};
@@ -116,6 +117,8 @@ void migrate_fd_put_notify(void *opaque);
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
+int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, size_t size);
+
void migrate_fd_connect(FdMigrationState *s);
void migrate_fd_put_ready(void *opaque);
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState.
2011-02-10 9:30 ` [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState Yoshiaki Tamura
@ 2011-02-10 9:54 ` Anthony Liguori
2011-02-10 10:00 ` Yoshiaki Tamura
2011-02-10 10:18 ` Daniel P. Berrange
0 siblings, 2 replies; 35+ messages in thread
From: Anthony Liguori @ 2011-02-10 9:54 UTC (permalink / raw)
To: Yoshiaki Tamura
Cc: kwolf, aliguori, dlaor, ananth, kvm, mst, mtosatti, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
On 02/10/2011 10:30 AM, Yoshiaki Tamura wrote:
> Currently FdMigrationState doesn't support read(), and this patch
> introduces it to get response from the other side.
>
> Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>
>
Migration is unidirectional. Changing this is fundamental and not
something to be done lightly.
I thought we previously discussed using a protocol wrapper around the
existing migration protocol?
Regards,
Anthony Liguori
> ---
> migration-tcp.c | 15 +++++++++++++++
> migration.c | 13 +++++++++++++
> migration.h | 3 +++
> 3 files changed, 31 insertions(+), 0 deletions(-)
>
> diff --git a/migration-tcp.c b/migration-tcp.c
> index b55f419..55777c8 100644
> --- a/migration-tcp.c
> +++ b/migration-tcp.c
> @@ -39,6 +39,20 @@ static int socket_write(FdMigrationState *s, const void * buf, size_t size)
> return send(s->fd, buf, size, 0);
> }
>
> +static int socket_read(FdMigrationState *s, const void * buf, size_t size)
> +{
> + ssize_t len;
> +
> + do {
> + len = recv(s->fd, (void *)buf, size, 0);
> + } while (len == -1&& socket_error() == EINTR);
> + if (len == -1) {
> + len = -socket_error();
> + }
> +
> + return len;
> +}
> +
> static int tcp_close(FdMigrationState *s)
> {
> DPRINTF("tcp_close\n");
> @@ -94,6 +108,7 @@ MigrationState *tcp_start_outgoing_migration(Monitor *mon,
>
> s->get_error = socket_errno;
> s->write = socket_write;
> + s->read = socket_read;
> s->close = tcp_close;
> s->mig_state.cancel = migrate_fd_cancel;
> s->mig_state.get_status = migrate_fd_get_status;
> diff --git a/migration.c b/migration.c
> index 3612572..f0df5fc 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -340,6 +340,19 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
> return ret;
> }
>
> +int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, size_t size)
> +{
> + FdMigrationState *s = opaque;
> + int ret;
> +
> + ret = s->read(s, data, size);
> + if (ret == -1) {
> + ret = -(s->get_error(s));
> + }
> +
> + return ret;
> +}
> +
> void migrate_fd_connect(FdMigrationState *s)
> {
> int ret;
> diff --git a/migration.h b/migration.h
> index 2170792..88a6987 100644
> --- a/migration.h
> +++ b/migration.h
> @@ -48,6 +48,7 @@ struct FdMigrationState
> int (*get_error)(struct FdMigrationState*);
> int (*close)(struct FdMigrationState*);
> int (*write)(struct FdMigrationState*, const void *, size_t);
> + int (*read)(struct FdMigrationState *, const void *, size_t);
> void *opaque;
> };
>
> @@ -116,6 +117,8 @@ void migrate_fd_put_notify(void *opaque);
>
> ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
>
> +int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, size_t size);
> +
> void migrate_fd_connect(FdMigrationState *s);
>
> void migrate_fd_put_ready(void *opaque);
>
^ permalink raw reply [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState.
2011-02-10 9:54 ` Anthony Liguori
@ 2011-02-10 10:00 ` Yoshiaki Tamura
2011-02-10 10:18 ` Daniel P. Berrange
1 sibling, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 10:00 UTC (permalink / raw)
To: Anthony Liguori
Cc: kwolf, aliguori, dlaor, ananth, kvm, mst, mtosatti, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
2011/2/10 Anthony Liguori <anthony@codemonkey.ws>:
> On 02/10/2011 10:30 AM, Yoshiaki Tamura wrote:
>>
>> Currently FdMigrationState doesn't support read(), and this patch
>> introduces it to get response from the other side.
>>
>> Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>
>>
>
> Migration is unidirectional. Changing this is fundamental and not something
> to be done lightly.
>
> I thought we previously discussed using a protocol wrapper around the
> existing migration protocol?
AFAIR, I don't think we had that discussion before. I applied
comments from Stefan though. If I missed the discussion, could
you please give me the link?
Thanks,
Yoshi
>
> Regards,
>
> Anthony Liguori
>
>> ---
>> migration-tcp.c | 15 +++++++++++++++
>> migration.c | 13 +++++++++++++
>> migration.h | 3 +++
>> 3 files changed, 31 insertions(+), 0 deletions(-)
>>
>> diff --git a/migration-tcp.c b/migration-tcp.c
>> index b55f419..55777c8 100644
>> --- a/migration-tcp.c
>> +++ b/migration-tcp.c
>> @@ -39,6 +39,20 @@ static int socket_write(FdMigrationState *s, const void
>> * buf, size_t size)
>> return send(s->fd, buf, size, 0);
>> }
>>
>> +static int socket_read(FdMigrationState *s, const void * buf, size_t
>> size)
>> +{
>> + ssize_t len;
>> +
>> + do {
>> + len = recv(s->fd, (void *)buf, size, 0);
>> + } while (len == -1&& socket_error() == EINTR);
>> + if (len == -1) {
>> + len = -socket_error();
>> + }
>> +
>> + return len;
>> +}
>> +
>> static int tcp_close(FdMigrationState *s)
>> {
>> DPRINTF("tcp_close\n");
>> @@ -94,6 +108,7 @@ MigrationState *tcp_start_outgoing_migration(Monitor
>> *mon,
>>
>> s->get_error = socket_errno;
>> s->write = socket_write;
>> + s->read = socket_read;
>> s->close = tcp_close;
>> s->mig_state.cancel = migrate_fd_cancel;
>> s->mig_state.get_status = migrate_fd_get_status;
>> diff --git a/migration.c b/migration.c
>> index 3612572..f0df5fc 100644
>> --- a/migration.c
>> +++ b/migration.c
>> @@ -340,6 +340,19 @@ ssize_t migrate_fd_put_buffer(void *opaque, const
>> void *data, size_t size)
>> return ret;
>> }
>>
>> +int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos,
>> size_t size)
>> +{
>> + FdMigrationState *s = opaque;
>> + int ret;
>> +
>> + ret = s->read(s, data, size);
>> + if (ret == -1) {
>> + ret = -(s->get_error(s));
>> + }
>> +
>> + return ret;
>> +}
>> +
>> void migrate_fd_connect(FdMigrationState *s)
>> {
>> int ret;
>> diff --git a/migration.h b/migration.h
>> index 2170792..88a6987 100644
>> --- a/migration.h
>> +++ b/migration.h
>> @@ -48,6 +48,7 @@ struct FdMigrationState
>> int (*get_error)(struct FdMigrationState*);
>> int (*close)(struct FdMigrationState*);
>> int (*write)(struct FdMigrationState*, const void *, size_t);
>> + int (*read)(struct FdMigrationState *, const void *, size_t);
>> void *opaque;
>> };
>>
>> @@ -116,6 +117,8 @@ void migrate_fd_put_notify(void *opaque);
>>
>> ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t
>> size);
>>
>> +int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos,
>> size_t size);
>> +
>> void migrate_fd_connect(FdMigrationState *s);
>>
>> void migrate_fd_put_ready(void *opaque);
>>
>
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
^ permalink raw reply [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState.
2011-02-10 9:54 ` Anthony Liguori
2011-02-10 10:00 ` Yoshiaki Tamura
@ 2011-02-10 10:18 ` Daniel P. Berrange
2011-02-10 10:23 ` Yoshiaki Tamura
1 sibling, 1 reply; 35+ messages in thread
From: Daniel P. Berrange @ 2011-02-10 10:18 UTC (permalink / raw)
To: Anthony Liguori
Cc: kwolf, aliguori, mtosatti, ananth, kvm, mst, dlaor,
Yoshiaki Tamura, qemu-devel, blauwirbel, ohmura.kei, avi, vatsa,
pbonzini, psuriset, stefanha
On Thu, Feb 10, 2011 at 10:54:01AM +0100, Anthony Liguori wrote:
> On 02/10/2011 10:30 AM, Yoshiaki Tamura wrote:
> >Currently FdMigrationState doesn't support read(), and this patch
> >introduces it to get response from the other side.
> >
> >Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>
>
> Migration is unidirectional. Changing this is fundamental and not
> something to be done lightly.
Making it bi-directional might break libvirt's save/restore
to file support which uses migration, passing a unidirectional
FD for the file. It could also break libvirt's secure tunnelled
migration support which is currently only expecting to have
data sent in one direction on the socket.
Daniel
--
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
^ permalink raw reply [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState.
2011-02-10 10:18 ` Daniel P. Berrange
@ 2011-02-10 10:23 ` Yoshiaki Tamura
2011-02-10 10:44 ` Daniel P. Berrange
0 siblings, 1 reply; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 10:23 UTC (permalink / raw)
To: Daniel P. Berrange
Cc: kwolf, aliguori, mtosatti, ananth, kvm, mst, dlaor, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
2011/2/10 Daniel P. Berrange <berrange@redhat.com>:
> On Thu, Feb 10, 2011 at 10:54:01AM +0100, Anthony Liguori wrote:
>> On 02/10/2011 10:30 AM, Yoshiaki Tamura wrote:
>> >Currently FdMigrationState doesn't support read(), and this patch
>> >introduces it to get response from the other side.
>> >
>> >Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>
>>
>> Migration is unidirectional. Changing this is fundamental and not
>> something to be done lightly.
>
> Making it bi-directional might break libvirt's save/restore
> to file support which uses migration, passing a unidirectional
> FD for the file. It could also break libvirt's secure tunnelled
> migration support which is currently only expecting to have
> data sent in one direction on the socket.
Hi Daniel,
IIUC, this patch isn't something to make existing live migration
bi-directional. Just opens up a way for Kemari to use it. Do
you think it's dangerous for libvirt still?
Thanks,
Yoshi
>
> Daniel
> --
> |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
> |: http://libvirt.org -o- http://virt-manager.org :|
> |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
> |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
^ permalink raw reply [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState.
2011-02-10 10:23 ` Yoshiaki Tamura
@ 2011-02-10 10:44 ` Daniel P. Berrange
2011-02-10 10:51 ` Yoshiaki Tamura
0 siblings, 1 reply; 35+ messages in thread
From: Daniel P. Berrange @ 2011-02-10 10:44 UTC (permalink / raw)
To: Yoshiaki Tamura
Cc: kwolf, aliguori, mtosatti, ananth, kvm, mst, dlaor, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
On Thu, Feb 10, 2011 at 07:23:33PM +0900, Yoshiaki Tamura wrote:
> 2011/2/10 Daniel P. Berrange <berrange@redhat.com>:
> > On Thu, Feb 10, 2011 at 10:54:01AM +0100, Anthony Liguori wrote:
> >> On 02/10/2011 10:30 AM, Yoshiaki Tamura wrote:
> >> >Currently FdMigrationState doesn't support read(), and this patch
> >> >introduces it to get response from the other side.
> >> >
> >> >Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>
> >>
> >> Migration is unidirectional. Changing this is fundamental and not
> >> something to be done lightly.
> >
> > Making it bi-directional might break libvirt's save/restore
> > to file support which uses migration, passing a unidirectional
> > FD for the file. It could also break libvirt's secure tunnelled
> > migration support which is currently only expecting to have
> > data sent in one direction on the socket.
>
> Hi Daniel,
>
> IIUC, this patch isn't something to make existing live migration
> bi-directional. Just opens up a way for Kemari to use it. Do
> you think it's dangerous for libvirt still?
The key is for it to be a no-op for any usage of the existing
'migrate' command. I had thought this was wiring up read into
the event loop too, so it would be poll()ing for reads, but
after re-reading I see this isn't the case here.
Regards,
Daniel
--
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
^ permalink raw reply [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState.
2011-02-10 10:44 ` Daniel P. Berrange
@ 2011-02-10 10:51 ` Yoshiaki Tamura
0 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 10:51 UTC (permalink / raw)
To: Daniel P. Berrange
Cc: kwolf, aliguori, mtosatti, ananth, kvm, mst, dlaor, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
2011/2/10 Daniel P. Berrange <berrange@redhat.com>:
> On Thu, Feb 10, 2011 at 07:23:33PM +0900, Yoshiaki Tamura wrote:
>> 2011/2/10 Daniel P. Berrange <berrange@redhat.com>:
>> > On Thu, Feb 10, 2011 at 10:54:01AM +0100, Anthony Liguori wrote:
>> >> On 02/10/2011 10:30 AM, Yoshiaki Tamura wrote:
>> >> >Currently FdMigrationState doesn't support read(), and this patch
>> >> >introduces it to get response from the other side.
>> >> >
>> >> >Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>
>> >>
>> >> Migration is unidirectional. Changing this is fundamental and not
>> >> something to be done lightly.
>> >
>> > Making it bi-directional might break libvirt's save/restore
>> > to file support which uses migration, passing a unidirectional
>> > FD for the file. It could also break libvirt's secure tunnelled
>> > migration support which is currently only expecting to have
>> > data sent in one direction on the socket.
>>
>> Hi Daniel,
>>
>> IIUC, this patch isn't something to make existing live migration
>> bi-directional. Just opens up a way for Kemari to use it. Do
>> you think it's dangerous for libvirt still?
>
> The key is for it to be a no-op for any usage of the existing
> 'migrate' command. I had thought this was wiring up read into
> the event loop too, so it would be poll()ing for reads, but
> after re-reading I see this isn't the case here.
It's a no-op for existing migration related code. Anthony, did
you have the same concern?
Yoshi
>
> Regards,
> Daniel
> --
> |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
> |: http://libvirt.org -o- http://virt-manager.org :|
> |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
> |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
^ permalink raw reply [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 03/18] Introduce skip_header parameter to qemu_loadvm_state().
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 01/18] Make QEMUFile buf expandable, and introduce qemu_realloc_buffer() and qemu_clear_buffer() Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 02/18] Introduce read() to FdMigrationState Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 04/18] qemu-char: export socket_set_nodelay() Yoshiaki Tamura
` (14 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Introduce skip_header parameter to qemu_loadvm_state() so that it can
be called iteratively without reading the header.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
migration.c | 2 +-
savevm.c | 24 +++++++++++++-----------
sysemu.h | 2 +-
3 files changed, 15 insertions(+), 13 deletions(-)
diff --git a/migration.c b/migration.c
index f0df5fc..dd3bf94 100644
--- a/migration.c
+++ b/migration.c
@@ -63,7 +63,7 @@ int qemu_start_incoming_migration(const char *uri)
void process_incoming_migration(QEMUFile *f)
{
- if (qemu_loadvm_state(f) < 0) {
+ if (qemu_loadvm_state(f, 0) < 0) {
fprintf(stderr, "load of migration failed\n");
exit(0);
}
diff --git a/savevm.c b/savevm.c
index 6c4c72b..58e48e3 100644
--- a/savevm.c
+++ b/savevm.c
@@ -1716,7 +1716,7 @@ typedef struct LoadStateEntry {
int version_id;
} LoadStateEntry;
-int qemu_loadvm_state(QEMUFile *f)
+int qemu_loadvm_state(QEMUFile *f, int skip_header)
{
QLIST_HEAD(, LoadStateEntry) loadvm_handlers =
QLIST_HEAD_INITIALIZER(loadvm_handlers);
@@ -1729,17 +1729,19 @@ int qemu_loadvm_state(QEMUFile *f)
return -EINVAL;
}
- v = qemu_get_be32(f);
- if (v != QEMU_VM_FILE_MAGIC)
- return -EINVAL;
+ if (!skip_header) {
+ v = qemu_get_be32(f);
+ if (v != QEMU_VM_FILE_MAGIC)
+ return -EINVAL;
- v = qemu_get_be32(f);
- if (v == QEMU_VM_FILE_VERSION_COMPAT) {
- fprintf(stderr, "SaveVM v2 format is obsolete and don't work anymore\n");
- return -ENOTSUP;
+ v = qemu_get_be32(f);
+ if (v == QEMU_VM_FILE_VERSION_COMPAT) {
+ fprintf(stderr, "SaveVM v2 format is obsolete and don't work anymore\n");
+ return -ENOTSUP;
+ }
+ if (v != QEMU_VM_FILE_VERSION)
+ return -ENOTSUP;
}
- if (v != QEMU_VM_FILE_VERSION)
- return -ENOTSUP;
while ((section_type = qemu_get_byte(f)) != QEMU_VM_EOF) {
uint32_t instance_id, version_id, section_id;
@@ -2062,7 +2064,7 @@ int load_vmstate(const char *name)
return -EINVAL;
}
- ret = qemu_loadvm_state(f);
+ ret = qemu_loadvm_state(f, 0);
qemu_fclose(f);
if (ret < 0) {
diff --git a/sysemu.h b/sysemu.h
index 23ae17e..c86b4e8 100644
--- a/sysemu.h
+++ b/sysemu.h
@@ -81,7 +81,7 @@ int qemu_savevm_state_begin(Monitor *mon, QEMUFile *f, int blk_enable,
int qemu_savevm_state_iterate(Monitor *mon, QEMUFile *f);
int qemu_savevm_state_complete(Monitor *mon, QEMUFile *f);
void qemu_savevm_state_cancel(Monitor *mon, QEMUFile *f);
-int qemu_loadvm_state(QEMUFile *f);
+int qemu_loadvm_state(QEMUFile *f, int skip_header);
/* SLIRP */
void do_info_slirp(Monitor *mon);
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 04/18] qemu-char: export socket_set_nodelay().
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (2 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 03/18] Introduce skip_header parameter to qemu_loadvm_state() Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 05/18] vl.c: add deleted flag for deleting the handler Yoshiaki Tamura
` (13 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
qemu-char.c | 2 +-
qemu_socket.h | 1 +
2 files changed, 2 insertions(+), 1 deletions(-)
diff --git a/qemu-char.c b/qemu-char.c
index ee4f4ca..7286aeb 100644
--- a/qemu-char.c
+++ b/qemu-char.c
@@ -2111,7 +2111,7 @@ static void tcp_chr_telnet_init(int fd)
send(fd, (char *)buf, 3, 0);
}
-static void socket_set_nodelay(int fd)
+void socket_set_nodelay(int fd)
{
int val = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, sizeof(val));
diff --git a/qemu_socket.h b/qemu_socket.h
index 897a8ae..b7f8465 100644
--- a/qemu_socket.h
+++ b/qemu_socket.h
@@ -36,6 +36,7 @@ int inet_aton(const char *cp, struct in_addr *ia);
int qemu_socket(int domain, int type, int protocol);
int qemu_accept(int s, struct sockaddr *addr, socklen_t *addrlen);
void socket_set_nonblock(int fd);
+void socket_set_nodelay(int fd);
int send_all(int fd, const void *buf, int len1);
/* New, ipv6-ready socket helper functions, see qemu-sockets.c */
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 05/18] vl.c: add deleted flag for deleting the handler.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (3 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 04/18] qemu-char: export socket_set_nodelay() Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 06/18] virtio: decrement last_avail_idx with inuse before saving Yoshiaki Tamura
` (12 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
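Make deletion of VM change state handlers robust against entries being
removed while the handler list is being walked: mark the entry with a
deleted flag (as is already done for file descriptor handlers) and free
it later from vm_state_notify().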
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
vl.c | 13 +++++++++----
1 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/vl.c b/vl.c
index ed2cdfa..00155fb 100644
--- a/vl.c
+++ b/vl.c
@@ -1158,6 +1158,7 @@ static void nographic_update(void *opaque)
struct vm_change_state_entry {
VMChangeStateHandler *cb;
void *opaque;
+ int deleted;
QLIST_ENTRY (vm_change_state_entry) entries;
};
@@ -1178,8 +1179,7 @@ VMChangeStateEntry *qemu_add_vm_change_state_handler(VMChangeStateHandler *cb,
void qemu_del_vm_change_state_handler(VMChangeStateEntry *e)
{
- QLIST_REMOVE (e, entries);
- qemu_free (e);
+ e->deleted = 1;
}
void vm_state_notify(int running, int reason)
@@ -1188,8 +1188,18 @@ void vm_state_notify(int running, int reason)
     trace_vm_state_notify(running, reason);
-    for (e = vm_change_state_head.lh_first; e; e = e->entries.le_next) {
-        e->cb(e->opaque, running, reason);
+    /* Walk the list by hand: an entry flagged as deleted is unlinked and
+     * freed here, so its successor must be read before the free. */
+    e = vm_change_state_head.lh_first;
+    while (e) {
+        VMChangeStateEntry *next = e->entries.le_next;
+        if (e->deleted) {
+            QLIST_REMOVE(e, entries);
+            qemu_free(e);
+        } else {
+            e->cb(e->opaque, running, reason);
+        }
+        e = next;
     }
}
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 06/18] virtio: decrement last_avail_idx with inuse before saving.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (4 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 05/18] vl.c: add deleted flag for deleting the handler Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode Yoshiaki Tamura
` (11 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
For regular migration inuse == 0 always as requests are flushed before
save. However, event-tap log when enabled introduces an extra queue
for requests which is not being flushed, thus the last inuse requests
are left in the event-tap queue. Move the last_avail_idx value sent
to the remote back to make it repeat the last inuse requests.
Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
hw/virtio.c | 10 +++++++++-
1 files changed, 9 insertions(+), 1 deletions(-)
diff --git a/hw/virtio.c b/hw/virtio.c
index 31bd9e3..f05d1b6 100644
--- a/hw/virtio.c
+++ b/hw/virtio.c
@@ -673,12 +673,20 @@ void virtio_save(VirtIODevice *vdev, QEMUFile *f)
qemu_put_be32(f, i);
for (i = 0; i < VIRTIO_PCI_QUEUE_MAX; i++) {
+ /* For regular migration inuse == 0 always as
+ * requests are flushed before save. However,
+ * event-tap log when enabled introduces an extra
+ * queue for requests which is not being flushed,
+ * thus the last inuse requests are left in the event-tap queue.
+ * Move the last_avail_idx value sent to the remote back
+ * to make it repeat the last inuse requests. */
+ uint16_t last_avail = vdev->vq[i].last_avail_idx - vdev->vq[i].inuse;
if (vdev->vq[i].vring.num == 0)
break;
qemu_put_be32(f, vdev->vq[i].vring.num);
qemu_put_be64(f, vdev->vq[i].pa);
- qemu_put_be16s(f, &vdev->vq[i].last_avail_idx);
+ qemu_put_be16s(f, &last_avail);
if (vdev->binding->save_queue)
vdev->binding->save_queue(vdev->binding_opaque, i, f);
}
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (5 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 06/18] virtio: decrement last_avail_idx with inuse before saving Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-21 4:46 ` [Qemu-devel] " ya su
2011-02-10 9:30 ` [Qemu-devel] [PATCH 08/18] savevm: introduce util functions to control ft_trans_file from savevm layer Yoshiaki Tamura
` (10 subsequent siblings)
17 siblings, 1 reply; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
This code implements VM transaction protocol. Like buffered_file, it
sits between savevm and migration layer. With this architecture, VM
transaction protocol is implemented mostly independent from other
existing code.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
---
Makefile.objs | 1 +
ft_trans_file.c | 624 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
ft_trans_file.h | 72 +++++++
migration.c | 3 +
trace-events | 15 ++
5 files changed, 715 insertions(+), 0 deletions(-)
create mode 100644 ft_trans_file.c
create mode 100644 ft_trans_file.h
diff --git a/Makefile.objs b/Makefile.objs
index 353b1a8..04148b5 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o
common-obj-y += qdev.o qdev-properties.o
common-obj-y += block-migration.o
common-obj-y += pflib.o
+common-obj-y += ft_trans_file.o
common-obj-$(CONFIG_BRLAPI) += baum.o
common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o
diff --git a/ft_trans_file.c b/ft_trans_file.c
new file mode 100644
index 0000000..2b42b95
--- /dev/null
+++ b/ft_trans_file.c
@@ -0,0 +1,624 @@
+/*
+ * Fault tolerant VM transaction QEMUFile
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ * This source code is based on buffered_file.c.
+ * Copyright IBM, Corp. 2008
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ */
+
+#include "qemu-common.h"
+#include "qemu-error.h"
+#include "hw/hw.h"
+#include "qemu-timer.h"
+#include "sysemu.h"
+#include "qemu-char.h"
+#include "trace.h"
+#include "ft_trans_file.h"
+
+typedef struct FtTransHdr /* header filled in and sent by ft_trans_send_header() */
+{
+ uint16_t cmd; /* the enum QEMU_VM_TRANSACTION_STATE value being sent */
+ uint16_t id; /* copy of QEMUFileFtTrans.id at send time */
+ uint32_t seq; /* copy of QEMUFileFtTrans.seq at send time */
+ uint32_t payload_len; /* payload size announced by this header */
+} FtTransHdr; /* sent and received as raw struct bytes; this file does no byte-order conversion */
+
+typedef struct QEMUFileFtTrans /* state behind the QEMUFile created by qemu_fopen_ops_ft_trans() */
+{
+ FtTransPutBufferFunc *put_buffer; /* backend write; a -EAGAIN result freezes output */
+ FtTransGetBufferFunc *get_buffer; /* backend read; a -EAGAIN result freezes input */
+ FtTransPutReadyFunc *put_ready; /* called from ft_trans_put_buffer() when output is not frozen after a flush */
+ FtTransGetReadyFunc *get_ready; /* called after COMMIT is received (receiver) or ACK is received (sender) */
+ FtTransWaitForUnfreezeFunc *wait_for_unfreeze; /* called by ft_trans_commit() while output is frozen */
+ FtTransCloseFunc *close; /* called by ft_trans_close() */
+ void *opaque; /* caller context passed to the backend callbacks */
+ QEMUFile *file; /* the QEMUFile returned by qemu_fopen_ops() for this state */
+
+ enum QEMU_VM_TRANSACTION_STATE state; /* last command sent or received */
+ uint32_t seq; /* reset by ft_trans_begin(), incremented per payload sent; receiver copies it from headers */
+ uint16_t id; /* incremented by the sender after an ACK; receiver copies it from headers */
+
+ int has_error; /* non-zero once a failure has been recorded */
+
+ bool freeze_output; /* set when put_buffer returned -EAGAIN or unsent data remains queued in buf */
+ bool freeze_input; /* set when get_buffer returned -EAGAIN */
+ bool rate_limit; /* read by ft_trans_rate_limit(), written by ft_trans_set_rate_limit() */
+ bool is_sender; /* role chosen in qemu_fopen_ops_ft_trans() */
+ bool is_payload; /* receiver: payload bytes of the current header are still outstanding */
+
+ uint8_t *buf; /* sender: queue of unsent bytes; receiver: buffer payloads are read into */
+ size_t buf_max_size; /* capacity tracked for buf */
+ size_t put_offset; /* bytes currently queued in buf for sending */
+ size_t get_offset; /* payload bytes read into buf since the last COMMIT */
+
+ FtTransHdr header; /* scratch header used for both sending and receiving */
+ size_t header_offset; /* bytes of header received so far (partial reads) */
+} QEMUFileFtTrans;
+
+#define IO_BUF_SIZE 32768 /* initial buf_max_size on the receiver side, see qemu_fopen_ops_ft_trans() */
+
+/*
+ * Queue @size bytes from @buf at the tail of s->buf, enlarging the
+ * buffer first when the remaining room is too small for the request.
+ */
+static void ft_trans_append(QEMUFileFtTrans *s,
+                            const uint8_t *buf, size_t size)
+{
+    size_t room = s->buf_max_size - s->put_offset;
+
+    if (room < size) {
+        /* grow by the request plus 1024 bytes of slack */
+        trace_ft_trans_realloc(s->buf_max_size, size + 1024);
+        s->buf_max_size += size + 1024;
+        s->buf = qemu_realloc(s->buf, s->buf_max_size);
+    }
+
+    trace_ft_trans_append(size);
+    memcpy(s->buf + s->put_offset, buf, size);
+    s->put_offset += size;
+}
+
+static void ft_trans_flush(QEMUFileFtTrans *s)
+{ /* write out as much of the data queued in s->buf as put_buffer accepts */
+ size_t offset = 0; /* bytes of s->buf written so far */
+
+ if (s->has_error) {
+ error_report("flush when error %d, bailing", s->has_error);
+ return;
+ }
+
+ while (offset < s->put_offset) {
+ ssize_t ret;
+
+ ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - offset);
+ if (ret == -EAGAIN) { /* backend cannot take more now; keep the rest queued */
+ break;
+ }
+
+ if (ret <= 0) {
+ error_report("error flushing data, %s", strerror(errno)); /* NOTE(review): ret looks like a negative errno, so errno may be stale here - confirm */
+ s->has_error = FT_TRANS_ERR_FLUSH;
+ break;
+ } else {
+ offset += ret;
+ }
+ }
+
+ trace_ft_trans_flush(offset, s->put_offset);
+ memmove(s->buf, s->buf + offset, s->put_offset - offset); /* slide the unsent tail to the front */
+ s->put_offset -= offset;
+ s->freeze_output = !!s->put_offset; /* stay frozen while anything is still queued */
+}
+
+/*
+ * Send @size bytes from @buf through the backend put_buffer callback.
+ *
+ * Data still queued from an earlier call is flushed first.  If output is
+ * frozen (the backend returned -EAGAIN, or queued data could not be
+ * flushed), the unsent remainder is queued with ft_trans_append() and the
+ * call reports the whole request as written.
+ *
+ * Returns @size on success, or -EINVAL with s->has_error set when the
+ * backend reports a failure.
+ */
+static ssize_t ft_trans_put(void *opaque, void *buf, int size)
+{
+    QEMUFileFtTrans *s = opaque;
+    /* byte pointer: arithmetic on void * is a GNU extension */
+    uint8_t *data = buf;
+    size_t total = size;
+    size_t offset = 0;
+    ssize_t len;
+
+    /* flush buffered data before putting next */
+    if (s->put_offset) {
+        ft_trans_flush(s);
+    }
+
+    while (!s->freeze_output && offset < total) {
+        len = s->put_buffer(s->opaque, data + offset, total - offset);
+
+        if (len == -EAGAIN) {
+            trace_ft_trans_freeze_output();
+            s->freeze_output = 1;
+            break;
+        }
+
+        if (len <= 0) {
+            error_report("putting data failed, %s", strerror(errno));
+            s->has_error = FT_TRANS_ERR_UNKNOWN;
+            /* return the error directly rather than parking a negative
+             * value in the unsigned offset */
+            return -EINVAL;
+        }
+
+        offset += len;
+    }
+
+    if (s->freeze_output) {
+        ft_trans_append(s, data + offset, total - offset);
+        offset = total;
+    }
+
+    return offset;
+}
+
+/*
+ * Record @state as the current state, fill s->header with @state, the
+ * current id/seq and @payload_len, and send it with ft_trans_put().
+ *
+ * Returns the ft_trans_put() result; a negative result also sets
+ * s->has_error to FT_TRANS_ERR_SEND_HDR.
+ */
+static int ft_trans_send_header(QEMUFileFtTrans *s,
+                                enum QEMU_VM_TRANSACTION_STATE state,
+                                uint32_t payload_len)
+{
+    int rc;
+
+    trace_ft_trans_send_header(state);
+
+    s->state = state;
+    s->header.cmd = state;
+    s->header.id = s->id;
+    s->header.seq = s->seq;
+    s->header.payload_len = payload_len;
+
+    rc = ft_trans_put(s, &s->header, sizeof(s->header));
+    if (rc < 0) {
+        error_report("send header failed");
+        s->has_error = FT_TRANS_ERR_SEND_HDR;
+    }
+
+    return rc;
+}
+
+/*
+ * QEMUFile put_buffer callback.
+ *
+ * A call with @pos == 0 and @size == 0 is treated as a "ready to write"
+ * notification: queued data is flushed and, if output is not frozen
+ * afterwards, the put_ready callback is invoked.  This path returns 0.
+ *
+ * Any other call sends a QEMU_VM_TRANSACTION_CONTINUE header announcing
+ * @size payload bytes, then the payload itself, and advances s->seq.
+ *
+ * Returns the ft_trans_put() result for the payload on success, -EINVAL
+ * if an error was recorded earlier, or the negative result of the send
+ * that failed.
+ */
+static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
+{
+    QEMUFileFtTrans *s = opaque;
+    ssize_t ret;
+
+    trace_ft_trans_put_buffer(size, pos);
+
+    if (s->has_error) {
+        error_report("put_buffer when error %d, bailing", s->has_error);
+        return -EINVAL;
+    }
+
+    /* assuming qemu_file_put_notify() is calling */
+    if (pos == 0 && size == 0) {
+        trace_ft_trans_put_ready();
+        ft_trans_flush(s);
+
+        if (!s->freeze_output) {
+            trace_ft_trans_cb(s->put_ready);
+            /* this path always returns 0; the callback's result is not
+             * propagated */
+            s->put_ready();
+        }
+
+        return 0;
+    }
+
+    ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size);
+    if (ret < 0) {
+        return ret;
+    }
+
+    ret = ft_trans_put(s, (uint8_t *)buf, size);
+    if (ret < 0) {
+        error_report("send payload failed");
+        s->has_error = FT_TRANS_ERR_SEND_PAYLOAD;
+        return ret;
+    }
+
+    s->seq++;
+
+    return ret;
+}
+
+/*
+ * Read up to @size bytes into @buf with the backend get_buffer callback.
+ *
+ * s->freeze_input is cleared on entry and set again if the backend
+ * returns -EAGAIN, in which case the bytes read so far are returned.
+ * Any other non-positive result sets s->has_error and returns -EINVAL.
+ */
+static int ft_trans_fill_buffer(void *opaque, void *buf, int size)
+{
+    QEMUFileFtTrans *s = opaque;
+    uint8_t *dst = buf;
+    size_t filled;
+
+    s->freeze_input = 0;
+
+    for (filled = 0; filled < size; ) {
+        ssize_t n = s->get_buffer(s->opaque, dst + filled, 0, size - filled);
+
+        if (n == -EAGAIN) {
+            trace_ft_trans_freeze_input();
+            s->freeze_input = 1;
+            break;
+        }
+
+        if (n <= 0) {
+            error_report("fill buffer failed, %s", strerror(errno));
+            s->has_error = 1;
+            return -EINVAL;
+        }
+
+        filled += n;
+    }
+
+    return filled;
+}
+
+static int ft_trans_recv_header(QEMUFileFtTrans *s)
+{ /* read (the rest of) a header into s->header; returns bytes read by this call or a negative value */
+ int ret;
+ char *buf = (char *)&s->header + s->header_offset; /* resume after the bytes already received */
+
+ ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - s->header_offset);
+ if (ret < 0) {
+ error_report("recv header failed");
+ s->has_error = FT_TRANS_ERR_RECV_HDR;
+ goto out;
+ }
+
+ s->header_offset += ret;
+ if (s->header_offset == sizeof(FtTransHdr)) { /* header complete: adopt its command as the state */
+ trace_ft_trans_recv_header(s->header.cmd);
+ s->state = s->header.cmd;
+ s->header_offset = 0;
+
+ if (!s->is_sender) { /* only the receiver takes id/seq from the incoming header */
+ s->id = s->header.id;
+ s->seq = s->header.seq;
+ }
+ }
+
+out:
+ return ret;
+}
+
+static int ft_trans_recv_payload(QEMUFileFtTrans *s)
+{ /* read outstanding payload bytes into s->buf at get_offset; returns bytes read or a negative value */
+ QEMUFile *f = s->file;
+ int ret = -1;
+
+ /* extend the QEMUFile buf if there isn't enough space */
+ if (s->header.payload_len > (s->buf_max_size - s->get_offset)) {
+ s->buf_max_size += (s->header.payload_len -
+ (s->buf_max_size - s->get_offset)); /* grow by exactly the shortfall */
+ s->buf = qemu_realloc_buffer(f, s->buf_max_size);
+ }
+
+ ret = ft_trans_fill_buffer(s, s->buf + s->get_offset,
+ s->header.payload_len);
+ if (ret < 0) {
+ error_report("recv payload failed");
+ s->has_error = FT_TRANS_ERR_RECV_PAYLOAD;
+ goto out;
+ }
+
+ trace_ft_trans_recv_payload(ret, s->header.payload_len, s->get_offset);
+
+ s->header.payload_len -= ret; /* payload_len now counts the bytes still missing */
+ s->get_offset += ret;
+ s->is_payload = !!s->header.payload_len; /* stay in payload mode until all bytes have arrived */
+
+out:
+ return ret;
+}
+
+/*
+ * Run one receive step of the transaction protocol.
+ *
+ * While a payload is outstanding (s->is_payload) only more payload bytes
+ * are pulled.  Otherwise a header is read and, unless that failed or
+ * input froze, s->state is acted upon:
+ *   BEGIN    - nothing to do yet
+ *   CONTINUE - a payload follows; switch to payload mode
+ *   COMMIT   - send ACK, run the get_ready callback, reset the buffers
+ *   ATOMIC   - reported as not implemented
+ *   CANCEL   - rejected with -EINVAL
+ *   other    - recorded as FT_TRANS_ERR_STATE_INVALID, -EINVAL
+ *
+ * Returns a negative value on failure, otherwise the result of the last
+ * helper that ran.
+ */
+static int ft_trans_recv(QEMUFileFtTrans *s)
+{
+    int ret;
+
+    /* get payload and return */
+    if (s->is_payload) {
+        ret = ft_trans_recv_payload(s);
+        goto out;
+    }
+
+    ret = ft_trans_recv_header(s);
+    if (ret < 0 || s->freeze_input) {
+        goto out;
+    }
+
+    switch (s->state) {
+    case QEMU_VM_TRANSACTION_BEGIN:
+        /* CONTINUE or COMMIT should come shortly */
+        s->is_payload = 0;
+        break;
+
+    case QEMU_VM_TRANSACTION_CONTINUE:
+        /* get payload */
+        s->is_payload = 1;
+        break;
+
+    case QEMU_VM_TRANSACTION_COMMIT:
+        ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
+        if (ret < 0) {
+            goto out;
+        }
+
+        trace_ft_trans_cb(s->get_ready);
+        ret = s->get_ready(s->opaque);
+        if (ret < 0) {
+            goto out;
+        }
+
+        qemu_clear_buffer(s->file);
+        s->get_offset = 0;
+        s->is_payload = 0;
+
+        break;
+
+    case QEMU_VM_TRANSACTION_ATOMIC:
+        /* not implemented yet */
+        /* report the state; ret only holds the header byte count here */
+        error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d",
+                     s->state);
+        break;
+
+    case QEMU_VM_TRANSACTION_CANCEL:
+        /* return -EINVAL until migrate cancel on receiver side is supported */
+        ret = -EINVAL;
+        break;
+
+    default:
+        /* report the offending state, not the header byte count */
+        error_report("unknown QEMU_VM_TRANSACTION_STATE %d", s->state);
+        s->has_error = FT_TRANS_ERR_STATE_INVALID;
+        ret = -EINVAL;
+    }
+
+out:
+    return ret;
+}
+
+static int ft_trans_get_buffer(void *opaque, uint8_t *buf,
+ int64_t pos, int size)
+{ /* QEMUFile get_buffer callback; a call with pos == 0 and size == 0 is treated as a "ready to read" notification */
+ QEMUFileFtTrans *s = opaque;
+ int ret;
+
+ if (s->has_error) {
+ error_report("get_buffer when error %d, bailing", s->has_error);
+ return -EINVAL;
+ }
+
+ /* assuming qemu_file_get_notify() is calling */
+ if (pos == 0 && size == 0) {
+ trace_ft_trans_get_ready();
+ s->freeze_input = 0;
+
+ /* sender should be waiting for ACK */
+ if (s->is_sender) {
+ ret = ft_trans_recv_header(s);
+ if (s->freeze_input) { /* backend returned -EAGAIN; report 0 and wait for the next notification */
+ ret = 0;
+ goto out;
+ }
+ if (ret < 0) {
+ error_report("recv ack failed");
+ goto out;
+ }
+
+ if (s->state != QEMU_VM_TRANSACTION_ACK) {
+ error_report("recv invalid state %d", s->state);
+ s->has_error = FT_TRANS_ERR_STATE_INVALID;
+ ret = -EINVAL;
+ goto out;
+ }
+
+ trace_ft_trans_cb(s->get_ready);
+ ret = s->get_ready(s->opaque);
+ if (ret < 0) {
+ goto out;
+ }
+
+ /* proceed trans id */
+ s->id++;
+
+ return 0;
+ }
+
+ /* set QEMUFile buf at beginning */
+ if (!s->buf) {
+ s->buf = buf;
+ }
+
+ ret = ft_trans_recv(s); /* receiver: run one protocol step */
+ goto out;
+ }
+
+ ret = s->get_offset; /* ordinary read: report the payload bytes accumulated so far; NOTE(review): buf is not written here, presumably because s->buf already aliases the QEMUFile buffer - confirm */
+
+out:
+ return ret;
+}
+
+/*
+ * QEMUFile close callback: close the backend, then release this state.
+ * Returns whatever the backend close callback returned.
+ */
+static int ft_trans_close(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+    int rc;
+
+    trace_ft_trans_close();
+    rc = s->close(s->opaque);
+
+    /* s->buf is only freed here for the sender; on the receiver it is
+     * set from the buffer handed to ft_trans_get_buffer() */
+    if (s->is_sender) {
+        qemu_free(s->buf);
+    }
+    qemu_free(s);
+
+    return rc;
+}
+
+/*
+ * QEMUFile rate_limit callback.  Returns 1 only when no error has been
+ * recorded, rate limiting is enabled and output is currently frozen;
+ * returns 0 otherwise.
+ */
+static int ft_trans_rate_limit(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+
+    return !s->has_error && s->rate_limit && s->freeze_output;
+}
+
+/*
+ * QEMUFile set_rate_limit callback.  Any non-zero @new_rate enables rate
+ * limiting and zero disables it; the setting is left untouched once an
+ * error has been recorded.  Returns the resulting rate_limit flag.
+ */
+static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate)
+{
+    QEMUFileFtTrans *s = opaque;
+
+    if (!s->has_error) {
+        s->rate_limit = !!new_rate;
+    }
+
+    return s->rate_limit;
+}
+
+/*
+ * Start a transaction.  @opaque is the QEMUFileFtTrans state.
+ *
+ * Receiver: must still be in QEMU_VM_TRANSACTION_INIT; sends an ACK so
+ * the sender may start.  Any other state is recorded as
+ * FT_TRANS_ERR_STATE_INVALID and -EINVAL is returned without sending.
+ *
+ * Sender: while in QEMU_VM_TRANSACTION_INIT it first waits for that ACK,
+ * then sends BEGIN and moves to QEMU_VM_TRANSACTION_CONTINUE.
+ *
+ * Returns a negative value on failure, otherwise the result of the last
+ * header sent.
+ */
+int ft_trans_begin(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+    int ret;
+    s->seq = 0;
+
+    /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */
+    if (!s->is_sender) {
+        if (s->state != QEMU_VM_TRANSACTION_INIT) {
+            error_report("invalid state %d", s->state);
+            s->has_error = FT_TRANS_ERR_STATE_INVALID;
+            ret = -EINVAL;
+            /* bail out: falling through would send the ACK anyway and
+             * replace the error with its result */
+            goto out;
+        }
+
+        ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
+        goto out;
+    }
+
+    /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */
+    if (s->state == QEMU_VM_TRANSACTION_INIT) {
+retry:
+        ret = ft_trans_recv_header(s);
+        /* input froze (-EAGAIN from the backend): keep polling until the
+         * whole header has arrived */
+        if (s->freeze_input) {
+            goto retry;
+        }
+        if (ret < 0) {
+            error_report("recv ack failed");
+            goto out;
+        }
+
+        if (s->state != QEMU_VM_TRANSACTION_ACK) {
+            error_report("recv invalid state %d", s->state);
+            s->has_error = FT_TRANS_ERR_STATE_INVALID;
+            ret = -EINVAL;
+            goto out;
+        }
+    }
+
+    ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0);
+    if (ret < 0) {
+        goto out;
+    }
+
+    s->state = QEMU_VM_TRANSACTION_CONTINUE;
+
+out:
+    return ret;
+}
+
+/*
+ * Commit the current transaction.  @opaque is the QEMUFileFtTrans state.
+ *
+ * Receiver: just sends an ACK and returns that result.
+ *
+ * Sender: flushes the QEMUFile, sends COMMIT, drains any queued output
+ * (waiting on the backend while output is frozen) and then reads the
+ * peer's ACK.  On success the transaction id is advanced.
+ *
+ * Returns 0 on success, -EAGAIN if the ACK has not fully arrived yet,
+ * or another negative value on failure.
+ */
+int ft_trans_commit(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+    int ret;
+
+    if (!s->is_sender) {
+        ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
+        goto out;
+    }
+
+    /* sender should flush buf before sending COMMIT */
+    qemu_fflush(s->file);
+
+    ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0);
+    if (ret < 0) {
+        goto out;
+    }
+
+    while (!s->has_error && s->put_offset) {
+        ft_trans_flush(s);
+        if (s->freeze_output) {
+            /* pass the backend context, as for every other backend
+             * callback, not this wrapper's own state */
+            s->wait_for_unfreeze(s->opaque);
+        }
+    }
+
+    if (s->has_error) {
+        ret = -EINVAL;
+        goto out;
+    }
+
+    ret = ft_trans_recv_header(s);
+    if (s->freeze_input) {
+        ret = -EAGAIN;
+        goto out;
+    }
+    if (ret < 0) {
+        error_report("recv ack failed");
+        goto out;
+    }
+
+    if (s->state != QEMU_VM_TRANSACTION_ACK) {
+        error_report("recv invalid state %d", s->state);
+        s->has_error = FT_TRANS_ERR_STATE_INVALID;
+        ret = -EINVAL;
+        goto out;
+    }
+
+    s->id++;
+    ret = 0;
+
+out:
+    return ret;
+}
+
+/*
+ * Cancel the current transaction.  @opaque is the QEMUFileFtTrans state.
+ * The sender transmits a CANCEL header and returns that result; on the
+ * receiver the call is rejected with -EINVAL.
+ */
+int ft_trans_cancel(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+
+    if (s->is_sender) {
+        return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0);
+    }
+
+    /* invalid until migrate cancel on receiver side is supported */
+    return -EINVAL;
+}
+
+/*
+ * Create a QEMUFile that runs the transaction protocol on top of the
+ * given backend callbacks.  @opaque is passed back to those callbacks and
+ * @is_sender selects the role.  The zero-initialised state starts with
+ * rate limiting enabled; the receiver additionally starts with a buffer
+ * capacity of IO_BUF_SIZE.
+ *
+ * Returns the QEMUFile created by qemu_fopen_ops().
+ */
+QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
+                                  FtTransPutBufferFunc *put_buffer,
+                                  FtTransGetBufferFunc *get_buffer,
+                                  FtTransPutReadyFunc *put_ready,
+                                  FtTransGetReadyFunc *get_ready,
+                                  FtTransWaitForUnfreezeFunc *wait_for_unfreeze,
+                                  FtTransCloseFunc *close,
+                                  bool is_sender)
+{
+    QEMUFileFtTrans *s = qemu_mallocz(sizeof(*s));
+
+    /* role and initial protocol counters */
+    s->is_sender = is_sender;
+    s->id = 0;
+    s->seq = 0;
+    s->rate_limit = 1;
+    if (!is_sender) {
+        s->buf_max_size = IO_BUF_SIZE;
+    }
+
+    /* backend callbacks and the context they are called with */
+    s->opaque = opaque;
+    s->put_buffer = put_buffer;
+    s->get_buffer = get_buffer;
+    s->put_ready = put_ready;
+    s->get_ready = get_ready;
+    s->wait_for_unfreeze = wait_for_unfreeze;
+    s->close = close;
+
+    s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer,
+                             ft_trans_close, ft_trans_rate_limit,
+                             ft_trans_set_rate_limit, NULL);
+
+    return s->file;
+}
diff --git a/ft_trans_file.h b/ft_trans_file.h
new file mode 100644
index 0000000..5ca6b53
--- /dev/null
+++ b/ft_trans_file.h
@@ -0,0 +1,72 @@
+/*
+ * Fault tolerant VM transaction QEMUFile
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ * This source code is based on buffered_file.h.
+ * Copyright IBM, Corp. 2008
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ */
+
+#ifndef QEMU_FT_TRANSACTION_FILE_H
+#define QEMU_FT_TRANSACTION_FILE_H
+
+#include "hw/hw.h"
+
+enum QEMU_VM_TRANSACTION_STATE { /* protocol commands/states used by ft_trans_file.c */
+ QEMU_VM_TRANSACTION_NACK = -1,
+ QEMU_VM_TRANSACTION_INIT, /* 0: the state of a freshly zero-allocated transaction file */
+ QEMU_VM_TRANSACTION_BEGIN, /* sent by the sender in ft_trans_begin() */
+ QEMU_VM_TRANSACTION_CONTINUE, /* header announcing a payload */
+ QEMU_VM_TRANSACTION_COMMIT, /* sent by the sender in ft_trans_commit() */
+ QEMU_VM_TRANSACTION_CANCEL, /* sent by ft_trans_cancel(); the receive path returns -EINVAL for it */
+ QEMU_VM_TRANSACTION_ATOMIC, /* the receive path reports it as not implemented */
+ QEMU_VM_TRANSACTION_ACK, /* acknowledgement the sender waits for */
+};
+
+enum FT_MODE { /* values taken by the global ft_mode */
+ FT_ERROR = -1,
+ FT_OFF,
+ FT_INIT,
+ FT_TRANSACTION_BEGIN,
+ FT_TRANSACTION_ITER,
+ FT_TRANSACTION_COMMIT,
+ FT_TRANSACTION_ATOMIC,
+ FT_TRANSACTION_RECV,
+};
+extern enum FT_MODE ft_mode; /* defined in migration.c, initialised to FT_OFF */
+
+#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error; same value as the literal 1 stored by ft_trans_fill_buffer() */
+#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */
+#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */
+#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */
+#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */
+#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */
+#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */
+
+typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size); /* backend write; ft_trans_file.c treats -EAGAIN as "would block" and any other result <= 0 as failure */
+typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t pos, size_t size); /* backend read; same result convention, always called with pos == 0 */
+typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt); /* not referenced by ft_trans_file.c in this patch */
+typedef int (FtTransPutReadyFunc)(void); /* invoked when the file is ready for more output; takes no context */
+typedef int (FtTransGetReadyFunc)(void *opaque); /* invoked after a COMMIT or ACK arrives; a negative result is propagated */
+typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); /* invoked from ft_trans_commit() while output is frozen */
+typedef int (FtTransCloseFunc)(void *opaque); /* invoked from the QEMUFile close callback; its result is returned */
+
+int ft_trans_begin(void *opaque); /* opaque is the transaction state behind the QEMUFile; negative return on failure */
+int ft_trans_commit(void *opaque); /* returns 0 once the peer's ACK has been received (sender side) */
+int ft_trans_cancel(void *opaque); /* returns -EINVAL when called on the receiver */
+
+QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, /* context passed back to the callbacks below */
+ FtTransPutBufferFunc *put_buffer,
+ FtTransGetBufferFunc *get_buffer,
+ FtTransPutReadyFunc *put_ready,
+ FtTransGetReadyFunc *get_ready,
+ FtTransWaitForUnfreezeFunc *wait_for_unfreeze,
+ FtTransCloseFunc *close,
+ bool is_sender); /* true for the sending side, false for the receiving side */
+
+#endif
diff --git a/migration.c b/migration.c
index dd3bf94..c5e0146 100644
--- a/migration.c
+++ b/migration.c
@@ -15,6 +15,7 @@
#include "migration.h"
#include "monitor.h"
#include "buffered_file.h"
+#include "ft_trans_file.h"
#include "sysemu.h"
#include "block.h"
#include "qemu_socket.h"
@@ -31,6 +32,8 @@
do { } while (0)
#endif
+enum FT_MODE ft_mode = FT_OFF; /* global fault-tolerance mode; declared extern in ft_trans_file.h */
+
/* Migration speed throttling */
static int64_t max_throttle = (32 << 20);
diff --git a/trace-events b/trace-events
index e6138ea..50ac840 100644
--- a/trace-events
+++ b/trace-events
@@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice wrottn %lu of requested %zd
disable spice_vmc_read(int bytes, int len) "spice read %lu of requested %zd"
disable spice_vmc_register_interface(void *scd) "spice vmc registered interface %p"
disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered interface %p"
+
+# ft_trans_file.c
+# Format specifiers must match the declared argument types: %zu for size_t,
+# PRIu32 for uint32_t, PRId64 for int64_t.
+disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing buffer from %zu by %zu"
+disable ft_trans_append(size_t size) "buffering %zu bytes"
+disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu bytes"
+disable ft_trans_send_header(uint16_t cmd) "send header %d"
+disable ft_trans_recv_header(uint16_t cmd) "recv header %d"
+disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %zu bytes at %"PRId64""
+disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) "recv %zu of %"PRIu32" total %zu"
+disable ft_trans_close(void) "closing"
+disable ft_trans_freeze_output(void) "backend not ready, freezing output"
+disable ft_trans_freeze_input(void) "backend not ready, freezing input"
+disable ft_trans_put_ready(void) "file is ready to put"
+disable ft_trans_get_ready(void) "file is ready to get"
+disable ft_trans_cb(void *cb) "callback %p"
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] Re: [PATCH 07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode.
2011-02-10 9:30 ` [Qemu-devel] [PATCH 07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode Yoshiaki Tamura
@ 2011-02-21 4:46 ` ya su
2011-02-21 9:42 ` Yoshiaki Tamura
0 siblings, 1 reply; 35+ messages in thread
From: ya su @ 2011-02-21 4:46 UTC (permalink / raw)
To: Yoshiaki Tamura
Cc: kwolf, aliguori, dlaor, ananth, kvm, mst, mtosatti, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
[-- Attachment #1: Type: text/plain, Size: 23415 bytes --]
Yoshiaki:
I have one question about ram_save_live: during migration stage 3
(the completion stage), it calls
cpu_physical_memory_set_dirty_tracking(0) to stop recording dirty RAM pages.
At the end of the migrate_ft_trans_connect function, vm_start() is invoked;
at that point cpu_physical_memory_set_dirty_tracking(1) has not been called yet,
so some RAM pages may go unrecorded by the time qemu_savevm_trans_begin
is called. I think you need to call
cpu_physical_memory_set_dirty_tracking(1) in the migrate_ft_trans_connect
function. Am I right?
BR
Green.
2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
> This code implements VM transaction protocol. Like buffered_file, it
> sits between savevm and migration layer. With this architecture, VM
> transaction protocol is implemented mostly independent from other
> existing code.
>
> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
> ---
> Makefile.objs | 1 +
> ft_trans_file.c | 624
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++
> ft_trans_file.h | 72 +++++++
> migration.c | 3 +
> trace-events | 15 ++
> 5 files changed, 715 insertions(+), 0 deletions(-)
> create mode 100644 ft_trans_file.c
> create mode 100644 ft_trans_file.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 353b1a8..04148b5 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o
> common-obj-y += qdev.o qdev-properties.o
> common-obj-y += block-migration.o
> common-obj-y += pflib.o
> +common-obj-y += ft_trans_file.o
>
> common-obj-$(CONFIG_BRLAPI) += baum.o
> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o
> migration-fd.o
> diff --git a/ft_trans_file.c b/ft_trans_file.c
> new file mode 100644
> index 0000000..2b42b95
> --- /dev/null
> +++ b/ft_trans_file.c
> @@ -0,0 +1,624 @@
> +/*
> + * Fault tolerant VM transaction QEMUFile
> + *
> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2. See
> + * the COPYING file in the top-level directory.
> + *
> + * This source code is based on buffered_file.c.
> + * Copyright IBM, Corp. 2008
> + * Authors:
> + * Anthony Liguori <aliguori@us.ibm.com>
> + */
> +
> +#include "qemu-common.h"
> +#include "qemu-error.h"
> +#include "hw/hw.h"
> +#include "qemu-timer.h"
> +#include "sysemu.h"
> +#include "qemu-char.h"
> +#include "trace.h"
> +#include "ft_trans_file.h"
> +
> +typedef struct FtTransHdr
> +{
> + uint16_t cmd;
> + uint16_t id;
> + uint32_t seq;
> + uint32_t payload_len;
> +} FtTransHdr;
> +
> +typedef struct QEMUFileFtTrans
> +{
> + FtTransPutBufferFunc *put_buffer;
> + FtTransGetBufferFunc *get_buffer;
> + FtTransPutReadyFunc *put_ready;
> + FtTransGetReadyFunc *get_ready;
> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze;
> + FtTransCloseFunc *close;
> + void *opaque;
> + QEMUFile *file;
> +
> + enum QEMU_VM_TRANSACTION_STATE state;
> + uint32_t seq;
> + uint16_t id;
> +
> + int has_error;
> +
> + bool freeze_output;
> + bool freeze_input;
> + bool rate_limit;
> + bool is_sender;
> + bool is_payload;
> +
> + uint8_t *buf;
> + size_t buf_max_size;
> + size_t put_offset;
> + size_t get_offset;
> +
> + FtTransHdr header;
> + size_t header_offset;
> +} QEMUFileFtTrans;
> +
> +#define IO_BUF_SIZE 32768
> +
> +static void ft_trans_append(QEMUFileFtTrans *s,
> + const uint8_t *buf, size_t size)
> +{
> + if (size > (s->buf_max_size - s->put_offset)) {
> + trace_ft_trans_realloc(s->buf_max_size, size + 1024);
> + s->buf_max_size += size + 1024;
> + s->buf = qemu_realloc(s->buf, s->buf_max_size);
> + }
> +
> + trace_ft_trans_append(size);
> + memcpy(s->buf + s->put_offset, buf, size);
> + s->put_offset += size;
> +}
> +
> +static void ft_trans_flush(QEMUFileFtTrans *s)
> +{
> + size_t offset = 0;
> +
> + if (s->has_error) {
> + error_report("flush when error %d, bailing", s->has_error);
> + return;
> + }
> +
> + while (offset < s->put_offset) {
> + ssize_t ret;
> +
> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset -
> offset);
> + if (ret == -EAGAIN) {
> + break;
> + }
> +
> + if (ret <= 0) {
> + error_report("error flushing data, %s", strerror(errno));
> + s->has_error = FT_TRANS_ERR_FLUSH;
> + break;
> + } else {
> + offset += ret;
> + }
> + }
> +
> + trace_ft_trans_flush(offset, s->put_offset);
> + memmove(s->buf, s->buf + offset, s->put_offset - offset);
> + s->put_offset -= offset;
> + s->freeze_output = !!s->put_offset;
> +}
> +
> +static ssize_t ft_trans_put(void *opaque, void *buf, int size)
> +{
> + QEMUFileFtTrans *s = opaque;
> + size_t offset = 0;
> + ssize_t len;
> +
> + /* flush buffered data before putting next */
> + if (s->put_offset) {
> + ft_trans_flush(s);
> + }
> +
> + while (!s->freeze_output && offset < size) {
> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size -
> offset);
> +
> + if (len == -EAGAIN) {
> + trace_ft_trans_freeze_output();
> + s->freeze_output = 1;
> + break;
> + }
> +
> + if (len <= 0) {
> + error_report("putting data failed, %s", strerror(errno));
> + s->has_error = 1;
> + offset = -EINVAL;
> + break;
> + }
> +
> + offset += len;
> + }
> +
> + if (s->freeze_output) {
> + ft_trans_append(s, buf + offset, size - offset);
> + offset = size;
> + }
> +
> + return offset;
> +}
> +
> +static int ft_trans_send_header(QEMUFileFtTrans *s,
> + enum QEMU_VM_TRANSACTION_STATE state,
> + uint32_t payload_len)
> +{
> + int ret;
> + FtTransHdr *hdr = &s->header;
> +
> + trace_ft_trans_send_header(state);
> +
> + hdr->cmd = s->state = state;
> + hdr->id = s->id;
> + hdr->seq = s->seq;
> + hdr->payload_len = payload_len;
> +
> + ret = ft_trans_put(s, hdr, sizeof(*hdr));
> + if (ret < 0) {
> + error_report("send header failed");
> + s->has_error = FT_TRANS_ERR_SEND_HDR;
> + }
> +
> + return ret;
> +}
> +
> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t
> pos, int size)
> +{
> + QEMUFileFtTrans *s = opaque;
> + ssize_t ret;
> +
> + trace_ft_trans_put_buffer(size, pos);
> +
> + if (s->has_error) {
> + error_report("put_buffer when error %d, bailing", s->has_error);
> + return -EINVAL;
> + }
> +
> + /* assuming qemu_file_put_notify() is calling */
> + if (pos == 0 && size == 0) {
> + trace_ft_trans_put_ready();
> + ft_trans_flush(s);
> +
> + if (!s->freeze_output) {
> + trace_ft_trans_cb(s->put_ready);
> + ret = s->put_ready();
> + }
> +
> + ret = 0;
> + goto out;
> + }
> +
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + ret = ft_trans_put(s, (uint8_t *)buf, size);
> + if (ret < 0) {
> + error_report("send palyload failed");
> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD;
> + goto out;
> + }
> +
> + s->seq++;
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size)
> +{
> + QEMUFileFtTrans *s = opaque;
> + size_t offset = 0;
> + ssize_t len;
> +
> + s->freeze_input = 0;
> +
> + while (offset < size) {
> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset,
> + 0, size - offset);
> + if (len == -EAGAIN) {
> + trace_ft_trans_freeze_input();
> + s->freeze_input = 1;
> + break;
> + }
> +
> + if (len <= 0) {
> + error_report("fill buffer failed, %s", strerror(errno));
> + s->has_error = 1;
> + return -EINVAL;
> + }
> +
> + offset += len;
> + }
> +
> + return offset;
> +}
> +
> +static int ft_trans_recv_header(QEMUFileFtTrans *s)
> +{
> + int ret;
> + char *buf = (char *)&s->header + s->header_offset;
> +
> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) -
> s->header_offset);
> + if (ret < 0) {
> + error_report("recv header failed");
> + s->has_error = FT_TRANS_ERR_RECV_HDR;
> + goto out;
> + }
> +
> + s->header_offset += ret;
> + if (s->header_offset == sizeof(FtTransHdr)) {
> + trace_ft_trans_recv_header(s->header.cmd);
> + s->state = s->header.cmd;
> + s->header_offset = 0;
> +
> + if (!s->is_sender) {
> + s->id = s->header.id;
> + s->seq = s->header.seq;
> + }
> + }
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_recv_payload(QEMUFileFtTrans *s)
> +{
> + QEMUFile *f = s->file;
> + int ret = -1;
> +
> + /* extend QEMUFile buf if there weren't enough space */
> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) {
> + s->buf_max_size += (s->header.payload_len -
> + (s->buf_max_size - s->get_offset));
> + s->buf = qemu_realloc_buffer(f, s->buf_max_size);
> + }
> +
> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset,
> + s->header.payload_len);
> + if (ret < 0) {
> + error_report("recv payload failed");
> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD;
> + goto out;
> + }
> +
> + trace_ft_trans_recv_payload(ret, s->header.payload_len,
> s->get_offset);
> +
> + s->header.payload_len -= ret;
> + s->get_offset += ret;
> + s->is_payload = !!s->header.payload_len;
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_recv(QEMUFileFtTrans *s)
> +{
> + int ret;
> +
> + /* get payload and return */
> + if (s->is_payload) {
> + ret = ft_trans_recv_payload(s);
> + goto out;
> + }
> +
> + ret = ft_trans_recv_header(s);
> + if (ret < 0 || s->freeze_input) {
> + goto out;
> + }
> +
> + switch (s->state) {
> + case QEMU_VM_TRANSACTION_BEGIN:
> + /* CONTINUE or COMMIT should come shortly */
> + s->is_payload = 0;
> + break;
> +
> + case QEMU_VM_TRANSACTION_CONTINUE:
> + /* get payload */
> + s->is_payload = 1;
> + break;
> +
> + case QEMU_VM_TRANSACTION_COMMIT:
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + trace_ft_trans_cb(s->get_ready);
> + ret = s->get_ready(s->opaque);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + qemu_clear_buffer(s->file);
> + s->get_offset = 0;
> + s->is_payload = 0;
> +
> + break;
> +
> + case QEMU_VM_TRANSACTION_ATOMIC:
> + /* not implemented yet */
> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d",
> + ret);
> + break;
> +
> + case QEMU_VM_TRANSACTION_CANCEL:
> + /* return -EINVAL until migrate cancel on recevier side is
> supported */
> + ret = -EINVAL;
> + break;
> +
> + default:
> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + }
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf,
> + int64_t pos, int size)
> +{
> + QEMUFileFtTrans *s = opaque;
> + int ret;
> +
> + if (s->has_error) {
> + error_report("get_buffer when error %d, bailing", s->has_error);
> + return -EINVAL;
> + }
> +
> + /* assuming qemu_file_get_notify() is calling */
> + if (pos == 0 && size == 0) {
> + trace_ft_trans_get_ready();
> + s->freeze_input = 0;
> +
> + /* sender should be waiting for ACK */
> + if (s->is_sender) {
> + ret = ft_trans_recv_header(s);
> + if (s->freeze_input) {
> + ret = 0;
> + goto out;
> + }
> + if (ret < 0) {
> + error_report("recv ack failed");
> + goto out;
> + }
> +
> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> + error_report("recv invalid state %d", s->state);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + goto out;
> + }
> +
> + trace_ft_trans_cb(s->get_ready);
> + ret = s->get_ready(s->opaque);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + /* proceed trans id */
> + s->id++;
> +
> + return 0;
> + }
> +
> + /* set QEMUFile buf at beginning */
> + if (!s->buf) {
> + s->buf = buf;
> + }
> +
> + ret = ft_trans_recv(s);
> + goto out;
> + }
> +
> + ret = s->get_offset;
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_close(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> + int ret;
> +
> + trace_ft_trans_close();
> + ret = s->close(s->opaque);
> + if (s->is_sender) {
> + qemu_free(s->buf);
> + }
> + qemu_free(s);
> +
> + return ret;
> +}
> +
> +static int ft_trans_rate_limit(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> +
> + if (s->has_error) {
> + return 0;
> + }
> +
> + if (s->rate_limit && s->freeze_output) {
> + return 1;
> + }
> +
> + return 0;
> +}
> +
> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate)
> +{
> + QEMUFileFtTrans *s = opaque;
> +
> + if (s->has_error) {
> + goto out;
> + }
> +
> + s->rate_limit = !!new_rate;
> +
> +out:
> + return s->rate_limit;
> +}
> +
> +int ft_trans_begin(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> + int ret;
> + s->seq = 0;
> +
> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */
> + if (!s->is_sender) {
> + if (s->state != QEMU_VM_TRANSACTION_INIT) {
> + error_report("invalid state %d", s->state);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + }
> +
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> + goto out;
> + }
> +
> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */
> + if (s->state == QEMU_VM_TRANSACTION_INIT) {
> +retry:
> + ret = ft_trans_recv_header(s);
> + if (s->freeze_input) {
> + goto retry;
> + }
> + if (ret < 0) {
> + error_report("recv ack failed");
> + goto out;
> + }
> +
> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> + error_report("recv invalid state %d", s->state);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + goto out;
> + }
> + }
> +
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + s->state = QEMU_VM_TRANSACTION_CONTINUE;
> +
> +out:
> + return ret;
> +}
> +
> +int ft_trans_commit(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> + int ret;
> +
> + if (!s->is_sender) {
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> + goto out;
> + }
> +
> + /* sender should flush buf before sending COMMIT */
> + qemu_fflush(s->file);
> +
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + while (!s->has_error && s->put_offset) {
> + ft_trans_flush(s);
> + if (s->freeze_output) {
> + s->wait_for_unfreeze(s);
> + }
> + }
> +
> + if (s->has_error) {
> + ret = -EINVAL;
> + goto out;
> + }
> +
> + ret = ft_trans_recv_header(s);
> + if (s->freeze_input) {
> + ret = -EAGAIN;
> + goto out;
> + }
> + if (ret < 0) {
> + error_report("recv ack failed");
> + goto out;
> + }
> +
> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> + error_report("recv invalid state %d", s->state);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + goto out;
> + }
> +
> + s->id++;
> + ret = 0;
> +
> +out:
> + return ret;
> +}
> +
> +int ft_trans_cancel(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> +
> + /* invalid until migrate cancel on recevier side is supported */
> + if (!s->is_sender) {
> + return -EINVAL;
> + }
> +
> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0);
> +}
> +
> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
> + FtTransPutBufferFunc *put_buffer,
> + FtTransGetBufferFunc *get_buffer,
> + FtTransPutReadyFunc *put_ready,
> + FtTransGetReadyFunc *get_ready,
> + FtTransWaitForUnfreezeFunc
> *wait_for_unfreeze,
> + FtTransCloseFunc *close,
> + bool is_sender)
> +{
> + QEMUFileFtTrans *s;
> +
> + s = qemu_mallocz(sizeof(*s));
> +
> + s->opaque = opaque;
> + s->put_buffer = put_buffer;
> + s->get_buffer = get_buffer;
> + s->put_ready = put_ready;
> + s->get_ready = get_ready;
> + s->wait_for_unfreeze = wait_for_unfreeze;
> + s->close = close;
> + s->is_sender = is_sender;
> + s->id = 0;
> + s->seq = 0;
> + s->rate_limit = 1;
> +
> + if (!s->is_sender) {
> + s->buf_max_size = IO_BUF_SIZE;
> + }
> +
> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer,
> + ft_trans_close, ft_trans_rate_limit,
> + ft_trans_set_rate_limit, NULL);
> +
> + return s->file;
> +}
> diff --git a/ft_trans_file.h b/ft_trans_file.h
> new file mode 100644
> index 0000000..5ca6b53
> --- /dev/null
> +++ b/ft_trans_file.h
> @@ -0,0 +1,72 @@
> +/*
> + * Fault tolerant VM transaction QEMUFile
> + *
> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2. See
> + * the COPYING file in the top-level directory.
> + *
> + * This source code is based on buffered_file.h.
> + * Copyright IBM, Corp. 2008
> + * Authors:
> + * Anthony Liguori <aliguori@us.ibm.com>
> + */
> +
> +#ifndef QEMU_FT_TRANSACTION_FILE_H
> +#define QEMU_FT_TRANSACTION_FILE_H
> +
> +#include "hw/hw.h"
> +
> +enum QEMU_VM_TRANSACTION_STATE {
> + QEMU_VM_TRANSACTION_NACK = -1,
> + QEMU_VM_TRANSACTION_INIT,
> + QEMU_VM_TRANSACTION_BEGIN,
> + QEMU_VM_TRANSACTION_CONTINUE,
> + QEMU_VM_TRANSACTION_COMMIT,
> + QEMU_VM_TRANSACTION_CANCEL,
> + QEMU_VM_TRANSACTION_ATOMIC,
> + QEMU_VM_TRANSACTION_ACK,
> +};
> +
> +enum FT_MODE {
> + FT_ERROR = -1,
> + FT_OFF,
> + FT_INIT,
> + FT_TRANSACTION_BEGIN,
> + FT_TRANSACTION_ITER,
> + FT_TRANSACTION_COMMIT,
> + FT_TRANSACTION_ATOMIC,
> + FT_TRANSACTION_RECV,
> +};
> +extern enum FT_MODE ft_mode;
> +
> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */
> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */
> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */
> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */
> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */
> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */
> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */
> +
> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data,
> size_t size);
> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t
> pos, size_t size);
> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec
> *iov, int iovcnt);
> +typedef int (FtTransPutReadyFunc)(void);
> +typedef int (FtTransGetReadyFunc)(void *opaque);
> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque);
> +typedef int (FtTransCloseFunc)(void *opaque);
> +
> +int ft_trans_begin(void *opaque);
> +int ft_trans_commit(void *opaque);
> +int ft_trans_cancel(void *opaque);
> +
> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
> + FtTransPutBufferFunc *put_buffer,
> + FtTransGetBufferFunc *get_buffer,
> + FtTransPutReadyFunc *put_ready,
> + FtTransGetReadyFunc *get_ready,
> + FtTransWaitForUnfreezeFunc
> *wait_for_unfreeze,
> + FtTransCloseFunc *close,
> + bool is_sender);
> +
> +#endif
> diff --git a/migration.c b/migration.c
> index dd3bf94..c5e0146 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -15,6 +15,7 @@
> #include "migration.h"
> #include "monitor.h"
> #include "buffered_file.h"
> +#include "ft_trans_file.h"
> #include "sysemu.h"
> #include "block.h"
> #include "qemu_socket.h"
> @@ -31,6 +32,8 @@
> do { } while (0)
> #endif
>
> +enum FT_MODE ft_mode = FT_OFF;
> +
> /* Migration speed throttling */
> static int64_t max_throttle = (32 << 20);
>
> diff --git a/trace-events b/trace-events
> index e6138ea..50ac840 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice
> wrottn %lu of requested %zd
> disable spice_vmc_read(int bytes, int len) "spice read %lu of requested
> %zd"
> disable spice_vmc_register_interface(void *scd) "spice vmc registered
> interface %p"
> disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered
> interface %p"
> +
> +# ft_trans_file.c
> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing
> buffer from %zu by %zu"
> +disable ft_trans_append(size_t size) "buffering %zu bytes"
> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu bytes"
> +disable ft_trans_send_header(uint16_t cmd) "send header %d"
> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d"
> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes at
> %"PRId64""
> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total)
> "recv %d of %d total %d"
> +disable ft_trans_close(void) "closing"
> +disable ft_trans_freeze_output(void) "backend not ready, freezing output"
> +disable ft_trans_freeze_input(void) "backend not ready, freezing input"
> +disable ft_trans_put_ready(void) "file is ready to put"
> +disable ft_trans_get_ready(void) "file is ready to get"
> +disable ft_trans_cb(void *cb) "callback %p"
> --
> 1.7.1.2
>
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
[-- Attachment #2: Type: text/html, Size: 27588 bytes --]
^ permalink raw reply [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] Re: [PATCH 07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode.
2011-02-21 4:46 ` [Qemu-devel] " ya su
@ 2011-02-21 9:42 ` Yoshiaki Tamura
2011-02-23 2:28 ` ya su
0 siblings, 1 reply; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-21 9:42 UTC (permalink / raw)
To: ya su
Cc: kwolf, aliguori, mtosatti, ananth, kvm, mst, dlaor, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
Hi Green,
2011/2/21 ya su <suya94335@gmail.com>:
> Yoshiaki:
>
> I have one question about ram_save_live, during migration 3
> stage(completation stage), it will call
> cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty pages.
> at the end of migrate_ft_trans_connect function, it will invoke vm_start(),
> at this time, cpu_physical_memory_set_dirty_tracking(1) is not called yet,
> so there may have some ram pages not recorded when qemu_savevm_trans_begin
> is called. I think you need calll
> cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect
> function, Am I right?
Thank you for taking a look.
When qemu_savevm_trans_begin is called for the first time, it
calls ram_save_live with stage 1, which sends all pages and enables
dirty tracking, so there won't be any missing pages. Note that
event-tap is turned on by then, meaning no outputs are sent before
the first transaction finishes. I understand that this
implementation is inefficient, and I am planning to introduce a new
stage in the future that is almost the same as stage 3 but keeps
dirty tracking enabled.
Thanks,
Yoshi
>
> BR
>
> Green.
>
>
> 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
>>
>> This code implements VM transaction protocol. Like buffered_file, it
>> sits between savevm and migration layer. With this architecture, VM
>> transaction protocol is implemented mostly independent from other
>> existing code.
>>
>> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
>> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
>> ---
>> Makefile.objs | 1 +
>> ft_trans_file.c | 624
>> +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>> ft_trans_file.h | 72 +++++++
>> migration.c | 3 +
>> trace-events | 15 ++
>> 5 files changed, 715 insertions(+), 0 deletions(-)
>> create mode 100644 ft_trans_file.c
>> create mode 100644 ft_trans_file.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index 353b1a8..04148b5 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o
>> common-obj-y += qdev.o qdev-properties.o
>> common-obj-y += block-migration.o
>> common-obj-y += pflib.o
>> +common-obj-y += ft_trans_file.o
>>
>> common-obj-$(CONFIG_BRLAPI) += baum.o
>> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o
>> migration-fd.o
>> diff --git a/ft_trans_file.c b/ft_trans_file.c
>> new file mode 100644
>> index 0000000..2b42b95
>> --- /dev/null
>> +++ b/ft_trans_file.c
>> @@ -0,0 +1,624 @@
>> +/*
>> + * Fault tolerant VM transaction QEMUFile
>> + *
>> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2. See
>> + * the COPYING file in the top-level directory.
>> + *
>> + * This source code is based on buffered_file.c.
>> + * Copyright IBM, Corp. 2008
>> + * Authors:
>> + * Anthony Liguori <aliguori@us.ibm.com>
>> + */
>> +
>> +#include "qemu-common.h"
>> +#include "qemu-error.h"
>> +#include "hw/hw.h"
>> +#include "qemu-timer.h"
>> +#include "sysemu.h"
>> +#include "qemu-char.h"
>> +#include "trace.h"
>> +#include "ft_trans_file.h"
>> +
>> +typedef struct FtTransHdr
>> +{
>> + uint16_t cmd;
>> + uint16_t id;
>> + uint32_t seq;
>> + uint32_t payload_len;
>> +} FtTransHdr;
>> +
>> +typedef struct QEMUFileFtTrans
>> +{
>> + FtTransPutBufferFunc *put_buffer;
>> + FtTransGetBufferFunc *get_buffer;
>> + FtTransPutReadyFunc *put_ready;
>> + FtTransGetReadyFunc *get_ready;
>> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze;
>> + FtTransCloseFunc *close;
>> + void *opaque;
>> + QEMUFile *file;
>> +
>> + enum QEMU_VM_TRANSACTION_STATE state;
>> + uint32_t seq;
>> + uint16_t id;
>> +
>> + int has_error;
>> +
>> + bool freeze_output;
>> + bool freeze_input;
>> + bool rate_limit;
>> + bool is_sender;
>> + bool is_payload;
>> +
>> + uint8_t *buf;
>> + size_t buf_max_size;
>> + size_t put_offset;
>> + size_t get_offset;
>> +
>> + FtTransHdr header;
>> + size_t header_offset;
>> +} QEMUFileFtTrans;
>> +
>> +#define IO_BUF_SIZE 32768
>> +
>> +static void ft_trans_append(QEMUFileFtTrans *s,
>> + const uint8_t *buf, size_t size)
>> +{
>> + if (size > (s->buf_max_size - s->put_offset)) {
>> + trace_ft_trans_realloc(s->buf_max_size, size + 1024);
>> + s->buf_max_size += size + 1024;
>> + s->buf = qemu_realloc(s->buf, s->buf_max_size);
>> + }
>> +
>> + trace_ft_trans_append(size);
>> + memcpy(s->buf + s->put_offset, buf, size);
>> + s->put_offset += size;
>> +}
>> +
>> +static void ft_trans_flush(QEMUFileFtTrans *s)
>> +{
>> + size_t offset = 0;
>> +
>> + if (s->has_error) {
>> + error_report("flush when error %d, bailing", s->has_error);
>> + return;
>> + }
>> +
>> + while (offset < s->put_offset) {
>> + ssize_t ret;
>> +
>> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset -
>> offset);
>> + if (ret == -EAGAIN) {
>> + break;
>> + }
>> +
>> + if (ret <= 0) {
>> + error_report("error flushing data, %s", strerror(errno));
>> + s->has_error = FT_TRANS_ERR_FLUSH;
>> + break;
>> + } else {
>> + offset += ret;
>> + }
>> + }
>> +
>> + trace_ft_trans_flush(offset, s->put_offset);
>> + memmove(s->buf, s->buf + offset, s->put_offset - offset);
>> + s->put_offset -= offset;
>> + s->freeze_output = !!s->put_offset;
>> +}
>> +
>> +static ssize_t ft_trans_put(void *opaque, void *buf, int size)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> + size_t offset = 0;
>> + ssize_t len;
>> +
>> + /* flush buffered data before putting next */
>> + if (s->put_offset) {
>> + ft_trans_flush(s);
>> + }
>> +
>> + while (!s->freeze_output && offset < size) {
>> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size -
>> offset);
>> +
>> + if (len == -EAGAIN) {
>> + trace_ft_trans_freeze_output();
>> + s->freeze_output = 1;
>> + break;
>> + }
>> +
>> + if (len <= 0) {
>> + error_report("putting data failed, %s", strerror(errno));
>> + s->has_error = 1;
>> + offset = -EINVAL;
>> + break;
>> + }
>> +
>> + offset += len;
>> + }
>> +
>> + if (s->freeze_output) {
>> + ft_trans_append(s, buf + offset, size - offset);
>> + offset = size;
>> + }
>> +
>> + return offset;
>> +}
>> +
>> +static int ft_trans_send_header(QEMUFileFtTrans *s,
>> + enum QEMU_VM_TRANSACTION_STATE state,
>> + uint32_t payload_len)
>> +{
>> + int ret;
>> + FtTransHdr *hdr = &s->header;
>> +
>> + trace_ft_trans_send_header(state);
>> +
>> + hdr->cmd = s->state = state;
>> + hdr->id = s->id;
>> + hdr->seq = s->seq;
>> + hdr->payload_len = payload_len;
>> +
>> + ret = ft_trans_put(s, hdr, sizeof(*hdr));
>> + if (ret < 0) {
>> + error_report("send header failed");
>> + s->has_error = FT_TRANS_ERR_SEND_HDR;
>> + }
>> +
>> + return ret;
>> +}
>> +
>> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t
>> pos, int size)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> + ssize_t ret;
>> +
>> + trace_ft_trans_put_buffer(size, pos);
>> +
>> + if (s->has_error) {
>> + error_report("put_buffer when error %d, bailing", s->has_error);
>> + return -EINVAL;
>> + }
>> +
>> + /* assuming qemu_file_put_notify() is calling */
>> + if (pos == 0 && size == 0) {
>> + trace_ft_trans_put_ready();
>> + ft_trans_flush(s);
>> +
>> + if (!s->freeze_output) {
>> + trace_ft_trans_cb(s->put_ready);
>> + ret = s->put_ready();
>> + }
>> +
>> + ret = 0;
>> + goto out;
>> + }
>> +
>> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size);
>> + if (ret < 0) {
>> + goto out;
>> + }
>> +
>> + ret = ft_trans_put(s, (uint8_t *)buf, size);
>> + if (ret < 0) {
>> + error_report("send palyload failed");
>> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD;
>> + goto out;
>> + }
>> +
>> + s->seq++;
>> +
>> +out:
>> + return ret;
>> +}
>> +
>> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> + size_t offset = 0;
>> + ssize_t len;
>> +
>> + s->freeze_input = 0;
>> +
>> + while (offset < size) {
>> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset,
>> + 0, size - offset);
>> + if (len == -EAGAIN) {
>> + trace_ft_trans_freeze_input();
>> + s->freeze_input = 1;
>> + break;
>> + }
>> +
>> + if (len <= 0) {
>> + error_report("fill buffer failed, %s", strerror(errno));
>> + s->has_error = 1;
>> + return -EINVAL;
>> + }
>> +
>> + offset += len;
>> + }
>> +
>> + return offset;
>> +}
>> +
>> +static int ft_trans_recv_header(QEMUFileFtTrans *s)
>> +{
>> + int ret;
>> + char *buf = (char *)&s->header + s->header_offset;
>> +
>> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) -
>> s->header_offset);
>> + if (ret < 0) {
>> + error_report("recv header failed");
>> + s->has_error = FT_TRANS_ERR_RECV_HDR;
>> + goto out;
>> + }
>> +
>> + s->header_offset += ret;
>> + if (s->header_offset == sizeof(FtTransHdr)) {
>> + trace_ft_trans_recv_header(s->header.cmd);
>> + s->state = s->header.cmd;
>> + s->header_offset = 0;
>> +
>> + if (!s->is_sender) {
>> + s->id = s->header.id;
>> + s->seq = s->header.seq;
>> + }
>> + }
>> +
>> +out:
>> + return ret;
>> +}
>> +
>> +static int ft_trans_recv_payload(QEMUFileFtTrans *s)
>> +{
>> + QEMUFile *f = s->file;
>> + int ret = -1;
>> +
>> + /* extend QEMUFile buf if there weren't enough space */
>> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) {
>> + s->buf_max_size += (s->header.payload_len -
>> + (s->buf_max_size - s->get_offset));
>> + s->buf = qemu_realloc_buffer(f, s->buf_max_size);
>> + }
>> +
>> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset,
>> + s->header.payload_len);
>> + if (ret < 0) {
>> + error_report("recv payload failed");
>> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD;
>> + goto out;
>> + }
>> +
>> + trace_ft_trans_recv_payload(ret, s->header.payload_len,
>> s->get_offset);
>> +
>> + s->header.payload_len -= ret;
>> + s->get_offset += ret;
>> + s->is_payload = !!s->header.payload_len;
>> +
>> +out:
>> + return ret;
>> +}
>> +
>> +static int ft_trans_recv(QEMUFileFtTrans *s)
>> +{
>> + int ret;
>> +
>> + /* get payload and return */
>> + if (s->is_payload) {
>> + ret = ft_trans_recv_payload(s);
>> + goto out;
>> + }
>> +
>> + ret = ft_trans_recv_header(s);
>> + if (ret < 0 || s->freeze_input) {
>> + goto out;
>> + }
>> +
>> + switch (s->state) {
>> + case QEMU_VM_TRANSACTION_BEGIN:
>> + /* CONTINUE or COMMIT should come shortly */
>> + s->is_payload = 0;
>> + break;
>> +
>> + case QEMU_VM_TRANSACTION_CONTINUE:
>> + /* get payload */
>> + s->is_payload = 1;
>> + break;
>> +
>> + case QEMU_VM_TRANSACTION_COMMIT:
>> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
>> + if (ret < 0) {
>> + goto out;
>> + }
>> +
>> + trace_ft_trans_cb(s->get_ready);
>> + ret = s->get_ready(s->opaque);
>> + if (ret < 0) {
>> + goto out;
>> + }
>> +
>> + qemu_clear_buffer(s->file);
>> + s->get_offset = 0;
>> + s->is_payload = 0;
>> +
>> + break;
>> +
>> + case QEMU_VM_TRANSACTION_ATOMIC:
>> + /* not implemented yet */
>> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d",
>> + ret);
>> + break;
>> +
>> + case QEMU_VM_TRANSACTION_CANCEL:
>> + /* return -EINVAL until migrate cancel on recevier side is
>> supported */
>> + ret = -EINVAL;
>> + break;
>> +
>> + default:
>> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret);
>> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> + ret = -EINVAL;
>> + }
>> +
>> +out:
>> + return ret;
>> +}
>> +
>> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf,
>> + int64_t pos, int size)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> + int ret;
>> +
>> + if (s->has_error) {
>> + error_report("get_buffer when error %d, bailing", s->has_error);
>> + return -EINVAL;
>> + }
>> +
>> + /* assuming qemu_file_get_notify() is calling */
>> + if (pos == 0 && size == 0) {
>> + trace_ft_trans_get_ready();
>> + s->freeze_input = 0;
>> +
>> + /* sender should be waiting for ACK */
>> + if (s->is_sender) {
>> + ret = ft_trans_recv_header(s);
>> + if (s->freeze_input) {
>> + ret = 0;
>> + goto out;
>> + }
>> + if (ret < 0) {
>> + error_report("recv ack failed");
>> + goto out;
>> + }
>> +
>> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
>> + error_report("recv invalid state %d", s->state);
>> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> + ret = -EINVAL;
>> + goto out;
>> + }
>> +
>> + trace_ft_trans_cb(s->get_ready);
>> + ret = s->get_ready(s->opaque);
>> + if (ret < 0) {
>> + goto out;
>> + }
>> +
>> + /* proceed trans id */
>> + s->id++;
>> +
>> + return 0;
>> + }
>> +
>> + /* set QEMUFile buf at beginning */
>> + if (!s->buf) {
>> + s->buf = buf;
>> + }
>> +
>> + ret = ft_trans_recv(s);
>> + goto out;
>> + }
>> +
>> + ret = s->get_offset;
>> +
>> +out:
>> + return ret;
>> +}
>> +
>> +static int ft_trans_close(void *opaque)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> + int ret;
>> +
>> + trace_ft_trans_close();
>> + ret = s->close(s->opaque);
>> + if (s->is_sender) {
>> + qemu_free(s->buf);
>> + }
>> + qemu_free(s);
>> +
>> + return ret;
>> +}
>> +
>> +static int ft_trans_rate_limit(void *opaque)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> +
>> + if (s->has_error) {
>> + return 0;
>> + }
>> +
>> + if (s->rate_limit && s->freeze_output) {
>> + return 1;
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> +
>> + if (s->has_error) {
>> + goto out;
>> + }
>> +
>> + s->rate_limit = !!new_rate;
>> +
>> +out:
>> + return s->rate_limit;
>> +}
>> +
>> +int ft_trans_begin(void *opaque)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> + int ret;
>> + s->seq = 0;
>> +
>> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */
>> + if (!s->is_sender) {
>> + if (s->state != QEMU_VM_TRANSACTION_INIT) {
>> + error_report("invalid state %d", s->state);
>> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> + ret = -EINVAL;
>> + }
>> +
>> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
>> + goto out;
>> + }
>> +
>> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */
>> + if (s->state == QEMU_VM_TRANSACTION_INIT) {
>> +retry:
>> + ret = ft_trans_recv_header(s);
>> + if (s->freeze_input) {
>> + goto retry;
>> + }
>> + if (ret < 0) {
>> + error_report("recv ack failed");
>> + goto out;
>> + }
>> +
>> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
>> + error_report("recv invalid state %d", s->state);
>> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> + ret = -EINVAL;
>> + goto out;
>> + }
>> + }
>> +
>> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0);
>> + if (ret < 0) {
>> + goto out;
>> + }
>> +
>> + s->state = QEMU_VM_TRANSACTION_CONTINUE;
>> +
>> +out:
>> + return ret;
>> +}
>> +
>> +int ft_trans_commit(void *opaque)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> + int ret;
>> +
>> + if (!s->is_sender) {
>> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
>> + goto out;
>> + }
>> +
>> + /* sender should flush buf before sending COMMIT */
>> + qemu_fflush(s->file);
>> +
>> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0);
>> + if (ret < 0) {
>> + goto out;
>> + }
>> +
>> + while (!s->has_error && s->put_offset) {
>> + ft_trans_flush(s);
>> + if (s->freeze_output) {
>> + s->wait_for_unfreeze(s);
>> + }
>> + }
>> +
>> + if (s->has_error) {
>> + ret = -EINVAL;
>> + goto out;
>> + }
>> +
>> + ret = ft_trans_recv_header(s);
>> + if (s->freeze_input) {
>> + ret = -EAGAIN;
>> + goto out;
>> + }
>> + if (ret < 0) {
>> + error_report("recv ack failed");
>> + goto out;
>> + }
>> +
>> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
>> + error_report("recv invalid state %d", s->state);
>> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> + ret = -EINVAL;
>> + goto out;
>> + }
>> +
>> + s->id++;
>> + ret = 0;
>> +
>> +out:
>> + return ret;
>> +}
>> +
>> +int ft_trans_cancel(void *opaque)
>> +{
>> + QEMUFileFtTrans *s = opaque;
>> +
>> + /* invalid until migrate cancel on recevier side is supported */
>> + if (!s->is_sender) {
>> + return -EINVAL;
>> + }
>> +
>> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0);
>> +}
>> +
>> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
>> + FtTransPutBufferFunc *put_buffer,
>> + FtTransGetBufferFunc *get_buffer,
>> + FtTransPutReadyFunc *put_ready,
>> + FtTransGetReadyFunc *get_ready,
>> + FtTransWaitForUnfreezeFunc
>> *wait_for_unfreeze,
>> + FtTransCloseFunc *close,
>> + bool is_sender)
>> +{
>> + QEMUFileFtTrans *s;
>> +
>> + s = qemu_mallocz(sizeof(*s));
>> +
>> + s->opaque = opaque;
>> + s->put_buffer = put_buffer;
>> + s->get_buffer = get_buffer;
>> + s->put_ready = put_ready;
>> + s->get_ready = get_ready;
>> + s->wait_for_unfreeze = wait_for_unfreeze;
>> + s->close = close;
>> + s->is_sender = is_sender;
>> + s->id = 0;
>> + s->seq = 0;
>> + s->rate_limit = 1;
>> +
>> + if (!s->is_sender) {
>> + s->buf_max_size = IO_BUF_SIZE;
>> + }
>> +
>> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer,
>> + ft_trans_close, ft_trans_rate_limit,
>> + ft_trans_set_rate_limit, NULL);
>> +
>> + return s->file;
>> +}
>> diff --git a/ft_trans_file.h b/ft_trans_file.h
>> new file mode 100644
>> index 0000000..5ca6b53
>> --- /dev/null
>> +++ b/ft_trans_file.h
>> @@ -0,0 +1,72 @@
>> +/*
>> + * Fault tolerant VM transaction QEMUFile
>> + *
>> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2. See
>> + * the COPYING file in the top-level directory.
>> + *
>> + * This source code is based on buffered_file.h.
>> + * Copyright IBM, Corp. 2008
>> + * Authors:
>> + * Anthony Liguori <aliguori@us.ibm.com>
>> + */
>> +
>> +#ifndef QEMU_FT_TRANSACTION_FILE_H
>> +#define QEMU_FT_TRANSACTION_FILE_H
>> +
>> +#include "hw/hw.h"
>> +
>> +enum QEMU_VM_TRANSACTION_STATE {
>> + QEMU_VM_TRANSACTION_NACK = -1,
>> + QEMU_VM_TRANSACTION_INIT,
>> + QEMU_VM_TRANSACTION_BEGIN,
>> + QEMU_VM_TRANSACTION_CONTINUE,
>> + QEMU_VM_TRANSACTION_COMMIT,
>> + QEMU_VM_TRANSACTION_CANCEL,
>> + QEMU_VM_TRANSACTION_ATOMIC,
>> + QEMU_VM_TRANSACTION_ACK,
>> +};
>> +
>> +enum FT_MODE {
>> + FT_ERROR = -1,
>> + FT_OFF,
>> + FT_INIT,
>> + FT_TRANSACTION_BEGIN,
>> + FT_TRANSACTION_ITER,
>> + FT_TRANSACTION_COMMIT,
>> + FT_TRANSACTION_ATOMIC,
>> + FT_TRANSACTION_RECV,
>> +};
>> +extern enum FT_MODE ft_mode;
>> +
>> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */
>> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */
>> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */
>> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */
>> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */
>> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */
>> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */
>> +
>> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data,
>> size_t size);
>> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t
>> pos, size_t size);
>> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec
>> *iov, int iovcnt);
>> +typedef int (FtTransPutReadyFunc)(void);
>> +typedef int (FtTransGetReadyFunc)(void *opaque);
>> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque);
>> +typedef int (FtTransCloseFunc)(void *opaque);
>> +
>> +int ft_trans_begin(void *opaque);
>> +int ft_trans_commit(void *opaque);
>> +int ft_trans_cancel(void *opaque);
>> +
>> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
>> + FtTransPutBufferFunc *put_buffer,
>> + FtTransGetBufferFunc *get_buffer,
>> + FtTransPutReadyFunc *put_ready,
>> + FtTransGetReadyFunc *get_ready,
>> + FtTransWaitForUnfreezeFunc
>> *wait_for_unfreeze,
>> + FtTransCloseFunc *close,
>> + bool is_sender);
>> +
>> +#endif
>> diff --git a/migration.c b/migration.c
>> index dd3bf94..c5e0146 100644
>> --- a/migration.c
>> +++ b/migration.c
>> @@ -15,6 +15,7 @@
>> #include "migration.h"
>> #include "monitor.h"
>> #include "buffered_file.h"
>> +#include "ft_trans_file.h"
>> #include "sysemu.h"
>> #include "block.h"
>> #include "qemu_socket.h"
>> @@ -31,6 +32,8 @@
>> do { } while (0)
>> #endif
>>
>> +enum FT_MODE ft_mode = FT_OFF;
>> +
>> /* Migration speed throttling */
>> static int64_t max_throttle = (32 << 20);
>>
>> diff --git a/trace-events b/trace-events
>> index e6138ea..50ac840 100644
>> --- a/trace-events
>> +++ b/trace-events
>> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice
>> wrottn %lu of requested %zd
>> disable spice_vmc_read(int bytes, int len) "spice read %lu of requested
>> %zd"
>> disable spice_vmc_register_interface(void *scd) "spice vmc registered
>> interface %p"
>> disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered
>> interface %p"
>> +
>> +# ft_trans_file.c
>> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing
>> buffer from %zu by %zu"
>> +disable ft_trans_append(size_t size) "buffering %zu bytes"
>> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu
>> bytes"
>> +disable ft_trans_send_header(uint16_t cmd) "send header %d"
>> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d"
>> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes
>> at %"PRId64""
>> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total)
>> "recv %d of %d total %d"
>> +disable ft_trans_close(void) "closing"
>> +disable ft_trans_freeze_output(void) "backend not ready, freezing output"
>> +disable ft_trans_freeze_input(void) "backend not ready, freezing input"
>> +disable ft_trans_put_ready(void) "file is ready to put"
>> +disable ft_trans_get_ready(void) "file is ready to get"
>> +disable ft_trans_cb(void *cb) "callback %p"
>> --
>> 1.7.1.2
>>
>> --
>> To unsubscribe from this list: send the line "unsubscribe kvm" in
>> the body of a message to majordomo@vger.kernel.org
>> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
>
^ permalink raw reply [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] Re: [PATCH 07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode.
2011-02-21 9:42 ` Yoshiaki Tamura
@ 2011-02-23 2:28 ` ya su
2011-02-23 5:05 ` Yoshiaki Tamura
0 siblings, 1 reply; 35+ messages in thread
From: ya su @ 2011-02-23 2:28 UTC (permalink / raw)
To: Yoshiaki Tamura
Cc: kwolf, aliguori, mtosatti, ananth, kvm, mst, dlaor, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
[-- Attachment #1: Type: text/plain, Size: 26789 bytes --]
Yoshi:
Thanks for your explanation.
If you introduce a new stage similar to stage 3, I think stage 1 also needs
to change, as it will mark all pages dirty.
Looking forward to your updated patch.
Green.
2011/2/21 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
> Hi Green,
>
> 2011/2/21 ya su <suya94335@gmail.com>:
> > Yoshiaki:
> >
> > I have one question about ram_save_live: during migration stage 3
> > (the completion stage), it will call
> > cpu_physical_memory_set_dirty_tracking(0) to stop recording dirty RAM
> > pages.
> > At the end of the migrate_ft_trans_connect function, it will invoke
> > vm_start().
> > At this time, cpu_physical_memory_set_dirty_tracking(1) has not been
> > called yet,
> > so some RAM pages may not be recorded when qemu_savevm_trans_begin
> > is called. I think you need to call
> > cpu_physical_memory_set_dirty_tracking(1) in the migrate_ft_trans_connect
> > function. Am I right?
>
> Thank you for taking a look.
> When qemu_savevm_trans_begin is called for the first time, it
> calls ram_save_live with stage 1, that sends all pages and sets
> dirty tracking, so there won't be missing pages. Note that
> event-tap is turned on by then, meaning no outputs are sent before
> finishing the first transaction. I understand that this
> implementation is inefficient, and planning to introduce a new
> stage that is almost same as stage 3 but keeps dirty tracking in
> the future.
>
> Thanks,
>
> Yoshi
>
> >
> > BR
> >
> > Green.
> >
> >
> > 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
> >>
> >> This code implements VM transaction protocol. Like buffered_file, it
> >> sits between savevm and migration layer. With this architecture, VM
> >> transaction protocol is implemented mostly independent from other
> >> existing code.
> >>
> >> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
> >> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
> >> ---
> >> Makefile.objs | 1 +
> >> ft_trans_file.c | 624
> >> +++++++++++++++++++++++++++++++++++++++++++++++++++++++
> >> ft_trans_file.h | 72 +++++++
> >> migration.c | 3 +
> >> trace-events | 15 ++
> >> 5 files changed, 715 insertions(+), 0 deletions(-)
> >> create mode 100644 ft_trans_file.c
> >> create mode 100644 ft_trans_file.h
> >>
> >> diff --git a/Makefile.objs b/Makefile.objs
> >> index 353b1a8..04148b5 100644
> >> --- a/Makefile.objs
> >> +++ b/Makefile.objs
> >> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o
> >> common-obj-y += qdev.o qdev-properties.o
> >> common-obj-y += block-migration.o
> >> common-obj-y += pflib.o
> >> +common-obj-y += ft_trans_file.o
> >>
> >> common-obj-$(CONFIG_BRLAPI) += baum.o
> >> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o
> >> migration-fd.o
> >> diff --git a/ft_trans_file.c b/ft_trans_file.c
> >> new file mode 100644
> >> index 0000000..2b42b95
> >> --- /dev/null
> >> +++ b/ft_trans_file.c
> >> @@ -0,0 +1,624 @@
> >> +/*
> >> + * Fault tolerant VM transaction QEMUFile
> >> + *
> >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
> >> + *
> >> + * This work is licensed under the terms of the GNU GPL, version 2.
> See
> >> + * the COPYING file in the top-level directory.
> >> + *
> >> + * This source code is based on buffered_file.c.
> >> + * Copyright IBM, Corp. 2008
> >> + * Authors:
> >> + * Anthony Liguori <aliguori@us.ibm.com>
> >> + */
> >> +
> >> +#include "qemu-common.h"
> >> +#include "qemu-error.h"
> >> +#include "hw/hw.h"
> >> +#include "qemu-timer.h"
> >> +#include "sysemu.h"
> >> +#include "qemu-char.h"
> >> +#include "trace.h"
> >> +#include "ft_trans_file.h"
> >> +
> >> +typedef struct FtTransHdr
> >> +{
> >> + uint16_t cmd;
> >> + uint16_t id;
> >> + uint32_t seq;
> >> + uint32_t payload_len;
> >> +} FtTransHdr;
> >> +
> >> +typedef struct QEMUFileFtTrans
> >> +{
> >> + FtTransPutBufferFunc *put_buffer;
> >> + FtTransGetBufferFunc *get_buffer;
> >> + FtTransPutReadyFunc *put_ready;
> >> + FtTransGetReadyFunc *get_ready;
> >> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze;
> >> + FtTransCloseFunc *close;
> >> + void *opaque;
> >> + QEMUFile *file;
> >> +
> >> + enum QEMU_VM_TRANSACTION_STATE state;
> >> + uint32_t seq;
> >> + uint16_t id;
> >> +
> >> + int has_error;
> >> +
> >> + bool freeze_output;
> >> + bool freeze_input;
> >> + bool rate_limit;
> >> + bool is_sender;
> >> + bool is_payload;
> >> +
> >> + uint8_t *buf;
> >> + size_t buf_max_size;
> >> + size_t put_offset;
> >> + size_t get_offset;
> >> +
> >> + FtTransHdr header;
> >> + size_t header_offset;
> >> +} QEMUFileFtTrans;
> >> +
> >> +#define IO_BUF_SIZE 32768
> >> +
> >> +static void ft_trans_append(QEMUFileFtTrans *s,
> >> + const uint8_t *buf, size_t size)
> >> +{
> >> + if (size > (s->buf_max_size - s->put_offset)) {
> >> + trace_ft_trans_realloc(s->buf_max_size, size + 1024);
> >> + s->buf_max_size += size + 1024;
> >> + s->buf = qemu_realloc(s->buf, s->buf_max_size);
> >> + }
> >> +
> >> + trace_ft_trans_append(size);
> >> + memcpy(s->buf + s->put_offset, buf, size);
> >> + s->put_offset += size;
> >> +}
> >> +
> >> +static void ft_trans_flush(QEMUFileFtTrans *s)
> >> +{
> >> + size_t offset = 0;
> >> +
> >> + if (s->has_error) {
> >> + error_report("flush when error %d, bailing", s->has_error);
> >> + return;
> >> + }
> >> +
> >> + while (offset < s->put_offset) {
> >> + ssize_t ret;
> >> +
> >> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset -
> >> offset);
> >> + if (ret == -EAGAIN) {
> >> + break;
> >> + }
> >> +
> >> + if (ret <= 0) {
> >> + error_report("error flushing data, %s", strerror(errno));
> >> + s->has_error = FT_TRANS_ERR_FLUSH;
> >> + break;
> >> + } else {
> >> + offset += ret;
> >> + }
> >> + }
> >> +
> >> + trace_ft_trans_flush(offset, s->put_offset);
> >> + memmove(s->buf, s->buf + offset, s->put_offset - offset);
> >> + s->put_offset -= offset;
> >> + s->freeze_output = !!s->put_offset;
> >> +}
> >> +
> >> +static ssize_t ft_trans_put(void *opaque, void *buf, int size)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> + size_t offset = 0;
> >> + ssize_t len;
> >> +
> >> + /* flush buffered data before putting next */
> >> + if (s->put_offset) {
> >> + ft_trans_flush(s);
> >> + }
> >> +
> >> + while (!s->freeze_output && offset < size) {
> >> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size -
> >> offset);
> >> +
> >> + if (len == -EAGAIN) {
> >> + trace_ft_trans_freeze_output();
> >> + s->freeze_output = 1;
> >> + break;
> >> + }
> >> +
> >> + if (len <= 0) {
> >> + error_report("putting data failed, %s", strerror(errno));
> >> + s->has_error = 1;
> >> + offset = -EINVAL;
> >> + break;
> >> + }
> >> +
> >> + offset += len;
> >> + }
> >> +
> >> + if (s->freeze_output) {
> >> + ft_trans_append(s, buf + offset, size - offset);
> >> + offset = size;
> >> + }
> >> +
> >> + return offset;
> >> +}
> >> +
> >> +static int ft_trans_send_header(QEMUFileFtTrans *s,
> >> + enum QEMU_VM_TRANSACTION_STATE state,
> >> + uint32_t payload_len)
> >> +{
> >> + int ret;
> >> + FtTransHdr *hdr = &s->header;
> >> +
> >> + trace_ft_trans_send_header(state);
> >> +
> >> + hdr->cmd = s->state = state;
> >> + hdr->id = s->id;
> >> + hdr->seq = s->seq;
> >> + hdr->payload_len = payload_len;
> >> +
> >> + ret = ft_trans_put(s, hdr, sizeof(*hdr));
> >> + if (ret < 0) {
> >> + error_report("send header failed");
> >> + s->has_error = FT_TRANS_ERR_SEND_HDR;
> >> + }
> >> +
> >> + return ret;
> >> +}
> >> +
> >> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf,
> int64_t
> >> pos, int size)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> + ssize_t ret;
> >> +
> >> + trace_ft_trans_put_buffer(size, pos);
> >> +
> >> + if (s->has_error) {
> >> + error_report("put_buffer when error %d, bailing",
> s->has_error);
> >> + return -EINVAL;
> >> + }
> >> +
> >> + /* assuming qemu_file_put_notify() is calling */
> >> + if (pos == 0 && size == 0) {
> >> + trace_ft_trans_put_ready();
> >> + ft_trans_flush(s);
> >> +
> >> + if (!s->freeze_output) {
> >> + trace_ft_trans_cb(s->put_ready);
> >> + ret = s->put_ready();
> >> + }
> >> +
> >> + ret = 0;
> >> + goto out;
> >> + }
> >> +
> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size);
> >> + if (ret < 0) {
> >> + goto out;
> >> + }
> >> +
> >> + ret = ft_trans_put(s, (uint8_t *)buf, size);
> >> + if (ret < 0) {
> >> + error_report("send palyload failed");
> >> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD;
> >> + goto out;
> >> + }
> >> +
> >> + s->seq++;
> >> +
> >> +out:
> >> + return ret;
> >> +}
> >> +
> >> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> + size_t offset = 0;
> >> + ssize_t len;
> >> +
> >> + s->freeze_input = 0;
> >> +
> >> + while (offset < size) {
> >> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset,
> >> + 0, size - offset);
> >> + if (len == -EAGAIN) {
> >> + trace_ft_trans_freeze_input();
> >> + s->freeze_input = 1;
> >> + break;
> >> + }
> >> +
> >> + if (len <= 0) {
> >> + error_report("fill buffer failed, %s", strerror(errno));
> >> + s->has_error = 1;
> >> + return -EINVAL;
> >> + }
> >> +
> >> + offset += len;
> >> + }
> >> +
> >> + return offset;
> >> +}
> >> +
> >> +static int ft_trans_recv_header(QEMUFileFtTrans *s)
> >> +{
> >> + int ret;
> >> + char *buf = (char *)&s->header + s->header_offset;
> >> +
> >> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) -
> >> s->header_offset);
> >> + if (ret < 0) {
> >> + error_report("recv header failed");
> >> + s->has_error = FT_TRANS_ERR_RECV_HDR;
> >> + goto out;
> >> + }
> >> +
> >> + s->header_offset += ret;
> >> + if (s->header_offset == sizeof(FtTransHdr)) {
> >> + trace_ft_trans_recv_header(s->header.cmd);
> >> + s->state = s->header.cmd;
> >> + s->header_offset = 0;
> >> +
> >> + if (!s->is_sender) {
> >> + s->id = s->header.id;
> >> + s->seq = s->header.seq;
> >> + }
> >> + }
> >> +
> >> +out:
> >> + return ret;
> >> +}
> >> +
> >> +static int ft_trans_recv_payload(QEMUFileFtTrans *s)
> >> +{
> >> + QEMUFile *f = s->file;
> >> + int ret = -1;
> >> +
> >> + /* extend QEMUFile buf if there weren't enough space */
> >> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) {
> >> + s->buf_max_size += (s->header.payload_len -
> >> + (s->buf_max_size - s->get_offset));
> >> + s->buf = qemu_realloc_buffer(f, s->buf_max_size);
> >> + }
> >> +
> >> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset,
> >> + s->header.payload_len);
> >> + if (ret < 0) {
> >> + error_report("recv payload failed");
> >> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD;
> >> + goto out;
> >> + }
> >> +
> >> + trace_ft_trans_recv_payload(ret, s->header.payload_len,
> >> s->get_offset);
> >> +
> >> + s->header.payload_len -= ret;
> >> + s->get_offset += ret;
> >> + s->is_payload = !!s->header.payload_len;
> >> +
> >> +out:
> >> + return ret;
> >> +}
> >> +
> >> +static int ft_trans_recv(QEMUFileFtTrans *s)
> >> +{
> >> + int ret;
> >> +
> >> + /* get payload and return */
> >> + if (s->is_payload) {
> >> + ret = ft_trans_recv_payload(s);
> >> + goto out;
> >> + }
> >> +
> >> + ret = ft_trans_recv_header(s);
> >> + if (ret < 0 || s->freeze_input) {
> >> + goto out;
> >> + }
> >> +
> >> + switch (s->state) {
> >> + case QEMU_VM_TRANSACTION_BEGIN:
> >> + /* CONTINUE or COMMIT should come shortly */
> >> + s->is_payload = 0;
> >> + break;
> >> +
> >> + case QEMU_VM_TRANSACTION_CONTINUE:
> >> + /* get payload */
> >> + s->is_payload = 1;
> >> + break;
> >> +
> >> + case QEMU_VM_TRANSACTION_COMMIT:
> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> >> + if (ret < 0) {
> >> + goto out;
> >> + }
> >> +
> >> + trace_ft_trans_cb(s->get_ready);
> >> + ret = s->get_ready(s->opaque);
> >> + if (ret < 0) {
> >> + goto out;
> >> + }
> >> +
> >> + qemu_clear_buffer(s->file);
> >> + s->get_offset = 0;
> >> + s->is_payload = 0;
> >> +
> >> + break;
> >> +
> >> + case QEMU_VM_TRANSACTION_ATOMIC:
> >> + /* not implemented yet */
> >> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d",
> >> + ret);
> >> + break;
> >> +
> >> + case QEMU_VM_TRANSACTION_CANCEL:
> >> + /* return -EINVAL until migrate cancel on recevier side is
> >> supported */
> >> + ret = -EINVAL;
> >> + break;
> >> +
> >> + default:
> >> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret);
> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> >> + ret = -EINVAL;
> >> + }
> >> +
> >> +out:
> >> + return ret;
> >> +}
> >> +
> >> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf,
> >> + int64_t pos, int size)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> + int ret;
> >> +
> >> + if (s->has_error) {
> >> + error_report("get_buffer when error %d, bailing",
> s->has_error);
> >> + return -EINVAL;
> >> + }
> >> +
> >> + /* assuming qemu_file_get_notify() is calling */
> >> + if (pos == 0 && size == 0) {
> >> + trace_ft_trans_get_ready();
> >> + s->freeze_input = 0;
> >> +
> >> + /* sender should be waiting for ACK */
> >> + if (s->is_sender) {
> >> + ret = ft_trans_recv_header(s);
> >> + if (s->freeze_input) {
> >> + ret = 0;
> >> + goto out;
> >> + }
> >> + if (ret < 0) {
> >> + error_report("recv ack failed");
> >> + goto out;
> >> + }
> >> +
> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> >> + error_report("recv invalid state %d", s->state);
> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> >> + ret = -EINVAL;
> >> + goto out;
> >> + }
> >> +
> >> + trace_ft_trans_cb(s->get_ready);
> >> + ret = s->get_ready(s->opaque);
> >> + if (ret < 0) {
> >> + goto out;
> >> + }
> >> +
> >> + /* proceed trans id */
> >> + s->id++;
> >> +
> >> + return 0;
> >> + }
> >> +
> >> + /* set QEMUFile buf at beginning */
> >> + if (!s->buf) {
> >> + s->buf = buf;
> >> + }
> >> +
> >> + ret = ft_trans_recv(s);
> >> + goto out;
> >> + }
> >> +
> >> + ret = s->get_offset;
> >> +
> >> +out:
> >> + return ret;
> >> +}
> >> +
> >> +static int ft_trans_close(void *opaque)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> + int ret;
> >> +
> >> + trace_ft_trans_close();
> >> + ret = s->close(s->opaque);
> >> + if (s->is_sender) {
> >> + qemu_free(s->buf);
> >> + }
> >> + qemu_free(s);
> >> +
> >> + return ret;
> >> +}
> >> +
> >> +static int ft_trans_rate_limit(void *opaque)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> +
> >> + if (s->has_error) {
> >> + return 0;
> >> + }
> >> +
> >> + if (s->rate_limit && s->freeze_output) {
> >> + return 1;
> >> + }
> >> +
> >> + return 0;
> >> +}
> >> +
> >> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> +
> >> + if (s->has_error) {
> >> + goto out;
> >> + }
> >> +
> >> + s->rate_limit = !!new_rate;
> >> +
> >> +out:
> >> + return s->rate_limit;
> >> +}
> >> +
> >> +int ft_trans_begin(void *opaque)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> + int ret;
> >> + s->seq = 0;
> >> +
> >> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */
> >> + if (!s->is_sender) {
> >> + if (s->state != QEMU_VM_TRANSACTION_INIT) {
> >> + error_report("invalid state %d", s->state);
> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> >> + ret = -EINVAL;
> >> + }
> >> +
> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> >> + goto out;
> >> + }
> >> +
> >> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */
> >> + if (s->state == QEMU_VM_TRANSACTION_INIT) {
> >> +retry:
> >> + ret = ft_trans_recv_header(s);
> >> + if (s->freeze_input) {
> >> + goto retry;
> >> + }
> >> + if (ret < 0) {
> >> + error_report("recv ack failed");
> >> + goto out;
> >> + }
> >> +
> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> >> + error_report("recv invalid state %d", s->state);
> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> >> + ret = -EINVAL;
> >> + goto out;
> >> + }
> >> + }
> >> +
> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0);
> >> + if (ret < 0) {
> >> + goto out;
> >> + }
> >> +
> >> + s->state = QEMU_VM_TRANSACTION_CONTINUE;
> >> +
> >> +out:
> >> + return ret;
> >> +}
> >> +
> >> +int ft_trans_commit(void *opaque)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> + int ret;
> >> +
> >> + if (!s->is_sender) {
> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> >> + goto out;
> >> + }
> >> +
> >> + /* sender should flush buf before sending COMMIT */
> >> + qemu_fflush(s->file);
> >> +
> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0);
> >> + if (ret < 0) {
> >> + goto out;
> >> + }
> >> +
> >> + while (!s->has_error && s->put_offset) {
> >> + ft_trans_flush(s);
> >> + if (s->freeze_output) {
> >> + s->wait_for_unfreeze(s);
> >> + }
> >> + }
> >> +
> >> + if (s->has_error) {
> >> + ret = -EINVAL;
> >> + goto out;
> >> + }
> >> +
> >> + ret = ft_trans_recv_header(s);
> >> + if (s->freeze_input) {
> >> + ret = -EAGAIN;
> >> + goto out;
> >> + }
> >> + if (ret < 0) {
> >> + error_report("recv ack failed");
> >> + goto out;
> >> + }
> >> +
> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> >> + error_report("recv invalid state %d", s->state);
> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> >> + ret = -EINVAL;
> >> + goto out;
> >> + }
> >> +
> >> + s->id++;
> >> + ret = 0;
> >> +
> >> +out:
> >> + return ret;
> >> +}
> >> +
> >> +int ft_trans_cancel(void *opaque)
> >> +{
> >> + QEMUFileFtTrans *s = opaque;
> >> +
> >> + /* invalid until migrate cancel on recevier side is supported */
> >> + if (!s->is_sender) {
> >> + return -EINVAL;
> >> + }
> >> +
> >> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0);
> >> +}
> >> +
> >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
> >> + FtTransPutBufferFunc *put_buffer,
> >> + FtTransGetBufferFunc *get_buffer,
> >> + FtTransPutReadyFunc *put_ready,
> >> + FtTransGetReadyFunc *get_ready,
> >> + FtTransWaitForUnfreezeFunc
> >> *wait_for_unfreeze,
> >> + FtTransCloseFunc *close,
> >> + bool is_sender)
> >> +{
> >> + QEMUFileFtTrans *s;
> >> +
> >> + s = qemu_mallocz(sizeof(*s));
> >> +
> >> + s->opaque = opaque;
> >> + s->put_buffer = put_buffer;
> >> + s->get_buffer = get_buffer;
> >> + s->put_ready = put_ready;
> >> + s->get_ready = get_ready;
> >> + s->wait_for_unfreeze = wait_for_unfreeze;
> >> + s->close = close;
> >> + s->is_sender = is_sender;
> >> + s->id = 0;
> >> + s->seq = 0;
> >> + s->rate_limit = 1;
> >> +
> >> + if (!s->is_sender) {
> >> + s->buf_max_size = IO_BUF_SIZE;
> >> + }
> >> +
> >> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer,
> ft_trans_get_buffer,
> >> + ft_trans_close, ft_trans_rate_limit,
> >> + ft_trans_set_rate_limit, NULL);
> >> +
> >> + return s->file;
> >> +}
> >> diff --git a/ft_trans_file.h b/ft_trans_file.h
> >> new file mode 100644
> >> index 0000000..5ca6b53
> >> --- /dev/null
> >> +++ b/ft_trans_file.h
> >> @@ -0,0 +1,72 @@
> >> +/*
> >> + * Fault tolerant VM transaction QEMUFile
> >> + *
> >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
> >> + *
> >> + * This work is licensed under the terms of the GNU GPL, version 2.
> See
> >> + * the COPYING file in the top-level directory.
> >> + *
> >> + * This source code is based on buffered_file.h.
> >> + * Copyright IBM, Corp. 2008
> >> + * Authors:
> >> + * Anthony Liguori <aliguori@us.ibm.com>
> >> + */
> >> +
> >> +#ifndef QEMU_FT_TRANSACTION_FILE_H
> >> +#define QEMU_FT_TRANSACTION_FILE_H
> >> +
> >> +#include "hw/hw.h"
> >> +
> >> +enum QEMU_VM_TRANSACTION_STATE {
> >> + QEMU_VM_TRANSACTION_NACK = -1,
> >> + QEMU_VM_TRANSACTION_INIT,
> >> + QEMU_VM_TRANSACTION_BEGIN,
> >> + QEMU_VM_TRANSACTION_CONTINUE,
> >> + QEMU_VM_TRANSACTION_COMMIT,
> >> + QEMU_VM_TRANSACTION_CANCEL,
> >> + QEMU_VM_TRANSACTION_ATOMIC,
> >> + QEMU_VM_TRANSACTION_ACK,
> >> +};
> >> +
> >> +enum FT_MODE {
> >> + FT_ERROR = -1,
> >> + FT_OFF,
> >> + FT_INIT,
> >> + FT_TRANSACTION_BEGIN,
> >> + FT_TRANSACTION_ITER,
> >> + FT_TRANSACTION_COMMIT,
> >> + FT_TRANSACTION_ATOMIC,
> >> + FT_TRANSACTION_RECV,
> >> +};
> >> +extern enum FT_MODE ft_mode;
> >> +
> >> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */
> >> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */
> >> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */
> >> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */
> >> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */
> >> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed
> */
> >> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */
> >> +
> >> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data,
> >> size_t size);
> >> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t
> >> pos, size_t size);
> >> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec
> >> *iov, int iovcnt);
> >> +typedef int (FtTransPutReadyFunc)(void);
> >> +typedef int (FtTransGetReadyFunc)(void *opaque);
> >> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque);
> >> +typedef int (FtTransCloseFunc)(void *opaque);
> >> +
> >> +int ft_trans_begin(void *opaque);
> >> +int ft_trans_commit(void *opaque);
> >> +int ft_trans_cancel(void *opaque);
> >> +
> >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
> >> + FtTransPutBufferFunc *put_buffer,
> >> + FtTransGetBufferFunc *get_buffer,
> >> + FtTransPutReadyFunc *put_ready,
> >> + FtTransGetReadyFunc *get_ready,
> >> + FtTransWaitForUnfreezeFunc
> >> *wait_for_unfreeze,
> >> + FtTransCloseFunc *close,
> >> + bool is_sender);
> >> +
> >> +#endif
> >> diff --git a/migration.c b/migration.c
> >> index dd3bf94..c5e0146 100644
> >> --- a/migration.c
> >> +++ b/migration.c
> >> @@ -15,6 +15,7 @@
> >> #include "migration.h"
> >> #include "monitor.h"
> >> #include "buffered_file.h"
> >> +#include "ft_trans_file.h"
> >> #include "sysemu.h"
> >> #include "block.h"
> >> #include "qemu_socket.h"
> >> @@ -31,6 +32,8 @@
> >> do { } while (0)
> >> #endif
> >>
> >> +enum FT_MODE ft_mode = FT_OFF;
> >> +
> >> /* Migration speed throttling */
> >> static int64_t max_throttle = (32 << 20);
> >>
> >> diff --git a/trace-events b/trace-events
> >> index e6138ea..50ac840 100644
> >> --- a/trace-events
> >> +++ b/trace-events
> >> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len)
> "spice
> >> wrottn %lu of requested %zd
> >> disable spice_vmc_read(int bytes, int len) "spice read %lu of requested
> >> %zd"
> >> disable spice_vmc_register_interface(void *scd) "spice vmc registered
> >> interface %p"
> >> disable spice_vmc_unregister_interface(void *scd) "spice vmc
> unregistered
> >> interface %p"
> >> +
> >> +# ft_trans_file.c
> >> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing
> >> buffer from %zu by %zu"
> >> +disable ft_trans_append(size_t size) "buffering %zu bytes"
> >> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu
> >> bytes"
> >> +disable ft_trans_send_header(uint16_t cmd) "send header %d"
> >> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d"
> >> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes
> >> at %"PRId64""
> >> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total)
> >> "recv %d of %d total %d"
> >> +disable ft_trans_close(void) "closing"
> >> +disable ft_trans_freeze_output(void) "backend not ready, freezing
> output"
> >> +disable ft_trans_freeze_input(void) "backend not ready, freezing input"
> >> +disable ft_trans_put_ready(void) "file is ready to put"
> >> +disable ft_trans_get_ready(void) "file is ready to get"
> >> +disable ft_trans_cb(void *cb) "callback %p"
> >> --
> >> 1.7.1.2
> >>
> >> --
> >> To unsubscribe from this list: send the line "unsubscribe kvm" in
> >> the body of a message to majordomo@vger.kernel.org
> >> More majordomo info at http://vger.kernel.org/majordomo-info.html
> >
> >
>
[-- Attachment #2: Type: text/html, Size: 36215 bytes --]
^ permalink raw reply [flat|nested] 35+ messages in thread
* Re: [Qemu-devel] Re: [PATCH 07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode.
2011-02-23 2:28 ` ya su
@ 2011-02-23 5:05 ` Yoshiaki Tamura
0 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-23 5:05 UTC (permalink / raw)
To: ya su
Cc: kwolf, aliguori, dlaor, ananth, kvm, mst, mtosatti, qemu-devel,
vatsa, blauwirbel, ohmura.kei, avi, pbonzini, psuriset, stefanha
2011/2/23 ya su <suya94335@gmail.com>:
> Yoshi:
>
> thanks for your explaining.
> if you introduce a new stage as 3, I think stage 1 also need to change as
> it will mark all pages dirty.
> looking forward to your new patch update.
Unless there are strong comments from others, I won't put it in
this series, though, because I want to avoid touching other components
as much as possible this time.
Yoshi
>
> Green.
>
>
> 2011/2/21 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
>>
>> Hi Green,
>>
>> 2011/2/21 ya su <suya94335@gmail.com>:
>> > Yoshiaki:
>> >
>> > I have one question about ram_save_live: during migration stage 3
>> > (the completion stage), it will call
>> > cpu_physical_memory_set_dirty_tracking(0) to stop recording dirty RAM
>> > pages.
>> > At the end of the migrate_ft_trans_connect function, it will invoke
>> > vm_start().
>> > At this time, cpu_physical_memory_set_dirty_tracking(1) has not been
>> > called yet,
>> > so some RAM pages may not be recorded when qemu_savevm_trans_begin
>> > is called. I think you need to call
>> > cpu_physical_memory_set_dirty_tracking(1) in the migrate_ft_trans_connect
>> > function. Am I right?
>>
>> Thank you for taking a look.
>> When qemu_savevm_trans_begin is called for the first time, it
>> calls ram_save_live with stage 1, that sends all pages and sets
>> dirty tracking, so there won't be missing pages. Note that
>> event-tap is turned on by then, meaning no outputs are sent before
>> finishing the first transaction. I understand that this
>> implementation is inefficient, and planning to introduce a new
>> stage that is almost same as stage 3 but keeps dirty tracking in
>> the future.
>>
>> Thanks,
>>
>> Yoshi
>>
>> >
>> > BR
>> >
>> > Green.
>> >
>> >
>> > 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
>> >>
>> >> This code implements VM transaction protocol. Like buffered_file, it
>> >> sits between savevm and migration layer. With this architecture, VM
>> >> transaction protocol is implemented mostly independent from other
>> >> existing code.
>> >>
>> >> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
>> >> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
>> >> ---
>> >> Makefile.objs | 1 +
>> >> ft_trans_file.c | 624
>> >> +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>> >> ft_trans_file.h | 72 +++++++
>> >> migration.c | 3 +
>> >> trace-events | 15 ++
>> >> 5 files changed, 715 insertions(+), 0 deletions(-)
>> >> create mode 100644 ft_trans_file.c
>> >> create mode 100644 ft_trans_file.h
>> >>
>> >> diff --git a/Makefile.objs b/Makefile.objs
>> >> index 353b1a8..04148b5 100644
>> >> --- a/Makefile.objs
>> >> +++ b/Makefile.objs
>> >> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o
>> >> common-obj-y += qdev.o qdev-properties.o
>> >> common-obj-y += block-migration.o
>> >> common-obj-y += pflib.o
>> >> +common-obj-y += ft_trans_file.o
>> >>
>> >> common-obj-$(CONFIG_BRLAPI) += baum.o
>> >> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o
>> >> migration-fd.o
>> >> diff --git a/ft_trans_file.c b/ft_trans_file.c
>> >> new file mode 100644
>> >> index 0000000..2b42b95
>> >> --- /dev/null
>> >> +++ b/ft_trans_file.c
>> >> @@ -0,0 +1,624 @@
>> >> +/*
>> >> + * Fault tolerant VM transaction QEMUFile
>> >> + *
>> >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
>> >> + *
>> >> + * This work is licensed under the terms of the GNU GPL, version 2.
>> >> See
>> >> + * the COPYING file in the top-level directory.
>> >> + *
>> >> + * This source code is based on buffered_file.c.
>> >> + * Copyright IBM, Corp. 2008
>> >> + * Authors:
>> >> + * Anthony Liguori <aliguori@us.ibm.com>
>> >> + */
>> >> +
>> >> +#include "qemu-common.h"
>> >> +#include "qemu-error.h"
>> >> +#include "hw/hw.h"
>> >> +#include "qemu-timer.h"
>> >> +#include "sysemu.h"
>> >> +#include "qemu-char.h"
>> >> +#include "trace.h"
>> >> +#include "ft_trans_file.h"
>> >> +
>> >> +typedef struct FtTransHdr
>> >> +{
>> >> + uint16_t cmd;
>> >> + uint16_t id;
>> >> + uint32_t seq;
>> >> + uint32_t payload_len;
>> >> +} FtTransHdr;
>> >> +
>> >> +typedef struct QEMUFileFtTrans
>> >> +{
>> >> + FtTransPutBufferFunc *put_buffer;
>> >> + FtTransGetBufferFunc *get_buffer;
>> >> + FtTransPutReadyFunc *put_ready;
>> >> + FtTransGetReadyFunc *get_ready;
>> >> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze;
>> >> + FtTransCloseFunc *close;
>> >> + void *opaque;
>> >> + QEMUFile *file;
>> >> +
>> >> + enum QEMU_VM_TRANSACTION_STATE state;
>> >> + uint32_t seq;
>> >> + uint16_t id;
>> >> +
>> >> + int has_error;
>> >> +
>> >> + bool freeze_output;
>> >> + bool freeze_input;
>> >> + bool rate_limit;
>> >> + bool is_sender;
>> >> + bool is_payload;
>> >> +
>> >> + uint8_t *buf;
>> >> + size_t buf_max_size;
>> >> + size_t put_offset;
>> >> + size_t get_offset;
>> >> +
>> >> + FtTransHdr header;
>> >> + size_t header_offset;
>> >> +} QEMUFileFtTrans;
>> >> +
>> >> +#define IO_BUF_SIZE 32768
>> >> +
>> >> +static void ft_trans_append(QEMUFileFtTrans *s,
>> >> + const uint8_t *buf, size_t size)
>> >> +{
>> >> + if (size > (s->buf_max_size - s->put_offset)) {
>> >> + trace_ft_trans_realloc(s->buf_max_size, size + 1024);
>> >> + s->buf_max_size += size + 1024;
>> >> + s->buf = qemu_realloc(s->buf, s->buf_max_size);
>> >> + }
>> >> +
>> >> + trace_ft_trans_append(size);
>> >> + memcpy(s->buf + s->put_offset, buf, size);
>> >> + s->put_offset += size;
>> >> +}
>> >> +
>> >> +static void ft_trans_flush(QEMUFileFtTrans *s)
>> >> +{
>> >> + size_t offset = 0;
>> >> +
>> >> + if (s->has_error) {
>> >> + error_report("flush when error %d, bailing", s->has_error);
>> >> + return;
>> >> + }
>> >> +
>> >> + while (offset < s->put_offset) {
>> >> + ssize_t ret;
>> >> +
>> >> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset
>> >> -
>> >> offset);
>> >> + if (ret == -EAGAIN) {
>> >> + break;
>> >> + }
>> >> +
>> >> + if (ret <= 0) {
>> >> + error_report("error flushing data, %s", strerror(errno));
>> >> + s->has_error = FT_TRANS_ERR_FLUSH;
>> >> + break;
>> >> + } else {
>> >> + offset += ret;
>> >> + }
>> >> + }
>> >> +
>> >> + trace_ft_trans_flush(offset, s->put_offset);
>> >> + memmove(s->buf, s->buf + offset, s->put_offset - offset);
>> >> + s->put_offset -= offset;
>> >> + s->freeze_output = !!s->put_offset;
>> >> +}
>> >> +
>> >> +static ssize_t ft_trans_put(void *opaque, void *buf, int size)
>> >> +{
>> >> + QEMUFileFtTrans *s = opaque;
>> >> + size_t offset = 0;
>> >> + ssize_t len;
>> >> +
>> >> + /* flush buffered data before putting next */
>> >> + if (s->put_offset) {
>> >> + ft_trans_flush(s);
>> >> + }
>> >> +
>> >> + while (!s->freeze_output && offset < size) {
>> >> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size -
>> >> offset);
>> >> +
>> >> + if (len == -EAGAIN) {
>> >> + trace_ft_trans_freeze_output();
>> >> + s->freeze_output = 1;
>> >> + break;
>> >> + }
>> >> +
>> >> + if (len <= 0) {
>> >> + error_report("putting data failed, %s", strerror(errno));
>> >> + s->has_error = 1;
>> >> + offset = -EINVAL;
>> >> + break;
>> >> + }
>> >> +
>> >> + offset += len;
>> >> + }
>> >> +
>> >> + if (s->freeze_output) {
>> >> + ft_trans_append(s, buf + offset, size - offset);
>> >> + offset = size;
>> >> + }
>> >> +
>> >> + return offset;
>> >> +}
>> >> +
>> >> +static int ft_trans_send_header(QEMUFileFtTrans *s,
>> >> + enum QEMU_VM_TRANSACTION_STATE state,
>> >> + uint32_t payload_len)
>> >> +{
>> >> + int ret;
>> >> + FtTransHdr *hdr = &s->header;
>> >> +
>> >> + trace_ft_trans_send_header(state);
>> >> +
>> >> + hdr->cmd = s->state = state;
>> >> + hdr->id = s->id;
>> >> + hdr->seq = s->seq;
>> >> + hdr->payload_len = payload_len;
>> >> +
>> >> + ret = ft_trans_put(s, hdr, sizeof(*hdr));
>> >> + if (ret < 0) {
>> >> + error_report("send header failed");
>> >> + s->has_error = FT_TRANS_ERR_SEND_HDR;
>> >> + }
>> >> +
>> >> + return ret;
>> >> +}
>> >> +
>> >> +/*
>> >> + * QEMUFile put_buffer callback.
>> >> + *
>> >> + * A call with pos == 0 and size == 0 is treated as a "ready to write"
>> >> + * notification: buffered data is flushed and, if output is not frozen
>> >> + * afterwards, the put_ready callback (when one was registered) is invoked.
>> >> + * Any other call sends a QEMU_VM_TRANSACTION_CONTINUE header announcing
>> >> + * size bytes, then the payload itself, and bumps the sequence number.
>> >> + *
>> >> + * Returns 0 for a notification, the result of ft_trans_put() for a payload,
>> >> + * or a negative value on error (-EINVAL when an error was already latched).
>> >> + */
>> >> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
>> >> +{
>> >> +    QEMUFileFtTrans *s = opaque;
>> >> +    ssize_t ret;
>> >> +
>> >> +    trace_ft_trans_put_buffer(size, pos);
>> >> +
>> >> +    if (s->has_error) {
>> >> +        error_report("put_buffer when error %d, bailing", s->has_error);
>> >> +        return -EINVAL;
>> >> +    }
>> >> +
>> >> +    /* assuming qemu_file_put_notify() is calling */
>> >> +    if (pos == 0 && size == 0) {
>> >> +        trace_ft_trans_put_ready();
>> >> +        ft_trans_flush(s);
>> >> +
>> >> +        /*
>> >> +         * put_ready is optional (the receiver side registers NULL), and
>> >> +         * its result never influenced the value returned here, so it is
>> >> +         * explicitly discarded instead of being stored and overwritten.
>> >> +         */
>> >> +        if (!s->freeze_output && s->put_ready) {
>> >> +            trace_ft_trans_cb(s->put_ready);
>> >> +            (void)s->put_ready();
>> >> +        }
>> >> +
>> >> +        ret = 0;
>> >> +        goto out;
>> >> +    }
>> >> +
>> >> +    ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size);
>> >> +    if (ret < 0) {
>> >> +        goto out;
>> >> +    }
>> >> +
>> >> +    ret = ft_trans_put(s, (uint8_t *)buf, size);
>> >> +    if (ret < 0) {
>> >> +        error_report("send payload failed");
>> >> +        s->has_error = FT_TRANS_ERR_SEND_PAYLOAD;
>> >> +        goto out;
>> >> +    }
>> >> +
>> >> +    /* one CONTINUE chunk went out: advance the sequence number */
>> >> +    s->seq++;
>> >> +
>> >> +out:
>> >> +    return ret;
>> >> +}
>> >> +
>> >> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size)
>> >> +{
>> >> + QEMUFileFtTrans *s = opaque;
>> >> + size_t offset = 0;
>> >> + ssize_t len;
>> >> +
>> >> + s->freeze_input = 0;
>> >> +
>> >> + while (offset < size) {
>> >> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset,
>> >> + 0, size - offset);
>> >> + if (len == -EAGAIN) {
>> >> + trace_ft_trans_freeze_input();
>> >> + s->freeze_input = 1;
>> >> + break;
>> >> + }
>> >> +
>> >> + if (len <= 0) {
>> >> + error_report("fill buffer failed, %s", strerror(errno));
>> >> + s->has_error = 1;
>> >> + return -EINVAL;
>> >> + }
>> >> +
>> >> + offset += len;
>> >> + }
>> >> +
>> >> + return offset;
>> >> +}
>> >> +
>> >> +static int ft_trans_recv_header(QEMUFileFtTrans *s)
>> >> +{
>> >> + int ret;
>> >> + char *buf = (char *)&s->header + s->header_offset;
>> >> +
>> >> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) -
>> >> s->header_offset);
>> >> + if (ret < 0) {
>> >> + error_report("recv header failed");
>> >> + s->has_error = FT_TRANS_ERR_RECV_HDR;
>> >> + goto out;
>> >> + }
>> >> +
>> >> + s->header_offset += ret;
>> >> + if (s->header_offset == sizeof(FtTransHdr)) {
>> >> + trace_ft_trans_recv_header(s->header.cmd);
>> >> + s->state = s->header.cmd;
>> >> + s->header_offset = 0;
>> >> +
>> >> + if (!s->is_sender) {
>> >> + s->id = s->header.id;
>> >> + s->seq = s->header.seq;
>> >> + }
>> >> + }
>> >> +
>> >> +out:
>> >> + return ret;
>> >> +}
>> >> +
>> >> +static int ft_trans_recv_payload(QEMUFileFtTrans *s)
>> >> +{
>> >> + QEMUFile *f = s->file;
>> >> + int ret = -1;
>> >> +
>> >> + /* extend QEMUFile buf if there weren't enough space */
>> >> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) {
>> >> + s->buf_max_size += (s->header.payload_len -
>> >> + (s->buf_max_size - s->get_offset));
>> >> + s->buf = qemu_realloc_buffer(f, s->buf_max_size);
>> >> + }
>> >> +
>> >> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset,
>> >> + s->header.payload_len);
>> >> + if (ret < 0) {
>> >> + error_report("recv payload failed");
>> >> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD;
>> >> + goto out;
>> >> + }
>> >> +
>> >> + trace_ft_trans_recv_payload(ret, s->header.payload_len,
>> >> s->get_offset);
>> >> +
>> >> + s->header.payload_len -= ret;
>> >> + s->get_offset += ret;
>> >> + s->is_payload = !!s->header.payload_len;
>> >> +
>> >> +out:
>> >> + return ret;
>> >> +}
>> >> +
>> >> +/*
>> >> + * Receive-side state machine, advanced one step per call.
>> >> + *
>> >> + * While a payload is outstanding (is_payload set) the call only pulls more
>> >> + * payload bytes.  Otherwise a header is read; if that neither failed nor
>> >> + * froze the input, the state carried by the header is dispatched.
>> >> + *
>> >> + * Returns a negative value on error, otherwise the result of the last
>> >> + * receive/send/callback step that was performed.
>> >> + */
>> >> +static int ft_trans_recv(QEMUFileFtTrans *s)
>> >> +{
>> >> +    int ret;
>> >> +
>> >> +    /* get payload and return */
>> >> +    if (s->is_payload) {
>> >> +        ret = ft_trans_recv_payload(s);
>> >> +        goto out;
>> >> +    }
>> >> +
>> >> +    ret = ft_trans_recv_header(s);
>> >> +    if (ret < 0 || s->freeze_input) {
>> >> +        goto out;
>> >> +    }
>> >> +
>> >> +    switch (s->state) {
>> >> +    case QEMU_VM_TRANSACTION_BEGIN:
>> >> +        /* CONTINUE or COMMIT should come shortly */
>> >> +        s->is_payload = 0;
>> >> +        break;
>> >> +
>> >> +    case QEMU_VM_TRANSACTION_CONTINUE:
>> >> +        /* get payload */
>> >> +        s->is_payload = 1;
>> >> +        break;
>> >> +
>> >> +    case QEMU_VM_TRANSACTION_COMMIT:
>> >> +        /* acknowledge first, then hand the received data to get_ready */
>> >> +        ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
>> >> +        if (ret < 0) {
>> >> +            goto out;
>> >> +        }
>> >> +
>> >> +        trace_ft_trans_cb(s->get_ready);
>> >> +        ret = s->get_ready(s->opaque);
>> >> +        if (ret < 0) {
>> >> +            goto out;
>> >> +        }
>> >> +
>> >> +        /* transaction consumed: reset the QEMUFile buffer for the next */
>> >> +        qemu_clear_buffer(s->file);
>> >> +        s->get_offset = 0;
>> >> +        s->is_payload = 0;
>> >> +
>> >> +        break;
>> >> +
>> >> +    case QEMU_VM_TRANSACTION_ATOMIC:
>> >> +        /* not implemented yet */
>> >> +        error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d",
>> >> +                     ret);
>> >> +        break;
>> >> +
>> >> +    case QEMU_VM_TRANSACTION_CANCEL:
>> >> +        /* return -EINVAL until migrate cancel on receiver side is supported */
>> >> +        ret = -EINVAL;
>> >> +        break;
>> >> +
>> >> +    default:
>> >> +        /* report the offending state, not the header-receive byte count */
>> >> +        error_report("unknown QEMU_VM_TRANSACTION_STATE %d", s->state);
>> >> +        s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> >> +        ret = -EINVAL;
>> >> +    }
>> >> +
>> >> +out:
>> >> +    return ret;
>> >> +}
>> >> +
>> >> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf,
>> >> + int64_t pos, int size)
>> >> +{
>> >> + QEMUFileFtTrans *s = opaque;
>> >> + int ret;
>> >> +
>> >> + if (s->has_error) {
>> >> + error_report("get_buffer when error %d, bailing",
>> >> s->has_error);
>> >> + return -EINVAL;
>> >> + }
>> >> +
>> >> + /* assuming qemu_file_get_notify() is calling */
>> >> + if (pos == 0 && size == 0) {
>> >> + trace_ft_trans_get_ready();
>> >> + s->freeze_input = 0;
>> >> +
>> >> + /* sender should be waiting for ACK */
>> >> + if (s->is_sender) {
>> >> + ret = ft_trans_recv_header(s);
>> >> + if (s->freeze_input) {
>> >> + ret = 0;
>> >> + goto out;
>> >> + }
>> >> + if (ret < 0) {
>> >> + error_report("recv ack failed");
>> >> + goto out;
>> >> + }
>> >> +
>> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
>> >> + error_report("recv invalid state %d", s->state);
>> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> >> + ret = -EINVAL;
>> >> + goto out;
>> >> + }
>> >> +
>> >> + trace_ft_trans_cb(s->get_ready);
>> >> + ret = s->get_ready(s->opaque);
>> >> + if (ret < 0) {
>> >> + goto out;
>> >> + }
>> >> +
>> >> + /* proceed trans id */
>> >> + s->id++;
>> >> +
>> >> + return 0;
>> >> + }
>> >> +
>> >> + /* set QEMUFile buf at beginning */
>> >> + if (!s->buf) {
>> >> + s->buf = buf;
>> >> + }
>> >> +
>> >> + ret = ft_trans_recv(s);
>> >> + goto out;
>> >> + }
>> >> +
>> >> + ret = s->get_offset;
>> >> +
>> >> +out:
>> >> + return ret;
>> >> +}
>> >> +
>> >> +static int ft_trans_close(void *opaque)
>> >> +{
>> >> + QEMUFileFtTrans *s = opaque;
>> >> + int ret;
>> >> +
>> >> + trace_ft_trans_close();
>> >> + ret = s->close(s->opaque);
>> >> + if (s->is_sender) {
>> >> + qemu_free(s->buf);
>> >> + }
>> >> + qemu_free(s);
>> >> +
>> >> + return ret;
>> >> +}
>> >> +
>> >> +static int ft_trans_rate_limit(void *opaque)
>> >> +{
>> >> + QEMUFileFtTrans *s = opaque;
>> >> +
>> >> + if (s->has_error) {
>> >> + return 0;
>> >> + }
>> >> +
>> >> + if (s->rate_limit && s->freeze_output) {
>> >> + return 1;
>> >> + }
>> >> +
>> >> + return 0;
>> >> +}
>> >> +
>> >> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate)
>> >> +{
>> >> + QEMUFileFtTrans *s = opaque;
>> >> +
>> >> + if (s->has_error) {
>> >> + goto out;
>> >> + }
>> >> +
>> >> + s->rate_limit = !!new_rate;
>> >> +
>> >> +out:
>> >> + return s->rate_limit;
>> >> +}
>> >> +
>> >> +/*
>> >> + * Start a transaction; the sequence number is reset to 0 first.
>> >> + *
>> >> + * Receiver: must be in QEMU_VM_TRANSACTION_INIT and answers with an ACK
>> >> + * header.  Sender: when still in INIT it first waits for that ACK, then
>> >> + * sends a BEGIN header and moves to QEMU_VM_TRANSACTION_CONTINUE.
>> >> + *
>> >> + * Returns a negative value on failure, otherwise the result of
>> >> + * ft_trans_send_header().
>> >> + */
>> >> +int ft_trans_begin(void *opaque)
>> >> +{
>> >> +    QEMUFileFtTrans *s = opaque;
>> >> +    int ret;
>> >> +    s->seq = 0;
>> >> +
>> >> +    /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */
>> >> +    if (!s->is_sender) {
>> >> +        if (s->state != QEMU_VM_TRANSACTION_INIT) {
>> >> +            error_report("invalid state %d", s->state);
>> >> +            s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> >> +            ret = -EINVAL;
>> >> +            /* bail out here: falling through would send the ACK anyway
>> >> +             * and overwrite the error code */
>> >> +            goto out;
>> >> +        }
>> >> +
>> >> +        ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
>> >> +        goto out;
>> >> +    }
>> >> +
>> >> +    /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */
>> >> +    if (s->state == QEMU_VM_TRANSACTION_INIT) {
>> >> +retry:
>> >> +        ret = ft_trans_recv_header(s);
>> >> +        /* NOTE(review): this spins for as long as the input is frozen */
>> >> +        if (s->freeze_input) {
>> >> +            goto retry;
>> >> +        }
>> >> +        if (ret < 0) {
>> >> +            error_report("recv ack failed");
>> >> +            goto out;
>> >> +        }
>> >> +
>> >> +        if (s->state != QEMU_VM_TRANSACTION_ACK) {
>> >> +            error_report("recv invalid state %d", s->state);
>> >> +            s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> >> +            ret = -EINVAL;
>> >> +            goto out;
>> >> +        }
>> >> +    }
>> >> +
>> >> +    ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0);
>> >> +    if (ret < 0) {
>> >> +        goto out;
>> >> +    }
>> >> +
>> >> +    s->state = QEMU_VM_TRANSACTION_CONTINUE;
>> >> +
>> >> +out:
>> >> +    return ret;
>> >> +}
>> >> +
>> >> +int ft_trans_commit(void *opaque)
>> >> +{
>> >> + QEMUFileFtTrans *s = opaque;
>> >> + int ret;
>> >> +
>> >> + if (!s->is_sender) {
>> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
>> >> + goto out;
>> >> + }
>> >> +
>> >> + /* sender should flush buf before sending COMMIT */
>> >> + qemu_fflush(s->file);
>> >> +
>> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0);
>> >> + if (ret < 0) {
>> >> + goto out;
>> >> + }
>> >> +
>> >> + while (!s->has_error && s->put_offset) {
>> >> + ft_trans_flush(s);
>> >> + if (s->freeze_output) {
>> >> + s->wait_for_unfreeze(s);
>> >> + }
>> >> + }
>> >> +
>> >> + if (s->has_error) {
>> >> + ret = -EINVAL;
>> >> + goto out;
>> >> + }
>> >> +
>> >> + ret = ft_trans_recv_header(s);
>> >> + if (s->freeze_input) {
>> >> + ret = -EAGAIN;
>> >> + goto out;
>> >> + }
>> >> + if (ret < 0) {
>> >> + error_report("recv ack failed");
>> >> + goto out;
>> >> + }
>> >> +
>> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
>> >> + error_report("recv invalid state %d", s->state);
>> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
>> >> + ret = -EINVAL;
>> >> + goto out;
>> >> + }
>> >> +
>> >> + s->id++;
>> >> + ret = 0;
>> >> +
>> >> +out:
>> >> + return ret;
>> >> +}
>> >> +
>> >> +int ft_trans_cancel(void *opaque)
>> >> +{
>> >> + QEMUFileFtTrans *s = opaque;
>> >> +
>> >> + /* invalid until migrate cancel on recevier side is supported */
>> >> + if (!s->is_sender) {
>> >> + return -EINVAL;
>> >> + }
>> >> +
>> >> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0);
>> >> +}
>> >> +
>> >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
>> >> + FtTransPutBufferFunc *put_buffer,
>> >> + FtTransGetBufferFunc *get_buffer,
>> >> + FtTransPutReadyFunc *put_ready,
>> >> + FtTransGetReadyFunc *get_ready,
>> >> + FtTransWaitForUnfreezeFunc
>> >> *wait_for_unfreeze,
>> >> + FtTransCloseFunc *close,
>> >> + bool is_sender)
>> >> +{
>> >> + QEMUFileFtTrans *s;
>> >> +
>> >> + s = qemu_mallocz(sizeof(*s));
>> >> +
>> >> + s->opaque = opaque;
>> >> + s->put_buffer = put_buffer;
>> >> + s->get_buffer = get_buffer;
>> >> + s->put_ready = put_ready;
>> >> + s->get_ready = get_ready;
>> >> + s->wait_for_unfreeze = wait_for_unfreeze;
>> >> + s->close = close;
>> >> + s->is_sender = is_sender;
>> >> + s->id = 0;
>> >> + s->seq = 0;
>> >> + s->rate_limit = 1;
>> >> +
>> >> + if (!s->is_sender) {
>> >> + s->buf_max_size = IO_BUF_SIZE;
>> >> + }
>> >> +
>> >> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer,
>> >> ft_trans_get_buffer,
>> >> + ft_trans_close, ft_trans_rate_limit,
>> >> + ft_trans_set_rate_limit, NULL);
>> >> +
>> >> + return s->file;
>> >> +}
>> >> diff --git a/ft_trans_file.h b/ft_trans_file.h
>> >> new file mode 100644
>> >> index 0000000..5ca6b53
>> >> --- /dev/null
>> >> +++ b/ft_trans_file.h
>> >> @@ -0,0 +1,72 @@
>> >> +/*
>> >> + * Fault tolerant VM transaction QEMUFile
>> >> + *
>> >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
>> >> + *
>> >> + * This work is licensed under the terms of the GNU GPL, version 2.
>> >> See
>> >> + * the COPYING file in the top-level directory.
>> >> + *
>> >> + * This source code is based on buffered_file.h.
>> >> + * Copyright IBM, Corp. 2008
>> >> + * Authors:
>> >> + * Anthony Liguori <aliguori@us.ibm.com>
>> >> + */
>> >> +
>> >> +#ifndef QEMU_FT_TRANSACTION_FILE_H
>> >> +#define QEMU_FT_TRANSACTION_FILE_H
>> >> +
>> >> +#include "hw/hw.h"
>> >> +
>> >> +enum QEMU_VM_TRANSACTION_STATE {
>> >> + QEMU_VM_TRANSACTION_NACK = -1,
>> >> + QEMU_VM_TRANSACTION_INIT,
>> >> + QEMU_VM_TRANSACTION_BEGIN,
>> >> + QEMU_VM_TRANSACTION_CONTINUE,
>> >> + QEMU_VM_TRANSACTION_COMMIT,
>> >> + QEMU_VM_TRANSACTION_CANCEL,
>> >> + QEMU_VM_TRANSACTION_ATOMIC,
>> >> + QEMU_VM_TRANSACTION_ACK,
>> >> +};
>> >> +
>> >> +enum FT_MODE {
>> >> + FT_ERROR = -1,
>> >> + FT_OFF,
>> >> + FT_INIT,
>> >> + FT_TRANSACTION_BEGIN,
>> >> + FT_TRANSACTION_ITER,
>> >> + FT_TRANSACTION_COMMIT,
>> >> + FT_TRANSACTION_ATOMIC,
>> >> + FT_TRANSACTION_RECV,
>> >> +};
>> >> +extern enum FT_MODE ft_mode;
>> >> +
>> >> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */
>> >> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */
>> >> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */
>> >> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */
>> >> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */
>> >> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed
>> >> */
>> >> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */
>> >> +
>> >> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data,
>> >> size_t size);
>> >> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t
>> >> pos, size_t size);
>> >> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct
>> >> iovec
>> >> *iov, int iovcnt);
>> >> +typedef int (FtTransPutReadyFunc)(void);
>> >> +typedef int (FtTransGetReadyFunc)(void *opaque);
>> >> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque);
>> >> +typedef int (FtTransCloseFunc)(void *opaque);
>> >> +
>> >> +int ft_trans_begin(void *opaque);
>> >> +int ft_trans_commit(void *opaque);
>> >> +int ft_trans_cancel(void *opaque);
>> >> +
>> >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
>> >> + FtTransPutBufferFunc *put_buffer,
>> >> + FtTransGetBufferFunc *get_buffer,
>> >> + FtTransPutReadyFunc *put_ready,
>> >> + FtTransGetReadyFunc *get_ready,
>> >> + FtTransWaitForUnfreezeFunc
>> >> *wait_for_unfreeze,
>> >> + FtTransCloseFunc *close,
>> >> + bool is_sender);
>> >> +
>> >> +#endif
>> >> diff --git a/migration.c b/migration.c
>> >> index dd3bf94..c5e0146 100644
>> >> --- a/migration.c
>> >> +++ b/migration.c
>> >> @@ -15,6 +15,7 @@
>> >> #include "migration.h"
>> >> #include "monitor.h"
>> >> #include "buffered_file.h"
>> >> +#include "ft_trans_file.h"
>> >> #include "sysemu.h"
>> >> #include "block.h"
>> >> #include "qemu_socket.h"
>> >> @@ -31,6 +32,8 @@
>> >> do { } while (0)
>> >> #endif
>> >>
>> >> +enum FT_MODE ft_mode = FT_OFF;
>> >> +
>> >> /* Migration speed throttling */
>> >> static int64_t max_throttle = (32 << 20);
>> >>
>> >> diff --git a/trace-events b/trace-events
>> >> index e6138ea..50ac840 100644
>> >> --- a/trace-events
>> >> +++ b/trace-events
>> >> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len)
>> >> "spice
>> >> wrottn %lu of requested %zd
>> >> disable spice_vmc_read(int bytes, int len) "spice read %lu of
>> >> requested
>> >> %zd"
>> >> disable spice_vmc_register_interface(void *scd) "spice vmc registered
>> >> interface %p"
>> >> disable spice_vmc_unregister_interface(void *scd) "spice vmc
>> >> unregistered
>> >> interface %p"
>> >> +
>> >> +# ft_trans_file.c
>> >> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing
>> >> buffer from %zu by %zu"
>> >> +disable ft_trans_append(size_t size) "buffering %zu bytes"
>> >> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu
>> >> bytes"
>> >> +disable ft_trans_send_header(uint16_t cmd) "send header %d"
>> >> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d"
>> >> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %zu bytes at %"PRId64""
>> >> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) "recv %zu of %"PRIu32" total %zu"
>> >> +disable ft_trans_close(void) "closing"
>> >> +disable ft_trans_freeze_output(void) "backend not ready, freezing
>> >> output"
>> >> +disable ft_trans_freeze_input(void) "backend not ready, freezing
>> >> input"
>> >> +disable ft_trans_put_ready(void) "file is ready to put"
>> >> +disable ft_trans_get_ready(void) "file is ready to get"
>> >> +disable ft_trans_cb(void *cb) "callback %p"
>> >> --
>> >> 1.7.1.2
>> >>
>> >> --
>> >> To unsubscribe from this list: send the line "unsubscribe kvm" in
>> >> the body of a message to majordomo@vger.kernel.org
>> >> More majordomo info at http://vger.kernel.org/majordomo-info.html
>> >
>> >
>
>
^ permalink raw reply [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 08/18] savevm: introduce util functions to control ft_trans_file from savevm layer.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (6 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 09/18] Introduce event-tap Yoshiaki Tamura
` (9 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
To make use of the ft_trans_file functions, the savevm layer needs to
export interfaces for them.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
hw/hw.h | 5 ++
savevm.c | 149 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 154 insertions(+), 0 deletions(-)
diff --git a/hw/hw.h b/hw/hw.h
index a168a37..a9eff5a 100644
--- a/hw/hw.h
+++ b/hw/hw.h
@@ -51,6 +51,7 @@ QEMUFile *qemu_fopen_ops(void *opaque, QEMUFilePutBufferFunc *put_buffer,
QEMUFile *qemu_fopen(const char *filename, const char *mode);
QEMUFile *qemu_fdopen(int fd, const char *mode);
QEMUFile *qemu_fopen_socket(int fd);
+QEMUFile *qemu_fopen_ft_trans(int s_fd, int c_fd);
QEMUFile *qemu_popen(FILE *popen_file, const char *mode);
QEMUFile *qemu_popen_cmd(const char *command, const char *mode);
int qemu_stdio_fd(QEMUFile *f);
@@ -60,6 +61,9 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
void qemu_put_byte(QEMUFile *f, int v);
void *qemu_realloc_buffer(QEMUFile *f, int size);
void qemu_clear_buffer(QEMUFile *f);
+int qemu_ft_trans_begin(QEMUFile *f);
+int qemu_ft_trans_commit(QEMUFile *f);
+int qemu_ft_trans_cancel(QEMUFile *f);
static inline void qemu_put_ubyte(QEMUFile *f, unsigned int v)
{
@@ -94,6 +98,7 @@ void qemu_file_set_error(QEMUFile *f);
* halted due to rate limiting or EAGAIN errors occur as it can be used to
* resume output. */
void qemu_file_put_notify(QEMUFile *f);
+void qemu_file_get_notify(void *opaque);
static inline void qemu_put_be64s(QEMUFile *f, const uint64_t *pv)
{
diff --git a/savevm.c b/savevm.c
index 58e48e3..e44eccd 100644
--- a/savevm.c
+++ b/savevm.c
@@ -82,6 +82,7 @@
#include "migration.h"
#include "qemu_socket.h"
#include "qemu-queue.h"
+#include "ft_trans_file.h"
#define SELF_ANNOUNCE_ROUNDS 5
@@ -189,6 +190,13 @@ typedef struct QEMUFileSocket
QEMUFile *file;
} QEMUFileSocket;
+typedef struct QEMUFileSocketTrans /* opaque state behind the QEMUFile built by qemu_fopen_ft_trans() */
+{
+ int fd; /* s_fd handed to qemu_fopen_ft_trans(); closed by socket_trans_close() */
+ QEMUFileSocket *s; /* socket that the put/get buffer callbacks forward to */
+ VMChangeStateEntry *e; /* registration of socket_trans_resume(), removed on close */
+} QEMUFileSocketTrans;
+
static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
{
QEMUFileSocket *s = opaque;
@@ -204,6 +212,22 @@ static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
return len;
}
+/*
+ * Send up to size bytes from buf on the socket wrapped by opaque.
+ * The send() is repeated for as long as it fails with EINTR.
+ * Returns the byte count reported by send(), or the negated socket
+ * error code when send() fails for any other reason.
+ */
+static ssize_t socket_put_buffer(void *opaque, const void *buf, size_t size)
+{
+    QEMUFileSocket *sock = opaque;
+
+    for (;;) {
+        ssize_t sent = send(sock->fd, (void *)buf, size, 0);
+
+        if (sent != -1) {
+            return sent;
+        }
+        if (socket_error() != EINTR) {
+            return -socket_error();
+        }
+    }
+}
+
static int socket_close(void *opaque)
{
QEMUFileSocket *s = opaque;
@@ -211,6 +235,70 @@ static int socket_close(void *opaque)
return 0;
}
+/* get_buffer callback: forwards the read to the wrapped QEMUFileSocket. */
+static int socket_trans_get_buffer(void *opaque, uint8_t *buf, int64_t pos, size_t size)
+{
+    QEMUFileSocketTrans *trans = opaque;
+
+    return socket_get_buffer(trans->s, buf, pos, size);
+}
+
+/* put_buffer callback: forwards the write to the wrapped QEMUFileSocket. */
+static ssize_t socket_trans_put_buffer(void *opaque, const void *buf, size_t size)
+{
+    QEMUFileSocketTrans *trans = opaque;
+    QEMUFileSocket *sock = trans->s;
+
+    return socket_put_buffer(sock, buf, size);
+}
+
+
+/*
+ * get_ready callback: loads the VM state from the transaction's QEMUFile.
+ * Returns the result of qemu_loadvm_state(); a negative result is also
+ * reported on stderr.
+ */
+static int socket_trans_get_ready(void *opaque)
+{
+    QEMUFileSocketTrans *trans = opaque;
+    int rc = qemu_loadvm_state(trans->s->file, 1);
+
+    if (rc < 0) {
+        fprintf(stderr,
+                "socket_trans_get_ready: error while loading vmstate\n");
+    }
+
+    return rc;
+}
+
+/*
+ * close callback: tears down everything qemu_fopen_ft_trans() set up.
+ * Always returns 0.
+ */
+static int socket_trans_close(void *opaque)
+{
+    QEMUFileSocketTrans *trans = opaque;
+    QEMUFileSocket *sock = trans->s;
+
+    /* clear the fd handlers of both descriptors and drop the VM state hook */
+    qemu_set_fd_handler2(sock->fd, NULL, NULL, NULL, NULL);
+    qemu_set_fd_handler2(trans->fd, NULL, NULL, NULL, NULL);
+    qemu_del_vm_change_state_handler(trans->e);
+
+    close(sock->fd);
+    close(trans->fd);
+
+    qemu_free(sock);
+    qemu_free(trans);
+
+    return 0;
+}
+
+/*
+ * VM change state handler.  Does nothing unless running is non-zero;
+ * in that case it calls qemu_announce_self() and closes the transaction
+ * file.
+ */
+static void socket_trans_resume(void *opaque, int running, int reason)
+{
+    QEMUFileSocketTrans *trans = opaque;
+
+    if (running) {
+        qemu_announce_self();
+        qemu_fclose(trans->s->file);
+    }
+}
+
static int stdio_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
{
QEMUFileStdio *s = opaque;
@@ -333,6 +421,26 @@ QEMUFile *qemu_fopen_socket(int fd)
return s->file;
}
+/*
+ * Build an ft_trans QEMUFile on top of socket c_fd (is_sender is passed
+ * as 0).  s_fd is only remembered so that it can be closed together with
+ * the file.  c_fd is switched to non-blocking mode.
+ */
+QEMUFile *qemu_fopen_ft_trans(int s_fd, int c_fd)
+{
+    QEMUFileSocketTrans *trans = qemu_mallocz(sizeof(*trans));
+    QEMUFileSocket *sock = qemu_mallocz(sizeof(*sock));
+
+    sock->fd = c_fd;
+
+    trans->fd = s_fd;
+    trans->s = sock;
+    trans->e = qemu_add_vm_change_state_handler(socket_trans_resume, trans);
+
+    sock->file = qemu_fopen_ops_ft_trans(trans,
+                                         socket_trans_put_buffer,
+                                         socket_trans_get_buffer,
+                                         NULL,
+                                         socket_trans_get_ready,
+                                         migrate_fd_wait_for_unfreeze,
+                                         socket_trans_close,
+                                         0);
+    socket_set_nonblock(sock->fd);
+
+    return sock->file;
+}
+
static int file_put_buffer(void *opaque, const uint8_t *buf,
int64_t pos, int size)
{
@@ -469,6 +577,39 @@ void qemu_clear_buffer(QEMUFile *f)
f->buf_size = f->buf_index = f->buf_offset = 0;
}
+/*
+ * Run ft_trans_begin() on the file's opaque state.  A negative result
+ * also marks the QEMUFile as failed.  Returns that result.
+ */
+int qemu_ft_trans_begin(QEMUFile *f)
+{
+    int rc = ft_trans_begin(f->opaque);
+
+    if (rc < 0) {
+        f->has_error = 1;
+    }
+
+    return rc;
+}
+
+/*
+ * Run ft_trans_commit() on the file's opaque state.
+ * Returns 1 when the commit reported -EAGAIN, otherwise the commit's own
+ * result; any other negative result also marks the QEMUFile as failed.
+ */
+int qemu_ft_trans_commit(QEMUFile *f)
+{
+    int rc = ft_trans_commit(f->opaque);
+
+    if (rc >= 0) {
+        return rc;
+    }
+    if (rc == -EAGAIN) {
+        return 1;
+    }
+
+    f->has_error = 1;
+    return rc;
+}
+
+/*
+ * Run ft_trans_cancel() on the file's opaque state.  A negative result
+ * also marks the QEMUFile as failed.  Returns that result.
+ */
+int qemu_ft_trans_cancel(QEMUFile *f)
+{
+    int rc = ft_trans_cancel(f->opaque);
+
+    if (rc < 0) {
+        f->has_error = 1;
+    }
+
+    return rc;
+}
+
static void qemu_fill_buffer(QEMUFile *f)
{
int len;
@@ -504,6 +645,14 @@ void qemu_file_put_notify(QEMUFile *f)
f->put_buffer(f->opaque, NULL, 0, 0);
}
+/*
+ * Invoke the file's get_buffer callback with pos 0 and size 0, passing
+ * the file's own buffer; a negative result marks the file as failed.
+ * opaque is the QEMUFile itself.
+ */
+void qemu_file_get_notify(void *opaque)
+{
+    QEMUFile *file = opaque;
+    int rc = file->get_buffer(file->opaque, file->buf, 0, 0);
+
+    if (rc < 0) {
+        file->has_error = 1;
+    }
+}
+
void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
{
int l;
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 09/18] Introduce event-tap.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (7 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 08/18] savevm: introduce util functions to control ft_trans_file from savevm layer Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 10/18] Call init handler of event-tap at main() in vl.c Yoshiaki Tamura
` (8 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
event-tap controls when to start an FT transaction, and provides proxy
functions to be called from net/block devices. During an FT transaction, it
queues up net/block requests, and flushes them when the transaction is
completed.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
---
Makefile.target | 1 +
event-tap.c | 939 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
event-tap.h | 44 +++
qemu-tool.c | 28 ++
trace-events | 10 +
5 files changed, 1022 insertions(+), 0 deletions(-)
create mode 100644 event-tap.c
create mode 100644 event-tap.h
diff --git a/Makefile.target b/Makefile.target
index b0ba95f..edbdbee 100644
--- a/Makefile.target
+++ b/Makefile.target
@@ -199,6 +199,7 @@ obj-y += rwhandler.o
obj-$(CONFIG_KVM) += kvm.o kvm-all.o
obj-$(CONFIG_NO_KVM) += kvm-stub.o
LIBS+=-lz
+obj-y += event-tap.o
QEMU_CFLAGS += $(VNC_TLS_CFLAGS)
QEMU_CFLAGS += $(VNC_SASL_CFLAGS)
diff --git a/event-tap.c b/event-tap.c
new file mode 100644
index 0000000..f44d835
--- /dev/null
+++ b/event-tap.c
@@ -0,0 +1,939 @@
+/*
+ * Event Tap functions for QEMU
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-common.h"
+#include "qemu-error.h"
+#include "block.h"
+#include "block_int.h"
+#include "ioport.h"
+#include "osdep.h"
+#include "sysemu.h"
+#include "hw/hw.h"
+#include "net.h"
+#include "event-tap.h"
+#include "trace.h"
+
+enum EVENT_TAP_STATE {
+ EVENT_TAP_OFF,
+ EVENT_TAP_ON,
+ EVENT_TAP_SUSPEND,
+ EVENT_TAP_FLUSH,
+ EVENT_TAP_LOAD,
+ EVENT_TAP_REPLAY,
+};
+
+static enum EVENT_TAP_STATE event_tap_state = EVENT_TAP_OFF;
+
+typedef struct EventTapIOport {
+ uint32_t address;
+ uint32_t data;
+ int index;
+} EventTapIOport;
+
+#define MMIO_BUF_SIZE 8
+
+typedef struct EventTapMMIO {
+ uint64_t address;
+ uint8_t buf[MMIO_BUF_SIZE];
+ int len;
+} EventTapMMIO;
+
+typedef struct EventTapNetReq {
+ char *device_name;
+ int iovcnt;
+ int vlan_id;
+ bool vlan_needed;
+ bool async;
+ struct iovec *iov;
+ NetPacketSent *sent_cb;
+} EventTapNetReq;
+
+#define MAX_BLOCK_REQUEST 32
+
+typedef struct EventTapAIOCB EventTapAIOCB;
+
+typedef struct EventTapBlkReq {
+ char *device_name;
+ int num_reqs;
+ int num_cbs;
+ bool is_flush;
+ BlockRequest reqs[MAX_BLOCK_REQUEST];
+ EventTapAIOCB *acb[MAX_BLOCK_REQUEST];
+} EventTapBlkReq;
+
+#define EVENT_TAP_IOPORT (1 << 0) /* flag bit 0 */
+#define EVENT_TAP_MMIO (1 << 1) /* flag bit 1 */
+#define EVENT_TAP_NET (1 << 2) /* set in EventTapLog.mode when net_req is in use */
+#define EVENT_TAP_BLK (1 << 3) /* compared against EventTapLog.mode to pick blk_req */
+
+#define EVENT_TAP_TYPE_MASK (EVENT_TAP_NET - 1) /* 0x3: the IOPORT and MMIO bits; ~mask leaves NET/BLK */
+
+typedef struct EventTapLog {
+ int mode;
+ union {
+ EventTapIOport ioport;
+ EventTapMMIO mmio;
+ };
+ union {
+ EventTapNetReq net_req;
+ EventTapBlkReq blk_req;
+ };
+ QTAILQ_ENTRY(EventTapLog) node;
+} EventTapLog;
+
+struct EventTapAIOCB {
+ BlockDriverAIOCB common;
+ BlockDriverAIOCB *acb;
+ bool is_canceled;
+};
+
+static EventTapLog *last_event_tap;
+
+static QTAILQ_HEAD(, EventTapLog) event_list;
+static QTAILQ_HEAD(, EventTapLog) event_pool;
+
+static int (*event_tap_cb)(void);
+static QEMUBH *event_tap_bh;
+static VMChangeStateEntry *vmstate;
+
+/*
+ * Bottom-half handler: runs the registered event_tap_cb (if any), then
+ * deletes the bottom half and clears event_tap_bh so that
+ * event_tap_schedule_bh() creates a new one next time.
+ */
+static void event_tap_bh_cb(void *p)
+{
+    if (event_tap_cb != NULL) {
+        event_tap_cb();
+    }
+
+    qemu_bh_delete(event_tap_bh);
+    event_tap_bh = NULL;
+}
+
+/*
+ * Create and schedule the event-tap bottom half, unless one already
+ * exists, in which case the request is ignored.
+ */
+static void event_tap_schedule_bh(void)
+{
+    trace_event_tap_ignore_bh(!!event_tap_bh);
+
+    if (!event_tap_bh) {
+        event_tap_bh = qemu_bh_new(event_tap_bh_cb, NULL);
+        qemu_bh_schedule(event_tap_bh);
+    }
+}
+
+/*
+ * Hand out an EventTapLog: the first entry of event_pool when the pool
+ * has one, otherwise a freshly zero-allocated entry.
+ */
+static void *event_tap_alloc_log(void)
+{
+    EventTapLog *entry;
+
+    if (!QTAILQ_EMPTY(&event_pool)) {
+        entry = QTAILQ_FIRST(&event_pool);
+        QTAILQ_REMOVE(&event_pool, entry, node);
+        return entry;
+    }
+
+    entry = qemu_mallocz(sizeof(EventTapLog));
+    return entry;
+}
+
+static void event_tap_free_net_req(EventTapNetReq *net_req);
+static void event_tap_free_blk_req(EventTapBlkReq *blk_req);
+
+/*
+ * Release the request payload held by log (net or block, chosen from
+ * log->mode with the EVENT_TAP_TYPE_MASK bits cleared), reset the mode
+ * and put the entry at the head of event_pool for reuse.
+ */
+static void event_tap_free_log(EventTapLog *log)
+{
+    switch (log->mode & ~EVENT_TAP_TYPE_MASK) {
+    case EVENT_TAP_NET:
+        event_tap_free_net_req(&log->net_req);
+        break;
+    case EVENT_TAP_BLK:
+        event_tap_free_blk_req(&log->blk_req);
+        break;
+    default:
+        break;
+    }
+
+    log->mode = 0;
+
+    /* return the log to event_pool */
+    QTAILQ_INSERT_HEAD(&event_pool, log, node);
+}
+
+/* Unlink and free every EventTapLog currently parked in event_pool. */
+static void event_tap_free_pool(void)
+{
+    while (!QTAILQ_EMPTY(&event_pool)) {
+        EventTapLog *entry = QTAILQ_FIRST(&event_pool);
+
+        QTAILQ_REMOVE(&event_pool, entry, node);
+        qemu_free(entry);
+    }
+}
+
+/*
+ * Free what a net request owns.
+ *
+ * Non-async requests hold copies of the packet data, so every iov_base
+ * and the iov array are freed.  Async requests only free the iov array,
+ * and only when event_tap_state is EVENT_TAP_LOAD or later.  The device
+ * name is always freed.
+ */
+static void event_tap_free_net_req(EventTapNetReq *net_req)
+{
+    bool free_iov_array;
+    int idx;
+
+    if (net_req->async) {
+        free_iov_array = event_tap_state >= EVENT_TAP_LOAD;
+    } else {
+        for (idx = 0; idx < net_req->iovcnt; idx++) {
+            qemu_free(net_req->iov[idx].iov_base);
+        }
+        free_iov_array = true;
+    }
+
+    if (free_iov_array) {
+        qemu_free(net_req->iov);
+    }
+
+    qemu_free(net_req->device_name);
+}
+
+/*
+ * Fill net_req from a packet submitted by client vc.
+ *
+ * The client name is duplicated.  For async requests the caller's iov
+ * array is referenced as is; otherwise the array and every buffer it
+ * points to are copied.  vlan_id is only written when vc has a vlan.
+ */
+static void event_tap_alloc_net_req(EventTapNetReq *net_req,
+                                    VLANClientState *vc,
+                                    const struct iovec *iov, int iovcnt,
+                                    NetPacketSent *sent_cb, bool async)
+{
+    int idx;
+
+    net_req->iovcnt = iovcnt;
+    net_req->async = async;
+    net_req->device_name = qemu_strdup(vc->name);
+    net_req->sent_cb = sent_cb;
+
+    net_req->vlan_needed = vc->vlan ? 1 : 0;
+    if (net_req->vlan_needed) {
+        net_req->vlan_id = vc->vlan->id;
+    }
+
+    if (async) {
+        net_req->iov = (struct iovec *)iov;
+        return;
+    }
+
+    net_req->iov = qemu_malloc(sizeof(struct iovec) * iovcnt);
+    for (idx = 0; idx < iovcnt; idx++) {
+        struct iovec *dst = &net_req->iov[idx];
+
+        dst->iov_base = qemu_malloc(iov[idx].iov_len);
+        memcpy(dst->iov_base, iov[idx].iov_base, iov[idx].iov_len);
+        dst->iov_len = iov[idx].iov_len;
+    }
+}
+
+/*
+ * Record a packet as a net request and queue it on event_list.
+ *
+ * The request is stored in last_event_tap when one is pending, otherwise
+ * in a newly obtained log.  If that log already carries a net/block
+ * request, the packet is not recorded (only traced).  The bottom half is
+ * scheduled when event_list was empty before the insertion.
+ */
+static void event_tap_packet(VLANClientState *vc, const struct iovec *iov,
+                             int iovcnt, NetPacketSent *sent_cb, bool async)
+{
+    EventTapLog *log = last_event_tap;
+    int was_empty;
+    int req_bits;
+
+    if (log == NULL) {
+        trace_event_tap_no_event();
+        log = event_tap_alloc_log();
+    }
+
+    req_bits = log->mode & ~EVENT_TAP_TYPE_MASK;
+    if (req_bits) {
+        trace_event_tap_already_used(req_bits);
+        return;
+    }
+
+    log->mode |= EVENT_TAP_NET;
+    event_tap_alloc_net_req(&log->net_req, vc, iov, iovcnt, sent_cb, async);
+
+    was_empty = QTAILQ_EMPTY(&event_list);
+    QTAILQ_INSERT_TAIL(&event_list, log, node);
+    last_event_tap = NULL;
+
+    if (was_empty) {
+        event_tap_schedule_bh();
+    }
+}
+
+/*
+ * Record a single-buffer packet as a non-async net request.
+ * Must only be called while event_tap_state is EVENT_TAP_ON.
+ */
+void event_tap_send_packet(VLANClientState *vc, const uint8_t *buf, int size)
+{
+    struct iovec single = {
+        .iov_base = (uint8_t *)buf,
+        .iov_len = size,
+    };
+
+    assert(event_tap_state == EVENT_TAP_ON);
+
+    event_tap_packet(vc, &single, 1, NULL, false);
+}
+
+/*
+ * Record a vectored packet as an async net request.
+ * Must only be called while event_tap_state is EVENT_TAP_ON.
+ * Always returns 0.
+ */
+ssize_t event_tap_sendv_packet_async(VLANClientState *vc,
+                                     const struct iovec *iov,
+                                     int iovcnt, NetPacketSent *sent_cb)
+{
+    const bool is_async = true;
+
+    assert(event_tap_state == EVENT_TAP_ON);
+    event_tap_packet(vc, iov, iovcnt, sent_cb, is_async);
+
+    return 0;
+}
+
+/*
+ * Hand a recorded net request to the network layer.
+ *
+ * The client is looked up by vlan id and name when the request was
+ * recorded on a vlan, otherwise by netdev name.  Async requests go
+ * through qemu_sendv_packet_async(); a non-zero result is reported to
+ * the request's sent_cb when it has one.  Non-async requests send their
+ * first iov entry with qemu_send_packet().  Outstanding AIO is flushed
+ * afterwards.
+ */
+static void event_tap_net_flush(EventTapNetReq *net_req)
+{
+    VLANClientState *vc;
+    ssize_t len;
+
+    if (net_req->vlan_needed) {
+        vc = qemu_find_vlan_client_by_name(NULL, net_req->vlan_id,
+                                           net_req->device_name);
+    } else {
+        vc = qemu_find_netdev(net_req->device_name);
+    }
+
+    if (net_req->async) {
+        len = qemu_sendv_packet_async(vc, net_req->iov, net_req->iovcnt,
+                                      net_req->sent_cb);
+        if (len) {
+            /*
+             * sent_cb is optional: callers may record a request without
+             * one, and event_tap_net_load() cannot restore it because it
+             * is not part of the saved stream.
+             */
+            if (net_req->sent_cb) {
+                net_req->sent_cb(vc, len);
+            }
+        } else {
+            /* packets are queued in the net layer */
+            trace_event_tap_append_packet();
+        }
+    } else {
+        qemu_send_packet(vc, net_req->iov[0].iov_base,
+                         net_req->iov[0].iov_len);
+    }
+
+    /* force flush to avoid request inversion */
+    qemu_aio_flush();
+}
+
+static void event_tap_net_save(QEMUFile *f, EventTapNetReq *net_req)
+{
+ ram_addr_t page_addr;
+ int i, len;
+
+ len = strlen(net_req->device_name);
+ qemu_put_byte(f, len);
+ qemu_put_buffer(f, (uint8_t *)net_req->device_name, len);
+ qemu_put_byte(f, net_req->vlan_id);
+ qemu_put_byte(f, net_req->vlan_needed);
+ qemu_put_byte(f, net_req->async);
+ qemu_put_be32(f, net_req->iovcnt);
+
+ for (i = 0; i < net_req->iovcnt; i++) {
+ qemu_put_be64(f, net_req->iov[i].iov_len);
+ if (net_req->async) {
+ page_addr =
+ qemu_ram_addr_from_host_nofail(net_req->iov[i].iov_base);
+ qemu_put_be64(f, page_addr);
+ } else {
+ qemu_put_buffer(f, (uint8_t *)net_req->iov[i].iov_base,
+ net_req->iov[i].iov_len);
+ }
+ }
+}
+
+static void event_tap_net_load(QEMUFile *f, EventTapNetReq *net_req)
+{
+ ram_addr_t page_addr;
+ int i, len;
+
+ len = qemu_get_byte(f);
+ net_req->device_name = qemu_malloc(len + 1);
+ qemu_get_buffer(f, (uint8_t *)net_req->device_name, len);
+ net_req->device_name[len] = '\0';
+ net_req->vlan_id = qemu_get_byte(f);
+ net_req->vlan_needed = qemu_get_byte(f);
+ net_req->async = qemu_get_byte(f);
+ net_req->iovcnt = qemu_get_be32(f);
+ net_req->iov = qemu_malloc(sizeof(struct iovec) * net_req->iovcnt);
+
+ for (i = 0; i < net_req->iovcnt; i++) {
+ net_req->iov[i].iov_len = qemu_get_be64(f);
+ if (net_req->async) {
+ page_addr = qemu_get_be64(f);
+ net_req->iov[i].iov_base = qemu_get_ram_ptr(page_addr);
+ } else {
+ net_req->iov[i].iov_base = qemu_malloc(net_req->iov[i].iov_len);
+ qemu_get_buffer(f, (uint8_t *)net_req->iov[i].iov_base,
+ net_req->iov[i].iov_len);
+ }
+ }
+}
+
+static void event_tap_free_blk_req(EventTapBlkReq *blk_req)
+{
+ int i;
+
+ if (event_tap_state >= EVENT_TAP_LOAD && !blk_req->is_flush) {
+ for (i = 0; i < blk_req->num_reqs; i++) {
+ qemu_iovec_destroy(blk_req->reqs[i].qiov);
+ qemu_free(blk_req->reqs[i].qiov);
+ }
+ }
+
+ qemu_free(blk_req->device_name);
+}
+
+static void event_tap_blk_cb(void *opaque, int ret)
+{
+ EventTapLog *log = container_of(opaque, EventTapLog, blk_req);
+ EventTapBlkReq *blk_req = opaque;
+ int i;
+
+ blk_req->num_cbs--;
+
+ /* all outstanding requests are flushed */
+ if (blk_req->num_cbs == 0) {
+ for (i = 0; i < blk_req->num_reqs; i++) {
+ EventTapAIOCB *eacb = blk_req->acb[i];
+ eacb->common.cb(eacb->common.opaque, ret);
+ qemu_aio_release(eacb);
+ }
+
+ event_tap_free_log(log);
+ }
+}
+
+static void event_tap_bdrv_aio_cancel(BlockDriverAIOCB *acb)
+{
+ EventTapAIOCB *eacb = container_of(acb, EventTapAIOCB, common);
+
+ /* check if already passed to block layer */
+ if (eacb->acb) {
+ bdrv_aio_cancel(eacb->acb);
+ } else {
+ eacb->is_canceled = 1;
+ }
+}
+
+static AIOPool event_tap_aio_pool = {
+ .aiocb_size = sizeof(EventTapAIOCB),
+ .cancel = event_tap_bdrv_aio_cancel,
+};
+
+static void event_tap_alloc_blk_req(EventTapBlkReq *blk_req,
+ BlockDriverState *bs, BlockRequest *reqs,
+ int num_reqs, void *opaque, bool is_flush)
+{
+ int i;
+
+ blk_req->num_reqs = num_reqs;
+ blk_req->num_cbs = num_reqs;
+ blk_req->device_name = qemu_strdup(bs->device_name);
+ blk_req->is_flush = is_flush;
+
+ for (i = 0; i < num_reqs; i++) {
+ blk_req->reqs[i].sector = reqs[i].sector;
+ blk_req->reqs[i].nb_sectors = reqs[i].nb_sectors;
+ blk_req->reqs[i].qiov = reqs[i].qiov;
+ blk_req->reqs[i].cb = event_tap_blk_cb;
+ blk_req->reqs[i].opaque = opaque;
+
+ blk_req->acb[i] = qemu_aio_get(&event_tap_aio_pool, bs,
+ reqs[i].cb, reqs[i].opaque);
+ }
+}
+
+static EventTapBlkReq *event_tap_bdrv(BlockDriverState *bs, BlockRequest *reqs,
+ int num_reqs, bool is_flush)
+{
+ EventTapLog *log = last_event_tap;
+ int empty;
+
+ if (!log) {
+ trace_event_tap_no_event();
+ log = event_tap_alloc_log();
+ }
+
+ if (log->mode & ~EVENT_TAP_TYPE_MASK) {
+ trace_event_tap_already_used(log->mode & ~EVENT_TAP_TYPE_MASK);
+ return NULL;
+ }
+
+ log->mode |= EVENT_TAP_BLK;
+ event_tap_alloc_blk_req(&log->blk_req, bs, reqs,
+ num_reqs, &log->blk_req, is_flush);
+
+ empty = QTAILQ_EMPTY(&event_list);
+ QTAILQ_INSERT_TAIL(&event_list, log, node);
+ last_event_tap = NULL;
+
+ if (empty) {
+ event_tap_schedule_bh();
+ }
+
+ return &log->blk_req;
+}
+
+BlockDriverAIOCB *event_tap_bdrv_aio_writev(BlockDriverState *bs,
+ int64_t sector_num,
+ QEMUIOVector *iov,
+ int nb_sectors,
+ BlockDriverCompletionFunc *cb,
+ void *opaque)
+{
+ BlockRequest req;
+ EventTapBlkReq *ereq;
+
+ assert(event_tap_state == EVENT_TAP_ON);
+
+ req.sector = sector_num;
+ req.nb_sectors = nb_sectors;
+ req.qiov = iov;
+ req.cb = cb;
+ req.opaque = opaque;
+ ereq = event_tap_bdrv(bs, &req, 1, 0);
+
+ return &ereq->acb[0]->common;
+}
+
+BlockDriverAIOCB *event_tap_bdrv_aio_flush(BlockDriverState *bs,
+ BlockDriverCompletionFunc *cb,
+ void *opaque)
+{
+ BlockRequest req;
+ EventTapBlkReq *ereq;
+
+ assert(event_tap_state == EVENT_TAP_ON);
+
+ memset(&req, 0, sizeof(req));
+ req.cb = cb;
+ req.opaque = opaque;
+ ereq = event_tap_bdrv(bs, &req, 1, 1);
+
+ return &ereq->acb[0]->common;
+}
+
+void event_tap_bdrv_flush(void)
+{
+ qemu_bh_cancel(event_tap_bh);
+
+ while (!QTAILQ_EMPTY(&event_list)) {
+ event_tap_cb();
+ }
+}
+
+static void event_tap_blk_flush(EventTapBlkReq *blk_req)
+{
+ int i, ret;
+
+ for (i = 0; i < blk_req->num_reqs; i++) {
+ BlockRequest *req = &blk_req->reqs[i];
+ EventTapAIOCB *eacb = blk_req->acb[i];
+ BlockDriverAIOCB *acb = &eacb->common;
+
+ /* don't flush if canceled */
+ if (eacb->is_canceled) {
+ continue;
+ }
+
+ /* receiver needs to restore bs from device name */
+ if (!acb->bs) {
+ acb->bs = bdrv_find(blk_req->device_name);
+ }
+
+ if (blk_req->is_flush) {
+ eacb->acb = bdrv_aio_flush(acb->bs, req->cb, req->opaque);
+ if (!eacb->acb) {
+ req->cb(req->opaque, -EIO);
+ }
+ return;
+ }
+
+ eacb->acb = bdrv_aio_writev(acb->bs, req->sector, req->qiov,
+ req->nb_sectors, req->cb, req->opaque);
+ if (!eacb->acb) {
+ req->cb(req->opaque, -EIO);
+ }
+
+ /* force flush to avoid request inversion */
+ qemu_aio_flush();
+ ret = bdrv_flush(acb->bs);
+ if (ret < 0) {
+ error_report("flushing blk_req to %s failed", blk_req->device_name);
+ }
+ }
+}
+
+static void event_tap_blk_save(QEMUFile *f, EventTapBlkReq *blk_req)
+{
+ ram_addr_t page_addr;
+ int i, j, len;
+
+ len = strlen(blk_req->device_name);
+ qemu_put_byte(f, len);
+ qemu_put_buffer(f, (uint8_t *)blk_req->device_name, len);
+ qemu_put_byte(f, blk_req->num_reqs);
+ qemu_put_byte(f, blk_req->is_flush);
+
+ if (blk_req->is_flush) {
+ return;
+ }
+
+ for (i = 0; i < blk_req->num_reqs; i++) {
+ BlockRequest *req = &blk_req->reqs[i];
+ EventTapAIOCB *eacb = blk_req->acb[i];
+ /* don't save canceled requests */
+ if (eacb->is_canceled) {
+ continue;
+ }
+ qemu_put_be64(f, req->sector);
+ qemu_put_be32(f, req->nb_sectors);
+ qemu_put_be32(f, req->qiov->niov);
+
+ for (j = 0; j < req->qiov->niov; j++) {
+ page_addr =
+ qemu_ram_addr_from_host_nofail(req->qiov->iov[j].iov_base);
+ qemu_put_be64(f, page_addr);
+ qemu_put_be64(f, req->qiov->iov[j].iov_len);
+ }
+ }
+}
+
+static void event_tap_blk_load(QEMUFile *f, EventTapBlkReq *blk_req)
+{
+ BlockRequest *req;
+ ram_addr_t page_addr;
+ int i, j, len, niov;
+
+ len = qemu_get_byte(f);
+ blk_req->device_name = qemu_malloc(len + 1);
+ qemu_get_buffer(f, (uint8_t *)blk_req->device_name, len);
+ blk_req->device_name[len] = '\0';
+ blk_req->num_reqs = qemu_get_byte(f);
+ blk_req->is_flush = qemu_get_byte(f);
+
+ if (blk_req->is_flush) {
+ return;
+ }
+
+ for (i = 0; i < blk_req->num_reqs; i++) {
+ req = &blk_req->reqs[i];
+ req->sector = qemu_get_be64(f);
+ req->nb_sectors = qemu_get_be32(f);
+ req->qiov = qemu_mallocz(sizeof(QEMUIOVector));
+ niov = qemu_get_be32(f);
+ qemu_iovec_init(req->qiov, niov);
+
+ for (j = 0; j < niov; j++) {
+ void *iov_base;
+ size_t iov_len;
+ page_addr = qemu_get_be64(f);
+ iov_base = qemu_get_ram_ptr(page_addr);
+ iov_len = qemu_get_be64(f);
+ qemu_iovec_add(req->qiov, iov_base, iov_len);
+ }
+ }
+}
+
+void event_tap_ioport(int index, uint32_t address, uint32_t data)
+{
+ if (event_tap_state != EVENT_TAP_ON) {
+ return;
+ }
+
+ if (!last_event_tap) {
+ last_event_tap = event_tap_alloc_log();
+ }
+
+ last_event_tap->mode = EVENT_TAP_IOPORT;
+ last_event_tap->ioport.index = index;
+ last_event_tap->ioport.address = address;
+ last_event_tap->ioport.data = data;
+}
+
+static inline void event_tap_ioport_save(QEMUFile *f, EventTapIOport *ioport)
+{
+ qemu_put_be32(f, ioport->index);
+ qemu_put_be32(f, ioport->address);
+ qemu_put_byte(f, ioport->data);
+}
+
+static inline void event_tap_ioport_load(QEMUFile *f,
+ EventTapIOport *ioport)
+{
+ ioport->index = qemu_get_be32(f);
+ ioport->address = qemu_get_be32(f);
+ ioport->data = qemu_get_byte(f);
+}
+
+void event_tap_mmio(uint64_t address, uint8_t *buf, int len)
+{
+ if (event_tap_state != EVENT_TAP_ON || len > MMIO_BUF_SIZE) {
+ return;
+ }
+
+ if (!last_event_tap) {
+ last_event_tap = event_tap_alloc_log();
+ }
+
+ last_event_tap->mode = EVENT_TAP_MMIO;
+ last_event_tap->mmio.address = address;
+ last_event_tap->mmio.len = len;
+ memcpy(last_event_tap->mmio.buf, buf, len);
+}
+
+static inline void event_tap_mmio_save(QEMUFile *f, EventTapMMIO *mmio)
+{
+ qemu_put_be64(f, mmio->address);
+ qemu_put_byte(f, mmio->len);
+ qemu_put_buffer(f, mmio->buf, mmio->len);
+}
+
+static inline void event_tap_mmio_load(QEMUFile *f, EventTapMMIO *mmio)
+{
+ mmio->address = qemu_get_be64(f);
+ mmio->len = qemu_get_byte(f);
+ qemu_get_buffer(f, mmio->buf, mmio->len);
+}
+
+int event_tap_register(int (*cb)(void))
+{
+ if (event_tap_state != EVENT_TAP_OFF) {
+ error_report("event-tap is already on");
+ return -EINVAL;
+ }
+
+ if (!cb || event_tap_cb) {
+ error_report("can't set event_tap_cb");
+ return -EINVAL;
+ }
+
+ event_tap_cb = cb;
+ event_tap_state = EVENT_TAP_ON;
+
+ return 0;
+}
+
+void event_tap_unregister(void)
+{
+ if (event_tap_state == EVENT_TAP_OFF) {
+ error_report("event-tap is already off");
+ return;
+ }
+
+ qemu_del_vm_change_state_handler(vmstate);
+ event_tap_state = EVENT_TAP_OFF;
+ event_tap_cb = NULL;
+
+ event_tap_flush();
+ event_tap_free_pool();
+}
+
+int event_tap_is_on(void)
+{
+ return (event_tap_state == EVENT_TAP_ON);
+}
+
+static void event_tap_suspend(void *opaque, int running, int reason)
+{
+ event_tap_state = running ? EVENT_TAP_ON : EVENT_TAP_SUSPEND;
+}
+
+/* returns 1 if the queue is empty, 0 if events remain, -EINVAL on unknown mode */
+int event_tap_flush_one(void)
+{
+ EventTapLog *log;
+ int ret;
+
+ if (QTAILQ_EMPTY(&event_list)) {
+ return 1;
+ }
+
+ event_tap_state = EVENT_TAP_FLUSH;
+
+ log = QTAILQ_FIRST(&event_list);
+ QTAILQ_REMOVE(&event_list, log, node);
+ switch (log->mode & ~EVENT_TAP_TYPE_MASK) {
+ case EVENT_TAP_NET:
+ event_tap_net_flush(&log->net_req);
+ event_tap_free_log(log);
+ break;
+ case EVENT_TAP_BLK:
+ event_tap_blk_flush(&log->blk_req);
+ break;
+ default:
+ error_report("Unknown state %d", log->mode);
+ event_tap_free_log(log);
+ return -EINVAL;
+ }
+
+ ret = QTAILQ_EMPTY(&event_list);
+ event_tap_state = ret ? EVENT_TAP_ON : EVENT_TAP_FLUSH;
+
+ return ret;
+}
+
+void event_tap_flush(void)
+{
+ int ret;
+
+ do {
+ ret = event_tap_flush_one();
+ } while (ret == 0);
+
+ if (ret < 0) {
+ error_report("error flushing event-tap requests");
+ abort();
+ }
+}
+
+static void event_tap_replay(void *opaque, int running, int reason)
+{
+ EventTapLog *log, *next;
+
+ if (!running) {
+ return;
+ }
+
+ assert(event_tap_state == EVENT_TAP_LOAD);
+
+ event_tap_state = EVENT_TAP_REPLAY;
+
+ QTAILQ_FOREACH(log, &event_list, node) {
+ if ((log->mode & ~EVENT_TAP_TYPE_MASK) == EVENT_TAP_NET) {
+ EventTapNetReq *net_req = &log->net_req;
+ if (!net_req->async) {
+ event_tap_net_flush(net_req);
+ continue;
+ }
+ }
+
+ switch (log->mode & EVENT_TAP_TYPE_MASK) {
+ case EVENT_TAP_IOPORT:
+ switch (log->ioport.index) {
+ case 0:
+ cpu_outb(log->ioport.address, log->ioport.data);
+ break;
+ case 1:
+ cpu_outw(log->ioport.address, log->ioport.data);
+ break;
+ case 2:
+ cpu_outl(log->ioport.address, log->ioport.data);
+ break;
+ }
+ break;
+ case EVENT_TAP_MMIO:
+ cpu_physical_memory_rw(log->mmio.address,
+ log->mmio.buf,
+ log->mmio.len, 1);
+ break;
+ case 0:
+ trace_event_tap_replay_no_event();
+ break;
+ default:
+ error_report("Unknown state %d", log->mode);
+ QTAILQ_REMOVE(&event_list, log, node);
+ event_tap_free_log(log);
+ return;
+ }
+ }
+
+ /* remove event logs from queue */
+ QTAILQ_FOREACH_SAFE(log, &event_list, node, next) {
+ QTAILQ_REMOVE(&event_list, log, node);
+ event_tap_free_log(log);
+ }
+
+ event_tap_state = EVENT_TAP_OFF;
+ qemu_del_vm_change_state_handler(vmstate);
+}
+
+static void event_tap_save(QEMUFile *f, void *opaque)
+{
+ EventTapLog *log;
+
+ QTAILQ_FOREACH(log, &event_list, node) {
+ qemu_put_byte(f, log->mode);
+
+ switch (log->mode & EVENT_TAP_TYPE_MASK) {
+ case EVENT_TAP_IOPORT:
+ event_tap_ioport_save(f, &log->ioport);
+ break;
+ case EVENT_TAP_MMIO:
+ event_tap_mmio_save(f, &log->mmio);
+ break;
+ case 0:
+ trace_event_tap_save_no_event();
+ break;
+ default:
+ error_report("Unknown state %d", log->mode);
+ return;
+ }
+
+ switch (log->mode & ~EVENT_TAP_TYPE_MASK) {
+ case EVENT_TAP_NET:
+ event_tap_net_save(f, &log->net_req);
+ break;
+ case EVENT_TAP_BLK:
+ event_tap_blk_save(f, &log->blk_req);
+ break;
+ default:
+ error_report("Unknown state %d", log->mode);
+ return;
+ }
+ }
+
+ qemu_put_byte(f, 0); /* EOF */
+}
+
+static int event_tap_load(QEMUFile *f, void *opaque, int version_id)
+{
+ EventTapLog *log, *next;
+ int mode;
+
+ event_tap_state = EVENT_TAP_LOAD;
+
+ QTAILQ_FOREACH_SAFE(log, &event_list, node, next) {
+ QTAILQ_REMOVE(&event_list, log, node);
+ event_tap_free_log(log);
+ }
+
+ /* loop until EOF */
+ while ((mode = qemu_get_byte(f)) != 0) {
+ EventTapLog *log = event_tap_alloc_log();
+
+ log->mode = mode;
+ switch (log->mode & EVENT_TAP_TYPE_MASK) {
+ case EVENT_TAP_IOPORT:
+ event_tap_ioport_load(f, &log->ioport);
+ break;
+ case EVENT_TAP_MMIO:
+ event_tap_mmio_load(f, &log->mmio);
+ break;
+ case 0:
+ trace_event_tap_load_no_event();
+ break;
+ default:
+ error_report("Unknown state %d", log->mode);
+ event_tap_free_log(log);
+ return -EINVAL;
+ }
+
+ switch (log->mode & ~EVENT_TAP_TYPE_MASK) {
+ case EVENT_TAP_NET:
+ event_tap_net_load(f, &log->net_req);
+ break;
+ case EVENT_TAP_BLK:
+ event_tap_blk_load(f, &log->blk_req);
+ break;
+ default:
+ error_report("Unknown state %d", log->mode);
+ event_tap_free_log(log);
+ return -EINVAL;
+ }
+
+ QTAILQ_INSERT_TAIL(&event_list, log, node);
+ }
+
+ return 0;
+}
+
+void event_tap_schedule_replay(void)
+{
+ vmstate = qemu_add_vm_change_state_handler(event_tap_replay, NULL);
+}
+
+void event_tap_schedule_suspend(void)
+{
+ vmstate = qemu_add_vm_change_state_handler(event_tap_suspend, NULL);
+}
+
+void event_tap_init(void)
+{
+ QTAILQ_INIT(&event_list);
+ QTAILQ_INIT(&event_pool);
+ register_savevm(NULL, "event-tap", 0, 1,
+ event_tap_save, event_tap_load, &last_event_tap);
+}
diff --git a/event-tap.h b/event-tap.h
new file mode 100644
index 0000000..ab677f8
--- /dev/null
+++ b/event-tap.h
@@ -0,0 +1,44 @@
+/*
+ * Event Tap functions for QEMU
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef EVENT_TAP_H
+#define EVENT_TAP_H
+
+#include "qemu-common.h"
+#include "net.h"
+#include "block.h"
+
+int event_tap_register(int (*cb)(void));
+void event_tap_unregister(void);
+int event_tap_is_on(void);
+void event_tap_schedule_suspend(void);
+void event_tap_ioport(int index, uint32_t address, uint32_t data);
+void event_tap_mmio(uint64_t address, uint8_t *buf, int len);
+void event_tap_init(void);
+void event_tap_flush(void);
+int event_tap_flush_one(void);
+void event_tap_schedule_replay(void);
+
+void event_tap_send_packet(VLANClientState *vc, const uint8_t *buf, int size);
+ssize_t event_tap_sendv_packet_async(VLANClientState *vc,
+ const struct iovec *iov,
+ int iovcnt, NetPacketSent *sent_cb);
+
+BlockDriverAIOCB *event_tap_bdrv_aio_writev(BlockDriverState *bs,
+ int64_t sector_num,
+ QEMUIOVector *iov,
+ int nb_sectors,
+ BlockDriverCompletionFunc *cb,
+ void *opaque);
+BlockDriverAIOCB *event_tap_bdrv_aio_flush(BlockDriverState *bs,
+ BlockDriverCompletionFunc *cb,
+ void *opaque);
+void event_tap_bdrv_flush(void);
+
+#endif
diff --git a/qemu-tool.c b/qemu-tool.c
index 392e1c9..3f71215 100644
--- a/qemu-tool.c
+++ b/qemu-tool.c
@@ -16,6 +16,7 @@
#include "qemu-timer.h"
#include "qemu-log.h"
#include "sysemu.h"
+#include "event-tap.h"
#include <sys/time.h>
@@ -111,3 +112,30 @@ int qemu_set_fd_handler2(int fd,
{
return 0;
}
+
+BlockDriverAIOCB *event_tap_bdrv_aio_writev(BlockDriverState *bs,
+ int64_t sector_num,
+ QEMUIOVector *iov,
+ int nb_sectors,
+ BlockDriverCompletionFunc *cb,
+ void *opaque)
+{
+ return NULL;
+}
+
+BlockDriverAIOCB *event_tap_bdrv_aio_flush(BlockDriverState *bs,
+ BlockDriverCompletionFunc *cb,
+ void *opaque)
+{
+ return NULL;
+}
+
+void event_tap_bdrv_flush(void)
+{
+}
+
+int event_tap_is_on(void)
+{
+ return 0;
+}
+
diff --git a/trace-events b/trace-events
index 50ac840..1af3895 100644
--- a/trace-events
+++ b/trace-events
@@ -269,3 +269,13 @@ disable ft_trans_freeze_input(void) "backend not ready, freezing input"
disable ft_trans_put_ready(void) "file is ready to put"
disable ft_trans_get_ready(void) "file is ready to get"
disable ft_trans_cb(void *cb) "callback %p"
+
+# event-tap.c
+disable event_tap_ignore_bh(int bh) "event_tap_bh is already scheduled %d"
+disable event_tap_net_cb(char *s, ssize_t len) "%s: %zd bytes packet was sended"
+disable event_tap_no_event(void) "no last_event_tap"
+disable event_tap_already_used(int mode) "last_event_tap already used %d"
+disable event_tap_append_packet(void) "This packet is appended"
+disable event_tap_replay_no_event(void) "No event to replay"
+disable event_tap_save_no_event(void) "No event to save"
+disable event_tap_load_no_event(void) "No event to load"
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 10/18] Call init handler of event-tap at main() in vl.c.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (8 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 09/18] Introduce event-tap Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 11/18] ioport: insert event_tap_ioport() to ioport_write() Yoshiaki Tamura
` (7 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
vl.c | 3 +++
1 files changed, 3 insertions(+), 0 deletions(-)
diff --git a/vl.c b/vl.c
index 00155fb..f4d4abf 100644
--- a/vl.c
+++ b/vl.c
@@ -162,6 +162,7 @@ int main(int argc, char **argv)
#include "qemu-queue.h"
#include "cpus.h"
#include "arch_init.h"
+#include "event-tap.h"
#include "ui/qemu-spice.h"
@@ -2919,6 +2920,8 @@ int main(int argc, char **argv, char **envp)
blk_mig_init();
+ event_tap_init();
+
/* open the virtual block devices */
if (snapshot)
qemu_opts_foreach(qemu_find_opts("drive"), drive_enable_snapshot, NULL, 0);
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 11/18] ioport: insert event_tap_ioport() to ioport_write().
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (9 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 10/18] Call init handler of event-tap at main() in vl.c Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 12/18] Insert event_tap_mmio() to cpu_physical_memory_rw() in exec.c Yoshiaki Tamura
` (6 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Record ioport event to replay it upon failover.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
ioport.c | 2 ++
1 files changed, 2 insertions(+), 0 deletions(-)
diff --git a/ioport.c b/ioport.c
index aa4188a..74aebf5 100644
--- a/ioport.c
+++ b/ioport.c
@@ -27,6 +27,7 @@
#include "ioport.h"
#include "trace.h"
+#include "event-tap.h"
/***********************************************************/
/* IO Port */
@@ -76,6 +77,7 @@ static void ioport_write(int index, uint32_t address, uint32_t data)
default_ioport_writel
};
IOPortWriteFunc *func = ioport_write_table[index][address];
+ event_tap_ioport(index, address, data);
if (!func)
func = default_func[index];
func(ioport_opaque[address], address, data);
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 12/18] Insert event_tap_mmio() to cpu_physical_memory_rw() in exec.c.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (10 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 11/18] ioport: insert event_tap_ioport() to ioport_write() Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 13/18] net: insert event-tap to qemu_send_packet() and qemu_sendv_packet_async() Yoshiaki Tamura
` (5 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Record mmio write event to replay it upon failover.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
exec.c | 4 ++++
1 files changed, 4 insertions(+), 0 deletions(-)
diff --git a/exec.c b/exec.c
index e950df2..c81fd09 100644
--- a/exec.c
+++ b/exec.c
@@ -33,6 +33,7 @@
#include "osdep.h"
#include "kvm.h"
#include "qemu-timer.h"
+#include "event-tap.h"
#if defined(CONFIG_USER_ONLY)
#include <qemu.h>
#include <signal.h>
@@ -3632,6 +3633,9 @@ void cpu_physical_memory_rw(target_phys_addr_t addr, uint8_t *buf,
io_index = (pd >> IO_MEM_SHIFT) & (IO_MEM_NB_ENTRIES - 1);
if (p)
addr1 = (addr & ~TARGET_PAGE_MASK) + p->region_offset;
+
+ event_tap_mmio(addr, buf, len);
+
/* XXX: could force cpu_single_env to NULL to avoid
potential bugs */
if (l >= 4 && ((addr1 & 3) == 0)) {
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 13/18] net: insert event-tap to qemu_send_packet() and qemu_sendv_packet_async().
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (11 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 12/18] Insert event_tap_mmio() to cpu_physical_memory_rw() in exec.c Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 14/18] block: insert event-tap to bdrv_aio_writev(), bdrv_aio_flush() and bdrv_flush() Yoshiaki Tamura
` (4 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
event-tap function is called only when it is on.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
net.c | 9 +++++++++
1 files changed, 9 insertions(+), 0 deletions(-)
diff --git a/net.c b/net.c
index 9ba5be2..1176124 100644
--- a/net.c
+++ b/net.c
@@ -36,6 +36,7 @@
#include "qemu-common.h"
#include "qemu_socket.h"
#include "hw/qdev.h"
+#include "event-tap.h"
static QTAILQ_HEAD(, VLANState) vlans;
static QTAILQ_HEAD(, VLANClientState) non_vlan_clients;
@@ -559,6 +560,10 @@ ssize_t qemu_send_packet_async(VLANClientState *sender,
void qemu_send_packet(VLANClientState *vc, const uint8_t *buf, int size)
{
+ if (event_tap_is_on()) {
+ return event_tap_send_packet(vc, buf, size);
+ }
+
qemu_send_packet_async(vc, buf, size, NULL);
}
@@ -657,6 +662,10 @@ ssize_t qemu_sendv_packet_async(VLANClientState *sender,
{
NetQueue *queue;
+ if (event_tap_is_on()) {
+ return event_tap_sendv_packet_async(sender, iov, iovcnt, sent_cb);
+ }
+
if (sender->link_down || (!sender->peer && !sender->vlan)) {
return calc_iov_length(iov, iovcnt);
}
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 14/18] block: insert event-tap to bdrv_aio_writev(), bdrv_aio_flush() and bdrv_flush().
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (12 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 13/18] net: insert event-tap to qemu_send_packet() and qemu_sendv_packet_async() Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 15/18] savevm: introduce qemu_savevm_trans_{begin, commit} Yoshiaki Tamura
` (3 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
The event-tap functions are called only when event-tap is on and the
request was sent from a device emulator.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
Acked-by: Kevin Wolf <kwolf@redhat.com>
---
block.c | 15 +++++++++++++++
1 files changed, 15 insertions(+), 0 deletions(-)
diff --git a/block.c b/block.c
index b476479..8ddce13 100644
--- a/block.c
+++ b/block.c
@@ -28,6 +28,7 @@
#include "block_int.h"
#include "module.h"
#include "qemu-objects.h"
+#include "event-tap.h"
#ifdef CONFIG_BSD
#include <sys/types.h>
@@ -1482,6 +1483,10 @@ int bdrv_flush(BlockDriverState *bs)
}
if (bs->drv && bs->drv->bdrv_flush) {
+ if (*bs->device_name && event_tap_is_on()) {
+ event_tap_bdrv_flush();
+ }
+
return bs->drv->bdrv_flush(bs);
}
@@ -2117,6 +2122,11 @@ BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState *bs, int64_t sector_num,
if (bdrv_check_request(bs, sector_num, nb_sectors))
return NULL;
+ if (*bs->device_name && event_tap_is_on()) {
+ return event_tap_bdrv_aio_writev(bs, sector_num, qiov, nb_sectors,
+ cb, opaque);
+ }
+
if (bs->dirty_bitmap) {
blk_cb_data = blk_dirty_cb_alloc(bs, sector_num, nb_sectors, cb,
opaque);
@@ -2380,6 +2390,11 @@ BlockDriverAIOCB *bdrv_aio_flush(BlockDriverState *bs,
if (!drv)
return NULL;
+
+ if (*bs->device_name && event_tap_is_on()) {
+ return event_tap_bdrv_aio_flush(bs, cb, opaque);
+ }
+
return drv->bdrv_aio_flush(bs, cb, opaque);
}
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 15/18] savevm: introduce qemu_savevm_trans_{begin, commit}.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (13 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 14/18] block: insert event-tap to bdrv_aio_writev(), bdrv_aio_flush() and bdrv_flush() Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 16/18] migration: introduce migrate_ft_trans_{put, get}_ready(), and modify migrate_fd_put_ready() when ft_mode is on Yoshiaki Tamura
` (2 subsequent siblings)
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Introduce qemu_savevm_trans_{begin,complete} to send the memory and
device info together, while avoiding cancelling memory state tracking.
This patch also abstracts common code between
qemu_savevm_state_{begin,iterate,complete}.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
savevm.c | 157 +++++++++++++++++++++++++++++++++++++++-----------------------
sysemu.h | 2 +
2 files changed, 101 insertions(+), 58 deletions(-)
diff --git a/savevm.c b/savevm.c
index e44eccd..1c2a7fb 100644
--- a/savevm.c
+++ b/savevm.c
@@ -1601,29 +1601,68 @@ bool qemu_savevm_state_blocked(Monitor *mon)
return false;
}
-int qemu_savevm_state_begin(Monitor *mon, QEMUFile *f, int blk_enable,
- int shared)
+/*
+ * section: section type byte written as the header of each live section
+ * inc: if true, pass SECTION_PART to the live handler instead of SECTION_START
+ * pause: if true, break out of the loop when a live handler returns 0
+ */
+static int qemu_savevm_state_live(Monitor *mon, QEMUFile *f, int section,
+ bool inc, bool pause)
{
SaveStateEntry *se;
+ int skip = 0, ret;
QTAILQ_FOREACH(se, &savevm_handlers, entry) {
- if(se->set_params == NULL) {
+ int len, stage;
+
+ if (se->save_live_state == NULL) {
continue;
- }
- se->set_params(blk_enable, shared, se->opaque);
+ }
+
+ /* Section type */
+ qemu_put_byte(f, section);
+ qemu_put_be32(f, se->section_id);
+
+ if (section == QEMU_VM_SECTION_START) {
+ /* ID string */
+ len = strlen(se->idstr);
+ qemu_put_byte(f, len);
+ qemu_put_buffer(f, (uint8_t *)se->idstr, len);
+
+ qemu_put_be32(f, se->instance_id);
+ qemu_put_be32(f, se->version_id);
+
+ stage = inc ? QEMU_VM_SECTION_PART : QEMU_VM_SECTION_START;
+ } else {
+ assert(inc);
+ stage = section;
+ }
+
+ ret = se->save_live_state(mon, f, stage, se->opaque);
+ if (!ret) {
+ skip++;
+ if (pause) {
+ break;
+ }
+ }
}
-
- qemu_put_be32(f, QEMU_VM_FILE_MAGIC);
- qemu_put_be32(f, QEMU_VM_FILE_VERSION);
+
+ return skip;
+}
+
+static void qemu_savevm_state_full(QEMUFile *f)
+{
+ SaveStateEntry *se;
QTAILQ_FOREACH(se, &savevm_handlers, entry) {
int len;
- if (se->save_live_state == NULL)
+ if (se->save_state == NULL && se->vmsd == NULL) {
continue;
+ }
/* Section type */
- qemu_put_byte(f, QEMU_VM_SECTION_START);
+ qemu_put_byte(f, QEMU_VM_SECTION_FULL);
qemu_put_be32(f, se->section_id);
/* ID string */
@@ -1634,9 +1673,29 @@ int qemu_savevm_state_begin(Monitor *mon, QEMUFile *f, int blk_enable,
qemu_put_be32(f, se->instance_id);
qemu_put_be32(f, se->version_id);
- se->save_live_state(mon, f, QEMU_VM_SECTION_START, se->opaque);
+ vmstate_save(f, se);
+ }
+
+ qemu_put_byte(f, QEMU_VM_EOF);
+}
+
+int qemu_savevm_state_begin(Monitor *mon, QEMUFile *f, int blk_enable,
+ int shared)
+{
+ SaveStateEntry *se;
+
+ QTAILQ_FOREACH(se, &savevm_handlers, entry) {
+ if (se->set_params == NULL) {
+ continue;
+ }
+ se->set_params(blk_enable, shared, se->opaque);
}
+ qemu_put_be32(f, QEMU_VM_FILE_MAGIC);
+ qemu_put_be32(f, QEMU_VM_FILE_VERSION);
+
+ qemu_savevm_state_live(mon, f, QEMU_VM_SECTION_START, 0, 0);
+
if (qemu_file_has_error(f)) {
qemu_savevm_state_cancel(mon, f);
return -EIO;
@@ -1647,29 +1706,16 @@ int qemu_savevm_state_begin(Monitor *mon, QEMUFile *f, int blk_enable,
int qemu_savevm_state_iterate(Monitor *mon, QEMUFile *f)
{
- SaveStateEntry *se;
int ret = 1;
- QTAILQ_FOREACH(se, &savevm_handlers, entry) {
- if (se->save_live_state == NULL)
- continue;
-
- /* Section type */
- qemu_put_byte(f, QEMU_VM_SECTION_PART);
- qemu_put_be32(f, se->section_id);
-
- ret = se->save_live_state(mon, f, QEMU_VM_SECTION_PART, se->opaque);
- if (!ret) {
- /* Do not proceed to the next vmstate before this one reported
- completion of the current stage. This serializes the migration
- and reduces the probability that a faster changing state is
- synchronized over and over again. */
- break;
- }
- }
-
- if (ret)
+ /* Do not proceed to the next vmstate before this one reported
+ completion of the current stage. This serializes the migration
+ and reduces the probability that a faster changing state is
+ synchronized over and over again. */
+ ret = qemu_savevm_state_live(mon, f, QEMU_VM_SECTION_PART, 1, 1);
+ if (!ret) {
return 1;
+ }
if (qemu_file_has_error(f)) {
qemu_savevm_state_cancel(mon, f);
@@ -1681,46 +1727,41 @@ int qemu_savevm_state_iterate(Monitor *mon, QEMUFile *f)
int qemu_savevm_state_complete(Monitor *mon, QEMUFile *f)
{
- SaveStateEntry *se;
-
cpu_synchronize_all_states();
- QTAILQ_FOREACH(se, &savevm_handlers, entry) {
- if (se->save_live_state == NULL)
- continue;
-
- /* Section type */
- qemu_put_byte(f, QEMU_VM_SECTION_END);
- qemu_put_be32(f, se->section_id);
+ qemu_savevm_state_live(mon, f, QEMU_VM_SECTION_END, 1, 0);
+ qemu_savevm_state_full(f);
- se->save_live_state(mon, f, QEMU_VM_SECTION_END, se->opaque);
+ if (qemu_file_has_error(f)) {
+ return -EIO;
}
- QTAILQ_FOREACH(se, &savevm_handlers, entry) {
- int len;
+ return 0;
+}
- if (se->save_state == NULL && se->vmsd == NULL)
- continue;
+int qemu_savevm_trans_begin(Monitor *mon, QEMUFile *f, int init)
+{
+ int ret;
- /* Section type */
- qemu_put_byte(f, QEMU_VM_SECTION_FULL);
- qemu_put_be32(f, se->section_id);
+ ret = qemu_savevm_state_live(mon, f, QEMU_VM_SECTION_START, !init, 0);
- /* ID string */
- len = strlen(se->idstr);
- qemu_put_byte(f, len);
- qemu_put_buffer(f, (uint8_t *)se->idstr, len);
+ if (qemu_file_has_error(f)) {
+ return -EIO;
+ }
- qemu_put_be32(f, se->instance_id);
- qemu_put_be32(f, se->version_id);
+ return ret;
+}
- vmstate_save(f, se);
- }
+int qemu_savevm_trans_complete(Monitor *mon, QEMUFile *f)
+{
+ cpu_synchronize_all_states();
- qemu_put_byte(f, QEMU_VM_EOF);
+ qemu_savevm_state_live(mon, f, QEMU_VM_SECTION_PART, 1, 0);
+ qemu_savevm_state_full(f);
- if (qemu_file_has_error(f))
+ if (qemu_file_has_error(f)) {
return -EIO;
+ }
return 0;
}
diff --git a/sysemu.h b/sysemu.h
index c86b4e8..6579714 100644
--- a/sysemu.h
+++ b/sysemu.h
@@ -81,6 +81,8 @@ int qemu_savevm_state_begin(Monitor *mon, QEMUFile *f, int blk_enable,
int qemu_savevm_state_iterate(Monitor *mon, QEMUFile *f);
int qemu_savevm_state_complete(Monitor *mon, QEMUFile *f);
void qemu_savevm_state_cancel(Monitor *mon, QEMUFile *f);
+int qemu_savevm_trans_begin(Monitor *mon, QEMUFile *f, int init);
+int qemu_savevm_trans_complete(Monitor *mon, QEMUFile *f);
int qemu_loadvm_state(QEMUFile *f, int skip_header);
/* SLIRP */
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 16/18] migration: introduce migrate_ft_trans_{put, get}_ready(), and modify migrate_fd_put_ready() when ft_mode is on.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (14 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 15/18] savevm: introduce qemu_savevm_trans_{begin, commit} Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 17/18] migration-tcp: modify tcp_accept_incoming_migration() to handle ft_mode, and add a hack not to close fd when ft_mode is enabled Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 18/18] Introduce "kemari:" to enable FT migration mode (Kemari) Yoshiaki Tamura
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
Introduce migrate_ft_trans_put_ready(), which kicks off the FT transaction
cycle. When ft_mode is on, migrate_fd_put_ready() opens
ft_trans_file and turns on event_tap. To end or cancel an FT transaction,
ft_mode and event_tap are turned off. migrate_ft_trans_get_ready() is
called to receive the ack from the receiver.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
migration.c | 261 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 260 insertions(+), 1 deletions(-)
diff --git a/migration.c b/migration.c
index c5e0146..7837c55 100644
--- a/migration.c
+++ b/migration.c
@@ -21,6 +21,7 @@
#include "qemu_socket.h"
#include "block-migration.h"
#include "qemu-objects.h"
+#include "event-tap.h"
//#define DEBUG_MIGRATION
@@ -283,6 +284,14 @@ void migrate_fd_error(FdMigrationState *s)
migrate_fd_cleanup(s);
}
+static void migrate_ft_trans_error(FdMigrationState *s)
+{
+ ft_mode = FT_ERROR;
+ qemu_savevm_state_cancel(s->mon, s->file);
+ migrate_fd_error(s);
+ event_tap_unregister();
+}
+
int migrate_fd_cleanup(FdMigrationState *s)
{
int ret = 0;
@@ -318,6 +327,17 @@ void migrate_fd_put_notify(void *opaque)
qemu_file_put_notify(s->file);
}
+static void migrate_fd_get_notify(void *opaque)
+{
+ FdMigrationState *s = opaque;
+
+ qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
+ qemu_file_get_notify(s->file);
+ if (qemu_file_has_error(s->file)) {
+ migrate_ft_trans_error(s);
+ }
+}
+
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
{
FdMigrationState *s = opaque;
@@ -353,6 +373,10 @@ int migrate_fd_get_buffer(void *opaque, uint8_t *data, int64_t pos, size_t size)
ret = -(s->get_error(s));
}
+ if (ret == -EAGAIN) {
+ qemu_set_fd_handler2(s->fd, NULL, migrate_fd_get_notify, NULL, s);
+ }
+
return ret;
}
@@ -379,6 +403,230 @@ void migrate_fd_connect(FdMigrationState *s)
migrate_fd_put_ready(s);
}
+static int migrate_ft_trans_commit(void *opaque)
+{
+ FdMigrationState *s = opaque;
+ int ret = -1;
+
+ if (ft_mode != FT_TRANSACTION_COMMIT && ft_mode != FT_TRANSACTION_ATOMIC) {
+ fprintf(stderr,
+ "migrate_ft_trans_commit: invalid ft_mode %d\n", ft_mode);
+ goto out;
+ }
+
+ do {
+ if (ft_mode == FT_TRANSACTION_ATOMIC) {
+ if (qemu_ft_trans_begin(s->file) < 0) {
+ fprintf(stderr, "qemu_ft_trans_begin failed\n");
+ goto out;
+ }
+
+ ret = qemu_savevm_trans_begin(s->mon, s->file, 0);
+ if (ret < 0) {
+ fprintf(stderr, "qemu_savevm_trans_begin failed\n");
+ goto out;
+ }
+
+ ft_mode = FT_TRANSACTION_COMMIT;
+ if (ret) {
+ /* don't proceed until fd is ready */
+ goto out;
+ }
+ }
+
+ /* make the VM state consistent by flushing outstanding events */
+ vm_stop(0);
+
+ /* send at full speed */
+ qemu_file_set_rate_limit(s->file, 0);
+
+ ret = qemu_savevm_trans_complete(s->mon, s->file);
+ if (ret < 0) {
+ fprintf(stderr, "qemu_savevm_trans_complete failed\n");
+ goto out;
+ }
+
+ ret = qemu_ft_trans_commit(s->file);
+ if (ret < 0) {
+ fprintf(stderr, "qemu_ft_trans_commit failed\n");
+ goto out;
+ }
+
+ if (ret) {
+ ft_mode = FT_TRANSACTION_RECV;
+ ret = 1;
+ goto out;
+ }
+
+ /* flush and check if events are remaining */
+ vm_start();
+ ret = event_tap_flush_one();
+ if (ret < 0) {
+ fprintf(stderr, "event_tap_flush_one failed\n");
+ goto out;
+ }
+
+ ft_mode = ret ? FT_TRANSACTION_BEGIN : FT_TRANSACTION_ATOMIC;
+ } while (ft_mode != FT_TRANSACTION_BEGIN);
+
+ vm_start();
+ ret = 0;
+
+out:
+ return ret;
+}
+
+static int migrate_ft_trans_get_ready(void *opaque)
+{
+ FdMigrationState *s = opaque;
+ int ret = -1;
+
+ if (ft_mode != FT_TRANSACTION_RECV) {
+ fprintf(stderr,
+ "migrate_ft_trans_get_ready: invalid ft_mode %d\n", ft_mode);
+ goto error_out;
+ }
+
+ /* flush and check if events are remaining */
+ vm_start();
+ ret = event_tap_flush_one();
+ if (ret < 0) {
+ fprintf(stderr, "event_tap_flush_one failed\n");
+ goto error_out;
+ }
+
+ if (ret) {
+ ft_mode = FT_TRANSACTION_BEGIN;
+ } else {
+ ft_mode = FT_TRANSACTION_ATOMIC;
+
+ ret = migrate_ft_trans_commit(s);
+ if (ret < 0) {
+ goto error_out;
+ }
+ if (ret) {
+ goto out;
+ }
+ }
+
+ vm_start();
+ ret = 0;
+ goto out;
+
+error_out:
+ migrate_ft_trans_error(s);
+
+out:
+ return ret;
+}
+
+static int migrate_ft_trans_put_ready(void)
+{
+ FdMigrationState *s = migrate_to_fms(current_migration);
+ int ret = -1, init = 0, timeout;
+ static int64_t start, now;
+
+ switch (ft_mode) {
+ case FT_INIT:
+ init = 1;
+ ft_mode = FT_TRANSACTION_BEGIN;
+ case FT_TRANSACTION_BEGIN:
+ now = start = qemu_get_clock(vm_clock);
+ /* start transaction at best effort */
+ qemu_file_set_rate_limit(s->file, 1);
+
+ if (qemu_ft_trans_begin(s->file) < 0) {
+ fprintf(stderr, "qemu_transaction_begin failed\n");
+ goto error_out;
+ }
+
+ vm_stop(0);
+
+ ret = qemu_savevm_trans_begin(s->mon, s->file, init);
+ if (ret < 0) {
+ fprintf(stderr, "qemu_savevm_trans_begin\n");
+ goto error_out;
+ }
+
+ if (ret) {
+ ft_mode = FT_TRANSACTION_ITER;
+ vm_start();
+ } else {
+ ft_mode = FT_TRANSACTION_COMMIT;
+ if (migrate_ft_trans_commit(s) < 0) {
+ goto error_out;
+ }
+ }
+ break;
+
+ case FT_TRANSACTION_ITER:
+ now = qemu_get_clock(vm_clock);
+ timeout = ((now - start) >= max_downtime);
+ if (timeout || qemu_savevm_state_iterate(s->mon, s->file) == 1) {
+ DPRINTF("ft trans iter timeout %d\n", timeout);
+
+ ft_mode = FT_TRANSACTION_COMMIT;
+ if (migrate_ft_trans_commit(s) < 0) {
+ goto error_out;
+ }
+ return 1;
+ }
+
+ ft_mode = FT_TRANSACTION_ITER;
+ break;
+
+ case FT_TRANSACTION_ATOMIC:
+ case FT_TRANSACTION_COMMIT:
+ if (migrate_ft_trans_commit(s) < 0) {
+ goto error_out;
+ }
+ break;
+
+ default:
+ fprintf(stderr,
+ "migrate_ft_trans_put_ready: invalid ft_mode %d", ft_mode);
+ goto error_out;
+ }
+
+ ret = 0;
+ goto out;
+
+error_out:
+ migrate_ft_trans_error(s);
+
+out:
+ return ret;
+}
+
+static void migrate_ft_trans_connect(FdMigrationState *s, int old_vm_running)
+{
+ /* close buffered_file and open ft_trans_file
+ * NB: fd won't get closed, and reused by ft_trans_file
+ */
+ qemu_fclose(s->file);
+
+ s->file = qemu_fopen_ops_ft_trans(s,
+ migrate_fd_put_buffer,
+ migrate_fd_get_buffer,
+ migrate_ft_trans_put_ready,
+ migrate_ft_trans_get_ready,
+ migrate_fd_wait_for_unfreeze,
+ migrate_fd_close,
+ 1);
+ socket_set_nodelay(s->fd);
+
+ /* events are tapped from now */
+ if (event_tap_register(migrate_ft_trans_put_ready) < 0) {
+ migrate_ft_trans_error(s);
+ }
+
+ event_tap_schedule_suspend();
+
+ if (old_vm_running) {
+ vm_start();
+ }
+}
+
void migrate_fd_put_ready(void *opaque)
{
FdMigrationState *s = opaque;
@@ -404,6 +652,11 @@ void migrate_fd_put_ready(void *opaque)
} else {
state = MIG_STATE_COMPLETED;
}
+
+ if (ft_mode && state == MIG_STATE_COMPLETED) {
+ return migrate_ft_trans_connect(s, old_vm_running);
+ }
+
if (migrate_fd_cleanup(s) < 0) {
if (old_vm_running) {
vm_start();
@@ -432,8 +685,14 @@ void migrate_fd_cancel(MigrationState *mig_state)
s->state = MIG_STATE_CANCELLED;
notifier_list_notify(&migration_state_notifiers);
- qemu_savevm_state_cancel(s->mon, s->file);
+ if (ft_mode) {
+ qemu_ft_trans_cancel(s->file);
+ ft_mode = FT_OFF;
+ event_tap_unregister();
+ }
+
+ qemu_savevm_state_cancel(s->mon, s->file);
migrate_fd_cleanup(s);
}
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 17/18] migration-tcp: modify tcp_accept_incoming_migration() to handle ft_mode, and add a hack not to close fd when ft_mode is enabled.
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (15 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 16/18] migration: introduce migrate_ft_trans_{put, get}_ready(), and modify migrate_fd_put_ready() when ft_mode is on Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:30 ` [Qemu-devel] [PATCH 18/18] Introduce "kemari:" to enable FT migration mode (Kemari) Yoshiaki Tamura
17 siblings, 0 replies; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
When ft_mode is set in the header, tcp_accept_incoming_migration()
sets ft_trans_incoming() as a callback, which calls
qemu_file_get_notify() to receive FT transactions iteratively. We also
need a hack not to close the fd before moving to ft_transaction mode, so
that we can reuse the fd for it. A vm_change_state_handler is added to
turn off ft_mode when cont is pressed.
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
migration-tcp.c | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 66 insertions(+), 1 deletions(-)
diff --git a/migration-tcp.c b/migration-tcp.c
index 55777c8..84076d6 100644
--- a/migration-tcp.c
+++ b/migration-tcp.c
@@ -18,6 +18,8 @@
#include "sysemu.h"
#include "buffered_file.h"
#include "block.h"
+#include "ft_trans_file.h"
+#include "event-tap.h"
//#define DEBUG_MIGRATION_TCP
@@ -29,6 +31,8 @@
do { } while (0)
#endif
+static VMChangeStateEntry *vmstate;
+
static int socket_errno(FdMigrationState *s)
{
return socket_error();
@@ -56,7 +60,8 @@ static int socket_read(FdMigrationState *s, const void * buf, size_t size)
static int tcp_close(FdMigrationState *s)
{
DPRINTF("tcp_close\n");
- if (s->fd != -1) {
+ /* FIX ME: accessing ft_mode here isn't clean */
+ if (s->fd != -1 && ft_mode != FT_INIT) {
close(s->fd);
s->fd = -1;
}
@@ -150,6 +155,36 @@ MigrationState *tcp_start_outgoing_migration(Monitor *mon,
return &s->mig_state;
}
+static void ft_trans_incoming(void *opaque)
+{
+ QEMUFile *f = opaque;
+
+ qemu_file_get_notify(f);
+ if (qemu_file_has_error(f)) {
+ ft_mode = FT_ERROR;
+ qemu_fclose(f);
+ }
+}
+
+static void ft_trans_reset(void *opaque, int running, int reason)
+{
+ QEMUFile *f = opaque;
+
+ if (running) {
+ if (ft_mode != FT_ERROR) {
+ qemu_fclose(f);
+ }
+ ft_mode = FT_OFF;
+ qemu_del_vm_change_state_handler(vmstate);
+ }
+}
+
+static void ft_trans_schedule_replay(QEMUFile *f)
+{
+ event_tap_schedule_replay();
+ vmstate = qemu_add_vm_change_state_handler(ft_trans_reset, f);
+}
+
static void tcp_accept_incoming_migration(void *opaque)
{
struct sockaddr_in addr;
@@ -175,8 +210,38 @@ static void tcp_accept_incoming_migration(void *opaque)
goto out;
}
+ if (ft_mode == FT_INIT) {
+ autostart = 0;
+ }
+
process_incoming_migration(f);
+
+ if (ft_mode == FT_INIT) {
+ int ret;
+
+ socket_set_nodelay(c);
+
+ f = qemu_fopen_ft_trans(s, c);
+ if (f == NULL) {
+ fprintf(stderr, "could not qemu_fopen_ft_trans\n");
+ goto out;
+ }
+
+ /* need to wait for the sender to set up */
+ ret = qemu_ft_trans_begin(f);
+ if (ret < 0) {
+ goto out;
+ }
+
+ qemu_set_fd_handler2(c, NULL, ft_trans_incoming, NULL, f);
+ ft_trans_schedule_replay(f);
+ ft_mode = FT_TRANSACTION_RECV;
+
+ return;
+ }
+
qemu_fclose(f);
+
out:
close(c);
out2:
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] [PATCH 18/18] Introduce "kemari:" to enable FT migration mode (Kemari).
2011-02-10 9:30 [Qemu-devel] [PATCH 00/18] Kemari for KVM v0.2.10 Yoshiaki Tamura
` (16 preceding siblings ...)
2011-02-10 9:30 ` [Qemu-devel] [PATCH 17/18] migration-tcp: modify tcp_accept_incoming_migration() to handle ft_mode, and add a hack not to close fd when ft_mode is enabled Yoshiaki Tamura
@ 2011-02-10 9:30 ` Yoshiaki Tamura
2011-02-10 9:52 ` [Qemu-devel] " Paolo Bonzini
17 siblings, 1 reply; 35+ messages in thread
From: Yoshiaki Tamura @ 2011-02-10 9:30 UTC (permalink / raw)
To: kvm, qemu-devel
Cc: kwolf, aliguori, mtosatti, ananth, mst, dlaor, vatsa,
Yoshiaki Tamura, blauwirbel, ohmura.kei, avi, pbonzini, psuriset,
stefanha
When "kemari:" is put in front of the URI of the migrate command, ft_mode
is turned on to start FT migration mode (Kemari). On the receiver side,
the option looks like: -incoming kemari:<protocol>:<address>:<port>
Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
---
hmp-commands.hx | 4 +++-
migration.c | 12 ++++++++++++
qmp-commands.hx | 4 +++-
3 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/hmp-commands.hx b/hmp-commands.hx
index 38e1eb7..ee14344 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -760,7 +760,9 @@ ETEXI
"\n\t\t\t -b for migration without shared storage with"
" full copy of disk\n\t\t\t -i for migration without "
"shared storage with incremental copy of disk "
- "(base image shared between src and destination)",
+ "(base image shared between src and destination)"
+ "\n\t\t\t put \"kemari:\" in front of URI to enable "
+ "Fault Tolerance mode (Kemari protocol)",
.user_print = monitor_user_noop,
.mhandler.cmd_new = do_migrate,
},
diff --git a/migration.c b/migration.c
index 7837c55..a3f7722 100644
--- a/migration.c
+++ b/migration.c
@@ -48,6 +48,12 @@ int qemu_start_incoming_migration(const char *uri)
const char *p;
int ret;
+ /* check ft_mode (Kemari protocol) */
+ if (strstart(uri, "kemari:", &p)) {
+ ft_mode = FT_INIT;
+ uri = p;
+ }
+
if (strstart(uri, "tcp:", &p))
ret = tcp_start_incoming_migration(p);
#if !defined(WIN32)
@@ -99,6 +105,12 @@ int do_migrate(Monitor *mon, const QDict *qdict, QObject **ret_data)
return -1;
}
+ /* check ft_mode (Kemari protocol) */
+ if (strstart(uri, "kemari:", &p)) {
+ ft_mode = FT_INIT;
+ uri = p;
+ }
+
if (strstart(uri, "tcp:", &p)) {
s = tcp_start_outgoing_migration(mon, p, max_throttle, detach,
blk, inc);
diff --git a/qmp-commands.hx b/qmp-commands.hx
index df40a3d..68ca48a 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -437,7 +437,9 @@ EQMP
"\n\t\t\t -b for migration without shared storage with"
" full copy of disk\n\t\t\t -i for migration without "
"shared storage with incremental copy of disk "
- "(base image shared between src and destination)",
+ "(base image shared between src and destination)"
+ "\n\t\t\t put \"kemari:\" in front of URI to enable "
+ "Fault Tolerance mode (Kemari protocol)",
.user_print = monitor_user_noop,
.mhandler.cmd_new = do_migrate,
},
--
1.7.1.2
^ permalink raw reply related [flat|nested] 35+ messages in thread
* [Qemu-devel] Re: [PATCH 18/18] Introduce "kemari:" to enable FT migration mode (Kemari).
2011-02-10 9:30 ` [Qemu-devel] [PATCH 18/18] Introduce "kemari:" to enable FT migration mode (Kemari) Yoshiaki Tamura
@ 2011-02-10 9:52 ` Paolo Bonzini
0 siblings, 0 replies; 35+ messages in thread
From: Paolo Bonzini @ 2011-02-10 9:52 UTC (permalink / raw)
To: Yoshiaki Tamura; +Cc: ohmura.kei, qemu-devel, kvm
On 02/10/2011 10:30 AM, Yoshiaki Tamura wrote:
> When "kemari:" is set in front of URI of migrate command, it will turn
> on ft_mode to start FT migration mode (Kemari). On the receiver side,
> the option looks like, -incoming kemari:<protocol>:<address>:<port>
>
> Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>
> ---
> hmp-commands.hx | 4 +++-
> migration.c | 12 ++++++++++++
> qmp-commands.hx | 4 +++-
> 3 files changed, 18 insertions(+), 2 deletions(-)
>
> diff --git a/hmp-commands.hx b/hmp-commands.hx
> index 38e1eb7..ee14344 100644
> --- a/hmp-commands.hx
> +++ b/hmp-commands.hx
> @@ -760,7 +760,9 @@ ETEXI
> "\n\t\t\t -b for migration without shared storage with"
> " full copy of disk\n\t\t\t -i for migration without "
> "shared storage with incremental copy of disk "
> - "(base image shared between src and destination)",
> + "(base image shared between src and destination)"
> + "\n\t\t\t put \"kemari:\" in front of URI to enable "
> + "Fault Tolerance mode (Kemari protocol)",
> .user_print = monitor_user_noop,
> .mhandler.cmd_new = do_migrate,
> },
> diff --git a/migration.c b/migration.c
> index 7837c55..a3f7722 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -48,6 +48,12 @@ int qemu_start_incoming_migration(const char *uri)
> const char *p;
> int ret;
>
> + /* check ft_mode (Kemari protocol) */
> + if (strstart(uri, "kemari:",&p)) {
> + ft_mode = FT_INIT;
> + uri = p;
> + }
> +
> if (strstart(uri, "tcp:",&p))
> ret = tcp_start_incoming_migration(p);
> #if !defined(WIN32)
> @@ -99,6 +105,12 @@ int do_migrate(Monitor *mon, const QDict *qdict, QObject **ret_data)
> return -1;
> }
>
> + /* check ft_mode (Kemari protocol) */
> + if (strstart(uri, "kemari:",&p)) {
> + ft_mode = FT_INIT;
> + uri = p;
> + }
> +
> if (strstart(uri, "tcp:",&p)) {
> s = tcp_start_outgoing_migration(mon, p, max_throttle, detach,
> blk, inc);
> diff --git a/qmp-commands.hx b/qmp-commands.hx
> index df40a3d..68ca48a 100644
> --- a/qmp-commands.hx
> +++ b/qmp-commands.hx
> @@ -437,7 +437,9 @@ EQMP
> "\n\t\t\t -b for migration without shared storage with"
> " full copy of disk\n\t\t\t -i for migration without "
> "shared storage with incremental copy of disk "
> - "(base image shared between src and destination)",
> + "(base image shared between src and destination)"
> + "\n\t\t\t put \"kemari:\" in front of URI to enable "
> + "Fault Tolerance mode (Kemari protocol)",
> .user_print = monitor_user_noop,
> .mhandler.cmd_new = do_migrate,
> },
Acked-by: Paolo Bonzini <pbonzini@redhat.com>
Paolo
^ permalink raw reply [flat|nested] 35+ messages in thread