thread pool이 작동을 안하네요....
글쓴이: jang5497 / 작성시간: 화, 2014/11/25 - 9:11오후
https://github.com/Pithikos/C-Thread-Pool
여기서 퍼온 소스입니다.
완성된 소스라서 그런지 컴파일은 잘되나 결과같이 제대로 나오지않네요....
기대결과 값은 함수를 호출하여
Created thread 0 in pool
Created thread 1 in pool
Created thread 2 in pool
Created thread 3 in pool
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 1 running..\n");
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 2 running..\n");
printf("%d\n", a);
Adding 20 tasks to threadpool
Will kill threadpool
위와 같이 출력되야 하는데 결과값은
Created thread 0 in pool
Created thread 1 in pool
Created thread 2 in pool
Created thread 3 in pool
Adding 20 tasks to threadpool
Will kill threadpool
이렇게 함수 값이 출력이 되지않습니다.......
코드 첨부해서 올립니다.
main.c
/*
* This is just an example on how to use the thpool library
*
* We create a pool of 4 threads and then add 20 tasks to the pool(10 task1
* functions and 10 task2 functions).
*
* Task1 doesn't take any arguments. Task2 takes an integer. Task2 is used to show
* how to add work to the thread pool with an argument.
*
* As soon as we add the tasks to the pool, the threads will run them. One thread
* may run x tasks in a row so if you see as output the same thread running several
* tasks, it's not an error.
*
* All jobs will not be completed and in fact maybe even none will. You can add a sleep()
* function if you want to complete all tasks in this test file to be able and see clearer
* what is going on.
*
* */
#include <stdio.h>
#include "thpool.h"
/* Some arbitrary task 1 */
void task1(){
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 1 running..\n");
}
/* Some arbitrary task 2 */
void task2(int a){
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 2 running..\n");
printf("%d\n", a);
}
int main(){
int i;
thpool_t* threadpool; /* make a new thread pool structure */
threadpool=thpool_init(4); /* initialise it to 4 number of threads */
puts("Adding 20 tasks to threadpool");
int a=54;
for (i=0; i<10; i++){
thpool_add_work(threadpool, (void*)task1, NULL);
thpool_add_work(threadpool, (void*)task2, (void*)a);
};
puts("Will kill threadpool");
thpool_destroy(threadpool);
return 0;
}thpool.c
/* ********************************
*
* Author: Johan Hanssen Seferidis
* Date: 12/08/2011
* Update: 01/11/2011
* License: LGPL
*
*
*//** @file thpool.h *//*
********************************/
/* Library providing a threading pool where you can add work. For an example on
* usage you refer to the main file found in the same package */
/*
* Fast reminders:
*
* tp = threadpool
* thpool = threadpool
* thpool_t = threadpool type
* tp_p = threadpool pointer
* sem = semaphore
* xN = x can be any string. N stands for amount
*
* */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>
#include "thpool.h" /* here you can also find the interface to each function */
static int thpool_keepalive=1;
/* Create mutex variable */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */
/* Initialise thread pool */
thpool_t* thpool_init(int threadsN){
thpool_t* tp_p;
if (!threadsN || threadsN<1) threadsN=1;
/* Make new thread pool */
tp_p=(thpool_t*)malloc(sizeof(thpool_t)); /* MALLOC thread pool */
if (tp_p==NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
return NULL;
}
tp_p->threads=(pthread_t*)malloc(threadsN*sizeof(pthread_t)); /* MALLOC thread IDs */
if (tp_p->threads==NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for thread IDs\n");
return NULL;
}
tp_p->threadsN=threadsN;
/* Initialise the job queue */
if (thpool_jobqueue_init(tp_p)==-1){
fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
return NULL;
}
/* Initialise semaphore*/
tp_p->jobqueue->queueSem=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC job queue semaphore */
sem_init(tp_p->jobqueue->queueSem, 0, 0); /* no shared, initial value */
/* Make threads in pool */
int t;
for (t=0; t<threadsN; t++){
printf("Created thread %d in pool \n", t);
pthread_create(&(tp_p->threads[t]), NULL, (void *)thpool_thread_do, (void *)tp_p); /* MALLOCS INSIDE PTHREAD HERE */
}
return tp_p;
}
/* What each individual thread is doing
* */
/* There are two scenarios here. One is everything works as it should and second if
* the thpool is to be killed. In that manner we try to BYPASS sem_wait and end each thread. */
void thpool_thread_do(thpool_t* tp_p){
while(thpool_keepalive){
if (sem_wait(tp_p->jobqueue->queueSem)) {/* WAITING until there is work in the queue */
perror("thpool_thread_do(): Waiting for semaphore");
exit(1);
}
if (thpool_keepalive){
/* Read job from queue and execute it */
void*(*func_buff)(void* arg);
void* arg_buff;
thpool_job_t* job_p;
pthread_mutex_lock(&mutex); /* LOCK */
job_p = thpool_jobqueue_peek(tp_p);
func_buff=job_p->function;
arg_buff =job_p->arg;
thpool_jobqueue_removelast(tp_p);
pthread_mutex_unlock(&mutex); /* UNLOCK */
func_buff(arg_buff); /* run function */
free(job_p); /* DEALLOC job */
}
else
{
return; /* EXIT thread*/
}
}
return;
}
/* Add work to the thread pool */
int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){
thpool_job_t* newJob;
newJob=(thpool_job_t*)malloc(sizeof(thpool_job_t)); /* MALLOC job */
if (newJob==NULL){
fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
exit(1);
}
/* add function and argument */
newJob->function=function_p;
newJob->arg=arg_p;
/* add job to queue */
pthread_mutex_lock(&mutex); /* LOCK */
thpool_jobqueue_add(tp_p, newJob);
pthread_mutex_unlock(&mutex); /* UNLOCK */
return 0;
}
/* Destroy the threadpool */
void thpool_destroy(thpool_t* tp_p){
int t;
/* End each thread's infinite loop */
thpool_keepalive=0;
/* Awake idle threads waiting at semaphore */
for (t=0; t<(tp_p->threadsN); t++){
if (sem_post(tp_p->jobqueue->queueSem)){
fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n");
}
}
/* Kill semaphore */
if (sem_destroy(tp_p->jobqueue->queueSem)!=0){
fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n");
}
/* Wait for threads to finish */
for (t=0; t<(tp_p->threadsN); t++){
pthread_join(tp_p->threads[t], NULL);
}
thpool_jobqueue_empty(tp_p);
/* Dealloc */
free(tp_p->threads); /* DEALLOC threads */
free(tp_p->jobqueue->queueSem); /* DEALLOC job queue semaphore */
free(tp_p->jobqueue); /* DEALLOC job queue */
free(tp_p); /* DEALLOC thread pool */
}
/* =================== JOB QUEUE OPERATIONS ===================== */
/* Initialise queue */
int thpool_jobqueue_init(thpool_t* tp_p){
tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue)); /* MALLOC job queue */
if (tp_p->jobqueue==NULL) return -1;
tp_p->jobqueue->tail=NULL;
tp_p->jobqueue->head=NULL;
tp_p->jobqueue->jobsN=0;
return 0;
}
/* Add job to queue */
void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p){ /* remember that job prev and next point to NULL */
newjob_p->next=NULL;
newjob_p->prev=NULL;
thpool_job_t *oldFirstJob;
oldFirstJob = tp_p->jobqueue->head;
/* fix jobs' pointers */
switch(tp_p->jobqueue->jobsN){
case 0: /* if there are no jobs in queue */
tp_p->jobqueue->tail=newjob_p;
tp_p->jobqueue->head=newjob_p;
break;
default: /* if there are already jobs in queue */
oldFirstJob->prev=newjob_p;
newjob_p->next=oldFirstJob;
tp_p->jobqueue->head=newjob_p;
}
(tp_p->jobqueue->jobsN)++; /* increment amount of jobs in queue */
sem_post(tp_p->jobqueue->queueSem);
int sval;
sem_getvalue(tp_p->jobqueue->queueSem, &sval);
}
/* Remove job from queue */
int thpool_jobqueue_removelast(thpool_t* tp_p){
thpool_job_t *oldLastJob;
oldLastJob = tp_p->jobqueue->tail;
/* fix jobs' pointers */
switch(tp_p->jobqueue->jobsN){
case 0: /* if there are no jobs in queue */
return -1;
break;
case 1: /* if there is only one job in queue */
tp_p->jobqueue->tail=NULL;
tp_p->jobqueue->head=NULL;
break;
default: /* if there are more than one jobs in queue */
oldLastJob->prev->next=NULL; /* the almost last item */
tp_p->jobqueue->tail=oldLastJob->prev;
}
(tp_p->jobqueue->jobsN)--;
int sval;
sem_getvalue(tp_p->jobqueue->queueSem, &sval);
return 0;
}
/* Get first element from queue */
thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p){
return tp_p->jobqueue->tail;
}
/* Remove and deallocate all jobs in queue */
void thpool_jobqueue_empty(thpool_t* tp_p){
thpool_job_t* curjob;
curjob=tp_p->jobqueue->tail;
while(tp_p->jobqueue->jobsN){
tp_p->jobqueue->tail=curjob->prev;
free(curjob);
curjob=tp_p->jobqueue->tail;
tp_p->jobqueue->jobsN--;
}
/* Fix head and tail */
tp_p->jobqueue->tail=NULL;
tp_p->jobqueue->head=NULL;
}thpool.h
/**********************************
* @author Johan Hanssen Seferidis
* @date 12/08/2011
* Last update: 01/11/2011
* License: LGPL
*
**********************************/
/* Description: Library providing a threading pool where you can add work on the fly. The number
* of threads in the pool is adjustable when creating the pool. In most cases
* this should equal the number of threads supported by your cpu.
*
* For an example on how to use the threadpool, check the main.c file or just read
* the documentation.
*
* In this header file a detailed overview of the functions and the threadpool logical
* scheme is present in case tweaking of the pool is needed.
* */
/*
* Fast reminders:
*
* tp = threadpool
* thpool = threadpool
* thpool_t = threadpool type
* tp_p = threadpool pointer
* sem = semaphore
* xN = x can be any string. N stands for amount
*
* */
/* _______________________________________________________
* / \
* | JOB QUEUE | job1 | job2 | job3 | job4 | .. |
* | |
* | threadpool | thread1 | thread2 | .. |
* \_______________________________________________________/
*
* Description: Jobs are added to the job queue. Once a thread in the pool
* is idle, it is assigned with the first job from the queue(and
* erased from the queue). It's each thread's job to read from
* the queue serially(using lock) and executing each job
* until the queue is empty.
*
*
* Scheme:
*
* thpool______ jobqueue____ ______
* | | | | .----------->|_job0_| Newly added job
* | | | head------------' |_job1_|
* | jobqueue----------------->| | |_job2_|
* | | | tail------------. |__..__|
* |___________| |___________| '----------->|_jobn_| Job for thread to take
*
*
* job0________
* | |
* | function---->
* | |
* | arg------->
* | | job1________
* | next-------------->| |
* |___________| | |..
*/
#ifndef _THPOOL_
#define _THPOOL_
#include <pthread.h>
#include <semaphore.h>
/* ================================= STRUCTURES ================================================ */
/* Individual job */
typedef struct thpool_job_t{
void* (*function)(void* arg); /**< function pointer */
void* arg; /**< function's argument */
struct thpool_job_t* next; /**< pointer to next job */
struct thpool_job_t* prev; /**< pointer to previous job */
}thpool_job_t;
/* Job queue as doubly linked list */
typedef struct thpool_jobqueue{
thpool_job_t *head; /**< pointer to head of queue */
thpool_job_t *tail; /**< pointer to tail of queue */
int jobsN; /**< amount of jobs in queue */
sem_t *queueSem; /**< semaphore(this is probably just holding the same as jobsN) */
}thpool_jobqueue;
/* The threadpool */
typedef struct thpool_t{
pthread_t* threads; /**< pointer to threads' ID */
int threadsN; /**< amount of threads */
thpool_jobqueue* jobqueue; /**< pointer to the job queue */
}thpool_t;
/* Container for all things that each thread is going to need */
typedef struct thread_data{
pthread_mutex_t *mutex_p;
thpool_t *tp_p;
}thread_data;
/* =========================== FUNCTIONS ================================================ */
/* ----------------------- Threadpool specific --------------------------- */
/**
* @brief Initialize threadpool
*
* Allocates memory for the threadpool, jobqueue, semaphore and fixes
* pointers in jobqueue.
*
* @param number of threads to be used
* @return threadpool struct on success,
* NULL on error
*/
thpool_t* thpool_init(int threadsN);
/**
* @brief What each thread is doing
*
* In principle this is an endless loop. The only time this loop gets interuppted is once
* thpool_destroy() is invoked.
*
* @param threadpool to use
* @return nothing
*/
void thpool_thread_do(thpool_t* tp_p);
/**
* @brief Add work to the job queue
*
* Takes an action and its argument and adds it to the threadpool's job queue.
* If you want to add to work a function with more than one arguments then
* a way to implement this is by passing a pointer to a structure.
*
* ATTENTION: You have to cast both the function and argument to not get warnings.
*
* @param threadpool to where the work will be added to
* @param function to add as work
* @param argument to the above function
* @return int
*/
int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p);
/**
* @brief Destroy the threadpool
*
* This will 'kill' the threadpool and free up memory. If threads are active when this
* is called, they will finish what they are doing and then they will get destroyied.
*
* @param threadpool a pointer to the threadpool structure you want to destroy
*/
void thpool_destroy(thpool_t* tp_p);
/* ------------------------- Queue specific ------------------------------ */
/**
* @brief Initialize queue
* @param pointer to threadpool
* @return 0 on success,
* -1 on memory allocation error
*/
int thpool_jobqueue_init(thpool_t* tp_p);
/**
* @brief Add job to queue
*
* A new job will be added to the queue. The new job MUST be allocated
* before passed to this function or else other functions like thpool_jobqueue_empty()
* will be broken.
*
* @param pointer to threadpool
* @param pointer to the new job(MUST BE ALLOCATED)
* @return nothing
*/
void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p);
/**
* @brief Remove last job from queue.
*
* This does not free allocated memory so be sure to have peeked() \n
* before invoking this as else there will result lost memory pointers.
*
* @param pointer to threadpool
* @return 0 on success,
* -1 if queue is empty
*/
int thpool_jobqueue_removelast(thpool_t* tp_p);
/**
* @brief Get last job in queue (tail)
*
* Gets the last job that is inside the queue. This will work even if the queue
* is empty.
*
* @param pointer to threadpool structure
* @return job a pointer to the last job in queue,
* a pointer to NULL if the queue is empty
*/
thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p);
/**
* @brief Remove and deallocate all jobs in queue
*
* This function will deallocate all jobs in the queue and set the
* jobqueue to its initialization values, thus tail and head pointing
* to NULL and amount of jobs equal to 0.
*
* @param pointer to threadpool structure
* */
void thpool_jobqueue_empty(thpool_t* tp_p);
#endifForums:


다른 서버에서 돌리니 잘돌아가네요....
허참 이유를 모르겠네.....
물론 다른서버는 컴파일 에러가 나서
void task2(void* a){
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 2 running..\n");
printf("%d\n",*((int*)a));
}
thpool_add_work(threadpool, (void*)task2, (void *)&a);
이렇게 수정을 했습니다.
제 pc에서는 똑같이 해도 안돌아가네요....
댓글 달기