Uso de Locust para probar la carga de una aplicación FastAPI con users concurrentes
Escriba las pruebas para users en el código regular de Python , luego ejecute las pruebas con users concurrentes.
Acabo de completar mi primera aplicación FastAPI . Esta app permite que los user tengan sus propios elementos, lo que significa que los modelos de datos user tienen todos un campo user_id. Nada especial, pero como FastAPI introduce algunas cosas nuevas para mí, como Dependency Injection, no estaba seguro de si mi aplicación funcionaría como yo quería.
Mi pregunta era: si pruebo la carga del API con muchos users concurrentes, ¿sigue funcionando?
Advertencia. Sólo ejecute una prueba de carga en su ordenador si está seguro de que puede soportarla.
Definición de la prueba
Necesitaba una herramienta que pudiera
- Inicializar múltiples users
- Ejecutar simultáneamente pruebas para cada user
- Hacer esto lo más rápido posible
Para inicializar un user debo registrar el user con una dirección de correo electrónico única. Después obtengo el token de acceso para este user, y establezco la cabecera para las operaciones GET y POST .
Quiero la siguiente prueba para cada user:
- Añadir un elemento con un nombre único (POST)
Esto devuelve el elemento añadido, incluyendo su id. - Leer de nuevo el elemento por id (GET)
Esto devuelve el elemento. - Comprobar si el nombre del elemento devuelto es igual al nombre único
Seleccionar Locust para la prueba
Para aplicaciones web triviales utilizo Apache Bench (ab), una herramienta muy fácil de usar para hacer pruebas de estrés. Pero no es realmente útil aquí porque necesitamos realizar la inicialización y escribir una prueba (simple).
Ya escribí muchas pruebas para mi aplicación FastAPI con Pytest, y no hubo problemas. He leído sobre pytest-xdist pero parece que tiene problemas y/o es demasiado lento(?).
Decidí utilizar Locust en este caso. Es fácil de usar y puedes codificar tus pruebas en Python. Es una maravilla.
El código
A continuación se muestra el código de la prueba. Algunas notas:
import logging
Registro todo también en un archivo.
def unique_str()
En la prueba necesito direcciones de correo electrónico únicas y nombres de ciudades. También podríamos un UUID4 aquí.
network_timeout, connection_timeout
No quiero que la prueba falle sino que el cliente espere.
wait_time
Quiero que la siguiente prueba comience inmediatamente cuando termine la actual.
tarea ficticia
Habilito ésta y deshabilito todas las demás tareas al desarrollar el método 'on_start'.
(POST/GET) catch_response=True
Junto con la construcción 'with' esto nos permite marcar una petición como fallida llamando a response.failure().
(GET) name=<URL>
Locust muestra todas las URLs pero aquí GET tiene una URL diferente cada vez debido al ID. Con el 'name=' sólo se muestra esta URL.
Comprobación del código de estado
Locust considera que una petición es correcta si el código de respuesta es <400, pero quiero comprobar explícitamente el valor.
RescheduleTask()
Cuando se produce un error no es necesario continuar una tarea por lo que lanzamos una excepción. Se podría hacer de forma más avanzada.
# test_cities_wrc.py
import logging
import random
import time
from locust import HttpUser, task, between
from locust.exception import RescheduleTask
test_email_host = 'my-local-test.test'
def unique_str():
return '{:.10f}.{}'.format(time.time(), random.randint(1000, 9999))
def unique_email():
return 'e.' + unique_str() + '@' + test_email_host
def unique_city_name():
return 'c.' + unique_str()
class WebsiteTestUser(HttpUser):
network_timeout = 30.0
connection_timeout = 30.0
#wait_time = between(0.5, 3.0)
def on_start(self):
base_url = 'http://127.0.0.1:8020/api/v1'
# set up urls
register_url = base_url + '/auth/register'
get_token_url = base_url + '/auth/token'
# urls used in task
self.cities_create_url = base_url + '/cities'
self.cities_get_by_id_url = base_url + '/cities/'
# get unique email
email = unique_email()
password = 'abcdefghi'
# register
response = self.client.post(
register_url,
json={'email': email, 'password': password},
)
if response.status_code != 201:
error_msg = 'register: response.status_code = {}, expected 201'.format(response.status_code)
logging.error(error_msg)
# get_token
# - username instead of email
# - x-www-form-urlencoded (instead of json)
response = self.client.post(
get_token_url,
data={'username': email, 'password': password},
)
access_token = response.json()['access_token']
logging.debug('get_token: for email = {}, access_token = {}'.format(email, access_token))
# set headers with access token
self.headers = {'Authorization': 'Bearer ' + access_token}
def on_stop(self):
pass
# enable this dummy task to develop 'on_start'
#@task
def dummy(self):
pass
@task
def cities_write_read_check(self):
# add city to api
city_name = unique_city_name()
logging.debug('cities_create: city_name = {}'.format(city_name))
with self.client.post(
self.cities_create_url,
json={'name': city_name},
headers=self.headers,
catch_response=True,
) as response:
if response.status_code != 201:
error_msg = 'cities_create: response.status_code = {}, expected 201, city_name = {}'.format(response.status_code, city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
response_dict = response.json()
if 'data' not in response_dict:
error_msg = 'cities_create: data not in response_dict, city_name = {}'.format(city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
city_id = response_dict['data']['id']
logging.debug('cities_create: for city_name = {}, city_id = {}'.format(city_name, city_id))
# get city from api and check
with self.client.get(
self.cities_get_by_id_url + city_id,
headers=self.headers,
name=self.cities_get_by_id_url + 'uuid',
catch_response=True,
) as response:
if response.status_code != 200:
error_msg = 'cities_get_by_id: response.status_code = {}, expected 200, city_name = {}'.format(response.status_code, city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
if 'data' not in response_dict:
error_msg = 'cities_get_by_id: data not in response_dict, city_name = {}'.format(city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
city_name_returned = response_dict['data']['name']
logging.debug('cities_get_by_id: for city_id = {}, city_name_returned = {}'.format(city_id, city_name_returned))
if city_name_returned != city_name:
error_msg = 'cities_get_by_id: city_name_returned = {} not equal city_name = {}'.format(city_name_returned, city_name)
logging.error(error_msg)
response.failure(error_msg)
raise RescheduleTask()
Ejecutando el test
ADVERTENCIA: ¡Cuando ejecutas una prueba como ésta estás estresando tu CPU!
Para ejecutar la prueba, y el registro en un archivo 'lc.log', abrir un terminal y entrar en el comando:
locust -L DEBUG --logfile lc.log -f ./test_cities_wrc.py
Y luego en su navegador escriba:
http://localhost:8089/
Luego, por ejemplo, introduzca:
- Número de user totales a simular: 20
- Tasa de desove (users desovados/segundo): 20
- Anfitrión: http:127.0.0.1
y pulsa el botón 'Start swarming'. Inmediatamente deberías oír cómo los ventiladores de tu ordenador empiezan a funcionar a máxima velocidad...
¿Cuál fue el resultado?
Efectivamente, encontré un problema en mi código. Este problema no apareció con Pytest. Parecía que en algún lugar de mi AccessControlManager Class utilizado con Dependency Injection, una variable podría ser sobrescrita por otra solicitud. Como consecuencia, se devolvía un user_id erróneo y la prueba fallaba, no siempre, sino cuando dos o más user accedían al API en el mismo momento.
Resumen
No sabía si mi primera aplicación FastAPI funcionaría bajo carga con muchos users concurrentes. Locust me permitió simular esta situación de forma muy sencilla, codificando la prueba en Python. Locust tiene muchas opciones y también puede ejecutarse sin cabeza, es decir, sin navegador.
Esta prueba no sólo me ayudó a detectar el problema sino también a aislarlo. Arreglé el problema pero esto también me indicó que debía leer más sobre Dependency Injection en FastAPI.
Enlaces / créditos
Locust
https://docs.locust.io/en/stable/
Deje un comentario
Comente de forma anónima o inicie sesión para comentar.
Comentarios (1)
Deje una respuesta.
Responda de forma anónima o inicie sesión para responder.
Hello ! This is a nice article. But I was wondering one thing :
Imagine you have setup your engine like this :
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size=5, max_overflow=5)
And I have setup the middleware just like you said, when you try to start 11 requests at the same time, it just hangs. There's not really a queue or nothing. Is this because my routes are async ?
Thanks !
Recientes
- Cómo ocultar las claves primarias de la base de datos UUID de su aplicación web
- Don't Repeat Yourself (DRY) con Jinja2
- SQLAlchemy, PostgreSQL, número máximo de filas por user
- Mostrar los valores en filtros dinámicos SQLAlchemy
- Transferencia de datos segura con cifrado de Public Key y pyNaCl
- rqlite: una alternativa de alta disponibilidad y dist distribuida SQLite
Más vistos
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- Usando PyInstaller y Cython para crear un ejecutable de Python
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados
- Flask RESTful API validación de parámetros de solicitud con esquemas Marshmallow