数据库轮询反应扩展 [英] Database polling with Reactive Extensions

查看:223
本文介绍了数据库轮询反应扩展的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我及时查询数据库来了解遗留系统的状态。我想过缠绕的查询的观测,但我不知道正确的方式去做。



基本上,这将是相同的查询,每5秒但我怕我将不得不面对这些问题:




  • 如果查询的执行需要10秒?我不想
    。如果以前仍在处理执行任何新的查询。

  • 此外,应该有一个超时。如果当前查询不之后执行
    ,例如,20秒,一个信息的信息应该是
    记录和一个新的尝试(相同的查询)应被发送。



额外的细节:




  • 查询只是一个 SELECT 返回一个数据集的状态代码的列表(工作故障的)。

  • 可观察序列将始终从查询,如交换机扩展方法的东西接收到的数据。

  • 我想换数据库查询(lenghty操作)到一个任务,但我不知道这是否是最好的选择。



我几乎可以肯定,查询应该在另一个线程中执行,但我没有的可观察到的应该怎么样子,曾经看过介绍接收由李·坎贝尔的想法。


< DIV CLASS =h2_lin>解决方案

这是使用接收以轮询其他系统的一个相当经典案例。大多数人会使用 Observable.Interval 作为他们去到运营商,对于大多数会被罚款。



然而,你有超时和重试的具体要求。在这种情况下,我想你最好使用运算符的组合:




  • Observable.Timer ,让您在

  • 超时规定时间内确定并已溢出
  • ToObservable()你的任务结果映射到一个可观察序列。

  • 重试来让你超时后恢复

  • 重复来让你成功的数据库查询后继续进行。这也将保留以前的数据库查询的完成和下一个的开始之间的初期/差距。



这工作 LINQPad 段应表明您查询正常工作:

 无效的主要()
{
VAR pollingPeriod = TimeSpan.FromSeconds(5);
变种dbQueryTimeout = TimeSpan.FromSeconds(10);

//你会希望定时器的预期沉默后您的接收查询超时,然后再进一步的最大沉默。
VAR rxQueryTimeOut = pollingPeriod + dbQueryTimeout;

无功调度=新EventLoopScheduler(TS =>新建线程(TS){名称=DatabasePoller});

VAR的查询= Observable.Timer(pollingPeriod,调度程序)
.SelectMany(_ =方式> DatabaseQuery()ToObservable())
.Timeout(rxQueryTimeOut,Observable.Return (超时),调度程序)
.Retry()//循环上的错误
.Repeat(); //循环成功

query.StartWith(「种籽」)
.TimeInterval(调度)//只是为了调试,打印时间差距。
使用.dump();
}

//定义其它的方法和类在这里
私有静态诠释延迟= 9;
私有静态诠释delayModifier = 1;
公共异步任务<串GT; DatabaseQuery()
{
//娼3 12秒
延迟+ = delayModifier之间的延迟;
VAR时间跨度= TimeSpan.FromSeconds(延迟);
如果(延迟4; ||延迟> 11)
delayModifier * = -1;
timespan.Dump(延迟);
等待Task.Delay(时间跨度);
回报值;
}



结果如下:

 种子00:00:00.0125407 
超时00:00:15.0166379
超时00:00:15.0124480
超时00:00:15.0004520
超时00:00:15.0013296
超时00:00:15.0140864
值00:00:14.0251731
值00:00:13.0231958
值00:00:12.0162236 $ b $的b值00:00:11.0138606

样品的关键部分是....

  VAR的查询= Observable.Timer(TimeSpan.FromSeconds(5),调度程序)
.SelectMany(_ => 。DatabaseQuery()ToObservable())
.Timeout(rxQueryTimeOut,Observable.Return(超时),调度程序)
.Retry()//循环上的错误
.Repeat(); //循环成功


I have to query a database in a timely fashion to know the state of a legacy system. I've thought of wrapping the query around an Observable, but I don't know the correct way to do it.

Basically, it will be the same query every 5 seconds. But I'm afraid I will have to face these problems:

  • What if the execution of the query takes 10 seconds? I don't want to execute any new query if the previous is still being processed.
  • Also, there should be a timeout. If the current query doesn't execute after, for example, 20 seconds, an informative message should be logged and a new attempt (the same query) should be sent.

Extra details:

  • The query is just a SELECT that returns a dataset with a list of status codes (working, faulted).
  • The Observable sequence will always take the latest data received from the query, something like the Switch extension method.
  • I would like to wrap the database query (lenghty operation) into a Task, but I'm not sure if it's the best option.

I'm almost sure that the query should be executed in another thread, but I have no idea of how the observable should look like, ever having read Introduction to Rx by Lee Campbell.

解决方案

This is a fairly classic case of using Rx to poll another system. Most people will use Observable.Interval as their go-to operator, and for most it will be fine.

However you have specific requirements on timeouts and retry. In this case I think you are better off using a combination of operators:

  • Observable.Timer to allow you to execute your query in a specified time
  • Timeout to identify and database queries that have overrun
  • ToObservable() to map your Task results to an observable sequence.
  • Retry to allow you to recover after timeouts
  • Repeat to allow you to continue after successful database queries. This will also keep that initial period/gap between the completion of the previous database query and the commencement of the next one.

This working LINQPad snippet should show you the query works properly:

void Main()
{
    var pollingPeriod = TimeSpan.FromSeconds(5);
    var dbQueryTimeout = TimeSpan.FromSeconds(10);

    //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
    var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;

    var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });

    var query = Observable.Timer(pollingPeriod, scheduler)
                    .SelectMany(_ => DatabaseQuery().ToObservable())
                    .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                    .Retry()    //Loop on errors
                    .Repeat();  //Loop on success

    query.StartWith("Seed")
        .TimeInterval(scheduler)    //Just to debug, print the timing gaps.
        .Dump();
}

// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
    //Oscillate the delay between 3 and 12 seconds
    delay += delayModifier;
    var timespan = TimeSpan.FromSeconds(delay);
    if (delay < 4 || delay > 11)
        delayModifier *= -1;
    timespan.Dump("delay");
    await Task.Delay(timespan);
    return "Value";
}

The results look like:

Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606

The key part of the sample is....

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
                .SelectMany(_ => DatabaseQuery().ToObservable())
                .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                .Retry()    //Loop on errors
                .Repeat();  //Loop on success

这篇关于数据库轮询反应扩展的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆