如何取消订阅或处理Angular2或RxJS中可观察到的间隔? [英] How to unsubscribe or dispose an interval observable on condition in Angular2 or RxJS?

查看:74
本文介绍了如何取消订阅或处理Angular2或RxJS中可观察到的间隔?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是整个Rx和反应式编程的新手,但是我必须处理这种情况.我希望每隔500毫秒就可以观察到一个间隔,该间隔可以通过对REST API的POST请求来检查硬件的状态,以查看响应是否发生了变化.因此,一旦更改,我希望立即关闭这种可观察间隔的POST请求,从而将资源留给以后的其他操作.这是一段代码.

I'm new to the whole Rx thing and reactive programming, however I have to deal with such a situation. I want a interval observable to check a hardware's status by a POST request to its REST API every 500ms to see if the response changes. So once it changes, I want such interval observable POST request shut down immediately, leaving resource to other future operations. Here is a piece of code.

myLoop(item_array:number[], otheroption : string){
    for (let item of item_array){
        //set hardware option, still a request
        this.configHardware(item,otheroption)
        //after config the hardware, command hardware take action
            .flatMap(() => {
                //this return a session id for check status
                this.takeHardwareAction()
                    .flatMap( (result_id) => {
                        //I want every 500ms check if hardware action is done or not
                        let actionCheckSubscription = IntervalObservable.create(500).subscribe(
                            () => {
                                //So my question is, can I unsubscribe the actionCheckSubscription in here on condition change? For example, 
                                if (this.intervalCheckingStatus(result_id))
                                    actionCheckSubscription.unsubscribe() ;
                            }
                        ) ;
                    })

            })
    }
}

推荐答案

您可以使用Observable.fromconcatMap遍历所有项目,然后将filtertake(1)组合使用以停止间隔验证通过filter:

You could use Observable.from and concatMap to iterate through all the items and then use a filter in combination with take(1) to stop an interval as soon as the validation passes the filter:

myLoop(item_array:number[], otheroption : string) {
    return Observable.from(item_array)
        .concatMap(item => this.configHardware(item, otheroption)
            .switchMap(resultId => Observable.interval(500)
                .switchMapTo(this.intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast
                .take(1) // and complete after 1 value was valid
                .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not)
            )
        );
}

// usage:
myLoop([1,2,3,4], "fooBar")
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occured", error),
        () => console.log("All items are valid now")
    );


这是一个包含模拟数据的实时示例


Here is a live-example with mock-data

const Observable = Rx.Observable;

function myLoop(item_array) {
    return Observable.from(item_array)
        // if you don't mind the execution-order you can use "mergeMap" instead of "concatMap"
        .concatMap(item => configHardwareMock(item)
            .switchMap(resultId => Observable.interval(500)
                .do(() => console.info("Checking status of: " + resultId))
                .switchMap(() => intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast
                .take(1) // and complete after 1 value was valid
                .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not)
            )
        );
}

// usage:
myLoop([1,2,3,4])
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occured", error),
        () => console.log("All items are valid now")
    );

// mock helpers
function configHardwareMock(id) {
  return Observable.of(id);  
}

function intervalCheckingStatus(resultId) {
  if (Math.random() < .4) {
    return Observable.of(false);
  }
  
  return Observable.of(true);
}

<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

这篇关于如何取消订阅或处理Angular2或RxJS中可观察到的间隔?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆