Ускорение работы: batching, caching, speculative decoding, continuous batching
Batching, caching, speculative decoding, continuous batching для ускорения работы моделей
Оптимизация inference критически важна для production систем с LLM. Без оптимизации:
Ключевые метрики:
Batching — объединение нескольких запросов в один batch для более эффективного использования GPU.
# Без batching — каждый запрос отдельно
for request in requests:
output = model.generate(request) # GPU простаивает между запросами
# С batching — обрабатываем вместе
batch = combine_requests(requests)
outputs = model.generate(batch) # Один проход через GPURequest A: [████████████████████] 50 tokens
Request B: [████████████████████] 50 tokens
Request C: [████] 10 tokens
Problem: Request C завершается раньше, но GPU ждёт завершения A и B
→ Waste ресурсов
from fastapi import FastAPI
import asyncio
from typing import List
from collections import deque
app = FastAPI()
class BatchProcessor:
def __init__(self, max_batch_size: int = 32, max_wait_ms: int = 10):
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.queue = deque()
self.lock = asyncio.Lock()
async def add_request(self, request: dict) -> str:
"""Добавить запрос в batch"""
future = asyncio.Future()
async with self.lock:
self.queue.append((request, future))
# Если batch заполнен — обрабатываем
if len(self.queue) >= self.max_batch_size:
asyncio.create_task(self.process_batch())
return await future
async def process_batch(self):
"""Обработка batch"""
async with self.lock:
batch = list(self.queue)[:self.max_batch_size]
# Удаляем обработанные из очереди
for _ in range(len(batch)):
self.queue.popleft()
requests = [item[0] for item in batch]
futures = [item[1] for item in batch]
# Генерация через модель
outputs = await self.model_generate_batch(requests)
# Возврат результатов
for future, output in zip(futures, outputs):
future.set_result(output)
batch_processor = BatchProcessor(max_batch_size=16)
@app.post("/generate")
async def generate(request: dict):
return await batch_processor.add_request(request)Continuous batching позволяет добавлять новые запросы в середину генерации.
Traditional batching:
Iter 1: [A1][B1][C1][D1]
Iter 2: [A2][B2][C2][D2]
Iter 3: [A3][B3][C3][D3]
Iter 4: [A4][B4][C4][D4]
Iter 5: [ B5][C5][D5] ← C завершился, место простаивает
Iter 6: [ B6][ D6] ← B завершился, место простаивает
Continuous batching:
Iter 1: [A1][B1][C1][D1]
Iter 2: [A2][B2][C2][D2]
Iter 3: [A3][B3][C3][D3]
Iter 4: [A4][B4][C4][D4]
Iter 5: [B5][C5][D5][E1] ← E добавлен вместо завершившегося A
Iter 6: [B6][D6][E2][F1] ← F добавлен вместо завершившегося C
from vllm import LLM, SamplingParams
llm = LLM(
model="meta-llama/Llama-2-7b-chat-hf",
gpu_memory_utilization=0.9,
max_num_seqs=256, # максимум одновременных запросов
max_num_batched_tokens=4096
)
sampling_params = SamplingParams(
temperature=0.7,
max_tokens=256
)
# Continuous batching работает автоматически
prompts = [f"Prompt {i}" for i in range(100)]
outputs = llm.generate(prompts, sampling_params)
# vLLM автоматически добавляет новые запросы по мере завершения старых| Метод | Throughput (req/sec) | GPU Utilization |
|---|---|---|
| No batching | 10 | 20% |
| Static batching | 40 | 60% |
| Continuous batching | 100+ | 90%+ |
Speculative decoding использует маленькую draft модель для генерации токенов, большую target модель для верификации.
Step 1: Draft model (small, fast) генерирует K токенов
"Hello → world → how → are → you"
Step 2: Target model (large, accurate) верифицирует все K токенов за ОДИН проход
Accept: "Hello", "world", "how"
Reject: "are" → resample
Step 3: Repeat
Naive decoding: N токенов = N passes через target model
Speculative decoding: N токенов ≈ N/K passes через target model
Speedup ≈ K * acceptance_rate
Пример: K=5, acceptance_rate=0.7 → speedup ≈ 3.5x
from vllm import LLM, SamplingParams
llm = LLM(
model="meta-llama/Llama-2-70b-chat-hf", # Target model
speculative_model="meta-llama/Llama-2-7b-chat-hf", # Draft model
num_speculative_tokens=5, # K токенов от draft модели
speculative_max_model_len=128
)
sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
outputs = llm.generate(["Tell me a story"], sampling_params)KV cache хранит key-value states для предыдущих токенов, чтобы не пересчитывать их заново.
Запрос 1: [████████████] 1000 tokens
Запрос 2: [████] 200 tokens
Запрос 3: [████████████████] 1500 tokens
Память фрагментирована → нельзя добавить новый запрос
KV cache разбит на блоки (pages) по 16 токенов:
Запрос 1: [Page 1][Page 2][Page 3] (непрерывно в logical space)
Запрос 2: [Page 7] (может быть anywhere в physical memory)
Запрос 3: [Page 4][Page 5][Page 6]
Block table мапит logical → physical
from vllm import LLM
llm = LLM(
model="meta-llama/Llama-2-7b-chat-hf",
gpu_memory_utilization=0.9, # % GPU memory для KV cache + model
max_num_seqs=256,
max_num_batched_tokens=4096,
# KV cache автоматически управляется
)Caching ответов на частые запросы.
import redis
import hashlib
import json
from typing import Optional
class LLMCache:
def __init__(self, redis_url: str = "redis://localhost"):
self.redis = redis.from_url(redis_url)
self.ttl = 3600 # 1 hour
def _get_cache_key(self, prompt: str, model: str) -> str:
"""Генерация ключа кэша"""
content = f"{model}:{prompt}"
return f"llm:{hashlib.md5(content.encode()).hexdigest()}"
def get(self, prompt: str, model: str) -> Optional[str]:
"""Получение из кэша"""
key = self._get_cache_key(prompt, model)
cached = self.redis.get(key)
return cached.decode() if cached else None
def set(self, prompt: str, model: str, response: str):
"""Сохранение в кэш"""
key = self._get_cache_key(prompt, model)
self.redis.setex(key, self.ttl, response)
# Интеграция с FastAPI
from fastapi import FastAPI
app = FastAPI()
cache = LLMCache()
@app.post("/generate")
async def generate(prompt: str, model: str = "llama-3.1"):
# Check cache
cached_response = cache.get(prompt, model)
if cached_response:
return {"response": cached_response, "cached": True}
# Generate
response = await call_llm(prompt, model)
# Cache
cache.set(prompt, model, response)
return {"response": response, "cached": False}Кэширование семантически похожих запросов.
from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
class SemanticCache:
def __init__(self, similarity_threshold: float = 0.95):
self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
self.cache = {} # embedding -> response
self.threshold = similarity_threshold
def _get_embedding(self, text: str) -> np.ndarray:
return self.embedder.encode(text)
def get(self, prompt: str) -> str:
prompt_embedding = self._get_embedding(prompt)
for cached_prompt, (cached_embedding, response) in self.cache.items():
similarity = cosine_similarity(
[prompt_embedding],
[cached_embedding]
)[0][0]
if similarity >= self.threshold:
return response
return None
def set(self, prompt: str, response: str):
embedding = self._get_embedding(prompt)
self.cache[prompt] = (embedding, response)Разделение модели на несколько GPU.
from vllm import LLM
# 70B модель на 4 GPU
llm = LLM(
model="meta-llama/Llama-2-70b-chat-hf",
tensor_parallel_size=4 # 4 GPU для tensor parallelism
)Разделение по слоям модели.
GPU 0: Layers 1-20
GPU 1: Layers 21-40
GPU 2: Layers 41-60
GPU 3: Layers 61-80
Использование квантованных моделей для ускорения.
# AWQ 4-bit с vLLM
from vllm import LLM
llm = LLM(
model="mistralai/Mistral-7B-Instruct-v0.2",
quantization="awq" # 4-bit AWQ
)
# Speedup: 2-3x vs FP16
# Memory: 4GB vs 14GBfrom fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import asyncio
import uuid
app = FastAPI()
class GenerationRequest(BaseModel):
prompt: str
max_tokens: int = 256
class GenerationResponse(BaseModel):
task_id: str
status: str # "pending", "processing", "completed"
result: str = None
tasks = {}
async def process_long_task(task_id: str, prompt: str, max_tokens: int):
"""Фоновая обработка долгого запроса"""
tasks[task_id]["status"] = "processing"
# Имитация долгой генерации
await asyncio.sleep(5)
result = await call_llm(prompt, max_tokens=max_tokens)
tasks[task_id]["status"] = "completed"
tasks[task_id]["result"] = result
@app.post("/generate/async", response_model=GenerationResponse)
async def generate_async(request: GenerationRequest):
task_id = str(uuid.uuid4())
tasks[task_id] = {
"status": "pending",
"result": None
}
# Запуск фоновой задачи
asyncio.create_task(process_long_task(task_id, request.prompt, request.max_tokens))
return GenerationResponse(task_id=task_id, status="pending")
@app.get("/generate/status/{task_id}")
async def get_status(task_id: str):
if task_id not in tasks:
return {"error": "Task not found"}
return tasks[task_id]from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response
import time
# Метрики
REQUEST_COUNT = Counter('llm_requests_total', 'Total LLM requests')
REQUEST_LATENCY = Histogram('llm_request_latency_seconds', 'LLM request latency')
TOKEN_COUNT = Counter('llm_tokens_total', 'Total tokens generated')
CACHE_HITS = Counter('llm_cache_hits_total', 'Cache hits')
@app.post("/generate")
async def generate(prompt: str):
start_time = time.time()
REQUEST_COUNT.inc()
# Check cache
cached = cache.get(prompt)
if cached:
CACHE_HITS.inc()
return {"response": cached, "cached": True}
# Generate
response = await call_llm(prompt)
# Metrics
latency = time.time() - start_time
REQUEST_LATENCY.observe(latency)
TOKEN_COUNT.inc(len(response.split()))
return {"response": response, "latency": latency}
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")| Metric | Target |
|---|---|
| p50 latency | <500ms |
| p95 latency | <2s |
| p99 latency | <5s |
| Throughput | 100+ req/sec |
| Cache hit rate | >30% |
| GPU utilization | >70% |
Оптимизация inference включает:
В следующей теме вы изучите RAG Integration — retrieval-augmented generation для работы с приватными данными.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.