CEPH filesystem development
 help / color / mirror / Atom feed
From: Josh Durgin <josh.durgin@inktank.com>
To: Alex Elder <elder@inktank.com>
Cc: ceph-devel@vger.kernel.org
Subject: Re: [PATCH, resend] rbd: simplify rbd_rq_fn()
Date: Mon, 29 Oct 2012 13:29:50 -0700	[thread overview]
Message-ID: <508EE73E.4090101@inktank.com> (raw)
In-Reply-To: <508B1242.1090808@inktank.com>

This is much easier to read now. It might be useful
to add messages for the different failure cases in
bio_chain_clone_range later.

Reviewed-by: Josh Durgin <josh.durgin@inktank.com>

On 10/26/2012 03:44 PM, Alex Elder wrote:
> When processing a request, rbd_rq_fn() makes clones of the bio's in
> the request's bio chain and submits the results to osd's to be
> satisfied.  If a request bio straddles the boundary between objects
> backing the rbd image, it must be represented by two cloned bio's,
> one for the first part (at the end of one object) and one for the
> second (at the beginning of the next object).
>
> This has been handled by a function bio_chain_clone(), which
> includes an interface only a mother could love, and which has
> been found to have other problems.
>
> This patch defines two new fairly generic bio functions (one which
> replaces bio_chain_clone()) to help out the situation, and then
> revises rbd_rq_fn() to make use of them.
>
> First, bio_clone_range() clones a portion of a single bio, starting
> at a given offset within the bio and including only as many bytes
> as requested.  As a convenience, a request to clone the entire bio
> is passed directly to bio_clone().
>
> Second, bio_chain_clone_range() performs a similar function,
> producing a chain of cloned bio's covering a sub-range of the
> source chain.  No bio_pair structures are used, and if successful
> the result will represent exactly the specified range.
>
> Using bio_chain_clone_range() makes bio_rq_fn() a little easier
> to understand, because it avoids the need to pass very much
> state information between consecutive calls.  By avoiding the need
> to track a bio_pair structure, it also eliminates the problem
> described here:  http://tracker.newdream.net/issues/2933
>
> Note that a block request (and therefore the complete length of
> a bio chain processed in rbd_rq_fn()) is an unsigned int, while
> the result of rbd_segment_length() is u64.  This change makes
> this range trunctation explicit, and trips a bug if the the
> segment boundary is too far off.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
>   drivers/block/rbd.c |  231
> +++++++++++++++++++++++++++++++++------------------
>   1 file changed, 152 insertions(+), 79 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index c800047..cc06c55 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -826,77 +826,144 @@ static void zero_bio_chain(struct bio *chain, int
> start_ofs)
>   }
>
>   /*
> - * bio_chain_clone - clone a chain of bios up to a certain length.
> - * might return a bio_pair that will need to be released.
> + * Clone a portion of a bio, starting at the given byte offset
> + * and continuing for the number of bytes indicated.
>    */
> -static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
> -				   struct bio_pair **bp,
> -				   int len, gfp_t gfpmask)
> -{
> -	struct bio *old_chain = *old;
> -	struct bio *new_chain = NULL;
> -	struct bio *tail;
> -	int total = 0;
> -
> -	if (*bp) {
> -		bio_pair_release(*bp);
> -		*bp = NULL;
> -	}
> +static struct bio *bio_clone_range(struct bio *bio_src,
> +					unsigned int offset,
> +					unsigned int len,
> +					gfp_t gfpmask)
> +{
> +	struct bio_vec *bv;
> +	unsigned int resid;
> +	unsigned short idx;
> +	unsigned int voff;
> +	unsigned short end_idx;
> +	unsigned short vcnt;
> +	struct bio *bio;
>
> -	while (old_chain && (total < len)) {
> -		struct bio *tmp;
> +	/* Handle the easy case for the caller */
>
> -		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
> -		if (!tmp)
> -			goto err_out;
> -		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */
> +	if (!offset && len == bio_src->bi_size)
> +		return bio_clone(bio_src, gfpmask);
>
> -		if (total + old_chain->bi_size > len) {
> -			struct bio_pair *bp;
> +	if (WARN_ON_ONCE(!len))
> +		return NULL;
> +	if (WARN_ON_ONCE(len > bio_src->bi_size))
> +		return NULL;
> +	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
> +		return NULL;
>
> -			/*
> -			 * this split can only happen with a single paged bio,
> -			 * split_bio will BUG_ON if this is not the case
> -			 */
> -			dout("bio_chain_clone split! total=%d remaining=%d"
> -			     "bi_size=%u\n",
> -			     total, len - total, old_chain->bi_size);
> +	/* Find first affected segment... */
>
> -			/* split the bio. We'll release it either in the next
> -			   call, or it will have to be released outside */
> -			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
> -			if (!bp)
> -				goto err_out;
> +	resid = offset;
> +	__bio_for_each_segment(bv, bio_src, idx, 0) {
> +		if (resid < bv->bv_len)
> +			break;
> +		resid -= bv->bv_len;
> +	}
> +	voff = resid;
>
> -			__bio_clone(tmp, &bp->bio1);
> +	/* ...and the last affected segment */
>
> -			*next = &bp->bio2;
> -		} else {
> -			__bio_clone(tmp, old_chain);
> -			*next = old_chain->bi_next;
> -		}
> +	resid += len;
> +	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
> +		if (resid <= bv->bv_len)
> +			break;
> +		resid -= bv->bv_len;
> +	}
> +	vcnt = end_idx - idx + 1;
> +
> +	/* Build the clone */
> +
> +	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
> +	if (!bio)
> +		return NULL;	/* ENOMEM */
>
> -		tmp->bi_bdev = NULL;
> -		tmp->bi_next = NULL;
> -		if (new_chain)
> -			tail->bi_next = tmp;
> -		else
> -			new_chain = tmp;
> -		tail = tmp;
> -		old_chain = old_chain->bi_next;
> +	bio->bi_bdev = bio_src->bi_bdev;
> +	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
> +	bio->bi_rw = bio_src->bi_rw;
> +	bio->bi_flags |= 1 << BIO_CLONED;
>
> -		total += tmp->bi_size;
> +	/*
> +	 * Copy over our part of the bio_vec, then update the first
> +	 * and last (or only) entries.
> +	 */
> +	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
> +			vcnt * sizeof (struct bio_vec));
> +	bio->bi_io_vec[0].bv_offset += voff;
> +	if (vcnt > 1) {
> +		bio->bi_io_vec[0].bv_len -= voff;
> +		bio->bi_io_vec[vcnt - 1].bv_len = resid;
> +	} else {
> +		bio->bi_io_vec[0].bv_len = len;
>   	}
>
> -	rbd_assert(total == len);
> +	bio->bi_vcnt = vcnt;
> +	bio->bi_size = len;
> +	bio->bi_idx = 0;
> +
> +	return bio;
> +}
> +
> +/*
> + * Clone a portion of a bio chain, starting at the given byte offset
> + * into the first bio in the source chain and continuing for the
> + * number of bytes indicated.  The result is another bio chain of
> + * exactly the given length, or a null pointer on error.
> + *
> + * The bio_src and offset parameters are both in-out.  On entry they
> + * refer to the first source bio and the offset into that bio where
> + * the start of data to be cloned is located.
> + *
> + * On return, bio_src is updated to refer to the bio in the source
> + * chain that contains first un-cloned byte, and *offset will
> + * contain the offset of that byte within that bio.
> + */
> +static struct bio *bio_chain_clone_range(struct bio **bio_src,
> +					unsigned int *offset,
> +					unsigned int len,
> +					gfp_t gfpmask)
> +{
> +	struct bio *bi = *bio_src;
> +	unsigned int off = *offset;
> +	struct bio *chain = NULL;
> +	struct bio **end;
> +
> +	/* Build up a chain of clone bios up to the limit */
> +
> +	if (!bi || off >= bi->bi_size || !len)
> +		return NULL;		/* Nothing to clone */
>
> -	*old = old_chain;
> +	end = &chain;
> +	while (len) {
> +		unsigned int bi_size;
> +		struct bio *bio;
> +
> +		if (!bi)
> +			goto out_err;	/* EINVAL; ran out of bio's */
> +		bi_size = min_t(unsigned int, bi->bi_size - off, len);
> +		bio = bio_clone_range(bi, off, bi_size, gfpmask);
> +		if (!bio)
> +			goto out_err;	/* ENOMEM */
> +
> +		*end = bio;
> +		end = &bio->bi_next;
> +
> +		off += bi_size;
> +		if (off == bi->bi_size) {
> +			bi = bi->bi_next;
> +			off = 0;
> +		}
> +		len -= bi_size;
> +	}
> +	*bio_src = bi;
> +	*offset = off;
>
> -	return new_chain;
> +	return chain;
> +out_err:
> +	bio_chain_put(chain);
>
> -err_out:
> -	dout("bio_chain_clone with err\n");
> -	bio_chain_put(new_chain);
>   	return NULL;
>   }
>
> @@ -1014,8 +1081,9 @@ static int rbd_do_request(struct request *rq,
>   		req_data->coll_index = coll_index;
>   	}
>
> -	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
> -		(unsigned long long) ofs, (unsigned long long) len);
> +	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
> +		object_name, (unsigned long long) ofs,
> +		(unsigned long long) len, coll, coll_index);
>
>   	osdc = &rbd_dev->rbd_client->client->osdc;
>   	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
> @@ -1463,18 +1531,16 @@ static void rbd_rq_fn(struct request_queue *q)
>   {
>   	struct rbd_device *rbd_dev = q->queuedata;
>   	struct request *rq;
> -	struct bio_pair *bp = NULL;
>
>   	while ((rq = blk_fetch_request(q))) {
>   		struct bio *bio;
> -		struct bio *rq_bio, *next_bio = NULL;
>   		bool do_write;
>   		unsigned int size;
> -		u64 op_size = 0;
>   		u64 ofs;
>   		int num_segs, cur_seg = 0;
>   		struct rbd_req_coll *coll;
>   		struct ceph_snap_context *snapc;
> +		unsigned int bio_offset;
>
>   		dout("fetched request\n");
>
> @@ -1486,10 +1552,6 @@ static void rbd_rq_fn(struct request_queue *q)
>
>   		/* deduce our operation (read, write) */
>   		do_write = (rq_data_dir(rq) == WRITE);
> -
> -		size = blk_rq_bytes(rq);
> -		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
> -		rq_bio = rq->bio;
>   		if (do_write && rbd_dev->mapping.read_only) {
>   			__blk_end_request_all(rq, -EROFS);
>   			continue;
> @@ -1512,6 +1574,10 @@ static void rbd_rq_fn(struct request_queue *q)
>
>   		up_read(&rbd_dev->header_rwsem);
>
> +		size = blk_rq_bytes(rq);
> +		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
> +		bio = rq->bio;
> +
>   		dout("%s 0x%x bytes at 0x%llx\n",
>   		     do_write ? "write" : "read",
>   		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
> @@ -1531,30 +1597,37 @@ static void rbd_rq_fn(struct request_queue *q)
>   			continue;
>   		}
>
> +		bio_offset = 0;
>   		do {
> -			/* a bio clone to be passed down to OSD req */
> +			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
> +			unsigned int chain_size;
> +			struct bio *bio_chain;
> +
> +			BUG_ON(limit > (u64) UINT_MAX);
> +			chain_size = (unsigned int) limit;
>   			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
> -			op_size = rbd_segment_length(rbd_dev, ofs, size);
> +
>   			kref_get(&coll->kref);
> -			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
> -					      op_size, GFP_ATOMIC);
> -			if (bio)
> +
> +			/* Pass a cloned bio chain via an osd request */
> +
> +			bio_chain = bio_chain_clone_range(&bio,
> +						&bio_offset, chain_size,
> +						GFP_ATOMIC);
> +			if (bio_chain)
>   				(void) rbd_do_op(rq, rbd_dev, snapc,
> -						ofs, op_size,
> -						bio, coll, cur_seg);
> +						ofs, chain_size,
> +						bio_chain, coll, cur_seg);
>   			else
>   				rbd_coll_end_req_index(rq, coll, cur_seg,
> -						       -ENOMEM, op_size);
> -			size -= op_size;
> -			ofs += op_size;
> +						       -ENOMEM, chain_size);
> +			size -= chain_size;
> +			ofs += chain_size;
>
>   			cur_seg++;
> -			rq_bio = next_bio;
>   		} while (size > 0);
>   		kref_put(&coll->kref, rbd_coll_release);
>
> -		if (bp)
> -			bio_pair_release(bp);
>   		spin_lock_irq(q->queue_lock);
>
>   		ceph_put_snap_context(snapc);
> @@ -1564,7 +1637,7 @@ static void rbd_rq_fn(struct request_queue *q)
>   /*
>    * a queue callback. Makes sure that we don't create a bio that spans
> across
>    * multiple osd objects. One exception would be with a single page bios,
> - * which we handle later at bio_chain_clone
> + * which we handle later at bio_chain_clone_range()
>    */
>   static int rbd_merge_bvec(struct request_queue *q, struct
> bvec_merge_data *bmd,
>   			  struct bio_vec *bvec)
>


  reply	other threads:[~2012-10-29 20:30 UTC|newest]

Thread overview: 41+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2012-10-26 22:42 Another pile of patches Alex Elder
2012-10-26 22:44 ` [PATCH, resend] rbd: simplify rbd_rq_fn() Alex Elder
2012-10-29 20:29   ` Josh Durgin [this message]
2012-10-30 12:01     ` Alex Elder
2012-10-26 22:45 ` [PATCH] rbd: remove snapshots on error in rbd_add() Alex Elder
2012-10-29 20:36   ` Josh Durgin
2012-10-26 22:45 ` [PATCH] rbd: make pool_id a 64 bit value Alex Elder
2012-10-29 20:40   ` Josh Durgin
2012-10-26 22:48 ` [PATCH 0/2] rbd: mapping structure changes Alex Elder
2012-10-26 22:51   ` [PATCH 1/2] rbd: move snap info out of rbd_mapping struct Alex Elder
2012-10-29 20:43     ` Josh Durgin
2012-10-26 22:51   ` [PATCH 2/2] rbd: rename snap_exists field Alex Elder
2012-10-29 20:46     ` Josh Durgin
2012-10-26 22:52 ` [PATCH 0/2] rbd: consolidate argument parsing Alex Elder
2012-10-26 22:55   ` [PATCH 1/2] rbd: move ceph_parse_options() call up Alex Elder
2012-10-29 21:08     ` Josh Durgin
2012-10-26 22:55   ` [PATCH 2/2] rbd: do all argument parsing in one place Alex Elder
2012-10-29 21:13     ` Josh Durgin
2012-10-26 22:56 ` [PATCH 0/8] rbd: have rbd_add_parse_args() only parse args Alex Elder
2012-10-26 23:00   ` [PATCH 1/8] rbd: get rid of snap_name_len Alex Elder
2012-10-29 21:14     ` Josh Durgin
2012-10-26 23:00   ` [PATCH 2/8] rbd: remove options args from rbd_add_parse_args() Alex Elder
2012-10-29 21:17     ` Josh Durgin
2012-10-26 23:01   ` [PATCH 3/8] rbd: remove snap_name arg " Alex Elder
2012-10-29 21:26     ` Josh Durgin
2012-10-26 23:02   ` [PATCH 4/8] rbd: pass and populate rbd_options structure Alex Elder
2012-10-29 21:27     ` Josh Durgin
2012-10-26 23:02   ` [PATCH 5/8] rbd: have rbd_add_parse_args() return error Alex Elder
2012-10-29 21:28     ` Josh Durgin
2012-10-26 23:03   ` [PATCH 6/8] rbd: define image specification structure Alex Elder
2012-10-29 22:13     ` Josh Durgin
2012-10-30 12:40       ` Alex Elder
2012-10-30 20:13         ` Josh Durgin
2012-10-26 23:03   ` [PATCH 7/8] rbd: add reference counting to rbd_spec Alex Elder
2012-10-29 22:20     ` Josh Durgin
2012-10-30 12:59       ` Alex Elder
2012-10-30 20:17         ` Josh Durgin
2012-10-26 23:03   ` [PATCH 8/8] rbd: fill rbd_spec in rbd_add_parse_args() Alex Elder
2012-10-29 22:30     ` Josh Durgin
2012-10-30 13:09       ` Alex Elder
2012-10-30 20:18         ` Josh Durgin

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=508EE73E.4090101@inktank.com \
    --to=josh.durgin@inktank.com \
    --cc=ceph-devel@vger.kernel.org \
    --cc=elder@inktank.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox