linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH 2/4] direct-io: make O_DIRECT IO path be page based
  2009-08-18  8:34 [PATCH 0/4] Page based O_DIRECT v2 Jens Axboe
@ 2009-08-18  8:34 ` Jens Axboe
  0 siblings, 0 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-18  8:34 UTC (permalink / raw)
  To: linux-kernel; +Cc: zach.brown, hch, Jens Axboe

Currently we pass in the iovec array and let the O_DIRECT core
handle the get_user_pages() business. This works, but it means that
we can only ever use user pages for O_DIRECT.

Switch the aops->direct_IO() and below code to use page arrays
instead, so that it doesn't make any assumptions about who the pages
belong to. This works directly for all users but NFS, which just
uses the same helper that the generic mapping read/write functions
also call.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 fs/direct-io.c         |  304 ++++++++++++++++++++----------------------------
 fs/nfs/direct.c        |  161 +++++++++----------------
 fs/nfs/file.c          |    8 +-
 include/linux/fs.h     |   15 ++-
 include/linux/nfs_fs.h |    7 +-
 mm/filemap.c           |    6 +-
 6 files changed, 206 insertions(+), 295 deletions(-)

diff --git a/fs/direct-io.c b/fs/direct-io.c
index 181848c..22a945b 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -38,12 +38,6 @@
 #include <asm/atomic.h>
 
 /*
- * How many user pages to map in one call to get_user_pages().  This determines
- * the size of a structure on the stack.
- */
-#define DIO_PAGES	64
-
-/*
  * This code generally works in units of "dio_blocks".  A dio_block is
  * somewhere between the hard sector size and the filesystem block size.  it
  * is determined on a per-invocation basis.   When talking to the filesystem
@@ -105,20 +99,13 @@ struct dio {
 	sector_t cur_page_block;	/* Where it starts */
 
 	/*
-	 * Page fetching state. These variables belong to dio_refill_pages().
-	 */
-	int curr_page;			/* changes */
-	int total_pages;		/* doesn't change */
-	unsigned long curr_user_address;/* changes */
-
-	/*
 	 * Page queue.  These variables belong to dio_refill_pages() and
 	 * dio_get_page().
 	 */
-	struct page *pages[DIO_PAGES];	/* page buffer */
-	unsigned head;			/* next page to process */
-	unsigned tail;			/* last valid page + 1 */
-	int page_errors;		/* errno from get_user_pages() */
+	struct page **pages;		/* page buffer */
+	unsigned int head_page;		/* next page to process */
+	unsigned int total_pages;	/* last valid page + 1 */
+	unsigned int first_page_off;	/* offset into first page in map */
 
 	/* BIO completion state */
 	spinlock_t bio_lock;		/* protects BIO fields below */
@@ -134,57 +121,6 @@ struct dio {
 };
 
 /*
- * How many pages are in the queue?
- */
-static inline unsigned dio_pages_present(struct dio *dio)
-{
-	return dio->tail - dio->head;
-}
-
-/*
- * Go grab and pin some userspace pages.   Typically we'll get 64 at a time.
- */
-static int dio_refill_pages(struct dio *dio)
-{
-	int ret;
-	int nr_pages;
-
-	nr_pages = min(dio->total_pages - dio->curr_page, DIO_PAGES);
-	ret = get_user_pages_fast(
-		dio->curr_user_address,		/* Where from? */
-		nr_pages,			/* How many pages? */
-		dio->rw == READ,		/* Write to memory? */
-		&dio->pages[0]);		/* Put results here */
-
-	if (ret < 0 && dio->blocks_available && (dio->rw & WRITE)) {
-		struct page *page = ZERO_PAGE(0);
-		/*
-		 * A memory fault, but the filesystem has some outstanding
-		 * mapped blocks.  We need to use those blocks up to avoid
-		 * leaking stale data in the file.
-		 */
-		if (dio->page_errors == 0)
-			dio->page_errors = ret;
-		page_cache_get(page);
-		dio->pages[0] = page;
-		dio->head = 0;
-		dio->tail = 1;
-		ret = 0;
-		goto out;
-	}
-
-	if (ret >= 0) {
-		dio->curr_user_address += ret * PAGE_SIZE;
-		dio->curr_page += ret;
-		dio->head = 0;
-		dio->tail = ret;
-		ret = 0;
-	}
-out:
-	return ret;	
-}
-
-/*
  * Get another userspace page.  Returns an ERR_PTR on error.  Pages are
  * buffered inside the dio so that we can call get_user_pages() against a
  * decent number of pages, less frequently.  To provide nicer use of the
@@ -192,15 +128,10 @@ out:
  */
 static struct page *dio_get_page(struct dio *dio)
 {
-	if (dio_pages_present(dio) == 0) {
-		int ret;
+	if (dio->head_page < dio->total_pages)
+		return dio->pages[dio->head_page++];
 
-		ret = dio_refill_pages(dio);
-		if (ret)
-			return ERR_PTR(ret);
-		BUG_ON(dio_pages_present(dio) == 0);
-	}
-	return dio->pages[dio->head++];
+	return NULL;
 }
 
 /**
@@ -245,8 +176,6 @@ static int dio_complete(struct dio *dio, loff_t offset, int ret)
 		up_read_non_owner(&dio->inode->i_alloc_sem);
 
 	if (ret == 0)
-		ret = dio->page_errors;
-	if (ret == 0)
 		ret = dio->io_error;
 	if (ret == 0)
 		ret = transferred;
@@ -351,8 +280,10 @@ static void dio_bio_submit(struct dio *dio)
  */
 static void dio_cleanup(struct dio *dio)
 {
-	while (dio_pages_present(dio))
-		page_cache_release(dio_get_page(dio));
+	struct page *page;
+
+	while ((page = dio_get_page(dio)) != NULL)
+		page_cache_release(page);
 }
 
 /*
@@ -490,7 +421,6 @@ static int dio_bio_reap(struct dio *dio)
  */
 static int get_more_blocks(struct dio *dio)
 {
-	int ret;
 	struct buffer_head *map_bh = &dio->map_bh;
 	sector_t fs_startblk;	/* Into file, in filesystem-sized blocks */
 	unsigned long fs_count;	/* Number of filesystem-sized blocks */
@@ -502,38 +432,33 @@ static int get_more_blocks(struct dio *dio)
 	 * If there was a memory error and we've overwritten all the
 	 * mapped blocks then we can now return that memory error
 	 */
-	ret = dio->page_errors;
-	if (ret == 0) {
-		BUG_ON(dio->block_in_file >= dio->final_block_in_request);
-		fs_startblk = dio->block_in_file >> dio->blkfactor;
-		dio_count = dio->final_block_in_request - dio->block_in_file;
-		fs_count = dio_count >> dio->blkfactor;
-		blkmask = (1 << dio->blkfactor) - 1;
-		if (dio_count & blkmask)	
-			fs_count++;
-
-		map_bh->b_state = 0;
-		map_bh->b_size = fs_count << dio->inode->i_blkbits;
-
-		create = dio->rw & WRITE;
-		if (dio->lock_type == DIO_LOCKING) {
-			if (dio->block_in_file < (i_size_read(dio->inode) >>
-							dio->blkbits))
-				create = 0;
-		} else if (dio->lock_type == DIO_NO_LOCKING) {
+	BUG_ON(dio->block_in_file >= dio->final_block_in_request);
+	fs_startblk = dio->block_in_file >> dio->blkfactor;
+	dio_count = dio->final_block_in_request - dio->block_in_file;
+	fs_count = dio_count >> dio->blkfactor;
+	blkmask = (1 << dio->blkfactor) - 1;
+	if (dio_count & blkmask)
+		fs_count++;
+
+	map_bh->b_state = 0;
+	map_bh->b_size = fs_count << dio->inode->i_blkbits;
+
+	create = dio->rw & WRITE;
+	if (dio->lock_type == DIO_LOCKING) {
+		if (dio->block_in_file < (i_size_read(dio->inode) >>
+						dio->blkbits))
 			create = 0;
-		}
-
-		/*
-		 * For writes inside i_size we forbid block creations: only
-		 * overwrites are permitted.  We fall back to buffered writes
-		 * at a higher level for inside-i_size block-instantiating
-		 * writes.
-		 */
-		ret = (*dio->get_block)(dio->inode, fs_startblk,
-						map_bh, create);
+	} else if (dio->lock_type == DIO_NO_LOCKING) {
+		create = 0;
 	}
-	return ret;
+
+	/*
+	 * For writes inside i_size we forbid block creations: only
+	 * overwrites are permitted.  We fall back to buffered writes
+	 * at a higher level for inside-i_size block-instantiating
+	 * writes.
+	 */
+	return dio->get_block(dio->inode, fs_startblk, map_bh, create);
 }
 
 /*
@@ -567,8 +492,8 @@ static int dio_bio_add_page(struct dio *dio)
 {
 	int ret;
 
-	ret = bio_add_page(dio->bio, dio->cur_page,
-			dio->cur_page_len, dio->cur_page_offset);
+	ret = bio_add_page(dio->bio, dio->cur_page, dio->cur_page_len,
+				dio->cur_page_offset);
 	if (ret == dio->cur_page_len) {
 		/*
 		 * Decrement count only, if we are done with this page
@@ -804,6 +729,9 @@ static int do_direct_IO(struct dio *dio)
 			unsigned this_chunk_blocks;	/* # of blocks */
 			unsigned u;
 
+			offset_in_page += dio->first_page_off;
+			dio->first_page_off = 0;
+
 			if (dio->blocks_available == 0) {
 				/*
 				 * Need to go and map some more disk
@@ -933,13 +861,10 @@ direct_io_worker(struct kiocb *iocb, struct inode *inode,
 	struct dio_args *args, unsigned blkbits, get_block_t get_block,
 	dio_iodone_t end_io, struct dio *dio)
 {
-	const struct iovec *iov = args->iov;
-	unsigned long user_addr;
 	unsigned long flags;
-	int seg, rw = args->rw;
+	int rw = args->rw;
 	ssize_t ret = 0;
 	ssize_t ret2;
-	size_t bytes;
 
 	dio->inode = inode;
 	dio->rw = rw;
@@ -965,46 +890,25 @@ direct_io_worker(struct kiocb *iocb, struct inode *inode,
 	if (unlikely(dio->blkfactor))
 		dio->pages_in_io = 2;
 
-	for (seg = 0; seg < args->nr_segs; seg++) {
-		user_addr = (unsigned long) iov[seg].iov_base;
-		dio->pages_in_io +=
-			((user_addr+iov[seg].iov_len +PAGE_SIZE-1)/PAGE_SIZE
-				- user_addr/PAGE_SIZE);
-	}
+	dio->pages_in_io += args->nr_segs;
+	dio->size = args->length;
+	if (args->user_addr) {
+		dio->first_page_off = args->user_addr & ~PAGE_MASK;
+		dio->first_block_in_page = dio->first_page_off >> blkbits;
+		if (dio->first_block_in_page)
+			dio->first_page_off -= 1 << blkbits;
+	} else
+		dio->first_page_off = args->first_page_off;
 
-	for (seg = 0; seg < args->nr_segs; seg++) {
-		user_addr = (unsigned long)iov[seg].iov_base;
-		dio->size += bytes = iov[seg].iov_len;
-
-		/* Index into the first page of the first block */
-		dio->first_block_in_page = (user_addr & ~PAGE_MASK) >> blkbits;
-		dio->final_block_in_request = dio->block_in_file +
-						(bytes >> blkbits);
-		/* Page fetching state */
-		dio->head = 0;
-		dio->tail = 0;
-		dio->curr_page = 0;
-
-		dio->total_pages = 0;
-		if (user_addr & (PAGE_SIZE-1)) {
-			dio->total_pages++;
-			bytes -= PAGE_SIZE - (user_addr & (PAGE_SIZE - 1));
-		}
-		dio->total_pages += (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
-		dio->curr_user_address = user_addr;
-	
-		ret = do_direct_IO(dio);
+	dio->final_block_in_request = dio->block_in_file + (dio->size >> blkbits);
+	dio->head_page = 0;
+	dio->total_pages = args->nr_segs;
 
-		dio->result += iov[seg].iov_len -
+	ret = do_direct_IO(dio);
+
+	dio->result += args->length -
 			((dio->final_block_in_request - dio->block_in_file) <<
 					blkbits);
-
-		if (ret) {
-			dio_cleanup(dio);
-			break;
-		}
-	} /* end iovec loop */
-
 	if (ret == -ENOTBLK && (rw & WRITE)) {
 		/*
 		 * The remaining part of the request will be
@@ -1110,9 +1014,6 @@ __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	struct block_device *bdev, struct dio_args *args, get_block_t get_block,
 	dio_iodone_t end_io, int dio_lock_type)
 {
-	int seg;
-	size_t size;
-	unsigned long addr;
 	unsigned blkbits = inode->i_blkbits;
 	unsigned bdev_blkbits = 0;
 	unsigned blocksize_mask = (1 << blkbits) - 1;
@@ -1138,17 +1039,14 @@ __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	}
 
 	/* Check the memory alignment.  Blocks cannot straddle pages */
-	for (seg = 0; seg < args->nr_segs; seg++) {
-		addr = (unsigned long) args->iov[seg].iov_base;
-		size = args->iov[seg].iov_len;
-		end += size;
-		if ((addr & blocksize_mask) || (size & blocksize_mask))  {
-			if (bdev)
-				 blkbits = bdev_blkbits;
-			blocksize_mask = (1 << blkbits) - 1;
-			if ((addr & blocksize_mask) || (size & blocksize_mask))  
-				goto out;
-		}
+	if ((args->user_addr & blocksize_mask) ||
+	    (args->length & blocksize_mask))  {
+		if (bdev)
+			 blkbits = bdev_blkbits;
+		blocksize_mask = (1 << blkbits) - 1;
+		if ((args->user_addr & blocksize_mask) ||
+		    (args->length & blocksize_mask))
+			goto out;
 	}
 
 	dio = kzalloc(sizeof(*dio), GFP_KERNEL);
@@ -1156,6 +1054,8 @@ __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	if (!dio)
 		goto out;
 
+	dio->pages = args->pages;
+
 	/*
 	 * For block device access DIO_NO_LOCKING is used,
 	 *	neither readers nor writers do any locking at all
@@ -1232,20 +1132,70 @@ out:
 }
 EXPORT_SYMBOL(__blockdev_direct_IO);
 
-ssize_t generic_file_direct_IO(int rw, struct address_space *mapping,
-			       struct kiocb *iocb, const struct iovec *iov,
-			       loff_t offset, unsigned long nr_segs)
+static ssize_t __generic_file_direct_IO(int rw, struct address_space *mapping,
+					struct kiocb *iocb,
+					const struct iovec *iov, loff_t offset,
+					dio_io_actor *actor)
 {
+	struct page *stack_pages[UIO_FASTIOV];
+	unsigned long nr_pages, start, end;
 	struct dio_args args = {
-		.rw		= rw,
-		.iov		= iov,
-		.length		= iov_length(iov, nr_segs),
+		.pages		= stack_pages,
+		.length		= iov->iov_len,
+		.user_addr	= (unsigned long) iov->iov_base,
 		.offset		= offset,
-		.nr_segs	= nr_segs,
 	};
+	ssize_t ret;
+
+	end = (args.user_addr + iov->iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	start = args.user_addr >> PAGE_SHIFT;
+	nr_pages = end - start;
+
+	if (nr_pages >= UIO_FASTIOV) {
+		args.pages = kzalloc(nr_pages * sizeof(struct page *),
+					GFP_KERNEL);
+		if (!args.pages)
+			return -ENOMEM;
+	}
+
+	ret = get_user_pages_fast(args.user_addr, nr_pages, rw == READ,
+					args.pages);
+	if (ret > 0) {
+		args.nr_segs = ret;
+		ret = actor(iocb, &args);
+	}
 
-	if (mapping->a_ops->direct_IO)
-		return mapping->a_ops->direct_IO(iocb, &args);
+	if (args.pages != stack_pages)
+		kfree(args.pages);
 
-	return -EINVAL;
+	return ret;
+}
+
+/*
+ * Transform the iov into a page based structure for passing into the lower
+ * parts of O_DIRECT handling
+ */
+ssize_t generic_file_direct_IO(int rw, struct address_space *mapping,
+			       struct kiocb *kiocb, const struct iovec *iov,
+			       loff_t offset, unsigned long nr_segs,
+			       dio_io_actor *actor)
+{
+	ssize_t ret = 0, ret2;
+	unsigned long i;
+
+	for (i = 0; i < nr_segs; i++) {
+		ret2 = __generic_file_direct_IO(rw, mapping, kiocb, iov, offset,
+							actor);
+		if (ret2 < 0) {
+			if (!ret)
+				ret = ret2;
+			break;
+		}
+		iov++;
+		offset += ret2;
+		ret += ret2;
+	}
+
+	return ret;
 }
+EXPORT_SYMBOL_GPL(generic_file_direct_IO);
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 45d931b..d9da548 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -271,13 +271,12 @@ static const struct rpc_call_ops nfs_read_direct_ops = {
  * no requests have been sent, just return an error.
  */
 static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
-						const struct iovec *iov,
-						loff_t pos)
+						struct dio_args *args)
 {
 	struct nfs_open_context *ctx = dreq->ctx;
 	struct inode *inode = ctx->path.dentry->d_inode;
-	unsigned long user_addr = (unsigned long)iov->iov_base;
-	size_t count = iov->iov_len;
+	unsigned long user_addr = args->user_addr;
+	size_t count = args->length;
 	size_t rsize = NFS_SERVER(inode)->rsize;
 	struct rpc_task *task;
 	struct rpc_message msg = {
@@ -306,24 +305,8 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
 		if (unlikely(!data))
 			break;
 
-		down_read(&current->mm->mmap_sem);
-		result = get_user_pages(current, current->mm, user_addr,
-					data->npages, 1, 0, data->pagevec, NULL);
-		up_read(&current->mm->mmap_sem);
-		if (result < 0) {
-			nfs_readdata_free(data);
-			break;
-		}
-		if ((unsigned)result < data->npages) {
-			bytes = result * PAGE_SIZE;
-			if (bytes <= pgbase) {
-				nfs_direct_release_pages(data->pagevec, result);
-				nfs_readdata_free(data);
-				break;
-			}
-			bytes -= pgbase;
-			data->npages = result;
-		}
+		data->pagevec = args->pages;
+		data->npages = args->nr_segs;
 
 		get_dreq(dreq);
 
@@ -332,7 +315,7 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
 		data->cred = msg.rpc_cred;
 		data->args.fh = NFS_FH(inode);
 		data->args.context = ctx;
-		data->args.offset = pos;
+		data->args.offset = args->offset;
 		data->args.pgbase = pgbase;
 		data->args.pages = data->pagevec;
 		data->args.count = bytes;
@@ -361,7 +344,7 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
 
 		started += bytes;
 		user_addr += bytes;
-		pos += bytes;
+		args->offset += bytes;
 		/* FIXME: Remove this unnecessary math from final patch */
 		pgbase += bytes;
 		pgbase &= ~PAGE_MASK;
@@ -376,26 +359,19 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
 }
 
 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
-					      const struct iovec *iov,
-					      unsigned long nr_segs,
-					      loff_t pos)
+					      struct dio_args *args)
 {
 	ssize_t result = -EINVAL;
 	size_t requested_bytes = 0;
-	unsigned long seg;
 
 	get_dreq(dreq);
 
-	for (seg = 0; seg < nr_segs; seg++) {
-		const struct iovec *vec = &iov[seg];
-		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
-		if (result < 0)
-			break;
-		requested_bytes += result;
-		if ((size_t)result < vec->iov_len)
-			break;
-		pos += vec->iov_len;
-	}
+	result = nfs_direct_read_schedule_segment(dreq, args);
+	if (result < 0)
+		goto out;
+
+	requested_bytes += result;
+	args += result;
 
 	if (put_dreq(dreq))
 		nfs_direct_complete(dreq);
@@ -403,13 +379,13 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
 	if (requested_bytes != 0)
 		return 0;
 
+out:
 	if (result < 0)
 		return result;
 	return -EIO;
 }
 
-static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
-			       unsigned long nr_segs, loff_t pos)
+static ssize_t nfs_direct_read(struct kiocb *iocb, struct dio_args *args)
 {
 	ssize_t result = 0;
 	struct inode *inode = iocb->ki_filp->f_mapping->host;
@@ -424,7 +400,7 @@ static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
 	if (!is_sync_kiocb(iocb))
 		dreq->iocb = iocb;
 
-	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
+	result = nfs_direct_read_schedule_iovec(dreq, args);
 	if (!result)
 		result = nfs_direct_wait(dreq);
 	nfs_direct_req_release(dreq);
@@ -691,13 +667,13 @@ static const struct rpc_call_ops nfs_write_direct_ops = {
  * no requests have been sent, just return an error.
  */
 static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
-						 const struct iovec *iov,
-						 loff_t pos, int sync)
+						 struct dio_args *args,
+						 int sync)
 {
 	struct nfs_open_context *ctx = dreq->ctx;
 	struct inode *inode = ctx->path.dentry->d_inode;
-	unsigned long user_addr = (unsigned long)iov->iov_base;
-	size_t count = iov->iov_len;
+	unsigned long user_addr = args->user_addr;
+	size_t count = args->length;
 	struct rpc_task *task;
 	struct rpc_message msg = {
 		.rpc_cred = ctx->cred,
@@ -726,24 +702,8 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
 		if (unlikely(!data))
 			break;
 
-		down_read(&current->mm->mmap_sem);
-		result = get_user_pages(current, current->mm, user_addr,
-					data->npages, 0, 0, data->pagevec, NULL);
-		up_read(&current->mm->mmap_sem);
-		if (result < 0) {
-			nfs_writedata_free(data);
-			break;
-		}
-		if ((unsigned)result < data->npages) {
-			bytes = result * PAGE_SIZE;
-			if (bytes <= pgbase) {
-				nfs_direct_release_pages(data->pagevec, result);
-				nfs_writedata_free(data);
-				break;
-			}
-			bytes -= pgbase;
-			data->npages = result;
-		}
+		data->pagevec = args->pages;
+		data->npages = args->nr_segs;
 
 		get_dreq(dreq);
 
@@ -754,7 +714,7 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
 		data->cred = msg.rpc_cred;
 		data->args.fh = NFS_FH(inode);
 		data->args.context = ctx;
-		data->args.offset = pos;
+		data->args.offset = args->offset;
 		data->args.pgbase = pgbase;
 		data->args.pages = data->pagevec;
 		data->args.count = bytes;
@@ -784,7 +744,7 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
 
 		started += bytes;
 		user_addr += bytes;
-		pos += bytes;
+		args->offset += bytes;
 
 		/* FIXME: Remove this useless math from the final patch */
 		pgbase += bytes;
@@ -800,27 +760,19 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
 }
 
 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
-					       const struct iovec *iov,
-					       unsigned long nr_segs,
-					       loff_t pos, int sync)
+					       struct dio_args *args, int sync)
 {
 	ssize_t result = 0;
 	size_t requested_bytes = 0;
-	unsigned long seg;
 
 	get_dreq(dreq);
 
-	for (seg = 0; seg < nr_segs; seg++) {
-		const struct iovec *vec = &iov[seg];
-		result = nfs_direct_write_schedule_segment(dreq, vec,
-							   pos, sync);
-		if (result < 0)
-			break;
-		requested_bytes += result;
-		if ((size_t)result < vec->iov_len)
-			break;
-		pos += vec->iov_len;
-	}
+	result = nfs_direct_write_schedule_segment(dreq, args, sync);
+	if (result < 0)
+		goto out;
+
+	requested_bytes += result;
+	args->offset += result;
 
 	if (put_dreq(dreq))
 		nfs_direct_write_complete(dreq, dreq->inode);
@@ -828,14 +780,13 @@ static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
 	if (requested_bytes != 0)
 		return 0;
 
+out:
 	if (result < 0)
 		return result;
 	return -EIO;
 }
 
-static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
-				unsigned long nr_segs, loff_t pos,
-				size_t count)
+static ssize_t nfs_direct_write(struct kiocb *iocb, struct dio_args *args)
 {
 	ssize_t result = 0;
 	struct inode *inode = iocb->ki_filp->f_mapping->host;
@@ -848,7 +799,7 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
 		return -ENOMEM;
 	nfs_alloc_commit_data(dreq);
 
-	if (dreq->commit_data == NULL || count < wsize)
+	if (dreq->commit_data == NULL || args->length < wsize)
 		sync = NFS_FILE_SYNC;
 
 	dreq->inode = inode;
@@ -856,7 +807,7 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
 	if (!is_sync_kiocb(iocb))
 		dreq->iocb = iocb;
 
-	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
+	result = nfs_direct_write_schedule_iovec(dreq, args, sync);
 	if (!result)
 		result = nfs_direct_wait(dreq);
 	nfs_direct_req_release(dreq);
@@ -867,9 +818,7 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
 /**
  * nfs_file_direct_read - file direct read operation for NFS files
  * @iocb: target I/O control block
- * @iov: vector of user buffers into which to read data
- * @nr_segs: size of iov vector
- * @pos: byte offset in file where reading starts
+ * @args: direct IO arguments
  *
  * We use this function for direct reads instead of calling
  * generic_file_aio_read() in order to avoid gfar's check to see if
@@ -885,21 +834,20 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
  * client must read the updated atime from the server back into its
  * cache.
  */
-ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
-				unsigned long nr_segs, loff_t pos)
+static ssize_t nfs_file_direct_read(struct kiocb *iocb, struct dio_args *args)
 {
 	ssize_t retval = -EINVAL;
 	struct file *file = iocb->ki_filp;
 	struct address_space *mapping = file->f_mapping;
 	size_t count;
 
-	count = iov_length(iov, nr_segs);
+	count = args->length;
 	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
 
 	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
 		file->f_path.dentry->d_parent->d_name.name,
 		file->f_path.dentry->d_name.name,
-		count, (long long) pos);
+		count, (long long) args->offset);
 
 	retval = 0;
 	if (!count)
@@ -909,9 +857,9 @@ ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
 	if (retval)
 		goto out;
 
-	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
+	retval = nfs_direct_read(iocb, args);
 	if (retval > 0)
-		iocb->ki_pos = pos + retval;
+		iocb->ki_pos = args->offset + retval;
 
 out:
 	return retval;
@@ -920,9 +868,7 @@ out:
 /**
  * nfs_file_direct_write - file direct write operation for NFS files
  * @iocb: target I/O control block
- * @iov: vector of user buffers from which to write data
- * @nr_segs: size of iov vector
- * @pos: byte offset in file where writing starts
+ * @args: direct IO arguments
  *
  * We use this function for direct writes instead of calling
  * generic_file_aio_write() in order to avoid taking the inode
@@ -942,23 +888,22 @@ out:
  * Note that O_APPEND is not supported for NFS direct writes, as there
  * is no atomic O_APPEND write facility in the NFS protocol.
  */
-ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
-				unsigned long nr_segs, loff_t pos)
+static ssize_t nfs_file_direct_write(struct kiocb *iocb, struct dio_args *args)
 {
 	ssize_t retval = -EINVAL;
 	struct file *file = iocb->ki_filp;
 	struct address_space *mapping = file->f_mapping;
 	size_t count;
 
-	count = iov_length(iov, nr_segs);
+	count = args->length;
 	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
 
 	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
 		file->f_path.dentry->d_parent->d_name.name,
 		file->f_path.dentry->d_name.name,
-		count, (long long) pos);
+		count, (long long) args->offset);
 
-	retval = generic_write_checks(file, &pos, &count, 0);
+	retval = generic_write_checks(file, &args->offset, &count, 0);
 	if (retval)
 		goto out;
 
@@ -973,15 +918,23 @@ ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
 	if (retval)
 		goto out;
 
-	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);
+	retval = nfs_direct_write(iocb, args);
 
 	if (retval > 0)
-		iocb->ki_pos = pos + retval;
+		iocb->ki_pos = args->offset + retval;
 
 out:
 	return retval;
 }
 
+ssize_t nfs_file_direct_io(struct kiocb *kiocb, struct dio_args *args)
+{
+	if (args->rw == READ)
+		return nfs_file_direct_read(kiocb, args);
+
+	return nfs_file_direct_write(kiocb, args);
+}
+
 /**
  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
  *
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 0506232..97d8cc7 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -249,13 +249,15 @@ static ssize_t
 nfs_file_read(struct kiocb *iocb, const struct iovec *iov,
 		unsigned long nr_segs, loff_t pos)
 {
+	struct address_space *mapping = iocb->ki_filp->f_mapping;
 	struct dentry * dentry = iocb->ki_filp->f_path.dentry;
 	struct inode * inode = dentry->d_inode;
 	ssize_t result;
 	size_t count = iov_length(iov, nr_segs);
 
 	if (iocb->ki_filp->f_flags & O_DIRECT)
-		return nfs_file_direct_read(iocb, iov, nr_segs, pos);
+		return generic_file_direct_IO(READ, mapping, iocb, iov, pos,
+						nr_segs, nfs_file_direct_io);
 
 	dprintk("NFS: read(%s/%s, %lu@%lu)\n",
 		dentry->d_parent->d_name.name, dentry->d_name.name,
@@ -546,13 +548,15 @@ static int nfs_need_sync_write(struct file *filp, struct inode *inode)
 static ssize_t nfs_file_write(struct kiocb *iocb, const struct iovec *iov,
 				unsigned long nr_segs, loff_t pos)
 {
+	struct address_space *mapping = iocb->ki_filp->f_mapping;
 	struct dentry * dentry = iocb->ki_filp->f_path.dentry;
 	struct inode * inode = dentry->d_inode;
 	ssize_t result;
 	size_t count = iov_length(iov, nr_segs);
 
 	if (iocb->ki_filp->f_flags & O_DIRECT)
-		return nfs_file_direct_write(iocb, iov, nr_segs, pos);
+		return generic_file_direct_IO(WRITE, mapping, iocb, iov, pos,
+						nr_segs, nfs_file_direct_io);
 
 	dprintk("NFS: write(%s/%s, %lu@%Ld)\n",
 		dentry->d_parent->d_name.name, dentry->d_name.name,
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 5971116..539994a 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -2247,18 +2247,27 @@ static inline int xip_truncate_page(struct address_space *mapping, loff_t from)
  */
 struct dio_args {
 	int rw;
-	const struct iovec *iov;
+	struct page **pages;
+	unsigned int first_page_off;
+	unsigned long nr_segs;
 	unsigned long length;
 	loff_t offset;
-	unsigned long nr_segs;
+
+	/*
+	 * Original user pointer, we'll get rid of this
+	 */
+	unsigned long user_addr;
 };
 
 ssize_t __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	struct block_device *bdev, struct dio_args *args, get_block_t get_block,
 	dio_iodone_t end_io, int lock_type);
 
+typedef ssize_t (dio_io_actor)(struct kiocb *, struct dio_args *);
+
 ssize_t generic_file_direct_IO(int, struct address_space *, struct kiocb *,
-				const struct iovec *, loff_t, unsigned long);
+				const struct iovec *, loff_t, unsigned long,
+				dio_io_actor);
 
 enum {
 	DIO_LOCKING = 1, /* need locking between buffered and direct access */
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index 97a2383..ded8337 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -409,12 +409,7 @@ extern int nfs3_removexattr (struct dentry *, const char *name);
  * linux/fs/nfs/direct.c
  */
 extern ssize_t nfs_direct_IO(struct kiocb *, struct dio_args *);
-extern ssize_t nfs_file_direct_read(struct kiocb *iocb,
-			const struct iovec *iov, unsigned long nr_segs,
-			loff_t pos);
-extern ssize_t nfs_file_direct_write(struct kiocb *iocb,
-			const struct iovec *iov, unsigned long nr_segs,
-			loff_t pos);
+extern ssize_t nfs_file_direct_io(struct kiocb *, struct dio_args *);
 
 /*
  * linux/fs/nfs/dir.c
diff --git a/mm/filemap.c b/mm/filemap.c
index cf85298..3e03021 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1346,8 +1346,8 @@ generic_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
 					pos + iov_length(iov, nr_segs) - 1);
 			if (!retval) {
 				retval = generic_file_direct_IO(READ, mapping,
-								iocb, iov,
-								pos, nr_segs);
+						iocb, iov, pos, nr_segs,
+						mapping->a_ops->direct_IO);
 			}
 			if (retval > 0)
 				*ppos = pos + retval;
@@ -2146,7 +2146,7 @@ generic_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
 	}
 
 	written = generic_file_direct_IO(WRITE, mapping, iocb, iov, pos,
-						*nr_segs);
+					*nr_segs, mapping->a_ops->direct_IO);
 
 	/*
 	 * Finally, try again to invalidate clean pages which might have been
-- 
1.6.4.53.g3f55e


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 0/6] Lazy workqueues
@ 2009-08-20 10:17 Jens Axboe
  2009-08-20 10:17 ` [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure Jens Axboe
  0 siblings, 1 reply; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan

Hi,

After yesterdays rant on having too many kernel threads and checking
how many I actually have running on this system (531!), I decided to 
try and do something about it.

My goal was to retain the workqueue interface instead of coming up with
a new scheme that required conversion (or converting to slow_work which,
btw, is an awful name :-). I also wanted to retain the affinity
guarantees of workqueues as much as possible.

So this is a first step in that direction, it's probably full of races
and holes, but should get the idea across. It adds a
create_lazy_workqueue() helper, similar to the other variants that we
currently have. A lazy workqueue works like a normal workqueue, except
that it only (by default) starts a core thread instead of threads for
all online CPUs. When work is queued on a lazy workqueue for a CPU
that doesn't have a thread running, it will be placed on the core CPU's
list and that will then create and move the work to the right target.
Should task creation fail, the queued work will be executed on the
core CPU instead. Once a lazy workqueue thread has been idle for a
certain amount of time, it will again exit.

The patch boots here and I exercised the rpciod workqueue and
verified that it gets created, runs on the right CPU, and exits a while
later. So core functionality should be there, even if it has holes.

With this patchset, I am now down to 280 kernel threads on one of my test
boxes. Still too many, but it's a start and a net reduction of 251
threads here, or 47%!

The code can also be pulled from:

  git://git.kernel.dk/linux-2.6-block.git workqueue

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 21+ messages in thread

* [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure
  2009-08-20 10:17 [PATCH 0/6] Lazy workqueues Jens Axboe
@ 2009-08-20 10:17 ` Jens Axboe
  2009-08-20 10:17   ` [PATCH 1/6] workqueue: replace singlethread/freezable/rt parameters and variables with flags Jens Axboe
  2009-08-21  0:03   ` [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure Andrew Morton
  0 siblings, 2 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

The O_DIRECT IO path is a mess of arguments. Clean that up by passing
those arguments in a dedicated dio_args structure.

This is in preparation for changing the internal implementation to be
page based instead of using iovecs.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 fs/block_dev.c              |    7 ++--
 fs/btrfs/inode.c            |    4 +--
 fs/direct-io.c              |   70 +++++++++++++++++++++++++++----------------
 fs/ext2/inode.c             |    8 ++---
 fs/ext3/inode.c             |   15 ++++-----
 fs/ext4/inode.c             |   15 ++++-----
 fs/fat/inode.c              |   12 +++----
 fs/gfs2/aops.c              |   11 ++----
 fs/hfs/inode.c              |    7 ++--
 fs/hfsplus/inode.c          |    8 ++--
 fs/jfs/inode.c              |    7 ++--
 fs/nfs/direct.c             |    9 ++----
 fs/nilfs2/inode.c           |    9 ++---
 fs/ocfs2/aops.c             |   11 ++-----
 fs/reiserfs/inode.c         |    7 +---
 fs/xfs/linux-2.6/xfs_aops.c |   19 ++++--------
 include/linux/fs.h          |   59 +++++++++++++++++++++---------------
 include/linux/nfs_fs.h      |    3 +-
 mm/filemap.c                |    8 +++--
 19 files changed, 141 insertions(+), 148 deletions(-)

diff --git a/fs/block_dev.c b/fs/block_dev.c
index 94dfda2..2e494fa 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -166,14 +166,13 @@ blkdev_get_blocks(struct inode *inode, sector_t iblock,
 }
 
 static ssize_t
-blkdev_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
-			loff_t offset, unsigned long nr_segs)
+blkdev_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
 
-	return blockdev_direct_IO_no_locking(rw, iocb, inode, I_BDEV(inode),
-				iov, offset, nr_segs, blkdev_get_blocks, NULL);
+	return blockdev_direct_IO_no_locking(iocb, inode, I_BDEV(inode),
+				args, blkdev_get_blocks, NULL);
 }
 
 int __sync_blockdev(struct block_device *bdev, int wait)
diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c
index 272b9b2..094e3a7 100644
--- a/fs/btrfs/inode.c
+++ b/fs/btrfs/inode.c
@@ -4308,9 +4308,7 @@ out:
 	return em;
 }
 
-static ssize_t btrfs_direct_IO(int rw, struct kiocb *iocb,
-			const struct iovec *iov, loff_t offset,
-			unsigned long nr_segs)
+static ssize_t btrfs_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	return -EINVAL;
 }
diff --git a/fs/direct-io.c b/fs/direct-io.c
index 8b10b87..181848c 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -929,14 +929,14 @@ out:
  * Releases both i_mutex and i_alloc_sem
  */
 static ssize_t
-direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, 
-	const struct iovec *iov, loff_t offset, unsigned long nr_segs, 
-	unsigned blkbits, get_block_t get_block, dio_iodone_t end_io,
-	struct dio *dio)
+direct_io_worker(struct kiocb *iocb, struct inode *inode, 
+	struct dio_args *args, unsigned blkbits, get_block_t get_block,
+	dio_iodone_t end_io, struct dio *dio)
 {
-	unsigned long user_addr; 
+	const struct iovec *iov = args->iov;
+	unsigned long user_addr;
 	unsigned long flags;
-	int seg;
+	int seg, rw = args->rw;
 	ssize_t ret = 0;
 	ssize_t ret2;
 	size_t bytes;
@@ -945,7 +945,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 	dio->rw = rw;
 	dio->blkbits = blkbits;
 	dio->blkfactor = inode->i_blkbits - blkbits;
-	dio->block_in_file = offset >> blkbits;
+	dio->block_in_file = args->offset >> blkbits;
 
 	dio->get_block = get_block;
 	dio->end_io = end_io;
@@ -965,14 +965,14 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 	if (unlikely(dio->blkfactor))
 		dio->pages_in_io = 2;
 
-	for (seg = 0; seg < nr_segs; seg++) {
-		user_addr = (unsigned long)iov[seg].iov_base;
+	for (seg = 0; seg < args->nr_segs; seg++) {
+		user_addr = (unsigned long) iov[seg].iov_base;
 		dio->pages_in_io +=
 			((user_addr+iov[seg].iov_len +PAGE_SIZE-1)/PAGE_SIZE
 				- user_addr/PAGE_SIZE);
 	}
 
-	for (seg = 0; seg < nr_segs; seg++) {
+	for (seg = 0; seg < args->nr_segs; seg++) {
 		user_addr = (unsigned long)iov[seg].iov_base;
 		dio->size += bytes = iov[seg].iov_len;
 
@@ -1076,7 +1076,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (ret2 == 0) {
-		ret = dio_complete(dio, offset, ret);
+		ret = dio_complete(dio, args->offset, ret);
 		kfree(dio);
 	} else
 		BUG_ON(ret != -EIOCBQUEUED);
@@ -1106,10 +1106,9 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
  * Additional i_alloc_sem locking requirements described inline below.
  */
 ssize_t
-__blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
-	struct block_device *bdev, const struct iovec *iov, loff_t offset, 
-	unsigned long nr_segs, get_block_t get_block, dio_iodone_t end_io,
-	int dio_lock_type)
+__blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
+	struct block_device *bdev, struct dio_args *args, get_block_t get_block,
+	dio_iodone_t end_io, int dio_lock_type)
 {
 	int seg;
 	size_t size;
@@ -1118,10 +1117,11 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 	unsigned bdev_blkbits = 0;
 	unsigned blocksize_mask = (1 << blkbits) - 1;
 	ssize_t retval = -EINVAL;
-	loff_t end = offset;
+	loff_t end = args->offset;
 	struct dio *dio;
 	int release_i_mutex = 0;
 	int acquire_i_mutex = 0;
+	int rw = args->rw;
 
 	if (rw & WRITE)
 		rw = WRITE_ODIRECT;
@@ -1129,18 +1129,18 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 	if (bdev)
 		bdev_blkbits = blksize_bits(bdev_logical_block_size(bdev));
 
-	if (offset & blocksize_mask) {
+	if (args->offset & blocksize_mask) {
 		if (bdev)
 			 blkbits = bdev_blkbits;
 		blocksize_mask = (1 << blkbits) - 1;
-		if (offset & blocksize_mask)
+		if (args->offset & blocksize_mask)
 			goto out;
 	}
 
 	/* Check the memory alignment.  Blocks cannot straddle pages */
-	for (seg = 0; seg < nr_segs; seg++) {
-		addr = (unsigned long)iov[seg].iov_base;
-		size = iov[seg].iov_len;
+	for (seg = 0; seg < args->nr_segs; seg++) {
+		addr = (unsigned long) args->iov[seg].iov_base;
+		size = args->iov[seg].iov_len;
 		end += size;
 		if ((addr & blocksize_mask) || (size & blocksize_mask))  {
 			if (bdev)
@@ -1168,7 +1168,7 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 	dio->lock_type = dio_lock_type;
 	if (dio_lock_type != DIO_NO_LOCKING) {
 		/* watch out for a 0 len io from a tricksy fs */
-		if (rw == READ && end > offset) {
+		if (rw == READ && end > args->offset) {
 			struct address_space *mapping;
 
 			mapping = iocb->ki_filp->f_mapping;
@@ -1177,8 +1177,8 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 				release_i_mutex = 1;
 			}
 
-			retval = filemap_write_and_wait_range(mapping, offset,
-							      end - 1);
+			retval = filemap_write_and_wait_range(mapping,
+							args->offset, end - 1);
 			if (retval) {
 				kfree(dio);
 				goto out;
@@ -1204,8 +1204,8 @@ __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 	dio->is_async = !is_sync_kiocb(iocb) && !((rw & WRITE) &&
 		(end > i_size_read(inode)));
 
-	retval = direct_io_worker(rw, iocb, inode, iov, offset,
-				nr_segs, blkbits, get_block, end_io, dio);
+	retval = direct_io_worker(iocb, inode, args, blkbits, get_block, end_io,
+					dio);
 
 	/*
 	 * In case of error extending write may have instantiated a few
@@ -1231,3 +1231,21 @@ out:
 	return retval;
 }
 EXPORT_SYMBOL(__blockdev_direct_IO);
+
+ssize_t generic_file_direct_IO(int rw, struct address_space *mapping,
+			       struct kiocb *iocb, const struct iovec *iov,
+			       loff_t offset, unsigned long nr_segs)
+{
+	struct dio_args args = {
+		.rw		= rw,
+		.iov		= iov,
+		.length		= iov_length(iov, nr_segs),
+		.offset		= offset,
+		.nr_segs	= nr_segs,
+	};
+
+	if (mapping->a_ops->direct_IO)
+		return mapping->a_ops->direct_IO(iocb, &args);
+
+	return -EINVAL;
+}
diff --git a/fs/ext2/inode.c b/fs/ext2/inode.c
index e271303..e813df7 100644
--- a/fs/ext2/inode.c
+++ b/fs/ext2/inode.c
@@ -790,15 +790,13 @@ static sector_t ext2_bmap(struct address_space *mapping, sector_t block)
 	return generic_block_bmap(mapping,block,ext2_get_block);
 }
 
-static ssize_t
-ext2_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
-			loff_t offset, unsigned long nr_segs)
+static ssize_t ext2_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
 
-	return blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				offset, nr_segs, ext2_get_block, NULL);
+	return blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
+					ext2_get_block, NULL);
 }
 
 static int
diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c
index b49908a..11dc0d1 100644
--- a/fs/ext3/inode.c
+++ b/fs/ext3/inode.c
@@ -1713,9 +1713,7 @@ static int ext3_releasepage(struct page *page, gfp_t wait)
  * crashes then stale disk data _may_ be exposed inside the file. But current
  * VFS code falls back into buffered path in that case so we are safe.
  */
-static ssize_t ext3_direct_IO(int rw, struct kiocb *iocb,
-			const struct iovec *iov, loff_t offset,
-			unsigned long nr_segs)
+static ssize_t ext3_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
@@ -1723,10 +1721,10 @@ static ssize_t ext3_direct_IO(int rw, struct kiocb *iocb,
 	handle_t *handle;
 	ssize_t ret;
 	int orphan = 0;
-	size_t count = iov_length(iov, nr_segs);
+	size_t count = args->length;
 
-	if (rw == WRITE) {
-		loff_t final_size = offset + count;
+	if (args->rw == WRITE) {
+		loff_t final_size = args->offset + count;
 
 		if (final_size > inode->i_size) {
 			/* Credits for sb + inode write */
@@ -1746,8 +1744,7 @@ static ssize_t ext3_direct_IO(int rw, struct kiocb *iocb,
 		}
 	}
 
-	ret = blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				 offset, nr_segs,
+	ret = blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
 				 ext3_get_block, NULL);
 
 	if (orphan) {
@@ -1765,7 +1762,7 @@ static ssize_t ext3_direct_IO(int rw, struct kiocb *iocb,
 		if (inode->i_nlink)
 			ext3_orphan_del(handle, inode);
 		if (ret > 0) {
-			loff_t end = offset + ret;
+			loff_t end = args->offset + ret;
 			if (end > inode->i_size) {
 				ei->i_disksize = end;
 				i_size_write(inode, end);
diff --git a/fs/ext4/inode.c b/fs/ext4/inode.c
index f9c642b..164fdb3 100644
--- a/fs/ext4/inode.c
+++ b/fs/ext4/inode.c
@@ -3267,9 +3267,7 @@ static int ext4_releasepage(struct page *page, gfp_t wait)
  * crashes then stale disk data _may_ be exposed inside the file. But current
  * VFS code falls back into buffered path in that case so we are safe.
  */
-static ssize_t ext4_direct_IO(int rw, struct kiocb *iocb,
-			      const struct iovec *iov, loff_t offset,
-			      unsigned long nr_segs)
+static ssize_t ext4_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
@@ -3277,10 +3275,10 @@ static ssize_t ext4_direct_IO(int rw, struct kiocb *iocb,
 	handle_t *handle;
 	ssize_t ret;
 	int orphan = 0;
-	size_t count = iov_length(iov, nr_segs);
+	size_t count = args->length;
 
-	if (rw == WRITE) {
-		loff_t final_size = offset + count;
+	if (args->rw == WRITE) {
+		loff_t final_size = args->offset + count;
 
 		if (final_size > inode->i_size) {
 			/* Credits for sb + inode write */
@@ -3300,8 +3298,7 @@ static ssize_t ext4_direct_IO(int rw, struct kiocb *iocb,
 		}
 	}
 
-	ret = blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				 offset, nr_segs,
+	ret = blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
 				 ext4_get_block, NULL);
 
 	if (orphan) {
@@ -3319,7 +3316,7 @@ static ssize_t ext4_direct_IO(int rw, struct kiocb *iocb,
 		if (inode->i_nlink)
 			ext4_orphan_del(handle, inode);
 		if (ret > 0) {
-			loff_t end = offset + ret;
+			loff_t end = args->offset + ret;
 			if (end > inode->i_size) {
 				ei->i_disksize = end;
 				i_size_write(inode, end);
diff --git a/fs/fat/inode.c b/fs/fat/inode.c
index 8970d8c..9d41851 100644
--- a/fs/fat/inode.c
+++ b/fs/fat/inode.c
@@ -167,14 +167,12 @@ static int fat_write_end(struct file *file, struct address_space *mapping,
 	return err;
 }
 
-static ssize_t fat_direct_IO(int rw, struct kiocb *iocb,
-			     const struct iovec *iov,
-			     loff_t offset, unsigned long nr_segs)
+static ssize_t fat_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
 
-	if (rw == WRITE) {
+	if (args->rw == WRITE) {
 		/*
 		 * FIXME: blockdev_direct_IO() doesn't use ->write_begin(),
 		 * so we need to update the ->mmu_private to block boundary.
@@ -184,7 +182,7 @@ static ssize_t fat_direct_IO(int rw, struct kiocb *iocb,
 		 *
 		 * Return 0, and fallback to normal buffered write.
 		 */
-		loff_t size = offset + iov_length(iov, nr_segs);
+		loff_t size = args->offset + args->length;
 		if (MSDOS_I(inode)->mmu_private < size)
 			return 0;
 	}
@@ -193,8 +191,8 @@ static ssize_t fat_direct_IO(int rw, struct kiocb *iocb,
 	 * FAT need to use the DIO_LOCKING for avoiding the race
 	 * condition of fat_get_block() and ->truncate().
 	 */
-	return blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				  offset, nr_segs, fat_get_block, NULL);
+	return blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
+				  fat_get_block, NULL);
 }
 
 static sector_t _fat_bmap(struct address_space *mapping, sector_t block)
diff --git a/fs/gfs2/aops.c b/fs/gfs2/aops.c
index 7ebae9a..a9422a2 100644
--- a/fs/gfs2/aops.c
+++ b/fs/gfs2/aops.c
@@ -1021,9 +1021,7 @@ static int gfs2_ok_for_dio(struct gfs2_inode *ip, int rw, loff_t offset)
 
 
 
-static ssize_t gfs2_direct_IO(int rw, struct kiocb *iocb,
-			      const struct iovec *iov, loff_t offset,
-			      unsigned long nr_segs)
+static ssize_t gfs2_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
@@ -1043,13 +1041,12 @@ static ssize_t gfs2_direct_IO(int rw, struct kiocb *iocb,
 	rv = gfs2_glock_nq(&gh);
 	if (rv)
 		return rv;
-	rv = gfs2_ok_for_dio(ip, rw, offset);
+	rv = gfs2_ok_for_dio(ip, args->rw, args->offset);
 	if (rv != 1)
 		goto out; /* dio not valid, fall back to buffered i/o */
 
-	rv = blockdev_direct_IO_no_locking(rw, iocb, inode, inode->i_sb->s_bdev,
-					   iov, offset, nr_segs,
-					   gfs2_get_block_direct, NULL);
+	rv = blockdev_direct_IO_no_locking(iocb, inode, inode->i_sb->s_bdev,
+					   args, gfs2_get_block_direct, NULL);
 out:
 	gfs2_glock_dq_m(1, &gh);
 	gfs2_holder_uninit(&gh);
diff --git a/fs/hfs/inode.c b/fs/hfs/inode.c
index a1cbff2..2998914 100644
--- a/fs/hfs/inode.c
+++ b/fs/hfs/inode.c
@@ -107,14 +107,13 @@ static int hfs_releasepage(struct page *page, gfp_t mask)
 	return res ? try_to_free_buffers(page) : 0;
 }
 
-static ssize_t hfs_direct_IO(int rw, struct kiocb *iocb,
-		const struct iovec *iov, loff_t offset, unsigned long nr_segs)
+static ssize_t hfs_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_path.dentry->d_inode->i_mapping->host;
 
-	return blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				  offset, nr_segs, hfs_get_block, NULL);
+	return blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
+				  hfs_get_block, NULL);
 }
 
 static int hfs_writepages(struct address_space *mapping,
diff --git a/fs/hfsplus/inode.c b/fs/hfsplus/inode.c
index 1bcf597..dd7102b 100644
--- a/fs/hfsplus/inode.c
+++ b/fs/hfsplus/inode.c
@@ -100,14 +100,14 @@ static int hfsplus_releasepage(struct page *page, gfp_t mask)
 	return res ? try_to_free_buffers(page) : 0;
 }
 
-static ssize_t hfsplus_direct_IO(int rw, struct kiocb *iocb,
-		const struct iovec *iov, loff_t offset, unsigned long nr_segs)
+static ssize_t hfsplus_direct_IO(struct kiocb *iocb,
+				 struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_path.dentry->d_inode->i_mapping->host;
 
-	return blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				  offset, nr_segs, hfsplus_get_block, NULL);
+	return blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
+				  hfsplus_get_block, NULL);
 }
 
 static int hfsplus_writepages(struct address_space *mapping,
diff --git a/fs/jfs/inode.c b/fs/jfs/inode.c
index b2ae190..e1420de 100644
--- a/fs/jfs/inode.c
+++ b/fs/jfs/inode.c
@@ -306,14 +306,13 @@ static sector_t jfs_bmap(struct address_space *mapping, sector_t block)
 	return generic_block_bmap(mapping, block, jfs_get_block);
 }
 
-static ssize_t jfs_direct_IO(int rw, struct kiocb *iocb,
-	const struct iovec *iov, loff_t offset, unsigned long nr_segs)
+static ssize_t jfs_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
 
-	return blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				offset, nr_segs, jfs_get_block, NULL);
+	return blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
+					jfs_get_block, NULL);
 }
 
 const struct address_space_operations jfs_aops = {
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index e4e089a..45d931b 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -103,21 +103,18 @@ static inline int put_dreq(struct nfs_direct_req *dreq)
 /**
  * nfs_direct_IO - NFS address space operation for direct I/O
  * @rw: direction (read or write)
- * @iocb: target I/O control block
- * @iov: array of vectors that define I/O buffer
- * @pos: offset in file to begin the operation
- * @nr_segs: size of iovec array
+ * @args: IO arguments
  *
  * The presence of this routine in the address space ops vector means
  * the NFS client supports direct I/O.  However, we shunt off direct
  * read and write requests before the VFS gets them, so this method
  * should never be called.
  */
-ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
+ssize_t nfs_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
 			iocb->ki_filp->f_path.dentry->d_name.name,
-			(long long) pos, nr_segs);
+			(long long) args->offset, args->nr_segs);
 
 	return -EINVAL;
 }
diff --git a/fs/nilfs2/inode.c b/fs/nilfs2/inode.c
index fe9d8f2..840c307 100644
--- a/fs/nilfs2/inode.c
+++ b/fs/nilfs2/inode.c
@@ -222,19 +222,18 @@ static int nilfs_write_end(struct file *file, struct address_space *mapping,
 }
 
 static ssize_t
-nilfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
-		loff_t offset, unsigned long nr_segs)
+nilfs_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
 	ssize_t size;
 
-	if (rw == WRITE)
+	if (args->rw == WRITE)
 		return 0;
 
 	/* Needs synchronization with the cleaner */
-	size = blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				  offset, nr_segs, nilfs_get_block, NULL);
+	size = blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
+				  nilfs_get_block, NULL);
 	return size;
 }
 
diff --git a/fs/ocfs2/aops.c b/fs/ocfs2/aops.c
index b401654..56e61ba 100644
--- a/fs/ocfs2/aops.c
+++ b/fs/ocfs2/aops.c
@@ -668,11 +668,7 @@ static int ocfs2_releasepage(struct page *page, gfp_t wait)
 	return jbd2_journal_try_to_free_buffers(journal, page, wait);
 }
 
-static ssize_t ocfs2_direct_IO(int rw,
-			       struct kiocb *iocb,
-			       const struct iovec *iov,
-			       loff_t offset,
-			       unsigned long nr_segs)
+static ssize_t ocfs2_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_path.dentry->d_inode->i_mapping->host;
@@ -687,9 +683,8 @@ static ssize_t ocfs2_direct_IO(int rw,
 	if (OCFS2_I(inode)->ip_dyn_features & OCFS2_INLINE_DATA_FL)
 		return 0;
 
-	ret = blockdev_direct_IO_no_locking(rw, iocb, inode,
-					    inode->i_sb->s_bdev, iov, offset,
-					    nr_segs, 
+	ret = blockdev_direct_IO_no_locking(iocb, inode,
+					    inode->i_sb->s_bdev, args,
 					    ocfs2_direct_IO_get_blocks,
 					    ocfs2_dio_end_io);
 
diff --git a/fs/reiserfs/inode.c b/fs/reiserfs/inode.c
index a14d6cd..201e6ca 100644
--- a/fs/reiserfs/inode.c
+++ b/fs/reiserfs/inode.c
@@ -3025,15 +3025,12 @@ static int reiserfs_releasepage(struct page *page, gfp_t unused_gfp_flags)
 
 /* We thank Mingming Cao for helping us understand in great detail what
    to do in this section of the code. */
-static ssize_t reiserfs_direct_IO(int rw, struct kiocb *iocb,
-				  const struct iovec *iov, loff_t offset,
-				  unsigned long nr_segs)
+static ssize_t reiserfs_direct_IO(struct kiocb *iocb, struct dio_args *args)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_mapping->host;
 
-	return blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov,
-				  offset, nr_segs,
+	return blockdev_direct_IO(iocb, inode, inode->i_sb->s_bdev, args,
 				  reiserfs_get_blocks_direct_io, NULL);
 }
 
diff --git a/fs/xfs/linux-2.6/xfs_aops.c b/fs/xfs/linux-2.6/xfs_aops.c
index aecf251..0faf1fe 100644
--- a/fs/xfs/linux-2.6/xfs_aops.c
+++ b/fs/xfs/linux-2.6/xfs_aops.c
@@ -1532,11 +1532,8 @@ xfs_end_io_direct(
 
 STATIC ssize_t
 xfs_vm_direct_IO(
-	int			rw,
 	struct kiocb		*iocb,
-	const struct iovec	*iov,
-	loff_t			offset,
-	unsigned long		nr_segs)
+	struct dio_args		*args)
 {
 	struct file	*file = iocb->ki_filp;
 	struct inode	*inode = file->f_mapping->host;
@@ -1545,18 +1542,14 @@ xfs_vm_direct_IO(
 
 	bdev = xfs_find_bdev_for_inode(XFS_I(inode));
 
-	if (rw == WRITE) {
+	if (args->rw == WRITE) {
 		iocb->private = xfs_alloc_ioend(inode, IOMAP_UNWRITTEN);
-		ret = blockdev_direct_IO_own_locking(rw, iocb, inode,
-			bdev, iov, offset, nr_segs,
-			xfs_get_blocks_direct,
-			xfs_end_io_direct);
+		ret = blockdev_direct_IO_own_locking(iocb, inode, bdev, args,
+				xfs_get_blocks_direct, xfs_end_io_direct);
 	} else {
 		iocb->private = xfs_alloc_ioend(inode, IOMAP_READ);
-		ret = blockdev_direct_IO_no_locking(rw, iocb, inode,
-			bdev, iov, offset, nr_segs,
-			xfs_get_blocks_direct,
-			xfs_end_io_direct);
+		ret = blockdev_direct_IO_no_locking(iocb, inode,
+			bdev, args, xfs_get_blocks_direct, xfs_end_io_direct);
 	}
 
 	if (unlikely(ret != -EIOCBQUEUED && iocb->private))
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 67888a9..5971116 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -560,6 +560,7 @@ typedef struct {
 typedef int (*read_actor_t)(read_descriptor_t *, struct page *,
 		unsigned long, unsigned long);
 
+struct dio_args;
 struct address_space_operations {
 	int (*writepage)(struct page *page, struct writeback_control *wbc);
 	int (*readpage)(struct file *, struct page *);
@@ -585,8 +586,7 @@ struct address_space_operations {
 	sector_t (*bmap)(struct address_space *, sector_t);
 	void (*invalidatepage) (struct page *, unsigned long);
 	int (*releasepage) (struct page *, gfp_t);
-	ssize_t (*direct_IO)(int, struct kiocb *, const struct iovec *iov,
-			loff_t offset, unsigned long nr_segs);
+	ssize_t (*direct_IO)(struct kiocb *, struct dio_args *);
 	int (*get_xip_mem)(struct address_space *, pgoff_t, int,
 						void **, unsigned long *);
 	/* migrate the contents of a page to the specified target */
@@ -2241,10 +2241,24 @@ static inline int xip_truncate_page(struct address_space *mapping, loff_t from)
 #endif
 
 #ifdef CONFIG_BLOCK
-ssize_t __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
-	struct block_device *bdev, const struct iovec *iov, loff_t offset,
-	unsigned long nr_segs, get_block_t get_block, dio_iodone_t end_io,
-	int lock_type);
+
+/*
+ * Arguments passed to aops->direct_IO()
+ */
+struct dio_args {
+	int rw;
+	const struct iovec *iov;
+	unsigned long length;
+	loff_t offset;
+	unsigned long nr_segs;
+};
+
+ssize_t __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
+	struct block_device *bdev, struct dio_args *args, get_block_t get_block,
+	dio_iodone_t end_io, int lock_type);
+
+ssize_t generic_file_direct_IO(int, struct address_space *, struct kiocb *,
+				const struct iovec *, loff_t, unsigned long);
 
 enum {
 	DIO_LOCKING = 1, /* need locking between buffered and direct access */
@@ -2252,31 +2266,28 @@ enum {
 	DIO_OWN_LOCKING, /* filesystem locks buffered and direct internally */
 };
 
-static inline ssize_t blockdev_direct_IO(int rw, struct kiocb *iocb,
-	struct inode *inode, struct block_device *bdev, const struct iovec *iov,
-	loff_t offset, unsigned long nr_segs, get_block_t get_block,
-	dio_iodone_t end_io)
+static inline ssize_t blockdev_direct_IO(struct kiocb *iocb,
+	struct inode *inode, struct block_device *bdev, struct dio_args *args,
+	get_block_t get_block, dio_iodone_t end_io)
 {
-	return __blockdev_direct_IO(rw, iocb, inode, bdev, iov, offset,
-				nr_segs, get_block, end_io, DIO_LOCKING);
+	return __blockdev_direct_IO(iocb, inode, bdev, args,
+					get_block, end_io, DIO_LOCKING);
 }
 
-static inline ssize_t blockdev_direct_IO_no_locking(int rw, struct kiocb *iocb,
-	struct inode *inode, struct block_device *bdev, const struct iovec *iov,
-	loff_t offset, unsigned long nr_segs, get_block_t get_block,
-	dio_iodone_t end_io)
+static inline ssize_t blockdev_direct_IO_no_locking(struct kiocb *iocb,
+	struct inode *inode, struct block_device *bdev, struct dio_args *args,
+	get_block_t get_block, dio_iodone_t end_io)
 {
-	return __blockdev_direct_IO(rw, iocb, inode, bdev, iov, offset,
-				nr_segs, get_block, end_io, DIO_NO_LOCKING);
+	return __blockdev_direct_IO(iocb, inode, bdev, args,
+					get_block, end_io, DIO_NO_LOCKING);
 }
 
-static inline ssize_t blockdev_direct_IO_own_locking(int rw, struct kiocb *iocb,
-	struct inode *inode, struct block_device *bdev, const struct iovec *iov,
-	loff_t offset, unsigned long nr_segs, get_block_t get_block,
-	dio_iodone_t end_io)
+static inline ssize_t blockdev_direct_IO_own_locking(struct kiocb *iocb,
+	struct inode *inode, struct block_device *bdev, struct dio_args *args,
+	get_block_t get_block, dio_iodone_t end_io)
 {
-	return __blockdev_direct_IO(rw, iocb, inode, bdev, iov, offset,
-				nr_segs, get_block, end_io, DIO_OWN_LOCKING);
+	return __blockdev_direct_IO(iocb, inode, bdev, args,
+					get_block, end_io, DIO_OWN_LOCKING);
 }
 #endif
 
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index f6b9024..97a2383 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -408,8 +408,7 @@ extern int nfs3_removexattr (struct dentry *, const char *name);
 /*
  * linux/fs/nfs/direct.c
  */
-extern ssize_t nfs_direct_IO(int, struct kiocb *, const struct iovec *, loff_t,
-			unsigned long);
+extern ssize_t nfs_direct_IO(struct kiocb *, struct dio_args *);
 extern ssize_t nfs_file_direct_read(struct kiocb *iocb,
 			const struct iovec *iov, unsigned long nr_segs,
 			loff_t pos);
diff --git a/mm/filemap.c b/mm/filemap.c
index ccea3b6..cf85298 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1345,8 +1345,9 @@ generic_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
 			retval = filemap_write_and_wait_range(mapping, pos,
 					pos + iov_length(iov, nr_segs) - 1);
 			if (!retval) {
-				retval = mapping->a_ops->direct_IO(READ, iocb,
-							iov, pos, nr_segs);
+				retval = generic_file_direct_IO(READ, mapping,
+								iocb, iov,
+								pos, nr_segs);
 			}
 			if (retval > 0)
 				*ppos = pos + retval;
@@ -2144,7 +2145,8 @@ generic_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
 		}
 	}
 
-	written = mapping->a_ops->direct_IO(WRITE, iocb, iov, pos, *nr_segs);
+	written = generic_file_direct_IO(WRITE, mapping, iocb, iov, pos,
+						*nr_segs);
 
 	/*
 	 * Finally, try again to invalidate clean pages which might have been
-- 
1.6.4.53.g3f55e


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 1/6] workqueue: replace singlethread/freezable/rt parameters and variables with flags
  2009-08-20 10:17 ` [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure Jens Axboe
@ 2009-08-20 10:17   ` Jens Axboe
  2009-08-20 10:17     ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Jens Axboe
  2009-08-21  0:03   ` [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure Andrew Morton
  1 sibling, 1 reply; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Collapse the three ints into a flags variable, in preparation for
adding another flag.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 include/linux/workqueue.h |   32 ++++++++++++++++++--------------
 kernel/workqueue.c        |   22 ++++++++--------------
 2 files changed, 26 insertions(+), 28 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 13e1adf..f14e20e 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -165,12 +165,17 @@ struct execute_work {
 
 
 extern struct workqueue_struct *
-__create_workqueue_key(const char *name, int singlethread,
-		       int freezeable, int rt, struct lock_class_key *key,
-		       const char *lock_name);
+__create_workqueue_key(const char *name, unsigned int flags,
+		       struct lock_class_key *key, const char *lock_name);
+
+enum {
+	WQ_F_SINGLETHREAD	= 1,
+	WQ_F_FREEZABLE		= 2,
+	WQ_F_RT			= 4,
+};
 
 #ifdef CONFIG_LOCKDEP
-#define __create_workqueue(name, singlethread, freezeable, rt)	\
+#define __create_workqueue(name, flags)				\
 ({								\
 	static struct lock_class_key __key;			\
 	const char *__lock_name;				\
@@ -180,20 +185,19 @@ __create_workqueue_key(const char *name, int singlethread,
 	else							\
 		__lock_name = #name;				\
 								\
-	__create_workqueue_key((name), (singlethread),		\
-			       (freezeable), (rt), &__key,	\
-			       __lock_name);			\
+	__create_workqueue_key((name), (flags), &__key, __lockname);	\
 })
 #else
-#define __create_workqueue(name, singlethread, freezeable, rt)	\
-	__create_workqueue_key((name), (singlethread), (freezeable), (rt), \
-			       NULL, NULL)
+#define __create_workqueue(name, flags)				\
+	__create_workqueue_key((name), (flags), NULL, NULL)	
 #endif
 
-#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
-#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
-#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
-#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
+#define create_workqueue(name)		__create_workqueue((name), 0)
+#define create_rt_workqueue(name)	__create_workqueue((name), WQ_F_RT)
+#define create_freezeable_workqueue(name)	\
+	__create_workqueue((name), WQ_F_SINGLETHREAD | WQ_F_FREEZABLE)
+#define create_singlethread_workqueue(name)	\
+	__create_workqueue((name), WQ_F_SINGLETHREAD)
 
 extern void destroy_workqueue(struct workqueue_struct *wq);
 
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 0668795..02ba7c9 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -60,9 +60,7 @@ struct workqueue_struct {
 	struct cpu_workqueue_struct *cpu_wq;
 	struct list_head list;
 	const char *name;
-	int singlethread;
-	int freezeable;		/* Freeze threads during suspend */
-	int rt;
+	unsigned int flags;	/* WQ_F_* flags */
 #ifdef CONFIG_LOCKDEP
 	struct lockdep_map lockdep_map;
 #endif
@@ -84,9 +82,9 @@ static const struct cpumask *cpu_singlethread_map __read_mostly;
 static cpumask_var_t cpu_populated_map __read_mostly;
 
 /* If it's single threaded, it isn't in the list of workqueues. */
-static inline int is_wq_single_threaded(struct workqueue_struct *wq)
+static inline bool is_wq_single_threaded(struct workqueue_struct *wq)
 {
-	return wq->singlethread;
+	return wq->flags & WQ_F_SINGLETHREAD;
 }
 
 static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
@@ -314,7 +312,7 @@ static int worker_thread(void *__cwq)
 	struct cpu_workqueue_struct *cwq = __cwq;
 	DEFINE_WAIT(wait);
 
-	if (cwq->wq->freezeable)
+	if (cwq->wq->flags & WQ_F_FREEZABLE)
 		set_freezable();
 
 	set_user_nice(current, -5);
@@ -768,7 +766,7 @@ static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
 	 */
 	if (IS_ERR(p))
 		return PTR_ERR(p);
-	if (cwq->wq->rt)
+	if (cwq->wq->flags & WQ_F_RT)
 		sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
 	cwq->thread = p;
 
@@ -789,9 +787,7 @@ static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
 }
 
 struct workqueue_struct *__create_workqueue_key(const char *name,
-						int singlethread,
-						int freezeable,
-						int rt,
+						unsigned int flags,
 						struct lock_class_key *key,
 						const char *lock_name)
 {
@@ -811,12 +807,10 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 
 	wq->name = name;
 	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
-	wq->singlethread = singlethread;
-	wq->freezeable = freezeable;
-	wq->rt = rt;
+	wq->flags = flags;
 	INIT_LIST_HEAD(&wq->list);
 
-	if (singlethread) {
+	if (flags & WQ_F_SINGLETHREAD) {
 		cwq = init_cpu_workqueue(wq, singlethread_cpu);
 		err = create_workqueue_thread(cwq, singlethread_cpu);
 		start_workqueue_thread(cwq, -1);
-- 
1.6.4.173.g3f189


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 2/4] direct-io: make O_DIRECT IO path be page based
  2009-08-20 10:17   ` [PATCH 1/6] workqueue: replace singlethread/freezable/rt parameters and variables with flags Jens Axboe
@ 2009-08-20 10:17     ` Jens Axboe
  2009-08-20 10:17       ` [PATCH 2/6] workqueue: add support for lazy workqueues Jens Axboe
                         ` (2 more replies)
  0 siblings, 3 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Currently we pass in the iovec array and let the O_DIRECT core
handle the get_user_pages() business. This works, but it means that
we can only ever use user pages for O_DIRECT.

Switch the aops->direct_IO() and below code to use page arrays
instead, so that it doesn't make any assumptions about who the pages
belong to. This works directly for all users but NFS, which just
uses the same helper that the generic mapping read/write functions
also call.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 fs/direct-io.c         |  304 ++++++++++++++++++++----------------------------
 fs/nfs/direct.c        |  161 +++++++++----------------
 fs/nfs/file.c          |    8 +-
 include/linux/fs.h     |   15 ++-
 include/linux/nfs_fs.h |    7 +-
 mm/filemap.c           |    6 +-
 6 files changed, 206 insertions(+), 295 deletions(-)

diff --git a/fs/direct-io.c b/fs/direct-io.c
index 181848c..22a945b 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -38,12 +38,6 @@
 #include <asm/atomic.h>
 
 /*
- * How many user pages to map in one call to get_user_pages().  This determines
- * the size of a structure on the stack.
- */
-#define DIO_PAGES	64
-
-/*
  * This code generally works in units of "dio_blocks".  A dio_block is
  * somewhere between the hard sector size and the filesystem block size.  it
  * is determined on a per-invocation basis.   When talking to the filesystem
@@ -105,20 +99,13 @@ struct dio {
 	sector_t cur_page_block;	/* Where it starts */
 
 	/*
-	 * Page fetching state. These variables belong to dio_refill_pages().
-	 */
-	int curr_page;			/* changes */
-	int total_pages;		/* doesn't change */
-	unsigned long curr_user_address;/* changes */
-
-	/*
 	 * Page queue.  These variables belong to dio_refill_pages() and
 	 * dio_get_page().
 	 */
-	struct page *pages[DIO_PAGES];	/* page buffer */
-	unsigned head;			/* next page to process */
-	unsigned tail;			/* last valid page + 1 */
-	int page_errors;		/* errno from get_user_pages() */
+	struct page **pages;		/* page buffer */
+	unsigned int head_page;		/* next page to process */
+	unsigned int total_pages;	/* last valid page + 1 */
+	unsigned int first_page_off;	/* offset into first page in map */
 
 	/* BIO completion state */
 	spinlock_t bio_lock;		/* protects BIO fields below */
@@ -134,57 +121,6 @@ struct dio {
 };
 
 /*
- * How many pages are in the queue?
- */
-static inline unsigned dio_pages_present(struct dio *dio)
-{
-	return dio->tail - dio->head;
-}
-
-/*
- * Go grab and pin some userspace pages.   Typically we'll get 64 at a time.
- */
-static int dio_refill_pages(struct dio *dio)
-{
-	int ret;
-	int nr_pages;
-
-	nr_pages = min(dio->total_pages - dio->curr_page, DIO_PAGES);
-	ret = get_user_pages_fast(
-		dio->curr_user_address,		/* Where from? */
-		nr_pages,			/* How many pages? */
-		dio->rw == READ,		/* Write to memory? */
-		&dio->pages[0]);		/* Put results here */
-
-	if (ret < 0 && dio->blocks_available && (dio->rw & WRITE)) {
-		struct page *page = ZERO_PAGE(0);
-		/*
-		 * A memory fault, but the filesystem has some outstanding
-		 * mapped blocks.  We need to use those blocks up to avoid
-		 * leaking stale data in the file.
-		 */
-		if (dio->page_errors == 0)
-			dio->page_errors = ret;
-		page_cache_get(page);
-		dio->pages[0] = page;
-		dio->head = 0;
-		dio->tail = 1;
-		ret = 0;
-		goto out;
-	}
-
-	if (ret >= 0) {
-		dio->curr_user_address += ret * PAGE_SIZE;
-		dio->curr_page += ret;
-		dio->head = 0;
-		dio->tail = ret;
-		ret = 0;
-	}
-out:
-	return ret;	
-}
-
-/*
  * Get another userspace page.  Returns an ERR_PTR on error.  Pages are
  * buffered inside the dio so that we can call get_user_pages() against a
  * decent number of pages, less frequently.  To provide nicer use of the
@@ -192,15 +128,10 @@ out:
  */
 static struct page *dio_get_page(struct dio *dio)
 {
-	if (dio_pages_present(dio) == 0) {
-		int ret;
+	if (dio->head_page < dio->total_pages)
+		return dio->pages[dio->head_page++];
 
-		ret = dio_refill_pages(dio);
-		if (ret)
-			return ERR_PTR(ret);
-		BUG_ON(dio_pages_present(dio) == 0);
-	}
-	return dio->pages[dio->head++];
+	return NULL;
 }
 
 /**
@@ -245,8 +176,6 @@ static int dio_complete(struct dio *dio, loff_t offset, int ret)
 		up_read_non_owner(&dio->inode->i_alloc_sem);
 
 	if (ret == 0)
-		ret = dio->page_errors;
-	if (ret == 0)
 		ret = dio->io_error;
 	if (ret == 0)
 		ret = transferred;
@@ -351,8 +280,10 @@ static void dio_bio_submit(struct dio *dio)
  */
 static void dio_cleanup(struct dio *dio)
 {
-	while (dio_pages_present(dio))
-		page_cache_release(dio_get_page(dio));
+	struct page *page;
+
+	while ((page = dio_get_page(dio)) != NULL)
+		page_cache_release(page);
 }
 
 /*
@@ -490,7 +421,6 @@ static int dio_bio_reap(struct dio *dio)
  */
 static int get_more_blocks(struct dio *dio)
 {
-	int ret;
 	struct buffer_head *map_bh = &dio->map_bh;
 	sector_t fs_startblk;	/* Into file, in filesystem-sized blocks */
 	unsigned long fs_count;	/* Number of filesystem-sized blocks */
@@ -502,38 +432,33 @@ static int get_more_blocks(struct dio *dio)
 	 * If there was a memory error and we've overwritten all the
 	 * mapped blocks then we can now return that memory error
 	 */
-	ret = dio->page_errors;
-	if (ret == 0) {
-		BUG_ON(dio->block_in_file >= dio->final_block_in_request);
-		fs_startblk = dio->block_in_file >> dio->blkfactor;
-		dio_count = dio->final_block_in_request - dio->block_in_file;
-		fs_count = dio_count >> dio->blkfactor;
-		blkmask = (1 << dio->blkfactor) - 1;
-		if (dio_count & blkmask)	
-			fs_count++;
-
-		map_bh->b_state = 0;
-		map_bh->b_size = fs_count << dio->inode->i_blkbits;
-
-		create = dio->rw & WRITE;
-		if (dio->lock_type == DIO_LOCKING) {
-			if (dio->block_in_file < (i_size_read(dio->inode) >>
-							dio->blkbits))
-				create = 0;
-		} else if (dio->lock_type == DIO_NO_LOCKING) {
+	BUG_ON(dio->block_in_file >= dio->final_block_in_request);
+	fs_startblk = dio->block_in_file >> dio->blkfactor;
+	dio_count = dio->final_block_in_request - dio->block_in_file;
+	fs_count = dio_count >> dio->blkfactor;
+	blkmask = (1 << dio->blkfactor) - 1;
+	if (dio_count & blkmask)
+		fs_count++;
+
+	map_bh->b_state = 0;
+	map_bh->b_size = fs_count << dio->inode->i_blkbits;
+
+	create = dio->rw & WRITE;
+	if (dio->lock_type == DIO_LOCKING) {
+		if (dio->block_in_file < (i_size_read(dio->inode) >>
+						dio->blkbits))
 			create = 0;
-		}
-
-		/*
-		 * For writes inside i_size we forbid block creations: only
-		 * overwrites are permitted.  We fall back to buffered writes
-		 * at a higher level for inside-i_size block-instantiating
-		 * writes.
-		 */
-		ret = (*dio->get_block)(dio->inode, fs_startblk,
-						map_bh, create);
+	} else if (dio->lock_type == DIO_NO_LOCKING) {
+		create = 0;
 	}
-	return ret;
+
+	/*
+	 * For writes inside i_size we forbid block creations: only
+	 * overwrites are permitted.  We fall back to buffered writes
+	 * at a higher level for inside-i_size block-instantiating
+	 * writes.
+	 */
+	return dio->get_block(dio->inode, fs_startblk, map_bh, create);
 }
 
 /*
@@ -567,8 +492,8 @@ static int dio_bio_add_page(struct dio *dio)
 {
 	int ret;
 
-	ret = bio_add_page(dio->bio, dio->cur_page,
-			dio->cur_page_len, dio->cur_page_offset);
+	ret = bio_add_page(dio->bio, dio->cur_page, dio->cur_page_len,
+				dio->cur_page_offset);
 	if (ret == dio->cur_page_len) {
 		/*
 		 * Decrement count only, if we are done with this page
@@ -804,6 +729,9 @@ static int do_direct_IO(struct dio *dio)
 			unsigned this_chunk_blocks;	/* # of blocks */
 			unsigned u;
 
+			offset_in_page += dio->first_page_off;
+			dio->first_page_off = 0;
+
 			if (dio->blocks_available == 0) {
 				/*
 				 * Need to go and map some more disk
@@ -933,13 +861,10 @@ direct_io_worker(struct kiocb *iocb, struct inode *inode,
 	struct dio_args *args, unsigned blkbits, get_block_t get_block,
 	dio_iodone_t end_io, struct dio *dio)
 {
-	const struct iovec *iov = args->iov;
-	unsigned long user_addr;
 	unsigned long flags;
-	int seg, rw = args->rw;
+	int rw = args->rw;
 	ssize_t ret = 0;
 	ssize_t ret2;
-	size_t bytes;
 
 	dio->inode = inode;
 	dio->rw = rw;
@@ -965,46 +890,25 @@ direct_io_worker(struct kiocb *iocb, struct inode *inode,
 	if (unlikely(dio->blkfactor))
 		dio->pages_in_io = 2;
 
-	for (seg = 0; seg < args->nr_segs; seg++) {
-		user_addr = (unsigned long) iov[seg].iov_base;
-		dio->pages_in_io +=
-			((user_addr+iov[seg].iov_len +PAGE_SIZE-1)/PAGE_SIZE
-				- user_addr/PAGE_SIZE);
-	}
+	dio->pages_in_io += args->nr_segs;
+	dio->size = args->length;
+	if (args->user_addr) {
+		dio->first_page_off = args->user_addr & ~PAGE_MASK;
+		dio->first_block_in_page = dio->first_page_off >> blkbits;
+		if (dio->first_block_in_page)
+			dio->first_page_off -= 1 << blkbits;
+	} else
+		dio->first_page_off = args->first_page_off;
 
-	for (seg = 0; seg < args->nr_segs; seg++) {
-		user_addr = (unsigned long)iov[seg].iov_base;
-		dio->size += bytes = iov[seg].iov_len;
-
-		/* Index into the first page of the first block */
-		dio->first_block_in_page = (user_addr & ~PAGE_MASK) >> blkbits;
-		dio->final_block_in_request = dio->block_in_file +
-						(bytes >> blkbits);
-		/* Page fetching state */
-		dio->head = 0;
-		dio->tail = 0;
-		dio->curr_page = 0;
-
-		dio->total_pages = 0;
-		if (user_addr & (PAGE_SIZE-1)) {
-			dio->total_pages++;
-			bytes -= PAGE_SIZE - (user_addr & (PAGE_SIZE - 1));
-		}
-		dio->total_pages += (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
-		dio->curr_user_address = user_addr;
-	
-		ret = do_direct_IO(dio);
+	dio->final_block_in_request = dio->block_in_file + (dio->size >> blkbits);
+	dio->head_page = 0;
+	dio->total_pages = args->nr_segs;
 
-		dio->result += iov[seg].iov_len -
+	ret = do_direct_IO(dio);
+
+	dio->result += args->length -
 			((dio->final_block_in_request - dio->block_in_file) <<
 					blkbits);
-
-		if (ret) {
-			dio_cleanup(dio);
-			break;
-		}
-	} /* end iovec loop */
-
 	if (ret == -ENOTBLK && (rw & WRITE)) {
 		/*
 		 * The remaining part of the request will be
@@ -1110,9 +1014,6 @@ __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	struct block_device *bdev, struct dio_args *args, get_block_t get_block,
 	dio_iodone_t end_io, int dio_lock_type)
 {
-	int seg;
-	size_t size;
-	unsigned long addr;
 	unsigned blkbits = inode->i_blkbits;
 	unsigned bdev_blkbits = 0;
 	unsigned blocksize_mask = (1 << blkbits) - 1;
@@ -1138,17 +1039,14 @@ __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	}
 
 	/* Check the memory alignment.  Blocks cannot straddle pages */
-	for (seg = 0; seg < args->nr_segs; seg++) {
-		addr = (unsigned long) args->iov[seg].iov_base;
-		size = args->iov[seg].iov_len;
-		end += size;
-		if ((addr & blocksize_mask) || (size & blocksize_mask))  {
-			if (bdev)
-				 blkbits = bdev_blkbits;
-			blocksize_mask = (1 << blkbits) - 1;
-			if ((addr & blocksize_mask) || (size & blocksize_mask))  
-				goto out;
-		}
+	if ((args->user_addr & blocksize_mask) ||
+	    (args->length & blocksize_mask))  {
+		if (bdev)
+			 blkbits = bdev_blkbits;
+		blocksize_mask = (1 << blkbits) - 1;
+		if ((args->user_addr & blocksize_mask) ||
+		    (args->length & blocksize_mask))
+			goto out;
 	}
 
 	dio = kzalloc(sizeof(*dio), GFP_KERNEL);
@@ -1156,6 +1054,8 @@ __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	if (!dio)
 		goto out;
 
+	dio->pages = args->pages;
+
 	/*
 	 * For block device access DIO_NO_LOCKING is used,
 	 *	neither readers nor writers do any locking at all
@@ -1232,20 +1132,70 @@ out:
 }
 EXPORT_SYMBOL(__blockdev_direct_IO);
 
-ssize_t generic_file_direct_IO(int rw, struct address_space *mapping,
-			       struct kiocb *iocb, const struct iovec *iov,
-			       loff_t offset, unsigned long nr_segs)
+static ssize_t __generic_file_direct_IO(int rw, struct address_space *mapping,
+					struct kiocb *iocb,
+					const struct iovec *iov, loff_t offset,
+					dio_io_actor *actor)
 {
+	struct page *stack_pages[UIO_FASTIOV];
+	unsigned long nr_pages, start, end;
 	struct dio_args args = {
-		.rw		= rw,
-		.iov		= iov,
-		.length		= iov_length(iov, nr_segs),
+		.pages		= stack_pages,
+		.length		= iov->iov_len,
+		.user_addr	= (unsigned long) iov->iov_base,
 		.offset		= offset,
-		.nr_segs	= nr_segs,
 	};
+	ssize_t ret;
+
+	end = (args.user_addr + iov->iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	start = args.user_addr >> PAGE_SHIFT;
+	nr_pages = end - start;
+
+	if (nr_pages >= UIO_FASTIOV) {
+		args.pages = kzalloc(nr_pages * sizeof(struct page *),
+					GFP_KERNEL);
+		if (!args.pages)
+			return -ENOMEM;
+	}
+
+	ret = get_user_pages_fast(args.user_addr, nr_pages, rw == READ,
+					args.pages);
+	if (ret > 0) {
+		args.nr_segs = ret;
+		ret = actor(iocb, &args);
+	}
 
-	if (mapping->a_ops->direct_IO)
-		return mapping->a_ops->direct_IO(iocb, &args);
+	if (args.pages != stack_pages)
+		kfree(args.pages);
 
-	return -EINVAL;
+	return ret;
+}
+
+/*
+ * Transform the iov into a page based structure for passing into the lower
+ * parts of O_DIRECT handling
+ */
+ssize_t generic_file_direct_IO(int rw, struct address_space *mapping,
+			       struct kiocb *kiocb, const struct iovec *iov,
+			       loff_t offset, unsigned long nr_segs,
+			       dio_io_actor *actor)
+{
+	ssize_t ret = 0, ret2;
+	unsigned long i;
+
+	for (i = 0; i < nr_segs; i++) {
+		ret2 = __generic_file_direct_IO(rw, mapping, kiocb, iov, offset,
+							actor);
+		if (ret2 < 0) {
+			if (!ret)
+				ret = ret2;
+			break;
+		}
+		iov++;
+		offset += ret2;
+		ret += ret2;
+	}
+
+	return ret;
 }
+EXPORT_SYMBOL_GPL(generic_file_direct_IO);
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 45d931b..d9da548 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -271,13 +271,12 @@ static const struct rpc_call_ops nfs_read_direct_ops = {
  * no requests have been sent, just return an error.
  */
 static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
-						const struct iovec *iov,
-						loff_t pos)
+						struct dio_args *args)
 {
 	struct nfs_open_context *ctx = dreq->ctx;
 	struct inode *inode = ctx->path.dentry->d_inode;
-	unsigned long user_addr = (unsigned long)iov->iov_base;
-	size_t count = iov->iov_len;
+	unsigned long user_addr = args->user_addr;
+	size_t count = args->length;
 	size_t rsize = NFS_SERVER(inode)->rsize;
 	struct rpc_task *task;
 	struct rpc_message msg = {
@@ -306,24 +305,8 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
 		if (unlikely(!data))
 			break;
 
-		down_read(&current->mm->mmap_sem);
-		result = get_user_pages(current, current->mm, user_addr,
-					data->npages, 1, 0, data->pagevec, NULL);
-		up_read(&current->mm->mmap_sem);
-		if (result < 0) {
-			nfs_readdata_free(data);
-			break;
-		}
-		if ((unsigned)result < data->npages) {
-			bytes = result * PAGE_SIZE;
-			if (bytes <= pgbase) {
-				nfs_direct_release_pages(data->pagevec, result);
-				nfs_readdata_free(data);
-				break;
-			}
-			bytes -= pgbase;
-			data->npages = result;
-		}
+		data->pagevec = args->pages;
+		data->npages = args->nr_segs;
 
 		get_dreq(dreq);
 
@@ -332,7 +315,7 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
 		data->cred = msg.rpc_cred;
 		data->args.fh = NFS_FH(inode);
 		data->args.context = ctx;
-		data->args.offset = pos;
+		data->args.offset = args->offset;
 		data->args.pgbase = pgbase;
 		data->args.pages = data->pagevec;
 		data->args.count = bytes;
@@ -361,7 +344,7 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
 
 		started += bytes;
 		user_addr += bytes;
-		pos += bytes;
+		args->offset += bytes;
 		/* FIXME: Remove this unnecessary math from final patch */
 		pgbase += bytes;
 		pgbase &= ~PAGE_MASK;
@@ -376,26 +359,19 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
 }
 
 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
-					      const struct iovec *iov,
-					      unsigned long nr_segs,
-					      loff_t pos)
+					      struct dio_args *args)
 {
 	ssize_t result = -EINVAL;
 	size_t requested_bytes = 0;
-	unsigned long seg;
 
 	get_dreq(dreq);
 
-	for (seg = 0; seg < nr_segs; seg++) {
-		const struct iovec *vec = &iov[seg];
-		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
-		if (result < 0)
-			break;
-		requested_bytes += result;
-		if ((size_t)result < vec->iov_len)
-			break;
-		pos += vec->iov_len;
-	}
+	result = nfs_direct_read_schedule_segment(dreq, args);
+	if (result < 0)
+		goto out;
+
+	requested_bytes += result;
+	args->offset += result;
 
 	if (put_dreq(dreq))
 		nfs_direct_complete(dreq);
@@ -403,13 +379,13 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
 	if (requested_bytes != 0)
 		return 0;
 
+out:
 	if (result < 0)
 		return result;
 	return -EIO;
 }
 
-static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
-			       unsigned long nr_segs, loff_t pos)
+static ssize_t nfs_direct_read(struct kiocb *iocb, struct dio_args *args)
 {
 	ssize_t result = 0;
 	struct inode *inode = iocb->ki_filp->f_mapping->host;
@@ -424,7 +400,7 @@ static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
 	if (!is_sync_kiocb(iocb))
 		dreq->iocb = iocb;
 
-	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
+	result = nfs_direct_read_schedule_iovec(dreq, args);
 	if (!result)
 		result = nfs_direct_wait(dreq);
 	nfs_direct_req_release(dreq);
@@ -691,13 +667,13 @@ static const struct rpc_call_ops nfs_write_direct_ops = {
  * no requests have been sent, just return an error.
  */
 static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
-						 const struct iovec *iov,
-						 loff_t pos, int sync)
+						 struct dio_args *args,
+						 int sync)
 {
 	struct nfs_open_context *ctx = dreq->ctx;
 	struct inode *inode = ctx->path.dentry->d_inode;
-	unsigned long user_addr = (unsigned long)iov->iov_base;
-	size_t count = iov->iov_len;
+	unsigned long user_addr = args->user_addr;
+	size_t count = args->length;
 	struct rpc_task *task;
 	struct rpc_message msg = {
 		.rpc_cred = ctx->cred,
@@ -726,24 +702,8 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
 		if (unlikely(!data))
 			break;
 
-		down_read(&current->mm->mmap_sem);
-		result = get_user_pages(current, current->mm, user_addr,
-					data->npages, 0, 0, data->pagevec, NULL);
-		up_read(&current->mm->mmap_sem);
-		if (result < 0) {
-			nfs_writedata_free(data);
-			break;
-		}
-		if ((unsigned)result < data->npages) {
-			bytes = result * PAGE_SIZE;
-			if (bytes <= pgbase) {
-				nfs_direct_release_pages(data->pagevec, result);
-				nfs_writedata_free(data);
-				break;
-			}
-			bytes -= pgbase;
-			data->npages = result;
-		}
+		data->pagevec = args->pages;
+		data->npages = args->nr_segs;
 
 		get_dreq(dreq);
 
@@ -754,7 +714,7 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
 		data->cred = msg.rpc_cred;
 		data->args.fh = NFS_FH(inode);
 		data->args.context = ctx;
-		data->args.offset = pos;
+		data->args.offset = args->offset;
 		data->args.pgbase = pgbase;
 		data->args.pages = data->pagevec;
 		data->args.count = bytes;
@@ -784,7 +744,7 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
 
 		started += bytes;
 		user_addr += bytes;
-		pos += bytes;
+		args->offset += bytes;
 
 		/* FIXME: Remove this useless math from the final patch */
 		pgbase += bytes;
@@ -800,27 +760,19 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
 }
 
 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
-					       const struct iovec *iov,
-					       unsigned long nr_segs,
-					       loff_t pos, int sync)
+					       struct dio_args *args, int sync)
 {
 	ssize_t result = 0;
 	size_t requested_bytes = 0;
-	unsigned long seg;
 
 	get_dreq(dreq);
 
-	for (seg = 0; seg < nr_segs; seg++) {
-		const struct iovec *vec = &iov[seg];
-		result = nfs_direct_write_schedule_segment(dreq, vec,
-							   pos, sync);
-		if (result < 0)
-			break;
-		requested_bytes += result;
-		if ((size_t)result < vec->iov_len)
-			break;
-		pos += vec->iov_len;
-	}
+	result = nfs_direct_write_schedule_segment(dreq, args, sync);
+	if (result < 0)
+		goto out;
+
+	requested_bytes += result;
+	args->offset += result;
 
 	if (put_dreq(dreq))
 		nfs_direct_write_complete(dreq, dreq->inode);
@@ -828,14 +780,13 @@ static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
 	if (requested_bytes != 0)
 		return 0;
 
+out:
 	if (result < 0)
 		return result;
 	return -EIO;
 }
 
-static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
-				unsigned long nr_segs, loff_t pos,
-				size_t count)
+static ssize_t nfs_direct_write(struct kiocb *iocb, struct dio_args *args)
 {
 	ssize_t result = 0;
 	struct inode *inode = iocb->ki_filp->f_mapping->host;
@@ -848,7 +799,7 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
 		return -ENOMEM;
 	nfs_alloc_commit_data(dreq);
 
-	if (dreq->commit_data == NULL || count < wsize)
+	if (dreq->commit_data == NULL || args->length < wsize)
 		sync = NFS_FILE_SYNC;
 
 	dreq->inode = inode;
@@ -856,7 +807,7 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
 	if (!is_sync_kiocb(iocb))
 		dreq->iocb = iocb;
 
-	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
+	result = nfs_direct_write_schedule_iovec(dreq, args, sync);
 	if (!result)
 		result = nfs_direct_wait(dreq);
 	nfs_direct_req_release(dreq);
@@ -867,9 +818,7 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
 /**
  * nfs_file_direct_read - file direct read operation for NFS files
  * @iocb: target I/O control block
- * @iov: vector of user buffers into which to read data
- * @nr_segs: size of iov vector
- * @pos: byte offset in file where reading starts
+ * @args: direct IO arguments
  *
  * We use this function for direct reads instead of calling
  * generic_file_aio_read() in order to avoid gfar's check to see if
@@ -885,21 +834,20 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
  * client must read the updated atime from the server back into its
  * cache.
  */
-ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
-				unsigned long nr_segs, loff_t pos)
+static ssize_t nfs_file_direct_read(struct kiocb *iocb, struct dio_args *args)
 {
 	ssize_t retval = -EINVAL;
 	struct file *file = iocb->ki_filp;
 	struct address_space *mapping = file->f_mapping;
 	size_t count;
 
-	count = iov_length(iov, nr_segs);
+	count = args->length;
 	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
 
 	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
 		file->f_path.dentry->d_parent->d_name.name,
 		file->f_path.dentry->d_name.name,
-		count, (long long) pos);
+		count, (long long) args->offset);
 
 	retval = 0;
 	if (!count)
@@ -909,9 +857,9 @@ ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
 	if (retval)
 		goto out;
 
-	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
+	retval = nfs_direct_read(iocb, args);
 	if (retval > 0)
-		iocb->ki_pos = pos + retval;
+		iocb->ki_pos = args->offset + retval;
 
 out:
 	return retval;
@@ -920,9 +868,7 @@ out:
 /**
  * nfs_file_direct_write - file direct write operation for NFS files
  * @iocb: target I/O control block
- * @iov: vector of user buffers from which to write data
- * @nr_segs: size of iov vector
- * @pos: byte offset in file where writing starts
+ * @args: direct IO arguments
  *
  * We use this function for direct writes instead of calling
  * generic_file_aio_write() in order to avoid taking the inode
@@ -942,23 +888,22 @@ out:
  * Note that O_APPEND is not supported for NFS direct writes, as there
  * is no atomic O_APPEND write facility in the NFS protocol.
  */
-ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
-				unsigned long nr_segs, loff_t pos)
+static ssize_t nfs_file_direct_write(struct kiocb *iocb, struct dio_args *args)
 {
 	ssize_t retval = -EINVAL;
 	struct file *file = iocb->ki_filp;
 	struct address_space *mapping = file->f_mapping;
 	size_t count;
 
-	count = iov_length(iov, nr_segs);
+	count = args->length;
 	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
 
 	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
 		file->f_path.dentry->d_parent->d_name.name,
 		file->f_path.dentry->d_name.name,
-		count, (long long) pos);
+		count, (long long) args->offset);
 
-	retval = generic_write_checks(file, &pos, &count, 0);
+	retval = generic_write_checks(file, &args->offset, &count, 0);
 	if (retval)
 		goto out;
 
@@ -973,15 +918,23 @@ ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
 	if (retval)
 		goto out;
 
-	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);
+	retval = nfs_direct_write(iocb, args);
 
 	if (retval > 0)
-		iocb->ki_pos = pos + retval;
+		iocb->ki_pos = args->offset + retval;
 
 out:
 	return retval;
 }
 
+ssize_t nfs_file_direct_io(struct kiocb *kiocb, struct dio_args *args)
+{
+	if (args->rw == READ)
+		return nfs_file_direct_read(kiocb, args);
+
+	return nfs_file_direct_write(kiocb, args);
+}
+
 /**
  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
  *
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 0506232..97d8cc7 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -249,13 +249,15 @@ static ssize_t
 nfs_file_read(struct kiocb *iocb, const struct iovec *iov,
 		unsigned long nr_segs, loff_t pos)
 {
+	struct address_space *mapping = iocb->ki_filp->f_mapping;
 	struct dentry * dentry = iocb->ki_filp->f_path.dentry;
 	struct inode * inode = dentry->d_inode;
 	ssize_t result;
 	size_t count = iov_length(iov, nr_segs);
 
 	if (iocb->ki_filp->f_flags & O_DIRECT)
-		return nfs_file_direct_read(iocb, iov, nr_segs, pos);
+		return generic_file_direct_IO(READ, mapping, iocb, iov, pos,
+						nr_segs, nfs_file_direct_io);
 
 	dprintk("NFS: read(%s/%s, %lu@%lu)\n",
 		dentry->d_parent->d_name.name, dentry->d_name.name,
@@ -546,13 +548,15 @@ static int nfs_need_sync_write(struct file *filp, struct inode *inode)
 static ssize_t nfs_file_write(struct kiocb *iocb, const struct iovec *iov,
 				unsigned long nr_segs, loff_t pos)
 {
+	struct address_space *mapping = iocb->ki_filp->f_mapping;
 	struct dentry * dentry = iocb->ki_filp->f_path.dentry;
 	struct inode * inode = dentry->d_inode;
 	ssize_t result;
 	size_t count = iov_length(iov, nr_segs);
 
 	if (iocb->ki_filp->f_flags & O_DIRECT)
-		return nfs_file_direct_write(iocb, iov, nr_segs, pos);
+		return generic_file_direct_IO(WRITE, mapping, iocb, iov, pos,
+						nr_segs, nfs_file_direct_io);
 
 	dprintk("NFS: write(%s/%s, %lu@%Ld)\n",
 		dentry->d_parent->d_name.name, dentry->d_name.name,
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 5971116..539994a 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -2247,18 +2247,27 @@ static inline int xip_truncate_page(struct address_space *mapping, loff_t from)
  */
 struct dio_args {
 	int rw;
-	const struct iovec *iov;
+	struct page **pages;
+	unsigned int first_page_off;
+	unsigned long nr_segs;
 	unsigned long length;
 	loff_t offset;
-	unsigned long nr_segs;
+
+	/*
+	 * Original user pointer, we'll get rid of this
+	 */
+	unsigned long user_addr;
 };
 
 ssize_t __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	struct block_device *bdev, struct dio_args *args, get_block_t get_block,
 	dio_iodone_t end_io, int lock_type);
 
+typedef ssize_t (dio_io_actor)(struct kiocb *, struct dio_args *);
+
 ssize_t generic_file_direct_IO(int, struct address_space *, struct kiocb *,
-				const struct iovec *, loff_t, unsigned long);
+				const struct iovec *, loff_t, unsigned long,
+				dio_io_actor);
 
 enum {
 	DIO_LOCKING = 1, /* need locking between buffered and direct access */
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index 97a2383..ded8337 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -409,12 +409,7 @@ extern int nfs3_removexattr (struct dentry *, const char *name);
  * linux/fs/nfs/direct.c
  */
 extern ssize_t nfs_direct_IO(struct kiocb *, struct dio_args *);
-extern ssize_t nfs_file_direct_read(struct kiocb *iocb,
-			const struct iovec *iov, unsigned long nr_segs,
-			loff_t pos);
-extern ssize_t nfs_file_direct_write(struct kiocb *iocb,
-			const struct iovec *iov, unsigned long nr_segs,
-			loff_t pos);
+extern ssize_t nfs_file_direct_io(struct kiocb *, struct dio_args *);
 
 /*
  * linux/fs/nfs/dir.c
diff --git a/mm/filemap.c b/mm/filemap.c
index cf85298..3e03021 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1346,8 +1346,8 @@ generic_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
 					pos + iov_length(iov, nr_segs) - 1);
 			if (!retval) {
 				retval = generic_file_direct_IO(READ, mapping,
-								iocb, iov,
-								pos, nr_segs);
+						iocb, iov, pos, nr_segs,
+						mapping->a_ops->direct_IO);
 			}
 			if (retval > 0)
 				*ppos = pos + retval;
@@ -2146,7 +2146,7 @@ generic_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
 	}
 
 	written = generic_file_direct_IO(WRITE, mapping, iocb, iov, pos,
-						*nr_segs);
+					*nr_segs, mapping->a_ops->direct_IO);
 
 	/*
 	 * Finally, try again to invalidate clean pages which might have been
-- 
1.6.4.53.g3f55e


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 2/6] workqueue: add support for lazy workqueues
  2009-08-20 10:17     ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Jens Axboe
@ 2009-08-20 10:17       ` Jens Axboe
  2009-08-20 10:17         ` [PATCH 3/6] crypto: use " Jens Axboe
  2009-08-21  0:20         ` [PATCH 2/6] workqueue: add support for " Andrew Morton
  2009-08-20 13:19       ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Trond Myklebust
  2009-08-21  0:12       ` Andrew Morton
  2 siblings, 2 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Lazy workqueues are like normal workqueues, except they don't
start a thread per CPU by default. Instead threads are started
when they are needed, and exit when they have been idle for
some time.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 include/linux/workqueue.h |    5 ++
 kernel/workqueue.c        |  152 ++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 147 insertions(+), 10 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index f14e20e..b2dd267 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -32,6 +32,7 @@ struct work_struct {
 #ifdef CONFIG_LOCKDEP
 	struct lockdep_map lockdep_map;
 #endif
+	unsigned int cpu;
 };
 
 #define WORK_DATA_INIT()	ATOMIC_LONG_INIT(0)
@@ -172,6 +173,7 @@ enum {
 	WQ_F_SINGLETHREAD	= 1,
 	WQ_F_FREEZABLE		= 2,
 	WQ_F_RT			= 4,
+	WQ_F_LAZY		= 8,
 };
 
 #ifdef CONFIG_LOCKDEP
@@ -198,6 +200,7 @@ enum {
 	__create_workqueue((name), WQ_F_SINGLETHREAD | WQ_F_FREEZABLE)
 #define create_singlethread_workqueue(name)	\
 	__create_workqueue((name), WQ_F_SINGLETHREAD)
+#define create_lazy_workqueue(name)	__create_workqueue((name), WQ_F_LAZY)
 
 extern void destroy_workqueue(struct workqueue_struct *wq);
 
@@ -211,6 +214,8 @@ extern int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 
 extern void flush_workqueue(struct workqueue_struct *wq);
 extern void flush_scheduled_work(void);
+extern void workqueue_set_lazy_timeout(struct workqueue_struct *wq,
+			unsigned long timeout);
 
 extern int schedule_work(struct work_struct *work);
 extern int schedule_work_on(int cpu, struct work_struct *work);
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 02ba7c9..d9ccebc 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -61,11 +61,17 @@ struct workqueue_struct {
 	struct list_head list;
 	const char *name;
 	unsigned int flags;	/* WQ_F_* flags */
+	unsigned long lazy_timeout;
+	unsigned int core_cpu;
 #ifdef CONFIG_LOCKDEP
 	struct lockdep_map lockdep_map;
 #endif
 };
 
+/* Default lazy workqueue timeout */
+#define WQ_DEF_LAZY_TIMEOUT	(60 * HZ)
+
+
 /* Serializes the accesses to the list of workqueues. */
 static DEFINE_SPINLOCK(workqueue_lock);
 static LIST_HEAD(workqueues);
@@ -81,6 +87,8 @@ static const struct cpumask *cpu_singlethread_map __read_mostly;
  */
 static cpumask_var_t cpu_populated_map __read_mostly;
 
+static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu);
+
 /* If it's single threaded, it isn't in the list of workqueues. */
 static inline bool is_wq_single_threaded(struct workqueue_struct *wq)
 {
@@ -141,11 +149,29 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
 static void __queue_work(struct cpu_workqueue_struct *cwq,
 			 struct work_struct *work)
 {
+	struct workqueue_struct *wq = cwq->wq;
 	unsigned long flags;
 
-	spin_lock_irqsave(&cwq->lock, flags);
-	insert_work(cwq, work, &cwq->worklist);
-	spin_unlock_irqrestore(&cwq->lock, flags);
+	/*
+	 * This is a lazy workqueue and this particular CPU thread has
+	 * exited. We can't create it from here, so add this work on our
+	 * static thread. It will create this thread and move the work there.
+	 */
+	if ((wq->flags & WQ_F_LAZY) && !cwq->thread) {
+		struct cpu_workqueue_struct *__cwq;
+
+		local_irq_save(flags);
+		__cwq = wq_per_cpu(wq, wq->core_cpu);
+		work->cpu = smp_processor_id();
+		spin_lock(&__cwq->lock);
+		insert_work(__cwq, work, &__cwq->worklist);
+		spin_unlock_irqrestore(&__cwq->lock, flags);
+	} else {
+		spin_lock_irqsave(&cwq->lock, flags);
+		work->cpu = smp_processor_id();
+		insert_work(cwq, work, &cwq->worklist);
+		spin_unlock_irqrestore(&cwq->lock, flags);
+	}
 }
 
 /**
@@ -259,13 +285,16 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 }
 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
 
-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+static int run_workqueue(struct cpu_workqueue_struct *cwq)
 {
+	int did_work = 0;
+
 	spin_lock_irq(&cwq->lock);
 	while (!list_empty(&cwq->worklist)) {
 		struct work_struct *work = list_entry(cwq->worklist.next,
 						struct work_struct, entry);
 		work_func_t f = work->func;
+		int cpu;
 #ifdef CONFIG_LOCKDEP
 		/*
 		 * It is permissible to free the struct work_struct
@@ -280,7 +309,34 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 		trace_workqueue_execution(cwq->thread, work);
 		cwq->current_work = work;
 		list_del_init(cwq->worklist.next);
+		cpu = smp_processor_id();
 		spin_unlock_irq(&cwq->lock);
+		did_work = 1;
+
+		/*
+		 * If work->cpu isn't us, then we need to create the target
+		 * workqueue thread (if someone didn't already do that) and
+		 * move the work over there.
+		 */
+		if ((cwq->wq->flags & WQ_F_LAZY) && work->cpu != cpu) {
+			struct cpu_workqueue_struct *__cwq;
+			struct task_struct *p;
+			int err;
+
+			__cwq = wq_per_cpu(cwq->wq, work->cpu);
+			p = __cwq->thread;
+			if (!p)
+				err = create_workqueue_thread(__cwq, work->cpu);
+			p = __cwq->thread;
+			if (p) {
+				if (work->cpu >= 0)
+					kthread_bind(p, work->cpu);
+				insert_work(__cwq, work, &__cwq->worklist);
+				wake_up_process(p);
+				goto out;
+			}
+		}
+
 
 		BUG_ON(get_wq_data(work) != cwq);
 		work_clear_pending(work);
@@ -305,24 +361,45 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 		cwq->current_work = NULL;
 	}
 	spin_unlock_irq(&cwq->lock);
+out:
+	return did_work;
 }
 
 static int worker_thread(void *__cwq)
 {
 	struct cpu_workqueue_struct *cwq = __cwq;
+	struct workqueue_struct *wq = cwq->wq;
+	unsigned long last_active = jiffies;
 	DEFINE_WAIT(wait);
+	int may_exit;
 
-	if (cwq->wq->flags & WQ_F_FREEZABLE)
+	if (wq->flags & WQ_F_FREEZABLE)
 		set_freezable();
 
 	set_user_nice(current, -5);
 
+	/*
+	 * Allow exit if this isn't our core thread
+	 */
+	if ((wq->flags & WQ_F_LAZY) && smp_processor_id() != wq->core_cpu)
+		may_exit = 1;
+	else
+		may_exit = 0;
+
 	for (;;) {
+		int did_work;
+
 		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
 		if (!freezing(current) &&
 		    !kthread_should_stop() &&
-		    list_empty(&cwq->worklist))
-			schedule();
+		    list_empty(&cwq->worklist)) {
+			unsigned long timeout = wq->lazy_timeout;
+
+			if (timeout && may_exit)
+				schedule_timeout(timeout);
+			else
+				schedule();
+		}
 		finish_wait(&cwq->more_work, &wait);
 
 		try_to_freeze();
@@ -330,7 +407,19 @@ static int worker_thread(void *__cwq)
 		if (kthread_should_stop())
 			break;
 
-		run_workqueue(cwq);
+		did_work = run_workqueue(cwq);
+
+		/*
+		 * If we did no work for the defined timeout period and we are
+		 * allowed to exit, do so.
+		 */
+		if (did_work)
+			last_active = jiffies;
+		else if (time_after(jiffies, last_active + wq->lazy_timeout) &&
+			 may_exit) {
+			cwq->thread = NULL;
+			break;
+		}
 	}
 
 	return 0;
@@ -814,7 +903,10 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 		cwq = init_cpu_workqueue(wq, singlethread_cpu);
 		err = create_workqueue_thread(cwq, singlethread_cpu);
 		start_workqueue_thread(cwq, -1);
+		wq->core_cpu = singlethread_cpu;
 	} else {
+		int created = 0;
+
 		cpu_maps_update_begin();
 		/*
 		 * We must place this wq on list even if the code below fails.
@@ -833,10 +925,16 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 		 */
 		for_each_possible_cpu(cpu) {
 			cwq = init_cpu_workqueue(wq, cpu);
-			if (err || !cpu_online(cpu))
+			if (err || !cpu_online(cpu) ||
+			    (created && (wq->flags & WQ_F_LAZY)))
 				continue;
 			err = create_workqueue_thread(cwq, cpu);
 			start_workqueue_thread(cwq, cpu);
+			if (!err) {
+				if (!created)
+					wq->core_cpu = cpu;
+				created++;
+			}
 		}
 		cpu_maps_update_done();
 	}
@@ -844,7 +942,9 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 	if (err) {
 		destroy_workqueue(wq);
 		wq = NULL;
-	}
+	} else if (wq->flags & WQ_F_LAZY)
+		workqueue_set_lazy_timeout(wq, WQ_DEF_LAZY_TIMEOUT);
+
 	return wq;
 }
 EXPORT_SYMBOL_GPL(__create_workqueue_key);
@@ -877,6 +977,13 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
 	cwq->thread = NULL;
 }
 
+static bool hotplug_should_start_thread(struct workqueue_struct *wq, int cpu)
+{
+	if ((wq->flags & WQ_F_LAZY) && cpu != wq->core_cpu)
+		return 0;
+	return 1;
+}
+
 /**
  * destroy_workqueue - safely terminate a workqueue
  * @wq: target workqueue
@@ -923,6 +1030,8 @@ undo:
 
 		switch (action) {
 		case CPU_UP_PREPARE:
+			if (!hotplug_should_start_thread(wq, cpu))
+				break;
 			if (!create_workqueue_thread(cwq, cpu))
 				break;
 			printk(KERN_ERR "workqueue [%s] for %i failed\n",
@@ -932,6 +1041,8 @@ undo:
 			goto undo;
 
 		case CPU_ONLINE:
+			if (!hotplug_should_start_thread(wq, cpu))
+				break;
 			start_workqueue_thread(cwq, cpu);
 			break;
 
@@ -999,6 +1110,27 @@ long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
 EXPORT_SYMBOL_GPL(work_on_cpu);
 #endif /* CONFIG_SMP */
 
+/**
+ * workqueue_set_lazy_timeout - set lazy exit timeout
+ * @wq: the associated workqueue_struct
+ * @timeout: timeout in jiffies
+ *
+ * This will set the timeout for a lazy workqueue. If no work has been
+ * processed for @timeout jiffies, then the workqueue is allowed to exit.
+ * It will be dynamically created again when work is queued to it.
+ *
+ * Note that this only works for workqueues created with
+ * create_lazy_workqueue().
+ */
+void workqueue_set_lazy_timeout(struct workqueue_struct *wq,
+				unsigned long timeout)
+{
+	if (WARN_ON(!(wq->flags & WQ_F_LAZY)))
+		return;
+
+	wq->lazy_timeout = timeout;
+}
+
 void __init init_workqueues(void)
 {
 	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
-- 
1.6.4.173.g3f189


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 3/6] crypto: use lazy workqueues
  2009-08-20 10:17       ` [PATCH 2/6] workqueue: add support for lazy workqueues Jens Axboe
@ 2009-08-20 10:17         ` Jens Axboe
  2009-08-20 10:17           ` [PATCH 3/4] direct-io: add a "IO for kernel" flag to kiocb Jens Axboe
  2009-08-21  0:20         ` [PATCH 2/6] workqueue: add support for " Andrew Morton
  1 sibling, 1 reply; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 crypto/crypto_wq.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/crypto/crypto_wq.c b/crypto/crypto_wq.c
index fdcf624..88cccee 100644
--- a/crypto/crypto_wq.c
+++ b/crypto/crypto_wq.c
@@ -20,7 +20,7 @@ EXPORT_SYMBOL_GPL(kcrypto_wq);
 
 static int __init crypto_wq_init(void)
 {
-	kcrypto_wq = create_workqueue("crypto");
+	kcrypto_wq = create_lazy_workqueue("crypto");
 	if (unlikely(!kcrypto_wq))
 		return -ENOMEM;
 	return 0;
-- 
1.6.4.173.g3f189


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 3/4] direct-io: add a "IO for kernel" flag to kiocb
  2009-08-20 10:17         ` [PATCH 3/6] crypto: use " Jens Axboe
@ 2009-08-20 10:17           ` Jens Axboe
  2009-08-20 10:17             ` [PATCH 4/6] libata: use lazy workqueues for the pio task Jens Axboe
  0 siblings, 1 reply; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 fs/direct-io.c      |   12 +++++++++++-
 include/linux/aio.h |    3 +++
 2 files changed, 14 insertions(+), 1 deletions(-)

diff --git a/fs/direct-io.c b/fs/direct-io.c
index 22a945b..0e923f2 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -117,6 +117,7 @@ struct dio {
 	struct kiocb *iocb;		/* kiocb */
 	int is_async;			/* is IO async ? */
 	int io_error;			/* IO error in completion path */
+	int is_kernel;
 	ssize_t result;                 /* IO result */
 };
 
@@ -336,7 +337,7 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
 
 	if (dio->is_async && dio->rw == READ) {
 		bio_check_pages_dirty(bio);	/* transfers ownership */
-	} else {
+	} else if (!dio->is_kernel) {
 		for (page_no = 0; page_no < bio->bi_vcnt; page_no++) {
 			struct page *page = bvec[page_no].bv_page;
 
@@ -345,6 +346,14 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
 			page_cache_release(page);
 		}
 		bio_put(bio);
+	} else {
+		for (page_no = 0; page_no < bio->bi_vcnt; page_no++) {
+			struct page *page = bvec[page_no].bv_page;
+
+			if (!dio->io_error)
+				SetPageUptodate(page);
+		}
+		bio_put(bio);
 	}
 	return uptodate ? 0 : -EIO;
 }
@@ -1103,6 +1112,7 @@ __blockdev_direct_IO(struct kiocb *iocb, struct inode *inode,
 	 */
 	dio->is_async = !is_sync_kiocb(iocb) && !((rw & WRITE) &&
 		(end > i_size_read(inode)));
+	dio->is_kernel = kiocbIsKernel(iocb);
 
 	retval = direct_io_worker(iocb, inode, args, blkbits, get_block, end_io,
 					dio);
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 47f7d93..ce3f34b 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -34,6 +34,7 @@ struct kioctx;
 /* #define KIF_LOCKED		0 */
 #define KIF_KICKED		1
 #define KIF_CANCELLED		2
+#define KIF_KERNEL_PAGES	3
 
 #define kiocbTryLock(iocb)	test_and_set_bit(KIF_LOCKED, &(iocb)->ki_flags)
 #define kiocbTryKick(iocb)	test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags)
@@ -41,6 +42,7 @@ struct kioctx;
 #define kiocbSetLocked(iocb)	set_bit(KIF_LOCKED, &(iocb)->ki_flags)
 #define kiocbSetKicked(iocb)	set_bit(KIF_KICKED, &(iocb)->ki_flags)
 #define kiocbSetCancelled(iocb)	set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+#define kiocbSetKernel(iocb)	set_bit(KIF_KERNEL_PAGES, &(iocb)->ki_flags)
 
 #define kiocbClearLocked(iocb)	clear_bit(KIF_LOCKED, &(iocb)->ki_flags)
 #define kiocbClearKicked(iocb)	clear_bit(KIF_KICKED, &(iocb)->ki_flags)
@@ -49,6 +51,7 @@ struct kioctx;
 #define kiocbIsLocked(iocb)	test_bit(KIF_LOCKED, &(iocb)->ki_flags)
 #define kiocbIsKicked(iocb)	test_bit(KIF_KICKED, &(iocb)->ki_flags)
 #define kiocbIsCancelled(iocb)	test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+#define kiocbIsKernel(iocb)	test_bit(KIF_KERNEL_PAGES, &(iocb)->ki_flags)
 
 /* is there a better place to document function pointer methods? */
 /**
-- 
1.6.4.53.g3f55e


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 4/6] libata: use lazy workqueues for the pio task
  2009-08-20 10:17           ` [PATCH 3/4] direct-io: add a "IO for kernel" flag to kiocb Jens Axboe
@ 2009-08-20 10:17             ` Jens Axboe
  2009-08-20 10:17               ` [PATCH 4/5] loop: support O_DIRECT transfer mode Jens Axboe
  0 siblings, 1 reply; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 drivers/ata/libata-core.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/drivers/ata/libata-core.c b/drivers/ata/libata-core.c
index 072ba5e..35f74c9 100644
--- a/drivers/ata/libata-core.c
+++ b/drivers/ata/libata-core.c
@@ -6580,7 +6580,7 @@ static int __init ata_init(void)
 {
 	ata_parse_force_param();
 
-	ata_wq = create_workqueue("ata");
+	ata_wq = create_lazy_workqueue("ata");
 	if (!ata_wq)
 		goto free_force_tbl;
 
-- 
1.6.4.173.g3f189


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 4/5] loop: support O_DIRECT transfer mode
  2009-08-20 10:17             ` [PATCH 4/6] libata: use lazy workqueues for the pio task Jens Axboe
@ 2009-08-20 10:17               ` Jens Axboe
  2009-08-20 10:17                 ` [PATCH 5/6] aio: use lazy workqueues Jens Axboe
  0 siblings, 1 reply; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Make use of the (now) page array backed O_DIRECT with a special
transfer mode for loop.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 drivers/block/loop.c |  139 +++++++++++++++++++++++++++++++++++++++++++++++++-
 include/linux/loop.h |    1 +
 2 files changed, 139 insertions(+), 1 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 5757188..cf8cf20 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -82,6 +82,9 @@ static DEFINE_MUTEX(loop_devices_mutex);
 
 static int max_part;
 static int part_shift;
+static int odirect = 1;
+
+#define LOOP_MAX_DIRECT_SEGMENTS	32
 
 /*
  * Transfer functions
@@ -201,6 +204,124 @@ lo_do_transfer(struct loop_device *lo, int cmd,
 	return lo->transfer(lo, cmd, rpage, roffs, lpage, loffs, size, rblock);
 }
 
+static int direct_fill_pages(struct bio *bio, struct page **pages)
+{
+	unsigned int i, pidx = 0;
+	struct bio_vec *bvec;
+
+	bio_for_each_segment(bvec, bio, i)
+		pages[pidx++] = bvec->bv_page;
+
+	return pidx;
+}
+
+/*
+ * Use the O_DIRECT IO path for this read
+ */
+static int lo_direct_read(struct loop_device *lo, struct bio *bio, loff_t pos)
+{
+	struct file *file = lo->lo_backing_file;
+	struct address_space *mapping = file->f_mapping;
+	struct inode *inode = mapping->host;
+	struct page *pages[LOOP_MAX_DIRECT_SEGMENTS];
+	struct kiocb iocb;
+	loff_t size;
+	int ret = 0;
+	struct dio_args args = {
+		.rw		= READ,
+		.pages		= pages,
+		.length		= bio->bi_size,
+		.user_addr	= 0,
+		.offset		= pos,
+	};
+
+	init_sync_kiocb(&iocb, file);
+	iocb.ki_pos = pos;
+	iocb.ki_left = bio->bi_size;
+	kiocbSetKernel(&iocb);
+
+	args.nr_segs = direct_fill_pages(bio, pages);
+	args.first_page_off = bio->bi_io_vec[0].bv_offset;
+
+	size = i_size_read(inode);
+	if (pos >= size)
+		return 0;
+
+	ret = filemap_write_and_wait_range(mapping, pos, pos + bio->bi_size-1);
+	if (ret)
+		return ret;
+
+	ret = mapping->a_ops->direct_IO(&iocb, &args);
+	if (ret > 0) {
+		file_accessed(file);
+		ret = 0;
+	}
+
+	return ret;
+}
+
+/*
+ * Use the O_DIRECT IO path for this write
+ */
+static int lo_direct_write(struct loop_device *lo, struct bio *bio, loff_t pos)
+{
+	struct file *file = lo->lo_backing_file;
+	struct address_space *mapping = file->f_mapping;
+	struct inode *inode = mapping->host;
+	struct page *pages[LOOP_MAX_DIRECT_SEGMENTS];
+	struct kiocb iocb;
+	int ret = 0;
+	pgoff_t end;
+	struct dio_args args = {
+		.rw		= WRITE,
+		.pages		= pages,
+		.length		= bio->bi_size,
+		.user_addr	= 0,
+		.offset		= pos,
+	};
+
+	ret = filemap_write_and_wait_range(mapping, pos, pos + bio->bi_size-1);
+	if (ret)
+		goto out;
+
+	end = (pos + bio->bi_size - 1) >> PAGE_CACHE_SHIFT;
+	if (mapping->nrpages) {
+		ret = invalidate_inode_pages2_range(mapping,
+						pos >> PAGE_CACHE_SHIFT, end);
+		if (ret) {
+			if (ret == -EBUSY)
+				ret = 0;
+			goto out;
+		}
+	}
+
+	init_sync_kiocb(&iocb, file);
+	iocb.ki_pos = pos;
+	iocb.ki_left = bio->bi_size;
+	kiocbSetKernel(&iocb);
+
+	args.nr_segs = direct_fill_pages(bio, pages);
+	args.first_page_off = bio->bi_io_vec[0].bv_offset;
+
+	ret = mapping->a_ops->direct_IO(&iocb, &args);
+
+	if (mapping->nrpages) {
+		invalidate_inode_pages2_range(mapping,
+					pos >> PAGE_CACHE_SHIFT, end);
+	}
+
+	if (ret > 0) {
+		loff_t end = pos + ret;
+		if (end > i_size_read(inode) && !S_ISBLK(inode->i_mode)) {
+			i_size_write(inode, end);
+			mark_inode_dirty(inode);
+		}
+		ret = 0;
+	}
+out:
+	return ret;
+}
+
 /**
  * do_lo_send_aops - helper for writing data to a loop device
  *
@@ -347,6 +468,9 @@ static int lo_send(struct loop_device *lo, struct bio *bio, loff_t pos)
 	struct page *page = NULL;
 	int i, ret = 0;
 
+	if (lo->lo_flags & LO_FLAGS_ODIRECT)
+		return lo_direct_write(lo, bio, pos);
+
 	do_lo_send = do_lo_send_aops;
 	if (!(lo->lo_flags & LO_FLAGS_USE_AOPS)) {
 		do_lo_send = do_lo_send_direct_write;
@@ -458,6 +582,9 @@ lo_receive(struct loop_device *lo, struct bio *bio, int bsize, loff_t pos)
 	struct bio_vec *bvec;
 	int i, ret = 0;
 
+	if (lo->lo_flags & LO_FLAGS_ODIRECT)
+		return lo_direct_read(lo, bio, pos);
+
 	bio_for_each_segment(bvec, bio, i) {
 		ret = do_lo_receive(lo, bvec, bsize, pos);
 		if (ret < 0)
@@ -784,7 +911,11 @@ static int loop_set_fd(struct loop_device *lo, fmode_t mode,
 	if (S_ISREG(inode->i_mode) || S_ISBLK(inode->i_mode)) {
 		const struct address_space_operations *aops = mapping->a_ops;
 
-		if (aops->write_begin)
+		if (odirect) {
+			if (!aops->direct_IO)
+				goto out_putf;
+			lo_flags |= LO_FLAGS_ODIRECT;
+		} else if (aops->write_begin)
 			lo_flags |= LO_FLAGS_USE_AOPS;
 		if (!(lo_flags & LO_FLAGS_USE_AOPS) && !file->f_op->write)
 			lo_flags |= LO_FLAGS_READ_ONLY;
@@ -831,6 +962,10 @@ static int loop_set_fd(struct loop_device *lo, fmode_t mode,
 
 	if (!(lo_flags & LO_FLAGS_READ_ONLY) && file->f_op->fsync)
 		blk_queue_ordered(lo->lo_queue, QUEUE_ORDERED_DRAIN, NULL);
+	if (odirect) {
+		blk_queue_max_phys_segments(lo->lo_queue, LOOP_MAX_DIRECT_SEGMENTS);
+		blk_queue_max_hw_segments(lo->lo_queue, LOOP_MAX_DIRECT_SEGMENTS);
+	}
 
 	set_capacity(lo->lo_disk, size);
 	bd_set_size(bdev, size << 9);
@@ -1456,6 +1591,8 @@ module_param(max_loop, int, 0);
 MODULE_PARM_DESC(max_loop, "Maximum number of loop devices");
 module_param(max_part, int, 0);
 MODULE_PARM_DESC(max_part, "Maximum number of partitions per loop device");
+module_param(odirect, int, 0);
+MODULE_PARM_DESC(odirect, "Use O_DIRECT for IO");
 MODULE_LICENSE("GPL");
 MODULE_ALIAS_BLOCKDEV_MAJOR(LOOP_MAJOR);
 
diff --git a/include/linux/loop.h b/include/linux/loop.h
index 66c194e..a7433c0 100644
--- a/include/linux/loop.h
+++ b/include/linux/loop.h
@@ -76,6 +76,7 @@ enum {
 	LO_FLAGS_READ_ONLY	= 1,
 	LO_FLAGS_USE_AOPS	= 2,
 	LO_FLAGS_AUTOCLEAR	= 4,
+	LO_FLAGS_ODIRECT	= 8,
 };
 
 #include <asm/posix_types.h>	/* for __kernel_old_dev_t */
-- 
1.6.4.53.g3f55e


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 5/6] aio: use lazy workqueues
  2009-08-20 10:17               ` [PATCH 4/5] loop: support O_DIRECT transfer mode Jens Axboe
@ 2009-08-20 10:17                 ` Jens Axboe
  2009-08-20 10:17                   ` [PATCH 4/4] direct-io: get rid of irq flag saving where it isn't needed Jens Axboe
  0 siblings, 1 reply; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 fs/aio.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index d065b2c..4103b59 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -72,7 +72,7 @@ static int __init aio_setup(void)
 	kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 	kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 
-	aio_wq = create_workqueue("aio");
+	aio_wq = create_lazy_workqueue("aio");
 
 	pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
 
-- 
1.6.4.173.g3f189


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 4/4] direct-io: get rid of irq flag saving where it isn't needed
  2009-08-20 10:17                 ` [PATCH 5/6] aio: use lazy workqueues Jens Axboe
@ 2009-08-20 10:17                   ` Jens Axboe
  2009-08-20 10:17                     ` [PATCH 6/6] sunrpc: use lazy workqueues Jens Axboe
  0 siblings, 1 reply; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

We use the flags saving variant of the spin lock functions everywhere
in fs/direct-io.c, even in places where we otherwise block. Get rid
of that except for the end_io path, which may indeed be called with
irqs disabled.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 fs/direct-io.c |   24 ++++++++++--------------
 1 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/fs/direct-io.c b/fs/direct-io.c
index 0e923f2..2f73593 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -259,13 +259,12 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
 static void dio_bio_submit(struct dio *dio)
 {
 	struct bio *bio = dio->bio;
-	unsigned long flags;
 
 	bio->bi_private = dio;
 
-	spin_lock_irqsave(&dio->bio_lock, flags);
+	spin_lock_irq(&dio->bio_lock);
 	dio->refcount++;
-	spin_unlock_irqrestore(&dio->bio_lock, flags);
+	spin_unlock_irq(&dio->bio_lock);
 
 	if (dio->is_async && dio->rw == READ)
 		bio_set_pages_dirty(bio);
@@ -295,10 +294,9 @@ static void dio_cleanup(struct dio *dio)
  */
 static struct bio *dio_await_one(struct dio *dio)
 {
-	unsigned long flags;
 	struct bio *bio = NULL;
 
-	spin_lock_irqsave(&dio->bio_lock, flags);
+	spin_lock_irq(&dio->bio_lock);
 
 	/*
 	 * Wait as long as the list is empty and there are bios in flight.  bio
@@ -309,17 +307,17 @@ static struct bio *dio_await_one(struct dio *dio)
 	while (dio->refcount > 1 && dio->bio_list == NULL) {
 		__set_current_state(TASK_UNINTERRUPTIBLE);
 		dio->waiter = current;
-		spin_unlock_irqrestore(&dio->bio_lock, flags);
+		spin_unlock_irq(&dio->bio_lock);
 		io_schedule();
 		/* wake up sets us TASK_RUNNING */
-		spin_lock_irqsave(&dio->bio_lock, flags);
+		spin_lock_irq(&dio->bio_lock);
 		dio->waiter = NULL;
 	}
 	if (dio->bio_list) {
 		bio = dio->bio_list;
 		dio->bio_list = bio->bi_private;
 	}
-	spin_unlock_irqrestore(&dio->bio_lock, flags);
+	spin_unlock_irq(&dio->bio_lock);
 	return bio;
 }
 
@@ -388,14 +386,13 @@ static int dio_bio_reap(struct dio *dio)
 
 	if (dio->reap_counter++ >= 64) {
 		while (dio->bio_list) {
-			unsigned long flags;
 			struct bio *bio;
 			int ret2;
 
-			spin_lock_irqsave(&dio->bio_lock, flags);
+			spin_lock_irq(&dio->bio_lock);
 			bio = dio->bio_list;
 			dio->bio_list = bio->bi_private;
-			spin_unlock_irqrestore(&dio->bio_lock, flags);
+			spin_unlock_irq(&dio->bio_lock);
 			ret2 = dio_bio_complete(dio, bio);
 			if (ret == 0)
 				ret = ret2;
@@ -870,7 +867,6 @@ direct_io_worker(struct kiocb *iocb, struct inode *inode,
 	struct dio_args *args, unsigned blkbits, get_block_t get_block,
 	dio_iodone_t end_io, struct dio *dio)
 {
-	unsigned long flags;
 	int rw = args->rw;
 	ssize_t ret = 0;
 	ssize_t ret2;
@@ -984,9 +980,9 @@ direct_io_worker(struct kiocb *iocb, struct inode *inode,
 	 * completion paths can drop their ref and use the remaining count to
 	 * decide to wake the submission path atomically.
 	 */
-	spin_lock_irqsave(&dio->bio_lock, flags);
+	spin_lock_irq(&dio->bio_lock);
 	ret2 = --dio->refcount;
-	spin_unlock_irqrestore(&dio->bio_lock, flags);
+	spin_unlock_irq(&dio->bio_lock);
 
 	if (ret2 == 0) {
 		ret = dio_complete(dio, args->offset, ret);
-- 
1.6.4.53.g3f55e


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* [PATCH 6/6] sunrpc: use lazy workqueues
  2009-08-20 10:17                   ` [PATCH 4/4] direct-io: get rid of irq flag saving where it isn't needed Jens Axboe
@ 2009-08-20 10:17                     ` Jens Axboe
  0 siblings, 0 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-20 10:17 UTC (permalink / raw)
  To: linux-kernel; +Cc: jeff, benh, htejun, bzolnier, alan, Jens Axboe

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 net/sunrpc/sched.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 8f459ab..ce99fe2 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -970,7 +970,7 @@ static int rpciod_start(void)
 	 * Create the rpciod thread and wait for it to start.
 	 */
 	dprintk("RPC:       creating workqueue rpciod\n");
-	wq = create_workqueue("rpciod");
+	wq = create_lazy_workqueue("rpciod");
 	rpciod_workqueue = wq;
 	return rpciod_workqueue != NULL;
 }
-- 
1.6.4.173.g3f189


^ permalink raw reply related	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/4] direct-io: make O_DIRECT IO path be page based
  2009-08-20 10:17     ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Jens Axboe
  2009-08-20 10:17       ` [PATCH 2/6] workqueue: add support for lazy workqueues Jens Axboe
@ 2009-08-20 13:19       ` Trond Myklebust
  2009-08-24  7:58         ` Jens Axboe
  2009-08-21  0:12       ` Andrew Morton
  2 siblings, 1 reply; 21+ messages in thread
From: Trond Myklebust @ 2009-08-20 13:19 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-kernel, jeff, benh, htejun, bzolnier, alan

On Thu, 2009-08-20 at 12:17 +0200, Jens Axboe wrote:
> Currently we pass in the iovec array and let the O_DIRECT core
> handle the get_user_pages() business. This works, but it means that
> we can only ever use user pages for O_DIRECT.
> 
> Switch the aops->direct_IO() and below code to use page arrays
> instead, so that it doesn't make any assumptions about who the pages
> belong to. This works directly for all users but NFS, which just
> uses the same helper that the generic mapping read/write functions
> also call.
> 
> Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
> ---
>  static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
> -						 const struct iovec *iov,
> -						 loff_t pos, int sync)
> +						 struct dio_args *args,
> +						 int sync)
>  {
>  	struct nfs_open_context *ctx = dreq->ctx;
>  	struct inode *inode = ctx->path.dentry->d_inode;
> -	unsigned long user_addr = (unsigned long)iov->iov_base;
> -	size_t count = iov->iov_len;
> +	unsigned long user_addr = args->user_addr;
> +	size_t count = args->length;
>  	struct rpc_task *task;
>  	struct rpc_message msg = {
>  		.rpc_cred = ctx->cred,
> @@ -726,24 +702,8 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
>  		if (unlikely(!data))
>  			break;
>  
> -		down_read(&current->mm->mmap_sem);
> -		result = get_user_pages(current, current->mm, user_addr,
> -					data->npages, 0, 0, data->pagevec, NULL);
> -		up_read(&current->mm->mmap_sem);
> -		if (result < 0) {
> -			nfs_writedata_free(data);
> -			break;
> -		}
> -		if ((unsigned)result < data->npages) {
> -			bytes = result * PAGE_SIZE;
> -			if (bytes <= pgbase) {
> -				nfs_direct_release_pages(data->pagevec, result);
> -				nfs_writedata_free(data);
> -				break;
> -			}
> -			bytes -= pgbase;
> -			data->npages = result;
> -		}
> +		data->pagevec = args->pages;
> +		data->npages = args->nr_segs;
>  
>  		get_dreq(dreq);
>  

This looks a bit odd. What guarantees that args->pages contain <= wsize
bytes? The server will not accept larger segments in a single RPC call.

Cheers
  Trond


^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure
  2009-08-20 10:17 ` [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure Jens Axboe
  2009-08-20 10:17   ` [PATCH 1/6] workqueue: replace singlethread/freezable/rt parameters and variables with flags Jens Axboe
@ 2009-08-21  0:03   ` Andrew Morton
  2009-08-24  7:58     ` Jens Axboe
  1 sibling, 1 reply; 21+ messages in thread
From: Andrew Morton @ 2009-08-21  0:03 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-kernel, jeff, benh, htejun, bzolnier, alan, jens.axboe

On Thu, 20 Aug 2009 12:17:36 +0200
Jens Axboe <jens.axboe@oracle.com> wrote:

> The O_DIRECT IO path is a mess of arguments. Clean that up by passing
> those arguments in a dedicated dio_args structure.
> 
> This is in preparation for changing the internal implementation to be
> page based instead of using iovecs.
> 
> ...
>
> +/*
> + * Arguments passwed to aops->direct_IO()

cnat tpye

> + */
> +struct dio_args {
> +	int rw;
> +	const struct iovec *iov;
> +	unsigned long length;
> +	loff_t offset;
> +	unsigned long nr_segs;
> +};

It would be nice to have some documentation.  Especially for that
pestiferous `rw' thing.

nr_segs is associated with the iovec and might as well be contiguous
with it in the struct.



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/4] direct-io: make O_DIRECT IO path be page based
  2009-08-20 10:17     ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Jens Axboe
  2009-08-20 10:17       ` [PATCH 2/6] workqueue: add support for lazy workqueues Jens Axboe
  2009-08-20 13:19       ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Trond Myklebust
@ 2009-08-21  0:12       ` Andrew Morton
  2009-08-24  8:00         ` Jens Axboe
  2 siblings, 1 reply; 21+ messages in thread
From: Andrew Morton @ 2009-08-21  0:12 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-kernel, jeff, benh, htejun, bzolnier, alan, jens.axboe

On Thu, 20 Aug 2009 12:17:38 +0200
Jens Axboe <jens.axboe@oracle.com> wrote:

> Currently we pass in the iovec array and let the O_DIRECT core
> handle the get_user_pages() business. This works, but it means that
> we can only ever use user pages for O_DIRECT.
> 
> Switch the aops->direct_IO() and below code to use page arrays
> instead, so that it doesn't make any assumptions about who the pages
> belong to. This works directly for all users but NFS, which just
> uses the same helper that the generic mapping read/write functions
> also call.
>

This:

> +	struct page **pages;		/* page buffer */
> +	unsigned int head_page;		/* next page to process */
> +	unsigned int total_pages;	/* last valid page + 1 */
> +	unsigned int first_page_off;	/* offset into first page in map */

Is a disaster waiting to happen.

>  static struct page *dio_get_page(struct dio *dio)
>  {
> -	if (dio_pages_present(dio) == 0) {
> -		int ret;
> +	if (dio->head_page < dio->total_pages)
> +		return dio->pages[dio->head_page++];
>  
> -		ret = dio_refill_pages(dio);
> -		if (ret)
> -			return ERR_PTR(ret);
> -		BUG_ON(dio_pages_present(dio) == 0);
> -	}
> -	return dio->pages[dio->head++];
> +	return NULL;
>  }
>
> ...
>
> @@ -351,8 +280,10 @@ static void dio_bio_submit(struct dio *dio)
>   */
>  static void dio_cleanup(struct dio *dio)
>  {
> -	while (dio_pages_present(dio))
> -		page_cache_release(dio_get_page(dio));
> +	struct page *page;
> +
> +	while ((page = dio_get_page(dio)) != NULL)
> +		page_cache_release(page);
>  }

What is the value of dio->head_page here?

How are callers to use this function at the start of the operation, at
the end, and partway through if an IO error, EFAULT etc is detected?

There's good potential for double-releases, leaks etc here.  We need to
be very careful and explicit in documenting the usage and state and
meaning of head_page, and the protocol around managing it.

It's just like that ghastly bio.bi_idx thing - how many bugs did we add
and have to fix due to that thing?

Really I think it'd be better to just nuke .head_page altogether and get
callers to manage their index openly.


^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/6] workqueue: add support for lazy workqueues
  2009-08-20 10:17       ` [PATCH 2/6] workqueue: add support for lazy workqueues Jens Axboe
  2009-08-20 10:17         ` [PATCH 3/6] crypto: use " Jens Axboe
@ 2009-08-21  0:20         ` Andrew Morton
  2009-08-24  8:06           ` Jens Axboe
  1 sibling, 1 reply; 21+ messages in thread
From: Andrew Morton @ 2009-08-21  0:20 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-kernel, jeff, benh, htejun, bzolnier, alan, jens.axboe

On Thu, 20 Aug 2009 12:17:39 +0200
Jens Axboe <jens.axboe@oracle.com> wrote:

> Lazy workqueues are like normal workqueues, except they don't
> start a thread per CPU by default. Instead threads are started
> when they are needed, and exit when they have been idle for
> some time.
> 
>
> ...
>
> @@ -280,7 +309,34 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
>  		trace_workqueue_execution(cwq->thread, work);
>  		cwq->current_work = work;
>  		list_del_init(cwq->worklist.next);
> +		cpu = smp_processor_id();
>  		spin_unlock_irq(&cwq->lock);
> +		did_work = 1;
> +
> +		/*
> +		 * If work->cpu isn't us, then we need to create the target
> +		 * workqueue thread (if someone didn't already do that) and
> +		 * move the work over there.
> +		 */
> +		if ((cwq->wq->flags & WQ_F_LAZY) && work->cpu != cpu) {
> +			struct cpu_workqueue_struct *__cwq;
> +			struct task_struct *p;
> +			int err;
> +
> +			__cwq = wq_per_cpu(cwq->wq, work->cpu);
> +			p = __cwq->thread;
> +			if (!p)
> +				err = create_workqueue_thread(__cwq, work->cpu);
> +			p = __cwq->thread;
> +			if (p) {
> +				if (work->cpu >= 0)

It's an unsigned int.  This test is always true.

> +					kthread_bind(p, work->cpu);

I wonder what happens if work->cpu isn't online any more.

> +				insert_work(__cwq, work, &__cwq->worklist);
> +				wake_up_process(p);
> +				goto out;
> +			}
> +		}
> +
>  
>  		BUG_ON(get_wq_data(work) != cwq);
>  		work_clear_pending(work);
>
> ...
>

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure
  2009-08-21  0:03   ` [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure Andrew Morton
@ 2009-08-24  7:58     ` Jens Axboe
  0 siblings, 0 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-24  7:58 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, jeff, benh, htejun, bzolnier, alan

On Thu, Aug 20 2009, Andrew Morton wrote:
> On Thu, 20 Aug 2009 12:17:36 +0200
> Jens Axboe <jens.axboe@oracle.com> wrote:
> 
> > The O_DIRECT IO path is a mess of arguments. Clean that up by passing
> > those arguments in a dedicated dio_args structure.
> > 
> > This is in preparation for changing the internal implementation to be
> > page based instead of using iovecs.
> > 
> > ...
> >
> > +/*
> > + * Arguments passwed to aops->direct_IO()
> 
> cnat tpye

Ineedd, thanks.

> > + */
> > +struct dio_args {
> > +	int rw;
> > +	const struct iovec *iov;
> > +	unsigned long length;
> > +	loff_t offset;
> > +	unsigned long nr_segs;
> > +};
> 
> It would be nice to have some documentation.  Especially for that
> pestiferous `rw' thing.
> 
> nr_segs is associated with the iovec and might as well be contiguous
> with it in the struct.

Agree on both accounts, I'll add some comment above/inside that struct.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/4] direct-io: make O_DIRECT IO path be page based
  2009-08-20 13:19       ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Trond Myklebust
@ 2009-08-24  7:58         ` Jens Axboe
  0 siblings, 0 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-24  7:58 UTC (permalink / raw)
  To: Trond Myklebust; +Cc: linux-kernel, jeff, benh, htejun, bzolnier, alan

On Thu, Aug 20 2009, Trond Myklebust wrote:
> On Thu, 2009-08-20 at 12:17 +0200, Jens Axboe wrote:
> > Currently we pass in the iovec array and let the O_DIRECT core
> > handle the get_user_pages() business. This works, but it means that
> > we can only ever use user pages for O_DIRECT.
> > 
> > Switch the aops->direct_IO() and below code to use page arrays
> > instead, so that it doesn't make any assumptions about who the pages
> > belong to. This works directly for all users but NFS, which just
> > uses the same helper that the generic mapping read/write functions
> > also call.
> > 
> > Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
> > ---
> >  static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
> > -						 const struct iovec *iov,
> > -						 loff_t pos, int sync)
> > +						 struct dio_args *args,
> > +						 int sync)
> >  {
> >  	struct nfs_open_context *ctx = dreq->ctx;
> >  	struct inode *inode = ctx->path.dentry->d_inode;
> > -	unsigned long user_addr = (unsigned long)iov->iov_base;
> > -	size_t count = iov->iov_len;
> > +	unsigned long user_addr = args->user_addr;
> > +	size_t count = args->length;
> >  	struct rpc_task *task;
> >  	struct rpc_message msg = {
> >  		.rpc_cred = ctx->cred,
> > @@ -726,24 +702,8 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
> >  		if (unlikely(!data))
> >  			break;
> >  
> > -		down_read(&current->mm->mmap_sem);
> > -		result = get_user_pages(current, current->mm, user_addr,
> > -					data->npages, 0, 0, data->pagevec, NULL);
> > -		up_read(&current->mm->mmap_sem);
> > -		if (result < 0) {
> > -			nfs_writedata_free(data);
> > -			break;
> > -		}
> > -		if ((unsigned)result < data->npages) {
> > -			bytes = result * PAGE_SIZE;
> > -			if (bytes <= pgbase) {
> > -				nfs_direct_release_pages(data->pagevec, result);
> > -				nfs_writedata_free(data);
> > -				break;
> > -			}
> > -			bytes -= pgbase;
> > -			data->npages = result;
> > -		}
> > +		data->pagevec = args->pages;
> > +		data->npages = args->nr_segs;
> >  
> >  		get_dreq(dreq);
> >  
> 
> This looks a bit odd. What guarantees that args->pages contain <= wsize
> bytes? The server will not accept larger segments in a single RPC call.

Nothing, thanks for the info. The NFS bits are still very much untested,
I'll post an update soon.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/4] direct-io: make O_DIRECT IO path be page based
  2009-08-21  0:12       ` Andrew Morton
@ 2009-08-24  8:00         ` Jens Axboe
  0 siblings, 0 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-24  8:00 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, jeff, benh, htejun, bzolnier, alan

On Thu, Aug 20 2009, Andrew Morton wrote:
> On Thu, 20 Aug 2009 12:17:38 +0200
> Jens Axboe <jens.axboe@oracle.com> wrote:
> 
> > Currently we pass in the iovec array and let the O_DIRECT core
> > handle the get_user_pages() business. This works, but it means that
> > we can only ever use user pages for O_DIRECT.
> > 
> > Switch the aops->direct_IO() and below code to use page arrays
> > instead, so that it doesn't make any assumptions about who the pages
> > belong to. This works directly for all users but NFS, which just
> > uses the same helper that the generic mapping read/write functions
> > also call.
> >
> 
> This:
> 
> > +	struct page **pages;		/* page buffer */
> > +	unsigned int head_page;		/* next page to process */
> > +	unsigned int total_pages;	/* last valid page + 1 */
> > +	unsigned int first_page_off;	/* offset into first page in map */
> 
> Is a disaster waiting to happen.

It was a disaster already, there's actually not a lot of change. So I do
agree that the whole page/cur_page/head_page needs cleaning up.

> >  static struct page *dio_get_page(struct dio *dio)
> >  {
> > -	if (dio_pages_present(dio) == 0) {
> > -		int ret;
> > +	if (dio->head_page < dio->total_pages)
> > +		return dio->pages[dio->head_page++];
> >  
> > -		ret = dio_refill_pages(dio);
> > -		if (ret)
> > -			return ERR_PTR(ret);
> > -		BUG_ON(dio_pages_present(dio) == 0);
> > -	}
> > -	return dio->pages[dio->head++];
> > +	return NULL;
> >  }
> >
> > ...
> >
> > @@ -351,8 +280,10 @@ static void dio_bio_submit(struct dio *dio)
> >   */
> >  static void dio_cleanup(struct dio *dio)
> >  {
> > -	while (dio_pages_present(dio))
> > -		page_cache_release(dio_get_page(dio));
> > +	struct page *page;
> > +
> > +	while ((page = dio_get_page(dio)) != NULL)
> > +		page_cache_release(page);
> >  }
> 
> What is the value of dio->head_page here?
> 
> How are callers to use this function at the start of the operation, at
> the end, and partway through if an IO error, EFAULT etc is detected?
> 
> There's good potential for double-releases, leaks etc here.  We need to
> be very careful and explicit in documenting the usage and state and
> meaning of head_page, and the protocol around managing it.
> 
> It's just like that ghastly bio.bi_idx thing - how many bugs did we add
> and have to fix due to that thing?
> 
> Really I think it'd be better to just nuke .head_page altogether and get
> callers to manage their index openly.

Completely agree, I'll attempt to sanitize it. Was postponed due to the
messy nature of it, but with that done it should be easier to verify
that it does the right thing.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/6] workqueue: add support for lazy workqueues
  2009-08-21  0:20         ` [PATCH 2/6] workqueue: add support for " Andrew Morton
@ 2009-08-24  8:06           ` Jens Axboe
  0 siblings, 0 replies; 21+ messages in thread
From: Jens Axboe @ 2009-08-24  8:06 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, jeff, benh, htejun, bzolnier, alan

On Thu, Aug 20 2009, Andrew Morton wrote:
> On Thu, 20 Aug 2009 12:17:39 +0200
> Jens Axboe <jens.axboe@oracle.com> wrote:
> 
> > Lazy workqueues are like normal workqueues, except they don't
> > start a thread per CPU by default. Instead threads are started
> > when they are needed, and exit when they have been idle for
> > some time.
> > 
> >
> > ...
> >
> > @@ -280,7 +309,34 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
> >  		trace_workqueue_execution(cwq->thread, work);
> >  		cwq->current_work = work;
> >  		list_del_init(cwq->worklist.next);
> > +		cpu = smp_processor_id();
> >  		spin_unlock_irq(&cwq->lock);
> > +		did_work = 1;
> > +
> > +		/*
> > +		 * If work->cpu isn't us, then we need to create the target
> > +		 * workqueue thread (if someone didn't already do that) and
> > +		 * move the work over there.
> > +		 */
> > +		if ((cwq->wq->flags & WQ_F_LAZY) && work->cpu != cpu) {
> > +			struct cpu_workqueue_struct *__cwq;
> > +			struct task_struct *p;
> > +			int err;
> > +
> > +			__cwq = wq_per_cpu(cwq->wq, work->cpu);
> > +			p = __cwq->thread;
> > +			if (!p)
> > +				err = create_workqueue_thread(__cwq, work->cpu);
> > +			p = __cwq->thread;
> > +			if (p) {
> > +				if (work->cpu >= 0)
> 
> It's an unsigned int.  This test is always true.
> 
> > +					kthread_bind(p, work->cpu);
> 
> I wonder what happens if work->cpu isn't online any more.

That's a good question. The workqueue "documentation" states that it is
the caller's responsibility to ensure that the CPU stays online, but I
think that requirement is pretty much ignored. Probably since it'd be
costly to do.

So that bit needs looking into.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 21+ messages in thread

end of thread, other threads:[~2009-08-24  8:06 UTC | newest]

Thread overview: 21+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2009-08-20 10:17 [PATCH 0/6] Lazy workqueues Jens Axboe
2009-08-20 10:17 ` [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure Jens Axboe
2009-08-20 10:17   ` [PATCH 1/6] workqueue: replace singlethread/freezable/rt parameters and variables with flags Jens Axboe
2009-08-20 10:17     ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Jens Axboe
2009-08-20 10:17       ` [PATCH 2/6] workqueue: add support for lazy workqueues Jens Axboe
2009-08-20 10:17         ` [PATCH 3/6] crypto: use " Jens Axboe
2009-08-20 10:17           ` [PATCH 3/4] direct-io: add a "IO for kernel" flag to kiocb Jens Axboe
2009-08-20 10:17             ` [PATCH 4/6] libata: use lazy workqueues for the pio task Jens Axboe
2009-08-20 10:17               ` [PATCH 4/5] loop: support O_DIRECT transfer mode Jens Axboe
2009-08-20 10:17                 ` [PATCH 5/6] aio: use lazy workqueues Jens Axboe
2009-08-20 10:17                   ` [PATCH 4/4] direct-io: get rid of irq flag saving where it isn't needed Jens Axboe
2009-08-20 10:17                     ` [PATCH 6/6] sunrpc: use lazy workqueues Jens Axboe
2009-08-21  0:20         ` [PATCH 2/6] workqueue: add support for " Andrew Morton
2009-08-24  8:06           ` Jens Axboe
2009-08-20 13:19       ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Trond Myklebust
2009-08-24  7:58         ` Jens Axboe
2009-08-21  0:12       ` Andrew Morton
2009-08-24  8:00         ` Jens Axboe
2009-08-21  0:03   ` [PATCH 1/4] direct-io: unify argument passing by adding a dio_args structure Andrew Morton
2009-08-24  7:58     ` Jens Axboe
  -- strict thread matches above, loose matches on Subject: below --
2009-08-18  8:34 [PATCH 0/4] Page based O_DIRECT v2 Jens Axboe
2009-08-18  8:34 ` [PATCH 2/4] direct-io: make O_DIRECT IO path be page based Jens Axboe

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).