Типичные ошибки Telethon, повторные попытки, логирование, graceful shutdown.
Ключевая идея: Telethon предоставляет иерархию исключений для обработки различных типов ошибок. Правильная обработка ошибок — ключ к созданию устойчивых production-приложений.
Все исключения Telethon наследуются от RPCError:
RPCError (базовый класс)
├── AuthKeyDuplicatedError
├── SessionPasswordNeededError
├── FloodWaitError
├── ChatWriteForbiddenError
├── MessageIdInvalidError
├── UserAlreadyParticipantError
├── ChannelPrivateError
└── ... (многие другие)
Ключевая идея: каждое конкретное исключение соответствует определённому коду ошибки Telegram API. Это позволяет точно определять причину и реагировать соответствующим образом.
from telethon.errors import RPCError
try:
await client.send_message(chat, 'Привет!')
except RPCError as e:
print(f'Ошибка API: {e}')from telethon.errors import (
FloodWaitError,
ChatWriteForbiddenError,
MessageIdInvalidError,
UserAlreadyParticipantError
)
async def safe_send(chat, text):
try:
await client.send_message(chat, text)
except FloodWaitError as e:
print(f'FloodWait: ждём {e.seconds} секунд')
except ChatWriteForbiddenError:
print('Нет прав на запись в этом чате')
except MessageIdInvalidError:
print('Неверный ID сообщения')
except UserAlreadyParticipantError:
print('Пользователь уже в чате')FloodWaitError — самая частая ошибка. Она содержит время ожидания:
from telethon.errors import FloodWaitError
import asyncio
async def send_with_retry(chat, text):
while True:
try:
await client.send_message(chat, text)
break # Успех
except FloodWaitError as e:
print(f'FloodWait: {e.seconds} сек')
await asyncio.sleep(e.seconds)Правило: всегда обрабатывайте FloodWaitError с ожиданием указанного времени. Игнорирование может привести к более длительной блокировке.
from telethon import TelegramClient
# Включаем автоматическую обработку FloodWait
client = TelegramClient('session', api_id, api_hash)
# Telethon автоматически ждёт при FloodWait по умолчаниюВажно: Telethon по умолчанию автоматически обрабатывает FloodWaitError, ожидая указанное время. Но для полного контроля лучше обрабатывать вручную.
import logging
import traceback
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)
@client.on(events.NewMessage)
async def safe_handler(event):
try:
await process_message(event)
except FloodWaitError as e:
logger.warning(f'FloodWait: {e.seconds}s')
await asyncio.sleep(e.seconds)
except Exception as e:
logger.error(f'Неожиданная ошибка: {e}')
traceback.print_exc()
# Опционально: уведомить админа
await client.send_message('admin_chat', f'Ошибка: {e}')async def retry_send(chat, text, max_retries=3):
for attempt in range(max_retries):
try:
await client.send_message(chat, text)
return # Успех
except FloodWaitError as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(e.seconds)
except Exception:
if attempt == max_retries - 1:
raise
await asyncio.sleep(1)async def exponential_retry(chat, text, max_retries=5):
for attempt in range(max_retries):
try:
await client.send_message(chat, text)
return
except Exception as e:
delay = min(2 ** attempt, 60) # Максимум 60 сек
logger.warning(f'Попытка {attempt + 1} failed: {e}. Ждём {delay}s')
await asyncio.sleep(delay)
raise Exception('Превышено максимальное число попыток')Корректное завершение работы клиента:
import signal
import asyncio
shutdown_event = asyncio.Event()
def signal_handler():
print('Получен сигнал завершения')
shutdown_event.set()
async def main():
await client.start()
print('Клиент запущен. Ctrl+C для остановки.')
# Ждём сигнала завершения
await shutdown_event.wait()
print('Завершение работы...')
await client.disconnect()
print('Клиент отключён')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, signal_handler)
try:
loop.run_until_complete(main())
finally:
loop.close()Ключевая идея: graceful shutdown гарантирует, что клиент корректно закроет соединение и сохранит сессию. Это предотвращает потерю данных и повреждение сессии.
import logging
from datetime import datetime
# Настройка логирования
logging.basicConfig(
filename='telethon_errors.log',
level=logging.ERROR,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
@client.on(events.NewMessage)
async def logged_handler(event):
try:
await process(event)
except FloodWaitError as e:
logging.warning(f'FloodWait в чате {event.chat_id}: {e.seconds}s')
except Exception as e:
logging.error(
f'Ошибка в чате {event.chat_id}: {type(e).__name__}: {e}',
exc_info=True
)
raise # Пробрасываем дальшеfrom telethon.errors import (
ApiIdInvalidError,
PhoneNumberInvalidError,
PhoneCodeInvalidError,
SessionPasswordNeededError
)
try:
await client.start()
except ApiIdInvalidError:
print('Неверный API ID или Hash')
except PhoneNumberInvalidError:
print('Неверный формат номера')
except PhoneCodeInvalidError:
print('Неверный код подтверждения')
except SessionPasswordNeededError:
print('Требуется пароль 2FA')from telethon.errors import (
MessageNotModifiedError,
MessageDeleteForbiddenError,
MessageEmptyError
)
try:
await client.edit_message(chat, msg_id, new_text)
except MessageNotModifiedError:
print('Текст не изменился')
except MessageDeleteForbiddenError:
print('Нет прав на удаление/редактирование')
except MessageEmptyError:
print('Пустое сообщение')from telethon.errors import (
ChannelPrivateError,
ChatAdminRequiredError,
UsernameNotOccupiedError
)
try:
chat = await client.get_entity('username')
except UsernameNotOccupiedError:
print('Username не найден')
except ChannelPrivateError:
print('Закрытый канал, нет доступа')
except ChatAdminRequiredError:
print('Требуются права администратора')class TelethonAppError(Exception):
"""Базовое исключение приложения"""
pass
class RateLimitError(TelethonAppError):
def __init__(self, wait_time):
self.wait_time = wait_time
super().__init__(f'Лимит превышен. Ждём {wait_time}s')
class PermissionError(TelethonAppError):
pass
async def handle_with_custom(chat, text):
try:
await client.send_message(chat, text)
except FloodWaitError as e:
raise RateLimitError(e.seconds)
except (ChatWriteForbiddenError, ChatAdminRequiredError) as e:
raise PermissionError(f'Нет прав: {e}')# Отправка ошибок админу
async def notify_admin(error_msg):
admin_id = 12345678
await client.send_message(admin_id, f'⚠️ Ошибка:\n{error_msg}')
@client.on(events.NewMessage)
async def monitored_handler(event):
try:
await process(event)
except Exception as e:
error_msg = f'Chat {event.chat_id}: {type(e).__name__}: {e}'
await notify_admin(error_msg)
raiseОбработка ошибок защищает ваше приложение от сбоев. Но Telegram также ограничивает частоту запросов. Следующая тема — лимиты и FloodWait — расскажет о стратегиях обхода ограничений.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.