使用 rx.js,如何从计时器上的现有可观察序列发出记忆结果? [英] Using rx.js, how do I emit a memoized result from an existing observable sequence on a timer?

查看:43
本文介绍了使用 rx.js,如何从计时器上的现有可观察序列发出记忆结果?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在自学使用 rxjs 进行响应式编程,我给自己设定了一个挑战,即创建一个可观察的流,无论发生什么,它都会始终向订阅者发出相同的结果.

I'm currently teaching myself reactive programming with rxjs, and I've set myself a challenge of creating an observable stream which will always emit the same result to a subscriber no matter what.

我已经记住了给定特定 URL 的 HTTPGET"流的创建,并且我试图每两秒对该流采取行动,结果是对于计时器的每个滴答声,我会从原始流中提取缓存/记忆的 HTTP 结果.

I've memoized the creation of an HTTP "GET" stream given a specific URL, and I'm trying to act on that stream every two seconds, with the outcome being that for each tick of the timer, I'll extract a cached/memoized HTTP result from the original stream.

import superagent from 'superagent';
import _ from 'lodash';

// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  req = req.end.bind(req);
  return Rx.Observable.fromNodeCallback(req)();
});

// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');

// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
  .map(() => response$)
  .flatMap($ => $)
  .subscribe(response => {
    console.log('Got the response!');
  });

我确信我必须在某处对 replay() 进行调用,但无论我做什么,每两秒都会发起一次新的 HTTP 调用.我该如何构建它,以便我可以从 URL 构造一个可观察对象,并让它始终向任何后续订阅者发出相同的 HTTP 结果?

I was sure that I'd have to stick a call to replay() in there somewhere, but no matter what I do, a fresh HTTP call is initiated every two seconds. How can I structure this so that I can construct an observable from a URL and have it always emit the same HTTP result to any subsequent subscribers?

编辑
我找到了一种方法来获得我想要的结果,但我觉得我遗漏了一些东西,应该能够用更简化的方法重构它:

EDIT
I found a way to get the result I want, but I feel like I am missing something, and should be able to refactor this with a much more streamlined approach:

var httpget = _.memoize(function(url) {
  var subject = new Rx.ReplaySubject();
  try {
    superagent.get(url).end((err, res) => {
      if(err) {
        subject.onError(err);
      }
      else {
        subject.onNext(res);
        subject.onCompleted();
      }
    });
  }
  catch(e) {
    subject.onError(e);
  }
  return subject.asObservable();
});

推荐答案

你的第一个代码示例实际上更接近实现方法

Your first code sample is actually closer to the way to do it

var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  return Rx.Observable.fromNodeCallback(req.end, req)();
});

但是,这不起作用,因为 fromNodeCallback 中似乎存在错误.为了解决这个问题,我认为您实际上是在寻找 AsyncSubject 而不是 ReplaySubject.后者有效,但前者正是为这种情况而设计的(如果这对您很重要,则没有数组创建 + 运行时检查缓存过期的开销).

However, this isn't working because there appears to be a bug in fromNodeCallback. As to work around till this is fixed, I think you are actually looking for the AsyncSubject instead of ReplaySubject. The latter works, but the former is designed for exactly this scenario (and doesn't have the overhead of an array creation + runtime checks for cache expiration if that matters to you).

var httpget = _.memoize(function(url) {

  var subject = new Rx.AsyncSubject();
  var req = superagent.get(url);
  Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
  return subject.asObservable();

});

最后,虽然 map 很欣赏你的想法,但你可以通过使用 flatMap 重载来简化你的 timer 代码Observable 直接:

Finally, though map appreciates that you are thinking of it, you can simplify your timer code by using the flatMap overload that takes an Observable directly:

Rx.Observable.timer(0, 2000)
  .flatMap($response)
  .subscribe(response => {
    console.log('Got the response');
  });

这篇关于使用 rx.js,如何从计时器上的现有可观察序列发出记忆结果?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆