Реакция на новые сообщения, редактирования, вступления пользователей. Декораторы и фильтры событий.
Ключевая идея: Обработчики событий позволяют реагировать на действия в Telegram в реальном времени. Вместо опроса (polling) вы регистрируете функции-обработчики, которые вызываются автоматически при наступлении события.
Telethon использует систему событий, похожую на event-driven программирование. Вы регистрируете обработчик, и он вызывается каждый раз, когда происходит определённое событие:
from telethon import events
@client.on(events.NewMessage)
async def handler(event):
print(f'Новое сообщение: {event.text}')Ключевая идея: В отличие от polling (периодического опроса), события работают мгновенно и не тратят ресурсы на пустые запросы.
Самый используемый тип события — новые сообщения:
@client.on(events.NewMessage)
async def handle_all(event):
print(f'[{event.chat_id}] {event.text}')
# Запуск клиента для получения событий
client.start()
client.run_until_disconnected()# Только конкретные чаты
@client.on(events.NewMessage(chats=['chat1', 'chat2']))
async def handle_specific(event):
print(f'Сообщение из monitored чата: {event.text}')
# Исключение чатов
@client.on(events.NewMessage(not_chats=['spam_chat']))
async def handle_except(event):
print(f'Сообщение не из чёрного списка: {event.text}')import re
# Простая строка (регистронезависимый поиск)
@client.on(events.NewMessage(pattern='привет'))
async def handle_greeting(event):
await event.respond('Привет!')
# Regex — более сложные паттерны
@client.on(events.NewMessage(pattern=r'(?i)^!command\s+(\w+)'))
async def handle_command(event):
command = event.pattern_match.group(1)
await event.respond(f'Выполнил команду: {command}')Важно:
patternподдерживает как строки, так и regex-объекты. Флаг(?i)делает поиск регистронезависимым.
# Только от определённых пользователей
@client.on(events.NewMessage(from_users=['username1', 12345678]))
async def handle_from_user(event):
print(f'Сообщение от конкретного пользователя: {event.text}')
# Только свои сообщения
@client.on(events.NewMessage(outgoing=True))
async def handle_own(event):
print(f'Моё сообщение: {event.text}')
# Только входящие (не мои)
@client.on(events.NewMessage(incoming=True))
async def handle_incoming(event):
print(f'Входящее сообщение: {event.text}')Когда пользователь нажимает inline-кнопку, возникает событие CallbackQuery:
@client.on(events.CallbackQuery)
async def handle_callback(event):
print(f'Нажата кнопка: {event.data.decode()}')
await event.answer('Кнопка нажата!')@client.on(events.CallbackQuery(data=b'start_action'))
async def handle_start(event):
await event.edit('Кнопка Start нажата ✓')
await event.answer()
@client.on(events.CallbackQuery(data=b'stop_action'))
async def handle_stop(event):
await event.edit('Кнопка Stop нажата ✓')
await event.answer()Правило: всегда вызывайте
event.answer()после обработки callback. Это убирает «загрузку» на кнопке и подтверждает обработку.
События действий пользователей в чате:
@client.on(events.ChatAction)
async def handle_action(event):
if event.user_added:
print(f'Пользователь {event.user_added[0]} добавлен в чат')
elif event.user_joined:
print(f'Пользователь {event.user_joined} присоединился')
elif event.user_left:
print(f'Пользователь {event.user_left} покинул чат')
elif event.user_banned:
print(f'Пользователь {event.user_banned} забанен')# Только вступления
@client.on(events.ChatAction(func=lambda e: e.user_joined))
async def handle_join(event):
await event.respond(f'Добро пожаловать, {event.user_joined.first_name}!')
# Только выходы
@client.on(events.ChatAction(func=lambda e: e.user_left))
async def handle_leave(event):
print(f'Пока, {event.user_left.first_name}')@client.on(events.MessageEdited)
async def handle_edit(event):
print(f'Сообщение {event.id} изменено')
print(f'Новый текст: {event.text}')# Сохраняем оригинал
message_cache = {}
@client.on(events.NewMessage)
async def cache_message(event):
message_cache[event.id] = event.text
@client.on(events.MessageEdited)
async def track_edit(event):
old_text = message_cache.get(event.id, 'неизвестно')
print(f'Было: {old_text}')
print(f'Стало: {event.text}')
message_cache[event.id] = event.text@client.on(events.MessageDeleted)
async def handle_delete(event):
print(f'Удалены сообщения: {event.deleted_ids}')
print(f'В чате: {event.chat_id}')Важно: событие MessageDeleted содержит список ID удалённых сообщений, но не их содержимое. Если вам нужен текст удалённого сообщения — кешируйте его заранее.
Событие Album срабатывает при получении группы медиа (альбом):
@client.on(events.Album)
async def handle_album(event):
print(f'Получен альбом из {len(event.messages)} элементов')
for msg in event.messages:
if msg.photo:
print('Фото')
elif msg.video:
print('Видео')
elif msg.document:
print('Документ')Параметр func принимает функцию, которая решает, обрабатывать ли событие:
# Фильтр по длине сообщения
@client.on(events.NewMessage(func=lambda e: len(e.text) > 100))
async def handle_long(event):
print(f'Длинное сообщение: {len(event.text)} символов')
# Фильтр по наличию медиа
@client.on(events.NewMessage(func=lambda e: e.media is not None))
async def handle_media(event):
print(f'Сообщение с медиа')
# Комбинированный фильтр
@client.on(events.NewMessage(
chats=['main_chat'],
func=lambda e: e.text.startswith('!') and not e.out
))
async def handle_command(event):
print(f'Команда в основном чате: {event.text}')Можно регистрировать несколько обработчиков на одно событие:
@client.on(events.NewMessage)
async def handler1(event):
print('Обработчик 1')
@client.on(events.NewMessage)
async def handler2(event):
print('Обработчик 2')
# Оба обработчика вызовутся при новом сообщенииПравило: порядок вызова обработчиков не гарантирован. Не полагайтесь на порядок выполнения.
Для предотвращения дальнейшей обработки события используйте raise events.StopPropagation:
from telethon.events import StopPropagation
@client.on(events.NewMessage(pattern='!stop'))
async def stop_handler(event):
await event.respond('Обработка остановлена')
raise StopPropagation
@client.on(events.NewMessage)
async def other_handler(event):
# Не вызовется для '!stop', так как StopPropagation
print('Это не напечатается для команды !stop')import traceback
@client.on(events.NewMessage)
async def safe_handler(event):
try:
# Ваш код
await process_message(event)
except Exception as e:
print(f'Ошибка в обработчике: {e}')
traceback.print_exc()
await event.respond('Произошла ошибка при обработке')def main():
# Регистрируем обработчики
setup_handlers()
# Запускаем и слушаем события
client.start()
print('Слушаю события...')
client.run_until_disconnected()
def setup_handlers():
@client.on(events.NewMessage(pattern='привет'))
async def greet(event):
await event.respond('Привет!')
@client.on(events.CallbackQuery)
async def callback(event):
await event.answer('OK')
if __name__ == '__main__':
main()Ключевая идея:
run_until_disconnected()блокирует выполнение и слушает события до отключения. Это стандартный способ запуска юзербота.
Обработчики событий — это сердце интерактивного юзербота. Следующая тема — работа с пользователями — научит вас получать профили, искать людей и определять их роли в чатах.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.