Rx - Observable and WebClient to get CSV


Question

Hello,

I have a function in my LightSwitch application that downloads a CSV file from a site. I want to rewrite it using the Rx framework and make it possible to call it synchronously.

Below are code snippets for the old and new functions. The new function doesn't work, however: the call to ParseCSV never happens. I would like to know why, and if a better solution exists, feel free to provide it.

Old code:

private ObservableCollection<Data> collection;
public ObservableCollection<Data> GetData(string url, ObservableCollection<Data> targetCollection)
{
	collection = targetCollection;
	if (!string.IsNullOrEmpty(url))
	{
		WebClient wc = new WebClient();
		wc.OpenReadCompleted += new OpenReadCompletedEventHandler(OpenReadCompleted_ParseCSV);
		wc.OpenReadAsync(new Uri(url));
	}
	return collection;
}

private void OpenReadCompleted_ParseCSV(object sender, OpenReadCompletedEventArgs e)
{
	if (e.Error != null) return;

	var webClient = sender as WebClient;
	if (webClient == null) return;

	try
	{
		using (StreamReader reader = new StreamReader(e.Result))
		{
			string contents = reader.ReadToEnd();
			...
		}
	}
	catch (Exception ex)
	{
		System.Diagnostics.Debug.WriteLine("Error parsing CSV!\n" + ex.Message);
	}
}

New code (using Rx):

private ObservableCollection<Data> collection;
public ObservableCollection<Data> GetData(string url, ObservableCollection<Data> targetCollection)
{
	collection = targetCollection;
	if (!string.IsNullOrEmpty(url))
	{
		var result = Observable.FromEventPattern<OpenReadCompletedEventHandler, OpenReadCompletedEventArgs>
					 (
						ev => webClient.OpenReadCompleted += ev,
						ev => webClient.OpenReadCompleted -= ev
					 )
					 .Select(o => o.EventArgs.Result)
					 .FirstOrDefault()
					 .ParseCSV();

		// Call the Async method
		webClient.OpenReadAsync(new Uri(url));
	}
	return collection;
}
    
private void ParseCSV(this Stream stream)
{
	try
	{
		using (StreamReader reader = new StreamReader(stream))
		{
			string contents = reader.ReadToEnd();
			...
		}
	}
	catch (Exception ex)
	{
		System.Diagnostics.Debug.WriteLine("Unable to get history data!\n" + ex.Message);
	}
}

Recommended answer

FirstOrDefault is not a reactive operation. It blocks the current thread until the first value is observed. In the new code it runs before OpenReadAsync is called, so the download never starts and ParseCSV is never reached. You should use Subscribe instead.
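
As a rough illustration of the Subscribe-based approach, here is a minimal sketch. It is not the original poster's final code: the CsvDownloader class, the Download method and the onContents callback are hypothetical names introduced for this example, and it assumes the System.Reactive.Linq operators (FromEventPattern, Take, Select, Subscribe) are available in the project. The key point is that the subscription is set up first and OpenReadAsync is called afterwards, without blocking.

```csharp
using System;
using System.IO;
using System.Net;
using System.Reactive.Linq;

public static class CsvDownloader // hypothetical helper class
{
    // Starts the download and returns the subscription; disposing it detaches the event handler.
    // onContents is a hypothetical callback that receives the raw CSV text.
    public static IDisposable Download(string url, Action<string> onContents)
    {
        var webClient = new WebClient();

        IDisposable subscription = Observable
            .FromEventPattern<OpenReadCompletedEventHandler, OpenReadCompletedEventArgs>(
                handler => webClient.OpenReadCompleted += handler,
                handler => webClient.OpenReadCompleted -= handler)
            // Complete after the first event so the handler is removed automatically.
            .Take(1)
            // Reading Result throws if the download failed; Rx routes that exception to the error callback.
            .Select(pattern => pattern.EventArgs.Result)
            .Subscribe(
                stream =>
                {
                    using (var reader = new StreamReader(stream))
                    {
                        onContents(reader.ReadToEnd());
                    }
                },
                error => System.Diagnostics.Debug.WriteLine("Unable to download CSV!\n" + error.Message));

        // Start the request only after subscribing, so the completion event cannot be missed.
        webClient.OpenReadAsync(new Uri(url));

        return subscription;
    }
}
```

With this shape, a caller would pass the parsing logic as the callback, for example CsvDownloader.Download(url, contents => { ... }). Note that the method returns before the download finishes, so any collection filled in the callback is populated later, not by the time the method returns.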

Alternatively, Rxx provides several related extensions that could make this much easier. Here's the related lab:

http://rxx.codeplex.com/SourceControl/changeset/view/65171#1055791

Here's another example. This one hosts CSV text locally and uses Rxx parsers in the client code.

Note: The client appends a new line to the input before parsing (e.g., (reader.ReadToEnd() + Environment.NewLine)) because of a glaring oversight in the parser implementation: it's missing an "End of Sequence" operator, which I plan on adding for the next release.

using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Reactive.Linq;
using System.Text;
using Rxx.Parsers.Linq;

namespace Rxx.Labs.Reactive
{
	public sealed class WebClientCsvParserLab : BaseConsoleLab
	{
		protected override void Main()
		{
			var csv = @"
item 1, item 2, item 3
item 4, item 5, item 6

item 7, item 8, item 9
item 10";

			var url = new Uri("http://localhost:1111");

			var server =
				from request in ObservableHttpListener.Start(url.Host, url.Port)
				let bytes = Encoding.UTF8.GetBytes(csv)
				from sent in Observable.Using(
					() => request.Response,
					response => Observable.Using(
						() => response.OutputStream,
						stream => stream.WriteObservable(bytes, 0, bytes.Length)))
				select sent;

			var client = DownloadLinesCsv(url)
				.Select((line, index) => new
				{
					Line = line,
					LineNumber = index + 1
				});

			using (server.Subscribe(_ => TraceLine("Server sent response."), TraceError))
			using (client.Subscribe(
					result =>
					{
						Trace("Client parsed line {0}: ", result.LineNumber);

						foreach (var item in result.Line)
						{
							Trace(item + ',');
						}

						TraceLine();
					},
					TraceError,
					() => TraceLine("Client Completed")))
			{
				WaitForKey();
			}
		}

		public static IObservable<IList<string>> DownloadLinesCsv(Uri url)
		{
			return
				from stream in ObservableWebClient.OpenRead(url)
				from line in
					Observable.Using(
						() => new StreamReader(stream),
						reader => (reader.ReadToEnd() + Environment.NewLine)
							.ParseString(parser =>
								from next in parser
								let newLine = parser.Word(Environment.NewLine)
								let separator = parser.Word(",")
								let item = from value in next.NoneOrMoreNonGreedy().Join()
													 from _ in separator.Or(newLine.NonGreedy())
													 select value
								let line = item.NoneOrMoreNonGreedy().ToList()
								select from items in line
											 from _ in newLine
											 where items.Count > 0
											 select items)
							.ToObservable())
				select line;
		}
	}
}

- Dave

