One upstream stream feeding multiple downstream streams


Problem Description


I have a general Streams API problem I'd like to solve "efficiently". Suppose I have a (possibly very large, possibly infinite) stream. I want to pre-process it in some way, for example, filtering out some items, and mutating some. Let's assume that this pre-processing is complex, time and compute intensive, so I do not want to do it twice.


Next I want to do two distinct sets of operations to the item sequence and process the far end of each distinct sequence with a different stream-type construction. For an infinite stream, this would be a forEach, for a finite one, it might be a collector or whatever.


Clearly, I might collect the intermediate results into a list, then drag two separate streams off that list, processing each stream individually. That would work for a finite stream, though a) it seems "ugly" and b) it's potentially impractical for a very large stream, and flat out won't work for an infinite stream.


I guess I could use peek as a kind of "tee". I could then perform one chain of processing on the results downstream of peek and somehow coerce the Consumer in peek to do the "other" work, but now that second path is no longer a stream.


I have discovered that I can create a BlockingQueue, use peek to push things into that queue, and then obtain a stream from the queue. This seems to be a fine idea and actually works quite well, though I fail to understand how the stream gets closed (it actually does, but I cannot see how). Here's sample code illustrating this:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

List<Student> ls = Arrays.asList(
  new Student("Fred", 2.3F)
  // more students (and Student definition) elided ...
);

BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();

ls.stream()
  .peek(s -> {
     try {
       pipe.put(s);
     } catch (InterruptedException ie) {
       ie.printStackTrace();
     }
   })
  .forEach(System.out::println);

new Thread(
  new Runnable() {
    public void run() {
      Map<String, Double> map =
        pipe.stream()
          .collect(Collectors.groupingBy(s -> s.getName(),
                   Collectors.averagingDouble(s -> s.getGpa())));
      map.forEach(
        (k, v) ->
          System.out.println(
            "Students called " + k
            + " average " + v));
    }
  }).start();


So, the first question is: is there a "better" way to do this?


Second question, how the heck is that stream on the BlockingQueue getting closed?

Cheers,
Toby

Answer


Interesting problem. I'll answer the second question first, since it's a simpler issue.



Second question, how the heck is that stream on the BlockingQueue getting closed?


By "closed" I think you mean, the stream has certain number of elements and then it finishes, disregarding any elements that may be added to the queue in the future. The reason is that a stream on a queue represents only the current contents of a queue as of the time the stream is created. It doesn't represent any future elements, that is, those that some other thread might add in the future.
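A tiny sketch (not from the question) makes this concrete: a stream obtained from the queue traverses what the queue held at traversal time, and an element added after the traversal completes is simply never seen:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SnapshotDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");

        // count() traverses the queue's contents as they are now
        long n = queue.stream().count();

        queue.add("c");  // added later; the finished stream never sees it

        System.out.println(n);  // prints 2, not 3
    }
}
```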


If you want a stream that represents the current and future contents of the queue, then you can use the technique described in this other answer. Basically use Stream.generate() to call queue.take(). I don't think this is what you want to do, though, so I won't discuss it further here.
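For completeness, here's a sketch of that technique. The interrupt-handling policy and the helper name are my own; the `limit()` call is only there to make the otherwise-infinite demo finite:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FutureStreamDemo {
    // A stream over current *and* future queue contents: each element is
    // produced by a blocking take(), so the stream never ends on its own.
    static <T> Stream<T> streamOf(BlockingQueue<T> queue) {
        return Stream.generate(() -> {
            try {
                return queue.take();  // blocks until an element is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(List.of(1, 2, 3));
        // limit() makes the infinite stream finite for the demo only
        List<Integer> seen = streamOf(queue).limit(3).collect(Collectors.toList());
        System.out.println(seen);  // [1, 2, 3]
    }
}
```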

Now for the bigger question.


You have a source of objects upon which you want to do some processing, including filtering. You then want to take the results and send them through two different downstream processing steps. Essentially you have a single producer and two consumers.


One of the fundamental issues you have to handle is how to deal with the cases where the different processing steps occur at different rates. Suppose that we've solved the issue of how to get a stream from the queue without the stream terminating prematurely. If the producer can produce elements faster than the consumer can process elements from this queue, the queue will accumulate elements until it fills all available memory.


You also have to decide how to deal with the different consumers processing elements at different rates. If one consumer is significantly slower than the other, either an arbitrary number of elements may need to be buffered (which might fill memory) or the faster consumer will have to be slowed down to match the average rate of the slower consumer.


Let me toss out a sketch of how you might proceed. I don't know your actual requirements, though, so I have no idea whether this will be satisfactory. One thing to note is that using parallel streams with this kind of application can be problematic, since parallel streams don't deal with blocking and load-balancing very well.


First, I'd start off with a stream processing elements from the producer and accumulating them into an ArrayBlockingQueue:

BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
producer.map(...)
        .filter(...)
        .forEach(queue::put);


(Note that put throws InterruptedException, so you can't just put queue::put here. You have to put a try-catch block here, or write a helper method instead. But it's not obvious what to do if InterruptedException is caught.)
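One common shape for such a helper is sketched below. The restore-interrupt-and-rethrow policy is an assumption on my part, not something the answer prescribes; as the answer says, the right response to interruption depends on the application:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;

public class PutHelper {
    // Adapts BlockingQueue.put, which throws the checked
    // InterruptedException, for use in a Consumer lambda.
    static <T> void put(BlockingQueue<T> queue, T element) {
        try {
            queue.put(element);  // blocks while the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve interrupt status
            throw new IllegalStateException("interrupted during put", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        Stream.of("fred", "jim")
              .map(String::toUpperCase)
              .forEach(s -> put(queue, s));
        System.out.println(queue);  // [FRED, JIM]
    }
}
```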


If the queue fills up, this will block the pipeline. Either run this sequentially in its own thread, or if in parallel, in a dedicated thread pool, to avoid blocking up the common pool.

Next, the consumers:

while (true) {
    // wait until the queue is full, or a timeout has expired,
    // depending upon how frequently you want to continue
    // processing elements emitted by the producer
    List<T> list = new ArrayList<>();
    queue.drainTo(list);
    downstream1 = list.stream().filter(...).map(...).collect(...);
    downstream2 = list.stream().filter(...).map(...).collect(...);
    // deal with results downstream1 and downstream2
}


The key here is that the handoff from the producer to the consumers is done in batches with the drainTo method, which adds the elements of the queue to the destination and atomically empties the queue. This way, the consumers don't have to wait for the producer to finish its processing (which won't happen if it's infinite). In addition, the consumers are operating on a known quantity of data and won't block in the midst of processing. Each consumer stream could thus conceivably be run in parallel, if that's helpful.
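A runnable single-batch version of that loop, with the element type and the two consumer pipelines made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DrainDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);
        for (int i = 1; i <= 6; i++) {
            queue.add(i);  // stand-in for the producer pipeline
        }

        // One batch: atomically move the queue's current contents into a
        // local list, leaving the queue empty for the producer to refill.
        List<Integer> batch = new ArrayList<>();
        queue.drainTo(batch);

        // Two independent consumers over the same known-size batch.
        int evenSum = batch.stream().filter(n -> n % 2 == 0)
                           .mapToInt(Integer::intValue).sum();
        long oddCount = batch.stream().filter(n -> n % 2 != 0).count();

        System.out.println(evenSum + " " + oddCount);  // 12 3
        System.out.println(queue.isEmpty());           // true
    }
}
```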


Here, I have the consumers running in lockstep. If you want the consumers to run at different rates, you'll have to construct additional queues (or something) to buffer up their workloads independently.


If the consumers are overall slower than the producer, the queue will eventually fill up and be blocked, slowing down the producer to the rate the consumers can accept. If the consumers are faster than the producer on average, then maybe you don't need to worry about the relative processing rates of the consumers. You can just have them loop and pick up whatever the producer has managed to put into the queue, or even have them block until something is available.


I should say that what I've outlined is a very simplistic approach to multi-stage pipelining. If your application is performance critical, you may find yourself doing a lot of work tuning memory consumption, load balancing, increasing throughput and reducing latency. There are other frameworks out there that may be more amenable to your application. You might take a look at the LMAX Disruptor, for example, though I don't have any experience with it myself.

