Протокол with, __enter__, __exit__, contextlib
Цель: гарантированное освобождение ресурсов (файлы, соединения, блокировки), даже при исключениях.
Объект является контекстным менеджером, если реализует два метода:
__enter__(self) — вызывается при входе в with; возвращаемое значение присваивается переменной as__exit__(self, exc_type, exc_val, exc_tb) — вызывается при выходе; возврат True подавляет исключениеclass FileManager:
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.file = None
def __enter__(self):
self.file = open(self.filename, self.mode)
return self.file # это попадает в `as f`
def __exit__(self, exc_type, exc_val, exc_tb):
if self.file:
self.file.close()
if exc_type is not None:
print(f"Исключение: {exc_type.__name__}: {exc_val}")
return False # False = не подавлять исключение
with FileManager("test.txt", "w") as f:
f.write("hello")
# Файл закрыт даже если write() вызвал исключение__exit__| Параметр | Значение при ошибке | Значение при успехе |
|---|---|---|
exc_type | Класс исключения | None |
exc_val | Экземпляр исключения | None |
exc_tb | Traceback объект | None |
def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.close()
if exc_type is ValueError:
print(f"Неверное значение: {exc_val}")
return True # подавляем ValueError
return False # остальные исключения пробрасываемВажно понимать порядок выполнения:
with SomeManager() as x:
risky_code() # если здесь исключение...
# → Python вызывает __exit__(exc_type, exc_val, exc_tb)
# → если __exit__ вернул True → исключение подавлено
# → если False или None → исключение пробрасывается
Если __exit__ сам вызывает исключение, оно заменяет оригинальное:
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup() # если cleanup() упадёт — оригинальное исключение потеряется
return False
# Безопасный паттерн:
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.cleanup()
except Exception as e:
if exc_type is None:
raise # только если не было оригинального исключения
# иначе logируем и позволяем оригинальному исключению пройти
logging.error(f"Cleanup failed: {e}")
return False@contextmanagercontextlib.contextmanager превращает генераторную функцию в контекстный менеджер. Код до yield — это __enter__, после — __exit__.
from contextlib import contextmanager
import time
@contextmanager
def timer(label=""):
start = time.perf_counter()
try:
yield # здесь выполняется тело with-блока
finally: # finally гарантирует выполнение при любом исходе
elapsed = time.perf_counter() - start
print(f"{label}: {elapsed:.4f}s")
with timer("database query"):
result = db.execute(query)
# database query: 0.0023syield@contextmanager
def managed_resource(name):
resource = create_resource(name)
print(f"Создан ресурс: {name}")
try:
yield resource # resource попадает в `as r`
except Exception as e:
print(f"Ошибка: {e}")
raise # перебрасываем исключение
finally:
release_resource(resource)
print(f"Ресурс освобождён: {name}")
with managed_resource("connection") as r:
r.query("SELECT 1")@contextmanager@contextmanager
def suppress_and_log(*exceptions):
try:
yield
except exceptions as e:
logging.warning(f"Подавлено исключение: {type(e).__name__}: {e}")
# НЕ re-raise = исключение подавлено
with suppress_and_log(FileNotFoundError, PermissionError):
os.remove("nonexistent.txt") # тихо игнорируется@contextmanager# ❌ Без try/finally — cleanup не выполнится при исключении
@contextmanager
def bad_manager():
resource = acquire()
yield resource
release(resource) # не выполнится если внутри with было исключение!
# ✅ Правильно: всегда try/finally
@contextmanager
def good_manager():
resource = acquire()
try:
yield resource
finally:
release(resource) # выполняется всегда
# ❌ Несколько yield — RuntimeError
@contextmanager
def broken():
yield 1
yield 2 # RuntimeError: generator didn't stopcontextlib — готовые контекстные менеджерыcontextlib.suppress(*exceptions) — подавление исключенийfrom contextlib import suppress
# Эквивалентно try/except pass, но короче
with suppress(FileNotFoundError):
os.remove("maybe_exists.txt")
# Несколько типов исключений:
with suppress(KeyError, IndexError):
value = data["missing_key"][42]contextlib.closing(thing) — автовызов close()from contextlib import closing
# Для объектов с методом close(), но без поддержки with
with closing(urllib.request.urlopen(url)) as response:
data = response.read()
# Аналог:
with closing(csv.DictReader(open("data.csv"))) as reader:
for row in reader:
process(row)contextlib.redirect_stdout() и redirect_stderr()from contextlib import redirect_stdout, redirect_stderr
import io
# Перехват stdout
buf = io.StringIO()
with redirect_stdout(buf):
print("Это пойдёт в buf, не в консоль")
help(len)
output = buf.getvalue()
# Перехват stderr (полезно для тестов)
with redirect_stderr(open(os.devnull, 'w')):
noisy_function() # вывод в stderr подавленcontextlib.nullcontext(enter_result=None) — КМ-заглушкаfrom contextlib import nullcontext
def process(data, lock=None):
# lock может быть реальным Lock или None
ctx = lock if lock is not None else nullcontext()
with ctx:
do_work(data)
# Или для опционального КМ:
def open_file(filename, encoding=None):
ctx = open(filename) if filename else nullcontext(default_data)
with ctx as data:
return process(data)contextlib.ExitStack — динамическое управление несколькими КМfrom contextlib import ExitStack
# Открываем произвольное число файлов
filenames = ['a.txt', 'b.txt', 'c.txt', 'd.txt']
with ExitStack() as stack:
files = [stack.enter_context(open(f)) for f in filenames]
# все файлы открыты; при выходе из with все закроются
for f in files:
process(f.read())
# ExitStack позволяет динамически добавлять КМ во время выполнения:
with ExitStack() as stack:
for resource_name in get_resources():
resource = stack.enter_context(acquire(resource_name))
configure(resource)
# Реестр cleanup-функций:
with ExitStack() as stack:
stack.callback(print, "Cleanup 1") # вызовется при выходе
stack.callback(print, "Cleanup 2") # в обратном порядке
do_work()
# Cleanup 2
# Cleanup 1contextlib.AbstractContextManager — базовый классfrom contextlib import AbstractContextManager
class MyManager(AbstractContextManager):
def __enter__(self):
return self # AbstractContextManager предоставляет __enter__ по умолчанию
# возвращающий self, но лучше переопределить явно
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
return False # AbstractContextManager требует реализовать __exit__with и синтаксис запятой# Старый стиль — вложенные блоки
with open('input.txt') as fin:
with open('output.txt', 'w') as fout:
fout.write(fin.read())
# Новый стиль — несколько КМ через запятую (Python 3.1+)
with open('input.txt') as fin, open('output.txt', 'w') as fout:
fout.write(fin.read())
# С переносом строки (Python 3.10+):
with (
open('input.txt') as fin,
open('output.txt', 'w') as fout,
lock,
):
fout.write(fin.read())При with A() as a, B() as b: порядок такой:
A.__enter__(), затем B.__enter__()B.__exit__(), затем A.__exit__() (обратный порядок, как стек)class Traced:
def __init__(self, name):
self.name = name
def __enter__(self):
print(f"Enter {self.name}")
return self
def __exit__(self, *args):
print(f"Exit {self.name}")
return False
with Traced("A") as a, Traced("B") as b:
print("inside")
# Enter A
# Enter B
# inside
# Exit B
# Exit AДля async with нужно реализовать __aenter__ и __aexit__:
import aiofiles
from contextlib import asynccontextmanager
# Через класс:
class AsyncDB:
async def __aenter__(self):
self.conn = await create_connection()
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.conn.close()
async with AsyncDB() as db:
result = await db.execute("SELECT 1")
# Через @asynccontextmanager:
@asynccontextmanager
async def managed_transaction(conn):
async with conn.transaction():
try:
yield conn
except Exception:
await conn.rollback()
raise
else:
await conn.commit()
async with managed_transaction(db) as conn:
await conn.execute("INSERT ...")AsyncExitStack — для динамических async КМfrom contextlib import AsyncExitStack
async def setup():
async with AsyncExitStack() as stack:
db = await stack.enter_async_context(get_db_connection())
cache = await stack.enter_async_context(get_redis())
stack.callback(log_shutdown) # синхронный callback тоже работает
# при выходе: cache.__aexit__ → db.__aexit__ → log_shutdown()Некоторые менеджеры поддерживают вложенный вход (reentrant):
from contextlib import contextmanager
class RLock:
"""Реентерабельная блокировка с подсчётом входов."""
def __init__(self):
self._count = 0
self._lock = threading.Lock()
def __enter__(self):
if self._count == 0:
self._lock.acquire()
self._count += 1
return self
def __exit__(self, *args):
self._count -= 1
if self._count == 0:
self._lock.release()
return False
rlock = RLock()
with rlock:
print("outer")
with rlock: # OK — не заблокируется
print("inner")contextlib.redirect_stdout — пример реентерабельного КМ из stdlib:
buf = io.StringIO()
with redirect_stdout(buf):
with redirect_stdout(buf): # вложенный — работает корректно
print("nested")from contextlib import contextmanager
@contextmanager
def db_transaction(session):
"""Автоматически коммитит или откатывает транзакцию."""
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
with db_transaction(db_session) as session:
session.add(User(name="Alice"))
session.add(Order(user_id=1, amount=100))
# коммит только если оба успешныimport time
from dataclasses import dataclass
from contextlib import contextmanager
@dataclass
class Elapsed:
value: float = 0.0
@contextmanager
def stopwatch(label="", verbose=True):
"""Измеряет время выполнения блока."""
elapsed = Elapsed()
start = time.perf_counter()
try:
yield elapsed
finally:
elapsed.value = time.perf_counter() - start
if verbose:
print(f"[{label}] {elapsed.value*1000:.2f} ms")
with stopwatch("heavy computation") as t:
result = compute_something()
print(f"Took {t.value:.3f}s")import tempfile
import shutil
@contextmanager
def temp_directory():
"""Создаёт временную директорию, удаляет при выходе."""
tmpdir = tempfile.mkdtemp()
try:
yield tmpdir
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
with temp_directory() as tmpdir:
output_file = os.path.join(tmpdir, "result.csv")
process_to_file(output_file)
upload(output_file)
# директория удалена
# Или встроенный:
with tempfile.TemporaryDirectory() as tmpdir:
pass@contextmanager
def working_directory(path):
"""Временно меняет рабочую директорию."""
old_cwd = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(old_cwd)
with working_directory("/tmp"):
subprocess.run(["make", "all"])import threading
class CircuitBreaker:
"""Отключает вызовы после N ошибок подряд."""
CLOSED = "closed"
OPEN = "open"
def __init__(self, max_failures=3, reset_timeout=60):
self._state = self.CLOSED
self._failures = 0
self._max = max_failures
self._reset_at = None
self._lock = threading.Lock()
def __enter__(self):
with self._lock:
if self._state == self.OPEN:
if time.time() < self._reset_at:
raise RuntimeError("Circuit breaker is OPEN")
self._state = self.CLOSED
self._failures = 0
return self
def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
if exc_type is not None:
self._failures += 1
if self._failures >= self._max:
self._state = self.OPEN
self._reset_at = time.time() + 60
logging.warning("Circuit OPEN: too many failures")
return False
cb = CircuitBreaker(max_failures=3)
for _ in range(10):
try:
with cb:
call_external_service()
except RuntimeError as e:
print(f"Circuit open: {e}")unittest.mock.patch)from unittest.mock import patch
# patch — это контекстный менеджер
with patch('mymodule.requests.get') as mock_get:
mock_get.return_value.json.return_value = {"status": "ok"}
result = my_function_that_calls_requests()
mock_get.assert_called_once()import queue
class ConnectionPool:
def __init__(self, create_fn, size=5):
self._pool = queue.Queue(maxsize=size)
for _ in range(size):
self._pool.put(create_fn())
@contextmanager
def acquire(self, timeout=5):
try:
conn = self._pool.get(timeout=timeout)
except queue.Empty:
raise TimeoutError("No connections available")
try:
yield conn
finally:
self._pool.put(conn) # всегда возвращаем в пул
pool = ConnectionPool(lambda: psycopg2.connect(DSN), size=10)
with pool.acquire() as conn:
conn.execute("SELECT 1")
# соединение вернулось в пулwith напрямуюimport pytest
from contextlib import contextmanager
@contextmanager
def managed_file(path):
f = open(path)
try:
yield f
finally:
f.close()
def test_managed_file(tmp_path):
p = tmp_path / "test.txt"
p.write_text("hello")
with managed_file(p) as f:
assert f.read() == "hello"
# Тестирование подавления исключений:
def test_suppresses_error():
with suppress(ValueError):
raise ValueError("should be suppressed")
# дошли сюда = исключение подавлено, тест проходит__exit__ с исключениемdef test_exit_called_on_error():
events = []
@contextmanager
def traced():
events.append("enter")
try:
yield
except Exception:
events.append("exception")
raise
finally:
events.append("exit")
with pytest.raises(RuntimeError):
with traced():
raise RuntimeError("test")
assert events == ["enter", "exception", "exit"]| Подход | Когда использовать |
|---|---|
Класс с __enter__/__exit__ | Сложная логика, состояние, наследование, реентерабельность |
@contextmanager | Простые случаи, читаемый код, нет состояния |
contextlib.suppress | Подавление конкретных исключений |
contextlib.ExitStack | Динамическое количество ресурсов |
contextlib.nullcontext | Опциональный контекст (тест vs прод) |
@asynccontextmanager | Async ресурсы с простой логикой |
AsyncExitStack | Динамические async ресурсы |
@contextmanager для блокировки threading.Lock с таймаутом, который поднимает TimeoutError если не удалось получить блокировку за N секунд.redirect_stdout(to), перенаправляющий sys.stdout в файл или StringIO.ExitStack-based функцию open_all(filenames), открывающую список файлов с гарантией закрытия всех при ошибке на любом из них.Timer КМ, который накапливает общее время вложенных измерений.@contextmanager требует try/finally, а не просто yield — что произойдёт с ресурсом при исключении без finally?async with пул асинхронных HTTP-соединений используя AsyncExitStack.Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.