RxJava .subscribeOn(Schedulers.newThread()) 问题 [英] RxJava .subscribeOn(Schedulers.newThread()) questions

查看:82
本文介绍了RxJava .subscribeOn(Schedulers.newThread()) 问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是普通的 JDK 8.我有一个简单的 RxJava 示例:

I am on plain JDK 8. I have this simple RxJava example:

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));

它会一行一行地打印出单词,并与有关线程的信息交织在一起,正如预期的那样,所有接下来的调用都是main".

and it prints out the words line by line, intertwined with information about the thread, which is 'main' for all next calls, as expected.

但是,当我取消注释 subscribeOn(Schedulers.newThread()) 调用时,根本没有打印任何内容.为什么它不起作用?我希望它为每个 onNext() 调用和 doOnNext() 启动一个新线程来打印该线程的名称.现在,我什么也没看到,其他调度程序也是如此.

However, when I uncomment the subscribeOn(Schedulers.newThread()) call, nothing is printed at all. Why isn't it working? I would have expected it to start a new thread for each onNext() call and the doOnNext() to print that thread's name. Right now, I see nothing, also for the other schedulers.

当我在 main 的末尾添加对 Thread.sleep(10000L) 的调用时,我可以看到输出,这表明 RxJava 使用的线程都是守护进程.是这种情况吗?这可以以某种方式改变,但使用自定义 ThreadFactory 或类似概念,而不必实现自定义调度程序?

When I add the call to Thread.sleep(10000L) at the end of my main, I can see the output, which would suggest the threads used by RxJava are all daemons. Is this the case? Can this be changed somehow, but using a custom ThreadFactory or similar concept, and not have to implement a custom Scheduler?

随着提到的变化,线程名称总是 RxNewThreadScheduler-1,而 newThread 的文档说为每个线程创建一个新的 {@link Thread}工作单位".不是应该为所有排放创建一个新线程吗?

With the mentioned change, the thread name is always RxNewThreadScheduler-1, whereas the documentation for newThread says "Scheduler that creates a new {@link Thread} for each unit of work". Isn't it supposed to create a new thread for all of the emissions?

推荐答案

正如 Vladimir 所提到的,RxJava 标准调度程序在守护线程上运行工作,这些线程在您的示例中终止,因为主线程退出.我想强调的是,他们不会在新线程上安排每个值,而是在新创建的线程上为每个订阅者安排值流.第二次订阅会给你RxNewThreadScheduler-2".

As Vladimir mentioned, RxJava standard schedulers run work on daemon threads which terminate in your example because the main thread quits. I'd like to emphasise that they don't schedule each value on a new thread, but they schedule the stream of values for each individual subscriber on a newly created thread. Subscribing a second time would give you "RxNewThreadScheduler-2".

您实际上并不需要更改默认调度程序,而只需使用 Schedulers.from() 包装您自己的基于 Executor 的调度程序并在需要时将其作为参数提供:

You don't really need to change the default schedulers, but just wrap your own Executor-based scheduler with Schedulers.from() and supply that as a parameter where needed:

ThreadPoolExecutor exec = new ThreadPoolExecutor(
        0, 64, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
exec.allowCoreThreadTimeOut(true);

Scheduler s = Schedulers.from(exec);

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
    Thread.currentThread().getName()))
.subscribeOn(s)
.subscribe(word -> System.out.println(word));

我有一系列关于博客文章RxJava 调度程序应该可以帮助您实现更持久"的变体.

I've got a series of blog posts about RxJava schedulers whichs should help you implement a "more permanent" variant.

这篇关于RxJava .subscribeOn(Schedulers.newThread()) 问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆