How to aggregate the data from an async producer and write it to a file?


Problem description


I'm learning about async/await patterns in C#. Currently I'm trying to solve a problem like this:

  • There is a producer (a hardware device) that generates 1000 packets per second. I need to log this data to a file.

  • The device only has a ReadAsync() method to report a single packet at a time.

  • I need to buffer the packets and write them to the file in the order they were generated, only once per second.

  • The write operation should fail if the previous write has not finished by the time the next batch of packets is ready to be written.

So far I have written something like below. It works, but I am not sure whether this is the best way to solve the problem. Any comments or suggestions? What is the best practice for approaching this kind of producer/consumer problem, where the consumer needs to aggregate the data received from the producer?

static async Task TestLogger(Device device, int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log"))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);
    }
}

Solution

You could use the following idea, provided the criterion for flushing is the number of packets (up to 1000). I did not test it. It makes use of Stephen Cleary's AsyncProducerConsumerQueue<T> featured in this question: http://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont

AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested();
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(), token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task previousFlush = Task.FromResult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested();
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!previousFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the previous flush if not ready
           throw new Exception("failed to flush on time.");
       }
       await previousFlush; // it's completed, observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Length, cts.Token);
    }
}
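
For completeness, here is one way the two loops above could be wired together. This is only a hypothetical sketch, not part of the original answer: RunLoggerAsync, the duration parameter and the "test.log" file name are made up for illustration, and AsyncProducerConsumerQueue<T> is assumed to come from the implementation linked above (or Nito.AsyncEx).

// Hypothetical wiring of the producer and consumer loops (not from the answer).
// Assumes the _device, _queue and _stream fields declared above.
async Task RunLoggerAsync(TimeSpan duration)
{
    using (var cts = new CancellationTokenSource(duration))
    {
        _queue = new AsyncProducerConsumerQueue<byte[]>(); // e.g. Nito.AsyncEx
        _stream = File.Create("test.log");                 // assumed file name
        try
        {
            // run producer and consumer side by side; a flush timeout thrown
            // in LogAsync (or the duration elapsing) surfaces here
            await Task.WhenAll(ReceiveAsync(cts.Token), LogAsync(cts.Token));
        }
        catch (OperationCanceledException)
        {
            // expected once the duration elapses
        }
        finally
        {
            _stream.Dispose();
        }
    }
}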

If you don't want to fail the logger but rather prefer to cancel the flush and proceed to the next batch, you can do so with a minimal change to this code.
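
For example, that minimal change might look roughly like this (an untested sketch; only the consumer loop changes, and the cancel-and-continue policy is one possible reading of the suggestion):

// Sketch of the "cancel the late flush and proceed" variant (untested).
async Task LogAsync(CancellationToken token)
{
    Task previousFlush = Task.FromResult(0);
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested();
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!previousFlush.IsCompleted)
       {
           cts.Cancel(); // abandon the late flush instead of failing the logger
           try { await previousFlush; }
           catch (OperationCanceledException) { /* expected */ }
       }
       else
       {
           await previousFlush; // it's completed, observe for any errors
       }
       // start flushing the new batch
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Length, cts.Token);
    }
}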

In response to @l3arnon's comment:

  1. A packet is not a byte, it's byte[].
  2. You haven't used the OP's ToHexString.
  3. AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow.
  4. You await previousFlush for errors just after you throw an exception which makes that line redundant.

  Etc. In short: I think the possible added value doesn't justify this very complicated solution.

  1. "A packet is not a byte, it's byte[]" - A packet is a byte, this is obvious from the OP's code: buffer[i] = await device.ReadAsync(). Then, a batch of packets is byte[].
  2. "You haven't used the OP's ToHexString." - The goal was to show how to use Stream.WriteAsync which natively accepts a cancellation token, instead of WriteLineAsync which doesn't allow cancellation. It's trivial to use ToHexString with Stream.WriteAsync and still take advantage of cancellation support:

    var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
        Environment.NewLine);
    _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
    

  3. "AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow" - I don't think this is a determined fact. However, if the OP is concerned about it, he can use regular BlockingCollection, which doesn't block the producer thread. It's OK to block the consumer thread while waiting for the next batch, because writing is done in parallel. As opposed to this, your TPL Dataflow version carries one redundant CPU and lock intensive operation: moving data from producer pipeline to writer pipleline with logAction.Post(packet), byte by byte. My code doesn't do that.

  4. "You await previousFlush for errors just after you throw an exception which makes that line redundant." - This line is not redundant. Perhaps, you're missing this point: previousFlush.IsCompleted can be true when previousFlush.IsFaulted or previousFlush.IsCancelled is also true. So, await previousFlush is relevant there to observe any errors on the completed tasks (e.g., a write failure), which otherwise will be lost.
