How do you automate pyspark jobs on EMR using boto3 (or otherwise)?

Problem description

I am creating a job to parse massive amounts of server data, and then upload it into a Redshift database.

My workflow is as follows (a rough sketch of these steps is included after the list):

  • Grab the log data from S3
  • Either use spark dataframes or spark sql to parse the data and write back out to S3
  • Upload the data from S3 to Redshift.
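
Roughly, the parse-and-write part I have in mind would look something like the sketch below. This is only an illustration: the bucket names, log fields, Redshift table, and IAM role are placeholders, and it assumes a Spark 2.x release so that SparkSession is available.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("parse-server-logs").getOrCreate()

# 1. grab the raw log data from S3 (placeholder bucket/prefix)
raw = spark.read.json("s3://my-log-bucket/raw/")

# 2. parse with the DataFrame API (or register a temp view and use spark.sql)
parsed = (raw
          .withColumn("event_date", F.to_date("timestamp"))
          .groupBy("event_date", "status_code")
          .count())

# write the parsed output back to S3 in a Redshift-friendly format
parsed.write.mode("overwrite").csv("s3://my-log-bucket/parsed/")

spark.stop()

# 3. load into Redshift with a COPY command issued from any SQL client, e.g.:
#   COPY my_schema.log_counts
#   FROM 's3://my-log-bucket/parsed/'
#   IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
#   CSV;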

I'm getting hung up on how to automate this though so that my process spins up an EMR cluster, bootstraps the correct programs for installation, and runs my python script that will contain the code for parsing and writing.

Does anyone have any examples, tutorials, or experience they could share with me to help me learn how to do this?

Answer

Take a look at the boto3 EMR docs to create the cluster. You essentially have to call run_job_flow and create steps that run the program you want.

import boto3    

client = boto3.client('emr', region_name='us-east-1')

S3_BUCKET = 'MyS3Bucket'
S3_KEY = 'spark/main.py'
S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY)

# upload file to an S3 bucket
s3 = boto3.resource('s3')
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)

response = client.run_job_flow(
    Name="My Spark Cluster",
    ReleaseLabel='emr-4.6.0',
    Instances={
        'MasterInstanceType': 'm4.xlarge',
        'SlaveInstanceType': 'm4.xlarge',
        'InstanceCount': 4,
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ],
    BootstrapActions=[
        {
            'Name': 'Maximize Spark Default Config',
            'ScriptBootstrapAction': {
                'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
            }
        },
    ],
    Steps=[
    {
        'Name': 'Setup Debugging',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['state-pusher-script']
        }
    },
    {
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
        }
    },
    {
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '/home/hadoop/main.py']
        }
    }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
)
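
If you want the calling script to block until the cluster is up and the Spark step has finished, the EMR client also provides waiters. A minimal sketch, assuming the cluster and step names used above:

cluster_id = response['JobFlowId']

# wait until the cluster is up and running
client.get_waiter('cluster_running').wait(ClusterId=cluster_id)

# find the 'Run Spark' step and wait for it to complete
steps = client.list_steps(ClusterId=cluster_id)['Steps']
spark_step_id = next(s['Id'] for s in steps if s['Name'] == 'Run Spark')
client.get_waiter('step_complete').wait(ClusterId=cluster_id, StepId=spark_step_id)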

You can also add steps to a running cluster if you know the job flow id:

job_flow_id = response['JobFlowId']
print("Job flow ID:", job_flow_id)

step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)

step_ids = step_response['StepIds']

print("Step IDs:", step_ids)

For more configurations, check out sparksteps.
