Reuse Hadoop code in Spark efficiently?


Problem description



I have code written in Hadoop and now I try to migrate to Spark. The mappers and reducers are fairly complex. So I tried to reuse Mapper and Reducer classes of already existing Hadoop code inside spark program. Can somebody tell me how do I achieve this?

EDIT:


So far, I have been able to reuse the Mapper class of the standard Hadoop word-count example in Spark, implemented as below:

wordcount.java

import scala.Tuple2; 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class wordcount  extends Configured  implements Serializable {
  public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setMaster("spark://IMPETUS-I0203:7077").setAppName("wordcount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf); //created Spark context
        JavaRDD<String> rec = ctx.textFile("hdfs://localhost:54310/input/words.txt"); //Record Reader

  //creating a  Pair RDD whose key=some arbitrary number, value = a Record
        JavaPairRDD<LongWritable,Text> lines = rec.mapToPair(s -> new Tuple2<LongWritable,Text>(new LongWritable(s.length()), new Text(s)));

  //transforming 'lines' RDD to another such that it returns for example ('word',1) tuple.
      JavaPairRDD<Text,IntWritable> ones = lines.flatMapToPair(it -> {

          // build a JobConf and configure the Hadoop mapper (note: this runs once per input line)
          JobConf conf = new JobConf(new Configuration(), wordcount.class);
          conf.setJobName("WordCount");
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(IntWritable.class);
          Path inp = new Path("hdfs://localhost:54310/input/darcy.txt");
          FileInputFormat.addInputPath(conf, inp);

          WordCountMapper mapper = new WordCountMapper();
          mapper.configure(conf);
          outputcollector<Text,IntWritable> output = new outputcollector<Text,IntWritable>();

          // invoke the Hadoop mapper; it writes (word, 1) pairs into the collector
          mapper.map(it._1, it._2, output, Reporter.NULL);

          return output.getList();
        });

        ones.saveAsTextFile("hdfs://localhost:54310/output/41");
  }
}

WordCountMapper.java

import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import java.io.Serializable;

public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>,Serializable
{
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {  
               word = new Text();
               word.set(tokenizer.nextToken());
               output.collect(word, one);
            }
       }
}

outputcollector.java

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapred.*;
import scala.Tuple2;

// Buffers the (key, value) pairs emitted by the Hadoop mapper so they can be
// returned to Spark as a list of Tuple2s.
public class outputcollector<K, V> implements OutputCollector<K, V> {
    private List<Tuple2<K,V>> writer = new ArrayList<Tuple2<K,V>>();
    @Override
    public void collect(K key, V value) {
        try{
            writer.add(new Tuple2<K,V>(key,value));
        }catch(Exception e){
            System.out.println(e+ "\n\n****output collector error\n\n");
        }
    }
    public List<Tuple2<K,V>> getList(){
        return writer;
    }
}

This code works perfectly fine and I can successfully submit the Spark job. But it is highly inefficient compared to a pure Spark program: it takes about 50 times longer than the simple Spark word-count example. The input file is 1 GB, it lives on HDFS, and I am running in standalone mode.

I am unable to find the reason why this code is as slow as a sloth. Here I am using WordCountMapper.java simply to collect the pair (word, 1), and that also works in memory. So I don't see why my code has to be so much slower than the standard Spark word-count example.

So, can anyone suggest a better approach to reusing WordCountMapper.java (the Hadoop mapper) in Spark? Or explain why it is so slow? Or anything else that helps achieve my ultimate goal (mentioned in my question at the top)?

Recommended answer

The basic way of converting a MapReduce job to Spark is:

rdd.mapPartitions { partition =>
    setup()                        // map-side setup, once per partition
    partition.map { item =>
        val output = process(item) // the old map() logic
        if (!partition.hasNext) {
            // some cleanup code here, after the last item of the partition
        }
        output
    }
}.groupByKey()
 .mapPartitions { partition =>
     // similarly for the reduce code
 }
 .saveAsHadoopFile( /* params */ ) // to save on HDFS
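
Applied to the classes from the question, that pattern might look roughly like the Java sketch below. It reuses the WordCountMapper, outputcollector and rec names from the code above, and assumes the Spark 1.x Java API (where flatMap-style functions return an Iterable); it is a structural sketch, not a benchmarked implementation.

//Sketch: drive the existing Hadoop mapper once per partition instead of once per line.
//Assumes WordCountMapper, outputcollector and the JavaRDD<String> 'rec' from the question.
JavaPairRDD<Text, IntWritable> ones = rec.mapPartitionsToPair(partition -> {
    // setup: build the JobConf and the mapper once for the whole partition
    JobConf conf = new JobConf(new Configuration(), wordcount.class);
    WordCountMapper mapper = new WordCountMapper();
    mapper.configure(conf);

    outputcollector<Text, IntWritable> output = new outputcollector<Text, IntWritable>();
    long key = 0;
    while (partition.hasNext()) {
        // WordCountMapper ignores the key, so any LongWritable will do
        mapper.map(new LongWritable(key++), new Text(partition.next()), output, Reporter.NULL);
    }
    mapper.close();            // cleanup, once per partition
    return output.getList();   // Spark 1.x expects an Iterable of tuples here
});

The point of the change is that the JobConf, the mapper and the collector are constructed once per partition rather than once per input line, which is one plausible source of the slowdown described in the question.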

The following link points to a set of two articles on the Cloudera blog. Not everything is discussed, but if you go through them you get the gist of how to convert some parts of Hadoop jobs into Spark, for example how to do setup and cleanup.

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

Note: I have tried converting MapReduce to Spark myself, but it resulted in a slower application. Maybe that is my own inefficiency in using Scala, or maybe Spark is not well suited for batch jobs. So be aware of this as well.
