cluster-devel.redhat.com archive mirror
 help / color / mirror / Atom feed
* [Cluster-devel] GFS2: Use list_lru for quota lru list
@ 2013-09-13 11:44 Steven Whitehouse
  2013-09-17  8:38 ` Dave Chinner
  0 siblings, 1 reply; 2+ messages in thread
From: Steven Whitehouse @ 2013-09-13 11:44 UTC (permalink / raw)
  To: cluster-devel.redhat.com


This patch is a first go (only compile tested so far) at what list_lru
support should look like for GFS2's quota subsystem. I've done this one
rather than the glock lru list first, since this is easier. There are
still some unresolved issues on the glock side with how to use the new
list_lru code.

The conversion is fairly straightforward in this case though, just
requiring an extra function to conditionally add items to the lru
list upon an atomic counter hitting zero. The other change is that
the locking is split into two: one is the internal lock in the lru code,
and the other is a new quota spinlock, which is used for all the
non-lru locking.

Let me know if you can spot any problems, and we'll move on to doing
a bit of testing before we merge this. Once that's done, I'll start
looking at the glock issues again to see if I can make some headway
there too.

Steve.


diff --git a/fs/gfs2/main.c b/fs/gfs2/main.c
index 351586e..7296592 100644
--- a/fs/gfs2/main.c
+++ b/fs/gfs2/main.c
@@ -18,6 +18,7 @@
 #include <linux/rculist_bl.h>
 #include <linux/atomic.h>
 #include <linux/mempool.h>
+#include <linux/list_lru.h>
 
 #include "gfs2.h"
 #include "incore.h"
@@ -35,6 +36,7 @@ static struct shrinker qd_shrinker = {
 	.count_objects = gfs2_qd_shrink_count,
 	.scan_objects = gfs2_qd_shrink_scan,
 	.seeks = DEFAULT_SEEKS,
+	.flags = SHRINKER_NUMA_AWARE,
 };
 
 static void gfs2_init_inode_once(void *foo)
@@ -87,6 +89,10 @@ static int __init init_gfs2_fs(void)
 	if (error)
 		return error;
 
+	error = list_lru_init(&gfs2_qd_lru);
+	if (error)
+		goto fail_lru;
+
 	error = gfs2_glock_init();
 	if (error)
 		goto fail;
@@ -179,6 +185,8 @@ fail_wq:
 fail_unregister:
 	unregister_filesystem(&gfs2_fs_type);
 fail:
+	list_lru_destroy(&gfs2_qd_lru);
+fail_lru:
 	unregister_shrinker(&qd_shrinker);
 	gfs2_glock_exit();
 
@@ -221,6 +229,7 @@ static void __exit exit_gfs2_fs(void)
 	unregister_filesystem(&gfs2meta_fs_type);
 	destroy_workqueue(gfs_recovery_wq);
 	destroy_workqueue(gfs2_control_wq);
+	list_lru_destroy(&gfs2_qd_lru);
 
 	rcu_barrier();
 
diff --git a/fs/gfs2/quota.c b/fs/gfs2/quota.c
index db44135..726fab3 100644
--- a/fs/gfs2/quota.c
+++ b/fs/gfs2/quota.c
@@ -50,6 +50,7 @@
 #include <linux/freezer.h>
 #include <linux/quota.h>
 #include <linux/dqblk_xfs.h>
+#include <linux/list_lru.h>
 
 #include "gfs2.h"
 #include "incore.h"
@@ -71,54 +72,56 @@ struct gfs2_quota_change_host {
 	struct kqid qc_id;
 };
 
-static LIST_HEAD(qd_lru_list);
-static atomic_t qd_lru_count = ATOMIC_INIT(0);
-static DEFINE_SPINLOCK(qd_lru_lock);
+struct list_lru gfs2_qd_lru;
+static DEFINE_SPINLOCK(qd_lock);
 
-unsigned long gfs2_qd_shrink_scan(struct shrinker *shrink,
-				  struct shrink_control *sc)
+static enum lru_status gfs2_qd_walk(struct list_head *item, spinlock_t *lock, void *arg)
 {
 	struct gfs2_quota_data *qd;
 	struct gfs2_sbd *sdp;
-	int nr_to_scan = sc->nr_to_scan;
-	long freed = 0;
 
-	if (!(sc->gfp_mask & __GFP_FS))
-		return SHRINK_STOP;
+	qd = list_entry(item, struct gfs2_quota_data, qd_reclaim);
+	sdp = qd->qd_gl->gl_sbd;
 
-	spin_lock(&qd_lru_lock);
-	while (nr_to_scan && !list_empty(&qd_lru_list)) {
-		qd = list_entry(qd_lru_list.next,
-				struct gfs2_quota_data, qd_reclaim);
-		sdp = qd->qd_gl->gl_sbd;
+	spin_lock(&qd_lock);
+	if (atomic_read(&qd->qd_count)) {
+		spin_unlock(&qd_lock);
+		return LRU_SKIP;
+	}
 
-		/* Free from the filesystem-specific list */
-		list_del(&qd->qd_list);
+	/* Free from the filesystem-specific list */
+	list_del(&qd->qd_list);
+	spin_unlock(&qd_lock);
+	spin_unlock(lock);
 
-		gfs2_assert_warn(sdp, !qd->qd_change);
-		gfs2_assert_warn(sdp, !qd->qd_slot_count);
-		gfs2_assert_warn(sdp, !qd->qd_bh_count);
+	gfs2_assert_warn(sdp, !qd->qd_change);
+	gfs2_assert_warn(sdp, !qd->qd_slot_count);
+	gfs2_assert_warn(sdp, !qd->qd_bh_count);
 
-		gfs2_glock_put(qd->qd_gl);
-		atomic_dec(&sdp->sd_quota_count);
+	gfs2_glock_put(qd->qd_gl);
+	atomic_dec(&sdp->sd_quota_count);
 
-		/* Delete it from the common reclaim list */
-		list_del_init(&qd->qd_reclaim);
-		atomic_dec(&qd_lru_count);
-		spin_unlock(&qd_lru_lock);
-		kmem_cache_free(gfs2_quotad_cachep, qd);
-		spin_lock(&qd_lru_lock);
-		nr_to_scan--;
-		freed++;
-	}
-	spin_unlock(&qd_lru_lock);
-	return freed;
+	/* Delete it from the common reclaim list */
+	kmem_cache_free(gfs2_quotad_cachep, qd);
+	spin_lock(lock);
+
+	return LRU_REMOVED;
+}
+
+unsigned long gfs2_qd_shrink_scan(struct shrinker *shrink,
+				  struct shrink_control *sc)
+{
+	if (!(sc->gfp_mask & __GFP_FS))
+		return SHRINK_STOP;
+
+	return list_lru_walk_node(&gfs2_qd_lru, sc->nid, gfs2_qd_walk, NULL,
+				  &sc->nr_to_scan);
 }
 
 unsigned long gfs2_qd_shrink_count(struct shrinker *shrink,
 				   struct shrink_control *sc)
 {
-	return vfs_pressure_ratio(atomic_read(&qd_lru_count));
+	return vfs_pressure_ratio(list_lru_count_node(&gfs2_qd_lru, sc->nid));
 }
 
 static u64 qd2index(struct gfs2_quota_data *qd)
@@ -177,15 +180,9 @@ static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
 
 	for (;;) {
 		found = 0;
-		spin_lock(&qd_lru_lock);
+		spin_lock(&qd_lock);
 		list_for_each_entry(qd, &sdp->sd_quota_list, qd_list) {
 			if (qid_eq(qd->qd_id, qid)) {
-				if (!atomic_read(&qd->qd_count) &&
-				    !list_empty(&qd->qd_reclaim)) {
-					/* Remove it from reclaim list */
-					list_del_init(&qd->qd_reclaim);
-					atomic_dec(&qd_lru_count);
-				}
 				atomic_inc(&qd->qd_count);
 				found = 1;
 				break;
@@ -202,9 +199,10 @@ static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
 			new_qd = NULL;
 		}
 
-		spin_unlock(&qd_lru_lock);
+		spin_unlock(&qd_lock);
 
 		if (qd) {
+			list_lru_del(&gfs2_qd_lru, &qd->qd_reclaim);
 			if (new_qd) {
 				gfs2_glock_put(new_qd->qd_gl);
 				kmem_cache_free(gfs2_quotad_cachep, new_qd);
@@ -228,12 +226,7 @@ static void qd_hold(struct gfs2_quota_data *qd)
 
 static void qd_put(struct gfs2_quota_data *qd)
 {
-	if (atomic_dec_and_lock(&qd->qd_count, &qd_lru_lock)) {
-		/* Add to the reclaim list */
-		list_add_tail(&qd->qd_reclaim, &qd_lru_list);
-		atomic_inc(&qd_lru_count);
-		spin_unlock(&qd_lru_lock);
-	}
+	list_lru_add_on_zero(&gfs2_qd_lru, &qd->qd_reclaim, &qd->qd_count);
 }
 
 static int slot_get(struct gfs2_quota_data *qd)
@@ -242,10 +235,10 @@ static int slot_get(struct gfs2_quota_data *qd)
 	unsigned int c, o = 0, b;
 	unsigned char byte = 0;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 
 	if (qd->qd_slot_count++) {
-		spin_unlock(&qd_lru_lock);
+		spin_unlock(&qd_lock);
 		return 0;
 	}
 
@@ -269,13 +262,13 @@ found:
 
 	sdp->sd_quota_bitmap[c][o] |= 1 << b;
 
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	return 0;
 
 fail:
 	qd->qd_slot_count--;
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 	return -ENOSPC;
 }
 
@@ -283,23 +276,23 @@ static void slot_hold(struct gfs2_quota_data *qd)
 {
 	struct gfs2_sbd *sdp = qd->qd_gl->gl_sbd;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	gfs2_assert(sdp, qd->qd_slot_count);
 	qd->qd_slot_count++;
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 }
 
 static void slot_put(struct gfs2_quota_data *qd)
 {
 	struct gfs2_sbd *sdp = qd->qd_gl->gl_sbd;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	gfs2_assert(sdp, qd->qd_slot_count);
 	if (!--qd->qd_slot_count) {
 		gfs2_icbit_munge(sdp, sdp->sd_quota_bitmap, qd->qd_slot, 0);
 		qd->qd_slot = -1;
 	}
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 }
 
 static int bh_get(struct gfs2_quota_data *qd)
@@ -374,7 +367,7 @@ static int qd_fish(struct gfs2_sbd *sdp, struct gfs2_quota_data **qdp)
 	if (sdp->sd_vfs->s_flags & MS_RDONLY)
 		return 0;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 
 	list_for_each_entry(qd, &sdp->sd_quota_list, qd_list) {
 		if (test_bit(QDF_LOCKED, &qd->qd_flags) ||
@@ -398,7 +391,7 @@ static int qd_fish(struct gfs2_sbd *sdp, struct gfs2_quota_data **qdp)
 	if (!found)
 		qd = NULL;
 
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	if (qd) {
 		gfs2_assert_warn(sdp, qd->qd_change_sync);
@@ -423,11 +416,11 @@ static int qd_trylock(struct gfs2_quota_data *qd)
 	if (sdp->sd_vfs->s_flags & MS_RDONLY)
 		return 0;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 
 	if (test_bit(QDF_LOCKED, &qd->qd_flags) ||
 	    !test_bit(QDF_CHANGE, &qd->qd_flags)) {
-		spin_unlock(&qd_lru_lock);
+		spin_unlock(&qd_lock);
 		return 0;
 	}
 
@@ -440,7 +433,7 @@ static int qd_trylock(struct gfs2_quota_data *qd)
 	gfs2_assert_warn(sdp, qd->qd_slot_count);
 	qd->qd_slot_count++;
 
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	gfs2_assert_warn(sdp, qd->qd_change_sync);
 	if (bh_get(qd)) {
@@ -602,9 +595,9 @@ static void do_qc(struct gfs2_quota_data *qd, s64 change)
 	x = be64_to_cpu(qc->qc_change) + change;
 	qc->qc_change = cpu_to_be64(x);
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	qd->qd_change = x;
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	if (!x) {
 		gfs2_assert_warn(sdp, test_bit(QDF_CHANGE, &qd->qd_flags));
@@ -974,9 +967,9 @@ static int need_sync(struct gfs2_quota_data *qd)
 	if (!qd->qd_qb.qb_limit)
 		return 0;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	value = qd->qd_change;
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	spin_lock(&gt->gt_spin);
 	num = gt->gt_quota_scale_num;
@@ -1067,9 +1060,9 @@ int gfs2_quota_check(struct gfs2_inode *ip, kuid_t uid, kgid_t gid)
 			continue;
 
 		value = (s64)be64_to_cpu(qd->qd_qb.qb_value);
-		spin_lock(&qd_lru_lock);
+		spin_lock(&qd_lock);
 		value += qd->qd_change;
-		spin_unlock(&qd_lru_lock);
+		spin_unlock(&qd_lock);
 
 		if (be64_to_cpu(qd->qd_qb.qb_limit) && (s64)be64_to_cpu(qd->qd_qb.qb_limit) < value) {
 			print_message(qd, "exceeded");
@@ -1258,11 +1251,11 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
 			qd->qd_slot = slot;
 			qd->qd_slot_count = 1;
 
-			spin_lock(&qd_lru_lock);
+			spin_lock(&qd_lock);
 			gfs2_icbit_munge(sdp, sdp->sd_quota_bitmap, slot, 1);
 			list_add(&qd->qd_list, &sdp->sd_quota_list);
 			atomic_inc(&sdp->sd_quota_count);
-			spin_unlock(&qd_lru_lock);
+			spin_unlock(&qd_lock);
 
 			found++;
 		}
@@ -1288,7 +1281,7 @@ void gfs2_quota_cleanup(struct gfs2_sbd *sdp)
 	struct gfs2_quota_data *qd;
 	unsigned int x;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	while (!list_empty(head)) {
 		qd = list_entry(head->prev, struct gfs2_quota_data, qd_list);
 
@@ -1296,20 +1289,15 @@ void gfs2_quota_cleanup(struct gfs2_sbd *sdp)
 		    (atomic_read(&qd->qd_count) &&
 		     !test_bit(QDF_CHANGE, &qd->qd_flags))) {
 			list_move(&qd->qd_list, head);
-			spin_unlock(&qd_lru_lock);
-			schedule();
-			spin_lock(&qd_lru_lock);
+			cond_resched_lock(&qd_lock);
 			continue;
 		}
 
 		list_del(&qd->qd_list);
+		spin_unlock(&qd_lock);
 		/* Also remove if this qd exists in the reclaim list */
-		if (!list_empty(&qd->qd_reclaim)) {
-			list_del_init(&qd->qd_reclaim);
-			atomic_dec(&qd_lru_count);
-		}
+		list_lru_del(&gfs2_qd_lru, &qd->qd_reclaim);
 		atomic_dec(&sdp->sd_quota_count);
-		spin_unlock(&qd_lru_lock);
 
 		if (!atomic_read(&qd->qd_count)) {
 			gfs2_assert_warn(sdp, !qd->qd_change);
@@ -1320,10 +1308,9 @@ void gfs2_quota_cleanup(struct gfs2_sbd *sdp)
 
 		gfs2_glock_put(qd->qd_gl);
 		kmem_cache_free(gfs2_quotad_cachep, qd);
-
-		spin_lock(&qd_lru_lock);
+		spin_lock(&qd_lock);
 	}
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	gfs2_assert_warn(sdp, !atomic_read(&sdp->sd_quota_count));
 
@@ -1462,7 +1449,7 @@ static int gfs2_quota_get_xstate(struct super_block *sb,
 	}
 	fqs->qs_uquota.qfs_nextents = 1; /* unsupported */
 	fqs->qs_gquota = fqs->qs_uquota; /* its the same inode in both cases */
-	fqs->qs_incoredqs = atomic_read(&qd_lru_count);
+	fqs->qs_incoredqs = list_lru_count(&gfs2_qd_lru);
 	return 0;
 }
 
diff --git a/fs/gfs2/quota.h b/fs/gfs2/quota.h
index 0f64d9d..19ac982 100644
--- a/fs/gfs2/quota.h
+++ b/fs/gfs2/quota.h
@@ -58,5 +58,6 @@ extern unsigned long gfs2_qd_shrink_count(struct shrinker *shrink,
 extern unsigned long gfs2_qd_shrink_scan(struct shrinker *shrink,
 					 struct shrink_control *sc);
 extern const struct quotactl_ops gfs2_quotactl_ops;
+extern struct list_lru gfs2_qd_lru;
 
 #endif /* __QUOTA_DOT_H__ */
diff --git a/include/linux/list_lru.h b/include/linux/list_lru.h
index 3ce5417..69895c0 100644
--- a/include/linux/list_lru.h
+++ b/include/linux/list_lru.h
@@ -53,6 +53,22 @@ int list_lru_init(struct list_lru *lru);
 bool list_lru_add(struct list_lru *lru, struct list_head *item);
 
 /**
+ * list_lru_add_on_zero: decrement counter and atomically add to lru on zero
+ * @list_lru: the lru pointer
+ * @item: the item to potentially add to the lru
+ * @counter: the counter to decrement
+ *
+ * If the @counter is zero after decrementing, then the item will be
+ * atomically added to the lru, assuming that it is not already there.
+ * This is identical to list_lru_add, aside from the additional testing
+ * of an atomic counter.
+ *
+ * Return value: true if the list was updated, false otherwise
+ */
+bool list_lru_add_on_zero(struct list_lru *lru, struct list_head *item, atomic_t
+ *count);
+
+/**
  * list_lru_del: delete an element to the lru list
  * @list_lru: the lru pointer
  * @item: the item to be deleted.
diff --git a/mm/list_lru.c b/mm/list_lru.c
index 7246791..22a57b1 100644
--- a/mm/list_lru.c
+++ b/mm/list_lru.c
@@ -29,6 +29,26 @@ bool list_lru_add(struct list_lru *lru, struct list_head *item)
 }
 EXPORT_SYMBOL_GPL(list_lru_add);
 
+bool list_lru_add_on_zero(struct list_lru *lru, struct list_head *item, atomic_t *count)
+{
+	int nid = page_to_nid(virt_to_page(item));
+	struct list_lru_node *nlru = &lru->node[nid];
+
+	if (atomic_dec_and_lock(count, &nlru->lock)) {
+		WARN_ON_ONCE(nlru->nr_items < 0);
+		if (list_empty(item)) {
+			list_add_tail(item, &nlru->list);
+			if (nlru->nr_items++ == 0)
+				node_set(nid, lru->active_nodes);
+			spin_unlock(&nlru->lock);
+			return true;
+		}
+		spin_unlock(&nlru->lock);
+	}
+	return false;		
+}
+EXPORT_SYMBOL_GPL(list_lru_add_on_zero);
+
 bool list_lru_del(struct list_lru *lru, struct list_head *item)
 {
 	int nid = page_to_nid(virt_to_page(item));




^ permalink raw reply related	[flat|nested] 2+ messages in thread

* [Cluster-devel] GFS2: Use list_lru for quota lru list
  2013-09-13 11:44 [Cluster-devel] GFS2: Use list_lru for quota lru list Steven Whitehouse
@ 2013-09-17  8:38 ` Dave Chinner
  0 siblings, 0 replies; 2+ messages in thread
From: Dave Chinner @ 2013-09-17  8:38 UTC (permalink / raw)
  To: cluster-devel.redhat.com

On Fri, Sep 13, 2013 at 12:44:03PM +0100, Steven Whitehouse wrote:
> 
> This patch is a first go (only compile tested so far) at what list_lru
> support should look like for GFS2's quota subsystem. I've done this one
> rather than the glock lru list first, since this is easier. There are
> still some unresolved issues on the glock side with how to use the new
> list_lru code.
> 
> The conversion is fairly straight forward in this case though, just
> requiring an extra function to conditionally add items to the lru
> list upon an atomic counter hitting zero. The other change is that
> the locking as split into two, one the internal lock in the lru code,
> and the other being a new quota spinlock, which is used for all the
> non-lru locking.

I'm not sure I really like the concept of adding reference count
futzing into the list_lru operations. The whole point of the
internal locking of the list_lru is that it is independent of the
objects being placed on the LRUs. That is, objects cannot use the
internal LRU list locks for serialisation in any manner as it is
purely there for correctness of LRU list internals, not for external
object lifecycle management.

More below.

<trimmed to just be the new code block>

> +static enum lru_status gfs2_qd_walk(struct list_head *item, spinlock_t *lock, void *arg)
>  {
>  	struct gfs2_quota_data *qd;
>  	struct gfs2_sbd *sdp;
>  
> +	qd = list_entry(item, struct gfs2_quota_data, qd_reclaim);
> +	sdp = qd->qd_gl->gl_sbd;
>  
> +	spin_lock(&qd_lock);
> +	if (atomic_read(&qd->qd_count)) {
> +		spin_unlock(&qd_lock);
> +		return LRU_SKIP;
> +	}

The LRU locks are considered "innermost" so you cannot take other
locks inside the lru locks like this. So that needs to be a
spin_trylock()....

>  
> +	/* Free from the filesystem-specific list */
> +	list_del(&qd->qd_list);

LRU lists require list_del_init()

> +	spin_unlock(&qd_lock);
> +	spin_unlock(lock);

You dropped the LRU lock, and hence can only return LRU_RETRY at
this point to indicate that the LRU traversal needs to be restarted.

> +	gfs2_assert_warn(sdp, !qd->qd_change);
> +	gfs2_assert_warn(sdp, !qd->qd_slot_count);
> +	gfs2_assert_warn(sdp, !qd->qd_bh_count);
>  
> +	gfs2_glock_put(qd->qd_gl);
> +	atomic_dec(&sdp->sd_quota_count);
>  
> +	/* Delete it from the common reclaim list */
> +	kmem_cache_free(gfs2_quotad_cachep, qd);

This callback is not intended to be used directly for freeing
objects. The idea is that you pass a dispose list to the callback
and you mark items as "on the dispose list" when you move them to it,
so you don't need to drop the LRU lock. Then when the scan returns,
you walk the dispose list and free all the objects...

> +	spin_lock(lock);
> +
> +	return LRU_REMOVED;
> +}
> +
> +unsigned long gfs2_qd_shrink_scan(struct shrinker *shrink,
> +				  struct shrink_control *sc)
> +{
> +	if (!(sc->gfp_mask & __GFP_FS))
> +		return SHRINK_STOP;
> +
> +	return list_lru_walk_node(&gfs2_qd_lru, sc->nid, gfs2_qd_walk, NULL,
> +				  &sc->nr_to_scan);

i.e. here is where you'd dispose of all the items isolated from the
LRU in the isolation callback.

>  static u64 qd2index(struct gfs2_quota_data *qd)
> @@ -177,15 +180,9 @@ static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
>  
>  	for (;;) {
>  		found = 0;
> +		spin_lock(&qd_lock);
>  		list_for_each_entry(qd, &sdp->sd_quota_list, qd_list) {
>  			if (qid_eq(qd->qd_id, qid)) {
>  				atomic_inc(&qd->qd_count);
>  				found = 1;
>  				break;
> @@ -202,9 +199,10 @@ static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
>  			new_qd = NULL;
>  		}
>  
> +		spin_unlock(&qd_lock);
>  
>  		if (qd) {
> +			list_lru_del(&gfs2_qd_lru, &qd->qd_reclaim);
>  			if (new_qd) {
>  				gfs2_glock_put(new_qd->qd_gl);
>  				kmem_cache_free(gfs2_quotad_cachep, new_qd);
> @@ -228,12 +226,7 @@ static void qd_hold(struct gfs2_quota_data *qd)
>  
>  static void qd_put(struct gfs2_quota_data *qd)
>  {
> -	if (atomic_dec_and_lock(&qd->qd_count, &qd_lru_lock)) {
> -		/* Add to the reclaim list */
> -		list_add_tail(&qd->qd_reclaim, &qd_lru_list);
> -		atomic_inc(&qd_lru_count);
> -		spin_unlock(&qd_lru_lock);
> -	}
> +	list_lru_add_on_zero(&gfs2_qd_lru, &qd->qd_reclaim, &qd->qd_count);

Why do you need to take the LRU lock atomically here?

It appears to me that you have a situation where the reference count
of zero means "on the LRU list", but there's nothing in qd_get()
that enforces the LRU removal only occurs for the first reference
that is taken. Indeed, why do you even need to remove the item from
the LRU list when you get a reference to it? You skip referenced
dquots in the isolation callback, so the only time it needs to be
removed from the LRU is on reclaim. And that means you only need an
atomic_dec_and_test() to determine if you need to add the dquot to
the LRU....

So it appears to me that what you need to do is:

	a) separate the qd_lru_lock -> qd_lock changes into a
	separate patch

	b) separate the object reference counting from the LRU
	operations

	c) make the LRU operations the innermost operations for
	locking purposes

	d) convert to list_lru operations...

Cheers,

Dave.
-- 
Dave Chinner
dchinner at redhat.com



^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2013-09-17  8:38 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2013-09-13 11:44 [Cluster-devel] GFS2: Use list_lru for quota lru list Steven Whitehouse
2013-09-17  8:38 ` Dave Chinner

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).