Использование Locust для нагрузочного тестирования приложения FastAPI с параллельными user
Напишите тесты для user в обычном коде Python , затем запустите тесты с параллельными user.
Я только что закончил свое первое приложение FastAPI . Это приложение позволяет user иметь свои собственные элементы, что означает, что все модели данных user имеют поле user_id. Ничего особенного, но поскольку FastAPI вводит некоторые новые для меня вещи, такие как Dependency Injection, я не был уверен, что мое приложение будет работать так, как я хочу.
Мой вопрос заключался в следующем: если я проведу нагрузочное тестирование API с большим количеством одновременных user, будет ли оно работать?
Предупреждение. Проводите нагрузочный тест на своем компьютере только в том случае, если вы уверены, что он справится.
Определение теста
Мне нужен был инструмент, который может:
- инициализировать несколько users
- параллельно запускать тесты для каждого user
- Делать это как можно быстрее
Чтобы инициализировать user , я должен зарегистрировать user с уникальным адресом электронной почты. После этого я получаю токен доступа для этого user, и устанавливаю заголовок для операций GET и POST .
Мне нужен следующий тест для каждого user:
- Добавить элемент с уникальным именем (POST)
Это возвращает добавленный элемент, включая его id. - Считать элемент по id (GET)
Возвращается элемент. - Проверьте, равно ли имя в возвращенном элементе уникальному имени.
Выбор Locust для проверки
Для тривиальных веб-приложений я использую Apache Bench (ab), очень простой в использовании инструмент для стресс-тестирования. Но здесь он не очень полезен, потому что нам нужно выполнить инициализацию и написать (простой) тест.
Я уже написал много тестов для моего приложения FastAPI с Pytest, и проблем не было. Я читал о pytest-xdist , но похоже, что у него есть проблемы и/или он слишком медленный(?).
В этом случае я решил использовать Locust . Он прост в использовании, и вы можете кодировать свои тесты в Python. Замечательно.
Код
Ниже приведен код для теста. Некоторые примечания:
import logging
Я также записываю все в файл.
def unique_str()
В тесте мне нужны уникальные адреса электронной почты и названия городов. Мы могли бы также UUID4 здесь.
network_timeout, connection_timeout
Я не хочу, чтобы тест провалился, но клиент должен подождать.
wait_time
Я хочу, чтобы следующий тест начинался сразу после завершения текущего.
фиктивная задача
Я включаю эту задачу и отключаю все остальные при разработке метода 'on_start'.
(POST/GET) catch_response=True
Вместе с конструкцией 'with' это позволяет нам пометить запрос как неудачный, вызвав response.failure().
(GET) name=<URL>
Locust показывает все URL, но здесь GET имеет каждый раз другой URL из-за ID. Используя 'name=', он показывает только этот URL.
Проверка кода состояния
Locust считает запрос успешным, если код ответа <400, но я хочу явно проверить это значение.
RescheduleTask()
При возникновении ошибки нет необходимости продолжать задачу, поэтому мы поднимаем исключение. Можно сделать более продвинуто.
# test_cities_wrc.py
import logging
import random
import time
from locust import HttpUser, task, between
from locust.exception import RescheduleTask
test_email_host = 'my-local-test.test'
def unique_str():
return '{:.10f}.{}'.format(time.time(), random.randint(1000, 9999))
def unique_email():
return 'e.' + unique_str() + '@' + test_email_host
def unique_city_name():
return 'c.' + unique_str()
class WebsiteTestUser(HttpUser):
network_timeout = 30.0
connection_timeout = 30.0
#wait_time = between(0.5, 3.0)
def on_start(self):
base_url = 'http://127.0.0.1:8020/api/v1'
# set up urls
register_url = base_url + '/auth/register'
get_token_url = base_url + '/auth/token'
# urls used in task
self.cities_create_url = base_url + '/cities'
self.cities_get_by_id_url = base_url + '/cities/'
# get unique email
email = unique_email()
password = 'abcdefghi'
# register
response = self.client.post(
register_url,
json={'email': email, 'password': password},
)
if response.status_code != 201:
error_msg = 'register: response.status_code = {}, expected 201'.format(response.status_code)
logging.error(error_msg)
# get_token
# - username instead of email
# - x-www-form-urlencoded (instead of json)
response = self.client.post(
get_token_url,
data={'username': email, 'password': password},
)
access_token = response.json()['access_token']
logging.debug('get_token: for email = {}, access_token = {}'.format(email, access_token))
# set headers with access token
self.headers = {'Authorization': 'Bearer ' + access_token}
def on_stop(self):
pass
# enable this dummy task to develop 'on_start'
#@task
def dummy(self):
pass
@task
def cities_write_read_check(self):
# add city to api
city_name = unique_city_name()
logging.debug('cities_create: city_name = {}'.format(city_name))
with self.client.post(
self.cities_create_url,
json={'name': city_name},
headers=self.headers,
catch_response=True,
) as response:
if response.status_code != 201:
error_msg = 'cities_create: response.status_code = {}, expected 201, city_name = {}'.format(response.status_code, city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
response_dict = response.json()
if 'data' not in response_dict:
error_msg = 'cities_create: data not in response_dict, city_name = {}'.format(city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
city_id = response_dict['data']['id']
logging.debug('cities_create: for city_name = {}, city_id = {}'.format(city_name, city_id))
# get city from api and check
with self.client.get(
self.cities_get_by_id_url + city_id,
headers=self.headers,
name=self.cities_get_by_id_url + 'uuid',
catch_response=True,
) as response:
if response.status_code != 200:
error_msg = 'cities_get_by_id: response.status_code = {}, expected 200, city_name = {}'.format(response.status_code, city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
if 'data' not in response_dict:
error_msg = 'cities_get_by_id: data not in response_dict, city_name = {}'.format(city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
city_name_returned = response_dict['data']['name']
logging.debug('cities_get_by_id: for city_id = {}, city_name_returned = {}'.format(city_id, city_name_returned))
if city_name_returned != city_name:
error_msg = 'cities_get_by_id: city_name_returned = {} not equal city_name = {}'.format(city_name_returned, city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
Запуск теста
ПРЕДУПРЕЖДЕНИЕ: Когда вы запускаете подобный тест, вы нагружаете ваш CPU!
Чтобы запустить тест и записать лог в файл 'lc.log', откройте терминал и введите команду:
locust -L DEBUG --logfile lc.log -f ./test_cities_wrc.py
А затем в браузере введите:
http://localhost:8089/
Затем, например, введите:
- Количество всего user для моделирования: 20
- Скорость нереста (users нерестятся/секунду): 20
- Хозяин: http:127.0.0.1
и нажмите кнопку 'Start swarming'. Сразу же вы должны услышать, как вентиляторы вашего компьютера начинают работать на максимальной скорости...
Каков результат?
Я действительно обнаружил проблему в своем коде. Эта проблема не проявилась с Pytest. Оказалось, что где-то в моем AccessControlManager Class , используемом с Dependency Injection, переменная могла быть перезаписана другим запросом. Как следствие, возвращался неверный user_id и тест не выполнялся, но не всегда, а когда два или более user обращались к API в один и тот же момент.
Резюме
Я не был уверен, что мое первое приложение FastAPI будет работать под нагрузкой с большим количеством одновременных user. Locust позволил мне смоделировать эту ситуацию очень простым способом, закодировав тест в Python. Locust имеет множество опций, а также может работать без головы, то есть без браузера.
Этот тест помог мне не только обнаружить проблему, но и изолировать ее. Я устранил проблему, но это также побудило меня прочитать больше о Dependency Injection в FastAPI.
Ссылки / кредиты
Locust
https://docs.locust.io/en/stable/
Оставить комментарий
Комментируйте анонимно или войдите в систему, чтобы прокомментировать.
Комментарии (1)
Оставьте ответ
Ответьте анонимно или войдите в систему, чтобы ответить.
Hello ! This is a nice article. But I was wondering one thing :
Imagine you have setup your engine like this :
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size=5, max_overflow=5)
And I have setup the middleware just like you said, when you try to start 11 requests at the same time, it just hangs. There's not really a queue or nothing. Is this because my routes are async ?
Thanks !
Недавний
- Скрытие первичных ключей базы данных UUID вашего веб-приложения
- Don't Repeat Yourself (DRY) с Jinja2
- SQLAlchemy, PostgreSQL, максимальное количество строк для user
- Показать значения в динамических фильтрах SQLAlchemy
- Безопасная передача данных с помощью шифрования Public Key и pyNaCl
- rqlite: альтернатива dist с высокой степенью готовности и SQLite
Большинство просмотренных
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb
- Подключение к службе на хосте Docker из контейнера Docker
- Использование PyInstaller и Cython для создания исполняемого файла Python
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов
- Flask Удовлетворительный запрос API проверка параметров запроса с помощью схем Маршмэллоу