RxJava观察调用/订阅线程 [英] RxJava Observing on calling/subscribing thread

查看:149
本文介绍了RxJava观察调用/订阅线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我很难理解subscribeOn / observeOn如何在RxJava中工作。我创建了一个带有observable的简单应用程序,可以发出太阳系行星名称,进行一些映射和过滤并打印结果。

I have some trouble understandig how subscribeOn/observeOn works in RxJava. I've created simple app with observable that emits solar system planet names, does some mapping and filtering and prints results.

据我所知,后台线程的调度工作是通过 subscribeOn 运算符完成的(它似乎工作正常) 。

As I understand, scheduling work to background thread is done via subscribeOn operator (and it seems to work fine).

观察后台线程也适用于 observeOn 运算符。

Observing on background thread also works fine with observeOn operator.

但我在理解方面有困难,如何在调用线程上观察(如果它是主线程或其他任何线程)。使用 AndroidSchedulers.mainThread()运算符可以在Android上轻松完成,但我不知道如何在纯java中实现这一点。

But I have trouble in understanding, how to observe on calling thread (either if it is main thread or any other). It is easily done on Android with AndroidSchedulers.mainThread() operator, but I don't know how to achieve this in pure java.

这是我的代码:

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> {
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                })
                .filter(in -> {
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                })
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread("Thread-" + i) {
                @Override
                public void run() {
                    stringObservable
                            .buffer(5)
                            .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                }
            };
            thread.start();
        }

    }

    private static String getCurrentThreadInfo() {
        return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
    }
}

创建和工作中​​的可观察性是在三个中的一个上订阅的执行者的线程。这按预期工作。但是如何在for循环中观察那些动态创建的线程的结果呢?有没有办法从当前线程创建调度程序?

Observable in created and work is subscribed on one of three thread from executor. This works as expected. But how to observe results on those dynamically created thread in for loop? Is there a way to create Scheduler from current thread?

此外,我发现在运行此代码后,它永远不会终止,我不知道为什么? :(

Also, I've found out that after running this code, it never terminates and I don't know why? :(

推荐答案

要回答你的问题,让我从头开始,让其他人了解你已经知道的事情。

To answer your question, let me start from beginning, this allows other people to understand what you already know.

调度程序

调度程序与Java的执行程序扮演相同的角色。简而言之 - 他们决定执行哪些线程动作。

Schedulers play the same role as Executors for Java. Briefly - they decide on which thread actions are executed.

通常一个Observable和运算符在当前线程中执行。有时您可以将Scheduler传递给Observable或operator作为参数(例如Observable.timer) ())。

Usually an Observable and operators execute in current thread. Sometimes you can pass Scheduler to Observable or operator as a parameter (e.g. Observable.timer()).

此外,RxJava提供2个运营商来指定调度程序:

Additionally RxJava provides 2 operators to specify Scheduler:


  • subscribeOn - 指定Observable将在其上运行的Scheduler

  • observeOn - 指定观察者将观察此Observable的Scheduler

为了快速理解它们,我使用了示例代码:

To understand them quickly, I use a the example code:

在所有示例中,我将使用helper createObservable,它的名称为关于whi的线程ch Observable运行:

On all samples, I will use helper createObservable, which emits a name of thread on which the Observable operates:

 public static Observable<String> createObservable(){
        return Observable.create((Subscriber<? super String> subscriber) -> {
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            }
        );
    }

没有调度程序:

createObservable().subscribe(message -> {
        System.out.println("Case 1 Observer thread " + message);
        System.out.println("Case 1 Observable thread " + Thread.currentThread().getName());
    });
    //will print:
    //Case 1 Observer thread main
    //Case 1 Observable thread main

订阅:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 2 Observer thread " + message);
                System.out.println("Case 2 Observable thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 2 Observer thread RxNewThreadScheduler-1
            //Case 2 Observable thread RxNewThreadScheduler-1

SubscribeOn和ObserveOn:

SubscribeOn and ObserveOn:

reateObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 3 Observer thread " + message);
                System.out.println("Case 3 Observable thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 3 Observer thread RxNewThreadScheduler-2
            //Case 3 Observable thread RxNewThreadScheduler-1

ObserveOn:

ObserveOn:

createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 4 Observer thread " + message);
                System.out.println("Case 4 Observable thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 4 Observer thread main
            //Case 4 Observable thread RxNewThreadScheduler-1

答案:

AndroidSchedulers.mainThread()返回一个sheduler,它将工作委托给MessageQueue关联主线程。

为此,它使用android.os.Looper.getMainLooper()和android.os.Handler。

AndroidSchedulers.mainThread() returns a sheduler which delegates work to MessageQueue associated with main thread.
For this purpose it uses android.os.Looper.getMainLooper() and android.os.Handler.

换句话说,如果要指定特定线程,则必须提供在线程上调度和执行任务的方法。

In other words, if you would like to specify particular thread, you must provide means to schedule and perform tasks on thread.

在它下面可以使用任何类型的MQ来存储循环Quee和执行任务的任务和逻辑。

Underneath it may use any kind of MQ for storing tasks and logic which loops the Quee and execute tasks.

在java中,我们有Executor,它被指定用于此类任务。 RxJava可以从这样的Executor轻松创建Scheduler。

In java, we have Executor which is designated for such tasks. RxJava can easily create Scheduler from such Executor.

下面是一个示例,它显示了如何在主线程上观察(不是特别有用,但显示所有必需的部分)。

Below is example which shows how you can observe on main thread (not particular useful but show all required parts).

public class RunCurrentThread implements Executor {

private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

public static void main(String[] args) throws InterruptedException {
    RunCurrentThread sample = new RunCurrentThread();
    sample.observerOnMain();
    sample.runLoop();
}

private void observerOnMain() {
    createObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.from(this))
            .subscribe(message -> {
                System.out.println("Observer thread " + message);
                System.out.println("Observable thread " + Thread.currentThread().getName());
            });
    ;
}

public Observable<String> createObservable() {
    return Observable.create((Subscriber<? super String> subscriber) -> {
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            }
    );
}

private void runLoop() throws InterruptedException {
    while(!Thread.interrupted()){
        tasks.take().run();
    }
}

@Override
public void execute(Runnable command) {
    tasks.add(command);
}

}

最后一个问题,为什么你的代码没有终止:

And the last question, why your code does not terminate:

ThreadPoolExecutor通过defult使用非deamon线程,因此你的程序在它们存在之前不会结束。
你应该使用关机关闭线程的方法。

ThreadPoolExecutor uses non deamon threads by defult, thus your program does not end until they exist. You should use shutdown method to close the threads.

这篇关于RxJava观察调用/订阅线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆