How to test an Airflow DAG in unittest?

Question

I am trying to test a DAG with more than one task in the test environment. I was able to test a single task associated with the DAG, but I want to create several tasks in the DAG and kick off the first task. For testing one task in a DAG I am using

task1.run()

which executes correctly. However, the same approach does not work when I have several tasks chained one after another downstream in a DAG.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

t2.set_upstream(t1)

t1.run()  # Only the first task is executed.

To run the second task I have to call t2.run() explicitly, which I don't want to do since I am designing a DAG. How can I achieve this?

Answer

I'm not totally sure I understand your question yet, but I'll take a stab at starting an answer.

If your goal is just to run the DAG or a subset of its tasks manually, you can do this from the CLI, for example:

  • $ airflow run ... - run a task instance
  • $ airflow test ... - test a task instance without checking dependencies or recording state in database
  • $ airflow trigger_dag ... - trigger a specific DAG run of a DAG

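CLI docs: https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html (the stable docs cover Airflow 2.x, where these commands became airflow tasks run, airflow tasks test and airflow dags trigger)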

I think the airflow run command is the one most relevant to your use case.
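
As a hedged sketch of driving the CLI from a Python test, the helper below builds one airflow run (or airflow test) command per task of the question's tutorial DAG, upstream task first. It assumes the Airflow 1.x syntax listed above (airflow <command> <dag_id> <task_id> <execution_date>). The names build_task_commands and run_commands and the dry_run flag are hypothetical, not part of Airflow; by default nothing is executed, the commands are only printed.

```python
import subprocess
from datetime import date
from typing import List, Sequence


def build_task_commands(
    dag_id: str,
    task_ids: Sequence[str],
    execution_date: date,
    command: str = "run",
) -> List[List[str]]:
    """Build one Airflow 1.x CLI invocation per task, in the order given.

    Hypothetical helper: the caller must list task_ids upstream-first
    (e.g. print_date before sleep). Airflow 2.x uses 'airflow tasks run'
    / 'airflow tasks test' instead of this syntax.
    """
    if command not in ("run", "test"):
        raise ValueError("command must be 'run' or 'test'")
    day = execution_date.isoformat()
    return [["airflow", command, dag_id, task_id, day] for task_id in task_ids]


def run_commands(commands: List[List[str]], dry_run: bool = True) -> None:
    """Print each command; only execute it when dry_run is False."""
    for argv in commands:
        print(" ".join(argv))
        if not dry_run:
            # check=True raises on the first failing task, so downstream
            # tasks are not attempted after an upstream failure.
            subprocess.run(argv, check=True)


if __name__ == "__main__":
    cmds = build_task_commands("tutorial", ["print_date", "sleep"], date(2015, 6, 1))
    run_commands(cmds)  # dry run: prints the two commands only
```

Passing dry_run=False needs a working Airflow 1.x installation on the PATH. The ordering here is hard-coded by the caller; in a real deployment the scheduler works it out from the DAG's dependencies.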

At runtime, scheduling tasks in the DAGs and running downstream dependencies once their requirements are met is all handled automatically by the executor. You shouldn't need to call run() anywhere in your code.
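
To illustrate what "running downstream dependencies once their requirements are met" means, here is a toy model in plain Python. It is not Airflow's scheduler or executor; it uses the standard library's graphlib.TopologicalSorter (Python 3.9+) to start each task only after all of its upstream tasks have finished. The task IDs mirror the question's print_date and sleep tasks, and the callables are hypothetical stand-ins for the Bash commands.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set


def run_in_dependency_order(
    tasks: Dict[str, Callable[[], None]],
    upstream: Dict[str, Set[str]],
) -> List[str]:
    """Run each task once all of its upstream tasks have completed.

    Toy model only: `upstream` maps a task_id to the task_ids it depends on,
    the way t2.set_upstream(t1) records that 'sleep' depends on 'print_date'.
    Returns the order in which the tasks actually ran.
    """
    sorter = TopologicalSorter({tid: upstream.get(tid, set()) for tid in tasks})
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    finished: List[str] = []
    while sorter.is_active():
        # get_ready() returns every task whose upstream tasks are all done.
        for task_id in sorter.get_ready():
            tasks[task_id]()
            finished.append(task_id)
            sorter.done(task_id)  # may unblock downstream tasks
    return finished


if __name__ == "__main__":
    order = run_in_dependency_order(
        tasks={
            "print_date": lambda: print("date"),
            "sleep": lambda: print("sleep 5"),
        },
        upstream={"sleep": {"print_date"}},  # t2.set_upstream(t1)
    )
    print(order)  # ['print_date', 'sleep']
```

Calling t1.run() in the question corresponds to invoking a single entry of the tasks dictionary: as the question observes, it executes that one task and does not walk the downstream graph.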

As for the run method itself, the code is still there:

Questions

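  1. When you say "test" the DAG in a test environment, what do you mean? Like in CI, or in unit tests?
  2. Is this code for testing, or is it from one of your actual DAGs?
  3. Is this related to your other recent question, Test Dag run for Airflow 1.9 in unittest?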
