How to read an interleaved file concurrently using Reactive Extensions

Question

I am new to Reactive Extensions and I would like to use it (in C#) to read a file that contains several interleaved streams. Basically, the file is in the format ABCDABCDABCD... I would prefer to read the file sequentially, separate the streams (i.e. AAA..., BBB..., etc.) and process each stream in parallel, using a separate thread for each stream.

There will have to be some form of buffering to keep each stream as busy as possible (within limits, of course). Not all streams necessarily start at the same time, in which case a number of elements have to be skipped for the delayed streams. In this case the buffering might bridge the gap.
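A minimal sketch of that buffering, written without Rx and assuming the elements have already been decoded to `int`: each stream gets its own bounded `BlockingCollection<int>` drained by a dedicated thread, and a per-stream `skip` count discards the leading slots of streams that start late. The bounded capacity is the "within limits" part: when one consumer falls behind, `Add` blocks the reader until that queue has room. `InterleavedDemux`, `Run`, `skip` and `capacity` are hypothetical names, not from the original post.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not from the original post): splits an interleaved
// sequence A0 B0 C0 A1 B1 C1 ... into one bounded queue per stream, each
// drained by its own dedicated thread.
public static class InterleavedDemux
{
    // skip[k]: number of leading elements of stream k to discard (late start).
    // capacity: max buffered elements per stream before the reader blocks.
    // process(streamId, value): invoked on that stream's own thread.
    public static void Run(IEnumerable<int> elements, int streamCount,
                           int[] skip, int capacity, Action<int, int> process)
    {
        var queues = new BlockingCollection<int>[streamCount];
        var workers = new Thread[streamCount];
        for (int k = 0; k < streamCount; k++)
        {
            queues[k] = new BlockingCollection<int>(capacity);
            var queue = queues[k];
            int id = k; // copy so each closure sees its own stream ID
            workers[k] = new Thread(() =>
            {
                // Ends once CompleteAdding is called and the queue is empty.
                foreach (var value in queue.GetConsumingEnumerable())
                    process(id, value);
            }) { IsBackground = true };
            workers[k].Start();
        }

        long index = 0;
        foreach (var value in elements)
        {
            int stream = (int)(index % streamCount);
            long position = index / streamCount; // position within that stream
            if (position >= skip[stream])
                queues[stream].Add(value); // blocks while this stream's queue is full
            index++;
        }

        foreach (var q in queues) q.CompleteAdding();
        foreach (var w in workers) w.Join();
        foreach (var q in queues) q.Dispose();
    }
}
```

Because each queue has exactly one consumer, elements of a given stream are processed in file order, while different streams run concurrently.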

The elements in the file are small (4 bytes), so reading them one at a time is quite chatty. Therefore, I'm also looking for a way to deal with this efficiently.
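One common way to reduce per-element overhead is to read and hand off elements in blocks rather than singly. A sketch, assuming the 4-byte elements are little-endian 32-bit integers (the post does not say what they are or their byte order); `BlockReader.ReadBlocks` and `elementsPerBlock` are hypothetical names.

```csharp
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: reads 4-byte elements (assumed little-endian int32)
// in large blocks, so each I/O call and each hand-off covers many elements.
public static class BlockReader
{
    public static IEnumerable<int[]> ReadBlocks(Stream stream, int elementsPerBlock)
    {
        var buffer = new byte[elementsPerBlock * 4];
        while (true)
        {
            // Stream.Read may return fewer bytes than requested, so keep
            // reading until the block is full or the stream ends.
            int filled = 0;
            while (filled < buffer.Length)
            {
                int n = stream.Read(buffer, filled, buffer.Length - filled);
                if (n == 0) break;
                filled += n;
            }

            int count = filled / 4; // a trailing partial element is ignored
            if (count == 0) yield break;

            var block = new int[count];
            for (int i = 0; i < count; i++)
            {
                int o = i * 4;
                block[i] = buffer[o] | (buffer[o + 1] << 8)
                         | (buffer[o + 2] << 16) | (buffer[o + 3] << 24);
            }
            yield return block;

            if (filled < buffer.Length) yield break; // end of stream reached
        }
    }
}
```

If `elementsPerBlock` is a multiple of the number of streams, every block starts with stream 0, so an element's position within its block, modulo the stream count, still identifies its stream.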

I started out by creating an enumerable to read the file. It could be made to supply a struct containing the stream ID, or the streams could be separated based on order (element number modulo number of streams). The latter is probably more efficient, though.
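A sketch of the struct-carrying option, assuming elements already decoded to `int`. The stream ID is still derived from position (element number modulo number of streams), but it then travels with the value, so downstream code need not know the file layout. A wrapping counter gives the same assignment as `i % streamCount` without an ever-growing index. `Tagged` and `Tagging.Tag` are hypothetical names.

```csharp
using System.Collections.Generic;

// Hypothetical element type: a value plus the ID of the stream it came from.
public readonly struct Tagged
{
    public Tagged(int streamId, int value)
    {
        StreamId = streamId;
        Value = value;
    }

    public int StreamId { get; }
    public int Value { get; }
}

public static class Tagging
{
    // Element i of the interleaved sequence belongs to stream i % streamCount;
    // the counter wraps instead of growing, which yields the same IDs.
    public static IEnumerable<Tagged> Tag(IEnumerable<int> elements, int streamCount)
    {
        int stream = 0;
        foreach (var value in elements)
        {
            yield return new Tagged(stream, value);
            stream = (stream + 1) % streamCount;
        }
    }
}
```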

Recommended answer

This question has "it depends" stamped all over it, especially since you're asking about performance and efficiency but have provided an example that is somewhat contrived: your example file is dead simple compared to the real file. However, I will attempt to provide some advice on the off chance that it is useful.

Here's a method to turn a stream into an IEnumerable<char>. The stream applies the buffering, and the method yields one result at a time. This could be made more efficient by sending back chunks of data, but at some point you need to process the elements one at a time, and it may as well be here. Don't prematurely optimise.

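using System.Collections.Generic;
using System.IO;

// Despite the name, this yields decoded characters rather than raw bytes:
// StreamReader decodes the file as text (UTF-8 by default).
IEnumerable<char> ReadBytes(Stream stream)
{
    // Disposing the reader also disposes the underlying stream.
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return (char)reader.Read(); // one character per iteration
    }
}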

Now, let's say this is the processing code for the "output" observables. First I set up the output observables, and then I subscribe to them as appropriate. Note that I'm using an array here, so each output observable's index is its array index. A dictionary would also work if the stream index couldn't be turned into a zero-based index.

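using System;
using System.Linq;
using System.Reactive.Linq;     // Delay (Subscribe(Action<T>) lives in namespace System)
using System.Reactive.Subjects; // Subject<T>

var outputs = Enumerable.Repeat(0, 3).Select(_ => new Subject<char>()).ToArray(); // index = stream index

outputs[0].Delay(TimeSpan.FromSeconds(2)).Subscribe(x => Console.WriteLine("hi: {0}", x));
outputs[1].Delay(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine("ho: {0}", x));
outputs[2].Subscribe(x => Console.WriteLine("he: {0}", x));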

Notice the use of Subject<char> to push my elements out. The element type depends on your data, but char works for the example given. Notice also that I delay the elements only to prove everything is working. They are now independent streams and you can do whatever you want with them.

OK, given a file stream:

using System.IO;

var file = @"C:\test.txt";
var buffer = 32; // FileStream's internal buffer size in bytes (the default is 4096)
var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, buffer);

I can now subscribe and use the modulo index to send each element to the right output stream:

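using System.Reactive.Concurrency; // ThreadPoolScheduler
using System.Reactive.Linq;        // ToObservable, Select

ReadBytes(stream)
    .ToObservable(ThreadPoolScheduler.Instance)       // older Rx: Scheduler.ThreadPool
    .Select((x, i) => new { Key = i % 3, Value = x }) // element i goes to output i % 3
    .Subscribe(x => outputs[x.Key].OnNext(x.Value),
               () => { foreach (var o in outputs) o.OnCompleted(); }); // propagate end of file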

There are potentially more efficient methods here depending on exactly how you can calculate the target stream, but the idea remains the same.
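For example, when the layout is strictly round-robin and elements arrive in blocks (an `int[]` whose length is a multiple of the stream count), a whole block can be split with strided copies instead of routing every element through the per-element subscription. This is a sketch under that assumption; `Deinterleave.Split` is a hypothetical name.

```csharp
using System;

// Hypothetical helper: de-interleaves one round-robin block in a single pass
// per stream, instead of dispatching each element individually.
public static class Deinterleave
{
    // Splits [s0 s1 ... s(n-1) s0 s1 ...] into n arrays, one per stream.
    // Assumes block.Length is a multiple of streamCount.
    public static int[][] Split(int[] block, int streamCount)
    {
        if (block.Length % streamCount != 0)
            throw new ArgumentException("Block length must be a multiple of streamCount.", nameof(block));

        int perStream = block.Length / streamCount;
        var result = new int[streamCount][];
        for (int k = 0; k < streamCount; k++)
        {
            var part = new int[perStream];
            // Element j of stream k sits at index j * streamCount + k.
            for (int j = 0; j < perStream; j++)
                part[j] = block[j * streamCount + k];
            result[k] = part;
        }
        return result;
    }
}
```

Each per-stream array could then be pushed as a single item (for example through a `Subject<int[]>`), giving one hand-off per stream per block rather than one per element.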

The input file contains just one line: ABCABCABCABCABCABC
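The same `i % 3` routing can be checked without Rx or a file by applying it directly to the sample input; each output then collects a single letter. This small sketch uses a hypothetical `SampleSplit` helper. It also shows that the split only comes out evenly if the file has no trailing newline, because newline characters would be read and assigned to streams as well.

```csharp
using System.Text;

// Hypothetical helper: applies the answer's routing rule
// (element i goes to output i % streamCount) to a string.
public static class SampleSplit
{
    public static string[] Split(string input, int streamCount)
    {
        var outputs = new StringBuilder[streamCount];
        for (int k = 0; k < streamCount; k++)
            outputs[k] = new StringBuilder();

        // Append each character to the output chosen by its position.
        for (int i = 0; i < input.Length; i++)
            outputs[i % streamCount].Append(input[i]);

        var result = new string[streamCount];
        for (int k = 0; k < streamCount; k++)
            result[k] = outputs[k].ToString();
        return result;
    }
}
```

`SampleSplit.Split("ABCABCABCABCABCABC", 3)` returns `"AAAAAA"`, `"BBBBBB"` and `"CCCCCC"`: output 0 ("hi") receives the A's, output 1 ("ho") the B's and output 2 ("he") the C's, which matches the program output once the delays are taken into account.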

The output from running the program is:

he: C
he: C
he: C
he: C
he: C
he: C

One second later:

ho: B
ho: B
ho: B
ho: B
ho: B
ho: B

And after another second:

hi: A
hi: A
hi: A
hi: A
hi: A
hi: A
