MySQL query errors when connecting from Celery task running on Heroku

Problem description

I'm seeing wrong query results when executing queries against an external MySQL database, but only when connecting from Celery tasks running on Heroku. The same tasks, when run on my own machine do not show these errors, and the errors only appear about half of the time (although when they fail, all tasks are wrong).

The tasks are managed by Celery via Redis, and the MySQL database does not itself run on Heroku. Both my local machine and Heroku connect to the same MySQL database.

I connect to the database through SQLAlchemy, using the pymysql driver:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

DB_URI = 'mysql+pymysql://USER:PW@SERVER/DB'

engine = create_engine(stats_config.DB_URI, convert_unicode=True, echo_pool=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

The tasks are executed one after another.

Here is an example of a task with different results:

@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):

    db_session.close()
    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    gross_rev_trans_VK = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
    gross_rev_trans_Stripe = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
    gross_rev_trans = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

    if gross_rev_trans_VK is None:
        gross_rev_trans_VK = 0

    if gross_rev_trans_Stripe is None:
        gross_rev_trans_Stripe = 0

    if gross_rev_trans is None:
        gross_rev_trans = 0

    print ('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)

    total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans

    return {'total_rev' : str(total_gross_rev / 100), 'current': 100, 'total': 100, 'statistic': 'get_gross_revenue', 'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}
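
As an aside (not part of the original post): the three `if ... is None` blocks in the task can be collapsed, because `scalar()` returns None when SUM() matches no rows and `value or 0` coalesces that. A minimal sketch in plain Python, no database needed:

```python
# `x or 0` maps None (and other falsy values) to 0, exactly like the
# three if-blocks in the task above.
def coalesce(value):
    """Stand-in for `db_session.query(...).scalar() or 0`."""
    return value or 0

# Hypothetical per-source sums; 29811 + 300 illustrates the arithmetic.
total_gross_rev = coalesce(None) + coalesce(29811) + coalesce(300)
```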

# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
    if request.method == "POST":
        task = get_gross_revenue_task.apply_async([session['g_start_date'], session['g_end_date'], session['START_TIME_FORM']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_gross_revenue', task_id=task.id)}

These are simple and fast tasks, completing within a few seconds.

The tasks fail by producing small differences. For example, for a task where the correct result would be 30111, when things break the task produces 29811 instead. It is always the code that uses `db

  • I am already using the same timezone by executing:

db_session.execute("SET SESSION time_zone = 'Europe/Berlin'")
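
(A hedged aside, not from the original post: a per-session SET only affects whichever pooled connection happens to execute it. A pool "connect" event listener applies the setting to every new connection instead; sketched here on SQLite as a stand-in for the MySQL engine.)

```python
from sqlalchemy import create_engine, event

engine = create_engine("sqlite://")  # stand-in for the mysql+pymysql engine
connections_configured = []

@event.listens_for(engine, "connect")
def set_session_time_zone(dbapi_conn, connection_record):
    # With MySQL this callback would run:
    #     cursor = dbapi_conn.cursor()
    #     cursor.execute("SET SESSION time_zone = 'Europe/Berlin'")
    # Here we only record that the hook fired, so the sketch is checkable.
    connections_configured.append(True)

with engine.connect():
    pass  # first checkout triggers the "connect" event
```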

  • I checked for errors in the worker logs. Although there are some entries like

    2013 Lost connection to MySQL
    
    sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically
    
    2014 commands out of sync
    

    I haven't found a correlation between the SQL errors and the wrong results. The wrong task results can appear without a lost connection.

    A very dirty fix is to hard-code an expected result for one of the tasks, execute that first and then re-submit everything if the result produced is incorrect.

    This is probably a cache or isolation level problem with the way I use the SQLAlchemy session. Because I only ever need to use SELECT (no inserts or updates), I also tried different settings for the isolation level, before running tasks, such as

    #db_session.close()
    #db_session.commit()
    #db_session.execute('SET TRANSACTION READ ONLY')
    

    These statements show an error when I run them on Heroku, but they work when I run them on my Windows machine.

    I also tried to alter the connection itself with `isolation_level="READ UNCOMMITTED"`, without any result.
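
    (For completeness, an aside not in the original post: the isolation level can also be fixed once at engine creation rather than via per-session SET statements. Sketched on SQLite, which accepts READ UNCOMMITTED and SERIALIZABLE; with mysql+pymysql the same keyword takes values such as "READ COMMITTED".)

```python
from sqlalchemy import create_engine, text

# isolation_level applies to every connection the pool hands out,
# so no per-task SET TRANSACTION statement is needed.
engine = create_engine("sqlite://", isolation_level="READ UNCOMMITTED")

with engine.connect() as conn:
    result = conn.execute(text("SELECT 1")).scalar()
```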

    I am certain that the workers are not reusing the same db_session.

    It seems that only tasks which use db_session directly in the query can return wrong results. Code using the query attribute on the Base class (a db_session.query_property() object, e.g. Users.query) does not appear to have these issues. I thought these were basically the same thing?
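
    That hunch can be checked without MySQL: a query_property and db_session.query() both resolve to the same scoped session. A self-contained sketch, with SQLite in place of the real database and a hypothetical Users model:

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker, declarative_base

engine = create_engine("sqlite://")
db_session = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()  # same setup as in the question

class Users(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)

Base.metadata.create_all(engine)
db_session.add(Users(id=1))
db_session.commit()

# Both spellings resolve to the same scoped session under the hood.
same_session = Users.query.session is db_session()
counts_match = Users.query.count() == db_session.query(Users).count() == 1
```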

    Recommended answer

    You are re-using sessions between tasks in different workers. Create your session per Celery worker, or even per task.

    Know that tasks are actually persisted per worker. You can use this to cache a session for each task, so you don't have to recreate the session each time the task is run. This is easiest done with a custom task class; the documentation uses database connection caching as an example there.

    To do this with a SQLAlchemy session, use:

    from celery import Task
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker

    Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))

    class SQLASessionTask(Task):
        _session = None

        @property
        def session(self):
            if self._session is None:
                engine = create_engine(
                    stats_config.DB_URI, convert_unicode=True, echo_pool=True)
                self._session = Session(bind=engine)
            return self._session
    

    Use it as:

    @shared_task(base=SQLASessionTask, bind=True, name="get_gross_revenue_task")
    def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
        db_session = self.session
        # ... etc.
    

    This creates a SQLAlchemy session for the current task only when it needs one, the moment you access self.session.
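
    The lazy-initialization behaviour can be seen with a small Celery-free stand-in for the task class above, where object() stands in for Session(bind=engine):

```python
class LazySessionTask:
    """Celery-free stand-in for SQLASessionTask above."""
    _session = None
    sessions_created = 0

    @property
    def session(self):
        # Built on first access only, then cached on the instance.
        if self._session is None:
            LazySessionTask.sessions_created += 1
            self._session = object()  # stands in for Session(bind=engine)
        return self._session

task = LazySessionTask()
before = LazySessionTask.sessions_created  # nothing built yet
first = task.session                       # triggers creation
reused = first is task.session             # cached: same object every time
```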

