버클리 디비 사용시 질문입니다. ( 멀티 프로세스 + 트랜잭션 모
글쓴이: febace / 작성시간: 목, 2003/04/10 - 10:25오전
버클리 디비 4.1을 이용하여
트랜잭션을 사용하여 간단한 매핑 라이브러리를 만들어서
테스트 중입니다.
그런데 문제가 발생합니다.
트랜잭션으로 put을 감싸고 데이타를 입력 합니다. (물론 버클리 디비에)
그 프로그램을 실행 한후, ^C로 INT시그날로 프로세스를 죽이면,
다시 그 프로그램을 실행 해도 실행이 되지 않습니다.
db_recover를 실행 한후 하면 되긴 하지만,
db_recover이 필요 없게 이쁘게 종료 하는 법은 없을까요?
테스트 코드는 아래와 같습니다.
sleep을 주석 처리 하면 ^C를 하는 순간에 보통 sleep 중이라서
멈추는 문제가 없는데, sleep를 없에면 ^C를 하는 순간 트랜잭션 - put 중이라서 문제가 생기는 듯 합니다.
문제는 이 테스트 프로그램을 동시에 두개나 세개를 실행 할때 더욱 커집니다.
한개를 ^C( sleep 제거시 ) 하여 죽이면, 다른 프로세스도 멈춥니다.
어떻게 하면 이 문제를 해결 할수가 있을까요? 며칠째 고민입니다.
테스트 프로그램.
int main(int argc, char *argv[]) { int ret, i; char buffer[1024] = {0,}; struct timeval tp_s; struct timeval tp_e; /* mandatory first */ well_set_dir(&well, "DIRECTORY"); /* mandatory second */ ret = well_make_db("TEST", &well); if (ret < 0) { printf("make db error\n"); exit (1); } /* optional but being needed for stablity*/ //ret = well_make_monitor(&well); //if (ret < 0) { // printf("launch monitor error\n"); // exit (2); //} /* for transaction starvation */ well_set_txn_signal(SIGSTOP); well_set_txn_signal(SIGKILL); well_set_txn_signal(SIGQUIT); well_set_txn_signal(SIGINT); /* * all of preparation finished */ if (argc != 3) { printf("$ %s KEY DATA\n", argv[0]); exit (4); } for (i = 0;;) { i++; memset(&tp_s, 0, sizeof(tp_s)); memset(&tp_e, 0, sizeof(tp_e)); memset(buffer, 0, sizeof(buffer)); sprintf(buffer, "%s%d", argv[1], i); gettimeofday(&tp_s, NULL); ret = well_insert(&well, buffer, argv[2], strlen(argv[2]), 0); if (ret < 0) { printf("insert error\n"); exit (3); } gettimeofday(&tp_e, NULL); printf("KEY[%s] TIME[%d]\n", buffer, tp_e.tv_usec - tp_s.tv_usec); // sleep(1); } }
간단한 매핑 라이브러리
DB_TXN *well_tid = NULL; well_t *well_pwell = NULL; void well_set_dir(well_t *pwell, const char *dir) { memset(pwell, 0, sizeof(well_t)); sprintf(pwell->dir, dir); } /* * You have to call this before using Well. */ int well_make_db(const char *tblname, well_t *pwell) { int ret; ret = env_dir_create(pwell->dir); if (ret < 0) return ret; ret = env_open(&(pwell->dbenv), pwell->dir); if (ret < 0) return ret; ret = db_open(pwell->dbenv, &(pwell->dbh), tblname, 0); if (ret < 0) return ret; well_pwell = pwell; return (0); } /* * You have to call this, if you want to erase * dump log files and synchronisation automatically */ int well_make_monitor(const well_t *pwell) { int ret; thread_t ptid1, ptid2; /* Start a checkpoint thread. */ ret = pthread_create(&ptid1, NULL, checkpoint_thread, (void *)pwell->dbenv); if (ret != 0) { fprintf(stderr, "txnapp: failed spawning checkpoint thread: %s\n", strerror(ret)); return (-10); } /* Start a logfile removal thread. */ ret = pthread_create(&ptid2, NULL, logfile_thread, (void *)pwell->dbenv); if (ret != 0) { fprintf(stderr, "txnapp: failed spawning log file removal thread: %s\n", strerror(ret)); return (-20); } return (0); } /* * for multiple process transation * starvaion problem */ void well_set_txn_signal(int sig) { int ret; ret = signal(sig, close_tid_handler); printf("well_set_txn_signal [%d]\n", ret); } /* * well transaction insert * ret < 0 : error * = 0 : success * > 0 : duplicated data to insert */ int well_insert(const well_t *pwell, const char *pkey, const void *pdata, int size, int limit) { int ret; char buffer[1024] = {0,}; ret = well_select(pwell, pkey, buffer, 0, 0); switch (ret) { case 1: break; case 0: return (1); default: return (-10); } ret = well_update(pwell, pkey, pdata, size, limit); if (ret < 0) return (-20); return (0); } /* * well select not transaction * ret < 0 : error * = 0 : success and fill from db * > 0 : success but no data */ int well_select(const well_t *pwell, const char *pkey, void *pdata, int size, int limit) { int ret, cnt = 0; DBT key, data; memset(&key, 0, sizeof(key)); key.data = (char *)pkey; key.size = strlen(pkey); memset(&data, 0, sizeof(data)); data.flags = DB_DBT_MALLOC; select_retry: switch (ret = pwell->dbh->get(pwell->dbh, NULL, &key, &data, 0)) { case 0: ret = 0; break; case DB_NOTFOUND: ret = 1; break; case DB_LOCK_DEADLOCK: cnt++; if (cnt < limit) goto select_retry; default: pwell->dbenv->err(pwell->dbenv, ret, "select get: %s/%s", pkey, pdata); return (-10); } /* data copy size problem */ if (ret == 0) { memset(pdata, 0, size); if (data.size < size) size = data.size; memcpy(pdata, data.data, size); } return (ret); } /* * well transaction update */ int well_update(const well_t *pwell, const char *pkey, const void *pdata, int size, int limit) { int ret, cnt = 0; DBT key, data; DB_TXN *tid; memset(&key, 0, sizeof(key)); key.data = (char *)pkey; key.size = strlen(pkey); memset(&data, 0, sizeof(data)); data.data = (char *)pdata; data.size = size; ret = pwell->dbenv->txn_begin(pwell->dbenv, NULL, &tid, 0); if (ret != 0) { pwell->dbenv->err(pwell->dbenv, ret, "update txn_begin"); return (-10); } well_tid = tid; update_retry: switch (ret = pwell->dbh->put(pwell->dbh, tid, &key, &data, 0)) { case 0: break; case DB_LOCK_DEADLOCK: cnt++; if (cnt < limit) goto update_retry; default: pwell->dbenv->err(pwell->dbenv, ret, "update put: %s/%s", pkey, pdata); ret = tid->abort(tid); if (ret != 0) { pwell->dbenv->err(pwell->dbenv, ret, "update abort"); return (-20); } else return (-30); } ret = tid->commit(tid, 0); if (ret != 0) { pwell->dbenv->err(pwell->dbenv, ret, "update commit"); return (-40); } return (0); } /* * well transaction delete */ int well_delete(const well_t *pwell, const char *pkey, int limit) { int ret, cnt = 0; DBT key; DB_TXN *tid; memset(&key, 0, sizeof(key)); key.data = (char *)pkey; key.size = strlen(pkey); ret = pwell->dbenv->txn_begin(pwell->dbenv, NULL, &tid, 0); if (ret != 0) { pwell->dbenv->err(pwell->dbenv, ret, "delete txn_begin"); return (-10); } well_tid = tid; del_retry: ret = pwell->dbh->del(pwell->dbh, tid, &key, 0); switch (ret) { case 0: case DB_NOTFOUND: break; case DB_LOCK_DEADLOCK: if (++cnt < limit) goto del_retry; default: pwell->dbenv->err(pwell->dbenv, ret, "delete: %s", pkey); ret = tid->abort(tid); if (ret != 0) { pwell->dbenv->err(pwell->dbenv, ret, "delete abort"); return (-20); } else return (-30); } ret = tid->commit(tid, 0); if (ret != 0) { pwell->dbenv->err(pwell->dbenv, ret, "delete commit"); return (-40); } return (0); } /* * database environment is based on env directory */ static int env_dir_create(const char *dir) { int ret; struct stat sb; if (stat(dir, &sb) == 0) return (0); ret = mkdir(dir, S_IRWXU); if (ret != 0) { fprintf(stderr, "txnapp: mkdir: %s: %s\n", dir, strerror(errno)); return (-20); } return (0); } /* * for multiple process operation. * database enviroment is mandatory */ static int env_open(DB_ENV **dbenvp, const char *dir) { DB_ENV *dbenv; int ret; ret = db_env_create(&dbenv, 0); if (ret != 0) { fprintf(stderr, "well: db_env_create: %s\n", db_strerror(ret)); return (-10); } /* setting prefix of error message */ dbenv->set_errpfx(dbenv, "well"); ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT); if (ret != 0) { dbenv->err(dbenv, ret, "set_lk_detect: DB_LOCK_DEFAULT"); return (-20); } ret = dbenv->open(dbenv, dir, DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | /*DB_RECOVER |*/ DB_THREAD, S_IRUSR | S_IWUSR ); if (ret != 0) { dbenv->err(dbenv, ret, "dbenv->open: %s", dir); return (-30); } *dbenvp = dbenv; return (0); } /* * database open * last dups param is for duplicated insert */ static int db_open(DB_ENV *dbenv, DB **dbp, const char *name, int dups) { DB *db; int ret; ret = db_create(&db, dbenv, 0); if (ret != 0) { dbenv->err(dbenv, ret, "db_create"); return (-10); } ret = dups && db->set_flags(db, DB_DUP); if (ret != 0) { dbenv->err(dbenv, ret, "db->set_flags: DB_DUP"); return (-20); } ret = db->open(db, NULL, name, NULL, DB_BTREE, DB_CREATE | DB_THREAD | DB_AUTO_COMMIT, S_IRUSR | S_IWUSR); if (ret != 0) { dbenv->err(dbenv, ret, "db->open: %s", name); return (-30); } *dbp = db; return (0); } static void * checkpoint_thread(void *arg) { int ret; DB_ENV *dbenv; dbenv = arg; dbenv->errx(dbenv, "Checkpoint thread: %lu", (u_long)pthread_self()); /* Checkpoint once a minute. */ for (;; sleep(60)) { printf("체크 포인트 했다.\n"); ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0); if (ret != 0) { dbenv->err(dbenv, ret, "checkpoint thread"); exit (1); } } /* NOTREACHED */ } static void * logfile_thread(void *arg) { int ret; DB_ENV *dbenv; char buffer[1020]; char **file, **list; dbenv = arg; dbenv->errx(dbenv, "Log file removal thread: %lu", (u_long)pthread_self()); /* Check once every 5 minutes. */ for (;; sleep(300)) { /* Get the list of log files. */ ret = dbenv->log_archive(dbenv, &list, DB_ARCH_ABS); if (ret != 0) { dbenv->err(dbenv, ret, "DB_ENV->log_archive"); exit (1); } if (list != NULL) { for (file = list; *file != NULL; ++file) { ret = remove(*file); if (ret != 0) { dbenv->err(dbenv, ret, "remove %s", *file); exit (1); } } free(list); } } /* NOTREACHED */ } static void close_tid_handler(int signum) { int ret; ret = well_tid->abort(well_tid); printf("SIGNAL [%d]\n", ret); ret = well_pwell->dbh->close(well_pwell->dbh, 0); printf("DB CLOSE [%d]\n", ret); ret = well_pwell->dbenv->close(well_pwell->dbenv, 0); printf("ENV CLOSE [%d]\n", ret); }
Forums:
시그널을 블럭 시키면..
작업의 안전성 보장을 위함이라면,
시그널을 블럭시키면 어떨까요?
울랄라~ 호기심 천국~!!
http://www.ezdoum.com
버클리 디비를 여러개의 프로세스에서..접근..
버클리 디비를 여러개의 프로세스에서 접근하여 사용하다가.
한개의 프로세스가 죽을 경우(트랜잭션 중에)
다른 프로세스들까지 멈추는
슬픈 현상을 없애고 싶어서 그러는 것이거든요..
시그날을 방지 해도..흑흑흑..
주로 죽는 것이...
주로 죽는 상황이 메모리 침범이나 작업중단, 등등이라고 볼때,
시그널을 주게 됩니다.
그럼 시그널 핸들러에서 트랜젝션이 진행중이라면
끝나기 전에 롤백 처리 하면 될듯한데요...
울랄라~ 호기심 천국~!!
http://www.ezdoum.com
감사 합니다.
:)
최씨(?) 님 정말 감사 합니다.
댓글 달기