How to submit a MapReduce job with the YARN API in Java


Question

I want to submit my MR job using the YARN Java API. I tried to follow WritingYarnApplications, but I don't know what to add to amContainer. Below is the code I have written:

package org.apache.hadoop.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.util.Records;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class YarnJob {
    private static Logger logger = LoggerFactory.getLogger(YarnJob.class);

    public static void main(String[] args) throws Throwable {

        Configuration conf = new Configuration();
        YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();

        System.out.println(JSON.toString(client.getAllQueues()));
        System.out.println(JSON.toString(client.getConfig()));
        //System.out.println(JSON.toString(client.getApplications()));
        System.out.println(JSON.toString(client.getYarnClusterMetrics()));

        YarnClientApplication app = client.createApplication();
        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();

        ApplicationId appId = appResponse.getApplicationId();

        // Create launch context for app master
        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
        // set the application id
        appContext.setApplicationId(appId);
        // set the application name
        appContext.setApplicationName("test");
        // Set the queue to which this application is to be submitted in the RM
        appContext.setQueue("default");

        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        //amContainer.setLocalResources();
        //amContainer.setCommands();
        //amContainer.setEnvironment();

        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(Resource.newInstance(1024, 1));

        appContext.setApplicationType("MAPREDUCE");

        // Submit the application to the applications manager
        client.submitApplication(appContext);
        //client.stop();
    }
}

I can run a MapReduce job properly with the command-line interface:

hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/admin/input /user/admin/output/

But how can I submit this wordcount job through the YARN Java API?

Answer

You do not use YarnClient to submit the job; instead, use the MapReduce APIs to submit it. See this link for an example.
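A minimal driver along those lines might look like this. This is a sketch, not the exact code behind the link: it assumes the hadoop-mapreduce-examples jar is on the classpath so that the TokenizerMapper and IntSumReducer inner classes of org.apache.hadoop.examples.WordCount can be reused, and the class name WordCountDriver is illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        // Reuse the mapper/reducer shipped with the Hadoop examples jar
        job.setMapperClass(org.apache.hadoop.examples.WordCount.TokenizerMapper.class);
        job.setCombinerClass(org.apache.hadoop.examples.WordCount.IntSumReducer.class);
        job.setReducerClass(org.apache.hadoop.examples.WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // waitForCompletion() handles submission to YARN for you,
        // including shipping the MRAppMaster container spec
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Note that job.waitForCompletion(true) is what does the YARN submission under the hood; you never have to build the ApplicationMaster container yourself.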

However, if you need more control over the job, such as getting the completion status, Mapper phase status, Reducer phase status, etc., you can use

job.submit();

instead of

job.waitForCompletion(true)

You can use job.mapProgress() and job.reduceProgress() to get the status. There are lots of other functions on the Job object that you can explore.

As for your query:

hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/admin/input /user/admin/output/

What's happening here is that you are running the driver program packaged in wordcount.jar. Instead of doing "java -jar wordcount.jar" you are using "hadoop jar wordcount.jar"; you could just as well use "yarn jar wordcount.jar". Compared to the plain java -jar command, Hadoop/YARN sets up the necessary additional classpath entries. This executes the main() of your driver program, which lives in the class org.apache.hadoop.examples.WordCount specified in the command.

You can check out the source here: Source for WordCount class

The only reason I would assume you want to submit a job via YARN is to integrate it with some kind of service that kicks off MapReduce2 jobs on certain events.

For this you can always make your driver's main() look something like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyMapReduceDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        /******/

        int errCode = ToolRunner.run(conf, new MyMapReduceDriver(), args);

        System.exit(errCode);
    }

    @Override
    public int run(String[] args) throws Exception {

        // Loop forever, resubmitting the job; adapt the trigger to your service
        while (true) {
            try {
                runMapReduceJob();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void runMapReduceJob() throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        /******/

        // Submit without blocking, then poll for progress
        job.submit();

        while (job.getJobState() == JobStatus.State.RUNNING
                || job.getJobState() == JobStatus.State.PREP) {
            Thread.sleep(1000);

            System.out.println(" Map: " + StringUtils.formatPercent(job.mapProgress(), 0)
                    + " Reducer: " + StringUtils.formatPercent(job.reduceProgress(), 0));
        }
    }
}

Hope this helps.
