angle-up arrow-clockwise arrow-counterclockwise arrow-down-up arrow-left at calendar card-list chat check envelope folder house info-circle pencil people person person-plus phone plus question-circle search tag trash x

Построение динамических запросов и фильтрация SLQAlchemy, в том числе soft deletes

21 июня 2019 возле Peter

В этом сообщении показано, как построить конструктор запросов для всех выбранных вами запросов.

post main image
Original photo unsplash.com/@grohsfabian.

Основываясь на предыдущем сообщении 'Flask, Jinja2 и связи SLQAlchemy много-многих с условиями' я искал способ динамического добавления условий фильтра и, если возможно, нашел решение для soft delete шаблона.

Мягкое удаление - это не удаление записей из таблицы, а их пометка как удаленные. Это означает, что каждая таблица должна иметь удаленный флаг, и все запросы должны исключать записи, помеченные как удаленные. Ведь ORM SQLAlchemy это еще сложнее, потому что речь идет не о записях, а об объектах. Реализация сложна, но она не ограничивается только тем soft deletes, что каждый класс в моей модели также имеет поле статуса. Это может быть использовано для временного отключения показа этого объекта другим посетителям.

Я считаю, что лучший способ реализации soft delete и/или статуса - это интегрировать его в SQLAlchemy себя и сделать его доступным в качестве функции утилиты. Однако это не так, но некоторые рецепты доступны (с использованием опции pre_compile).

В поисках только выборочных запросов я решил построить свой собственный конструктор запросов для выбранных операторов. Требования заключались в том, что должна быть возможность добавлять более одного класса, например, [Родитель, ребенок], и/или столбцы, например [Родитель.id, ребенок], а также возможность динамического добавления условий фильтрации, включая автоматическое добавление удаленных столбцов и столбцов состояния. Ниже приведены ссылки на динамическое построение запросов.

Конечно, я столкнулся с некоторыми другими проблемами, например: AttributeError: объект 'scoped_session' не имеет атрибута '_autoflush'. К счастью, кто-то нашел решение этой проблемы, см. ссылки.

Затем я сделал резервную копию, и после этого начал использовать это. В случае возникновения проблем я всегда могу добавить рецепт FilteredQuery.

Конечно, это отнимает часть "удовольствия" от написания SQLAlchemy запросов, но давайте создадим рабочие приложения!

На случай, если ты захочешь попробовать это:

from sqlalchemy import Table, Column, Integer, String, Boolean, BigInteger, DateTime, ForeignKey, func, and_, or_, desc, asc, create_engine, inspect, sql
from sqlalchemy.orm import relationship, Session, with_polymorphic, backref, contains_eager, Query
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func, label
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy import inspect

import os
import sys


Base = declarative_base()

# many-to-many link table: parent - child
parent_mtm_child_table = Table('parent_mtm_child', Base.metadata,
    Column('parent_id', Integer, ForeignKey('parent.id')),
    Column('child_id', Integer, ForeignKey('child.id'))
)


class Parent(Base):
    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    # many-to-many relationship with child
    children = relationship(
        'Child', 
        secondary=parent_mtm_child_table)

    def __repr__(self):
        return "%s(name=%r)" % \
            (self.__class__.__name__, self.name) 


class Child(Base):
    __tablename__ = 'child'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    age = Column(Integer)
    hair_color = Column(String)
    # many-to-many relationship with parent
    parents = relationship(
        'Parent',
        secondary=parent_mtm_child_table)

    def __repr__(self):
        return "%s(name=%r, age=%r, hair_color=%r)" % \
            (self.__class__.__name__, self.name,
            self.age, self.hair_color) 


# show/hide sql
#engine = create_engine('sqlite://', echo=True)
engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

#session = sessionmaker()
#session.configure(bind=engine)
#db = session()
# use scoped_session
db = scoped_session(sessionmaker(bind=engine))

# Attaching a pre-built query to a scoped_session in SQLAlchemy
# https://stackoverflow.com/questions/43685758/attaching-a-pre-built-query-to-a-scoped-session-in-sqlalchemy
db_local = db()


STATUS_ENABLED = 1

# parents
john = Parent(name='John', status=STATUS_ENABLED)
mary = Parent(name='Mary', status=STATUS_ENABLED)
gina = Parent(name='Gina', status=STATUS_ENABLED)
ryan = Parent(name='Ryan', status=STATUS_ENABLED)
eric = Parent(name='Eric', status=STATUS_ENABLED)
# children
liam = Child(name='Liam', age=6, hair_color='brown', status=STATUS_ENABLED)
emma = Child(name='Emma', age=8, hair_color='blond', status=STATUS_ENABLED)
alex = Child(name='Alex', age=10, hair_color='blond', status=STATUS_ENABLED)
sara = Child(name='Sara', age=9, hair_color='blond', status=STATUS_ENABLED)
rose = Child(name='Rose', age=9, hair_color='blond', status=STATUS_ENABLED)
# assign children to parents
john.children.append(liam)
john.children.append(emma)
john.children.append(alex)
mary.children.append(liam)
gina.children.append(sara)
eric.children.append(rose)
db.add_all([john, mary, gina, ryan, eric, liam, emma, alex, sara, rose])
db.commit()

# delete some
sara.deleted = True
db.commit()

''' 

db_select()
description: build a select query based on input,
also filter on deleted and status attributes


example 1: single table/object select
-------------------------------------------------------

qry = db_select(
    model_class_list=[Parent], 
    order_by_list=[(parent, 'name', 'asc')]
).first()


example 2: two table with many-to-many select
-------------------------------------------------------

qry = db_select(
   model_class_list=[Parent, Child], 
   filter_by_list=[
       (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
       (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
       (Child, 'age', 'ge', 6),
       (Parent, 'id', 'in', [3, 4, 5]),
   ],
   order_by_list=[
       (Child, 'name', 'asc'), 
   ],
   limit 10, offset 4, 
).all()


example 3: select column attribute instead of object
-------------------------------------------------------

qry = db_select(
   model_class=[(Parent, 'id'), Child], 
   filter_by_list=[
       (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
       (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
       (Child, 'age', 'ge', 6),
       (Parent, 'id', 'in', [3, 4, 5]),
   ],
   order_by_list=[
       (Child, 'name', 'asc'), 
   ],
   limit 10, offset 4, 
).all()


''' 

def db_select(model_class_list=None, filter_by_list=None, order_by_list=None, limit=None, offset=None, filter_deleted=False, filter_status=STATUS_ENABLED):
    fname = 'db_select'
    dbg_print = False

    if dbg_print:
        print(fname + ": len(model_class_list) = {}".format(len(model_class_list)))

    if filter_by_list == None:
        filter_by_list = []
    if order_by_list == None:
        order_by_list = []

    if not isinstance(model_class_list, list):
        raise Exception('model_class_list not list')
    if not isinstance(filter_by_list, list):
        raise Exception('filter_by_list not list')
    if not isinstance(order_by_list, list):
        raise Exception('order_by_list not list')

    # collector for model_classes
    mcs = []
    # collector for columns
    columns = []
    for model_class_item in model_class_list:
        if isinstance(model_class_item, tuple):
            m, key = model_class_item
            column = getattr(m, key, None)
            columns.append(column)
            mcs.append(m)
        else:
            columns.append(model_class_item)
            mcs.append(model_class_item)
    query = Query(columns)

    if dbg_print:
        print(fname + ": after creating query, query = {}".format(query))

    # add deleted filter if column deleted exists
    if not filter_deleted is None:
        for model_class in mcs:
            if 'deleted' in inspect(model_class).columns.keys():
                filter_by_list.append( (model_class, 'deleted', 'eq', filter_deleted) )

    # add status filter if column status exists
    if not filter_status is None:
        for model_class in mcs:
            if 'status' in inspect(model_class).columns.keys():
                filter_by_list.append( (model_class, 'status', 'eq', filter_status) )

    if dbg_print:
        # filter_by_items
        for filter_by_item in filter_by_list:
            print(fname + ": filter_by_item = {}".format(filter_by_item))
        # order_by_items
        for order_by_item in order_by_list:
            print(fname + ": order_by_item = {}".format(order_by_item))

    for filter_by_item in filter_by_list:
        if dbg_print:
            print(fname + ": processing filter_by_item = {}".format(filter_by_item))
        try:
            model_class, key, op, value = filter_by_item
        except ValueError:
            raise Exception('Invalid filter_by_item: %s' % filter_by_item)

        if dbg_print:
           print(fname + ": processing key, op, value = {}, {}, {}".format(key, op, value))

        column = getattr(model_class, key, None)
        if not column:
            raise Exception('Invalid filter column: %s' % key)

        if op == 'in':
            if isinstance(value, list):
                filt = column.in_(value)
            else:
                filt = column.in_(value.split(','))

            if dbg_print:
                print(fname + ": if, filt = {}".format(filt))
        else:
            try:
                attr = list(filter(
                    lambda e: hasattr(column, e % op),
                    ['%s', '%s_', '__%s__']
                ))[0] % op
            except IndexError:
                raise Exception('Invalid filter operator: %s' % op)

            if dbg_print:
                print(fname + ": processing filter_cond, attr = {}".format(attr))

            if value == 'null':
                value = None
            filt = getattr(column, attr)(value)

            if dbg_print:
                print(fname + ": else, filt = {}".format(filt))

        if dbg_print:
            print(fname + ": adding filt")
        query = query.filter(filt)

    for order_by_item in order_by_list:
        if dbg_print:
            print(fname + ": processing order_by_item = {}".format(order_by_item))

        try:
            model_class, key, op = order_by_item
        except ValueError:
            raise Exception('Invalid order_by_item: %s' % order_by_item)

        if dbg_print:
            print(fname + ": processing model_class = {}, key = {}, op = {}".format(model_class, key, op))

        column = getattr(model_class, key, None)
        column_sorted = getattr(column, op)()
        query = query.order_by(column_sorted)        

    if limit:
        if dbg_print:
            print(fname + ": processing limit = {}".format(limit))
        query = query.limit(limit)

    if offset:
        if dbg_print:
            print(fname + ": processing offset = {}".format(offset))
        query = query.offset(offset)

    if dbg_print:
        print(fname + ": after building query, query = {}".format(query))
    return query.with_session(db_local)


print("\nPARENTS\n")

parents = db_select(
    model_class_list=[Parent], 
    order_by_list=[
        (Parent, 'name', 'asc'), 
    ],
    limit=3,
).all()

for parent in parents:
    print("parent.name = {}".format(parent.name))	


# get parent_ids for next query
parent_ids = [parent.id for parent in parents]
print("parent_ids = {}".format(parent_ids))


print("\nPARENTS-CHILDREN\n")

parent_child_tuples = db_select(
    model_class_list=[Parent, Child], 
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'), 
        (Child, 'name', 'asc'), 
    ],
    limit=10, offset=0,
).all()

# show tuples
print("parent_child_tuples: {}".format(parent_child_tuples))	

# build list parent_id-children
from collections import defaultdict

parent_id2children = defaultdict(list)
for parent, child in parent_child_tuples:
    parent_id2children[parent.id].append(child)

# show parent_id2children
print("parent_id2children:")
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))	


print("\nPARENT_IDS-CHILDREN\n")

parent_id_child_tuples = db_select(
    model_class_list=[(Parent, 'id'), Child], 
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'), 
        (Child, 'name', 'asc'), 
    ],
    limit=10, offset=0,
).all()

# show tuples
print("parent_id_child_tuples: {}".format(parent_id_child_tuples))	

# build list parent_id-children
from collections import defaultdict

parent_id2children = defaultdict(list)
for parent_id, child in parent_id_child_tuples:
    parent_id2children[parent_id].append(child)

# show parent_id2children
print("\nparent_id2children:")
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))	


print("\nshow columns:")

# debug: show columns in parent 
for c in Parent.__table__.columns:
    print("parent table column c = {}".format(c))

# debug: show columns in parent using inspect
from sqlalchemy import inspect
mapper = inspect(Parent)
for column in mapper.attrs:
    print("column.key = {}".format(column.key))

for key in inspect(Parent).columns.keys():
    print("key = {}".format(key))

if 'deleted' in inspect(Parent).columns.keys():
    print("deleted found")
else:
    print("deleted NOT found")

Ссылки / кредиты

Attaching a pre-built query to a scoped_session in SQLAlchemy
https://stackoverflow.com/questions/43685758/attaching-a-pre-built-query-to-a-scoped-session-in-sqlalchemy

Dynamically constructing filters based on string input using SQLAlchemy
https://ruddra.com/posts/dynamically-constructing-filters-based-on-string-input-using-sqlalchemy/

Dynamically constructing filters in SQLAlchemy
https://ruddra.com/posts/dynamically-constructing-filters-based-on-string-input-using-sqlalchemy/

FilteredQuery
https://github.com/sqlalchemy/sqlalchemy/wiki/FilteredQuery

Implementing the "Soft Delete" Pattern with Flask and SQLAlchemy
https://blog.miguelgrinberg.com/post/implementing-the-soft-delete-pattern-with-flask-and-sqlalchemy

method of iterating over sqlalchemy model's defined columns?
https://stackoverflow.com/questions/2537471/method-of-iterating-over-sqlalchemy-models-defined-columns

Python - SqlAlchemy: convert lists of tuples to list of atomic values [duplicate]
https://stackoverflow.com/questions/44355850/python-sqlalchemy-convert-lists-of-tuples-to-list-of-atomic-values

Подробнее:
SQLAlchemy

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.