Latency-aware routing, consistent hashing, load balancing algorithms
«Не все серверы одинаковы». Умная балансировка учитывает задержки и загрузку, а не только количество соединений.
Клиент → Load Balancer → Server 1 (10 мс)
→ Server 2 (100 мс) ← медленный из-за GC
→ Server 3 (15 мс)
Round Robin распределяет равномерно, но 25% запросов идут на медленный Server 2.
Проблема: Round Robin не учитывает:
class RoundRobinBalancer:
"""Простой циклический перебор."""
def __init__(self, servers):
self.servers = servers
self.current = 0
def get_server(self):
server = self.servers[self.current]
self.current = (self.current + 1) % len(self.servers)
return server
# Использование
balancer = RoundRobinBalancer(['s1', 's2', 's3'])
balancer.get_server() # s1
balancer.get_server() # s2
balancer.get_server() # s3
balancer.get_server() # s1Плюсы:
Минусы:
import heapq
from threading import Lock
class LeastConnectionsBalancer:
"""Отправляет запрос на сервер с наименьшим количеством активных соединений."""
def __init__(self, servers):
# (active_connections, server)
self.servers = [(0, server) for server in servers]
self.lock = Lock()
def get_server(self):
with self.lock:
# Находим сервер с минимальным количеством соединений
min_conn, server = min(self.servers, key=lambda x: x[0])
# Увеличиваем счётчик
idx = self.servers.index((min_conn, server))
self.servers[idx] = (min_conn + 1, server)
return server
def release_connection(self, server):
with self.lock:
for i, (conn_count, srv) in enumerate(self.servers):
if srv == server:
self.servers[i] = (conn_count - 1, srv)
break
# Использование
balancer = LeastConnectionsBalancer(['s1', 's2', 's3'])
# Запрос 1 → s1 (0 соединений)
server = balancer.get_server()
# ... обработка ...
balancer.release_connection(server)Плюсы:
Минусы:
import time
from collections import deque
from threading import Lock
class LatencyAwareBalancer:
"""
Балансировка на основе latency.
Серверы с меньшей latency получают больше трафика.
"""
def __init__(self, servers, window_size=100):
self.servers = servers
self.window_size = window_size
# Скользящее окно latency для каждого сервера
self.latencies = {
server: deque(maxlen=window_size)
for server in servers
}
# Веса (обратно пропорциональны latency)
self.weights = {server: 1.0 for server in servers}
self.lock = Lock()
def record_latency(self, server, latency_ms):
"""Записывает latency для сервера."""
with self.lock:
self.latencies[server].append(latency_ms)
self._update_weights()
def _update_weights(self):
"""Обновляет веса на основе средней latency."""
for server in self.servers:
if self.latencies[server]:
avg_latency = sum(self.latencies[server]) / len(self.latencies[server])
# Вес обратно пропорционален latency
self.weights[server] = 1.0 / avg_latency
else:
self.weights[server] = 1.0 # Default weight
def get_server(self):
"""Выбирает сервер с вероятностью, пропорциональной весу."""
import random
with self.lock:
total_weight = sum(self.weights.values())
if total_weight == 0:
return random.choice(self.servers)
# Weighted random selection
r = random.uniform(0, total_weight)
cumulative = 0
for server in self.servers:
cumulative += self.weights[server]
if r <= cumulative:
return server
return self.servers[-1]
# Использование с измерением latency
balancer = LatencyAwareBalancer(['s1', 's2', 's3'])
def make_request():
server = balancer.get_server()
start = time.time()
response = call_server(server)
latency_ms = (time.time() - start) * 1000
# Записываем latency для будущей балансировки
balancer.record_latency(server, latency_ms)
return responseПлюсы:
Минусы:
Простой и эффективный алгоритм.
import random
class PowerOfTwoChoicesBalancer:
"""
Выбирает 2 случайных сервера и отправляет запрос на менее загруженный.
Теоретически доказано: даже выбор из 2 случайных серверов
даёт экспоненциально лучшее распределение, чем полный random.
"""
def __init__(self, servers):
self.servers = servers
self.connections = {server: 0 for server in servers}
def get_server(self):
# Выбираем 2 случайных сервера
candidates = random.sample(self.servers, min(2, len(self.servers)))
# Выбираем сервер с меньшим количеством соединений
return min(candidates, key=lambda s: self.connections[s])
def acquire(self, server):
self.connections[server] += 1
def release(self, server):
self.connections[server] -= 1
# Использование
balancer = PowerOfTwoChoicesBalancer(['s1', 's2', 's3', 's4', 's5'])
def make_request():
server = balancer.get_server()
balancer.acquire(server)
try:
return call_server(server)
finally:
balancer.release(server)Плюсы:
Минусы:
Для кэшей и баз данных, где важно попадание на тот же сервер.
# ❌ Плохо: при изменении количества серверов перемещается всё
def get_server(key, servers):
return servers[hash(key) % len(servers)]
# Было 3 сервера:
# hash('user:123') % 3 = 1 → Server 2
# hash('user:456') % 3 = 0 → Server 1
# Стало 4 сервера:
# hash('user:123') % 4 = 2 → Server 3 (переместился!)
# hash('user:456') % 4 = 1 → Server 2 (переместился!)
# 75% ключей переместились — cache miss для большинстваimport hashlib
from bisect import bisect_right
from collections import defaultdict
class ConsistentHashRing:
"""
Consistent hashing ring.
При добавлении/удалении сервера перемещается только 1/N ключей.
"""
def __init__(self, virtual_nodes=150):
self.virtual_nodes = virtual_nodes
self.ring = {} # hash → server
self.sorted_keys = [] # отсортированные hash для бинарного поиска
self.server_keys = defaultdict(list) # server → [hashes]
def _hash(self, key):
"""Вычисляет hash ключа."""
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_server(self, server):
"""Добавляет сервер на кольцо."""
for i in range(self.virtual_nodes):
virtual_key = f"{server}:{i}"
hash_value = self._hash(virtual_key)
self.ring[hash_value] = server
self.server_keys[server].append(hash_value)
self.sorted_keys.append(hash_value)
self.sorted_keys.sort()
def remove_server(self, server):
"""Удаляет сервер с кольца."""
for hash_value in self.server_keys[server]:
del self.ring[hash_value]
self.sorted_keys.remove(hash_value)
del self.server_keys[server]
def get_server(self, key):
"""Находит сервер для ключа."""
if not self.ring:
return None
hash_value = self._hash(key)
# Бинарный поиск позиции на кольце
pos = bisect_right(self.sorted_keys, hash_value)
# Если вышли за конец, берём первый (кольцо)
if pos == len(self.sorted_keys):
pos = 0
return self.ring[self.sorted_keys[pos]]
# Использование
ring = ConsistentHashRing(virtual_nodes=150)
# Добавляем серверы
ring.add_server('cache-1')
ring.add_server('cache-2')
ring.add_server('cache-3')
# Получаем сервер для ключа
server = ring.get_server('user:123') # cache-2
server = ring.get_server('user:456') # cache-1
# Добавляем сервер
ring.add_server('cache-4')
# Только ~25% ключей переместились (1/N)
# Остальные 75% остались на тех же серверахПлюсы:
Минусы:
class WeightedConsistentHashRing(ConsistentHashRing):
"""Consistent hashing с весами серверов."""
def add_server(self, server, weight=1.0):
"""Добавляет сервер с весом (количество virtual nodes пропорционально весу)."""
num_virtual = int(self.virtual_nodes * weight)
for i in range(num_virtual):
virtual_key = f"{server}:{i}"
hash_value = self._hash(virtual_key)
self.ring[hash_value] = server
self.server_keys[server].append(hash_value)
self.sorted_keys.append(hash_value)
self.sorted_keys.sort()
# Использование
ring = WeightedConsistentHashRing(virtual_nodes=100)
# Мощные серверы получают больше virtual nodes
ring.add_server('powerful-server', weight=2.0) # 200 virtual nodes
ring.add_server('normal-server', weight=1.0) # 100 virtual nodes
ring.add_server('weak-server', weight=0.5) # 50 virtual nodesapiVersion: v1
kind: Service
metadata:
name: my-app
spec:
selector:
app: my-app
ports:
- port: 80
targetPort: 8080
type: ClusterIPРежимы kube-proxy:
# Конфигурация IPVS
apiVersion: kubeproxy.config.k8s.io/v1alpha1
kind: KubeProxyConfiguration
mode: "ipvs"
ipvs:
scheduler: "lc" # least-connection# NGINX Ingress с аннотациями
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: my-app
annotations:
nginx.ingress.kubernetes.io/load-balance: "least_conn"
nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"
spec:
rules:
- host: api.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: my-app
port:
number: 80# Envoy конфигурация с latency-based балансировкой
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
name: my-app
spec:
host: my-app.default.svc.cluster.local
trafficPolicy:
loadBalancer:
leastRequest:
choiceCount: 2 # Power of Two Choices
# Outlier detection — исключает медленные поды
outlierDetection:
consecutive5xxErrors: 5
interval: 30s
baseEjectionTime: 30s
maxEjectionPercent: 50
# Исключаем поды с высокой latency
consecutiveGatewayErrors: 5
interval: 10s# Для внешних API с переменной latency
balancer = LatencyAwareBalancer(api_endpoints, window_size=100)
# Записываем latency после каждого запроса
def call_external_api():
server = balancer.get_server()
start = time.time()
response = requests.get(server, timeout=5)
latency = (time.time() - start) * 1000
balancer.record_latency(server, latency)
return response# Для Redis/Memcached кластеров
ring = ConsistentHashRing(virtual_nodes=150)
for node in redis_nodes:
ring.add_server(node)
def get_cached(key):
server = ring.get_server(key)
return redis_clients[server].get(key)# Простая и эффективная балансировка для HTTP сервисов
balancer = PowerOfTwoChoicesBalancer(servers)from prometheus_client import Histogram
REQUEST_LATENCY = Histogram(
'request_latency_seconds',
'Request latency',
['server', 'status']
)
REQUESTS_TOTAL = Counter(
'requests_total',
'Total requests',
['server', 'status']
)
# Алерт: неравномерное распределение
# expr: stddev(requests_total) / avg(requests_total) > 0.5В следующей теме рассмотрим оптимизацию работы с базами данных для предсказуемой производительности.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.