PIG UDF: handling multi-line tuples split into different mappers

Problem Description

I have a file where each tuple spans multiple lines, for example:

START
name: Jim
phone: 2128789283
address: 56 2nd street, New York, USA
END
START
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
.
.
.

So the above are 2 tuples in my file. I wrote a UDF that defines a getNext() function which checks whether the line is START; if so, I initialize my tuple. If it is END, I return the tuple (built from a string buffer); otherwise I just append the line to the string buffer.
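
Roughly, that getNext() logic looks like the following (a simplified sketch, not my actual code; reader is assumed to be the line-oriented RecordReader handed to prepareToRead(), and TupleFactory comes from org.apache.pig.data):

    @Override
    public Tuple getNext() throws IOException {
        StringBuilder buffer = new StringBuilder();
        try {
            // the reader delivers one physical line at a time
            while (reader.nextKeyValue()) {
                String line = reader.getCurrentValue().toString();
                if (line.startsWith("START")) {
                    buffer.setLength(0);                      // begin a new record
                } else if (line.startsWith("END")) {
                    return TupleFactory.getInstance().newTuple(buffer.toString());
                } else {
                    buffer.append(line).append('\n');         // accumulate the record body
                }
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
        // end of split: a record that crosses the split boundary is silently lost here
        return null;
    }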

This works well when the file size is less than the HDFS block size, which is 64 MB (on Amazon EMR), but it fails for anything larger. I tried to google around and found this blog post. Raja's explanation is easy to understand and he provides sample code. However, that code implements the RecordReader part rather than getNext() of a Pig LoadFunc. Does anyone have experience handling this multi-line Pig tuple split problem? Should I go ahead and implement a RecordReader in Pig? If so, how?

Thanks.

Recommended Answer

You may preprocess your input as Guy mentioned, or apply the other tricks described here.

I think the cleanest solution would be to implement a custom InputFormat (along with its RecordReader) which creates one record per START-END block. Pig's LoadFunc sits on top of Hadoop's InputFormat, so you can define which InputFormat your LoadFunc will use.
A raw, skeleton implementation of a custom LoadFunc would look like this:

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

    private RecordReader reader;
    private TupleFactory tupleFactory;

    public CustomLoader() {
        tupleFactory = TupleFactory.getInstance();
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new MyInputFormat(); //custom InputFormat
    }

    @Override
    public Tuple getNext() throws IOException {
        Tuple result = null;
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            //value can be a custom Writable containing your name/value 
            //field pairs for a given record
            Object value = reader.getCurrentValue();
            result = tupleFactory.newTuple();
            // ...
            //append fields to tuple
        }
        catch (Exception e) {
            // rethrow instead of swallowing: returning null here would be
            // treated by Pig as end of input
            throw new IOException("Error while reading record", e);
        }
        return result;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit pigSplit) 
      throws IOException {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

After the LoadFunc initializes the InputFormat and its RecordReader, it locates the input location of your data and starts obtaining records from the RecordReader, creating the resulting tuples (getNext()) until the input has been fully read.

Some remarks on the custom InputFormat:

I'd create a custom InputFormat in which the RecordReader is a modified version of org.apache.hadoop.mapreduce.lib.input.LineRecordReader: most of the methods would remain the same, except initialize(), which would call a custom LineReader (based on org.apache.hadoop.util.LineReader). The InputFormat's key would be the line offset (Long) and the value would be a custom Writable holding the fields of a record (i.e. the data between START and END) as a list of key-value pairs. Each time your RecordReader's nextKeyValue() is called, the record is written to the custom Writable by the LineReader. The gist of the whole thing is how you implement LineReader.readLine().
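
A minimal sketch of such an InputFormat/RecordReader pair (the class names are made up; to keep it short it emits the record body as plain Text rather than a custom Writable, and it simply marks files as non-splittable instead of handling split boundaries the way a modified LineRecordReader would):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class StartEndInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Simplification: one split per file, so a START-END block is never cut
        // in half. A production reader would instead skip forward to the first
        // START after the split start, like LineRecordReader does for lines.
        return false;
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new StartEndRecordReader();
    }

    public static class StartEndRecordReader extends RecordReader<LongWritable, Text> {

        private LineReader in;
        private long start;
        private long length;
        private long pos;
        private final LongWritable key = new LongWritable();
        private final Text value = new Text();
        private final Text line = new Text();

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration conf = context.getConfiguration();
            start = split.getStart();
            length = split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            in = new LineReader(fs.open(file), conf); // split starts at 0: file is non-splittable
            pos = start;
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            StringBuilder record = new StringBuilder();
            boolean inRecord = false;
            int bytesRead;
            while ((bytesRead = in.readLine(line)) > 0) {
                pos += bytesRead;
                String s = line.toString();
                if (s.startsWith("START")) {
                    inRecord = true;                  // begin a new record
                    key.set(pos);
                    record.setLength(0);
                } else if (inRecord && s.startsWith("END")) {
                    value.set(record.toString());     // one record per START-END block
                    return true;
                } else if (inRecord) {
                    if (record.length() > 0) {
                        record.append('\n');
                    }
                    record.append(s);
                }
            }
            return false;  // end of file, no further complete record
        }

        @Override
        public LongWritable getCurrentKey() { return key; }

        @Override
        public Text getCurrentValue() { return value; }

        @Override
        public float getProgress() {
            return length == 0 ? 1.0f : Math.min(1.0f, (pos - start) / (float) length);
        }

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();   // also closes the underlying stream
            }
        }
    }
}

With something like this in place, CustomLoader.getNext() above only has to turn each Text value into a tuple, e.g. by splitting on newlines and ':'.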

Another, probably easier approach would be to change the delimiter of TextInputFormat (it is configurable as of Hadoop 0.23, see textinputformat.record.delimiter) to one that fits your data structure (if that is possible). In this case you'll end up having your data in a Text, from which you need to split out the key-value pairs and put them into tuples.
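
A sketch of that variant wrapped in a LoadFunc (the class name, the delimiter value "END\n" and the field parsing are assumptions about the exact data layout, not tested code):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class StartEndTextLoader extends LoadFunc {

    private RecordReader reader;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();   // the stock line-oriented InputFormat
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // With this delimiter, each value handed to getNext() is the text of one
        // whole record, i.e. everything between two "END" markers.
        job.getConfiguration().set("textinputformat.record.delimiter", "END\n");
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.reader = reader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            String record = reader.getCurrentValue().toString();
            List<String> fields = new ArrayList<String>();
            for (String line : record.split("\n")) {
                line = line.trim();
                if (line.isEmpty() || line.equals("START")) {
                    continue;                       // skip the START marker and blank lines
                }
                // keep only the value part of "name: Jim", "phone: ...", etc.
                int colon = line.indexOf(':');
                fields.add(colon >= 0 ? line.substring(colon + 1).trim() : line);
            }
            return fields.isEmpty() ? getNext() : tupleFactory.newTuple(fields);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}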
