Приложение Flask , показывающее stdout и stderr фонового задания
multiprocessing.Queue() используется для захвата линий stdout и stderr в режиме реального времени.
В проекте Flask мне нужно было запустить фоновое задание, а точнее команду, запущенную в терминале (Linux), и показать ее вывод, stdout и sterr, в реальном времени в окне браузера. Вы можете найти несколько решений в интернете, и это просто еще одно. Я также использую некоторый код, который я нашел в Интернете, см. ссылки ниже.
Это решение использует:
- мультипроцессинг, для запуска нового процесса из нашего приложения Flask
- subprocess, для запуска команды
- потоков, для захвата stdin и stdout
- multiprocessing.Queue для:
- хранения вывода фонового задания
- чтения вывода фонового задания в наше приложение Flask .
Как обычно, я запускаю это на Ubuntu.
Одностраничное приложение
Демонстрационное приложение Flask - это одностраничное приложение. На этой странице мы можем запустить команду, и после запуска в окне на странице отображается вывод по команде.
Приложение Flask имеет два маршрута:
start_command()
GET-метод отправляет страницу. Метод POST-метод используется для запуска команды и остановки команды. Команды, которые вы можете попробовать в этой демонстрации:
- pwd
- ls -lR
- ps -Af
- someunknowncommand
- cat /var/log/syslog
- tail --lines=5000 /var/log/syslog
- tail -f /var/log/syslog
- docker
get_command_result_data(queue_id)
Этот маршрут вызывается Javascript на странице каждую секунду после запуска команды. Полученные данные добавляются в 'div' на странице.
В этом проекте также используются Bootstrap и JQuery.
Услуги в Flask
Для этого приложения Flask я создал новый сервис. Как обычно, я поместил службу в папку app.services, а инициализировал службу в factory.py, используя init_app(), как и расширения Flask . Тогда все, что нам нужно сделать, это включить следующую строку в наши файлы Python :
import app.services as services
И затем мы вызываем нашу службу следующим образом:
services.our_service.some_method()
Таким образом, нам не придется беспокоиться о циклическом импорте.
Служба BackgroundCommandRunner
BackgroundCommandRunner - это наш новый сервис с двумя методами:
start_command(command)
Этот метод создает очередь и запускает новый фоновый процесс, выполняющий заданную команду. Процесс перехватывает stdout и stderr и помещает это в очередь.
Возвращает tuple (процесс, очередь):
- процесс: возвращаемое значение из multiprocessing.Process
- очередь: возвращаемое значение из multiprocessing.Queue(), включая идентификатор и метку времени.
Чтобы получить идентификатор процесса: process.pid
Чтобы получить идентификатор очереди: queue.id
get_queue_data(queue_id)
Возвращает все новые доступные данные (строки) путем чтения из очереди, пока она не опустеет. Возвращенные данные являются jsonified, что означает, что мы можем вернуть их клиенту.
Когда у нас будут все выходные данные (stdout, stderr) из процесса?
В клиенте мы хотим знать, когда завершился фоновый процесс. При нормальной работе, т.е. работе без ошибок, потоки stdout и stderr закрываются.
Предполагая, что в некоторых случаях потоки не работают так, как ожидалось, в качестве последнего средства мы можем подождать, пока завершится subprocess . Затем мы добавляем небольшую задержку, две секунды, чтобы очередь заполнилась оставшимися данными.
Существуют и другие условия, например, при запуске неизвестной команды subprocess генерирует исключение. В этом случае мы отправляем сообщение об ошибке и устанавливаем флаг 'ready'-.
Некоторые примечания
- subprocess не запускается с 'shell=True', так как это привело бы к запуску другого процесса.
- shlex используется для разбиения команды (строки) на последовательность аргументов перед вызовом subprocess.
- Ошибки в фоновом процессе перехватываются и отправляются клиенту с помощью очереди.
- Очереди удаляются после некоторого времени бездействия (60 секунд).
Попробуйте сами
Внимание! Пока вы запускаете и останавливаете фоновые задачи с веб-страницы, проблем быть не должно. Однако, если что-то пойдет не так или вы завершите Flask , нажав Control-C в командной строке, когда фоновый процесс запущен, то вы должны остановить этот фоновый процесс перед перезапуском приложения Flask . Это демо-версия, и не было предусмотрено никаких мер по изящной обработке таких ситуаций.
Если вы хотите попробовать сами, вот дерево проекта:
.
├── project
│ ├── app
│ │ ├── services
│ │ │ ├── background_command_runner.py
│ │ │ └── __init__.py
│ │ ├── templates
│ │ │ ├── base.html
│ │ │ └── start_command.html
│ │ └── factory.py
│ └── run.py
Сначала создайте virtual environment , а затем установите Flask:
pip install Flask
Затем создайте следующие файлы.
run.py
# run.py
from app.factory import create_app
app = create_app()
if __name__ == '__main__':
app.run(
host= '0.0.0.0',
port=5050,
debug=True,
use_reloader=True,
)
app/factory.py
# app/factory.py
import datetime
import logging
import os
import re
import signal
import sys
import string
import time
from flask import current_app, Flask, g, json, redirect, request, render_template
from .services import (
background_command_runner,
)
import app.services as services
def setup_logging():
logger = logging.getLogger(__name__)
logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
logger.setLevel(logging.DEBUG)
# console
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(console_handler)
return logger
def create_app():
app = Flask(__name__)
# reload template if differs from cached
app.jinja_env.auto_reload = True
app.config['TEMPLATES_AUTO_RELOAD'] = True
# logging
app.logger = setup_logging()
# init services
background_command_runner.init_app(app)
# route to start a command
@app.route('/', methods=['GET', 'POST'])
def start_command():
current_app.logger.debug('()')
command_pid = None
command_qid = None
command = None
error_message = None
if request.method == 'POST':
# stop current background process, if running
try:
command_pid = int(request.form.get('command_pid'))
current_app.logger.debug('command_pid = {}'.format(command_pid))
os.kill(command_pid, signal.SIGKILL)
except:
pass
action = request.form.get('action')
if action == 'start_command':
try:
current_app.logger.debug('starting background command ...')
command = request.form.get('command')
p, q = services.background_command_runner.start_command(command)
command_pid = p.pid
command_qid = q.id
except Exception as e:
error_message = 'Error starting command {}: {}, {}'.format(command, type(e).__name__, e.args)
elif action == 'stop_command':
current_app.logger.debug('stopping background command ...')
pass
return render_template(
'/start_command.html',
page_title='Run command in background',
command=command,
command_pid=command_pid,
command_qid=command_qid,
error_message=error_message,
)
# route to get data from a command
@app.route('/get-command-result-data/<command_qid>', methods=['GET'])
def get_command_result_data(command_qid):
current_app.logger.debug('(command_qid = {})'.format(command_qid))
return services.background_command_runner.get_queue_data(command_qid)
return app
app/services/background_command_runner.py
# app/services/background_command_runner.py
import datetime
import multiprocessing
import os
import queue
import shlex
import subprocess
import sys
import threading
import time
import uuid
from flask import current_app, jsonify
class BackgroundCommandRunner:
def __init__(self, app=None):
self.app = app
# storage for queues by id
self.qid_queues = {}
# remove queue if no activity after this time
self.max_queue_secs = 60
# stream end-of-transmission character
self.EOT = None
if app is not None:
self.init_app(app)
def init_app(self, app):
pass
def __create_queue(self):
q = multiprocessing.Queue()
q.id = uuid.uuid4().hex
q.et = int(time.time())
q.stdout_closed = False
q.stderr_closed = False
self.qid_queues[q.id] = q
return q
def __get_queue_by_id(self, qid):
self.__cleanup_queues()
q = self.qid_queues.get(qid)
if q is not None:
q.et = int(time.time())
return q
def __to_json(self, d):
current_app.logger.debug('d = {}'.format(d))
d_json = None
try:
d_json = jsonify(d)
except Exception as e:
current_app.logger.error('jsonify error, exception = {}, e.args = {} for d = {}'.format(type(e).__name__, e.args, d))
return d_json
def get_queue_data(self, qid):
q = self.__get_queue_by_id(qid)
if q is None:
data = {
'lines': [],
'errors': ['Queue disappeared'],
'ready': True,
}
return self.__to_json({'data': data})
errors = None
ready = False
lines = []
# loop while queue not empty or max lines
while len(lines) < 1000:
try:
stream, line = q.get(block=True, timeout=0.2)
except queue.Empty:
break
except Exception as e:
errors = [type(e).__name__ + ', ' + str(e.args)]
current_app.logger.error('exception = {}, e.args = {}'.format(type(e).__name__, e.args))
ready = True
break
current_app.logger.debug('data from queue, stream = {}, type(line) = {}, line = {}'.format(stream, type(line), line))
if line == self.EOT:
if stream == 'stdout':
q.stdout_closed = True
elif stream == 'stderr':
q.stderr_closed = True
if q.stdout_closed and q.stderr_closed:
ready = True
continue
lines.append({
'stream': stream,
'line': line,
})
if stream == 'exit_code':
current_app.logger.debug('exit_code received')
ready = True
data = {
'lines': lines,
'errors': errors,
'ready': ready,
}
return self.__to_json({'data': data})
def __cleanup_queues(self):
et = int(time.time())
to_delete_qids = [q.id for q in self.qid_queues.values() if (et - q.et) > self.max_queue_secs]
for qid in to_delete_qids:
del self.qid_queues[qid]
def __reader(self, stream, pipe, q):
try:
with pipe:
for line in iter(pipe.readline, ''):
q.put((stream, line))
finally:
q.put((stream, self.EOT))
def __run_command_as_subprocess(self, command, q):
try:
if isinstance(command, str):
command = shlex.split(command)
#print('COMMAND = {}'.format(command))
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines = True,
)
threading.Thread(target=self.__reader, args=['stdout', p.stdout, q], daemon=True).start()
threading.Thread(target=self.__reader, args=['stderr', p.stderr, q], daemon=True).start()
# delay: the process may have completed but the output was still not processed
exit_code = p.wait()
time.sleep(2)
#print('EXIT_CODE = {}'.format(exit_code))
q.put(('exit_code', exit_code))
except Exception as e:
error_message = 'There was an error running the command = {}: {}, {}'.format(command, type(e).__name__, e.args)
#print('ERROR_MESSAGE = {}'.format(error_message))
q.put(('stderr', error_message))
q.put(('exit_code', 1))
def start_command(self, command):
# start process and return
q = self.__create_queue()
q.put(('stdout', 'Running command: ' + command))
p = multiprocessing.Process(
name='__run_command_as_subprocess',
target=self.__run_command_as_subprocess,
args=(command, q),
)
p.start()
return (p, q)
app/services/__init__.py
# app/services/__init__.py
from .background_command_runner import BackgroundCommandRunner
background_command_runner = BackgroundCommandRunner()
app/templates/base.html
{# app/templates/base.html #}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>{{ page_title }}</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
</head>
<body>
<main id="main" class="container-fluid py-3 qps-0 qflex-fill mt-0">
{% block main -%}{% endblock -%}
{%- if command_qid -%}
<div class="row px-2">
<div class="col-8 px-2 py-2 my-0">
Results for command '{{ command }}':
</div>
<div class="col-4 px-2 pt-0 pb-2 my-0 text-end">
<form method="post">
<input type="hidden" name="command_pid" value="{{ command_pid }}">
<button type="submit" name="action" value="stop_command" class="btn btn-outline-dark btn-sm">
Stop command
</button>
</form>
</div>
</div>
<div class="row px-2">
<div class="col border p-3 overflow-scroll small" id="command_result_data" style="height: 400px;">
</div>
</div>
<p>
Lines received: <span id="lines-received">0</span>
</p>
{%- endif -%}
</main>
<script src="https://code.jquery.com/jquery-3.6.2.min.js" integrity="sha256-2krYZKh//PcchRtd+H+VyyQoZ/e3EcrkxhM8ycwASPA=" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4" crossorigin="anonymous"></script>
{%- set get_command_result_data_url = '' -%}
{%- if command_qid -%}
{%- set get_command_result_data_url = url_for('get_command_result_data', command_qid=command_qid) -%}
<script>
var build_result_update_secs = 1000;
var lines_received = 0;
function get_command_result_data(){
var url, data, box, li, i, stream, line, exit_code_received = false;
url = '{{ get_command_result_data_url }}';
console.log('url = ' + url);
if(url == ''){
return;
}
$.ajax({
url: url,
type: 'GET',
dataType: 'json',
success: function(rcvd){
data = rcvd.data;
lines_received += data.lines.length;
box = $('#command_result_data');
for(i = 0; i < data.lines.length; i++){
li = data.lines[i];
stream = li.stream;
if(stream == 'stdout'){
line = li.line;
}else if(stream == 'stderr'){
line = 'stderr: ' + li.line;
}else{
line = stream + ': ' + li.line;
}
box.append(line + '<br>');
}
if(data.errors){
data.errors.forEach(function(li, i){
box.append('ERROR: ' + li + '<br>');
});
}
if(data.ready){
box.append('Ready' + '<br>');
}else{
setTimeout(get_command_result_data, build_result_update_secs);
}
box.scrollTop(box.prop('scrollHeight'));
$('#lines-received').text(lines_received);
}
});
}
$(document).ready(function(){
setTimeout(get_command_result_data, build_result_update_secs);
});
</script>
{%- endif -%}
</body>
</html>
app/templates/start_command.html
{# app/templates/start_command.html #}
{% extends "base.html" %}
{% block main %}
<h2 class="mb-3">
{{ page_title }}
</h2>
{%- if error_message -%}
<p class="text-danger">
{{ error_message }}
</p>
{%- endif -%}
<form method="post">
<input type="hidden" name="command_pid" value="{{ command_pid }}">
<select name="command" class="form-select" aria-label="Select command">
<option value="pwd">
pwd
</option>
<option value="ls -lR">
ls -lR
</option>
<option value="ps -Af">
ps -Af
</option>
<option value="someunknowncommand">
someunknowncommand
</option>
<option value="tail --lines=5000 /var/log/syslog">
tail --lines=5000 /var/log/syslog
</option>
<option value="tail -f /var/log/syslog">
tail -f /var/log/syslog
</option>
<option value="docker">
docker
</option>
</select>
<button type="submit" name="action" value="start_command" class="btn btn-primary my-3">
Start command
</button>
</form>
{%- endblock -%}
Запуск проекта
Запустите приложение, перейдя в каталог проекта и набрав:
python run.py
Затем направьте браузер на:
http://127.0.0.1:5050
Должна появиться страница. Выберите команду и наблюдайте за выводом.
Резюме
Как всегда, это заняло больше времени, чем ожидалось. Сначала я передал команду в виде строки в subprocess. Команды 'pwd' и 'ls' сработали, но 'ls -l' выдала сообщение:
FileNotFoundError, (2, 'No such file or directory')
После использования shlex эта ошибка исчезла.
Другая трудность заключалась в том, чтобы определить, когда мы получили все данные из subprocess. Сначала я ждал, пока закроются оба stdout и stderr . Но это иногда не срабатывало. В качестве последнего средства, мы ждем завершения subprocess , добавляем небольшую задержку в две секунды и считаем это окончанием потоков.
Ссылки / кредиты
How to continuously display Python output in a Webpage?
https://stackoverflow.com/questions/15092961/how-to-continuously-display-python-output-in-a-webpage
Python - shlex - Simple lexical analysis
https://docs.python.org/3/library/shlex.html
Python - subprocess - Popen
https://docs.python.org/3/library/subprocess.html#subprocess.Popen
Python read from subprocess stdout and stderr separately while preserving order
https://stackoverflow.com/questions/31833897/python-read-from-subprocess-stdout-and-stderr-separately-while-preserving-order
Подробнее
Flask Multiprocessing
Недавний
- Скрытие первичных ключей базы данных UUID вашего веб-приложения
- Don't Repeat Yourself (DRY) с Jinja2
- SQLAlchemy, PostgreSQL, максимальное количество строк для user
- Показать значения в динамических фильтрах SQLAlchemy
- Безопасная передача данных с помощью шифрования Public Key и pyNaCl
- rqlite: альтернатива dist с высокой степенью готовности и SQLite
Большинство просмотренных
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb
- Подключение к службе на хосте Docker из контейнера Docker
- Использование PyInstaller и Cython для создания исполняемого файла Python
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов
- Flask Удовлетворительный запрос API проверка параметров запроса с помощью схем Маршмэллоу