Is there a good way to extract chunks of data from a Java 8 stream?


Question

In an ETL process I'm retrieving a lot of entities from a Spring Data Repository. I'm then using a parallel stream to map the entities to different ones. I can either use a consumer to store those new entities in another repository one by one, or collect them into a List and store that in a single bulk operation. The first is costly, while the latter might exceed the available memory.

Is there a good way to collect a certain number of elements in the stream (like limit does), consume that chunk, and keep on going in parallel until all elements are processed?

Recommended answer

My approach to bulk operations with chunking is to use a partitioning spliterator wrapper, plus another wrapper that overrides the default splitting policy (an arithmetic progression of batch sizes in increments of 1024) with simple fixed-batch splitting. Use it like this:

Stream<OriginalType> existingStream = ...;
Stream<List<OriginalType>> partitioned = partition(existingStream, 100, 1);
partitioned.forEach(chunk -> ... process the chunk ...);
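
The default splitting policy mentioned above can be observed directly. The following is a minimal sketch, not part of the answer's code: `DefaultSplitDemo`, `counting` and `splitSizes` are hypothetical names introduced for illustration. It builds a plain `AbstractSpliterator` of unknown size, calls `trySplit()` until it returns null, and counts the elements in each split-off batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;

// Hypothetical demo class; not part of the answer's code.
class DefaultSplitDemo {

  /** Emits 0..total-1 and reports an unknown size (Long.MAX_VALUE). */
  static Spliterator<Integer> counting(int total) {
    return new AbstractSpliterator<Integer>(Long.MAX_VALUE, Spliterator.ORDERED) {
      private int next = 0;

      @Override public boolean tryAdvance(Consumer<? super Integer> action) {
        if (next >= total) return false;
        action.accept(next++);
        return true;
      }
    };
  }

  /** Splits repeatedly and counts how many elements each split-off batch holds. */
  static List<Long> splitSizes(int total) {
    Spliterator<Integer> source = counting(total);
    List<Long> sizes = new ArrayList<>();
    Spliterator<Integer> batch;
    while ((batch = source.trySplit()) != null) {
      long[] count = {0};
      batch.forEachRemaining(x -> count[0]++);
      sizes.add(count[0]);
    }
    return sizes;
  }

  public static void main(String[] args) {
    // Batches grow by 1024 each time; the last one holds whatever is left.
    System.out.println(splitSizes(5000)); // [1024, 2048, 1928]
  }
}
```

With chunks of 100 entities each, a first batch of 1024 chunks would pull over 100,000 entities into memory before any parallel work starts, which is presumably why the usage above passes a batch size of 1.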

The full code follows:

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PartitioningSpliterator<E> extends AbstractSpliterator<List<E>>
{
  private final Spliterator<E> spliterator;
  private final int partitionSize;

  public PartitioningSpliterator(Spliterator<E> toWrap, int partitionSize) {
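    // Drop SORTED: it describes the wrapped elements, not the lists built from them, and this
    // class does not override getComparator(), whose default throws IllegalStateException.
    super(toWrap.estimateSize(),
        (toWrap.characteristics() & ~Spliterator.SORTED) | Spliterator.NONNULL);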
    if (partitionSize <= 0) throw new IllegalArgumentException(
        "Partition size must be positive, but was " + partitionSize);
    this.spliterator = toWrap;
    this.partitionSize = partitionSize;
  }

  public static <E> Stream<List<E>> partition(Stream<E> in, int size) {
    return StreamSupport.stream(new PartitioningSpliterator<>(in.spliterator(), size), false);
  }

  public static <E> Stream<List<E>> partition(Stream<E> in, int size, int batchSize) {
    return StreamSupport.stream(
        new FixedBatchSpliterator<>(new PartitioningSpliterator<>(in.spliterator(), size), batchSize), false);
  }

  @Override public boolean tryAdvance(Consumer<? super List<E>> action) {
    final ArrayList<E> partition = new ArrayList<>(partitionSize);
    // Keep pulling elements until the partition is full or the source is exhausted.
    while (spliterator.tryAdvance(partition::add)
           && partition.size() < partitionSize) { }
    if (partition.isEmpty()) return false;
    action.accept(partition);
    return true;
  }

  @Override public long estimateSize() {
    final long est = spliterator.estimateSize();
    return est == Long.MAX_VALUE? est
         : est / partitionSize + (est % partitionSize > 0? 1 : 0);
  }
}
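
To make the output of the chunking loop concrete, here is a condensed, sequential-only sketch of the same idea. It is an illustration under stated assumptions rather than the answer's class: `ChunkDemo` and `chunks` are hypothetical names, and the size estimate is simply left unknown instead of being computed as above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class: a condensed, sequential-only variant of the chunking loop.
class ChunkDemo {

  static <E> Stream<List<E>> chunks(Stream<E> in, int size) {
    if (size <= 0) throw new IllegalArgumentException("size must be positive, but was " + size);
    Spliterator<E> source = in.spliterator();
    return StreamSupport.stream(
        new AbstractSpliterator<List<E>>(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
          @Override public boolean tryAdvance(Consumer<? super List<E>> action) {
            List<E> chunk = new ArrayList<>(size);
            // Pull from the source until the chunk is full or the source is exhausted.
            while (chunk.size() < size && source.tryAdvance(chunk::add)) { }
            if (chunk.isEmpty()) return false; // nothing left to emit
            action.accept(chunk);
            return true;
          }
        }, false);
  }

  public static void main(String[] args) {
    List<List<Integer>> result =
        chunks(IntStream.range(0, 10).boxed(), 4).collect(Collectors.toList());
    System.out.println(result); // [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
  }
}
```

The last chunk may be shorter than the requested size, so a consumer of the chunked stream should not assume every list is full.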


import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
    characteristics |= ORDERED;
    if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
    this.characteristics = characteristics;
    this.batchSize = batchSize;
    this.est = est;
  }
  public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
    this(characteristics, batchSize, Long.MAX_VALUE);
  }
  public FixedBatchSpliteratorBase(int characteristics) {
    this(characteristics, 64, Long.MAX_VALUE);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}


import static java.util.stream.StreamSupport.stream;

import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> {
  private final Spliterator<T> spliterator;

  public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize, long est) {
    super(toWrap.characteristics(), batchSize, est);
    this.spliterator = toWrap;
  }
  public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, batchSize, toWrap.estimateSize());
  }
  public FixedBatchSpliterator(Spliterator<T> toWrap) {
    this(toWrap, 64, toWrap.estimateSize());
  }

  public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true);
  }

  public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) {
    return new FixedBatchSpliterator<>(toWrap, batchSize);
  }

  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
}
