Встроенные события Locust, добавление слушателей, создание плагинов, кастомные метрики.
Events API — механизм расширения Locust. Через него можно подписаться на любое событие жизненного цикла теста: старт, стоп, каждый запрос, инициализация worker'а — и добавить кастомную логику без изменения locustfile.
Locust предоставляет набор событий, доступных через locust.events или через аргумент environment.events. Наиболее используемые:
init — срабатывает один раз при инициализации Locust (до запуска теста)test_start — перед началом теста (после нажатия Start в UI или в headless-режиме)test_stop — после остановки тестаrequest — после каждого HTTP-запроса (или после events.request.fire() для кастомных протоколов)user_error — при необработанном исключении в задаче пользователяspawning_complete — когда достигнуто целевое число пользователейquitting — перед завершением процесса LocustСамый чистый способ — использовать декоратор @events.event_name.add_listener:
from locust import events
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
print(f"Тест начался. Целевых пользователей: {environment.parsed_options.num_users}")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
print("Тест завершён")
# Здесь можно сохранить итоговую статистикуАргумент **kwargs обязателен — Locust может передавать дополнительные параметры в зависимости от версии, и без **kwargs функция упадёт с TypeError.
Событие request срабатывает после каждого запроса — как успешного, так и упавшего. Это ключевое событие для интеграции с внешними системами мониторинга:
import influxdb_client
from locust import events
@events.init.add_listener
def on_init(environment, **kwargs):
# Создаём клиент InfluxDB один раз при инициализации
environment.influx_client = influxdb_client.InfluxDBClient(
url="http://localhost:8086",
token="my-token",
org="my-org"
)
environment.write_api = environment.influx_client.write_api()
@events.request.add_listener
def on_request(
request_type, name, response_time,
response_length, exception, **kwargs
):
environment = kwargs.get("environment")
point = (
influxdb_client.Point("locust_requests")
.tag("name", name)
.tag("request_type", request_type)
.field("response_time", response_time)
.field("success", 0 if exception else 1)
)
if environment and hasattr(environment, "write_api"):
environment.write_api.write(bucket="locust", record=point)Параметры события request:
request_type — тип запроса ("GET", "POST", "WebSocket" и т.д.)name — имя/URL запросаresponse_time — время в миллисекундахresponse_length — размер ответа в байтахexception — объект исключения или NoneЧерез событие init_command_line_parser можно добавить собственные флаги командной строки:
from locust import events
@events.init_command_line_parser.add_listener
def add_custom_args(parser, **kwargs):
parser.add_argument(
"--target-env",
type=str,
default="staging",
help="Целевое окружение: staging или production"
)
parser.add_argument(
"--test-scenario",
type=str,
default="default",
help="Имя сценария для запуска"
)
@events.init.add_listener
def on_init(environment, **kwargs):
env = environment.parsed_options.target_env
scenario = environment.parsed_options.test_scenario
print(f"Запуск сценария '{scenario}' против '{env}'")После этого можно запускать Locust с кастомными флагами:
locust -f locustfile.py --target-env production --test-scenario checkoutВ distributed режиме нужно разделить данные между worker'ами. Событие init срабатывает на обоих — master и worker. Для логики, специфичной только для worker'ов, используйте проверку типа runner'а:
from locust import events
from locust.runners import WorkerRunner
@events.init.add_listener
def on_init(environment, **kwargs):
if isinstance(environment.runner, WorkerRunner):
# Этот код выполняется только на worker'ах
worker_idx = environment.runner.worker_index
print(f"Worker {worker_idx} инициализирован")
environment.test_users = load_users_for_worker(worker_idx)
else:
# Этот код выполняется на master'е или при одиночном запуске
print("Master или одиночный запуск")Иногда нужно определить кастомные события внутри locustfile — например, сигнализировать о завершении бизнес-транзакции. Используйте EventHook:
from locust import User, task, between, events
from locust.event import EventHook
# Создаём кастомное событие
checkout_completed = EventHook()
# Регистрируем слушателя
@checkout_completed.add_listener
def on_checkout(order_id, total_price, **kwargs):
print(f"Заказ {order_id} оформлен на сумму {total_price}")
class ShopUser(User):
wait_time = between(1, 3)
@task
def make_purchase(self):
# ... выполнение HTTP-запросов ...
order_id = "12345"
total = 1500
# Стреляем кастомным событием
checkout_completed.fire(order_id=order_id, total_price=total)Хорошая практика — выносить логику событий в отдельный модуль (плагин) и подключать его через импорт. Это позволяет переиспользовать один плагин в нескольких locustfile-ах:
# plugins/influx_reporter.py
from locust import events
def register(environment):
"""Вызывается из locustfile для подключения плагина."""
@events.test_start.add_listener
def on_start(env=environment, **kwargs):
env._influx = create_influx_connection()
@events.request.add_listener
def on_request(request_type, name, response_time, exception, **kwargs):
write_to_influx(
environment._influx,
name=name,
response_time=response_time,
failed=exception is not None
)
@events.test_stop.add_listener
def on_stop(env=environment, **kwargs):
env._influx.close()Подключение плагина в locustfile:
# locustfile.py
from locust import events
from plugins.influx_reporter import register
@events.init.add_listener
def on_init(environment, **kwargs):
register(environment)Такая архитектура делает locustfile чистым, а плагин — тестируемым и переиспользуемым.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.