Dependency injection, жизненный цикл приложения, подключение Redis
Правильная интеграция Redis в FastAPI — это не просто
redis.Redis()в глобальной переменной. Это dependency injection, жизненный цикл приложения, graceful shutdown и production-ready настройки.
from fastapi import FastAPI, Depends
import redis.asyncio as redis
app = FastAPI()
# Глобальный клиент (не рекомендуется для production)
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True,
)
@app.get("/health")
async def health():
await redis_client.ping()
return {"status": "ok"}Проблемы этого подхода:
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
import redis.asyncio as redis
import os
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
# Хранилище для клиента
redis_client: redis.Redis | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Управление жизненным циклом приложения"""
global redis_client
# Startup: создание подключения
redis_client = redis.from_url(
REDIS_URL,
decode_responses=True,
socket_timeout=5.0,
socket_connect_timeout=5.0,
health_check_interval=30,
)
await redis_client.ping()
print("Redis connected")
yield # Приложение работает
# Shutdown: закрытие подключения
await redis_client.close()
print("Redis connection closed")
app = FastAPI(lifespan=lifespan)
# Dependency для получения клиента
async def get_redis() -> redis.Redis:
"""Зависимость для получения Redis клиента"""
return redis_client
# Использование в endpoint
@app.get("/health")
async def health_check(redis: redis.Redis = Depends(get_redis)):
await redis.ping()
return {"status": "ok", "redis": "connected"}
@app.get("/counter")
async def increment_counter(redis: redis.Redis = Depends(get_redis)):
count = await redis.incr('counter:visits')
return {"count": count}Для более сложной логики создайте менеджер:
# redis_manager.py
from contextlib import asynccontextmanager
from typing import AsyncIterator
import redis.asyncio as redis
from redis.asyncio.connection import ConnectionPool
import os
class RedisManager:
"""Менеджер подключений к Redis"""
def __init__(self, url: str | None = None):
self.url = url or os.getenv('REDIS_URL', 'redis://localhost:6379/0')
self._client: redis.Redis | None = None
self._pool: ConnectionPool | None = None
async def connect(self) -> redis.Redis:
"""Создание подключения"""
self._pool = ConnectionPool.from_url(
self.url,
decode_responses=True,
max_connections=50,
socket_timeout=5.0,
socket_connect_timeout=5.0,
health_check_interval=30,
)
self._client = redis.Redis(connection_pool=self._pool)
await self._client.ping()
return self._client
async def disconnect(self):
"""Закрытие подключения"""
if self._client:
await self._client.close()
await self._pool.disconnect()
@property
def client(self) -> redis.Redis:
"""Получение клиента (требует предварительного connect)"""
if self._client is None:
raise RuntimeError("Redis not connected. Call connect() first.")
return self._client
# Глобальный экземпляр
redis_manager = RedisManager()
async def get_redis() -> AsyncIterator[redis.Redis]:
"""Dependency для FastAPI"""
yield redis_manager.client# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
import redis.asyncio as redis
from .redis_manager import redis_manager, get_redis
@asynccontextmanager
async def lifespan(app: FastAPI):
await redis_manager.connect()
yield
await redis_manager.disconnect()
app = FastAPI(lifespan=lifespan)
@app.get("/users/{user_id}")
async def get_user(user_id: int, redis: redis.Redis = Depends(get_redis)):
user = await redis.hgetall(f'user:{user_id}')
if not user:
return {'error': 'User not found'}
return user# config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class RedisSettings(BaseSettings):
"""Настройки Redis"""
redis_url: str = 'redis://localhost:6379/0'
redis_max_connections: int = 50
redis_socket_timeout: float = 5.0
redis_socket_connect_timeout: float = 5.0
redis_health_check_interval: int = 30
class Config:
env_prefix = 'REDIS_'
env_file = '.env'
@lru_cache
def get_redis_settings() -> RedisSettings:
"""Кэшированная загрузка настроек"""
return RedisSettings()# redis_manager.py
from .config import get_redis_settings
class RedisManager:
def __init__(self, settings: RedisSettings | None = None):
self.settings = settings or get_redis_settings()
self._client: redis.Redis | None = None
async def connect(self) -> redis.Redis:
self._client = redis.from_url(
self.settings.redis_url,
decode_responses=True,
socket_timeout=self.settings.redis_socket_timeout,
socket_connect_timeout=self.settings.redis_socket_connect_timeout,
health_check_interval=self.settings.redis_health_check_interval,
)
await self._client.ping()
return self._clientfrom fastapi import FastAPI, Request
import redis.asyncio as redis
import time
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.redis = redis.from_url(
'redis://localhost:6379/0',
decode_responses=True,
)
yield
await app.state.redis.close()
app = FastAPI(lifespan=lifespan)
@app.middleware("http")
async def count_requests(request: Request, call_next):
redis_client: redis.Redis = request.app.state.redis
# Инкремент счётчика
path = request.url.path
await redis_client.incr(f'request:count:{path}')
# Время ответа
start = time.time()
response = await call_next(request)
duration = time.time() - start
# Сохранение метрики
await redis_client.incr(f'request:latency:{path}:count')
await redis_client.incrbyfloat(f'request:latency:{path}:total', duration)
response.headers['X-Process-Time'] = str(duration)
return responsefrom fastapi import FastAPI, Request
import redis.asyncio as redis
app = FastAPI()
@app.on_event("startup")
async def startup():
app.state.redis = redis.from_url(
'redis://localhost:6379/0',
decode_responses=True,
)
@app.on_event("shutdown")
async def shutdown():
await app.state.redis.close()
@app.get("/items/{item_id}")
async def get_item(request: Request, item_id: int):
redis_client: redis.Redis = request.app.state.redis
item = await redis_client.hgetall(f'item:{item_id}')
return itemfrom fastapi import FastAPI, Request, HTTPException, status
from fastapi.responses import JSONResponse
import redis.asyncio as redis
from datetime import datetime
import time
app = FastAPI()
@app.on_event("startup")
async def startup():
app.state.redis = redis.from_url(
'redis://localhost:6379/0',
decode_responses=True,
)
@app.on_event("shutdown")
async def shutdown():
await app.state.redis.close()
async def rate_limit(request: Request, calls: int = 100, period: int = 60):
"""
Rate limiting middleware.
calls: максимум запросов
period: период в секундах
"""
redis_client: redis.Redis = request.app.state.redis
# Идентификатор клиента (IP или user_id)
client_id = request.client.host
# Ключ для rate limiting
key = f'rate_limit:{client_id}'
# Получаем текущее количество запросов
current = await redis_client.get(key)
if current is None:
# Первый запрос — устанавливаем счётчик
await redis_client.setex(key, period, 1)
return
if int(current) >= calls:
# Лимит превышен
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail='Rate limit exceeded',
headers={'Retry-After': str(period)},
)
# Инкремент
await redis_client.incr(key)
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
try:
await rate_limit(request, calls=100, period=60)
except HTTPException as e:
return JSONResponse(
status_code=e.status_code,
content={'detail': e.detail},
headers=e.headers,
)
response = await call_next(request)
return responsefrom fastapi import FastAPI, Depends, Request
from fastapi.responses import JSONResponse
import redis.asyncio as redis
import json
import hashlib
app = FastAPI()
async def get_redis() -> redis.Redis:
return app.state.redis
@app.on_event("startup")
async def startup():
app.state.redis = redis.from_url('redis://localhost:6379/0')
def create_cache_key(path: str, query: str) -> str:
"""Создание уникального ключа кэша"""
key_string = f'{path}:{query}'
key_hash = hashlib.md5(key_string.encode()).hexdigest()
return f'cache:{key_hash}'
async def cached_response(
request: Request,
redis: redis.Redis = Depends(get_redis),
ttl: int = 300,
):
"""Dependency для кэширования"""
cache_key = create_cache_key(request.url.path, request.url.query)
# Проверка кэша
cached = await redis.get(cache_key)
if cached:
data = json.loads(cached)
response = JSONResponse(content=data)
response.headers['X-Cache'] = 'HIT'
return response
# Кэша нет — создаём обёртку для ответа
request.state.cache_key = cache_key
request.state.cache_ttl = ttl
request.state.cache_redis = redis
from starlette.middleware.base import BaseHTTPMiddleware
class CacheMiddleware(BaseHTTPMiddleware):
def __init__(self, app, redis: redis.Redis, ttl: int = 300):
super().__init__(app)
self.redis = redis
self.ttl = ttl
async def dispatch(self, request, call_next):
# Только для GET запросов
if request.method != 'GET':
return await call_next(request)
cache_key = create_cache_key(request.url.path, request.url.query)
# Проверка кэша
cached = await self.redis.get(cache_key)
if cached:
response = JSONResponse(content=json.loads(cached))
response.headers['X-Cache'] = 'HIT'
return response
# Выполнение запроса
response = await call_next(request)
# Кэширование ответа
if response.status_code == 200:
body = b''
async for chunk in response.body_iterator:
body += chunk
await self.redis.setex(
cache_key,
self.ttl,
json.dumps(json.loads(body)),
)
# Возвращаем ответ с телом
response = JSONResponse(content=json.loads(body))
response.headers['X-Cache'] = 'MISS'
return response
# Использование
app.add_middleware(CacheMiddleware, redis=app.state.redis, ttl=300)from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as redis
import json
app = FastAPI()
@app.on_event("startup")
async def startup():
app.state.redis = redis.from_url('redis://localhost:6379/0')
@app.on_event("shutdown")
async def shutdown():
await app.state.redis.close()
@app.websocket("/ws/{room_id}")
async def websocket_room(websocket: WebSocket, room_id: str):
await websocket.accept()
redis_client: redis.Redis = app.state.redis
pubsub = redis_client.pubsub()
# Подписка на канал комнаты
await pubsub.subscribe(f'room:{room_id}')
try:
while True:
# Получение сообщений от клиента
data = await websocket.receive_text()
message = json.loads(data)
# Публикация в Redis
await redis_client.publish(
f'room:{room_id}',
json.dumps({
'type': 'message',
'user': message.get('user'),
'text': message.get('text'),
}),
)
# Получение сообщений из Redis и отправка клиенту
async for message in pubsub.listen():
if message['type'] == 'message':
await websocket.send_text(message['data'])
except WebSocketDisconnect:
await pubsub.unsubscribe(f'room:{room_id}')
await redis_client.close()from fastapi import FastAPI, BackgroundTasks
import redis.asyncio as redis
import json
app = FastAPI()
@app.on_event("startup")
async def startup():
app.state.redis = redis.from_url('redis://localhost:6379/0')
async def send_email_task(redis_client: redis.Redis, email: str, subject: str, body: str):
"""Фоновая задача отправки email"""
# Добавление в очередь задач
await redis_client.lpush('queue:emails', json.dumps({
'email': email,
'subject': subject,
'body': body,
}))
@app.post("/send-email")
async def send_email(
email: str,
subject: str,
body: str,
background_tasks: BackgroundTasks,
):
redis_client: redis.Redis = app.state.redis
# Добавление задачи в фон
background_tasks.add_task(send_email_task, redis_client, email, subject, body)
return {'status': 'Email queued'}| Настройка | Значение |
|---|---|
| URL из env | REDIS_URL environment variable |
| Таймауты | socket_timeout=5.0, socket_connect_timeout=5.0 |
| Пул соединений | max_connections=50 (настроить под нагрузку) |
| Health checks | health_check_interval=30 |
| Graceful shutdown | await redis.close() в lifespan |
| Decode responses | decode_responses=True для строк |
| Retry logic | Retry с ExponentialBackoff для production |
| TLS | rediss:// схема URL для production |
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# Redis
redis_url: str = 'redis://localhost:6379/0'
redis_max_connections: int = 100
redis_socket_timeout: float = 5.0
redis_socket_connect_timeout: float = 5.0
redis_health_check_interval: int = 30
redis_retry_on_timeout: bool = True
# Cache
cache_default_ttl: int = 300 # 5 минут
# Rate limiting
rate_limit_calls: int = 100
rate_limit_period: int = 60
class Config:
env_prefix = 'APP_'
env_file = '.env'# .env
APP_REDIS_URL=rediss://:password@redis.internal:6379/0
APP_REDIS_MAX_CONNECTIONS=100
APP_CACHE_DEFAULT_TTL=600
APP_RATE_LIMIT_CALLS=100
APP_RATE_LIMIT_PERIOD=60Проверьте понимание → ответьте на вопросы в fastapi_integration.json
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.