Проектирование архитектуры бота: роутинг, middleware, dependency injection, модульная структура
Проблема: Бот, написанный в одном файле
bot.py, работает до тех пор, пока не превысит 500 строк кода. После этого:
- Невозможно найти обработчики — всё в одной куче
- Сложно добавлять новые фичи — меняете одну команду, ломаете другую
- Тесты требуют мокирования всего подряд — нет изоляции компонентов
- Несколько разработчиков не могут работать параллельно — постоянные конфликты в git
Решение: Модульная архитектура с разделением ответственности. Каждый компонент решает одну задачу, зависит от абстракций, а не от реализаций, и может тестироваться изолированно.
Зачем это нужно сейчас: Даже если вы делаете прототип в одиночку, правильная архитектура сэкономит часы через 3 месяца, когда придётся добавлять новую фичу или чинить баги.
Сценарий из практики: Вы запустили бота для приёма заказов. Через месяц:
Цена плохой архитектуры:
| Проблема | Время на решение |
|---|---|
| Найти обработчик команды | 10-30 минут |
| Добавить новую фичу | 2-3 дня (вместо 4 часов) |
| Написать тесты | Невозможно без рефакторинга |
| Онбординг нового разработчика | 1-2 недели |
Что даёт правильная архитектура:
telegram_bot/
├── bot/
│ ├── __init__.py
│ ├── main.py # Точка входа
│ ├── config.py # Настройки через pydantic-settings
│ ├── dispatcher.py # Создание и настройка Dispatcher
│ │
│ ├── middleware/ # Сквозная логика
│ │ ├── __init__.py
│ │ ├── auth.py # Аутентификация пользователей
│ │ ├── logging.py # Логирование запросов
│ │ ├── rate_limit.py # Throttling
│ │ └── db.py # Injection сессий БД
│ │
│ ├── handlers/ # Обработчики (бизнес-логика)
│ │ ├── __init__.py
│ │ ├── start.py # /start, /help
│ │ ├── user_profile.py # Профиль пользователя
│ │ ├── orders.py # Управление заказами
│ │ └── admin.py # Админ-панель
│ │
│ ├── keyboards/ # Клавиатуры
│ │ ├── __init__.py
│ │ ├── inline.py # Inline-клавиатуры
│ │ └── reply.py # Reply-клавиатуры
│ │
│ ├── filters/ # Кастомные фильтры
│ │ ├── __init__.py
│ │ ├── is_admin.py # Фильтр на админа
│ │ └── is_subscribed.py # Фильтр на подписку
│ │
│ ├── services/ # Бизнес-логика
│ │ ├── __init__.py
│ │ ├── api_client.py # Внешние API
│ │ ├── notification.py # Уведомления
│ │ └── payment.py # Платежи
│ │
│ ├── repositories/ # Доступ к данным
│ │ ├── __init__.py
│ │ ├── base.py # Базовый репозиторий
│ │ ├── user.py # UserRepository
│ │ └── order.py # OrderRepository
│ │
│ └── models/ # SQLAlchemy модели
│ ├── __init__.py
│ ├── user.py # Модель пользователя
│ └── order.py # Модель заказа
│
├── migrations/ # Alembic миграции
├── tests/
│ ├── __init__.py
│ ├── conftest.py # Фикстуры pytest
│ ├── test_handlers.py
│ └── test_services.py
├── .env
├── .env.example
├── pyproject.toml
└── docker-compose.yml
💡 Зачем такое разделение:
- handlers — только обработка сообщений, минимум логики
- services — бизнес-логика, координация репозиториев
- repositories — доступ к БД, инкапсуляция SQL
- middleware — сквозная логика (auth, logging)
🔗 Связь с другими темами: Каждый модуль тестируется изолированно (см. "Тестирование").
Проблема: Как распределить входящие сообщения по обработчикам и обеспечить сквозную логику (логирование, аутентификацию, БД)?
Решение: Dispatcher — центральный компонент, который принимает обновления и распределяет их по роутерам.
# bot/dispatcher.py
from aiogram import Dispatcher, Bot
from aiogram.fsm.storage.redis import RedisStorage
from redis.asyncio import Redis
from bot.handlers import start, user_profile, orders, admin
from bot.middleware.auth import AuthMiddleware
from bot.middleware.db import DatabaseMiddleware
from bot.config import settings
def create_dispatcher(bot: Bot, redis: Redis) -> Dispatcher:
"""
Создаёт и настраивает Dispatcher.
Зачем: Централизованная настройка всех компонентов бота.
🔗 Связь с другими темами:
- RedisStorage — FSM (см. "Управление состояниями")
- Middleware — сквозная логика (см. ниже)
- Роутеры — модульность обработчиков
"""
# Storage для FSM (состояний)
# ⚠️ Для продакшена используйте Redis, не MemoryStorage!
storage = RedisStorage(redis=redis, key_prefix='fsm')
dp = Dispatcher(storage=storage)
# Глобальные middleware (вызываются для каждого обновления)
dp.update.middleware(AuthMiddleware())
dp.update.middleware(DatabaseMiddleware(session_factory=settings.db_session_factory))
# Подключаем роутеры (модули с обработчиками)
dp.include_router(start.router)
dp.include_router(user_profile.router)
dp.include_router(orders.router)
dp.include_router(admin.router)
return dp⚠️ Антипаттерн: Не создавайте Dispatcher в
main.py. Вынесите в отдельный модуль — это упростит тестирование (можно создать dispatcher с мокированными зависимостями).
Проблема: Все обработчики в одном файле — невозможно найти нужный.
Решение: Router группирует обработчики по функциональности. Каждый роутер — независимый модуль.
# bot/handlers/orders.py
from aiogram import Router, F
from aiogram.types import Message, CallbackQuery
from aiogram.filters import Command
from bot.keyboards.inline import build_order_keyboard
from bot.services.order import OrderService
# Роутер с именем для логирования
# 💡 Зачем: В логах видно, из какого роутера пришёл обработчик
router = Router(name='orders')
@router.message(Command('orders'))
async def list_orders(msg: Message, order_service: OrderService):
"""
Показать список заказов пользователя.
Зачем: Разделение обработчиков по доменам.
🔗 Связь с другими темами:
- OrderService — бизнес-логика (см. "Сервисный слой")
- build_order_keyboard — UI (см. "Inline-клавиатуры")
"""
user_id = msg.from_user.id
orders = await order_service.get_user_orders(user_id)
if not orders:
await msg.answer('У вас пока нет заказов.')
return
text = 'Ваши заказы:\n\n'
for order in orders:
text += f'📦 #{order.id} — {order.status}\n'
await msg.answer(text, reply_markup=build_order_keyboard(orders))
@router.callback_query(F.data.startswith('order_'))
async def order_detail(cb: CallbackQuery, order_service: OrderService):
"""Показать детали заказа."""
order_id = int(cb.data.split('_')[1])
order = await order_service.get_order(order_id)
if not order:
await cb.answer('Заказ не найден', show_alert=True)
return
text = f"""
📦 Заказ #{order.id}
Статус: {order.status}
Сумма: {order.total} ₽
Дата: {order.created_at.strftime('%d.%m.%Y')}
"""
await cb.message.edit_text(text)💡 Совет: Именуйте роутеры по доменам:
orders,users,payments,admin. Избегайте общих имён вродеcommon,other.
Проблема: У вас 10 роутеров, и вы хотите зарегистрировать их все в dispatcher. Список получается длинным.
Решение: Создайте главный роутер для группы и подключайте его.
# bot/handlers/__init__.py
from aiogram import Router
# Главный роутер для всех handlers
handlers_router = Router(name='handlers')
# Под-роутеры
from . import start, user_profile, orders, admin
handlers_router.include_router(start.router)
handlers_router.include_router(user_profile.router)
handlers_router.include_router(orders.router)
handlers_router.include_router(admin.router)
# В dispatcher.py теперь:
# dp.include_router(handlers_router)⚠️ Антипаттерн: Не создавайте глубокую вложенность роутеров (более 2 уровней). Это усложняет отладку.
Проблема: Нужно логировать все запросы, проверять авторизацию и ограничивать частоту запросов. Писать это в каждом обработчике?
Решение: Middleware — функции, которые вызываются до и после обработчика. Один раз написал — работает везде.
| Тип | Когда вызывается | Пример использования |
|---|---|---|
update | До всех обработчиков, для любого типа обновления | Аутентификация, логирование |
message | Только для сообщений | Обработка текста, команд |
callback_query | Только для callback query | Валидация callback данных |
error | При ошибках в обработчиках | Логирование ошибок, алерты |
💡 Зачем разные типы: Экономия ресурсов. Не нужно проверять авторизацию для callback, если это middleware только для сообщений.
# bot/middleware/logging.py
import logging
from typing import Callable, Dict, Any
from aiogram import BaseMiddleware
from aiogram.types import Update, Message, CallbackQuery
logger = logging.getLogger(__name__)
class LoggingMiddleware(BaseMiddleware):
"""
Логирует все входящие обновления.
Зачем: Наблюдаемость в продакшене.
Когда: Для отладки и мониторинга.
🔗 Связь с другими темами: Структурированное логирование (см. "Мониторинг").
"""
async def __call__(
self,
handler: Callable,
event: Update,
data: Dict[str, Any]
) -> Any:
# Логирование до обработки
user_id = self._get_user_id(event)
update_type = self._get_update_type(event)
logger.info(f'Получено обновление: type={update_type}, user_id={user_id}')
# Вызов следующего middleware или обработчика
result = await handler(event, data)
# Логирование после обработки (опционально)
logger.info(f'Обновление обработано: user_id={user_id}')
return result
def _get_user_id(self, event: Update) -> int:
"""Извлекает user_id из обновления."""
if event.message:
return event.message.from_user.id
elif event.callback_query:
return event.callback_query.from_user.id
elif event.edited_message:
return event.edited_message.from_user.id
return 0
def _get_update_type(self, event: Update) -> str:
"""Определяет тип обновления."""
if event.message:
return 'message'
elif event.callback_query:
return 'callback_query'
elif event.edited_message:
return 'edited_message'
return 'unknown'⚠️ Антипаттерн: Не логируйте чувствительные данные (токены, пароли, персональные данные).
# bot/middleware/auth.py
from typing import Callable, Dict, Any
from aiogram import BaseMiddleware
from aiogram.types import Update
from bot.repositories.user import UserRepository
from bot.services.notification import NotificationService
class AuthMiddleware(BaseMiddleware):
"""
Проверяет, зарегистрирован ли пользователь.
Зачем: Автоматическая регистрация и передача пользователя в обработчики.
🔗 Связь с другими темами:
- UserRepository — доступ к БД (см. "Базы данных")
- Dependency Injection — передача зависимостей
"""
def __init__(self, user_repo: UserRepository):
self.user_repo = user_repo
async def __call__(
self,
handler: Callable,
event: Update,
data: Dict[str, Any]
) -> Any:
user_id = self._get_user_id(event)
if user_id:
# Проверяем наличие пользователя в БД
user = await self.user_repo.get_by_id(user_id)
data['user'] = user # Передаём в контекст
if not user:
# Пользователь не найден — создаём
user = await self.user_repo.create(user_id)
data['user'] = user
return await handler(event, data)
def _get_user_id(self, event: Update) -> int:
if event.message:
return event.message.from_user.id
elif event.callback_query:
return event.callback_query.from_user.id
return 0💡 Совет: Не блокируйте запрос в auth middleware, если пользователь не найден. Создайте его автоматически — это улучшит UX.
Проблема: Пользователь отправляет 100 сообщений в минуту. Бот тратит все ресурсы на обработку спама.
Решение: Rate limiting middleware ограничивает частоту запросов.
# bot/middleware/rate_limit.py
import asyncio
from typing import Callable, Dict, Any
from collections import defaultdict
from aiogram import BaseMiddleware
from aiogram.types import Message, CallbackQuery
from aiogram.exceptions import TelegramForbiddenError
class RateLimitMiddleware(BaseMiddleware):
"""
Ограничивает частоту запросов от пользователя.
Зачем: Защита от злоупотреблений и спама.
⚠️ Антипаттерн: Не используйте MemoryStorage для rate limiting
в продакшене — данные потеряются при перезапуске.
Используйте Redis.
"""
def __init__(self, limit: int = 5, interval: int = 1):
self.limit = limit # Максимум запросов
self.interval = interval # В секундах
self.requests: Dict[int, list] = defaultdict(list)
self.locks: Dict[int, asyncio.Lock] = defaultdict(asyncio.Lock)
async def __call__(
self,
handler: Callable,
event: Message | CallbackQuery,
data: Dict[str, Any]
) -> Any:
user_id = event.from_user.id
async with self.locks[user_id]:
now = asyncio.get_event_loop().time()
# Удаляем старые запросы
self.requests[user_id] = [
t for t in self.requests[user_id]
if now - t < self.interval
]
if len(self.requests[user_id]) >= self.limit:
# Превышен лимит
try:
await event.answer(
'Слишком много запросов. Подождите немного.',
show_alert=True
)
except TelegramForbiddenError:
pass
return None
self.requests[user_id].append(now)
return await handler(event, data)🔗 Связь с другими темами: Для production используйте Redis-based rate limiting (см. "Масштабирование").
# bot/middleware/db.py
from typing import Callable, Dict, Any, AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from aiogram import BaseMiddleware
from aiogram.types import Update
class DatabaseMiddleware(BaseMiddleware):
"""
Создаёт и передаёт сессию БД в обработчики.
Зачем: Автоматическое управление сессиями БД.
🔗 Связь с другими темами:
- Repository Pattern — доступ к данным
- Dependency Injection — передача зависимостей
"""
def __init__(self, session_factory: async_sessionmaker):
self.session_factory = session_factory
async def __call__(
self,
handler: Callable,
event: Update,
data: Dict[str, Any]
) -> Any:
async with self.session_factory() as session:
data['session'] = session # Inject сессии
data['user_repo'] = UserRepository(session) # Inject репозитория
return await handler(event, data)⚠️ Антипаттерн: Не создавайте сессию БД в каждом обработчике. Используйте middleware для централизованного управления.
Проблема: Обработчик зависит от БД, сервисов, клиентов API. Создавать их внутри обработчика? Это усложнит тестирование.
Решение: Dependency Injection — передача зависимостей через параметры. Middleware создаёт зависимости, обработчик получает готовые.
# bot/dispatcher.py
from bot.middleware.db import DatabaseMiddleware
from bot.services.api_client import APIClient
from bot.services.notification import NotificationService
def create_dispatcher(bot: Bot, redis: Redis) -> Dispatcher:
storage = RedisStorage(redis=redis, key_prefix='fsm')
dp = Dispatcher(storage=storage)
# Создаём сервисы (синглтоны)
# 💡 Зачем: Один экземпляр на все запросы — экономия ресурсов
api_client = APIClient(settings.api_key)
notification_service = NotificationService(bot)
# Middleware для injection
dp.update.middleware(DatabaseMiddleware(settings.db_session_factory))
dp.update.middleware(ServiceMiddleware(api_client, notification_service))
return dp# bot/handlers/user_profile.py
from aiogram import Router, F
from aiogram.types import Message
from sqlalchemy.ext.asyncio import AsyncSession
from bot.repositories.user import UserRepository
from bot.services.api_client import APIClient
router = Router()
@router.message(F.text == 'Профиль')
async def show_profile(
msg: Message,
session: AsyncSession, # Из DatabaseMiddleware
user_repo: UserRepository, # Из DatabaseMiddleware
api_client: APIClient # Из ServiceMiddleware
):
"""
Показать профиль пользователя.
Зачем: Явные зависимости упрощают тестирование.
Можно заменить реальные зависимости на моки.
"""
user_id = msg.from_user.id
# Используем injected зависимости
user = await user_repo.get_by_id(user_id)
external_data = await api_client.get_user_data(user_id)
text = f"Профиль {user.username}: {external_data}"
await msg.answer(text)💡 Совет: Для тестирования создайте dispatcher с мокированными зависимостями (см. "Тестирование").
Проблема: SQL-запросы разбросаны по обработчикам. Изменили схему БД — правим десятки файлов.
Решение: Repository инкапсулирует доступ к данным. Обработчики работают с репозиторием, не зная о SQL.
# bot/repositories/base.py
from typing import TypeVar, Generic, Type, Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
ModelType = TypeVar('ModelType')
class BaseRepository(Generic[ModelType]):
"""
Базовый CRUD репозиторий.
Зачем: Переиспользование CRUD-операций.
🔗 Связь с другими темами: SQLAlchemy (см. "Базы данных").
"""
def __init__(self, model: Type[ModelType], session: AsyncSession):
self.model = model
self.session = session
async def get_by_id(self, id: int) -> Optional[ModelType]:
"""Получить объект по ID."""
result = await self.session.execute(
select(self.model).where(self.model.id == id)
)
return result.scalar_one_or_none()
async def get_all(self) -> List[ModelType]:
"""Получить все объекты."""
result = await self.session.execute(select(self.model))
return list(result.scalars().all())
async def create(self, **kwargs) -> ModelType:
"""Создать новый объект."""
obj = self.model(**kwargs)
self.session.add(obj)
await self.session.commit()
await self.session.refresh(obj)
return obj
async def update(self, obj: ModelType, **kwargs) -> ModelType:
"""Обновить объект."""
for key, value in kwargs.items():
setattr(obj, key, value)
await self.session.commit()
await self.session.refresh(obj)
return obj
async def delete(self, obj: ModelType) -> None:
"""Удалить объект."""
await self.session.delete(obj)
await self.session.commit()⚠️ Антипаттерн: Не возвращайте SQLAlchemy-модели напрямую в handlers. Создайте DTO/схемы для ответа.
# bot/repositories/user.py
from typing import Optional, List
from sqlalchemy import select
from bot.repositories.base import BaseRepository
from bot.models.user import User
class UserRepository(BaseRepository[User]):
"""
Репозиторий для пользователей.
Зачем: Специфичные запросы, связанные с пользователями.
"""
def __init__(self, session):
super().__init__(User, session)
async def get_by_telegram_id(self, telegram_id: int) -> Optional[User]:
"""
Найти пользователя по Telegram ID.
Зачем: Telegram ID — основной идентификатор в боте.
"""
result = await self.session.execute(
select(User).where(User.telegram_id == telegram_id)
)
return result.scalar_one_or_none()
async def get_by_username(self, username: str) -> Optional[User]:
"""Найти пользователя по username."""
result = await self.session.execute(
select(User).where(User.username == username)
)
return result.scalar_one_or_none()
async def get_admins(self) -> List[User]:
"""Получить всех администраторов."""
result = await self.session.execute(
select(User).where(User.is_admin == True)
)
return list(result.scalars().all())
async def create_from_telegram(self, telegram_id: int, username: str) -> User:
"""Создать пользователя из данных Telegram."""
return await self.create(
telegram_id=telegram_id,
username=username
)Проблема: Бизнес-логика в обработчиках. Обработчик создаёт заказы, отправляет уведомления, вызывает API. Невозможно переиспользовать логику.
Решение: Сервисы содержат бизнес-логику, координируя репозитории и внешние API. Обработчики только вызывают сервисы.
# bot/services/order.py
from typing import List, Optional
from datetime import datetime
from bot.models.order import Order, OrderStatus
from bot.repositories.order import OrderRepository
from bot.services.notification import NotificationService
from bot.services.api_client import APIClient
class OrderService:
"""
Бизнес-логика для заказов.
Зачем: Разделение ответственности.
Обработчик — только UI, сервис — логика.
🔗 Связь с другими темами:
- Repository — доступ к данным
- Notification — уведомления пользователей
- Testing — тестирование сервисов изолированно
"""
def __init__(
self,
order_repo: OrderRepository,
notification_service: NotificationService,
api_client: APIClient
):
self.order_repo = order_repo
self.notification_service = notification_service
self.api_client = api_client
async def create_order(self, user_id: int, items: List[dict]) -> Order:
"""
Создать новый заказ.
Зачем: Координация нескольких зависимостей.
"""
# Проверяем наличие товаров через внешнее API
available_items = await self.api_client.check_availability(items)
if not available_items:
raise ValueError('Нет товаров в наличии')
# Создаём заказ
total = sum(item['price'] * item['quantity'] for item in items)
order = await self.order_repo.create(
user_id=user_id,
items=items,
total=total,
status=OrderStatus.PENDING
)
# Уведомляем пользователя
await self.notification_service.send_order_created(
user_id=user_id,
order_id=order.id
)
return order
async def get_user_orders(self, user_id: int) -> List[Order]:
"""Получить заказы пользователя."""
return await self.order_repo.get_by_user_id(user_id)
async def cancel_order(self, order_id: int, reason: str) -> bool:
"""Отменить заказ."""
order = await self.order_repo.get_by_id(order_id)
if not order or order.status != OrderStatus.PENDING:
return False
# Обновляем статус
await self.order_repo.update(
order,
status=OrderStatus.CANCELLED,
cancel_reason=reason
)
# Уведомляем
await self.notification_service.send_order_cancelled(
user_id=order.user_id,
order_id=order.id,
reason=reason
)
return True⚠️ Антипаттерн: Не вызывайте Telegram API напрямую из сервисов. Сервис должен возвращать результат, а обработчик — отправлять ответ пользователю.
Проблема: Настройки разбросаны по коду, нет валидации, секреты в коде.
Решение: Централизованная конфигурация с валидацией через pydantic-settings.
# bot/config.py
from pydantic_settings import BaseSettings
from pydantic import Field, SecretStr
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
class Settings(BaseSettings):
"""
Настройки приложения.
Зачем: Валидация конфигурации при старте.
Секреты не попадают в логи.
"""
# Bot
bot_token: SecretStr = Field(..., env='BOT_TOKEN')
# Database
database_url: str = Field(..., env='DATABASE_URL')
db_pool_size: int = Field(default=10, env='DB_POOL_SIZE')
# Redis
redis_url: str = Field(default='redis://localhost:6379/0', env='REDIS_URL')
# API
api_key: SecretStr = Field(..., env='API_KEY')
api_base_url: str = Field(..., env='API_BASE_URL')
# Admins
admin_ids: list[int] = Field(default_factory=list, env='ADMIN_IDS')
class Config:
env_file = '.env'
env_file_encoding = 'utf-8'
@property
def db_session_factory(self) -> async_sessionmaker:
"""Создаёт фабрику сессий БД."""
engine = create_async_engine(
self.database_url,
pool_size=self.db_pool_size,
echo=False
)
return async_sessionmaker(engine, expire_on_commit=False)
settings = Settings()⚠️ Антипаттерн: Не храните
.envв git. Добавьте в.gitignoreи создайте.env.exampleс примерами.
# bot/main.py
import asyncio
import logging
from aiogram import Bot
from redis.asyncio import Redis
from bot.config import settings
from bot.dispatcher import create_dispatcher
from bot.middleware.logging import LoggingMiddleware
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
async def main():
"""
Точка входа приложения.
Зачем: Минимальная логика в main.
Вся настройка вынесена в create_dispatcher.
"""
# Создаём бота и Redis
bot = Bot(token=settings.bot_token.get_secret_value())
redis = Redis.from_url(settings.redis_url)
# Создаём Dispatcher с настройками
dp = create_dispatcher(bot, redis)
# Добавляем глобальный middleware логирования
dp.update.outer_middleware(LoggingMiddleware())
try:
# Запускаем polling
await dp.start_polling(bot)
finally:
# Корректное завершение
# 💡 Зачем: Освобождение ресурсов (соединения БД, Redis)
await bot.session.close()
await redis.close()
if __name__ == '__main__':
asyncio.run(main())| Тема | Как связана |
|---|---|
| Обработчики и фильтры | Handlers — часть архитектуры, фильтры выносятся в bot/filters/ |
| Базы данных | Repository Pattern использует SQLAlchemy |
| FSM | Storage настраивается в dispatcher |
| Тестирование | Модульная архитектура упрощает изолированное тестирование |
| Масштабирование | Архитектура позволяет запускать несколько инстансов бота |
→ Базы данных — SQLAlchemy, миграции, connection pooling
→ Middleware — детальное рассмотрение сквозной логики
→ Тестирование — как тестировать компоненты изолированно
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.