Rx BehaviorSubject + 扫描将先前事件推送给新订阅者? [英] Rx BehaviorSubject + scan pushing prior event to new subscriber?

查看:38
本文介绍了Rx BehaviorSubject + 扫描将先前事件推送给新订阅者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想要一个流,我可以向其中推送减速器功能.每次推送reducer 函数时,都应将状态对象传递给reducer,reducer 应返回修改后的状态值,并将更新的状态推送给订阅者.我希望我的代码可以解释:

I want to have an stream to which I can push reducer functions. Each time a reducer function is pushed, a state object should be passed to the reducer, the reducer should return a modified state value, and the updated state should be pushed to subscribers. I'm hoping my code can explain:

import Rx from 'rx';
import { Map } from 'immutable';

let initialState = Map({ counter: 0 });

export let upstream = new Rx.BehaviorSubject(Rx.helpers.identity);
export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState);

let increment = state => {
  return state.update('counter', counter => counter + 1);
};

upstream.onNext(increment);

downstream.subscribe(state => {
  console.log('subscriptionA', state.get('counter'));
});

upstream.onNext(increment);

setTimeout(() => {
  downstream.subscribe(state => {
    console.log('subscriptionB', state.get('counter'));
  });
}, 3000);

这是我看到的输出:

subscriptionA 1
subscriptionA 2
subscriptionB 1

虽然我希望看到:

subscriptionA 1
subscriptionA 2
subscriptionB 2

显然我在这里遗漏了一些基本的东西.似乎 BehaviorSubject 应该保留新订阅者的最新值,这让我认为当 subscriptionB 订阅 downstream 时,它会得到最新降低的值,但看起来中间有 .scan 会使事情变得混乱......或其他什么.

Obviously I'm missing something fundamental here. It seems the BehaviorSubject is supposed to retain the latest value for new subscribers which would make me think that when subscriptionB subscribes to downstream that it would get the latest reduced value, but it looks like having the .scan in the middle fouls things up...or something.

这里发生了什么以及我如何完成我想要完成的任务?谢谢!

What's going on here and how to I accomplish what I'm trying to accomplish? Thanks!

推荐答案

如果你更换了,你能不能试试看是否一切都符合你的预期

Can you try to see if everything is conform to your expectations if you replace

export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState);

export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState).shareReplay(1);

jsfiddle 这里:http://jsfiddle.net/cqaumutp/

jsfiddle here : http://jsfiddle.net/cqaumutp/

如果是这样,您就是 Rx.Observable 的热与冷性质的另一个受害者,或者更准确地说,可能是 observable 的惰性实例化.

If so, you are another victim of the hot vs. cold nature of Rx.Observable, or maybe more accurately the lazy instantiation of observables.

简而言之(不是那么短),每次您执行 subscribe 时都会发生的事情是,通过将操作符链向上推,会创建一个 observables 链.每个操作符订阅它的源,并返回另一个直到起始源的 observable.在您的情况下,当您订阅 scan 时,scan 订阅 upstream 这是最后一个.upstream 是一个主题,在订阅时它只是注册订阅者.其他来源会做其他事情(比如在 DOM 节点或套接字上注册一个监听器,或者其他什么).

In short (not so short), what happens everytime you do a subscribe, is that a chain of observables is created by going upstream the chain of operators. Each operator subscribes to its source, and returns another observable up to the starting source. In your case, when you subscribe to scan, scan subscribes to upstream which is the last one. upstream being a subject, on subscription it just registers the subscriber. Other sources would do other things (like register a listener on a DOM node, or a socket, or whatever).

这里的重点是,每次订阅 scan 时,您都会重新开始,即使用 initialState.如果要使用第一次订阅的值到 scan,则必须使用 share 运算符.在第一次订阅 share 时,它会将您的订阅请求传递给 scan.在第二个和后续的,它不会,它会注册它,并将所有来自 scan 首先订阅的值转发给相关的观察者.

The point here is that every time you subscribe to the scan, you start anew, i.e. with the initialState. If you want to use the values from the first subscription to the scan, you have to use the share operator. On the first subscription to the share, it will pass your subscription request on to the scan. On the second and subsequent ones, it will not, it will register it, and forward to the associated observer all the values coming from the scan firstly subscribed to.

这篇关于Rx BehaviorSubject + 扫描将先前事件推送给新订阅者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆