JSON логирование, кастомные Formatter, интеграция с ELK Stack
Структурированное логирование (JSON) — стандарт для production-приложений. Машиночитаемый формат упрощает анализ в ELK Stack.
Проблема: Текстовые логи сложно парсить и анализировать.
# Текстовый лог
2026-03-17 10:00:00,123 - app.api - INFO - User 123 logged in from 192.168.1.1
# Как извлечь user_id и IP?
# Нужен regex: r"User (\d+) logged in from ([\d.]+)"Проблемы:
Решение: Структурированный формат (JSON).
{
"timestamp": "2026-03-17T10:00:00.123456",
"level": "INFO",
"logger": "app.api",
"message": "User logged in",
"user_id": 123,
"ip": "192.168.1.1",
"request_id": "abc-xyz-123"
}Преимущества:
Проблема: Стандартный Formatter выводит текст.
Решение: Библиотека python-json-logger для JSON форматирования.
pip install python-json-loggerimport logging
from pythonjsonlogger import jsonlogger
logger = logging.getLogger(__name__)
# JSON Formatter
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
logger.info("User logged in", extra={"user_id": 123, "ip": "192.168.1.1"})Вывод:
{"asctime": "2026-03-17 10:00:00,123", "name": "app.api", "levelname": "INFO", "message": "User logged in", "user_id": 123, "ip": "192.168.1.1"}Проблема: Нужно добавить timestamp в ISO формате и исключить лишние поля.
Решение: Наследование от JsonFormatter.
from pythonjsonlogger import jsonlogger
from datetime import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super().add_fields(log_record, record, message_dict)
# ISO формат timestamp
log_record['timestamp'] = datetime.utcnow().isoformat()
# Переименовать level
log_record['level'] = log_record.get('levelname')
# Исключить лишние поля
log_record.pop('levelname', None)
log_record.pop('name', None)
# Использование
formatter = CustomJsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
)
handler.setFormatter(formatter)Вывод:
{
"timestamp": "2026-03-17T10:00:00.123456",
"level": "INFO",
"message": "User logged in",
"user_id": 123,
"ip": "192.168.1.1"
}Проблема: datetime, Decimal, UUID не сериализуются в JSON по умолчанию.
Решение: Кастомный JSON encoder.
import json
from datetime import datetime, date
from decimal import Decimal
from uuid import UUID
class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (datetime, date)):
return obj.isoformat()
if isinstance(obj, Decimal):
return float(obj)
if isinstance(obj, UUID):
return str(obj)
if isinstance(obj, set):
return list(obj)
return super().default(obj)
# Кастомный Formatter с encoder
class EncodedJsonFormatter(jsonlogger.JsonFormatter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._encoder = CustomEncoder()
def process_log_record(self, log_record):
# Сериализация сложных объектов
for key, value in log_record.items():
try:
json.dumps(value) # Проверка на сериализуемость
except (TypeError, ValueError):
log_record[key] = self._encoder.default(value)
return super().process_log_record(log_record)Проблема: Нужно добавлять контекст (user_id, request_id) ко всем логам.
Решение: LoggerAdapter + JSON Formatter.
import logging
from pythonjsonlogger import jsonlogger
# Настройка JSON logger
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Адаптер с контекстом
adapter = logging.LoggerAdapter(
logger,
extra={
"request_id": "abc-xyz",
"user_id": 123
}
)
# Контекст добавляется автоматически
adapter.info("Request started")
adapter.info("Processing data", extra={"records": 100})Вывод:
{"timestamp": "...", "level": "INFO", "message": "Request started", "request_id": "abc-xyz", "user_id": 123}
{"timestamp": "...", "level": "INFO", "message": "Processing data", "request_id": "abc-xyz", "user_id": 123, "records": 100}Проблема: Нужно централизованное хранение и анализ логов.
Решение: ELK Stack (Elasticsearch, Logstash, Kibana).
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Python App │ │ Logstash/ │ │ Elasticsearch │
│ JSON logs │────►│ Filebeat │────►│ (хранение) │
└──────────────────┘ └──────────────────┘ └────────┬─────────┘
│
▼
┌──────────────────┐
│ Kibana │
│ (визуализация) │
└──────────────────┘
import logging
import logging.handlers
from pythonjsonlogger import jsonlogger
# JSON Formatter
formatter = jsonlogger.JsonFormatter()
# SocketHandler для отправки в Logstash
socket_handler = logging.handlers.SocketHandler(
host='logstash.example.com',
port=5000
)
socket_handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(socket_handler)
logger.setLevel(logging.INFO)
logger.info("User action", extra={"user_id": 123, "action": "purchase"})# logstash.conf
input {
tcp {
port => 5000
codec => json_lines
}
}
filter {
# Парсинг timestamp
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
# GeoIP для IP адресов
geoip {
source => "ip"
target => "geoip"
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
}
}Примеры запросов в Kibana:
# Все логи пользователя
user_id: 123
# Ошибки за последний час
level: ERROR AND @timestamp > now-1h
# Среднее время ответа по endpoint'ам
avg(duration) by endpoint
# Топ пользователей по количеству запросов
count by user_idПроблема: Не хочется использовать Logstash.
Решение: Кастомный Elasticsearch handler.
pip install elasticsearchimport logging
from elasticsearch import Elasticsearch
from datetime import datetime
class ElasticsearchHandler(logging.Handler):
def __init__(self, es_hosts, index_prefix='app-logs'):
super().__init__()
self.es = Elasticsearch(es_hosts)
self.index_prefix = index_prefix
def emit(self, record):
try:
# Формирование документа
doc = {
'@timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
}
# Добавление extra полей
for key, value in record.__dict__.items():
if key not in ('name', 'msg', 'args', 'created', 'filename',
'funcName', 'levelname', 'levelno', 'lineno',
'module', 'msecs', 'pathname', 'process',
'processName', 'relativeCreated', 'thread',
'threadName', 'message', 'asctime', 'exc_info',
'exc_text', 'stack_info'):
doc[key] = value
# Индекс по дате
index = f"{self.index_prefix}-{datetime.utcnow().strftime('%Y.%m.%d')}"
# Отправка в Elasticsearch
self.es.index(index=index, document=doc)
except Exception:
self.handleError(record)
# Использование
es_handler = ElasticsearchHandler(['localhost:9200'])
logger = logging.getLogger(__name__)
logger.addHandler(es_handler)
logger.info("User action", extra={"user_id": 123, "action": "purchase"})Проблема: JSON сериализация и отправка в сеть блокирует основной поток.
Решение: QueueHandler + QueueListener.
import logging
import logging.handlers
import queue
from pythonjsonlogger import jsonlogger
# Очередь для логов
log_queue = queue.Queue(maxsize=1000)
# QueueHandler (в основном потоке)
queue_handler = logging.handlers.QueueHandler(log_queue)
# JSON Formatter
json_formatter = jsonlogger.JsonFormatter()
# File handler (в отдельном потоке)
file_handler = logging.FileHandler('app.json.log')
file_handler.setFormatter(json_formatter)
# QueueListener (отдельный поток)
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
# Настройка logger
logger = logging.getLogger(__name__)
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)
# Логи не блокируют основной поток
logger.info("User action", extra={"user_id": 123})# ✅ Хорошо: JSON для production
from pythonjsonlogger import jsonlogger
formatter = jsonlogger.JsonFormatter()
# ❌ Плохо: текст для production
formatter = logging.Formatter('%(asctime)s - %(message)s')# ✅ Хорошо
logger.info("Action completed", extra={"user_id": 123, "request_id": "abc"})
# ❌ Плохо: контекст в message
logger.info(f"Action completed for user {user_id}")# ✅ Хорошо: фильтр чувствительных данных
class SensitiveFilter(logging.Filter):
def filter(self, record):
# Исключаем поля с паролями
if hasattr(record, 'password'):
record.password = '***'
return True
logger.addFilter(SensitiveFilter())
# ❌ Плохо: логирование паролей
logger.info("Login attempt", extra={"password": password})# ✅ Хорошо: ISO формат (машиночитаемый)
timestamp = datetime.utcnow().isoformat()
# ❌ Плохо: кастомный формат
timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.