버클리 디비 사용시 질문입니다. ( 멀티 프로세스 + 트랜잭션 모

febace의 이미지

버클리 디비 4.1을 이용하여

트랜잭션을 사용하여 간단한 매핑 라이브러리를 만들어서

테스트 중입니다.

그런데 문제가 발생합니다.

트랜잭션으로 put을 감싸고 데이타를 입력 합니다. (물론 버클리 디비에)

그 프로그램을 실행 한후, ^C로 INT시그날로 프로세스를 죽이면,

다시 그 프로그램을 실행 해도 실행이 되지 않습니다.

db_recover를 실행 한후 하면 되긴 하지만,

db_recover이 필요 없게 이쁘게 종료 하는 법은 없을까요?

테스트 코드는 아래와 같습니다.

sleep을 주석 처리 하면 ^C를 하는 순간에 보통 sleep 중이라서
멈추는 문제가 없는데, sleep를 없에면 ^C를 하는 순간 트랜잭션 - put 중이라서 문제가 생기는 듯 합니다.

문제는 이 테스트 프로그램을 동시에 두개나 세개를 실행 할때 더욱 커집니다.

한개를 ^C( sleep 제거시 ) 하여 죽이면, 다른 프로세스도 멈춥니다.

어떻게 하면 이 문제를 해결 할수가 있을까요? 며칠째 고민입니다.

테스트 프로그램.


int main(int argc, char *argv[])
{
        int ret, i;
        char buffer[1024] = {0,};

        struct timeval tp_s;
        struct timeval tp_e;

        /* mandatory first */
        well_set_dir(&well, "DIRECTORY");
        
        /* mandatory second */
        ret = well_make_db("TEST", &well);
        if (ret < 0) {
                printf("make db error\n");
                exit (1);
        }

        /* optional but being needed for stablity*/
        //ret = well_make_monitor(&well);
        //if (ret < 0) {
        //      printf("launch monitor error\n");
        //      exit (2);
        //}

        /* for transaction starvation */
        well_set_txn_signal(SIGSTOP);
        well_set_txn_signal(SIGKILL);
        well_set_txn_signal(SIGQUIT);
        well_set_txn_signal(SIGINT);
        /* 
         * all of preparation finished 
         */
        if (argc != 3) {
                printf("$ %s KEY DATA\n", argv[0]);
                exit (4);
        }

        for (i = 0;;) { 
                i++;

                memset(&tp_s, 0, sizeof(tp_s));
                memset(&tp_e, 0, sizeof(tp_e));

                memset(buffer, 0, sizeof(buffer));
                sprintf(buffer, "%s%d", argv[1], i);

                gettimeofday(&tp_s, NULL);
                ret = well_insert(&well, buffer, argv[2], strlen(argv[2]), 0);
                if (ret < 0) {
                        printf("insert error\n");
                        exit (3);
                }
                gettimeofday(&tp_e, NULL);

                printf("KEY[%s] TIME[%d]\n", buffer, tp_e.tv_usec - tp_s.tv_usec);
        //      sleep(1);
        }

}

간단한 매핑 라이브러리


DB_TXN *well_tid = NULL;
well_t *well_pwell = NULL;

void
well_set_dir(well_t *pwell, const char *dir)
{
        memset(pwell, 0, sizeof(well_t));
        sprintf(pwell->dir, dir);
}


/*
 * You have to call this before using Well.
 */
int 
well_make_db(const char *tblname, well_t *pwell)
{
        int ret;        

        ret = env_dir_create(pwell->dir);
        if (ret < 0)
                return ret;

        ret = env_open(&(pwell->dbenv), pwell->dir);
        if (ret < 0)
                return ret;

        ret = db_open(pwell->dbenv, &(pwell->dbh), tblname, 0);
        if (ret < 0)
                return ret;

        well_pwell = pwell;

        return (0);
}

/*
 * You have to call this, if you want to erase
 * dump log files and synchronisation automatically
 */
int
well_make_monitor(const well_t *pwell)
{
        int ret;
        thread_t ptid1, ptid2;

        /* Start a checkpoint thread. */
        ret = pthread_create(&ptid1, NULL, checkpoint_thread, (void *)pwell->dbenv);
        if (ret != 0) {
                fprintf(stderr, 
                    "txnapp: failed spawning checkpoint thread: %s\n", 
                    strerror(ret));
                return (-10);
        }
        /* Start a logfile removal thread. */
        ret = pthread_create(&ptid2, NULL, logfile_thread, (void *)pwell->dbenv);
        if (ret != 0) {
                fprintf(stderr, 
                    "txnapp: failed spawning log file removal thread: %s\n", 
                    strerror(ret));
                return (-20);
        }
        return (0);
}

/*
 * for multiple process transation
 * starvaion problem 
 */
void
well_set_txn_signal(int sig)
{
        int ret;
        ret = signal(sig, close_tid_handler);
        printf("well_set_txn_signal [%d]\n", ret);
}

/*
 * well transaction insert
 * ret < 0 : error
 *     = 0 : success 
 *     > 0 : duplicated data to insert
 */
int 
well_insert(const well_t *pwell, const char *pkey, const void *pdata, int size,  int limit)
{
        int ret;
        char buffer[1024] = {0,};

        ret = well_select(pwell, pkey, buffer, 0, 0);
        switch (ret) {
        case 1:
                break;
        case 0:
                return (1);
        default:
                return (-10);
        }

        ret = well_update(pwell, pkey, pdata, size, limit);
        if (ret < 0)
                return (-20);

        return (0);
}

/* 
 * well select not transaction 
 * ret < 0 : error
 *     = 0 : success and fill from db
 *     > 0 : success but no data
 */
int 
well_select(const well_t *pwell, const char *pkey, void *pdata, int size, int limit)
{
        int ret, cnt = 0;
        DBT key, data;

        memset(&key, 0, sizeof(key));
        key.data = (char *)pkey;
        key.size = strlen(pkey);

        memset(&data, 0, sizeof(data));
        data.flags = DB_DBT_MALLOC;

select_retry:
        switch (ret = pwell->dbh->get(pwell->dbh, NULL, &key, &data, 0)) {
        case 0:
                ret = 0;
                break;
        case DB_NOTFOUND:
                ret = 1;
                break;
        case DB_LOCK_DEADLOCK:
                cnt++;
                if (cnt < limit)
                        goto select_retry;
        default:
                pwell->dbenv->err(pwell->dbenv, ret, "select get: %s/%s", pkey, pdata);
                return (-10);
        }

        /* data copy size problem */
        if (ret == 0) {
                memset(pdata, 0, size);
                if (data.size < size) 
                        size = data.size;
                memcpy(pdata, data.data, size);
        }

        return (ret);
}

/*
 * well transaction update 
 */
int 
well_update(const well_t *pwell, const char *pkey, const void *pdata, int size, int limit)
{
        int ret, cnt = 0;
        DBT key, data;
        DB_TXN *tid;

        memset(&key, 0, sizeof(key));
        key.data = (char *)pkey;
        key.size = strlen(pkey);

        memset(&data, 0, sizeof(data));
        data.data = (char *)pdata;
        data.size = size;

        ret = pwell->dbenv->txn_begin(pwell->dbenv, NULL, &tid, 0);
        if (ret != 0) {
                pwell->dbenv->err(pwell->dbenv, ret, "update txn_begin");
                return (-10);
        }
        well_tid = tid;

update_retry:
        switch (ret = pwell->dbh->put(pwell->dbh, tid, &key, &data, 0)) {
        case 0:
                break;
        case DB_LOCK_DEADLOCK:
                cnt++;
                if (cnt < limit)
                        goto update_retry;
        default:
                pwell->dbenv->err(pwell->dbenv, ret, "update put: %s/%s", pkey, pdata);
                ret = tid->abort(tid);
                if (ret != 0) {
                        pwell->dbenv->err(pwell->dbenv, ret, "update abort");
                        return (-20);
                } else
                        return (-30);
        }

        ret = tid->commit(tid, 0);
        if (ret != 0) {
                pwell->dbenv->err(pwell->dbenv, ret, "update commit");
                return (-40);
        }

        return (0);

}

/* 
 * well transaction delete
 */
int
well_delete(const well_t *pwell, const char *pkey, int limit)
{
        int ret, cnt = 0;
        DBT key;
        DB_TXN *tid;

        memset(&key, 0, sizeof(key));
        key.data = (char *)pkey;
        key.size = strlen(pkey);

        ret = pwell->dbenv->txn_begin(pwell->dbenv, NULL, &tid, 0);
        if (ret != 0) {
                pwell->dbenv->err(pwell->dbenv, ret, "delete txn_begin");
                return (-10);
        }
        well_tid = tid;

del_retry:
        ret = pwell->dbh->del(pwell->dbh, tid, &key, 0);
        switch (ret) {
        case 0:
        case DB_NOTFOUND:
                break;
        case DB_LOCK_DEADLOCK:
                if (++cnt < limit)
                        goto del_retry;
        default:
                pwell->dbenv->err(pwell->dbenv, ret, "delete: %s", pkey);

                ret = tid->abort(tid);
                if (ret != 0) {
                        pwell->dbenv->err(pwell->dbenv, ret, "delete abort");
                        return (-20);
                } else 
                        return (-30);
        }
        
        ret = tid->commit(tid, 0);
        if (ret != 0) {
                pwell->dbenv->err(pwell->dbenv, ret, "delete commit");
                return (-40);
        }

        return (0);
}

/*
 * database environment is based on env directory
 */
static int
env_dir_create(const char *dir)
{
        int ret;
        struct stat sb;

        if (stat(dir, &sb) == 0)
                return (0);

        ret = mkdir(dir, S_IRWXU);
        if (ret != 0) {
                fprintf(stderr, "txnapp: mkdir: %s: %s\n", dir, strerror(errno));
                return (-20);
        }

        return (0);
}

/*
 * for multiple process operation.
 * database enviroment is mandatory
 */
static int
env_open(DB_ENV **dbenvp, const char *dir)
{
        DB_ENV *dbenv;
        int ret;

        ret = db_env_create(&dbenv, 0);
        if (ret != 0) {
                fprintf(stderr, "well: db_env_create: %s\n", db_strerror(ret));
                return (-10);
        }

        /* setting prefix of error message */
        dbenv->set_errpfx(dbenv, "well");

        ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT);
        if (ret != 0) {
                dbenv->err(dbenv, ret, "set_lk_detect: DB_LOCK_DEFAULT");
                return (-20);
        }

        ret = dbenv->open(dbenv, dir, DB_CREATE | 
                        DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | 
                        DB_INIT_TXN | /*DB_RECOVER |*/ DB_THREAD, 
                        S_IRUSR | S_IWUSR );
        if (ret != 0) {
                dbenv->err(dbenv, ret, "dbenv->open: %s", dir);
                return (-30);
        }

        *dbenvp = dbenv;

        return (0);
}

/* 
 * database open 
 * last dups param is for duplicated insert 
 */
static int 
db_open(DB_ENV *dbenv, DB **dbp, const char *name, int dups)
{
        DB *db;
        int ret;

        ret = db_create(&db, dbenv, 0);
        if (ret != 0) {
                dbenv->err(dbenv, ret, "db_create");
                return (-10);
        }

        ret = dups && db->set_flags(db, DB_DUP);
        if (ret != 0) {
                dbenv->err(dbenv, ret, "db->set_flags: DB_DUP");
                return (-20);
        }

        ret = db->open(db, NULL, name, NULL, DB_BTREE, 
                        DB_CREATE | DB_THREAD |  DB_AUTO_COMMIT, 
                        S_IRUSR | S_IWUSR);
        if (ret != 0) {
                dbenv->err(dbenv, ret, "db->open: %s", name);
                return (-30);
        }

        *dbp = db;

        return (0);
}

static void *
checkpoint_thread(void *arg)
{
        int ret;
        DB_ENV *dbenv;

        dbenv = arg;
        dbenv->errx(dbenv, "Checkpoint thread: %lu", (u_long)pthread_self());

        /* Checkpoint once a minute. */
        for (;; sleep(60)) {
                printf("체크 포인트 했다.\n");
                ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0);
                if (ret != 0) {
                        dbenv->err(dbenv, ret, "checkpoint thread");
                        exit (1);
                }
        }
        /* NOTREACHED */
}

static void *
logfile_thread(void *arg)
{
        int ret;
        DB_ENV *dbenv;
        char buffer[1020];
        char **file, **list;

        dbenv = arg;
        dbenv->errx(dbenv, "Log file removal thread: %lu", (u_long)pthread_self());

        /* Check once every 5 minutes. */
        for (;; sleep(300)) {
                /* Get the list of log files. */
                ret = dbenv->log_archive(dbenv, &list, DB_ARCH_ABS);
                if (ret != 0) {
                        dbenv->err(dbenv, ret, "DB_ENV->log_archive");
                        exit (1);
                }
                
                if (list != NULL) {
                        for (file = list; *file != NULL; ++file) {
                                ret = remove(*file);
                                if (ret != 0) {
                                        dbenv->err(dbenv, ret, "remove %s", *file);
                                        exit (1);
                                }
                        }
                        free(list);
                }

        }
        /* NOTREACHED */
}

static void 
close_tid_handler(int signum)
{
        int ret;
        ret = well_tid->abort(well_tid);
        printf("SIGNAL [%d]\n", ret);

        ret = well_pwell->dbh->close(well_pwell->dbh, 0);
        printf("DB CLOSE [%d]\n", ret);

        ret = well_pwell->dbenv->close(well_pwell->dbenv, 0);
        printf("ENV CLOSE [%d]\n", ret);


}

choissi의 이미지

작업의 안전성 보장을 위함이라면,
시그널을 블럭시키면 어떨까요?

울랄라~ 호기심 천국~!!
http://www.ezdoum.com

febace의 이미지

버클리 디비를 여러개의 프로세스에서 접근하여 사용하다가.
한개의 프로세스가 죽을 경우(트랜잭션 중에)

다른 프로세스들까지 멈추는
슬픈 현상을 없애고 싶어서 그러는 것이거든요..

시그날을 방지 해도..흑흑흑..

choissi의 이미지

주로 죽는 상황이 메모리 침범이나 작업중단, 등등이라고 볼때,
시그널을 주게 됩니다.

그럼 시그널 핸들러에서 트랜젝션이 진행중이라면
끝나기 전에 롤백 처리 하면 될듯한데요...

울랄라~ 호기심 천국~!!
http://www.ezdoum.com

febace의 이미지

:)
최씨(?) 님 정말 감사 합니다.

댓글 달기

Filtered HTML

  • 텍스트에 BBCode 태그를 사용할 수 있습니다. URL은 자동으로 링크 됩니다.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>
  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.

BBCode

  • 텍스트에 BBCode 태그를 사용할 수 있습니다. URL은 자동으로 링크 됩니다.
  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param>
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.

Textile

  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • You can use Textile markup to format text.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>

Markdown

  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • Quick Tips:
    • Two or more spaces at a line's end = Line break
    • Double returns = Paragraph
    • *Single asterisks* or _single underscores_ = Emphasis
    • **Double** or __double__ = Strong
    • This is [a link](http://the.link.example.com "The optional title text")
    For complete details on the Markdown syntax, see the Markdown documentation and Markdown Extra documentation for tables, footnotes, and more.
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>

Plain text

  • HTML 태그를 사용할 수 없습니다.
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.
  • 줄과 단락은 자동으로 분리됩니다.
댓글 첨부 파일
이 댓글에 이미지나 파일을 업로드 합니다.
파일 크기는 8 MB보다 작아야 합니다.
허용할 파일 형식: txt pdf doc xls gif jpg jpeg mp3 png rar zip.
CAPTCHA
이것은 자동으로 스팸을 올리는 것을 막기 위해서 제공됩니다.