TPL Dataflow ProducerConsumer Pattern

Question

I just wrote a sample producer-consumer pattern using TPL Dataflow. I have some basic questions.

  1. The consumer is active only after all the items are posted from the producer. Does asynchronous mean that both the produce and consume tasks can run in parallel?

I gave the consumer a sleep time to verify whether it blocks other data items. It seems to be executing sequentially and not getting any parallelism.

Am I doing something wrong here?

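using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class AscDataBlocks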
{
    public Int64 start;
    public Int64 End;
    //public string ThreadName;
    // Producer: posts the values start..End-1 to the target block.
    // This is using the TPL Dataflow producer consumer pattern.
    public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform)
    {
        for (var i = start; i < End; i++)
        {
            Console.WriteLine("Postingasc : {0}", i);
            targetAscTransform.Post(i);
        }
    }

    public void ProcessDataBuffer(Int64 ascDataSet)
    {
        // Testing if this will delay all the other data processing
        if (ascDataSet == 5)
            Thread.Sleep(5000);
        else
            Thread.Sleep(500);
        Console.WriteLine(ascDataSet);
    }

    // Demonstrates the consumption end of the producer and consumer pattern. 
    public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
    {
        // Set to 1 once at least one item has been processed.
        int status = 0;

        // Read from the source buffer until the source buffer has no
        // available output data (it is completed and empty).
        while (await source.OutputAvailableAsync())
        {
            Int64 data;
            if (source.TryReceive(out data))
            {
                ProcessDataBuffer(data); // This function processes the data buffer for ascollection and writes the data to the database.
                // Record that at least one item was received.
                status = 1;
            }
        }
        return status;
    }
}

class Program
{
    static void Main(string[] args)
    {
        AscDataBlocks ascb;
        BufferBlock<Int64> ascbuffer;
        System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
        CancellationTokenSource cts = new CancellationTokenSource();
        ascb = new AscDataBlocks();
        ascb.start = 1;
        ascb.End = 100;
        try
        {
            ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 5,
                CancellationToken = cts.Token
            }); // Initialize the buffer block.
            ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); // Initialize the consumer.
            // Post source data to the dataflow block.
            ascb.AscBufferProducer(ascbuffer);
            // Signal that no more items will be posted; otherwise
            // OutputAvailableAsync() never returns false and Wait() blocks forever.
            ascbuffer.Complete();
            ascProcessingconsumer.Wait();
        }
        catch (AggregateException ex)
        {
            // Wait() wraps any failure in an AggregateException.
            foreach (var v in ex.InnerExceptions)
                Console.WriteLine("msg: " + v.Message);
        }
    }
}

Answer

The consumer is active only after all the items are posted from the producer. Does asynchronous mean that both the produce and consume tasks can run in parallel?

This happens because you post all your items very quickly, before the consumer has a chance to start. If you added a Thread.Sleep(100) after each Post() in the producer loop, you would see that they actually do work in parallel.
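
To see this effect, here is a minimal sketch, assuming a hypothetical InterleavingDemo class and a 100 ms pause after each Post(). It keeps the question's BufferBlock and the OutputAvailableAsync()/TryReceive() consumer loop, and records the order in which items are posted and taken.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Sketch: the question's BufferBlock consumer loop with a slowed-down producer.
static class InterleavingDemo
{
    // Returns "post N" / "take N" events in the order they happened.
    public static List<string> Run(int itemCount, int producerDelayMs)
    {
        var log = new ConcurrentQueue<string>(); // written by both threads
        var buffer = new BufferBlock<long>();

        // Consumer: same shape as AscTransConsumerAsync in the question.
        Task consumer = Task.Run(async () =>
        {
            while (await buffer.OutputAvailableAsync())
            {
                if (buffer.TryReceive(out long item))
                    log.Enqueue("take " + item);
            }
        });

        // Producer: post each item, then pause so the consumer can run.
        for (long i = 1; i <= itemCount; i++)
        {
            log.Enqueue("post " + i);
            buffer.Post(i);
            Thread.Sleep(producerDelayMs);
        }

        buffer.Complete(); // lets OutputAvailableAsync() return false
        consumer.Wait();
        return new List<string>(log);
    }
}
```

With Run(5, 100), "take 1" should show up in the log well before "post 5", meaning the consumer is already working while the producer is still posting.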

I gave the consumer a sleep time to verify whether it blocks other data items. It seems to be executing sequentially and not getting any parallelism.

TPL Dataflow is not magic: it won't modify your code to execute in parallel. You call AscTransConsumerAsync() only once, and its loop processes one item at a time, so don't be surprised that the items are processed sequentially.
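
One way to check this reasoning (an alternative to the ActionBlock approach, not what this answer recommends) is to start several copies of the same kind of loop on one BufferBlock. A sketch, assuming a hypothetical MultiConsumerDemo class and using await Task.Delay(20) as simulated work in place of the question's Thread.Sleep:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Sketch: several hand-written consumer loops reading one BufferBlock.
static class MultiConsumerDemo
{
    // Tracks how many items are being processed at the same moment.
    sealed class Counters
    {
        readonly object gate = new object();
        int current;
        public int Max { get; private set; }

        public void Enter() { lock (gate) { current++; if (current > Max) Max = current; } }
        public void Exit() { lock (gate) { current--; } }
    }

    // Same shape as the question's AscTransConsumerAsync loop.
    static async Task<long> ConsumeAsync(IReceivableSourceBlock<long> source, Counters counters)
    {
        long processed = 0;
        while (await source.OutputAvailableAsync())
        {
            // Another loop may have taken the item first, so check the result.
            if (!source.TryReceive(out long item))
                continue;

            counters.Enter();
            await Task.Delay(20); // simulated work (assumption)
            counters.Exit();
            processed++;
        }
        return processed;
    }

    // Posts itemCount items and runs consumerCount consumer loops over them.
    public static (long total, int maxConcurrent) Run(int itemCount, int consumerCount)
    {
        var buffer = new BufferBlock<long>();
        var counters = new Counters();

        Task<long>[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => ConsumeAsync(buffer, counters))
            .ToArray();

        for (long i = 1; i <= itemCount; i++)
            buffer.Post(i);
        buffer.Complete();

        long[] perConsumer = Task.WhenAll(consumers).GetAwaiter().GetResult();
        return (perConsumer.Sum(), counters.Max);
    }
}
```

With several loops competing, TryReceive() can return false even after OutputAvailableAsync() returned true, which is why the sketch checks its result instead of ignoring it.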

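TPL Dataflow does support processing in parallel, but you need to actually let it execute the processing code. To do this, use one of the execution blocks. In your case, ActionBlock seems appropriate. (The MaxDegreeOfParallelism = 5 in your code has no effect, because a BufferBlock only stores items and never runs your code.)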

If you use that, you can then configure the block to execute in parallel by setting MaxDegreeOfParallelism in its ExecutionDataflowBlockOptions (the default is 1, which processes one item at a time). Of course, doing that means you need to ensure that the processing delegate is thread-safe.
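
A small sketch of what MaxDegreeOfParallelism changes, assuming a hypothetical ParallelismDemo class and an async delegate with await Task.Delay(10) as simulated work. It counts how many delegate invocations overlap, and guards that shared count with a lock, since the delegate may run concurrently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Sketch: how MaxDegreeOfParallelism affects an ActionBlock's delegate.
static class ParallelismDemo
{
    // Posts itemCount items and returns the largest number of delegate
    // invocations that were running at the same time.
    public static int MaxConcurrency(int itemCount, int maxDegreeOfParallelism)
    {
        var gate = new object();
        int current = 0, max = 0;

        var block = new ActionBlock<long>(
            async item =>
            {
                // Shared state is only touched under the lock (thread-safe delegate).
                lock (gate) { current++; if (current > max) max = current; }
                await Task.Delay(10); // simulated work (assumption)
                lock (gate) { current--; }
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        for (long i = 1; i <= itemCount; i++)
            block.Post(i);
        block.Complete();
        block.Completion.Wait();
        return max;
    }
}
```

MaxConcurrency(20, 1) returns 1, because the block waits for each delegate's task before starting the next item; MaxConcurrency(20, 4) typically returns a value above 1 and never more than 4.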

With that, AscTransConsumerAsync() might now look something like:

public async Task<Int64> AscTransConsumerAsync(ISourceBlock<Int64> source)
{
    // counter to track the number of items that are processed
    Int64 count = 0;

    var actionBlock = new ActionBlock<Int64>(
        data =>
        {
            ProcessDataBuffer(data);
            // count has to be accessed in a thread-safe manner
            // be careful about using Interlocked,
            // for more complicated computations, locking might be more appropriate
            Interlocked.Increment(ref count);
        },
        // some small constant might be better than Unbounded, depending on circumstances
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // this assumes source will be completed when done,
    // you need to call ascbuffer.Complete() after AscBufferProducer() for this
    await actionBlock.Completion;

    return count;
}
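
The comment above notes that the consumer only finishes once the source is completed. A minimal end-to-end sketch, assuming a hypothetical PipelineDemo class and a short Thread.Sleep(10) as a stand-in for the real ProcessDataBuffer, wires the same ActionBlock consumer to the question's producer loop and calls Complete():

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Sketch: BufferBlock -> ActionBlock pipeline with completion propagation.
static class PipelineDemo
{
    // Stand-in for the question's ProcessDataBuffer (assumption: just sleeps).
    static void ProcessDataBuffer(long item) => Thread.Sleep(10);

    static async Task<long> AscTransConsumerAsync(ISourceBlock<long> source)
    {
        long count = 0;
        var actionBlock = new ActionBlock<long>(
            data =>
            {
                ProcessDataBuffer(data);
                Interlocked.Increment(ref count); // delegate runs concurrently
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
        await actionBlock.Completion;
        return count;
    }

    public static long Run(long start, long end)
    {
        var buffer = new BufferBlock<long>();
        Task<long> consumer = AscTransConsumerAsync(buffer);

        for (long i = start; i < end; i++) // same bounds as AscBufferProducer
            buffer.Post(i);
        buffer.Complete(); // propagates to actionBlock through the link

        return consumer.GetAwaiter().GetResult();
    }
}
```

Run(1, 100) mirrors start = 1 and End = 100 from the question, so it returns 99.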
