thread pool사용시

yeilho의 이미지

thread pool프로그램을 하는데,

먼저 쓰레드를 만들어 놓고 request를 받으면 서버에서 놀고있는 쓰레드를

체크하여 일을 주고자 합니다.

제딴에는 그러려면 mutex, cond variable을 사용해야 될것 같아서,

변수 선언하고,

pthread_mutex_t mutex[NUMBER_OF_THREADS];
pthread_cond_t cond[NUMBER_OF_THREADS];

아래와 같이 초기화하고...
for(i=0; i<NUM_OF_THREADS; i++) {
pthread_cond_init(&cond[i], NULL);
pthread_mutex_init(&mutex[i], NULL);
}

main()에서
아래와 같이 노는 쓰레드를 체크하여 signal을 보내준후,

for(i = 0; i < NUMBER_OF_THREADS; i++) {
printf("DEBUG[3] : checking thread...\n");

if(thread_pool[i].is_enabled == 0) {
pthread_mutex_lock(&mutex[i]);
thread_pool[i].client_fd = client_fd;
thread_pool[i].is_enabled = 1;
printf("DEBUG[4] : thread[%d] is now enabled...\n", i);

pthread_cond_signal(&cond[i]);
pthread_mutex_unlock(&mutex[i]);

break;
}

쓰레드에서는,
아래처럼 기다리고 있다가 시그널 받으면 처리하고자 하는데,

if(thread_pool

    .is_enabled == 0) {

    pthread_mutex_unlock(&mutex

      );
      printf("DEBUG[1] : thread number [%d] is waiting for the signal here\n", index);
      pthread_mutex_lock(&mutex
        );
        pthread_cond_wait(&cond
          , &mutex
            );
            printf("DEBUG[5] : thread number [%d] start to work!!!\n");
            }

            결과는 아래처럼 cond_wait에서 더이상 내려가지 않는듯 합니다.

            DEBUG[1] : thread number [0] is waiting for the signal here
            DEBUG[2] : accept...
            DEBUG[3] : checking thread...
            DEBUG[4] : thread[0] is now enabled...

            아마도 cond 변수를 잘못쓰지 않았나도 싶고.....

            고수님들의 의견을 부탁드립니다.

            감사합니다.

            stoneshim의 이미지

            리눅스에서 테스트 하신건가요?
            리눅스라면... 현재 상태에서 pstree 한 모습을 좀 붙여주시구요...
            리눅스에서는 thread가 사실 process니까... strace를 사용하셔서 현재 thread가 무슨짓을 하고 있는지 보시는것도 좋을것 같네요.

            pthread_mutex_t mutex[NUMBER_OF_THREADS];
            pthread_cond_t cond[NUMBER_OF_THREADS];

            thread_pool[NUMBER_OF_THREADS];
            변수는 물론 전역변수로 하셨겠지요?

            index 값은 pthread_create에서 argument로 넘기신 것이겠죠?
            thread_pool

              .is_enabled 도 mutex를 거시는게 좋을것 같구요.

              그리고... printf가 버퍼링 때문에 믿기 어려운 부분이 많습니다... 가장 확실한 건 log 찍는 함수를 만드시고, 그 함수에서 파일에 쓰고 파일을 닫아버리는게 제일 확실합니다.
              그냥 printf를 쓰실거면... 처음에 setbuf(stdout, (char *)0); 하시고 사용하시는것이 좋아 보입니다.

              그리고 소스를 최대한 간단하게 하셔서 전체 소스를 붙여주시면 더 좋을것 같네요..

              우리 모두 리얼리스트가 되자. 그러나 가슴에 이룰 수 없는 꿈을 가지자

              stoneshim의 이미지

              음... 생각해보니 printf()를 써서 stdout 스트림을 사용하는것도 mutex를 걸어야 할것 같네요.

              지금 느낌으로는 잘 돌아갔는데 보이기만 그렇게 보였을것 같습니다.

              우리 모두 리얼리스트가 되자. 그러나 가슴에 이룰 수 없는 꿈을 가지자

              yeilho의 이미지

              전체 소스 입니다.

              그리고 참고로 소스는 유닉스에서 컴파일해서 돌렸습니다.

              #include <errno.h>
              #include <fcntl.h>
              #include <netinet/in.h>
              #include <pthread.h>
              #include <stdio.h>
              #include <stdlib.h>
              #include <string.h>
              #include <sys/socket.h>
              #include "socket_api.h"

              #define LISTEN_BACK_LOG 10
              #define LISTEN_PORT 9999
              #define NUMBER_OF_THREADS 3
              #define MAX 512
              #define FILE_NAME_LEN 100

              typedef struct thr_info {
              pthread_t tid;
              int client_fd;
              int is_enabled;
              } Threads;

              Threads thread_pool[NUMBER_OF_THREADS];

              pthread_mutex_t mutex[NUMBER_OF_THREADS];
              pthread_cond_t cond[NUMBER_OF_THREADS];

              void *processClientRequest(void *data)
              {
              int client_fd;
              int file_fd;
              pthread_t *tid = (pthread_t *)data;
              int index;
              int len, i;
              int rt;
              char filename[FILE_NAME_LEN+1];
              char buffer[MAX+1];

              printf("DEBUG[0] : processClientRequest(), tid[%d]\n", *tid);
              for(index = 0; index < NUMBER_OF_THREADS; index++) {
              if(thread_pool

                .tid == *tid) {
                printf("index [%d]...\n", index);
                break;
                }
                }

                for(;;) {

                if(thread_pool

                  .is_enabled == 0) {

                  /* pthread_mutex_unlock(&mutex

                    ); */
                    pthread_mutex_lock(&mutex
                      );
                      printf("DEBUG[1] : thread lock in threadfunction()...\n");
                      printf("DEBUG[2] : thread number [%d] is waiting for the signal here\n", index);
                      pthread_cond_wait(&cond
                        , &mutex
                          );
                          printf("DEBUG[8] : thread number [%d] start to work, enable[%d]!!!\n",
                          thread_pool
                            .is_enabled);
                            }

                            client_fd = thread_pool

                              .client_fd;
                              printf("DEBUG[9] : thread_pool[%d] is enabled, client_fd[%d]\n", index, client_fd);
                              memset(filename, 0x00, FILE_NAME_LEN+1);
                              memset(buffer, 0x00, MAX+1);

                              i = 0;
                              do {
                              rt = read(client_fd, &filename[i], 1);
                              if(rt < 0) {

                              if( (errno == EINTR) || (errno == EAGAIN) ) continue;
                              printf("read error\n");
                              return NULL;
                              }
                              if(rt == 0) {
                              printf("rt is now [%d]\n", rt);
                              break;
                              }

                              if(filename[i] == ' ') break;
                              i++;
                              } while(1);

                              filename[i] = '\0';

                              printf("DEBUG : filename [%s]\n", filename);

                              file_fd = open(filename, O_RDONLY);

                              if(file_fd == -1) {
                              printf("File Not Found...\n");
                              sprintf(buffer, "File Not Found...");
                              write(client_fd, buffer, strlen(buffer));
                              }
                              else {
                              printf("File Found...\n");
                              sprintf(buffer, "File Found...\n");
                              write(client_fd, buffer, strlen(buffer));

                              while((len = read(file_fd, buffer, MAX)) > 0) {
                              write(client_fd, buffer, len);
                              }
                              }

                              close(client_fd);
                              close(file_fd);
                              thread_pool

                                .is_enabled = 0;
                                printf("DEBUG[10] : thread_pool
                                  .is_enabled = [%d]\n", thread_pool
                                    .is_enabled);

                                    pthread_mutex_unlock(&mutex

                                      );
                                      }

                                      }

                                      int initThreads()
                                      {
                                      int i, ret = 0;
                                      pthread_t thread_id;

                                      for(i = 0; i < NUMBER_OF_THREADS; i++) {
                                      pthread_mutex_init(&mutex[i], NULL);
                                      pthread_cond_init(&cond[i], NULL);
                                      }

                                      for(i = 0; i < NUMBER_OF_THREADS; i++) {

                                      thread_pool[i].client_fd = -1;
                                      thread_pool[i].is_enabled = 0;

                                      printf("initThreads_DEBUG : thread_pool[%d].tid = [%d], enabled [%d]\n",
                                      i, thread_pool[i].tid, thread_pool[i].is_enabled);

                                      ret = pthread_create(&thread_pool[i].tid, NULL,
                                      processClientRequest,
                                      &thread_pool[i].tid);
                                      if(ret != 0) {
                                      perror("pthread");
                                      printf("ret [%d]\n", ret);
                                      return 1;
                                      }
                                      printf("ret [%d]\n", ret);
                                      }

                                      return 0;
                                      }

                                      int main(int argc, char *argv[])
                                      {
                                      int server_fd;
                                      int client_fd;
                                      int len, i;
                                      struct sockaddr_in addr;

                                      setbuf(stdout, (char *)0);
                                      server_fd = socket_server_init(LISTEN_PORT, LISTEN_BACK_LOG);
                                      if(server_fd == -1) {
                                      return 1;
                                      }
                                      printf("DEBUG : server_socket_fd = [%d]\n", server_fd);

                                      if(initThreads() != 0) {
                                      fprintf(stderr, "Cannot initialize threads...\n");
                                      return 1;
                                      }

                                      for(;;) {

                                      client_fd = accept(server_fd, (struct sockaddr *)&addr, &len);
                                      if(client_fd == -1) {
                                      perror("accept");
                                      return 1;
                                      }
                                      printf("DEBUG[3] : accept...\n");

                                      for(i = 0; i < NUMBER_OF_THREADS; i++) {
                                      printf("DEBUG[4] : checking thread...\n");

                                      if(thread_pool[i].is_enabled == 0) {
                                      pthread_mutex_unlock(&mutex[i]);
                                      pthread_mutex_lock(&mutex[i]);

                                      thread_pool[i].client_fd = client_fd;
                                      thread_pool[i].is_enabled = 1;
                                      printf("DEBUG[5] : thread[%d] is now enabled[%d]...\n", i, thread_pool[i].is_enabled);

                                      pthread_cond_signal(&cond[i]);
                                      printf("DEBUG[6] : sigal has been sent...\n");

                                      pthread_mutex_unlock(&mutex[i]);
                                      printf("DEBUG[7] : mutex unlock in main()...\n");

                                      break;
                                      }
                                      }

                                      /* When we reach the maximum number of socket, we close client sd */
                                      if(i == NUMBER_OF_THREADS) close(client_fd);
                                      }

                                      }

                                      그리고 아래는 제가 찍은 로그입니다.

                                      DEBUG[1] : thread lock in threadfunction()...
                                      DEBUG[2] : thread number [0] is waiting for the signal here
                                      DEBUG[3] : accept...
                                      DEBUG[4] : checking thread...
                                      DEBUG[5] : thread[0] is now enabled[1]...
                                      DEBUG[6] : sigal has been sent...
                                      DEBUG[7] : mutex unlock in main()...
                                      DEBUG[8] : thread number [1] start to work, enable[-278243024]!!!
                                      DEBUG[9] : thread_pool[0] is enabled, client_fd[5]
                                      DEBUG : filename [Makefile]
                                      File Found...
                                      DEBUG[10] : thread_pool

                                        .is_enabled = [0]

                                        답변 감사합니다.

                                        Linux rules!!!

                                        김성진의 이미지

                                        쓰레드 풀 구현하는데 이런 방법을 생각해 보세요.
                                        님께서 올리신 코드는 기능에 비해 너무 복잡하게 되어 있습니다.
                                        일단 Free 쓰레드의 리스트를 구성하세요.
                                        그리고, 그 리스트 속에서 대기하기 위해 조건변수(condition varibales)
                                        를 선언하시구요.

                                        mutex 1개와 cond val 1개로 위에서 말씀하는 것에 대한 구현이 가능합니다.

                                        1. 대기 상태로 들어갈 때는 리스트에 대한 mutex를 잡습니다.
                                        그리고, 그 리스트에 자기자신을 연결하고, cond_wait으로 대기하세요.

                                        2. 어떤 Free 쓰레드를 필요로 하시면 역시 리스트에 대해 mutex를 잡으시고,
                                        조건변수에 대해 cond-signal을 날리고, unlock()하십시요.

                                        3. 그러면, 대기상태에 있던 쓰레드 중 한개만이 list에 대한 mutex를
                                        잡은 상태에서 wakeup 할 겁니다.
                                        그때 리스트에서 제거하고, unlock()하시면 됩니다.

                                        답변이 되었는지 모르겠군요.

                                        제가 이러한 부분에 대해 고민을 많이 하고, 구현한 경험이 있어서

                                        이렇게 답변을 드립니다.

                                        좋은 하루 되세요.

                                        고도의 추상화, 극도의 구체화, 에디슨을 그리워하다.

                                        stoneshim의 이미지

                                        네.. 로그를 보니 해결이 되신 모양이네요...

                                        윗분 말씀도 일리가 있습니다.
                                        지금 상태에서는 condition variable이나 mutex를 thread개수만큼 가질 필요는 없어보이네요.

                                        하지만 boss thread에서 관리를 꼭 하고 싶다면.. scoreboard 를 달고 지금처럼 쓰시는것도 뭐 그리 나쁘지는 않습니다.

                                        그런데 main thread(boss thread)에서 accept() 후에

                                        if(thread_pool[i].is_enabled == 0) { 
                                        pthread_mutex_unlock(&mutex[i]); 
                                        pthread_mutex_lock(&mutex[i]); 

                                        여기서 pthread_mutex_unlock(); 을 호출하실 필요가 없어 보이네요...
                                        지금 현재 소스에서는 크게 문제되지 않겠지만... 복잡해지면 나중에 문제가 될지도 모르니... 다시한번 살펴보세요.

                                        그리고 유닉스가 솔라리스라면 thread creation 시에 pthread_attr_t *attr 값을 NULL로 주시면 scope이 default로 PTHREAD_SCOPE_PROCESS 로 설정됩니다. 이는 스케쥴링의 scope을 해당 process 내부로 잡겠다는 뜻으로 user-level thread와 비슷하게 동작합니다.( lwp를 하나만 생성하여 동일 process 내부의 thread가 이를 공유해서 사용한다는 개념입니다.)
                                        solaris 8까지 이 값이 default 였고, 9에서 PTHREAD_SCOPE_SYSTEM이 default값으로 된다는 말이 있었는데, sun site에 가서 보니까 바뀌지 않았더군요... 문서만 그런건지 실제로 그런건지는 테스트 해보지 않아서 모르겠습니다.

                                        결국 attribute 값을 NULL로 사용하면 SMP 환경을 최대한 이용하지 못하는 결과가 될 수 있습니다.

                                        그리고 printf는 아무래도 좀 불안해 보이네요...
                                        지금 scope이 PROCESS이기 때문에 printf가 잘 나왔을 수 있는데요... CPU 두개 이상짜리 기계에서 scope을 SYSTEM으로 해서 테스트 해보시면 문제가 될것 같습니다.
                                        아예 write() 를 호출하시는게 나아보이네요. write()는 atomic이 보장되니까요...

                                        어쨌든 해결 되셨다니 다행입니다.

                                        우리 모두 리얼리스트가 되자. 그러나 가슴에 이룰 수 없는 꿈을 가지자

                                        yeilho의 이미지

                                        답변 감사드립니다.

                                        먼저 리스트로 관리하는것은 지금 해보는 중인데...

                                        잘 나올지 모르겠습니다.

                                        그리고 상세한 설명 다시 한번 감사드립니다.

                                        Linux rules!!!

                                        june8th의 이미지

                                        thread pool 사용하는 전형적인 모양은,
                                        boss thread와 [ shared q + worker thread n개 ]의 pool로 이루어집니다.

                                        q는 mutex와 cond를 이용해서 다음을 구현합니다.
                                        - q가 thread가 get하려고 할때 아무것도 없으면 cond를 기다린다.
                                        - q에 put하게 되면, cond에 대한 signal을 뿌린다.
                                        - cond를 기다리던 thread중 하나만 mutex를 잡는다.

                                        thread pool의 worker thread들은 시작하자마자, 위의 q에서 get하려고 하고,
                                        request가 들어올때 마다, q에 put하는 boss thread를 구현합니다.

                                        이상의 내용은 oreilly pthread programming책에 나온 내용입니다. (누에 그림책)
                                        책을 보시면 더 도움이 되리라 생각합니다.

                                        댓글 달기

                                        Filtered HTML

                                        • 텍스트에 BBCode 태그를 사용할 수 있습니다. URL은 자동으로 링크 됩니다.
                                        • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>
                                        • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
                                        • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.

                                        BBCode

                                        • 텍스트에 BBCode 태그를 사용할 수 있습니다. URL은 자동으로 링크 됩니다.
                                        • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
                                        • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param>
                                        • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.

                                        Textile

                                        • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
                                        • You can use Textile markup to format text.
                                        • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>

                                        Markdown

                                        • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
                                        • Quick Tips:
                                          • Two or more spaces at a line's end = Line break
                                          • Double returns = Paragraph
                                          • *Single asterisks* or _single underscores_ = Emphasis
                                          • **Double** or __double__ = Strong
                                          • This is [a link](http://the.link.example.com "The optional title text")
                                          For complete details on the Markdown syntax, see the Markdown documentation and Markdown Extra documentation for tables, footnotes, and more.
                                        • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.
                                        • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>

                                        Plain text

                                        • HTML 태그를 사용할 수 없습니다.
                                        • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.
                                        • 줄과 단락은 자동으로 분리됩니다.
                                        댓글 첨부 파일
                                        이 댓글에 이미지나 파일을 업로드 합니다.
                                        파일 크기는 8 MB보다 작아야 합니다.
                                        허용할 파일 형식: txt pdf doc xls gif jpg jpeg mp3 png rar zip.
                                        CAPTCHA
                                        이것은 자동으로 스팸을 올리는 것을 막기 위해서 제공됩니다.