QueueHandler, QueueListener, multiprocessing логирование
Асинхронное логирование переносит запись логов в отдельный поток, не блокируя основное приложение.
Проблема: Запись логов блокирует основной поток.
import logging
logger = logging.getLogger(__name__)
# ❌ Блокировка на I/O
logger.info("Processing request") # Запись в файл/сеть блокирует поток
# ↓
# Пока лог записывается в файл, приложение ждётПроблемы:
Решение: Асинхронное логирование через очередь.
┌─────────────────────────────────────────────────────────────┐
│ Асинхронное логирование │
└─────────────────────────────────────────────────────────────┘
Основной поток Поток логирования
│ │
│ logger.info() │
│ │ │
│ ▼ │
│ QueueHandler │
│ │ │
│ ▼ │
│ Queue.put() ───────────────► Queue.get()
│ (быстро, не блокирует) │ │
│ │ ▼
│ Продолжает работу │ Real Handler
│ │ (запись в файл)
│ │ (медленно, но в
│ │ отдельном потоке)
import logging
import logging.handlers
import queue
# Очередь для логов
log_queue = queue.Queue(maxsize=1000)
# QueueHandler в основном потоке
queue_handler = logging.handlers.QueueHandler(log_queue)
# Реальный handler в отдельном потоке
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
# QueueListener в отдельном потоке
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
# Настройка logger
logger = logging.getLogger(__name__)
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)
# Логи не блокируют основной поток
logger.info("Processing request") # Быстро отправляет в очередь
# Корректное завершение
listener.stop()# 1. logger.info() создаёт LogRecord
# 2. QueueHandler.emit() отправляет LogRecord в очередь
# queue.put_nowait(record) # Не блокирует
# 3. Основной поток продолжает работу
# 4. QueueListener в отдельном потоке:
# - while True: record = queue.get()
# - for handler in handlers: handler.emit(record)
# 5. Реальный handler записывает в файл/сетьПроблема: Нужна надёжная конфигурация для production.
Решение: Фабричная функция с правильными настройками.
import logging
import logging.handlers
import queue
from typing import List
def setup_async_logging(
queue_size: int = 10000,
log_file: str = 'app.log',
max_bytes: int = 100 * 1024 * 1024,
backup_count: int = 10,
level: int = logging.INFO
) -> logging.handlers.QueueListener:
"""Настройка асинхронного логирования для production."""
# Очередь
log_queue = queue.Queue(maxsize=queue_size)
# QueueHandler
queue_handler = logging.handlers.QueueHandler(log_queue)
# File handler с ротацией
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=max_bytes,
backupCount=backup_count,
encoding='utf-8',
delay=True
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s - '
'%(filename)s:%(lineno)d - %(process)d'
))
file_handler.setLevel(level)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(levelname)s: %(message)s'
))
console_handler.setLevel(logging.INFO)
# QueueListener с несколькими handlers
listener = logging.handlers.QueueListener(
log_queue,
file_handler,
console_handler,
respect_handler_level=True # Учитывать уровни handlers
)
listener.start()
# Настройка root logger
root_logger = logging.getLogger()
root_logger.addHandler(queue_handler)
root_logger.setLevel(logging.DEBUG)
return listener
# Использование
listener = setup_async_logging()
logger = logging.getLogger(__name__)
logger.info("Application started")
# Корректное завершение
# listener.stop()Проблема: Нужно логировать из нескольких процессов.
Решение: QueueHandler + QueueListener в главном процессе.
import logging
import logging.handlers
import multiprocessing
from multiprocessing import Queue
def setup_multiprocessing_logging():
"""Настройка логирования для multiprocessing."""
# Очередь (multiprocessing-safe)
log_queue = Queue(-1) # Без ограничения
# QueueHandler для дочерних процессов
queue_handler = logging.handlers.QueueHandler(log_queue)
# Listener в главном процессе
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s - '
'process=%(process)d'
))
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
return queue_handler, listener
def worker_process(queue_handler):
"""Worker процесс с логированием."""
logger = logging.getLogger(__name__)
# Удаление старых handlers
logger.handlers.clear()
# Добавление QueueHandler
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)
logger.info(f"Worker {multiprocessing.current_process().name} started")
# Обработка
for i in range(10):
logger.debug(f"Processing item {i}")
logger.info(f"Worker {multiprocessing.current_process().name} completed")
if __name__ == '__main__':
queue_handler, listener = setup_multiprocessing_logging()
# Запуск процессов
processes = []
for i in range(4):
p = multiprocessing.Process(
target=worker_process,
args=(queue_handler,),
name=f'Worker-{i}'
)
p.start()
processes.append(p)
# Ожидание завершения
for p in processes:
p.join()
# Остановка listener
listener.stop()┌─────────────────────────────────────────────────────────────┐
│ Multiprocessing логирование │
└─────────────────────────────────────────────────────────────┘
Process 1 (Worker-0) Process 2 (Worker-1) Main Process
│ │ │
│ logger.info() │ │
│ │ │ │
│ ▼ │ │
│ QueueHandler │ │
│ │ │ │
│ └──────────────────┼──────────────────────┤
│ │ │
│ │ logger.info() │
│ │ │ │
│ │ ▼ │
│ │ QueueHandler │
│ │ │ │
│ │ │ │
└─────────────────────────┼──────┤ │
│ │ │
▼ ▼ │
multiprocessing.Queue │
│ │
│ │
▼ │
QueueListener ◄─────────┘
│
▼
File Handler
(запись в файл)
Проблема: Нужно логировать в asyncio приложении без блокировки event loop.
Решение: QueueHandler + интеграция в event loop.
import asyncio
import logging
import logging.handlers
import queue
# Настройка
log_queue = queue.Queue(maxsize=1000)
queue_handler = logging.handlers.QueueHandler(log_queue)
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
logger = logging.getLogger(__name__)
logger.addHandler(queue_handler)
async def process_request(request_id):
"""Обработка запроса с логированием."""
logger.info(f"Request {request_id} started")
# Имитация работы
await asyncio.sleep(0.1)
logger.info(f"Request {request_id} completed")
async def main():
# Запуск множества задач
tasks = [
process_request(f"req-{i}")
for i in range(100)
]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
# Остановка listener
listener.stop()Проблема: QueueListener в отдельном потоке может создавать проблемы с asyncio.
Решение: Использовать watch() для интеграции в event loop.
import asyncio
import logging
import logging.handlers
import queue
log_queue = queue.Queue()
queue_handler = logging.handlers.QueueHandler(log_queue)
file_handler = logging.FileHandler('app.log')
listener = logging.handlers.QueueListener(log_queue, file_handler)
logger = logging.getLogger(__name__)
logger.addHandler(queue_handler)
async def process_logs():
"""Periodic processing of log queue."""
while True:
try:
# Обработка одного сообщения
listener.watch()
except queue.Empty:
pass
await asyncio.sleep(0.1) # Проверка каждые 100ms
async def main():
# Запуск processing задачи
log_task = asyncio.create_task(process_logs())
# Основное приложение
logger.info("Application started")
await asyncio.sleep(1)
logger.info("Application stopped")
# Остановка
log_task.cancel()
try:
await log_task
except asyncio.CancelledError:
pass
asyncio.run(main())Проблема: При аварийном завершении логи могут потеряться.
Решение: Корректный shutdown с обработкой ошибок.
import logging
import logging.handlers
import queue
import signal
import sys
class AsyncLoggingManager:
"""Менеджер асинхронного логирования с graceful shutdown."""
def __init__(self):
self.log_queue = queue.Queue(maxsize=10000)
self.queue_handler = logging.handlers.QueueHandler(self.log_queue)
self.file_handler = logging.handlers.RotatingFileHandler(
'app.log',
maxBytes=100*1024*1024,
backupCount=10
)
self.file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.listener = logging.handlers.QueueListener(
self.log_queue,
self.file_handler,
respect_handler_level=True
)
self._shutdown = False
def start(self):
self.listener.start()
logger = logging.getLogger()
logger.addHandler(self.queue_handler)
logger.setLevel(logging.DEBUG)
# Регистрация signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
logger = logging.getLogger(__name__)
logger.info(f"Received signal {signum}, shutting down...")
self._shutdown = True
self.stop()
sys.exit(0)
def stop(self):
"""Корректная остановка логирования."""
if self.listener:
# Остановка listener
self.listener.stop()
# Ожидание обработки всех сообщений
try:
while not self.log_queue.empty():
pass
except Exception:
pass
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
# Использование
with AsyncLoggingManager():
logger = logging.getLogger(__name__)
while True:
logger.info("Processing...")
# ... работа приложенияПроблема: Если QueueListener не успевает записывать логи (медленный I/O), а приложение генерирует их быстро — очередь переполняется. При этом возникает queue.Full, и логи теряются.
Решение: Перехватывать исключение и вести счётчик потерь.
import logging
import logging.handlers
import queue
import sys
import threading
class SafeQueueHandler(logging.handlers.QueueHandler):
"""QueueHandler с обработкой переполнения."""
def __init__(self, queue):
super().__init__(queue)
self.dropped_count = 0
self._lock = threading.Lock()
def enqueue(self, record):
try:
super().enqueue(record)
except queue.Full:
# Очередь полна — лог теряется
with self._lock:
self.dropped_count += 1
# Предупреждение каждые 100 потерянных логов
if self.dropped_count % 100 == 0:
# Пишем в stderr, чтобы не создавать новую очередь
sys.stderr.write(f"Warning: Dropped {self.dropped_count} log records\n")
def get_dropped_count(self):
"""Получить счётчик для мониторинга."""
with self._lock:
return self.dropped_count
# Использование
log_queue = queue.Queue(maxsize=1000)
handler = SafeQueueHandler(log_queue)dropped_count?Это счётчик потерянных логов. Каждая запись — один лог, который не удалось поместить в очередь.
dropped_count?| Значение | Что означает | Действие |
|---|---|---|
0 | Всё в порядке | Ничего не нужно |
| Растёт медленно | Пиковые нагрузки | Увеличьте maxsize очереди |
| Растёт быстро | QueueListener не успевает | Оптимизируйте handler или уменьшите уровень логирования |
| Растёт постоянно | Система не справляется | Масштабируйте логирование (отдельный сервис, буфер) |
# 1. Увеличьте размер очереди
log_queue = queue.Queue(maxsize=10000) # Было 1000
# 2. Уменьшите уровень логирования
logging.getLogger().setLevel(logging.WARNING) # Вместо DEBUG
# 3. Используйте буферизованный FileHandler
handler = logging.handlers.MemoryHandler(
capacity=1000,
flushLevel=logging.ERROR,
target=file_handler
)
# 4. Мониторинг в production
if handler.get_dropped_count() > 0:
# Отправить метрику в Prometheus/StatsD
metrics.increment('logs.dropped', handler.get_dropped_count())import logging
import logging.handlers
import time
import queue
# Синхронное логирование
sync_logger = logging.getLogger('sync')
sync_handler = logging.FileHandler('sync.log')
sync_logger.addHandler(sync_handler)
sync_logger.setLevel(logging.INFO)
# Асинхронное логирование
async_queue = queue.Queue(maxsize=10000)
async_handler = logging.handlers.QueueHandler(async_queue)
async_logger = logging.getLogger('async')
async_logger.addHandler(async_handler)
async_logger.setLevel(logging.INFO)
async_file_handler = logging.FileHandler('async.log')
async_listener = logging.handlers.QueueListener(async_queue, async_file_handler)
async_listener.start()
# Benchmark
N = 10000
# Синхронное
start = time.time()
for i in range(N):
sync_logger.info(f"Message {i}")
sync_time = time.time() - start
# Асинхронное
start = time.time()
for i in range(N):
async_logger.info(f"Message {i}")
async_time = time.time() - start
print(f"Синхронное: {sync_time:.3f}s ({N/sync_time:.0f} msg/s)")
print(f"Асинхронное: {async_time:.3f}s ({N/async_time:.0f} msg/s)")
print(f"Ускорение: {sync_time/async_time:.1f}x")
# Остановка
async_listener.stop()Результат (типичный):
Синхронное: 2.500s (4000 msg/s)
Асинхронное: 0.100s (100000 msg/s)
Ускорение: 25x
# ✅ Хорошо: достаточно большая очередь
queue.Queue(maxsize=10000)
# ❌ Плохо: слишком маленькая
queue.Queue(maxsize=10) # Быстро переполнится
# ❌ Плохо: слишком большая (много памяти)
queue.Queue(maxsize=1000000)# ✅ Хорошо
listener.start()
try:
# работа приложения
finally:
listener.stop()
# ❌ Плохо: listener не остановлен
listener.start()
# работа приложения
# listener остаётся висеть# ✅ Хорошо: учитывает уровни handlers
listener = logging.handlers.QueueListener(
queue, handler1, handler2,
respect_handler_level=True
)
# ❌ Плохо: все handlers получают все сообщения
listener = logging.handlers.QueueListener(queue, handler1, handler2)import threading
# ✅ Хорошо: обработка Full exception + счётчик + thread-safe
class SafeQueueHandler(logging.handlers.QueueHandler):
def __init__(self, queue):
super().__init__(queue)
self.dropped_count = 0
self._lock = threading.Lock()
def enqueue(self, record):
try:
super().enqueue(record)
except queue.Full:
with self._lock:
self.dropped_count += 1
# ❌ Плохо: нет обработки, логи теряются silently
queue_handler = logging.handlers.QueueHandler(log_queue)Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.