在 RxJS 6 中重置 ReplaySubject [英] Resetting ReplaySubject in RxJS 6

查看:13
本文介绍了在 RxJS 6 中重置 ReplaySubject的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个当前使用 ReplaySubject 实现的可过滤活动日志"(因为一些组件使用它并且它们可能会在不同时间订阅).

I have a filterable 'activity log' that's currently implemented using a ReplaySubject (since a few components use it and they might subscribe at different times).

当用户更改过滤器设置时,会发出一个新请求,但结果会附加到 ReplaySubject 而不是替换它.

When the user changes the filter settings, a new request is made, however the results are appended to the ReplaySubject rather than replacing it.

我想知道是否可以更新 ReplaySubject 以仅使用 switchMap 之类的东西发送新项目?

I was wondering if there is anyway to update the ReplaySubject to only send through the new items using something like a switchMap?

否则,我可能需要使用 BehaviorSubject 返回所有活动条目的数组,或者重新创建 ReplaySubject 并通知用户(可能通过使用另一个 observable)取消订阅并重新订阅新的 observable.

Otherwise, I might need to either use a BehaviorSubject that returns an array of all the activity entries or recreate the ReplaySubject and notify users (probably by using another observable) to unsubscribe and resubscribe to the new observable.

推荐答案

如果您希望能够在不让订阅者明确取消订阅和重新订阅的情况下重置主题,您可以执行以下操作:

If you want to be able to reset a subject without having its subscribers explicitly unsubscribe and resubscribe, you could do something like this:

import { Observable, Subject } from "rxjs";
import { startWith, switchMap } from "rxjs/operators";

function resettable<T>(factory: () => Subject<T>): {
  observable: Observable<T>,
  reset(): void,
  subject: Subject<T>
} {
  const resetter = new Subject<any>();
  const source = new Subject<T>();
  let destination = factory();
  let subscription = source.subscribe(destination);
  return {
    observable: resetter.asObservable().pipe(
      startWith(null),
      switchMap(() => destination)
    ),
    reset: () => {
      subscription.unsubscribe();
      destination = factory();
      subscription = source.subscribe(destination);
      resetter.next();
    },
    subject: source
  };
}

resettable 将返回一个包含以下内容的对象:

resettable will return an object containing:

  • 一个observable,resettable 的订阅者应该订阅它;
  • 一个subject,你可以根据它调用nexterrorcomplete;和
  • 一个 reset 函数,将重置(内部)主题.
  • an observable to which subscribers to the re-settable subject should subscribe;
  • a subject upon which you'd call next, error or complete; and
  • a reset function that will reset the (inner) subject.

你会这样使用它:

import { ReplaySubject } from "rxjs";
const { observable, reset, subject } = resettable(() => new ReplaySubject(3));
observable.subscribe(value => console.log(`a${value}`)); // a1, a2, a3, a4, a5, a6
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
observable.subscribe(value => console.log(`b${value}`)); // b2, b3, b4, b5, b6
reset();
observable.subscribe(value => console.log(`c${value}`)); // c5, c6
subject.next(5);
subject.next(6);

这篇关于在 RxJS 6 中重置 ReplaySubject的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆