Реализация синхронного RPC поверх асинхронной очереди, correlation_id, reply_to
RabbitMQ — асинхронная очередь сообщений, но иногда нужен синхронный запрос-ответ. В этой теме изучим как реализовать RPC (Remote Procedure Call) паттерн поверх RabbitMQ.
В микросервисной архитектуре иногда нужен синхронный вызов:
Сервис A ──► ? ──► Сервис B
│ │
│ Получить данные │
◄──────────────────┘
HTTP — естественный выбор, но RPC через RabbitMQ даёт:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ RPC Client │ ───► │ RabbitMQ │ ───► │ RPC Server │
│ │ │ │ │ │
│ reply_to ──────────► │ │ │
│ correlation_id ───► │ │ │
└──────▲──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ │ │
│ callback_queue ◄──┴─────────────────────┘
│ correlation_id ◄────────────────────────┘
│
│ Response
└───────────────────────────────────────────┘
RPC клиент использует callback_queue для получения ответов и correlation_id для сопоставления запросов:
import aio_pika
from aio_pika import Message, ExchangeType
import uuid
import asyncio
from typing import Any, Optional
from datetime import datetime, timedelta
class RPCClient:
"""
RPC клиент для запрос-ответ через RabbitMQ.
Использует callback_queue для ответов и correlation_id
для сопоставления запроса и ответа.
"""
def __init__(self, connection_url: str):
self.connection_url = connection_url
self.connection: Optional[aio_pika.Connection] = None
self.channel: Optional[aio_pika.Channel] = None
self.callback_queue: Optional[aio_pika.Queue] = None
self.pending_requests: dict[str, asyncio.Future] = {}
async def connect(self):
"""Подключение и создание callback очереди."""
self.connection = await aio_pika.connect_robust(
self.connection_url,
client_properties={"connection_name": "rpc_client"}
)
self.channel = await self.connection.channel()
# Уникальная callback очередь для этого клиента
self.callback_queue = await self.channel.declare_queue(
"", # Авто-имя (amq.gen-...)
exclusive=True, # Только для этого connection
auto_delete=True # Удалить при отключении
)
# Подписка на ответы
await self.callback_queue.consume(self._on_response)
async def _on_response(self, message: aio_pika.abc.AbstractIncomingMessage):
"""Обработка ответа от сервера."""
async with message.process():
correlation_id = message.correlation_id
if correlation_id in self.pending_requests:
future = self.pending_requests.pop(correlation_id)
# Проверка на ошибку
if message.headers.get('x-error'):
future.set_exception(
RPCError(message.headers['x-error'])
)
else:
future.set_result(message.body)
async def call(
self,
exchange: str,
routing_key: str,
body: bytes,
timeout: float = 30.0
) -> Any:
"""
RPC вызов.
Args:
exchange: Обменник для отправки запроса
routing_key: Ключ маршрутизации
body: Тело запроса (сериализованное)
timeout: Таймаут ожидания ответа (секунды)
Returns:
Тело ответа
Raises:
asyncio.TimeoutError: Превышен таймаут ожидания
RPCError: Ошибка на стороне сервера
"""
correlation_id = str(uuid.uuid4())
future = asyncio.Future()
self.pending_requests[correlation_id] = future
message = Message(
body=body,
content_type="application/json",
delivery_mode=2,
correlation_id=correlation_id,
reply_to=self.callback_queue.name,
expiration=str(int(timeout * 1000)) # TTL сообщения
)
await self.channel.default_exchange.publish(
message,
routing_key=routing_key
)
try:
# Ожидание ответа с таймаутом
return await asyncio.wait_for(future, timeout=timeout)
except asyncio.TimeoutError:
# Очистка pending request
self.pending_requests.pop(correlation_id, None)
raise RPCError(f"Request timeout after {timeout}s")
async def close(self):
"""Закрытие подключения."""
if self.connection:
await self.connection.close()
class RPCError(Exception):
"""Ошибка RPC вызова."""RPC сервер слушает очередь запросов и отправляет ответы в callback_queue клиента:
import json
from typing import Optional
from aio_pika.abc import AbstractIncomingMessage
class RPCServer:
"""
RPC сервер для обработки запросов.
Слушает rpc_queue, обрабатывает запросы и отправляет
ответы в callback_queue клиента.
"""
def __init__(self, connection_url: str, queue_name: str):
self.connection_url = connection_url
self.queue_name = queue_name
self.connection: Optional[aio_pika.Connection] = None
self.channel: Optional[aio_pika.Channel] = None
self.queue: Optional[aio_pika.Queue] = None
async def connect(self):
"""Подключение и объявление очереди."""
self.connection = await aio_pika.connect_robust(
self.connection_url,
client_properties={"connection_name": "rpc_server"}
)
self.channel = await self.connection.channel()
# Очередь для RPC запросов
self.queue = await self.channel.declare_queue(
self.queue_name,
durable=True
)
async def start(self, handler):
"""
Запуск сервера.
Args:
handler: Асинхронная функция обработки запроса
signature: async def handler(request: dict) -> dict
"""
await self.queue.consume(
lambda msg: self._on_request(msg, handler)
)
print(f"RPC server started on queue '{self.queue_name}'")
async def _on_request(
self,
message: AbstractIncomingMessage,
handler
):
"""Обработка RPC запроса."""
async with message.process():
try:
# Парсинг запроса
request = json.loads(message.body)
# Обработка
response = await handler(request)
# Отправка ответа
await self._send_response(
correlation_id=message.correlation_id,
reply_to=message.reply_to,
body=json.dumps(response).encode(),
error=None
)
except Exception as e:
# Отправка ошибки
await self._send_response(
correlation_id=message.correlation_id,
reply_to=message.reply_to,
body=b'{}',
error=str(e)
)
async def _send_response(
self,
correlation_id: str,
reply_to: str,
body: bytes,
error: Optional[str] = None
):
"""Отправка ответа клиенту."""
headers = {}
if error:
headers['x-error'] = error
response = Message(
body=body,
content_type="application/json",
delivery_mode=2,
correlation_id=correlation_id,
headers=headers
)
# Отправка в callback_queue клиента
await self.channel.default_exchange.publish(
response,
routing_key=reply_to
)Пример RPC сервера который выполняет математические операции:
async def calculator_handler(request: dict) -> dict:
"""Обработчик RPC запросов калькулятора."""
operation = request.get('operation')
a = request.get('a')
b = request.get('b')
if operation == 'add':
result = a + b
elif operation == 'subtract':
result = a - b
elif operation == 'multiply':
result = a * b
elif operation == 'divide':
if b == 0:
raise ValueError("Division by zero")
result = a / b
else:
raise ValueError(f"Unknown operation: {operation}")
return {'result': result}
async def run_rpc_server():
server = RPCServer("amqp://guest:guest@localhost/", "rpc_calculator")
await server.connect()
await server.start(calculator_handler)
print("Calculator RPC server running...")
await asyncio.Future() # Держим сервер запущеннымПример RPC клиента который вызывает сервер калькулятора:
async def run_rpc_client():
client = RPCClient("amqp://guest:guest@localhost/")
await client.connect()
try:
# Вызов 1: Сложение
response = await client.call(
exchange="",
routing_key="rpc_calculator",
body=json.dumps({
"operation": "add",
"a": 10,
"b": 5
}).encode(),
timeout=5.0
)
result = json.loads(response)
print(f"10 + 5 = {result['result']}")
# Вызов 2: Деление с ошибкой
try:
response = await client.call(
exchange="",
routing_key="rpc_calculator",
body=json.dumps({
"operation": "divide",
"a": 10,
"b": 0
}).encode(),
timeout=5.0
)
except RPCError as e:
print(f"Error: {e}")
finally:
await client.close()Пул клиентов уменьшает накладные расходы на создание подключений:
import asyncio
from typing import AsyncIterator
from contextlib import asynccontextmanager
class RPCClientPool:
"""Пул RPC клиентов для высокой пропускной способности."""
def __init__(self, connection_url: str, pool_size: int = 10):
self.connection_url = connection_url
self.pool_size = pool_size
self.pool: asyncio.Queue[RPCClient] = asyncio.Queue(maxsize=pool_size)
self._initialized = False
async def _initialize(self):
"""Создание пула клиентов."""
for _ in range(self.pool_size):
client = RPCClient(self.connection_url)
await client.connect()
await self.pool.put(client)
self._initialized = True
@asynccontextmanager
async def acquire(self) -> AsyncIterator[RPCClient]:
"""Получение клиента из пула."""
if not self._initialized:
await self._initialize()
client = await self.pool.get()
try:
yield client
finally:
await self.pool.put(client)
async def close(self):
"""Закрытие всех клиентов в пуле."""
while not self.pool.empty():
client = await self.pool.get()
await client.close()
# Использование
pool = RPCClientPool("amqp://guest:guest@localhost/", pool_size=10)
async def make_rpc_call():
async with pool.acquire() as client:
response = await client.call(
exchange="",
routing_key="rpc_service",
body=b'{"method": "test"}',
timeout=5.0
)
return responseПараллельные RPC вызовы через asyncio.gather для увеличения пропускной способности:
class BatchRPCClient(RPCClient):
"""RPC клиент с поддержкой batch вызовов."""
async def batch_call(
self,
requests: list[dict],
timeout: float = 30.0
) -> list[Any]:
"""
Параллельные RPC вызовы.
Args:
requests: Список запросов
timeout: Общий таймаут для всех
Returns:
Список ответов в том же порядке
"""
tasks = [
self.call(
exchange="",
routing_key=req['routing_key'],
body=json.dumps(req['body']).encode(),
timeout=timeout
)
for req in requests
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
json.loads(r) if isinstance(r, bytes) else r
for r in results
]
# Использование
async def batch_example():
client = BatchRPCClient("amqp://guest:guest@localhost/")
await client.connect()
requests = [
{"routing_key": "rpc_calc", "body": {"operation": "add", "a": 1, "b": 2}},
{"routing_key": "rpc_calc", "body": {"operation": "mul", "a": 3, "b": 4}},
{"routing_key": "rpc_calc", "body": {"operation": "sub", "a": 5, "b": 6}},
]
results = await client.batch_call(requests, timeout=10.0)
print(f"Results: {results}")
await client.close()Автоматические retry с exponential backoff для обработки временных ошибок:
class ResilientRPCClient(RPCClient):
"""RPC клиент с retry логикой."""
async def call_with_retry(
self,
exchange: str,
routing_key: str,
body: bytes,
timeout: float = 30.0,
max_retries: int = 3,
base_delay: float = 1.0
) -> Any:
"""
RPC вызов с автоматическими retry.
Args:
max_retries: Максимальное количество попыток
base_delay: Базовая задержка между retry
Returns:
Тело ответа
Raises:
RPCError: После исчерпания retry
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return await self.call(
exchange=exchange,
routing_key=routing_key,
body=body,
timeout=timeout
)
except asyncio.TimeoutError as e:
last_error = e
if attempt < max_retries:
delay = base_delay * (2 ** attempt)
logger.warning(
f"RPC timeout, retry {attempt + 1} in {delay}s",
extra={"routing_key": routing_key}
)
await asyncio.sleep(delay)
else:
raise RPCError(f"RPC failed after {max_retries + 1} attempts")
except RPCError as e:
# Не retry'ить ошибки сервера
raise
raise last_errorМетрики Prometheus для отслеживания RPC вызовов:
from prometheus_client import Counter, Histogram
rpc_requests_total = Counter(
'rpc_requests_total',
'Total RPC requests',
['method', 'status']
)
rpc_request_duration = Histogram(
'rpc_request_duration_seconds',
'RPC request duration',
['method']
)
class MonitoredRPCClient(RPCClient):
"""RPC клиент с метриками."""
async def call(self, exchange, routing_key, body, timeout=30.0):
start_time = asyncio.get_event_loop().time()
try:
result = await super().call(
exchange=exchange,
routing_key=routing_key,
body=body,
timeout=timeout
)
rpc_requests_total.labels(
method=routing_key,
status='success'
).inc()
return result
except asyncio.TimeoutError:
rpc_requests_total.labels(
method=routing_key,
status='timeout'
).inc()
raise
except RPCError as e:
rpc_requests_total.labels(
method=routing_key,
status='error'
).inc()
raise
finally:
rpc_request_duration.labels(
method=routing_key
).observe(
asyncio.get_event_loop().time() - start_time
)Следующие рекомендации обеспечат надёжную работу RPC через RabbitMQ:
Используйте UUID для гарантированной уникальности correlation_id:
# ✅ Используйте UUID
correlation_id = str(uuid.uuid4())
# ❌ Не используйте timestamp — возможны коллизии
correlation_id = str(int(time.time() * 1000))Всегда указывайте таймаут чтобы избежать вечного ожидания:
# ✅ Всегда указывайте таймаут
response = await client.call(
exchange="",
routing_key="rpc_service",
body=b'{}',
timeout=30.0 # Таймаут обязателен
)
# ❌ Без таймаута — может ждать вечно
response = await client.call(
exchange="",
routing_key="rpc_service",
body=b'{}'
# Нет таймаута!
)Ограничьте размер ответа для защиты от больших сообщений:
MAX_RESPONSE_SIZE = 1024 * 1024 # 1MB
async def _on_response(self, message):
async with message.process():
if len(message.body) > MAX_RESPONSE_SIZE:
raise RPCError(f"Response too large: {len(message.body)} bytes")
# Обработка ответа...Логируйте correlation_id для трассировки запросов:
logger.info(
f"RPC request {correlation_id} to {routing_key}",
extra={
"correlation_id": correlation_id,
"routing_key": routing_key,
"request_size": len(body)
}
)Реализуйте graceful shutdown для корректной остановки сервера:
class GracefulRPCServer(RPCServer):
async def stop(self):
"""Корректная остановка сервера."""
print("Stopping RPC server...")
# Отмена подписки
await self.queue.cancel("rpc_consumer")
# Завершение текущих обработок
await asyncio.sleep(1)
# Закрытие подключения
await self.connection.close()
print("RPC server stopped")В следующей теме изучим Pub/Sub паттерн и широковещательную рассылку.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.