How to perform an outer join on two or more Streams

Question

In my application I use several Streams that provide Elements of the form ( ID, value ). An Element is defined by the following class:

static final class Element<T> implements Comparable<Element<T>> {
    final long id;
    final T value;

    Element(long id, T value) {
        this.id = id;
        this.value = value;
    }

    @Override
    public int compareTo(Element<T> o) {
        return Long.compare(id, o.id);
    }
}

My goal is to join two or more Streams by the Element's IDs (in each stream, the IDs are sorted and strictly monotonic), e.g.:

    Stream<Element> colour = Arrays.stream(new Element[]{new Element(1, "red"), new Element(2, "green"), new Element(4, "red"), new Element(6, "blue")});
    Stream<Element> length = Arrays.stream(new Element[]{new Element(2, 28), new Element(3, 9), new Element(4, 17), new Element(6, 11)});
    Stream<Element> mass = Arrays.stream(new Element[]{new Element(1, 87.9f), new Element(2, 21.0f), new Element(3, 107f)});

into a single Stream that contains Elements of the form ( ID, [T1, T2, T3] ):

    Stream<Element<Object[]>> allProps = joinStreams(colour, length, mass);

by applying a method like this:

public Stream<Element<Object[]>> joinStreams(Stream<Element>... streams) {
    return ...;
}

The resulting Stream should deliver a FULL OUTER JOIN, i.e. for the above example:

1, "red",   null, 87.9
2, "green", 28,   21.0
3, null,    9,    107
4, "red",   17,   null
6, "blue",  11,   null

Since my experience with Java's streaming API is quite basic so far, I normally use iterators for such tasks.

Is there an idiomatic (and efficient) way to perform this kind of join with Streams? Are there any utility libraries that I could use?

Side note: The example is simplified. The application receives the data from something like a column-oriented data store (no real DBMS) that is several gigabytes in size and does not fit easily into memory. There's also no built-in support for this kind of join operation.
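Because the IDs in every input are sorted and strictly increasing, the iterator-based approach mentioned in the question amounts to an N-way merge: keep the current head element of each input, emit a row for the smallest head ID, and advance only the inputs that contributed to that row. The following is a minimal single-threaded sketch of that idea, assuming a simplified Element with a long id and a single value; joinStreams here is an illustration, not a library API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SortedOuterJoin {

    /** Stand-in for the question's Element class (assumed shape: id plus one value). */
    static final class Element<T> {
        final long id;
        final T value;

        Element(long id, T value) {
            this.id = id;
            this.value = value;
        }
    }

    /**
     * Lazy full outer join of streams whose ids are sorted and strictly increasing.
     * Keeps one head element per input; each step emits a row for the smallest head id,
     * with null in the column of every input that lacks that id.
     */
    @SafeVarargs
    static Stream<Element<Object[]>> joinStreams(Stream<? extends Element<?>>... streams) {
        List<Iterator<? extends Element<?>>> inputs = new ArrayList<>();
        for (Stream<? extends Element<?>> s : streams) {
            inputs.add(s.iterator());
        }
        Element<?>[] heads = new Element<?>[inputs.size()];
        for (int i = 0; i < heads.length; i++) {
            heads[i] = inputs.get(i).hasNext() ? inputs.get(i).next() : null;
        }

        Iterator<Element<Object[]>> merged = new Iterator<Element<Object[]>>() {
            @Override
            public boolean hasNext() {
                return Arrays.stream(heads).anyMatch(h -> h != null);
            }

            @Override
            public Element<Object[]> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                long minId = Long.MAX_VALUE;
                for (Element<?> h : heads) {
                    if (h != null) {
                        minId = Math.min(minId, h.id);
                    }
                }
                Object[] row = new Object[heads.length];
                for (int i = 0; i < heads.length; i++) {
                    if (heads[i] != null && heads[i].id == minId) {
                        row[i] = heads[i].value;
                        // Advance only the inputs that contributed to this row
                        heads[i] = inputs.get(i).hasNext() ? inputs.get(i).next() : null;
                    }
                }
                return new Element<>(minId, row);
            }
        };

        Stream<Element<Object[]>> joined = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(merged, Spliterator.ORDERED | Spliterator.NONNULL), false);
        // Closing the joined stream closes every input stream
        for (Stream<?> s : streams) {
            joined = joined.onClose(s::close);
        }
        return joined;
    }

    public static void main(String[] args) {
        Stream<Element<String>> colour = Stream.of(new Element<>(1, "red"), new Element<>(2, "green"),
                new Element<>(4, "red"), new Element<>(6, "blue"));
        Stream<Element<Integer>> length = Stream.of(new Element<>(2, 28), new Element<>(3, 9),
                new Element<>(4, 17), new Element<>(6, 11));
        Stream<Element<Float>> mass = Stream.of(new Element<>(1, 87.9f), new Element<>(2, 21.0f),
                new Element<>(3, 107f));

        try (Stream<Element<Object[]>> allProps = joinStreams(colour, length, mass)) {
            allProps.forEach(e -> System.out.println(e.id + ", " + Arrays.toString(e.value)));
        }
    }
}
```

Running main prints one row per ID, matching the table above: `1, [red, null, 87.9]`, `2, [green, 28, 21.0]`, `3, [null, 9, 107.0]`, `4, [red, 17, null]`, `6, [blue, 11, null]`. Since it pulls from each input only on demand, it holds one head element per input and needs no extra threads; it does rely on the sortedness assumption, and unsorted input silently yields wrong rows.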

Answer

To build a full outer join of two streams, I use two blocking queues, one per input stream. A Filler class (a Runnable executed on its own thread) reads the elements of its stream and puts them into its queue; when the stream runs out of data, the Filler writes an end-of-stream marker to the queue. I then construct a spliterator by extending AbstractSpliterator. Its tryAdvance method takes the next value from the left and right queues (unless it is still holding one from the previous call) and compares their IDs: the value with the smaller ID is emitted on its own and the other is held for the next call, while two values with equal IDs are emitted together as one element. I use a variation of your Element class. See the following code:

import java.util.ArrayList;
import java.util.Collection;

public final class Element<T> implements Comparable<Element<T>> {
    final long id;
    final Collection<T> value;

    public Element(long id, T value) {
        this.id = id;
        // Order preserving
        this.value = new ArrayList<T>();
        this.value.add(value);
    }

    Element(long id, Element<T> e1, Element<T> e2) {
        this.id = id;
        this.value = new ArrayList<T>();
        add(e1);
        add(e2);
    }

    private void add(Element<T> e1) {
        if(e1 == null) {
            this.value.add(null);           
        } else {
            this.value.addAll(e1.value);
        }
    }

    /**
     * Used as End-of-Stream marker 
     */
    Element() {
        id = -1;
        value = null;
    }

    @Override
    public int compareTo(Element<T> o) {
        return Long.compare(id, o.id);
    }
}

Join implementation:

import java.util.Comparator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public  class OuterJoinSpliterator<T> extends Spliterators.AbstractSpliterator<Element<T>> {

    private final class Filler implements Runnable {
        private final Stream<Element<T>> stream;
        private final BlockingQueue<Element<T>> queue;

        private Filler(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) {
            this.stream = stream;
            this.queue = queue;
        }

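        @Override
        public void run() {
            try {
                // Copy the source stream into the bounded queue; put() blocks while the queue is full
                final java.util.Iterator<Element<T>> it = stream.iterator();
                while (it.hasNext()) {
                    queue.put(it.next());
                }
                // Tell the consumer that this side is exhausted
                queue.put(EOS);
            } catch (final InterruptedException e) {
                // Restore the interrupt flag and stop filling instead of silently dropping elements
                Thread.currentThread().interrupt();
            }
        }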
    }

    public final Element<T> EOS = new Element<T>();
    private final int queueSize;
    private final BlockingQueue<Element<T>> leftQueue;
    private final BlockingQueue<Element<T>> rightQueue;
    protected Element<T> leftValue;
    protected Element<T> rightValue;

    private OuterJoinSpliterator(long estSize, int characteristics, int queueSize,
            Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) {
        super(estSize, characteristics);
        this.queueSize = queueSize;
        leftQueue = createQueue();
        rightQueue = createQueue();
        createFillerThread(leftStream, leftQueue).start();
        createFillerThread(rightStream, rightQueue).start();
    }

    private Element<T> acceptBoth(long id, Element<T> left, Element<T> right) {
        return new Element<T>(id, left, right);
    }

    private final Element<T> acceptLeft(Element<T> left) {
        return acceptBoth(left.id, left, null);
    }

    private final Element<T> acceptRight(Element<T> right) {
        return acceptBoth(right.id, null, right);
    }

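    private final Thread createFillerThread(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) {
        final Thread thread = new Thread(new Filler(stream, queue));
        // A daemon thread does not keep the JVM alive if the joined stream is abandoned before its end
        thread.setDaemon(true);
        return thread;
    }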

    private final ArrayBlockingQueue<Element<T>> createQueue() {
        return new ArrayBlockingQueue<>(queueSize);
    }

    @Override
    public Comparator<? super Element<T>> getComparator() {
        return null;
    }

    private final boolean isFinished() {
        return leftValue == EOS && rightValue == EOS;
    }

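    @Override
    public final boolean tryAdvance(Consumer<? super Element<T>> action) {
        try {
            // Take the next element of each side unless one is still held from the previous call
            updateLeft();
            updateRight();

            if (isFinished()) {
                return false;
            }

            if (leftValue == EOS) {
                // Left side exhausted: emit the remaining right elements on their own
                action.accept(acceptRight(rightValue));
                rightValue = null;
            } else if (rightValue == EOS) {
                // Right side exhausted: emit the remaining left elements on their own
                action.accept(acceptLeft(leftValue));
                leftValue = null;
            } else {
                // compareTo only guarantees the sign of its result, so test the sign rather than -1/1
                final int cmp = leftValue.compareTo(rightValue);
                if (cmp < 0) {
                    action.accept(acceptLeft(leftValue));
                    leftValue = null;
                } else if (cmp > 0) {
                    action.accept(acceptRight(rightValue));
                    rightValue = null;
                } else {
                    action.accept(acceptBoth(leftValue.id, leftValue, rightValue));
                    leftValue = null;
                    rightValue = null;
                }
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }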

    private final void updateLeft() throws InterruptedException {
        if (leftValue == null) {
            leftValue = leftQueue.take();
        }
    }

    private final void updateRight() throws InterruptedException {
        if (rightValue == null) {
            rightValue = rightQueue.take();
        }
    }

    public static <T> Stream<Element<T>> join(long estSize, int characteristics, int queueSize, boolean parallel, Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) {
        Spliterator<Element<T>> spliterator = new OuterJoinSpliterator<>(estSize, characteristics, queueSize, leftStream, rightStream);
        return StreamSupport.stream(spliterator, parallel);
    }
}
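The spliterator above joins exactly two streams. To cover three or more inputs it can be chained with a common T (e.g. Object), as in join(..., join(..., colour, length), mass). In that case Element's add(null) pads a missing side with a single null even when that side already carries two columns, so an ID occurring only in mass would produce a two-value row instead of a three-value one. One possible fix, sketched below under the assumption that the caller knows each side's column count, is to pad with one null per missing column. Row and combine are hypothetical names, not part of the answer's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PaddedCombine {

    /** Hypothetical stand-in for the answer's Element: an id plus the column values joined so far. */
    static final class Row {
        final long id;
        final List<?> values;

        Row(long id, List<?> values) {
            this.id = id;
            this.values = values;
        }
    }

    /**
     * Combines the two sides of a pairwise outer join. A missing side (null)
     * contributes one null per column it would have carried, so chained joins
     * keep every row at leftWidth + rightWidth columns.
     */
    static Row combine(long id, Row left, int leftWidth, Row right, int rightWidth) {
        List<Object> values = new ArrayList<>(leftWidth + rightWidth);
        append(values, left, leftWidth);
        append(values, right, rightWidth);
        return new Row(id, values);
    }

    private static void append(List<Object> out, Row side, int width) {
        if (side == null) {
            out.addAll(Collections.nCopies(width, null));
        } else {
            out.addAll(side.values);
        }
    }

    public static void main(String[] args) {
        // Left = result of joining (colour, length): 2 columns. Right = mass: 1 column.
        Row both = combine(1, new Row(1, Arrays.asList("red", null)), 2,
                new Row(1, Arrays.asList(87.9f)), 1);
        // Hypothetical ID 5 that occurs only in the right-hand (mass) input
        Row rightOnly = combine(5, null, 2, new Row(5, Arrays.asList(3.5f)), 1);
        System.out.println(both.values);      // [red, null, 87.9]
        System.out.println(rightOnly.values); // [null, null, 3.5]
    }
}
```

Applied to the answer's classes, this would mean passing the column count of each side into the spliterator (or into Element's merging constructor) so that acceptLeft and acceptRight can pad correctly.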

You can use Long.MAX_VALUE as the estimated size, which by the Spliterator contract signals that the size is unknown. See the Spliterator interface for a description of the various characteristics (e.g. ORDERED, SORTED, NONNULL), and the Javadoc of AbstractSpliterator for additional information.
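A small sketch of how those constructor arguments fit together, assuming a source whose length is not known in advance. It passes Long.MAX_VALUE and deliberately omits SIZED. It also reports SORTED, and because Spliterator.getComparator() throws IllegalStateException by default, a spliterator reporting SORTED must override it; returning null, as the answer's getComparator does, means natural ordering. SortedIdSpliterator is a hypothetical toy source emitting the ids 1..n.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class UnknownSizeSpliteratorDemo {

    /** Hypothetical source of the ids 1..last, standing in for a sorted input of unknown size. */
    static final class SortedIdSpliterator extends Spliterators.AbstractSpliterator<Long> {
        private long next = 1;
        private final long last;

        SortedIdSpliterator(long last) {
            // Size unknown up front: pass Long.MAX_VALUE and do not report SIZED
            super(Long.MAX_VALUE,
                    Spliterator.ORDERED | Spliterator.SORTED | Spliterator.DISTINCT | Spliterator.NONNULL);
            this.last = last;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Long> action) {
            if (next > last) {
                return false;
            }
            action.accept(next++);
            return true;
        }

        // The inherited default always throws IllegalStateException; null means natural ordering
        @Override
        public Comparator<? super Long> getComparator() {
            return null;
        }
    }

    public static void main(String[] args) {
        SortedIdSpliterator ids = new SortedIdSpliterator(5);
        System.out.println(ids.getExactSizeIfKnown());                  // -1, because SIZED is not reported
        System.out.println(ids.hasCharacteristics(Spliterator.SORTED)); // true
        List<Long> all = StreamSupport.stream(ids, false).collect(Collectors.toList());
        System.out.println(all);                                        // [1, 2, 3, 4, 5]
    }
}
```

For the join, ORDERED and SORTED fit naturally because the output IDs are strictly increasing and Element's compareTo orders by ID.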
