坚持与接收观测的SelectMany [英] Stuck with Rx Observable SelectMany

查看:131
本文介绍了坚持与接收观测的SelectMany的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的目标是通过FTP下载文件,并以某种方式处理它们是异步的。 我打开的文件列表中的IObservable和使用的SelectMany组合子处理。里面有一些操作,以阻止下载文件:尝试下载文件的重试次数和返回元组,或在出现故障时,返回元组,并把它包装成观测。样品可观察retriable延迟后,我已经并略作修改它。 问题是,下载几个文件后,我的code随机停止。有时它到达订阅方法OnNext回调。我从来没有发现code深远的onComplete回调。没有异常被抛出无论是。

My goal is to download files via ftp and somehow process them asynchronously. I turn list of files to IObservable and process it using SelectMany combinator. Inside there is some manipulations to download blocked files: try to download file with number of retries and return Tuple, or in case of failure return Tuple and wrap it into Observable. Sample of "Observable retriable after delay" I have taken there and slightly modified it. Problem is that my code randomly stops after downloading couple of files. Sometimes it reaches the "OnNext" callback in "Subscribe" method. I never have detected code reaching "OnComplete" callback. No exception were thrown either.

        files.ToObservable().SelectMany(f =>
        {
            var source = Observable.Defer(() => Observable.Start(() =>
            {
                ftpConnection.DownloadFile(avroPath, f.Name);
                return Tuple.Create(true, f.Name);
            }));
            int attempt = 0;
            return Observable.Defer(() => ((++attempt == 1)
                ? source
                : source.DelaySubscription(TimeSpan.FromSeconds(1))))
                .Retry(4)
                .Catch(Observable.Return(Tuple.Create(false, f.Name)));
        }).Subscribe(
            res =>
            {
                Console.Write("Damn, its only rarely gets there, however some files were downloaded succesfully");
                if (res.Item1) Process(res.Item2);
                else LogOrQueueOrWhatever(res.Item2);
            },
            (Exception ex) =>
            {
                Console.Write("Never was thrown");
            },
            () =>
            {
                Console.Write("Never entered this section");
                ProcessLogs();
                ScheduleNExtDownloadRoutine();
            });

我会很感激,如果有人将呈现更地道的方式来惹的观测量组合子。

I will be grateful if someone will show the more idiomatic way to mess with combinators on Observables.

推荐答案

由于布兰登提到,没有同步/确定可观察的行为后,阻止。所以,我对付它替换订阅与ForEachAsync打电话,转变可观察到的任务和阻止来电与任务的等待的方法:

As Brandon mentioned, there was no synchronization/blocking after defining observable's behavior. So I deal with it by replacing "Subscribe" call with "ForEachAsync", transforming that Observable to Task and blocking caller with Tasks's "Wait" method:

    files.ToObservable().SelectMany(f =>
    {
        var source = Observable.Defer(() => Observable.Start(() =>
        {
            ftpConnection.DownloadFile(avroPath, f.Name);
            return Tuple.Create(true, f.Name);
        }));
        int attempt = 0;
        return Observable.Defer(() => ((++attempt == 1)
            ? source
            : source.DelaySubscription(TimeSpan.FromSeconds(1))))
            .Retry(4)
            .Catch(Observable.Return(Tuple.Create(false, f.Name)));
    }).ForEachAsync(res =>
    {
        if (res.Item1) Process(res.Item2);
        else LogOrQueueOrWhatever(res.Item2);
    }).Wait();

    ProcessLogs();
    ScheduleNExtDownloadRoutine();

这篇关于坚持与接收观测的SelectMany的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆