From: "Paul E. McKenney" <paulmck@linux.vnet.ibm.com>
To: Nadia.Derbey@bull.net
Cc: efault@gmx.de, manfred@colorfullife.com,
linux-kernel@vger.kernel.org, akpm@linux-foundation.org,
peterz@infradead.org, xemul@openvz.org
Subject: Re: [PATCH 00/13] Re: Scalability requirements for sysv ipc
Date: Sat, 19 Apr 2008 16:25:39 -0700
Message-ID: <20080419232539.GE20138@linux.vnet.ibm.com>
In-Reply-To: <20080411161702.460410000@bull.net>
On Fri, Apr 11, 2008 at 06:17:02PM +0200, Nadia.Derbey@bull.net wrote:
>
>
> Here is finally the ipc ridr-based implementation I was talking about last
> week (see http://lkml.org/lkml/2008/4/4/208).
> I couldn't avoid much of the code duplication, but at least made things
> incremental.
>
> Does somebody know a test suite that exists for the idr API, that I could
> run on this new api?
>
> Mike, can you try to run it on your victim: I had such a hard time building
> this patch, that I couldn't re-run the test on my 8-core with this new
> version. So the last results I have are for 2.6.25-rc3-mm1.
>
> Also, I think a careful review should be done to avoid introducing yet other
> problems :-(
>
> *WARNING*: this patch contains a fix for idr.c
> I know, I'm doing things bad, but I only saw the problem this
> afternoon.
>
> It should be applied on linux-2.6.25-rc8-mm1, in the following order:
>
> [ PATCH 01/13 ] : copy_idr_code.patch
> [ PATCH 02/13 ] : change_ridr_struct.patch
> [ PATCH 03/13 ] : ridr_pre_get.patch
> [ PATCH 04/13 ] : ridr_alloc_layer.patch
> [ PATCH 05/13 ] : ridr_free_layer.patch
> [ PATCH 06/13 ] : ridr_sub_alloc.patch
> [ PATCH 07/13 ] : ridr_get_empty_slot.patch
> [ PATCH 08/13 ] : ridr_get_new.patch
> [ PATCH 09/13 ] : ridr_remove.patch
> [ PATCH 10/13 ] : ridr_find.patch
> [ PATCH 11/13 ] : ridr_integrate.patch
> [ PATCH 12/13 ] : ipc_use_ridr.patch
> [ PATCH 13/13 ] : remove_ipc_lock_down.patch
And here are some more comments, this time on the resulting ridr.c. Note
that we might in fact want to keep the rcu_assign_pointer() calls that I
complain about -- see Johannes Berg's posting about making sparse smarter
about RCU. But I include the complaints for completeness.
Thanx, Paul
> /*
> * RCU-based idr API
> */
>
> #ifndef TEST /* to test in user space... */
> #include <linux/slab.h>
> #include <linux/init.h>
> #include <linux/module.h>
> #endif
> #include <linux/err.h>
> #include <linux/string.h>
> #include <linux/ridr.h>
>
> static struct kmem_cache *ridr_layer_cache;
>
> /*
> * Per-cpu pool of preloaded layers
> */
> struct ridr_preget {
> int nr;
> struct ridr_layer *layers[MAX_LEVEL];
> };
> DEFINE_PER_CPU(struct ridr_preget, ridr_pregets) = { 0, };
>
> static inline gfp_t ridr_gfp_mask(struct ridr *idp)
> {
> return idp->gfp_mask & __GFP_BITS_MASK;
> }
>
> static struct ridr_layer *alloc_layer(struct ridr *idp)
> {
> struct ridr_layer *ret = NULL;
> gfp_t gfp_mask = ridr_gfp_mask(idp);
>
> if (!(gfp_mask & __GFP_WAIT)) {
> struct ridr_preget *ridp;
>
> /*
> * Provided the caller has preloaded here, we will always
> * succeed in getting a node here (and never reach
> * kmem_cache_alloc)
> */
> ridp = &__get_cpu_var(ridr_pregets);
> if (ridp->nr) {
> ret = ridp->layers[ridp->nr - 1];
> ridp->layers[ridp->nr - 1] = NULL;
> ridp->nr--;
> }
Might be good to have a BUG_ON(!ret) or some such here?
> }
> if (ret == NULL)
> ret = kmem_cache_alloc(ridr_layer_cache, gfp_mask);
Either add the check above, or kmem_cache_alloc() will get upset about
being called with preemption disabled. But only sometimes, IIRC.
> return ret;
> }
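A minimal user-space sketch of the check suggested above, assuming the
preload pool is a simple per-CPU stack. The names pool_pop() and
alloc_layer_nowait() are made up for illustration, POOL_MAX stands in for
MAX_LEVEL, and assert() stands in for BUG_ON(). None of this is from the
patch.

```c
#include <assert.h>
#include <stddef.h>

#define POOL_MAX 4	/* hypothetical stand-in for MAX_LEVEL */

struct layer {
	int dummy;
};

/* Models one CPU's ridr_preget pool. */
struct preget_pool {
	int nr;
	struct layer *layers[POOL_MAX];
};

/* Pop one preloaded layer, or return NULL if the pool is empty. */
static struct layer *pool_pop(struct preget_pool *pool)
{
	struct layer *ret = NULL;

	if (pool->nr > 0) {
		ret = pool->layers[--pool->nr];
		pool->layers[pool->nr] = NULL;
	}
	return ret;
}

/*
 * Non-sleeping path: the caller is expected to have preloaded, so an
 * empty pool is a caller bug rather than a reason to fall through to
 * the slab allocator.
 */
static struct layer *alloc_layer_nowait(struct preget_pool *pool)
{
	struct layer *ret = pool_pop(pool);

	assert(ret != NULL);	/* user-space stand-in for BUG_ON(!ret) */
	return ret;
}
```

With a check like this, a missing preload would show up at the point of
the mistake instead of depending on what the allocator happens to do.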
>
> static void ridr_layer_rcu_free(struct rcu_head *head)
> {
> struct ridr_layer *layer;
>
> layer = container_of(head, struct ridr_layer, rcu_head);
> kmem_cache_free(ridr_layer_cache, layer);
> }
>
> static inline void free_layer(struct ridr_layer *p)
> {
> call_rcu(&p->rcu_head, ridr_layer_rcu_free);
> }
>
> static void ridr_mark_full(struct ridr_layer **pa, int id)
> {
> struct ridr_layer *p = pa[0];
> int l = 0;
>
> __set_bit(id & IDR_MASK, &p->bitmap);
> /*
> * If this layer is full mark the bit in the layer above to
> * show that this part of the radix tree is full. This may
> * complete the layer above and require walking up the radix
> * tree.
> */
> while (p->bitmap == IDR_FULL) {
> p = pa[++l];
> if (!p)
> break;
> id = id >> IDR_BITS;
> __set_bit((id & IDR_MASK), &p->bitmap);
> }
> }
>
> /**
> * ridr_pre_get - reserve resources for ridr allocation
> * @idp: ridr handle
> * @gfp_mask: memory allocation flags
> *
> * Load up this CPU's ridr_layer buffer with sufficient objects to
> * ensure that the addition of a single element in the tree cannot fail.
> *
> * If the system is REALLY out of memory this function returns 0, with
> * preemption enabled.
> * Otherwise 1, with preemption disabled.
> */
> int ridr_pre_get(gfp_t gfp_mask)
> {
> struct ridr_preget *idp;
> struct ridr_layer *layer;
> int ret = 0;
>
> preempt_disable();
> idp = &__get_cpu_var(ridr_pregets);
> while (idp->nr < ARRAY_SIZE(idp->layers)) {
> preempt_enable();
> layer = kmem_cache_alloc(ridr_layer_cache, gfp_mask);
> if (layer == NULL)
> goto out;
Here we potentially spatter free elements across the CPUs, which seems
a bit strange -- unless I am missing something, there has to be a single
lock guarding all updates of a given ridr structure, right?
> preempt_disable();
> idp = &__get_cpu_var(ridr_pregets);
> if (idp->nr < ARRAY_SIZE(idp->layers))
> idp->layers[idp->nr++] = layer;
> else
> kmem_cache_free(ridr_layer_cache, layer);
> }
> ret = 1;
> out:
> return ret;
> }
> EXPORT_SYMBOL(ridr_pre_get);
>
> static int sub_alloc(struct ridr *idp, int *starting_id,
> struct ridr_layer **pa)
> {
> int n, m, sh;
> struct ridr_layer *p, *new;
> int l, id, oid;
> unsigned long bm;
>
> id = *starting_id;
> restart:
> rcu_assign_pointer(p, idp->top);
We don't need the above rcu_assign_pointer(), because "p" is a local
variable. (And we don't publish p's address somewhere that other CPUs
can find it, correct?)
> l = idp->layers;
> rcu_assign_pointer(pa[l--], NULL);
This is a static function, and its caller is also static. So the pa[]
array is always a local variable a ways up the stack, and this need not
be rcu_assign_pointer().
> while (1) {
> /*
> * We run around this while until we reach the leaf node...
> */
> n = (id >> (IDR_BITS*l)) & IDR_MASK;
> bm = ~p->bitmap;
> m = find_next_bit(&bm, IDR_SIZE, n);
> if (m == IDR_SIZE) {
> /* no space available go back to previous layer. */
> l++;
> oid = id;
> id = (id | ((1 << (IDR_BITS * l)) - 1)) + 1;
>
> /* if already at the top layer, we need to grow */
> rcu_assign_pointer(p, pa[l]);
Another unneeded rcu_assign_pointer(), "p" is local.
> if (!p) {
> *starting_id = id;
> return -2;
> }
>
> /* If we need to go up one layer, continue the
> * loop; otherwise, restart from the top.
> */
> sh = IDR_BITS * (l + 1);
> if (oid >> sh == id >> sh)
> continue;
> else
> goto restart;
> }
> if (m != n) {
> sh = IDR_BITS*l;
> id = ((id >> sh) ^ n ^ m) << sh;
> }
> if ((id >= MAX_ID_BIT) || (id < 0))
> return -3;
> if (l == 0)
> break;
> /*
> * Create the layer below if it is missing.
> */
> if (!p->ary[m]) {
> new = alloc_layer(idp);
> if (!new)
> return -1;
> rcu_assign_pointer(p->ary[m], new);
Not yet published, so rcu_assign_pointer() not needed.
> p->count++;
> }
> rcu_assign_pointer(pa[l--], p);
Assignment to local variable (up the stack), so no need for
rcu_assign_pointer().
> p = p->ary[m];
> }
>
> rcu_assign_pointer(pa[l], p);
> return id;
Assignment to local variable (up the stack), so no need for
rcu_assign_pointer().
> }
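To illustrate the distinction I keep drawing between local variables and
published pointers, here is a rough user-space model using C11 atomics.
It is only a sketch: publish() and subscribe() are hypothetical stand-ins
for rcu_assign_pointer() and rcu_dereference(), struct node is not the
ridr_layer, and the acquire load is stronger than what the kernel
primitive actually requires.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct node {
	int value;
	struct node *_Atomic child;	/* slot that readers may load concurrently */
};

/* Model of rcu_assign_pointer(): a release store into a shared slot. */
static void publish(struct node *_Atomic *slot, struct node *n)
{
	atomic_store_explicit(slot, n, memory_order_release);
}

/* Model of rcu_dereference(): an acquire load from a shared slot. */
static struct node *subscribe(struct node *_Atomic *slot)
{
	return atomic_load_explicit(slot, memory_order_acquire);
}

static void attach(struct node *parent, struct node *fresh, int value)
{
	struct node *p;

	p = fresh;		/* local scratch pointer: plain assignment */
	p->value = value;	/* initialize before anyone can see it */
	atomic_init(&p->child, (struct node *)NULL);
	publish(&parent->child, p);	/* the one store that needs ordering */
}
```

Only the final store makes the new node reachable by readers, so that is
the only one that needs the ordering guarantee. The stores through "p"
before it touch memory that no reader can reach yet.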
>
> static int ridr_get_empty_slot(struct ridr *idp, int starting_id,
> struct ridr_layer **pa)
> {
> struct ridr_layer *p, *new;
> int layers, v, id;
>
> id = starting_id;
> build_up:
> p = idp->top;
> layers = idp->layers;
> if (unlikely(!p)) {
> p = alloc_layer(idp);
> if (!p)
> return -1;
> layers = 1;
> }
> /*
> * Add a new layer to the top of the tree if the requested
> * id is larger than the currently allocated space.
> */
> while ((layers < (MAX_LEVEL - 1)) && (id >= (1 << (layers*IDR_BITS)))) {
> layers++;
> if (!p->count)
> continue;
> new = alloc_layer(idp);
> if (!new) {
> /*
> * The allocation failed. If we built part of
> * the structure tear it down.
> */
> for (new = p; p && p != idp->top; new = p) {
> p = p->ary[0];
> new->ary[0] = NULL;
> new->bitmap = new->count = 0;
> free_layer(new);
> }
> return -1;
> }
> rcu_assign_pointer(new->ary[0], p);
The above rcu_assign_pointer() is not needed because we haven't yet
made "new" accessible to other CPUs.
> new->count = 1;
> if (p->bitmap == IDR_FULL)
> __set_bit(0, &new->bitmap);
> rcu_assign_pointer(p, new);
The above need not be rcu_assign_pointer() because "p" is a local variable
that is (I hope!) not being accessed by other CPUs.
> }
> rcu_assign_pointer(idp->top, p);
Interesting... We assign to idp->top whether it changed or not.
Not a problem -- the alternative would make backing out on OOM
quite painful.
> idp->layers = layers;
> v = sub_alloc(idp, &id, pa);
> if (v == -2)
> goto build_up;
> return(v);
> }
>
> static int ridr_get_new_above_int(struct ridr *idp, void *ptr, int starting_id)
> {
> struct ridr_layer *pa[MAX_LEVEL];
> int id;
>
> id = ridr_get_empty_slot(idp, starting_id, pa);
> if (id >= 0) {
> /*
> * Successfully found an empty slot. Install the user
> * pointer and mark the slot full.
> */
> rcu_assign_pointer(pa[0]->ary[id & IDR_MASK],
> (struct ridr_layer *)ptr);
Here we are assigning to the live tree, so we -do- need rcu_assign_pointer().
> pa[0]->count++;
> ridr_mark_full(pa, id);
Is ridr_mark_full() really safe in face of concurrent readers? Seems like
it should be, since it is doing a bunch of __set_bit() calls.
> }
>
> return id;
> }
>
> /**
> * ridr_get_new - allocate new ridr entry
> * @idp: ridr handle
> * @ptr: pointer you want associated with the id
> * @id: pointer to the allocated handle
> *
> * This is the allocate id function. It should be called with any
> * required locks.
> *
> * If memory is required, it will return -EAGAIN, you should unlock, enable
> * preemption and go back to the ridr_pre_get() call.
> * If the ridr is full, it will return -ENOSPC.
> *
> * @id returns a value in the range 0 ... 0x7fffffff
> */
> int ridr_get_new(struct ridr *idp, void *ptr, int *id)
> {
> int rv;
>
> rv = ridr_get_new_above_int(idp, ptr, 0);
> /*
> * This is a cheap hack until the IDR code can be fixed to
> * return proper error values.
> */
> if (rv < 0) {
> if (rv == -1)
> return -EAGAIN;
> else /* Will be -3 */
> return -ENOSPC;
> }
> *id = rv;
> return 0;
> }
> EXPORT_SYMBOL(ridr_get_new);
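The calling protocol described in the kerneldoc above (preload, allocate
under the lock, and on -EAGAIN unlock, enable preemption and preload
again) might look roughly like the following. This is a user-space model
under stated assumptions, not the patch's code: model_pre_get() and
model_get_new() are hypothetical stand-ins for ridr_pre_get() and
ridr_get_new(), fail_next injects an artificial -EAGAIN, the lock is
shown only as comments, and I am assuming the caller also re-enables
preemption on success.

```c
#include <assert.h>
#include <errno.h>

static int preempt_depth;	/* models the preempt count */
static int fail_next;		/* number of -EAGAIN failures to inject */
static int next_id;		/* next id the model hands out */
static int attempts;		/* how many times the loop body ran */

static void preempt_disable(void)
{
	preempt_depth++;
}

static void preempt_enable(void)
{
	assert(preempt_depth > 0);
	preempt_depth--;
}

/* Stand-in for ridr_pre_get(): returns 1 with preemption disabled. */
static int model_pre_get(void)
{
	preempt_disable();
	return 1;
}

/* Stand-in for ridr_get_new(): 0 on success, -EAGAIN if out of layers. */
static int model_get_new(int *id)
{
	if (fail_next > 0) {
		fail_next--;
		return -EAGAIN;
	}
	*id = next_id++;
	return 0;
}

static int alloc_id(int *id)
{
	int err;

	do {
		if (!model_pre_get())
			return -ENOMEM;	/* preemption still enabled here */
		/* take the lock guarding the ridr here */
		err = model_get_new(id);
		/* drop the lock here */
		preempt_enable();
		attempts++;
	} while (err == -EAGAIN);

	return err;
}
```

The point of the model is the pairing: every successful preload leaves
preemption disabled, so every trip around the loop must re-enable it
exactly once, whether the allocation succeeded or not.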
>
> static void sub_remove(struct ridr *idp, int shift, int id)
> {
> struct ridr_layer *p = idp->top;
> struct ridr_layer **pa[MAX_LEVEL];
> struct ridr_layer ***paa = &pa[0];
> struct ridr_layer *to_free;
> int n;
>
> *paa = NULL;
> *++paa = &idp->top;
>
> while ((shift > 0) && p) {
> n = (id >> shift) & IDR_MASK;
> __clear_bit(n, &p->bitmap);
> *++paa = &p->ary[n];
> p = p->ary[n];
> shift -= IDR_BITS;
> }
> n = id & IDR_MASK;
> if (likely(p != NULL && test_bit(n, &p->bitmap))) {
> __clear_bit(n, &p->bitmap);
> p->ary[n] = NULL;
> to_free = NULL;
> while (*paa && !--((**paa)->count)) {
> if (to_free)
> free_layer(to_free);
> to_free = **paa;
> **paa-- = NULL;
> }
> if (!*paa)
> idp->layers = 0;
> if (to_free)
> free_layer(to_free);
> } else
> idr_remove_warning("ridr_remove", id);
> }
>
> /**
> * ridr_remove - remove the given id and free its slot
> * @idp: ridr handle
> * @id: unique key
> */
> void ridr_remove(struct ridr *idp, int id)
> {
> struct ridr_layer *p, *to_free;
>
> /* Mask off upper bits we don't use for the search. */
> id &= MAX_ID_MASK;
>
> sub_remove(idp, (idp->layers - 1) * IDR_BITS, id);
> if (idp->top && idp->top->count == 1 && (idp->layers > 1) &&
> idp->top->ary[0]) { /* We can drop a layer */
Why do we drop layers both in sub_remove() and here?
Hmmm... For the same reason we do in idr_remove(), I guess. Whatever
reason that might be. ;-)
> to_free = idp->top;
> p = idp->top->ary[0];
> idp->top = p;
> --idp->layers;
> to_free->bitmap = to_free->count = 0;
> free_layer(to_free);
> }
> return;
> }
> EXPORT_SYMBOL(ridr_remove);
>
> /**
> * ridr_find - return pointer for given id
> * @idp: ridr handle
> * @id: lookup key
> *
> * Return the pointer given the id it has been registered with. A %NULL
> * return indicates that @id is not valid or you passed %NULL in
> * ridr_get_new().
> *
> * The caller must serialize ridr_find() vs ridr_get_new() and ridr_remove().
> */
> void *ridr_find(struct ridr *idp, int id)
> {
> int n;
> struct ridr_layer *p;
>
> n = idp->layers * IDR_BITS;
> p = rcu_dereference(idp->top);
>
> /* Mask off upper bits we don't use for the search. */
> id &= MAX_ID_MASK;
>
> if (id >= (1 << n))
> return NULL;
>
> while (n > 0 && p) {
> n -= IDR_BITS;
> p = rcu_dereference(p->ary[(id >> n) & IDR_MASK]);
> }
> return((void *)p);
> }
> EXPORT_SYMBOL(ridr_find);
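To make the index arithmetic in ridr_find() and the tree-growing loop
concrete, here is a small stand-alone sketch. It assumes 5 bits per
level and at most 6 layers. Those values, and the names SKETCH_BITS,
SKETCH_MAX_LAYERS, slot_index() and layers_needed(), are picked for
illustration only; the real IDR_BITS and MAX_LEVEL come from the headers.

```c
#include <assert.h>

#define SKETCH_BITS		5	/* assumed bits per level */
#define SKETCH_MASK		((1 << SKETCH_BITS) - 1)
#define SKETCH_MAX_LAYERS	6	/* assumed cap, keeps the shift in range */

/* Slot index used at a given level (0 is the leaf) when walking for id. */
static int slot_index(int id, int level)
{
	return (id >> (SKETCH_BITS * level)) & SKETCH_MASK;
}

/* Smallest number of layers whose capacity covers id, up to the cap. */
static int layers_needed(int id)
{
	int layers = 1;

	while (layers < SKETCH_MAX_LAYERS &&
	       id >= (1 << (layers * SKETCH_BITS)))
		layers++;
	return layers;
}
```

For example, id 1000 is 31 * 32 + 8, so a two-layer tree would use slot
31 in the top layer and slot 8 in the leaf.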
>
> static void ridr_cache_ctor(struct kmem_cache *ridr_layer_cache,
> void *ridr_layer)
> {
> memset(ridr_layer, 0, sizeof(struct ridr_layer));
> }
>
> void __init ridr_init_cache(void)
> {
> ridr_layer_cache = kmem_cache_create("ridr_layer_cache",
> sizeof(struct ridr_layer), 0, SLAB_PANIC,
> ridr_cache_ctor);
> }