Fusing operators together
Question
I'm still in the process of deploying Airflow and I've already felt the need to merge operators together. The most common use-case would be coupling an operator and the corresponding sensor. For instance, one might want to chain together the EmrStepOperator and EmrStepSensor.
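The operator/sensor fusion described above can be sketched as a single execute() that first submits and then polls. This is a minimal illustration with stub classes, not the real EMR operators or the boto3 client; all names below are hypothetical:

```python
import time

class FakeEmrClient:
    """Illustrative stand-in for an EMR connection; not the real client."""
    def __init__(self):
        self._polls = 0

    def add_step(self, step):
        return "s-123"  # pretend step id

    def describe_step(self, step_id):
        # Pretend the step completes after two polls.
        self._polls += 1
        return {"Status": "COMPLETED" if self._polls >= 2 else "RUNNING"}

class EmrStepWithSensorOperator:
    """Sketch of fusing EmrStepOperator + EmrStepSensor into one task."""
    def __init__(self, client, step, poke_interval=0.01):
        self.client = client
        self.step = step
        self.poke_interval = poke_interval

    def execute(self):
        step_id = self.client.add_step(self.step)       # operator half
        while True:                                      # sensor half
            status = self.client.describe_step(step_id)["Status"]
            if status == "COMPLETED":
                return step_id
            if status == "FAILED":
                raise RuntimeError(f"EMR step {step_id} failed")
            time.sleep(self.poke_interval)

op = EmrStepWithSensorOperator(FakeEmrClient(), {"Name": "my-step"})
print(op.execute())  # → s-123
```

The trade-off is exactly the one the question raises: one task in the UI instead of two, at the cost of losing the separate sensor's retry/timeout semantics.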
I'm creating my DAGs programmatically, and the biggest one of those contains 150+ (identical) branches, each performing the same series of operations on different bits of data (tables). Therefore, clubbing together tasks that make up a single logical step in my DAG would be of great help.
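Generating 150+ identical branches programmatically usually boils down to a loop over table names. Here is a minimal sketch in which tasks are represented as plain strings rather than real Airflow operators, and the task names are made up for illustration:

```python
# Stand-in for the real table list.
tables = [f"table_{i}" for i in range(150)]

def branch_tasks(table):
    """The same logical series of steps, parameterised by table name."""
    return [f"delete_s3_{table}", f"distcp_{table}", f"msck_repair_{table}"]

# Build (upstream, downstream) edges: each branch is a linear chain.
edges = []
for table in tables:
    steps = branch_tasks(table)
    edges += list(zip(steps, steps[1:]))

print(len(edges))  # 2 edges per 3-task branch × 150 branches → 300
```

With real operators, the loop body would instantiate the operators and chain them with `>>`; the structure is the same.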
Here are 2 contending examples from my project to give motivation for my argument.
1. Deleting data from an S3 path, followed by writing new data

This step comprises 2 operators:
- DeleteS3PathOperator: extends from BaseOperator and uses S3Hook
- HadoopDistcpOperator: extends from SSHOperator
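The DeleteS3PathOperator pattern above (a thin operator that delegates all real work to a hook) can be sketched with stubs. The hook and operator below are illustrative stand-ins, not the real Airflow S3Hook API:

```python
class FakeS3Hook:
    """Stand-in for an S3 hook; keeps keys in a dict instead of S3."""
    def __init__(self, store):
        self.store = store

    def list_keys(self, prefix):
        return [k for k in self.store if k.startswith(prefix)]

    def delete_objects(self, keys):
        for k in keys:
            del self.store[k]

class DeleteS3PathOperator:
    """Thin operator: all processing logic lives in the hook."""
    def __init__(self, prefix, hook):
        self.prefix = prefix
        self.hook = hook

    def execute(self):
        keys = self.hook.list_keys(self.prefix)
        if keys:  # idempotent: deleting an already-empty path is a no-op
            self.hook.delete_objects(keys)
        return len(keys)

store = {"raw/a": 1, "raw/b": 2, "clean/c": 3}
op = DeleteS3PathOperator("raw/", FakeS3Hook(store))
print(op.execute())  # deletes the 2 "raw/" keys; "clean/c" remains
```

Because the operator only orchestrates hook calls, fusing it with a distcp step would just mean calling a second hook from the same execute().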
2. Conditionally performing MSCK REPAIR on a Hive table
This step comprises 4 operators:
- BranchPythonOperator: checks whether the Hive table is partitioned
- MsckRepairOperator: extends from HiveOperator and performs MSCK REPAIR on the (partitioned) table
- Dummy(Branch)Operator: makes up the alternate branching path to MsckRepairOperator (for non-partitioned tables)
- Dummy(Join)Operator: makes up the join step for both branches
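The BranchPythonOperator half of this step is just a callable that returns the task_id to follow; the task names below are illustrative, not from the original DAG:

```python
def choose_repair_path(table_is_partitioned: bool) -> str:
    """What you would pass as python_callable to a BranchPythonOperator:
    return the task_id of the downstream branch to follow."""
    return "msck_repair" if table_is_partitioned else "skip_repair"

# Both branches would later converge on the join task, which needs a
# trigger_rule such as "none_failed" so the skipped branch does not
# cascade the skip into the join.
print(choose_repair_path(True))   # msck_repair
print(choose_repair_path(False))  # skip_repair
```

Fusing this four-operator diamond into one operator would replace the branch/join with an ordinary `if` inside a single execute(), at the cost of the per-branch visibility in the UI.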
Using operators in isolation certainly offers smaller modules and more fine-grained logging / debugging, but in large DAGs, reducing the clutter might be desirable. From my current understanding, there are 2 ways to chain operators together:
- Hooks
Write actual processing logic in hooks and then use as many hooks as you want within a single operator (certainly the better way, in my opinion).
- SubDagOperator
A risky and controversial way of doing things; additionally the naming convention for SubDagOperator makes me frown.
My questions are
- Should operators be composed at all or is it better to have discrete steps?
- Any pitfalls, improvements in above approaches?
- Any other ways to combine operators together?
- In the taxonomy of Airflow, is the primary motive of Hooks the same as above, or do they serve some other purposes too?
UPDATE-1
3. Multiple Inheritance
While this is a Python feature rather than Airflow-specific, it's worthwhile to point out that multiple inheritance can come in handy for combining the functionalities of operators. QuboleCheckOperator, for instance, is already written using it. However, in the past I tried this approach to fuse EmrCreateJobFlowOperator and EmrJobFlowSensor, but at the time I ran into issues with the @apply_defaults decorator and abandoned the idea.
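The multiple-inheritance idea can be sketched with stub base classes. The tricky part alluded to above is the cooperative `__init__` chain (which Airflow's @apply_defaults historically complicated); all class names here are hypothetical stand-ins, not the real Airflow classes:

```python
class BaseOperator:
    """Stub of an operator base class."""
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id

class CreateJobFlowMixin(BaseOperator):
    def __init__(self, job_flow_overrides=None, **kwargs):
        super().__init__(**kwargs)   # cooperative: continue along the MRO
        self.job_flow_overrides = job_flow_overrides or {}

    def create(self):
        return "j-001"  # pretend job-flow id

class JobFlowSensorMixin(BaseOperator):
    def __init__(self, target_state="TERMINATED", **kwargs):
        super().__init__(**kwargs)
        self.target_state = target_state

    def poke(self, state):
        return state == self.target_state

class CreateAndWatchJobFlowOperator(CreateJobFlowMixin, JobFlowSensorMixin):
    """Fused operator: the MRO runs both parent __init__s via super()."""
    def execute(self):
        job_flow_id = self.create()
        assert self.poke("TERMINATED")  # would poll in a loop in real life
        return job_flow_id

op = CreateAndWatchJobFlowOperator(task_id="create_and_watch")
print(op.execute())  # j-001
print([c.__name__ for c in type(op).__mro__])
```

The fusion only works cleanly if every class in the chain forwards `**kwargs` through `super().__init__`; a decorator that rewrites `__init__` signatures (as @apply_defaults did) breaks exactly that forwarding.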
Answer
I have combined various hooks to create a single operator based on my needs. A simple example: I clubbed together the GCS hook's delete, copy, list, and get_size methods to create a single operator called GcsDataValidationOperator. A rule of thumb is idempotency, i.e. if you run it multiple times, it should produce the same result.
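Idempotency in such a fused operator usually means guarding each mutating step so a re-run converges to the same end state. A sketch, with a dict standing in for a GCS bucket and an invented list→check size→copy→delete pipeline (not the real GCS hook API):

```python
def validate_and_move(bucket, src_prefix, dst_prefix, min_size=1):
    """Idempotent sketch: validate sizes under src_prefix, then move
    each object to dst_prefix. Re-running finds nothing left to move."""
    moved = []
    for key in sorted(k for k in list(bucket) if k.startswith(src_prefix)):
        size = bucket[key]
        if size < min_size:
            raise ValueError(f"{key} is too small ({size} bytes)")
        dst = dst_prefix + key[len(src_prefix):]
        bucket[dst] = size      # copy (overwrite is safe: same content)
        del bucket[key]         # delete the source
        moved.append(dst)
    return moved

bucket = {"in/a.csv": 10, "in/b.csv": 20}
first = validate_and_move(bucket, "in/", "out/")
second = validate_and_move(bucket, "in/", "out/")  # re-run: no-op
print(first, second)  # ['out/a.csv', 'out/b.csv'] []
```

Copy-then-delete ordering matters here: if the task dies between the two calls, a retry still converges instead of losing data.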
Should operators be composed at all or is it better to have discrete steps?
The only pitfall is maintainability: sometimes when the hooks change in the master branch, you will need to update all your operators manually if there are any breaking changes.
Any pitfalls, improvements in above approaches?
You can use PythonOperator and call the built-in hooks from its python_callable, but it would still mean a lot of details in the DAG file. Hence, I would still go for the new-operator approach.
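The PythonOperator alternative mentioned above puts the hook calls in a plain callable instead of a custom operator class. Sketched with a stub hook (the hook class and method names are illustrative):

```python
class FakeGcsHook:
    """Stand-in for a real hook: one method per low-level action."""
    def __init__(self, bucket):
        self.bucket = bucket

    def list(self, prefix):
        return [k for k in self.bucket if k.startswith(prefix)]

    def delete(self, key):
        del self.bucket[key]

def cleanup_callable(prefix, bucket):
    """What you would pass as python_callable=... to a PythonOperator.
    The orchestration now lives in the DAG file, not in an operator."""
    hook = FakeGcsHook(bucket)
    for key in hook.list(prefix):
        hook.delete(key)
    return len(bucket)

bucket = {"tmp/x": 1, "tmp/y": 2, "keep/z": 3}
print(cleanup_callable("tmp/", bucket))  # 1 key left
```

This is the "lot of details in the DAG file" trade-off: the same hook calls, but scattered across callables instead of encapsulated in one reusable operator class.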
Any other ways to combine operators together?
Hooks are just interfaces to external platforms and databases like Hive, GCS, etc., and form the building blocks for operators. This allows the creation of new operators. Also, this means you can customize templated fields, add Slack notifications on each granular step inside your new operator, and have your own logging details.
In the taxonomy of Airflow, is the primary motive of Hooks the same as above, or do they serve some other purposes too?
FWIW: I am a PMC member and a contributor of the Airflow project.