Масштабирование через модель процессов и горизонтальное расширение
Масштабируйтесь через модель процессов
Concurrency — восьмой фактор 12-Factor App. Принцип гласит:
Масштабируйте приложение через запуск дополнительных процессов, а не через потоки внутри одного процесса.
┌─────────────────────────────────────────────────────────┐
│ Load Balancer │
└─────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Process 1 │ │ Process 2 │ │ Process 3 │
│ (web) │ │ (web) │ │ (web) │
│ Port 5000 │ │ Port 5001 │ │ Port 5002 │
└────────────┘ └────────────┘ └────────────┘
# ❌ Масштабирование через потоки внутри одного процесса
from threading import Thread
from flask import Flask
app = Flask(__name__)
def handle_request():
# Обработка запроса в потоке
pass
# Запуск 100 потоков в одном процессе
for i in range(100):
thread = Thread(target=handle_request)
thread.start()Проблемы потоков в Python:
# ✅ Масштабирование через процессы
# app.py — одинаковый код для всех процессов
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello():
return 'Hello from process!'# Запуск нескольких процессов
gunicorn --bind 0.0.0.0:5000 --workers 4 app:app
# Каждый воркер — отдельный процессПреимущества процессов:
# web.py
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/api/users', methods=['GET'])
def get_users():
return jsonify({'users': []})
@app.route('/health')
def health():
return jsonify({'status': 'healthy'})# Запуск веб-процессов
gunicorn --bind 0.0.0.0:5000 --workers 4 web:appХарактеристики:
# worker.py
from celery import Celery
from redis import Redis
app = Celery('worker', broker='redis://localhost:6379/0')
@app.task
def send_email(user_id, message):
# Длительная операция
# Отправка email, генерация отчёта, обработка видео
pass
@app.task
def process_video(video_id):
# Тяжёлая вычислительная задача
pass# Запуск воркеров
celery -A worker worker --loglevel=info --concurrency=4Характеристики:
# scheduler.py
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
scheduler = BlockingScheduler()
@scheduler.scheduled_job('cron', hour=0, minute=0)
def daily_report():
"""Ежедневный отчёт в полночь"""
print(f"Generating report at {datetime.now()}")
@scheduler.scheduled_job('interval', minutes=5)
def health_check():
"""Проверка здоровья каждые 5 минут"""
print("Health check passed")
if __name__ == '__main__':
scheduler.start()# Запуск планировщика
python scheduler.pyХарактеристики:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Веб- │────▶│ Очередь │────▶│ Воркер │
│ процессы │ │ (Redis) │ │ процессы │
│ │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
# tasks.py
from celery import Celery
app = Celery(
'tasks',
broker='redis://redis:6379/0',
backend='redis://redis:6379/1'
)
@app.task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
"""Отправка приветственного email"""
try:
# Логика отправки
print(f"Sending email to user {user_id}")
except Exception as exc:
# Повторная попытка при ошибке
raise self.retry(exc=exc, countdown=60)
@app.task
def generate_report(report_id, start_date, end_date):
"""Генерация отчёта"""
# Длительная операция
pass# web.py
from flask import Flask, request
from tasks import send_welcome_email, generate_report
app = Flask(__name__)
@app.route('/users', methods=['POST'])
def create_user():
user_id = create_user_in_db()
# Асинхронная отправка email
send_welcome_email.delay(user_id)
return {'user_id': user_id}, 201
@app.route('/reports', methods=['POST'])
def create_report():
report_id = create_report_in_db()
# Асинхронная генерация
generate_report.delay(report_id, '2024-01-01', '2024-01-31')
return {'report_id': report_id, 'status': 'processing'}, 202# Запуск компонентов
# Веб-процессы
gunicorn --bind 0.0.0.0:5000 --workers 4 web:app
# Воркеры
celery -A tasks worker --loglevel=info --concurrency=8
# Планировщик (опционально)
celery -A tasks beat --loglevel=infoВертикальное масштабирование (Scale Up):
┌──────────────────────┐
│ Один процесс │
│ ████████████████ │ 100% CPU
│ ████████████████ │
│ ████████████████ │
└──────────────────────┘
Больше RAM/CPU
Горизонтальное масштабирование (Scale Out):
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│Process │ │Process │ │Process │ │Process │
│ ██ │ │ ██ │ │ ██ │ │ ██ │ 25% CPU каждый
└────────┘ └────────┘ └────────┘ └────────┘
Больше экземпляров
# Формула Gunicorn
workers = (2 × cpu_cores) + 1
# Пример для 4 ядер
# workers = (2 × 4) + 1 = 9 воркеров# gunicorn_config.py
import multiprocessing
# Автоматический расчёт количества воркеров
workers = multiprocessing.cpu_count() * 2 + 1
# Для I/O-bound приложений (веб-запросы)
# Можно больше воркеров
# Для CPU-bound приложений (вычисления)
# workers = cpu_cores (или меньше)# Sync воркеры (по умолчанию)
# Подходит для CPU-bound задач
gunicorn --worker-class sync app:app
# Async воркеры (gevent, eventlet)
# Подходит для I/O-bound задач с долгими соединениями
gunicorn --worker-class gevent app:app
# Tornado воркеры
# Для WebSocket и long-polling
gunicorn --worker-class tornado app:appversion: '3.8'
services:
# Веб-процессы
web:
build: .
command: gunicorn --bind 0.0.0.0:5000 --workers 4 web:app
ports:
- "5000:5000"
depends_on:
- redis
- db
deploy:
replicas: 3 # 3 экземпляра веб-процессов
# Воркеры
worker:
build: .
command: celery -A tasks worker --loglevel=info --concurrency=4
depends_on:
- redis
- db
deploy:
replicas: 2 # 2 экземпляра воркеров
# Планировщик
scheduler:
build: .
command: celery -A tasks beat --loglevel=info
depends_on:
- redis
deploy:
replicas: 1 # Только один планировщик
redis:
image: redis:7-alpine
db:
image: postgres:15# Запуск всех сервисов
docker-compose up -d
# Масштабирование веб-процессов
docker-compose up -d --scale web=5
# Масштабирование воркеров
docker-compose up -d --scale worker=4
# Просмотр запущенных контейнеров
docker-compose ps# web-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: web
spec:
replicas: 4 # 4 веб-процесса
selector:
matchLabels:
app: web
type: frontend
template:
metadata:
labels:
app: web
type: frontend
spec:
containers:
- name: web
image: myapp:latest
command: ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "2", "web:app"]
ports:
- containerPort: 5000
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"# worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: worker
spec:
replicas: 3 # 3 воркера
selector:
matchLabels:
app: worker
type: backend
template:
metadata:
labels:
app: worker
type: backend
spec:
containers:
- name: worker
image: myapp:latest
command: ["celery", "-A", "tasks", "worker", "--loglevel=info", "--concurrency=4"]
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "1000m"
memory: "1Gi"# scheduler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: scheduler
spec:
replicas: 1 # Только один планировщик
selector:
matchLabels:
app: scheduler
template:
metadata:
labels:
app: scheduler
spec:
containers:
- name: scheduler
image: myapp:latest
command: ["celery", "-A", "tasks", "beat", "--loglevel=info"]# web-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: web-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: web
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80# Применение HPA
kubectl apply -f web-hpa.yaml
# Просмотр статуса
kubectl get hpa web-hpa
# Ручное масштабирование
kubectl scale deployment web --replicas=5# async_app.py
import asyncio
import aiohttp
from aiohttp import web
async def fetch_data(session, url):
"""Асинхронный HTTP-запрос"""
async with session.get(url) as response:
return await response.json()
async def handle_request(request):
"""Обработка запроса с асинхронными операциями"""
async with aiohttp.ClientSession() as session:
# Параллельное выполнение нескольких запросов
tasks = [
fetch_data(session, 'https://api.example.com/users'),
fetch_data(session, 'https://api.example.com/posts'),
fetch_data(session, 'https://api.example.com/comments'),
]
results = await asyncio.gather(*tasks)
return web.json_response({
'users': results[0],
'posts': results[1],
'comments': results[2]
})
app = web.Application()
app.router.add_get('/', handle_request)
if __name__ == '__main__':
web.run_app(app, host='0.0.0.0', port=5000)# fastapi_app.py
from fastapi import FastAPI
import asyncio
import aiofiles
app = FastAPI()
@app.get('/users/{user_id}')
async def get_user(user_id: int):
# Асинхронный запрос к БД
await asyncio.sleep(0.1) # Имитация запроса
return {'user_id': user_id}
@app.post('/upload')
async def upload_file(file: bytes):
# Асинхронная запись файла
async with aiofiles.open(f'/tmp/{file.filename}', 'wb') as f:
await f.write(file)
return {'status': 'uploaded'}# Запуск с Uvicorn
uvicorn fastapi_app:app --host 0.0.0.0 --port 5000 --workers 4# ❌ Неправильно: один процесс для веб, воркеров и планировщика
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
app = Flask(__name__)
scheduler = BackgroundScheduler()
@app.route('/')
def web_handler():
# Веб-запросы
pass
@scheduler.scheduled_job('interval', minutes=5)
def scheduled_task():
# Периодические задачи
pass
def background_worker():
# Фоновая обработка
pass
# Всё в одном процессе — сложно масштабироватьПроблема: Невозможно масштабировать веб-процессы и воркеры независимо.
# ❌ Неправильно: блокирующая операция в веб-процессе
@app.route('/generate-report')
def generate_report():
# Длительная операция блокирует воркер
time.sleep(30) # Блокировка на 30 секунд
return {'report': '...'}Решение:
# ✅ Правильно: асинхронная обработка через очередь
@app.route('/generate-report')
def generate_report():
task = generate_report_task.delay()
return {'task_id': task.id, 'status': 'processing'}, 202# ❌ Неправильно: попытка разделить состояние между процессами
cache = {} # Глобальная переменная
@app.route('/cache/{key}')
def get_cache(key):
return cache.get(key) # Каждый процесс имеет свой cache!Решение:
# ✅ Правильно: внешнее хранилище состояния
from redis import Redis
cache = Redis()
@app.route('/cache/{key}')
def get_cache(key):
return cache.get(key) # Общее хранилище для всех процессовЗадайте себе вопросы:
# monolith.py
from threading import Thread
from flask import Flask
import time
app = Flask(__name__)
# Глобальное состояние — проблема для масштабирования
active_tasks = []
@app.route('/process')
def process():
# Блокирующая операция
time.sleep(10)
return 'Done'
def background_task():
# Фоновая задача в потоке
while True:
time.sleep(60)
# Делает что-то
# Запуск фона в потоке
Thread(target=background_task, daemon=True).start()Проблемы:
# web.py
from flask import Flask, request
from tasks import process_data
app = Flask(__name__)
@app.route('/process')
def process():
task = process_data.delay()
return {'task_id': task.id, 'status': 'queued'}, 202# tasks.py
from celery import Celery
import time
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def process_data():
# Длительная операция в воркере
time.sleep(10)
return 'Done'# scheduler.py
from apscheduler.schedulers.blocking import BlockingScheduler
from tasks import cleanup
scheduler = BlockingScheduler()
@scheduler.scheduled_job('cron', hour=0)
def daily_cleanup():
cleanup.delay()
if __name__ == '__main__':
scheduler.start()# Запуск
gunicorn --bind 0.0.0.0:5000 --workers 4 web:app
celery -A tasks worker --loglevel=info --concurrency=8
python scheduler.pyРезультат:
Ключевой вывод: Масштабируйте приложение через запуск дополнительных процессов, а не через потоки. Разделяйте веб-процессы, воркеры и планировщики для независимого масштабирования каждого типа.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.