Внедрение зависимостей в обработчики: Depends, контекст, базы данных, кэш, внешние сервисы.
Внедрение зависимостей в обработчики: Depends, контекст, базы данных, кэш, внешние сервисы.
FastStream использует систему Dependency Injection, аналогичную FastAPI:
from faststream import FastStream, Depends
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Зависимость
async def get_db():
db = Database()
await db.connect()
yield db
await db.close()
@broker.subscriber("orders")
async def handle_order(order: dict, db=Depends(get_db)):
# db автоматически инжектится
await db.orders.save(order)Преимущества:
Простейшая зависимость возвращает значение напрямую — например, логгер или настройки. Функция зависимости вызывается перед обработчиком, результат передаётся в аргумент.
async def get_logger():
return logging.getLogger("orders")
@broker.subscriber("orders")
async def handle_order(order: dict, logger=Depends(get_logger)):
logger.info(f"Processing order {order['id']}")Для зависимостей с жизненным циклом (подключения к БД, кэш) используется yield. Код до yield выполняется до обработчика, после yield — после обработчика (даже при exception).
async def get_db():
db = Database()
await db.connect()
yield db # Передаётся в обработчик
await db.close() # Выполняется после обработчикаЖизненный цикл:
get_db() до yielddb передаётся в обработчикyieldasync def get_redis():
redis = await aioredis.from_url("redis://localhost")
yield redis
await redis.close()
async def get_cache(redis=Depends(get_redis)):
return Cache(redis)
@broker.subscriber("orders")
async def handle_order(
order: dict,
cache=Depends(get_cache) # Автоматически инжектит redis
):
cached = await cache.get(f"order:{order['id']}")import asyncpg
from faststream import Depends
async def get_db_pool():
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db"
)
yield pool
await pool.close()
async def get_connection(pool=Depends(get_db_pool)):
async with pool.acquire() as conn:
yield conn
@broker.subscriber("orders")
async def handle_order(order: dict, conn=Depends(get_connection)):
await conn.execute(
"INSERT INTO orders (id, amount) VALUES ($1, $2)",
order["id"], order["amount"]
)from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
engine = create_async_engine("postgresql+asyncpg://localhost/db")
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db_session():
async with async_session() as session:
yield session
await session.commit()
@broker.subscriber("orders")
async def handle_order(order: dict, session: AsyncSession = Depends(get_db_session)):
from models import Order
db_order = Order(**order)
session.add(db_order)
# commit вызывается автоматически после yieldfrom motor.motor_asyncio import AsyncIOMotorClient
client = AsyncIOMotorClient("mongodb://localhost")
db = client["mydb"]
async def get_database():
yield db
async def get_orders_collection(db=Depends(get_database)):
yield db["orders"]
@broker.subscriber("orders")
async def handle_order(order: dict, collection=Depends(get_orders_collection)):
await collection.insert_one(order)import aioredis
from functools import lru_cache
@lru_cache
def get_redis_url():
return "redis://localhost:6379"
async def get_redis():
redis = await aioredis.from_url(get_redis_url())
yield redis
await redis.close()
class Cache:
def __init__(self, redis):
self.redis = redis
async def get(self, key: str):
return await self.redis.get(key)
async def set(self, key: str, value, ttl: int = 3600):
await self.redis.setex(key, ttl, value)
async def get_cache(redis=Depends(get_redis)):
return Cache(redis)
@broker.subscriber("products")
async def handle_product(product: dict, cache=Depends(get_cache)):
await cache.set(f"product:{product['id']}", product, ttl=3600)import aiohttp
async def get_http_session():
async with aiohttp.ClientSession() as session:
yield session
async def get_payment_service(session=Depends(get_http_session)):
return PaymentService(session)
class PaymentService:
def __init__(self, session):
self.session = session
self.base_url = "https://payment.example.com"
async def charge(self, amount: float, card: str):
async with self.session.post(
f"{self.base_url}/charge",
json={"amount": amount, "card": card}
) as resp:
return await resp.json()
@broker.subscriber("payments")
async def handle_payment(payment: dict, service=Depends(get_payment_service)):
result = await service.charge(payment["amount"], payment["card"])
print(f"Payment result: {result}")from faststream.rabbit import RabbitMessage
async def get_message_context(message: RabbitMessage):
return {
"trace_id": message.headers.get("trace_id"),
"correlation_id": message.correlation_id,
"reply_to": message.reply_to
}
@broker.subscriber("orders")
async def handle_order(
order: dict,
context=Depends(get_message_context)
):
logger.info(
f"Processing order {order['id']}",
extra={"trace_id": context["trace_id"]}
)async def get_current_user(token: str = Header("X-User-Token")):
user = await auth_service.verify(token)
return user
@broker.subscriber("orders")
async def handle_order(
order: dict,
user=Depends(get_current_user)
):
# Проверка прав
if not user.can_create_orders:
raise PermissionError()
order["user_id"] = user.id
await db.orders.save(order)from dataclasses import dataclass
@dataclass
class Settings:
db_url: str
redis_url: str
debug: bool = False
@lru_cache
def get_settings():
return Settings(
db_url=os.getenv("DB_URL"),
redis_url=os.getenv("REDIS_URL"),
debug=os.getenv("DEBUG", "false").lower() == "true"
)
async def get_db(settings=Depends(get_settings)):
db = await create_db(settings.db_url)
yield db
await db.close()
@broker.subscriber("orders")
async def handle_order(
order: dict,
settings=Depends(get_settings), # Кэшируется
db=Depends(get_db)
):
if settings.debug:
logger.debug(f"Debug order: {order}")# production.py
async def get_db():
db = Database("postgresql://prod/db")
yield db
await db.close()
@broker.subscriber("orders")
async def handle_order(order: dict, db=Depends(get_db)):
await db.orders.save(order)
# test.py
from faststream.testing import TestClient
async def get_test_db():
yield TestDatabase() # Мок база данных
async with TestClient(app) as client:
client.override_dependency(get_db, get_test_db)
await client.publish({"id": 1}, "orders")from faststream import FastStream, Depends
from faststream.rabbit import RabbitBroker
from sqlalchemy.ext.asyncio import AsyncSession
import aiohttp
from functools import lru_cache
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Settings
@lru_cache
def get_settings():
return Settings(
db_url=os.getenv("DB_URL"),
redis_url=os.getenv("REDIS_URL"),
payment_api_url=os.getenv("PAYMENT_API_URL")
)
# Database
async def get_db_session(settings=Depends(get_settings)):
engine = create_async_engine(settings.db_url)
async with async_session(engine) as session:
yield session
# Redis
async def get_redis(settings=Depends(get_settings)):
redis = await aioredis.from_url(settings.redis_url)
yield redis
await redis.close()
# Cache service
async def get_cache(redis=Depends(get_redis)):
return CacheService(redis)
# Payment service
async def get_payment_service(settings=Depends(get_settings)):
async with aiohttp.ClientSession() as session:
yield PaymentService(session, settings.payment_api_url)
# Обработчик со всеми зависимостями
@broker.subscriber("orders")
async def process_order(
order: dict,
db: AsyncSession = Depends(get_db_session),
cache=Depends(get_cache),
payment=Depends(get_payment_service),
settings=Depends(get_settings)
):
# Проверка кэша
cached = await cache.get(f"order:{order['id']}")
if cached:
return
# Обработка платежа
result = await payment.charge(order["amount"], order["card"])
# Сохранение в БД
db_order = Order(**order, payment_result=result)
db.add(db_order)
# Кэширование
await cache.set(f"order:{order['id']}", order, ttl=3600)
logger.info(f"Order {order['id']} processed")Следующая тема — Тестирование FastStream приложений: TestClient, моки, интеграционные тесты, fixtures.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.