Horizontal vs vertical scaling, load balancing алгоритмы, auto-scaling, rate limiting.
Когда ваш сервис начинает расти, правильные стратегии масштабирования и балансировки становятся критическими для поддержания производительности и доступности.
Масштабирование — это способность системы справляться с увеличением нагрузки. Балансировка нагрузки распределяет входящий трафик между несколькими серверами, обеспечивая отказоустойчивость и оптимальное использование ресурсов.
Ключевые вопросы, которые мы разберём:
Вертикальное масштабирование — увеличение ресурсов одного сервера (CPU, RAM, диск).
┌─────────────────┐
│ Один сервер │
│ CPU: 4 → 16 │
│ RAM: 8 → 64GB │
└─────────────────┘
Преимущества:
Недостатки:
Когда использовать:
Горизонтальное масштабирование — добавление новых серверов в кластер.
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Server 1│ │ Server 2│ │ Server 3│
└─────────┘ └─────────┘ └─────────┘
▲ ▲ ▲
└─────────────┼─────────────┘
│
┌───────────────┐
│ Load Balancer │
└───────────────┘
Преимущества:
Недостатки:
Когда использовать:
| Критерий | Вертикальное | Горизонтальное |
|---|---|---|
| Стоимость | Растёт экспоненциально | Линейная |
| Сложность | Низкая | Высокая |
| Доступность | Единая точка отказа | Отказоустойчивость |
| Downtime | Требуется | Не требуется |
| Максимум | Ограничено железом | Практически неограниченно |
Запросы распределяются по серверам циклически, по очереди.
Запрос 1 → Server A
Запрос 2 → Server B
Запрос 3 → Server C
Запрос 4 → Server A (цикл начинается заново)
Nginx конфигурация:
upstream backend {
server 192.168.1.10:8080;
server 192.168.1.11:8080;
server 192.168.1.12:8080;
}
server {
listen 80;
location / {
proxy_pass http://backend;
}
}Когда использовать:
Преимущества:
Недостатки:
Запрос направляется на сервер с наименьшим количеством активных соединений.
Server A: 15 соединений
Server B: 3 соединения ← новый запрос пойдёт сюда
Server C: 8 соединений
Nginx конфигурация:
upstream backend {
least_conn;
server 192.168.1.10:8080;
server 192.168.1.11:8080;
server 192.168.1.12:8080;
}HAProxy конфигурация:
backend app_servers
balance leastconn
server app1 192.168.1.10:8080 check
server app2 192.168.1.11:8080 check
server app3 192.168.1.12:8080 checkКогда использовать:
Клиент всегда направляется на один и тот же сервер на основе хэша его IP.
IP 192.168.1.100 → hash() → Server B
IP 192.168.1.101 → hash() → Server A
Nginx конфигурация:
upstream backend {
ip_hash;
server 192.168.1.10:8080;
server 192.168.1.11:8080;
server 192.168.1.12:8080;
}Когда использовать:
Преимущества:
Недостатки:
Серверы получают нагрузку пропорционально их весу.
Nginx конфигурация:
upstream backend {
server 192.168.1.10:8080 weight=5; # 50% нагрузки
server 192.168.1.11:8080 weight=3; # 30% нагрузки
server 192.168.1.12:8080 weight=2; # 20% нагрузки
}HAProxy конфигурация:
backend app_servers
server app1 192.168.1.10:8080 weight 5 check
server app2 192.168.1.11:8080 weight 3 check
server app3 192.168.1.12:8080 weight 2 checkКогда использовать:
| Алгоритм | Лучше всего подходит для | Не подходит для |
|---|---|---|
| Round Robin | Одинаковые серверы, stateless | Долгие запросы, разные серверы |
| Least Connections | WebSocket, API с разной длительностью | Короткие запросы, одинаковая нагрузка |
| IP Hash | Sticky sessions, серверный кэш | Равномерная нагрузка, много клиентов |
| Weighted | Разные серверы, canary deploy | Одинаковые серверы |
Работает на транспортном уровне OSI (TCP/UDP). Принимает решения на основе IP и портов.
┌─────────────────────────────────────┐
│ Layer 4 LB │
│ Решения на основе: │
│ - IP адреса источника/назначения │
│ - TCP/UDP портов │
│ Не видит содержимое запроса │
└─────────────────────────────────────┘
Характеристики:
Примеры:
Nginx Layer 4 конфигурация:
stream {
upstream database {
least_conn;
server 192.168.1.10:5432;
server 192.168.1.11:5432;
server 192.168.1.12:5432;
}
server {
listen 5432;
proxy_pass database;
proxy_connect_timeout 1s;
}
}Работает на прикладном уровне OSI (HTTP/HTTPS). Может анализировать содержимое запроса.
┌─────────────────────────────────────┐
│ Layer 7 LB │
│ Решения на основе: │
│ - URL пути (/api, /static) │
│ - HTTP заголовков │
│ - Cookie │
│ - Метода запроса (GET, POST) │
└─────────────────────────────────────┘
Характеристики:
Примеры:
Nginx Layer 7 конфигурация:
http {
upstream api_servers {
least_conn;
server 192.168.1.10:8080;
server 192.168.1.11:8080;
}
upstream static_servers {
server 192.168.1.20:80;
server 192.168.1.21:80;
}
server {
listen 80;
server_name example.com;
# Маршрутизация по пути
location /api/ {
proxy_pass http://api_servers;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
location /static/ {
proxy_pass http://static_servers;
}
# Маршрутизация по заголовку
location / {
if ($http_x_api_version = "v2") {
proxy_pass http://api_v2;
}
proxy_pass http://api_servers;
}
}
}| Критерий | Layer 4 | Layer 7 |
|---|---|---|
| Производительность | Выше (быстрее) | Ниже (глубокая инспекция) |
| Маршрутизация | По IP/порту | По URL, заголовкам, cookie |
| SSL termination | Нет | Да |
| Модификация запросов | Нет | Да |
| Протоколы | TCP/UDP | HTTP/HTTPS, gRPC, WebSocket |
# /etc/nginx/nginx.conf
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 4096;
use epoll;
multi_accept on;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
# Логирование
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'rt=$request_time uct="$upstream_connect_time" '
'uht="$upstream_header_time" urt="$upstream_response_time"';
access_log /var/log/nginx/access.log main;
# Оптимизации
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
types_hash_max_size 2048;
# Балансировка API
upstream api_backend {
least_conn;
server 10.0.1.10:8000 weight=3 max_fails=3 fail_timeout=30s;
server 10.0.1.11:8000 weight=3 max_fails=3 fail_timeout=30s;
server 10.0.1.12:8000 weight=2 max_fails=3 fail_timeout=30s backup;
keepalive 32;
}
# Балансировка WebSocket
upstream websocket_backend {
ip_hash;
server 10.0.2.10:8001;
server 10.0.2.11:8001;
}
# Rate limiting zone
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
limit_conn_zone $binary_remote_addr zone=conn_limit:10m;
server {
listen 80;
server_name api.example.com;
# Redirect to HTTPS
return 301 https://$server_name$request_uri;
}
}# /etc/nginx/conf.d/https.conf
server {
listen 443 ssl http2;
server_name api.example.com;
# SSL конфигурация
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
ssl_prefer_server_ciphers off;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 1d;
# Rate limiting
limit_req zone=api_limit burst=20 nodelay;
limit_conn conn_limit 10;
location /api/ {
proxy_pass http://api_backend;
# Заголовки
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Таймауты
proxy_connect_timeout 5s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
# Буферы
proxy_buffering on;
proxy_buffer_size 4k;
proxy_buffers 8 4k;
}
# WebSocket endpoint
location /ws/ {
proxy_pass http://websocket_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_read_timeout 86400s;
}
# Health check endpoint
location /health {
access_log off;
return 200 "healthy\n";
add_header Content-Type text/plain;
}
}# /etc/haproxy/haproxy.cfg
global
log /dev/log local0
log /dev/log local1 notice
chroot /var/lib/haproxy
stats socket /run/haproxy/admin.sock mode 660 level admin
stats timeout 30s
user haproxy
group haproxy
daemon
# Оптимизации
maxconn 4096
tune.ssl.default-dh-param 2048
defaults
log global
mode http
option httplog
option dontlognull
option forwardfor
option http-server-close
timeout connect 5s
timeout client 50s
timeout server 50s
timeout http-request 10s
timeout queue 30s
errorfile 400 /etc/haproxy/errors/400.http
errorfile 403 /etc/haproxy/errors/403.http
errorfile 408 /etc/haproxy/errors/408.http
errorfile 500 /etc/haproxy/errors/500.http
errorfile 502 /etc/haproxy/errors/502.http
errorfile 503 /etc/haproxy/errors/503.http
errorfile 504 /etc/haproxy/errors/504.http
# Stats dashboard
listen stats
bind *:8404
stats enable
stats uri /stats
stats refresh 10s
stats admin if LOCALHOST
# Frontend для HTTP
frontend http_front
bind *:80
http-request redirect scheme https unless { ssl_fc }
# Frontend для HTTPS
frontend https_front
bind *:443 ssl crt /etc/haproxy/certs/example.com.pem
http-request set-header X-Forwarded-Proto https
# Rate limiting с stick-table
stick-table type ip size 100k expire 30s store http_req_rate(10s)
http-request track-sc0 src
http-request deny deny_status 429 if { sc_http_req_rate(0) gt 100 }
# ACL для маршрутизации
acl is_api path_beg /api
acl is_websocket hdr(Upgrade) -i WebSocket
use_backend api_backend if is_api
use_backend ws_backend if is_websocket
default_backend web_backend
# API Backend
backend api_backend
balance leastconn
option httpchk GET /health
http-check expect status 200
server api1 10.0.1.10:8000 check weight 3 maxconn 100
server api2 10.0.1.11:8000 check weight 3 maxconn 100
server api3 10.0.1.12:8000 check weight 2 maxconn 100 backup
# WebSocket Backend
backend ws_backend
balance source
option httpchk GET /health
timeout tunnel 1h
server ws1 10.0.2.10:8001 check
server ws2 10.0.2.11:8001 check
# Static content Backend
backend web_backend
balance roundrobin
option httpchk GET /health
server web1 10.0.3.10:80 check
server web2 10.0.3.11:80 checkHPA автоматически масштабирует количество подов на основе метрик (CPU, память, кастомные метрики).
# hpa-basic.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: api-hpa
namespace: production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: api-deployment
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
- type: Pods
value: 4
periodSeconds: 15
selectPolicy: MaxМасштабирование по метрикам приложения (RPS, latency, queue length).
# hpa-custom-metrics.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: api-hpa-custom
namespace: production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: api-deployment
minReplicas: 3
maxReplicas: 50
metrics:
# CPU
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
# Requests per second (через Prometheus Adapter)
- type: Pods
pods:
metric:
name: http_requests_per_second
target:
type: AverageValue
averageValue: 100
# P95 latency
- type: Pods
pods:
metric:
name: http_request_duration_seconds
selector:
matchLabels:
quantile: "0.95"
target:
type: AverageValue
averageValue: "0.5"
behavior:
scaleDown:
stabilizationWindowSeconds: 600
policies:
- type: Percent
value: 5
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 50
periodSeconds: 15VPA автоматически подбирает requests/limits для подов.
# vpa.yaml
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
name: api-vpa
namespace: production
spec:
targetRef:
apiVersion: apps/v1
kind: Deployment
name: api-deployment
updatePolicy:
updateMode: "Auto" # Auto, Off, Recreate
resourcePolicy:
containerPolicies:
- containerName: api
minAllowed:
cpu: 100m
memory: 128Mi
maxAllowed:
cpu: 2
memory: 2Gi
controlledResources: ["cpu", "memory"]KEDA масштабирует на основе событий (очереди, Kafka, cron).
# keda-scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: worker-scaledobject
namespace: production
spec:
scaleTargetRef:
name: worker-deployment
minReplicaCount: 1
maxReplicaCount: 100
cooldownPeriod: 300
pollingInterval: 10
triggers:
# Масштабирование по RabbitMQ очереди
- type: rabbitmq
metadata:
queueName: tasks
host: {{ env "RABBITMQ_HOST" }}
queueLength: "10" # 1 под на 10 сообщений
# Масштабирование по Kafka
- type: kafka
metadata:
bootstrapServers: kafka:9092
consumerGroup: worker-group
topic: events
lagThreshold: "100"
# Cron-based scaling (ночью меньше подов)
- type: cron
metadata:
timezone: "UTC"
start: "0 22 * * *"
end: "0 6 * * *"
desiredReplicas: "1"Распределение нагрузки чтения между репликами.
┌─────────────┐
│ Master │ ← Запись (INSERT, UPDATE, DELETE)
│ (Read/Write)│
└──────┬──────┘
│ Replication
├──────────────┬──────────────┬──────────────┐
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Replica 1│ │ Replica 2│ │ Replica 3│ │ Replica 4│
│ (Read) │ │ (Read) │ │ (Read) │ │ (Read) │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
Python пример с балансировкой чтения:
# database/router.py
from typing import Type
from django.db import connections
from django.db.models import Model
import random
class ReadReplicaRouter:
"""Маршрутизация запросов: запись на master, чтение на реплики"""
def db_for_read(self, model: Type[Model], **hints):
"""Выбор случайной реплики для чтения"""
read_replicas = ['replica1', 'replica2', 'replica3']
return random.choice(read_replicas)
def db_for_write(self, model: Type[Model], **hints):
"""Все записи идут на master"""
return 'default'
def allow_relation(self, obj1, obj2, **hints):
"""Разрешаем отношения между БД"""
db_list = ('default', 'replica1', 'replica2', 'replica3')
if obj1._state.db in db_list and obj2._state.db in db_list:
return True
return None
def allow_migrate(self, db, app_label, model_name=None, **hints):
"""Миграции только на master"""
return db == 'default'# database/connection_pool.py
import psycopg2
from psycopg2 import pool
from contextlib import contextmanager
class DatabaseConnectionPool:
"""Connection pooling для PostgreSQL"""
def __init__(self, dsn: str, minconn: int = 5, maxconn: int = 20):
self.pool = psycopg2.pool.ThreadedConnectionPool(
minconn=minconn,
maxconn=maxconn,
dsn=dsn
)
@contextmanager
def get_connection(self):
"""Контекстный менеджер для получения соединения"""
conn = self.pool.getconn()
try:
yield conn
finally:
self.pool.putconn(conn)
def execute_read(self, query: str, params: tuple = None):
"""Выполнение read-запроса на реплике"""
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(query, params)
return cur.fetchall()
def execute_write(self, query: str, params: tuple = None):
"""Выполнение write-запроса на master с коммитом"""
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(query, params)
conn.commit()
return cur.rowcount
# Использование
pool = DatabaseConnectionPool(
dsn="postgresql://user:pass@replica1:5432/mydb"
)
# Чтение
users = pool.execute_read("SELECT * FROM users WHERE active = %s", (True,))
# Запись
pool.execute_write(
"INSERT INTO users (name, email) VALUES (%s, %s)",
("John", "john@example.com")
)Горизонтальное разделение данных по разным серверам.
Стратегии шардирования:
Shard 1: user_id 1-1000000
Shard 2: user_id 1000001-2000000
Shard 3: user_id 2000001-3000000
def get_shard_id(user_id: int, num_shards: int = 4) -> int:
return hash(user_id) % num_shardsОтдельная БД хранит маппинг: user_id → shard_id
Python пример шардирования:
# database/sharding.py
import hashlib
from typing import Dict, List, Any
import psycopg2
class ShardManager:
"""Менеджер шардирования базы данных"""
def __init__(self, shard_configs: List[Dict[str, str]]):
"""
shard_configs: [
{'id': 0, 'dsn': 'postgresql://.../shard0'},
{'id': 1, 'dsn': 'postgresql://.../shard1'},
]
"""
self.shards = {cfg['id']: cfg['dsn'] for cfg in shard_configs}
self.num_shards = len(shard_configs)
def get_shard_id(self, key: str) -> int:
"""Вычисление ID шарда по хэшу ключа"""
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
return hash_value % self.num_shards
def get_connection(self, key: str):
"""Получение соединения для нужного шарда"""
shard_id = self.get_shard_id(key)
dsn = self.shards[shard_id]
return psycopg2.connect(dsn)
def get_user(self, user_id: int) -> Dict[str, Any]:
"""Получение пользователя с правильного шарда"""
shard_key = f"user:{user_id}"
with self.get_connection(shard_key) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, name, email FROM users WHERE id = %s",
(user_id,)
)
row = cur.fetchone()
if row:
return {'id': row[0], 'name': row[1], 'email': row[2]}
return None
def create_user(self, user_id: int, name: str, email: str):
"""Создание пользователя на правильном шарде"""
shard_key = f"user:{user_id}"
with self.get_connection(shard_key) as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",
(user_id, name, email)
)
conn.commit()
# Использование
shard_manager = ShardManager([
{'id': 0, 'dsn': 'postgresql://user:pass@shard0:5432/users'},
{'id': 1, 'dsn': 'postgresql://user:pass@shard1:5432/users'},
{'id': 2, 'dsn': 'postgresql://user:pass@shard2:5432/users'},
{'id': 3, 'dsn': 'postgresql://user:pass@shard3:5432/users'},
])
# Запросы автоматически идут на правильный шард
user = shard_manager.get_user(12345)
shard_manager.create_user(67890, "Alice", "alice@example.com")Пул соединений уменьшает накладные расходы на создание соединений.
SQLAlchemy connection pool:
# database/sqlalchemy_pool.py
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool
# Создание engine с connection pool
engine = create_engine(
"postgresql://user:pass@localhost:5432/mydb",
poolclass=QueuePool,
pool_size=20, # Количество постоянных соединений
max_overflow=10, # Дополнительные соединения при пике
pool_timeout=30, # Таймаут ожидания соединения
pool_recycle=1800, # Пересоздание соединений через 30 мин
pool_pre_ping=True, # Проверка соединения перед использованием
)
# Мониторинг пула
@event.listens_for(engine, "connect")
def on_connect(dbapi_conn, connection_record):
print("New connection created")
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
print("Connection checked out from pool")
@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
print("Connection returned to pool")
# Использование
from sqlalchemy.orm import sessionmaker
SessionLocal = sessionmaker(bind=engine)
def get_user(user_id: int):
session = SessionLocal()
try:
return session.query(User).filter(User.id == user_id).first()
finally:
session.close() # Возвращает соединение в пулТокены добавляются в "ведро" с постоянной скоростью. Каждый запрос потребляет токен.
┌─────────────────┐
│ Bucket │
│ Capacity: 100 │
│ Tokens: 50 │
│ Refill: 10/s │
└────────┬────────┘
│
Запрос → -1 токен
Нет токенов → 429 Too Many Requests
Python реализация:
# rate_limiting/token_bucket.py
import time
from threading import Lock
from typing import Dict
class TokenBucket:
"""Rate limiter с алгоритмом Token Bucket"""
def __init__(self, capacity: int, refill_rate: float):
"""
capacity: максимальное количество токенов
refill_rate: токенов в секунду
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = capacity
self.last_refill = time.time()
self._lock = Lock()
def _refill(self):
"""Пополнение ведра токенами"""
now = time.time()
elapsed = now - self.last_refill
tokens_to_add = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_refill = now
def consume(self, tokens: int = 1) -> bool:
"""
Попытка потребить токены.
Возвращает True если успешно, False если лимит превышен.
"""
with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def get_wait_time(self, tokens: int = 1) -> float:
"""Время ожидания до доступности токенов"""
with self._lock:
self._refill()
if self.tokens >= tokens:
return 0
needed = tokens - self.tokens
return needed / self.refill_rate
class RateLimiter:
"""Rate limiter для множества клиентов"""
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.refill_rate = refill_rate
self.buckets: Dict[str, TokenBucket] = {}
self._lock = Lock()
def _get_bucket(self, client_id: str) -> TokenBucket:
"""Получение или создание bucket для клиента"""
with self._lock:
if client_id not in self.buckets:
self.buckets[client_id] = TokenBucket(
self.capacity, self.refill_rate
)
return self.buckets[client_id]
def is_allowed(self, client_id: str) -> bool:
"""Проверка разрешён ли запрос для клиента"""
bucket = self._get_bucket(client_id)
return bucket.consume()
def get_retry_after(self, client_id: str) -> float:
"""Время до следующего разрешённого запроса"""
bucket = self._get_bucket(client_id)
return bucket.get_wait_time()
# Использование в FastAPI
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
rate_limiter = RateLimiter(capacity=100, refill_rate=10) # 10 req/s, burst 100
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_id = request.client.host
if not rate_limiter.is_allowed(client_id):
retry_after = rate_limiter.get_retry_after(client_id)
return JSONResponse(
status_code=429,
content={"detail": "Too many requests"},
headers={"Retry-After": str(int(retry_after) + 1)}
)
response = await call_next(request)
return responseЗапросы поступают в очередь фиксированного размера и обрабатываются с постоянной скоростью.
┌─────────────────┐
│ Queue │
│ Size: 10 │
│ Drain: 5/s │
└────────┬────────┘
│
Запрос → в очередь
Очередь полная → 429
Обработка → 5 запросов/сек
Python реализация:
# rate_limiting/leaky_bucket.py
import time
from collections import deque
from threading import Lock
from typing import Deque
class LeakyBucket:
"""Rate limiter с алгоритмом Leaky Bucket"""
def __init__(self, capacity: int, drain_rate: float):
"""
capacity: максимальный размер очереди
drain_rate: запросов обрабатывается в секунду
"""
self.capacity = capacity
self.drain_rate = drain_rate
self.queue: Deque[float] = deque()
self._lock = Lock()
def _drain(self):
"""Удаление обработанных запросов из очереди"""
now = time.time()
# Сколько запросов можно обработать за прошедшее время
max_drain = int((now - self.queue[0]) * self.drain_rate) if self.queue else 0
for _ in range(min(max_drain, len(self.queue))):
self.queue.popleft()
def consume(self) -> bool:
"""Попытка добавить запрос в очередь"""
with self._lock:
self._drain()
if len(self.queue) < self.capacity:
self.queue.append(time.time())
return True
return False
def get_queue_size(self) -> int:
"""Текущий размер очереди"""
with self._lock:
self._drain()
return len(self.queue)
# Использование
limiter = LeakyBucket(capacity=10, drain_rate=5) # 5 req/s, queue size 10
def handle_request(client_id: str):
if not limiter.consume():
return {"error": "Rate limit exceeded", "status": 429}
return {"status": "OK"}Подсчёт запросов в скользящем временном окне.
Время: 0 10 20 30 40 50 60 (сек)
Окно: [───────────────────]
Запросы: 5 8 3 7 4 2 1
Итого в окне: 29 запросов (лимит 30)
Python реализация:
# rate_limiting/sliding_window.py
import time
from collections import defaultdict
from threading import Lock
from typing import Dict, List
class SlidingWindowCounter:
"""Rate limiter с алгоритмом Sliding Window Counter"""
def __init__(self, limit: int, window_seconds: int):
"""
limit: максимальное количество запросов в окне
window_seconds: размер окна в секундах
"""
self.limit = limit
self.window_seconds = window_seconds
self.requests: Dict[str, List[float]] = defaultdict(list)
self._lock = Lock()
def _clean_old(self, client_id: str):
"""Удаление запросов вне текущего окна"""
now = time.time()
cutoff = now - self.window_seconds
self.requests[client_id] = [
ts for ts in self.requests[client_id] if ts > cutoff
]
def is_allowed(self, client_id: str) -> bool:
"""Проверка разрешён ли запрос"""
with self._lock:
self._clean_old(client_id)
if len(self.requests[client_id]) < self.limit:
self.requests[client_id].append(time.time())
return True
return False
def get_remaining(self, client_id: str) -> int:
"""Оставшееся количество запросов в окне"""
with self._lock:
self._clean_old(client_id)
return max(0, self.limit - len(self.requests[client_id]))
def get_reset_time(self, client_id: str) -> float:
"""Время до сброса счётчика (когда oldest запрос выйдет из окна)"""
with self._lock:
self._clean_old(client_id)
if not self.requests[client_id]:
return 0
oldest = min(self.requests[client_id])
return max(0, oldest + self.window_seconds - time.time())
class SlidingWindowLog:
"""Более точная версия с логированием каждого запроса"""
def __init__(self, limit: int, window_seconds: int):
self.limit = limit
self.window_seconds = window_seconds
self.logs: Dict[str, List[float]] = defaultdict(list)
self._lock = Lock()
def is_allowed(self, client_id: str) -> bool:
"""Проверка с использованием взвешенного подсчёта"""
now = time.time()
window_start = now - self.window_seconds
with self._lock:
# Удаляем старые записи
self.logs[client_id] = [
ts for ts in self.logs[client_id] if ts > window_start
]
# Взвешенный подсчёт (учитываем часть предыдущего окна)
previous_window_start = window_start - self.window_seconds
previous_count = sum(
1 for ts in self.logs.get(client_id, [])
if previous_window_start < ts <= window_start
)
current_count = len(self.logs[client_id])
# Взвешенная сумма: часть предыдущего окна + текущее окно
weight = (now - window_start) / self.window_seconds
weighted_count = previous_count * (1 - weight) + current_count
if weighted_count < self.limit:
self.logs[client_id].append(now)
return True
return False
# Использование в Redis (production-ready)
# rate_limiting/redis_sliding_window.py
import redis
from typing import Optional
class RedisSlidingWindow:
"""Rate limiter с Redis для распределённой системы"""
def __init__(self, redis_client: redis.Redis, limit: int, window_seconds: int):
self.redis = redis_client
self.limit = limit
self.window_seconds = window_seconds
def is_allowed(self, client_id: str) -> bool:
"""Атомарная проверка с использованием Lua скрипта"""
key = f"rate_limit:{client_id}"
now = int(time.time() * 1000) # миллисекунды
window_ms = self.window_seconds * 1000
lua_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- Удаляем старые записи
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- Считаем количество запросов в окне
local count = redis.call('ZCARD', key)
if count < limit then
-- Добавляем текущий запрос
redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, math.ceil(window / 1000))
return 1
else
return 0
end
"""
result = self.redis.eval(
lua_script, 1, key, self.limit, window_ms, now
)
return result == 1
# Использование
redis_client = redis.Redis(host='localhost', port=6379)
limiter = RedisSlidingWindow(redis_client, limit=100, window_seconds=60)
if limiter.is_allowed("user:123"):
# Обработать запрос
pass
else:
# Вернуть 429
pass| Алгоритм | Burst | Равномерность | Сложность | Когда использовать |
|---|---|---|---|---|
| Token Bucket | Да | Средняя | Низкая | API с допустимыми всплесками |
| Leaky Bucket | Нет | Высокая | Средняя | Стабильная нагрузка, очереди |
| Sliding Window | Зависит | Высокая | Средняя | Точный подсчёт, распределённые системы |
# Rate limiting в Nginx
http {
# Зона для rate limiting по IP
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
# Зона для connection limiting
limit_conn_zone $binary_remote_addr zone=conn_limit:10m;
server {
location /api/ {
# Rate limiting с burst
limit_req zone=api_limit burst=20 nodelay;
# Connection limiting
limit_conn conn_limit 10;
# Custom error page
limit_req_status 429;
limit_conn_status 429;
proxy_pass http://backend;
}
# Разные лимиты для разных endpoints
location /api/expensive/ {
limit_req zone=api_limit burst=5 nodelay;
proxy_pass http://backend;
}
location /api/search/ {
# Отдельная зона для поиска
limit_req_zone $binary_remote_addr zone=search_limit:10m rate=1r/s;
limit_req zone=search_limit burst=3;
proxy_pass http://backend;
}
}
}Малая нагрузка (< 1000 RPS)
├── Вертикальное масштабирование
└── Один сервер + backup
Средняя нагрузка (1000-10000 RPS)
├── Горизонтальное масштабирование (2-10 серверов)
├── Load balancer (Nginx/HAProxy)
├── Read replicas для БД
└── Connection pooling
Высокая нагрузка (> 10000 RPS)
├── Автоскейлинг (Kubernetes HPA)
├── Шардирование БД
├── Кэширование (Redis, CDN)
├── Rate limiting
└── Микросервисы
# Prometheus alerting rules для масштабирования
groups:
- name: scaling
rules:
- alert: HighCPUUsage
expr: avg(rate(container_cpu_usage_seconds_total[5m])) > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "High CPU usage detected"
- alert: HighRequestLatency
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 0.5
for: 5m
labels:
severity: critical
annotations:
summary: "P95 latency above 500ms"
- alert: ConnectionPoolExhausted
expr: database_connection_pool_available == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Database connection pool exhausted"Масштабирование и балансировка нагрузки — критические компоненты для построения надёжных высоконагруженных систем.
Ключевые выводы:
Дальнейшие шаги:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.