将生成的事件序列创建为冷序列 [英] Creating generated sequence of events as a cold sequence

查看:30
本文介绍了将生成的事件序列创建为冷序列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

FWIW - 在 就元数据寻求建议

我有一个包含配置数据的网络服务.我想定期调用它 Tok 以刷新使用它的应用程序中的配置数据.如果服务出错(超时、停机等),我想保留上一次调用中的数据,并在不同的时间间隔 Tnotok 后再次调用该服务.最后,我希望行为是可测试的.

I have a webservice that contains configuration data. I would like to call it at regular intervals Tok in order to refresh the configuration data in the application that uses it. If the service is in error (timeout, down, etc) I want to keep the data from the previous call and call the service again after a different time interval Tnotok. Finally I want the behavior to be testable.

因为管理时间序列和可测试性似乎是 Reactive Extensions 的一个强项,所以我开始使用一个由生成的序列提供的 Observable.这是我创建序列的方法:

Since managing time sequences and testability seems like a strong point of the Reactive Extensions, I started using an Observable that will be fed by a generated sequence. Here is how I create the sequence:

Observable.Generate<DataProviderResult, DataProviderResult>(
    // we start with some empty data
    new DataProviderResult() { 
            Failures = 0
            , Informations = new List<Information>()},
    // never stop
    (r) => true,
    // there is no iteration
    (r) => r,
    // we get the next value from a call to the webservice
    (r) => FetchNextResults(r),
    // we select time for next msg depending on the current failures
    (r) => r.Failures > 0 ? tnotok : tok,
    // we pass a TestScheduler
    scheduler)
.Suscribe(r => HandleResults(r));

我目前有两个问题:

看起来我正在创建一个热门的 observable.即使尝试使用 Publish/Connect 我也缺少第一个事件的订阅操作.如何将其创建为冷可观察对象?

It looks like I am creating a hot observable. Even trying to use Publish/Connect I have the suscribed action missing the first event. How can I create it as a cold observable?

myObservable = myObservable.Publish();
myObservable.Suscribe(r => HandleResults(r));
myObservable.Connect() // doesn't call onNext for first element in sequence

<小时>

当我订阅时,订阅和生成的顺序似乎不同,因为对于任何帧,订阅方法在 FetchNextResults 方法之前被触发.正常吗?我希望序列调用帧 f 的方法,而不是 f+1.这是我用于获取和订阅的代码:


When I suscribe, the order in which the suscription and the generation seems off, since for any frame the suscription method is fired before the FetchNextResults method. Is it normal? I would expect the sequence to call the method for frame f, not f+1. Here is the code that I'm using for fetching and suscription:

private DataProviderResult FetchNextResults(DataProviderResult previousResult)
{
    Console.WriteLine(string.Format("Fetching at {0:hh:mm:ss:fff}", scheduler.Now));
    try
    {
        return new DataProviderResult() { Informations = dataProvider.GetInformation().ToList(), Failures = 0};
    }
    catch (Exception)
    {}
    previousResult.Failures++;

    return previousResult;
}

private void HandleResults(DataProviderResult result)
{
    Console.WriteLine(string.Format("Managing at {0:hh:mm:ss:fff}", scheduler.Now));
    dataResult = result;
}

以下是我所看到的促使我阐明这些问题的内容:

Here is what I'm seeing that prompted me articulating these questions:

Starting at 12:00:00:000
Fetching at 12:00:00:000 < no managing the result that has been fetched here
Managing at 12:00:01:000 < managing before fetching for frame f
Fetching at 12:00:01:000
Managing at 12:00:02:000
Fetching at 12:00:02:000

<小时>

这是一个简单的可复制粘贴程序,说明了这个问题.


Here is a bare bones copy-pastable program that illustrates the problem.

/*using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;*/

private static int fetchData(int i, IScheduler scheduler)
{
  writeTime("fetching " + (i+1).ToString(), scheduler);
  return i+1;
}
private static void manageData(int i, IScheduler scheduler)
{
  writeTime("managing " + i.ToString(), scheduler);
}
private static void writeTime(string msg, IScheduler scheduler)
{
  Console.WriteLine(string.Format("{0:mm:ss:fff} {1}", scheduler.Now, msg));
}

private static void Main(string[] args)
{
    var scheduler = new TestScheduler();
    writeTime("start", scheduler);
    var datas = Observable.Generate<int, int>(fetchData(0, scheduler),
                                                (d) => true,
                                                (d) => fetchData(d, scheduler),
                                                 (d) => d,
                                                 (d) => TimeSpan.FromMilliseconds(1000),
                                                 scheduler)
                                                 .Subscribe(i => manageData(i, scheduler));

    scheduler.AdvanceBy(TimeSpan.FromMilliseconds(3000).Ticks);
}

输出如下:

00:00:000 start
00:00:000 fetching 1
00:01:000 managing 1
00:01:000 fetching 2
00:02:000 managing 2
00:02:000 fetching 3

我不明白为什么在获取第一个元素后没有立即对其进行管理.在序列有效地提取数据和将数据传递给观察者之间有 1 秒.我在这里遗漏了什么还是预期的行为?如果是这样,有没有办法让观察者立即对新值做出反应?

I don't understand why the managing of the first element is not picked up immediately after its fetching. There is one second between the sequence effectively pulling the data and the data being handed to the observer. Am I missing something here or is it expected behavior? If so is there a way to have the observer react immediately to the new value?

推荐答案

您误解了 timeSelector 参数的用途.每次生成一个值时都会调用它,并返回一个时间,该时间指示在将该值传递给观察者然后生成下一个值之前延迟多长时间.

You are misunderstanding the purpose of the timeSelector parameter. It is called each time a value is generated and it returns a time which indicates how long to delay before delivering that value to observers and then generating the next value.

这是一种解决问题的非生成方式.

Here's a non-Generate way to tackle your problem.

private DataProviderResult FetchNextResult()
{
    // let exceptions throw
    return dataProvider.GetInformation().ToList();
}

private IObservable<DataProviderResult> CreateObservable(IScheduler scheduler)
{
    // an observable that produces a single result then completes
    var fetch = Observable.Defer(
        () => Observable.Return(FetchNextResult));

    // concatenate this observable with one that will pause
    // for "tok" time before completing.
    // This observable will send the result
    // then pause before completing.
    var fetchThenPause = fetch.Concat(Observable
        .Empty<DataProviderResult>()
        .Delay(tok, scheduler));

    // Now, if fetchThenPause fails, we want to consume/ignore the exception
    // and then pause for tnotok time before completing with no results
    var fetchPauseOnErrors = fetchThenPause.Catch(Observable
        .Empty<DataProviderResult>()
        .Delay(tnotok, scheduler));

    // Now, whenever our observable completes (after its pause), start it again.
    var fetchLoop = fetchPauseOnErrors.Repeat();

    // Now use Publish(initialValue) so that we remember the most recent value
    var fetchLoopWithMemory = fetchLoop.Publish(null);

    // YMMV from here on.  Lets use RefCount() to start the
    // connection the first time someone subscribes
    var fetchLoopAuto = fetchLoopWithMemory.RefCount();

    // And lets filter out that first null that will arrive before
    // we ever get the first result from the data provider
    return fetchLoopAuto.Where(t => t != null);
}

public MyClass()
{
    Information = CreateObservable();
}

public IObservable<DataProviderResult> Information { get; private set; }

这篇关于将生成的事件序列创建为冷序列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆