Логи как потоки событий, агрегация и маршрутизация
Рассматривайте логи как потоки событий
Logs — одиннадцатый фактор 12-Factor App. Принцип гласит:
Логи — это потоки событий, упорядоченных по времени. Приложение не должно управлять хранением логов, а должно писать в stdout/stderr.**
┌─────────────────────────────────────────────────────────────────┐
│ Поток логов (stream) │
│ │
│ Приложение ──▶ stdout/stderr ──▶ Агрегатор ──▶ Хранилище │
│ │
│ Приложение только пишет логи │
│ Агрегатор собирает, маршрутизирует, хранит │
└─────────────────────────────────────────────────────────────────┘
# ❌ Неправильно: приложение управляет файлами логов
import logging
from logging.handlers import RotatingFileHandler
logger = logging.getLogger(__name__)
# Настройка ротации файлов
handler = RotatingFileHandler(
'/var/log/myapp/app.log',
maxBytes=10*1024*1024, # 10 MB
backupCount=5
)
logger.addHandler(handler)
@app.route('/api/users')
def get_users():
logger.info(f"User request from {request.remote_addr}")
# ...Проблемы:
# ✅ Правильно: логи в stdout/stderr
import logging
import sys
# Настройка логирования в stdout
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@app.route('/api/users')
def get_users():
logger.info(f"User request from {request.remote_addr}")
# ...┌──────────────────────────────────────────────────────────────┐
│ Docker контейнер │
│ ┌────────────┐ │
│ │ Приложение │ │
│ │ │ │
│ │ stdout ────┼────────────────────────▶ Docker daemon │
│ │ stderr ────┼────────────────────────▶ Docker daemon │
│ └────────────┘ │
└──────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────┐
│ Log Aggregator │
│ (ELK, Splunk, etc.) │
└───────────────────────┘
Преимущества:
import logging
logger = logging.getLogger(__name__)
# DEBUG — детальная информация для отладки
logger.debug("Database query: SELECT * FROM users WHERE id = %s", user_id)
# INFO — подтверждение нормальной работы
logger.info("User %s logged in successfully", user_id)
# WARNING — неожиданное событие, но приложение работает
logger.warning("Rate limit approaching for user %s: %d/1000", user_id, count)
# ERROR — ошибка, но приложение продолжает работу
logger.error("Failed to send email to %s: %s", email, str(exc))
# CRITICAL — серьёзная ошибка, приложение не может работать
logger.critical("Database connection lost: %s", str(exc))# Development
LOG_LEVEL = DEBUG # Все логи
# Staging
LOG_LEVEL = INFO # Информация о работе
# Production
LOG_LEVEL = WARNING # Только предупреждения и ошибкиimport logging
import json
import sys
from datetime import datetime
class JSONFormatter(logging.Formatter):
"""Форматирование логов в JSON"""
def format(self, record):
log_entry = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
}
# Добавление контекста
if hasattr(record, 'user_id'):
log_entry['user_id'] = record.user_id
if hasattr(record, 'request_id'):
log_entry['request_id'] = record.request_id
# Добавление исключения
if record.exc_info:
log_entry['exception'] = {
'type': record.exc_info[0].__name__ if record.exc_info[0] else None,
'message': str(record.exc_info[1]) if record.exc_info[1] else None,
'traceback': self.formatException(record.exc_info)
}
return json.dumps(log_entry)
# Настройка
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Использование
logger.info(
"User login successful",
extra={'user_id': 123, 'request_id': 'req-abc-123'}
)Вывод:
{
"timestamp": "2024-03-24T10:30:45.123Z",
"level": "INFO",
"logger": "app.auth",
"message": "User login successful",
"module": "auth",
"function": "login",
"line": 45,
"user_id": 123,
"request_id": "req-abc-123"
}from contextvars import ContextVar
import logging
# Контекстные переменные для запроса
request_id_ctx = ContextVar('request_id', default=None)
user_id_ctx = ContextVar('user_id', default=None)
class ContextFilter(logging.Filter):
"""Добавление контекста к каждому логу"""
def filter(self, record):
record.request_id = request_id_ctx.get()
record.user_id = user_id_ctx.get()
return True
# Настройка
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.addFilter(ContextFilter())
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# Middleware для добавления контекста
from flask import request, g
@app.before_request
def set_request_context():
request_id_ctx.set(request.headers.get('X-Request-ID', str(uuid.uuid4())))
if hasattr(g, 'user_id'):
user_id_ctx.set(g.user_id)
# Теперь каждый лог автоматически содержит request_id и user_id
logger.info("Processing request") # Автоматически добавит контекст# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Логи пишутся в stdout/stderr по умолчанию
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "5000:5000"
environment:
- LOG_LEVEL=INFO
- LOG_FORMAT=json
# Docker по умолчанию собирает stdout/stderr
# и отправляет в logging driver
logging:
driver: json-file
options:
max-size: "10m"
max-file: "3"# json-file (по умолчанию)
logging:
driver: json-file
options:
max-size: "10m"
max-file: "3"
# syslog
logging:
driver: syslog
options:
syslog-address: "udp://logs.example.com:514"
# journald
logging:
driver: journald
# splunk
logging:
driver: splunk
options:
splunk-token: "xxx-xxx-xxx"
splunk-url: "https://splunk.example.com:8088"
# fluentd
logging:
driver: fluentd
options:
fluentd-address: "localhost:24224"┌─────────────────────────────────────────────────────────────────┐
│ ELK Stack Architecture │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ App 1 │ │ App 2 │ │ App 3 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────┼───────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Filebeat/ │ │
│ │ Fluentd │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Logstash │ (парсинг, обогащение) │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Elasticsearch│ (индексирование, хранение) │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Kibana │ (визуализация, поиск) │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
ports:
- "9200:9200"
logstash:
image: docker.elastic.co/logstash/logstash:8.11.0
volumes:
- ./logstash/pipeline:/usr/share/logstash/pipeline
ports:
- "5044:5044"
kibana:
image: docker.elastic.co/kibana/kibana:8.11.0
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
filebeat:
image: docker.elastic.co/beats/filebeat:8.11.0
volumes:
- ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
depends_on:
- logstash
volumes:
elasticsearch_data:# filebeat.yml
filebeat.inputs:
- type: container
paths:
- /var/lib/docker/containers/*/*.log
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/lib/docker/containers/"
output.logstash:
hosts: ["logstash:5044"]# logstash/pipeline/logstash.conf
input {
beats {
port => 5044
}
}
filter {
# Парсинг JSON логов
json {
source => "message"
}
# Добавление временной метки
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
# Фильтрация по уровням
if [level] == "ERROR" {
mutate {
add_tag => ["error"]
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
# Отправка ошибок в Slack
if "error" in [tags] {
slack {
url => "https://hooks.slack.com/services/xxx/yyy/zzz"
message => "Error: %{message}"
}
}
}import logging
import sys
# Логирование приложения
app_logger = logging.getLogger('app')
app_handler = logging.StreamHandler(sys.stdout)
app_handler.setFormatter(JSONFormatter())
app_logger.addHandler(app_handler)
app_logger.setLevel(logging.INFO)
# Логирование аудита (отдельный поток)
audit_logger = logging.getLogger('audit')
audit_handler = logging.StreamHandler(sys.stdout)
audit_handler.setFormatter(JSONFormatter())
audit_logger.addHandler(audit_handler)
audit_logger.setLevel(logging.INFO)
# Разные логи для разных целей
app_logger.info("User logged in") # Обычный лог
audit_logger.info("User login", extra={
'user_id': 123,
'event_type': 'login',
'ip_address': request.remote_addr
}) # Аудит# Logstash для маршрутизации
output {
# Все логи в Elasticsearch
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
}
# Аудит логи в отдельный индекс
if [logger] == "audit" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "audit-logs-%{+YYYY.MM.dd}"
}
}
# Ошибки в Slack
if [level] == "ERROR" {
slack {
url => "https://hooks.slack.com/services/xxx"
message => "Error in %{logger}: %{message}"
}
}
# Критические ошибки в PagerDuty
if [level] == "CRITICAL" {
pagerduty {
service_key => "xxx"
description => "%{message}"
}
}
}# ❌ Неправильно: логи в файлы
logging.basicConfig(
filename='/var/log/app.log', # Файл в контейнере
level=logging.INFO
)Проблема: При удалении контейнера логи теряются. В Kubernetes с ephemeral storage логи недоступны после перезапуска пода.
# ❌ Неправильно: логирование секретов
logger.info(f"User login: {username}, password: {password}")
logger.info(f"API request with key: {api_key}")
logger.info(f"Credit card: {card_number}")Решение:
# ✅ Правильно: маскирование секретов
def mask_sensitive(value, visible_chars=4):
if len(value) <= visible_chars:
return '*' * len(value)
return '*' * (len(value) - visible_chars) + value[-visible_chars:]
logger.info(f"User login: {username}, password: {'*' * len(password)}")
logger.info(f"API key: {mask_sensitive(api_key)}")# ❌ Неправильно: слишком много логов
for item in items:
logger.debug(f"Processing item {item.id}: {item.name}")
logger.debug(f"Item price: {item.price}")
logger.debug(f"Item stock: {item.stock}")Проблема: Миллионы строк логов в секунду, сложно найти важное, высокие затраты на хранение.
# ❌ Неправильно: неструктурированные логи
logger.info(f"User {user_id} from {ip} bought {count} items for ${total}")Решение:
# ✅ Правильно: структурированные логи
logger.info("Purchase completed", extra={
'user_id': user_id,
'ip_address': ip,
'item_count': count,
'total_amount': total,
'event_type': 'purchase'
})Задайте себе вопросы:
# ❌ Логирование в файлы
import logging
logging.basicConfig(
filename='/var/log/myapp/app.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)Проблемы:
# ✅ Логирование в stdout с JSON форматом
import logging
import sys
import json
class JSONFormatter(logging.Formatter):
def format(self, record):
log = {
'timestamp': self.formatTime(record),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
}
if record.exc_info:
log['exception'] = self.formatException(record.exc_info)
return json.dumps(log)
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("Application started")# docker-compose.yml с агрегацией
services:
web:
build: .
logging:
driver: json-file
options:
max-size: "10m"
max-file: "3"
elasticsearch:
image: elasticsearch:8.11.0
logstash:
image: logstash:8.11.0
volumes:
- ./pipeline:/usr/share/logstash/pipeline
kibana:
image: kibana:8.11.0Результат:
Ключевой вывод: Логи — это потоки событий, упорядоченных по времени. Приложение должно писать в stdout/stderr, а агрегатор собирает, маршрутизирует и хранит логи. Это обеспечивает сохранность, централизацию и возможность анализа логов в распределённых системах.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.