如何使用观测量来实现轮询? [英] How to implement polling using Observables?

查看:188
本文介绍了如何使用观测量来实现轮询?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经参数化应该执行REST调用各5第二,不同的PARAMS:

I have parametrized rest call that should be executed each five second with different params:

观测restCall = api.method1(参数1);

Observable restCall = api.method1(param1);

我需要创建观测投票,将执行restCall每次5第二,不同的参数1。如果API调用失败,我需要得到错误,使下一个电话5秒钟。电话之间的间隔应在restCall完成(成功/错误)只衡量。

I need to create Observable polling that will execute restCall each 5 second with different param1. If api call fails i need to get error and make next call in 5 seconds. Interval between calls should be measured only when restCall is finished (success/error).

PS:即时通讯目前使用RxJava,但.NET示例也将是不错的。

PS: Im currently using RxJava, but .NET example also will be good.

推荐答案

首先承认,我是一个.NET的家伙,我知道这种方法使用一些成语有在Java中没有直接等同的。但是,我要带你的话,并继续进行的基础,这是一个.NET家伙会喜欢,而且希望它会导致你在RX-java的正确的道路,这是我从来没有看过一个很大的问题上。这是一个相当长的答案,但它主要的解释 - 解决方案code本身是pretty短

Introduction

First, an admission, I'm a .NET guy, and I know this approach uses some idioms that have no direct equivalent in Java. But I'm taking you at your word and proceeding on the basis that this is a great question that .NET guys will enjoy, and that hopefully it will lead you down the right path in rx-java, which I have never looked at. This is quite a long answer, but it's mostly explanation - the solution code itself is pretty short!

我们需要一些工具,在第一个排序,以帮助这个解决方案。首先是使用无论是与LT的; TLeft,TRight> 键入。这一点很重要,因为你每次调用的或者的一个很好的结果,还是错误的两种可能的结果。但是,我们需要在一个单一的类型来包装这些 - 我们不能使用的OnError发回的错误,因为这将终止,结果流。无论是看起来有点像一个元组,并使其更容易地处理这种情况。该 RXX库有一个非常全面和良好的执行中,但这里是后跟一个简单的实现良好的我们的目的使用的一个简单的一般示例:

We will need sort some tools out first to help with this solution. The first is the use of the Either<TLeft, TRight> type. This is important, because you have two possible outcomes of each call either a good result, or an error. But we need to wrap these in a single type - we can't use OnError to send errors back since this would terminate the result stream. Either looks a bit like a Tuple and makes it easier to deal with this situation. The Rxx library has a very full and good implementation of Either, but here is a simple generic example of usage followed by a simple implementation good for our purposes:

var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());

/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
    public abstract bool IsLeft { get; }
    public bool IsRight { get { return !IsLeft; } }
    public abstract TLeft Left { get; }
    public abstract TRight Right { get;  }    
}

public static class Either
{
    public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TLeft _leftValue;

        public LeftValue(TLeft leftValue)
        {
            _leftValue = leftValue;
        }

        public override TLeft Left { get { return _leftValue; } }
        public override TRight Right { get { return default(TRight); } }
        public override bool IsLeft { get { return true; } }
    }

    public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TRight _rightValue;

        public RightValue(TRight rightValue)
        {
            _rightValue = rightValue;
        }

        public override TLeft Left { get { return default(TLeft); } }
        public override TRight Right { get { return _rightValue; } }
        public override bool IsLeft { get { return false; } }
    }

    // Factory functions to create left or right-valued Either instances
    public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
    {
        return new LeftValue<TLeft, TRight>(leftValue);
    }

    public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
    {
        return new RightValue<TLeft, TRight>(rightValue);
    }
}

请注意,按照惯例使用其中任何一个成功或失败,右侧是用于成功的价值模型的时候,因为它是右,当然:)

Note that by convention when using Either to model a success or failure, the Right side is used for the successful value, because it's "Right" of course :)

我要模仿你的问题的两个方面的一些辅助功能。首先,这里是一家工厂产生的参数 - 每次调用它会从1开始的整数返回序列中的下一个整数时间:

I'm going to simulate two aspects of your problem with some helper functions. First, here is a factory to generate parameters - each time it is called it will return the next integer in the sequence of integers starting with 1:

// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
    return ++count; 
}

接下来,这里是模拟的REST调用作为的IObservable的功能。该函数接受一个整数和:

Next, here is a function that simulates your Rest call as an IObservable. This function accepts an integer and:


  • 如果该整数甚至它返回一个可观察的是立即发送的OnError。

  • 如果整数是奇数它返回一个字符串串联与-ret的整数,但第二个已经过去了之后。我们将用它来检查轮询间隔表现为你的要求 - 但是长,他们采取为完成调用之间暂停,而不是一个常规的间隔

下面是:

// A asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
    return x % 2 == 0
        ? Observable.Throw<string>(new Exception())
        : Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));   
}

现在好一点

下面是一个合理的可重复使用的通用功能,我称之为民意测验。它接受一个异步函数,将轮询,参数工厂,了解功能,所需的其余部分(没有双关语意!)间隔,最后一个IScheduler使用。

Now The Good Bit

Below is a reasonably generic reusable function I have called Poll. It accepts an asynchronous function that will be polled, a parameter factory for that function, the desired rest (no pun intended!) interval, and finally an IScheduler to use.

我能想出的最简单的方法是使用 Observable.Create 使用调度驱动的结果流。 ScheduleAsync 是调度的方式,使用.NET异步/的await形式。这是一个.NET的成语,使您可以编写异步code在命令式的方式。在异步关键字引入一个异步功能,可以再等待一个或在它的身上更多的异步调用,只会继续当调用完成。 我写这种风格的调度在这个问题上很长的解释,其中包括上了年纪的递归风格可能会更容易在RX-java的方法来实现中的code是这样的:

The simplest approach I could come up with is to use Observable.Create that uses a scheduler to drive the result stream. ScheduleAsync is a way of Scheduling that uses the .NET async/await form. This is a .NET idiom that allows you to write asynchronous code in an imperative fashion. The async keyword introduces an asynchronous function that can then await one or more asynchronous calls in it's body and will continue on only when the call completes. I wrote a long explanation of this style of scheduling in this question, which includes the older recursive the style that might be easier to implement in an rx-java approach. The code looks like this:

public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
    Func<TArg, IObservable<TResult>> asyncFunction,
    Func<TArg> parameterFactory,
    TimeSpan interval,
    IScheduler scheduler)
{
    return Observable.Create<Either<Exception, TResult>>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) => {
            while(!ct.IsCancellationRequested)
            {
                try
                {
                    var result = await asyncFunction(parameterFactory());
                    observer.OnNext(Either.Right<Exception,TResult>(result));
                }
                catch(Exception ex)
                {
                    observer.OnNext(Either.Left<Exception, TResult>(ex));
                }
                await ctrl.Sleep(interval, ct);
            }
        });        
    });    
}

打破下来, Observable.Create 一般是创建IObservables,让您的控制权结果如何公布观察员很大的工厂。它常常被忽视赞成原语的不必要的复杂组成。

Breaking this down, Observable.Create in general is a factory for creating IObservables that gives you a great deal of control over how results are posted to observers. It's often overlooked in favour of unnecessarily complex composition of primitives.

在这种情况下,我们使用它来创建流要么&LT; TResult,异常方式&gt; 这样我们就可以返回成功和失败的投票结果。

In this case, we are using it to create a stream of Either<TResult, Exception> so that we can return the successful and failed polling results.

创建函数接受重新presents,这是我们通过OnNext /的OnError / OnCompleted传递结果给用户的观察员。我们需要返回一个的IDisposable 创建通话 - 在.NET中,这是一个由手柄,认购可以取消他们的订阅。因为投票,否则将永远持续下去它在这里特别重要的 - 或者至少是永远不会的onComplete

The Create function accepts an observer that represents the Subscriber to which we pass results to via OnNext/OnError/OnCompleted. We need to return an IDisposable within the Create call - in .NET this is a handle by which the Subscriber can cancel their subscription. It's particularly important here because Polling will otherwise go on forever - or at least it won't ever OnComplete.

ScheduleAsync (或纯计划)的结果是这样的句柄。处置时,将取消我们预定的任何未决的事件 - 从而结束轮询循环。在我们的例子中,睡眠我们使用管理间隔为撤销的操作,虽然民意调查功能,可以很容易地进行修改,以接受撤销 asyncFunction ,接受一个的CancellationToken 以及

The result of ScheduleAsync (or plain Schedule) is such a handle. When disposed, it will cancel any pending event we Scheduled - thereby ending the the polling loop. In our case, the Sleep we use to manage the interval is the cancellable operation, although the Poll function could easily be modified to accept a cancellable asyncFunction that accepts a CancellationToken as well.

该ScheduleAsync方法接受一个将被调用来安排活动功能。它有两个参数,第一个 CTRL 是调度程序本身。第二个克拉是的CancellationToken我们可以用它来查看是否取消已请求(认购人处置自己的订阅句柄)。

The ScheduleAsync method accepts a function that will be called to schedule events. It is passed two arguments, the first ctrl is the scheduler itself. The second ct is a CancellationToken we can use to see if cancellation has been requested (by the Subscriber disposing their subscription handle).

轮询本身通过一个无限while循环只有终止,如果指示的CancellationToken取消已请求。

The polling itself is performed via an infinite while loop that terminates only if the CancellationToken indicates cancellation has been requested.

在循环中,我们可以使用异步的魔法/等待异步调用查询功能但仍然在异常处理包裹。这是如此真棒!假设没有错误,我们将结果作为的右键的一个的价值,通过观察 OnNext 。如果有一个例外,我们发送的的的一个来观测值。最后,我们使用睡眠功能上的调度程序来安排敲响了警钟,其余时间间隔后 - 不以 Thread.sleep代码混淆电话,这其中通常不会阻止任何线程。需要注意的是睡眠接受的CancellationToken 使该被以及中止!

In the loop, we can use the magic of async/await to asynchronously invoke the polling function yet still wrap it in an exception handler. This is so awesome! Assuming no error, we send the result as the right value of an Either to the observer via OnNext. If there was an exception, we send that as the left value of an Either to the observer. Finally, we use the Sleep function on the scheduler to schedule a wake-up call after the rest interval - not to be confused with a Thread.Sleep call, this one typically doesn't block any threads. Note that Sleep accepts the CancellationToken enabling that to be aborted as well!

我想你会同意这是一个pretty酷使用异步电动机/等待简化什么将是一个非常棘手的问题!

I think you'll agree this is a pretty cool use of async/await to simplify what would have been an awfully tricky problem!

最后,这里是一些测试code调用民意测验,以及示例输出 - 为的 LINQPad 球迷所有的code一起在这个答案将在LINQPad与接收运行2.1组件引用的:

Finally, here is some test code that calls Poll, along with sample output - for LINQPad fans all the code together in this answer will run in LINQPad with Rx 2.1 assemblies referenced:

void Main()
{
    var subscription = Poll(SomeRestCall,
                            ParameterFactory,
                            TimeSpan.FromSeconds(5),
                            ThreadPoolScheduler.Instance)
        .TimeInterval()                            
        .Subscribe(x => {
            Console.Write("Interval: " + x.Interval);
            var result = x.Value;
            if(result.IsRight)
                Console.WriteLine(" Success: " + result.Right);
            else
                Console.WriteLine(" Error: " + result.Left.Message);
        });

    Console.ReadLine();    
    subscription.Dispose();
}

Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.

请注意结果之间的间隔是采用5秒(轮询间隔),如果立即返回一个错误,或6秒(轮询时间间隔加上模拟REST调用持续时间)为成功的结果。

Note the interval between results is either 5 seconds (the polling interval) if an error was immediately returned, or 6 seconds (the polling interval plus the simulated REST call duration) for a successful result.

编辑 - 在这里是一种替代实现,它的的使用ScheduleAsync,但使用老式递归调度和异步无/ AWAIT语法。正如你所看到的,这是一个很大混乱 - 但它也支持取消asyncFunction观察到

EDIT - Here is an alternative implementation that doesn't use ScheduleAsync, but uses old style recursive scheduling and no async/await syntax. As you can see, it's a lot messier - but it does also support cancelling the asyncFunction observable.

    public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
        Func<TArg, IObservable<TResult>> asyncFunction,
        Func<TArg> parameterFactory,
        TimeSpan interval,
        IScheduler scheduler)
    {
        return Observable.Create<Either<Exception, TResult>>(
            observer =>
                {
                    var disposable = new CompositeDisposable();
                    var funcDisposable = new SerialDisposable();
                    bool cancelRequested = false;
                    disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
                    disposable.Add(funcDisposable);
                    disposable.Add(scheduler.Schedule(interval, self =>
                        {
                            funcDisposable.Disposable = asyncFunction(parameterFactory())
                                .Finally(() =>
                                    {
                                        if (!cancelRequested) self(interval);
                                    })
                                .Subscribe(
                                    res => observer.OnNext(Either.Right<Exception, TResult>(res)),
                                    ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
                        }));

                    return disposable;

                });
    }

看我其他的答案了不同的方法,避免了.NET 4.5异步/等待功能,不使用进度表要求。

See my other answer for a different approach that avoids .NET 4.5 async/await features and doesn't use Schedule calls.

我希望这是对RX-java的人一些帮助!

I do hope that is some help to the rx-java guys!

这篇关于如何使用观测量来实现轮询?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆