使用反应式扩展创建多个计时器 [英] Creating Multiple Timers with Reactive Extensions

查看:49
本文介绍了使用反应式扩展创建多个计时器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个非常简单的类,我用它来轮询目录中的新文件.它有位置、开始监控该位置的时间以及再次检查的时间间隔(以小时为单位):

I've got a very simple class that I am using to poll a directory for new files. It's got the location, a time to start monitoring that location, and an interval (in hours) for when to check again:

public class Thing
{
  public string         Name {get; set;}
  public Uri            Uri { get; set;}
  public DateTimeOffset StartTime {get; set;}
  public double         Interval {get; set;}
}

我是 Reactive Extensions 的新手,但我认为它正是这里工作的正确工具.在开始时以及在随后的每个时间间隔内,我只想调用一个 Web 服务来完成所有繁重的工作 - 我们将使用不断创新的 public bool DoWork(Uri uri) 来表示.

I am new to Reactive Extensions, but I think it is exactly the right tool for the job here. At the start time, and on every subsequent interval, I simply want to call a web service that does all the heavy lifting - we'll use the ever inventive public bool DoWork(Uri uri) to represent that.

编辑:DoWork 是对 Web 服务的调用,它将检查新文件并在必要时移动它们,因此它的执行应该是异步的.如果完成则返回 true,否则返回 false.

edit: DoWork is a call to a web service that will check for new files and move them if necessary, so its execution should be async. It returns a true if it completed, false if not.

如果我拥有这些 Thing 的完整集合,事情就会变得复杂.我无法思考如何为每个创建 Observable.Timer() 并让它们都调用相同的方法.

Things get complicated if I have a whole collection of these Things. I can't wrap my head around how to create the Observable.Timer() for each one, and have them all call the same method.

edit2: Observable.Timer(DateTimeOffset, Timespan) 似乎非常适合为我在这里尝试做的事情创建 IObservable.想法?

edit2: Observable.Timer(DateTimeOffset, Timespan) seems perfect to create an IObservable for what I'm trying to do here. Thoughts?

推荐答案

你需要有很多计时器吗?我假设如果你有 20 个东西的集合,那么我们将创建 20 个定时器,所有定时器都在同一时间点触发?在同一个线程/调度程序上?

Do you need there to be many timers? I assume if you have a collection of 20 things, then we will create 20 timers all to fire at the same point in time? On the same thread/scheduler?

或者您想在每个时期为每件事DoWork?

Or perhaps you want to DoWork foreach thing at every period?

from thing in things
from x in Observable.Interval(thing.Interval)
select DoWork(thing.Uri)

对比

Observable.Interval(interval)
.Select(_=>
    {
        foreach(var thing in Things)
        {
            DoWork(thing);
        }
    })

您将来可以通过多种方式开展工作.

There are many ways that you can do work in the future.

  • 您可以直接使用 Scheduler 来安排要在未来.
  • 您可以使用 Observable.Timer 来创建一个在未来指定时间产生一个值的序列.
  • 您可以使用 Observable.Interval 来创建一个序列,该序列在每个指定的时间段内产生多个值.

所以现在引入另一个问题.如果您的轮询时间为 60 秒,而您的工作功能需要 5 秒;下一次民意调查应该在 55 秒后还是 60 秒后发生?这里的一个答案表明您想要使用 Rx 序列,另一个表明您可能想要使用 Periodic Sc​​heudling.

So this now introduces another question. If you have your polling time as 60seconds and you do work function takes 5 seconds; should the next poll happen in 55 seconds or in 60 seconds? Here one answer indicates you want to use an Rx sequence, the other indicates that you probably want to use Periodic Scheudling.

下一个问题是,DoWork 会返回值吗?目前看起来它没有*.在这种情况下,我认为最适合您的做法是利用 Periodic Sc​​heduler(假设 Rx v2).

Next question is, does DoWork return a value? Currently it looks like it does not*. In this case I think the most appropriate thing for you to do is to leverage the Periodic Schedulers (assuming Rx v2).

var things = new []{
    new Thing{Name="google", Uri = new Uri("http://google.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3},
    new Thing{Name="bing", Uri = new Uri("http://bing.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3}
};
var scheduler = Scheduler.Default;
var scheduledWork = new CompositeDisposable();

foreach (var thing in things)
{
    scheduledWork.Add( scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri)));
}

//Just showing that I can cancel this i.e. clean up my resources.
scheduler.Schedule(TimeSpan.FromSeconds(10), ()=>scheduledWork.Dispose());

这现在将安排每件事情定期处理(没有漂移),在它自己的时间间隔内并提供取消.

This now will schedule each thing to be processed periodically (without drift), on it's own interval and provide cancellation.

如果您愿意,我们现在可以将其升级为查询

We can now upgrade this to a query if you like

var scheduledWork = from thing in things
                    select scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri));

var work = new CompositeDisposable(scheduledWork);

这些查询的问题在于我们没有满足 StartTime 要求.令人烦恼的是,Ccheduler.SchedulePeriodic 方法没有提供重载也有起始偏移量.

The problem with these query is that we don't fulfil the StartTime requirement. Annoyingly the Ccheduler.SchedulePeriodic method does not provide an overload to also have a start offset.

Observable.Timer 运算符确实提供了这一点.它还将在内部利用非漂移调度功能.要使用 Observable.Timer 重建查询,我们可以执行以下操作.

The Observable.Timer operator does however provide this. It will also internally leverage the non-drifting scheduling features. To reconstruct the query with Observable.Timer we can do the following.

var urisToPoll = from thing in things.ToObservable()
                 from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
                 select thing;

var subscription = urisToPoll.Subscribe(t=>DoWork(t.Uri));

所以现在你有一个很好的界面,应该可以避免漂移.但是,我认为这里的工作是以串行方式完成的(如果同时调用多个 DoWork 操作).

So now you have a nice interface that should avoid drift. However, I think the work here is done in a serial manner (if many DoWork actions are called concurrently).

*理想情况下,我会尽量避免这样的副作用陈述,但我不能 100% 确定您的要求.

*ideally I would try to avoid side effect statements like this but I am not 100% sure what your requirements.

编辑看来对 DoWork 的调用必须是并行的,所以你需要做更多的事情.理想情况下,您将 DoWork 设为 asnyc,但如果您做不到,我们可以伪造它直到我们成功.

EDIT It appears that the calls to DoWork must be in parallel, so then you need to do a bit more. Ideally you make DoWork asnyc, but if you cant we can fake it till we make it.

var polling = from thing in things.ToObservable()
              from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
              from result in Observable.Start(()=>DoWork(thing.Uri))
              select result;

var subscription = polling.Subscribe(); //Ignore the bool results?

这篇关于使用反应式扩展创建多个计时器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆