Unit- и интеграционные тесты: TestClient, моки брокеров, тестирование publishers и subscribers, fixtures.
Unit- и интеграционные тесты: TestClient, моки брокеров, тестирование publishers и subscribers, fixtures.
FastStream предоставляет TestClient для тестирования без реального брокера:
from faststream import FastStream
from faststream.testing import TestClient
app = FastStream()
async def test_basic():
async with TestClient(app) as client:
# Публикация сообщения
await client.publish({"test": "data"}, "test-queue")
# Assertions...Преимущества:
# app.py
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
class Order(BaseModel):
id: int
amount: float
@broker.subscriber("orders")
async def process_order(order: Order):
if order.amount <= 0:
raise ValueError("Invalid amount")
return {"status": "processed", "order_id": order.id}
# test_app.py
import pytest
from faststream.testing import TestClient
from app import app, broker
@pytest.mark.asyncio
async def test_process_order():
async with TestClient(app) as client:
# Публикация валидного заказа
await client.publish({"id": 1, "amount": 100}, "orders")
# Проверка (зависит от реализации)
# Например, проверка вызова mock-объектаfrom unittest.mock import AsyncMock, patch
import pytest
from faststream.testing import TestClient
@pytest.mark.asyncio
async def test_process_order_with_mock_db():
mock_db = AsyncMock()
with patch("app.db", mock_db):
async with TestClient(app) as client:
await client.publish({"id": 1, "amount": 100}, "orders")
# Проверка вызова БД
mock_db.orders.save.assert_called_once()@pytest.mark.asyncio
async def test_invalid_order():
async with TestClient(app) as client:
# Невалидный заказ (amount — строка вместо числа)
await client.publish({"id": 1, "amount": "invalid"}, "orders")
# Проверка, что обработчик не вызвался или выбросил ошибку
# Зависит от реализации обработки ошибокimport pytest
import asyncio
from testcontainers.rabbitmq import RabbitMqContainer
@pytest.fixture(scope="session")
def rabbitmq_container():
with RabbitMqContainer() as rabbitmq:
yield rabbitmq.get_connection_url()
@pytest.mark.asyncio
async def test_integration_with_real_broker(rabbitmq_container):
from app import broker, app
# Пересоздать брокер с тестовым URL
test_broker = RabbitBroker(rabbitmq_container)
app.broker = test_broker
async with test_broker:
await test_broker.publish({"id": 1}, "orders")
# Подождать обработки
await asyncio.sleep(0.5)
# Assertions# conftest.py
import pytest
from faststream.testing import TestClient
from unittest.mock import AsyncMock
@pytest.fixture
def mock_db():
db = AsyncMock()
db.orders.save = AsyncMock()
db.orders.get = AsyncMock(return_value={"id": 1})
return db
@pytest.fixture
async def faststream_client(mock_db):
from app import app
with patch("app.db", mock_db):
async with TestClient(app) as client:
yield client
# test_handlers.py
@pytest.mark.asyncio
async def test_order_processing(faststream_client, mock_db):
await faststream_client.publish({"id": 1, "amount": 100}, "orders")
# Проверка
mock_db.orders.save.assert_called_once()@pytest.mark.asyncio
async def test_publisher():
from app import broker, app
async with TestClient(app) as client:
# Подписка на ответ
received = []
@broker.subscriber("responses")
async def capture_response(msg):
received.append(msg)
# Триггер публикации
await client.publish({"id": 1}, "requests")
# Проверка
assert len(received) == 1
assert received[0]["status"] == "processed"@pytest.mark.asyncio
async def test_multiple_publishers():
from app import app, broker
events = []
async with TestClient(app) as client:
@broker.subscriber("order.created")
async def on_created(event):
events.append(("created", event))
@broker.subscriber("order.paid")
async def on_paid(event):
events.append(("paid", event))
# Запуск процесса
await client.publish({"id": 1}, "orders.new")
# Проверка цепочки событий
assert len(events) == 2
assert events[0][0] == "created"
assert events[1][0] == "paid"@pytest.mark.asyncio
async def test_batch_processing():
from app import app, broker
processed_batches = []
@broker.subscriber("orders", batch=True)
async def process_batch(orders):
processed_batches.append(orders)
async with TestClient(app) as client:
# Опубликовать несколько сообщений
for i in range(5):
await client.publish({"id": i}, "orders")
# Подождать batch
await asyncio.sleep(0.5)
# Проверка
assert len(processed_batches) > 0
assert len(processed_batches[0]) <= 10 # max_batch_sizefrom faststream import FastStream
from faststream.testing import TestClient
app = FastStream()
# Middleware для логирования
@app.middleware
async def log_middleware(message, next_middleware):
print(f"Processing: {message}")
return await next_middleware(message)
@pytest.mark.asyncio
async def test_middleware():
from io import StringIO
import sys
# Перехват stdout
captured = StringIO()
sys.stdout = captured
async with TestClient(app) as client:
await client.publish({"test": "data"}, "queue")
sys.stdout = sys.__stdout__
# Проверка логов
assert "Processing:" in captured.getvalue()@pytest.mark.asyncio
async def test_retry_on_error():
from app import app, broker
from unittest.mock import AsyncMock
call_count = 0
async def flaky_handler(msg):
nonlocal call_count
call_count += 1
if call_count < 3:
raise Exception("Temporary error")
return "success"
# Подменить обработчик
original_handler = broker._subscribers["orders"]
broker._subscribers["orders"] = flaky_handler
async with TestClient(app) as client:
await client.publish({"id": 1}, "orders")
await asyncio.sleep(0.5)
# Проверка, что было 3 попытки
assert call_count == 3@pytest.mark.asyncio
async def test_dependency_override():
from app import app, get_db
# Тестовая зависимость
async def get_test_db():
yield {"type": "test_db"}
async with TestClient(app) as client:
# Переопределить зависимость
client.override_dependency(get_db, get_test_db)
await client.publish({"id": 1}, "orders")
# Проверка использования тестовой БД@pytest.mark.asyncio
async def test_order_snapshot():
from app import app
import json
async with TestClient(app) as client:
# Опубликовать и получить результат
result = await client.publish({"id": 1, "amount": 100}, "orders")
# Snapshot результата
snapshot = json.dumps(result, sort_keys=True, indent=2)
# Сравнение с эталоном
assert snapshot == """{
"order_id": 1,
"status": "processed"
}"""@pytest.mark.asyncio
@pytest.mark.parametrize(
"order,expected_status",
[
({"id": 1, "amount": 100}, "processed"),
({"id": 2, "amount": 0}, "error"),
({"id": 3, "amount": -50}, "error"),
]
)
async def test_order_variants(order, expected_status):
from app import app, broker
results = []
@broker.subscriber("orders.result")
async def capture_result(result):
results.append(result)
async with TestClient(app) as client:
await client.publish(order, "orders")
await asyncio.sleep(0.3)
assert results[0]["status"] == expected_status# tests/test_orders.py
import pytest
from unittest.mock import AsyncMock, patch
from faststream.testing import TestClient
from app import app, broker
from pydantic import BaseModel
class Order(BaseModel):
id: int
amount: float
@pytest.fixture
def mock_services():
mock_db = AsyncMock()
mock_cache = AsyncMock()
mock_payment = AsyncMock()
mock_payment.charge = AsyncMock(return_value={"status": "success"})
return {
"db": mock_db,
"cache": mock_cache,
"payment": mock_payment
}
@pytest.mark.asyncio
async def test_full_order_flow(mock_services):
with patch("app.db", mock_services["db"]), \
patch("app.cache", mock_services["cache"]), \
patch("app.payment_service", mock_services["payment"]):
async with TestClient(app) as client:
# Опубликовать заказ
await client.publish(
{"id": 1, "amount": 100, "card": "1234"},
"orders.new"
)
# Подождать обработки
await asyncio.sleep(0.5)
# Проверки
mock_services["payment"].charge.assert_called_once()
mock_services["db"].orders.save.assert_called_once()
mock_services["cache"].set.assert_called_once()Следующая тема — Middleware и обработка ошибок: перехватчики сообщений, retry-логика, dead letter queues, circuit breaker, alerting.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.