PIG UDF: handling a multi-line tuple split across different mappers


Problem description

I have a file where each tuple spans multiple lines, for example:

START
name: Jim
phone: 2128789283
address: 56 2nd street, New York, USA
END
START
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
.
.
.

The above shows two tuples in my file. I wrote a UDF that defines a getNext() function: if the current line is START, I initialize my tuple; if it is END, I return the tuple (built from a string buffer); otherwise I just append the line to the string buffer.
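
The question does not include the UDF's actual code; a rough reconstruction of the getNext() logic just described, assuming it lives inside a LoadFunc whose prepareToRead() stores the line-oriented RecordReader in a reader field, might look like this sketch:

// Rough reconstruction of the buffering logic described above (names are
// illustrative, not from the question). `reader` is assumed to be the
// line-oriented RecordReader that Pig hands to prepareToRead().
@Override
public Tuple getNext() throws IOException {
    StringBuilder buffer = new StringBuilder();
    try {
        while (reader.nextKeyValue()) {
            String line = reader.getCurrentValue().toString().trim();
            if (line.equals("START")) {
                buffer.setLength(0);                      // begin a new record
            } else if (line.equals("END")) {
                return TupleFactory.getInstance()
                        .newTuple(buffer.toString());     // emit the buffered record
            } else {
                buffer.append(line).append('\n');         // accumulate field lines
            }
        }
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
    return null;                                          // no more lines in this split
}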

It works well when the file is smaller than the HDFS block size, which is 64 MB (on Amazon EMR), but it fails for files larger than that. I tried to google around and found this blog post. Raja's explanation is easy to understand and he provides sample code, but the code implements the RecordReader part rather than getNext() for a Pig LoadFunc. Just wondering if anyone has experience handling this multi-line Pig tuple split problem? Should I go ahead and implement a RecordReader in Pig? If so, how?

Thanks.

Answer

You may preprocess your input as Guy mentioned, or you can apply the other tricks described here.

I think the cleanest solution would be to implement a custom InputFormat (along with its RecordReader) that creates one record per START-END block. Pig's LoadFunc sits on top of Hadoop's InputFormat, so you can define which InputFormat your LoadFunc will use. A raw, skeleton implementation of a custom LoadFunc would look like this:

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

    private RecordReader reader;
    private TupleFactory tupleFactory;

    public CustomLoader() {
        tupleFactory = TupleFactory.getInstance();
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // custom InputFormat that emits one record per START-END block
        return new MyInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;                   // end of input
            }
            // value can be a custom Writable containing your name/value
            // field pairs for a given record
            Object value = reader.getCurrentValue();
            Tuple result = tupleFactory.newTuple();
            // ...
            // append fields to the tuple
            return result;
        } catch (InterruptedException e) {
            throw new IOException("Error reading record", e);
        }
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit pigSplit)
            throws IOException {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

After the LoadFunc initializes the InputFormat and its RecordReader, it locates the input location of your data and starts obtaining records from the RecordReader, creating the resulting tuples (getNext()) until the input has been fully read.

Some remarks on the custom InputFormat:

I'd create a custom InputFormat in which the RecordReader is a modified version of org.apache.hadoop.mapreduce.lib.input.LineRecordReader: most of the methods would remain the same, except initialize(), which would call a custom LineReader (based on org.apache.hadoop.util.LineReader). The InputFormat's key would be the line offset (Long), and the value would be a custom Writable that holds the fields of a record (i.e. the data between START and END) as a list of key-value pairs. Each time your RecordReader's nextKeyValue() is called, the record is written to the custom Writable by the LineReader. The gist of the whole thing is how you implement LineReader.readLine().
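
For illustration only, a simplified sketch of such a RecordReader is shown below. Rather than modifying LineRecordReader itself, it drives org.apache.hadoop.util.LineReader directly, and for brevity it emits the record body as a single Text value instead of the custom Writable of name/value pairs described above; the class name StartEndRecordReader is hypothetical. The matching custom InputFormat would simply return this reader from its createRecordReader() method.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class StartEndRecordReader extends RecordReader<LongWritable, Text> {

    private long start, end, pos;
    private LineReader in;
    private final LongWritable key = new LongWritable();
    private final Text value = new Text();

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration conf = context.getConfiguration();
        start = split.getStart();
        end = start + split.getLength();
        Path file = split.getPath();
        FSDataInputStream fileIn = file.getFileSystem(conf).open(file);
        fileIn.seek(start);
        in = new LineReader(fileIn, conf);
        pos = start;
        if (start != 0) {
            // Same convention as LineRecordReader: a split that does not begin at
            // the start of the file discards its first (possibly partial) line;
            // the previous split reads past its boundary to finish that record.
            pos += in.readLine(new Text());
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        Text line = new Text();
        // Only records whose START line begins within this split are emitted here.
        while (pos <= end) {
            long lineStart = pos;
            int read = in.readLine(line);
            if (read == 0) {
                return false;                         // end of file
            }
            pos += read;
            if (line.toString().trim().equals("START")) {
                key.set(lineStart);
                StringBuilder record = new StringBuilder();
                // Keep reading until END, even if that means reading past the
                // split boundary into the next HDFS block.
                while ((read = in.readLine(line)) > 0) {
                    pos += read;
                    String s = line.toString().trim();
                    if (s.equals("END")) {
                        value.set(record.toString());
                        return true;
                    }
                    record.append(s).append('\n');
                }
                return false;                         // truncated trailing record
            }
        }
        return false;                                 // past the split boundary
    }

    @Override
    public LongWritable getCurrentKey() { return key; }

    @Override
    public Text getCurrentValue() { return value; }

    @Override
    public float getProgress() {
        return end == start ? 0.0f : Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}

The split-boundary handling follows the usual LineRecordReader convention: a record belongs to the split in which its START line begins, and the reader is allowed to read past its split's end to finish that record.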

Another, probably easier, approach would be to change the delimiter of TextInputFormat (it is configurable in Hadoop 0.23, see textinputformat.record.delimiter) to one that suits your data structure (if that is possible). In this case you'll end up with your data in a Text value, from which you need to split out and extract the key-value pairs and turn them into tuples.
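
A minimal sketch of this delimiter-based approach, assuming Hadoop 0.23+, might look like the following; the class name StartEndTextLoader and the way fields are extracted are illustrative assumptions, not part of the original answer.

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class StartEndTextLoader extends LoadFunc {

    private RecordReader reader;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // Let TextInputFormat break records on "END" instead of newlines
        // (assumes the marker never occurs inside a field value).
        job.getConfiguration().set("textinputformat.record.delimiter", "END");
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.reader = reader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            while (reader.nextKeyValue()) {
                // Each value is the raw text of one START..END block, e.g.
                // "START\nname: Jim\nphone: 2128789283\naddress: ...\n"
                String block = reader.getCurrentValue().toString();
                Tuple t = tupleFactory.newTuple();
                for (String line : block.split("\n")) {
                    int colon = line.indexOf(':');
                    if (colon >= 0) {
                        t.append(line.substring(colon + 1).trim()); // keep the field value
                    }
                }
                if (t.size() > 0) {
                    return t;                 // one tuple per record
                }
                // otherwise skip empty blocks (e.g. whitespace after the final END)
            }
            return null;                      // input exhausted
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}

Since Hadoop's LineReader handles records that cross split boundaries when a custom delimiter is set, this variant avoids writing any RecordReader code at all; the trade-off is that all parsing of the block text happens inside getNext().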
