如何在 rxjs 中为每个订阅在可观察管道上执行一次初始化逻辑 [英] How to do initialization logic on an observable pipe once per subscription in rxjs

查看:41
本文介绍了如何在 rxjs 中为每个订阅在可观察管道上执行一次初始化逻辑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以我有这个 observable 管道,我需要在订阅开始时执行一次操作,就像您可以使用 finalize() 在订阅结束时执行一次操作一样订阅

So I've got this observable pipe where I need to do an operation once at the beginning of the subscription, just like you can use finalize() to do an operation once at the end of a subscription

所以这就是我的开始,不幸的是,它会在每次针对主题进行的 next() 调用时启动一次.

So this is what I started with, unfortunately it will do the startup once per each next() call that is made towards the subject.

    const notificationSubject = new BehaviorSubject<Notification | undefined>(undefined);
    const notifications$ = this.notificationSubject.pipe(
      tap(() => startup()),
      filter(isValueDefined),
      finalize(() => shutdown())
    );

   notifications$.subscribe(noti => foo(noti));
   notifications$.subscribe(noti => bar(noti));

然后我们得到了这个变体:

Then we got this variant:

    let isStartedUp = false;
    const internalStartup = () => {
      if(!isStartedUp){
        isStartedUp = true;
        startup();
      }
    }

    const notifications$ = notificationSubject.pipe(
      tap(() => internalStartup()),
      filter(isValueDefined),
      finalize(() => shutdown())
    );

   notifications$.subscribe(noti => foo(noti));
   notifications$.subscribe(noti => bar(noti));

...它可以完成它的工作,但是它做得有点过分了,因为现在启动只进行一次(并且仅在第一次订阅时),而不是每个创建的订阅一次.

... which does it's job however it does it a little too well as now the startup is only made once ever (and only on the first subscription) instead of once per subscription that is created.

我想有类似的东西,但我还没有找到.

I imagine there being something along the lines of this but I haven't found it.

const notifications$ = notificationSubject.pipe(
      initialize(() => startup()),
      finalize(() => shutdown())
    );

推荐答案

您可以使用 defer 在每次订阅时执行一些代码.

You can use defer to execute some code on every subscribe.

export function initialize<T>(initializer: () => void): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    initializer();
    return source;
  });
}

const notifications$ = notificationSubject.pipe(
  initialize(() => startup()),
  finalize(() => shutdown())
);

这篇关于如何在 rxjs 中为每个订阅在可观察管道上执行一次初始化逻辑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆