How can I have an async stream return with 2 data sources


Question

I have the following function, which runs a System.Diagnostics.Process and returns its standard output as an async stream. Everything currently in the method works as intended: I can call it in an await foreach() loop and get each line of output as it is generated by the external exe.

private static async IAsyncEnumerable<string> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handler back to this method
   BufferBlock<string> dataBuffer = new BufferBlock<string>();

   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         dataBuffer.Complete();
      }
      else
      {
         dataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();

   // Return data line by line  
   while (await dataBuffer.OutputAvailableAsync())
      yield return dataBuffer.Receive();
}

My problem is that I now need it to return both the standard output and the standard error results. I made this class to hold the data from each stream.

public class ProcessData
{
   public string Error { get; set; } = "";
   public string Output { get; set; } = "";
}

and changed ProcessAsyncStream() to look like this:

private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handlers back to this method
   BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
   BufferBlock<string> errorDataBuffer = new BufferBlock<string>();

   
   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         outputDataBuffer.Complete();
      }
      else
      {
         outputDataBuffer.Post(e.Data);
      }
   };

   process.ErrorDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         errorDataBuffer.Complete();
      }
      else
      {
         errorDataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();
   process.BeginErrorReadLine();

   // Return data line by line
   while (await outputDataBuffer.OutputAvailableAsync()
          || await errorDataBuffer.OutputAvailableAsync())
      yield return new ProcessData() 
      {
         Error = errorDataBuffer.Receive(), 
         Output = outputDataBuffer.Receive()
      };
}

The problem is that if either buffer completes before the other, the method hangs, because that buffer's .Receive() has no data left to receive. If I change the while condition to &&, then I won't get all the data from the other buffer.

Any suggestions?

Recommended answer

Regarding the actual problem, there is an issue with the flow used to read the blocks. The easiest solution is to use a single buffer with multiple producers and a single consumer, combined with a message packet that records which stream each line came from.

The conceptual issue that you are trying to solve with the DataFlow blocks lies in the fundamental nature of events and async streams. Events are pushed, and async streams are pulled.

There are several solutions that would map them together, though I think the most elegant is simply to use an unbounded Channel as the buffer.

Channels are a more modern approach than DataFlow: they have fewer degrees of freedom, are less clunky than a BufferBlock, and are very lightweight and highly optimized. Additionally, I would just pass a wrapper for the different response types.
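To make the "multiple producers, single consumer" idea concrete, here is a minimal sketch that uses no Process at all. The names ChannelMergeSketch and MergeAsync are hypothetical, and the producers are plain in-memory sequences standing in for the two event sources. Each producer pushes into one unbounded channel, the last producer to finish completes it, and the consumer pulls everything through ReadAllAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelMergeSketch
{
    // Merges several push-style producers into one pull-style async stream.
    // Assumption: enumerating a source does not throw; a real implementation
    // would pass the exception to Writer.Complete so the reader sees it.
    public static async IAsyncEnumerable<string> MergeAsync(
        params IEnumerable<string>[] sources)
    {
        var channel = Channel.CreateUnbounded<string>();
        var remaining = sources.Length;

        // With no producers nobody would ever close the channel, so close it now.
        if (remaining == 0)
            channel.Writer.Complete();

        foreach (var source in sources)
        {
            // Each producer runs on its own task, like an independent event source.
            _ = Task.Run(() =>
            {
                foreach (var item in source)
                    channel.Writer.TryWrite(item); // succeeds while the unbounded channel is open

                // The last producer to finish closes the channel.
                if (Interlocked.Decrement(ref remaining) == 0)
                    channel.Writer.Complete();
            });
        }

        // Single consumer: the loop ends once the channel is completed and drained.
        await foreach (var item in channel.Reader.ReadAllAsync())
            yield return item;
    }
}
```

Items from different producers may interleave in any order, but the order within each producer is preserved, which is the same guarantee you would want for stdout and stderr lines.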

Disregarding any other problems (conceptual or otherwise).

Given

public enum MessageType
{
   Output,
   Error
}

public class Message
{
   public MessageType MessageType { get; set; }
   public string Data { get; set; }

   public Message(string data, MessageType messageType )
   {
      Data = data;
      MessageType = messageType;
   }
}

Usage

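// Requires: using System.Collections.Generic; using System.Diagnostics;
// using System.Runtime.CompilerServices; using System.Threading;
// using System.Threading.Channels;
private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
     ProcessStartInfo processStartInfo, 
     [EnumeratorCancellation] CancellationToken cancellationToken)
{
   using var process = new Process() { StartInfo = processStartInfo };

   var ch = Channel.CreateUnbounded<Message>();
   var completeCount = 0;

   void OnReceived(string data, MessageType type)
   {
      if (data is null)
      {
         // A null line marks the end of one stream; close the channel
         // only after both stdout and stderr have finished
         if (Interlocked.Increment(ref completeCount) == 2)
            ch.Writer.TryComplete();
      }
      else
      {
         // TryWrite succeeds on an unbounded channel that is still open
         ch.Writer.TryWrite(new Message(data, type));
      }
   }

   process.OutputDataReceived += (_, args) => OnReceived(args.Data, MessageType.Output);
   process.ErrorDataReceived += (_, args) => OnReceived(args.Data, MessageType.Error);

   // Start the process; processStartInfo must set RedirectStandardOutput and
   // RedirectStandardError to true and UseShellExecute to false
   process.Start();
   process.BeginOutputReadLine();
   process.BeginErrorReadLine();

   await foreach (var message in ch.Reader
           .ReadAllAsync(cancellationToken)
           .ConfigureAwait(false))
      yield return message;

   // cleanup
   // ...
}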

Note: completely untested
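On the consuming side, the caller only has to branch on MessageType. The following is a rough sketch rather than part of the answer: ConsumerSketch, FakeProcessStream and SplitAsync are hypothetical names, and FakeProcessStream stands in for ProcessAsyncStreamAsync so that the example runs without an external exe. The Message and MessageType types are repeated to keep the block self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public enum MessageType { Output, Error }

public class Message
{
    public MessageType MessageType { get; }
    public string Data { get; }

    public Message(string data, MessageType messageType)
    {
        Data = data;
        MessageType = messageType;
    }
}

public static class ConsumerSketch
{
    // Hypothetical stand-in for ProcessAsyncStreamAsync: yields a fixed,
    // interleaved sequence of stdout and stderr lines.
    public static async IAsyncEnumerable<Message> FakeProcessStream()
    {
        yield return new Message("compiling", MessageType.Output);
        await Task.Yield();
        yield return new Message("warning: unused variable", MessageType.Error);
        yield return new Message("done", MessageType.Output);
    }

    // Pulls every message from the stream and sorts it into one of two lists.
    public static async Task<(List<string> Output, List<string> Error)> SplitAsync(
        IAsyncEnumerable<Message> stream)
    {
        var output = new List<string>();
        var error = new List<string>();

        await foreach (var message in stream)
        {
            if (message.MessageType == MessageType.Error)
                error.Add(message.Data);
            else
                output.Add(message.Data);
        }

        return (output, error);
    }
}
```

Because both kinds of line arrive through one stream, the loop ends exactly once, when the producer side completes, so there is no second buffer left to wait on.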
