
SQLAlchemy dynamic query building and filtering, including soft deletes

21 June 2019 by Peter
In SQLAlchemy

This post shows how to build a query builder for all your select queries.

Original photo unsplash.com/@grohsfabian.

Building on the previous post, "Jinja2 and SQLAlchemy many-to-many relationship with conditions", I looked for a way to add filter conditions dynamically and, if possible, to also find a solution for the soft delete pattern.

A soft delete does not remove records from a table but marks them as deleted. This means every table must have a deleted flag, and every query must exclude the records marked as deleted. With an ORM such as SQLAlchemy this is more complex, because we are dealing with objects rather than records. Implementing soft delete is difficult, and the problem is not limited to soft deletes: every class in my model also has a status field. This can be used to temporarily hide an object from non-admin visitors.
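As a minimal sketch of what this means in practice, the example below applies both conditions by hand in a plain SQLAlchemy query. The Article model, its data and the STATUS_DISABLED constant are hypothetical and not part of this post's model. The point is only that every select has to repeat the same two conditions, which is the repetition the query builder further down tries to remove.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

STATUS_DISABLED = 0  # hypothetical value, for illustration only
STATUS_ENABLED = 1


class Article(Base):  # hypothetical model, not part of the post's model
    __tablename__ = 'article'

    id = Column(Integer, primary_key=True)
    title = Column(String)
    deleted = Column(Boolean, default=False, nullable=False)
    status = Column(Integer, default=STATUS_ENABLED, nullable=False)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([
    Article(title='visible'),
    Article(title='removed'),
    Article(title='hidden', status=STATUS_DISABLED),
])
session.commit()

# Soft delete: an UPDATE that sets the flag, the row stays in the table.
removed = session.query(Article).filter(Article.title == 'removed').one()
removed.deleted = True
session.commit()

# Every select for normal visitors must repeat both conditions.
visible = (
    session.query(Article)
    .filter(Article.deleted.is_(False))
    .filter(Article.status == STATUS_ENABLED)
    .order_by(Article.title)
    .all()
)
visible_titles = [article.title for article in visible]

# The soft-deleted and disabled rows are still physically present.
total_rows = session.query(Article).count()
```

Forgetting either filter in a single query is enough to leak a deleted or disabled object, which is why a central place for these conditions is attractive.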

I believe the best way to implement soft delete, or a status, would be to have it built into SQLAlchemy itself and exposed as a utility function. That is not the case, but some recipes are available (using the before_compile event).
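For comparison, here is a rough sketch of what such a before_compile recipe can look like. It follows the general shape of the recipes linked in the references, but the Note model and the listener name are hypothetical, and it is not the approach used in the rest of this post.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Query, sessionmaker

Base = declarative_base()


class Note(Base):  # hypothetical model, for illustration only
    __tablename__ = 'note'

    id = Column(Integer, primary_key=True)
    title = Column(String)
    deleted = Column(Boolean, default=False, nullable=False)


@event.listens_for(Query, 'before_compile', retval=True)
def exclude_deleted_notes(query):
    # Runs just before a Query is compiled to SQL. With retval=True
    # the query returned here replaces the original one.
    for desc in query.column_descriptions:
        if desc['type'] is Note:
            query = query.filter(desc['entity'].deleted.is_(False))
    return query


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([Note(title='kept'), Note(title='gone', deleted=True)])
session.commit()

# No explicit filter on deleted: the listener adds it.
titles = [note.title for note in session.query(Note).order_by(Note.title).all()]
```

The filter is applied to every Query that selects Note, which is convenient but also implicit. Queries that already carry a LIMIT or OFFSET need extra care with this approach, and the linked recipes are the place to look for that.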

Looking at select queries only, I decided to build my own query builder for select statements. The requirements were that it must be possible to pass more than one class, e.g. [Parent, Child], and/or columns, e.g. [Parent.id, Child] (written as the tuple (Parent, 'id') in the code below), and that it must be possible to add filter conditions dynamically, including automatically adding conditions on the deleted column and the status column. Below you will find some references on building dynamic queries.

Of course I ran into other problems, such as: AttributeError: 'scoped_session' object has no attribute '_autoflush'. Fortunately someone had already found a solution to this problem, see the references.
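The workaround from that reference, as I understand it, is to attach the pre-built query to the real Session object rather than to the scoped_session registry. A minimal sketch, with a hypothetical Tag model:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Query, scoped_session, sessionmaker

Base = declarative_base()


class Tag(Base):  # hypothetical model, for illustration only
    __tablename__ = 'tag'

    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

# db is the scoped_session registry, not a Session itself.
db = scoped_session(sessionmaker(bind=engine))
db.add_all([Tag(name='python'), Tag(name='sqlalchemy')])
db.commit()

# Build the query without any session attached.
query = Query([Tag]).filter(Tag.name.like('sql%')).order_by(Tag.name)

# Calling the registry returns the Session for the current thread,
# and that is the object handed to with_session().
session = db()
tag_names = [tag.name for tag in query.with_session(session).all()]
```

This is the same pattern as db_local = db() followed by query.with_session(db_local) in the listing below.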

I then made a backup and started using this. If I run into problems, I can always add the FilteredQuery recipe.

Of course this takes away some of the 'fun' of writing SQLAlchemy queries, but hey, let's build working applications!

In case you want to try this:

from sqlalchemy import Table, Column, Integer, String, Boolean, BigInteger, DateTime, ForeignKey, func, and_, or_, desc, asc, create_engine, inspect, sql
from sqlalchemy.orm import relationship, Session, with_polymorphic, backref, contains_eager, Query, sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import label

import os
import sys


Base = declarative_base()

# many-to-many link table: parent - child
parent_mtm_child_table = Table('parent_mtm_child', Base.metadata,
    Column('parent_id', Integer, ForeignKey('parent.id')),
    Column('child_id', Integer, ForeignKey('child.id'))
)


class Parent(Base):
    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    # many-to-many relationship with child
    children = relationship(
        'Child', 
        secondary=parent_mtm_child_table)

    def __repr__(self):
        return "%s(name=%r)" % \
            (self.__class__.__name__, self.name) 


class Child(Base):
    __tablename__ = 'child'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    age = Column(Integer)
    hair_color = Column(String)
    # many-to-many relationship with parent
    parents = relationship(
        'Parent',
        secondary=parent_mtm_child_table)

    def __repr__(self):
        return "%s(name=%r, age=%r, hair_color=%r)" % \
            (self.__class__.__name__, self.name,
            self.age, self.hair_color) 


# show/hide sql
#engine = create_engine('sqlite://', echo=True)
engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

#session = sessionmaker()
#session.configure(bind=engine)
#db = session()
# use scoped_session
db = scoped_session(sessionmaker(bind=engine))

# Attaching a pre-built query to a scoped_session in SQLAlchemy
# https://stackoverflow.com/questions/43685758/attaching-a-pre-built-query-to-a-scoped-session-in-sqlalchemy
db_local = db()


STATUS_ENABLED = 1

# parents
john = Parent(name='John', status=STATUS_ENABLED)
mary = Parent(name='Mary', status=STATUS_ENABLED)
gina = Parent(name='Gina', status=STATUS_ENABLED)
ryan = Parent(name='Ryan', status=STATUS_ENABLED)
eric = Parent(name='Eric', status=STATUS_ENABLED)
# children
liam = Child(name='Liam', age=6, hair_color='brown', status=STATUS_ENABLED)
emma = Child(name='Emma', age=8, hair_color='blond', status=STATUS_ENABLED)
alex = Child(name='Alex', age=10, hair_color='blond', status=STATUS_ENABLED)
sara = Child(name='Sara', age=9, hair_color='blond', status=STATUS_ENABLED)
rose = Child(name='Rose', age=9, hair_color='blond', status=STATUS_ENABLED)
# assign children to parents
john.children.append(liam)
john.children.append(emma)
john.children.append(alex)
mary.children.append(liam)
gina.children.append(sara)
eric.children.append(rose)
db.add_all([john, mary, gina, ryan, eric, liam, emma, alex, sara, rose])
db.commit()

# delete some
sara.deleted = True
db.commit()

''' 

db_select()
description: build a select query based on input,
also filter on deleted and status attributes


example 1: single table/object select
-------------------------------------------------------

qry = db_select(
    model_class_list=[Parent], 
    order_by_list=[(Parent, 'name', 'asc')]
).first()


example 2: two table with many-to-many select
-------------------------------------------------------

qry = db_select(
   model_class_list=[Parent, Child], 
   filter_by_list=[
       (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
       (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
       (Child, 'age', 'ge', 6),
       (Parent, 'id', 'in', [3, 4, 5]),
   ],
   order_by_list=[
       (Child, 'name', 'asc'), 
   ],
   limit=10, offset=4,
).all()


example 3: select column attribute instead of object
-------------------------------------------------------

qry = db_select(
   model_class_list=[(Parent, 'id'), Child],
   filter_by_list=[
       (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
       (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
       (Child, 'age', 'ge', 6),
       (Parent, 'id', 'in', [3, 4, 5]),
   ],
   order_by_list=[
       (Child, 'name', 'asc'), 
   ],
   limit=10, offset=4,
).all()


''' 

def db_select(model_class_list=None, filter_by_list=None, order_by_list=None, limit=None, offset=None, filter_deleted=False, filter_status=STATUS_ENABLED):
    fname = 'db_select'
    dbg_print = False

    if dbg_print:
        print(fname + ": len(model_class_list) = {}".format(len(model_class_list)))

    if filter_by_list is None:
        filter_by_list = []
    if order_by_list is None:
        order_by_list = []

    if not isinstance(model_class_list, list):
        raise Exception('model_class_list not list')
    if not isinstance(filter_by_list, list):
        raise Exception('filter_by_list not list')
    if not isinstance(order_by_list, list):
        raise Exception('order_by_list not list')

    # work on a copy: the deleted/status filters are appended below and
    # must not end up in the list owned by the caller
    filter_by_list = list(filter_by_list)

    # collector for model_classes
    mcs = []
    # collector for columns
    columns = []
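    for model_class_item in model_class_list:
        if isinstance(model_class_item, tuple):
            # (model_class, 'attribute') selects a single column
            m, key = model_class_item
            column = getattr(m, key, None)
            if column is None:
                raise Exception('Invalid select column: %s' % key)
            columns.append(column)
            mcs.append(m)
        else:
            # a bare model class selects the whole object
            columns.append(model_class_item)
            mcs.append(model_class_item)
    # build the query without a session; it is attached at the end
    query = Query(columns)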

    if dbg_print:
        print(fname + ": after creating query, query = {}".format(query))

    # add deleted filter if column deleted exists
    if filter_deleted is not None:
        for model_class in mcs:
            if 'deleted' in inspect(model_class).columns.keys():
                filter_by_list.append((model_class, 'deleted', 'eq', filter_deleted))

    # add status filter if column status exists
    if filter_status is not None:
        for model_class in mcs:
            if 'status' in inspect(model_class).columns.keys():
                filter_by_list.append((model_class, 'status', 'eq', filter_status))

    if dbg_print:
        # filter_by_items
        for filter_by_item in filter_by_list:
            print(fname + ": filter_by_item = {}".format(filter_by_item))
        # order_by_items
        for order_by_item in order_by_list:
            print(fname + ": order_by_item = {}".format(order_by_item))

    for filter_by_item in filter_by_list:
        if dbg_print:
            print(fname + ": processing filter_by_item = {}".format(filter_by_item))
        try:
            model_class, key, op, value = filter_by_item
        except ValueError:
            raise Exception('Invalid filter_by_item: %s' % filter_by_item)

        if dbg_print:
           print(fname + ": processing key, op, value = {}, {}, {}".format(key, op, value))

        column = getattr(model_class, key, None)
        # compare with None explicitly instead of relying on the
        # truth value of a column attribute
        if column is None:
            raise Exception('Invalid filter column: %s' % key)

        if op == 'in':
            if isinstance(value, list):
                filt = column.in_(value)
            else:
                filt = column.in_(value.split(','))

            if dbg_print:
                print(fname + ": if, filt = {}".format(filt))
        else:
            try:
                attr = list(filter(
                    lambda e: hasattr(column, e % op),
                    ['%s', '%s_', '__%s__']
                ))[0] % op
            except IndexError:
                raise Exception('Invalid filter operator: %s' % op)

            if dbg_print:
                print(fname + ": processing filter_cond, attr = {}".format(attr))

            if value == 'null':
                value = None
            filt = getattr(column, attr)(value)

            if dbg_print:
                print(fname + ": else, filt = {}".format(filt))

        if dbg_print:
            print(fname + ": adding filt")
        query = query.filter(filt)

    for order_by_item in order_by_list:
        if dbg_print:
            print(fname + ": processing order_by_item = {}".format(order_by_item))

        try:
            model_class, key, op = order_by_item
        except ValueError:
            raise Exception('Invalid order_by_item: %s' % order_by_item)

        if dbg_print:
            print(fname + ": processing model_class = {}, key = {}, op = {}".format(model_class, key, op))

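        column = getattr(model_class, key, None)
        if column is None:
            raise Exception('Invalid order_by column: %s' % key)
        # op is the name of the ordering method, e.g. 'asc' or 'desc'
        column_sorted = getattr(column, op)()
        query = query.order_by(column_sorted)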

    if limit:
        if dbg_print:
            print(fname + ": processing limit = {}".format(limit))
        query = query.limit(limit)

    if offset:
        if dbg_print:
            print(fname + ": processing offset = {}".format(offset))
        query = query.offset(offset)

    if dbg_print:
        print(fname + ": after building query, query = {}".format(query))
    return query.with_session(db_local)


print("\nPARENTS\n")

parents = db_select(
    model_class_list=[Parent], 
    order_by_list=[
        (Parent, 'name', 'asc'), 
    ],
    limit=3,
).all()

for parent in parents:
    print("parent.name = {}".format(parent.name))


# get parent_ids for next query
parent_ids = [parent.id for parent in parents]
print("parent_ids = {}".format(parent_ids))


print("\nPARENTS-CHILDREN\n")

parent_child_tuples = db_select(
    model_class_list=[Parent, Child], 
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'), 
        (Child, 'name', 'asc'), 
    ],
    limit=10, offset=0,
).all()

# show tuples
print("parent_child_tuples: {}".format(parent_child_tuples))	

# build list parent_id-children
from collections import defaultdict

parent_id2children = defaultdict(list)
for parent, child in parent_child_tuples:
    parent_id2children[parent.id].append(child)

# show parent_id2children
print("parent_id2children:")
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))	


print("\nPARENT_IDS-CHILDREN\n")

parent_id_child_tuples = db_select(
    model_class_list=[(Parent, 'id'), Child], 
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'), 
        (Child, 'name', 'asc'), 
    ],
    limit=10, offset=0,
).all()

# show tuples
print("parent_id_child_tuples: {}".format(parent_id_child_tuples))

# build list parent_id-children

parent_id2children = defaultdict(list)
for parent_id, child in parent_id_child_tuples:
    parent_id2children[parent_id].append(child)

# show parent_id2children
print("\nparent_id2children:")
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))	


print("\nshow columns:")

# debug: show columns in parent 
for c in Parent.__table__.columns:
    print("parent table column c = {}".format(c))

# debug: show columns in parent using inspect
mapper = inspect(Parent)
for column in mapper.attrs:
    print("column.key = {}".format(column.key))

for key in inspect(Parent).columns.keys():
    print("key = {}".format(key))

if 'deleted' in inspect(Parent).columns.keys():
    print("deleted found")
else:
    print("deleted NOT found")
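
The least obvious part of db_select() is how an operator string such as 'ge' or 'in' is turned into a column method. The sketch below isolates that lookup. The helper name resolve_operator() is hypothetical, and it uses standalone Core columns instead of the mapped classes above so that it runs on its own.

```python
from sqlalchemy import Column, Integer, String


def resolve_operator(column, op):
    """Return the name of the column method that implements op.

    Tries 'op', then 'op_', then '__op__', in the same order as the
    lookup in db_select().
    """
    for pattern in ('%s', '%s_', '__%s__'):
        name = pattern % op
        if hasattr(column, name):
            return name
    raise ValueError('Invalid filter operator: %s' % op)


# Standalone columns, used here only to have something to inspect.
age = Column('age', Integer)
name = Column('name', String)

# 'like' exists as is, 'in' and 'is' need a trailing underscore,
# and the comparison operators map to the Python dunder methods.
resolved = {
    op: resolve_operator(age, op)
    for op in ('ge', 'eq', 'in', 'like', 'is')
}

# Calling the resolved method with a value gives a filter expression
# that can be passed to Query.filter().
age_filter = getattr(age, resolve_operator(age, 'ge'))(8)
name_filter = getattr(name, resolve_operator(name, 'like'))('A%')

try:
    resolve_operator(age, 'bogus')
    bogus_rejected = False
except ValueError:
    bogus_rejected = True
```

Because the lookup accepts any attribute name that exists on the column, it is worth restricting op to a known set of operators if the filter tuples can come from user input.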

Links / credits

Attaching a pre-built query to a scoped_session in SQLAlchemy
https://stackoverflow.com/questions/43685758/attaching-a-pre-built-query-to-a-scoped-session-in-sqlalchemy

Dynamically constructing filters based on string input using SQLAlchemy
https://ruddra.com/posts/dynamically-constructing-filters-based-on-string-input-using-sqlalchemy/

Dynamically constructing filters in SQLAlchemy
https://ruddra.com/posts/dynamically-constructing-filters-based-on-string-input-using-sqlalchemy/

FilteredQuery
https://github.com/sqlalchemy/sqlalchemy/wiki/FilteredQuery

Implementing the "Soft Delete" Pattern with Flask and SQLAlchemy
https://blog.miguelgrinberg.com/post/implementing-the-soft-delete-pattern-with-flask-and-sqlalchemy

method of iterating over sqlalchemy model's defined columns?
https://stackoverflow.com/questions/2537471/method-of-iterating-over-sqlalchemy-models-defined-columns

Python - SqlAlchemy: convert lists of tuples to list of atomic values [duplicate]
https://stackoverflow.com/questions/44355850/python-sqlalchemy-convert-lists-of-tuples-to-list-of-atomic-values
