Multiprocessing, vergrendeling van bestanden, SQLite en testen
Testen op concurrency problemen is moeilijker en kost meer tijd, maar je kunt niet zonder.
Ik werkte aan een project met SQLAlchemy en PostgreSQL. Voor een paar tabellen wilde ik het aantal rijen per user beperken, en deed dit door een PostgreSQL controlefunctie en trigger toe te voegen.
Bij handmatig testen bleek alles goed te werken, maar wat als een user meerdere processen zou starten en rijen zou toevoegen op precies hetzelfde moment? Ik heb de 'pg_advisory_xact_lock' toegevoegd maar zal dit echt werken? Heb ik de documentatie wel goed begrepen?
In deze post laat ik een universele TaskRunner klasse zien die gebruikt kan worden voor het testen van gelijktijdige (concurrent) acties. Als testcase gebruiken we een SQLite database die we met aparte processen schrijven.
We starten alle processen vanuit één proces. In dit geval kunnen we Multiprocessing.Lock() gebruiken om de toegang tot SQLite te controleren. Maar ik heb ook een bestandslocker geïmplementeerd die gebruikt kan worden als we volledig onafhankelijke processen hebben.
Zoals altijd draai ik dit op Ubuntu 22.04.
Acties tegelijk starten
In onze testopstelling gebruiken we Multiprocessing.Event() om alle processen te laten wachten op dezelfde regel in de taakcode, één regel voor de 'kritieke actie'. Vervolgens, als alle processen dit punt bereikt hebben, 'laten we de processen vrij' en kijken we wat er gebeurt.
stop & release
|
v
task1 |--------------------->|-------->
task2 |----------------->|-------->
task3 |--------------->|-------->
|
taskN |-------->|-------->
--------------------------------------> t
In de klasse TaskRunner:
class TaskRunner:
...
def run_parallel_tasks(self, parallel_tasks_count):
...
self.mp_event = multiprocessing.Event()
...
for task_no in range(parallel_tasks_count):
p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
...
# release waiting all processes
time.sleep(self.release_time)
self.mp_event.set()
...
In onze taakfunctie:
def task(task_runner, task_no):
...
# all tasks will wait here
task_runner.mp_event.wait()
# critical action
...
Het verhogen van een SQLite tabelveld.
In onze test proberen de taken (processen) gelijktijdig een SQLite tabelveld, 'counter',
te verhogen door:
- de veldwaarde te lezen
- te verhogen
- het veld bij te werken
Als we 100 taken hebben, moet het resultaat in het tabelveld 100 zijn. Elke andere waarde is fout.
Vergrendeling
Een taak kan de incrementoperatie niet betrouwbaar uitvoeren zonder exclusieve toegang te krijgen tot SQLite. Hier gebruiken we een extern slot van SQLite.
We kunnen dist het volgende onderscheiden:
- De (gelijktijdige) taken worden gestart door één proces.
- De (gelijktijdige) taken zijn onafhankelijk
In het eerste geval kunnen we Multiprocessing.Lock() gebruiken en dit slot delen tussen al onze taken. Voor testdoeleinden is dit prima.
Het tweede geval is een realistischer scenario. We kunnen hier niet Multiprocessing.Lock() gebruiken, maar wel Linux file locking. Dit is snel en betrouwbaar.
Vergrendelen - Multiprocessing.Lock()
Ik wil Multiprocessing.Lock() gebruiken als context manager. Helaas kunnen we dan geen time-out opgeven. Dit betekent dat we de context manager zelf moeten schrijven:
# multiprocessing locker context manager with timeout
class mp_locker:
def __init__(
self,
mp_lock=None,
timeout=10,
):
self.mp_lock = mp_lock
self.timeout = timeout
def __enter__(self):
self.mp_lock.acquire(timeout=self.timeout)
def __exit__(self, exc_type, exc_value, exc_tb):
self.mp_lock.release()
Vergrendeling - Bestandsvergrendeling
Er zijn veel voorbeelden op het internet te vinden over hoe dit te doen. Ook hier wil ik dit gebruiken als context manager. Hier laat ik alleen de '__enter__()' methode zien.
# file locker context manager
...
def __enter__(self):
while True:
if (time.time() - ts) > self.timeout:
raise Exception('pid = {}: acquire lock timeout')
try:
self.lock_file_fo = open(self.lock_file, 'a')
fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError as e:
# another process locked the file, keep trying
time.sleep(self.wait_secs)
# propagate other exceptions
We blijven in de 'while-lus' totdat we het slot verkrijgen of er een time-out optreedt.
De klasse TaskRunner
De TaskRunner bevat alle logica om meerdere taken (processen) te starten.
Functies:
- before_tasks()
- taak()
- na_taken()
- resultaat_ok()
- na_resultaat()
Opties:
- Aantal gelijktijdige taken.
- Aantal keren herhalen.
- Wachtende taken release-time (na start).
- Logging niveau.
- Multiprocessing.Lock() vergrendeling, of bestandsvergrendeling.
- Time-out vergrendeling.
Belangrijk: Al je functies worden aangeroepen met het TaskRunner object als eerste parameter. Dit betekent dat je toegang hebt tot TaskRunner attributen en methodes zoals:
- get_lock()
- get_logger()
De code
De code bestaat uit de volgende onderdelen:
- TaskRunner klasse en ondersteunende klassen
- Je taakfuncties
- TaskRunner instatiation met je parameters
Wanneer je de code uitvoert, is de output zoiets als:
INFO counter = 100 <- final value
INFO ready in 2.0454471111297607 seconds
Hier is de code voor het geval je het zelf wilt proberen:
import fcntl
import logging
import multiprocessing
import os
import sys
import time
import sqlite3
class DummyLogger:
def __getattr__(self, name):
return lambda *args, **kwargs: None
# file locker context manager
class f_locker:
def __init__(
self,
lock_file=None,
timeout=10,
logger=DummyLogger(),
wait_secs=.01,
):
self.lock_file = lock_file
self.timeout = timeout
self.logger = logger
self.wait_secs = wait_secs
# keep lock_file opened
self.lock_file_fo = None
def __enter__(self):
pid = os.getpid()
ts = time.time()
while True:
self.logger.debug('pid = {}: trying to acquire lock ...'.format(pid))
if (time.time() - ts) > self.timeout:
raise Exception('pid = {}: acquire lock timeout')
# keep trying until lock or timeout
try:
self.lock_file_fo = open(self.lock_file, 'a')
fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.logger.debug('pid = {}: lock acquired'.format(pid))
break
except BlockingIOError as e:
# another process locked the file, keep trying
self.logger.debug('pid = {}: cannot acquire lock'.format(pid))
time.sleep(self.wait_secs)
# propagate other exceptions
return True
def __exit__(self, exc_type, exc_value, exc_tb):
self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
pid = os.getpid()
self.logger.debug('pid = {}: trying to release lock ...'.format(pid))
fcntl.flock(self.lock_file_fo, fcntl.LOCK_UN)
self.logger.debug('pid = {}: lock released ...'.format(pid))
# multiprocessing locker context manager with timeout
class mp_locker:
def __init__(
self,
mp_lock=None,
timeout=10,
logger=DummyLogger(),
):
self.mp_lock = mp_lock
self.timeout = timeout
self.logger = logger
def __enter__(self):
self.pid = os.getpid()
self.logger.debug('pid = {}: trying to acquire lock ...'.format(self.pid))
self.mp_lock.acquire(timeout=self.timeout)
self.logger.debug('pid = {}: lock acquired'.format(self.pid))
def __exit__(self, exc_type, exc_value, exc_tb):
self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
self.logger.debug('pid = {}: trying to release lock ...'.format(self.pid))
self.mp_lock.release()
self.logger.debug('pid = {}: lock released ...'.format(self.pid))
class TaskRunner:
def __init__(
self,
loop_count=1,
parallel_tasks_count=1,
release_time=1.,
# functions
func_before_tasks=None,
func_task=None,
func_after_tasks=None,
func_result_ok=None,
func_after_result=None,
# logging
logger_level=logging.DEBUG,
# locking
lock_timeout=10,
use_file_locking=False,
lock_file='./lock_file',
lock_wait_secs=.01,
):
self.loop_count = loop_count
self.parallel_tasks_count = parallel_tasks_count
self.release_time = release_time
# functions
self.func_before_tasks = func_before_tasks
self.func_task = func_task
self.func_after_tasks = func_after_tasks
self.func_result_ok = func_result_ok
self.func_after_result = func_after_result
# logging
self.logger_level = logger_level
# locking
self.lock_timeout = lock_timeout
self.use_file_locking = use_file_locking
self.lock_file = lock_file
self.lock_wait_secs = lock_wait_secs
def get_logger(self, proc_name, logger_level=None):
if logger_level is None:
logger_level = self.logger_level
logger = logging.getLogger(proc_name)
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_logger_format = '%(asctime)s %(proc_name)-8.8s %(levelname)-8.8s [%(filename)-20s%(funcName)20s():%(lineno)03s] %(message)s'
console_handler.setFormatter(logging.Formatter(console_logger_format))
logger.setLevel(logger_level)
logger.addHandler(console_handler)
logger = logging.LoggerAdapter(logger, {'proc_name': proc_name})
return logger
def get_lock(self, timeout=None):
timeout = timeout or self.lock_timeout
if not self.use_file_locking:
return mp_locker(self.mp_lock, timeout=timeout, logger=self.logger)
return f_locker(self.lock_file, timeout=timeout, wait_secs=self.lock_wait_secs)
def run_parallel_tasks(self, parallel_tasks_count):
# before tasks
if self.func_before_tasks:
self.func_before_tasks(self)
self.mp_lock = multiprocessing.Lock()
self.mp_event = multiprocessing.Event()
tasks = []
for task_no in range(parallel_tasks_count):
p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
p.start()
tasks.append(p)
# release waiting processes
time.sleep(self.release_time)
self.mp_event.set()
# wait for all tasks to complete
for p in tasks:
p.join()
# after tasks
if self.func_after_tasks:
return self.func_after_tasks(self)
return None
def run(
self,
loop_count=None,
parallel_tasks_count=None,
):
self.logger = self.get_logger('main')
if loop_count is not None:
self.loop_count = loop_count
if parallel_tasks_count is not None:
self.parallel_tasks_count = parallel_tasks_count
start_time = time.time()
for loop_no in range(self.loop_count):
self.logger.debug('loop_no = {}'.format(loop_no))
result = self.run_parallel_tasks(self.parallel_tasks_count)
if self.func_result_ok:
if not self.func_result_ok(self, result):
self.logger.error('result = {}'.format(result))
break
else:
self.logger.info('result ok')
if self.func_after_result:
self.func_after_result(self)
run_secs = time.time() - start_time
self.logger.info('ready in {} seconds'.format(run_secs))
# ### YOUR CODE BELOW ### #
def before_tasks(task_runner):
# create a table, insert row with counter = 0
with sqlite3.connect('./test_tasks.db') as conn:
cursor = conn.cursor()
cursor.execute("""DROP TABLE IF EXISTS tasks""")
cursor.execute("""CREATE TABLE tasks (counter INTEGER)""")
cursor.execute("""INSERT INTO tasks (counter) VALUES (0)""")
conn.commit()
def task(task_runner, task_no):
logger = task_runner.get_logger('task' + str(task_no))
pid = os.getpid()
# wait for event
logger.debug('pid = {} waiting for event at {}'.format(pid, time.time()))
task_runner.mp_event.wait()
# wait for lock
lock = task_runner.get_lock()
logger.debug('pid = {} waiting for lock at {}'.format(pid, time.time()))
with lock:
# increment counter field
with sqlite3.connect('./test_tasks.db', timeout=10) as conn:
cursor = conn.cursor()
counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
logger.debug('counter = {}'.format(counter))
counter += 1
cursor.execute("""UPDATE tasks SET counter=?""", (counter,))
conn.commit()
def after_tasks(task_runner):
conn = sqlite3.connect('./test_tasks.db')
cursor = conn.cursor()
counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
task_runner.logger.info('counter = {} <- final value'.format(counter))
def result_ok(task_runner, result):
pass
def after_result(task_runner):
pass
def main():
tr = TaskRunner(
# functions
func_before_tasks=before_tasks,
func_task=task,
func_after_tasks=after_tasks,
#func_result_ok=result_ok,
func_after_result=after_result,
# logging
logger_level=logging.INFO,
# locking
use_file_locking=True,
)
tr.run(
loop_count=1,
parallel_tasks_count=100,
#parallel_tasks_count=2,
)
if __name__ == '__main__':
main()
Samenvatting
We wilden een gemakkelijke manier om gelijktijdige operaties te testen. In het verleden gebruikte ik het Python pakket 'Locust' om concurrency te testen, zie de post 'Using Locust to load test a FastAPI app with concurrent users'. Deze keer wilde ik het klein, flexibel en uitbreidbaar houden.
Daarnaast wilde ik ook een file lock context manager voor meerdere processen. We hebben beide geïmplementeerd, de tests zijn geslaagd. Tijd om terug te gaan naar mijn andere projecten.
Links / credits
Python - fcntl
https://docs.python.org/3/library/fcntl.html
Python - multiprocessing
https://docs.python.org/3/library/multiprocessing.html
Python - SQLite3
https://docs.python.org/3/library/sqlite3.html
Using Locust to load test a FastAPI app with concurrent users
https://www.peterspython.com/en/blog/using-locust-to-load-test-a-fastapi-app-with-concurrent-users
Lees meer
Multiprocessing Testing
Recent
- Database UUID primaire sleutels van je webapplicatie verbergen
- Don't Repeat Yourself (DRY) met Jinja2
- SQLAlchemy, PostgreSQL, maximum aantal rijen per user
- Toon de waarden in SQLAlchemy dynamische filters
- Veilige gegevensoverdracht met Public Key versleuteling en pyNaCl
- rqlite: een alternatief voor SQLite met hoge beschikbaarheid en distributed
Meest bekeken
- Met behulp van Python's pyOpenSSL om SSL-certificaten die van een host zijn gedownload te controleren
- Gebruik van UUIDs in plaats van Integer Autoincrement Primary Keys met SQLAlchemy en MariaDb
- Maak verbinding met een dienst op een Docker host vanaf een Docker container
- PyInstaller en Cython gebruiken om een Python executable te maken
- SQLAlchemy: Gebruik van Cascade Deletes om verwante objecten te verwijderen
- Flask RESTful API verzoekparametervalidatie met Marshmallow-schema's