* [PATCH 2/3] fuse: add fuse numa node struct
2013-04-30 6:17 FUSE: fixes to improve scalability on NUMA systems Srinivas Eeda
2013-04-30 6:17 ` [PATCH 1/3] fuse: add numa mount option Srinivas Eeda
@ 2013-04-30 6:17 ` Srinivas Eeda
2013-04-30 6:17 ` [PATCH 3/3] fuse: split fuse queues to help numa systems Srinivas Eeda
2013-04-30 16:29 ` [fuse-devel] FUSE: fixes to improve scalability on NUMA systems Miklos Szeredi
3 siblings, 0 replies; 8+ messages in thread
From: Srinivas Eeda @ 2013-04-30 6:17 UTC (permalink / raw)
To: linux-fsdevel, fuse-devel; +Cc: mszeredi, srinivas.eeda
This patch introduces a new structure, fuse_numa_node, which groups some
fields from the fuse_conn structure. One instance of fuse_numa_node is
created for each NUMA node present on the system. This reduces contention
on the single spinlock, which causes latency when accessed across NUMA
regions.
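For illustration only (not part of this patch's diff), the intended fast
path is that a CPU queues and serves requests through the fuse_numa_node
of its own node, taking only that node's lock; a minimal sketch using the
fields introduced here:

	/* sketch: queue a request on the local node's lists */
	int node = fc->numa_on ? numa_node_id() : 0;
	struct fuse_numa_node *nn = fc->nn[node];

	spin_lock(&nn->lock);
	list_add_tail(&req->list, &nn->pending);
	wake_up(&nn->waitq);
	spin_unlock(&nn->lock);

The queueing changes themselves land in patch 3/3; this patch only
introduces the structure and its setup and teardown.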
Signed-off-by: Srinivas Eeda <srinivas.eeda@oracle.com>
---
fs/fuse/control.c | 25 ++++++++----
fs/fuse/cuse.c | 11 ++++-
fs/fuse/fuse_i.h | 118 +++++++++++++++++++++++++++++++----------------------
fs/fuse/inode.c | 114 +++++++++++++++++++++++++++++++++++++++------------
4 files changed, 182 insertions(+), 86 deletions(-)
diff --git a/fs/fuse/control.c b/fs/fuse/control.c
index a0b0855..9a9ca5c 100644
--- a/fs/fuse/control.c
+++ b/fs/fuse/control.c
@@ -48,12 +48,13 @@ static ssize_t fuse_conn_waiting_read(struct file *file, char __user *buf,
size_t size;
if (!*ppos) {
- long value;
+ long i, value;
struct fuse_conn *fc = fuse_ctl_file_conn_get(file);
if (!fc)
return 0;
- value = atomic_read(&fc->num_waiting);
+ for (i = 0, value = 0; i < fc->nr_nodes; i++)
+ value += atomic_read(&fc->nn[i]->num_waiting);
file->private_data = (void *)value;
fuse_conn_put(fc);
}
@@ -101,13 +102,14 @@ static ssize_t fuse_conn_max_background_read(struct file *file,
loff_t *ppos)
{
struct fuse_conn *fc;
- unsigned val;
+ unsigned i, val;
fc = fuse_ctl_file_conn_get(file);
if (!fc)
return 0;
- val = fc->max_background;
+ for (i = 0, val = 0; i < fc->nr_nodes; i++)
+ val += fc->nn[i]->max_background;
fuse_conn_put(fc);
return fuse_conn_limit_read(file, buf, len, ppos, val);
@@ -123,9 +125,12 @@ static ssize_t fuse_conn_max_background_write(struct file *file,
ret = fuse_conn_limit_write(file, buf, count, ppos, &val,
max_user_bgreq);
if (ret > 0) {
+ int i;
struct fuse_conn *fc = fuse_ctl_file_conn_get(file);
if (fc) {
- fc->max_background = val;
+ val = (val + fc->nr_nodes - 1) / fc->nr_nodes;
+ for (i = 0; i < fc->nr_nodes; i++)
+ fc->nn[i]->max_background = val;
fuse_conn_put(fc);
}
}
@@ -138,13 +143,14 @@ static ssize_t fuse_conn_congestion_threshold_read(struct file *file,
loff_t *ppos)
{
struct fuse_conn *fc;
- unsigned val;
+ unsigned i, val;
fc = fuse_ctl_file_conn_get(file);
if (!fc)
return 0;
- val = fc->congestion_threshold;
+ for (i = 0, val = 0; i < fc->nr_nodes; i++)
+ val += fc->nn[i]->congestion_threshold;
fuse_conn_put(fc);
return fuse_conn_limit_read(file, buf, len, ppos, val);
@@ -160,9 +166,12 @@ static ssize_t fuse_conn_congestion_threshold_write(struct file *file,
ret = fuse_conn_limit_write(file, buf, count, ppos, &val,
max_user_congthresh);
if (ret > 0) {
+ int i;
struct fuse_conn *fc = fuse_ctl_file_conn_get(file);
if (fc) {
- fc->congestion_threshold = val;
+ val = (val + fc->nr_nodes - 1) / fc->nr_nodes;
+ for (i = 0; i < fc->nr_nodes; i++)
+ fc->nn[i]->congestion_threshold = val;
fuse_conn_put(fc);
}
}
diff --git a/fs/fuse/cuse.c b/fs/fuse/cuse.c
index de10bdf..90d99d4 100644
--- a/fs/fuse/cuse.c
+++ b/fs/fuse/cuse.c
@@ -498,13 +498,14 @@ static int cuse_channel_open(struct inode *inode, struct file *file)
if (!cc)
return -ENOMEM;
- fuse_conn_init(&cc->fc, 0);
+ rc = fuse_conn_init(&cc->fc, 0);
+ if (rc < 0) {
+ kfree(cc);
+ return rc;
+ }
INIT_LIST_HEAD(&cc->list);
cc->fc.release = cuse_fc_release;
cc->fc.connected = 1;
- cc->fc.blocked = 0;
rc = cuse_send_init(cc);
if (rc) {
fuse_conn_put(&cc->fc);
@@ -562,8 +563,12 @@ static ssize_t cuse_class_waiting_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct cuse_conn *cc = dev_get_drvdata(dev);
+ int i, val;
+
+ for (i = 0, val = 0; i < cc->fc.nr_nodes; i++)
+ val += atomic_read(&cc->fc.nn[i]->num_waiting);
- return sprintf(buf, "%d\n", atomic_read(&cc->fc.num_waiting));
+ return sprintf(buf, "%d\n", val);
}
static ssize_t cuse_class_abort_store(struct device *dev,
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index dd9a7ad..b44675b 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -232,6 +232,9 @@ enum fuse_req_state {
* A request to the client
*/
struct fuse_req {
+ /* numa node from which this fuse_req was allocated */
+ int numaid;
+
/** This can be on either pending processing or io lists in
fuse_conn */
struct list_head list;
@@ -342,6 +345,66 @@ struct fuse_req {
struct file *stolen_file;
};
+/* structure that tracks numa node specific fields */
+struct fuse_numa_node {
+ /* numa node id */
+ int numaid;
+
+ /* Lock protecting accesses to members of this structure */
+ spinlock_t lock;
+
+ /* pointer to main fuse_connection */
+ struct fuse_conn *fc;
+
+ /* Flag indicating if the queue is blocked. This will be
+ the case before the INIT reply is received, and if there
+ are too many outstanding background requests */
+ int blocked;
+
+ /* Maximum number of outstanding background requests */
+ unsigned max_background;
+
+ /* Number of background requests at which congestion starts */
+ unsigned congestion_threshold;
+
+ /* Number of requests currently in the background */
+ unsigned num_background;
+
+ /* Number of background requests currently queued for userspace */
+ unsigned active_background;
+
+ /* The number of requests waiting for completion */
+ atomic_t num_waiting;
+
+ /** Queue of pending forgets */
+ struct fuse_forget_link forget_list_head;
+ struct fuse_forget_link *forget_list_tail;
+
+ /** Batching of FORGET requests (positive indicates FORGET batch) */
+ int forget_batch;
+
+ /* waitq for blocked connection */
+ wait_queue_head_t blocked_waitq;
+
+ /* Readers of the connection are waiting on this */
+ wait_queue_head_t waitq;
+
+ /* The list of background requests set aside for later queuing */
+ struct list_head bg_queue;
+
+ /* Pending interrupts */
+ struct list_head interrupts;
+
+ /* The list of pending requests */
+ struct list_head pending;
+
+ /* The list of requests being processed */
+ struct list_head processing;
+
+ /* The list of requests under I/O */
+ struct list_head io;
+};
+
/**
* A Fuse connection.
*
@@ -356,6 +419,9 @@ struct fuse_conn {
/** tracks if numa enabled */
int numa_on;
+ /** Number of numa nodes */
+ int nr_nodes;
+
/** Mutex protecting against directory alias creation */
struct mutex inst_mutex;
@@ -377,57 +443,12 @@ struct fuse_conn {
/** Maximum write size */
unsigned max_write;
- /** Readers of the connection are waiting on this */
- wait_queue_head_t waitq;
-
- /** The list of pending requests */
- struct list_head pending;
-
- /** The list of requests being processed */
- struct list_head processing;
-
- /** The list of requests under I/O */
- struct list_head io;
-
/** The next unique kernel file handle */
u64 khctr;
/** rbtree of fuse_files waiting for poll events indexed by ph */
struct rb_root polled_files;
- /** Maximum number of outstanding background requests */
- unsigned max_background;
-
- /** Number of background requests at which congestion starts */
- unsigned congestion_threshold;
-
- /** Number of requests currently in the background */
- unsigned num_background;
-
- /** Number of background requests currently queued for userspace */
- unsigned active_background;
-
- /** The list of background requests set aside for later queuing */
- struct list_head bg_queue;
-
- /** Pending interrupts */
- struct list_head interrupts;
-
- /** Queue of pending forgets */
- struct fuse_forget_link forget_list_head;
- struct fuse_forget_link *forget_list_tail;
-
- /** Batching of FORGET requests (positive indicates FORGET batch) */
- int forget_batch;
-
- /** Flag indicating if connection is blocked. This will be
- the case before the INIT reply is received, and if there
- are too many outstading backgrounds requests */
- int blocked;
-
- /** waitq for blocked connection */
- wait_queue_head_t blocked_waitq;
-
/** waitq for reserved requests */
wait_queue_head_t reserved_req_waitq;
@@ -523,9 +544,6 @@ struct fuse_conn {
/** Does the filesystem want adaptive readdirplus? */
unsigned readdirplus_auto:1;
- /** The number of requests waiting for completion */
- atomic_t num_waiting;
-
/** Negotiated minor version */
unsigned minor;
@@ -564,6 +582,8 @@ struct fuse_conn {
/** Read/write semaphore to hold when accessing sb. */
struct rw_semaphore killsb;
+
+ struct fuse_numa_node **nn;
};
static inline struct fuse_conn *get_fuse_conn_super(struct super_block *sb)
@@ -766,7 +786,7 @@ void fuse_conn_kill(struct fuse_conn *fc);
/**
* Initialize fuse_conn
*/
-void fuse_conn_init(struct fuse_conn *fc, int numaon);
+int fuse_conn_init(struct fuse_conn *fc, int numaon);
/**
* Release reference to fuse_conn
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 1837f74..250eb38 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -360,14 +360,21 @@ static void fuse_bdi_destroy(struct fuse_conn *fc)
void fuse_conn_kill(struct fuse_conn *fc)
{
+ int i;
+ struct fuse_numa_node *nn;
+
spin_lock(&fc->lock);
fc->connected = 0;
- fc->blocked = 0;
spin_unlock(&fc->lock);
/* Flush all readers on this fs */
kill_fasync(&fc->fasync, SIGIO, POLL_IN);
- wake_up_all(&fc->waitq);
- wake_up_all(&fc->blocked_waitq);
+ for (i = 0; i < fc->nr_nodes; i++) {
+ nn = fc->nn[i];
+ nn->blocked = 0;
+ wake_up_all(&nn->waitq);
+ wake_up_all(&nn->blocked_waitq);
+ }
+ wake_up_all(&fc->poll_waitq);
wake_up_all(&fc->reserved_req_waitq);
}
EXPORT_SYMBOL_GPL(fuse_conn_kill);
@@ -567,8 +574,11 @@ static int fuse_show_options(struct seq_file *m, struct dentry *root)
return 0;
}
-void fuse_conn_init(struct fuse_conn *fc, int numaon)
+int fuse_conn_init(struct fuse_conn *fc, int numaon)
{
+ int i, sz, ret;
+ struct fuse_numa_node *nn;
+
memset(fc, 0, sizeof(*fc));
spin_lock_init(&fc->lock);
mutex_init(&fc->inst_mutex);
@@ -576,25 +586,61 @@ void fuse_conn_init(struct fuse_conn *fc, int numaon)
atomic_set(&fc->count, 1);
if (numaon)
fc->numa_on = 1;
- init_waitqueue_head(&fc->waitq);
- init_waitqueue_head(&fc->blocked_waitq);
init_waitqueue_head(&fc->reserved_req_waitq);
- INIT_LIST_HEAD(&fc->pending);
- INIT_LIST_HEAD(&fc->processing);
- INIT_LIST_HEAD(&fc->io);
- INIT_LIST_HEAD(&fc->interrupts);
- INIT_LIST_HEAD(&fc->bg_queue);
+ init_waitqueue_head(&fc->poll_waitq);
INIT_LIST_HEAD(&fc->entry);
- fc->forget_list_tail = &fc->forget_list_head;
- atomic_set(&fc->num_waiting, 0);
- fc->max_background = FUSE_DEFAULT_MAX_BACKGROUND;
- fc->congestion_threshold = FUSE_DEFAULT_CONGESTION_THRESHOLD;
fc->khctr = 0;
fc->polled_files = RB_ROOT;
fc->reqctr = 0;
- fc->blocked = 1;
fc->attr_version = 1;
get_random_bytes(&fc->scramble_key, sizeof(fc->scramble_key));
+
+ if (numaon)
+ fc->nr_nodes = nr_node_ids;
+ else
+ fc->nr_nodes = 1;
+
+ ret = -ENOMEM;
+ sz = sizeof(struct fuse_numa_node *) * fc->nr_nodes;
+ fc->nn = kmalloc(sz, GFP_KERNEL);
+ if (!fc->nn)
+ return ret;
+ memset(fc->nn, 0, sz);
+
+ sz = sizeof(struct fuse_numa_node);
+ for (i = 0; i < fc->nr_nodes; i++) {
+ nn = kmalloc_node(sz, GFP_KERNEL, i);
+ if (!nn)
+ goto out;
+ memset(nn, 0, sz);
+ fc->nn[i] = nn;
+ nn->fc = fc;
+ nn->numaid = i;
+ nn->blocked = 1;
+ spin_lock_init(&nn->lock);
+ init_waitqueue_head(&nn->waitq);
+ init_waitqueue_head(&nn->blocked_waitq);
+ INIT_LIST_HEAD(&nn->bg_queue);
+ INIT_LIST_HEAD(&nn->interrupts);
+ INIT_LIST_HEAD(&nn->pending);
+ INIT_LIST_HEAD(&nn->processing);
+ INIT_LIST_HEAD(&nn->io);
+ nn->forget_list_tail = &nn->forget_list_head;
+ atomic_set(&nn->num_waiting, 0);
+ nn->max_background = FUSE_DEFAULT_MAX_BACKGROUND;
+ nn->congestion_threshold = FUSE_DEFAULT_CONGESTION_THRESHOLD;
+ }
+ return 0;
+out:
+ /* kfree(NULL) is a no-op, so no need to check each slot */
+ while (i > 0)
+ kfree(fc->nn[--i]);
+ kfree(fc->nn);
+ return ret;
}
EXPORT_SYMBOL_GPL(fuse_conn_init);
@@ -816,6 +862,7 @@ static int set_global_limit(const char *val, struct kernel_param *kp)
static void process_init_limits(struct fuse_conn *fc, struct fuse_init_out *arg)
{
int cap_sys_admin = capable(CAP_SYS_ADMIN);
+ int i, val;
if (arg->minor < 13)
return;
@@ -824,22 +871,29 @@ static void process_init_limits(struct fuse_conn *fc, struct fuse_init_out *arg)
sanitize_global_limit(&max_user_congthresh);
if (arg->max_background) {
- fc->max_background = arg->max_background;
+ val = arg->max_background;
+ if (!cap_sys_admin && (val > max_user_bgreq))
+ val = max_user_bgreq;
+
+ val = (val + fc->nr_nodes - 1) / fc->nr_nodes;
+ for (i = 0; i < fc->nr_nodes; i++)
+ fc->nn[i]->max_background = val;
- if (!cap_sys_admin && fc->max_background > max_user_bgreq)
- fc->max_background = max_user_bgreq;
}
if (arg->congestion_threshold) {
- fc->congestion_threshold = arg->congestion_threshold;
+ val = arg->congestion_threshold;
+ if (!cap_sys_admin && val > max_user_congthresh)
+ val = max_user_congthresh;
- if (!cap_sys_admin &&
- fc->congestion_threshold > max_user_congthresh)
- fc->congestion_threshold = max_user_congthresh;
+ val = (val + fc->nr_nodes - 1) / fc->nr_nodes;
+ for (i = 0; i < fc->nr_nodes; i++)
+ fc->nn[i]->congestion_threshold = val;
}
}
static void process_init_reply(struct fuse_conn *fc, struct fuse_req *req)
{
+ int i;
struct fuse_init_out *arg = &req->misc.init_out;
if (req->out.h.error || arg->major != FUSE_KERNEL_VERSION)
@@ -891,8 +945,10 @@ static void process_init_reply(struct fuse_conn *fc, struct fuse_req *req)
fc->max_write = max_t(unsigned, 4096, fc->max_write);
fc->conn_init = 1;
}
- fc->blocked = 0;
- wake_up_all(&fc->blocked_waitq);
+ for (i = 0; i < fc->nr_nodes; i++) {
+ fc->nn[i]->blocked = 0;
+ wake_up_all(&fc->nn[i]->blocked_waitq);
+ }
}
static void fuse_send_init(struct fuse_conn *fc, struct fuse_req *req)
@@ -924,6 +980,11 @@ static void fuse_send_init(struct fuse_conn *fc, struct fuse_req *req)
static void fuse_free_conn(struct fuse_conn *fc)
{
+ int i;
+
+ for (i = 0; i < fc->nr_nodes; i++)
+ if (fc->nn[i])
+ kfree(fc->nn[i]);
kfree(fc);
}
@@ -1019,7 +1080,8 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent)
if (!fc)
goto err_fput;
- fuse_conn_init(fc, d.numaon);
+ if (fuse_conn_init(fc, d.numaon) < 0)
+ goto err_fput;
fc->dev = sb->s_dev;
fc->sb = sb;
--
1.5.4.3
* [PATCH 3/3] fuse: split fuse queues to help numa systems
2013-04-30 6:17 FUSE: fixes to improve scalability on NUMA systems Srinivas Eeda
2013-04-30 6:17 ` [PATCH 1/3] fuse: add numa mount option Srinivas Eeda
2013-04-30 6:17 ` [PATCH 2/3] fuse: add fuse numa node struct Srinivas Eeda
@ 2013-04-30 6:17 ` Srinivas Eeda
2013-04-30 16:29 ` [fuse-devel] FUSE: fixes to improve scalability on NUMA systems Miklos Szeredi
3 siblings, 0 replies; 8+ messages in thread
From: Srinivas Eeda @ 2013-04-30 6:17 UTC (permalink / raw)
To: linux-fsdevel, fuse-devel; +Cc: mszeredi, srinivas.eeda
This patch modifies dev.c to accommodate the new fuse_numa_node structure.
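In outline, each reader now drains the pending list of its own node under
that node's lock, while the reply path falls back to a cross-node search,
because the process writing a reply may be running on a different node
than the one that queued the request. Condensed from the diff below:

	/* sketch of the reply path in fuse_dev_do_write() */
	nn = fuse_get_numa_node(fc);	/* try the local node first */
	spin_lock(&nn->lock);
	req = request_find(nn, oh.unique);
	if (!req) {
		spin_unlock(&nn->lock);
		/* scan every node's processing list */
		req = request_find_allnodes(fc, oh.unique);
	}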
Signed-off-by: Srinivas Eeda <srinivas.eeda@oracle.com>
---
fs/fuse/dev.c | 597 +++++++++++++++++++++++++++++++++---------------------
fs/fuse/file.c | 4 +-
fs/fuse/fuse_i.h | 8 +-
fs/fuse/inode.c | 4 +-
4 files changed, 378 insertions(+), 235 deletions(-)
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 11dfa0c..b2d1ab6 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -25,6 +25,22 @@ MODULE_ALIAS("devname:fuse");
static struct kmem_cache *fuse_req_cachep;
+static inline int fuse_get_numa_id(struct fuse_conn *fc)
+{
+ if (fc->numa_on)
+ return numa_node_id();
+ else
+ return 0;
+}
+
+static inline struct fuse_numa_node *fuse_get_numa_node(struct fuse_conn *fc)
+{
+ if (fc->numa_on)
+ return fc->nn[numa_node_id()];
+ else
+ return fc->nn[0];
+}
+
static struct fuse_conn *fuse_get_conn(struct file *file)
{
/*
@@ -34,7 +50,23 @@ static struct fuse_conn *fuse_get_conn(struct file *file)
return file->private_data;
}
-static void fuse_request_init(struct fuse_req *req, struct page **pages,
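+/*
+ * request IDs must stay unique across all per-node queues, so the
+ * counter stays in fuse_conn, serialized by fc->lock
+ */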
+static u64 fuse_get_unique(struct fuse_conn *fc)
+{
+ u64 ctr;
+
+ spin_lock(&fc->lock);
+ fc->reqctr++;
+ /* zero is special */
+ if (fc->reqctr == 0)
+ fc->reqctr = 1;
+ ctr = fc->reqctr;
+ spin_unlock(&fc->lock);
+
+ return ctr;
+}
+
+static void fuse_request_init(struct fuse_numa_node *nn,
+ struct fuse_req *req, struct page **pages,
struct fuse_page_desc *page_descs,
unsigned npages)
{
@@ -48,11 +80,18 @@ static void fuse_request_init(struct fuse_req *req, struct page **pages,
req->pages = pages;
req->page_descs = page_descs;
req->max_pages = npages;
+ req->numaid = nn->numaid;
+ req->in.h.unique = fuse_get_unique(nn->fc);
}
-static struct fuse_req *__fuse_request_alloc(unsigned npages, gfp_t flags)
+static struct fuse_req *__fuse_request_alloc(struct fuse_conn *fc,
+ unsigned npages, gfp_t flags)
{
- struct fuse_req *req = kmem_cache_alloc(fuse_req_cachep, flags);
+ struct fuse_req *req;
+ struct fuse_numa_node *nn;
+
+ nn = fuse_get_numa_node(fc);
+ req = kmem_cache_alloc_node(fuse_req_cachep, flags, nn->numaid);
if (req) {
struct page **pages;
struct fuse_page_desc *page_descs;
@@ -73,20 +112,20 @@ static struct fuse_req *__fuse_request_alloc(unsigned npages, gfp_t flags)
return NULL;
}
- fuse_request_init(req, pages, page_descs, npages);
+ fuse_request_init(nn, req, pages, page_descs, npages);
}
return req;
}
-struct fuse_req *fuse_request_alloc(unsigned npages)
+struct fuse_req *fuse_request_alloc(struct fuse_conn *fc, unsigned npages)
{
- return __fuse_request_alloc(npages, GFP_KERNEL);
+ return __fuse_request_alloc(fc, npages, GFP_KERNEL);
}
EXPORT_SYMBOL_GPL(fuse_request_alloc);
-struct fuse_req *fuse_request_alloc_nofs(unsigned npages)
+struct fuse_req *fuse_request_alloc_nofs(struct fuse_conn *fc, unsigned npages)
{
- return __fuse_request_alloc(npages, GFP_NOFS);
+ return __fuse_request_alloc(fc, npages, GFP_NOFS);
}
void fuse_request_free(struct fuse_req *req)
@@ -132,14 +171,16 @@ static void fuse_req_init_context(struct fuse_req *req)
struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
{
+ struct fuse_numa_node *nn;
struct fuse_req *req;
sigset_t oldset;
int intr;
int err;
- atomic_inc(&fc->num_waiting);
+ nn = fuse_get_numa_node(fc);
+ atomic_inc(&nn->num_waiting);
block_sigs(&oldset);
- intr = wait_event_interruptible(fc->blocked_waitq, !fc->blocked);
+ intr = wait_event_interruptible(nn->blocked_waitq, !nn->blocked);
restore_sigs(&oldset);
err = -EINTR;
if (intr)
@@ -149,7 +190,7 @@ struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
if (!fc->connected)
goto out;
- req = fuse_request_alloc(npages);
+ req = fuse_request_alloc(fc, npages);
err = -ENOMEM;
if (!req)
goto out;
@@ -159,7 +200,7 @@ struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
return req;
out:
- atomic_dec(&fc->num_waiting);
+ atomic_dec(&nn->num_waiting);
return ERR_PTR(err);
}
EXPORT_SYMBOL_GPL(fuse_get_req);
@@ -174,6 +215,7 @@ static struct fuse_req *get_reserved_req(struct fuse_conn *fc,
{
struct fuse_req *req = NULL;
struct fuse_file *ff = file->private_data;
+ struct fuse_numa_node *nn;
do {
wait_event(fc->reserved_req_waitq, ff->reserved_req);
@@ -186,6 +228,11 @@ static struct fuse_req *get_reserved_req(struct fuse_conn *fc,
spin_unlock(&fc->lock);
} while (!req);
+ /*
+ * initialize during get rather than put so we use the local fuse numa node
+ */
+ nn = fuse_get_numa_node(fc);
+ fuse_request_init(nn, req, req->pages, req->page_descs, req->max_pages);
return req;
}
@@ -198,7 +245,6 @@ static void put_reserved_req(struct fuse_conn *fc, struct fuse_req *req)
struct fuse_file *ff = file->private_data;
spin_lock(&fc->lock);
- fuse_request_init(req, req->pages, req->page_descs, req->max_pages);
BUG_ON(ff->reserved_req);
ff->reserved_req = req;
wake_up_all(&fc->reserved_req_waitq);
@@ -223,12 +269,16 @@ struct fuse_req *fuse_get_req_nofail_nopages(struct fuse_conn *fc,
struct file *file)
{
struct fuse_req *req;
+ struct fuse_numa_node *nn = fuse_get_numa_node(fc);
- atomic_inc(&fc->num_waiting);
- wait_event(fc->blocked_waitq, !fc->blocked);
- req = fuse_request_alloc(0);
- if (!req)
+ atomic_inc(&nn->num_waiting);
+ wait_event(nn->blocked_waitq, !nn->blocked);
+ req = fuse_request_alloc(fc, 0);
+ if (!req) {
req = get_reserved_req(fc, file);
+ if (req->numaid != nn->numaid)
+ req->numaid = nn->numaid;
+ }
fuse_req_init_context(req);
req->waiting = 1;
@@ -237,9 +287,11 @@ struct fuse_req *fuse_get_req_nofail_nopages(struct fuse_conn *fc,
void fuse_put_request(struct fuse_conn *fc, struct fuse_req *req)
{
+ struct fuse_numa_node *nn = fc->nn[req->numaid];
+
if (atomic_dec_and_test(&req->count)) {
if (req->waiting)
- atomic_dec(&fc->num_waiting);
+ atomic_dec(&nn->num_waiting);
if (req->stolen_file)
put_reserved_req(fc, req);
@@ -260,59 +312,55 @@ static unsigned len_args(unsigned numargs, struct fuse_arg *args)
return nbytes;
}
-static u64 fuse_get_unique(struct fuse_conn *fc)
+static void queue_request(struct fuse_numa_node *nn, struct fuse_req *req)
{
- fc->reqctr++;
- /* zero is special */
- if (fc->reqctr == 0)
- fc->reqctr = 1;
+ struct fuse_conn *fc = nn->fc;
- return fc->reqctr;
-}
-
-static void queue_request(struct fuse_conn *fc, struct fuse_req *req)
-{
req->in.h.len = sizeof(struct fuse_in_header) +
len_args(req->in.numargs, (struct fuse_arg *) req->in.args);
- list_add_tail(&req->list, &fc->pending);
+ list_add_tail(&req->list, &nn->pending);
req->state = FUSE_REQ_PENDING;
if (!req->waiting) {
req->waiting = 1;
- atomic_inc(&fc->num_waiting);
+ atomic_inc(&nn->num_waiting);
}
- wake_up(&fc->waitq);
+ wake_up(&nn->waitq);
+ wake_up(&fc->poll_waitq);
kill_fasync(&fc->fasync, SIGIO, POLL_IN);
}
void fuse_queue_forget(struct fuse_conn *fc, struct fuse_forget_link *forget,
u64 nodeid, u64 nlookup)
{
+ struct fuse_numa_node *nn;
+ int numaid = fuse_get_numa_id(fc);
+
+ nn = fc->nn[numaid];
forget->forget_one.nodeid = nodeid;
forget->forget_one.nlookup = nlookup;
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
if (fc->connected) {
- fc->forget_list_tail->next = forget;
- fc->forget_list_tail = forget;
- wake_up(&fc->waitq);
+ nn->forget_list_tail->next = forget;
+ nn->forget_list_tail = forget;
+ wake_up(&nn->waitq);
kill_fasync(&fc->fasync, SIGIO, POLL_IN);
} else {
kfree(forget);
}
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
}
-static void flush_bg_queue(struct fuse_conn *fc)
+static void flush_bg_queue(struct fuse_numa_node *nn)
{
- while (fc->active_background < fc->max_background &&
- !list_empty(&fc->bg_queue)) {
+ while (nn->active_background < nn->max_background &&
+ !list_empty(&nn->bg_queue)) {
struct fuse_req *req;
- req = list_entry(fc->bg_queue.next, struct fuse_req, list);
+ req = list_entry(nn->bg_queue.next, struct fuse_req, list);
list_del(&req->list);
- fc->active_background++;
- req->in.h.unique = fuse_get_unique(fc);
- queue_request(fc, req);
+ nn->active_background++;
+ queue_request(nn, req);
}
}
@@ -326,62 +374,69 @@ static void flush_bg_queue(struct fuse_conn *fc)
*
- * Called with fc->lock, unlocks it
+ * Called with the owning numa node's lock, unlocks it
*/
-static void request_end(struct fuse_conn *fc, struct fuse_req *req)
-__releases(fc->lock)
+static void request_end(struct fuse_numa_node *nn, struct fuse_req *req)
+__releases(&nn->lock)
{
void (*end) (struct fuse_conn *, struct fuse_req *) = req->end;
+ struct fuse_conn *fc = nn->fc;
+
req->end = NULL;
list_del(&req->list);
list_del(&req->intr_entry);
req->state = FUSE_REQ_FINISHED;
if (req->background) {
- if (fc->num_background == fc->max_background) {
- fc->blocked = 0;
- wake_up_all(&fc->blocked_waitq);
+ if (nn->num_background == nn->max_background) {
+ nn->blocked = 0;
+ wake_up_all(&nn->blocked_waitq);
}
- if (fc->num_background == fc->congestion_threshold &&
+ if (nn->num_background == nn->congestion_threshold &&
fc->connected && fc->bdi_initialized) {
clear_bdi_congested(&fc->bdi, BLK_RW_SYNC);
clear_bdi_congested(&fc->bdi, BLK_RW_ASYNC);
}
- fc->num_background--;
- fc->active_background--;
- flush_bg_queue(fc);
+ nn->num_background--;
+ nn->active_background--;
+ flush_bg_queue(nn);
}
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
wake_up(&req->waitq);
if (end)
end(fc, req);
fuse_put_request(fc, req);
}
-static void wait_answer_interruptible(struct fuse_conn *fc,
+static void wait_answer_interruptible(struct fuse_numa_node *nn,
struct fuse_req *req)
-__releases(fc->lock)
-__acquires(fc->lock)
+__releases(nn->lock)
+__acquires(nn->lock)
{
if (signal_pending(current))
return;
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
wait_event_interruptible(req->waitq, req->state == FUSE_REQ_FINISHED);
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
}
-static void queue_interrupt(struct fuse_conn *fc, struct fuse_req *req)
+static void queue_interrupt(struct fuse_numa_node *nn, struct fuse_req *req)
{
- list_add_tail(&req->intr_entry, &fc->interrupts);
- wake_up(&fc->waitq);
+ struct fuse_conn *fc = nn->fc;
+
+ list_add_tail(&req->intr_entry, &nn->interrupts);
+ wake_up(&nn->waitq);
+ wake_up(&fc->poll_waitq);
kill_fasync(&fc->fasync, SIGIO, POLL_IN);
}
-static void request_wait_answer(struct fuse_conn *fc, struct fuse_req *req)
-__releases(fc->lock)
-__acquires(fc->lock)
+static void request_wait_answer(struct fuse_numa_node *nn, struct fuse_req *req)
+__releases(&nn->lock)
+__acquires(&nn->lock)
{
+ struct fuse_conn *fc = nn->fc;
+
if (!fc->no_interrupt) {
/* Any signal may interrupt this */
- wait_answer_interruptible(fc, req);
+ wait_answer_interruptible(nn, req);
if (req->aborted)
goto aborted;
@@ -390,7 +445,7 @@ __acquires(fc->lock)
req->interrupted = 1;
if (req->state == FUSE_REQ_SENT)
- queue_interrupt(fc, req);
+ queue_interrupt(nn, req);
}
if (!req->force) {
@@ -398,7 +453,7 @@ __acquires(fc->lock)
/* Only fatal signals may interrupt this */
block_sigs(&oldset);
- wait_answer_interruptible(fc, req);
+ wait_answer_interruptible(nn, req);
restore_sigs(&oldset);
if (req->aborted)
@@ -419,9 +474,9 @@ __acquires(fc->lock)
* Either request is already in userspace, or it was forced.
* Wait it out.
*/
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
wait_event(req->waitq, req->state == FUSE_REQ_FINISHED);
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
if (!req->aborted)
return;
@@ -434,29 +489,30 @@ __acquires(fc->lock)
locked state, there mustn't be any filesystem
operation (e.g. page fault), since that could lead
to deadlock */
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
wait_event(req->waitq, !req->locked);
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
}
}
static void __fuse_request_send(struct fuse_conn *fc, struct fuse_req *req)
{
- spin_lock(&fc->lock);
+ struct fuse_numa_node *nn = fc->nn[req->numaid];
+
+ spin_lock(&nn->lock);
if (!fc->connected)
req->out.h.error = -ENOTCONN;
else if (fc->conn_error)
req->out.h.error = -ECONNREFUSED;
else {
- req->in.h.unique = fuse_get_unique(fc);
- queue_request(fc, req);
+ queue_request(nn, req);
/* acquire extra reference, since request is still needed
after request_end() */
__fuse_get_request(req);
- request_wait_answer(fc, req);
+ request_wait_answer(nn, req);
}
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
}
void fuse_request_send(struct fuse_conn *fc, struct fuse_req *req)
@@ -466,38 +522,41 @@ void fuse_request_send(struct fuse_conn *fc, struct fuse_req *req)
}
EXPORT_SYMBOL_GPL(fuse_request_send);
-static void fuse_request_send_nowait_locked(struct fuse_conn *fc,
+static void fuse_request_send_nowait_locked(struct fuse_numa_node *nn,
struct fuse_req *req)
{
+ struct fuse_conn *fc = nn->fc;
+
req->background = 1;
- fc->num_background++;
- if (fc->num_background == fc->max_background)
- fc->blocked = 1;
- if (fc->num_background == fc->congestion_threshold &&
+ nn->num_background++;
+ if (nn->num_background == nn->max_background)
+ nn->blocked = 1;
+ if (nn->num_background == nn->congestion_threshold &&
fc->bdi_initialized) {
set_bdi_congested(&fc->bdi, BLK_RW_SYNC);
set_bdi_congested(&fc->bdi, BLK_RW_ASYNC);
}
- list_add_tail(&req->list, &fc->bg_queue);
- flush_bg_queue(fc);
+ list_add_tail(&req->list, &nn->bg_queue);
+ flush_bg_queue(nn);
}
-static void fuse_request_send_nowait(struct fuse_conn *fc, struct fuse_req *req)
+static void fuse_request_send_nowait(struct fuse_numa_node *nn,
+ struct fuse_req *req)
{
- spin_lock(&fc->lock);
- if (fc->connected) {
- fuse_request_send_nowait_locked(fc, req);
- spin_unlock(&fc->lock);
+ spin_lock(&nn->lock);
+ if (nn->fc->connected) {
+ fuse_request_send_nowait_locked(nn, req);
+ spin_unlock(&nn->lock);
} else {
req->out.h.error = -ENOTCONN;
- request_end(fc, req);
+ request_end(nn, req);
}
}
void fuse_request_send_background(struct fuse_conn *fc, struct fuse_req *req)
{
req->isreply = 1;
- fuse_request_send_nowait(fc, req);
+ fuse_request_send_nowait(fc->nn[req->numaid], req);
}
EXPORT_SYMBOL_GPL(fuse_request_send_background);
@@ -505,15 +564,16 @@ static int fuse_request_send_notify_reply(struct fuse_conn *fc,
struct fuse_req *req, u64 unique)
{
int err = -ENODEV;
+ struct fuse_numa_node *nn = fc->nn[req->numaid];
req->isreply = 0;
req->in.h.unique = unique;
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
if (fc->connected) {
- queue_request(fc, req);
+ queue_request(nn, req);
err = 0;
}
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
return err;
}
@@ -526,8 +586,12 @@ static int fuse_request_send_notify_reply(struct fuse_conn *fc,
void fuse_request_send_background_locked(struct fuse_conn *fc,
struct fuse_req *req)
{
+ struct fuse_numa_node *nn = fc->nn[req->numaid];
+
+ spin_lock(&nn->lock);
req->isreply = 1;
- fuse_request_send_nowait_locked(fc, req);
+ fuse_request_send_nowait_locked(nn, req);
+ spin_unlock(&nn->lock);
}
void fuse_force_forget(struct file *file, u64 nodeid)
@@ -556,16 +620,16 @@ void fuse_force_forget(struct file *file, u64 nodeid)
* anything that could cause a page-fault. If the request was already
* aborted bail out.
*/
-static int lock_request(struct fuse_conn *fc, struct fuse_req *req)
+static int lock_request(struct fuse_numa_node *nn, struct fuse_req *req)
{
int err = 0;
if (req) {
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
if (req->aborted)
err = -ENOENT;
else
req->locked = 1;
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
}
return err;
}
@@ -575,19 +639,19 @@ static int lock_request(struct fuse_conn *fc, struct fuse_req *req)
* requester thread is currently waiting for it to be unlocked, so
* wake it up.
*/
-static void unlock_request(struct fuse_conn *fc, struct fuse_req *req)
+static void unlock_request(struct fuse_numa_node *nn, struct fuse_req *req)
{
if (req) {
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
req->locked = 0;
if (req->aborted)
wake_up(&req->waitq);
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
}
}
struct fuse_copy_state {
- struct fuse_conn *fc;
+ struct fuse_numa_node *nn;
int write;
struct fuse_req *req;
const struct iovec *iov;
@@ -604,12 +668,12 @@ struct fuse_copy_state {
unsigned move_pages:1;
};
-static void fuse_copy_init(struct fuse_copy_state *cs, struct fuse_conn *fc,
- int write,
+static void fuse_copy_init(struct fuse_copy_state *cs,
+ struct fuse_numa_node *nn, int write,
const struct iovec *iov, unsigned long nr_segs)
{
memset(cs, 0, sizeof(*cs));
- cs->fc = fc;
+ cs->nn = nn;
cs->write = write;
cs->iov = iov;
cs->nr_segs = nr_segs;
@@ -649,7 +713,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
unsigned long offset;
int err;
- unlock_request(cs->fc, cs->req);
+ unlock_request(cs->nn, cs->req);
fuse_copy_finish(cs);
if (cs->pipebufs) {
struct pipe_buffer *buf = cs->pipebufs;
@@ -707,7 +771,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
cs->addr += cs->len;
}
- return lock_request(cs->fc, cs->req);
+ return lock_request(cs->nn, cs->req);
}
/* Do as much copy to/from userspace buffer as we can */
@@ -753,7 +817,7 @@ static int fuse_try_move_page(struct fuse_copy_state *cs, struct page **pagep)
struct page *newpage;
struct pipe_buffer *buf = cs->pipebufs;
- unlock_request(cs->fc, cs->req);
+ unlock_request(cs->nn, cs->req);
fuse_copy_finish(cs);
err = buf->ops->confirm(cs->pipe, buf);
@@ -807,12 +871,12 @@ static int fuse_try_move_page(struct fuse_copy_state *cs, struct page **pagep)
lru_cache_add_file(newpage);
err = 0;
- spin_lock(&cs->fc->lock);
+ spin_lock(&cs->nn->lock);
if (cs->req->aborted)
err = -ENOENT;
else
*pagep = newpage;
- spin_unlock(&cs->fc->lock);
+ spin_unlock(&cs->nn->lock);
if (err) {
unlock_page(newpage);
@@ -832,7 +896,7 @@ out_fallback:
cs->mapaddr = buf->ops->map(cs->pipe, buf, 1);
cs->buf = cs->mapaddr + buf->offset;
- err = lock_request(cs->fc, cs->req);
+ err = lock_request(cs->nn, cs->req);
if (err)
return err;
@@ -847,7 +911,7 @@ static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
if (cs->nr_segs == cs->pipe->buffers)
return -EIO;
- unlock_request(cs->fc, cs->req);
+ unlock_request(cs->nn, cs->req);
fuse_copy_finish(cs);
buf = cs->pipebufs;
@@ -958,36 +1022,37 @@ static int fuse_copy_args(struct fuse_copy_state *cs, unsigned numargs,
return err;
}
-static int forget_pending(struct fuse_conn *fc)
+static int forget_pending(struct fuse_numa_node *nn)
{
- return fc->forget_list_head.next != NULL;
+ return nn->forget_list_head.next != NULL;
}
-static int request_pending(struct fuse_conn *fc)
+static int request_pending(struct fuse_numa_node *nn)
{
- return !list_empty(&fc->pending) || !list_empty(&fc->interrupts) ||
- forget_pending(fc);
+ return !list_empty(&nn->pending) || !list_empty(&nn->interrupts) ||
+ forget_pending(nn);
}
/* Wait until a request is available on the pending list */
-static void request_wait(struct fuse_conn *fc)
-__releases(fc->lock)
-__acquires(fc->lock)
+static void request_wait(struct fuse_numa_node *nn)
+__releases(&nn->lock)
+__acquires(&nn->lock)
{
+ struct fuse_conn *fc = nn->fc;
DECLARE_WAITQUEUE(wait, current);
- add_wait_queue_exclusive(&fc->waitq, &wait);
- while (fc->connected && !request_pending(fc)) {
+ add_wait_queue_exclusive(&nn->waitq, &wait);
+ while (fc->connected && !request_pending(nn)) {
set_current_state(TASK_INTERRUPTIBLE);
if (signal_pending(current))
break;
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
schedule();
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
}
set_current_state(TASK_RUNNING);
- remove_wait_queue(&fc->waitq, &wait);
+ remove_wait_queue(&nn->waitq, &wait);
}
/*
@@ -998,16 +1063,19 @@ __acquires(fc->lock)
*
- * Called with fc->lock held, releases it
+ * Called with the numa node's lock held, releases it
*/
-static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_copy_state *cs,
+static int fuse_read_interrupt(struct fuse_numa_node *nn,
+ struct fuse_copy_state *cs,
size_t nbytes, struct fuse_req *req)
-__releases(fc->lock)
+__releases(nn->lock)
{
struct fuse_in_header ih;
struct fuse_interrupt_in arg;
+ struct fuse_conn *fc = nn->fc;
unsigned reqsize = sizeof(ih) + sizeof(arg);
int err;
list_del_init(&req->intr_entry);
+ spin_unlock(&nn->lock);
req->intr_unique = fuse_get_unique(fc);
memset(&ih, 0, sizeof(ih));
memset(&arg, 0, sizeof(arg));
@@ -1016,7 +1084,6 @@ __releases(fc->lock)
ih.unique = req->intr_unique;
arg.unique = req->in.h.unique;
- spin_unlock(&fc->lock);
if (nbytes < reqsize)
return -EINVAL;
@@ -1028,21 +1095,21 @@ __releases(fc->lock)
return err ? err : reqsize;
}
-static struct fuse_forget_link *dequeue_forget(struct fuse_conn *fc,
+static struct fuse_forget_link *dequeue_forget(struct fuse_numa_node *nn,
unsigned max,
unsigned *countp)
{
- struct fuse_forget_link *head = fc->forget_list_head.next;
+ struct fuse_forget_link *head = nn->forget_list_head.next;
struct fuse_forget_link **newhead = &head;
unsigned count;
for (count = 0; *newhead != NULL && count < max; count++)
newhead = &(*newhead)->next;
- fc->forget_list_head.next = *newhead;
+ nn->forget_list_head.next = *newhead;
*newhead = NULL;
- if (fc->forget_list_head.next == NULL)
- fc->forget_list_tail = &fc->forget_list_head;
+ if (nn->forget_list_head.next == NULL)
+ nn->forget_list_tail = &nn->forget_list_head;
if (countp != NULL)
*countp = count;
@@ -1050,24 +1117,24 @@ static struct fuse_forget_link *dequeue_forget(struct fuse_conn *fc,
return head;
}
-static int fuse_read_single_forget(struct fuse_conn *fc,
+static int fuse_read_single_forget(struct fuse_numa_node *nn,
struct fuse_copy_state *cs,
size_t nbytes)
-__releases(fc->lock)
+__releases(nn->lock)
{
int err;
- struct fuse_forget_link *forget = dequeue_forget(fc, 1, NULL);
+ struct fuse_forget_link *forget = dequeue_forget(nn, 1, NULL);
struct fuse_forget_in arg = {
.nlookup = forget->forget_one.nlookup,
};
struct fuse_in_header ih = {
.opcode = FUSE_FORGET,
.nodeid = forget->forget_one.nodeid,
- .unique = fuse_get_unique(fc),
+ .unique = fuse_get_unique(nn->fc),
.len = sizeof(ih) + sizeof(arg),
};
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
kfree(forget);
if (nbytes < ih.len)
return -EINVAL;
@@ -1083,9 +1150,9 @@ __releases(fc->lock)
return ih.len;
}
-static int fuse_read_batch_forget(struct fuse_conn *fc,
+static int fuse_read_batch_forget(struct fuse_numa_node *nn,
struct fuse_copy_state *cs, size_t nbytes)
-__releases(fc->lock)
+__releases(nn->lock)
{
int err;
unsigned max_forgets;
@@ -1094,18 +1161,18 @@ __releases(fc->lock)
struct fuse_batch_forget_in arg = { .count = 0 };
struct fuse_in_header ih = {
.opcode = FUSE_BATCH_FORGET,
- .unique = fuse_get_unique(fc),
+ .unique = fuse_get_unique(nn->fc),
.len = sizeof(ih) + sizeof(arg),
};
if (nbytes < ih.len) {
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
return -EINVAL;
}
max_forgets = (nbytes - ih.len) / sizeof(struct fuse_forget_one);
- head = dequeue_forget(fc, max_forgets, &count);
- spin_unlock(&fc->lock);
+ head = dequeue_forget(nn, max_forgets, &count);
+ spin_unlock(&nn->lock);
arg.count = count;
ih.len += count * sizeof(struct fuse_forget_one);
@@ -1132,14 +1199,14 @@ __releases(fc->lock)
return ih.len;
}
-static int fuse_read_forget(struct fuse_conn *fc, struct fuse_copy_state *cs,
- size_t nbytes)
-__releases(fc->lock)
+static int fuse_read_forget(struct fuse_numa_node *nn,
+ struct fuse_copy_state *cs, size_t nbytes)
+__releases(nn->lock)
{
- if (fc->minor < 16 || fc->forget_list_head.next->next == NULL)
- return fuse_read_single_forget(fc, cs, nbytes);
+ if (nn->fc->minor < 16 || nn->forget_list_head.next->next == NULL)
+ return fuse_read_single_forget(nn, cs, nbytes);
else
- return fuse_read_batch_forget(fc, cs, nbytes);
+ return fuse_read_batch_forget(nn, cs, nbytes);
}
/*
@@ -1151,46 +1218,47 @@ __releases(fc->lock)
* request_end(). Otherwise add it to the processing list, and set
* the 'sent' flag.
*/
-static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
+static ssize_t fuse_dev_do_read(struct fuse_numa_node *nn, struct file *file,
struct fuse_copy_state *cs, size_t nbytes)
{
int err;
struct fuse_req *req;
struct fuse_in *in;
unsigned reqsize;
+ struct fuse_conn *fc = nn->fc;
restart:
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
err = -EAGAIN;
if ((file->f_flags & O_NONBLOCK) && fc->connected &&
- !request_pending(fc))
+ !request_pending(nn))
goto err_unlock;
- request_wait(fc);
+ request_wait(nn);
err = -ENODEV;
if (!fc->connected)
goto err_unlock;
err = -ERESTARTSYS;
- if (!request_pending(fc))
+ if (!request_pending(nn))
goto err_unlock;
- if (!list_empty(&fc->interrupts)) {
- req = list_entry(fc->interrupts.next, struct fuse_req,
+ if (!list_empty(&nn->interrupts)) {
+ req = list_entry(nn->interrupts.next, struct fuse_req,
intr_entry);
- return fuse_read_interrupt(fc, cs, nbytes, req);
+ return fuse_read_interrupt(nn, cs, nbytes, req);
}
- if (forget_pending(fc)) {
- if (list_empty(&fc->pending) || fc->forget_batch-- > 0)
- return fuse_read_forget(fc, cs, nbytes);
+ if (forget_pending(nn)) {
+ if (list_empty(&nn->pending) || nn->forget_batch-- > 0)
+ return fuse_read_forget(nn, cs, nbytes);
- if (fc->forget_batch <= -8)
- fc->forget_batch = 16;
+ if (nn->forget_batch <= -8)
+ nn->forget_batch = 16;
}
- req = list_entry(fc->pending.next, struct fuse_req, list);
+ req = list_entry(nn->pending.next, struct fuse_req, list);
req->state = FUSE_REQ_READING;
- list_move(&req->list, &fc->io);
+ list_move(&req->list, &nn->io);
in = &req->in;
reqsize = in->h.len;
@@ -1200,40 +1268,40 @@ static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
/* SETXATTR is special, since it may contain too large data */
if (in->h.opcode == FUSE_SETXATTR)
req->out.h.error = -E2BIG;
- request_end(fc, req);
+ request_end(nn, req);
goto restart;
}
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
cs->req = req;
err = fuse_copy_one(cs, &in->h, sizeof(in->h));
if (!err)
err = fuse_copy_args(cs, in->numargs, in->argpages,
(struct fuse_arg *) in->args, 0);
fuse_copy_finish(cs);
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
req->locked = 0;
if (req->aborted) {
- request_end(fc, req);
+ request_end(nn, req);
return -ENODEV;
}
if (err) {
req->out.h.error = -EIO;
- request_end(fc, req);
+ request_end(nn, req);
return err;
}
if (!req->isreply)
- request_end(fc, req);
+ request_end(nn, req);
else {
req->state = FUSE_REQ_SENT;
- list_move_tail(&req->list, &fc->processing);
+ list_move_tail(&req->list, &nn->processing);
if (req->interrupted)
- queue_interrupt(fc, req);
- spin_unlock(&fc->lock);
+ queue_interrupt(nn, req);
+ spin_unlock(&nn->lock);
}
return reqsize;
err_unlock:
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
return err;
}
@@ -1243,12 +1311,14 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
struct fuse_copy_state cs;
struct file *file = iocb->ki_filp;
struct fuse_conn *fc = fuse_get_conn(file);
+ struct fuse_numa_node *nn;
if (!fc)
return -EPERM;
- fuse_copy_init(&cs, fc, 1, iov, nr_segs);
+ nn = fuse_get_numa_node(fc);
+ fuse_copy_init(&cs, nn, 1, iov, nr_segs);
- return fuse_dev_do_read(fc, file, &cs, iov_length(iov, nr_segs));
+ return fuse_dev_do_read(nn, file, &cs, iov_length(iov, nr_segs));
}
static int fuse_dev_pipe_buf_steal(struct pipe_inode_info *pipe,
@@ -1277,17 +1347,19 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
struct pipe_buffer *bufs;
struct fuse_copy_state cs;
struct fuse_conn *fc = fuse_get_conn(in);
+ struct fuse_numa_node *nn;
if (!fc)
return -EPERM;
+ nn = fuse_get_numa_node(fc);
bufs = kmalloc(pipe->buffers * sizeof(struct pipe_buffer), GFP_KERNEL);
if (!bufs)
return -ENOMEM;
- fuse_copy_init(&cs, fc, 1, NULL, 0);
+ fuse_copy_init(&cs, nn, 1, NULL, 0);
cs.pipebufs = bufs;
cs.pipe = pipe;
- ret = fuse_dev_do_read(fc, in, &cs, len);
+ ret = fuse_dev_do_read(nn, in, &cs, len);
if (ret < 0)
goto out;
@@ -1718,11 +1790,11 @@ static int fuse_notify(struct fuse_conn *fc, enum fuse_notify_code code,
}
/* Look up request on processing list by unique ID */
-static struct fuse_req *request_find(struct fuse_conn *fc, u64 unique)
+static struct fuse_req *request_find(struct fuse_numa_node *nn, u64 unique)
{
struct list_head *entry;
- list_for_each(entry, &fc->processing) {
+ list_for_each(entry, &nn->processing) {
struct fuse_req *req;
req = list_entry(entry, struct fuse_req, list);
if (req->in.h.unique == unique || req->intr_unique == unique)
@@ -1731,6 +1803,24 @@ static struct fuse_req *request_find(struct fuse_conn *fc, u64 unique)
return NULL;
}
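+/*
+ * Search every numa node's processing list for a request; on success
+ * the owning node's lock is left held for the caller to release.
+ */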
+static struct fuse_req *request_find_allnodes(struct fuse_conn *fc,
+ u64 unique)
+{
+ struct fuse_numa_node *nn;
+ struct fuse_req *req;
+ int i;
+
+ for (i = 0; i < fc->nr_nodes; i++) {
+ nn = fc->nn[i];
+ spin_lock(&nn->lock);
+ req = request_find(nn, unique);
+ if (req)
+ return req;
+ spin_unlock(&nn->lock);
+ }
+ return NULL;
+}
+
static int copy_out_args(struct fuse_copy_state *cs, struct fuse_out *out,
unsigned nbytes)
{
@@ -1761,12 +1851,13 @@ static int copy_out_args(struct fuse_copy_state *cs, struct fuse_out *out,
* it from the list and copy the rest of the buffer to the request.
* The request is finished by calling request_end()
*/
-static ssize_t fuse_dev_do_write(struct fuse_conn *fc,
+static ssize_t fuse_dev_do_write(struct fuse_numa_node *nn,
struct fuse_copy_state *cs, size_t nbytes)
{
int err;
struct fuse_req *req;
struct fuse_out_header oh;
+ struct fuse_conn *fc = nn->fc;
if (nbytes < sizeof(struct fuse_out_header))
return -EINVAL;
@@ -1792,20 +1883,30 @@ static ssize_t fuse_dev_do_write(struct fuse_conn *fc,
if (oh.error <= -1000 || oh.error > 0)
goto err_finish;
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
err = -ENOENT;
if (!fc->connected)
goto err_unlock;
- req = request_find(fc, oh.unique);
- if (!req)
- goto err_unlock;
+ req = request_find(nn, oh.unique);
+ if (!req) {
+ /*
+ * The process writing the reply may differ from the one that read
+ * the request, so it may be running on a different NUMA node.
+ * Hence search all numa node queues for the request.
+ */
+ spin_unlock(&nn->lock);
+ req = request_find_allnodes(fc, oh.unique);
+ if (!req)
+ goto err_finish;
+ nn = fc->nn[req->numaid];
+ }
if (req->aborted) {
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
fuse_copy_finish(cs);
- spin_lock(&fc->lock);
- request_end(fc, req);
+ spin_lock(&nn->lock);
+ request_end(nn, req);
return -ENOENT;
}
/* Is it an interrupt reply? */
@@ -1817,38 +1918,38 @@ static ssize_t fuse_dev_do_write(struct fuse_conn *fc,
if (oh.error == -ENOSYS)
fc->no_interrupt = 1;
else if (oh.error == -EAGAIN)
- queue_interrupt(fc, req);
+ queue_interrupt(nn, req);
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
fuse_copy_finish(cs);
return nbytes;
}
req->state = FUSE_REQ_WRITING;
- list_move(&req->list, &fc->io);
+ list_move(&req->list, &nn->io);
req->out.h = oh;
req->locked = 1;
cs->req = req;
if (!req->out.page_replace)
cs->move_pages = 0;
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
err = copy_out_args(cs, &req->out, nbytes);
fuse_copy_finish(cs);
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
req->locked = 0;
if (!err) {
if (req->aborted)
err = -ENOENT;
} else if (!req->aborted)
req->out.h.error = -EIO;
- request_end(fc, req);
+ request_end(nn, req);
return err ? err : nbytes;
err_unlock:
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
err_finish:
fuse_copy_finish(cs);
return err;
@@ -1859,12 +1960,14 @@ static ssize_t fuse_dev_write(struct kiocb *iocb, const struct iovec *iov,
{
struct fuse_copy_state cs;
struct fuse_conn *fc = fuse_get_conn(iocb->ki_filp);
+ struct fuse_numa_node *nn;
if (!fc)
return -EPERM;
- fuse_copy_init(&cs, fc, 0, iov, nr_segs);
+ nn = fuse_get_numa_node(fc);
+ fuse_copy_init(&cs, nn, 0, iov, nr_segs);
- return fuse_dev_do_write(fc, &cs, iov_length(iov, nr_segs));
+ return fuse_dev_do_write(nn, &cs, iov_length(iov, nr_segs));
}
static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
@@ -1875,6 +1978,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
unsigned idx;
struct pipe_buffer *bufs;
struct fuse_copy_state cs;
+ struct fuse_numa_node *nn;
struct fuse_conn *fc;
size_t rem;
ssize_t ret;
@@ -1927,14 +2031,15 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
}
pipe_unlock(pipe);
- fuse_copy_init(&cs, fc, 0, NULL, nbuf);
+ nn = fuse_get_numa_node(fc);
+ fuse_copy_init(&cs, nn, 0, NULL, nbuf);
cs.pipebufs = bufs;
cs.pipe = pipe;
if (flags & SPLICE_F_MOVE)
cs.move_pages = 1;
- ret = fuse_dev_do_write(fc, &cs, len);
+ ret = fuse_dev_do_write(nn, &cs, len);
for (idx = 0; idx < nbuf; idx++) {
struct pipe_buffer *buf = &bufs[idx];
@@ -1949,16 +2054,28 @@ static unsigned fuse_dev_poll(struct file *file, poll_table *wait)
{
unsigned mask = POLLOUT | POLLWRNORM;
struct fuse_conn *fc = fuse_get_conn(file);
+ struct fuse_numa_node *nn;
+ int i;
if (!fc)
return POLLERR;
- poll_wait(file, &fc->waitq, wait);
+ poll_wait(file, &fc->poll_waitq, wait);
spin_lock(&fc->lock);
if (!fc->connected)
mask = POLLERR;
- else if (request_pending(fc))
- mask |= POLLIN | POLLRDNORM;
+ else {
+ for (i = 0; i < fc->nr_nodes; i++) {
+ nn = fc->nn[i];
+ spin_lock(&nn->lock);
+ if (request_pending(nn)) {
+ mask |= POLLIN | POLLRDNORM;
+ spin_unlock(&nn->lock);
+ break;
+ }
+ spin_unlock(&nn->lock);
+ }
+ }
spin_unlock(&fc->lock);
return mask;
@@ -1969,16 +2086,16 @@ static unsigned fuse_dev_poll(struct file *file, poll_table *wait)
*
- * This function releases and reacquires fc->lock
+ * This function releases and reacquires the numa node's lock
*/
-static void end_requests(struct fuse_conn *fc, struct list_head *head)
-__releases(fc->lock)
-__acquires(fc->lock)
+static void end_requests(struct fuse_numa_node *nn, struct list_head *head)
+__releases(&nn->lock)
+__acquires(&nn->lock)
{
while (!list_empty(head)) {
struct fuse_req *req;
req = list_entry(head->next, struct fuse_req, list);
req->out.h.error = -ECONNABORTED;
- request_end(fc, req);
- spin_lock(&fc->lock);
+ request_end(nn, req);
+ spin_lock(&nn->lock);
}
}
@@ -1993,14 +2110,15 @@ __acquires(fc->lock)
* called after waiting for the request to be unlocked (if it was
* locked).
*/
-static void end_io_requests(struct fuse_conn *fc)
-__releases(fc->lock)
-__acquires(fc->lock)
+static void end_io_requests(struct fuse_numa_node *nn)
+__releases(&nn->lock)
+__acquires(&nn->lock)
{
- while (!list_empty(&fc->io)) {
+ while (!list_empty(&nn->io)) {
struct fuse_req *req =
- list_entry(fc->io.next, struct fuse_req, list);
+ list_entry(nn->io.next, struct fuse_req, list);
void (*end) (struct fuse_conn *, struct fuse_req *) = req->end;
+ struct fuse_conn *fc = nn->fc;
req->aborted = 1;
req->out.h.error = -ECONNABORTED;
@@ -2010,25 +2128,33 @@ __acquires(fc->lock)
if (end) {
req->end = NULL;
__fuse_get_request(req);
- spin_unlock(&fc->lock);
+ spin_unlock(&nn->lock);
wait_event(req->waitq, !req->locked);
end(fc, req);
fuse_put_request(fc, req);
- spin_lock(&fc->lock);
+ spin_lock(&nn->lock);
}
}
}
static void end_queued_requests(struct fuse_conn *fc)
-__releases(fc->lock)
-__acquires(fc->lock)
-{
- fc->max_background = UINT_MAX;
- flush_bg_queue(fc);
- end_requests(fc, &fc->pending);
- end_requests(fc, &fc->processing);
- while (forget_pending(fc))
- kfree(dequeue_forget(fc, 1, NULL));
+{
+ int i;
+ struct fuse_numa_node *nn;
+
+ for (i = 0; i < fc->nr_nodes; i++) {
+ nn = fc->nn[i];
+ spin_lock(&nn->lock);
+ nn->max_background = UINT_MAX;
+ flush_bg_queue(nn);
+ end_requests(nn, &nn->pending);
+ end_requests(nn, &nn->processing);
+ while (forget_pending(nn))
+ kfree(dequeue_forget(nn, 1, NULL));
+ spin_unlock(&nn->lock);
+ }
}
static void end_polls(struct fuse_conn *fc)
@@ -2067,15 +2193,22 @@ static void end_polls(struct fuse_conn *fc)
*/
void fuse_abort_conn(struct fuse_conn *fc)
{
+ int i;
+ struct fuse_numa_node *nn;
+
spin_lock(&fc->lock);
if (fc->connected) {
fc->connected = 0;
- fc->blocked = 0;
- end_io_requests(fc);
+ for (i = 0; i < fc->nr_nodes; i++) {
+ nn = fc->nn[i];
+ nn->blocked = 0;
+ end_io_requests(nn);
+ wake_up_all(&nn->waitq);
+ wake_up_all(&nn->blocked_waitq);
+ }
+ wake_up_all(&fc->poll_waitq);
end_queued_requests(fc);
end_polls(fc);
- wake_up_all(&fc->waitq);
- wake_up_all(&fc->blocked_waitq);
kill_fasync(&fc->fasync, SIGIO, POLL_IN);
}
spin_unlock(&fc->lock);
@@ -2084,14 +2217,20 @@ EXPORT_SYMBOL_GPL(fuse_abort_conn);
int fuse_dev_release(struct inode *inode, struct file *file)
{
+ int i;
+ struct fuse_numa_node *nn;
struct fuse_conn *fc = fuse_get_conn(file);
+
if (fc) {
spin_lock(&fc->lock);
fc->connected = 0;
- fc->blocked = 0;
end_queued_requests(fc);
+ for (i = 0; i < fc->nr_nodes; i++) {
+ nn = fc->nn[i];
+ nn->blocked = 0;
+ wake_up_all(&nn->blocked_waitq);
+ }
end_polls(fc);
- wake_up_all(&fc->blocked_waitq);
spin_unlock(&fc->lock);
fuse_conn_put(fc);
}
diff --git a/fs/fuse/file.c b/fs/fuse/file.c
index 34b80ba..e8e57d4 100644
--- a/fs/fuse/file.c
+++ b/fs/fuse/file.c
@@ -57,7 +57,7 @@ struct fuse_file *fuse_file_alloc(struct fuse_conn *fc)
return NULL;
ff->fc = fc;
- ff->reserved_req = fuse_request_alloc(0);
+ ff->reserved_req = fuse_request_alloc(fc, 0);
if (unlikely(!ff->reserved_req)) {
kfree(ff);
return NULL;
@@ -1371,7 +1371,7 @@ static int fuse_writepage_locked(struct page *page)
set_page_writeback(page);
- req = fuse_request_alloc_nofs(1);
+ req = fuse_request_alloc_nofs(fc, 1);
if (!req)
goto err;
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index b44675b..6e9de37 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -449,6 +449,9 @@ struct fuse_conn {
/** rbtree of fuse_files waiting for poll events indexed by ph */
struct rb_root polled_files;
+ /** waitq for poll requests */
+ wait_queue_head_t poll_waitq;
+
/** waitq for reserved requests */
wait_queue_head_t reserved_req_waitq;
@@ -717,9 +720,10 @@ void fuse_ctl_cleanup(void);
/**
* Allocate a request
*/
-struct fuse_req *fuse_request_alloc(unsigned npages);
+struct fuse_req *fuse_request_alloc(struct fuse_conn *fc, unsigned npages);
-struct fuse_req *fuse_request_alloc_nofs(unsigned npages);
+struct fuse_req *fuse_request_alloc_nofs(struct fuse_conn *fc,
+ unsigned npages);
/**
* Free a request
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 250eb38..cd8ffeb 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -1113,12 +1113,12 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent)
/* only now - we want root dentry with NULL ->d_op */
sb->s_d_op = &fuse_dentry_operations;
- init_req = fuse_request_alloc(0);
+ init_req = fuse_request_alloc(fc, 0);
if (!init_req)
goto err_put_root;
if (is_bdev) {
- fc->destroy_req = fuse_request_alloc(0);
+ fc->destroy_req = fuse_request_alloc(fc, 0);
if (!fc->destroy_req)
goto err_free_init_req;
}
--
1.5.4.3