CPU/I/O-bound, GIL, кэширование, алгоритмы, C-расширения, async vs threading
Правило оптимизации: измеряй → анализируй → оптимизируй → повторяй. Никогда не оптимизируй вслепую.
Прежде чем оптимизировать — определите тип узкого места:
| Тип | Характеристика | Примеры | Лучшая стратегия |
|---|---|---|---|
| CPU-bound | Процессор загружен на 100% | Математика, шифрование, обработка изображений, ML | multiprocessing, numba, Cython |
| I/O-bound | Процессор ждёт (диск, сеть, БД) | HTTP-запросы, файлы, запросы к БД | asyncio, aiohttp, asyncpg |
| Memory-bound | Узкое место — пропускная способность памяти | Обработка больших массивов | numpy, array, __slots__ |
import time
# CPU-bound: процессор не простаивает
def cpu_task(n):
return sum(i * i for i in range(n))
# I/O-bound: большую часть времени ждём
def io_task(url):
import urllib.request
return urllib.request.urlopen(url).read()GIL (Global Interpreter Lock) — мьютекс, позволяющий только одному потоку выполнять байткод Python в каждый момент времени.
| Подход | Когда использовать | Обходит GIL? |
|---|---|---|
threading | I/O-bound задачи | Частично (GIL снимается при I/O) |
multiprocessing | CPU-bound задачи | Да (отдельные процессы) |
asyncio | Много I/O-ожиданий | Нет (один поток) |
concurrent.futures | Удобный API над threading/multiprocessing | Зависит от executor |
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import asyncio
# CPU-bound: multiprocessing
def heavy_computation(n):
return sum(i * i for i in range(n))
# Используем все ядра процессора
with ProcessPoolExecutor() as executor:
results = list(executor.map(heavy_computation, [10**6] * 4))
# I/O-bound: threading или asyncio
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(fetch_url, url) for url in urls]
results = [f.result() for f in futures]
# Async для максимального числа одновременных I/O
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
return await asyncio.gather(*tasks)С Python 3.13 появилась экспериментальная сборка без GIL (--disable-gil). Потоки действительно выполняются параллельно для CPU-bound задач. Пока нестабильно для production.
Изменение алгоритма даёт 10-1000x улучшение. Низкоуровневые оптимизации — 10-50%.
| Алгоритм | Сложность | n=1000 | n=1_000_000 |
|---|---|---|---|
| Hash lookup | O(1) | ~0.1 мкс | ~0.1 мкс |
| Binary search | O(log n) | ~10 шагов | ~20 шагов |
| Linear search | O(n) | 1000 шагов | 1M шагов |
| Bubble sort | O(n²) | 1M шагов | 10¹² шагов |
# O(n) vs O(1) — разница в 50 000 раз
lst = list(range(100_000))
s = set(lst)
import timeit
print(timeit.timeit('99999 in lst', globals=globals(), number=1000)) # ~5 ms
print(timeit.timeit('99999 in s', globals=globals(), number=1000)) # ~0.01 ms
# Используйте set/dict для частых проверок наличия
seen = set()
for item in data:
if item not in seen:
seen.add(item)
process(item)
# bisect для отсортированных данных
import bisect
sorted_data = sorted(range(100_000))
idx = bisect.bisect_left(sorted_data, 42) # O(log n) вместо O(n)import timeit
# Медленно: каждое обращение к math.sqrt — поиск в глобальном словаре
import math
def slow():
for x in range(1000):
math.sqrt(x)
# Быстро: локальная ссылка — LOAD_FAST вместо LOAD_GLOBAL
def fast():
sqrt = math.sqrt # локальная ссылка
for x in range(1000):
sqrt(x)
# fast() примерно на 20-30% быстрееstr.join() vs конкатенацияwords = ['Hello', 'World'] * 1000
# O(n²) — каждая + создаёт новую строку
slow = ''
for w in words:
slow += w + ' '
# O(n) — создаётся один раз
fast = ' '.join(words)# Медленнее — накладные расходы на метод append
result = []
for x in range(10000):
if x % 2 == 0:
result.append(x * x)
# Быстрее — всё в одном выражении
result = [x * x for x in range(10000) if x % 2 == 0]in для set и dict — O(1)# Медленно для больших данных
if item in large_list: # O(n)
# Быстро всегда
if item in large_set: # O(1)
if item in large_dict: # O(1) по ключуfunctools.lru_cache / functools.cachefrom functools import lru_cache, cache
# lru_cache — ограниченный кэш (LRU eviction)
@lru_cache(maxsize=128)
def fibonacci(n):
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
fibonacci(100) # мгновенно после прогрева
fibonacci.cache_info() # CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)
fibonacci.cache_clear() # очистить кэш
# cache (Python 3.9+) — без ограничения размера
@cache
def factorial(n):
return 1 if n <= 1 else n * factorial(n - 1)
# Важно: lru_cache работает только с hashable аргументами!
@lru_cache(maxsize=None)
def compute(x, y): # x, y должны быть hashable
return x * yfunctools.cached_property — ленивое вычислениеfrom functools import cached_property
class Circle:
def __init__(self, radius):
self.radius = radius
@cached_property
def area(self):
import math
return math.pi * self.radius ** 2 # вычисляется один раз при первом обращении
c = Circle(5)
c.area # вычисляется
c.area # берётся из кэшаimport redis
import json
r = redis.Redis(host='localhost', port=6379)
def get_user(user_id: int) -> dict:
cache_key = f"user:{user_id}"
# Проверяем кэш
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# Загружаем из БД
user = db.query(User).filter_by(id=user_id).first()
data = user.to_dict()
# Кэшируем на 1 час
r.setex(cache_key, 3600, json.dumps(data))
return data__slots__ — экономия памяти для классовclass RegularPoint:
def __init__(self, x, y):
self.x = x
self.y = y
# каждый экземпляр имеет __dict__ (~200 байт)
class SlottedPoint:
__slots__ = ['x', 'y']
def __init__(self, x, y):
self.x = x
self.y = y
# без __dict__, ~50 байт на экземпляр
# При 1 миллионе экземпляров: разница в ~150 МБ
import sys
r = RegularPoint(1, 2)
s = SlottedPoint(1, 2)
print(sys.getsizeof(r)) # ~48 bytes (только объект, не __dict__)
print(sys.getsizeof(r.__dict__)) # ~104 bytes — словарь атрибутовarray vs list для числовых данныхimport array
import sys
# list — каждый элемент PyObject* (56 байт для int)
lst = list(range(1_000_000))
print(sys.getsizeof(lst)) # ~8.5 MB + данные
# array — компактное представление C-типов
arr = array.array('i', range(1_000_000)) # 'i' = signed int
print(sys.getsizeof(arr)) # ~4 MB
# NumPy — ещё эффективнее
import numpy as np
np_arr = np.arange(1_000_000, dtype=np.int32)
print(np_arr.nbytes) # 4 MB + незначительный overhead# 100 MB данных в памяти сразу
def process_v1(filename):
lines = open(filename).readlines() # вся файла в памяти
return [line.strip() for line in lines]
# O(1) памяти — обрабатываем построчно
def process_v2(filename):
with open(filename) as f:
for line in f: # итератор строк
yield line.strip() # обрабатываем по однойimport numpy as np
import timeit
# Python цикл — медленно
def python_sum(n):
return sum(i * i for i in range(n))
# NumPy — векторизованная операция
def numpy_sum(n):
arr = np.arange(n)
return (arr * arr).sum()
# Разница: numpy_sum в 100-500 раз быстрее для больших n
n = 1_000_000
print(timeit.timeit(lambda: python_sum(n), number=3)) # ~0.3 sec
print(timeit.timeit(lambda: numpy_sum(n), number=3)) # ~0.003 sec
# Broadcasting — операции без циклов
a = np.array([1, 2, 3, 4, 5])
b = np.array([10, 20, 30, 40, 50])
result = a * b + a # векторизованно: [11, 42, 93, 164, 255]from numba import jit, prange
import numpy as np
@jit(nopython=True) # компилирует в машинный код при первом вызове
def matrix_multiply(a, b):
n = len(a)
result = np.zeros((n, n))
for i in range(n):
for j in range(n):
for k in range(n):
result[i, j] += a[i, k] * b[k, j]
return result
# Первый вызов: медленно (компиляция)
# Последующие: ~100x быстрее чистого Python# math_utils.pyx
def fast_sum(int n): # аннотации типов = C-код
cdef int i
cdef long total = 0
for i in range(n):
total += i * i
return total
# После компиляции: скорость C-кода с Python-интерфейсомimport ctypes
import os
# Загружаем системную библиотеку
libc = ctypes.CDLL("libc.so.6")
# Вызываем C функцию
result = libc.strlen(b"hello") # 5# cProfile — где тратится время
import cProfile
cProfile.run('my_function()', sort='cumulative')
# timeit — точные измерения
import timeit
t = timeit.timeit(
'sum(x**2 for x in range(1000))',
number=10000
)
print(f"{t:.3f} sec для 10000 итераций")
# memory_profiler — потребление памяти по строкам
from memory_profiler import profile
@profile
def memory_heavy():
data = [0] * 1_000_000
return sum(data)
# tracemalloc — встроенное отслеживание
import tracemalloc
tracemalloc.start()
# ... ваш код ...
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:3]:
print(stat)| Проблема | Решение |
|---|---|
| Поиск в большом списке | set или dict |
| Строковая конкатенация в цикле | str.join() |
| Повторные вычисления | @lru_cache / @cache |
| CPU-bound задачи | multiprocessing / ProcessPoolExecutor |
| I/O-bound задачи | asyncio / ThreadPoolExecutor |
| Много экземпляров класса | __slots__ |
| Числовые вычисления | numpy / numba |
| Большие файлы | Генераторы (построчно) |
| Медленный алгоритм | Сначала улучши алгоритм! |
# 1. Оптимизация без измерений (premature optimization)
# Сначала: cProfile → находим узкое место → оптимизируем
# 2. Строковая конкатенация в цикле
result = ""
for item in huge_list:
result += str(item) # O(n^2)! Используй: "".join(str(i) for i in huge_list)
# 3. Поиск в списке вместо множества
if x in [1, 2, 3, 4, 5, ...1000]: # O(n)
if x in {1, 2, 3, 4, 5, ...1000}: # O(1)
# 4. Повторные вычисления без кэширования
for i in range(n):
if expensive_check(i): # вызывается n раз!
...
# Решение: кэшировать результатconcurrent.futures — унифицированный APIfrom concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import urllib.request
# ThreadPoolExecutor — для I/O-bound задач
def download(url):
with urllib.request.urlopen(url) as r:
return r.read()
urls = ["https://example.com"] * 20
with ThreadPoolExecutor(max_workers=10) as executor:
# map — сохраняет порядок, блокирует до завершения
results = list(executor.map(download, urls))
# submit + as_completed — обрабатываем по мере завершения
futures = {executor.submit(download, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
data = future.result()
print(f"{url}: {len(data)} bytes")
except Exception as e:
print(f"{url} failed: {e}")
# ProcessPoolExecutor — для CPU-bound задач
def compute(n):
return sum(i*i for i in range(n))
with ProcessPoolExecutor() as executor:
results = list(executor.map(compute, [10**6] * 4))from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
async def pipeline(items):
"""I/O в потоках, CPU в процессах, координация через asyncio."""
loop = asyncio.get_running_loop()
# Шаг 1: загрузка (I/O-bound) — в потоках
with ThreadPoolExecutor(max_workers=20) as io_pool:
raw_data = await asyncio.gather(*[
loop.run_in_executor(io_pool, fetch_item, item)
for item in items
])
# Шаг 2: обработка (CPU-bound) — в процессах
with ProcessPoolExecutor() as cpu_pool:
processed = await asyncio.gather(*[
loop.run_in_executor(cpu_pool, heavy_process, data)
for data in raw_data
])
return processedfrom collections import deque, defaultdict, Counter
import heapq
# deque — O(1) добавление/удаление с обоих концов
# list — O(n) для appendleft/popleft
dq = deque(maxlen=100) # FIFO с автоматическим выталкиванием старых
dq.append(1) # O(1)
dq.appendleft(0) # O(1)
# defaultdict — избегает KeyError проверок
graph = defaultdict(list)
graph['a'].append('b') # не нужен 'if a not in graph'
# Counter — частотный анализ
words = ["the", "quick", "the", "fox", "the"]
freq = Counter(words)
freq.most_common(2) # [('the', 3), ('quick', 1)]
freq['the'] # 3
# heapq — приоритетная очередь (мин-куча)
heap = []
heapq.heappush(heap, (priority, item))
priority, item = heapq.heappop(heap) # O(log n)
# Для N наименьших/наибольших:
top5 = heapq.nlargest(5, data, key=lambda x: x.score)# PyPy — альтернативная реализация CPython с JIT
# Установка: pypy3 вместо python3
# Совместим с большинством Python кода
# Когда PyPy быстрее:
# - Длинные циклы
# - Числовые вычисления без numpy
# - Интерпретируемый код
# Когда PyPy хуже:
# - C-расширения (numpy, scipy) — могут быть медленнее
# - Короткие скрипты (JIT не прогревается)
# - Память: PyPy обычно потребляет больше
# Benchmark: loop 10^8 iterations
# CPython: ~30 sec
# PyPy: ~1 sec (30x speedup!)cProfile и найдите узкое место.list.index() и dict lookup для 10^6 элементов.collections.OrderedDict.__slots__ и без.numpy и измерьте разницу.ProcessPoolExecutor для параллельной обработки списка файлов — измерьте speedup на разном числе воркеров.tracemalloc.Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.