Airflow Scheduler with SQL Server backend and pyodbc

Problem Description

I have set up Airflow with SQL Server (SQL Azure) as the backend. Init DB was successful. I am trying to run a simple DAG every 2 minutes.

The DAG has 2 tasks:

  1. Print the date
  2. Sleep

When the Airflow scheduler starts, it creates task instances for both tasks; the first one succeeds and the second one appears to be stuck in the "running" state.
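(To confirm what the scheduler actually recorded, the task_instance table in the metadata DB can be queried directly. A minimal sketch with SQLAlchemy, reusing the same connection string as in the config below with the password elided:)

from sqlalchemy import create_engine

# Same metadata-DB connection string as in airflow.cfg below (password elided).
engine = create_engine(
    "mssql+pyodbc://airflowuser@afdsqlserver76:<password>"
    "@afdsqlserver76.database.windows.net:1433/airflowdb"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)

# task_instance is Airflow's own metadata table; this just lists what the
# scheduler recorded for the 'tutorial' DAG.
with engine.connect() as conn:
    for row in conn.execute(
        "SELECT task_id, state FROM task_instance WHERE dag_id = 'tutorial'"
    ):
        print(row['task_id'], row['state'])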

Looking at the scheduler logs, I see the following error repeatedly.

[2019-01-04 11:38:48,253] {jobs.py:397} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 389, in helper
    pickle_dags)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 1816, in process_file
    dag.sync_to_db()
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/models.py", line 4296, in sync_to_db
    DagModel).filter(DagModel.dag_id == self.dag_id).first()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2755, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2855, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2876, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2885, in _get_bind_args
    **kw
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2867, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1019, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1024, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 409, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2112, in contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2151, in _wrap_pool_connect
    e, dialect, self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1465, in _handle_dbapi_exception_noconnection
    exc_info
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2147, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 387, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 768, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 516, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1140, in _do_get
    self._dec_overflow()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1137, in _do_get
    return self._create_connection()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 333, in _create_connection
    return _ConnectionRecord(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 461, in __init__
    self.__connect(first_connect_check=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 651, in __connect
    connection = pool._invoke_creator(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/strategies.py", line 105, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 393, in connect
    return self.dbapi.connect(*cargs, **cparams)
InterfaceError: (pyodbc.InterfaceError) ('28000', u"[28000] [unixODBC][Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Login failed for user 'airflowuser'. (18456) (SQLDriverConnect)")
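Note that the traceback shows the failure happening inside SQLAlchemy's pool while it opens a brand-new connection (pool._create_connection down to dialect.connect): the initial connections evidently authenticate fine, and it is only later connection attempts from the scheduler loop that are rejected with SQL Server login error 18456.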

Airflow is configured to use LocalExecutor and pyodbc to connect to SQL Azure:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
#executor = SequentialExecutor
executor = LocalExecutor


# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more
# information on their website
#sql_alchemy_conn = sqlite:////home/sshuser/airflow/airflow.db
# connection string to MS SQL Server
sql_alchemy_conn = mssql+pyodbc://airflowuser@afdsqlserver76:<password>@afdsqlserver76.database.windows.net:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server


# The encoding for the databases
sql_engine_encoding = utf-8

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 10

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 180

# How many seconds to retry re-establishing a DB connection after
# disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300
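To sanity-check these pool settings against the same database outside of Airflow, a minimal sketch along these lines can help. It mirrors the pool_size/pool_recycle values above and deliberately checks out several connections, since the failure happens when the pool opens additional ones; the connection string is the one from the config with the password elided:

from sqlalchemy import create_engine

# Same DSN as sql_alchemy_conn above; <password> is elided as in the config.
conn_str = (
    "mssql+pyodbc://airflowuser@afdsqlserver76:<password>"
    "@afdsqlserver76.database.windows.net:1433/airflowdb"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)

# Mirror the pool settings from airflow.cfg (sql_alchemy_pool_size,
# sql_alchemy_pool_recycle) so the test exercises the same code path.
engine = create_engine(conn_str, pool_size=10, pool_recycle=180)

# Check out several connections so the pool has to open more than one
# physical connection, which is where the scheduler was failing.
conns = [engine.connect() for _ in range(3)]
for conn in conns:
    print(conn.execute("SELECT 1").scalar())
    conn.close()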

The DAG is as follows:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 4),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval='*/2 * * * *', max_active_runs=1, catchup=False)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)
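(As an aside, the last line can equivalently be written with Airflow's bitshift syntax, which reads in execution order:)

# Equivalent to t2.set_upstream(t1): run t1 first, then t2.
t1 >> t2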

I am a bit lost as to why the scheduler would not be able to connect to the DB after it has run the first task successfully. Any pointers to resolve this are much appreciated.

I have a sample program that uses SQLAlchemy to connect to SQL Azure with the same credentials, and it works.

import sqlalchemy

from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://afdadmin@afdsqlserver76:<password>@afdsqlserver76.database.windows.net:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server")

connection = engine.connect()
result = connection.execute("select version_num from alembic_version")
for row in result:
    print("Version:", row['version_num'])
connection.close()

Answer

The issue was resolved after Pooling = True was set in odbcinst.ini:

[ODBC]
Pooling = Yes
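For reference, unixODBC enables driver-manager connection pooling via the [ODBC] section of odbcinst.ini, and a per-driver CPTimeout value controls how long an idle connection stays in the pool. A sketch of a complete file is below; the driver path, version number, and timeout are installation-specific and shown only as assumptions:

[ODBC]
Pooling = Yes

[ODBC Driver 17 for SQL Server]
Description = Microsoft ODBC Driver 17 for SQL Server
# Path varies by platform and driver version; this one is illustrative only.
Driver = /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.2.so.0.1
# Seconds an idle pooled connection is kept alive (assumed value).
CPTimeout = 120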
