How to create a path using the execution date in Airflow?


I have the following Airflow DAG:

    start_task = DummyOperator(task_id='start_task', dag=dag)

    gcs_export_uri_template = 'adstest/2018/08/31/*'
    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery',
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=[gcs_export_uri_template],
        schema_fields=dc(),
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )

    start_task >> update_bigquery

This DAG loads data from adstest/2018/08/31/* into BigQuery, and it works great.

I want to modify the DAG so that it runs over dates derived from the execution date:

Execution date
Execution date - 1 days
Execution date - 2 days

For example, if the execution date is 2018-09-02, I want the DAG to read from:

Execution date : adstest/2018/09/02/*
Execution date - 1 days : adstest/2018/09/01/*
Execution date - 2 days : adstest/2018/08/31/*

How can I do that?
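For reference, the mapping being asked for is plain date arithmetic. A minimal standalone sketch (outside Airflow, using the `adstest` prefix from the question):

```python
from datetime import datetime, timedelta

def gcs_globs(execution_date, days_back=3):
    """Build 'adstest/YYYY/MM/DD/*' globs for the execution date and
    the preceding days (prefix 'adstest' taken from the question)."""
    base = datetime.strptime(execution_date, "%Y-%m-%d")
    return [
        "adstest/{}/*".format((base - timedelta(days=d)).strftime("%Y/%m/%d"))
        for d in range(days_back)
    ]

print(gcs_globs("2018-09-02"))
# ['adstest/2018/09/02/*', 'adstest/2018/09/01/*', 'adstest/2018/08/31/*']
```

Inside a DAG this arithmetic must happen at render time, not at parse time, which is what the Jinja macros in the accepted solution provide.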

Edit: This is my updated code:

for i in range(5, 0, -1):
    gcs_export_uri_template = ['''adstest/{{ macros.ds_format(macros.ds_add(ds, -{0}), '%Y-%m-%d', '%Y/%m/%d') }}/*'''.format(i)]
    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )
    start_task >> update_bigquery

Edit 2:

My code:

for i in range(5, 0, -1):
    gcs_export_uri_template = ['''adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*'''.format(i)]

    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        params={'i': i},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )

Template: (screenshot of the rendered template in the Web UI omitted)

The code gives this error:

"Source URI must not contain the ',' character: gs://adstest/{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }/*"
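A likely cause: Python's `str.format()` treats `{{` and `}}` as escapes for literal braces, so calling `.format(i)` on the template string collapses the Jinja delimiters to single braces before Airflow ever renders it. Single-brace `{ ... }` is not a Jinja expression, so the un-rendered string, commas included, reaches BigQuery. A minimal reproduction:

```python
# The template exactly as written in Edit 2
tmpl = '''adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*'''

# The positional argument is never used: no {0}-style field remains,
# so .format() only collapses the doubled braces.
collapsed = tmpl.format(3)
print(collapsed)
# adstest/{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }/*
```

This matches the URI in the error message above; the fix is to drop `.format()` and let `params` carry the loop variable instead.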

Solution

You can use Airflow Macros to achieve this as follows:

gcs_export_uri_template=[
    "adstest/{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
    "adstest/{{ macros.ds_format(prev_ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
    "adstest/{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', '%Y/%m/%d') }}/*"
]

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_ads_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='CSV',
    source_objects=gcs_export_uri_template,
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)

When you run the above code, you can inspect the rendered parameter in the Web UI (screenshot omitted).


For the edited question:

You need to pass the loop variable i through the params argument and reference it in the template string as params.i, as follows:

for i in range(5, 0, -1):
    gcs_export_uri_template = ["adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"]
    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        params={'i': i},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )
    start_task >> update_bigquery
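Outside Airflow, the way `params.i` resolves at render time can be sketched with Jinja2 directly. Here `ds_add` and `ds_format` are hypothetical stand-ins for Airflow's macros, reimplemented with `datetime` purely for this standalone demo:

```python
from datetime import datetime, timedelta
from jinja2 import Template

# Stand-ins for Airflow's macros.ds_add / macros.ds_format (illustration only)
def ds_add(ds, days):
    return (datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)).strftime("%Y-%m-%d")

def ds_format(ds, input_format, output_format):
    return datetime.strptime(ds, input_format).strftime(output_format)

tmpl = Template(
    "adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"
)
rendered = tmpl.render(
    ds="2018-09-02",
    params={"i": 2},  # what params={'i': i} supplies at render time
    macros={"ds_add": ds_add, "ds_format": ds_format},
)
print(rendered)
# adstest/2018/08/31/*
```

Because the braces stay doubled (no `.format()` call), Jinja sees a real expression and substitutes the per-task `params.i` when each task instance renders its fields.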
