Dynamically Creating DAG based on Row available on DB Connection


Question


I want to create DAGs dynamically from a database table query. When I generate DAGs dynamically from a plain numeric range, or from objects available in the Airflow settings, it works. However, when I use a PostgresHook and create one DAG per row of my table, I can see a new DAG being generated whenever I add a new row, but the newly created DAG is not clickable in the Airflow webserver UI. For more context, I'm using Google Cloud Composer. I already followed the steps mentioned in DAGs not clickable on Google Cloud Composer webserver, but working fine on a local Airflow, but it still doesn't work in my case.

Here is my code:

from datetime import datetime, timedelta

from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os

default_args = {
  "owner": "debug",
  "depends_on_past": False,
  "start_date": datetime(2018, 10, 17),
  "email": ["airflow@airflow.com"],
  "email_on_failure": False,
  "email_on_retry": False,
  "retries": 1,
  "retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}


def create_dag(dag_id,
               schedule,
               default_args):
    def hello_world_py(*args):
        print('Hello from DAG: {}'.format(dag_id))

    dag = DAG(dag_id,
              schedule_interval=timedelta(days=1),
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id=dag_id,
            python_callable=hello_world_py,
            dag_id=dag_id)

    return dag


dag = DAG("dynamic_yolo_pg_", default_args=default_args,     
        schedule_interval=timedelta(hours=1))

"""
Bahavior:
Create an exact DAG which in turn will create it's own file
https://www.astronomer.io/guides/dynamically-generating-dags/
"""
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
cursor = conn.cursor(cursor_factory=NamedTupleCursor)
cursor.execute("SELECT * FROM airflow_test_command;")
commands = cursor.fetchall()
for command in commands:
  dag_id = command.id
  schedule = timedelta(days=1)

  id = "dynamic_yolo_" + str(dag_id)

  print id

  globals()[id] = create_dag(id,
                           schedule,
                           default_args)

Best,

Answer


This can be solved by running a self-managed Airflow webserver, following the steps mentioned in [1]. After you do this, if you decide to add authentication in front of your self-managed webserver, then once you have created the ingress, your BackendServices should appear in the Google IAP console and you can enable IAP. If you want to access Airflow programmatically, you can also use JWT authentication with a service account against your self-managed Airflow webserver [2].

[1] https://cloud.google.com/composer/docs/how-to/managing/deploy-webserver

[2] https://cloud.google.com/iap/docs/authentication-howto
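As a rough sketch of the programmatic access described in [2], the snippet below uses a service account to obtain an OIDC (JWT) token for an IAP-protected, self-managed webserver via the google-auth library. The client ID, key-file path, and webserver URL are placeholder assumptions, not values from this question.

# Rough sketch: call an IAP-protected, self-managed Airflow webserver with a
# service-account OIDC token. Client ID, key path, and URL are placeholders.
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession

IAP_CLIENT_ID = "YOUR_IAP_OAUTH_CLIENT_ID.apps.googleusercontent.com"  # placeholder
SA_KEY_FILE = "/path/to/service-account-key.json"                      # placeholder
WEBSERVER_URL = "https://your-airflow-webserver.example.com"           # placeholder

# Build ID-token credentials whose audience is the IAP OAuth client ID.
credentials = service_account.IDTokenCredentials.from_service_account_file(
    SA_KEY_FILE, target_audience=IAP_CLIENT_ID)

# AuthorizedSession attaches the Bearer token to every request it sends.
session = AuthorizedSession(credentials)
response = session.get("{}/api/experimental/test".format(WEBSERVER_URL))
print(response.status_code)

The /api/experimental/test path is only used here as an example endpoint from the Airflow 1.x experimental API; any route on the IAP-protected webserver would be called the same way.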

