Эксплуатация в production: monitoring, versioning, deployment, cost tracking
Monitoring, Evaluation, Security, Deployment, Cost Optimization для production LLM систем
LLMOps (Large Language Model Operations) — набор практик и инструментов для эксплуатации LLM в production. Включает monitoring, evaluation, versioning, deployment, cost tracking, security и compliance.
| Аспект | MLOps | LLMOps |
|---|---|---|
| Артефакт | Model weights | Model + Prompts + Context |
| Детерминизм | Детерминированный вывод | Non-deterministic output |
| Метрики качества | Accuracy, F1, ROC-AUC | Relevance, Faithfulness, Toxicity |
| Cost | Training inference | Token-based pricing |
| Versioning | Model version | Prompt + Model + Embedding version |
| Latency | < 100ms | 500ms — 30s |
| Security | Model poisoning | Prompt injection, data leakage |
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Develop │ → │ Evaluate │ → │ Deploy │
│ - Prompts │ │ - RAGAS │ │ - Canary │
│ - RAG │ │ - TruLens │ │ - A/B │
│ - Fine-tune│ │ - Human │ │ - Rollback │
└─────────────┘ └─────────────┘ └─────────────┘
↑ │
│ ↓
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Monitor │ ← │ Secure │ ← │ Optimize │
│ - Metrics │ │ - PII │ │ - Cache │
│ - Alerts │ │ - Inject │ │ - Router │
│ - Tracing │ │ - Audit │ │ - Batch │
└─────────────┘ └─────────────┘ └─────────────┘
# Business metrics
- Cost per request / session / user
- Token usage (input/output по эндпоинтам)
- User satisfaction (thumbs up/down, ratings 1-5)
- Conversation completion rate
- Acceptance rate (для code assistant)
# Technical metrics
- Latency (p50, p95, p99)
- Time to first token (TTFT)
- Throughput (requests/sec, tokens/sec)
- Error rate (4xx, 5xx, rate limits)
- Active connections
# Quality metrics
- Hallucination rate
- Response relevance score
- Faithfulness (для RAG)
- Toxicity score
- PII leakage incidents
- Context precision/recall (для RAG)pip install prometheus-client openaifrom prometheus_client import Counter, Histogram, Gauge, generate_latest, Summary
from fastapi import FastAPI, Response
from openai import AsyncOpenAI
import time
import tiktoken
app = FastAPI()
client = AsyncOpenAI(api_key="your-key")
encoding = tiktoken.get_encoding("cl100k_base")
# Metrics definitions
LLM_REQUEST_COUNT = Counter(
'llm_requests_total',
'Total LLM requests',
['model', 'endpoint', 'status']
)
LLM_REQUEST_LATENCY = Histogram(
'llm_request_latency_seconds',
'LLM request latency',
['model', 'endpoint'],
buckets=(0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0)
)
LLM_TOKEN_COUNT = Counter(
'llm_tokens_total',
'Total tokens generated',
['model', 'endpoint', 'type'] # input/output
)
LLM_COST_GAUGE = Gauge(
'llm_cost_usd',
'Current cost in USD',
['model', 'endpoint']
)
LLM_ACTIVE_CONNECTIONS = Gauge(
'llm_active_connections',
'Active LLM connections'
)
LLM_TOKENS_PER_SECOND = Summary(
'llm_tokens_per_second',
'Token generation speed',
['model']
)
# Cost per 1K tokens
PRICES = {
"gpt-4o": {"input": 0.005, "output": 0.015},
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
"llama-3.1-70b": {"input": 0.0008, "output": 0.0008},
}
def count_tokens(text: str) -> int:
return len(encoding.encode(text))
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
price = PRICES.get(model, PRICES["gpt-3.5-turbo"])
return (input_tokens / 1000) * price["input"] + (output_tokens / 1000) * price["output"]
@app.post("/api/v1/generate")
async def generate(prompt: str, model: str = "gpt-3.5-turbo", endpoint: str = "default"):
start_time = time.time()
LLM_ACTIVE_CONNECTIONS.inc()
try:
input_tokens = count_tokens(prompt)
response = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=1024
)
output_text = response.choices[0].message.content
output_tokens = response.usage.completion_tokens
latency = time.time() - start_time
# Record metrics
LLM_REQUEST_COUNT.labels(model=model, endpoint=endpoint, status='success').inc()
LLM_REQUEST_LATENCY.labels(model=model, endpoint=endpoint).observe(latency)
LLM_TOKEN_COUNT.labels(model=model, endpoint=endpoint, type='input').inc(input_tokens)
LLM_TOKEN_COUNT.labels(model=model, endpoint=endpoint, type='output').inc(output_tokens)
cost = calculate_cost(model, input_tokens, output_tokens)
LLM_COST_GAUGE.labels(model=model, endpoint=endpoint).inc(cost)
if latency > 0:
tokens_per_sec = output_tokens / latency
LLM_TOKENS_PER_SECOND.labels(model=model).observe(tokens_per_sec)
return {
"response": output_text,
"latency_ms": latency * 1000,
"tokens": {"input": input_tokens, "output": output_tokens},
"cost_usd": cost
}
except Exception as e:
LLM_REQUEST_COUNT.labels(model=model, endpoint=endpoint, status='error').inc()
raise
finally:
LLM_ACTIVE_CONNECTIONS.dec()
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain"){
"dashboard": {
"title": "LLM Production Monitoring",
"refresh": "30s",
"panels": [
{
"title": "Request Rate (req/s)",
"type": "graph",
"targets": [{
"expr": "rate(llm_requests_total[5m])",
"legendFormat": "{{model}} - {{endpoint}}"
}]
},
{
"title": "Latency Percentiles",
"type": "graph",
"targets": [
{"expr": "histogram_quantile(0.50, rate(llm_request_latency_bucket[5m]))", "legendFormat": "p50"},
{"expr": "histogram_quantile(0.95, rate(llm_request_latency_bucket[5m]))", "legendFormat": "p95"},
{"expr": "histogram_quantile(0.99, rate(llm_request_latency_bucket[5m]))", "legendFormat": "p99"}
]
},
{
"title": "Token Usage (tokens/s)",
"type": "graph",
"targets": [{
"expr": "rate(llm_tokens_total[5m])",
"legendFormat": "{{type}} - {{model}}"
}]
},
{
"title": "Cost per Hour",
"type": "graph",
"targets": [{
"expr": "increase(llm_cost_usd[1h])",
"legendFormat": "{{model}}"
}]
},
{
"title": "Error Rate (%)",
"type": "graph",
"targets": [{
"expr": "sum(rate(llm_requests_total{status='error'}[5m])) / sum(rate(llm_requests_total[5m])) * 100"
}]
},
{
"title": "Budget Tracker",
"type": "stat",
"targets": [{
"expr": "sum(increase(llm_cost_usd[24h]))",
"legendFormat": "Daily Cost"
}],
"thresholds": [
{"value": 50, "color": "green"},
{"value": 100, "color": "yellow"},
{"value": 150, "color": "red"}
]
}
]
}
}| Метрика | Инструмент | Описание | Когда использовать |
|---|---|---|---|
| Faithfulness | RAGAS | Насколько ответ основан на контексте | RAG-системы |
| Answer Relevancy | RAGAS | Насколько ответ релевантен вопросу | Все Q&A системы |
| Context Precision | RAGAS | Качество retrieval (rank relevant docs higher) | RAG-системы |
| Context Recall | RAGAS | Полнота retrieval (нашли ли все нужное) | RAG-системы |
| Toxicity | Detoxify | Уровень токсичности ответа | Public-facing bots |
| Hallucination | TruLens | Фактическая точность | Fact-based QA |
| Coherence | TruLens | Логическая связность | Long-form content |
pip install ragas datasetsfrom ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
context_entity_recall
)
from datasets import Dataset
import pandas as pd
# Подготовка evaluation dataset
eval_data = {
"question": [
"Как аутентифицироваться в API?",
"Какие есть лимиты на запросы?",
"Как получить API key?"
],
"answer": [
"Для аутентификации используйте Bearer токен в заголовке Authorization",
"Лимит: 100 запросов в минуту для free tier, 1000 для pro",
"API key можно получить в настройках аккаунта на сайте"
],
"contexts": [
["Аутентификация: Authorization: Bearer <token>. API использует JWT."],
["Rate limiting: Free tier — 100 req/min, Pro — 1000 req/min, Enterprise — unlimited"],
["Получение ключа: Settings → API → Generate New Key"]
],
"ground_truth": [
"Используйте Bearer токен в заголовке Authorization",
"100 запросов в минуту для free, 1000 для pro",
"В настройках аккаунта на сайте"
]
}
# Создание Dataset для RAGAS
ragas_dataset = Dataset.from_dict(eval_data)
# Evaluation
results = evaluate(
dataset=ragas_dataset,
metrics=[
faithfulness,
answer_relevancy,
context_precision,
context_recall
]
)
# Анализ результатов
df = results.to_pandas()
print(df)
# Quality gates
assert df['faithfulness'].mean() > 0.80, "Faithfulness ниже порога!"
assert df['answer_relevancy'].mean() > 0.75, "Relevancy ниже порога!"
assert df['context_precision'].mean() > 0.70, "Context precision ниже порога!"
print(f"✅ All quality gates passed!")
print(f"Faithfulness: {df['faithfulness'].mean():.2f}")
print(f"Answer Relevancy: {df['answer_relevancy'].mean():.2f}")
print(f"Context Precision: {df['context_precision'].mean():.2f}")pip install deepeval# test_llm.py
from deepeval import assert_test
from deepeval.metrics import AnswerRelevancyMetric, FaithfulnessMetric
from deepeval.test_case import LLMTestCase
def test_code_assistant():
metric_relevancy = AnswerRelevancyMetric(threshold=0.7)
metric_faithfulness = FaithfulnessMetric(threshold=0.8)
test_case = LLMTestCase(
input="Как реализовать rate limiting в FastAPI?",
expected_output="Используйте slowapi middleware с Redis backend",
actual_output="Для rate limiting в FastAPI установите slowapi: pip install slowapi. Создайте Limiter с Redis URL и добавьте middleware в приложение.",
retrieval_context=[
"SlowAPI — rate limiting middleware для FastAPI",
"Redis используется для распределённого rate limiting"
]
)
assert_test(test_case, [metric_relevancy, metric_faithfulness])
# Запуск: pytest test_llm.py# .github/workflows/llm-eval.yml
name: LLM Evaluation
on:
pull_request:
paths:
- 'prompts/**'
- 'src/llm/**'
jobs:
evaluate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install ragas deepeval pytest
- name: Run RAGAS Evaluation
run: |
python scripts/evaluate_ragas.py > eval_results.json
# Parse results and check thresholds
python scripts/check_quality_gates.py --input eval_results.json
if [ $? -ne 0 ]; then
echo "❌ Quality gate failed"
exit 1
fi
echo "✅ Quality gates passed"
- name: Run DeepEval Tests
run: |
pytest test_llm.py -v --tb=short
- name: Upload Evaluation Report
uses: actions/upload-artifact@v4
with:
name: eval-report
path: eval_results.json| Угроза | Описание | Пример | Защита |
|---|---|---|---|
| Prompt Injection | Внедрение инструкций через user input | "Ignore previous instructions and..." | Input validation, instruction hierarchy |
| Data Leakage | Передача PII/sensitive data в LLM | Email, phone, credit card в prompt | PII redaction (Presidio) |
| Jailbreaking | Обход safety filters | DAN (Do Anything Now) prompts | Output filtering, content moderation |
| Training Data Extraction | Извлечение training данных | "Repeat the first 100 words of your training" | Rate limiting, output monitoring |
| Indirect Prompt Injection | Injection через retrieved context | RAG: malicious document in vector DB | Context sanitization, source verification |
pip install presidio-analyzer presidio-anonymizerfrom presidio_analyzer import AnalyzerEngine, PatternRecognizer
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
import logging
logger = logging.getLogger("llm_security")
class PIIRedactor:
def __init__(self):
self.analyzer = AnalyzerEngine()
self.anonymizer = AnonymizerEngine()
# Custom recognizers
self._add_custom_recognizers()
def _add_custom_recognizers(self):
"""Добавление кастомных паттернов"""
# API Keys pattern
api_key_recognizer = PatternRecognizer(
supported_entity="API_KEY",
deny_list=["api_key", "apikey", "API_KEY"]
)
# Bearer tokens
import re
bearer_pattern = PatternRecognizer(
supported_entity="BEARER_TOKEN",
patterns=[
re.compile(r'\bBearer\s+[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\b')
]
)
def redact(self, text: str, language: str = "en") -> tuple[str, dict]:
"""Redact PII из текста"""
try:
# Detect PII entities
results = self.analyzer.analyze(
text=text,
language=language,
entities=[
"PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
"CREDIT_CARD", "US_SSN", "LOCATION", "DATE_TIME",
"US_PASSPORT", "UK_NHS", "IBAN_CODE", "NRP", "US_DRIVER_LICENSE"
]
)
if not results:
return text, {"redacted": False, "entities": []}
# Anonymize detected entities
anonymized = self.anonymizer.anonymize(
text=text,
analyzer_results=results,
operators={
"PERSON": OperatorConfig("replace", {"new_value": "<PERSON>"}),
"EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "<EMAIL>"}),
"PHONE_NUMBER": OperatorConfig("replace", {"new_value": "<PHONE>"}),
"CREDIT_CARD": OperatorConfig("replace", {"new_value": "<CREDIT_CARD>"}),
"LOCATION": OperatorConfig("replace", {"new_value": "<LOCATION>"}),
}
)
entities_found = list(set([r.entity_type for r in results]))
logger.warning(
f"PII detected and redacted: {entities_found}",
extra={"pii_entities": entities_found, "original_length": len(text)}
)
return anonymized.text, {
"redacted": True,
"entities": entities_found,
"count": len(results)
}
except Exception as e:
logger.error(f"PII redaction failed: {e}")
return text, {"redacted": False, "error": str(e)}
# Usage
redactor = PIIRedactor()
# Example 1: Email redaction
text1 = "My email is john.doe@example.com and phone is +1-555-123-4567"
redacted1, metadata1 = redactor.redact(text1)
# "My email is <EMAIL> and phone is <PHONE>"
# Example 2: Code with secrets
text2 = """
API_KEY = 'sk-1234567890abcdef'
DATABASE_URL = 'postgresql://user:pass@localhost/db'
"""
redacted2, metadata2 = redactor.redact(text2)import re
from typing import List, Tuple
class PromptInjectionDetector:
def __init__(self):
self.instruction_patterns = [
r'ignore (all |previous )?(instructions|rules)',
r'disregard (all |previous )?(instructions|rules)',
r'forget (all |previous )?(instructions|rules)',
r'you are now (unrestricted|free|without rules)',
r'do anything now',
r'dan mode',
r'bypass (all |content )?filters',
r'output your (system |initial |training )?instructions',
r'repeat the (text |words |content )?above',
r'print the (conversation |dialogue |prompt )?history',
]
self.compiled_patterns = [
re.compile(pattern, re.IGNORECASE)
for pattern in self.instruction_patterns
]
def detect(self, text: str) -> Tuple[bool, List[str]]:
"""Detect potential prompt injection attempts"""
detected_patterns = []
for i, pattern in enumerate(self.compiled_patterns):
if pattern.search(text):
detected_patterns.append(self.instruction_patterns[i])
is_injection = len(detected_patterns) > 0
if is_injection:
logger.warning(
f"Potential prompt injection detected",
extra={
"patterns": detected_patterns,
"text_length": len(text),
"text_preview": text[:200]
}
)
return is_injection, detected_patterns
def sanitize(self, text: str) -> str:
"""Basic sanitization — remove potential injection attempts"""
sanitized = text
for pattern in self.compiled_patterns:
sanitized = pattern.sub('[INJECTION REMOVED]', sanitized)
return sanitized
# Usage in API
injection_detector = PromptInjectionDetector()
@app.post("/api/v1/generate")
async def generate(prompt: str):
# Check for injection
is_injection, patterns = injection_detector.detect(prompt)
if is_injection:
return {
"error": "Potentially malicious input detected",
"code": "PROMPT_INJECTION",
"patterns": patterns
}, 400
# Proceed with LLM call
response = await call_llm(prompt)
return responsefrom typing import List, Dict, Optional
from dataclasses import dataclass, field
import numpy as np
from datetime import datetime
@dataclass
class RAGQueryMetrics:
query: str
retrieval_latency_ms: float
num_docs_retrieved: int
context_tokens: int
answer_latency_ms: float
answer_tokens: int
relevance_score: Optional[float] = None
faithfulness_score: Optional[float] = None
user_feedback: Optional[int] = None # 1-5 rating
class RAGMonitor:
def __init__(self):
self.queries: List[RAGQueryMetrics] = []
self.alerts_sent: set = set()
# Thresholds
self.retrieval_latency_threshold = 2000 # ms
self.min_relevance_score = 0.6
self.min_faithfulness_score = 0.7
def track_query(self, metrics: RAGQueryMetrics):
"""Track RAG query metrics"""
self.queries.append(metrics)
# Alert on high retrieval latency
if metrics.retrieval_latency_ms > self.retrieval_latency_threshold:
self._send_alert(
"HIGH_RETRIEVAL_LATENCY",
f"Retrieval latency: {metrics.retrieval_latency_ms}ms"
)
# Alert on low relevance
if metrics.relevance_score and metrics.relevance_score < self.min_relevance_score:
self._send_alert(
"LOW_RELEVANCE",
f"Relevance score: {metrics.relevance_score:.2f}"
)
def get_daily_stats(self) -> Dict:
"""Get daily statistics"""
if not self.queries:
return {}
queries = self.queries[-1000:] # Last 1000 queries
return {
"total_queries": len(queries),
"avg_retrieval_latency_ms": np.mean([q.retrieval_latency_ms for q in queries]),
"p95_retrieval_latency_ms": np.percentile([q.retrieval_latency_ms for q in queries], 95),
"avg_answer_latency_ms": np.mean([q.answer_latency_ms for q in queries]),
"avg_context_tokens": np.mean([q.context_tokens for q in queries]),
"avg_relevance_score": np.mean([q.relevance_score or 0 for q in queries]),
"avg_faithfulness_score": np.mean([q.faithfulness_score or 0 for q in queries]),
"avg_user_rating": np.mean([q.user_feedback or 0 for q in queries if q.user_feedback]),
}
def get_missing_context_queries(self) -> List[str]:
"""Get queries where user gave low rating (potential missing context)"""
return [
q.query for q in self.queries
if q.user_feedback and q.user_feedback <= 2
]
def _send_alert(self, alert_type: str, message: str):
"""Send alert (deduplicated)"""
alert_key = f"{alert_type}:{datetime.now().strftime('%Y-%m-%d')}"
if alert_key not in self.alerts_sent:
logger.error(f"RAG Alert [{alert_type}]: {message}")
# Integration: Slack, PagerDuty, email
self.alerts_sent.add(alert_key)
# Usage in RAG pipeline
rag_monitor = RAGMonitor()
async def rag_query(query: str) -> str:
start = time.time()
# Retrieval
retrieval_start = time.time()
docs = await retriever.search(query, top_k=5)
retrieval_latency = (time.time() - retrieval_start) * 1000
# Generation
context = "\n".join([d.content for d in docs])
answer_start = time.time()
answer = await llm.generate(query, context)
answer_latency = (time.time() - answer_start) * 1000
# Track metrics
metrics = RAGQueryMetrics(
query=query,
retrieval_latency_ms=retrieval_latency,
num_docs_retrieved=len(docs),
context_tokens=count_tokens(context),
answer_latency_ms=answer_latency,
answer_tokens=count_tokens(answer)
)
rag_monitor.track_query(metrics)
return answerfrom pydantic import BaseModel
from typing import Dict, List, Optional
import hashlib
from datetime import datetime
class ChunkVersion(BaseModel):
chunk_id: str
document_id: str
content_hash: str
content: str
embedding_model: str
embedding_version: str
embedding_dim: int
metadata: Dict
created_at: str
updated_at: Optional[str] = None
is_active: bool = True
class ChunkRegistry:
def __init__(self):
self.chunks: Dict[str, ChunkVersion] = {}
self.document_chunks: Dict[str, List[str]] = {} # doc_id -> chunk_ids
def register_chunk(
self,
document_id: str,
content: str,
embedding_model: str,
embedding_version: str,
embedding_dim: int,
metadata: Dict
) -> str:
"""Register new chunk version"""
chunk_id = hashlib.sha256(
f"{document_id}:{content}:{datetime.now().isoformat()}".encode()
).hexdigest()[:16]
content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
chunk = ChunkVersion(
chunk_id=chunk_id,
document_id=document_id,
content_hash=content_hash,
content=content,
embedding_model=embedding_model,
embedding_version=embedding_version,
embedding_dim=embedding_dim,
metadata=metadata,
created_at=datetime.now().isoformat()
)
self.chunks[chunk_id] = chunk
if document_id not in self.document_chunks:
self.document_chunks[document_id] = []
self.document_chunks[document_id].append(chunk_id)
return chunk_id
def needs_reindex(self, document_id: str, new_embedding_model: str) -> bool:
"""Check if chunks need re-indexing due to embedding model change"""
if document_id not in self.document_chunks:
return True
for chunk_id in self.document_chunks[document_id]:
chunk = self.chunks[chunk_id]
if chunk.embedding_model != new_embedding_model:
return True
return False
def get_chunks_for_reindex(self, embedding_model: str) -> List[ChunkVersion]:
"""Get all chunks that need re-indexing"""
return [
chunk for chunk in self.chunks.values()
if chunk.embedding_model != embedding_model and chunk.is_active
]
# Usage: Re-indexing pipeline
registry = ChunkRegistry()
def reindex_chunks(new_embedding_model: str, new_embedding_version: str):
"""Re-index all chunks with new embedding model"""
chunks_to_reindex = registry.get_chunks_for_reindex(new_embedding_model)
logger.info(f"Re-indexing {len(chunks_to_reindex)} chunks")
for chunk in chunks_to_reindex:
# Generate new embedding
new_embedding = embedding_model.encode(chunk.content)
# Update vector DB
vector_db.update(
chunk_id=chunk.chunk_id,
embedding=new_embedding.tolist()
)
# Update registry
chunk.embedding_model = new_embedding_model
chunk.embedding_version = new_embedding_version
chunk.updated_at = datetime.now().isoformat()
logger.info(f"Re-indexing complete: {len(chunks_to_reindex)} chunks updated")from typing import Dict, Literal
from enum import Enum
import tiktoken
class ComplexityLevel(str, Enum):
SIMPLE = "simple"
MEDIUM = "medium"
COMPLEX = "complex"
class CostOptimizedRouter:
def __init__(self):
self.models = {
"llama-3.1-8b": {
"cost_per_1k_input": 0.00005,
"cost_per_1k_output": 0.00005,
"max_tokens": 8192,
"latency_ms": 500,
"quality_score": 0.65
},
"gpt-3.5-turbo": {
"cost_per_1k_input": 0.0005,
"cost_per_1k_output": 0.0015,
"max_tokens": 4096,
"latency_ms": 800,
"quality_score": 0.75
},
"gpt-4o-mini": {
"cost_per_1k_input": 0.00015,
"cost_per_1k_output": 0.0006,
"max_tokens": 128000,
"latency_ms": 1200,
"quality_score": 0.85
},
"gpt-4o": {
"cost_per_1k_input": 0.005,
"cost_per_1k_output": 0.015,
"max_tokens": 128000,
"latency_ms": 2000,
"quality_score": 0.95
}
}
self.complexity_routing = {
ComplexityLevel.SIMPLE: ["llama-3.1-8b"],
ComplexityLevel.MEDIUM: ["gpt-3.5-turbo", "gpt-4o-mini"],
ComplexityLevel.COMPLEX: ["gpt-4o-mini", "gpt-4o"]
}
self.encoding = tiktoken.get_encoding("cl100k_base")
self.cost_tracker = {"total": 0.0}
def estimate_tokens(self, text: str) -> int:
return len(self.encoding.encode(text))
def estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
model_config = self.models[model]
return (
(input_tokens / 1000) * model_config["cost_per_1k_input"] +
(output_tokens / 1000) * model_config["cost_per_1k_output"]
)
def route(self, prompt: str, complexity: ComplexityLevel) -> str:
"""Route request to appropriate model based on complexity"""
candidate_models = self.complexity_routing[complexity]
# Select cheapest model from candidates
return min(
candidate_models,
key=lambda m: self.models[m]["cost_per_1k_output"]
)
def classify_complexity(self, prompt: str) -> ComplexityLevel:
"""Simple heuristic-based complexity classification"""
token_count = self.estimate_tokens(prompt)
# Keywords indicating complex tasks
complex_keywords = [
'analyze', 'compare', 'explain', 'why', 'how',
'implement', 'design', 'architecture', 'optimize'
]
simple_keywords = [
'what', 'who', 'when', 'list', 'define',
'translate', 'summarize', 'convert'
]
prompt_lower = prompt.lower()
complex_count = sum(1 for kw in complex_keywords if kw in prompt_lower)
simple_count = sum(1 for kw in simple_keywords if kw in prompt_lower)
if token_count > 500 or complex_count >= 2:
return ComplexityLevel.COMPLEX
elif simple_count >= 2 and complex_count == 0:
return ComplexityLevel.SIMPLE
else:
return ComplexityLevel.MEDIUM
async def generate(self, prompt: str) -> Dict:
"""Generate with automatic routing and cost tracking"""
complexity = self.classify_complexity(prompt)
model = self.route(prompt, complexity)
input_tokens = self.estimate_tokens(prompt)
# Call LLM (placeholder)
response = await call_llm(prompt, model)
output_tokens = self.estimate_tokens(response)
cost = self.estimate_cost(model, input_tokens, output_tokens)
self.cost_tracker["total"] += cost
return {
"model": model,
"complexity": complexity.value,
"response": response,
"tokens": {"input": input_tokens, "output": output_tokens},
"cost_usd": cost
}
# Usage
router = CostOptimizedRouter()
# Example: Simple query → cheap model
result1 = await router.generate("What is Python?")
# Model: llama-3.1-8b, Cost: ~$0.0001
# Example: Complex query → premium model
result2 = await router.generate("Analyze the performance implications of using asyncio vs threading for I/O-bound tasks in Python")
# Model: gpt-4o, Cost: ~$0.005
# Savings: 60-80% compared to using GPT-4o for all queriesfrom cachetools import TTLCache
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import hashlib
import pickle
from typing import Optional, Tuple
from datetime import datetime
class SemanticCache:
def __init__(
self,
similarity_threshold: float = 0.95,
max_size: int = 10000,
ttl: int = 3600
):
self.cache: TTLCache = TTLCache(maxsize=max_size, ttl=ttl)
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
self.threshold = similarity_threshold
# Cache for embeddings (avoid recomputing)
self.embedding_cache: Dict[str, np.ndarray] = {}
# Stats
self.stats = {
"hits": 0,
"misses": 0,
"hit_rate": 0.0
}
def _get_embedding(self, text: str) -> np.ndarray:
"""Get or compute embedding"""
text_hash = hashlib.md5(text.encode()).hexdigest()
if text_hash not in self.embedding_cache:
self.embedding_cache[text_hash] = self.embedding_model.encode([text])[0]
return self.embedding_cache[text_hash]
def get(self, query: str) -> Optional[Tuple[str, float]]:
"""Get cached response if similar query exists"""
if not self.cache:
return None
query_embedding = self._get_embedding(query)
best_match = None
best_similarity = 0.0
for cached_query, (cached_response, cached_embedding) in self.cache.items():
# Compute similarity
similarity = cosine_similarity(
[query_embedding],
[cached_embedding]
)[0][0]
if similarity > best_similarity and similarity >= self.threshold:
best_similarity = similarity
best_match = cached_response
# Update stats
if best_match:
self.stats["hits"] += 1
else:
self.stats["misses"] += 1
total = self.stats["hits"] + self.stats["misses"]
self.stats["hit_rate"] = self.stats["hits"] / total if total > 0 else 0.0
if best_match:
logger.info(
f"Cache hit (similarity: {best_similarity:.3f})",
extra={"query_preview": query[:50]}
)
return (best_match, best_similarity) if best_match else None
def set(self, query: str, response: str):
"""Cache query-response pair"""
query_embedding = self._get_embedding(query)
self.cache[query] = (response, query_embedding)
def get_stats(self) -> Dict:
"""Get cache statistics"""
return {
**self.stats,
"cache_size": len(self.cache),
"cache_max_size": self.cache.maxsize
}
# Usage with decorator
from functools import wraps
def semantic_cache(cache: SemanticCache):
def decorator(func):
@wraps(func)
async def wrapper(query: str, **kwargs):
# Try cache
cached = cache.get(query)
if cached:
response, similarity = cached
return {
"response": response,
"from_cache": True,
"similarity": similarity
}
# Call LLM
result = await func(query, **kwargs)
# Cache response
cache.set(query, result["response"])
result["from_cache"] = False
return result
return wrapper
return decorator
# Example usage
cache = SemanticCache(similarity_threshold=0.92)
@semantic_cache(cache)
async def generate_answer(query: str):
return await call_llm(query)
# First call: LLM
# Second similar call: Cache hit (~50ms vs ~2000ms)from typing import Callable, List, Optional
from datetime import datetime, timedelta
class BudgetAlert:
def __init__(
self,
daily_budget: float,
alert_callbacks: Optional[List[Callable[[float], None]]] = None
):
self.daily_budget = daily_budget
self.alert_callbacks = alert_callbacks or []
self.current_cost = 0.0
self.alerts_sent: set = set()
# Alert thresholds (percentage of budget)
self.thresholds = [0.5, 0.75, 0.9, 1.0, 1.25]
def add_cost(self, cost: float) -> bool:
"""Add cost and check alerts"""
self.current_cost += cost
budget_used_pct = self.current_cost / self.daily_budget
for threshold in self.thresholds:
alert_key = f"{threshold * 100:.0f}%"
if budget_used_pct >= threshold:
if alert_key not in self.alerts_sent:
self._send_alert(threshold * 100)
self.alerts_sent.add(alert_key)
return self.current_cost <= self.daily_budget
def _send_alert(self, percentage: float):
"""Send alert to all registered callbacks"""
message = (
f"🚨 BUDGET ALERT: {percentage:.0f}% of daily budget used\n"
f"Current: ${self.current_cost:.2f} / ${self.daily_budget:.2f}"
)
logger.warning(message)
for callback in self.alert_callbacks:
try:
callback(message)
except Exception as e:
logger.error(f"Alert callback failed: {e}")
def reset_daily(self):
"""Reset daily counter"""
self.current_cost = 0.0
self.alerts_sent.clear()
# Slack integration
def slack_alert(message: str):
import requests
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
requests.post(webhook_url, json={
"text": message,
"username": "LLM Budget Bot",
"icon_emoji": ":money_with_wings:"
})
# Usage
budget = BudgetAlert(
daily_budget=100.0,
alert_callbacks=[slack_alert]
)
# In request handler
cost = await generate(prompt)
if not budget.add_cost(cost):
logger.error("Budget exceeded! Request rejected.")
return {"error": "Budget exceeded"}, 429from typing import Dict, List
from dataclasses import dataclass
import random
import httpx
@dataclass
class ModelVersion:
name: str
endpoint: str
weight: int # 0-100
health_status: bool = True
class CanaryDeployment:
def __init__(self):
self.model_versions: Dict[str, List[ModelVersion]] = {
"code-assistant": [
ModelVersion("v1.2.0", "http://model-v1:8000", weight=90),
ModelVersion("v1.3.0", "http://model-v2:8000", weight=10),
]
}
self.health_check_interval = 30 # seconds
async def health_check(self):
"""Periodic health check of endpoints"""
import asyncio
while True:
for model_name, versions in self.model_versions.items():
for version in versions:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{version.endpoint}/health")
version.health_status = response.status_code == 200
except Exception:
version.health_status = False
await asyncio.sleep(self.health_check_interval)
def get_model_endpoint(self, model_name: str) -> str:
"""Select endpoint based on weights and health"""
versions = self.model_versions.get(model_name, [])
# Filter healthy versions
healthy_versions = [v for v in versions if v.health_status]
if not healthy_versions:
# Fallback to any version
healthy_versions = versions
if not healthy_versions:
raise ValueError(f"No available endpoints for model '{model_name}'")
# Weighted random selection
total_weight = sum(v.weight for v in healthy_versions)
rand = random.randint(1, total_weight)
cumulative = 0
for version in healthy_versions:
cumulative += version.weight
if rand <= cumulative:
return version.endpoint
return healthy_versions[0].endpoint
def update_weights(self, model_name: str, version: str, new_weight: int):
"""Update traffic weight for specific version"""
for v in self.model_versions[model_name]:
if v.name == version:
v.weight = new_weight
break
# Normalize weights to 100
total = sum(v.weight for v in self.model_versions[model_name])
if total != 100:
for v in self.model_versions[model_name]:
v.weight = int((v.weight / total) * 100)
# FastAPI integration
canary = CanaryDeployment()
@app.post("/api/v1/generate")
async def generate(prompt: str, model: str = "code-assistant"):
endpoint = canary.get_model_endpoint(model)
async with httpx.AsyncClient() as client:
response = await client.post(
f"{endpoint}/generate",
json={"prompt": prompt},
timeout=30.0
)
return response.json()from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
import json
@dataclass
class ABTestVariant:
name: str
model_version: str
prompt_version: str
config: Dict = field(default_factory=dict)
@dataclass
class ABTest:
name: str
variants: List[ABTestVariant]
traffic_split: List[float] # Must sum to 1.0
start_date: datetime
end_date: Optional[datetime] = None
primary_metric: str = "conversion_rate"
status: str = "running" # running, completed, stopped
class ABTestingPlatform:
def __init__(self):
self.tests: Dict[str, ABTest] = {}
self.results: Dict[str, Dict[str, List[Dict]]] = {} # test_name -> variant -> results
def create_test(
self,
name: str,
variants: List[ABTestVariant],
traffic_split: List[float],
primary_metric: str = "conversion_rate"
) -> ABTest:
"""Create new A/B test"""
if abs(sum(traffic_split) - 1.0) > 0.01:
raise ValueError("Traffic split must sum to 1.0")
test = ABTest(
name=name,
variants=variants,
traffic_split=traffic_split,
start_date=datetime.now(),
primary_metric=primary_metric
)
self.tests[name] = test
self.results[name] = {v.name: [] for v in variants}
return test
def get_variant(self, test_name: str, user_id: str) -> ABTestVariant:
"""Get variant for user based on consistent hashing"""
test = self.tests[test_name]
# Consistent hashing by user_id
hash_val = int(
hashlib.md5(f"{test_name}:{user_id}".encode()).hexdigest(),
16
)
bucket = (hash_val % 1000) / 1000.0 # 0.0 - 1.0
cumulative = 0.0
for i, variant in enumerate(test.variants):
cumulative += test.traffic_split[i]
if bucket < cumulative:
return variant
return test.variants[-1]
def record_result(
self,
test_name: str,
variant_name: str,
metrics: Dict
):
"""Record test result"""
if test_name not in self.results:
raise ValueError(f"Test '{test_name}' not found")
self.results[test_name][variant_name].append({
"timestamp": datetime.now().isoformat(),
**metrics
})
def get_winner(self, test_name: str) -> Optional[str]:
"""Determine winner based on primary metric"""
test = self.tests[test_name]
results = self.results[test_name]
metric_scores = {}
for variant_name, variant_results in results.items():
if not variant_results:
continue
# Calculate average for primary metric
scores = [r.get(test.primary_metric, 0) for r in variant_results]
metric_scores[variant_name] = sum(scores) / len(scores)
if not metric_scores:
return None
winner = max(metric_scores, key=metric_scores.get)
# Statistical significance check (simplified)
winner_score = metric_scores[winner]
runner_up = sorted(metric_scores.values())[-2] if len(metric_scores) > 1 else 0
if winner_score - runner_up < 0.01: # 1% threshold
return None # No significant winner
return winner
def get_report(self, test_name: str) -> Dict:
"""Generate test report"""
test = self.tests[test_name]
results = self.results[test_name]
report = {
"test_name": test_name,
"status": test.status,
"duration_days": (test.end_date or datetime.now() - test.start_date).days,
"variants": {}
}
for variant in test.variants:
variant_results = results[variant.name]
if variant_results:
report["variants"][variant.name] = {
"sample_size": len(variant_results),
"avg_conversion_rate": sum(r.get("conversion_rate", 0) for r in variant_results) / len(variant_results),
"avg_latency_ms": sum(r.get("latency_ms", 0) for r in variant_results) / len(variant_results),
"avg_cost_usd": sum(r.get("cost_usd", 0) for r in variant_results) / len(variant_results),
}
report["winner"] = self.get_winner(test_name)
return report
# Usage
ab_platform = ABTestingPlatform()
# Create test: Compare two prompt versions
ab_platform.create_test(
name="prompt-v2-evaluation",
variants=[
ABTestVariant("control", "gpt-3.5-turbo", "prompt-v1"),
ABTestVariant("treatment", "gpt-3.5-turbo", "prompt-v2"),
],
traffic_split=[0.5, 0.5],
primary_metric="conversion_rate"
)
# In request handler
variant = ab_platform.get_variant("prompt-v2-evaluation", user_id="user123")
response = await call_llm(
prompt=variant.prompt_version,
model=variant.model_version
)
# Record result
ab_platform.record_result(
test_name="prompt-v2-evaluation",
variant_name=variant.name,
metrics={
"conversion_rate": 1 if user_rating >= 4 else 0,
"latency_ms": latency * 1000,
"cost_usd": cost
}
)import logging
import json
from datetime import datetime
from typing import Dict, Any, Optional
import uuid
class JSONFormatter(logging.Formatter):
def format(self, record):
log_entry = {
"timestamp": datetime.now().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# Add extra fields
for key, value in record.__dict__.items():
if key not in [
'msg', 'args', 'levelname', 'levelno', 'pathname',
'filename', 'module', 'lineno', 'funcName', 'created',
'msecs', 'relativeCreated', 'thread', 'threadName',
'processName', 'process', 'message', 'exc_info', 'exc_text'
]:
try:
json.dumps(value) # Check if JSON serializable
log_entry[key] = value
except (TypeError, ValueError):
log_entry[key] = str(value)
# Add exception info
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry, ensure_ascii=False)
# Setup logging
def setup_logging(service_name: str, log_level: str = "INFO"):
logger = logging.getLogger(service_name)
logger.setLevel(getattr(logging, log_level))
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(JSONFormatter())
logger.addHandler(console_handler)
# File handler
file_handler = logging.FileHandler(f"{service_name}.log")
file_handler.setFormatter(JSONFormatter())
logger.addHandler(file_handler)
return logger
logger = setup_logging("llm_service")
# Usage with request context
def log_llm_request(
request_id: str,
user_id: str,
model: str,
prompt: str,
response: str,
latency_ms: float,
tokens_input: int,
tokens_output: int,
cost_usd: float,
endpoint: str
):
logger.info(
"LLM request completed",
extra={
"request_id": request_id,
"user_id": user_id,
"model": model,
"endpoint": endpoint,
"prompt_length": len(prompt),
"response_length": len(response),
"latency_ms": latency_ms,
"tokens_input": tokens_input,
"tokens_output": tokens_output,
"cost_usd": cost_usd,
"tokens_per_second": tokens_output / (latency_ms / 1000) if latency_ms > 0 else 0
}
)
# Usage in API
@app.post("/api/v1/generate")
async def generate(prompt: str, user_id: str):
request_id = str(uuid.uuid4())
start_time = time.time()
try:
response = await call_llm(prompt)
latency_ms = (time.time() - start_time) * 1000
log_llm_request(
request_id=request_id,
user_id=user_id,
model="gpt-3.5-turbo",
prompt=prompt,
response=response,
latency_ms=latency_ms,
tokens_input=count_tokens(prompt),
tokens_output=count_tokens(response),
cost_usd=calculate_cost("gpt-3.5-turbo", prompt, response),
endpoint="default"
)
return {"response": response, "request_id": request_id}
except Exception as e:
logger.error(
"LLM request failed",
extra={
"request_id": request_id,
"user_id": user_id,
"error": str(e),
"error_type": type(e).__name__
},
exc_info=True
)
raisepip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlpfrom opentelemetry import trace, context
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace import Status, StatusCode
# Setup tracing
def setup_tracing(service_name: str, otlp_endpoint: str = "localhost:4317"):
resource = Resource.create({
"service.name": service_name,
"service.version": "1.0.0"
})
trace.set_tracer_provider(TracerProvider(resource=resource))
otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
return trace.get_tracer(__name__)
tracer = setup_tracing("llm-service")
@app.post("/api/v1/generate")
async def generate(prompt: str, user_id: str, model: str = "gpt-3.5-turbo"):
with tracer.start_as_current_span("generate") as span:
# Set span attributes
span.set_attribute("user.id", user_id)
span.set_attribute("llm.model", model)
span.set_attribute("llm.prompt.length", len(prompt))
# Retrieval span (for RAG)
with tracer.start_as_current_span("retrieval") as retrieval_span:
retrieval_span.set_attribute("retrieval.top_k", 5)
docs = await retriever.search(prompt, top_k=5)
retrieval_span.set_attribute("retrieval.num_docs", len(docs))
# Generation span
with tracer.start_as_current_span("generation") as gen_span:
response = await call_llm(prompt, model)
gen_span.set_attribute("llm.response.length", len(response))
# Set span status
span.set_status(Status(StatusCode.OK))
return {"response": response}
# Custom span for cost tracking
@tracer.start_as_current_span("cost_calculation")
def track_cost_span(model: str, tokens: int, cost: float):
span = trace.get_current_span()
span.set_attribute("llm.model", model)
span.set_attribute("llm.tokens", tokens)
span.set_attribute("llm.cost_usd", cost)# ❌ Anti-pattern 1: No monitoring
def generate(prompt):
return call_llm(prompt)
# ✅ With monitoring
def generate(prompt):
start = time.time()
response = call_llm(prompt)
metrics.record("latency", time.time() - start)
metrics.record("tokens", count_tokens(response))
return response
# ❌ Anti-pattern 2: No versioning
PROMPT = "You are a helpful assistant..."
# ✅ Versioned prompts
prompt = registry.get("assistant", version="v2.1")
# ❌ Anti-pattern 3: No cost tracking
while True:
response = call_llm(prompt) # $$$$
# ✅ Budget control
if not budget.add_cost(estimated_cost):
raise ValueError("Budget exceeded")
# ❌ Anti-pattern 4: No fallback
response = call_openai(prompt)
# ✅ With fallback
try:
response = await call_openai(prompt)
except RateLimitError:
response = await call_local_model(prompt)
except Exception as e:
logger.error(f"LLM failed: {e}")
response = get_fallback_response(prompt)
# ❌ Anti-pattern 5: No testing
deploy(model)
# ✅ Staged deployment
deploy(model, stage="staging")
run_evaluation(model)
if quality_gates_passed():
promote(model, stage="production")
# ❌ Anti-pattern 6: No PII protection
response = call_llm(f"User email: {user_email}, question: {question}")
# ✅ PII redaction
redacted_email, _ = redactor.redact(user_email)
response = call_llm(f"User email: {redacted_email}, question: {question}")
# ❌ Anti-pattern 7: No input validation
response = call_llm(user_input) # Prompt injection risk!
# ✅ Input validation
is_injection, _ = detector.detect(user_input)
if is_injection:
return {"error": "Invalid input"}
response = call_llm(user_input)| Категория | Open Source | Commercial |
|---|---|---|
| Monitoring | Arize Phoenix, LangFuse, Prometheus | LangSmith, Weights & Biases, DataDog |
| Evaluation | RAGAS, DeepEval, TruLens | Scale Spellbook, Modal, Cohere Evaluate |
| Prompt Mgmt | PromptFlow, LangChain | PromptLayer, Helicone, Braintrust |
| Security | Presidio, Lakera Guard | Protect AI, Hidden Layer, Robust Intelligence |
| Vector DB | Qdrant, Weaviate, Milvus, Chroma | Pinecone, Zilliz, Astra DB |
| Tracing | OpenTelemetry, Jaeger | LangSmith, Arize, Honeycomb |
| Cost Optimization | LiteLLM, CacheLLM | Helicone, OpenRouter |
Контекст: SaaS компания с 50K+ страниц документации
Проблема:
Решение:
Результаты:
| Метрика | До | После |
|---|---|---|
| Context Precision | 0.45 | 0.82 |
| Answer Relevancy | 0.52 | 0.78 |
| Daily Cost | $150 | $85 |
| Avg Latency | 3.2s | 1.8s |
| User Satisfaction | 3.2/5 | 4.5/5 |
ROI: 340% за 6 месяцев
Контекст: FinTech компания, 200+ разработчиков
Проблема:
Решение:
Результаты:
| Метрика | До | После |
|---|---|---|
| Acceptance Rate | 23% | 67% |
| Time Saved | — | 2.5 часа/неделю на разработчика |
| Security Incidents | 3 | 0 |
| Developer Satisfaction | 2.8/5 | 4.3/5 |
ROI: 340% за 6 месяцев
LLMOps критически важен для production LLM систем:
| Область | Ключевые практики |
|---|---|
| Monitoring | Latency, throughput, errors, cost, token usage |
| Evaluation | RAGAS, DeepEval, human feedback, quality gates |
| Versioning | Prompts, models, embeddings, configurations |
| Deployment | Canary, A/B testing, rollback, feature flags |
| Cost Tracking | Token counting, budget alerts, multi-model routing |
| Security | PII redaction, prompt injection detection, input validation |
| Logging | Structured logs, distributed tracing, audit trails |
Вы завершили курс по LLM для Python разработчика! 🎉
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.