从芹菜执行插入时,mysql命令不同步 [英] mysql command out of sync when executing insert from celery

查看:98
本文介绍了从芹菜执行插入时,mysql命令不同步的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用自定义数据库库和celery时,我遇到了令人恐惧的MySQL命令不同步的情况.

I am running in to the dreaded MySQL Commands out of Sync when using a custom DB library and celery.

该库如下:

import pymysql
import pymysql.cursors
from furl import furl

from flask import current_app

class LegacyDB:
    """Db

    Legacy Database connectivity library

    """

    def __init__(self,app):
        with app.app_context():
            self.rc = current_app.config['RAVEN']
            self.logger = current_app.logger
            self.data = {}
            # setup Mysql
            try:
                uri = furl(current_app.config['DBCX'])
                self.dbcx = pymysql.connect(
                    host=uri.host,
                    user=uri.username,
                    passwd=uri.password,
                    db=str(uri.path.segments[0]),
                    port=int(uri.port),
                    cursorclass=pymysql.cursors.DictCursor
                    )
            except:
                self.rc.captureException()

    def query(self, sql, params = None, TTL=36):
        # INPUT 1 : SQL query
        # INPUT 2 : Parameters
        # INPUT 3 : Time To Live
        # OUTPUT  : Array of result

        # check that we're still connected to the
        # database before we fire off the query
        try:
            db_cursor = self.dbcx.cursor()
            if params:
              self.logger.debug("%s : %s" % (sql, params))
              db_cursor.execute(sql,params)
              self.dbcx.commit()
            else:
              self.logger.debug("%s" % sql)
              db_cursor.execute(sql)
            self.data = db_cursor.fetchall()
            if self.data == None:
              self.data = {}
            db_cursor.close()
        except Exception as ex:
            if ex[0] == "2006":
                db_cursor.close()
                self.connect()
                db_cursor = self.dbcx.cursor()
                if params:
                  db_cursor.execute(sql,params)
                  self.dbcx.commit()
                else:
                  db_cursor.execute(sql)
                self.data = db_cursor.fetchall()
                db_cursor.close()
            else:
                self.rc.captureException()

        return self.data

该库的目的是与SQLAlchemy一起工作,而我将旧数据库模式从基于C ++的系统迁移到基于Python的系统.

The purpose of the library is to work alongside SQLAlchemy whilst I migrate a legacy database schema from a C++-based system to a Python based system.

所有配置都是通过Flask应用程序完成的,app.config ['DBCX']值与SQLAlchemy字符串("mysql://user:pass @ host @ host:port/dbname")的读取方式相同将来切换.

All configuration is done via a Flask application and the app.config['DBCX'] value reads the same as a SQLAlchemy String ("mysql://user:pass@host:port/dbname") allowing me to easily switch over in future.

我有许多任务通过celery运行"INSERT"语句,所有这些任务都利用此库.可以想象,运行Celery的主要原因是可以提高此应用程序的吞吐量,但是一段时间后(大约500条已处理的消息),我似乎遇到了我的库或应用程序中的线程问题请参阅日志中的以下内容:

I have a number of tasks that run "INSERT" statements via celery, all of which utilise this library. As you can imagine, the main reason for running Celery is so that I can increase throughput on this application, however I seem to be hitting an issue with the threading in my library or the application as after a while (around 500 processed messages) I see the following in the logs:

Stacktrace (most recent call last):

  File "legacy/legacydb.py", line 49, in query
    self.dbcx.commit()
  File "pymysql/connections.py", line 662, in commit
    self._read_ok_packet()
  File "pymysql/connections.py", line 643, in _read_ok_packet
    raise OperationalError(2014, "Command Out of Sync")

我显然在做一些错误以解决此错误,但是,似乎已启用/禁用MySQL自动提交,还是将我的connection.commit()调用放在何处都没关系.

I'm obviously doing something wrong to hit this error, however it doesn't seem to matter whether MySQL has autocommit enabled/disabled or where I place my connection.commit() call.

如果我忽略了connection.commit(),那么我什么也没插入到数据库中.

If I leave out the connection.commit() then I don't get anything inserted into the database.

我最近从mysqldb转移到pymysql,出现的次数似乎较少,但是考虑到这些都是简单的插入"命令而不是复杂的选择(此数据库上甚至没有任何外键约束!)我正在努力找出问题出在哪里.

I've recently moved from mysqldb to pymysql and the occurrences appear to be lower, however given that these are simple "insert" commands and not a complicated select (there aren't even any foreign key constraints on this database!) I'm struggling to work out where the issue is.

就目前而言,我无法使用executemany,因为我无法提前准备语句(我正在从"firehose"消息队列中提取数据并将其存储在本地以供以后处理).

As things stand at present, I am unable to use executemany as I cannot prepare the statements in advance (I am pulling data from a "firehose" message queue and storing it locally for later processing).

推荐答案

首先,请确保celery somethingamajig使用其自已的连接,因为

First of all, make sure that the celery thingamajig uses its own connection(s) since

>>> pymysql.threadsafety
1

这意味着:线程可以共享模块,但不能共享连接" .

这篇关于从芹菜执行插入时,mysql命令不同步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆