duplicate key value violates unique constraint when adding path variable in airflow dag


Problem description

To set up the connections and variables in Airflow I use a DAG; we do this so that we can set Airflow up again quickly if we ever have to rebuild everything. It does work, in that my connections and variables show up, but the task "fails". The error says that there is already an sql_path variable:

[2018-03-30 19:42:48,784] {{models.py:1595}} ERROR - (psycopg2.IntegrityError) duplicate key value violates unique constraint "variable_key_key"
DETAIL:  Key (key)=(sql_path) already exists.
 [SQL: 'INSERT INTO variable (key, val, is_encrypted) VALUES (%(key)s, %(val)s, %(is_encrypted)s) RETURNING variable.id'] [parameters: {'key': 'sql_path', 'val': 'gAAAAABavpM46rWjISLZRRKu4hJRD7HFKMuXMpmJ5Z3DyhFbFOQ91cD9NsQsYyFof_pdPn116d6yNoNoOAqx_LRqMahjbYKUqrhNRiYru4juPv4JEGAv2d0=', 'is_encrypted': True}] (Background on this error at: http://sqlalche.me/e/gkpj)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 507, in do_execute
    cursor.execute(statement, parameters)
psycopg2.IntegrityError: duplicate key value violates unique constraint "variable_key_key"
DETAIL:  Key (key)=(sql_path) already exists.

However, I checked: before I run the DAG, the ad-hoc query SELECT * FROM variable returns nothing, and afterwards it returns my two variables.

I checked whether I might be creating the variable twice, but I don't think I am. Here is the part of the DAG that creates the path variables:

import airflow
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.settings import Session
import logging


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'provide_context': True
}


def init_staging_airflow():
    logging.info('Creating connections, pool and sql path')

    session = Session()

    new_var = models.Variable()
    new_var.key = "sql_path"
    new_var.set_val("/usr/local/airflow/sql")
    session.add(new_var)
    session.commit()

    new_var = models.Variable()
    new_var.key = "conf_path"
    new_var.set_val("/usr/local/airflow/conf")
    session.add(new_var)
    session.commit()

    # The original snippet does not show how new_pool is built; a placeholder
    # pool (name, slots and description are illustrative only) so the function
    # is self-contained:
    new_pool = models.Pool()
    new_pool.pool = "staging_pool"
    new_pool.slots = 5
    new_pool.description = "Placeholder pool for the staging setup"
    session.add(new_pool)
    session.commit()

    session.close()

dag = airflow.DAG(
    'init_staging_airflow',
    schedule_interval="@once",
    default_args=args,
    max_active_runs=1)

t1 = PythonOperator(task_id='init_staging_airflow',
                    python_callable=init_staging_airflow,
                    provide_context=False,
                    dag=dag)


Recommended answer

I ran into the same problem when trying to do Variable.set() inside a DAG. I believe the scheduler constantly polls the DagBag to refresh any changes dynamically, which is why you see a ton of these when running the webserver:

[2018-04-02 11:28:41,531] [45914] {models.py:168} INFO - Filling up the DagBag from /Users/jasontang/XXX/data-server/dags

Sooner or later you'll hit the key constraint.
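In other words, anything at module level of a DAG file is executed on every one of those DagBag fills, not just once. A minimal illustration of the pitfall (the file name is hypothetical, not from the original answer):

# dags/setup_variables.py  (hypothetical DAG file)
from airflow.models import Variable

# Module-level call: re-executed every time the scheduler or webserver parses
# this file, so the database write is repeated on every DagBag fill and can
# collide with a concurrent parse.
Variable.set("sql_path", "/usr/local/airflow/sql")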

What I did was put all the variables I need to set at runtime into a global dictionary ("VARIABLE_DICT" in the example below) and simply let all my DAGs and sub-DAGs access it.

import os

from airflow.models import Variable

# Module-level dictionary shared by the DAGs and sub-DAGs in this file.
VARIABLE_DICT = {}


def initialize(dag_run_obj):
    global VARIABLE_DICT
    if dag_run_obj.external_trigger:
        # Merge any configuration passed in when the DAG Run was triggered.
        VARIABLE_DICT.update(dag_run_obj.conf)
        values = (dag_run_obj.conf['client'],
                  dag_run_obj.conf['vertical'],
                  dag_run_obj.conf['frequency'],
                  dag_run_obj.conf.get('snapshot'))
        config_file = '{0}-{1}/{0}-{1}-{2}.json'.format(*values)
        path = os.path.join(Variable.get('repo_root'), 'conf', config_file)
        # read_config is the answer author's helper that loads the JSON file.
        VARIABLE_DICT.update(read_config(path))

You can ignore the dag_run_obj part, since I specifically look for any additional configuration values provided to the DAG Run when it was created. In your other DAGs and sub-DAGs, just import the dictionary.
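A short usage sketch of that last point (the module name common_config and the lookup key are hypothetical, not from the answer): the dictionary lives in one shared module, and any other DAG or sub-DAG imports it instead of touching the variable table on every parse.

# common_config.py  -- shared module holding VARIABLE_DICT and initialize()
VARIABLE_DICT = {}

# some_other_dag.py -- any DAG or sub-DAG that needs the values
from common_config import VARIABLE_DICT

def build_sql_statement(**context):
    # Read what initialize() stored at runtime; falls back to None if the
    # key has not been set for this run.
    sql_path = VARIABLE_DICT.get('sql_path')
    return sql_path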
