通过鉴别器功能对流进行分区 [英] Partition a Stream by a discriminator function

查看:92
本文介绍了通过鉴别器功能对流进行分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Streams API中缺少的功能之一是分区依据转换,例如 Clojure的。假设我想重现Hibernate的 fetch join :我想发出一个SQL SELECT语句来从结果中接收这种对象:

  class Family {
String surname;
List< String>成员;
}

我发行:



< pre class =lang-sql prettyprint-override> SELECT f.name,m.name
FROM Family f JOIN m.family_id上的成员m = f.id
ORDER BY f.name

我检索的平面流(f.name, m.name)记录。现在我需要将它转换为 Family 对象的流,并在其中包含其成员列表。假设我已经有一个 Stream< ResultRow> ;现在我需要将它转换为 Stream< List< ResultRow>> ,然后使用映射转换对其进行操作,将其转换为 Stream< ; Family>



转换的语义如下:继续将流收集到列表中只要提供的鉴别器功能保持返回相同的值;一旦值发生变化,将 List 作为输出流的元素发出,并开始收集新的 List



我希望能够编写这种代码(我已经有了 resultStream 方法):

  Stream< ResultRow> dbStream = resultStream(queryBuilder.createQuery(
SELECT f.name,m.name
+FROM Family f JOIN会员m在m.family_id = f.id
+ORDER BY f.name));
Stream< List< ResultRow> partitioned = partitionBy(r - > r.string(0),dbStream);
Stream< Family> = partitioned.map(rs - > {
Family f = new Family(rs.get(0).string(0));
f.members = rs.stream()。map(r - > r.string(1))。collect(toList());
return f;
});

毋庸置疑,我希望结果流保持懒惰(非物化),因为我想要能够处理任何大小的结果集而不会达到任何O(n)内存限制。如果没有这个关键要求,我会对提供的 groupingBy 收藏家感到满意。

解决方案

该解决方案要求我们定义一个自定义 Spliterator ,它可用于构造分区流。我们需要通过自己的spliterator访问输入流并将其包装到我们的。然后从我们的自定义分裂器构造输出流。



以下Spliterator会将任何 Stream< E> 转换为a 流< List< E>> 提供功能< E,?> 作为鉴别器功能。请注意,必须为此操作订购输入流才有意义。

 公共类PartitionBySpliterator< E>扩展AbstractSpliterator< List< E>> {
private final Spliterator< E> spliterator;
私人最终功能<?超级E,?> partitionBy;
private HoldingConsumer< E>持有人;
private Comparator< List< E>>比较;

public PartitionBySpliterator(Spliterator< E> toWrap,Function<?super E,?> partitionBy){
super(Long.MAX_VALUE,toWrap.characteristics()& ~SIZED | NONNULL );
this.spliterator = toWrap;
this.partitionBy = partitionBy;
}

public static< E>流<列表与LT E - 代替;> partitionBy(Function< E,?> partitionBy,Stream< E> in){
return StreamSupport.stream(new PartitionBySpliterator<>(in.spliterator(),partitionBy),false);
}

@Override public boolean tryAdvance(Consumer<?super list< E>> action){
final HoldingConsumer< E> H;
if(holder == null){
h = new HoldingConsumer<>();
if(!spliterator.tryAdvance(h))返回false;
持有人= h;
}
其他h =持有人;
final ArrayList< E> partition = new ArrayList<>();
final Object partitionKey = partitionBy.apply(h.value);
boolean didAdvance;
do partition.add(h.value);
while((didAdvance = spliterator.tryAdvance(h))
&& Objects.equals(partitionBy.apply(h.value),partitionKey));
if(!didAdvance)holder = null;
action.accept(分区);
返回true;
}

静态最终类HoldConsumer< T>实现Consumer< T> {
T值;
@Override public void accept(T value){this.value = value; }
}

@Override public Comparator<?超级列表< E>> getComparator(){
final Comparator< List< E>> c = this.comparator;
返回c!= null? c:(this.comparator = comparator());
}

private Comparator< List< E>> comparator(){
@SuppressWarnings({unchecked,rawtypes})
final Comparator<?超级E> innerComparator =
Optional.ofNullable(spliterator.getComparator())
.orElse((Comparator)naturalOrder());
返回(左,右) - > {
final int c = innerComparator.compare(left.get(0),right.get(0));
返回c!= 0? c:innerComparator.compare(
left.get(left.size() - 1),right.get(right.size() - 1));
};
}
}


One of the missing features in the Streams API is the "partition by" transformation, for example as defined in Clojure. Say I want to reproduce Hibernate's fetch join: I want to issue a single SQL SELECT statement to receive this kind of objects from the result:

class Family {
   String surname;
   List<String> members;
}

I issue:

SELECT f.name, m.name 
FROM Family f JOIN Member m on m.family_id = f.id
ORDER BY f.name

and I retrieve a flat stream of (f.name, m.name) records. Now I need to transform it into a stream of Family objects, with a list of its members inside. Assume I already have a Stream<ResultRow>; now I need to transform it into a Stream<List<ResultRow>> and then act upon that with a mapping transformation which turns it into a Stream<Family>.

The semantics of the transformation are as follows: keep collecting the stream into a List for as long as the provided discriminator function keeps returning the same value; as soon as the value changes, emit the List as an element of the output stream and start collecting a new List.

I hope to be able to write this kind of code (I already have the resultStream method):

Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
        "SELECT f.name, m.name"
      + " FROM Family f JOIN Member m on m.family_id = f.id"
      + " ORDER BY f.name"));
Stream<List<ResultRow> partitioned = partitionBy(r -> r.string(0), dbStream);
Stream<Family> = partitioned.map(rs -> {
                    Family f = new Family(rs.get(0).string(0));
                    f.members = rs.stream().map(r -> r.string(1)).collect(toList());
                    return f;
                 });

Needless to say, I expect the resulting stream to stay lazy (non-materialized) as I want to be able to process a result set of any size without hitting any O(n) memory limits. Without this crucial requirement I would be happy with the provided groupingBy collector.

解决方案

The solution requires us to define a custom Spliterator which can be used to construct the partitioned stream. We shall need to access the input stream through its own spliterator and wrap it into ours. The output stream is then constructed from our custom spliterator.

The following Spliterator will turn any Stream<E> into a Stream<List<E>> provided a Function<E, ?> as the discriminator function. Note that the input stream must be ordered for this operation to make sense.

public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
  private final Spliterator<E> spliterator;
  private final Function<? super E, ?> partitionBy;
  private HoldingConsumer<E> holder;
  private Comparator<List<E>> comparator;

  public PartitionBySpliterator(Spliterator<E> toWrap, Function<? super E, ?> partitionBy) {
    super(Long.MAX_VALUE, toWrap.characteristics() & ~SIZED | NONNULL);
    this.spliterator = toWrap;
    this.partitionBy = partitionBy;
  }

  public static <E> Stream<List<E>> partitionBy(Function<E, ?> partitionBy, Stream<E> in) {
    return StreamSupport.stream(new PartitionBySpliterator<>(in.spliterator(), partitionBy), false);
  }

  @Override public boolean tryAdvance(Consumer<? super List<E>> action) {
    final HoldingConsumer<E> h;
    if (holder == null) {
      h = new HoldingConsumer<>();
      if (!spliterator.tryAdvance(h)) return false;
      holder = h;
    }
    else h = holder;
    final ArrayList<E> partition = new ArrayList<>();
    final Object partitionKey = partitionBy.apply(h.value);
    boolean didAdvance;
    do partition.add(h.value);
    while ((didAdvance = spliterator.tryAdvance(h))
        && Objects.equals(partitionBy.apply(h.value), partitionKey));
    if (!didAdvance) holder = null;
    action.accept(partition);
    return true;
  }

  static final class HoldingConsumer<T> implements Consumer<T> {
    T value;
    @Override public void accept(T value) { this.value = value; }
  }

  @Override public Comparator<? super List<E>> getComparator() {
    final Comparator<List<E>> c = this.comparator;
    return c != null? c : (this.comparator = comparator());
  }

  private Comparator<List<E>> comparator() {
    @SuppressWarnings({"unchecked","rawtypes"})
    final Comparator<? super E> innerComparator =
        Optional.ofNullable(spliterator.getComparator())
                .orElse((Comparator) naturalOrder());
    return (left, right) -> {
      final int c = innerComparator.compare(left.get(0), right.get(0));
      return c != 0? c : innerComparator.compare(
          left.get(left.size() - 1), right.get(right.size() - 1));
    };
  }
}

这篇关于通过鉴别器功能对流进行分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆