Быстрый запуск и graceful shutdown для отказоустойчивости
Обеспечьте быстрый запуск и graceful shutdown
Disposability — девятый фактор 12-Factor App. Принцип гласит:
Процессы 12-Factor приложения должны запускаться быстро и завершаться корректно (graceful shutdown) для обеспечения отказоустойчивости и эластичного масштабирования.
┌──────────────────────────────────────────────────────────────┐
│ Быстрый запуск │
│ [Инициализация] ──────▶ [Готов к работе] │
│ 2 секунды < 5 секунд │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ Graceful Shutdown │
│ [SIGTERM] ──▶ [Завершение запросов] ──▶ [Закрытие соединений] ──▶ [Exit] │
│ (до 30 секунд) (очистка ресурсов) │
└──────────────────────────────────────────────────────────────┘
# ❌ Неправильно: медленная инициализация
import time
def initialize():
# Загрузка больших данных в память
load_massive_cache() # 30 секунд
# Установка множества соединений
for i in range(100):
create_database_connection() # 60 секунд
# Компиляция шаблонов
compile_all_templates() # 20 секунд
initialize() # 110 секунд до готовности!Проблемы:
# ✅ Правильно: быстрая инициализация и корректное завершение
import signal
import sys
from flask import Flask
app = Flask(__name__)
# Глобальное состояние для graceful shutdown
shutdown_requested = False
def initialize():
"""Быстрая инициализация — только необходимое"""
# Ленивая загрузка кэша — по требованию
# Подключения к БД — пул, создаётся по требованию
print("Initialized in < 2 seconds")
def graceful_shutdown(signum, frame):
"""Обработка SIGTERM для корректного завершения"""
global shutdown_requested
shutdown_requested = True
print("Shutdown requested, finishing current requests...")
# Завершение текущих запросов
# Закрытие соединений с БД
# Очистка ресурсов
sys.exit(0)
# Регистрация обработчиков сигналов
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)
initialize()# ❌ Неправильно: всё загружается при старте
import redis
import psycopg2
# Создание соединений при импорте
redis_client = redis.Redis(host='redis', port=6379)
db_conn = psycopg2.connect('postgresql://user:pass@db/app')
# Загрузка кэша при старте
cache = {}
for key in redis_client.keys('*'):
cache[key] = redis_client.get(key) # Долго!# ✅ Правильно: ленивая инициализация
import redis
import psycopg2
from functools import lru_cache
class Database:
def __init__(self):
self._conn = None
@property
def conn(self):
"""Ленивое создание соединения"""
if self._conn is None:
self._conn = psycopg2.connect(
'postgresql://user:pass@db/app'
)
return self._conn
def close(self):
"""Закрытие соединения при shutdown"""
if self._conn:
self._conn.close()
db = Database() # Быстро — соединение ещё не создано
# Использование — соединение создаётся при первом запросе
# db.conn.execute(...)# ❌ Неправильно: импорт тяжёлых библиотек при старте
import numpy # 100+ МБ, долгий импорт
import pandas # Ещё дольше
import tensorflow # Очень долго
@app.route('/predict')
def predict():
# Библиотеки нужны только здесь
pass# ✅ Правильно: отложенный импорт
@app.route('/predict')
def predict():
# Импорт внутри функции — только при первом вызове
import numpy as np
import pandas as pd
# Логика предсказания
pass
# Или ещё лучше — отдельный воркер для ML# ❌ Неправильно: предзагрузка всего при старте
def preload_all_data():
"""Загружает всё в память — медленно"""
users = db.query("SELECT * FROM users") # Миллионы записей
products = db.query("SELECT * FROM products")
orders = db.query("SELECT * FROM orders")
# 60+ секунд до готовности# ✅ Правильно: ленивое кэширование с TTL
from functools import lru_cache
import time
@lru_cache(maxsize=1000)
def get_user(user_id):
"""Кэширование по требованию"""
return db.query("SELECT * FROM users WHERE id = ?", user_id)
@lru_cache(maxsize=100)
def get_product(product_id):
"""Кэширование по требованию"""
return db.query("SELECT * FROM products WHERE id = ?", product_id)
# Старт за < 2 секунды, кэш заполняется по мере запросовimport signal
import sys
import time
from flask import Flask, g
from threading import Lock
app = Flask(__name__)
# Счётчик активных запросов
active_requests = 0
requests_lock = Lock()
@app.before_request
def count_request_start():
global active_requests
with requests_lock:
active_requests += 1
@app.after_request
def count_request_end(response):
global active_requests
with requests_lock:
active_requests -= 1
return response
def graceful_shutdown(signum, frame):
"""Корректное завершение работы"""
print(f"Received signal {signum}, initiating shutdown...")
# Перестать принимать новые запросы
# (веб-сервер делает это автоматически)
# Подождать завершения текущих запросов
timeout = 30 # секунд
start_time = time.time()
while active_requests > 0:
if time.time() - start_time > timeout:
print(f"Timeout: {active_requests} requests still active")
break
time.sleep(0.1)
# Закрыть соединения с БД
close_database_connections()
# Очистить ресурсы
cleanup_resources()
print("Shutdown complete")
sys.exit(0)
# Регистрация обработчиков
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)# gunicorn_config.py
import signal
import sys
def pre_fork(server, worker):
"""Вызывается перед форком воркера"""
print(f"Worker {worker.pid} starting...")
def post_fork(server, worker):
"""Вызывается после форка воркера"""
print(f"Worker {worker.pid} started")
def worker_int(worker):
"""Получен SIGINT"""
print(f"Worker {worker.pid} received SIGINT")
def worker_term(worker):
"""Получен SIGTERM"""
print(f"Worker {worker.pid} received SIGTERM")
def when_ready(server):
"""Сервер готов"""
print(f"Server ready on {server.address}")
def on_exit(server):
"""Сервер завершается"""
print("Server shutting down")
# Очистка ресурсов# Запуск с конфигом
gunicorn --config gunicorn_config.py app:app
# Graceful перезапуск
kill -HUP $(cat gunicorn.pid)
# Graceful shutdown
kill -TERM $(cat gunicorn.pid)
# Принудительное завершение (избегать!)
kill -KILL $(cat gunicorn.pid)# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: myapp
spec:
replicas: 3
template:
spec:
# Время на graceful shutdown
terminationGracePeriodSeconds: 30
containers:
- name: web
image: myapp:latest
ports:
- containerPort: 5000
# Health checks
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 5
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 5000
initialDelaySeconds: 5
periodSeconds: 5# Обработка SIGTERM в Kubernetes
import signal
import sys
import time
from flask import Flask, jsonify
app = Flask(__name__)
shutting_down = False
def handle_sigterm(signum, frame):
global shutting_down
shutting_down = True
print("SIGTERM received, starting graceful shutdown")
# Завершение текущих запросов
time.sleep(2) # Дать время на завершение
# Очистка ресурсов
sys.exit(0)
signal.signal(signal.SIGTERM, handle_sigterm)
@app.route('/ready')
def ready():
"""Проверка готовности"""
if shutting_down:
# Возвращаем 503, чтобы Kubernetes перестал слать трафик
return jsonify({'status': 'shutting_down'}), 503
return jsonify({'status': 'ready'}), 200┌─────────────────────────────────────────────────────────────────┐
│ Жизненный цикл процесса │
│ │
│ [Start] ──▶ [Init] ──▶ [Ready] ──▶ [Serving] ──▶ [Terminating] │
│ │ │ │ │ │
│ < 2 сек < 5 сек минуты/часы < 30 сек │
│ │
│ Init: Быстрая инициализация (подключения, кэш) │
│ Ready: Health check проходит, трафик идёт │
│ Serving: Обработка запросов │
│ Terminating: Graceful shutdown (SIGTERM → завершение → exit) │
└─────────────────────────────────────────────────────────────────┘
# ❌ Неправильно: создание соединения на каждый запрос
@app.route('/users')
def get_users():
conn = psycopg2.connect('postgresql://user:pass@db/app')
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# Соединение не закрывается!# ✅ Правильно: пул соединений
from psycopg2 import pool
# Создание пула при старте (быстро)
db_pool = pool.SimpleConnectionPool(
minconn=5,
maxconn=20,
dsn='postgresql://user:pass@db/app'
)
@app.route('/users')
def get_users():
# Взять соединение из пула
conn = db_pool.getconn()
try:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
return cursor.fetchall()
finally:
# Вернуть соединение в пул
db_pool.putconn(conn)
def cleanup():
"""Закрытие пула при shutdown"""
db_pool.closeall()# ❌ Неправильно: нет повторных попыток
def call_external_api():
response = requests.get('https://api.example.com/data')
response.raise_for_status()
return response.json()# ✅ Правильно: retry с exponential backoff
import time
import random
from requests.exceptions import RequestException
def call_external_api(max_retries=5):
"""Повторные попытки с экспоненциальной задержкой"""
for attempt in range(max_retries):
try:
response = requests.get('https://api.example.com/data', timeout=5)
response.raise_for_status()
return response.json()
except RequestException as e:
if attempt == max_retries - 1:
raise # Последняя попытка failed
# Exponential backoff с jitter
delay = (2 ** attempt) + random.uniform(0, 1)
print(f"Retry {attempt + 1} after {delay:.2f}s")
time.sleep(delay)from circuitbreaker import circuit
@circuit(failure_threshold=5, recovery_timeout=30)
def call_unreliable_service():
"""Circuit breaker для ненадёжного сервиса"""
response = requests.get('https://unreliable.example.com/api')
response.raise_for_status()
return response.json()
# При 5 неудачных попытках circuit "размыкается"
# Следующие 30 секунд вызовы сразу возвращают ошибку
# Через 30 секунд circuit "полуразомкнут" — одна попытка# ❌ Загрузка всего при старте
def init():
# Загрузка 1 ГБ данных в память
load_all_data_to_memory() # 60 секунд
# Предварительное вычисление всего
precompute_everything() # 120 секунд
# Создание 1000 соединений
for i in range(1000):
create_connection() # 60 секундПроблема: Процесс не готов к работе 4+ минуты.
# ❌ Неправильно: нет обработки сигналов
import time
while True:
process_batch()
time.sleep(1)
# При SIGTERM процесс убивается немедленно
# Текущая операция прерывается, данные могут повредитьсяРешение:
# ✅ Правильно: обработка SIGTERM
import signal
import sys
should_stop = False
def handle_sigterm(signum, frame):
global should_stop
should_stop = True
signal.signal(signal.SIGTERM, handle_sigterm)
while not should_stop:
process_batch()
time.sleep(1)
# Корректная очистка
cleanup()# ❌ Неправильно: слишком долгий shutdown
def handle_sigterm(signum, frame):
# Обработка миллионов записей
process_all_pending_tasks() # 10 минут
# Отправка тысяч email
send_all_pending_emails() # 20 минут
sys.exit(0)
# Kubernetes убьёт процесс через 30 секунд (terminationGracePeriodSeconds)Решение:
# ✅ Правильно: быстрое завершение
def handle_sigterm(signum, frame):
# Сохранить состояние во внешнее хранилище
save_state_to_redis() # < 5 секунд
# Закрыть соединения
close_connections() # < 2 секунды
# Остальное доделают другие экземпляры
sys.exit(0)Задайте себе вопросы:
# app.py
import time
import psycopg2
# Медленная инициализация
print("Loading data...")
data = load_all_data() # 60 секунд
print("Creating connections...")
connections = [psycopg2.connect(...) for _ in range(50)] # 60 секунд
@app.route('/api/data')
def get_data():
return data
# Нет обработки SIGTERM
# При перезапуске данные теряются, соединения не закрываютсяПроблемы:
# app.py
import signal
import sys
import time
from flask import Flask
from psycopg2 import pool
app = Flask(__name__)
# Быстрая инициализация
db_pool = None
shutdown_in_progress = False
def init():
global db_pool
# Быстрое создание пула (соединения ленивые)
db_pool = pool.SimpleConnectionPool(5, 20, dsn='...')
print("Ready in < 2 seconds")
def graceful_shutdown(signum, frame):
global shutdown_in_progress
shutdown_in_progress = True
print("Graceful shutdown started")
# Подождать текущие запросы (до 25 секунд)
for _ in range(25):
if get_active_requests() == 0:
break
time.sleep(1)
# Закрыть пул соединений
if db_pool:
db_pool.closeall()
print("Shutdown complete")
sys.exit(0)
signal.signal(signal.SIGTERM, graceful_shutdown)
@app.route('/ready')
def ready():
if shutdown_in_progress:
return {'status': 'shutting_down'}, 503
return {'status': 'ready'}, 200
init()Результат:
Ключевой вывод: Процессы должны запускаться быстро (< 5 секунд) и завершаться корректно (graceful shutdown). Это обеспечивает отказоустойчивость, быстрое масштабирование и безопасное развёртывание в облачных средах.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.