From mboxrd@z Thu Jan 1 00:00:00 1970 From: "Michael S. Tsirkin" Subject: [PATCH net-next 01/12] ptr_ring: keep consumer_head valid at all times Date: Fri, 26 Jan 2018 01:36:27 +0200 Message-ID: <1516923320-16959-2-git-send-email-mst@redhat.com> References: <1516923320-16959-1-git-send-email-mst@redhat.com> Mime-Version: 1.0 Content-Type: text/plain; charset=us-ascii Cc: netdev@vger.kernel.org, Jason Wang , John Fastabend , David Miller To: linux-kernel@vger.kernel.org Return-path: Content-Disposition: inline In-Reply-To: <1516923320-16959-1-git-send-email-mst@redhat.com> Sender: linux-kernel-owner@vger.kernel.org List-Id: netdev.vger.kernel.org The comment near __ptr_ring_peek says: * If ring is never resized, and if the pointer is merely * tested, there's no need to take the lock - see e.g. __ptr_ring_empty. but this was in fact never possible since consumer_head would sometimes point outside the ring. Refactor the code so that it's always pointing within a ring. Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array") Signed-off-by: Michael S. Tsirkin --- include/linux/ptr_ring.h | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h index 9ca1726..5ebcdd4 100644 --- a/include/linux/ptr_ring.h +++ b/include/linux/ptr_ring.h @@ -248,22 +248,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r) /* Fundamentally, what we want to do is update consumer * index and zero out the entry so producer can reuse it. * Doing it naively at each consume would be as simple as: - * r->queue[r->consumer++] = NULL; - * if (unlikely(r->consumer >= r->size)) - * r->consumer = 0; + * consumer = r->consumer; + * r->queue[consumer++] = NULL; + * if (unlikely(consumer >= r->size)) + * consumer = 0; + * r->consumer = consumer; * but that is suboptimal when the ring is full as producer is writing * out new entries in the same cache line. Defer these updates until a * batch of entries has been consumed. */ - int head = r->consumer_head++; + /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty + * to work correctly. + */ + int consumer_head = r->consumer_head; + int head = consumer_head++; /* Once we have processed enough entries invalidate them in * the ring all at once so producer can reuse their space in the ring. * We also do this when we reach end of the ring - not mandatory * but helps keep the implementation simple. */ - if (unlikely(r->consumer_head - r->consumer_tail >= r->batch || - r->consumer_head >= r->size)) { + if (unlikely(consumer_head - r->consumer_tail >= r->batch || + consumer_head >= r->size)) { /* Zero out entries in the reverse order: this way we touch the * cache line that producer might currently be reading the last; * producer won't make progress and touch other cache lines @@ -271,12 +277,13 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r) */ while (likely(head >= r->consumer_tail)) r->queue[head--] = NULL; - r->consumer_tail = r->consumer_head; + r->consumer_tail = consumer_head; } - if (unlikely(r->consumer_head >= r->size)) { - r->consumer_head = 0; + if (unlikely(consumer_head >= r->size)) { + consumer_head = 0; r->consumer_tail = 0; } + r->consumer_head = consumer_head; } static inline void *__ptr_ring_consume(struct ptr_ring *r) -- MST