Реактивное программирование через наблюдателей. Pub/Sub для распределённых систем. Слабые ссылки через weakref.
Observer превращает объекты в издателей событий. Pub/Sub превращает события в сообщения.
Observer — поведенческий паттерн, создающий механизм подписки, позволяющий одним объектам следить и реагировать на события в других объектах.
# ❌ ПЛОХО: Клиент должен опрашивать объект
class Stock:
def __init__(self, symbol: str):
self.symbol = symbol
self.price = 0
# Клиент должен постоянно проверять
stock = Stock("AAPL")
old_price = stock.price
while True:
if stock.price != old_price:
print(f"Price changed: {stock.price}")
old_price = stock.price
# Бесконечный цикл опроса!# ✅ ХОРОШО: Уведомления вместо опроса
from abc import ABC, abstractmethod
from typing import List
class Observer(ABC):
@abstractmethod
def update(self, subject: "Subject"):
pass
class Subject(ABC):
def __init__(self):
self._observers: List[Observer] = []
def attach(self, observer: Observer):
self._observers.append(observer)
def detach(self, observer: Observer):
self._observers.remove(observer)
def notify(self):
for observer in self._observers:
observer.update(self)
class Stock(Subject):
def __init__(self, symbol: str):
super().__init__()
self.symbol = symbol
self._price = 0
@property
def price(self):
return self._price
@price.setter
def price(self, value: float):
if value != self._price:
self._price = value
self.notify() # Уведомляем наблюдателей
class PriceLogger(Observer):
def update(self, subject: Stock):
print(f"[LOG] {subject.symbol}: ${subject.price}")
class PriceAlert(Observer):
def __init__(self, threshold: float):
self.threshold = threshold
def update(self, subject: Stock):
if subject.price > self.threshold:
print(f"ALERT: {subject.symbol} above ${self.threshold}!")
# Использование
stock = Stock("AAPL")
logger = PriceLogger()
alert = PriceAlert(150)
stock.attach(logger)
stock.attach(alert)
stock.price = 140 # [LOG] AAPL: $140
stock.price = 160 # [LOG] AAPL: $160, ALERT: AAPL above $150!# ✅ Проще: функции как наблюдатели
from typing import Callable, List
class Subject:
def __init__(self):
self._observers: List[Callable] = []
def subscribe(self, callback: Callable):
self._observers.append(callback)
def unsubscribe(self, callback: Callable):
self._observers.remove(callback)
def notify(self, data: dict = None):
for callback in self._observers:
callback(data)
class Stock:
def __init__(self, symbol: str):
self.symbol = symbol
self._subject = Subject()
self._price = 0
@property
def price(self):
return self._price
@price.setter
def price(self, value: float):
if value != self._price:
self._price = value
self._subject.notify({"symbol": self.symbol, "price": value})
def subscribe(self, callback: Callable):
self._subject.subscribe(callback)
# Использование
def log_price(data: dict):
print(f"{data['symbol']}: ${data['price']}")
def alert_high(data: dict):
if data['price'] > 150:
print(f"ALERT: {data['symbol']} at ${data['price']}!")
stock = Stock("AAPL")
stock.subscribe(log_price)
stock.subscribe(alert_high)
stock.price = 160
# AAPL: $160
# ALERT: AAPL at $160!# ✅ Pub/Sub с топиками
from typing import Dict, List, Callable, Any
from collections import defaultdict
class PubSub:
def __init__(self):
self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
def subscribe(self, topic: str, handler: Callable[[Any], None]):
self._subscribers[topic].append(handler)
return self # Для цепочки
def unsubscribe(self, topic: str, handler: Callable):
self._subscribers[topic].remove(handler)
def publish(self, topic: str, data: Any = None):
for handler in self._subscribers.get(topic, []):
handler(data)
# Использование
pubsub = PubSub()
# Подписка на топики
pubsub.subscribe("stock.AAPL", lambda d: print(f"AAPL: ${d}"))
pubsub.subscribe("stock.GOOG", lambda d: print(f"GOOG: ${d}"))
pubsub.subscribe("stock.*", lambda d: print(f"Any stock: ${d}"))
# Публикация
pubsub.publish("stock.AAPL", 150)
# AAPL: $150
# Any stock: $150
pubsub.publish("stock.GOOG", 2800)
# GOOG: $2800
# Any stock: $2800# ✅ Подписка с шаблонами
import fnmatch
from typing import Dict, List, Callable, Any
class PubSubWithWildcards:
def __init__(self):
self._subscriptions: Dict[str, List[Callable]] = {}
def subscribe(self, pattern: str, handler: Callable):
if pattern not in self._subscriptions:
self._subscriptions[pattern] = []
self._subscriptions[pattern].append(handler)
def publish(self, topic: str, data: Any = None):
for pattern, handlers in self._subscriptions.items():
if fnmatch.fnmatch(topic, pattern):
for handler in handlers:
handler(topic, data)
# Использование
pubsub = PubSubWithWildcards()
pubsub.subscribe("stock.*", lambda t, d: print(f"Stock: {t} = ${d}"))
pubsub.subscribe("stock.AAPL", lambda t, d: print(f"AAPL specific: ${d}"))
pubsub.subscribe("bond.*", lambda t, d: print(f"Bond: {t} = ${d}"))
pubsub.publish("stock.AAPL", 150)
# Stock: stock.AAPL = $150
# AAPL specific: $150
pubsub.publish("bond.US10Y", 1.5)
# Bond: bond.US10Y = $1.5# ✅ События домена
from dataclasses import dataclass
from datetime import datetime
from typing import List
@dataclass
class DomainEvent:
occurred_at: datetime = None
def __post_init__(self):
if self.occurred_at is None:
self.occurred_at = datetime.now()
@dataclass
class OrderPlaced(DomainEvent):
order_id: int
customer_id: int
total: float
@dataclass
class OrderShipped(DomainEvent):
order_id: int
tracking_number: str
class EventBus:
def __init__(self):
self._handlers = {}
def subscribe(self, event_type: type, handler):
self._handlers.setdefault(event_type, []).append(handler)
def publish(self, event: DomainEvent):
event_type = type(event)
for handler in self._handlers.get(event_type, []):
handler(event)
class Order:
def __init__(self, bus: EventBus):
self._bus = bus
self._events: List[DomainEvent] = []
def place(self, customer_id: int, items: list):
total = sum(item['price'] * item['qty'] for item in items)
self._events.append(OrderPlaced(1, customer_id, total))
self._flush_events()
def ship(self, tracking: str):
self._events.append(OrderShipped(1, tracking))
self._flush_events()
def _flush_events(self):
for event in self._events:
self._bus.publish(event)
self._events.clear()
# Использование
bus = EventBus()
bus.subscribe(OrderPlaced, lambda e: print(f"Order {e.order_id}: ${e.total}"))
bus.subscribe(OrderShipped, lambda e: print(f"Order {e.order_id} shipped: {e.tracking_number}"))
order = Order(bus)
order.place(123, [{"price": 10, "qty": 2}])
order.ship("TRACK123")# ✅ Реактивные переменные
from typing import Callable, List
class ReactiveVar:
def __init__(self, value):
self._value = value
self._listeners: List[Callable] = []
@property
def value(self):
return self._value
@value.setter
def value(self, new_value):
if new_value != self._value:
self._value = new_value
self._notify()
def subscribe(self, callback: Callable):
self._listeners.append(callback)
def _notify(self):
for callback in self._listeners:
callback(self._value)
# Использование
name = ReactiveVar("")
greeting = ReactiveVar("")
# Вычисляемое свойство
def update_greeting(value):
greeting.value = f"Hello, {value}!" if value else ""
name.subscribe(update_greeting)
name.value = "Alice" # greeting.value = "Hello, Alice!"
name.value = "Bob" # greeting.value = "Hello, Bob!"# ✅ Pub/Sub для WebSocket
import asyncio
from asyncio import Queue
from typing import Set
class WebSocketPubSub:
def __init__(self):
self._clients: Set = set()
self._queue: Queue = Queue()
def add_client(self, websocket):
self._clients.add(websocket)
def remove_client(self, websocket):
self._clients.discard(websocket)
async def publish(self, message: dict):
for client in self._clients:
await client.send_json(message)
async def run(self):
while True:
message = await self._queue.get()
await self.publish(message)
# В реальном приложении с websockets
# каждый подключённый клиент получает уведомления# ✅ Слабые ссылки для избежания утечек
import weakref
from typing import List
class Subject:
def __init__(self):
self._observers: List[weakref.ref] = []
def attach(self, observer):
# Создаём слабую ссылку с callback для очистки
ref = weakref.ref(observer, lambda r: self._cleanup(r))
self._observers.append(ref)
def notify(self):
for ref in self._observers:
observer = ref() # Получаем объект
if observer:
observer.update(self)
def _cleanup(self, ref: weakref.ref):
self._observers.remove(ref)
# Если наблюдатель удалён, слабая ссылка станет None
# и будет автоматически удалена из списка# ✅ Асинхронный Pub/Sub
import asyncio
from asyncio import Queue
class AsyncPubSub:
def __init__(self):
self._queues: dict = {}
def subscribe(self, topic: str) -> Queue:
if topic not in self._queues:
self._queues[topic] = Queue()
return self._queues[topic]
async def publish(self, topic: str, data: dict):
if topic in self._queues:
await self._queues[topic].put(data)
async def run_subscriber(self, topic: str, handler):
queue = self.subscribe(topic)
while True:
data = await queue.get()
await handler(data)
queue.task_done()
# Использование
async def main():
pubsub = AsyncPubSub()
async def handler(data):
print(f"Received: {data}")
# Запуск подписчика
subscriber = asyncio.create_task(
pubsub.run_subscriber("events", handler)
)
# Публикация
await pubsub.publish("events", {"type": "test"})
await asyncio.sleep(0.1)
subscriber.cancel()
asyncio.run(main())| Паттерн | Когда использовать | Pythonic-реализация |
|---|---|---|
| Observer (класс) | Формальная подписка, несколько наблюдателей | Список observers + notify() |
| Observer (callable) | Простые уведомления | Список callback-функций |
| Pub/Sub | События с топиками, распределённые системы | dict + списки handlers |
| weakref | Избегание утечек памяти | weakref.ref + callback |
Главный принцип: Observer уведомляет подписчиков об изменениях, Pub/Sub распределяет события по топикам.
Изучите тему State и конечные автоматы для управления состоянием.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.