可观察的主题事件侦听器 [英] Observable subject event listener

查看:88
本文介绍了可观察的主题事件侦听器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在研究Observables及其与EventEmitter的区别,然后偶然发现Subjects(我可以看到Angulars EventEmitter基于).

I was looking into Observables and their differences to EventEmitter and then stumbled upon Subjects ( which I can see Angulars EventEmitter is based off ).

Observables似乎是单播,而多播的主题(然后EE只是将.next封装在emit中以提供正确接口的主题).

It seems Observables are unicast vs Subjects that are multicast ( and then an EE is simply a subject that wraps .next in emit to give the correct interface ).

可观察对象似乎很容易实现

Observables seem easy enough to implement

class Observable {
    constructor(subscribe) {
        this._subscribe = subscribe;
    }

    subscribe(next, complete, error) {
        const observer = new Observer(next, complete, error);

        // return way to unsubscribe
        return this._subscribe(observer);
    }

}

Observer 只是一个包装,其中添加了一些尝试捕获并监视isComplete,以便可以清理并停止观察.

Where Observer is just a wrapper that adds some try catches and monitors isComplete so it can clean up and stop observing.

对于一个主题,我想到了:

For a Subject I came up with:

class Subject {
    subscribers = new Set();

    constructor() {
        this.observable = new Observable(observer => {
            this.observer = observer;
        });

        this.observable.subscribe((...args) => {
            this.subscribers.forEach(sub => sub(...args))
        });
    }

    subscribe(subscriber) {
        this.subscribers.add(subscriber);
    }

    emit(...args) {
        this.observer.next(...args);
    }
}

哪种合并到一个EventEmitter中,并用.next包裹起来,用发射-捕获Observable的 observe 参数似乎是错误的-就像我刚刚破解了一个解决方案一样.从可观察(单播)产生主题(多播)的更好方法是什么?

which sort of merges into an EventEmitter with it wrapping .next with emit - but capturing the observe argument of the Observable seems wrong - and like I have just hacked up a solution. What would be the better way to produce a Subject (multicast) from an Observable (unicast)?

我尝试查看RXJS,但看不到它的 subscribers 数组是如何填充的:/

I tried looking at RXJS but I can't see how it's subscribers array ever gets populated :/

推荐答案

我认为您也可以通过使用调试器来更好地理解.打开一个StackBlitz RxJS项目,创建最简单的示例(取决于您要理解的内容),然后放置一些断点.AFAIK,使用StackBlitz可以调试TypeScript文件,这看起来很棒.

I think you can have a better understanding by using the debugger as well. Open a StackBlitz RxJS project, create the simplest example(depending on what you're trying to understand) and then place some breakpoints. AFAIK, with StackBlitz you can debug the TypeScript files, which seems great.

首先, Subject 现在让我们检查 Observable 类.

它具有著名的 管道方法:

It has the well-known pipe method:

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

其中定义了 pipeFromArray 在弄清上面片段中发生的事情之前,重要的是要知道 operators .运算符是返回另一个函数的函数,该函数的单个参数是 Observable< T> ,其返回类型是 Observable< R> .有时, T R 可以相同(例如,当使用 filter() debounceTime()时.).

Before clarifying what's going on in the above snippet, it is important to know that operators are. An operator is a function which returns another function whose single argument is an Observable<T> and whose return type is an Observable<R>. Sometimes, T and R can be the same(e.g when using filter(), debounceTime()...).

例如, map 定义如下:

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate((source, subscriber) => {
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}

export function operate<T, R>(
  init: (liftedSource: Observable<T>, subscriber: Subscriber<R>) => (() => void) | void
): OperatorFunction<T, R> {
  return (source: Observable<T>) => {
    if (hasLift(source)) {
      return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
        try {
          return init(liftedSource, this);
        } catch (err) {
          this.error(err);
        }
      });
    }
    throw new TypeError('Unable to lift unknown Observable type');
  };
}

因此, operate 返回功能.注意其参数: source:Observable< T> .返回类型源自 Subscriber< R> .

So, operate will return a function. Notice its argument: source: Observable<T>. The return type is derived from Subscriber<R>.

Observable.lift 只会创建一个新的 Observable .就像在喜欢的列表中创建节点一样.

Observable.lift just creates a new Observable. It's like creating nodes within a liked list.

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  
  // it's important to keep track of the source !
  observable.source = this;
  observable.operator = operator;
  return observable;
}

因此,运算符(如 map )将返回一个函数.调用该函数的是 pipeFromArray 函数:

So, an operator(like map) will return a function. What invokes that function is the pipeFromArray function:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    // here the functions returned by the operators are being called
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

在上面的代码段中, operate 函数返回的内容是 fn :

In the above snippet, fn is what the operate function returns:

return (source: Observable<T>) => {
  if (hasLift(source)) { // has `lift` method
    return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
      try {
        return init(liftedSource, this);
      } catch (err) {
        this.error(err);
      }
    });
  }
  throw new TypeError('Unable to lift unknown Observable type');
};

也许最好还是看一个例子.我建议您自己尝试使用调试器.

Maybe it would be better to see an example as well. I'd recommend trying this yourself with a debugger.

const src$ = new Observable(subscriber => {subscriber.next(1), subscriber.complete()});

subscriber =>{} 回调fn将分配给

The subscriber => {} callback fn will be assigned to the Observable._subscribe property.

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

接下来,让我们尝试添加一个运算符:

Next, let's try adding an operator:

const src2$ = src$.pipe(map(num => num ** 2))

在这种情况下,它将从 pipeFromArray 调用该块:

In this case, it will invoke this block from pipeFromArray:

// `pipeFromArray`
if (fns.length === 1) {
  return fns[0];
}

// `Observable.pipe`
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

因此, Observable.pipe 将调用(source:Observable< T>)=>{...} ,其中 source src $ Observable .通过调用该函数(其结果存储在 src2 $ 中),它还将调用 Observable.lift 方法.

So, the Observable.pipe will invoke (source: Observable<T>) => { ... }, where source is the src$ Observable. By invoking that function(whose result is stored in src2$), it will also call the Observable.lift method.

return source.lift(function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
});

/* ... */

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

此时, src $ 是一个 Observable 实例,其 source 设置为 src $ .并且 operator 设置为 function(this:Subscriber< R>,liftedSource:Observable< T>)... .

At this point, src$ is an Observable instance, which has the source set to src$ and the operator set to function (this: Subscriber<R>, liftedSource: Observable<T>) ....

从我的角度来看,这完全是关于链接列表的.当创建 Observable 链(通过添加运算符)时,列表是从上到下创建的.
tail节点调用其 subscribe 方法时,将创建另一个列表,这次是从下到上.我喜欢将第一个称为 Observable列表,将第二个称为 Subscribers列表.

From my perspective, it's all about linked lists. When creating the Observable chain(by adding operators), the list is created from top to bottom.
When the tail node has its subscribe method called, another list will be created, this time from bottom to top. I like to call the first one the Observable list and the second one the Subscribers list.

src2$.subscribe(console.log)

subscribe时,会发生这种情况 方法称为:

This is what happens when the subscribe method is called:

const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
  
  const { operator, source } = this;
  subscriber.add(
    operator
      ? operator.call(subscriber, source)
      : source || config.useDeprecatedSynchronousErrorHandling
      ? this._subscribe(subscriber)
      : this._trySubscribe(subscriber)
  );

  return subscriber;

在这种情况下, src2 $ 有一个 operator ,因此它将调用它. operator 定义为:

In this case src2$ has an operator, so it will call that. operator is defined as:

function (this: Subscriber<R>, liftedSource: Observable<T>) {
  try {
    return init(liftedSource, this);
  } catch (err) {
    this.error(err);
  }
}

其中 init 取决于所使用的运算符.再次,这是 map init

where init depends on the operator that is used. Once again, here is map's init

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return operate( /* THIS IS `init()` */(source, subscriber) => {
    
    // The index of the value from the source. Used with projection.
    let index = 0;
    // Subscribe to the source, all errors and completions are sent along
    // to the consumer.
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) => {
        // Call the projection function with the appropriate this context,
        // and send the resulting value to the consumer.
        subscriber.next(project.call(thisArg, value, index++));
      })
    );
  });
}

实际上是 src $ .当调用 source.subscribe()时,它将最终调用提供给 new Observable(subscriber => {...})的回调.调用 subscriber.next(1)将调用(value:T)=>.{...} 从上面,它将调用 subscriber.next(project.call(thisArg,value,index ++))); ( project -回调提供给 map ).最后, subscriber.next 是指 console.log .

source is in fact src$. When source.subscribe() is called, it will end up calling the callback provided to new Observable(subscriber => { ... }). Calling subscriber.next(1) will call the (value: T) => { ... } from above, which will call subscriber.next(project.call(thisArg, value, index++));(project - the callback provided to map). Lastly, subscriber.next refers to console.log.

回到 Subject ,这是

Coming back to Subject, this is what happens when the _subscribe method is called:

protected _subscribe(subscriber: Subscriber<T>): Subscription {
  this._throwIfClosed(); // if unsubscribed
  this._checkFinalizedStatuses(subscriber); // `error` or `complete` notifications
  return this._innerSubscribe(subscriber);
}

protected _innerSubscribe(subscriber: Subscriber<any>) {
  const { hasError, isStopped, observers } = this;
  return hasError || isStopped
    ? EMPTY_SUBSCRIPTION
    : (observers.push(subscriber), new Subscription(() => arrRemove(this.observers, subscriber)));
}

因此,这就是填充主题的订阅者列表的方式.通过返回 new Subscription(()=> arrRemove(this.observers,subscriber)),它可以确保随后用户取消订阅(由于 complete / error 通知,或者只是 subscriber.unsubscribe()),则非活动订阅者将从 Subject 的列表中删除.

So, this is how Subject's list of subscribers are is populated. By returning new Subscription(() => arrRemove(this.observers, subscriber)), it ensures that then subscriber unsubscribes(due to complete/error notifications or simply subscriber.unsubscribe()), the inactive subscriber will be removed from the Subject's list.

这篇关于可观察的主题事件侦听器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆