/* * Userspace program to read the "sgi.test" sunrpc channel * and respond with the MD5 sum of the word passed in the * upcall. Used to provide userspace behaviour to test * the sunrpc cache upcall mechanism. * * Copyright (c) 2008 Silicon Graphics, Inc. * All Rights Reserved. * By Greg Banks */ #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef TRUE #define TRUE 1 #endif #ifndef FALSE #define FALSE 0 #endif #define min(x,y) ((x)<=(y)?(x):(y)) int blocksize = 128; int timeout_ms = -1; int entry_life_s = 10; int delay_ms = 0; int nthreads = 1; char *argv0; const struct option opts[] = { {"blocksize",TRUE,NULL,'b'}, {"timeout",TRUE,NULL,'T'}, {"entry-life",TRUE,NULL,'l'}, {"delay",TRUE,NULL,'d'}, {"threads",TRUE,NULL,'t'}, {NULL,0,NULL,0} }; static void usage(int ec) { fprintf(stderr, "Usage: %s [options]\n", argv0); fputs( " [options] are:\n" "--blocksize=BYTES\n" "-b BYTES Specify the size in bytes of the read()s performed\n" " on the upcall file descriptor; default 128.\n" "--timeout=MILLISEC\n" "-T MILLISEC Specify the timeout when waiting for activity\n" " on the upcall file descriptor, or -1 to wait\n" " forever. Default -1.\n" "--entry-life=SEC\n" "-l SEC Specify how long into the future each upcall\n" " reply will be scheduled to live. Default 10.\n" "--delay=MILLISEC\n" "-d MILLISEC Specify a delay between reading the upcall\n" " and replying, or 0 for no delay. Default 0.\n" "--threads=INT\n" "-t INT Specify the number of reader threads to spawn,\n" " default 1.\n" ,stderr); fflush(stderr); exit(ec); } static void parse_args(int argc, char **argv) { int c; int idx; argv0 = strrchr(argv[0], '/'); if (argv0 == NULL) argv0 = argv[0]; else argv0++; while ((c = getopt_long(argc, argv, "b:T:l:d:t:", opts, &idx)) >= 0) { switch (c) { case 'b': blocksize = atoi(optarg); if (blocksize < 1 || blocksize > 1024*1024) { fprintf(stderr, "%s: bad value for option --blocksize \"%s\"\n", argv0, optarg); usage(1); } break; case 't': nthreads = atoi(optarg); if (nthreads < 1 || nthreads > 128) { fprintf(stderr, "%s: bad value for option --threads \"%s\"\n", argv0, optarg); usage(1); } break; case 'T': timeout_ms = atoi(optarg); break; case 'l': entry_life_s = atoi(optarg); break; case 'd': delay_ms = atoi(optarg); break; default: usage(1); } } } static int read_message(FILE *fp, char *buf, int maxlen) { int len = 0; int n; int bs; struct pollfd pfd; int nreads = 0; pfd.fd = fileno(fp); pfd.events = POLLIN; pfd.revents = 0; n = poll(&pfd, 1, timeout_ms); if (n < 0) return -1; if (n == 0) { errno = ETIMEDOUT; return -1; } if (n > 1) { errno = EINVAL; return -1; } while (len < maxlen) { bs = min(blocksize, maxlen-len); n = read(fileno(fp), buf+len, bs); nreads++; if (n > bs) { fprintf(stderr, "%s: WTF? asked for %d bytes got %d\n", argv0, bs, n); errno = EINVAL; return -1; } if (n < 0) return -1; if (n == 0) break; /* no more data, must be end of message */ len += n; if (buf[len-1] == '\n') break; /* end of message indicator */ } if (nreads > 1) fprintf(stderr, "%s: read %d bytes in %d calls\n", argv0, len, nreads); return len; } static void hex_to_ascii(const unsigned char *d, int len, char *out) { while (len > 1) { static const char hexdigits[] = "0123456789ABCDEF"; *out++ = hexdigits[(*d) >> 4]; *out++ = hexdigits[(*d) & 0xf]; d++; len--; } *out = '\0'; } static int handle_message(FILE *fp, char *buf, int len) { time_t expiry; char *p; unsigned char digest[MD5_DIGEST_LENGTH]; char digest_ascii[MD5_DIGEST_LENGTH*2+1]; /* ensure the message is nul-terminated */ buf[len] = '\0'; /* remove any trailing whitespace */ while (len > 0 && isspace(buf[len-1])) buf[--len] = '\0'; /* check for any stray newlines */ if ((p = strchr(buf, '\n'))) { fprintf(stderr, "%s: newline in message, ignoring %d bytes\n", argv0, (int)((buf+len) - p)); *p = '\0'; } if (len == 0) { fprintf(stderr, "%s: zero length message, ignoring\n", argv0); return 0; } MD5((unsigned char *)buf, len, digest); hex_to_ascii(digest, sizeof(digest), digest_ascii); time(&expiry); expiry += entry_life_s; fprintf(stderr, "%s: got message: \"%s\" -> digest %s expires %ld\n", argv0, buf, digest_ascii, (long)expiry); if (delay_ms) { fprintf(stderr, "%s: sleeping %d ms\n", argv0, delay_ms); poll(NULL, 0, delay_ms); } fprintf(fp, "%s %ld %s\n", buf, (long)expiry, digest_ascii); fflush(fp); return 0; } static int bind_to_cpu(int cpu) { cpu_set_t set; CPU_ZERO(&set); CPU_SET(cpu, &set); return sched_setaffinity(0, CPU_SETSIZE, &set); } static pid_t *children; int nchildren = 0; static void kill_children(void) { int i; for (i=0 ; i 0) kill(children[i], SIGKILL); } } static void wait_for_children(void) { int i; int status; pid_t pid; int remaining = nchildren; while (remaining > 0) { pid = waitpid(0, &status, 0); if (pid < 0) { if (errno != ESRCH) perror("waitpid"); break; } for (i=0 ; i 1) loop_flag = spawn_children(); if (loop_flag == 1) { while ((len = read_message(fp, buf, buf_size)) >= 0) handle_message(fp, buf, len); if (len < 0) perror(filename); } fclose(fp); } int main(int argc, char **argv) { parse_args(argc, argv); doit(); return 0; }