Airflow/Luigi for AWS EMR automatic cluster creation and pyspark deployment

Question

I am new to Airflow automation. I don't know if it is possible to do this with Apache Airflow (or Luigi etc.), or whether I should just write one long bash file to do it.

This is what I want to build:

  1. Create/clone a cluster on AWS EMR
  2. Install python requirements
  3. Install pyspark related libraries
  4. Get the latest code from GitHub
  5. Submit the spark job
  6. Terminate the cluster on finish

For the individual steps I can make .sh files like the ones below (not sure whether this is a good approach), but I don't know how to do it in Airflow.

1) Create the cluster: cluster.sh

 aws emr create-cluster \
    --name "1-node dummy cluster" \
    --instance-type m3.xlarge \
    --release-label emr-4.1.0 \
    --instance-count 1 \
    --use-default-roles \
    --applications Name=Spark \
    --auto-terminate

2, 3 & 4) Clone the git repo and install requirements: codesetup.sh

git clone some-repo.git
pip install -r requirements.txt
mv xyz.jar /usr/lib/spark/xyz.jar

5) Run the spark job: sparkjob.sh

aws emr add-steps --cluster-id <Your EMR cluster id> \
    --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE

6) Not sure, maybe something like this:

aws emr terminate-clusters \
    --cluster-ids <value> [<value>...]

Finally, all of this could be executed as one .sh file. I need to know a good approach to doing this with Airflow/Luigi.

What I found:

I found this post to be close, but it is outdated (2016) and is missing the connections and the code for the playbooks:

https://www.agari.com/email-security-blog/automated-model-building-emr-spark-airflow/

Answer

I figured out that there are two options to do this:

1) We can make a bash script with the help of emr create-cluster and addstep, and then use Airflow's BashOperator to schedule it.

These two are wrapped by a package called sparksteps.

An example from their documentation:

sparksteps examples/episodes.py \
  --s3-bucket $AWS_S3_BUCKET \
  --aws-region us-east-1 \
  --release-label emr-4.7.0 \
  --uploads examples/lib examples/episodes.avro \
  --submit-args="--deploy-mode client --jars /home/hadoop/lib/spark-avro_2.10-2.0.2-custom.jar" \
  --app-args="--input /home/hadoop/episodes.avro" \
  --tags Application="Spark Steps" \
  --debug

You can make a .sh script with the default options of your choice. After preparing this script, you can call it from the Airflow BashOperator as below:

# Note the trailing space after the script name: Airflow treats strings
# ending in .sh as Jinja template files, and the space avoids that lookup.
create_command = "sparkstep_custom.sh "

t1 = BashOperator(
    task_id='create_file',
    bash_command=create_command,
    dag=dag
)

2) You can use Airflow's own operators for AWS to do this:

EmrCreateJobFlowOperator (for launching the cluster)
EmrAddStepsOperator (for submitting the spark job)
EmrStepSensor (to track when the step finishes)
EmrTerminateJobFlowOperator (to terminate the cluster when the step finishes)

Basic example to create a cluster and submit steps:

my_steps = [
    {
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI + 'test.py', '/home/hadoop/']
        }
    },
    {
        'Name': 'setup - copy files 3',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI + 'myfiledependecy.py', '/home/hadoop/']
        }
    },
    {
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--jars', 'jar1.jar,jar2.jar', '--py-files', '/home/hadoop/myfiledependecy.py', '/home/hadoop/test.py']
        }
    }
]


cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow2',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=my_steps,
    dag=dag
)
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('pre_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)
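
The snippet above references JOB_FLOW_OVERRIDES without defining it. As a rough sketch (not from the original answer), it is the dictionary of cluster settings that EmrCreateJobFlowOperator passes to EMR when launching the job flow; every value below (name, release label, instance type and count, roles) is a placeholder assumption to adjust for your own account:

# Hypothetical cluster configuration; all values are placeholder assumptions.
JOB_FLOW_OVERRIDES = {
    'Name': 'pyspark-cluster',
    'ReleaseLabel': 'emr-5.29.0',
    'Applications': [{'Name': 'Spark'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm3.xlarge',
                'InstanceCount': 1,
            },
        ],
        # Keep the cluster alive so the added steps can run; it is shut
        # down explicitly by EmrTerminateJobFlowOperator at the end.
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}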

Also, to upload code to S3 (I was curious about this because I wanted to get the latest code from GitHub), it can be done with S3, boto3 and the PythonOperator.

A simple example:

import boto3

# boto3 S3 resource used by the upload callable below
s3 = boto3.resource('s3')

S3_BUCKET = 'you_bucket_name'
S3_URI = 's3://{bucket}/'.format(bucket=S3_BUCKET)


def upload_file_to_S3(filename, key, bucket_name):
    s3.Bucket(bucket_name).upload_file(filename, key)


upload_to_S3_task = PythonOperator(
    task_id='upload_to_S3',
    python_callable=upload_file_to_S3,
    op_kwargs={
        # 'configdata' is assumed to hold the project path, defined elsewhere
        'filename': configdata['project_path'] + 'test.py',
        'key': 'test.py',
        'bucket_name': 'dep-buck',
    },
    dag=dag)
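
The original answer does not show how the tasks are wired together. A possible ordering, assuming the upload task and the EMR operators above all live in the same DAG, is sketched below: the code is pushed to S3 first, the cluster is created, the steps are added and watched, and the cluster is removed at the end.

# Assumed task ordering for the operators defined above
upload_to_S3_task >> cluster_creator >> step_adder_pre_step >> step_checker >> cluster_remover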
