使用 rx.js,如何从计时器上的现有可观察序列发出记忆结果? [英] Using rx.js, how do I emit a memoized result from an existing observable sequence on a timer?
问题描述
我目前正在自学使用 rxjs 进行响应式编程,我给自己设定了一个挑战,即创建一个可观察的流,无论发生什么,它都会始终向订阅者发出相同的结果.
I'm currently teaching myself reactive programming with rxjs, and I've set myself a challenge of creating an observable stream which will always emit the same result to a subscriber no matter what.
我已经记住了给定特定 URL 的 HTTPGET"流的创建,并且我试图每两秒对该流采取行动,结果是对于计时器的每个滴答声,我会从原始流中提取缓存/记忆的 HTTP 结果.
I've memoized the creation of an HTTP "GET" stream given a specific URL, and I'm trying to act on that stream every two seconds, with the outcome being that for each tick of the timer, I'll extract a cached/memoized HTTP result from the original stream.
import superagent from 'superagent';
import _ from 'lodash';
// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
var req = superagent.get(url);
req = req.end.bind(req);
return Rx.Observable.fromNodeCallback(req)();
});
// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');
// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
.map(() => response$)
.flatMap($ => $)
.subscribe(response => {
console.log('Got the response!');
});
我确信我必须在某处对 replay()
进行调用,但无论我做什么,每两秒都会发起一次新的 HTTP 调用.我该如何构建它,以便我可以从 URL 构造一个可观察对象,并让它始终向任何后续订阅者发出相同的 HTTP 结果?
I was sure that I'd have to stick a call to replay()
in there somewhere, but no matter what I do, a fresh HTTP call is initiated every two seconds. How can I structure this so that I can construct an observable from a URL and have it always emit the same HTTP result to any subsequent subscribers?
编辑
我找到了一种方法来获得我想要的结果,但我觉得我遗漏了一些东西,应该能够用更简化的方法重构它:
EDIT
I found a way to get the result I want, but I feel like I am missing something, and should be able to refactor this with a much more streamlined approach:
var httpget = _.memoize(function(url) {
var subject = new Rx.ReplaySubject();
try {
superagent.get(url).end((err, res) => {
if(err) {
subject.onError(err);
}
else {
subject.onNext(res);
subject.onCompleted();
}
});
}
catch(e) {
subject.onError(e);
}
return subject.asObservable();
});
推荐答案
你的第一个代码示例实际上更接近实现方法
Your first code sample is actually closer to the way to do it
var httpget = _.memoize(function(url) {
var req = superagent.get(url);
return Rx.Observable.fromNodeCallback(req.end, req)();
});
但是,这不起作用,因为 fromNodeCallback
中似乎存在错误.为了解决这个问题,我认为您实际上是在寻找 AsyncSubject
而不是 ReplaySubject
.后者有效,但前者正是为这种情况而设计的(如果这对您很重要,则没有数组创建 + 运行时检查缓存过期的开销).
However, this isn't working because there appears to be a bug in fromNodeCallback
. As to work around till this is fixed, I think you are actually looking for the AsyncSubject
instead of ReplaySubject
. The latter works, but the former is designed for exactly this scenario (and doesn't have the overhead of an array creation + runtime checks for cache expiration if that matters to you).
var httpget = _.memoize(function(url) {
var subject = new Rx.AsyncSubject();
var req = superagent.get(url);
Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
return subject.asObservable();
});
最后,虽然 map
很欣赏你的想法,但你可以通过使用 flatMap
重载来简化你的 timer
代码Observable
直接:
Finally, though map
appreciates that you are thinking of it, you can simplify your timer
code by using the flatMap
overload that takes an Observable
directly:
Rx.Observable.timer(0, 2000)
.flatMap($response)
.subscribe(response => {
console.log('Got the response');
});
这篇关于使用 rx.js,如何从计时器上的现有可观察序列发出记忆结果?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!