BrokeredMessage Automatically Disposed after calling OnMessage()


Question


I am trying to queue up items from an Azure Service Bus so I can process them in bulk. I am aware that the Azure Service Bus has a ReceiveBatch() but it seems problematic for the following reasons:

  • I can only get a max of 256 messages at a time and even this then can be random based on message size.
  • Even if I peek to see how many messages are waiting I don't know how many RequestBatch calls to make because I don't know how many messages each call will give me back. Since messages will keep coming in I can't just continue to make requests until it's empty since it will never be empty.

I decided to just use the message listener which is cheaper than doing wasted peeks and will give me more control.

Basically I am trying to let a set number of messages build up and then process them at once. I use a timer to force a delay but I need to be able to queue my items as they come in.

Based on my timer requirement it seemed like the blocking collection was not a good option so I am trying to use ConcurrentBag.

var batchingQueue = new ConcurrentBag<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

while (true)
{
    var sw = WaitableStopwatch.StartNew();
    BrokeredMessage msg;
    while (batchingQueue.TryTake(out msg)) // <== Object is already disposed
    {
    // ...do this until I have a thousand ready to be written to DB in batch
        Console.WriteLine("Completing message");
        msg.Complete(); // <== ERRORS HERE
    }

    sw.Wait(MINIMUM_DELAY);
}

However as soon as I access the message outside of the OnMessage pipeline it shows the BrokeredMessage as already being disposed.

I am thinking this must be some automatic behavior of OnMessage and I don't see any way to do anything with the message other than process it right away which I don't want to do.

Solution

This is incredibly easy to do with BlockingCollection.

var batchingQueue = new BlockingCollection<BrokeredMessage>();

myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

And your consumer thread:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    Console.WriteLine("Completing message");
    msg.Complete();
}

GetConsumingEnumerable returns an iterator that consumes items in the queue until the IsCompleted property is set and the queue is empty. If the queue is empty but IsCompleted is False, it does a non-busy wait for the next item.

To cancel the consumer thread (i.e. shut down the program), you stop adding things to the queue and have the main thread call batchingQueue.CompleteAdding. The consumer will empty the queue, see that the IsCompleted property is True, and exit.

Using BlockingCollection here is better than ConcurrentBag or ConcurrentQueue, because the BlockingCollection interface is easier to work with. In particular, the use of GetConsumingEnumerable relieves you from having to worry about checking the count or doing busy waits (polling loops). It just works.
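The producer/consumer handoff and the `CompleteAdding` shutdown described above can be sketched with a plain `BlockingCollection<int>` (no Service Bus involved); the consumer's `foreach` ends on its own once the producer signals completion and the queue drains:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BlockingCollectionDemo
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();

        // Producer: add a few items, then signal that no more are coming.
        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= 5; i++)
                queue.Add(i);
            queue.CompleteAdding(); // consumer's enumerable ends once drained
        });

        // Consumer: GetConsumingEnumerable blocks (without spinning) while
        // the queue is empty and exits when IsCompleted becomes true.
        int sum = 0;
        foreach (var item in queue.GetConsumingEnumerable())
            sum += item;

        producer.Wait();
        Console.WriteLine(sum); // 15
    }
}
```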

Also note that ConcurrentBag has some rather strange removal behavior. In particular, the order in which items are removed differs depending on which thread removes the item. The thread that created the bag removes items in a different order than other threads. See Using the ConcurrentBag Collection for the details.
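You can observe the bag's thread-dependent ordering with a quick experiment. The exact order is an implementation detail (the bag keeps per-thread local lists), so the code below deliberately makes no claim about which item comes out first:

```csharp
using System;
using System.Collections.Concurrent;

class BagOrderDemo
{
    static void Main()
    {
        var bag = new ConcurrentBag<int>();
        for (int i = 1; i <= 3; i++)
            bag.Add(i);

        // On the thread that added the items, TryTake typically removes
        // them in the reverse of insertion order; a different thread would
        // see a different order. Do not rely on either.
        while (bag.TryTake(out int item))
            Console.Write(item + " ");
        Console.WriteLine();
    }
}
```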

You haven't said why you want to batch items on input. Unless there's an overriding performance reason to do so, it doesn't seem like a particularly good idea to complicate your code with that batching logic.


If you want to do batch writes to the database, then I would suggest using a simple List<T> to buffer the items. If you have to process the items before they're written to the database, use the technique shown above to process them. Then, rather than writing directly to the database, add each item to a list. When the list reaches 1,000 items, or a given amount of time elapses, allocate a new list and start a task to write the old list to the database. Like this:

// at class scope

// Flush every 5 minutes.
private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5);
private const int MaxBufferItems = 1000;

// Create a timer for the buffer flush. A field initializer can't reference
// instance members (TimedFlush, FlushDelay), so declare the field here and
// assign it in the constructor:
private System.Threading.Timer _flushTimer;

// In the constructor:
// _flushTimer = new System.Threading.Timer(
//     TimedFlush, null, (long)FlushDelay.TotalMilliseconds, Timeout.Infinite);

// A lock for the list. Unless you're getting hundreds of thousands
// of items per second, this will not be a performance problem.
object _listLock = new Object();

List<BrokeredMessage> _recordBuffer = new List<BrokeredMessage>();

Then, in your consumer:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    // process the message
    Console.WriteLine("Completing message");
    msg.Complete();
    lock (_listLock)
    {
        _recordBuffer.Add(msg);
        if (_recordBuffer.Count >= MaxBufferItems)
        {
            // Stop the timer
            _flushTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // Save the old list and allocate a new one
            var myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();

            // Start a task to write to the database
            Task.Factory.StartNew(() => FlushBuffer(myList));

            // Restart the timer
            _flushTimer.Change((long)FlushDelay.TotalMilliseconds, Timeout.Infinite);
        }
    }
}

private void TimedFlush(object state)
{
    bool lockTaken = false;
    List<BrokeredMessage> myList = null;

    try
    {
        Monitor.TryEnter(_listLock, 0, ref lockTaken);
        if (lockTaken)
        {
            // Save the old list and allocate a new one
            myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(_listLock);
        }
    }

    if (myList != null)
    {
        FlushBuffer(myList);
    }

    // Restart the timer
    _flushTimer.Change((long)FlushDelay.TotalMilliseconds, Timeout.Infinite);
}

The idea here is that you get the old list out of the way, allocate a new list so that processing can continue, and then write the old list's items to the database. The lock is there to prevent the timer and the record counter from stepping on each other. Without the lock, things would likely appear to work fine for a while, and then you'd get weird crashes at unpredictable times.

I like this design because it eliminates polling by the consumer. The only thing I don't like is that the consumer has to be aware of the timer (i.e. it has to stop and then restart the timer). With a little more thought, I could eliminate that requirement. But it works well the way it's written.
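One way to remove that coupling, sketched here as a hypothetical alternative rather than the answer's code: let the timer fire periodically and never stop it, and make the flush itself idempotent. Both the size check and the timer call the same flush routine, and flushing an empty buffer is a no-op, so the two triggers can't step on each other and the consumer never touches the timer:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical buffer whose consumer never manages the timer. The periodic
// timer and the size trigger both call FlushIfNeeded; an empty buffer makes
// the call a no-op, so a timer tick racing a size-triggered flush is harmless.
class TimedBuffer<T>
{
    private readonly object _lock = new object();
    private List<T> _buffer = new List<T>();
    private readonly int _maxItems;
    private readonly Action<List<T>> _flush;
    private readonly Timer _timer;

    public TimedBuffer(int maxItems, TimeSpan flushDelay, Action<List<T>> flush)
    {
        _maxItems = maxItems;
        _flush = flush;
        // Periodic timer; nothing ever needs to stop or restart it.
        _timer = new Timer(_ => FlushIfNeeded(), null, flushDelay, flushDelay);
    }

    public void Add(T item)
    {
        bool full;
        lock (_lock)
        {
            _buffer.Add(item);
            full = _buffer.Count >= _maxItems;
        }
        if (full)
            FlushIfNeeded();
    }

    private void FlushIfNeeded()
    {
        List<T> old = null;
        lock (_lock)
        {
            if (_buffer.Count > 0)
            {
                // Swap in a fresh list so producers keep going immediately.
                old = _buffer;
                _buffer = new List<T>();
            }
        }
        if (old != null)
            _flush(old); // write outside the lock
    }
}

class Demo
{
    static void Main()
    {
        int flushed = 0;
        var buf = new TimedBuffer<int>(3, TimeSpan.FromMinutes(5),
            list => Interlocked.Add(ref flushed, list.Count));
        for (int i = 0; i < 7; i++)
            buf.Add(i);
        // Two size-triggered flushes of 3; the seventh item waits for the timer.
        Console.WriteLine(flushed); // 6
    }
}
```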
