具有事件中心触发器的Azure函数写入重复消息 [英] Azure functions with Event Hub trigger writes duplicate messages

查看:19
本文介绍了具有事件中心触发器的Azure函数写入重复消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个具有事件中心触发器的Azure函数。此集线器从设备接收消息并将其存储在Blob中。最近,我注意到重复的消息存储在BLOB中。BLOB商店中的文件是按上次修改日期排序的,如果你看一下屏幕截图,你会发现情况并非如此。以前是否有人见过此问题?

我还有一个写入cosmos DB的Azure函数,对于blob中的重复消息,cosmos中没有对应的重复消息。

我还连接了也没有任何重复消息的时间序列洞察力。

我打开了事件中心捕获,并且那里也没有重复的消息。

这是屏幕截图。

第一列是事件中心入队时间的Unix时间戳。如果我没有与文件名关联的GUID,它将抛出异常。下面是在BLOB中存储数据的代码片断。

dynamic msg = JObject.Parse(myEventHubMessage);
string deviceId = msg.deviceId;
if (deviceId == "5Y.....")
{
           var filename = "_" + ((DateTimeOffset)enqueuedTimeUtc).ToUnixTimeSeconds() + "_" + Guid.NewGuid().ToString() + ".json";
        
           var containerName = "containerName/";
        
           var path = containerName + deviceId + "/" + filename;
        
           using (var writer = binder.Bind<TextWriter>(new BlobAttribute(path)))
           {
                writer.Write(myEventHubMessage);
           }
 }

这里的逻辑非常简单。如果事件到达事件中心,则会触发该函数并将数据存储在Azure Blob中。

推荐答案

一个重要提示是,事件集线器具有at-least-once delivery guarantee;强烈建议确保您的处理能够以适合您的应用程序方案的任何方式恢复到事件复制。

对于您在本例中看到的复制,Azure函数的绑定使用EventProcessorHost来读取事件并触发函数代码的执行。随着Azure函数自动向上和向下扩展,EventProcessorHost的实例将加入并离开负责处理配置的事件中心的使用者组。

当处理器启动时,它将尝试平衡要处理的工作与同一使用者组中活动的其他处理器。在处理器不能通过声明无主分区达到其公平的工作份额的情况下,它将尝试从其他处理器窃取分区的所有权。在此期间,新所有者将开始从最后记录的检查点读取数据。同时,旧所有者可能会将其上次读取的事件分派给处理程序进行处理;在尝试从Event Hubs服务读取下一组事件之前,它不会理解所有权已更改。当处理器关闭并放弃其分区所有权时,也会发生类似的模式。

结果是,当处理器启动或停止时,您将看到一些重复事件被处理,当处理器达到负载平衡的稳定状态时,这些事件会平息下来。该窗口持续时间应该较短,但根据处理器配置和正在使用的检查点策略的不同而有所不同。

这篇关于具有事件中心触发器的Azure函数写入重复消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆