Language/Python

Multiprocessing

테리는당근을좋아해 2023. 3. 24. 01:00

- multiprocessing(https://docs.python.org/3/library/multiprocessing.html) 공식 문서 참고

 

Multiprocessing

- multiprocessing은 API를 사용해 process spawning 지원

- processing spawning은 parent process가 os에 요청해 새로운 child process를 만들어내는 과정

- local concurrency와 remote concurrency를 지원하며, GIL(Global Interpreter Lock)을 효과적으로 우회

- Local concurrency is defined as "within a system" (e.g. a multi-core CPU), nonlocal concurrency is defined as "between systems"

 

 

Process Pool

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3])) # [1, 4, 9]

 

 

 

concurrent.futures.ProcessPoolExecutor

- 호출 프로세스의 실행을 차단하지 않고 taskf를 백그라운드 프로세스로 실행하는 더 높은 수준의 인터페이스 제공

- Pool과 비교했을 때, 작업을 실행하는 것과 결과를 기다리는 것을 더 쉽게 분리할 수 있음

 

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419
]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

 

 

 

Process Class

- multiprocessing package에 process는 Process 객체를 만들고 start() 메서드를 호출함으로써 spawn됨

- Process 클래스는 threading.Thread API를 참고

 

from multiprocessing import Process
import os

def f(name):
    print(f"parent proecss : {os.getppid()}")
    print(f"process : {os.getpid()}")
    print(f"hello, {name}")

if __name__ == '__main__': # 해당 구문을 사용해서 entry point를 보호하지 않으면 runtime error 발생
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

 

 

 

Context and start method

- multiprocessing 프로세스를 시작하는 세가지 방법을 제공합니다.

 

1) spawn

- 부모 프로세스는 새로운 python interperter process를 시작

- 자식 프로세스의 run() 메서드를 실행하는데 필요한 리소스만을 상속

- 특히, 부모 프로세스의 불필요한 file descriptor와 handle은 상속되지 않음

- fork, forkserver를 사용하는 것에 비해 다소 느림

- Windows, macOS의 default 설정

 

2) fork

- 부모 프로세스는 os.fork()를 사용해 python 인터프리터를 분기

- 부모 프로세스의 모든 리소스는 자식 프로세스에 상속

 

3) forkserver

- forkserver를 사용하면 서버 프로세스가 시작됨

- 새 프로세스가 필요할때마다 상위 프로세스는 서버에 연결해 새 프로세스를 fork하도록 요청

- forkserver 프로세스는 단일 스레드이므로 thread safe

- 특정 Unix 플랫폼에서만 사용할 수 있음

 

 

시작 방법 설정

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn') # set_start_method() 를 두번이상 사용해서는 안됨
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

 

 

컨텍스트 객체 가져오기

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

 

- 한 컨텍스트와 관련된 객체는 다른 컨텍스트의 프로세스와 호환되지 않을 수 있음

- fork 컨텍스트로 생성된 lock은 spawn이나 forkserver로 만들어진 프로세스로 전달될 수 없음

 

 

프로세스 간 객체 교환

- multiprocessing 패키지는 프로세스간 통신을 위해 두가지 통신 방법을 제공

 

1) Queue

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, "hello"])

if __name__ == "__main__":
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join

 

- Queue 객체는 thread, Process 안전한 객체

 

2) Pipe

- Pipe() 함수는 파이프(duplex)로 연결된 한 쌍의 connection 객체를 반환

 

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, "hello"])
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()