等待RxJS Observable的onNext中的异步操作 [英] Wait for an async operation in onNext of RxJS Observable

查看:353
本文介绍了等待RxJS Observable的onNext中的异步操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个RxJS序列以正常方式被消耗...

I have an RxJS sequence being consumed in the normal manner...

但是,在可观察的"onNext"处理程序中,某些操作将同步完成,而其他操作则需要异步回调,在处理输入序列中的下一项之前,需要等待这些回调.

However, in the observable 'onNext' handler, some of the operations will complete synchronously, but others require async callbacks, that need to be waited on before processing the next item in the input sequence.

...有点困惑如何执行此操作.有任何想法吗?谢谢!

...little bit confused how to do this. Any ideas? thanks!

someObservable.subscribe(
    function onNext(item)
    {
        if (item == 'do-something-async-and-wait-for-completion')
        {
            setTimeout(
                function()
                {
                    console.log('okay, we can continue');
                }
                , 5000
            );
        }
        else
        {
            // do something synchronously and keep on going immediately
            console.log('ready to go!!!');
        }
    },
    function onError(error)
    {
        console.log('error');
    },
    function onComplete()
    {
        console.log('complete');
    }
);

推荐答案

要执行的每个操作都可以建模为可观察的模型.甚至同步操作都可以用这种方式建模.然后,您可以使用map将序列转换为序列序列,然后使用concatAll展平序列.

Each operation you want to perform can be modeled as an observable. Even the synchronous operation can be modeled this way. Then you can use map to convert your sequence into a sequence of sequences, then use concatAll to flatten the sequence.

someObservable
    .map(function (item) {
        if (item === "do-something-async") {
            // create an Observable that will do the async action when it is subscribed
            // return Rx.Observable.timer(5000);

            // or maybe an ajax call?  Use `defer` so that the call does not
            // start until concatAll() actually subscribes.
            return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });
        }
        else {
            // do something synchronous but model it as an async operation (using Observable.return)
            // Use defer so that the sync operation is not carried out until
            // concatAll() reaches this item.
            return Rx.Observable.defer(function () {
                return Rx.Observable.return(someSyncAction(item));
            });
        }
    })
    .concatAll() // consume each inner observable in sequence
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });

要回复您的一些评论...在某些时候,您需要对功能流施加一些期望.在大多数语言中,当处理可能是异步的函数时,函数签名是异步的,并且函数的实际异步与同步本质被隐藏为函数的实现细节.无论您使用的是JavaScript Promise,Rx Observables,C#Tasks,C ++ Futures等,都是如此.这些函数最终会返回一个promise/observable/task/future/etc,如果该函数实际上是同步的,则它返回的对象是刚刚完成.

To reply to some of your comments...at some point you need to force some expectations on the stream of functions. In most languages, when dealing with functions that are possibly async, the function signatures are async and the actual async vs sync nature of the function is hidden as an implementation detail of the function. This is true whether you are using javaScript promises, Rx observables, c# Tasks, c++ Futures, etc. The functions end up returning a promise/observable/task/future/etc and if the function is actually synchronous, then the object it returns is just already completed.

话虽如此,由于这是JavaScript,因此您可以 作弊:

Having said that, since this is JavaScript, you can cheat:

var makeObservable = function (func) {
    return Rx.Observable.defer(function () {
        // execute the function and then examine the returned value.
        // if the returned value is *not* an Rx.Observable, then
        // wrap it using Observable.return
        var result = func();
        return result instanceof Rx.Observable ? result: Rx.Observable.return(result);
    });
}

someObservable
    .map(makeObservable)
    .concatAll()
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });

这篇关于等待RxJS Observable的onNext中的异步操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆