#include "actl_thread_queue.h" #if 0 // __TEST__ #include actl_thread_queue q; #define ACTL_THREAD_QUEUE_NUMBER 50 void *producer_func(void *arg) { int n = 0; while(1) { printf("put queue[%d] = %d\n", pthread_self(), n); printf("rc = %d\n", q.put_queue(&q, &n, 0)); fflush(stdout); ++n; } } void *consumer_func(void *arg) { int n; while(1) { printf("rc = %d\n", q.get_queue(&q, &n, 0)); printf("get queue[%d] = %d\n", pthread_self(), n); fflush(stdout); } } int main() { pthread_t t1, t2; int i; if (actl_thread_queue_init(&q) != ACTL_THREAD_QUEUE_SUCCESS) { fprintf(stderr, "ACTL Thread Initializing Failed\n"); exit(-1); } q.create(&q, 3, sizeof(int)); q.destroy(&q); pthread_create(&t2, NULL, consumer_func, NULL); for (i = 0; i < ACTL_THREAD_QUEUE_NUMBER; i++) { pthread_create(&t1, NULL, producer_func, NULL); } pthread_join(t1, NULL); pthread_join(t2, NULL); } #endif // __TEST__ int actl_thread_queue_create(actl_thread_queue *q, int queue_size, int obj_size) { int i; q->queue_buffer = (char**)malloc(queue_size); for (i = 0; i < queue_size; i++) { q->queue_buffer[i] = (char*)malloc(obj_size); } q->queue_size = queue_size; q->obj_size = obj_size; q->is_full = q->is_empty = ACTL_THREAD_QUEUE_FALSE; pthread_mutex_init(&q->lock1, NULL); pthread_mutex_init(&q->lock2, NULL); pthread_cond_init(&q->cond1, NULL); pthread_cond_init(&q->cond2, NULL); return ACTL_THREAD_QUEUE_SUCCESS; } int actl_thread_queue_destroy(actl_thread_queue *q) { int i; for (i = 0; i < q->queue_size; i++) { free(q->queue_buffer[i]); } free(q->queue_buffer); pthread_mutex_destroy(&q->lock1); pthread_mutex_destroy(&q->lock2); pthread_cond_destroy(&q->cond1); pthread_cond_destroy(&q->cond2); return ACTL_THREAD_QUEUE_SUCCESS; } int actl_thread_queue_put_queue( actl_thread_queue *q, void *buf, unsigned long long usec) { int rc = ACTL_THREAD_QUEUE_SUCCESS; struct timespec ts; struct timeval tv; unsigned long long then; pthread_mutex_lock(&q->lock1); pthread_mutex_lock(&q->lock2); if (q->get_position == (q->put_position + 1) % q->queue_size) { q->is_full = ACTL_THREAD_QUEUE_TRUE; if (usec == 0) { rc = pthread_cond_wait(&q->cond1, &q->lock2); } else { gettimeofday(&tv, NULL); then = tv.tv_sec * ACTL_USEC_PER_SEC + tv.tv_usec + usec; ts.tv_sec = then / ACTL_USEC_PER_SEC; ts.tv_nsec = (then % ACTL_USEC_PER_SEC) * 1000; // nanoseconds rc = pthread_cond_timedwait(&q->cond1, &q->lock2, &ts); } if (rc != ACTL_THREAD_QUEUE_SUCCESS) { if (rc == ETIMEDOUT) { rc = ACTL_THREAD_QUEUE_FAILURE_TIMEOUT; } else { rc = ACTL_THREAD_QUEUE_FAILURE; } } } if (rc == ACTL_THREAD_QUEUE_SUCCESS) { memcpy(q->queue_buffer[q->put_position], buf, q->obj_size); q->put_position = (q->put_position + 1) % q->queue_size; } if (q->is_empty == ACTL_THREAD_QUEUE_TRUE) { pthread_cond_signal(&q->cond2); } q->is_empty = ACTL_THREAD_QUEUE_FALSE; pthread_mutex_unlock(&q->lock2); pthread_mutex_unlock(&q->lock1); return rc; } int actl_thread_queue_get_queue( actl_thread_queue *q, void *buf, unsigned long long usec) { int rc = ACTL_THREAD_QUEUE_SUCCESS; struct timespec ts; struct timeval tv; unsigned long long then; pthread_mutex_lock(&q->lock2); if (q->get_position == q->put_position) { q->is_empty = ACTL_THREAD_QUEUE_TRUE; if (usec == 0) { rc = pthread_cond_wait(&q->cond2, &q->lock2); } else { gettimeofday(&tv, NULL); then = tv.tv_sec * ACTL_USEC_PER_SEC + tv.tv_usec + usec; ts.tv_sec = then / ACTL_USEC_PER_SEC; ts.tv_nsec = (then % ACTL_USEC_PER_SEC) * 1000; // nanoseconds rc = pthread_cond_timedwait(&q->cond2, &q->lock2, &ts); } if (rc != ACTL_THREAD_QUEUE_SUCCESS) { if (rc == ETIMEDOUT) { rc = ACTL_THREAD_QUEUE_FAILURE_TIMEOUT; } else { rc = ACTL_THREAD_QUEUE_FAILURE; } } } if (rc == ACTL_THREAD_QUEUE_SUCCESS) { memcpy(buf, q->queue_buffer[q->get_position], q->obj_size); q->get_position = (q->get_position + 1) % q->queue_size; } if (q->is_full == ACTL_THREAD_QUEUE_TRUE) { pthread_cond_signal(&q->cond1); } q->is_full = ACTL_THREAD_QUEUE_FALSE; pthread_mutex_unlock(&q->lock2); return rc; } int actl_thread_queue_init(actl_thread_queue *q) { q->create = actl_thread_queue_create; q->destroy = actl_thread_queue_destroy; q->put_queue = actl_thread_queue_put_queue; q->get_queue = actl_thread_queue_get_queue; return ACTL_THREAD_QUEUE_SUCCESS; }