From mboxrd@z Thu Jan 1 00:00:00 1970 From: Josh Durgin Subject: Re: [PATCH, resend] rbd: simplify rbd_rq_fn() Date: Mon, 29 Oct 2012 13:29:50 -0700 Message-ID: <508EE73E.4090101@inktank.com> References: <508B11E3.3040108@inktank.com> <508B1242.1090808@inktank.com> Mime-Version: 1.0 Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit Return-path: Received: from mail-pa0-f46.google.com ([209.85.220.46]:41735 "EHLO mail-pa0-f46.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1751753Ab2J2UaA (ORCPT ); Mon, 29 Oct 2012 16:30:00 -0400 Received: by mail-pa0-f46.google.com with SMTP id hz1so3687589pad.19 for ; Mon, 29 Oct 2012 13:30:00 -0700 (PDT) In-Reply-To: <508B1242.1090808@inktank.com> Sender: ceph-devel-owner@vger.kernel.org List-ID: To: Alex Elder Cc: ceph-devel@vger.kernel.org This is much easier to read now. It might be useful to add messages for the different failure cases in bio_chain_clone_range later. Reviewed-by: Josh Durgin On 10/26/2012 03:44 PM, Alex Elder wrote: > When processing a request, rbd_rq_fn() makes clones of the bio's in > the request's bio chain and submits the results to osd's to be > satisfied. If a request bio straddles the boundary between objects > backing the rbd image, it must be represented by two cloned bio's, > one for the first part (at the end of one object) and one for the > second (at the beginning of the next object). > > This has been handled by a function bio_chain_clone(), which > includes an interface only a mother could love, and which has > been found to have other problems. > > This patch defines two new fairly generic bio functions (one which > replaces bio_chain_clone()) to help out the situation, and then > revises rbd_rq_fn() to make use of them. > > First, bio_clone_range() clones a portion of a single bio, starting > at a given offset within the bio and including only as many bytes > as requested. 
As a convenience, a request to clone the entire bio > is passed directly to bio_clone(). > > Second, bio_chain_clone_range() performs a similar function, > producing a chain of cloned bio's covering a sub-range of the > source chain. No bio_pair structures are used, and if successful > the result will represent exactly the specified range. > > Using bio_chain_clone_range() makes rbd_rq_fn() a little easier > to understand, because it avoids the need to pass very much > state information between consecutive calls. By avoiding the need > to track a bio_pair structure, it also eliminates the problem > described here: http://tracker.newdream.net/issues/2933 > > Note that a block request (and therefore the complete length of > a bio chain processed in rbd_rq_fn()) is an unsigned int, while > the result of rbd_segment_length() is u64. This change makes > this range truncation explicit, and trips a bug if the > segment boundary is too far off. > > Signed-off-by: Alex Elder > --- > drivers/block/rbd.c | 231 > +++++++++++++++++++++++++++++++++------------------ > 1 file changed, 152 insertions(+), 79 deletions(-) > > diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c > index c800047..cc06c55 100644 > --- a/drivers/block/rbd.c > +++ b/drivers/block/rbd.c > @@ -826,77 +826,144 @@ static void zero_bio_chain(struct bio *chain, int > start_ofs) > } > > /* > - * bio_chain_clone - clone a chain of bios up to a certain length. > - * might return a bio_pair that will need to be released. > + * Clone a portion of a bio, starting at the given byte offset > + * and continuing for the number of bytes indicated. 
> */ > -static struct bio *bio_chain_clone(struct bio **old, struct bio **next, > - struct bio_pair **bp, > - int len, gfp_t gfpmask) > -{ > - struct bio *old_chain = *old; > - struct bio *new_chain = NULL; > - struct bio *tail; > - int total = 0; > - > - if (*bp) { > - bio_pair_release(*bp); > - *bp = NULL; > - } > +static struct bio *bio_clone_range(struct bio *bio_src, > + unsigned int offset, > + unsigned int len, > + gfp_t gfpmask) > +{ > + struct bio_vec *bv; > + unsigned int resid; > + unsigned short idx; > + unsigned int voff; > + unsigned short end_idx; > + unsigned short vcnt; > + struct bio *bio; > > - while (old_chain && (total < len)) { > - struct bio *tmp; > + /* Handle the easy case for the caller */ > > - tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); > - if (!tmp) > - goto err_out; > - gfpmask &= ~__GFP_WAIT; /* can't wait after the first */ > + if (!offset && len == bio_src->bi_size) > + return bio_clone(bio_src, gfpmask); > > - if (total + old_chain->bi_size > len) { > - struct bio_pair *bp; > + if (WARN_ON_ONCE(!len)) > + return NULL; > + if (WARN_ON_ONCE(len > bio_src->bi_size)) > + return NULL; > + if (WARN_ON_ONCE(offset > bio_src->bi_size - len)) > + return NULL; > > - /* > - * this split can only happen with a single paged bio, > - * split_bio will BUG_ON if this is not the case > - */ > - dout("bio_chain_clone split! total=%d remaining=%d" > - "bi_size=%u\n", > - total, len - total, old_chain->bi_size); > + /* Find first affected segment... */ > > - /* split the bio. 
We'll release it either in the next > - call, or it will have to be released outside */ > - bp = bio_split(old_chain, (len - total) / SECTOR_SIZE); > - if (!bp) > - goto err_out; > + resid = offset; > + __bio_for_each_segment(bv, bio_src, idx, 0) { > + if (resid < bv->bv_len) > + break; > + resid -= bv->bv_len; > + } > + voff = resid; > > - __bio_clone(tmp, &bp->bio1); > + /* ...and the last affected segment */ > > - *next = &bp->bio2; > - } else { > - __bio_clone(tmp, old_chain); > - *next = old_chain->bi_next; > - } > + resid += len; > + __bio_for_each_segment(bv, bio_src, end_idx, idx) { > + if (resid <= bv->bv_len) > + break; > + resid -= bv->bv_len; > + } > + vcnt = end_idx - idx + 1; > + > + /* Build the clone */ > + > + bio = bio_alloc(gfpmask, (unsigned int) vcnt); > + if (!bio) > + return NULL; /* ENOMEM */ > > - tmp->bi_bdev = NULL; > - tmp->bi_next = NULL; > - if (new_chain) > - tail->bi_next = tmp; > - else > - new_chain = tmp; > - tail = tmp; > - old_chain = old_chain->bi_next; > + bio->bi_bdev = bio_src->bi_bdev; > + bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT); > + bio->bi_rw = bio_src->bi_rw; > + bio->bi_flags |= 1 << BIO_CLONED; > > - total += tmp->bi_size; > + /* > + * Copy over our part of the bio_vec, then update the first > + * and last (or only) entries. > + */ > + memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx], > + vcnt * sizeof (struct bio_vec)); > + bio->bi_io_vec[0].bv_offset += voff; > + if (vcnt > 1) { > + bio->bi_io_vec[0].bv_len -= voff; > + bio->bi_io_vec[vcnt - 1].bv_len = resid; > + } else { > + bio->bi_io_vec[0].bv_len = len; > } > > - rbd_assert(total == len); > + bio->bi_vcnt = vcnt; > + bio->bi_size = len; > + bio->bi_idx = 0; > + > + return bio; > +} > + > +/* > + * Clone a portion of a bio chain, starting at the given byte offset > + * into the first bio in the source chain and continuing for the > + * number of bytes indicated. 
The result is another bio chain of > + * exactly the given length, or a null pointer on error. > + * > + * The bio_src and offset parameters are both in-out. On entry they > + * refer to the first source bio and the offset into that bio where > + * the start of data to be cloned is located. > + * > + * On return, bio_src is updated to refer to the bio in the source > + * chain that contains first un-cloned byte, and *offset will > + * contain the offset of that byte within that bio. > + */ > +static struct bio *bio_chain_clone_range(struct bio **bio_src, > + unsigned int *offset, > + unsigned int len, > + gfp_t gfpmask) > +{ > + struct bio *bi = *bio_src; > + unsigned int off = *offset; > + struct bio *chain = NULL; > + struct bio **end; > + > + /* Build up a chain of clone bios up to the limit */ > + > + if (!bi || off >= bi->bi_size || !len) > + return NULL; /* Nothing to clone */ > > - *old = old_chain; > + end = &chain; > + while (len) { > + unsigned int bi_size; > + struct bio *bio; > + > + if (!bi) > + goto out_err; /* EINVAL; ran out of bio's */ > + bi_size = min_t(unsigned int, bi->bi_size - off, len); > + bio = bio_clone_range(bi, off, bi_size, gfpmask); > + if (!bio) > + goto out_err; /* ENOMEM */ > + > + *end = bio; > + end = &bio->bi_next; > + > + off += bi_size; > + if (off == bi->bi_size) { > + bi = bi->bi_next; > + off = 0; > + } > + len -= bi_size; > + } > + *bio_src = bi; > + *offset = off; > > - return new_chain; > + return chain; > +out_err: > + bio_chain_put(chain); > > -err_out: > - dout("bio_chain_clone with err\n"); > - bio_chain_put(new_chain); > return NULL; > } > > @@ -1014,8 +1081,9 @@ static int rbd_do_request(struct request *rq, > req_data->coll_index = coll_index; > } > > - dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name, > - (unsigned long long) ofs, (unsigned long long) len); > + dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n", > + object_name, (unsigned long long) ofs, > + (unsigned long 
long) len, coll, coll_index); > > osdc = &rbd_dev->rbd_client->client->osdc; > req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, > @@ -1463,18 +1531,16 @@ static void rbd_rq_fn(struct request_queue *q) > { > struct rbd_device *rbd_dev = q->queuedata; > struct request *rq; > - struct bio_pair *bp = NULL; > > while ((rq = blk_fetch_request(q))) { > struct bio *bio; > - struct bio *rq_bio, *next_bio = NULL; > bool do_write; > unsigned int size; > - u64 op_size = 0; > u64 ofs; > int num_segs, cur_seg = 0; > struct rbd_req_coll *coll; > struct ceph_snap_context *snapc; > + unsigned int bio_offset; > > dout("fetched request\n"); > > @@ -1486,10 +1552,6 @@ static void rbd_rq_fn(struct request_queue *q) > > /* deduce our operation (read, write) */ > do_write = (rq_data_dir(rq) == WRITE); > - > - size = blk_rq_bytes(rq); > - ofs = blk_rq_pos(rq) * SECTOR_SIZE; > - rq_bio = rq->bio; > if (do_write && rbd_dev->mapping.read_only) { > __blk_end_request_all(rq, -EROFS); > continue; > @@ -1512,6 +1574,10 @@ static void rbd_rq_fn(struct request_queue *q) > > up_read(&rbd_dev->header_rwsem); > > + size = blk_rq_bytes(rq); > + ofs = blk_rq_pos(rq) * SECTOR_SIZE; > + bio = rq->bio; > + > dout("%s 0x%x bytes at 0x%llx\n", > do_write ? 
"write" : "read", > size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); > @@ -1531,30 +1597,37 @@ static void rbd_rq_fn(struct request_queue *q) > continue; > } > > + bio_offset = 0; > do { > - /* a bio clone to be passed down to OSD req */ > + u64 limit = rbd_segment_length(rbd_dev, ofs, size); > + unsigned int chain_size; > + struct bio *bio_chain; > + > + BUG_ON(limit > (u64) UINT_MAX); > + chain_size = (unsigned int) limit; > dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); > - op_size = rbd_segment_length(rbd_dev, ofs, size); > + > kref_get(&coll->kref); > - bio = bio_chain_clone(&rq_bio, &next_bio, &bp, > - op_size, GFP_ATOMIC); > - if (bio) > + > + /* Pass a cloned bio chain via an osd request */ > + > + bio_chain = bio_chain_clone_range(&bio, > + &bio_offset, chain_size, > + GFP_ATOMIC); > + if (bio_chain) > (void) rbd_do_op(rq, rbd_dev, snapc, > - ofs, op_size, > - bio, coll, cur_seg); > + ofs, chain_size, > + bio_chain, coll, cur_seg); > else > rbd_coll_end_req_index(rq, coll, cur_seg, > - -ENOMEM, op_size); > - size -= op_size; > - ofs += op_size; > + -ENOMEM, chain_size); > + size -= chain_size; > + ofs += chain_size; > > cur_seg++; > - rq_bio = next_bio; > } while (size > 0); > kref_put(&coll->kref, rbd_coll_release); > > - if (bp) > - bio_pair_release(bp); > spin_lock_irq(q->queue_lock); > > ceph_put_snap_context(snapc); > @@ -1564,7 +1637,7 @@ static void rbd_rq_fn(struct request_queue *q) > /* > * a queue callback. Makes sure that we don't create a bio that spans > across > * multiple osd objects. One exception would be with a single page bios, > - * which we handle later at bio_chain_clone > + * which we handle later at bio_chain_clone_range() > */ > static int rbd_merge_bvec(struct request_queue *q, struct > bvec_merge_data *bmd, > struct bio_vec *bvec) >