Node.js Streams 与 Observables [英] Node.js Streams vs. Observables

查看:25
本文介绍了Node.js Streams 与 Observables的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在了解了 Observables 之后,我发现它们与 Node.js 流.两者都有一种机制,可以在新数据到达、发生错误或没有更多数据 (EOF) 时通知消费者.

After learning about Observables, I find them quite similar to Node.js streams. Both have a mechanism of notifying the consumer whenever new data arrives, an error occurs or there is no more data (EOF).

我很想了解两者之间的概念/功能差异.谢谢!

I would love to learn about the conceptual/functional differences between the two. Thanks!

推荐答案

Observables 和 node.js 的 Streams 都允许您解决相同的底层问题:异步处理一个值序列.我认为,两者之间的主要区别与促使其出现的背景有关.该上下文反映在术语和 API 中.

Both Observables and node.js's Streams allow you to solve the same underlying problem: asynchronously process a sequence of values. The main difference between the two, I believe, is related to the context that motivated its appearance. That context is reflected in the terminology and API.

Observables 方面,您有一个 EcmaScript 扩展,它引入了反应式编程模型.它试图用 ObserverObservable 的极简主义和可组合的概念来填补值生成和异步性之间的空白.

On the Observables side you have an extension to EcmaScript that introduces the reactive programming model. It tries to fill the gap between value generation and asynchronicity with the minimalist and composable concepts of Observer and Observable.

在 node.js 和 Streams 端,您想为网络流和本地文件的异步和高性能处理创建一个接口.该术语源自该初始上下文,您将获得 pipechunkencodingflushDuplexBuffer 等.通过使用为特定用例提供明确支持的务实方法,您会失去一些组合事物的能力,因为它不是统一的.例如,您在 Readable 流上使用 push 并在 Writable 上使用 write 尽管从概念上讲,您正在做同样的事情:发布一个值.

On node.js and Streams side you wanted to create an interface for the asynchronous and performant processing of network streams and local files. The terminology derives from that initial context and you get pipe, chunk, encoding, flush, Duplex, Buffer, etc. By having a pragmatic approach that provides explicit support for particular use cases you lose some ability to compose things because it's not as uniform. For example, you use push on a Readable stream and write on a Writable although, conceptually, you are doing the same thing: publishing a value.

因此,在实践中,如果您查看概念,并且使用选项 { objectMode: true },则可以将 Observable 匹配Readable 流和 ObserverWritable 流.您甚至可以在两个模型之间创建一些简单的适配器.

So, in practice, if you look at the concepts, and if you use the option { objectMode: true }, you can match Observable with the Readable stream and Observer with the Writable stream. You can even create some simple adapters between the two models.

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}

您可能已经注意到,我更改了一些名称并使用了此处介绍的 ObserverSubscription 的更简单的概念,以避免 Generator 中的 >Observables.基本上,Subscription 允许您取消订阅 Observable.不管怎样,有了上面的代码,你就可以有一个pipe.

You may have noticed that I changed a few names and used the simpler concepts of Observer and Subscription, introduced here, to avoid the overload of reponsibilities done by Observables in Generator. Basically, the Subscription allows you to unsubscribe from the Observable. Anyway, with the above code you can have a pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

process.stdin.pipe(process.stdout) 相比,您拥有的是一种组合、过滤和转换流的方法,它也适用于任何其他数据序列.您可以使用 ReadableTransformWritable 流来实现它,但 API 支持子类化而不是链接 Readables和应用功能.在 Observable 模型上,例如,转换值对应于将转换器函数应用于流.它不需要 Transform 的新子类型.

Compared with process.stdin.pipe(process.stdout), what you have is a way to combine, filter, and transform streams that also works for any other sequence of data. You can achieve it with Readable, Transform, and Writable streams but the API favors subclassing instead of chaining Readables and applying functions. On the Observable model, For example, transforming values corresponds to applying a transformer function to the stream. It does not require a new subtype of Transform.

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

结论?很容易在任何地方引入反应模型和 Observable 概念.围绕该概念实现整个库更加困难.所有这些小功能都需要始终如一地协同工作.毕竟,ReactiveX 项目仍在进行中.但是如果你真的需要将文件内容发送到客户端,处理编码,然后压缩它,那么它的支持就在那里,在 NodeJS 中,它工作得很好.

The conclusion? It's easy to introduce the reactive model and the Observable concept anywhere. It's harder to implement an entire library around that concept. All those little functions need to work together consistently. After all, the ReactiveX project is still going at it. But if you really need to send the file content to the client, deal with encoding, and zip it then the support it's there, in NodeJS, and it works pretty well.

这篇关于Node.js Streams 与 Observables的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆