Create Scalding Source like TextLine that combines multiple files into single mappers
We have many small files that need combining. In Scalding you can use TextLine to read files as text lines. The problem is that we get one mapper per file, but we want to combine multiple files so that a single mapper processes them.
I understand we need to change the input format to an implementation of CombineFileInputFormat, and this may involve using Cascading's CombinedHfs. We cannot work out how to do this, but it should take just a handful of lines of code to define our own Scalding source called, say, CombineTextLine.
Many thanks to anyone who can provide the code to do this.
As a side question, some of our data is in S3, so it would be great if the solution also worked for S3 files. I guess that depends on whether CombineFileInputFormat or CombinedHfs works with S3.
You already have the right idea in your question, so here is a possible solution.
Create your own input format that extends CombineFileInputFormat and uses your own custom RecordReader. The code here is Java, but you could easily convert it to Scala if you want.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    // Reads the lines of one file (selected by idx) inside a combined split.
    public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable, Text> {
        private final RecordReader<LongWritable, Text> delegate;

        public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
            // Turn the idx-th file of the combined split into an ordinary FileSplit
            FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
            delegate = new LineRecordReader(conf, fileSplit);
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return delegate.next(key, value);
        }

        @Override
        public LongWritable createKey() {
            return delegate.createKey();
        }

        @Override
        public Text createValue() {
            return delegate.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return delegate.getPos();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public float getProgress() throws IOException {
            return delegate.getProgress();
        }
    }

    @Override
    public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        // One combined reader per split, delegating to the per-file reader above
        return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
    }
}
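In case the constructor signature above looks odd: as far as I understand it, CombineFileRecordReader creates one instance of your reader class per file in the combined split, passing the file's index as the last argument, and moves on to the next file when the current reader runs out. Here is a minimal plain-Java sketch of that chaining idea with no Hadoop dependency. ChainedReaderSketch, ChainedReader, readAll and the in-memory "files" are hypothetical names for illustration only, not Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntFunction;

public class ChainedReaderSketch {

    /** Hypothetical stand-in for a combined reader: reads files one after another. */
    static final class ChainedReader {
        private final int fileCount;
        private final IntFunction<Iterator<String>> readerForIndex;
        private int nextIndex = 0;
        private Iterator<String> current = null;

        ChainedReader(int fileCount, IntFunction<Iterator<String>> readerForIndex) {
            this.fileCount = fileCount;
            this.readerForIndex = readerForIndex;
        }

        /** Returns the next line, or null once every file has been exhausted. */
        String next() {
            while (current == null || !current.hasNext()) {
                if (nextIndex >= fileCount) {
                    return null;
                }
                // Open a reader for the next file, identified only by its index.
                current = readerForIndex.apply(nextIndex++);
            }
            return current.next();
        }
    }

    /** Reads a whole "combined split" (a list of in-memory files) through one reader. */
    static List<String> readAll(List<List<String>> files) {
        ChainedReader reader = new ChainedReader(files.size(), idx -> files.get(idx).iterator());
        List<String> lines = new ArrayList<>();
        for (String line = reader.next(); line != null; line = reader.next()) {
            lines.add(line);
        }
        return lines;
    }

    public static void main(String[] args) {
        List<List<String>> files = Arrays.asList(
                Arrays.asList("a1", "a2"),
                Arrays.<String>asList(),   // an empty file is simply skipped
                Arrays.asList("b1"));
        System.out.println(readAll(files)); // prints [a1, a2, b1]
    }
}
```

The point is that a single reader (and so a single mapper) sees the lines of all three files in order, which is what the delegate-based MyKeyValueLineRecordReader gives you per file.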
Then extend the TextLine class so that it uses the input format you just defined (Scala code from now on).
import cascading.scheme.hadoop.TextLine
import cascading.flow.FlowProcess
import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
import cascading.tap.Tap
import com.twitter.scalding.{FixedPathSource, TextLineScheme}
import cascading.scheme.Scheme

class CombineFileTextLine extends TextLine {
  override def sourceConfInit(
      flowProcess: FlowProcess[JobConf],
      tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
      conf: JobConf): Unit = {
    super.sourceConfInit(flowProcess, tap, conf)
    // Replace the default input format with the combining one
    conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
  }
}
Create a scheme for your combined input.
trait CombineFileTextLineScheme extends TextLineScheme {
  override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
}
Finally, create your source class:
case class CombineFileMultipleTextLine(p : String*) extends FixedPathSource(p :_*) with CombineFileTextLineScheme
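To get a feel for what this buys you, here is a rough plain-Java sketch of how grouping small files under a size cap reduces the number of splits, and so the number of mappers. This is a simplification under my own assumptions: the real CombineFileInputFormat also takes node and rack locality into account, and SplitPackingSketch, pack and the sizes used here are hypothetical names and numbers for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitPackingSketch {

    /** Greedily groups file sizes, in order, into splits of at most maxSplitBytes. */
    static List<List<Long>> pack(long[] fileSizes, long maxSplitBytes) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            // Start a new split when adding this file would exceed the cap.
            if (!current.isEmpty() && currentBytes + size > maxSplitBytes) {
                splits.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long[] files = new long[100];
        Arrays.fill(files, 2 * mb); // 100 small files of 2 MB each
        List<List<Long>> splits = pack(files, 128 * mb);
        System.out.println("files=" + files.length + " splits=" + splits.size());
    }
}
```

With 100 files of 2 MB each and a 128 MB cap, this prints files=100 splits=2, compared with 100 splits when every file gets its own mapper.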
If you want to use a single path instead of multiple ones, the change to your source class is trivial.
I hope that helps.