Как Telegram ограничивает запросы, обработка FloodWait, стратегии обхода ограничений.
Ключевая идея: Telegram ограничивает количество запросов с одного аккаунта для предотвращения спама и злоупотреблений. Понимание лимитов и правильная стратегия обхода — критический навык для production-приложений.
Telegram использует систему rate limiting для контроля частоты запросов. При превышении лимита сервер возвращает ошибку FloodWaitError с указанием времени ожидания.
from telethon.errors import FloodWaitError
try:
await client.send_message(chat, 'Сообщение')
except FloodWaitError as e:
print(f'Превышен лимит. Ждём {e.seconds} секунд')Ключевая идея: FloodWaitError — это не блокировка аккаунта, а просьба подождать. Если вы ждёте указанное время, запрос будет обработан.
Telegram не публикует точные значения лимитов, но эмпирически определены следующие диапазоны:
| Операция | Примерный лимит |
|---|---|
| Отправка сообщений | 20-30 в минуту (личные), 5-10 в минуту (группы) |
| Присоединение к чатам | 5-10 в минуту |
| Добавление участников | 50 в день (группы), меньше для новых аккаунтов |
| Скачивание медиа | Ограничено пропускной способностью |
| Запрос участников | 100-200 запросов в минуту |
| Редактирование сообщений | 30 в минуту |
Важно: лимиты зависят от возраста аккаунта, репутации и типа операции. Новые аккаунты имеют более строгие ограничения.
import asyncio
from telethon.errors import FloodWaitError
async def safe_send(chat, text):
try:
await client.send_message(chat, text)
except FloodWaitError as e:
print(f'FloodWait: {e.seconds}s')
await asyncio.sleep(e.seconds)
await client.send_message(chat, text) # ПовторTelethon по умолчанию автоматически обрабатывает FloodWait:
# Telethon автоматически ждёт при FloodWait
# Это поведение включено по умолчанию
client = TelegramClient('session', api_id, api_hash)Правило: несмотря на автоматическую обработку, рекомендуется явно обрабатывать FloodWait для логирования и предотвращения каскадных задержек.
import asyncio
async def send_bulk(chats, message):
for chat in chats:
try:
await client.send_message(chat, message)
await asyncio.sleep(1) # 1 секунда между сообщенияами
except FloodWaitError as e:
print(f'FloodWait: {e.seconds}s')
await asyncio.sleep(e.seconds)async def process_in_batches(items, batch_size=10, delay=5):
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
# Обрабатываем батч параллельно
await asyncio.gather(*[process_item(item) for item in batch])
# Ждём между батчами
print(f'Обработано {i + len(batch)}/{len(items)}')
await asyncio.sleep(delay)import random
async def send_with_random_delay(chat, text, min_delay=1, max_delay=3):
await client.send_message(chat, text)
# Случайная задержка для имитации человеческого поведения
delay = random.uniform(min_delay, max_delay)
await asyncio.sleep(delay)Ключевая идея: рандомизация задержек делает поведение менее «роботизированным» и снижает риск обнаружения автоматизации.
import time
from collections import defaultdict
class RateLimiter:
def __init__(self, max_requests=30, window=60):
self.max_requests = max_requests
self.window = window
self.requests = defaultdict(list)
def can_request(self, chat_id):
now = time.time()
# Удаляем старые запросы за пределами окна
self.requests[chat_id] = [
t for t in self.requests[chat_id]
if now - t < self.window
]
return len(self.requests[chat_id]) < self.max_requests
def record_request(self, chat_id):
self.requests[chat_id].append(time.time())
limiter = RateLimiter(max_requests=20, window=60)
async def rate_limited_send(chat, text):
if not limiter.can_request(chat):
print('Лимит приближается. Ждём')
await asyncio.sleep(10)
limiter.record_request(chat)
await client.send_message(chat, text)# Плохо — мгновенный бан
for chat in all_chats:
await client.send_message(chat, 'Спам!') # Без задержек
# Хорошо — с задержками
for chat in all_chats:
await client.send_message(chat, 'Сообщение')
await asyncio.sleep(random.uniform(5, 15))# Хорошо — с задержками между батчами
async def scrape_safely(chat, limit=1000):
messages = []
batch_size = 100
async for msg in client.iter_messages(chat, limit=limit):
messages.append(msg)
if len(messages) % batch_size == 0:
print(f'Загружено {len(messages)}')
await asyncio.sleep(2) # Пауза каждые 100 сообщений
return messages# Ограниченное количество в день
async def add_users_safely(chat, users):
max_per_day = 50
added = 0
for user in users:
if added >= max_per_day:
print('Дневной лимит достигнут')
break
try:
await client(InviteToChannelRequest(chat, [user]))
added += 1
await asyncio.sleep(random.uniform(10, 30))
except FloodWaitError as e:
print(f'FloodWait: {e.seconds}s')
await asyncio.sleep(e.seconds)
except UserAlreadyParticipantError:
continueimport logging
flood_count = 0
async def track_floods():
global flood_count
try:
# Ваш код
pass
except FloodWaitError as e:
flood_count += 1
logging.warning(f'FloodWait #{flood_count}: {e.seconds}s')
if flood_count > 10:
logging.error('Слишком много FloodWait. Останавливаюсь.')
raise# Динамическое увеличение задержки при FloodWait
base_delay = 1
flood_multiplier = 2
async def adaptive_send(chat, text):
global base_delay
try:
await client.send_message(chat, text)
base_delay = max(1, base_delay // 2) # Уменьшаем при успехе
except FloodWaitError as e:
base_delay *= flood_multiplier # Увеличиваем при ошибке
await asyncio.sleep(max(e.seconds, base_delay))
await client.send_message(chat, text)| Характеристика | FloodWait | Блокировка |
|---|---|---|
| Длительность | Секунды/минуты | Часы/дни/навсегда |
| Причина | Превышение лимита запросов | Спам, жалобы, подозрительная активность |
| Решение | Подождать указанное время | Ждать или связаться с поддержкой |
| Ошибка | FloodWaitError | AuthKeyDuplicatedError, UnauthorizedError |
Правило: частые FloodWait могут привести к блокировке. Соблюдайте лимиты и используйте задержки.
Лимиты — это последнее препятствие перед деплоем. Следующая тема — деплой и мониторинг — расскажет о запуске Telethon-приложений в production.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.