Node.js流与可观察对象 [英] Node.js Streams vs. Observables

查看:31
本文介绍了Node.js流与可观察对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

了解了可观察对象后,我发现它们与Node.js流.两者都有一种机制,可以在新数据到达,发生错误或没有更多数据(EOF)时通知消费者.

After learning about Observables, I find them quite similar to Node.js streams. Both have a mechanism of notifying the consumer whenever new data arrives, an error occurs or there is no more data (EOF).

我很想了解两者之间的概念/功能差异.谢谢!

I would love to learn about the conceptual/functional differences between the two. Thanks!

推荐答案

Observables 和node.js的 Streams 都可以解决相同的潜在问题:异步处理值的顺序.我认为,两者之间的主要区别与激发其外观的环境有关.该上下文反映在术语和API中.

Both Observables and node.js's Streams allow you to solve the same underlying problem: asynchronously process a sequence of values. The main difference between the two, I believe, is related to the context that motivated its appearance. That context is reflected in the terminology and API.

Observables 方面,您对EcmaScript进行了扩展,引入了反应式编程模型.它试图用 Observer Observable 的极简和可组合的概念填补价值生成和异步之间的空白.

On the Observables side you have an extension to EcmaScript that introduces the reactive programming model. It tries to fill the gap between value generation and asynchronicity with the minimalist and composable concepts of Observer and Observable.

在node.js和 Streams 端,您想创建一个接口,用于网络流和本地文件的异步处理和高性能处理.该术语从该初始上下文派生而来,您将获得 pipe chunk encoding flush Duplex Buffer 等.通过采用务实的方法为特定用例提供显式支持,您会失去一些编写东西的能力,因为它不那么统一.例如,您在 Readable 流上使用 push ,在 Writable 上使用 write ,尽管从概念上讲,您正在做同样的事情:发布一个值.

On node.js and Streams side you wanted to create an interface for the asynchronous and performant processing of network streams and local files. The terminology derives from that initial context and you get pipe, chunk, encoding, flush, Duplex, Buffer, etc. By having a pragmatic approach that provides explicit support for particular use cases you lose some ability to compose things because it's not as uniform. For example, you use push on a Readable stream and write on a Writable although, conceptually, you are doing the same thing: publishing a value.

因此,实际上,如果您查看这些概念,并且使用选项 {objectMode:true} ,则可以将 Observable Readable 流和 Writable 流的 Observer .您甚至可以在两个模型之间创建一些简单的适配器.

So, in practice, if you look at the concepts, and if you use the option { objectMode: true }, you can match Observable with the Readable stream and Observer with the Writable stream. You can even create some simple adapters between the two models.

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}

您可能已经注意到,我更改了一些名称,并使用了此处介绍的 Observer Subscription 的简单概念,以避免 Generator 中的> Observables .基本上,使用 Subscription 可以取消订阅 Observable .无论如何,使用上面的代码,您可以拥有一个 pipe .

You may have noticed that I changed a few names and used the simpler concepts of Observer and Subscription, introduced here, to avoid the overload of reponsibilities done by Observables in Generator. Basically, the Subscription allows you to unsubscribe from the Observable. Anyway, with the above code you can have a pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

process.stdin.pipe(process.stdout)相比,您拥有的是一种组合,过滤和转换流的方法,该方法也适用于任何其他数据序列.您可以使用 Readable Transform Writable 流来实现它,但是API支持子类化而不是链接 Readable s和应用功能.例如,在 Observable 模型上,转换值对应于将转换器函数应用于流.它不需要 Transform 的新子类型.

Compared with process.stdin.pipe(process.stdout), what you have is a way to combine, filter, and transform streams that also works for any other sequence of data. You can achieve it with Readable, Transform, and Writable streams but the API favors subclassing instead of chaining Readables and applying functions. On the Observable model, For example, transforming values corresponds to applying a transformer function to the stream. It does not require a new subtype of Transform.

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

结论?很容易在任何地方引入反应模型和 Observable 概念.围绕该概念实现整个库比较困难.所有这些小功能都需要始终如一地协同工作.毕竟, ReactiveX 项目仍在进行中.但是,如果您确实需要将文件内容发送到客户端,进行编码并压缩,则可以在NodeJS中找到它的支持,并且效果很好.

The conclusion? It's easy to introduce the reactive model and the Observable concept anywhere. It's harder to implement an entire library around that concept. All those little functions need to work together consistently. After all, the ReactiveX project is still going at it. But if you really need to send the file content to the client, deal with encoding, and zip it then the support it's there, in NodeJS, and it works pretty well.

这篇关于Node.js流与可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆