Partition a Java 8 Stream


Problem description


How can I implement a "partition" operation on a Java 8 Stream? By partition I mean dividing a stream into sub-streams of a given size. It would behave much like Guava's Iterators.partition() method, except that the partitions should be lazily evaluated Streams rather than Lists.

Solution

It's impossible to partition an arbitrary source stream into fixed-size batches, because this would break parallel processing. When processing in parallel, you may not know how many elements the first sub-task contains after the split, so you cannot create the partitions for the next sub-task until the first one is fully processed.
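
To make the splitting problem concrete, here is a minimal sketch that splits the spliterator of an ArrayList once and reports the sizes of the two parts. The class name SplitDemo and the helper splitSizes are made up for illustration. The split point is chosen by the source (in OpenJDK's ArrayList it lands at the midpoint), not by any batch size, so a boundary generally does not fall on a multiple of the batch size. For sources whose size is not known up front, such as a filtered stream, even these sizes would not be available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SplitDemo {
    // Hypothetical helper: splits an ArrayList spliterator once and returns
    // {size of the split-off prefix, size of the remainder}.
    static long[] splitSizes(int size) {
        List<Integer> source = IntStream.range(0, size).boxed()
                .collect(Collectors.toCollection(ArrayList::new));
        Spliterator<Integer> rest = source.spliterator();
        // trySplit() hands the leading part to a new spliterator;
        // it may return null when the source cannot be split any further
        Spliterator<Integer> prefix = rest.trySplit();
        if (prefix == null)
            return new long[] {0, rest.estimateSize()};
        return new long[] {prefix.estimateSize(), rest.estimateSize()};
    }

    public static void main(String[] args) {
        long[] sizes = splitSizes(10);
        // With a batch size of 3, a boundary that is not a multiple of 3
        // would cut a batch in two
        System.out.println(sizes[0] + " + " + sizes[1]);
    }
}
```

Each sub-task of a parallel stream works on one such part independently, which is why fixed-size batches cannot be formed without coordination between sub-tasks.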

However, it is possible to create a stream of partitions from a random-access List. Such a feature is available, for example, in my StreamEx library:

List<Type> input = Arrays.asList(...);

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);

Or, if you really want a stream of streams:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);

If you don't want to depend on third-party libraries, you can implement such an ofSubLists method manually:

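public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    // Index of the last chunk; (size - 1) / length cannot overflow,
    // unlike the common (size + length - 1) / length rounding
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1).mapToObj(
        // The last chunk ends at size, so (n + 1) * length is never computed for it
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}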

This implementation looks a little long, but it takes some corner cases into account, such as a list size close to Integer.MAX_VALUE.
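
As a quick check of both the ordinary case and the large-size corner case, the following sketch repeats the ofSubLists logic and runs it on a small list and on a virtual list of Integer.MAX_VALUE elements. The class name SubListsDemo and the helper hugeList are assumptions made for illustration; the virtual list stores nothing and simply returns each index as its element.

```java
import java.util.AbstractList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class SubListsDemo {
    // Same logic as the ofSubLists method above
    static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
        if (length <= 0)
            throw new IllegalArgumentException("length = " + length);
        int size = source.size();
        if (size <= 0)
            return Stream.empty();
        int fullChunks = (size - 1) / length;
        return IntStream.range(0, fullChunks + 1).mapToObj(
            n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
    }

    // Hypothetical helper: a read-only list of Integer.MAX_VALUE elements
    // that computes its elements on demand instead of storing them
    static List<Integer> hugeList() {
        return new AbstractList<Integer>() {
            @Override public Integer get(int index) { return index; }
            @Override public int size() { return Integer.MAX_VALUE; }
        };
    }

    public static void main(String[] args) {
        // Seven elements in chunks of three: the last chunk is shorter
        System.out.println(ofSubLists(Arrays.asList(0, 1, 2, 3, 4, 5, 6), 3)
                .collect(Collectors.toList())); // prints [[0, 1, 2], [3, 4, 5], [6]]

        // For the last chunk, (n + 1) * length would be 3_000_000_000 and overflow int;
        // the method uses size as the end index there instead
        System.out.println(ofSubLists(hugeList(), 1_000_000_000)
                .map(List::size)
                .collect(Collectors.toList())); // prints [1000000000, 1000000000, 147483647]
    }
}
```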


If you want a parallel-friendly solution for an unordered stream (so you don't care which stream elements end up together in a single batch), you can use a collector like this (thanks to @sibnick for the inspiration):

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
                   Collector<List<T>, A, R> downstream) {
    // Mutable state: the batch being filled plus the downstream accumulation
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    // Adds one element; a full batch is passed downstream and a new one is started
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        if(acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                // Merge the downstream results, then re-add the leftover
                // elements of acc2's incomplete batch to acc1
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for(T t : acc2.cur) accumulator.accept(acc1, t);
                return acc1;
            }, acc -> {
                // Flush the final, possibly shorter batch
                if(!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            }, Collector.Characteristics.UNORDERED);
}

Usage example:

List<List<Integer>> list = IntStream.range(0,20)
                                    .boxed().parallel()
                                    .collect(unorderedBatches(3, Collectors.toList()));

Result (one possible outcome; which elements share a batch depends on how the parallel stream is split):

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]

Such a collector is perfectly thread-safe and produces ordered batches for a sequential stream.
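
The behavior described above can be checked with a small self-contained sketch. The class name BatchDemo and the helper batches are made up for illustration, and the collector is repeated so that the example compiles on its own. For a sequential stream the batches come out in encounter order. For a parallel stream the grouping may differ, but leftovers are merged in the combiner, so every batch except at most one should still be full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BatchDemo {
    // Same collector as above, repeated to keep the example self-contained
    static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<List<T>, A, R> downstream) {
        class Acc {
            List<T> cur = new ArrayList<>();
            A acc = downstream.supplier().get();
        }
        BiConsumer<Acc, T> accumulator = (acc, t) -> {
            acc.cur.add(t);
            if (acc.cur.size() == batchSize) {
                downstream.accumulator().accept(acc.acc, acc.cur);
                acc.cur = new ArrayList<>();
            }
        };
        return Collector.of(Acc::new, accumulator,
                (acc1, acc2) -> {
                    acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                    for (T t : acc2.cur) accumulator.accept(acc1, t);
                    return acc1;
                }, acc -> {
                    if (!acc.cur.isEmpty())
                        downstream.accumulator().accept(acc.acc, acc.cur);
                    return downstream.finisher().apply(acc.acc);
                }, Collector.Characteristics.UNORDERED);
    }

    // Hypothetical helper: batches the numbers 0..n-1, sequentially or in parallel
    static List<List<Integer>> batches(boolean parallel, int n, int batchSize) {
        IntStream range = IntStream.range(0, n);
        if (parallel)
            range = range.parallel();
        return range.boxed().collect(unorderedBatches(batchSize, Collectors.toList()));
    }

    public static void main(String[] args) {
        // Sequential: batches follow the encounter order
        System.out.println(batches(false, 8, 3)); // prints [[0, 1, 2], [3, 4, 5], [6, 7]]

        // Parallel: contents vary, but the batch sizes are still 3 except for one leftover
        System.out.println(batches(true, 20, 3).stream()
                .map(List::size).sorted()
                .collect(Collectors.toList())); // prints [2, 3, 3, 3, 3, 3, 3]
    }
}
```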

If you want to apply an intermediate transformation to every batch, you can use the following version:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize, 
            Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}

For example, this way you can sum the numbers in every batch on the fly:

List<Integer> list = IntStream.range(0,20)
        .boxed().parallel()
        .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
            Collectors.toList()));
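
To see what the batch sums look like, here is a self-contained sketch that includes both overloads and a made-up helper batchSums (the class name BatchSumDemo is also an assumption). For a sequential stream the sums follow the encounter order. For a parallel stream the individual sums depend on how elements were grouped, but they should always add up to the same total.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BatchSumDemo {
    // Base collector, repeated from above to keep the example self-contained
    static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<List<T>, A, R> downstream) {
        class Acc {
            List<T> cur = new ArrayList<>();
            A acc = downstream.supplier().get();
        }
        BiConsumer<Acc, T> accumulator = (acc, t) -> {
            acc.cur.add(t);
            if (acc.cur.size() == batchSize) {
                downstream.accumulator().accept(acc.acc, acc.cur);
                acc.cur = new ArrayList<>();
            }
        };
        return Collector.of(Acc::new, accumulator,
                (acc1, acc2) -> {
                    acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                    for (T t : acc2.cur) accumulator.accept(acc1, t);
                    return acc1;
                }, acc -> {
                    if (!acc.cur.isEmpty())
                        downstream.accumulator().accept(acc.acc, acc.cur);
                    return downstream.finisher().apply(acc.acc);
                }, Collector.Characteristics.UNORDERED);
    }

    // Overload that reduces every batch with batchCollector before passing it downstream
    static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<T, AA, B> batchCollector,
            Collector<B, A, R> downstream) {
        return unorderedBatches(batchSize,
                Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
    }

    // Hypothetical helper: sums of batches of three over the numbers 0..19
    static List<Integer> batchSums(boolean parallel) {
        IntStream range = IntStream.range(0, 20);
        if (parallel)
            range = range.parallel();
        return range.boxed().collect(unorderedBatches(3,
                Collectors.summingInt(Integer::intValue), Collectors.toList()));
    }

    public static void main(String[] args) {
        // Sequential: 0+1+2, 3+4+5, ..., 18+19
        System.out.println(batchSums(false)); // prints [3, 12, 21, 30, 39, 48, 37]

        // Parallel: individual sums vary, the total is always 0 + 1 + ... + 19
        System.out.println(batchSums(true).stream()
                .mapToInt(Integer::intValue).sum()); // prints 190
    }
}
```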
