Connection problems with SQLAlchemy and multiple processes


Problem description

I'm using PostgreSQL and SQLAlchemy in a project that consists of a main process which launches child processes. All of these processes access the database via SQLAlchemy.

I'm experiencing repeatable connection failures: The first few child processes work correctly, but after a while a connection error is raised. Here's an MWCE:

import multiprocessing

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

engine = None
Session = None
session = None

def init():
    global engine, Session, session
    engine = create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

def cleanup():
    session.close()
    engine.dispose()

def target(id):
    init()
    try:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()
    finally:
        cleanup()

def main():
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2
    finally:
        cleanup()

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

On my system (PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04) this yields

1
2
3
4
5
6
7
8
9
10
11
Traceback (most recent call last):
  File "./fork_test.py", line 64, in <module>
    main()
  File "./fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}]

This is repeatable and always crashes at the same iteration.

I'm creating a new engine and session after the fork as recommended by the SQLAlchemy documentation and elsewhere. Interestingly, the following slightly different approach does not crash:

import contextlib
import multiprocessing

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

@contextlib.contextmanager
def get_session():
    engine = sqlalchemy.create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()

def target(id):
    with get_session() as session:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()

def main():
    with get_session() as session:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

Since the original code is more complex and cannot simply be switched over to the latter version I'd like to understand why one of these works and the other doesn't.

The only obvious difference is that the crashing code uses global variables for the engine and the session -- these are shared via copy-on-write with the child processes. However, since I reset them directly after the fork I don't understand how that could be a problem.

I re-ran the two code pieces with the latest SQLAlchemy (1.1.5) using both Python 2.7 and Python 3.4. On both the results are basically as described above. However, on Python 2.7 the crash of the first code piece now happens in the 13th iteration (reproducibly) while on 3.4 it already happens in the third iteration (also reproducibly). The second code piece runs without problems on both versions. Here's the traceback from 3.4:

1
2
3
Traceback (most recent call last):
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "fork_test.py", line 64, in <module>
    main()
  File "fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}]

Here's the PostgreSQL log (it's the same for 2.7 and 3.4):

2017-01-18 10:59:36 UTC [22429-1] LOG:  database system was shut down at 2017-01-18 10:59:35 UTC
2017-01-18 10:59:36 UTC [22429-2] LOG:  MultiXact member wraparound protections are now enabled
2017-01-18 10:59:36 UTC [22428-1] LOG:  database system is ready to accept connections
2017-01-18 10:59:36 UTC [22433-1] LOG:  autovacuum launcher started
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG:  incomplete startup packet
2017-01-18 11:00:10 UTC [22466-1] user@db LOG:  SSL error: decryption failed or bad record mac
2017-01-18 11:00:10 UTC [22466-2] user@db LOG:  could not receive data from client: Connection reset by peer

(Note that the message about the incomplete startup packet is harmless)

Recommended answer

Quoting the SQLAlchemy documentation, with emphasis added:

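The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over.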

However, for the case of a transaction-active Session or Connection being shared, there’s no automatic fix for this; an application needs to ensure a new child process only initiate new Connection objects and transactions, as well as ORM Session objects.

The issue stems from the forked child process inheriting the live global session, which is holding on to a Connection. When target calls init, it overwrites the global references to engine and session, decreasing their refcounts to 0 in the child and forcing them to finalize. If you were to keep another reference to the inherited session alive in the child, you would prevent it from being cleaned up there, but don't do that. After main has joined the child and returns to business as usual, it tries to use a connection that is now potentially finalized or otherwise out of sync. As to why this causes an error only after some number of iterations, I'm not sure.
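
The mechanism can be illustrated with the standard library alone. This is only a sketch and not what SQLAlchemy or psycopg2 do internally: the socket pair stands in for a database connection, and the function name and message are made up for illustration. After os.fork() the child holds a descriptor for the very same socket as the parent, so anything the child writes while cleaning up arrives on the parent's connection. It assumes a Unix-like platform.

```python
import os
import socket


def child_writes_on_inherited_socket():
    """Return what the 'server' end receives after a forked child uses the parent's socket."""
    # Stand-in for a client/server database connection.
    client, server = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Child: the inherited descriptor refers to the same underlying connection.
        try:
            client.sendall(b"child-goodbye")
        finally:
            os._exit(0)  # leave immediately, skipping normal interpreter cleanup
    os.waitpid(pid, 0)
    # The parent never sent anything, yet its peer has received data.
    received = server.recv(1024)
    client.close()
    server.close()
    return received


if __name__ == "__main__":
    print(child_writes_on_inherited_socket())  # prints b'child-goodbye'
```

With an encrypted connection, unexpected bytes like these would plausibly leave the two ends out of sync, which would be consistent with the SSL error in the PostgreSQL log above.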

The only way to handle this situation using globals the way you do is to

  1. close all sessions
  2. call engine.dispose()

before forking. This will prevent connections from leaking to the child. For example:

def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()
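
If the same ordering is needed in several places, it could be wrapped in a small helper. The following is a hedged sketch, not part of SQLAlchemy: run_child_isolated, release and reacquire are hypothetical names, and the helper only enforces the order "release connections, fork, join, reacquire". It assumes Python 3.4 or later (for multiprocessing.get_context) and a platform that supports the fork start method.

```python
import multiprocessing


def run_child_isolated(release, reacquire, target, args=()):
    """Run target in a forked child while the parent holds no live connections."""
    ctx = multiprocessing.get_context("fork")  # the fork start method, as in the question
    release()  # e.g. session.close() followed by engine.dispose()
    try:
        child = ctx.Process(target=target, args=args)
        child.start()
        child.join()
    finally:
        reacquire()  # e.g. create a new engine and open a fresh session
    return child.exitcode
```

In the question's code, cleanup could plausibly serve as release and init as reacquire. Any ORM objects loaded before the fork would then have to be queried again through the new session, as the example above does with dummy_id.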

Your second example does not trigger finalization in the child, and so it only seems to work, though it might be as broken as the first, as it is still inheriting a copy of the session and its connection defined locally in main.
