Асинхронное программирование: event loop, корутины, async/await, Task. Архитектура aiohttp и первый сервер.
Асинхронное программирование — фундамент высокопроизтурбленных веб-сервисов. Разберёмся, как это работает под капотом.
Представьте, что ваш сервер обрабатывает 1000 одновременных запросов к базе данных. В синхронном подходе (Flask, Django) каждый запрос занимает отдельный поток операционной системы. Каждый поток — это ~8 МБ памяти на стек. Итого: 1000 × 8 МБ = 8 ГБ только на стеки, плюс накладные расходы на переключение контекста между потоками.
Асинхронный подход работает иначе: все 1000 запросов обрабатываются в одном потоке. Когда запрос жёт ответ от базы данных, event loop не блокируется — он переключается на обработку другого запроса. Результат: та же работа в десятки раз меньше памяти.
Ключевая идея: асинхронность не делает отдельные операции быстрее. Она позволяет не простаивать, пока ждёте I/O.
Event loop — это цикл, который управляет выполнением асинхронных задач. Его можно представить как диспетчера в ресторане: он не готовит блюда сам, но знает, какие повара свободны, и раздаёт заказы.
import asyncio
async def fetch_data(id: int) -> str:
print(f"Запрос {id}: начинаем загрузку")
await asyncio.sleep(1) # Имитация I/O-операции
print(f"Запрос {id}: загрузка завершена")
return f"data-{id}"
async def main():
# Запускаем три задачи параллельно
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3)
)
print(f"Все результаты: {results}")
asyncio.run(main())Вывод:
Запрос 1: начинаем загрузку
Запрос 2: начинаем загрузку
Запрос 3: начинаем загрузку
Запрос 1: загрузка завершена
Запрос 2: загрузка завершена
Запрос 3: загрузка завершена
Все результаты: ['data-1', 'data-2', 'data-3']
Все три операции начались одновременно и заняли ~1 секунду вместо 3 секунд при последовательном выполнении.
Когда Python встречает await asyncio.sleep(1), происходит следующее:
fetch_data приостанавливается и возвращает управление event loopЭто кооперативная многозадачность: корутина сама решает, когда отдать управление (через await). В отличие от потоков ОС, где переключение происходит принудительно.
Корутина — функция, объявленная через async def. Она не выполняется при вызове — вместо этого создаётся объект-корутина, который нужно запланировать.
async def get_user(user_id: int) -> dict:
# Эта функция НЕ выполнится, пока её не await-ить
return {"id": user_id, "name": "Alice"}
# Это НЕ вызовет функцию — только создаст объект корутины
coro = get_user(1) # <coroutine object get_user at 0x...>
# А вот так выполняется:
result = asyncio.run(get_user(1)) # {"id": 1, "name": "Alice"}Правила await:
await работает только внутри async defawait принимает корутины, Task-и и другие awaitable-объектыasync def, но внутри нет await — она всё равно корутина, но без приостановкиasyncio.create_task() планирует корутину для немедленного выполнения в event loop. Это как отправить письмо по электронной почте вместо того, чтобы ждать ответа на месте.
async def main():
# Создаём задачи — они начинают выполняться немедленно
task1 = asyncio.create_task(fetch_data(1))
task2 = asyncio.create_task(fetch_data(2))
# Делаем что-то ещё, пока задачи работают
print("Задачи запущены, делаем другую работу...")
# Ждём результаты
r1 = await task1
r2 = await task2
print(f"Результаты: {r1}, {r2}")Когда использовать Task:
gather() запускает несколько корутин конкурентно и ждёт все результаты:
async def main():
# Все три запроса начнутся одновременно
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3)
)
# Результаты в порядке передачи аргументов
print(results) # ['data-1', 'data-2', 'data-3']Типичная ошибка: передать в
gather()уже выполненные корутины. Каждая корутина может быть await-нута только один раз. Если вы вызвалиfetch_data(1)дважды — второй вызов создаст новую корутину.
aiohttp — асинхронный веб-фреймворк, который использует asyncio для обработки тысяч соединений в одном потоке. Он состоит из двух частей:
requests, но асинхронный)pip install aiohttpfrom aiohttp import web
async def hello(request: web.Request) -> web.Response:
return web.Response(text="Hello, World!")
app = web.Application()
app.router.add_get('/', hello)
if __name__ == '__main__':
web.run_app(app, host='localhost', port=8080)Что здесь происходит:
async def hello(request) — обработчик (handler). Это корутина, потому что aiohttp вызывает её через await. Даже если внутри нет await, функция должна быть async def, потому что aiohttp ожидает корутинуweb.Application() — центральный объект приложения. Хранит роуты, middleware, общее состояниеapp.router.add_get('/', hello) — регистрируем обработчик для GET-запросов на путь /web.run_app() — запускает event loop и HTTP-серверКогда клиент отправляет GET-запрос на http://localhost:8080/:
await handler(request) — передаёт управление вашей функцииweb.Response и отправляет HTTP-ответ клиентуВесь этот процесс не блокирует event loop: пока ваш handler ждёт базу данных, aiohttp обрабатывает другие запросы.
┌─────────────────────────────────────────────┐
│ web.Application │
│ │
│ ┌───────────┐ ┌────────────────┐ │
│ │ Router │ ──────▶ │ Handler │ │
│ │ (URL+Method│ │ (ваш async │ │
│ │ → handler)│ │ код) │ │
│ └───────────┘ └────────────────┘ │
│ ▲ │ │
│ │ ▼ │
│ ┌───────────┐ ┌────────────────┐ │
│ │ Middleware │ ◀────── │ Response │ │
│ │ (цепочка │ │ (текст, JSON, │ │
│ │ до/после) │ │ файл) │ │
│ └───────────┘ └────────────────┘ │
│ │
│ ┌───────────────────────────────────┐ │
│ │ Signals │ │
│ │ on_startup / on_cleanup / ... │ │
│ └───────────────────────────────────┘ │
└─────────────────────────────────────────────┘
Компоненты:
| Компонент | Назначение |
|---|---|
Application | Контейнер: роуты, middleware, состояние |
Router | Сопоставляет URL + HTTP-метод с handler-функцией |
Handler | Ваша async-функция: принимает Request, возвращает Response |
Middleware | Промежуточная функция: оборачивает handler (логирование, auth, CORS) |
Signals | Хуки жизненного цикла: on_startup, on_cleanup, on_shutdown |
В реальном приложении нужно инициализировать ресурсы при старте и очистить при остановке. Для этого aiohttp предоставляет сигналы:
from aiohttp import web
import asyncpg
async def init_db(app: web.Application) -> None:
"""Вызывается ОДИН раз при старте — создаём пул соединений"""
app['db'] = await asyncpg.create_pool(
'postgresql://user:pass@localhost/task_manager',
min_size=5,
max_size=20
)
print("База данных подключена")
async def close_db(app: web.Application) -> None:
"""Вызывается ОДИН раз при остановке — закрываем пул"""
await app['db'].close()
print("База данных отключена")
def create_app() -> web.Application:
app = web.Application()
# Регистрируем хуки жизненного цикла
app.on_startup.append(init_db)
app.on_cleanup.append(close_db)
# Регистрируем роуты
app.router.add_get('/tasks', list_tasks)
return app
if __name__ == '__main__':
web.run_app(create_app())Порядок при старте:
web.run_app() создаёт event loopon_startup хуки (в порядке добавления)Порядок при останове (Ctrl+C):
on_cleanup хуки (в порядке, обратном добавлению)Типичная ошибка: создавать ресурсы (пул БД, Redis) в глобальной области видимости. Это значит, что они создадутся до запуска event loop, и асинхронные операции не будут работать. Всегда создавайте ресурсы в
on_startup.
Объект app поддерживает доступ по ключу как словарь. Это стандартный способ передавать ресурсы между частями приложения:
# При старте — сохраняем в app
app['db'] = await asyncpg.create_pool(DSN)
app['redis'] = await aioredis.create_pool(REDIS_URL)
app['config'] = {'debug': True}
# В обработчике — получаем из app
async def handler(request: web.Request) -> web.Response:
db_pool = request.app['db']
redis = request.app['redis']
# ...Под капотом:
app['key']— это просто dict-подобный доступ. aiohttp не проверяет типы ключей, но рекомендуется использовать строки для единообразия.
await# Неправильно — корутина НЕ выполнится
async def handler(request):
data = fetch_data(1) # <coroutine object> — не вызов!
return web.json_response(data)
# Правильно — await выполняет корутину
async def handler(request):
data = await fetch_data(1) # {'id': 1, ...}
return web.json_response(data)# Неправильно — time.sleep() блокирует ВСЕ задачи
async def handler(request):
import time
time.sleep(5) # Блокировка event loop на 5 секунд!
return web.Response(text="Done")
# Правильно — asyncio.sleep() отдаёт управление
async def handler(request):
await asyncio.sleep(5) # Event loop свободен для других задач
return web.Response(text="Done")Любой синхронный блокирующий вызов (time.sleep, requests.get, open().read()) остановит весь event loop. Для таких операций используйте asyncio.to_thread() или асинхронные аналоги библиотек.
# Неправильно — сессия создана до event loop
session = aiohttp.ClientSession()
# Правильно — сессия создаётся в on_startup
async def init_session(app):
app['session'] = aiohttp.ClientSession()
async def close_session(app):
await app['session'].close()Эта тема — фундамент всего курса:
Перед переходом к вопросам убедитесь, что можете ответить:
async def, даже если внутри нет await?on_startup и on_cleanup?on_startup в handler?Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.