structlog, loguru, logbook — сравнение, миграция, когда выбирать
Стандартный
logging— не единственный выбор.structlogиloguruпредлагают более современный API, structured logging из коробки и лучший developer experience. Но когда их выбирать?
🎯 Цель этого руководства: Сделать вас экспертом по логированию в Python. После прочтения вы сможете:
| Критерий | logging | structlog | loguru |
|---|---|---|---|
| Стандартная библиотека | ✅ Да | ❌ Нет | ❌ Нет |
| Зависимости | 0 | 1 | 1 |
| Structured logging | С настройкой | ✅ Из коробки | ✅ Из коробки |
| API простота | Средняя | Средняя | ✅ Очень простой |
| Контекст (bind) | LoggerAdapter | ✅ bind() | ✅ bind() |
| Иммутабельность | ❌ Нет | ✅ Да | ✅ Да |
| Производительность | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| Гибкость | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Интеграция с stdlib | ✅ N/A | ✅ Да | ✅ Частичная |
| Кривая обучения | Средняя | Низкая | Низкая |
| Async поддержка | ❌ Ограничена | ✅ Отличная | ⚠️ Базовая |
| Cloud-native готовность | ⚠️ С настройкой | ✅ Да | ⚠️ Частичная |
| OpenTelemetry интеграция | ✅ Да | ✅ Отличная | ❌ Нет |
| Память (typical) | ~38-43 MB | ~52 MB | ~83 MB |
| Пропускная способность | ~10K записей/сек | ~8K записей/сек | ~6K записей/сек |
Бенчмарки 2025: Данные получены на сервере среднего уровня с RotatingFileHandler. Для high-load систем (>1000 RPS) рекомендуется structlog или logging с queue-based handlers.
Стандартный logging:
structlog:
loguru:
┌─────────────────────────────────────────┐
│ Вы пишете библиотеку для PyPI? │
└─────────────────┬───────────────────────┘
│ Да
▼
┌─────────┐
│ logging │ ← Не добавляйте зависимости
└─────────┘
│ Нет
▼
┌─────────────────────────────────────────┐
│ Нужен structured logging для │
│ production (JSON, cloud, tracing)? │
└─────────────────┬───────────────────────┘
│ Да
▼
┌───────────┐
│ structlog │ ← Лучший выбор для production
└───────────┘
│ Нет
▼
┌─────────────────────────────────────────┐
│ Прототип/скрипт/внутренний инструмент?│
└─────────────────┬───────────────────────┘
│ Да
▼
┌─────────┐
│ loguru │ ← Быстро и просто
└─────────┘
│ Нет (сложный production проект)
▼
┌───────────┐
│ structlog │ ← Или logging с кастомной настройкой
└───────────┘
| Индустрия | Рекомендация | Почему |
|---|---|---|
| FinTech | structlog + OpenTelemetry | Compliance, tracing, audit |
| E-commerce | structlog | Микросервисы, correlation IDs |
| SaaS | structlog или logging | Масштабируемость, cloud-native |
| Data Science | loguru | Прототипирование, скрипты |
| Embedded/IoT | logging | Минимальные зависимости, память |
| Startup MVP | loguru → structlog | Быстрый старт, миграция позже |
⚠️ Важно: Loguru отлично подходит для прототипов и скриптов, но не рекомендуется для production high-load систем из-за более высокого потребления памяти (~83 MB) и отсутствия нативной интеграции с OpenTelemetry.
pip install logurufrom loguru import logger
# Готово! Никакой конфигурации
logger.info("Application started")
logger.warning("Rate limit at 90%")
logger.error("Database connection failed")
# С контекстом
logger.info("User logged in", user_id=123, ip="192.168.1.1")
# ⚠️ SECURITY WARNING: Не логируйте чувствительные данные!
# logger.info(f"User password: {password}") # ❌ Никогда так не делайтеВывод:
2026-03-21 10:00:00.123 | INFO | __main__:main:10 - Application started
2026-03-21 10:00:01.234 | WARNING | __main__:main:11 - Rate limit at 90%
2026-03-21 10:00:02.345 | ERROR | __main__:main:12 - Database connection failed
2026-03-21 10:00:03.456 | INFO | __main__:main:13 - User logged in user_id=123 ip=192.168.1.1
from loguru import logger
# bind() возвращает новый logger с контекстом
request_logger = logger.bind(request_id="abc-123", user_id=42)
request_logger.info("Request started")
request_logger.info("Processing data", records=100)
request_logger.info("Request completed")
# Исходный logger не изменён
logger.info("Another message") # Без request_idfrom loguru import logger
# Декоратор для автоматического catching
@logger.catch
def risky_operation(x, y):
return x / y # ZeroDivisionError будет автоматически залогирован
risky_operation(10, 0) # Не упадёт, ошибка залогируется
# Эквивалент в стандартном logging:
# try:
# risky_operation(10, 0)
# except Exception:
# logger.exception("Error in risky_operation")from loguru import logger
import sys
import logging
# Удаление default handler
logger.remove()
# Добавление своих handlers
logger.add(
sys.stdout,
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
# Production: JSON формат для cloud-native логирования
def json_sink(message):
import json
record = message.record
log_entry = {
"timestamp": record["time"].isoformat(),
"level": record["level"].name,
"message": record["message"],
"module": record["module"],
"function": record["function"],
"line": record["line"],
}
# Добавляем extra поля
for key, value in record["extra"].items():
log_entry[key] = value
print(json.dumps(log_entry))
logger.add(json_sink, level="INFO", format="{message}")
# Ротация логов для production
logger.add(
"logs/app.log",
level="DEBUG",
rotation="10 MB", # Максимальный размер файла
retention="7 days", # Хранить 7 дней
compression="zip", # Сжимать старые логи
backtrace=True, # Показывать полный traceback
diagnose=True, # Показывать значения переменных
enqueue=True # Thread-safe очередь (важно для production!)
)
# Разные уровни для разных файлов
logger.add(
"logs/errors.log",
level="ERROR",
rotation="1 day",
backtrace=True,
diagnose=True
)
# Фильтрация по модулям
logger.add(
sys.stdout,
level="DEBUG",
filter=lambda record: record["module"] in ("main", "api")
)
# Environment-based уровень логирования
import os
logger.add(
sys.stdout,
level=os.getenv("LOG_LEVEL", "INFO").upper()
)💡 Production Tip: Используйте
enqueue=Trueдля thread-safe логирования в многопоточных приложениях. Это добавляет небольшую задержку, но предотвращает гонки данных.
from loguru import logger
import json
from datetime import datetime
# Production-ready JSON sink для cloud-native логирования
def json_sink(message):
record = message.record
log_entry = {
"timestamp": record["time"].isoformat(),
"level": record["level"].name,
"message": record["message"],
"module": record["module"],
"function": record["function"],
"line": record["line"],
"thread": record["thread"].id,
"process": record["process"].id,
}
# Добавляем extra поля
for key, value in record["extra"].items():
log_entry[key] = value
# Добавляем контекст трассировки (если есть)
from opentelemetry import trace
span = trace.get_current_span()
if span and span.is_recording():
ctx = span.get_span_context()
log_entry["trace_id"] = format(ctx.trace_id, "032x")
log_entry["span_id"] = format(ctx.span_id, "016x")
print(json.dumps(log_entry))
logger.remove()
logger.add(json_sink, serialize=True, level="INFO")
logger.info("User logged in", user_id=123)Вывод (для ELK/Loki/Datadog):
{
"timestamp": "2026-03-21T10:00:00.123456",
"level": "INFO",
"message": "User logged in",
"module": "main",
"function": "main",
"line": 10,
"thread": 140234567890,
"process": 12345,
"user_id": 123,
"trace_id": "442d81cb25de382054575e33c1a659df",
"span_id": "da8405273d89b065"
}💡 Cloud Tip: Для Grafana Loki используйте метки в JSON (level, service, environment) — это ускорит поиск в 10-100 раз.
from fastapi import FastAPI, Request
from loguru import logger
import uuid
import time
app = FastAPI()
# Middleware для correlation ID и трассировки
@app.middleware("http")
async def log_requests(request: Request, call_next):
request_id = str(uuid.uuid4())
start_time = time.monotonic()
# Создаём logger с контекстом
req_logger = logger.bind(
request_id=request_id,
method=request.method,
path=request.url.path,
client_ip=request.client.host if request.client else "unknown"
)
req_logger.info("Request started")
try:
response = await call_next(request)
duration_ms = (time.monotonic() - start_time) * 1000
req_logger.info(
"Request completed",
status_code=response.status_code,
duration_ms=round(duration_ms, 2)
)
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
req_logger.exception("Request failed", error=str(e))
raise⚠️ Compliance Alert: Для GDPR, HIPAA, PCI DSS необходимо маскировать персональные данные в логах.
from loguru import logger
import re
class PIIMaskingFilter:
"""Фильтр для автоматического маскирования PII"""
PATTERNS = {
'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
'credit_card': re.compile(r'\b(?:\d[ -]*?){13,19}\b'),
'phone': re.compile(r'\b\d{10,15}\b'),
'ssn': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
}
MASKS = {
'email': '[EMAIL_REDACTED]',
'credit_card': '[CC_REDACTED]',
'phone': '[PHONE_REDACTED]',
'ssn': '[SSN_REDACTED]',
}
def __call__(self, record):
# Маскирование в message
if isinstance(record["message"], str):
for key, pattern in self.PATTERNS.items():
record["message"] = pattern.sub(self.MASKS[key], record["message"])
# Маскирование в extra полях
for key, value in record["extra"].items():
if isinstance(value, str):
for pattern_key, pattern in self.PATTERNS.items():
record["extra"][key] = pattern.sub(self.MASKS[pattern_key], value)
return True
# Добавляем фильтр
logger.add(
"logs/app.log",
level="INFO",
filter=PIIMaskingFilter()
)
# Теперь даже если вы случайно залогируете email, он будет замаскирован
logger.info("User email: {email}", email="user@example.com")
# Вывод: User email: [EMAIL_REDACTED]import pytest
from loguru import logger
from io import StringIO
@pytest.fixture
def log_capture():
"""Фикстура для захвата логов в тестах"""
buffer = StringIO()
logger.add(buffer, level="DEBUG", format="{message}")
yield buffer
logger.remove()
def test_info_logging(log_capture):
logger.info("Test message")
assert "Test message" in log_capture.getvalue()
def test_context_logging(log_capture):
logger.bind(user_id=123).info("User action")
log_output = log_capture.getvalue()
assert "User action" in log_output
assert "123" in log_output✅ Production Choice: Structlog — лучший выбор для production систем в 2025-2026. Отличная производительность (~8K записей/сек), иммутабельный контекст, полная интеграция с OpenTelemetry и cloud-native инструментами.
pip install structlogimport structlog
# Конфигурация (обычно в начале приложения)
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.add_log_level,
structlog.processors.JSONRenderer()
]
)
log = structlog.get_logger()
# Structured logging из коробки
log.info("user_logged_in", user_id=123, ip="192.168.1.1")
log.error("payment_failed", order_id=100500, error="insufficient_funds")Вывод:
{"event": "user_logged_in", "user_id": 123, "ip": "192.168.1.1", "level": "info", "timestamp": "2026-03-21T10:00:00.123456"}
{"event": "payment_failed", "order_id": 100500, "error": "insufficient_funds", "level": "error", "timestamp": "2026-03-21T10:00:01.234567"}import structlog
log = structlog.get_logger()
# bind() возвращает иммутабельный logger с контекстом
request_log = log.bind(request_id="abc-123", user_id=42)
request_log.info("request_started", method="GET", path="/api/users")
request_log.info("database_query", query="SELECT * FROM users")
request_log.info("request_completed", status=200)
# Исходный logger не изменён
log.info("another_event") # Без request_idimport structlog
import logging
# Настройка стандартного logging для handlers
logging.basicConfig(
format="%(message)s",
level=logging.INFO,
handlers=[
logging.StreamHandler(),
# Для production добавьте RotatingFileHandler
# logging.handlers.RotatingFileHandler(
# "app.log",
# maxBytes=10*1024*1024,
# backupCount=5
# )
]
)
# Production-ready конфигурация structlog
structlog.configure(
processors=[
# 1. Фильтрация по уровню (интеграция с logging)
structlog.stdlib.filter_by_level,
# 2. Добавление имени логгера
structlog.stdlib.add_logger_name,
# 3. Добавление log level
structlog.stdlib.add_log_level,
# 4. Форматирование позиционных аргументов
structlog.stdlib.PositionalArgumentsFormatter(),
# 5. Добавление timestamp
structlog.processors.TimeStamper(fmt="iso"),
# 6. Добавление информации о вызове (файл, строка, функция)
structlog.processors.CallerInfoProcessor(),
# 7. Обработка stack info (для warning и выше)
structlog.processors.StackInfoRenderer(),
# 8. Форматирование исключений
structlog.processors.format_exc_info,
# 9. Unicode decoder для строк
structlog.processors.UnicodeDecoder(),
# 10. Финальный рендерер (JSON для production, Console для dev)
# structlog.dev.ConsoleRenderer() # Для разработки
structlog.processors.JSONRenderer() # Для production
],
# Контекст: dict для простоты или OrderedDict для порядка ключей
context_class=dict,
# Factory для создания логгеров
logger_factory=structlog.stdlib.LoggerFactory(),
# Wrapper для интеграции со stdlib
wrapper_class=structlog.stdlib.BoundLogger,
# Кэширование для production (ускоряет до 30%)
cache_logger_on_first_use=True,
)
log = structlog.get_logger()
# Примеры использования
log.info("user_logged_in", user_id=123, ip="192.168.1.1")
log.warning("rate_limit_approaching", current=90, threshold=100)
log.error("payment_failed", order_id=100500, error="insufficient_funds")
try:
risky_operation()
except Exception:
log.exception("risky_operation_failed")Production JSON вывод:
{
"event": "user_logged_in",
"user_id": 123,
"ip": "192.168.1.1",
"level": "info",
"timestamp": "2026-03-21T10:00:00.123456Z",
"logger": "__main__",
"filename": "main.py",
"lineno": 42
}💡 Performance Tip: Порядок процессоров важен! Размещайте
filter_by_levelпервым для отсечения ненужных логов до дорогостоящих операций.
Проблема: Нужно использовать structlog с существующими библиотеками, которые используют logging.
Решение: structlog.stdlib для интеграции.
import logging
import structlog
# Настройка стандартного logging
logging.basicConfig(
format="%(message)s",
level=logging.INFO,
handlers=[
logging.StreamHandler(),
logging.handlers.RotatingFileHandler(
"app.log",
maxBytes=10*1024*1024,
backupCount=5
)
]
)
# Настройка structlog с интеграцией
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Теперь structlog логи идут через стандартный logging
log = structlog.get_logger()
log.info("user_logged_in", user_id=123)
# И стандартный logging работает
logging.info("Standard logging message")🔗 Critical for Microservices: Интеграция с OpenTelemetry добавляет
trace_idиspan_idв каждый лог для сквозной трассировки.
import structlog
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.logging import LoggingInstrumentor
# 1. Настройка OpenTelemetry
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(
endpoint="http://localhost:4318/v1/traces"
))
)
# 2. Автоматическая инъекция trace_id в логи
LoggingInstrumentor().instrument(set_logging_format=True)
# 3. Процессор для добавления trace_id в structlog
def add_trace_ids(logger, method_name, event_dict):
"""Добавляет trace_id и span_id из текущего спана"""
span = trace.get_current_span()
if span and span.is_recording():
ctx = span.get_span_context()
event_dict["trace_id"] = format(ctx.trace_id, "032x")
event_dict["span_id"] = format(ctx.span_id, "016x")
return event_dict
# 4. Конфигурация structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
add_trace_ids, # ← Добавляет trace_id/span_id
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
cache_logger_on_first_use=True,
)
log = structlog.get_logger()
tracer = trace.get_tracer(__name__)
# 5. Логирование внутри спана
def process_order(order_id):
with tracer.start_as_current_span("process_order"):
log.info("Order processing started", order_id=order_id)
# ... бизнес-логика ...
log.info("Order completed", order_id=order_id)
process_order("ord_123")JSON вывод с trace_id:
{
"event": "Order processing started",
"order_id": "ord_123",
"level": "info",
"timestamp": "2026-03-21T10:00:00.123456Z",
"trace_id": "442d81cb25de382054575e33c1a659df",
"span_id": "da8405273d89b065"
}💡 Microservices Tip: В Grafana Loki используйте
trace_idдля поиска всех логов по распределённому трейсу across all services!
☁️ 12-Factor App: Логируйте в
stdout, делегируйте сбор логов платформе (Promtail, Fluentd, Filebeat).
import structlog
import logging
import sys
import os
# Для Kubernetes: JSON формат в stdout
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
log = structlog.get_logger()
# Уровень логирования из переменной окружения
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.getLogger().setLevel(log_level)
log.info("Application started",
service=os.getenv("SERVICE_NAME", "unknown"),
environment=os.getenv("ENVIRONMENT", "unknown"),
version=os.getenv("APP_VERSION", "unknown"))Kubernetes Deployment с аннотациями:
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-service
spec:
template:
metadata:
annotations:
promtail.io/scrape: "true"
promtail.io/parser: "json"
prometheus.io/scrape: "true"
spec:
containers:
- name: app
image: my-app:latest
env:
- name: LOG_LEVEL
value: "info"
- name: SERVICE_NAME
value: "my-service"
- name: ENVIRONMENT
value: "production"
- name: APP_VERSION
value: "1.2.3"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "500m"Проблема: Большой проект использует logging, нужна постепенная миграция.
Решение: recreate_defaults() для совместимости API.
import structlog
# Создаёт API совместимый с logging
structlog.stdlib.recreate_defaults()
log = structlog.get_logger()
# API совместим с logging
log.info("Message with extra", extra={"user_id": 123})
# Можно использовать bind() для контекста
log = log.bind(request_id="abc-123")
log.info("Request processed")import structlog
import asyncio
log = structlog.get_logger()
async def process_request(request_id: str):
# Контекст автоматически изолирован для async задачи
request_log = log.bind(request_id=request_id)
request_log.info("request_started")
await asyncio.sleep(0.1)
request_log.info("request_completed")
async def main():
# Параллельные задачи с изолированным контекстом
await asyncio.gather(
process_request("req-1"),
process_request("req-2"),
process_request("req-3")
)
asyncio.run(main())# logging
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("User logged in")
# loguru
from loguru import logger
logger.info("User logged in")
# structlog
import structlog
log = structlog.get_logger()
log.info("user_logged_in")# logging
import logging
logger = logging.getLogger(__name__)
adapter = logging.LoggerAdapter(logger, {"user_id": 123})
adapter.info("Request processed")
# loguru
from loguru import logger
logger.bind(user_id=123).info("Request processed")
# structlog
import structlog
log = structlog.get_logger().bind(user_id=123)
log.info("request_processed")# logging
import logging
logger = logging.getLogger(__name__)
try:
risky_operation()
except Exception:
logger.exception("Operation failed")
# loguru
from loguru import logger
@logger.catch
def safe_operation():
risky_operation()
# structlog
import structlog
log = structlog.get_logger()
try:
risky_operation()
except Exception:
log.exception("operation_failed")# logging
from pythonjsonlogger import jsonlogger
import logging
handler = logging.StreamHandler()
handler.setFormatter(jsonlogger.JsonFormatter())
logger = logging.getLogger()
logger.addHandler(handler)
logger.info("User logged in", extra={"user_id": 123})
# loguru
from loguru import logger
import json
def json_sink(message):
print(json.dumps(message.record))
logger.remove()
logger.add(json_sink)
logger.info("User logged in", user_id=123)
# structlog
import structlog
structlog.configure(processors=[structlog.processors.JSONRenderer()])
log = structlog.get_logger()
log.info("user_logged_in", user_id=123)# Before (logging)
import logging
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={"user_id": 123})
# After (loguru)
from loguru import logger
logger.info("User logged in", user_id=123)# Before (logging)
import logging
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={"user_id": 123})
# After (structlog)
import structlog
log = structlog.get_logger()
log.info("user_logged_in", user_id=123) # snake_case event name# Используем structlog с интеграцией logging
import structlog
structlog.configure(
processors=[
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
)
# Новый код использует structlog
log = structlog.get_logger()
log.info("new_feature", user_id=123)
# Старый код продолжает использовать logging
import logging
logger = logging.getLogger(__name__)
logger.info("Old code still works")# ✅ Рекомендуется: structlog для production
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.add_log_level,
structlog.processors.JSONRenderer()
]
)
log = structlog.get_logger()
log.info("user_action", user_id=123, action="purchase")# ✅ Рекомендуется: стандартный logging (нет зависимостей)
import logging
logger = logging.getLogger(__name__)
logger.info("Library initialized")# ✅ Рекомендуется: loguru (минимум кода)
from loguru import logger
logger.info("Quick prototype")request_id, user_id, trace_id, span_idX-Request-ID для трассировки запросовservice, environment, version в каждом логеisEnabledFor() перед тяжёлыми вычислениямиcache_logger_on_first_use=True для structlog| ❌ Антипаттерн | ✅ Правильное решение |
|---|---|
print() вместо logging | logging.getLogger(__name__).info() |
| Логирование в цикле | Агрегация: logger.info("Processed %d items", len(items)) |
logger.error(f"{e}") | logger.exception("operation failed") |
| Логирование секретов | Маскирование или исключение чувствительных данных |
| Запись в файлы в контейнерах | Логирование в stdout для Docker/K8s |
| Корневой логгер | logging.getLogger(__name__) |
| Хардкод уровней | Переменные окружения: LOG_LEVEL |
| Избыточное логирование | Сэмплирование для high-traffic endpoints |
import pytest
import logging
import structlog
from structlog.testing import capture_logs
def test_info_logging(caplog):
caplog.set_level(logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Test message")
assert "Test message" in caplog.text
def test_structured_logging():
with capture_logs() as cap_logs:
log = structlog.get_logger()
log.info("user_logged_in", user_id=123)
assert len(cap_logs) == 1
assert cap_logs[0]['event'] == 'user_logged_in'
assert cap_logs[0]['user_id'] == 123| Библиотека | Пропускная способность | Память | CPU Overhead | Рекомендация |
|---|---|---|---|---|
| logging | ~10,000 записей/сек | ~38-43 MB | <5% | High-load production |
| structlog | ~8,000 записей/сек | ~52 MB | ~7% | Production с structured logging |
| loguru | ~6,000 записей/сек | ~83 MB | ~10% | Прототипы, скрипты |
Бенчмарк условия: Сервер среднего уровня, RotatingFileHandler, JSON форматирование.
| Ситуация | Рекомендация |
|---|---|
| Библиотека | logging (нет зависимостей) |
| Существующий проект | logging или gradual migration to structlog |
| Новый production проект | structlog |
| Прототип/скрипт | loguru |
| Structured logging нужен | structlog или loguru |
| Максимальная совместимость | logging |
| Microservices / Cloud-Native | structlog + OpenTelemetry |
| High-Load (>1000 RPS) | logging или structlog с queue handlers |
| Compliance (GDPR, HIPAA, PCI DSS) | structlog или logging с PII filters |
| Фича | logging | structlog | loguru |
|---|---|---|---|
| Конфигурация | Сложная | Средняя | Простая |
| Structured logging | С настройкой | ✅ Из коробки | ✅ Из коробки |
| Контекст | LoggerAdapter | bind() | bind() |
| Иммутабельность | ❌ | ✅ | ✅ |
| Зависимости | 0 | 1 | 1 |
| OpenTelemetry | ✅ | ✅ Отличная | ❌ |
| Cloud-Native готовность | ⚠️ | ✅ | ⚠️ |
| Async поддержка | ❌ | ✅ | ⚠️ |
structlog.stdlib.LoggerFactory для постепенной миграцииВопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.