rxjs zip 不偷懒? [英] rxjs zip is not lazy?

查看:40
本文介绍了rxjs zip 不偷懒?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经删除了样板文件以进入正题

I 've removed the boilerplate to get to the point

//a.js

// My observables from stream and event
this.a = Rx.Node.fromStream(this.aStream());
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem');

// Zip 'em
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) {
    return {item: s2, a: s1.toString()};
});

// Streams the lowercase alphabet
rb.prototype.aStream = function aStream() {
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    console.log('Hit!');
    if (c > 'z'.charCodeAt(0)) {
        rs.push(null);
    }
};

return rs;
};

//b.js(需要上面导出的模块)

// b.js (requires the module exported above)

rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...)  in the module to trigger the itemSource observable

我希望看到的:

{item: 'a', a: 'a'} 打印在控制台

发生了什么:

Hit!{item: 'a', a: 'a'} 之前被打印了 24 次.这意味着 zipaStream 获取所有值,缓冲它们,然后做它应该做的事情.

Hit! was printed 24 times before {item: 'a', a: 'a'}. This means that zip took all the values from aStream, buffered them and then did what it was supposed to do.

我如何获得与 zip 提供的相同的功能但很懒惰?我的目标是使用无限流/可观察对象并使用有限(异步)流对其进行压缩.

How do I get the same functionality zip offers but lazily? My goal is to use an infinite stream/observable and zip it with a finite (async) one.

编辑

通过可运行查看/编辑它:RX Zip 测试 编辑 2 根据答案更新代码 -> 现在没有输出.

See/Edit it via runnable: RX Zip test Edit 2 Code updated based on answer -> no output now.

推荐答案

zip 确实很懒.它只订阅 ab 并在任何产生新值时执行其工作.

zip is indeed lazy. It just subscribes to a and b and does its work whenever either produces a new value.

您的问题是 fromStream 一旦 zip 订阅它就会同步发出它的所有值.发生这种情况是因为您的自定义 Readable 不断说有更多可用数据!"

Your problem is that fromStream is emitting all of its values synchronously as soon as zip subscribes to it. This is happening because your custom Readable is constantly saying "There is more data available!"

使您的 Readable 异步,您将获得所需的行为.

Make your Readable asynchronous and you'll get the desired behavior.

尝试这样的事情(未经测试)

Try something like this (untested)

var rs = Readable();
var subscription = null;
rs._read = function () {
    if (!subscription) {
        // produce the values once per second
        subscription = Rx.Observable
            .generateWithRelativeTime(
                97, // start value
                function (c) { return c > 'z'.charCodeAt(0); }, // end condition
                function (c) { return c + 1; }, // step function
                function (c) { return String.fromCharCode(c); }, // result selector
                function () { return 1000; }) // 1000ms between values
            .subscribe(
                function (s) {
                    rs.push(s);
                    console.log("Hit!");
                },
                function (error) { rs.push(null); },
                function () { rs.push(null); });
    }
};

这篇关于rxjs zip 不偷懒?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆