Абстракция доступа к данным. Unit of Work для транзакций. SQLAlchemy и asyncpg примеры.
Repository абстрагирует доступ к данным. Unit of Work координирует транзакции.
Repository — архитектурный паттерн, абстрагирующий доступ к данным: коллекция объектов домена с методами add(), remove(), find().
# ❌ ПЛОХО: SQL размазан по бизнес-логике
class UserService:
def get_user(self, user_id: int):
conn = get_connection()
cursor = conn.execute(
"SELECT * FROM users WHERE id = ?",
(user_id,)
)
row = cursor.fetchone()
if row:
return User(row[0], row[1], row[2])
return None
def save_user(self, user: User):
conn = get_connection()
conn.execute(
"INSERT OR REPLACE INTO users (id, name, email) VALUES (?, ?, ?)",
(user.id, user.name, user.email)
)
conn.commit()
# Проблема:
# 1. Бизнес-логика смешана с SQL
# 2. Трудно тестировать (нужна БД)
# 3. Дублирование SQL# ✅ ХОРОШО: Абстракция над данными
from abc import ABC, abstractmethod
from typing import Optional, List
from uuid import UUID
class Entity(ABC):
@property
@abstractmethod
def id(self) -> UUID:
pass
class User(Entity):
def __init__(self, id: UUID, name: str, email: str):
self._id = id
self.name = name
self.email = email
@property
def id(self) -> UUID:
return self._id
class Repository(ABC, Generic[T]):
@abstractmethod
def add(self, entity: T) -> None:
pass
@abstractmethod
def get(self, id: UUID) -> Optional[T]:
pass
@abstractmethod
def remove(self, id: UUID) -> None:
pass
@abstractmethod
def list(self) -> List[T]:
pass
class UserRepository(Repository[User]):
def __init__(self, connection):
self._conn = connection
def add(self, user: User) -> None:
self._conn.execute(
"INSERT OR REPLACE INTO users (id, name, email) VALUES (?, ?, ?)",
(str(user.id), user.name, user.email)
)
self._conn.commit()
def get(self, id: UUID) -> Optional[User]:
cursor = self._conn.execute(
"SELECT * FROM users WHERE id = ?",
(str(id),)
)
row = cursor.fetchone()
if row:
return User(UUID(row[0]), row[1], row[2])
return None
def remove(self, id: UUID) -> None:
self._conn.execute("DELETE FROM users WHERE id = ?", (str(id),))
self._conn.commit()
def list(self) -> List[User]:
cursor = self._conn.execute("SELECT * FROM users")
return [User(UUID(row[0]), row[1], row[2]) for row in cursor.fetchall()]
# Использование
repo = UserRepository(conn)
user = repo.get(uuid)
repo.add(user)Unit of Work — паттерн поддержания списка объектов, изменённых в ходе бизнес-транзакции, и координация записи изменений в БД.
# ✅ Unit of Work для транзакций
from contextlib import contextmanager
from typing import Dict, Set
class UnitOfWork:
def __init__(self, connection):
self._conn = connection
self._new: Dict[type, Set[Entity]] = {}
self._dirty: Dict[type, Set[Entity]] = {}
self._deleted: Dict[type, Set[UUID]] = {}
def register_new(self, entity: Entity):
"""Регистрирует новый объект для вставки"""
entity_type = type(entity)
if entity_type not in self._new:
self._new[entity_type] = set()
self._new[entity_type].add(entity)
def register_dirty(self, entity: Entity):
"""Регистрирует изменённый объект для обновления"""
entity_type = type(entity)
if entity_type not in self._dirty:
self._dirty[entity_type] = set()
self._dirty[entity_type].add(entity)
def register_deleted(self, entity_id: UUID, entity_type: type):
"""Регистрирует удалённый объект"""
if entity_type not in self._deleted:
self._deleted[entity_type] = set()
self._deleted[entity_type].add(entity_id)
def commit(self):
"""Сохраняет все изменения в транзакции"""
try:
# Вставка новых
for entity_type, entities in self._new.items():
repo = self._get_repository(entity_type)
for entity in entities:
repo.add(entity)
# Обновление изменённых
for entity_type, entities in self._dirty.items():
repo = self._get_repository(entity_type)
for entity in entities:
repo.update(entity)
# Удаление
for entity_type, ids in self._deleted.items():
repo = self._get_repository(entity_type)
for entity_id in ids:
repo.remove(entity_id)
self._conn.commit()
except Exception:
self._conn.rollback()
raise
finally:
self._clear()
def _get_repository(self, entity_type: type):
"""Возвращает репозиторий для типа"""
# Фабрика репозиториев
pass
def _clear(self):
"""Очищает трекинг"""
self._new.clear()
self._dirty.clear()
self._deleted.clear()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.commit()
else:
self._conn.rollback()
self._clear()
# Использование
with UnitOfWork(conn) as uow:
user = User(uuid, "Alice", "alice@example.com")
uow.register_new(user)
# commit() вызывается автоматически
# rollback() при исключении# ✅ Repository с SQLAlchemy
from sqlalchemy import select
from sqlalchemy.orm import Session
class SQLAlchemyRepository(Repository[T]):
def __init__(self, session: Session, model):
self._session = session
self._model = model
def add(self, entity: T) -> None:
self._session.add(entity)
self._session.flush()
def get(self, id: UUID) -> Optional[T]:
return self._session.get(self._model, id)
def remove(self, id: UUID) -> None:
entity = self.get(id)
if entity:
self._session.delete(entity)
def list(self) -> List[T]:
return self._session.execute(select(self._model)).scalars().all()
def find_by(self, **filters) -> List[T]:
query = select(self._model)
for key, value in filters.items():
query = query.where(getattr(self._model, key) == value)
return self._session.execute(query).scalars().all()
# Использование
class UserSQLAlchemyRepository(SQLAlchemyRepository[User]):
def __init__(self, session: Session):
super().__init__(session, UserORM)
def find_by_email(self, email: str) -> Optional[User]:
return self._session.execute(
select(UserORM).where(UserORM.email == email)
).scalars().first()
# В контексте
with Session() as session:
repo = UserSQLAlchemyRepository(session)
user = repo.find_by_email("alice@example.com")# ✅ Unit of Work с SQLAlchemy
from sqlalchemy.orm import sessionmaker
class SQLAlchemyUnitOfWork:
def __init__(self, session_factory: sessionmaker):
self._session_factory = session_factory
self._session: Session = None
def __enter__(self) -> "SQLAlchemyUnitOfWork":
self._session = self._session_factory()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self._session.commit()
else:
self._session.rollback()
self._session.close()
@property
def users(self) -> UserRepository:
return UserRepository(self._session)
@property
def orders(self) -> OrderRepository:
return OrderRepository(self._session)
# Использование
uow = SQLAlchemyUnitOfWork(session_factory)
with uow:
user = User(uuid, "Alice", "alice@example.com")
uow.users.add(user)
order = Order(uuid4(), user.id, 100.0)
uow.orders.add(order)
# commit() вызывается автоматически при выходе из with# ✅ In-Memory Repository для тестов
class InMemoryRepository(Repository[T]):
def __init__(self):
self._store: Dict[UUID, T] = {}
def add(self, entity: T) -> None:
self._store[entity.id] = entity
def get(self, id: UUID) -> Optional[T]:
return self._store.get(id)
def remove(self, id: UUID) -> None:
self._store.pop(id, None)
def list(self) -> List[T]:
return list(self._store.values())
# In-Memory Unit of Work
class InMemoryUnitOfWork:
def __init__(self):
self.users = InMemoryRepository()
self.orders = InMemoryRepository()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
# Rollback — очистка
self.users._store.clear()
self.orders._store.clear()
# Тесты
def test_create_user():
with InMemoryUnitOfWork() as uow:
user = User(uuid4(), "Test", "test@example.com")
uow.users.add(user)
found = uow.users.get(user.id)
assert found.name == "Test"# ✅ Разделение чтения и записи
class WriteRepository(Repository[T]):
"""Для записи — с бизнес-логикой"""
def add(self, entity: T) -> None:
# Валидация, бизнес-правила
self._validate(entity)
super().add(entity)
def _validate(self, entity: T):
pass
class ReadRepository:
"""Для чтения — оптимизированные запросы"""
def __init__(self, connection):
self._conn = connection
def get_user_dashboard(self, user_id: UUID) -> dict:
cursor = self._conn.execute(
"""
SELECT u.name, COUNT(o.id) as order_count, SUM(o.total) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.id = ?
GROUP BY u.id
""",
(str(user_id),)
)
row = cursor.fetchone()
return {"name": row[0], "orders": row[1], "total": row[2]} if row else None
# Использование
write_repo = WriteRepository(conn)
read_repo = ReadRepository(conn)
# Запись через write-репозиторий
write_repo.add(user)
# Чтение через read-репозиторий (быстрее, без ORM)
dashboard = read_repo.get_user_dashboard(user_id)| Паттерн | Когда использовать | Pythonic-реализация |
|---|---|---|
| Repository | Абстракция над БД/хранилищем | ABC + CRUD методы |
| Unit of Work | Транзакции, отслеживание изменений | Контекстный менеджер |
| In-Memory Repo | Тестирование без БД | dict в памяти |
Главный принцип: Repository скрывает SQL, Unit of Work координирует транзакции.
Изучите тему CQRS и Event Sourcing для разделения чтения/записи и событий как источника истины.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.