/*
 * Micro-benchmark for an "event counter" synchronisation primitive.
 *
 * One generator thread repeatedly signals a shared EvCounter while a number
 * of consumer threads wait on it.  Two back ends are provided: one built on
 * the Linux futex system call and one built on a pthread mutex/condition
 * variable pair.  At the end the program prints how many waits and signals
 * happened and how long they took on average.
 *
 * Usage: prog [-c] [-a] [think_msec [event_msec [consumers [seconds]]]]
 *
 * NOTE(review): the header names on the bare #include lines were missing
 * from the reviewed text; the list below was reconstructed from the
 * identifiers this file uses -- confirm against the original.
 */
#define _GNU_SOURCE 1
#include "pthread.h"
#include <limits.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#ifdef __linux__
#include <linux/futex.h>
#include <sys/syscall.h>
#define HAVE_FUTEX 1
#endif

#ifdef _WIN32
#include <windows.h>
#include <io.h> /* NOTE(review): presumed second Windows header -- confirm */
#else
#define HAVE_AFFINITY 1
#endif

/* Shared event counter.  Waiters sleep until ctr differs from their snapshot. */
typedef struct EvCounter {
    int ctr;              /* event count; also the futex word in the futex back end */
    int waiters;          /* threads currently inside _evcounter_wait() */
    int fast_signal;      /* signals that saw no waiter and skipped the wake syscall */
    pthread_mutex_t lock; /* used by the condition-variable back end only */
    pthread_cond_t cond;  /* used by the condition-variable back end only */
} EvCounter;

/* A consumer's snapshot of EvCounter.ctr. */
typedef int EvCounterState;

/* Store the counter's current value in *state. */
void evcounter_get(EvCounterState *state, EvCounter *evcounter)
{
    *state = evcounter->ctr;
}

#ifdef HAVE_FUTEX
#define futex(...) syscall(__NR_futex, __VA_ARGS__)

/* Futex back end: reset every counter field to zero. */
void _evcounter_init(EvCounter *evcounter)
{
    evcounter->ctr = 0;
    evcounter->waiters = 0;
    evcounter->fast_signal = 0;
}

/*
 * Futex back end: block until the counter differs from *state, then refresh
 * *state with the current value.
 *
 * Returns 1 when the counter had already changed and no futex wait was
 * issued (fast case), 0 when at least one futex wait was issued.
 */
int _evcounter_wait(EvCounterState *state, EvCounter *evcounter)
{
    int fast = 1;

    /* Announce ourselves before testing ctr; the signaller tests waiters
     * after it has bumped ctr. */
    __sync_fetch_and_add(&evcounter->waiters, 1);
    while (*state == evcounter->ctr) {
        futex(&evcounter->ctr, FUTEX_WAIT_PRIVATE, *state, NULL);
        fast = 0;
    }
    __sync_fetch_and_add(&evcounter->waiters, -1);

    *state = evcounter->ctr;
    return fast;
}

/*
 * Futex back end: bump the counter and wake all waiters.  When no waiter is
 * registered the wake syscall is skipped and fast_signal is incremented.
 */
void _evcounter_signal(EvCounter *evcounter)
{
    __sync_fetch_and_add(&evcounter->ctr, 1);
    if (evcounter->waiters != 0) {
        futex(&evcounter->ctr, FUTEX_WAKE_PRIVATE, INT_MAX);
    } else {
        evcounter->fast_signal++;
    }
}
#endif

/* Condition-variable back end: zero the counters and create lock and cond. */
void _evcounter_init_cond(EvCounter *evcounter)
{
    evcounter->ctr = 0;
    evcounter->waiters = 0;
    evcounter->fast_signal = 0;
    pthread_mutex_init(&evcounter->lock, NULL);
    pthread_cond_init(&evcounter->cond, NULL);
}

/*
 * Condition-variable back end: block until the counter differs from *state,
 * then refresh *state.
 *
 * Returns 1 when no pthread_cond_wait() call was needed, 0 otherwise.
 */
int _evcounter_wait_cond(EvCounterState *state, EvCounter *evcounter)
{
    int fast = 1;

    pthread_mutex_lock(&evcounter->lock);
    while (*state == evcounter->ctr) {
        pthread_cond_wait(&evcounter->cond, &evcounter->lock);
        fast = 0;
    }
    /* Read the new value while still holding the lock: the signaller
     * modifies ctr under the same lock, so reading it after unlocking
     * would be an unsynchronised access. */
    *state = evcounter->ctr;
    pthread_mutex_unlock(&evcounter->lock);

    return fast;
}

/* Condition-variable back end: bump the counter and wake every waiter. */
void _evcounter_signal_cond(EvCounter *evcounter)
{
    pthread_mutex_lock(&evcounter->lock);
    evcounter->ctr++;
    pthread_cond_broadcast(&evcounter->cond);
    pthread_mutex_unlock(&evcounter->lock);
}

/* Back end selected in main(). */
void (*evcounter_init)(EvCounter *);
int (*evcounter_wait)(EvCounterState *, EvCounter *);
void (*evcounter_signal)(EvCounter *);

EvCounter evc;

/* 0: running, 1: consumers must exit, 2: generator must exit too. */
volatile int stop = 0;
int affinity = 0;

/* Statistics; times are in getclk() ticks. */
long t_wait;      /* total time spent in fast-case waits */
long n_slow_wait; /* waits that had to block */
long n_wait;      /* waits that returned without blocking */
long t_signal;    /* total time spent in evcounter_signal() */
long n_signal;    /* number of signals sent */

/* Ticks per second reported by getclk(): ns when long is 64-bit, else us. */
#define CLOCK_RES (sizeof(long) == 8 ? 1000000000LL : 1000000LL)

/* Return a monotonic timestamp expressed in 1/CLOCK_RES seconds. */
static inline long long getclk(void)
{
#ifndef _WIN32
    struct timespec ts;

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (ts.tv_sec * 1000000000LL + ts.tv_nsec) / (1000000000LL / CLOCK_RES);
#else
    static LARGE_INTEGER freq, init;
    LARGE_INTEGER counter;

    /* First call: remember the frequency and the starting point. */
    if (!init.QuadPart) {
        QueryPerformanceFrequency(&freq);
        QueryPerformanceCounter(&init);
    }
    QueryPerformanceCounter(&counter);
    return (counter.QuadPart - init.QuadPart) * CLOCK_RES / freq.QuadPart;
#endif
}

#ifdef _WIN32
/* Windows replacement for sleep(): Sleep() takes milliseconds. */
void sleep(int secs)
{
    Sleep(secs * 1000);
}
#endif

/*
 * Sleep for an exponentially distributed time with mean `avg` microseconds.
 * A non-positive `avg` means "do not sleep".
 */
void exp_usleep(int avg)
{
    if (avg <= 0) {
        return;
    }

    /* Draw x from (0, 1]: rand() may return 0 and log(0) is -infinity,
     * which cannot be converted to int. */
    double x = (rand() + 1.0) / ((double)RAND_MAX + 1.0);
    double delay = -(double)avg * log(x);
    int usecs = delay >= (double)INT_MAX ? INT_MAX : (int)delay;

#ifdef _WIN32
    Sleep(usecs / 1000);
#else
    usleep(usecs);
#endif
}

/*
 * Generator thread: signal the event counter at random intervals until
 * stop becomes 2, accumulating the time spent in each signal call.
 * `pavg` points to the mean interval in microseconds.
 */
void *generator(void *pavg)
{
    int avg = *(int *)pavg;

    while (stop != 2) {
        exp_usleep(avg);

        long long t1 = getclk();
        evcounter_signal(&evc);
        t_signal += getclk() - t1;
        n_signal++;
    }
    return NULL;
}

/*
 * Consumer thread: wait for the next event, then "think" for a random
 * time, until stop becomes non-zero.  Only waits that returned without
 * blocking are timed; blocking waits are merely counted.
 * `pavg` points to the mean think time in microseconds.
 */
void *consumer(void *pavg)
{
    EvCounterState s;
    int avg = *(int *)pavg;

    evcounter_get(&s, &evc);
    while (stop == 0) {
        long long t1 = getclk();

        if (evcounter_wait(&s, &evc)) {
            __sync_fetch_and_add(&t_wait, getclk() - t1);
            __sync_fetch_and_add(&n_wait, 1);
        } else {
            __sync_fetch_and_add(&n_slow_wait, 1);
        }
        exp_usleep(avg);
    }
    return NULL;
}

/*
 * Start a thread running f(arg) and store its handle in *p.  Exits the
 * program if the thread cannot be created.  When the global `affinity`
 * flag is set, successive threads are pinned to successive CPUs.
 */
void create_thread(pthread_t *p, void *(*f)(void *), void *arg)
{
    int err = pthread_create(p, NULL, f, arg);

    if (err != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(err));
        exit(1);
    }

#ifdef HAVE_AFFINITY
    if (affinity) {
        static int nproc, i;
        cpu_set_t cpu_set;

        if (nproc == 0) {
            nproc = (int)sysconf(_SC_NPROCESSORS_ONLN);
            if (nproc < 1) {
                nproc = 1; /* sysconf failed; fall back to a single CPU */
            }
        }
        CPU_ZERO(&cpu_set);
        CPU_SET(i % nproc, &cpu_set);
        i++;
        pthread_setaffinity_np(*p, sizeof(cpu_set), &cpu_set);
    }
#endif
}

/*
 * Parse the command line, run one generator and `n` consumers for `len`
 * seconds, then print the collected statistics.
 *
 * Leading arguments starting with '-' are options: any of them containing
 * the letter 'c' selects the condition-variable back end, and "-a" enables
 * CPU pinning.  The positional arguments are the mean think time (msec),
 * the mean event distance (msec), the consumer count and the run time (sec).
 */
int main(int argc, char **argv)
{
#ifdef HAVE_FUTEX
    evcounter_init = _evcounter_init;
    evcounter_wait = _evcounter_wait;
    evcounter_signal = _evcounter_signal;
#else
    evcounter_init = _evcounter_init_cond;
    evcounter_wait = _evcounter_wait_cond;
    evcounter_signal = _evcounter_signal_cond;
#endif

    while (argv[1] && argv[1][0] == '-') {
        if (strchr(argv[1], 'c')) {
            evcounter_init = _evcounter_init_cond;
            evcounter_wait = _evcounter_wait_cond;
            evcounter_signal = _evcounter_signal_cond;
        }
        if (!strcmp(argv[1], "-a")) {
            affinity = 1;
        }
        argc--, argv++;
    }

    evcounter_init(&evc);
    srand((unsigned)time(NULL));

    /* Times are given in msec on the command line and kept in usec. */
    int think_avg = argc > 1 ? atoi(argv[1]) * 1000 : 100000;
    int throughput_avg = argc > 2 ? atoi(argv[2]) * 1000 : 10000;
    int n = argc > 3 ? atoi(argv[3]) : 10;
    int len = argc > 4 ? atoi(argv[4]) : 10;

    if (n < 1) {
        fprintf(stderr, "number of consumers must be at least 1\n");
        exit(1);
    }

    printf("%d child processes, avg think time %d msec\n", n, think_avg / 1000);
    printf("Avg event distance %d msec. Running for %d sec\n",
           throughput_avg / 1000, len);

    pthread_t g;
    pthread_t *c = malloc((size_t)n * sizeof *c);

    if (c == NULL) {
        fprintf(stderr, "out of memory\n");
        exit(1);
    }

    create_thread(&g, generator, &throughput_avg);

    int i;
    for (i = 0; i < n; i++) {
        create_thread(&c[i], consumer, &think_avg);
    }

    /* One progress dot per second of run time. */
    for (i = 0; i < len; i++) {
        write(1, ".", 1);
        sleep(1);
    }
    write(1, "\n", 1);

    /* Stop the consumers first; the generator keeps signalling until they
     * have all been joined, and is only then told to stop. */
    stop = 1;
    for (i = 0; i < n; i++) {
        pthread_join(c[i], NULL);
    }
    stop = 2;
    pthread_join(g, NULL);
    free(c);

    printf("waits\t\t %ld %.3f us (fast case)\n", n_wait,
           (double)t_wait / (CLOCK_RES / 1000000.0) / n_wait);
    printf(" slow waits\t %ld\n", n_slow_wait);
    printf("signal\t\t %ld %.3f us\n", n_signal,
           (double)t_signal / (CLOCK_RES / 1000000.0) / n_signal);
    /* fast_signal is an int, so it must be printed with %d. */
    printf(" fast path\t %d\n", evc.fast_signal);
    exit(0);
}