Connection pooling, graceful shutdown, idempotency, тестирование, антипаттерны
Production-системы требуют надёжности, масштабируемости и наблюдаемости. В этой теме изучим паттерны и практики для эксплуатации RabbitMQ в production.
Создание подключения — дорогая операция. Используйте пул подключений:
import asyncio
from typing import Optional, AsyncIterator
from contextlib import asynccontextmanager
import aio_pika
from aio_pika.abc import AbstractConnection, AbstractChannel
class ConnectionPool:
"""
Пул подключений к RabbitMQ.
Переиспользует подключения для уменьшения накладных расходов
на TCP handshake и AMQP handshake.
"""
def __init__(
self,
url: str,
min_size: int = 2,
max_size: int = 10,
reconnect_interval: float = 5.0
):
self.url = url
self.min_size = min_size
self.max_size = max_size
self.reconnect_interval = reconnect_interval
self._pool: asyncio.Queue[AbstractConnection] = asyncio.Queue(maxsize=max_size)
self._channels: dict[int, AbstractChannel] = {}
self._initialized = False
self._lock = asyncio.Lock()
async def _create_connection(self) -> AbstractConnection:
"""Создание нового подключения."""
return await aio_pika.connect_robust(
self.url,
reconnect_interval=self.reconnect_interval,
client_properties={
"connection_name": f"pool_{id(self)}"
}
)
async def _initialize(self):
"""Инициализация пула."""
async with self._lock:
if self._initialized:
return
# Создание минимального количества подключений
for _ in range(self.min_size):
connection = await self._create_connection()
await self._pool.put(connection)
self._initialized = True
@asynccontextmanager
async def acquire(self) -> AsyncIterator[AbstractConnection]:
"""Получение подключения из пула."""
if not self._initialized:
await self._initialize()
# Получение из пула или создание нового
try:
connection = self._pool.get_nowait()
except asyncio.QueueEmpty:
# Пул пуст — создаём новое если есть место
if self._pool.qsize() < self.max_size:
connection = await self._create_connection()
else:
# Ждём освобождения
connection = await self._pool.get()
try:
yield connection
finally:
# Возврат в пул
if connection.is_closed:
# Пересоздать если закрыто
connection = await self._create_connection()
await self._pool.put(connection)
async def close(self):
"""Закрытие всех подключений."""
while not self._pool.empty():
connection = await self._pool.get()
await connection.close()
self._initialized = False
# Использование
pool = ConnectionPool(
"amqp://guest:guest@localhost/",
min_size=2,
max_size=10
)
async def publish_message():
async with pool.acquire() as connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(
"app_exchange",
aio_pika.ExchangeType.TOPIC
)
await exchange.publish(
aio_pika.Message(body=b"Hello"),
routing_key="test"
)Корректное завершение работы при получении сигнала:
import signal
import asyncio
from typing import List, Callable, Awaitable
class GracefulApp:
"""
Приложение с graceful shutdown.
Корректно завершает работу при SIGTERM/SIGINT:
1. Останавливает приём новых сообщений
2. Завершает обработку текущих
3. Закрывает подключения
"""
def __init__(self, name: str):
self.name = name
self._shutdown_event = asyncio.Event()
self._tasks: List[asyncio.Task] = []
self._connections: List[aio_pika.Connection] = []
self._cleanup_handlers: List[Callable[[], Awaitable]] = []
def register_task(self, task: asyncio.Task):
"""Регистрация задачи для отслеживания."""
self._tasks.append(task)
def register_connection(self, connection: aio_pika.Connection):
"""Регистрация подключения для закрытия."""
self._connections.append(connection)
def register_cleanup(self, handler: Callable[[], Awaitable]):
"""Регистрация cleanup обработчика."""
self._cleanup_handlers.append(handler)
def setup_signal_handlers(self):
"""Настройка обработчиков сигналов."""
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(self._shutdown())
)
async def _shutdown(self):
"""Процедура graceful shutdown."""
if self._shutdown_event.is_set():
return
print(f"\n[{self.name}] Graceful shutdown initiated...")
# Сигнал остановки
self._shutdown_event.set()
# Отмена подписок
print(f"[{self.name}] Cancelling subscriptions...")
# Ожидание завершения задач (до 30 секунд)
if self._tasks:
print(f"[{self.name}] Waiting for {len(self._tasks)} tasks to complete...")
done, pending = await asyncio.wait(
self._tasks,
timeout=30,
return_when=asyncio.ALL_COMPLETED
)
if pending:
print(f"[{self.name}] Cancelling {len(pending)} pending tasks...")
for task in pending:
task.cancel()
# Cleanup обработчики
print(f"[{self.name}] Running cleanup handlers...")
for handler in self._cleanup_handlers:
try:
await handler()
except Exception as e:
print(f"[{self.name}] Cleanup handler error: {e}")
# Закрытие подключений
print(f"[{self.name}] Closing connections...")
for connection in self._connections:
try:
await connection.close()
except Exception as e:
print(f"[{self.name}] Connection close error: {e}")
print(f"[{self.name}] Shutdown complete")
async def run(self, main_coro: Awaitable):
"""Запуск приложения."""
self.setup_signal_handlers()
try:
await main_coro
except asyncio.CancelledError:
pass
finally:
await self._shutdown()
@property
def is_shutting_down(self) -> bool:
return self._shutdown_event.is_set()Защита от дубликатов сообщений:
import hashlib
from datetime import datetime, timedelta
from typing import Optional
class IdempotencyManager:
"""
Менеджер идемпотентности.
Хранит ID обработанных сообщений и предотвращает
повторную обработку дубликатов.
"""
def __init__(self, ttl_hours: int = 24):
self.ttl = timedelta(hours=ttl_hours)
self._processed: dict[str, datetime] = {}
self._lock = asyncio.Lock()
def _generate_key(self, message: aio_pika.abc.AbstractIncomingMessage) -> str:
"""Генерация уникального ключа для сообщения."""
# Используем message_id если есть
if message.message_id:
return message.message_id
# Или хэш от body + correlation_id
content = f"{message.correlation_id}:{message.body}"
return hashlib.sha256(content.encode()).hexdigest()
async def is_duplicate(self, message: aio_pika.abc.AbstractIncomingMessage) -> bool:
"""Проверка на дубликат."""
key = self._generate_key(message)
now = datetime.utcnow()
async with self._lock:
# Очистка старых записей
expired = [
k for k, t in self._processed.items()
if now - t > self.ttl
]
for k in expired:
del self._processed[k]
# Проверка дубликата
if key in self._processed:
return True
# Сохранение ID
self._processed[key] = now
return False
# Redis-backed idempotency для распределённых систем
class RedisIdempotencyManager:
"""Идемпотентность с Redis для распределённых консьюмеров."""
def __init__(self, redis_client, ttl_seconds: int = 86400):
self.redis = redis_client
self.ttl = ttl_seconds
async def is_duplicate(self, message: aio_pika.abc.AbstractIncomingMessage) -> bool:
key = self._generate_key(message)
# SETNX — установить если не существует
result = await self.redis.set(
f"idempotency:{key}",
"1",
nx=True,
ex=self.ttl
)
# Возвращает True если ключ был установлен (не дубликат)
return result is None
def _generate_key(self, message) -> str:
if message.message_id:
return message.message_id
content = f"{message.correlation_id}:{message.body}"
return hashlib.sha256(content.encode()).hexdigest()
# Использование в консьюмере
idempotency = IdempotencyManager(ttl_hours=24)
async def process_message(message: aio_pika.abc.AbstractIncomingMessage):
async with message.process():
# Проверка дубликата
if await idempotency.is_duplicate(message):
logger.info(f"Duplicate message: {message.message_id}")
return
# Обработка
await handle_message(message)Тестирование RabbitMQ кода:
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
class TestProducer:
"""Тесты продюсера."""
@pytest.mark.asyncio
async def test_publish_message(self):
"""Тест публикации сообщения."""
# Mock подключения
mock_connection = AsyncMock()
mock_channel = AsyncMock()
mock_exchange = AsyncMock()
mock_connection.channel.return_value = mock_channel
mock_channel.declare_exchange.return_value = mock_exchange
with patch('aio_pika.connect_robust', return_value=mock_connection):
producer = MyProducer("amqp://guest:guest@localhost/")
await producer.connect()
await producer.publish("test.key", b"test body")
# Проверка вызова
mock_exchange.publish.assert_called_once()
call_args = mock_exchange.publish.call_args
message = call_args[0][0]
assert message.body == b"test body"
assert message.delivery_mode == 2
class TestConsumer:
"""Тесты консьюмера."""
@pytest.mark.asyncio
async def test_process_message(self):
"""Тест обработки сообщения."""
# Mock сообщения
mock_message = AsyncMock()
mock_message.body = b'{"user_id": 123}'
mock_message.headers = {}
mock_message.process.return_value.__aenter__ = AsyncMock()
mock_message.process.return_value.__aexit__ = AsyncMock()
consumer = MyConsumer()
# Обработка
await consumer.process_message(mock_message)
# Проверка что ack был отправлен
mock_message.ack.assert_called_once()import pytest
from testcontainers.rabbitmq import RabbitMQContainer
@pytest.fixture(scope="module")
def rabbitmq_container():
"""RabbitMQ в Docker для тестов."""
with RabbitMQContainer("rabbitmq:3-management") as rabbitmq:
yield rabbitmq
@pytest.mark.asyncio
async def test_end_to_end(rabbitmq_container: RabbitMQContainer):
"""End-to-end тест."""
url = rabbitmq_container.get_connection_url()
# Подключение
connection = await aio_pika.connect_robust(url)
channel = await connection.channel()
# Объявление
exchange = await channel.declare_exchange("test_exchange", aio_pika.ExchangeType.DIRECT)
queue = await channel.declare_queue("test_queue")
await queue.bind(exchange, routing_key="test")
# Публикация
await exchange.publish(
aio_pika.Message(body=b"test message", delivery_mode=2),
routing_key="test"
)
# Получение
message = await queue.get(timeout=5)
assert message.body == b"test message"
await message.ack()
await connection.close()Избегайте следующих антипаттернов при работе с RabbitMQ в production:
Создание подключения — дорогая операция, не создавайте его на каждое сообщение:
# ❌ Плохо — очень медленно
async def publish_message():
connection = await aio_pika.connect("amqp://guest:guest@localhost/")
# ... публикация ...
await connection.close()
# ✅ Хорошо — переиспользование
class Producer:
def __init__(self):
self.connection = None
async def connect(self):
self.connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async def publish(self, body):
channel = await self.connection.channel()
# ... публикация ...Автоматический ack приводит к потере сообщений при crash консьюмера:
# ❌ Опасно — сообщения теряются при crash
await queue.consume(callback, no_ack=True)
# ✅ Безопасно — ручной ack
await queue.consume(callback) # no_ack=False по умолчаниюСинхронный код блокирует event loop и останавливает обработку:
# ❌ Блокирует event loop
async def process(message):
time.sleep(5) # Блокировка!
# ✅ Асинхронная версия
async def process(message):
await asyncio.sleep(5)
# ✅ Или в executor
async def process(message):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, blocking_function)Без DLX проблемные сообщения теряются:
# ❌ Опасно — нет обработки проблемных сообщений
queue = await channel.declare_queue("main_queue", durable=True)
# ✅ Безопасно — DLX для ошибок
queue = await channel.declare_queue(
"main_queue",
durable=True,
arguments={"x-dead-letter-exchange": "dlx"}
)RabbitMQ не предназначен для передачи больших файлов:
# ❌ Плохо — сообщение 10MB
message = aio_pika.Message(body=large_binary_data)
# ✅ Хорошо — ссылка на хранилище
message = aio_pika.Message(
body=json.dumps({"file_url": "s3://bucket/file.zip"}).encode()
)## Pre-deployment Checklist
### Надёжность
- [ ] durable=True для очередей и обменников
- [ ] delivery_mode=2 для критичных сообщений
- [ ] Manual ack (no_ack=False)
- [ ] DLX настроен для всех очередей
- [ ] Обработка исключений в консьюмере
- [ ] Retry логика с exponential backoff
- [ ] Circuit breaker для внешних сервисов
### Масштабируемость
- [ ] Connection pooling
- [ ] prefetch_count настроен
- [ ] Консьюмеры stateless
- [ ] Idempotency реализована
### Мониторинг
- [ ] Метрики Prometheus
- [ ] Health check endpoint
- [ ] Логирование с контекстом
- [ ] Алерты на DLQ size
- [ ] Алерты на queue growth
### Безопасность
- [ ] Не default credentials
- [ ] SSL/TLS для production
- [ ] Ограниченный доступ к Management API
- [ ] Secrets в environment variables
### Graceful Shutdown
- [ ] Обработка SIGTERM/SIGINT
- [ ] Завершение текущих обработок
- [ ] Закрытие подключенийВ следующей теме изучим кластеризацию и высокодоступную настройку RabbitMQ.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.