Generating a UUID and using it across an Airflow DAG

Problem description

I'm trying to create a dynamic Airflow DAG that has the following two tasks:
Task 1: creates files with a generated UUID as part of their name.
Task 2: runs a check on those files.

So I defined a variable FILE_UUID and set it as follows: str(uuid.uuid4()). I also created a constant file name: MY_FILE = '{file_uuid}_file.csv'.format(file_uuid=FILE_UUID).
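
A minimal sketch of the setup described above; the DAG id, schedule, start date, file path, and check logic are assumptions for illustration:

import os
import uuid
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Evaluated every time this file is parsed - not once per DAG run.
FILE_UUID = str(uuid.uuid4())
MY_FILE = '/tmp/{file_uuid}_file.csv'.format(file_uuid=FILE_UUID)

def check_file(path):
    # Hypothetical stand-in for the question's "check" task.
    if not os.path.exists(path):
        raise ValueError('missing file: {}'.format(path))

with DAG(dag_id='uuid_question',
         schedule_interval='@daily',
         start_date=datetime(2020, 1, 1)) as dag:

    create_file = BashOperator(
        task_id='create_file',
        bash_command='touch {}'.format(MY_FILE)
    )

    run_check = PythonOperator(
        task_id='run_check',
        python_callable=check_file,
        op_args=[MY_FILE]
    )

    create_file >> run_check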

Then - Task 1, a BashOperator that gets MY_FILE as part of its command, creates a file successfully. I can see that the generated files include a specific UUID in the name.

Task 2, a PythonOperator that gets MY_FILE as an op_args, fails: it can't access the file. The logs show that it tries to access files with a different UUID.

Why is my "constant" being evaluated separately on every task? Is there any way to prevent that from happening?

I'm using Airflow 1.10, and my executor is LocalExecutor.

I tried setting the constant both outside and inside the "with DAG" block, and I also tried working with macros, but then the PythonOperator just uses the macro strings literally rather than the values they hold.

Recommended answer

You have to keep in mind that the DAG definition file is a sort of "configuration script", not an actual executable that runs your DAGs. The tasks are executed in completely different environments, most of the time not even on the same machine. Think of it like a configuration XML that sets up your tasks, which are then built and executed on some other machine in the cloud - but it's Python instead of XML.

In conclusion: your DAG code is Python, but it's not what is executed in the runtime of your tasks. So if you generate a random uuid there, it will get evaluated at an unknown time and multiple times - once for each task, on different machines.

To have it consistent across tasks, you need to find another way, for example:


  • Use XCom: the first task generates the uuid, writes it to XCom, and all downstream tasks pull it from there.

  • Pin the uuid to something that is constant across the whole pipeline - a source, a date, or whatever (e.g. for a daily DAG, build the uuid from the execution date mixed with some dag/task specifics), making the uuid identical for all tasks of a run yet unique across days. A sketch of this approach follows the XCom example below.

Example DAG using the first method (XCom):

from datetime import datetime
import uuid

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

with DAG(dag_id='global_uuid',
         schedule_interval='@daily',
         start_date=datetime(2020, 1, 1)) as dag:  # any fixed past date works here

    # The callable's return value is automatically pushed to XCom.
    generate_uuid = PythonOperator(
        task_id='generate_uuid',
        python_callable=lambda: str(uuid.uuid4())
    )

    # Downstream tasks pull that same value from XCom at run time.
    print_uuid1 = BashOperator(
        task_id='print1',
        bash_command='echo {{ task_instance.xcom_pull("generate_uuid") }}'
    )

    print_uuid2 = BashOperator(
        task_id='print2',
        bash_command='echo {{ task_instance.xcom_pull("generate_uuid") }}'
    )

    generate_uuid >> print_uuid1 >> print_uuid2
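
And a minimal sketch of the second method, assuming a daily schedule. The run_uuid helper is not part of Airflow - it is registered here as a user-defined macro on the DAG - and uuid.uuid5 makes the value deterministic, so every task of a given run computes the same uuid without XCom:

from datetime import datetime
import uuid

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

def run_uuid(dag_id, ds):
    # uuid5 is deterministic: the same dag_id and execution date
    # always yield the same uuid, but each daily run gets a new one.
    return str(uuid.uuid5(uuid.NAMESPACE_OID, '{}_{}'.format(dag_id, ds)))

with DAG(dag_id='deterministic_uuid',
         schedule_interval='@daily',
         start_date=datetime(2020, 1, 1),
         user_defined_macros={'run_uuid': run_uuid}) as dag:

    create_file = BashOperator(
        task_id='create_file',
        bash_command='touch /tmp/{{ run_uuid(dag.dag_id, ds) }}_file.csv'
    )

    check_file = BashOperator(
        task_id='check_file',
        bash_command='test -f /tmp/{{ run_uuid(dag.dag_id, ds) }}_file.csv'
    )

    create_file >> check_file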
