파이썬 asyncio로 생산자/소비자 (Producer/Consumer) 패턴 구현하기
오늘은 파이썬에서 공식적으로 지원하는 비동기 라이브러리인 asyncio
로 생산자/소비자 (Producer/Consumber) 패턴을 구현하는 방법에 대해서 다뤄보겠습니다.
해당 포스트에서 사용하는 모든 예시는 파이썬 3.11 버전 이상을 가정하고 있습니다.
비동기 프로그래밍과 생산자/소비자 (Producer/Consumer) 패턴
비동기 프로그래밍이란
비동기 프로그래밍은 모든 코드가 순차적으로 실행되는 동기식 프로그래밍과 달리, 작업에 대기가 발생했을 때 (e.g., 네트워크 요청 후 응답) 해당 작업이 끝나기 전까지 다른 작업을 수행하는 프로그램 방식을 의미합니다. 이를 통해 네트워크 입출력(I/O)와 같이 대기가 발생하는 일을 처리할 때 성능을 높일 수 있다는 장점이 있습니다. 보다 자세한 내용은 이전에 작성한 '파이썬과 비동기 프로그래밍 #1, 비동기 프로그래밍이란'을 참고해 주시면 감사하겠습니다.
생산자/소비자 (Producer/Consumer) 패턴이란
애플리케이션에서 '생산자(Producer)'는 데이터를 생산하는 주체를 의미하고, '소비자(Consumber)'는 생산자가 생산해 내는 데이터를 소비하는 주체를 의미합니다. 이해를 돕기 위해 비유를 들어서 설명해 보겠습니다. 여러분들이 핫도그 가게를 운영한다고 생각해 봅시다. 이때, 핫도그를 만들어 내는 사람이 '생산자', 핫도그를 구입해서 먹는 손님들이 '소비자'입니다.
만약에 핫도그가 손님들이 빠지는 속도에 비해 너무 빨리 (많이) 만들어지게 된다면 핫도그 재고 및 품질 문제가 발생할 수 있습니다. 반대로, 핫도그를 만드는 속도에 비해 손님들이 더 빠르게 방문하게 된다면 대기하는 시간이 길어지고 가게의 평판이 나빠지게 될겁니다. 생산자는 핫도그를 만들어내는 거 말고는 다른 일을 할 수 없겠죠.
이러한 문제를 해결하기 위해서는 핫도그를 분배하는 방식이 중요해집니다. 이처럼 생산자/소비자 패턴은 이렇게 생산자의 데이터 생산 속도와 소비자의 데이터 소비 속도의 불균형이 발생할 때 이를 효율적으로 분배하는 패턴을 의미합니다.
비동기 프로그래밍과의 관계
그럼 생산자/소비자 패턴과 비동기 프로그래밍은 어떤 관계가 있을까요? 동기식 프로그래밍에서는 핫도그를 한 개 만드는 동안 생산자가 아무 일도 할 수 없습니다. 생산자는 핫도그를 하나씩 만들게 되고 하나의 핫도그가 만들어지게 되면 손님에게 나누어주고 다시 핫도그를 한 개를 구워야 합니다. 이 과정에서 엄청난 비효율이 발생합니다. 비동기 프로그래밍을 활용하면, 생산자는 손님이 핫도그를 기다리는 동안 여러 개의 핫도그를 한꺼번에 구울 수도 있고, 핫도그가 구워지는 동안 청소나 재료 손질 같은 다른 일을 할 수 있습니다.
핫도그 대기열을 통해서 생산자/소비자 문제 해결하기
위의 문제를 해결하기 위한 한 가지 방법은 핫도그 대기열 (Queue)을 만드는 것 입니다. 크기가 정해진 핫도그 보관함을 만들어두고 생산자는 미리 핫도그를 구워서 해당 보관함에 순차적으로 넣어둡니다. 핫도그 보관함에 핫도그가 차있다면 생산자는 핫도그를 만드는 속도를 조절할 수 있고, 손님들이 몰리는 상황에서는 미리 준비해둔 핫도그 보관함에서 바로 핫도그를 꺼내줄 수 있습니다.
프로그래밍에서는 이를 Queue를 통해서 구현하게 됩니다. 생산자의 속도가 빠를 때는 데이터를 Queue에 넣어두고, 소비자의 속도가 빨라질 때는 Queue에 있는 데이터를 순차적으로 가져가게 함으로써 속도의 불균형 문제를 조절할 수 있습니다.
asyncio를 통해 생산자/소비자 패턴 구현하기
이제 파이썬의 asyncio 라이브러리를 사용하여 비동기 방식으로 생산자/소비자 패턴을 구현하는 방법에 대해 알아보겠습니다. 이 예제에서는 핫도그 대기열(asyncio.Queue
)을 사용하여 생산자가 핫도그를 준비하고, 소비자가 준비된 핫도그를 가져가는 과정을 모델링 합니다.
import asyncio
import random
async def producer(queue: asyncio.Queue[str], name: str):
for i in range(10):
await asyncio.sleep(random.random()) # 핫도그를 만드는데 시간이 걸림을 가정
item = f"핫도그 {i+1} ({name})"
await queue.put(item) # 대기열에 핫도그 추가
print(f"{item} 준비됨.")
async def consumer(queue: asyncio.Queue[str], name: str):
while True:
await asyncio.sleep(
random.random() * 3
) # 소비자가 핫도그를 먹는데 시간이 걸림을 가정
try:
item = await asyncio.wait_for(queue.get(), 3) # 대기열에서 핫도그 가져오기
except asyncio.TimeoutError:
print("시간 초과: 더 이상 핫도그가 없음.")
break
queue.task_done() # 작업 완료 표시
print(f"{name}가 {item}를 받아감.")
async def main():
queue: asyncio.Queue[str] = asyncio.Queue(maxsize=3) # 최대 크기가 3인 대기열 생성
# 생산자와 소비자 태스크 생성
producers = [producer(queue, f"생산자 {i+1}") for i in range(2)] # 2 명의 생산자
consumers = [consumer(queue, f"소비자 {i+1}") for i in range(4)] # 4 명의 소비자
async with asyncio.TaskGroup() as g:
for p in producers:
g.create_task(p)
for c in consumers:
g.create_task(c)
# 3.11 이전 버전에서는 아래 코드를 사용
# await asyncio.gather(*producers, *consumers)
# 모든 항목이 처리될 때까지 기다림
await queue.join()
if __name__ == "__main__":
asyncio.run(main())
실행결과
핫도그 1 (생산자 2) 준비됨.
핫도그 2 (생산자 2) 준비됨.
소비자 2가 핫도그 1 (생산자 2)를 받아감.
핫도그 3 (생산자 2) 준비됨.
소비자 3가 핫도그 2 (생산자 2)를 받아감.
핫도그 1 (생산자 1) 준비됨.
소비자 3가 핫도그 3 (생산자 2)를 받아감.
핫도그 2 (생산자 1) 준비됨.
...
소비자 3가 핫도그 8 (생산자 1)를 받아감.
핫도그 10 (생산자 1) 준비됨.
소비자 4가 핫도그 7 (생산자 2)를 받아감.
소비자 2가 핫도그 9 (생산자 1)를 받아감.
소비자 1가 핫도그 8 (생산자 2)를 받아감.
핫도그 9 (생산자 2) 준비됨.
핫도그 10 (생산자 2) 준비됨.
소비자 2가 핫도그 10 (생산자 1)를 받아감.
소비자 4가 핫도그 9 (생산자 2)를 받아감.
소비자 3가 핫도그 10 (생산자 2)를 받아감.
시간 초과: 더 이상 핫도그가 없음.
...
- 먼저 최대 크기가 3인 Queue를 생성합니다.
만약 Queue에 핫도그가 3개보다 적다면 생산자는 핫도그를 더 생산할 수 있고, 그렇지 않고 핫도그가 3개 가득 차 있다면 생산자는 생산을 멈춥니다. - 생산자는 핫도그 생산을 마치면 Queue에 핫도그를 추가합니다.
이는asyncio.Queue
의put
메서드를 통해서 구현 가능합니다. - 소비자는 큐에서 핫도그를 가져와서 처리합니다.
이는asyncio.Queue
의get
메서드를 통해서 구현 가능합니다. - 소비자는 핫도그를 성공적으로 가져오면
queue.task_done()
을 통해 해당 작업이 완료되었음을 알립니다.
이 방식을 통해, 생산자와 소비자 간의 데이터 (여기서는 핫도그) 교환을 비동기적으로 처리할 수 있으며, 애플리케이션의 성능을 향상시킬 수 있습니다.
현실세계에 적용하기
지금까지 저희는 생산자/소비자 패턴을 코드로 구현하는 방법을 간단하게 살펴보았습니다. 현실 세계에서 해당 패턴을 적용하기 위해서 고려해야 할 사항과 실제 해당 패턴을 현업에서 사용하면서 도움이 된 팁들을 몇 가지 공유하고자 합니다.
- Queue 사이즈 조정
Queue 사이즈가 너무 크면 애플리케이션 상에서 메모리 문제가 발생할 수 있고, 반대로 너무 작다면 처리 속도에 문제가 발생할 수 있습니다. 적정한 Queue 사이즈는 도메인과 데이터 특성에 따라서 다르므로 각 상황에 맞게 적절하게 설정할 필요가 있습니다.
- 데이터를 배치로 생산하기
Queue가 있더라도 데이터의 생산 속도의 지연은 발생할 수 있습니다. 이때 사용할 수 있는 한 가지 방법은 데이터를 배치로 묶어서 생산하는 것입니다. 예를 들어, json 데이터를 dict로 바꾸는 과정은 오버헤드가 발생할 수 있습니다. 아래 예시와 같이 json 데이터를 묶어서 한 번에 변환하여 여기서 발생하는 오버헤드를 줄일 수 있습니다.
import asyncio
import json
json_data = [
'{"name": "핫도그 1", "price": 1000}',
'{"name": "핫도그 2", "price": 1500}',
'{"name": "핫도그 3", "price": 2000}',
'{"name": "핫도그 4", "price": 2100}',
'{"name": "핫도그 5", "price": 2200}',
# 더 많은 JSON 항목들...
]
async def producer(queue):
batch = ""
for idx, item in enumerate(json_data):
# 3개씩 묶어서 큐에 추가
if idx % 3 < 3:
batch += "," + item if batch else item
# 남은 항목이 있으면 큐에 추가
if batch:
await queue.put(json.loads(f"[{batch}]"))
batch = ""
async def consumer(queue):
while True:
try:
batch = await asyncio.wait_for(queue.get(), 0.5)
except asyncio.TimeoutError:
print("시간 초과: 더 이상 핫도그가 없음.")
break
for item in batch:
# 각 JSON 객체 처리
print(f'{item["name"]} 처리됨, 가격: {item["price"]}')
queue.task_done()
async def main():
queue = asyncio.Queue()
async with asyncio.TaskGroup() as g:
g.create_task(producer(queue))
g.create_task(consumer(queue))
await queue.join()
asyncio.run(main())
- 데이터를 배치로 소비하여 처리하기
소비자에서 지연이 발생하여 Queue가 차는 상황에서는 Queue에 있는 데이터를 한 번에 배치로 가져와 처리하는 방법을 사용할 수 있습니다. 아래 예시에서는 wait_queue_empty
라는 함수를 구현하여 큐에 차있는 모든 데이터를 가져와서 처리하도록 구현 해보았습니다.
import asyncio
import json
from typing import TypeVar
# 예시 JSON 데이터
json_data = [
'{"name": "핫도그 1", "price": 1000}',
'{"name": "핫도그 2", "price": 1500}',
'{"name": "핫도그 3", "price": 2000}',
'{"name": "핫도그 4", "price": 2100}',
'{"name": "핫도그 5", "price": 2200}',
# 더 많은 JSON 항목들...
]
T = TypeVar("T")
async def wait_queue_empty(queue):
data = await queue.get()
queue.task_done()
result = [data]
while True:
try:
data = queue.get_nowait()
queue.task_done()
result.append(data)
except asyncio.QueueEmpty:
break
return result
async def producer(queue):
for item in json_data:
deserialized_item = json.loads(item)
await queue.put(deserialized_item) # JSON 데이터 항목을 개별적으로 큐에 추가
print(f'{deserialized_item["name"]} 준비됨.')
await asyncio.sleep(0.1) # 생산 속도를 조절하기 위한 짧은 대기 시간
async def consumer(queue):
while True:
try:
batch = await asyncio.wait_for(wait_queue_empty(queue), 0.5)
except asyncio.TimeoutError:
print("시간 초과: 더 이상 핫도그가 없음.")
break
for item in batch:
print(f'{item["name"]} 처리됨, 가격: {item["price"]}')
await asyncio.sleep(0.3) # 소비 속도를 늦추어 큐에 데이터가 쌓이게 함
async def main():
queue = asyncio.Queue()
async with asyncio.TaskGroup() as g:
g.create_task(producer(queue))
g.create_task(consumer(queue))
await queue.join()
asyncio.run(main())
추가로 배치로 데이터를 처리할 때는 상황에 따라서 아래와 같은 방법을 통해 데이터 처리의 효율성을 높여줄 수 있습니다.
- 과거 데이터는 무시하고 최신 데이터만 사용하기 (최신 데이터만 중요한 경우)
- 과거 데이터와 최신 데이터를 합쳐서 사용하기
오늘은 Python의 asyncio 라이브러리를 사용하여 비동기 프로그래밍에서 생산자/소비자 (Producer/Consumer) 패턴을 구현하는 방법에 대해 알아보았습니다. 이 패턴은 데이터의 생산자와 소비자의 데이터 처리속도에 불균형을 효율적으로 관리할 수 있게 해주는 패턴이며, 애플리케이션의 성능을 향상시켜줄 수 있습니다.
오늘 배운 패턴이 여러분들의 문제를 해결하는 데 조금이나마 도움이 되면 좋겠습니다. 긴 글 읽어주셔서 감사합니다. 🙏