什么是Rx.NET方式产生一个撤销的可观察到的文件名? [英] What is the Rx.NET way to produce a cancellable observable of file names?

查看:260
本文介绍了什么是Rx.NET方式产生一个撤销的可观察到的文件名?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想生成可观察到的文件,这样的文件名的发现可能在任何时刻被取消的。对于这个例子来说,取消发生在自动1秒

I would like to generate an observable of files, such that the discovery of the files names could be cancelled in any moment. For the sake of this example, the cancellation takes place in 1 second automatically.

下面是我当前的代码:

class Program
{
    static void Main()
    {
        try
        {
            RunAsync(@"\\abc\xyz").GetAwaiter().GetResult();
        }
        catch (Exception exc)
        {
            Console.Error.WriteLine(exc);
        }
        Console.Write("Press Enter to exit");
        Console.ReadLine();
    }

    private static async Task RunAsync(string path)
    {
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
        await GetFileSource(path, cts);
    }

    private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
    {
        return Observable.Create<string>(obs => Task.Run(async () =>
        {
            Console.WriteLine("Inside Before");
            foreach (var file in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).Take(50))
            {
                cts.Token.ThrowIfCancellationRequested();
                obs.OnNext(file);
                await Task.Delay(100);
            }
            Console.WriteLine("Inside After");
            obs.OnCompleted();
            return Disposable.Empty;
        }, cts.Token))
        .Do(Console.WriteLine);
    }
}



我不喜欢我的实现两个方面(如还有更多的 - 请随时指出):

I do not like two aspects of my implementation (if there are more - please feel free to point out):


  1. 我有文件可枚举,但我手动遍历每个。我可以以某种方式使用 ToObservable 延期?

  2. 我无法弄清楚如何使用 CTS的.Token 传递给 Task.Run 。只好用 CTS 从外部环境( GetFileSource 参数)抓获。似乎丑陋给我。

  1. I have an enumerable of files, yet I iterate over each manually. Could I use the ToObservable extension somehow?
  2. I could not figure out how to make use of the cts.Token passed to Task.Run. Had to use the cts captured from the outer context (GetFileSource parameter). Seems ugly to me.

请问这是怎么它应该做?必须有一个更好的办法。

Is this how it should be done? Must be a better way.

推荐答案

我仍然不相信这是真的无问题,你所要求的背压上制片人这实在是对应该如何反应工作。

I'm still not convinced this is really a Reactive Problem, you are asking for backpressure on the producer which is really against how Reactive is supposed to work.

这是说,如果你打算这样做,这样你应该明白,非常细粒度的时间操作应该总是被委派到计划而不是试图做任务协调 CancellationTokens 。因此,我将重构看起来像这样:

That being said, if you are going to do it this way you should realize that very fine-grained time manipulation should almost always be delegated to a Scheduler rather than trying to do coordination with Tasks and CancellationTokens. So I would refactor to look like this:

public static IObservable<string> GetFileSource(string path, Func<string, Task<string>> processor, IScheduler scheduler = null) {

  scheduler = scheduler ?? Scheduler.Default;

  return Observable.Create<string>(obs => 
  {
    //Grab the enumerator as our iteration state.
    var enumerator = Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories)
                              .GetEnumerator();
    return scheduler.Schedule(enumerator, async (e, recurse) =>
    {
      if (!e.MoveNext())
      {
         obs.OnCompleted();
         return;
      }

      //Wait here until processing is done before moving on
      obs.OnNext(await processor(e.Current));

      //Recursively schedule
      recurse(e);
    });
  });

}



然后,而不是传递一个取消标记,使用 TakeUntil

var source = GetFileSource(path, x => {/*Do some async task here*/; return x; })
 .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1));

您还可以看到一个异步<$ C的实现更高级的例子$ C>生成方法

You can also see a more advanced example for an implementation of an async Generate method.

这篇关于什么是Rx.NET方式产生一个撤销的可观察到的文件名?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆