Синхронные вызовы через очереди: RPC в RabbitMQ, request-reply в NATS, таймауты, обработка ошибок.
Синхронные вызовы через очереди: RPC в RabbitMQ, request-reply в NATS, таймауты, обработка ошибок.
RPC (Remote Procedure Call) позволяет вызвать удалённую функцию через очередь сообщений и получить ответ.
Client ──> [Request Queue] ──> Server
▲ │
│ [Reply Queue] <──────┘
└──────────────────────────────┘
Сценарии использования:
import uuid
import asyncio
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Хранилище для ожидающих ответов
pending_requests = {}
@broker.subscriber("rpc.results")
async def handle_rpc_result(result: dict, message: RabbitMessage):
correlation_id = message.correlation_id
if correlation_id in pending_requests:
pending_requests[correlation_id].set_result(result)
async def rpc_call(request_data, timeout=5.0):
correlation_id = str(uuid.uuid4())
future = asyncio.Future()
pending_requests[correlation_id] = future
try:
# Опубликовать запрос
await broker.publish(
request_data,
"rpc.requests",
reply_to="rpc.results",
correlation_id=correlation_id
)
# Ждать ответ с таймаутом
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
raise TimeoutError(f"RPC call timed out after {timeout}s")
finally:
pending_requests.pop(correlation_id, None)
# Server
@broker.subscriber("rpc.requests")
async def handle_rpc_request(request: dict, message: RabbitMessage):
# Обработать запрос
result = {"result": request["value"] * 2}
# Опубликовать ответ
await broker.publish(
result,
message.reply_to,
correlation_id=message.correlation_id
)from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost")
@broker.publisher("rpc.results")
@broker.subscriber("rpc.requests")
async def process_request(request: dict, message: RabbitMessage):
result = {"result": request["value"] * 2}
# Ответ автоматически уйдёт в reply_to
return result
# Client
async def call_rpc(value):
response = await broker.request(
{"value": value},
"rpc.requests",
timeout=5.0
)
return responseNATS имеет встроенную поддержку request-reply:
from faststream import FastStream
from faststream.nats import NatsBroker
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)
# Server
@broker.subscriber("calculate")
async def handle_calculate(data: dict) -> dict:
return {"result": data["a"] + data["b"]}
# Client
async def call_calculate():
response = await broker.request(
{"a": 2, "b": 3},
"calculate",
timeout=5.0
)
print(response) # {"result": 5}Преимущества NATS:
from faststream import FastStream
from faststream.redis import RedisBroker
import uuid
import asyncio
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)
pending_requests = {}
@broker.subscriber("rpc.results")
async def handle_result(result: dict, message):
correlation_id = message.headers.get("correlation_id")
if correlation_id in pending_requests:
pending_requests[correlation_id].set_result(result)
async def rpc_call(request_data, timeout=5.0):
correlation_id = str(uuid.uuid4())
future = asyncio.Future()
pending_requests[correlation_id] = future
try:
await broker.publish(
request_data,
"rpc.requests",
headers={"correlation_id": correlation_id},
reply_to="rpc.results"
)
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
raise TimeoutError("RPC timeout")
finally:
pending_requests.pop(correlation_id, None)
# Server
@broker.subscriber("rpc.requests")
async def handle_request(request: dict, message):
correlation_id = message.headers.get("correlation_id")
reply_to = message.reply_to
result = {"result": request["value"] * 2}
await broker.publish(
result,
reply_to,
headers={"correlation_id": correlation_id}
)# RabbitMQ
response = await broker.request(
{"data": "value"},
"service.method",
timeout=10.0 # 10 секунд
)
# NATS
response = await broker.request(
{"data": "value"},
"service.method",
timeout=5.0
)
# Redis
response = await broker.request(
{"data": "value"},
"service.method",
timeout=3.0
)import asyncio
from faststream.exceptions import TimeoutError
async def call_with_retry(service_method, data, max_retries=3):
for attempt in range(max_retries):
try:
response = await broker.request(
data,
service_method,
timeout=5.0
)
return response
except TimeoutError:
if attempt == max_retries - 1:
raise
logger.warning(f"Timeout, retry {attempt + 1}/{max_retries}")
await asyncio.sleep(1.0 * (attempt + 1))
except Exception as e:
logger.error(f"RPC error: {e}")
raise
# Использование
try:
result = await call_with_retry("calculate.sum", {"a": 2, "b": 3})
except TimeoutError:
logger.error("Service unavailable after retries")
# Fallback логика# Server
@broker.subscriber("calculate")
async def handle_calculate(data: dict) -> dict:
if "a" not in data or "b" not in data:
# Вернуть ошибку клиенту
return {
"error": True,
"message": "Missing required fields: a, b"
}
return {"result": data["a"] + data["b"]}
# Client
async def safe_call(data):
response = await broker.request(data, "calculate", timeout=5.0)
if response.get("error"):
raise ValueError(response["message"])
return response["result"]from faststream.exceptions import ConnectionError
async def resilient_rpc(data):
try:
response = await broker.request(data, "service.method", timeout=5.0)
return response
except ConnectionError:
# Брокер недоступен
logger.error("Broker connection lost")
return await fallback_processing(data)
except TimeoutError:
# Таймаут ответа
logger.warning("Service timeout")
return {"status": "pending", "data": data}# Несколько серверов в queue group
# Server 1
@broker.subscriber("calculate", queue="calc-workers")
async def handle_calculate_1(data: dict) -> dict:
return {"result": data["a"] + data["b"], "server": 1}
# Server 2
@broker.subscriber("calculate", queue="calc-workers")
async def handle_calculate_2(data: dict) -> dict:
return {"result": data["a"] + data["b"], "server": 2}
# Client — запрос балансируется между серверами
response = await broker.request({"a": 2, "b": 3}, "calculate")# Несколько инстансов подписываются на одну очередь
@broker.subscriber(
"rpc.requests",
"rpc.requests.queue", # Одна очередь на всех
max_workers=10
)
async def handle_rpc_request(request: dict, message: RabbitMessage):
result = {"result": request["value"] * 2}
await broker.publish(
result,
message.reply_to,
correlation_id=message.correlation_id
)from faststream import FastStream
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel, ValidationError
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
class ValidateRequest(BaseModel):
data: dict
schema: dict
class ValidateResponse(BaseModel):
valid: bool
errors: list = []
# Server: сервис валидации
@broker.publisher("validation.results")
@broker.subscriber("validation.requests")
async def validate_data(request: ValidateRequest, message: RabbitMessage):
try:
# Валидация через Pydantic
from pydantic import create_model
DynamicModel = create_model('DynamicModel', **request.schema)
DynamicModel(**request.data)
return ValidateResponse(valid=True)
except ValidationError as e:
return ValidateResponse(
valid=False,
errors=[{"field": err[0], "message": err[1][0]["msg"]}
for err in e.errors()]
)
# Client: вызов сервиса валидации
async def validate_user_data(data: dict):
response = await broker.request(
ValidateRequest(
data=data,
schema={"name": (str, ...), "email": (str, ...)}
).model_dump(),
"validation.requests",
timeout=3.0
)
result = ValidateResponse(**response)
if not result.valid:
raise ValueError(f"Validation failed: {result.errors}")
return resultfrom faststream import FastStream
from faststream.nats import NatsBroker
import asyncio
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)
# Server: вычислительный воркер
@broker.subscriber("compute.factorial", queue="compute-workers")
async def compute_factorial(data: dict) -> dict:
n = data["n"]
result = 1
for i in range(2, n + 1):
result *= i
return {"result": result, "n": n}
# Client: агрегатор вычислений
async def compute_sum_of_factorials(numbers: list[int]):
tasks = []
for n in numbers:
task = broker.request({"n": n}, "compute.factorial", timeout=10.0)
tasks.append(task)
# Параллельные вычисления
results = await asyncio.gather(*tasks)
# Агрегация
total = sum(r["result"] for r in results)
return {"total": total, "count": len(numbers)}
# Использование
# total = await compute_sum_of_factorials([5, 10, 15])
# print(total) # {"total": 1307674368000, "count": 3}Следующая тема — Масштабирование и производительность: горизонтальное масштабирование, конкурентность, worker pools, оптимизация потребления, backpressure.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.