Sharing an Oracle database connection between simultaneous Celery tasks

Problem description

I'm working with Python 2.7, Celery and cx_Oracle to access the Oracle database.

I create a lot of tasks. Each task runs a query through cx_Oracle. Many of these tasks will run simultaneously. All tasks should share the same database connection.

If I only launch one task, the query gets run correctly. However, if I launch several queries, I start getting this error message:

[2016-04-04 17:12:43,846: ERROR/MainProcess] Task tasks.run_query[574a6e7f-f58e-4b74-bc84-af4555af97d6] raised unexpected: DatabaseError('<cx_Oracle._Error object at 0x7f9976635580>',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/ric/workspace/dbw_celery/tasks.py", line 39, in run_query
    column_names = get_column_names(oracle_conn, table_info["table_name"])
  File "/home/ric/workspace/dbw_celery/utilities.py", line 87, in get_column_names
    cursor.execute(query_str)
DatabaseError: <cx_Oracle._Error object at 0x7f9976635580>
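The traceback only shows the repr of cx_Oracle's _Error object, which hides the actual Oracle error. In cx_Oracle, the first element of a DatabaseError's args is that _Error object, and it carries code and message attributes. Below is a small helper (describe_db_error is a hypothetical name, not part of cx_Oracle) that pulls them out so the real ORA- message can be logged; it falls back to str() for anything else.

```python
def describe_db_error(exc):
    """Return readable text for a cx_Oracle DatabaseError (hypothetical helper).

    cx_Oracle puts an _Error object in exc.args[0]; that object exposes
    .code and .message. For any other exception, fall back to str(exc).
    """
    if exc.args:
        error = exc.args[0]
        code = getattr(error, "code", None)
        message = getattr(error, "message", None)
        if code is not None and message:
            return "[%s] %s" % (code, message.strip())
    return str(exc)
```

In a task it could be called from an except cx_Oracle.DatabaseError as exc: block, logging the returned text before re-raising, so the worker log shows the Oracle error instead of the object address.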

Let's look at my code now.

This is my tasks.py file, where I create the Oracle database connection and the Celery instance, and define the tasks that will use that database connection:

# tasks.py
from datetime import timedelta

import celeryconfig
from celery import Celery
from utilities import (connect_to_db, get_column_names, get_new_rows,
                       write_output_rows)

# Define a Celery instance
dbwapp = Celery('tasks')
dbwapp.config_from_object(celeryconfig)
dbwapp.conf["CELERYBEAT_SCHEDULE"] = {}

# Define an Oracle connection as a global variable to be used by all tasks.
# db_user, db_pass, db_host, db_port and db_name are defined elsewhere
# (omitted from this simplified example).
oracle_conn = connect_to_db(db_user, db_pass, db_host, db_port, db_name)

# Define the task function that each Celery worker will run
@dbwapp.task()
def run_query(table_info, output_description):
    """Run a query on a given table. Writes found rows to output file."""
    # oracle_conn is only read here, so no "global" statement is needed
    column_names = get_column_names(oracle_conn, table_info["table_name"])

    new_rows = get_new_rows(oracle_conn, table_info)

    write_output_rows(output_description, new_rows)


def load_celerybeat_schedule(table_config, output_description):
    """Adds a run_query entry for one table to CELERYBEAT_SCHEDULE."""

    new_task_dict = {
        "task": "tasks.run_query",
        "schedule": timedelta(seconds=table_config["check_interval"]),
        "args": (table_config, output_description)
    }
    new_task_name = "task-" + table_config["table_name"]
    dbwapp.conf["CELERYBEAT_SCHEDULE"][new_task_name] = new_task_dict
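load_celerybeat_schedule registers one beat entry per table. A sketch of building the whole schedule in one pass, assuming a hypothetical table_configs list whose items carry the same "table_name" and "check_interval" keys used above (build_beat_schedule is also a hypothetical name):

```python
from datetime import timedelta


def build_beat_schedule(table_configs, output_description):
    """Return a CELERYBEAT_SCHEDULE dict with one run_query entry per table."""
    schedule = {}
    for table_config in table_configs:
        # Entry names follow the same "task-<table_name>" convention as above
        schedule["task-" + table_config["table_name"]] = {
            "task": "tasks.run_query",
            "schedule": timedelta(seconds=table_config["check_interval"]),
            "args": (table_config, output_description),
        }
    return schedule
```

The returned dictionary could then be assigned to dbwapp.conf["CELERYBEAT_SCHEDULE"] in one step instead of mutating it entry by entry.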

This is how I connect to the database in the utilities.py file:

# utilities.py
import logging

import cx_Oracle

logger = logging.getLogger(__name__)


def connect_to_db(db_user, db_password, db_host, db_port, db_name):
    """Connect to DB. Returns a cx_Oracle connection, or None on failure."""
    connection_str = "%s/%s@%s:%s/%s" % (db_user, db_password, db_host, db_port, db_name)

    try:
        db_connection = cx_Oracle.connect(connection_str)
    except cx_Oracle.DatabaseError:
        logger.error("Couldn't connect to DB %s", db_name)
        return None
    logger.info("Successfully connected to the DB: %s", db_name)

    return db_connection
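connect_to_db packs the credentials and the address into a single user/password@host:port/service string, and characters such as "@" in the password can make that combined string ambiguous. cx_Oracle.connect() also accepts user, password and dsn as separate arguments. A sketch of a helper (make_connect_args is a hypothetical name) that keeps them apart and builds only the Easy Connect DSN host:port/service_name, treating db_name as the service name exactly as the original string does:

```python
def make_connect_args(db_user, db_password, db_host, db_port, db_name):
    """Return (user, password, dsn) for cx_Oracle.connect(user, password, dsn).

    Only the address is combined into a string (Easy Connect form
    host:port/service_name); the credentials stay separate.
    """
    dsn = "%s:%s/%s" % (db_host, db_port, db_name)
    return db_user, db_password, dsn
```

The result could be unpacked straight into the call, for example cx_Oracle.connect(*make_connect_args(db_user, db_password, db_host, db_port, db_name)).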

This is the get_new_rows function, defined in another file, where the query actually gets run:

# utilities.py
def get_new_rows(db_connection, table_info):
    """Return new rows inserted in a given table since last check."""
    cursor = db_connection.cursor()
    query_str = "SELECT * FROM {0}".format(table_info["table_name"])
    cursor.execute(query_str)
    new_rows = cursor.fetchall()
    cursor.close()
    return new_rows
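get_new_rows follows the Python DB-API 2.0 pattern (cursor(), execute(), fetchall(), close()) that cx_Oracle implements. If execute() raises, cursor.close() is never reached. A sketch that closes the cursor in every case with contextlib.closing; it uses the standard-library sqlite3 module purely as a stand-in so the example runs without an Oracle server.

```python
import sqlite3
from contextlib import closing


def get_new_rows(db_connection, table_info):
    """Return all rows of table_info["table_name"], always closing the cursor."""
    # Table names cannot be passed as bind variables, so this name must
    # come from trusted configuration, never from user input.
    query_str = "SELECT * FROM {0}".format(table_info["table_name"])
    # closing() calls cursor.close() on exit, even if execute() or fetchall() raises
    with closing(db_connection.cursor()) as cursor:
        cursor.execute(query_str)
        return cursor.fetchall()


if __name__ == "__main__":
    # In-memory SQLite database standing in for Oracle
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO events VALUES (?, ?)", [(1, "start"), (2, "stop")])
    print(get_new_rows(conn, {"table_name": "events"}))
```

Each call still opens its own cursor, which is the usual DB-API practice; only the cleanup changes.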

I run the code like this: celery -A tasks worker -B

I have tried to simplify my code in order to make it easier to understand.

I am afraid that the error I'm getting is caused by different tasks running simultaneously and sharing the same database connection. Their simultaneous execution gets "mixed up" or something like that.

What is the correct way to share a database connection between different Celery tasks?

Does anybody know what I'm doing wrong?

Recommended answer

If you wish to have multiple threads sharing the same connection, you need to enable threaded mode. Something like this:

conn = cx_Oracle.connect(connection_str, threaded=True)

If you don't, Oracle does not guard access to the connection with a mutex, and using it from several threads at once can lead to some interesting problems!
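To illustrate the idea behind threaded mode, here is a pure-Python sketch of the same principle: one shared connection, with every call serialized behind a threading.Lock. FakeConnection, SerializedConnection and run_concurrently are hypothetical names; the fake connection only records how many calls overlap. This is an analogy for the mutex Oracle adds with threaded=True, not how cx_Oracle implements it.

```python
import threading
import time


class FakeConnection(object):
    """Hypothetical stand-in for a connection that must not be used concurrently.

    It only records how many execute() calls overlap in time.
    """

    def __init__(self):
        self.active = 0        # execute() calls currently in progress
        self.max_active = 0    # highest overlap observed so far
        self._stats_lock = threading.Lock()

    def execute(self, query):
        with self._stats_lock:
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)  # pretend to wait for the database server
        with self._stats_lock:
            self.active -= 1
        return "rows for " + query


class SerializedConnection(object):
    """Shares one connection between threads by letting one call in at a time."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = threading.Lock()

    def execute(self, query):
        # Only the thread holding the lock may touch the underlying connection
        with self._lock:
            return self._conn.execute(query)


def run_concurrently(shared_conn, queries):
    """Run each query in its own thread against the same shared connection."""
    results = {}

    def worker(query):
        results[query] = shared_conn.execute(query)

    threads = [threading.Thread(target=worker, args=(q,)) for q in queries]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Like threaded mode, a lock only coordinates threads inside one process. Celery's default prefork pool runs tasks in separate worker processes, so whether this applies depends on which worker pool is in use.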
