* Re: [PATCH] Packet socket: mmapped IO: PACKET_TX_RING
@ 2008-10-30 13:00 Johann Baudy
  2008-10-30 18:21 ` Lovich, Vitali
  0 siblings, 1 reply; 59+ messages in thread
From: Johann Baudy @ 2008-10-30 13:00 UTC (permalink / raw)
  To: David Miller; +Cc: netdev

Hi David,

> 
> Please don't do this, the comma there is intentional.
> 
> If a future patch adds a new version, the diff will be the
> addition of one new line, instead of one changed line and one
> new line.


Thanks for your comment.
Please find below the new patch with these changes:
- the new skb->packet_index field is now used instead of skb->mark (to forward the frame index between send() and the skb destructor)
- send() spin locks
- tpacket_version comma
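
For reference, here is a minimal user-space sketch of the TX ring flow that the documentation changes below describe (allocate the ring, map it, mark frames ready, flush with send()). The ring dimensions, the frame_len variable and the omitted error handling are illustrative assumptions, not part of the patch:

    /* needs <sys/socket.h>, <sys/mman.h>, <arpa/inet.h>,
       <linux/if_packet.h>, <linux/if_ether.h> */

    struct tpacket_req req = {
            .tp_block_size = 4096,    /* illustrative sizes */
            .tp_frame_size = 2048,
            .tp_block_nr   = 32,
            .tp_frame_nr   = 64,      /* = tp_block_nr * tp_block_size / tp_frame_size */
    };

    int fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
    setsockopt(fd, SOL_PACKET, PACKET_TX_RING, &req, sizeof(req));

    /* bind() to the interface as in the documentation example, then map the ring */
    char *ring = mmap(NULL, (size_t)req.tp_block_size * req.tp_block_nr,
                      PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    /* frame i lives at ring + i * tp_frame_size and starts with a struct
       tpacket_hdr; the payload is copied in just after the per-frame header
       area (see tpacket_fill_skb() below for the exact offset, po->tp_hdrlen) */
    struct tpacket_hdr *hdr = (struct tpacket_hdr *) ring;   /* frame 0 */

    hdr->tp_len    = frame_len;       /* frame_len: length of the frame copied in */
    hdr->tp_status = TP_STATUS_USER;  /* hand the frame to the kernel */
    send(fd, NULL, 0, 0);             /* flush; MSG_DONTWAIT returns before completion */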


 Documentation/networking/packet_mmap.txt |  132 ++++++++--
 include/linux/if_packet.h                |    1 +
 include/linux/skbuff.h                   |    4 +
 net/packet/af_packet.c                   |  426 +++++++++++++++++++++++++-----
 4 files changed, 477 insertions(+), 86 deletions(-)

diff --git a/Documentation/networking/packet_mmap.txt b/Documentation/networking/packet_mmap.txt
index 07c53d5..f07d39a 100644
--- a/Documentation/networking/packet_mmap.txt
+++ b/Documentation/networking/packet_mmap.txt
@@ -4,8 +4,8 @@
 
 This file documents the CONFIG_PACKET_MMAP option available with the PACKET
 socket interface on 2.4 and 2.6 kernels. This type of sockets is used for 
-capture network traffic with utilities like tcpdump or any other that uses 
-the libpcap library. 
+capture network traffic with utilities like tcpdump or any other that needs
+raw access to network interface.
 
 You can find the latest version of this document at
 
@@ -14,6 +14,7 @@ You can find the latest version of this document at
 Please send me your comments to
 
     Ulisses Alonso Camaró <uaca@i.hate.spam.alumni.uv.es>
+    Johann Baudy <johann.baudy@gnu-log.net> (TX RING)
 
 -------------------------------------------------------------------------------
 + Why use PACKET_MMAP
@@ -25,19 +26,24 @@ to capture each packet, it requires two if you want to get packet's
 timestamp (like libpcap always does).
 
 In the other hand PACKET_MMAP is very efficient. PACKET_MMAP provides a size 
-configurable circular buffer mapped in user space. This way reading packets just 
-needs to wait for them, most of the time there is no need to issue a single 
-system call. By using a shared buffer between the kernel and the user 
-also has the benefit of minimizing packet copies.
-
-It's fine to use PACKET_MMAP to improve the performance of the capture process, 
-but it isn't everything. At least, if you are capturing at high speeds (this 
-is relative to the cpu speed), you should check if the device driver of your 
-network interface card supports some sort of interrupt load mitigation or 
-(even better) if it supports NAPI, also make sure it is enabled.
+configurable circular buffer mapped in user space that can be used to either
+send or receive packets. This way reading packets just needs to wait for them,
+most of the time there is no need to issue a single system call. Concerning
+transmission, multiple packets can be sent through one system call to get the
+highest bandwidth.
+By using a shared buffer between the kernel and the user also has the benefit
+of minimizing packet copies.
+
+It's fine to use PACKET_MMAP to improve the performance of the capture and
+transmission process, but it isn't everything. At least, if you are capturing
+at high speeds (this is relative to the cpu speed), you should check if the
+device driver of your network interface card supports some sort of interrupt
+load mitigation or (even better) if it supports NAPI, also make sure it is
+enabled. For transmission, check the MTU (Maximum Transmission Unit) used and
+supported by devices of your network.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP
++ How to use CONFIG_PACKET_MMAP to improve capture process
 --------------------------------------------------------------------------------
 
 From the user standpoint, you should use the higher level libpcap library, which
@@ -57,7 +63,7 @@ the low level details or want to improve libpcap by including PACKET_MMAP
 support.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP directly
++ How to use CONFIG_PACKET_MMAP directly to improve capture process
 --------------------------------------------------------------------------------
 
 From the system calls stand point, the use of PACKET_MMAP involves
@@ -66,6 +72,7 @@ the following process:
 
 [setup]     socket() -------> creation of the capture socket
             setsockopt() ---> allocation of the circular buffer (ring)
+                              option: PACKET_RX_RING
             mmap() ---------> mapping of the allocated buffer to the
                               user process
 
@@ -97,13 +104,75 @@ also the mapping of the circular buffer in the user process and
 the use of this buffer.
 
 --------------------------------------------------------------------------------
++ How to use CONFIG_PACKET_MMAP directly to improve transmission process
+--------------------------------------------------------------------------------
+Transmission process is similar to capture as shown below.
+
+[setup]          socket() -------> creation of the transmission socket
+                 setsockopt() ---> allocation of the circular buffer (ring)
+                                   option: PACKET_TX_RING
+                 bind() ---------> bind transmission socket with a network interface
+                 mmap() ---------> mapping of the allocated buffer to the
+                                   user process
+
+[transmission]   poll() ---------> wait for free packets (optional)
+                 send() ---------> send all packets that are set as ready in
+                                   the ring
+                                   The flag MSG_DONTWAIT can be used to return
+                                   before end of transfer.
+
+[shutdown]  close() --------> destruction of the transmission socket and
+                              deallocation of all associated resources.
+
+Binding the socket to your network interface is mandatory (with zero copy) to
+know the header size of frames used in the circular buffer.
+
+As capture, each frame contains two parts:
+
+ --------------------
+| struct tpacket_hdr | Header. It contains the status of
+|                    | of this frame
+|--------------------|
+| data buffer        |
+.                    .  Data that will be sent over the network interface.
+.                    .
+ --------------------
+
+ bind() associates the socket to your network interface thanks to
+ sll_ifindex parameter of struct sockaddr_ll.
+
+ Initialization example:
+
+ struct sockaddr_ll my_addr;
+ struct ifreq s_ifr;
+ ...
+
+ strncpy (s_ifr.ifr_name, "eth0", sizeof(s_ifr.ifr_name));
+
+ /* get interface index of eth0 */
+ ioctl(this->socket, SIOCGIFINDEX, &s_ifr);
+
+ /* fill sockaddr_ll struct to prepare binding */
+ my_addr.sll_family = AF_PACKET;
+ my_addr.sll_protocol = ETH_P_ALL;
+ my_addr.sll_ifindex =  s_ifr.ifr_ifindex;
+
+ /* bind socket to eth0 */
+ bind(this->socket, (struct sockaddr *)&my_addr, sizeof(struct sockaddr_ll));
+
+ A complete tutorial is available at: http://wiki.gnu-log.net/
+
+--------------------------------------------------------------------------------
 + PACKET_MMAP settings
 --------------------------------------------------------------------------------
 
 
 To setup PACKET_MMAP from user level code is done with a call like
 
+ - Capture process
      setsockopt(fd, SOL_PACKET, PACKET_RX_RING, (void *) &req, sizeof(req))
+ - Transmission process
+     setsockopt(fd, SOL_PACKET, PACKET_TX_RING, (void *) &req, sizeof(req))
 
 The most significant argument in the previous call is the req parameter, 
 this parameter must to have the following structure:
@@ -117,11 +186,11 @@ this parameter must to have the following structure:
     };
 
 This structure is defined in /usr/include/linux/if_packet.h and establishes a 
-circular buffer (ring) of unswappable memory mapped in the capture process. 
+circular buffer (ring) of unswappable memory.
 Being mapped in the capture process allows reading the captured frames and 
 related meta-information like timestamps without requiring a system call.
 
-Captured frames are grouped in blocks. Each block is a physically contiguous 
+Frames are grouped in blocks. Each block is a physically contiguous
 region of memory and holds tp_block_size/tp_frame_size frames. The total number 
 of blocks is tp_block_nr. Note that tp_frame_nr is a redundant parameter because
 
@@ -336,6 +405,7 @@ struct tpacket_hdr). If this field is 0 means that the frame is ready
 to be used for the kernel, If not, there is a frame the user can read 
 and the following flags apply:
 
++++ Capture process:
      from include/linux/if_packet.h
 
      #define TP_STATUS_COPY          2 
@@ -391,6 +461,36 @@ packets are in the ring:
 It doesn't incur in a race condition to first check the status value and 
 then poll for frames.
 
+
+++ Transmission process
+Those defines are also used for transmission:
+
+     #define TP_STATUS_KERNEL        0 // Frame is available
+     #define TP_STATUS_USER          1 // Frame will be sent on next send()
+     #define TP_STATUS_COPY          2 // Frame is currently in transmission
+
+First, the kernel initializes all frames to TP_STATUS_KERNEL. To send a packet,
+the user fills a data buffer of an available frame, sets tp_len to current
+data buffer size and sets its status field to TP_STATUS_USER. This can be done
+on multiple frames. Once the user is ready to transmit, it calls send().
+Then all buffers with status equal to TP_STATUS_USER are forwarded to the
+network device. The kernel updates each status of sent frames with
+TP_STATUS_COPY until the end of transfer.
+At the end of each transfer, buffer status returns to TP_STATUS_KERNEL.
+
+    header->tp_len = in_i_size;
+    header->tp_status = TP_STATUS_USER;
+    retval = send(this->socket, NULL, 0, 0);
+
+The user can also use poll() to check if a buffer is available:
+(status == TP_STATUS_KERNEL)
+
+    struct pollfd pfd;
+    pfd.fd = fd;
+    pfd.revents = 0;
+    pfd.events = POLLOUT;
+    retval = poll(&pfd, 1, timeout);
+
 --------------------------------------------------------------------------------
 + THANKS
 --------------------------------------------------------------------------------
diff --git a/include/linux/if_packet.h b/include/linux/if_packet.h
index 18db066..6682963 100644
--- a/include/linux/if_packet.h
+++ b/include/linux/if_packet.h
@@ -46,6 +46,7 @@ struct sockaddr_ll
 #define PACKET_VERSION			10
 #define PACKET_HDRLEN			11
 #define PACKET_RESERVE			12
+#define PACKET_TX_RING			13
 
 struct tpacket_stats
 {
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index 9099237..f52a75d 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -248,6 +248,7 @@ typedef unsigned char *sk_buff_data_t;
  *		done by skb DMA functions
  *	@secmark: security marking
  *	@vlan_tci: vlan tag control information
+ *	@packet_index: index of packet mmap frame
  */
 
 struct sk_buff {
@@ -328,6 +329,9 @@ struct sk_buff {
 #ifdef CONFIG_NETWORK_SECMARK
 	__u32			secmark;
 #endif
+#ifdef CONFIG_PACKET_MMAP
+	__u32			packet_index;
+#endif
 
 	__u32			mark;
 
diff --git a/net/packet/af_packet.c b/net/packet/af_packet.c
index c718e7e..87d6c12 100644
--- a/net/packet/af_packet.c
+++ b/net/packet/af_packet.c
@@ -156,7 +156,25 @@ struct packet_mreq_max
 };
 
 #ifdef CONFIG_PACKET_MMAP
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing);
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req,
+		int closing, int tx_ring);
+
+struct packet_ring_buffer {
+	char *			*pg_vec;
+	unsigned int		head;
+	unsigned int		frames_per_block;
+	unsigned int		frame_size;
+	unsigned int		frame_max;
+
+	unsigned int		pg_vec_order;
+	unsigned int		pg_vec_pages;
+	unsigned int		pg_vec_len;
+
+	spinlock_t		lock;
+};
+
+struct packet_sock;
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg);
 #endif
 
 static void packet_flush_mclist(struct sock *sk);
@@ -166,12 +184,10 @@ struct packet_sock {
 	struct sock		sk;
 	struct tpacket_stats	stats;
 #ifdef CONFIG_PACKET_MMAP
-	char *			*pg_vec;
-	unsigned int		head;
-	unsigned int            frames_per_block;
-	unsigned int		frame_size;
-	unsigned int		frame_max;
+	struct packet_ring_buffer	rx_ring;
+	struct packet_ring_buffer	tx_ring;
 	int			copy_thresh;
+	atomic_t		tx_pending_skb;
 #endif
 	struct packet_type	prot_hook;
 	spinlock_t		bind_lock;
@@ -183,9 +199,6 @@ struct packet_sock {
 	struct packet_mclist	*mclist;
 #ifdef CONFIG_PACKET_MMAP
 	atomic_t		mapped;
-	unsigned int            pg_vec_order;
-	unsigned int		pg_vec_pages;
-	unsigned int		pg_vec_len;
 	enum tpacket_versions	tp_version;
 	unsigned int		tp_hdrlen;
 	unsigned int		tp_reserve;
@@ -204,8 +217,10 @@ struct packet_skb_cb {
 
 #ifdef CONFIG_PACKET_MMAP
 
-static void *packet_lookup_frame(struct packet_sock *po, unsigned int position,
-				 int status)
+static void *packet_lookup_frame(struct packet_sock *po,
+		struct packet_ring_buffer *buff,
+		unsigned int position,
+		int status)
 {
 	unsigned int pg_vec_pos, frame_offset;
 	union {
@@ -214,25 +229,50 @@ static void *packet_lookup_frame(struct packet_sock *po, unsigned int position,
 		void *raw;
 	} h;
 
-	pg_vec_pos = position / po->frames_per_block;
-	frame_offset = position % po->frames_per_block;
+	pg_vec_pos = position / buff->frames_per_block;
+	frame_offset = position % buff->frames_per_block;
 
-	h.raw = po->pg_vec[pg_vec_pos] + (frame_offset * po->frame_size);
+	h.raw = buff->pg_vec[pg_vec_pos] + (frame_offset * buff->frame_size);
 	switch (po->tp_version) {
 	case TPACKET_V1:
-		if (status != h.h1->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL)
+		if (status != h.h1->tp_status)
 			return NULL;
 		break;
 	case TPACKET_V2:
-		if (status != h.h2->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL)
+		if (status != h.h2->tp_status)
 			return NULL;
 		break;
 	}
 	return h.raw;
 }
 
+static inline void *packet_current_rx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->rx_ring, po->rx_ring.head, status);
+}
+
+static inline void *packet_current_tx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->tx_ring, po->tx_ring.head, status);
+}
+
+static inline void *packet_previous_rx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->rx_ring.head ? po->rx_ring.head - 1 : po->rx_ring.frame_max;
+	return packet_lookup_frame(po, &po->rx_ring, previous, status);
+}
+
+static inline void *packet_previous_tx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->tx_ring.head ? po->tx_ring.head - 1 : po->tx_ring.frame_max;
+	return packet_lookup_frame(po, &po->tx_ring, previous, status);
+}
+
+static inline void packet_increment_head(struct packet_ring_buffer *buff)
+{
+	buff->head = buff->head != buff->frame_max ? buff->head+1 : 0;
+}
+
 static void __packet_set_status(struct packet_sock *po, void *frame, int status)
 {
 	union {
@@ -646,7 +686,7 @@ static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 		macoff = netoff - maclen;
 	}
 
-	if (macoff + snaplen > po->frame_size) {
+	if (macoff + snaplen > po->rx_ring.frame_size) {
 		if (po->copy_thresh &&
 		    atomic_read(&sk->sk_rmem_alloc) + skb->truesize <
 		    (unsigned)sk->sk_rcvbuf) {
@@ -659,16 +699,16 @@ static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 			if (copy_skb)
 				skb_set_owner_r(copy_skb, sk);
 		}
-		snaplen = po->frame_size - macoff;
+		snaplen = po->rx_ring.frame_size - macoff;
 		if ((int)snaplen < 0)
 			snaplen = 0;
 	}
 
 	spin_lock(&sk->sk_receive_queue.lock);
-	h.raw = packet_lookup_frame(po, po->head, TP_STATUS_KERNEL);
+	h.raw = packet_current_rx_frame(po, TP_STATUS_KERNEL);
 	if (!h.raw)
 		goto ring_is_full;
-	po->head = po->head != po->frame_max ? po->head+1 : 0;
+	packet_increment_head(&po->rx_ring);
 	po->stats.tp_packets++;
 	if (copy_skb) {
 		status |= TP_STATUS_COPY;
@@ -759,10 +799,214 @@ ring_is_full:
 	goto drop_n_restore;
 }
 
-#endif
+static void tpacket_destruct_skb(struct sk_buff *skb)
+{
+	struct packet_sock *po = pkt_sk(skb->sk);
+	void * ph;
 
+	BUG_ON(skb == NULL);
+	ph = packet_lookup_frame( po, &po->tx_ring, skb->packet_index, TP_STATUS_COPY);
 
-static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+	BUG_ON(ph == NULL);
+	BUG_ON(atomic_read(&po->tx_pending_skb) == 0);
+
+	atomic_dec(&po->tx_pending_skb);
+	__packet_set_status(po, ph, TP_STATUS_KERNEL);
+
+	sock_wfree(skb);
+}
+
+static int tpacket_fill_skb(struct packet_sock *po, struct sk_buff * skb, void * frame,
+		struct net_device *dev, int size_max, __be16 proto,
+		unsigned char * addr)
+{
+	union {
+		struct tpacket_hdr *h1;
+		struct tpacket2_hdr *h2;
+		void *raw;
+	} ph;
+	int to_write, offset, len, tp_len;
+	struct socket *sock = po->sk.sk_socket;
+	struct page *page;
+	void *data;
+	int err;
+
+	ph.raw = frame;
+
+	skb->protocol = proto;
+	skb->dev = dev;
+	skb->priority = po->sk.sk_priority;
+	skb->destructor = tpacket_destruct_skb;
+	skb->packet_index = po->tx_ring.head;
+
+	switch(po->tp_version) {
+	case TPACKET_V2:
+		tp_len = ph.h1->tp_len;
+		break;
+	default:
+		tp_len = ph.h1->tp_len;
+		break;
+	}
+
+	if (unlikely(tp_len > size_max)) {
+		printk(KERN_ERR "packet size is too long (%d > %d)\n",
+				tp_len, size_max);
+		return -EMSGSIZE;
+	}
+
+	skb_reserve(skb, LL_RESERVED_SPACE(dev));
+	data = ph.raw + po->tp_hdrlen;
+
+	if (sock->type == SOCK_DGRAM) {
+		err = dev_hard_header(skb, dev, ntohs(proto), addr,
+				NULL, tp_len);
+		if (unlikely(err < 0))
+			return -EINVAL;
+	} else if (dev->hard_header_len ) {
+		/* net device doesn't like empty head */
+		if(unlikely(tp_len <= dev->hard_header_len)) {
+			printk(KERN_ERR "packet size is too short "
+					"(%d < %d)\n", tp_len,
+					dev->hard_header_len);
+			return -EINVAL;
+		}
+
+		skb_push(skb, dev->hard_header_len);
+		err = skb_store_bits(skb, 0, data,
+				dev->hard_header_len);
+		if (unlikely(err))
+			return err;
+	}
+
+	err = -EFAULT;
+	to_write = tp_len - dev->hard_header_len;
+	data += dev->hard_header_len;
+	page = virt_to_page(data);
+	len = ((to_write > PAGE_SIZE) ? PAGE_SIZE : to_write);
+
+	offset = (int)((long)data & (~PAGE_MASK));
+	len -= offset;
+
+	skb->data_len = to_write;
+	skb->len += to_write;
+
+	while ( likely(to_write) ) {
+		get_page(page);
+		skb_fill_page_desc(skb,
+				skb_shinfo(skb)->nr_frags,
+				page++, offset, len);
+		to_write -= len;
+		len = (to_write > PAGE_SIZE) ? PAGE_SIZE : to_write;
+		offset = 0;
+	}
+
+	return tp_len;
+}
+
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg)
+{
+	struct socket *sock;
+	struct sk_buff *skb;
+	struct net_device *dev;
+	__be16 proto;
+	int ifindex, err, reserve = 0;
+	void * ph;
+	struct sockaddr_ll *saddr=(struct sockaddr_ll *)msg->msg_name;
+	int tp_len, size_max;
+	unsigned char *addr;
+	int len_sum = 0;
+
+	BUG_ON(po == NULL);
+	sock = po->sk.sk_socket;
+
+	if (saddr == NULL) {
+		ifindex	= po->ifindex;
+		proto	= po->num;
+		addr	= NULL;
+	} else {
+		err = -EINVAL;
+		if (msg->msg_namelen < sizeof(struct sockaddr_ll))
+			goto out;
+		if (msg->msg_namelen < (saddr->sll_halen + offsetof(struct sockaddr_ll, sll_addr)))
+			goto out;
+		ifindex	= saddr->sll_ifindex;
+		proto	= saddr->sll_protocol;
+		addr	= saddr->sll_addr;
+	}
+
+	dev = dev_get_by_index(sock_net(&po->sk), ifindex);
+	err = -ENXIO;
+	if (unlikely(dev == NULL))
+		goto out;
+
+	err = -EINVAL;
+	if (unlikely(sock->type != SOCK_RAW))
+		goto out_unlock;
+
+	reserve = dev->hard_header_len;
+
+	err = -ENETDOWN;
+	if (unlikely(!(dev->flags & IFF_UP)))
+		goto out_unlock;
+
+	size_max = po->tx_ring.frame_size - sizeof(struct skb_shared_info)
+		- po->tp_hdrlen - LL_ALLOCATED_SPACE(dev);
+
+	if (size_max > dev->mtu + reserve)
+		size_max = dev->mtu + reserve;
+
+	do
+	{
+		spin_lock(&po->tx_ring.lock);
+		ph = packet_current_tx_frame(po, TP_STATUS_USER);
+		if(unlikely(ph == NULL)) {
+			spin_unlock(&po->tx_ring.lock);
+			continue;
+		}
+
+		__packet_set_status(po, ph, TP_STATUS_COPY);
+		atomic_inc(&po->tx_pending_skb);
+		spin_unlock(&po->tx_ring.lock);
+
+		skb = sock_alloc_send_skb(&po->sk, LL_ALLOCATED_SPACE(dev),
+				msg->msg_flags & MSG_DONTWAIT, &err);
+		if (unlikely(skb == NULL))
+			goto out_status;
+
+		tp_len = tpacket_fill_skb(po, skb, ph, dev, size_max, proto,
+				addr);
+		if(unlikely(tp_len < 0)) {
+			err = tp_len;
+			goto out_free;
+		}
+
+		err = dev_queue_xmit(skb);
+		if (unlikely(err > 0 && (err = net_xmit_errno(err)) != 0))
+			goto out_free;
+
+		packet_increment_head(&po->tx_ring);
+		len_sum += tp_len;
+	}
+	while(likely((ph != NULL)
+				|| ((!(msg->msg_flags & MSG_DONTWAIT))
+					&& atomic_read(&po->tx_pending_skb))));
+
+	err = len_sum;
+	goto out_unlock;
+
+out_free:
+	kfree_skb(skb);
+out_status:
+	__packet_set_status(po, ph, TP_STATUS_USER);
+	atomic_dec(&po->tx_pending_skb);
+out_unlock:
+	dev_put(dev);
+out:
+	return err;
+}
+#endif
+
+static int packet_snd(struct socket *sock,
 			  struct msghdr *msg, size_t len)
 {
 	struct sock *sk = sock->sk;
@@ -853,6 +1097,19 @@ out:
 	return err;
 }
 
+static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+		struct msghdr *msg, size_t len)
+{
+#ifdef CONFIG_PACKET_MMAP
+	struct sock *sk = sock->sk;
+	struct packet_sock *po = pkt_sk(sk);
+	if (po->tx_ring.pg_vec)
+		return tpacket_snd(po, msg);
+	else
+#endif
+		return packet_snd(sock, msg, len);
+}
+
 /*
  *	Close a PACKET socket. This is fairly simple. We immediately go
  *	to 'closed' state and remove our protocol entry in the device list.
@@ -891,10 +1148,13 @@ static int packet_release(struct socket *sock)
 	packet_flush_mclist(sk);
 
 #ifdef CONFIG_PACKET_MMAP
-	if (po->pg_vec) {
+	{
 		struct tpacket_req req;
 		memset(&req, 0, sizeof(req));
-		packet_set_ring(sk, &req, 1);
+		if (po->rx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 0);
+		if (po->tx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 1);
 	}
 #endif
 
@@ -1411,6 +1671,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 #ifdef CONFIG_PACKET_MMAP
 	case PACKET_RX_RING:
+	case PACKET_TX_RING:
 	{
 		struct tpacket_req req;
 
@@ -1418,7 +1679,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 			return -EINVAL;
 		if (copy_from_user(&req,optval,sizeof(req)))
 			return -EFAULT;
-		return packet_set_ring(sk, &req, 0);
+		return packet_set_ring(sk, &req, 0, optname == PACKET_TX_RING);
 	}
 	case PACKET_COPY_THRESH:
 	{
@@ -1438,7 +1699,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1457,7 +1718,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1701,13 +1962,17 @@ static unsigned int packet_poll(struct file * file, struct socket *sock,
 	unsigned int mask = datagram_poll(file, sock, wait);
 
 	spin_lock_bh(&sk->sk_receive_queue.lock);
-	if (po->pg_vec) {
-		unsigned last = po->head ? po->head-1 : po->frame_max;
-
-		if (packet_lookup_frame(po, last, TP_STATUS_USER))
+	if (po->rx_ring.pg_vec) {
+		if (packet_previous_rx_frame(po, TP_STATUS_USER))
 			mask |= POLLIN | POLLRDNORM;
 	}
 	spin_unlock_bh(&sk->sk_receive_queue.lock);
+	spin_lock_bh(&sk->sk_write_queue.lock);
+	if (po->tx_ring.pg_vec) {
+		if (packet_current_tx_frame(po, TP_STATUS_KERNEL))
+			mask |= POLLOUT | POLLWRNORM;
+	}
+	spin_unlock_bh(&sk->sk_write_queue.lock);
 	return mask;
 }
 
@@ -1783,20 +2048,24 @@ out_free_pgvec:
 	goto out;
 }
 
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing)
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing, int tx_ring)
 {
 	char **pg_vec = NULL;
 	struct packet_sock *po = pkt_sk(sk);
 	int was_running, order = 0;
+	struct packet_ring_buffer *rb;
+	struct sk_buff_head *rb_queue;
 	__be16 num;
 	int err = 0;
 
+	rb = tx_ring ? &po->tx_ring : &po->rx_ring;
+	rb_queue = tx_ring ? &sk->sk_write_queue : &sk->sk_receive_queue;
+
 	if (req->tp_block_nr) {
 		int i;
 
 		/* Sanity tests and some calculations */
-
-		if (unlikely(po->pg_vec))
+		if (unlikely(rb->pg_vec))
 			return -EBUSY;
 
 		switch (po->tp_version) {
@@ -1813,16 +2082,16 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 		if (unlikely(req->tp_block_size & (PAGE_SIZE - 1)))
 			return -EINVAL;
 		if (unlikely(req->tp_frame_size < po->tp_hdrlen +
-						  po->tp_reserve))
+					po->tp_reserve))
 			return -EINVAL;
 		if (unlikely(req->tp_frame_size & (TPACKET_ALIGNMENT - 1)))
 			return -EINVAL;
 
-		po->frames_per_block = req->tp_block_size/req->tp_frame_size;
-		if (unlikely(po->frames_per_block <= 0))
+		rb->frames_per_block = req->tp_block_size/req->tp_frame_size;
+		if (unlikely(rb->frames_per_block <= 0))
 			return -EINVAL;
-		if (unlikely((po->frames_per_block * req->tp_block_nr) !=
-			     req->tp_frame_nr))
+		if (unlikely((rb->frames_per_block * req->tp_block_nr) !=
+					req->tp_frame_nr))
 			return -EINVAL;
 
 		err = -ENOMEM;
@@ -1835,17 +2104,19 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 			void *ptr = pg_vec[i];
 			int k;
 
-			for (k = 0; k < po->frames_per_block; k++) {
+			for (k = 0; k < rb->frames_per_block; k++) {
 				__packet_set_status(po, ptr, TP_STATUS_KERNEL);
 				ptr += req->tp_frame_size;
 			}
 		}
-		/* Done */
-	} else {
+	}
+	/* Done */
+	else {
 		if (unlikely(req->tp_frame_nr))
 			return -EINVAL;
 	}
 
+
 	lock_sock(sk);
 
 	/* Detach socket from network */
@@ -1866,20 +2137,19 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 	if (closing || atomic_read(&po->mapped) == 0) {
 		err = 0;
 #define XC(a, b) ({ __typeof__ ((a)) __t; __t = (a); (a) = (b); __t; })
-
-		spin_lock_bh(&sk->sk_receive_queue.lock);
-		pg_vec = XC(po->pg_vec, pg_vec);
-		po->frame_max = (req->tp_frame_nr - 1);
-		po->head = 0;
-		po->frame_size = req->tp_frame_size;
-		spin_unlock_bh(&sk->sk_receive_queue.lock);
-
-		order = XC(po->pg_vec_order, order);
-		req->tp_block_nr = XC(po->pg_vec_len, req->tp_block_nr);
-
-		po->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
-		po->prot_hook.func = po->pg_vec ? tpacket_rcv : packet_rcv;
-		skb_queue_purge(&sk->sk_receive_queue);
+		spin_lock_bh(&rb_queue->lock);
+		pg_vec = XC(rb->pg_vec, pg_vec);
+		rb->frame_max = (req->tp_frame_nr - 1);
+		rb->head = 0;
+		rb->frame_size = req->tp_frame_size;
+		spin_unlock_bh(&rb_queue->lock);
+
+		order = XC(rb->pg_vec_order, order);
+		req->tp_block_nr = XC(rb->pg_vec_len, req->tp_block_nr);
+
+		rb->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
+		po->prot_hook.func = (po->rx_ring.pg_vec) ? tpacket_rcv : packet_rcv;
+		skb_queue_purge(rb_queue);
 #undef XC
 		if (atomic_read(&po->mapped))
 			printk(KERN_DEBUG "packet_mmap: vma is busy: %d\n", atomic_read(&po->mapped));
@@ -1906,7 +2176,8 @@ static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 {
 	struct sock *sk = sock->sk;
 	struct packet_sock *po = pkt_sk(sk);
-	unsigned long size;
+	unsigned long size, expected_size;
+	struct packet_ring_buffer *rb;
 	unsigned long start;
 	int err = -EINVAL;
 	int i;
@@ -1917,23 +2188,38 @@ static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 	size = vma->vm_end - vma->vm_start;
 
 	lock_sock(sk);
-	if (po->pg_vec == NULL)
+
+	expected_size = 0;
+	if (po->rx_ring.pg_vec)
+		expected_size += po->rx_ring.pg_vec_len * po->rx_ring.pg_vec_pages * PAGE_SIZE;
+	if (po->tx_ring.pg_vec)
+		expected_size += po->tx_ring.pg_vec_len * po->tx_ring.pg_vec_pages * PAGE_SIZE;
+
+	if (expected_size == 0)
 		goto out;
-	if (size != po->pg_vec_len*po->pg_vec_pages*PAGE_SIZE)
+
+	if (size != expected_size)
 		goto out;
 
 	start = vma->vm_start;
-	for (i = 0; i < po->pg_vec_len; i++) {
-		struct page *page = virt_to_page(po->pg_vec[i]);
-		int pg_num;
-
-		for (pg_num = 0; pg_num < po->pg_vec_pages; pg_num++, page++) {
-			err = vm_insert_page(vma, start, page);
-			if (unlikely(err))
-				goto out;
-			start += PAGE_SIZE;
+
+	for(rb = &po->rx_ring; rb <= &po->tx_ring; rb++) {
+		if (rb->pg_vec == NULL)
+			continue;
+
+		for (i = 0; i < rb->pg_vec_len; i++) {
+			struct page *page = virt_to_page(rb->pg_vec[i]);
+			int pg_num;
+
+			for (pg_num = 0; pg_num < rb->pg_vec_pages; pg_num++, page++) {
+				err = vm_insert_page(vma, start, page);
+				if (unlikely(err))
+					goto out;
+				start += PAGE_SIZE;
+			}
 		}
 	}
+
 	atomic_inc(&po->mapped);
 	vma->vm_ops = &packet_mmap_ops;
 	err = 0;






* Re: [PATCH] Packet socket: mmapped IO: PACKET_TX_RING
@ 2008-11-12 13:19 Johann Baudy
  0 siblings, 0 replies; 59+ messages in thread
From: Johann Baudy @ 2008-11-12 13:19 UTC (permalink / raw)
  To: Lovich, Vitali; +Cc: Evgeniy Polyakov, netdev@vger.kernel.org

Hi Vitali,

Please find below my draft.

Following our discussion and your proposal to replace the mutex, I've made this draft.
A busy counter: used to prevent simultaneous send() calls.
	Initialized to 1 and decremented at the beginning of send() (with atomic_dec_and_test()); send() returns EBUSY if the result is != 0.
	It is used in packet_set_ring() and tpacket_snd().

A shutdown flag: to notify send() that a shutdown is ongoing.
	When a release occurs, packet_release() raises the shutdown flag and then performs a while (atomic_read(busy) != 1); loop in order to wait for the end of the send() procedure.
	In tpacket_snd(), if shutdown is seen, we exit the loop.
	In the skb destructor, if shutdown is seen, we call sock_wfree() directly.
	I think such a loop is better than delaying the close procedure: it ensures that once close() has returned, everything inside the kernel is clean.

And resizing is not allowed if (as we said):
	> - someone has mapped the socket (1.a)
	> - sending is ongoing - tx_pending_skb != 0 (1.b)
	> - a send() procedure is ongoing (1.c)


I've also put the pending and busy counters in packet_ring_buffer. Not useful for RX, but I think it's cleaner.

PS: For the moment I'm being paranoid, so I've added smp_rmb()/smp_wmb() and dcache flushes for get/set status and also when filling the skb!!
(I've lost 10 MBytes/s :( on my bench)
- I'm using skb->mark again to forward the buffer index to the destructor.
- I haven't tested SOCK_DGRAM yet, but I've updated to_write and the data contents properly (I hope).
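
To make the busy/shutdown scheme above concrete, here is a minimal sketch of the send() exclusion it describes (illustrative only; guarded_send() is a made-up name, and the real logic lives in tpacket_snd() in the patch below):

    /* needs <linux/atomic.h> (or <asm/atomic.h>) and <linux/errno.h>;
     * "busy" starts at 1: the first caller to bring it to 0 owns the send
     * path, a concurrent caller sees a non-zero result and backs off with
     * -EBUSY; both paths restore the counter on exit. */
    static int guarded_send(atomic_t *busy)
    {
            int err = -EBUSY;

            if (!atomic_dec_and_test(busy))   /* another send() in progress */
                    goto out;

            /* ... fill and transmit frames, stop early if shutdown is set ... */
            err = 0;
    out:
            atomic_inc(busy);                 /* release, or undo the failed claim */
            return err;
    }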

Thanks in advance,
Johann

  Documentation/networking/packet_mmap.txt |  133 +++++++-
 include/linux/if_packet.h                |    1 +
 net/packet/af_packet.c                   |  539 ++++++++++++++++++++++++------
 3 files changed, 557 insertions(+), 116 deletions(-)

diff --git a/Documentation/networking/packet_mmap.txt b/Documentation/networking/packet_mmap.txt
index 07c53d5..38e5fa3 100644
--- a/Documentation/networking/packet_mmap.txt
+++ b/Documentation/networking/packet_mmap.txt
@@ -4,8 +4,8 @@
 
 This file documents the CONFIG_PACKET_MMAP option available with the PACKET
 socket interface on 2.4 and 2.6 kernels. This type of sockets is used for 
-capture network traffic with utilities like tcpdump or any other that uses 
-the libpcap library. 
+capture network traffic with utilities like tcpdump or any other that needs
+raw access to network interface.
 
 You can find the latest version of this document at
 
@@ -14,6 +14,8 @@ You can find the latest version of this document at
 Please send me your comments to
 
     Ulisses Alonso Camaró <uaca@i.hate.spam.alumni.uv.es>
+    Johann Baudy <johann.baudy@gnu-log.net> (TX RING)
+    Vitali Lovitch <vlovich@qualcomm.com> (TX RING)
 
 -------------------------------------------------------------------------------
 + Why use PACKET_MMAP
@@ -25,19 +27,24 @@ to capture each packet, it requires two if you want to get packet's
 timestamp (like libpcap always does).
 
 In the other hand PACKET_MMAP is very efficient. PACKET_MMAP provides a size 
-configurable circular buffer mapped in user space. This way reading packets just 
-needs to wait for them, most of the time there is no need to issue a single 
-system call. By using a shared buffer between the kernel and the user 
-also has the benefit of minimizing packet copies.
-
-It's fine to use PACKET_MMAP to improve the performance of the capture process, 
-but it isn't everything. At least, if you are capturing at high speeds (this 
-is relative to the cpu speed), you should check if the device driver of your 
-network interface card supports some sort of interrupt load mitigation or 
-(even better) if it supports NAPI, also make sure it is enabled.
+configurable circular buffer mapped in user space that can be used to either
+send or receive packets. This way reading packets just needs to wait for them,
+most of the time there is no need to issue a single system call. Concerning
+transmission, multiple packets can be sent through one system call to get the
+highest bandwidth.
+By using a shared buffer between the kernel and the user also has the benefit
+of minimizing packet copies.
+
+It's fine to use PACKET_MMAP to improve the performance of the capture and
+transmission process, but it isn't everything. At least, if you are capturing
+at high speeds (this is relative to the cpu speed), you should check if the
+device driver of your network interface card supports some sort of interrupt
+load mitigation or (even better) if it supports NAPI, also make sure it is
+enabled. For transmission, check the MTU (Maximum Transmission Unit) used and
+supported by devices of your network.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP
++ How to use CONFIG_PACKET_MMAP to improve capture process
 --------------------------------------------------------------------------------
 
 From the user standpoint, you should use the higher level libpcap library, which
@@ -57,7 +64,7 @@ the low level details or want to improve libpcap by including PACKET_MMAP
 support.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP directly
++ How to use CONFIG_PACKET_MMAP directly to improve capture process
 --------------------------------------------------------------------------------
 
 From the system calls stand point, the use of PACKET_MMAP involves
@@ -66,6 +73,7 @@ the following process:
 
 [setup]     socket() -------> creation of the capture socket
             setsockopt() ---> allocation of the circular buffer (ring)
+                              option: PACKET_RX_RING
             mmap() ---------> mapping of the allocated buffer to the
                               user process
 
@@ -97,13 +105,75 @@ also the mapping of the circular buffer in the user process and
 the use of this buffer.
 
 --------------------------------------------------------------------------------
++ How to use CONFIG_PACKET_MMAP directly to improve transmission process
+--------------------------------------------------------------------------------
+Transmission process is similar to capture as shown below.
+
+[setup]          socket() -------> creation of the transmission socket
+                 setsockopt() ---> allocation of the circular buffer (ring)
+                                   option: PACKET_TX_RING
+                 bind() ---------> bind transmission socket with a network interface
+                 mmap() ---------> mapping of the allocated buffer to the
+                                   user process
+
+[transmission]   poll() ---------> wait for free packets (optional)
+                 send() ---------> send all packets that are set as ready in
+                                   the ring
+                                   The flag MSG_DONTWAIT can be used to return
+                                   before end of transfer.
+
+[shutdown]  close() --------> destruction of the transmission socket and
+                              deallocation of all associated resources.
+
+Binding the socket to your network interface is mandatory (with zero copy) to
+know the header size of frames used in the circular buffer.
+
+As capture, each frame contains two parts:
+
+ --------------------
+| struct tpacket_hdr | Header. It contains the status of
+|                    | of this frame
+|--------------------|
+| data buffer        |
+.                    .  Data that will be sent over the network interface.
+.                    .
+ --------------------
+
+ bind() associates the socket to your network interface thanks to
+ sll_ifindex parameter of struct sockaddr_ll.
+
+ Initialization example:
+
+ struct sockaddr_ll my_addr;
+ struct ifreq s_ifr;
+ ...
+
+ strncpy (s_ifr.ifr_name, "eth0", sizeof(s_ifr.ifr_name));
+
+ /* get interface index of eth0 */
+ ioctl(this->socket, SIOCGIFINDEX, &s_ifr);
+
+ /* fill sockaddr_ll struct to prepare binding */
+ my_addr.sll_family = AF_PACKET;
+ my_addr.sll_protocol = ETH_P_ALL;
+ my_addr.sll_ifindex =  s_ifr.ifr_ifindex;
+
+ /* bind socket to eth0 */
+ bind(this->socket, (struct sockaddr *)&my_addr, sizeof(struct sockaddr_ll));
+
+ A complete tutorial is available at: http://wiki.gnu-log.net/
+
+--------------------------------------------------------------------------------
 + PACKET_MMAP settings
 --------------------------------------------------------------------------------
 
 
 To setup PACKET_MMAP from user level code is done with a call like
 
+ - Capture process
      setsockopt(fd, SOL_PACKET, PACKET_RX_RING, (void *) &req, sizeof(req))
+ - Transmission process
+     setsockopt(fd, SOL_PACKET, PACKET_TX_RING, (void *) &req, sizeof(req))
 
 The most significant argument in the previous call is the req parameter, 
 this parameter must to have the following structure:
@@ -117,11 +187,11 @@ this parameter must to have the following structure:
     };
 
 This structure is defined in /usr/include/linux/if_packet.h and establishes a 
-circular buffer (ring) of unswappable memory mapped in the capture process. 
+circular buffer (ring) of unswappable memory.
 Being mapped in the capture process allows reading the captured frames and 
 related meta-information like timestamps without requiring a system call.
 
-Captured frames are grouped in blocks. Each block is a physically contiguous 
+Frames are grouped in blocks. Each block is a physically contiguous
 region of memory and holds tp_block_size/tp_frame_size frames. The total number 
 of blocks is tp_block_nr. Note that tp_frame_nr is a redundant parameter because
 
@@ -336,6 +406,7 @@ struct tpacket_hdr). If this field is 0 means that the frame is ready
 to be used for the kernel, If not, there is a frame the user can read 
 and the following flags apply:
 
++++ Capture process:
      from include/linux/if_packet.h
 
      #define TP_STATUS_COPY          2 
@@ -391,6 +462,36 @@ packets are in the ring:
 It doesn't incur in a race condition to first check the status value and 
 then poll for frames.
 
+
+++ Transmission process
+Those defines are also used for transmission:
+
+     #define TP_STATUS_KERNEL        0 // Frame is available
+     #define TP_STATUS_USER          1 // Frame will be sent on next send()
+     #define TP_STATUS_COPY          2 // Frame is currently in transmission
+
+First, the kernel initializes all frames to TP_STATUS_KERNEL. To send a packet,
+the user fills a data buffer of an available frame, sets tp_len to current
+data buffer size and sets its status field to TP_STATUS_USER. This can be done
+on multiple frames. Once the user is ready to transmit, it calls send().
+Then all buffers with status equal to TP_STATUS_USER are forwarded to the
+network device. The kernel updates each status of sent frames with
+TP_STATUS_COPY until the end of transfer.
+At the end of each transfer, buffer status returns to TP_STATUS_KERNEL.
+
+    header->tp_len = in_i_size;
+    header->tp_status = TP_STATUS_USER;
+    retval = send(this->socket, NULL, 0, 0);
+
+The user can also use poll() to check if a buffer is available:
+(status == TP_STATUS_KERNEL)
+
+    struct pollfd pfd;
+    pfd.fd = fd;
+    pfd.revents = 0;
+    pfd.events = POLLOUT;
+    retval = poll(&pfd, 1, timeout);
+
 --------------------------------------------------------------------------------
 + THANKS
 --------------------------------------------------------------------------------
diff --git a/include/linux/if_packet.h b/include/linux/if_packet.h
index 18db066..6682963 100644
--- a/include/linux/if_packet.h
+++ b/include/linux/if_packet.h
@@ -46,6 +46,7 @@ struct sockaddr_ll
 #define PACKET_VERSION			10
 #define PACKET_HDRLEN			11
 #define PACKET_RESERVE			12
+#define PACKET_TX_RING			13
 
 struct tpacket_stats
 {
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index 9099237..2d67a39 100644
diff --git a/net/packet/af_packet.c b/net/packet/af_packet.c
old mode 100644
new mode 100755
index c718e7e..fb776a7
--- a/net/packet/af_packet.c
+++ b/net/packet/af_packet.c
@@ -156,7 +156,26 @@ struct packet_mreq_max
 };
 
 #ifdef CONFIG_PACKET_MMAP
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing);
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req,
+		int closing, int tx_ring);
+
+struct packet_ring_buffer {
+	char *			*pg_vec;
+	unsigned int		head;
+	unsigned int		frames_per_block;
+	unsigned int		frame_size;
+	unsigned int		frame_max;
+
+	unsigned int		pg_vec_order;
+	unsigned int		pg_vec_pages;
+	unsigned int		pg_vec_len;
+
+	atomic_t		busy;
+	atomic_t		pending;
+};
+
+struct packet_sock;
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg);
 #endif
 
 static void packet_flush_mclist(struct sock *sk);
@@ -166,11 +185,9 @@ struct packet_sock {
 	struct sock		sk;
 	struct tpacket_stats	stats;
 #ifdef CONFIG_PACKET_MMAP
-	char *			*pg_vec;
-	unsigned int		head;
-	unsigned int            frames_per_block;
-	unsigned int		frame_size;
-	unsigned int		frame_max;
+	struct packet_ring_buffer	rx_ring;
+	struct packet_ring_buffer	tx_ring;
+	atomic_t	shutdown;
 	int			copy_thresh;
 #endif
 	struct packet_type	prot_hook;
@@ -183,9 +200,6 @@ struct packet_sock {
 	struct packet_mclist	*mclist;
 #ifdef CONFIG_PACKET_MMAP
 	atomic_t		mapped;
-	unsigned int            pg_vec_order;
-	unsigned int		pg_vec_pages;
-	unsigned int		pg_vec_len;
 	enum tpacket_versions	tp_version;
 	unsigned int		tp_hdrlen;
 	unsigned int		tp_reserve;
@@ -204,36 +218,29 @@ struct packet_skb_cb {
 
 #ifdef CONFIG_PACKET_MMAP
 
-static void *packet_lookup_frame(struct packet_sock *po, unsigned int position,
-				 int status)
+static void __packet_set_status(struct packet_sock *po, void *frame, int status)
 {
-	unsigned int pg_vec_pos, frame_offset;
 	union {
 		struct tpacket_hdr *h1;
 		struct tpacket2_hdr *h2;
 		void *raw;
 	} h;
 
-	pg_vec_pos = position / po->frames_per_block;
-	frame_offset = position % po->frames_per_block;
-
-	h.raw = po->pg_vec[pg_vec_pos] + (frame_offset * po->frame_size);
+	h.raw = frame;
 	switch (po->tp_version) {
 	case TPACKET_V1:
-		if (status != h.h1->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL)
-			return NULL;
+		h.h1->tp_status = status;
 		break;
 	case TPACKET_V2:
-		if (status != h.h2->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL)
-			return NULL;
+		h.h2->tp_status = status;
 		break;
 	}
-	return h.raw;
+
+	smp_wmb();
+	flush_dcache_page(virt_to_page(h.raw));
 }
 
-static void __packet_set_status(struct packet_sock *po, void *frame, int status)
+static int __packet_get_status(struct packet_sock *po, void *frame)
 {
 	union {
 		struct tpacket_hdr *h1;
@@ -241,16 +248,71 @@ static void __packet_set_status(struct packet_sock *po, void *frame, int status)
 		void *raw;
 	} h;
 
+	smp_rmb();
+	flush_dcache_page(virt_to_page(h.raw));
+
 	h.raw = frame;
 	switch (po->tp_version) {
 	case TPACKET_V1:
-		h.h1->tp_status = status;
+		return h.h1->tp_status;
 		break;
 	case TPACKET_V2:
-		h.h2->tp_status = status;
+		return h.h2->tp_status;
 		break;
 	}
+	return 0;
+}
+
+static void *packet_lookup_frame(struct packet_sock *po,
+		struct packet_ring_buffer *buff,
+		unsigned int position,
+		int status)
+{
+	unsigned int pg_vec_pos, frame_offset;
+	union {
+		struct tpacket_hdr *h1;
+		struct tpacket2_hdr *h2;
+		void *raw;
+	} h;
+
+	pg_vec_pos = position / buff->frames_per_block;
+	frame_offset = position % buff->frames_per_block;
+
+	h.raw = buff->pg_vec[pg_vec_pos] + (frame_offset * buff->frame_size);
+
+	if( status != __packet_get_status(po, h.raw) )
+		return NULL;
+
+	return h.raw;
+}
+
+static inline void *packet_current_rx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->rx_ring, po->rx_ring.head, status);
+}
+
+static inline void *packet_current_tx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->tx_ring, po->tx_ring.head, status);
+}
+
+static inline void *packet_previous_rx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->rx_ring.head ? po->rx_ring.head - 1 : po->rx_ring.frame_max;
+	return packet_lookup_frame(po, &po->rx_ring, previous, status);
+}
+
+static inline void *packet_previous_tx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->tx_ring.head ? po->tx_ring.head - 1 : po->tx_ring.frame_max;
+	return packet_lookup_frame(po, &po->tx_ring, previous, status);
+}
+
+static inline void packet_increment_head(struct packet_ring_buffer *buff)
+{
+	buff->head = buff->head != buff->frame_max ? buff->head+1 : 0;
 }
+
 #endif
 
 static inline struct packet_sock *pkt_sk(struct sock *sk)
@@ -646,7 +708,7 @@ static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 		macoff = netoff - maclen;
 	}
 
-	if (macoff + snaplen > po->frame_size) {
+	if (macoff + snaplen > po->rx_ring.frame_size) {
 		if (po->copy_thresh &&
 		    atomic_read(&sk->sk_rmem_alloc) + skb->truesize <
 		    (unsigned)sk->sk_rcvbuf) {
@@ -659,16 +721,16 @@ static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 			if (copy_skb)
 				skb_set_owner_r(copy_skb, sk);
 		}
-		snaplen = po->frame_size - macoff;
+		snaplen = po->rx_ring.frame_size - macoff;
 		if ((int)snaplen < 0)
 			snaplen = 0;
 	}
 
 	spin_lock(&sk->sk_receive_queue.lock);
-	h.raw = packet_lookup_frame(po, po->head, TP_STATUS_KERNEL);
+	h.raw = packet_current_rx_frame(po, TP_STATUS_KERNEL);
 	if (!h.raw)
 		goto ring_is_full;
-	po->head = po->head != po->frame_max ? po->head+1 : 0;
+	packet_increment_head(&po->rx_ring);
 	po->stats.tp_packets++;
 	if (copy_skb) {
 		status |= TP_STATUS_COPY;
@@ -725,7 +787,6 @@ static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 
 	__packet_set_status(po, h.raw, status);
 	smp_mb();
-
 	{
 		struct page *p_start, *p_end;
 		u8 *h_end = h.raw + macoff + snaplen - 1;
@@ -759,10 +820,237 @@ ring_is_full:
 	goto drop_n_restore;
 }
 
-#endif
+static void tpacket_destruct_skb(struct sk_buff *skb)
+{
+	struct packet_sock *po = pkt_sk(skb->sk);
+	void * ph;
 
+	BUG_ON(skb == NULL);
 
-static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+	if (unlikely(atomic_read(&po->shutdown)))
+		goto out_wfree;
+
+	ph = packet_lookup_frame(po, &po->tx_ring, skb->mark, TP_STATUS_COPY);
+	BUG_ON(atomic_read(&po->tx_ring.pending) == 0);
+	atomic_dec(&po->tx_ring.pending);
+	__packet_set_status(po, ph, TP_STATUS_KERNEL);
+
+out_wfree:
+	sock_wfree(skb);
+}
+
+static int tpacket_fill_skb(struct packet_sock *po, struct sk_buff * skb, void * frame,
+		struct net_device *dev, int size_max, __be16 proto,
+		unsigned char * addr)
+{
+	union {
+		struct tpacket_hdr *h1;
+		struct tpacket2_hdr *h2;
+		void *raw;
+	} ph;
+	int to_write, offset, len, tp_len, nr_frags;
+	struct socket *sock = po->sk.sk_socket;
+	struct page *page;
+	void *data;
+	int err;
+
+	ph.raw = frame;
+
+	skb->protocol = proto;
+	skb->dev = dev;
+	skb->priority = po->sk.sk_priority;
+	skb->mark = po->tx_ring.head;
+
+	switch(po->tp_version) {
+	case TPACKET_V2:
+		tp_len = ph.h1->tp_len;
+		break;
+	default:
+		tp_len = ph.h1->tp_len;
+		break;
+	}
+	if (unlikely(tp_len > size_max)) {
+		printk(KERN_ERR "packet size is too long (%d > %d)\n",
+				tp_len, size_max);
+		return -EMSGSIZE;
+	}
+
+	skb_reserve(skb, LL_RESERVED_SPACE(dev));
+	skb_reset_network_header(skb);
+
+	data = ph.raw + po->tp_hdrlen;
+	to_write = tp_len;
+
+	if (sock->type == SOCK_DGRAM) {
+		err = dev_hard_header(skb, dev, ntohs(proto), addr,
+				NULL, tp_len);
+		if (unlikely(err < 0))
+			return -EINVAL;
+	} else if (dev->hard_header_len ) {
+		/* net device doesn't like empty head */
+		if(unlikely(tp_len <= dev->hard_header_len)) {
+			printk(KERN_ERR "packet size is too short "
+					"(%d < %d)\n", tp_len,
+					dev->hard_header_len);
+			return -EINVAL;
+		}
+
+		skb_push(skb, dev->hard_header_len);
+		err = skb_store_bits(skb, 0, data,
+				dev->hard_header_len);
+		if (unlikely(err))
+			return err;
+
+		data += dev->hard_header_len;
+		to_write -= dev->hard_header_len;
+	}
+
+	err = -EFAULT;
+	page = virt_to_page(data);
+	len = ((to_write > PAGE_SIZE) ? PAGE_SIZE : to_write);
+
+	offset = (int)((long)data & (~PAGE_MASK));
+	len -= offset;
+
+	skb->data_len = to_write;
+	skb->len += to_write;
+
+	while ( likely(to_write) ) {
+		nr_frags = skb_shinfo(skb)->nr_frags;
+
+		if(unlikely(nr_frags >= MAX_SKB_FRAGS)) {
+			printk(KERN_ERR "Packet exceed the number "
+					"of skb frags(%lu)\n",
+					MAX_SKB_FRAGS);
+			return -EFAULT;
+		}
+
+		flush_dcache_page(page);
+		get_page(page);
+		skb_fill_page_desc(skb,
+				nr_frags,
+				page++, offset, len);
+		to_write -= len;
+		len = (to_write > PAGE_SIZE) ? PAGE_SIZE : to_write;
+		offset = 0;
+	}
+
+	return tp_len;
+}
+
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg)
+{
+	struct socket *sock;
+	struct sk_buff *skb;
+	struct net_device *dev;
+	__be16 proto;
+	int ifindex, err, reserve = 0;
+	void * ph;
+	struct sockaddr_ll *saddr=(struct sockaddr_ll *)msg->msg_name;
+	int tp_len, size_max;
+	unsigned char *addr;
+	int len_sum = 0;
+	int status = 0;
+
+	BUG_ON(po == NULL);
+	sock = po->sk.sk_socket;
+
+	err = -EBUSY;
+	if(!atomic_dec_and_test(&po->tx_ring.busy))
+		goto out;
+
+	if (saddr == NULL) {
+		ifindex	= po->ifindex;
+		proto	= po->num;
+		addr	= NULL;
+	} else {
+		err = -EINVAL;
+		if (msg->msg_namelen < sizeof(struct sockaddr_ll))
+			goto out;
+		if (msg->msg_namelen < (saddr->sll_halen + offsetof(struct sockaddr_ll, sll_addr)))
+			goto out;
+		ifindex	= saddr->sll_ifindex;
+		proto	= saddr->sll_protocol;
+		addr	= saddr->sll_addr;
+	}
+
+	dev = dev_get_by_index(sock_net(&po->sk), ifindex);
+	err = -ENXIO;
+	if (unlikely(dev == NULL))
+		goto out;
+
+	err = -EINVAL;
+	if (unlikely(sock->type != SOCK_RAW))
+		goto out_put;
+
+	reserve = dev->hard_header_len;
+
+	err = -ENETDOWN;
+	if (unlikely(!(dev->flags & IFF_UP)))
+		goto out_put;
+
+	size_max = po->tx_ring.frame_size - sizeof(struct skb_shared_info)
+		- po->tp_hdrlen - LL_ALLOCATED_SPACE(dev);
+
+	if (size_max > dev->mtu + reserve)
+		size_max = dev->mtu + reserve;
+
+	do
+	{
+		ph = packet_current_tx_frame(po, TP_STATUS_USER);
+		if(unlikely(ph == NULL)) {
+			continue;
+		}
+
+		status = TP_STATUS_USER;
+		skb = sock_alloc_send_skb(&po->sk, LL_ALLOCATED_SPACE(dev),
+				msg->msg_flags & MSG_DONTWAIT, &err);
+		if (unlikely(skb == NULL))
+			goto out_status;
+
+		status = TP_STATUS_LOSING;
+		tp_len = tpacket_fill_skb(po, skb, ph, dev, size_max, proto,
+				addr);
+		if(unlikely(tp_len < 0)) {
+			err = tp_len;
+			goto out_status;
+		}
+
+		skb->destructor = tpacket_destruct_skb;
+		__packet_set_status(po, ph, TP_STATUS_COPY);
+		atomic_inc(&po->tx_ring.pending);
+
+		status = TP_STATUS_USER;
+		err = dev_queue_xmit(skb);
+		if (unlikely(err > 0 && (err = net_xmit_errno(err)) != 0))
+			goto out_xmit;
+		packet_increment_head(&po->tx_ring);
+		len_sum += tp_len;
+	}
+	while(likely((!atomic_read(&po->shutdown)) &&
+				((ph != NULL)
+				 || ((!(msg->msg_flags & MSG_DONTWAIT))
+					 && (atomic_read(&po->tx_ring.pending))))
+		    ));
+
+	err = len_sum;
+	goto out_put;
+
+out_xmit:
+	skb->destructor = sock_wfree;
+	atomic_dec(&po->tx_ring.pending);
+out_status:
+	__packet_set_status(po, ph, status);
+	kfree_skb(skb);
+out_put:
+	dev_put(dev);
+out:
+	atomic_inc(&po->tx_ring.busy);
+	return err;
+}
+#endif
+
+static int packet_snd(struct socket *sock,
 			  struct msghdr *msg, size_t len)
 {
 	struct sock *sk = sock->sk;
@@ -853,6 +1141,19 @@ out:
 	return err;
 }
 
+static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+		struct msghdr *msg, size_t len)
+{
+#ifdef CONFIG_PACKET_MMAP
+	struct sock *sk = sock->sk;
+	struct packet_sock *po = pkt_sk(sk);
+	if (po->tx_ring.pg_vec)
+		return tpacket_snd(po, msg);
+	else
+#endif
+		return packet_snd(sock, msg, len);
+}
+
 /*
  *	Close a PACKET socket. This is fairly simple. We immediately go
  *	to 'closed' state and remove our protocol entry in the device list.
@@ -891,10 +1192,17 @@ static int packet_release(struct socket *sock)
 	packet_flush_mclist(sk);
 
 #ifdef CONFIG_PACKET_MMAP
-	if (po->pg_vec) {
+	{
 		struct tpacket_req req;
 		memset(&req, 0, sizeof(req));
-		packet_set_ring(sk, &req, 1);
+		atomic_set(&po->shutdown, 1);
+
+		if (po->rx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 0);
+
+		while(unlikely(atomic_read(&po->tx_ring.busy) != 1));
+		if (po->tx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 1);
 	}
 #endif
 
@@ -1082,6 +1390,8 @@ static int packet_create(struct net *net, struct socket *sock, int protocol)
 		po->running = 1;
 	}
 
+	atomic_set(&po->tx_ring.busy, 1);
+
 	write_lock_bh(&net->packet.sklist_lock);
 	sk_add_node(sk, &net->packet.sklist);
 	write_unlock_bh(&net->packet.sklist_lock);
@@ -1411,6 +1721,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 #ifdef CONFIG_PACKET_MMAP
 	case PACKET_RX_RING:
+	case PACKET_TX_RING:
 	{
 		struct tpacket_req req;
 
@@ -1418,7 +1729,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 			return -EINVAL;
 		if (copy_from_user(&req,optval,sizeof(req)))
 			return -EFAULT;
-		return packet_set_ring(sk, &req, 0);
+		return packet_set_ring(sk, &req, 0, optname == PACKET_TX_RING);
 	}
 	case PACKET_COPY_THRESH:
 	{
@@ -1438,7 +1749,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1457,7 +1768,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1701,13 +2012,17 @@ static unsigned int packet_poll(struct file * file, struct socket *sock,
 	unsigned int mask = datagram_poll(file, sock, wait);
 
 	spin_lock_bh(&sk->sk_receive_queue.lock);
-	if (po->pg_vec) {
-		unsigned last = po->head ? po->head-1 : po->frame_max;
-
-		if (packet_lookup_frame(po, last, TP_STATUS_USER))
+	if (po->rx_ring.pg_vec) {
+		if (packet_previous_rx_frame(po, TP_STATUS_USER))
 			mask |= POLLIN | POLLRDNORM;
 	}
 	spin_unlock_bh(&sk->sk_receive_queue.lock);
+	spin_lock_bh(&sk->sk_write_queue.lock);
+	if (po->tx_ring.pg_vec) {
+		if (packet_current_tx_frame(po, TP_STATUS_KERNEL))
+			mask |= POLLOUT | POLLWRNORM;
+	}
+	spin_unlock_bh(&sk->sk_write_queue.lock);
 	return mask;
 }
 
@@ -1783,21 +2098,34 @@ out_free_pgvec:
 	goto out;
 }
 
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing)
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing, int tx_ring)
 {
 	char **pg_vec = NULL;
 	struct packet_sock *po = pkt_sk(sk);
-	int was_running, order = 0;
+	int was_running, busy, order = 0;
+	struct packet_ring_buffer *rb;
+	struct sk_buff_head *rb_queue;
 	__be16 num;
-	int err = 0;
+	int err;
 
-	if (req->tp_block_nr) {
-		int i;
+	rb = tx_ring ? &po->tx_ring : &po->rx_ring;
+	rb_queue = tx_ring ? &sk->sk_write_queue : &sk->sk_receive_queue;
 
-		/* Sanity tests and some calculations */
+	err = -EBUSY;
+	busy = !atomic_dec_and_test(&rb->busy);
 
-		if (unlikely(po->pg_vec))
-			return -EBUSY;
+	if(!closing) {
+		if (atomic_read(&po->mapped))
+			goto out;
+		if (busy || atomic_read(&rb->pending))
+			goto out;
+	}
+
+	if (req->tp_block_nr) {
+		/* Sanity tests and some calculations */
+		err = -EBUSY;
+		if (unlikely(rb->pg_vec))
+			goto out;
 
 		switch (po->tp_version) {
 		case TPACKET_V1:
@@ -1808,42 +2136,35 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 			break;
 		}
 
+		err = -EINVAL;
 		if (unlikely((int)req->tp_block_size <= 0))
-			return -EINVAL;
+			goto out;
 		if (unlikely(req->tp_block_size & (PAGE_SIZE - 1)))
-			return -EINVAL;
+			goto out;
 		if (unlikely(req->tp_frame_size < po->tp_hdrlen +
-						  po->tp_reserve))
-			return -EINVAL;
+					po->tp_reserve))
+			goto out;
 		if (unlikely(req->tp_frame_size & (TPACKET_ALIGNMENT - 1)))
-			return -EINVAL;
+			goto out;
 
-		po->frames_per_block = req->tp_block_size/req->tp_frame_size;
-		if (unlikely(po->frames_per_block <= 0))
-			return -EINVAL;
-		if (unlikely((po->frames_per_block * req->tp_block_nr) !=
-			     req->tp_frame_nr))
-			return -EINVAL;
+		rb->frames_per_block = req->tp_block_size/req->tp_frame_size;
+		if (unlikely(rb->frames_per_block <= 0))
+			goto out;
+		if (unlikely((rb->frames_per_block * req->tp_block_nr) !=
+					req->tp_frame_nr))
+			goto out;
 
 		err = -ENOMEM;
 		order = get_order(req->tp_block_size);
 		pg_vec = alloc_pg_vec(req, order);
 		if (unlikely(!pg_vec))
 			goto out;
-
-		for (i = 0; i < req->tp_block_nr; i++) {
-			void *ptr = pg_vec[i];
-			int k;
-
-			for (k = 0; k < po->frames_per_block; k++) {
-				__packet_set_status(po, ptr, TP_STATUS_KERNEL);
-				ptr += req->tp_frame_size;
-			}
-		}
-		/* Done */
-	} else {
+	}
+	/* Done */
+	else {
+		err = -EINVAL;
 		if (unlikely(req->tp_frame_nr))
-			return -EINVAL;
+			goto out;
 	}
 
 	lock_sock(sk);
@@ -1866,20 +2187,19 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 	if (closing || atomic_read(&po->mapped) == 0) {
 		err = 0;
 #define XC(a, b) ({ __typeof__ ((a)) __t; __t = (a); (a) = (b); __t; })
-
-		spin_lock_bh(&sk->sk_receive_queue.lock);
-		pg_vec = XC(po->pg_vec, pg_vec);
-		po->frame_max = (req->tp_frame_nr - 1);
-		po->head = 0;
-		po->frame_size = req->tp_frame_size;
-		spin_unlock_bh(&sk->sk_receive_queue.lock);
-
-		order = XC(po->pg_vec_order, order);
-		req->tp_block_nr = XC(po->pg_vec_len, req->tp_block_nr);
-
-		po->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
-		po->prot_hook.func = po->pg_vec ? tpacket_rcv : packet_rcv;
-		skb_queue_purge(&sk->sk_receive_queue);
+		spin_lock_bh(&rb_queue->lock);
+		pg_vec = XC(rb->pg_vec, pg_vec);
+		rb->frame_max = (req->tp_frame_nr - 1);
+		rb->head = 0;
+		rb->frame_size = req->tp_frame_size;
+		spin_unlock_bh(&rb_queue->lock);
+
+		order = XC(rb->pg_vec_order, order);
+		req->tp_block_nr = XC(rb->pg_vec_len, req->tp_block_nr);
+
+		rb->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
+		po->prot_hook.func = (po->rx_ring.pg_vec) ? tpacket_rcv : packet_rcv;
+		skb_queue_purge(rb_queue);
 #undef XC
 		if (atomic_read(&po->mapped))
 			printk(KERN_DEBUG "packet_mmap: vma is busy: %d\n", atomic_read(&po->mapped));
@@ -1898,7 +2218,10 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 
 	if (pg_vec)
 		free_pg_vec(pg_vec, order, req->tp_block_nr);
+
+	err = 0;
 out:
+	atomic_inc(&rb->busy);
 	return err;
 }
 
@@ -1906,7 +2229,8 @@ static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 {
 	struct sock *sk = sock->sk;
 	struct packet_sock *po = pkt_sk(sk);
-	unsigned long size;
+	unsigned long size, expected_size;
+	struct packet_ring_buffer *rb;
 	unsigned long start;
 	int err = -EINVAL;
 	int i;
@@ -1917,23 +2241,38 @@ static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 	size = vma->vm_end - vma->vm_start;
 
 	lock_sock(sk);
-	if (po->pg_vec == NULL)
+
+	expected_size = 0;
+	if (po->rx_ring.pg_vec)
+		expected_size += po->rx_ring.pg_vec_len * po->rx_ring.pg_vec_pages * PAGE_SIZE;
+	if (po->tx_ring.pg_vec)
+		expected_size += po->tx_ring.pg_vec_len * po->tx_ring.pg_vec_pages * PAGE_SIZE;
+
+	if (expected_size == 0)
 		goto out;
-	if (size != po->pg_vec_len*po->pg_vec_pages*PAGE_SIZE)
+
+	if (size != expected_size)
 		goto out;
 
 	start = vma->vm_start;
-	for (i = 0; i < po->pg_vec_len; i++) {
-		struct page *page = virt_to_page(po->pg_vec[i]);
-		int pg_num;
-
-		for (pg_num = 0; pg_num < po->pg_vec_pages; pg_num++, page++) {
-			err = vm_insert_page(vma, start, page);
-			if (unlikely(err))
-				goto out;
-			start += PAGE_SIZE;
+
+	for(rb = &po->rx_ring; rb <= &po->tx_ring; rb++) {
+		if (rb->pg_vec == NULL)
+			continue;
+
+		for (i = 0; i < rb->pg_vec_len; i++) {
+			struct page *page = virt_to_page(rb->pg_vec[i]);
+			int pg_num;
+
+			for (pg_num = 0; pg_num < rb->pg_vec_pages; pg_num++, page++) {
+				err = vm_insert_page(vma, start, page);
+				if (unlikely(err))
+					goto out;
+				start += PAGE_SIZE;
+			}
 		}
 	}
+
 	atomic_inc(&po->mapped);
 	vma->vm_ops = &packet_mmap_ops;
 	err = 0;


Thanks,
Johann
-- 
Johann Baudy
johaahn@gmail.com


* Re: [PATCH] Packet socket: mmapped IO: PACKET_TX_RING
@ 2008-11-05 15:16 Johann Baudy
  2008-11-05 17:49 ` Lovich, Vitali
  0 siblings, 1 reply; 59+ messages in thread
From: Johann Baudy @ 2008-11-05 15:16 UTC (permalink / raw)
  To: Patrick McHardy; +Cc: Lovich, Vitali, netdev@vger.kernel.org, Evgeniy Polyakov

Hi Patrick,


> The reason why its not in there is to avoid the unnecessary
> barrier when setting up the ring in packet_set_ring().

Is it really a problem? packet_set_status() and packet_get_status()
will be called from multiple places, so we would get the write/read
barrier at each call. I think it is cleaner to put those barriers inside
the status functions, even if that adds an unnecessary flush per frame
when the ring is first set up. No?

>
> The code is fishy though, I agree. Have a look at this discussion:
>
> http://marc.info/?l=linux-netdev&m=119383985207984&w=2
>
My approach was to reuse the RX_RING mechanism, where the kernel
allocates the buffer, so that kernel and user space see something
similar in both directions. Moving the buffer into user space could be
studied as a global enhancement for both RX_RING and TX_RING, but that
would require user application changes.

I'm also wondering whether I need to call flush_dcache_page() before
reading the frame status and before attaching circular-buffer pages to
the skb (both are written by user space).
According to this quote from the flush_dcache_page() section of
Documentation/cachetlb.txt:
	Any time the kernel writes to a page cache page, _OR_
	the kernel is about to read from a page cache page and
	user space shared/writable mappings of this page potentially
	exist, this routine is called.
 	...
	The phrase "kernel writes to a page cache page" means,
	...
	The corollary case is just as important, if there are users
	which have shared+writable mappings of this file, we must make
	sure that kernel reads of these pages will see the most recent
	stores done by the user. 

Can someone confirm?
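For illustration, what I have in mind for the send path is roughly the
following (a sketch only; "frame" here is just the current tx ring slot):

    struct tpacket_hdr *hdr = frame;

    /* the frame (tp_status, tp_len and the data itself) may have been
     * written by user space through its shared mapping; flush the page
     * before the kernel reads any of it (a no-op on x86, but needed on
     * CPUs with virtually indexed caches) */
    flush_dcache_page(virt_to_page(hdr));

    if (hdr->tp_status != TP_STATUS_USER)
            continue;       /* nothing to send in this slot */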

Thanks in advance,
Johann

* Re: [PATCH] Packet socket: mmapped IO: PACKET_TX_RING
@ 2008-11-05 10:55 Johann Baudy
  2008-11-05 11:02 ` Patrick McHardy
  2008-11-05 17:32 ` Lovich, Vitali
  0 siblings, 2 replies; 59+ messages in thread
From: Johann Baudy @ 2008-11-05 10:55 UTC (permalink / raw)
  To: Lovich, Vitali; +Cc: netdev@vger.kernel.org, Evgeniy Polyakov

Hi Vitali,

> Right, but check out http://www.mjmwired.net/kernel/Documentation/memory-barriers.txt - namely the implicit nature of ULOCK operations, IMPLICIT KERNEL MEMORY BARRIERS, & ATOMIC OPERATIONS.
>
> However, I'm not sure why currently in the code it's a full memory barrier after __packet_set_status - it could be downgraded to a write barrier.

So you assume there is an implicit read or data-dependency barrier
before packet_current_rx_frame()?

>
> Also, there is a bug in the current code I believe:
>
> Around line 726:
>
> @@ -723,8 +723,9 @@ static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
>        else
>                sll->sll_ifindex = dev->ifindex;
>
> +       smp_wmb();
>        __packet_set_status(po, h.raw, status);
> -       smp_mb();
> +       smp_wmb();
>
>        {
>                struct page *p_start, *p_end;
>
> We only really need 1 memory barrier: for architectures where flush_dcache_page is a no-op and writes may be re-ordered, we need the smp_wmb before (if writes may not be re-ordered, ala x86, then a simple barrier() will suffice).  With architectures that actually need the data flushed, we put it after the __packet_set_status (and this can't be a simple barrier).  However, to generically do it, we do both (I couldn't find a pre-processor that gets defined to indicate if flush_dcache_page is no-op or actually does something).  If we don't do it this way it's possible (however unlikely), that the status is set before the data is filled in correctly.  This'll come up if the compiler moves the set status around or the CPU re-orders writes and flush_dcache_page is a no-op.
>
> Should I submit this as a patch, or am I missing something?
>
> "I'm afraid that once a barrier discussion comes up and we insert them, then I become dazedly paranoid and it's very hard to shake me from seeing a need for barriers everywhere, including a barrier before and after every barrier ad infinitum to make sure they're really barriers."
> -- Hugh Dickins
>
> Vitali
>
If my understanding is correct, smp_wmb() should ensure that "all the
STORE operations specified before the barrier will appear to happen
before all the STORE operations specified after the barrier with respect
to the other components of the system". I think those barriers address
reordering effects at the hardware level and depend on the architecture.
But again, I'm not an expert in this area.

Moreover, if we look at existing kernel code, smp_wmb() is most of the
time used immediately after the shared data update, not before
(e.g. kernel/marker.c, kernel/trace/ftrace.c, ...).
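For reference, the pattern used in those places boils down to this
(a generic sketch with made-up names):

    shared->data = new_value;       /* 1. update the shared data          */
    smp_wmb();                      /* 2. order it before the flag ...    */
    shared->ready = 1;              /* 3. ... that readers will test for  */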

I would like to integrate this barrier mechanism into the
__packet_[set|get]_status() functions. Thoughts?

Best regards,
Johann

-- 
Johann Baudy
johaahn@gmail.com

* Re: [PATCH] Packet socket: mmapped IO: PACKET_TX_RING
@ 2008-10-31 10:58 Johann Baudy
  2008-10-31 17:07 ` Lovich, Vitali
  0 siblings, 1 reply; 59+ messages in thread
From: Johann Baudy @ 2008-10-31 10:58 UTC (permalink / raw)
  To: Lovich, Vitali; +Cc: David Miller, netdev@vger.kernel.org

Hi Vitali,

> There's no need to keep the index (packet_index).  Just store the
> pointer directly (change it to void *) - saves an extra lookup.

Indeed, it will be faster. I'll make that change.

> Also, I don't think setting TP_STATUS_COPY is necessary since the user
> can't really do anything with that information. Simply leave it at
> TP_STATUS_USER & TP_STATUS_KERNEL.

This information is not meant for the user; it is there to prevent the
kernel from sending a packet twice or more. Inside the tx ring lock,
queued packets must be tagged as "the kernel has already handled this
frame" so that they are not sent again on the next pass over the tx
ring. (That case can happen if the device/queue is very slow, or if the
ring holds only a few frames.)

> The atomic_inc of pending_skb should happen after the skb was
> allocated - the atomic_dec in out_status can also be removed. 
> out_status can be removed completely if you get rid of
> TP_STATUS_COPY.  If you leave it, you still need the barriers as above
> after changing tp_status, or the user may not see the change.  

I don't understand why "atomic_inc of pending_skb should happen after
the skb was allocated". This counter is used to monitor the number of TX
packets queued, so the only requirement is that it is incremented before
dev_queue_xmit().

An atomic_dec() will be needed anyway if tpacket_fill_skb() or
dev_queue_xmit() fails (even if the increment is performed after the skb
allocation).
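So the intended lifetime of the counter is simply (a sketch; the error
paths of tpacket_fill_skb()/dev_queue_xmit() drop it again as noted
above):

    atomic_inc(&po->tx_pending_skb);        /* one more frame in flight */
    err = dev_queue_xmit(skb);
    ...
    /* later, in tpacket_destruct_skb(), once the device is done: */
    atomic_dec(&po->tx_pending_skb);
    __packet_set_status(po, ph, TP_STATUS_KERNEL);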

> Also, you've got a potential threading issue - you're not protecting
> the frame index behind the spinlock.

You are right; I think I will take the spinlock outside the do/while loop.

> Also, when you set the status back to TP_STATUS_KERNEL in the
> destructor, you need
>  to add the following barriers:
>
> __packet_set_status(po, ph, TP_STATUS_KERNEL);
> smp_mb();   // make sure the TP_STATUS_KERNEL was actually written to
> memory before this - couldn't this actually be just a smp_wmb?
> flush_dcache_page(virt_to_page(ph));  // on non-x86 architectures like
> ARM that have a moronic cache (i.e cache by virtual rather than
> physical address). on x86 this is a noop.
>

So, if my understanding of those memory barriers is correct, we should
have an smp_rmb() before reading the status and an smp_wmb() after
writing it, in both the skb destructor and the send procedure.
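Applied to the destructor, that would give roughly (a sketch based on the
current patch):

    static void tpacket_destruct_skb(struct sk_buff *skb)
    {
            struct packet_sock *po = pkt_sk(skb->sk);
            void *ph = packet_lookup_frame(po, &po->tx_ring, skb->mark,
                            TP_STATUS_COPY);

            atomic_dec(&po->tx_pending_skb);
            __packet_set_status(po, ph, TP_STATUS_KERNEL);
            smp_wmb();                      /* status store ordered ...     */
            flush_dcache_page(virt_to_page(ph));    /* ... and visible via
                                                     * the user mapping
                                                     * (VIVT caches)         */
            sock_wfree(skb);
    }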

> Also, I think that I looked at your code while working on my version
> and there may have been some logical problems with the way you're
> building packets.  I'll compare it within the next few days as I start
> cleaning up my code.

I've noticed that "data += dev->hard_header_len; to_write -=
dev->hard_header_len;" must be placed under the (sock->type !=
SOCK_DGRAM) condition, as in the sketch below.
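i.e. something like (sketch):

    if (sock->type == SOCK_DGRAM) {
            /* the ring frame holds the payload only, send all of it */
            to_write = tp_len;
    } else {
            /* the link-level header has already been copied into the
             * linear part with skb_store_bits(), so skip it here */
            data += dev->hard_header_len;
            to_write = tp_len - dev->hard_header_len;
    }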

> As a benchmark on a 10G card (and not performing any optimizations
> like using syspart & dedicating a cpu for tx), I was able to hit 8.6
> GBits/s using a dedicated kernel thread for the transmission.  With
> the dedicated CPU, I'm confident the line-rate will go up
> significantly
>
> .I'll try to test your changes within the next few days.  TCPdump
> maxes out at around 1.5 GBits/s
>
> As for CPU usage, there's a noticeable advantage to traditional send. 
> Using tcpreplay  - there's about 80% CPU utilization when sending. 
> Using the tx ring, there's maybe 10-15%.
>

On my side, I'm using a 1G device with a PPC405.
I've reached 107 MBytes/s with TX_RING, against 25 MBytes/s with a
standard raw packet socket, and 107 MBytes/s with pktgen.

> I'm going to have latency numbers soon as well (i.e. how much jitter
> is introduced by the kernel).
>

Many thanks Vitali for your comments and help :)


-- 
Johann Baudy
johaahn@gmail.com

* [PATCH] Packet socket: mmapped IO: PACKET_TX_RING
@ 2008-10-27  9:33 Johann Baudy
  2008-10-28 22:44 ` David Miller
  0 siblings, 1 reply; 59+ messages in thread
From: Johann Baudy @ 2008-10-27  9:33 UTC (permalink / raw)
  To: netdev

New packet socket feature that makes the packet socket more efficient for transmission:
- It reduces the number of system calls through a PACKET_TX_RING mechanism, based on PACKET_RX_RING (a circular buffer allocated in kernel space and mmapped from user space).
- It minimizes CPU copies by using fragmented skbs.

Signed-off-by: Johann Baudy <johann.baudy@gnu-log.net>

--
Hi All,

Before going further with the submission process, I would like to get your opinion on the points below:

1#:
To implement this feature, each packet header of the circular buffer can be set to one of these three statuses:
- TP_STATUS_USER: the packet buffer needs to be transmitted.
In this state, the kernel:
 - Allocates a new skb
 - Attaches the packet buffer pages
 - Sets the skb destructor
 - Stores the packet header pointer or index in the skb
 - Changes the packet header status to TP_STATUS_COPY
 - Sends it to the device

- TP_STATUS_COPY: packet transmission is ongoing.
In this state, the skb destructor (called when the skb is released):
 - Gets the packet header pointer linked to the skb
 - Changes the packet header status to TP_STATUS_KERNEL

- TP_STATUS_KERNEL: the packet buffer has been transmitted and is ready for the user again.

As you can see, the skb destructor needs the packet header pointer associated with the sk_buff in order to change the header status.
I first used skb->cb (the control buffer) to forward this parameter to the destructor.
Unfortunately, it seems to be overwritten by the packet queueing mechanism, so I finally chose skb->mark to store the buffer index.

Can I use skb->mark under these conditions?
If not, what is the best solution:
- a new field in struct sk_buff?
- an sk_buff pointer array of NB_FRAME entries (one index per skb pointer)? (scanning it is not great for CPU load)
- something else?

2#:
Do I need to protect the send() procedure with some locking?
Especially when changing the status from TP_STATUS_USER to TP_STATUS_COPY, to prevent the kernel from sending a packet buffer twice
(if two send() calls are issued from different threads on SMP, for example)?
Or is that implicit?

Thanks in advance,
Johann Baudy

--
  Documentation/networking/packet_mmap.txt |  132 ++++++++--
 include/linux/if_packet.h                |    3 +-
 net/packet/af_packet.c                   |  422 +++++++++++++++++++++++++-----
 3 files changed, 470 insertions(+), 87 deletions(-)

diff --git a/Documentation/networking/packet_mmap.txt b/Documentation/networking/packet_mmap.txt
index 07c53d5..32f5c33 100644
--- a/Documentation/networking/packet_mmap.txt
+++ b/Documentation/networking/packet_mmap.txt
@@ -4,8 +4,8 @@
 
 This file documents the CONFIG_PACKET_MMAP option available with the PACKET
 socket interface on 2.4 and 2.6 kernels. This type of sockets is used for 
-capture network traffic with utilities like tcpdump or any other that uses 
-the libpcap library. 
+capture network traffic with utilities like tcpdump or any other that needs
+raw access to network interface.
 
 You can find the latest version of this document at
 
@@ -14,6 +14,7 @@ You can find the latest version of this document at
 Please send me your comments to
 
     Ulisses Alonso Camaró <uaca@i.hate.spam.alumni.uv.es>
+    Johann Baudy <johann.baudy@gnu-log.net> (TX RING)
 
 -------------------------------------------------------------------------------
 + Why use PACKET_MMAP
@@ -25,19 +26,24 @@ to capture each packet, it requires two if you want to get packet's
 timestamp (like libpcap always does).
 
 In the other hand PACKET_MMAP is very efficient. PACKET_MMAP provides a size 
-configurable circular buffer mapped in user space. This way reading packets just 
-needs to wait for them, most of the time there is no need to issue a single 
-system call. By using a shared buffer between the kernel and the user 
-also has the benefit of minimizing packet copies.
-
-It's fine to use PACKET_MMAP to improve the performance of the capture process, 
-but it isn't everything. At least, if you are capturing at high speeds (this 
-is relative to the cpu speed), you should check if the device driver of your 
-network interface card supports some sort of interrupt load mitigation or 
-(even better) if it supports NAPI, also make sure it is enabled.
+configurable circular buffer mapped in user space that can be used to either
+send or receive packets. This way reading packets just needs to wait for them,
+most of the time there is no need to issue a single system call. Concerning
+transmission, multiple packets can be sent through one system call to get the
+highest bandwidth.
+By using a shared buffer between the kernel and the user also has the benefit
+of minimizing packet copies.
+
+It's fine to use PACKET_MMAP to improve the performance of the capture and
+transmission process, but it isn't everything. At least, if you are capturing
+at high speeds (this is relative to the cpu speed), you should check if the
+device driver of your network interface card supports some sort of interrupt
+load mitigation or (even better) if it supports NAPI, also make sure it is
+enabled. For transmission, check the MTU (Maximum Transmission Unit) used and
+supported by devices of your network.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP
++ How to use CONFIG_PACKET_MMAP to improve capture process
 --------------------------------------------------------------------------------
 
 From the user standpoint, you should use the higher level libpcap library, which
@@ -57,7 +63,7 @@ the low level details or want to improve libpcap by including PACKET_MMAP
 support.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP directly
++ How to use CONFIG_PACKET_MMAP directly to improve transmission process
 --------------------------------------------------------------------------------
 
 From the system calls stand point, the use of PACKET_MMAP involves
@@ -66,6 +72,7 @@ the following process:
 
 [setup]     socket() -------> creation of the capture socket
             setsockopt() ---> allocation of the circular buffer (ring)
+                              option: PACKET_RX_RING
             mmap() ---------> mapping of the allocated buffer to the
                               user process
 
@@ -97,13 +104,75 @@ also the mapping of the circular buffer in the user process and
 the use of this buffer.
 
 --------------------------------------------------------------------------------
++ How to use CONFIG_PACKET_MMAP directly to improve transmission process
+--------------------------------------------------------------------------------
+Transmission process is similar to capture as shown below.
+
+[setup]          socket() -------> creation of the transmission socket
+                 setsockopt() ---> allocation of the circular buffer (ring)
+                                   option: PACKET_TX_RING
+                 bind() ---------> bind transmission socket with a network interface
+                 mmap() ---------> mapping of the allocated buffer to the
+                                   user process
+
+[transmission]   poll() ---------> wait for free packets (optional)
+                 send() ---------> send all packets that are set as ready in
+                                   the ring
+                                   The flag MSG_DONTWAIT can be used to return
+                                   before end of transfer.
+
+[shutdown]  close() --------> destruction of the transmission socket and
+                              deallocation of all associated resources.
+
+Binding the socket to your network interface is mandatory (with zero copy) to
+know the header size of frames used in the circular buffer.
+
+As capture, each frame contains two parts:
+
+ --------------------
+| struct tpacket_hdr | Header. It contains the status of
+|                    | this frame
+|--------------------|
+| data buffer        |
+.                    .  Data that will be sent over the network interface.
+.                    .
+ --------------------
+
+ bind() associates the socket to your network interface thanks to
+ sll_ifindex parameter of struct sockaddr_ll.
+
+ Initialization example:
+
+ struct sockaddr_ll my_addr;
+ struct ifreq s_ifr;
+ ...
+
+ strncpy (s_ifr.ifr_name, "eth0", sizeof(s_ifr.ifr_name));
+
+ /* get interface index of eth0 */
+ ioctl(this->socket, SIOCGIFINDEX, &s_ifr);
+
+ /* fill sockaddr_ll struct to prepare binding */
+ my_addr.sll_family = AF_PACKET;
+ my_addr.sll_protocol = ETH_P_ALL;
+ my_addr.sll_ifindex =  s_ifr.ifr_ifindex;
+
+ /* bind socket to eth0 */
+ bind(this->socket, (struct sockaddr *)&my_addr, sizeof(struct sockaddr_ll));
+
+ A complete tutorial is available at: http://wiki.gnu-log.net/
+
+--------------------------------------------------------------------------------
 + PACKET_MMAP settings
 --------------------------------------------------------------------------------
 
 
 To setup PACKET_MMAP from user level code is done with a call like
 
+ - Capture process
      setsockopt(fd, SOL_PACKET, PACKET_RX_RING, (void *) &req, sizeof(req))
+ - Transmission process
+     setsockopt(fd, SOL_PACKET, PACKET_TX_RING, (void *) &req, sizeof(req))
 
 The most significant argument in the previous call is the req parameter, 
 this parameter must to have the following structure:
@@ -117,11 +186,11 @@ this parameter must to have the following structure:
     };
 
 This structure is defined in /usr/include/linux/if_packet.h and establishes a 
-circular buffer (ring) of unswappable memory mapped in the capture process. 
+circular buffer (ring) of unswappable memory.
 Being mapped in the capture process allows reading the captured frames and 
 related meta-information like timestamps without requiring a system call.
 
-Captured frames are grouped in blocks. Each block is a physically contiguous 
+Frames are grouped in blocks. Each block is a physically contiguous
 region of memory and holds tp_block_size/tp_frame_size frames. The total number 
 of blocks is tp_block_nr. Note that tp_frame_nr is a redundant parameter because
 
@@ -336,6 +405,7 @@ struct tpacket_hdr). If this field is 0 means that the frame is ready
 to be used for the kernel, If not, there is a frame the user can read 
 and the following flags apply:
 
++++ Capture process:
      from include/linux/if_packet.h
 
      #define TP_STATUS_COPY          2 
@@ -391,6 +461,36 @@ packets are in the ring:
 It doesn't incur in a race condition to first check the status value and 
 then poll for frames.
 
+
+++ Transmission process
+Those defines are also used for transmission:
+
+     #define TP_STATUS_KERNEL        0 // Frame is available
+     #define TP_STATUS_USER          1 // Frame will be sent on next send()
+     #define TP_STATUS_COPY          2 // Frame is currently in transmission
+
+First, the kernel initializes all frames to TP_STATUS_KERNEL. To send a packet,
+the user fills a data buffer of an available frame, sets tp_len to current
+data buffer size and sets its status field to TP_STATUS_USER. This can be done
+on multiple frames. Once the user is ready to transmit, it calls send().
+Then all buffers with status equal to TP_STATUS_USER are forwarded to the
+network device. The kernel updates each status of sent frames with
+TP_STATUS_COPY until the end of transfer.
+At the end of each transfer, buffer status returns to TP_STATUS_KERNEL.
+
+    header->tp_len = in_i_size;
+    header->tp_status = TP_STATUS_USER;
+    retval = send(this->socket, NULL, 0, 0);
+
+The user can also use poll() to check if a buffer is available:
+(status == TP_STATUS_KERNEL)
+
+    struct pollfd pfd;
+    pfd.fd = fd;
+    pfd.revents = 0;
+    pfd.events = POLLOUT;
+    retval = poll(&pfd, 1, timeout);
+
 --------------------------------------------------------------------------------
 + THANKS
 --------------------------------------------------------------------------------
diff --git a/include/linux/if_packet.h b/include/linux/if_packet.h
index 18db066..f6a247a 100644
--- a/include/linux/if_packet.h
+++ b/include/linux/if_packet.h
@@ -46,6 +46,7 @@ struct sockaddr_ll
 #define PACKET_VERSION			10
 #define PACKET_HDRLEN			11
 #define PACKET_RESERVE			12
+#define PACKET_TX_RING			13
 
 struct tpacket_stats
 {
@@ -100,7 +101,7 @@ struct tpacket2_hdr
 enum tpacket_versions
 {
 	TPACKET_V1,
-	TPACKET_V2,
+	TPACKET_V2
 };
 
 /*
diff --git a/net/packet/af_packet.c b/net/packet/af_packet.c
index c718e7e..c52462a 100644
--- a/net/packet/af_packet.c
+++ b/net/packet/af_packet.c
@@ -156,7 +156,23 @@ struct packet_mreq_max
 };
 
 #ifdef CONFIG_PACKET_MMAP
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing);
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req,
+		int closing, int tx_ring);
+
+struct packet_ring_buffer {
+	char *			*pg_vec;
+	unsigned int		head;
+	unsigned int		frames_per_block;
+	unsigned int		frame_size;
+	unsigned int		frame_max;
+
+	unsigned int		pg_vec_order;
+	unsigned int		pg_vec_pages;
+	unsigned int		pg_vec_len;
+};
+
+struct packet_sock;
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg);
 #endif
 
 static void packet_flush_mclist(struct sock *sk);
@@ -166,12 +182,10 @@ struct packet_sock {
 	struct sock		sk;
 	struct tpacket_stats	stats;
 #ifdef CONFIG_PACKET_MMAP
-	char *			*pg_vec;
-	unsigned int		head;
-	unsigned int            frames_per_block;
-	unsigned int		frame_size;
-	unsigned int		frame_max;
+	struct packet_ring_buffer	rx_ring;
+	struct packet_ring_buffer	tx_ring;
 	int			copy_thresh;
+	atomic_t		tx_pending_skb;
 #endif
 	struct packet_type	prot_hook;
 	spinlock_t		bind_lock;
@@ -183,9 +197,6 @@ struct packet_sock {
 	struct packet_mclist	*mclist;
 #ifdef CONFIG_PACKET_MMAP
 	atomic_t		mapped;
-	unsigned int            pg_vec_order;
-	unsigned int		pg_vec_pages;
-	unsigned int		pg_vec_len;
 	enum tpacket_versions	tp_version;
 	unsigned int		tp_hdrlen;
 	unsigned int		tp_reserve;
@@ -204,8 +215,10 @@ struct packet_skb_cb {
 
 #ifdef CONFIG_PACKET_MMAP
 
-static void *packet_lookup_frame(struct packet_sock *po, unsigned int position,
-				 int status)
+static void *packet_lookup_frame(struct packet_sock *po,
+		struct packet_ring_buffer *buff,
+		unsigned int position,
+		int status)
 {
 	unsigned int pg_vec_pos, frame_offset;
 	union {
@@ -214,25 +227,50 @@ static void *packet_lookup_frame(struct packet_sock *po, unsigned int position,
 		void *raw;
 	} h;
 
-	pg_vec_pos = position / po->frames_per_block;
-	frame_offset = position % po->frames_per_block;
+	pg_vec_pos = position / buff->frames_per_block;
+	frame_offset = position % buff->frames_per_block;
 
-	h.raw = po->pg_vec[pg_vec_pos] + (frame_offset * po->frame_size);
+	h.raw = buff->pg_vec[pg_vec_pos] + (frame_offset * buff->frame_size);
 	switch (po->tp_version) {
 	case TPACKET_V1:
-		if (status != h.h1->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL)
+		if (status != h.h1->tp_status)
 			return NULL;
 		break;
 	case TPACKET_V2:
-		if (status != h.h2->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL)
+		if (status != h.h2->tp_status)
 			return NULL;
 		break;
 	}
 	return h.raw;
 }
 
+static inline void *packet_current_rx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->rx_ring, po->rx_ring.head, status);
+}
+
+static inline void *packet_current_tx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->tx_ring, po->tx_ring.head, status);
+}
+
+static inline void *packet_previous_rx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->rx_ring.head ? po->rx_ring.head - 1 : po->rx_ring.frame_max;
+	return packet_lookup_frame(po, &po->rx_ring, previous, status);
+}
+
+static inline void *packet_previous_tx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->tx_ring.head ? po->tx_ring.head - 1 : po->tx_ring.frame_max;
+	return packet_lookup_frame(po, &po->tx_ring, previous, status);
+}
+
+static inline void packet_increment_head(struct packet_ring_buffer *buff)
+{
+	buff->head = buff->head != buff->frame_max ? buff->head+1 : 0;
+}
+
 static void __packet_set_status(struct packet_sock *po, void *frame, int status)
 {
 	union {
@@ -646,7 +684,7 @@ static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 		macoff = netoff - maclen;
 	}
 
-	if (macoff + snaplen > po->frame_size) {
+	if (macoff + snaplen > po->rx_ring.frame_size) {
 		if (po->copy_thresh &&
 		    atomic_read(&sk->sk_rmem_alloc) + skb->truesize <
 		    (unsigned)sk->sk_rcvbuf) {
@@ -659,16 +697,16 @@ static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 			if (copy_skb)
 				skb_set_owner_r(copy_skb, sk);
 		}
-		snaplen = po->frame_size - macoff;
+		snaplen = po->rx_ring.frame_size - macoff;
 		if ((int)snaplen < 0)
 			snaplen = 0;
 	}
 
 	spin_lock(&sk->sk_receive_queue.lock);
-	h.raw = packet_lookup_frame(po, po->head, TP_STATUS_KERNEL);
+	h.raw = packet_current_rx_frame(po, TP_STATUS_KERNEL);
 	if (!h.raw)
 		goto ring_is_full;
-	po->head = po->head != po->frame_max ? po->head+1 : 0;
+	packet_increment_head(&po->rx_ring);
 	po->stats.tp_packets++;
 	if (copy_skb) {
 		status |= TP_STATUS_COPY;
@@ -759,10 +797,212 @@ ring_is_full:
 	goto drop_n_restore;
 }
 
-#endif
+static void tpacket_destruct_skb(struct sk_buff *skb)
+{
+	struct packet_sock *po = pkt_sk(skb->sk);
+	void * ph;
 
+	BUG_ON(skb == NULL);
+	ph = packet_lookup_frame( po, &po->tx_ring, skb->mark, TP_STATUS_COPY);
 
-static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+	BUG_ON(ph == NULL);
+	BUG_ON(atomic_read(&po->tx_pending_skb) == 0);
+
+	atomic_dec(&po->tx_pending_skb);
+	__packet_set_status(po, ph, TP_STATUS_KERNEL);
+
+	sock_wfree(skb);
+}
+
+static int tpacket_fill_skb(struct packet_sock *po, struct sk_buff * skb, void * frame,
+		struct net_device *dev, int size_max, __be16 proto,
+		unsigned char * addr)
+{
+	union {
+		struct tpacket_hdr *h1;
+		struct tpacket2_hdr *h2;
+		void *raw;
+	} ph;
+	int to_write, offset, len, tp_len;
+	struct socket *sock = po->sk.sk_socket;
+	struct page *page;
+	void *data;
+	int err;
+
+
+	ph.raw = frame;
+
+	skb->protocol = proto;
+	skb->dev = dev;
+	skb->priority = po->sk.sk_priority;
+	skb->destructor = tpacket_destruct_skb;
+	skb->mark = po->tx_ring.head;
+
+	switch(po->tp_version) {
+	case TPACKET_V2:
+		tp_len = ph.h1->tp_len;
+		break;
+	default:
+		tp_len = ph.h1->tp_len;
+		break;
+	}
+
+	if (unlikely(tp_len > size_max)) {
+		printk(KERN_ERR "packet size is too long (%d > %d)\n",
+				tp_len, size_max);
+		return -EMSGSIZE;
+	}
+
+	skb_reserve(skb, LL_RESERVED_SPACE(dev));
+	data = ph.raw + po->tp_hdrlen;
+
+	if (sock->type == SOCK_DGRAM) {
+		err = dev_hard_header(skb, dev, ntohs(proto), addr,
+				NULL, tp_len);
+		if (unlikely(err < 0))
+			return -EINVAL;
+	} else if (dev->hard_header_len ) {
+		/* net device doesn't like empty head */
+		if(unlikely(tp_len <= dev->hard_header_len)) {
+			printk(KERN_ERR "packet size is too short "
+					"(%d < %d)\n", tp_len,
+					dev->hard_header_len);
+			return -EINVAL;
+		}
+
+		skb_push(skb, dev->hard_header_len);
+		err = skb_store_bits(skb, 0, data,
+				dev->hard_header_len);
+		if (unlikely(err))
+			return err;
+	}
+
+	err = -EFAULT;
+	to_write = tp_len - dev->hard_header_len;
+	data += dev->hard_header_len;
+	page = virt_to_page(data);
+	len = ((to_write > PAGE_SIZE) ? PAGE_SIZE : to_write);
+
+	offset = (int)((long)data & (~PAGE_MASK));
+	len -= offset;
+
+	skb->data_len = to_write;
+	skb->len += to_write;
+
+	while ( likely(to_write) ) {
+		get_page(page);
+		skb_fill_page_desc(skb,
+				skb_shinfo(skb)->nr_frags,
+				page++, offset, len);
+		to_write -= len;
+		len = (to_write > PAGE_SIZE) ? PAGE_SIZE : to_write;
+		offset = 0;
+	}
+
+
+	return tp_len;
+}
+
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg)
+{
+	struct socket *sock;
+	struct sk_buff *skb;
+	struct net_device *dev;
+	__be16 proto;
+	int ifindex, err, reserve = 0;
+	void * ph;
+	struct sockaddr_ll *saddr=(struct sockaddr_ll *)msg->msg_name;
+	int tp_len, size_max;
+	unsigned char *addr;
+	int len_sum = 0;
+
+	BUG_ON(po == NULL);
+	sock = po->sk.sk_socket;
+
+	if (saddr == NULL) {
+		ifindex	= po->ifindex;
+		proto	= po->num;
+		addr	= NULL;
+	} else {
+		err = -EINVAL;
+		if (msg->msg_namelen < sizeof(struct sockaddr_ll))
+			goto out;
+		if (msg->msg_namelen < (saddr->sll_halen + offsetof(struct sockaddr_ll, sll_addr)))
+			goto out;
+		ifindex	= saddr->sll_ifindex;
+		proto	= saddr->sll_protocol;
+		addr	= saddr->sll_addr;
+	}
+
+	dev = dev_get_by_index(sock_net(&po->sk), ifindex);
+	err = -ENXIO;
+	if (unlikely(dev == NULL))
+		goto out;
+
+	err = -EINVAL;
+	if (unlikely(sock->type != SOCK_RAW))
+		goto out_unlock;
+
+	reserve = dev->hard_header_len;
+
+	err = -ENETDOWN;
+	if (unlikely(!(dev->flags & IFF_UP)))
+		goto out_unlock;
+
+	size_max = po->tx_ring.frame_size - sizeof(struct skb_shared_info)
+		- po->tp_hdrlen - LL_ALLOCATED_SPACE(dev);
+
+	if (size_max > dev->mtu + reserve)
+		size_max = dev->mtu + reserve;
+
+
+	do
+	{
+		ph = packet_current_tx_frame(po, TP_STATUS_USER);
+		if(unlikely(ph == NULL))
+			continue;
+
+		skb = sock_alloc_send_skb(&po->sk, LL_ALLOCATED_SPACE(dev),
+				msg->msg_flags & MSG_DONTWAIT, &err);
+		if (unlikely(skb == NULL))
+			goto out_unlock;
+
+		__packet_set_status(po, ph, TP_STATUS_COPY);
+		atomic_inc(&po->tx_pending_skb);
+
+		tp_len = tpacket_fill_skb(po, skb, ph, dev, size_max, proto,
+				addr);
+		if(unlikely(tp_len < 0)) {
+			err = tp_len;
+			goto out_free;
+		}
+
+		err = dev_queue_xmit(skb);
+		if (unlikely(err > 0 && (err = net_xmit_errno(err)) != 0))
+			goto out_free;
+
+		packet_increment_head(&po->tx_ring);
+		len_sum += tp_len;
+	}
+	while(likely((ph != NULL)
+				|| ((!(msg->msg_flags & MSG_DONTWAIT))
+					&& atomic_read(&po->tx_pending_skb))));
+
+	err = len_sum;
+	goto out_unlock;
+
+out_free:
+	__packet_set_status(po, ph, TP_STATUS_USER);
+	atomic_dec(&po->tx_pending_skb);
+	kfree_skb(skb);
+out_unlock:
+	dev_put(dev);
+out:
+	return err;
+}
+#endif
+
+static int packet_snd(struct socket *sock,
 			  struct msghdr *msg, size_t len)
 {
 	struct sock *sk = sock->sk;
@@ -853,6 +1093,19 @@ out:
 	return err;
 }
 
+static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+		struct msghdr *msg, size_t len)
+{
+#ifdef CONFIG_PACKET_MMAP
+	struct sock *sk = sock->sk;
+	struct packet_sock *po = pkt_sk(sk);
+	if (po->tx_ring.pg_vec)
+		return tpacket_snd(po, msg);
+	else
+#endif
+		return packet_snd(sock, msg, len);
+}
+
 /*
  *	Close a PACKET socket. This is fairly simple. We immediately go
  *	to 'closed' state and remove our protocol entry in the device list.
@@ -891,10 +1144,13 @@ static int packet_release(struct socket *sock)
 	packet_flush_mclist(sk);
 
 #ifdef CONFIG_PACKET_MMAP
-	if (po->pg_vec) {
+	{
 		struct tpacket_req req;
 		memset(&req, 0, sizeof(req));
-		packet_set_ring(sk, &req, 1);
+		if (po->rx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 0);
+		if (po->tx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 1);
 	}
 #endif
 
@@ -1411,6 +1667,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 #ifdef CONFIG_PACKET_MMAP
 	case PACKET_RX_RING:
+	case PACKET_TX_RING:
 	{
 		struct tpacket_req req;
 
@@ -1418,7 +1675,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 			return -EINVAL;
 		if (copy_from_user(&req,optval,sizeof(req)))
 			return -EFAULT;
-		return packet_set_ring(sk, &req, 0);
+		return packet_set_ring(sk, &req, 0, optname == PACKET_TX_RING);
 	}
 	case PACKET_COPY_THRESH:
 	{
@@ -1438,7 +1695,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1457,7 +1714,7 @@ packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1701,13 +1958,17 @@ static unsigned int packet_poll(struct file * file, struct socket *sock,
 	unsigned int mask = datagram_poll(file, sock, wait);
 
 	spin_lock_bh(&sk->sk_receive_queue.lock);
-	if (po->pg_vec) {
-		unsigned last = po->head ? po->head-1 : po->frame_max;
-
-		if (packet_lookup_frame(po, last, TP_STATUS_USER))
+	if (po->rx_ring.pg_vec) {
+		if (packet_previous_rx_frame(po, TP_STATUS_USER))
 			mask |= POLLIN | POLLRDNORM;
 	}
 	spin_unlock_bh(&sk->sk_receive_queue.lock);
+	spin_lock_bh(&sk->sk_write_queue.lock);
+	if (po->tx_ring.pg_vec) {
+		if (packet_current_tx_frame(po, TP_STATUS_KERNEL))
+			mask |= POLLOUT | POLLWRNORM;
+	}
+	spin_unlock_bh(&sk->sk_write_queue.lock);
 	return mask;
 }
 
@@ -1783,20 +2044,24 @@ out_free_pgvec:
 	goto out;
 }
 
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing)
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing, int tx_ring)
 {
 	char **pg_vec = NULL;
 	struct packet_sock *po = pkt_sk(sk);
 	int was_running, order = 0;
+	struct packet_ring_buffer *rb;
+	struct sk_buff_head *rb_queue;
 	__be16 num;
 	int err = 0;
 
+	rb = tx_ring ? &po->tx_ring : &po->rx_ring;
+	rb_queue = tx_ring ? &sk->sk_write_queue : &sk->sk_receive_queue;
+
 	if (req->tp_block_nr) {
 		int i;
 
 		/* Sanity tests and some calculations */
-
-		if (unlikely(po->pg_vec))
+		if (unlikely(rb->pg_vec))
 			return -EBUSY;
 
 		switch (po->tp_version) {
@@ -1813,16 +2078,16 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 		if (unlikely(req->tp_block_size & (PAGE_SIZE - 1)))
 			return -EINVAL;
 		if (unlikely(req->tp_frame_size < po->tp_hdrlen +
-						  po->tp_reserve))
+					po->tp_reserve))
 			return -EINVAL;
 		if (unlikely(req->tp_frame_size & (TPACKET_ALIGNMENT - 1)))
 			return -EINVAL;
 
-		po->frames_per_block = req->tp_block_size/req->tp_frame_size;
-		if (unlikely(po->frames_per_block <= 0))
+		rb->frames_per_block = req->tp_block_size/req->tp_frame_size;
+		if (unlikely(rb->frames_per_block <= 0))
 			return -EINVAL;
-		if (unlikely((po->frames_per_block * req->tp_block_nr) !=
-			     req->tp_frame_nr))
+		if (unlikely((rb->frames_per_block * req->tp_block_nr) !=
+					req->tp_frame_nr))
 			return -EINVAL;
 
 		err = -ENOMEM;
@@ -1835,17 +2100,19 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 			void *ptr = pg_vec[i];
 			int k;
 
-			for (k = 0; k < po->frames_per_block; k++) {
+			for (k = 0; k < rb->frames_per_block; k++) {
 				__packet_set_status(po, ptr, TP_STATUS_KERNEL);
 				ptr += req->tp_frame_size;
 			}
 		}
-		/* Done */
-	} else {
+	}
+	/* Done */
+	else {
 		if (unlikely(req->tp_frame_nr))
 			return -EINVAL;
 	}
 
+
 	lock_sock(sk);
 
 	/* Detach socket from network */
@@ -1866,20 +2133,19 @@ static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 	if (closing || atomic_read(&po->mapped) == 0) {
 		err = 0;
 #define XC(a, b) ({ __typeof__ ((a)) __t; __t = (a); (a) = (b); __t; })
-
-		spin_lock_bh(&sk->sk_receive_queue.lock);
-		pg_vec = XC(po->pg_vec, pg_vec);
-		po->frame_max = (req->tp_frame_nr - 1);
-		po->head = 0;
-		po->frame_size = req->tp_frame_size;
-		spin_unlock_bh(&sk->sk_receive_queue.lock);
-
-		order = XC(po->pg_vec_order, order);
-		req->tp_block_nr = XC(po->pg_vec_len, req->tp_block_nr);
-
-		po->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
-		po->prot_hook.func = po->pg_vec ? tpacket_rcv : packet_rcv;
-		skb_queue_purge(&sk->sk_receive_queue);
+		spin_lock_bh(&rb_queue->lock);
+		pg_vec = XC(rb->pg_vec, pg_vec);
+		rb->frame_max = (req->tp_frame_nr - 1);
+		rb->head = 0;
+		rb->frame_size = req->tp_frame_size;
+		spin_unlock_bh(&rb_queue->lock);
+
+		order = XC(rb->pg_vec_order, order);
+		req->tp_block_nr = XC(rb->pg_vec_len, req->tp_block_nr);
+
+		rb->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
+		po->prot_hook.func = (po->rx_ring.pg_vec) ? tpacket_rcv : packet_rcv;
+		skb_queue_purge(rb_queue);
 #undef XC
 		if (atomic_read(&po->mapped))
 			printk(KERN_DEBUG "packet_mmap: vma is busy: %d\n", atomic_read(&po->mapped));
@@ -1906,7 +2172,8 @@ static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 {
 	struct sock *sk = sock->sk;
 	struct packet_sock *po = pkt_sk(sk);
-	unsigned long size;
+	unsigned long size, expected_size;
+	struct packet_ring_buffer *rb;
 	unsigned long start;
 	int err = -EINVAL;
 	int i;
@@ -1917,23 +2184,38 @@ static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 	size = vma->vm_end - vma->vm_start;
 
 	lock_sock(sk);
-	if (po->pg_vec == NULL)
+
+	expected_size = 0;
+	if (po->rx_ring.pg_vec)
+		expected_size += po->rx_ring.pg_vec_len * po->rx_ring.pg_vec_pages * PAGE_SIZE;
+	if (po->tx_ring.pg_vec)
+		expected_size += po->tx_ring.pg_vec_len * po->tx_ring.pg_vec_pages * PAGE_SIZE;
+
+	if (expected_size == 0)
 		goto out;
-	if (size != po->pg_vec_len*po->pg_vec_pages*PAGE_SIZE)
+
+	if (size != expected_size)
 		goto out;
 
 	start = vma->vm_start;
-	for (i = 0; i < po->pg_vec_len; i++) {
-		struct page *page = virt_to_page(po->pg_vec[i]);
-		int pg_num;
-
-		for (pg_num = 0; pg_num < po->pg_vec_pages; pg_num++, page++) {
-			err = vm_insert_page(vma, start, page);
-			if (unlikely(err))
-				goto out;
-			start += PAGE_SIZE;
+
+	for(rb = &po->rx_ring; rb <= &po->tx_ring; rb++) {
+		if (rb->pg_vec == NULL)
+			continue;
+
+		for (i = 0; i < rb->pg_vec_len; i++) {
+			struct page *page = virt_to_page(rb->pg_vec[i]);
+			int pg_num;
+
+			for (pg_num = 0; pg_num < rb->pg_vec_pages; pg_num++, page++) {
+				err = vm_insert_page(vma, start, page);
+				if (unlikely(err))
+					goto out;
+				start += PAGE_SIZE;
+			}
 		}
 	}
+
 	atomic_inc(&po->mapped);
 	vma->vm_ops = &packet_mmap_ops;
 	err = 0;


