본문 바로가기

Python/Python

[Python] Futures를 사용해서 병렬성(Concurrency) 수행하기

반응형

이번 포스팅에서는 Python에서 병렬성을 수행하는 방법에 대해 알아보자. 저번 포스팅에서는 Python에서 병행성을 수행하기 위한 코루틴에 대해 알아보았다. 다시 한 번 언급하지만 병행성은 여러가지 일을 순차적으로 왔다갔다 빠르게 실행함으로써 한 번에 여러가지 일을 동시에 하듯이 수행하는 것을 의미한다. 반면 병렬성은 실제로 한 번에 여러가지 일을 동시에 처리하는 것을 말한다. 이번에는 파이썬에서 병렬성을 수행하는 방법에 대해 알아보자.

 

Python의 병렬성에 대해 알아보자

 


파이썬에서는 병렬성을 수행하기 위해서 Futures라는 모듈을 사용한다. 자세한 내용은 파이썬 공식문서에서 살펴보자. 병렬성을 실행하기 위해서는 Futures 모듈에서 멀티 쓰레드 또는 멀티 프로세스 익스큐터를 사용한다. 멀티 쓰레드, 멀티 프로세스에 대한 개념은 저번 포스팅에서 소개했으므로 잘 모르겠다면 확인하고 오자.

 

멀티 쓰레드 또는 멀티 프로세스를 사용하는 방법 중에 크게 2가지 방법이 존재한다. 첫 번째는 map 함수를 사용해서 제네레이터로 반환하는 방법, 두 번째는 wait, as_completed를 활용하는 방법이 존재한다. 먼저 map 함수를 사용하는 방법에 대해 알아보자.

1. map 사용하기

현재 우리는 아래와 같이 총 4가지의 숫자에 대해서 누적합을 구하는 작업들 4개를 해야 한다고 가정해보자.

 

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

WORK_LIST = [int(1e4), int(1e5), int(1e6), int(1e7)]

# 누적 합 계산 함수
def sum_generator(n):
    sum_val = sum(x for x in range(1, n+1))
    return sum_val


def main():
    # worker 개수 시정
    worker = min(10, len(WORK_LIST))
    # 시작 시간
    st = time.time()

    # 멀티쓰레드로 수행 (멀티 프로세스로 바꾸려면 ProcessPoolExecutor로 변경)
    with ThreadPoolExecutor(max_workers=worker) as executor:
        result = executor.map(sum_generator, WORK_LIST)

    # 종료 시간
    et = time.time() - st

    print('Result: {}, Time: {:.2f}s'.format(list(result), et))

if __name__ == '__main__':
    main()

 

위와 같이 with 구문으로 멀티 쓰레드 또는 멀티 프로세스를 정의해준다. 그리고 map 함수를 통해 map(function, 작업할 것들이 담긴 리스트)로 사용이 가능하다. 반환되는 객체는 제네레이터 객체가 반환되므로 결과를 출력하려면 list로 감싸주면 된다.

2. wait, as_completed 사용하기

두 번째는 wait 또는 as_completed를 사용하는 방법이다. 그 중에서 wait를  먼저 살펴보자. wait는 특정 타임아웃 옵션을 설정해줌으로써 실행할 작업들을 순차적으로 병렬적으로 수행하는데, 해당 타임아웃을 넘어가는 작업물은 중단(pending) 시켜버린다. 따라서 특정 타임아웃 내에만 수행이 완료된 작업들만 반환된다. 또한 설정한 타임아웃 내에 수행된 작업들은 done, 수행되지 않은 작업들은 not_done 속성으로 관찰이 가능하다. 이제 wait를 사용하는 방법을 알아보자. wait를 사용하기 위해서는 futures에서 wait 모듈을 따로 임포트해주어야 한다.

 

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait
import time

WORK_LIST = [int(1e4), int(1e5), int(1e6), int(1e7)]

# 누적 합 계산 함수
def sum_generator(n):
    sum_val = sum(x for x in range(1, n+1))
    return sum_val


def main():
    worker = min(10, len(WORK_LIST))

    st = time.time()

    future_lst = []
    with ProcessPoolExecutor(max_workers=worker) as executor:
        # 작업할 것들에 대해서
        for work in WORK_LIST:
            # submit으로 작업이 실행되도록 예약하고 객체의 실행을 나타내는 Future객체를 반환
            future = executor.submit(sum_generator, work)
            # future객체를 리스트에 담기
            future_lst.append(future)
            print('Scheduled for {}: {}'.format(work, future))

        # wait은 타임아웃 설정할 수 있는데, 해당 타임 초과되는 작업물들을 pending 됨
        result = wait(future_lst, timeout=7)
        print('Completed Task:', result.done)
        print('Over timeout Task:', result.not_done)
        # 수행된 결과물만 출력
        print([res.result() for res in result.done])
        # 수행되지 않은 결과물만 출력
        print([res.result() for res in result.not_done])


if __name__ == '__main__':
    main()

 

이전과 달라진 방법은 멀티 쓰레드 또는 멀티 프로세스를 정의한 with 구문 안에서 실행할 작업물들에 대해 하나하나 loop를 돌면서 익스큐터의 submit 메소드로 Futures 객체를 만들어준다. 이 submit 메소드는 공식 문서에서도 확인할 수 있듯이 해당 작업이 실행되도록 예약하고 객체의 실행을 나타내는 Future 객체를 만들어주는 역할을 한다. 그리고 이 Future 객체를 따로 리스트에 담아준다. 다음엔 각 작업물에 대한 Future 객체들이 담긴 리스트를 wait 메소드에 넣어준다. 단, 이 때 타임아웃 옵션을 설정할 수 있다. 그리고 done, not_done 속성을 통해 수행한 병렬성에서 수행되거나 수행되지 않은 작업물들이 무엇인지 알 수 있다.

 

다음은 as_completed를 활용한 방법이다. as_completed가 wait과 다른 점은 정의한 작업물들 순서에 상관없이 시간이 적게 걸리는 작업들부터 수행한다는 것이다. 다시 말해, 만약 [천만, 만, 백만, 십만] 에 대한 각각 누적합을 구하라고 한다면 wait는 리스트에 담겨있는 작업물 순차적으로 수행하지만 as_completed는 가장 적게 걸릴 만(10,000)에 대한 누적합을 구하고 다음은 십만 -> 백만 -> 천만 이런 식으로 수행한다는 것이다. 또한 as_completed도 wait 처럼 타임아웃 설정이 가능하다.

 

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

WORK_LIST = [int(1e5), int(1e6), int(1e7), int(1e8)]

# 누적 합 계산 함수
def sum_generator(n):
    sum_val = sum(x for x in range(1, n+1))
    return sum_val


def main():
    worker = min(10, len(WORK_LIST))

    st = time.time()

    future_lst = []
    with ProcessPoolExecutor(max_workers=worker) as executor:
        for work in WORK_LIST:
            future = executor.submit(sum_generator, work)
            future_lst.append(future)
        
        # as_completed
        for future in as_completed(future_lst, timeout=7):
            # 가장 적게 걸리는 작업물부터 수행
            result = future.result()
            # 수행 완료된 작업물
            done = future.done()
            # 타임아웃을 벗어나 수행되지 않은 작업물
            cancelled = future.cancelled()
            print('Result: {}, Done: {}'.format(result, done))
            print('Cancelled: {}'.format(cancelled))



if __name__ == '__main__':
    main()

 

wait와 다른 차이점이라고 하면 for loop를 다시 사용한다는 점과 수행된 결과물을 보기 위해서는 done() 처럼 callable 하게 호출한다는 점, 설정한 타임아웃 옵션을 넘어서 수행되지 않은 작업물들을 보기 위해서는 cancelled()로 callable 하게 호출한다는 점도 다르다.

반응형