All of lore.kernel.org
 help / color / mirror / Atom feed
* libnetfilter_queue question
@ 2010-05-19 10:15 Victor Julien
  2010-05-19 20:10 ` Eric Leblond
  0 siblings, 1 reply; 9+ messages in thread
From: Victor Julien @ 2010-05-19 10:15 UTC (permalink / raw)
  To: netfilter-devel

Hi all,

I'm using libnetfilter_queue for inline mode in the Suricata IDS/IPS
(www.openinfosecfoundation.org). I'm using a callback that makes the
packet(s) available to the detection engine. In some special cases the
call back could fail (only malloc failure atm).

I was wondering what the proper response would be to such an event. I'm
assuming nfq_handle_packet() would return an (non zero) error code in
that case.

Should I verdict the packet? (drop to be safe)

Cheers,
Victor

-- 
---------------------------------------------
Victor Julien
http://www.inliniac.net/
PGP: http://www.inliniac.net/victorjulien.asc
---------------------------------------------


^ permalink raw reply	[flat|nested] 9+ messages in thread
* libnetfilter_queue question
@ 2011-05-04  6:14 nowhere
  2011-05-04 18:13 ` Alessandro Vesely
  0 siblings, 1 reply; 9+ messages in thread
From: nowhere @ 2011-05-04  6:14 UTC (permalink / raw)
  To: netfilter

[-- Attachment #1: Type: text/plain, Size: 2687 bytes --]

Hello, dear list,

I'm now experimenting with subj, and have a problem which I can not
solve.

There is a simple application, which do the following:

1. Register a queue (or several queues)
2. Reads metadata from nfqueue in a cycle
3. Spreads it into multiple software queues, one queue per nfqueue
4. Worker threads apply some delay according to distribution law
5. Worker threads accept packets, which (if I understand correctly)
still reside in kernel netfilter queue

I have done all the steps to allow for larger kernel queues:
  - sysctl net.core.{r,w}mem{_max,}=16M
  - tc qdisc add dev eth0 root pfifo limit 20000

So I with all these I see no drops on interfaces, interface queue or
netfilter queue (in /proc/net/netfilter/nfnetlink_queue)

Then I do the following to test the setup:
iptables -t mangle -A POSTROUTING -p icmp -d 10.77.130.72 -j NFQUEUE
--queue-num 1

and then start ping. If i do normal ping, everything works like expected

$ping 10.77.130.72
PING 10.77.130.72 (10.77.130.72) 56(84) bytes of data.
64 bytes from 10.77.130.72: icmp_req=1 ttl=64 time=97.0 ms
64 bytes from 10.77.130.72: icmp_req=2 ttl=64 time=97.1 ms
64 bytes from 10.77.130.72: icmp_req=3 ttl=64 time=97.6 ms
64 bytes from 10.77.130.72: icmp_req=4 ttl=64 time=93.6 ms
64 bytes from 10.77.130.72: icmp_req=5 ttl=64 time=101 ms
64 bytes from 10.77.130.72: icmp_req=6 ttl=64 time=94.8 ms

Packets are passed to the target host, delay is applied. Stats from
application and fro iptables counters show consistent figures.

But when I issue flood ping I see this:
$ sudo ping 10.77.130.72 -i0
PING 10.77.130.72 (10.77.130.72) 56(84) bytes of data.
64 bytes from 10.77.130.72: icmp_req=1 ttl=64 time=111 ms
64 bytes from 10.77.130.72: icmp_req=8 ttl=64 time=118 ms
64 bytes from 10.77.130.72: icmp_req=9 ttl=64 time=114 ms
64 bytes from 10.77.130.72: icmp_req=10 ttl=64 time=104 ms
64 bytes from 10.77.130.72: icmp_req=11 ttl=64 time=93.5 ms
64 bytes from 10.77.130.72: icmp_req=12 ttl=64 time=93.9 ms
64 bytes from 10.77.130.72: icmp_req=13 ttl=64 time=94.3 ms
64 bytes from 10.77.130.72: icmp_req=14 ttl=64 time=101 ms
64 bytes from 10.77.130.72: icmp_req=15 ttl=64 time=96.8 ms

There are 7 packets dropped at the beginning. I see same results when
testing with iperf. Several packets at the beginning get lost.
iptables counters show, that NFQUEUE rule has processed all the packets
(15 in this example), app debug output shows 15 processed packets,
nfqueue stat show no drops, tc -s -d qdisc show dev eth0 shows no drops
in the interface queue. But tcpdump has caught only 9 packets on remote
and on local hosts.

There is app's source code here. Maybe, I'm doing something wrong in it?

Thanks

[-- Attachment #2: main.c --]
[-- Type: text/x-csrc, Size: 5324 bytes --]

#include <libnetfilter_queue/libnetfilter_queue.h>

#include <netinet/in.h>
#include <linux/netfilter.h>

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>

#include <sys/socket.h>
#include <arpa/inet.h>

#include <sys/time.h>
#include <time.h>

#include <math.h>

#include <pthread.h>
#include <sys/queue.h>

#define QUEUE_NUM	2

#define BL 		65536

struct queued_pkt {
  char *qp_payload;
  int qp_pktlen;
  int qp_id;
  struct timeval qp_recv;
  STAILQ_ENTRY(queued_pkt) entries;
};

struct queue_data {
  struct nfq_q_handle* q_handle;
  int q_id;
  pthread_t q_thread;
  pthread_mutex_t q_mutex;
  pthread_cond_t q_condvar;
  int q_delay;
  int q_jitter;
  double q_mu;
  double q_sigma;
  double q_xsi;
  STAILQ_HEAD(,queued_pkt) q_head;
};

void* worker_thread (void*);

int callback (struct nfq_q_handle*,
              struct nfgenmsg*,
	      struct nfq_data*, void*);

int main() {
  int rv, fd, i;
  struct nfq_handle *h;

  struct queue_data *queues, *q;
  char *buf;

  queues = (struct queue_data*) calloc (QUEUE_NUM, sizeof (struct queue_data));
  buf = (char*) malloc (BL);

  
  if (!(h = nfq_open())) {
    perror ("open handle");
    exit (1);
  }

  if (nfq_unbind_pf (h, AF_INET) < 0) {
    perror ("unbind NFQUEUE");
    nfq_close (h);
    exit (1);
  }

  if (nfq_bind_pf (h, AF_INET) < 0) {
    perror ("bind nfnetlink");
    nfq_close (h);
    exit (1);
  }

  for (i=0; i<QUEUE_NUM; i++) {

    q = queues + i;
    STAILQ_INIT (&(q->q_head));

    q->q_id = i;
    pthread_mutex_init (&(q->q_mutex), NULL);
    pthread_cond_init (&(q->q_condvar), NULL);

    q->q_xsi = .25;
    q->q_delay = 100;
    q->q_jitter = 10;
    q->q_sigma = ((double) q->q_jitter / 1000.) * (1. - q->q_xsi) * sqrt (1. - 2. * q->q_xsi);
    q->q_mu = ((double) q->q_delay / 1000.) - q->q_sigma / (1. - q->q_xsi);

    fprintf (stderr, "Queue %d: xsi %.3f, sigma %.3f, mu %.3f\n", i, q->q_xsi, q->q_sigma, q->q_mu);

    if (!(q->q_handle = nfq_create_queue (h, i, &callback, q))) {
      perror ("create queue");
      nfq_close (h);
      exit (1);
    }

    if (nfq_set_mode (q->q_handle, NFQNL_COPY_META, 0) < 0) {
      perror ("set mode");
      nfq_destroy_queue (q->q_handle);
      nfq_close (h);
      exit (1);
    }

    nfq_set_queue_maxlen (q->q_handle, 20240);
    pthread_create (&(q->q_thread), NULL, &worker_thread, (void*) q);

  }
  
  fd = nfq_fd (h);
  
  while (1) {
    
    rv = recv (fd, buf, BL, MSG_TRUNC);
    if (rv < 0 && errno == EINTR) continue;
    if (rv > BL) {
      fprintf (stderr, "No space\n");
      continue;
    }

    nfq_handle_packet (h, buf, rv);

  }

  //nfq_destroy_queue (qh);
  //nfq_unbind_pf (h, AF_INET);
  nfq_close (h);
  free (buf);
  return 0;
}

int callback (struct nfq_q_handle *qh,
	      struct nfgenmsg *nfmsg,
	      struct nfq_data *nfad, void *data) {
   
   struct queue_data *queue = (struct queue_data*) data;
   struct queued_pkt *pkt;
   char *pl;
   struct nfqnl_msg_packet_hdr *ph;
   
   pkt = (struct queued_pkt*) calloc (1, sizeof (struct queued_pkt));

   if (!(ph = nfq_get_msg_packet_hdr (nfad))) {
       perror ("get hdr");
       return 0;
   }

   pkt->qp_id = htonl (ph->packet_id);
   
   gettimeofday (&pkt->qp_recv, NULL);
/*   if ((pkt->qp_pktlen = nfq_get_payload (nfad, &pl)) > 0) {
     pkt->qp_payload = (char*) malloc (pkt->qp_pktlen);
     memcpy (pkt->qp_payload, pl, pkt->qp_pktlen);
   }*/

   pthread_mutex_lock (&(queue->q_mutex));
   STAILQ_INSERT_TAIL (&(queue->q_head), pkt, entries);
   pthread_cond_signal (&(queue->q_condvar));
   pthread_mutex_unlock (&(queue->q_mutex));
   return 0;
}


void* worker_thread (void *data) {
   struct queue_data *queue = (struct queue_data*) data;
   struct queued_pkt *pkt;

   struct timeval cur_time;
   struct timespec deq_time;
   double real_delay;

   double cur_ts, deq_ts, recv_ts;

   char *buf;

   while (1) {
     pthread_mutex_lock (&(queue->q_mutex));

     while (STAILQ_EMPTY (&(queue->q_head)))
       pthread_cond_wait (&(queue->q_condvar), &(queue->q_mutex));

     /* Dequeue packet */
     pkt = STAILQ_FIRST (&(queue->q_head));

     STAILQ_REMOVE_HEAD (&(queue->q_head), entries);
     pthread_mutex_unlock (&(queue->q_mutex));

     real_delay = queue->q_mu + queue->q_sigma * (pow ((double) (random()) / (double) RAND_MAX, -1. * queue->q_xsi) - 1.) / 
     					queue->q_xsi;

     //real_delay = 100e-3;

     recv_ts = (double) pkt->qp_recv.tv_sec + (double) pkt->qp_recv.tv_usec / 1000000.;
     deq_ts = recv_ts + real_delay;

     gettimeofday (&cur_time, NULL);
     cur_ts = (double) cur_time.tv_sec + (double) cur_time.tv_usec / 1000000.;

     real_delay = deq_ts - cur_ts;
       printf ("Queue %d Packet ID %d, REC (%d, %d) delay %.3fus\n", queue->q_id, pkt->qp_id, 
                 pkt->qp_recv.tv_sec, pkt->qp_recv.tv_usec, real_delay * 1000000.);
     if (real_delay > 0) {
       deq_time.tv_sec = real_delay;
       deq_time.tv_nsec = (real_delay - deq_time.tv_sec) * 1000000000; 

       nanosleep (&deq_time, NULL);
     }

     if (pkt->qp_pktlen) {
       nfq_set_verdict (queue->q_handle, pkt->qp_id, NF_ACCEPT, pkt->qp_pktlen, pkt->qp_payload);
       free (pkt->qp_payload);
     } else
       nfq_set_verdict (queue->q_handle, pkt->qp_id, NF_ACCEPT, 0, NULL);

     free (pkt);
   }

   return 0;
}

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2011-05-05  9:24 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2010-05-19 10:15 libnetfilter_queue question Victor Julien
2010-05-19 20:10 ` Eric Leblond
2010-05-21 16:02   ` Victor Julien
2010-05-27 13:25     ` Victor Julien
  -- strict thread matches above, loose matches on Subject: below --
2011-05-04  6:14 nowhere
2011-05-04 18:13 ` Alessandro Vesely
2011-05-04 18:32   ` Nikolay S.
2011-05-05  9:12     ` Alessandro Vesely
2011-05-05  9:24       ` nowhere

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.