如何从任务异步返回WCF响应中的流? [英] How can I return a stream in a WCF response asynchronously from a task?

查看:83
本文介绍了如何从任务异步返回WCF响应中的流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个类,该类对大量数据进行长时间运行的处理,并将输出写入我提供的流中.我正在尝试将WCF放在此之上(使用命名管道),但是在弄清楚如何返回流时遇到了麻烦.到目前为止,我有这样的事情:

I have a class that does some long-running processing with lots of data and writes the output to a stream that I provide. I am trying to put a WCF front on this (using named pipes) but having trouble figuring out how to return the stream. I have something like this so far:

interface IProcessor { Stream GetStream(); }

class Host {
  static void Main(string[] args) {
    using (ServiceHost sh = new ServiceHost(typeof(Processor), new Uri[]{new Uri("net.pipe://localhost")})) {
      var binding = new NetNamedPipeBinding();
      binding.TransferMode = TransferMode.StreamedResponse;
      sh.AddServiceEndpoint(typeof(IProcessor), binding, "foo");
      sh.Open();
      Console.WriteLine("Waiting...");
      Console.ReadLine();
      sh.Close();
    }
  }
}

class Processor : IProcessor {
  Stream GetStream() {
    var SourceProcessor = new SourceProcessor(...);
    var s = new MemoryStream();
    new Task(() => { SourceProcessor.Run(s); }).Start();
    return s;
  }
}

class Client {
  static void Main(string[] args) {
    Console.WriteLine("Starting...");
    var binding = new NetNamedPipeBinding();
    binding.TransferMode = TransferMode.StreamedResponse;
    ChannelFactory<IProcessor> f = new ChannelFactory<IProcessor>(binding, new EndpointAddress("net.pipe://localhost/foo"));
    Console.WriteLine("Creating channel...");
    IProcessor eps = f.CreateChannel();
    Console.WriteLine("Getting stream.");
    Stream s = eps.GetStream();
    StreamReader sr = new StreamReader(s);
    while (!sr.EndOfStream) Console.WriteLine(sr.ReadLine());
    Console.ReadLine();
  }
}

一切都经过动作,但是当然没有源数据可以传递给客户端.我对如何做到这一点感到困惑(也许我做不到),因为我既需要返回流并运行任务,又可能等待任务完成.如果我不执行任务就只调用SourceProcessor.Run(s),据称它将阻塞并缓冲,但是我不确定如何等到任务完成后再返回流以供客户端读取...

Everything goes through the motions but of course none of the source data makes it through to the client. I'm confused as to how I can do this (maybe I can't) since I need to both return the stream and run the task and potentially wait for the task to finish. If I just call SourceProcessor.Run(s) without being in a task, it would block and buffer, allegedly, but I'm not sure how to make it wait until the task is done while also returning the stream for the client to read...

推荐答案

问题在于,如果WCF调用Read(并且该调用返回0个字节,它将认为流完成". MemoryStream会很乐意这样做,如果没有可用数据,它不会阻止读取.

The problem is WCF will think it the stream is "done" if it calls Read( and the call returns 0 bytes. MemoryStream will happily do that, it will not block reads if there is no data available.

问题的根源是WCF读取MemoryStream的速度比编写它的速度快,并且认为它已完成",解决该问题的方法是您将需要返回另一种类型的Stream没有可用数据时,块而不是返回0. .NET没有内置可实现此目的的东西,您将需要找到一个第三方类或创建一个自己的类(它可能很简单,就像从MemoryStream派生并覆盖Read一样,以阻止读取,直到完成"为止).标记已设置(请参见 BlockingCollection<T> 及其 CompleteAdding() 方法类似的行为)).

The source of your problem is WCF is reading the MemoryStream faster than you are writing to it and thinking it is "done", the way to fix it is you will need to return a different type of Stream that blocks instead of returning 0 when there is no data available. There is nothing built in to .NET that will do this, you will need to either find a 3rd party class or make your own (it may be as simple as derive from MemoryStream and override Read to block reads until a "Done" flag is set (See BlockingCollection<T> and its CompleteAdding() method for a similar behavior)).

出于娱乐目的,我将其整合在一起,虽然未经测试,但可以满足您的需求.

For fun I threw this together, it is totally untested but it might do what you need.

using System;
using System.Collections.Concurrent;
using System.IO;

namespace Example
{
    public class BufferStream : Stream
    {
        public BufferStream()
        {
            _data = new BlockingCollection<byte[]>();
        }


        /// <param name="boundedCapacity">The maximum number of calls to <see cref="Write"/> that can be made without
        /// the buffer being drained.</param>
        public BufferStream(int boundedCapacity)
        {
            _data = new BlockingCollection<byte[]>(boundedCapacity);
        }

        private readonly BlockingCollection<byte[]> _data;
        private byte[] _currentBlock = null;
        private int _currentBlockIndex = 0;

        public int BoundedCapacity { get { return _data.BoundedCapacity; } }

        public int BufferedWrites { get { return _data.Count; } }

        public bool IsAddingCompleted
        {
            get { return _data.IsAddingCompleted; }
        }

        public bool IsCompleted
        {
            get { return _data.IsCompleted; }
        }

        public void CompleteAdding()
        {
            _data.CompleteAdding();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            var localArray = new byte[count];

            //Copy the data in to a new buffer of exactly the count size.
            Array.Copy(buffer, offset, localArray, 0, count);

            _data.Add(localArray);
        }


        public override int Read(byte[] buffer, int offset, int count)
        {
            if (_currentBlock == null || _currentBlockIndex == _currentBlock.Length)
            {
                if (!GetNextBlock()) 
                    return 0;
            }

            int minCount = Math.Min(count, _currentBlock.Length - _currentBlockIndex);

            Array.Copy(_currentBlock, _currentBlockIndex, buffer, offset, minCount);
            _currentBlockIndex += minCount;

            return minCount;
        }

        /// <summary>
        /// Loads the next block in to <see cref="_currentBlock"/>.
        /// </summary>
        /// <returns>True if the next block was retrieved.</returns>
        private bool GetNextBlock()
        {
            if (!_data.TryTake(out _currentBlock))
            {
                //The TryTake failed, the collection is empty.

                //See if we are in the completed state.
                if (_data.IsCompleted)
                {
                    return false;
                }

                //Wait for more data to show up.
                try
                {
                    _currentBlock = _data.Take();
                }
                catch (InvalidOperationException)
                {
                    //If the blocking collection was marked complete while we where waiting Take throws a InvalidOperationException
                    return false;
                }
            }

            _currentBlockIndex = 0;
            return true;
        }

        #region Constant functions

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return true; }
        }

        public override void Flush()
        {
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        #endregion
    }
}

与从MemoryStream派生相比的优势在于,一旦WCF读取了一个值,它就不再需要保留在内存中(返回Stream而不是byte[]的整个过程)

The advantage of this over deriving from MemoryStream is once a value has been read by WCF it no longer needs to remain in memory (the entire point of returning a Stream instead of a byte[])

这篇关于如何从任务异步返回WCF响应中的流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆