Cómo cancelar tareas con Python Asynchronous IO (AsyncIO)
AsyncIO 'create_task()' no inicia una tarea sino que la programa para su ejecución en el bucle de eventos.
Para un proyecto estaba usando AIOHTTP para comprobar las respuestas de muchos sitios web remotos, URLs. Las URLs provenían de una lista. Esta lista puede contener duplicados.
Todo bien hasta que noté que algunas respuestas también tenían el código de estado HTTP-429 'Demasiadas peticiones'. Cualquiera que sea la razón, sobrecarga, seguridad, queremos comportarnos amigablemente y no queremos llamar URLs idénticas de nuevo, al menos por un tiempo mínimo. Como estamos usando AIOHTTP, muchas peticiones estarán esperando la conexión a internet, no estamos usando un flag para comprobar si pueden continuar.
Este post trata sobre abortar (cancelar) estas peticiones en espera usando el método 'cancel()'. No es sobre AIOHTTP o limitación de tasa, ese es otro tema.
Ejemplo: 3 tareas, sin cancelación
A continuación se muestra el código sin cancelar las tareas en espera. Aquí no uso AIOHTTP sino 'asyncio.sleep()'. Como queremos un comportamiento determinista, en lugar de aleatorio, nosotros mismos especificamos el tiempo de reposo de las tareas.
# 3 tasks, no cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
print(f'{f_main} task{task.id} done')
result = task.result()
print(f'{f_main} task{task.id} result: {result}')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Nada especial aquí, como era de esperar la tarea2 completó primero, seguida por la tarea1 y la tarea0. El resultado:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 done
main task2 result: normal return
main wait: 0 done, 2 pending
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 1 pending
main task1 done
main task1 result: normal return
main wait: 0 done, 1 pending
mytask0 after sleep(2), returning ...
main wait: 1 done, 0 pending
main task0 done
main task0 result: normal return
Crear una tarea en AsyncIO no significa que la tarea se inicie.
Por el contrario, cuando se utiliza Multiprocessing o Treads, en AsyncIO no existe el método 'start()'.
Multiprocessing:
p = multiprocessing.Process(target=task, args ...)
p.start()
Threading:
t = threading.Thread(target=task, args ...)
t.start()
AsyncIO:
t = asyncio.create_task(task, args ...)
La documentación de AsyncIO no menciona cuando se inicia una tarea, o se ejecuta por primera vez, cuando se utiliza 'create_task()', a no ser que me haya perdido algo. Está programada para su ejecución, lo que significa que puede empezar a ejecutarse inmediatamente pero también algún tiempo después, cuando el bucle de eventos está 'esperando' alguna(s) co-rutina(s).
Esto es importante porque cuando cancelas una tarea, puede que aún no se haya ejecutado, lo que significa que la co-rutina de la tarea aún no ha sido llamada.
Diferentes estados de una tarea cuando se cancela una tarea
Con el método 'cancel()' de una tarea podemos marcar que una tarea debe ser cancelada. De lo anterior se deduce que cuando se aplica el método 'cancel()', el estado de la tarea puede ser:
- La tarea no ha comenzado a ejecutarse todavía, o,
- La tarea comenzó a ejecutarse y se completó, o,
- La tarea comenzó a ejecutarse y está esperando en algún lugar
Cancelar una tarea que aún no ha comenzado a ejecutarse significa que la tarea NO se ejecutará. Cancelar una tarea que comenzó a ejecutarse y está en espera significa: abortar la tarea en espera. Si la tarea ya se ha completado, entonces la tarea no se marcará como cancelada.
Estas tres condiciones se muestran a continuación, estoy corriendo con AsyncIO en Python 3.10. ¡Debemos 'esperar' la tarea, de lo contrario los resultados no son correctos! Cuando se cancela una tarea, se genera un error asyncio.CancelledError. El parámetro 'msg' del método 'cancel()' sólo se propaga al bucle principal a partir de Python 3.11.
Cancelar una tarea no iniciada
# cancel a not started task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# cancel task before awaitable
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Se ha producido un CancelledError. Lo importante en el resultado es que la tarea no se ha iniciado:
main create task
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Cancelar una tarea finalizada
Aquí añadimos un 'awaitable', 'asyncio.sleep()' después de crear la tarea:
# cancel a completed task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 3 sec, allow task to complete
print(f'{f_main} before await sleep')
await asyncio.sleep(3)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
La tarea terminó antes de ser cancelada. El resultado:
main create task
main before await sleep
mytask before await sleep
mytask after await sleep
main after await sleep
main task.done() = True
main task.cancelled() = False
main task.result(): normal return
Cancelar una tarea no completada (en espera)
Aquí cambiamos el tiempo de espera en el bucle principal de 3 segundos a 1 segundo antes de cancelar la tarea.
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Resultado:
main create task
main before await sleep
mytask before await sleep
main after await sleep
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Capturar el CancelledError en la tarea
También es posible capturar el error asyncio.CancelledError en la tarea. Tenga en cuenta que no puede confiar en los valores de retorno de una tarea, porque la tarea puede no iniciarse en absoluto, véase más arriba. Aquí está el código. Usando el último ejemplo anterior, añadimos el manejo de excepciones también a la tarea.
# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
El resultado. Observe que el parámetro 'msg' del método 'cancel()' se propaga a la tarea.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main task.done() = True
main task.cancelled() = False
main task.result(): CancelledError return
Si también quieres que el error cancelado se propague al bucle principal, puedes lanzar la excepción de nuevo en la tarea:
# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
# raise the exception again
raise
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
En el resultado vemos el CancelledError primero en la tarea y luego en el bucle principal. De nuevo, como estamos en Python 3.10, el parámetro 'msg' del método 'cancel()' sólo se propaga a la tarea, no al bucle principal.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Ejemplo: 3 tareas, con cancelación
A continuación se muestra el código del inicio de este post, 'Ejemplo: 3 tareas, sin cancelación', modificado para cancelar tareas. En mi caso no me interesa capturar el CancelledError en una tarea, sólo quiero que se cancelen la(s) tarea(s). Volviendo a la pregunta original, cancelar una tarea cuando una determinada tarea devuelve un determinado valor (estado HTTP-429), cancelamos la tarea0 al recibir el resultado de la tarea2.
Si queremos obtener la razón de la cancelación, como se especifica en el método 'cancel()', podemos lanzar la excepción en el bucle llamando al método 'exception()'.
# 3 tasks, with cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
if task.cancelled():
print(f'{f_main} task{task.id} was cancelled')
# get cancel msg, python >= 3.11 only
cancel_msg = None
try:
task.exception()
except asyncio.CancelledError as e:
cancel_msg = e
print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
else:
print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')
# cancel task0 if task2 completed
if task.id == 2:
print(f'{f_main} task{task.id} cancel task0')
tasks[0].cancel(msg='my cancel reason')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
El resultado:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 was not cancelled, result: normal return
main task2 cancel task0
main wait: 1 done, 1 pending
main task0 was cancelled
main task0 cancel_msg:
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 0 pending
main task1 was not cancelled, result: normal return
Resumen
Cancelar una tarea con AsyncIO es un poco confuso al principio. El método 'create_task()' de AsyncIO no inicia una tarea, sólo la programa para su ejecución. Lo principal a recordar es siempre 'esperar' una tarea antes de comprobar sus métodos 'done()' y 'cancelled()'. Dónde capturar el CancelledError, en el bucle principal o en la tarea, o en ambos, depende de tu aplicación. Y para obtener la razón de la cancelación en el bucle principal necesitas Python 3.11 arriba.
Enlaces / créditos
asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390
Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
Leer más
Async
Recientes
- Cómo ocultar las claves primarias de la base de datos UUID de su aplicación web
- Don't Repeat Yourself (DRY) con Jinja2
- SQLAlchemy, PostgreSQL, número máximo de filas por user
- Mostrar los valores en filtros dinámicos SQLAlchemy
- Transferencia de datos segura con cifrado de Public Key y pyNaCl
- rqlite: una alternativa de alta disponibilidad y dist distribuida SQLite
Más vistos
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- Usando PyInstaller y Cython para crear un ejecutable de Python
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados
- Flask RESTful API validación de parámetros de solicitud con esquemas Marshmallow