asyncio, конкурентность, типичные ошибки и антипаттерны
Asyncio и асинхронное программирование — ключ к высокой производительности. В этой теме вы изучите правильные паттерны, типичные ошибки и способы их избежать.
import time
def fetch_data():
time.sleep(2) # Блокирует поток на 2 секунды
return {'data': 'result'}
def main():
result1 = fetch_data() # 2 секунды
result2 = fetch_data() # 2 секунды
result3 = fetch_data() # 2 секунды
# Итого: 6 секунд
main()import asyncio
async def fetch_data():
await asyncio.sleep(2) # Не блокирует, отдаёт управление
return {'data': 'result'}
async def main():
# Параллельное выполнение
results = await asyncio.gather(
fetch_data(),
fetch_data(),
fetch_data()
)
# Итого: ~2 секунды
asyncio.run(main())@app.get('/sync')
def sync_endpoint():
time.sleep(2) # Блокирует поток!
return {'status': 'done'}FastAPI запускает sync функции в thread pool, но это не идеально для высокой нагрузки.
@app.get('/async')
async def async_endpoint():
await asyncio.sleep(2) # Не блокирует
return {'status': 'done'}Event Loop — ядро asyncio, планировщик корутин.
import asyncio
async def task1():
print("Task 1 start")
await asyncio.sleep(1)
print("Task 1 end")
async def task2():
print("Task 2 start")
await asyncio.sleep(1)
print("Task 2 end")
async def main():
# Последовательно
await task1() # 1 секунда
await task2() # 1 секунда
# Итого: 2 секунды
asyncio.run(main())async def main():
# Параллельно
await asyncio.gather(task1(), task2())
# Итого: 1 секунда
asyncio.run(main())Параллельный запуск нескольких корутин.
import asyncio
async def fetch_user(user_id: int):
await asyncio.sleep(0.5)
return {'id': user_id, 'name': f'User {user_id}'}
async def main():
# Последовательно (3 секунды)
users = []
for i in range(3):
user = await fetch_user(i)
users.append(user)
# Параллельно (0.5 секунды)
users = await asyncio.gather(*[fetch_user(i) for i in range(3)])
# С обработкой ошибок
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
return_exceptions=True # Не прерывать при ошибке
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"User {i} failed: {result}")
else:
print(f"User {i}: {result}")
asyncio.run(main())Таймаут для корутин.
async def slow_operation():
await asyncio.sleep(10)
return 'done'
async def main():
try:
# Таймаут 3 секунды
result = await asyncio.wait_for(
slow_operation(),
timeout=3.0
)
except asyncio.TimeoutError:
print("Operation timed out!")
asyncio.run(main())Фоновое выполнение задачи.
async def background_task():
await asyncio.sleep(5)
print("Background task completed")
@app.post('/start')
async def start_background():
# Запуск без ожидания
task = asyncio.create_task(background_task())
return {'status': 'task started', 'task_id': id(task)}Синхронизация доступа к ресурсу.
import asyncio
lock = asyncio.Lock()
shared_resource = 0
async def increment():
global shared_resource
async with lock: # Только одна корутина одновременно
current = shared_resource
await asyncio.sleep(0.1)
shared_resource = current + 1
async def main():
await asyncio.gather(*[increment() for _ in range(10)])
print(f"Result: {shared_resource}") # 10 (без гонки)
asyncio.run(main())Ограничение конкурентности.
import asyncio
# Максимум 5 одновременных подключений
semaphore = asyncio.Semaphore(5)
async def fetch_with_limit(url: str):
async with semaphore:
await asyncio.sleep(1) # Имитация запроса
return f"Data from {url}"
async def main():
# 10 запросов, но только 5 одновременно
urls = [f"http://example.com/{i}" for i in range(10)]
results = await asyncio.gather(*[fetch_with_limit(url) for url in urls])
return results
asyncio.run(main())Очередь для producer-consumer.
import asyncio
queue = asyncio.Queue()
async def producer():
for i in range(5):
await queue.put(i)
print(f"Produced {i}")
await asyncio.sleep(0.5)
# Сигнал окончания
await queue.put(None)
async def consumer():
while True:
item = await queue.get()
if item is None:
break
print(f"Consumed {item}")
await asyncio.sleep(1)
queue.task_done()
async def main():
await asyncio.gather(producer(), consumer())
asyncio.run(main())@app.get('/bad')
async def bad_endpoint():
time.sleep(2) # Блокирует весь event loop!
return {'status': 'done'}Проблема: Пока один запрос спит, все остальные ждут.
Решение:
@app.get('/good')
async def good_endpoint():
await asyncio.sleep(2)
return {'status': 'done'}async def get_db():
await asyncio.sleep(1)
return db
@app.get('/items')
def get_items(db = Depends(get_db)): # Ошибка!
return dbПроблема: Sync функция не может await async зависимость.
Решение:
@app.get('/items')
async def get_items(db = Depends(get_db)):
return db@app.get('/items')
async def get_items():
result = asyncio.sleep(1) # Забыли await!
return {'result': result} # Вернёт coroutine objectРешение:
@app.get('/items')
async def get_items():
result = await asyncio.sleep(1)
return {'result': 'done'}counter = 0
@app.get('/increment')
async def increment():
global counter
current = counter
await asyncio.sleep(0.1)
counter = current + 1 # Гонка!
return counterРешение:
lock = asyncio.Lock()
counter = 0
@app.get('/increment')
async def increment():
global counter
async with lock:
current = counter
await asyncio.sleep(0.1)
counter = current + 1
return counterasync def main():
# Если одна задача упадёт, все остальные будут отменены
results = await asyncio.gather(
task1(),
task2(), # Может упасть
task3()
)Решение:
async def main():
results = await asyncio.gather(
task1(),
task2(),
task3(),
return_exceptions=True # Ошибки как результаты
)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {i} failed: {result}")@app.get('/users')
def get_users():
users = db.query(User).all() # Блокирует поток
return usersfrom sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
@app.get('/users')
async def get_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
users = result.scalars().all()
return usersimport requests
@app.get('/external')
def get_external():
response = requests.get('https://api.example.com/data') # Блокирует
return response.json()import httpx
@app.get('/external')
async def get_external():
async with httpx.AsyncClient() as client:
response = await client.get('https://api.example.com/data')
return response.json()import httpx
async def fetch_data(client, url: str):
response = await client.get(url)
return response.json()
@app.get('/multiple')
async def get_multiple():
async with httpx.AsyncClient() as client:
results = await asyncio.gather(
fetch_data(client, 'https://api1.com/data'),
fetch_data(client, 'https://api2.com/data'),
fetch_data(client, 'https://api3.com/data')
)
return {'results': results}from fastapi import FastAPI, HTTPException
import asyncio
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
app = FastAPI()
async def fetch_external_data(session: httpx.AsyncClient, url: str):
"""Асинхронный запрос к внешнему API"""
try:
response = await session.get(url, timeout=5.0)
response.raise_for_status()
return response.json()
except Exception as e:
return {'error': str(e)}
async def fetch_database_data(db: AsyncSession, model):
"""Асинхронный запрос к БД"""
result = await db.execute(select(model))
return result.scalars().all()
@app.get('/aggregated/{user_id}')
async def get_aggregated_data(
user_id: int,
db: AsyncSession = Depends(get_db)
):
"""
Агрегирует данные из БД и внешних API.
"""
async with httpx.AsyncClient() as client:
# Параллельное выполнение всех запросов
user_data, posts_data, external_api1, external_api2 = await asyncio.gather(
fetch_database_data(db, User),
fetch_database_data(db, Post),
fetch_external_data(client, f'https://api1.com/users/{user_id}'),
fetch_external_data(client, f'https://api2.com/users/{user_id}'),
return_exceptions=True
)
# Обработка ошибок
results = {
'users': user_data if not isinstance(user_data, Exception) else [],
'posts': posts_data if not isinstance(posts_data, Exception) else [],
'external_api1': external_api1 if not isinstance(external_api1, Exception) else None,
'external_api2': external_api2 if not isinstance(external_api2, Exception) else None,
}
return results
@app.get('/batch-process')
async def batch_process(items: list[int]):
"""
Обрабатывает элементы с ограничением конкурентности.
"""
semaphore = asyncio.Semaphore(10) # Максимум 10 одновременно
async def process_item(item_id: int):
async with semaphore:
await asyncio.sleep(0.5) # Имитация обработки
return {'item_id': item_id, 'status': 'processed'}
results = await asyncio.gather(*[process_item(i) for i in items])
return {'results': results, 'total': len(results)}В следующей теме вы изучите оптимизацию производительности — профилирование, benchmark, поиск bottleneck.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.