如何使用Reactor框架2.x执行多线程map / reduce? [英] How to perform multi-threaded map/reduce using Reactor framework 2.x?
问题描述
我以前曾询问此问题< a> for Reactor 1.x:
假设我有一个
Collection< Map>
。我想:
将每个
映射
实例转换为Foo
并发(每个实例完全独立于另一个实例 - 不需要连续/迭代地转换每个实例)。
当所有实例都转换时,想要aa方法,
onReduce(Collection< Foo> foos)
,被调用 - 参数包含所有的结果Foo
实例。
但是我们似乎找不到Reactor 2.x的等效解决方案 - >
如何在Reactor 2.x中执行多线程map / reduce?例如,你如何使用基于ExecutorService的Dispatcher来做这件事?
你可以这样做:
List< Map< String,Object> data = readData(); //< 1>
Streams.from(data)
.flatMap(m - > Streams.just(m)
.dispatchOn(Environment.cachedDispatcher())//< 2>
.map(ignored - > Thread.currentThread()。getName()))
.buffer()//< 3>
.consume(s - > System.out.println(s:+ s)); //< 4>
- 创建
流$ c $ > c>
- 缓冲所有值,直到完成,这将在收集被清空时发送到下游
- 使用来自子流的负载平衡变换的结果的列表。
I previously asked this question for Reactor 1.x:
Let's say I have a
Collection<Map>
. I want to:Transform each
Map
instance to an object of typeFoo
concurrently (each instance is totally independent of another - there is no need to convert each serially/iteratively).When all of them are converted, I want a a method,
onReduce(Collection<Foo> foos)
, to be called - the argument contains all of the resultingFoo
instances.
But we can't seem to find an equivalent solution for Reactor 2.x - just single threaded.
How do you perform multi-threaded map/reduce in Reactor 2.x? For example, how might you do this with an ExecutorService-based Dispatcher?
It's actually pretty easy now with Reactor 2.0. You could do something like this:
List<Map<String, Object>> data = readData(); // <1>
Streams.from(data)
.flatMap(m -> Streams.just(m)
.dispatchOn(Environment.cachedDispatcher()) // <2>
.map(ignored -> Thread.currentThread().getName()))
.buffer() // <3>
.consume(s -> System.out.println("s: " + s)); // <4>
- Create a
Stream
based on the input data. - Create a new
Stream
for eachMap
and dispatch map operations on the givenDispatcher
. - Buffer all values until complete, which will be sent downstream when collection is emptied.
- Consume List which is the result of load-balanced transforms from the substream.
这篇关于如何使用Reactor框架2.x执行多线程map / reduce?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!