async/await, async генераторы, asyncio интеграция
Асинхронность — не многопоточность. Это способ выполнять тысячи I/O-операций в одном потоке, переключаясь между ожиданиями.
| Подход | Параллелизм | Годится для |
|---|---|---|
threading | Потоки ОС | I/O-bound (GIL освобождается) |
multiprocessing | Процессы ОС | CPU-bound |
asyncio | Один поток, event loop | Много I/O-ожиданий (10k+ соединений) |
Синхронно: A──────────────────── B──────────────────── C────────────
Асинхронно: A────await──────A B────await──────B C────await──────C
↑ переключение ↑ переключение
Каждый await — это возможность event loop запустить другую корутину пока первая ждёт ответа.
import asyncio
async def greet(name: str) -> str:
await asyncio.sleep(1) # I/O-ожидание; event loop переключается
return f"Hello, {name}!"
# Вызов async def создаёт объект coroutine — код ещё не выполнялся
coro = greet("Alice") # <coroutine object greet at 0x...>
result = asyncio.run(coro) # запускаем и ждёмasync def main():
# create_task() планирует корутину для выполнения в event loop
task = asyncio.create_task(greet("Bob"))
# task начинает выполняться немедленно в фоне
await asyncio.sleep(0) # даём event loop шанс переключиться
result = await task # дожидаемся результата
print(result)# Future — низкоуровневый объект; обычно не создаётся напрямую
# Task наследует от Future
future = asyncio.get_event_loop().create_future()
future.set_result(42)
await future # → 42| Тип | Что это | Когда нужен |
|---|---|---|
coroutine | Функция async def, не запущенная | Определение логики |
Task | Запущенная корутина | Параллельное выполнение |
Future | Placeholder для результата | Интеграция с callback-кодом |
asyncio.run() — точка входаimport asyncio
async def main():
print("Start")
await asyncio.sleep(1)
print("End")
# Python 3.7+: запускает новый event loop
asyncio.run(main())
# ❌ Нельзя вызывать asyncio.run() внутри уже запущенного event loop
# (например, в Jupyter — там уже есть event loop, используйте await)asyncio.gather() — запуск нескольких корутинimport asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
# Все запросы запускаются параллельно
results = await asyncio.gather(
*[fetch(session, url) for url in urls]
)
return results
# 1000 URL запрашиваются практически одновременно!
urls = [f"https://example.com/page/{i}" for i in range(1000)]
asyncio.run(fetch_all(urls))Обработка исключений в gather():
# По умолчанию: если одна задача завершается с ошибкой — gather() поднимает исключение
results = await asyncio.gather(
fetch(url1), fetch(url2), fetch(url3),
return_exceptions=True # ошибки как результаты, не исключения
)
for r in results:
if isinstance(r, Exception):
print(f"Ошибка: {r}")asyncio.wait() — гибкое ожиданиеtasks = [asyncio.create_task(fetch(url)) for url in urls]
# Ждём первого завершившегося
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# Ждём всех
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
# Отмена оставшихся
for task in pending:
task.cancel()asyncio.wait_for() — таймаутtry:
result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
print("Превышено время ожидания!")asyncio.TaskGroup (Python 3.11+) — structured concurrencyasync def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch(url1))
task2 = tg.create_task(fetch(url2))
task3 = tg.create_task(fetch(url3))
# Здесь все задачи завершены
# Если любая задача завершилась с ошибкой — все остальные отменяются
print(task1.result(), task2.result(), task3.result())async def long_task():
try:
await asyncio.sleep(100)
except asyncio.CancelledError:
print("Задача отменена — выполняю cleanup")
await cleanup()
raise # важно: перебросить CancelledError
task = asyncio.create_task(long_task())
await asyncio.sleep(1)
task.cancel() # запрашивает отмену
try:
await task
except asyncio.CancelledError:
print("Задача была отменена")
# asyncio.shield() — защита от отмены
try:
result = await asyncio.wait_for(asyncio.shield(critical_op()), timeout=5)
except asyncio.TimeoutError:
pass # timeout, но critical_op() продолжает выполняться в фонеlock = asyncio.Lock()
async def safe_operation():
async with lock: # ждёт пока блокировка свободна
await modify_shared_state()# Максимум 10 одновременных запросов
semaphore = asyncio.Semaphore(10)
async def rate_limited_fetch(url):
async with semaphore:
async with aiohttp.ClientSession() as session:
return await fetch(session, url)
# 1000 URL, но максимум 10 одновременно
results = await asyncio.gather(*[rate_limited_fetch(url) for url in urls])ready = asyncio.Event()
async def producer():
await prepare_data()
ready.set() # сигнал
async def consumer():
await ready.wait() # ждёт сигнала
process_data()
await asyncio.gather(producer(), consumer())async def producer(queue: asyncio.Queue):
for i in range(10):
await queue.put(i)
await queue.put(None) # sentinel
async def consumer(queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
await process(item)
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=5)
await asyncio.gather(producer(queue), consumer(queue))asyncio.to_thread() (Python 3.9+)import asyncio
def blocking_io(filename):
with open(filename) as f:
return f.read()
async def main():
# Запускает синхронную функцию в thread pool
content = await asyncio.to_thread(blocking_io, "large_file.txt")
return contentloop.run_in_executor()import concurrent.futures
async def main():
loop = asyncio.get_running_loop()
# ThreadPoolExecutor — для I/O-bound синхронного кода
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_function, arg)
# ProcessPoolExecutor — для CPU-bound кода
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_intensive, data)# Async generator
async def fetch_pages(base_url, pages):
async with aiohttp.ClientSession() as session:
for page in range(pages):
url = f"{base_url}?page={page}"
async with session.get(url) as resp:
yield await resp.json()
# Использование через async for
async def process_pages():
async for page_data in fetch_pages("https://api.example.com/items", 100):
await store(page_data)
# Async iterator через класс
class AsyncCounter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.current += 1
return self.currentasyncio.timeout() (Python 3.11+)async def main():
try:
async with asyncio.timeout(10):
await long_running_task()
except TimeoutError:
print("Задача не завершилась за 10 секунд")
# Можно обновить дедлайн:
async with asyncio.timeout(10) as deadline:
await first_part()
# Увеличиваем таймаут если первая часть быстрая
deadline.reschedule(asyncio.get_event_loop().time() + 5)
await second_part()# Включение debug mode — более детальные ошибки
asyncio.run(main(), debug=True)
# или
import asyncio
asyncio.set_event_loop(asyncio.new_event_loop())
asyncio.get_event_loop().set_debug(True)
# asyncio.sleep(0) — передать управление event loop
async def cooperative_task():
for i in range(1_000_000):
if i % 1000 == 0:
await asyncio.sleep(0) # дать шанс другим задачам
process(i)
# Получить текущий event loop
loop = asyncio.get_event_loop() # deprecated в 3.10+
loop = asyncio.get_running_loop() # рекомендуется внутри корутины# ❌ Забытый await — coroutine создана но не выполнена
async def main():
result = fetch_data() # coroutine object! await забыт
# Python выдаст RuntimeWarning: coroutine was never awaited
# ✅ Всегда await coroutine
result = await fetch_data()
# ❌ Блокирующий код в корутине — замораживает весь event loop
async def bad():
time.sleep(5) # блокирует ВСЕ корутины!
requests.get(url) # тоже блокирующий!
# ✅ Используйте async-версии
async def good():
await asyncio.sleep(5)
async with aiohttp.ClientSession() as session:
await session.get(url)
# ❌ asyncio.run() внутри корутины
async def nested():
asyncio.run(other_coro()) # RuntimeError!
# ✅ Просто await
async def nested():
await other_coro()
# ❌ Смешение asyncio.run() и nest_asyncio без необходимости
# В Jupyter используйте: await coro() (Jupyter имеет свой event loop)import asyncio
import aiohttp
import time
# Async: 1000 запросов в одном потоке
async def async_test():
urls = [f"https://httpbin.org/delay/0.1"] * 1000
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
await asyncio.gather(*tasks)
# Результат: ~2-3 секунды (параллельно через event loop)
# Threading: 1000 запросов в пуле потоков
from concurrent.futures import ThreadPoolExecutor
import requests
def thread_test():
urls = [f"https://httpbin.org/delay/0.1"] * 1000
with ThreadPoolExecutor(max_workers=50) as pool:
list(pool.map(requests.get, urls))
# Результат: ~3-5 секунды + overhead на создание потоков
# Вывод: asyncio лучше масштабируется при большом числе соединенийimport asyncio
import random
from functools import wraps
def async_retry(max_attempts=3, delay=1.0, backoff=2.0,
exceptions=(Exception,)):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except exceptions as e:
if attempt + 1 >= max_attempts:
raise
jitter = random.uniform(0, current_delay * 0.1)
await asyncio.sleep(current_delay + jitter)
current_delay *= backoff
return wrapper
return decorator
@async_retry(max_attempts=3, delay=0.5, exceptions=(aiohttp.ClientError,))
async def fetch_with_retry(session, url):
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
resp.raise_for_status()
return await resp.json()import asyncio
import time
class AsyncRateLimiter:
"""Ограничивает до N запросов в секунду с burst."""
def __init__(self, rate: float, burst: int = 1):
self._rate = rate # запросов в секунду
self._burst = burst # максимальный burst
self._tokens = burst
self._last_update = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.monotonic()
elapsed = now - self._last_update
self._tokens = min(
self._burst,
self._tokens + elapsed * self._rate
)
self._last_update = now
if self._tokens < 1:
sleep_time = (1 - self._tokens) / self._rate
await asyncio.sleep(sleep_time)
self._tokens = 0
else:
self._tokens -= 1
limiter = AsyncRateLimiter(rate=10, burst=5) # 10 req/s, burst до 5
async def limited_request(url):
await limiter.acquire()
return await fetch(url)import asyncio
import threading
# Вариант 1: asyncio.run() — блокирует текущий поток
result = asyncio.run(async_function())
# Вариант 2: run_coroutine_threadsafe — для уже запущенного loop
def sync_caller(loop, coro):
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result(timeout=10) # блокирует до результата
# Вариант 3: nest_asyncio для Jupyter/REPL
import nest_asyncio
nest_asyncio.apply()
result = asyncio.run(async_function()) # теперь работает в Jupyterimport pytest
import asyncio
# pytest-asyncio — декоратор для async тестов
@pytest.mark.asyncio
async def test_fetch():
result = await fetch_user(user_id=1)
assert result["id"] == 1
# Или с pytest.ini: asyncio_mode = auto
# Мокирование asyncio функций
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_with_mock():
with patch("mymodule.fetch_user", new_callable=AsyncMock) as mock:
mock.return_value = {"id": 1, "name": "Alice"}
result = await process_user(1)
mock.assert_awaited_once_with(user_id=1)
# Тестирование таймаутов
@pytest.mark.asyncio
async def test_timeout():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(asyncio.sleep(10), timeout=0.1)async def download_all(urls) загружающий все URL параллельно с ограничением 10 одновременных запросов.asyncio.Queue с несколькими потребителями.async def retry(coro_factory, max_attempts=3) — повторяет корутину при ошибке.asyncio.sleep(0) полезно в длинных синхронных циклах.asyncio.Semaphore ограничивающий N запросов в секунду.AsyncCircuitBreaker — после N ошибок подряд блокирует запросы на T секунд.asyncio.CancelledError.Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.