thread pool사용시
thread pool프로그램을 하는데,
먼저 쓰레드를 만들어 놓고 request를 받으면 서버에서 놀고있는 쓰레드를
체크하여 일을 주고자 합니다.
제딴에는 그러려면 mutex, cond variable을 사용해야 될것 같아서,
변수 선언하고,
pthread_mutex_t mutex[NUMBER_OF_THREADS];
pthread_cond_t cond[NUMBER_OF_THREADS];
아래와 같이 초기화하고...
for(i=0; i<NUM_OF_THREADS; i++) {
pthread_cond_init(&cond[i], NULL);
pthread_mutex_init(&mutex[i], NULL);
}
main()에서
아래와 같이 노는 쓰레드를 체크하여 signal을 보내준후,
for(i = 0; i < NUMBER_OF_THREADS; i++) {
printf("DEBUG[3] : checking thread...\n");
if(thread_pool[i].is_enabled == 0) {
pthread_mutex_lock(&mutex[i]);
thread_pool[i].client_fd = client_fd;
thread_pool[i].is_enabled = 1;
printf("DEBUG[4] : thread[%d] is now enabled...\n", i);
pthread_cond_signal(&cond[i]);
pthread_mutex_unlock(&mutex[i]);
break;
}
쓰레드에서는,
아래처럼 기다리고 있다가 시그널 받으면 처리하고자 하는데,
if(thread_pool
pthread_mutex_unlock(&mutex
printf("DEBUG[1] : thread number [%d] is waiting for the signal here\n", index);
pthread_mutex_lock(&mutex
pthread_cond_wait(&cond
printf("DEBUG[5] : thread number [%d] start to work!!!\n");
}
결과는 아래처럼 cond_wait에서 더이상 내려가지 않는듯 합니다.
DEBUG[1] : thread number [0] is waiting for the signal here
DEBUG[2] : accept...
DEBUG[3] : checking thread...
DEBUG[4] : thread[0] is now enabled...
아마도 cond 변수를 잘못쓰지 않았나도 싶고.....
고수님들의 의견을 부탁드립니다.
감사합니다.
리눅스에서 테스트 하신건가요?리눅스라면... 현재 상태에서 pstre
리눅스에서 테스트 하신건가요?
리눅스라면... 현재 상태에서 pstree 한 모습을 좀 붙여주시구요...
리눅스에서는 thread가 사실 process니까... strace를 사용하셔서 현재 thread가 무슨짓을 하고 있는지 보시는것도 좋을것 같네요.
pthread_mutex_t mutex[NUMBER_OF_THREADS];
pthread_cond_t cond[NUMBER_OF_THREADS];
와
thread_pool[NUMBER_OF_THREADS];
변수는 물론 전역변수로 하셨겠지요?
index 값은 pthread_create에서 argument로 넘기신 것이겠죠?
thread_pool
.is_enabled 도 mutex를 거시는게 좋을것 같구요.
그리고... printf가 버퍼링 때문에 믿기 어려운 부분이 많습니다... 가장 확실한 건 log 찍는 함수를 만드시고, 그 함수에서 파일에 쓰고 파일을 닫아버리는게 제일 확실합니다.
그냥 printf를 쓰실거면... 처음에 setbuf(stdout, (char *)0); 하시고 사용하시는것이 좋아 보입니다.
그리고 소스를 최대한 간단하게 하셔서 전체 소스를 붙여주시면 더 좋을것 같네요..
우리 모두 리얼리스트가 되자. 그러나 가슴에 이룰 수 없는 꿈을 가지자
음... 생각해보니 printf()를 써서 stdout 스트림을 사용하는
음... 생각해보니 printf()를 써서 stdout 스트림을 사용하는것도 mutex를 걸어야 할것 같네요.
지금 느낌으로는 잘 돌아갔는데 보이기만 그렇게 보였을것 같습니다.
우리 모두 리얼리스트가 되자. 그러나 가슴에 이룰 수 없는 꿈을 가지자
전체 소스 입니다.그리고 참고로 소스는 유닉스에서 컴파일해서 돌렸
전체 소스 입니다.
그리고 참고로 소스는 유닉스에서 컴파일해서 돌렸습니다.
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include "socket_api.h"
#define LISTEN_BACK_LOG 10
#define LISTEN_PORT 9999
#define NUMBER_OF_THREADS 3
#define MAX 512
#define FILE_NAME_LEN 100
typedef struct thr_info {
pthread_t tid;
int client_fd;
int is_enabled;
} Threads;
Threads thread_pool[NUMBER_OF_THREADS];
pthread_mutex_t mutex[NUMBER_OF_THREADS];
pthread_cond_t cond[NUMBER_OF_THREADS];
void *processClientRequest(void *data)
{
int client_fd;
int file_fd;
pthread_t *tid = (pthread_t *)data;
int index;
int len, i;
int rt;
char filename[FILE_NAME_LEN+1];
char buffer[MAX+1];
printf("DEBUG[0] : processClientRequest(), tid[%d]\n", *tid);
for(index = 0; index < NUMBER_OF_THREADS; index++) {
if(thread_pool
.tid == *tid) {
printf("index [%d]...\n", index);
break;
}
}
for(;;) {
if(thread_pool
.is_enabled == 0) {
/* pthread_mutex_unlock(&mutex
); */
pthread_mutex_lock(&mutex
);
printf("DEBUG[1] : thread lock in threadfunction()...\n");
printf("DEBUG[2] : thread number [%d] is waiting for the signal here\n", index);
pthread_cond_wait(&cond
, &mutex
);
printf("DEBUG[8] : thread number [%d] start to work, enable[%d]!!!\n",
thread_pool
.is_enabled);
}
client_fd = thread_pool
.client_fd;
printf("DEBUG[9] : thread_pool[%d] is enabled, client_fd[%d]\n", index, client_fd);
memset(filename, 0x00, FILE_NAME_LEN+1);
memset(buffer, 0x00, MAX+1);
i = 0;
do {
rt = read(client_fd, &filename[i], 1);
if(rt < 0) {
if( (errno == EINTR) || (errno == EAGAIN) ) continue;
printf("read error\n");
return NULL;
}
if(rt == 0) {
printf("rt is now [%d]\n", rt);
break;
}
if(filename[i] == ' ') break;
i++;
} while(1);
filename[i] = '\0';
printf("DEBUG : filename [%s]\n", filename);
file_fd = open(filename, O_RDONLY);
if(file_fd == -1) {
printf("File Not Found...\n");
sprintf(buffer, "File Not Found...");
write(client_fd, buffer, strlen(buffer));
}
else {
printf("File Found...\n");
sprintf(buffer, "File Found...\n");
write(client_fd, buffer, strlen(buffer));
while((len = read(file_fd, buffer, MAX)) > 0) {
write(client_fd, buffer, len);
}
}
close(client_fd);
close(file_fd);
thread_pool
.is_enabled = 0;
printf("DEBUG[10] : thread_pool
.is_enabled = [%d]\n", thread_pool
.is_enabled);
pthread_mutex_unlock(&mutex
);
}
}
int initThreads()
{
int i, ret = 0;
pthread_t thread_id;
for(i = 0; i < NUMBER_OF_THREADS; i++) {
pthread_mutex_init(&mutex[i], NULL);
pthread_cond_init(&cond[i], NULL);
}
for(i = 0; i < NUMBER_OF_THREADS; i++) {
thread_pool[i].client_fd = -1;
thread_pool[i].is_enabled = 0;
printf("initThreads_DEBUG : thread_pool[%d].tid = [%d], enabled [%d]\n",
i, thread_pool[i].tid, thread_pool[i].is_enabled);
ret = pthread_create(&thread_pool[i].tid, NULL,
processClientRequest,
&thread_pool[i].tid);
if(ret != 0) {
perror("pthread");
printf("ret [%d]\n", ret);
return 1;
}
printf("ret [%d]\n", ret);
}
return 0;
}
int main(int argc, char *argv[])
{
int server_fd;
int client_fd;
int len, i;
struct sockaddr_in addr;
setbuf(stdout, (char *)0);
server_fd = socket_server_init(LISTEN_PORT, LISTEN_BACK_LOG);
if(server_fd == -1) {
return 1;
}
printf("DEBUG : server_socket_fd = [%d]\n", server_fd);
if(initThreads() != 0) {
fprintf(stderr, "Cannot initialize threads...\n");
return 1;
}
for(;;) {
client_fd = accept(server_fd, (struct sockaddr *)&addr, &len);
if(client_fd == -1) {
perror("accept");
return 1;
}
printf("DEBUG[3] : accept...\n");
for(i = 0; i < NUMBER_OF_THREADS; i++) {
printf("DEBUG[4] : checking thread...\n");
if(thread_pool[i].is_enabled == 0) {
pthread_mutex_unlock(&mutex[i]);
pthread_mutex_lock(&mutex[i]);
thread_pool[i].client_fd = client_fd;
thread_pool[i].is_enabled = 1;
printf("DEBUG[5] : thread[%d] is now enabled[%d]...\n", i, thread_pool[i].is_enabled);
pthread_cond_signal(&cond[i]);
printf("DEBUG[6] : sigal has been sent...\n");
pthread_mutex_unlock(&mutex[i]);
printf("DEBUG[7] : mutex unlock in main()...\n");
break;
}
}
/* When we reach the maximum number of socket, we close client sd */
if(i == NUMBER_OF_THREADS) close(client_fd);
}
}
그리고 아래는 제가 찍은 로그입니다.
DEBUG[1] : thread lock in threadfunction()...
DEBUG[2] : thread number [0] is waiting for the signal here
DEBUG[3] : accept...
DEBUG[4] : checking thread...
DEBUG[5] : thread[0] is now enabled[1]...
DEBUG[6] : sigal has been sent...
DEBUG[7] : mutex unlock in main()...
DEBUG[8] : thread number [1] start to work, enable[-278243024]!!!
DEBUG[9] : thread_pool[0] is enabled, client_fd[5]
DEBUG : filename [Makefile]
File Found...
DEBUG[10] : thread_pool
.is_enabled = [0]
답변 감사합니다.
Linux rules!!!
쓰레드 풀링 기능 구현
쓰레드 풀 구현하는데 이런 방법을 생각해 보세요.
님께서 올리신 코드는 기능에 비해 너무 복잡하게 되어 있습니다.
일단 Free 쓰레드의 리스트를 구성하세요.
그리고, 그 리스트 속에서 대기하기 위해 조건변수(condition varibales)
를 선언하시구요.
mutex 1개와 cond val 1개로 위에서 말씀하는 것에 대한 구현이 가능합니다.
1. 대기 상태로 들어갈 때는 리스트에 대한 mutex를 잡습니다.
그리고, 그 리스트에 자기자신을 연결하고, cond_wait으로 대기하세요.
2. 어떤 Free 쓰레드를 필요로 하시면 역시 리스트에 대해 mutex를 잡으시고,
조건변수에 대해 cond-signal을 날리고, unlock()하십시요.
3. 그러면, 대기상태에 있던 쓰레드 중 한개만이 list에 대한 mutex를
잡은 상태에서 wakeup 할 겁니다.
그때 리스트에서 제거하고, unlock()하시면 됩니다.
답변이 되었는지 모르겠군요.
제가 이러한 부분에 대해 고민을 많이 하고, 구현한 경험이 있어서
이렇게 답변을 드립니다.
좋은 하루 되세요.
고도의 추상화, 극도의 구체화, 에디슨을 그리워하다.
네.. 로그를 보니 해결이 되신 모양이네요...윗분 말씀도 일리가
네.. 로그를 보니 해결이 되신 모양이네요...
윗분 말씀도 일리가 있습니다.
지금 상태에서는 condition variable이나 mutex를 thread개수만큼 가질 필요는 없어보이네요.
하지만 boss thread에서 관리를 꼭 하고 싶다면.. scoreboard 를 달고 지금처럼 쓰시는것도 뭐 그리 나쁘지는 않습니다.
그런데 main thread(boss thread)에서 accept() 후에
여기서 pthread_mutex_unlock(); 을 호출하실 필요가 없어 보이네요...
지금 현재 소스에서는 크게 문제되지 않겠지만... 복잡해지면 나중에 문제가 될지도 모르니... 다시한번 살펴보세요.
그리고 유닉스가 솔라리스라면 thread creation 시에 pthread_attr_t *attr 값을 NULL로 주시면 scope이 default로 PTHREAD_SCOPE_PROCESS 로 설정됩니다. 이는 스케쥴링의 scope을 해당 process 내부로 잡겠다는 뜻으로 user-level thread와 비슷하게 동작합니다.( lwp를 하나만 생성하여 동일 process 내부의 thread가 이를 공유해서 사용한다는 개념입니다.)
solaris 8까지 이 값이 default 였고, 9에서 PTHREAD_SCOPE_SYSTEM이 default값으로 된다는 말이 있었는데, sun site에 가서 보니까 바뀌지 않았더군요... 문서만 그런건지 실제로 그런건지는 테스트 해보지 않아서 모르겠습니다.
결국 attribute 값을 NULL로 사용하면 SMP 환경을 최대한 이용하지 못하는 결과가 될 수 있습니다.
그리고 printf는 아무래도 좀 불안해 보이네요...
지금 scope이 PROCESS이기 때문에 printf가 잘 나왔을 수 있는데요... CPU 두개 이상짜리 기계에서 scope을 SYSTEM으로 해서 테스트 해보시면 문제가 될것 같습니다.
아예 write() 를 호출하시는게 나아보이네요. write()는 atomic이 보장되니까요...
어쨌든 해결 되셨다니 다행입니다.
우리 모두 리얼리스트가 되자. 그러나 가슴에 이룰 수 없는 꿈을 가지자
답변 감사드립니다.
답변 감사드립니다.
먼저 리스트로 관리하는것은 지금 해보는 중인데...
잘 나올지 모르겠습니다.
그리고 상세한 설명 다시 한번 감사드립니다.
Linux rules!!!
코드를 안 읽어보고 글을 적습니다.
thread pool 사용하는 전형적인 모양은,
boss thread와 [ shared q + worker thread n개 ]의 pool로 이루어집니다.
q는 mutex와 cond를 이용해서 다음을 구현합니다.
- q가 thread가 get하려고 할때 아무것도 없으면 cond를 기다린다.
- q에 put하게 되면, cond에 대한 signal을 뿌린다.
- cond를 기다리던 thread중 하나만 mutex를 잡는다.
thread pool의 worker thread들은 시작하자마자, 위의 q에서 get하려고 하고,
request가 들어올때 마다, q에 put하는 boss thread를 구현합니다.
이상의 내용은 oreilly pthread programming책에 나온 내용입니다. (누에 그림책)
책을 보시면 더 도움이 되리라 생각합니다.
댓글 달기