像TextLine一样创建Scalding Source,将多个文件组合成单个映射器 [英] Create Scalding Source like TextLine that combines multiple files into single mappers

查看:126
本文介绍了像TextLine一样创建Scalding Source,将多个文件组合成单个映射器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有很多需要组合的小文件。在Scalding中,您可以使用 TextLine 以文本行的形式读取文件。问题是我们每个文件都有1个mapper ,但我们想要合并多个文件,以便它们由1个mapper处理。



我理解我们需要将输入格式更改为 CombineFileInputFormat 的实现,这可能涉及使用级联 CombinedHfs 。我们无法弄清楚如何做到这一点,但它应该只是一小撮代码来定义我们自己的Scalding源码,比如 CombineTextLine



非常感谢任何能够提供代码的人。

作为一个侧面问题,我们有一些数据在s3,如果给出的解决方案适用于s3文件会很好 - 我想这取决于是否 CombineFileInputFormat CombinedHfs 作品为s3。

解决方案

你在你的问题中得到了这个想法,所以这里有可能是你的解决方案。 p>

创建您自己的扩展 CombineFileInputFormat 的输入格式,并使用您自己的自定义 RecordReader 。我向你展示了Java代码,但如果你愿意,你可以很容易地将它转换为Scala。

  import java.io.IOException; 
导入org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

公共类CombinedInputFormat< K,V>扩展CombineFileInputFormat< K,V> {

public static class MyKeyValueLineRecordReader implements RecordReader< LongWritable,Text> {
private final RecordReader< LongWritable,Text>代表;

public MyKeyValueLineRecordReader(CombineFileSplit split,Configuration conf,Reporter记者,Integer idx)抛出IOException {
FileSplit fileSplit = new FileSplit(split.getPath(idx),split.getOffset(idx), split.getLength(idx),split.getLocations());
委托=新的LineRecordReader(conf,fileSplit);

$ b @Override
public boolean next(LongWritable key,Text value)throws IOException {
return delegate.next(key,value);
}

@Override
public LongWritable createKey(){
return delegate.createKey();
}

@Override
public Text createValue(){
return delegate.createValue();

$ b @Override
public long getPos()throws IOException {
return delegate.getPos();
}

@Override
public void close()throws IOException {
delegate.close();

$ b @Override
public float getProgress()throws IOException {
return delegate.getProgress();


$ b @Override
public RecordReader getRecordReader(InputSplit split,JobConf job,Reporter reporter)抛出IOException {
返回新的CombineFileRecordReader(job, (CombineFileSplit)split,reporter,(Class)MyKeyValueLineRecordReader.class);
}

}

然后你需要扩展TextLine类,并使用自己刚才定义的输入格式(从现在开始,Scala代码)。

  import cascading.scheme.hadoop .TextLine 
import cascading.flow.FlowProcess
import org.apache.hadoop.mapred。{OutputCollector,RecordReader,JobConf}
import cascading.tap.Tap
import com.twitter .scalding。{FixedPathSource,TextLineScheme}
import cascading.scheme.Scheme

class CombineFileTextLine extends TextLine {

override def sourceConfInit(flowProcess:FlowProcess [JobConf],点击:点击[JobConf,RecordReader [_,_],OutputCollector [_,_]],conf:JobConf){
super.sourceConfInit(flowProcess,tap,conf)
conf.setInputFormat(classOf [ CombinedInputFormat [String,String]])
}
}

为您的组合输入。

  trait CombineFileTextLineScheme外延nd TextLineScheme {

override def hdfsScheme = new CombineFileTextLine()。asInstanceOf [Scheme [JobConf,RecordReader [_,_],OutputCollector [_,_],_,_]]
}

最后,创建您的源代码类:

 $ b 

如果您想使用单个路径而不是多个路径,则对源类的更改很简单。



我希望有所帮助。

We have many small files that need combining. In Scalding you can use TextLine to read files as text lines. The problem is we get 1 mapper per file, but we want to combine multiple files so that they are processed by 1 mapper.

I understand we need to change the input format to an implementation of CombineFileInputFormat, and this may involve using cascadings CombinedHfs. We cannot work out how to do this, but it should be just a handful of lines of code to define our own Scalding source called, say, CombineTextLine.

Many thanks to anyone who can provide the code to do this.

As a side question, we have some data that is in s3, it would be great if the solution given works for s3 files - I guess it depends on whether CombineFileInputFormat or CombinedHfs works for s3.

解决方案

You get the idea in your question, so here is what possibly is a solution for you.

Create your own input format that extends the CombineFileInputFormat and uses your own custom RecordReader. I am showing you Java code, but you could easily convert it to scala if you want.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> {
        private final RecordReader<LongWritable,Text> delegate;

        public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
            FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
            delegate = new LineRecordReader(conf, fileSplit);
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return delegate.next(key, value);
        }

        @Override
        public LongWritable createKey() {
            return delegate.createKey();
        }

        @Override
        public Text createValue() {
            return delegate.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return delegate.getPos();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public float getProgress() throws IOException {
            return delegate.getProgress();
        }
    }

    @Override
    public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
    }

}

Then you need to extend the TextLine class and make it use your own input format you just defined (Scala code from now on).

import cascading.scheme.hadoop.TextLine
import cascading.flow.FlowProcess
import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
import cascading.tap.Tap
import com.twitter.scalding.{FixedPathSource, TextLineScheme}
import cascading.scheme.Scheme

class CombineFileTextLine extends TextLine{

  override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) {
    super.sourceConfInit(flowProcess, tap, conf)
    conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
  }
}

Create a scheme for the for your combined input.

trait CombineFileTextLineScheme extends TextLineScheme{

  override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}

Finally, create your source class:

case class CombineFileMultipleTextLine(p : String*) extends  FixedPathSource(p :_*) with CombineFileTextLineScheme

If you want to use a single path instead of multiple ones, the change to your source class is trivial.

I hope that helps.

这篇关于像TextLine一样创建Scalding Source,将多个文件组合成单个映射器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆