How can I submit a Cascading job to a remote YARN cluster from Java?


Question


I know that I can submit a Cascading job by packaging it into a JAR, as detailed in the Cascading user guide. That job will then run on my cluster if I manually submit it using the hadoop jar CLI command.

However, with the original Hadoop 1 Cascading version, it was possible to submit a job to the cluster by setting certain properties on the Hadoop JobConf: setting fs.defaultFS and mapred.job.tracker caused the local Hadoop library to automatically attempt to submit the job to the Hadoop 1 JobTracker. Setting these properties does not seem to work in the newer version, though. Submitting to a CDH 5.2.1 Hadoop cluster using Cascading version 2.5.3 (which lists CDH5 as a supported platform) leads to an IPC exception when negotiating with the server, as detailed below.

I believe that this platform combination (Cascading 2.5.6, Hadoop 2, CDH 5, YARN, and the MR1 API for submission) is supported according to the compatibility table (see under the "Prior Releases" heading). Submitting the job using hadoop jar works fine on this same cluster, and port 8031 is open between the submitting host and the ResourceManager. An error with the same message is found in the ResourceManager logs on the server side.

I am using the cascading-hadoop2-mr1 library.
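For reference, the dependency is declared roughly like this in Maven (the version matches the one mentioned above, and the repository reflects my setup; Cascading artifacts are published to conjars rather than Maven Central):

```xml
<!-- Cascading artifacts live in the conjars repository -->
<repository>
  <id>conjars</id>
  <url>http://conjars.org/repo</url>
</repository>

<dependency>
  <groupId>cascading</groupId>
  <artifactId>cascading-hadoop2-mr1</artifactId>
  <version>2.5.6</version>
</dependency>
```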

Exception in thread "main" cascading.flow.FlowException: unhandled exception
    at cascading.flow.BaseFlow.complete(BaseFlow.java:894)
    at WordCount.main(WordCount.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RpcServerException): Unknown rpc kind in rpc headerRPC_WRITABLE
    at org.apache.hadoop.ipc.Client.call(Client.java:1411)
    at org.apache.hadoop.ipc.Client.call(Client.java:1364)
    at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:231)
    at org.apache.hadoop.mapred.$Proxy11.getStagingAreaDir(Unknown Source)
    at org.apache.hadoop.mapred.JobClient.getStagingAreaDir(JobClient.java:1368)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:102)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:982)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:950)
    at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:105)
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:196)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

The demo code is below; it is basically identical to the WordCount sample from the Cascading user guide.

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class WordCount {

    public static void main(String[] args) {
        String inputPath = "/user/vagrant/wordcount/input";
        String outputPath = "/user/vagrant/wordcount/output";

        Scheme sourceScheme = new TextLine( new Fields( "line" ) );
        Tap source = new Hfs( sourceScheme, inputPath );

        Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
        Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

        Pipe assembly = new Pipe( "wordcount" );


        String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
        Function function = new RegexGenerator( new Fields( "word" ), regex );
        assembly = new Each( assembly, new Fields( "line" ), function );


        assembly = new GroupBy( assembly, new Fields( "word" ) );

        Aggregator count = new Count( new Fields( "count" ) );
        assembly = new Every( assembly, count );

        Properties properties = AppProps.appProps()
            .setName( "word-count-application" )
            .setJarClass( WordCount.class )
            .buildProperties();

        properties.put("fs.defaultFS", "hdfs://192.168.30.101");
        properties.put("mapred.job.tracker", "192.168.30.101:8032");

        FlowConnector flowConnector = new HadoopFlowConnector( properties );
        Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

        flow.complete();
    }
}

I've also tried setting a bunch of other properties to try to get it working:

  • mapreduce.jobtracker.address
  • mapreduce.framework.name
  • yarn.resourcemanager.address
  • yarn.resourcemanager.host
  • yarn.resourcemanager.hostname
  • yarn.resourcemanager.resourcetracker.address

None of these worked; they just caused the job to run in local mode (unless mapred.job.tracker was also set).
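For context, the Hadoop 2 (YARN-era) names for these client-side submission settings are mapreduce.framework.name and yarn.resourcemanager.address; without mapreduce.framework.name=yarn, a Hadoop 2 MapReduce client defaults to local mode. As noted, setting them did not change the behavior in my case, but a sketch of the YARN-style configuration looks like this (host and ports are illustrative values from my cluster, not defaults to copy blindly):

```java
import java.util.Properties;

public class YarnClientProps {

    // Hadoop 2 style client configuration for YARN submission.
    // 192.168.30.101 is my cluster host; 8020 is the usual NameNode RPC
    // port and 8032 the usual ResourceManager client port.
    static Properties yarnProperties() {
        Properties properties = new Properties();
        properties.setProperty("fs.defaultFS", "hdfs://192.168.30.101:8020");
        // Without this, the MapReduce client falls back to local mode on Hadoop 2.
        properties.setProperty("mapreduce.framework.name", "yarn");
        properties.setProperty("yarn.resourcemanager.address", "192.168.30.101:8032");
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(yarnProperties().getProperty("mapreduce.framework.name"));
    }
}
```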

Solution

I've now resolved this problem. It comes from trying to use the older Hadoop classes that Cloudera distributes, particularly JobClient. This will happen if you use hadoop-core with the provided 2.5.0-mr1-cdh5.2.1 version, or the hadoop-client dependency with this same version number. Although this claims to be the MR1 version, and we are submitting with the MR1 API, this version actually supports submission ONLY to the Hadoop 1 JobTracker; it does not support YARN.

In order to allow submitting to YARN, you must use the hadoop-client dependency with the non-MR1 2.5.0-cdh5.2.1 version, which still supports submission of MR1 jobs to YARN.
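In Maven terms, the fix amounts to roughly this swap (the coordinates reflect my CDH 5.2.1 setup; CDH artifacts are served from Cloudera's repository, not Maven Central):

```xml
<!-- Cloudera's repository for CDH artifacts -->
<repository>
  <id>cloudera</id>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

<!-- Broken: the MR1-flavored client, which can only talk to a JobTracker -->
<!--
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.5.0-mr1-cdh5.2.1</version>
</dependency>
-->

<!-- Working: the regular Hadoop 2 client, which can submit MR1 jobs to YARN -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.5.0-cdh5.2.1</version>
</dependency>
```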
