RxJava和观察者代码的并行执行 [英] RxJava and parallel execution of observer code

查看:702
本文介绍了RxJava和观察者代码的并行执行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用RxJava Observable api获得以下代码:

I am having the following code using RxJava Observable api :

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
    observable
      .buffer(10000)
      .observeOn(Schedulers.computation())
      .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
          for(Info info : recordInfo) {
            // some I/O operation logic
         }
      }, 
      exception -> {
      }, 
      () -> {
      });

我的期望是观察代码即subscribe()方法内的代码将在以后并行执行我已经指定了计算调度程序。相反,代码仍然在单个线程上顺序执行。如何使用RxJava api使代码并行运行。

My expectation was that the observation code i.e. code inside the subscribe() method will be executed in parallel after I have specified the computation scheduler. Instead the code is still being executed sequentially on single thread. How can make the code run in parallel using RxJava api.

推荐答案

当涉及到异步/多线程方面时,RxJava经常被误解它多线程操作的编码很简单,但理解抽象是另一回事。

RxJava is often misunderstood when it comes to the asynchronous/multithreaded aspects of it. The coding of multithreaded operations is simple, but understanding the abstraction is another thing.

关于RxJava的常见问题是如何从Observable实现并行化或同时发出多个项目。当然,这个定义违反了Observable Contract,它声明onNext()必须按顺序调用,而且一次不能同时调用多个线程。

A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract which states that onNext() must be called sequentially and never concurrently by more than one thread at a time.

要实现并行性,您需要多个Observable。

To achieve parallelism you need multiple Observables.

这在单个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));

这在多个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

代码和文字来自此博文。

这篇关于RxJava和观察者代码的并行执行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆