From: Sage Weil <sage@newdream.net>
To: linux-fsdevel@vger.kernel.org, linux-kernel@vger.kernel.org
Cc: Sage Weil <sage@newdream.net>
Subject: [PATCH 22/26] ceph: message pools
Date: Tue, 2 Mar 2010 16:46:53 -0800 [thread overview]
Message-ID: <1267577217-31923-23-git-send-email-sage@newdream.net> (raw)
In-Reply-To: <1267577217-31923-1-git-send-email-sage@newdream.net>
The msgpool is a basic mempool_t-like structure to preallocate
messages we expect to receive over the wire. This ensures we have the
necessary memory preallocated to process replies to requests, or to
process unsolicited messages from various servers.
Signed-off-by: Sage Weil <sage@newdream.net>
---
fs/ceph/msgpool.c | 186 +++++++++++++++++++++++++++++++++++++++++++++++++++++
fs/ceph/msgpool.h | 27 ++++++++
2 files changed, 213 insertions(+), 0 deletions(-)
create mode 100644 fs/ceph/msgpool.c
create mode 100644 fs/ceph/msgpool.h
diff --git a/fs/ceph/msgpool.c b/fs/ceph/msgpool.c
new file mode 100644
index 0000000..ca3b44a
--- /dev/null
+++ b/fs/ceph/msgpool.c
@@ -0,0 +1,186 @@
+#include "ceph_debug.h"
+
+#include <linux/err.h>
+#include <linux/sched.h>
+#include <linux/types.h>
+#include <linux/vmalloc.h>
+
+#include "msgpool.h"
+
+/*
+ * We use msg pools to preallocate memory for messages we expect to
+ * receive over the wire, to avoid getting ourselves into OOM
+ * conditions at unexpected times. We take use a few different
+ * strategies:
+ *
+ * - for request/response type interactions, we preallocate the
+ * memory needed for the response when we generate the request.
+ *
+ * - for messages we can receive at any time from the MDS, we preallocate
+ * a pool of messages we can re-use.
+ *
+ * - for writeback, we preallocate some number of messages to use for
+ * requests and their replies, so that we always make forward
+ * progress.
+ *
+ * The msgpool behaves like a mempool_t, but keeps preallocated
+ * ceph_msgs strung together on a list_head instead of using a pointer
+ * vector. This avoids vector reallocation when we adjust the number
+ * of preallocated items (which happens frequently).
+ */
+
+
+/*
+ * Allocate or release as necessary to meet our target pool size.
+ */
+static int __fill_msgpool(struct ceph_msgpool *pool)
+{
+ struct ceph_msg *msg;
+
+ while (pool->num < pool->min) {
+ dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num,
+ pool->min);
+ spin_unlock(&pool->lock);
+ msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
+ spin_lock(&pool->lock);
+ if (IS_ERR(msg))
+ return PTR_ERR(msg);
+ msg->pool = pool;
+ list_add(&msg->list_head, &pool->msgs);
+ pool->num++;
+ }
+ while (pool->num > pool->min) {
+ msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head);
+ dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num,
+ pool->min, msg);
+ list_del_init(&msg->list_head);
+ pool->num--;
+ ceph_msg_kfree(msg);
+ }
+ return 0;
+}
+
+int ceph_msgpool_init(struct ceph_msgpool *pool,
+ int front_len, int min, bool blocking)
+{
+ int ret;
+
+ dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min);
+ spin_lock_init(&pool->lock);
+ pool->front_len = front_len;
+ INIT_LIST_HEAD(&pool->msgs);
+ pool->num = 0;
+ pool->min = min;
+ pool->blocking = blocking;
+ init_waitqueue_head(&pool->wait);
+
+ spin_lock(&pool->lock);
+ ret = __fill_msgpool(pool);
+ spin_unlock(&pool->lock);
+ return ret;
+}
+
+void ceph_msgpool_destroy(struct ceph_msgpool *pool)
+{
+ dout("msgpool_destroy %p\n", pool);
+ spin_lock(&pool->lock);
+ pool->min = 0;
+ __fill_msgpool(pool);
+ spin_unlock(&pool->lock);
+}
+
+int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta)
+{
+ int ret;
+
+ spin_lock(&pool->lock);
+ dout("msgpool_resv %p delta %d\n", pool, delta);
+ pool->min += delta;
+ ret = __fill_msgpool(pool);
+ spin_unlock(&pool->lock);
+ return ret;
+}
+
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len)
+{
+ wait_queue_t wait;
+ struct ceph_msg *msg;
+
+ if (front_len && front_len > pool->front_len) {
+ pr_err("msgpool_get pool %p need front %d, pool size is %d\n",
+ pool, front_len, pool->front_len);
+ WARN_ON(1);
+
+ /* try to alloc a fresh message */
+ msg = ceph_msg_new(0, front_len, 0, 0, NULL);
+ if (!IS_ERR(msg))
+ return msg;
+ }
+
+ if (!front_len)
+ front_len = pool->front_len;
+
+ if (pool->blocking) {
+ /* mempool_t behavior; first try to alloc */
+ msg = ceph_msg_new(0, front_len, 0, 0, NULL);
+ if (!IS_ERR(msg))
+ return msg;
+ }
+
+ while (1) {
+ spin_lock(&pool->lock);
+ if (likely(pool->num)) {
+ msg = list_entry(pool->msgs.next, struct ceph_msg,
+ list_head);
+ list_del_init(&msg->list_head);
+ pool->num--;
+ dout("msgpool_get %p got %p, now %d/%d\n", pool, msg,
+ pool->num, pool->min);
+ spin_unlock(&pool->lock);
+ return msg;
+ }
+ pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num,
+ pool->min, pool->blocking ? "waiting" : "may fail");
+ spin_unlock(&pool->lock);
+
+ if (!pool->blocking) {
+ WARN_ON(1);
+
+ /* maybe we can allocate it now? */
+ msg = ceph_msg_new(0, front_len, 0, 0, NULL);
+ if (!IS_ERR(msg))
+ return msg;
+
+ pr_err("msgpool_get %p empty + alloc failed\n", pool);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ init_wait(&wait);
+ prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE);
+ schedule();
+ finish_wait(&pool->wait, &wait);
+ }
+}
+
+void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
+{
+ spin_lock(&pool->lock);
+ if (pool->num < pool->min) {
+ /* reset msg front_len; user may have changed it */
+ msg->front.iov_len = pool->front_len;
+ msg->hdr.front_len = cpu_to_le32(pool->front_len);
+
+ kref_set(&msg->kref, 1); /* retake a single ref */
+ list_add(&msg->list_head, &pool->msgs);
+ pool->num++;
+ dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,
+ pool->num, pool->min);
+ spin_unlock(&pool->lock);
+ wake_up(&pool->wait);
+ } else {
+ dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg,
+ pool->num, pool->min);
+ spin_unlock(&pool->lock);
+ ceph_msg_kfree(msg);
+ }
+}
diff --git a/fs/ceph/msgpool.h b/fs/ceph/msgpool.h
new file mode 100644
index 0000000..bc834bf
--- /dev/null
+++ b/fs/ceph/msgpool.h
@@ -0,0 +1,27 @@
+#ifndef _FS_CEPH_MSGPOOL
+#define _FS_CEPH_MSGPOOL
+
+#include "messenger.h"
+
+/*
+ * we use memory pools for preallocating messages we may receive, to
+ * avoid unexpected OOM conditions.
+ */
+struct ceph_msgpool {
+ spinlock_t lock;
+ int front_len; /* preallocated payload size */
+ struct list_head msgs; /* msgs in the pool; each has 1 ref */
+ int num, min; /* cur, min # msgs in the pool */
+ bool blocking;
+ wait_queue_head_t wait;
+};
+
+extern int ceph_msgpool_init(struct ceph_msgpool *pool,
+ int front_len, int size, bool blocking);
+extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
+extern int ceph_msgpool_resv(struct ceph_msgpool *, int delta);
+extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *,
+ int front_len);
+extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);
+
+#endif
--
1.7.0
next prev parent reply other threads:[~2010-03-03 0:46 UTC|newest]
Thread overview: 30+ messages / expand[flat|nested] mbox.gz Atom feed top
2010-03-03 0:46 [PATCH 00/26] ceph distributed file system client Sage Weil
2010-03-03 0:46 ` [PATCH 01/26] ceph: documentation Sage Weil
2010-03-03 0:46 ` [PATCH 02/26] ceph: on-wire types Sage Weil
2010-03-03 0:46 ` [PATCH 03/26] ceph: client types Sage Weil
2010-03-03 0:46 ` [PATCH 04/26] ceph: hash function Sage Weil
2010-03-03 0:46 ` [PATCH 05/26] ceph: ref counted buffer Sage Weil
2010-03-03 0:46 ` [PATCH 06/26] ceph: dynamic pagelist buffer Sage Weil
2010-03-03 0:46 ` [PATCH 07/26] ceph: super.c Sage Weil
2010-03-03 0:46 ` [PATCH 08/26] ceph: inode operations Sage Weil
2010-03-03 0:46 ` [PATCH 09/26] ceph: directory operations Sage Weil
2010-03-03 0:46 ` [PATCH 10/26] ceph: file operations Sage Weil
2010-03-03 0:46 ` [PATCH 11/26] ceph: address space operations Sage Weil
2010-03-03 0:46 ` [PATCH 12/26] ceph: MDS client Sage Weil
2010-03-03 0:46 ` [PATCH 13/26] ceph: OSD client Sage Weil
2010-03-03 0:46 ` [PATCH 14/26] ceph: CRUSH mapping algorithm Sage Weil
2010-03-03 0:46 ` [PATCH 15/26] ceph: monitor client Sage Weil
2010-03-03 0:46 ` [PATCH 16/26] ceph: authentication interface Sage Weil
2010-03-03 0:46 ` [PATCH 17/26] ceph: trivial 'auth_none' authentication scheme Sage Weil
2010-03-03 0:46 ` [PATCH 18/26] ceph: 'auth_x' " Sage Weil
2010-03-03 0:46 ` [PATCH 19/26] ceph: capability management Sage Weil
2010-03-03 0:46 ` [PATCH 20/26] ceph: snapshot management Sage Weil
2010-03-03 0:46 ` [PATCH 21/26] ceph: messenger library Sage Weil
2010-03-03 0:46 ` Sage Weil [this message]
2010-03-03 0:46 ` [PATCH 23/26] ceph: nfs re-export support Sage Weil
2010-03-03 7:48 ` Christoph Hellwig
2010-03-03 21:37 ` Sage Weil
2010-03-04 11:50 ` Miklos Szeredi
2010-03-03 0:46 ` [PATCH 24/26] ceph: ioctls Sage Weil
2010-03-03 0:46 ` [PATCH 25/26] ceph: debugfs Sage Weil
2010-03-03 0:46 ` [PATCH 26/26] ceph: Kconfig, Makefile, MAINTAINERS entry Sage Weil
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1267577217-31923-23-git-send-email-sage@newdream.net \
--to=sage@newdream.net \
--cc=linux-fsdevel@vger.kernel.org \
--cc=linux-kernel@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).