How to create path using execution date in Airflow?

Question
I have the following Airflow DAG:
start_task = DummyOperator(task_id='start_task', dag=dag)

gcs_export_uri_template = 'adstest/2018/08/31/*'

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_ads_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='CSV',
    source_objects=[gcs_export_uri_template],
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)

start_task >> update_bigquery
This DAG loads data from adstest/2018/08/31/* into BigQuery and it works great.

I want to modify the DAG so that it runs over dates relative to the execution date:

Execution date
Execution date - 1 day
Execution date - 2 days

For example, if the execution date is 2018-09-02, I want the DAG to read from:

Execution date: adstest/2018/09/02/*
Execution date - 1 day: adstest/2018/09/01/*
Execution date - 2 days: adstest/2018/08/31/*

How can I do that?
Edit: This is my updated code:

for i in range(5, 0, -1):
    gcs_export_uri_template = ['''adstest/{{ macros.ds_format(macros.ds_add(ds, -{0}), '%Y-%m-%d', '%Y/%m/%d') }}/*'''.format(i)]

    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )

    start_task >> update_bigquery
Edit 2: My code:

for i in range(5, 0, -1):
    gcs_export_uri_template = ['''adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*'''.format(i)]

    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        params={'i': i},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )

The code gives this error:

"Source URI must not contain the ',' character: gs://adstest/{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }/*"
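Note the single braces in the error message. As a quick sketch of what may be happening here (this reasoning is not from the original post): `str.format` treats the doubled Jinja braces as escapes, so calling `.format(i)` on the template string collapses `{{ ... }}` to `{ ... }` before Jinja ever sees it, and the expression is never rendered:

```python
# str.format interprets '{{' and '}}' as escaped literal braces, so the
# Jinja delimiters collapse to single braces and the string is no longer
# a valid Jinja template. The offset 1 here is just an illustration.
uri = '''adstest/{{ macros.ds_add(ds, -{0}) }}/*'''.format(1)
print(uri)  # adstest/{ macros.ds_add(ds, -1) }/*
```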
You can use Airflow Macros to achieve this as follows:
gcs_export_uri_template = [
    "adstest/{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
    "adstest/{{ macros.ds_format(prev_ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
    "adstest/{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', '%Y/%m/%d') }}/*"
]

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_ads_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='CSV',
    source_objects=gcs_export_uri_template,
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)
When you run the above code, you can check the rendered parameter in the Web UI.
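To see what those macros expand to without running Airflow, here is a small stand-alone mimic of `macros.ds_add` and `macros.ds_format` (my own sketch; the real implementations live in `airflow.macros`):

```python
from datetime import datetime, timedelta

def ds_add(ds, days):
    # shift a 'YYYY-MM-DD' date string by the given number of days
    return (datetime.strptime(ds, '%Y-%m-%d') + timedelta(days=days)).strftime('%Y-%m-%d')

def ds_format(ds, input_format, output_format):
    # re-render a date string from one strftime format to another
    return datetime.strptime(ds, input_format).strftime(output_format)

ds = '2018-09-02'  # example execution date from the question
print('adstest/{}/*'.format(ds_format(ds, '%Y-%m-%d', '%Y/%m/%d')))              # adstest/2018/09/02/*
print('adstest/{}/*'.format(ds_format(ds_add(ds, -2), '%Y-%m-%d', '%Y/%m/%d')))  # adstest/2018/08/31/*
```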
For the EDITED comment: you will need to pass the value of the loop variable i in the params parameter and use it in the template string as params.i, as follows:
for i in range(5, 0, -1):
    gcs_export_uri_template = ["adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"]

    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        params={'i': i},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID
    )

    start_task >> update_bigquery
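As a rough check of what the five tasks end up reading, here is a plain-Python sketch of the rendered source_objects, assuming the example execution date 2018-09-02 (Airflow itself performs this rendering via Jinja at runtime):

```python
from datetime import datetime, timedelta

ds = '2018-09-02'  # example execution date
for i in range(5, 0, -1):
    # what the template renders to for task load_ads_to_BigQuery-{i}
    day = datetime.strptime(ds, '%Y-%m-%d') - timedelta(days=i)
    print('load_ads_to_BigQuery-{}: adstest/{}/*'.format(i, day.strftime('%Y/%m/%d')))
```

Each task reads a single day's folder, from five days back (2018/08/28) up to one day back (2018/09/01).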