Creating dynamic tasks in Airflow (in Composer) based on a BigQuery response

Question

I am trying to create an Airflow DAG that generates tasks depending on the response from a server.

Here is my approach:

Get the list of tables from BigQuery -> loop through the list and create tasks

This is my latest code. I have tried every approach I could find on Stack Overflow, and nothing seems to work. What am I doing wrong?

with models.DAG(dag_id="xt", default_args=default_args, schedule_interval="0 1 * * *", catchup=True) as dag:
tables = get_tables_from_bq()

    bridge = DummyOperator(
        task_id='bridge',
        dag=dag
    )


    for t in tables:
        sql = ("SELECT * FROM `{project}.{dataset}.{table}` LIMIT 5;".format(
                project=project, dataset=dataset, table=t))

    materialize_t = BigQueryOperator(bql=sql,
                                     destination_dataset_table=dataset+'.' + table_prefix + t,
                                     task_id = 'x_' + t,
                                     bigquery_conn_id = 'bigquery_default',
                                     use_legacy_sql = False,
                                     write_disposition = 'WRITE_APPEND',
                                     create_disposition = 'CREATE_IF_NEEDED',
                                     query_params = {},
                                     allow_large_results = True,
                                     dag = dag)

bridge >> materialize_t

Even the run option is not showing with this code. I tried several variants before arriving at this one, but still no luck. Any help?

Recommended answer

I don't know whether it is a typo from copying and pasting the DAG, but tables = get_tables_from_bq() should come before with models.DAG(...). Also, bridge >> materialize_t seems to be missing its indentation and is therefore outside the with models.DAG(...) scope. As a side note, you do not need the bridge task.
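The points above can be sketched roughly as follows. This is a minimal illustration, not a definitive fix. It assumes an Airflow 1.10-style install where BigQueryOperator lives in airflow.contrib and accepts the bql keyword used in the question. The helper names build_task_specs and build_dag are hypothetical and introduced only for this example. get_tables_from_bq, project, dataset, table_prefix and default_args are taken from the question and must already be defined in the DAG file. The sketch also keeps the operator inside the for loop, which the code as posted appears not to do.

```python
def build_task_specs(tables, project, dataset, table_prefix):
    """Return one dict of per-table operator arguments (hypothetical helper)."""
    specs = []
    for t in tables:
        sql = "SELECT * FROM `{project}.{dataset}.{table}` LIMIT 5;".format(
            project=project, dataset=dataset, table=t)
        specs.append({
            "task_id": "x_" + t,
            # `bql` is the keyword used in the question; later Airflow
            # releases rename it to `sql`.
            "bql": sql,
            "destination_dataset_table": dataset + "." + table_prefix + t,
        })
    return specs


def build_dag(tables, project, dataset, table_prefix, default_args):
    """Create the DAG with one BigQueryOperator per table (hypothetical helper)."""
    # Imported lazily so build_task_specs can be exercised without Airflow.
    # Assumption: Airflow 1.10-style contrib import path.
    from airflow import models
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    with models.DAG(dag_id="xt", default_args=default_args,
                    schedule_interval="0 1 * * *", catchup=True) as dag:
        # The loop and the operator are both indented inside the `with`
        # block, so every task is attached to the DAG. No bridge task.
        for spec in build_task_specs(tables, project, dataset, table_prefix):
            BigQueryOperator(
                bigquery_conn_id="bigquery_default",
                use_legacy_sql=False,
                write_disposition="WRITE_APPEND",
                create_disposition="CREATE_IF_NEEDED",
                allow_large_results=True,
                **spec)
    return dag


# In the DAG file, fetch the table list first, then build the DAG at module
# level so the scheduler can discover it:
# tables = get_tables_from_bq()
# dag = build_dag(tables, project, dataset, table_prefix, default_args)
```

Note that get_tables_from_bq() runs every time the scheduler parses the file, so it is worth keeping that call fast.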
