cluster-devel.redhat.com archive mirror
 help / color / mirror / Atom feed
From: Alexander Aring <aahringo@redhat.com>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] [PATCH dlm/next 6/6] fs: dlm: rework transmit handling
Date: Fri, 18 Sep 2020 10:27:28 -0400	[thread overview]
Message-ID: <20200918142728.147392-7-aahringo@redhat.com> (raw)
In-Reply-To: <20200918142728.147392-1-aahringo@redhat.com>

I detected that dlm messages are transmitted sometimes in a incorrect
order according to the writequeue locked area of commit_buffer(). This
patch replace the current sending mechanism by a basic per dlm message
queue which handles a entry of the writequeue as per dlm message. The
sending mechanism will then use a scatter/gather of the first n writequeue
entries. However that will ensure we send dlm messages in order according
to the commit_buffer() call. The lock of the writequeue was changed that
readers are lockless with help of RCU.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 268 ++++++++++++++++++++++------------------------
 1 file changed, 131 insertions(+), 137 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index c7b6e36845404..2580ef80b8855 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -61,8 +61,8 @@
 #define NEEDED_RMEM (4*1024*1024)
 #define CONN_HASH_SIZE 32
 
-/* Number of messages to send before rescheduling */
-#define MAX_SEND_MSG_COUNT 25
+/* How many dlm messages should be used for scatter/gather to send */
+#define MAX_SEND_IOV_COUNT 25
 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
 
 struct connection {
@@ -83,6 +83,10 @@ struct connection {
 	int (*rx_action) (struct connection *);	/* What to do when active */
 	void (*connect_action) (struct connection *);	/* What to do to connect */
 	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
+	struct writequeue_entry *tx_queue[MAX_SEND_IOV_COUNT];
+	struct kvec tx_iov[MAX_SEND_IOV_COUNT];
+	struct msghdr tx_msg;
+	int tx_queue_cnt;
 	int retries;
 #define MAX_CONNECT_RETRIES 3
 	struct hlist_node list;
@@ -99,13 +103,12 @@ struct connection {
 
 /* An entry waiting to be sent */
 struct writequeue_entry {
-	struct list_head list;
-	struct page *page;
-	int offset;
-	int len;
-	int end;
-	int users;
 	struct connection *con;
+	unsigned char *buf;
+	int len;
+
+	struct list_head list;
+	struct rcu_head rcu;
 };
 
 struct dlm_node_addr {
@@ -233,6 +236,19 @@ static void foreach_conn(void (*conn_func)(struct connection *c))
 	srcu_read_unlock(&connections_srcu, idx);
 }
 
+/* can only be used to check if something is pending to transmit */
+static bool writequeue_has_entries(struct connection *con)
+{
+	void *e;
+
+	rcu_read_lock();
+	e = list_first_or_null_rcu(&con->writequeue,
+				   struct writequeue_entry, list);
+	rcu_read_unlock();
+
+	return !!e;
+}
+
 static struct dlm_node_addr *find_node_addr(int nodeid)
 {
 	struct dlm_node_addr *na;
@@ -588,6 +604,15 @@ static void close_connection(struct connection *con, bool and_other,
 		close_connection(con->othercon, false, true, true);
 	}
 
+	/* ideally we would check which messages inside the writequeue
+	 * was sent and which was not, if we didn't send everything in
+	 * one sendmsg() call. However on a sock_release() we will
+	 * drop messages anyway, there need to be other mechanisms
+	 * to check on duplicates or retranmissions. Setting
+	 * con->tx_queue_cnt to zero should ensure we doesn't start
+	 * to transmit in the middle of an dlm message.
+	 */
+	con->tx_queue_cnt = 0;
 	con->rx_leftover = 0;
 	con->retries = 0;
 	mutex_unlock(&con->sock_mutex);
@@ -885,28 +910,23 @@ static int accept_from_sock(struct connection *con)
 	return result;
 }
 
-static void free_entry(struct writequeue_entry *e)
+static void writequeue_entry_release(struct rcu_head *rcu)
 {
-	__free_page(e->page);
+	struct writequeue_entry *e = container_of(rcu, struct writequeue_entry,
+						  rcu);
+
+	kfree(e->buf);
 	kfree(e);
 }
 
-/*
- * writequeue_entry_complete - try to delete and free write queue entry
- * @e: write queue entry to try to delete
- * @completed: bytes completed
- *
- * writequeue_lock must be held.
- */
-static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
+static void writequeue_entry_complete(struct connection *con,
+				      struct writequeue_entry *e)
 {
-	e->offset += completed;
-	e->len -= completed;
+	spin_lock(&con->writequeue_lock);
+	list_del_rcu(&e->list);
+	spin_unlock(&con->writequeue_lock);
 
-	if (e->len == 0 && e->users == 0) {
-		list_del(&e->list);
-		free_entry(e);
-	}
+	call_rcu(&e->rcu, writequeue_entry_release);
 }
 
 /*
@@ -1306,149 +1326,123 @@ static int tcp_listen_for_all(void)
 	return result;
 }
 
-
-
-static struct writequeue_entry *new_writequeue_entry(struct connection *con,
-						     gfp_t allocation)
+void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
 {
-	struct writequeue_entry *entry;
-
-	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
-	if (!entry)
-		return NULL;
+	struct writequeue_entry *e;
+	struct connection *con;
+	void *buf;
 
-	entry->page = alloc_page(allocation);
-	if (!entry->page) {
-		kfree(entry);
+	if (len > DEFAULT_BUFFER_SIZE) {
+		log_print("Error ULP tried to allocate more than capable message size");
 		return NULL;
 	}
 
-	entry->offset = 0;
-	entry->len = 0;
-	entry->end = 0;
-	entry->users = 0;
-	entry->con = con;
-
-	return entry;
-}
-
-void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
-{
-	struct connection *con;
-	struct writequeue_entry *e;
-	int offset = 0;
-
 	con = nodeid2con(nodeid, allocation);
 	if (!con)
 		return NULL;
 
-	spin_lock(&con->writequeue_lock);
-	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
-	if ((&e->list == &con->writequeue) ||
-	    (PAGE_SIZE - e->end < len)) {
-		e = NULL;
-	} else {
-		offset = e->end;
-		e->end += len;
-		e->users++;
-	}
-	spin_unlock(&con->writequeue_lock);
+	e = kzalloc(sizeof(*e), allocation);
+	if (!e)
+		return NULL;
 
-	if (e) {
-	got_one:
-		*ppc = page_address(e->page) + offset;
-		return e;
+	buf = kmalloc(len, allocation);
+	if (!buf) {
+		kfree(e);
+		return NULL;
 	}
 
-	e = new_writequeue_entry(con, allocation);
-	if (e) {
-		spin_lock(&con->writequeue_lock);
-		offset = e->end;
-		e->end += len;
-		e->users++;
-		list_add_tail(&e->list, &con->writequeue);
-		spin_unlock(&con->writequeue_lock);
-		goto got_one;
-	}
-	return NULL;
+	e->con = con;
+	e->buf = buf;
+	e->len = len;
+
+	*ppc = buf;
+
+	return e;
 }
 
 void dlm_lowcomms_commit_buffer(void *mh)
 {
-	struct writequeue_entry *e = (struct writequeue_entry *)mh;
+	struct writequeue_entry *e = mh;
 	struct connection *con = e->con;
-	int users;
 
 	spin_lock(&con->writequeue_lock);
-	users = --e->users;
-	if (users)
-		goto out;
-	e->len = e->end - e->offset;
+	list_add_tail_rcu(&e->list, &con->writequeue);
 	spin_unlock(&con->writequeue_lock);
 
 	queue_work(send_workqueue, &con->swork);
-	return;
+}
 
-out:
-	spin_unlock(&con->writequeue_lock);
-	return;
+static void send_to_sock_setup_msg(struct connection *con)
+{
+	struct writequeue_entry *e;
+	size_t size = 0;
+	int num = 0;
+
+	rcu_read_lock();
+	list_for_each_entry_rcu(e, &con->writequeue, list) {
+		if (num == MAX_SEND_IOV_COUNT)
+			break;
+
+		con->tx_iov[num].iov_base = e->buf;
+		con->tx_iov[num].iov_len = e->len;
+		con->tx_queue[num] = e;
+
+		size += con->tx_iov[num].iov_len;
+		num++;
+	}
+	rcu_read_unlock();
+
+	memset(&con->tx_msg, 0, sizeof(con->tx_msg));
+	con->tx_msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+	iov_iter_kvec(&con->tx_msg.msg_iter, WRITE, con->tx_iov, num, size);
+	con->tx_queue_cnt = num;
 }
 
 /* Send a message */
 static void send_to_sock(struct connection *con)
 {
-	int ret = 0;
-	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
-	struct writequeue_entry *e;
-	int len, offset;
-	int count = 0;
+	int ret, i;
 
 	mutex_lock(&con->sock_mutex);
 	if (con->sock == NULL)
 		goto out_connect;
 
-	spin_lock(&con->writequeue_lock);
-	for (;;) {
-		e = list_entry(con->writequeue.next, struct writequeue_entry,
-			       list);
-		if ((struct list_head *) e == &con->writequeue)
-			break;
-
-		len = e->len;
-		offset = e->offset;
-		BUG_ON(len == 0 && e->users == 0);
-		spin_unlock(&con->writequeue_lock);
-
-		ret = 0;
-		if (len) {
-			ret = kernel_sendpage(con->sock, e->page, offset, len,
-					      msg_flags);
-			if (ret == -EAGAIN || ret == 0) {
-				if (ret == -EAGAIN &&
-				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
-				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
-					/* Notify TCP that we're limited by the
-					 * application window size.
-					 */
-					set_bit(SOCK_NOSPACE, &con->sock->flags);
-					con->sock->sk->sk_write_pending++;
-				}
-				cond_resched();
-				goto out;
-			} else if (ret < 0)
-				goto send_error;
+	/* fill up msghdr with messages from writequeue */
+	if (!con->tx_queue_cnt)
+		send_to_sock_setup_msg(con);
+
+	ret = sock_sendmsg(con->sock, &con->tx_msg);
+	if (ret == -EAGAIN) {
+		if (ret == -EAGAIN &&
+		    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
+		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
+			/* Notify TCP that we're limited by the
+			 * application window size.
+			 */
+			set_bit(SOCK_NOSPACE, &con->sock->flags);
+			con->sock->sk->sk_write_pending++;
 		}
+		queue_work(send_workqueue, &con->swork);
+		cond_resched();
+		goto out;
+	} else if (ret < 0) {
+		goto send_error;
+	}
 
-		/* Don't starve people filling buffers */
-		if (++count >= MAX_SEND_MSG_COUNT) {
-			cond_resched();
-			count = 0;
-		}
+	/* if we have nothing left to write cleanup messages from writequeue */
+	if (!msg_data_left(&con->tx_msg)) {
+		for (i = 0; i < con->tx_queue_cnt; i++)
+			writequeue_entry_complete(con, con->tx_queue[i]);
+
+		con->tx_queue_cnt = 0;
 
-		spin_lock(&con->writequeue_lock);
-		writequeue_entry_complete(e, ret);
+		if (writequeue_has_entries(con))
+			queue_work(send_workqueue, &con->swork);
+	} else {
+		/* call again sendmsg() we didn't send everything */
+		queue_work(send_workqueue, &con->swork);
 	}
-	spin_unlock(&con->writequeue_lock);
+
 out:
 	mutex_unlock(&con->sock_mutex);
 	return;
@@ -1469,14 +1463,13 @@ static void send_to_sock(struct connection *con)
 
 static void clean_one_writequeue(struct connection *con)
 {
-	struct writequeue_entry *e, *safe;
+	struct writequeue_entry *e;
 
-	spin_lock(&con->writequeue_lock);
-	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
-		list_del(&e->list);
-		free_entry(e);
+	rcu_read_lock();
+	list_for_each_entry_rcu(e, &con->writequeue, list) {
+		writequeue_entry_complete(con, e);
 	}
-	spin_unlock(&con->writequeue_lock);
+	rcu_read_unlock();
 }
 
 /* Called from recovery when it knows that a node has
@@ -1527,7 +1520,8 @@ static void process_send_sockets(struct work_struct *work)
 	clear_bit(CF_WRITE_PENDING, &con->flags);
 	if (con->sock == NULL) /* not mutex protected so check it inside too */
 		con->connect_action(con);
-	if (!list_empty(&con->writequeue))
+
+	if (writequeue_has_entries(con))
 		send_to_sock(con);
 }
 
-- 
2.26.2



  parent reply	other threads:[~2020-09-18 14:27 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-09-18 14:27 [Cluster-devel] [PATCH dlm/next 0/6] fs: dlm: rework receive and transmit handling Alexander Aring
2020-09-18 14:27 ` [Cluster-devel] [PATCH dlm/next 1/6] fs: dlm: remove lock dependency warning Alexander Aring
2020-09-18 14:27 ` [Cluster-devel] [PATCH dlm/next 2/6] fs: dlm: handle range check as callback Alexander Aring
2020-09-18 14:27 ` [Cluster-devel] [PATCH dlm/next 3/6] fs: dlm: disallow buffer size below default Alexander Aring
2020-09-18 14:27 ` [Cluster-devel] [PATCH dlm/next 4/6] fs: dlm: release connection with call_rcu Alexander Aring
2020-09-18 14:27 ` [Cluster-devel] [PATCH dlm/next 5/6] fs: dlm: rework receive handling Alexander Aring
2020-09-18 14:27 ` Alexander Aring [this message]
2020-09-22 20:52 ` [Cluster-devel] [PATCH dlm/next 0/6] fs: dlm: rework receive and transmit handling Alexander Ahring Oder Aring

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200918142728.147392-7-aahringo@redhat.com \
    --to=aahringo@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).