Creating a cluster before sending a job to Dataproc programmatically


Problem description


I'm trying to schedule a PySpark job. I followed the GCP documentation and ended up deploying a little Python script to App Engine which does the following:

  • authenticate using a service account (see the sketch after this list)
  • submit a job to a cluster
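
For the first bullet, here is a minimal sketch of how the script could authenticate with a service account key and build the Dataproc client. It assumes the google-auth and google-api-python-client libraries; the key file name and the scope are placeholders, not values from the question.

from google.oauth2 import service_account
import googleapiclient.discovery

# Load the service account key and build an authenticated Dataproc client.
credentials = service_account.Credentials.from_service_account_file(
    'service-account-key.json',  # hypothetical path to the downloaded key file
    scopes=['https://www.googleapis.com/auth/cloud-platform'])

dataproc = googleapiclient.discovery.build('dataproc', 'v1', credentials=credentials)

On App Engine it is also possible to skip explicit credentials and rely on the app's default service account, which is what the plain googleapiclient.discovery.build('dataproc', 'v1') call in the code further down does.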

The problem is that I need the cluster to be up and running, otherwise the job won't be sent (duh!), but I don't want the cluster to be running all the time, especially since my job only needs to run once a month.

I wanted to add the creation of the cluster to my Python script, but the call is asynchronous (it just makes an HTTP request), so my job gets submitted after the cluster creation call but before the cluster is actually up and running.

How can I handle this?

I'd like something cleaner than just waiting for a few minutes in my script!

Thanks

EDIT: Here's what my code looks like so far:

To launch the job

import webapp2
from google.appengine.api import taskqueue


class EnqueueTaskHandler(webapp2.RequestHandler):
    def get(self):
        # Enqueue a push-queue task that will POST to /run on the worker service.
        task = taskqueue.add(
            url='/run',
            target='worker')

        self.response.write(
            'Task {} enqueued, ETA {}.'.format(task.name, task.eta))

app = webapp2.WSGIApplication([('/launch', EnqueueTaskHandler)], debug=True)

The job

import time

import webapp2
import googleapiclient.discovery


class CronEventHandler(webapp2.RequestHandler):

    def create_cluster(self, dataproc, project, zone, region, cluster_name):
        zone_uri = 'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(project, zone)
        cluster_data = {...}

        dataproc.projects().regions().clusters().create(
            projectId=project,
            region=region,
            body=cluster_data).execute()

    def wait_for_cluster(self, dataproc, project, region, clustername):
        print('Waiting for cluster to run...')
        while True:
            result = dataproc.projects().regions().clusters().get(
                projectId=project,
                region=region,
                clusterName=clustername).execute()
            # Handle exceptions
            if result['status']['state'] != 'RUNNING':
                time.sleep(60)
            else:
                return result

    def wait_for_job(self, dataproc, project, region, job_id):
        print('Waiting for job to finish...')
        while True:
            result = dataproc.projects().regions().jobs().get(
                projectId=project,
                region=region,
                jobId=job_id).execute()
            # Handle exceptions
            print(result['status']['state'])
            if result['status']['state'] == 'ERROR' or result['status']['state'] == 'DONE':
                return result
            else:
                time.sleep(60)

    def submit_job(self, dataproc, project, region, clusterName):
        job = {...}
        result = dataproc.projects().regions().jobs().submit(
            projectId=project, region=region, body=job).execute()
        return result['reference']['jobId']


    def post(self):
        dataproc = googleapiclient.discovery.build('dataproc', 'v1')

        project = '...'
        region = "..."
        zone = "..."
        clusterName = '...'

        self.create_cluster(dataproc, project, zone, region, clusterName)
        self.wait_for_cluster(dataproc, project, region, clusterName)
        job_id = self.submit_job(dataproc, project, region, clusterName)
        self.wait_for_job(dataproc, project, region, job_id)
        dataproc.projects().regions().clusters().delete(projectId=project, region=region, clusterName=clusterName).execute()
        self.response.write("JOB SENT")

app = webapp2.WSGIApplication([('/run', CronEventHandler)], debug=True)

Everything works until the deletion of the cluster. At this point I get a "DeadlineExceededError: The overall deadline for responding to the HTTP request was exceeded." Any ideas?

Solution

In addition to general polling, either through list/get requests on the Cluster or on the Operation returned by the CreateCluster request, for single-use clusters like this you can also consider the Dataproc Workflows API, and possibly its InstantiateInline interface if you don't want to use full-fledged workflow templates. With this API you use a single request to specify the cluster settings along with the jobs to submit; the jobs automatically run as soon as the cluster is ready to take them, and the cluster is deleted automatically afterwards.
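
For reference, a minimal sketch of what the InstantiateInline approach could look like with the same googleapiclient client used in the question. It assumes the Dataproc v1 workflowTemplates.instantiateInline method; the template id, cluster config, step id and gs:// path are placeholders, not values from the question.

import googleapiclient.discovery

def run_monthly_workflow(project, region):
    dataproc = googleapiclient.discovery.build('dataproc', 'v1')

    # Inline workflow template: an ephemeral (managed) cluster plus the job(s) to run on it.
    template = {
        'id': 'monthly-pyspark-workflow',  # placeholder id
        'placement': {
            'managedCluster': {
                'clusterName': 'monthly-job-cluster',  # placeholder name
                'config': {
                    'masterConfig': {'numInstances': 1, 'machineTypeUri': 'n1-standard-4'},
                    'workerConfig': {'numInstances': 2, 'machineTypeUri': 'n1-standard-4'},
                },
            }
        },
        'jobs': [{
            'stepId': 'monthly-pyspark-step',  # placeholder step id
            'pysparkJob': {'mainPythonFileUri': 'gs://my-bucket/my_job.py'},  # placeholder
        }],
    }

    # A single request creates the managed cluster, runs the job once the cluster
    # is ready, and deletes the cluster afterwards. It returns a long-running
    # Operation immediately, so the caller does not have to sleep through the
    # whole lifecycle.
    operation = dataproc.projects().regions().workflowTemplates().instantiateInline(
        parent='projects/{}/regions/{}'.format(project, region),
        body=template).execute()
    return operation['name']

Because the request returns right away and Dataproc drives the cluster lifecycle itself, this also sidesteps the DeadlineExceededError from the edit above: the App Engine handler can fire the workflow and respond immediately instead of polling until the cluster is deleted.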
