Data not being processed by Azure Event Hub Java client

Question

Following the EventProcessorHost example, we implemented our custom logic in onEvents(). Some data is not being processed, and I suspect this is because of the warnings logged by the Java client.

In the log we see StorageException (time-outs on blob storage when renewing leases or checkpointing), LeaseLostException (probably a consequence of the previous exception) and EventHubException (when the event hub moves or goes offline for a short period).

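Basically my question is: how do these exceptions affect the processing of events? And how can we make sure that no events are skipped (for example, through exception handling with retries and, as a last resort, a complete shutdown)?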

I read through the docs and searched other questions without finding a satisfactory answer (this and this one provide some insight).

Our code:

public class EventProcessor implements IEventProcessor {
    ...
    @Override
    public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception {
        for (EventData event : events) {
            try {
                String message = new String(event.getBytes(), StandardCharsets.UTF_8);

                mystuff.process(message);

                this.checkpointBatchingCount++;
                if ((checkpointBatchingCount % 50) == 0) {
                    // Persist progress up to this event; get() blocks until the write completes.
                    context.checkpoint(event).get();
                }
            } catch (Exception e) {
                // The failure is only logged, so the loop moves on to the next event.
                LOG.warn("Processing event failed: {}", e.getMessage());
            }
        }
    }
    ...
}

Recommended answer

From my understanding of the EventProcessor, you will be reprocessing events rather than missing them. There may be another underlying issue.

What happens when you call checkpoint is that it persists the position of that EventData in the stream (sequence number, offset, etc.), in effect saying "I've processed this."

When you get a StorageException, it means that the sequence number was not successfully persisted, so the sequence number of an older event lives on in your blob storage. If you encounter an EventHubException where the processor is disconnected, then when it restarts it will try to claim whatever leases have expired and resume processing from the last successful checkpoint.
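
The "reprocess rather than miss" behaviour described above can be illustrated with a small toy model. This is only a sketch and does not use the Azure SDK: the class CheckpointReplayDemo, the method consume and its parameters are hypothetical names made up for the illustration. It assumes events carry consecutive sequence numbers, that a checkpoint is written after every few events, and that a single interruption makes the consumer resume right after the last persisted checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of checkpoint-based consumption. Hypothetical names, no Azure SDK involved.
final class CheckpointReplayDemo {

    /**
     * Consumes sequence numbers 0..totalEvents-1 and checkpoints after every
     * checkpointEvery events. Consumption is interrupted once, just before
     * crashAt would be handled; the consumer then resumes after the last
     * persisted checkpoint. Returns the sequence numbers in processing order.
     */
    static List<Integer> consume(int totalEvents, int checkpointEvery, int crashAt) {
        List<Integer> processed = new ArrayList<>();
        int checkpoint = -1;      // last persisted sequence number, -1 means none yet
        boolean crashed = false;  // the interruption happens only once
        int next = 0;
        while (next < totalEvents) {
            if (!crashed && next == crashAt) {
                crashed = true;
                next = checkpoint + 1;  // restart: resume right after the last checkpoint
                continue;
            }
            processed.add(next);
            if ((next + 1) % checkpointEvery == 0) {
                checkpoint = next;      // progress is now durable up to this event
            }
            next++;
        }
        return processed;
    }

    public static void main(String[] args) {
        // Prints [0, 1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9]
        System.out.println(consume(10, 4, 6));
    }
}
```

In this run the checkpoint sits at event 3 when the interruption occurs, so events 4 and 5 are handled twice and nothing is skipped. The practical consequence is that the processing logic should tolerate duplicates.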

You'll get a LeaseLostException if another event processor "stole" the partition you were processing. This happens when there are multiple instances of the EventProcessor running and the client tries to balance the number of partitions between the running instances.
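
On the question's idea of retrying and only giving up as a last resort, here is a minimal sketch using nothing but the Java standard library. RetryThenFail, withRetry and the backoff values are hypothetical names and numbers chosen for the illustration, not part of the Event Hubs client. Note also that in the question's code a failed process call is only logged, so a later checkpoint would move past that event; letting the failure propagate after the retries are exhausted avoids checkpointing over it.

```java
import java.util.concurrent.Callable;

// Generic retry helper. Hypothetical names, not part of any Azure library.
final class RetryThenFail {

    /**
     * Runs the action up to maxAttempts times, sleeping backoffMillis * attempt
     * between tries. If every attempt fails, the last exception is rethrown so
     * the caller can decide what the last resort is.
     */
    static <T> T withRetry(Callable<T> action, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // do not retry when interrupted
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);  // simple linear backoff
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated time-out");
            }
            return "checkpointed";
        }, 5, 10);
        // Prints: checkpointed after 3 attempts
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In onEvents, both mystuff.process(message) and context.checkpoint(event).get() could be wrapped this way. What happens once the retries are exhausted (stopping the host, raising an alert) remains an application decision.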
