Fusing operators together


Question


I'm still in the process of deploying Airflow and I've already felt the need to merge operators together. The most common use-case would be coupling an operator and the corresponding sensor. For instance, one might want to chain together the EmrStepOperator and EmrStepSensor.


I'm creating my DAGs programmatically, and the biggest one of those contains 150+ (identical) branches, each performing the same series of operations on different bits of data (tables). Therefore, clubbing together tasks that make up a single logical step in my DAG would be of great help.
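The programmatic generation described above can be sketched as follows. This is a minimal illustration, not the question author's actual code: the step names and table list are hypothetical, and plain task-id strings stand in for real Airflow operators so the pattern runs without an Airflow install (real code would instantiate operators inside a `with DAG(...)` block).

```python
def make_branch(table):
    """Build the per-table series of steps (one logical branch).
    Step names are illustrative; real code would return operator instances."""
    return [f"{table}.delete_s3_path", f"{table}.distcp", f"{table}.msck_repair"]

def build_dag(tables):
    # one identical branch per table, generated in a loop
    return {table: make_branch(table) for table in tables}

branches = build_dag(["events", "users", "orders"])
```

With 150+ tables, this loop is the only place the branch structure is defined, which is exactly why collapsing each branch's tasks into fewer fused operators pays off.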


Here are 2 contending examples from my project to give motivation for my argument.

1. Deleting data from an S3 path and then writing new data

This step comprises 2 operators


  • DeleteS3PathOperator: Extends from BaseOperator & uses S3Hook
  • HadoopDistcpOperator: Extends from SSHOperator
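The two steps above could be fused into one operator whose `execute` calls both underlying actions in order. The sketch below shows only the pattern: `BaseOperator`, `S3Hook`, and `SSHHook` are simplified stand-ins for the Airflow classes (so it runs without an Airflow install), and the hook method names are illustrative, not the real Airflow API.

```python
# Stand-ins for Airflow's BaseOperator / S3Hook / SSHHook, pattern only.
class BaseOperator:
    def __init__(self, task_id):
        self.task_id = task_id

class S3Hook:
    def delete_prefix(self, path):          # hypothetical method name
        return f"deleted {path}"

class SSHHook:
    def run(self, command):                 # hypothetical method name
        return f"ran: {command}"

class DeleteAndDistcpOperator(BaseOperator):
    """Fuses 'delete the S3 target' and 'hadoop distcp' into one task."""
    def __init__(self, task_id, s3_path, src_path):
        super().__init__(task_id)
        self.s3_path = s3_path
        self.src_path = src_path

    def execute(self, context=None):
        log = []
        # step 1: clear the target path via the S3 hook
        log.append(S3Hook().delete_prefix(self.s3_path))
        # step 2: copy new data over SSH, as HadoopDistcpOperator would
        log.append(SSHHook().run(f"hadoop distcp {self.src_path} {self.s3_path}"))
        return log
```

One DAG node now covers the whole logical step, at the cost of coarser per-step retry granularity.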


2. Conditionally performing MSCK REPAIR on Hive table

This step comprises 4 operators



  • BranchPythonOperator: Checks whether Hive table is partitioned
  • MsckRepairOperator: Extends from HiveOperator and performs MSCK REPAIR on (partitioned) table
  • Dummy(Branch)Operator: Makes up alternate branching path to MsckRepairOperator (for non-partitioned tables)
  • Dummy(Join)Operator: Makes up the join step for both branches
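The branching logic in this four-task step reduces to the callable a BranchPythonOperator runs, which returns the `task_id` of the path to follow; the join dummy then needs a trigger rule such as `"none_failed"` so it fires whichever branch was taken. A minimal sketch of that callable (task ids are illustrative):

```python
def choose_repair_path(table_is_partitioned: bool) -> str:
    """What the BranchPythonOperator's python_callable would return:
    the task_id of the downstream task to follow."""
    return "msck_repair" if table_is_partitioned else "skip_repair"
```

Fusing this into one operator would mean moving the partition check and the conditional MSCK REPAIR inside a single `execute`, eliminating the two dummy tasks entirely.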


Using operators in isolation certainly offers smaller modules and more fine-grained logging / debugging, but in large DAGs, reducing the clutter might be desirable. From my current understanding, there are 2 ways to chain operators together:


  1. Hooks


Write the actual processing logic in hooks and then use as many hooks as you want within a single operator (certainly the better way, in my opinion).

  2. SubDagOperator


A risky and controversial way of doing things; additionally the naming convention for SubDagOperator makes me frown.

My questions are:



  • Should operators be composed at all or is it better to have discrete steps?
  • Any pitfalls, improvements in above approaches?
  • Any other ways to combine operators together?
  • In taxonomy of Airflow, is the primary motive of Hooks same as above, or do they serve some other purposes too?

UPDATE-1

3. Multiple Inheritance


While this is a Python feature rather than Airflow-specific, it's worthwhile to point out that multiple inheritance can come in handy for combining the functionality of operators. QuboleCheckOperator, for instance, is already written using it. However, in the past I tried this approach to fuse EmrCreateJobFlowOperator and EmrJobFlowSensor, but at the time I ran into issues with the @apply_defaults decorator and abandoned the idea.
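The multiple-inheritance idea can be sketched as below: inherit from both the "create" operator and its sensor, and call each parent's logic from one `execute`. The classes are simplified stand-ins so the pattern runs without an Airflow install; in real Airflow, the `@apply_defaults` interaction mentioned above is exactly where this gets tricky.

```python
class CreateJobFlowOperator:            # stand-in for EmrCreateJobFlowOperator
    def execute(self, context=None):
        return "job-flow-123"           # pretend we created a cluster

class JobFlowSensor:                    # stand-in for EmrJobFlowSensor
    def poke(self, context=None):
        return True                     # pretend the job flow is ready

class CreateAndWaitOperator(CreateJobFlowOperator, JobFlowSensor):
    """Fuses creation and waiting via multiple inheritance."""
    def execute(self, context=None):
        # run the operator half explicitly, then the sensor loop
        job_flow_id = CreateJobFlowOperator.execute(self, context)
        while not self.poke(context):   # real code would sleep between pokes
            pass
        return job_flow_id
```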

Answer


I have combined various hooks to create a single operator based on my needs. A simple example: I clubbed together the GCS hook's delete, copy, list, and get_size methods to create a single operator called GcsDataValidationOperator. A rule of thumb would be idempotency, i.e. if you run it multiple times it should produce the same result.
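The idempotency rule of thumb can be made concrete: a fused operator should converge to one final state (e.g. delete any previous output, then write), so re-running it yields the same result. A toy sketch, with a plain dict standing in for a GCS bucket and a hypothetical function name:

```python
def validate_and_load(bucket: dict, path: str, rows: list) -> dict:
    """Idempotent step: clear the target, then write.
    Running it twice leaves the bucket in exactly the same state."""
    bucket.pop(path, None)      # delete any previous output
    bucket[path] = list(rows)   # write fresh data
    return bucket

store = {}
validate_and_load(store, "out/table.csv", [1, 2, 3])
validate_and_load(store, "out/table.csv", [1, 2, 3])  # re-run: same state
```

A non-idempotent version (say, appending on each run) would corrupt data whenever Airflow retries the fused task.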


Should operators be composed at all or is it better to have discrete steps?


The only pitfall is maintainability: sometimes when the hooks change in the master branch, you will need to update all your operators manually if there are any breaking changes.



Any pitfalls, improvements in above approaches?


You can use a PythonOperator and call the built-in hooks from its python_callable, but it would still mean a lot of detail in the DAG file. Hence, I would still go for the new-operator approach.
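That alternative, calling hook methods from a PythonOperator callable, looks roughly like this. The hook class and its methods are simplified stand-ins (not the real Airflow GCS hook API) so the sketch runs anywhere, and it illustrates the stated drawback: all the step detail ends up in the DAG file rather than inside a reusable operator.

```python
class GCSHook:                          # stand-in for the real Airflow hook
    def list(self, bucket):
        return ["a.csv", "b.csv"]       # pretend listing of objects
    def delete(self, bucket, obj):
        return f"deleted {bucket}/{obj}"

def cleanup_callable(bucket="my-bucket"):
    """What you would pass as python_callable=... to a PythonOperator:
    hook wiring and step logic live right here in the DAG file."""
    hook = GCSHook()
    return [hook.delete(bucket, obj) for obj in hook.list(bucket)]
```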



Any other ways to combine operators together?


Hooks are just interfaces to external platforms and databases like Hive, GCS, etc., and form the building blocks for operators. This allows the creation of new operators. Also, this means you can customize templated fields, add Slack notifications on each granular step inside your new operator, and have your own logging details.



In taxonomy of Airflow, is the primary motive of Hooks same as above, or do they serve some other purposes too?


FWIW: I am a PMC member of and a contributor to the Airflow project.
