Dynamically compiling and running a Hadoop job from another Java file

Problem Description

I am trying to write a Java file that receives the source code of a MapReduce job, compiles it dynamically, and runs the job on a Hadoop cluster. To achieve this, I have written three methods called compile(), makeJAR() and run_Hadoop_Job(). Everything works fine with the compilation and the creation of the JAR file. However, when the job is submitted to Hadoop, it fails to find the required Mapper/Reducer classes as soon as it starts and throws a ClassNotFoundException for both the Mapper_Class and the Reducer_Class (java.lang.ClassNotFoundException: reza.rCloud.Mapper_Reducer_Classes$Mapper_Class.class). I know there must be something wrong with how I have referenced the required Mapper/Reducer classes, but I have not been able to figure it out after several attempts. Any help/suggestions on how to solve the issue are highly appreciated.

Regarding the details of the project: I have a file called "rCloud_test/src/reza/Mapper_Reducer_Classes.java" that contains the source code for Mapper_Class and Reducer_Class. This file will ultimately be received at runtime, but for now I have copied the Hadoop WordCount example into it and stored it locally in the same folder as my main class file: rCloud_test/src/reza/Platform2.java.

Below is the main() method of Platform2.java, which is the main class for this project:

    public static void main(String[] args) {
        System.out.println("Code Execution Started");
        String className = "Mapper_Reducer_Classes";
        Platform2 myPlatform = new Platform2();

        //step 1: compile the received class file dynamically:
        boolean compResult = myPlatform.compile(className);
        System.out.println(className + ".java compilation result: " + compResult);

        //step 2: make a JAR file out of the compiled file:
        if (compResult) {
            compResult = myPlatform.makeJAR("jar_file", myPlatform.compilation_Output_Folder);
            System.out.println("JAR creation result: " + compResult);
        }

        //step 3: now let's run the Hadoop job:
        if (compResult) {
            compResult = myPlatform.run_Hadoop_Job(className);
            System.out.println("Running on Hadoop result: " + compResult);
        }
    }

The method that is causing all the problems is run_Hadoop_Job(), shown below:

private boolean run_Hadoop_Job(String className) {
    try {
        System.out.println("*Starting to run the code on Hadoop...");
        String[] argsTemp = { "project_test/input", "project_test/output" };

        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:54310");
        conf.set("mapred.job.tracker", "localhost:54311");

        //point the job at the dynamically created JAR file:
        conf.set("mapred.jar", jar_Output_Folder + "/jar_file" + ".jar");

        conf.set("libjars", required_Execution_Classes);

        //THIS IS WHERE IT CAN'T FIND THE MENTIONED CLASSES, ALTHOUGH THEY EXIST BOTH ON DISK
        //AND IN THE CREATED JAR FILE:??????
        System.out.println("Getting Mapper/Reducer package name: " +
                           Mapper_Reducer_Classes.class.getName());
        conf.set("mapreduce.map.class", "reza.rCloud.Mapper_Reducer_Classes$Mapper_Class");
        conf.set("mapreduce.reduce.class", "reza.rCloud.Mapper_Reducer_Classes$Reducer_Class");

        Job job = new Job(conf, "Hadoop Example for dynamically and programmatically compiling-running a job");
        job.setJarByClass(Platform2.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(argsTemp[0]));
        //delete any previous output so the job can be re-run:
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path(argsTemp[1]);
        fs.delete(out, true);
        FileOutputFormat.setOutputPath(job, out);

        //job.submit();
        System.out.println("*and now submitting the job to Hadoop...");
        boolean success = job.waitForCompletion(true);
        System.out.println("Job Finished!");
        return success;
    } catch (Exception e) {
        System.out.println("****************Exception!");
        e.printStackTrace();
        return false;
    }
}
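
As an aside that goes beyond the original code: in Hadoop 0.20-era releases, "libjars" is normally consumed by GenericOptionsParser from the -libjars command-line option, so setting it straight on the Configuration as above is, as far as I can tell, silently ignored. A minimal sketch of the programmatic equivalent, assuming the parser's internal property name "tmpjars":

    //sketch only: GenericOptionsParser translates "-libjars a.jar,b.jar" into
    //the "tmpjars" property; the paths may need to be fully qualified URIs.
    conf.set("tmpjars", required_Execution_Classes);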

If needed, here's the source code for the compile() method:

private boolean compile(String className) {
    String fileToCompile =  JOB_FOLDER   + "/" +className+".java";
    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();       
    FileOutputStream errorStream = null;        
    try{
        errorStream = new FileOutputStream(JOB_FOLDER + "/logs/Errors.txt");
    } catch (FileNotFoundException e) {
        //if there is a problem creating the log file, errors default to the console
    }

    int compilationResult = 
            compiler.run(   null, null, errorStream, 
                            "-classpath", required_Compilation_Classes,
                            "-d", compilation_Output_Folder,
                            fileToCompile);
    //a return value of 0 means the compilation succeeded:
    return compilationResult == 0;
}
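
For Class.forName() to resolve the freshly compiled classes later (for example in run_Hadoop_Job()), they must be visible to a class loader. A minimal sketch, assuming compilation_Output_Folder is the root directory that compile() wrote the .class files into (requires java.io.File, java.net.URL, and java.net.URLClassLoader):

    //hypothetical snippet, not part of the original code: expose the
    //dyn_classes folder to a class loader so that the compiled classes
    //can be resolved by their fully qualified names.
    URLClassLoader loader = new URLClassLoader(
            new URL[] { new File(compilation_Output_Folder).toURI().toURL() },
            Platform2.class.getClassLoader());
    Class<?> mapperClass =
            Class.forName("reza.rCloud.Mapper_Reducer_Classes$Mapper_Class", true, loader);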

And the source code for the makeJAR() method:

private boolean makeJAR(String outputFileName, String inputDirectory) {
    Manifest manifest = new Manifest();
    manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");

    JarOutputStream target = null;
    try {
        target = new JarOutputStream(
                new FileOutputStream(jar_Output_Folder + "/" + outputFileName + ".jar"),
                manifest);
        //recursively add the compiled classes to the archive:
        add(new File(inputDirectory), target);
    } catch (Exception e) {
        return false;
    } finally {
        if (target != null) {
            try {
                target.close();
            } catch (Exception e) {
                return false;
            }
        }
    }
    return true;
}

private void add(File source, JarOutputStream target) throws IOException {
    BufferedInputStream in = null;
    try {
        if (source.isDirectory()) {
            //directories get their own entry (with a trailing slash), then recurse:
            String name = source.getPath().replace("\\", "/");
            if (!name.isEmpty()) {
                if (!name.endsWith("/"))
                    name += "/";
                JarEntry entry = new JarEntry(name);
                entry.setTime(source.lastModified());
                target.putNextEntry(entry);
                target.closeEntry();
            }
            for (File nestedFile : source.listFiles())
                add(nestedFile, target);
            return;
        }

        //regular files are copied into the archive in 1 KB chunks:
        JarEntry entry = new JarEntry(source.getPath().replace("\\", "/"));
        entry.setTime(source.lastModified());
        target.putNextEntry(entry);
        in = new BufferedInputStream(new FileInputStream(source));

        byte[] buffer = new byte[1024];
        while (true) {
            int count = in.read(buffer);
            if (count == -1)
                break;
            target.write(buffer, 0, count);
        }
        target.closeEntry();
    } finally {
        if (in != null)
            in.close();
    }
}
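
One detail worth watching here: because add() uses source.getPath() verbatim as the entry name, entries are written with the absolute directory prefix rather than the package-relative path (reza/rCloud/Foo.class) that class loaders expect inside a JAR. A hedged sketch of a relative-naming variant, with addRelative() as a hypothetical helper called as addRelative(new File(inputDirectory), new File(inputDirectory), target):

    //hypothetical variant: strip the root prefix so entry names are package-relative.
    private void addRelative(File root, File source, JarOutputStream target) throws IOException {
        //entry name relative to the JAR root, e.g. "reza/rCloud/Mapper_Reducer_Classes.class":
        String name = root.toURI().relativize(source.toURI()).getPath();
        if (source.isDirectory()) {
            for (File nestedFile : source.listFiles())
                addRelative(root, nestedFile, target);
            return;
        }
        JarEntry entry = new JarEntry(name);
        entry.setTime(source.lastModified());
        target.putNextEntry(entry);
        BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
        try {
            byte[] buffer = new byte[1024];
            int count;
            while ((count = in.read(buffer)) != -1)
                target.write(buffer, 0, count);
        } finally {
            in.close();
        }
        target.closeEntry();
    }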

And finally, the fixed parameters used for accessing the files:

private String JOB_FOLDER = "/Users/reza/My_Software/rCloud_test/src/reza/rCloud";
private String HADOOP_SOURCE_FOLDER = "/Users/reza/My_Software/hadoop-0.20.2";
private String required_Compilation_Classes = HADOOP_SOURCE_FOLDER + "/hadoop-0.20.2-core.jar";
private String required_Execution_Classes = required_Compilation_Classes + "," +
     "/Users/reza/My_Software/ActorFoundry_dist_ver/lib/commons-cli-1.1.jar," +
     "/Users/reza/My_Software/ActorFoundry_dist_ver/lib/commons-logging-1.1.1.jar";
public String compilation_Output_Folder = "/Users/reza/My_Software/rCloud_test/dyn_classes";
private String jar_Output_Folder = "/Users/reza/My_Software/rCloud_test/dyn_jar";

As a result of running Platform2, the structure of the project on disk looks as follows:

rCloud_test/classes/reza/rCloud/Platform2.class: contains the Platform2 class
rCloud_test/dyn_classes/reza/rCloud/: contains Mapper_Reducer_Classes.class, Mapper_Reducer_Classes$Mapper_Class.class, and Mapper_Reducer_Classes$Reducer_Class.class
rCloud_test/dyn_jar/jar_file.jar: contains the created JAR file

REVISED: here's the source code for rCloud_test/src/reza/rCloud/Mapper_Reducer_Classes.java:

package reza.rCloud;

import java.io.IOException;
import java.lang.InterruptedException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Mapper_Reducer_Classes {
    /**
     * The map class of WordCount.
     */
    public static class Mapper_Class
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * The reduce class of WordCount.
     */
    public static class Reducer_Class
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}


Recommended Answer

Try using the setClass() method:

conf.setClass("mapreduce.map.class", 
              Class.forName("reza.rCloud.Mapper_Reducer_Classes$Mapper_Class"),
              Mapper.class);

conf.setClass("mapreduce.reduce.class",
              Class.forName("reza.rCloud.Mapper_Reducer_Classes$Reducer_Class"),
              Reducer.class);
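
An equivalent sketch through the Job object (new mapreduce API), assuming the compiled classes are resolvable by Class.forName() at submission time, for example via a class loader like the one sketched earlier; note that Class.forName() throws a checked ClassNotFoundException that has to be handled:

    //sketch only: asSubclass() narrows the Class<?> returned by
    //Class.forName() to what setMapperClass()/setReducerClass() expect.
    job.setMapperClass(
            Class.forName("reza.rCloud.Mapper_Reducer_Classes$Mapper_Class")
                 .asSubclass(Mapper.class));
    job.setReducerClass(
            Class.forName("reza.rCloud.Mapper_Reducer_Classes$Reducer_Class")
                 .asSubclass(Reducer.class));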
