thread pool이 작동을 안하네요....
글쓴이: jang5497 / 작성시간: 화, 2014/11/25 - 9:11오후
https://github.com/Pithikos/C-Thread-Pool
여기서 퍼온 소스입니다.
완성된 소스라서 그런지 컴파일은 잘 되나 결과값이 제대로 나오지 않네요....
기대결과 값은 함수를 호출하여
Created thread 0 in pool
Created thread 1 in pool
Created thread 2 in pool
Created thread 3 in pool
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 1 running..\n");
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 2 running..\n");
printf("%d\n", a);
Adding 20 tasks to threadpool
Will kill threadpool
위와 같이 출력돼야 하는데 결과값은
Created thread 0 in pool
Created thread 1 in pool
Created thread 2 in pool
Created thread 3 in pool
Adding 20 tasks to threadpool
Will kill threadpool
이렇게 함수 값이 출력이 되지 않습니다.......
코드 첨부해서 올립니다.
main.c
/* * This is just an example on how to use the thpool library * * We create a pool of 4 threads and then add 20 tasks to the pool(10 task1 * functions and 10 task2 functions). * * Task1 doesn't take any arguments. Task2 takes an integer. Task2 is used to show * how to add work to the thread pool with an argument. * * As soon as we add the tasks to the pool, the threads will run them. One thread * may run x tasks in a row so if you see as output the same thread running several * tasks, it's not an error. * * All jobs will not be completed and in fact maybe even none will. You can add a sleep() * function if you want to complete all tasks in this test file to be able and see clearer * what is going on. * * */ #include <stdio.h> #include "thpool.h" /* Some arbitrary task 1 */ void task1(){ printf("# Thread working: %u\n", (int)pthread_self()); printf(" Task 1 running..\n"); } /* Some arbitrary task 2 */ void task2(int a){ printf("# Thread working: %u\n", (int)pthread_self()); printf(" Task 2 running..\n"); printf("%d\n", a); } int main(){ int i; thpool_t* threadpool; /* make a new thread pool structure */ threadpool=thpool_init(4); /* initialise it to 4 number of threads */ puts("Adding 20 tasks to threadpool"); int a=54; for (i=0; i<10; i++){ thpool_add_work(threadpool, (void*)task1, NULL); thpool_add_work(threadpool, (void*)task2, (void*)a); }; puts("Will kill threadpool"); thpool_destroy(threadpool); return 0; }
thpool.c
/* ******************************** * * Author: Johan Hanssen Seferidis * Date: 12/08/2011 * Update: 01/11/2011 * License: LGPL * * *//** @file thpool.h *//* ********************************/ /* Library providing a threading pool where you can add work. For an example on * usage you refer to the main file found in the same package */ /* * Fast reminders: * * tp = threadpool * thpool = threadpool * thpool_t = threadpool type * tp_p = threadpool pointer * sem = semaphore * xN = x can be any string. N stands for amount * * */ #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <semaphore.h> #include <errno.h> #include "thpool.h" /* here you can also find the interface to each function */ static int thpool_keepalive=1; /* Create mutex variable */ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */ /* Initialise thread pool */ thpool_t* thpool_init(int threadsN){ thpool_t* tp_p; if (!threadsN || threadsN<1) threadsN=1; /* Make new thread pool */ tp_p=(thpool_t*)malloc(sizeof(thpool_t)); /* MALLOC thread pool */ if (tp_p==NULL){ fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n"); return NULL; } tp_p->threads=(pthread_t*)malloc(threadsN*sizeof(pthread_t)); /* MALLOC thread IDs */ if (tp_p->threads==NULL){ fprintf(stderr, "thpool_init(): Could not allocate memory for thread IDs\n"); return NULL; } tp_p->threadsN=threadsN; /* Initialise the job queue */ if (thpool_jobqueue_init(tp_p)==-1){ fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n"); return NULL; } /* Initialise semaphore*/ tp_p->jobqueue->queueSem=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC job queue semaphore */ sem_init(tp_p->jobqueue->queueSem, 0, 0); /* no shared, initial value */ /* Make threads in pool */ int t; for (t=0; t<threadsN; t++){ printf("Created thread %d in pool \n", t); pthread_create(&(tp_p->threads[t]), NULL, (void *)thpool_thread_do, (void *)tp_p); /* MALLOCS INSIDE PTHREAD HERE */ } return 
tp_p; } /* What each individual thread is doing * */ /* There are two scenarios here. One is everything works as it should and second if * the thpool is to be killed. In that manner we try to BYPASS sem_wait and end each thread. */ void thpool_thread_do(thpool_t* tp_p){ while(thpool_keepalive){ if (sem_wait(tp_p->jobqueue->queueSem)) {/* WAITING until there is work in the queue */ perror("thpool_thread_do(): Waiting for semaphore"); exit(1); } if (thpool_keepalive){ /* Read job from queue and execute it */ void*(*func_buff)(void* arg); void* arg_buff; thpool_job_t* job_p; pthread_mutex_lock(&mutex); /* LOCK */ job_p = thpool_jobqueue_peek(tp_p); func_buff=job_p->function; arg_buff =job_p->arg; thpool_jobqueue_removelast(tp_p); pthread_mutex_unlock(&mutex); /* UNLOCK */ func_buff(arg_buff); /* run function */ free(job_p); /* DEALLOC job */ } else { return; /* EXIT thread*/ } } return; } /* Add work to the thread pool */ int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){ thpool_job_t* newJob; newJob=(thpool_job_t*)malloc(sizeof(thpool_job_t)); /* MALLOC job */ if (newJob==NULL){ fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n"); exit(1); } /* add function and argument */ newJob->function=function_p; newJob->arg=arg_p; /* add job to queue */ pthread_mutex_lock(&mutex); /* LOCK */ thpool_jobqueue_add(tp_p, newJob); pthread_mutex_unlock(&mutex); /* UNLOCK */ return 0; } /* Destroy the threadpool */ void thpool_destroy(thpool_t* tp_p){ int t; /* End each thread's infinite loop */ thpool_keepalive=0; /* Awake idle threads waiting at semaphore */ for (t=0; t<(tp_p->threadsN); t++){ if (sem_post(tp_p->jobqueue->queueSem)){ fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n"); } } /* Kill semaphore */ if (sem_destroy(tp_p->jobqueue->queueSem)!=0){ fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n"); } /* Wait for threads to finish */ for (t=0; t<(tp_p->threadsN); t++){ 
pthread_join(tp_p->threads[t], NULL); } thpool_jobqueue_empty(tp_p); /* Dealloc */ free(tp_p->threads); /* DEALLOC threads */ free(tp_p->jobqueue->queueSem); /* DEALLOC job queue semaphore */ free(tp_p->jobqueue); /* DEALLOC job queue */ free(tp_p); /* DEALLOC thread pool */ } /* =================== JOB QUEUE OPERATIONS ===================== */ /* Initialise queue */ int thpool_jobqueue_init(thpool_t* tp_p){ tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue)); /* MALLOC job queue */ if (tp_p->jobqueue==NULL) return -1; tp_p->jobqueue->tail=NULL; tp_p->jobqueue->head=NULL; tp_p->jobqueue->jobsN=0; return 0; } /* Add job to queue */ void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p){ /* remember that job prev and next point to NULL */ newjob_p->next=NULL; newjob_p->prev=NULL; thpool_job_t *oldFirstJob; oldFirstJob = tp_p->jobqueue->head; /* fix jobs' pointers */ switch(tp_p->jobqueue->jobsN){ case 0: /* if there are no jobs in queue */ tp_p->jobqueue->tail=newjob_p; tp_p->jobqueue->head=newjob_p; break; default: /* if there are already jobs in queue */ oldFirstJob->prev=newjob_p; newjob_p->next=oldFirstJob; tp_p->jobqueue->head=newjob_p; } (tp_p->jobqueue->jobsN)++; /* increment amount of jobs in queue */ sem_post(tp_p->jobqueue->queueSem); int sval; sem_getvalue(tp_p->jobqueue->queueSem, &sval); } /* Remove job from queue */ int thpool_jobqueue_removelast(thpool_t* tp_p){ thpool_job_t *oldLastJob; oldLastJob = tp_p->jobqueue->tail; /* fix jobs' pointers */ switch(tp_p->jobqueue->jobsN){ case 0: /* if there are no jobs in queue */ return -1; break; case 1: /* if there is only one job in queue */ tp_p->jobqueue->tail=NULL; tp_p->jobqueue->head=NULL; break; default: /* if there are more than one jobs in queue */ oldLastJob->prev->next=NULL; /* the almost last item */ tp_p->jobqueue->tail=oldLastJob->prev; } (tp_p->jobqueue->jobsN)--; int sval; sem_getvalue(tp_p->jobqueue->queueSem, &sval); return 0; } /* Get first element from queue */ 
thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p){ return tp_p->jobqueue->tail; } /* Remove and deallocate all jobs in queue */ void thpool_jobqueue_empty(thpool_t* tp_p){ thpool_job_t* curjob; curjob=tp_p->jobqueue->tail; while(tp_p->jobqueue->jobsN){ tp_p->jobqueue->tail=curjob->prev; free(curjob); curjob=tp_p->jobqueue->tail; tp_p->jobqueue->jobsN--; } /* Fix head and tail */ tp_p->jobqueue->tail=NULL; tp_p->jobqueue->head=NULL; }
thpool.h
/**********************************
 * @author      Johan Hanssen Seferidis
 * @date        12/08/2011
 * Last update: 01/11/2011
 * License:     LGPL
 *
 **********************************/

/* Description: Library providing a threading pool where you can add work on the fly. The number
 *              of threads in the pool is adjustable when creating the pool. In most cases
 *              this should equal the number of threads supported by your cpu.
 *
 *              For an example on how to use the threadpool, check the main.c file or just read
 *              the documentation.
 *
 *              In this header file a detailed overview of the functions and the threadpool logical
 *              scheme is present in case tweaking of the pool is needed.
 */

/*
 * Fast reminders:
 *
 * tp           = threadpool
 * thpool       = threadpool
 * thpool_t     = threadpool type
 * tp_p         = threadpool pointer
 * sem          = semaphore
 * xN           = x can be any string. N stands for amount
 */

/*              _______________________________________________________
 *             /                                                       \
 *             |   JOB QUEUE        | job1 | job2 | job3 | job4 | ..   |
 *             |                                                       |
 *             |   threadpool      | thread1 | thread2 | ..            |
 *             \_______________________________________________________/
 *
 *    Description:       Jobs are added to the job queue. Once a thread in the pool
 *                       is idle, it is assigned with the first job from the queue (and
 *                       erased from the queue). It's each thread's job to read from
 *                       the queue serially (using lock) and executing each job
 *                       until the queue is empty.
 *
 *
 *    Scheme:
 *
 *    thpool______                jobqueue____                      ______
 *    |           |               |           |       .----------->|_job0_| Newly added job
 *    |           |               |  head------------'             |_job1_|
 *    | jobqueue----------------->|           |                    |_job2_|
 *    |           |               |  tail------------.             |__..__|
 *    |___________|               |___________|       '----------->|_jobn_| Job for thread to take
 *
 *
 *    job0________
 *    |           |
 *    | function---->
 *    |           |
 *    |   arg------->
 *    |           |         job1________
 *    |  next-------------->|           |
 *    |___________|         |           |..
 */

/* Guard name avoids the implementation-reserved "_Uppercase" identifier space. */
#ifndef THPOOL_H
#define THPOOL_H

#include <pthread.h>
#include <semaphore.h>


/* ================================= STRUCTURES ================================================ */

/* Individual job */
typedef struct thpool_job_t{
    void*  (*function)(void* arg);     /**< function pointer                */
    void*                    arg;      /**< function's argument             */
    struct thpool_job_t*     next;     /**< pointer to next job             */
    struct thpool_job_t*     prev;     /**< pointer to previous job         */
}thpool_job_t;

/* Job queue as doubly linked list */
typedef struct thpool_jobqueue{
    thpool_job_t *head;                /**< pointer to head of queue        */
    thpool_job_t *tail;                /**< pointer to tail of queue        */
    int           jobsN;               /**< amount of jobs in queue         */
    sem_t        *queueSem;            /**< semaphore (this is probably just holding the same as jobsN) */
}thpool_jobqueue;

/* The threadpool */
typedef struct thpool_t{
    pthread_t*       threads;          /**< pointer to threads' ID          */
    int              threadsN;         /**< amount of threads               */
    thpool_jobqueue* jobqueue;         /**< pointer to the job queue        */
}thpool_t;

/* Container for all things that each thread is going to need */
typedef struct thread_data{
    pthread_mutex_t *mutex_p;
    thpool_t        *tp_p;
}thread_data;


/* =========================== FUNCTIONS ================================================ */

/* ----------------------- Threadpool specific --------------------------- */

/**
 * @brief  Initialize threadpool
 *
 * Allocates memory for the threadpool, jobqueue, semaphore and fixes
 * pointers in jobqueue.
 *
 * @param  threadsN number of threads to be used
 * @return threadpool struct on success,
 *         NULL on error
 */
thpool_t* thpool_init(int threadsN);

/**
 * @brief What each thread is doing
 *
 * In principle this is an endless loop. The only time this loop gets interrupted is once
 * thpool_destroy() is invoked.
 *
 * @param tp_p threadpool to use
 * @return nothing
 */
void thpool_thread_do(thpool_t* tp_p);

/**
 * @brief Add work to the job queue
 *
 * Takes an action and its argument and adds it to the threadpool's job queue.
 * If you want to add to work a function with more than one argument then
 * a way to implement this is by passing a pointer to a structure.
 *
 * ATTENTION: A function whose type is not exactly void *(*)(void *), or an argument
 *            that is not a pointer, has to be cast to not get warnings. Prefer
 *            writing jobs as `void *job(void *arg)` and passing the address of the
 *            data, so that no cast is needed.
 *
 * @param  tp_p       threadpool to where the work will be added to
 * @param  function_p function to add as work
 * @param  arg_p      argument to the above function
 * @return 0 on success; callers should treat any other value as failure
 */
int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p);

/**
 * @brief Destroy the threadpool
 *
 * This will 'kill' the threadpool and free up memory. If threads are active when this
 * is called, they will finish what they are doing and then they will get destroyed.
 * Jobs that are still waiting in the queue are deallocated without being run, so
 * wait for your jobs to complete before calling this if their results matter.
 *
 * @param tp_p a pointer to the threadpool structure you want to destroy
 */
void thpool_destroy(thpool_t* tp_p);


/* ------------------------- Queue specific ------------------------------ */

/**
 * @brief Initialize queue
 * @param  tp_p pointer to threadpool
 * @return 0 on success,
 *        -1 on memory allocation error
 */
int thpool_jobqueue_init(thpool_t* tp_p);

/**
 * @brief Add job to queue
 *
 * A new job will be added to the queue. The new job MUST be allocated
 * before passed to this function or else other functions like thpool_jobqueue_empty()
 * will be broken.
 *
 * @param tp_p     pointer to threadpool
 * @param newjob_p pointer to the new job (MUST BE ALLOCATED)
 * @return nothing
 */
void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p);

/**
 * @brief Remove last job from queue.
 *
 * This does not free allocated memory, so be sure to have fetched the job with
 * thpool_jobqueue_peek() before invoking this, or the pointer to it is lost.
 *
 * @param  tp_p pointer to threadpool
 * @return 0 on success,
 *        -1 if queue is empty
 */
int thpool_jobqueue_removelast(thpool_t* tp_p);

/**
 * @brief Get last job in queue (tail)
 *
 * Gets the last job that is inside the queue. This will work even if the queue
 * is empty.
 *
 * @param  tp_p pointer to threadpool structure
 * @return a pointer to the last job in queue,
 *         NULL if the queue is empty
 */
thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p);

/**
 * @brief Remove and deallocate all jobs in queue
 *
 * This function will deallocate all jobs in the queue and set the
 * jobqueue to its initialization values, thus tail and head pointing
 * to NULL and amount of jobs equal to 0.
 *
 * @param tp_p pointer to threadpool structure
 */
void thpool_jobqueue_empty(thpool_t* tp_p);

#endif /* THPOOL_H */
Forums:
다른 서버에서 돌리니 잘 돌아가네요....
허참 이유를 모르겠네.....
물론 다른 서버에서는 컴파일 에러가 나서
/*
 * Some arbitrary task 2: prints the calling thread's id and the int that `a`
 * points to. `a` must point to an int that is still alive when the job runs.
 */
void task2(void* a){
    const int *value_p = a;

    /* pthread_t is opaque; print it through unsigned long with a matching
     * %lu instead of truncating it to int and printing that with %u. */
    printf("# Thread working: %lu\n", (unsigned long)pthread_self());
    printf(" Task 2 running..\n");
    printf("%d\n", *value_p);
}
/* Call site for the revised task2: the address of `a` is passed because task2
 * now dereferences its argument as an int pointer. NOTE(review): `a` must stay
 * alive until the job has run - confirm against the enclosing function. */
thpool_add_work(threadpool, (void*)task2, (void *)&a);
이렇게 수정을 했습니다.
제 PC에서는 똑같이 해도 안 돌아가네요....
댓글 달기