Асинхронные запросы к внешним API: сессии, пул соединений, таймауты, retry с exponential backoff.
Научитесь выполнять асинхронные HTTP-запросы к внешним API, управлять сессиями и обрабатывать ошибки
ClientSession — основной класс aiohttp для исходящих HTTP-запросов.
import aiohttp
import asyncio
async def fetch():
    """GET the example endpoint and return its decoded JSON body."""
    async with aiohttp.ClientSession() as session, session.get('https://api.example.com/data') as response:
        # The status line is available before the body has been read.
        print(f"Status: {response.status}")
        return await response.json()
asyncio.run(fetch())

Всегда используйте async with для ClientSession:
ClientSession поддерживает все стандартные методы:
# A single ClientSession exposes every standard HTTP verb as a coroutine method.
async with aiohttp.ClientSession() as session:
# GET
async with session.get(url) as resp:
data = await resp.json()
# POST — json= serializes the body and sets the JSON content type
async with session.post(url, json={'key': 'value'}) as resp:
result = await resp.json()
# PUT — data= sends a form-encoded body
async with session.put(url, data={'field': 'new'}) as resp:
...
# PATCH
async with session.patch(url, json={'status': 'active'}) as resp:
...
# DELETE
async with session.delete(url) as resp:
...

async with session.post(
'https://api.example.com/users',
json={'name': 'John', 'email': 'john@example.com'}
) as resp:
# aiohttp automatically sets the Content-Type: application/json header
result = await resp.json()

# URL-encoded form data
async with session.post(
'https://api.example.com/login',
data={'username': 'john', 'password': 'secret'}
) as resp:
# data= with a dict sends Content-Type: application/x-www-form-urlencoded
token = await resp.json()

from aiohttp import FormData
# FormData builds a multipart/form-data request body (fields + file parts).
form = FormData()
form.add_field('name', 'John')
form.add_field(
'file',
open('document.pdf', 'rb'),  # NOTE(review): this handle is never explicitly closed — prefer `with open(...)`
filename='document.pdf',
content_type='application/pdf'
)
async with session.post(
'https://api.example.com/upload',
data=form
) as resp:
result = await resp.json()

async with session.get(url) as resp:
# Response metadata is available as soon as the headers have arrived:
print(resp.status) # 200
print(resp.reason) # 'OK'
print(resp.headers) # Headers
print(resp.headers.get('Content-Type'))
print(resp.content_type)   # 'application/json'

async with session.get(url) as resp:
# Four ways to consume the response body:
# JSON
data = await resp.json()
# Text
text = await resp.text()
# Raw bytes
content = await resp.read()
# Stream in chunks (for large payloads)
async for chunk in resp.content.iter_chunked(1024):
process(chunk)

# Query параметры
# params= is URL-encoded and appended to the query string automatically.
params = {'page': 1, 'limit': 50, 'sort': 'created_at'}
async with session.get(url, params=params) as resp:
...
# URL: https://api.example.com/items?page=1&limit=50&sort=created_at
# Custom request headers
headers = {
'Authorization': 'Bearer token123',
'User-Agent': 'MyApp/1.0',
'Accept': 'application/json'
}
async with session.get(url, headers=headers) as resp:
...

import aiohttp
# Session-wide (global) timeout, applied to every request made on the session
timeout = aiohttp.ClientTimeout(total=30) # 30 seconds
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as resp:
...

timeout = aiohttp.ClientTimeout(
total=30, # Overall time budget for the whole request
connect=10, # Time to acquire/establish a connection
sock_read=15, # Timeout for a single socket read
sock_connect=5 # Timeout for the socket connect itself
)
async with aiohttp.ClientSession(timeout=timeout) as session:
...

from aiohttp import ClientResponseError
async def fetch(url):
"""GET *url* and return decoded JSON; logs and re-raises HTTP error statuses."""
async with aiohttp.ClientSession() as session:
try:
async with session.get(url) as resp:
resp.raise_for_status() # Raises ClientResponseError for 4xx/5xx statuses
return await resp.json()
except ClientResponseError as e:
print(f"HTTP error: {e.status} - {e.message}")
raise

import asyncio
from aiohttp import ClientError, ClientConnectorError
async def fetch_with_retry(url, max_retries=3):
"""GET *url*, retrying with exponential backoff on connection errors, timeouts and other client errors."""
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
except ClientConnectorError as e:
# Failed to establish a connection
print(f"Connection error: {e}")
except asyncio.TimeoutError:
# Request exceeded the configured timeout
print(f"Timeout on attempt {attempt + 1}")
except ClientError as e:
# Any other aiohttp client-side error
print(f"Client error: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff: 1s, 2s, 4s, ...
raise Exception(f"Failed after {max_retries} attempts")

ClientSession автоматически управляет пулом соединений:
# Connection-pool configuration
connector = aiohttp.TCPConnector(
limit=100, # Max simultaneous connections overall
limit_per_host=5, # Max connections to any single host
ttl_dns_cache=300, # DNS cache TTL, seconds
use_dns_cache=True
)
async with aiohttp.ClientSession(connector=connector) as session:
...

# Сохранение соединений
connector = aiohttp.TCPConnector(
keepalive_timeout=30, # Keep idle connections alive for 30 seconds
enable_cleanup_closed=True # Clean up closed transports — see aiohttp TCPConnector docs
)

import asyncio
from aiohttp import ClientError
from http import HTTPStatus
async def fetch_with_retry(
    session,
    url,
    max_retries=3,
    backoff_factor=1.0,
    retry_statuses=(500, 502, 503, 504)
):
    """GET *url* with automatic retries and exponential backoff.

    Returns the live (unreleased) response on success; the caller reads
    and releases it.  Retries on transient ClientError exceptions and on
    the HTTP statuses in *retry_statuses*; any other error status raises
    immediately via raise_for_status().
    """
    for attempt in range(max_retries):
        try:
            # FIX: no `async with` here — exiting the context manager would
            # release the connection, so the caller could no longer do
            # `await resp.json()` on the returned response.
            resp = await session.get(url)
        except ClientError:
            # Transport-level failure: back off and retry; re-raise on the last try.
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(backoff_factor * (2 ** attempt))
            continue
        if resp.status < 400:
            # Success — hand the still-readable response to the caller.
            return resp
        if resp.status in retry_statuses and attempt < max_retries - 1:
            # Transient server error: release this response, back off, retry.
            resp.release()
            await asyncio.sleep(backoff_factor * (2 ** attempt))
            continue
        # FIX: non-retryable (or final-attempt) error statuses raise here
        # instead of being swallowed by the ClientError handler and retried.
        try:
            resp.raise_for_status()
        finally:
            resp.release()
    raise Exception(f"Failed after {max_retries} attempts")
# Usage
# NOTE(review): `await` here means this snippet must itself run inside a coroutine.
async with aiohttp.ClientSession() as session:
resp = await fetch_with_retry(session, 'https://api.example.com/data')
data = await resp.json()async def fetch_all(urls):
"""Параллельная загрузка нескольких URL"""
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for url, resp in zip(urls, responses):
if isinstance(resp, Exception):
print(f"Error fetching {url}: {resp}")
results.append(None)
else:
async with resp:
results.append(await resp.json())
return results
# Usage
# NOTE(review): fetch_all must be awaited from inside a coroutine.
urls = [
'https://api.example.com/users/1',
'https://api.example.com/users/2',
'https://api.example.com/users/3'
]
users = await fetch_all(urls)

Пример использования в Task Manager для отправки уведомлений:
# app/services/notifications.py
import aiohttp
from aiohttp import ClientError
import logging
logger = logging.getLogger(__name__)
class NotificationService:
    """Sends task notifications to an external webhook over a shared HTTP session."""

    def __init__(self, session: aiohttp.ClientSession, webhook_url: str):
        # The session is owned and closed by the application, not by this service.
        self.session = session
        self.webhook_url = webhook_url

    async def send_task_notification(self, user_id: int, task_title: str) -> bool:
        """Send a task-update notification through the webhook.

        Returns True when the webhook accepted the request (any 2xx status),
        False on an HTTP error status or a transport failure.
        """
        payload = {
            'user_id': user_id,
            'message': f'Task updated: {task_title}',
            'type': 'task_notification'
        }
        try:
            async with self.session.post(
                self.webhook_url,
                json=payload,
                # Per-request timeout overrides the session-wide default.
                timeout=aiohttp.ClientTimeout(total=10)
            ) as resp:
                if resp.status >= 400:
                    logger.error(f"Webhook error: {resp.status}")
                # FIX: accept any 2xx (200/201/204, ...) as success — webhook
                # endpoints commonly answer 201 or 204, not only 200.
                return 200 <= resp.status < 300
        except ClientError as e:
            logger.exception(f"Failed to send notification: {e}")
            return False
# app/main.py
# FIX: the snippet used aiohttp.TCPConnector and web.Application without
# importing `aiohttp` or `web` — both imports are added here.
import aiohttp
from aiohttp import ClientSession, ClientTimeout, web
from .services.notifications import NotificationService

async def init_services(app):
    """Create shared outbound HTTP resources and domain services on startup."""
    timeout = ClientTimeout(total=30)
    # Cap the pool at 50 simultaneous outbound connections.
    connector = aiohttp.TCPConnector(limit=50)
    app['http_session'] = ClientSession(
        timeout=timeout,
        connector=connector
    )
    app['notification_service'] = NotificationService(
        app['http_session'],
        app['config']['WEBHOOK_URL']
    )

async def close_services(app):
    """Release outbound HTTP connections on shutdown."""
    await app['http_session'].close()

def create_app():
    app = web.Application()
    # Startup/cleanup hooks run in registration order.
    app.on_startup.append(init_services)
    app.on_cleanup.append(close_services)
    return app

Убедитесь, что вы понимаете:
— зачем нужен async with при работе с ClientSession?

Переходите к вопросам для закрепления.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.