Why does this Pig UDF Result in an "Error: Java heap space" Given that I am Spilling the DataBag to Disk?


Here is my UDF:

public DataBag exec(Tuple input) throws IOException {
    Aggregate aggregatedOutput = null;

    int spillCount = 0;

    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
    DataBag values = (DataBag) input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        //spillCount++;  (old placement: counted every input tuple)
        ...
        if (some condition regarding current input tuple) {
            //do something to aggregatedOutput with information from input tuple
        } else {
            //Because input tuple does not apply to current aggregatedOutput,
            //add current aggregatedOutput to the output bag and start a new
            //aggregatedOutput from the input tuple
            Tuple returnTuple = aggregatedOutput.getTuple();
            outputBag.add(returnTuple);
            spillCount++;
            aggregatedOutput = new Aggregate(tuple);

            if (spillCount == 1000) {
                outputBag.spill();
                spillCount = 0;
            }
        }
    }
    return outputBag;
}

Please focus on the fact that for every 1000 input tuples, the bag spills to disk. I have set this number as low as 50 and as high as 100,000 yet still receive the memory error:

Pig logfile dump:

Backend error message
---------------------
Error: Java heap space

Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space

What can I do to solve this? It is processing about a million rows.

HERE IS THE SOLUTION

Using the Accumulator interface:

public class Foo extends EvalFunc<DataBag> implements Accumulator<DataBag> {
    private DataBag outputBag = null;
    private Aggregate currentAggregation = null;

    public void accumulate(Tuple input) throws IOException {
        DataBag values = (DataBag) input.get(0);
        if (outputBag == null) {
            outputBag = BagFactory.getInstance().newDefaultBag();
        }

        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ...
            if (some condition regarding current input tuple) {
                //do something to currentAggregation with information from input tuple
            } else {
                //Because input tuple does not apply to the current aggregation,
                //add the current aggregation to the output bag and start a new
                //aggregation from the input tuple
                outputBag.add(currentAggregation.getTuple());
                currentAggregation = new Aggregate(tuple);
            }
        }
    }

    // Called when all tuples from current key have been passed to accumulate
    public DataBag getValue() {
        //Add final current aggregation
        outputBag.add(currentAggregation.getTuple());
        return outputBag;
    }

    // This is called after getValue(); resetting here keeps state from
    // leaking between keys
    public void cleanup() {
        outputBag = null;
        currentAggregation = null;
    }

    public DataBag exec(Tuple input) throws IOException {
        // Same as accumulate + getValue, but this doesn't appear to ever be called.
        ...
    }

    public Schema outputSchema(Schema input) {
        try {
            return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG));
        } catch (FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }

    class Aggregate {
        ...
        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(OUTPUT_TUPLE_SIZE);
            try {
                output.set(0, val);
                ...
            } catch (ExecException e) {
                e.printStackTrace();
                return null;
            }
            return output;
        }
        ...
    }
}

Solution

You should increment spillCount every time you append to outputBag, not every time you get a tuple from the iterator. You are only spilling whenever the spillCount is a multiple of 1000 AND your if condition is not met, which may not happen that often (depending on the logic). This may explain why you don't see much difference for different spill thresholds.
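
Concretely, the placement this describes would sit in the else branch of the question's loop. A condensed sketch (returnTuple and the 1000 threshold come from the question; the >= test is a defensive tweak, not part of the original code):

outputBag.add(returnTuple);   // append the finished aggregate to the bag
spillCount++;                 // count appends to the bag, not iterator steps
if (spillCount >= 1000) {     // flush the bag's in-memory tuples to disk
    outputBag.spill();
    spillCount = 0;
}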

If that doesn't solve your problem, I would try extending AccumulatorEvalFunc<DataBag>. In your case you don't actually need access to the whole bag; your implementation fits an accumulator style because you only need access to the current tuple. This may reduce memory usage. Essentially you would have an instance variable of type DataBag that accumulates the final output, plus an instance variable, aggregatedOutput, that holds the current aggregate. A call to accumulate() would either 1) update the current aggregate, or 2) add the current aggregate to the output bag and begin a new aggregate. This essentially follows the body of your for loop.
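
A minimal sketch of that approach, assuming a hypothetical Aggregate helper (its accepts()/update()/getTuple() methods and the group-by-field-0 logic stand in for the question's elided aggregation code):

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class FooAccumulator extends AccumulatorEvalFunc<DataBag> {
    private DataBag outputBag;             // accumulates the final output
    private Aggregate currentAggregation;  // the aggregate currently being built

    @Override
    public void accumulate(Tuple input) throws IOException {
        if (outputBag == null) {
            outputBag = BagFactory.getInstance().newDefaultBag();
        }
        // Pig passes the grouped tuples to accumulate() in batches, wrapped in a bag
        DataBag values = (DataBag) input.get(0);
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple tuple = it.next();
            if (currentAggregation != null && currentAggregation.accepts(tuple)) {
                currentAggregation.update(tuple);                  // 1) update the current aggregate
            } else {
                if (currentAggregation != null) {
                    outputBag.add(currentAggregation.getTuple());  // 2) add it to the output bag
                }
                currentAggregation = new Aggregate(tuple);         //    and begin a new aggregate
            }
        }
    }

    @Override
    public DataBag getValue() {
        if (currentAggregation != null) {
            outputBag.add(currentAggregation.getTuple());  // flush the final aggregate
        }
        return outputBag;
    }

    @Override
    public void cleanup() {
        outputBag = null;            // reset per-key state
        currentAggregation = null;
    }

    // Hypothetical stand-in for the question's Aggregate: groups consecutive
    // tuples sharing field 0 and counts them
    static class Aggregate {
        private final Object key;
        private long count;

        Aggregate(Tuple t) throws ExecException { key = t.get(0); count = 1; }

        boolean accepts(Tuple t) throws ExecException { return Objects.equals(key, t.get(0)); }

        void update(Tuple t) { count++; }

        Tuple getTuple() {
            Tuple out = TupleFactory.getInstance().newTuple(2);
            try {
                out.set(0, key);
                out.set(1, count);
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
            return out;
        }
    }
}

Note that AccumulatorEvalFunc supplies exec() itself (it simply drives accumulate() and getValue()), so only the three methods above need to be written; Pig calls getValue() once per key and then cleanup().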
