* [PATCH net-next v5 1/8] __ptr_ring_full_next: Returns if ring will be full after next insertion
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
@ 2025-09-22 22:15 ` Simon Schippers
2025-09-22 22:15 ` [PATCH net-next v5 2/8] Move the decision of invalidation out of __ptr_ring_discard_one Simon Schippers
` (8 subsequent siblings)
9 siblings, 0 replies; 37+ messages in thread
From: Simon Schippers @ 2025-09-22 22:15 UTC (permalink / raw)
To: willemdebruijn.kernel, jasowang, mst, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
Cc: Simon Schippers, Tim Gebauer
This is useful if the caller would like to act before the ptr_ring becomes
full with the next __ptr_ring_produce call. Because __ptr_ring_produce
issues an smp_wmb(), acting beforehand ensures the action is ordered before
the newly produced entry becomes visible.
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
include/linux/ptr_ring.h | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 551329220e4f..c45e95071d7e 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -96,6 +96,28 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r)
return ret;
}
+/* Returns if the ptr_ring will be full after inserting the next ptr.
+ */
+static inline bool __ptr_ring_full_next(struct ptr_ring *r)
+{
+ int p;
+
+ /* Since __ptr_ring_discard_one invalidates in reverse order, the
+ * next producer entry might be NULL even though the current one
+ * is not. Therefore, also check the current producer entry with
+ * __ptr_ring_full.
+ */
+ if (unlikely(r->size <= 1 || __ptr_ring_full(r)))
+ return true;
+
+ p = r->producer + 1;
+
+ if (unlikely(p >= r->size))
+ p = 0;
+
+ return r->queue[p];
+}
+
/* Note: callers invoking this in a loop must use a compiler barrier,
* for example cpu_relax(). Callers must hold producer_lock.
* Callers are responsible for making sure pointer that is being queued
--
2.43.0
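For illustration, a minimal sketch of the calling pattern this helper is
intended for, mirroring what patch 3/8 of this series later does in
tun_net_xmit. stop_my_queue() is a placeholder for whatever flow-control
action the caller takes (for TUN that is netif_tx_stop_queue()):

#include <linux/ptr_ring.h>

static int produce_with_flow_control(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	/* Act while there is still room; the smp_wmb() in
	 * __ptr_ring_produce() then orders this action before the new
	 * entry becomes visible to the consumer.
	 */
	if (__ptr_ring_full_next(r))
		stop_my_queue();	/* placeholder flow-control hook */
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}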
* [PATCH net-next v5 2/8] Move the decision of invalidation out of __ptr_ring_discard_one
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
2025-09-22 22:15 ` [PATCH net-next v5 1/8] __ptr_ring_full_next: Returns if ring will be full after next insertion Simon Schippers
@ 2025-09-22 22:15 ` Simon Schippers
2025-09-22 22:15 ` [PATCH net-next v5 3/8] TUN, TAP & vhost_net: Stop netdev queue before reaching a full ptr_ring Simon Schippers
` (7 subsequent siblings)
9 siblings, 0 replies; 37+ messages in thread
From: Simon Schippers @ 2025-09-22 22:15 UTC (permalink / raw)
To: willemdebruijn.kernel, jasowang, mst, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
Cc: Simon Schippers, Tim Gebauer
__ptr_ring_will_invalidate is useful if the caller would like to act
before entries of the ptr_ring get invalidated by __ptr_ring_discard_one.
__ptr_ring_consume calls the new helper and passes the result to
__ptr_ring_discard_one, preserving the pre-patch logic.
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
include/linux/ptr_ring.h | 32 ++++++++++++++++++++++----------
1 file changed, 22 insertions(+), 10 deletions(-)
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index c45e95071d7e..78fb3efedc7a 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -266,7 +266,22 @@ static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
}
/* Must only be called after __ptr_ring_peek returned !NULL */
-static inline void __ptr_ring_discard_one(struct ptr_ring *r)
+static inline bool __ptr_ring_will_invalidate(struct ptr_ring *r)
+{
+ /* Once we have processed enough entries invalidate them in
+ * the ring all at once so producer can reuse their space in the ring.
+ * We also do this when we reach end of the ring - not mandatory
+ * but helps keep the implementation simple.
+ */
+ int consumer_head = r->consumer_head + 1;
+
+ return consumer_head - r->consumer_tail >= r->batch ||
+ consumer_head >= r->size;
+}
+
+/* Must only be called after __ptr_ring_peek returned !NULL */
+static inline void __ptr_ring_discard_one(struct ptr_ring *r,
+ bool invalidate)
{
/* Fundamentally, what we want to do is update consumer
* index and zero out the entry so producer can reuse it.
@@ -286,13 +301,7 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
int consumer_head = r->consumer_head;
int head = consumer_head++;
- /* Once we have processed enough entries invalidate them in
- * the ring all at once so producer can reuse their space in the ring.
- * We also do this when we reach end of the ring - not mandatory
- * but helps keep the implementation simple.
- */
- if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
- consumer_head >= r->size)) {
+ if (unlikely(invalidate)) {
/* Zero out entries in the reverse order: this way we touch the
* cache line that producer might currently be reading the last;
* producer won't make progress and touch other cache lines
@@ -312,6 +321,7 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
+ bool invalidate;
void *ptr;
/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
@@ -319,8 +329,10 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
* with smp_wmb in __ptr_ring_produce.
*/
ptr = __ptr_ring_peek(r);
- if (ptr)
- __ptr_ring_discard_one(r);
+ if (ptr) {
+ invalidate = __ptr_ring_will_invalidate(r);
+ __ptr_ring_discard_one(r, invalidate);
+ }
return ptr;
}
--
2.43.0
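For illustration, a minimal sketch of the consumer-side pattern this split
enables (modelled on the tun_ring_consume/tap_ring_consume wrappers added in
patch 4/8; the two hook functions are placeholders for caller-specific
actions, such as checking and later waking a stopped netdev queue):

#include <linux/ptr_ring.h>

static void *consume_with_hooks(struct ptr_ring *r)
{
	bool invalidate;
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_peek(r);
	if (ptr) {
		invalidate = __ptr_ring_will_invalidate(r);
		if (invalidate)
			before_invalidating(r);	/* placeholder hook */
		__ptr_ring_discard_one(r, invalidate);
		if (invalidate)
			after_invalidating(r);	/* placeholder hook */
	}
	spin_unlock(&r->consumer_lock);

	return ptr;
}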
* [PATCH net-next v5 3/8] TUN, TAP & vhost_net: Stop netdev queue before reaching a full ptr_ring
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
2025-09-22 22:15 ` [PATCH net-next v5 1/8] __ptr_ring_full_next: Returns if ring will be full after next insertion Simon Schippers
2025-09-22 22:15 ` [PATCH net-next v5 2/8] Move the decision of invalidation out of __ptr_ring_discard_one Simon Schippers
@ 2025-09-22 22:15 ` Simon Schippers
2025-09-23 14:47 ` Michael S. Tsirkin
2025-09-22 22:15 ` [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry Simon Schippers
` (6 subsequent siblings)
9 siblings, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-22 22:15 UTC (permalink / raw)
To: willemdebruijn.kernel, jasowang, mst, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
Cc: Simon Schippers, Tim Gebauer
Stop the netdev queue ahead of __ptr_ring_produce when
__ptr_ring_full_next signals the ring is about to fill. Due to the
smp_wmb() of __ptr_ring_produce the consumer is guaranteed to be able to
notice the stopped netdev queue after seeing the new ptr_ring entry. As
both __ptr_ring_full_next and __ptr_ring_produce need the producer_lock,
the lock is held during the execution of both methods.
dev->lltx is disabled to ensure that tun_net_xmit is not called even
though the netdev queue is stopped (which happened in my testing,
resulting in rare packet drops). Consequently, the update of trans_start
in tun_net_xmit is also removed.
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
drivers/net/tun.c | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 86a9e927d0ff..c6b22af9bae8 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -931,7 +931,7 @@ static int tun_net_init(struct net_device *dev)
dev->vlan_features = dev->features &
~(NETIF_F_HW_VLAN_CTAG_TX |
NETIF_F_HW_VLAN_STAG_TX);
- dev->lltx = true;
+ dev->lltx = false;
tun->flags = (tun->flags & ~TUN_FEATURES) |
(ifr->ifr_flags & TUN_FEATURES);
@@ -1060,14 +1060,18 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
nf_reset_ct(skb);
- if (ptr_ring_produce(&tfile->tx_ring, skb)) {
+ queue = netdev_get_tx_queue(dev, txq);
+
+ spin_lock(&tfile->tx_ring.producer_lock);
+ if (__ptr_ring_full_next(&tfile->tx_ring))
+ netif_tx_stop_queue(queue);
+
+ if (unlikely(__ptr_ring_produce(&tfile->tx_ring, skb))) {
+ spin_unlock(&tfile->tx_ring.producer_lock);
drop_reason = SKB_DROP_REASON_FULL_RING;
goto drop;
}
-
- /* dev->lltx requires to do our own update of trans_start */
- queue = netdev_get_tx_queue(dev, txq);
- txq_trans_cond_update(queue);
+ spin_unlock(&tfile->tx_ring.producer_lock);
/* Notify and wake up reader process */
if (tfile->flags & TUN_FASYNC)
--
2.43.0
* Re: [PATCH net-next v5 3/8] TUN, TAP & vhost_net: Stop netdev queue before reaching a full ptr_ring
2025-09-22 22:15 ` [PATCH net-next v5 3/8] TUN, TAP & vhost_net: Stop netdev queue before reaching a full ptr_ring Simon Schippers
@ 2025-09-23 14:47 ` Michael S. Tsirkin
2025-09-24 5:41 ` Simon Schippers
0 siblings, 1 reply; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-23 14:47 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Tue, Sep 23, 2025 at 12:15:48AM +0200, Simon Schippers wrote:
> Stop the netdev queue ahead of __ptr_ring_produce when
> __ptr_ring_full_next signals the ring is about to fill. Due to the
> smp_wmb() of __ptr_ring_produce the consumer is guaranteed to be able to
> notice the stopped netdev queue after seeing the new ptr_ring entry. As
> both __ptr_ring_full_next and __ptr_ring_produce need the producer_lock,
> the lock is held during the execution of both methods.
>
> dev->lltx is disabled to ensure that tun_net_xmit is not called even
> though the netdev queue is stopped (which happened in my testing,
> resulting in rare packet drops). Consequently, the update of trans_start
> in tun_net_xmit is also removed.
>
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> ---
> drivers/net/tun.c | 16 ++++++++++------
> 1 file changed, 10 insertions(+), 6 deletions(-)
>
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index 86a9e927d0ff..c6b22af9bae8 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -931,7 +931,7 @@ static int tun_net_init(struct net_device *dev)
> dev->vlan_features = dev->features &
> ~(NETIF_F_HW_VLAN_CTAG_TX |
> NETIF_F_HW_VLAN_STAG_TX);
> - dev->lltx = true;
> + dev->lltx = false;
>
> tun->flags = (tun->flags & ~TUN_FEATURES) |
> (ifr->ifr_flags & TUN_FEATURES);
> @@ -1060,14 +1060,18 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>
> nf_reset_ct(skb);
>
> - if (ptr_ring_produce(&tfile->tx_ring, skb)) {
> + queue = netdev_get_tx_queue(dev, txq);
> +
> + spin_lock(&tfile->tx_ring.producer_lock);
> + if (__ptr_ring_full_next(&tfile->tx_ring))
> + netif_tx_stop_queue(queue);
> +
> + if (unlikely(__ptr_ring_produce(&tfile->tx_ring, skb))) {
> + spin_unlock(&tfile->tx_ring.producer_lock);
> drop_reason = SKB_DROP_REASON_FULL_RING;
> goto drop;
> }
The comment makes it sound like you always keep one slot free
in the queue but that is not the case - you just
check before calling __ptr_ring_produce.
But it is racy isn't it? So first of all I suspect you
are missing an mb before netif_tx_stop_queue.
Second it's racy because more entries can get freed
afterwards. Which maybe is ok in this instance?
But it really should be explained in more detail, if so.
Now - why not just check ring full *after* __ptr_ring_produce?
Why do we need all these new APIs, and we can
use existing ones which at least are not so hard to understand.
> -
> - /* dev->lltx requires to do our own update of trans_start */
> - queue = netdev_get_tx_queue(dev, txq);
> - txq_trans_cond_update(queue);
> + spin_unlock(&tfile->tx_ring.producer_lock);
>
> /* Notify and wake up reader process */
> if (tfile->flags & TUN_FASYNC)
> --
> 2.43.0
* [PATCH net-next v5 3/8] TUN, TAP & vhost_net: Stop netdev queue before reaching a full ptr_ring
2025-09-23 14:47 ` Michael S. Tsirkin
@ 2025-09-24 5:41 ` Simon Schippers
2025-09-24 5:50 ` Michael S. Tsirkin
0 siblings, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-24 5:41 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
Hi,
first of all thank you very much for your detailed replies! :)
On 23.09.25 16:47, Michael S. Tsirkin wrote:
> On Tue, Sep 23, 2025 at 12:15:48AM +0200, Simon Schippers wrote:
>> Stop the netdev queue ahead of __ptr_ring_produce when
>> __ptr_ring_full_next signals the ring is about to fill. Due to the
>> smp_wmb() of __ptr_ring_produce the consumer is guaranteed to be able to
>> notice the stopped netdev queue after seeing the new ptr_ring entry. As
>> both __ptr_ring_full_next and __ptr_ring_produce need the producer_lock,
>> the lock is held during the execution of both methods.
>>
>> dev->lltx is disabled to ensure that tun_net_xmit is not called even
>> though the netdev queue is stopped (which happened in my testing,
>> resulting in rare packet drops). Consequently, the update of trans_start
>> in tun_net_xmit is also removed.
>>
>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>> ---
>> drivers/net/tun.c | 16 ++++++++++------
>> 1 file changed, 10 insertions(+), 6 deletions(-)
>>
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index 86a9e927d0ff..c6b22af9bae8 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -931,7 +931,7 @@ static int tun_net_init(struct net_device *dev)
>> dev->vlan_features = dev->features &
>> ~(NETIF_F_HW_VLAN_CTAG_TX |
>> NETIF_F_HW_VLAN_STAG_TX);
>> - dev->lltx = true;
>> + dev->lltx = false;
>>
>> tun->flags = (tun->flags & ~TUN_FEATURES) |
>> (ifr->ifr_flags & TUN_FEATURES);
>> @@ -1060,14 +1060,18 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>
>> nf_reset_ct(skb);
>>
>> - if (ptr_ring_produce(&tfile->tx_ring, skb)) {
>> + queue = netdev_get_tx_queue(dev, txq);
>> +
>> + spin_lock(&tfile->tx_ring.producer_lock);
>> + if (__ptr_ring_full_next(&tfile->tx_ring))
>> + netif_tx_stop_queue(queue);
>> +
>> + if (unlikely(__ptr_ring_produce(&tfile->tx_ring, skb))) {
>> + spin_unlock(&tfile->tx_ring.producer_lock);
>> drop_reason = SKB_DROP_REASON_FULL_RING;
>> goto drop;
>> }
>
> The comment makes it sound like you always keep one slot free
> in the queue but that is not the case - you just
> check before calling __ptr_ring_produce.
>
I agree.
>
> But it is racy isn't it? So first of all I suspect you
> are missing an mb before netif_tx_stop_queue.
>
I don’t really get this point right now.
> Second it's racy because more entries can get freed
> afterwards. Which maybe is ok in this instance?
> But it really should be explained in more detail, if so.
>
Will be covered in the next mail.
>
>
> Now - why not just check ring full *after* __ptr_ring_produce?
> Why do we need all these new APIs, and we can
> use existing ones which at least are not so hard to understand.
>
>
You convinced me about changing my implementation anyway but here my (old)
idea:
I did this in V1-V4. The problem is that vhost_net is only called on
EPOLLIN triggered by tun_net_xmit. Then, after consuming a batch from the
ptr_ring, it must be able to see if the netdev queue stopped or not. If
this is not the case the ptr_ring might get empty and vhost_net is not
able to wake the queue again (because it is not stopped from its POV),
which happened in my testing in my V4.
This is the reason why, now in the V5, in tun_net_xmit I stop the netdev
queue before producing. With that I exploit the smp_wmb() in
__ptr_ring_produce which is paired with the READ_ONCE in __ptr_ring_peek
to ensure that the consumer in vhost_net sees that the netdev queue
stopped after consuming a batch.
>
>
>> -
>> - /* dev->lltx requires to do our own update of trans_start */
>> - queue = netdev_get_tx_queue(dev, txq);
>> - txq_trans_cond_update(queue);
>> + spin_unlock(&tfile->tx_ring.producer_lock);
>>
>> /* Notify and wake up reader process */
>> if (tfile->flags & TUN_FASYNC)
>> --
>> 2.43.0
>
* Re: [PATCH net-next v5 3/8] TUN, TAP & vhost_net: Stop netdev queue before reaching a full ptr_ring
2025-09-24 5:41 ` Simon Schippers
@ 2025-09-24 5:50 ` Michael S. Tsirkin
0 siblings, 0 replies; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 5:50 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Wed, Sep 24, 2025 at 07:41:28AM +0200, Simon Schippers wrote:
> Hi,
> first of all thank you very much for your detailed replies! :)
>
> On 23.09.25 16:47, Michael S. Tsirkin wrote:
> > On Tue, Sep 23, 2025 at 12:15:48AM +0200, Simon Schippers wrote:
> >> Stop the netdev queue ahead of __ptr_ring_produce when
> >> __ptr_ring_full_next signals the ring is about to fill. Due to the
> >> smp_wmb() of __ptr_ring_produce the consumer is guaranteed to be able to
> >> notice the stopped netdev queue after seeing the new ptr_ring entry. As
> >> both __ptr_ring_full_next and __ptr_ring_produce need the producer_lock,
> >> the lock is held during the execution of both methods.
> >>
> >> dev->lltx is disabled to ensure that tun_net_xmit is not called even
> >> though the netdev queue is stopped (which happened in my testing,
> >> resulting in rare packet drops). Consequently, the update of trans_start
> >> in tun_net_xmit is also removed.
> >>
> >> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> >> ---
> >> drivers/net/tun.c | 16 ++++++++++------
> >> 1 file changed, 10 insertions(+), 6 deletions(-)
> >>
> >> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> >> index 86a9e927d0ff..c6b22af9bae8 100644
> >> --- a/drivers/net/tun.c
> >> +++ b/drivers/net/tun.c
> >> @@ -931,7 +931,7 @@ static int tun_net_init(struct net_device *dev)
> >> dev->vlan_features = dev->features &
> >> ~(NETIF_F_HW_VLAN_CTAG_TX |
> >> NETIF_F_HW_VLAN_STAG_TX);
> >> - dev->lltx = true;
> >> + dev->lltx = false;
> >>
> >> tun->flags = (tun->flags & ~TUN_FEATURES) |
> >> (ifr->ifr_flags & TUN_FEATURES);
> >> @@ -1060,14 +1060,18 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
> >>
> >> nf_reset_ct(skb);
> >>
> >> - if (ptr_ring_produce(&tfile->tx_ring, skb)) {
> >> + queue = netdev_get_tx_queue(dev, txq);
> >> +
> >> + spin_lock(&tfile->tx_ring.producer_lock);
> >> + if (__ptr_ring_full_next(&tfile->tx_ring))
> >> + netif_tx_stop_queue(queue);
> >> +
> >> + if (unlikely(__ptr_ring_produce(&tfile->tx_ring, skb))) {
> >> + spin_unlock(&tfile->tx_ring.producer_lock);
> >> drop_reason = SKB_DROP_REASON_FULL_RING;
> >> goto drop;
> >> }
> >
> > The comment makes it sound like you always keep one slot free
> > in the queue but that is not the case - you just
> > check before calling __ptr_ring_produce.
> >
>
> I agree.
>
> >
> > But it is racy isn't it? So first of all I suspect you
> > are missing an mb before netif_tx_stop_queue.
> >
>
> I don’t really get this point right now.
ring full next is a read. stop queue is a write. if you are
relying on ordering them in some way you need a full mb generally.
> > Second it's racy because more entries can get freed
> > afterwards. Which maybe is ok in this instance?
> > But it really should be explained in more detail, if so.
> >
>
> Will be covered in the next mail.
>
> >
> >
> > Now - why not just check ring full *after* __ptr_ring_produce?
> > Why do we need all these new APIs, and we can
> > use existing ones which at least are not so hard to understand.
> >
> >
>
> You convinced me about changing my implementation anyway but here my (old)
> idea:
> I did this in V1-V4. The problem is that vhost_net is only called on
> EPOLLIN triggered by tun_net_xmit. Then, after consuming a batch from the
> ptr_ring, it must be able to see if the netdev queue stopped or not. If
> this is not the case the ptr_ring might get empty and vhost_net is not
> able to wake the queue again (because it is not stopped from its POV),
> which happened in my testing in my V4.
>
> This is the reason why, now in the V5, in tun_net_xmit I stop the netdev
> queue before producing. With that I exploit the smp_wmb() in
> __ptr_ring_produce which is paired with the READ_ONCE in __ptr_ring_peek
> to ensure that the consumer in vhost_net sees that the netdev queue
> stopped after consuming a batch.
yea you said it somewhere in code, too, and I am not sure I understand it all, but
wmb isn't paired with READ_ONCE generally. barrier pairing
is described in memory-barriers.txt, READ_ONCE is not a barrier
at all.
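For reference, the kind of pairing described in memory-barriers.txt looks
like this (flag and data are illustrative placeholders, not code from this
series):

	/* producer */
	data = 1;
	smp_wmb();		/* orders the data store before the flag store */
	WRITE_ONCE(flag, 1);

	/* consumer */
	if (READ_ONCE(flag)) {
		smp_rmb();	/* pairs with the smp_wmb() above */
		/* reading data here is guaranteed to observe 1 */
	}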
> >
> >
> >> -
> >> - /* dev->lltx requires to do our own update of trans_start */
> >> - queue = netdev_get_tx_queue(dev, txq);
> >> - txq_trans_cond_update(queue);
> >> + spin_unlock(&tfile->tx_ring.producer_lock);
> >>
> >> /* Notify and wake up reader process */
> >> if (tfile->flags & TUN_FASYNC)
> >> --
> >> 2.43.0
> >
* [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
` (2 preceding siblings ...)
2025-09-22 22:15 ` [PATCH net-next v5 3/8] TUN, TAP & vhost_net: Stop netdev queue before reaching a full ptr_ring Simon Schippers
@ 2025-09-22 22:15 ` Simon Schippers
2025-09-23 14:54 ` Michael S. Tsirkin
2025-09-23 16:36 ` Michael S. Tsirkin
2025-09-22 22:15 ` [PATCH net-next v5 5/8] TUN & TAP: Provide ptr_ring_consume_batched wrappers for vhost_net Simon Schippers
` (5 subsequent siblings)
9 siblings, 2 replies; 37+ messages in thread
From: Simon Schippers @ 2025-09-22 22:15 UTC (permalink / raw)
To: willemdebruijn.kernel, jasowang, mst, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
Cc: Simon Schippers, Tim Gebauer
The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
entry of the ptr_ring and then waking the netdev queue when entries got
invalidated to be used again by the producer.
To avoid waking the netdev queue when the ptr_ring is full, it is checked
if the netdev queue is stopped before invalidating entries. Like that the
netdev queue can be safely woken after invalidating entries.
The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
__ptr_ring_produce within tun_net_xmit guarantees that the information
about the netdev queue being stopped is visible after __ptr_ring_peek is
called.
The netdev queue is also woken after resizing the ptr_ring.
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 88 insertions(+), 3 deletions(-)
diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 1197f245e873..f8292721a9d6 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
return ret ? ret : total;
}
+static struct sk_buff *tap_ring_consume(struct tap_queue *q)
+{
+ struct netdev_queue *txq;
+ struct net_device *dev;
+ bool will_invalidate;
+ bool stopped;
+ void *ptr;
+
+ spin_lock(&q->ring.consumer_lock);
+ ptr = __ptr_ring_peek(&q->ring);
+ if (!ptr) {
+ spin_unlock(&q->ring.consumer_lock);
+ return ptr;
+ }
+
+ /* Check if the queue stopped before zeroing out, so no ptr get
+ * produced in the meantime, because this could result in waking
+ * even though the ptr_ring is full. The order of the operations
+ * is ensured by barrier().
+ */
+ will_invalidate = __ptr_ring_will_invalidate(&q->ring);
+ if (unlikely(will_invalidate)) {
+ rcu_read_lock();
+ dev = rcu_dereference(q->tap)->dev;
+ txq = netdev_get_tx_queue(dev, q->queue_index);
+ stopped = netif_tx_queue_stopped(txq);
+ }
+ barrier();
+ __ptr_ring_discard_one(&q->ring, will_invalidate);
+
+ if (unlikely(will_invalidate)) {
+ if (stopped)
+ netif_tx_wake_queue(txq);
+ rcu_read_unlock();
+ }
+ spin_unlock(&q->ring.consumer_lock);
+
+ return ptr;
+}
+
static ssize_t tap_do_read(struct tap_queue *q,
struct iov_iter *to,
int noblock, struct sk_buff *skb)
@@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
TASK_INTERRUPTIBLE);
/* Read frames from the queue */
- skb = ptr_ring_consume(&q->ring);
+ skb = tap_ring_consume(q);
if (skb)
break;
if (noblock) {
@@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
ret = ptr_ring_resize_multiple_bh(rings, n,
dev->tx_queue_len, GFP_KERNEL,
__skb_array_destroy_skb);
+ if (netif_running(dev))
+ netif_tx_wake_all_queues(dev);
kfree(rings);
return ret;
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index c6b22af9bae8..682df8157b55 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
return total;
}
+static void *tun_ring_consume(struct tun_file *tfile)
+{
+ struct netdev_queue *txq;
+ struct net_device *dev;
+ bool will_invalidate;
+ bool stopped;
+ void *ptr;
+
+ spin_lock(&tfile->tx_ring.consumer_lock);
+ ptr = __ptr_ring_peek(&tfile->tx_ring);
+ if (!ptr) {
+ spin_unlock(&tfile->tx_ring.consumer_lock);
+ return ptr;
+ }
+
+ /* Check if the queue stopped before zeroing out, so no ptr get
+ * produced in the meantime, because this could result in waking
+ * even though the ptr_ring is full. The order of the operations
+ * is ensured by barrier().
+ */
+ will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
+ if (unlikely(will_invalidate)) {
+ rcu_read_lock();
+ dev = rcu_dereference(tfile->tun)->dev;
+ txq = netdev_get_tx_queue(dev, tfile->queue_index);
+ stopped = netif_tx_queue_stopped(txq);
+ }
+ barrier();
+ __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
+
+ if (unlikely(will_invalidate)) {
+ if (stopped)
+ netif_tx_wake_queue(txq);
+ rcu_read_unlock();
+ }
+ spin_unlock(&tfile->tx_ring.consumer_lock);
+
+ return ptr;
+}
+
static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
{
DECLARE_WAITQUEUE(wait, current);
void *ptr = NULL;
int error = 0;
- ptr = ptr_ring_consume(&tfile->tx_ring);
+ ptr = tun_ring_consume(tfile);
if (ptr)
goto out;
if (noblock) {
@@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
while (1) {
set_current_state(TASK_INTERRUPTIBLE);
- ptr = ptr_ring_consume(&tfile->tx_ring);
+ ptr = tun_ring_consume(tfile);
if (ptr)
break;
if (signal_pending(current)) {
@@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
dev->tx_queue_len, GFP_KERNEL,
tun_ptr_free);
+ if (netif_running(dev))
+ netif_tx_wake_all_queues(dev);
+
kfree(rings);
return ret;
}
--
2.43.0
* Re: [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-22 22:15 ` [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry Simon Schippers
@ 2025-09-23 14:54 ` Michael S. Tsirkin
2025-09-23 16:36 ` Michael S. Tsirkin
1 sibling, 0 replies; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-23 14:54 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
> entry of the ptr_ring and then waking the netdev queue when entries got
> invalidated to be used again by the producer.
> To avoid waking the netdev queue when the ptr_ring is full, it is checked
> if the netdev queue is stopped before invalidating entries. Like that the
> netdev queue can be safely woken after invalidating entries.
>
> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
> __ptr_ring_produce within tun_net_xmit guarantees that the information
> about the netdev queue being stopped is visible after __ptr_ring_peek is
> called.
>
> The netdev queue is also woken after resizing the ptr_ring.
>
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
Sounds like there is subtle interplay here between queue stopped bit and
ring full status, all lockless with fancy ordering tricks. I have to
say this is fragile. I'd like to see much more documentation.
Or alternatively, I ask myself if, after detecting flow control
issues, it is possible to just use a spinlock to synchronize,
somehow.
> ---
> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
> 2 files changed, 88 insertions(+), 3 deletions(-)
>
> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
> index 1197f245e873..f8292721a9d6 100644
> --- a/drivers/net/tap.c
> +++ b/drivers/net/tap.c
> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
> return ret ? ret : total;
> }
>
> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
> +{
> + struct netdev_queue *txq;
> + struct net_device *dev;
> + bool will_invalidate;
> + bool stopped;
> + void *ptr;
> +
> + spin_lock(&q->ring.consumer_lock);
> + ptr = __ptr_ring_peek(&q->ring);
> + if (!ptr) {
> + spin_unlock(&q->ring.consumer_lock);
> + return ptr;
> + }
> +
> + /* Check if the queue stopped before zeroing out, so no ptr get
> + * produced in the meantime, because this could result in waking
> + * even though the ptr_ring is full. The order of the operations
which operations, I don't get it? it's unusual for barrier()
to be effective. are you trying to order the read in netif_tx_queue_stopped
versus the read in __ptr_ring_discard_one?
Then that would seem to need smp_rmb and accordingly smp_wmb in the
producing side.
> + * is ensured by barrier().
> + */
> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
> + if (unlikely(will_invalidate)) {
> + rcu_read_lock();
> + dev = rcu_dereference(q->tap)->dev;
> + txq = netdev_get_tx_queue(dev, q->queue_index);
> + stopped = netif_tx_queue_stopped(txq);
> + }
> + barrier();
> + __ptr_ring_discard_one(&q->ring, will_invalidate);
> +
> + if (unlikely(will_invalidate)) {
> + if (stopped)
> + netif_tx_wake_queue(txq);
> + rcu_read_unlock();
> + }
> + spin_unlock(&q->ring.consumer_lock);
> +
> + return ptr;
> +}
> +
> static ssize_t tap_do_read(struct tap_queue *q,
> struct iov_iter *to,
> int noblock, struct sk_buff *skb)
> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
> TASK_INTERRUPTIBLE);
>
> /* Read frames from the queue */
> - skb = ptr_ring_consume(&q->ring);
> + skb = tap_ring_consume(q);
> if (skb)
> break;
> if (noblock) {
> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
> ret = ptr_ring_resize_multiple_bh(rings, n,
> dev->tx_queue_len, GFP_KERNEL,
> __skb_array_destroy_skb);
> + if (netif_running(dev))
> + netif_tx_wake_all_queues(dev);
>
> kfree(rings);
> return ret;
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index c6b22af9bae8..682df8157b55 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
> return total;
> }
>
> +static void *tun_ring_consume(struct tun_file *tfile)
> +{
> + struct netdev_queue *txq;
> + struct net_device *dev;
> + bool will_invalidate;
> + bool stopped;
> + void *ptr;
> +
> + spin_lock(&tfile->tx_ring.consumer_lock);
> + ptr = __ptr_ring_peek(&tfile->tx_ring);
> + if (!ptr) {
> + spin_unlock(&tfile->tx_ring.consumer_lock);
> + return ptr;
> + }
> +
> + /* Check if the queue stopped before zeroing out, so no ptr get
> + * produced in the meantime, because this could result in waking
> + * even though the ptr_ring is full. The order of the operations
> + * is ensured by barrier().
> + */
> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
> + if (unlikely(will_invalidate)) {
> + rcu_read_lock();
> + dev = rcu_dereference(tfile->tun)->dev;
> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
> + stopped = netif_tx_queue_stopped(txq);
> + }
> + barrier();
> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
> +
> + if (unlikely(will_invalidate)) {
> + if (stopped)
> + netif_tx_wake_queue(txq);
> + rcu_read_unlock();
> + }
> + spin_unlock(&tfile->tx_ring.consumer_lock);
> +
> + return ptr;
> +}
> +
> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> {
> DECLARE_WAITQUEUE(wait, current);
> void *ptr = NULL;
> int error = 0;
>
> - ptr = ptr_ring_consume(&tfile->tx_ring);
> + ptr = tun_ring_consume(tfile);
> if (ptr)
> goto out;
> if (noblock) {
> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>
> while (1) {
> set_current_state(TASK_INTERRUPTIBLE);
> - ptr = ptr_ring_consume(&tfile->tx_ring);
> + ptr = tun_ring_consume(tfile);
> if (ptr)
> break;
> if (signal_pending(current)) {
> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
> dev->tx_queue_len, GFP_KERNEL,
> tun_ptr_free);
>
> + if (netif_running(dev))
> + netif_tx_wake_all_queues(dev);
> +
> kfree(rings);
> return ret;
> }
> --
> 2.43.0
* Re: [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-22 22:15 ` [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry Simon Schippers
2025-09-23 14:54 ` Michael S. Tsirkin
@ 2025-09-23 16:36 ` Michael S. Tsirkin
2025-09-24 5:56 ` Simon Schippers
2025-09-28 21:27 ` Simon Schippers
1 sibling, 2 replies; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-23 16:36 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
> entry of the ptr_ring and then waking the netdev queue when entries got
> invalidated to be used again by the producer.
> To avoid waking the netdev queue when the ptr_ring is full, it is checked
> if the netdev queue is stopped before invalidating entries. Like that the
> netdev queue can be safely woken after invalidating entries.
>
> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
> __ptr_ring_produce within tun_net_xmit guarantees that the information
> about the netdev queue being stopped is visible after __ptr_ring_peek is
> called.
>
> The netdev queue is also woken after resizing the ptr_ring.
>
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> ---
> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
> 2 files changed, 88 insertions(+), 3 deletions(-)
>
> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
> index 1197f245e873..f8292721a9d6 100644
> --- a/drivers/net/tap.c
> +++ b/drivers/net/tap.c
> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
> return ret ? ret : total;
> }
>
> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
> +{
> + struct netdev_queue *txq;
> + struct net_device *dev;
> + bool will_invalidate;
> + bool stopped;
> + void *ptr;
> +
> + spin_lock(&q->ring.consumer_lock);
> + ptr = __ptr_ring_peek(&q->ring);
> + if (!ptr) {
> + spin_unlock(&q->ring.consumer_lock);
> + return ptr;
> + }
> +
> + /* Check if the queue stopped before zeroing out, so no ptr get
> + * produced in the meantime, because this could result in waking
> + * even though the ptr_ring is full.
So what? Maybe it would be a bit suboptimal? But with your design, I do
not get what prevents this:
stopped? -> No
ring is stopped
discard
and queue stays stopped forever
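Spelled out with the helpers used in this series, the interleaving in
question would be something like:

	consumer (tap_ring_consume)        producer (tun_net_xmit)

	stopped = netif_tx_queue_stopped()
	          -> false
	                                   __ptr_ring_full_next() -> true
	                                   netif_tx_stop_queue()
	                                   __ptr_ring_produce()
	__ptr_ring_discard_one()
	(stopped was false, so no wake)

If no later consume invalidates another batch and rechecks before the ring
drains, nothing is left to wake the queue.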
> The order of the operations
> + * is ensured by barrier().
> + */
> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
> + if (unlikely(will_invalidate)) {
> + rcu_read_lock();
> + dev = rcu_dereference(q->tap)->dev;
> + txq = netdev_get_tx_queue(dev, q->queue_index);
> + stopped = netif_tx_queue_stopped(txq);
> + }
> + barrier();
> + __ptr_ring_discard_one(&q->ring, will_invalidate);
> +
> + if (unlikely(will_invalidate)) {
> + if (stopped)
> + netif_tx_wake_queue(txq);
> + rcu_read_unlock();
> + }
After an entry is consumed, you can detect this by checking
r->consumer_head >= r->consumer_tail
so it seems you could keep calling regular ptr_ring_consume
and check afterwards?
> + spin_unlock(&q->ring.consumer_lock);
> +
> + return ptr;
> +}
> +
> static ssize_t tap_do_read(struct tap_queue *q,
> struct iov_iter *to,
> int noblock, struct sk_buff *skb)
> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
> TASK_INTERRUPTIBLE);
>
> /* Read frames from the queue */
> - skb = ptr_ring_consume(&q->ring);
> + skb = tap_ring_consume(q);
> if (skb)
> break;
> if (noblock) {
> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
> ret = ptr_ring_resize_multiple_bh(rings, n,
> dev->tx_queue_len, GFP_KERNEL,
> __skb_array_destroy_skb);
> + if (netif_running(dev))
> + netif_tx_wake_all_queues(dev);
>
> kfree(rings);
> return ret;
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index c6b22af9bae8..682df8157b55 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
> return total;
> }
>
> +static void *tun_ring_consume(struct tun_file *tfile)
> +{
> + struct netdev_queue *txq;
> + struct net_device *dev;
> + bool will_invalidate;
> + bool stopped;
> + void *ptr;
> +
> + spin_lock(&tfile->tx_ring.consumer_lock);
> + ptr = __ptr_ring_peek(&tfile->tx_ring);
> + if (!ptr) {
> + spin_unlock(&tfile->tx_ring.consumer_lock);
> + return ptr;
> + }
> +
> + /* Check if the queue stopped before zeroing out, so no ptr get
> + * produced in the meantime, because this could result in waking
> + * even though the ptr_ring is full. The order of the operations
> + * is ensured by barrier().
> + */
> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
> + if (unlikely(will_invalidate)) {
> + rcu_read_lock();
> + dev = rcu_dereference(tfile->tun)->dev;
> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
> + stopped = netif_tx_queue_stopped(txq);
> + }
> + barrier();
> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
> +
> + if (unlikely(will_invalidate)) {
> + if (stopped)
> + netif_tx_wake_queue(txq);
> + rcu_read_unlock();
> + }
> + spin_unlock(&tfile->tx_ring.consumer_lock);
> +
> + return ptr;
> +}
> +
> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> {
> DECLARE_WAITQUEUE(wait, current);
> void *ptr = NULL;
> int error = 0;
>
> - ptr = ptr_ring_consume(&tfile->tx_ring);
> + ptr = tun_ring_consume(tfile);
> if (ptr)
> goto out;
> if (noblock) {
> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>
> while (1) {
> set_current_state(TASK_INTERRUPTIBLE);
> - ptr = ptr_ring_consume(&tfile->tx_ring);
> + ptr = tun_ring_consume(tfile);
> if (ptr)
> break;
> if (signal_pending(current)) {
> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
> dev->tx_queue_len, GFP_KERNEL,
> tun_ptr_free);
>
> + if (netif_running(dev))
> + netif_tx_wake_all_queues(dev);
> +
> kfree(rings);
> return ret;
> }
> --
> 2.43.0
* [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-23 16:36 ` Michael S. Tsirkin
@ 2025-09-24 5:56 ` Simon Schippers
2025-09-24 6:55 ` Michael S. Tsirkin
2025-09-28 21:27 ` Simon Schippers
1 sibling, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-24 5:56 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On 23.09.25 18:36, Michael S. Tsirkin wrote:
> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
>> entry of the ptr_ring and then waking the netdev queue when entries got
>> invalidated to be used again by the producer.
>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
>> if the netdev queue is stopped before invalidating entries. Like that the
>> netdev queue can be safely woken after invalidating entries.
>>
>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
>> __ptr_ring_produce within tun_net_xmit guarantees that the information
>> about the netdev queue being stopped is visible after __ptr_ring_peek is
>> called.
>>
>> The netdev queue is also woken after resizing the ptr_ring.
>>
>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>> ---
>> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
>> 2 files changed, 88 insertions(+), 3 deletions(-)
>>
>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
>> index 1197f245e873..f8292721a9d6 100644
>> --- a/drivers/net/tap.c
>> +++ b/drivers/net/tap.c
>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
>> return ret ? ret : total;
>> }
>>
>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
>> +{
>> + struct netdev_queue *txq;
>> + struct net_device *dev;
>> + bool will_invalidate;
>> + bool stopped;
>> + void *ptr;
>> +
>> + spin_lock(&q->ring.consumer_lock);
>> + ptr = __ptr_ring_peek(&q->ring);
>> + if (!ptr) {
>> + spin_unlock(&q->ring.consumer_lock);
>> + return ptr;
>> + }
>> +
>> + /* Check if the queue stopped before zeroing out, so no ptr get
>> + * produced in the meantime, because this could result in waking
>> + * even though the ptr_ring is full.
>
> So what? Maybe it would be a bit suboptimal? But with your design, I do
> not get what prevents this:
>
>
> stopped? -> No
> ring is stopped
> discard
>
> and queue stays stopped forever
>
>
I totally missed this (but I am not sure why it did not happen in my
testing with different ptr_ring sizes..).
I guess you are right, there must be some type of locking.
It probably makes sense to lock the netdev txq->_xmit_lock whenever the
consumer invalidates old ptr_ring entries (so when r->consumer_head >=
r->consumer_tail). The producer holds this lock with dev->lltx=false. Then
the consumer is able to wake the queue safely.
So I would now just change the implementation to:
tun_net_xmit:
...
if ptr_ring_produce
// Could happen because of unproduce in vhost_net..
netif_tx_stop_queue
...
goto drop
if ptr_ring_full
netif_tx_stop_queue
...
tun_ring_recv/tap_do_read (the implementation for the batched methods
would be done in the similar way):
...
ptr_ring_consume
if r->consumer_head >= r->consumer_tail
__netif_tx_lock_bh
netif_tx_wake_queue
__netif_tx_unlock_bh
This implementation does not need any new ptr_ring helpers and no fancy
ordering tricks.
Would this implementation be sufficient in your opinion?
>> The order of the operations
>> + * is ensured by barrier().
>> + */
>> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
>> + if (unlikely(will_invalidate)) {
>> + rcu_read_lock();
>> + dev = rcu_dereference(q->tap)->dev;
>> + txq = netdev_get_tx_queue(dev, q->queue_index);
>> + stopped = netif_tx_queue_stopped(txq);
>> + }
>> + barrier();
>> + __ptr_ring_discard_one(&q->ring, will_invalidate);
>> +
>> + if (unlikely(will_invalidate)) {
>> + if (stopped)
>> + netif_tx_wake_queue(txq);
>> + rcu_read_unlock();
>> + }
>
>
> After an entry is consumed, you can detect this by checking
>
> r->consumer_head >= r->consumer_tail
>
>
> so it seems you could keep calling regular ptr_ring_consume
> and check afterwards?
>
>
>
>
>> + spin_unlock(&q->ring.consumer_lock);
>> +
>> + return ptr;
>> +}
>> +
>> static ssize_t tap_do_read(struct tap_queue *q,
>> struct iov_iter *to,
>> int noblock, struct sk_buff *skb)
>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
>> TASK_INTERRUPTIBLE);
>>
>> /* Read frames from the queue */
>> - skb = ptr_ring_consume(&q->ring);
>> + skb = tap_ring_consume(q);
>> if (skb)
>> break;
>> if (noblock) {
>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
>> ret = ptr_ring_resize_multiple_bh(rings, n,
>> dev->tx_queue_len, GFP_KERNEL,
>> __skb_array_destroy_skb);
>> + if (netif_running(dev))
>> + netif_tx_wake_all_queues(dev);
>>
>> kfree(rings);
>> return ret;
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index c6b22af9bae8..682df8157b55 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>> return total;
>> }
>>
>> +static void *tun_ring_consume(struct tun_file *tfile)
>> +{
>> + struct netdev_queue *txq;
>> + struct net_device *dev;
>> + bool will_invalidate;
>> + bool stopped;
>> + void *ptr;
>> +
>> + spin_lock(&tfile->tx_ring.consumer_lock);
>> + ptr = __ptr_ring_peek(&tfile->tx_ring);
>> + if (!ptr) {
>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>> + return ptr;
>> + }
>> +
>> + /* Check if the queue stopped before zeroing out, so no ptr get
>> + * produced in the meantime, because this could result in waking
>> + * even though the ptr_ring is full. The order of the operations
>> + * is ensured by barrier().
>> + */
>> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
>> + if (unlikely(will_invalidate)) {
>> + rcu_read_lock();
>> + dev = rcu_dereference(tfile->tun)->dev;
>> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
>> + stopped = netif_tx_queue_stopped(txq);
>> + }
>> + barrier();
>> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
>> +
>> + if (unlikely(will_invalidate)) {
>> + if (stopped)
>> + netif_tx_wake_queue(txq);
>> + rcu_read_unlock();
>> + }
>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>> +
>> + return ptr;
>> +}
>> +
>> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>> {
>> DECLARE_WAITQUEUE(wait, current);
>> void *ptr = NULL;
>> int error = 0;
>>
>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>> + ptr = tun_ring_consume(tfile);
>> if (ptr)
>> goto out;
>> if (noblock) {
>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>
>> while (1) {
>> set_current_state(TASK_INTERRUPTIBLE);
>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>> + ptr = tun_ring_consume(tfile);
>> if (ptr)
>> break;
>> if (signal_pending(current)) {
>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
>> dev->tx_queue_len, GFP_KERNEL,
>> tun_ptr_free);
>>
>> + if (netif_running(dev))
>> + netif_tx_wake_all_queues(dev);
>> +
>> kfree(rings);
>> return ret;
>> }
>> --
>> 2.43.0
>
* Re: [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-24 5:56 ` Simon Schippers
@ 2025-09-24 6:55 ` Michael S. Tsirkin
2025-09-24 7:42 ` Simon Schippers
0 siblings, 1 reply; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 6:55 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
> On 23.09.25 18:36, Michael S. Tsirkin wrote:
> > On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
> >> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
> >> entry of the ptr_ring and then waking the netdev queue when entries got
> >> invalidated to be used again by the producer.
> >> To avoid waking the netdev queue when the ptr_ring is full, it is checked
> >> if the netdev queue is stopped before invalidating entries. Like that the
> >> netdev queue can be safely woken after invalidating entries.
> >>
> >> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
> >> __ptr_ring_produce within tun_net_xmit guarantees that the information
> >> about the netdev queue being stopped is visible after __ptr_ring_peek is
> >> called.
> >>
> >> The netdev queue is also woken after resizing the ptr_ring.
> >>
> >> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> >> ---
> >> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
> >> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
> >> 2 files changed, 88 insertions(+), 3 deletions(-)
> >>
> >> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
> >> index 1197f245e873..f8292721a9d6 100644
> >> --- a/drivers/net/tap.c
> >> +++ b/drivers/net/tap.c
> >> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
> >> return ret ? ret : total;
> >> }
> >>
> >> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
> >> +{
> >> + struct netdev_queue *txq;
> >> + struct net_device *dev;
> >> + bool will_invalidate;
> >> + bool stopped;
> >> + void *ptr;
> >> +
> >> + spin_lock(&q->ring.consumer_lock);
> >> + ptr = __ptr_ring_peek(&q->ring);
> >> + if (!ptr) {
> >> + spin_unlock(&q->ring.consumer_lock);
> >> + return ptr;
> >> + }
> >> +
> >> + /* Check if the queue stopped before zeroing out, so no ptr get
> >> + * produced in the meantime, because this could result in waking
> >> + * even though the ptr_ring is full.
> >
> > So what? Maybe it would be a bit suboptimal? But with your design, I do
> > not get what prevents this:
> >
> >
> > stopped? -> No
> > ring is stopped
> > discard
> >
> > and queue stays stopped forever
> >
> >
>
> I totally missed this (but I am not sure why it did not happen in my
> testing with different ptr_ring sizes..).
>
> I guess you are right, there must be some type of locking.
> It probably makes sense to lock the netdev txq->_xmit_lock whenever the
> consumer invalidates old ptr_ring entries (so when r->consumer_head >=
> r->consumer_tail). The producer holds this lock with dev->lltx=false. Then
> the consumer is able to wake the queue safely.
>
> So I would now just change the implementation to:
> tun_net_xmit:
> ...
> if ptr_ring_produce
> // Could happen because of unproduce in vhost_net..
> netif_tx_stop_queue
> ...
> goto drop
>
> if ptr_ring_full
> netif_tx_stop_queue
> ...
>
> tun_ring_recv/tap_do_read (the implementation for the batched methods
> would be done in the similar way):
> ...
> ptr_ring_consume
> if r->consumer_head >= r->consumer_tail
> __netif_tx_lock_bh
> netif_tx_wake_queue
> __netif_tx_unlock_bh
>
> This implementation does not need any new ptr_ring helpers and no fancy
> ordering tricks.
> Would this implementation be sufficient in your opinion?
Maybe you mean == ? Pls don't poke at ptr ring internals though.
What are we testing for here?
I think the point is that a batch of entries was consumed?
Maybe __ptr_ring_consumed_batch ? and a comment explaining
this returns true when last successful call to consume
freed up a batch of space in the ring for producer to make
progress.
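Such a helper, as a hypothetical sketch (not an existing ptr_ring API),
could look like:

	/* Returns true if the last successful consume invalidated a batch
	 * of entries, i.e. freed up space in the ring for the producer to
	 * make progress. Only meaningful right after a successful consume
	 * (the condition also holds on a freshly initialised ring) and must
	 * be called under the consumer lock.
	 */
	static inline bool __ptr_ring_consumed_batch(struct ptr_ring *r)
	{
		return r->consumer_head == r->consumer_tail;
	}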
consumer_head == consumer_tail also happens rather a lot,
though thankfully not on every entry.
So taking tx lock each time this happens, even if queue
is not stopped, seems heavyweight.
> >> The order of the operations
> >> + * is ensured by barrier().
> >> + */
> >> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
> >> + if (unlikely(will_invalidate)) {
> >> + rcu_read_lock();
> >> + dev = rcu_dereference(q->tap)->dev;
> >> + txq = netdev_get_tx_queue(dev, q->queue_index);
> >> + stopped = netif_tx_queue_stopped(txq);
> >> + }
> >> + barrier();
> >> + __ptr_ring_discard_one(&q->ring, will_invalidate);
> >> +
> >> + if (unlikely(will_invalidate)) {
> >> + if (stopped)
> >> + netif_tx_wake_queue(txq);
> >> + rcu_read_unlock();
> >> + }
> >
> >
> > After an entry is consumed, you can detect this by checking
> >
> > r->consumer_head >= r->consumer_tail
> >
> >
> > so it seems you could keep calling regular ptr_ring_consume
> > and check afterwards?
> >
> >
> >
> >
> >> + spin_unlock(&q->ring.consumer_lock);
> >> +
> >> + return ptr;
> >> +}
> >> +
> >> static ssize_t tap_do_read(struct tap_queue *q,
> >> struct iov_iter *to,
> >> int noblock, struct sk_buff *skb)
> >> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
> >> TASK_INTERRUPTIBLE);
> >>
> >> /* Read frames from the queue */
> >> - skb = ptr_ring_consume(&q->ring);
> >> + skb = tap_ring_consume(q);
> >> if (skb)
> >> break;
> >> if (noblock) {
> >> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
> >> ret = ptr_ring_resize_multiple_bh(rings, n,
> >> dev->tx_queue_len, GFP_KERNEL,
> >> __skb_array_destroy_skb);
> >> + if (netif_running(dev))
> >> + netif_tx_wake_all_queues(dev);
> >>
> >> kfree(rings);
> >> return ret;
> >> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> >> index c6b22af9bae8..682df8157b55 100644
> >> --- a/drivers/net/tun.c
> >> +++ b/drivers/net/tun.c
> >> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
> >> return total;
> >> }
> >>
> >> +static void *tun_ring_consume(struct tun_file *tfile)
> >> +{
> >> + struct netdev_queue *txq;
> >> + struct net_device *dev;
> >> + bool will_invalidate;
> >> + bool stopped;
> >> + void *ptr;
> >> +
> >> + spin_lock(&tfile->tx_ring.consumer_lock);
> >> + ptr = __ptr_ring_peek(&tfile->tx_ring);
> >> + if (!ptr) {
> >> + spin_unlock(&tfile->tx_ring.consumer_lock);
> >> + return ptr;
> >> + }
> >> +
> >> + /* Check if the queue stopped before zeroing out, so no ptr get
> >> + * produced in the meantime, because this could result in waking
> >> + * even though the ptr_ring is full. The order of the operations
> >> + * is ensured by barrier().
> >> + */
> >> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
> >> + if (unlikely(will_invalidate)) {
> >> + rcu_read_lock();
> >> + dev = rcu_dereference(tfile->tun)->dev;
> >> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
> >> + stopped = netif_tx_queue_stopped(txq);
> >> + }
> >> + barrier();
> >> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
> >> +
> >> + if (unlikely(will_invalidate)) {
> >> + if (stopped)
> >> + netif_tx_wake_queue(txq);
> >> + rcu_read_unlock();
> >> + }
> >> + spin_unlock(&tfile->tx_ring.consumer_lock);
> >> +
> >> + return ptr;
> >> +}
> >> +
> >> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> >> {
> >> DECLARE_WAITQUEUE(wait, current);
> >> void *ptr = NULL;
> >> int error = 0;
> >>
> >> - ptr = ptr_ring_consume(&tfile->tx_ring);
> >> + ptr = tun_ring_consume(tfile);
> >> if (ptr)
> >> goto out;
> >> if (noblock) {
> >> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> >>
> >> while (1) {
> >> set_current_state(TASK_INTERRUPTIBLE);
> >> - ptr = ptr_ring_consume(&tfile->tx_ring);
> >> + ptr = tun_ring_consume(tfile);
> >> if (ptr)
> >> break;
> >> if (signal_pending(current)) {
> >> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
> >> dev->tx_queue_len, GFP_KERNEL,
> >> tun_ptr_free);
> >>
> >> + if (netif_running(dev))
> >> + netif_tx_wake_all_queues(dev);
> >> +
> >> kfree(rings);
> >> return ret;
> >> }
> >> --
> >> 2.43.0
> >
* [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-24 6:55 ` Michael S. Tsirkin
@ 2025-09-24 7:42 ` Simon Schippers
2025-09-24 7:49 ` Michael S. Tsirkin
0 siblings, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-24 7:42 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On 24.09.25 08:55, Michael S. Tsirkin wrote:
> On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
>> On 23.09.25 18:36, Michael S. Tsirkin wrote:
>>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
>>>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
>>>> entry of the ptr_ring and then waking the netdev queue when entries got
>>>> invalidated to be used again by the producer.
>>>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
>>>> if the netdev queue is stopped before invalidating entries. Like that the
>>>> netdev queue can be safely woken after invalidating entries.
>>>>
>>>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
>>>> __ptr_ring_produce within tun_net_xmit guarantees that the information
>>>> about the netdev queue being stopped is visible after __ptr_ring_peek is
>>>> called.
>>>>
>>>> The netdev queue is also woken after resizing the ptr_ring.
>>>>
>>>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>>>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>>>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>>>> ---
>>>> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>>>> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
>>>> 2 files changed, 88 insertions(+), 3 deletions(-)
>>>>
>>>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
>>>> index 1197f245e873..f8292721a9d6 100644
>>>> --- a/drivers/net/tap.c
>>>> +++ b/drivers/net/tap.c
>>>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
>>>> return ret ? ret : total;
>>>> }
>>>>
>>>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
>>>> +{
>>>> + struct netdev_queue *txq;
>>>> + struct net_device *dev;
>>>> + bool will_invalidate;
>>>> + bool stopped;
>>>> + void *ptr;
>>>> +
>>>> + spin_lock(&q->ring.consumer_lock);
>>>> + ptr = __ptr_ring_peek(&q->ring);
>>>> + if (!ptr) {
>>>> + spin_unlock(&q->ring.consumer_lock);
>>>> + return ptr;
>>>> + }
>>>> +
>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
>>>> + * produced in the meantime, because this could result in waking
>>>> + * even though the ptr_ring is full.
>>>
>>> So what? Maybe it would be a bit suboptimal? But with your design, I do
>>> not get what prevents this:
>>>
>>>
>>> stopped? -> No
>>> ring is stopped
>>> discard
>>>
>>> and queue stays stopped forever
>>>
>>>
>>
>> I totally missed this (but I am not sure why it did not happen in my
>> testing with different ptr_ring sizes..).
>>
>> I guess you are right, there must be some type of locking.
>> It probably makes sense to lock the netdev txq->_xmit_lock whenever the
>> consumer invalidates old ptr_ring entries (so when r->consumer_head >=
>> r->consumer_tail). The producer holds this lock with dev->lltx=false. Then
>> the consumer is able to wake the queue safely.
>>
>> So I would now just change the implementation to:
>> tun_net_xmit:
>> ...
>> if ptr_ring_produce
>> // Could happen because of unproduce in vhost_net..
>> netif_tx_stop_queue
>> ...
>> goto drop
>>
>> if ptr_ring_full
>> netif_tx_stop_queue
>> ...
>>
>> tun_ring_recv/tap_do_read (the implementation for the batched methods
>> would be done in the similar way):
>> ...
>> ptr_ring_consume
>> if r->consumer_head >= r->consumer_tail
>> __netif_tx_lock_bh
>> netif_tx_wake_queue
>> __netif_tx_unlock_bh
>>
>> This implementation does not need any new ptr_ring helpers and no fancy
>> ordering tricks.
>> Would this implementation be sufficient in your opinion?
>
>
> Maybe you mean == ? Pls don't poke at ptr ring internals though.
> What are we testing for here?
> I think the point is that a batch of entries was consumed?
> Maybe __ptr_ring_consumed_batch ? and a comment explaining
> this returns true when last successful call to consume
> freed up a batch of space in the ring for producer to make
> progress.
>
Yes, I mean ==.
Having a dedicated helper for this purpose makes sense. I just find
the name __ptr_ring_consumed_batch a bit confusing next to
__ptr_ring_consume_batched, since they both refer to different kinds of
batches.
>
> consumer_head == consumer_tail also happens rather a lot,
> though thankfully not on every entry.
> So taking tx lock each time this happens, even if queue
> is not stopped, seems heavyweight.
>
>
Yes, I agree, but avoiding locking probably requires some fancy
ordering tricks again...
>
>
>
>>>> The order of the operations
>>>> + * is ensured by barrier().
>>>> + */
>>>> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
>>>> + if (unlikely(will_invalidate)) {
>>>> + rcu_read_lock();
>>>> + dev = rcu_dereference(q->tap)->dev;
>>>> + txq = netdev_get_tx_queue(dev, q->queue_index);
>>>> + stopped = netif_tx_queue_stopped(txq);
>>>> + }
>>>> + barrier();
>>>> + __ptr_ring_discard_one(&q->ring, will_invalidate);
>>>> +
>>>> + if (unlikely(will_invalidate)) {
>>>> + if (stopped)
>>>> + netif_tx_wake_queue(txq);
>>>> + rcu_read_unlock();
>>>> + }
>>>
>>>
>>> After an entry is consumed, you can detect this by checking
>>>
>>> r->consumer_head >= r->consumer_tail
>>>
>>>
>>> so it seems you could keep calling regular ptr_ring_consume
>>> and check afterwards?
>>>
>>>
>>>
>>>
>>>> + spin_unlock(&q->ring.consumer_lock);
>>>> +
>>>> + return ptr;
>>>> +}
>>>> +
>>>> static ssize_t tap_do_read(struct tap_queue *q,
>>>> struct iov_iter *to,
>>>> int noblock, struct sk_buff *skb)
>>>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
>>>> TASK_INTERRUPTIBLE);
>>>>
>>>> /* Read frames from the queue */
>>>> - skb = ptr_ring_consume(&q->ring);
>>>> + skb = tap_ring_consume(q);
>>>> if (skb)
>>>> break;
>>>> if (noblock) {
>>>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
>>>> ret = ptr_ring_resize_multiple_bh(rings, n,
>>>> dev->tx_queue_len, GFP_KERNEL,
>>>> __skb_array_destroy_skb);
>>>> + if (netif_running(dev))
>>>> + netif_tx_wake_all_queues(dev);
>>>>
>>>> kfree(rings);
>>>> return ret;
>>>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>>>> index c6b22af9bae8..682df8157b55 100644
>>>> --- a/drivers/net/tun.c
>>>> +++ b/drivers/net/tun.c
>>>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>>>> return total;
>>>> }
>>>>
>>>> +static void *tun_ring_consume(struct tun_file *tfile)
>>>> +{
>>>> + struct netdev_queue *txq;
>>>> + struct net_device *dev;
>>>> + bool will_invalidate;
>>>> + bool stopped;
>>>> + void *ptr;
>>>> +
>>>> + spin_lock(&tfile->tx_ring.consumer_lock);
>>>> + ptr = __ptr_ring_peek(&tfile->tx_ring);
>>>> + if (!ptr) {
>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>>>> + return ptr;
>>>> + }
>>>> +
>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
>>>> + * produced in the meantime, because this could result in waking
>>>> + * even though the ptr_ring is full. The order of the operations
>>>> + * is ensured by barrier().
>>>> + */
>>>> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
>>>> + if (unlikely(will_invalidate)) {
>>>> + rcu_read_lock();
>>>> + dev = rcu_dereference(tfile->tun)->dev;
>>>> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
>>>> + stopped = netif_tx_queue_stopped(txq);
>>>> + }
>>>> + barrier();
>>>> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
>>>> +
>>>> + if (unlikely(will_invalidate)) {
>>>> + if (stopped)
>>>> + netif_tx_wake_queue(txq);
>>>> + rcu_read_unlock();
>>>> + }
>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>>>> +
>>>> + return ptr;
>>>> +}
>>>> +
>>>> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>>> {
>>>> DECLARE_WAITQUEUE(wait, current);
>>>> void *ptr = NULL;
>>>> int error = 0;
>>>>
>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>>>> + ptr = tun_ring_consume(tfile);
>>>> if (ptr)
>>>> goto out;
>>>> if (noblock) {
>>>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>>>
>>>> while (1) {
>>>> set_current_state(TASK_INTERRUPTIBLE);
>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>>>> + ptr = tun_ring_consume(tfile);
>>>> if (ptr)
>>>> break;
>>>> if (signal_pending(current)) {
>>>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
>>>> dev->tx_queue_len, GFP_KERNEL,
>>>> tun_ptr_free);
>>>>
>>>> + if (netif_running(dev))
>>>> + netif_tx_wake_all_queues(dev);
>>>> +
>>>> kfree(rings);
>>>> return ret;
>>>> }
>>>> --
>>>> 2.43.0
>>>
>
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-24 7:42 ` Simon Schippers
@ 2025-09-24 7:49 ` Michael S. Tsirkin
2025-09-24 8:40 ` Simon Schippers
0 siblings, 1 reply; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 7:49 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Wed, Sep 24, 2025 at 09:42:45AM +0200, Simon Schippers wrote:
> On 24.09.25 08:55, Michael S. Tsirkin wrote:
> > On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
> >> On 23.09.25 18:36, Michael S. Tsirkin wrote:
> >>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
> >>>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
> >>>> entry of the ptr_ring and then waking the netdev queue when entries got
> >>>> invalidated to be used again by the producer.
> >>>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
> >>>> if the netdev queue is stopped before invalidating entries. Like that the
> >>>> netdev queue can be safely woken after invalidating entries.
> >>>>
> >>>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
> >>>> __ptr_ring_produce within tun_net_xmit guarantees that the information
> >>>> about the netdev queue being stopped is visible after __ptr_ring_peek is
> >>>> called.
> >>>>
> >>>> The netdev queue is also woken after resizing the ptr_ring.
> >>>>
> >>>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >>>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >>>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> >>>> ---
> >>>> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
> >>>> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
> >>>> 2 files changed, 88 insertions(+), 3 deletions(-)
> >>>>
> >>>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
> >>>> index 1197f245e873..f8292721a9d6 100644
> >>>> --- a/drivers/net/tap.c
> >>>> +++ b/drivers/net/tap.c
> >>>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
> >>>> return ret ? ret : total;
> >>>> }
> >>>>
> >>>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
> >>>> +{
> >>>> + struct netdev_queue *txq;
> >>>> + struct net_device *dev;
> >>>> + bool will_invalidate;
> >>>> + bool stopped;
> >>>> + void *ptr;
> >>>> +
> >>>> + spin_lock(&q->ring.consumer_lock);
> >>>> + ptr = __ptr_ring_peek(&q->ring);
> >>>> + if (!ptr) {
> >>>> + spin_unlock(&q->ring.consumer_lock);
> >>>> + return ptr;
> >>>> + }
> >>>> +
> >>>> + /* Check if the queue stopped before zeroing out, so no ptr get
> >>>> + * produced in the meantime, because this could result in waking
> >>>> + * even though the ptr_ring is full.
> >>>
> >>> So what? Maybe it would be a bit suboptimal? But with your design, I do
> >>> not get what prevents this:
> >>>
> >>>
> >>> stopped? -> No
> >>> ring is stopped
> >>> discard
> >>>
> >>> and queue stays stopped forever
> >>>
> >>>
> >>
> >> I totally missed this (but I am not sure why it did not happen in my
> >> testing with different ptr_ring sizes..).
> >>
> >> I guess you are right, there must be some type of locking.
> >> It probably makes sense to lock the netdev txq->_xmit_lock whenever the
> >> consumer invalidates old ptr_ring entries (so when r->consumer_head >=
> >> r->consumer_tail). The producer holds this lock with dev->lltx=false. Then
> >> the consumer is able to wake the queue safely.
> >>
> >> So I would now just change the implementation to:
> >> tun_net_xmit:
> >> ...
> >> if ptr_ring_produce
> >> // Could happen because of unproduce in vhost_net..
> >> netif_tx_stop_queue
> >> ...
> >> goto drop
> >>
> >> if ptr_ring_full
> >> netif_tx_stop_queue
> >> ...
> >>
> >> tun_ring_recv/tap_do_read (the implementation for the batched methods
> >> would be done in the similar way):
> >> ...
> >> ptr_ring_consume
> >> if r->consumer_head >= r->consumer_tail
> >> __netif_tx_lock_bh
> >> netif_tx_wake_queue
> >> __netif_tx_unlock_bh
> >>
> >> This implementation does not need any new ptr_ring helpers and no fancy
> >> ordering tricks.
> >> Would this implementation be sufficient in your opinion?
> >
> >
> > Maybe you mean == ? Pls don't poke at ptr ring internals though.
> > What are we testing for here?
> > I think the point is that a batch of entries was consumed?
> > Maybe __ptr_ring_consumed_batch ? and a comment explaining
> > this returns true when last successful call to consume
> > freed up a batch of space in the ring for producer to make
> > progress.
> >
>
> Yes, I mean ==.
>
> Having a dedicated helper for this purpose makes sense. I just find
> the name __ptr_ring_consumed_batch a bit confusing next to
> __ptr_ring_consume_batched, since they both refer to different kinds of
> batches.
__ptr_ring_consume_created_space ?
/* Previous call to ptr_ring_consume created some space.
*
* NB: only refers to the last call to __ptr_ring_consume,
* if you are calling ptr_ring_consume multiple times, you
* have to check this multiple times.
* Accordingly, do not use this after __ptr_ring_consume_batched.
*/
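A minimal sketch of such a helper, assuming it keys off the
consumer_head == consumer_tail condition discussed above (illustrative
only, name and placement still open):

/* Caller must hold the consumer lock. */
static inline bool __ptr_ring_consume_created_space(struct ptr_ring *r)
{
	/* __ptr_ring_discard_one() only zeroes out entries in batches;
	 * when it does, it advances consumer_tail to match consumer_head.
	 * Equality here therefore means the last consume freed up a batch
	 * of slots that the producer can reuse.
	 */
	return r->consumer_head == r->consumer_tail;
}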
> >
> > consumer_head == consumer_tail also happens rather a lot,
> > though thankfully not on every entry.
> > So taking tx lock each time this happens, even if queue
> > is not stopped, seems heavyweight.
> >
> >
>
> Yes, I agree — but avoiding locking probably requires some fancy
> ordering tricks again..
>
>
> >
> >
> >
> >>>> The order of the operations
> >>>> + * is ensured by barrier().
> >>>> + */
> >>>> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
> >>>> + if (unlikely(will_invalidate)) {
> >>>> + rcu_read_lock();
> >>>> + dev = rcu_dereference(q->tap)->dev;
> >>>> + txq = netdev_get_tx_queue(dev, q->queue_index);
> >>>> + stopped = netif_tx_queue_stopped(txq);
> >>>> + }
> >>>> + barrier();
> >>>> + __ptr_ring_discard_one(&q->ring, will_invalidate);
> >>>> +
> >>>> + if (unlikely(will_invalidate)) {
> >>>> + if (stopped)
> >>>> + netif_tx_wake_queue(txq);
> >>>> + rcu_read_unlock();
> >>>> + }
> >>>
> >>>
> >>> After an entry is consumed, you can detect this by checking
> >>>
> >>> r->consumer_head >= r->consumer_tail
> >>>
> >>>
> >>> so it seems you could keep calling regular ptr_ring_consume
> >>> and check afterwards?
> >>>
> >>>
> >>>
> >>>
> >>>> + spin_unlock(&q->ring.consumer_lock);
> >>>> +
> >>>> + return ptr;
> >>>> +}
> >>>> +
> >>>> static ssize_t tap_do_read(struct tap_queue *q,
> >>>> struct iov_iter *to,
> >>>> int noblock, struct sk_buff *skb)
> >>>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
> >>>> TASK_INTERRUPTIBLE);
> >>>>
> >>>> /* Read frames from the queue */
> >>>> - skb = ptr_ring_consume(&q->ring);
> >>>> + skb = tap_ring_consume(q);
> >>>> if (skb)
> >>>> break;
> >>>> if (noblock) {
> >>>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
> >>>> ret = ptr_ring_resize_multiple_bh(rings, n,
> >>>> dev->tx_queue_len, GFP_KERNEL,
> >>>> __skb_array_destroy_skb);
> >>>> + if (netif_running(dev))
> >>>> + netif_tx_wake_all_queues(dev);
> >>>>
> >>>> kfree(rings);
> >>>> return ret;
> >>>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> >>>> index c6b22af9bae8..682df8157b55 100644
> >>>> --- a/drivers/net/tun.c
> >>>> +++ b/drivers/net/tun.c
> >>>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
> >>>> return total;
> >>>> }
> >>>>
> >>>> +static void *tun_ring_consume(struct tun_file *tfile)
> >>>> +{
> >>>> + struct netdev_queue *txq;
> >>>> + struct net_device *dev;
> >>>> + bool will_invalidate;
> >>>> + bool stopped;
> >>>> + void *ptr;
> >>>> +
> >>>> + spin_lock(&tfile->tx_ring.consumer_lock);
> >>>> + ptr = __ptr_ring_peek(&tfile->tx_ring);
> >>>> + if (!ptr) {
> >>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
> >>>> + return ptr;
> >>>> + }
> >>>> +
> >>>> + /* Check if the queue stopped before zeroing out, so no ptr get
> >>>> + * produced in the meantime, because this could result in waking
> >>>> + * even though the ptr_ring is full. The order of the operations
> >>>> + * is ensured by barrier().
> >>>> + */
> >>>> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
> >>>> + if (unlikely(will_invalidate)) {
> >>>> + rcu_read_lock();
> >>>> + dev = rcu_dereference(tfile->tun)->dev;
> >>>> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
> >>>> + stopped = netif_tx_queue_stopped(txq);
> >>>> + }
> >>>> + barrier();
> >>>> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
> >>>> +
> >>>> + if (unlikely(will_invalidate)) {
> >>>> + if (stopped)
> >>>> + netif_tx_wake_queue(txq);
> >>>> + rcu_read_unlock();
> >>>> + }
> >>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
> >>>> +
> >>>> + return ptr;
> >>>> +}
> >>>> +
> >>>> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> >>>> {
> >>>> DECLARE_WAITQUEUE(wait, current);
> >>>> void *ptr = NULL;
> >>>> int error = 0;
> >>>>
> >>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
> >>>> + ptr = tun_ring_consume(tfile);
> >>>> if (ptr)
> >>>> goto out;
> >>>> if (noblock) {
> >>>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> >>>>
> >>>> while (1) {
> >>>> set_current_state(TASK_INTERRUPTIBLE);
> >>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
> >>>> + ptr = tun_ring_consume(tfile);
> >>>> if (ptr)
> >>>> break;
> >>>> if (signal_pending(current)) {
> >>>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
> >>>> dev->tx_queue_len, GFP_KERNEL,
> >>>> tun_ptr_free);
> >>>>
> >>>> + if (netif_running(dev))
> >>>> + netif_tx_wake_all_queues(dev);
> >>>> +
> >>>> kfree(rings);
> >>>> return ret;
> >>>> }
> >>>> --
> >>>> 2.43.0
> >>>
> >
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-24 7:49 ` Michael S. Tsirkin
@ 2025-09-24 8:40 ` Simon Schippers
2025-09-24 9:00 ` Michael S. Tsirkin
0 siblings, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-24 8:40 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On 24.09.25 09:49, Michael S. Tsirkin wrote:
> On Wed, Sep 24, 2025 at 09:42:45AM +0200, Simon Schippers wrote:
>> On 24.09.25 08:55, Michael S. Tsirkin wrote:
>>> On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
>>>> On 23.09.25 18:36, Michael S. Tsirkin wrote:
>>>>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
>>>>>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
>>>>>> entry of the ptr_ring and then waking the netdev queue when entries got
>>>>>> invalidated to be used again by the producer.
>>>>>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
>>>>>> if the netdev queue is stopped before invalidating entries. Like that the
>>>>>> netdev queue can be safely woken after invalidating entries.
>>>>>>
>>>>>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
>>>>>> __ptr_ring_produce within tun_net_xmit guarantees that the information
>>>>>> about the netdev queue being stopped is visible after __ptr_ring_peek is
>>>>>> called.
>>>>>>
>>>>>> The netdev queue is also woken after resizing the ptr_ring.
>>>>>>
>>>>>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>>>>>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>>>>>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>>>>>> ---
>>>>>> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>>>>>> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
>>>>>> 2 files changed, 88 insertions(+), 3 deletions(-)
>>>>>>
>>>>>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
>>>>>> index 1197f245e873..f8292721a9d6 100644
>>>>>> --- a/drivers/net/tap.c
>>>>>> +++ b/drivers/net/tap.c
>>>>>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
>>>>>> return ret ? ret : total;
>>>>>> }
>>>>>>
>>>>>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
>>>>>> +{
>>>>>> + struct netdev_queue *txq;
>>>>>> + struct net_device *dev;
>>>>>> + bool will_invalidate;
>>>>>> + bool stopped;
>>>>>> + void *ptr;
>>>>>> +
>>>>>> + spin_lock(&q->ring.consumer_lock);
>>>>>> + ptr = __ptr_ring_peek(&q->ring);
>>>>>> + if (!ptr) {
>>>>>> + spin_unlock(&q->ring.consumer_lock);
>>>>>> + return ptr;
>>>>>> + }
>>>>>> +
>>>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
>>>>>> + * produced in the meantime, because this could result in waking
>>>>>> + * even though the ptr_ring is full.
>>>>>
>>>>> So what? Maybe it would be a bit suboptimal? But with your design, I do
>>>>> not get what prevents this:
>>>>>
>>>>>
>>>>> stopped? -> No
>>>>> ring is stopped
>>>>> discard
>>>>>
>>>>> and queue stays stopped forever
>>>>>
>>>>>
>>>>
>>>> I totally missed this (but I am not sure why it did not happen in my
>>>> testing with different ptr_ring sizes..).
>>>>
>>>> I guess you are right, there must be some type of locking.
>>>> It probably makes sense to lock the netdev txq->_xmit_lock whenever the
>>>> consumer invalidates old ptr_ring entries (so when r->consumer_head >=
>>>> r->consumer_tail). The producer holds this lock with dev->lltx=false. Then
>>>> the consumer is able to wake the queue safely.
>>>>
>>>> So I would now just change the implementation to:
>>>> tun_net_xmit:
>>>> ...
>>>> if ptr_ring_produce
>>>> // Could happen because of unproduce in vhost_net..
>>>> netif_tx_stop_queue
>>>> ...
>>>> goto drop
>>>>
>>>> if ptr_ring_full
>>>> netif_tx_stop_queue
>>>> ...
>>>>
>>>> tun_ring_recv/tap_do_read (the implementation for the batched methods
>>>> would be done in the similar way):
>>>> ...
>>>> ptr_ring_consume
>>>> if r->consumer_head >= r->consumer_tail
>>>> __netif_tx_lock_bh
>>>> netif_tx_wake_queue
>>>> __netif_tx_unlock_bh
>>>>
>>>> This implementation does not need any new ptr_ring helpers and no fancy
>>>> ordering tricks.
>>>> Would this implementation be sufficient in your opinion?
>>>
>>>
>>> Maybe you mean == ? Pls don't poke at ptr ring internals though.
>>> What are we testing for here?
>>> I think the point is that a batch of entries was consumed?
>>> Maybe __ptr_ring_consumed_batch ? and a comment explaining
>>> this returns true when last successful call to consume
>>> freed up a batch of space in the ring for producer to make
>>> progress.
>>>
>>
>> Yes, I mean ==.
>>
>> Having a dedicated helper for this purpose makes sense. I just find
>> the name __ptr_ring_consumed_batch a bit confusing next to
>> __ptr_ring_consume_batched, since they both refer to different kinds of
>> batches.
>
> __ptr_ring_consume_created_space ?
>
> /* Previous call to ptr_ring_consume created some space.
> *
> * NB: only refers to the last call to __ptr_ring_consume,
> * if you are calling ptr_ring_consume multiple times, you
> * have to check this multiple times.
> * Accordingly, do not use this after __ptr_ring_consume_batched.
> */
>
Sounds good.
Regarding __ptr_ring_consume_batched:
Theoretically, consumer_tail could be compared before and after calling
the method to avoid checking __ptr_ring_consume_created_space on every
iteration. But I guess calling it on every iteration is also fine.
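A rough sketch of that comparison idea (the wrapper below is made up for
illustration, and the plain tail comparison is not wrap-around safe as
written):

static int tun_ring_consume_batched_sketch(struct ptr_ring *r, void **array,
					   int n, struct netdev_queue *txq)
{
	int tail_before, consumed;

	spin_lock(&r->consumer_lock);
	tail_before = r->consumer_tail;
	consumed = __ptr_ring_consume_batched(r, array, n);
	/* consumer_tail only advances when __ptr_ring_discard_one zeroed
	 * out a batch of entries, so a change here means the producer
	 * regained space in the ring.
	 */
	if (consumed && r->consumer_tail != tail_before) {
		__netif_tx_lock_bh(txq);
		netif_tx_wake_queue(txq);
		__netif_tx_unlock_bh(txq);
	}
	spin_unlock(&r->consumer_lock);
	return consumed;
}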
>>>
>>> consumer_head == consumer_tail also happens rather a lot,
>>> though thankfully not on every entry.
>>> So taking tx lock each time this happens, even if queue
>>> is not stopped, seems heavyweight.
>>>
>>>
>>
>> Yes, I agree — but avoiding locking probably requires some fancy
>> ordering tricks again..
>>
>>
>>>
>>>
>>>
>>>>>> The order of the operations
>>>>>> + * is ensured by barrier().
>>>>>> + */
>>>>>> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
>>>>>> + if (unlikely(will_invalidate)) {
>>>>>> + rcu_read_lock();
>>>>>> + dev = rcu_dereference(q->tap)->dev;
>>>>>> + txq = netdev_get_tx_queue(dev, q->queue_index);
>>>>>> + stopped = netif_tx_queue_stopped(txq);
>>>>>> + }
>>>>>> + barrier();
>>>>>> + __ptr_ring_discard_one(&q->ring, will_invalidate);
>>>>>> +
>>>>>> + if (unlikely(will_invalidate)) {
>>>>>> + if (stopped)
>>>>>> + netif_tx_wake_queue(txq);
>>>>>> + rcu_read_unlock();
>>>>>> + }
>>>>>
>>>>>
>>>>> After an entry is consumed, you can detect this by checking
>>>>>
>>>>> r->consumer_head >= r->consumer_tail
>>>>>
>>>>>
>>>>> so it seems you could keep calling regular ptr_ring_consume
>>>>> and check afterwards?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>> + spin_unlock(&q->ring.consumer_lock);
>>>>>> +
>>>>>> + return ptr;
>>>>>> +}
>>>>>> +
>>>>>> static ssize_t tap_do_read(struct tap_queue *q,
>>>>>> struct iov_iter *to,
>>>>>> int noblock, struct sk_buff *skb)
>>>>>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
>>>>>> TASK_INTERRUPTIBLE);
>>>>>>
>>>>>> /* Read frames from the queue */
>>>>>> - skb = ptr_ring_consume(&q->ring);
>>>>>> + skb = tap_ring_consume(q);
>>>>>> if (skb)
>>>>>> break;
>>>>>> if (noblock) {
>>>>>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
>>>>>> ret = ptr_ring_resize_multiple_bh(rings, n,
>>>>>> dev->tx_queue_len, GFP_KERNEL,
>>>>>> __skb_array_destroy_skb);
>>>>>> + if (netif_running(dev))
>>>>>> + netif_tx_wake_all_queues(dev);
>>>>>>
>>>>>> kfree(rings);
>>>>>> return ret;
>>>>>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>>>>>> index c6b22af9bae8..682df8157b55 100644
>>>>>> --- a/drivers/net/tun.c
>>>>>> +++ b/drivers/net/tun.c
>>>>>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>>>>>> return total;
>>>>>> }
>>>>>>
>>>>>> +static void *tun_ring_consume(struct tun_file *tfile)
>>>>>> +{
>>>>>> + struct netdev_queue *txq;
>>>>>> + struct net_device *dev;
>>>>>> + bool will_invalidate;
>>>>>> + bool stopped;
>>>>>> + void *ptr;
>>>>>> +
>>>>>> + spin_lock(&tfile->tx_ring.consumer_lock);
>>>>>> + ptr = __ptr_ring_peek(&tfile->tx_ring);
>>>>>> + if (!ptr) {
>>>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>>>>>> + return ptr;
>>>>>> + }
>>>>>> +
>>>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
>>>>>> + * produced in the meantime, because this could result in waking
>>>>>> + * even though the ptr_ring is full. The order of the operations
>>>>>> + * is ensured by barrier().
>>>>>> + */
>>>>>> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
>>>>>> + if (unlikely(will_invalidate)) {
>>>>>> + rcu_read_lock();
>>>>>> + dev = rcu_dereference(tfile->tun)->dev;
>>>>>> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
>>>>>> + stopped = netif_tx_queue_stopped(txq);
>>>>>> + }
>>>>>> + barrier();
>>>>>> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
>>>>>> +
>>>>>> + if (unlikely(will_invalidate)) {
>>>>>> + if (stopped)
>>>>>> + netif_tx_wake_queue(txq);
>>>>>> + rcu_read_unlock();
>>>>>> + }
>>>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>>>>>> +
>>>>>> + return ptr;
>>>>>> +}
>>>>>> +
>>>>>> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>>>>> {
>>>>>> DECLARE_WAITQUEUE(wait, current);
>>>>>> void *ptr = NULL;
>>>>>> int error = 0;
>>>>>>
>>>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>>>>>> + ptr = tun_ring_consume(tfile);
>>>>>> if (ptr)
>>>>>> goto out;
>>>>>> if (noblock) {
>>>>>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>>>>>
>>>>>> while (1) {
>>>>>> set_current_state(TASK_INTERRUPTIBLE);
>>>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>>>>>> + ptr = tun_ring_consume(tfile);
>>>>>> if (ptr)
>>>>>> break;
>>>>>> if (signal_pending(current)) {
>>>>>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
>>>>>> dev->tx_queue_len, GFP_KERNEL,
>>>>>> tun_ptr_free);
>>>>>>
>>>>>> + if (netif_running(dev))
>>>>>> + netif_tx_wake_all_queues(dev);
>>>>>> +
>>>>>> kfree(rings);
>>>>>> return ret;
>>>>>> }
>>>>>> --
>>>>>> 2.43.0
>>>>>
>>>
>
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-24 8:40 ` Simon Schippers
@ 2025-09-24 9:00 ` Michael S. Tsirkin
0 siblings, 0 replies; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 9:00 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Wed, Sep 24, 2025 at 10:40:04AM +0200, Simon Schippers wrote:
> On 24.09.25 09:49, Michael S. Tsirkin wrote:
> > On Wed, Sep 24, 2025 at 09:42:45AM +0200, Simon Schippers wrote:
> >> On 24.09.25 08:55, Michael S. Tsirkin wrote:
> >>> On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
> >>>> On 23.09.25 18:36, Michael S. Tsirkin wrote:
> >>>>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
> >>>>>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
> >>>>>> entry of the ptr_ring and then waking the netdev queue when entries got
> >>>>>> invalidated to be used again by the producer.
> >>>>>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
> >>>>>> if the netdev queue is stopped before invalidating entries. Like that the
> >>>>>> netdev queue can be safely woken after invalidating entries.
> >>>>>>
> >>>>>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
> >>>>>> __ptr_ring_produce within tun_net_xmit guarantees that the information
> >>>>>> about the netdev queue being stopped is visible after __ptr_ring_peek is
> >>>>>> called.
> >>>>>>
> >>>>>> The netdev queue is also woken after resizing the ptr_ring.
> >>>>>>
> >>>>>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >>>>>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >>>>>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> >>>>>> ---
> >>>>>> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
> >>>>>> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
> >>>>>> 2 files changed, 88 insertions(+), 3 deletions(-)
> >>>>>>
> >>>>>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
> >>>>>> index 1197f245e873..f8292721a9d6 100644
> >>>>>> --- a/drivers/net/tap.c
> >>>>>> +++ b/drivers/net/tap.c
> >>>>>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
> >>>>>> return ret ? ret : total;
> >>>>>> }
> >>>>>>
> >>>>>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
> >>>>>> +{
> >>>>>> + struct netdev_queue *txq;
> >>>>>> + struct net_device *dev;
> >>>>>> + bool will_invalidate;
> >>>>>> + bool stopped;
> >>>>>> + void *ptr;
> >>>>>> +
> >>>>>> + spin_lock(&q->ring.consumer_lock);
> >>>>>> + ptr = __ptr_ring_peek(&q->ring);
> >>>>>> + if (!ptr) {
> >>>>>> + spin_unlock(&q->ring.consumer_lock);
> >>>>>> + return ptr;
> >>>>>> + }
> >>>>>> +
> >>>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
> >>>>>> + * produced in the meantime, because this could result in waking
> >>>>>> + * even though the ptr_ring is full.
> >>>>>
> >>>>> So what? Maybe it would be a bit suboptimal? But with your design, I do
> >>>>> not get what prevents this:
> >>>>>
> >>>>>
> >>>>> stopped? -> No
> >>>>> ring is stopped
> >>>>> discard
> >>>>>
> >>>>> and queue stays stopped forever
> >>>>>
> >>>>>
> >>>>
> >>>> I totally missed this (but I am not sure why it did not happen in my
> >>>> testing with different ptr_ring sizes..).
> >>>>
> >>>> I guess you are right, there must be some type of locking.
> >>>> It probably makes sense to lock the netdev txq->_xmit_lock whenever the
> >>>> consumer invalidates old ptr_ring entries (so when r->consumer_head >=
> >>>> r->consumer_tail). The producer holds this lock with dev->lltx=false. Then
> >>>> the consumer is able to wake the queue safely.
> >>>>
> >>>> So I would now just change the implementation to:
> >>>> tun_net_xmit:
> >>>> ...
> >>>> if ptr_ring_produce
> >>>> // Could happen because of unproduce in vhost_net..
> >>>> netif_tx_stop_queue
> >>>> ...
> >>>> goto drop
> >>>>
> >>>> if ptr_ring_full
> >>>> netif_tx_stop_queue
> >>>> ...
> >>>>
> >>>> tun_ring_recv/tap_do_read (the implementation for the batched methods
> >>>> would be done in the similar way):
> >>>> ...
> >>>> ptr_ring_consume
> >>>> if r->consumer_head >= r->consumer_tail
> >>>> __netif_tx_lock_bh
> >>>> netif_tx_wake_queue
> >>>> __netif_tx_unlock_bh
> >>>>
> >>>> This implementation does not need any new ptr_ring helpers and no fancy
> >>>> ordering tricks.
> >>>> Would this implementation be sufficient in your opinion?
> >>>
> >>>
> >>> Maybe you mean == ? Pls don't poke at ptr ring internals though.
> >>> What are we testing for here?
> >>> I think the point is that a batch of entries was consumed?
> >>> Maybe __ptr_ring_consumed_batch ? and a comment explaining
> >>> this returns true when last successful call to consume
> >>> freed up a batch of space in the ring for producer to make
> >>> progress.
> >>>
> >>
> >> Yes, I mean ==.
> >>
> >> Having a dedicated helper for this purpose makes sense. I just find
> >> the name __ptr_ring_consumed_batch a bit confusing next to
> >> __ptr_ring_consume_batched, since they both refer to different kinds of
> >> batches.
> >
> > __ptr_ring_consume_created_space ?
> >
> > /* Previous call to ptr_ring_consume created some space.
> > *
> > * NB: only refers to the last call to __ptr_ring_consume,
> > * if you are calling ptr_ring_consume multiple times, you
> > * have to check this multiple times.
> > * Accordingly, do not use this after __ptr_ring_consume_batched.
> > */
> >
>
> Sounds good.
>
> Regarding __ptr_ring_consume_batched:
> Theoretically the consumer_tail before and after calling the method could
> be compared to avoid calling __ptr_ring_consume_created_space at each
> iteration. But I guess it is also fine calling it at each iteration.
Hmm good point, though I worry about wrap-around a bit.
> >>>
> >>> consumer_head == consumer_tail also happens rather a lot,
> >>> though thankfully not on every entry.
> >>> So taking tx lock each time this happens, even if queue
> >>> is not stopped, seems heavyweight.
> >>>
> >>>
> >>
> >> Yes, I agree — but avoiding locking probably requires some fancy
> >> ordering tricks again..
> >>
> >>
> >>>
> >>>
> >>>
> >>>>>> The order of the operations
> >>>>>> + * is ensured by barrier().
> >>>>>> + */
> >>>>>> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
> >>>>>> + if (unlikely(will_invalidate)) {
> >>>>>> + rcu_read_lock();
> >>>>>> + dev = rcu_dereference(q->tap)->dev;
> >>>>>> + txq = netdev_get_tx_queue(dev, q->queue_index);
> >>>>>> + stopped = netif_tx_queue_stopped(txq);
> >>>>>> + }
> >>>>>> + barrier();
> >>>>>> + __ptr_ring_discard_one(&q->ring, will_invalidate);
> >>>>>> +
> >>>>>> + if (unlikely(will_invalidate)) {
> >>>>>> + if (stopped)
> >>>>>> + netif_tx_wake_queue(txq);
> >>>>>> + rcu_read_unlock();
> >>>>>> + }
> >>>>>
> >>>>>
> >>>>> After an entry is consumed, you can detect this by checking
> >>>>>
> >>>>> r->consumer_head >= r->consumer_tail
> >>>>>
> >>>>>
> >>>>> so it seems you could keep calling regular ptr_ring_consume
> >>>>> and check afterwards?
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>> + spin_unlock(&q->ring.consumer_lock);
> >>>>>> +
> >>>>>> + return ptr;
> >>>>>> +}
> >>>>>> +
> >>>>>> static ssize_t tap_do_read(struct tap_queue *q,
> >>>>>> struct iov_iter *to,
> >>>>>> int noblock, struct sk_buff *skb)
> >>>>>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
> >>>>>> TASK_INTERRUPTIBLE);
> >>>>>>
> >>>>>> /* Read frames from the queue */
> >>>>>> - skb = ptr_ring_consume(&q->ring);
> >>>>>> + skb = tap_ring_consume(q);
> >>>>>> if (skb)
> >>>>>> break;
> >>>>>> if (noblock) {
> >>>>>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
> >>>>>> ret = ptr_ring_resize_multiple_bh(rings, n,
> >>>>>> dev->tx_queue_len, GFP_KERNEL,
> >>>>>> __skb_array_destroy_skb);
> >>>>>> + if (netif_running(dev))
> >>>>>> + netif_tx_wake_all_queues(dev);
> >>>>>>
> >>>>>> kfree(rings);
> >>>>>> return ret;
> >>>>>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> >>>>>> index c6b22af9bae8..682df8157b55 100644
> >>>>>> --- a/drivers/net/tun.c
> >>>>>> +++ b/drivers/net/tun.c
> >>>>>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
> >>>>>> return total;
> >>>>>> }
> >>>>>>
> >>>>>> +static void *tun_ring_consume(struct tun_file *tfile)
> >>>>>> +{
> >>>>>> + struct netdev_queue *txq;
> >>>>>> + struct net_device *dev;
> >>>>>> + bool will_invalidate;
> >>>>>> + bool stopped;
> >>>>>> + void *ptr;
> >>>>>> +
> >>>>>> + spin_lock(&tfile->tx_ring.consumer_lock);
> >>>>>> + ptr = __ptr_ring_peek(&tfile->tx_ring);
> >>>>>> + if (!ptr) {
> >>>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
> >>>>>> + return ptr;
> >>>>>> + }
> >>>>>> +
> >>>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
> >>>>>> + * produced in the meantime, because this could result in waking
> >>>>>> + * even though the ptr_ring is full. The order of the operations
> >>>>>> + * is ensured by barrier().
> >>>>>> + */
> >>>>>> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
> >>>>>> + if (unlikely(will_invalidate)) {
> >>>>>> + rcu_read_lock();
> >>>>>> + dev = rcu_dereference(tfile->tun)->dev;
> >>>>>> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
> >>>>>> + stopped = netif_tx_queue_stopped(txq);
> >>>>>> + }
> >>>>>> + barrier();
> >>>>>> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
> >>>>>> +
> >>>>>> + if (unlikely(will_invalidate)) {
> >>>>>> + if (stopped)
> >>>>>> + netif_tx_wake_queue(txq);
> >>>>>> + rcu_read_unlock();
> >>>>>> + }
> >>>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
> >>>>>> +
> >>>>>> + return ptr;
> >>>>>> +}
> >>>>>> +
> >>>>>> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> >>>>>> {
> >>>>>> DECLARE_WAITQUEUE(wait, current);
> >>>>>> void *ptr = NULL;
> >>>>>> int error = 0;
> >>>>>>
> >>>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
> >>>>>> + ptr = tun_ring_consume(tfile);
> >>>>>> if (ptr)
> >>>>>> goto out;
> >>>>>> if (noblock) {
> >>>>>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> >>>>>>
> >>>>>> while (1) {
> >>>>>> set_current_state(TASK_INTERRUPTIBLE);
> >>>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
> >>>>>> + ptr = tun_ring_consume(tfile);
> >>>>>> if (ptr)
> >>>>>> break;
> >>>>>> if (signal_pending(current)) {
> >>>>>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
> >>>>>> dev->tx_queue_len, GFP_KERNEL,
> >>>>>> tun_ptr_free);
> >>>>>>
> >>>>>> + if (netif_running(dev))
> >>>>>> + netif_tx_wake_all_queues(dev);
> >>>>>> +
> >>>>>> kfree(rings);
> >>>>>> return ret;
> >>>>>> }
> >>>>>> --
> >>>>>> 2.43.0
> >>>>>
> >>>
> >
^ permalink raw reply [flat|nested] 37+ messages in thread
* [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-23 16:36 ` Michael S. Tsirkin
2025-09-24 5:56 ` Simon Schippers
@ 2025-09-28 21:27 ` Simon Schippers
2025-09-28 22:33 ` Michael S. Tsirkin
1 sibling, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-28 21:27 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On 23.09.25 18:36, Michael S. Tsirkin wrote:
> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
>> entry of the ptr_ring and then waking the netdev queue when entries got
>> invalidated to be used again by the producer.
>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
>> if the netdev queue is stopped before invalidating entries. Like that the
>> netdev queue can be safely woken after invalidating entries.
>>
>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
>> __ptr_ring_produce within tun_net_xmit guarantees that the information
>> about the netdev queue being stopped is visible after __ptr_ring_peek is
>> called.
>>
>> The netdev queue is also woken after resizing the ptr_ring.
>>
>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>> ---
>> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
>> 2 files changed, 88 insertions(+), 3 deletions(-)
>>
>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
>> index 1197f245e873..f8292721a9d6 100644
>> --- a/drivers/net/tap.c
>> +++ b/drivers/net/tap.c
>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
>> return ret ? ret : total;
>> }
>>
>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
>> +{
>> + struct netdev_queue *txq;
>> + struct net_device *dev;
>> + bool will_invalidate;
>> + bool stopped;
>> + void *ptr;
>> +
>> + spin_lock(&q->ring.consumer_lock);
>> + ptr = __ptr_ring_peek(&q->ring);
>> + if (!ptr) {
>> + spin_unlock(&q->ring.consumer_lock);
>> + return ptr;
>> + }
>> +
>> + /* Check if the queue stopped before zeroing out, so no ptr get
>> + * produced in the meantime, because this could result in waking
>> + * even though the ptr_ring is full.
>
> So what? Maybe it would be a bit suboptimal? But with your design, I do
> not get what prevents this:
>
>
> stopped? -> No
> ring is stopped
> discard
>
> and queue stays stopped forever
>
I think I found a solution to this problem, see below:
>
>> The order of the operations
>> + * is ensured by barrier().
>> + */
>> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
>> + if (unlikely(will_invalidate)) {
>> + rcu_read_lock();
>> + dev = rcu_dereference(q->tap)->dev;
>> + txq = netdev_get_tx_queue(dev, q->queue_index);
>> + stopped = netif_tx_queue_stopped(txq);
>> + }
>> + barrier();
>> + __ptr_ring_discard_one(&q->ring, will_invalidate);
>> +
>> + if (unlikely(will_invalidate)) {
Here I just check for
if (will_invalidate || __ptr_ring_empty(&q->ring)) {
instead, because if the ptr_ring is empty while the netdev queue is
stopped, the race must have occurred. In that case it is safe to wake the
netdev queue, since it is known that space in the ptr_ring was freed when
the race occurred. It is also guaranteed that tap_ring_consume is called
at least once after the race, because the producer puts a new entry into
the ring at the moment the race occurs.
My adjusted implementation tests fine with pktgen, without any lost
packets.
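To make that concrete, the tail of the consume path could look roughly
like the sketch below, replacing the corresponding part of the quoted
tap_ring_consume above (the unconditional txq lookup and the placement of
the stopped check are assumptions here, not necessarily my final code):

	/* Sample the stopped state before zeroing out, as before, but do
	 * it unconditionally so the empty case below can also wake.
	 */
	rcu_read_lock();
	dev = rcu_dereference(q->tap)->dev;
	txq = netdev_get_tx_queue(dev, q->queue_index);
	stopped = netif_tx_queue_stopped(txq);
	will_invalidate = __ptr_ring_will_invalidate(&q->ring);
	barrier();
	__ptr_ring_discard_one(&q->ring, will_invalidate);
	/* Waking is safe if entries were just invalidated, or if the ring
	 * is now empty while the queue is stopped: in the empty case the
	 * stop must have raced with an earlier stopped check, and space
	 * was already freed back then.
	 */
	if (unlikely(will_invalidate || __ptr_ring_empty(&q->ring))) {
		if (stopped)
			netif_tx_wake_queue(txq);
	}
	rcu_read_unlock();
	spin_unlock(&q->ring.consumer_lock);
	return ptr;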
Generally, I now think the whole implementation can work without using
spinlocks at all. I am currently adjusting the implementation with regard
to SMP memory barrier pairings, and I have a question:
In v4 you mentioned "the stop -> wake bounce involves enough barriers
already". Does that mean, for instance, that netif_tx_wake_queue already
ensures memory ordering, so that I do not need an smp_wmb() in front of
netif_tx_wake_queue() and an smp_rmb() in front of the ptr_ring operations
in tun_net_xmit?
I dug through include/linux/netdevice.h and net/core/dev.c but could not
really answer this question by myself...
Thanks :)
>> + if (stopped)
>> + netif_tx_wake_queue(txq);
>> + rcu_read_unlock();
>> + }
>
>
> After an entry is consumed, you can detect this by checking
>
> r->consumer_head >= r->consumer_tail
>
>
> so it seems you could keep calling regular ptr_ring_consume
> and check afterwards?
>
>
>
>
>> + spin_unlock(&q->ring.consumer_lock);
>> +
>> + return ptr;
>> +}
>> +
>> static ssize_t tap_do_read(struct tap_queue *q,
>> struct iov_iter *to,
>> int noblock, struct sk_buff *skb)
>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
>> TASK_INTERRUPTIBLE);
>>
>> /* Read frames from the queue */
>> - skb = ptr_ring_consume(&q->ring);
>> + skb = tap_ring_consume(q);
>> if (skb)
>> break;
>> if (noblock) {
>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
>> ret = ptr_ring_resize_multiple_bh(rings, n,
>> dev->tx_queue_len, GFP_KERNEL,
>> __skb_array_destroy_skb);
>> + if (netif_running(dev))
>> + netif_tx_wake_all_queues(dev);
>>
>> kfree(rings);
>> return ret;
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index c6b22af9bae8..682df8157b55 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>> return total;
>> }
>>
>> +static void *tun_ring_consume(struct tun_file *tfile)
>> +{
>> + struct netdev_queue *txq;
>> + struct net_device *dev;
>> + bool will_invalidate;
>> + bool stopped;
>> + void *ptr;
>> +
>> + spin_lock(&tfile->tx_ring.consumer_lock);
>> + ptr = __ptr_ring_peek(&tfile->tx_ring);
>> + if (!ptr) {
>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>> + return ptr;
>> + }
>> +
>> + /* Check if the queue stopped before zeroing out, so no ptr get
>> + * produced in the meantime, because this could result in waking
>> + * even though the ptr_ring is full. The order of the operations
>> + * is ensured by barrier().
>> + */
>> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
>> + if (unlikely(will_invalidate)) {
>> + rcu_read_lock();
>> + dev = rcu_dereference(tfile->tun)->dev;
>> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
>> + stopped = netif_tx_queue_stopped(txq);
>> + }
>> + barrier();
>> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
>> +
>> + if (unlikely(will_invalidate)) {
>> + if (stopped)
>> + netif_tx_wake_queue(txq);
>> + rcu_read_unlock();
>> + }
>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>> +
>> + return ptr;
>> +}
>> +
>> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>> {
>> DECLARE_WAITQUEUE(wait, current);
>> void *ptr = NULL;
>> int error = 0;
>>
>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>> + ptr = tun_ring_consume(tfile);
>> if (ptr)
>> goto out;
>> if (noblock) {
>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>
>> while (1) {
>> set_current_state(TASK_INTERRUPTIBLE);
>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>> + ptr = tun_ring_consume(tfile);
>> if (ptr)
>> break;
>> if (signal_pending(current)) {
>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
>> dev->tx_queue_len, GFP_KERNEL,
>> tun_ptr_free);
>>
>> + if (netif_running(dev))
>> + netif_tx_wake_all_queues(dev);
>> +
>> kfree(rings);
>> return ret;
>> }
>> --
>> 2.43.0
>
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-28 21:27 ` Simon Schippers
@ 2025-09-28 22:33 ` Michael S. Tsirkin
2025-09-29 9:43 ` Simon Schippers
0 siblings, 1 reply; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-28 22:33 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Sun, Sep 28, 2025 at 11:27:25PM +0200, Simon Schippers wrote:
> On 23.09.25 18:36, Michael S. Tsirkin wrote:
> > On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
> >> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
> >> entry of the ptr_ring and then waking the netdev queue when entries got
> >> invalidated to be used again by the producer.
> >> To avoid waking the netdev queue when the ptr_ring is full, it is checked
> >> if the netdev queue is stopped before invalidating entries. Like that the
> >> netdev queue can be safely woken after invalidating entries.
> >>
> >> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
> >> __ptr_ring_produce within tun_net_xmit guarantees that the information
> >> about the netdev queue being stopped is visible after __ptr_ring_peek is
> >> called.
> >>
> >> The netdev queue is also woken after resizing the ptr_ring.
> >>
> >> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> >> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> >> ---
> >> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
> >> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
> >> 2 files changed, 88 insertions(+), 3 deletions(-)
> >>
> >> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
> >> index 1197f245e873..f8292721a9d6 100644
> >> --- a/drivers/net/tap.c
> >> +++ b/drivers/net/tap.c
> >> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
> >> return ret ? ret : total;
> >> }
> >>
> >> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
> >> +{
> >> + struct netdev_queue *txq;
> >> + struct net_device *dev;
> >> + bool will_invalidate;
> >> + bool stopped;
> >> + void *ptr;
> >> +
> >> + spin_lock(&q->ring.consumer_lock);
> >> + ptr = __ptr_ring_peek(&q->ring);
> >> + if (!ptr) {
> >> + spin_unlock(&q->ring.consumer_lock);
> >> + return ptr;
> >> + }
> >> +
> >> + /* Check if the queue stopped before zeroing out, so no ptr get
> >> + * produced in the meantime, because this could result in waking
> >> + * even though the ptr_ring is full.
> >
> > So what? Maybe it would be a bit suboptimal? But with your design, I do
> > not get what prevents this:
> >
> >
> > stopped? -> No
> > ring is stopped
> > discard
> >
> > and queue stays stopped forever
> >
>
> I think I found a solution to this problem, see below:
>
> >
> >> The order of the operations
> >> + * is ensured by barrier().
> >> + */
> >> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
> >> + if (unlikely(will_invalidate)) {
> >> + rcu_read_lock();
> >> + dev = rcu_dereference(q->tap)->dev;
> >> + txq = netdev_get_tx_queue(dev, q->queue_index);
> >> + stopped = netif_tx_queue_stopped(txq);
> >> + }
> >> + barrier();
> >> + __ptr_ring_discard_one(&q->ring, will_invalidate);
> >> +
> >> + if (unlikely(will_invalidate)) {
>
> Here I just check for
>
> if (will_invalidate || __ptr_ring_empty(&q->ring)) {
>
> instead because, if the ptr_ring is empty and the netdev queue stopped,
> the race must have occurred. Then it is safe to wake the netdev queue,
> because it is known that space in the ptr_ring was freed when the race
> occurred. Also, it is guaranteed that tap_ring_consume is called at least
> once after the race, because a new entry is generated by the producer at
> the race.
> In my adjusted implementation, it tests fine with pktgen without any lost
> packets.
what if it is not empty and ring is stopped?
>
> Generally now I think that the whole implementation can be fine without
> using spinlocks at all. I am currently adjusting the implementation
> regarding SMP memory barrier pairings, and I have a question:
> In the v4 you mentioned "the stop -> wake bounce involves enough barriers
> already". Does it, for instance, mean that netif_tx_wake_queue already
> ensures memory ordering, and I do not have to use an smp_wmb() in front of
> netif_tx_wake_queue() and smp_rmb() in front of the ptr_ring operations
> in tun_net_xmit?
> I dug through net/core/netdevice.h and dev.c but could not really
> answer this question by myself...
> Thanks :)
Only if it wakes up something, I think.
Read:
SLEEP AND WAKE-UP FUNCTIONS
in Documentation/memory-barriers.txt
IIUC this is the same.
>
> >> + if (stopped)
> >> + netif_tx_wake_queue(txq);
> >> + rcu_read_unlock();
> >> + }
> >
> >
> > After an entry is consumed, you can detect this by checking
> >
> > r->consumer_head >= r->consumer_tail
> >
> >
> > so it seems you could keep calling regular ptr_ring_consume
> > and check afterwards?
> >
> >
> >
> >
> >> + spin_unlock(&q->ring.consumer_lock);
> >> +
> >> + return ptr;
> >> +}
> >> +
> >> static ssize_t tap_do_read(struct tap_queue *q,
> >> struct iov_iter *to,
> >> int noblock, struct sk_buff *skb)
> >> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
> >> TASK_INTERRUPTIBLE);
> >>
> >> /* Read frames from the queue */
> >> - skb = ptr_ring_consume(&q->ring);
> >> + skb = tap_ring_consume(q);
> >> if (skb)
> >> break;
> >> if (noblock) {
> >> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
> >> ret = ptr_ring_resize_multiple_bh(rings, n,
> >> dev->tx_queue_len, GFP_KERNEL,
> >> __skb_array_destroy_skb);
> >> + if (netif_running(dev))
> >> + netif_tx_wake_all_queues(dev);
> >>
> >> kfree(rings);
> >> return ret;
> >> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> >> index c6b22af9bae8..682df8157b55 100644
> >> --- a/drivers/net/tun.c
> >> +++ b/drivers/net/tun.c
> >> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
> >> return total;
> >> }
> >>
> >> +static void *tun_ring_consume(struct tun_file *tfile)
> >> +{
> >> + struct netdev_queue *txq;
> >> + struct net_device *dev;
> >> + bool will_invalidate;
> >> + bool stopped;
> >> + void *ptr;
> >> +
> >> + spin_lock(&tfile->tx_ring.consumer_lock);
> >> + ptr = __ptr_ring_peek(&tfile->tx_ring);
> >> + if (!ptr) {
> >> + spin_unlock(&tfile->tx_ring.consumer_lock);
> >> + return ptr;
> >> + }
> >> +
> >> + /* Check if the queue stopped before zeroing out, so no ptr get
> >> + * produced in the meantime, because this could result in waking
> >> + * even though the ptr_ring is full. The order of the operations
> >> + * is ensured by barrier().
> >> + */
> >> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
> >> + if (unlikely(will_invalidate)) {
> >> + rcu_read_lock();
> >> + dev = rcu_dereference(tfile->tun)->dev;
> >> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
> >> + stopped = netif_tx_queue_stopped(txq);
> >> + }
> >> + barrier();
> >> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
> >> +
> >> + if (unlikely(will_invalidate)) {
> >> + if (stopped)
> >> + netif_tx_wake_queue(txq);
> >> + rcu_read_unlock();
> >> + }
> >> + spin_unlock(&tfile->tx_ring.consumer_lock);
> >> +
> >> + return ptr;
> >> +}
> >> +
> >> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> >> {
> >> DECLARE_WAITQUEUE(wait, current);
> >> void *ptr = NULL;
> >> int error = 0;
> >>
> >> - ptr = ptr_ring_consume(&tfile->tx_ring);
> >> + ptr = tun_ring_consume(tfile);
> >> if (ptr)
> >> goto out;
> >> if (noblock) {
> >> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
> >>
> >> while (1) {
> >> set_current_state(TASK_INTERRUPTIBLE);
> >> - ptr = ptr_ring_consume(&tfile->tx_ring);
> >> + ptr = tun_ring_consume(tfile);
> >> if (ptr)
> >> break;
> >> if (signal_pending(current)) {
> >> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
> >> dev->tx_queue_len, GFP_KERNEL,
> >> tun_ptr_free);
> >>
> >> + if (netif_running(dev))
> >> + netif_tx_wake_all_queues(dev);
> >> +
> >> kfree(rings);
> >> return ret;
> >> }
> >> --
> >> 2.43.0
> >
^ permalink raw reply [flat|nested] 37+ messages in thread
* [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-28 22:33 ` Michael S. Tsirkin
@ 2025-09-29 9:43 ` Simon Schippers
2025-10-11 9:15 ` Simon Schippers
0 siblings, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-29 9:43 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On 29.09.25 00:33, Michael S. Tsirkin wrote:
> On Sun, Sep 28, 2025 at 11:27:25PM +0200, Simon Schippers wrote:
>> On 23.09.25 18:36, Michael S. Tsirkin wrote:
>>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
>>>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
>>>> entry of the ptr_ring and then waking the netdev queue when entries got
>>>> invalidated to be used again by the producer.
>>>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
>>>> if the netdev queue is stopped before invalidating entries. Like that the
>>>> netdev queue can be safely woken after invalidating entries.
>>>>
>>>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
>>>> __ptr_ring_produce within tun_net_xmit guarantees that the information
>>>> about the netdev queue being stopped is visible after __ptr_ring_peek is
>>>> called.
>>>>
>>>> The netdev queue is also woken after resizing the ptr_ring.
>>>>
>>>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>>>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>>>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>>>> ---
>>>> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>>>> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
>>>> 2 files changed, 88 insertions(+), 3 deletions(-)
>>>>
>>>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
>>>> index 1197f245e873..f8292721a9d6 100644
>>>> --- a/drivers/net/tap.c
>>>> +++ b/drivers/net/tap.c
>>>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
>>>> return ret ? ret : total;
>>>> }
>>>>
>>>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
>>>> +{
>>>> + struct netdev_queue *txq;
>>>> + struct net_device *dev;
>>>> + bool will_invalidate;
>>>> + bool stopped;
>>>> + void *ptr;
>>>> +
>>>> + spin_lock(&q->ring.consumer_lock);
>>>> + ptr = __ptr_ring_peek(&q->ring);
>>>> + if (!ptr) {
>>>> + spin_unlock(&q->ring.consumer_lock);
>>>> + return ptr;
>>>> + }
>>>> +
>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
>>>> + * produced in the meantime, because this could result in waking
>>>> + * even though the ptr_ring is full.
>>>
>>> So what? Maybe it would be a bit suboptimal? But with your design, I do
>>> not get what prevents this:
>>>
>>>
>>> stopped? -> No
>>> ring is stopped
>>> discard
>>>
>>> and queue stays stopped forever
>>>
>>
>> I think I found a solution to this problem, see below:
>>
>>>
>>>> The order of the operations
>>>> + * is ensured by barrier().
>>>> + */
>>>> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
>>>> + if (unlikely(will_invalidate)) {
>>>> + rcu_read_lock();
>>>> + dev = rcu_dereference(q->tap)->dev;
>>>> + txq = netdev_get_tx_queue(dev, q->queue_index);
>>>> + stopped = netif_tx_queue_stopped(txq);
>>>> + }
>>>> + barrier();
>>>> + __ptr_ring_discard_one(&q->ring, will_invalidate);
>>>> +
>>>> + if (unlikely(will_invalidate)) {
>>
>> Here I just check for
>>
>> if (will_invalidate || __ptr_ring_empty(&q->ring)) {
>>
>> instead because, if the ptr_ring is empty and the netdev queue stopped,
>> the race must have occurred. Then it is safe to wake the netdev queue,
>> because it is known that space in the ptr_ring was freed when the race
>> occurred. Also, it is guaranteed that tap_ring_consume is called at least
>> once after the race, because a new entry is generated by the producer at
>> the race.
>> In my adjusted implementation, it tests fine with pktgen without any lost
>> packets.
>
>
> what if it is not empty and ring is stopped?
>
Then it cannot be assumed that there is free space in the ptr_ring,
because __ptr_ring_discard_one may only create space after consuming one
of the upcoming entries. Only once the ptr_ring is empty (which will
obviously happen after some time) is it guaranteed that there is free
space in the ptr_ring, either because the race occurred previously or
because __ptr_ring_discard_one freed entries right before.
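Spelled out against the tap consumer code quoted above, the adjusted wake
condition would look roughly like the sketch below (illustrative only,
not the literal adjusted code). Only the condition is the point here; the
txq/stopped bookkeeping from the quoted code would have to be extended so
it is also valid when will_invalidate is false:

	barrier();
	__ptr_ring_discard_one(&q->ring, will_invalidate);

	/* Wake when entries were invalidated, but also when the ring has
	 * just drained: an empty ring with a stopped netdev queue means
	 * the race described above occurred, so free space is guaranteed.
	 */
	if (will_invalidate || __ptr_ring_empty(&q->ring)) {
		if (stopped)
			netif_tx_wake_queue(txq);
	}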
>>
>> Generally now I think that the whole implementation can be fine without
>> using spinlocks at all. I am currently adjusting the implementation
>> regarding SMP memory barrier pairings, and I have a question:
>> In the v4 you mentioned "the stop -> wake bounce involves enough barriers
>> already". Does it, for instance, mean that netif_tx_wake_queue already
>> ensures memory ordering, and I do not have to use an smp_wmb() in front of
>> netif_tx_wake_queue() and smp_rmb() in front of the ptr_ring operations
>> in tun_net_xmit?
>> I dug through net/core/netdevice.h and dev.c but could not really
>> answer this question by myself...
>> Thanks :)
>
> Only if it wakes up something, I think.
>
> Read:
>
> SLEEP AND WAKE-UP FUNCTIONS
>
>
> in Documentation/memory-barriers.txt
>
>
> IIUC this is the same.
>
>
Thanks, I will look into it! :)
>>
>>>> + if (stopped)
>>>> + netif_tx_wake_queue(txq);
>>>> + rcu_read_unlock();
>>>> + }
>>>
>>>
>>> After an entry is consumed, you can detect this by checking
>>>
>>> r->consumer_head >= r->consumer_tail
>>>
>>>
>>> so it seems you could keep calling regular ptr_ring_consume
>>> and check afterwards?
>>>
>>>
>>>
>>>
>>>> + spin_unlock(&q->ring.consumer_lock);
>>>> +
>>>> + return ptr;
>>>> +}
>>>> +
>>>> static ssize_t tap_do_read(struct tap_queue *q,
>>>> struct iov_iter *to,
>>>> int noblock, struct sk_buff *skb)
>>>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
>>>> TASK_INTERRUPTIBLE);
>>>>
>>>> /* Read frames from the queue */
>>>> - skb = ptr_ring_consume(&q->ring);
>>>> + skb = tap_ring_consume(q);
>>>> if (skb)
>>>> break;
>>>> if (noblock) {
>>>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
>>>> ret = ptr_ring_resize_multiple_bh(rings, n,
>>>> dev->tx_queue_len, GFP_KERNEL,
>>>> __skb_array_destroy_skb);
>>>> + if (netif_running(dev))
>>>> + netif_tx_wake_all_queues(dev);
>>>>
>>>> kfree(rings);
>>>> return ret;
>>>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>>>> index c6b22af9bae8..682df8157b55 100644
>>>> --- a/drivers/net/tun.c
>>>> +++ b/drivers/net/tun.c
>>>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>>>> return total;
>>>> }
>>>>
>>>> +static void *tun_ring_consume(struct tun_file *tfile)
>>>> +{
>>>> + struct netdev_queue *txq;
>>>> + struct net_device *dev;
>>>> + bool will_invalidate;
>>>> + bool stopped;
>>>> + void *ptr;
>>>> +
>>>> + spin_lock(&tfile->tx_ring.consumer_lock);
>>>> + ptr = __ptr_ring_peek(&tfile->tx_ring);
>>>> + if (!ptr) {
>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>>>> + return ptr;
>>>> + }
>>>> +
>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
>>>> + * produced in the meantime, because this could result in waking
>>>> + * even though the ptr_ring is full. The order of the operations
>>>> + * is ensured by barrier().
>>>> + */
>>>> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
>>>> + if (unlikely(will_invalidate)) {
>>>> + rcu_read_lock();
>>>> + dev = rcu_dereference(tfile->tun)->dev;
>>>> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
>>>> + stopped = netif_tx_queue_stopped(txq);
>>>> + }
>>>> + barrier();
>>>> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
>>>> +
>>>> + if (unlikely(will_invalidate)) {
>>>> + if (stopped)
>>>> + netif_tx_wake_queue(txq);
>>>> + rcu_read_unlock();
>>>> + }
>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>>>> +
>>>> + return ptr;
>>>> +}
>>>> +
>>>> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>>> {
>>>> DECLARE_WAITQUEUE(wait, current);
>>>> void *ptr = NULL;
>>>> int error = 0;
>>>>
>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>>>> + ptr = tun_ring_consume(tfile);
>>>> if (ptr)
>>>> goto out;
>>>> if (noblock) {
>>>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>>>
>>>> while (1) {
>>>> set_current_state(TASK_INTERRUPTIBLE);
>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>>>> + ptr = tun_ring_consume(tfile);
>>>> if (ptr)
>>>> break;
>>>> if (signal_pending(current)) {
>>>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
>>>> dev->tx_queue_len, GFP_KERNEL,
>>>> tun_ptr_free);
>>>>
>>>> + if (netif_running(dev))
>>>> + netif_tx_wake_all_queues(dev);
>>>> +
>>>> kfree(rings);
>>>> return ret;
>>>> }
>>>> --
>>>> 2.43.0
>>>
>
^ permalink raw reply [flat|nested] 37+ messages in thread
* [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry
2025-09-29 9:43 ` Simon Schippers
@ 2025-10-11 9:15 ` Simon Schippers
0 siblings, 0 replies; 37+ messages in thread
From: Simon Schippers @ 2025-10-11 9:15 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On 29.09.25 11:43, Simon Schippers wrote:
> On 29.09.25 00:33, Michael S. Tsirkin wrote:
>> On Sun, Sep 28, 2025 at 11:27:25PM +0200, Simon Schippers wrote:
>>> On 23.09.25 18:36, Michael S. Tsirkin wrote:
>>>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
>>>>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
>>>>> entry of the ptr_ring and then waking the netdev queue when entries got
>>>>> invalidated to be used again by the producer.
>>>>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
>>>>> if the netdev queue is stopped before invalidating entries. Like that the
>>>>> netdev queue can be safely woken after invalidating entries.
>>>>>
>>>>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
>>>>> __ptr_ring_produce within tun_net_xmit guarantees that the information
>>>>> about the netdev queue being stopped is visible after __ptr_ring_peek is
>>>>> called.
>>>>>
>>>>> The netdev queue is also woken after resizing the ptr_ring.
>>>>>
>>>>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>>>>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>>>>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>>>>> ---
>>>>> drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>>>>> drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
>>>>> 2 files changed, 88 insertions(+), 3 deletions(-)
>>>>>
>>>>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
>>>>> index 1197f245e873..f8292721a9d6 100644
>>>>> --- a/drivers/net/tap.c
>>>>> +++ b/drivers/net/tap.c
>>>>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
>>>>> return ret ? ret : total;
>>>>> }
>>>>>
>>>>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
>>>>> +{
>>>>> + struct netdev_queue *txq;
>>>>> + struct net_device *dev;
>>>>> + bool will_invalidate;
>>>>> + bool stopped;
>>>>> + void *ptr;
>>>>> +
>>>>> + spin_lock(&q->ring.consumer_lock);
>>>>> + ptr = __ptr_ring_peek(&q->ring);
>>>>> + if (!ptr) {
>>>>> + spin_unlock(&q->ring.consumer_lock);
>>>>> + return ptr;
>>>>> + }
>>>>> +
>>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
>>>>> + * produced in the meantime, because this could result in waking
>>>>> + * even though the ptr_ring is full.
>>>>
>>>> So what? Maybe it would be a bit suboptimal? But with your design, I do
>>>> not get what prevents this:
>>>>
>>>>
>>>> stopped? -> No
>>>> ring is stopped
>>>> discard
>>>>
>>>> and queue stays stopped forever
>>>>
>>>
>>> I think I found a solution to this problem, see below:
>>>
>>>>
>>>>> The order of the operations
>>>>> + * is ensured by barrier().
>>>>> + */
>>>>> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
>>>>> + if (unlikely(will_invalidate)) {
>>>>> + rcu_read_lock();
>>>>> + dev = rcu_dereference(q->tap)->dev;
>>>>> + txq = netdev_get_tx_queue(dev, q->queue_index);
>>>>> + stopped = netif_tx_queue_stopped(txq);
>>>>> + }
>>>>> + barrier();
>>>>> + __ptr_ring_discard_one(&q->ring, will_invalidate);
>>>>> +
>>>>> + if (unlikely(will_invalidate)) {
>>>
>>> Here I just check for
>>>
>>> if (will_invalidate || __ptr_ring_empty(&q->ring)) {
>>>
>>> instead because, if the ptr_ring is empty and the netdev queue stopped,
>>> the race must have occurred. Then it is safe to wake the netdev queue,
>>> because it is known that space in the ptr_ring was freed when the race
>>> occurred. Also, it is guaranteed that tap_ring_consume is called at least
>>> once after the race, because a new entry is generated by the producer at
>>> the race.
>>> In my adjusted implementation, it tests fine with pktgen without any lost
>>> packets.
>>
>>
>> what if it is not empty and ring is stopped?
>>
>
> Then it cannot be assumed that there is free space in the ptr_ring,
> because __ptr_ring_discard_one may only create space after consuming one
> of the upcoming entries. Only once the ptr_ring is empty (which will
> obviously happen after some time) is it guaranteed that there is free
> space in the ptr_ring, either because the race occurred previously or
> because __ptr_ring_discard_one freed entries right before.
>
>>>
>>> Generally now I think that the whole implementation can be fine without
>>> using spinlocks at all. I am currently adjusting the implementation
>>> regarding SMP memory barrier pairings, and I have a question:
>>> In the v4 you mentioned "the stop -> wake bounce involves enough barriers
>>> already". Does it, for instance, mean that netif_tx_wake_queue already
>>> ensures memory ordering, and I do not have to use an smp_wmb() in front of
>>> netif_tx_wake_queue() and smp_rmb() in front of the ptr_ring operations
>>> in tun_net_xmit?
>>> I dug through net/core/netdevice.h and dev.c but could not really
>>> answer this question by myself...
>>> Thanks :)
>>
>> Only if it wakes up something, I think.
>>
>> Read:
>>
>> SLEEP AND WAKE-UP FUNCTIONS
>>
>>
>> in Documentation/memory-barriers.txt
>>
>>
>> IIUC this is the same.
>>
>>
>
> Thanks, I will look into it! :)
>
I do not see how the netdev queue flow control follows a setup with a
sleeper and a waker as described in SLEEP AND WAKE-UP FUNCTIONS.
Yes, there is netif_tx_wake_queue, but it does not call a waker function
like complete() or wake_up(), and I do not see a sleeper that calls
schedule() anywhere.
Do I misunderstand something?
For now I would instead use an additional smp_wmb() in front of
netif_tx_wake_queue for the consumer and an smp_rmb() in front of all
ptr_ring operations for the producer. This ensures that space freed by
the consumer that woke up the netdev queue is visible to the producer.
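For illustration, a minimal sketch of that barrier placement, assuming
the v5 tun paths quoted earlier in this thread (names and exact hook
points are approximate, and whether this pairing is sufficient is exactly
the open question here):

	/* consumer, e.g. tun_ring_consume(), after slots were freed */
	smp_wmb();	/* publish the freed ring slots ... */
	if (stopped)
		netif_tx_wake_queue(txq);

	/* producer, e.g. tun_net_xmit(), before any ptr_ring operation */
	smp_rmb();	/* ... so a woken producer sees the free space */
	/* existing full check and __ptr_ring_produce() follow here */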
Thanks! :)
>>>
>>>>> + if (stopped)
>>>>> + netif_tx_wake_queue(txq);
>>>>> + rcu_read_unlock();
>>>>> + }
>>>>
>>>>
>>>> After an entry is consumed, you can detect this by checking
>>>>
>>>> r->consumer_head >= r->consumer_tail
>>>>
>>>>
>>>> so it seems you could keep calling regular ptr_ring_consume
>>>> and check afterwards?
>>>>
>>>>
>>>>
>>>>
>>>>> + spin_unlock(&q->ring.consumer_lock);
>>>>> +
>>>>> + return ptr;
>>>>> +}
>>>>> +
>>>>> static ssize_t tap_do_read(struct tap_queue *q,
>>>>> struct iov_iter *to,
>>>>> int noblock, struct sk_buff *skb)
>>>>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
>>>>> TASK_INTERRUPTIBLE);
>>>>>
>>>>> /* Read frames from the queue */
>>>>> - skb = ptr_ring_consume(&q->ring);
>>>>> + skb = tap_ring_consume(q);
>>>>> if (skb)
>>>>> break;
>>>>> if (noblock) {
>>>>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
>>>>> ret = ptr_ring_resize_multiple_bh(rings, n,
>>>>> dev->tx_queue_len, GFP_KERNEL,
>>>>> __skb_array_destroy_skb);
>>>>> + if (netif_running(dev))
>>>>> + netif_tx_wake_all_queues(dev);
>>>>>
>>>>> kfree(rings);
>>>>> return ret;
>>>>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>>>>> index c6b22af9bae8..682df8157b55 100644
>>>>> --- a/drivers/net/tun.c
>>>>> +++ b/drivers/net/tun.c
>>>>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>>>>> return total;
>>>>> }
>>>>>
>>>>> +static void *tun_ring_consume(struct tun_file *tfile)
>>>>> +{
>>>>> + struct netdev_queue *txq;
>>>>> + struct net_device *dev;
>>>>> + bool will_invalidate;
>>>>> + bool stopped;
>>>>> + void *ptr;
>>>>> +
>>>>> + spin_lock(&tfile->tx_ring.consumer_lock);
>>>>> + ptr = __ptr_ring_peek(&tfile->tx_ring);
>>>>> + if (!ptr) {
>>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>>>>> + return ptr;
>>>>> + }
>>>>> +
>>>>> + /* Check if the queue stopped before zeroing out, so no ptr get
>>>>> + * produced in the meantime, because this could result in waking
>>>>> + * even though the ptr_ring is full. The order of the operations
>>>>> + * is ensured by barrier().
>>>>> + */
>>>>> + will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
>>>>> + if (unlikely(will_invalidate)) {
>>>>> + rcu_read_lock();
>>>>> + dev = rcu_dereference(tfile->tun)->dev;
>>>>> + txq = netdev_get_tx_queue(dev, tfile->queue_index);
>>>>> + stopped = netif_tx_queue_stopped(txq);
>>>>> + }
>>>>> + barrier();
>>>>> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
>>>>> +
>>>>> + if (unlikely(will_invalidate)) {
>>>>> + if (stopped)
>>>>> + netif_tx_wake_queue(txq);
>>>>> + rcu_read_unlock();
>>>>> + }
>>>>> + spin_unlock(&tfile->tx_ring.consumer_lock);
>>>>> +
>>>>> + return ptr;
>>>>> +}
>>>>> +
>>>>> static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>>>> {
>>>>> DECLARE_WAITQUEUE(wait, current);
>>>>> void *ptr = NULL;
>>>>> int error = 0;
>>>>>
>>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>>>>> + ptr = tun_ring_consume(tfile);
>>>>> if (ptr)
>>>>> goto out;
>>>>> if (noblock) {
>>>>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>>>>
>>>>> while (1) {
>>>>> set_current_state(TASK_INTERRUPTIBLE);
>>>>> - ptr = ptr_ring_consume(&tfile->tx_ring);
>>>>> + ptr = tun_ring_consume(tfile);
>>>>> if (ptr)
>>>>> break;
>>>>> if (signal_pending(current)) {
>>>>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
>>>>> dev->tx_queue_len, GFP_KERNEL,
>>>>> tun_ptr_free);
>>>>>
>>>>> + if (netif_running(dev))
>>>>> + netif_tx_wake_all_queues(dev);
>>>>> +
>>>>> kfree(rings);
>>>>> return ret;
>>>>> }
>>>>> --
>>>>> 2.43.0
>>>>
>>
^ permalink raw reply [flat|nested] 37+ messages in thread
* [PATCH net-next v5 5/8] TUN & TAP: Provide ptr_ring_consume_batched wrappers for vhost_net
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
` (3 preceding siblings ...)
2025-09-22 22:15 ` [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming an entry Simon Schippers
@ 2025-09-22 22:15 ` Simon Schippers
2025-09-23 16:23 ` Michael S. Tsirkin
2025-09-22 22:15 ` [PATCH net-next v5 6/8] TUN & TAP: Provide ptr_ring_unconsume " Simon Schippers
` (4 subsequent siblings)
9 siblings, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-22 22:15 UTC (permalink / raw)
To: willemdebruijn.kernel, jasowang, mst, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
Cc: Simon Schippers, Tim Gebauer
The wrappers tun_ring_consume_batched/tap_ring_consume_batched are similar
to the wrappers tun_ring_consume/tap_ring_consume. They deal with
consuming a batch of entries of the ptr_ring and then waking the
netdev queue whenever entries get invalidated to be used again by the
producer.
To avoid waking the netdev queue when the ptr_ring is full, it is checked
if the netdev queue is stopped before invalidating entries. Like that the
netdev queue can be safely woken after invalidating entries.
The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
__ptr_ring_produce within tun_net_xmit guarantees that the information
about the netdev queue being stopped is visible after __ptr_ring_peek is
called.
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
drivers/net/tap.c | 52 ++++++++++++++++++++++++++++++++++++++++
drivers/net/tun.c | 54 ++++++++++++++++++++++++++++++++++++++++++
include/linux/if_tap.h | 6 +++++
include/linux/if_tun.h | 7 ++++++
4 files changed, 119 insertions(+)
diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index f8292721a9d6..651d48612329 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -1216,6 +1216,58 @@ struct socket *tap_get_socket(struct file *file)
}
EXPORT_SYMBOL_GPL(tap_get_socket);
+int tap_ring_consume_batched(struct file *file,
+ void **array, int n)
+{
+ struct tap_queue *q = file->private_data;
+ struct netdev_queue *txq;
+ struct net_device *dev;
+ bool will_invalidate;
+ bool stopped;
+ void *ptr;
+ int i;
+
+ spin_lock(&q->ring.consumer_lock);
+ ptr = __ptr_ring_peek(&q->ring);
+
+ if (!ptr) {
+ spin_unlock(&q->ring.consumer_lock);
+ return 0;
+ }
+
+ i = 0;
+ do {
+ /* Check if the queue stopped before zeroing out, so no
+ * ptr get produced in the meantime, because this could
+ * result in waking even though the ptr_ring is full.
+ * The order of the operations is ensured by barrier().
+ */
+ will_invalidate = __ptr_ring_will_invalidate(&q->ring);
+ if (unlikely(will_invalidate)) {
+ rcu_read_lock();
+ dev = rcu_dereference(q->tap)->dev;
+ txq = netdev_get_tx_queue(dev, q->queue_index);
+ stopped = netif_tx_queue_stopped(txq);
+ }
+ barrier();
+ __ptr_ring_discard_one(&q->ring, will_invalidate);
+
+ if (unlikely(will_invalidate)) {
+ if (stopped)
+ netif_tx_wake_queue(txq);
+ rcu_read_unlock();
+ }
+
+ array[i++] = ptr;
+ if (i >= n)
+ break;
+ } while ((ptr = __ptr_ring_peek(&q->ring)));
+ spin_unlock(&q->ring.consumer_lock);
+
+ return i;
+}
+EXPORT_SYMBOL_GPL(tap_ring_consume_batched);
+
struct ptr_ring *tap_get_ptr_ring(struct file *file)
{
struct tap_queue *q;
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 682df8157b55..7566b22780fb 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -3759,6 +3759,60 @@ struct socket *tun_get_socket(struct file *file)
}
EXPORT_SYMBOL_GPL(tun_get_socket);
+int tun_ring_consume_batched(struct file *file,
+ void **array, int n)
+{
+ struct tun_file *tfile = file->private_data;
+ struct netdev_queue *txq;
+ struct net_device *dev;
+ bool will_invalidate;
+ bool stopped;
+ void *ptr;
+ int i;
+
+ spin_lock(&tfile->tx_ring.consumer_lock);
+ ptr = __ptr_ring_peek(&tfile->tx_ring);
+
+ if (!ptr) {
+ spin_unlock(&tfile->tx_ring.consumer_lock);
+ return 0;
+ }
+
+ i = 0;
+ do {
+ /* Check if the queue stopped before zeroing out, so no
+ * ptr get produced in the meantime, because this could
+ * result in waking even though the ptr_ring is full.
+ * The order of the operations is ensured by barrier().
+ */
+ will_invalidate =
+ __ptr_ring_will_invalidate(&tfile->tx_ring);
+ if (unlikely(will_invalidate)) {
+ rcu_read_lock();
+ dev = rcu_dereference(tfile->tun)->dev;
+ txq = netdev_get_tx_queue(dev,
+ tfile->queue_index);
+ stopped = netif_tx_queue_stopped(txq);
+ }
+ barrier();
+ __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
+
+ if (unlikely(will_invalidate)) {
+ if (stopped)
+ netif_tx_wake_queue(txq);
+ rcu_read_unlock();
+ }
+
+ array[i++] = ptr;
+ if (i >= n)
+ break;
+ } while ((ptr = __ptr_ring_peek(&tfile->tx_ring)));
+ spin_unlock(&tfile->tx_ring.consumer_lock);
+
+ return i;
+}
+EXPORT_SYMBOL_GPL(tun_ring_consume_batched);
+
struct ptr_ring *tun_get_tx_ring(struct file *file)
{
struct tun_file *tfile;
diff --git a/include/linux/if_tap.h b/include/linux/if_tap.h
index 553552fa635c..2e5542d6aef4 100644
--- a/include/linux/if_tap.h
+++ b/include/linux/if_tap.h
@@ -11,6 +11,7 @@ struct socket;
#if IS_ENABLED(CONFIG_TAP)
struct socket *tap_get_socket(struct file *);
struct ptr_ring *tap_get_ptr_ring(struct file *file);
+int tap_ring_consume_batched(struct file *file, void **array, int n);
#else
#include <linux/err.h>
#include <linux/errno.h>
@@ -22,6 +23,11 @@ static inline struct ptr_ring *tap_get_ptr_ring(struct file *f)
{
return ERR_PTR(-EINVAL);
}
+static inline int tap_ring_consume_batched(struct file *f,
+ void **array, int n)
+{
+ return 0;
+}
#endif /* CONFIG_TAP */
/*
diff --git a/include/linux/if_tun.h b/include/linux/if_tun.h
index 80166eb62f41..5b41525ac007 100644
--- a/include/linux/if_tun.h
+++ b/include/linux/if_tun.h
@@ -22,6 +22,7 @@ struct tun_msg_ctl {
#if defined(CONFIG_TUN) || defined(CONFIG_TUN_MODULE)
struct socket *tun_get_socket(struct file *);
struct ptr_ring *tun_get_tx_ring(struct file *file);
+int tun_ring_consume_batched(struct file *file, void **array, int n);
static inline bool tun_is_xdp_frame(void *ptr)
{
@@ -55,6 +56,12 @@ static inline struct ptr_ring *tun_get_tx_ring(struct file *f)
return ERR_PTR(-EINVAL);
}
+static inline int tun_ring_consume_batched(struct file *file,
+ void **array, int n)
+{
+ return 0;
+}
+
static inline bool tun_is_xdp_frame(void *ptr)
{
return false;
--
2.43.0
^ permalink raw reply related [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 5/8] TUN & TAP: Provide ptr_ring_consume_batched wrappers for vhost_net
2025-09-22 22:15 ` [PATCH net-next v5 5/8] TUN & TAP: Provide ptr_ring_consume_batched wrappers for vhost_net Simon Schippers
@ 2025-09-23 16:23 ` Michael S. Tsirkin
0 siblings, 0 replies; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-23 16:23 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm, Tim Gebauer
On Tue, Sep 23, 2025 at 12:15:50AM +0200, Simon Schippers wrote:
> The wrappers tun_ring_consume_batched/tap_ring_consume_batched are similar
> to the wrappers tun_ring_consume/tap_ring_consume. They deal with
> consuming a batch of entries of the ptr_ring and then waking the
> netdev queue whenever entries get invalidated to be used again by the
> producer.
> To avoid waking the netdev queue when the ptr_ring is full, it is checked
> if the netdev queue is stopped before invalidating entries. Like that the
> netdev queue can be safely woken after invalidating entries.
>
> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
> __ptr_ring_produce within tun_net_xmit guarantees that the information
> about the netdev queue being stopped is visible after __ptr_ring_peek is
> called.
READ_ONCE generally can't pair with smp_wmb
From Documentation/memory-barriers.txt
SMP BARRIER PAIRING
-------------------
When dealing with CPU-CPU interactions, certain types of memory barrier should
always be paired. A lack of appropriate pairing is almost certainly an error.
....
A write barrier pairs
with an address-dependency barrier, a control dependency, an acquire barrier,
a release barrier, a read barrier, or a general barrier.
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> ---
> drivers/net/tap.c | 52 ++++++++++++++++++++++++++++++++++++++++
> drivers/net/tun.c | 54 ++++++++++++++++++++++++++++++++++++++++++
> include/linux/if_tap.h | 6 +++++
> include/linux/if_tun.h | 7 ++++++
> 4 files changed, 119 insertions(+)
>
> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
> index f8292721a9d6..651d48612329 100644
> --- a/drivers/net/tap.c
> +++ b/drivers/net/tap.c
> @@ -1216,6 +1216,58 @@ struct socket *tap_get_socket(struct file *file)
> }
> EXPORT_SYMBOL_GPL(tap_get_socket);
>
> +int tap_ring_consume_batched(struct file *file,
> + void **array, int n)
> +{
> + struct tap_queue *q = file->private_data;
> + struct netdev_queue *txq;
> + struct net_device *dev;
> + bool will_invalidate;
> + bool stopped;
> + void *ptr;
> + int i;
> +
> + spin_lock(&q->ring.consumer_lock);
> + ptr = __ptr_ring_peek(&q->ring);
> +
> + if (!ptr) {
> + spin_unlock(&q->ring.consumer_lock);
> + return 0;
> + }
> +
> + i = 0;
> + do {
> + /* Check if the queue stopped before zeroing out, so no
> + * ptr get produced in the meantime, because this could
> + * result in waking even though the ptr_ring is full.
> + * The order of the operations is ensured by barrier().
> + */
> + will_invalidate = __ptr_ring_will_invalidate(&q->ring);
> + if (unlikely(will_invalidate)) {
> + rcu_read_lock();
> + dev = rcu_dereference(q->tap)->dev;
> + txq = netdev_get_tx_queue(dev, q->queue_index);
> + stopped = netif_tx_queue_stopped(txq);
> + }
> + barrier();
> + __ptr_ring_discard_one(&q->ring, will_invalidate);
> +
> + if (unlikely(will_invalidate)) {
> + if (stopped)
> + netif_tx_wake_queue(txq);
> + rcu_read_unlock();
> + }
> +
> + array[i++] = ptr;
> + if (i >= n)
> + break;
> + } while ((ptr = __ptr_ring_peek(&q->ring)));
> + spin_unlock(&q->ring.consumer_lock);
> +
> + return i;
> +}
> +EXPORT_SYMBOL_GPL(tap_ring_consume_batched);
> +
> struct ptr_ring *tap_get_ptr_ring(struct file *file)
> {
> struct tap_queue *q;
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index 682df8157b55..7566b22780fb 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -3759,6 +3759,60 @@ struct socket *tun_get_socket(struct file *file)
> }
> EXPORT_SYMBOL_GPL(tun_get_socket);
>
> +int tun_ring_consume_batched(struct file *file,
> + void **array, int n)
> +{
> + struct tun_file *tfile = file->private_data;
> + struct netdev_queue *txq;
> + struct net_device *dev;
> + bool will_invalidate;
> + bool stopped;
> + void *ptr;
> + int i;
> +
> + spin_lock(&tfile->tx_ring.consumer_lock);
> + ptr = __ptr_ring_peek(&tfile->tx_ring);
> +
> + if (!ptr) {
> + spin_unlock(&tfile->tx_ring.consumer_lock);
> + return 0;
> + }
> +
> + i = 0;
> + do {
> + /* Check if the queue stopped before zeroing out, so no
> + * ptr get produced in the meantime, because this could
> + * result in waking even though the ptr_ring is full.
> + * The order of the operations is ensured by barrier().
> + */
> + will_invalidate =
> + __ptr_ring_will_invalidate(&tfile->tx_ring);
> + if (unlikely(will_invalidate)) {
> + rcu_read_lock();
> + dev = rcu_dereference(tfile->tun)->dev;
> + txq = netdev_get_tx_queue(dev,
> + tfile->queue_index);
> + stopped = netif_tx_queue_stopped(txq);
> + }
> + barrier();
> + __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
> +
> + if (unlikely(will_invalidate)) {
> + if (stopped)
> + netif_tx_wake_queue(txq);
> + rcu_read_unlock();
> + }
> +
> + array[i++] = ptr;
> + if (i >= n)
> + break;
> + } while ((ptr = __ptr_ring_peek(&tfile->tx_ring)));
> + spin_unlock(&tfile->tx_ring.consumer_lock);
> +
> + return i;
> +}
> +EXPORT_SYMBOL_GPL(tun_ring_consume_batched);
> +
> struct ptr_ring *tun_get_tx_ring(struct file *file)
> {
> struct tun_file *tfile;
> diff --git a/include/linux/if_tap.h b/include/linux/if_tap.h
> index 553552fa635c..2e5542d6aef4 100644
> --- a/include/linux/if_tap.h
> +++ b/include/linux/if_tap.h
> @@ -11,6 +11,7 @@ struct socket;
> #if IS_ENABLED(CONFIG_TAP)
> struct socket *tap_get_socket(struct file *);
> struct ptr_ring *tap_get_ptr_ring(struct file *file);
> +int tap_ring_consume_batched(struct file *file, void **array, int n);
> #else
> #include <linux/err.h>
> #include <linux/errno.h>
> @@ -22,6 +23,11 @@ static inline struct ptr_ring *tap_get_ptr_ring(struct file *f)
> {
> return ERR_PTR(-EINVAL);
> }
> +static inline int tap_ring_consume_batched(struct file *f,
> + void **array, int n)
> +{
> + return 0;
> +}
> #endif /* CONFIG_TAP */
>
> /*
> diff --git a/include/linux/if_tun.h b/include/linux/if_tun.h
> index 80166eb62f41..5b41525ac007 100644
> --- a/include/linux/if_tun.h
> +++ b/include/linux/if_tun.h
> @@ -22,6 +22,7 @@ struct tun_msg_ctl {
> #if defined(CONFIG_TUN) || defined(CONFIG_TUN_MODULE)
> struct socket *tun_get_socket(struct file *);
> struct ptr_ring *tun_get_tx_ring(struct file *file);
> +int tun_ring_consume_batched(struct file *file, void **array, int n);
>
> static inline bool tun_is_xdp_frame(void *ptr)
> {
> @@ -55,6 +56,12 @@ static inline struct ptr_ring *tun_get_tx_ring(struct file *f)
> return ERR_PTR(-EINVAL);
> }
>
> +static inline int tun_ring_consume_batched(struct file *file,
> + void **array, int n)
> +{
> + return 0;
> +}
> +
> static inline bool tun_is_xdp_frame(void *ptr)
> {
> return false;
> --
> 2.43.0
^ permalink raw reply [flat|nested] 37+ messages in thread
* [PATCH net-next v5 6/8] TUN & TAP: Provide ptr_ring_unconsume wrappers for vhost_net
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
` (4 preceding siblings ...)
2025-09-22 22:15 ` [PATCH net-next v5 5/8] TUN & TAP: Provide ptr_ring_consume_batched wrappers for vhost_net Simon Schippers
@ 2025-09-22 22:15 ` Simon Schippers
2025-09-22 22:15 ` [PATCH net-next v5 7/8] TUN & TAP: Methods to determine whether file is TUN/TAP " Simon Schippers
` (3 subsequent siblings)
9 siblings, 0 replies; 37+ messages in thread
From: Simon Schippers @ 2025-09-22 22:15 UTC (permalink / raw)
To: willemdebruijn.kernel, jasowang, mst, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
Cc: Simon Schippers, Tim Gebauer
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
drivers/net/tap.c | 9 +++++++++
drivers/net/tun.c | 9 +++++++++
include/linux/if_tap.h | 4 ++++
include/linux/if_tun.h | 5 +++++
4 files changed, 27 insertions(+)
diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 651d48612329..9720481f6d41 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -1268,6 +1268,15 @@ int tap_ring_consume_batched(struct file *file,
}
EXPORT_SYMBOL_GPL(tap_ring_consume_batched);
+void tap_ring_unconsume(struct file *file, void **batch, int n,
+ void (*destroy)(void *))
+{
+ struct tap_queue *q = file->private_data;
+
+ ptr_ring_unconsume(&q->ring, batch, n, destroy);
+}
+EXPORT_SYMBOL_GPL(tap_ring_unconsume);
+
struct ptr_ring *tap_get_ptr_ring(struct file *file)
{
struct tap_queue *q;
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 7566b22780fb..25b170e903e1 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -3813,6 +3813,15 @@ int tun_ring_consume_batched(struct file *file,
}
EXPORT_SYMBOL_GPL(tun_ring_consume_batched);
+void tun_ring_unconsume(struct file *file, void **batch, int n,
+ void (*destroy)(void *))
+{
+ struct tun_file *tfile = file->private_data;
+
+ ptr_ring_unconsume(&tfile->tx_ring, batch, n, destroy);
+}
+EXPORT_SYMBOL_GPL(tun_ring_unconsume);
+
struct ptr_ring *tun_get_tx_ring(struct file *file)
{
struct tun_file *tfile;
diff --git a/include/linux/if_tap.h b/include/linux/if_tap.h
index 2e5542d6aef4..0cf8cf200d52 100644
--- a/include/linux/if_tap.h
+++ b/include/linux/if_tap.h
@@ -12,6 +12,8 @@ struct socket;
struct socket *tap_get_socket(struct file *);
struct ptr_ring *tap_get_ptr_ring(struct file *file);
int tap_ring_consume_batched(struct file *file, void **array, int n);
+void tap_ring_unconsume(struct file *file, void **batch, int n,
+ void (*destroy)(void *));
#else
#include <linux/err.h>
#include <linux/errno.h>
@@ -28,6 +30,8 @@ static inline int tap_ring_consume_batched(struct file *f,
{
return 0;
}
+static inline void tap_ring_unconsume(struct file *file, void **batch,
+ int n, void (*destroy)(void *)) {}
#endif /* CONFIG_TAP */
/*
diff --git a/include/linux/if_tun.h b/include/linux/if_tun.h
index 5b41525ac007..bd954bb117e8 100644
--- a/include/linux/if_tun.h
+++ b/include/linux/if_tun.h
@@ -23,6 +23,8 @@ struct tun_msg_ctl {
struct socket *tun_get_socket(struct file *);
struct ptr_ring *tun_get_tx_ring(struct file *file);
int tun_ring_consume_batched(struct file *file, void **array, int n);
+void tun_ring_unconsume(struct file *file, void **batch, int n,
+ void (*destroy)(void *));
static inline bool tun_is_xdp_frame(void *ptr)
{
@@ -62,6 +64,9 @@ static inline int tun_ring_consume_batched(struct file *file,
return 0;
}
+static inline void tun_ring_unconsume(struct file *file, void **batch,
+ int n, void (*destroy)(void *)) {}
+
static inline bool tun_is_xdp_frame(void *ptr)
{
return false;
--
2.43.0
^ permalink raw reply related [flat|nested] 37+ messages in thread
* [PATCH net-next v5 7/8] TUN & TAP: Methods to determine whether file is TUN/TAP for vhost_net
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
` (5 preceding siblings ...)
2025-09-22 22:15 ` [PATCH net-next v5 6/8] TUN & TAP: Provide ptr_ring_unconsume " Simon Schippers
@ 2025-09-22 22:15 ` Simon Schippers
2025-09-22 22:15 ` [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers Simon Schippers
` (2 subsequent siblings)
9 siblings, 0 replies; 37+ messages in thread
From: Simon Schippers @ 2025-09-22 22:15 UTC (permalink / raw)
To: willemdebruijn.kernel, jasowang, mst, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
Cc: Simon Schippers, Tim Gebauer
These helpers are inspired by tun_get_tx_ring/tap_get_ptr_ring and
replace those functions.
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
drivers/net/tap.c | 10 +++++-----
drivers/net/tun.c | 10 +++++-----
include/linux/if_tap.h | 5 +++++
include/linux/if_tun.h | 6 ++++++
4 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 9720481f6d41..8d3e74330309 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -1277,18 +1277,18 @@ void tap_ring_unconsume(struct file *file, void **batch, int n,
}
EXPORT_SYMBOL_GPL(tap_ring_unconsume);
-struct ptr_ring *tap_get_ptr_ring(struct file *file)
+bool is_tap_file(struct file *file)
{
struct tap_queue *q;
if (file->f_op != &tap_fops)
- return ERR_PTR(-EINVAL);
+ return false;
q = file->private_data;
if (!q)
- return ERR_PTR(-EBADFD);
- return &q->ring;
+ return false;
+ return true;
}
-EXPORT_SYMBOL_GPL(tap_get_ptr_ring);
+EXPORT_SYMBOL_GPL(is_tap_file);
int tap_queue_resize(struct tap_dev *tap)
{
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 25b170e903e1..b0193b06fedc 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -3822,18 +3822,18 @@ void tun_ring_unconsume(struct file *file, void **batch, int n,
}
EXPORT_SYMBOL_GPL(tun_ring_unconsume);
-struct ptr_ring *tun_get_tx_ring(struct file *file)
+bool is_tun_file(struct file *file)
{
struct tun_file *tfile;
if (file->f_op != &tun_fops)
- return ERR_PTR(-EINVAL);
+ return false;
tfile = file->private_data;
if (!tfile)
- return ERR_PTR(-EBADFD);
- return &tfile->tx_ring;
+ return false;
+ return true;
}
-EXPORT_SYMBOL_GPL(tun_get_tx_ring);
+EXPORT_SYMBOL_GPL(is_tun_file);
module_init(tun_init);
module_exit(tun_cleanup);
diff --git a/include/linux/if_tap.h b/include/linux/if_tap.h
index 0cf8cf200d52..5bbcc8611bf5 100644
--- a/include/linux/if_tap.h
+++ b/include/linux/if_tap.h
@@ -14,6 +14,7 @@ struct ptr_ring *tap_get_ptr_ring(struct file *file);
int tap_ring_consume_batched(struct file *file, void **array, int n);
void tap_ring_unconsume(struct file *file, void **batch, int n,
void (*destroy)(void *));
+bool is_tap_file(struct file *file);
#else
#include <linux/err.h>
#include <linux/errno.h>
@@ -32,6 +33,10 @@ static inline int tap_ring_consume_batched(struct file *f,
}
static inline void tap_ring_unconsume(struct file *file, void **batch,
int n, void (*destroy)(void *)) {}
+static inline bool is_tap_file(struct file *f)
+{
+ return false;
+}
#endif /* CONFIG_TAP */
/*
diff --git a/include/linux/if_tun.h b/include/linux/if_tun.h
index bd954bb117e8..869d61898e60 100644
--- a/include/linux/if_tun.h
+++ b/include/linux/if_tun.h
@@ -25,6 +25,7 @@ struct ptr_ring *tun_get_tx_ring(struct file *file);
int tun_ring_consume_batched(struct file *file, void **array, int n);
void tun_ring_unconsume(struct file *file, void **batch, int n,
void (*destroy)(void *));
+bool is_tun_file(struct file *file);
static inline bool tun_is_xdp_frame(void *ptr)
{
@@ -67,6 +68,11 @@ static inline int tun_ring_consume_batched(struct file *file,
static inline void tun_ring_unconsume(struct file *file, void **batch,
int n, void (*destroy)(void *)) {}
+static inline bool is_tun_file(struct file *f)
+{
+ return false;
+}
+
static inline bool tun_is_xdp_frame(void *ptr)
{
return false;
--
2.43.0
^ permalink raw reply related [flat|nested] 37+ messages in thread
* [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
` (6 preceding siblings ...)
2025-09-22 22:15 ` [PATCH net-next v5 7/8] TUN & TAP: Methods to determine whether file is TUN/TAP " Simon Schippers
@ 2025-09-22 22:15 ` Simon Schippers
2025-09-23 14:14 ` kernel test robot
2025-09-26 13:47 ` kernel test robot
2025-09-23 14:55 ` [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Michael S. Tsirkin
2025-09-24 7:18 ` Michael S. Tsirkin
9 siblings, 2 replies; 37+ messages in thread
From: Simon Schippers @ 2025-09-22 22:15 UTC (permalink / raw)
To: willemdebruijn.kernel, jasowang, mst, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
Cc: Simon Schippers, Tim Gebauer
Instead of holding an rx_ring pointer, the virtqueue now stores the
interface type (TUN, TAP or IF_NONE) and uses it to call the
corresponding TUN/TAP wrappers.
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
drivers/vhost/net.c | 90 +++++++++++++++++++++++++++++----------------
1 file changed, 58 insertions(+), 32 deletions(-)
diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index c6508fe0d5c8..6be17b53cc6c 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -127,10 +127,10 @@ struct vhost_net_virtqueue {
/* Reference counting for outstanding ubufs.
* Protected by vq mutex. Writers must also take device mutex. */
struct vhost_net_ubuf_ref *ubufs;
- struct ptr_ring *rx_ring;
struct vhost_net_buf rxq;
/* Batched XDP buffs */
struct xdp_buff *xdp;
+ enum if_type {IF_NONE = 0, TUN, TAP} type;
};
struct vhost_net {
@@ -176,24 +176,54 @@ static void *vhost_net_buf_consume(struct vhost_net_buf *rxq)
return ret;
}
-static int vhost_net_buf_produce(struct vhost_net_virtqueue *nvq)
+static int vhost_net_buf_produce(struct vhost_net_virtqueue *nvq,
+ struct sock *sk)
{
+ struct file *file = sk->sk_socket->file;
struct vhost_net_buf *rxq = &nvq->rxq;
rxq->head = 0;
- rxq->tail = ptr_ring_consume_batched(nvq->rx_ring, rxq->queue,
- VHOST_NET_BATCH);
+
+ switch (nvq->type) {
+ case TUN:
+ rxq->tail = tun_ring_consume_batched(file,
+ rxq->queue, VHOST_NET_BATCH);
+ break;
+ case TAP:
+ rxq->tail = tap_ring_consume_batched(file,
+ rxq->queue, VHOST_NET_BATCH);
+ break;
+ case IF_NONE:
+ WARN_ON_ONCE();
+ }
+
return rxq->tail;
}
-static void vhost_net_buf_unproduce(struct vhost_net_virtqueue *nvq)
+static void vhost_net_buf_unproduce(struct vhost_net_virtqueue *nvq,
+ struct socket *sk)
{
struct vhost_net_buf *rxq = &nvq->rxq;
-
- if (nvq->rx_ring && !vhost_net_buf_is_empty(rxq)) {
- ptr_ring_unconsume(nvq->rx_ring, rxq->queue + rxq->head,
- vhost_net_buf_get_size(rxq),
- tun_ptr_free);
+ struct file *file;
+
+ if (sk && !vhost_net_buf_is_empty(rxq)) {
+ file = sk->file;
+ switch (nvq->type) {
+ case TUN:
+ tun_ring_unconsume(file,
+ rxq->queue + rxq->head,
+ vhost_net_buf_get_size(rxq),
+ tun_ptr_free);
+ break;
+ case TAP:
+ tap_ring_unconsume(file,
+ rxq->queue + rxq->head,
+ vhost_net_buf_get_size(rxq),
+ tun_ptr_free);
+ break;
+ case IF_NONE:
+ return;
+ }
rxq->head = rxq->tail = 0;
}
}
@@ -209,14 +239,15 @@ static int vhost_net_buf_peek_len(void *ptr)
return __skb_array_len_with_tag(ptr);
}
-static int vhost_net_buf_peek(struct vhost_net_virtqueue *nvq)
+static int vhost_net_buf_peek(struct vhost_net_virtqueue *nvq,
+ struct sock *sk)
{
struct vhost_net_buf *rxq = &nvq->rxq;
if (!vhost_net_buf_is_empty(rxq))
goto out;
- if (!vhost_net_buf_produce(nvq))
+ if (!vhost_net_buf_produce(nvq, sk))
return 0;
out:
@@ -998,8 +1029,8 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
int len = 0;
unsigned long flags;
- if (rvq->rx_ring)
- return vhost_net_buf_peek(rvq);
+ if (rvq->type)
+ return vhost_net_buf_peek(rvq, sk);
spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
head = skb_peek(&sk->sk_receive_queue);
@@ -1207,7 +1238,7 @@ static void handle_rx(struct vhost_net *net)
goto out;
}
busyloop_intr = false;
- if (nvq->rx_ring)
+ if (nvq->type)
msg.msg_control = vhost_net_buf_consume(&nvq->rxq);
/* On overrun, truncate and discard */
if (unlikely(headcount > UIO_MAXIOV)) {
@@ -1363,7 +1394,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
n->vqs[i].batched_xdp = 0;
n->vqs[i].vhost_hlen = 0;
n->vqs[i].sock_hlen = 0;
- n->vqs[i].rx_ring = NULL;
+ n->vqs[i].type = IF_NONE;
vhost_net_buf_init(&n->vqs[i].rxq);
}
vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX,
@@ -1393,8 +1424,8 @@ static struct socket *vhost_net_stop_vq(struct vhost_net *n,
sock = vhost_vq_get_backend(vq);
vhost_net_disable_vq(n, vq);
vhost_vq_set_backend(vq, NULL);
- vhost_net_buf_unproduce(nvq);
- nvq->rx_ring = NULL;
+ vhost_net_buf_unproduce(nvq, sock);
+ nvq->type = IF_NONE;
mutex_unlock(&vq->mutex);
return sock;
}
@@ -1474,18 +1505,13 @@ static struct socket *get_raw_socket(int fd)
return ERR_PTR(r);
}
-static struct ptr_ring *get_tap_ptr_ring(struct file *file)
+static enum if_type get_if_type(struct file *file)
{
- struct ptr_ring *ring;
- ring = tun_get_tx_ring(file);
- if (!IS_ERR(ring))
- goto out;
- ring = tap_get_ptr_ring(file);
- if (!IS_ERR(ring))
- goto out;
- ring = NULL;
-out:
- return ring;
+ if (is_tap_file(file))
+ return TAP;
+ if (is_tun_file(file))
+ return TUN;
+ return IF_NONE;
}
static struct socket *get_tap_socket(int fd)
@@ -1567,7 +1593,7 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
vhost_net_disable_vq(n, vq);
vhost_vq_set_backend(vq, sock);
- vhost_net_buf_unproduce(nvq);
+ vhost_net_buf_unproduce(nvq, sock);
r = vhost_vq_init_access(vq);
if (r)
goto err_used;
@@ -1576,9 +1602,9 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
goto err_used;
if (index == VHOST_NET_VQ_RX) {
if (sock)
- nvq->rx_ring = get_tap_ptr_ring(sock->file);
+ nvq->type = get_if_type(sock->file);
else
- nvq->rx_ring = NULL;
+ nvq->type = IF_NONE;
}
oldubufs = nvq->ubufs;
--
2.43.0
^ permalink raw reply related [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers
2025-09-22 22:15 ` [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers Simon Schippers
@ 2025-09-23 14:14 ` kernel test robot
2025-09-26 13:47 ` kernel test robot
1 sibling, 0 replies; 37+ messages in thread
From: kernel test robot @ 2025-09-23 14:14 UTC (permalink / raw)
To: Simon Schippers, willemdebruijn.kernel, jasowang, mst, eperezma,
stephen, leiyang, netdev, linux-kernel, virtualization, kvm
Cc: llvm, oe-kbuild-all, Simon Schippers, Tim Gebauer
Hi Simon,
kernel test robot noticed the following build errors:
[auto build test ERROR on net-next/main]
url: https://github.com/intel-lab-lkp/linux/commits/Simon-Schippers/__ptr_ring_full_next-Returns-if-ring-will-be-full-after-next-insertion/20250923-062130
base: net-next/main
patch link: https://lore.kernel.org/r/20250922221553.47802-9-simon.schippers%40tu-dortmund.de
patch subject: [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers
config: i386-buildonly-randconfig-003-20250923 (https://download.01.org/0day-ci/archive/20250923/202509232113.Z0qRJQHs-lkp@intel.com/config)
compiler: clang version 20.1.8 (https://github.com/llvm/llvm-project 87f0227cb60147a26a1eeb4fb06e3b505e9c7261)
reproduce (this is a W=1 build): (https://download.01.org/0day-ci/archive/20250923/202509232113.Z0qRJQHs-lkp@intel.com/reproduce)
If you fix the issue in a separate patch/commit (i.e. not just a new version of
the same patch/commit), kindly add following tags
| Reported-by: kernel test robot <lkp@intel.com>
| Closes: https://lore.kernel.org/oe-kbuild-all/202509232113.Z0qRJQHs-lkp@intel.com/
All errors (new ones prefixed by >>):
>> drivers/vhost/net.c:197:3: error: expected expression
197 | WARN_ON_ONCE();
| ^
include/asm-generic/bug.h:111:34: note: expanded from macro 'WARN_ON_ONCE'
111 | int __ret_warn_on = !!(condition); \
| ^
1 error generated.
vim +197 drivers/vhost/net.c
178
179 static int vhost_net_buf_produce(struct vhost_net_virtqueue *nvq,
180 struct sock *sk)
181 {
182 struct file *file = sk->sk_socket->file;
183 struct vhost_net_buf *rxq = &nvq->rxq;
184
185 rxq->head = 0;
186
187 switch (nvq->type) {
188 case TUN:
189 rxq->tail = tun_ring_consume_batched(file,
190 rxq->queue, VHOST_NET_BATCH);
191 break;
192 case TAP:
193 rxq->tail = tap_ring_consume_batched(file,
194 rxq->queue, VHOST_NET_BATCH);
195 break;
196 case IF_NONE:
> 197 WARN_ON_ONCE();
198 }
199
200 return rxq->tail;
201 }
202
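For reference, WARN_ON_ONCE() is a macro that takes a condition, so the
IF_NONE arm would need something along these lines in a respin (a minimal
illustrative fix, not taken from a posted patch):

	case IF_NONE:
		WARN_ON_ONCE(1);	/* give the macro a condition */
		break;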
--
0-DAY CI Kernel Test Service
https://github.com/intel/lkp-tests/wiki
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers
2025-09-22 22:15 ` [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers Simon Schippers
2025-09-23 14:14 ` kernel test robot
@ 2025-09-26 13:47 ` kernel test robot
1 sibling, 0 replies; 37+ messages in thread
From: kernel test robot @ 2025-09-26 13:47 UTC (permalink / raw)
To: Simon Schippers, willemdebruijn.kernel, jasowang, mst, eperezma,
stephen, leiyang, netdev, linux-kernel, virtualization, kvm
Cc: llvm, oe-kbuild-all, Simon Schippers, Tim Gebauer
Hi Simon,
kernel test robot noticed the following build errors:
[auto build test ERROR on net-next/main]
url: https://github.com/intel-lab-lkp/linux/commits/Simon-Schippers/__ptr_ring_full_next-Returns-if-ring-will-be-full-after-next-insertion/20250924-180633
base: net-next/main
patch link: https://lore.kernel.org/r/20250922221553.47802-9-simon.schippers%40tu-dortmund.de
patch subject: [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers
config: x86_64-kexec (https://download.01.org/0day-ci/archive/20250926/202509262102.4AmV1Mms-lkp@intel.com/config)
compiler: clang version 20.1.8 (https://github.com/llvm/llvm-project 87f0227cb60147a26a1eeb4fb06e3b505e9c7261)
reproduce (this is a W=1 build): (https://download.01.org/0day-ci/archive/20250926/202509262102.4AmV1Mms-lkp@intel.com/reproduce)
If you fix the issue in a separate patch/commit (i.e. not just a new version of
the same patch/commit), kindly add following tags
| Reported-by: kernel test robot <lkp@intel.com>
| Closes: https://lore.kernel.org/oe-kbuild-all/202509262102.4AmV1Mms-lkp@intel.com/
All errors (new ones prefixed by >>):
>> drivers/vhost/net.c:197:3: error: expected expression
197 | WARN_ON_ONCE();
| ^
include/asm-generic/bug.h:111:34: note: expanded from macro 'WARN_ON_ONCE'
111 | int __ret_warn_on = !!(condition); \
| ^
1 error generated.
vim +197 drivers/vhost/net.c
178
179 static int vhost_net_buf_produce(struct vhost_net_virtqueue *nvq,
180 struct sock *sk)
181 {
182 struct file *file = sk->sk_socket->file;
183 struct vhost_net_buf *rxq = &nvq->rxq;
184
185 rxq->head = 0;
186
187 switch (nvq->type) {
188 case TUN:
189 rxq->tail = tun_ring_consume_batched(file,
190 rxq->queue, VHOST_NET_BATCH);
191 break;
192 case TAP:
193 rxq->tail = tap_ring_consume_batched(file,
194 rxq->queue, VHOST_NET_BATCH);
195 break;
196 case IF_NONE:
> 197 WARN_ON_ONCE();
198 }
199
200 return rxq->tail;
201 }
202
--
0-DAY CI Kernel Test Service
https://github.com/intel/lkp-tests/wiki
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
` (7 preceding siblings ...)
2025-09-22 22:15 ` [PATCH net-next v5 8/8] vhost_net: Replace rx_ring with calls of TUN/TAP wrappers Simon Schippers
@ 2025-09-23 14:55 ` Michael S. Tsirkin
2025-09-24 5:59 ` Simon Schippers
2025-09-24 7:18 ` Michael S. Tsirkin
9 siblings, 1 reply; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-23 14:55 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> This patch series deals with TUN, TAP and vhost_net which drop incoming
> SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> patch series, the associated netdev queue is stopped before this happens.
> This allows the connected qdisc to function correctly as reported by [1]
> and improves application-layer performance, see our paper [2]. Meanwhile
> the theoretical performance differs only slightly:
>
> +------------------------+----------+----------+
> | pktgen benchmarks | Stock | Patched |
> | i5 6300HQ, 20M packets | | |
> +------------------------+----------+----------+
> | TAP | 2.10Mpps | 1.99Mpps |
> +------------------------+----------+----------+
> | TAP+vhost_net | 6.05Mpps | 6.14Mpps |
> +------------------------+----------+----------+
> | Note: Patched had no TX drops at all, |
> | while stock suffered numerous drops. |
> +----------------------------------------------+
>
> This patch series includes TUN, TAP, and vhost_net because they share
> logic. Adjusting only one of them would break the others. Therefore, the
> patch series is structured as follows:
> 1+2: New ptr_ring helpers for 3 & 4
> 3: TUN & TAP: Stop netdev queue upon reaching a full ptr_ring
so what happens if you only apply patches 1-3?
> 4: TUN & TAP: Wake netdev queue after consuming an entry
> 5+6+7: TUN & TAP: ptr_ring wrappers and other helpers to be called by
> vhost_net
> 8: vhost_net: Call the wrappers & helpers
>
> Possible future work:
> - Introduction of Byte Queue Limits as suggested by Stephen Hemminger
> - Adaption of the netdev queue flow control for ipvtap & macvtap
>
> [1] Link:
> https://unix.stackexchange.com/questions/762935/traffic-shaping-ineffective-on-tun-device
> [2] Link:
> https://cni.etit.tu-dortmund.de/storages/cni-etit/r/Research/Publications/2025/Gebauer_2025_VTCFall/Gebauer_VTCFall2025_AuthorsVersion.pdf
>
> Links to previous versions:
> V4:
> https://lore.kernel.org/netdev/20250902080957.47265-1-simon.schippers@tu-dortmund.de/T/#u
> V3:
> https://lore.kernel.org/netdev/20250825211832.84901-1-simon.schippers@tu-dortmund.de/T/#u
> V2:
> https://lore.kernel.org/netdev/20250811220430.14063-1-simon.schippers@tu-dortmund.de/T/#u
> V1:
> https://lore.kernel.org/netdev/20250808153721.261334-1-simon.schippers@tu-dortmund.de/T/#u
>
> Changelog:
> V4 -> V5:
> - Stop the netdev queue prior to producing the final fitting ptr_ring entry
> -> Ensures the consumer has the latest netdev queue state, making it safe
> to wake the queue
> -> Resolves an issue in vhost_net where the netdev queue could remain
> stopped despite being empty
> -> For TUN/TAP, the netdev queue no longer needs to be woken in the
> blocking loop
> -> Introduces new helpers __ptr_ring_full_next and
> __ptr_ring_will_invalidate for this purpose
>
> - vhost_net now uses wrappers of TUN/TAP for ptr_ring consumption rather
> than maintaining its own rx_ring pointer
>
> V3 -> V4:
> - Target net-next instead of net
> - Changed to patch series instead of single patch
> - Changed to new title from old title
> "TUN/TAP: Improving throughput and latency by avoiding SKB drops"
> - Wake netdev queue with new helpers wake_netdev_queue when there is any
> spare capacity in the ptr_ring instead of waiting for it to be empty
> - Use tun_file instead of tun_struct in tun_ring_recv as a more consistent
> logic
> - Use smp_wmb() and smp_rmb() barrier pair, which avoids any packet drops
> that happened rarely before
> - Use safer logic for vhost_net using RCU read locks to access TUN/TAP data
>
> V2 -> V3: Added support for TAP and TAP+vhost_net.
>
> V1 -> V2: Removed NETDEV_TX_BUSY return case in tun_net_xmit and removed
> unnecessary netif_tx_wake_queue in tun_ring_recv.
>
> Thanks,
> Simon :)
>
> Simon Schippers (8):
> __ptr_ring_full_next: Returns if ring will be full after next
> insertion
> Move the decision of invalidation out of __ptr_ring_discard_one
> TUN, TAP & vhost_net: Stop netdev queue before reaching a full
> ptr_ring
> TUN & TAP: Wake netdev queue after consuming an entry
> TUN & TAP: Provide ptr_ring_consume_batched wrappers for vhost_net
> TUN & TAP: Provide ptr_ring_unconsume wrappers for vhost_net
> TUN & TAP: Methods to determine whether file is TUN/TAP for vhost_net
> vhost_net: Replace rx_ring with calls of TUN/TAP wrappers
>
> drivers/net/tap.c | 115 +++++++++++++++++++++++++++++++--
> drivers/net/tun.c | 136 +++++++++++++++++++++++++++++++++++----
> drivers/vhost/net.c | 90 +++++++++++++++++---------
> include/linux/if_tap.h | 15 +++++
> include/linux/if_tun.h | 18 ++++++
> include/linux/ptr_ring.h | 54 +++++++++++++---
> 6 files changed, 367 insertions(+), 61 deletions(-)
>
> --
> 2.43.0
^ permalink raw reply [flat|nested] 37+ messages in thread
* [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-23 14:55 ` [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Michael S. Tsirkin
@ 2025-09-24 5:59 ` Simon Schippers
2025-09-24 6:12 ` Michael S. Tsirkin
0 siblings, 1 reply; 37+ messages in thread
From: Simon Schippers @ 2025-09-24 5:59 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
On 23.09.25 16:55, Michael S. Tsirkin wrote:
> On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
>> This patch series deals with TUN, TAP and vhost_net which drop incoming
>> SKBs whenever their internal ptr_ring buffer is full. Instead, with this
>> patch series, the associated netdev queue is stopped before this happens.
>> This allows the connected qdisc to function correctly as reported by [1]
>> and improves application-layer performance, see our paper [2]. Meanwhile
>> the theoretical performance differs only slightly:
>>
>> +------------------------+----------+----------+
>> | pktgen benchmarks | Stock | Patched |
>> | i5 6300HQ, 20M packets | | |
>> +------------------------+----------+----------+
>> | TAP | 2.10Mpps | 1.99Mpps |
>> +------------------------+----------+----------+
>> | TAP+vhost_net | 6.05Mpps | 6.14Mpps |
>> +------------------------+----------+----------+
>> | Note: Patched had no TX drops at all, |
>> | while stock suffered numerous drops. |
>> +----------------------------------------------+
>>
>> This patch series includes TUN, TAP, and vhost_net because they share
>> logic. Adjusting only one of them would break the others. Therefore, the
>> patch series is structured as follows:
>> 1+2: New ptr_ring helpers for 3 & 4
>> 3: TUN & TAP: Stop netdev queue upon reaching a full ptr_ring
>
>
> so what happens if you only apply patches 1-3?
>
The netdev queue of vhost_net would be stopped by tun_net_xmit but would
never be woken again.
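
Roughly sketched (simplified and illustrative, not the exact patch code;
the tfile/txq naming follows the existing tun driver but details may
differ), the producer side with only patches 1-3 applied behaves like
this:

	/* tun_net_xmit(), producer lock held (sketch): */
	if (__ptr_ring_full_next(&tfile->tx_ring))
		netif_tx_stop_queue(txq);	/* stop before the ring fills up */

	if (__ptr_ring_produce(&tfile->tx_ring, skb))
		goto drop;			/* ring unexpectedly full */

	/* The matching netif_tx_wake_queue() only arrives with patch 4 on
	 * the TUN/TAP read path and with patch 8 for vhost_net's consumer,
	 * so with patches 1-3 alone a stopped queue stays stopped forever.
	 */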
>> [...]
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-24 5:59 ` Simon Schippers
@ 2025-09-24 6:12 ` Michael S. Tsirkin
0 siblings, 0 replies; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 6:12 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
On Wed, Sep 24, 2025 at 07:59:46AM +0200, Simon Schippers wrote:
> On 23.09.25 16:55, Michael S. Tsirkin wrote:
> > On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> >> This patch series deals with TUN, TAP and vhost_net which drop incoming
> >> SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> >> patch series, the associated netdev queue is stopped before this happens.
> >> This allows the connected qdisc to function correctly as reported by [1]
> >> and improves application-layer performance, see our paper [2]. Meanwhile
> >> the theoretical performance differs only slightly:
> >>
> >> +------------------------+----------+----------+
> >> | pktgen benchmarks | Stock | Patched |
> >> | i5 6300HQ, 20M packets | | |
> >> +------------------------+----------+----------+
> >> | TAP | 2.10Mpps | 1.99Mpps |
> >> +------------------------+----------+----------+
> >> | TAP+vhost_net | 6.05Mpps | 6.14Mpps |
> >> +------------------------+----------+----------+
> >> | Note: Patched had no TX drops at all, |
> >> | while stock suffered numerous drops. |
> >> +----------------------------------------------+
> >>
> >> This patch series includes TUN, TAP, and vhost_net because they share
> >> logic. Adjusting only one of them would break the others. Therefore, the
> >> patch series is structured as follows:
> >> 1+2: New ptr_ring helpers for 3 & 4
> >> 3: TUN & TAP: Stop netdev queue upon reaching a full ptr_ring
> >
> >
> > so what happens if you only apply patches 1-3?
> >
>
> The netdev queue of vhost_net would be stopped by tun_net_xmit but would
> never be woken again.
So this breaks bisect. Don't split patches like this, please.
> >> [...]
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-22 22:15 [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Simon Schippers
` (8 preceding siblings ...)
2025-09-23 14:55 ` [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop Michael S. Tsirkin
@ 2025-09-24 7:18 ` Michael S. Tsirkin
2025-09-24 7:33 ` Jason Wang
9 siblings, 1 reply; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 7:18 UTC (permalink / raw)
To: Simon Schippers
Cc: willemdebruijn.kernel, jasowang, eperezma, stephen, leiyang,
netdev, linux-kernel, virtualization, kvm
On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> This patch series deals with TUN, TAP and vhost_net which drop incoming
> SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> patch series, the associated netdev queue is stopped before this happens.
> This allows the connected qdisc to function correctly as reported by [1]
> and improves application-layer performance, see our paper [2]. Meanwhile
> the theoretical performance differs only slightly:
About this whole approach.
What if userspace is not consuming packets?
Won't the watchdog warnings appear?
Is it safe to allow userspace to block a tx queue
indefinitely?
--
MST
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-24 7:18 ` Michael S. Tsirkin
@ 2025-09-24 7:33 ` Jason Wang
2025-09-24 7:41 ` Michael S. Tsirkin
0 siblings, 1 reply; 37+ messages in thread
From: Jason Wang @ 2025-09-24 7:33 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: Simon Schippers, willemdebruijn.kernel, eperezma, stephen,
leiyang, netdev, linux-kernel, virtualization, kvm
On Wed, Sep 24, 2025 at 3:18 PM Michael S. Tsirkin <mst@redhat.com> wrote:
>
> On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> > This patch series deals with TUN, TAP and vhost_net which drop incoming
> > SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> > patch series, the associated netdev queue is stopped before this happens.
> > This allows the connected qdisc to function correctly as reported by [1]
> > and improves application-layer performance, see our paper [2]. Meanwhile
> > the theoretical performance differs only slightly:
>
>
> About this whole approach.
> What if userspace is not consuming packets?
> Won't the watchdog warnings appear?
> Is it safe to allow userspace to block a tx queue
> indefinitely?
I think it's safe as it's a userspace device, there's no way to
guarantee the userspace can process the packet in time (so no watchdog
for TUN).
Thanks
>
> --
> MST
>
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-24 7:33 ` Jason Wang
@ 2025-09-24 7:41 ` Michael S. Tsirkin
2025-09-24 8:08 ` Jason Wang
0 siblings, 1 reply; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 7:41 UTC (permalink / raw)
To: Jason Wang
Cc: Simon Schippers, willemdebruijn.kernel, eperezma, stephen,
leiyang, netdev, linux-kernel, virtualization, kvm
On Wed, Sep 24, 2025 at 03:33:08PM +0800, Jason Wang wrote:
> On Wed, Sep 24, 2025 at 3:18 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> >
> > On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> > > This patch series deals with TUN, TAP and vhost_net which drop incoming
> > > SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> > > patch series, the associated netdev queue is stopped before this happens.
> > > This allows the connected qdisc to function correctly as reported by [1]
> > > and improves application-layer performance, see our paper [2]. Meanwhile
> > > the theoretical performance differs only slightly:
> >
> >
> > About this whole approach.
> > What if userspace is not consuming packets?
> > Won't the watchdog warnings appear?
> > Is it safe to allow userspace to block a tx queue
> > indefinitely?
>
> I think it's safe as it's a userspace device, there's no way to
> guarantee the userspace can process the packet in time (so no watchdog
> for TUN).
>
> Thanks
Hmm. Anyway, I guess if we ever want to enable timeout for tun,
we can worry about it then. Does not need to block this patchset.
> >
> > --
> > MST
> >
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-24 7:41 ` Michael S. Tsirkin
@ 2025-09-24 8:08 ` Jason Wang
2025-09-24 8:09 ` Michael S. Tsirkin
0 siblings, 1 reply; 37+ messages in thread
From: Jason Wang @ 2025-09-24 8:08 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: Simon Schippers, willemdebruijn.kernel, eperezma, stephen,
leiyang, netdev, linux-kernel, virtualization, kvm
On Wed, Sep 24, 2025 at 3:42 PM Michael S. Tsirkin <mst@redhat.com> wrote:
>
> On Wed, Sep 24, 2025 at 03:33:08PM +0800, Jason Wang wrote:
> > On Wed, Sep 24, 2025 at 3:18 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> > >
> > > On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> > > > This patch series deals with TUN, TAP and vhost_net which drop incoming
> > > > SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> > > > patch series, the associated netdev queue is stopped before this happens.
> > > > This allows the connected qdisc to function correctly as reported by [1]
> > > > and improves application-layer performance, see our paper [2]. Meanwhile
> > > > the theoretical performance differs only slightly:
> > >
> > >
> > > About this whole approach.
> > > What if userspace is not consuming packets?
> > > Won't the watchdog warnings appear?
> > > Is it safe to allow userspace to block a tx queue
> > > indefinitely?
> >
> > I think it's safe as it's a userspace device, there's no way to
> > guarantee the userspace can process the packet in time (so no watchdog
> > for TUN).
> >
> > Thanks
>
> Hmm. Anyway, I guess if we ever want to enable timeout for tun,
> we can worry about it then.
The problem is that the skb is not freed until userspace calls
recvmsg(), so it would be tricky to implement a watchdog. (Or if we
could do that, we could do BQL as well.)
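
For reference, the BQL accounting mentioned above would look roughly
like this if it were ever added (hypothetical sketch, not part of this
series; txq_index and the exact placement are illustrative):

	/* producer side (tun_net_xmit), after the skb is queued in the ring: */
	netdev_tx_sent_queue(netdev_get_tx_queue(dev, txq_index), skb->len);

	/* consumer side, once userspace has actually read the skb
	 * (tun_ring_recv()/recvmsg()):
	 */
	netdev_tx_completed_queue(netdev_get_tx_queue(dev, txq_index), 1, skb->len);

Since the completion credit could only be given at recvmsg() time, both
a watchdog and BQL would depend on userspace behaviour, which is the
point being made here.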
> Does not need to block this patchset.
Yes.
Thanks
>
> > >
> > > --
> > > MST
> > >
>
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-24 8:08 ` Jason Wang
@ 2025-09-24 8:09 ` Michael S. Tsirkin
2025-09-24 8:30 ` Jason Wang
0 siblings, 1 reply; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 8:09 UTC (permalink / raw)
To: Jason Wang
Cc: Simon Schippers, willemdebruijn.kernel, eperezma, stephen,
leiyang, netdev, linux-kernel, virtualization, kvm
On Wed, Sep 24, 2025 at 04:08:33PM +0800, Jason Wang wrote:
> On Wed, Sep 24, 2025 at 3:42 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> >
> > On Wed, Sep 24, 2025 at 03:33:08PM +0800, Jason Wang wrote:
> > > On Wed, Sep 24, 2025 at 3:18 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> > > >
> > > > On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> > > > > This patch series deals with TUN, TAP and vhost_net which drop incoming
> > > > > SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> > > > > patch series, the associated netdev queue is stopped before this happens.
> > > > > This allows the connected qdisc to function correctly as reported by [1]
> > > > > and improves application-layer performance, see our paper [2]. Meanwhile
> > > > > the theoretical performance differs only slightly:
> > > >
> > > >
> > > > About this whole approach.
> > > > What if userspace is not consuming packets?
> > > > Won't the watchdog warnings appear?
> > > > Is it safe to allow userspace to block a tx queue
> > > > indefinitely?
> > >
> > > I think it's safe as it's a userspace device, there's no way to
> > > guarantee the userspace can process the packet in time (so no watchdog
> > > for TUN).
> > >
> > > Thanks
> >
> > Hmm. Anyway, I guess if we ever want to enable timeout for tun,
> > we can worry about it then.
>
> The problem is that the skb is freed until userspace calls recvmsg(),
> so it would be tricky to implement a watchdog. (Or if we can do, we
> can do BQL as well).
I thought the watchdog generally watches queues not individual skbs?
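
(For context: dev_watchdog() in net/sched/sch_generic.c walks the
device's TX queues and, roughly paraphrased, flags a queue as hung with
a per-queue check like the one below; the timed_out flag is
illustrative:)

	if (netif_xmit_stopped(txq) &&
	    time_after(jiffies,
		       READ_ONCE(txq->trans_start) + dev->watchdog_timeo))
		timed_out = true;	/* -> ndo_tx_timeout() is invoked */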
> > Does not need to block this patchset.
>
> Yes.
>
> Thanks
>
> >
> > > >
> > > > --
> > > > MST
> > > >
> >
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-24 8:09 ` Michael S. Tsirkin
@ 2025-09-24 8:30 ` Jason Wang
2025-09-24 8:54 ` Michael S. Tsirkin
0 siblings, 1 reply; 37+ messages in thread
From: Jason Wang @ 2025-09-24 8:30 UTC (permalink / raw)
To: Michael S. Tsirkin
Cc: Simon Schippers, willemdebruijn.kernel, eperezma, stephen,
leiyang, netdev, linux-kernel, virtualization, kvm
On Wed, Sep 24, 2025 at 4:10 PM Michael S. Tsirkin <mst@redhat.com> wrote:
>
> On Wed, Sep 24, 2025 at 04:08:33PM +0800, Jason Wang wrote:
> > On Wed, Sep 24, 2025 at 3:42 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> > >
> > > On Wed, Sep 24, 2025 at 03:33:08PM +0800, Jason Wang wrote:
> > > > On Wed, Sep 24, 2025 at 3:18 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> > > > >
> > > > > On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> > > > > > This patch series deals with TUN, TAP and vhost_net which drop incoming
> > > > > > SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> > > > > > patch series, the associated netdev queue is stopped before this happens.
> > > > > > This allows the connected qdisc to function correctly as reported by [1]
> > > > > > and improves application-layer performance, see our paper [2]. Meanwhile
> > > > > > the theoretical performance differs only slightly:
> > > > >
> > > > >
> > > > > About this whole approach.
> > > > > What if userspace is not consuming packets?
> > > > > Won't the watchdog warnings appear?
> > > > > Is it safe to allow userspace to block a tx queue
> > > > > indefinitely?
> > > >
> > > > I think it's safe as it's a userspace device, there's no way to
> > > > guarantee the userspace can process the packet in time (so no watchdog
> > > > for TUN).
> > > >
> > > > Thanks
> > >
> > > Hmm. Anyway, I guess if we ever want to enable timeout for tun,
> > > we can worry about it then.
> >
> > The problem is that the skb is not freed until userspace calls
> > recvmsg(), so it would be tricky to implement a watchdog. (Or if we
> > could do that, we could do BQL as well.)
>
> I thought the watchdog generally watches queues not individual skbs?
Yes, but only if ndo_tx_timeout is implemented.
I mean it would be tricky if we want to implement ndo_tx_timeout since
we can't choose a good timeout.
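
If such a hook were ever wired up for TUN, it would look roughly like
the sketch below; only the ndo_tx_timeout/watchdog_timeo plumbing is
real kernel API, while tun_tx_timeout and the chosen timeout are
hypothetical:

static void tun_tx_timeout(struct net_device *dev, unsigned int txqueue)
{
	/* hypothetical policy: just report; could also wake or flush */
	netdev_warn(dev, "tx queue %u appears stuck\n", txqueue);
}

	/* in the net_device_ops: */
	.ndo_tx_timeout	= tun_tx_timeout,

	/* in device setup -- the core watchdog timer is only armed when
	 * ndo_tx_timeout is provided; watchdog_timeo sets the timeout:
	 */
	dev->watchdog_timeo = 5 * HZ;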
Thanks
^ permalink raw reply [flat|nested] 37+ messages in thread
* Re: [PATCH net-next v5 0/8] TUN/TAP & vhost_net: netdev queue flow control to avoid ptr_ring tail drop
2025-09-24 8:30 ` Jason Wang
@ 2025-09-24 8:54 ` Michael S. Tsirkin
0 siblings, 0 replies; 37+ messages in thread
From: Michael S. Tsirkin @ 2025-09-24 8:54 UTC (permalink / raw)
To: Jason Wang
Cc: Simon Schippers, willemdebruijn.kernel, eperezma, stephen,
leiyang, netdev, linux-kernel, virtualization, kvm
On Wed, Sep 24, 2025 at 04:30:45PM +0800, Jason Wang wrote:
> On Wed, Sep 24, 2025 at 4:10 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> >
> > On Wed, Sep 24, 2025 at 04:08:33PM +0800, Jason Wang wrote:
> > > On Wed, Sep 24, 2025 at 3:42 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> > > >
> > > > On Wed, Sep 24, 2025 at 03:33:08PM +0800, Jason Wang wrote:
> > > > > On Wed, Sep 24, 2025 at 3:18 PM Michael S. Tsirkin <mst@redhat.com> wrote:
> > > > > >
> > > > > > On Tue, Sep 23, 2025 at 12:15:45AM +0200, Simon Schippers wrote:
> > > > > > > This patch series deals with TUN, TAP and vhost_net which drop incoming
> > > > > > > SKBs whenever their internal ptr_ring buffer is full. Instead, with this
> > > > > > > patch series, the associated netdev queue is stopped before this happens.
> > > > > > > This allows the connected qdisc to function correctly as reported by [1]
> > > > > > > and improves application-layer performance, see our paper [2]. Meanwhile
> > > > > > > the theoretical performance differs only slightly:
> > > > > >
> > > > > >
> > > > > > About this whole approach.
> > > > > > What if userspace is not consuming packets?
> > > > > > Won't the watchdog warnings appear?
> > > > > > Is it safe to allow userspace to block a tx queue
> > > > > > indefinitely?
> > > > >
> > > > > I think it's safe as it's a userspace device, there's no way to
> > > > > guarantee the userspace can process the packet in time (so no watchdog
> > > > > for TUN).
> > > > >
> > > > > Thanks
> > > >
> > > > Hmm. Anyway, I guess if we ever want to enable timeout for tun,
> > > > we can worry about it then.
> > >
> > > The problem is that the skb is not freed until userspace calls
> > > recvmsg(), so it would be tricky to implement a watchdog. (Or if we
> > > could do that, we could do BQL as well.)
> >
> > I thought the watchdog generally watches queues not individual skbs?
>
> Yes, but only if ndo_tx_timeout is implemented.
>
> I mean it would be tricky if we want to implement ndo_tx_timeout since
> we can't choose a good timeout.
>
> Thanks
Userspace could conceivably supply that. Anyway, we can worry
about it when we need it.
--
MST
^ permalink raw reply [flat|nested] 37+ messages in thread