How do I arrange flow control in TPL Dataflow?

Question

I'm trying to get my head around controlling data flow in TPL Dataflow. I have a very fast producer and a very slow consumer. (My real code is more complex, but nonetheless this is a pretty good model, and it reproduces the problem.)

When I run it, the code starts drinking memory like it's going out of style, and the output queue on the producer fills up as fast as it can. What I'd really prefer to see is the producer stop running for a while, until the consumer has a chance to ask for more. From my reading of the documentation, this is what is supposed to happen: that is, I thought the producer waits until the consumer has space.

This clearly isn't the case. How do I fix it so that the queue doesn't go crazy?

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

namespace MemoryLeakTestCase
{
    class Program
    {

        static void Main(string[] args)
        {
            var CreateData = new TransformManyBlock<int, string>(ignore =>
            {
                return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
            });

            var ParseFile = new TransformManyBlock<string, string>(fileContent =>
            {
                Thread.Sleep(1000);
                return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
            }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
            );

            var EndOfTheLine = new ActionBlock<object>(f =>
                {
                });


            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
            CreateData.LinkTo(ParseFile, linkOptions);
            ParseFile.LinkTo(EndOfTheLine, linkOptions);

            Task t = new Task(() =>
            {
                while (true)
                {
                    Console.WriteLine("CreateData: " + Report(CreateData));
                    Console.WriteLine("ParseData:  " + Report(ParseFile));
                    Console.WriteLine("NullTarget: " +  EndOfTheLine.InputCount );
                    Thread.Sleep(1000);
                }

            });
            t.Start();

            CreateData.SendAsync(0);
            CreateData.Complete();

            EndOfTheLine.Completion.Wait();
        }

        public static string Report<T, U>(TransformManyBlock<T, U> block)
        {
            return String.Format("INPUT: {0}   OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
        }


    }
}

Solution

Normally, what you would do in a situation like this is also set the BoundedCapacity of the CreateData block. But that won't work here, because TransformManyBlock doesn't seem to take BoundedCapacity into account when filling the output queue from a single IEnumerable.
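For comparison, here is a minimal sketch of the usual case, where bounding the target is enough to throttle a producer that awaits SendAsync(). The class name BoundedTargetDemo, the capacity of 5 and the 10 ms delay are illustrative assumptions, not part of the original code.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedTargetDemo
{
    // Sends 50 items to a slow, bounded consumer and returns the largest
    // input queue length the consumer observed while running.
    public static async Task<int> RunAsync()
    {
        int maxQueued = 0;
        ActionBlock<int> consumer = null;
        consumer = new ActionBlock<int>(async item =>
        {
            // Record how many items are waiting behind the current one.
            maxQueued = Math.Max(maxQueued, consumer.InputCount);
            await Task.Delay(10); // simulate slow work
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

        for (int i = 0; i < 50; i++)
        {
            // Completes only once the consumer has room for the item,
            // so the loop cannot run ahead of the bounded queue.
            await consumer.SendAsync(i);
        }

        consumer.Complete();
        await consumer.Completion;
        return maxQueued;
    }

    public static async Task Main()
    {
        Console.WriteLine("Largest input queue seen: " + await RunAsync());
    }
}
```

The producer here is a plain loop, so every item passes through SendAsync() individually. That is the part that is missing when a TransformManyBlock pushes a whole IEnumerable into its own output queue.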

What you can do instead is to create a function that iterates the collection and uses SendAsync() to send more data only when the target can accept it:

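/// <remarks>
/// If iterating data throws an exception, the target block is faulted
/// and the returned Task completes successfully.
/// 
/// Depending on the usage, this might or might not be what you want.
/// </remarks>
// Extension method: declare it inside a static class.
public static async Task SendAllAsync<T>(
    this ITargetBlock<T> target, IEnumerable<T> data)
{
    try
    {
        foreach (var item in data)
        {
            // SendAsync() waits until the target has room for the item.
            // It returns false if the target declined the item (for example
            // because it has completed or faulted), so stop iterating then.
            if (!await target.SendAsync(item))
            {
                return;
            }
        }
    }
    catch (Exception e)
    {
        target.Fault(e);
    }
}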

Usage:

var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
await ParseFile.SendAllAsync(data);
ParseFile.Complete();
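To see the throttling in action, the following sketch wires SendAllAsync() to a slow, bounded consumer and measures how far the lazy sequence gets ahead of it. It includes its own copy of SendAllAsync() (with an early exit when SendAsync() reports that the item was declined) so that it compiles on its own. The names SendAllDemo and RunAsync, the item count and the capacity are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExtensions
{
    // Copy of SendAllAsync so that this sample is self-contained.
    public static async Task SendAllAsync<T>(
        this ITargetBlock<T> target, IEnumerable<T> data)
    {
        try
        {
            foreach (var item in data)
            {
                if (!await target.SendAsync(item))
                {
                    return; // target declined the item; stop producing
                }
            }
        }
        catch (Exception e)
        {
            target.Fault(e);
        }
    }
}

public static class SendAllDemo
{
    // Returns the largest observed gap between items produced and items consumed.
    public static async Task<int> RunAsync(int itemCount, int capacity)
    {
        int produced = 0, consumed = 0, maxAhead = 0;

        var slowConsumer = new ActionBlock<string>(async item =>
        {
            await Task.Delay(2); // simulate slow work
            Interlocked.Increment(ref consumed);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = capacity });

        // Lazy sequence: each element is created only when the foreach
        // inside SendAllAsync asks for the next one.
        var data = Enumerable.Range(0, itemCount).Select(i =>
        {
            int ahead = Interlocked.Increment(ref produced) - Volatile.Read(ref consumed);
            maxAhead = Math.Max(maxAhead, ahead);
            return "Hello, World " + i;
        });

        await slowConsumer.SendAllAsync(data);
        slowConsumer.Complete();
        await slowConsumer.Completion;
        return maxAhead;
    }

    public static async Task Main()
    {
        int maxAhead = await RunAsync(itemCount: 100, capacity: 10);
        Console.WriteLine("Producer was at most " + maxAhead + " items ahead");
    }
}
```

The gap should stay close to the consumer's BoundedCapacity instead of growing with the number of items, because the next element is only pulled from the sequence after the previous SendAsync() has completed.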

If you still want a CreateData block that behaves similarly to your original code, you could use two bounded BufferBlocks with SendAllAsync() between them, and then use Encapsulate() to make them look like one block:

/// <remarks>
/// boundedCapacity represents the capacity of the input queue
/// and the output queue separately, not their total.
/// </remarks>
public static IPropagatorBlock<TInput, TOutput>
    CreateBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
{
    var input = new BufferBlock<TInput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    var output = new BufferBlock<TOutput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

    Task.Run(
        async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                {
                    var data = transform(await input.ReceiveAsync());

                    await output.SendAllAsync(data);
                }

                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

    return DataflowBlock.Encapsulate(input, output);
}
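A possible way to use this block in place of the original CreateData is sketched below. The two helpers are repeated in compact form so that the sample compiles on its own. The class names BoundedBlocks and PipelineDemo, the item count and the capacities are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBlocks
{
    // Compact copies of the two helpers, so that this sample is self-contained.
    public static async Task SendAllAsync<T>(
        this ITargetBlock<T> target, IEnumerable<T> data)
    {
        try
        {
            foreach (var item in data)
                if (!await target.SendAsync(item)) return;
        }
        catch (Exception e) { target.Fault(e); }
    }

    public static IPropagatorBlock<TInput, TOutput>
        CreateBoundedTransformManyBlock<TInput, TOutput>(
        Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
    {
        var input = new BufferBlock<TInput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
        var output = new BufferBlock<TOutput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

        Task.Run(async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                    await output.SendAllAsync(transform(await input.ReceiveAsync()));
                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

        return DataflowBlock.Encapsulate(input, output);
    }
}

public static class PipelineDemo
{
    // Runs producer -> slow consumer and returns how many items arrived.
    public static async Task<int> RunAsync(int itemCount)
    {
        int received = 0;

        var createData = BoundedBlocks.CreateBoundedTransformManyBlock<int, string>(
            _ => Enumerable.Range(0, itemCount).Select(i => "Hello, World " + i),
            boundedCapacity: 10);

        var slowConsumer = new ActionBlock<string>(async item =>
        {
            await Task.Delay(1); // simulate slow work
            received++;          // safe: the block handles one item at a time by default
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        createData.LinkTo(slowConsumer,
            new DataflowLinkOptions { PropagateCompletion = true });

        await createData.SendAsync(0);
        createData.Complete();
        await slowConsumer.Completion;
        return received;
    }

    public static async Task Main()
    {
        Console.WriteLine("Items received: " + await RunAsync(100));
    }
}
```

Because the encapsulated block exposes the input buffer as its target and the output buffer as its source, Complete() and LinkTo() with PropagateCompletion behave as they would on a single block.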
