From: Kent Overstreet <koverstreet@google.com>
To: Benjamin LaHaise <bcrl@kvack.org>
Cc: linux-kernel@vger.kernel.org, linux-aio@kvack.org,
linux-fsdevel@vger.kernel.org, zab@redhat.com, jmoyer@redhat.com,
axboe@kernel.dk, viro@zeniv.linux.org.uk
Subject: Re: [PATCH 12/25] aio: Refcounting cleanup
Date: Wed, 28 Nov 2012 17:38:58 -0800 [thread overview]
Message-ID: <20121129013858.GD15094@google.com> (raw)
In-Reply-To: <20121129004631.GE19042@kvack.org>
On Wed, Nov 28, 2012 at 07:46:31PM -0500, Benjamin LaHaise wrote:
> Hi Kent,
>
> On Wed, Nov 28, 2012 at 08:43:36AM -0800, Kent Overstreet wrote:
> > + * now it's safe to cancel any that need to be.
> > + */
> > +static void free_ioctx(struct kioctx *ctx)
> ...
> > + aio_nr -= ctx->max_reqs;
> > + spin_unlock(&aio_nr_lock);
> > +
> > + synchronize_rcu();
> > +
> > + pr_debug("freeing %p\n", ctx);
> > + kmem_cache_free(kioctx_cachep, ctx);
> > +}
>
> As mentioned on irc, we probably want to avoid the synchronize_rcu()
> overhead, since delays here will impact the time it takes for a task to
> exit. Cheers,
Yeah, suppose you're right. Have an updated patch below, which also
documents exactly what the rcu usage is for:
I'm not a big fan of the contortions where kill_ioctx() does one thing
and exit_aio() does a slightly different thing so the vm_munmap()
happens in the right context, but oh well.
commit 8f3f71c5e9ae0a76bcf019a8f00510076ecc052e
Author: Kent Overstreet <koverstreet@google.com>
Date: Wed Nov 28 17:27:19 2012 -0800
aio: Refcounting cleanup
The usage of ctx->dead was fubar - it makes no sense to explicitly
check it all over the place, especially when we're already using RCU.
Now, ctx->dead only indicates whether we've dropped the initial
refcount. The new teardown sequence is:
set ctx->dead
hlist_del_rcu();
synchronize_rcu();
Now we know no system calls can take a new ref, and it's safe to drop
the initial ref:
put_ioctx();
We also need to ensure there are no more outstanding kiocbs. This was
done incorrectly - it was being done in kill_ctx(), and before dropping
the initial refcount. At this point, other syscalls may still be
submitting kiocbs!
Now, we cancel and wait for outstanding kiocbs in free_ioctx(), after
kioctx->users has dropped to 0 and we know no more iocbs could be
submitted.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
diff --git a/fs/aio.c b/fs/aio.c
index 4c9a5bf..7b75590 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -79,7 +79,7 @@ static inline unsigned aio_ring_avail(struct aio_ring_info *info,
struct kioctx {
atomic_t users;
- int dead;
+ atomic_t dead;
/* This needs improving */
unsigned long user_id;
@@ -98,6 +98,7 @@ struct kioctx {
struct aio_ring_info ring_info;
struct rcu_head rcu_head;
+ struct work_struct rcu_work;
};
/*------ sysctl variables----*/
@@ -234,44 +235,6 @@ static int aio_setup_ring(struct kioctx *ctx)
kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
} while(0)
-static void ctx_rcu_free(struct rcu_head *head)
-{
- struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
- kmem_cache_free(kioctx_cachep, ctx);
-}
-
-/* __put_ioctx
- * Called when the last user of an aio context has gone away,
- * and the struct needs to be freed.
- */
-static void __put_ioctx(struct kioctx *ctx)
-{
- unsigned nr_events = ctx->max_reqs;
- BUG_ON(atomic_read(&ctx->reqs_active));
-
- aio_free_ring(ctx);
- if (nr_events) {
- spin_lock(&aio_nr_lock);
- BUG_ON(aio_nr - nr_events > aio_nr);
- aio_nr -= nr_events;
- spin_unlock(&aio_nr_lock);
- }
- pr_debug("freeing %p\n", ctx);
- call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}
-
-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
- return atomic_inc_not_zero(&kioctx->users);
-}
-
-static inline void put_ioctx(struct kioctx *kioctx)
-{
- BUG_ON(atomic_read(&kioctx->users) <= 0);
- if (unlikely(atomic_dec_and_test(&kioctx->users)))
- __put_ioctx(kioctx);
-}
-
static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
struct io_event *res)
{
@@ -295,6 +258,61 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
return ret;
}
+static void free_ioctx_rcu(struct rcu_head *head)
+{
+ struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+ kmem_cache_free(kioctx_cachep, ctx);
+}
+
+/*
+ * When this function runs, the kioctx has been removed from the "hash table"
+ * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
+ * now it's safe to cancel any that need to be.
+ */
+static void free_ioctx(struct kioctx *ctx)
+{
+ struct io_event res;
+ struct kiocb *iocb;
+
+ spin_lock_irq(&ctx->ctx_lock);
+
+ while (!list_empty(&ctx->active_reqs)) {
+ iocb = list_first_entry(&ctx->active_reqs,
+ struct kiocb, ki_list);
+
+ list_del_init(&iocb->ki_list);
+ kiocb_cancel(ctx, iocb, &res);
+ }
+
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ wait_event(ctx->wait, !atomic_read(&ctx->reqs_active));
+
+ aio_free_ring(ctx);
+
+ spin_lock(&aio_nr_lock);
+ BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
+ aio_nr -= ctx->max_reqs;
+ spin_unlock(&aio_nr_lock);
+
+ pr_debug("freeing %p\n", ctx);
+
+ /*
+ * Here the call_rcu() is between the wait_event() for reqs_active to
+ * hit 0, and freeing the ioctx.
+ *
+ * aio_complete() decrements reqs_active, but it has to touch the ioctx
+ * after to issue a wakeup so we use rcu.
+ */
+ call_rcu(&ctx->rcu_head, free_ioctx_rcu);
+}
+
+static void put_ioctx(struct kioctx *ctx)
+{
+ if (unlikely(atomic_dec_and_test(&ctx->users)))
+ free_ioctx(ctx);
+}
+
/* ioctx_alloc
* Allocates and initializes an ioctx. Returns an ERR_PTR if it failed.
*/
@@ -321,6 +339,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
ctx->max_reqs = nr_events;
atomic_set(&ctx->users, 2);
+ atomic_set(&ctx->dead, 0);
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);
@@ -358,43 +377,43 @@ out_freectx:
return ERR_PTR(err);
}
-/* kill_ctx
- * Cancels all outstanding aio requests on an aio context. Used
- * when the processes owning a context have all exited to encourage
- * the rapid destruction of the kioctx.
- */
-static void kill_ctx(struct kioctx *ctx)
+static void kill_ioctx_work(struct work_struct *work)
{
- struct task_struct *tsk = current;
- DECLARE_WAITQUEUE(wait, tsk);
- struct io_event res;
+ struct kioctx *ctx = container_of(work, struct kioctx, rcu_work);
- spin_lock_irq(&ctx->ctx_lock);
- ctx->dead = 1;
- while (!list_empty(&ctx->active_reqs)) {
- struct list_head *pos = ctx->active_reqs.next;
- struct kiocb *iocb = list_kiocb(pos);
- list_del_init(&iocb->ki_list);
+ wake_up_all(&ctx->wait);
+ put_ioctx(ctx);
+}
- kiocb_cancel(ctx, iocb, &res);
- }
+static void kill_ioctx_rcu(struct rcu_head *head)
+{
+ struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
- if (!atomic_read(&ctx->reqs_active))
- goto out;
+ INIT_WORK(&ctx->rcu_work, kill_ioctx_work);
+ schedule_work(&ctx->rcu_work);
+}
- add_wait_queue(&ctx->wait, &wait);
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- while (atomic_read(&ctx->reqs_active)) {
- spin_unlock_irq(&ctx->ctx_lock);
- io_schedule();
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- spin_lock_irq(&ctx->ctx_lock);
- }
- __set_task_state(tsk, TASK_RUNNING);
- remove_wait_queue(&ctx->wait, &wait);
+/* kill_ioctx
+ * Cancels all outstanding aio requests on an aio context. Used
+ * when the processes owning a context have all exited to encourage
+ * the rapid destruction of the kioctx.
+ */
+static void kill_ioctx(struct kioctx *ctx)
+{
+ if (!atomic_xchg(&ctx->dead, 1)) {
+ hlist_del_rcu(&ctx->list);
+ /* Between hlist_del_rcu() and dropping the initial ref */
+ synchronize_rcu();
-out:
- spin_unlock_irq(&ctx->ctx_lock);
+ /*
+ * We can't punt to workqueue here because put_ioctx() ->
+ * free_ioctx() will unmap the ringbuffer, and that has to be
+ * done in the original process's context. kill_ioctx_rcu/work()
+ * exist for exit_aio(), as in that path free_ioctx() won't do
+ * the unmap.
+ */
+ kill_ioctx_work(&ctx->rcu_work);
+ }
}
/* wait_on_sync_kiocb:
@@ -413,27 +432,25 @@ ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
}
EXPORT_SYMBOL(wait_on_sync_kiocb);
-/* exit_aio: called when the last user of mm goes away. At this point,
- * there is no way for any new requests to be submited or any of the
- * io_* syscalls to be called on the context. However, there may be
- * outstanding requests which hold references to the context; as they
- * go away, they will call put_ioctx and release any pinned memory
- * associated with the request (held via struct page * references).
+/*
+ * exit_aio: called when the last user of mm goes away. At this point, there is
+ * no way for any new requests to be submited or any of the io_* syscalls to be
+ * called on the context.
+ *
+ * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
+ * them.
*/
void exit_aio(struct mm_struct *mm)
{
struct kioctx *ctx;
+ struct hlist_node *p, *n;
- while (!hlist_empty(&mm->ioctx_list)) {
- ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
- hlist_del_rcu(&ctx->list);
-
- kill_ctx(ctx);
-
+ hlist_for_each_entry_safe(ctx, p, n, &mm->ioctx_list, list) {
if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG
"exit_aio:ioctx still alive: %d %d %d\n",
- atomic_read(&ctx->users), ctx->dead,
+ atomic_read(&ctx->users),
+ atomic_read(&ctx->dead),
atomic_read(&ctx->reqs_active));
/*
* We don't need to bother with munmap() here -
@@ -444,7 +461,11 @@ void exit_aio(struct mm_struct *mm)
* place that uses ->mmap_size, so it's safe.
*/
ctx->ring_info.mmap_size = 0;
- put_ioctx(ctx);
+
+ if (!atomic_xchg(&ctx->dead, 1)) {
+ hlist_del_rcu(&ctx->list);
+ call_rcu(&ctx->rcu_head, kill_ioctx_rcu);
+ }
}
}
@@ -510,8 +531,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
kmem_cache_free(kiocb_cachep, req);
atomic_dec(&ctx->reqs_active);
}
- if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
- wake_up_all(&ctx->wait);
spin_unlock_irq(&ctx->ctx_lock);
}
@@ -607,18 +626,12 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
rcu_read_lock();
- hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
- /*
- * RCU protects us against accessing freed memory but
- * we have to be careful not to get a reference when the
- * reference count already dropped to 0 (ctx->dead test
- * is unreliable because of races).
- */
- if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+ hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
+ if (ctx->user_id == ctx_id){
+ atomic_inc(&ctx->users);
ret = ctx;
break;
}
- }
rcu_read_unlock();
return ret;
@@ -655,12 +668,15 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
info = &ctx->ring_info;
- /* add a completion event to the ring buffer.
- * must be done holding ctx->ctx_lock to prevent
- * other code from messing with the tail
- * pointer since we might be called from irq
- * context.
+ /*
+ * Add a completion event to the ring buffer. Must be done holding
+ * ctx->ctx_lock to prevent other code from messing with the tail
+ * pointer since we might be called from irq context.
+ *
+ * Take rcu_read_lock() in case the kioctx is being destroyed, as we
+ * need to issue a wakeup after decrementing reqs_active.
*/
+ rcu_read_lock();
spin_lock_irqsave(&ctx->ctx_lock, flags);
list_del(&iocb->ki_list); /* remove from active_reqs */
@@ -726,6 +742,7 @@ put_rq:
wake_up(&ctx->wait);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ rcu_read_unlock();
}
EXPORT_SYMBOL(aio_complete);
@@ -869,7 +886,7 @@ static int read_events(struct kioctx *ctx,
break;
if (min_nr <= i)
break;
- if (unlikely(ctx->dead)) {
+ if (unlikely(atomic_read(&ctx->dead))) {
ret = -EINVAL;
break;
}
@@ -912,35 +929,6 @@ out:
return i ? i : ret;
}
-/* Take an ioctx and remove it from the list of ioctx's. Protects
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
-{
- struct mm_struct *mm = current->mm;
- int was_dead;
-
- /* delete the entry from the list is someone else hasn't already */
- spin_lock(&mm->ioctx_lock);
- was_dead = ioctx->dead;
- ioctx->dead = 1;
- hlist_del_rcu(&ioctx->list);
- spin_unlock(&mm->ioctx_lock);
-
- pr_debug("(%p)\n", ioctx);
- if (likely(!was_dead))
- put_ioctx(ioctx); /* twice for the list */
-
- kill_ctx(ioctx);
-
- /*
- * Wake up any waiters. The setting of ctx->dead must be seen
- * by other CPUs at this point. Right now, we rely on the
- * locking done by the above calls to ensure this consistency.
- */
- wake_up_all(&ioctx->wait);
-}
-
/* sys_io_setup:
* Create an aio_context capable of receiving at least nr_events.
* ctxp must not point to an aio_context that already exists, and
@@ -976,7 +964,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
if (!IS_ERR(ioctx)) {
ret = put_user(ioctx->user_id, ctxp);
if (ret)
- io_destroy(ioctx);
+ kill_ioctx(ioctx);
put_ioctx(ioctx);
}
@@ -994,7 +982,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
{
struct kioctx *ioctx = lookup_ioctx(ctx);
if (likely(NULL != ioctx)) {
- io_destroy(ioctx);
+ kill_ioctx(ioctx);
put_ioctx(ioctx);
return 0;
}
@@ -1297,25 +1285,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
if (ret)
goto out_put_req;
- spin_lock_irq(&ctx->ctx_lock);
- /*
- * We could have raced with io_destroy() and are currently holding a
- * reference to ctx which should be destroyed. We cannot submit IO
- * since ctx gets freed as soon as io_submit() puts its reference. The
- * check here is reliable: io_destroy() sets ctx->dead before waiting
- * for outstanding IO and the barrier between these two is realized by
- * unlock of mm->ioctx_lock and lock of ctx->ctx_lock. Analogously we
- * increment ctx->reqs_active before checking for ctx->dead and the
- * barrier is realized by unlock and lock of ctx->ctx_lock. Thus if we
- * don't see ctx->dead set here, io_destroy() waits for our IO to
- * finish.
- */
- if (ctx->dead)
- ret = -EINVAL;
- spin_unlock_irq(&ctx->ctx_lock);
- if (ret)
- goto out_put_req;
-
if (unlikely(kiocbIsCancelled(req))) {
ret = -EINTR;
} else {
@@ -1341,9 +1310,6 @@ out_put_req:
spin_unlock_irq(&ctx->ctx_lock);
atomic_dec(&ctx->reqs_active);
- if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
- wake_up_all(&ctx->wait);
-
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
next prev parent reply other threads:[~2012-11-29 1:38 UTC|newest]
Thread overview: 53+ messages / expand[flat|nested] mbox.gz Atom feed top
2012-11-28 16:43 [PATCH 00/25] AIO performance improvements/cleanups Kent Overstreet
2012-11-28 16:43 ` [PATCH 01/25] mm: remove old aio use_mm() comment Kent Overstreet
2012-11-28 16:43 ` [PATCH 02/25] aio: remove dead code from aio.h Kent Overstreet
2012-11-28 16:43 ` [PATCH 03/25] gadget: remove only user of aio retry Kent Overstreet
2012-11-28 16:43 ` [PATCH 04/25] aio: remove retry-based AIO Kent Overstreet
2012-11-28 16:43 ` [PATCH 05/25] char: add aio_{read,write} to /dev/{null,zero} Kent Overstreet
2012-11-28 16:43 ` [PATCH 06/25] aio: Kill return value of aio_complete() Kent Overstreet
2012-11-28 16:43 ` [PATCH 07/25] aio: kiocb_cancel() Kent Overstreet
2012-11-29 0:07 ` Zach Brown
2012-11-29 0:58 ` Kent Overstreet
2012-11-28 16:43 ` [PATCH 08/25] aio: Move private stuff out of aio.h Kent Overstreet
2012-11-28 16:43 ` [PATCH 09/25] aio: dprintk() -> pr_debug() Kent Overstreet
2012-11-28 16:43 ` [PATCH 10/25] aio: do fget() after aio_get_req() Kent Overstreet
2012-11-28 16:43 ` [PATCH 11/25] aio: Make aio_put_req() lockless Kent Overstreet
2012-11-28 16:43 ` [PATCH 12/25] aio: Refcounting cleanup Kent Overstreet
2012-11-29 0:17 ` Zach Brown
2012-11-29 1:12 ` Kent Overstreet
2012-11-29 0:46 ` Benjamin LaHaise
2012-11-29 1:38 ` Kent Overstreet [this message]
2012-11-28 16:43 ` [PATCH 13/25] aio: Convert read_events() to hrtimers Kent Overstreet
2012-11-29 0:24 ` Zach Brown
2012-11-29 1:05 ` Kent Overstreet
2012-11-28 16:43 ` [PATCH 14/25] aio: Make aio_read_evt() more efficient Kent Overstreet
2012-11-29 0:38 ` Zach Brown
2012-11-29 19:31 ` Kent Overstreet
2012-11-30 0:20 ` Kent Overstreet
2012-11-28 16:43 ` [PATCH 15/25] aio: Use cancellation list lazily Kent Overstreet
2012-11-28 16:43 ` [PATCH 16/25] aio: Change reqs_active to include unreaped completions Kent Overstreet
2012-11-28 16:43 ` [PATCH 17/25] aio: Kill batch allocation Kent Overstreet
2012-11-28 16:43 ` [PATCH 18/25] aio: Kill struct aio_ring_info Kent Overstreet
2012-11-28 16:43 ` [PATCH 19/25] aio: Give shared kioctx fields their own cachelines Kent Overstreet
2012-11-28 16:43 ` [PATCH 20/25] aio: reqs_active -> reqs_available Kent Overstreet
2012-11-28 16:43 ` [PATCH 21/25] aio: percpu reqs_available Kent Overstreet
2012-11-28 16:43 ` [PATCH 22/25] Generic dynamic per cpu refcounting Kent Overstreet
2012-11-29 18:45 ` Andi Kleen
2012-11-29 18:57 ` Kent Overstreet
2012-11-29 18:59 ` Andi Kleen
2012-11-29 19:12 ` Kent Overstreet
2012-11-29 19:20 ` Andi Kleen
2012-11-29 19:29 ` Kent Overstreet
2012-11-29 19:34 ` Benjamin LaHaise
2012-11-29 20:22 ` Kent Overstreet
2012-11-29 20:42 ` Andi Kleen
2012-11-29 20:45 ` Kent Overstreet
2012-11-29 20:54 ` Andi Kleen
2012-11-29 20:59 ` Kent Overstreet
2012-11-29 21:57 ` Jamie Lokier
2012-11-28 16:43 ` [PATCH 23/25] aio: Percpu ioctx refcount Kent Overstreet
2012-11-28 16:43 ` [PATCH 24/25] aio: use xchg() instead of completion_lock Kent Overstreet
2012-11-28 16:43 ` [PATCH 25/25] aio: Don't include aio.h in sched.h Kent Overstreet
2012-11-29 0:03 ` [PATCH 00/25] AIO performance improvements/cleanups Zach Brown
2012-11-29 19:01 ` Kent Overstreet
-- strict thread matches above, loose matches on Subject: below --
2012-11-28 3:19 Kent Overstreet
2012-11-28 3:19 ` [PATCH 12/25] aio: Refcounting cleanup Kent Overstreet
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20121129013858.GD15094@google.com \
--to=koverstreet@google.com \
--cc=axboe@kernel.dk \
--cc=bcrl@kvack.org \
--cc=jmoyer@redhat.com \
--cc=linux-aio@kvack.org \
--cc=linux-fsdevel@vger.kernel.org \
--cc=linux-kernel@vger.kernel.org \
--cc=viro@zeniv.linux.org.uk \
--cc=zab@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).