如何使用Reactor框架2.x执行多线程map / reduce? [英] How to perform multi-threaded map/reduce using Reactor framework 2.x?

查看:202
本文介绍了如何使用Reactor框架2.x执行多线程map / reduce?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我以前曾询问此问题< a> for Reactor 1.x:


假设我有一个 Collection< Map> 。我想:



将每个映射实例转换为 Foo 并发(每个实例完全独立于另一个实例 - 不需要连续/迭代地转换每个实例)。



当所有实例都转换时,想要aa方法, onReduce(Collection< Foo> foos),被调用 - 参数包含所有的结果 Foo 实例。


但是我们似乎找不到Reactor 2.x的等效解决方案 - >

如何在Reactor 2.x中执行多线程map / reduce?例如,你如何使用基于ExecutorService的Dispatcher来做这件事?

你可以这样做:

  List< Map< String,Object> data = readData(); //< 1> 

Streams.from(data)
.flatMap(m - > Streams.just(m)
.dispatchOn(Environment.cachedDispatcher())//< 2>
.map(ignored - > Thread.currentThread()。getName()))
.buffer()//< 3>
.consume(s - > System.out.println(s:+ s)); //< 4>




  1. 创建 > c>

  2. 缓冲所有值,直到完成,这将在收集被清空时发送到下游

  3. 使用来自子流的负载平衡变换的结果的列表。


I previously asked this question for Reactor 1.x:

Let's say I have a Collection<Map>. I want to:

Transform each Map instance to an object of type Foo concurrently (each instance is totally independent of another - there is no need to convert each serially/iteratively).

When all of them are converted, I want a a method, onReduce(Collection<Foo> foos), to be called - the argument contains all of the resulting Foo instances.

But we can't seem to find an equivalent solution for Reactor 2.x - just single threaded.

How do you perform multi-threaded map/reduce in Reactor 2.x? For example, how might you do this with an ExecutorService-based Dispatcher?

解决方案

It's actually pretty easy now with Reactor 2.0. You could do something like this:

List<Map<String, Object>> data = readData(); // <1>

Streams.from(data)
       .flatMap(m -> Streams.just(m)
                            .dispatchOn(Environment.cachedDispatcher()) // <2>
                            .map(ignored -> Thread.currentThread().getName()))
       .buffer() // <3>
       .consume(s -> System.out.println("s: " + s)); // <4>

  1. Create a Stream based on the input data.
  2. Create a new Stream for each Map and dispatch map operations on the given Dispatcher.
  3. Buffer all values until complete, which will be sent downstream when collection is emptied.
  4. Consume List which is the result of load-balanced transforms from the substream.

这篇关于如何使用Reactor框架2.x执行多线程map / reduce?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆