RxJS:如何让一个Observer处理多个Observables? [英] RxJS: How to have one Observer process multiple Observables?

查看:475
本文介绍了RxJS:如何让一个Observer处理多个Observables?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用一个调用我实现的函数的框架。我希望将此函数的参数转换为Observable,并通过一系列Observers发送。我认为我可以使用一个主题,但它的行为并不像我预期的那样。

I am working with a framework that calls a function I implement. I would like the parameter of this function to be converted to an Observable, and sent through a sequence of Observers. I thought I could use a Subject for this, but it isn't behaving as I expected.

为了澄清,我有类似下面的代码。我认为下面的选项1 会起作用,但到目前为止,我正在解决选项2 ,这似乎不是惯用的完全没有。

To clarify, I have something like the following code. I thought Option 1 below would work, but so far I am settling for Option 2, which doesn't seem idiomatic at all.

var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);

// The framework calls this method
function onEvent(eventArray) {

   var eventSource = Rx.Observable.from(eventArray);

   // Option 1: I thought this would work, but it doesn't
   // eventSource.subscribe(eventSubject);

   // Option 2: This does work, but its obviously clunky
  eventSource.subscribe(
      function(event) {
          log.debug("sending to subject");
          eventSubject.onNext(event);
      },
      function(e) {
          log.error(e);
      },
      function() {
          console.log('eventSource onCompleted');
      }
  );
}


推荐答案

这就是你的时候订阅整个主题到一个observable,你最终订阅该主题的 onComplete 事件。这意味着当观察完成时,它将完成您的主题。因此,当你获得下一组事件时,他们什么也不做,因为主题已经完成。

It is just that when you subscribe the whole Subject to an observable, you end up subscribing the onComplete event of that observable to the subject. Which means when the observable completes, it will complete your subject. So when you get the next set of events, they do nothing because the subject is already complete.

一个解决方案就是你所做的。只需在主题上订阅 onNext 事件。

One solution is exactly what you did. Just subscribe the onNext event to the subject.

第二个解决方案,可以说更像rx like就是将您的传入数据视为可观察的可观察数据,并使用 mergeAll

A second solution, and arguably more "rx like" is to treat your incoming data as an observable of observables and flatten this observable stream with mergeAll:

var eventSubject = new Rx.Subject(); // observes observable sequences
var resultSource = eventSubject
  .mergeAll() // subscribe to each inner observable as it arrives
  .map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);

// The framework calls this method
function onEvent(eventArray) {

   var eventSource = Rx.Observable.from(eventArray);
   // send a new inner observable sequence
   // into the subject
   eventSubject.onNext(eventSource);
}

更新:这是一个正在运行的示例

这篇关于RxJS:如何让一个Observer处理多个Observables?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆