Rx - Observable和Webclient获取csv [英] Rx - Observable and Webclient to get csv
问题描述
您好
我的lightswitch应用程序中有一个函数,它从我想使用Rx框架重写的站点下载csv文件并提供调用的可能性它是同步的。
I have a function in my lightswitch application that downloads a csv file from a site which i want to re-write using Rx framework and provide provide possibility to call it synchronously.
下面提供旧功能和新功能的代码片段。然而,新功能不起作用,对ParseCSV的调用永远不会发生。我想知道为什么以及是否存在更好的解决方案,随时提供。
Provide below are the code snippets for old and new function. The new function however doesn't work, the call to ParseCSV never happens. I would like to know why and if exists a better solution, feel free to provide.
旧代码:
private void ObservableCollection<Data> collection;
public ObservableCollection<Data> GetData(string url, ObservableCollection<Data> targetCollection)
{
collection = targetCollection;
if (!string.IsNullOrEmpty(url))
{
WebClient wc = new WebClient();
wc.OpenReadCompleted += new OpenReadCompletedEventHandler(OpenReadCompleted_ParseCSV);
wc.OpenReadAsync(new Uri(url));
}
return collection;
}
private void OpenReadCompleted_ParseCSV(object sender, OpenReadCompletedEventArgs e)
{
if (e.Error != null) return;
var webClient = sender as WebClient;
if (webClient == null) return;
try
{
using (StreamReader reader = new StreamReader(e.Result))
{
string contents = reader.ReadToEnd();
...
}
}
catch (Exception ex)
{
System.Diagnostics.Debug.WriteLine("Error parsing CSV!\n" + ex.Message);
}
}
新代码(使用Rx ):
private void ObservableCollection<Data> collection;
public ObservableCollection<Data> GetData(string url, ObservableCollection<Data> targetCollection)
{
collection = targetCollection;
if (!string.IsNullOrEmpty(url))
{
var result = Observable.FromEventPattern<OpenReadCompletedEventHandler, OpenReadCompletedEventArgs>
(
ev => webClient.OpenReadCompleted += ev,
ev => webClient.OpenReadCompleted -= ev
)
.Select(o => o.EventArgs.Result)
.FirstOrDefault()
.ParseCSV();
// Call the Async method
webClient.OpenReadAsync(new Uri(url));
}
return collection;
}
private void ParseCSV(this Stream stream)
{
try
{
using (StreamReader reader = new StreamReader(e.Result))
{
string contents = reader.ReadToEnd();
...
}
}
catch (Exception ex)
{
System.Diagnostics.Debug.WriteLine("Unable to get history data!\n" + ex.Message);
}
}
推荐答案
FirstOrDefault 不是被动操作。 它会阻止当前线程,直到观察到第一个值。 您应该使用
订阅。
FirstOrDefault is not a reactive operation. It blocks the current thread until the first value is observed. You should use Subscribe instead.
或者, Rxx 提供了几个相关的扩展,可以使这更容易。 以下是相关实验室:
Alternatively, Rxx provides several related extensions that could make this much easier. Here's the related lab:
http:// rxx.codeplex.com/SourceControl/changeset/view/65171#1055791
这是另一个例子。 这个在本地托管CSV文本,并在客户端代码中使用Rxx解析器。
Here's another example. This one hosts CSV text locally and uses Rxx parsers in the client code.
注意:客户端在解析之前会在输入中添加一个新行 (例如,
(reader.ReadToEnd()+ Environment.NewLine))因为解析器实现中的明显疏忽:它缺少"序列结束"。运营商,我计划为下一个版本添加。
Note: The client appends a new line to the input before parsing (e.g., (reader.ReadToEnd() + Environment.NewLine)) because of a glaring oversight in the parsers implementation: it's missing an "End of Sequence" operator, which I plan on adding for the next release.
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Reactive.Linq;
using System.Text;
using Rxx.Parsers.Linq;
namespace Rxx.Labs.Reactive
{
public sealed class WebClientCsvParserLab : BaseConsoleLab
{
protected override void Main()
{
var csv = @"
item 1, item 2, item 3
item 4, item 5, item 6
item 7, item 8, item 9
item 10";
var url = new Uri("http://localhost:1111");
var server =
from request in ObservableHttpListener.Start(url.Host, url.Port)
let bytes = Encoding.UTF8.GetBytes(csv)
from sent in Observable.Using(
() => request.Response,
response => Observable.Using(
() => response.OutputStream,
stream => stream.WriteObservable(bytes, 0, bytes.Length)))
select sent;
var client = DownloadLinesCsv(url)
.Select((line, index) => new
{
Line = line,
LineNumber = index + 1
});
using (server.Subscribe(_ => TraceLine("Server sent response."), TraceError))
using (client.Subscribe(
result =>
{
Trace("Client parsed line {0}: ", result.LineNumber);
foreach (var item in result.Line)
{
Trace(item + ',');
}
TraceLine();
},
TraceError,
() => TraceLine("Client Completed")))
{
WaitForKey();
}
}
public static IObservable<IList<string>> DownloadLinesCsv(Uri url)
{
return
from stream in ObservableWebClient.OpenRead(url)
from line in
Observable.Using(
() => new StreamReader(stream),
reader => (reader.ReadToEnd() + Environment.NewLine)
.ParseString(parser =>
from next in parser
let newLine = parser.Word(Environment.NewLine)
let separator = parser.Word(",")
let item = from value in next.NoneOrMoreNonGreedy().Join()
from _ in separator.Or(newLine.NonGreedy())
select value
let line = item.NoneOrMoreNonGreedy().ToList()
select from items in line
from _ in newLine
where items.Count > 0
select items)
.ToObservable())
select line;
}
}
}
- 戴夫
这篇关于Rx - Observable和Webclient获取csv的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!