Reversed upstream/downstream relationships when generating multiple tasks in Airflow


Problem description



The original code related to this question can be found here.

I'm confused by how both the bitshift operators and the set_upstream/set_downstream methods are working within a task loop that I've defined in my DAG. When the main execution loop of the DAG is configured as follows:

for uid in dash_workers.get_id_creds():
    clear_tables.set_downstream(id_worker(uid))

or

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)

The graph looks like this (the alphanumeric sequences are the user IDs, which also define the task IDs):

when I configure the main execution loop of the DAG like this:

for uid in dash_workers.get_id_creds():
    clear_tables.set_upstream(id_worker(uid))

or

for uid in dash_workers.get_id_creds():
    id_worker(uid) >> clear_tables

the graph looks like this:

The second graph is what I want / what I would have expected the first two snippets of code to have produced based on my reading of the docs. If I want clear_tables to execute first, before triggering my batch of data parsing tasks for different user IDs, should I indicate this as clear_tables >> id_worker(uid)?
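To make the direction of the operators concrete, here is a minimal toy model of Airflow's dependency semantics (plain Python, not Airflow itself; the `Task` class and task IDs are illustrative assumptions). In Airflow, `a >> b` means `a` is upstream of `b`, i.e. `a` runs first:

```python
# A minimal model of Airflow's bitshift dependency semantics (not Airflow
# itself): `a >> b` makes `a` upstream of `b`, so `a` runs first.

class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []    # tasks that must run before this one
        self.downstream = []  # tasks that run after this one

    def set_downstream(self, other):
        self.downstream.append(other)
        other.upstream.append(self)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning `other` allows a >> b >> c

    def __lshift__(self, other):   # self << other
        self.set_upstream(other)
        return other

clear_tables = Task('clear_tables')
worker = Task('AAAG5608078M2')

clear_tables >> worker  # clear_tables is upstream: it runs first
print(worker.upstream[0].task_id)    # -> clear_tables
```

Under these semantics, `clear_tables >> id_worker(uid)` (equivalently `clear_tables.set_downstream(...)`) is indeed how "run clear_tables first" should be spelled.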

EDIT -- Here's the complete code, which has been updated since I posted the last few questions, for reference:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

ENV = os.environ

default_args = {
  'start_date': datetime.now(),
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

clear_tables = PythonOperator(
  task_id='clear_tables',
  python_callable=dash_workers.clear_db,
  dag=DAG)

def id_worker(uid):
    return PythonOperator(
        task_id=id,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in dash_workers.get_id_creds():
    preproc_task = id_worker(uid)
    clear_tables << preproc_task

After implementing @LadislavIndra's suggestion I continue to have the same reversed implementation of the bitshift operator in order to get the correct dependency graph.

UPDATE @AshBerlin-Taylor's answer is what's going on here. I assumed that Graph View and Tree View were doing the same thing, but they're not. Here's what id_worker(uid) >> clear_tables looks like in graph view:

I certainly don't want the final step in my data pre-prep routine to be to delete all data tables!

Solution

The tree view in Airflow is "backwards" to how you (and I!) first thought about it. In your first screenshot it is showing that "clear_tables" must be run before the "AAAG5608078M2" task runs. And the DAG status depends upon each of the id worker tasks. So instead of a task order, it's a tree of the status chain. If that makes any sense at all.

(This might seem strange at first, but it's because a DAG can branch out and branch back in.)

You might have better luck looking at the Graph view for your DAG. This one has arrows and shows the execution order in a more intuitive way. (Though I do now find the tree view useful. It's just less clear to start with.)
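To sanity-check that the Graph view order is what actually executes, here is a hedged sketch (plain Python, no Airflow; the worker task IDs are hypothetical) that topologically sorts the dependencies: with clear_tables upstream of every id worker, it is scheduled first, and the Tree View merely displays that chain in reverse.

```python
# A plain-Python sketch of the scheduling order implied by the dependencies:
# clear_tables upstream of every worker means it must execute first.

from collections import deque

def topo_order(deps):
    """deps maps task_id -> set of upstream task_ids that must finish first."""
    pending = {t: set(up) for t, up in deps.items()}
    ready = deque(sorted(t for t, up in pending.items() if not up))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for t, up in pending.items():
            up.discard(task)  # this dependency is now satisfied
            if not up and t not in order and t not in ready:
                ready.append(t)
    return order

workers = ['AAAG5608078M2', 'BBBH1234567X9']  # hypothetical user-ID tasks
deps = {'clear_tables': set()}
deps.update({w: {'clear_tables'} for w in workers})

print(topo_order(deps))  # clear_tables comes first, then the workers
```

The Graph view draws exactly this order with arrows, which is why it is the less confusing view for checking a fan-out like this one.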
