Multiprocessing, Dateisperren, SQLite und Prüfung
Das Testen auf Gleichzeitigkeitsprobleme ist schwieriger und nimmt mehr Zeit in Anspruch, aber man kann nicht darauf verzichten.
Ich habe an einem Projekt mit SQLAlchemy und PostgreSQL gearbeitet. Für einige Tabellen wollte ich die Anzahl der Zeilen pro user begrenzen und habe dies durch Hinzufügen einer PostgreSQL -Prüffunktion und eines Triggers erreicht.
Beim manuellen Testen schien alles gut zu funktionieren, aber was, wenn ein user mehrere Prozesse starten und Zeilen genau zur gleichen Zeit hinzufügen würde? Ich habe das "pg_advisory_xact_lock" hinzugefügt, aber wird das wirklich funktionieren? Habe ich die Dokumentation richtig verstanden?
In diesem Beitrag zeige ich eine universelle TaskRunner-Klasse, die zum Testen von gleichzeitigen (concurrent) Aktionen verwendet werden kann. Als Testfall verwenden wir eine SQLite Datenbank, die wir mit separaten Prozessen schreiben.
Wir starten alle Prozesse von einem einzigen Prozess aus. In diesem Fall können wir Multiprocessing.Lock() verwenden, um den Zugriff auf SQLite zu kontrollieren. Ich habe aber auch einen File Locker implementiert, der verwendet werden kann, wenn wir völlig unabhängige Prozesse haben.
Wie immer habe ich dies auf Ubuntu 22.04 ausgeführt.
Gleichzeitiger Start von Aktionen
In unserem Testaufbau verwenden wir Multiprocessing.Event(), um alle Prozesse an der gleichen Zeile im Taskcode warten zu lassen, eine Zeile vor der "kritischen Aktion". Wenn alle Prozesse diesen Punkt erreicht haben, geben wir die Prozesse "frei" und sehen, was passiert.
stop & release
|
v
task1 |--------------------->|-------->
task2 |----------------->|-------->
task3 |--------------->|-------->
|
taskN |-------->|-------->
--------------------------------------> t
In der TaskRunner-Klasse:
class TaskRunner:
...
def run_parallel_tasks(self, parallel_tasks_count):
...
self.mp_event = multiprocessing.Event()
...
for task_no in range(parallel_tasks_count):
p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
...
# release waiting all processes
time.sleep(self.release_time)
self.mp_event.set()
...
In unserer Task-Funktion:
def task(task_runner, task_no):
...
# all tasks will wait here
task_runner.mp_event.wait()
# critical action
...
Inkrementieren eines Tabellenfeldes SQLite
In unserem Test versuchen die Tasks (Prozesse) gleichzeitig, ein SQLite -Tabellenfeld, 'counter',
, zu inkrementieren, indem sie
- den Wert des Feldes zu lesen
- Inkrementieren
- das Feld zu aktualisieren
Wenn wir 100 Aufgaben haben, dann muss das Ergebnis im Tabellenfeld 100 sein. Jeder andere Wert ist falsch.
Sperren von
Eine Aufgabe kann die Inkrementierungsoperation nicht zuverlässig durchführen, ohne exklusiven Zugriff auf SQLite zu erhalten. Hier verwenden wir eine Sperre außerhalb von SQLite.
Wir können distdas Folgende unterscheiden:
- Die (konkurrierenden) Aufgaben werden von einem einzigen Prozess gestartet
- Die (konkurrierenden) Aufgaben sind unabhängig
Im ersten Fall können wir Multiprocessing.Lock() verwenden und diese Sperre für alle unsere Aufgaben freigeben. Für Testzwecke ist das in Ordnung.
Der zweite Fall ist eher ein Szenario aus der Praxis. Hier können wir Multiprocessing.Lock() nicht verwenden, aber wir können Linux file locking verwenden. Dies ist schnell und zuverlässig.
Sperren - Multiprocessing.Lock()
Ich möchte Multiprocessing.Lock() als Kontextmanager verwenden. Leider können wir dann keinen Timeout angeben. Das heißt, wir müssen den Kontextmanager selbst schreiben:
# multiprocessing locker context manager with timeout
class mp_locker:
def __init__(
self,
mp_lock=None,
timeout=10,
):
self.mp_lock = mp_lock
self.timeout = timeout
def __enter__(self):
self.mp_lock.acquire(timeout=self.timeout)
def __exit__(self, exc_type, exc_value, exc_tb):
self.mp_lock.release()
Sperren - Dateisperren
Es gibt viele Beispiele im Internet, wie man das machen kann. Auch hier möchte ich dies als Kontextmanager verwenden. Hier zeige ich nur die '__enter__()' Methode.
# file locker context manager
...
def __enter__(self):
while True:
if (time.time() - ts) > self.timeout:
raise Exception('pid = {}: acquire lock timeout')
try:
self.lock_file_fo = open(self.lock_file, 'a')
fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError as e:
# another process locked the file, keep trying
time.sleep(self.wait_secs)
# propagate other exceptions
Wir bleiben in der 'while-Schleife', bis wir die Sperre erhalten oder ein Timeout auftritt.
Die TaskRunner-Klasse
Der TaskRunner enthält die gesamte Logik zum Starten mehrerer Tasks (Prozesse).
Funktionen:
- before_tasks()
- task()
- after_tasks()
- result_ok()
- after_result()
Optionen:
- Anzahl der gleichzeitigen Aufgaben.
- Anzahl der Wiederholungen.
- Freigabezeit der wartenden Aufgaben (nach dem Start).
- Protokollierungsebene.
- Multiprocessing.Lock() Sperren, oder Dateisperren
- Zeitüberschreitung beim Sperren.
Wichtig: Alle Ihre Funktionen werden mit dem TaskRunner-Objekt als erstem Parameter aufgerufen. Das bedeutet, dass Sie Zugriff auf TaskRunner-Attribute und -Methoden haben wie:
- get_lock()
- get_logger()
Der Code
Der Code besteht aus den folgenden Teilen:
- TaskRunner-Klasse und unterstützende Klassen
- Ihre Task-Funktionen
- TaskRunner-Initiation mit Ihren Parametern
Wenn Sie den Code ausführen, sieht die Ausgabe ungefähr so aus:
INFO counter = 100 <- final value
INFO ready in 2.0454471111297607 seconds
Hier ist der Code, falls Sie es selbst versuchen wollen:
import fcntl
import logging
import multiprocessing
import os
import sys
import time
import sqlite3
class DummyLogger:
def __getattr__(self, name):
return lambda *args, **kwargs: None
# file locker context manager
class f_locker:
def __init__(
self,
lock_file=None,
timeout=10,
logger=DummyLogger(),
wait_secs=.01,
):
self.lock_file = lock_file
self.timeout = timeout
self.logger = logger
self.wait_secs = wait_secs
# keep lock_file opened
self.lock_file_fo = None
def __enter__(self):
pid = os.getpid()
ts = time.time()
while True:
self.logger.debug('pid = {}: trying to acquire lock ...'.format(pid))
if (time.time() - ts) > self.timeout:
raise Exception('pid = {}: acquire lock timeout')
# keep trying until lock or timeout
try:
self.lock_file_fo = open(self.lock_file, 'a')
fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.logger.debug('pid = {}: lock acquired'.format(pid))
break
except BlockingIOError as e:
# another process locked the file, keep trying
self.logger.debug('pid = {}: cannot acquire lock'.format(pid))
time.sleep(self.wait_secs)
# propagate other exceptions
return True
def __exit__(self, exc_type, exc_value, exc_tb):
self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
pid = os.getpid()
self.logger.debug('pid = {}: trying to release lock ...'.format(pid))
fcntl.flock(self.lock_file_fo, fcntl.LOCK_UN)
self.logger.debug('pid = {}: lock released ...'.format(pid))
# multiprocessing locker context manager with timeout
class mp_locker:
def __init__(
self,
mp_lock=None,
timeout=10,
logger=DummyLogger(),
):
self.mp_lock = mp_lock
self.timeout = timeout
self.logger = logger
def __enter__(self):
self.pid = os.getpid()
self.logger.debug('pid = {}: trying to acquire lock ...'.format(self.pid))
self.mp_lock.acquire(timeout=self.timeout)
self.logger.debug('pid = {}: lock acquired'.format(self.pid))
def __exit__(self, exc_type, exc_value, exc_tb):
self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
self.logger.debug('pid = {}: trying to release lock ...'.format(self.pid))
self.mp_lock.release()
self.logger.debug('pid = {}: lock released ...'.format(self.pid))
class TaskRunner:
def __init__(
self,
loop_count=1,
parallel_tasks_count=1,
release_time=1.,
# functions
func_before_tasks=None,
func_task=None,
func_after_tasks=None,
func_result_ok=None,
func_after_result=None,
# logging
logger_level=logging.DEBUG,
# locking
lock_timeout=10,
use_file_locking=False,
lock_file='./lock_file',
lock_wait_secs=.01,
):
self.loop_count = loop_count
self.parallel_tasks_count = parallel_tasks_count
self.release_time = release_time
# functions
self.func_before_tasks = func_before_tasks
self.func_task = func_task
self.func_after_tasks = func_after_tasks
self.func_result_ok = func_result_ok
self.func_after_result = func_after_result
# logging
self.logger_level = logger_level
# locking
self.lock_timeout = lock_timeout
self.use_file_locking = use_file_locking
self.lock_file = lock_file
self.lock_wait_secs = lock_wait_secs
def get_logger(self, proc_name, logger_level=None):
if logger_level is None:
logger_level = self.logger_level
logger = logging.getLogger(proc_name)
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_logger_format = '%(asctime)s %(proc_name)-8.8s %(levelname)-8.8s [%(filename)-20s%(funcName)20s():%(lineno)03s] %(message)s'
console_handler.setFormatter(logging.Formatter(console_logger_format))
logger.setLevel(logger_level)
logger.addHandler(console_handler)
logger = logging.LoggerAdapter(logger, {'proc_name': proc_name})
return logger
def get_lock(self, timeout=None):
timeout = timeout or self.lock_timeout
if not self.use_file_locking:
return mp_locker(self.mp_lock, timeout=timeout, logger=self.logger)
return f_locker(self.lock_file, timeout=timeout, wait_secs=self.lock_wait_secs)
def run_parallel_tasks(self, parallel_tasks_count):
# before tasks
if self.func_before_tasks:
self.func_before_tasks(self)
self.mp_lock = multiprocessing.Lock()
self.mp_event = multiprocessing.Event()
tasks = []
for task_no in range(parallel_tasks_count):
p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
p.start()
tasks.append(p)
# release waiting processes
time.sleep(self.release_time)
self.mp_event.set()
# wait for all tasks to complete
for p in tasks:
p.join()
# after tasks
if self.func_after_tasks:
return self.func_after_tasks(self)
return None
def run(
self,
loop_count=None,
parallel_tasks_count=None,
):
self.logger = self.get_logger('main')
if loop_count is not None:
self.loop_count = loop_count
if parallel_tasks_count is not None:
self.parallel_tasks_count = parallel_tasks_count
start_time = time.time()
for loop_no in range(self.loop_count):
self.logger.debug('loop_no = {}'.format(loop_no))
result = self.run_parallel_tasks(self.parallel_tasks_count)
if self.func_result_ok:
if not self.func_result_ok(self, result):
self.logger.error('result = {}'.format(result))
break
else:
self.logger.info('result ok')
if self.func_after_result:
self.func_after_result(self)
run_secs = time.time() - start_time
self.logger.info('ready in {} seconds'.format(run_secs))
# ### YOUR CODE BELOW ### #
def before_tasks(task_runner):
# create a table, insert row with counter = 0
with sqlite3.connect('./test_tasks.db') as conn:
cursor = conn.cursor()
cursor.execute("""DROP TABLE IF EXISTS tasks""")
cursor.execute("""CREATE TABLE tasks (counter INTEGER)""")
cursor.execute("""INSERT INTO tasks (counter) VALUES (0)""")
conn.commit()
def task(task_runner, task_no):
logger = task_runner.get_logger('task' + str(task_no))
pid = os.getpid()
# wait for event
logger.debug('pid = {} waiting for event at {}'.format(pid, time.time()))
task_runner.mp_event.wait()
# wait for lock
lock = task_runner.get_lock()
logger.debug('pid = {} waiting for lock at {}'.format(pid, time.time()))
with lock:
# increment counter field
with sqlite3.connect('./test_tasks.db', timeout=10) as conn:
cursor = conn.cursor()
counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
logger.debug('counter = {}'.format(counter))
counter += 1
cursor.execute("""UPDATE tasks SET counter=?""", (counter,))
conn.commit()
def after_tasks(task_runner):
conn = sqlite3.connect('./test_tasks.db')
cursor = conn.cursor()
counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
task_runner.logger.info('counter = {} <- final value'.format(counter))
def result_ok(task_runner, result):
pass
def after_result(task_runner):
pass
def main():
tr = TaskRunner(
# functions
func_before_tasks=before_tasks,
func_task=task,
func_after_tasks=after_tasks,
#func_result_ok=result_ok,
func_after_result=after_result,
# logging
logger_level=logging.INFO,
# locking
use_file_locking=True,
)
tr.run(
loop_count=1,
parallel_tasks_count=100,
#parallel_tasks_count=2,
)
if __name__ == '__main__':
main()
Zusammenfassung
Wir wollten einen einfachen Weg, um gleichzeitige Operationen zu testen. In der Vergangenheit habe ich das Python -Paket 'Locust' verwendet, um Gleichzeitigkeit zu testen, siehe den Beitrag 'Using Locust to load test a FastAPI app with concurrent users'. Diesmal wollte ich es klein, flexibel und erweiterbar halten.
Außerdem wollte ich auch einen Dateisperrkontextmanager für mehrere Prozesse. Wir haben beides implementiert, die Tests haben bestanden. Zeit, sich wieder meinen anderen Projekten zuzuwenden.
Links / Impressum
Python - fcntl
https://docs.python.org/3/library/fcntl.html
Python - multiprocessing
https://docs.python.org/3/library/multiprocessing.html
Python - SQLite3
https://docs.python.org/3/library/sqlite3.html
Using Locust to load test a FastAPI app with concurrent users
https://www.peterspython.com/en/blog/using-locust-to-load-test-a-fastapi-app-with-concurrent-users
Mehr erfahren
Multiprocessing Testing
Neueste
- Ausblenden der Primärschlüssel der Datenbank UUID Ihrer Webanwendung
- Don't Repeat Yourself (DRY) mit Jinja2
- SQLAlchemy, PostgreSQL, maximale Anzahl von Zeilen pro user
- Anzeige der Werte in den dynamischen Filtern SQLAlchemy
- Sichere Datenübertragung mit Public Key Verschlüsselung und pyNaCl
- rqlite: eine hochverfügbare und distverteilte SQLite -Alternative
Meistgesehen
- Verwendung von Pythons pyOpenSSL zur Überprüfung von SSL-Zertifikaten, die von einem Host heruntergeladen wurden
- Verwendung von UUIDs anstelle von Integer Autoincrement Primary Keys mit SQLAlchemy und MariaDb
- Verbindung zu einem Dienst auf einem Docker -Host von einem Docker -Container aus
- PyInstaller und Cython verwenden, um eine ausführbare Python-Datei zu erstellen
- SQLAlchemy: Verwendung von Cascade Deletes zum Löschen verwandter Objekte
- Flask RESTful API Validierung von Anfrageparametern mit Marshmallow-Schemas