* [Cluster-devel] [PATCH dlm/next 0/6] fs: dlm: rework receive and transmit handling
From: Alexander Aring @ 2020-09-18 14:27 UTC
To: cluster-devel.redhat.com
Hi,
This patch series contains a large change to the receive/transmit
handling of dlm. While placing sequence numbers on dlm messages I
noticed that dlm messages can be transmitted in an incorrect order. I
changed the transmit handling so that the writequeue holds one entry
per dlm message buffer and sendmsg() uses scatter/gather to transmit
a batch of them. On the receive side I removed several allocations
and copies between different buffers. There is now only one receive
buffer, which gets filled directly, and the upper layer works on that
same buffer.
- Alex
Alexander Aring (6):
fs: dlm: remove lock dependency warning
fs: dlm: handle range check as callback
fs: dlm: disallow buffer size below default
fs: dlm: release connection with call_rcu
fs: dlm: rework receive handling
fs: dlm: rework transmit handling
fs/dlm/config.c | 47 +++--
fs/dlm/config.h | 2 +
fs/dlm/lowcomms.c | 471 ++++++++++++++++++++++------------------------
fs/dlm/midcomms.c | 136 ++++++-------
fs/dlm/midcomms.h | 3 +-
5 files changed, 310 insertions(+), 349 deletions(-)
--
2.26.2
* [Cluster-devel] [PATCH dlm/next 1/6] fs: dlm: remove lock dependency warning
From: Alexander Aring @ 2020-09-18 14:27 UTC
To: cluster-devel.redhat.com
During my experiments to make dlm robust against the tcpkill
application I was sometimes able to trigger a circular lock dependency
warning between clusters_root.subsys.su_mutex and con->sock_mutex. We
don't need to hold sock_mutex while getting the mark value, which
takes clusters_root.subsys.su_mutex internally. This patch moves the
mark lookup to just before sock_mutex is taken.
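Schematically, the inverted lock order looks like this (a sketch, not
the exact code from the diff below, and assuming the configfs side can
reach sock_mutex while holding su_mutex):

	/* connect path before this patch: sock_mutex -> su_mutex */
	mutex_lock(&con->sock_mutex);
	result = dlm_comm_mark(con->nodeid, &mark); /* takes su_mutex inside */

	/* after this patch: resolve the mark first, outside sock_mutex */
	result = dlm_comm_mark(con->nodeid, &mark);
	if (result < 0)
		return;
	mutex_lock(&con->sock_mutex);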
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 1bf1808bfa6b0..24f5e55313d83 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -971,6 +971,10 @@ static void sctp_connect_to_sock(struct connection *con)
return;
}
+ result = dlm_comm_mark(con->nodeid, &mark);
+ if (result < 0)
+ return;
+
mutex_lock(&con->sock_mutex);
/* Some odd races can cause double-connects, ignore them */
@@ -995,11 +999,6 @@ static void sctp_connect_to_sock(struct connection *con)
if (result < 0)
goto socket_err;
- /* set skb mark */
- result = dlm_comm_mark(con->nodeid, &mark);
- if (result < 0)
- goto bind_err;
-
sock_set_mark(sock->sk, mark);
con->rx_action = receive_from_sock;
@@ -1072,6 +1071,10 @@ static void tcp_connect_to_sock(struct connection *con)
return;
}
+ result = dlm_comm_mark(con->nodeid, &mark);
+ if (result < 0)
+ return;
+
mutex_lock(&con->sock_mutex);
if (con->retries++ > MAX_CONNECT_RETRIES)
goto out;
@@ -1086,11 +1089,6 @@ static void tcp_connect_to_sock(struct connection *con)
if (result < 0)
goto out_err;
- /* set skb mark */
- result = dlm_comm_mark(con->nodeid, &mark);
- if (result < 0)
- goto out_err;
-
sock_set_mark(sock->sk, mark);
memset(&saddr, 0, sizeof(saddr));
--
2.26.2
* [Cluster-devel] [PATCH dlm/next 2/6] fs: dlm: handle range check as callback
From: Alexander Aring @ 2020-09-18 14:27 UTC
To: cluster-devel.redhat.com
This patch adds a callback parameter to the CLUSTER_ATTR macro to
allow an individual callback per attribute, for attributes whose valid
range is more complex than just being non-zero.
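As an illustration (hypothetical, not part of the diff below), an
attribute with a stricter valid range could then supply its own
callback; the callback returns true when the value is invalid:

	static bool dlm_check_protocol(unsigned int x)
	{
		/* hypothetical: accept only values 0 and 1 */
		return x > 1;
	}

	CLUSTER_ATTR(protocol, dlm_check_protocol);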
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/config.c | 41 +++++++++++++++++++++++------------------
1 file changed, 23 insertions(+), 18 deletions(-)
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index f33a7e4ae917b..2b30104a34cef 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -125,7 +125,7 @@ static ssize_t cluster_cluster_name_store(struct config_item *item,
CONFIGFS_ATTR(cluster_, cluster_name);
static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
- int *info_field, int check_zero,
+ int *info_field, bool (*check_cb)(unsigned int x),
const char *buf, size_t len)
{
unsigned int x;
@@ -137,7 +137,7 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
if (rc)
return rc;
- if (check_zero && !x)
+ if (check_cb && check_cb(x))
return -EINVAL;
*cl_field = x;
@@ -146,13 +146,13 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
return len;
}
-#define CLUSTER_ATTR(name, check_zero) \
+#define CLUSTER_ATTR(name, check_cb) \
static ssize_t cluster_##name##_store(struct config_item *item, \
const char *buf, size_t len) \
{ \
struct dlm_cluster *cl = config_item_to_cluster(item); \
return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name, \
- check_zero, buf, len); \
+ check_cb, buf, len); \
} \
static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \
{ \
@@ -161,20 +161,25 @@ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \
} \
CONFIGFS_ATTR(cluster_, name);
-CLUSTER_ATTR(tcp_port, 1);
-CLUSTER_ATTR(buffer_size, 1);
-CLUSTER_ATTR(rsbtbl_size, 1);
-CLUSTER_ATTR(recover_timer, 1);
-CLUSTER_ATTR(toss_secs, 1);
-CLUSTER_ATTR(scan_secs, 1);
-CLUSTER_ATTR(log_debug, 0);
-CLUSTER_ATTR(log_info, 0);
-CLUSTER_ATTR(protocol, 0);
-CLUSTER_ATTR(mark, 0);
-CLUSTER_ATTR(timewarn_cs, 1);
-CLUSTER_ATTR(waitwarn_us, 0);
-CLUSTER_ATTR(new_rsb_count, 0);
-CLUSTER_ATTR(recover_callbacks, 0);
+static bool dlm_check_zero(unsigned int x)
+{
+ return !x;
+}
+
+CLUSTER_ATTR(tcp_port, dlm_check_zero);
+CLUSTER_ATTR(buffer_size, dlm_check_zero);
+CLUSTER_ATTR(rsbtbl_size, dlm_check_zero);
+CLUSTER_ATTR(recover_timer, dlm_check_zero);
+CLUSTER_ATTR(toss_secs, dlm_check_zero);
+CLUSTER_ATTR(scan_secs, dlm_check_zero);
+CLUSTER_ATTR(log_debug, NULL);
+CLUSTER_ATTR(log_info, NULL);
+CLUSTER_ATTR(protocol, NULL);
+CLUSTER_ATTR(mark, NULL);
+CLUSTER_ATTR(timewarn_cs, dlm_check_zero);
+CLUSTER_ATTR(waitwarn_us, NULL);
+CLUSTER_ATTR(new_rsb_count, NULL);
+CLUSTER_ATTR(recover_callbacks, NULL);
static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port,
--
2.26.2
* [Cluster-devel] [PATCH dlm/next 3/6] fs: dlm: disallow buffer size below default
From: Alexander Aring @ 2020-09-18 14:27 UTC
To: cluster-devel.redhat.com
I observed that dlm messages delivered from the upper layers cannot be
handled when the buffer size is below this value, because this buffer
is used inside the current receive handling to copy a whole dlm
message into it. Hence we should not allow sizes below the default
value.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/config.c | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 2b30104a34cef..1734f2e861924 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -166,8 +166,14 @@ static bool dlm_check_zero(unsigned int x)
return !x;
}
+#define DEFAULT_BUFFER_SIZE 4096
+static bool dlm_check_buffer_size(unsigned int x)
+{
+ return (x < DEFAULT_BUFFER_SIZE);
+}
+
CLUSTER_ATTR(tcp_port, dlm_check_zero);
-CLUSTER_ATTR(buffer_size, dlm_check_zero);
+CLUSTER_ATTR(buffer_size, dlm_check_buffer_size);
CLUSTER_ATTR(rsbtbl_size, dlm_check_zero);
CLUSTER_ATTR(recover_timer, dlm_check_zero);
CLUSTER_ATTR(toss_secs, dlm_check_zero);
@@ -897,7 +903,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
/* Config file defaults */
#define DEFAULT_TCP_PORT 21064
-#define DEFAULT_BUFFER_SIZE 4096
#define DEFAULT_RSBTBL_SIZE 1024
#define DEFAULT_RECOVER_TIMER 5
#define DEFAULT_TOSS_SECS 10
--
2.26.2
* [Cluster-devel] [PATCH dlm/next 4/6] fs: dlm: release connection with call_rcu
From: Alexander Aring @ 2020-09-18 14:27 UTC
To: cluster-devel.redhat.com
This patch introduces the use of call_rcu() to release a connection
structure; the rcu callback performs a deep-free of the connection
structure's fields before freeing the structure itself.
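To sketch why call_rcu() is used instead of kfree_rcu(): kfree_rcu()
can only free the structure itself, while call_rcu() runs a callback
that can free nested allocations first. A minimal illustration with a
hypothetical struct:

	struct foo {
		char *buf;		/* separately allocated member */
		struct rcu_head rcu;
	};

	static void foo_release(struct rcu_head *rcu)
	{
		struct foo *f = container_of(rcu, struct foo, rcu);

		kfree(f->buf);		/* deep-free members first */
		kfree(f);
	}

	/* instead of kfree_rcu(f, rcu), which would leak f->buf: */
	call_rcu(&f->rcu, foo_release);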
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 24f5e55313d83..72764e6c9417f 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1603,18 +1603,23 @@ static void shutdown_conn(struct connection *con)
con->shutdown_action(con);
}
+static void connection_release(struct rcu_head *rcu)
+{
+ struct connection *con = container_of(rcu, struct connection, rcu);
+
+ clean_one_writequeue(con);
+ kfree(con);
+}
+
static void free_conn(struct connection *con)
{
close_connection(con, true, true, true);
spin_lock(&connections_lock);
hlist_del_rcu(&con->list);
spin_unlock(&connections_lock);
- if (con->othercon) {
- clean_one_writequeue(con->othercon);
- kfree_rcu(con->othercon, rcu);
- }
- clean_one_writequeue(con);
- kfree_rcu(con, rcu);
+ if (con->othercon)
+ call_rcu(&con->othercon->rcu, connection_release);
+ call_rcu(&con->rcu, connection_release);
}
static void work_flush(void)
--
2.26.2
* [Cluster-devel] [PATCH dlm/next 5/6] fs: dlm: rework receive handling
From: Alexander Aring @ 2020-09-18 14:27 UTC
To: cluster-devel.redhat.com
This patch reworks the current receive handling of dlm. While trying
to change the send handling to fix reorder issues I took a look into
the receive handling and simplified it. It works as follows:

Each connection has a preallocated receive buffer with a minimum
length of 4096 bytes. On receive, the upper layer protocol processes
all dlm messages until not enough data is left. If "leftover" data
remains at the end of the receive buffer because the last dlm message
wasn't fully received, it is copied to the beginning of the
preallocated receive buffer. On the next receive the new data is
appended behind the previous "leftover" data and processing begins
again.
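A minimal sketch of this leftover-carry scheme (written against a
plain socket fd, independent of the kernel socket API in the diff
below; process_complete_messages() is a hypothetical stand-in for the
midcomms parser and returns the number of bytes consumed):

	static unsigned char rx_buf[RX_BUFLEN];
	static int rx_leftover;

	static void receive_once(int fd)
	{
		int n, used, buflen;

		/* append new data behind any leftover from the last receive */
		n = recv(fd, rx_buf + rx_leftover, RX_BUFLEN - rx_leftover, 0);
		if (n <= 0)
			return;

		buflen = rx_leftover + n;
		used = process_complete_messages(rx_buf, buflen);

		/* move the trailing partial message to the buffer start so
		 * the next receive appends right behind it
		 */
		rx_leftover = buflen - used;
		if (rx_leftover)
			memmove(rx_buf, rx_buf + used, rx_leftover);
	}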
This removes a lot of code of the current mechanism. Inside the
processing functionality we ensure with a memmove() that each dlm
message is memory aligned. Because every dlm message then starts at
the beginning of the buffer, many of these memmove() calls become
trivial, as src and dest pointers are the same.
The cluster attribute "buffer_size" takes on a new meaning: it is now
the size of the application layer receive buffer. If it is changed at
runtime, the receive buffer is reallocated. It is important that the
receive buffer size is at least the maximum possible dlm message size,
otherwise a received message cannot be placed inside the receive
buffer.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/config.c | 1 -
fs/dlm/config.h | 2 +
fs/dlm/lowcomms.c | 168 +++++++++++++++++++++-------------------------
fs/dlm/midcomms.c | 136 +++++++++++++++----------------------
fs/dlm/midcomms.h | 3 +-
5 files changed, 132 insertions(+), 178 deletions(-)
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 1734f2e861924..53d32ea368dda 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -166,7 +166,6 @@ static bool dlm_check_zero(unsigned int x)
return !x;
}
-#define DEFAULT_BUFFER_SIZE 4096
static bool dlm_check_buffer_size(unsigned int x)
{
return (x < DEFAULT_BUFFER_SIZE);
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index f62996cad5616..1b48ddbe39c7b 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -12,6 +12,8 @@
#ifndef __CONFIG_DOT_H__
#define __CONFIG_DOT_H__
+#define DEFAULT_BUFFER_SIZE 4096
+
struct dlm_config_node {
int nodeid;
int weight;
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 72764e6c9417f..c7b6e36845404 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -65,40 +65,6 @@
#define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
-struct cbuf {
- unsigned int base;
- unsigned int len;
- unsigned int mask;
-};
-
-static void cbuf_add(struct cbuf *cb, int n)
-{
- cb->len += n;
-}
-
-static int cbuf_data(struct cbuf *cb)
-{
- return ((cb->base + cb->len) & cb->mask);
-}
-
-static void cbuf_init(struct cbuf *cb, int size)
-{
- cb->base = cb->len = 0;
- cb->mask = size-1;
-}
-
-static void cbuf_eat(struct cbuf *cb, int n)
-{
- cb->len -= n;
- cb->base += n;
- cb->base &= cb->mask;
-}
-
-static bool cbuf_empty(struct cbuf *cb)
-{
- return cb->len == 0;
-}
-
struct connection {
struct socket *sock; /* NULL if not connected */
uint32_t nodeid; /* So we know who we are in the list */
@@ -117,8 +83,6 @@ struct connection {
int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
- struct page *rx_page;
- struct cbuf cb;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@@ -126,6 +90,9 @@ struct connection {
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
+ unsigned char *rx_buf;
+ int rx_buflen;
+ int rx_leftover;
struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -219,6 +186,13 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
if (!con)
return NULL;
+ con->rx_buflen = dlm_config.ci_buffer_size;
+ con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
+ if (!con->rx_buf) {
+ kfree(con);
+ return NULL;
+ }
+
con->nodeid = nodeid;
mutex_init(&con->sock_mutex);
INIT_LIST_HEAD(&con->writequeue);
@@ -613,11 +587,8 @@ static void close_connection(struct connection *con, bool and_other,
/* Will only re-enter once. */
close_connection(con->othercon, false, true, true);
}
- if (con->rx_page) {
- __free_page(con->rx_page);
- con->rx_page = NULL;
- }
+ con->rx_leftover = 0;
con->retries = 0;
mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags);
@@ -671,16 +642,33 @@ static void dlm_tcp_shutdown(struct connection *con)
shutdown_connection(con);
}
+static int con_realloc_receive_buf(struct connection *con, int newlen)
+{
+ unsigned char *newbuf;
+
+ newbuf = kmalloc(newlen, GFP_NOFS);
+ if (!newbuf)
+ return -ENOMEM;
+
+ /* copy any leftover from last receive */
+ if (con->rx_leftover)
+ memmove(newbuf, con->rx_buf, con->rx_leftover);
+
+ /* swap to new buffer space */
+ kfree(con->rx_buf);
+ con->rx_buflen = newlen;
+ con->rx_buf = newbuf;
+
+ return 0;
+}
+
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
- int ret = 0;
- struct msghdr msg = {};
- struct kvec iov[2];
- unsigned len;
- int r;
int call_again_soon = 0;
- int nvec;
+ struct msghdr msg;
+ struct kvec iov;
+ int ret, buflen;
mutex_lock(&con->sock_mutex);
@@ -688,71 +676,55 @@ static int receive_from_sock(struct connection *con)
ret = -EAGAIN;
goto out_close;
}
+
if (con->nodeid == 0) {
ret = -EINVAL;
goto out_close;
}
- if (con->rx_page == NULL) {
- /*
- * This doesn't need to be atomic, but I think it should
- * improve performance if it is.
- */
- con->rx_page = alloc_page(GFP_ATOMIC);
- if (con->rx_page == NULL)
+ /* realloc if a new receive buffer size was configured */
+ buflen = dlm_config.ci_buffer_size;
+ if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
+ ret = con_realloc_receive_buf(con, buflen);
+ if (ret < 0)
goto out_resched;
- cbuf_init(&con->cb, PAGE_SIZE);
}
- /*
- * iov[0] is the bit of the circular buffer between the current end
- * point (cb.base + cb.len) and the end of the buffer.
- */
- iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
- iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
- iov[1].iov_len = 0;
- nvec = 1;
-
- /*
- * iov[1] is the bit of the circular buffer between the start of the
- * buffer and the start of the currently used section (cb.base)
+ /* calculate new buffer parameter regarding last receive and
+ * possible leftover bytes
*/
- if (cbuf_data(&con->cb) >= con->cb.base) {
- iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
- iov[1].iov_len = con->cb.base;
- iov[1].iov_base = page_address(con->rx_page);
- nvec = 2;
- }
- len = iov[0].iov_len + iov[1].iov_len;
- iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len);
+ iov.iov_base = con->rx_buf + con->rx_leftover;
+ iov.iov_len = con->rx_buflen - con->rx_leftover;
- r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+ ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+ msg.msg_flags);
if (ret <= 0)
goto out_close;
- else if (ret == len)
+ else if (ret == iov.iov_len)
call_again_soon = 1;
- cbuf_add(&con->cb, ret);
- ret = dlm_process_incoming_buffer(con->nodeid,
- page_address(con->rx_page),
- con->cb.base, con->cb.len,
- PAGE_SIZE);
- if (ret < 0) {
- log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
- ret, page_address(con->rx_page), con->cb.base,
- con->cb.len, r);
- cbuf_eat(&con->cb, r);
- } else {
- cbuf_eat(&con->cb, ret);
- }
+ /* new buflen according to bytes read and leftover from last receive */
+ buflen = ret + con->rx_leftover;
+ ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
+ if (ret < 0)
+ goto out_close;
- if (cbuf_empty(&con->cb) && !call_again_soon) {
- __free_page(con->rx_page);
- con->rx_page = NULL;
+ /* calculate leftover bytes from processing and put them at the
+ * beginning of the receive buffer, so on the next receive we have
+ * the full message at the start address of the receive buffer.
+ */
+ con->rx_leftover = buflen - ret;
+ if (con->rx_leftover) {
+ memmove(con->rx_buf, con->rx_buf + ret,
+ con->rx_leftover);
+ call_again_soon = true;
}
if (call_again_soon)
goto out_resched;
+
mutex_unlock(&con->sock_mutex);
return 0;
@@ -850,6 +822,17 @@ static int accept_from_sock(struct connection *con)
result = -ENOMEM;
goto accept_err;
}
+
+ othercon->rx_buflen = dlm_config.ci_buffer_size;
+ othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
+ if (!othercon->rx_buf) {
+ mutex_unlock(&newcon->sock_mutex);
+ kfree(othercon);
+ log_print("failed to allocate incoming socket receive buffer");
+ result = -ENOMEM;
+ goto accept_err;
+ }
+
othercon->nodeid = nodeid;
othercon->rx_action = receive_from_sock;
mutex_init(&othercon->sock_mutex);
@@ -1608,6 +1591,7 @@ static void connection_release(struct rcu_head *rcu)
struct connection *con = container_of(rcu, struct connection, rcu);
clean_one_writequeue(con);
+ kfree(con->rx_buf);
kfree(con);
}
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 921322d133e3d..fde3a6afe4bea 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -22,114 +22,84 @@
* into packets and sends them to the comms layer.
*/
+#include <asm/unaligned.h>
+
#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "lock.h"
#include "midcomms.h"
-
-static void copy_from_cb(void *dst, const void *base, unsigned offset,
- unsigned len, unsigned limit)
-{
- unsigned copy = len;
-
- if ((copy + offset) > limit)
- copy = limit - offset;
- memcpy(dst, base + offset, copy);
- len -= copy;
- if (len)
- memcpy(dst + copy, base, len);
-}
-
/*
* Called from the low-level comms layer to process a buffer of
* commands.
- *
- * Only complete messages are processed here, any "spare" bytes from
- * the end of a buffer are saved and tacked onto the front of the next
- * message that comes in. I doubt this will happen very often but we
- * need to be able to cope with it and I don't want the task to be waiting
- * for packets to come in when there is useful work to be done.
*/
-int dlm_process_incoming_buffer(int nodeid, const void *base,
- unsigned offset, unsigned len, unsigned limit)
+int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
- union {
- unsigned char __buf[DLM_INBUF_LEN];
- /* this is to force proper alignment on some arches */
- union dlm_packet p;
- } __tmp;
- union dlm_packet *p = &__tmp.p;
- int ret = 0;
- int err = 0;
+ const unsigned char *ptr = buf;
+ const struct dlm_header *hd;
uint16_t msglen;
- uint32_t lockspace;
-
- while (len > sizeof(struct dlm_header)) {
-
- /* Copy just the header to check the total length. The
- message may wrap around the end of the buffer back to the
- start, so we need to use a temp buffer and copy_from_cb. */
-
- copy_from_cb(p, base, offset, sizeof(struct dlm_header),
- limit);
-
- msglen = le16_to_cpu(p->header.h_length);
- lockspace = p->header.h_lockspace;
+ int ret = 0;
- err = -EINVAL;
- if (msglen < sizeof(struct dlm_header))
- break;
- if (p->header.h_cmd == DLM_MSG) {
- if (msglen < sizeof(struct dlm_message))
- break;
- } else {
- if (msglen < sizeof(struct dlm_rcom))
- break;
- }
- err = -E2BIG;
- if (msglen > dlm_config.ci_buffer_size) {
- log_print("message size %d from %d too big, buf len %d",
- msglen, nodeid, len);
- break;
+ while (len >= sizeof(struct dlm_header)) {
+ hd = (struct dlm_header *)ptr;
+
+ /* no message should be larger than this, otherwise we
+ * cannot deliver it to the upper layers
+ */
+ msglen = get_unaligned_le16(&hd->h_length);
+ if (msglen > DEFAULT_BUFFER_SIZE) {
+ log_print("received invalid length header: %u, will abort message parsing",
+ msglen);
+ return -EBADMSG;
}
- err = 0;
-
- /* If only part of the full message is contained in this
- buffer, then do nothing and wait for lowcomms to call
- us again later with more data. We return 0 meaning
- we've consumed none of the input buffer. */
+ /* the caller will take care that the leftover
+ * is parsed on the next call with more data
+ */
if (msglen > len)
break;
- /* Allocate a larger temp buffer if the full message won't fit
- in the buffer on the stack (which should work for most
- ordinary messages). */
-
- if (msglen > sizeof(__tmp) && p == &__tmp.p) {
- p = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
- if (p == NULL)
- return ret;
- }
+ switch (hd->h_cmd) {
+ case DLM_MSG:
+ if (msglen < sizeof(struct dlm_message)) {
+ log_print("dlm msg too small: %u, will skip this message",
+ msglen);
+ goto skip;
+ }
- copy_from_cb(p, base, offset, msglen, limit);
+ break;
+ case DLM_RCOM:
+ if (msglen < sizeof(struct dlm_rcom)) {
+ log_print("dlm rcom msg too small: %u, will skip this message",
+ msglen);
+ goto skip;
+ }
- BUG_ON(lockspace != p->header.h_lockspace);
+ break;
+ default:
+ log_print("unsupported h_cmd received: %u, will skip this message",
+ hd->h_cmd);
+ goto skip;
+ }
+ /* for aligned memory access, we just copy the current message
+ * to the beginning of the buffer, which contains already parsed
+ * data; this should provide aligned access for the upper layers
+ * because the start address of the buffer is aligned. This
+ * memmove() can be removed once the upper layer is capable of
+ * unaligned memory access.
+ */
+ memmove(buf, ptr, msglen);
+ dlm_receive_buffer((union dlm_packet *)buf, nodeid);
+
+skip:
ret += msglen;
- offset += msglen;
- offset &= (limit - 1);
len -= msglen;
-
- dlm_receive_buffer(p, nodeid);
+ ptr += msglen;
}
- if (p != &__tmp.p)
- kfree(p);
-
- return err ? err : ret;
+ return ret;
}
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index 2e122e81c8d0d..61e90a9218493 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -12,8 +12,7 @@
#ifndef __MIDCOMMS_DOT_H__
#define __MIDCOMMS_DOT_H__
-int dlm_process_incoming_buffer(int nodeid, const void *base, unsigned offset,
- unsigned len, unsigned limit);
+int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
#endif /* __MIDCOMMS_DOT_H__ */
--
2.26.2
* [Cluster-devel] [PATCH dlm/next 6/6] fs: dlm: rework transmit handling
From: Alexander Aring @ 2020-09-18 14:27 UTC
To: cluster-devel.redhat.com
I detected that dlm messages are sometimes transmitted in an incorrect
order with respect to the writequeue-locked area of commit_buffer().
This patch replaces the current sending mechanism with a basic
per-message queue in which each writequeue entry holds exactly one dlm
message. The sending mechanism then uses scatter/gather over the first
n writequeue entries, which ensures we send dlm messages in the order
of their commit_buffer() calls. The locking of the writequeue was also
changed so that readers are lockless with the help of RCU.
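The core of the new sending path can be sketched like this (names
simplified; msg_entry is a hypothetical stand-in for the writequeue
entry in the diff below):

	static int send_queued(struct socket *sock, struct list_head *queue)
	{
		struct kvec iov[MAX_SEND_IOV_COUNT];
		struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
		struct msg_entry *e;
		size_t size = 0;
		int num = 0;

		/* gather the first n queued messages, in queue order */
		list_for_each_entry(e, queue, list) {
			if (num == MAX_SEND_IOV_COUNT)
				break;
			iov[num].iov_base = e->buf;
			iov[num].iov_len = e->len;
			size += e->len;
			num++;
		}

		iov_iter_kvec(&msg.msg_iter, WRITE, iov, num, size);

		/* one sendmsg() transmits the whole batch, preserving order */
		return sock_sendmsg(sock, &msg);
	}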
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
fs/dlm/lowcomms.c | 268 ++++++++++++++++++++++------------------------
1 file changed, 131 insertions(+), 137 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index c7b6e36845404..2580ef80b8855 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -61,8 +61,8 @@
#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32
-/* Number of messages to send before rescheduling */
-#define MAX_SEND_MSG_COUNT 25
+/* How many dlm messages should be used for scatter/gather to send */
+#define MAX_SEND_IOV_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct connection {
@@ -83,6 +83,10 @@ struct connection {
int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
+ struct writequeue_entry *tx_queue[MAX_SEND_IOV_COUNT];
+ struct kvec tx_iov[MAX_SEND_IOV_COUNT];
+ struct msghdr tx_msg;
+ int tx_queue_cnt;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@@ -99,13 +103,12 @@ struct connection {
/* An entry waiting to be sent */
struct writequeue_entry {
- struct list_head list;
- struct page *page;
- int offset;
- int len;
- int end;
- int users;
struct connection *con;
+ unsigned char *buf;
+ int len;
+
+ struct list_head list;
+ struct rcu_head rcu;
};
struct dlm_node_addr {
@@ -233,6 +236,19 @@ static void foreach_conn(void (*conn_func)(struct connection *c))
srcu_read_unlock(&connections_srcu, idx);
}
+/* can only be used to check if something is pending to transmit */
+static bool writequeue_has_entries(struct connection *con)
+{
+ void *e;
+
+ rcu_read_lock();
+ e = list_first_or_null_rcu(&con->writequeue,
+ struct writequeue_entry, list);
+ rcu_read_unlock();
+
+ return !!e;
+}
+
static struct dlm_node_addr *find_node_addr(int nodeid)
{
struct dlm_node_addr *na;
@@ -588,6 +604,15 @@ static void close_connection(struct connection *con, bool and_other,
close_connection(con->othercon, false, true, true);
}
+ /* ideally we would check which messages inside the writequeue
+ * were sent and which were not, if we didn't send everything in
+ * one sendmsg() call. However on a sock_release() we will
+ * drop messages anyway; there need to be other mechanisms
+ * to check for duplicates or retransmissions. Setting
+ * con->tx_queue_cnt to zero should ensure we don't start
+ * to transmit in the middle of a dlm message.
+ */
+ con->tx_queue_cnt = 0;
con->rx_leftover = 0;
con->retries = 0;
mutex_unlock(&con->sock_mutex);
@@ -885,28 +910,23 @@ static int accept_from_sock(struct connection *con)
return result;
}
-static void free_entry(struct writequeue_entry *e)
+static void writequeue_entry_release(struct rcu_head *rcu)
{
- __free_page(e->page);
+ struct writequeue_entry *e = container_of(rcu, struct writequeue_entry,
+ rcu);
+
+ kfree(e->buf);
kfree(e);
}
-/*
- * writequeue_entry_complete - try to delete and free write queue entry
- * @e: write queue entry to try to delete
- * @completed: bytes completed
- *
- * writequeue_lock must be held.
- */
-static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
+static void writequeue_entry_complete(struct connection *con,
+ struct writequeue_entry *e)
{
- e->offset += completed;
- e->len -= completed;
+ spin_lock(&con->writequeue_lock);
+ list_del_rcu(&e->list);
+ spin_unlock(&con->writequeue_lock);
- if (e->len == 0 && e->users == 0) {
- list_del(&e->list);
- free_entry(e);
- }
+ call_rcu(&e->rcu, writequeue_entry_release);
}
/*
@@ -1306,149 +1326,123 @@ static int tcp_listen_for_all(void)
return result;
}
-
-
-static struct writequeue_entry *new_writequeue_entry(struct connection *con,
- gfp_t allocation)
+void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
- struct writequeue_entry *entry;
-
- entry = kmalloc(sizeof(struct writequeue_entry), allocation);
- if (!entry)
- return NULL;
+ struct writequeue_entry *e;
+ struct connection *con;
+ void *buf;
- entry->page = alloc_page(allocation);
- if (!entry->page) {
- kfree(entry);
+ if (len > DEFAULT_BUFFER_SIZE) {
+ log_print("Error ULP tried to allocate more than capable message size");
return NULL;
}
- entry->offset = 0;
- entry->len = 0;
- entry->end = 0;
- entry->users = 0;
- entry->con = con;
-
- return entry;
-}
-
-void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
-{
- struct connection *con;
- struct writequeue_entry *e;
- int offset = 0;
-
con = nodeid2con(nodeid, allocation);
if (!con)
return NULL;
- spin_lock(&con->writequeue_lock);
- e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
- if ((&e->list == &con->writequeue) ||
- (PAGE_SIZE - e->end < len)) {
- e = NULL;
- } else {
- offset = e->end;
- e->end += len;
- e->users++;
- }
- spin_unlock(&con->writequeue_lock);
+ e = kzalloc(sizeof(*e), allocation);
+ if (!e)
+ return NULL;
- if (e) {
- got_one:
- *ppc = page_address(e->page) + offset;
- return e;
+ buf = kmalloc(len, allocation);
+ if (!buf) {
+ kfree(e);
+ return NULL;
}
- e = new_writequeue_entry(con, allocation);
- if (e) {
- spin_lock(&con->writequeue_lock);
- offset = e->end;
- e->end += len;
- e->users++;
- list_add_tail(&e->list, &con->writequeue);
- spin_unlock(&con->writequeue_lock);
- goto got_one;
- }
- return NULL;
+ e->con = con;
+ e->buf = buf;
+ e->len = len;
+
+ *ppc = buf;
+
+ return e;
}
void dlm_lowcomms_commit_buffer(void *mh)
{
- struct writequeue_entry *e = (struct writequeue_entry *)mh;
+ struct writequeue_entry *e = mh;
struct connection *con = e->con;
- int users;
spin_lock(&con->writequeue_lock);
- users = --e->users;
- if (users)
- goto out;
- e->len = e->end - e->offset;
+ list_add_tail_rcu(&e->list, &con->writequeue);
spin_unlock(&con->writequeue_lock);
queue_work(send_workqueue, &con->swork);
- return;
+}
-out:
- spin_unlock(&con->writequeue_lock);
- return;
+static void send_to_sock_setup_msg(struct connection *con)
+{
+ struct writequeue_entry *e;
+ size_t size = 0;
+ int num = 0;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(e, &con->writequeue, list) {
+ if (num == MAX_SEND_IOV_COUNT)
+ break;
+
+ con->tx_iov[num].iov_base = e->buf;
+ con->tx_iov[num].iov_len = e->len;
+ con->tx_queue[num] = e;
+
+ size += con->tx_iov[num].iov_len;
+ num++;
+ }
+ rcu_read_unlock();
+
+ memset(&con->tx_msg, 0, sizeof(con->tx_msg));
+ con->tx_msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+ iov_iter_kvec(&con->tx_msg.msg_iter, WRITE, con->tx_iov, num, size);
+ con->tx_queue_cnt = num;
}
/* Send a message */
static void send_to_sock(struct connection *con)
{
- int ret = 0;
- const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
- struct writequeue_entry *e;
- int len, offset;
- int count = 0;
+ int ret, i;
mutex_lock(&con->sock_mutex);
if (con->sock == NULL)
goto out_connect;
- spin_lock(&con->writequeue_lock);
- for (;;) {
- e = list_entry(con->writequeue.next, struct writequeue_entry,
- list);
- if ((struct list_head *) e == &con->writequeue)
- break;
-
- len = e->len;
- offset = e->offset;
- BUG_ON(len == 0 && e->users == 0);
- spin_unlock(&con->writequeue_lock);
-
- ret = 0;
- if (len) {
- ret = kernel_sendpage(con->sock, e->page, offset, len,
- msg_flags);
- if (ret == -EAGAIN || ret == 0) {
- if (ret == -EAGAIN &&
- test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
- !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
- /* Notify TCP that we're limited by the
- * application window size.
- */
- set_bit(SOCK_NOSPACE, &con->sock->flags);
- con->sock->sk->sk_write_pending++;
- }
- cond_resched();
- goto out;
- } else if (ret < 0)
- goto send_error;
+ /* fill up msghdr with messages from writequeue */
+ if (!con->tx_queue_cnt)
+ send_to_sock_setup_msg(con);
+
+ ret = sock_sendmsg(con->sock, &con->tx_msg);
+ if (ret == -EAGAIN) {
+ if (ret == -EAGAIN &&
+ test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
+ !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
+ /* Notify TCP that we're limited by the
+ * application window size.
+ */
+ set_bit(SOCK_NOSPACE, &con->sock->flags);
+ con->sock->sk->sk_write_pending++;
}
+ queue_work(send_workqueue, &con->swork);
+ cond_resched();
+ goto out;
+ } else if (ret < 0) {
+ goto send_error;
+ }
- /* Don't starve people filling buffers */
- if (++count >= MAX_SEND_MSG_COUNT) {
- cond_resched();
- count = 0;
- }
+ /* if we have nothing left to write, clean up messages from the writequeue */
+ if (!msg_data_left(&con->tx_msg)) {
+ for (i = 0; i < con->tx_queue_cnt; i++)
+ writequeue_entry_complete(con, con->tx_queue[i]);
+
+ con->tx_queue_cnt = 0;
- spin_lock(&con->writequeue_lock);
- writequeue_entry_complete(e, ret);
+ if (writequeue_has_entries(con))
+ queue_work(send_workqueue, &con->swork);
+ } else {
+ /* call sendmsg() again, we didn't send everything */
+ queue_work(send_workqueue, &con->swork);
}
- spin_unlock(&con->writequeue_lock);
+
out:
mutex_unlock(&con->sock_mutex);
return;
@@ -1469,14 +1463,13 @@ static void send_to_sock(struct connection *con)
static void clean_one_writequeue(struct connection *con)
{
- struct writequeue_entry *e, *safe;
+ struct writequeue_entry *e;
- spin_lock(&con->writequeue_lock);
- list_for_each_entry_safe(e, safe, &con->writequeue, list) {
- list_del(&e->list);
- free_entry(e);
+ rcu_read_lock();
+ list_for_each_entry_rcu(e, &con->writequeue, list) {
+ writequeue_entry_complete(con, e);
}
- spin_unlock(&con->writequeue_lock);
+ rcu_read_unlock();
}
/* Called from recovery when it knows that a node has
@@ -1527,7 +1520,8 @@ static void process_send_sockets(struct work_struct *work)
clear_bit(CF_WRITE_PENDING, &con->flags);
if (con->sock == NULL) /* not mutex protected so check it inside too */
con->connect_action(con);
- if (!list_empty(&con->writequeue))
+
+ if (writequeue_has_entries(con))
send_to_sock(con);
}
--
2.26.2
* [Cluster-devel] [PATCH dlm/next 0/6] fs: dlm: rework receive and transmit handling
From: Alexander Ahring Oder Aring @ 2020-09-22 20:52 UTC
To: cluster-devel.redhat.com
Hi David,
On Fri, Sep 18, 2020 at 10:27 AM Alexander Aring <aahringo@redhat.com> wrote:
>
> Hi,
>
> This patch series contains a large change to the receive/transmit
> handling of dlm. While placing sequence numbers on dlm messages I
> noticed that dlm messages can be transmitted in an incorrect order. I
> changed the transmit handling so that the writequeue holds one entry
> per dlm message buffer and sendmsg() uses scatter/gather to transmit
> a batch of them. On the receive side I removed several allocations
> and copies between different buffers. There is now only one receive
> buffer, which gets filled directly, and the upper layer works on that
> same buffer.
I got a review today and we should stick to the sendpage()
functionality. Please wait for a v2 resend. Meanwhile I also detected
issues with the mark handling, which I want to fix as well.
Thanks.
- Alex