From: Stefan Hajnoczi
Date: Fri, 29 Aug 2014 17:29:56 +0100
Message-Id: <1409329803-20744-29-git-send-email-stefanha@redhat.com>
In-Reply-To: <1409329803-20744-1-git-send-email-stefanha@redhat.com>
References: <1409329803-20744-1-git-send-email-stefanha@redhat.com>
Subject: [Qemu-devel] [PULL 28/35] linux-aio: avoid deadlock in nested aio_poll() calls
To: qemu-devel@nongnu.org
Cc: Kevin Wolf, Peter Maydell, Ming Lei, Marcin Gibuła, Stefan Hajnoczi, Paolo Bonzini

If two Linux AIO request completions are fetched in the same
io_getevents() call, QEMU will deadlock if request A's callback waits
for request B to complete using an aio_poll() loop.  This was reported
to happen with the mirror blockjob.

This patch moves completion processing into a BH and makes it
resumable.  Nested event loops can resume completion processing so
that request B will complete and the deadlock will not occur.

Cc: Kevin Wolf
Cc: Paolo Bonzini
Cc: Ming Lei
Cc: Marcin Gibuła
Reported-by: Marcin Gibuła
Signed-off-by: Stefan Hajnoczi
Tested-by: Marcin Gibuła
---
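The pattern can be illustrated with a minimal, self-contained C model
(this is not QEMU code; fake_poll(), schedule_bh(), complete_request(),
and the single pending_bh slot are simplified stand-ins for aio_poll()
and the QEMU BH API).  Request A's callback spins a nested event loop
waiting for request B; because the handler reschedules itself before
invoking callbacks, the nested loop resumes draining the same batch:

/* Toy model of the resumable completion BH (not QEMU code). */
#include <stdbool.h>
#include <stdio.h>

#define MAX_EVENTS 4

static void (*pending_bh)(void);   /* at most one scheduled "BH" */
static int events[MAX_EVENTS];     /* batch fetched in one io_getevents() */
static int event_idx, event_max;
static bool waiting_for_b;

static void schedule_bh(void (*fn)(void))
{
    pending_bh = fn;
}

/* One iteration of a (possibly nested) event loop: run the scheduled BH */
static void fake_poll(void)
{
    void (*fn)(void) = pending_bh;

    pending_bh = NULL;
    if (fn) {
        fn();
    }
}

static void complete_request(int id)
{
    printf("request %d completed\n", id);

    if (id == 0) {
        /* Request A waits for request B in a nested event loop.  Because
         * the BH rescheduled itself before calling us, the nested
         * fake_poll() resumes the same batch at event_idx instead of
         * finding nothing to run. */
        waiting_for_b = true;
        while (waiting_for_b) {
            fake_poll();
        }
    } else if (id == 1) {
        waiting_for_b = false;
    }
}

static void completion_bh(void)
{
    /* Reschedule first so nested event loops see pending completions */
    if (event_idx < event_max) {
        schedule_bh(completion_bh);
    }

    while (event_idx < event_max) {
        complete_request(events[event_idx++]);
    }
}

int main(void)
{
    /* Both completions arrive in one batch, the case that used to hang */
    events[0] = 0;
    events[1] = 1;
    event_max = 2;

    schedule_bh(completion_bh);
    fake_poll();
    fake_poll();   /* final BH run finds the batch drained; no reschedule */
    return 0;
}

With the pre-patch structure, where the whole batch was processed in one
loop over a stack-local events array, the nested loop would find nothing
runnable and request A would wait forever.
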
 block/linux-aio.c | 71 ++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 55 insertions(+), 16 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index 7ac7e8c..9aca758 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -51,6 +51,12 @@ struct qemu_laio_state {
 
     /* io queue for submit at batch */
     LaioQueue io_q;
+
+    /* I/O completion processing */
+    QEMUBH *completion_bh;
+    struct io_event events[MAX_EVENTS];
+    int event_idx;
+    int event_max;
 };
 
 static inline ssize_t io_event_ret(struct io_event *ev)
@@ -86,27 +92,58 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
     qemu_aio_release(laiocb);
 }
 
-static void qemu_laio_completion_cb(EventNotifier *e)
+/* The completion BH fetches completed I/O requests and invokes their
+ * callbacks.
+ *
+ * The function is somewhat tricky because it supports nested event loops, for
+ * example when a request callback invokes aio_poll().  In order to do this,
+ * the completion events array and index are kept in qemu_laio_state.  The BH
+ * reschedules itself as long as there are completions pending so it will
+ * either be called again in a nested event loop or will be called after all
+ * events have been completed.  When there are no events left to complete, the
+ * BH returns without rescheduling.
+ */
+static void qemu_laio_completion_bh(void *opaque)
 {
-    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
-
-    while (event_notifier_test_and_clear(&s->e)) {
-        struct io_event events[MAX_EVENTS];
-        struct timespec ts = { 0 };
-        int nevents, i;
+    struct qemu_laio_state *s = opaque;
 
+    /* Fetch more completion events when empty */
+    if (s->event_idx == s->event_max) {
         do {
-            nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
-        } while (nevents == -EINTR);
+            struct timespec ts = { 0 };
+            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
+                                        s->events, &ts);
+        } while (s->event_max == -EINTR);
+
+        s->event_idx = 0;
+        if (s->event_max <= 0) {
+            s->event_max = 0;
+            return; /* no more events */
+        }
+    }
 
-        for (i = 0; i < nevents; i++) {
-            struct iocb *iocb = events[i].obj;
-            struct qemu_laiocb *laiocb =
-                    container_of(iocb, struct qemu_laiocb, iocb);
+    /* Reschedule so nested event loops see currently pending completions */
+    qemu_bh_schedule(s->completion_bh);
 
-            laiocb->ret = io_event_ret(&events[i]);
-            qemu_laio_process_completion(s, laiocb);
-        }
+    /* Process completion events */
+    while (s->event_idx < s->event_max) {
+        struct iocb *iocb = s->events[s->event_idx].obj;
+        struct qemu_laiocb *laiocb =
+                container_of(iocb, struct qemu_laiocb, iocb);
+
+        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
+        s->event_idx++;
+
+        qemu_laio_process_completion(s, laiocb);
+    }
+}
+
+static void qemu_laio_completion_cb(EventNotifier *e)
+{
+    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
+
+    if (event_notifier_test_and_clear(&s->e)) {
+        qemu_bh_schedule(s->completion_bh);
     }
 }
 
@@ -272,12 +309,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
     struct qemu_laio_state *s = s_;
 
     aio_set_event_notifier(old_context, &s->e, NULL);
+    qemu_bh_delete(s->completion_bh);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
 }
 
-- 
1.9.3
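
For reference, the kernel AIO calls that qemu_laio_completion_bh() polls
can be exercised standalone.  This is a sketch, not QEMU code: it assumes
libaio is installed (build with: gcc aio-demo.c -o aio-demo -laio) and
that /etc/hostname is readable; any file path works.  Without O_DIRECT
the read typically completes during io_submit(), so the zero-timeout
io_getevents() already finds it finished, mirroring the non-blocking
fetch in the patch:

/* Standalone libaio refresher (not QEMU code) */
#include <errno.h>
#include <fcntl.h>
#include <libaio.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

#define MAX_EVENTS 32

int main(void)
{
    io_context_t ctx = 0;
    struct iocb cb, *cbs[1] = { &cb };
    struct io_event events[MAX_EVENTS];
    struct timespec ts = { 0 };    /* zero timeout: do not block */
    char buf[4096];
    int fd, n, i;

    fd = open("/etc/hostname", O_RDONLY);
    if (fd < 0 || io_setup(MAX_EVENTS, &ctx) < 0) {
        perror("setup");
        return 1;
    }

    io_prep_pread(&cb, fd, buf, sizeof(buf), 0);
    if (io_submit(ctx, 1, cbs) != 1) {
        fprintf(stderr, "io_submit failed\n");
        return 1;
    }

    /* As in the patch: fetch completed events, retrying on -EINTR.
     * libaio returns negative errno values directly. */
    do {
        n = io_getevents(ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
    } while (n == -EINTR);

    if (n < 0) {
        fprintf(stderr, "io_getevents failed: %d\n", n);
        return 1;
    }
    for (i = 0; i < n; i++) {
        printf("request %p completed with result %ld\n",
               (void *)events[i].obj, (long)events[i].res);
    }

    io_destroy(ctx);
    close(fd);
    return 0;
}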