ContextVars, адаптеры, correlation ID, trace ID
Контекстное логирование добавляет к каждому логу информацию о запросе (user_id, request_id, trace_id). Это позволяет отследить весь путь запроса.
Проблема: В production-приложении обрабатываются тысячи запросов параллельно. Как понять, какие логи относятся к одному запросу?
Решение: Correlation ID — уникальный идентификатор запроса.
Запрос 1: request_id = "abc-123"
├─ logger.info("Request started")
├─ logger.info("Database query")
├─ logger.info("External API call")
└─ logger.info("Response sent")
Запрос 2: request_id = "xyz-789"
├─ logger.info("Request started")
├─ logger.info("Database query")
└─ logger.info("Response sent")
С correlation ID в логах:
{"request_id": "abc-123", "message": "Request started"}
{"request_id": "abc-123", "message": "Database query"}
{"request_id": "xyz-789", "message": "Request started"}
{"request_id": "abc-123", "message": "External API call"}
{"request_id": "abc-123", "message": "Response sent"}
{"request_id": "xyz-789", "message": "Response sent"}Теперь легко отфильтровать логи одного запроса:
# В Kibana / grep
grep "abc-123" app.logimport uuid
def generate_correlation_id() -> str:
"""Генерация уникального correlation ID."""
return str(uuid.uuid4())
# Пример: "f47ac10b-58cc-4372-a567-0e02b2c3d479"# В HTTP запросе
# X-Correlation-ID: f47ac10b-58cc-4372-a567-0e02b2c3d479
correlation_id = request.headers.get(
'X-Correlation-ID',
generate_correlation_id()
)Проблема: Не хочется указывать extra={} в каждом вызове logger.
Решение: LoggerAdapter автоматически добавляет контекст.
import logging
from pythonjsonlogger import jsonlogger
# Настройка JSON logger
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Создание адаптера с контекстом
correlation_id = "abc-123"
user_id = 42
adapter = logging.LoggerAdapter(
logger,
extra={
"correlation_id": correlation_id,
"user_id": user_id
}
)
# Контекст добавляется автоматически
adapter.info("Request started")
adapter.info("Processing data", extra={"records": 100})
adapter.error("Something went wrong")Вывод:
{"correlation_id": "abc-123", "user_id": 42, "message": "Request started"}
{"correlation_id": "abc-123", "user_id": 42, "records": 100, "message": "Processing data"}
{"correlation_id": "abc-123", "user_id": 42, "message": "Something went wrong"}Проблема: Нужно добавить контекст ко всем логам запроса в веб-приложении.
Решение: Middleware создаёт адаптер при начале запроса.
# fastapi_app.py
import logging
import uuid
from contextvars import ContextVar
from fastapi import FastAPI, Request
from pythonjsonlogger import jsonlogger
# ContextVar для хранения correlation_id в async коде
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')
app = FastAPI()
# Настройка logging
logger = logging.getLogger('app')
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class CorrelationIdFilter(logging.Filter):
"""Добавляет correlation_id из ContextVar ко всем логам."""
def filter(self, record):
record.correlation_id = correlation_id_var.get()
return True
logger.addFilter(CorrelationIdFilter())
@app.middleware("http")
async def correlation_id_middleware(request: Request, call_next):
# Извлечение или генерация correlation_id
correlation_id = request.headers.get(
'X-Correlation-ID',
str(uuid.uuid4())
)
# Сохранение в ContextVar
correlation_id_var.set(correlation_id)
# Логирование начала запроса
logger.info(
"Request started",
extra={
"method": request.method,
"path": request.url.path,
"user_agent": request.headers.get('user-agent')
}
)
# Обработка запроса
response = await call_next(request)
# Добавление correlation_id в ответ
response.headers['X-Correlation-ID'] = correlation_id
# Логирование завершения
logger.info(
"Request completed",
extra={"status_code": response.status_code}
)
return response
@app.get("/users/{user_id}")
async def get_user(user_id: int):
logger.info(f"Fetching user {user_id}")
# ... обработка
return {"user_id": user_id}Вывод для каждого запроса:
{"correlation_id": "abc-123", "message": "Request started", "method": "GET", "path": "/users/42"}
{"correlation_id": "abc-123", "message": "Fetching user 42"}
{"correlation_id": "abc-123", "message": "Request completed", "status_code": 200}Проблема: Глобальные переменные не работают в async коде — разные задачи перемешивают контексты.
Решение: ContextVars предоставляет изолированные контексты для async задач.
# ❌ Плохо: глобальная переменная в async коде
correlation_id = None # Глобальная переменная
async def handle_request(request_id):
global correlation_id
correlation_id = request_id
# Другая задача может изменить correlation_id
# пока эта задача ожидает I/O
await some_async_operation()
logger.info(f"Processing {correlation_id}") # Может быть чужой ID!# ✅ Хорошо: ContextVars для async контекста
from contextvars import ContextVar
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')
async def handle_request(request_id):
# Установка значения для текущего контекста
correlation_id_var.set(request_id)
# Другие async задачи не видят это значение
await some_async_operation()
# Значение гарантированно принадлежит этой задаче
logger.info(f"Processing {correlation_id_var.get()}")from contextvars import ContextVar, Token
from contextlib import contextmanager
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')
@contextmanager
def correlation_id_context(correlation_id: str):
"""Контекстный менеджер для временной установки correlation_id."""
token: Token = correlation_id_var.set(correlation_id)
try:
yield
finally:
correlation_id_var.reset(token)
# Использование
async def handle_request(request_id):
with correlation_id_context(request_id):
# correlation_id установлен в этом блоке
await process()
# correlation_id сброшенПроблема: Нужно добавить контекст ко всем логам централизованно.
Решение: Кастомный Filter модифицирует LogRecord.
import logging
class ContextFilter(logging.Filter):
"""Добавляет контекстные поля ко всем LogRecord."""
def __init__(self, **context):
super().__init__()
self.context = context
def filter(self, record):
# Добавление полей контекста
for key, value in self.context.items():
setattr(record, key, value)
return True
# Использование
logger = logging.getLogger(__name__)
context_filter = ContextFilter(
environment='production',
service='users-api',
version='1.2.3'
)
logger.addFilter(context_filter)
logger.info("Request processed")
# Вывод: {..., "environment": "production", "service": "users-api", "version": "1.2.3"}import logging
from contextvars import ContextVar
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')
user_id_var: ContextVar[int] = ContextVar('user_id', default=None)
class ContextVarFilter(logging.Filter):
"""Добавляет значения из ContextVars ко всем логам."""
def filter(self, record):
record.correlation_id = correlation_id_var.get()
record.user_id = user_id_var.get()
return True
# Настройка
logger = logging.getLogger(__name__)
logger.addFilter(ContextVarFilter())
# В middleware
async def middleware(request, call_next):
correlation_id_var.set(request.headers.get('X-Correlation-ID', generate_uuid()))
user_id_var.set(request.state.user_id)
logger.info("Request started") # Автоматически включает correlation_id и user_id
return await call_next(request)Проблема: Запрос проходит через несколько сервисов. Как отследить весь путь?
Решение: Trace ID + Span ID для distributed tracing.
| Поле | Описание | Пример |
|---|---|---|
| Trace ID | Идентифицирует весь trace через все сервисы | abc123def456 |
| Span ID | Идентифицирует конкретную операцию (span) в trace | span-789 |
| Parent Span ID | ID родительской операции | span-456 |
| Correlation ID | Часто синоним Trace ID в рамках одного сервиса | abc123def456 |
Клиент
│
▼ (X-Trace-ID: abc123)
┌─────────────────┐
│ API Gateway │ ← Извлекает или генерирует trace_id
│ (Kong) │
└────────┬────────┘
│ (X-Trace-ID: abc123)
▼
┌─────────────────┐
│ Users Service │ ← Читает trace_id из заголовка
│ (Python) │
└────────┬────────┘
│ (X-Trace-ID: abc123)
▼
┌─────────────────┐
│ Orders Service │ ← Продолжает trace
│ (Python) │
└─────────────────┘
import logging
import requests
from contextvars import ContextVar
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')
span_id_var: ContextVar[str] = ContextVar('span_id', default='')
def generate_span_id() -> str:
import uuid
return str(uuid.uuid4())[:8]
class TracingFilter(logging.Filter):
def filter(self, record):
record.trace_id = trace_id_var.get()
record.span_id = span_id_var.get()
return True
logger = logging.getLogger(__name__)
logger.addFilter(TracingFilter())
def make_traced_request(url, **kwargs):
"""HTTP запрос с пробросом trace_id."""
headers = kwargs.pop('headers', {})
headers['X-Trace-ID'] = trace_id_var.get()
headers['X-Span-ID'] = generate_span_id()
logger.info(f"Calling {url}", extra={"span_id": headers['X-Span-ID']})
return requests.get(url, headers=headers, **kwargs)
# В middleware
async def tracing_middleware(request, call_next):
# Извлечение или генерация trace_id
trace_id = request.headers.get('X-Trace-ID', generate_uuid())
span_id = generate_span_id()
trace_id_var.set(trace_id)
span_id_var.set(span_id)
logger.info("Request received")
response = await call_next(request)
# Исходящий запрос с trace_id
if should_call_external_api():
make_traced_request('https://api.example.com')
return responseПроблема: Нужно стандартное решение для distributed tracing.
Решение: OpenTelemetry — стандарт индустрии для трассировки.
pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentationfrom opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
# Настройка tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Экспорт в Jaeger
jaeger_exporter = JaegerExporter(
agent_host_name='localhost',
agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# Использование
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("user_id", 123)
logger.info("Processing order")
# ... обработка# ✅ Хорошо
correlation_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))
# ❌ Плохо: запрос без correlation_id
correlation_id = request.headers.get('X-Correlation-ID') # Может быть None# ✅ Хорошо
response.headers['X-Correlation-ID'] = correlation_id
# Клиент может использовать этот ID для отладки# ✅ Хорошо для async
from contextvars import ContextVar
user_id_var = ContextVar('user_id')
# ❌ Плохо для async
user_id = None # Глобальная переменная# ✅ Хорошо
logger.info("Request started", extra={"correlation_id": cid, "method": method})
# ... обработка
logger.info("Request completed", extra={"correlation_id": cid, "status": status})Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.