Observable 主题事件监听器 [英] Observable subject event listener

查看:64
本文介绍了Observable 主题事件监听器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在研究 Observables 及其与 EventEmitter 的差异,然后偶然发现了 Subjects(我可以看到 Angulars EventEmitter 是基于它的).

I was looking into Observables and their differences to EventEmitter and then stumbled upon Subjects ( which I can see Angulars EventEmitter is based off ).

似乎 Observables 是单播的,而 Subjects 是多播的(然后一个 EE 只是一个将 .next 包装在发射中以提供正确接口的主题).

It seems Observables are unicast vs Subjects that are multicast ( and then an EE is simply a subject that wraps .next in emit to give the correct interface ).

Observables 看起来很容易实现

Observables seem easy enough to implement

class Observable {
    constructor(subscribe) {
        this._subscribe = subscribe;
    }

    subscribe(next, complete, error) {
        const observer = new Observer(next, complete, error);

        // return way to unsubscribe
        return this._subscribe(observer);
    }

}

其中 Observer 只是一个包装器,它添加了一些尝试捕获并监控 isComplete,以便它可以清理并停止观察.

Where Observer is just a wrapper that adds some try catches and monitors isComplete so it can clean up and stop observing.

对于我想出的主题:

class Subject {
    subscribers = new Set();

    constructor() {
        this.observable = new Observable(observer => {
            this.observer = observer;
        });

        this.observable.subscribe((...args) => {
            this.subscribers.forEach(sub => sub(...args))
        });
    }

    subscribe(subscriber) {
        this.subscribers.add(subscriber);
    }

    emit(...args) {
        this.observer.next(...args);
    }
}

哪种合并到一个 EventEmitter 中,它用发射包裹 .next - 但是捕获 Observable 的 observe 参数似乎是错误的 - 就像我刚刚找到了一个解决方案.从 Observable(单播)生成主题(多播)的更好方法是什么?

which sort of merges into an EventEmitter with it wrapping .next with emit - but capturing the observe argument of the Observable seems wrong - and like I have just hacked up a solution. What would be the better way to produce a Subject (multicast) from an Observable (unicast)?

我尝试查看 RXJS,但我看不到它是如何填充 subscribers 数组的:/

I tried looking at RXJS but I can't see how it's subscribers array ever gets populated :/

推荐答案

我认为您也可以通过使用调试器来更好地理解.打开一个 StackBlitz RxJS 项目,创建最简单的例子(取决于你试图理解的内容),然后放置一些断点.AFAIK,使用 StackBlitz,您可以调试 TypeScript 文件,这看起来很棒.

I think you can have a better understanding by using the debugger as well. Open a StackBlitz RxJS project, create the simplest example(depending on what you're trying to understand) and then place some breakpoints. AFAIK, with StackBlitz you can debug the TypeScript files, which seems great.

首先,Subject扩展Observable:

Firstly, the Subject class extends Observable:

export class Subject<T> extends Observable<T> implements SubscriptionLike { /* ... */ }

现在让我们检查 Observable 类.

Now let's examine the Observable class.

它有众所周知的管道方法:

It has the well-known pipe method:

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

其中定义了 pipeFromArray 如下:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

在阐明上述代码段中发生的事情之前,了解运算符是很重要的.运算符是一个函数,它返回另一个函数,该函数的单个参数是 Observable,其返回类型是 Observable.有时,TR 可以相同(例如,使用 filter() 时,debounceTime()...).

Before clarifying what's going on in the above snippet, it is important to know that operators are. An operator is a function which returns another function whose single argument is an Observable<T> and whose return type is an Observable<R>. Sometimes, T and R can be the same(e.g when using filter(), debounceTime()...).

例如,map定义如下:

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate((source, subscriber) => {
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}

export function operate<T, R>(
  init: (liftedSource: Observable<T>, subscriber: Subscriber<R>) => (() => void) | void
): OperatorFunction<T, R> {
  return (source: Observable<T>) => {
    if (hasLift(source)) {
      return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
        try {
          return init(liftedSource, this);
        } catch (err) {
          this.error(err);
        }
      });
    }
    throw new TypeError('Unable to lift unknown Observable type');
  };
}

所以,operate返回一个函数.注意它的参数:source: Observable.返回类型派生自 Subscriber.

So, operate will return a function. Notice its argument: source: Observable<T>. The return type is derived from Subscriber<R>.

Observable.lift 只是创建一个新的 Observable.这就像在喜欢的列表中创建节点一样.

Observable.lift just creates a new Observable. It's like creating nodes within a liked list.

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  
  // it's important to keep track of the source !
  observable.source = this;
  observable.operator = operator;
  return observable;
}

因此,运算符(如 map)将返回一个函数.调用该函数的是 pipeFromArray 函数:

So, an operator(like map) will return a function. What invokes that function is the pipeFromArray function:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    // here the functions returned by the operators are being called
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

在上面的代码片段中,fnoperate 函数返回的内容:

In the above snippet, fn is what the operate function returns:

return (source: Observable<T>) => {
  if (hasLift(source)) { // has `lift` method
    return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
      try {
        return init(liftedSource, this);
      } catch (err) {
        this.error(err);
      }
    });
  }
  throw new TypeError('Unable to lift unknown Observable type');
};

也许最好也看一个例子.我建议您自己使用调试器尝试一下.

Maybe it would be better to see an example as well. I'd recommend trying this yourself with a debugger.

const src$ = new Observable(subscriber => {subscriber.next(1), subscriber.complete()});

订阅者 =>{} 回调 fn 将分配给 Observable._subscribe 属性.

The subscriber => {} callback fn will be assigned to the Observable._subscribe property.

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

接下来,让我们尝试添加一个运算符:

Next, let's try adding an operator:

const src2$ = src$.pipe(map(num => num ** 2))

在这种情况下,它将从 pipeFromArray 调用这个块:

In this case, it will invoke this block from pipeFromArray:

// `pipeFromArray`
if (fns.length === 1) {
  return fns[0];
}

// `Observable.pipe`
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

因此,Observable.pipe 将调用 (来源:Observable)=>{ ... },其中 sourcesrc$ Observable.通过调用该函数(其结果存储在 src2$ 中),它还将调用 Observable.lift 方法.

So, the Observable.pipe will invoke (source: Observable<T>) => { ... }, where source is the src$ Observable. By invoking that function(whose result is stored in src2$), it will also call the Observable.lift method.

return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
});

/* ... */

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

此时,src$ 是一个 Observable 实例,它的 source 设置为 src$并且 operator 设置为 function (this: Subscriber<R>,liftSource: Observable<T>) ....

At this point, src$ is an Observable instance, which has the source set to src$ and the operator set to function (this: Subscriber<R>, liftedSource: Observable<T>) ....

在我看来,这完全是关于链表.在创建Observable链时(通过添加操作符),列表是从上到下创建的.
tail 节点 调用了它的 subscribe 方法时,将创建另一个列表,这次是从下到上.我喜欢将第一个称为 Observable list,将第二个称为 Subscribers list.

From my perspective, it's all about linked lists. When creating the Observable chain(by adding operators), the list is created from top to bottom.
When the tail node has its subscribe method called, another list will be created, this time from bottom to top. I like to call the first one the Observable list and the second one the Subscribers list.

src2$.subscribe(console.log)

这是订阅时发生的情况 方法被调用:

This is what happens when the subscribe method is called:

const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
  
  const { operator, source } = this;
  subscriber.add(
    operator
      ? operator.call(subscriber, source)
      : source || config.useDeprecatedSynchronousErrorHandling
      ? this._subscribe(subscriber)
      : this._trySubscribe(subscriber)
  );

  return subscriber;

在这种情况下 src2$ 有一个 operator,所以它会调用它.operator 定义为:

In this case src2$ has an operator, so it will call that. operator is defined as:

function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
}

其中 init 取决于所使用的运算符.再一次,这里是mapinit

where init depends on the operator that is used. Once again, here is map's init

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate( /* THIS IS `init()` */(source, subscriber) => {
    
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}

source 实际上是 src$.当 source.subscribe() 被调用时,它最终会调用提供给 new Observable(subscriber => { ... }) 的回调.调用 subscriber.next(1) 将调用 (value: T) =>{ ... } 从上面,它将调用 subscriber.next(project.call(thisArg, value, index++));(project - 回调提供给 map).最后,subscriber.next 指的是 console.log.

source is in fact src$. When source.subscribe() is called, it will end up calling the callback provided to new Observable(subscriber => { ... }). Calling subscriber.next(1) will call the (value: T) => { ... } from above, which will call subscriber.next(project.call(thisArg, value, index++));(project - the callback provided to map). Lastly, subscriber.next refers to console.log.

回到Subject,当_subscribe 方法被调用:

Coming back to Subject, this is what happens when the _subscribe method is called:

protected _subscribe(subscriber: Subscriber<T>): Subscription {
  this._throwIfClosed(); // if unsubscribed
  this._checkFinalizedStatuses(subscriber); // `error` or `complete` notifications
  return this._innerSubscribe(subscriber);
}

protected _innerSubscribe(subscriber: Subscriber<any>) {
  const { hasError, isStopped, observers } = this;
  return hasError || isStopped
    ? EMPTY_SUBSCRIPTION
    : (observers.push(subscriber), new Subscription(() => arrRemove(this.observers, subscriber)));
}

所以,这就是 Subject's 订阅者列表的填充方式.通过返回 new Subscription(() => arrRemove(this.observers,subscriber)),它确保订阅者取消订阅(由于 complete/error 通知或简单的 subscriber.unsubscribe()),非活动 订阅者将从 Subject 的列表中删除.

So, this is how Subject's list of subscribers are is populated. By returning new Subscription(() => arrRemove(this.observers, subscriber)), it ensures that then subscriber unsubscribes(due to complete/error notifications or simply subscriber.unsubscribe()), the inactive subscriber will be removed from the Subject's list.

这篇关于Observable 主题事件监听器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆