Развёртывание RAG-системы: API, кэширование, мониторинг, масштабирование и best practices
Прототип работает на ноутбуке. Продакшен — на сервере с тысячами пользователей.
Всё, что мы делали в предыдущих темах — это прототип. Для продакшена нужны:
В этой теме мы разберём, как превратить RAG-прототип в рабочую систему.
FastAPI — лучший выбор для создания API. Он быстр, асинхронный и автоматически генерирует документацию.
pip install fastapi uvicornfrom fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI(title="RAG API", version="1.0")
class QueryRequest(BaseModel):
question: str
top_k: int = 3
class QueryResponse(BaseModel):
answer: str
sources: list[str]
query_time_ms: float
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
"""Основной endpoint для вопросов."""
import time
start = time.time()
# Здесь ваша RAG-логика:
# 1. Найти релевантные чанки
# 2. Сгенерировать ответ через LLM
elapsed = (time.time() - start) * 1000
return QueryResponse(
answer="Ответ на вопрос",
sources=["doc_1.pdf", "doc_2.pdf"],
query_time_ms=elapsed
)Запуск:
uvicorn main:app --host 0.0.0.0 --port 8000Документация автоматически доступна по адресу http://localhost:8000/docs.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import chromadb
from openai import OpenAI
import time
app = FastAPI(title="RAG API", version="1.0")
# Инициализация клиентов при старте
client = chromadb.PersistentClient(path="./rag_store")
collection = client.get_collection("company_docs")
llm_client = OpenAI()
class QueryRequest(BaseModel):
question: str
top_k: int = 3
include_sources: bool = True
class QueryResponse(BaseModel):
answer: str
sources: list[dict] | None = None
query_time_ms: float
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
start = time.time()
try:
# 1. Retrieval
results = collection.query(
query_texts=[request.question],
n_results=request.top_k
)
contexts = results["documents"][0]
sources = results["metadatas"][0]
if not contexts:
return QueryResponse(
answer="Информация не найдена в документах.",
sources=[],
query_time_ms=(time.time() - start) * 1000
)
# 2. Generation
context = "\n\n".join(contexts)
prompt = f"""Контекст:
{context}
Вопрос: {request.question}
Ответь на основе контекста."""
response = llm_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.1
)
answer = response.choices[0].message.content
# 3. Response
sources_out = None
if request.include_sources:
sources_out = [{"source": s.get("source", "unknown")} for s in sources]
return QueryResponse(
answer=answer,
sources=sources_out,
query_time_ms=(time.time() - start) * 1000
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "ok"}
@app.get("/stats")
async def stats():
"""Статистика коллекции."""
count = collection.count()
return {"documents_in_collection": count}Многие вопросы повторяются. Кэширование ответов экономит токены и ускоряет ответ.
import hashlib
import json
from pathlib import Path
CACHE_DIR = Path("./cache")
CACHE_DIR.mkdir(exist_ok=True)
def get_cache_key(question: str, top_k: int) -> str:
"""Создаёт уникальный ключ кэша."""
content = f"{question}|{top_k}"
return hashlib.md5(content.encode()).hexdigest()
def get_from_cache(question: str, top_k: int) -> str | None:
"""Получает ответ из кэша или None."""
key = get_cache_key(question, top_k)
cache_file = CACHE_DIR / f"{key}.json"
if cache_file.exists():
with open(cache_file) as f:
data = json.load(f)
return data.get("answer")
return None
def save_to_cache(question: str, top_k: int, answer: str):
"""Сохраняет ответ в кэш."""
key = get_cache_key(question, top_k)
cache_file = CACHE_DIR / f"{key}.json"
with open(cache_file, "w") as f:
json.dump({"question": question, "answer": answer}, f)
# Использование в API
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
# Проверяем кэш
cached = get_from_cache(request.question, request.top_k)
if cached:
return QueryResponse(
answer=cached,
sources=[],
query_time_ms=0.1 # Кэш — мгновенно
)
# ... обычная RAG-логика ...
# Сохраняем в кэш
save_to_cache(request.question, request.top_k, answer)
return QueryResponse(answer=answer, sources=sources_out, query_time_ms=elapsed)Для продакшена лучше использовать Redis вместо файлового кэша:
import redis
import json
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
def get_from_cache(question: str) -> str | None:
key = f"rag:{hashlib.md5(question.encode()).hexdigest()}"
return redis_client.get(key)
def save_to_cache(question: str, answer: str, ttl: int = 3600):
"""TTL = время жизни кэша в секундах (1 час по умолчанию)."""
key = f"rag:{hashlib.md5(question.encode()).hexdigest()}"
redis_client.setex(key, ttl, answer)import logging
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("rag_api.log"),
logging.StreamHandler() # Также в консоль
]
)
logger = logging.getLogger("rag_api")
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
logger.info(f"Запрос: {request.question}")
try:
# ... RAG-логика ...
logger.info(f"Ответ получен за {elapsed:.0f}ms")
return response
except Exception as e:
logger.error(f"Ошибка: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))from collections import deque
import statistics
# Храним последние 100 времён запросов
query_times = deque(maxlen=100)
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
start = time.time()
# ... RAG-логика ...
elapsed = (time.time() - start) * 1000
query_times.append(elapsed)
return QueryResponse(answer=answer, query_time_ms=elapsed)
@app.get("/stats")
async def stats():
if not query_times:
return {"message": "No queries yet"}
return {
"avg_query_time_ms": statistics.mean(query_times),
"p50_query_time_ms": statistics.median(query_times),
"p95_query_time_ms": sorted(query_times)[int(len(query_times) * 0.95)],
"total_queries": len(query_times)
}Для развёртывания удобно упаковать API в Docker:
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]requirements.txt:
fastapi
uvicorn
chromadb
openai
sentence-transformers
pypdf
python-docx
pandas
beautifulsoup4
Сборка и запуск:
docker build -t rag-api .
docker run -p 8000:8000 -v ./rag_store:/app/rag_store rag-apiЗапустите несколько инстансов API за load balancer (nginx):
upstream rag_backend {
server 127.0.0.1:8001;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
}
server {
listen 80;
location / {
proxy_pass http://rag_backend;
}
}ChromaDB работает в одном процессе — не масштабируется. Для продакшена:
Ограничение запросов для предотвращения злоупотреблений:
from fastapi import Request, HTTPException
from datetime import datetime, timedelta
from collections import defaultdict
# Простой rate limiter
rate_limits = defaultdict(list)
RATE_LIMIT = 10 # запросов
RATE_WINDOW = 60 # секунд
async def check_rate_limit(client_ip: str) -> bool:
now = datetime.now()
window_start = now - timedelta(seconds=RATE_WINDOW)
# Удаляем старые запросы
rate_limits[client_ip] = [
t for t in rate_limits[client_ip] if t > window_start
]
if len(rate_limits[client_ip]) >= RATE_LIMIT:
return False
rate_limits[client_ip].append(now)
return True
@app.post("/query")
async def query(request: QueryRequest, req: Request):
client_ip = req.client.host
if not await check_rate_limit(client_ip):
raise HTTPException(
status_code=429,
detail="Слишком много запросов. Попробуйте позже."
)
# ... RAG-логика ...from fastapi import Depends, Header, HTTPException
VALID_API_KEYS = ["key1", "key2"] # В продакшене — из БД или env
async def verify_api_key(x_api_key: str = Header(...)):
if x_api_key not in VALID_API_KEYS:
raise HTTPException(status_code=401, detail="Invalid API key")
@app.post("/query", dependencies=[Depends(verify_api_key)])
async def query(request: QueryRequest):
# Доступно только с валидным API-ключом
...class QueryRequest(BaseModel):
question: str
@field_validator("question")
@classmethod
def question_not_empty(cls, v):
if not v.strip():
raise ValueError("Question cannot be empty")
if len(v) > 1000:
raise ValueError("Question too long (max 1000 chars)")
return vВопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.