angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

SLQAlchemy construcción y filtrado de consultas dinámicas que incluye soft deletes

Este post muestra cómo construir un constructor de consultas para todas las consultas seleccionadas

21 junio 2019
post main image
Original photo unsplash.com/@grohsfabian.

Basándome en el post anterior 'Flask, Jinja2 y SLQAlchemy relación de muchos a muchos con las condiciones' busqué una manera de añadir dinámicamente las condiciones de filtro y si es posible también encontrar una solución para el soft delete patrón.

El borrado suave no es borrar registros de una tabla, sino marcarlos como borrados. Esto significa que cada tabla debe tener un indicador borrado y todas las consultas deben excluir los registros marcados como borrados. Para un ORM caso como SQLAlchemy este es aún más complejo ya que no estamos hablando de registros sino de objetos. La implementación soft delete es difícil, pero no sólo se limita a. soft deletes.. Cada clase en mi modelo también tiene un campo de estado. Esto se puede utilizar para desactivar temporalmente la visualización de este objeto a los visitantes no admitidos.

Creo que la mejor manera de implementar soft delete y/o estatus es integrando esto en SQLAlchemy sí mismo y haciéndolo disponible como una función de utilidad. Sin embargo, este no es el caso, pero algunas recetas están disponibles (usando la opción before_compile).

Buscando consultas sólo de selección decidí construir mi propio generador de consultas para sentencias selectas. Los requisitos eran que debe ser posible añadir más de una clase, por ejemplo[Parent, Child], y/o columnas, por ejemplo[Parent.id, Child] y también que debe ser posible añadir dinámicamente condiciones de filtro incluyendo la adición automática de la columna y la columna de estado eliminadas. A continuación encontrará algunas referencias a la construcción de consultas dinámicas.

Por supuesto que tuve otros problemas como: AttributeError: 'scoped_session' el objeto no tiene ningún atributo '_autoflush'. Afortunadamente alguien encontró una solución para esto, ver referencias.

Luego, hice una copia de seguridad y después de eso empecé a usar esto. En caso de que tenga problemas, siempre puedo añadir la receta de FilteredQuery.

Por supuesto, esto quita parte de la'diversión' de escribir SQLAlchemy consultas, pero ¡hey, vamos a crear aplicaciones que funcionen!

En caso de que quieras probar esto:

from sqlalchemy import Table, Column, Integer, String, Boolean, BigInteger, DateTime, ForeignKey, func, and_, or_, desc, asc, create_engine, inspect, sql
from sqlalchemy.orm import relationship, Session, with_polymorphic, backref, contains_eager, Query
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func, label
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy import inspect

import os
import sys


Base = declarative_base()

# many-to-many link table: parent - child
parent_mtm_child_table = Table('parent_mtm_child', Base.metadata,
    Column('parent_id', Integer, ForeignKey('parent.id')),
    Column('child_id', Integer, ForeignKey('child.id'))
)


class Parent(Base):
    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    # many-to-many relationship with child
    children = relationship(
        'Child', 
        secondary=parent_mtm_child_table)

    def __repr__(self):
        return "%s(name=%r)" % \
            (self.__class__.__name__, self.name) 


class Child(Base):
    __tablename__ = 'child'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    age = Column(Integer)
    hair_color = Column(String)
    # many-to-many relationship with parent
    parents = relationship(
        'Parent',
        secondary=parent_mtm_child_table)

    def __repr__(self):
        return "%s(name=%r, age=%r, hair_color=%r)" % \
            (self.__class__.__name__, self.name,
            self.age, self.hair_color) 


# show/hide sql
#engine = create_engine('sqlite://', echo=True)
engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

#session = sessionmaker()
#session.configure(bind=engine)
#db = session()
# use scoped_session
db = scoped_session(sessionmaker(bind=engine))

# Attaching a pre-built query to a scoped_session in SQLAlchemy
# https://stackoverflow.com/questions/43685758/attaching-a-pre-built-query-to-a-scoped-session-in-sqlalchemy
db_local = db()


STATUS_ENABLED = 1

# parents
john = Parent(name='John', status=STATUS_ENABLED)
mary = Parent(name='Mary', status=STATUS_ENABLED)
gina = Parent(name='Gina', status=STATUS_ENABLED)
ryan = Parent(name='Ryan', status=STATUS_ENABLED)
eric = Parent(name='Eric', status=STATUS_ENABLED)
# children
liam = Child(name='Liam', age=6, hair_color='brown', status=STATUS_ENABLED)
emma = Child(name='Emma', age=8, hair_color='blond', status=STATUS_ENABLED)
alex = Child(name='Alex', age=10, hair_color='blond', status=STATUS_ENABLED)
sara = Child(name='Sara', age=9, hair_color='blond', status=STATUS_ENABLED)
rose = Child(name='Rose', age=9, hair_color='blond', status=STATUS_ENABLED)
# assign children to parents
john.children.append(liam)
john.children.append(emma)
john.children.append(alex)
mary.children.append(liam)
gina.children.append(sara)
eric.children.append(rose)
db.add_all([john, mary, gina, ryan, eric, liam, emma, alex, sara, rose])
db.commit()

# delete some
sara.deleted = True
db.commit()

''' 

db_select()
description: build a select query based on input,
also filter on deleted and status attributes


example 1: single table/object select
-------------------------------------------------------

qry = db_select(
    model_class_list=[Parent], 
    order_by_list=[(parent, 'name', 'asc')]
).first()


example 2: two table with many-to-many select
-------------------------------------------------------

qry = db_select(
   model_class_list=[Parent, Child], 
   filter_by_list=[
       (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
       (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
       (Child, 'age', 'ge', 6),
       (Parent, 'id', 'in', [3, 4, 5]),
   ],
   order_by_list=[
       (Child, 'name', 'asc'), 
   ],
   limit 10, offset 4, 
).all()


example 3: select column attribute instead of object
-------------------------------------------------------

qry = db_select(
   model_class=[(Parent, 'id'), Child], 
   filter_by_list=[
       (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
       (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
       (Child, 'age', 'ge', 6),
       (Parent, 'id', 'in', [3, 4, 5]),
   ],
   order_by_list=[
       (Child, 'name', 'asc'), 
   ],
   limit 10, offset 4, 
).all()


''' 

def db_select(model_class_list=None, filter_by_list=None, order_by_list=None, limit=None, offset=None, filter_deleted=False, filter_status=STATUS_ENABLED):
    fname = 'db_select'
    dbg_print = False

    if dbg_print:
        print(fname + ": len(model_class_list) = {}".format(len(model_class_list)))

    if filter_by_list == None:
        filter_by_list = []
    if order_by_list == None:
        order_by_list = []

    if not isinstance(model_class_list, list):
        raise Exception('model_class_list not list')
    if not isinstance(filter_by_list, list):
        raise Exception('filter_by_list not list')
    if not isinstance(order_by_list, list):
        raise Exception('order_by_list not list')

    # collector for model_classes
    mcs = []
    # collector for columns
    columns = []
    for model_class_item in model_class_list:
        if isinstance(model_class_item, tuple):
            m, key = model_class_item
            column = getattr(m, key, None)
            columns.append(column)
            mcs.append(m)
        else:
            columns.append(model_class_item)
            mcs.append(model_class_item)
    query = Query(columns)

    if dbg_print:
        print(fname + ": after creating query, query = {}".format(query))

    # add deleted filter if column deleted exists
    if not filter_deleted is None:
        for model_class in mcs:
            if 'deleted' in inspect(model_class).columns.keys():
                filter_by_list.append( (model_class, 'deleted', 'eq', filter_deleted) )

    # add status filter if column status exists
    if not filter_status is None:
        for model_class in mcs:
            if 'status' in inspect(model_class).columns.keys():
                filter_by_list.append( (model_class, 'status', 'eq', filter_status) )

    if dbg_print:
        # filter_by_items
        for filter_by_item in filter_by_list:
            print(fname + ": filter_by_item = {}".format(filter_by_item))
        # order_by_items
        for order_by_item in order_by_list:
            print(fname + ": order_by_item = {}".format(order_by_item))

    for filter_by_item in filter_by_list:
        if dbg_print:
            print(fname + ": processing filter_by_item = {}".format(filter_by_item))
        try:
            model_class, key, op, value = filter_by_item
        except ValueError:
            raise Exception('Invalid filter_by_item: %s' % filter_by_item)

        if dbg_print:
           print(fname + ": processing key, op, value = {}, {}, {}".format(key, op, value))

        column = getattr(model_class, key, None)
        if not column:
            raise Exception('Invalid filter column: %s' % key)

        if op == 'in':
            if isinstance(value, list):
                filt = column.in_(value)
            else:
                filt = column.in_(value.split(','))

            if dbg_print:
                print(fname + ": if, filt = {}".format(filt))
        else:
            try:
                attr = list(filter(
                    lambda e: hasattr(column, e % op),
                    ['%s', '%s_', '__%s__']
                ))[0] % op
            except IndexError:
                raise Exception('Invalid filter operator: %s' % op)

            if dbg_print:
                print(fname + ": processing filter_cond, attr = {}".format(attr))

            if value == 'null':
                value = None
            filt = getattr(column, attr)(value)

            if dbg_print:
                print(fname + ": else, filt = {}".format(filt))

        if dbg_print:
            print(fname + ": adding filt")
        query = query.filter(filt)

    for order_by_item in order_by_list:
        if dbg_print:
            print(fname + ": processing order_by_item = {}".format(order_by_item))

        try:
            model_class, key, op = order_by_item
        except ValueError:
            raise Exception('Invalid order_by_item: %s' % order_by_item)

        if dbg_print:
            print(fname + ": processing model_class = {}, key = {}, op = {}".format(model_class, key, op))

        column = getattr(model_class, key, None)
        column_sorted = getattr(column, op)()
        query = query.order_by(column_sorted)        

    if limit:
        if dbg_print:
            print(fname + ": processing limit = {}".format(limit))
        query = query.limit(limit)

    if offset:
        if dbg_print:
            print(fname + ": processing offset = {}".format(offset))
        query = query.offset(offset)

    if dbg_print:
        print(fname + ": after building query, query = {}".format(query))
    return query.with_session(db_local)


print("\nPARENTS\n")

parents = db_select(
    model_class_list=[Parent], 
    order_by_list=[
        (Parent, 'name', 'asc'), 
    ],
    limit=3,
).all()

for parent in parents:
    print("parent.name = {}".format(parent.name))	


# get parent_ids for next query
parent_ids = [parent.id for parent in parents]
print("parent_ids = {}".format(parent_ids))


print("\nPARENTS-CHILDREN\n")

parent_child_tuples = db_select(
    model_class_list=[Parent, Child], 
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'), 
        (Child, 'name', 'asc'), 
    ],
    limit=10, offset=0,
).all()

# show tuples
print("parent_child_tuples: {}".format(parent_child_tuples))	

# build list parent_id-children
from collections import defaultdict

parent_id2children = defaultdict(list)
for parent, child in parent_child_tuples:
    parent_id2children[parent.id].append(child)

# show parent_id2children
print("parent_id2children:")
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))	


print("\nPARENT_IDS-CHILDREN\n")

parent_id_child_tuples = db_select(
    model_class_list=[(Parent, 'id'), Child], 
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'), 
        (Child, 'name', 'asc'), 
    ],
    limit=10, offset=0,
).all()

# show tuples
print("parent_id_child_tuples: {}".format(parent_id_child_tuples))	

# build list parent_id-children
from collections import defaultdict

parent_id2children = defaultdict(list)
for parent_id, child in parent_id_child_tuples:
    parent_id2children[parent_id].append(child)

# show parent_id2children
print("\nparent_id2children:")
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))	


print("\nshow columns:")

# debug: show columns in parent 
for c in Parent.__table__.columns:
    print("parent table column c = {}".format(c))

# debug: show columns in parent using inspect
from sqlalchemy import inspect
mapper = inspect(Parent)
for column in mapper.attrs:
    print("column.key = {}".format(column.key))

for key in inspect(Parent).columns.keys():
    print("key = {}".format(key))

if 'deleted' in inspect(Parent).columns.keys():
    print("deleted found")
else:
    print("deleted NOT found")

Enlaces / créditos

Attaching a pre-built query to a scoped_session in SQLAlchemy
https://stackoverflow.com/questions/43685758/attaching-a-pre-built-query-to-a-scoped-session-in-sqlalchemy

Dynamically constructing filters based on string input using SQLAlchemy
https://ruddra.com/posts/dynamically-constructing-filters-based-on-string-input-using-sqlalchemy/

Dynamically constructing filters in SQLAlchemy
https://ruddra.com/posts/dynamically-constructing-filters-based-on-string-input-using-sqlalchemy/

FilteredQuery
https://github.com/sqlalchemy/sqlalchemy/wiki/FilteredQuery

Implementing the "Soft Delete" Pattern with Flask and SQLAlchemy
https://blog.miguelgrinberg.com/post/implementing-the-soft-delete-pattern-with-flask-and-sqlalchemy

method of iterating over sqlalchemy model's defined columns?
https://stackoverflow.com/questions/2537471/method-of-iterating-over-sqlalchemy-models-defined-columns

Python - SqlAlchemy: convert lists of tuples to list of atomic values [duplicate]
https://stackoverflow.com/questions/44355850/python-sqlalchemy-convert-lists-of-tuples-to-list-of-atomic-values

Leer más

SQLAlchemy

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios (1)

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.

avatar

how does one do or_ with this?