Netdev List
 help / color / mirror / Atom feed
* Re: [net-next PATCH 1/2] add iovnl netlink support
From: David Miller @ 2010-04-22 10:56 UTC (permalink / raw)
  To: arnd; +Cc: scofeldm, netdev, chrisw
In-Reply-To: <201004221253.11290.arnd@arndb.de>

From: Arnd Bergmann <arnd@arndb.de>
Date: Thu, 22 Apr 2010 12:53:11 +0200

> On Thursday 22 April 2010, David Miller wrote:
>> From: Scott Feldman <scofeldm@cisco.com>
>> Date: Mon, 19 Apr 2010 12:18:07 -0700
>> 
>> > +     if (tb[IOV_ATTR_VF_IFNAME])
>> > +             vf_dev = dev_get_by_name(&init_net,
>> > +                     nla_data(tb[IOV_ATTR_VF_IFNAME]));
>> 
>> It's probably best to check this for NULL and notify
>> the user with an error in that case (don't forget to
>> put 'dev' in that error path :-)
> 
> Since you brought up that hunk: shouldn't the namespace better
> be current->nsproxy->net_ns instead of init_ns? If the sender
> is confined in a separate network namespace, I would expect
> that it should be able to modify devices in its own namespace
> but none that are in the root namespace.

Yes, the namespace needs to be handled better.

But reading other parts of the discussion it seems that
IOV_ATTR_VF_IFNAME and some other bits will likely be
removed in the initial implementation of this stuff.

^ permalink raw reply

* Re: [net-next PATCH 1/2] add iovnl netlink support
From: Arnd Bergmann @ 2010-04-22 10:53 UTC (permalink / raw)
  To: David Miller; +Cc: scofeldm, netdev, chrisw
In-Reply-To: <20100421.235236.69366636.davem@davemloft.net>

On Thursday 22 April 2010, David Miller wrote:
> From: Scott Feldman <scofeldm@cisco.com>
> Date: Mon, 19 Apr 2010 12:18:07 -0700
> 
> > +     if (tb[IOV_ATTR_VF_IFNAME])
> > +             vf_dev = dev_get_by_name(&init_net,
> > +                     nla_data(tb[IOV_ATTR_VF_IFNAME]));
> 
> It's probably best to check this for NULL and notify
> the user with an error in that case (don't forget to
> put 'dev' in that error path :-)

Since you brought up that hunk: shouldn't the namespace better
be current->nsproxy->net_ns instead of init_ns? If the sender
is confined in a separate network namespace, I would expect
that it should be able to modify devices in its own namespace
but none that are in the root namespace.

	Arnd

^ permalink raw reply

* Re: [patch] rtnetlink: potential ERR_PTR dereference
From: Patrick McHardy @ 2010-04-22 10:35 UTC (permalink / raw)
  To: Dan Carpenter
  Cc: netdev, Eric Dumazet, Eric W. Biederman, Mitch Williams,
	David S. Miller, kernel-janitors
In-Reply-To: <20100422095327.GP29647@bicker>

Dan Carpenter wrote:
> In the original code, if rtnl_create_link() returned an ERR_PTR then that
> would get passed to rtnl_configure_link() which dereferences it.
> 
> Signed-off-by: Dan Carpenter <error27@gmail.com>
> ---
> Found by a static checker, and compile tested only.  :/

Looks fine to me.

Acked-by: Patrick McHardy <kaber@trash.net>

^ permalink raw reply

* [patch] rdma: potential ERR_PTR dereference
From: Dan Carpenter @ 2010-04-22  9:55 UTC (permalink / raw)
  To: Andy Grover; +Cc: David S. Miller, rds-devel, netdev, kernel-janitors

In the original code, the "goto out" calls "rdma_destroy_id(cm_id);"
That isn't needed here and would cause problems because "cm_id" is an 
ERR_PTR.  The new code just returns directly.

Signed-off-by: Dan Carpenter <error27@gmail.com>

diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c
index 9ece910..7b15508 100644
--- a/net/rds/rdma_transport.c
+++ b/net/rds/rdma_transport.c
@@ -134,7 +134,7 @@ static int __init rds_rdma_listen_init(void)
 		ret = PTR_ERR(cm_id);
 		printk(KERN_ERR "RDS/RDMA: failed to setup listener, "
 		       "rdma_create_id() returned %d\n", ret);
-		goto out;
+		return ret;
 	}
 
 	sin.sin_family = AF_INET,

^ permalink raw reply related

* [patch] rtnetlink: potential ERR_PTR dereference
From: Dan Carpenter @ 2010-04-22  9:53 UTC (permalink / raw)
  To: netdev
  Cc: Eric Dumazet, Patrick McHardy, Eric W. Biederman, Mitch Williams,
	David S. Miller, kernel-janitors

In the original code, if rtnl_create_link() returned an ERR_PTR then that
would get passed to rtnl_configure_link() which dereferences it.

Signed-off-by: Dan Carpenter <error27@gmail.com>
---
Found by a static checker, and compile tested only.  :/

diff --git a/net/core/rtnetlink.c b/net/core/rtnetlink.c
index 4568120..fe776c9 100644
--- a/net/core/rtnetlink.c
+++ b/net/core/rtnetlink.c
@@ -1270,10 +1270,11 @@ replay:
 			err = ops->newlink(net, dev, tb, data);
 		else
 			err = register_netdevice(dev);
-		if (err < 0 && !IS_ERR(dev)) {
+
+		if (err < 0 && !IS_ERR(dev))
 			free_netdev(dev);
+		if (err < 0)
 			goto out;
-		}
 
 		err = rtnl_configure_link(dev, ifm);
 		if (err < 0)

^ permalink raw reply related

* [patch] bluetooth: handle l2cap_create_connless_pdu() errors
From: Dan Carpenter @ 2010-04-22  9:52 UTC (permalink / raw)
  To: Marcel Holtmann
  Cc: David S. Miller, Gustavo F. Padovan, Andrei Emeltchenko,
	linux-bluetooth-u79uwXL29TY76Z2rM5mHXA,
	netdev-u79uwXL29TY76Z2rM5mHXA,
	kernel-janitors-u79uwXL29TY76Z2rM5mHXA

l2cap_create_connless_pdu() can sometimes return ERR_PTR(-ENOMEM) or
ERR_PTR(-EFAULT).

Signed-off-by: Dan Carpenter <error27-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>

diff --git a/net/bluetooth/l2cap.c b/net/bluetooth/l2cap.c
index 99d68c3..9753b69 100644
--- a/net/bluetooth/l2cap.c
+++ b/net/bluetooth/l2cap.c
@@ -1626,7 +1626,10 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
 	/* Connectionless channel */
 	if (sk->sk_type == SOCK_DGRAM) {
 		skb = l2cap_create_connless_pdu(sk, msg, len);
-		err = l2cap_do_send(sk, skb);
+		if (IS_ERR(skb))
+			err = PTR_ERR(skb);
+		else
+			err = l2cap_do_send(sk, skb);
 		goto done;
 	}
 

^ permalink raw reply related

* [patch] wimax: checking ERR_PTR vs null
From: Dan Carpenter @ 2010-04-22  9:50 UTC (permalink / raw)
  To: netdev
  Cc: Inaky Perez-Gonzalez, Alexey Dobriyan, Paulius Zaleckas,
	David S. Miller, wimax, kernel-janitors

stch_skb is allocated with wimax_gnl_re_state_change_alloc().  That
function returns ERR_PTRs on failure and doesn't return NULL.

Signed-off-by: Dan Carpenter <error27@gmail.com>

diff --git a/net/wimax/stack.c b/net/wimax/stack.c
index 1ed65db..62b1a66 100644
--- a/net/wimax/stack.c
+++ b/net/wimax/stack.c
@@ -315,7 +315,7 @@ void __wimax_state_change(struct wimax_dev *wimax_dev, enum wimax_st new_state)
 		BUG();
 	}
 	__wimax_state_set(wimax_dev, new_state);
-	if (stch_skb)
+	if (!IS_ERR(stch_skb))
 		wimax_gnl_re_state_change_send(wimax_dev, stch_skb, header);
 out:
 	d_fnend(3, dev, "(wimax_dev %p new_state %u [old %u]) = void\n",

^ permalink raw reply related

* Re: [RFC][PATCH v3 2/3] Provides multiple submits and asynchronous notifications.
From: Michael S. Tsirkin @ 2010-04-22  9:49 UTC (permalink / raw)
  To: xiaohui.xin; +Cc: arnd, netdev, kvm, linux-kernel, mingo, davem, jdike
In-Reply-To: <1271925436-4861-1-git-send-email-xiaohui.xin@intel.com>

On Thu, Apr 22, 2010 at 04:37:16PM +0800, xiaohui.xin@intel.com wrote:
> From: Xin Xiaohui <xiaohui.xin@intel.com>
> 
> The vhost-net backend now only supports synchronous send/recv
> operations. The patch provides multiple submits and asynchronous
> notifications. This is needed for zero-copy case.
> 
> Signed-off-by: Xin Xiaohui <xiaohui.xin@intel.com>
> ---
> 
> Michael,
> 
> >Can't vhost supply a kiocb completion callback that will handle the list?
> 
> Yes, thanks. And with it I also remove the vq->receiver finally.
> 
> Thanks
> Xiaohui

Nice progress. I commented on some minor issues below.
Thanks!

>  drivers/vhost/net.c   |  227 +++++++++++++++++++++++++++++++++++++++++++++++--
>  drivers/vhost/vhost.c |  115 ++++++++++++++-----------
>  drivers/vhost/vhost.h |   14 +++
>  3 files changed, 301 insertions(+), 55 deletions(-)
> 
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index 22d5fef..4a70f66 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -17,11 +17,13 @@
>  #include <linux/workqueue.h>
>  #include <linux/rcupdate.h>
>  #include <linux/file.h>
> +#include <linux/aio.h>
>  
>  #include <linux/net.h>
>  #include <linux/if_packet.h>
>  #include <linux/if_arp.h>
>  #include <linux/if_tun.h>
> +#include <linux/mpassthru.h>
>  #include <net/sock.h>
>  
> @@ -47,6 +49,7 @@ struct vhost_net {
>  	struct vhost_dev dev;
>  	struct vhost_virtqueue vqs[VHOST_NET_VQ_MAX];
>  	struct vhost_poll poll[VHOST_NET_VQ_MAX];
> +	struct kmem_cache       *cache;
>  	/* Tells us whether we are polling a socket for TX.
>  	 * We only do this when socket buffer fills up.
>  	 * Protected by tx vq lock. */
> @@ -91,11 +94,132 @@ static void tx_poll_start(struct vhost_net *net, struct socket *sock)
>  	net->tx_poll_state = VHOST_NET_POLL_STARTED;
>  }
>  
> +struct kiocb *notify_dequeue(struct vhost_virtqueue *vq)
> +{
> +	struct kiocb *iocb = NULL;
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&vq->notify_lock, flags);
> +	if (!list_empty(&vq->notifier)) {
> +		iocb = list_first_entry(&vq->notifier,
> +				struct kiocb, ki_list);
> +		list_del(&iocb->ki_list);
> +	}
> +	spin_unlock_irqrestore(&vq->notify_lock, flags);
> +	return iocb;
> +}
> +
> +static void handle_iocb(struct kiocb *iocb)
> +{
> +	struct vhost_virtqueue *vq = iocb->private;
> +	unsigned long flags;
> +
> +        spin_lock_irqsave(&vq->notify_lock, flags);
> +        list_add_tail(&iocb->ki_list, &vq->notifier);
> +        spin_unlock_irqrestore(&vq->notify_lock, flags);
> +}
> +

checkpatch.pl does not complain about the above?

> +static void handle_async_rx_events_notify(struct vhost_net *net,
> +					 struct vhost_virtqueue *vq,
> +					 struct socket *sock)

continuation lines should start to the right of (.

> +{
> +	struct kiocb *iocb = NULL;
> +	struct vhost_log *vq_log = NULL;
> +	int rx_total_len = 0;
> +	unsigned int head, log, in, out;
> +	int size;
> +
> +	if (vq->link_state != VHOST_VQ_LINK_ASYNC)
> +		return;
> +
> +	if (sock->sk->sk_data_ready)
> +		sock->sk->sk_data_ready(sock->sk, 0);
> +
> +	vq_log = unlikely(vhost_has_feature(
> +				&net->dev, VHOST_F_LOG_ALL)) ? vq->log : NULL;

split the above line at ?, continuation being to the left of ( looks
ugly.

> +	while ((iocb = notify_dequeue(vq)) != NULL) {
> +		vhost_add_used_and_signal(&net->dev, vq,
> +				iocb->ki_pos, iocb->ki_nbytes);
> +		log = (int)(iocb->ki_user_data >> 32);

how about we always do the recompute step, and not encode
the log bit in ki_user_data?

> +		size = iocb->ki_nbytes;
> +		head = iocb->ki_pos;
> +		rx_total_len += iocb->ki_nbytes;
> +
> +		if (iocb->ki_dtor)
> +			iocb->ki_dtor(iocb);
> +		kmem_cache_free(net->cache, iocb);
> +
> +		/* when log is enabled, recomputing the log info is needed,
> +		 * since these buffers are in async queue, and may not get
> +		 * the log info before.
> +		 */
> +		if (unlikely(vq_log)) {
> +			if (!log)
> +				__vhost_get_vq_desc(&net->dev, vq, vq->iov,
> +						    ARRAY_SIZE(vq->iov),
> +						    &out, &in, vq_log,
> +						    &log, head);
> +			vhost_log_write(vq, vq_log, log, size);
> +		}
> +		if (unlikely(rx_total_len >= VHOST_NET_WEIGHT)) {
> +			vhost_poll_queue(&vq->poll);
> +			break;
> +		}
> +	}
> +}
> +
> +static void handle_async_tx_events_notify(struct vhost_net *net,
> +					struct vhost_virtqueue *vq)
> +{
> +	struct kiocb *iocb = NULL;
> +	int tx_total_len = 0;
> +
> +	if (vq->link_state != VHOST_VQ_LINK_ASYNC)
> +		return;
> +
> +	while ((iocb = notify_dequeue(vq)) != NULL) {
> +		vhost_add_used_and_signal(&net->dev, vq,
> +				iocb->ki_pos, 0);
> +		tx_total_len += iocb->ki_nbytes;
> +
> +		if (iocb->ki_dtor)
> +			iocb->ki_dtor(iocb);
> +
> +		kmem_cache_free(net->cache, iocb);
> +		if (unlikely(tx_total_len >= VHOST_NET_WEIGHT)) {
> +			vhost_poll_queue(&vq->poll);
> +			break;
> +		}
> +	}
> +}
> +
> +static struct kiocb *create_iocb(struct vhost_net *net,
> +				 struct vhost_virtqueue *vq,
> +				 unsigned head, unsigned log)
> +{
> +	struct kiocb *iocb = NULL;
> +
> +	if (vq->link_state != VHOST_VQ_LINK_ASYNC)
> +		return NULL; 
> +	iocb = kmem_cache_zalloc(net->cache, GFP_KERNEL);
> +	if (!iocb)
> +		return NULL;
> +	iocb->private = vq;
> +	iocb->ki_pos = head;
> +	iocb->ki_dtor = handle_iocb;
> +	if (vq == &net->dev.vqs[VHOST_NET_VQ_RX]) {
> +		iocb->ki_user_data = ((unsigned long)log << 32 | vq->num);
> +		iocb->ki_iovec = vq->hdr;
> +	}
> +	return iocb;
> +}
> +				 
>  /* Expects to be always run from workqueue - which acts as
>   * read-size critical section for our kind of RCU. */
>  static void handle_tx(struct vhost_net *net)
>  {
>  	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_TX];
> +	struct kiocb *iocb = NULL;

do we need to init this?

>  	unsigned head, out, in, s;
>  	struct msghdr msg = {
>  		.msg_name = NULL,
> @@ -124,6 +248,8 @@ static void handle_tx(struct vhost_net *net)
>  		tx_poll_stop(net);
>  	hdr_size = vq->hdr_size;
>  
> +	handle_async_tx_events_notify(net, vq);
> +
>  	for (;;) {
>  		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
>  					 ARRAY_SIZE(vq->iov),
> @@ -151,6 +277,11 @@ static void handle_tx(struct vhost_net *net)
>  		/* Skip header. TODO: support TSO. */
>  		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, out);
>  		msg.msg_iovlen = out;
> +
> +		iocb = create_iocb(net, vq, head, 0);

For sync case, we can save some cycles by using iocb = NULL.

> +		if (vq->link_state == VHOST_VQ_LINK_ASYNC && !iocb)
> +			break;
> +
>  		len = iov_length(vq->iov, out);
>  		/* Sanity check */
>  		if (!len) {

Generally, I would like to reduce the number of places
where we do if (link_state == XXX) in code.
It should be possible to do this by splitting common code
out into functions.

> @@ -160,12 +291,18 @@ static void handle_tx(struct vhost_net *net)
>  			break;
>  		}
>  		/* TODO: Check specific error and bomb out unless ENOBUFS? */
> -		err = sock->ops->sendmsg(NULL, sock, &msg, len);
> +		err = sock->ops->sendmsg(iocb, sock, &msg, len);
>  		if (unlikely(err < 0)) {
> +			if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> +				kmem_cache_free(net->cache, iocb);
>  			vhost_discard_vq_desc(vq);
>  			tx_poll_start(net, sock);
>  			break;
>  		}
> +
> +		if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> +			continue;
> +
>  		if (err != len)
>  			pr_err("Truncated TX packet: "
>  			       " len %d != %zd\n", err, len);
> @@ -177,6 +314,8 @@ static void handle_tx(struct vhost_net *net)
>  		}
>  	}
>  
> +	handle_async_tx_events_notify(net, vq);
> +
>  	mutex_unlock(&vq->mutex);
>  	unuse_mm(net->dev.mm);
>  }
> @@ -186,6 +325,7 @@ static void handle_tx(struct vhost_net *net)
>  static void handle_rx(struct vhost_net *net)
>  {
>  	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_RX];
> +	struct kiocb *iocb = NULL;
>  	unsigned head, out, in, log, s;
>  	struct vhost_log *vq_log;
>  	struct msghdr msg = {
> @@ -206,7 +346,8 @@ static void handle_rx(struct vhost_net *net)
>  	int err;
>  	size_t hdr_size;
>  	struct socket *sock = rcu_dereference(vq->private_data);
> -	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
> +	if (!sock || (skb_queue_empty(&sock->sk->sk_receive_queue) &&
> +			vq->link_state == VHOST_VQ_LINK_SYNC))
>  		return;
>  
>  	use_mm(net->dev.mm);
> @@ -214,9 +355,17 @@ static void handle_rx(struct vhost_net *net)
>  	vhost_disable_notify(vq);
>  	hdr_size = vq->hdr_size;
>  
> +	/* In async cases, when write log is enabled, in case the submitted
> +	 * buffers did not get log info before the log enabling, so we'd
> +	 * better recompute the log info when needed. We do this in
> +	 * handle_async_rx_events_notify().
> +	 */
> +
>  	vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
>  		vq->log : NULL;
>  
> +	handle_async_rx_events_notify(net, vq, sock);
> +
>  	for (;;) {
>  		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
>  					 ARRAY_SIZE(vq->iov),
> @@ -245,6 +394,11 @@ static void handle_rx(struct vhost_net *net)
>  		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
>  		msg.msg_iovlen = in;
>  		len = iov_length(vq->iov, in);
> +
> +		iocb = create_iocb(net, vq, head, log);
> +		if (vq->link_state == VHOST_VQ_LINK_ASYNC && !iocb)
> +			break;
> +
>  		/* Sanity check */
>  		if (!len) {
>  			vq_err(vq, "Unexpected header len for RX: "
> @@ -252,13 +406,20 @@ static void handle_rx(struct vhost_net *net)
>  			       iov_length(vq->hdr, s), hdr_size);
>  			break;
>  		}
> -		err = sock->ops->recvmsg(NULL, sock, &msg,
> +
> +		err = sock->ops->recvmsg(iocb, sock, &msg,
>  					 len, MSG_DONTWAIT | MSG_TRUNC);
>  		/* TODO: Check specific error and bomb out unless EAGAIN? */
>  		if (err < 0) {
> +			if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> +				kmem_cache_free(net->cache, iocb);
>  			vhost_discard_vq_desc(vq);
>  			break;
>  		}
> +
> +		if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> +			continue;
> +
>  		/* TODO: Should check and handle checksum. */
>  		if (err > len) {
>  			pr_err("Discarded truncated rx packet: "
> @@ -284,10 +445,13 @@ static void handle_rx(struct vhost_net *net)
>  		}
>  	}
>  
> +	handle_async_rx_events_notify(net, vq, sock);
> +
>  	mutex_unlock(&vq->mutex);
>  	unuse_mm(net->dev.mm);
>  }
>  
> +

don't do this

>  static void handle_tx_kick(struct work_struct *work)
>  {
>  	struct vhost_virtqueue *vq;
> @@ -338,6 +502,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
>  	vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT);
>  	vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN);
>  	n->tx_poll_state = VHOST_NET_POLL_DISABLED;
> +	n->cache = NULL;
>  	return 0;
>  }
>  
> @@ -398,6 +563,18 @@ static void vhost_net_flush(struct vhost_net *n)
>  	vhost_net_flush_vq(n, VHOST_NET_VQ_RX);
>  }
>  
> +static void vhost_async_cleanup(struct vhost_net *n)
> +{
> +	/* clean the notifier */
> +	struct vhost_virtqueue *vq = &n->dev.vqs[VHOST_NET_VQ_RX];
> +	struct kiocb *iocb = NULL;
> +	if (n->cache) {
> +		while ((iocb = notify_dequeue(vq)) != NULL)
> +			kmem_cache_free(n->cache, iocb);
> +		kmem_cache_destroy(n->cache);
> +	}
> +}
> +
>  static int vhost_net_release(struct inode *inode, struct file *f)
>  {
>  	struct vhost_net *n = f->private_data;
> @@ -414,6 +591,7 @@ static int vhost_net_release(struct inode *inode, struct file *f)
>  	/* We do an extra flush before freeing memory,
>  	 * since jobs can re-queue themselves. */
>  	vhost_net_flush(n);
> +	vhost_async_cleanup(n);
>  	kfree(n);
>  	return 0;
>  }
> @@ -462,7 +640,19 @@ static struct socket *get_tun_socket(int fd)
>  	return sock;
>  }
>  
> -static struct socket *get_socket(int fd)
> +static struct socket *get_mp_socket(int fd)
> +{
> +	struct file *file = fget(fd);
> +	struct socket *sock;
> +	if (!file)
> +		return ERR_PTR(-EBADF);
> +	sock = mp_get_socket(file);
> +	if (IS_ERR(sock))
> +		fput(file);
> +	return sock;
> +}
> +
> +static struct socket *get_socket(struct vhost_virtqueue *vq, int fd)
>  {
>  	struct socket *sock;
>  	if (fd == -1)
> @@ -473,9 +663,30 @@ static struct socket *get_socket(int fd)
>  	sock = get_tun_socket(fd);
>  	if (!IS_ERR(sock))
>  		return sock;
> +	sock = get_mp_socket(fd);
> +	if (!IS_ERR(sock)) {
> +		vq->link_state = VHOST_VQ_LINK_ASYNC;
> +		return sock;
> +	}
>  	return ERR_PTR(-ENOTSOCK);
>  }
>  
> +static void vhost_init_link_state(struct vhost_net *n, int index)
> +{
> +	struct vhost_virtqueue *vq = n->vqs + index;
> +
> +	WARN_ON(!mutex_is_locked(&vq->mutex));
> +	if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> +		INIT_LIST_HEAD(&vq->notifier);
> +		spin_lock_init(&vq->notify_lock);
> +		if (!n->cache) {
> +			n->cache = kmem_cache_create("vhost_kiocb",
> +					sizeof(struct kiocb), 0,
> +					SLAB_HWCACHE_ALIGN, NULL);
> +		}
> +	}
> +}
> +
>  static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
>  {
>  	struct socket *sock, *oldsock;
> @@ -493,12 +704,15 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
>  	}
>  	vq = n->vqs + index;
>  	mutex_lock(&vq->mutex);
> -	sock = get_socket(fd);
> +	vq->link_state = VHOST_VQ_LINK_SYNC;
> +	sock = get_socket(vq, fd);
>  	if (IS_ERR(sock)) {
>  		r = PTR_ERR(sock);
>  		goto err;
>  	}
>  
> +	vhost_init_link_state(n, index);
> +

I think we should just teach get_socket to return link_state
in addition to the socket pointer, and pass the returned value to
vhost_init_link_state.

>  	/* start polling new socket */
>  	oldsock = vq->private_data;
>  	if (sock == oldsock)
> @@ -507,8 +721,8 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
>  	vhost_net_disable_vq(n, vq);
>  	rcu_assign_pointer(vq->private_data, sock);
>  	vhost_net_enable_vq(n, vq);
> -	mutex_unlock(&vq->mutex);
>  done:
> +	mutex_unlock(&vq->mutex);
>  	mutex_unlock(&n->dev.mutex);
>  	if (oldsock) {
>  		vhost_net_flush_vq(n, index);

why the change above? Are you sure it's safe?  Need to be careful here:
doing everything under vq and dev mutex is much simpler.
If this change is required, need to review locking carefully
to make sure we are not introducing races.

> @@ -516,6 +730,7 @@ done:
>  	}
>  	return r;
>  err:
> +	mutex_unlock(&vq->mutex);
>  	mutex_unlock(&n->dev.mutex);
>  	return r;
>  }
> diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
> index 97233d5..53dab80 100644
> --- a/drivers/vhost/vhost.c
> +++ b/drivers/vhost/vhost.c
> @@ -715,66 +715,21 @@ static unsigned get_indirect(struct vhost_dev *dev, struct vhost_virtqueue *vq,
>  	return 0;
>  }
>  
> -/* This looks in the virtqueue and for the first available buffer, and converts
> - * it to an iovec for convenient access.  Since descriptors consist of some
> - * number of output then some number of input descriptors, it's actually two
> - * iovecs, but we pack them into one and note how many of each there were.
> - *
> - * This function returns the descriptor number found, or vq->num (which
> - * is never a valid descriptor number) if none was found. */
> -unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
> +unsigned __vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
>  			   struct iovec iov[], unsigned int iov_size,
>  			   unsigned int *out_num, unsigned int *in_num,
> -			   struct vhost_log *log, unsigned int *log_num)
> +			   struct vhost_log *log, unsigned int *log_num,
> +			   unsigned int head)
>  {
>  	struct vring_desc desc;
> -	unsigned int i, head, found = 0;
> -	u16 last_avail_idx;
> +	unsigned int i = head, found = 0;
>  	int ret;
>  
> -	/* Check it isn't doing very strange things with descriptor numbers. */
> -	last_avail_idx = vq->last_avail_idx;
> -	if (get_user(vq->avail_idx, &vq->avail->idx)) {
> -		vq_err(vq, "Failed to access avail idx at %p\n",
> -		       &vq->avail->idx);
> -		return vq->num;
> -	}
> -
> -	if ((u16)(vq->avail_idx - last_avail_idx) > vq->num) {
> -		vq_err(vq, "Guest moved used index from %u to %u",
> -		       last_avail_idx, vq->avail_idx);
> -		return vq->num;
> -	}
> -
> -	/* If there's nothing new since last we looked, return invalid. */
> -	if (vq->avail_idx == last_avail_idx)
> -		return vq->num;
> -
> -	/* Only get avail ring entries after they have been exposed by guest. */
> -	rmb();
> -
> -	/* Grab the next descriptor number they're advertising, and increment
> -	 * the index we've seen. */
> -	if (get_user(head, &vq->avail->ring[last_avail_idx % vq->num])) {
> -		vq_err(vq, "Failed to read head: idx %d address %p\n",
> -		       last_avail_idx,
> -		       &vq->avail->ring[last_avail_idx % vq->num]);
> -		return vq->num;
> -	}
> -
> -	/* If their number is silly, that's an error. */
> -	if (head >= vq->num) {
> -		vq_err(vq, "Guest says index %u > %u is available",
> -		       head, vq->num);
> -		return vq->num;
> -	}
> -
>  	/* When we start there are none of either input nor output. */
>  	*out_num = *in_num = 0;
>  	if (unlikely(log))
>  		*log_num = 0;
>  
> -	i = head;
>  	do {
>  		unsigned iov_count = *in_num + *out_num;
>  		if (i >= vq->num) {
> @@ -833,8 +788,70 @@ unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
>  			*out_num += ret;
>  		}
>  	} while ((i = next_desc(&desc)) != -1);
> +	return head;
> +}
> +
> +/* This looks in the virtqueue and for the first available buffer, and converts
> + * it to an iovec for convenient access.  Since descriptors consist of some
> + * number of output then some number of input descriptors, it's actually two
> + * iovecs, but we pack them into one and note how many of each there were.
> + *
> + * This function returns the descriptor number found, or vq->num (which
> + * is never a valid descriptor number) if none was found. */
> +unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
> +			   struct iovec iov[], unsigned int iov_size,
> +			   unsigned int *out_num, unsigned int *in_num,
> +			   struct vhost_log *log, unsigned int *log_num)
> +{
> +	struct vring_desc desc;
> +	unsigned int i, head, found = 0;
> +	u16 last_avail_idx;
> +	unsigned int ret;
> +
> +	/* Check it isn't doing very strange things with descriptor numbers. */
> +	last_avail_idx = vq->last_avail_idx;
> +	if (get_user(vq->avail_idx, &vq->avail->idx)) {
> +		vq_err(vq, "Failed to access avail idx at %p\n",
> +		       &vq->avail->idx);
> +		return vq->num;
> +	}
> +
> +	if ((u16)(vq->avail_idx - last_avail_idx) > vq->num) {
> +		vq_err(vq, "Guest moved used index from %u to %u",
> +		       last_avail_idx, vq->avail_idx);
> +		return vq->num;
> +	}
> +
> +	/* If there's nothing new since last we looked, return invalid. */
> +	if (vq->avail_idx == last_avail_idx)
> +		return vq->num;
> +
> +	/* Only get avail ring entries after they have been exposed by guest. */
> +	rmb();
> +
> +	/* Grab the next descriptor number they're advertising, and increment
> +	 * the index we've seen. */
> +	if (get_user(head, &vq->avail->ring[last_avail_idx % vq->num])) {
> +		vq_err(vq, "Failed to read head: idx %d address %p\n",
> +		       last_avail_idx,
> +		       &vq->avail->ring[last_avail_idx % vq->num]);
> +		return vq->num;
> +	}
> +
> +	/* If their number is silly, that's an error. */
> +	if (head >= vq->num) {
> +		vq_err(vq, "Guest says index %u > %u is available",
> +		       head, vq->num);
> +		return vq->num;
> +	}
> +
> +	ret = __vhost_get_vq_desc(dev, vq, iov, iov_size,
> +				  out_num, in_num,
> +				  log, log_num, head);
>  
>  	/* On success, increment avail index. */
> +	if (ret == vq->num)
> +		return ret;
>  	vq->last_avail_idx++;
>  	return head;
>  }
> diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
> index d1f0453..8b95df8 100644
> --- a/drivers/vhost/vhost.h
> +++ b/drivers/vhost/vhost.h
> @@ -43,6 +43,11 @@ struct vhost_log {
>  	u64 len;
>  };
>  
> +enum vhost_vq_link_state {
> +	VHOST_VQ_LINK_SYNC = 	0,
> +	VHOST_VQ_LINK_ASYNC = 	1,

don't try to align values to the right, just put a single space each
side of =.

> +};
> +
>  /* The virtqueue structure describes a queue attached to a device. */
>  struct vhost_virtqueue {
>  	struct vhost_dev *dev;
> @@ -96,6 +101,10 @@ struct vhost_virtqueue {
>  	/* Log write descriptors */
>  	void __user *log_base;
>  	struct vhost_log log[VHOST_NET_MAX_SG];
> +	/*Differiate async socket for 0-copy from normal*/

spaces after /* and before */.

> +	enum vhost_vq_link_state link_state;
> +	struct list_head notifier;
> +	spinlock_t notify_lock;
>  };
>  
>  struct vhost_dev {
> @@ -122,6 +131,11 @@ unsigned vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue *,
>  			   struct iovec iov[], unsigned int iov_count,
>  			   unsigned int *out_num, unsigned int *in_num,
>  			   struct vhost_log *log, unsigned int *log_num);
> +unsigned __vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue *,
> +			   struct iovec iov[], unsigned int iov_count,
> +			   unsigned int *out_num, unsigned int *in_num,
> +			   struct vhost_log *log, unsigned int *log_num,
> +			   unsigned int head);
>  void vhost_discard_vq_desc(struct vhost_virtqueue *);
>  
>  int vhost_add_used(struct vhost_virtqueue *, unsigned int head, int len);
> -- 
> 1.5.4.4

^ permalink raw reply

* [patch] wimax: wimax_msg_alloc() returns ERR_PTR not null
From: Dan Carpenter @ 2010-04-22  9:46 UTC (permalink / raw)
  To: Inaky Perez-Gonzalez
  Cc: linux-wimax, André Goddard Rosa, wimax, netdev,
	kernel-janitors

wimax_msg_alloc() returns an ERR_PTR and not null.  I changed it to test
for ERR_PTR instead of null.  I also added a check in front of the
kfree() because kfree() can handle null but not ERR_PTR.

Signed-off-by: Dan Carpenter <error27@gmail.com>

diff --git a/drivers/net/wimax/i2400m/rx.c b/drivers/net/wimax/i2400m/rx.c
index fa2e11e..05e2247 100644
--- a/drivers/net/wimax/i2400m/rx.c
+++ b/drivers/net/wimax/i2400m/rx.c
@@ -300,17 +300,16 @@ void i2400m_rx_ctl_ack(struct i2400m *i2400m,
 		d_printf(1, dev, "Huh? waiter for command reply cancelled\n");
 		goto error_waiter_cancelled;
 	}
-	if (ack_skb == NULL) {
+	if (IS_ERR(ack_skb))
 		dev_err(dev, "CMD/GET/SET ack: cannot allocate SKB\n");
-		i2400m->ack_skb = ERR_PTR(-ENOMEM);
-	} else
-		i2400m->ack_skb = ack_skb;
+	i2400m->ack_skb = ack_skb;
 	spin_unlock_irqrestore(&i2400m->rx_lock, flags);
 	complete(&i2400m->msg_completion);
 	return;
 
 error_waiter_cancelled:
-	kfree_skb(ack_skb);
+	if (!IS_ERR(ack_skb))
+		kfree_skb(ack_skb);
 error_no_waiter:
 	spin_unlock_irqrestore(&i2400m->rx_lock, flags);
 	return;

^ permalink raw reply related

* Re: [PATCH v5] net: batch skb dequeueing from softnet input_pkt_queue
From: David Miller @ 2010-04-22  9:43 UTC (permalink / raw)
  To: xiaosuo; +Cc: hadi, therbert, eric.dumazet, netdev
In-Reply-To: <1271927357-2973-1-git-send-email-xiaosuo@gmail.com>

From: Changli Gao <xiaosuo@gmail.com>
Date: Thu, 22 Apr 2010 17:09:17 +0800

> +	unsigned int		input_pkt_queue_len;
> +	struct sk_buff		*input_pkt_queue_head;
> +	struct sk_buff		**input_pkt_queue_tailp;
> +

Please do not ignore Stephen Hemminger's feedback.

We already have enough odd SKB queue implementations, we
do not need yet another one in a core location.  This makes
it harder and harder to eventually convert sk_buff to use
"struct list_head".

Instead, use "struct sk_buff_head" and the lockless accessors
(__skb_insert, etc.) and initializer (__skb_queue_head_init).

^ permalink raw reply

* [PATCH v5] net: batch skb dequeueing from softnet input_pkt_queue
From: Changli Gao @ 2010-04-22  9:09 UTC (permalink / raw)
  To: David S. Miller; +Cc: jamal, Tom Herbert, Eric Dumazet, netdev, Changli Gao

batch skb dequeueing from softnet input_pkt_queue

batch skb dequeueing from softnet input_pkt_queue to reduce potential lock
contention when RPS is enabled. input_pkt_queue is reimplemented as a single
linked list (FIFO) to keep enqueueing and dequeueing as fast as posible, and
input_pkt_queue_lock is moved into RPS section to reduce 4 bytes on 32bits
machine.

Note: input_pkt_queue_len doesn't been decreased until process_backlog()
returns.

Signed-off-by: Changli Gao <xiaosuo@gmail.com>
----
 include/linux/netdevice.h |   12 ++++-
 net/core/dev.c            |   99 +++++++++++++++++++++++++++++++++-------------
 2 files changed, 82 insertions(+), 29 deletions(-)
diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h
index 3c5ed5f..58abdd5 100644
--- a/include/linux/netdevice.h
+++ b/include/linux/netdevice.h
@@ -1387,6 +1387,7 @@ struct softnet_data {
 	struct Qdisc		*output_queue;
 	struct list_head	poll_list;
 	struct sk_buff		*completion_queue;
+	struct sk_buff		*process_queue;
 
 #ifdef CONFIG_RPS
 	struct softnet_data	*rps_ipi_list;
@@ -1396,15 +1397,20 @@ struct softnet_data {
 	struct softnet_data	*rps_ipi_next;
 	unsigned int		cpu;
 	unsigned int		input_queue_head;
+	spinlock_t		input_pkt_queue_lock;
 #endif
-	struct sk_buff_head	input_pkt_queue;
+	unsigned int		input_pkt_queue_len;
+	struct sk_buff		*input_pkt_queue_head;
+	struct sk_buff		**input_pkt_queue_tailp;
+
 	struct napi_struct	backlog;
 };
 
-static inline void input_queue_head_incr(struct softnet_data *sd)
+static inline void input_queue_head_add(struct softnet_data *sd,
+					unsigned int len)
 {
 #ifdef CONFIG_RPS
-	sd->input_queue_head++;
+	sd->input_queue_head += len;
 #endif
 }
 
diff --git a/net/core/dev.c b/net/core/dev.c
index e904c47..f37c223 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -211,14 +211,14 @@ static inline struct hlist_head *dev_index_hash(struct net *net, int ifindex)
 static inline void rps_lock(struct softnet_data *sd)
 {
 #ifdef CONFIG_RPS
-	spin_lock(&sd->input_pkt_queue.lock);
+	spin_lock(&sd->input_pkt_queue_lock);
 #endif
 }
 
 static inline void rps_unlock(struct softnet_data *sd)
 {
 #ifdef CONFIG_RPS
-	spin_unlock(&sd->input_pkt_queue.lock);
+	spin_unlock(&sd->input_pkt_queue_lock);
 #endif
 }
 
@@ -2409,12 +2409,15 @@ static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
 	__get_cpu_var(netdev_rx_stat).total++;
 
 	rps_lock(sd);
-	if (sd->input_pkt_queue.qlen <= netdev_max_backlog) {
-		if (sd->input_pkt_queue.qlen) {
+	if (sd->input_pkt_queue_len <= netdev_max_backlog) {
+		if (sd->input_pkt_queue_len) {
 enqueue:
-			__skb_queue_tail(&sd->input_pkt_queue, skb);
+			skb->next = NULL;
+			*sd->input_pkt_queue_tailp = skb;
+			sd->input_pkt_queue_tailp = &skb->next;
+			sd->input_pkt_queue_len++;
 #ifdef CONFIG_RPS
-			*qtail = sd->input_queue_head + sd->input_pkt_queue.qlen;
+			*qtail = sd->input_queue_head + sd->input_pkt_queue_len;
 #endif
 			rps_unlock(sd);
 			local_irq_restore(flags);
@@ -2927,19 +2930,37 @@ EXPORT_SYMBOL(netif_receive_skb);
 /* Network device is going away, flush any packets still pending
  * Called with irqs disabled.
  */
-static void flush_backlog(void *arg)
+
+static struct sk_buff **__flush_backlog(struct softnet_data *sd,
+					struct sk_buff **pskb,
+					struct net_device *dev)
 {
-	struct net_device *dev = arg;
-	struct softnet_data *sd = &__get_cpu_var(softnet_data);
-	struct sk_buff *skb, *tmp;
+	struct sk_buff *skb;
 
-	rps_lock(sd);
-	skb_queue_walk_safe(&sd->input_pkt_queue, skb, tmp)
+	while (*pskb) {
+		skb = *pskb;
 		if (skb->dev == dev) {
-			__skb_unlink(skb, &sd->input_pkt_queue);
+			*pskb = skb->next;
 			kfree_skb(skb);
-			input_queue_head_incr(sd);
+			input_queue_head_add(sd, 1);
+			sd->input_pkt_queue_len--;
+		} else {
+			pskb = &skb->next;
 		}
+	}
+
+	return pskb;
+}
+
+static void flush_backlog(void *arg)
+{
+	struct softnet_data *sd = &__get_cpu_var(softnet_data);
+	struct sk_buff **tailp;
+
+	rps_lock(sd);
+	tailp = __flush_backlog(sd, &sd->input_pkt_queue_head, arg);
+	sd->input_pkt_queue_tailp = tailp;
+	__flush_backlog(sd, &sd->process_queue, arg);
 	rps_unlock(sd);
 }
 
@@ -3249,24 +3270,39 @@ static int process_backlog(struct napi_struct *napi, int quota)
 	struct softnet_data *sd = &__get_cpu_var(softnet_data);
 
 	napi->weight = weight_p;
+	local_irq_disable();
 	do {
 		struct sk_buff *skb;
 
-		local_irq_disable();
+		while (sd->process_queue) {
+			skb = sd->process_queue;
+			sd->process_queue = skb->next;
+			local_irq_enable();
+			__netif_receive_skb(skb);
+			if (++work >= quota) {
+				local_irq_disable();
+				rps_lock(sd);
+				goto out;
+			}
+			local_irq_disable();
+		}
+
 		rps_lock(sd);
-		skb = __skb_dequeue(&sd->input_pkt_queue);
-		if (!skb) {
+		if (sd->input_pkt_queue_head == NULL) {
 			__napi_complete(napi);
-			rps_unlock(sd);
-			local_irq_enable();
 			break;
 		}
-		input_queue_head_incr(sd);
+		sd->process_queue = sd->input_pkt_queue_head;
+		sd->input_pkt_queue_head = NULL;
+		sd->input_pkt_queue_tailp = &sd->input_pkt_queue_head;
 		rps_unlock(sd);
-		local_irq_enable();
+	} while (1);
 
-		__netif_receive_skb(skb);
-	} while (++work < quota);
+out:
+	sd->input_pkt_queue_len -= work;
+	input_queue_head_add(sd, work);
+	rps_unlock(sd);
+	local_irq_enable();
 
 	return work;
 }
@@ -5621,10 +5657,17 @@ static int dev_cpu_callback(struct notifier_block *nfb,
 	local_irq_enable();
 
 	/* Process offline CPU's input_pkt_queue */
-	while ((skb = __skb_dequeue(&oldsd->input_pkt_queue))) {
+	while ((skb = oldsd->input_pkt_queue_head)) {
+		oldsd->input_pkt_queue_head = skb->next;
+		netif_rx(skb);
+	}
+	while ((skb = oldsd->process_queue)) {
+		oldsd->process_queue = skb->next;
 		netif_rx(skb);
-		input_queue_head_incr(oldsd);
 	}
+	oldsd->input_pkt_queue_tailp = &oldsd->input_pkt_queue_head;
+	input_queue_head_add(oldsd, oldsd->input_pkt_queue_len);
+	oldsd->input_pkt_queue_len = 0;
 
 	return NOTIFY_OK;
 }
@@ -5842,11 +5885,15 @@ static int __init net_dev_init(void)
 	for_each_possible_cpu(i) {
 		struct softnet_data *sd = &per_cpu(softnet_data, i);
 
-		skb_queue_head_init(&sd->input_pkt_queue);
+		sd->input_pkt_queue_head = NULL;
+		sd->input_pkt_queue_tailp = &sd->input_pkt_queue_head;
+		sd->input_pkt_queue_len = 0;
+		sd->process_queue = NULL;
 		sd->completion_queue = NULL;
 		INIT_LIST_HEAD(&sd->poll_list);
 
 #ifdef CONFIG_RPS
+		spin_lock_init(&sd->input_pkt_queue_lock);
 		sd->csd.func = rps_trigger_softirq;
 		sd->csd.info = sd;
 		sd->csd.flags = 0;

^ permalink raw reply related

* Re: [RFC][PATCH v2 0/3] Provide a zero-copy method on KVM virtio-net.
From: Michael S. Tsirkin @ 2010-04-22  9:19 UTC (permalink / raw)
  To: Xin, Xiaohui
  Cc: netdev@vger.kernel.org, kvm@vger.kernel.org,
	linux-kernel@vger.kernel.org, mingo@elte.hu,
	jdike@linux.intel.com, davem@davemloft.net
In-Reply-To: <F2E9EB7348B8264F86B6AB8151CE2D79026FAE0BDB@shsmsx502.ccr.corp.intel.com>

On Thu, Apr 22, 2010 at 04:57:56PM +0800, Xin, Xiaohui wrote:
> Michael,
> 
> >Yes, I think this packet split mode probably maps well to mergeable buffer
> >support. Note that
> >1. Not all devices support large packets in this way, others might map
> >   to indirect buffers better
> 
> Do the indirect buffers accord to deal with the skb->frag_list?

We currently use skb->frags.

> >   So we have to figure out how migration is going to work
> Yes, different guest virtio-net driver may contain different features.
> Does the qemu migration work with different features supported by virtio-net
> driver now?

For now, you must have identical feature-sets for migration to work.
And long as we manage the buffers in software, we can always make
features match.

> >2. It's up to guest driver whether to enable features such as
> >   mergeable buffers and indirect buffers
> >   So we have to figure out how to notify guest which mode
> >   is optimal for a given device
> Yes. When a device is binded, the mp device may query the capabilities from driver.
> Actually, there is a structure now in mp device can do this, we can add some field
> to support more.
> 
> >3. We don't want to depend on jumbo frames for decent performance
> >   So we probably should support GSO/GRO
> GSO is for the tx side, right? I think driver can handle it itself.
> For GRO, I'm not sure it's easy or not. Basically, the mp device now
> we have support is doing what raw socket is doing. The packets are not going to host stack.

See commit bfd5f4a3d605e0f6054df0b59fe0907ff7e696d3
(it doesn't currently work with vhost net, but that's
 a separate story).

> -- 
> MST

^ permalink raw reply

* RE: [RFC][PATCH v2 0/3] Provide a zero-copy method on KVM virtio-net.
From: Xin, Xiaohui @ 2010-04-22  8:57 UTC (permalink / raw)
  To: Michael S. Tsirkin
  Cc: netdev@vger.kernel.org, kvm@vger.kernel.org,
	linux-kernel@vger.kernel.org, mingo@elte.hu,
	jdike@linux.intel.com, davem@davemloft.net
In-Reply-To: <20100421083507.GA30855@redhat.com>

Michael,

>Yes, I think this packet split mode probably maps well to mergeable buffer
>support. Note that
>1. Not all devices support large packets in this way, others might map
>   to indirect buffers better

Do the indirect buffers accord to deal with the skb->frag_list?

>   So we have to figure out how migration is going to work
Yes, different guest virtio-net driver may contain different features.
Does the qemu migration work with different features supported by virtio-net
driver now?

>2. It's up to guest driver whether to enable features such as
>   mergeable buffers and indirect buffers
>   So we have to figure out how to notify guest which mode
>   is optimal for a given device
Yes. When a device is binded, the mp device may query the capabilities from driver.
Actually, there is a structure now in mp device can do this, we can add some field
to support more.

>3. We don't want to depend on jumbo frames for decent performance
>   So we probably should support GSO/GRO
GSO is for the tx side, right? I think driver can handle it itself.
For GRO, I'm not sure it's easy or not. Basically, the mp device now
we have support is doing what raw socket is doing. The packets are not going to host stack.
-- 
MST

^ permalink raw reply

* Re:[RFC][PATCH v3 2/3] Provides multiple submits and asynchronous notifications.
From: xiaohui.xin @ 2010-04-22  8:37 UTC (permalink / raw)
  To: mst; +Cc: arnd, netdev, kvm, linux-kernel, mingo, davem, jdike, Xin Xiaohui
In-Reply-To: <20100415090324.GA15135@redhat.com>

From: Xin Xiaohui <xiaohui.xin@intel.com>

The vhost-net backend now only supports synchronous send/recv
operations. The patch provides multiple submits and asynchronous
notifications. This is needed for zero-copy case.

Signed-off-by: Xin Xiaohui <xiaohui.xin@intel.com>
---

Michael,

>Can't vhost supply a kiocb completion callback that will handle the list?

Yes, thanks. And with it I also remove the vq->receiver finally.

Thanks
Xiaohui

 drivers/vhost/net.c   |  227 +++++++++++++++++++++++++++++++++++++++++++++++--
 drivers/vhost/vhost.c |  115 ++++++++++++++-----------
 drivers/vhost/vhost.h |   14 +++
 3 files changed, 301 insertions(+), 55 deletions(-)

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 22d5fef..4a70f66 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -17,11 +17,13 @@
 #include <linux/workqueue.h>
 #include <linux/rcupdate.h>
 #include <linux/file.h>
+#include <linux/aio.h>
 
 #include <linux/net.h>
 #include <linux/if_packet.h>
 #include <linux/if_arp.h>
 #include <linux/if_tun.h>
+#include <linux/mpassthru.h>
 
 #include <net/sock.h>
 
@@ -47,6 +49,7 @@ struct vhost_net {
 	struct vhost_dev dev;
 	struct vhost_virtqueue vqs[VHOST_NET_VQ_MAX];
 	struct vhost_poll poll[VHOST_NET_VQ_MAX];
+	struct kmem_cache       *cache;
 	/* Tells us whether we are polling a socket for TX.
 	 * We only do this when socket buffer fills up.
 	 * Protected by tx vq lock. */
@@ -91,11 +94,132 @@ static void tx_poll_start(struct vhost_net *net, struct socket *sock)
 	net->tx_poll_state = VHOST_NET_POLL_STARTED;
 }
 
+struct kiocb *notify_dequeue(struct vhost_virtqueue *vq)
+{
+	struct kiocb *iocb = NULL;
+	unsigned long flags;
+
+	spin_lock_irqsave(&vq->notify_lock, flags);
+	if (!list_empty(&vq->notifier)) {
+		iocb = list_first_entry(&vq->notifier,
+				struct kiocb, ki_list);
+		list_del(&iocb->ki_list);
+	}
+	spin_unlock_irqrestore(&vq->notify_lock, flags);
+	return iocb;
+}
+
+static void handle_iocb(struct kiocb *iocb)
+{
+	struct vhost_virtqueue *vq = iocb->private;
+	unsigned long flags;
+
+        spin_lock_irqsave(&vq->notify_lock, flags);
+        list_add_tail(&iocb->ki_list, &vq->notifier);
+        spin_unlock_irqrestore(&vq->notify_lock, flags);
+}
+
+static void handle_async_rx_events_notify(struct vhost_net *net,
+					 struct vhost_virtqueue *vq,
+					 struct socket *sock)
+{
+	struct kiocb *iocb = NULL;
+	struct vhost_log *vq_log = NULL;
+	int rx_total_len = 0;
+	unsigned int head, log, in, out;
+	int size;
+
+	if (vq->link_state != VHOST_VQ_LINK_ASYNC)
+		return;
+
+	if (sock->sk->sk_data_ready)
+		sock->sk->sk_data_ready(sock->sk, 0);
+
+	vq_log = unlikely(vhost_has_feature(
+				&net->dev, VHOST_F_LOG_ALL)) ? vq->log : NULL;
+	while ((iocb = notify_dequeue(vq)) != NULL) {
+		vhost_add_used_and_signal(&net->dev, vq,
+				iocb->ki_pos, iocb->ki_nbytes);
+		log = (int)(iocb->ki_user_data >> 32);
+		size = iocb->ki_nbytes;
+		head = iocb->ki_pos;
+		rx_total_len += iocb->ki_nbytes;
+
+		if (iocb->ki_dtor)
+			iocb->ki_dtor(iocb);
+		kmem_cache_free(net->cache, iocb);
+
+		/* when log is enabled, recomputing the log info is needed,
+		 * since these buffers are in async queue, and may not get
+		 * the log info before.
+		 */
+		if (unlikely(vq_log)) {
+			if (!log)
+				__vhost_get_vq_desc(&net->dev, vq, vq->iov,
+						    ARRAY_SIZE(vq->iov),
+						    &out, &in, vq_log,
+						    &log, head);
+			vhost_log_write(vq, vq_log, log, size);
+		}
+		if (unlikely(rx_total_len >= VHOST_NET_WEIGHT)) {
+			vhost_poll_queue(&vq->poll);
+			break;
+		}
+	}
+}
+
+static void handle_async_tx_events_notify(struct vhost_net *net,
+					struct vhost_virtqueue *vq)
+{
+	struct kiocb *iocb = NULL;
+	int tx_total_len = 0;
+
+	if (vq->link_state != VHOST_VQ_LINK_ASYNC)
+		return;
+
+	while ((iocb = notify_dequeue(vq)) != NULL) {
+		vhost_add_used_and_signal(&net->dev, vq,
+				iocb->ki_pos, 0);
+		tx_total_len += iocb->ki_nbytes;
+
+		if (iocb->ki_dtor)
+			iocb->ki_dtor(iocb);
+
+		kmem_cache_free(net->cache, iocb);
+		if (unlikely(tx_total_len >= VHOST_NET_WEIGHT)) {
+			vhost_poll_queue(&vq->poll);
+			break;
+		}
+	}
+}
+
+static struct kiocb *create_iocb(struct vhost_net *net,
+				 struct vhost_virtqueue *vq,
+				 unsigned head, unsigned log)
+{
+	struct kiocb *iocb = NULL;
+
+	if (vq->link_state != VHOST_VQ_LINK_ASYNC)
+		return NULL; 
+	iocb = kmem_cache_zalloc(net->cache, GFP_KERNEL);
+	if (!iocb)
+		return NULL;
+	iocb->private = vq;
+	iocb->ki_pos = head;
+	iocb->ki_dtor = handle_iocb;
+	if (vq == &net->dev.vqs[VHOST_NET_VQ_RX]) {
+		iocb->ki_user_data = ((unsigned long)log << 32 | vq->num);
+		iocb->ki_iovec = vq->hdr;
+	}
+	return iocb;
+}
+				 
 /* Expects to be always run from workqueue - which acts as
  * read-size critical section for our kind of RCU. */
 static void handle_tx(struct vhost_net *net)
 {
 	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_TX];
+	struct kiocb *iocb = NULL;
 	unsigned head, out, in, s;
 	struct msghdr msg = {
 		.msg_name = NULL,
@@ -124,6 +248,8 @@ static void handle_tx(struct vhost_net *net)
 		tx_poll_stop(net);
 	hdr_size = vq->hdr_size;
 
+	handle_async_tx_events_notify(net, vq);
+
 	for (;;) {
 		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
 					 ARRAY_SIZE(vq->iov),
@@ -151,6 +277,11 @@ static void handle_tx(struct vhost_net *net)
 		/* Skip header. TODO: support TSO. */
 		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, out);
 		msg.msg_iovlen = out;
+
+		iocb = create_iocb(net, vq, head, 0);
+		if (vq->link_state == VHOST_VQ_LINK_ASYNC && !iocb)
+			break;
+
 		len = iov_length(vq->iov, out);
 		/* Sanity check */
 		if (!len) {
@@ -160,12 +291,18 @@ static void handle_tx(struct vhost_net *net)
 			break;
 		}
 		/* TODO: Check specific error and bomb out unless ENOBUFS? */
-		err = sock->ops->sendmsg(NULL, sock, &msg, len);
+		err = sock->ops->sendmsg(iocb, sock, &msg, len);
 		if (unlikely(err < 0)) {
+			if (vq->link_state == VHOST_VQ_LINK_ASYNC)
+				kmem_cache_free(net->cache, iocb);
 			vhost_discard_vq_desc(vq);
 			tx_poll_start(net, sock);
 			break;
 		}
+
+		if (vq->link_state == VHOST_VQ_LINK_ASYNC)
+			continue;
+
 		if (err != len)
 			pr_err("Truncated TX packet: "
 			       " len %d != %zd\n", err, len);
@@ -177,6 +314,8 @@ static void handle_tx(struct vhost_net *net)
 		}
 	}
 
+	handle_async_tx_events_notify(net, vq);
+
 	mutex_unlock(&vq->mutex);
 	unuse_mm(net->dev.mm);
 }
@@ -186,6 +325,7 @@ static void handle_tx(struct vhost_net *net)
 static void handle_rx(struct vhost_net *net)
 {
 	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_RX];
+	struct kiocb *iocb = NULL;
 	unsigned head, out, in, log, s;
 	struct vhost_log *vq_log;
 	struct msghdr msg = {
@@ -206,7 +346,8 @@ static void handle_rx(struct vhost_net *net)
 	int err;
 	size_t hdr_size;
 	struct socket *sock = rcu_dereference(vq->private_data);
-	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
+	if (!sock || (skb_queue_empty(&sock->sk->sk_receive_queue) &&
+			vq->link_state == VHOST_VQ_LINK_SYNC))
 		return;
 
 	use_mm(net->dev.mm);
@@ -214,9 +355,17 @@ static void handle_rx(struct vhost_net *net)
 	vhost_disable_notify(vq);
 	hdr_size = vq->hdr_size;
 
+	/* In async cases, when write log is enabled, in case the submitted
+	 * buffers did not get log info before the log enabling, so we'd
+	 * better recompute the log info when needed. We do this in
+	 * handle_async_rx_events_notify().
+	 */
+
 	vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
 		vq->log : NULL;
 
+	handle_async_rx_events_notify(net, vq, sock);
+
 	for (;;) {
 		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
 					 ARRAY_SIZE(vq->iov),
@@ -245,6 +394,11 @@ static void handle_rx(struct vhost_net *net)
 		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
 		msg.msg_iovlen = in;
 		len = iov_length(vq->iov, in);
+
+		iocb = create_iocb(net, vq, head, log);
+		if (vq->link_state == VHOST_VQ_LINK_ASYNC && !iocb)
+			break;
+
 		/* Sanity check */
 		if (!len) {
 			vq_err(vq, "Unexpected header len for RX: "
@@ -252,13 +406,20 @@ static void handle_rx(struct vhost_net *net)
 			       iov_length(vq->hdr, s), hdr_size);
 			break;
 		}
-		err = sock->ops->recvmsg(NULL, sock, &msg,
+
+		err = sock->ops->recvmsg(iocb, sock, &msg,
 					 len, MSG_DONTWAIT | MSG_TRUNC);
 		/* TODO: Check specific error and bomb out unless EAGAIN? */
 		if (err < 0) {
+			if (vq->link_state == VHOST_VQ_LINK_ASYNC)
+				kmem_cache_free(net->cache, iocb);
 			vhost_discard_vq_desc(vq);
 			break;
 		}
+
+		if (vq->link_state == VHOST_VQ_LINK_ASYNC)
+			continue;
+
 		/* TODO: Should check and handle checksum. */
 		if (err > len) {
 			pr_err("Discarded truncated rx packet: "
@@ -284,10 +445,13 @@ static void handle_rx(struct vhost_net *net)
 		}
 	}
 
+	handle_async_rx_events_notify(net, vq, sock);
+
 	mutex_unlock(&vq->mutex);
 	unuse_mm(net->dev.mm);
 }
 
+
 static void handle_tx_kick(struct work_struct *work)
 {
 	struct vhost_virtqueue *vq;
@@ -338,6 +502,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
 	vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT);
 	vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN);
 	n->tx_poll_state = VHOST_NET_POLL_DISABLED;
+	n->cache = NULL;
 	return 0;
 }
 
@@ -398,6 +563,18 @@ static void vhost_net_flush(struct vhost_net *n)
 	vhost_net_flush_vq(n, VHOST_NET_VQ_RX);
 }
 
+static void vhost_async_cleanup(struct vhost_net *n)
+{
+	/* clean the notifier */
+	struct vhost_virtqueue *vq = &n->dev.vqs[VHOST_NET_VQ_RX];
+	struct kiocb *iocb = NULL;
+	if (n->cache) {
+		while ((iocb = notify_dequeue(vq)) != NULL)
+			kmem_cache_free(n->cache, iocb);
+		kmem_cache_destroy(n->cache);
+	}
+}
+
 static int vhost_net_release(struct inode *inode, struct file *f)
 {
 	struct vhost_net *n = f->private_data;
@@ -414,6 +591,7 @@ static int vhost_net_release(struct inode *inode, struct file *f)
 	/* We do an extra flush before freeing memory,
 	 * since jobs can re-queue themselves. */
 	vhost_net_flush(n);
+	vhost_async_cleanup(n);
 	kfree(n);
 	return 0;
 }
@@ -462,7 +640,19 @@ static struct socket *get_tun_socket(int fd)
 	return sock;
 }
 
-static struct socket *get_socket(int fd)
+static struct socket *get_mp_socket(int fd)
+{
+	struct file *file = fget(fd);
+	struct socket *sock;
+	if (!file)
+		return ERR_PTR(-EBADF);
+	sock = mp_get_socket(file);
+	if (IS_ERR(sock))
+		fput(file);
+	return sock;
+}
+
+static struct socket *get_socket(struct vhost_virtqueue *vq, int fd)
 {
 	struct socket *sock;
 	if (fd == -1)
@@ -473,9 +663,30 @@ static struct socket *get_socket(int fd)
 	sock = get_tun_socket(fd);
 	if (!IS_ERR(sock))
 		return sock;
+	sock = get_mp_socket(fd);
+	if (!IS_ERR(sock)) {
+		vq->link_state = VHOST_VQ_LINK_ASYNC;
+		return sock;
+	}
 	return ERR_PTR(-ENOTSOCK);
 }
 
+static void vhost_init_link_state(struct vhost_net *n, int index)
+{
+	struct vhost_virtqueue *vq = n->vqs + index;
+
+	WARN_ON(!mutex_is_locked(&vq->mutex));
+	if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
+		INIT_LIST_HEAD(&vq->notifier);
+		spin_lock_init(&vq->notify_lock);
+		if (!n->cache) {
+			n->cache = kmem_cache_create("vhost_kiocb",
+					sizeof(struct kiocb), 0,
+					SLAB_HWCACHE_ALIGN, NULL);
+		}
+	}
+}
+
 static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
 {
 	struct socket *sock, *oldsock;
@@ -493,12 +704,15 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
 	}
 	vq = n->vqs + index;
 	mutex_lock(&vq->mutex);
-	sock = get_socket(fd);
+	vq->link_state = VHOST_VQ_LINK_SYNC;
+	sock = get_socket(vq, fd);
 	if (IS_ERR(sock)) {
 		r = PTR_ERR(sock);
 		goto err;
 	}
 
+	vhost_init_link_state(n, index);
+
 	/* start polling new socket */
 	oldsock = vq->private_data;
 	if (sock == oldsock)
@@ -507,8 +721,8 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
 	vhost_net_disable_vq(n, vq);
 	rcu_assign_pointer(vq->private_data, sock);
 	vhost_net_enable_vq(n, vq);
-	mutex_unlock(&vq->mutex);
 done:
+	mutex_unlock(&vq->mutex);
 	mutex_unlock(&n->dev.mutex);
 	if (oldsock) {
 		vhost_net_flush_vq(n, index);
@@ -516,6 +730,7 @@ done:
 	}
 	return r;
 err:
+	mutex_unlock(&vq->mutex);
 	mutex_unlock(&n->dev.mutex);
 	return r;
 }
diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
index 97233d5..53dab80 100644
--- a/drivers/vhost/vhost.c
+++ b/drivers/vhost/vhost.c
@@ -715,66 +715,21 @@ static unsigned get_indirect(struct vhost_dev *dev, struct vhost_virtqueue *vq,
 	return 0;
 }
 
-/* This looks in the virtqueue and for the first available buffer, and converts
- * it to an iovec for convenient access.  Since descriptors consist of some
- * number of output then some number of input descriptors, it's actually two
- * iovecs, but we pack them into one and note how many of each there were.
- *
- * This function returns the descriptor number found, or vq->num (which
- * is never a valid descriptor number) if none was found. */
-unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
+unsigned __vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
 			   struct iovec iov[], unsigned int iov_size,
 			   unsigned int *out_num, unsigned int *in_num,
-			   struct vhost_log *log, unsigned int *log_num)
+			   struct vhost_log *log, unsigned int *log_num,
+			   unsigned int head)
 {
 	struct vring_desc desc;
-	unsigned int i, head, found = 0;
-	u16 last_avail_idx;
+	unsigned int i = head, found = 0;
 	int ret;
 
-	/* Check it isn't doing very strange things with descriptor numbers. */
-	last_avail_idx = vq->last_avail_idx;
-	if (get_user(vq->avail_idx, &vq->avail->idx)) {
-		vq_err(vq, "Failed to access avail idx at %p\n",
-		       &vq->avail->idx);
-		return vq->num;
-	}
-
-	if ((u16)(vq->avail_idx - last_avail_idx) > vq->num) {
-		vq_err(vq, "Guest moved used index from %u to %u",
-		       last_avail_idx, vq->avail_idx);
-		return vq->num;
-	}
-
-	/* If there's nothing new since last we looked, return invalid. */
-	if (vq->avail_idx == last_avail_idx)
-		return vq->num;
-
-	/* Only get avail ring entries after they have been exposed by guest. */
-	rmb();
-
-	/* Grab the next descriptor number they're advertising, and increment
-	 * the index we've seen. */
-	if (get_user(head, &vq->avail->ring[last_avail_idx % vq->num])) {
-		vq_err(vq, "Failed to read head: idx %d address %p\n",
-		       last_avail_idx,
-		       &vq->avail->ring[last_avail_idx % vq->num]);
-		return vq->num;
-	}
-
-	/* If their number is silly, that's an error. */
-	if (head >= vq->num) {
-		vq_err(vq, "Guest says index %u > %u is available",
-		       head, vq->num);
-		return vq->num;
-	}
-
 	/* When we start there are none of either input nor output. */
 	*out_num = *in_num = 0;
 	if (unlikely(log))
 		*log_num = 0;
 
-	i = head;
 	do {
 		unsigned iov_count = *in_num + *out_num;
 		if (i >= vq->num) {
@@ -833,8 +788,70 @@ unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
 			*out_num += ret;
 		}
 	} while ((i = next_desc(&desc)) != -1);
+	return head;
+}
+
+/* This looks in the virtqueue and for the first available buffer, and converts
+ * it to an iovec for convenient access.  Since descriptors consist of some
+ * number of output then some number of input descriptors, it's actually two
+ * iovecs, but we pack them into one and note how many of each there were.
+ *
+ * This function returns the descriptor number found, or vq->num (which
+ * is never a valid descriptor number) if none was found. */
+unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
+			   struct iovec iov[], unsigned int iov_size,
+			   unsigned int *out_num, unsigned int *in_num,
+			   struct vhost_log *log, unsigned int *log_num)
+{
+	struct vring_desc desc;
+	unsigned int i, head, found = 0;
+	u16 last_avail_idx;
+	unsigned int ret;
+
+	/* Check it isn't doing very strange things with descriptor numbers. */
+	last_avail_idx = vq->last_avail_idx;
+	if (get_user(vq->avail_idx, &vq->avail->idx)) {
+		vq_err(vq, "Failed to access avail idx at %p\n",
+		       &vq->avail->idx);
+		return vq->num;
+	}
+
+	if ((u16)(vq->avail_idx - last_avail_idx) > vq->num) {
+		vq_err(vq, "Guest moved used index from %u to %u",
+		       last_avail_idx, vq->avail_idx);
+		return vq->num;
+	}
+
+	/* If there's nothing new since last we looked, return invalid. */
+	if (vq->avail_idx == last_avail_idx)
+		return vq->num;
+
+	/* Only get avail ring entries after they have been exposed by guest. */
+	rmb();
+
+	/* Grab the next descriptor number they're advertising, and increment
+	 * the index we've seen. */
+	if (get_user(head, &vq->avail->ring[last_avail_idx % vq->num])) {
+		vq_err(vq, "Failed to read head: idx %d address %p\n",
+		       last_avail_idx,
+		       &vq->avail->ring[last_avail_idx % vq->num]);
+		return vq->num;
+	}
+
+	/* If their number is silly, that's an error. */
+	if (head >= vq->num) {
+		vq_err(vq, "Guest says index %u > %u is available",
+		       head, vq->num);
+		return vq->num;
+	}
+
+	ret = __vhost_get_vq_desc(dev, vq, iov, iov_size,
+				  out_num, in_num,
+				  log, log_num, head);
 
 	/* On success, increment avail index. */
+	if (ret == vq->num)
+		return ret;
 	vq->last_avail_idx++;
 	return head;
 }
diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
index d1f0453..8b95df8 100644
--- a/drivers/vhost/vhost.h
+++ b/drivers/vhost/vhost.h
@@ -43,6 +43,11 @@ struct vhost_log {
 	u64 len;
 };
 
+enum vhost_vq_link_state {
+	VHOST_VQ_LINK_SYNC = 	0,
+	VHOST_VQ_LINK_ASYNC = 	1,
+};
+
 /* The virtqueue structure describes a queue attached to a device. */
 struct vhost_virtqueue {
 	struct vhost_dev *dev;
@@ -96,6 +101,10 @@ struct vhost_virtqueue {
 	/* Log write descriptors */
 	void __user *log_base;
 	struct vhost_log log[VHOST_NET_MAX_SG];
+	/*Differiate async socket for 0-copy from normal*/
+	enum vhost_vq_link_state link_state;
+	struct list_head notifier;
+	spinlock_t notify_lock;
 };
 
 struct vhost_dev {
@@ -122,6 +131,11 @@ unsigned vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue *,
 			   struct iovec iov[], unsigned int iov_count,
 			   unsigned int *out_num, unsigned int *in_num,
 			   struct vhost_log *log, unsigned int *log_num);
+unsigned __vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue *,
+			   struct iovec iov[], unsigned int iov_count,
+			   unsigned int *out_num, unsigned int *in_num,
+			   struct vhost_log *log, unsigned int *log_num,
+			   unsigned int head);
 void vhost_discard_vq_desc(struct vhost_virtqueue *);
 
 int vhost_add_used(struct vhost_virtqueue *, unsigned int head, int len);
-- 
1.5.4.4


^ permalink raw reply related

* RE: Re:[RFC][PATCH v3 1/3] A device for zero-copy based on KVM virtio-net.
From: Xin, Xiaohui @ 2010-04-22  8:29 UTC (permalink / raw)
  To: Xin, Xiaohui, mst@redhat.com
  Cc: arnd@arndb.de, netdev@vger.kernel.org, kvm@vger.kernel.org,
	linux-kernel@vger.kernel.org, mingo@elte.hu, davem@davemloft.net,
	jdike@linux.intel.com
In-Reply-To: <1271924658-4840-1-git-send-email-xiaohui.xin@intel.com>

Michael,
Sorry, it's based on the suggestion to hook an iocb completion callback
to handle the iocb list in vhost-net.

Thanks
Xiaohui

-----Original Message-----
From: Xin, Xiaohui
Sent: Thursday, April 22, 2010 4:24 PM
To: mst@redhat.com
Cc: arnd@arndb.de; netdev@vger.kernel.org; kvm@vger.kernel.org; linux-kernel@vger.kernel.org; mingo@elte.hu; davem@davemloft.net; jdike@linux.intel.com; Xin, Xiaohui
Subject: Re:[RFC][PATCH v3 1/3] A device for zero-copy based on KVM virtio-net.

From: Xin Xiaohui <xiaohui.xin@intel.com>

Add a device to utilize the vhost-net backend driver for
copy-less data transfer between guest FE and host NIC.
It pins the guest user space to the host memory and
provides proto_ops as sendmsg/recvmsg to vhost-net.

Signed-off-by: Xin Xiaohui <xiaohui.xin@intel.com>
Signed-off-by: Zhao Yu <yzhao81@gmail.com>
Reviewed-by: Jeff Dike <jdike@linux.intel.com>
---

Michael,
Thanks. I have updated the patch with your suggestion.
It looks much clean now. Please have a review.

Thanks
Xiaohui

 drivers/vhost/Kconfig     |   10 +
 drivers/vhost/Makefile    |    2 +
 drivers/vhost/mpassthru.c | 1239 +++++++++++++++++++++++++++++++++++++++++++++
 include/linux/mpassthru.h |   29 +
 4 files changed, 1280 insertions(+), 0 deletions(-)
 create mode 100644 drivers/vhost/mpassthru.c
 create mode 100644 include/linux/mpassthru.h

diff --git a/drivers/vhost/Kconfig b/drivers/vhost/Kconfig
index 9f409f4..91806b1 100644
--- a/drivers/vhost/Kconfig
+++ b/drivers/vhost/Kconfig
@@ -9,3 +9,13 @@ config VHOST_NET
          To compile this driver as a module, choose M here: the module will
          be called vhost_net.

+config MEDIATE_PASSTHRU
+       tristate "mediate passthru network driver (EXPERIMENTAL)"
+       depends on VHOST_NET
+       ---help---
+         zerocopy network I/O support, we call it as mediate passthru to
+         be distiguish with hardare passthru.
+
+         To compile this driver as a module, choose M here: the module will
+         be called mpassthru.
+
diff --git a/drivers/vhost/Makefile b/drivers/vhost/Makefile
index 72dd020..c18b9fc 100644
--- a/drivers/vhost/Makefile
+++ b/drivers/vhost/Makefile
@@ -1,2 +1,4 @@
 obj-$(CONFIG_VHOST_NET) += vhost_net.o
 vhost_net-y := vhost.o net.o
+
+obj-$(CONFIG_MEDIATE_PASSTHRU) += mpassthru.o
diff --git a/drivers/vhost/mpassthru.c b/drivers/vhost/mpassthru.c
new file mode 100644
index 0000000..cc99b14
--- /dev/null
+++ b/drivers/vhost/mpassthru.c
@@ -0,0 +1,1239 @@
+/*
+ *  MPASSTHRU - Mediate passthrough device.
+ *  Copyright (C) 2009 ZhaoYu, XinXiaohui, Dike, Jeffery G
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *  GNU General Public License for more details.
+ *
+ */
+
+#define DRV_NAME        "mpassthru"
+#define DRV_DESCRIPTION "Mediate passthru device driver"
+#define DRV_COPYRIGHT   "(C) 2009 ZhaoYu, XinXiaohui, Dike, Jeffery G"
+
+#include <linux/module.h>
+#include <linux/errno.h>
+#include <linux/kernel.h>
+#include <linux/major.h>
+#include <linux/slab.h>
+#include <linux/smp_lock.h>
+#include <linux/poll.h>
+#include <linux/fcntl.h>
+#include <linux/init.h>
+#include <linux/aio.h>
+
+#include <linux/skbuff.h>
+#include <linux/netdevice.h>
+#include <linux/etherdevice.h>
+#include <linux/miscdevice.h>
+#include <linux/ethtool.h>
+#include <linux/rtnetlink.h>
+#include <linux/if.h>
+#include <linux/if_arp.h>
+#include <linux/if_ether.h>
+#include <linux/crc32.h>
+#include <linux/nsproxy.h>
+#include <linux/uaccess.h>
+#include <linux/virtio_net.h>
+#include <linux/mpassthru.h>
+#include <net/net_namespace.h>
+#include <net/netns/generic.h>
+#include <net/rtnetlink.h>
+#include <net/sock.h>
+
+#include <asm/system.h>
+
+/* Uncomment to enable debugging */
+/* #define MPASSTHRU_DEBUG 1 */
+
+#ifdef MPASSTHRU_DEBUG
+static int debug;
+
+#define DBG  if (mp->debug) printk
+#define DBG1 if (debug == 2) printk
+#else
+#define DBG(a...)
+#define DBG1(a...)
+#endif
+
+#define COPY_THRESHOLD (L1_CACHE_BYTES * 4)
+#define COPY_HDR_LEN   (L1_CACHE_BYTES < 64 ? 64 : L1_CACHE_BYTES)
+
+struct frag {
+       u16     offset;
+       u16     size;
+};
+
+struct page_ctor {
+       struct list_head        readq;
+       int                     w_len;
+       int                     r_len;
+       spinlock_t              read_lock;
+       struct kmem_cache       *cache;
+       /* record the locked pages */
+       int                     lock_pages;
+       struct rlimit           o_rlim;
+       struct net_device       *dev;
+       struct mpassthru_port   port;
+};
+
+struct page_info {
+       struct list_head        list;
+       int                     header;
+       /* indicate the actual length of bytes
+        * send/recv in the user space buffers
+        */
+       int                     total;
+       int                     offset;
+       struct page             *pages[MAX_SKB_FRAGS+1];
+       struct skb_frag_struct  frag[MAX_SKB_FRAGS+1];
+       struct sk_buff          *skb;
+       struct page_ctor        *ctor;
+
+       /* The pointer relayed to skb, to indicate
+        * it's a user space allocated skb or kernel
+        */
+       struct skb_user_page    user;
+       struct skb_shared_info  ushinfo;
+
+#define INFO_READ                      0
+#define INFO_WRITE                     1
+       unsigned                flags;
+       unsigned                pnum;
+
+       /* It's meaningful for receive, means
+        * the max length allowed
+        */
+       size_t                  len;
+
+       /* The fields after that is for backend
+        * driver, now for vhost-net.
+        */
+
+       struct kiocb            *iocb;
+       unsigned int            desc_pos;
+       unsigned int            log;
+       struct iovec            hdr[MAX_SKB_FRAGS + 2];
+       struct iovec            iov[MAX_SKB_FRAGS + 2];
+};
+
+struct mp_struct {
+       struct mp_file          *mfile;
+       struct net_device       *dev;
+       struct page_ctor        *ctor;
+       struct socket           socket;
+
+#ifdef MPASSTHRU_DEBUG
+       int debug;
+#endif
+};
+
+struct mp_file {
+       atomic_t count;
+       struct mp_struct *mp;
+       struct net *net;
+};
+
+struct mp_sock {
+       struct sock             sk;
+       struct mp_struct        *mp;
+};
+
+static int mp_dev_change_flags(struct net_device *dev, unsigned flags)
+{
+       int ret = 0;
+
+       rtnl_lock();
+       ret = dev_change_flags(dev, flags);
+       rtnl_unlock();
+
+       if (ret < 0)
+               printk(KERN_ERR "failed to change dev state of %s", dev->name);
+
+       return ret;
+}
+
+/* The main function to allocate user space buffers */
+static struct skb_user_page *page_ctor(struct mpassthru_port *port,
+                                       struct sk_buff *skb, int npages)
+{
+       int i;
+       unsigned long flags;
+       struct page_ctor *ctor;
+       struct page_info *info = NULL;
+
+       ctor = container_of(port, struct page_ctor, port);
+
+       spin_lock_irqsave(&ctor->read_lock, flags);
+       if (!list_empty(&ctor->readq)) {
+               info = list_first_entry(&ctor->readq, struct page_info, list);
+               list_del(&info->list);
+       }
+       spin_unlock_irqrestore(&ctor->read_lock, flags);
+       if (!info)
+               return NULL;
+
+       for (i = 0; i < info->pnum; i++) {
+               get_page(info->pages[i]);
+               info->frag[i].page = info->pages[i];
+               info->frag[i].page_offset = i ? 0 : info->offset;
+               info->frag[i].size = port->npages > 1 ? PAGE_SIZE :
+                       port->data_len;
+       }
+       info->skb = skb;
+       info->user.frags = info->frag;
+       info->user.ushinfo = &info->ushinfo;
+       return &info->user;
+}
+
+static void mp_ki_dtor(struct kiocb *iocb)
+{
+       struct page_info *info = (struct page_info *)(iocb->private);
+       int i;
+
+       if (info->flags == INFO_READ) {
+               for (i = 0; i < info->pnum; i++) {
+                       if (info->pages[i]) {
+                               set_page_dirty_lock(info->pages[i]);
+                               put_page(info->pages[i]);
+                       }
+               }
+               skb_shinfo(info->skb)->destructor_arg = &info->user;
+               info->skb->destructor = NULL;
+               kfree_skb(info->skb);
+       }
+       /* Decrement the number of locked pages */
+       info->ctor->lock_pages -= info->pnum;
+       kmem_cache_free(info->ctor->cache, info);
+
+       return;
+}
+
+static struct kiocb *create_iocb(struct page_info *info, int size)
+{
+       struct kiocb *iocb = NULL;
+
+       iocb = info->iocb;
+       if (!iocb)
+               return iocb;
+       iocb->ki_flags = 0;
+       iocb->ki_users = 1;
+       iocb->ki_key = 0;
+       iocb->ki_ctx = NULL;
+       iocb->ki_cancel = NULL;
+       iocb->ki_retry = NULL;
+       iocb->ki_iovec = NULL;
+       iocb->ki_eventfd = NULL;
+       iocb->ki_pos = info->desc_pos;
+       iocb->ki_nbytes = size;
+       iocb->ki_user_data = info->log;
+       iocb->ki_dtor(iocb);
+       iocb->private = (void *)info;
+       iocb->ki_dtor = mp_ki_dtor;
+
+       return iocb;
+}
+
+/* The callback to destruct the user space buffers or skb */
+static void page_dtor(struct skb_user_page *user)
+{
+       struct page_info *info;
+       struct page_ctor *ctor;
+       struct sock *sk;
+       struct sk_buff *skb;
+       struct kiocb *iocb = NULL;
+       unsigned long flags;
+       int i;
+
+       if (!user)
+               return;
+       info = container_of(user, struct page_info, user);
+       if (!info)
+               return;
+       ctor = info->ctor;
+       skb = info->skb;
+
+       if ((info->flags == INFO_READ) && info->skb)
+               info->skb->head = NULL;
+
+       /* If the info->total is 0, make it to be reused */
+       if (!info->total) {
+               spin_lock_irqsave(&ctor->read_lock, flags);
+               list_add(&info->list, &ctor->readq);
+               spin_unlock_irqrestore(&ctor->read_lock, flags);
+               return;
+       }
+
+       if (info->flags == INFO_READ)
+               return;
+
+       /* For transmit, we should wait for the DMA finish by hardware.
+        * Queue the notifier to wake up the backend driver
+        */
+
+       iocb = create_iocb(info, info->total);
+
+       sk = ctor->port.sock->sk;
+       sk->sk_write_space(sk);
+
+       return;
+}
+
+static int page_ctor_attach(struct mp_struct *mp)
+{
+       int rc;
+       struct page_ctor *ctor;
+       struct net_device *dev = mp->dev;
+
+       /* locked by mp_mutex */
+       if (rcu_dereference(mp->ctor))
+               return -EBUSY;
+
+       ctor = kzalloc(sizeof(*ctor), GFP_KERNEL);
+       if (!ctor)
+               return -ENOMEM;
+       rc = netdev_mp_port_prep(dev, &ctor->port);
+       if (rc)
+               goto fail;
+
+       ctor->cache = kmem_cache_create("skb_page_info",
+                       sizeof(struct page_info), 0,
+                       SLAB_HWCACHE_ALIGN, NULL);
+
+       if (!ctor->cache)
+               goto cache_fail;
+
+       INIT_LIST_HEAD(&ctor->readq);
+       spin_lock_init(&ctor->read_lock);
+
+       ctor->w_len = 0;
+       ctor->r_len = 0;
+
+       dev_hold(dev);
+       ctor->dev = dev;
+       ctor->port.ctor = page_ctor;
+       ctor->port.sock = &mp->socket;
+       ctor->lock_pages = 0;
+       rc = netdev_mp_port_attach(dev, &ctor->port);
+       if (rc)
+               goto fail;
+
+       /* locked by mp_mutex */
+       rcu_assign_pointer(mp->ctor, ctor);
+
+       /* XXX:Need we do set_offload here ? */
+
+       return 0;
+
+fail:
+       kmem_cache_destroy(ctor->cache);
+cache_fail:
+       kfree(ctor);
+       dev_put(dev);
+
+       return rc;
+}
+
+struct page_info *info_dequeue(struct page_ctor *ctor)
+{
+       unsigned long flags;
+       struct page_info *info = NULL;
+       spin_lock_irqsave(&ctor->read_lock, flags);
+       if (!list_empty(&ctor->readq)) {
+               info = list_first_entry(&ctor->readq,
+                               struct page_info, list);
+               list_del(&info->list);
+       }
+       spin_unlock_irqrestore(&ctor->read_lock, flags);
+       return info;
+}
+
+static int set_memlock_rlimit(struct page_ctor *ctor, int resource,
+                             unsigned long cur, unsigned long max)
+{
+       struct rlimit new_rlim, *old_rlim;
+       int retval;
+
+       if (resource != RLIMIT_MEMLOCK)
+               return -EINVAL;
+       new_rlim.rlim_cur = cur;
+       new_rlim.rlim_max = max;
+
+       old_rlim = current->signal->rlim + resource;
+
+       /* remember the old rlimit value when backend enabled */
+       ctor->o_rlim.rlim_cur = old_rlim->rlim_cur;
+       ctor->o_rlim.rlim_max = old_rlim->rlim_max;
+
+       if ((new_rlim.rlim_max > old_rlim->rlim_max) &&
+                       !capable(CAP_SYS_RESOURCE))
+               return -EPERM;
+
+       retval = security_task_setrlimit(resource, &new_rlim);
+       if (retval)
+               return retval;
+
+       task_lock(current->group_leader);
+       *old_rlim = new_rlim;
+       task_unlock(current->group_leader);
+       return 0;
+}
+
+static int page_ctor_detach(struct mp_struct *mp)
+{
+       struct page_ctor *ctor;
+       struct page_info *info;
+       struct kiocb *iocb = NULL;
+       int i;
+       unsigned long flags;
+
+       /* locked by mp_mutex */
+       ctor = rcu_dereference(mp->ctor);
+       if (!ctor)
+               return -ENODEV;
+
+       while ((info = info_dequeue(ctor))) {
+               for (i = 0; i < info->pnum; i++)
+                       if (info->pages[i])
+                               put_page(info->pages[i]);
+               iocb = create_iocb(info, 0);
+               kmem_cache_free(ctor->cache, info);
+       }
+       set_memlock_rlimit(ctor, RLIMIT_MEMLOCK,
+                          ctor->o_rlim.rlim_cur,
+                          ctor->o_rlim.rlim_max);
+       kmem_cache_destroy(ctor->cache);
+       netdev_mp_port_detach(ctor->dev);
+       dev_put(ctor->dev);
+
+       /* locked by mp_mutex */
+       rcu_assign_pointer(mp->ctor, NULL);
+       synchronize_rcu();
+
+       kfree(ctor);
+       return 0;
+}
+
+/* For small user space buffers transmit, we don't need to call
+ * get_user_pages().
+ */
+static struct page_info *alloc_small_page_info(struct page_ctor *ctor,
+                                               struct kiocb *iocb, int total)
+{
+       struct page_info *info = kmem_cache_zalloc(ctor->cache, GFP_KERNEL);
+
+       if (!info)
+               return NULL;
+       info->total = total;
+       info->user.dtor = page_dtor;
+       info->ctor = ctor;
+       info->flags = INFO_WRITE;
+       info->iocb = iocb;
+       return info;
+}
+
+/* The main function to transform the guest user space address
+ * to host kernel address via get_user_pages(). Thus the hardware
+ * can do DMA directly to the user space address.
+ */
+static struct page_info *alloc_page_info(struct page_ctor *ctor,
+                                       struct kiocb *iocb, struct iovec *iov,
+                                       int count, struct frag *frags,
+                                       int npages, int total)
+{
+       int rc;
+       int i, j, n = 0;
+       int len;
+       unsigned long base, lock_limit;
+       struct page_info *info = NULL;
+
+       lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
+       lock_limit >>= PAGE_SHIFT;
+
+       if (ctor->lock_pages + count > lock_limit) {
+               printk(KERN_INFO "exceed the locked memory rlimit %d!",
+                      lock_limit);
+               return NULL;
+       }
+
+       info = kmem_cache_zalloc(ctor->cache, GFP_KERNEL);
+
+       if (!info)
+               return NULL;
+
+       for (i = j = 0; i < count; i++) {
+               base = (unsigned long)iov[i].iov_base;
+               len = iov[i].iov_len;
+
+               if (!len)
+                       continue;
+               n = ((base & ~PAGE_MASK) + len + ~PAGE_MASK) >> PAGE_SHIFT;
+
+               rc = get_user_pages_fast(base, n, npages ? 1 : 0,
+                                               &info->pages[j]);
+               if (rc != n)
+                       goto failed;
+
+               while (n--) {
+                       frags[j].offset = base & ~PAGE_MASK;
+                       frags[j].size = min_t(int, len,
+                                       PAGE_SIZE - frags[j].offset);
+                       len -= frags[j].size;
+                       base += frags[j].size;
+                       j++;
+               }
+       }
+
+#ifdef CONFIG_HIGHMEM
+       if (npages && !(dev->features & NETIF_F_HIGHDMA)) {
+               for (i = 0; i < j; i++) {
+                       if (PageHighMem(info->pages[i]))
+                               goto failed;
+               }
+       }
+#endif
+
+       info->total = total;
+       info->user.dtor = page_dtor;
+       info->ctor = ctor;
+       info->pnum = j;
+       info->iocb = iocb;
+       if (!npages)
+               info->flags = INFO_WRITE;
+       if (info->flags == INFO_READ) {
+               info->user.start = (u8 *)(((unsigned long)
+                               (pfn_to_kaddr(page_to_pfn(info->pages[0]))) +
+                               frags[0].offset));
+#ifdef NET_SKBUFF_DATA_USES_OFFSET
+               info->user.size = SKB_DATA_ALIGN(
+                                 iov[0].iov_len + NET_IP_ALIGN + NET_SKB_PAD);
+#else
+               info->user.size = SKB_DATA_ALIGN(
+                                 iov[0].iov_len + NET_IP_ALIGN + NET_SKB_PAD) -
+                                 NET_IP_ALIGN - NET_SKB_PAD;
+#endif
+       }
+       /* increment the number of locked pages */
+       ctor->lock_pages += j;
+       return info;
+
+failed:
+       for (i = 0; i < j; i++)
+               put_page(info->pages[i]);
+
+       kmem_cache_free(ctor->cache, info);
+
+       return NULL;
+}
+
+static int mp_sendmsg(struct kiocb *iocb, struct socket *sock,
+                       struct msghdr *m, size_t total_len)
+{
+       struct mp_struct *mp = container_of(sock->sk, struct mp_sock, sk)->mp;
+       struct page_ctor *ctor;
+       struct iovec *iov = m->msg_iov;
+       struct page_info *info = NULL;
+       struct frag frags[MAX_SKB_FRAGS];
+       struct sk_buff *skb;
+       int count = m->msg_iovlen;
+       int total = 0, header, n, i, len, rc;
+       unsigned long base;
+
+       ctor = rcu_dereference(mp->ctor);
+       if (!ctor)
+               return -ENODEV;
+
+       total = iov_length(iov, count);
+
+       if (total < ETH_HLEN)
+               return -EINVAL;
+
+       if (total <= COPY_THRESHOLD)
+               goto copy;
+
+       n = 0;
+       for (i = 0; i < count; i++) {
+               base = (unsigned long)iov[i].iov_base;
+               len = iov[i].iov_len;
+               if (!len)
+                       continue;
+               n += ((base & ~PAGE_MASK) + len + ~PAGE_MASK) >> PAGE_SHIFT;
+               if (n > MAX_SKB_FRAGS)
+                       return -EINVAL;
+       }
+
+copy:
+       header = total > COPY_THRESHOLD ? COPY_HDR_LEN : total;
+
+       skb = alloc_skb(header + NET_IP_ALIGN, GFP_ATOMIC);
+       if (!skb)
+               goto drop;
+
+       skb_reserve(skb, NET_IP_ALIGN);
+
+       skb_set_network_header(skb, ETH_HLEN);
+
+       memcpy_fromiovec(skb->data, iov, header);
+       skb_put(skb, header);
+       skb->protocol = *((__be16 *)(skb->data) + ETH_ALEN);
+
+       if (header == total) {
+               rc = total;
+               info = alloc_small_page_info(ctor, iocb, total);
+       } else {
+               info = alloc_page_info(ctor, iocb, iov, count, frags, 0, total);
+               if (info)
+                       for (i = 0; info->pages[i]; i++) {
+                               skb_add_rx_frag(skb, i, info->pages[i],
+                                               frags[i].offset, frags[i].size);
+                               info->pages[i] = NULL;
+                       }
+       }
+       if (info != NULL) {
+               info->desc_pos = iocb->ki_pos;
+               info->total = total;
+               info->skb = skb;
+               skb_shinfo(skb)->destructor_arg = &info->user;
+               skb->dev = mp->dev;
+               dev_queue_xmit(skb);
+               return 0;
+       }
+drop:
+       kfree_skb(skb);
+       if (info) {
+               for (i = 0; info->pages[i]; i++)
+                       put_page(info->pages[i]);
+               kmem_cache_free(info->ctor->cache, info);
+       }
+       mp->dev->stats.tx_dropped++;
+       return -ENOMEM;
+}
+
+static int mp_recvmsg(struct kiocb *iocb, struct socket *sock,
+                       struct msghdr *m, size_t total_len,
+                       int flags)
+{
+       struct mp_struct *mp = container_of(sock->sk, struct mp_sock, sk)->mp;
+       struct page_ctor *ctor;
+       struct iovec *iov = m->msg_iov;
+       int count = m->msg_iovlen;
+       int npages, payload;
+       struct page_info *info;
+       struct frag frags[MAX_SKB_FRAGS];
+       unsigned long base;
+       int i, len;
+       unsigned long flag;
+
+       if (!(flags & MSG_DONTWAIT))
+               return -EINVAL;
+
+       ctor = rcu_dereference(mp->ctor);
+       if (!ctor)
+               return -EINVAL;
+
+       /* Error detections in case invalid user space buffer */
+       if (count > 2 && iov[1].iov_len < ctor->port.hdr_len &&
+                       mp->dev->features & NETIF_F_SG) {
+               return -EINVAL;
+       }
+
+       npages = ctor->port.npages;
+       payload = ctor->port.data_len;
+
+       /* If KVM guest virtio-net FE driver use SG feature */
+       if (count > 2) {
+               for (i = 2; i < count; i++) {
+                       base = (unsigned long)iov[i].iov_base & ~PAGE_MASK;
+                       len = iov[i].iov_len;
+                       if (npages == 1)
+                               len = min_t(int, len, PAGE_SIZE - base);
+                       else if (base)
+                               break;
+                       payload -= len;
+                       if (payload <= 0)
+                               goto proceed;
+                       if (npages == 1 || (len & ~PAGE_MASK))
+                               break;
+               }
+       }
+
+       if ((((unsigned long)iov[1].iov_base & ~PAGE_MASK)
+                               - NET_SKB_PAD - NET_IP_ALIGN) >= 0)
+               goto proceed;
+
+       return -EINVAL;
+
+proceed:
+       /* skip the virtnet head */
+       iov++;
+       count--;
+
+       if (!ctor->lock_pages)
+               set_memlock_rlimit(ctor, RLIMIT_MEMLOCK,
+                                (((1UL << 32) -1) & iocb->ki_user_data) * 4096,
+                                (((1UL << 32) -1) & iocb->ki_user_data) * 4096);
+
+       /* Translate address to kernel */
+       info = alloc_page_info(ctor, iocb, iov, count, frags, npages, 0);
+       if (!info)
+               return -ENOMEM;
+       info->len = total_len;
+       info->hdr[0].iov_base = iocb->ki_iovec[0].iov_base;
+       info->hdr[0].iov_len = iocb->ki_iovec[0].iov_len;
+       info->offset = frags[0].offset;
+       info->desc_pos = iocb->ki_pos;
+       info->log = iocb->ki_user_data;
+
+       iov--;
+       count++;
+
+       memcpy(info->iov, iov, sizeof(struct iovec) * count);
+
+       spin_lock_irqsave(&ctor->read_lock, flag);
+       list_add_tail(&info->list, &ctor->readq);
+       spin_unlock_irqrestore(&ctor->read_lock, flag);
+
+       return 0;
+}
+
+static void __mp_detach(struct mp_struct *mp)
+{
+       mp->mfile = NULL;
+
+       mp_dev_change_flags(mp->dev, mp->dev->flags & ~IFF_UP);
+       page_ctor_detach(mp);
+       mp_dev_change_flags(mp->dev, mp->dev->flags | IFF_UP);
+
+       /* Drop the extra count on the net device */
+       dev_put(mp->dev);
+}
+
+static DEFINE_MUTEX(mp_mutex);
+
+static void mp_detach(struct mp_struct *mp)
+{
+       mutex_lock(&mp_mutex);
+       __mp_detach(mp);
+       mutex_unlock(&mp_mutex);
+}
+
+static void mp_put(struct mp_file *mfile)
+{
+       if (atomic_dec_and_test(&mfile->count))
+               mp_detach(mfile->mp);
+}
+
+static int mp_release(struct socket *sock)
+{
+       struct mp_struct *mp = container_of(sock->sk, struct mp_sock, sk)->mp;
+       struct mp_file *mfile = mp->mfile;
+
+       mp_put(mfile);
+       sock_put(mp->socket.sk);
+       put_net(mfile->net);
+
+       return 0;
+}
+
+/* Ops structure to mimic raw sockets with mp device */
+static const struct proto_ops mp_socket_ops = {
+       .sendmsg = mp_sendmsg,
+       .recvmsg = mp_recvmsg,
+       .release = mp_release,
+};
+
+static struct proto mp_proto = {
+       .name           = "mp",
+       .owner          = THIS_MODULE,
+       .obj_size       = sizeof(struct mp_sock),
+};
+
+static int mp_chr_open(struct inode *inode, struct file * file)
+{
+       struct mp_file *mfile;
+       cycle_kernel_lock();
+       DBG1(KERN_INFO "mp: mp_chr_open\n");
+
+       mfile = kzalloc(sizeof(*mfile), GFP_KERNEL);
+       if (!mfile)
+               return -ENOMEM;
+       atomic_set(&mfile->count, 0);
+       mfile->mp = NULL;
+       mfile->net = get_net(current->nsproxy->net_ns);
+       file->private_data = mfile;
+       return 0;
+}
+
+
+static struct mp_struct *mp_get(struct mp_file *mfile)
+{
+       struct mp_struct *mp = NULL;
+       if (atomic_inc_not_zero(&mfile->count))
+               mp = mfile->mp;
+
+       return mp;
+}
+
+
+static int mp_attach(struct mp_struct *mp, struct file *file)
+{
+       struct mp_file *mfile = file->private_data;
+       int err;
+
+       netif_tx_lock_bh(mp->dev);
+
+       err = -EINVAL;
+
+       if (mfile->mp)
+               goto out;
+
+       err = -EBUSY;
+       if (mp->mfile)
+               goto out;
+
+       err = 0;
+       mfile->mp = mp;
+       mp->mfile = mfile;
+       mp->socket.file = file;
+       dev_hold(mp->dev);
+       sock_hold(mp->socket.sk);
+       atomic_inc(&mfile->count);
+
+out:
+       netif_tx_unlock_bh(mp->dev);
+       return err;
+}
+
+static void mp_sock_destruct(struct sock *sk)
+{
+       struct mp_struct *mp = container_of(sk, struct mp_sock, sk)->mp;
+       kfree(mp);
+}
+
+static int do_unbind(struct mp_file *mfile)
+{
+       struct mp_struct *mp = mp_get(mfile);
+
+       if (!mp)
+               return -EINVAL;
+
+       mp_detach(mp);
+       sock_put(mp->socket.sk);
+       mp_put(mfile);
+       return 0;
+}
+
+static void mp_sock_state_change(struct sock *sk)
+{
+       if (sk_has_sleeper(sk))
+               wake_up_interruptible_sync_poll(sk->sk_sleep, POLLIN);
+}
+
+static void mp_sock_data_ready(struct sock *sk, int coming)
+{
+       struct mp_struct *mp = container_of(sk, struct mp_sock, sk)->mp;
+       struct page_ctor *ctor = NULL;
+       struct sk_buff *skb = NULL;
+       struct page_info *info = NULL;
+       struct ethhdr *eth;
+       struct kiocb *iocb = NULL;
+       int len, i;
+       unsigned long flags;
+
+       struct virtio_net_hdr hdr = {
+               .flags = 0,
+               .gso_type = VIRTIO_NET_HDR_GSO_NONE
+       };
+
+       ctor = rcu_dereference(mp->ctor);
+       if (!ctor)
+               return;
+
+       while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+               if (skb_shinfo(skb)->destructor_arg) {
+                       info = container_of(skb_shinfo(skb)->destructor_arg,
+                                       struct page_info, user);
+                       info->skb = skb;
+                       if (skb->len > info->len) {
+                               mp->dev->stats.rx_dropped++;
+                               DBG(KERN_INFO "Discarded truncated rx packet: "
+                                       " len %d > %zd\n", skb->len, info->len);
+                               info->total = skb->len;
+                               goto clean;
+                       } else {
+                               int i;
+                               struct skb_shared_info *gshinfo =
+                               (struct skb_shared_info *)(&info->ushinfo);
+                               struct skb_shared_info *hshinfo =
+                                               skb_shinfo(skb);
+
+                               if (gshinfo->nr_frags < hshinfo->nr_frags)
+                                       goto clean;
+                               eth = eth_hdr(skb);
+                               skb_push(skb, ETH_HLEN);
+
+                               hdr.hdr_len = skb_headlen(skb);
+                               info->total = skb->len;
+
+                               for (i = 0; i < gshinfo->nr_frags; i++)
+                                       gshinfo->frags[i].size = 0;
+                               for (i = 0; i < hshinfo->nr_frags; i++)
+                                       gshinfo->frags[i].size =
+                                               hshinfo->frags[i].size;
+                               memcpy(skb_shinfo(skb), &info->ushinfo,
+                                               sizeof(struct skb_shared_info));
+                       }
+               } else {
+                       /* The skb composed with kernel buffers
+                        * in case user space buffers are not sufficent.
+                        * The case should be rare.
+                        */
+                       unsigned long flags;
+                       int i;
+                       struct skb_shared_info *gshinfo = NULL;
+
+                       info = NULL;
+
+                       spin_lock_irqsave(&ctor->read_lock, flags);
+                       if (!list_empty(&ctor->readq)) {
+                               info = list_first_entry(&ctor->readq,
+                                               struct page_info, list);
+                               list_del(&info->list);
+                       }
+                       spin_unlock_irqrestore(&ctor->read_lock, flags);
+                       if (!info) {
+                               DBG(KERN_INFO "No user buffer avaliable %p\n",
+                                                                       skb);
+                               skb_queue_head(&sk->sk_receive_queue,
+                                                                       skb);
+                               break;
+                       }
+                       info->skb = skb;
+                       /* compute the guest skb frags info */
+                       gshinfo = (struct skb_shared_info *)(info->user.start +
+                                       SKB_DATA_ALIGN(info->user.size));
+
+                       if (gshinfo->nr_frags < skb_shinfo(skb)->nr_frags)
+                               goto clean;
+
+                       eth = eth_hdr(skb);
+                       skb_push(skb, ETH_HLEN);
+                       info->total = skb->len;
+
+                       for (i = 0; i < gshinfo->nr_frags; i++)
+                               gshinfo->frags[i].size = 0;
+                       for (i = 0; i < skb_shinfo(skb)->nr_frags; i++)
+                               gshinfo->frags[i].size =
+                                       skb_shinfo(skb)->frags[i].size;
+                       hdr.hdr_len = min_t(int, skb->len,
+                                               info->iov[1].iov_len);
+                       skb_copy_datagram_iovec(skb, 0, info->iov, skb->len);
+               }
+
+               len = memcpy_toiovec(info->hdr, (unsigned char *)&hdr,
+                                                                sizeof hdr);
+               if (len) {
+                       DBG(KERN_INFO
+                               "Unable to write vnet_hdr at addr %p: %d\n",
+                               info->hdr->iov_base, len);
+                       goto clean;
+               }
+
+               iocb = create_iocb(info, skb->len + sizeof(hdr));
+               continue;
+
+clean:
+               kfree_skb(skb);
+               for (i = 0; info->pages[i]; i++)
+                       put_page(info->pages[i]);
+               kmem_cache_free(ctor->cache, info);
+       }
+       return;
+}
+
+static void mp_sock_write_space(struct sock *sk)
+{
+       if (sk_has_sleeper(sk))
+               wake_up_interruptible_sync_poll(sk->sk_sleep, POLLOUT);
+}
+
+static long mp_chr_ioctl(struct file *file, unsigned int cmd,
+               unsigned long arg)
+{
+       struct mp_file *mfile = file->private_data;
+       struct mp_struct *mp;
+       struct net_device *dev;
+       void __user* argp = (void __user *)arg;
+       struct ifreq ifr;
+       struct sock *sk;
+       int ret;
+
+       ret = -EINVAL;
+
+       switch (cmd) {
+       case MPASSTHRU_BINDDEV:
+               ret = -EFAULT;
+               if (copy_from_user(&ifr, argp, sizeof ifr))
+                       break;
+
+               ifr.ifr_name[IFNAMSIZ-1] = '\0';
+
+               ret = -EBUSY;
+
+               if (ifr.ifr_flags & IFF_MPASSTHRU_EXCL)
+                       break;
+
+               ret = -ENODEV;
+               dev = dev_get_by_name(mfile->net, ifr.ifr_name);
+               if (!dev)
+                       break;
+
+               mutex_lock(&mp_mutex);
+
+               ret = -EBUSY;
+               mp = mfile->mp;
+               if (mp)
+                       goto err_dev_put;
+
+               mp = kzalloc(sizeof(*mp), GFP_KERNEL);
+               if (!mp) {
+                       ret = -ENOMEM;
+                       goto err_dev_put;
+               }
+               mp->dev = dev;
+               ret = -ENOMEM;
+
+               sk = sk_alloc(mfile->net, AF_UNSPEC, GFP_KERNEL, &mp_proto);
+               if (!sk)
+                       goto err_free_mp;
+
+               init_waitqueue_head(&mp->socket.wait);
+               mp->socket.ops = &mp_socket_ops;
+               sock_init_data(&mp->socket, sk);
+               sk->sk_sndbuf = INT_MAX;
+               container_of(sk, struct mp_sock, sk)->mp = mp;
+
+               sk->sk_destruct = mp_sock_destruct;
+               sk->sk_data_ready = mp_sock_data_ready;
+               sk->sk_write_space = mp_sock_write_space;
+               sk->sk_state_change = mp_sock_state_change;
+               ret = mp_attach(mp, file);
+               if (ret < 0)
+                       goto err_free_sk;
+
+               ret = page_ctor_attach(mp);
+               if (ret < 0)
+                       goto err_free_sk;
+
+               ifr.ifr_flags |= IFF_MPASSTHRU_EXCL;
+               mp_dev_change_flags(mp->dev, mp->dev->flags | IFF_UP);
+out:
+               mutex_unlock(&mp_mutex);
+               break;
+err_free_sk:
+               sk_free(sk);
+err_free_mp:
+               kfree(mp);
+err_dev_put:
+               dev_put(dev);
+               goto out;
+
+       case MPASSTHRU_UNBINDDEV:
+               ret = do_unbind(mfile);
+               break;
+
+       default:
+               break;
+       }
+       return ret;
+}
+
+static unsigned int mp_chr_poll(struct file *file, poll_table * wait)
+{
+       struct mp_file *mfile = file->private_data;
+       struct mp_struct *mp = mp_get(mfile);
+       struct sock *sk;
+       unsigned int mask = 0;
+
+       if (!mp)
+               return POLLERR;
+
+       sk = mp->socket.sk;
+
+       poll_wait(file, &mp->socket.wait, wait);
+
+       if (!skb_queue_empty(&sk->sk_receive_queue))
+               mask |= POLLIN | POLLRDNORM;
+
+       if (sock_writeable(sk) ||
+               (!test_and_set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags) &&
+                        sock_writeable(sk)))
+               mask |= POLLOUT | POLLWRNORM;
+
+       if (mp->dev->reg_state != NETREG_REGISTERED)
+               mask = POLLERR;
+
+       mp_put(mfile);
+       return mask;
+}
+
+static ssize_t mp_chr_aio_write(struct kiocb *iocb, const struct iovec *iov,
+                               unsigned long count, loff_t pos)
+{
+       struct file *file = iocb->ki_filp;
+       struct mp_struct *mp = mp_get(file->private_data);
+       struct sock *sk = mp->socket.sk;
+       struct sk_buff *skb;
+       int len, err;
+       ssize_t result;
+
+       if (!mp)
+               return -EBADFD;
+
+       /* currently, async is not supported.
+        * but we may support real async aio from user application,
+        * maybe qemu virtio-net backend.
+        */
+       if (!is_sync_kiocb(iocb))
+               return -EFAULT;
+
+       len = iov_length(iov, count);
+
+       if (unlikely(len) < ETH_HLEN)
+               return -EINVAL;
+
+       skb = sock_alloc_send_skb(sk, len + NET_IP_ALIGN,
+                                 file->f_flags & O_NONBLOCK, &err);
+
+       if (!skb)
+               return -EFAULT;
+
+       skb_reserve(skb, NET_IP_ALIGN);
+       skb_put(skb, len);
+
+       if (skb_copy_datagram_from_iovec(skb, 0, iov, 0, len)) {
+               kfree_skb(skb);
+               return -EAGAIN;
+       }
+
+       skb->protocol = eth_type_trans(skb, mp->dev);
+       skb->dev = mp->dev;
+
+       dev_queue_xmit(skb);
+
+       mp_put(file->private_data);
+       return result;
+}
+
+static int mp_chr_close(struct inode *inode, struct file *file)
+{
+       struct mp_file *mfile = file->private_data;
+
+       /*
+        * Ignore return value since an error only means there was nothing to
+        * do
+        */
+       do_unbind(mfile);
+
+       put_net(mfile->net);
+       kfree(mfile);
+
+       return 0;
+}
+
+static const struct file_operations mp_fops = {
+       .owner  = THIS_MODULE,
+       .llseek = no_llseek,
+       .write  = do_sync_write,
+       .aio_write = mp_chr_aio_write,
+       .poll   = mp_chr_poll,
+       .unlocked_ioctl = mp_chr_ioctl,
+       .open   = mp_chr_open,
+       .release = mp_chr_close,
+};
+
+static struct miscdevice mp_miscdev = {
+       .minor = MISC_DYNAMIC_MINOR,
+       .name = "mp",
+       .nodename = "net/mp",
+       .fops = &mp_fops,
+};
+
+static int mp_device_event(struct notifier_block *unused,
+               unsigned long event, void *ptr)
+{
+       struct net_device *dev = ptr;
+       struct mpassthru_port *port;
+       struct mp_struct *mp = NULL;
+       struct socket *sock = NULL;
+
+       port = dev->mp_port;
+       if (port == NULL)
+               return NOTIFY_DONE;
+
+       switch (event) {
+       case NETDEV_UNREGISTER:
+                       sock = dev->mp_port->sock;
+                       mp = container_of(sock->sk, struct mp_sock, sk)->mp;
+                       do_unbind(mp->mfile);
+                       break;
+       }
+       return NOTIFY_DONE;
+}
+
+static struct notifier_block mp_notifier_block __read_mostly = {
+       .notifier_call  = mp_device_event,
+};
+
+static int mp_init(void)
+{
+       int ret = 0;
+
+       ret = misc_register(&mp_miscdev);
+       if (ret)
+               printk(KERN_ERR "mp: Can't register misc device\n");
+       else {
+               printk(KERN_INFO "Registering mp misc device - minor = %d\n",
+                       mp_miscdev.minor);
+               register_netdevice_notifier(&mp_notifier_block);
+       }
+       return ret;
+}
+
+void mp_cleanup(void)
+{
+       unregister_netdevice_notifier(&mp_notifier_block);
+       misc_deregister(&mp_miscdev);
+}
+
+/* Get an underlying socket object from mp file.  Returns error unless file is
+ * attached to a device.  The returned object works like a packet socket, it
+ * can be used for sock_sendmsg/sock_recvmsg.  The caller is responsible for
+ * holding a reference to the file for as long as the socket is in use. */
+struct socket *mp_get_socket(struct file *file)
+{
+       struct mp_file *mfile = file->private_data;
+       struct mp_struct *mp;
+
+       if (file->f_op != &mp_fops)
+               return ERR_PTR(-EINVAL);
+       mp = mp_get(mfile);
+       if (!mp)
+               return ERR_PTR(-EBADFD);
+       mp_put(mfile);
+       return &mp->socket;
+}
+EXPORT_SYMBOL_GPL(mp_get_socket);
+
+module_init(mp_init);
+module_exit(mp_cleanup);
+MODULE_AUTHOR(DRV_COPYRIGHT);
+MODULE_DESCRIPTION(DRV_DESCRIPTION);
+MODULE_LICENSE("GPL v2");
diff --git a/include/linux/mpassthru.h b/include/linux/mpassthru.h
new file mode 100644
index 0000000..e3983d3
--- /dev/null
+++ b/include/linux/mpassthru.h
@@ -0,0 +1,29 @@
+#ifndef __MPASSTHRU_H
+#define __MPASSTHRU_H
+
+#include <linux/types.h>
+#include <linux/if_ether.h>
+
+/* ioctl defines */
+#define MPASSTHRU_BINDDEV      _IOW('M', 213, int)
+#define MPASSTHRU_UNBINDDEV    _IOW('M', 214, int)
+
+/* MPASSTHRU ifc flags */
+#define IFF_MPASSTHRU          0x0001
+#define IFF_MPASSTHRU_EXCL     0x0002
+
+#ifdef __KERNEL__
+#if defined(CONFIG_MEDIATE_PASSTHRU) || defined(CONFIG_MEDIATE_PASSTHRU_MODULE)
+struct socket *mp_get_socket(struct file *);
+#else
+#include <linux/err.h>
+#include <linux/errno.h>
+struct file;
+struct socket;
+static inline struct socket *mp_get_socket(struct file *f)
+{
+       return ERR_PTR(-EINVAL);
+}
+#endif /* CONFIG_MEDIATE_PASSTHRU */
+#endif /* __KERNEL__ */
+#endif /* __MPASSTHRU_H */
--
1.5.4.4


^ permalink raw reply related

* Re:[RFC][PATCH v3 1/3] A device for zero-copy based on KVM virtio-net.
From: xiaohui.xin @ 2010-04-22  8:24 UTC (permalink / raw)
  To: mst; +Cc: arnd, netdev, kvm, linux-kernel, mingo, davem, jdike, Xin Xiaohui
In-Reply-To: <20100415090324.GA15135@redhat.com>

From: Xin Xiaohui <xiaohui.xin@intel.com>

Add a device to utilize the vhost-net backend driver for
copy-less data transfer between guest FE and host NIC.
It pins the guest user space to the host memory and
provides proto_ops as sendmsg/recvmsg to vhost-net.

Signed-off-by: Xin Xiaohui <xiaohui.xin@intel.com>
Signed-off-by: Zhao Yu <yzhao81@gmail.com>
Reviewed-by: Jeff Dike <jdike@linux.intel.com>
---

Michael,
Thanks. I have updated the patch with your suggestion.
It looks much clean now. Please have a review.

Thanks
Xiaohui

 drivers/vhost/Kconfig     |   10 +
 drivers/vhost/Makefile    |    2 +
 drivers/vhost/mpassthru.c | 1239 +++++++++++++++++++++++++++++++++++++++++++++
 include/linux/mpassthru.h |   29 +
 4 files changed, 1280 insertions(+), 0 deletions(-)
 create mode 100644 drivers/vhost/mpassthru.c
 create mode 100644 include/linux/mpassthru.h

diff --git a/drivers/vhost/Kconfig b/drivers/vhost/Kconfig
index 9f409f4..91806b1 100644
--- a/drivers/vhost/Kconfig
+++ b/drivers/vhost/Kconfig
@@ -9,3 +9,13 @@ config VHOST_NET
 	  To compile this driver as a module, choose M here: the module will
 	  be called vhost_net.
 
+config MEDIATE_PASSTHRU
+	tristate "mediate passthru network driver (EXPERIMENTAL)"
+	depends on VHOST_NET
+	---help---
+	  zerocopy network I/O support, we call it as mediate passthru to
+	  be distiguish with hardare passthru.
+
+	  To compile this driver as a module, choose M here: the module will
+	  be called mpassthru.
+
diff --git a/drivers/vhost/Makefile b/drivers/vhost/Makefile
index 72dd020..c18b9fc 100644
--- a/drivers/vhost/Makefile
+++ b/drivers/vhost/Makefile
@@ -1,2 +1,4 @@
 obj-$(CONFIG_VHOST_NET) += vhost_net.o
 vhost_net-y := vhost.o net.o
+
+obj-$(CONFIG_MEDIATE_PASSTHRU) += mpassthru.o
diff --git a/drivers/vhost/mpassthru.c b/drivers/vhost/mpassthru.c
new file mode 100644
index 0000000..cc99b14
--- /dev/null
+++ b/drivers/vhost/mpassthru.c
@@ -0,0 +1,1239 @@
+/*
+ *  MPASSTHRU - Mediate passthrough device.
+ *  Copyright (C) 2009 ZhaoYu, XinXiaohui, Dike, Jeffery G
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *  GNU General Public License for more details.
+ *
+ */
+
+#define DRV_NAME        "mpassthru"
+#define DRV_DESCRIPTION "Mediate passthru device driver"
+#define DRV_COPYRIGHT   "(C) 2009 ZhaoYu, XinXiaohui, Dike, Jeffery G"
+
+#include <linux/module.h>
+#include <linux/errno.h>
+#include <linux/kernel.h>
+#include <linux/major.h>
+#include <linux/slab.h>
+#include <linux/smp_lock.h>
+#include <linux/poll.h>
+#include <linux/fcntl.h>
+#include <linux/init.h>
+#include <linux/aio.h>
+
+#include <linux/skbuff.h>
+#include <linux/netdevice.h>
+#include <linux/etherdevice.h>
+#include <linux/miscdevice.h>
+#include <linux/ethtool.h>
+#include <linux/rtnetlink.h>
+#include <linux/if.h>
+#include <linux/if_arp.h>
+#include <linux/if_ether.h>
+#include <linux/crc32.h>
+#include <linux/nsproxy.h>
+#include <linux/uaccess.h>
+#include <linux/virtio_net.h>
+#include <linux/mpassthru.h>
+#include <net/net_namespace.h>
+#include <net/netns/generic.h>
+#include <net/rtnetlink.h>
+#include <net/sock.h>
+
+#include <asm/system.h>
+
+/* Uncomment to enable debugging */
+/* #define MPASSTHRU_DEBUG 1 */
+
+#ifdef MPASSTHRU_DEBUG
+static int debug;
+
+#define DBG  if (mp->debug) printk
+#define DBG1 if (debug == 2) printk
+#else
+#define DBG(a...)
+#define DBG1(a...)
+#endif
+
+#define COPY_THRESHOLD (L1_CACHE_BYTES * 4)
+#define COPY_HDR_LEN   (L1_CACHE_BYTES < 64 ? 64 : L1_CACHE_BYTES)
+
+struct frag {
+	u16     offset;
+	u16     size;
+};
+
+struct page_ctor {
+	struct list_head        readq;
+	int 			w_len;
+	int 			r_len;
+	spinlock_t      	read_lock;
+	struct kmem_cache   	*cache;
+	/* record the locked pages */
+	int			lock_pages;
+	struct rlimit		o_rlim;
+	struct net_device   	*dev;
+	struct mpassthru_port	port;
+};
+
+struct page_info {
+	struct list_head    	list;
+	int         		header;
+	/* indicate the actual length of bytes
+	 * send/recv in the user space buffers
+	 */
+	int         		total;
+	int         		offset;
+	struct page     	*pages[MAX_SKB_FRAGS+1];
+	struct skb_frag_struct 	frag[MAX_SKB_FRAGS+1];
+	struct sk_buff      	*skb;
+	struct page_ctor   	*ctor;
+
+	/* The pointer relayed to skb, to indicate
+	 * it's a user space allocated skb or kernel
+	 */
+	struct skb_user_page    user;
+	struct skb_shared_info	ushinfo;
+
+#define INFO_READ      		0
+#define INFO_WRITE     		1
+	unsigned        	flags;
+	unsigned        	pnum;
+
+	/* It's meaningful for receive, means
+	 * the max length allowed
+	 */
+	size_t          	len;
+
+	/* The fields after that is for backend
+	 * driver, now for vhost-net.
+	 */
+
+	struct kiocb		*iocb;
+	unsigned int    	desc_pos;
+	unsigned int 		log;
+	struct iovec 		hdr[MAX_SKB_FRAGS + 2];
+	struct iovec 		iov[MAX_SKB_FRAGS + 2];
+};
+
+struct mp_struct {
+	struct mp_file   	*mfile;
+	struct net_device       *dev;
+	struct page_ctor	*ctor;
+	struct socket           socket;
+
+#ifdef MPASSTHRU_DEBUG
+	int debug;
+#endif
+};
+
+struct mp_file {
+	atomic_t count;
+	struct mp_struct *mp;
+	struct net *net;
+};
+
+struct mp_sock {
+	struct sock            	sk;
+	struct mp_struct       	*mp;
+};
+
+static int mp_dev_change_flags(struct net_device *dev, unsigned flags)
+{
+	int ret = 0;
+
+	rtnl_lock();
+	ret = dev_change_flags(dev, flags);
+	rtnl_unlock();
+
+	if (ret < 0)
+		printk(KERN_ERR "failed to change dev state of %s", dev->name);
+
+	return ret;
+}
+
+/* The main function to allocate user space buffers */
+static struct skb_user_page *page_ctor(struct mpassthru_port *port,
+					struct sk_buff *skb, int npages)
+{
+	int i;
+	unsigned long flags;
+	struct page_ctor *ctor;
+	struct page_info *info = NULL;
+
+	ctor = container_of(port, struct page_ctor, port);
+
+	spin_lock_irqsave(&ctor->read_lock, flags);
+	if (!list_empty(&ctor->readq)) {
+		info = list_first_entry(&ctor->readq, struct page_info, list);
+		list_del(&info->list);
+	}
+	spin_unlock_irqrestore(&ctor->read_lock, flags);
+	if (!info)
+		return NULL;
+
+	for (i = 0; i < info->pnum; i++) {
+		get_page(info->pages[i]);
+		info->frag[i].page = info->pages[i];
+		info->frag[i].page_offset = i ? 0 : info->offset;
+		info->frag[i].size = port->npages > 1 ? PAGE_SIZE :
+			port->data_len;
+	}
+	info->skb = skb;
+	info->user.frags = info->frag;
+	info->user.ushinfo = &info->ushinfo;
+	return &info->user;
+}
+
+static void mp_ki_dtor(struct kiocb *iocb)
+{
+	struct page_info *info = (struct page_info *)(iocb->private);
+	int i;
+
+	if (info->flags == INFO_READ) {
+		for (i = 0; i < info->pnum; i++) {
+			if (info->pages[i]) {
+				set_page_dirty_lock(info->pages[i]);
+				put_page(info->pages[i]);
+			}
+		}
+		skb_shinfo(info->skb)->destructor_arg = &info->user;
+		info->skb->destructor = NULL;
+		kfree_skb(info->skb);
+	}
+	/* Decrement the number of locked pages */
+	info->ctor->lock_pages -= info->pnum;
+	kmem_cache_free(info->ctor->cache, info);
+
+	return;
+}
+
+static struct kiocb *create_iocb(struct page_info *info, int size)
+{
+	struct kiocb *iocb = NULL;
+
+	iocb = info->iocb;
+	if (!iocb)
+		return iocb;
+	iocb->ki_flags = 0;
+	iocb->ki_users = 1;
+	iocb->ki_key = 0;
+	iocb->ki_ctx = NULL;
+	iocb->ki_cancel = NULL;
+	iocb->ki_retry = NULL;
+	iocb->ki_iovec = NULL;
+	iocb->ki_eventfd = NULL;
+	iocb->ki_pos = info->desc_pos;
+	iocb->ki_nbytes = size;
+	iocb->ki_user_data = info->log;
+	iocb->ki_dtor(iocb);
+	iocb->private = (void *)info;
+	iocb->ki_dtor = mp_ki_dtor;
+
+	return iocb;
+}
+
+/* The callback to destruct the user space buffers or skb */
+static void page_dtor(struct skb_user_page *user)
+{
+	struct page_info *info;
+	struct page_ctor *ctor;
+	struct sock *sk;
+	struct sk_buff *skb;
+	struct kiocb *iocb = NULL;
+	unsigned long flags;
+	int i;
+
+	if (!user)
+		return;
+	info = container_of(user, struct page_info, user);
+	if (!info)
+		return;
+	ctor = info->ctor;
+	skb = info->skb;
+
+	if ((info->flags == INFO_READ) && info->skb)
+		info->skb->head = NULL;
+
+	/* If the info->total is 0, make it to be reused */
+	if (!info->total) {
+		spin_lock_irqsave(&ctor->read_lock, flags);
+		list_add(&info->list, &ctor->readq);
+		spin_unlock_irqrestore(&ctor->read_lock, flags);
+		return;
+	}
+
+	if (info->flags == INFO_READ)
+		return;
+
+	/* For transmit, we should wait for the DMA finish by hardware.
+	 * Queue the notifier to wake up the backend driver
+	 */
+
+	iocb = create_iocb(info, info->total);
+	
+	sk = ctor->port.sock->sk;
+	sk->sk_write_space(sk);
+
+	return;
+}
+
+static int page_ctor_attach(struct mp_struct *mp)
+{
+	int rc;
+	struct page_ctor *ctor;
+	struct net_device *dev = mp->dev;
+
+	/* locked by mp_mutex */
+	if (rcu_dereference(mp->ctor))
+		return -EBUSY;
+
+	ctor = kzalloc(sizeof(*ctor), GFP_KERNEL);
+	if (!ctor)
+		return -ENOMEM;
+	rc = netdev_mp_port_prep(dev, &ctor->port);
+	if (rc)
+		goto fail;
+
+	ctor->cache = kmem_cache_create("skb_page_info",
+			sizeof(struct page_info), 0,
+			SLAB_HWCACHE_ALIGN, NULL);
+
+	if (!ctor->cache)
+		goto cache_fail;
+
+	INIT_LIST_HEAD(&ctor->readq);
+	spin_lock_init(&ctor->read_lock);
+
+	ctor->w_len = 0;
+	ctor->r_len = 0;
+
+	dev_hold(dev);
+	ctor->dev = dev;
+	ctor->port.ctor = page_ctor;
+	ctor->port.sock = &mp->socket;
+	ctor->lock_pages = 0;
+	rc = netdev_mp_port_attach(dev, &ctor->port);
+	if (rc)
+		goto fail;
+
+	/* locked by mp_mutex */
+	rcu_assign_pointer(mp->ctor, ctor);
+
+	/* XXX:Need we do set_offload here ? */
+
+	return 0;
+
+fail:
+	kmem_cache_destroy(ctor->cache);
+cache_fail:
+	kfree(ctor);
+	dev_put(dev);
+
+	return rc;
+}
+
+struct page_info *info_dequeue(struct page_ctor *ctor)
+{
+	unsigned long flags;
+	struct page_info *info = NULL;
+	spin_lock_irqsave(&ctor->read_lock, flags);
+	if (!list_empty(&ctor->readq)) {
+		info = list_first_entry(&ctor->readq,
+				struct page_info, list);
+		list_del(&info->list);
+	}
+	spin_unlock_irqrestore(&ctor->read_lock, flags);
+	return info;
+}
+
+static int set_memlock_rlimit(struct page_ctor *ctor, int resource,
+			      unsigned long cur, unsigned long max)
+{
+	struct rlimit new_rlim, *old_rlim;
+	int retval;
+
+	if (resource != RLIMIT_MEMLOCK)
+		return -EINVAL;
+	new_rlim.rlim_cur = cur;
+	new_rlim.rlim_max = max;
+
+	old_rlim = current->signal->rlim + resource;
+
+	/* remember the old rlimit value when backend enabled */
+	ctor->o_rlim.rlim_cur = old_rlim->rlim_cur;
+	ctor->o_rlim.rlim_max = old_rlim->rlim_max;
+
+	if ((new_rlim.rlim_max > old_rlim->rlim_max) &&
+			!capable(CAP_SYS_RESOURCE))
+		return -EPERM;
+
+	retval = security_task_setrlimit(resource, &new_rlim);
+	if (retval)
+		return retval;
+
+	task_lock(current->group_leader);
+	*old_rlim = new_rlim;
+	task_unlock(current->group_leader);
+	return 0;
+}
+
+static int page_ctor_detach(struct mp_struct *mp)
+{
+	struct page_ctor *ctor;
+	struct page_info *info;
+	struct kiocb *iocb = NULL;
+	int i;
+	unsigned long flags;
+
+	/* locked by mp_mutex */
+	ctor = rcu_dereference(mp->ctor);
+	if (!ctor)
+		return -ENODEV;
+
+	while ((info = info_dequeue(ctor))) {
+		for (i = 0; i < info->pnum; i++)
+			if (info->pages[i])
+				put_page(info->pages[i]);
+		iocb = create_iocb(info, 0);
+		kmem_cache_free(ctor->cache, info);
+	}
+	set_memlock_rlimit(ctor, RLIMIT_MEMLOCK,
+			   ctor->o_rlim.rlim_cur,
+			   ctor->o_rlim.rlim_max);
+	kmem_cache_destroy(ctor->cache);
+	netdev_mp_port_detach(ctor->dev);
+	dev_put(ctor->dev);
+
+	/* locked by mp_mutex */
+	rcu_assign_pointer(mp->ctor, NULL);
+	synchronize_rcu();
+
+	kfree(ctor);
+	return 0;
+}
+
+/* For small user space buffers transmit, we don't need to call
+ * get_user_pages().
+ */
+static struct page_info *alloc_small_page_info(struct page_ctor *ctor,
+						struct kiocb *iocb, int total)
+{
+	struct page_info *info = kmem_cache_zalloc(ctor->cache, GFP_KERNEL);
+
+	if (!info)
+		return NULL;
+	info->total = total;
+	info->user.dtor = page_dtor;
+	info->ctor = ctor;
+	info->flags = INFO_WRITE;
+	info->iocb = iocb;
+	return info;
+}
+
+/* The main function to transform the guest user space address
+ * to host kernel address via get_user_pages(). Thus the hardware
+ * can do DMA directly to the user space address.
+ */
+static struct page_info *alloc_page_info(struct page_ctor *ctor,
+					struct kiocb *iocb, struct iovec *iov,
+					int count, struct frag *frags,
+					int npages, int total)
+{
+	int rc;
+	int i, j, n = 0;
+	int len;
+	unsigned long base, lock_limit;
+	struct page_info *info = NULL;
+
+	lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
+	lock_limit >>= PAGE_SHIFT;
+
+	if (ctor->lock_pages + count > lock_limit) {
+		printk(KERN_INFO "exceed the locked memory rlimit %d!",
+		       lock_limit);
+		return NULL;
+	}
+
+	info = kmem_cache_zalloc(ctor->cache, GFP_KERNEL);
+
+	if (!info)
+		return NULL;
+
+	for (i = j = 0; i < count; i++) {
+		base = (unsigned long)iov[i].iov_base;
+		len = iov[i].iov_len;
+
+		if (!len)
+			continue;
+		n = ((base & ~PAGE_MASK) + len + ~PAGE_MASK) >> PAGE_SHIFT;
+
+		rc = get_user_pages_fast(base, n, npages ? 1 : 0,
+						&info->pages[j]);
+		if (rc != n)
+			goto failed;
+
+		while (n--) {
+			frags[j].offset = base & ~PAGE_MASK;
+			frags[j].size = min_t(int, len,
+					PAGE_SIZE - frags[j].offset);
+			len -= frags[j].size;
+			base += frags[j].size;
+			j++;
+		}
+	}
+
+#ifdef CONFIG_HIGHMEM
+	if (npages && !(dev->features & NETIF_F_HIGHDMA)) {
+		for (i = 0; i < j; i++) {
+			if (PageHighMem(info->pages[i]))
+				goto failed;
+		}
+	}
+#endif
+
+	info->total = total;
+	info->user.dtor = page_dtor;
+	info->ctor = ctor;
+	info->pnum = j;
+	info->iocb = iocb;
+	if (!npages)
+		info->flags = INFO_WRITE;
+	if (info->flags == INFO_READ) {
+		info->user.start = (u8 *)(((unsigned long)
+				(pfn_to_kaddr(page_to_pfn(info->pages[0]))) +
+				frags[0].offset));
+#ifdef NET_SKBUFF_DATA_USES_OFFSET
+		info->user.size = SKB_DATA_ALIGN(
+				  iov[0].iov_len + NET_IP_ALIGN + NET_SKB_PAD);
+#else
+		info->user.size = SKB_DATA_ALIGN(
+				  iov[0].iov_len + NET_IP_ALIGN + NET_SKB_PAD) -
+				  NET_IP_ALIGN - NET_SKB_PAD;
+#endif
+	}
+	/* increment the number of locked pages */
+	ctor->lock_pages += j;
+	return info;
+
+failed:
+	for (i = 0; i < j; i++)
+		put_page(info->pages[i]);
+
+	kmem_cache_free(ctor->cache, info);
+
+	return NULL;
+}
+
+static int mp_sendmsg(struct kiocb *iocb, struct socket *sock,
+			struct msghdr *m, size_t total_len)
+{
+	struct mp_struct *mp = container_of(sock->sk, struct mp_sock, sk)->mp;
+	struct page_ctor *ctor;
+	struct iovec *iov = m->msg_iov;
+	struct page_info *info = NULL;
+	struct frag frags[MAX_SKB_FRAGS];
+	struct sk_buff *skb;
+	int count = m->msg_iovlen;
+	int total = 0, header, n, i, len, rc;
+	unsigned long base;
+
+	ctor = rcu_dereference(mp->ctor);
+	if (!ctor)
+		return -ENODEV;
+
+	total = iov_length(iov, count);
+
+	if (total < ETH_HLEN)
+		return -EINVAL;
+
+	if (total <= COPY_THRESHOLD)
+		goto copy;
+
+	n = 0;
+	for (i = 0; i < count; i++) {
+		base = (unsigned long)iov[i].iov_base;
+		len = iov[i].iov_len;
+		if (!len)
+			continue;
+		n += ((base & ~PAGE_MASK) + len + ~PAGE_MASK) >> PAGE_SHIFT;
+		if (n > MAX_SKB_FRAGS)
+			return -EINVAL;
+	}
+
+copy:
+	header = total > COPY_THRESHOLD ? COPY_HDR_LEN : total;
+
+	skb = alloc_skb(header + NET_IP_ALIGN, GFP_ATOMIC);
+	if (!skb)
+		goto drop;
+
+	skb_reserve(skb, NET_IP_ALIGN);
+
+	skb_set_network_header(skb, ETH_HLEN);
+
+	memcpy_fromiovec(skb->data, iov, header);
+	skb_put(skb, header);
+	skb->protocol = *((__be16 *)(skb->data) + ETH_ALEN);
+
+	if (header == total) {
+		rc = total;
+		info = alloc_small_page_info(ctor, iocb, total);
+	} else {
+		info = alloc_page_info(ctor, iocb, iov, count, frags, 0, total);
+		if (info)
+			for (i = 0; info->pages[i]; i++) {
+				skb_add_rx_frag(skb, i, info->pages[i],
+						frags[i].offset, frags[i].size);
+				info->pages[i] = NULL;
+			}
+	}
+	if (info != NULL) {
+		info->desc_pos = iocb->ki_pos;
+		info->total = total;
+		info->skb = skb;
+		skb_shinfo(skb)->destructor_arg = &info->user;
+		skb->dev = mp->dev;
+		dev_queue_xmit(skb);
+		return 0;
+	}
+drop:
+	kfree_skb(skb);
+	if (info) {
+		for (i = 0; info->pages[i]; i++)
+			put_page(info->pages[i]);
+		kmem_cache_free(info->ctor->cache, info);
+	}
+	mp->dev->stats.tx_dropped++;
+	return -ENOMEM;
+}
+
+static int mp_recvmsg(struct kiocb *iocb, struct socket *sock,
+			struct msghdr *m, size_t total_len,
+			int flags)
+{
+	struct mp_struct *mp = container_of(sock->sk, struct mp_sock, sk)->mp;
+	struct page_ctor *ctor;
+	struct iovec *iov = m->msg_iov;
+	int count = m->msg_iovlen;
+	int npages, payload;
+	struct page_info *info;
+	struct frag frags[MAX_SKB_FRAGS];
+	unsigned long base;
+	int i, len;
+	unsigned long flag;
+
+	if (!(flags & MSG_DONTWAIT))
+		return -EINVAL;
+
+	ctor = rcu_dereference(mp->ctor);
+	if (!ctor)
+		return -EINVAL;
+
+	/* Error detections in case invalid user space buffer */
+	if (count > 2 && iov[1].iov_len < ctor->port.hdr_len &&
+			mp->dev->features & NETIF_F_SG) {
+		return -EINVAL;
+	}
+
+	npages = ctor->port.npages;
+	payload = ctor->port.data_len;
+
+	/* If KVM guest virtio-net FE driver use SG feature */
+	if (count > 2) {
+		for (i = 2; i < count; i++) {
+			base = (unsigned long)iov[i].iov_base & ~PAGE_MASK;
+			len = iov[i].iov_len;
+			if (npages == 1)
+				len = min_t(int, len, PAGE_SIZE - base);
+			else if (base)
+				break;
+			payload -= len;
+			if (payload <= 0)
+				goto proceed;
+			if (npages == 1 || (len & ~PAGE_MASK))
+				break;
+		}
+	}
+
+	if ((((unsigned long)iov[1].iov_base & ~PAGE_MASK)
+				- NET_SKB_PAD - NET_IP_ALIGN) >= 0)
+		goto proceed;
+
+	return -EINVAL;
+
+proceed:
+	/* skip the virtnet head */
+	iov++;
+	count--;
+
+	if (!ctor->lock_pages)
+		set_memlock_rlimit(ctor, RLIMIT_MEMLOCK,
+				 (((1UL << 32) -1) & iocb->ki_user_data) * 4096,
+				 (((1UL << 32) -1) & iocb->ki_user_data) * 4096);
+
+	/* Translate address to kernel */
+	info = alloc_page_info(ctor, iocb, iov, count, frags, npages, 0);
+	if (!info)
+		return -ENOMEM;
+	info->len = total_len;
+	info->hdr[0].iov_base = iocb->ki_iovec[0].iov_base;
+	info->hdr[0].iov_len = iocb->ki_iovec[0].iov_len;
+	info->offset = frags[0].offset;
+	info->desc_pos = iocb->ki_pos;
+	info->log = iocb->ki_user_data;
+
+	iov--;
+	count++;
+
+	memcpy(info->iov, iov, sizeof(struct iovec) * count);
+	
+	spin_lock_irqsave(&ctor->read_lock, flag);
+	list_add_tail(&info->list, &ctor->readq);
+	spin_unlock_irqrestore(&ctor->read_lock, flag);
+
+	return 0;
+}
+
+static void __mp_detach(struct mp_struct *mp)
+{
+	mp->mfile = NULL;
+
+	mp_dev_change_flags(mp->dev, mp->dev->flags & ~IFF_UP);
+	page_ctor_detach(mp);
+	mp_dev_change_flags(mp->dev, mp->dev->flags | IFF_UP);
+
+	/* Drop the extra count on the net device */
+	dev_put(mp->dev);
+}
+
+static DEFINE_MUTEX(mp_mutex);
+
+static void mp_detach(struct mp_struct *mp)
+{
+	mutex_lock(&mp_mutex);
+	__mp_detach(mp);
+	mutex_unlock(&mp_mutex);
+}
+
+static void mp_put(struct mp_file *mfile)
+{
+	if (atomic_dec_and_test(&mfile->count))
+		mp_detach(mfile->mp);
+}
+
+static int mp_release(struct socket *sock)
+{
+	struct mp_struct *mp = container_of(sock->sk, struct mp_sock, sk)->mp;
+	struct mp_file *mfile = mp->mfile;
+
+	mp_put(mfile);
+	sock_put(mp->socket.sk);
+	put_net(mfile->net);
+
+	return 0;
+}
+
+/* Ops structure to mimic raw sockets with mp device */
+static const struct proto_ops mp_socket_ops = {
+	.sendmsg = mp_sendmsg,
+	.recvmsg = mp_recvmsg,
+	.release = mp_release,
+};
+
+static struct proto mp_proto = {
+	.name           = "mp",
+	.owner          = THIS_MODULE,
+	.obj_size       = sizeof(struct mp_sock),
+};
+
+static int mp_chr_open(struct inode *inode, struct file * file)
+{
+	struct mp_file *mfile;
+	cycle_kernel_lock();
+	DBG1(KERN_INFO "mp: mp_chr_open\n");
+
+	mfile = kzalloc(sizeof(*mfile), GFP_KERNEL);
+	if (!mfile)
+		return -ENOMEM;
+	atomic_set(&mfile->count, 0);
+	mfile->mp = NULL;
+	mfile->net = get_net(current->nsproxy->net_ns);
+	file->private_data = mfile;
+	return 0;
+}
+
+
+static struct mp_struct *mp_get(struct mp_file *mfile)
+{
+	struct mp_struct *mp = NULL;
+	if (atomic_inc_not_zero(&mfile->count))
+		mp = mfile->mp;
+
+	return mp;
+}
+
+
+static int mp_attach(struct mp_struct *mp, struct file *file)
+{
+	struct mp_file *mfile = file->private_data;
+	int err;
+
+	netif_tx_lock_bh(mp->dev);
+
+	err = -EINVAL;
+
+	if (mfile->mp)
+		goto out;
+
+	err = -EBUSY;
+	if (mp->mfile)
+		goto out;
+
+	err = 0;
+	mfile->mp = mp;
+	mp->mfile = mfile;
+	mp->socket.file = file;
+	dev_hold(mp->dev);
+	sock_hold(mp->socket.sk);
+	atomic_inc(&mfile->count);
+
+out:
+	netif_tx_unlock_bh(mp->dev);
+	return err;
+}
+
+static void mp_sock_destruct(struct sock *sk)
+{
+	struct mp_struct *mp = container_of(sk, struct mp_sock, sk)->mp;
+	kfree(mp);
+}
+
+static int do_unbind(struct mp_file *mfile)
+{
+	struct mp_struct *mp = mp_get(mfile);
+
+	if (!mp)
+		return -EINVAL;
+
+	mp_detach(mp);
+	sock_put(mp->socket.sk);
+	mp_put(mfile);
+	return 0;
+}
+
+static void mp_sock_state_change(struct sock *sk)
+{
+	if (sk_has_sleeper(sk))
+		wake_up_interruptible_sync_poll(sk->sk_sleep, POLLIN);
+}
+
+static void mp_sock_data_ready(struct sock *sk, int coming)
+{
+	struct mp_struct *mp = container_of(sk, struct mp_sock, sk)->mp;
+	struct page_ctor *ctor = NULL;
+	struct sk_buff *skb = NULL;
+	struct page_info *info = NULL;
+	struct ethhdr *eth;
+	struct kiocb *iocb = NULL;
+	int len, i;
+	unsigned long flags;
+
+	struct virtio_net_hdr hdr = {
+		.flags = 0,
+		.gso_type = VIRTIO_NET_HDR_GSO_NONE
+	};
+
+	ctor = rcu_dereference(mp->ctor);
+	if (!ctor)
+		return;
+
+	while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+		if (skb_shinfo(skb)->destructor_arg) {
+			info = container_of(skb_shinfo(skb)->destructor_arg,
+					struct page_info, user);
+			info->skb = skb;
+			if (skb->len > info->len) {
+				mp->dev->stats.rx_dropped++;
+				DBG(KERN_INFO "Discarded truncated rx packet: "
+					" len %d > %zd\n", skb->len, info->len);
+				info->total = skb->len;
+				goto clean;
+			} else {
+				int i;
+				struct skb_shared_info *gshinfo =
+				(struct skb_shared_info *)(&info->ushinfo);
+				struct skb_shared_info *hshinfo =
+						skb_shinfo(skb);
+
+				if (gshinfo->nr_frags < hshinfo->nr_frags)
+					goto clean;
+				eth = eth_hdr(skb);
+				skb_push(skb, ETH_HLEN);
+
+				hdr.hdr_len = skb_headlen(skb);
+				info->total = skb->len;
+
+				for (i = 0; i < gshinfo->nr_frags; i++)
+					gshinfo->frags[i].size = 0;
+				for (i = 0; i < hshinfo->nr_frags; i++)
+					gshinfo->frags[i].size =
+						hshinfo->frags[i].size;
+				memcpy(skb_shinfo(skb), &info->ushinfo,
+						sizeof(struct skb_shared_info));
+			}
+		} else {
+			/* The skb composed with kernel buffers
+			 * in case user space buffers are not sufficent.
+			 * The case should be rare.
+			 */
+			unsigned long flags;
+			int i;
+			struct skb_shared_info *gshinfo = NULL;
+
+			info = NULL;
+
+			spin_lock_irqsave(&ctor->read_lock, flags);
+			if (!list_empty(&ctor->readq)) {
+				info = list_first_entry(&ctor->readq,
+						struct page_info, list);
+				list_del(&info->list);
+			}
+			spin_unlock_irqrestore(&ctor->read_lock, flags);
+			if (!info) {
+				DBG(KERN_INFO "No user buffer avaliable %p\n",
+									skb);
+				skb_queue_head(&sk->sk_receive_queue,
+									skb);
+				break;
+			}
+			info->skb = skb;
+			/* compute the guest skb frags info */
+			gshinfo = (struct skb_shared_info *)(info->user.start +
+					SKB_DATA_ALIGN(info->user.size));
+
+			if (gshinfo->nr_frags < skb_shinfo(skb)->nr_frags)
+				goto clean;
+
+			eth = eth_hdr(skb);
+			skb_push(skb, ETH_HLEN);
+			info->total = skb->len;
+
+			for (i = 0; i < gshinfo->nr_frags; i++)
+				gshinfo->frags[i].size = 0;
+			for (i = 0; i < skb_shinfo(skb)->nr_frags; i++)
+				gshinfo->frags[i].size =
+					skb_shinfo(skb)->frags[i].size;
+			hdr.hdr_len = min_t(int, skb->len,
+						info->iov[1].iov_len);
+			skb_copy_datagram_iovec(skb, 0, info->iov, skb->len);
+		}
+
+		len = memcpy_toiovec(info->hdr, (unsigned char *)&hdr,
+								 sizeof hdr);
+		if (len) {
+			DBG(KERN_INFO
+				"Unable to write vnet_hdr at addr %p: %d\n",
+				info->hdr->iov_base, len);
+			goto clean;
+		}
+
+		iocb = create_iocb(info, skb->len + sizeof(hdr));
+		continue;
+
+clean:
+		kfree_skb(skb);
+		for (i = 0; info->pages[i]; i++)
+			put_page(info->pages[i]);
+		kmem_cache_free(ctor->cache, info);
+	}
+	return;
+}
+
+static void mp_sock_write_space(struct sock *sk)
+{
+	if (sk_has_sleeper(sk))
+		wake_up_interruptible_sync_poll(sk->sk_sleep, POLLOUT);
+}
+
+static long mp_chr_ioctl(struct file *file, unsigned int cmd,
+		unsigned long arg)
+{
+	struct mp_file *mfile = file->private_data;
+	struct mp_struct *mp;
+	struct net_device *dev;
+	void __user* argp = (void __user *)arg;
+	struct ifreq ifr;
+	struct sock *sk;
+	int ret;
+
+	ret = -EINVAL;
+
+	switch (cmd) {
+	case MPASSTHRU_BINDDEV:
+		ret = -EFAULT;
+		if (copy_from_user(&ifr, argp, sizeof ifr))
+			break;
+
+		ifr.ifr_name[IFNAMSIZ-1] = '\0';
+
+		ret = -EBUSY;
+
+		if (ifr.ifr_flags & IFF_MPASSTHRU_EXCL)
+			break;
+
+		ret = -ENODEV;
+		dev = dev_get_by_name(mfile->net, ifr.ifr_name);
+		if (!dev)
+			break;
+
+		mutex_lock(&mp_mutex);
+
+		ret = -EBUSY;
+		mp = mfile->mp;
+		if (mp)
+			goto err_dev_put;
+
+		mp = kzalloc(sizeof(*mp), GFP_KERNEL);
+		if (!mp) {
+			ret = -ENOMEM;
+			goto err_dev_put;
+		}
+		mp->dev = dev;
+		ret = -ENOMEM;
+
+		sk = sk_alloc(mfile->net, AF_UNSPEC, GFP_KERNEL, &mp_proto);
+		if (!sk)
+			goto err_free_mp;
+
+		init_waitqueue_head(&mp->socket.wait);
+		mp->socket.ops = &mp_socket_ops;
+		sock_init_data(&mp->socket, sk);
+		sk->sk_sndbuf = INT_MAX;
+		container_of(sk, struct mp_sock, sk)->mp = mp;
+
+		sk->sk_destruct = mp_sock_destruct;
+		sk->sk_data_ready = mp_sock_data_ready;
+		sk->sk_write_space = mp_sock_write_space;
+		sk->sk_state_change = mp_sock_state_change;
+		ret = mp_attach(mp, file);
+		if (ret < 0)
+			goto err_free_sk;
+
+		ret = page_ctor_attach(mp);
+		if (ret < 0)
+			goto err_free_sk;
+
+		ifr.ifr_flags |= IFF_MPASSTHRU_EXCL;
+		mp_dev_change_flags(mp->dev, mp->dev->flags | IFF_UP);
+out:
+		mutex_unlock(&mp_mutex);
+		break;
+err_free_sk:
+		sk_free(sk);
+err_free_mp:
+		kfree(mp);
+err_dev_put:
+		dev_put(dev);
+		goto out;
+
+	case MPASSTHRU_UNBINDDEV:
+		ret = do_unbind(mfile);
+		break;
+
+	default:
+		break;
+	}
+	return ret;
+}
+
+static unsigned int mp_chr_poll(struct file *file, poll_table * wait)
+{
+	struct mp_file *mfile = file->private_data;
+	struct mp_struct *mp = mp_get(mfile);
+	struct sock *sk;
+	unsigned int mask = 0;
+
+	if (!mp)
+		return POLLERR;
+
+	sk = mp->socket.sk;
+
+	poll_wait(file, &mp->socket.wait, wait);
+
+	if (!skb_queue_empty(&sk->sk_receive_queue))
+		mask |= POLLIN | POLLRDNORM;
+
+	if (sock_writeable(sk) ||
+		(!test_and_set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags) &&
+			 sock_writeable(sk)))
+		mask |= POLLOUT | POLLWRNORM;
+
+	if (mp->dev->reg_state != NETREG_REGISTERED)
+		mask = POLLERR;
+
+	mp_put(mfile);
+	return mask;
+}
+
+static ssize_t mp_chr_aio_write(struct kiocb *iocb, const struct iovec *iov,
+				unsigned long count, loff_t pos)
+{
+	struct file *file = iocb->ki_filp;
+	struct mp_struct *mp = mp_get(file->private_data);
+	struct sock *sk = mp->socket.sk;
+	struct sk_buff *skb;
+	int len, err;
+	ssize_t result;
+
+	if (!mp)
+		return -EBADFD;
+
+	/* currently, async is not supported.
+	 * but we may support real async aio from user application,
+	 * maybe qemu virtio-net backend.
+	 */
+	if (!is_sync_kiocb(iocb))
+		return -EFAULT;
+
+	len = iov_length(iov, count);
+
+	if (unlikely(len) < ETH_HLEN)
+		return -EINVAL;
+
+	skb = sock_alloc_send_skb(sk, len + NET_IP_ALIGN,
+				  file->f_flags & O_NONBLOCK, &err);
+
+	if (!skb)
+		return -EFAULT;
+
+	skb_reserve(skb, NET_IP_ALIGN);
+	skb_put(skb, len);
+
+	if (skb_copy_datagram_from_iovec(skb, 0, iov, 0, len)) {
+		kfree_skb(skb);
+		return -EAGAIN;
+	}
+
+	skb->protocol = eth_type_trans(skb, mp->dev);
+	skb->dev = mp->dev;
+
+	dev_queue_xmit(skb);
+
+	mp_put(file->private_data);
+	return result;
+}
+
+static int mp_chr_close(struct inode *inode, struct file *file)
+{
+	struct mp_file *mfile = file->private_data;
+
+	/*
+	 * Ignore return value since an error only means there was nothing to
+	 * do
+	 */
+	do_unbind(mfile);
+
+	put_net(mfile->net);
+	kfree(mfile);
+
+	return 0;
+}
+
+static const struct file_operations mp_fops = {
+	.owner  = THIS_MODULE,
+	.llseek = no_llseek,
+	.write  = do_sync_write,
+	.aio_write = mp_chr_aio_write,
+	.poll   = mp_chr_poll,
+	.unlocked_ioctl = mp_chr_ioctl,
+	.open   = mp_chr_open,
+	.release = mp_chr_close,
+};
+
+static struct miscdevice mp_miscdev = {
+	.minor = MISC_DYNAMIC_MINOR,
+	.name = "mp",
+	.nodename = "net/mp",
+	.fops = &mp_fops,
+};
+
+static int mp_device_event(struct notifier_block *unused,
+		unsigned long event, void *ptr)
+{
+	struct net_device *dev = ptr;
+	struct mpassthru_port *port;
+	struct mp_struct *mp = NULL;
+	struct socket *sock = NULL;
+
+	port = dev->mp_port;
+	if (port == NULL)
+		return NOTIFY_DONE;
+
+	switch (event) {
+	case NETDEV_UNREGISTER:
+			sock = dev->mp_port->sock;
+			mp = container_of(sock->sk, struct mp_sock, sk)->mp;
+			do_unbind(mp->mfile);
+			break;
+	}
+	return NOTIFY_DONE;
+}
+
+static struct notifier_block mp_notifier_block __read_mostly = {
+	.notifier_call  = mp_device_event,
+};
+
+static int mp_init(void)
+{
+	int ret = 0;
+
+	ret = misc_register(&mp_miscdev);
+	if (ret)
+		printk(KERN_ERR "mp: Can't register misc device\n");
+	else {
+		printk(KERN_INFO "Registering mp misc device - minor = %d\n",
+			mp_miscdev.minor);
+		register_netdevice_notifier(&mp_notifier_block);
+	}
+	return ret;
+}
+
+void mp_cleanup(void)
+{
+	unregister_netdevice_notifier(&mp_notifier_block);
+	misc_deregister(&mp_miscdev);
+}
+
+/* Get an underlying socket object from mp file.  Returns error unless file is
+ * attached to a device.  The returned object works like a packet socket, it
+ * can be used for sock_sendmsg/sock_recvmsg.  The caller is responsible for
+ * holding a reference to the file for as long as the socket is in use. */
+struct socket *mp_get_socket(struct file *file)
+{
+	struct mp_file *mfile = file->private_data;
+	struct mp_struct *mp;
+
+	if (file->f_op != &mp_fops)
+		return ERR_PTR(-EINVAL);
+	mp = mp_get(mfile);
+	if (!mp)
+		return ERR_PTR(-EBADFD);
+	mp_put(mfile);
+	return &mp->socket;
+}
+EXPORT_SYMBOL_GPL(mp_get_socket);
+
+module_init(mp_init);
+module_exit(mp_cleanup);
+MODULE_AUTHOR(DRV_COPYRIGHT);
+MODULE_DESCRIPTION(DRV_DESCRIPTION);
+MODULE_LICENSE("GPL v2");
diff --git a/include/linux/mpassthru.h b/include/linux/mpassthru.h
new file mode 100644
index 0000000..e3983d3
--- /dev/null
+++ b/include/linux/mpassthru.h
@@ -0,0 +1,29 @@
+#ifndef __MPASSTHRU_H
+#define __MPASSTHRU_H
+
+#include <linux/types.h>
+#include <linux/if_ether.h>
+
+/* ioctl defines */
+#define MPASSTHRU_BINDDEV      _IOW('M', 213, int)
+#define MPASSTHRU_UNBINDDEV    _IOW('M', 214, int)
+
+/* MPASSTHRU ifc flags */
+#define IFF_MPASSTHRU		0x0001
+#define IFF_MPASSTHRU_EXCL	0x0002
+
+#ifdef __KERNEL__
+#if defined(CONFIG_MEDIATE_PASSTHRU) || defined(CONFIG_MEDIATE_PASSTHRU_MODULE)
+struct socket *mp_get_socket(struct file *);
+#else
+#include <linux/err.h>
+#include <linux/errno.h>
+struct file;
+struct socket;
+static inline struct socket *mp_get_socket(struct file *f)
+{
+	return ERR_PTR(-EINVAL);
+}
+#endif /* CONFIG_MEDIATE_PASSTHRU */
+#endif /* __KERNEL__ */
+#endif /* __MPASSTHRU_H */
-- 
1.5.4.4


^ permalink raw reply related

* Re: [PATCH v4] net: batch skb dequeueing from softnet input_pkt_queue
From: Changli Gao @ 2010-04-22  8:03 UTC (permalink / raw)
  To: Eric Dumazet
  Cc: Stephen Hemminger, David S. Miller, jamal, Tom Herbert, netdev
In-Reply-To: <1271920877.7895.4757.camel@edumazet-laptop>

On Thu, Apr 22, 2010 at 3:21 PM, Eric Dumazet <eric.dumazet@gmail.com> wrote:
> Jamal perf reports show lock contention but also cache line ping pongs.
>
> Yet, you keep a process_queue_len shared by producers and consumer.
>
> Producers want to read it, while consumer decrement it (dirtying its
> cache line) every packet, slowing down the things.
>
>
> The idea of batching is to let the consumer process its local queue with
> no impact to producers.
>
> Please remove it completely, or make the consumer zero it only at the
> end of batch processing.
>
> A cache line miss cost is about 120 cycles. Multiply it by 1 million
> packet per second...
>

OK, I'll remove it, and update the input_pkt_queue only before
process_backlog returns.


-- 
Regards,
Changli Gao(xiaosuo@gmail.com)

^ permalink raw reply

* Re: [PATCH net-next-2.6] net: Introduce skb_orphan_try()
From: Eric Dumazet @ 2010-04-22  7:59 UTC (permalink / raw)
  To: David Miller; +Cc: netdev
In-Reply-To: <20100422.005421.155629785.davem@davemloft.net>

Le jeudi 22 avril 2010 à 00:54 -0700, David Miller a écrit :
> From: Eric Dumazet <eric.dumazet@gmail.com>
> Date: Thu, 22 Apr 2010 09:47:03 +0200
> 
> > You could have one skb_orphan_try() call before the
> > 
> > if (netif_needs_gso(dev, skb)) {
> > 
> > and remove it from dev_gso_segment() ?
> 
> Yes, that's much more concise.  This should be ready to go:
> 
> net: Orphan and de-dst skbs earlier in xmit path.
> 
> This way GSO packets don't get handled differently.
> 
> With help from Eric Dumazet.
> 
> Signed-off-by: David S. Miller <davem@davemloft.net>
> 
> diff --git a/net/core/dev.c b/net/core/dev.c
> index 3ba774b..a4a7c36 100644
> --- a/net/core/dev.c
> +++ b/net/core/dev.c
> @@ -1902,13 +1902,6 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
>  		if (!list_empty(&ptype_all))
>  			dev_queue_xmit_nit(skb, dev);
>  
> -		if (netif_needs_gso(dev, skb)) {
> -			if (unlikely(dev_gso_segment(skb)))
> -				goto out_kfree_skb;
> -			if (skb->next)
> -				goto gso;
> -		}
> -
>  		/*
>  		 * If device doesnt need skb->dst, release it right now while
>  		 * its hot in this cpu cache
> @@ -1917,6 +1910,14 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
>  			skb_dst_drop(skb);
>  
>  		skb_orphan_try(skb);
> +
> +		if (netif_needs_gso(dev, skb)) {
> +			if (unlikely(dev_gso_segment(skb)))
> +				goto out_kfree_skb;
> +			if (skb->next)
> +				goto gso;
> +		}
> +
>  		rc = ops->ndo_start_xmit(skb, dev);
>  		if (rc == NETDEV_TX_OK)
>  			txq_trans_update(txq);

Signed-off-by: Eric Dumazet <eric.dumazet@gmail.com>

Thanks David !



^ permalink raw reply

* Re: [PATCH net-next-2.6] net: Introduce skb_orphan_try()
From: David Miller @ 2010-04-22  7:54 UTC (permalink / raw)
  To: eric.dumazet; +Cc: netdev
In-Reply-To: <1271922423.7895.4819.camel@edumazet-laptop>

From: Eric Dumazet <eric.dumazet@gmail.com>
Date: Thu, 22 Apr 2010 09:47:03 +0200

> You could have one skb_orphan_try() call before the
> 
> if (netif_needs_gso(dev, skb)) {
> 
> and remove it from dev_gso_segment() ?

Yes, that's much more concise.  This should be ready to go:

net: Orphan and de-dst skbs earlier in xmit path.

This way GSO packets don't get handled differently.

With help from Eric Dumazet.

Signed-off-by: David S. Miller <davem@davemloft.net>

diff --git a/net/core/dev.c b/net/core/dev.c
index 3ba774b..a4a7c36 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -1902,13 +1902,6 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
 		if (!list_empty(&ptype_all))
 			dev_queue_xmit_nit(skb, dev);
 
-		if (netif_needs_gso(dev, skb)) {
-			if (unlikely(dev_gso_segment(skb)))
-				goto out_kfree_skb;
-			if (skb->next)
-				goto gso;
-		}
-
 		/*
 		 * If device doesnt need skb->dst, release it right now while
 		 * its hot in this cpu cache
@@ -1917,6 +1910,14 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
 			skb_dst_drop(skb);
 
 		skb_orphan_try(skb);
+
+		if (netif_needs_gso(dev, skb)) {
+			if (unlikely(dev_gso_segment(skb)))
+				goto out_kfree_skb;
+			if (skb->next)
+				goto gso;
+		}
+
 		rc = ops->ndo_start_xmit(skb, dev);
 		if (rc == NETDEV_TX_OK)
 			txq_trans_update(txq);

^ permalink raw reply related

* Re: [PATCH net-next-2.6] net: Introduce skb_orphan_try()
From: Eric Dumazet @ 2010-04-22  7:47 UTC (permalink / raw)
  To: David Miller; +Cc: netdev
In-Reply-To: <20100422.004136.151480121.davem@davemloft.net>

Le jeudi 22 avril 2010 à 00:41 -0700, David Miller a écrit :
> From: Eric Dumazet <eric.dumazet@gmail.com>
> Date: Thu, 22 Apr 2010 09:33:57 +0200
> 
> > Le jeudi 22 avril 2010 à 00:26 -0700, David Miller a écrit :
> >> @@ -1865,6 +1865,7 @@ static int dev_gso_segment(struct sk_buff *skb)
> >>  	int features = dev->features & ~(illegal_highdma(dev, skb) ?
> >>  					 NETIF_F_SG : 0);
> >>  
> >> +	skb_orphan_try(skb);
> >>  	segs = skb_gso_segment(skb, features);
> >>  
> >>  	/* Verifying header integrity only. */
> > 
> > Yes, it seems better.
> > 
> > What about the 
> > 
> > if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
> > 	skb_dst_drop(skb);
> > 
> > This thing might also be moved before the split, since split probably
> > clone all dst ?
> 
> Good catch, agreed.
> 
> diff --git a/net/core/dev.c b/net/core/dev.c
> index 3ba774b..4f897e2 100644
> --- a/net/core/dev.c
> +++ b/net/core/dev.c
> @@ -1851,6 +1851,17 @@ static void dev_gso_skb_destructor(struct sk_buff *skb)
>  		cb->destructor(skb);
>  }
>  
> +/*
> + * Try to orphan skb early, right before transmission by the device.
> + * We cannot orphan skb if tx timestamp is requested, since
> + * drivers need to call skb_tstamp_tx() to send the timestamp.
> + */
> +static inline void skb_orphan_try(struct sk_buff *skb)
> +{
> +	if (!skb_tx(skb)->flags)
> +		skb_orphan(skb);
> +}
> +
>  /**
>   *	dev_gso_segment - Perform emulated hardware segmentation on skb.
>   *	@skb: buffer to segment
> @@ -1865,6 +1876,7 @@ static int dev_gso_segment(struct sk_buff *skb)
>  	int features = dev->features & ~(illegal_highdma(dev, skb) ?
>  					 NETIF_F_SG : 0);
>  
> +	skb_orphan_try(skb);
>  	segs = skb_gso_segment(skb, features);
>  
>  	/* Verifying header integrity only. */
> @@ -1881,17 +1893,6 @@ static int dev_gso_segment(struct sk_buff *skb)
>  	return 0;
>  }
>  
> -/*
> - * Try to orphan skb early, right before transmission by the device.
> - * We cannot orphan skb if tx timestamp is requested, since
> - * drivers need to call skb_tstamp_tx() to send the timestamp.
> - */
> -static inline void skb_orphan_try(struct sk_buff *skb)
> -{
> -	if (!skb_tx(skb)->flags)
> -		skb_orphan(skb);
> -}
> -
>  int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
>  			struct netdev_queue *txq)
>  {
> @@ -1902,13 +1903,6 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
>  		if (!list_empty(&ptype_all))
>  			dev_queue_xmit_nit(skb, dev);
>  
> -		if (netif_needs_gso(dev, skb)) {
> -			if (unlikely(dev_gso_segment(skb)))
> -				goto out_kfree_skb;
> -			if (skb->next)
> -				goto gso;
> -		}
> -
>  		/*
>  		 * If device doesnt need skb->dst, release it right now while
>  		 * its hot in this cpu cache
> @@ -1916,6 +1910,13 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
>  		if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
>  			skb_dst_drop(skb);
>  
> +		if (netif_needs_gso(dev, skb)) {
> +			if (unlikely(dev_gso_segment(skb)))
> +				goto out_kfree_skb;
> +			if (skb->next)
> +				goto gso;
> +		}
> +
>  		skb_orphan_try(skb);
>  		rc = ops->ndo_start_xmit(skb, dev);
>  		if (rc == NETDEV_TX_OK)

You could have one skb_orphan_try() call before the

if (netif_needs_gso(dev, skb)) {

and remove it from dev_gso_segment() ?



^ permalink raw reply

* Re: IPv6: race condition in __ipv6_ifa_notify() and dst_free() ?
From: David Miller @ 2010-04-22  7:43 UTC (permalink / raw)
  To: herbert; +Cc: jbohac, yoshfuji, netdev, shemminger
In-Reply-To: <20100422023211.GA7109@gondor.apana.org.au>

From: Herbert Xu <herbert@gondor.apana.org.au>
Date: Thu, 22 Apr 2010 10:32:11 +0800

> Anyway, I think the root of the issue is the fact that NDISC is
> calling addrconf_dad_failure with no locking whatsoever.  The
> latter is not idempotent so some form of locking is needed.
> 
> This bug appears to have been around since the very start.
> 
> I'll dig deeper to see where we might be able to add some locks.

Thanks Herbert.

^ permalink raw reply

* Re: [PATCH net-next-2.6] net: Introduce skb_orphan_try()
From: David Miller @ 2010-04-22  7:41 UTC (permalink / raw)
  To: eric.dumazet; +Cc: netdev
In-Reply-To: <1271921637.7895.4791.camel@edumazet-laptop>

From: Eric Dumazet <eric.dumazet@gmail.com>
Date: Thu, 22 Apr 2010 09:33:57 +0200

> Le jeudi 22 avril 2010 à 00:26 -0700, David Miller a écrit :
>> @@ -1865,6 +1865,7 @@ static int dev_gso_segment(struct sk_buff *skb)
>>  	int features = dev->features & ~(illegal_highdma(dev, skb) ?
>>  					 NETIF_F_SG : 0);
>>  
>> +	skb_orphan_try(skb);
>>  	segs = skb_gso_segment(skb, features);
>>  
>>  	/* Verifying header integrity only. */
> 
> Yes, it seems better.
> 
> What about the 
> 
> if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
> 	skb_dst_drop(skb);
> 
> This thing might also be moved before the split, since split probably
> clone all dst ?

Good catch, agreed.

diff --git a/net/core/dev.c b/net/core/dev.c
index 3ba774b..4f897e2 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -1851,6 +1851,17 @@ static void dev_gso_skb_destructor(struct sk_buff *skb)
 		cb->destructor(skb);
 }
 
+/*
+ * Try to orphan skb early, right before transmission by the device.
+ * We cannot orphan skb if tx timestamp is requested, since
+ * drivers need to call skb_tstamp_tx() to send the timestamp.
+ */
+static inline void skb_orphan_try(struct sk_buff *skb)
+{
+	if (!skb_tx(skb)->flags)
+		skb_orphan(skb);
+}
+
 /**
  *	dev_gso_segment - Perform emulated hardware segmentation on skb.
  *	@skb: buffer to segment
@@ -1865,6 +1876,7 @@ static int dev_gso_segment(struct sk_buff *skb)
 	int features = dev->features & ~(illegal_highdma(dev, skb) ?
 					 NETIF_F_SG : 0);
 
+	skb_orphan_try(skb);
 	segs = skb_gso_segment(skb, features);
 
 	/* Verifying header integrity only. */
@@ -1881,17 +1893,6 @@ static int dev_gso_segment(struct sk_buff *skb)
 	return 0;
 }
 
-/*
- * Try to orphan skb early, right before transmission by the device.
- * We cannot orphan skb if tx timestamp is requested, since
- * drivers need to call skb_tstamp_tx() to send the timestamp.
- */
-static inline void skb_orphan_try(struct sk_buff *skb)
-{
-	if (!skb_tx(skb)->flags)
-		skb_orphan(skb);
-}
-
 int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
 			struct netdev_queue *txq)
 {
@@ -1902,13 +1903,6 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
 		if (!list_empty(&ptype_all))
 			dev_queue_xmit_nit(skb, dev);
 
-		if (netif_needs_gso(dev, skb)) {
-			if (unlikely(dev_gso_segment(skb)))
-				goto out_kfree_skb;
-			if (skb->next)
-				goto gso;
-		}
-
 		/*
 		 * If device doesnt need skb->dst, release it right now while
 		 * its hot in this cpu cache
@@ -1916,6 +1910,13 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
 		if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
 			skb_dst_drop(skb);
 
+		if (netif_needs_gso(dev, skb)) {
+			if (unlikely(dev_gso_segment(skb)))
+				goto out_kfree_skb;
+			if (skb->next)
+				goto gso;
+		}
+
 		skb_orphan_try(skb);
 		rc = ops->ndo_start_xmit(skb, dev);
 		if (rc == NETDEV_TX_OK)

^ permalink raw reply related

* Re: [PATCH net-next-2.6] net: Introduce skb_orphan_try()
From: Eric Dumazet @ 2010-04-22  7:33 UTC (permalink / raw)
  To: David Miller; +Cc: netdev
In-Reply-To: <20100422.002623.00784210.davem@davemloft.net>

Le jeudi 22 avril 2010 à 00:26 -0700, David Miller a écrit :
> From: Eric Dumazet <eric.dumazet@gmail.com>
> Date: Thu, 22 Apr 2010 09:24:05 +0200
> 
> > Hmm... are you sure we want to call destructor for each skb ?
> > 
> > Should'nt we do it before initial skb is split ?
> 
> Good idea, therefore you mean something like this?
> 
> diff --git a/net/core/dev.c b/net/core/dev.c
> index 3ba774b..f3c3885 100644
> --- a/net/core/dev.c
> +++ b/net/core/dev.c
> @@ -1865,6 +1865,7 @@ static int dev_gso_segment(struct sk_buff *skb)
>  	int features = dev->features & ~(illegal_highdma(dev, skb) ?
>  					 NETIF_F_SG : 0);
>  
> +	skb_orphan_try(skb);
>  	segs = skb_gso_segment(skb, features);
>  
>  	/* Verifying header integrity only. */

Yes, it seems better.

What about the 

if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
	skb_dst_drop(skb);

This thing might also be moved before the split, since split probably
clone all dst ?



^ permalink raw reply

* Re: [PATCH net-next-2.6] rps: immediate send IPI in process_backlog()
From: Eric Dumazet @ 2010-04-22  7:28 UTC (permalink / raw)
  To: David Miller; +Cc: therbert, xiaosuo, netdev
In-Reply-To: <20100422.002118.107274505.davem@davemloft.net>

Le jeudi 22 avril 2010 à 00:21 -0700, David Miller a écrit :
> From: Eric Dumazet <eric.dumazet@gmail.com>
> Date: Wed, 21 Apr 2010 23:04:58 +0200
> 
> > If some skb are queued to our backlog, we are delaying IPI sending at
> > the end of net_rx_action(), increasing latencies. This defeats the
> > queueing, since we want to quickly dispatch packets to the pool of
> > worker cpus, then eventually deeply process our packets.
> > 
> > It's better to send IPI before processing our packets in upper layers,
> > from process_backlog().
> > 
> > Change the _and_disable_irq suffix to _and_enable_irq(), since we enable
> > local irq in net_rps_action(), sorry for the confusion.
> > 
> > Signed-off-by: Eric Dumazet <eric.dumazet@gmail.com>
> 
> Eric, irqs are enabled in process_backlog(), so I don't know how legal
> it is to invoke net_rps_action_and_irq_enable() from there.
> 
> At least, if you are depending upon a later action to pick up the
> pieces if the rps_ipi_list test races, you need to update the comment
> above net_rps_action_and_irq_enable() since it states that it is
> always invoked with IRQs disabled :-)
> --

But I do disable irqs berfore calling this function from
process_backlog, only if current pointer is non null.

Pointer is then re-fetched inside net_rps_action_and_irq_enable()

I thought using xchg(), but this adds an atomic op, so I think its
better to use local_irq_disable()/enable() pairs.


About the comment, it says :

/*
 * net_rps_action sends any pending IPI's for rps.
 * Note: called with local irq disabled, but exits with local irq
enabled.
 */


So it documents this function is called with irq disabled, and re-enable
them before return ?



^ permalink raw reply

* Re: [PATCH net-next-2.6] net: Introduce skb_orphan_try()
From: David Miller @ 2010-04-22  7:26 UTC (permalink / raw)
  To: eric.dumazet; +Cc: netdev
In-Reply-To: <1271921045.7895.4763.camel@edumazet-laptop>

From: Eric Dumazet <eric.dumazet@gmail.com>
Date: Thu, 22 Apr 2010 09:24:05 +0200

> Hmm... are you sure we want to call destructor for each skb ?
> 
> Should'nt we do it before initial skb is split ?

Good idea, therefore you mean something like this?

diff --git a/net/core/dev.c b/net/core/dev.c
index 3ba774b..f3c3885 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -1865,6 +1865,7 @@ static int dev_gso_segment(struct sk_buff *skb)
 	int features = dev->features & ~(illegal_highdma(dev, skb) ?
 					 NETIF_F_SG : 0);
 
+	skb_orphan_try(skb);
 	segs = skb_gso_segment(skb, features);
 
 	/* Verifying header integrity only. */

^ permalink raw reply related


This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox