Connection problems with SQLAlchemy and multiple processes
Question
I'm using PostgreSQL and SQLAlchemy in a project that consists of a main process which launches child processes. All of these processes access the database via SQLAlchemy.
I'm experiencing repeatable connection failures: The first few child processes work correctly, but after a while a connection error is raised. Here's an MWCE:
import multiprocessing

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

# Module-level globals: a forked child inherits whatever these refer to.
engine = None
Session = None
session = None

def init():
    global engine, Session, session
    engine = create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

def cleanup():
    session.close()
    engine.dispose()

def target(id):
    # Runs in the child process; rebinds the inherited globals.
    init()
    try:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()
    finally:
        cleanup()

def main():
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2
    finally:
        cleanup()

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1
On my system (PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04) this yields
1
2
3
4
5
6
7
8
9
10
11
Traceback (most recent call last):
File "./fork_test.py", line 64, in <module>
main()
File "./fork_test.py", line 55, in main
session.refresh(dummy)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
only_load_props=attribute_names) is None:
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
return q.one()
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
ret = self.one_or_none()
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
ret = list(self)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
return self._execute_and_instances(context)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
return meth(self, multiparams, params)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
exc_info
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
[SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}]
This is repeatable and always crashes at the same iteration.
I'm creating a new engine and session after the fork as recommended by the SQLAlchemy documentation and elsewhere. Interestingly, the following slightly different approach does not crash:
import contextlib
import multiprocessing

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

@contextlib.contextmanager
def get_session():
    engine = sqlalchemy.create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()

def target(id):
    with get_session() as session:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()

def main():
    with get_session() as session:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1
Since the original code is more complex and cannot simply be switched over to the latter version I'd like to understand why one of these works and the other doesn't.
The only obvious difference is that the crashing code uses global variables for the engine and the session -- these are shared via copy-on-write with the child processes. However, since I reset them directly after the fork I don't understand how that could be a problem.
I re-ran both snippets with the latest SQLAlchemy (1.1.5) on Python 2.7 and Python 3.4. On both versions the results are basically as described above. However, on Python 2.7 the first snippet now crashes in the 13th iteration (reproducibly), while on 3.4 it already crashes in the third iteration (also reproducibly). The second snippet runs without problems on both versions. Here's the traceback from 3.4:
1
2
3
Traceback (most recent call last):
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "fork_test.py", line 64, in <module>
main()
File "fork_test.py", line 55, in main
session.refresh(dummy)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
only_load_props=attribute_names) is None:
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
return q.one()
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
ret = self.one_or_none()
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
ret = list(self)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
return self._execute_and_instances(context)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
return meth(self, multiparams, params)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
exc_info
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
raise value.with_traceback(tb)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
context)
File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
[SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}]
Here's the PostgreSQL log (it's the same for 2.7 and 3.4):
2017-01-18 10:59:36 UTC [22429-1] LOG: database system was shut down at 2017-01-18 10:59:35 UTC
2017-01-18 10:59:36 UTC [22429-2] LOG: MultiXact member wraparound protections are now enabled
2017-01-18 10:59:36 UTC [22428-1] LOG: database system is ready to accept connections
2017-01-18 10:59:36 UTC [22433-1] LOG: autovacuum launcher started
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG: incomplete startup packet
2017-01-18 11:00:10 UTC [22466-1] user@db LOG: SSL error: decryption failed or bad record mac
2017-01-18 11:00:10 UTC [22466-2] user@db LOG: could not receive data from client: Connection reset by peer
(Note that the message about the incomplete startup packet is harmless)
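Recommended answer
Quoting the SQLAlchemy documentation, with added emphasis:
The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over.
and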
However, for the case of a transaction-active Session or Connection being shared, there’s no automatic fix for this; an application needs to ensure a new child process only initiate new Connection objects and transactions, as well as ORM Session objects.
The issue stems from the forked child process inheriting the live global session, which is holding on to a Connection. When target calls init, it overwrites the global references to engine and session, thus decreasing their refcounts to 0 in the child and forcing them to finalize. If you create another reference to the inherited session in the child, one way or another, you prevent it from being cleaned up, but don't do that. After main has joined and returns to business as usual, it tries to use the now potentially finalized, or otherwise out-of-sync, connection. As to why this causes an error only after some number of iterations, I'm not sure.
The only way to handle this situation using globals the way you do is to
- close all sessions
- call engine.dispose()
before forking. This will prevent connections from leaking to the child. For example:
def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()
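If the real code forks in several places, the same "release before forking, reacquire afterwards" order could be wrapped in a small helper. This is only a sketch under assumptions: `run_in_child`, `before_fork` and `after_join` are hypothetical names, the "fork" start method is requested explicitly (Unix, Python 3.4+), and the callbacks would be something like cleanup() and init() from the question.

```python
import multiprocessing


def run_in_child(target, args=(), before_fork=None, after_join=None):
    """Run target(*args) in a forked child and return its exit code.

    before_fork is called first (e.g. close sessions, dispose the engine)
    so that no live connection is inherited; after_join is called once the
    child has finished (e.g. open a fresh session).
    """
    if before_fork is not None:
        before_fork()
    ctx = multiprocessing.get_context("fork")  # assumption: Unix only
    proc = ctx.Process(target=target, args=args)
    try:
        proc.start()
        proc.join()
    finally:
        if after_join is not None:
            after_join()
    return proc.exitcode


if __name__ == "__main__":
    order = []
    code = run_in_child(
        print, ("hello from the child",),
        before_fork=lambda: order.append("cleanup"),
        after_join=lambda: order.append("init"),
    )
    print(code, order)
```

With the question's functions this would read roughly as run_in_child(target, (dummy_id,), before_fork=cleanup, after_join=init), after which the parent has to re-query its objects through the new session.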
Your second example does not trigger finalization in the child, and so it only seems to work, though it might be as broken as the first, as it is still inheriting a copy of the session and its connection defined locally in main.
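To see why an inherited connection is risky even when nothing fails visibly, it helps to remember that a fork copies the file descriptor, not the connection: parent and child end up talking over the same socket. The sketch below uses only the standard library on Unix; the socket pair and the "TERMINATE" message are illustrative assumptions, not what psycopg2 actually sends. It shows a child writing on the inherited socket, so the peer receives data the parent never sent.

```python
import os
import socket


def child_writes_on_shared_socket():
    # "client" stands in for the DBAPI connection's socket,
    # "server" stands in for the database backend's end.
    client, server = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Child: anything written here travels over the parent's connection.
        try:
            client.sendall(b"TERMINATE")
            client.close()
        finally:
            os._exit(0)
    os.waitpid(pid, 0)
    # The parent sent nothing, yet the "server" has received a message.
    received = server.recv(64)
    client.close()
    server.close()
    return received


if __name__ == "__main__":
    print(child_writes_on_shared_socket())
```

Whether this is exactly what corrupts the connection in the question is an assumption, but it would be consistent with the "SSL error: decryption failed or bad record mac" entry in the PostgreSQL log.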