EMR Cluster Creation using Airflow dag run, Once task is done EMR will be terminated
Problem description
I have Airflow jobs, which are running fine on an EMR cluster. What I need is this: say I have 4 Airflow jobs that require an EMR cluster for, say, 20 minutes to complete their task. Why can't we create an EMR cluster at DAG run time and, once the job is finished, terminate the EMR cluster that was created?
Recommended answer
Absolutely, that would be the most efficient use of resources. But let me warn you: there are a lot of details in this; I'll try to list as many as will get you going. I encourage you to add your own comprehensive answer listing any problems you encountered and their workarounds (once you are through with this).
Regarding cluster creation / termination
For creation and termination of the cluster you have EmrCreateJobFlowOperator and EmrTerminateJobFlowOperator respectively.
Don't fret if you do not use an AWS SecretAccessKey (and rely wholly on IAM Roles); instantiating any AWS-related hook or operator in Airflow will automatically fall back to the underlying EC2's attached IAM Role.
If you're NOT using the EMR-Steps API for job submission, then you'll also have to manually sense both of the above operations using Sensors. There's already a sensor for polling the creation phase, called EmrJobFlowSensor, and you can modify it slightly to create a sensor for termination too.
You pass your cluster-config JSON in job_flow_extra. You can also pass configs in a Connection's (like my_emr_conn) extra param, but refrain from that, because it often breaks SQLAlchemy ORM loading (since it is a big JSON).
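To give a feel for how big that JSON gets, here is a sketch of a fuller cluster config (hedged: all names, roles, and sizes are hypothetical; in current Airflow this dict goes to the operator's job_flow_overrides argument rather than into the Connection's extra field):

```python
import json

# Hypothetical full cluster config, kept in DAG code / a config file
# instead of being stuffed into the Connection 'extra' column.
CLUSTER_CONFIG = {
    "Name": "transient-cluster",
    "ReleaseLabel": "emr-5.29.0",
    "Applications": [{"Name": "Spark"}, {"Name": "Hive"}],
    "Instances": {
        "Ec2KeyName": "my-keypair",  # needed later if you want SSH access
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
        "InstanceGroups": [
            {"Name": "Master", "InstanceRole": "MASTER",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE",
             "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

# It really is a big JSON -- exactly why the Connection extra field
# is a poor home for it:
print(len(json.dumps(CLUSTER_CONFIG)), "bytes of config")
```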
Regarding job submission
You can submit jobs to Emr using the EMR-Steps API, which can be done either during the cluster-creation phase (within the Cluster-Configs JSON) or afterwards using add_job_flow_steps(). There's even an emr_add_steps_operator() in Airflow, which also requires an EmrStepSensor. You can read more about it in the AWS docs, and you might also have to use command-runner.jar.
For application-specific cases (like Hive, Livy), you can use their specific ways. For instance, you can use HiveServer2Hook to submit a Hive job. Here's a tricky part: the run_job_flow() call (made during the cluster-creation phase) only gives you a job_flow_id (cluster-id). You'll have to use a describe_cluster() call via EmrHook to obtain the private IP of the master node. Using this, you will then be able to programmatically create a Connection (such as a Hive Server 2 Thrift connection) and use it for submitting your computations to the cluster. And don't forget to delete those connections (for elegance) before completing your workflow.
Finally there's the good old bash for interacting with the cluster. For this you should also pass an EC2 key pair during the cluster-creation phase. Afterwards, you can programmatically create an SSH connection and use it (with an SSHHook or SSHOperator) for running jobs on your cluster. Read more about SSH-stuff in Airflow here.
Particularly for submitting Spark jobs to a remote Emr cluster, read this discussion.