Best practices, performance, security, troubleshooting
Production-логирование требует внимания к производительности, безопасности, надёжности и удобству отладки.
Цель этого руководства: Сделать ваше логирование наблюдаемым, безопасным и производительным в любых условиях.
Проблема: Разные требования к логированию в development и production.
Решение: Настройка уровня в зависимости от окружения.
# Development: DEBUG для отладки
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.debug("SQL query: %s", query)
logger.debug("Response: %s", response)# Production: INFO для событий, ERROR для проблем
logging.basicConfig(
level=logging.INFO, # Не DEBUG!
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.info("Request processed") # ✅
logger.debug("SQL query: %s", query) # ❌ Не выводитсяimport os
import logging.config
import yaml
env = os.getenv('APP_ENV', 'production')
config_file = f'logging.{env}.yaml'
with open(config_file, 'r') as f:
config = yaml.safe_load(f)
logging.config.dictConfig(config)# logging.development.yaml
version: 1
disable_existing_loggers: false
handlers:
console:
class: logging.StreamHandler
level: DEBUG
loggers:
app:
level: DEBUG
handlers: [console]
root:
level: DEBUG
handlers: [console]# logging.production.yaml
version: 1
disable_existing_loggers: false
handlers:
console:
class: logging.StreamHandler
level: INFO
file:
class: logging.handlers.RotatingFileHandler
level: DEBUG
filename: /var/log/app/app.log
maxBytes: 104857600
backupCount: 10
loggers:
app:
level: DEBUG
handlers: [console, file]
root:
level: INFO
handlers: [console, file]Проблема: Логирование может замедлить приложение.
Решение: Оптимизация для высокой производительности.
Проблема: Форматирование строки выполняется даже если уровень отключён.
Решение: Проверка перед форматированием.
# ❌ Плохо: строка формируется всегда
logger.debug(f"Processing {len(items)} items: {items}")
# ✅ Хорошо: проверка перед форматированием
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Processing %d items: %s", len(items), items)Benchmark:
Без isEnabledFor: 10000 вызовов = 0.5s (строки формируются)
С isEnabledFor: 10000 вызовов = 0.001s (строки не формируются)
Проблема: Запись в файл/сеть блокирует основной поток.
Решение: QueueHandler + QueueListener.
import logging.handlers
import queue
log_queue = queue.Queue(maxsize=10000)
queue_handler = logging.handlers.QueueHandler(log_queue)
file_handler = logging.handlers.RotatingFileHandler('app.log')
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
logger = logging.getLogger(__name__)
logger.addHandler(queue_handler)
# Логи не блокируют основной поток# ❌ Плохо: логирование в цикле
for item in items:
logger.debug(f"Processing {item}")
process(item)
# ✅ Хорошо: агрегированное логирование
logger.debug("Processing %d items", len(items))
for item in items:
process(item)
logger.debug("Completed processing %d items", len(items))Проблема: Логи могут содержать чувствительные данные.
Решение: Фильтрация и маскирование.
# ❌ Никогда не логируйте:
logger.info(f"Password: {password}")
logger.info(f"API Key: {api_key}")
logger.info(f"Token: {token}")
logger.info(f"Credit Card: {card_number}")
logger.info(f"SSN: {ssn}")
logger.info(f"Email: {user_email}") # Персональные данныеimport logging
import re
class SensitiveDataFilter(logging.Filter):
"""Маскирует чувствительные данные в логах."""
PATTERNS = [
(re.compile(r'password["\']?\s*[:=]\s*["\']?[\w@#$%^&*]+', re.I), 'password=***'),
(re.compile(r'api[_-]?key["\']?\s*[:=]\s*["\']?[\w]+', re.I), 'api_key=***'),
(re.compile(r'token["\']?\s*[:=]\s*["\']?[\w.-]+', re.I), 'token=***'),
(re.compile(r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'), '****-****-****-****'), # CC
(re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '***-**-****'), # SSN
]
def filter(self, record):
msg = record.getMessage()
for pattern, replacement in self.PATTERNS:
msg = pattern.sub(replacement, msg)
# Обновляем сообщение в record
record.msg = msg
record.args = ()
return True
logger = logging.getLogger(__name__)
logger.addFilter(SensitiveDataFilter())
# Даже если попытаться залогировать пароль:
logger.info(f"User login with password={password}")
# Вывод: User login with password=***Проблема: Злоумышленник может вставить newline в лог.
# ❌ Уязвимость: log injection
user_input = "admin\nWARNING: Unauthorized access detected\n"
logger.info(f"User: {user_input}")
# Вывод:
# User: admin
# WARNING: Unauthorized access detected
#
# Теперь в логах фальшивое предупреждение!Решение: Удаление newline.
import logging
class SanitizeFilter(logging.Filter):
"""Удаляет newline для предотвращения log injection."""
def filter(self, record):
# Sanitize все строковые аргументы
record.msg = str(record.msg).replace('\n', '').replace('\r', '')
if record.args:
record.args = tuple(
str(arg).replace('\n', '').replace('\r', '')
if isinstance(arg, str) else arg
for arg in record.args
)
return True
logger.addFilter(SanitizeFilter())Проблема: Логи занимают всё место на диске.
Решение: RotatingFileHandler с правильными настройками.
from logging.handlers import RotatingFileHandler
handler = RotatingFileHandler(
'/var/log/app/app.log',
maxBytes=100 * 1024 * 1024, # 100 MB
backupCount=10, # 10 backup файлов
encoding='utf-8',
delay=True
)
# Максимум: 100 MB * 11 = 1.1 GBfrom logging.handlers import TimedRotatingFileHandler
# Ротация каждый день, хранение 30 дней
handler = TimedRotatingFileHandler(
'/var/log/app/app.log',
when='midnight',
interval=1,
backupCount=30,
encoding='utf-8'
)# /etc/logrotate.d/app
/var/log/app/*.log {
daily
rotate 30
compress
delaycompress
missingok
notifempty
create 0644 app app
sharedscripts
postrotate
# Сигнал приложению для reopening файлов
systemctl reload app
endscript
}# WatchedFileHandler для работы с logrotate
from logging.handlers import WatchedFileHandler
handler = WatchedFileHandler('/var/log/app/app.log')Проблема: Нужно понимать, к какому запросу относится лог.
Решение: Correlation ID и контекст.
import logging
import uuid
from contextvars import ContextVar
from fastapi import Request
correlation_id_var = ContextVar('correlation_id', default='')
class CorrelationIdFilter(logging.Filter):
def filter(self, record):
record.correlation_id = correlation_id_var.get()
return True
logger = logging.getLogger(__name__)
logger.addFilter(CorrelationIdFilter())
async def correlation_middleware(request: Request, call_next):
correlation_id = request.headers.get(
'X-Correlation-ID',
str(uuid.uuid4())
)
correlation_id_var.set(correlation_id)
logger.info("Request started", extra={
"method": request.method,
"path": request.url.path
})
response = await call_next(request)
response.headers['X-Correlation-ID'] = correlation_id
logger.info("Request completed", extra={
"status_code": response.status_code
})
return responsefrom pythonjsonlogger import jsonlogger
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s '
'%(correlation_id)s %(user_id)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)Вывод:
{
"asctime": "2026-03-17 10:00:00,123",
"name": "app.api",
"levelname": "INFO",
"message": "Request completed",
"correlation_id": "abc-123",
"user_id": 42,
"status_code": 200
}Проблема: Библиотека не должна настраивать logging.
Решение: Только getLogger, без handlers.
# ✅ Хорошо: библиотека
# mylibrary/core.py
import logging
logger = logging.getLogger(__name__)
def do_something():
logger.debug("Doing something")
# ... код
# Приложение настраивает logging
# app.py
import logging
import mylibrary
logging.basicConfig(level=logging.INFO)
mylibrary.do_something()# ❌ Плохо: библиотека настраивает logging
# mylibrary/core.py
import logging
logging.basicConfig(level=logging.DEBUG) # ❌
logger = logging.getLogger(__name__)Проблема: Нужно включить DEBUG логирование без перезапуска.
Решение: Admin endpoint + dictConfig incremental.
from flask import Flask, request, jsonify
import logging.config
app = Flask(__name__)
@app.post('/admin/logging')
def update_logging():
data = request.json
level = data.get('level', 'INFO').upper()
logging.config.dictConfig({
'version': 1,
'incremental': True,
'loggers': {
'app': {'level': level}
}
}, incremental=True)
return jsonify({'status': 'ok', 'level': level})
# Использование:
# curl -X POST http://localhost:8000/admin/logging \
# -H "Content-Type: application/json" \
# -d '{"level": "DEBUG"}'Проблема: Нужно правильно фиксировать ошибки для быстрого debugging.
Решение: Использовать exception() и exc_info.
# ❌ Плохо: теряется traceback
try:
process_data()
except Exception as e:
logger.error(f"Error: {e}") # Только сообщение, нет стека
# ✅ Хорошо: полный traceback
try:
process_data()
except Exception:
logger.exception("Failed to process data") # Автоматически добавляет traceback
# или
logger.error("Failed to process data", exc_info=True)Вывод:
ERROR:app:Failed to process data
Traceback (most recent call last):
File "app.py", line 42, in <module>
process_data()
File "app.py", line 15, in process_data
result = data['key']
KeyError: 'key'
try:
user_id = 123
process_user(user_id)
except Exception:
logger.exception(
"Failed to process user",
extra={
"user_id": user_id,
"operation": "process_user",
}
)JSON вывод:
{
"levelname": "ERROR",
"message": "Failed to process user",
"user_id": 123,
"operation": "process_user",
"exc_info": "Traceback (most recent call last):...",
"timestamp": "2026-03-24T10:00:00.123Z"
}Проблема: Ошибка логируется в catch блоке и в Sentry/d Sentry.
Решение: Разделяйте логирование и алертинг.
from sentry_sdk import capture_exception
try:
process_data()
except Exception:
# Логируем для истории
logger.exception("Data processing failed")
# Отправляем в Sentry для алертинга
capture_exception()
# Возвращаем пользователю безопасное сообщение
raise HTTPException(status_code=500, detail="Internal error")import logging
import traceback
from typing import Optional
class DetailedExceptionFilter(logging.Filter):
"""Добавляет детальную информацию об исключениях."""
def filter(self, record: logging.LogRecord) -> bool:
if record.exc_info:
exc_type, exc_value, exc_tb = record.exc_info
if exc_type:
# Добавляем тип исключения в сообщение
record.exc_type_name = exc_type.__name__
record.exc_message = str(exc_value)
# Извлекаем последнюю строку traceback
tb_lines = traceback.format_exception(*record.exc_info)
record.last_tb_line = tb_lines[-1].strip() if tb_lines else ""
return True
logger = logging.getLogger(__name__)
logger.addFilter(DetailedExceptionFilter())
# Formatter с использованием extra полей
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(exc_type_name)s: %(exc_message)s - %(last_tb_line)s'
)Проблема: Ошибка в цикле заполняет логи миллионами записей.
Решение: RateLimitFilter.
import logging
import time
from collections import defaultdict
from threading import Lock
class RateLimitFilter(logging.Filter):
"""Ограничивает частоту логирования одинаковых сообщений."""
def __init__(self, max_per_second: int = 10):
self.max_per_second = max_per_second
self._counts: dict[str, list[float]] = defaultdict(list)
self._lock = Lock()
def filter(self, record: logging.LogRecord) -> bool:
now = time.time()
msg = record.getMessage()
with self._lock:
# Очищаем старые записи
self._counts[msg] = [
t for t in self._counts[msg] if now - t < 1.0
]
# Проверяем лимит
if len(self._counts[msg]) >= self.max_per_second:
return False
self._counts[msg].append(now)
return True
logger = logging.getLogger(__name__)
logger.addFilter(RateLimitFilter(max_per_second=5))
# Даже если вызвать 1000 раз в секунду, пройдёт только 5 логов
for i in range(1000):
logger.error("Database connection failed")Проблема: Распространённые ошибки, которые ухудшают логирование.
# ❌ Плохо: непонятно что произошло
logger.error("Failed")
# ✅ Хорошо: полное описание
logger.error(
"Failed to fetch user data from external API",
extra={
"user_id": user_id,
"api_endpoint": api_url,
"request_id": request_id,
"status_code": response.status_code,
}
)# ❌ Плохо: бесконечная рекурсия или спам
class User:
def __repr__(self):
logger.debug(f"Creating repr for {self.id}") # ❌
return f"User({self.id})"
# ✅ Хорошо: никаких побочных эффектов
class User:
def __repr__(self):
return f"User({self.id})"# ❌ Плохо: логирование мегабайтных данных
logger.debug(f"Response: {response.content}") # Может быть 100MB+
# ✅ Хорошо: логирование метаданных
logger.debug(
"Response received",
extra={
"content_length": len(response.content),
"content_type": response.headers.get("Content-Type"),
"status_code": response.status_code,
}
)# ❌ Плохо: логирование в критической секции
def process_transaction(tx):
with transaction_lock:
logger.debug("Processing transaction %s", tx.id) # Замедляет критическую секцию
# ... обработка
# ✅ Хорошо: логирование вне критической секции
def process_transaction(tx):
logger.debug("Processing transaction %s", tx.id)
with transaction_lock:
# ... обработка# ❌ Плохо: print нельзя контролировать
print(f"Debug: {value}")
# ✅ Хорошо: logging с уровнями
logger.debug("Value: %s", value)# ❌ Плохо: невозможно отключить логирование отдельного модуля
logger = logging.getLogger("app")
# ✅ Хорошо: модульный logger
logger = logging.getLogger(__name__) # app.module.submodule# ❌ Плохо: спам при retry
for i in range(10):
logger.error("Connection failed, retrying...")
connect()
# ✅ Хорошо: агрегирование или backoff
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(5), wait=wait_exponential())
def connect():
...
# Или логирование только первой и последней попытки
for i in range(max_retries):
try:
connect()
break
except Exception:
if i == 0:
logger.warning("Connection failed, will retry")
elif i == max_retries - 1:
logger.exception("All retries exhausted")Проблема: Логи бесполезны, если их не анализировать.
Решение: Интеграция с ELK, CloudWatch, Datadog, Sentry.
from pythonjsonlogger import jsonlogger
import logging
import sys
class CustomJsonFormatter(jsonlogger.JsonFormatter):
"""Кастомный formatter для ELK."""
def add_fields(self, log_record, record, message_dict):
super().add_fields(log_record, record, message_dict)
# Переименовываем поля для совместимости с ELK
log_record['@timestamp'] = log_record.get('asctime')
log_record['@version'] = '1'
log_record['logger_name'] = log_record.get('name')
# Удаляем дубли
log_record.pop('asctime', None)
log_record.pop('name', None)
handler = logging.StreamHandler(sys.stdout)
formatter = CustomJsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s %(filename)s %(lineno)d'
)
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)Вывод для ELK:
{
"@timestamp": "2026-03-24T10:00:00.123",
"@version": "1",
"logger_name": "app.api.users",
"levelname": "INFO",
"message": "User created",
"filename": "users.py",
"lineno": 42,
"user_id": 123,
"level": "INFO"
}import boto3
from watchtower import CloudWatchLogHandler
handler = CloudWatchLogHandler(
log_group='my-app',
log_stream='production',
boto3_client=boto3.client('logs', region_name='us-east-1'),
use_queues=True, # Асинхронная отправка
max_batch_size=100,
max_batch_count=10,
)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)from datadog_handler.handlers import DatadogHandler
handler = DatadogHandler(
api_key='YOUR_API_KEY',
service='my-app',
source='python',
tags=['environment:production', 'version:1.2.3']
)
logger = logging.getLogger(__name__)
logger.addHandler(handler)import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration
# Отправляем в Sentry только WARNING и выше
sentry_logging = LoggingIntegration(
level=logging.INFO, # Отправлять события начиная с INFO
event_level=logging.ERROR # Создавать event начиная с ERROR
)
sentry_sdk.init(
dsn="https://your-dsn@sentry.io/123",
integrations=[sentry_logging],
traces_sample_rate=1.0,
)
# Теперь logger.error() автоматически создаст событие в Sentry
logger.error("Something went wrong")from prometheus_client import Counter, Histogram
import logging
# Метрики
log_counter = Counter('app_log_total', 'Total log messages', ['level', 'module'])
error_counter = Counter('app_errors_total', 'Total errors', ['error_type'])
class MetricsFilter(logging.Filter):
"""Собирает метрики из логов."""
def filter(self, record: logging.LogRecord) -> bool:
# Считаем логи по уровням
log_counter.labels(level=record.levelname, module=record.name).inc()
# Считаем ошибки по типам
if record.exc_info:
error_type = record.exc_info[0].__name__
error_counter.labels(error_type=error_type).inc()
return True
logger = logging.getLogger(__name__)
logger.addFilter(MetricsFilter())Проблема: В Kubernetes/cloud environment логи должны следовать 12-factor principles.
Решение: Логирование в stdout/stderr + централизованный сбор.
import logging
import sys
# ✅ Хорошо: только stdout/stderr
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
'%(asctime)s %(levelname)s [%(name)s] %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S%z'
)
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)Kubernetes manifest:
apiVersion: v1
kind: Pod
metadata:
name: my-app
spec:
containers:
- name: app
image: my-app:latest
# Логи собираются через stdout
resources:
limits:
ephemeral-storage: 1Gi # Ограничение на дисковое пространство# fluent-bit.conf
[INPUT]
Name tail
Path /var/log/containers/*.log
Parser docker
Tag kube.*
Refresh_Interval 5
[FILTER]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
[OUTPUT]
Name es
Match *
Host elasticsearch.logging.svc
Port 9200
Logstash_Format True
Logstash_Prefix app-logsfrom google.cloud import logging as gc_logging
client = gc_logging.Client(project='my-project')
logger = client.logger('my-app')
# Прямая отправка в Cloud Logging
logger.log_text("Application started", severity="INFO")
# Или через Python logging
handler = client.get_handler()
handler.setLevel(logging.INFO)
import logging
logging.getLogger().addHandler(handler)from opencensus.ext.azure.log_exporter import AzureLogHandler
handler = AzureLogHandler(
connection_string='InstrumentationKey=your-key'
)
handler.setLevel(logging.INFO)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Логи автоматически появляются в Application Insights
logger.info('Request processed', extra={
'custom_dimensions': {
'user_id': '123',
'request_id': 'abc-123'
}
})Проблема: Нужно убедиться, что логирование работает корректно.
Решение: Использовать test handlers и assertions.
import logging
import logging.handlers
import unittest
class TestLogging(unittest.TestCase):
def setUp(self):
# MemoryHandler для захвата логов
self.memory_handler = logging.handlers.MemoryHandler(
capacity=100,
flushLevel=logging.DEBUG
)
self.memory_handler.setFormatter(
logging.Formatter('%(levelname)s - %(message)s')
)
self.logger = logging.getLogger('test')
self.logger.addHandler(self.memory_handler)
self.logger.setLevel(logging.DEBUG)
def tearDown(self):
self.logger.removeHandler(self.memory_handler)
self.memory_handler.close()
def test_info_log(self):
self.logger.info("Test message")
# Проверяем наличие сообщения
logs = self.memory_handler.buffer
self.assertTrue(
any('Test message' in record.getMessage() for record in logs)
)
self.assertTrue(
any(record.levelname == 'INFO' for record in logs)
)import logging
import pytest
def test_logging_with_caplog(caplog):
"""pytest автоматически захватывает логи."""
logger = logging.getLogger('app')
# Устанавливаем уровень для захвата
caplog.set_level(logging.INFO)
logger.info("Test message")
logger.debug("Debug message") # Не захватится
# Проверяем наличие сообщения
assert "Test message" in caplog.text
assert "Debug message" not in caplog.text
# Проверяем записи
assert len(caplog.records) == 1
assert caplog.records[0].levelname == 'INFO'import json
import logging
import pytest
from pythonjsonlogger import jsonlogger
@pytest.fixture
def json_logger(caplog):
"""Fixture для тестирования JSON логов."""
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger = logging.getLogger('test_json')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Перенаправляем stdout для захвата
import io
from contextlib import redirect_stdout
f = io.StringIO()
with redirect_stdout(f):
yield logger, f
handler.close()
def test_json_log_format(json_logger):
logger, output = json_logger
logger.info("User created", extra={"user_id": 123})
logs = output.getvalue().strip()
log_entry = json.loads(logs)
assert log_entry['message'] == "User created"
assert log_entry['user_id'] == 123
assert 'asctime' in log_entryimport logging
import pytest
def test_sensitive_data_filter(caplog):
from app.logging.filters import SensitiveDataFilter
caplog.set_level(logging.INFO)
caplog.handler.setFormatter(logging.Formatter('%(message)s'))
# Добавляем фильтр
logging.getLogger().addFilter(SensitiveDataFilter())
password = "super_secret_123"
logging.getLogger().info(f"User login with password={password}")
# Проверяем, что пароль замаскирован
assert "password=***" in caplog.text
assert "super_secret_123" not in caplog.textimport logging
import logging.handlers
import queue
import pytest
class LogCapture:
"""Helper для захвата логов в тестах."""
def __init__(self):
self.records = []
self.handler = logging.Handler()
self.handler.emit = self.capture
def capture(self, record):
self.records.append(record)
def __enter__(self):
logging.getLogger().addHandler(self.handler)
return self
def __exit__(self, *args):
logging.getLogger().removeHandler(self.handler)
def test_async_logging():
"""Тест асинхронного логирования."""
log_queue = queue.Queue()
queue_handler = logging.handlers.QueueHandler(log_queue)
# Захватываем из очереди
captured = []
listener = logging.handlers.QueueListener(
log_queue,
logging.Handler() # Кастомный handler
)
# ... настройка listener
logger = logging.getLogger('app')
logger.addHandler(queue_handler)
logger.info("Async message")
# Проверяем, что сообщение в очереди
assert not log_queue.empty()
record = log_queue.get()
assert record.getMessage() == "Async message"Проблема: logger.info() ничего не выводит.
Диагностика:
# Проверка handlers
logger = logging.getLogger(__name__)
print(f"Handlers: {logger.handlers}")
print(f"Level: {logger.level}")
print(f"Effective level: {logger.getEffectiveLevel()}")
# Проверка root logger
root = logging.getLogger()
print(f"Root handlers: {root.handlers}")
print(f"Root level: {root.level}")Решение:
# Добавить handler
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)Проблема: Каждое сообщение выводится дважды.
Причина: propagation + handler в root и logger.
Решение:
# Отключить propagation
logger = logging.getLogger(__name__)
logger.propagate = False
# Или убрать handler из root
logging.getLogger().handlers.clear()Проблема: extra поля не выводятся.
Решение: Добавить поля в formatter.
# Formatter должен включать extra поля
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s - user_id=%(user_id)s'
)
# Или использовать JsonFormatter
from pythonjsonlogger import jsonlogger
formatter = jsonlogger.JsonFormatter()Проблема: Файл остаётся пустым.
Причины и решения:
# 1. Проверить права на запись
import os
os.access('/var/log/app', os.W_OK) # Должно вернуть True
# 2. Проверить путь (использовать абсолютный)
handler = logging.FileHandler('/var/log/app/app.log')
# 3. Проверить flush
handler.flush() # Принудительная запись
# 4. Использовать delay=False для немедленного создания файла
handler = logging.FileHandler('app.log', delay=False)Проблема: Вместо русских букв ``.
Решение: Указать encoding='utf-8'.
# ❌ Плохо
handler = logging.FileHandler('app.log')
# ✅ Хорошо
handler = logging.FileHandler('app.log', encoding='utf-8')
# Для StreamHandler
import sys
handler = logging.StreamHandler(sys.stdout)
handler.stream.reconfigure(encoding='utf-8') # Python 3.7+Проблема: Queue full, логи теряются.
Решение: Обработать переполнение.
import logging.handlers
import queue
log_queue = queue.Queue(maxsize=10000)
class ErrorHandler(logging.Handler):
"""Handler для потерянных логов."""
def emit(self, record):
print(f"LOST LOG: {record.getMessage()}")
queue_handler = logging.handlers.QueueHandler(log_queue)
queue_listener = logging.handlers.QueueListener(
log_queue,
logging.FileHandler('app.log'),
respect_handler_level=True
)
# При переполнении QueueHandler отправит в резервный handler
queue_handler.handleError = lambda record: ErrorHandler().emit(record)
queue_listener.start()Проблема: Fluentd/Docker не видит логи.
Решение: Логировать в stdout, проверить Docker driver.
# Убедиться, что логи идут в stdout
handler = logging.StreamHandler(sys.stdout)
# Docker compose должен использовать json-file или journald driver
# docker-compose.yml
version: '3'
services:
app:
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"Проблема: Логирование замедляет приложение.
Диагностика:
import cProfile
import logging
# Профилирование
cProfile.run('logger.debug("Test %s", data)', 'profile.stats')
# Проверка времени записи
import time
start = time.time()
for i in range(1000):
logger.info("Test")
print(f"Time: {time.time() - start}s")Решение:
python-json-logger — JSON форматированиеstructlog — Structured loggingwatchtower — CloudWatch handlersentry-sdk — Error trackingtenacity — Retry с логированиемНастройте логирование для FastAPI приложения:
Реализуйте SensitiveDataFilter который маскирует:
xe***@example.com)+7XXX1234567)Реализуйте асинхронное логирование с:
Напишите тесты для:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.