How do you use the Google Dataproc Java Client to submit Spark jobs using jar files and classes in the associated GS bucket?
Question
I need to trigger Spark jobs that aggregate data from a JSON file via an API call. I use Spring Boot to create the resources, so the solution consists of the following steps:
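- The user makes a POST request with a JSON file as input.
- The JSON file is stored in the Google Cloud Storage bucket associated with the Dataproc cluster.
- The REST method triggers the aggregation Spark job with the specified jar and class, passing the JSON file's link as the argument.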
I want the job to be triggered through Dataproc's Java Client rather than the console or the command line. How do you do it?
Answer
We're hoping to have a more thorough guide in the official documentation shortly, but to get started, see the following API overview: https://developers.google.com/api-client-library/java/apis/dataproc/v1
It includes links to the Dataproc javadocs. If your server makes calls on behalf of your own project rather than on behalf of your end users' Google projects, you probably want keyfile-based service-account authentication, as described in the Google API Client Library for Java documentation, to create the Credential object you use to initialize the Dataproc client stub.
As for the Dataproc-specific parts, this just means adding the following dependency to your Maven pom file, if you use Maven:
<project>
  <dependencies>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-dataproc</artifactId>
      <version>v1-rev4-1.21.0</version>
    </dependency>
  </dependencies>
</project>
And then you'll have code like:
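import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataproc.Dataproc;
import com.google.api.services.dataproc.model.Job;
import com.google.api.services.dataproc.model.JobPlacement;
import com.google.api.services.dataproc.model.SparkJob;
import com.google.api.services.dataproc.model.SubmitJobRequest;
import com.google.common.collect.ImmutableList;
...
// `credential` (see the service-account note above) and `projectId` are assumed to exist.
Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
    .setApplicationName("my-webapp/1.0")
    .build();
// Run FooSparkJobMain from a jar stored in GCS on the cluster "my-spark-cluster".
dataproc.projects().regions().jobs().submit(
    projectId, "global", new SubmitJobRequest()
        .setJob(new Job()
            .setPlacement(new JobPlacement()
                .setClusterName("my-spark-cluster"))
            .setSparkJob(new SparkJob()
                .setMainClass("FooSparkJobMain")
                .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))
                .setArgs(ImmutableList.of(
                    "arg1", "arg2", "arg3")))))
    .execute();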
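In the question's flow, the only per-request input is the JSON file already stored in the cluster's bucket, so the REST handler mainly has to turn that object's location into job arguments. Below is a minimal stdlib-only sketch, assuming the Spark main class reads the input `gs://` URI as its first argument and an output location as its second; the class name `AggregationJobArgs`, the `results/` prefix, and that argument order are all hypothetical. The returned `List<String>` is the type `SparkJob.setArgs(...)` accepts, so it could stand in for the `ImmutableList.of("arg1", "arg2", "arg3")` placeholder.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical helper: builds Spark job arguments for one uploaded JSON file. */
final class AggregationJobArgs {
    private AggregationJobArgs() {}

    /** Joins a bucket and object name into a gs:// URI, e.g. gs://my-bucket/uploads/a.json. */
    static String gcsUri(String bucket, String objectName) {
        Objects.requireNonNull(bucket, "bucket");
        Objects.requireNonNull(objectName, "objectName");
        // Drop a leading slash so the result is never "gs://bucket//path".
        String object = objectName.startsWith("/") ? objectName.substring(1) : objectName;
        return "gs://" + bucket + "/" + object;
    }

    /**
     * Input URI first, output location second. This positional layout is an assumption;
     * it must match whatever the Spark main class actually parses from its args.
     */
    static List<String> forUpload(String bucket, String objectName, String jobId) {
        return List.of(gcsUri(bucket, objectName), gcsUri(bucket, "results/" + jobId));
    }

    public static void main(String[] args) {
        // Prints [gs://my-bucket/uploads/data.json, gs://my-bucket/results/json-agg-job-1]
        System.out.println(forUpload("my-bucket", "uploads/data.json", "json-agg-job-1"));
    }
}
```

Passing the same job ID used for the submission as `jobId` would keep each request's output in its own directory.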
Because intermediary servers may perform low-level retries, or your request may throw an IOException that leaves you unsure whether the job submission succeeded, you may want to take the additional step of generating your own jobId. Then you know which jobId to poll to find out whether the job was submitted, even if your request times out or throws an unknown exception:
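import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataproc.Dataproc;
import com.google.api.services.dataproc.model.Job;
import com.google.api.services.dataproc.model.JobPlacement;
import com.google.api.services.dataproc.model.JobReference;
import com.google.api.services.dataproc.model.SparkJob;
import com.google.api.services.dataproc.model.SubmitJobRequest;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.UUID;
import java.util.logging.Level;
...
// Assumes `credential` and `projectId` are defined and `logger` is a java.util.logging.Logger.
Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
    .setApplicationName("my-webapp/1.0")
    .build();
// Pick the job ID client-side so it is known even if the submit call fails ambiguously.
String curJobId = "json-agg-job-" + UUID.randomUUID().toString();
Job jobSnapshot = null;
try {
  jobSnapshot = dataproc.projects().regions().jobs().submit(
      projectId, "global", new SubmitJobRequest()
          .setJob(new Job()
              .setReference(new JobReference()
                  .setJobId(curJobId))
              .setPlacement(new JobPlacement()
                  .setClusterName("my-spark-cluster"))
              .setSparkJob(new SparkJob()
                  .setMainClass("FooSparkJobMain")
                  .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))
                  .setArgs(ImmutableList.of(
                      "arg1", "arg2", "arg3")))))
      .execute();
} catch (IOException ioe) {
  try {
    // The submission may have gone through anyway; look the job up by its known ID.
    jobSnapshot = dataproc.projects().regions().jobs().get(
        projectId, "global", curJobId).execute();
    logger.log(Level.INFO, "Despite exception, job was verified submitted", ioe);
  } catch (IOException ioe2) {
    // Handle this case separately: if it's a GoogleJsonResponseException you can inspect
    // the error code. A 404 means the job never got submitted, so you can add retry logic
    // that resubmits with the same curJobId.
  }
}
// We can now poll dataproc.projects().regions().jobs().get(...) until the job reports
// being completed or failed.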
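The comments in the snippet above leave two pieces open: retrying only when the lookup returns 404, and polling until the job finishes. A library-free sketch of both is below, under stated assumptions. The `DataprocJobs` interface is hypothetical; it stands in for `jobs().submit(...).execute()` and `jobs().get(...).execute().getStatus().getState()`, with a 404 from `get` mapped to `null`. The terminal set `DONE`, `ERROR`, `CANCELLED` comes from the Dataproc v1 job status states. The fixed polling interval and attempt caps are simplifications; a production service might prefer exponential backoff.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/**
 * Hypothetical, library-free stand-in for the two Dataproc calls used above:
 * jobs().submit(...).execute() and jobs().get(...).execute().getStatus().getState().
 */
interface DataprocJobs {
    /** Submits a job under the given ID; may throw even if the job was created. */
    void submit(String jobId) throws IOException;

    /** Returns the job's state string, or null if the lookup answered 404 Not Found. */
    String stateOrNull(String jobId) throws IOException;
}

final class SubmitAndPoll {
    /** Terminal values of the Dataproc v1 job status state. */
    static final Set<String> TERMINAL = Set.of("DONE", "ERROR", "CANCELLED");

    private SubmitAndPoll() {}

    /** Submits under a fixed ID; after an IOException, retries only if the job does not exist. */
    static void submitOnce(DataprocJobs api, String jobId, int maxAttempts) throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                api.submit(jobId);
                return;
            } catch (IOException e) {
                last = e;
                if (api.stateOrNull(jobId) != null) {
                    return; // the job exists: the request went through despite the exception
                }
                // null means 404: the job was never created, so retry with the same ID.
            }
        }
        throw last;
    }

    /** Polls at a fixed interval until a terminal state, giving up after maxPolls lookups. */
    static String awaitTerminalState(DataprocJobs api, String jobId, Duration interval, int maxPolls)
            throws IOException, InterruptedException {
        for (int poll = 1; poll <= maxPolls; poll++) {
            String state = api.stateOrNull(jobId);
            if (state != null && TERMINAL.contains(state)) {
                return state;
            }
            if (poll < maxPolls) {
                Thread.sleep(interval.toMillis());
            }
        }
        throw new IllegalStateException("job " + jobId + " not finished after " + maxPolls + " polls");
    }
}

final class SubmitAndPollDemo {
    public static void main(String[] args) throws Exception {
        Set<String> created = new HashSet<>();
        Deque<String> states = new ArrayDeque<>(List.of("PENDING", "RUNNING", "DONE"));
        // Fake service: creates the job but "loses" the reply, then reports progress.
        DataprocJobs fake = new DataprocJobs() {
            @Override public void submit(String id) throws IOException {
                created.add(id);
                throw new IOException("response lost");
            }
            @Override public String stateOrNull(String id) {
                if (!created.contains(id)) {
                    return null;
                }
                return states.size() > 1 ? states.poll() : states.peek();
            }
        };
        String jobId = "json-agg-job-" + UUID.randomUUID();
        SubmitAndPoll.submitOnce(fake, jobId, 3);
        // Prints DONE
        System.out.println(SubmitAndPoll.awaitTerminalState(fake, jobId, Duration.ofMillis(10), 10));
    }
}
```

Reusing the same ID on every attempt is the key design choice: the existence check always refers to the one logical job, so a lost reply never turns into a second, differently named job.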