Arrêt gracieux de Python Multiprocessing dans l'ordre approprié
D'abord le processus worker() est arrêté, puis les données du processus result_queue() sont vidées et arrêtées.
Pour un nouveau projet, j'avais besoin d'un processus deamon qui doit exécuter de nombreuses opérations, plus ou moins identiques, sur différentes ressources. Dans ce cas, l'opération est liée aux entrées-sorties et j'ai résolu le problème en utilisant ThreadPoolExecutor. Jusqu'ici, tout va bien.
Ensuite, je voulais stocker les résultats dans des fichiers. Bien sûr, nous utilisons une file d'attente pour communiquer entre les processus. Le processus worker() utilise q.put() pour ajouter des éléments à la file d'attente et le processus result_queue() utilise q.get() pour récupérer les éléments de la file d'attente.
Le processus result_queue() met également en mémoire tampon les résultats reçus. Lorsqu'un seuil est atteint, tous les résultats sont écrits dans un fichier en une seule fois.
Et voici le problème. Lorsque vous appuyez sur CTRL-C ou envoyez un signal SIGTERM , les processus s'arrêtent brusquement. Cela signifie que :
- Les ressources auxquelles accédait le worker() peuvent être laissées dans un mauvais état.
- La result_queue() peut contenir de nombreux résultats qui ne seront pas sauvegardés.
Pas bon ! Je vous montre ci-dessous comment j'ai résolu ce problème. Comme toujours, j'utilise Ubuntu (20.04).
Utiliser les événements pour arrêter les processus enfants
Les pages sur internet concernant Python et Multiprocessing sont souvent confuses, principalement parce que beaucoup font référence à Python2. L'avantage est que vous êtes obligé de lire la documentation officielle...
Il existe deux façons pour les processus de communiquer (de manière fiable) entre eux :
- les files d'attente
- les événements
Dans ce cas, pour arrêter les processus, nous utilisons des événements. Nous appelons nos événements stop-events car ils demandent au processus fils de s'arrêter. Cela fonctionne comme suit :
- Créer un événement (objet)
- Passez l'événement en tant que paramètre lorsque le processus enfant est lancé.
- Dans le processus enfant : vérifier l'événement et arrêter si l'événement 'is_set()'.
- Dans le processus principal : set()' l'événement si l'enfant doit être arrêté.
Remplacer les handlers pour SIGINT / SIGTERM
Voici l'astuce qui fait que tout fonctionne :
Lorsque vous créez un processus enfant, ce processus hérite également des gestionnaires de signaux du parent.
Nous ne voulons pas que nos processus enfants répondent à CTRL-C (signal SIGINT ) ou au signal SIGTERM . Au lieu de cela, nous voulons que le processus principal décide comment les enfants sont arrêtés. Cela signifie que nous devons :
- Remplacer les handlers SIGINT et SIGTERM pour les processus enfants par des handlers factices. Ces gestionnaires ne font rien, ce qui signifie que les processus enfants ne sont pas interrompus par ces signaux.
- Remplacer les gestionnaires SIGINT et SIGTERM du processus principal par notre propre version. Ces gestionnaires indiquent au processus principal qu'il doit s'arrêter. Ceci est fait en utilisant un événement d'arrêt principal (objet).
Avec ce schéma, le processus principal vérifie l'événement d'arrêt principal. Si 'is_set()', il utilise 'set()' pour informer le processus enfant X de s'arrêter. Comme nous pouvons transmettre des événements d'arrêt à tous les processus enfants, le processus principal a un contrôle total. Il peut attendre qu'un processus enfant s'arrête, avant d'arrêter un autre processus enfant.
Le code
Il y a trois processus :
- main()
- (enfant) worker(), peut avoir plusieurs threads.
- (enfant) result_queue()
Quand un thread worker() se termine, il envoie le résultat à result_queue(). Pour un arrêt en douceur, nous devons d'abord demander au processus worker() de s'arrêter, et une fois qu'il s'est arrêté, nous devons demander au processus result_queue() de s'arrêter.
Je pense que le code n'est pas difficile à lire. Une fois que les processus enfants ont été lancés, le processus principal vérifie dans une boucle si un arrêt est demandé.
La seule chose que je ne vous ai pas dit est que j'ai ajouté du code pour forcer un arrêt après un nombre de secondes spécifié en cas de problème.
# procsig.py
import logging
import multiprocessing
import os
import signal
import sys
import time
def init_logger():
logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
formatter = logging.Formatter(logger_format)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
logger = init_logger()
# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()
def main_sigint_handler(signum, frame):
logger.debug('')
main_stop_event.set()
def main_sigterm_handler(signum, frame):
logger.debug('')
main_stop_event.set()
# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
logger.debug('')
def children_sigterm_handler(signum, frame):
logger.debug('')
def worker(rq, stop_event):
i = 0
while True:
if stop_event.is_set():
break
logger.debug('worker: {}'.format(i))
time.sleep(1)
i += 1
logger.debug('worker STOPPING ...')
time.sleep(1)
logger.debug('worker STOPPED')
def result_queue(rq, stop_event):
i = 0
while True:
if stop_event.is_set():
break
logger.debug('result_queue: {}'.format(i))
time.sleep(1)
i += 1
logger.debug('result_queue: STOPPING')
time.sleep(1)
logger.debug('result_queue: STOPPED')
def main():
# events
global main_stop_event
worker_stop_event = multiprocessing.Event()
result_queue_stop_event = multiprocessing.Event()
# setup result queue
rq = multiprocessing.Queue()
# children: capture sigint/sigterm
signal.signal(signal.SIGINT, children_sigint_handler)
signal.signal(signal.SIGTERM, children_sigterm_handler)
# start worker
worker_proc = multiprocessing.Process(
name='worker',
target=worker,
kwargs={'rq': rq, 'stop_event': worker_stop_event},
)
worker_proc.daemon = True
worker_proc.start()
# start result_queue
result_queue_proc = multiprocessing.Process(
name='result_queue',
target=result_queue,
kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
)
result_queue_proc.daemon = True
result_queue_proc.start()
# main: capture sigint/sigterm
signal.signal(signal.SIGINT, main_sigint_handler)
signal.signal(signal.SIGTERM, main_sigterm_handler)
child_procs = [worker_proc, result_queue_proc]
worker_stop_sent = False
result_queue_stop_sent = False
max_allowed_shutdown_seconds = 10
shutdown_et = None
keep_running = True
while keep_running:
if shutdown_et is not None and shutdown_et < int(time.time()):
logger.debug('Forced shutdown ...')
break
run_count = 0
for p in child_procs:
logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
if p.is_alive():
run_count += 1
# send 'stop' to result_queue()?
if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
logger.debug('Sending stop to result_queue ...')
result_queue_stop_event.set()
result_queue_stop_sent = True
# send 'stop' to worker()?
if main_stop_event.is_set() and not worker_stop_sent:
logger.debug('Sending stop to worker ...')
worker_stop_event.set()
worker_stop_sent = True
shutdown_et = int(time.time()) + max_allowed_shutdown_seconds
time.sleep(1)
if run_count == 0:
keep_running = False
logger.debug('terminating children ...')
try:
worker_proc.terminate()
result_queue_proc.terminate()
worker_proc.kill()
result_queue_proc.terminate()
except Exception as e:
# who cares
logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))
logger.debug('Done')
if __name__=='__main__':
main()
Pour tester cela, tapez CTRL-C, ou ouvrez une autre fenêtre de terminal et terminez ce processus :
pkill -f procsig
Résultat du journal
Vous pouvez essayer vous-même, ci-dessous je vous montre ce qui est imprimé dans la console. À '19:28:37', je frappe CTRL-C. Immédiatement, les gestionnaires de signaux sont appelés. Le processus main() envoie un message d'arrêt au processus worker() , en utilisant 'set()'. Une fois le processus worker() arrêté, en vérifiant 'is_alive()', le processus result_queue() reçoit un message d'arrêt.
2021-06-16 19:28:34,238 DEBUG [ worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG [ worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG [ result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG [ worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG [ result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG [ worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG [ result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG [ main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG [ children_sigint_handler():170]
2021-06-16 19:28:37,598 DEBUG [ children_sigint_handler():170]
2021-06-16 19:28:37,598 DEBUG [ main_sigint_handler():160]
2021-06-16 19:28:38,240 DEBUG [ worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG [ result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG [ main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG [ worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG [ result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG [ worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG [ result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG [ main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG [ result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG [ result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG [ main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG [ main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG [ main():290] Done
Résumé
Je cherchais un moyen d'arrêter les processus en douceur avec Python Multiprocessing et je voulais également contrôler l'ordre dans lequel les processus étaient arrêtés. J'ai résolu ce problème en utilisant des événements et en remplaçant les gestionnaires de signaux. Cela me donne un contrôle total sur le processus principal. Je peux maintenant demander l'arrêt du processus B après l'arrêt du processus A.
Liens / crédits
concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html
Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html
Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/
En savoir plus...
Multiprocessing
Récent
- Masquer les clés primaires de la base de données UUID de votre application web
- Don't Repeat Yourself (DRY) avec Jinja2
- SQLAlchemy, PostgreSQL, nombre maximal de lignes par user
- Afficher les valeurs des filtres dynamiques SQLAlchemy
- Transfert de données sécurisé grâce au cryptage à Public Key et à pyNaCl
- rqlite : une alternative à haute disponibilité et dist distribuée SQLite
Les plus consultés
- Utilisation des Python's pyOpenSSL pour vérifier les certificats SSL téléchargés d'un hôte
- Utiliser UUIDs au lieu de Integer Autoincrement Primary Keys avec SQLAlchemy et MariaDb
- Connexion à un service sur un hôte Docker à partir d'un conteneur Docker
- Utiliser PyInstaller et Cython pour créer un exécutable Python
- SQLAlchemy : Utilisation de Cascade Deletes pour supprimer des objets connexes
- Flask RESTful API validation des paramètres de la requête avec les schémas Marshmallow