Celery 和 SQLAlchemy - 此结果对象不返回行.它已自动关闭 [英] Celery and SQLAlchemy - This result object does not return rows. It has been closed automatically

查看:58
本文介绍了Celery 和 SQLAlchemy - 此结果对象不返回行.它已自动关闭的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个连接到 MySQL 数据库的 celery 项目.其中一张表的定义如下:

I have a celery project connected to a MySQL databases. One of the tables is defined like this:

class MyQueues(Base):
    __tablename__ = 'accepted_queues'

    id = sa.Column(sa.Integer, primary_key=True)
    customer = sa.Column(sa.String(length=50), nullable=False)
    accepted = sa.Column(sa.Boolean, default=True, nullable=False)
    denied = sa.Column(sa.Boolean, default=True, nullable=False)

此外,在我的设置中

THREADS = 4

我被困在 code.py 中的一个函数中:

And I am stuck in a function in code.py:

def load_accepted_queues(session, mode=None):

    #make query  
    pool = session.query(MyQueues.customer, MyQueues.accepted, MyQueues.denied)

    #filter conditions    
    if (mode == 'XXX'):
        pool = pool.filter_by(accepted=1)
    elif (mode == 'YYY'):
        pool = pool.filter_by(denied=1)
    elif (mode is None):
        pool = pool.filter(
            sa.or_(MyQueues.accepted == 1, MyQueues.denied == 1)
            )

   #generate a dictionary with data
   for i in pool: #<---------- line 90 in the error
        l.update({i.customer: {'customer': i.customer, 'accepted': i.accepted, 'denied': i.denied}})

运行时出现错误:

[20130626 115343] Traceback (most recent call last):
  File "/home/me/code/processing/helpers.py", line 129, in wrapper
    ret_value = func(session, *args, **kwargs)
  File "/home/me/code/processing/test.py", line 90, in load_accepted_queues
    for i in pool: #generate a dictionary with data
  File "/home/me/envs/me/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2341, in instances
    fetch = cursor.fetchall()
  File "/home/me/envs/me/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3205, in fetchall
    l = self.process_rows(self._fetchall_impl())
  File "/home/me/envs/me/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3174, in _fetchall_impl
    self._non_result()
  File "/home/me/envs/me/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3179, in _non_result
    "This result object does not return rows. "
ResourceClosedError: This result object does not return rows. It has been closed automatically

所以主要是部分

ResourceClosedError: This result object does not return rows. It has been closed automatically

有时也会出现此错误:

DBAPIError: (Error) (, AssertionError('未请求结果长度长度: 预期=1.实际=0.位置:21.数据长度:21',))'选择accepted_queues.customer ASaccepted_queues_customer,Accepted_queues.accepted 作为accepted_queues_accepted,accepted_queues.denied ASaccepted_queues_denied FROMAccepted_queues WHERE Accepted_queues.accepted = %s 或Accepted_queues.denied = %s' (1, 1)

DBAPIError: (Error) (, AssertionError('Result length not requested length: Expected=1. Actual=0. Position: 21. Data Length: 21',)) 'SELECT accepted_queues.customer AS accepted_queues_customer, accepted_queues.accepted AS accepted_queues_accepted, accepted_queues.denied AS accepted_queues_denied FROM accepted_queues WHERE accepted_queues.accepted = %s OR accepted_queues.denied = %s' (1, 1)

我无法正确重现错误,因为它通常在处理大量数据时发生.我尝试将 THREADS = 4 更改为 1 并且错误消失了.无论如何,这不是解决方案,因为我需要将线程数保持在 4 上.

I cannot reproduce the errror properly as it normally happens when processing a lot of data. I tried to change THREADS = 4 to 1 and errors disappeared. Anyway, it is not a solution as I need the number of threads to be kept on 4.

另外,我对使用的必要性感到困惑

Also, I am confused about the need to use

for i in pool: #<---------- line 90 in the error

for i in pool.all(): #<---------- line 90 in the error

并且找不到合适的解释.

and could not find a proper explanation of it.

一起:有什么建议可以跳过这些困难吗?

All together: any advise to skip these difficulties?

推荐答案

一起:有什么建议可以跳过这些困难吗?

All together: any advise to skip these difficulties?

是的.您绝对不能在多个线程中同时使用会话(或与该会话关联的任何对象)或连接,尤其是对于 DBAPI 连接非常线程不安全的 MySQL-Python*.您必须组织您的应用程序,以便每个线程处理它自己的专用 MySQL-Python 连接(以及与该会话关联的 SQLAlchemy 连接/会话/对象),而不会泄漏到任何其他线程.

yes. you absolutely cannot use a Session (or any objects which are associated with that Session), or a Connection, in more than one thread simultaneously, especially with MySQL-Python whose DBAPI connections are very thread-unsafe*. You must organize your application such that each thread deals with it's own, dedicated MySQL-Python connection (and therefore SQLAlchemy Connection/ Session / objects associated with that Session) with no leakage to any other thread.

  • 或者,您可以使用互斥锁将会话/连接/DBAPI 连接的访问​​权限限制为一次只访问其中一个线程,尽管这种情况不太常见,因为所需的高度锁定往往会破坏首先是为了使用多线程.

这篇关于Celery 和 SQLAlchemy - 此结果对象不返回行.它已自动关闭的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆