RxJava2 - 间隔和调度程序 [英] RxJava2 - interval and schedulers

查看:44
本文介绍了RxJava2 - 间隔和调度程序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有一个区间,并且我已经给了它一个计算调度器.像这样:

Let's say I have an interval, and that I've given it a computationScheduler. Like this:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

那么,在 flatmap {...} 中发生的所有事情都会被调度到一个计算线程上吗?

Then, will everything that happens in flatmap {...} also be scheduled on a computation thread?

在 Observable.interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) 的源代码中,它说:

In the sources for Observable.interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler), it says:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

作为 RxJava 的初学者,我很难理解这个评论.我知道间隔计时器/等待逻辑发生在计算线程上.但是,关于发出的项目的最后一部分是否也意味着发出的项目将在同一线程上使用?或者是否需要observeOn?像这样:

As a beginner to RxJava, I'm having a hard time understanding this comment. I understand that the interval timer/waiting logic occurs on the computation thread. But, does the last part, about items being emitted, also mean that the emitted items will be consumed on the same thread? Or is an observeOn required for that? Like this:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

如果我希望在计算线程上处理发射,那么observeOn 是否有必要?

Would that observeOn be necessary if I want the emits to be processed on the computation thread?

推荐答案

这个验证起来很简单:只要打印当前线程,看看操作符是在哪个线程上执行的:

This is simple to verify: just print the current thread to see on which thread the operator is executed:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这将始终打印:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

顺序处理,因为所有都发生在单个线程中 ->main.

Processed sequentially because all happen in a single thread -> main.

observeOn 将改变下游执行线程:

observeOn will change the downstream execution thread :

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

每次执行的结果都不同,但是flatmapsubscribe会在不同的线程中处理:

The result this time will be different for each execution but flatmap and subscribe will be processed in diffrent threads:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

interval 将充当 observeOn 并更改下游执行线程(调度程序):

interval will act as observeOn and change the downstream execution thread (scheduler):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这次在计算调度器的一个线程内是顺序执行的:

This time the execution is sequential inside one thread of computation scheduler:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

interval 将默认使用计算调度程序,您不需要将其作为参数传递,也不需要 observeOn

interval will by default use the computation scheduler, you don't need to pass it as an argument and observeOn is not needed

这篇关于RxJava2 - 间隔和调度程序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆