From: Stefan Hajnoczi
Date: Mon, 6 Jul 2015 15:24:24 +0100
Message-Id: <1436192669-10062-6-git-send-email-stefanha@redhat.com>
In-Reply-To: <1436192669-10062-1-git-send-email-stefanha@redhat.com>
References: <1436192669-10062-1-git-send-email-stefanha@redhat.com>
Subject: [Qemu-devel] [PATCH v2 05/10] block: add block job transactions
To: qemu-devel@nongnu.org
Cc: Kevin Wolf, famz@redhat.com, John Snow, Jeff Cody, mreitz@redhat.com, vsementsov@parallels.com, Stefan Hajnoczi

Sometimes block jobs must execute as a transaction group.  Finishing jobs
wait until all other jobs are ready to complete successfully.  Failure or
cancellation of one job cancels the other jobs in the group.

Signed-off-by: Stefan Hajnoczi
---
v2:
 * Set txn pointer to NULL in block_job_txn_begin() [jsnow]
 * Rename block_job_txn_prepare_to_complete to block_job_txn_job_done [jsnow]
 * Rename block_job_txn_complete to block_job_txn_kick [jsnow]
 * Add BLOCK_JOB_TXN_CANCEL_PENDING to solve race condition on cancel [jsnow]
 * Document when txn may be NULL
---
 blockjob.c               | 193 +++++++++++++++++++++++++++++++++++++++++++++++
 include/block/block.h    |   1 +
 include/block/blockjob.h |  52 +++++++++++++
 trace-events             |   4 +
 4 files changed, 250 insertions(+)

diff --git a/blockjob.c b/blockjob.c
index ec46fad..d1f0206 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -400,3 +400,196 @@ void block_job_defer_to_main_loop(BlockJob *job,
 
     qemu_bh_schedule(data->bh);
 }
+
+typedef enum {
+    BLOCK_JOB_TXN_OK,             /* no job failed yet */
+    BLOCK_JOB_TXN_CANCEL_PENDING, /* kick scheduled to cancel jobs */
+    BLOCK_JOB_TXN_CANCEL_DONE,    /* cancelled jobs can terminate now */
+} BlockJobTxnState;
+
+/* Transactional group of block jobs */
+struct BlockJobTxn {
+    /* Jobs may be in different AioContexts so protect all fields */
+    QemuMutex lock;
+
+    /* Reference count for txn object */
+    unsigned int ref;
+
+    /* Is this txn ok or are jobs being cancelled?
+     */
+    BlockJobTxnState state;
+
+    /* Number of jobs still running */
+    unsigned int jobs_pending;
+
+    /* List of jobs */
+    QLIST_HEAD(, BlockJob) jobs;
+};
+
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *txn = g_new(BlockJobTxn, 1);
+    qemu_mutex_init(&txn->lock);
+    txn->ref = 1; /* dropped by block_job_txn_begin() */
+    txn->state = BLOCK_JOB_TXN_OK;
+    txn->jobs_pending = 0;
+    QLIST_INIT(&txn->jobs);
+    return txn;
+}
+
+static void block_job_txn_unref(BlockJobTxn *txn)
+{
+    qemu_mutex_lock(&txn->lock);
+
+    if (--txn->ref > 0) {
+        qemu_mutex_unlock(&txn->lock);
+        return;
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    qemu_mutex_destroy(&txn->lock);
+    g_free(txn);
+}
+
+/* Drop the initial reference; it kept txn alive until all jobs were added */
+void block_job_txn_begin(BlockJobTxn **txn)
+{
+    block_job_txn_unref(*txn);
+    *txn = NULL;
+}
+
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++;
+    txn->jobs_pending++;
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    qemu_mutex_unlock(&txn->lock);
+}
+
+/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
+ * successful completion.  Runs from main loop.
+ */
+static void block_job_txn_kick(BlockJob *job, void *opaque)
+{
+    BlockJobTxn *txn = opaque;
+    BlockJob *other_job;
+    GSList *ctxs = NULL;
+    GSList *ctxs_iter;
+    bool cancel = false;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++; /* keep txn alive until the end of this loop */
+
+    /* Acquire AioContexts so jobs cannot race with us */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        AioContext *ctx;
+
+        qemu_mutex_unlock(&txn->lock);
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+        ctxs = g_slist_prepend(ctxs, ctx);
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    /* From here on block_job_txn_job_done() callers should not wait */
+    if (txn->state == BLOCK_JOB_TXN_CANCEL_PENDING) {
+        txn->state = BLOCK_JOB_TXN_CANCEL_DONE;
+        cancel = true;
+    }
+
+    /* Kick jobs */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        qemu_mutex_unlock(&txn->lock);
+
+        /* Don't cancel our own failed job since cancellation throws away the
+         * error value.
+         */
+        if (cancel && other_job != job) {
+            block_job_cancel(other_job);
+        } else {
+            block_job_enter(other_job);
+        }
+
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+
+    /* Release AioContexts */
+    for (ctxs_iter = ctxs; ctxs_iter; ctxs_iter = ctxs_iter->next) {
+        aio_context_release(ctxs_iter->data);
+    }
+    g_slist_free(ctxs);
+}
+
+void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn,
+                                         BlockJob *job,
+                                         int ret)
+{
+    if (!txn) {
+        return;
+    }
+
+    qemu_mutex_lock(&txn->lock);
+
+    /* This function is entered in 4 cases:
+     *
+     * 1. Successful job completion - wait for other jobs
+     * 2. First failed/cancelled job in txn - cancel other jobs and wait
+     * 3. Subsequent cancelled jobs - finish immediately, don't wait
+     * 4. While kick is pending - wait for kick to cancel or wake us
+     */
+    trace_block_job_txn_job_done_entry(txn, job, ret,
+                                       block_job_is_cancelled(job),
+                                       txn->state,
+                                       txn->jobs_pending);
+
+    if (txn->state == BLOCK_JOB_TXN_CANCEL_DONE) { /* Case 3 */
+        assert(block_job_is_cancelled(job));
+        goto out; /* already cancelled, don't yield */
+    }
+
+    if (txn->state == BLOCK_JOB_TXN_OK) {
+        if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
+            txn->state = BLOCK_JOB_TXN_CANCEL_PENDING;
+            block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+        } else { /* Case 1 */
+            if (--txn->jobs_pending == 0) {
+                block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+            }
+        }
+    }
+
+    /* Wait for block_job_txn_kick() in cases 1, 2, and 4 */
+    do {
+        qemu_mutex_unlock(&txn->lock);
+        job->busy = false;
+        qemu_coroutine_yield();
+        job->busy = true;
+        qemu_mutex_lock(&txn->lock);
+
+        /* Did the user just cancel this job? */
+        if (block_job_is_cancelled(job) && txn->state == BLOCK_JOB_TXN_OK) {
+            txn->state = BLOCK_JOB_TXN_CANCEL_PENDING;
+            block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+        }
+    } while (txn->state != BLOCK_JOB_TXN_CANCEL_DONE &&
+             txn->jobs_pending > 0);
+
+out:
+    trace_block_job_txn_job_done_return(txn, job, ret,
+                                        block_job_is_cancelled(job),
+                                        txn->state,
+                                        txn->jobs_pending);
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+}
diff --git a/include/block/block.h b/include/block/block.h
index 7437590..c7fc5b6 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -13,6 +13,7 @@
 typedef struct BlockDriver BlockDriver;
 typedef struct BlockJob BlockJob;
 typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn;
 
 typedef struct BlockDriverInfo {
     /* in bytes, 0 if irrelevant */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 57d8ef1..7d6ffb7 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -122,6 +122,10 @@ struct BlockJob {
     /** The opaque value that is passed to the completion function. */
     void *opaque;
+
+    /** Non-NULL if this job is part of a transaction */
+    BlockJobTxn *txn;
+    QLIST_ENTRY(BlockJob) txn_list;
 };
 
 /**
@@ -348,4 +352,52 @@ void block_job_defer_to_main_loop(BlockJob *job,
                                   BlockJobDeferToMainLoopFn *fn,
                                   void *opaque);
 
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction.  Jobs can be added to the
+ * transaction using block_job_txn_add_job().  block_job_txn_begin() must be
+ * called when all jobs (if any) have been added.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as
+ * a group.  Jobs wait for each other before completing.  Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction.  The @job must not already be in a transaction.
+ * The block job driver must call block_job_txn_job_done() before final
+ * cleanup and completion.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
+/**
+ * block_job_txn_begin:
+ * @txn: The transaction pointer
+ *
+ * Call this to mark the end of adding jobs to the transaction.  This must be
+ * called even if no jobs were added.
+ *
+ * The caller may not add jobs after the transaction begins, so the @txn
+ * pointer is set to NULL to show that the caller has released ownership.
+ */
+void block_job_txn_begin(BlockJobTxn **txn);
+
+/**
+ * block_job_txn_job_done:
+ * @txn: The transaction (may be NULL)
+ * @job: The block job
+ * @ret: Block job return value (0 for success, otherwise job failure)
+ *
+ * Wait for other jobs in the transaction to complete.  If @ret is non-zero or
+ * @job is cancelled, all other jobs in the transaction will be cancelled.
+ */
+void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn,
+                                         BlockJob *job, int ret);
+
 #endif
diff --git a/trace-events b/trace-events
index 52b7efa..5877289 100644
--- a/trace-events
+++ b/trace-events
@@ -123,6 +123,10 @@
 virtio_blk_data_plane_start(void *s) "dataplane %p"
 virtio_blk_data_plane_stop(void *s) "dataplane %p"
 virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
 
+# blockjob.c
+block_job_txn_job_done_entry(void *txn, void *job, int ret, bool cancelled, int status, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d status %d jobs_pending %u"
+block_job_txn_job_done_return(void *txn, void *job, int ret, bool cancelled, int status, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d status %d jobs_pending %u"
+
 # hw/virtio/dataplane/vring.c
 vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
-- 
2.4.3
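
Usage note (not part of the patch): the sketch below is a minimal illustration of how a caller might drive the transaction API added above and where a job coroutine would report its result.  The names start_backup_job() and start_jobs_as_transaction() are hypothetical placeholders; only block_job_txn_new(), block_job_txn_add_job(), block_job_txn_begin(), and block_job_txn_job_done() come from this patch, and the sketch will not link as-is because the placeholder helper is only declared.

#include "block/block.h"
#include "block/blockjob.h"

/* Hypothetical helper that creates and starts one block job on @bs; a real
 * caller would use an actual job creation function instead.
 */
static BlockJob *start_backup_job(BlockDriverState *bs);

/* Group several jobs so they complete or cancel together */
static void start_jobs_as_transaction(BlockDriverState **bs_list, int nbs)
{
    BlockJobTxn *txn = block_job_txn_new();
    int i;

    for (i = 0; i < nbs; i++) {
        BlockJob *job = start_backup_job(bs_list[i]);

        /* A NULL txn makes this a no-op, so callers that do not want
         * transaction semantics can simply pass NULL.
         */
        block_job_txn_add_job(txn, job);
    }

    /* No more jobs may be added; this drops the initial reference and sets
     * txn to NULL to show that ownership has been released.
     */
    block_job_txn_begin(&txn);
}

/* Inside each job's coroutine, just before final cleanup, the job driver
 * reports its result (ret).  The call waits for the other jobs in the group
 * or cancels them, depending on ret and the job's cancelled state:
 *
 *     block_job_txn_job_done(job->txn, job, ret);
 */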