[파이썬]파일을 한 행씩 읽어들이는 iterator에서 mutiprocess를 어떻게 사용할 수 있을까요?

익명 사용자의 이미지

저는 생명과학 전공자이고, 파이썬을 이용해 DNA 염기서열을 분석하고 있습니다.

DNA 서열이 분석되면 두 개의 대용량 text 파일로 저장이 됩니다. 이 두개의 파일에서 각 각 한줄씩 읽어서 (사실 엄밀히 말해서 4줄씩이 한 데이터를 구성하는데 이해를 쉽게 하기 위해 한 줄씩이라고 표현했습니다.) 분석을 해야 합니다.

다행인 것은 Biopython에서 FastqGeneralIterator라는 함수를 제공하는데, 이게 해당 파일에서 한 줄씩(엄밀히 4줄씩) 반환하는 iterator를 만들어 줍니다.

문제는 이게 속도가 느리다보니 (분석 해야 할 파일이 너무 커서) multithread를 구성해서 일을 하고 싶은데, iterator를 어떻게 쪼개서 할당해야 하는지 감이 잡히지 않습니다.

제가 아는한 데이터를 쪼개서 생성한 thread에 할당을 해줘야 하는데, iterator를 쪼개는 방법이 애매하네요. 참고로 파일 전체를 읽어들여서 쪼개기에는 파일이 너무 큽니다. (200만 라인 이상이 될 수도 있습니다.)

이해를 돕기 위해 코드의 일부를 보이겠습니다.

import gzip
from Bio.SeqIO.QualityIO import FastqGeneralIterator
# f1: 첫번째 파일
# f2: 두번째 파일
with gzip.open(f1, 'rt') as R1:  
    with gzip.open(f2, 'rt') as R2:
        for i, j in zip(FastqGeneralIterator(R1), FastqGeneralIterator(R2)):
	    do_something()

작성자의 이미지

만약 불가하다면, 반드시 FastqGeneralIterator를 사용할 필요는 없습니다.

파일을 어떻게 쪼갤지에 대해서도 감이 잡히지 않습니다. 참고로 두 개일 파일에서 행을 읽어들이는 순서는 중요치 않으나, 2번째 행을 첫 파일에서 읽어들였다면, 두 번째 파일에서도 2번째 행을 읽어야 합니다. 각 행의 쌍이 중요합니다.

파이썬3의 이미지

실수로 잘못 건들었다가 정확도를 해치면 지도교수의 불벼락이 떨어질까 두렵네요;;;

[크롬북에서 적었어요]

익명 사용자의 이미지

python 은 멀티코어 CPU 를 활용하려면 Multiprocessing 모듈을 사용해야 되잖아요.
https://monkey3199.github.io/develop/python/2018/12/04/python-pararrel.html

filesplit 모듈을 이용해서 파일을 분리한 다음에 위 코드를 각 프로세스에서 돌리면 될것같은데요?

swish95의 이미지

답이 될지 모르지만 일단 기술해봅니다.
일단 텍스트 파일이 Sequential 이므로 파일을 분리하지 않고서는 동시작업이 무의미합니다.
만약 f1 의 400 번째 줄을 작업하고자 한다면 f2 도 400 번째까지 스킵하는 동작이 일어나야 되고 일단 그런 동작을 하게 구성하는건 속도 개선의 의미가 없습니다.

위의 분이 파일 분리를 이야기 하셨는데 각각의 f1,f2 를 4의 배수로 분리 시키기만 하면 동작은 개별 파일로 작업하게 되지만 python 은 별도의 모듈을 사용하지 않는한 멀티 프로세싱이 힘든 언어입니다. - 이건 제가 작업했을 때 기준이고 python3 는 어떻게 되는지는 정확히 모릅니다.
그래서 제가 제한하는 방법은
1. 파일 분리 작업
1-1. f1 을 full-scan 하면서 하위 디렉토리 ex)f1-sub 등에 4의 배수로 적절히 크기를 조정하여 분리해 둡니다.
1-2. f2 도 1-1 과 같은 방법으로 하위 파일을 시퀀셜하게 만듭니다.
2. 별도의 프로그램을 작성해서 f1의 분리된 파일과 f2의 분리된 파일이 생성되는 시점 - 생성되는 중에 작업하지 않도록 주의 - 에 각각의 파일을 처리하기 위한 프로세스를 os.system 또는 subprocess 등으로 fork 시킵니다. 물론 이때 각파일을 처리하는 프로그램을 만들어 둬야 합니다.
3. 각각의 프로세스는 해당 파일만 작업하고 특정한 형식으로 결과치를 파일또는 유효한 장치로 내보냅니다.

이때 주의할점은
2번의 작업시 너무 많은 프로세스가 동시에 뜨지 않도록 적절히 분배해야 하고 결과파일의 생성유무등을 통해 관리하도록 하는겁니다.

위에 기술한 방법은 hadoop 이전에 빅데이타를 처리하기 위해 제가 취했던 방법입니다.
좀 복잡해보일지도 모르지만 각 파일의 처리가 제대로 되는지 확인할수 있고 특정 라인의 비교가 이상할경우 해당 부분만 테스트가 용이하기 때문입니다.
또한 이런식으로 프로세스 포크를 이용하면 os 자체의 멀티프로세싱을 이용하므로 파이선의 제약과는 상관이 없습니다.

요약하자면 다음의 세가지 프로세스로 요약됩니다.
1. 파일을 분리하는 프로그램 - 이건 N 개에 대해 돌리기만 하면 되고 4의 배수로 분리 시켜 야 될듯 합니다.
2. 특정 디렉토리에 해당하는 패턴의 파일을 감지하여 비교 프로세스를 fork 시키고 프로세스가 과도하게 생성되지 않도록 감시하는 프로그램
3. 파라미터를 통해 자기가 해당하는 파일만 비교하는 프로그램 - 여기는 FastqGeneralIterator 의 성능을 테스트 하여 분리될 파일의 크기및 프로세스 갯수를 정합니다.

도움이 되셨는지 모르지만 원래대로 하자면 님이 말한건 하둡과 같은 빅데이타 플랫폼을 통해 처리하는게 바람직해 보입니다. ^^

------------------------------------------------------------
ProgrammingHolic

jick의 이미지

대충 생각하기로는, multiprocessing + queue를 사용해서, 일단 queue에서 라인을 하나씩 읽은 다음에 프로세스해서 결과를 돌려주는 함수를 만든 다음 자식 프로세스로 돌리고, 메인 프로세스에서는 위의 do_something() 부분에서 대신 queue.put() 해주면 되지 않을까 싶은데요.

이렇게 하면 파일을 나눌 필요도 없죠. 다만 한 가지 심각한 문제는 각 라인마다 처리 시간이 다르니까 결과가 라인 순서대로 나오는 게 아니라 처리 순서대로 나온다는 점입니다. (예를 들면 라인 1, 2, 3, 4, 5를 순서대로 queue에 넣었는데 결과는 3, 2, 4, 5, 1 뭐 이렇게 나올 수 있겠죠.) 이 문제를 해결하는 게 아주 복잡한 건 아니지만 좀 머리를 써야 하는데, 제가 생각하기에 가장 간단한 방법은 모든 작업과 결과물에 줄번호를 같이 넣어주고, 결과 queue를 읽는 자식 프로세스 혹은 쓰레드를 따로 만들어서, 거기서 계속 "queue에 있는 작업 결과를 다 읽는다 -> 다음에 출력해야 할 행번호가 있는가? -> 있으면 출력, 아니면 일단 다 버퍼에 넣어둔다 -> 반복" 하는 방법이 있겠습니다.

hirameki의 이미지

운영체제가 윈도우즈라면 파일을 쪼갠다던가해서 처리하는 방법보다는 SSD로 바꾸시는게 나을것 같습니다.
동시파일 처리시 디스크 억세스로 인한 오버헤드가 오히려 클수도 있습니다.

운영체제가 리눅스라면 파일을 두 파일을 각각 4의 배수인 줄 수로 split등으로 쪼갠다음, 프로세스를 복수개 실행한다음 결과를 순서대로 append해서 합치는 것은 어떨까싶네요.
또, 윗분들께서 이야기 하셨지만 각 파일에서 줄을 읽어오는 처리가 순서를 지키려면 기본적으로 1개기 떄문에, 읽어오는 속도에 한계가 있으므로 스레드수를 올려도 읽기보다 처리가 빨라지면 읽기를 기다리게 되므로 일정이상 빨라지지 않으니 추가적으로 리눅스도 SSD로 바꾸는 방법도 가능하겠습니다.

또다른 방법으로는 일시적으로 DNA서열 출력을 두 파일로 출력하지말고 하나로 나오게 한다음 파이프로 연결해서 (예를 들어 DNA서열을 두 파일로 출력하지 않고 1번 파일 4줄 2번파일 4줄이 연속적으로 출력해서 8줄이 순서대로 나오게 한다음 프로그램에서는 각각 4줄씩 2개의 데이터를 받고 처리하도록 변경) DNA서열 분석부터 즉시 결과를 뽑도록.. 하면 파일 출력후 다시 읽어서 처리해야 하는 수고를 덜을 수 있을것 같습니다... 만약 DNA서열 파일출력도 필요하다고 하면, 이 서열 분석 프로그램에서 파일1, 파일2, 분석결과 이렇게 3개로 분할출력하면 시간절약이 되지 않을까요?

--

LLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLL
Hirameki --X-
Mail : hirameki_krjp@yahoo.co.jp
God is not customer center. Do it yourself
LLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLL

작성자의 이미지

조언 해주신 모든 분들, 고맙습니다.

어느정도 optimize를 하니 생각보다 시간이 오래 걸리지는 않았습니다.

처음에 코드를 짰을 때 매번 리스트를 ndarray로 변환하고 각 array를 np.vstack로 합쳤었는데, 아마 이런 부분이 오히려 파일을 읽어들이는 부분보다 악영향을 미쳤었던 것 같습니다.

지금은 리스트 형태로 append한 후, 한 번에 array로 변환하고 있습니다. 이 부분도 시간이 좀 걸리기는 하지만 못 참을 정도는 아니네요.

import numpy as np
import matplotlib.pyplot as plt 
from Bio.SeqIO.QualityIO import FastqGeneralIterator
import gzip
import time
 
 
def get_Qscores(quality_values):
    return [ord(i) - 33 for i in quality_values]
 
 
start = time.time()
Q_scores_array = []
with gzip.open("./TEST.fastq.gz", "rt") as f:
    for s in FastqGeneralIterator(f):
        #print(s[1], len(s[1]))
        Q_scores = get_Qscores(s[2]) + [np.nan]*(151 - len(s[2]))
        Q_scores_array.append(Q_scores)
 
array_time = time.time() - start
print(f"It took {array_time} secs to get the list of Q scores")
 
print(len(Q_scores_array))
 
start = time.time()
Q_scores_array = np.array(Q_scores_array)
array_time = time.time() - start
print(f"It took {array_time} secs to get the array of Q scores")
 
print(Q_scores_array.shape)
 
start = time.time()
plt.boxplot([Q_scores_array[:, i][~np.isnan(Q_scores_array[:, i])] for i in range(0, 151, 5)], showfliers=False)
plot_time = time.time() - start
print(f"It took {plot_time} secs to plot boxplots")
 
plt.show()

아래는 출력결과 입니다.

It took 7.480452060699463 secs to get the list of Q scores
491508
It took 4.518349647521973 secs to get the array of Q scores
(491508, 151)
It took 0.866363525390625 secs to plot boxplots

record가 49만개이니 약 49*4, 약 200만 줄 파일을 읽어서 plotting하는데 13초. 뭐 나쁘지 않습니다. 그래도 어떻게 간편하고 좀 더 빠르게 할 수 있을 지 고민해봐야겠습니다.

참고로 전 프로그래밍을 독학해왔습니다. 그래서 어떻게 더 좋은 코드를 작성해야 하는지 잘 모릅니다.
제 코드에 어떤 코멘트도 환영합니다.

작성자의 이미지

아래와 같이 코드를 수정해 봤습니다. 그런데 예상과는 다르게 작동하네요.

  3 import numpy as np
  4 import matplotlib.pyplot as plt
  5 from Bio.SeqIO.QualityIO import FastqGeneralIterator
  6 import gzip
  7 import time
  8 import multiprocessing
  9 import concurrent.futures
 10 
 11 
 12 def get_Qscores(quality_values):
 13     tmp = [ord(i) - 33 for i in quality_values[2]] + [np.NaN]*(151 - len(quality_values[2]))
 14     Q_scores_array.append(tmp)
 15 
 16 
 17 
 18 if __name__ == "__main__":
 19     start = time.time()
 20     manager = multiprocessing.Manager()
 21     Q_scores_array = manager.list()
 22     with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
 23         with gzip.open("./TEST.fastq.gz", "rt") as f:
 24             result = list(executor.map(get_Qscores, FastqGeneralIterator(f)))
 25  
 26     array_time = time.time() - start
 27     print(f"It took {array_time} secs to get the list of Q scores")
 28  
 29     print(len(Q_scores_array))

결과는 아래와 같습니다.

It took 93.72561740875244 secs to get the list of Q scores
491508

multiprocess를 사용했음에도 싱글프로세스를 사용했을 때보다 10배 이상 늦어졌습니다.
참고로 라인 24의 FastqGeneralIterator(f)는 일종의 리스트를 원소로 하는 iterator를 반환합니다. 그리고 저는 그 리스트의 세 번째(index로는 2번)을 갖고 무언가를 하려고 합니다.는

제가 하려고 하는 일은 싱글 프로세스가 파일을 읽어서 하던 일(바로 이전 글에 작성했던 코드)을, 5개의 프로세스가 하나의 파일에서 읽어 들여서 하나의 global list인 Q_scores_array 에 데이터를 append하는 과정을 통해 시간적으로 단축시키고 싶었습니다.

아래는 동작하는 동안 top 했을 때 결과 중 상위에 있는 프로세스입니다. (참고로 위에서 실행한 파일이 fastqc.py입니다.

어떠한 코멘트도 환영합니다.

top - 22:13:38 up 56 min,  1 user,  load average: 2.42, 1.25, 0.69
Tasks: 338 total,   3 running, 264 sleeping,   0 stopped,   0 zombie
%Cpu(s): 30.5 us,  8.4 sy,  0.0 ni, 61.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 16071572 total, 10451268 free,  3380516 used,  2239788 buff/cache
KiB Swap:  7812092 total,  7812092 free,        0 used. 11832388 avail Mem 
 
  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                                                                                                      
11179 heodh     20   0 1267548 556032  39996 R 115.6  3.5   1:46.89 fastqc.py                                                                                                                                                    
11191 heodh     20   0 1450416 715308   8392 S  60.9  4.5   0:32.30 fastqc.py                                                                                                                                                    
11198 heodh     20   0  669744  52564   5472 S  23.5  0.3   0:12.94 fastqc.py                                                                                                                                                    
11202 heodh     20   0  669744  52568   5472 S  23.5  0.3   0:12.91 fastqc.py                                                                                                                                                    
11196 heodh     20   0  669744  52480   5432 S  23.2  0.3   0:12.93 fastqc.py                                                                                                                                                    
11200 heodh     20   0  669744  52564   5472 S  23.2  0.3   0:12.89 fastqc.py                                                                                                                                                    
11204 heodh     20   0  669744  52572   5472 R  22.8  0.3   0:12.91 fastqc.py    
jick의 이미지

음, 보여주신 코드에 따르면 각 줄을 읽어서 복잡한 계산을 하는 게 아니라 그냥 리스트로 변환하는 것뿐인데, 그렇다면 multiprocess를 썼을 때 느려지는 게 이해가 갑니다. 자식 프로세스에서 딱히 해줄 일이 없고 그냥 데이터를 자식 프로세스로 보냈다가 다시 가져오는 오버헤드가 더 큰 거겠죠.

저보고 하라면, 리스트를 만드는 과정을 아예 스킵하고 적당한 크기의 numpy array를 만들어서 그냥 in-place update를 하는 식으로 하면 어떨까 싶습니다만, 예를 들면 이런 식으로?

buf = np.full(151 * 1000, np.nan)
for idx in range(1000):
    # read a line into quality_values
    start = idx * 151
    end = idx * 151 + len(quality_values[2])
    buf[start:end] = [ord(i) - 33 for i in quality_values[2]]

이렇게 대략 1000줄씩 읽은다음 마지막에 합치면... 어쩌면 빨라질 수 있을지도요?

swish95의 이미지

워커를 다섯개로 분리하기만 했네요
각각의 프로세스들이 최적의 성능을 발휘할수 있게 분배 하는게 없습니다.
그렇게 되면 쓸데없는 부하만 더 늘리는 꼴이죠

------------------------------------------------------------
ProgrammingHolic

댓글 달기

Filtered HTML

  • 텍스트에 BBCode 태그를 사용할 수 있습니다. URL은 자동으로 링크 됩니다.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>
  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.

BBCode

  • 텍스트에 BBCode 태그를 사용할 수 있습니다. URL은 자동으로 링크 됩니다.
  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param>
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.

Textile

  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • You can use Textile markup to format text.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>

Markdown

  • 다음 태그를 이용하여 소스 코드 구문 강조를 할 수 있습니다: <code>, <blockcode>, <apache>, <applescript>, <autoconf>, <awk>, <bash>, <c>, <cpp>, <css>, <diff>, <drupal5>, <drupal6>, <gdb>, <html>, <html5>, <java>, <javascript>, <ldif>, <lua>, <make>, <mysql>, <perl>, <perl6>, <php>, <pgsql>, <proftpd>, <python>, <reg>, <spec>, <ruby>. 지원하는 태그 형식: <foo>, [foo].
  • Quick Tips:
    • Two or more spaces at a line's end = Line break
    • Double returns = Paragraph
    • *Single asterisks* or _single underscores_ = Emphasis
    • **Double** or __double__ = Strong
    • This is [a link](http://the.link.example.com "The optional title text")
    For complete details on the Markdown syntax, see the Markdown documentation and Markdown Extra documentation for tables, footnotes, and more.
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.
  • 사용할 수 있는 HTML 태그: <p><div><span><br><a><em><strong><del><ins><b><i><u><s><pre><code><cite><blockquote><ul><ol><li><dl><dt><dd><table><tr><td><th><thead><tbody><h1><h2><h3><h4><h5><h6><img><embed><object><param><hr>

Plain text

  • HTML 태그를 사용할 수 없습니다.
  • web 주소와/이메일 주소를 클릭할 수 있는 링크로 자동으로 바꿉니다.
  • 줄과 단락은 자동으로 분리됩니다.
댓글 첨부 파일
이 댓글에 이미지나 파일을 업로드 합니다.
파일 크기는 8 MB보다 작아야 합니다.
허용할 파일 형식: txt pdf doc xls gif jpg jpeg mp3 png rar zip.
CAPTCHA
이것은 자동으로 스팸을 올리는 것을 막기 위해서 제공됩니다.