From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from [140.186.70.92] (port=52098 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PGAzV-0004mb-DY for qemu-devel@nongnu.org; Wed, 10 Nov 2010 08:45:34 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1PGAzU-0003kU-7C for qemu-devel@nongnu.org; Wed, 10 Nov 2010 08:45:33 -0500 Received: from mail-ww0-f53.google.com ([74.125.82.53]:54579) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1PGAzT-0003jh-U7 for qemu-devel@nongnu.org; Wed, 10 Nov 2010 08:45:32 -0500 Received: by wwb22 with SMTP id 22so196075wwb.10 for ; Wed, 10 Nov 2010 05:45:30 -0800 (PST) MIME-Version: 1.0 In-Reply-To: <20101110131946.29714.98218.stgit@localhost6.localdomain6> References: <20101110131822.29714.34137.stgit@localhost6.localdomain6> <20101110131946.29714.98218.stgit@localhost6.localdomain6> Date: Wed, 10 Nov 2010 13:45:29 +0000 Message-ID: Subject: Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure From: Stefan Hajnoczi Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Arun R Bharadwaj Cc: qemu-devel@nongnu.org On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj wrote: > @@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb = *aiocb) > =A0 =A0 return nbytes; > =A0} > > -static void *aio_thread(void *unused) > +static void aio_thread(ThreadletWork *work) > =A0{ > =A0 =A0 pid_t pid; > + =A0 =A0struct qemu_paiocb *aiocb =3D container_of(work, struct qemu_pai= ocb, work); > + =A0 =A0ssize_t ret =3D 0; > > =A0 =A0 pid =3D getpid(); > > - =A0 =A0while (1) { > - =A0 =A0 =A0 =A0struct qemu_paiocb *aiocb; > - =A0 =A0 =A0 =A0ssize_t ret =3D 0; > - =A0 =A0 =A0 =A0qemu_timeval tv; > - =A0 =A0 =A0 =A0struct timespec ts; > - > - =A0 =A0 =A0 =A0qemu_gettimeofday(&tv); > - =A0 =A0 =A0 =A0ts.tv_sec 
=3D tv.tv_sec + 10; > - =A0 =A0 =A0 =A0ts.tv_nsec =3D 0; > - > - =A0 =A0 =A0 =A0mutex_lock(&lock); > - > - =A0 =A0 =A0 =A0while (QTAILQ_EMPTY(&request_list) && > - =A0 =A0 =A0 =A0 =A0 =A0 =A0 !(ret =3D=3D ETIMEDOUT)) { > - =A0 =A0 =A0 =A0 =A0 =A0ret =3D cond_timedwait(&cond, &lock, &ts); > - =A0 =A0 =A0 =A0} > - > - =A0 =A0 =A0 =A0if (QTAILQ_EMPTY(&request_list)) > - =A0 =A0 =A0 =A0 =A0 =A0break; > - > - =A0 =A0 =A0 =A0aiocb =3D QTAILQ_FIRST(&request_list); > - =A0 =A0 =A0 =A0QTAILQ_REMOVE(&request_list, aiocb, node); > - =A0 =A0 =A0 =A0aiocb->active =3D 1; > - =A0 =A0 =A0 =A0idle_threads--; > - =A0 =A0 =A0 =A0mutex_unlock(&lock); > - > - =A0 =A0 =A0 =A0switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { > - =A0 =A0 =A0 =A0case QEMU_AIO_READ: > - =A0 =A0 =A0 =A0case QEMU_AIO_WRITE: > - =A0 =A0 =A0 =A0 =A0 =A0ret =3D handle_aiocb_rw(aiocb); > - =A0 =A0 =A0 =A0 =A0 =A0break; > - =A0 =A0 =A0 =A0case QEMU_AIO_FLUSH: > - =A0 =A0 =A0 =A0 =A0 =A0ret =3D handle_aiocb_flush(aiocb); > - =A0 =A0 =A0 =A0 =A0 =A0break; > - =A0 =A0 =A0 =A0case QEMU_AIO_IOCTL: > - =A0 =A0 =A0 =A0 =A0 =A0ret =3D handle_aiocb_ioctl(aiocb); > - =A0 =A0 =A0 =A0 =A0 =A0break; > - =A0 =A0 =A0 =A0default: > - =A0 =A0 =A0 =A0 =A0 =A0fprintf(stderr, "invalid aio request (0x%x)\n", = aiocb->aio_type); > - =A0 =A0 =A0 =A0 =A0 =A0ret =3D -EINVAL; > - =A0 =A0 =A0 =A0 =A0 =A0break; > - =A0 =A0 =A0 =A0} > - > - =A0 =A0 =A0 =A0mutex_lock(&lock); > - =A0 =A0 =A0 =A0aiocb->ret =3D ret; > - =A0 =A0 =A0 =A0idle_threads++; > - =A0 =A0 =A0 =A0mutex_unlock(&lock); > - > - =A0 =A0 =A0 =A0if (kill(pid, aiocb->ev_signo)) die("kill failed"); > + =A0 =A0switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { > + =A0 =A0case QEMU_AIO_READ: > + =A0 =A0case QEMU_AIO_WRITE: > + =A0 =A0 =A0 =A0ret =3D handle_aiocb_rw(aiocb); > + =A0 =A0 =A0 =A0break; > + =A0 =A0case QEMU_AIO_FLUSH: > + =A0 =A0 =A0 =A0ret =3D handle_aiocb_flush(aiocb); > + =A0 =A0 =A0 =A0break; > + =A0 =A0case QEMU_AIO_IOCTL: > + =A0 =A0 =A0 =A0ret =3D 
handle_aiocb_ioctl(aiocb); > + =A0 =A0 =A0 =A0break; > + =A0 =A0default: > + =A0 =A0 =A0 =A0fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->a= io_type); > + =A0 =A0 =A0 =A0ret =3D -EINVAL; > + =A0 =A0 =A0 =A0break; > =A0 =A0 } > > - =A0 =A0idle_threads--; > - =A0 =A0cur_threads--; > - =A0 =A0mutex_unlock(&lock); > - > - =A0 =A0return NULL; > -} > - > -static void spawn_thread(void) > -{ > - =A0 =A0sigset_t set, oldset; > - > - =A0 =A0cur_threads++; > - =A0 =A0idle_threads++; > + =A0 =A0qemu_mutex_lock(&aiocb_mutex); > + =A0 =A0aiocb->ret =3D ret; This is where qemu_cond_broadcast() is needed. > + =A0 =A0qemu_mutex_unlock(&aiocb_mutex); > > - =A0 =A0/* block all signals */ > - =A0 =A0if (sigfillset(&set)) die("sigfillset"); > - =A0 =A0if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask"); > - > - =A0 =A0thread_create(&thread_id, &attr, aio_thread, NULL); > - > - =A0 =A0if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask re= store"); > + =A0 =A0if (kill(pid, aiocb->ev_signo)) { > + =A0 =A0 =A0 =A0die("kill failed"); > + =A0 =A0} > =A0} > > =A0static void qemu_paio_submit(struct qemu_paiocb *aiocb) > =A0{ > + =A0 =A0qemu_mutex_lock(&aiocb_mutex); > =A0 =A0 aiocb->ret =3D -EINPROGRESS; > - =A0 =A0aiocb->active =3D 0; > - =A0 =A0mutex_lock(&lock); > - =A0 =A0if (idle_threads =3D=3D 0 && cur_threads < max_threads) > - =A0 =A0 =A0 =A0spawn_thread(); > - =A0 =A0QTAILQ_INSERT_TAIL(&request_list, aiocb, node); > - =A0 =A0mutex_unlock(&lock); > - =A0 =A0cond_signal(&cond); > + =A0 =A0qemu_mutex_unlock(&aiocb_mutex); > + > + =A0 =A0aiocb->work.func =3D aio_thread; > + =A0 =A0submit_work(&aiocb->work); > =A0} > > =A0static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) > =A0{ > =A0 =A0 ssize_t ret; > > - =A0 =A0mutex_lock(&lock); > + =A0 =A0qemu_mutex_lock(&aiocb_mutex); > =A0 =A0 ret =3D aiocb->ret; > - =A0 =A0mutex_unlock(&lock); > - > + =A0 =A0qemu_mutex_unlock(&aiocb_mutex); > + =A0 =A0qemu_cond_broadcast(&aiocb_completion); 
qemu_paio_return() only gets the return value from the qemu_paiocb. It shouldn't have side effects, so it is the wrong place to broadcast completion. > =A0 =A0 return ret; > =A0} > > @@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb) > =A0static void paio_cancel(BlockDriverAIOCB *blockacb) > =A0{ > =A0 =A0 struct qemu_paiocb *acb =3D (struct qemu_paiocb *)blockacb; > - =A0 =A0int active =3D 0; > - > - =A0 =A0mutex_lock(&lock); > - =A0 =A0if (!acb->active) { > - =A0 =A0 =A0 =A0QTAILQ_REMOVE(&request_list, acb, node); > - =A0 =A0 =A0 =A0acb->ret =3D -ECANCELED; > - =A0 =A0} else if (acb->ret =3D=3D -EINPROGRESS) { > - =A0 =A0 =A0 =A0active =3D 1; > - =A0 =A0} > - =A0 =A0mutex_unlock(&lock); > > - =A0 =A0if (active) { > - =A0 =A0 =A0 =A0/* fail safe: if the aio could not be canceled, we wait = for > - =A0 =A0 =A0 =A0 =A0 it */ > - =A0 =A0 =A0 =A0while (qemu_paio_error(acb) =3D=3D EINPROGRESS) > - =A0 =A0 =A0 =A0 =A0 =A0; > + =A0 =A0if (dequeue_work(&acb->work) !=3D 0) { > + =A0 =A0 =A0 =A0/* Wait for running work item to complete */ > + =A0 =A0 =A0 =A0qemu_mutex_lock(&aiocb_mutex); > + =A0 =A0 =A0 =A0while (acb->ret =3D=3D -EINPROGRESS) { > + =A0 =A0 =A0 =A0 =A0 =A0qemu_cond_wait(&aiocb_completion, &aiocb_mutex); > + =A0 =A0 =A0 =A0} > + =A0 =A0 =A0 =A0qemu_mutex_unlock(&aiocb_mutex); I wonder if the condition variable has a measurable performance overhead. We unconditionally broadcast on paiocb completion. One idea would be to keep a counter of waiters (should only ever be 0 or 1) protected by aiocb_mutex and broadcast only when there is a waiter. I just want to share this idea; I don't know whether it's necessary to implement it, or whether it could even work without a race condition. 
> =A0 =A0 } > > =A0 =A0 paio_remove(acb); > @@ -618,11 +692,12 @@ int paio_init(void) > =A0 =A0 struct sigaction act; > =A0 =A0 PosixAioState *s; > =A0 =A0 int fds[2]; > - =A0 =A0int ret; > > =A0 =A0 if (posix_aio_state) > =A0 =A0 =A0 =A0 return 0; > > + =A0 =A0qemu_mutex_init(&aiocb_mutex); This also needs qemu_cond_init(&aiocb_completion), because paio_cancel() waits on that condition variable. Stefan