Java 8流组合器从未调用 [英] Java 8 stream combiner never called

查看:51
本文介绍了Java 8流组合器从未调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个自定义的Java 8收集器,该收集器应该用于计算具有getValue()方法的POJO的平均值.这是代码:

I'm writing a custom java 8 collector which is supposed to compute the average of a POJO which has a getValue() method. Here's the code:

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a,b) ->  {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a,b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

    };

在非并行情况下,一切都很好.但是,当我使用parallelStream()时,有时不起作用.例如,给定从1到10的值,它将计算(53/9而不是55/10).调试时,调试器永远不会在combiner()函数中达到断点.我需要设置某种标志吗?

It all works well in the non-parallel case. However, when I use a parallelStream(), it sometimes doesn't work. For example, given the values from 1 to 10, it computes( 53/9 instead of 55/10). When debugging the debugger never hits the breakpoint in the combiner() function. Is there some kind of flag that I need to set?

推荐答案

问题似乎出在CONCURRENT特性上,它做了您想不到的其他事情:

It looks like the problem is the CONCURRENT characteristic, which does something else than you would think it might:

表示此收集器是 concurrent ,表示 结果容器可以支持累加器功能 从多个同一个结果容器中同时调用 线程.

Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

代替调用组合器,累加器被并发调用,所有线程使用相同的BigDecimal[] a.对a的访问不是原子的,因此会出错:

Instead of calling the combiner, the accumulator is being called concurrently, using the same BigDecimal[] a for all threads. The access to a is not atomic, so it goes wrong:

Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]

a[0] 7的值设置为10时.a[1]可能发生相同的情况,因此结果可能不一致.

Making the value of a[0] 7 when it should be 10. The same kind of thing can happen with a[1], so results can be inconsistent.

如果删除CONCURRENT特征,则会使用组合器.

If you remove the CONCURRENT characteristic, the combiner will get used instead.

这篇关于Java 8流组合器从未调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆