#ifdef HAVE_CONFIG_H #include #endif //#include //#include #include //signal #include //mlockall, MCL_CURRENT, MCL_FUTURE #include #include // CXXFLAGS: // -D_REENTRANT -Wall -pipe -I/usr/src/linux/include -I/usr/xenomai/include // LDFLAGS: // -lnative -L/usr/xenomai/lib // old compile options flag #define USE_READ_WRITE #define PERIOD 10000000 //10ms //#define PERIOD 100000 //100us #define QUEUE_INPUT_LEN 1024 #define CPU_ID 0 RT_TASK task_main, task_producer, task_consumer; RT_QUEUE queue_input; void *cookie = NULL; static volatile bool finished = false; struct TInputData { int counter; int data[16]; }; static void sighandler(int dummy) { finished = true; } void producer(void *cookie) { TInputData sendData; memset(&sendData, 0, sizeof(TInputData)); while(!finished) { rt_task_wait_period(NULL); #ifdef USE_READ_WRITE int bytesSent = rt_queue_write(&queue_input, &sendData, sizeof(TInputData), Q_NORMAL); if (bytesSent <= 0) { printf("rt_queue_write(queue_input) failed: %d\n", bytesSent); finished = true; continue; } #else void *msg = rt_queue_alloc(&queue_input, sizeof(TInputData)); if(msg == NULL) { printf("rt_queue_alloc(queue_inout, %d) failed\n", sizeof(TInputData)); finished = true; continue; } memcpy(msg, &sendData, sizeof(TInputData)); int bytesSent = rt_queue_send(&queue_input, msg, sizeof(TInputData), Q_NORMAL); if (bytesSent <= 0) { printf("rt_queue_send(queue_input) failed: %d\n", bytesSent); rt_queue_free(&queue_input, msg); finished = true; continue; } #endif sendData.counter++; } } void consumer(void *cookie) { TInputData receiveData; receiveData.counter = -1; while(!finished) { int counter = receiveData.counter; #ifdef USE_READ_WRITE int bytesRead = rt_queue_read(&queue_input, &receiveData, sizeof(TInputData), TM_INFINITE); if (bytesRead <= 0) { printf("rt_queue_read(queue_input) failed: %d\n", bytesRead); finished = true; continue; } #else void *msg; int bytesRead = rt_queue_receive(&queue_input, &msg, TM_INFINITE); if (bytesRead > 0) { memcpy(&receiveData, msg, sizeof(TInputData)); rt_queue_free(&queue_input, msg); } if (bytesRead <=0) { printf("rt_queue_receive(queue_input) failed: %d\n", bytesRead); finished = true; continue; } #endif else if (receiveData.counter != counter + 1) { printf("counter error: %d\t->\t%d\n", counter + 1, receiveData.counter); } else { printf("%d\n", receiveData.counter); } } } int main(int argc, char *argv[]) { int res; res = mlockall(MCL_CURRENT | MCL_FUTURE); if (res < 0) { printf("mlockall failed: %d\n", res); goto exit; } //MAIN res = rt_task_shadow( &task_main, // task descriptor "main", // task name 10, // priority T_FPU | T_CPU(CPU_ID) // mode T_FPU, T_CPU(i), T_SUSP ); if (res < 0) { printf("rt_task_shadow(task_main) failed: %d\n", res); goto cleanup_main; } // INPUT_QUEUE res = rt_queue_create(&queue_input, "queue_input", sizeof(TInputData) * QUEUE_INPUT_LEN, QUEUE_INPUT_LEN, Q_FIFO | Q_SHARED); if (res == -EEXIST) { res = rt_queue_bind(&queue_input, "queue_input", 1000); //rt_queue_clear(&queue_input); } if (res < 0) { printf("rt_queue_create(queue_input) failed: %d\n", res); goto cleanup_queue_input; } // CONSUMER res = rt_task_create( &task_consumer, // task descriptor "consumer", // task name 512*1024, // stack size 20, // priority T_FPU | T_CPU(CPU_ID) | T_JOINABLE // mode T_FPU, T_CPU(i), T_SUSP ); if (res < 0) { printf("rt_task_create(consumer) failed: %d\n", res); goto cleanup_consumer; } // PRODUCER res = rt_task_create( &task_producer, // task descriptor "producer", // task name 256*1024, // stack size 30, // priority T_FPU | T_CPU(CPU_ID) | T_JOINABLE // mode T_FPU, T_CPU(i), T_SUSP ); if (res < 0) { printf("rt_task_create(producer) failed: %d\n", res); goto cleanup_producer; } res = rt_task_start(&task_consumer, consumer, cookie); rt_task_set_periodic( &task_producer, rt_timer_read() + 5 * rt_timer_ns2ticks(PERIOD), rt_timer_ns2ticks(PERIOD) ); res = rt_task_start(&task_producer, producer, cookie); signal(SIGINT, sighandler); signal(SIGTERM, sighandler); signal(SIGHUP, sighandler); signal(SIGALRM, sighandler); while(!finished) pause(); printf("cleanup(producer)\n"); rt_task_unblock(&task_producer); rt_task_join(&task_producer); cleanup_producer: printf("cleanup(consumer)\n"); rt_task_unblock(&task_consumer); rt_task_join(&task_consumer); cleanup_consumer: printf("cleanup(queue_input)\n"); rt_queue_delete(&queue_input); cleanup_queue_input: cleanup_main: exit: if (res >= 0) return EXIT_SUCCESS; else return res; }