How to dynamically iterate over the output of an upstream task to create parallel tasks in airflow?


Question

Consider the following example of a DAG where the first task, get_id_creds, extracts a list of credentials from a database. This operation tells me what users in my database I am able to run further data preprocessing on, and it writes those ids to the file /tmp/ids.txt. I then scan those ids into my DAG and use them to generate a list of upload_transaction tasks that can be run in parallel.

My question is: is there a more idiomatically correct, dynamic way to do this using Airflow? What I have here feels clumsy and brittle. How can I directly pass a list of valid IDs from one process to the code that defines the subsequent downstream processes?

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now()
}

DAG = DAG(
  dag_id='dash_preproc',
  schedule_interval=None,  # schedule_interval is a DAG argument, not a default_arg
  default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds, 
    provide_context=True,
    dag=DAG)

# Read the ids produced by get_id_creds; note that this runs at DAG-parse
# time, not at task-execution time.
with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    upload_transactions = PythonOperator(
        task_id=uid,
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)  # get_id_creds must run first


Answer

Per @Juan Riza's suggestion I checked out this link: Proper way to create dynamic workflows in Airflow. This was pretty much the answer, although I was able to simplify the solution enough that I thought I would offer my own modified version of the implementation here:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

ENV = os.environ

default_args = {
  # 'start_date': datetime.now(),
  'start_date': datetime(2017, 7, 18)
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

clear_tables = PythonOperator(
  task_id='clear_tables',
  python_callable=dash_workers.clear_db,
  dag=DAG)

def id_worker(uid):
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in dash_workers.get_id_creds():  # the module imported above; one task per user id
    clear_tables >> id_worker(uid)

clear_tables cleans the database that will be re-built as a result of the process. id_worker is a function that dynamically generates new preprocessing tasks, based on the array of ID values returned from get_id_creds. The task ID is just the corresponding user ID, though it could easily have been an index, i, as in the example mentioned above.
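
For this pattern to work, get_id_creds has to return the list of IDs at the moment the DAG file is parsed. The dash_workers module isn't shown in the post, so the following is only a minimal sketch of what such a helper might look like, assuming the IDs are staged one per line in /tmp/ids.txt as in the question (the file path and the empty-list fallback are illustrative assumptions):

import os

def get_id_creds():
    # Hypothetical helper: return the list of user ids to preprocess.
    # Assumes an earlier step wrote one id per line to /tmp/ids.txt.
    # Returns an empty list if the file is missing, so the DAG still
    # parses cleanly on a scheduler that has not run the pipeline yet.
    path = '/tmp/ids.txt'
    if not os.path.exists(path):
        return []
    with open(path) as infile:
        return [line.strip() for line in infile if line.strip()]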

NOTE: The bitshift operator (>>) looked backwards to me at first, but clear_tables >> id_worker(uid) does make clear_tables run before each id_worker task, which is the ordering this DAG needs.
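
For reference, Airflow's two bitshift operators are mirror images: a >> b and b << a both set a as the upstream task. A quick illustration using the names from the answer's code (either line alone declares the dependency; they are equivalent):

worker = id_worker(uid)
clear_tables >> worker       # clear_tables is upstream of worker
# worker << clear_tables     # equivalent spelling of the same dependency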
