From: Josh Durgin <josh.durgin@inktank.com>
To: Alex Elder <elder@inktank.com>
Cc: ceph-devel@vger.kernel.org
Subject: Re: [PATCH, resend] rbd: simplify rbd_rq_fn()
Date: Mon, 29 Oct 2012 13:29:50 -0700 [thread overview]
Message-ID: <508EE73E.4090101@inktank.com> (raw)
In-Reply-To: <508B1242.1090808@inktank.com>
This is much easier to read now. It might be useful
to add messages for the different failure cases in
bio_chain_clone_range later.
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
On 10/26/2012 03:44 PM, Alex Elder wrote:
> When processing a request, rbd_rq_fn() makes clones of the bio's in
> the request's bio chain and submits the results to osd's to be
> satisfied. If a request bio straddles the boundary between objects
> backing the rbd image, it must be represented by two cloned bio's,
> one for the first part (at the end of one object) and one for the
> second (at the beginning of the next object).
>
> This has been handled by a function bio_chain_clone(), which
> includes an interface only a mother could love, and which has
> been found to have other problems.
>
> This patch defines two new fairly generic bio functions (one which
> replaces bio_chain_clone()) to help out the situation, and then
> revises rbd_rq_fn() to make use of them.
>
> First, bio_clone_range() clones a portion of a single bio, starting
> at a given offset within the bio and including only as many bytes
> as requested. As a convenience, a request to clone the entire bio
> is passed directly to bio_clone().
>
> Second, bio_chain_clone_range() performs a similar function,
> producing a chain of cloned bio's covering a sub-range of the
> source chain. No bio_pair structures are used, and if successful
> the result will represent exactly the specified range.
>
> Using bio_chain_clone_range() makes bio_rq_fn() a little easier
> to understand, because it avoids the need to pass very much
> state information between consecutive calls. By avoiding the need
> to track a bio_pair structure, it also eliminates the problem
> described here: http://tracker.newdream.net/issues/2933
>
> Note that a block request (and therefore the complete length of
> a bio chain processed in rbd_rq_fn()) is an unsigned int, while
> the result of rbd_segment_length() is u64. This change makes
> this range trunctation explicit, and trips a bug if the the
> segment boundary is too far off.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> drivers/block/rbd.c | 231
> +++++++++++++++++++++++++++++++++------------------
> 1 file changed, 152 insertions(+), 79 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index c800047..cc06c55 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -826,77 +826,144 @@ static void zero_bio_chain(struct bio *chain, int
> start_ofs)
> }
>
> /*
> - * bio_chain_clone - clone a chain of bios up to a certain length.
> - * might return a bio_pair that will need to be released.
> + * Clone a portion of a bio, starting at the given byte offset
> + * and continuing for the number of bytes indicated.
> */
> -static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
> - struct bio_pair **bp,
> - int len, gfp_t gfpmask)
> -{
> - struct bio *old_chain = *old;
> - struct bio *new_chain = NULL;
> - struct bio *tail;
> - int total = 0;
> -
> - if (*bp) {
> - bio_pair_release(*bp);
> - *bp = NULL;
> - }
> +static struct bio *bio_clone_range(struct bio *bio_src,
> + unsigned int offset,
> + unsigned int len,
> + gfp_t gfpmask)
> +{
> + struct bio_vec *bv;
> + unsigned int resid;
> + unsigned short idx;
> + unsigned int voff;
> + unsigned short end_idx;
> + unsigned short vcnt;
> + struct bio *bio;
>
> - while (old_chain && (total < len)) {
> - struct bio *tmp;
> + /* Handle the easy case for the caller */
>
> - tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
> - if (!tmp)
> - goto err_out;
> - gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
> + if (!offset && len == bio_src->bi_size)
> + return bio_clone(bio_src, gfpmask);
>
> - if (total + old_chain->bi_size > len) {
> - struct bio_pair *bp;
> + if (WARN_ON_ONCE(!len))
> + return NULL;
> + if (WARN_ON_ONCE(len > bio_src->bi_size))
> + return NULL;
> + if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
> + return NULL;
>
> - /*
> - * this split can only happen with a single paged bio,
> - * split_bio will BUG_ON if this is not the case
> - */
> - dout("bio_chain_clone split! total=%d remaining=%d"
> - "bi_size=%u\n",
> - total, len - total, old_chain->bi_size);
> + /* Find first affected segment... */
>
> - /* split the bio. We'll release it either in the next
> - call, or it will have to be released outside */
> - bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
> - if (!bp)
> - goto err_out;
> + resid = offset;
> + __bio_for_each_segment(bv, bio_src, idx, 0) {
> + if (resid < bv->bv_len)
> + break;
> + resid -= bv->bv_len;
> + }
> + voff = resid;
>
> - __bio_clone(tmp, &bp->bio1);
> + /* ...and the last affected segment */
>
> - *next = &bp->bio2;
> - } else {
> - __bio_clone(tmp, old_chain);
> - *next = old_chain->bi_next;
> - }
> + resid += len;
> + __bio_for_each_segment(bv, bio_src, end_idx, idx) {
> + if (resid <= bv->bv_len)
> + break;
> + resid -= bv->bv_len;
> + }
> + vcnt = end_idx - idx + 1;
> +
> + /* Build the clone */
> +
> + bio = bio_alloc(gfpmask, (unsigned int) vcnt);
> + if (!bio)
> + return NULL; /* ENOMEM */
>
> - tmp->bi_bdev = NULL;
> - tmp->bi_next = NULL;
> - if (new_chain)
> - tail->bi_next = tmp;
> - else
> - new_chain = tmp;
> - tail = tmp;
> - old_chain = old_chain->bi_next;
> + bio->bi_bdev = bio_src->bi_bdev;
> + bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
> + bio->bi_rw = bio_src->bi_rw;
> + bio->bi_flags |= 1 << BIO_CLONED;
>
> - total += tmp->bi_size;
> + /*
> + * Copy over our part of the bio_vec, then update the first
> + * and last (or only) entries.
> + */
> + memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
> + vcnt * sizeof (struct bio_vec));
> + bio->bi_io_vec[0].bv_offset += voff;
> + if (vcnt > 1) {
> + bio->bi_io_vec[0].bv_len -= voff;
> + bio->bi_io_vec[vcnt - 1].bv_len = resid;
> + } else {
> + bio->bi_io_vec[0].bv_len = len;
> }
>
> - rbd_assert(total == len);
> + bio->bi_vcnt = vcnt;
> + bio->bi_size = len;
> + bio->bi_idx = 0;
> +
> + return bio;
> +}
> +
> +/*
> + * Clone a portion of a bio chain, starting at the given byte offset
> + * into the first bio in the source chain and continuing for the
> + * number of bytes indicated. The result is another bio chain of
> + * exactly the given length, or a null pointer on error.
> + *
> + * The bio_src and offset parameters are both in-out. On entry they
> + * refer to the first source bio and the offset into that bio where
> + * the start of data to be cloned is located.
> + *
> + * On return, bio_src is updated to refer to the bio in the source
> + * chain that contains first un-cloned byte, and *offset will
> + * contain the offset of that byte within that bio.
> + */
> +static struct bio *bio_chain_clone_range(struct bio **bio_src,
> + unsigned int *offset,
> + unsigned int len,
> + gfp_t gfpmask)
> +{
> + struct bio *bi = *bio_src;
> + unsigned int off = *offset;
> + struct bio *chain = NULL;
> + struct bio **end;
> +
> + /* Build up a chain of clone bios up to the limit */
> +
> + if (!bi || off >= bi->bi_size || !len)
> + return NULL; /* Nothing to clone */
>
> - *old = old_chain;
> + end = &chain;
> + while (len) {
> + unsigned int bi_size;
> + struct bio *bio;
> +
> + if (!bi)
> + goto out_err; /* EINVAL; ran out of bio's */
> + bi_size = min_t(unsigned int, bi->bi_size - off, len);
> + bio = bio_clone_range(bi, off, bi_size, gfpmask);
> + if (!bio)
> + goto out_err; /* ENOMEM */
> +
> + *end = bio;
> + end = &bio->bi_next;
> +
> + off += bi_size;
> + if (off == bi->bi_size) {
> + bi = bi->bi_next;
> + off = 0;
> + }
> + len -= bi_size;
> + }
> + *bio_src = bi;
> + *offset = off;
>
> - return new_chain;
> + return chain;
> +out_err:
> + bio_chain_put(chain);
>
> -err_out:
> - dout("bio_chain_clone with err\n");
> - bio_chain_put(new_chain);
> return NULL;
> }
>
> @@ -1014,8 +1081,9 @@ static int rbd_do_request(struct request *rq,
> req_data->coll_index = coll_index;
> }
>
> - dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
> - (unsigned long long) ofs, (unsigned long long) len);
> + dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
> + object_name, (unsigned long long) ofs,
> + (unsigned long long) len, coll, coll_index);
>
> osdc = &rbd_dev->rbd_client->client->osdc;
> req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
> @@ -1463,18 +1531,16 @@ static void rbd_rq_fn(struct request_queue *q)
> {
> struct rbd_device *rbd_dev = q->queuedata;
> struct request *rq;
> - struct bio_pair *bp = NULL;
>
> while ((rq = blk_fetch_request(q))) {
> struct bio *bio;
> - struct bio *rq_bio, *next_bio = NULL;
> bool do_write;
> unsigned int size;
> - u64 op_size = 0;
> u64 ofs;
> int num_segs, cur_seg = 0;
> struct rbd_req_coll *coll;
> struct ceph_snap_context *snapc;
> + unsigned int bio_offset;
>
> dout("fetched request\n");
>
> @@ -1486,10 +1552,6 @@ static void rbd_rq_fn(struct request_queue *q)
>
> /* deduce our operation (read, write) */
> do_write = (rq_data_dir(rq) == WRITE);
> -
> - size = blk_rq_bytes(rq);
> - ofs = blk_rq_pos(rq) * SECTOR_SIZE;
> - rq_bio = rq->bio;
> if (do_write && rbd_dev->mapping.read_only) {
> __blk_end_request_all(rq, -EROFS);
> continue;
> @@ -1512,6 +1574,10 @@ static void rbd_rq_fn(struct request_queue *q)
>
> up_read(&rbd_dev->header_rwsem);
>
> + size = blk_rq_bytes(rq);
> + ofs = blk_rq_pos(rq) * SECTOR_SIZE;
> + bio = rq->bio;
> +
> dout("%s 0x%x bytes at 0x%llx\n",
> do_write ? "write" : "read",
> size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
> @@ -1531,30 +1597,37 @@ static void rbd_rq_fn(struct request_queue *q)
> continue;
> }
>
> + bio_offset = 0;
> do {
> - /* a bio clone to be passed down to OSD req */
> + u64 limit = rbd_segment_length(rbd_dev, ofs, size);
> + unsigned int chain_size;
> + struct bio *bio_chain;
> +
> + BUG_ON(limit > (u64) UINT_MAX);
> + chain_size = (unsigned int) limit;
> dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
> - op_size = rbd_segment_length(rbd_dev, ofs, size);
> +
> kref_get(&coll->kref);
> - bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
> - op_size, GFP_ATOMIC);
> - if (bio)
> +
> + /* Pass a cloned bio chain via an osd request */
> +
> + bio_chain = bio_chain_clone_range(&bio,
> + &bio_offset, chain_size,
> + GFP_ATOMIC);
> + if (bio_chain)
> (void) rbd_do_op(rq, rbd_dev, snapc,
> - ofs, op_size,
> - bio, coll, cur_seg);
> + ofs, chain_size,
> + bio_chain, coll, cur_seg);
> else
> rbd_coll_end_req_index(rq, coll, cur_seg,
> - -ENOMEM, op_size);
> - size -= op_size;
> - ofs += op_size;
> + -ENOMEM, chain_size);
> + size -= chain_size;
> + ofs += chain_size;
>
> cur_seg++;
> - rq_bio = next_bio;
> } while (size > 0);
> kref_put(&coll->kref, rbd_coll_release);
>
> - if (bp)
> - bio_pair_release(bp);
> spin_lock_irq(q->queue_lock);
>
> ceph_put_snap_context(snapc);
> @@ -1564,7 +1637,7 @@ static void rbd_rq_fn(struct request_queue *q)
> /*
> * a queue callback. Makes sure that we don't create a bio that spans
> across
> * multiple osd objects. One exception would be with a single page bios,
> - * which we handle later at bio_chain_clone
> + * which we handle later at bio_chain_clone_range()
> */
> static int rbd_merge_bvec(struct request_queue *q, struct
> bvec_merge_data *bmd,
> struct bio_vec *bvec)
>
next prev parent reply other threads:[~2012-10-29 20:30 UTC|newest]
Thread overview: 41+ messages / expand[flat|nested] mbox.gz Atom feed top
2012-10-26 22:42 Another pile of patches Alex Elder
2012-10-26 22:44 ` [PATCH, resend] rbd: simplify rbd_rq_fn() Alex Elder
2012-10-29 20:29 ` Josh Durgin [this message]
2012-10-30 12:01 ` Alex Elder
2012-10-26 22:45 ` [PATCH] rbd: remove snapshots on error in rbd_add() Alex Elder
2012-10-29 20:36 ` Josh Durgin
2012-10-26 22:45 ` [PATCH] rbd: make pool_id a 64 bit value Alex Elder
2012-10-29 20:40 ` Josh Durgin
2012-10-26 22:48 ` [PATCH 0/2] rbd: mapping structure changes Alex Elder
2012-10-26 22:51 ` [PATCH 1/2] rbd: move snap info out of rbd_mapping struct Alex Elder
2012-10-29 20:43 ` Josh Durgin
2012-10-26 22:51 ` [PATCH 2/2] rbd: rename snap_exists field Alex Elder
2012-10-29 20:46 ` Josh Durgin
2012-10-26 22:52 ` [PATCH 0/2] rbd: consolidate argument parsing Alex Elder
2012-10-26 22:55 ` [PATCH 1/2] rbd: move ceph_parse_options() call up Alex Elder
2012-10-29 21:08 ` Josh Durgin
2012-10-26 22:55 ` [PATCH 2/2] rbd: do all argument parsing in one place Alex Elder
2012-10-29 21:13 ` Josh Durgin
2012-10-26 22:56 ` [PATCH 0/8] rbd: have rbd_add_parse_args() only parse args Alex Elder
2012-10-26 23:00 ` [PATCH 1/8] rbd: get rid of snap_name_len Alex Elder
2012-10-29 21:14 ` Josh Durgin
2012-10-26 23:00 ` [PATCH 2/8] rbd: remove options args from rbd_add_parse_args() Alex Elder
2012-10-29 21:17 ` Josh Durgin
2012-10-26 23:01 ` [PATCH 3/8] rbd: remove snap_name arg " Alex Elder
2012-10-29 21:26 ` Josh Durgin
2012-10-26 23:02 ` [PATCH 4/8] rbd: pass and populate rbd_options structure Alex Elder
2012-10-29 21:27 ` Josh Durgin
2012-10-26 23:02 ` [PATCH 5/8] rbd: have rbd_add_parse_args() return error Alex Elder
2012-10-29 21:28 ` Josh Durgin
2012-10-26 23:03 ` [PATCH 6/8] rbd: define image specification structure Alex Elder
2012-10-29 22:13 ` Josh Durgin
2012-10-30 12:40 ` Alex Elder
2012-10-30 20:13 ` Josh Durgin
2012-10-26 23:03 ` [PATCH 7/8] rbd: add reference counting to rbd_spec Alex Elder
2012-10-29 22:20 ` Josh Durgin
2012-10-30 12:59 ` Alex Elder
2012-10-30 20:17 ` Josh Durgin
2012-10-26 23:03 ` [PATCH 8/8] rbd: fill rbd_spec in rbd_add_parse_args() Alex Elder
2012-10-29 22:30 ` Josh Durgin
2012-10-30 13:09 ` Alex Elder
2012-10-30 20:18 ` Josh Durgin
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=508EE73E.4090101@inktank.com \
--to=josh.durgin@inktank.com \
--cc=ceph-devel@vger.kernel.org \
--cc=elder@inktank.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox