All of lore.kernel.org
 help / color / mirror / Atom feed
From: Sagi Grimberg <sagig@dev.mellanox.co.il>
To: Chuck Lever <chuck.lever@oracle.com>,
	linux-rdma@vger.kernel.org, linux-nfs@vger.kernel.org
Subject: Re: [PATCH v1 05/18] xprtrdma: Replace send and receive arrays
Date: Sun, 20 Sep 2015 13:52:30 +0300	[thread overview]
Message-ID: <55FE8FEE.1010006@dev.mellanox.co.il> (raw)
In-Reply-To: <20150917204452.19671.66113.stgit@manet.1015granger.net>

On 9/17/2015 11:44 PM, Chuck Lever wrote:
> The rb_send_bufs and rb_recv_bufs arrays are used to implement a
> pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
> structs. Replace those arrays with free lists.
>
> To allow more than 512 RPCs in-flight at once, each of these arrays
> would be larger than a page (assuming 8-byte addresses and 4KB
> pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
> buffer array would have to be 128 pages. That's an order-6
> allocation. (Not that we're going there.)
>
> A list is easier to expand dynamically. Instead of allocating a
> larger array of pointers and copying the existing pointers to the
> new array, simply append more buffers to each list.
>
> This also makes it simpler to manage receive buffers that might
> catch backwards-direction calls, or to post receive buffers in
> bulk to amortize the overhead of ib_post_recv.
>
> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>

Hi Chuck,

I get the idea of this patch, but it is a bit confusing (at least to
a reader who is not familiar with this code).

Can you explain why you sometimes call the put/get_locked routines
and sometimes open-code the list operations? And is it mandatory for
the callers to take the lock before calling get/put? Perhaps the code
would be simpler if the get/put routines took care of the locking
themselves, since rb_lock looks dedicated to them.

> ---
>   net/sunrpc/xprtrdma/verbs.c     |  141 +++++++++++++++++----------------------
>   net/sunrpc/xprtrdma/xprt_rdma.h |    9 +-
>   2 files changed, 66 insertions(+), 84 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index ac1345b..8d99214 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -962,44 +962,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>   {
>   	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
>   	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> -	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
> -	char *p;
> -	size_t len;
>   	int i, rc;
>
> -	buf->rb_max_requests = cdata->max_requests;
> +	buf->rb_max_requests = r_xprt->rx_data.max_requests;
>   	spin_lock_init(&buf->rb_lock);
>
> -	/* Need to allocate:
> -	 *   1.  arrays for send and recv pointers
> -	 *   2.  arrays of struct rpcrdma_req to fill in pointers
> -	 *   3.  array of struct rpcrdma_rep for replies
> -	 * Send/recv buffers in req/rep need to be registered
> -	 */
> -	len = buf->rb_max_requests *
> -		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
> -
> -	p = kzalloc(len, GFP_KERNEL);
> -	if (p == NULL) {
> -		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
> -			__func__, len);
> -		rc = -ENOMEM;
> -		goto out;
> -	}
> -	buf->rb_pool = p;	/* for freeing it later */
> -
> -	buf->rb_send_bufs = (struct rpcrdma_req **) p;
> -	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
> -	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
> -	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
> -
>   	rc = ia->ri_ops->ro_init(r_xprt);
>   	if (rc)
>   		goto out;
>
> +	INIT_LIST_HEAD(&buf->rb_send_bufs);
>   	for (i = 0; i < buf->rb_max_requests; i++) {
>   		struct rpcrdma_req *req;
> -		struct rpcrdma_rep *rep;
>
>   		req = rpcrdma_create_req(r_xprt);
>   		if (IS_ERR(req)) {
> @@ -1008,7 +982,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>   			rc = PTR_ERR(req);
>   			goto out;
>   		}
> -		buf->rb_send_bufs[i] = req;
> +		list_add(&req->rl_free, &buf->rb_send_bufs);
> +	}
> +
> +	INIT_LIST_HEAD(&buf->rb_recv_bufs);
> +	for (i = 0; i < buf->rb_max_requests + 2; i++) {
> +		struct rpcrdma_rep *rep;
>
>   		rep = rpcrdma_create_rep(r_xprt);
>   		if (IS_ERR(rep)) {
> @@ -1017,7 +996,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>   			rc = PTR_ERR(rep);
>   			goto out;
>   		}
> -		buf->rb_recv_bufs[i] = rep;
> +		list_add(&rep->rr_list, &buf->rb_recv_bufs);
>   	}
>
>   	return 0;
> @@ -1051,25 +1030,26 @@ void
>   rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
>   {
>   	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
> -	int i;
>
> -	/* clean up in reverse order from create
> -	 *   1.  recv mr memory (mr free, then kfree)
> -	 *   2.  send mr memory (mr free, then kfree)
> -	 *   3.  MWs
> -	 */
> -	dprintk("RPC:       %s: entering\n", __func__);
> +	while (!list_empty(&buf->rb_recv_bufs)) {
> +		struct rpcrdma_rep *rep = list_entry(buf->rb_recv_bufs.next,
> +						     struct rpcrdma_rep,
> +						     rr_list);
>
> -	for (i = 0; i < buf->rb_max_requests; i++) {
> -		if (buf->rb_recv_bufs)
> -			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
> -		if (buf->rb_send_bufs)
> -			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
> +		list_del(&rep->rr_list);
> +		rpcrdma_destroy_rep(ia, rep);
>   	}
>
> -	ia->ri_ops->ro_destroy(buf);
> +	while (!list_empty(&buf->rb_send_bufs)) {
> +		struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
> +						     struct rpcrdma_req,
> +						     rl_free);
>
> -	kfree(buf->rb_pool);
> +		list_del(&req->rl_free);
> +		rpcrdma_destroy_req(ia, req);
> +	}
> +
> +	ia->ri_ops->ro_destroy(buf);
>   }
>
>   struct rpcrdma_mw *
> @@ -1102,24 +1082,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
>   }
>
>   static void
> -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
> +rpcrdma_buffer_put_locked(struct rpcrdma_rep *rep, struct rpcrdma_buffer *buf)
>   {
> -	buf->rb_send_bufs[--buf->rb_send_index] = req;
> -	req->rl_niovs = 0;
> -	if (req->rl_reply) {
> -		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
> -		req->rl_reply = NULL;
> -	}
> +	list_add_tail(&rep->rr_list, &buf->rb_recv_bufs);
> +}
> +
> +static struct rpcrdma_rep *
> +rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
> +{
> +	struct rpcrdma_rep *rep;
> +
> +	rep = list_first_entry(&buf->rb_recv_bufs,
> +			       struct rpcrdma_rep, rr_list);
> +	list_del(&rep->rr_list);
> +
> +	return rep;
>   }

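There seems to be a distinction between send and recv buffers: only
the recv (rep) buffers get _locked get/put helpers, while the send
(req) list operations are open-coded in the callers. Would it make
sense to handle send and recv buffers symmetrically?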

>
>   /*
>    * Get a set of request/reply buffers.
>    *
> - * Reply buffer (if needed) is attached to send buffer upon return.
> - * Rule:
> - *    rb_send_index and rb_recv_index MUST always be pointing to the
> - *    *next* available buffer (non-NULL). They are incremented after
> - *    removing buffers, and decremented *before* returning them.
> + * Reply buffer (if available) is attached to send buffer upon return.
>    */
>   struct rpcrdma_req *
>   rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
> @@ -1129,25 +1112,22 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>
>   	spin_lock_irqsave(&buffers->rb_lock, flags);
>
> -	if (buffers->rb_send_index == buffers->rb_max_requests) {
> +	if (list_empty(&buffers->rb_send_bufs)) {
>   		spin_unlock_irqrestore(&buffers->rb_lock, flags);
> -		dprintk("RPC:       %s: out of request buffers\n", __func__);
> -		return ((struct rpcrdma_req *)NULL);
> -	}
> -
> -	req = buffers->rb_send_bufs[buffers->rb_send_index];
> -	if (buffers->rb_send_index < buffers->rb_recv_index) {
> -		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
> -			__func__,
> -			buffers->rb_recv_index - buffers->rb_send_index);
> -		req->rl_reply = NULL;
> -	} else {
> -		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
> -		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
> +		pr_warn("RPC:       %s: out of request buffers\n", __func__);
> +		return NULL;
>   	}
> -	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
> +	req = list_first_entry(&buffers->rb_send_bufs,
> +			       struct rpcrdma_req, rl_free);
> +	list_del(&req->rl_free);
>
> +	req->rl_reply = NULL;
> +	if (!list_empty(&buffers->rb_recv_bufs))
> +		req->rl_reply = rpcrdma_buffer_get_locked(buffers);

Would it make sense to move the !list_empty() check inside _get_locked
and have it return NULL when the list is empty, so that the callers
only need to handle a possible NULL return?

>   	spin_unlock_irqrestore(&buffers->rb_lock, flags);
> +
> +	if (!req->rl_reply)
> +		pr_warn("RPC:       %s: out of reply buffers\n", __func__);
>   	return req;
>   }
>
> @@ -1159,17 +1139,22 @@ void
>   rpcrdma_buffer_put(struct rpcrdma_req *req)
>   {
>   	struct rpcrdma_buffer *buffers = req->rl_buffer;
> +	struct rpcrdma_rep *rep = req->rl_reply;
>   	unsigned long flags;
>
> +	req->rl_niovs = 0;
> +	req->rl_reply = NULL;
> +
>   	spin_lock_irqsave(&buffers->rb_lock, flags);
> -	rpcrdma_buffer_put_sendbuf(req, buffers);
> +	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
> +	if (rep)
> +		rpcrdma_buffer_put_locked(rep, buffers);
>   	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>   }
>
>   /*
>    * Recover reply buffers from pool.
> - * This happens when recovering from error conditions.
> - * Post-increment counter/array index.
> + * This happens when recovering from disconnect.
>    */
>   void
>   rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
> @@ -1178,10 +1163,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
>   	unsigned long flags;
>
>   	spin_lock_irqsave(&buffers->rb_lock, flags);
> -	if (buffers->rb_recv_index < buffers->rb_max_requests) {
> -		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
> -		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
> -	}
> +	if (!list_empty(&buffers->rb_recv_bufs))
> +		req->rl_reply = rpcrdma_buffer_get_locked(buffers);
>   	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>   }
>
> @@ -1196,7 +1179,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
>   	unsigned long flags;
>
>   	spin_lock_irqsave(&buffers->rb_lock, flags);
> -	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
> +	rpcrdma_buffer_put_locked(rep, buffers);
>   	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>   }
>
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index a13508b..e6a358f 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */
>   #define RPCRDMA_MAX_IOVS	(2)
>
>   struct rpcrdma_req {
> +	struct list_head	rl_free;
>   	unsigned int		rl_niovs;
>   	unsigned int		rl_nchunks;
>   	unsigned int		rl_connect_cookie;
> @@ -285,12 +286,10 @@ struct rpcrdma_buffer {
>   	struct list_head	rb_all;
>   	char			*rb_pool;
>
> -	spinlock_t		rb_lock;	/* protect buf arrays */
> +	spinlock_t		rb_lock;	/* protect buf lists */
> +	struct list_head	rb_send_bufs;
> +	struct list_head	rb_recv_bufs;
>   	u32			rb_max_requests;
> -	int			rb_send_index;
> -	int			rb_recv_index;
> -	struct rpcrdma_req	**rb_send_bufs;
> -	struct rpcrdma_rep	**rb_recv_bufs;
>   };
>   #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
>
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>


WARNING: multiple messages have this Message-ID (diff)
From: Sagi Grimberg <sagig-LDSdmyG8hGV8YrgS2mwiifqBs+8SCbDb@public.gmane.org>
To: Chuck Lever <chuck.lever-QHcLZuEGTsvQT0dZR+AlfA@public.gmane.org>,
	linux-rdma-u79uwXL29TY76Z2rM5mHXA@public.gmane.org,
	linux-nfs-u79uwXL29TY76Z2rM5mHXA@public.gmane.org
Subject: Re: [PATCH v1 05/18] xprtrdma: Replace send and receive arrays
Date: Sun, 20 Sep 2015 13:52:30 +0300	[thread overview]
Message-ID: <55FE8FEE.1010006@dev.mellanox.co.il> (raw)
In-Reply-To: <20150917204452.19671.66113.stgit-FYjufvaPoItvLzlybtyyYzGyq/o6K9yX@public.gmane.org>

On 9/17/2015 11:44 PM, Chuck Lever wrote:
> The rb_send_bufs and rb_recv_bufs arrays are used to implement a
> pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
> structs. Replace those arrays with free lists.
>
> To allow more than 512 RPCs in-flight at once, each of these arrays
> would be larger than a page (assuming 8-byte addresses and 4KB
> pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
> buffer array would have to be 128 pages. That's an order-6
> allocation. (Not that we're going there.)
>
> A list is easier to expand dynamically. Instead of allocating a
> larger array of pointers and copying the existing pointers to the
> new array, simply append more buffers to each list.
>
> This also makes it simpler to manage receive buffers that might
> catch backwards-direction calls, or to post receive buffers in
> bulk to amortize the overhead of ib_post_recv.
>
> Signed-off-by: Chuck Lever <chuck.lever-QHcLZuEGTsvQT0dZR+AlfA@public.gmane.org>

Hi Chuck,

I get the idea of this patch, but it is a bit confusing (at least to
a reader who is not familiar with this code).

Can you explain why you sometimes call the put/get_locked routines
and sometimes open-code the list operations? And is it mandatory for
the callers to take the lock before calling get/put? Perhaps the code
would be simpler if the get/put routines took care of the locking
themselves, since rb_lock looks dedicated to them.

> ---
>   net/sunrpc/xprtrdma/verbs.c     |  141 +++++++++++++++++----------------------
>   net/sunrpc/xprtrdma/xprt_rdma.h |    9 +-
>   2 files changed, 66 insertions(+), 84 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index ac1345b..8d99214 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -962,44 +962,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>   {
>   	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
>   	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> -	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
> -	char *p;
> -	size_t len;
>   	int i, rc;
>
> -	buf->rb_max_requests = cdata->max_requests;
> +	buf->rb_max_requests = r_xprt->rx_data.max_requests;
>   	spin_lock_init(&buf->rb_lock);
>
> -	/* Need to allocate:
> -	 *   1.  arrays for send and recv pointers
> -	 *   2.  arrays of struct rpcrdma_req to fill in pointers
> -	 *   3.  array of struct rpcrdma_rep for replies
> -	 * Send/recv buffers in req/rep need to be registered
> -	 */
> -	len = buf->rb_max_requests *
> -		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
> -
> -	p = kzalloc(len, GFP_KERNEL);
> -	if (p == NULL) {
> -		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
> -			__func__, len);
> -		rc = -ENOMEM;
> -		goto out;
> -	}
> -	buf->rb_pool = p;	/* for freeing it later */
> -
> -	buf->rb_send_bufs = (struct rpcrdma_req **) p;
> -	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
> -	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
> -	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
> -
>   	rc = ia->ri_ops->ro_init(r_xprt);
>   	if (rc)
>   		goto out;
>
> +	INIT_LIST_HEAD(&buf->rb_send_bufs);
>   	for (i = 0; i < buf->rb_max_requests; i++) {
>   		struct rpcrdma_req *req;
> -		struct rpcrdma_rep *rep;
>
>   		req = rpcrdma_create_req(r_xprt);
>   		if (IS_ERR(req)) {
> @@ -1008,7 +982,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>   			rc = PTR_ERR(req);
>   			goto out;
>   		}
> -		buf->rb_send_bufs[i] = req;
> +		list_add(&req->rl_free, &buf->rb_send_bufs);
> +	}
> +
> +	INIT_LIST_HEAD(&buf->rb_recv_bufs);
> +	for (i = 0; i < buf->rb_max_requests + 2; i++) {
> +		struct rpcrdma_rep *rep;
>
>   		rep = rpcrdma_create_rep(r_xprt);
>   		if (IS_ERR(rep)) {
> @@ -1017,7 +996,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>   			rc = PTR_ERR(rep);
>   			goto out;
>   		}
> -		buf->rb_recv_bufs[i] = rep;
> +		list_add(&rep->rr_list, &buf->rb_recv_bufs);
>   	}
>
>   	return 0;
> @@ -1051,25 +1030,26 @@ void
>   rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
>   {
>   	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
> -	int i;
>
> -	/* clean up in reverse order from create
> -	 *   1.  recv mr memory (mr free, then kfree)
> -	 *   2.  send mr memory (mr free, then kfree)
> -	 *   3.  MWs
> -	 */
> -	dprintk("RPC:       %s: entering\n", __func__);
> +	while (!list_empty(&buf->rb_recv_bufs)) {
> +		struct rpcrdma_rep *rep = list_entry(buf->rb_recv_bufs.next,
> +						     struct rpcrdma_rep,
> +						     rr_list);
>
> -	for (i = 0; i < buf->rb_max_requests; i++) {
> -		if (buf->rb_recv_bufs)
> -			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
> -		if (buf->rb_send_bufs)
> -			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
> +		list_del(&rep->rr_list);
> +		rpcrdma_destroy_rep(ia, rep);
>   	}
>
> -	ia->ri_ops->ro_destroy(buf);
> +	while (!list_empty(&buf->rb_send_bufs)) {
> +		struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
> +						     struct rpcrdma_req,
> +						     rl_free);
>
> -	kfree(buf->rb_pool);
> +		list_del(&req->rl_free);
> +		rpcrdma_destroy_req(ia, req);
> +	}
> +
> +	ia->ri_ops->ro_destroy(buf);
>   }
>
>   struct rpcrdma_mw *
> @@ -1102,24 +1082,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
>   }
>
>   static void
> -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
> +rpcrdma_buffer_put_locked(struct rpcrdma_rep *rep, struct rpcrdma_buffer *buf)
>   {
> -	buf->rb_send_bufs[--buf->rb_send_index] = req;
> -	req->rl_niovs = 0;
> -	if (req->rl_reply) {
> -		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
> -		req->rl_reply = NULL;
> -	}
> +	list_add_tail(&rep->rr_list, &buf->rb_recv_bufs);
> +}
> +
> +static struct rpcrdma_rep *
> +rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
> +{
> +	struct rpcrdma_rep *rep;
> +
> +	rep = list_first_entry(&buf->rb_recv_bufs,
> +			       struct rpcrdma_rep, rr_list);
> +	list_del(&rep->rr_list);
> +
> +	return rep;
>   }

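There seems to be a distinction between send and recv buffers: only
the recv (rep) buffers get _locked get/put helpers, while the send
(req) list operations are open-coded in the callers. Would it make
sense to handle send and recv buffers symmetrically?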

>
>   /*
>    * Get a set of request/reply buffers.
>    *
> - * Reply buffer (if needed) is attached to send buffer upon return.
> - * Rule:
> - *    rb_send_index and rb_recv_index MUST always be pointing to the
> - *    *next* available buffer (non-NULL). They are incremented after
> - *    removing buffers, and decremented *before* returning them.
> + * Reply buffer (if available) is attached to send buffer upon return.
>    */
>   struct rpcrdma_req *
>   rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
> @@ -1129,25 +1112,22 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>
>   	spin_lock_irqsave(&buffers->rb_lock, flags);
>
> -	if (buffers->rb_send_index == buffers->rb_max_requests) {
> +	if (list_empty(&buffers->rb_send_bufs)) {
>   		spin_unlock_irqrestore(&buffers->rb_lock, flags);
> -		dprintk("RPC:       %s: out of request buffers\n", __func__);
> -		return ((struct rpcrdma_req *)NULL);
> -	}
> -
> -	req = buffers->rb_send_bufs[buffers->rb_send_index];
> -	if (buffers->rb_send_index < buffers->rb_recv_index) {
> -		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
> -			__func__,
> -			buffers->rb_recv_index - buffers->rb_send_index);
> -		req->rl_reply = NULL;
> -	} else {
> -		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
> -		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
> +		pr_warn("RPC:       %s: out of request buffers\n", __func__);
> +		return NULL;
>   	}
> -	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
> +	req = list_first_entry(&buffers->rb_send_bufs,
> +			       struct rpcrdma_req, rl_free);
> +	list_del(&req->rl_free);
>
> +	req->rl_reply = NULL;
> +	if (!list_empty(&buffers->rb_recv_bufs))
> +		req->rl_reply = rpcrdma_buffer_get_locked(buffers);

Would it make sense to move the !list_empty() check inside _get_locked
and have it return NULL when the list is empty, so that the callers
only need to handle a possible NULL return?

>   	spin_unlock_irqrestore(&buffers->rb_lock, flags);
> +
> +	if (!req->rl_reply)
> +		pr_warn("RPC:       %s: out of reply buffers\n", __func__);
>   	return req;
>   }
>
> @@ -1159,17 +1139,22 @@ void
>   rpcrdma_buffer_put(struct rpcrdma_req *req)
>   {
>   	struct rpcrdma_buffer *buffers = req->rl_buffer;
> +	struct rpcrdma_rep *rep = req->rl_reply;
>   	unsigned long flags;
>
> +	req->rl_niovs = 0;
> +	req->rl_reply = NULL;
> +
>   	spin_lock_irqsave(&buffers->rb_lock, flags);
> -	rpcrdma_buffer_put_sendbuf(req, buffers);
> +	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
> +	if (rep)
> +		rpcrdma_buffer_put_locked(rep, buffers);
>   	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>   }
>
>   /*
>    * Recover reply buffers from pool.
> - * This happens when recovering from error conditions.
> - * Post-increment counter/array index.
> + * This happens when recovering from disconnect.
>    */
>   void
>   rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
> @@ -1178,10 +1163,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
>   	unsigned long flags;
>
>   	spin_lock_irqsave(&buffers->rb_lock, flags);
> -	if (buffers->rb_recv_index < buffers->rb_max_requests) {
> -		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
> -		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
> -	}
> +	if (!list_empty(&buffers->rb_recv_bufs))
> +		req->rl_reply = rpcrdma_buffer_get_locked(buffers);
>   	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>   }
>
> @@ -1196,7 +1179,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
>   	unsigned long flags;
>
>   	spin_lock_irqsave(&buffers->rb_lock, flags);
> -	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
> +	rpcrdma_buffer_put_locked(rep, buffers);
>   	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>   }
>
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index a13508b..e6a358f 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */
>   #define RPCRDMA_MAX_IOVS	(2)
>
>   struct rpcrdma_req {
> +	struct list_head	rl_free;
>   	unsigned int		rl_niovs;
>   	unsigned int		rl_nchunks;
>   	unsigned int		rl_connect_cookie;
> @@ -285,12 +286,10 @@ struct rpcrdma_buffer {
>   	struct list_head	rb_all;
>   	char			*rb_pool;
>
> -	spinlock_t		rb_lock;	/* protect buf arrays */
> +	spinlock_t		rb_lock;	/* protect buf lists */
> +	struct list_head	rb_send_bufs;
> +	struct list_head	rb_recv_bufs;
>   	u32			rb_max_requests;
> -	int			rb_send_index;
> -	int			rb_recv_index;
> -	struct rpcrdma_req	**rb_send_bufs;
> -	struct rpcrdma_rep	**rb_recv_bufs;
>   };
>   #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
>
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
> the body of a message to majordomo-u79uwXL29TY76Z2rM5mHXA@public.gmane.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>

--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to majordomo-u79uwXL29TY76Z2rM5mHXA@public.gmane.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

  reply	other threads:[~2015-09-20 10:52 UTC|newest]

Thread overview: 84+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-09-17 20:44 [PATCH v1 00/18] RFC NFS/RDMA patches for merging into v4.4 Chuck Lever
2015-09-17 20:44 ` Chuck Lever
2015-09-17 20:44 ` [PATCH v1 01/18] xprtrdma: Enable swap-on-NFS/RDMA Chuck Lever
2015-09-17 20:44   ` Chuck Lever
2015-09-21  8:58   ` Devesh Sharma
2015-09-21  8:58     ` Devesh Sharma
2015-09-17 20:44 ` [PATCH v1 02/18] xprtrdma: Replace global lkey with lkey local to PD Chuck Lever
2015-09-17 20:44   ` Chuck Lever
2015-09-21  8:59   ` Devesh Sharma
2015-09-21  8:59     ` Devesh Sharma
2015-09-17 20:44 ` [PATCH v1 03/18] xprtrdma: Remove completion polling budgets Chuck Lever
2015-09-17 20:44   ` Chuck Lever
2015-09-18  6:52   ` Devesh Sharma
2015-09-18  6:52     ` Devesh Sharma
2015-09-18 14:19     ` Chuck Lever
2015-09-18 14:19       ` Chuck Lever
2015-09-20 10:35       ` Sagi Grimberg
2015-09-20 10:35         ` Sagi Grimberg
2015-09-21  8:51         ` Devesh Sharma
2015-09-21  8:51           ` Devesh Sharma
2015-09-21 15:45           ` Chuck Lever
2015-09-21 15:45             ` Chuck Lever
2015-09-22 17:32             ` Devesh Sharma
2015-09-22 17:32               ` Devesh Sharma
2015-10-01 16:37               ` Chuck Lever
2015-10-01 16:37                 ` Chuck Lever
2015-10-01 17:13                 ` Jason Gunthorpe
2015-10-01 17:13                   ` Jason Gunthorpe
2015-10-01 17:36                   ` Chuck Lever
2015-10-01 17:36                     ` Chuck Lever
2015-10-01 18:15                     ` Jason Gunthorpe
2015-10-01 18:15                       ` Jason Gunthorpe
2015-10-01 18:31                       ` Chuck Lever
2015-10-01 18:31                         ` Chuck Lever
2015-10-01 18:38                         ` Jason Gunthorpe
2015-10-01 18:38                           ` Jason Gunthorpe
2015-09-21  8:51       ` Devesh Sharma
2015-09-21  8:51         ` Devesh Sharma
2015-09-17 20:44 ` [PATCH v1 04/18] xprtrdma: Refactor reply handler error handling Chuck Lever
2015-09-17 20:44   ` Chuck Lever
2015-09-21  8:59   ` Devesh Sharma
2015-09-21  8:59     ` Devesh Sharma
2015-09-17 20:44 ` [PATCH v1 05/18] xprtrdma: Replace send and receive arrays Chuck Lever
2015-09-17 20:44   ` Chuck Lever
2015-09-20 10:52   ` Sagi Grimberg [this message]
2015-09-20 10:52     ` Sagi Grimberg
2015-09-21 23:04     ` Chuck Lever
2015-09-21 23:04       ` Chuck Lever
2015-09-17 20:45 ` [PATCH v1 06/18] SUNRPC: Abstract backchannel operations Chuck Lever
2015-09-17 20:45   ` Chuck Lever
2015-09-17 20:45 ` [PATCH v1 07/18] xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers Chuck Lever
2015-09-17 20:45   ` Chuck Lever
2015-09-21 10:28   ` Devesh Sharma
2015-09-21 10:28     ` Devesh Sharma
2015-09-17 20:45 ` [PATCH v1 08/18] xprtrdma: Pre-allocate Work Requests for backchannel Chuck Lever
2015-09-17 20:45   ` Chuck Lever
2015-09-21 10:33   ` Devesh Sharma
2015-09-21 10:33     ` Devesh Sharma
2015-09-21 22:41     ` Chuck Lever
2015-09-21 22:41       ` Chuck Lever
2015-09-17 20:45 ` [PATCH v1 09/18] xprtrdma: Add support for sending backward direction RPC replies Chuck Lever
2015-09-17 20:45   ` Chuck Lever
2015-09-17 20:45 ` [PATCH v1 10/18] xprtrdma: Handle incoming backward direction RPC calls Chuck Lever
2015-09-17 20:45   ` Chuck Lever
2015-09-17 20:45 ` [PATCH v1 11/18] svcrdma: Add backward direction service for RPC/RDMA transport Chuck Lever
2015-09-17 20:45   ` Chuck Lever
2015-09-17 20:45 ` [PATCH v1 12/18] SUNRPC: Remove the TCP-only restriction in bc_svc_process() Chuck Lever
2015-09-17 20:45   ` Chuck Lever
2015-09-17 20:45 ` [PATCH v1 13/18] NFS: Enable client side NFSv4.1 backchannel to use other transports Chuck Lever
2015-09-17 20:45   ` Chuck Lever
2015-09-17 20:46 ` [PATCH v1 14/18] svcrdma: Define maximum number of backchannel requests Chuck Lever
2015-09-17 20:46   ` Chuck Lever
2015-09-17 20:46 ` [PATCH v1 15/18] svcrdma: Add svc_rdma_get_context() API that is allowed to fail Chuck Lever
2015-09-17 20:46   ` Chuck Lever
2015-09-20 12:40   ` Sagi Grimberg
2015-09-20 12:40     ` Sagi Grimberg
2015-09-21 22:34     ` Chuck Lever
2015-09-21 22:34       ` Chuck Lever
2015-09-17 20:46 ` [PATCH v1 16/18] svcrdma: Add infrastructure to send backwards direction RPC/RDMA calls Chuck Lever
2015-09-17 20:46   ` Chuck Lever
2015-09-17 20:46 ` [PATCH v1 17/18] svcrdma: Add infrastructure to receive backwards direction RPC/RDMA replies Chuck Lever
2015-09-17 20:46   ` Chuck Lever
2015-09-17 20:46 ` [PATCH v1 18/18] xprtrdma: Add class for RDMA backwards direction transport Chuck Lever
2015-09-17 20:46   ` Chuck Lever

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=55FE8FEE.1010006@dev.mellanox.co.il \
    --to=sagig@dev.mellanox.co.il \
    --cc=chuck.lever@oracle.com \
    --cc=linux-nfs@vger.kernel.org \
    --cc=linux-rdma@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.