Python Multiprocessing "graceful" uitschakeling in de juiste volgorde
Eerst wordt het worker() proces gestopt, vervolgens worden de gegevens van het result_queue() proces gespoeld en gestopt.
Voor een nieuw project had ik een deamon proces nodig dat veel, min of meer identieke, operaties moet uitvoeren op verschillende bronnen. In dit geval is de operatie IO gebonden en ik heb het opgelost door ThreadPoolExecutor te gebruiken. Zo ver, zo goed.
Vervolgens wilde ik de resultaten opslaan in bestanden. Natuurlijk gebruiken we een wachtrij om te communiceren tussen processen. Het worker() proces gebruikt q.put() om items aan de wachtrij toe te voegen en het result_queue() proces gebruikt q.get() om de items uit de wachtrij te halen.
Mijn result_queue() proces buffert ook de ontvangen resultaten. Zodra een drempel is bereikt, worden alle resultaten in een keer naar een bestand geschreven.
En hier is het probleem. Als je op CTRL-C drukt of een SIGTERM signaal stuurt dan worden de processen abrupt beëindigd. Dit betekent:
- De middelen die door de worker() werden benaderd, kunnen in een slechte toestand worden achtergelaten
- De result_queue() kan veel resultaten bevatten die niet bewaard zullen worden
Niet goed! Hieronder laat ik zien hoe ik dit heb opgelost. Zoals altijd gebruik ik Ubuntu (20.04).
Gebruik gebeurtenissen om de kind-processen te stoppen
Pagina's op het internet over Python en Multiprocessing zijn vaak verwarrend, vooral omdat velen verwijzen naar Python2. Het goede is dat je gedwongen wordt om de officiële documentatie te lezen ...
Er zijn twee manieren waarop processen (betrouwbaar) met elkaar kunnen communiceren:
- wachtrijen
- gebeurtenissen
In dit geval, om processen te stoppen, gebruiken we events. We noemen onze events stop-events omdat ze het kindproces vragen te stoppen. Het werkt als volgt:
- Maak een gebeurtenis (object)
- Geef de gebeurtenis door als een parameter wanneer het kind-proces wordt gestart
- In het kind proces: controleer de gebeurtenis en stop als de gebeurtenis 'is_set()'
- In het hoofdproces: 'set()' de gebeurtenis als het kind moet worden gestopt
Vervangen handlers voor SIGINT / SIGTERM
Hier is de truc die alles laat werken:
Wanneer je een kindproces spawnt, erft dit proces ook de signaalhandlers van de ouder.
We willen niet dat onze kind-processen reageren op CTRL-C (SIGINT signaal) of het SIGTERM signaal. In plaats daarvan willen we dat het hoofdproces beslist hoe de kinderen gestopt worden. Dit betekent dat we:
- De SIGINT en SIGTERM handlers voor de kind processen vervangen door dummy handlers. Deze handlers doen niets, wat betekent dat de kind-processen niet beëindigd worden door deze signalen.
- Vervang de SIGINT en SIGTERM handlers van het hoofdproces door onze eigen versie. Deze handlers vertellen het hoofdproces dat het moet afsluiten. Dit wordt gedaan met behulp van een hoofd stop gebeurtenis (object)
Met dit schema controleert het hoofdproces de main stop event. Indien 'is_set()' gebruikt het 'set()' om kind-proces X te informeren om te stoppen. Omdat we stop gebeurtenissen kunnen doorgeven aan alle kind-processen, heeft het hoofd-proces volledige controle. Het kan wachten tot een kind proces is gestopt, voordat het een ander kind proces stopt.
De code
Er zijn drie processen:
- main()
- (kind) worker(), kan meerdere threads hebben
- (kind) result_queue()
Als een worker() thread klaar is, stuurt hij het resultaat naar result_queue(). Om sierlijk te stoppen moeten we eerst het worker() proces vragen te stoppen, en zodra het gestopt is, moeten we het result_queue() proces vragen te stoppen.
Ik denk dat de code niet moeilijk te lezen is. Nadat de kind-processen zijn gestart, controleert het hoofd-proces in een lus of er wordt gevraagd om te stoppen.
Het enige dat ik je niet verteld heb is dat ik wat code heb toegevoegd om een shutdown te forceren na een gespecificeerd aantal seconden in geval van een of ander probleem.
# procsig.py
import logging
import multiprocessing
import os
import signal
import sys
import time
def init_logger():
logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
formatter = logging.Formatter(logger_format)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
logger = init_logger()
# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()
def main_sigint_handler(signum, frame):
logger.debug('')
main_stop_event.set()
def main_sigterm_handler(signum, frame):
logger.debug('')
main_stop_event.set()
# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
logger.debug('')
def children_sigterm_handler(signum, frame):
logger.debug('')
def worker(rq, stop_event):
i = 0
while True:
if stop_event.is_set():
break
logger.debug('worker: {}'.format(i))
time.sleep(1)
i += 1
logger.debug('worker STOPPING ...')
time.sleep(1)
logger.debug('worker STOPPED')
def result_queue(rq, stop_event):
i = 0
while True:
if stop_event.is_set():
break
logger.debug('result_queue: {}'.format(i))
time.sleep(1)
i += 1
logger.debug('result_queue: STOPPING')
time.sleep(1)
logger.debug('result_queue: STOPPED')
def main():
# events
global main_stop_event
worker_stop_event = multiprocessing.Event()
result_queue_stop_event = multiprocessing.Event()
# setup result queue
rq = multiprocessing.Queue()
# children: capture sigint/sigterm
signal.signal(signal.SIGINT, children_sigint_handler)
signal.signal(signal.SIGTERM, children_sigterm_handler)
# start worker
worker_proc = multiprocessing.Process(
name='worker',
target=worker,
kwargs={'rq': rq, 'stop_event': worker_stop_event},
)
worker_proc.daemon = True
worker_proc.start()
# start result_queue
result_queue_proc = multiprocessing.Process(
name='result_queue',
target=result_queue,
kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
)
result_queue_proc.daemon = True
result_queue_proc.start()
# main: capture sigint/sigterm
signal.signal(signal.SIGINT, main_sigint_handler)
signal.signal(signal.SIGTERM, main_sigterm_handler)
child_procs = [worker_proc, result_queue_proc]
worker_stop_sent = False
result_queue_stop_sent = False
max_allowed_shutdown_seconds = 10
shutdown_et = None
keep_running = True
while keep_running:
if shutdown_et is not None and shutdown_et < int(time.time()):
logger.debug('Forced shutdown ...')
break
run_count = 0
for p in child_procs:
logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
if p.is_alive():
run_count += 1
# send 'stop' to result_queue()?
if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
logger.debug('Sending stop to result_queue ...')
result_queue_stop_event.set()
result_queue_stop_sent = True
# send 'stop' to worker()?
if main_stop_event.is_set() and not worker_stop_sent:
logger.debug('Sending stop to worker ...')
worker_stop_event.set()
worker_stop_sent = True
shutdown_et = int(time.time()) + max_allowed_shutdown_seconds
time.sleep(1)
if run_count == 0:
keep_running = False
logger.debug('terminating children ...')
try:
worker_proc.terminate()
result_queue_proc.terminate()
worker_proc.kill()
result_queue_proc.terminate()
except Exception as e:
# who cares
logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))
logger.debug('Done')
if __name__=='__main__':
main()
Om dit te testen, druk op CTRL-C, of open een ander terminal venster en beëindig dit proces:
pkill -f procsig
Log resultaat
Je kunt dit zelf uitproberen, hieronder laat ik je zien wat er op de console wordt afgedrukt. Om '19:28:37' druk ik op CTRL-C. Onmiddellijk worden de signal handlers aangeroepen. Het main() proces stuurt een stop bericht naar het worker() proces, door 'set()' te gebruiken. Zodra het worker() proces gestopt is, door het controleren van 'is_alive()', wordt het result_queue() proces een stop-bericht gezonden.
2021-06-16 19:28:34,238 DEBUG [ worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG [ worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG [ result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG [ worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG [ result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG [ worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG [ result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG [ main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG [ children_sigint_handler():170]
2021-06-16 19:28:37,598 DEBUG [ children_sigint_handler():170]
2021-06-16 19:28:37,598 DEBUG [ main_sigint_handler():160]
2021-06-16 19:28:38,240 DEBUG [ worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG [ result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG [ main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG [ worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG [ result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG [ main():259] [P> worker - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG [ worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG [ result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG [ main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG [ result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG [ main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG [ result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG [ main():259] [P> worker - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG [ main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG [ main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG [ main():290] Done
Samenvatting
Ik was op zoek naar een manier om processen sierlijk te stoppen met Python Multiprocessing en wilde ook de volgorde controleren waarin processen gestopt werden. Ik heb dit opgelost door gebruik te maken van events en de signal handlers te vervangen. Dit geeft me totale controle in het hoofdproces. Ik kan nu vragen om proces B te stoppen nadat proces A gestopt is.
Links / credits
concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html
Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html
Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/
Lees meer
Multiprocessing
Recent
- Database UUID primaire sleutels van je webapplicatie verbergen
- Don't Repeat Yourself (DRY) met Jinja2
- SQLAlchemy, PostgreSQL, maximum aantal rijen per user
- Toon de waarden in SQLAlchemy dynamische filters
- Veilige gegevensoverdracht met Public Key versleuteling en pyNaCl
- rqlite: een alternatief voor SQLite met hoge beschikbaarheid en distributed
Meest bekeken
- Met behulp van Python's pyOpenSSL om SSL-certificaten die van een host zijn gedownload te controleren
- Gebruik van UUIDs in plaats van Integer Autoincrement Primary Keys met SQLAlchemy en MariaDb
- Maak verbinding met een dienst op een Docker host vanaf een Docker container
- PyInstaller en Cython gebruiken om een Python executable te maken
- SQLAlchemy: Gebruik van Cascade Deletes om verwante objecten te verwijderen
- Flask RESTful API verzoekparametervalidatie met Marshmallow-schema's