상위 목록: 하위 목록: 작성 날짜: 읽는 데 24 분 소요

프로세스 기반 병렬 처리(Multi Processing)

프로세스 기반 병렬 처리(Multi Processing)쓰레딩(threading) 모듈과 비슷한 API를 활용하여 프로세스 스포닝(Process Spawning)을 지원하는 패키지입니다.

프로세스(Process)는 프로그램을 메모리 상에서 실행중인 작업을 의미합니다. 그러므로, 멀티 프로세싱은 하나 이상의 프로세스들을 동시에 처리하는 것을 의미합니다.

대용량 데이터를 처리하는 과정이나 데이터를 분배하여 동시에 처리하고자 할 때 주로 활용합니다.

멀티 프로세싱은 다수의 프로세스로 처리하므로 안전성이 높지만, 각각 독립된 메모리 영역을 갖고 있어 작업량 많을 수록 오버헤드(Overhead)가 발생할 수 있습니다.

  • Tip : 프로세스 스포닝(Process Spawning)이란 부모 프로세스(Parent Proecess)가 운영 체제에 요청해 새로운 자식 프로세스(Child Process)를 만들어내는 과정입니다.

  • Tip : 스레드(Thread)프로세스(Process) 안에서 실행되는 여러 흐름 단위를 의미합니다.



프로세스(Process)

import os
import time
from multiprocessing import Process, freeze_support


def task(idx, count):
    print(f"PID : {os.getpid()}")
    logic = sum([i ** 2 for i in range(count)])
    return idx, logic


if __name__ == "__main__":
    freeze_support()

    job = [("첫 번째", 10 ** 7), ("두 번째", 10 ** 7), ("세 번째", 10 ** 7), ("네 번째", 10 ** 7)]

    start = time.time()
    
    process = []
    for idx, count in job:
        p = Process(target=task, args=(idx, count))
        p.start()
        process.append(p)

    for p in process:
        p.join()

    print(f"End Time : {time.time() - start}s")

    start = time.time()

    for idx, count in job:
        task(idx, count)

    print(f"End Time : {time.time() - start}s")
결과
PID : 27792
PID : 28488
PID : 26248
PID : 26508
End Time : 7.55154824256897s
PID : 24800
PID : 24800
PID : 24800
PID : 24800
End Time : 13.973652124404907s

프로세스(Process) 클래스는 Process 객체를 생성한 후 start() 메서드를 호출해서 스폰합니다.

이후 각 프로세스는 join() 메서드를 통해 자식 프로세스가 종료될 때까지 대기합니다.

각 프로세스마다 ID가 존재하므로, process 목록을 통해 프로세스가 종료될 때 까지 대기하기 위해 join() 메서드를 호출합니다.

처리해야하는 연산량이 많은 경우, 프로세스 클래스를 통해 병렬 처리를 진행할 수 있습니다.

프로세스(Process)는 각 작업마다 새로운 프로세스가 할당되어 작업을 처리합니다.

  • Tip : Windows 환경에서는 freeze_support()를 통해 프로세스 개체에 대한 코드를 실행할 수 있게 설정합니다.

  • Tip : os.getpid()를 통해 프로세스마다 서로 다른 PID 값을 가진 프로세스가 실행되는 것을 확인할 수 있습니다.



풀(Pool)

import os
import time
from multiprocessing import Pool, freeze_support


def task(pairs):
    print(f"PID : {os.getpid()}")
    idx, count = pairs
    logic = sum([i ** 2 for i in range(count)])
    return idx, logic


if __name__ == "__main__":
    freeze_support()
    job = [("첫 번째", 10 ** 7), ("두 번째", 10 ** 7), ("세 번째", 10 ** 7), ("네 번째", 10 ** 7)]

    start = time.time()

    p = Pool(processes=2)
    result = p.map(task, job)

    print(result)
    print(f"End Time : {time.time() - start}s")

    start = time.time()

    result = [task(j) for j in job]

    print(result)
    print(f"End Time : {time.time() - start}s")
결과
PID : 23736
PID : 27140
PID : 23736
PID : 27140
[(‘첫 번째’, 333333283333335000000), (‘두 번째’, 333333283333335000000), (‘세 번째’, 333333283333335000000), (‘네 번째’, 333333283333335000000)]
End Time : 8.848436832427979s
PID : 27316
PID : 27316
PID : 27316
PID : 27316
[(‘첫 번째’, 333333283333335000000), (‘두 번째’, 333333283333335000000), (‘세 번째’, 333333283333335000000), (‘네 번째’, 333333283333335000000)]
End Time : 13.524287700653076s

풀(Pool) 객체는 여러 입력 값에 걸쳐 함수의 실행을 병렬 처리하고 입력 데이터를 프로세스에 분산시킵니다.

풀의 인스턴스를 생성하고 2개의 작업자를 생성합니다.

map() 메서드를 통해 실행하려는 함수반복 가능한 객체를 입력하여 각 프로세스에 매핑합니다.

풀(Pool)은 사전에 프로세스(processes)의 개수를 설정하여 반복합니다.

  • Tip : 프로세스의 개수가 2개라면, 첫 번째세 번째 작업은 같은 PID를 갖습니다.



병렬(Parallel)

import os
import time
from joblib import Parallel, delayed


def task(idx, count):
    print(f"PID : {os.getpid()}")
    logic = sum([i ** 2 for i in range(count)])
    return idx, logic


job = [("첫 번째", 10 ** 7), ("두 번째", 10 ** 7), ("세 번째", 10 ** 7), ("네 번째", 10 ** 7)]

start = time.time()

result = Parallel(n_jobs=4)(delayed(task)(idx, count) for idx, count in job)

print(result)
print(f"End Time : {time.time() - start}s")

start = time.time()

result = [task(*j) for j in job]

print(result)
print(f"End Time : {time.time() - start}s")
결과
PID : 21636
PID : 28004
PID : 4808
PID : 1604
[(‘첫 번째’, 333333283333335000000), (‘두 번째’, 333333283333335000000), (‘세 번째’, 333333283333335000000), (‘네 번째’, 333333283333335000000)]
End Time : 6.701249122619629s
PID : 25380
PID : 25380
PID : 25380
PID : 25380
[(‘첫 번째’, 333333283333335000000), (‘두 번째’, 333333283333335000000), (‘세 번째’, 333333283333335000000), (‘네 번째’, 333333283333335000000)]
End Time : 13.458436727523804s

joblib 라이브러리는 multiprocessing 모듈과 동일한 기능을 포함하고 있습니다.

주요한 차이점으로는 매개 변수를 조금 더 쉽게 전달할 수 있으며, 대규모 Numpy 기반 데이터 구조에 대해 작업자 프로세스와 공유 메모리를 효율적으로 사용할 수 있습니다.

Parallel(n_jobs=프로세스 개수)(delayed(함수)(인수))의 구조로 사용할 수 있습니다.

Parallel 클래스는 병렬 매핑을 위한 클래스입니다. 백엔드(backend)를 설정하거나, 배치 크기(batch_size) 등을 추가로 설정할 수 있습니다.

delayed 메서드는 함수의 인수를 캡처하는 데 사용되는 데코레이터입니다.

대규모 작업에서는 joblib를 활용하는 것이 효율적인 방법입니다.

  • Tip : 멀티 프로세싱은 여러 프로세스를 생성하는 데, 시간이 소요되므로 비교적 작업량이 적은 경우 단일 프로세스가 더 빠를 수 있습니다.

댓글 남기기