* [PATCH 16/32] aio: Use cancellation list lazily
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Cancelling kiocbs requires adding them to a per kioctx linked list,
which is one of the few things we need to take the kioctx lock for in
the fast path. But most kiocbs can't be cancelled - so if we only add a
kiocb to that list lazily, when a cancel function is set for it, we can
avoid quite a bit of locking overhead.
While we're at it, instead of using a flag bit, switch to using ki_cancel
itself to indicate that a kiocb has been cancelled/completed. This lets
us get rid of ki_flags entirely.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
drivers/usb/gadget/inode.c | 3 +-
fs/aio.c | 95 +++++++++++++++++++++++++---------------------
include/linux/aio.h | 16 ++++----
3 files changed, 59 insertions(+), 55 deletions(-)
diff --git a/drivers/usb/gadget/inode.c b/drivers/usb/gadget/inode.c
index 7640e01..3bf0c35 100644
--- a/drivers/usb/gadget/inode.c
+++ b/drivers/usb/gadget/inode.c
@@ -534,7 +534,6 @@ static int ep_aio_cancel(struct kiocb *iocb, struct io_event *e)
local_irq_disable();
epdata = priv->epdata;
// spin_lock(&epdata->dev->lock);
- kiocbSetCancelled(iocb);
if (likely(epdata && epdata->ep && priv->req))
value = usb_ep_dequeue (epdata->ep, priv->req);
else
@@ -664,7 +663,7 @@ fail:
goto fail;
}
- iocb->ki_cancel = ep_aio_cancel;
+ kiocb_set_cancel_fn(iocb, ep_aio_cancel);
get_ep(epdata);
priv->epdata = epdata;
priv->actual = 0;
diff --git a/fs/aio.c b/fs/aio.c
index c1047c8..276c6ea 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -97,6 +97,8 @@ struct kioctx {
struct aio_ring_info ring_info;
+ spinlock_t completion_lock;
+
struct rcu_head rcu_head;
struct work_struct rcu_work;
};
@@ -217,25 +219,40 @@ static int aio_setup_ring(struct kioctx *ctx)
#define AIO_EVENTS_FIRST_PAGE ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
#define AIO_EVENTS_OFFSET (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
+void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
+{
+ if (!req->ki_list.next) {
+ struct kioctx *ctx = req->ki_ctx;
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->ctx_lock, flags);
+ list_add(&req->ki_list, &ctx->active_reqs);
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ }
+
+ req->ki_cancel = cancel;
+}
+EXPORT_SYMBOL(kiocb_set_cancel_fn);
+
static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
struct io_event *res)
{
- int (*cancel)(struct kiocb *, struct io_event *);
+ kiocb_cancel_fn *cancel;
int ret = -EINVAL;
- cancel = kiocb->ki_cancel;
- kiocbSetCancelled(kiocb);
- if (cancel) {
- atomic_inc(&kiocb->ki_users);
- spin_unlock_irq(&ctx->ctx_lock);
+ cancel = xchg(&kiocb->ki_cancel, KIOCB_CANCELLED);
+ if (!cancel || cancel == KIOCB_CANCELLED)
+ return ret;
+
+ atomic_inc(&kiocb->ki_users);
+ spin_unlock_irq(&ctx->ctx_lock);
- memset(res, 0, sizeof(*res));
- res->obj = (u64) kiocb->ki_obj.user;
- res->data = kiocb->ki_user_data;
- ret = cancel(kiocb, res);
+ memset(res, 0, sizeof(*res));
+ res->obj = (u64) kiocb->ki_obj.user;
+ res->data = kiocb->ki_user_data;
+ ret = cancel(kiocb, res);
- spin_lock_irq(&ctx->ctx_lock);
- }
+ spin_lock_irq(&ctx->ctx_lock);
return ret;
}
@@ -323,6 +340,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->users, 2);
atomic_set(&ctx->dead, 0);
spin_lock_init(&ctx->ctx_lock);
+ spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);
@@ -465,20 +483,12 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
{
struct kiocb *req = NULL;
- req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
+ req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req))
return NULL;
- req->ki_flags = 0;
atomic_set(&req->ki_users, 2);
- req->ki_key = 0;
req->ki_ctx = ctx;
- req->ki_cancel = NULL;
- req->ki_retry = NULL;
- req->ki_dtor = NULL;
- req->private = NULL;
- req->ki_iovec = NULL;
- req->ki_eventfd = NULL;
return req;
}
@@ -509,7 +519,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
spin_lock_irq(&ctx->ctx_lock);
list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
list_del(&req->ki_batch);
- list_del(&req->ki_list);
kmem_cache_free(kiocb_cachep, req);
atomic_dec(&ctx->reqs_active);
}
@@ -555,10 +564,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
}
batch->count -= allocated;
- list_for_each_entry(req, &batch->head, ki_batch) {
- list_add(&req->ki_list, &ctx->active_reqs);
- atomic_inc(&ctx->reqs_active);
- }
+ atomic_add(allocated, &ctx->reqs_active);
kunmap_atomic(ring);
spin_unlock_irq(&ctx->ctx_lock);
@@ -649,25 +655,34 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
info = &ctx->ring_info;
/*
- * Add a completion event to the ring buffer. Must be done holding
- * ctx->ctx_lock to prevent other code from messing with the tail
- * pointer since we might be called from irq context.
- *
* Take rcu_read_lock() in case the kioctx is being destroyed, as we
* need to issue a wakeup after decrementing reqs_active.
*/
rcu_read_lock();
- spin_lock_irqsave(&ctx->ctx_lock, flags);
- list_del(&iocb->ki_list); /* remove from active_reqs */
+ if (iocb->ki_list.next) {
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->ctx_lock, flags);
+ list_del(&iocb->ki_list);
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ }
/*
* cancelled requests don't get events, userland was given one
* when the event got cancelled.
*/
- if (kiocbIsCancelled(iocb))
+ if (unlikely(xchg(&iocb->ki_cancel,
+ KIOCB_CANCELLED) == KIOCB_CANCELLED))
goto put_rq;
+ /*
+ * Add a completion event to the ring buffer. Must be done holding
+ * ctx->completion_lock to prevent other code from messing with the tail
+ * pointer since we might be called from irq context.
+ */
+ spin_lock_irqsave(&ctx->completion_lock, flags);
+
tail = info->tail;
pos = tail + AIO_EVENTS_OFFSET;
@@ -701,6 +716,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
kunmap_atomic(ring);
flush_dcache_page(info->ring_pages[0]);
+ spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
pr_debug("added to ring %p at [%u]\n", iocb, tail);
/*
@@ -727,7 +744,6 @@ put_rq:
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
rcu_read_unlock();
}
EXPORT_SYMBOL(aio_complete);
@@ -1196,15 +1212,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
req->ki_opcode = iocb->aio_lio_opcode;
ret = aio_setup_iocb(req, compat);
-
if (ret)
goto out_put_req;
- if (unlikely(kiocbIsCancelled(req))) {
- ret = -EINTR;
- } else {
- ret = req->ki_retry(req);
- }
+ ret = req->ki_retry(req);
if (ret != -EIOCBQUEUED) {
/*
* There's no easy way to restart the syscall since other AIO's
@@ -1220,10 +1231,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
- spin_lock_irq(&ctx->ctx_lock);
- list_del(&req->ki_list);
- spin_unlock_irq(&ctx->ctx_lock);
-
atomic_dec(&ctx->reqs_active);
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 1e728f0..fc3c467 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -10,17 +10,13 @@
#include <linux/atomic.h>
struct kioctx;
+struct kiocb;
#define KIOCB_SYNC_KEY (~0U)
-/* ki_flags bits */
-#define KIF_CANCELLED 2
+#define KIOCB_CANCELLED ((void *) (~0ULL))
-#define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-
-#define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-
-#define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
/* is there a better place to document function pointer methods? */
/**
@@ -48,13 +44,12 @@ struct kioctx;
* calls may result in undefined behaviour.
*/
struct kiocb {
- unsigned long ki_flags;
atomic_t ki_users;
unsigned ki_key; /* id of this request */
struct file *ki_filp;
struct kioctx *ki_ctx; /* may be NULL for sync ops */
- int (*ki_cancel)(struct kiocb *, struct io_event *);
+ kiocb_cancel_fn *ki_cancel;
ssize_t (*ki_retry)(struct kiocb *);
void (*ki_dtor)(struct kiocb *);
@@ -112,6 +107,7 @@ struct mm_struct;
extern void exit_aio(struct mm_struct *mm);
extern long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user *__user *iocbpp, bool compat);
+void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel);
#else
static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
static inline void aio_put_req(struct kiocb *iocb) { }
@@ -121,6 +117,8 @@ static inline void exit_aio(struct mm_struct *mm) { }
static inline long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user * __user *iocbpp,
bool compat) { return 0; }
+static inline void kiocb_set_cancel_fn(struct kiocb *req,
+ kiocb_cancel_fn *cancel) { }
#endif /* CONFIG_AIO */
static inline struct kiocb *list_kiocb(struct list_head *h)
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 17/32] aio: Change reqs_active to include unreaped completions
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
The aio code tries really hard to avoid having to deal with the
completion ringbuffer overflowing. To do that, it has to keep track of
the number of outstanding kiocbs, and the number of completions
currently in the ringbuffer - and it's got to check that every time we
allocate a kiocb. Ouch.
But - we can improve this quite a bit if we just change reqs_active to
mean "number of outstanding requests and unreaped completions" - that
means kiocb allocation doesn't have to look at the ringbuffer, which is
a fairly significant win.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 38 +++++++++++++++++++++++++-------------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 276c6ea..b1be0cf 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -71,12 +71,6 @@ struct aio_ring_info {
struct page *internal_pages[AIO_RING_PAGES];
};
-static inline unsigned aio_ring_avail(struct aio_ring_info *info,
- struct aio_ring *ring)
-{
- return (ring->head + info->nr - 1 - ring->tail) % info->nr;
-}
-
struct kioctx {
atomic_t users;
atomic_t dead;
@@ -270,8 +264,11 @@ static void free_ioctx_rcu(struct rcu_head *head)
*/
static void free_ioctx(struct kioctx *ctx)
{
+ struct aio_ring_info *info = &ctx->ring_info;
+ struct aio_ring *ring;
struct io_event res;
struct kiocb *req;
+ unsigned head, avail;
spin_lock_irq(&ctx->ctx_lock);
@@ -285,7 +282,21 @@ static void free_ioctx(struct kioctx *ctx)
spin_unlock_irq(&ctx->ctx_lock);
- wait_event(ctx->wait, !atomic_read(&ctx->reqs_active));
+ ring = kmap_atomic(info->ring_pages[0]);
+ head = ring->head;
+ kunmap_atomic(ring);
+
+ while (atomic_read(&ctx->reqs_active) > 0) {
+ wait_event(ctx->wait, head != info->tail);
+
+ avail = (head < info->tail ? info->tail : info->nr) - head;
+
+ atomic_sub(avail, &ctx->reqs_active);
+ head += avail;
+ head %= info->nr;
+ }
+
+ WARN_ON(atomic_read(&ctx->reqs_active) < 0);
aio_free_ring(ctx);
@@ -534,7 +545,6 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
unsigned short allocated, to_alloc;
long avail;
struct kiocb *req, *n;
- struct aio_ring *ring;
to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
for (allocated = 0; allocated < to_alloc; allocated++) {
@@ -549,9 +559,8 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
goto out;
spin_lock_irq(&ctx->ctx_lock);
- ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
- avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active);
+ avail = ctx->ring_info.nr - atomic_read(&ctx->reqs_active);
BUG_ON(avail < 0);
if (avail < allocated) {
/* Trim back the number of requests. */
@@ -566,7 +575,6 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
batch->count -= allocated;
atomic_add(allocated, &ctx->reqs_active);
- kunmap_atomic(ring);
spin_unlock_irq(&ctx->ctx_lock);
out:
@@ -673,8 +681,11 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
* when the event got cancelled.
*/
if (unlikely(xchg(&iocb->ki_cancel,
- KIOCB_CANCELLED) == KIOCB_CANCELLED))
+ KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
+ atomic_dec(&ctx->reqs_active);
+ /* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
+ }
/*
* Add a completion event to the ring buffer. Must be done holding
@@ -731,7 +742,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
put_rq:
/* everything turned out well, dispose of the aiocb. */
aio_put_req(iocb);
- atomic_dec(&ctx->reqs_active);
/*
* We have to order our ring_info tail store above and test
@@ -812,6 +822,8 @@ static int aio_read_events_ring(struct kioctx *ctx,
flush_dcache_page(info->ring_pages[0]);
pr_debug("%d h%u t%u\n", ret, head, info->tail);
+
+ atomic_sub(ret, &ctx->reqs_active);
out:
mutex_unlock(&info->ring_lock);
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 18/32] aio: Kill batch allocation
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Previously, allocating a kiocb required touching quite a few global
(well, per kioctx) cachelines... so batching up allocation to amortize
those was worthwhile. But we've gotten rid of some of those, and in
another couple of patches kiocb allocation won't require writing to any
shared cachelines, so that means we can just rip this code out.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 116 +++++++---------------------------------------------
include/linux/aio.h | 1 -
2 files changed, 15 insertions(+), 102 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index b1be0cf..5ca383e 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -490,108 +490,27 @@ void exit_aio(struct mm_struct *mm)
* This prevents races between the aio code path referencing the
* req (after submitting it) and aio_complete() freeing the req.
*/
-static struct kiocb *__aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
- struct kiocb *req = NULL;
+ struct kiocb *req;
+
+ if (atomic_read(&ctx->reqs_active) >= ctx->ring_info.nr)
+ return NULL;
+
+ if (atomic_inc_return(&ctx->reqs_active) > ctx->ring_info.nr)
+ goto out_put;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req))
- return NULL;
+ goto out_put;
atomic_set(&req->ki_users, 2);
req->ki_ctx = ctx;
return req;
-}
-
-/*
- * struct kiocb's are allocated in batches to reduce the number of
- * times the ctx lock is acquired and released.
- */
-#define KIOCB_BATCH_SIZE 32L
-struct kiocb_batch {
- struct list_head head;
- long count; /* number of requests left to allocate */
-};
-
-static void kiocb_batch_init(struct kiocb_batch *batch, long total)
-{
- INIT_LIST_HEAD(&batch->head);
- batch->count = total;
-}
-
-static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
-{
- struct kiocb *req, *n;
-
- if (list_empty(&batch->head))
- return;
-
- spin_lock_irq(&ctx->ctx_lock);
- list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
- list_del(&req->ki_batch);
- kmem_cache_free(kiocb_cachep, req);
- atomic_dec(&ctx->reqs_active);
- }
- spin_unlock_irq(&ctx->ctx_lock);
-}
-
-/*
- * Allocate a batch of kiocbs. This avoids taking and dropping the
- * context lock a lot during setup.
- */
-static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
-{
- unsigned short allocated, to_alloc;
- long avail;
- struct kiocb *req, *n;
-
- to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
- for (allocated = 0; allocated < to_alloc; allocated++) {
- req = __aio_get_req(ctx);
- if (!req)
- /* allocation failed, go with what we've got */
- break;
- list_add(&req->ki_batch, &batch->head);
- }
-
- if (allocated == 0)
- goto out;
-
- spin_lock_irq(&ctx->ctx_lock);
-
- avail = ctx->ring_info.nr - atomic_read(&ctx->reqs_active);
- BUG_ON(avail < 0);
- if (avail < allocated) {
- /* Trim back the number of requests. */
- list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
- list_del(&req->ki_batch);
- kmem_cache_free(kiocb_cachep, req);
- if (--allocated <= avail)
- break;
- }
- }
-
- batch->count -= allocated;
- atomic_add(allocated, &ctx->reqs_active);
-
- spin_unlock_irq(&ctx->ctx_lock);
-
-out:
- return allocated;
-}
-
-static inline struct kiocb *aio_get_req(struct kioctx *ctx,
- struct kiocb_batch *batch)
-{
- struct kiocb *req;
-
- if (list_empty(&batch->head))
- if (kiocb_batch_refill(ctx, batch) == 0)
- return NULL;
- req = list_first_entry(&batch->head, struct kiocb, ki_batch);
- list_del(&req->ki_batch);
- return req;
+out_put:
+ atomic_dec(&ctx->reqs_active);
+ return NULL;
}
static void kiocb_free(struct kiocb *req)
@@ -1162,8 +1081,7 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
}
static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
- struct iocb *iocb, struct kiocb_batch *batch,
- bool compat)
+ struct iocb *iocb, bool compat)
{
struct kiocb *req;
ssize_t ret;
@@ -1184,7 +1102,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return -EINVAL;
}
- req = aio_get_req(ctx, batch); /* returns with 2 references to req */
+ req = aio_get_req(ctx); /* returns with 2 references to req */
if (unlikely(!req))
return -EAGAIN;
@@ -1256,7 +1174,6 @@ long do_io_submit(aio_context_t ctx_id, long nr,
long ret = 0;
int i = 0;
struct blk_plug plug;
- struct kiocb_batch batch;
if (unlikely(nr < 0))
return -EINVAL;
@@ -1273,8 +1190,6 @@ long do_io_submit(aio_context_t ctx_id, long nr,
return -EINVAL;
}
- kiocb_batch_init(&batch, nr);
-
blk_start_plug(&plug);
/*
@@ -1295,13 +1210,12 @@ long do_io_submit(aio_context_t ctx_id, long nr,
break;
}
- ret = io_submit_one(ctx, user_iocb, &tmp, &batch, compat);
+ ret = io_submit_one(ctx, user_iocb, &tmp, compat);
if (ret)
break;
}
blk_finish_plug(&plug);
- kiocb_batch_free(ctx, &batch);
put_ioctx(ctx);
return i ? i : ret;
}
diff --git a/include/linux/aio.h b/include/linux/aio.h
index fc3c467..58adc56 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -74,7 +74,6 @@ struct kiocb {
struct list_head ki_list; /* the aio core uses this
* for cancellation */
- struct list_head ki_batch; /* batch allocation */
/*
* If the aio_resfd field of the userspace iocb is not zero,
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 19/32] aio: Kill struct aio_ring_info
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
struct aio_ring_info was kind of odd: the only place it's used is where
it's embedded in struct kioctx - there's no real need for it.
The next patch rearranges struct kioctx and puts various things on their
own cachelines - getting rid of struct aio_ring_info now makes that
reordering a bit clearer.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 149 ++++++++++++++++++++++++++++++---------------------------------
1 file changed, 71 insertions(+), 78 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 5ca383e..96fbd6b 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -58,18 +58,6 @@ struct aio_ring {
}; /* 128 bytes + ring size */
#define AIO_RING_PAGES 8
-struct aio_ring_info {
- unsigned long mmap_base;
- unsigned long mmap_size;
-
- struct page **ring_pages;
- struct mutex ring_lock;
- long nr_pages;
-
- unsigned nr, tail;
-
- struct page *internal_pages[AIO_RING_PAGES];
-};
struct kioctx {
atomic_t users;
@@ -86,12 +74,27 @@ struct kioctx {
atomic_t reqs_active;
struct list_head active_reqs; /* used for cancellation */
+ unsigned nr;
+
/* sys_io_setup currently limits this to an unsigned int */
unsigned max_reqs;
- struct aio_ring_info ring_info;
+ unsigned long mmap_base;
+ unsigned long mmap_size;
+
+ struct page **ring_pages;
+ long nr_pages;
- spinlock_t completion_lock;
+ struct {
+ struct mutex ring_lock;
+ } ____cacheline_aligned;
+
+ struct {
+ unsigned tail;
+ spinlock_t completion_lock;
+ } ____cacheline_aligned;
+
+ struct page *internal_pages[AIO_RING_PAGES];
struct rcu_head rcu_head;
struct work_struct rcu_work;
@@ -123,26 +126,21 @@ __initcall(aio_setup);
static void aio_free_ring(struct kioctx *ctx)
{
- struct aio_ring_info *info = &ctx->ring_info;
long i;
- for (i=0; i<info->nr_pages; i++)
- put_page(info->ring_pages[i]);
+ for (i = 0; i < ctx->nr_pages; i++)
+ put_page(ctx->ring_pages[i]);
- if (info->mmap_size) {
- vm_munmap(info->mmap_base, info->mmap_size);
- }
+ if (ctx->mmap_size)
+ vm_munmap(ctx->mmap_base, ctx->mmap_size);
- if (info->ring_pages && info->ring_pages != info->internal_pages)
- kfree(info->ring_pages);
- info->ring_pages = NULL;
- info->nr = 0;
+ if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages)
+ kfree(ctx->ring_pages);
}
static int aio_setup_ring(struct kioctx *ctx)
{
struct aio_ring *ring;
- struct aio_ring_info *info = &ctx->ring_info;
unsigned nr_events = ctx->max_reqs;
struct mm_struct *mm = current->mm;
unsigned long size;
@@ -160,42 +158,42 @@ static int aio_setup_ring(struct kioctx *ctx)
nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
- info->nr = 0;
- info->ring_pages = info->internal_pages;
+ ctx->nr = 0;
+ ctx->ring_pages = ctx->internal_pages;
if (nr_pages > AIO_RING_PAGES) {
- info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
- if (!info->ring_pages)
+ ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
+ if (!ctx->ring_pages)
return -ENOMEM;
}
- info->mmap_size = nr_pages * PAGE_SIZE;
- pr_debug("attempting mmap of %lu bytes\n", info->mmap_size);
+ ctx->mmap_size = nr_pages * PAGE_SIZE;
+ pr_debug("attempting mmap of %lu bytes\n", ctx->mmap_size);
down_write(&mm->mmap_sem);
- info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size,
- PROT_READ|PROT_WRITE,
- MAP_ANONYMOUS|MAP_PRIVATE, 0);
- if (IS_ERR((void *)info->mmap_base)) {
+ ctx->mmap_base = do_mmap_pgoff(NULL, 0, ctx->mmap_size,
+ PROT_READ|PROT_WRITE,
+ MAP_ANONYMOUS|MAP_PRIVATE, 0);
+ if (IS_ERR((void *)ctx->mmap_base)) {
up_write(&mm->mmap_sem);
- info->mmap_size = 0;
+ ctx->mmap_size = 0;
aio_free_ring(ctx);
return -EAGAIN;
}
- pr_debug("mmap address: 0x%08lx\n", info->mmap_base);
- info->nr_pages = get_user_pages(current, mm, info->mmap_base, nr_pages,
- 1, 0, info->ring_pages, NULL);
+ pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);
+ ctx->nr_pages = get_user_pages(current, mm, ctx->mmap_base, nr_pages,
+ 1, 0, ctx->ring_pages, NULL);
up_write(&mm->mmap_sem);
- if (unlikely(info->nr_pages != nr_pages)) {
+ if (unlikely(ctx->nr_pages != nr_pages)) {
aio_free_ring(ctx);
return -EAGAIN;
}
- ctx->user_id = info->mmap_base;
+ ctx->user_id = ctx->mmap_base;
- info->nr = nr_events; /* trusted copy */
+ ctx->nr = nr_events; /* trusted copy */
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
ring->nr = nr_events; /* user copy */
ring->id = ctx->user_id;
ring->head = ring->tail = 0;
@@ -204,7 +202,7 @@ static int aio_setup_ring(struct kioctx *ctx)
ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
ring->header_length = sizeof(struct aio_ring);
kunmap_atomic(ring);
- flush_dcache_page(info->ring_pages[0]);
+ flush_dcache_page(ctx->ring_pages[0]);
return 0;
}
@@ -264,7 +262,6 @@ static void free_ioctx_rcu(struct rcu_head *head)
*/
static void free_ioctx(struct kioctx *ctx)
{
- struct aio_ring_info *info = &ctx->ring_info;
struct aio_ring *ring;
struct io_event res;
struct kiocb *req;
@@ -282,18 +279,18 @@ static void free_ioctx(struct kioctx *ctx)
spin_unlock_irq(&ctx->ctx_lock);
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
head = ring->head;
kunmap_atomic(ring);
while (atomic_read(&ctx->reqs_active) > 0) {
- wait_event(ctx->wait, head != info->tail);
+ wait_event(ctx->wait, head != ctx->tail);
- avail = (head < info->tail ? info->tail : info->nr) - head;
+ avail = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
atomic_sub(avail, &ctx->reqs_active);
head += avail;
- head %= info->nr;
+ head %= ctx->nr;
}
WARN_ON(atomic_read(&ctx->reqs_active) < 0);
@@ -352,7 +349,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->dead, 0);
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
- mutex_init(&ctx->ring_info.ring_lock);
+ mutex_init(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs);
@@ -376,7 +373,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
spin_unlock(&mm->ioctx_lock);
pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
- ctx, ctx->user_id, mm, ctx->ring_info.nr);
+ ctx, ctx->user_id, mm, ctx->nr);
return ctx;
out_cleanup:
@@ -471,7 +468,7 @@ void exit_aio(struct mm_struct *mm)
* just set it to 0; aio_free_ring() is the only
* place that uses ->mmap_size, so it's safe.
*/
- ctx->ring_info.mmap_size = 0;
+ ctx->mmap_size = 0;
if (!atomic_xchg(&ctx->dead, 1)) {
hlist_del_rcu(&ctx->list);
@@ -494,10 +491,10 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
- if (atomic_read(&ctx->reqs_active) >= ctx->ring_info.nr)
+ if (atomic_read(&ctx->reqs_active) >= ctx->nr)
return NULL;
- if (atomic_inc_return(&ctx->reqs_active) > ctx->ring_info.nr)
+ if (atomic_inc_return(&ctx->reqs_active) > ctx->nr)
goto out_put;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
@@ -558,7 +555,6 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
void aio_complete(struct kiocb *iocb, long res, long res2)
{
struct kioctx *ctx = iocb->ki_ctx;
- struct aio_ring_info *info;
struct aio_ring *ring;
struct io_event *ev_page, *event;
unsigned long flags;
@@ -579,8 +575,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
return;
}
- info = &ctx->ring_info;
-
/*
* Take rcu_read_lock() in case the kioctx is being destroyed, as we
* need to issue a wakeup after decrementing reqs_active.
@@ -613,13 +607,13 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
spin_lock_irqsave(&ctx->completion_lock, flags);
- tail = info->tail;
+ tail = ctx->tail;
pos = tail + AIO_EVENTS_OFFSET;
- if (++tail >= info->nr)
+ if (++tail >= ctx->nr)
tail = 0;
- ev_page = kmap_atomic(info->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
+ ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
event = ev_page + pos % AIO_EVENTS_PER_PAGE;
event->obj = (u64)(unsigned long)iocb->ki_obj.user;
@@ -628,7 +622,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
event->res2 = res2;
kunmap_atomic(ev_page);
- flush_dcache_page(info->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
+ flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
@@ -639,12 +633,12 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
smp_wmb(); /* make event visible before updating tail */
- info->tail = tail;
+ ctx->tail = tail;
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
ring->tail = tail;
kunmap_atomic(ring);
- flush_dcache_page(info->ring_pages[0]);
+ flush_dcache_page(ctx->ring_pages[0]);
spin_unlock_irqrestore(&ctx->completion_lock, flags);
@@ -684,33 +678,32 @@ EXPORT_SYMBOL(aio_complete);
static int aio_read_events_ring(struct kioctx *ctx,
struct io_event __user *event, long nr)
{
- struct aio_ring_info *info = &ctx->ring_info;
struct aio_ring *ring;
unsigned head, pos;
int ret = 0, copy_ret;
- if (!mutex_trylock(&info->ring_lock)) {
+ if (!mutex_trylock(&ctx->ring_lock)) {
__set_current_state(TASK_RUNNING);
- mutex_lock(&info->ring_lock);
+ mutex_lock(&ctx->ring_lock);
}
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
head = ring->head;
kunmap_atomic(ring);
- pr_debug("h%u t%u m%u\n", head, info->tail, info->nr);
+ pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr);
- if (head == info->tail)
+ if (head == ctx->tail)
goto out;
__set_current_state(TASK_RUNNING);
while (ret < nr) {
- unsigned i = (head < info->tail ? info->tail : info->nr) - head;
+ unsigned i = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
struct io_event *ev;
struct page *page;
- if (head == info->tail)
+ if (head == ctx->tail)
break;
i = min_t(int, i, nr - ret);
@@ -718,7 +711,7 @@ static int aio_read_events_ring(struct kioctx *ctx,
((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
pos = head + AIO_EVENTS_OFFSET;
- page = info->ring_pages[pos / AIO_EVENTS_PER_PAGE];
+ page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
pos %= AIO_EVENTS_PER_PAGE;
ev = kmap(page);
@@ -732,19 +725,19 @@ static int aio_read_events_ring(struct kioctx *ctx,
ret += i;
head += i;
- head %= info->nr;
+ head %= ctx->nr;
}
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
ring->head = head;
kunmap_atomic(ring);
- flush_dcache_page(info->ring_pages[0]);
+ flush_dcache_page(ctx->ring_pages[0]);
- pr_debug("%d h%u t%u\n", ret, head, info->tail);
+ pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
atomic_sub(ret, &ctx->reqs_active);
out:
- mutex_unlock(&info->ring_lock);
+ mutex_unlock(&ctx->ring_lock);
return ret;
}
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 20/32] aio: Give shared kioctx fields their own cachelines
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 27 +++++++++++++++------------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 96fbd6b..fa87732 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -67,13 +67,6 @@ struct kioctx {
unsigned long user_id;
struct hlist_node list;
- wait_queue_head_t wait;
-
- spinlock_t ctx_lock;
-
- atomic_t reqs_active;
- struct list_head active_reqs; /* used for cancellation */
-
unsigned nr;
/* sys_io_setup currently limits this to an unsigned int */
@@ -85,19 +78,29 @@ struct kioctx {
struct page **ring_pages;
long nr_pages;
+ struct rcu_head rcu_head;
+ struct work_struct rcu_work;
+
struct {
- struct mutex ring_lock;
+ atomic_t reqs_active;
} ____cacheline_aligned;
struct {
+ spinlock_t ctx_lock;
+ struct list_head active_reqs; /* used for cancellation */
+ } ____cacheline_aligned_in_smp;
+
+ struct {
+ struct mutex ring_lock;
+ wait_queue_head_t wait;
+ } ____cacheline_aligned_in_smp;
+
+ struct {
unsigned tail;
spinlock_t completion_lock;
- } ____cacheline_aligned;
+ } ____cacheline_aligned_in_smp;
struct page *internal_pages[AIO_RING_PAGES];
-
- struct rcu_head rcu_head;
- struct work_struct rcu_work;
};
/*------ sysctl variables----*/
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 21/32] aio: reqs_active -> reqs_available
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
The number of outstanding kiocbs is one of the few shared things left
that has to be touched for every kiocb - it'd be nice to make it percpu.
We can make it per cpu by treating it like an allocation problem: we
have a maximum number of kiocbs that can be outstanding (i.e. slots) -
then we just allocate and free slots, and we know how to write per cpu
allocators.
So as prep work for that, we convert reqs_active to reqs_available.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 27 +++++++++++++--------------
1 file changed, 13 insertions(+), 14 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index fa87732..d384eb2 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -82,7 +82,7 @@ struct kioctx {
struct work_struct rcu_work;
struct {
- atomic_t reqs_active;
+ atomic_t reqs_available;
} ____cacheline_aligned;
struct {
@@ -286,17 +286,17 @@ static void free_ioctx(struct kioctx *ctx)
head = ring->head;
kunmap_atomic(ring);
- while (atomic_read(&ctx->reqs_active) > 0) {
+ while (atomic_read(&ctx->reqs_available) < ctx->nr) {
wait_event(ctx->wait, head != ctx->tail);
avail = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
- atomic_sub(avail, &ctx->reqs_active);
+ atomic_add(avail, &ctx->reqs_available);
head += avail;
head %= ctx->nr;
}
- WARN_ON(atomic_read(&ctx->reqs_active) < 0);
+ WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr);
aio_free_ring(ctx);
@@ -360,6 +360,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
if (aio_setup_ring(ctx) < 0)
goto out_freectx;
+ atomic_set(&ctx->reqs_available, ctx->nr);
+
/* limit the number of system wide aios */
spin_lock(&aio_nr_lock);
if (aio_nr + nr_events > aio_max_nr ||
@@ -462,7 +464,7 @@ void exit_aio(struct mm_struct *mm)
"exit_aio:ioctx still alive: %d %d %d\n",
atomic_read(&ctx->users),
atomic_read(&ctx->dead),
- atomic_read(&ctx->reqs_active));
+ atomic_read(&ctx->reqs_available));
/*
* We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything.
@@ -494,12 +496,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
- if (atomic_read(&ctx->reqs_active) >= ctx->nr)
+ if (atomic_dec_if_positive(&ctx->reqs_available) <= 0)
return NULL;
- if (atomic_inc_return(&ctx->reqs_active) > ctx->nr)
- goto out_put;
-
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req))
goto out_put;
@@ -509,7 +508,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
return req;
out_put:
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
return NULL;
}
@@ -580,7 +579,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
/*
* Take rcu_read_lock() in case the kioctx is being destroyed, as we
- * need to issue a wakeup after decrementing reqs_active.
+ * need to issue a wakeup after incrementing reqs_available.
*/
rcu_read_lock();
@@ -598,7 +597,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
if (unlikely(xchg(&iocb->ki_cancel,
KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
}
@@ -738,7 +737,7 @@ static int aio_read_events_ring(struct kioctx *ctx,
pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
- atomic_sub(ret, &ctx->reqs_active);
+ atomic_add(ret, &ctx->reqs_available);
out:
mutex_unlock(&ctx->ring_lock);
@@ -1157,7 +1156,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 22/32] aio: percpu reqs_available
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
See the previous patch for why we want to do this - this basically
implements a per cpu allocator for reqs_available that doesn't actually
allocate anything.
Note that we need to increase the size of the ringbuffer we allocate,
since a single thread won't necessarily be able to use all the
reqs_available slots - some (up to about half) might be on other per cpu
lists, unavailable for the current thread.
We size the ringbuffer based on the nr_events userspace passed to
io_setup(), so this is a slight behaviour change - but nr_events was
never a hard limit: it was already being rounded up to the next page,
so this doesn't change the actual semantics.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 85 insertions(+), 7 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index d384eb2..e415b33 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -26,6 +26,7 @@
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/mmu_context.h>
+#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/timer.h>
#include <linux/aio.h>
@@ -59,6 +60,10 @@ struct aio_ring {
#define AIO_RING_PAGES 8
+struct kioctx_cpu {
+ unsigned reqs_available;
+};
+
struct kioctx {
atomic_t users;
atomic_t dead;
@@ -67,6 +72,10 @@ struct kioctx {
unsigned long user_id;
struct hlist_node list;
+	struct kioctx_cpu __percpu *cpu;	/* per-cpu reqs_available counts */
+
+	unsigned req_batch;	/* slots moved between the shared and per-cpu counts at a time */
+
unsigned nr;
/* sys_io_setup currently limits this to an unsigned int */
@@ -149,6 +158,9 @@ static int aio_setup_ring(struct kioctx *ctx)
unsigned long size;
int nr_pages;
+ nr_events = max(nr_events, num_possible_cpus() * 4);
+ nr_events *= 2;
+
/* Compensate for the ring buffer's head/tail overlap entry */
nr_events += 2; /* 1 is required, 2 for good luck */
@@ -255,6 +267,8 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
static void free_ioctx_rcu(struct rcu_head *head)
{
struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+
+ free_percpu(ctx->cpu);
kmem_cache_free(kioctx_cachep, ctx);
}
@@ -268,7 +282,7 @@ static void free_ioctx(struct kioctx *ctx)
struct aio_ring *ring;
struct io_event res;
struct kiocb *req;
- unsigned head, avail;
+ unsigned cpu, head, avail;
spin_lock_irq(&ctx->ctx_lock);
@@ -282,6 +296,13 @@ static void free_ioctx(struct kioctx *ctx)
spin_unlock_irq(&ctx->ctx_lock);
+ for_each_possible_cpu(cpu) {
+ struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
+
+ atomic_add(kcpu->reqs_available, &ctx->reqs_available);
+ kcpu->reqs_available = 0;
+ }
+
ring = kmap_atomic(ctx->ring_pages[0]);
head = ring->head;
kunmap_atomic(ring);
@@ -357,10 +378,16 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
INIT_LIST_HEAD(&ctx->active_reqs);
- if (aio_setup_ring(ctx) < 0)
+ ctx->cpu = alloc_percpu(struct kioctx_cpu);
+ if (!ctx->cpu)
goto out_freectx;
+ if (aio_setup_ring(ctx) < 0)
+ goto out_freepcpu;
+
atomic_set(&ctx->reqs_available, ctx->nr);
+ ctx->req_batch = ctx->nr / (num_possible_cpus() * 4);
+ BUG_ON(!ctx->req_batch);
/* limit the number of system wide aios */
spin_lock(&aio_nr_lock);
@@ -384,6 +411,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
out_cleanup:
err = -EAGAIN;
aio_free_ring(ctx);
+out_freepcpu:
+ free_percpu(ctx->cpu);
out_freectx:
kmem_cache_free(kioctx_cachep, ctx);
pr_debug("error allocating ioctx %d\n", err);
@@ -482,6 +511,52 @@ void exit_aio(struct mm_struct *mm)
}
}
+static void put_reqs_available(struct kioctx *ctx, unsigned nr)
+{
+ struct kioctx_cpu *kcpu;
+ /* Give nr request slots back to the current cpu's local count. */
+ preempt_disable();
+ kcpu = this_cpu_ptr(ctx->cpu);
+ /* Keep fewer than two batches locally; move the excess to the shared count. */
+ kcpu->reqs_available += nr;
+ while (kcpu->reqs_available >= ctx->req_batch * 2) {
+ kcpu->reqs_available -= ctx->req_batch;
+ atomic_add(ctx->req_batch, &ctx->reqs_available);
+ }
+
+ preempt_enable();
+}
+
+static bool get_reqs_available(struct kioctx *ctx)
+{
+ struct kioctx_cpu *kcpu;
+ bool ret = false;
+ /* Take one request slot; returns false if none could be obtained. */
+ preempt_disable();
+ kcpu = this_cpu_ptr(ctx->cpu);
+ /* Local count empty: try to claim a whole batch from the shared count. */
+ if (!kcpu->reqs_available) {
+ int old, avail = atomic_read(&ctx->reqs_available);
+ /* NOTE(review): int vs unsigned compare below; relies on avail never being negative. */
+ do {
+ if (avail < ctx->req_batch)
+ goto out;
+ /* atomic_cmpxchg() returns the value it found; retry until it matches what we read. */
+ old = avail;
+ avail = atomic_cmpxchg(&ctx->reqs_available,
+ avail, avail - ctx->req_batch);
+ } while (avail != old);
+
+ kcpu->reqs_available += ctx->req_batch;
+ }
+
+ ret = true;
+ kcpu->reqs_available--;
+out:
+ preempt_enable();
+ return ret;
+}
+
/* aio_get_req
* Allocate a slot for an aio request. Increments the ki_users count
* of the kioctx so that the kioctx stays around until all requests are
@@ -496,7 +571,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
- if (atomic_dec_if_positive(&ctx->reqs_available) <= 0)
+ if (!get_reqs_available(ctx))
return NULL;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
@@ -505,10 +580,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
atomic_set(&req->ki_users, 2);
req->ki_ctx = ctx;
-
return req;
out_put:
- atomic_inc(&ctx->reqs_available);
+ put_reqs_available(ctx, 1);
return NULL;
}
@@ -597,6 +671,10 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
if (unlikely(xchg(&iocb->ki_cancel,
KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
+ /*
+ * Can't use the percpu reqs_available here - could race with
+ * free_ioctx()
+ */
atomic_inc(&ctx->reqs_available);
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
@@ -737,7 +815,7 @@ static int aio_read_events_ring(struct kioctx *ctx,
pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
- atomic_add(ret, &ctx->reqs_available);
+ put_reqs_available(ctx, ret);
out:
mutex_unlock(&ctx->ring_lock);
@@ -1156,7 +1234,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
- atomic_inc(&ctx->reqs_available);
+ put_reqs_available(ctx, 1);
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 23/32] Generic dynamic per cpu refcounting
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
This implements a refcount with similar semantics to
atomic_inc()/atomic_dec_and_test(), that starts out as just an atomic64_t
but dynamically switches to per cpu refcounting when the rate of
gets/puts becomes too high.
It also implements two stage shutdown, as we need it to tear down the
percpu counts. Before dropping the initial refcount, you must call
percpu_ref_kill(); this puts the refcount in "shutting down mode" and
switches back to a single atomic refcount with the appropriate barriers
(synchronize_rcu()).
It's also legal to call percpu_ref_kill() multiple times - it only
returns true once, so callers don't have to reimplement shutdown
synchronization.
For the sake of simplicity/efficiency, the heuristic is pretty simple -
it just switches to percpu refcounting if there are more than x gets
in one second (completely arbitrarily, 4096).
It'd be more correct to count the number of cache misses or something
else more profile driven, but doing so would require accessing the
shared ref twice per get - by just counting the number of gets(), we can
stick that counter in the high bits of the refcount and increment both
with a single atomic64_add(). But I expect this'll be good enough in
practice.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
include/linux/percpu-refcount.h | 29 +++++++
lib/Makefile | 2 +-
lib/percpu-refcount.c | 164 ++++++++++++++++++++++++++++++++++++++++
3 files changed, 194 insertions(+), 1 deletion(-)
create mode 100644 include/linux/percpu-refcount.h
create mode 100644 lib/percpu-refcount.c
diff --git a/include/linux/percpu-refcount.h b/include/linux/percpu-refcount.h
new file mode 100644
index 0000000..1268010
--- /dev/null
+++ b/include/linux/percpu-refcount.h
@@ -0,0 +1,29 @@
+#ifndef _LINUX_PERCPU_REFCOUNT_H
+#define _LINUX_PERCPU_REFCOUNT_H
+
+#include <linux/atomic.h>
+#include <linux/percpu.h>
+
+struct percpu_ref {
+ atomic64_t count; /* shared count; the low PCPU_COUNT_BITS bits are the refcount */
+ unsigned __percpu *pcpu_count; /* percpu counters, or a tagged status word when the low bits are nonzero */
+};
+
+void percpu_ref_init(struct percpu_ref *ref);
+void __percpu_ref_get(struct percpu_ref *ref, bool alloc);
+int percpu_ref_put(struct percpu_ref *ref);
+
+int percpu_ref_kill(struct percpu_ref *ref);
+int percpu_ref_dead(struct percpu_ref *ref);
+
+static inline void percpu_ref_get(struct percpu_ref *ref)
+{
+ __percpu_ref_get(ref, true); /* alloc == true: the get may switch the ref to percpu counters */
+}
+
+static inline void percpu_ref_get_noalloc(struct percpu_ref *ref)
+{
+ __percpu_ref_get(ref, false); /* alloc == false: never attempts the percpu allocation */
+}
+
+#endif
diff --git a/lib/Makefile b/lib/Makefile
index a08b791..48a8d26 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -12,7 +12,7 @@ lib-y := ctype.o string.o vsprintf.o cmdline.o \
idr.o int_sqrt.o extable.o \
sha1.o md5.o irq_regs.o reciprocal_div.o argv_split.o \
proportions.o flex_proportions.o prio_heap.o ratelimit.o show_mem.o \
- is_single_threaded.o plist.o decompress.o
+ is_single_threaded.o plist.o decompress.o percpu-refcount.o
lib-$(CONFIG_MMU) += ioremap.o
lib-$(CONFIG_SMP) += cpumask.o
diff --git a/lib/percpu-refcount.c b/lib/percpu-refcount.c
new file mode 100644
index 0000000..522b2df
--- /dev/null
+++ b/lib/percpu-refcount.c
@@ -0,0 +1,164 @@
+#define pr_fmt(fmt) "%s: " fmt "\n", __func__
+
+#include <linux/kernel.h>
+#include <linux/percpu-refcount.h>
+#include <linux/rcupdate.h>
+
+#define PCPU_COUNT_BITS		50	/* low bits of ref->count: the refcount; bits above count gets */
+#define PCPU_COUNT_MASK		((1LL << PCPU_COUNT_BITS) - 1)
+/* The low PCPU_STATUS_BITS bits of ref->pcpu_count hold one of the PCPU_REF_* states. */
+#define PCPU_STATUS_BITS	2
+#define PCPU_STATUS_MASK	((1 << PCPU_STATUS_BITS) - 1)
+
+#define PCPU_REF_PTR		0	/* pcpu_count is a live percpu pointer */
+#define PCPU_REF_NONE		1	/* no percpu counters; upper bits hold a jiffies stamp */
+#define PCPU_REF_DYING		2	/* percpu_ref_kill() is folding the percpu counts back */
+#define PCPU_REF_DEAD		3	/* killed; percpu_ref_put() now checks ref->count for zero */
+
+#define REF_STATUS(count)	((unsigned long) (count) & PCPU_STATUS_MASK)
+
+void percpu_ref_init(struct percpu_ref *ref)
+{
+ unsigned long now = jiffies;
+ /* Start with a single reference held in the shared atomic count. */
+ atomic64_set(&ref->count, 1);
+ /* No percpu counters yet: store the current jiffies, tagged PCPU_REF_NONE. */
+ now <<= PCPU_STATUS_BITS;
+ now |= PCPU_REF_NONE;
+
+ ref->pcpu_count = (void *) now;
+}
+
+static void percpu_ref_alloc(struct percpu_ref *ref, unsigned __percpu *pcpu_count)
+{
+	unsigned __percpu *new;
+	unsigned long last = (unsigned long) pcpu_count;
+	unsigned long now = jiffies;
+	/* Called when the get counter wraps; pcpu_count still holds the last tagged jiffies stamp. */
+	now <<= PCPU_STATUS_BITS;
+	now |= PCPU_REF_NONE;
+	/* Wrapped again within HZ jiffies of the last stamp: switch to percpu counters. */
+	if (now - last <= HZ << PCPU_STATUS_BITS) {
+		rcu_read_unlock();	/* presumably because alloc_percpu() can sleep */
+		new = alloc_percpu(unsigned);
+		rcu_read_lock();
+
+		if (!new)
+			goto update_time;
+		/* The low bits must be clear so the pointer reads back as PCPU_REF_PTR. */
+		BUG_ON(((unsigned long) new) & PCPU_STATUS_MASK);
+		/* Install only if pcpu_count is unchanged since we read it; otherwise discard ours. */
+		if (cmpxchg(&ref->pcpu_count, pcpu_count, new) != pcpu_count)
+			free_percpu(new);
+		else
+			pr_debug("created");
+	} else {
+update_time:	new = (void *) now;
+		cmpxchg(&ref->pcpu_count, pcpu_count, new);
+	}
+}
+
+void __percpu_ref_get(struct percpu_ref *ref, bool alloc)
+{
+ unsigned __percpu *pcpu_count;
+ uint64_t v;
+ /* Take a reference. Expects rcu_read_lock() held: percpu_ref_alloc() drops and retakes it. */
+ pcpu_count = rcu_dereference(ref->pcpu_count);
+ /* Fast path: percpu counters exist, so only this cpu's counter is touched. */
+ if (REF_STATUS(pcpu_count) == PCPU_REF_PTR) {
+ __this_cpu_inc(*pcpu_count);
+ } else {
+ v = atomic64_add_return(1 + (1ULL << PCPU_COUNT_BITS),
+ &ref->count);
+ /* The bits above PCPU_COUNT_BITS count gets; when they wrap to 0, consider going percpu. */
+ if (!(v >> PCPU_COUNT_BITS) &&
+ REF_STATUS(pcpu_count) == PCPU_REF_NONE && alloc)
+ percpu_ref_alloc(ref, pcpu_count);
+ }
+}
+
+int percpu_ref_put(struct percpu_ref *ref)
+{
+ unsigned __percpu *pcpu_count;
+ uint64_t v;
+ int ret = 0;
+ /* Drop a reference. Returns nonzero only when a DEAD (killed) ref reaches zero. */
+ rcu_read_lock();
+
+ pcpu_count = rcu_dereference(ref->pcpu_count);
+
+ switch (REF_STATUS(pcpu_count)) {
+ case PCPU_REF_PTR:
+ __this_cpu_dec(*pcpu_count);
+ break;
+ case PCPU_REF_NONE:
+ case PCPU_REF_DYING:
+ atomic64_dec(&ref->count); /* no zero check until the ref is DEAD */
+ break;
+ case PCPU_REF_DEAD:
+ v = atomic64_dec_return(&ref->count);
+ v &= PCPU_COUNT_MASK; /* ignore the get counter kept in the high bits */
+
+ ret = v == 0;
+ break;
+ }
+
+ rcu_read_unlock();
+
+ return ret;
+}
+
+int percpu_ref_kill(struct percpu_ref *ref)
+{
+ unsigned __percpu *old, *new, *pcpu_count = ref->pcpu_count;
+ unsigned long status;
+ /* Start shutdown: returns 1 to the single caller that does the kill, 0 if already killed. */
+ do {
+ status = REF_STATUS(pcpu_count);
+ /* Live percpu counters go through DYING first; a ref without them goes straight to DEAD. */
+ switch (status) {
+ case PCPU_REF_PTR:
+ new = (void *) PCPU_REF_DYING;
+ break;
+ case PCPU_REF_NONE:
+ new = (void *) PCPU_REF_DEAD;
+ break;
+ case PCPU_REF_DYING:
+ case PCPU_REF_DEAD:
+ return 0;
+ }
+
+ old = pcpu_count;
+ pcpu_count = cmpxchg(&ref->pcpu_count, old, new);
+ } while (pcpu_count != old);
+
+ if (status == PCPU_REF_PTR) {
+ unsigned count = 0, cpu;
+ /* Let RCU readers that may still hold the old percpu pointer finish. */
+ synchronize_rcu();
+ /* Sum every possible cpu's counter. */
+ for_each_possible_cpu(cpu)
+ count += *per_cpu_ptr(pcpu_count, cpu);
+
+ pr_debug("global %lli pcpu %i",
+ atomic64_read(&ref->count) & PCPU_COUNT_MASK,
+ (int) count);
+ /* Fold the sum into the shared count; the (int) cast keeps a net-negative sum signed. */
+ atomic64_add((int) count, &ref->count);
+ smp_wmb();
+ /* Between setting global count and setting PCPU_REF_DEAD */
+ ref->pcpu_count = (void *) PCPU_REF_DEAD;
+
+ free_percpu(pcpu_count);
+ }
+
+ return 1;
+}
+
+int percpu_ref_dead(struct percpu_ref *ref)
+{
+ unsigned status = REF_STATUS(ref->pcpu_count);
+ /* Nonzero once percpu_ref_kill() has moved the ref to DYING or DEAD. */
+ return status == PCPU_REF_DYING ||
+ status == PCPU_REF_DEAD;
+}
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 24/32] aio: Percpu ioctx refcount
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
This just converts the ioctx refcount to the new generic dynamic percpu
refcount code.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 27 ++++++++++++---------------
1 file changed, 12 insertions(+), 15 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index e415b33..b26ad5c 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -36,6 +36,7 @@
#include <linux/eventfd.h>
#include <linux/blkdev.h>
#include <linux/compat.h>
+#include <linux/percpu-refcount.h>
#include <asm/kmap_types.h>
#include <asm/uaccess.h>
@@ -65,8 +66,7 @@ struct kioctx_cpu {
};
struct kioctx {
- atomic_t users;
- atomic_t dead;
+ struct percpu_ref users;
/* This needs improving */
unsigned long user_id;
@@ -340,7 +340,7 @@ static void free_ioctx(struct kioctx *ctx)
static void put_ioctx(struct kioctx *ctx)
{
- if (unlikely(atomic_dec_and_test(&ctx->users)))
+ if (percpu_ref_put(&ctx->users))
free_ioctx(ctx);
}
@@ -369,8 +369,11 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
ctx->max_reqs = nr_events;
- atomic_set(&ctx->users, 2);
- atomic_set(&ctx->dead, 0);
+ percpu_ref_init(&ctx->users);
+ rcu_read_lock();
+ percpu_ref_get(&ctx->users);
+ rcu_read_unlock();
+
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
@@ -442,7 +445,7 @@ static void kill_ioctx_rcu(struct rcu_head *head)
*/
static void kill_ioctx(struct kioctx *ctx)
{
- if (!atomic_xchg(&ctx->dead, 1)) {
+ if (percpu_ref_kill(&ctx->users)) {
hlist_del_rcu(&ctx->list);
/* Between hlist_del_rcu() and dropping the initial ref */
synchronize_rcu();
@@ -488,12 +491,6 @@ void exit_aio(struct mm_struct *mm)
struct hlist_node *p, *n;
hlist_for_each_entry_safe(ctx, p, n, &mm->ioctx_list, list) {
- if (1 != atomic_read(&ctx->users))
- printk(KERN_DEBUG
- "exit_aio:ioctx still alive: %d %d %d\n",
- atomic_read(&ctx->users),
- atomic_read(&ctx->dead),
- atomic_read(&ctx->reqs_available));
/*
* We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything.
@@ -504,7 +501,7 @@ void exit_aio(struct mm_struct *mm)
*/
ctx->mmap_size = 0;
- if (!atomic_xchg(&ctx->dead, 1)) {
+ if (percpu_ref_kill(&ctx->users)) {
hlist_del_rcu(&ctx->list);
call_rcu(&ctx->rcu_head, kill_ioctx_rcu);
}
@@ -616,7 +613,7 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
if (ctx->user_id == ctx_id){
- atomic_inc(&ctx->users);
+ percpu_ref_get(&ctx->users);
ret = ctx;
break;
}
@@ -830,7 +827,7 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
if (ret > 0)
*i += ret;
- if (unlikely(atomic_read(&ctx->dead)))
+ if (unlikely(percpu_ref_dead(&ctx->users)))
ret = -EINVAL;
if (!*i)
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 25/32] aio: use xchg() instead of completion_lock
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
So, for sticking kiocb completions on the kioctx ringbuffer, we need a
lock - it unfortunately can't be lockless.
When the kioctx is shared between threads on different cpus and the rate
of completions is high, this lock sees quite a bit of contention - in
terms of cacheline contention it's the hottest thing in the aio
subsystem.
That means, with a regular spinlock, we're going to take a cache miss
to grab the lock, then another cache miss when we touch the data the
lock protects - if it's on the same cacheline as the lock, other cpus
spinning on the lock are going to be pulling it out from under us as
we're using it.
So, we use an old trick to get rid of this second forced cache miss -
make the data the lock protects be the lock itself, so we grab them both
at once.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 44 ++++++++++++++++++++------------------------
1 file changed, 20 insertions(+), 24 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index b26ad5c..fcd1f38 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -102,11 +102,11 @@ struct kioctx {
struct {
struct mutex ring_lock;
wait_queue_head_t wait;
+ unsigned shadow_tail;
} ____cacheline_aligned_in_smp;
struct {
unsigned tail;
- spinlock_t completion_lock;
} ____cacheline_aligned_in_smp;
struct page *internal_pages[AIO_RING_PAGES];
@@ -308,9 +308,9 @@ static void free_ioctx(struct kioctx *ctx)
kunmap_atomic(ring);
while (atomic_read(&ctx->reqs_available) < ctx->nr) {
- wait_event(ctx->wait, head != ctx->tail);
+ wait_event(ctx->wait, head != ctx->shadow_tail);
- avail = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
+ avail = (head < ctx->shadow_tail ? ctx->shadow_tail : ctx->nr) - head;
atomic_add(avail, &ctx->reqs_available);
head += avail;
@@ -375,7 +375,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
rcu_read_unlock();
spin_lock_init(&ctx->ctx_lock);
- spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
@@ -673,18 +672,19 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
* free_ioctx()
*/
atomic_inc(&ctx->reqs_available);
+ smp_mb__after_atomic_inc();
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
}
/*
- * Add a completion event to the ring buffer. Must be done holding
- * ctx->ctx_lock to prevent other code from messing with the tail
- * pointer since we might be called from irq context.
+ * Add a completion event to the ring buffer; ctx->tail is both our lock
+ * and the canonical version of the tail pointer.
*/
- spin_lock_irqsave(&ctx->completion_lock, flags);
+ local_irq_save(flags);
+ while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
+ cpu_relax();
- tail = ctx->tail;
pos = tail + AIO_EVENTS_OFFSET;
if (++tail >= ctx->nr)
@@ -710,14 +710,18 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
smp_wmb(); /* make event visible before updating tail */
- ctx->tail = tail;
+ ctx->shadow_tail = tail;
ring = kmap_atomic(ctx->ring_pages[0]);
ring->tail = tail;
kunmap_atomic(ring);
flush_dcache_page(ctx->ring_pages[0]);
- spin_unlock_irqrestore(&ctx->completion_lock, flags);
+ /* unlock, make new tail visible before checking waitlist */
+ smp_mb();
+
+ ctx->tail = tail;
+ local_irq_restore(flags);
pr_debug("added to ring %p at [%u]\n", iocb, tail);
@@ -733,14 +737,6 @@ put_rq:
/* everything turned out well, dispose of the aiocb. */
aio_put_req(iocb);
- /*
- * We have to order our ring_info tail store above and test
- * of the wait list below outside the wait lock. This is
- * like in wake_up_bit() where clearing a bit has to be
- * ordered with the unlocked test.
- */
- smp_mb();
-
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
@@ -768,19 +764,19 @@ static int aio_read_events_ring(struct kioctx *ctx,
head = ring->head;
kunmap_atomic(ring);
- pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr);
+ pr_debug("h%u t%u m%u\n", head, ctx->shadow_tail, ctx->nr);
- if (head == ctx->tail)
+ if (head == ctx->shadow_tail)
goto out;
__set_current_state(TASK_RUNNING);
while (ret < nr) {
- unsigned i = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
+ unsigned i = (head < ctx->shadow_tail ? ctx->shadow_tail : ctx->nr) - head;
struct io_event *ev;
struct page *page;
- if (head == ctx->tail)
+ if (head == ctx->shadow_tail)
break;
i = min_t(int, i, nr - ret);
@@ -810,7 +806,7 @@ static int aio_read_events_ring(struct kioctx *ctx,
kunmap_atomic(ring);
flush_dcache_page(ctx->ring_pages[0]);
- pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
+ pr_debug("%d h%u t%u\n", ret, head, ctx->shadow_tail);
put_reqs_available(ctx, ret);
out:
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 26/32] aio: Don't include aio.h in sched.h
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Faster kernel compiles by way of fewer unnecessary includes.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
arch/s390/hypfs/inode.c | 1 +
block/scsi_ioctl.c | 1 +
drivers/char/mem.c | 1 +
drivers/infiniband/hw/ipath/ipath_file_ops.c | 1 +
drivers/infiniband/hw/qib/qib_file_ops.c | 2 +-
drivers/staging/android/logger.c | 1 +
fs/9p/vfs_addr.c | 1 +
fs/afs/write.c | 1 +
fs/block_dev.c | 1 +
fs/btrfs/file.c | 1 +
fs/btrfs/inode.c | 1 +
fs/ceph/file.c | 1 +
fs/compat.c | 1 +
fs/direct-io.c | 1 +
fs/ecryptfs/file.c | 1 +
fs/ext2/inode.c | 1 +
fs/ext3/inode.c | 1 +
fs/ext4/file.c | 1 +
fs/ext4/indirect.c | 1 +
fs/ext4/inode.c | 1 +
fs/ext4/page-io.c | 1 +
fs/fat/inode.c | 1 +
fs/fuse/dev.c | 1 +
fs/fuse/file.c | 1 +
fs/gfs2/aops.c | 1 +
fs/gfs2/file.c | 1 +
fs/hfs/inode.c | 1 +
fs/hfsplus/inode.c | 1 +
fs/jfs/inode.c | 1 +
fs/nilfs2/inode.c | 2 +-
fs/ntfs/file.c | 1 +
fs/ntfs/inode.c | 1 +
fs/ocfs2/aops.h | 2 ++
fs/ocfs2/inode.h | 2 ++
fs/pipe.c | 1 +
fs/read_write.c | 1 +
fs/reiserfs/inode.c | 1 +
fs/ubifs/file.c | 1 +
fs/udf/inode.c | 1 +
fs/xfs/xfs_aops.c | 1 +
fs/xfs/xfs_file.c | 1 +
include/linux/cgroup.h | 1 +
include/linux/sched.h | 2 --
kernel/fork.c | 1 +
kernel/printk.c | 1 +
kernel/ptrace.c | 1 +
mm/page_io.c | 1 +
mm/shmem.c | 1 +
mm/swap.c | 1 +
security/keys/internal.h | 2 ++
security/keys/keyctl.c | 1 +
sound/core/pcm_native.c | 2 +-
52 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/arch/s390/hypfs/inode.c b/arch/s390/hypfs/inode.c
index 06ea69b..c6c6f43 100644
--- a/arch/s390/hypfs/inode.c
+++ b/arch/s390/hypfs/inode.c
@@ -21,6 +21,7 @@
#include <linux/module.h>
#include <linux/seq_file.h>
#include <linux/mount.h>
+#include <linux/aio.h>
#include <asm/ebcdic.h>
#include "hypfs.h"
diff --git a/block/scsi_ioctl.c b/block/scsi_ioctl.c
index 9a87daa..a5ffcc9 100644
--- a/block/scsi_ioctl.c
+++ b/block/scsi_ioctl.c
@@ -27,6 +27,7 @@
#include <linux/ratelimit.h>
#include <linux/slab.h>
#include <linux/times.h>
+#include <linux/uio.h>
#include <asm/uaccess.h>
#include <scsi/scsi.h>
diff --git a/drivers/char/mem.c b/drivers/char/mem.c
index 968ae6e..6447854 100644
--- a/drivers/char/mem.c
+++ b/drivers/char/mem.c
@@ -28,6 +28,7 @@
#include <linux/pfn.h>
#include <linux/export.h>
#include <linux/io.h>
+#include <linux/aio.h>
#include <asm/uaccess.h>
diff --git a/drivers/infiniband/hw/ipath/ipath_file_ops.c b/drivers/infiniband/hw/ipath/ipath_file_ops.c
index 3eb7e45..62edc41 100644
--- a/drivers/infiniband/hw/ipath/ipath_file_ops.c
+++ b/drivers/infiniband/hw/ipath/ipath_file_ops.c
@@ -40,6 +40,7 @@
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/io.h>
+#include <linux/aio.h>
#include <linux/jiffies.h>
#include <linux/cpu.h>
#include <asm/pgtable.h>
diff --git a/drivers/infiniband/hw/qib/qib_file_ops.c b/drivers/infiniband/hw/qib/qib_file_ops.c
index 959a5c4..488300c 100644
--- a/drivers/infiniband/hw/qib/qib_file_ops.c
+++ b/drivers/infiniband/hw/qib/qib_file_ops.c
@@ -39,7 +39,7 @@
#include <linux/vmalloc.h>
#include <linux/highmem.h>
#include <linux/io.h>
-#include <linux/uio.h>
+#include <linux/aio.h>
#include <linux/jiffies.h>
#include <asm/pgtable.h>
#include <linux/delay.h>
diff --git a/drivers/staging/android/logger.c b/drivers/staging/android/logger.c
index 1d5ed47..c79c101 100644
--- a/drivers/staging/android/logger.c
+++ b/drivers/staging/android/logger.c
@@ -28,6 +28,7 @@
#include <linux/slab.h>
#include <linux/time.h>
#include <linux/vmalloc.h>
+#include <linux/aio.h>
#include "logger.h"
#include <asm/ioctls.h>
diff --git a/fs/9p/vfs_addr.c b/fs/9p/vfs_addr.c
index 0ad61c6..055562c 100644
--- a/fs/9p/vfs_addr.c
+++ b/fs/9p/vfs_addr.c
@@ -33,6 +33,7 @@
#include <linux/pagemap.h>
#include <linux/idr.h>
#include <linux/sched.h>
+#include <linux/aio.h>
#include <net/9p/9p.h>
#include <net/9p/client.h>
diff --git a/fs/afs/write.c b/fs/afs/write.c
index 9aa52d9..5151ea3 100644
--- a/fs/afs/write.c
+++ b/fs/afs/write.c
@@ -14,6 +14,7 @@
#include <linux/pagemap.h>
#include <linux/writeback.h>
#include <linux/pagevec.h>
+#include <linux/aio.h>
#include "internal.h"
static int afs_write_back_from_locked_page(struct afs_writeback *wb,
diff --git a/fs/block_dev.c b/fs/block_dev.c
index ab3a456..bcccb5c 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -27,6 +27,7 @@
#include <linux/namei.h>
#include <linux/log2.h>
#include <linux/cleancache.h>
+#include <linux/aio.h>
#include <asm/uaccess.h>
#include "internal.h"
diff --git a/fs/btrfs/file.c b/fs/btrfs/file.c
index 9ab1bed..166f386 100644
--- a/fs/btrfs/file.c
+++ b/fs/btrfs/file.c
@@ -30,6 +30,7 @@
#include <linux/statfs.h>
#include <linux/compat.h>
#include <linux/slab.h>
+#include <linux/aio.h>
#include "ctree.h"
#include "disk-io.h"
#include "transaction.h"
diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c
index 95542a1..0a7c785 100644
--- a/fs/btrfs/inode.c
+++ b/fs/btrfs/inode.c
@@ -39,6 +39,7 @@
#include <linux/slab.h>
#include <linux/ratelimit.h>
#include <linux/mount.h>
+#include <linux/aio.h>
#include "compat.h"
#include "ctree.h"
#include "disk-io.h"
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 5840d2a..74cee4e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -7,6 +7,7 @@
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/writeback.h>
+#include <linux/aio.h>
#include "super.h"
#include "mds_client.h"
diff --git a/fs/compat.c b/fs/compat.c
index 015e1e1..6538684 100644
--- a/fs/compat.c
+++ b/fs/compat.c
@@ -48,6 +48,7 @@
#include <linux/fs_struct.h>
#include <linux/slab.h>
#include <linux/pagemap.h>
+#include <linux/aio.h>
#include <asm/uaccess.h>
#include <asm/mmu_context.h>
diff --git a/fs/direct-io.c b/fs/direct-io.c
index cf5b44b..d5099c2 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -37,6 +37,7 @@
#include <linux/uio.h>
#include <linux/atomic.h>
#include <linux/prefetch.h>
+#include <linux/aio.h>
/*
* How many user pages to map in one call to get_user_pages(). This determines
diff --git a/fs/ecryptfs/file.c b/fs/ecryptfs/file.c
index d45ba45..fd04283 100644
--- a/fs/ecryptfs/file.c
+++ b/fs/ecryptfs/file.c
@@ -31,6 +31,7 @@
#include <linux/security.h>
#include <linux/compat.h>
#include <linux/fs_stack.h>
+#include <linux/aio.h>
#include "ecryptfs_kernel.h"
/**
diff --git a/fs/ext2/inode.c b/fs/ext2/inode.c
index 6363ac6..388e77d 100644
--- a/fs/ext2/inode.c
+++ b/fs/ext2/inode.c
@@ -31,6 +31,7 @@
#include <linux/mpage.h>
#include <linux/fiemap.h>
#include <linux/namei.h>
+#include <linux/aio.h>
#include "ext2.h"
#include "acl.h"
#include "xip.h"
diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c
index 7e87e37..f99db41 100644
--- a/fs/ext3/inode.c
+++ b/fs/ext3/inode.c
@@ -27,6 +27,7 @@
#include <linux/writeback.h>
#include <linux/mpage.h>
#include <linux/namei.h>
+#include <linux/aio.h>
#include "ext3.h"
#include "xattr.h"
#include "acl.h"
diff --git a/fs/ext4/file.c b/fs/ext4/file.c
index bf3966b..246d693 100644
--- a/fs/ext4/file.c
+++ b/fs/ext4/file.c
@@ -24,6 +24,7 @@
#include <linux/mount.h>
#include <linux/path.h>
#include <linux/quotaops.h>
+#include <linux/aio.h>
#include "ext4.h"
#include "ext4_jbd2.h"
#include "xattr.h"
diff --git a/fs/ext4/indirect.c b/fs/ext4/indirect.c
index 792e388..dcfcb0d 100644
--- a/fs/ext4/indirect.c
+++ b/fs/ext4/indirect.c
@@ -20,6 +20,7 @@
* (sct@redhat.com), 1993, 1998
*/
+#include <linux/aio.h>
#include "ext4_jbd2.h"
#include "truncate.h"
diff --git a/fs/ext4/inode.c b/fs/ext4/inode.c
index b3c243b..6e42b49 100644
--- a/fs/ext4/inode.c
+++ b/fs/ext4/inode.c
@@ -37,6 +37,7 @@
#include <linux/printk.h>
#include <linux/slab.h>
#include <linux/ratelimit.h>
+#include <linux/aio.h>
#include "ext4_jbd2.h"
#include "xattr.h"
diff --git a/fs/ext4/page-io.c b/fs/ext4/page-io.c
index 68e896e..c3b15e2 100644
--- a/fs/ext4/page-io.c
+++ b/fs/ext4/page-io.c
@@ -23,6 +23,7 @@
#include <linux/workqueue.h>
#include <linux/kernel.h>
#include <linux/slab.h>
+#include <linux/aio.h>
#include "ext4_jbd2.h"
#include "xattr.h"
diff --git a/fs/fat/inode.c b/fs/fat/inode.c
index 5bafaad..4dd0d8d 100644
--- a/fs/fat/inode.c
+++ b/fs/fat/inode.c
@@ -26,6 +26,7 @@
#include <linux/writeback.h>
#include <linux/log2.h>
#include <linux/hash.h>
+#include <linux/aio.h>
#include <asm/unaligned.h>
#include "fat.h"
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 8c23fa7..b4c3f56 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -19,6 +19,7 @@
#include <linux/pipe_fs_i.h>
#include <linux/swap.h>
#include <linux/splice.h>
+#include <linux/aio.h>
MODULE_ALIAS_MISCDEV(FUSE_MINOR);
MODULE_ALIAS("devname:fuse");
diff --git a/fs/fuse/file.c b/fs/fuse/file.c
index 78d2837..c65e75d 100644
--- a/fs/fuse/file.c
+++ b/fs/fuse/file.c
@@ -15,6 +15,7 @@
#include <linux/module.h>
#include <linux/compat.h>
#include <linux/swap.h>
+#include <linux/aio.h>
static const struct file_operations fuse_direct_io_file_operations;
diff --git a/fs/gfs2/aops.c b/fs/gfs2/aops.c
index 01c4975..5b8d824 100644
--- a/fs/gfs2/aops.c
+++ b/fs/gfs2/aops.c
@@ -20,6 +20,7 @@
#include <linux/swap.h>
#include <linux/gfs2_ondisk.h>
#include <linux/backing-dev.h>
+#include <linux/aio.h>
#include "gfs2.h"
#include "incore.h"
diff --git a/fs/gfs2/file.c b/fs/gfs2/file.c
index e056b4c..ac49082 100644
--- a/fs/gfs2/file.c
+++ b/fs/gfs2/file.c
@@ -25,6 +25,7 @@
#include <asm/uaccess.h>
#include <linux/dlm.h>
#include <linux/dlm_plock.h>
+#include <linux/aio.h>
#include "gfs2.h"
#include "incore.h"
diff --git a/fs/hfs/inode.c b/fs/hfs/inode.c
index 0b35903..8e4c003 100644
--- a/fs/hfs/inode.c
+++ b/fs/hfs/inode.c
@@ -14,6 +14,7 @@
#include <linux/pagemap.h>
#include <linux/mpage.h>
#include <linux/sched.h>
+#include <linux/aio.h>
#include "hfs_fs.h"
#include "btree.h"
diff --git a/fs/hfsplus/inode.c b/fs/hfsplus/inode.c
index 2172aa5..50ae88c 100644
--- a/fs/hfsplus/inode.c
+++ b/fs/hfsplus/inode.c
@@ -14,6 +14,7 @@
#include <linux/pagemap.h>
#include <linux/mpage.h>
#include <linux/sched.h>
+#include <linux/aio.h>
#include "hfsplus_fs.h"
#include "hfsplus_raw.h"
diff --git a/fs/jfs/inode.c b/fs/jfs/inode.c
index 4692bf3..a462b57 100644
--- a/fs/jfs/inode.c
+++ b/fs/jfs/inode.c
@@ -23,6 +23,7 @@
#include <linux/pagemap.h>
#include <linux/quotaops.h>
#include <linux/writeback.h>
+#include <linux/aio.h>
#include "jfs_incore.h"
#include "jfs_inode.h"
#include "jfs_filsys.h"
diff --git a/fs/nilfs2/inode.c b/fs/nilfs2/inode.c
index 4d31d2c..1e246df 100644
--- a/fs/nilfs2/inode.c
+++ b/fs/nilfs2/inode.c
@@ -25,7 +25,7 @@
#include <linux/gfp.h>
#include <linux/mpage.h>
#include <linux/writeback.h>
-#include <linux/uio.h>
+#include <linux/aio.h>
#include "nilfs.h"
#include "btnode.h"
#include "segment.h"
diff --git a/fs/ntfs/file.c b/fs/ntfs/file.c
index 1ecf464..1373f97 100644
--- a/fs/ntfs/file.c
+++ b/fs/ntfs/file.c
@@ -27,6 +27,7 @@
#include <linux/swap.h>
#include <linux/uio.h>
#include <linux/writeback.h>
+#include <linux/aio.h>
#include <asm/page.h>
#include <asm/uaccess.h>
diff --git a/fs/ntfs/inode.c b/fs/ntfs/inode.c
index 1d27331..e3afc6e 100644
--- a/fs/ntfs/inode.c
+++ b/fs/ntfs/inode.c
@@ -28,6 +28,7 @@
#include <linux/quotaops.h>
#include <linux/slab.h>
#include <linux/log2.h>
+#include <linux/aio.h>
#include "aops.h"
#include "attrib.h"
diff --git a/fs/ocfs2/aops.h b/fs/ocfs2/aops.h
index ffb2da3..f671e49 100644
--- a/fs/ocfs2/aops.h
+++ b/fs/ocfs2/aops.h
@@ -22,6 +22,8 @@
#ifndef OCFS2_AOPS_H
#define OCFS2_AOPS_H
+#include <linux/aio.h>
+
handle_t *ocfs2_start_walk_page_trans(struct inode *inode,
struct page *page,
unsigned from,
diff --git a/fs/ocfs2/inode.h b/fs/ocfs2/inode.h
index 88924a3..c765bdf 100644
--- a/fs/ocfs2/inode.h
+++ b/fs/ocfs2/inode.h
@@ -28,6 +28,8 @@
#include "extent_map.h"
+struct iocb;
+
/* OCFS2 Inode Private Data */
struct ocfs2_inode_info
{
diff --git a/fs/pipe.c b/fs/pipe.c
index bd3479d..dc86533 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -21,6 +21,7 @@
#include <linux/audit.h>
#include <linux/syscalls.h>
#include <linux/fcntl.h>
+#include <linux/aio.h>
#include <asm/uaccess.h>
#include <asm/ioctls.h>
diff --git a/fs/read_write.c b/fs/read_write.c
index 7347732..a088b00 100644
--- a/fs/read_write.c
+++ b/fs/read_write.c
@@ -15,6 +15,7 @@
#include <linux/syscalls.h>
#include <linux/pagemap.h>
#include <linux/splice.h>
+#include <linux/aio.h>
#include "read_write.h"
#include <asm/uaccess.h>
diff --git a/fs/reiserfs/inode.c b/fs/reiserfs/inode.c
index d83736f..c760aed 100644
--- a/fs/reiserfs/inode.c
+++ b/fs/reiserfs/inode.c
@@ -18,6 +18,7 @@
#include <linux/writeback.h>
#include <linux/quotaops.h>
#include <linux/swap.h>
+#include <linux/aio.h>
int reiserfs_commit_write(struct file *f, struct page *page,
unsigned from, unsigned to);
diff --git a/fs/ubifs/file.c b/fs/ubifs/file.c
index 5bc7781..1b337c7 100644
--- a/fs/ubifs/file.c
+++ b/fs/ubifs/file.c
@@ -50,6 +50,7 @@
*/
#include "ubifs.h"
+#include <linux/aio.h>
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/slab.h>
diff --git a/fs/udf/inode.c b/fs/udf/inode.c
index df88b95..7cd8448 100644
--- a/fs/udf/inode.c
+++ b/fs/udf/inode.c
@@ -38,6 +38,7 @@
#include <linux/slab.h>
#include <linux/crc-itu-t.h>
#include <linux/mpage.h>
+#include <linux/aio.h>
#include "udf_i.h"
#include "udf_sb.h"
diff --git a/fs/xfs/xfs_aops.c b/fs/xfs/xfs_aops.c
index e57e2da..d519030 100644
--- a/fs/xfs/xfs_aops.c
+++ b/fs/xfs/xfs_aops.c
@@ -31,6 +31,7 @@
#include "xfs_vnodeops.h"
#include "xfs_trace.h"
#include "xfs_bmap.h"
+#include <linux/aio.h>
#include <linux/gfp.h>
#include <linux/mpage.h>
#include <linux/pagevec.h>
diff --git a/fs/xfs/xfs_file.c b/fs/xfs/xfs_file.c
index aa473fa..46176a5 100644
--- a/fs/xfs/xfs_file.c
+++ b/fs/xfs/xfs_file.c
@@ -34,6 +34,7 @@
#include "xfs_ioctl.h"
#include "xfs_trace.h"
+#include <linux/aio.h>
#include <linux/dcache.h>
#include <linux/falloc.h>
#include <linux/pagevec.h>
diff --git a/include/linux/cgroup.h b/include/linux/cgroup.h
index f8a030c..33e158f 100644
--- a/include/linux/cgroup.h
+++ b/include/linux/cgroup.h
@@ -26,6 +26,7 @@ struct cgroup_subsys;
struct inode;
struct cgroup;
struct css_id;
+struct eventfd_ctx;
extern int cgroup_init_early(void);
extern int cgroup_init(void);
diff --git a/include/linux/sched.h b/include/linux/sched.h
index 0dd42a0..a04716d 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -345,8 +345,6 @@ struct user_namespace;
extern int sysctl_max_map_count;
-#include <linux/aio.h>
-
#ifdef CONFIG_MMU
extern void arch_pick_mmap_layout(struct mm_struct *mm);
extern unsigned long
diff --git a/kernel/fork.c b/kernel/fork.c
index 8b20ab7..1104945 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -70,6 +70,7 @@
#include <linux/khugepaged.h>
#include <linux/signalfd.h>
#include <linux/uprobes.h>
+#include <linux/aio.h>
#include <asm/pgtable.h>
#include <asm/pgalloc.h>
diff --git a/kernel/printk.c b/kernel/printk.c
index 2d607f4..3fce6e3 100644
--- a/kernel/printk.c
+++ b/kernel/printk.c
@@ -42,6 +42,7 @@
#include <linux/notifier.h>
#include <linux/rculist.h>
#include <linux/poll.h>
+#include <linux/uio.h>
#include <asm/uaccess.h>
diff --git a/kernel/ptrace.c b/kernel/ptrace.c
index 1f5e55d..3f08b89 100644
--- a/kernel/ptrace.c
+++ b/kernel/ptrace.c
@@ -24,6 +24,7 @@
#include <linux/regset.h>
#include <linux/hw_breakpoint.h>
#include <linux/cn_proc.h>
+#include <linux/uio.h>
static int ptrace_trapping_sleep_fn(void *flags)
diff --git a/mm/page_io.c b/mm/page_io.c
index 78eee32..c535d39 100644
--- a/mm/page_io.c
+++ b/mm/page_io.c
@@ -20,6 +20,7 @@
#include <linux/buffer_head.h>
#include <linux/writeback.h>
#include <linux/frontswap.h>
+#include <linux/aio.h>
#include <asm/pgtable.h>
static struct bio *get_swap_bio(gfp_t gfp_flags,
diff --git a/mm/shmem.c b/mm/shmem.c
index 50c5b8f..4a780f9 100644
--- a/mm/shmem.c
+++ b/mm/shmem.c
@@ -30,6 +30,7 @@
#include <linux/mm.h>
#include <linux/export.h>
#include <linux/swap.h>
+#include <linux/aio.h>
static struct vfsmount *shm_mnt;
diff --git a/mm/swap.c b/mm/swap.c
index 6310dc2..d0cb95e 100644
--- a/mm/swap.c
+++ b/mm/swap.c
@@ -30,6 +30,7 @@
#include <linux/backing-dev.h>
#include <linux/memcontrol.h>
#include <linux/gfp.h>
+#include <linux/uio.h>
#include "internal.h"
diff --git a/security/keys/internal.h b/security/keys/internal.h
index 8bbefc3..d4f1468 100644
--- a/security/keys/internal.h
+++ b/security/keys/internal.h
@@ -16,6 +16,8 @@
#include <linux/key-type.h>
#include <linux/task_work.h>
+struct iovec;
+
#ifdef __KDEBUG
#define kenter(FMT, ...) \
printk(KERN_DEBUG "==> %s("FMT")\n", __func__, ##__VA_ARGS__)
diff --git a/security/keys/keyctl.c b/security/keys/keyctl.c
index 5d34b4e..c664201 100644
--- a/security/keys/keyctl.c
+++ b/security/keys/keyctl.c
@@ -22,6 +22,7 @@
#include <linux/err.h>
#include <linux/vmalloc.h>
#include <linux/security.h>
+#include <linux/uio.h>
#include <asm/uaccess.h>
#include "internal.h"
diff --git a/sound/core/pcm_native.c b/sound/core/pcm_native.c
index f9ddecf..2cdd17a 100644
--- a/sound/core/pcm_native.c
+++ b/sound/core/pcm_native.c
@@ -25,7 +25,7 @@
#include <linux/slab.h>
#include <linux/time.h>
#include <linux/pm_qos.h>
-#include <linux/uio.h>
+#include <linux/aio.h>
#include <linux/dma-mapping.h>
#include <sound/core.h>
#include <sound/control.h>
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 27/32] aio: Kill ki_key
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
ki_key wasn't actually used for anything previously - it was always 0.
Drop it to trim struct kiocb a bit.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 7 +++++--
include/linux/aio.h | 9 ++++-----
2 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index fcd1f38..f6bf227 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1193,7 +1193,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
}
}
- ret = put_user(req->ki_key, &user_iocb->aio_key);
+ ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
if (unlikely(ret)) {
pr_debug("EFAULT: aio_key\n");
goto out_put_req;
@@ -1314,10 +1314,13 @@ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
assert_spin_locked(&ctx->ctx_lock);
+ if (key != KIOCB_KEY)
+ return NULL;
+
/* TODO: use a hash or array, this sucks. */
list_for_each(pos, &ctx->active_reqs) {
struct kiocb *kiocb = list_kiocb(pos);
- if (kiocb->ki_obj.user == iocb && kiocb->ki_key == key)
+ if (kiocb->ki_obj.user == iocb)
return kiocb;
}
return NULL;
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 58adc56..76a6e59 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -12,7 +12,7 @@
struct kioctx;
struct kiocb;
-#define KIOCB_SYNC_KEY (~0U)
+#define KIOCB_KEY 0
#define KIOCB_CANCELLED ((void *) (~0ULL))
@@ -45,10 +45,9 @@ typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
*/
struct kiocb {
atomic_t ki_users;
- unsigned ki_key; /* id of this request */
struct file *ki_filp;
- struct kioctx *ki_ctx; /* may be NULL for sync ops */
+ struct kioctx *ki_ctx; /* NULL for sync ops */
kiocb_cancel_fn *ki_cancel;
ssize_t (*ki_retry)(struct kiocb *);
void (*ki_dtor)(struct kiocb *);
@@ -84,14 +83,14 @@ struct kiocb {
static inline bool is_sync_kiocb(struct kiocb *kiocb)
{
- return kiocb->ki_key == KIOCB_SYNC_KEY;
+ return kiocb->ki_ctx == NULL;
}
static inline void init_sync_kiocb(struct kiocb *kiocb, struct file *filp)
{
*kiocb = (struct kiocb) {
.ki_users = ATOMIC_INIT(1),
- .ki_key = KIOCB_SYNC_KEY,
+ .ki_ctx = NULL,
.ki_filp = filp,
.ki_obj.tsk = current,
};
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 28/32] aio: Kill ki_retry
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Thanks to Zach Brown's work to rip out the retry infrastructure, we
don't need this anymore - ki_retry was only called right after the kiocb
was initialized.
This also refactors and trims some duplicated code, as well as cleaning
up the refcounting/error handling a bit.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 223 +++++++++++++++++++---------------------------------
include/linux/aio.h | 26 ------
2 files changed, 83 insertions(+), 166 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index f6bf227..fedd8f6 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -574,7 +574,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
if (unlikely(!req))
goto out_put;
- atomic_set(&req->ki_users, 2);
+ atomic_set(&req->ki_users, 1);
req->ki_ctx = ctx;
return req;
out_put:
@@ -941,24 +941,15 @@ static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
BUG_ON(ret > 0 && iocb->ki_left == 0);
}
-static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
+typedef ssize_t (aio_rw_op)(struct kiocb *, const struct iovec *,
+ unsigned long, loff_t);
+
+static ssize_t aio_rw_vect_retry(struct kiocb *iocb, int rw, aio_rw_op *rw_op)
{
struct file *file = iocb->ki_filp;
struct address_space *mapping = file->f_mapping;
struct inode *inode = mapping->host;
- ssize_t (*rw_op)(struct kiocb *, const struct iovec *,
- unsigned long, loff_t);
ssize_t ret = 0;
- unsigned short opcode;
-
- if ((iocb->ki_opcode == IOCB_CMD_PREADV) ||
- (iocb->ki_opcode == IOCB_CMD_PREAD)) {
- rw_op = file->f_op->aio_read;
- opcode = IOCB_CMD_PREADV;
- } else {
- rw_op = file->f_op->aio_write;
- opcode = IOCB_CMD_PWRITEV;
- }
/* This matches the pread()/pwrite() logic */
if (iocb->ki_pos < 0)
@@ -974,7 +965,7 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
/* retry all partial writes. retry partial reads as long as its a
* regular file. */
} while (ret > 0 && iocb->ki_left > 0 &&
- (opcode == IOCB_CMD_PWRITEV ||
+ (rw == WRITE ||
(!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))));
/* This means we must have transferred all that we could */
@@ -984,7 +975,7 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
/* If we managed to write some out we return that, rather than
* the eventual error. */
- if (opcode == IOCB_CMD_PWRITEV
+ if (rw == WRITE
&& ret < 0 && ret != -EIOCBQUEUED
&& iocb->ki_nbytes - iocb->ki_left)
ret = iocb->ki_nbytes - iocb->ki_left;
@@ -992,73 +983,41 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
return ret;
}
-static ssize_t aio_fdsync(struct kiocb *iocb)
-{
- struct file *file = iocb->ki_filp;
- ssize_t ret = -EINVAL;
-
- if (file->f_op->aio_fsync)
- ret = file->f_op->aio_fsync(iocb, 1);
- return ret;
-}
-
-static ssize_t aio_fsync(struct kiocb *iocb)
-{
- struct file *file = iocb->ki_filp;
- ssize_t ret = -EINVAL;
-
- if (file->f_op->aio_fsync)
- ret = file->f_op->aio_fsync(iocb, 0);
- return ret;
-}
-
-static ssize_t aio_setup_vectored_rw(int type, struct kiocb *kiocb, bool compat)
+static ssize_t aio_setup_vectored_rw(int rw, struct kiocb *kiocb, bool compat)
{
ssize_t ret;
+ kiocb->ki_nr_segs = kiocb->ki_nbytes;
+
#ifdef CONFIG_COMPAT
if (compat)
- ret = compat_rw_copy_check_uvector(type,
+ ret = compat_rw_copy_check_uvector(rw,
(struct compat_iovec __user *)kiocb->ki_buf,
- kiocb->ki_nbytes, 1, &kiocb->ki_inline_vec,
+ kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
&kiocb->ki_iovec);
else
#endif
- ret = rw_copy_check_uvector(type,
+ ret = rw_copy_check_uvector(rw,
(struct iovec __user *)kiocb->ki_buf,
- kiocb->ki_nbytes, 1, &kiocb->ki_inline_vec,
+ kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
&kiocb->ki_iovec);
if (ret < 0)
- goto out;
-
- ret = rw_verify_area(type, kiocb->ki_filp, &kiocb->ki_pos, ret);
- if (ret < 0)
- goto out;
+ return ret;
- kiocb->ki_nr_segs = kiocb->ki_nbytes;
- kiocb->ki_cur_seg = 0;
- /* ki_nbytes/left now reflect bytes instead of segs */
+ /* ki_nbytes now reflect bytes instead of segs */
kiocb->ki_nbytes = ret;
- kiocb->ki_left = ret;
-
- ret = 0;
-out:
- return ret;
+ return 0;
}
-static ssize_t aio_setup_single_vector(int type, struct file * file, struct kiocb *kiocb)
+static ssize_t aio_setup_single_vector(int rw, struct kiocb *kiocb)
{
- int bytes;
-
- bytes = rw_verify_area(type, file, &kiocb->ki_pos, kiocb->ki_left);
- if (bytes < 0)
- return bytes;
+ if (unlikely(!access_ok(!rw, kiocb->ki_buf, kiocb->ki_nbytes)))
+ return -EFAULT;
kiocb->ki_iovec = &kiocb->ki_inline_vec;
kiocb->ki_iovec->iov_base = kiocb->ki_buf;
- kiocb->ki_iovec->iov_len = bytes;
+ kiocb->ki_iovec->iov_len = kiocb->ki_nbytes;
kiocb->ki_nr_segs = 1;
- kiocb->ki_cur_seg = 0;
return 0;
}
@@ -1067,81 +1026,80 @@ static ssize_t aio_setup_single_vector(int type, struct file * file, struct kioc
* Performs the initial checks and aio retry method
* setup for the kiocb at the time of io submission.
*/
-static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
+static ssize_t aio_run_iocb(struct kiocb *req, bool compat)
{
- struct file *file = kiocb->ki_filp;
- ssize_t ret = 0;
+ struct file *file = req->ki_filp;
+ ssize_t ret;
+ int rw, mode;
+ aio_rw_op *rw_op;
- switch (kiocb->ki_opcode) {
+ switch (req->ki_opcode) {
case IOCB_CMD_PREAD:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_READ)))
- break;
- ret = -EFAULT;
- if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf,
- kiocb->ki_left)))
- break;
- ret = aio_setup_single_vector(READ, file, kiocb);
- if (ret)
- break;
- ret = -EINVAL;
- if (file->f_op->aio_read)
- kiocb->ki_retry = aio_rw_vect_retry;
- break;
- case IOCB_CMD_PWRITE:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_WRITE)))
- break;
- ret = -EFAULT;
- if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf,
- kiocb->ki_left)))
- break;
- ret = aio_setup_single_vector(WRITE, file, kiocb);
- if (ret)
- break;
- ret = -EINVAL;
- if (file->f_op->aio_write)
- kiocb->ki_retry = aio_rw_vect_retry;
- break;
case IOCB_CMD_PREADV:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_READ)))
- break;
- ret = aio_setup_vectored_rw(READ, kiocb, compat);
- if (ret)
- break;
- ret = -EINVAL;
- if (file->f_op->aio_read)
- kiocb->ki_retry = aio_rw_vect_retry;
- break;
+ mode = FMODE_READ;
+ rw = READ;
+ rw_op = file->f_op->aio_read;
+ goto rw_common;
+
+ case IOCB_CMD_PWRITE:
case IOCB_CMD_PWRITEV:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_WRITE)))
- break;
- ret = aio_setup_vectored_rw(WRITE, kiocb, compat);
+ mode = FMODE_WRITE;
+ rw = WRITE;
+ rw_op = file->f_op->aio_write;
+ goto rw_common;
+rw_common:
+ if (unlikely(!(file->f_mode & mode)))
+ return -EBADF;
+
+ if (!rw_op)
+ return -EINVAL;
+
+ ret = (req->ki_opcode == IOCB_CMD_PREADV ||
+ req->ki_opcode == IOCB_CMD_PWRITEV)
+ ? aio_setup_vectored_rw(rw, req, compat)
+ : aio_setup_single_vector(rw, req);
if (ret)
- break;
- ret = -EINVAL;
- if (file->f_op->aio_write)
- kiocb->ki_retry = aio_rw_vect_retry;
+ return ret;
+
+ ret = rw_verify_area(rw, file, &req->ki_pos, req->ki_nbytes);
+ if (ret < 0)
+ return ret;
+
+ req->ki_nbytes = ret;
+ req->ki_left = ret;
+
+ ret = aio_rw_vect_retry(req, rw, rw_op);
break;
+
case IOCB_CMD_FDSYNC:
- ret = -EINVAL;
- if (file->f_op->aio_fsync)
- kiocb->ki_retry = aio_fdsync;
+ if (!file->f_op->aio_fsync)
+ return -EINVAL;
+
+ ret = file->f_op->aio_fsync(req, 1);
break;
+
case IOCB_CMD_FSYNC:
- ret = -EINVAL;
- if (file->f_op->aio_fsync)
- kiocb->ki_retry = aio_fsync;
+ if (!file->f_op->aio_fsync)
+ return -EINVAL;
+
+ ret = file->f_op->aio_fsync(req, 0);
break;
+
default:
pr_debug("EINVAL: no operation provided\n");
- ret = -EINVAL;
+ return -EINVAL;
}
- if (!kiocb->ki_retry)
- return ret;
+ if (ret != -EIOCBQUEUED) {
+ /*
+ * There's no easy way to restart the syscall since other AIO's
+ * may be already running. Just fail this IO with EINTR.
+ */
+ if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+ ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
+ ret = -EINTR;
+ aio_complete(req, ret, 0);
+ }
return 0;
}
@@ -1168,7 +1126,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return -EINVAL;
}
- req = aio_get_req(ctx); /* returns with 2 references to req */
+ req = aio_get_req(ctx);
if (unlikely(!req))
return -EAGAIN;
@@ -1207,29 +1165,14 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
req->ki_opcode = iocb->aio_lio_opcode;
- ret = aio_setup_iocb(req, compat);
+ ret = aio_run_iocb(req, compat);
if (ret)
goto out_put_req;
- ret = req->ki_retry(req);
- if (ret != -EIOCBQUEUED) {
- /*
- * There's no easy way to restart the syscall since other AIO's
- * may be already running. Just fail this IO with EINTR.
- */
- if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
- ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
- ret = -EINTR;
- aio_complete(req, ret, 0);
- }
-
- aio_put_req(req); /* drop extra ref to req */
return 0;
-
out_put_req:
put_reqs_available(ctx, 1);
- aio_put_req(req); /* drop extra ref to req */
- aio_put_req(req); /* drop i/o ref to req */
+ aio_put_req(req);
return ret;
}
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 76a6e59..2a7ad9f 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -18,38 +18,12 @@ struct kiocb;
typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
-/* is there a better place to document function pointer methods? */
-/**
- * ki_retry - iocb forward progress callback
- * @kiocb: The kiocb struct to advance by performing an operation.
- *
- * This callback is called when the AIO core wants a given AIO operation
- * to make forward progress. The kiocb argument describes the operation
- * that is to be performed. As the operation proceeds, perhaps partially,
- * ki_retry is expected to update the kiocb with progress made. Typically
- * ki_retry is set in the AIO core and it itself calls file_operations
- * helpers.
- *
- * ki_retry's return value determines when the AIO operation is completed
- * and an event is generated in the AIO event ring. Except the special
- * return values described below, the value that is returned from ki_retry
- * is transferred directly into the completion ring as the operation's
- * resulting status. Once this has happened ki_retry *MUST NOT* reference
- * the kiocb pointer again.
- *
- * If ki_retry returns -EIOCBQUEUED it has made a promise that aio_complete()
- * will be called on the kiocb pointer in the future. The AIO core will
- * not ask the method again -- ki_retry must ensure forward progress.
- * aio_complete() must be called once and only once in the future, multiple
- * calls may result in undefined behaviour.
- */
struct kiocb {
atomic_t ki_users;
struct file *ki_filp;
struct kioctx *ki_ctx; /* NULL for sync ops */
kiocb_cancel_fn *ki_cancel;
- ssize_t (*ki_retry)(struct kiocb *);
void (*ki_dtor)(struct kiocb *);
union {
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 29/32] block, aio: Batch completion for bios/kiocbs
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
When completing a kiocb, there's some fixed overhead from touching the
ring buffer of the kioctx the kiocb belongs to. Some newer high end block
devices can complete multiple IOs per interrupt, much like many network
interfaces have been able to do for some time.
This plumbs through infrastructure so we can take advantage of multiple
completions at the interrupt level, and complete multiple kiocbs at the
same time.
Drivers have to be converted to take advantage of this, but it's a
simple change and the next patches will convert a few drivers.
To use it, an interrupt handler (or any code that completes bios or
requests) declares and initializes a struct batch_complete:
struct batch_complete batch;
batch_complete_init(&batch);
Then, instead of calling bio_endio(), it calls
bio_endio_batch(bio, err, &batch). This just adds the bio to a list in
the batch_complete.
At the end, it calls
batch_complete(&batch);
This completes all the bios at once, building up a list of kiocbs;
then the list of kiocbs is completed all at once.
Also, in order to batch up the kiocbs we have to add a different
bio_endio function to struct bio, that takes a pointer to the
batch_complete - this patch converts the dio code's bio_endio function.
In order to avoid changing every bio_endio function in the kernel (there
are many), we currently use a union and a flag to indicate what kind of
bio endio function to call. This is admittedly a hack, but should
suffice for now.
For batching to work through say md or dm devices, the md/dm bio_endio
functions would have to be converted, much like the dio code. That is
left for future patches.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
block/blk-core.c | 34 ++++---
block/blk-flush.c | 2 +-
block/blk.h | 3 +-
drivers/block/swim3.c | 2 +-
drivers/md/dm.c | 2 +-
fs/aio.c | 254 +++++++++++++++++++++++++++++++---------------
fs/bio.c | 52 ++++++----
fs/direct-io.c | 20 ++--
include/linux/aio.h | 22 +++-
include/linux/bio.h | 36 ++++++-
include/linux/blk_types.h | 11 +-
include/linux/blkdev.h | 12 ++-
12 files changed, 311 insertions(+), 139 deletions(-)
diff --git a/block/blk-core.c b/block/blk-core.c
index 3c95c4d..4fac6ddb 100644
--- a/block/blk-core.c
+++ b/block/blk-core.c
@@ -151,7 +151,8 @@ void blk_rq_init(struct request_queue *q, struct request *rq)
EXPORT_SYMBOL(blk_rq_init);
static void req_bio_endio(struct request *rq, struct bio *bio,
- unsigned int nbytes, int error)
+ unsigned int nbytes, int error,
+ struct batch_complete *batch)
{
if (error)
clear_bit(BIO_UPTODATE, &bio->bi_flags);
@@ -175,7 +176,7 @@ static void req_bio_endio(struct request *rq, struct bio *bio,
/* don't actually finish bio if it's part of flush sequence */
if (bio->bi_size == 0 && !(rq->cmd_flags & REQ_FLUSH_SEQ))
- bio_endio(bio, error);
+ bio_endio_batch(bio, error, batch);
}
void blk_dump_rq_flags(struct request *rq, char *msg)
@@ -2215,7 +2216,8 @@ EXPORT_SYMBOL(blk_fetch_request);
* %false - this request doesn't have any more data
* %true - this request has more data
**/
-bool blk_update_request(struct request *req, int error, unsigned int nr_bytes)
+bool blk_update_request(struct request *req, int error, unsigned int nr_bytes,
+ struct batch_complete *batch)
{
int total_bytes, bio_nbytes, next_idx = 0;
struct bio *bio;
@@ -2271,7 +2273,7 @@ bool blk_update_request(struct request *req, int error, unsigned int nr_bytes)
if (nr_bytes >= bio->bi_size) {
req->bio = bio->bi_next;
nbytes = bio->bi_size;
- req_bio_endio(req, bio, nbytes, error);
+ req_bio_endio(req, bio, nbytes, error, batch);
next_idx = 0;
bio_nbytes = 0;
} else {
@@ -2333,7 +2335,7 @@ bool blk_update_request(struct request *req, int error, unsigned int nr_bytes)
* if the request wasn't completed, update state
*/
if (bio_nbytes) {
- req_bio_endio(req, bio, bio_nbytes, error);
+ req_bio_endio(req, bio, bio_nbytes, error, batch);
bio->bi_idx += next_idx;
bio_iovec(bio)->bv_offset += nr_bytes;
bio_iovec(bio)->bv_len -= nr_bytes;
@@ -2370,14 +2372,15 @@ EXPORT_SYMBOL_GPL(blk_update_request);
static bool blk_update_bidi_request(struct request *rq, int error,
unsigned int nr_bytes,
- unsigned int bidi_bytes)
+ unsigned int bidi_bytes,
+ struct batch_complete *batch)
{
- if (blk_update_request(rq, error, nr_bytes))
+ if (blk_update_request(rq, error, nr_bytes, batch))
return true;
/* Bidi request must be completed as a whole */
if (unlikely(blk_bidi_rq(rq)) &&
- blk_update_request(rq->next_rq, error, bidi_bytes))
+ blk_update_request(rq->next_rq, error, bidi_bytes, batch))
return true;
if (blk_queue_add_random(rq->q))
@@ -2460,7 +2463,7 @@ static bool blk_end_bidi_request(struct request *rq, int error,
struct request_queue *q = rq->q;
unsigned long flags;
- if (blk_update_bidi_request(rq, error, nr_bytes, bidi_bytes))
+ if (blk_update_bidi_request(rq, error, nr_bytes, bidi_bytes, NULL))
return true;
spin_lock_irqsave(q->queue_lock, flags);
@@ -2486,9 +2489,10 @@ static bool blk_end_bidi_request(struct request *rq, int error,
* %true - still buffers pending for this request
**/
bool __blk_end_bidi_request(struct request *rq, int error,
- unsigned int nr_bytes, unsigned int bidi_bytes)
+ unsigned int nr_bytes, unsigned int bidi_bytes,
+ struct batch_complete *batch)
{
- if (blk_update_bidi_request(rq, error, nr_bytes, bidi_bytes))
+ if (blk_update_bidi_request(rq, error, nr_bytes, bidi_bytes, batch))
return true;
blk_finish_request(rq, error);
@@ -2589,7 +2593,7 @@ EXPORT_SYMBOL_GPL(blk_end_request_err);
**/
bool __blk_end_request(struct request *rq, int error, unsigned int nr_bytes)
{
- return __blk_end_bidi_request(rq, error, nr_bytes, 0);
+ return __blk_end_bidi_request(rq, error, nr_bytes, 0, NULL);
}
EXPORT_SYMBOL(__blk_end_request);
@@ -2601,7 +2605,7 @@ EXPORT_SYMBOL(__blk_end_request);
* Description:
* Completely finish @rq. Must be called with queue lock held.
*/
-void __blk_end_request_all(struct request *rq, int error)
+void blk_end_request_all_batch(struct request *rq, int error, struct batch_complete *batch)
{
bool pending;
unsigned int bidi_bytes = 0;
@@ -2609,10 +2613,10 @@ void __blk_end_request_all(struct request *rq, int error)
if (unlikely(blk_bidi_rq(rq)))
bidi_bytes = blk_rq_bytes(rq->next_rq);
- pending = __blk_end_bidi_request(rq, error, blk_rq_bytes(rq), bidi_bytes);
+ pending = __blk_end_bidi_request(rq, error, blk_rq_bytes(rq), bidi_bytes, batch);
BUG_ON(pending);
}
-EXPORT_SYMBOL(__blk_end_request_all);
+EXPORT_SYMBOL(blk_end_request_all_batch);
/**
* __blk_end_request_cur - Helper function to finish the current request chunk.
diff --git a/block/blk-flush.c b/block/blk-flush.c
index 720ad60..ab8e211 100644
--- a/block/blk-flush.c
+++ b/block/blk-flush.c
@@ -316,7 +316,7 @@ void blk_insert_flush(struct request *rq)
* complete the request.
*/
if (!policy) {
- __blk_end_bidi_request(rq, 0, 0, 0);
+ __blk_end_bidi_request(rq, 0, 0, 0, NULL);
return;
}
diff --git a/block/blk.h b/block/blk.h
index ca51543..38ad3e1 100644
--- a/block/blk.h
+++ b/block/blk.h
@@ -31,7 +31,8 @@ void blk_queue_bypass_end(struct request_queue *q);
void blk_dequeue_request(struct request *rq);
void __blk_queue_free_tags(struct request_queue *q);
bool __blk_end_bidi_request(struct request *rq, int error,
- unsigned int nr_bytes, unsigned int bidi_bytes);
+ unsigned int nr_bytes, unsigned int bidi_bytes,
+ struct batch_complete *batch);
void blk_rq_timed_out_timer(unsigned long data);
void blk_delete_timer(struct request *);
diff --git a/drivers/block/swim3.c b/drivers/block/swim3.c
index 89ddab1..f90be3b 100644
--- a/drivers/block/swim3.c
+++ b/drivers/block/swim3.c
@@ -775,7 +775,7 @@ static irqreturn_t swim3_interrupt(int irq, void *dev_id)
if (intr & ERROR_INTR) {
n = fs->scount - 1 - resid / 512;
if (n > 0) {
- blk_update_request(req, 0, n << 9);
+ blk_update_request(req, 0, n << 9, NULL);
fs->req_sector += n;
}
if (fs->retries < 5) {
diff --git a/drivers/md/dm.c b/drivers/md/dm.c
index 77e6eff..7105e46 100644
--- a/drivers/md/dm.c
+++ b/drivers/md/dm.c
@@ -724,7 +724,7 @@ static void end_clone_bio(struct bio *clone, int error)
* Do not use blk_end_request() here, because it may complete
* the original request before the clone, and break the ordering.
*/
- blk_update_request(tio->orig, 0, nr_bytes);
+ blk_update_request(tio->orig, 0, nr_bytes, NULL);
}
/*
diff --git a/fs/aio.c b/fs/aio.c
index fedd8f6..0e70b0e 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -621,71 +621,11 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
return ret;
}
-/* aio_complete
- * Called when the io request on the given iocb is complete.
- */
-void aio_complete(struct kiocb *iocb, long res, long res2)
+static inline unsigned kioctx_ring_put(struct kioctx *ctx, struct kiocb *req,
+ unsigned tail)
{
- struct kioctx *ctx = iocb->ki_ctx;
- struct aio_ring *ring;
struct io_event *ev_page, *event;
- unsigned long flags;
- unsigned tail, pos;
-
- /*
- * Special case handling for sync iocbs:
- * - events go directly into the iocb for fast handling
- * - the sync task with the iocb in its stack holds the single iocb
- * ref, no other paths have a way to get another ref
- * - the sync task helpfully left a reference to itself in the iocb
- */
- if (is_sync_kiocb(iocb)) {
- BUG_ON(atomic_read(&iocb->ki_users) != 1);
- iocb->ki_user_data = res;
- atomic_set(&iocb->ki_users, 0);
- wake_up_process(iocb->ki_obj.tsk);
- return;
- }
-
- /*
- * Take rcu_read_lock() in case the kioctx is being destroyed, as we
- * need to issue a wakeup after incrementing reqs_available.
- */
- rcu_read_lock();
-
- if (iocb->ki_list.next) {
- unsigned long flags;
-
- spin_lock_irqsave(&ctx->ctx_lock, flags);
- list_del(&iocb->ki_list);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
- }
-
- /*
- * cancelled requests don't get events, userland was given one
- * when the event got cancelled.
- */
- if (unlikely(xchg(&iocb->ki_cancel,
- KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
- /*
- * Can't use the percpu reqs_available here - could race with
- * free_ioctx()
- */
- atomic_inc(&ctx->reqs_available);
- smp_mb__after_atomic_inc();
- /* Still need the wake_up in case free_ioctx is waiting */
- goto put_rq;
- }
-
- /*
- * Add a completion event to the ring buffer; ctx->tail is both our lock
- * and the canonical version of the tail pointer.
- */
- local_irq_save(flags);
- while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
- cpu_relax();
-
- pos = tail + AIO_EVENTS_OFFSET;
+ unsigned pos = tail + AIO_EVENTS_OFFSET;
if (++tail >= ctx->nr)
tail = 0;
@@ -693,22 +633,41 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
event = ev_page + pos % AIO_EVENTS_PER_PAGE;
- event->obj = (u64)(unsigned long)iocb->ki_obj.user;
- event->data = iocb->ki_user_data;
- event->res = res;
- event->res2 = res2;
+ event->obj = (u64) req->ki_obj.user;
+ event->data = req->ki_user_data;
+ event->res = req->ki_res;
+ event->res2 = req->ki_res2;
kunmap_atomic(ev_page);
flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
- ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
- res, res2);
+ ctx, tail, req, req->ki_obj.user, req->ki_user_data,
+ req->ki_res, req->ki_res2);
+
+ return tail;
+}
- /* after flagging the request as done, we
- * must never even look at it again
+static inline unsigned kioctx_ring_lock(struct kioctx *ctx)
+{
+ unsigned tail;
+
+ /*
+ * ctx->tail is both our lock and the canonical version of the tail
+ * pointer.
*/
- smp_wmb(); /* make event visible before updating tail */
+ while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
+ cpu_relax();
+
+ return tail;
+}
+
+static inline void kioctx_ring_unlock(struct kioctx *ctx, unsigned tail)
+{
+ struct aio_ring *ring;
+
+ smp_wmb();
+ /* make event visible before updating tail */
ctx->shadow_tail = tail;
@@ -721,28 +680,157 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
smp_mb();
ctx->tail = tail;
- local_irq_restore(flags);
- pr_debug("added to ring %p at [%u]\n", iocb, tail);
+ if (waitqueue_active(&ctx->wait))
+ wake_up(&ctx->wait);
+}
+
+void batch_complete_aio(struct batch_complete *batch)
+{
+ struct kioctx *ctx = NULL;
+ struct eventfd_ctx *eventfd = NULL;
+ struct rb_node *n;
+ unsigned long flags;
+ unsigned tail = 0;
+
+ if (RB_EMPTY_ROOT(&batch->kiocb))
+ return;
+
+ /*
+ * Take rcu_read_lock() in case the kioctx is being destroyed, as we
+ * need to issue a wakeup after incrementing reqs_available.
+ */
+ rcu_read_lock();
+ local_irq_save(flags);
+
+ n = rb_first(&batch->kiocb);
+ while (n) {
+ struct kiocb *req = container_of(n, struct kiocb, ki_node);
+
+ if (n->rb_right) {
+ n->rb_right->__rb_parent_color = n->__rb_parent_color;
+ n = n->rb_right;
+
+ while (n->rb_left)
+ n = n->rb_left;
+ } else {
+ n = rb_parent(n);
+ }
+
+ if (unlikely(xchg(&req->ki_cancel,
+ KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
+ /*
+ * Can't use the percpu reqs_available here - could race
+ * with free_ioctx()
+ */
+ atomic_inc(&req->ki_ctx->reqs_available);
+ aio_put_req(req);
+ continue;
+ }
+
+ if (unlikely(req->ki_eventfd != eventfd)) {
+ if (eventfd) {
+ /* Make event visible */
+ kioctx_ring_unlock(ctx, tail);
+ ctx = NULL;
+
+ eventfd_signal(eventfd, 1);
+ eventfd_ctx_put(eventfd);
+ }
+
+ eventfd = req->ki_eventfd;
+ req->ki_eventfd = NULL;
+ }
+
+ if (unlikely(req->ki_ctx != ctx)) {
+ if (ctx)
+ kioctx_ring_unlock(ctx, tail);
+
+ ctx = req->ki_ctx;
+ tail = kioctx_ring_lock(ctx);
+ }
+
+ tail = kioctx_ring_put(ctx, req, tail);
+ aio_put_req(req);
+ }
+ if (ctx) /* NULL when every kiocb in the batch was cancelled */
+ kioctx_ring_unlock(ctx, tail);
+ local_irq_restore(flags);
+ rcu_read_unlock();
/*
* Check if the user asked us to deliver the result through an
* eventfd. The eventfd_signal() function is safe to be called
* from IRQ context.
*/
- if (iocb->ki_eventfd != NULL)
- eventfd_signal(iocb->ki_eventfd, 1);
+ if (eventfd) {
+ eventfd_signal(eventfd, 1);
+ eventfd_ctx_put(eventfd);
+ }
+}
+EXPORT_SYMBOL(batch_complete_aio);
-put_rq:
- /* everything turned out well, dispose of the aiocb. */
- aio_put_req(iocb);
+/* aio_complete_batch
+ * Called when the io request on the given iocb is complete; @batch may be
+ * NULL.
+ */
+void aio_complete_batch(struct kiocb *req, long res, long res2,
+ struct batch_complete *batch)
+{
+ req->ki_res = res;
+ req->ki_res2 = res2;
- if (waitqueue_active(&ctx->wait))
- wake_up(&ctx->wait);
+ if (req->ki_list.next) {
+ struct kioctx *ctx = req->ki_ctx;
+ unsigned long flags;
- rcu_read_unlock();
+ spin_lock_irqsave(&ctx->ctx_lock, flags);
+ list_del(&req->ki_list);
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ }
+
+ /*
+ * Special case handling for sync iocbs:
+ * - events go directly into the iocb for fast handling
+ * - the sync task with the iocb in its stack holds the single iocb
+ * ref, no other paths have a way to get another ref
+ * - the sync task helpfully left a reference to itself in the iocb
+ */
+ if (is_sync_kiocb(req)) {
+ BUG_ON(atomic_read(&req->ki_users) != 1);
+ req->ki_user_data = req->ki_res;
+ atomic_set(&req->ki_users, 0);
+ wake_up_process(req->ki_obj.tsk);
+ } else if (batch) {
+ int res;
+ struct kiocb *t;
+ struct rb_node **n = &batch->kiocb.rb_node, *parent = NULL;
+
+ while (*n) {
+ parent = *n;
+ t = container_of(*n, struct kiocb, ki_node);
+
+ res = req->ki_ctx != t->ki_ctx
+ ? req->ki_ctx < t->ki_ctx
+ : req->ki_eventfd != t->ki_eventfd
+ ? req->ki_eventfd < t->ki_eventfd
+ : req < t;
+
+ n = res ? &(*n)->rb_left : &(*n)->rb_right;
+ }
+
+ rb_link_node(&req->ki_node, parent, n);
+ rb_insert_color(&req->ki_node, &batch->kiocb);
+ } else {
+ struct batch_complete batch_stack;
+
+ memset(&req->ki_node, 0, sizeof(req->ki_node));
+ batch_stack.kiocb.rb_node = &req->ki_node;
+
+ batch_complete_aio(&batch_stack);
+ }
}
-EXPORT_SYMBOL(aio_complete);
+EXPORT_SYMBOL(aio_complete_batch);
/* aio_read_events
* Pull an event off of the ioctx's event ring. Returns the number of
diff --git a/fs/bio.c b/fs/bio.c
index b96fc6c..c89807d 100644
--- a/fs/bio.c
+++ b/fs/bio.c
@@ -27,6 +27,7 @@
#include <linux/mempool.h>
#include <linux/workqueue.h>
#include <linux/cgroup.h>
+#include <linux/aio.h>
#include <scsi/sg.h> /* for struct sg_iovec */
#include <trace/events/block.h>
@@ -1407,31 +1408,42 @@ void bio_flush_dcache_pages(struct bio *bi)
EXPORT_SYMBOL(bio_flush_dcache_pages);
#endif
-/**
- * bio_endio - end I/O on a bio
- * @bio: bio
- * @error: error, if any
- *
- * Description:
- * bio_endio() will end I/O on the whole bio. bio_endio() is the
- * preferred way to end I/O on a bio, it takes care of clearing
- * BIO_UPTODATE on error. @error is 0 on success, and and one of the
- * established -Exxxx (-EIO, for instance) error values in case
- * something went wrong. No one should call bi_end_io() directly on a
- * bio unless they own it and thus know that it has an end_io
- * function.
- **/
-void bio_endio(struct bio *bio, int error)
+static inline void __bio_endio(struct bio *bio, struct batch_complete *batch)
{
- if (error)
+ if (bio->bi_error)
clear_bit(BIO_UPTODATE, &bio->bi_flags);
else if (!test_bit(BIO_UPTODATE, &bio->bi_flags))
- error = -EIO;
+ bio->bi_error = -EIO;
+
+ if (bio_flagged(bio, BIO_BATCH_ENDIO))
+ bio->bi_batch_end_io(bio, bio->bi_error, batch);
+ else if (bio->bi_end_io)
+ bio->bi_end_io(bio, bio->bi_error);
+}
+
+void bio_endio_batch(struct bio *bio, int error, struct batch_complete *batch)
+{
+ if (error)
+ bio->bi_error = error;
+
+ if (batch)
+ bio_list_add(&batch->bio, bio);
+ else
+ __bio_endio(bio, batch);
+
+}
+EXPORT_SYMBOL(bio_endio_batch);
+
+void batch_complete(struct batch_complete *batch)
+{
+ struct bio *bio;
+
+ while ((bio = bio_list_pop(&batch->bio)))
+ __bio_endio(bio, batch);
- if (bio->bi_end_io)
- bio->bi_end_io(bio, error);
+ batch_complete_aio(batch);
}
-EXPORT_SYMBOL(bio_endio);
+EXPORT_SYMBOL(batch_complete);
void bio_pair_release(struct bio_pair *bp)
{
diff --git a/fs/direct-io.c b/fs/direct-io.c
index d5099c2..42bf67a 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -230,7 +230,8 @@ static inline struct page *dio_get_page(struct dio *dio,
* filesystems can use it to hold additional state between get_block calls and
* dio_complete.
*/
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async,
+ struct batch_complete *batch)
{
ssize_t transferred = 0;
@@ -263,7 +264,7 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
dio->private, ret, is_async);
} else {
if (is_async)
- aio_complete(dio->iocb, ret, 0);
+ aio_complete_batch(dio->iocb, ret, 0, batch);
inode_dio_done(dio->inode);
}
@@ -274,7 +275,7 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio);
/*
* Asynchronous IO callback.
*/
-static void dio_bio_end_aio(struct bio *bio, int error)
+static void dio_bio_end_aio(struct bio *bio, int error, struct batch_complete *batch)
{
struct dio *dio = bio->bi_private;
unsigned long remaining;
@@ -290,7 +291,7 @@ static void dio_bio_end_aio(struct bio *bio, int error)
spin_unlock_irqrestore(&dio->bio_lock, flags);
if (remaining == 0) {
- dio_complete(dio, dio->iocb->ki_pos, 0, true);
+ dio_complete(dio, dio->iocb->ki_pos, 0, true, batch);
kmem_cache_free(dio_cache, dio);
}
}
@@ -329,7 +330,7 @@ void dio_end_io(struct bio *bio, int error)
struct dio *dio = bio->bi_private;
if (dio->is_async)
- dio_bio_end_aio(bio, error);
+ dio_bio_end_aio(bio, error, NULL);
else
dio_bio_end_io(bio, error);
}
@@ -350,9 +351,10 @@ dio_bio_alloc(struct dio *dio, struct dio_submit *sdio,
bio->bi_bdev = bdev;
bio->bi_sector = first_sector;
- if (dio->is_async)
- bio->bi_end_io = dio_bio_end_aio;
- else
+ if (dio->is_async) {
+ bio->bi_batch_end_io = dio_bio_end_aio;
+ bio->bi_flags |= 1 << BIO_BATCH_ENDIO;
+ } else
bio->bi_end_io = dio_bio_end_io;
sdio->bio = bio;
@@ -1273,7 +1275,7 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
dio_await_completion(dio);
if (drop_refcount(dio) == 0) {
- retval = dio_complete(dio, offset, retval, false);
+ retval = dio_complete(dio, offset, retval, false, NULL);
kmem_cache_free(dio_cache, dio);
} else
BUG_ON(retval != -EIOCBQUEUED);
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 2a7ad9f..db6b856 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -6,11 +6,12 @@
#include <linux/aio_abi.h>
#include <linux/uio.h>
#include <linux/rcupdate.h>
-
#include <linux/atomic.h>
+#include <linux/rbtree.h>
struct kioctx;
struct kiocb;
+struct batch_complete;
#define KIOCB_KEY 0
@@ -19,6 +20,8 @@ struct kiocb;
typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
struct kiocb {
+ struct rb_node ki_node;
+
atomic_t ki_users;
struct file *ki_filp;
@@ -32,6 +35,9 @@ struct kiocb {
} ki_obj;
__u64 ki_user_data; /* user's data for completion */
+ long ki_res;
+ long ki_res2;
+
loff_t ki_pos;
void *private;
@@ -74,7 +80,9 @@ static inline void init_sync_kiocb(struct kiocb *kiocb, struct file *filp)
#ifdef CONFIG_AIO
extern ssize_t wait_on_sync_kiocb(struct kiocb *iocb);
extern void aio_put_req(struct kiocb *iocb);
-extern void aio_complete(struct kiocb *iocb, long res, long res2);
+extern void batch_complete_aio(struct batch_complete *batch);
+extern void aio_complete_batch(struct kiocb *iocb, long res, long res2,
+ struct batch_complete *batch);
struct mm_struct;
extern void exit_aio(struct mm_struct *mm);
extern long do_io_submit(aio_context_t ctx_id, long nr,
@@ -83,7 +91,10 @@ void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel);
#else
static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
static inline void aio_put_req(struct kiocb *iocb) { }
-static inline void aio_complete(struct kiocb *iocb, long res, long res2) { }
+
+static inline void batch_complete_aio(struct batch_complete *batch) { }
+static inline void aio_complete_batch(struct kiocb *iocb, long res, long res2,
+ struct batch_complete *batch) { } /* !CONFIG_AIO: no-op */
struct mm_struct;
static inline void exit_aio(struct mm_struct *mm) { }
static inline long do_io_submit(aio_context_t ctx_id, long nr,
@@ -93,6 +104,11 @@ static inline void kiocb_set_cancel_fn(struct kiocb *req,
kiocb_cancel_fn *cancel) { }
#endif /* CONFIG_AIO */
+static inline void aio_complete(struct kiocb *iocb, long res, long res2)
+{
+ aio_complete_batch(iocb, res, res2, NULL);
+}
+
static inline struct kiocb *list_kiocb(struct list_head *h)
{
return list_entry(h, struct kiocb, ki_list);
diff --git a/include/linux/bio.h b/include/linux/bio.h
index 820e7aa..b4e9df5f 100644
--- a/include/linux/bio.h
+++ b/include/linux/bio.h
@@ -241,7 +241,27 @@ static inline struct bio *bio_clone_kmalloc(struct bio *bio, gfp_t gfp_mask)
}
-extern void bio_endio(struct bio *, int);
+void bio_endio_batch(struct bio *bio, int error, struct batch_complete *batch);
+
+/**
+ * bio_endio - end I/O on a bio
+ * @bio: bio
+ * @error: error, if any
+ *
+ * Description:
+ * bio_endio() will end I/O on the whole bio. bio_endio() is the
+ * preferred way to end I/O on a bio, it takes care of clearing
+ * BIO_UPTODATE on error. @error is 0 on success, and one of the
+ * established -Exxxx (-EIO, for instance) error values in case
+ * something went wrong. No one should call bi_end_io() directly on a
+ * bio unless they own it and thus know that it has an end_io
+ * function.
+ **/
+static inline void bio_endio(struct bio *bio, int error)
+{
+ bio_endio_batch(bio, error, NULL);
+}
+
struct request_queue;
extern int bio_phys_segments(struct request_queue *, struct bio *);
@@ -527,6 +547,20 @@ static inline struct bio *bio_list_get(struct bio_list *bl)
return bio;
}
+struct batch_complete {
+ struct bio_list bio;
+ struct rb_root kiocb;
+};
+
+static inline void batch_complete_init(struct batch_complete *batch)
+{
+ bio_list_init(&batch->bio);
+ batch->kiocb = RB_ROOT;
+}
+
+void batch_complete(struct batch_complete *batch);
+
+
#if defined(CONFIG_BLK_DEV_INTEGRITY)
#define bip_vec_idx(bip, idx) (&(bip->bip_vec[(idx)]))
diff --git a/include/linux/blk_types.h b/include/linux/blk_types.h
index cdf1119..d4e7bab 100644
--- a/include/linux/blk_types.h
+++ b/include/linux/blk_types.h
@@ -16,7 +16,9 @@ struct page;
struct block_device;
struct io_context;
struct cgroup_subsys_state;
+struct batch_complete;
typedef void (bio_end_io_t) (struct bio *, int);
+typedef void (bio_batch_end_io_t) (struct bio *, int, struct batch_complete *);
typedef void (bio_destructor_t) (struct bio *);
/*
@@ -42,6 +44,7 @@ struct bio {
* top bits priority
*/
+ short bi_error;
unsigned short bi_vcnt; /* how many bio_vec's */
unsigned short bi_idx; /* current index into bvl_vec */
@@ -59,7 +62,10 @@ struct bio {
unsigned int bi_seg_front_size;
unsigned int bi_seg_back_size;
- bio_end_io_t *bi_end_io;
+ union {
+ bio_end_io_t *bi_end_io;
+ bio_batch_end_io_t *bi_batch_end_io;
+ };
void *bi_private;
#ifdef CONFIG_BLK_CGROUP
@@ -111,12 +117,13 @@ struct bio {
#define BIO_FS_INTEGRITY 9 /* fs owns integrity data, not block layer */
#define BIO_QUIET 10 /* Make BIO Quiet */
#define BIO_MAPPED_INTEGRITY 11/* integrity metadata has been remapped */
+#define BIO_BATCH_ENDIO 12
/*
* Flags starting here get preserved by bio_reset() - this includes
* BIO_POOL_IDX()
*/
-#define BIO_RESET_BITS 12
+#define BIO_RESET_BITS 13
#define bio_flagged(bio, flag) ((bio)->bi_flags & (1 << (flag)))
diff --git a/include/linux/blkdev.h b/include/linux/blkdev.h
index 1756001..c298293 100644
--- a/include/linux/blkdev.h
+++ b/include/linux/blkdev.h
@@ -867,7 +867,8 @@ extern struct request *blk_fetch_request(struct request_queue *q);
* This prevents code duplication in drivers.
*/
extern bool blk_update_request(struct request *rq, int error,
- unsigned int nr_bytes);
+ unsigned int nr_bytes,
+ struct batch_complete *batch);
extern bool blk_end_request(struct request *rq, int error,
unsigned int nr_bytes);
extern void blk_end_request_all(struct request *rq, int error);
@@ -875,10 +876,17 @@ extern bool blk_end_request_cur(struct request *rq, int error);
extern bool blk_end_request_err(struct request *rq, int error);
extern bool __blk_end_request(struct request *rq, int error,
unsigned int nr_bytes);
-extern void __blk_end_request_all(struct request *rq, int error);
extern bool __blk_end_request_cur(struct request *rq, int error);
extern bool __blk_end_request_err(struct request *rq, int error);
+extern void blk_end_request_all_batch(struct request *rq, int error,
+ struct batch_complete *batch);
+
+static inline void __blk_end_request_all(struct request *rq, int error)
+{
+ blk_end_request_all_batch(rq, error, NULL);
+}
+
extern void blk_complete_request(struct request *);
extern void __blk_complete_request(struct request *);
extern void blk_abort_request(struct request *);
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 30/32] virtio-blk: Convert to batch completion
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
drivers/block/virtio_blk.c | 31 ++++++++++++++++++++-----------
1 file changed, 20 insertions(+), 11 deletions(-)
diff --git a/drivers/block/virtio_blk.c b/drivers/block/virtio_blk.c
index 0bdde8f..6b659d1 100644
--- a/drivers/block/virtio_blk.c
+++ b/drivers/block/virtio_blk.c
@@ -210,7 +210,8 @@ static void virtblk_bio_send_flush_work(struct work_struct *work)
virtblk_bio_send_flush(vbr);
}
-static inline void virtblk_request_done(struct virtblk_req *vbr)
+static inline void virtblk_request_done(struct virtblk_req *vbr,
+ struct batch_complete *batch)
{
struct virtio_blk *vblk = vbr->vblk;
struct request *req = vbr->req;
@@ -224,11 +225,12 @@ static inline void virtblk_request_done(struct virtblk_req *vbr)
req->errors = (error != 0);
}
- __blk_end_request_all(req, error);
+ blk_end_request_all_batch(req, error, batch);
mempool_free(vbr, vblk->pool);
}
-static inline void virtblk_bio_flush_done(struct virtblk_req *vbr)
+static inline void virtblk_bio_flush_done(struct virtblk_req *vbr,
+ struct batch_complete *batch)
{
struct virtio_blk *vblk = vbr->vblk;
@@ -237,12 +239,13 @@ static inline void virtblk_bio_flush_done(struct virtblk_req *vbr)
INIT_WORK(&vbr->work, virtblk_bio_send_data_work);
queue_work(virtblk_wq, &vbr->work);
} else {
- bio_endio(vbr->bio, virtblk_result(vbr));
+ bio_endio_batch(vbr->bio, virtblk_result(vbr), batch);
mempool_free(vbr, vblk->pool);
}
}
-static inline void virtblk_bio_data_done(struct virtblk_req *vbr)
+static inline void virtblk_bio_data_done(struct virtblk_req *vbr,
+ struct batch_complete *batch)
{
struct virtio_blk *vblk = vbr->vblk;
@@ -252,17 +255,18 @@ static inline void virtblk_bio_data_done(struct virtblk_req *vbr)
INIT_WORK(&vbr->work, virtblk_bio_send_flush_work);
queue_work(virtblk_wq, &vbr->work);
} else {
- bio_endio(vbr->bio, virtblk_result(vbr));
+ bio_endio_batch(vbr->bio, virtblk_result(vbr), batch);
mempool_free(vbr, vblk->pool);
}
}
-static inline void virtblk_bio_done(struct virtblk_req *vbr)
+static inline void virtblk_bio_done(struct virtblk_req *vbr,
+ struct batch_complete *batch)
{
if (unlikely(vbr->flags & VBLK_IS_FLUSH))
- virtblk_bio_flush_done(vbr);
+ virtblk_bio_flush_done(vbr, batch);
else
- virtblk_bio_data_done(vbr);
+ virtblk_bio_data_done(vbr, batch);
}
static void virtblk_done(struct virtqueue *vq)
@@ -272,16 +276,19 @@ static void virtblk_done(struct virtqueue *vq)
struct virtblk_req *vbr;
unsigned long flags;
unsigned int len;
+ struct batch_complete batch;
+
+ batch_complete_init(&batch);
spin_lock_irqsave(vblk->disk->queue->queue_lock, flags);
do {
virtqueue_disable_cb(vq);
while ((vbr = virtqueue_get_buf(vblk->vq, &len)) != NULL) {
if (vbr->bio) {
- virtblk_bio_done(vbr);
+ virtblk_bio_done(vbr, &batch);
bio_done = true;
} else {
- virtblk_request_done(vbr);
+ virtblk_request_done(vbr, &batch);
req_done = true;
}
}
@@ -291,6 +298,8 @@ static void virtblk_done(struct virtqueue *vq)
blk_start_queue(vblk->disk->queue);
spin_unlock_irqrestore(vblk->disk->queue->queue_lock, flags);
+ batch_complete(&batch);
+
if (bio_done)
wake_up(&vblk->queue_wait);
}
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 31/32] mtip32xx: Convert to batch completion
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
drivers/block/mtip32xx/mtip32xx.c | 68 ++++++++++++++++++---------------------
drivers/block/mtip32xx/mtip32xx.h | 8 ++---
2 files changed, 34 insertions(+), 42 deletions(-)
diff --git a/drivers/block/mtip32xx/mtip32xx.c b/drivers/block/mtip32xx/mtip32xx.c
index 9694dd9..5a9982b 100644
--- a/drivers/block/mtip32xx/mtip32xx.c
+++ b/drivers/block/mtip32xx/mtip32xx.c
@@ -159,11 +159,9 @@ static void mtip_command_cleanup(struct driver_data *dd)
command = &port->commands[commandindex];
if (atomic_read(&command->active)
- && (command->async_callback)) {
- command->async_callback(command->async_data,
- -ENODEV);
- command->async_callback = NULL;
- command->async_data = NULL;
+ && (command->bio)) {
+ bio_endio(command->bio, -ENODEV);
+ command->bio = NULL;
}
dma_unmap_sg(&port->dd->pdev->dev,
@@ -603,11 +601,9 @@ static void mtip_timeout_function(unsigned long int data)
writel(1 << bit, port->completed[group]);
/* Call the async completion callback. */
- if (likely(command->async_callback))
- command->async_callback(command->async_data,
- -EIO);
- command->async_callback = NULL;
- command->comp_func = NULL;
+ if (likely(command->bio))
+ bio_endio(command->bio, -EIO);
+ command->bio = NULL;
/* Unmap the DMA scatter list entries */
dma_unmap_sg(&port->dd->pdev->dev,
@@ -675,7 +671,8 @@ static void mtip_timeout_function(unsigned long int data)
static void mtip_async_complete(struct mtip_port *port,
int tag,
void *data,
- int status)
+ int status,
+ struct batch_complete *batch)
{
struct mtip_cmd *command;
struct driver_data *dd = data;
@@ -692,11 +689,10 @@ static void mtip_async_complete(struct mtip_port *port,
}
/* Upper layer callback */
- if (likely(command->async_callback))
- command->async_callback(command->async_data, cb_status);
+ if (likely(command->bio))
+ bio_endio_batch(command->bio, cb_status, batch);
- command->async_callback = NULL;
- command->comp_func = NULL;
+ command->bio = NULL;
/* Unmap the DMA scatter list entries */
dma_unmap_sg(&dd->pdev->dev,
@@ -729,24 +725,22 @@ static void mtip_async_complete(struct mtip_port *port,
static void mtip_completion(struct mtip_port *port,
int tag,
void *data,
- int status)
+ int status,
+ struct batch_complete *batch)
{
- struct mtip_cmd *command = &port->commands[tag];
struct completion *waiting = data;
if (unlikely(status == PORT_IRQ_TF_ERR))
dev_warn(&port->dd->pdev->dev,
"Internal command %d completed with TFE\n", tag);
- command->async_callback = NULL;
- command->comp_func = NULL;
-
complete(waiting);
}
static void mtip_null_completion(struct mtip_port *port,
int tag,
void *data,
- int status)
+ int status,
+ struct batch_complete *batch)
{
return;
}
@@ -792,7 +786,7 @@ static void mtip_handle_tfe(struct driver_data *dd)
atomic_inc(&cmd->active); /* active > 1 indicates error */
if (cmd->comp_data && cmd->comp_func) {
cmd->comp_func(port, MTIP_TAG_INTERNAL,
- cmd->comp_data, PORT_IRQ_TF_ERR);
+ cmd->comp_data, PORT_IRQ_TF_ERR, NULL);
}
goto handle_tfe_exit;
}
@@ -825,7 +819,7 @@ static void mtip_handle_tfe(struct driver_data *dd)
cmd->comp_func(port,
tag,
cmd->comp_data,
- 0);
+ 0, NULL);
} else {
dev_err(&port->dd->pdev->dev,
"Missing completion func for tag %d",
@@ -912,7 +906,7 @@ static void mtip_handle_tfe(struct driver_data *dd)
if (cmd->comp_func) {
cmd->comp_func(port, tag,
cmd->comp_data,
- -ENODATA);
+ -ENODATA, NULL);
}
continue;
}
@@ -942,7 +936,7 @@ static void mtip_handle_tfe(struct driver_data *dd)
port,
tag,
cmd->comp_data,
- PORT_IRQ_TF_ERR);
+ PORT_IRQ_TF_ERR, NULL);
else
dev_warn(&port->dd->pdev->dev,
"Bad completion for tag %d\n",
@@ -969,6 +963,9 @@ static inline void mtip_process_sdbf(struct driver_data *dd)
int group, tag, bit;
u32 completed;
struct mtip_cmd *command;
+ struct batch_complete batch;
+
+ batch_complete_init(&batch);
/* walk all bits in all slot groups */
for (group = 0; group < dd->slot_groups; group++) {
@@ -997,7 +994,8 @@ static inline void mtip_process_sdbf(struct driver_data *dd)
port,
tag,
command->comp_data,
- 0);
+ 0,
+ &batch);
} else {
dev_warn(&dd->pdev->dev,
"Null completion "
@@ -1007,12 +1005,14 @@ static inline void mtip_process_sdbf(struct driver_data *dd)
if (mtip_check_surprise_removal(
dd->pdev)) {
mtip_command_cleanup(dd);
- return;
+ goto out;
}
}
}
}
}
+out:
+ batch_complete(&batch);
}
/*
@@ -1030,7 +1030,7 @@ static inline void mtip_process_legacy(struct driver_data *dd, u32 port_stat)
cmd->comp_func(port,
MTIP_TAG_INTERNAL,
cmd->comp_data,
- 0);
+ 0, NULL);
return;
}
}
@@ -2441,8 +2441,8 @@ static int mtip_hw_ioctl(struct driver_data *dd, unsigned int cmd,
* None
*/
static void mtip_hw_submit_io(struct driver_data *dd, sector_t sector,
- int nsect, int nents, int tag, void *callback,
- void *data, int dir)
+ int nsect, int nents, int tag,
+ struct bio *bio, int dir)
{
struct host_to_dev_fis *fis;
struct mtip_port *port = dd->port;
@@ -2497,12 +2497,7 @@ static void mtip_hw_submit_io(struct driver_data *dd, sector_t sector,
command->comp_func = mtip_async_complete;
command->direction = dma_dir;
- /*
- * Set the completion function and data for the command passed
- * from the upper layer.
- */
- command->async_data = data;
- command->async_callback = callback;
+ command->bio = bio;
/*
* To prevent this command from being issued
@@ -3672,7 +3667,6 @@ static void mtip_make_request(struct request_queue *queue, struct bio *bio)
bio_sectors(bio),
nents,
tag,
- bio_endio,
bio,
bio_data_dir(bio));
} else
diff --git a/drivers/block/mtip32xx/mtip32xx.h b/drivers/block/mtip32xx/mtip32xx.h
index b174264..5e094e8 100644
--- a/drivers/block/mtip32xx/mtip32xx.h
+++ b/drivers/block/mtip32xx/mtip32xx.h
@@ -296,11 +296,9 @@ struct mtip_cmd {
void (*comp_func)(struct mtip_port *port,
int tag,
void *data,
- int status);
- /* Additional callback function that may be called by comp_func() */
- void (*async_callback)(void *data, int status);
-
- void *async_data; /* Addl. data passed to async_callback() */
+ int status,
+ struct batch_complete *batch);
+ struct bio *bio;
int scatter_ents; /* Number of scatter list entries used */
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 32/32] aio: Smoosh struct kiocb
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
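This patch squishes struct kiocb down to 160 bytes, from 208 previously.
The main saving comes from the fields that aren't needed until after
aio_complete() is called: they now share storage, via a union, with the
fields that hold the submission state.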
Also, reorder the fields to reduce the amount of memory that has to be
zeroed in aio_get_req(), and to keep members next to each other that are
used in the same place.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 22 +++++++++++--------
include/linux/aio.h | 61 +++++++++++++++++++++++++++++------------------------
2 files changed, 46 insertions(+), 37 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 0e70b0e..6b05ddb 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -570,12 +570,13 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
if (!get_reqs_available(ctx))
return NULL;
- req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
+ req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
if (unlikely(!req))
goto out_put;
- atomic_set(&req->ki_users, 1);
+ memset(req, 0, offsetof(struct kiocb, ki_ctx));
req->ki_ctx = ctx;
+ atomic_set(&req->ki_users, 1);
return req;
out_put:
put_reqs_available(ctx, 1);
@@ -633,8 +634,8 @@ static inline unsigned kioctx_ring_put(struct kioctx *ctx, struct kiocb *req,
ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
event = ev_page + pos % AIO_EVENTS_PER_PAGE;
- event->obj = (u64) req->ki_obj.user;
event->data = req->ki_user_data;
+ event->obj = (u64) req->ki_obj.user;
event->res = req->ki_res;
event->res2 = req->ki_res2;
@@ -1245,13 +1246,16 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
goto out_put_req;
}
- req->ki_obj.user = user_iocb;
- req->ki_user_data = iocb->aio_data;
- req->ki_pos = iocb->aio_offset;
+ req->ki_user_data = iocb->aio_data;
+ req->ki_obj.user = user_iocb;
- req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
- req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
- req->ki_opcode = iocb->aio_lio_opcode;
+ req->ki_opcode = iocb->aio_lio_opcode;
+ req->ki_pos = iocb->aio_offset;
+ req->ki_nbytes = iocb->aio_nbytes;
+ req->ki_left = iocb->aio_nbytes;
+ req->ki_buf = (char __user *) iocb->aio_buf;
+ req->ki_nr_segs = 0;
+ req->ki_cur_seg = 0;
ret = aio_run_iocb(req, compat);
if (ret)
diff --git a/include/linux/aio.h b/include/linux/aio.h
index db6b856..f9ffee3 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -20,45 +20,50 @@ struct batch_complete;
typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
struct kiocb {
- struct rb_node ki_node;
+ struct list_head ki_list; /* the aio core uses this
+ * for cancellation */
+ kiocb_cancel_fn *ki_cancel;
+ void (*ki_dtor)(struct kiocb *);
+ void *private;
+ struct iovec *ki_iovec;
+
+ /*
+ * If the aio_resfd field of the userspace iocb is not zero,
+ * this is the underlying eventfd context to deliver events to.
+ */
+ struct eventfd_ctx *ki_eventfd;
+ struct kioctx *ki_ctx; /* NULL for sync ops */
+ struct file *ki_filp;
atomic_t ki_users;
- struct file *ki_filp;
- struct kioctx *ki_ctx; /* NULL for sync ops */
- kiocb_cancel_fn *ki_cancel;
- void (*ki_dtor)(struct kiocb *);
+ /* State that we remember to be able to restart/retry */
+ unsigned ki_opcode;
+ __u64 ki_user_data; /* user's data for completion */
union {
void __user *user;
struct task_struct *tsk;
} ki_obj;
- __u64 ki_user_data; /* user's data for completion */
- long ki_res;
- long ki_res2;
-
- loff_t ki_pos;
+ union {
+ struct {
+ loff_t ki_pos;
+ size_t ki_nbytes; /* copy of iocb->aio_nbytes */
+ size_t ki_left; /* remaining bytes */
+ char __user *ki_buf; /* remaining iocb->aio_buf */
+ unsigned long ki_nr_segs;
+ unsigned long ki_cur_seg;
+ };
+
+ struct {
+ long ki_res;
+ long ki_res2;
+ struct rb_node ki_node;
+ };
+ };
- void *private;
- /* State that we remember to be able to restart/retry */
- unsigned short ki_opcode;
- size_t ki_nbytes; /* copy of iocb->aio_nbytes */
- char __user *ki_buf; /* remaining iocb->aio_buf */
- size_t ki_left; /* remaining bytes */
struct iovec ki_inline_vec; /* inline vector */
- struct iovec *ki_iovec;
- unsigned long ki_nr_segs;
- unsigned long ki_cur_seg;
-
- struct list_head ki_list; /* the aio core uses this
- * for cancellation */
-
- /*
- * If the aio_resfd field of the userspace iocb is not zero,
- * this is the underlying eventfd context to deliver events to.
- */
- struct eventfd_ctx *ki_eventfd;
};
static inline bool is_sync_kiocb(struct kiocb *kiocb)
--
1.7.12
--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo@kvack.org. For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: <a href=mailto:"aart@kvack.org">aart@kvack.org</a>
^ permalink raw reply related
* [PATCH 00/32] AIO performance improvements/cleanups, v3
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
Last posting: http://article.gmane.org/gmane.linux.kernel.aio.general/3242
As before, changes should mostly be noted in the patch descriptions.
Some random bits:
* flush_dcache_page() patch is new
* Rewrote the aio_read_evt() stuff again
* Fixed a few comments
* Included some more patches, notably the batch completion stuff
My git repo has Jens' aio/dio patches on top of this stuff. As of the
latest version, I'm seeing a couple percent better throughput with the
ring buffer, and I think Jens was seeing a couple percent better with
his linked list approach - at this point I think the difference is
noise, since we're both testing with fairly crappy drivers.
Patch series is on top of v3.7, git repo is at
http://evilpiepirate.org/git/linux-bcache.git aio-upstream
Kent Overstreet (27):
aio: Kill return value of aio_complete()
aio: kiocb_cancel()
aio: Move private stuff out of aio.h
aio: dprintk() -> pr_debug()
aio: do fget() after aio_get_req()
aio: Make aio_put_req() lockless
aio: Refcounting cleanup
wait: Add wait_event_hrtimeout()
aio: Make aio_read_evt() more efficient, convert to hrtimers
aio: Use flush_dcache_page()
aio: Use cancellation list lazily
aio: Change reqs_active to include unreaped completions
aio: Kill batch allocation
aio: Kill struct aio_ring_info
aio: Give shared kioctx fields their own cachelines
aio: reqs_active -> reqs_available
aio: percpu reqs_available
Generic dynamic per cpu refcounting
aio: Percpu ioctx refcount
aio: use xchg() instead of completion_lock
aio: Don't include aio.h in sched.h
aio: Kill ki_key
aio: Kill ki_retry
block, aio: Batch completion for bios/kiocbs
virtio-blk: Convert to batch completion
mtip32xx: Convert to batch completion
aio: Smoosh struct kiocb
Zach Brown (5):
mm: remove old aio use_mm() comment
aio: remove dead code from aio.h
gadget: remove only user of aio retry
aio: remove retry-based AIO
char: add aio_{read,write} to /dev/{null,zero}
arch/s390/hypfs/inode.c | 1 +
block/blk-core.c | 34 +-
block/blk-flush.c | 2 +-
block/blk.h | 3 +-
block/scsi_ioctl.c | 1 +
drivers/block/mtip32xx/mtip32xx.c | 68 +-
drivers/block/mtip32xx/mtip32xx.h | 8 +-
drivers/block/swim3.c | 2 +-
drivers/block/virtio_blk.c | 31 +-
drivers/char/mem.c | 36 +
drivers/infiniband/hw/ipath/ipath_file_ops.c | 1 +
drivers/infiniband/hw/qib/qib_file_ops.c | 2 +-
drivers/md/dm.c | 2 +-
drivers/staging/android/logger.c | 1 +
drivers/usb/gadget/inode.c | 42 +-
fs/9p/vfs_addr.c | 1 +
fs/afs/write.c | 1 +
fs/aio.c | 1766 +++++++++++---------------
fs/bio.c | 52 +-
fs/block_dev.c | 1 +
fs/btrfs/file.c | 1 +
fs/btrfs/inode.c | 1 +
fs/ceph/file.c | 1 +
fs/compat.c | 1 +
fs/direct-io.c | 21 +-
fs/ecryptfs/file.c | 1 +
fs/ext2/inode.c | 1 +
fs/ext3/inode.c | 1 +
fs/ext4/file.c | 1 +
fs/ext4/indirect.c | 1 +
fs/ext4/inode.c | 1 +
fs/ext4/page-io.c | 1 +
fs/fat/inode.c | 1 +
fs/fuse/dev.c | 1 +
fs/fuse/file.c | 1 +
fs/gfs2/aops.c | 1 +
fs/gfs2/file.c | 1 +
fs/hfs/inode.c | 1 +
fs/hfsplus/inode.c | 1 +
fs/jfs/inode.c | 1 +
fs/nilfs2/inode.c | 2 +-
fs/ntfs/file.c | 1 +
fs/ntfs/inode.c | 1 +
fs/ocfs2/aops.h | 2 +
fs/ocfs2/dlmglue.c | 2 +-
fs/ocfs2/inode.h | 2 +
fs/pipe.c | 1 +
fs/read_write.c | 35 +-
fs/reiserfs/inode.c | 1 +
fs/ubifs/file.c | 1 +
fs/udf/inode.c | 1 +
fs/xfs/xfs_aops.c | 1 +
fs/xfs/xfs_file.c | 1 +
include/linux/aio.h | 242 +---
include/linux/bio.h | 36 +-
include/linux/blk_types.h | 11 +-
include/linux/blkdev.h | 12 +-
include/linux/cgroup.h | 1 +
include/linux/errno.h | 1 -
include/linux/percpu-refcount.h | 29 +
include/linux/sched.h | 2 -
include/linux/wait.h | 86 ++
kernel/fork.c | 1 +
kernel/printk.c | 1 +
kernel/ptrace.c | 1 +
lib/Makefile | 2 +-
lib/percpu-refcount.c | 164 +++
mm/mmu_context.c | 3 -
mm/page_io.c | 1 +
mm/shmem.c | 1 +
mm/swap.c | 1 +
security/keys/internal.h | 2 +
security/keys/keyctl.c | 1 +
sound/core/pcm_native.c | 2 +-
74 files changed, 1350 insertions(+), 1396 deletions(-)
create mode 100644 include/linux/percpu-refcount.h
create mode 100644 lib/percpu-refcount.c
--
1.7.12
^ permalink raw reply
* [PATCH 01/32] mm: remove old aio use_mm() comment
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Zach Brown, bcrl, jmoyer, axboe, viro, tytso, Kent Overstreet
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
From: Zach Brown <zab@redhat.com>
use_mm() is used in more places than just aio. There's no need to
mention callers when describing the function.
Signed-off-by: Zach Brown <zab@redhat.com>
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
mm/mmu_context.c | 3 ---
1 file changed, 3 deletions(-)
diff --git a/mm/mmu_context.c b/mm/mmu_context.c
index 3dcfaf4..8a8cd02 100644
--- a/mm/mmu_context.c
+++ b/mm/mmu_context.c
@@ -14,9 +14,6 @@
* use_mm
* Makes the calling kernel thread take on the specified
* mm context.
- * Called by the retry thread execute retries within the
- * iocb issuer's mm context, so that copy_from/to_user
- * operations work seamlessly for aio.
* (Note: this routine is intended to be called only
* from a kernel thread context)
*/
--
1.7.12
^ permalink raw reply related
* [PATCH 02/32] aio: remove dead code from aio.h
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Zach Brown, bcrl, jmoyer, axboe, viro, tytso, Kent Overstreet
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
From: Zach Brown <zab@redhat.com>
Signed-off-by: Zach Brown <zab@redhat.com>
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
include/linux/aio.h | 24 ------------------------
1 file changed, 24 deletions(-)
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 31ff6db..b46a09f 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -9,44 +9,22 @@
#include <linux/atomic.h>
-#define AIO_MAXSEGS 4
-#define AIO_KIOGRP_NR_ATOMIC 8
-
struct kioctx;
-/* Notes on cancelling a kiocb:
- * If a kiocb is cancelled, aio_complete may return 0 to indicate
- * that cancel has not yet disposed of the kiocb. All cancel
- * operations *must* call aio_put_req to dispose of the kiocb
- * to guard against races with the completion code.
- */
-#define KIOCB_C_CANCELLED 0x01
-#define KIOCB_C_COMPLETE 0x02
-
#define KIOCB_SYNC_KEY (~0U)
/* ki_flags bits */
-/*
- * This may be used for cancel/retry serialization in the future, but
- * for now it's unused and we probably don't want modules to even
- * think they can use it.
- */
-/* #define KIF_LOCKED 0 */
#define KIF_KICKED 1
#define KIF_CANCELLED 2
-#define kiocbTryLock(iocb) test_and_set_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbTryKick(iocb) test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags)
-#define kiocbSetLocked(iocb) set_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbSetKicked(iocb) set_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-#define kiocbClearLocked(iocb) clear_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbClearKicked(iocb) clear_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-#define kiocbIsLocked(iocb) test_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbIsKicked(iocb) test_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
@@ -207,8 +185,6 @@ struct kioctx {
};
/* prototypes */
-extern unsigned aio_max_size;
-
#ifdef CONFIG_AIO
extern ssize_t wait_on_sync_kiocb(struct kiocb *iocb);
extern int aio_put_req(struct kiocb *iocb);
--
1.7.12
^ permalink raw reply related
* [PATCH 07/32] aio: kiocb_cancel()
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Minor refactoring, to get rid of some duplicated code.
v2: Fix return value for NULL kiocb, so it matches old code; change
synchronization to use xchg() instead of a bit in ki_flags, so we can
get rid of ki_flags.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 79 +++++++++++++++++++++++++++++++++++-----------------------------
1 file changed, 43 insertions(+), 36 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 0b85822..e1d4084 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -217,6 +217,29 @@ static inline void put_ioctx(struct kioctx *kioctx)
__put_ioctx(kioctx);
}
+static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
+ struct io_event *res)
+{
+ int (*cancel)(struct kiocb *, struct io_event *);
+ int ret = -EINVAL;
+
+ cancel = kiocb->ki_cancel;
+ kiocbSetCancelled(kiocb);
+ if (cancel) {
+ kiocb->ki_users++;
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ memset(res, 0, sizeof(*res));
+ res->obj = (u64) kiocb->ki_obj.user;
+ res->data = kiocb->ki_user_data;
+ ret = cancel(kiocb, res);
+
+ spin_lock_irq(&ctx->ctx_lock);
+ }
+
+ return ret;
+}
+
/* ioctx_alloc
* Allocates and initializes an ioctx. Returns an ERR_PTR if it failed.
*/
@@ -287,25 +310,19 @@ out_freectx:
*/
static void kill_ctx(struct kioctx *ctx)
{
- int (*cancel)(struct kiocb *, struct io_event *);
struct task_struct *tsk = current;
DECLARE_WAITQUEUE(wait, tsk);
struct io_event res;
+ struct kiocb *req;
spin_lock_irq(&ctx->ctx_lock);
ctx->dead = 1;
while (!list_empty(&ctx->active_reqs)) {
- struct list_head *pos = ctx->active_reqs.next;
- struct kiocb *iocb = list_kiocb(pos);
- list_del_init(&iocb->ki_list);
- cancel = iocb->ki_cancel;
- kiocbSetCancelled(iocb);
- if (cancel) {
- iocb->ki_users++;
- spin_unlock_irq(&ctx->ctx_lock);
- cancel(iocb, &res);
- spin_lock_irq(&ctx->ctx_lock);
- }
+ req = list_first_entry(&ctx->active_reqs,
+ struct kiocb, ki_list);
+
+ list_del_init(&req->ki_list);
+ kiocb_cancel(ctx, req, &res);
}
if (!ctx->reqs_active)
@@ -1409,7 +1426,7 @@ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
struct io_event __user *, result)
{
- int (*cancel)(struct kiocb *iocb, struct io_event *res);
+ struct io_event res;
struct kioctx *ctx;
struct kiocb *kiocb;
u32 key;
@@ -1424,32 +1441,22 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
return -EINVAL;
spin_lock_irq(&ctx->ctx_lock);
- ret = -EAGAIN;
+
kiocb = lookup_kiocb(ctx, iocb, key);
- if (kiocb && kiocb->ki_cancel) {
- cancel = kiocb->ki_cancel;
- kiocb->ki_users ++;
- kiocbSetCancelled(kiocb);
- } else
- cancel = NULL;
+ if (kiocb)
+ ret = kiocb_cancel(ctx, kiocb, &res);
+ else
+ ret = -EINVAL;
+
spin_unlock_irq(&ctx->ctx_lock);
- if (NULL != cancel) {
- struct io_event tmp;
- pr_debug("calling cancel\n");
- memset(&tmp, 0, sizeof(tmp));
- tmp.obj = (u64)(unsigned long)kiocb->ki_obj.user;
- tmp.data = kiocb->ki_user_data;
- ret = cancel(kiocb, &tmp);
- if (!ret) {
- /* Cancellation succeeded -- copy the result
- * into the user's buffer.
- */
- if (copy_to_user(result, &tmp, sizeof(tmp)))
- ret = -EFAULT;
- }
- } else
- ret = -EINVAL;
+ if (!ret) {
+ /* Cancellation succeeded -- copy the result
+ * into the user's buffer.
+ */
+ if (copy_to_user(result, &res, sizeof(res)))
+ ret = -EFAULT;
+ }
put_ioctx(ctx);
--
1.7.12
^ permalink raw reply related
* [PATCH 09/32] aio: dprintk() -> pr_debug()
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 57 ++++++++++++++++++++++++---------------------------------
1 file changed, 24 insertions(+), 33 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 8fcea98..868ac0a 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -8,6 +8,8 @@
*
* See ../COPYING for licensing terms.
*/
+#define pr_fmt(fmt) "%s: " fmt, __func__
+
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
@@ -18,8 +20,6 @@
#include <linux/backing-dev.h>
#include <linux/uio.h>
-#define DEBUG 0
-
#include <linux/sched.h>
#include <linux/fs.h>
#include <linux/file.h>
@@ -39,12 +39,6 @@
#include <asm/kmap_types.h>
#include <asm/uaccess.h>
-#if DEBUG > 1
-#define dprintk printk
-#else
-#define dprintk(x...) do { ; } while (0)
-#endif
-
#define AIO_RING_MAGIC 0xa10a10a1
#define AIO_RING_COMPAT_FEATURES 1
#define AIO_RING_INCOMPAT_FEATURES 0
@@ -124,7 +118,7 @@ static int __init aio_setup(void)
kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
- pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
+ pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));
return 0;
}
@@ -178,7 +172,7 @@ static int aio_setup_ring(struct kioctx *ctx)
}
info->mmap_size = nr_pages * PAGE_SIZE;
- dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
+ pr_debug("attempting mmap of %lu bytes\n", info->mmap_size);
down_write(&mm->mmap_sem);
info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size,
PROT_READ|PROT_WRITE,
@@ -190,7 +184,7 @@ static int aio_setup_ring(struct kioctx *ctx)
return -EAGAIN;
}
- dprintk("mmap address: 0x%08lx\n", info->mmap_base);
+ pr_debug("mmap address: 0x%08lx\n", info->mmap_base);
info->nr_pages = get_user_pages(current, mm, info->mmap_base, nr_pages,
1, 0, info->ring_pages, NULL);
up_write(&mm->mmap_sem);
@@ -262,7 +256,7 @@ static void __put_ioctx(struct kioctx *ctx)
aio_nr -= nr_events;
spin_unlock(&aio_nr_lock);
}
- pr_debug("__put_ioctx: freeing %p\n", ctx);
+ pr_debug("freeing %p\n", ctx);
call_rcu(&ctx->rcu_head, ctx_rcu_free);
}
@@ -351,7 +345,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
spin_unlock(&mm->ioctx_lock);
- dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
+ pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
ctx, ctx->user_id, mm, ctx->ring_info.nr);
return ctx;
@@ -360,7 +354,7 @@ out_cleanup:
aio_free_ring(ctx);
out_freectx:
kmem_cache_free(kioctx_cachep, ctx);
- dprintk("aio: error allocating ioctx %d\n", err);
+ pr_debug("error allocating ioctx %d\n", err);
return ERR_PTR(err);
}
@@ -608,8 +602,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
*/
static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
{
- dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n",
- req, atomic_long_read(&req->ki_filp->f_count));
+ pr_debug("(%p): f_count=%ld\n",
+ req, atomic_long_read(&req->ki_filp->f_count));
assert_spin_locked(&ctx->ctx_lock);
@@ -720,9 +714,9 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
event->res = res;
event->res2 = res2;
- dprintk("aio_complete: %p[%lu]: %p: %p %Lx %lx %lx\n",
- ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
- res, res2);
+ pr_debug("%p[%lu]: %p: %p %Lx %lx %lx\n",
+ ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
+ res, res2);
/* after flagging the request as done, we
* must never even look at it again
@@ -778,9 +772,7 @@ static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
int ret = 0;
ring = kmap_atomic(info->ring_pages[0]);
- dprintk("in aio_read_evt h%lu t%lu m%lu\n",
- (unsigned long)ring->head, (unsigned long)ring->tail,
- (unsigned long)ring->nr);
+ pr_debug("h%u t%u m%u\n", ring->head, ring->tail, ring->nr);
if (ring->head == ring->tail)
goto out;
@@ -801,8 +793,7 @@ static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
out:
kunmap_atomic(ring);
- dprintk("leaving aio_read_evt: %d h%lu t%lu\n", ret,
- (unsigned long)ring->head, (unsigned long)ring->tail);
+ pr_debug("%d h%u t%u\n", ret, ring->head, ring->tail);
return ret;
}
@@ -865,13 +856,13 @@ static int read_events(struct kioctx *ctx,
if (unlikely(ret <= 0))
break;
- dprintk("read event: %Lx %Lx %Lx %Lx\n",
- ent.data, ent.obj, ent.res, ent.res2);
+ pr_debug("%Lx %Lx %Lx %Lx\n",
+ ent.data, ent.obj, ent.res, ent.res2);
/* Could we split the check in two? */
ret = -EFAULT;
if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- dprintk("aio: lost an event due to EFAULT.\n");
+ pr_debug("lost an event due to EFAULT.\n");
break;
}
ret = 0;
@@ -934,7 +925,7 @@ static int read_events(struct kioctx *ctx,
ret = -EFAULT;
if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- dprintk("aio: lost an event due to EFAULT.\n");
+ pr_debug("lost an event due to EFAULT.\n");
break;
}
@@ -965,7 +956,7 @@ static void io_destroy(struct kioctx *ioctx)
hlist_del_rcu(&ioctx->list);
spin_unlock(&mm->ioctx_lock);
- dprintk("aio_release(%p)\n", ioctx);
+ pr_debug("(%p)\n", ioctx);
if (likely(!was_dead))
put_ioctx(ioctx); /* twice for the list */
@@ -1258,7 +1249,7 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
kiocb->ki_retry = aio_fsync;
break;
default:
- dprintk("EINVAL: io_submit: no operation provided\n");
+ pr_debug("EINVAL: no operation provided\n");
ret = -EINVAL;
}
@@ -1278,7 +1269,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
/* enforce forwards compatibility on users */
if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
- pr_debug("EINVAL: io_submit: reserve field set\n");
+ pr_debug("EINVAL: reserve field set\n");
return -EINVAL;
}
@@ -1319,7 +1310,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
ret = put_user(req->ki_key, &user_iocb->aio_key);
if (unlikely(ret)) {
- dprintk("EFAULT: aio_key\n");
+ pr_debug("EFAULT: aio_key\n");
goto out_put_req;
}
@@ -1400,7 +1391,7 @@ long do_io_submit(aio_context_t ctx_id, long nr,
ctx = lookup_ioctx(ctx_id);
if (unlikely(!ctx)) {
- pr_debug("EINVAL: io_submit: invalid context id\n");
+ pr_debug("EINVAL: invalid context id\n");
return -EINVAL;
}
--
1.7.12
^ permalink raw reply related
* [PATCH 22/32] aio: percpu reqs_available
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
See the previous patch for why we want to do this - this basically
implements a per cpu allocator for reqs_available that doesn't actually
allocate anything.
Note that we need to increase the size of the ringbuffer we allocate,
since a single thread won't necessarily be able to use all the
reqs_available slots - some (up to about half) might be on other per cpu
lists, unavailable for the current thread.
We size the ringbuffer based on the nr_events userspace passed to
io_setup(), so this is a slight behaviour change. However, nr_events
wasn't being used as a hard limit before (it was already being rounded
up to the next page), so this doesn't change the actual semantics.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 85 insertions(+), 7 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index d384eb2..e415b33 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -26,6 +26,7 @@
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/mmu_context.h>
+#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/timer.h>
#include <linux/aio.h>
@@ -59,6 +60,10 @@ struct aio_ring {
#define AIO_RING_PAGES 8
+struct kioctx_cpu {
+ unsigned reqs_available;
+};
+
struct kioctx {
atomic_t users;
atomic_t dead;
@@ -67,6 +72,10 @@ struct kioctx {
unsigned long user_id;
struct hlist_node list;
+ struct __percpu kioctx_cpu *cpu;
+
+ unsigned req_batch;
+
unsigned nr;
/* sys_io_setup currently limits this to an unsigned int */
@@ -149,6 +158,9 @@ static int aio_setup_ring(struct kioctx *ctx)
unsigned long size;
int nr_pages;
+ nr_events = max(nr_events, num_possible_cpus() * 4);
+ nr_events *= 2;
+
/* Compensate for the ring buffer's head/tail overlap entry */
nr_events += 2; /* 1 is required, 2 for good luck */
@@ -255,6 +267,8 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
static void free_ioctx_rcu(struct rcu_head *head)
{
struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+
+ free_percpu(ctx->cpu);
kmem_cache_free(kioctx_cachep, ctx);
}
@@ -268,7 +282,7 @@ static void free_ioctx(struct kioctx *ctx)
struct aio_ring *ring;
struct io_event res;
struct kiocb *req;
- unsigned head, avail;
+ unsigned cpu, head, avail;
spin_lock_irq(&ctx->ctx_lock);
@@ -282,6 +296,13 @@ static void free_ioctx(struct kioctx *ctx)
spin_unlock_irq(&ctx->ctx_lock);
+ for_each_possible_cpu(cpu) {
+ struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
+
+ atomic_add(kcpu->reqs_available, &ctx->reqs_available);
+ kcpu->reqs_available = 0;
+ }
+
ring = kmap_atomic(ctx->ring_pages[0]);
head = ring->head;
kunmap_atomic(ring);
@@ -357,10 +378,16 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
INIT_LIST_HEAD(&ctx->active_reqs);
- if (aio_setup_ring(ctx) < 0)
+ ctx->cpu = alloc_percpu(struct kioctx_cpu);
+ if (!ctx->cpu)
goto out_freectx;
+ if (aio_setup_ring(ctx) < 0)
+ goto out_freepcpu;
+
atomic_set(&ctx->reqs_available, ctx->nr);
+ ctx->req_batch = ctx->nr / (num_possible_cpus() * 4);
+ BUG_ON(!ctx->req_batch);
/* limit the number of system wide aios */
spin_lock(&aio_nr_lock);
@@ -384,6 +411,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
out_cleanup:
err = -EAGAIN;
aio_free_ring(ctx);
+out_freepcpu:
+ free_percpu(ctx->cpu);
out_freectx:
kmem_cache_free(kioctx_cachep, ctx);
pr_debug("error allocating ioctx %d\n", err);
@@ -482,6 +511,52 @@ void exit_aio(struct mm_struct *mm)
}
}
+static void put_reqs_available(struct kioctx *ctx, unsigned nr)
+{
+ struct kioctx_cpu *kcpu;
+
+ preempt_disable();
+ kcpu = this_cpu_ptr(ctx->cpu);
+
+ kcpu->reqs_available += nr;
+ while (kcpu->reqs_available >= ctx->req_batch * 2) {
+ kcpu->reqs_available -= ctx->req_batch;
+ atomic_add(ctx->req_batch, &ctx->reqs_available);
+ }
+
+ preempt_enable();
+}
+
+static bool get_reqs_available(struct kioctx *ctx)
+{
+ struct kioctx_cpu *kcpu;
+ bool ret = false;
+
+ preempt_disable();
+ kcpu = this_cpu_ptr(ctx->cpu);
+
+ if (!kcpu->reqs_available) {
+ int old, avail = atomic_read(&ctx->reqs_available);
+
+ do {
+ if (avail < ctx->req_batch)
+ goto out;
+
+ old = avail;
+ avail = atomic_cmpxchg(&ctx->reqs_available,
+ avail, avail - ctx->req_batch);
+ } while (avail != old);
+
+ kcpu->reqs_available += ctx->req_batch;
+ }
+
+ ret = true;
+ kcpu->reqs_available--;
+out:
+ preempt_enable();
+ return ret;
+}
+
/* aio_get_req
* Allocate a slot for an aio request. Increments the ki_users count
* of the kioctx so that the kioctx stays around until all requests are
@@ -496,7 +571,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
- if (atomic_dec_if_positive(&ctx->reqs_available) <= 0)
+ if (!get_reqs_available(ctx))
return NULL;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
@@ -505,10 +580,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
atomic_set(&req->ki_users, 2);
req->ki_ctx = ctx;
-
return req;
out_put:
- atomic_inc(&ctx->reqs_available);
+ put_reqs_available(ctx, 1);
return NULL;
}
@@ -597,6 +671,10 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
if (unlikely(xchg(&iocb->ki_cancel,
KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
+ /*
+ * Can't use the percpu reqs_available here - could race with
+ * free_ioctx()
+ */
atomic_inc(&ctx->reqs_available);
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
@@ -737,7 +815,7 @@ static int aio_read_events_ring(struct kioctx *ctx,
pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
- atomic_add(ret, &ctx->reqs_available);
+ put_reqs_available(ctx, ret);
out:
mutex_unlock(&ctx->ring_lock);
@@ -1156,7 +1234,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
- atomic_inc(&ctx->reqs_available);
+ put_reqs_available(ctx, 1);
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
--
1.7.12
^ permalink raw reply related
* [PATCH 21/32] aio: reqs_active -> reqs_available
From: Kent Overstreet @ 2012-12-27 2:00 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
The number of outstanding kiocbs is one of the few shared things left
that has to be touched for every kiocb - it'd be nice to make it percpu.
We can make it per cpu by treating it like an allocation problem: we
have a maximum number of kiocbs that can be outstanding (i.e. slots) -
then we just allocate and free slots, and we know how to write per cpu
allocators.
So as prep work for that, we convert reqs_active to reqs_available.
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 27 +++++++++++++--------------
1 file changed, 13 insertions(+), 14 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index fa87732..d384eb2 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -82,7 +82,7 @@ struct kioctx {
struct work_struct rcu_work;
struct {
- atomic_t reqs_active;
+ atomic_t reqs_available;
} ____cacheline_aligned;
struct {
@@ -286,17 +286,17 @@ static void free_ioctx(struct kioctx *ctx)
head = ring->head;
kunmap_atomic(ring);
- while (atomic_read(&ctx->reqs_active) > 0) {
+ while (atomic_read(&ctx->reqs_available) < ctx->nr) {
wait_event(ctx->wait, head != ctx->tail);
avail = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
- atomic_sub(avail, &ctx->reqs_active);
+ atomic_add(avail, &ctx->reqs_available);
head += avail;
head %= ctx->nr;
}
- WARN_ON(atomic_read(&ctx->reqs_active) < 0);
+ WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr);
aio_free_ring(ctx);
@@ -360,6 +360,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
if (aio_setup_ring(ctx) < 0)
goto out_freectx;
+ atomic_set(&ctx->reqs_available, ctx->nr);
+
/* limit the number of system wide aios */
spin_lock(&aio_nr_lock);
if (aio_nr + nr_events > aio_max_nr ||
@@ -462,7 +464,7 @@ void exit_aio(struct mm_struct *mm)
"exit_aio:ioctx still alive: %d %d %d\n",
atomic_read(&ctx->users),
atomic_read(&ctx->dead),
- atomic_read(&ctx->reqs_active));
+ atomic_read(&ctx->reqs_available));
/*
* We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything.
@@ -494,12 +496,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
- if (atomic_read(&ctx->reqs_active) >= ctx->nr)
+ if (atomic_dec_if_positive(&ctx->reqs_available) <= 0)
return NULL;
- if (atomic_inc_return(&ctx->reqs_active) > ctx->nr)
- goto out_put;
-
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req))
goto out_put;
@@ -509,7 +508,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
return req;
out_put:
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
return NULL;
}
@@ -580,7 +579,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
/*
* Take rcu_read_lock() in case the kioctx is being destroyed, as we
- * need to issue a wakeup after decrementing reqs_active.
+ * need to issue a wakeup after incrementing reqs_available.
*/
rcu_read_lock();
@@ -598,7 +597,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
if (unlikely(xchg(&iocb->ki_cancel,
KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
}
@@ -738,7 +737,7 @@ static int aio_read_events_ring(struct kioctx *ctx,
pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
- atomic_sub(ret, &ctx->reqs_active);
+ atomic_add(ret, &ctx->reqs_available);
out:
mutex_unlock(&ctx->ring_lock);
@@ -1157,7 +1156,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
--
1.7.12
^ permalink raw reply related
* [PATCH 20/32] aio: Give shared kioctx fields their own cachelines
From: Kent Overstreet @ 2012-12-27 1:59 UTC (permalink / raw)
To: linux-kernel, linux-aio, linux-fsdevel
Cc: Kent Overstreet, zab, bcrl, jmoyer, axboe, viro, tytso
In-Reply-To: <1356573611-18590-1-git-send-email-koverstreet@google.com>
Signed-off-by: Kent Overstreet <koverstreet@google.com>
---
fs/aio.c | 27 +++++++++++++++------------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 96fbd6b..fa87732 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -67,13 +67,6 @@ struct kioctx {
unsigned long user_id;
struct hlist_node list;
- wait_queue_head_t wait;
-
- spinlock_t ctx_lock;
-
- atomic_t reqs_active;
- struct list_head active_reqs; /* used for cancellation */
-
unsigned nr;
/* sys_io_setup currently limits this to an unsigned int */
@@ -85,19 +78,29 @@ struct kioctx {
struct page **ring_pages;
long nr_pages;
+ struct rcu_head rcu_head;
+ struct work_struct rcu_work;
+
struct {
- struct mutex ring_lock;
+ atomic_t reqs_active;
} ____cacheline_aligned;
struct {
+ spinlock_t ctx_lock;
+ struct list_head active_reqs; /* used for cancellation */
+ } ____cacheline_aligned_in_smp;
+
+ struct {
+ struct mutex ring_lock;
+ wait_queue_head_t wait;
+ } ____cacheline_aligned_in_smp;
+
+ struct {
unsigned tail;
spinlock_t completion_lock;
- } ____cacheline_aligned;
+ } ____cacheline_aligned_in_smp;
struct page *internal_pages[AIO_RING_PAGES];
-
- struct rcu_head rcu_head;
- struct work_struct rcu_work;
};
/*------ sysctl variables----*/
--
1.7.12
^ permalink raw reply related
page: next (older) | prev (newer) | latest
- recent:[subjects (threaded)|topics (new)|topics (active)]
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.