使用blockingcollect 写入文件 [英] file writing using blockingcollection

查看:51
本文介绍了使用blockingcollect 写入文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 tcp 侦听器,它侦听和写入来自服务器的数据.我使用了 BlockingCollection 来存储数据.这里我不知道文件什么时候结束.所以,我的文件流总是打开的.

I have a tcp listener which listens and writes data from the server. I used a BlockingCollection to store data. Here I don't know when the file ends. So, my filestream is always open.

我的部分代码是:

private static BlockingCollection<string> Buffer = new   BlockingCollection<string>();

Process()

{
 var consumer = Task.Factory.StartNew(() =>WriteData());
 while()

 {
  string request = await reader.ReadLineAsync();
  Buffer.Add(request);
 }
} 

WriteData()
{
  FileStream fStream = new FileStream(filename,FileMode.Append,FileAccess.Write,FileShare.Write, 16392);

 foreach(var val in Buffer.GetConsumingEnumerable(token))
 {

 fStream.Write(Encoding.UTF8.GetBytes(val), 0, val.Length);
                            fStream.Flush();
 }

}

问题是我不能在循环内处理文件流,否则我必须为每一行创建文件流,循环可能永远不会结束.

The problem is I cannot dispose filestream within loop otherwise I have to create filestream for each line and the loop may never end.

推荐答案

如果您使用 DataFlow,这在 .NET 4.5 中会容易得多 ActionBlock.ActionBlock 接受并缓冲传入的消息,并使用一个或多个任务异步处理它们.

This would be much easier in .NET 4.5 if you used a DataFlow ActionBlock. An ActionBlock accepts and buffers incoming messages and processes them asynchronously using one or more Tasks.

你可以这样写:

public static async Task ProcessFile(string sourceFileName,string targetFileName)
{
    //Pass the target stream as part of the message to avoid globals
    var block = new ActionBlock<Tuple<string, FileStream>>(async tuple =>
    {
        var line = tuple.Item1;
        var stream = tuple.Item2;
        await stream.WriteAsync(Encoding.UTF8.GetBytes(line), 0, line.Length);
    });


    //Post lines to block
    using (var targetStream = new FileStream(targetFileName, FileMode.Append, 
                                   FileAccess.Write, FileShare.Write, 16392))
    {
        using (var sourceStream = File.OpenRead(sourceFileName))
        {
            await PostLines(sourceStream, targetStream, block);
        }
        //Tell the block we are done
        block.Complete();
        //And wait fo it to finish
        await block.Completion;
    }

}

private static async Task PostLines(FileStream sourceStream, FileStream targetStream, 
                                    ActionBlock<Tuple<string, FileStream>> block)
{
    using (var reader = new StreamReader(sourceStream))
    {
        while (true)
        {
            var line = await reader.ReadLineAsync();
            if (line == null)
                break;
            var tuple = Tuple.Create(line, targetStream);
            block.Post(tuple);
        }
    }
}

大部分代码处理读取每一行并将其发布到块.默认情况下,一个 ActionBlock 一次只使用一个 Task 来处理一条消息,在这种情况下这很好.如果需要并行处理数据,可以使用更多任务.

Most of the code deals with reading each line and posting it to the block. By default, an ActionBlock uses only a single Task to process one message at a time, which is fine in this scenario. More tasks can be used if needed to process data in parallel.

一旦读取了所有行,我们通过调用 Complete 通知块,并使用 await block.Completion 等待它完成处理.

Once all lines are read, we notify the block with a call to Complete and await for it to finish processing with await block.Completion.

一旦块的Completion任务完成,我们就可以关闭目标流.

Once the block's Completion task finishes we can close the target stream.

DataFlow 库的美妙之处在于您可以将多个块链接在一起,以创建处理步骤的管道.ActionBlock 通常是此类链中的最后一步.该库负责将数据从一个块传递到下一个块,并沿着链向下传播完成.

The beauty of the DataFlow library is that you can link multiple blocks together, to create a pipeline of processing steps. ActionBlock is typically the final step in such a chain. The library takes care to pass data from one block to the next and propagate completion down the chain.

例如,一个步骤可以从日志中读取文件,第二个步骤可以使用正则表达式解析它们以查找特定模式(例如错误消息)并将其传递,第三个步骤可以接收错误消息并将它们写入另一个文件.每一步都将在不同的线程上执行,中间消息在每一步缓冲.

For example, one step can read files from a log, a second can parse them with a regex to find specific patterns (eg error messages) and pass them on, a third can receive the error messages and write them to another file. Each step will execute on a different thread, with intermediate messages buffered at each step.

这篇关于使用blockingcollect 写入文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆