FlatMap何时会同时收听多个来源? [英] When FlatMap will listen to multiple sources concurrently?
问题描述
在什么情况下导致Flux::flatMap
同时收听多个来源(0 ...无穷大)?
What are the situations which cause Flux::flatMap
to listen to multiple sources (0...infinity) concurrently?
我在实验中发现,当上游将信号发送到线程thread-upstream-1
中的flatMap
且有N
内部流时,flatMap会监听这些流,并且每个内部流都以不同的方式发送信号/em>线程:thread-inner-stream-i
表示1<=i<=N
,如果thread-upstream-1 != thread-inner-stream-i
,flatMap
会同时同时收听所有内部流,则每个1<=i<=N
.
I found out, while experimenting, that when the upstream send signals to flatMap
in thread thread-upstream-1
and there are N
inner streams which flatMap will listen to and each of them send signals in different thread: thread-inner-stream-i
for 1<=i<=N
, than for every 1<=i<=N
if thread-upstream-1 != thread-inner-stream-i
, flatMap
will listen concurrently to all the inner streams.
我认为这并非完全正确,我错过了其他一些情况.
I think that it's not exactly true and I missed some other scenarios.
推荐答案
flatMap
不执行任何并行工作,例如:它不会更改线程.最简单的例子是
flatMap
doesn't do any parallel work, as in: it doesn't change threads. The simplest example is
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
此打印:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO reactor.Flux.FlatMap.1 - onComplete()
如您所见,仅在main
中产生.如果在初始范围后添加publishOn
,则flatMap
会在publishOn切换到的同一单线程中产生所有内容.
As you can see, only produces in main
. If you add a publishOn
after the initial range, flatMap
produces everything in the same single thread publishOn will switch to.
flatMap
的作用是预订多个内部Publisher
,最高为concurrency
参数,默认值为Queues.SMALL_BUFFER_SIZE
(256).
What flatMap
does however is subscribe to multiple inner Publisher
, up to the concurrency
parameter with a default of Queues.SMALL_BUFFER_SIZE
(256).
这意味着,如果将其设置为3
,则flatMap
会将3个源元素映射到其内部的Publisher
并订阅这些发布者,但将等待至少一个完成,然后再开始映射更多源元素.
That means that if you set it to 3
, flatMap
will map 3 source elements to their inner Publisher
and subscribe to these publishers, but will wait for at least one to complete before it starts mapping more source elements.
如果内部Publisher
使用publishOn
或subscribeOn
,则flatMap
自然会让它们的事件发生在随后定义的线程中:
If the inner Publisher
use publishOn
or subscribeOn
, then flatMap
will naturally let their events occur in the then-defined threads:
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(v * 10, 2)
.publishOn(Schedulers.newParallel("foo", 3)))
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
哪些印刷品:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO reactor.Flux.FlatMap.1 - onComplete()
这篇关于FlatMap何时会同时收听多个来源?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!