How can I submit a Cascading job to a remote YARN cluster from Java?
Problem Description
I know that I can submit a Cascading job by packaging it into a JAR, as detailed in the Cascading user guide. The job then runs on my cluster if I submit it manually with the hadoop jar CLI command.
However, in the original Hadoop 1 version of Cascading, it was possible to submit a job to the cluster by setting certain properties on the Hadoop JobConf: setting fs.defaultFS and mapred.job.tracker caused the local Hadoop library to automatically attempt to submit the job to the Hadoop 1 JobTracker. Setting these properties does not seem to work in the newer version, however. Submitting to a CDH5 5.2.1 Hadoop cluster using Cascading version 2.5.3 (which lists CDH5 as a supported platform) leads to an IPC exception when negotiating with the server, as detailed below.
I believe that this platform combination -- Cascading 2.5.6, Hadoop 2, CDH 5, YARN, and the MR1 API for submission -- is supported according to the compatibility table (see under the "Prior Releases" heading). Submitting the job with hadoop jar works fine on this same cluster, port 8031 is open between the submitting host and the ResourceManager, and an error with the same message appears in the ResourceManager logs on the server side.
I am using the cascading-hadoop2-mr1 library.
Exception in thread "main" cascading.flow.FlowException: unhandled exception
at cascading.flow.BaseFlow.complete(BaseFlow.java:894)
at WordCount.main(WordCount.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RpcServerException): Unknown rpc kind in rpc headerRPC_WRITABLE
at org.apache.hadoop.ipc.Client.call(Client.java:1411)
at org.apache.hadoop.ipc.Client.call(Client.java:1364)
at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:231)
at org.apache.hadoop.mapred.$Proxy11.getStagingAreaDir(Unknown Source)
at org.apache.hadoop.mapred.JobClient.getStagingAreaDir(JobClient.java:1368)
at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:102)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:982)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:950)
at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:105)
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:196)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Demo code is below, which is basically identical to the WordCount sample from the Cascading user guide.
// Imports assume the cascading-hadoop2-mr1 module (Cascading 2.5.x).
import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class WordCount {
    public static void main(String[] args) {
        String inputPath = "/user/vagrant/wordcount/input";
        String outputPath = "/user/vagrant/wordcount/output";

        Scheme sourceScheme = new TextLine( new Fields( "line" ) );
        Tap source = new Hfs( sourceScheme, inputPath );
        Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
        Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

        Pipe assembly = new Pipe( "wordcount" );
        String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
        Function function = new RegexGenerator( new Fields( "word" ), regex );
        assembly = new Each( assembly, new Fields( "line" ), function );
        assembly = new GroupBy( assembly, new Fields( "word" ) );
        Aggregator count = new Count( new Fields( "count" ) );
        assembly = new Every( assembly, count );

        Properties properties = AppProps.appProps()
            .setName( "word-count-application" )
            .setJarClass( WordCount.class )
            .buildProperties();
        properties.put( "fs.defaultFS", "hdfs://192.168.30.101" );
        properties.put( "mapred.job.tracker", "192.168.30.101:8032" );

        FlowConnector flowConnector = new HadoopFlowConnector( properties );
        Flow flow = flowConnector.connect( "word-count", source, sink, assembly );
        flow.complete();
    }
}
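As a standalone sanity check, outside of Cascading entirely, the word-splitting regex from the sample can be exercised with plain java.util.regex; the input line here is made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        // Same pattern as the RegexGenerator in the flow: each match is a
        // run of non-space characters that starts and ends with a letter,
        // so trailing punctuation is dropped.
        String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
        Matcher m = Pattern.compile(regex).matcher("Hello, Cascading world");
        List<String> words = new ArrayList<>();
        while (m.find()) {
            words.add(m.group());
        }
        System.out.println(words); // [Hello, Cascading, world]
    }
}
```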
I've also tried setting a bunch of other properties to try to get it working:
- mapreduce.jobtracker.address
- mapreduce.framework.name
- yarn.resourcemanager.address
- yarn.resourcemanager.host
- yarn.resourcemanager.hostname
- yarn.resourcemanager.resourcetracker.address
None of these worked; they just cause the job to run in local mode (unless mapred.job.tracker is also set).
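With a YARN-capable client on the classpath (see the resolution below), the configuration that selects the YARN runner would look roughly like this sketch; the mapreduce.framework.name=yarn behavior and the port numbers are assumptions about this cluster, not something verified here:

```java
import java.util.Properties;

public class YarnSubmitProperties {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Assumed addresses for the cluster used above; adjust to your setup.
        properties.put("fs.defaultFS", "hdfs://192.168.30.101:8020");
        // "yarn" asks the MapReduce client to submit via the ResourceManager
        // rather than a Hadoop 1 JobTracker or the local runner (assumption:
        // this only takes effect with the non-MR1 client libraries).
        properties.put("mapreduce.framework.name", "yarn");
        properties.put("yarn.resourcemanager.address", "192.168.30.101:8032");
        System.out.println(properties.getProperty("mapreduce.framework.name"));
    }
}
```

These properties could then be merged into the Properties object passed to HadoopFlowConnector in the demo code.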
I've now resolved this problem. It comes from trying to use the older Hadoop classes that Cloudera distributes, particularly JobClient. This happens if you use hadoop-core with the provided 2.5.0-mr1-cdh5.2.1 version, or the hadoop-client dependency with this same version number. Although this claims to be the MR1 version, and we are using the MR1 API to submit, this version actually ONLY supports submission to the Hadoop 1 JobTracker; it does not support YARN.
In order to allow submitting to YARN, you must instead use the hadoop-client dependency with the non-MR1 2.5.0-cdh5.2.1 version, which still supports submission of MR1 jobs to YARN.
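As a sketch, in a Maven build the fix would look something like the following; the group/artifact coordinates are the standard ones Cloudera publishes, but verify them against your configured Cloudera repository:

```xml
<!-- YARN-capable client: replaces hadoop-core / the *-mr1-* artifact -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.0-cdh5.2.1</version>
</dependency>
```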