Why does this Pig UDF Result in an "Error: Java heap space" Given that I am Spilling the DataBag to Disk?

Question

Here is my UDF:

public DataBag exec(Tuple input) throws IOException { 
    Aggregate aggregatedOutput = null;
    
    int spillCount = 0;

    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
    DataBag values = (DataBag)input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        //spillCount++;
        ...
        if (some condition regarding current input tuple){
            //do something to aggregatedOutput with information from input tuple
        } else {
            //Because the input tuple does not apply to the current aggregatedOutput,
            //emit the current aggregatedOutput and start a new aggregate
            //from the input tuple
            Tuple returnTuple = aggregatedOutput.getTuple();
            outputBag.add(returnTuple);
            spillCount++;
            aggregatedOutput = new Aggregate(tuple);

            if (spillCount == 1000) {
                outputBag.spill();
                spillCount = 0;
            }
        }
    }
    return outputBag; 
}

Please focus on the fact that for every 1000 input tuples, the bag spills to disk. I have set this number as low as 50 and as high as 100,000, yet I still receive the memory error:

Pig logfile dump:

Backend error message
---------------------
Error: Java heap space

Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space

What can I do to solve this? It is processing about a million rows.

Using the accumulator interface:

public class Foo extends EvalFunc<DataBag> implements Accumulator<DataBag> {
    private DataBag outputBag = null;
    private Aggregate currentAggregation = null;
    
    public void accumulate(Tuple input) throws IOException {
        DataBag values = (DataBag)input.get(0);
        outputBag = BagFactory.getInstance().newDefaultBag();
        
        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ...
            if (some condition regarding current input tuple){
                //do something to currentAggregation with information from input tuple
            } else {
                //Because the input tuple does not apply to the current aggregation,
                //add the current aggregation to the output bag and start a new
                //aggregation from the input tuple
                outputBag.add(currentAggregation.getTuple());
                currentAggregation = new Aggregate(tuple);
            }
        }
    }
    
    // Called when all tuples from current key have been passed to accumulate
    public DataBag getValue() {
        //Add final current aggregation
        outputBag.add(currentAggregation.getTuple());
        return outputBag;
    }
    // This is called after getValue()
    // Not sure if these commands are necessary as they are repeated in beginning of accumulate
    public void cleanup() {
        outputBag = null;
        currentAggregation = null;
    }
    
    public DataBag exec(Tuple input) throws IOException {
        // Same as above ^^ but this doesn't appear to ever be called.
        return null;
    }
    
    public Schema outputSchema(Schema input) {
        try {
            return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG));
        } catch (FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }
    
    class Aggregate {
        ...
        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(OUTPUT_TUPLE_SIZE);
            try {
                output.set(0, val);
                ...
                return output;
            } catch (ExecException e) {
                e.printStackTrace();
                return null;
            }
        }
        ...
    }
}

Answer

You should increment spillCount every time you append to outputBag, not every time you get a tuple from the iterator. You are only spilling whenever the spillCount is a multiple of 1000 AND your if condition is not met, which may not happen that often (depending on the logic). This may explain why you don't see much difference for different spill thresholds.
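
To make that concrete, here is a minimal sketch of exec() with the counting corrected. Since the question elides the actual condition and the Aggregate class, a toy aggregation (counting runs of consecutive equal keys) stands in for them, and the class and helper names (RunCounter, runTuple, SPILL_THRESHOLD) are invented for illustration. The only point being demonstrated is that spillCount is incremented where outputBag.add() happens, so the threshold counts output tuples:

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Hypothetical sketch: the "aggregate" here just counts runs of equal keys.
public class RunCounter extends EvalFunc<DataBag> {
    private static final int SPILL_THRESHOLD = 1000;

    @Override
    public DataBag exec(Tuple input) throws IOException {
        DataBag outputBag = BagFactory.getInstance().newDefaultBag();
        DataBag values = (DataBag) input.get(0);
        Object currentKey = null;
        long runLength = 0;
        int spillCount = 0;

        for (Tuple t : values) {
            Object key = t.get(0);
            if (currentKey != null && !currentKey.equals(key)) {
                // Close out the current run: this is the append...
                outputBag.add(runTuple(currentKey, runLength));
                runLength = 0;
                // ...so this is where the counter belongs. The threshold now
                // means "spill after every SPILL_THRESHOLD output tuples".
                if (++spillCount >= SPILL_THRESHOLD) {
                    outputBag.spill();
                    spillCount = 0;
                }
            }
            currentKey = key;
            runLength++;
        }
        if (currentKey != null) {
            outputBag.add(runTuple(currentKey, runLength)); // flush the final run
        }
        return outputBag;
    }

    private static Tuple runTuple(Object key, long length) throws IOException {
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, key);
        t.set(1, length);
        return t;
    }
}

For what it's worth, the default bag returned by BagFactory.getInstance().newDefaultBag() is itself spillable, so Pig's memory manager can also spill it under memory pressure independently of these manual spill() hints.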

If that doesn't solve your problem I would try extending AccumulatorEvalFunc<DataBag>. In your case you don't actually need access to the whole bag; your implementation fits an accumulator-style implementation because you only need access to the current tuple. This may reduce memory usage. Essentially you would have an instance variable of type DataBag that accumulates the final output, and another instance variable holding the current aggregate. A call to accumulate() would either 1) update the current aggregate, or 2) add the current aggregate to the output DataBag and begin a new aggregate. This essentially follows the body of your for loop.
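
By way of illustration, a skeleton of that suggestion might look like the following, reusing the same toy run-counting aggregation as above (again, the names are invented for the sketch). org.apache.pig.AccumulatorEvalFunc already implements exec() in terms of accumulate()/getValue()/cleanup(), so only the incremental logic needs to be written:

import java.io.IOException;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class RunCounterAccumulator extends AccumulatorEvalFunc<DataBag> {
    private DataBag outputBag;
    private Object currentKey;
    private long runLength;

    @Override
    public void accumulate(Tuple input) throws IOException {
        // accumulate() is called repeatedly, each time with a batch of tuples
        // for the same key, so initialize lazily rather than resetting here.
        if (outputBag == null) {
            outputBag = BagFactory.getInstance().newDefaultBag();
        }
        DataBag values = (DataBag) input.get(0);
        for (Tuple t : values) {
            Object key = t.get(0);
            if (currentKey != null && !currentKey.equals(key)) {
                outputBag.add(runTuple(currentKey, runLength)); // close the run
                runLength = 0;
            }
            currentKey = key;
            runLength++;
        }
    }

    @Override
    public DataBag getValue() {
        // Flush the in-progress run before handing the bag back to Pig.
        try {
            if (currentKey != null) {
                outputBag.add(runTuple(currentKey, runLength));
                currentKey = null;
            }
        } catch (ExecException e) {
            throw new RuntimeException(e);
        }
        return outputBag;
    }

    @Override
    public void cleanup() {
        // Reset state so the next key starts fresh.
        outputBag = null;
        currentKey = null;
        runLength = 0;
    }

    private static Tuple runTuple(Object key, long length) throws ExecException {
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, key);
        t.set(1, length);
        return t;
    }
}

Because accumulate() can be invoked several times for one key, the sketch initializes its state lazily instead of recreating the output bag at the top of every call, which is the pitfall in the accumulator attempt above.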
