쓰레드풀 쓰레드 리턴값
글쓴이: jang5497 / 작성시간: 목, 2014/12/11 - 1:52오후
쓰레드풀을 사용해서 서버 스트레스 체크 프로그램을 만들고 있습니다.
쓰레드풀에서 쓰레드를 생성하고 쓰레드가 서버의 소켓을 주고 받으면서 그 수행속도를 합산하려고 하는데요.
쓰레드가 각 작업을마치면 작업에 체크한 수치들을 리턴하고 싶습니다만...... 함수가 void로 되어 어떻게 리턴받을지 생각이 안나네요.... Pthread_join을 쓰자니 쓰레드를 아에 종료 시켜버려서
쓰레드풀에서는 힘들것 같고...
#include <stdlib.h> #include <pthread.h> #include <unistd.h> #include "threadpool.h" typedef enum { immediate_shutdown = 1, graceful_shutdown = 2 } threadpool_shutdown_t; /** * @struct threadpool_task * @brief the work struct * * @var function Pointer to the function that will perform the task. * @var argument Argument to be passed to the function. */ typedef struct { void (*function)(void *); void *argument; } threadpool_task_t; /** * @struct threadpool * @brief The threadpool struct * * @var notify Condition variable to notify worker threads. * @var threads Array containing worker threads ID. * @var thread_count Number of threads * @var queue Array containing the task queue. * @var queue_size Size of the task queue. * @var head Index of the first element. * @var tail Index of the next element. * @var count Number of pending tasks * @var shutdown Flag indicating if the pool is shutting down * @var started Number of started threads */ struct threadpool_t { pthread_mutex_t lock; pthread_cond_t notify; pthread_t *threads; threadpool_task_t *queue; int thread_count; int queue_size; int head; int tail; int count; int shutdown; int started; }; /** * @function void *threadpool_thread(void *threadpool) * @brief the worker thread * @param threadpool the pool which own the thread */ static void *threadpool_thread(void *threadpool); int threadpool_free(threadpool_t *pool); threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) { threadpool_t *pool; int i; /* TODO: Check for negative or otherwise very big input parameters */ if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { goto err; } /* Initialize */ pool->thread_count = 0; pool->queue_size = queue_size; pool->head = pool->tail = pool->count = 0; pool->shutdown = pool->started = 0; /* Allocate thread and task queue */ pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); pool->queue = (threadpool_task_t *)malloc (sizeof(threadpool_task_t) * queue_size); /* Initialize mutex and conditional variable first */ if((pthread_mutex_init(&(pool->lock), NULL) != 0) || (pthread_cond_init(&(pool->notify), NULL) != 0) || (pool->threads == NULL) || (pool->queue == NULL)) { goto err; } /* Start worker threads */ for(i = 0; i < thread_count; i++) { if(pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool) != 0) { threadpool_destroy(pool, 0); return NULL; } pool->thread_count++; pool->started++; } return pool; err: if(pool) { threadpool_free(pool); } return NULL; } int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags) { int err = 0; int next; if(pool == NULL || function == NULL) { return threadpool_invalid; } if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } next = pool->tail + 1; next = (next == pool->queue_size) ? 0 : next; do { /* Are we full ? */ if(pool->count == pool->queue_size) { err = threadpool_queue_full; break; } /* Are we shutting down ? */ if(pool->shutdown) { err = threadpool_shutdown; break; } /* Add task to queue */ pool->queue[pool->tail].function = function; pool->queue[pool->tail].argument = argument; pool->tail = next; pool->count += 1; /* pthread_cond_broadcast */ if(pthread_cond_signal(&(pool->notify)) != 0) { err = threadpool_lock_failure; break; } } while(0); if(pthread_mutex_unlock(&pool->lock) != 0) { err = threadpool_lock_failure; } return err; } int threadpool_destroy(threadpool_t *pool, int flags) { int i, err = 0; if(pool == NULL) { return threadpool_invalid; } if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } do { /* Already shutting down */ if(pool->shutdown) { err = threadpool_shutdown; break; } pool->shutdown = (flags & threadpool_graceful) ? graceful_shutdown : immediate_shutdown; /* Wake up all worker threads */ if((pthread_cond_broadcast(&(pool->notify)) != 0) || (pthread_mutex_unlock(&(pool->lock)) != 0)) { err = threadpool_lock_failure; break; } /* Join all worker thread */ for(i = 0; i < pool->thread_count; i++) { if(pthread_join(pool->threads[i], NULL) != 0) { err = threadpool_thread_failure; } } } while(0); /* Only if everything went well do we deallocate the pool */ if(!err) { threadpool_free(pool); } return err; } int threadpool_free(threadpool_t *pool) { if(pool == NULL || pool->started > 0) { return -1; } /* Did we manage to allocate ? */ if(pool->threads) { free(pool->threads); free(pool->queue); /* Because we allocate pool->threads after initializing the mutex and condition variable, we're sure they're initialized. Let's lock the mutex just in case. */ pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_cond_destroy(&(pool->notify)); } free(pool); return 0; } static void *threadpool_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *)threadpool; threadpool_task_t task; for(;;) { /* Lock must be taken to wait on conditional variable */ pthread_mutex_lock(&(pool->lock)); /* Wait on condition variable, check for spurious wakeups. When returning from pthread_cond_wait(), we own the lock. */ while((pool->count == 0) && (!pool->shutdown)) { pthread_cond_wait(&(pool->notify), &(pool->lock)); } if((pool->shutdown == immediate_shutdown) || ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) { break; } /* Grab our task */ task.function = pool->queue[pool->head].function; task.argument = pool->queue[pool->head].argument; pool->head += 1; pool->head = (pool->head == pool->queue_size) ? 0 : pool->head; pool->count -= 1; /* Unlock */ pthread_mutex_unlock(&(pool->lock)); /* Get to work */ (*(task.function))(task.argument); } pool->started--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); return(NULL); }
보시다싶이 소스를 가져다써 참고하고 있긴한데 어떻게 void형을 return 시켜 각 합을 구할수 있을까 고민되네요....
Forums:
1. future 패턴을 사용해서 해결하는
1. future 패턴을 사용해서 해결하는 방법.
2. 전역 객체를 사용하는 방법
- 2.1. 작업 쓰레드의 결과를 저장할 수 있는 전역 객체를 선언합니다. (동시접근을 막을 mutex, 수행될 작업의 수를 저장하는 변수를 포함하는)
- 2.2. 작업 쓰레드별로 task.function의 끝부분에서 전역 객체에 연산 결과를 update합니다.
- 2.3. main에서는 쓰레드 풀에 작업을 맡깁니다.
- 2.4. main에서는 루프를 돌며, 전역 객체의 업데이트된 수행 완료 작업의 수를 주기적으로 체크합니다.
- 2.5. 수행 완료 작업의 수가 추가했던 작업의 수와 같아지면, 루프를 탈출하고, 결과를 확인합니다.
Signature :) - "여유를 갖고 행동하되 게을러지지 말자"
한개의 전역 변수를 만드는 것보다, 스레드별로 변수를
한개의 전역 변수를 만드는 것보다, 스레드별로 변수를 만들고
각 스레드는 자신의 변수에 내용을 추가하고
별도의 하나의 스레드 혹은 메인 스레드가 저 변수들을 취합.
---------
간디가 말한 우리를 파괴시키는 7가지 요소
첫째, 노동 없는 부(富)/둘째, 양심 없는 쾌락
셋째, 인격 없는 지! 식/넷째, 윤리 없는 비지니스
이익추구를 위해서라면..
다섯째, 인성(人性)없는 과학
여섯째, 희생 없는 종교/일곱째, 신념 없는 정치
댓글 달기