#include #include #include #include #include #include #include #include #include #include #include #include #include #include #define QUEUE_NUM 2 #define BL 65536 struct queued_pkt { char *qp_payload; int qp_pktlen; int qp_id; struct timeval qp_recv; STAILQ_ENTRY(queued_pkt) entries; }; struct queue_data { struct nfq_q_handle* q_handle; int q_id; pthread_t q_thread; pthread_mutex_t q_mutex; pthread_cond_t q_condvar; int q_delay; int q_jitter; double q_mu; double q_sigma; double q_xsi; STAILQ_HEAD(,queued_pkt) q_head; }; void* worker_thread (void*); int callback (struct nfq_q_handle*, struct nfgenmsg*, struct nfq_data*, void*); int main() { int rv, fd, i; struct nfq_handle *h; struct queue_data *queues, *q; char *buf; queues = (struct queue_data*) calloc (QUEUE_NUM, sizeof (struct queue_data)); buf = (char*) malloc (BL); if (!(h = nfq_open())) { perror ("open handle"); exit (1); } if (nfq_unbind_pf (h, AF_INET) < 0) { perror ("unbind NFQUEUE"); nfq_close (h); exit (1); } if (nfq_bind_pf (h, AF_INET) < 0) { perror ("bind nfnetlink"); nfq_close (h); exit (1); } for (i=0; iq_head)); q->q_id = i; pthread_mutex_init (&(q->q_mutex), NULL); pthread_cond_init (&(q->q_condvar), NULL); q->q_xsi = .25; q->q_delay = 100; q->q_jitter = 10; q->q_sigma = ((double) q->q_jitter / 1000.) * (1. - q->q_xsi) * sqrt (1. - 2. * q->q_xsi); q->q_mu = ((double) q->q_delay / 1000.) - q->q_sigma / (1. - q->q_xsi); fprintf (stderr, "Queue %d: xsi %.3f, sigma %.3f, mu %.3f\n", i, q->q_xsi, q->q_sigma, q->q_mu); if (!(q->q_handle = nfq_create_queue (h, i, &callback, q))) { perror ("create queue"); nfq_close (h); exit (1); } if (nfq_set_mode (q->q_handle, NFQNL_COPY_META, 0) < 0) { perror ("set mode"); nfq_destroy_queue (q->q_handle); nfq_close (h); exit (1); } nfq_set_queue_maxlen (q->q_handle, 20240); pthread_create (&(q->q_thread), NULL, &worker_thread, (void*) q); } fd = nfq_fd (h); while (1) { rv = recv (fd, buf, BL, MSG_TRUNC); if (rv < 0 && errno == EINTR) continue; if (rv > BL) { fprintf (stderr, "No space\n"); continue; } nfq_handle_packet (h, buf, rv); } //nfq_destroy_queue (qh); //nfq_unbind_pf (h, AF_INET); nfq_close (h); free (buf); return 0; } int callback (struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, struct nfq_data *nfad, void *data) { struct queue_data *queue = (struct queue_data*) data; struct queued_pkt *pkt; char *pl; struct nfqnl_msg_packet_hdr *ph; pkt = (struct queued_pkt*) calloc (1, sizeof (struct queued_pkt)); if (!(ph = nfq_get_msg_packet_hdr (nfad))) { perror ("get hdr"); return 0; } pkt->qp_id = htonl (ph->packet_id); gettimeofday (&pkt->qp_recv, NULL); /* if ((pkt->qp_pktlen = nfq_get_payload (nfad, &pl)) > 0) { pkt->qp_payload = (char*) malloc (pkt->qp_pktlen); memcpy (pkt->qp_payload, pl, pkt->qp_pktlen); }*/ pthread_mutex_lock (&(queue->q_mutex)); STAILQ_INSERT_TAIL (&(queue->q_head), pkt, entries); pthread_cond_signal (&(queue->q_condvar)); pthread_mutex_unlock (&(queue->q_mutex)); return 0; } void* worker_thread (void *data) { struct queue_data *queue = (struct queue_data*) data; struct queued_pkt *pkt; struct timeval cur_time; struct timespec deq_time; double real_delay; double cur_ts, deq_ts, recv_ts; char *buf; while (1) { pthread_mutex_lock (&(queue->q_mutex)); while (STAILQ_EMPTY (&(queue->q_head))) pthread_cond_wait (&(queue->q_condvar), &(queue->q_mutex)); /* Dequeue packet */ pkt = STAILQ_FIRST (&(queue->q_head)); STAILQ_REMOVE_HEAD (&(queue->q_head), entries); pthread_mutex_unlock (&(queue->q_mutex)); real_delay = queue->q_mu + queue->q_sigma * (pow ((double) (random()) / (double) RAND_MAX, -1. * queue->q_xsi) - 1.) / queue->q_xsi; //real_delay = 100e-3; recv_ts = (double) pkt->qp_recv.tv_sec + (double) pkt->qp_recv.tv_usec / 1000000.; deq_ts = recv_ts + real_delay; gettimeofday (&cur_time, NULL); cur_ts = (double) cur_time.tv_sec + (double) cur_time.tv_usec / 1000000.; real_delay = deq_ts - cur_ts; printf ("Queue %d Packet ID %d, REC (%d, %d) delay %.3fus\n", queue->q_id, pkt->qp_id, pkt->qp_recv.tv_sec, pkt->qp_recv.tv_usec, real_delay * 1000000.); if (real_delay > 0) { deq_time.tv_sec = real_delay; deq_time.tv_nsec = (real_delay - deq_time.tv_sec) * 1000000000; nanosleep (&deq_time, NULL); } if (pkt->qp_pktlen) { nfq_set_verdict (queue->q_handle, pkt->qp_id, NF_ACCEPT, pkt->qp_pktlen, pkt->qp_payload); free (pkt->qp_payload); } else nfq_set_verdict (queue->q_handle, pkt->qp_id, NF_ACCEPT, 0, NULL); free (pkt); } return 0; }