Кэширование запросов, балансировка нагрузки, логирование метрик, обработка ошибок. Практики для надёжных систем.
Кэширование запросов, балансировка нагрузки, логирование метрик, обработка ошибок. Практики для надёжных систем.
За 60 минут вы:
import time
import hashlib
import numpy as np
from collections import OrderedDict
from typing import Optional, Tuple
class TTLCache:
"""Кэш с ограничением по времени и размеру."""
def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
self.cache: OrderedDict = OrderedDict()
self.max_size = max_size
self.ttl = ttl_seconds
def _hash_query(self, query_embedding: np.ndarray) -> str:
"""Хэш вектора запроса."""
normalized = query_embedding / np.linalg.norm(query_embedding)
return hashlib.md5(normalized.tobytes()).hexdigest()
def get(self, query_embedding: np.ndarray) -> Optional[Tuple]:
"""Получить значение, если не истёк TTL."""
key = self._hash_query(query_embedding)
if key not in self.cache:
return None
value, timestamp = self.cache[key]
if time.time() - timestamp > self.ttl:
del self.cache[key]
return None
# Поднять в начало (LRU)
self.cache.move_to_end(key)
return value
def set(self, query_embedding: np.ndarray, result: Tuple):
"""Сохранить значение."""
key = self._hash_query(query_embedding)
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = (result, time.time())
# Удаление старых записей
while len(self.cache) > self.max_size:
self.cache.popitem(last=False)
# Использование
query_cache = TTLCache(max_size=5000, ttl_seconds=1800) # 30 минут
def search_with_cache(query_embedding, index, k=5):
"""Поиск с кэшированием."""
cached = query_cache.get(query_embedding)
if cached is not None:
return cached # Из кэша!
distances, indices = index.search(query_embedding, k)
result = (distances, indices)
query_cache.set(query_embedding, result)
return resultimport logging
import time
from functools import wraps
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class SearchMetrics:
"""Метрики поискового запроса."""
timestamp: str
query_id: str
latency_ms: float
results_count: int
top_score: float
class SearchMonitor:
"""Мониторинг поисковых запросов."""
def __init__(self, logger_name: str = "vector_search"):
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
handler = logging.FileHandler("search_metrics.log")
handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
self.logger.addHandler(handler)
def track_search(self, func):
"""Декоратор для отслеживания поиска."""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
query_id = f"{time.time()}-{id(args[0])}"
try:
result = func(*args, **kwargs)
latency = (time.time() - start) * 1000
metrics = SearchMetrics(
timestamp=datetime.now().isoformat(),
query_id=query_id,
latency_ms=round(latency, 2),
results_count=len(result) if hasattr(result, '__len__') else 1,
top_score=result[0].get('score', 0) if result and isinstance(result[0], dict) else 0
)
self.logger.info(asdict(metrics))
return result
except Exception as e:
self.logger.error(f"Search error: {query_id} - {e}")
raise
return wrapper
# Использование
monitor = SearchMonitor()
class SearchService:
@monitor.track_search
def search(self, query: str, k: int = 5):
# Логика поиска
return resultsfrom typing import Optional, List
import logging
logger = logging.getLogger(__name__)
class ResilientSearch:
"""Поиск с обработкой ошибок."""
def __init__(self, index, documents, fallback_results: int = 3):
self.index = index
self.documents = documents
self.fallback_results = fallback_results
def search(self, query_embedding, k: int = 5) -> List[dict]:
"""Поиск с обработкой ошибок."""
try:
distances, indices = self.index.search(query_embedding, k)
results = []
for idx, score in zip(indices[0], distances[0]):
if idx < len(self.documents):
results.append({
"text": self.documents[idx],
"score": float(score)
})
return results
except Exception as e:
logger.error(f"Search failed: {e}")
# Graceful degradation: вернуть популярные документы
return self._get_fallback_results()
def _get_fallback_results(self) -> List[dict]:
"""Резервные результаты при ошибке."""
return [
{"text": doc, "score": 0.0, "fallback": True}
for doc in self.documents[:self.fallback_results]
]import threading
from typing import List
class LoadBalancedSearch:
"""Балансировка нагрузки между несколькими индексами."""
def __init__(self, indices: List):
self.indices = indices
self.current_index = 0
self.lock = threading.Lock()
def _get_next_index(self):
"""Round-robin выбор индекса."""
with self.lock:
index = self.indices[self.current_index]
self.current_index = (self.current_index + 1) % len(self.indices)
return index
def search(self, query_embedding, k=5):
"""Поиск с балансировкой."""
index = self._get_next_index()
return index.search(query_embedding, k)
# Для реального продакшена используйте:
# - Redis для распределённого кэша
# - Nginx/HAProxy для балансировки HTTP запросов
# - Kubernetes для оркестрации контейнеровСледующая тема — Финальный проект.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.