TPL DataFlow - Batching on duration or threshold

Question

I have implemented a producer/consumer pattern using TPL Dataflow. The code reads messages from a Kafka bus. For efficiency, we need to write the messages to the database in batches.

Is there a way in TPL Dataflow to hold on to messages and fire whenever a size or duration threshold is hit?

For example, the current implementation posts each message as soon as it is pulled from the queue:

    postedSuccessfully = targetBuffer.Post(msg.Value);

Recommended answer

Buffering by count and duration is already available through System.Reactive, specifically the Buffer operator. Buffer collects incoming events until either the desired count is reached or its timespan expires.
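To see the operator on its own, here is a minimal sketch that uses only System.Reactive, with no Dataflow blocks involved. The class and method names (BufferDemo, Batches) are made up for illustration. It feeds a finite range of integers through Buffer(timeSpan, count) and collects the batches that come out.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferDemo
{
    // Hypothetical helper: returns the batches Buffer produces for a finite sequence.
    public static IList<IList<int>> Batches(int itemCount, int batchSize, TimeSpan window)
    {
        return Observable.Range(0, itemCount)
                         .Buffer(window, batchSize) // close a batch on count or timeout
                         .ToList()                  // gather every emitted batch
                         .Wait();                   // block until the sequence completes
    }

    public static void Main()
    {
        // 7 items, batches of at most 5, 1 second window.
        foreach (var batch in Batches(7, 5, TimeSpan.FromSeconds(1)))
        {
            Console.WriteLine(string.Join(",", batch));
        }
    }
}
```

Because the range completes well within the one second window, the first batch closes on the count (five items) and the remaining two items are flushed when the sequence completes.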

Dataflow blocks are designed to work with System.Reactive. Blocks can be converted to Observables and Observers by using the DataflowBlock.AsObservable() and AsObserver() extension methods.
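As a rough illustration of the two extension methods, the sketch below relays items from one block to another through the IObservable/IObserver interfaces. The names BridgeDemo and RelayAsync are hypothetical. No Rx operators are used here, so only the Dataflow library is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BridgeDemo
{
    // Hypothetical helper: pushes itemCount integers through an
    // observable/observer bridge and returns what the sink received.
    public static async Task<List<int>> RelayAsync(int itemCount)
    {
        var source = new BufferBlock<int>();
        var received = new List<int>();
        // ActionBlock processes one item at a time by default, so List.Add is safe here.
        var sink = new ActionBlock<int>(item => received.Add(item));

        // AsObservable exposes the source block as IObservable<int>;
        // AsObserver wraps the target block as IObserver<int>.
        source.AsObservable().Subscribe(sink.AsObserver());

        for (int i = 0; i < itemCount; i++)
        {
            source.Post(i);
        }
        source.Complete();      // completion reaches the sink through the observer
        await sink.Completion;
        return received;
    }

    public static async Task Main()
    {
        var items = await RelayAsync(3);
        Console.WriteLine(string.Join(",", items));
    }
}
```

Any Rx operator can be placed between AsObservable() and Subscribe(), which is the hook the buffering block relies on.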

This makes building a buffering block very easy:

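using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TIn, IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan, int count)
{
    var inBlock = new BufferBlock<TIn>();          // receives individual items
    var outBlock = new BufferBlock<IList<TIn>>();  // emits batches

    var outObserver = outBlock.AsObserver();
    inBlock.AsObservable()
           .Buffer(timeSpan, count)                // close a batch on count or timeout
           .ObserveOn(TaskPoolScheduler.Default)   // deliver batches on a Task pool thread
           .Subscribe(outObserver);

    return DataflowBlock.Encapsulate(inBlock, outBlock);
}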

This method uses two buffer blocks to buffer inputs and outputs. Buffer() reads from the input block (the observable) and writes to the output block (the observer) when either the batch is full or the timespan expires.

By default, Rx delivers notifications on whichever thread produced them. Calling ObserveOn(TaskPoolScheduler.Default) tells it to deliver the batches on a Task pool thread instead.
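One thing to be aware of: Buffer with a timespan also emits an empty list when a window elapses without any items, so an idle pipeline would keep handing empty batches to the database step. A possible variant, sketched here under the hypothetical name CreateNonEmptyBuffer, filters those out with Where. This is an addition for illustration, not part of the original method.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

public static class BufferBlocks
{
    // Hypothetical variant of CreateBuffer that skips empty batches.
    public static IPropagatorBlock<TIn, IList<TIn>> CreateNonEmptyBuffer<TIn>(TimeSpan timeSpan, int count)
    {
        var inBlock = new BufferBlock<TIn>();
        var outBlock = new BufferBlock<IList<TIn>>();

        inBlock.AsObservable()
               .Buffer(timeSpan, count)
               .Where(batch => batch.Count > 0)      // drop windows that elapsed with no items
               .ObserveOn(TaskPoolScheduler.Default)
               .Subscribe(outBlock.AsObserver());

        return DataflowBlock.Encapsulate(inBlock, outBlock);
    }
}
```

It is used exactly like the original, for example BufferBlocks.CreateNonEmptyBuffer<string>(TimeSpan.FromSeconds(1), 5).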

Example

This code creates a buffer block that emits a batch after 5 items or 1 second, whichever comes first. It starts by posting 7 items, waits 1.1 seconds, then posts another 7 items. Each batch is written to the console together with the thread ID:

static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
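    // Wait for the last block in the pipeline; bufferBlock can complete
    // before printBlock has finished printing the final batch.
    await printBlock.Completion;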
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

The output is:

Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
