FlatMap何时会同时收听多个来源? [英] When FlatMap will listen to multiple sources concurrently?

查看:146
本文介绍了FlatMap何时会同时收听多个来源?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在什么情况下导致Flux::flatMap同时收听多个来源(0 ...无穷大)?

What are the situations which cause Flux::flatMap to listen to multiple sources (0...infinity) concurrently?

我在实验中发现,当上游将信号发送到线程thread-upstream-1中的flatMap且有N内部流时,flatMap会监听这些流,并且每个内部流都以不同的方式发送信号/em>线程:thread-inner-stream-i表示1<=i<=N,如果thread-upstream-1 != thread-inner-stream-iflatMap会同时同时收听所有内部流,则每个1<=i<=N.

I found out, while experimenting, that when the upstream send signals to flatMap in thread thread-upstream-1 and there are N inner streams which flatMap will listen to and each of them send signals in different thread: thread-inner-stream-i for 1<=i<=N, than for every 1<=i<=N if thread-upstream-1 != thread-inner-stream-i, flatMap will listen concurrently to all the inner streams.

我认为这并非完全正确,我错过了其他一些情况.

I think that it's not exactly true and I missed some other scenarios.

推荐答案

flatMap不执行任何并行工作,例如:它不会更改线程.最简单的例子是

flatMap doesn't do any parallel work, as in: it doesn't change threads. The simplest example is

Flux.range(1, 5).hide()
    .flatMap(v -> Flux.range(10 * v, 2))
    .log()
    .blockLast(); //for test purpose

此打印:

[main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO  reactor.Flux.FlatMap.1 - onComplete()

如您所见,仅在main中产生.如果在初始范围后添加publishOn,则flatMap会在publishOn切换到的同一单线程中产生所有内容.

As you can see, only produces in main. If you add a publishOn after the initial range, flatMap produces everything in the same single thread publishOn will switch to.

flatMap的作用是预订多个内部Publisher,最高为concurrency参数,默认值为Queues.SMALL_BUFFER_SIZE(256).

What flatMap does however is subscribe to multiple inner Publisher, up to the concurrency parameter with a default of Queues.SMALL_BUFFER_SIZE (256).

这意味着,如果将其设置为3,则flatMap会将3个源元素映射到其内部的Publisher并订阅这些发布者,但将等待至少一个完成,然后再开始映射更多源元素.

That means that if you set it to 3, flatMap will map 3 source elements to their inner Publisher and subscribe to these publishers, but will wait for at least one to complete before it starts mapping more source elements.

如果内部Publisher使用publishOnsubscribeOn,则flatMap自然会让它们的事件发生在随后定义的线程中:

If the inner Publisher use publishOn or subscribeOn, then flatMap will naturally let their events occur in the then-defined threads:

Flux.range(1, 5).hide()
    .flatMap(v -> Flux.range(v * 10, 2)
                      .publishOn(Schedulers.newParallel("foo", 3)))
    .flatMap(v -> Flux.range(10 * v, 2))
    .log()
    .blockLast(); //for test purpose

哪些印刷品:

[main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onComplete()

这篇关于FlatMap何时会同时收听多个来源?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆