暂停和恢复可观察的视频流,请提出更好的选择 [英] Pausing and resuming an observable stream, please suggest better options

查看:39
本文介绍了暂停和恢复可观察的视频流,请提出更好的选择的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有必要从可观察对象那里听取项目流.当某些情况出现时,将对该项目执行异步任务,并且组件将处于忙碌"状态,直到完成为止.我想暂停订阅中的项目,直到此任务完成(因为以下项目的处理取决于结果),然后从序列中的下一个项目继续进行,而不会造成任何损失.

最好在看Plunk的时候阅读下一部分,

实践

在此示例中, src Observable每100ms发出一个值,并且每个值都映射到一个新的observable,该可观察的对象发出一个介于0到2000ms之间的值(异步任务).您可以看到订单很安全.

  let src = Rx.Observable.timer(0,100);src.concatMap(i => {返回Rx.Observable.timer(Math.random()* 2000).mapTo(i);//这是异步任务}).subscribe(data => console.log(data));  

 < script src ="https://unpkg.com/rxjs@5.4.0/bundles/Rx.min.js"></script>  

制作可观察的热点

您也不应使用这些订阅来使您的观察对象发出数据.实际上,您应该将您的冷到可观察到的冷使用 .publish() .connect()而不是 share() subscribe()代码>:

  this.source $ = Observable.range(1,500).zip(Observable.interval(500),函数(x,y){返回x;}).发布();//等等等等代码this.source $ .connect(); 

I have a requirement to listen to a stream of items from an observable. When certain conditions arise an asynchronous task will be performed on the item and the component will be 'busy' until this completes. I would like to pause handling items in the subscription until this task has completed (as the processing of the following items is dependent on the result) and then resume from the next item in the sequence without any loss.

the next part is probably best read whilst looking at the Plunk here

To achieve this I have used a buffer with a swtichMap. I thought these would do the job on their own but switchMap destroys and recreates the subscription the sequence gets reset every time.

export class AppComponent implements OnInit {
    source$: Observable<any>;
    clearBuffer$ = new Subject();
    busy$ = new Subject();

    private itemSubscription: Subscription;
    private stayAliveSubscription: Subscription;

    items: any[] = [];

    constructor() { }

    ngOnInit() {
      this.source$ = Observable.range(1, 500).zip(
        Observable.interval(500),
        function (x, y) { return x; }
      ).share();

      this.busy$
        .subscribe(result => {
          if (!result) {
            this.clearBuffer$.next();
          }
        }, error => {
          console.log(error);
        });
    }

    start() {
      if (!this.itemSubscription) {
        this.itemSubscription =
          this.busy$.switchMap(busy => {
            if (busy) {
              return this.source$.buffer(this.clearBuffer$);
            } else {
              return this.source$;
            }
          })
            .subscribe(items => {
              if (Array.isArray(items)) {
                this.items.push('buffered: ' + items.join());
              } else {
                this.items.push('live feed: ' + items);
              }
            }, error => {
              this.items.push(error);
            });

        this.stayAliveSubscription = this.source$
          .subscribe(result => {
            console.log(result);
          }, error => {
            console.log(error);
          });

        this.busy$.next(false);
      }
   }
...
}

To fix this the source$ observable is now shared and a separate subscription is started (stayAliveSubscription) so a single subscription is used throughout. This seems messy to me and I wanted to ask if anyone can show me better/alternative approaches to the problem.

I put the working sample in a Plunk here click start to start the subscription and then set/unset the busy toggle to buffer and continue.

edit: working code with concatMap

I changed the Plunk to use concatMap. I've pasted the code below as well. The key is that the busy observable returned in concatMap must complete you can't just return the busy$ observable multiple times and call next on it when the busy status changes.

    source$: Observable<any>;
    busy$ = new Subject();
    busy: boolean;

    private itemSubscription: Subscription;
    private stayAliveSubscription: Subscription;

    items: any[] = [];

    constructor() { }

    ngOnInit() {
      this.source$ = Observable.range(1, 500).zip(
        Observable.interval(500),
        function (x, y) { return x; }
      );

      this.busy$
        .subscribe(busy => {
          this.busy = <any>busy;
        });
    }

    start() {
      if (!this.itemSubscription) {
        this.itemSubscription = this.source$.concatMap(item => {
          const busySubject = new Subject();
          this.busy$
            .subscribe(result => {
              busySubject.next(item);
              busySubject.complete();
            });

          if (this.busy) {
            return busySubject;
          } else {
            return Observable.of(item);
          }

        })
          .subscribe(item => {
            this.items.push(item);
          }, error => {
            this.items.push(error);
          });
      }

      this.setBusy(false);
    }

解决方案

I don't fully understand what you are trying to do, but if it is just a matter of preserving the order of the emitted values while the "async task" can take a long (random) time, I guess you could use the concatMap operator.

Theory

concatMap

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

Practice

In this example, the srcObservable emits a value every 100ms and each value is mapped to a new observable that emits a value between 0 and 2000ms (the async task). You can see the order is safe.

let src = Rx.Observable.timer(0,100);
src.concatMap(i=>{
  return Rx.Observable.timer(Math.random()*2000).mapTo(i); // this is the async task
}).subscribe(data=>console.log(data));

<script src="https://unpkg.com/rxjs@5.4.0/bundles/Rx.min.js"></script>

Making a hot Observable

You should also not use these subscriptions to make your observable emit data. Actually you should transform your cold observable to a hot one using .publish() and .connect() instead of share()and subscribe() :

this.source$ = Observable.range(1, 500).zip(
        Observable.interval(500),
        function (x, y) { return x; }
      ).publish();
// blah blah blah some code
this.source$.connect();

这篇关于暂停和恢复可观察的视频流,请提出更好的选择的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆