쓰레드풀 쓰레드 리턴값
글쓴이: jang5497 / 작성시간: 목, 2014/12/11 - 1:52오후
쓰레드풀을 사용해서 서버 스트레스 체크 프로그램을 만들고 있습니다.
쓰레드풀에서 쓰레드를 생성하고 쓰레드가 서버의 소켓을 주고 받으면서 그 수행속도를 합산하려고 하는데요.
쓰레드가 각 작업을마치면 작업에 체크한 수치들을 리턴하고 싶습니다만...... 함수가 void로 되어 어떻게 리턴받을지 생각이 안나네요.... Pthread_join을 쓰자니 쓰레드를 아에 종료 시켜버려서
쓰레드풀에서는 힘들것 같고...
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "threadpool.h"
typedef enum {
immediate_shutdown = 1,
graceful_shutdown = 2
} threadpool_shutdown_t;
/**
* @struct threadpool_task
* @brief the work struct
*
* @var function Pointer to the function that will perform the task.
* @var argument Argument to be passed to the function.
*/
typedef struct {
void (*function)(void *);
void *argument;
} threadpool_task_t;
/**
* @struct threadpool
* @brief The threadpool struct
*
* @var notify Condition variable to notify worker threads.
* @var threads Array containing worker threads ID.
* @var thread_count Number of threads
* @var queue Array containing the task queue.
* @var queue_size Size of the task queue.
* @var head Index of the first element.
* @var tail Index of the next element.
* @var count Number of pending tasks
* @var shutdown Flag indicating if the pool is shutting down
* @var started Number of started threads
*/
struct threadpool_t {
pthread_mutex_t lock;
pthread_cond_t notify;
pthread_t *threads;
threadpool_task_t *queue;
int thread_count;
int queue_size;
int head;
int tail;
int count;
int shutdown;
int started;
};
/**
* @function void *threadpool_thread(void *threadpool)
* @brief the worker thread
* @param threadpool the pool which own the thread
*/
static void *threadpool_thread(void *threadpool);
int threadpool_free(threadpool_t *pool);
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{
threadpool_t *pool;
int i;
/* TODO: Check for negative or otherwise very big input parameters */
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
goto err;
}
/* Initialize */
pool->thread_count = 0;
pool->queue_size = queue_size;
pool->head = pool->tail = pool->count = 0;
pool->shutdown = pool->started = 0;
/* Allocate thread and task queue */
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
pool->queue = (threadpool_task_t *)malloc
(sizeof(threadpool_task_t) * queue_size);
/* Initialize mutex and conditional variable first */
if((pthread_mutex_init(&(pool->lock), NULL) != 0) ||
(pthread_cond_init(&(pool->notify), NULL) != 0) ||
(pool->threads == NULL) ||
(pool->queue == NULL)) {
goto err;
}
/* Start worker threads */
for(i = 0; i < thread_count; i++) {
if(pthread_create(&(pool->threads[i]), NULL,
threadpool_thread, (void*)pool) != 0) {
threadpool_destroy(pool, 0);
return NULL;
}
pool->thread_count++;
pool->started++;
}
return pool;
err:
if(pool) {
threadpool_free(pool);
}
return NULL;
}
int threadpool_add(threadpool_t *pool, void (*function)(void *),
void *argument, int flags)
{
int err = 0;
int next;
if(pool == NULL || function == NULL) {
return threadpool_invalid;
}
if(pthread_mutex_lock(&(pool->lock)) != 0) {
return threadpool_lock_failure;
}
next = pool->tail + 1;
next = (next == pool->queue_size) ? 0 : next;
do {
/* Are we full ? */
if(pool->count == pool->queue_size) {
err = threadpool_queue_full;
break;
}
/* Are we shutting down ? */
if(pool->shutdown) {
err = threadpool_shutdown;
break;
}
/* Add task to queue */
pool->queue[pool->tail].function = function;
pool->queue[pool->tail].argument = argument;
pool->tail = next;
pool->count += 1;
/* pthread_cond_broadcast */
if(pthread_cond_signal(&(pool->notify)) != 0) {
err = threadpool_lock_failure;
break;
}
} while(0);
if(pthread_mutex_unlock(&pool->lock) != 0) {
err = threadpool_lock_failure;
}
return err;
}
int threadpool_destroy(threadpool_t *pool, int flags)
{
int i, err = 0;
if(pool == NULL) {
return threadpool_invalid;
}
if(pthread_mutex_lock(&(pool->lock)) != 0) {
return threadpool_lock_failure;
}
do {
/* Already shutting down */
if(pool->shutdown) {
err = threadpool_shutdown;
break;
}
pool->shutdown = (flags & threadpool_graceful) ?
graceful_shutdown : immediate_shutdown;
/* Wake up all worker threads */
if((pthread_cond_broadcast(&(pool->notify)) != 0) ||
(pthread_mutex_unlock(&(pool->lock)) != 0)) {
err = threadpool_lock_failure;
break;
}
/* Join all worker thread */
for(i = 0; i < pool->thread_count; i++) {
if(pthread_join(pool->threads[i], NULL) != 0) {
err = threadpool_thread_failure;
}
}
} while(0);
/* Only if everything went well do we deallocate the pool */
if(!err) {
threadpool_free(pool);
}
return err;
}
int threadpool_free(threadpool_t *pool)
{
if(pool == NULL || pool->started > 0) {
return -1;
}
/* Did we manage to allocate ? */
if(pool->threads) {
free(pool->threads);
free(pool->queue);
/* Because we allocate pool->threads after initializing the
mutex and condition variable, we're sure they're
initialized. Let's lock the mutex just in case. */
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->notify));
}
free(pool);
return 0;
}
static void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
for(;;) {
/* Lock must be taken to wait on conditional variable */
pthread_mutex_lock(&(pool->lock));
/* Wait on condition variable, check for spurious wakeups.
When returning from pthread_cond_wait(), we own the lock. */
while((pool->count == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
if((pool->shutdown == immediate_shutdown) ||
((pool->shutdown == graceful_shutdown) &&
(pool->count == 0))) {
break;
}
/* Grab our task */
task.function = pool->queue[pool->head].function;
task.argument = pool->queue[pool->head].argument;
pool->head += 1;
pool->head = (pool->head == pool->queue_size) ? 0 : pool->head;
pool->count -= 1;
/* Unlock */
pthread_mutex_unlock(&(pool->lock));
/* Get to work */
(*(task.function))(task.argument);
}
pool->started--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
return(NULL);
}보시다싶이 소스를 가져다써 참고하고 있긴한데 어떻게 void형을 return 시켜 각 합을 구할수 있을까 고민되네요....
Forums:


1. future 패턴을 사용해서 해결하는
1. future 패턴을 사용해서 해결하는 방법.
2. 전역 객체를 사용하는 방법
- 2.1. 작업 쓰레드의 결과를 저장할 수 있는 전역 객체를 선언합니다. (동시접근을 막을 mutex, 수행될 작업의 수를 저장하는 변수를 포함하는)
- 2.2. 작업 쓰레드별로 task.function의 끝부분에서 전역 객체에 연산 결과를 update합니다.
- 2.3. main에서는 쓰레드 풀에 작업을 맡깁니다.
- 2.4. main에서는 루프를 돌며, 전역 객체의 업데이트된 수행 완료 작업의 수를 주기적으로 체크합니다.
- 2.5. 수행 완료 작업의 수가 추가했던 작업의 수와 같아지면, 루프를 탈출하고, 결과를 확인합니다.
Signature :) - "여유를 갖고 행동하되 게을러지지 말자"
한개의 전역 변수를 만드는 것보다, 스레드별로 변수를
한개의 전역 변수를 만드는 것보다, 스레드별로 변수를 만들고
각 스레드는 자신의 변수에 내용을 추가하고
별도의 하나의 스레드 혹은 메인 스레드가 저 변수들을 취합.
---------
간디가 말한 우리를 파괴시키는 7가지 요소
첫째, 노동 없는 부(富)/둘째, 양심 없는 쾌락
셋째, 인격 없는 지! 식/넷째, 윤리 없는 비지니스
이익추구를 위해서라면..
다섯째, 인성(人性)없는 과학
여섯째, 희생 없는 종교/일곱째, 신념 없는 정치
댓글 달기