Forcing EventProcessorHost to re-deliver failed Azure Event Hub EventData to the IEventProcessor.ProcessEventsAsync method


Problem description



The application uses .NET 4.6.1 and the Microsoft.Azure.ServiceBus.EventProcessorHost NuGet package v2.0.2, along with its dependency, the WindowsAzure.ServiceBus package v3.0.1, to process Azure Event Hub messages.

The application has an implementation of IEventProcessor. When an unhandled exception is thrown from the ProcessEventsAsync method, the EventProcessorHost never re-sends those messages to the running instance of IEventProcessor. (Anecdotally, it will re-send if the hosting application is stopped and restarted, or if the lease is lost and re-obtained.)

Is there a way to force the event message that resulted in an exception to be re-sent by EventProcessorHost to the IEventProcessor implementation?

One possible solution is presented in this comment on a nearly identical question: Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync

The comment suggests holding a copy of the last successfully processed event message and checkpointing explicitly using that message when an exception occurs in ProcessEventsAsync. However, after implementing and testing such a solution, the EventProcessorHost still does not re-send. The implementation is pretty simple:

private EventData _lastSuccessfulEvent;

public async Task ProcessEventsAsync(
    PartitionContext context,
    IEnumerable<EventData> messages)
{
    try
    {
        await ProcessEvents(context, messages);     // does actual processing, may throw exception
        _lastSuccessfulEvent = messages
            .OrderByDescending(ed => ed.SequenceNumber)
            .First();
    }
    catch(Exception ex)
    {
        await context.CheckpointAsync(_lastSuccessfulEvent);
    }
}

An analysis of things in action:

A partial log sample is available here: https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt

Solution

TL;DR: The only reliable way to replay a failed batch of events to IEventProcessor.ProcessEventsAsync is to shut down the EventProcessorHost (EPH) immediately, either by calling eph.UnregisterEventProcessorAsync() or by terminating the process, depending on the situation. This lets another EPH instance acquire the lease for the partition and start from the previous checkpoint.
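As a rough sketch of the shutdown route, assuming the WindowsAzure.ServiceBus types in Microsoft.ServiceBus.Messaging: the processor below does not call UnregisterEventProcessorAsync() from inside its own callback. It raises a signal that the hosting code awaits. FailFastProcessor, Fatal, HandleBatchAsync and EphHost are hypothetical names made up for this illustration, and sharing one static signal across all partitions is a simplification.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class FailFastProcessor : IEventProcessor
{
    // Hypothetical signal: completed once any partition hits an unrecoverable batch.
    public static readonly TaskCompletionSource<Exception> Fatal =
        new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);

    public Task OpenAsync(PartitionContext context) => Task.FromResult(0);

    public Task CloseAsync(PartitionContext context, CloseReason reason) => Task.FromResult(0);

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // Once a failure has been signalled, stop processing and stop checkpointing,
        // so the checkpoint stays at the last fully processed batch.
        if (Fatal.Task.IsCompleted)
        {
            return;
        }

        try
        {
            await HandleBatchAsync(messages);
            await context.CheckpointAsync();   // checkpoint only after success
        }
        catch (Exception ex)
        {
            Fatal.TrySetResult(ex);            // ask the host to shut EPH down
        }
    }

    // Placeholder for the real processing logic (assumption).
    private Task HandleBatchAsync(IEnumerable<EventData> messages) => Task.FromResult(0);
}

public static class EphHost
{
    public static async Task RunAsync(EventProcessorHost eph)
    {
        await eph.RegisterEventProcessorAsync<FailFastProcessor>();

        // Wait until a processor reports a failed batch, then release all leases.
        Exception failure = await FailFastProcessor.Fatal.Task;
        Console.Error.WriteLine("Stopping EPH after failed batch: " + failure);
        await eph.UnregisterEventProcessorAsync();
    }
}
```

After RunAsync returns, the application can exit or register again. Either way the next pump for the partition starts from the last stored checkpoint, so the failed batch is delivered again.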

Before explaining this, I want to call out that this is a great question and was indeed one of the toughest design choices we had to make for EPH. In my view, it was a trade-off between the usability and supportability of the EPH framework on one side and technical correctness on the other.

The ideal situation would have been: when the user code in IEventProcessorImpl.ProcessEventsAsync throws an exception, the EPH library does not catch it. It lets the exception crash the process, and the crash dump clearly shows the responsible call stack. I still believe this is the most technically correct solution.

Current situation: the contract between the IEventProcessorImpl.ProcessEventsAsync API and EPH is:

  1. As long as EventData can be received from the Event Hubs service, EPH keeps invoking the user callback (IEventProcessorImplementation.ProcessEventsAsync) with the received EventData. If the user callback throws, EPH notifies EventProcessorOptions.ExceptionReceived rather than re-delivering the batch.
  2. User code inside IEventProcessorImpl.ProcessEventsAsync should handle all errors and incorporate retries as necessary. EPH does not set any timeout on this callback, so users have full control over processing time.
  3. If a specific event is the cause of the trouble, mark the EventData with a special property (for example type=poison-event) and re-send it to the same Event Hub, including a pointer to the actual event by copying EventData.Offset and SequenceNumber into the new EventData.ApplicationProperties. Alternatively, forward it to a Service Bus queue or store it elsewhere. Basically, identify the poison event and defer its processing.
  4. If you have handled all possible cases and are still running into exceptions, catch them and shut down EPH, or fail fast the process with the exception. When EPH comes back up, it will start from where it left off.
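Steps 2 to 4 could look roughly like the sketch below. It is an illustration under assumptions, not part of the EPH library. PoisonHandling, TryWithRetryAsync, ToPoisonEvent, ProcessOneAsync and the property names original-offset and original-sequence-number are all invented here. The sketch also assumes the WindowsAzure.ServiceBus EventData type, where the application properties bag is exposed as EventData.Properties. The body bytes are passed in explicitly so the sketch makes no assumption about reading the body more than once.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public static class PoisonHandling
{
    // Step 2: retry the work a fixed number of times with a fixed delay.
    // Returns false when every attempt failed.
    public static async Task<bool> TryWithRetryAsync(Func<Task> work, int maxAttempts, TimeSpan delay)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await work();
                return true;
            }
            catch (Exception)
            {
                if (attempt == maxAttempts)
                {
                    return false;
                }
            }

            await Task.Delay(delay);
        }

        return false;
    }

    // Step 3: build a copy marked as a poison event that points back at the original.
    public static EventData ToPoisonEvent(EventData original, byte[] body)
    {
        var poison = new EventData(body);
        poison.Properties["type"] = "poison-event";
        poison.Properties["original-offset"] = original.Offset;
        poison.Properties["original-sequence-number"] = original.SequenceNumber;
        return poison;
    }

    // Steps 2 and 3 combined for one event: retry, then defer it as a poison event.
    // If the deferral itself throws, the exception propagates to the caller,
    // which is where step 4 (shut down EPH or fail fast) applies.
    public static async Task ProcessOneAsync(
        EventData evt, byte[] body, Func<byte[], Task> handler, EventHubClient deferTo)
    {
        bool succeeded = await TryWithRetryAsync(() => handler(body), 3, TimeSpan.FromSeconds(2));
        if (!succeeded)
        {
            await deferTo.SendAsync(ToPoisonEvent(evt, body));
        }
    }
}
```

The retry count and delay are arbitrary. A real implementation would likely retry only on errors known to be transient and send anything else straight to the deferral path.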

Why checkpointing 'the old event' does NOT work (read this to understand EPH in general):

Behind the scenes, EPH runs one pump per Event Hub consumer-group partition receiver. The pump's job is to start the receiver from a given checkpoint (if present), create a dedicated instance of the IEventProcessor implementation, receive from the designated Event Hub partition starting at the Offset specified in the checkpoint (or, if there is none, at EventProcessorOptions.InitialOffsetProvider), and eventually invoke IEventProcessorImpl.ProcessEventsAsync. The purpose of the checkpoint is to make it possible to reliably resume processing when the EPH process shuts down and ownership of the partition moves to another EPH instance. So the checkpoint is consumed only while starting the pump and is NOT read again once the pump has started.
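A toy model may make this easier to see. The code below is not the EPH source. CheckpointStore, ToyPump and PumpDemo are invented for illustration, and the checkpoint is simplified to "the next offset to deliver". The only behaviour it models is that the pump reads the checkpoint once, when it starts.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for the checkpoint storage (assumption: one partition only).
public class CheckpointStore
{
    public long Offset;
}

public class ToyPump
{
    private long _cursor;

    public ToyPump(CheckpointStore store)
    {
        // The checkpoint is consumed here, once, when the pump starts.
        _cursor = store.Offset;
    }

    // Delivers the next offset. Never looks at the checkpoint again.
    public long ReceiveNext()
    {
        return _cursor++;
    }
}

public static class PumpDemo
{
    public static List<long> Run()
    {
        var store = new CheckpointStore { Offset = 0 };
        var delivered = new List<long>();

        var pump = new ToyPump(store);
        delivered.Add(pump.ReceiveNext());
        delivered.Add(pump.ReceiveNext());

        // "Checkpoint the old event": the running pump ignores this write.
        store.Offset = 1;
        delivered.Add(pump.ReceiveNext());

        // Only a restarted pump picks the stored checkpoint up.
        pump = new ToyPump(store);
        delivered.Add(pump.ReceiveNext());

        return delivered;
    }
}
```

In this model, PumpDemo.Run() yields the offsets 0, 1, 2, 1. Rewriting the checkpoint while the pump is running changes nothing. The older offset is delivered again only after a new pump is created, which in EPH terms means a restart or a lease change.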

As I am writing this, EPH is at version 2.2.10.

more general reading on Event Hubs...
