RxJS:带多个动作和不同过滤器的 takeUntil? [英] RxJS: takeUntil with multiple actions and different filters?

查看:20
本文介绍了RxJS:带多个动作和不同过滤器的 takeUntil?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 Observable 想继续执行,直到:

I have an Observable that I want to continue executing until:

1) uploadActions.MARK_UPLOAD_AS_COMPLETE 动作用特定的有效载荷调用

1) the uploadActions.MARK_UPLOAD_AS_COMPLETE action is called with a certain payload

2) uploadActions.UPLOAD_FAILURE 动作被任何负载调用

2) the uploadActions.UPLOAD_FAILURE action is called with any payload

这是我所能得到的(并且不起作用):

This is as far as I could get (and doesn't work):

return Observable.interval(5000)
  .takeUntil(
    action$
      .ofType(
        uploadActions.UPLOAD_FAILURE,
        uploadActions.MARK_UPLOAD_AS_COMPLETE
      )
      .filter(a => { // <---- this filter only applies to uploadActions.MARK_UPLOAD_AS_COMPLETE
        const completedFileHandle = a.payload;
        return handle === completedFileHandle;
      })
  )
  .mergeMap(action =>
    ...
  );

有没有一种干净的方法可以实现这一目标?

Is there a clean way I could achieve this?

推荐答案

我会将这两个条件拆分为单独的流,然后像这样合并它们:

I'd split the two conditions into separate streams and then merge them like so:

const action$ = new Rx.Subject();
const uploadActions = {
  UPLOAD_FAILURE: "UPLOAD_FAILURE",
  MARK_UPLOAD_AS_COMPLETE: "MARK_UPLOAD_AS_COMPLETE"
};
const handle = 42;

window.setTimeout(() => action$.next({
  type: uploadActions.MARK_UPLOAD_AS_COMPLETE,
  payload: handle
}), 1200);

Rx.Observable.interval(500)
  .takeUntil(
    Rx.Observable.merge(
      action$.filter(x => x.type === uploadActions.UPLOAD_FAILURE),
      action$.filter(x => x.type === uploadActions.MARK_UPLOAD_AS_COMPLETE)
      	.filter(x => x.payload === handle)      
    )
  ).subscribe(
    x => { console.log('Next: ', x); },
    e => { console.log('Error: ', e); },
    () => { console.log('Completed'); }
  );

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>

例如,我不得不使用 filter 运算符而不是 ofType,因为 ofType 是一个 redux 的东西.

For the example I had to use the filter operator instead of ofType since ofType is an redux thing.

这篇关于RxJS:带多个动作和不同过滤器的 takeUntil?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆