/* * $ gcc -o mq mq.c -pthread -Wall * $ ./mq */ #include #include #include #include #include #include #include #include #include #include #define THREAD_NR (10) #define DATA_NR (20) #define TEST_1 (1L) #define TEST_2 (2L) typedef struct { int no; int no_inv; } TEST_DATA_1; typedef struct { int no; char str[32]; } TEST_DATA_2; struct msg_buf { long mtype; union { struct { char bytes[0]; } msg; TEST_DATA_1 test_data_1; TEST_DATA_2 test_data_2; } d; }; static struct thread_info { int id; int mq; int ok; pthread_t thread_1; pthread_t thread_2; } thread_infos[THREAD_NR]; #define countof(x) (sizeof(x)/sizeof(x[0])) static void *test_1_handler(void *arg) { const struct thread_info *ti = (const struct thread_info *)arg; int count = 0; size_t ret = 0; struct msg_buf msg; TEST_DATA_1 *data = &msg.d.test_data_1; do { ret = msgrcv(ti->mq, &msg, 1024/*sizeof(msg.d)*/, TEST_1, 0); if (ret < 0) { perror("msgrcv"); break; } if (ret != sizeof(*data)) fprintf(stderr, "something wrong\n"); if (msg.mtype != TEST_1) fprintf(stderr, "something wrong#2\n"); printf("thread %d.1 got %d bytes on %dth iter from mq %d," " data.no:%d data.no_inv:%d\n", ti->id, ret, count, ti->mq, data->no, data->no_inv); } while (ret >= 0); return NULL; } static void *test_2_handler(void *arg) { const struct thread_info *ti = (const struct thread_info *)arg; int count = 0; size_t ret = 0; struct msg_buf msg; TEST_DATA_2 *data = &msg.d.test_data_2; do { ret = msgrcv(ti->mq, &msg, 1024/*sizeof(msg.d)*/, TEST_2, 0); if (ret < 0) { perror("msgrcv"); break; } if (ret != sizeof(*data)) fprintf(stderr, "something wrong\n"); if (msg.mtype != TEST_2) fprintf(stderr, "something wrong#2\n"); printf("thread %d.2 got %d bytes on %dth iter from mq %d," " data.no:%d data.str:%s\n", ti->id, ret, count, ti->mq, data->no, data->str); } while (ret >= 0); return NULL; } static void deinitmsgqvariable(void) { int i; for (i = 0; i < countof(thread_infos); i++) { struct thread_info *ti = &thread_infos[i]; if (ti->ok == 0) continue; ti->ok = 0; pthread_kill(ti->thread_1, SIGTERM); pthread_kill(ti->thread_2, SIGTERM); msgctl(ti->mq, IPC_RMID, NULL); } } static int initmsgqvariable(void) { int i, ret = 0; memset(thread_infos, 0, sizeof(thread_infos)); for (i = 0; i < countof(thread_infos); i++) { struct thread_info *ti = &thread_infos[i]; ti->id = i; ret = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | 0600); if (ret < 0) { perror("msgget"); break; } ti->mq = ret; ret = pthread_create(&ti->thread_1, NULL, test_1_handler, ti); if (ret) { fprintf(stderr,"pthread_create: %s\n", strerror(ret)); msgctl(ti->mq, IPC_RMID, NULL); ret = -ret; break; } ret = pthread_create(&ti->thread_2, NULL, test_2_handler, ti); if (ret) { fprintf(stderr,"pthread_create: %s\n", strerror(ret)); pthread_kill(ti->thread_1, SIGTERM); msgctl(ti->mq, IPC_RMID, NULL); ret = -ret; break; } ti->ok = 1; } if (ret < 0) deinitmsgqvariable(); return ret; } int main(int argc, char **argv) { int i, ret; if (initmsgqvariable() < 0) return EXIT_FAILURE; for (i = 0; i < DATA_NR; i++) { /* test message 1 */ int n = i % countof(thread_infos); struct thread_info *ti = &thread_infos[n]; struct msg_buf msg; /* test 1 */ { msg.mtype = TEST_1; TEST_DATA_1 *data = &msg.d.test_data_1; data->no = i; data->no_inv = DATA_NR - i; ret = msgsnd(ti->mq, &msg, sizeof(*data), IPC_NOWAIT); if (ret < 0) perror("msgsnd"); } /* test 2 */ { msg.mtype = TEST_2; TEST_DATA_2 *data = &msg.d.test_data_2; data->no = i; snprintf(data->str, sizeof(data->str), "DATA %d", i); ret = msgsnd(ti->mq, &msg, sizeof(*data), IPC_NOWAIT); if (ret < 0) perror("msgsnd"); } } sleep(1); deinitmsgqvariable(); return EXIT_SUCCESS; }