SQLAlchemy: group by day over multiple tables


Problem description

In my Flask application, I have something similar to a bank account: One User has one Account, credit entries are modeled as Incomings, deductions are modeled as Outgoings.

The problem:

Get an "account statement" for one user, i.e. credit entries / deductions per day, e.g.

Thu 29 Aug 2019
  Some deduction: -23.00
  Some credit: 123.00
Fri 30 Aug 2019
  Big credit: 4223.00
  Another deduction: -42.00

My data model:

Here is a simplified version of my models.py:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy     import Column, Float, ForeignKey, Integer, Text, TIMESTAMP
from sqlalchemy.orm import relationship

Base = declarative_base()

class Account(Base):
    __tablename__ = 'account'
    id        = Column(Integer, primary_key=True)
    balance   = Column(Float,   nullable=False)
    userID    = Column(Integer, ForeignKey('user.id'))
    incomings = relationship("Incoming", back_populates="account")
    outgoings = relationship("Outgoing", back_populates="account")
    user      = relationship("User",     back_populates="account")

class Incoming(Base):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

class Outgoing(Base):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

class User(Base):
    __tablename__ = 'user'
    id      = Column(Integer,   primary_key=True)
    name    = Column(Text,      nullable=False)
    account = relationship("Account", back_populates="user")

My general intended approach:

  1. get all Incomings for user, grouped by day
  2. get all Outgoings for user, grouped by day
  3. somehow merge the two lists, grouped by day

My background

It's been a while since I've worked with the underlying database PostgreSQL (but then, I did manage to set up a trigger function to auto-update the balance), but as far as SQLAlchemy (the ORM in use) is concerned, I merely seem to have scratched the surface.

Following the first SO hit, I tried

from sqlalchemy import func

# existing sample account ID
accountID  = 42
# not relevant to the point at hand, known to work
db_session = get_a_scoped_session_from_elsewhere()

db_incomings = db_session.query(Incoming)                         \
                         .filter(Incoming.accountID == accountID) \
                         .group_by(func.day(Incoming.timestamp))  \
                         .all()

but this fails with

ProgrammingError: (psycopg2.errors.UndefinedFunction) ...
 ... function day(timestamp without time zone) does not exist

which seems to indicate that PostgreSQL doesn't support day.

Following this answer, I then tried

# imports and variables as above
db_incomings = db_session.query(Incoming)                                      \
                         .filter(Incoming.accountID == accountID)              \
                         .group_by(func.date_trunc('day', Incoming.timestamp)) \
                         .all()

which works for PostgreSQL, but fails for me with

ProgrammingError: (psycopg2.errors.GroupingError) ...
 ... column "incoming.id" must appear in the GROUP BY clause ...
 ... or be used in an aggregate function

When I just blindly try to do what the error message tells me and add incoming.id to the GROUP BY clause as in

db_incomings = db_session.query(Incoming)                                      \
                         .filter(Incoming.accountID == accountID)              \
                         .group_by(Incoming.id,
                                   func.date_trunc('day', Incoming.timestamp)) \
                         .all()

the code works, but does not return the wanted result; instead, I get a list of objects like

{'timestamp': datetime.datetime(2019, 8, 29, 10, 4, 27, 459000), 'id': 1, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 21, 493000), 'id': 2, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 42, 660000), 'id': 3, 'accountID': 42, ...}

which isn't really surprising, considering I'm grouping by Incoming.id.

Trying to understand the underlying issue (see e.g. here or here), it seems I cannot reference a field in the SELECT statement (i.e. the SQLAlchemy .query) if it doesn't appear in the GROUP BY clause (i.e. the SQLAlchemy .group_by). Looking at the error message, this also seems to be the case vice versa.

I've been racking my brain for a couple of hours now, found lots of alternatives to func.date_trunc and have 800 browser tabs open, but still have no idea how to approach this.

My question: How do I need to structure / build up the SQLAlchemy query ?

Answer

SQL works with and returns tabular data (or relations, if you prefer to think of it that way, but not all SQL tables are relations). What this implies is that a nested table such as depicted in the question is not that common a feature. There are ways to produce something of the kind in Postgresql, for example using arrays of JSON or composites, but it is entirely possible to just fetch tabular data and perform the nesting in the application. Python has itertools.groupby(), which fits the bill quite well, given sorted data.
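
As a standalone sketch of that nesting step (the rows below are made up for illustration; any iterable of mappings that is already sorted by timestamp works the same way, since itertools.groupby only groups consecutive items):

```python
from datetime import date, datetime
from itertools import groupby

# Hypothetical, pre-sorted rows standing in for the query result.
rows = [
    {"timestamp": datetime(2019, 8, 29, 10, 4), "description": "Some deduction", "amount": -23.0},
    {"timestamp": datetime(2019, 8, 29, 11, 0), "description": "Some credit", "amount": 123.0},
    {"timestamp": datetime(2019, 8, 30, 9, 30), "description": "Big credit", "amount": 4223.0},
]

# Group consecutive rows that share the same date part of their timestamp.
statement = [(day, list(entries))
             for day, entries in groupby(rows, key=lambda r: r["timestamp"].date())]

for day, entries in statement:
    print(day)
    for e in entries:
        print(f"  {e['description']}: {e['amount']:.2f}")
```

The key function decides the grouping, so grouping by month or account instead is a one-line change.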

The error column "incoming.id" must appear in the GROUP BY clause... is saying that non-aggregates in the select list, having clause, etc. must appear in the GROUP BY clause or be used in an aggregate, lest they have possibly indeterminate values. In other words the value would have to be picked from just some row in the group, because GROUP BY condenses the grouped rows into a single row, and it would be anyone's guess which row they were picked from. The implementation might allow this, like SQLite does and MySQL used to do, but the SQL standard forbids such. The exception to the rule is when there is a functional dependency; the GROUP BY clause determines the non-aggregates. Think of a join between tables A and B grouped by A's primary key. No matter which row in a group the system would pick the values for A's columns from, they would be the same since the grouping was done based on the primary key.
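
To see the rule in action: if per-day totals (rather than the individual rows) were the goal, grouping by the day expression and aggregating everything else satisfies it. A minimal self-contained sketch using the stdlib sqlite3, with a made-up table; SQLite's date() plays the role Postgresql's date_trunc('day', ...) would:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE incoming (amount REAL, timestamp TEXT)")
conn.executemany("INSERT INTO incoming VALUES (?, ?)", [
    (100.0, "2019-08-29 10:04:27"),
    (23.0,  "2019-08-29 10:08:21"),
    (42.0,  "2019-08-30 09:00:00"),
])

# The only non-aggregate in the select list, date(timestamp), also appears
# in GROUP BY, so the query is valid under the SQL standard.
rows = conn.execute(
    "SELECT date(timestamp) AS day, SUM(amount) AS total "
    "FROM incoming GROUP BY date(timestamp) ORDER BY day"
).fetchall()
print(rows)  # [('2019-08-29', 123.0), ('2019-08-30', 42.0)]
```

Selecting a bare `amount` without aggregating it would trip the same rule the question ran into (on Postgresql; SQLite itself tolerates it).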

To address the 3-point general intended approach, one way would be to select a union of incoming and outgoing, ordered by their timestamps. Since there is no inheritance hierarchy set up (and there might not even be one; I'm not familiar with accounting), reverting to using Core and plain result tuples makes things easier in this case:

from sqlalchemy import literal, select

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)

Then in order to form the nested structure itertools.groupby() is used:

from itertools import groupby

date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]

The end result is a list of 2-tuples of date and list of dictionaries of entries in ascending order. Not quite the ORM solution, but gets the job done. An example:

In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [57]: session.commit()

In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    ...:     where(Incoming.accountID == 1)
    ...: 
    ...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    ...:     where(Outgoing.accountID == 1)
    ...: 
    ...: all_entries = incoming.union(outgoing)
    ...: all_entries = all_entries.order_by(all_entries.c.timestamp)
    ...: all_entries = db_session.execute(all_entries)

In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
    ...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]: 
[(datetime.date(2019, 9, 1),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 5,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 2),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 3),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 2,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
    'type': 'outgoing'}])]

As mentioned, Postgresql can produce pretty much the same result using an array of JSON:

from sqlalchemy import func, literal, literal_column, select
from sqlalchemy.dialects.postgresql import aggregate_order_by

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing).alias('all_entries')

day = func.date_trunc('day', all_entries.c.timestamp)

stmt = select([day,
               func.array_agg(aggregate_order_by(
                   func.row_to_json(literal_column('all_entries.*')),
                   all_entries.c.timestamp))]).\
    group_by(day).\
    order_by(day)

db_session.execute(stmt).fetchall()


If in fact Incoming and Outgoing can be thought of as children of a common base, for example Entry, using unions can be somewhat automated with concrete table inheritance:

from sqlalchemy.ext.declarative import AbstractConcreteBase

class Entry(AbstractConcreteBase, Base):
    pass

class Incoming(Entry):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

    __mapper_args__ = {
        'polymorphic_identity': 'incoming',
        'concrete': True
    }

class Outgoing(Entry):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

    __mapper_args__ = {
        'polymorphic_identity': 'outgoing',
        'concrete': True
    }

Unfortunately using AbstractConcreteBase requires a manual call to configure_mappers() when all necessary classes have been defined; in this case the earliest possibility is after defining User, because Account depends on it through relationships:

from sqlalchemy.orm import configure_mappers
configure_mappers()

Then in order to fetch all Incoming and Outgoing in a single polymorphic ORM query use Entry:

session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()

and proceed to use itertools.groupby() as above on the resulting list of Incoming and Outgoing.

P.S. Be careful with binary floating point and money. We once had a fun time figuring out why a purchase of 40.80 ended up as 40.79.
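
The pitfall is easy to reproduce with the standard library alone (a sketch; the exact figures above aside, decimal.Decimal maps naturally to a Numeric column instead of the Float used in the models):

```python
from decimal import Decimal

# Binary floats cannot represent most decimal fractions exactly:
print(0.1 + 0.2)               # 0.30000000000000004
assert 0.1 + 0.2 != 0.3

# which also makes rounding surprising -- 2.675 is stored slightly
# below 2.675, so it rounds down:
assert round(2.675, 2) == 2.67  # not 2.68

# Decimal keeps exact cents and rounds as expected:
assert Decimal("0.1") + Decimal("0.2") == Decimal("0.3")
assert Decimal("2.675").quantize(Decimal("0.01")) == Decimal("2.68")
```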
