Aplicación Flask que muestra stdout y stderr de un trabajo de fondo
Se utiliza un multiprocessing.Queue() para capturar las líneas stdout y stderr en tiempo real.
En un proyecto Flask , necesitaba ejecutar un trabajo en segundo plano, más concretamente un comando que se ejecuta en un terminal (Linux), y mostrar su salida, stdout y sterr, en tiempo real en una ventana del navegador. Puedes encontrar algunas soluciones en internet y esta es una más. También estoy usando un código que encontré en la web, ver enlaces más abajo.
Esta solución está utilizando:
- multiproceso, para iniciar un nuevo proceso desde nuestra aplicación Flask
- subprocess, para iniciar el comando
- threads, para capturar stdin y stdout
- un multiprocessing.Queue para:
- almacenar la salida del trabajo en segundo plano
- leer la salida del trabajo en segundo plano en nuestra aplicación Flask
Como siempre, estoy ejecutando esto en Ubuntu.
Aplicación de una sola página
La aplicación de demostración Flask es una aplicación de una sola página. En esta página podemos iniciar un comando, y una vez iniciado, una ventana en la página muestra la salida en el comando.
La app Flask tiene dos rutas:
start_command()
El método GET envía la página. El método POST se utiliza para iniciar y detener un comando. Los comandos que puedes probar en esta demo:
- pwd
- ls -lR
- ps -Af
- someunknowncommand
- cat /var/log/syslog
- tail --lines=5000 /var/log/syslog
- tail -f /var/log/syslog
- docker
get_command_result_data(queue_id)
Esta ruta es llamada por el Javascript en la página cada segundo una vez que se inicia un comando. Los datos recibidos se añaden a un 'div' en la página.
Este proyecto también utiliza Bootstrap y JQuery.
Servicios en Flask
Para esta aplicación Flask he creado un nuevo servicio. Como siempre pongo el servicio en la carpeta app.services, e inicializo el servicio en factory.py, usando init_app(), igual que las extensiones Flask . Entonces todo lo que tenemos que hacer es incluir la siguiente línea en nuestros archivos Python :
import app.services as services
Y luego llamamos a nuestro servicio como:
services.our_service.some_method()
De esta forma no tenemos que preocuparnos por las importaciones cíclicas.
El servicio BackgroundCommandRunner
BackgroundCommandRunner es nuestro nuevo servicio con dos métodos:
start_command(comando)
Este método crea una cola e inicia un nuevo proceso en segundo plano que ejecuta el comando dado. El proceso captura stdout y stderr y lo pone en la cola.
Devuelve un tuple (proceso, cola):
- proceso: valor de retorno de multiprocessing.Process
- cola: el valor devuelto por multiprocessing.Queue(), incluyendo un id y una marca de tiempo
Para obtener el identificador del proceso: process.pid
Para obtener el identificador de la cola: queue.id
get_queue_data(queue_id)
Devuelve todos los nuevos datos disponibles (líneas) leyendo de la cola hasta que esté vacía. Los datos devueltos están jsonificados, lo que significa que podemos devolverlos al cliente.
¿Cuándo tenemos todos los datos de salida (stdout, stderr) del proceso?
En el cliente queremos saber cuándo ha finalizado el proceso en segundo plano. El funcionamiento normal, es decir, sin errores, es que los flujos stdout y stderr están cerrados.
Suponiendo que los flujos no funcionen como se espera en algunos casos, como último recurso, podemos esperar a que subprocess termine. Entonces añadimos un pequeño retraso, de dos segundos, para permitir que la cola se llene con los datos restantes.
Hay otras condiciones, por ejemplo, al iniciar un comando desconocido, subprocess genera una excepción. En este caso, enviamos el mensaje de error y activamos la bandera 'ready'.
Algunas notas
- subprocess no se inicia con 'shell=True' porque eso iniciaría otro proceso.
- shlex se utiliza para descomponer un comando (cadena) en una secuencia de argumentos antes de llamar a subprocess.
- Los errores en el proceso en segundo plano se capturan y se envían al cliente utilizando la cola.
- Las colas se eliminan tras un tiempo de inactividad (60 segundos).
Pruebe usted mismo
¡Atención! Mientras inicie y detenga las tareas en segundo plano desde la página web, no debería haber problemas. Sin embargo, si algo va mal o finalizas Flask pulsando Control-C en la línea de comandos mientras se está ejecutando un proceso en segundo plano, deberás detener este proceso en segundo plano antes de reiniciar la aplicación Flask . Esto es una demo y no se han tomado medidas para manejar estas situaciones con elegancia.
Por si quieres probar tú mismo, aquí tienes el árbol del proyecto:
.
├── project
│ ├── app
│ │ ├── services
│ │ │ ├── background_command_runner.py
│ │ │ └── __init__.py
│ │ ├── templates
│ │ │ ├── base.html
│ │ │ └── start_command.html
│ │ └── factory.py
│ └── run.py
Crea primero un virtual environment y luego instala Flask:
pip install Flask
A continuación, cree los siguientes archivos.
run.py
# run.py
from app.factory import create_app
app = create_app()
if __name__ == '__main__':
app.run(
host= '0.0.0.0',
port=5050,
debug=True,
use_reloader=True,
)
app/factory.py
# app/factory.py
import datetime
import logging
import os
import re
import signal
import sys
import string
import time
from flask import current_app, Flask, g, json, redirect, request, render_template
from .services import (
background_command_runner,
)
import app.services as services
def setup_logging():
logger = logging.getLogger(__name__)
logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
logger.setLevel(logging.DEBUG)
# console
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(console_handler)
return logger
def create_app():
app = Flask(__name__)
# reload template if differs from cached
app.jinja_env.auto_reload = True
app.config['TEMPLATES_AUTO_RELOAD'] = True
# logging
app.logger = setup_logging()
# init services
background_command_runner.init_app(app)
# route to start a command
@app.route('/', methods=['GET', 'POST'])
def start_command():
current_app.logger.debug('()')
command_pid = None
command_qid = None
command = None
error_message = None
if request.method == 'POST':
# stop current background process, if running
try:
command_pid = int(request.form.get('command_pid'))
current_app.logger.debug('command_pid = {}'.format(command_pid))
os.kill(command_pid, signal.SIGKILL)
except:
pass
action = request.form.get('action')
if action == 'start_command':
try:
current_app.logger.debug('starting background command ...')
command = request.form.get('command')
p, q = services.background_command_runner.start_command(command)
command_pid = p.pid
command_qid = q.id
except Exception as e:
error_message = 'Error starting command {}: {}, {}'.format(command, type(e).__name__, e.args)
elif action == 'stop_command':
current_app.logger.debug('stopping background command ...')
pass
return render_template(
'/start_command.html',
page_title='Run command in background',
command=command,
command_pid=command_pid,
command_qid=command_qid,
error_message=error_message,
)
# route to get data from a command
@app.route('/get-command-result-data/<command_qid>', methods=['GET'])
def get_command_result_data(command_qid):
current_app.logger.debug('(command_qid = {})'.format(command_qid))
return services.background_command_runner.get_queue_data(command_qid)
return app
app/services/background_command_runner.py
# app/services/background_command_runner.py
import datetime
import multiprocessing
import os
import queue
import shlex
import subprocess
import sys
import threading
import time
import uuid
from flask import current_app, jsonify
class BackgroundCommandRunner:
def __init__(self, app=None):
self.app = app
# storage for queues by id
self.qid_queues = {}
# remove queue if no activity after this time
self.max_queue_secs = 60
# stream end-of-transmission character
self.EOT = None
if app is not None:
self.init_app(app)
def init_app(self, app):
pass
def __create_queue(self):
q = multiprocessing.Queue()
q.id = uuid.uuid4().hex
q.et = int(time.time())
q.stdout_closed = False
q.stderr_closed = False
self.qid_queues[q.id] = q
return q
def __get_queue_by_id(self, qid):
self.__cleanup_queues()
q = self.qid_queues.get(qid)
if q is not None:
q.et = int(time.time())
return q
def __to_json(self, d):
current_app.logger.debug('d = {}'.format(d))
d_json = None
try:
d_json = jsonify(d)
except Exception as e:
current_app.logger.error('jsonify error, exception = {}, e.args = {} for d = {}'.format(type(e).__name__, e.args, d))
return d_json
def get_queue_data(self, qid):
q = self.__get_queue_by_id(qid)
if q is None:
data = {
'lines': [],
'errors': ['Queue disappeared'],
'ready': True,
}
return self.__to_json({'data': data})
errors = None
ready = False
lines = []
# loop while queue not empty or max lines
while len(lines) < 1000:
try:
stream, line = q.get(block=True, timeout=0.2)
except queue.Empty:
break
except Exception as e:
errors = [type(e).__name__ + ', ' + str(e.args)]
current_app.logger.error('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
ready = True
break
current_app.logger.debug('data from queue, stream = {}, type(line) = {}, line = {}'.format(stream, type(line), line))
if line == self.EOT:
if stream == 'stdout':
q.stdout_closed = True
elif stream == 'stderr':
q.stderr_closed = True
if q.stdout_closed and q.stderr_closed:
ready = True
continue
lines.append({
'stream': stream,
'line': line,
})
if stream == 'exit_code':
current_app.logger.debug('exit_code received')
ready = True
data = {
'lines': lines,
'errors': errors,
'ready': ready,
}
return self.__to_json({'data': data})
def __cleanup_queues(self):
et = int(time.time())
to_delete_qids = [q.id for q in self.qid_queues.values() if (et - q.et) > self.max_queue_secs]
for qid in to_delete_qids:
del self.qid_queues[qid]
def __reader(self, stream, pipe, q):
try:
with pipe:
for line in iter(pipe.readline, ''):
q.put((stream, line))
finally:
q.put((stream, self.EOT))
def __run_command_as_subprocess(self, command, q):
try:
if isinstance(command, str):
command = shlex.split(command)
#print('COMMAND = {}'.format(command))
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines = True,
)
threading.Thread(target=self.__reader, args=['stdout', p.stdout, q], daemon=True).start()
threading.Thread(target=self.__reader, args=['stderr', p.stderr, q], daemon=True).start()
# delay: the process may have completed but the output was still not processed
exit_code = p.wait()
time.sleep(2)
#print('EXIT_CODE = {}'.format(exit_code))
q.put(('exit_code', exit_code))
except Exception as e:
error_message = 'There was an error running the command = {}: {}, {}'.format(command, type(e).__name__, e.args)
#print('ERROR_MESSAGE = {}'.format(error_message))
q.put(('stderr', error_message))
q.put(('exit_code', 1))
def start_command(self, command):
# start process and return
q = self.__create_queue()
q.put(('stdout', 'Running command: ' + command))
p = multiprocessing.Process(
name='__run_command_as_subprocess',
target=self.__run_command_as_subprocess,
args=(command, q),
)
p.start()
return (p, q)
app/services/__init__.py
# app/services/__init__.py
from .background_command_runner import BackgroundCommandRunner
background_command_runner = BackgroundCommandRunner()
app/templates/base.html
{# app/templates/base.html #}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>{{ page_title }}</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
</head>
<body>
<main id="main" class="container-fluid py-3 qps-0 qflex-fill mt-0">
{% block main -%}{% endblock -%}
{%- if command_qid -%}
<div class="row px-2">
<div class="col-8 px-2 py-2 my-0">
Results for command '{{ command }}':
</div>
<div class="col-4 px-2 pt-0 pb-2 my-0 text-end">
<form method="post">
<input type="hidden" name="command_pid" value="{{ command_pid }}">
<button type="submit" name="action" value="stop_command" class="btn btn-outline-dark btn-sm">
Stop command
</button>
</form>
</div>
</div>
<div class="row px-2">
<div class="col border p-3 overflow-scroll small" id="command_result_data" style="height: 400px;">
</div>
</div>
<p>
Lines received: <span id="lines-received">0</span>
</p>
{%- endif -%}
</main>
<script src="https://code.jquery.com/jquery-3.6.2.min.js" integrity="sha256-2krYZKh//PcchRtd+H+VyyQoZ/e3EcrkxhM8ycwASPA=" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous"></script>
{%- set get_command_result_data_url = '' -%}
{%- if command_qid -%}
{%- set get_command_result_data_url = url_for('get_command_result_data', command_qid=command_qid) -%}
<script>
var build_result_update_secs = 1000;
var lines_received = 0;
function get_command_result_data(){
var url, data, box, li, i, stream, line, exit_code_received = false;
url = '{{ get_command_result_data_url }}';
console.log('url = ' + url);
if(url == ''){
return;
}
$.ajax({
url: url,
type: 'GET',
dataType: 'json',
success: function(rcvd){
data = rcvd.data;
lines_received += data.lines.length;
box = $('#command_result_data');
for(i = 0; i < data.lines.length; i++){
li = data.lines[i];
stream = li.stream;
if(stream == 'stdout'){
line = li.line;
}else if(stream == 'stderr'){
line = 'stderr: ' + li.line;
}else{
line = stream + ': ' + li.line;
}
box.append(line + '<br>');
}
if(data.errors){
data.errors.forEach(function(li, i){
box.append('ERROR: ' + li + '<br>');
});
}
if(data.ready){
box.append('Ready' + '<br>');
}else{
setTimeout(get_command_result_data, build_result_update_secs);
}
box.scrollTop(box.prop('scrollHeight'));
$('#lines-received').text(lines_received);
}
});
}
$(document).ready(function(){
setTimeout(get_command_result_data, build_result_update_secs);
});
</script>
{%- endif -%}
</body>
</html>
app/templates/start_command.html
{# app/templates/start_command.html #}
{% extends "base.html" %}
{% block main %}
<h2 class="mb-3">
{{ page_title }}
</h2>
{%- if error_message -%}
<p class="text-danger">
{{ error_message }}
</p>
{%- endif -%}
<form method="post">
<input type="hidden" name="command_pid" value="{{ command_pid }}">
<select name="command" class="form-select" aria-label="Select command">
<option value="pwd">
pwd
</option>
<option value="ls -lR">
ls -lR
</option>
<option value="ps -Af">
ps -Af
</option>
<option value="someunknowncommand">
someunknowncommand
</option>
<option value="tail --lines=5000 /var/log/syslog">
tail --lines=5000 /var/log/syslog
</option>
<option value="tail -f /var/log/syslog">
tail -f /var/log/syslog
</option>
<option value="docker">
docker
</option>
</select>
<button type="submit" name="action" value="start_command" class="btn btn-primary my-3">
Start command
</button>
</form>
{%- endblock -%}
Ejecutar el proyecto
Inicie la aplicación desplazándose al directorio del proyecto y escribiendo:
python run.py
A continuación, dirija su navegador a:
http://127.0.0.1:5050
Debería aparecer la página. Seleccione un comando y observe la salida.
Resumen
Como siempre esto tomó más tiempo de lo esperado. Inicialmente pasé el comando como una cadena a subprocess. Los comandos 'pwd' y 'ls' funcionaron, pero 'ls -l' produjo el mensaje:
FileNotFoundError, (2, 'No such file or directory')
Después de utilizar shlex, este error desapareció.
Otra dificultad era decidir cuándo habíamos obtenido todos los datos del subprocess. Primero esperé a que se cerraran tanto stdout como stderr . Pero esto a veces no funcionaba. Como último recurso, esperamos a que termine el subprocess y añadimos un pequeño retardo de dos segundos, y consideramos que éste es el final de los flujos.
Enlaces / créditos
How to continuously display Python output in a Webpage?
https://stackoverflow.com/questions/15092961/how-to-continuously-display-python-output-in-a-webpage
Python - shlex - Simple lexical analysis
https://docs.python.org/3/library/shlex.html
Python - subprocess - Popen
https://docs.python.org/3/library/subprocess.html#subprocess.Popen
Python read from subprocess stdout and stderr separately while preserving order
https://stackoverflow.com/questions/31833897/python-read-from-subprocess-stdout-and-stderr-separately-while-preserving-order
Leer más
Flask Multiprocessing
Recientes
- Cómo ocultar las claves primarias de la base de datos UUID de su aplicación web
- Don't Repeat Yourself (DRY) con Jinja2
- SQLAlchemy, PostgreSQL, número máximo de filas por user
- Mostrar los valores en filtros dinámicos SQLAlchemy
- Transferencia de datos segura con cifrado de Public Key y pyNaCl
- rqlite: una alternativa de alta disponibilidad y dist distribuida SQLite
Más vistos
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- Usando PyInstaller y Cython para crear un ejecutable de Python
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados
- Flask RESTful API validación de parámetros de solicitud con esquemas Marshmallow