Централизованное взаимодействие объектов. Event Bus для слабой связанности. asyncio.Queue для асинхронных событий.
Mediator превращает спагетти-связи в звезду. Event Bus превращает прямые вызовы в события.
Mediator — поведенческий паттерн, централизующий взаимодействие между объектами, уменьшая связанность.
# ❌ ПЛОХО: Каждый компонент знает о других
class Button:
def __init__(self, textbox, checkbox):
self.textbox = textbox
self.checkbox = checkbox
def click(self):
if self.checkbox.is_checked():
self.textbox.enable()
else:
self.textbox.disable()
class TextBox:
def __init__(self, button, checkbox):
self.button = button
self.checkbox = checkbox
def text_changed(self):
self.button.enable() if self.text else self.button.disable()
class Checkbox:
def __init__(self, button, textbox):
self.button = button
self.textbox = textbox
def toggle(self):
self.button.click()
# Проблема: каждый компонент знает о всех остальных
# Невозможно переиспользовать компоненты# ✅ ХОРОШО: Компоненты знают только о медиаторе
from abc import ABC, abstractmethod
class Mediator(ABC):
@abstractmethod
def notify(self, sender, event: str):
pass
class Component:
def __init__(self, mediator: Mediator = None):
self._mediator = mediator
def set_mediator(self, mediator: Mediator):
self._mediator = mediator
def notify(self, event: str):
self._mediator.notify(self, event)
class Button(Component):
def click(self):
print("Button clicked")
self.notify("click")
class TextBox(Component):
def __init__(self, mediator: Mediator = None):
super().__init__(mediator)
self.text = ""
self.enabled = False
def set_text(self, text: str):
self.text = text
self.notify("text_changed")
def enable(self):
self.enabled = True
print("TextBox enabled")
def disable(self):
self.enabled = False
print("TextBox disabled")
class Checkbox(Component):
def __init__(self, mediator: Mediator = None):
super().__init__(mediator)
self.checked = False
def toggle(self):
self.checked = not self.checked
print(f"Checkbox: {self.checked}")
self.notify("toggle")
def is_checked(self) -> bool:
return self.checked
class DialogMediator(Mediator):
def __init__(self):
self._button = None
self._textbox = None
self._checkbox = None
def set_components(self, button, textbox, checkbox):
self._button = button
self._textbox = textbox
self._checkbox = checkbox
def notify(self, sender, event: str):
if event == "click":
if self._checkbox.is_checked():
self._textbox.enable()
else:
self._textbox.disable()
elif event == "text_changed":
if self._textbox.text:
self._button.enable()
else:
self._button.disable()
elif event == "toggle":
self._button.click()
# Использование
mediator = DialogMediator()
button = Button(mediator)
textbox = TextBox(mediator)
checkbox = Checkbox(mediator)
mediator.set_components(button, textbox, checkbox)
# Компоненты не знают друг о друге!
checkbox.toggle() # Checkbox: True, Button clicked
textbox.set_text("Hello") # TextBox enabled# ✅ Event Bus для слабой связанности
from typing import Callable, Dict, List, Any
from collections import defaultdict
class EventBus:
def __init__(self):
self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
def subscribe(self, event: str, handler: Callable):
self._subscribers[event].append(handler)
return self # Для цепочки
def unsubscribe(self, event: str, handler: Callable):
self._subscribers[event].remove(handler)
def publish(self, event: str, data: Any = None):
for handler in self._subscribers.get(event, []):
handler(data)
# Использование
bus = EventBus()
# Подписка
bus.subscribe("user.created", lambda u: print(f"Welcome {u}"))
bus.subscribe("user.created", lambda u: print(f"Email to {u}"))
bus.subscribe("user.deleted", lambda u: print(f"Deleted {u}"))
# Публикация
bus.publish("user.created", "Alice")
# Welcome Alice
# Email to Alice
bus.publish("user.deleted", "Bob")
# Deleted Bob# ✅ Типизированный Event Bus
from typing import TypeVar, Generic, Callable
from dataclasses import dataclass
T = TypeVar('T')
class Event(Generic[T]):
def __init__(self, data: T):
self.data = data
class TypedEventBus:
def __init__(self):
self._subscribers: Dict[type, List[Callable]] = defaultdict(list)
def subscribe(self, event_type: type, handler: Callable[[Event], None]):
self._subscribers[event_type].append(handler)
def publish(self, event: Event):
for handler in self._subscribers.get(type(event), []):
handler(event)
# Использование
@dataclass
class UserCreated:
user_id: int
name: str
@dataclass
class OrderPlaced:
order_id: int
amount: float
bus = TypedEventBus()
bus.subscribe(UserCreated, lambda e: print(f"User {e.data.name} created"))
bus.subscribe(OrderPlaced, lambda e: print(f"Order ${e.data.amount}"))
bus.publish(Event(UserCreated(1, "Alice")))
bus.publish(Event(OrderPlaced(100, 99.99)))# ✅ Event Bus для UI
class UIEventBus:
def __init__(self):
self._handlers: Dict[str, List[Callable]] = defaultdict(list)
def on(self, event: str, handler: Callable):
self._handlers[event].append(handler)
def off(self, event: str, handler: Callable):
self._handlers[event].remove(handler)
def emit(self, event: str, **data):
for handler in self._handlers.get(event, []):
handler(**data)
# Использование
ui = UIEventBus()
ui.on("click", lambda x, y: print(f"Click at {x},{y}"))
ui.on("keypress", lambda key: print(f"Key: {key}"))
ui.emit("click", x=100, y=200) # Click at 100,200
ui.emit("keypress", key="Enter") # Key: Enter# ✅ Domain Events в DDD
from datetime import datetime
from typing import List
class DomainEvent:
def __init__(self, occurred_at: datetime = None):
self.occurred_at = occurred_at or datetime.now()
class OrderPlaced(DomainEvent):
def __init__(self, order_id: int, customer_id: int, total: float):
super().__init__()
self.order_id = order_id
self.customer_id = customer_id
self.total = total
class Order:
def __init__(self):
self._events: List[DomainEvent] = []
def place(self, customer_id: int, items: List[dict]):
total = sum(item['price'] * item['qty'] for item in items)
self._events.append(OrderPlaced(1, customer_id, total))
def get_events(self) -> List[DomainEvent]:
events = self._events.copy()
self._events.clear()
return events
class OrderEventHandler:
def __init__(self, event_bus: EventBus):
self._bus = event_bus
self._bus.subscribe("order.placed", self.handle)
def handle(self, event: OrderPlaced):
print(f"Order {event.order_id}: ${event.total}")
# Отправить email, обновить склад, etc.
# Использование
bus = EventBus()
handler = OrderEventHandler(bus)
order = Order()
order.place(123, [{"price": 10, "qty": 2}])
for event in order.get_events():
bus.publish("order.placed", event)# ✅ Асинхронный Event Bus
import asyncio
from asyncio import Queue
class AsyncEventBus:
def __init__(self):
self._queue: Queue = Queue()
async def publish(self, event: str, data: dict = None):
await self._queue.put((event, data))
async def subscribe(self, handler: Callable):
while True:
event, data = await self._queue.get()
await handler(event, data)
self._queue.task_done()
# Использование
async def main():
bus = AsyncEventBus()
async def handler(event: str, data: dict):
print(f"Received {event}: {data}")
# Запуск подписчика
consumer = asyncio.create_task(bus.subscribe(handler))
# Публикация событий
await bus.publish("user.created", {"name": "Alice"})
await bus.publish("order.placed", {"id": 1})
await asyncio.sleep(0.1)
consumer.cancel()
asyncio.run(main())| Аспект | Mediator | Event Bus |
|---|---|---|
| Связь | Центральная логика в медиаторе | Распределённая через события |
| Знания | Компоненты знают медиатор | Компоненты не знают друг о друге |
| Синхронность | Обычно синхронный | Может быть асинхронным |
| Сложность | Медиатор может стать God Object | Проще масштабировать |
# ✅ Слабые ссылки для избежания утечек памяти
import weakref
class EventBus:
def __init__(self):
self._subscribers: Dict[str, List[weakref.ref]] = defaultdict(list)
def subscribe(self, event: str, handler: Callable):
# Создаём слабую ссылку
ref = weakref.ref(handler, lambda r: self._cleanup(event, r))
self._subscribers[event].append(ref)
def publish(self, event: str, data: Any = None):
refs = self._subscribers.get(event, [])
for ref in refs:
handler = ref() # Получаем объект
if handler:
handler(data)
def _cleanup(self, event: str, ref: weakref.ref):
self._subscribers[event].remove(ref)
# Использование
bus = EventBus()
def handler(data):
print(f"Received: {data}")
bus.subscribe("test", handler)
bus.publish("test", "Hello")
# Если handler будет удалён сборщиком мусора,
# слабая ссылка станет None и будет очищена| Паттерн | Когда использовать | Pythonic-реализация |
|---|---|---|
| Mediator | Взаимодействие UI-компонентов, диалоги | Класс с notify() |
| Event Bus | События домена, асинхронная обработка | dict + списки handlers |
| asyncio.Queue | Асинхронные события | Queue + задачи |
Главный принцип: Mediator централизует логику, Event Bus распределяет через события.
Изучите тему Memento и сериализация для сохранения состояния объектов.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.