坚持与接收观测的SelectMany [英] Stuck with Rx Observable SelectMany
问题描述
我的目标是通过FTP下载文件,并以某种方式处理它们是异步的。 我打开的文件列表中的IObservable和使用的SelectMany组合子处理。里面有一些操作,以阻止下载文件:尝试下载文件的重试次数和返回元组,或在出现故障时,返回元组,并把它包装成观测。样品可观察retriable延迟后,我已经有并略作修改它。 问题是,下载几个文件后,我的code随机停止。有时它到达订阅方法OnNext回调。我从来没有发现code深远的onComplete回调。没有异常被抛出无论是。
My goal is to download files via ftp and somehow process them asynchronously. I turn list of files to IObservable and process it using SelectMany combinator. Inside there is some manipulations to download blocked files: try to download file with number of retries and return Tuple, or in case of failure return Tuple and wrap it into Observable. Sample of "Observable retriable after delay" I have taken there and slightly modified it. Problem is that my code randomly stops after downloading couple of files. Sometimes it reaches the "OnNext" callback in "Subscribe" method. I never have detected code reaching "OnComplete" callback. No exception were thrown either.
files.ToObservable().SelectMany(f =>
{
var source = Observable.Defer(() => Observable.Start(() =>
{
ftpConnection.DownloadFile(avroPath, f.Name);
return Tuple.Create(true, f.Name);
}));
int attempt = 0;
return Observable.Defer(() => ((++attempt == 1)
? source
: source.DelaySubscription(TimeSpan.FromSeconds(1))))
.Retry(4)
.Catch(Observable.Return(Tuple.Create(false, f.Name)));
}).Subscribe(
res =>
{
Console.Write("Damn, its only rarely gets there, however some files were downloaded succesfully");
if (res.Item1) Process(res.Item2);
else LogOrQueueOrWhatever(res.Item2);
},
(Exception ex) =>
{
Console.Write("Never was thrown");
},
() =>
{
Console.Write("Never entered this section");
ProcessLogs();
ScheduleNExtDownloadRoutine();
});
我会很感激,如果有人将呈现更地道的方式来惹的观测量组合子。
I will be grateful if someone will show the more idiomatic way to mess with combinators on Observables.
推荐答案
由于布兰登提到,没有同步/确定可观察的行为后,阻止。所以,我对付它替换订阅与ForEachAsync打电话,转变可观察到的任务和阻止来电与任务的等待的方法:
As Brandon mentioned, there was no synchronization/blocking after defining observable's behavior. So I deal with it by replacing "Subscribe" call with "ForEachAsync", transforming that Observable to Task and blocking caller with Tasks's "Wait" method:
files.ToObservable().SelectMany(f =>
{
var source = Observable.Defer(() => Observable.Start(() =>
{
ftpConnection.DownloadFile(avroPath, f.Name);
return Tuple.Create(true, f.Name);
}));
int attempt = 0;
return Observable.Defer(() => ((++attempt == 1)
? source
: source.DelaySubscription(TimeSpan.FromSeconds(1))))
.Retry(4)
.Catch(Observable.Return(Tuple.Create(false, f.Name)));
}).ForEachAsync(res =>
{
if (res.Item1) Process(res.Item2);
else LogOrQueueOrWhatever(res.Item2);
}).Wait();
ProcessLogs();
ScheduleNExtDownloadRoutine();
这篇关于坚持与接收观测的SelectMany的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!