netfilter.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* libnetfilter_queue question
@ 2011-05-04  6:14 nowhere
  2011-05-04 18:13 ` Alessandro Vesely
  0 siblings, 1 reply; 10+ messages in thread
From: nowhere @ 2011-05-04  6:14 UTC (permalink / raw)
  To: netfilter

[-- Attachment #1: Type: text/plain, Size: 2687 bytes --]

Hello, dear list,

I'm now experimenting with subj, and have a problem which I can not
solve.

There is a simple application, which do the following:

1. Register a queue (or several queues)
2. Reads metadata from nfqueue in a cycle
3. Spreads it into multiple software queues, one queue per nfqueue
4. Worker threads apply some delay according to distribution law
5. Worker threads accept packets, which (if I understand correctly)
still reside in kernel netfilter queue

I have done all the steps to allow for larger kernel queues:
  - sysctl net.core.{r,w}mem{_max,}=16M
  - tc qdisc add dev eth0 root pfifo limit 20000

So I with all these I see no drops on interfaces, interface queue or
netfilter queue (in /proc/net/netfilter/nfnetlink_queue)

Then I do the following to test the setup:
iptables -t mangle -A POSTROUTING -p icmp -d 10.77.130.72 -j NFQUEUE
--queue-num 1

and then start ping. If i do normal ping, everything works like expected

$ping 10.77.130.72
PING 10.77.130.72 (10.77.130.72) 56(84) bytes of data.
64 bytes from 10.77.130.72: icmp_req=1 ttl=64 time=97.0 ms
64 bytes from 10.77.130.72: icmp_req=2 ttl=64 time=97.1 ms
64 bytes from 10.77.130.72: icmp_req=3 ttl=64 time=97.6 ms
64 bytes from 10.77.130.72: icmp_req=4 ttl=64 time=93.6 ms
64 bytes from 10.77.130.72: icmp_req=5 ttl=64 time=101 ms
64 bytes from 10.77.130.72: icmp_req=6 ttl=64 time=94.8 ms

Packets are passed to the target host, delay is applied. Stats from
application and fro iptables counters show consistent figures.

But when I issue flood ping I see this:
$ sudo ping 10.77.130.72 -i0
PING 10.77.130.72 (10.77.130.72) 56(84) bytes of data.
64 bytes from 10.77.130.72: icmp_req=1 ttl=64 time=111 ms
64 bytes from 10.77.130.72: icmp_req=8 ttl=64 time=118 ms
64 bytes from 10.77.130.72: icmp_req=9 ttl=64 time=114 ms
64 bytes from 10.77.130.72: icmp_req=10 ttl=64 time=104 ms
64 bytes from 10.77.130.72: icmp_req=11 ttl=64 time=93.5 ms
64 bytes from 10.77.130.72: icmp_req=12 ttl=64 time=93.9 ms
64 bytes from 10.77.130.72: icmp_req=13 ttl=64 time=94.3 ms
64 bytes from 10.77.130.72: icmp_req=14 ttl=64 time=101 ms
64 bytes from 10.77.130.72: icmp_req=15 ttl=64 time=96.8 ms

There are 7 packets dropped at the beginning. I see same results when
testing with iperf. Several packets at the beginning get lost.
iptables counters show, that NFQUEUE rule has processed all the packets
(15 in this example), app debug output shows 15 processed packets,
nfqueue stat show no drops, tc -s -d qdisc show dev eth0 shows no drops
in the interface queue. But tcpdump has caught only 9 packets on remote
and on local hosts.

There is app's source code here. Maybe, I'm doing something wrong in it?

Thanks

[-- Attachment #2: main.c --]
[-- Type: text/x-csrc, Size: 5324 bytes --]

#include <libnetfilter_queue/libnetfilter_queue.h>

#include <netinet/in.h>
#include <linux/netfilter.h>

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>

#include <sys/socket.h>
#include <arpa/inet.h>

#include <sys/time.h>
#include <time.h>

#include <math.h>

#include <pthread.h>
#include <sys/queue.h>

#define QUEUE_NUM	2

#define BL 		65536

struct queued_pkt {
  char *qp_payload;
  int qp_pktlen;
  int qp_id;
  struct timeval qp_recv;
  STAILQ_ENTRY(queued_pkt) entries;
};

struct queue_data {
  struct nfq_q_handle* q_handle;
  int q_id;
  pthread_t q_thread;
  pthread_mutex_t q_mutex;
  pthread_cond_t q_condvar;
  int q_delay;
  int q_jitter;
  double q_mu;
  double q_sigma;
  double q_xsi;
  STAILQ_HEAD(,queued_pkt) q_head;
};

void* worker_thread (void*);

int callback (struct nfq_q_handle*,
              struct nfgenmsg*,
	      struct nfq_data*, void*);

int main() {
  int rv, fd, i;
  struct nfq_handle *h;

  struct queue_data *queues, *q;
  char *buf;

  queues = (struct queue_data*) calloc (QUEUE_NUM, sizeof (struct queue_data));
  buf = (char*) malloc (BL);

  
  if (!(h = nfq_open())) {
    perror ("open handle");
    exit (1);
  }

  if (nfq_unbind_pf (h, AF_INET) < 0) {
    perror ("unbind NFQUEUE");
    nfq_close (h);
    exit (1);
  }

  if (nfq_bind_pf (h, AF_INET) < 0) {
    perror ("bind nfnetlink");
    nfq_close (h);
    exit (1);
  }

  for (i=0; i<QUEUE_NUM; i++) {

    q = queues + i;
    STAILQ_INIT (&(q->q_head));

    q->q_id = i;
    pthread_mutex_init (&(q->q_mutex), NULL);
    pthread_cond_init (&(q->q_condvar), NULL);

    q->q_xsi = .25;
    q->q_delay = 100;
    q->q_jitter = 10;
    q->q_sigma = ((double) q->q_jitter / 1000.) * (1. - q->q_xsi) * sqrt (1. - 2. * q->q_xsi);
    q->q_mu = ((double) q->q_delay / 1000.) - q->q_sigma / (1. - q->q_xsi);

    fprintf (stderr, "Queue %d: xsi %.3f, sigma %.3f, mu %.3f\n", i, q->q_xsi, q->q_sigma, q->q_mu);

    if (!(q->q_handle = nfq_create_queue (h, i, &callback, q))) {
      perror ("create queue");
      nfq_close (h);
      exit (1);
    }

    if (nfq_set_mode (q->q_handle, NFQNL_COPY_META, 0) < 0) {
      perror ("set mode");
      nfq_destroy_queue (q->q_handle);
      nfq_close (h);
      exit (1);
    }

    nfq_set_queue_maxlen (q->q_handle, 20240);
    pthread_create (&(q->q_thread), NULL, &worker_thread, (void*) q);

  }
  
  fd = nfq_fd (h);
  
  while (1) {
    
    rv = recv (fd, buf, BL, MSG_TRUNC);
    if (rv < 0 && errno == EINTR) continue;
    if (rv > BL) {
      fprintf (stderr, "No space\n");
      continue;
    }

    nfq_handle_packet (h, buf, rv);

  }

  //nfq_destroy_queue (qh);
  //nfq_unbind_pf (h, AF_INET);
  nfq_close (h);
  free (buf);
  return 0;
}

int callback (struct nfq_q_handle *qh,
	      struct nfgenmsg *nfmsg,
	      struct nfq_data *nfad, void *data) {
   
   struct queue_data *queue = (struct queue_data*) data;
   struct queued_pkt *pkt;
   char *pl;
   struct nfqnl_msg_packet_hdr *ph;
   
   pkt = (struct queued_pkt*) calloc (1, sizeof (struct queued_pkt));

   if (!(ph = nfq_get_msg_packet_hdr (nfad))) {
       perror ("get hdr");
       return 0;
   }

   pkt->qp_id = htonl (ph->packet_id);
   
   gettimeofday (&pkt->qp_recv, NULL);
/*   if ((pkt->qp_pktlen = nfq_get_payload (nfad, &pl)) > 0) {
     pkt->qp_payload = (char*) malloc (pkt->qp_pktlen);
     memcpy (pkt->qp_payload, pl, pkt->qp_pktlen);
   }*/

   pthread_mutex_lock (&(queue->q_mutex));
   STAILQ_INSERT_TAIL (&(queue->q_head), pkt, entries);
   pthread_cond_signal (&(queue->q_condvar));
   pthread_mutex_unlock (&(queue->q_mutex));
   return 0;
}


void* worker_thread (void *data) {
   struct queue_data *queue = (struct queue_data*) data;
   struct queued_pkt *pkt;

   struct timeval cur_time;
   struct timespec deq_time;
   double real_delay;

   double cur_ts, deq_ts, recv_ts;

   char *buf;

   while (1) {
     pthread_mutex_lock (&(queue->q_mutex));

     while (STAILQ_EMPTY (&(queue->q_head)))
       pthread_cond_wait (&(queue->q_condvar), &(queue->q_mutex));

     /* Dequeue packet */
     pkt = STAILQ_FIRST (&(queue->q_head));

     STAILQ_REMOVE_HEAD (&(queue->q_head), entries);
     pthread_mutex_unlock (&(queue->q_mutex));

     real_delay = queue->q_mu + queue->q_sigma * (pow ((double) (random()) / (double) RAND_MAX, -1. * queue->q_xsi) - 1.) / 
     					queue->q_xsi;

     //real_delay = 100e-3;

     recv_ts = (double) pkt->qp_recv.tv_sec + (double) pkt->qp_recv.tv_usec / 1000000.;
     deq_ts = recv_ts + real_delay;

     gettimeofday (&cur_time, NULL);
     cur_ts = (double) cur_time.tv_sec + (double) cur_time.tv_usec / 1000000.;

     real_delay = deq_ts - cur_ts;
       printf ("Queue %d Packet ID %d, REC (%d, %d) delay %.3fus\n", queue->q_id, pkt->qp_id, 
                 pkt->qp_recv.tv_sec, pkt->qp_recv.tv_usec, real_delay * 1000000.);
     if (real_delay > 0) {
       deq_time.tv_sec = real_delay;
       deq_time.tv_nsec = (real_delay - deq_time.tv_sec) * 1000000000; 

       nanosleep (&deq_time, NULL);
     }

     if (pkt->qp_pktlen) {
       nfq_set_verdict (queue->q_handle, pkt->qp_id, NF_ACCEPT, pkt->qp_pktlen, pkt->qp_payload);
       free (pkt->qp_payload);
     } else
       nfq_set_verdict (queue->q_handle, pkt->qp_id, NF_ACCEPT, 0, NULL);

     free (pkt);
   }

   return 0;
}

^ permalink raw reply	[flat|nested] 10+ messages in thread

end of thread, other threads:[~2011-05-13 18:25 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2011-05-04  6:14 libnetfilter_queue question nowhere
2011-05-04 18:13 ` Alessandro Vesely
2011-05-04 18:32   ` Nikolay S.
2011-05-05  9:12     ` Alessandro Vesely
2011-05-05  9:24       ` nowhere
2011-05-11 17:27         ` NFQUEUE looses packets between arrival and verdict Alessandro Vesely
2011-05-11 22:56           ` Ed W
2011-05-12  9:40           ` nowhere
2011-05-12 18:03             ` NFQUEUE the plot is growing Alessandro Vesely
2011-05-13 18:25               ` Nikolay S.

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).