Асинхронные паттерны: Producer-Consumer, Rate Limiter, Circuit Breaker, Retry с экспоненциальной задержкой.
Asyncio превращает ожидание в работу. Паттерны превращают хаос в порядок.
Producer-Consumer — паттерн, где одни корутины производят данные, а другие потребляют через очередь.
# ✅ Producer-Consumer через asyncio.Queue
import asyncio
async def producer(queue: asyncio.Queue, name: str):
    """Produce five items into *queue*, then emit a ``None`` sentinel."""
    for seq in range(5):
        payload = f"item-{seq}"
        await queue.put(payload)
        print(f"{name} produced: {payload}")
        await asyncio.sleep(0.1)  # simulate time spent producing
    # Signal completion: exactly one consumer will see this sentinel.
    await queue.put(None)
async def consumer(queue: asyncio.Queue, name: str):
    """Drain *queue* until a ``None`` sentinel arrives."""
    while True:
        task = await queue.get()
        if task is None:
            # Sentinel: acknowledge it so queue.join() can complete, then stop.
            queue.task_done()
            return
        print(f"{name} consuming: {task}")
        await asyncio.sleep(0.2)  # simulate processing time
        queue.task_done()
async def main():
    """Run two producers against three consumers over a bounded queue."""
    queue = asyncio.Queue(maxsize=10)
    # Several producers
    producers = [
        asyncio.create_task(producer(queue, f"Producer-{i}"))
        for i in range(2)
    ]
    # Several consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(3)
    ]
    await asyncio.gather(*producers)
    await queue.join()  # wait until every item has been processed
    # Only two sentinels were produced for three consumers, so at least one
    # consumer is still blocked on get(). Cancel them all and — fix — await
    # the cancellations so no task is left pending when the loop closes.
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)

asyncio.run(main())

# ✅ With error handling
import asyncio
from typing import Any, Callable
async def safe_consumer(
    queue: asyncio.Queue,
    handler: Callable[[Any], Any],
    name: str = "Consumer"
):
    """Consume items from *queue*, isolating handler failures per item.

    A ``None`` item is the shutdown sentinel; cancellation ends the loop
    quietly; any other unexpected error is reported and the loop continues.
    """
    while True:
        try:
            item = await queue.get()
        except asyncio.CancelledError:
            return
        except Exception as e:
            print(f"{name} unexpected error: {e}")
            continue
        if item is None:
            queue.task_done()
            return
        try:
            result = await handler(item)
        except asyncio.CancelledError:
            return
        except Exception as e:
            # Per-item failure: report and keep serving the queue.
            print(f"{name} error processing {item}: {e}")
        else:
            print(f"{name}: {item} → {result}")
        queue.task_done()
# Использование
async def process_item(item: str) -> str:
    """Uppercase *item* after a short simulated processing delay."""
    transformed = item.upper()
    await asyncio.sleep(0.1)
    return transformed
async def main():
    """Fan ten tasks out to three error-safe consumers."""
    queue = asyncio.Queue()
    # Start the consumers first so they drain the queue as it fills.
    consumers = [
        asyncio.create_task(safe_consumer(queue, process_item, f"Worker-{i}"))
        for i in range(3)
    ]
    # Produce the work items.
    for i in range(10):
        await queue.put(f"task-{i}")
    # One sentinel per consumer so every worker shuts down cleanly.
    for _ in consumers:
        await queue.put(None)
    await queue.join()
    # Fix: await the consumer tasks so they finish cleanly and any
    # unexpected exception surfaces instead of being silently dropped.
    await asyncio.gather(*consumers)

asyncio.run(main())

# ✅ Token Bucket Rate Limiter
import asyncio
import time
class TokenBucketLimiter:
    """Token-bucket rate limiter: *rate* tokens/second, bursts up to *capacity*."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens replenished per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity    # current balance (becomes float after refills)
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()  # serializes balance updates

    async def acquire(self, tokens: int = 1) -> bool:
        """Try to take *tokens*; return True on success, False if unavailable."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            # Refill proportionally to elapsed time, capped at capacity.
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_update = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    async def wait_and_acquire(self, tokens: int = 1):
        """Block until *tokens* can be acquired."""
        while not await self.acquire(tokens):
            # Fix: read the balance under the lock (it was read unlocked)
            # and never sleep a negative duration, which would degenerate
            # into a busy loop when another coroutine refills concurrently.
            async with self._lock:
                deficit = tokens - self.tokens
            await asyncio.sleep(max(deficit / self.rate, 0))
# Использование
async def fetch_with_rate_limit(url: str, limiter: TokenBucketLimiter):
    """Perform one (simulated) fetch, gated by *limiter*."""
    # Block here until the limiter grants a token.
    await limiter.wait_and_acquire()
    # ... HTTP request would go here ...
    print(f"Fetched: {url}")
async def main():
    """Fetch ten URLs at no more than ~2 requests/second."""
    limiter = TokenBucketLimiter(rate=2, capacity=5)  # 2 requests/sec
    targets = [f"https://api.example.com/data/{i}" for i in range(10)]
    await asyncio.gather(
        *(fetch_with_rate_limit(target, limiter) for target in targets)
    )

asyncio.run(main())

# ✅ Sliding Window Rate Limiter
from collections import deque
class SlidingWindowLimiter:
    """Allow at most *rate* acquisitions within any sliding *period* seconds."""

    def __init__(self, rate: int, period: float):
        self.rate = rate      # maximum requests per window
        self.period = period  # window length in seconds
        self.timestamps = deque()  # monotonic times of recent acquisitions
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        """Record one acquisition if the window has room; return success."""
        async with self._lock:
            now = time.monotonic()
            cutoff = now - self.period
            # Drop timestamps that have slid out of the window.
            while self.timestamps and self.timestamps[0] < cutoff:
                self.timestamps.popleft()
            if len(self.timestamps) < self.rate:
                self.timestamps.append(now)
                return True
            return False

    async def wait_and_acquire(self):
        """Block until the window has room, then acquire."""
        while not await self.acquire():
            # Fix: read the deque under the lock — the previous unlocked
            # read could observe an empty deque mid-prune and raise
            # IndexError under concurrency.
            async with self._lock:
                oldest = self.timestamps[0] if self.timestamps else None
            if oldest is None:
                continue
            wait_time = oldest + self.period - time.monotonic()
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            else:
                # Fix: yield to the event loop instead of spinning when the
                # slot should be free but another coroutine grabbed it first.
                await asyncio.sleep(0)
# Использование
async def main():
    """Issue ten requests throttled to five per rolling second."""
    limiter = SlidingWindowLimiter(rate=5, period=1.0)  # 5 requests per second
    for n in range(10):
        await limiter.wait_and_acquire()
        print(f"Request {n}")

asyncio.run(main())

# ✅ Circuit Breaker
import asyncio
from enum import Enum
from typing import Callable, Any
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # calls are rejected immediately
    HALF_OPEN = "half_open"  # probing whether the service has recovered


class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""
    pass


class CircuitBreaker:
    """Fail-fast wrapper around an unreliable async callable.

    After *failure_threshold* consecutive failures the circuit opens and
    calls raise :class:`CircuitBreakerOpen` immediately. After
    *recovery_timeout* seconds the circuit goes half-open and allows probe
    calls; *half_open_max_calls* successful probes close it again.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 1
    ):
        self.failure_threshold = failure_threshold      # failures before opening
        self.recovery_timeout = recovery_timeout        # seconds before a probe
        self.half_open_max_calls = half_open_max_calls  # successes needed to close
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = 0
        self._half_open_calls = 0
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        return self._state

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Invoke *func* through the breaker; raise CircuitBreakerOpen when open."""
        async with self._lock:
            if self._state == CircuitState.OPEN:
                if self._should_try_reset():
                    self._state = CircuitState.HALF_OPEN
                    self._half_open_calls = 0
                else:
                    raise CircuitBreakerOpen("Circuit is open")
        # Fix/clarification: func is awaited OUTSIDE the lock.
        # _on_success/_on_failure re-acquire self._lock and asyncio.Lock is
        # NOT reentrant — awaiting func while holding it would deadlock.
        try:
            result = await func(*args, **kwargs)
        except Exception:
            await self._on_failure()
            raise
        await self._on_success()
        return result

    def _should_try_reset(self) -> bool:
        return (time.monotonic() - self._last_failure_time) >= self.recovery_timeout

    async def _on_success(self):
        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._half_open_calls += 1
                if self._half_open_calls >= self.half_open_max_calls:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
            else:
                self._failure_count = 0

    async def _on_failure(self):
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.monotonic()
            if self._state == CircuitState.HALF_OPEN:
                # A failed probe re-opens the circuit immediately.
                self._state = CircuitState.OPEN
            elif self._failure_count >= self.failure_threshold:
                self._state = CircuitState.OPEN
# Использование
async def external_api():
    """Placeholder for a real network call; may raise in production."""
    return None
async def main():
    """Call the external API through a breaker, with a fallback when open."""
    breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
    try:
        outcome = await breaker.call(external_api)
    except CircuitBreakerOpen:
        # The breaker refused the call: degrade gracefully.
        print("Circuit is open, using fallback")
        outcome = "fallback_value"

asyncio.run(main())

# ✅ Exponential Backoff Retry
import asyncio
import random
from typing import Callable, Any, Type, Tuple
async def retry_with_backoff(
    func: Callable,
    *args,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    **kwargs
) -> Any:
    """Call *func*, retrying failures with exponential backoff.

    The pause before retry *k* is
    ``min(base_delay * exponential_base**k, max_delay)``, optionally scaled
    by a random factor in [0.5, 1.5) (jitter) to avoid thundering herds.
    Re-raises the last caught exception once retries are exhausted.
    """
    failure = None
    for attempt in range(max_retries + 1):
        try:
            return await func(*args, **kwargs)
        except exceptions as e:
            failure = e
            if attempt == max_retries:
                break
            # Exponential backoff, capped at max_delay.
            pause = min(base_delay * (exponential_base ** attempt), max_delay)
            if jitter:
                pause *= (0.5 + random.random())
            print(f"Retry {attempt + 1}/{max_retries} after {pause:.2f}s: {e}")
            await asyncio.sleep(pause)
    raise failure
# Использование
async def flaky_api():
    """Simulated endpoint that fails with ConnectionError ~70% of the time."""
    roll = random.random()
    if roll >= 0.7:
        return "success"
    raise ConnectionError("Network error")
async def main():
    """Demonstrate retry_with_backoff against the flaky endpoint."""
    try:
        outcome = await retry_with_backoff(
            flaky_api,
            max_retries=5,
            base_delay=1.0,
            exceptions=(ConnectionError,)
        )
    except Exception as e:
        print(f"All retries failed: {e}")
    else:
        print(f"Result: {outcome}")

asyncio.run(main())

# ✅ Semaphore for limiting concurrency
import asyncio
async def fetch_with_semaphore(
    url: str,
    semaphore: asyncio.Semaphore,
    session
):
    """Fetch *url* via *session*, holding a semaphore slot for the duration."""
    # The semaphore caps how many requests run concurrently.
    async with semaphore:
        async with session.get(url) as response:
            body = await response.text()
            return body
async def main():
    """Fetch 100 URLs with at most 10 in flight at once."""
    # NOTE(review): aiohttp is used below but never imported anywhere in this
    # file — `import aiohttp` (third-party) is required for this example to run.
    semaphore = asyncio.Semaphore(10)  # At most 10 concurrent requests
    urls = [f"https://api.example.com/data/{i}" for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_semaphore(url, semaphore, session)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} pages")

asyncio.run(main())

# ✅ Timeouts for operations
import asyncio
async def operation_with_timeout(
    coro,
    timeout: float,
    fallback: Any = None
) -> Any:
    """Await *coro* with a time limit; return *fallback* if it expires."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        pass
    print(f"Operation timed out after {timeout}s")
    return fallback
# Использование
async def slow_operation():
    """Simulated long-running task: returns after ten seconds."""
    outcome = "done"
    await asyncio.sleep(10)
    return outcome
async def main():
    """Run the slow operation under a 2-second budget with a fallback."""
    # The timeout fires long before slow_operation() would finish.
    outcome = await operation_with_timeout(
        slow_operation(),
        timeout=2.0,
        fallback="timeout_result"
    )
    print(f"Result: {outcome}")

asyncio.run(main())

# ✅ Worker Pool pattern
import asyncio
from typing import Callable, Any
class WorkerPool:
    """Fixed-size pool of async workers fed from a shared queue.

    Tasks are enqueued with :meth:`submit` and results are retrieved by id
    with :meth:`get_result`; a task's exception is re-raised to the caller.
    """

    def __init__(self, num_workers: int):
        self._queue = asyncio.Queue()   # pending (task_id, handler, args, kwargs)
        self._num_workers = num_workers
        self._workers = []              # running worker tasks
        self._results = {}              # task_id -> ("success"|"error", payload)
        self._task_id = 0               # monotonically increasing id source

    async def _worker(self, worker_id: int, handler: Callable):
        """Worker loop: pull tasks and record their outcome.

        *handler* is the default from start(); a per-task handler supplied
        to submit() takes precedence.
        """
        while True:
            task_id, task_handler, args, kwargs = await self._queue.get()
            try:
                # Fix: honor the handler passed to submit() — it was
                # previously accepted but silently ignored.
                result = await (task_handler or handler)(*args, **kwargs)
                self._results[task_id] = ("success", result)
            except Exception as e:
                self._results[task_id] = ("error", e)
            finally:
                # Fix: always acknowledge so stop()'s queue.join() cannot hang.
                self._queue.task_done()

    async def submit(self, handler: Callable, *args, **kwargs) -> int:
        """Enqueue a task and return its id."""
        task_id = self._task_id
        self._task_id += 1
        await self._queue.put((task_id, handler, args, kwargs))
        return task_id

    async def get_result(self, task_id: int, timeout: float = None) -> Any:
        """Poll until task *task_id* finishes; re-raise its exception on failure.

        Raises asyncio.TimeoutError if *timeout* seconds elapse first.
        """
        # Fix: get_running_loop() — get_event_loop() is deprecated inside
        # coroutines.
        start = asyncio.get_running_loop().time()
        while task_id not in self._results:
            # Fix: explicit None check so timeout=0 is honored (truthiness
            # previously disabled it).
            if timeout is not None and (asyncio.get_running_loop().time() - start) > timeout:
                raise asyncio.TimeoutError()
            await asyncio.sleep(0.01)
        status, result = self._results.pop(task_id)
        if status == "error":
            raise result
        return result

    async def start(self, handler: Callable):
        """Spawn the worker tasks with *handler* as the default callable."""
        self._workers = [
            asyncio.create_task(self._worker(i, handler))
            for i in range(self._num_workers)
        ]

    async def stop(self):
        """Wait for queued work to drain, then cancel the workers."""
        await self._queue.join()
        for worker in self._workers:
            worker.cancel()
# Использование
async def process(item: int) -> int:
    """Double *item* after a short simulated delay."""
    await asyncio.sleep(0.1)
    return item + item
async def main():
    """Push 20 jobs through a five-worker pool and collect the results."""
    pool = WorkerPool(num_workers=5)
    await pool.start(process)
    # Submit the jobs.
    task_ids = []
    for n in range(20):
        task_ids.append(await pool.submit(process, n))
    # Collect results in submission order.
    results = [await pool.get_result(t) for t in task_ids]
    print(f"Results: {results}")
    await pool.stop()
asyncio.run(main())

| Паттерн | Когда использовать | Pythonic-реализация |
|---|---|---|
| Producer-Consumer | Асинхронная обработка очередей | asyncio.Queue |
| Rate Limiter | Ограничение частоты запросов | Token Bucket, Sliding Window |
| Circuit Breaker | Защита от сбоев внешних сервисов | State machine (CLOSED/OPEN/HALF_OPEN) |
| Retry with Backoff | Повтор при временных ошибках | Экспоненциальная задержка + jitter |
| Semaphore | Ограничение параллелизма | asyncio.Semaphore |
| Timeout | Ограничение времени операции | asyncio.wait_for |
| Worker Pool | Пул обработчиков | Queue + воркеры |
Главный принцип: Asyncio паттерны превращают ожидание I/O в возможность выполнять другую работу.
Изучите тему Антипаттерны и Code Smells для распознавания распространённых ошибок.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.