Is there a way to create dynamic workflows in Airflow

Problem description

So I have task A, which copies some unknown number of files into a folder, and task B, which runs on each of those files in the folder. I have no way of knowing the number of files beforehand, as they keep changing. Is there a way to make this work in Airflow?

import os

# In Airflow 1.x this operator ships in contrib
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

spans = os.listdir('/home/abc/tmpFolder')
counter = 0
for s in spans:
    # os.path.join fixes the missing slash in '/home/abc/tmpFolder' + s
    src_path = os.path.join('/home/abc/tmpFolder', s)
    dst_path = "tmp/" + s
    counter += 1
    run_this = \
        FileToGoogleCloudStorageOperator(
            task_id='gcp_task_' + str(counter),
            src=src_path,
            dst=dst_path,
            bucket='gcpBucket',
            google_cloud_storage_conn_id='gcp',
            mime_type='text/plain',
            dag=dag
        )
    # dummy_operator_two is an upstream task defined elsewhere in this DAG
    dummy_operator_two.set_downstream(run_this)

I am getting the names of all the files in the directory and then running the operator for each of them, but Airflow doesn't work that way, as it needs to know the number of tasks beforehand.

Recommended answer

I don't expect Airflow to modify a DAG while a DagRun is active, so I wouldn't bet money on getting the files and then appending tasks in the same DAG. That being said, Airflow regenerates DAGs every few seconds. You could have one DAG that gets the files and another DAG that processes those files. After getting the files, the first DAG would have to wait a minute to make sure Airflow notices the change, and then kick off the second DAG with a TriggerDagRunOperator.
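
How quickly that re-parsing happens is governed by the scheduler's parsing settings; in recent Airflow releases the relevant option is min_file_process_interval in airflow.cfg. The value below is only an illustration, not something the original answer specifies:

[scheduler]
# Re-parse each DAG file at most this often (seconds); a lower value means
# DAG2 notices new files sooner, at the cost of more parsing load.
min_file_process_interval = 30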

DAG1:

import time
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator  # Airflow 1.x import paths

# assumes dag = DAG('DAG1', ...) is defined above, as in the question's snippet

def wait_a_minute():
    # give the scheduler time to re-parse DAG2 after the files have landed
    time.sleep(60)

get_files = DummyOperator(dag=dag, task_id='get_files')
give_airflow_time_to_rebuild_DAG2 = PythonOperator(dag=dag, task_id='give_airflow_time_to_rebuild_DAG2', python_callable=wait_a_minute)
trigger_DAG2 = TriggerDagRunOperator(dag=dag, task_id='trigger_DAG2', trigger_dag_id='DAG2', execution_date='{{ ds }}')

get_files >> give_airflow_time_to_rebuild_DAG2 >> trigger_DAG2

DAG2:

pre_process = DummyOperator(dag=dag, task_id='pre_process')
post_process = DummyOperator(dag=dag, task_id='post_process')

# get_files_to_process() lists the files DAG1 fetched; a sketch follows below
files = get_files_to_process()

for file in files:
    # fan out one task per file between pre_process and post_process
    process = DummyOperator(dag=dag, task_id=f'process_{file}')
    pre_process >> process >> post_process
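
The answer leaves get_files_to_process() undefined. A minimal sketch, assuming the files land in the same folder the question uses (the folder path and the name sanitization are assumptions, not part of the original answer):

import os

def get_files_to_process(folder='/home/abc/tmpFolder'):
    # Task ids may only contain alphanumerics, dashes, dots and underscores,
    # so sanitize the file names before using them in task_id.
    return [f.replace(' ', '_') for f in os.listdir(folder)]

Note that this runs at parse time on the scheduler, every time the DAG file is re-parsed, which is exactly why DAG1 waits a minute before triggering DAG2.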

More a hack than a solution, but something like this should work. There are issues with external triggers and dynamic tasks, though; I typically stumble into scheduler problems when I have to use depends_on_past=True.
