Effective PYTHON 2nd - Ch07
포스트
취소

Effective PYTHON 2nd - Ch07

Python 기초를 다지기 위해 효과적인 파이썬이란 책을 공부 중입니다.

7장 동시성과 병렬성

52) 자식 프로세스를 관리하기 위해 subprocess를 사용하라

  • subprocess 모듈을 사용해 자식 프로세스를 실행하고 입력과 출력 스트림을 관리할 수 있다.
  • 자식 프로세스는 파이썬 인터프리터와 병렬로 실행되므로 CPU 코어를 최대로 쓸 수 있다.
  • 간단하게 자식 프로세스를 실행하고 싶은 경우에는 run 함수를 사용하라
    • 유닉스 스타일의 파이프라인이 필요하면 Popen 클래스를 사용하라
  • 자식 프로세스가 멈추는 경우나 교착 상태를 방지하려면 communicate 메서드에 대해 timeout 파라미터를 사용하라.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import subprocess

result = subprocess.run(['echo', 'hello from subprocess!'],
        capture_output=True,
        encoding='utf-8'
        )
result.check_returncode()   # 예외가 발생하지 않으면 정상 종료임
print(result.stdout)


proc = subprocess.Popen(['sleep', '1'])
while proc.poll() is None:
    print('on working...')
    # 시간이 걸리는 작업
    ...

print('종료 상태:', proc.poll())    # => 0


# 자식 프로세스와 부모 프로세스를 분리하면
# 원하는 개수만큼 많은 자식 프로세스를 병렬로 실행할 수 있다
import time

start = time.time()
sleep_procs = []

for _ in range(10):
    proc = subprocess.Popen(['sleep', '1'])
    sleep_procs.append(proc)
for proc in sleep_procs:
    proc.communicate()      # 순차 실행시 10초 이상 걸림

end = time.time()
delta = end - start
print(f'{delta:.3} 초 만에 끝남')   # 1.04 초 만에 끝남


# 유닉스 파이프라인처럼 프로세스의 출력을 다음 프로세스 입력으로 연결시켜서
# 여러 병렬 프로세스를 연쇄적으로 실행할 수 있다
encrypt_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(100)

    encrypt_proc = run_encrypt(data)            # PIPE => subprocess.Popen => PIPE
    encrypt_procs.append(encrypt_proc)
                                                # encrypt_proc 의 output 을 입력으로 연결
    hash_proc = run_hash(encrypt_proc.stdout)   # stdin => subprocess.Popen => PIPE
    hash_procs.append(hash_proc)

    encript_proc.stdout.close()
    encript_proc.stdout = None

for proc in encrypt_procs:
    proc_communicate()
    assert proc.returncode == 0

for proc in hash_procs:
    out, _ = proc.communicate()
    print(out[-10:])

assert proc.returncode == 0


# 블록되는 경우가 우려된다면, timeout 파라미터를 사용한다
proc  = subprocess.Popen(['sleep', '10'])
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()

print('종료 상태:', proc.poll())     # => 종료 상태: -15

53) 블로킹 I/O의 경우 스레드를 사용하고 병렬성을 피하라

  • 파이썬 스레드는 GIL(전역 인터프리터 락)로 인해 멀티 CPU 코어에서 병렬로 실행될 수 없다.
  • GIL 이 있음에도 파이썬 스레드는 여전히 유효하다.
    • 스레드를 사용하면 여러 job 을 동시에 진행하는 작업을 쉽게 기술할 수 있기 때문이다.
  • 파이썬 스레드를 사용해 여러 시스템 콜을 병렬로 수행할 수 있다.
    • 이를 활용하면 블로킹 I/O 와 계산을 동시에 수행할 수 있다.

스레드를 사용하지 않는 경우

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time

def factorize(number):
    for i in range(1, number+1):
        if number % i == 0:
            yield i

numbers = [2139079, 1214759, 1516637, 1852285]
start = time.time()

# 스레드 1개만 사용
for number in numbers:
    print(number, '==>', list(factorize(number)))

# total 0.253 sec elapsed
end = time.time()
delta = end - start
print(f'total {delta:.3f} sec elapsed')

스레드를 사용한 경우 (비슷. 성능향상이 없다)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from threading import Thread

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number
    def run(self):
        self.factors = list( factorize(self.number) )

# 각 수마다 스레드를 시작해 병렬로 인수를 찾을 수 있다.
start = time.time()

threads = []
for number in numbers:
    thread = FactorizedThread(number)
    thread.start()
    threads.append(thread)

# 모든 스레드가 끝날 때까지 기다린다
for thread in threads:
    thread.join()

end = time.time()                           # 단순 순차 실행시 0.256초
print(f'총 {end - start:.3f} 초 걸림')      # >>> 0.446초 걸림

# 놀랍게도 스레드를 하나만 써서 순차적으로 factorize를 실행할 때보다
# 시간이 더 오래 걸린다. ==> GIL 의 영향

# 그럼에도 파이썬이 스레드를 지원하는 이유는?
# 1) 다중 스레드를 사용하면 프로그램이 동시에 여러 일을 하는 것처럼 보이게 만들기 쉽다.
#    - CPython 어느 정도 균일하게 각 스레드를 실행시킨다
# 2) 블로킹 I/O 를 다루기 위해서다.
#    - 운영체제가 시스템 콜 요청에 응답하는데 걸리는 시간 동안 파이썬 프로그램이 다른 일을 할 수 있다

import select
import socket

# 순차 실행시 수행시간이 선형으로 늘어나는 작업
# ==> 총 0.503초 걸림
def slow_systemcall():
    select.select([socket.socket()], [], [], 0.1)

start = time.time()

# 블로킹 I/O를 병렬로 실행한다
threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(threads)

# 스레드를 시작한 후 다른 코드를 수행한다
def compute_another(index):
    ...

for i in range(5):                          # 다른 작업을 수행하고도
    compute_another(i)                      # 순차 실행보다 시간이 1/5로 줄어든다

for thread in threads:
    thread.join()

end = time.time()                           # 단순 순차 실행시 0.503초
print(f'총 {end - start:.3f} 초 걸림')      # >>> 총 0.102초 걸림

블로킹 I/O를 다루는 코드 (non-Thread)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import select
import socket

def slow_systemcall():
    select.select([socket.socket()], [], [], 0.1)

start = time.time()

for _ in range(5):
    slow_systemcall()

# total 0.520 sec elapsed
end = time.time()
delta = end - start
print(f'total {delta:.3f} sec elapsed')

블로킹 I/O를 다루는 코드 (Thread 사용: 성능향상!!)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import select
import socket

# 블로킹 I/O 작업 (병렬성이 필요한 작업)
def slow_systemcall():
    # 시스템 콜 (select)
    select.select([socket.socket()], [], [], 0.1)

def compute_something(pre_val, cur_val):
    # doing other job (print 사용해도 지장 없음)
    # print( pre_val + cur_val )
    return pre_val + cur_val

start = time.time()

threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

for i in range(5):
    compute_something(i, i+1)

# 모든 스레드가 끝날 때까지 기다린다
for thread in threads:
    thread.join()

# total 0.107 sec elapsed (non-thread: 0.520 sec)
end = time.time()
delta = end - start
print(f'total {delta:.3f} sec elapsed')

54) 스레드에서 데이터 경합을 피하기 위해 Lock을 사용하라

  • GIL은 여러 스레드 사이에 일어나는 데이터 경합으로부터 데이터를 보호하지 못한다
  • 여러 스레드가 상호배제 락(뮤텍스) 없이 동일 객체를 다루면 데이터를 오염시킨다
    • threading 내장 모듈의 Lock 클래스를 활용하라

Lock 없이 동일 개체를 다수의 Thread 가 다룬 경우 (데이터 오염)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 동일 객체를 대상으로 여러 스레드가 값을 갱신해서 데이터가 오염됨
# ==> Lock 필요
class Counter:
    def __init__(self):
        self.count = 0
    def increment(self, offset):
        self.count += offset

# how_many 만큼 숫자를 증가
def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        # 센서를 읽는다
        counter.increment(1)

from threading import Thread

how_many = 10**5
counter = Counter()

start = time.time()

threads = []
for i in range(5):
    thread = Thread(target=worker, args=(i, how_many, counter))
    thread.start()
    threads.append(thread)

# 모든 스레드가 끝날 때까지 기다린다
for thread in threads:
    thread.join()

# how_many 만큼 증가시키는 작업을 5번 실행한 결과 (0.085 sec elapsed)
# expected value = 500000, but found value = 367540
expected = how_many*5
found = counter.count
print(f'expected value = {expected}, but found value = {found}')

end = time.time()
delta = end - start
print(f'total {delta:.3f} sec elapsed')

# 결과
# expected value = 500000, but found value = 450379
# total 0.085 sec elapsed

Lock 으로 보호된 객체를 다루는 경우 (올바른 결과)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from threading import Lock

# 데이터 오염 방지
class LockingCounter:
    def __init__(self):
        self.lock = Lock()
        self.count = 0
    def increment(self, offset):
        with self.lock:
            self.count += offset

# how_many 만큼 숫자를 증가
def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        # 센서를 읽는다
        counter.increment(1)

from threading import Thread

how_many = 10**5
counter = LockingCounter()

start = time.time()

threads = []
for i in range(5):
    thread = Thread(target=worker, args=(i, how_many, counter))
    thread.start()
    threads.append(thread)

# 모든 스레드가 끝날 때까지 기다린다
for thread in threads:
    thread.join()

# how_many 만큼 증가시키는 작업을 5번 실행한 결과 (0.143 sec elapsed)
# expected value = 500000, but found value = 500000
expected = how_many*5
found = counter.count
print(f'expected value = {expected}, but found value = {found}')

end = time.time()
delta = end - start
print(f'total {delta:.3f} sec elapsed')

# 결과
# expected value = 500000, but found value = 500000
# total 0.143 sec elapsed

55) Queue를 사용해 스레드 사이의 작업을 조율하라

  • 순차적인 작업을 동시에 여러 스레드에서 실행되도록 조직하고 싶을 때라면 파이프라인이 유용하다
    • 특히 I/O 위주의 작업
  • 동시성 파이프라인을 만들 때 발생할 수 있는 여러 문제를 잘 알아두어야 한다
    • 바쁜 대기, 작업자에게 종료를 알리는 방법, 잠재적인 메모리 사용량 폭발 등
  • Queue 클래스는 튼튼한 파이프라인을 구축할 때 필요한 블로킹, 버퍼크기 지정, join을 통한 완료 대기를 모두 제공한다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def download(item):
    print('downloaded:', item)
    return item
def resize(item):
    print('resizeed:', item)
    return item
def upload(item):
    print('uploaded:', item)
    return item

from collections import deque
from threading import Lock

class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()
    def put(self, item):
        with self.lock:
            self.items.append(item)
    def get(self):
        with self.lock:
            return self.items.popleft()

from threading import Thread
import time

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                time.sleep(0.01)  # 할 일이 없음 (잠시 대기)
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()

done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()

for i in range(1000):
    download_queue.put(str(i))

while len(done_queue.items) < 1000:
    # 기다리는 동안 다른 작업을 수행할 수 있다
    pass

processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print(f'{processed} 개의 아이템을 처리,',f'이때 폴링을 {polled} 번 했습니다.')

# 결과
# 1000 개의 아이템을 처리, 이때 폴링을 3010 번 했습니다.

Queue 를 사용하면 기아 상태로 인한 바쁜 대기를 방지할 수 있다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from queue import Queue

my_queue = Queue()

def consumer():
    print('consumer ready')
    # 비어 있으면 대기 (불필요한 polling을 막아줌)
    my_queue.get()
    print('consumer done')

thread = Thread(target=consumer)
thread.start()

print('producer add data')
my_queue.put(object())
print('producer done')
thread.join()

# 결과
# consumer ready
# producer add data
# producer done
# consumer done

########################################

my_queue = Queue(1)  # 버퍼 크기

def consumer():
    time.sleep(0.1)
    my_queue.get()
    print('consumer#1')
    my_queue.get()
    print('consumer#2')
    print('consumer done')

thread = Thread(target=consumer)
thread.start()

my_queue.put(object())
print('producer#1')
my_queue.put(object())
print('producer#2')
print('producer done')

thread.join()

# 결과
# producer#1
# consumer#1
# producer#2
# producer done
# consumer#2
# consumer done

선형적인 파이프라인의 경우 Queue 가 잘 작동한다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import select
import socket

# 블로킹 I/O 작업 (병렬성이 필요한 작업)
def slow_systemcall():
    select.select([socket.socket()], [], [], 0.1)

def download(item):
    print('downloaded:', item)
    slow_systemcall()
    return item
def resize(item):
    print('resizeed:', item)
    slow_systemcall()
    return item
def upload(item):
    print('uploaded:', item)
    slow_systemcall()
    return item

from collections import deque
from threading import Lock

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # 스레드 종료
                yield item
            finally:
                self.task_done()

from threading import Thread
import time

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    for _ in threads:
        closable_queue.close()

    closable_queue.join()
    for thread in threads:
        thread.join()

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

download_threads = start_threads(3, download, download_queue, resize_queue)
resize_threads = start_threads(4, resize, resize_queue, upload_queue)
upload_threads = start_threads(5, upload, upload_queue, done_queue)

for i in range(1000):
    download_queue.put(str(i))

stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)

# 기다리는 동안 다른 작업을 수행할 수 있을줄 알았는데 실행 안된다 (왜지??)
# ==> qsize() 에 대해 Lock 이 걸리나?
while done_queue.qsize() < 1000:
    print('--'*10)

print(f'{done_queue.qsize()} 개의 아이템을 처리했습니다.')

56) 언제 동시성이 필요할지 인식하는 방법을 알아두라

1

57) 요구에 따라 팬아웃을 진행하려면 새로운 스레드를 생성하지 말라

1

58) 동시성과 Queue를 사용하기 위해 코드를 어떻게 리팩터링해야 하는지 이해하라

1

59) 동시성을 위해 스레드가 필요한 경우에는 ThreadpoolExecutor를 사용하라

1

60) I/O를 할 때는 코루틴을 사용해 동시성을 높여라

1

61) 스레드를 사용한 I/O를 어떻게 asyncio로 포팅할 수 있는지 알아두라

1

62) asyncio로 쉽게 옮겨갈 수 있도록 스레드와 코루틴을 함께 사용하라

1

63) 응답성을 최대로 높이려면 asyncio 이벤트 루프를 블록하지 말라

1

64) 진정한 병렬성을 살리려면 concurrent.futures를 사용하라

1

 
 

끝!   읽어주셔서 감사합니다.

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.