여러개의 생산자 스레드와 여러개의 소비자 스레드 문제

idealkldp의 이미지

안녕하세요. 수년전에 가입했는데...
좀 많이 아래단 소프트웨어들만 하다보니... 닉과 암호도 까먹을 정도가 되었네요.
계정 새로 팠습니다. 해서;;;

하지만, 간만에 윗단으로 올라왔습니다.

여러개의 생산자 스레드와 여러개의 소비자 스레드 문제를 좀 풀어봤습니다.
pthread를 이용했구요. 이런 코드는 자주 짠 편은 아닌지라.. 좀 어색하고, 이게 맞는 건가 싶기도 하네요.

일단, 생산자 스레드들이 파일로부터 스트링을 일정량 읽어오면, 그걸 큐에 넣고,
큐에 데이타가 있으면, 소비자 스레드들이 가져가는 방식으로 구성했어요.

일단 실행을 하면 돌아가긴 하는것 같은데.
제대로 작성을 한건지 잘 모르겠네요.

메모리 해제같은 것들 빠뜨린 부분이 보이긴 하네요. 뭐 일단 그것은 추후에 업데이트 하고...

제 코드에 대한 극딜과 조언도 좀 부탁드립니다.

Attack my code!!!!!!!

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
 
//#define DEBUG
#ifdef DEBUG
#define dbg(fmt, args...) printf("%s(%d): "fmt, __FUNCTION__, __LINE__, ##args)
#else
#define dbg(fmt, args...)
#endif
 
#define QSIZE (64)
#define PACKET_SIZE	8
 
typedef struct simple_queue {
 
	char* queue;
	int front;
	int rear;
	int number_of_items;
 
} simple_queue_t;
 
typedef struct shared_info {
	FILE*	src_fp;
	FILE* 	dest_fp;
 
	pthread_mutex_t	q_mlock;	
	pthread_cond_t	space_cond;
	pthread_cond_t	data_cond;
 
	simple_queue_t* sq;
 
} shared_info_t;
 
static volatile int exit_flag = 0;
 
int init_queue (simple_queue_t** sq) ;
int check_underflow(simple_queue_t* sq) ;
int check_overflow(simple_queue_t* sq);
int number_items(simple_queue_t* sq) ;
void enqueue(simple_queue_t* sq, char data) ;
char dequeue(simple_queue_t* sq) ;
int enq_elements(shared_info_t* si, char* buffer, int size);
int deq_elements(shared_info_t* si, char* buffer, int size);
 
/**
 * Get data from the stream (file).
 */
int get_data (shared_info_t* si, char* buffer, int bufferSizeInBytes)
{
	int count = 0;
	memset(buffer, 0, bufferSizeInBytes);
	if (!feof(si->src_fp)){
		count = fread(buffer, sizeof(char), bufferSizeInBytes, si->src_fp);
	}
	dbg("%s len(%d)\n", buffer, count);
	return count;
}
 
 
int proc_data (shared_info_t* si, char * buffer, int bufferSizeInBytes)
{
	int count = 0;
 
	count = fwrite(buffer, sizeof(char), bufferSizeInBytes, si->dest_fp);
	dbg("write data: size=%d\n", count);	
	return count;
}
 
 
void *consumer_thread(void *arg) {
	//TODO: Define set-up required
 
	shared_info_t* si = (shared_info_t*)arg;	
	int bufferSizeInBytes = PACKET_SIZE;
	char* buffer;
	pthread_t tid;
	int avail_data = 0;
 
	tid = pthread_self();
 
	printf("[Reader > Thread id:%lu] Thread Start\n", tid);
 
	buffer = (char*) malloc(bufferSizeInBytes);
	if (buffer == NULL){
		printf("Buffer allocaton failure\n");
		goto exit;
	}
 
	while(1) {
		//TODO: Define data extraction (queue) and processing 
		dbg(" ~ \n");
 
		// get some data from Queue
		// RQ
		dbg("Into Q\n");
		pthread_mutex_lock(&si->q_mlock);
 
		//if (data_in_queue() < bufferSizeInBytes)
		while (check_underflow(si->sq)){
			// check writer's flag to see time to finish.
			if (exit_flag) 
				break;
			// Wait for signal from writers.
			pthread_cond_wait(&si->data_cond, &si->q_mlock);
		}
 
		if (number_items(si->sq)){
			// check data exist.
			// Even when exit_flag is on, we need to handle remained data 
			// in queue.
			dbg("[R] tid: %lu Q\n", tid);
			memset(buffer, 0, bufferSizeInBytes);
			avail_data = deq_elements(si, buffer, bufferSizeInBytes);
			pthread_cond_signal(&si->space_cond);
 
			// Update output file with obtained data
			proc_data(si, buffer, avail_data);
 
		} else { 
			// when exit_flag is on and queue is empty
			pthread_cond_signal(&si->space_cond);
			pthread_mutex_unlock(&si->q_mlock);
 
			// Quit thread
			break;
		}
		pthread_mutex_unlock(&si->q_mlock);
	}
 
	if (buffer != NULL){
		free(buffer);
	}
 
exit:
	printf("Reader Thread End > thread id %lu\n", tid);
	pthread_exit(NULL);
}
 
 
/**
 * This thread is responsible for pulling data from a device using
 * the get_data() API and placing it into a shared area
 * for later processing by one of the reader threads.
 */
void *producer_thread(void *arg) {
	//TODO: Define set-up required
 
	shared_info_t* si = (shared_info_t*)arg;	
	char* buffer = NULL;
	int count;
	int bufferSizeInBytes = PACKET_SIZE;
	pthread_t tid;
	int avail_data = 0;
 
	tid = pthread_self();
 
	printf("[Writer > Thread id:%lu] Thread Start\n", tid);
 
	buffer = (char*) malloc(bufferSizeInBytes);
	if (buffer == NULL){
		printf("Buffer allocaton failure\n");
		goto exit;
	}
 
	while(1) {
		//TODO: Define data extraction (device) and storage 
		dbg(" @ \n");
 
		// try to get a lock
		pthread_mutex_lock(&si->q_mlock);
 
		// overflow, wait for signal from reader's responce.
		// Reader should take data packet from queue to make space.
		while (check_overflow(si->sq))
			pthread_cond_wait(&si->space_cond, &si->q_mlock);
 
		// AF
		dbg("[W] tid: %lu Q\n", tid);
 
		// Read data from input stream file
		avail_data = get_data(si, buffer, bufferSizeInBytes);
 
		if (avail_data <= 0){
			// Because thread meets end of file, 
			// this thread should prepare to finish.
			if (!exit_flag){
				dbg("ready to finish!\n");
				exit_flag = 1;
				// All action will be finished.
				// Wake up all readers. Let them prepare to finish all.
				pthread_cond_broadcast(&si->data_cond);
			}
			pthread_mutex_unlock(&si->q_mlock);
			break;
		}
 
		// Push data into queue
		enq_elements(si, buffer, avail_data);
 
		// Data ready. wake up any reader.
		pthread_cond_signal(&si->data_cond);
 
		pthread_mutex_unlock(&si->q_mlock);
 
	}
 
	if (buffer != NULL){
		free(buffer);
	}
 
exit:
	printf("Writer Thread End > thread id %lu\n", tid);
	pthread_exit(NULL);
}
 
/**
 * To check result, this function will compare input stream data(file) 
 * and output data file. 
 */
int compare_files(FILE* fp1, FILE* fp2)
{
	char c1, c2;
	int	result = 0;
 
	fseek(fp1, 0, SEEK_SET);
	fseek(fp2, 0, SEEK_SET);
 
	while(1) {
 
		c1 = fgetc(fp1);
		if (c1 == EOF) break;
 
		c2 = fgetc(fp2);
 
		if (c1 != c2)
			return -1;	
	}
	return result;
 
}
 
int main(int argc, char **argv) {
 
	int i;
	shared_info_t*	si;
	pthread_t readers[N];
	pthread_t writers[M];
 
// allocate shared information 
	si = (shared_info_t*) malloc (sizeof(shared_info_t));
	if (si == NULL){
		perror("Malloc failure");
		exit (0);
	}
 
// initialize queue 
	if (!init_queue(&si->sq)) {
		perror("Initializing queue is failure");
		exit (0);
	}
 
	pthread_mutex_init(&si->q_mlock, NULL);
	pthread_cond_init(&si->space_cond, NULL);
	pthread_cond_init(&si->data_cond, NULL);
 
// open input stream and output stream
	si->src_fp = fopen("input_stream.txt", "r");
	if (si->src_fp == NULL){
		perror("File for input open error:");
		exit (0);
	}
 
	si->dest_fp = fopen("output_stream.txt", "w+");
	if (si->dest_fp == NULL){
		perror("File for output open error:");
		exit (0);
	}
 
// Create threads
	for(i = 0; i < 20; i++) { 
		pthread_create(&readers[i], NULL, consumer_thread, (void*)si);
	}
	for(i = 0; i < 10; i++) { 
		pthread_create(&writers[i], NULL, producer_thread, (void*)si);
	}
 
// Finish
	for (i = 0; i < 20; i++)
		pthread_join(readers[i], NULL);
	for (i = 0; i < 10; i++)
		pthread_join(writers[i], NULL);
 
	pthread_mutex_destroy(&si->q_mlock);
 
// Determine if the result is correct or not. 
	printf("\nCompare the Source file with Destination file\n");
	if (compare_files(si->src_fp, si->dest_fp) < 0){
		printf(">> Two files are NOT same\n");
		printf(">> Data transfer failed.\n");
	} else {
		printf(">> Two files are SAME.\n");
		printf(">> Source data moved to Destination successfully.\n");
	}
	printf("\n");
 
	fclose(si->src_fp);
	fclose(si->dest_fp);
 
	return 0;	
}

댓글 달기

Filtered HTML

  • 텍스트에 BBCode 태그를 사용할 수 있습니다. URL은 자동으로 링크 됩니다.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>
  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.

BBCode

  • 텍스트에 BBCode 태그를 사용할 수 있습니다. URL은 자동으로 링크 됩니다.
  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param>
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.

Textile

  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • You can use Textile markup to format text.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>

Markdown

  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • Quick Tips:
    • Two or more spaces at a line's end = Line break
    • Double returns = Paragraph
    • *Single asterisks* or _single underscores_ = Emphasis
    • **Double** or __double__ = Strong
    • This is [a link](http://the.link.example.com "The optional title text")
    For complete details on the Markdown syntax, see the Markdown documentation and Markdown Extra documentation for tables, footnotes, and more.
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>

Plain text

  • HTML 태그를 사용할 수 없습니다.
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.
  • 줄과 단락은 자동으로 분리됩니다.
댓글 첨부 파일
이 댓글에 이미지나 파일을 업로드 합니다.
파일 크기는 8 MB보다 작아야 합니다.
허용할 파일 형식: txt pdf doc xls gif jpg jpeg mp3 png rar zip.
CAPTCHA
이것은 자동으로 스팸을 올리는 것을 막기 위해서 제공됩니다.