如何使用Observables实施轮询? [英] How do I implement polling using Observables?

查看:127
本文介绍了如何使用Observables实施轮询?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个参数化的rest调用,应该使用不同的参数每五秒钟执行一次

I have a parametrized rest call that should be executed every five seconds with different params:

Observable<TResult> restCall = api.method1(param1);

我需要创建一个Observable<TResult>,它将每5秒以不同的param1值轮询restCall.如果api调用失败,我需要得到一个错误,并在5秒钟内进行下一个调用.呼叫之间的间隔应该仅在restCall完成(成功/错误)时进行测量.

I need to create an Observable<TResult> which will poll the restCall every 5 seconds with different values for param1. If the api call fails I need to get an error and make the next call in 5 seconds. The interval between calls should be measured only when restCall is finished (success/error).

我目前正在使用RxJava,但是.NET示例也不错.

I'm currently using RxJava, but a .NET example would also be good.

推荐答案

简介

首先,我是一个.NET专家,我知道这种方法使用了一些Java中没有直接等效项的惯用法.但是,我谨遵从您的话,然后继续进行下去,因为这是.NET伙计们会喜欢的一个很好的问题,并且希望它将引导您沿着rx-java的正确道路前进,这是我从未研究过的.这是一个很长的答案,但主要是解释-解决方案代码本身很短!

Introduction

First, an admission, I'm a .NET guy, and I know this approach uses some idioms that have no direct equivalent in Java. But I'm taking you at your word and proceeding on the basis that this is a great question that .NET guys will enjoy, and that hopefully it will lead you down the right path in rx-java, which I have never looked at. This is quite a long answer, but it's mostly explanation - the solution code itself is pretty short!

我们将需要首先整理一些工具以帮助该解决方案.首先是Either<TLeft, TRight>类型的使用.这很重要,因为每个调用都有两个可能的结果 一个好结果或一个错误.但是我们需要将它们包装为单一类型-我们不能使用OnError将错误发回,因为这会终止结果流.两者看上去都像元组,因此更容易处理这种情况. Rxx库具有Either的非常完整和良好的实现,但是这里是一个简单的通用示例用法,然后进行简单的实现就可以达到我们的目的:

We will need sort some tools out first to help with this solution. The first is the use of the Either<TLeft, TRight> type. This is important, because you have two possible outcomes of each call either a good result, or an error. But we need to wrap these in a single type - we can't use OnError to send errors back since this would terminate the result stream. Either looks a bit like a Tuple and makes it easier to deal with this situation. The Rxx library has a very full and good implementation of Either, but here is a simple generic example of usage followed by a simple implementation good for our purposes:

var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());

/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
    public abstract bool IsLeft { get; }
    public bool IsRight { get { return !IsLeft; } }
    public abstract TLeft Left { get; }
    public abstract TRight Right { get;  }    
}

public static class Either
{
    public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TLeft _leftValue;

        public LeftValue(TLeft leftValue)
        {
            _leftValue = leftValue;
        }

        public override TLeft Left { get { return _leftValue; } }
        public override TRight Right { get { return default(TRight); } }
        public override bool IsLeft { get { return true; } }
    }

    public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TRight _rightValue;

        public RightValue(TRight rightValue)
        {
            _rightValue = rightValue;
        }

        public override TLeft Left { get { return default(TLeft); } }
        public override TRight Right { get { return _rightValue; } }
        public override bool IsLeft { get { return false; } }
    }

    // Factory functions to create left or right-valued Either instances
    public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
    {
        return new LeftValue<TLeft, TRight>(leftValue);
    }

    public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
    {
        return new RightValue<TLeft, TRight>(rightValue);
    }
}

请注意,按照惯例,在使用"Either"建模成功或失败时,"Right"表示成功值,因为它当然是"Right":)

Note that by convention when using Either to model a success or failure, the Right side is used for the successful value, because it's "Right" of course :)

我将使用一些辅助函数来模拟问题的两个方面.首先,这是一个生成参数的工厂-每次调用它时,它将返回以1开头的整数序列中的下一个整数.

I'm going to simulate two aspects of your problem with some helper functions. First, here is a factory to generate parameters - each time it is called it will return the next integer in the sequence of integers starting with 1:

// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
    return ++count; 
}

接下来,这是一个函数,将您的Rest调用模拟为IObservable.此函数接受一个整数,并且:

Next, here is a function that simulates your Rest call as an IObservable. This function accepts an integer and:

  • 如果整数是偶数,则返回一个Observable并立即发送一个OnError.
  • 如果整数为奇数,则返回字符串,该字符串将整数与"-ret"连接,但仅在经过一秒钟之后.我们将使用它来检查轮询间隔是否符合您的要求-作为完成的调用之间的暂停(无论它们花费多长时间),而不是规则的间隔.

这里是:

// A asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
    return x % 2 == 0
        ? Observable.Throw<string>(new Exception())
        : Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));   
}

现在好点子

以下是我称为Poll的合理通用可重用函数.它接受将要轮询的异步函数,该函数的参数工厂,所需的休息时间(无双关!),最后是要使用的IScheduler.

Now The Good Bit

Below is a reasonably generic reusable function I have called Poll. It accepts an asynchronous function that will be polled, a parameter factory for that function, the desired rest (no pun intended!) interval, and finally an IScheduler to use.

我想出的最简单的方法是使用Observable.Create,它使用调度程序来驱动结果流. ScheduleAsync是一种使用.NET async/await格式进行计划的方法.这是一个.NET习惯用法,可让您以命令式方式编写异步代码. async关键字引入了一个异步函数,该函数随后可以在其主体中await一个或多个异步调用,并且仅在调用完成时才继续. 我在此问题中对这种调度方式进行了详尽的解释,其中包括较旧的递归式可能更容易以rx-java方法实现.代码如下:

The simplest approach I could come up with is to use Observable.Create that uses a scheduler to drive the result stream. ScheduleAsync is a way of Scheduling that uses the .NET async/await form. This is a .NET idiom that allows you to write asynchronous code in an imperative fashion. The async keyword introduces an asynchronous function that can then await one or more asynchronous calls in it's body and will continue on only when the call completes. I wrote a long explanation of this style of scheduling in this question, which includes the older recursive the style that might be easier to implement in an rx-java approach. The code looks like this:

public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
    Func<TArg, IObservable<TResult>> asyncFunction,
    Func<TArg> parameterFactory,
    TimeSpan interval,
    IScheduler scheduler)
{
    return Observable.Create<Either<Exception, TResult>>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) => {
            while(!ct.IsCancellationRequested)
            {
                try
                {
                    var result = await asyncFunction(parameterFactory());
                    observer.OnNext(Either.Right<Exception,TResult>(result));
                }
                catch(Exception ex)
                {
                    observer.OnNext(Either.Left<Exception, TResult>(ex));
                }
                await ctrl.Sleep(interval, ct);
            }
        });        
    });    
}

简而言之,Observable.Create通常是用于创建IObservable的工厂,它使您可以更好地控制将结果发布到观察者的方式.人们常常忽略了它,而赞成使用不必要的原始图元组成.

Breaking this down, Observable.Create in general is a factory for creating IObservables that gives you a great deal of control over how results are posted to observers. It's often overlooked in favour of unnecessarily complex composition of primitives.

在这种情况下,我们使用它来创建Either<TResult, Exception>的流,以便我们可以返回成功和失败的轮询结果.

In this case, we are using it to create a stream of Either<TResult, Exception> so that we can return the successful and failed polling results.

Create函数接受一个观察者,该观察者表示我们通过OnNext/OnError/OnCompleted将结果传递给的订阅者.我们需要在Create调用内返回一个IDisposable-在.NET中,这是订阅服务器可以用来取消其订阅的句柄.在这里特别重要,因为否则轮询将永远进行-至少不会进行OnComplete.

The Create function accepts an observer that represents the Subscriber to which we pass results to via OnNext/OnError/OnCompleted. We need to return an IDisposable within the Create call - in .NET this is a handle by which the Subscriber can cancel their subscription. It's particularly important here because Polling will otherwise go on forever - or at least it won't ever OnComplete.

ScheduleAsync(或普通Schedule)的结果就是这样的句柄.处置后,它将取消我们计划的任何未决事件-从而终止轮询循环.在本例中,用于管理间隔的Sleep是可取消操作,尽管可以轻松修改Poll函数以接受也接受CancellationToken的可取消asyncFunction.

The result of ScheduleAsync (or plain Schedule) is such a handle. When disposed, it will cancel any pending event we Scheduled - thereby ending the the polling loop. In our case, the Sleep we use to manage the interval is the cancellable operation, although the Poll function could easily be modified to accept a cancellable asyncFunction that accepts a CancellationToken as well.

ScheduleAsync方法接受一个将调度事件的函数.它传递了两个参数,第一个ctrl是调度程序本身.第二个ct是CancellationToken,我们可以使用它来查看是否已请求取消(由订阅服务器设置其订阅句柄).

The ScheduleAsync method accepts a function that will be called to schedule events. It is passed two arguments, the first ctrl is the scheduler itself. The second ct is a CancellationToken we can use to see if cancellation has been requested (by the Subscriber disposing their subscription handle).

轮询本身是通过无限的while循环执行的,仅当CancellationToken指示已请求取消时,该循环才会终止.

The polling itself is performed via an infinite while loop that terminates only if the CancellationToken indicates cancellation has been requested.

在循环中,我们可以使用async/await的魔力来异步调用轮询功能,但仍将其包装在异常处理程序中.太棒了!假设没有错误,我们将结果作为Either right 值通过OnNext发送给观察者.如果存在异常,我们将其作为Either left 值发送给观察者.最后,我们在调度程序上使用Sleep函数来安排在休息间隔之后的唤醒调用-不要与Thread.Sleep调用混淆,该调用通常不会阻塞任何线程.请注意,Sleep接受CancellationToken使其也可以中止!

In the loop, we can use the magic of async/await to asynchronously invoke the polling function yet still wrap it in an exception handler. This is so awesome! Assuming no error, we send the result as the right value of an Either to the observer via OnNext. If there was an exception, we send that as the left value of an Either to the observer. Finally, we use the Sleep function on the scheduler to schedule a wake-up call after the rest interval - not to be confused with a Thread.Sleep call, this one typically doesn't block any threads. Note that Sleep accepts the CancellationToken enabling that to be aborted as well!

我想您会同意,这是async/await的一种很酷的用法,它可以简化本来非常棘手的问题!

I think you'll agree this is a pretty cool use of async/await to simplify what would have been an awfully tricky problem!

最后,这是一些调用Poll的测试代码以及​​示例输出-用于 LINQPad 粉丝此答案中的所有代码将一起在LINQPad中运行,并引用Rx 2.1程序集:

Finally, here is some test code that calls Poll, along with sample output - for LINQPad fans all the code together in this answer will run in LINQPad with Rx 2.1 assemblies referenced:

void Main()
{
    var subscription = Poll(SomeRestCall,
                            ParameterFactory,
                            TimeSpan.FromSeconds(5),
                            ThreadPoolScheduler.Instance)
        .TimeInterval()                            
        .Subscribe(x => {
            Console.Write("Interval: " + x.Interval);
            var result = x.Value;
            if(result.IsRight)
                Console.WriteLine(" Success: " + result.Right);
            else
                Console.WriteLine(" Error: " + result.Left.Message);
        });

    Console.ReadLine();    
    subscription.Dispose();
}

Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.

请注意,如果立即返回错误,则结果之间的间隔为5秒(轮询间隔),或者为6秒钟(轮询间隔加上模拟的REST调用持续时间),以获取成功结果.

Note the interval between results is either 5 seconds (the polling interval) if an error was immediately returned, or 6 seconds (the polling interval plus the simulated REST call duration) for a successful result.

编辑-这是一个替代实现,使用ScheduleAsync,但使用旧式递归调度,并且不使用async/await语法.正如您所看到的那样,这比较麻烦-但它也支持取消asyncFunction可观察到的状态.

EDIT - Here is an alternative implementation that doesn't use ScheduleAsync, but uses old style recursive scheduling and no async/await syntax. As you can see, it's a lot messier - but it does also support cancelling the asyncFunction observable.

    public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
        Func<TArg, IObservable<TResult>> asyncFunction,
        Func<TArg> parameterFactory,
        TimeSpan interval,
        IScheduler scheduler)
    {
        return Observable.Create<Either<Exception, TResult>>(
            observer =>
                {
                    var disposable = new CompositeDisposable();
                    var funcDisposable = new SerialDisposable();
                    bool cancelRequested = false;
                    disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
                    disposable.Add(funcDisposable);
                    disposable.Add(scheduler.Schedule(interval, self =>
                        {
                            funcDisposable.Disposable = asyncFunction(parameterFactory())
                                .Finally(() =>
                                    {
                                        if (!cancelRequested) self(interval);
                                    })
                                .Subscribe(
                                    res => observer.OnNext(Either.Right<Exception, TResult>(res)),
                                    ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
                        }));

                    return disposable;

                });
    }

请参阅我的其他答案,以了解一种可以避免.NET 4.5异步/等待功能并且不使用计划调用的不同方法.

See my other answer for a different approach that avoids .NET 4.5 async/await features and doesn't use Schedule calls.

我希望这对rx-java家伙有所帮助!

I do hope that is some help to the rx-java guys!

这篇关于如何使用Observables实施轮询?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆