All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH for-next v2 0/2] Fix memory ordering errors in queues
@ 2021-05-26  4:51 Bob Pearson
  2021-05-26  4:51 ` [PATCH for-next v2 1/2] RDMA/rxe: Add a type flag to rxe_queue structs Bob Pearson
  2021-05-26  4:51 ` [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores Bob Pearson
  0 siblings, 2 replies; 6+ messages in thread
From: Bob Pearson @ 2021-05-26  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, linux-rdma; +Cc: Bob Pearson

These two patches optimize the memory ordering in rxe_queue.h so
that only user space indices, and not kernel space indices, are
protected: loads with smp_load_acquire() and stores with
smp_store_release(). The original implementation did not apply this
to all index references, which has recently caused test case errors
traced to stale memory loads. These patches fix those errors.

Reported-by: Zhu Yanjun <zyjzyj2000@gmail.com>
Fixes: d21a1240f516 ("RDMA/rxe: Use acquire/release for memory ordering")
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
v2:
  Added a way to distinguish between user and kernel indices.
---
Bob Pearson (2):
  RDMA/rxe: Add a type flag to rxe_queue structs
  RDMA/rxe: Protect user space index loads/stores

 drivers/infiniband/sw/rxe/rxe_cq.c    |   4 +-
 drivers/infiniband/sw/rxe/rxe_qp.c    |  12 +-
 drivers/infiniband/sw/rxe/rxe_queue.c |   8 +-
 drivers/infiniband/sw/rxe/rxe_queue.h | 181 ++++++++++++++++++--------
 drivers/infiniband/sw/rxe/rxe_srq.c   |   4 +-
 5 files changed, 145 insertions(+), 64 deletions(-)

-- 
2.30.2


^ permalink raw reply	[flat|nested] 6+ messages in thread

* [PATCH for-next v2 1/2] RDMA/rxe: Add a type flag to rxe_queue structs
  2021-05-26  4:51 [PATCH for-next v2 0/2] Fix memory ordering errors in queues Bob Pearson
@ 2021-05-26  4:51 ` Bob Pearson
  2021-05-26  4:51 ` [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores Bob Pearson
  1 sibling, 0 replies; 6+ messages in thread
From: Bob Pearson @ 2021-05-26  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, linux-rdma; +Cc: Bob Pearson

To create optimal code, we only want to use smp_load_acquire() and
smp_store_release() for user indices in the rxe_queue APIs, since
kernel indices are protected by locks which also act as memory
barriers. By adding a type to the queues we can determine which
indices need to be protected.

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
v2:
  This patch added in v2 to add the type field.
---
 drivers/infiniband/sw/rxe/rxe_cq.c    |  4 +++-
 drivers/infiniband/sw/rxe/rxe_qp.c    | 12 ++++++++----
 drivers/infiniband/sw/rxe/rxe_queue.c |  8 ++++----
 drivers/infiniband/sw/rxe/rxe_queue.h | 13 ++++++++++---
 drivers/infiniband/sw/rxe/rxe_srq.c   |  4 +++-
 5 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_cq.c b/drivers/infiniband/sw/rxe/rxe_cq.c
index b315ebf041ac..1d4d8a31bc12 100644
--- a/drivers/infiniband/sw/rxe/rxe_cq.c
+++ b/drivers/infiniband/sw/rxe/rxe_cq.c
@@ -59,9 +59,11 @@ int rxe_cq_from_init(struct rxe_dev *rxe, struct rxe_cq *cq, int cqe,
 		     struct rxe_create_cq_resp __user *uresp)
 {
 	int err;
+	enum queue_type type;
 
+	type = uresp ? QUEUE_TYPE_TO_USER : QUEUE_TYPE_KERNEL;
 	cq->queue = rxe_queue_init(rxe, &cqe,
-				   sizeof(struct rxe_cqe));
+			sizeof(struct rxe_cqe), type);
 	if (!cq->queue) {
 		pr_warn("unable to create cq\n");
 		return -ENOMEM;
diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
index 34ae957a315c..9bd6bf8f9bd9 100644
--- a/drivers/infiniband/sw/rxe/rxe_qp.c
+++ b/drivers/infiniband/sw/rxe/rxe_qp.c
@@ -206,6 +206,7 @@ static int rxe_qp_init_req(struct rxe_dev *rxe, struct rxe_qp *qp,
 {
 	int err;
 	int wqe_size;
+	enum queue_type type;
 
 	err = sock_create_kern(&init_net, AF_INET, SOCK_DGRAM, 0, &qp->sk);
 	if (err < 0)
@@ -231,7 +232,9 @@ static int rxe_qp_init_req(struct rxe_dev *rxe, struct rxe_qp *qp,
 	qp->sq.max_inline = init->cap.max_inline_data = wqe_size;
 	wqe_size += sizeof(struct rxe_send_wqe);
 
-	qp->sq.queue = rxe_queue_init(rxe, &qp->sq.max_wr, wqe_size);
+	type = uresp ? QUEUE_TYPE_FROM_USER : QUEUE_TYPE_KERNEL;
+	qp->sq.queue = rxe_queue_init(rxe, &qp->sq.max_wr,
+				wqe_size, type);
 	if (!qp->sq.queue)
 		return -ENOMEM;
 
@@ -273,6 +276,7 @@ static int rxe_qp_init_resp(struct rxe_dev *rxe, struct rxe_qp *qp,
 {
 	int err;
 	int wqe_size;
+	enum queue_type type;
 
 	if (!qp->srq) {
 		qp->rq.max_wr		= init->cap.max_recv_wr;
@@ -283,9 +287,9 @@ static int rxe_qp_init_resp(struct rxe_dev *rxe, struct rxe_qp *qp,
 		pr_debug("qp#%d max_wr = %d, max_sge = %d, wqe_size = %d\n",
 			 qp_num(qp), qp->rq.max_wr, qp->rq.max_sge, wqe_size);
 
-		qp->rq.queue = rxe_queue_init(rxe,
-					      &qp->rq.max_wr,
-					      wqe_size);
+		type = uresp ? QUEUE_TYPE_FROM_USER : QUEUE_TYPE_KERNEL;
+		qp->rq.queue = rxe_queue_init(rxe, &qp->rq.max_wr,
+					wqe_size, type);
 		if (!qp->rq.queue)
 			return -ENOMEM;
 
diff --git a/drivers/infiniband/sw/rxe/rxe_queue.c b/drivers/infiniband/sw/rxe/rxe_queue.c
index fa69241b1187..8f844d0b9e77 100644
--- a/drivers/infiniband/sw/rxe/rxe_queue.c
+++ b/drivers/infiniband/sw/rxe/rxe_queue.c
@@ -52,9 +52,8 @@ inline void rxe_queue_reset(struct rxe_queue *q)
 	memset(q->buf->data, 0, q->buf_size - sizeof(struct rxe_queue_buf));
 }
 
-struct rxe_queue *rxe_queue_init(struct rxe_dev *rxe,
-				 int *num_elem,
-				 unsigned int elem_size)
+struct rxe_queue *rxe_queue_init(struct rxe_dev *rxe, int *num_elem,
+			unsigned int elem_size, enum queue_type type)
 {
 	struct rxe_queue *q;
 	size_t buf_size;
@@ -69,6 +68,7 @@ struct rxe_queue *rxe_queue_init(struct rxe_dev *rxe,
 		goto err1;
 
 	q->rxe = rxe;
+	q->type = type;
 
 	/* used in resize, only need to copy used part of queue */
 	q->elem_size = elem_size;
@@ -136,7 +136,7 @@ int rxe_queue_resize(struct rxe_queue *q, unsigned int *num_elem_p,
 	int err;
 	unsigned long flags = 0, flags1;
 
-	new_q = rxe_queue_init(q->rxe, &num_elem, elem_size);
+	new_q = rxe_queue_init(q->rxe, &num_elem, elem_size, q->type);
 	if (!new_q)
 		return -ENOMEM;
 
diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
index 2902ca7b288c..4512745419f8 100644
--- a/drivers/infiniband/sw/rxe/rxe_queue.h
+++ b/drivers/infiniband/sw/rxe/rxe_queue.h
@@ -19,6 +19,13 @@
  * of the queue is one less than the number of element slots
  */
 
+/* type of queue */
+enum queue_type {
+	QUEUE_TYPE_KERNEL,
+	QUEUE_TYPE_TO_USER,
+	QUEUE_TYPE_FROM_USER,
+};
+
 struct rxe_queue {
 	struct rxe_dev		*rxe;
 	struct rxe_queue_buf	*buf;
@@ -27,6 +34,7 @@ struct rxe_queue {
 	size_t			elem_size;
 	unsigned int		log2_elem_size;
 	u32			index_mask;
+	enum queue_type		type;
 };
 
 int do_mmap_info(struct rxe_dev *rxe, struct mminfo __user *outbuf,
@@ -35,9 +43,8 @@ int do_mmap_info(struct rxe_dev *rxe, struct mminfo __user *outbuf,
 
 void rxe_queue_reset(struct rxe_queue *q);
 
-struct rxe_queue *rxe_queue_init(struct rxe_dev *rxe,
-				 int *num_elem,
-				 unsigned int elem_size);
+struct rxe_queue *rxe_queue_init(struct rxe_dev *rxe, int *num_elem,
+			unsigned int elem_size, enum queue_type type);
 
 int rxe_queue_resize(struct rxe_queue *q, unsigned int *num_elem_p,
 		     unsigned int elem_size, struct ib_udata *udata,
diff --git a/drivers/infiniband/sw/rxe/rxe_srq.c b/drivers/infiniband/sw/rxe/rxe_srq.c
index 41b0d1e11baf..52c5593741ec 100644
--- a/drivers/infiniband/sw/rxe/rxe_srq.c
+++ b/drivers/infiniband/sw/rxe/rxe_srq.c
@@ -78,6 +78,7 @@ int rxe_srq_from_init(struct rxe_dev *rxe, struct rxe_srq *srq,
 	int err;
 	int srq_wqe_size;
 	struct rxe_queue *q;
+	enum queue_type type;
 
 	srq->ibsrq.event_handler	= init->event_handler;
 	srq->ibsrq.srq_context		= init->srq_context;
@@ -91,8 +92,9 @@ int rxe_srq_from_init(struct rxe_dev *rxe, struct rxe_srq *srq,
 	spin_lock_init(&srq->rq.producer_lock);
 	spin_lock_init(&srq->rq.consumer_lock);
 
+	type = uresp ? QUEUE_TYPE_FROM_USER : QUEUE_TYPE_KERNEL;
 	q = rxe_queue_init(rxe, &srq->rq.max_wr,
-			   srq_wqe_size);
+			srq_wqe_size, type);
 	if (!q) {
 		pr_warn("unable to allocate queue for srq\n");
 		return -ENOMEM;
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores
  2021-05-26  4:51 [PATCH for-next v2 0/2] Fix memory ordering errors in queues Bob Pearson
  2021-05-26  4:51 ` [PATCH for-next v2 1/2] RDMA/rxe: Add a type flag to rxe_queue structs Bob Pearson
@ 2021-05-26  4:51 ` Bob Pearson
  2021-05-26 10:19   ` Zhu Yanjun
  2021-05-26 16:52   ` Jason Gunthorpe
  1 sibling, 2 replies; 6+ messages in thread
From: Bob Pearson @ 2021-05-26  4:51 UTC (permalink / raw)
  To: jgg, zyjzyj2000, linux-rdma; +Cc: Bob Pearson

Modify the queue APIs to protect all user space index loads
with smp_load_acquire() and all user space index stores with
smp_store_release(). Base this on the types of the queues which
can be one of ..KERNEL, ..FROM_USER, ..TO_USER. Kernel space
indices are protected by locks which also provide memory barriers.

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
v2:
  In v2 use queue type to selectively protect user space indices.
---
 drivers/infiniband/sw/rxe/rxe_queue.h | 168 ++++++++++++++++++--------
 1 file changed, 117 insertions(+), 51 deletions(-)

diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
index 4512745419f8..6e705e09d357 100644
--- a/drivers/infiniband/sw/rxe/rxe_queue.h
+++ b/drivers/infiniband/sw/rxe/rxe_queue.h
@@ -66,12 +66,22 @@ static inline int queue_empty(struct rxe_queue *q)
 	u32 prod;
 	u32 cons;
 
-	/* make sure all changes to queue complete before
-	 * testing queue empty
-	 */
-	prod = smp_load_acquire(&q->buf->producer_index);
-	/* same */
-	cons = smp_load_acquire(&q->buf->consumer_index);
+	switch (q->type) {
+	case QUEUE_TYPE_FROM_USER:
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+		cons = q->buf->consumer_index;
+		break;
+	case QUEUE_TYPE_TO_USER:
+		prod = q->buf->producer_index;
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+		break;
+	case QUEUE_TYPE_KERNEL:
+		prod = q->buf->producer_index;
+		cons = q->buf->consumer_index;
+		break;
+	}
 
 	return ((prod - cons) & q->index_mask) == 0;
 }
@@ -81,95 +91,151 @@ static inline int queue_full(struct rxe_queue *q)
 	u32 prod;
 	u32 cons;
 
-	/* make sure all changes to queue complete before
-	 * testing queue full
-	 */
-	prod = smp_load_acquire(&q->buf->producer_index);
-	/* same */
-	cons = smp_load_acquire(&q->buf->consumer_index);
+	switch (q->type) {
+	case QUEUE_TYPE_FROM_USER:
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+		cons = q->buf->consumer_index;
+		break;
+	case QUEUE_TYPE_TO_USER:
+		prod = q->buf->producer_index;
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+		break;
+	case QUEUE_TYPE_KERNEL:
+		prod = q->buf->producer_index;
+		cons = q->buf->consumer_index;
+		break;
+	}
 
 	return ((prod + 1 - cons) & q->index_mask) == 0;
 }
 
-static inline void advance_producer(struct rxe_queue *q)
+static inline unsigned int queue_count(const struct rxe_queue *q)
 {
 	u32 prod;
+	u32 cons;
 
-	prod = (q->buf->producer_index + 1) & q->index_mask;
+	switch (q->type) {
+	case QUEUE_TYPE_FROM_USER:
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+		cons = q->buf->consumer_index;
+		break;
+	case QUEUE_TYPE_TO_USER:
+		prod = q->buf->producer_index;
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+		break;
+	case QUEUE_TYPE_KERNEL:
+		prod = q->buf->producer_index;
+		cons = q->buf->consumer_index;
+		break;
+	}
+
+	return (prod - cons) & q->index_mask;
+}
+
+static inline void advance_producer(struct rxe_queue *q)
+{
+	u32 prod;
 
-	/* make sure all changes to queue complete before
-	 * changing producer index
-	 */
-	smp_store_release(&q->buf->producer_index, prod);
+	if (q->type == QUEUE_TYPE_FROM_USER) {
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+		prod = (prod + 1) & q->index_mask;
+		/* same */
+		smp_store_release(&q->buf->producer_index, prod);
+	} else {
+		prod = q->buf->producer_index;
+		q->buf->producer_index = (prod + 1) & q->index_mask;
+	}
 }
 
 static inline void advance_consumer(struct rxe_queue *q)
 {
 	u32 cons;
 
-	cons = (q->buf->consumer_index + 1) & q->index_mask;
-
-	/* make sure all changes to queue complete before
-	 * changing consumer index
-	 */
-	smp_store_release(&q->buf->consumer_index, cons);
+	if (q->type == QUEUE_TYPE_TO_USER) {
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+		cons = (cons + 1) & q->index_mask;
+		/* same */
+		smp_store_release(&q->buf->consumer_index, cons);
+	} else {
+		cons = q->buf->consumer_index;
+		q->buf->consumer_index = (cons + 1) & q->index_mask;
+	}
 }
 
 static inline void *producer_addr(struct rxe_queue *q)
 {
-	return q->buf->data + ((q->buf->producer_index & q->index_mask)
-				<< q->log2_elem_size);
+	u32 prod;
+
+	if (q->type == QUEUE_TYPE_FROM_USER)
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+	else
+		prod = q->buf->producer_index;
+
+	return q->buf->data + ((prod & q->index_mask) << q->log2_elem_size);
 }
 
 static inline void *consumer_addr(struct rxe_queue *q)
 {
-	return q->buf->data + ((q->buf->consumer_index & q->index_mask)
-				<< q->log2_elem_size);
+	u32 cons;
+
+	if (q->type == QUEUE_TYPE_TO_USER)
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+	else
+		cons = q->buf->consumer_index;
+
+	return q->buf->data + ((cons & q->index_mask) << q->log2_elem_size);
 }
 
 static inline unsigned int producer_index(struct rxe_queue *q)
 {
-	u32 index;
+	u32 prod;
+
+	if (q->type == QUEUE_TYPE_FROM_USER)
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+	else
+		prod = q->buf->producer_index;
 
-	/* make sure all changes to queue
-	 * complete before getting producer index
-	 */
-	index = smp_load_acquire(&q->buf->producer_index);
-	index &= q->index_mask;
+	prod &= q->index_mask;
 
-	return index;
+	return prod;
 }
 
 static inline unsigned int consumer_index(struct rxe_queue *q)
 {
-	u32 index;
+	u32 cons;
 
-	/* make sure all changes to queue
-	 * complete before getting consumer index
-	 */
-	index = smp_load_acquire(&q->buf->consumer_index);
-	index &= q->index_mask;
+	if (q->type == QUEUE_TYPE_TO_USER)
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+	else
+		cons = q->buf->consumer_index;
 
-	return index;
+	cons &= q->index_mask;
+
+	return cons;
 }
 
-static inline void *addr_from_index(struct rxe_queue *q, unsigned int index)
+static inline void *addr_from_index(struct rxe_queue *q,
+				unsigned int index)
 {
 	return q->buf->data + ((index & q->index_mask)
 				<< q->buf->log2_elem_size);
 }
 
 static inline unsigned int index_from_addr(const struct rxe_queue *q,
-					   const void *addr)
+				const void *addr)
 {
 	return (((u8 *)addr - q->buf->data) >> q->log2_elem_size)
-		& q->index_mask;
-}
-
-static inline unsigned int queue_count(const struct rxe_queue *q)
-{
-	return (q->buf->producer_index - q->buf->consumer_index)
-		& q->index_mask;
+				& q->index_mask;
 }
 
 static inline void *queue_head(struct rxe_queue *q)
-- 
2.30.2


^ permalink raw reply related	[flat|nested] 6+ messages in thread

* Re: [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores
  2021-05-26  4:51 ` [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores Bob Pearson
@ 2021-05-26 10:19   ` Zhu Yanjun
  2021-05-26 15:23     ` Pearson, Robert B
  2021-05-26 16:52   ` Jason Gunthorpe
  1 sibling, 1 reply; 6+ messages in thread
From: Zhu Yanjun @ 2021-05-26 10:19 UTC (permalink / raw)
  To: Bob Pearson; +Cc: Jason Gunthorpe, RDMA mailing list

On Wed, May 26, 2021 at 12:52 PM Bob Pearson <rpearsonhpe@gmail.com> wrote:
>
> Modify the queue APIs to protect all user space index loads
> with smp_load_acquire() and all user space index stores with
> smp_store_release(). Base this on the types of the queues which
> can be one of ..KERNEL, ..FROM_USER, ..TO_USER. Kernel space
> indices are protected by locks which also provide memory barriers.
>
> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
> ---
> v2:
>   In v2 use queue type to selectively protect user space indices.
> ---
>  drivers/infiniband/sw/rxe/rxe_queue.h | 168 ++++++++++++++++++--------
>  1 file changed, 117 insertions(+), 51 deletions(-)
>
> diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
> index 4512745419f8..6e705e09d357 100644
> --- a/drivers/infiniband/sw/rxe/rxe_queue.h
> +++ b/drivers/infiniband/sw/rxe/rxe_queue.h
> @@ -66,12 +66,22 @@ static inline int queue_empty(struct rxe_queue *q)
>         u32 prod;
>         u32 cons;
>
> -       /* make sure all changes to queue complete before
> -        * testing queue empty
> -        */
> -       prod = smp_load_acquire(&q->buf->producer_index);
> -       /* same */
> -       cons = smp_load_acquire(&q->buf->consumer_index);
> +       switch (q->type) {
> +       case QUEUE_TYPE_FROM_USER:
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +               cons = q->buf->consumer_index;
> +               break;
> +       case QUEUE_TYPE_TO_USER:
> +               prod = q->buf->producer_index;
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +               break;
> +       case QUEUE_TYPE_KERNEL:
> +               prod = q->buf->producer_index;
> +               cons = q->buf->consumer_index;
> +               break;
> +       }
>
>         return ((prod - cons) & q->index_mask) == 0;
>  }
> @@ -81,95 +91,151 @@ static inline int queue_full(struct rxe_queue *q)
>         u32 prod;
>         u32 cons;
>
> -       /* make sure all changes to queue complete before
> -        * testing queue full
> -        */
> -       prod = smp_load_acquire(&q->buf->producer_index);
> -       /* same */
> -       cons = smp_load_acquire(&q->buf->consumer_index);
> +       switch (q->type) {
> +       case QUEUE_TYPE_FROM_USER:
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +               cons = q->buf->consumer_index;
> +               break;
> +       case QUEUE_TYPE_TO_USER:
> +               prod = q->buf->producer_index;
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +               break;
> +       case QUEUE_TYPE_KERNEL:
> +               prod = q->buf->producer_index;
> +               cons = q->buf->consumer_index;
> +               break;
> +       }
>
>         return ((prod + 1 - cons) & q->index_mask) == 0;
>  }
>
> -static inline void advance_producer(struct rxe_queue *q)
> +static inline unsigned int queue_count(const struct rxe_queue *q)
>  {
>         u32 prod;
> +       u32 cons;
>
> -       prod = (q->buf->producer_index + 1) & q->index_mask;
> +       switch (q->type) {
> +       case QUEUE_TYPE_FROM_USER:
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +               cons = q->buf->consumer_index;
> +               break;
> +       case QUEUE_TYPE_TO_USER:
> +               prod = q->buf->producer_index;
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +               break;
> +       case QUEUE_TYPE_KERNEL:
> +               prod = q->buf->producer_index;
> +               cons = q->buf->consumer_index;
> +               break;
> +       }

The above source code appears in the functions queue_count, queue_full
and queue_empty.
So is it possible to use a separate function to implement it?

Thanks
Zhu Yanjun

> +
> +       return (prod - cons) & q->index_mask;
> +}
> +
> +static inline void advance_producer(struct rxe_queue *q)
> +{
> +       u32 prod;
>
> -       /* make sure all changes to queue complete before
> -        * changing producer index
> -        */
> -       smp_store_release(&q->buf->producer_index, prod);
> +       if (q->type == QUEUE_TYPE_FROM_USER) {
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +               prod = (prod + 1) & q->index_mask;
> +               /* same */
> +               smp_store_release(&q->buf->producer_index, prod);
> +       } else {
> +               prod = q->buf->producer_index;
> +               q->buf->producer_index = (prod + 1) & q->index_mask;
> +       }
>  }
>
>  static inline void advance_consumer(struct rxe_queue *q)
>  {
>         u32 cons;
>
> -       cons = (q->buf->consumer_index + 1) & q->index_mask;
> -
> -       /* make sure all changes to queue complete before
> -        * changing consumer index
> -        */
> -       smp_store_release(&q->buf->consumer_index, cons);
> +       if (q->type == QUEUE_TYPE_TO_USER) {
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +               cons = (cons + 1) & q->index_mask;
> +               /* same */
> +               smp_store_release(&q->buf->consumer_index, cons);
> +       } else {
> +               cons = q->buf->consumer_index;
> +               q->buf->consumer_index = (cons + 1) & q->index_mask;
> +       }
>  }
>
>  static inline void *producer_addr(struct rxe_queue *q)
>  {
> -       return q->buf->data + ((q->buf->producer_index & q->index_mask)
> -                               << q->log2_elem_size);
> +       u32 prod;
> +
> +       if (q->type == QUEUE_TYPE_FROM_USER)
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +       else
> +               prod = q->buf->producer_index;
> +
> +       return q->buf->data + ((prod & q->index_mask) << q->log2_elem_size);
>  }
>
>  static inline void *consumer_addr(struct rxe_queue *q)
>  {
> -       return q->buf->data + ((q->buf->consumer_index & q->index_mask)
> -                               << q->log2_elem_size);
> +       u32 cons;
> +
> +       if (q->type == QUEUE_TYPE_TO_USER)
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +       else
> +               cons = q->buf->consumer_index;
> +
> +       return q->buf->data + ((cons & q->index_mask) << q->log2_elem_size);
>  }
>
>  static inline unsigned int producer_index(struct rxe_queue *q)
>  {
> -       u32 index;
> +       u32 prod;
> +
> +       if (q->type == QUEUE_TYPE_FROM_USER)
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +       else
> +               prod = q->buf->producer_index;
>
> -       /* make sure all changes to queue
> -        * complete before getting producer index
> -        */
> -       index = smp_load_acquire(&q->buf->producer_index);
> -       index &= q->index_mask;
> +       prod &= q->index_mask;
>
> -       return index;
> +       return prod;
>  }
>
>  static inline unsigned int consumer_index(struct rxe_queue *q)
>  {
> -       u32 index;
> +       u32 cons;
>
> -       /* make sure all changes to queue
> -        * complete before getting consumer index
> -        */
> -       index = smp_load_acquire(&q->buf->consumer_index);
> -       index &= q->index_mask;
> +       if (q->type == QUEUE_TYPE_TO_USER)
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +       else
> +               cons = q->buf->consumer_index;
>
> -       return index;
> +       cons &= q->index_mask;
> +
> +       return cons;
>  }
>
> -static inline void *addr_from_index(struct rxe_queue *q, unsigned int index)
> +static inline void *addr_from_index(struct rxe_queue *q,
> +                               unsigned int index)
>  {
>         return q->buf->data + ((index & q->index_mask)
>                                 << q->buf->log2_elem_size);
>  }
>
>  static inline unsigned int index_from_addr(const struct rxe_queue *q,
> -                                          const void *addr)
> +                               const void *addr)
>  {
>         return (((u8 *)addr - q->buf->data) >> q->log2_elem_size)
> -               & q->index_mask;
> -}
> -
> -static inline unsigned int queue_count(const struct rxe_queue *q)
> -{
> -       return (q->buf->producer_index - q->buf->consumer_index)
> -               & q->index_mask;
> +                               & q->index_mask;
>  }
>
>  static inline void *queue_head(struct rxe_queue *q)
> --
> 2.30.2
>

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores
  2021-05-26 10:19   ` Zhu Yanjun
@ 2021-05-26 15:23     ` Pearson, Robert B
  0 siblings, 0 replies; 6+ messages in thread
From: Pearson, Robert B @ 2021-05-26 15:23 UTC (permalink / raw)
  To: Zhu Yanjun; +Cc: Jason Gunthorpe, RDMA mailing list


On 5/26/2021 5:19 AM, Zhu Yanjun wrote:
> On Wed, May 26, 2021 at 12:52 PM Bob Pearson <rpearsonhpe@gmail.com> wrote:
>> Modify the queue APIs to protect all user space index loads
>> with smp_load_acquire() and all user space index stores with
>> smp_store_release(). Base this on the types of the queues which
>> can be one of ..KERNEL, ..FROM_USER, ..TO_USER. Kernel space
>> indices are protected by locks which also provide memory barriers.
>>
>> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
>> ---
>> v2:
>>    In v2 use queue type to selectively protect user space indices.
>> ---
>>   drivers/infiniband/sw/rxe/rxe_queue.h | 168 ++++++++++++++++++--------
>>   1 file changed, 117 insertions(+), 51 deletions(-)
>>
>> diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
>> index 4512745419f8..6e705e09d357 100644
>> --- a/drivers/infiniband/sw/rxe/rxe_queue.h
>> +++ b/drivers/infiniband/sw/rxe/rxe_queue.h
>> @@ -66,12 +66,22 @@ static inline int queue_empty(struct rxe_queue *q)
>>          u32 prod;
>>          u32 cons;
>>
>> -       /* make sure all changes to queue complete before
>> -        * testing queue empty
>> -        */
>> -       prod = smp_load_acquire(&q->buf->producer_index);
>> -       /* same */
>> -       cons = smp_load_acquire(&q->buf->consumer_index);
>> +       switch (q->type) {
>> +       case QUEUE_TYPE_FROM_USER:
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       case QUEUE_TYPE_TO_USER:
>> +               prod = q->buf->producer_index;
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +               break;
>> +       case QUEUE_TYPE_KERNEL:
>> +               prod = q->buf->producer_index;
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       }
>>
>>          return ((prod - cons) & q->index_mask) == 0;
>>   }
>> @@ -81,95 +91,151 @@ static inline int queue_full(struct rxe_queue *q)
>>          u32 prod;
>>          u32 cons;
>>
>> -       /* make sure all changes to queue complete before
>> -        * testing queue full
>> -        */
>> -       prod = smp_load_acquire(&q->buf->producer_index);
>> -       /* same */
>> -       cons = smp_load_acquire(&q->buf->consumer_index);
>> +       switch (q->type) {
>> +       case QUEUE_TYPE_FROM_USER:
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       case QUEUE_TYPE_TO_USER:
>> +               prod = q->buf->producer_index;
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +               break;
>> +       case QUEUE_TYPE_KERNEL:
>> +               prod = q->buf->producer_index;
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       }
>>
>>          return ((prod + 1 - cons) & q->index_mask) == 0;
>>   }
>>
>> -static inline void advance_producer(struct rxe_queue *q)
>> +static inline unsigned int queue_count(const struct rxe_queue *q)
>>   {
>>          u32 prod;
>> +       u32 cons;
>>
>> -       prod = (q->buf->producer_index + 1) & q->index_mask;
>> +       switch (q->type) {
>> +       case QUEUE_TYPE_FROM_USER:
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       case QUEUE_TYPE_TO_USER:
>> +               prod = q->buf->producer_index;
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +               break;
>> +       case QUEUE_TYPE_KERNEL:
>> +               prod = q->buf->producer_index;
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       }
> The above source code appears in the functions queue_count, queue_full
> and queue_empty.
> So is it possible to use a separate function to implement it?
>
> Thanks
> Zhu Yanjun

Maybe, but I was trying to keep this inline. It won't save anything at 
runtime.

Bob

>
>> +
>> +       return (prod - cons) & q->index_mask;
>> +}
>> +
>> +static inline void advance_producer(struct rxe_queue *q)
>> +{
>> +       u32 prod;
>>
>> -       /* make sure all changes to queue complete before
>> -        * changing producer index
>> -        */
>> -       smp_store_release(&q->buf->producer_index, prod);
>> +       if (q->type == QUEUE_TYPE_FROM_USER) {
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +               prod = (prod + 1) & q->index_mask;
>> +               /* same */
>> +               smp_store_release(&q->buf->producer_index, prod);
>> +       } else {
>> +               prod = q->buf->producer_index;
>> +               q->buf->producer_index = (prod + 1) & q->index_mask;
>> +       }
>>   }
>>
>>   static inline void advance_consumer(struct rxe_queue *q)
>>   {
>>          u32 cons;
>>
>> -       cons = (q->buf->consumer_index + 1) & q->index_mask;
>> -
>> -       /* make sure all changes to queue complete before
>> -        * changing consumer index
>> -        */
>> -       smp_store_release(&q->buf->consumer_index, cons);
>> +       if (q->type == QUEUE_TYPE_TO_USER) {
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +               cons = (cons + 1) & q->index_mask;
>> +               /* same */
>> +               smp_store_release(&q->buf->consumer_index, cons);
>> +       } else {
>> +               cons = q->buf->consumer_index;
>> +               q->buf->consumer_index = (cons + 1) & q->index_mask;
>> +       }
>>   }
>>
>>   static inline void *producer_addr(struct rxe_queue *q)
>>   {
>> -       return q->buf->data + ((q->buf->producer_index & q->index_mask)
>> -                               << q->log2_elem_size);
>> +       u32 prod;
>> +
>> +       if (q->type == QUEUE_TYPE_FROM_USER)
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +       else
>> +               prod = q->buf->producer_index;
>> +
>> +       return q->buf->data + ((prod & q->index_mask) << q->log2_elem_size);
>>   }
>>
>>   static inline void *consumer_addr(struct rxe_queue *q)
>>   {
>> -       return q->buf->data + ((q->buf->consumer_index & q->index_mask)
>> -                               << q->log2_elem_size);
>> +       u32 cons;
>> +
>> +       if (q->type == QUEUE_TYPE_TO_USER)
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +       else
>> +               cons = q->buf->consumer_index;
>> +
>> +       return q->buf->data + ((cons & q->index_mask) << q->log2_elem_size);
>>   }
>>
>>   static inline unsigned int producer_index(struct rxe_queue *q)
>>   {
>> -       u32 index;
>> +       u32 prod;
>> +
>> +       if (q->type == QUEUE_TYPE_FROM_USER)
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +       else
>> +               prod = q->buf->producer_index;
>>
>> -       /* make sure all changes to queue
>> -        * complete before getting producer index
>> -        */
>> -       index = smp_load_acquire(&q->buf->producer_index);
>> -       index &= q->index_mask;
>> +       prod &= q->index_mask;
>>
>> -       return index;
>> +       return prod;
>>   }
>>
>>   static inline unsigned int consumer_index(struct rxe_queue *q)
>>   {
>> -       u32 index;
>> +       u32 cons;
>>
>> -       /* make sure all changes to queue
>> -        * complete before getting consumer index
>> -        */
>> -       index = smp_load_acquire(&q->buf->consumer_index);
>> -       index &= q->index_mask;
>> +       if (q->type == QUEUE_TYPE_TO_USER)
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +       else
>> +               cons = q->buf->consumer_index;
>>
>> -       return index;
>> +       cons &= q->index_mask;
>> +
>> +       return cons;
>>   }
>>
>> -static inline void *addr_from_index(struct rxe_queue *q, unsigned int index)
>> +static inline void *addr_from_index(struct rxe_queue *q,
>> +                               unsigned int index)
>>   {
>>          return q->buf->data + ((index & q->index_mask)
>>                                  << q->buf->log2_elem_size);
>>   }
>>
>>   static inline unsigned int index_from_addr(const struct rxe_queue *q,
>> -                                          const void *addr)
>> +                               const void *addr)
>>   {
>>          return (((u8 *)addr - q->buf->data) >> q->log2_elem_size)
>> -               & q->index_mask;
>> -}
>> -
>> -static inline unsigned int queue_count(const struct rxe_queue *q)
>> -{
>> -       return (q->buf->producer_index - q->buf->consumer_index)
>> -               & q->index_mask;
>> +                               & q->index_mask;
>>   }
>>
>>   static inline void *queue_head(struct rxe_queue *q)
>> --
>> 2.30.2
>>

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores
  2021-05-26  4:51 ` [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores Bob Pearson
  2021-05-26 10:19   ` Zhu Yanjun
@ 2021-05-26 16:52   ` Jason Gunthorpe
  1 sibling, 0 replies; 6+ messages in thread
From: Jason Gunthorpe @ 2021-05-26 16:52 UTC (permalink / raw)
  To: Bob Pearson; +Cc: zyjzyj2000, linux-rdma

On Tue, May 25, 2021 at 11:51:40PM -0500, Bob Pearson wrote:
> Modify the queue APIs to protect all user space index loads
> with smp_load_acquire() and all user space index stores with
> smp_store_release(). Base this on the types of the queues which
> can be one of ..KERNEL, ..FROM_USER, ..TO_USER. Kernel space
> indices are protected by locks which also provide memory barriers.
> 
> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
> v2:
>   In v2 use queue type to selectively protect user space indices.
>  drivers/infiniband/sw/rxe/rxe_queue.h | 168 ++++++++++++++++++--------
>  1 file changed, 117 insertions(+), 51 deletions(-)
> 
> diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
> index 4512745419f8..6e705e09d357 100644
> +++ b/drivers/infiniband/sw/rxe/rxe_queue.h
> @@ -66,12 +66,22 @@ static inline int queue_empty(struct rxe_queue *q)
>  	u32 prod;
>  	u32 cons;
>  
> -	/* make sure all changes to queue complete before
> -	 * testing queue empty
> -	 */
> -	prod = smp_load_acquire(&q->buf->producer_index);
> -	/* same */
> -	cons = smp_load_acquire(&q->buf->consumer_index);
> +	switch (q->type) {
> +	case QUEUE_TYPE_FROM_USER:
> +		/* protect user space index */
> +		prod = smp_load_acquire(&q->buf->producer_index);
> +		cons = q->buf->consumer_index;

The other issue is you can't store the kernel owned consumer_index in
the 'buf'

It should be stored in 'q' and only copied to 'buf' on write.

Kernel never reads the user memory it writes to

This is why splitting it makes sense because it really needs to be
reading different memory, not just using the correct load primitive

Jason

^ permalink raw reply	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2021-05-26 16:52 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-05-26  4:51 [PATCH for-next v2 0/2] Fix memory ordering errors in queues Bob Pearson
2021-05-26  4:51 ` [PATCH for-next v2 1/2] RDMA/rxe: Add a type flag to rxe_queue structs Bob Pearson
2021-05-26  4:51 ` [PATCH for-next v2 2/2] RDMA/rxe: Protect user space index loads/stores Bob Pearson
2021-05-26 10:19   ` Zhu Yanjun
2021-05-26 15:23     ` Pearson, Robert B
2021-05-26 16:52   ` Jason Gunthorpe

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.