在通量上同时使用publishOn和subscribeOn不会导致任何事情发生 [英] Using both publishOn and subscribeOn on a flux results in nothing happening

查看:566
本文介绍了在通量上同时使用publishOn和subscribeOn不会导致任何事情发生的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

无论何时我同时使用subscribeOn和publishOn,都不会打印任何内容. 如果我只使用一个,它将打印. 如果我使用subscriptionOn(Schedulers.immediate())或弹性的,它的工作原理. 有什么想法吗?

Whenever i use both subscribeOn and publishOn nothing is printed. If I use only one it will print. If I use subscribeOn(Schedulers.immediate()) or elastic it works. Any ideea why that is?

据我了解,publishOn会影响它发布的线程,并影响订阅者运行的线程.你能指出我正确的方向吗?

It was my understanding that publishOn affects on what thread it gets published and subscribe on on what thread the subscriber runs. Could you please point me in the right direction?

fun test() {
        val testPublisher = EmitterProcessor.create<String>().connect()
        testPublisher
                .publishOn(Schedulers.elastic())
                .map { it ->
                    println("map on ${Thread.currentThread().name}")
                    it
                }
                .subscribeOn(Schedulers.parallel())  
                .subscribe { println("subscribe on ${Thread.currentThread().name}") }
        testPublisher.onNext("a")
        testPublisher.onNext("b")
        testPublisher.onNext("c")
        Thread.sleep(5000)
        println("---")
    }

推荐答案

subscribeOn会影响订阅发生的位置.也就是说,触发源发出元素的初始事件.另一方面,SubscriberonNext钩子受链中最接近的publishOn的影响(很像您的map).

subscribeOn rather influence where the subscription occurs. That is, the initial event that triggers the source to emit elements. The Subscriber 's onNext hook on the other hand is influenced by the closest publishOn up in the chain (much like your map).

但是EmitterProcessor与大多数Processor一样,更高级,可以完成一些偷窃工作.我不确定为什么您的机箱上没有打印任何东西(您的样本转换为Java可以在我的机器上使用),但是我敢打赌它与该处理器有关.

But EmitterProcessor, like most Processors, is more advanced and can do some work stealing. I'm unsure why you don't get anything printed in your case (your sample converted to Java works on my machine), but I bet it has something to do with that Processor.

此代码将更好地演示subscribeOnpublishOn:

This code would better demonstrate subscribeOn vs publishOn:

Flux.just("a", "b", "c") //this is where subscription triggers data production
        //this is influenced by subscribeOn
        .doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
        .publishOn(Schedulers.elastic())
        //the rest is influenced by publishOn
        .doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.parallel())
        .subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));
    Thread.sleep(5000);

打印输出:

before publishOn: parallel-1
before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2

这篇关于在通量上同时使用publishOn和subscribeOn不会导致任何事情发生的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆