Creating a new accumulator on each update step of UDAF

Question

I am implementing a UDAF according to the UDAF example. The update phase there looks like this:

    // Update step from the UDAF example: buffer field 0 holds a
    // Map<String, Map<String, Long>>.
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        if (!input.isNullAt(0)) {
            String inputKey = input.getString(0);
            Map<String, Long> inputValues = input.<String, Long>getJavaMap(1);
            // A fresh HashMap is allocated on every call...
            Map<String, Map<String, Long>> newData = new HashMap<>();

            if (!buffer.isNullAt(0)) {
                // ...and every entry of the current buffer is copied into it.
                Map<String, Map<String, Long>> currData = buffer.<String, Map<String, Long>>getJavaMap(0);
                newData.putAll(currData);
            }
            newData.put(inputKey, inputValues);
            buffer.update(0, newData);
        }
    }

You can see that on every step a new HashMap (newData) is created and the data from the previous buffer is copied into it. This looks like an awful waste: a new Map has to be created and every element copied on each step. So I tried the following (in my case the map has slightly different types):

    Map<String, Integer> bufferJavaMap = buffer.<String, Integer>getJavaMap(0);
    bufferJavaMap.put("aaaa", 1);  // throws UnsupportedOperationException
    buffer.update(0, bufferJavaMap);

I get the following error:

    java.lang.UnsupportedOperationException
        at java.util.AbstractMap.put(AbstractMap.java:209)
        at dns.MergeMapUDAF.update(MergeMapUDAF.java:84)
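The top frame of the trace is java.util.AbstractMap.put, whose default implementation always throws UnsupportedOperationException. That suggests the map returned by getJavaMap is a read-only view that inherits put from AbstractMap instead of overriding it (an inference from the trace, not a statement about Spark's source). The plain-Java sketch below reproduces the same exception without Spark, using a hypothetical ReadOnlyView class as a stand-in, and shows the copy-then-modify workaround that the example's update method uses.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ReadOnlyMapDemo {

    // Hypothetical stand-in for a read-only map view (not Spark code).
    // It implements only entrySet(), so put() falls through to
    // AbstractMap.put, which always throws UnsupportedOperationException.
    static final class ReadOnlyView<K, V> extends AbstractMap<K, V> {
        private final Map<K, V> backing;

        ReadOnlyView(Map<K, V> backing) {
            this.backing = Collections.unmodifiableMap(backing);
        }

        @Override
        public Set<Map.Entry<K, V>> entrySet() {
            return backing.entrySet();
        }
    }

    // Returns true if put() on the given map throws UnsupportedOperationException.
    static boolean putFails(Map<String, Integer> view) {
        try {
            view.put("aaaa", 1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // Copy-then-modify: build a mutable HashMap from the view, then insert.
    static Map<String, Integer> copyAndPut(Map<String, Integer> view, String key, int value) {
        Map<String, Integer> copy = new HashMap<>(view);
        copy.put(key, value);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Integer> original = new HashMap<>();
        original.put("x", 0);
        Map<String, Integer> view = new ReadOnlyView<>(original);

        System.out.println("put on view fails: " + putFails(view));
        System.out.println("copy after put: " + copyAndPut(view, "aaaa", 1));
    }
}
```

The copy works, but it is exactly the per-step allocation the question is trying to avoid.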

Isn't it possible to update the existing Map? What is the best way to update this Map?

Recommended answer

Isn't it possible to update the existing Map?

It is not possible, and the problem goes deeper than the one you identified in your question: Spark makes a full copy of the structure on both get (getJavaMap) and update, so even removing the explicit copy wouldn't resolve the problem.
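To see why repeated full copies hurt, here is a hypothetical plain-Java cost model (no Spark involved). It simulates n update steps that each add one new key, and counts how many map entries are copied when the whole buffer is copied copiesPerStep times before each insert. Treating the explicit copy plus Spark's copies on get and update as three full copies per step is a rough assumption for illustration, not a measurement of Spark.

```java
import java.util.HashMap;
import java.util.Map;

public class CopyCostModel {

    // Hypothetical model, not Spark code: n update steps, each inserting one
    // new key. Before each insert the current map is copied copiesPerStep
    // times (0 = mutate in place). Returns the total number of entries copied.
    static long entriesCopied(int n, int copiesPerStep) {
        Map<String, Long> buffer = new HashMap<>();
        long copied = 0;
        for (int i = 0; i < n; i++) {
            for (int c = 0; c < copiesPerStep; c++) {
                Map<String, Long> copy = new HashMap<>(buffer);
                copied += copy.size();
                buffer = copy;
            }
            buffer.put("key" + i, (long) i);
        }
        return copied;
    }

    public static void main(String[] args) {
        int n = 2_000;
        System.out.println("in place:        " + entriesCopied(n, 0)); // 0
        System.out.println("one copy/step:   " + entriesCopied(n, 1)); // 1999000
        System.out.println("three copies:    " + entriesCopied(n, 3)); // 5997000
    }
}
```

At step i the buffer already holds i entries, so one full copy per step costs 0 + 1 + ... + (n - 1) = n(n - 1)/2 entry copies in total: quadratic in the number of rows aggregated, while mutating in place adds nothing.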

If performance matters, you should avoid using UserDefinedAggregateFunction with non-atomic buffer types such as maps.