SQLAlchemy, PostgreSQL, maximale Anzahl von Zeilen pro user
Verwenden Sie eine PostgreSQL function und eine PostgreSQL trigger , um die maximale Anzahl der Zeilen pro user zu begrenzen.
Sie haben eine Multi-user -Anwendung mit SQLAlchemy und PostgreSQL und möchten die Anzahl der Zeilen pro user einer bestimmten Tabelle begrenzen. Zum Beispiel kann jede user maximal fünf Einträge haben.
Sie benötigen eine Operation wie:
- Sperren Sie die Tabelle
- Zählen Sie die Anzahl der Beiträge der user
- Wenn die Anzahl kleiner als fünf ist:
- Neuen Beitrag hinzufügen
- Entsperren Sie die Tabelle
- Else:
- Entsperre die Tabelle
- Ausnahme generieren
Wir können dies nur mit SQLAlchemy tun, aber das ist eine Menge Code und nicht sehr schnell.
Ein besserer Weg ist es, dies mit zu implementieren:
- A PostgreSQL function, und
- A PostgreSQL trigger
Der PostgreSQL function wird zur Durchführung der Operation verwendet, und der PostgreSQL trigger ruft den PostgreSQL function bei einer INSERT -Operation auf.
Die PostgreSQL function und PostgreSQL trigger befinden sich auf dem Datenbankserver. Wir laden sie einmal mit SQLAlchemy auf den Datenbankserver hoch. Das bedeutet, dass die Abfragen des PostgreSQL function auf dem Datenbankserver laufen.
Wie immer mache ich das auf Ubuntu 22.04.
Der Code
Ich werde hier nicht viel erklären, ich habe ein funktionierendes Beispiel erstellt, es ist alles im Code kommentiert, siehe unten.
In dem Beispiel haben wir Bürger und Autos. Ein Bürger kann eine maximale Anzahl von zwei Autos haben. Diese Anzahl wird in einer anderen (Konfigurations-)Tabelle gespeichert.
Zuerst fügen wir einem Bürger zwei Autos hinzu. Wenn wir dann versuchen, ein drittes Auto hinzuzufügen, wird eine Ausnahme erzeugt. Beachten Sie, dass diese Ausnahme eine PostgreSQL -Ausnahme ist. In SQLAlchemy ist sie in 'e.orig' verfügbar.
...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
e_orig = getattr(e.orig, None)
if e_orig is not None:
# postgresql exception
Sie entscheiden, welche Informationen in der Ausnahme enthalten sind. Hier sieht die Ausnahme wie folgt aus:
CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4
Um das Testen zu vereinfachen, werden die Tabellen und die PostgreSQL function und PostgreSQL trigger bei jedem Lauf erstellt.
Um das Beispiel auszuführen, erstellen Sie eine Datenbank PostgreSQL . Erstellen Sie dann eine virtual environment und installieren Sie das Folgende:
pip install SQLAlchemy
pip install psycopg2-binary
Hier ist der Code, vergessen Sie nicht, die Datenbankparameter hinzuzufügen:
# max_num_rows.py
import logging
import sys
import uuid
from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg
# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)
# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''
connection_uri = ''.join([
'postgresql+psycopg2://',
DB_USER + ':' + DB_PASSWORD,
'@',
DB_HOST + ':' + str(DB_PORT),
'/',
DB_NAME,
])
engine = sa.create_engine(connection_uri)
# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
max_rows INTEGER;
row_count INTEGER;
lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
-- get advisory lock to serialize access to the check
PERFORM pg_advisory_xact_lock(hashtext(lock_key));
-- get the maximum number of cars allowed per citizen
SELECT COALESCE(
(
SELECT max_cars
FROM config
),
0) INTO max_rows;
-- get current number of cars for this citizen
SELECT COUNT(*) INTO row_count
FROM car
WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;
-- we cannot insert if max_cars is reached
IF row_count >= max_rows THEN
-- release lock before raising an exception
PERFORM pg_advisory_unlock(hashtext(lock_key));
RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
END IF;
-- release lock
PERFORM pg_advisory_unlock(hashtext(lock_key));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""
# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""
# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""
# define tables
Base = declarative_base()
class CommonFields(object):
# Base - start
id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
deleted_on = sa.Column(sa.DateTime)
class Config(CommonFields, Base):
__tablename__ = 'config'
max_cars = sa.Column(sa.Integer)
def __repr__(self):
return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'
class Citizen(CommonFields, Base):
__tablename__ = 'citizen'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: cars
cars = relationship(
'Car',
back_populates='citizen',
)
def __repr__(self):
return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'
class Car(CommonFields, Base):
__tablename__ = 'car'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: citizen
citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
citizen = relationship(
'Citizen',
back_populates='cars',
)
def __repr__(self):
return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'
print(f'create tables')
must_create = True
if must_create:
print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
sys.exit() # remove this line to continue
Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine)
print('create session')
Session = sessionmaker(bind=engine)
db = Session()
def run_sql(sql):
stmt = sa.text(sql)
try:
db.execute(stmt)
except Exception as e:
exception = type(e).__name__
print(f'run_sql exception = {exception}, e.args = {e.args}')
sys.exit()
if must_create:
print('drop function & trigger')
run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('create function & trigger')
run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('add config ...')
config = Config(max_cars=2)
print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')
db.add(config)
db.add_all([john, jane, bob, alice])
print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')
# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]
db.commit()
db.flush()
print('show citizens and cars ...')
stmt = sa.select(Citizen).\
order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()
for i, citizen in enumerate(citizens):
print(f'citizen[{i}] {citizen}')
print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('remove a car from john ...')
print('##################################################################')
db.delete(johns_toyota)
db.commit()
print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')
stmt = sa.select(sa.func.count(Car.id)).\
where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')
print('ready')
Auflisten von Funktionen und Triggern
In pgAdmin ist das ganz einfach. Navigieren Sie zu:
Schemas -> Tables -> car -> Triggers
Von hier aus können Sie auch den Quellcode einsehen.
Sie können auch Abfragen verwenden. Um eine Liste aller Funktionen zu erhalten, führen Sie die Abfrage aus:
SELECT proname, prosrc
FROM pg_proc;
Oder, um nur Ihre eigenen Funktionen zu sehen, die das Wort "check" enthalten:
SELECT proname, prosrc
FROM pg_proc
WHERE
proname LIKE '%check%'
Um eine Liste aller Auslöser zu erhalten, führen Sie die Abfrage aus:
SELECT tgname FROM pg_trigger;
Zusammenfassung
Die Verwendung von PostgreSQL functions und Triggern ist nicht besonders schwierig, aber man muss sich wirklich etwas Zeit nehmen, um ein Testskript zu erstellen, mit dem man leicht spielen kann. Die Ausnahme PostgreSQL lässt sich leicht aus der Ausnahme Python extrahieren. Schließlich ist SQLAlchemy ein (meistens) großartiges Werkzeug. Viel Spaß damit!
Links / Impressum
Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql
PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html
Mehr erfahren
PostgreSQL SQLAlchemy
Neueste
- Ausblenden der Primärschlüssel der Datenbank UUID Ihrer Webanwendung
- Don't Repeat Yourself (DRY) mit Jinja2
- SQLAlchemy, PostgreSQL, maximale Anzahl von Zeilen pro user
- Anzeige der Werte in den dynamischen Filtern SQLAlchemy
- Sichere Datenübertragung mit Public Key Verschlüsselung und pyNaCl
- rqlite: eine hochverfügbare und distverteilte SQLite -Alternative
Meistgesehen
- Verwendung von Pythons pyOpenSSL zur Überprüfung von SSL-Zertifikaten, die von einem Host heruntergeladen wurden
- Verwendung von UUIDs anstelle von Integer Autoincrement Primary Keys mit SQLAlchemy und MariaDb
- Verbindung zu einem Dienst auf einem Docker -Host von einem Docker -Container aus
- PyInstaller und Cython verwenden, um eine ausführbare Python-Datei zu erstellen
- SQLAlchemy: Verwendung von Cascade Deletes zum Löschen verwandter Objekte
- Flask RESTful API Validierung von Anfrageparametern mit Marshmallow-Schemas