asyncio, speculative execution, hedged requests, parallel execution
«Не жди — делай другое». Асинхронность скрывает задержки I/O, параллелизм уменьшает общее время выполнения.
# ❌ Синхронный: каждое ожидание блокирует поток
import requests
import time
start = time.time()
# Запрос 1: ждём 100 мс
response1 = requests.get('http://service1/api', timeout=5)
data1 = response1.json()
# Запрос 2: ждём 150 мс
response2 = requests.get('http://service2/api', timeout=5)
data2 = response2.json()
# Запрос 3: ждём 200 мс
response3 = requests.get('http://service3/api', timeout=5)
data3 = response3.json()
total = time.time() - start
print(f"Общее время: {total * 1000:.0f} мс") # ~450 мс (сумма задержек)Проблема: задержки суммируются. Поток простаивает во время ожидания I/O.
# ✅ Асинхронный: запросы выполняются параллельно
import aiohttp
import asyncio
import time
async def fetch(session, url):
async with session.get(url) as response:
return await response.json()
async def fetch_all():
start = time.time()
async with aiohttp.ClientSession() as session:
# Запускаем все запросы параллельно
task1 = asyncio.create_task(fetch(session, 'http://service1/api'))
task2 = asyncio.create_task(fetch(session, 'http://service2/api'))
task3 = asyncio.create_task(fetch(session, 'http://service3/api'))
# Ждём все сразу
data1, data2, data3 = await asyncio.gather(task1, task2, task3)
total = time.time() - start
print(f"Общее время: {total * 1000:.0f} мс") # ~200 мс (максимум задержек)
return data1, data2, data3
# asyncio.run(fetch_all())Преимущество: задержки перекрываются. Общее время = максимум задержек, а не сумма.
import asyncio
async def main():
# Event Loop управляет выполнением корутин
# Переключается между задачами при ожидании I/O
await task1() # Задача 1 начинает выполнение
# При await event loop переключается на другую задачу
await task2() # Задача 2 выполняется
await task3() # Задача 3 выполняется
# asyncio.run(main())import asyncio
import aiohttp
async def fetch_user(session, user_id):
"""Получение данных пользователя."""
async with session.get(f'http://api/users/{user_id}') as response:
return await response.json()
async def fetch_user_orders(session, user_id):
"""Получение заказов пользователя."""
async with session.get(f'http://api/users/{user_id}/orders') as response:
return await response.json()
async def get_user_profile(user_id):
"""Собирает полный профиль пользователя."""
async with aiohttp.ClientSession() as session:
# Последовательно (если данные зависят друг от друга)
user_data = await fetch_user(session, user_id)
orders = await fetch_user_orders(session, user_id)
return {
'user': user_data,
'orders': orders
}
async def get_user_profile_parallel(user_ids):
"""Параллельное получение профилей нескольких пользователей."""
async with aiohttp.ClientSession() as session:
# Параллельно для всех user_id
tasks = [get_user_profile(uid) for uid in user_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Обрабатываем ошибки
profiles = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Failed to fetch user {user_ids[i]}: {result}")
profiles.append(None)
else:
profiles.append(result)
return profilesasync def fetch_with_concurrency_limit(items, fetch_func, max_concurrent=10):
"""
Выполняет операции с ограничением конкурентности.
Без ограничения: 1000 запросов одновременно → перегрузка сервиса
С ограничением: максимум 10 одновременных запросов
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_fetch(item):
async with semaphore:
return await fetch_func(item)
tasks = [bounded_fetch(item) for item in items]
return await asyncio.gather(*tasks, return_exceptions=True)
# Использование
async def main():
user_ids = range(1000)
# Максимум 20 одновременных запросов
results = await fetch_with_concurrency_limit(
user_ids,
lambda uid: fetch_user_profile(uid),
max_concurrent=20
)Паттерн для снижения tail latency.
Время 0мс: Запрос А → Реплика 1
Запрос А → Реплика 2 (через 50 мс)
Время 80мс: Реплика 1 ответила → используем этот ответ
Реплика 2 отменяется
Результат: latency = 80 мс (вместо 200 мс от медленной реплики)
import asyncio
import time
from typing import Any, Callable, Coroutine
async def hedged_request(
primary_func: Callable[[], Coroutine],
hedge_func: Callable[[], Coroutine],
hedge_delay_ms: int = 50
) -> Any:
"""
Hedged request: отправляет второй запрос через hedge_delay_ms.
Args:
primary_func: Основная корутина
hedge_func: Дублирующая корутина (хедж)
hedge_delay_ms: Задержка перед отправкой хеджа
Returns:
Результат первого завершившегося запроса
"""
pending = set()
# Запускаем основной запрос
primary_task = asyncio.create_task(primary_func())
pending.add(primary_task)
start_time = time.time()
result = None
while pending:
# Ждём завершения любого запроса или hedge_delay
done, pending = await asyncio.wait(
pending,
timeout=hedge_delay_ms / 1000,
return_when=asyncio.FIRST_COMPLETED
)
if done:
# Первый запрос завершён
result = done.pop().result()
break
# Прошло hedge_delay_ms, запускаем хедж
if len(pending) < 2:
hedge_task = asyncio.create_task(hedge_func())
pending.add(hedge_task)
# Отменяем оставшиеся запросы
for task in pending:
task.cancel()
# Ждём отмены
if pending:
await asyncio.gather(*pending, return_exceptions=True)
return result
# Пример использования
async def fetch_from_replica(replica_id: int, delay_ms: int):
"""Симуляция запроса к реплике с переменной задержкой."""
await asyncio.sleep(delay_ms / 1000)
return f"Data from replica {replica_id}"
async def fetch_with_hedge():
"""Запрос с hedged request."""
import random
# Симулируем переменные задержки реплик
replica1_delay = random.randint(50, 300)
replica2_delay = random.randint(50, 300)
result = await hedged_request(
primary_func=lambda: fetch_from_replica(1, replica1_delay),
hedge_func=lambda: fetch_from_replica(2, replica2_delay),
hedge_delay_ms=50
)
return result
# Запуск
# result = asyncio.run(fetch_with_hedge())
# print(f"Result: {result}")| Сценарий | Подходит | Обоснование |
|---|---|---|
| Чтение из реплик БД | ✅ Да | Реплики имеют разную latency |
| Внешние API с несколькими endpoints | ✅ Да | Разная производительность endpoints |
| Write операции | ❌ Нет | Должна быть идемпотентность |
| Дорогие вычисления | ❌ Нет | Избыточность тратит ресурсы |
import statistics
async def benchmark_hedged_requests(n=1000):
"""Сравнивает latency с hedged requests и без."""
import random
def simulate_replica_latency():
"""Симулирует latency реплики с long tail."""
# 95% запросов: 10-100 мс
# 5% запросов: 200-500 мс (long tail)
if random.random() < 0.95:
return random.randint(10, 100)
else:
return random.randint(200, 500)
# Без hedged requests
latencies_no_hedge = []
for _ in range(n):
latency = simulate_replica_latency()
latencies_no_hedge.append(latency)
# С hedged requests (2 реплики)
latencies_with_hedge = []
for _ in range(n):
latency1 = simulate_replica_latency()
latency2 = simulate_replica_latency()
# Берём минимум (первый ответивший)
latencies_with_hedge.append(min(latency1, latency2))
def percentile(data, p):
sorted_data = sorted(data)
k = (len(sorted_data) - 1) * p / 100
f = int(k)
c = f + 1 if f + 1 < len(sorted_data) else f
return sorted_data[f] + (k - f) * (sorted_data[c] - sorted_data[f]) if f != c else sorted_data[f]
print("Без hedged requests:")
print(f" p50: {percentile(latencies_no_hedge, 50):.1f} мс")
print(f" p95: {percentile(latencies_no_hedge, 95):.1f} мс")
print(f" p99: {percentile(latencies_no_hedge, 99):.1f} мс")
print("\nС hedged requests:")
print(f" p50: {percentile(latencies_with_hedge, 50):.1f} мс")
print(f" p95: {percentile(latencies_with_hedge, 95):.1f} мс")
print(f" p99: {percentile(latencies_with_hedge, 99):.1f} мс")
# asyncio.run(benchmark_hedged_requests())Типичный результат:
Без hedged requests:
p50: 55.0 мс
p95: 98.0 мс
p99: 420.0 мс
С hedged requests:
p50: 35.0 мс (-36%)
p95: 75.0 мс (-23%)
p99: 180.0 мс (-57%)
Расширение hedged requests для задач.
import asyncio
from typing import Optional, List
class SpeculativeExecutor:
"""
Выполняет задачу спекулятивно на нескольких workers.
Если задача выполняется дольше p95, запускается копия.
"""
def __init__(self, speculative_delay_ms: int = 100):
self.speculative_delay_ms = speculative_delay_ms
async def execute_with_speculation(
self,
workers: List[Callable[[], Coroutine]],
) -> Any:
"""
Выполняет задачу на нескольких workers.
Args:
workers: Список корутин для выполнения
Returns:
Результат первого завершившегося worker
"""
if not workers:
raise ValueError("At least one worker required")
pending = set()
# Запускаем первый worker
pending.add(asyncio.create_task(workers[0]()))
start_time = time.time()
result = None
while pending:
done, pending = await asyncio.wait(
pending,
timeout=self.speculative_delay_ms / 1000,
return_when=asyncio.FIRST_COMPLETED
)
if done:
result = done.pop().result()
break
# Запускаем следующий worker если есть
if workers and len(pending) < len(workers):
pending.add(asyncio.create_task(workers[len(pending)]()))
# Отменяем остальные
for task in pending:
task.cancel()
if pending:
await asyncio.gather(*pending, return_exceptions=True)
return result
# Пример: поиск в нескольких индексах
async def search_index(index_name: str, query: str, delay_ms: int):
await asyncio.sleep(delay_ms / 1000)
return f"Results from {index_name}"
async def search_with_speculation(query: str):
executor = SpeculativeExecutor(speculative_delay_ms=50)
# Симулируем разную latency индексов
result = await executor.execute_with_speculation([
lambda: search_index('index1', query, 150),
lambda: search_index('index2', query, 80),
lambda: search_index('index3', query, 200),
])
return resultasync def aggregate_user_data(user_id: str):
"""
Агрегирует данные о пользователе из нескольких сервисов.
Все запросы выполняются параллельно.
"""
async with aiohttp.ClientSession() as session:
# Параллельные запросы к разным сервисам
user_task = asyncio.create_task(
session.get(f'http://user-service/users/{user_id}').then(lambda r: r.json())
)
orders_task = asyncio.create_task(
session.get(f'http://order-service/orders?user_id={user_id}').then(lambda r: r.json())
)
preferences_task = asyncio.create_task(
session.get(f'http://pref-service/prefs/{user_id}').then(lambda r: r.json())
)
recommendations_task = asyncio.create_task(
session.get(f'http://rec-service/recommendations/{user_id}').then(lambda r: r.json())
)
# Ждём все запросы
user_data, orders, preferences, recommendations = await asyncio.gather(
user_task,
orders_task,
preferences_task,
recommendations_task,
return_exceptions=True
)
# Обрабатываем ошибки gracefully
return {
'user': user_data if not isinstance(user_data, Exception) else None,
'orders': orders if not isinstance(orders, Exception) else [],
'preferences': preferences if not isinstance(preferences, Exception) else {},
'recommendations': recommendations if not isinstance(recommendations, Exception) else []
}async def aggregate_with_timeouts(user_id: str):
"""Агрегация с индивидуальными таймаутами для каждого сервиса."""
async with aiohttp.ClientSession() as session:
try:
user_data = await asyncio.wait_for(
fetch_user(session, user_id),
timeout=0.5 # 500 мс
)
except asyncio.TimeoutError:
user_data = None
logger.warning("User service timeout")
try:
orders = await asyncio.wait_for(
fetch_orders(session, user_id),
timeout=1.0 # 1 секунда
)
except asyncio.TimeoutError:
orders = []
logger.warning("Order service timeout")
return {'user': user_data, 'orders': orders}# ❌ Плохо: последовательное выполнение
result1 = await operation1()
result2 = await operation2()
result3 = await operation3()
# ✅ Хорошо: параллельное выполнение
results = await asyncio.gather(
operation1(),
operation2(),
operation3(),
return_exceptions=True
)# ❌ Плохо: неограниченная конкурентность
async def process_all(items):
tasks = [process(item) for item in items]
return await asyncio.gather(*tasks)
# ✅ Хорошо: с ограничением
async def process_all_limited(items):
semaphore = asyncio.Semaphore(100)
async def bounded_process(item):
async with semaphore:
return await process(item)
tasks = [bounded_process(item) for item in items]
return await asyncio.gather(*tasks)# С return_exceptions=True
results = await asyncio.gather(
task1(),
task2(),
task3(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {i} failed: {result}")
results[i] = None # Fallbacktry:
result = await asyncio.wait_for(
slow_operation(),
timeout=5.0
)
except asyncio.TimeoutError:
logger.warning("Operation timed out")
result = fallback_value()asyncio.gather уменьшает общее время: сумма задержек → максимум задержекasyncio.wait_for защищают от зависших операцийreturn_exceptions=True позволяет graceful degradationВ следующей теме рассмотрим балансировку нагрузки с учётом latency.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.