netdev.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH net-next 0/4] net/smc: splice implementation
@ 2018-05-03 16:12 Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
                   ` (4 more replies)
  0 siblings, 5 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Ursula Braun <ursula.braun@de.ibm.com>

Dave,

Stefan comes up with an smc implementation for splice(). The first
three patches are preparational patches, the 4th patch implements
splice().

Thanks, Ursula

Stefan Raspl (4):
  smc: simplify abort logic
  smc: make smc_rx_wait_data() generic
  smc: allocate RMBs as compound pages
  smc: add support for splice()

 net/smc/af_smc.c   |  40 +++++++++--
 net/smc/smc.h      |   3 +
 net/smc/smc_core.c |  18 ++---
 net/smc/smc_core.h |   1 +
 net/smc/smc_rx.c   | 200 +++++++++++++++++++++++++++++++++++++++++++----------
 net/smc/smc_rx.h   |  12 +++-
 6 files changed, 219 insertions(+), 55 deletions(-)

-- 
2.13.5

^ permalink raw reply	[flat|nested] 6+ messages in thread

* [PATCH net-next 1/4] smc: simplify abort logic
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
@ 2018-05-03 16:12 ` Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic Ursula Braun
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Some of the conditions to exit recv() are common in two pathes - cleaning up
code by moving the check up so we have it only once.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/smc_rx.c | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index af851d8df1f8..def33fb29ac9 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -112,26 +112,22 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		if (atomic_read(&conn->bytes_to_rcv))
 			goto copy;
 
+		if (sk->sk_shutdown & RCV_SHUTDOWN ||
+		    smc_cdc_rxed_any_close_or_senddone(conn) ||
+		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
+			break;
+
 		if (read_done) {
 			if (sk->sk_err ||
 			    sk->sk_state == SMC_CLOSED ||
-			    sk->sk_shutdown & RCV_SHUTDOWN ||
 			    !timeo ||
-			    signal_pending(current) ||
-			    smc_cdc_rxed_any_close_or_senddone(conn) ||
-			    conn->local_tx_ctrl.conn_state_flags.
-			    peer_conn_abort)
+			    signal_pending(current))
 				break;
 		} else {
 			if (sk->sk_err) {
 				read_done = sock_error(sk);
 				break;
 			}
-			if (sk->sk_shutdown & RCV_SHUTDOWN ||
-			    smc_cdc_rxed_any_close_or_senddone(conn) ||
-			    conn->local_tx_ctrl.conn_state_flags.
-			    peer_conn_abort)
-				break;
 			if (sk->sk_state == SMC_CLOSED) {
 				if (!sock_flag(sk, SOCK_DONE)) {
 					/* This occurs when user tries to read
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
@ 2018-05-03 16:12 ` Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 3/4] smc: allocate RMBs as compound pages Ursula Braun
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Turn smc_rx_wait_data into a generic function that can be used at various
instances to wait on traffic to complete with varying criteria.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/af_smc.c |  2 +-
 net/smc/smc_rx.c | 21 +++++++++++----------
 net/smc/smc_rx.h |  8 +++++++-
 3 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index 823ea3371575..747fdf1a2d6f 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -1092,7 +1092,7 @@ static int smc_accept(struct socket *sock, struct socket *new_sock,
 			release_sock(clcsk);
 		} else if (!atomic_read(&smc_sk(nsk)->conn.bytes_to_rcv)) {
 			lock_sock(nsk);
-			smc_rx_wait_data(smc_sk(nsk), &timeo);
+			smc_rx_wait(smc_sk(nsk), &timeo, smc_rx_data_available);
 			release_sock(nsk);
 		}
 	}
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index def33fb29ac9..7b64bee656e8 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -22,11 +22,10 @@
 #include "smc_tx.h" /* smc_tx_consumer_update() */
 #include "smc_rx.h"
 
-/* callback implementation for sk.sk_data_ready()
- * to wakeup rcvbuf consumers that blocked with smc_rx_wait_data().
+/* callback implementation to wakeup consumers blocked with smc_rx_wait().
  * indirectly called by smc_cdc_msg_recv_action().
  */
-static void smc_rx_data_ready(struct sock *sk)
+static void smc_rx_wake_up(struct sock *sk)
 {
 	struct socket_wq *wq;
 
@@ -47,25 +46,27 @@ static void smc_rx_data_ready(struct sock *sk)
 /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
  *   @smc    smc socket
  *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
+ *   @fcrit  add'l criterion to evaluate as function pointer
  * Returns:
  * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
  * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
  */
-int smc_rx_wait_data(struct smc_sock *smc, long *timeo)
+int smc_rx_wait(struct smc_sock *smc, long *timeo,
+		int (*fcrit)(struct smc_connection *conn))
 {
 	DEFINE_WAIT_FUNC(wait, woken_wake_function);
 	struct smc_connection *conn = &smc->conn;
 	struct sock *sk = &smc->sk;
 	int rc;
 
-	if (atomic_read(&conn->bytes_to_rcv))
+	if (fcrit(conn))
 		return 1;
 	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
 	add_wait_queue(sk_sleep(sk), &wait);
 	rc = sk_wait_event(sk, timeo,
 			   sk->sk_err ||
 			   sk->sk_shutdown & RCV_SHUTDOWN ||
-			   atomic_read(&conn->bytes_to_rcv) ||
+			   fcrit(conn) ||
 			   smc_cdc_rxed_any_close_or_senddone(conn),
 			   &wait);
 	remove_wait_queue(sk_sleep(sk), &wait);
@@ -146,14 +147,14 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 				return -EAGAIN;
 		}
 
-		if (!atomic_read(&conn->bytes_to_rcv)) {
-			smc_rx_wait_data(smc, &timeo);
+		if (!smc_rx_data_available(conn)) {
+			smc_rx_wait(smc, &timeo, smc_rx_data_available);
 			continue;
 		}
 
 copy:
 		/* initialize variables for 1st iteration of subsequent loop */
-		/* could be just 1 byte, even after smc_rx_wait_data above */
+		/* could be just 1 byte, even after waiting on data above */
 		readable = atomic_read(&conn->bytes_to_rcv);
 		/* not more than what user space asked for */
 		copylen = min_t(size_t, read_remaining, readable);
@@ -213,5 +214,5 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 /* Initialize receive properties on connection establishment. NB: not __init! */
 void smc_rx_init(struct smc_sock *smc)
 {
-	smc->sk.sk_data_ready = smc_rx_data_ready;
+	smc->sk.sk_data_ready = smc_rx_wake_up;
 }
diff --git a/net/smc/smc_rx.h b/net/smc/smc_rx.h
index 0b75a6b470e6..8f9f00997641 100644
--- a/net/smc/smc_rx.h
+++ b/net/smc/smc_rx.h
@@ -20,6 +20,12 @@
 void smc_rx_init(struct smc_sock *smc);
 int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		   int flags);
-int smc_rx_wait_data(struct smc_sock *smc, long *timeo);
+int smc_rx_wait(struct smc_sock *smc, long *timeo,
+		int (*fcrit)(struct smc_connection *conn));
+static inline int smc_rx_data_available(struct smc_connection *conn)
+{
+	return atomic_read(&conn->bytes_to_rcv);
+}
+
 
 #endif /* SMC_RX_H */
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH net-next 3/4] smc: allocate RMBs as compound pages
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic Ursula Braun
@ 2018-05-03 16:12 ` Ursula Braun
  2018-05-03 16:12 ` [PATCH net-next 4/4] smc: add support for splice() Ursula Braun
  2018-05-03 20:31 ` [PATCH net-next 0/4] net/smc: splice implementation David Miller
  4 siblings, 0 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Preparatory work for splice() support.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/smc_core.c | 18 +++++++++---------
 net/smc/smc_core.h |  1 +
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/net/smc/smc_core.c b/net/smc/smc_core.c
index 1f3ea62fac5c..4051fb504393 100644
--- a/net/smc/smc_core.c
+++ b/net/smc/smc_core.c
@@ -274,8 +274,8 @@ static void smc_buf_free(struct smc_buf_desc *buf_desc, struct smc_link *lnk,
 				    DMA_TO_DEVICE);
 	}
 	sg_free_table(&buf_desc->sgt[SMC_SINGLE_LINK]);
-	if (buf_desc->cpu_addr)
-		free_pages((unsigned long)buf_desc->cpu_addr, buf_desc->order);
+	if (buf_desc->pages)
+		__free_pages(buf_desc->pages, buf_desc->order);
 	kfree(buf_desc);
 }
 
@@ -550,16 +550,16 @@ static struct smc_buf_desc *smc_new_buf_create(struct smc_link_group *lgr,
 	if (!buf_desc)
 		return ERR_PTR(-ENOMEM);
 
-	buf_desc->cpu_addr =
-		(void *)__get_free_pages(GFP_KERNEL | __GFP_NOWARN |
-					 __GFP_NOMEMALLOC |
-					 __GFP_NORETRY | __GFP_ZERO,
-					 get_order(bufsize));
-	if (!buf_desc->cpu_addr) {
+	buf_desc->order = get_order(bufsize);
+	buf_desc->pages = alloc_pages(GFP_KERNEL | __GFP_NOWARN |
+				      __GFP_NOMEMALLOC | __GFP_COMP |
+				      __GFP_NORETRY | __GFP_ZERO,
+				      buf_desc->order);
+	if (!buf_desc->pages) {
 		kfree(buf_desc);
 		return ERR_PTR(-EAGAIN);
 	}
-	buf_desc->order = get_order(bufsize);
+	buf_desc->cpu_addr = (void *)page_address(buf_desc->pages);
 
 	/* build the sg table from the pages */
 	lnk = &lgr->lnk[SMC_SINGLE_LINK];
diff --git a/net/smc/smc_core.h b/net/smc/smc_core.h
index 97339f03ba79..cdf800f1fae7 100644
--- a/net/smc/smc_core.h
+++ b/net/smc/smc_core.h
@@ -120,6 +120,7 @@ struct smc_link {
 struct smc_buf_desc {
 	struct list_head	list;
 	void			*cpu_addr;	/* virtual address of buffer */
+	struct page		*pages;
 	struct sg_table		sgt[SMC_LINKS_PER_LGR_MAX];/* virtual buffer */
 	struct ib_mr		*mr_rx[SMC_LINKS_PER_LGR_MAX];
 						/* for rmb only: memory region
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH net-next 4/4] smc: add support for splice()
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
                   ` (2 preceding siblings ...)
  2018-05-03 16:12 ` [PATCH net-next 3/4] smc: allocate RMBs as compound pages Ursula Braun
@ 2018-05-03 16:12 ` Ursula Braun
  2018-05-03 20:31 ` [PATCH net-next 0/4] net/smc: splice implementation David Miller
  4 siblings, 0 replies; 6+ messages in thread
From: Ursula Braun @ 2018-05-03 16:12 UTC (permalink / raw)
  To: davem; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl, ubraun

From: Stefan Raspl <stefan.raspl@linux.ibm.com>

Provide an implementation for splice() when we are using SMC. See
smc_splice_read() for further details.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com><
---
 net/smc/af_smc.c |  38 +++++++++++--
 net/smc/smc.h    |   3 +
 net/smc/smc_rx.c | 163 +++++++++++++++++++++++++++++++++++++++++++++++++------
 net/smc/smc_rx.h |   6 +-
 4 files changed, 185 insertions(+), 25 deletions(-)

diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index 747fdf1a2d6f..553fe4eb4066 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -1166,10 +1166,12 @@ static int smc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 		goto out;
 	}
 
-	if (smc->use_fallback)
+	if (smc->use_fallback) {
 		rc = smc->clcsock->ops->recvmsg(smc->clcsock, msg, len, flags);
-	else
-		rc = smc_rx_recvmsg(smc, msg, len, flags);
+	} else {
+		msg->msg_namelen = 0;
+		rc = smc_rx_recvmsg(smc, msg, NULL, len, flags);
+	}
 
 out:
 	release_sock(sk);
@@ -1446,9 +1448,15 @@ static ssize_t smc_sendpage(struct socket *sock, struct page *page,
 	return rc;
 }
 
+/* Map the affected portions of the rmbe into an spd, note the number of bytes
+ * to splice in conn->splice_pending, and press 'go'. Delays consumer cursor
+ * updates till whenever a respective page has been fully processed.
+ * Note that subsequent recv() calls have to wait till all splice() processing
+ * completed.
+ */
 static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,
 			       struct pipe_inode_info *pipe, size_t len,
-				    unsigned int flags)
+			       unsigned int flags)
 {
 	struct sock *sk = sock->sk;
 	struct smc_sock *smc;
@@ -1456,16 +1464,34 @@ static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,
 
 	smc = smc_sk(sk);
 	lock_sock(sk);
-	if ((sk->sk_state != SMC_ACTIVE) && (sk->sk_state != SMC_CLOSED))
+
+	if (sk->sk_state == SMC_INIT ||
+	    sk->sk_state == SMC_LISTEN ||
+	    sk->sk_state == SMC_CLOSED)
+		goto out;
+
+	if (sk->sk_state == SMC_PEERFINCLOSEWAIT) {
+		rc = 0;
 		goto out;
+	}
+
 	if (smc->use_fallback) {
 		rc = smc->clcsock->ops->splice_read(smc->clcsock, ppos,
 						    pipe, len, flags);
 	} else {
-		rc = -EOPNOTSUPP;
+		if (*ppos) {
+			rc = -ESPIPE;
+			goto out;
+		}
+		if (flags & SPLICE_F_NONBLOCK)
+			flags = MSG_DONTWAIT;
+		else
+			flags = 0;
+		rc = smc_rx_recvmsg(smc, NULL, pipe, len, flags);
 	}
 out:
 	release_sock(sk);
+
 	return rc;
 }
 
diff --git a/net/smc/smc.h b/net/smc/smc.h
index 2405e889b93d..ec209cd48d42 100644
--- a/net/smc/smc.h
+++ b/net/smc/smc.h
@@ -164,6 +164,9 @@ struct smc_connection {
 	atomic_t		bytes_to_rcv;	/* arrived data,
 						 * not yet received
 						 */
+	atomic_t		splice_pending;	/* number of spliced bytes
+						 * pending processing
+						 */
 #ifndef KERNEL_HAS_ATOMIC64
 	spinlock_t		acurs_lock;	/* protect cursors */
 #endif
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index 7b64bee656e8..ed45569289f5 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -43,6 +43,116 @@ static void smc_rx_wake_up(struct sock *sk)
 	rcu_read_unlock();
 }
 
+/* Update consumer cursor
+ *   @conn   connection to update
+ *   @cons   consumer cursor
+ *   @len    number of Bytes consumed
+ */
+static void smc_rx_update_consumer(struct smc_connection *conn,
+				   union smc_host_cursor cons, size_t len)
+{
+	smc_curs_add(conn->rmbe_size, &cons, len);
+	smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
+		       conn);
+	/* send consumer cursor update if required */
+	/* similar to advertising new TCP rcv_wnd if required */
+	smc_tx_consumer_update(conn);
+}
+
+struct smc_spd_priv {
+	struct smc_sock *smc;
+	size_t		 len;
+};
+
+static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
+				    struct pipe_buffer *buf)
+{
+	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
+	struct smc_sock *smc = priv->smc;
+	struct smc_connection *conn;
+	union smc_host_cursor cons;
+	struct sock *sk = &smc->sk;
+
+	if (sk->sk_state == SMC_CLOSED ||
+	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
+	    sk->sk_state == SMC_APPFINCLOSEWAIT)
+		goto out;
+	conn = &smc->conn;
+	lock_sock(sk);
+	smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
+		       conn);
+	smc_rx_update_consumer(conn, cons, priv->len);
+	release_sock(sk);
+	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
+		smc_rx_wake_up(sk);
+out:
+	kfree(priv);
+	put_page(buf->page);
+	sock_put(sk);
+}
+
+static int smc_rx_pipe_buf_nosteal(struct pipe_inode_info *pipe,
+				   struct pipe_buffer *buf)
+{
+	return 1;
+}
+
+static const struct pipe_buf_operations smc_pipe_ops = {
+	.can_merge = 0,
+	.confirm = generic_pipe_buf_confirm,
+	.release = smc_rx_pipe_buf_release,
+	.steal = smc_rx_pipe_buf_nosteal,
+	.get = generic_pipe_buf_get
+};
+
+static void smc_rx_spd_release(struct splice_pipe_desc *spd,
+			       unsigned int i)
+{
+	put_page(spd->pages[i]);
+}
+
+static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
+			 struct smc_sock *smc)
+{
+	struct splice_pipe_desc spd;
+	struct partial_page partial;
+	struct smc_spd_priv *priv;
+	struct page *page;
+	int bytes;
+
+	page = virt_to_page(smc->conn.rmb_desc->cpu_addr);
+	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
+	if (!priv)
+		return -ENOMEM;
+	priv->len = len;
+	priv->smc = smc;
+	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
+	partial.len = len;
+	partial.private = (unsigned long)priv;
+
+	spd.nr_pages_max = 1;
+	spd.nr_pages = 1;
+	spd.pages = &page;
+	spd.partial = &partial;
+	spd.ops = &smc_pipe_ops;
+	spd.spd_release = smc_rx_spd_release;
+
+	bytes = splice_to_pipe(pipe, &spd);
+	if (bytes > 0) {
+		sock_hold(&smc->sk);
+		get_page(smc->conn.rmb_desc->pages);
+		atomic_add(bytes, &smc->conn.splice_pending);
+	}
+
+	return bytes;
+}
+
+static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
+{
+	return atomic_read(&conn->bytes_to_rcv) &&
+	       !atomic_read(&conn->splice_pending);
+}
+
 /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
  *   @smc    smc socket
  *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
@@ -74,19 +184,25 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo,
 	return rc;
 }
 
-/* rcvbuf consumer: main API called by socket layer.
- * called under sk lock.
+/* smc_rx_recvmsg - receive data from RMBE
+ * @msg:	copy data to receive buffer
+ * @pipe:	copy data to pipe if set - indicates splice() call
+ *
+ * rcvbuf consumer: main API called by socket layer.
+ * Called under sk lock.
  */
-int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
-		   int flags)
+int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
+		   struct pipe_inode_info *pipe, size_t len, int flags)
 {
 	size_t copylen, read_done = 0, read_remaining = len;
 	size_t chunk_len, chunk_off, chunk_len_sum;
 	struct smc_connection *conn = &smc->conn;
+	int (*func)(struct smc_connection *conn);
 	union smc_host_cursor cons;
 	int readable, chunk;
 	char *rcvbuf_base;
 	struct sock *sk;
+	int splbytes;
 	long timeo;
 	int target;		/* Read at least these many bytes */
 	int rc;
@@ -102,12 +218,11 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
 
-	msg->msg_namelen = 0;
 	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
 	rcvbuf_base = conn->rmb_desc->cpu_addr;
 
 	do { /* while (read_remaining) */
-		if (read_done >= target)
+		if (read_done >= target || (pipe && read_done))
 			break;
 
 		if (atomic_read(&conn->bytes_to_rcv))
@@ -156,11 +271,24 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		/* initialize variables for 1st iteration of subsequent loop */
 		/* could be just 1 byte, even after waiting on data above */
 		readable = atomic_read(&conn->bytes_to_rcv);
+		splbytes = atomic_read(&conn->splice_pending);
+		if (!readable || (msg && splbytes)) {
+			if (splbytes)
+				func = smc_rx_data_available_and_no_splice_pend;
+			else
+				func = smc_rx_data_available;
+			smc_rx_wait(smc, &timeo, func);
+			continue;
+		}
+
 		/* not more than what user space asked for */
 		copylen = min_t(size_t, read_remaining, readable);
 		smc_curs_write(&cons,
 			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
 			       conn);
+		/* subsequent splice() calls pick up where previous left */
+		if (splbytes)
+			smc_curs_add(conn->rmbe_size, &cons, splbytes);
 		/* determine chunks where to read from rcvbuf */
 		/* either unwrapped case, or 1st chunk of wrapped case */
 		chunk_len = min_t(size_t,
@@ -170,9 +298,16 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 		smc_rmb_sync_sg_for_cpu(conn);
 		for (chunk = 0; chunk < 2; chunk++) {
 			if (!(flags & MSG_TRUNC)) {
-				rc = memcpy_to_msg(msg, rcvbuf_base + chunk_off,
-						   chunk_len);
-				if (rc) {
+				if (msg) {
+					rc = memcpy_to_msg(msg, rcvbuf_base +
+							   chunk_off,
+							   chunk_len);
+				} else {
+					rc = smc_rx_splice(pipe, rcvbuf_base +
+							chunk_off, chunk_len,
+							smc);
+				}
+				if (rc < 0) {
 					if (!read_done)
 						read_done = -EFAULT;
 					smc_rmb_sync_sg_for_device(conn);
@@ -193,18 +328,13 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 
 		/* update cursors */
 		if (!(flags & MSG_PEEK)) {
-			smc_curs_add(conn->rmbe_size, &cons, copylen);
 			/* increased in recv tasklet smc_cdc_msg_rcv() */
 			smp_mb__before_atomic();
 			atomic_sub(copylen, &conn->bytes_to_rcv);
 			/* guarantee 0 <= bytes_to_rcv <= rmbe_size */
 			smp_mb__after_atomic();
-			smc_curs_write(&conn->local_tx_ctrl.cons,
-				       smc_curs_read(&cons, conn),
-				       conn);
-			/* send consumer cursor update if required */
-			/* similar to advertising new TCP rcv_wnd if required */
-			smc_tx_consumer_update(conn);
+			if (msg)
+				smc_rx_update_consumer(conn, cons, copylen);
 		}
 	} while (read_remaining);
 out:
@@ -215,4 +345,5 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
 void smc_rx_init(struct smc_sock *smc)
 {
 	smc->sk.sk_data_ready = smc_rx_wake_up;
+	atomic_set(&smc->conn.splice_pending, 0);
 }
diff --git a/net/smc/smc_rx.h b/net/smc/smc_rx.h
index 8f9f00997641..db823c97d824 100644
--- a/net/smc/smc_rx.h
+++ b/net/smc/smc_rx.h
@@ -18,8 +18,9 @@
 #include "smc.h"
 
 void smc_rx_init(struct smc_sock *smc);
-int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
-		   int flags);
+
+int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
+		   struct pipe_inode_info *pipe, size_t len, int flags);
 int smc_rx_wait(struct smc_sock *smc, long *timeo,
 		int (*fcrit)(struct smc_connection *conn));
 static inline int smc_rx_data_available(struct smc_connection *conn)
@@ -27,5 +28,4 @@ static inline int smc_rx_data_available(struct smc_connection *conn)
 	return atomic_read(&conn->bytes_to_rcv);
 }
 
-
 #endif /* SMC_RX_H */
-- 
2.13.5

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* Re: [PATCH net-next 0/4] net/smc: splice implementation
  2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
                   ` (3 preceding siblings ...)
  2018-05-03 16:12 ` [PATCH net-next 4/4] smc: add support for splice() Ursula Braun
@ 2018-05-03 20:31 ` David Miller
  4 siblings, 0 replies; 6+ messages in thread
From: David Miller @ 2018-05-03 20:31 UTC (permalink / raw)
  To: ubraun; +Cc: netdev, linux-s390, schwidefsky, heiko.carstens, raspl

From: Ursula Braun <ubraun@linux.ibm.com>
Date: Thu,  3 May 2018 18:12:35 +0200

> From: Ursula Braun <ursula.braun@de.ibm.com>
> 
> Dave,
> 
> Stefan comes up with an smc implementation for splice(). The first
> three patches are preparational patches, the 4th patch implements
> splice().

Doesn't look too bad :)

Series applied, thanks.

^ permalink raw reply	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2018-05-03 20:31 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2018-05-03 16:12 [PATCH net-next 0/4] net/smc: splice implementation Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 1/4] smc: simplify abort logic Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 2/4] smc: make smc_rx_wait_data() generic Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 3/4] smc: allocate RMBs as compound pages Ursula Braun
2018-05-03 16:12 ` [PATCH net-next 4/4] smc: add support for splice() Ursula Braun
2018-05-03 20:31 ` [PATCH net-next 0/4] net/smc: splice implementation David Miller

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).