Python Multiprocessing изящное завершение работы в правильном порядке
Сначала останавливается процесс worker(), затем данные процесса result_queue() промываются и останавливаются.
Для нового проекта мне понадобился процесс deamon, который должен выполнять множество более или менее одинаковых операций на различных ресурсах. В данном случае операция связана с IO, и я решил эту проблему с помощью ThreadPoolExecutor. Пока все хорошо.
Далее я хотел хранить результаты в файлах. Конечно, мы используем очередь для связи между процессами. Процесс worker() использует q.put() для добавления элементов в очередь, а процесс result_queue() использует q.get() для получения элементов из очереди.
Мой процесс result_queue() также буферизирует полученные результаты. Как только достигается порог, все результаты сразу записываются в файл.
И вот в чем проблема. Когда вы нажимаете CTRL-C или посылаете сигнал SIGTERM , то процессы резко завершаются. Это означает:
- Ресурсы, к которым обращался worker() , могут быть оставлены в плохом состоянии.
- result_queue() может содержать много результатов, которые не будут сохранены.
Нехорошо! Ниже я покажу, как я решил эту проблему. Как всегда, я использую Ubuntu (20.04).
Использование событий для остановки дочерних процессов
Страницы в интернете о Python и Multiprocessing часто сбивают с толку, в основном потому, что многие ссылаются на Python2. Хорошо то, что вы вынуждены читать официальную документацию ...
Существует два способа, с помощью которых процессы могут взаимодействовать (надежно) друг с другом:
- очереди
- события
В данном случае, чтобы остановить процессы, мы используем события. Мы называем наши события stop-events, потому что они требуют остановки дочернего процесса. Это работает следующим образом:
- Создайте событие (объект)
- Передайте событие в качестве параметра при запуске дочернего процесса.
- В дочернем процессе: проверьте событие и остановитесь, если событие 'is_set()'
- В главном процессе: 'set()' событие, если дочерний процесс должен быть остановлен.
Замените обработчики для SIGINT / SIGTERM
Вот трюк, который заставляет все работать:
Когда вы порождаете дочерний процесс, этот процесс также наследует обработчики сигналов от родительского.
Мы не хотим, чтобы наши дочерние процессы реагировали на CTRL-C (сигнал SIGINT ) или SIGTERM . Вместо этого мы хотим, чтобы главный процесс решал, как будут остановлены дочерние процессы. Это означает, что мы должны:
- Заменить обработчики SIGINT и SIGTERM для дочерних процессов на фиктивные обработчики. Эти обработчики ничего не делают, что означает, что дочерние процессы не завершаются по этим сигналам.
- Замените обработчики SIGINT и SIGTERM главного процесса на нашу собственную версию. Эти обработчики сообщают главному процессу, что он должен выключиться. Это делается с помощью события остановки главного процесса (объект)
При такой схеме главный процесс проверяет событие main stop. Если 'is_set()', он использует 'set()', чтобы сообщить дочернему процессу X о необходимости остановиться. Поскольку мы можем передавать события остановки всем дочерним процессам, главный процесс имеет полный контроль. Он может дождаться остановки дочернего процесса, прежде чем остановить другой дочерний процесс.
Код
Есть три процесса:
- main()
- (дочерний) worker(), может иметь несколько потоков
- (дочерний) result_queue()
Когда поток worker() завершает работу, он отправляет результат в result_queue(). Для изящной остановки мы должны сначала попросить процесс worker() остановиться, а как только он остановился, мы должны попросить остановиться процесс result_queue() .
Думаю, код несложно прочитать. После запуска дочерних процессов главный процесс в цикле проверяет, не запрошена ли остановка.
Единственное, что я не сказал, это то, что я добавил код для принудительного выключения через определенное количество секунд в случае возникновения какой-либо проблемы.
# procsig.py
import logging
import multiprocessing
import os
import signal
import sys
import time
def init_logger():
logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
formatter = logging.Formatter(logger_format)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
logger = init_logger()
# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()
def main_sigint_handler(signum, frame):
logger.debug('')
main_stop_event.set()
def main_sigterm_handler(signum, frame):
logger.debug('')
main_stop_event.set()
# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
logger.debug('')
def children_sigterm_handler(signum, frame):
logger.debug('')
def worker(rq, stop_event):
i = 0
while True:
if stop_event.is_set():
break
logger.debug('worker: {}'.format(i))
time.sleep(1)
i += 1
logger.debug('worker STOPPING ...')
time.sleep(1)
logger.debug('worker STOPPED')
def result_queue(rq, stop_event):
i = 0
while True:
if stop_event.is_set():
break
logger.debug('result_queue: {}'.format(i))
time.sleep(1)
i += 1
logger.debug('result_queue: STOPPING')
time.sleep(1)
logger.debug('result_queue: STOPPED')
def main():
# events
global main_stop_event
worker_stop_event = multiprocessing.Event()
result_queue_stop_event = multiprocessing.Event()
# setup result queue
rq = multiprocessing.Queue()
# children: capture sigint/sigterm
signal.signal(signal.SIGINT, children_sigint_handler)
signal.signal(signal.SIGTERM, children_sigterm_handler)
# start worker
worker_proc = multiprocessing.Process(
name='worker',
target=worker,
kwargs={'rq': rq, 'stop_event': worker_stop_event},
)
worker_proc.daemon = True
worker_proc.start()
# start result_queue
result_queue_proc = multiprocessing.Process(
name='result_queue',
target=result_queue,
kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
)
result_queue_proc.daemon = True
result_queue_proc.start()
# main: capture sigint/sigterm
signal.signal(signal.SIGINT, main_sigint_handler)
signal.signal(signal.SIGTERM, main_sigterm_handler)
child_procs = [worker_proc, result_queue_proc]
worker_stop_sent = False
result_queue_stop_sent = False
max_allowed_shutdown_seconds = 10
shutdown_et = None
keep_running = True
while keep_running:
if shutdown_et is not None and shutdown_et < int(time.time()):
logger.debug('Forced shutdown ...')
break
run_count = 0
for p in child_procs:
logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
if p.is_alive():
run_count += 1
# send 'stop' to result_queue()?
if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
logger.debug('Sending stop to result_queue ...')
result_queue_stop_event.set()
result_queue_stop_sent = True
# send 'stop' to worker()?
if main_stop_event.is_set() and not worker_stop_sent:
logger.debug('Sending stop to worker ...')
worker_stop_event.set()
worker_stop_sent = True
shutdown_et = int(time.time()) + max_allowed_shutdown_seconds
time.sleep(1)
if run_count == 0:
keep_running = False
logger.debug('terminating children ...')
try:
worker_proc.terminate()
result_queue_proc.terminate()
worker_proc.kill()
result_queue_proc.terminate()
except Exception as e:
# who cares
logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))
logger.debug('Done')
if __name__=='__main__':
main()
Чтобы проверить это, нажмите CTRL-C, или откройте другое окно терминала и завершите этот процесс:
pkill -f procsig
Результат журнала
Вы можете попробовать это сами, ниже я покажу, что будет выведено в консоль. В '19:28:37' я нажал CTRL-C. Сразу же вызываются обработчики сигналов. Процесс main() посылает сообщение об остановке процессу worker() , используя 'set()'. Как только процесс worker() остановился, проверив 'is_alive()', процессу result_queue() посылается сообщение об остановке.
2021-06-16 19:28:34,238 DEBUG [ worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG [ worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG [ result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG [ worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG [ result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG [ worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG [ result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG [ main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG [ children_sigint_handler():170]
2021-06-16 19:28:37,598 DEBUG [ children_sigint_handler():170]
2021-06-16 19:28:37,598 DEBUG [ main_sigint_handler():160]
2021-06-16 19:28:38,240 DEBUG [ worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG [ result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG [ main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG [ worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG [ result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG [ worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG [ result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG [ main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG [ result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG [ result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG [ main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG [ main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG [ main():290] Done
Резюме
Я искал способ изящной остановки процессов с помощью Python Multiprocessing , а также хотел контролировать порядок остановки процессов. Я решил эту проблему, используя события и заменив обработчики сигналов. Это дает мне полный контроль над главным процессом. Теперь я могу попросить остановить процесс B после остановки процесса A.
Ссылки / кредиты
concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html
Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html
Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/
Подробнее
Multiprocessing
Недавний
- Скрытие первичных ключей базы данных UUID вашего веб-приложения
- Don't Repeat Yourself (DRY) с Jinja2
- SQLAlchemy, PostgreSQL, максимальное количество строк для user
- Показать значения в динамических фильтрах SQLAlchemy
- Безопасная передача данных с помощью шифрования Public Key и pyNaCl
- rqlite: альтернатива dist с высокой степенью готовности и SQLite
Большинство просмотренных
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb
- Подключение к службе на хосте Docker из контейнера Docker
- Использование PyInstaller и Cython для создания исполняемого файла Python
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов
- Flask Удовлетворительный запрос API проверка параметров запроса с помощью схем Маршмэллоу