数据库轮询反应扩展 [英] Database polling with Reactive Extensions
问题描述
我及时查询数据库来了解遗留系统的状态。我想过缠绕的查询的观测
,但我不知道正确的方式去做。
基本上,这将是相同的查询,每5秒但我怕我将不得不面对这些问题:
- 如果查询的执行需要10秒?我不想
。如果以前仍在处理执行任何新的查询。 - 此外,应该有一个超时。如果当前查询不之后执行
,例如,20秒,一个信息的信息应该是
记录和一个新的尝试(相同的查询)应被发送。
额外的细节:
- 查询只是一个
SELECT
返回一个数据集的状态代码的列表(工作的故障的)。 - 可观察序列将始终从查询,如交换机扩展方法的东西接收到的数据。
- 我想换数据库查询(lenghty操作)到一个任务,但我不知道这是否是最好的选择。
我几乎可以肯定,查询应该在另一个线程中执行,但我没有的可观察到的应该怎么样子,曾经看过介绍接收由李·坎贝尔的想法。
< DIV CLASS =h2_lin>解决方案
这是使用接收以轮询其他系统的一个相当经典案例。大多数人会使用 Observable.Interval
作为他们去到运营商,对于大多数会被罚款。
然而,你有超时和重试的具体要求。在这种情况下,我想你最好使用运算符的组合:
-
Observable.Timer
,让您在 -
超时
规定时间内确定并已溢出李数据库查询执行查询>
-
ToObservable()
你的任务
结果映射到一个可观察序列。 -
重试
来让你超时后恢复 -
重复
来让你成功的数据库查询后继续进行。这也将保留以前的数据库查询的完成和下一个的开始之间的初期/差距。
这工作 LINQPad 段应表明您查询正常工作:
无效的主要()
{
VAR pollingPeriod = TimeSpan.FromSeconds(5);
变种dbQueryTimeout = TimeSpan.FromSeconds(10);
//你会希望定时器的预期沉默后您的接收查询超时,然后再进一步的最大沉默。
VAR rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
无功调度=新EventLoopScheduler(TS =>新建线程(TS){名称=DatabasePoller});
VAR的查询= Observable.Timer(pollingPeriod,调度程序)
.SelectMany(_ =方式> DatabaseQuery()ToObservable())
.Timeout(rxQueryTimeOut,Observable.Return (超时),调度程序)
.Retry()//循环上的错误
.Repeat(); //循环成功
query.StartWith(「种籽」)
.TimeInterval(调度)//只是为了调试,打印时间差距。
使用.dump();
}
//定义其它的方法和类在这里
私有静态诠释延迟= 9;
私有静态诠释delayModifier = 1;
公共异步任务<串GT; DatabaseQuery()
{
//娼3 12秒
延迟+ = delayModifier之间的延迟;
VAR时间跨度= TimeSpan.FromSeconds(延迟);
如果(延迟4; ||延迟> 11)
delayModifier * = -1;
timespan.Dump(延迟);
等待Task.Delay(时间跨度);
回报值;
}
结果如下:
种子00:00:00.0125407
超时00:00:15.0166379
超时00:00:15.0124480
超时00:00:15.0004520
超时00:00:15.0013296
超时00:00:15.0140864
值00:00:14.0251731
值00:00:13.0231958
值00:00:12.0162236 $ b $的b值00:00:11.0138606
样品的关键部分是....
VAR的查询= Observable.Timer(TimeSpan.FromSeconds(5),调度程序)
.SelectMany(_ => 。DatabaseQuery()ToObservable())
.Timeout(rxQueryTimeOut,Observable.Return(超时),调度程序)
.Retry()//循环上的错误
.Repeat(); //循环成功
I have to query a database in a timely fashion to know the state of a legacy system. I've thought of wrapping the query around an Observable
, but I don't know the correct way to do it.
Basically, it will be the same query every 5 seconds. But I'm afraid I will have to face these problems:
- What if the execution of the query takes 10 seconds? I don't want to execute any new query if the previous is still being processed.
- Also, there should be a timeout. If the current query doesn't execute after, for example, 20 seconds, an informative message should be logged and a new attempt (the same query) should be sent.
Extra details:
- The query is just a
SELECT
that returns a dataset with a list of status codes (working, faulted). - The Observable sequence will always take the latest data received from the query, something like the Switch extension method.
- I would like to wrap the database query (lenghty operation) into a Task, but I'm not sure if it's the best option.
I'm almost sure that the query should be executed in another thread, but I have no idea of how the observable should look like, ever having read Introduction to Rx by Lee Campbell.
This is a fairly classic case of using Rx to poll another system. Most people will use Observable.Interval
as their go-to operator, and for most it will be fine.
However you have specific requirements on timeouts and retry. In this case I think you are better off using a combination of operators:
Observable.Timer
to allow you to execute your query in a specified timeTimeout
to identify and database queries that have overrunToObservable()
to map yourTask
results to an observable sequence.Retry
to allow you to recover after timeoutsRepeat
to allow you to continue after successful database queries. This will also keep that initial period/gap between the completion of the previous database query and the commencement of the next one.
This working LINQPad snippet should show you the query works properly:
void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);
//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}
// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}
The results look like:
Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606
The key part of the sample is....
var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
这篇关于数据库轮询反应扩展的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!