Flow control, bounded queues, reactive streams, Kafka patterns
«Быстрый производитель должен ждать медленного потребителя». Backpressure предотвращает перегрузку системы.
Producer (1000 msg/s) → Queue → Consumer (100 msg/s)
Без backpressure:
- Очередь растёт бесконечно
- Память исчерпывается
- Система падает
С backpressure:
- Очередь сигнализирует producer замедлиться
- Producer снижает темп до 100 msg/s
- Система стабильна
import queue
# ❌ Опасно: очередь растёт бесконечно
unbounded_queue = queue.Queue()
def producer():
while True:
message = generate_message()
unbounded_queue.put(message) # Никогда не блокируется!
# Producer работает на полной скорости
def consumer():
while True:
message = unbounded_queue.get()
process(message) # Медленная обработка
# Consumer не успевает → очередь растёт → OOMПроблема: при разнице производительности producer/consumer очередь растёт до OOM.
import queue
# ✅ Безопасно: очередь с лимитом
bounded_queue = queue.Queue(maxsize=1000)
def producer():
while True:
message = generate_message()
# Блокируется, если очередь заполнена
bounded_queue.put(message) # Блокирующий put
# Producer ждёт, пока consumer освободит место
def consumer():
while True:
message = bounded_queue.get()
process(message)
bounded_queue.task_done()Преимущество: backpressure — producer автоматически замедляется при заполнении очереди.
import asyncio
class AsyncBoundedQueue:
"""Асинхронная ограниченная очередь."""
def __init__(self, maxsize=1000):
self.queue = asyncio.Queue(maxsize=maxsize)
async def put(self, item):
"""Блокирует, если очередь заполнена."""
await self.queue.put(item)
async def get(self):
"""Блокирует, если очередь пуста."""
return await self.queue.get()
def qsize(self):
return self.queue.qsize()
def full(self):
return self.queue.full()
# Использование
async def producer(queue, num_messages=10000):
for i in range(num_messages):
message = f"Message {i}"
# Блокируется при заполнении очереди
await queue.put(message)
if i % 1000 == 0:
print(f"Produced {i}, queue size: {queue.qsize()}")
async def consumer(queue):
processed = 0
while True:
message = await queue.get()
# Медленная обработка
await asyncio.sleep(0.01)
process(message)
processed += 1
queue.task_done()
if processed % 1000 == 0:
print(f"Consumed {processed}, queue size: {queue.qsize()}")
async def main():
queue = AsyncBoundedQueue(maxsize=100)
# Producer будет ждать, когда очередь заполнится
await asyncio.gather(
producer(queue),
consumer(queue)
)
# asyncio.run(main())Producer блокируется при заполнении очереди.
import threading
import queue
class BlockingBackpressureQueue:
"""Очередь с блокирующим backpressure."""
def __init__(self, maxsize=1000):
self.queue = queue.Queue(maxsize=maxsize)
self.closed = False
def produce(self, item, timeout=None):
"""
Производит элемент.
Блокируется, если очередь заполнена.
"""
if self.closed:
raise RuntimeError("Queue is closed")
try:
self.queue.put(item, block=True, timeout=timeout)
return True
except queue.Full:
# Таймаут истёк
return False
def consume(self, timeout=None):
"""Потребляет элемент. Блокируется, если очередь пуста."""
try:
return self.queue.get(block=True, timeout=timeout)
except queue.Empty:
return None
def close(self):
self.closed = True
# Использование
bp_queue = BlockingBackpressureQueue(maxsize=100)
def fast_producer():
for i in range(10000):
# Блокируется, когда очередь заполнена
success = bp_queue.produce(i, timeout=1.0)
if not success:
print("Producer blocked, waiting...")
bp_queue.produce(i) # Ждём сколько нужно
def slow_consumer():
while True:
item = bp_queue.consume()
if item is None:
continue
# Медленная обработка
import time
time.sleep(0.1)
process(item)Producer явно ограничивает скорость.
import time
from collections import deque
class RateLimitedQueue:
"""Очередь с ограничением скорости producer."""
def __init__(self, maxsize=1000, rate_limit=100):
"""
Args:
maxsize: Максимальный размер очереди
rate_limit: Максимум сообщений в секунду
"""
self.queue = queue.Queue(maxsize=maxsize)
self.rate_limit = rate_limit
self.timestamps = deque()
def produce(self, item):
"""Производит с ограничением скорости."""
now = time.time()
# Удаляем старые timestamps (старше 1 секунды)
while self.timestamps and self.timestamps[0] < now - 1:
self.timestamps.popleft()
# Проверяем rate limit
if len(self.timestamps) >= self.rate_limit:
# Ждём, пока освободится «слот»
sleep_time = self.timestamps[0] + 1 - now
if sleep_time > 0:
time.sleep(sleep_time)
now = time.time()
while self.timestamps and self.timestamps[0] < now - 1:
self.timestamps.popleft()
# Проверяем размер очереди
if self.queue.full():
# Очередь заполнена — ждём
self.queue.put(item, block=True)
else:
self.queue.put(item, block=False)
self.timestamps.append(time.time())
def consume(self):
return self.queue.get()
# Использование
rl_queue = RateLimitedQueue(maxsize=100, rate_limit=100)
def producer():
for i in range(10000):
# Не быстрее 100 msg/s
rl_queue.produce(i)
def consumer():
while True:
item = rl_queue.consume()
process(item)При переполнении новые сообщения отбрасываются.
class DropPolicyQueue:
"""Очередь с политикой отбрасывания."""
def __init__(self, maxsize=1000, drop_policy='tail'):
"""
Args:
drop_policy: 'tail' (отбрасывать новые) или 'head' (отбрасывать старые)
"""
self.queue = deque(maxlen=maxsize)
self.drop_policy = drop_policy
self.dropped_count = 0
def produce(self, item):
"""Производит, возможно отбрасывая элементы."""
if len(self.queue) >= self.queue.maxlen:
self.dropped_count += 1
if self.drop_policy == 'head':
# Отбрасываем старый элемент
self.queue.popleft()
elif self.drop_policy == 'tail':
# Отбрасываем новый элемент
return False
self.queue.append(item)
return True
def consume(self):
if not self.queue:
return None
return self.queue.popleft()
def dropped(self):
return self.dropped_count
# Использование для метрик (допустима потеря)
metrics_queue = DropPolicyQueue(maxsize=1000, drop_policy='tail')
def produce_metrics():
for metric in metrics_stream:
# Если очередь заполнена, метрика отбрасывается
metrics_queue.produce(metric)
def consume_metrics():
while True:
metric = metrics_queue.consume()
if metric:
send_to_prometheus(metric)
print(f"Dropped metrics: {metrics_queue.dropped()}")Kafka предоставляет встроенный backpressure через consumer lag.
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient
class KafkaBackpressureConsumer:
"""Kafka consumer с monitoring backpressure."""
def __init__(self, topic, bootstrap_servers):
self.topic = topic
self.consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
enable_auto_commit=False,
max_poll_records=100, # Ограничиваем batch
max_poll_interval_ms=300000
)
self.admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
def get_consumer_lag(self):
"""
Вычисляет consumer lag — разницу между producer offset и consumer offset.
Lag > 10000 — producer быстрее consumer, нужен backpressure
"""
partitions = self.consumer.partitions_for_topic(self.topic)
total_lag = 0
for partition in partitions:
# Получаем последний offset
end_offset = self.consumer.end_offsets([
TopicPartition(self.topic, partition)
])[TopicPartition(self.topic, partition)]
# Получаем текущий consumer offset
current_offset = self.consumer.position(
TopicPartition(self.topic, partition)
)
lag = end_offset - current_offset
total_lag += lag
return total_lag
def consume_with_backpressure(self, process_func, max_lag=10000):
"""
Потребляет с backpressure.
Если lag > max_lag, замедляем потребление.
"""
import time
for message in self.consumer:
lag = self.get_consumer_lag()
if lag > max_lag:
# Consumer не успевает — замедляемся
sleep_time = (lag - max_lag) / 1000 # 1 мс на каждый лишний message
time.sleep(min(sleep_time, 1.0))
try:
process_func(message)
self.consumer.commit()
except Exception as e:
logger.error(f"Error processing message: {e}")
# Не commit — сообщение будет обработано сноваclass BackpressureKafkaProducer:
"""Kafka producer с backpressure."""
def __init__(self, topic, bootstrap_servers, max_inflight=1000):
self.topic = topic
self.max_inflight = max_inflight
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
acks='all', # Ждём подтверждения от всех реплик
retries=3,
max_in_flight_requests_per_connection=5
)
self.inflight = 0
def produce(self, key, value, callback=None):
"""
Производит сообщение с backpressure.
Блокируется, если слишком много inflight сообщений.
"""
import time
while self.inflight >= self.max_inflight:
# Ждём, пока некоторые сообщения будут подтверждены
self.producer.poll(timeout_ms=100)
time.sleep(0.1)
def on_send(record_metadata, exception=None):
self.inflight -= 1
if callback:
callback(record_metadata, exception)
future = self.producer.send(self.topic, key=key, value=value)
future.add_callback(on_send)
self.inflight += 1
def flush(self):
"""Ждём подтверждения всех сообщений."""
self.producer.flush()Reactive streams предоставляют встроенный backpressure через request(n).
import asyncio
from asyncio import Queue
class ReactiveStreamConsumer:
"""
Consumer с явным запросом элементов (request-based backpressure).
Consumer сам контролирует, сколько элементов готов обработать.
"""
def __init__(self, queue_size=100):
self.queue = asyncio.Queue(maxsize=queue_size)
self.buffer = []
self.requested = 0
self.completed = False
async def produce(self, item):
"""Producer добавляет элемент, блокируется при заполнении."""
await self.queue.put(item)
def request(self, n):
"""Consumer запрашивает n элементов."""
self.requested += n
async def consume_batch(self, batch_size=10):
"""
Потребляет batch элементов.
Возвращает пустой список, если нет запрошенных элементов.
"""
if self.requested <= 0:
await asyncio.sleep(0.1)
return []
batch = []
while len(batch) < min(batch_size, self.requested) and not self.queue.empty():
item = await self.queue.get()
batch.append(item)
self.requested -= 1
return batch
async def produce_stream(self, producer_func, total_items):
"""Производит поток с backpressure."""
for i in range(total_items):
item = producer_func(i)
await self.produce(item)
# Если очередь заполнена, producer ждёт
# Это backpressure!
self.completed = True
# Использование
async def main():
stream = ReactiveStreamConsumer(queue_size=100)
async def producer():
for i in range(10000):
await stream.produce(f"Item {i}")
async def consumer():
processed = 0
while not stream.completed or not stream.queue.empty():
# Consumer запрашивает 10 элементов за раз
stream.request(10)
batch = await stream.consume_batch(batch_size=10)
# Обрабатываем batch
for item in batch:
process(item)
processed += 1
print(f"Processed {processed} items")
await asyncio.gather(producer(), consumer())
# asyncio.run(main())from prometheus_client import Gauge, Counter
QUEUE_SIZE = Gauge('queue_size', 'Current queue size', ['queue_name'])
QUEUE_DROPPED = Counter('queue_dropped_total', 'Total dropped items', ['queue_name'])
BACKPRESSURE_TIME = Gauge('backpressure_seconds', 'Time spent in backpressure', ['queue_name'])
class MonitoredQueue:
"""Очередь с метриками backpressure."""
def __init__(self, name, maxsize=1000):
self.name = name
self.queue = asyncio.Queue(maxsize=maxsize)
self.dropped = 0
self.backpressure_start = None
async def put(self, item):
"""Put с monitoring backpressure."""
if self.queue.full():
if self.backpressure_start is None:
self.backpressure_start = time.time()
await self.queue.put(item)
if self.backpressure_start is not None:
duration = time.time() - self.backpressure_start
BACKPRESSURE_TIME.labels(queue_name=self.name).set(duration)
self.backpressure_start = None
QUEUE_SIZE.labels(queue_name=self.name).set(self.queue.qsize())
async def get(self):
item = await self.queue.get()
QUEUE_SIZE.labels(queue_name=self.name).set(self.queue.qsize())
return item
def drop(self):
"""Record dropped item."""
self.dropped += 1
QUEUE_DROPPED.labels(queue_name=self.name).inc()
# Алерты
# queue_size / queue_max_size > 0.9 for 5m
# rate(queue_dropped_total[5m]) > 10
# backpressure_seconds > 1# ❌ Плохо
queue = asyncio.Queue() # Неограниченная
# ✅ Хорошо
queue = asyncio.Queue(maxsize=1000) # Ограниченная# Метрики — допустима потеря старых
metrics_queue = DropPolicyQueue(drop_policy='head')
# Логи — допустима потеря новых при перегрузке
logs_queue = DropPolicyQueue(drop_policy='tail')
# Платежи — не допустима потеря, используйте persistent queue
payments_queue = KafkaBackpressureConsumer('payments')# Для Kafka
lag = consumer.get_consumer_lag()
if lag > 10000:
# Scale up consumers или slow down producer
alert(f"High consumer lag: {lag}")# Reactive streams: consumer контролирует скорость
stream.request(10) # Готов обработать 10 элементов
batch = await stream.consume_batch(10)В следующей теме рассмотрим адаптивные системы с rate limiting и load shedding.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.