본문 바로가기

Computer/Python

concurrent.futures를 이용한 병렬화

반응형

매우 많은 연산량을 요구하는 파이썬 프로그램에 대해서는 병렬화 없이 결국 성능의 벽에 부딪치게 됩니다. 멀티 스레드를 사용하고자 하더라도 파이썬의 GIL (Global Interpreter Lock)으로 인해 파이썬 스레드는 진정한 병렬 실행이 불가능하므로 하나의 계산을 여러 작업으로 나누는 작업은 파이썬에서는 불필요합니다. 그렇다면 성능에 결정적인 영향을 미치는 부분을 하부 기계에 가까운 C 언어를 사용한 확장 모듈로 작성할 수 있겠지만 여러 부분을 C 언어로 바꾸어야 해서 많은 비용이 들고 포팅하는 과정에서 무수히 많은 버그가 발생할 수 있습니다.

파이썬에서 진정한 병렬화를 통해 속도 향상을 시키기 위해서는 파이썬 3.2 버젼부터 생성된 concurrent.futures 모듈의 multiprocessing 내장 모듈을 사용할 수 있습니다. 이 모듈을 사용하면 자식 프로세스로 다른 파이썬 인터프리터를 실행함으로써 파이썬에서 여러 CPU 코어를 활용할 수 있으며 주 인터프리터와 별도로 실행되기 때문에 GIL로부터 자유롭습니다. 각 자식에는 주 프로세스에 대한 연결이 들어 있으므로 이 연결을 활용해 어떤 연산을 수행할지 지시를 받고 계산한 결과를 돌려줄 수 있습니다.

예를 들어 매우 큰 두 수 쌍에 대하여 최대공약수를 구하려고 합니다.

# my_module.py
def gcd(pair):
    a, b = pair
    low = min(a,b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

이 함수를 순차적으로 실행하면 병렬성이 없기 때문에 계산 시간이 선형적으로 증가합니다. NUMBERS 변수에 대해 실행 시 1.073초가 소요됩니다.

from my_module import gcd

import timeit

def main():
    NUMBERS = [(1963309, 2265973), (2030677, 3814172),
               (1551645, 2229620), (2039045, 2020802),
               (1823712, 1924928), (2293129, 1020491),
               (1281238, 2273782), (3823812, 4237281),
               (3812741, 4729139), (1292391, 2123811)]

    start = timeit.default_timer()
    results = list(map(gcd, NUMBERS))
    end = timeit.default_timer()

    print(f'{end-start:.3f} seconds')

if __name__ == "__main__":
    main()

파이썬의 GIL로 인해 여러 스레드를 병렬 실행할 수 없으므로 여러 파이썬 스레드를 활용해 실행해도 속도가 향상되지 않습니다. concurrent.futures 모듈에 있는 ThreadPoolExecutor 클래스를 활용해 두 개의 작업자 스레드로 수행해보면 1.025초가 소요되어 거의 차이가 나지 않습니다.

from my_module import gcd
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

import timeit

def main():
    NUMBERS = [(1963309, 2265973), (2030677, 3814172),
               (1551645, 2229620), (2039045, 2020802),
               (1823712, 1924928), (2293129, 1020491),
               (1281238, 2273782), (3823812, 4237281),
               (3812741, 4729139), (1292391, 2123811)]

    start = timeit.default_timer()
    pool = ThreadPoolExecutor(max_workers=2)
    results = list(pool.map(gcd, NUMBERS))
    end = timeit.default_timer()

    print(f'{end-start:.3f} seconds')

if __name__ == "__main__":
    main()

하지만 ThreadPoolExecutor 클래스 대신 ProcessPoolExecutor 클래스로 바꾸면 놀랍게도 0.75초가 소요되어 25% 정도 빨라집니다.

from my_module import gcd
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

import timeit

def main():
    NUMBERS = [(1963309, 2265973), (2030677, 3814172),
               (1551645, 2229620), (2039045, 2020802),
               (1823712, 1924928), (2293129, 1020491),
               (1281238, 2273782), (3823812, 4237281),
               (3812741, 4729139), (1292391, 2123811)]

    start = timeit.default_timer()
    pool = ProcessPoolExecutor(max_workers=2)
    results = list(pool.map(gcd, NUMBERS))
    end = timeit.default_timer()

    print(f'{end-start:.3f} seconds')

if __name__ == "__main__":
    main()

ProcessPoolExecutor 클래스는 다음과 같은 순서로 동작합니다.

  1. 입력 데이터로 들어온 map 메서드에 전달된 NUMBERS의 각 원소를 취합니다.
  2. 1번에서 얻은 원소를 pickle 모듈을 사용해 이진 데이터로 직렬화하고 자식 프로세스의 인터프리터로 직렬화한 데이터를 복사합니다.
  3. 자식은 pickle 모듈을 사용해 데이터를 파이썬 객체로 역직렬화하면서 gcd 함수를 호출합니다.
  4. 자식 프로세스는 입력 데이터에 대해 gcd 함수를 다른 자식 프로세스와 병렬로 실행합니다.
  5. gcd 함수의 결과를 이진 데이터로 직렬화하여 부모 프로세스로 직렬화한 데이터를 돌려줍니다.
  6. 부모 프로세스는 데이터를 파이썬 객체로 역직렬화하고 여러 자식 프로세스가 돌려준 결과를 병합해서 한 list로 만듭니다.

ProcessPoolExecutor 클래스를 이용한 multiprocessing 모듈은 부모와 자식 프로세스 사이에 데이터가 오고 갈때 pickle 모듈을 이용한 직렬화와 역직렬화가 일어나야 하므로 추가비용이 발생합니다. 따라서 프로그램의 다른 부분과 상태를 공유할 필요가 없고 (격리) 부모와 자식 사이에 주고받아야하는 데이터 크기는 작지만 자식 프로세스가 데이터로 인한 연산량이 매우 큰 (레버리지가 큰) 최대공약수 알고리즘이나 다른 수학 알고리즘에 대해 적합합니다. 특히, 수행해야 하는 계산이 크지 않다면 부모, 자식 프로세스 통신으로 인한 부가 비용으로 인해 병렬화를 해도 속도가 그리 빨라지지 않을 수 있습니다.


다른 예제로 매우 큰 자연수에 대해 소수인지 여부를 확인하는 작업을 ThreadPoolExecutor / ProcessPoolExecutor 에 대해 실행시켜 보겠습니다.

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

import math
import timeit

PRIMES = [112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n+1, 2):
        if n % i == 0:
            return False

    return True

def main():
    t1 = timeit.default_timer()
    with ThreadPoolExecutor(max_workers=2) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print(f'{number} is prime: {prime}')

    print(f'{timeit.default_timer() - t1:.3f} seconds')

    t1 = timeit.default_timer()
    with ProcessPoolExecutor(max_workers=2) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print(f'{number} is prime: {prime}')

    print(f'{timeit.default_timer() - t1:.3f} seconds')
    
    t1 = timeit.default_timer()
    for prime in PRIMES:
        isPrime = is_prime(prime)
        print(f'{number} is prime: {isPrime}')

    print(f'{timeit.default_timer() - t1:.3f} seconds')
  • 위 프로그램을 실행하면 해당 원소가 소수인지 아닌지 여부가 전체 소요 시간과 함께 출력되는데, TreadPoolExecutor 에서는 2.738초, ProcessPoolExecutor 에서는 1.637초가 소요됩니다.
  • 일반 단일 쓰레드로 실행할때는 2.615초로 ThreadPoolExecutor로 실행할 때보다 약간 빠른데 이는 ThreadPoolExecutor 실행시 연산에 사용할 스레드 풀을 미리 생성하고 스레드끼리 통신하는데 따른 추가 시간이 소요되기 때문입니다.

주의

멀티 processing 사용 시 주의해야 할 점은 메인 프로세스가 돌아가는 파일 모듈 (__main__)이 명확히 정의되어야 한다는 점입니다. 즉, "if __name__ == '__main__'" 구문 안 에서 실행해야 하는 것이죠. 이는 자식 프로세스에서 실행되는 worker가 작업에 필요한 정보를 부모 프로세스로부터 가져오고 부모 프로세스는 결과를 취합해야되기 때문에 __main__ 모듈과 기타 모듈은 분명히 구분되어야 합니다. 따라서 __main__ 모듈이 따로 존재하지 않는 REPL (command line 환경에서 한 줄씩 명령을 인식하고 실행되는 환경으로 interpretor, jupyter notebook 등) 환경에서는 멀티 processing이 동작하지 않습니다. 반면에 멀티 threading은 상관 없이 잘 동작합니다. 

반응형

'Computer > Python' 카테고리의 다른 글

파이썬의 매개변수 전달 방식  (0) 2021.07.13
Property, Setter, Getter  (0) 2021.06.28
데코레이터와 functools.wrap  (0) 2021.06.27
Public, Private Attributes  (0) 2021.06.27
변수 영역과 클로저  (0) 2021.06.27