Apache Kafka with High-Level Consumer: Skip corrupted messages
Problem description
I'm facing an issue with the high-level Kafka consumer (0.8.2.0): after consuming some amount of data, one of our consumers stops. After a restart it consumes a few messages and stops again, with no error, exception, or warning.
After some investigation I found that the consumer was failing on this exception:
```
ERROR c.u.u.e.impl.kafka.KafkaConsumer - Error consuming message stream:
kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3801080313, computed crc = 2728178222)
```
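The exception reports a mismatch between the checksum stored with the message and the one recomputed when the message is read. The sketch below illustrates that kind of check with `java.util.zip.CRC32`; the `Message` class and `CrcCheck` object are hypothetical and do not reproduce Kafka's actual message format.

```scala
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

// Hypothetical toy message: a stored checksum plus the payload it should cover.
// This is NOT Kafka's wire format, only an illustration of the stored-vs-computed check.
final case class Message(storedCrc: Long, payload: Array[Byte])

object CrcCheck {
  // Compute an unsigned 32-bit CRC over all payload bytes.
  def crcOf(bytes: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(bytes, 0, bytes.length)
    crc.getValue
  }

  // Build a well-formed message whose stored CRC matches its payload.
  def create(text: String): Message = {
    val bytes = text.getBytes(StandardCharsets.UTF_8)
    Message(crcOf(bytes), bytes)
  }

  // Corrupt means: recomputing the CRC on read gives a different value than the stored one.
  def isCorrupt(m: Message): Boolean = crcOf(m.payload) != m.storedCrc
}
```

Flipping any single bit of the payload of a message built by `create` makes `isCorrupt` return `true`, since CRC32 detects all single-bit errors.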
Any ideas how I can simply skip such messages?
Recommended answer
So, answering my own question: after some debugging of the Kafka consumer, I found one possible solution:
- Create a subclass of `kafka.consumer.ConsumerIterator`.
- Override the `makeNext` method. In this method, catch `InvalidMessageException` and return some dummy placeholder.
- In your `while` loop, convert the `kafka.consumer.ConsumerIterator` to your implementation. Unfortunately, all fields of `kafka.consumer.ConsumerIterator` are private, so you have to use reflection.
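The core of the idea (catch the corruption exception where the next element is produced and hand back a placeholder) can be sketched without Kafka on the classpath. This is not the author's code: it wraps a plain Scala `Iterator` instead of subclassing `ConsumerIterator`, and `CorruptMessageException` and `SkippingIterator` are hypothetical names standing in for `InvalidMessageException` and the author's subclass.

```scala
// Hypothetical stand-in for kafka.message.InvalidMessageException.
final class CorruptMessageException(msg: String) extends RuntimeException(msg)

// Wraps an underlying iterator and returns `placeholder` whenever producing the
// next element throws CorruptMessageException, instead of propagating the error.
final class SkippingIterator[T](underlying: Iterator[T], placeholder: T) extends Iterator[T] {
  override def hasNext: Boolean = underlying.hasNext

  override def next(): T =
    try underlying.next()
    catch { case _: CorruptMessageException => placeholder }
}
```

Wrapping works in this sketch only because the underlying iterator has already advanced past the bad element when it throws. Kafka's iterator may not behave that way after an exception, which is presumably why the answer overrides `makeNext` itself rather than wrapping `next()`.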
Here is the code sample:
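```scala
// ks is presumably the KafkaStream; createKafkaSkippingIterator turns its
// ConsumerIterator into the makeNext-overriding subclass (via reflection).
val skipIt = createKafkaSkippingIterator(ks.iterator())
while (skipIt.hasNext()) {
  val messageAndTopic = skipIt.next()
  // Corrupt messages come back as the dummy placeholder; skip them.
  if (messageNotCorrupt(messageAndTopic)) {
    consumeFn(messageAndTopic)
  }
}
```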
The `messageNotCorrupt` method simply checks whether the argument is equal to the dummy message.
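A minimal sketch of that placeholder check, assuming a single shared dummy instance. `Record`, `CorruptFilter`, `DummyRecord`, and `consumeAll` are hypothetical names (the answer does not show its message type). As a design choice, the check uses reference identity (`eq`) rather than `==`, so a genuine message whose fields happen to equal the placeholder's is still consumed.

```scala
// Hypothetical message type standing in for the consumer's message-and-metadata object.
final case class Record(topic: String, value: String)

object CorruptFilter {
  // The one shared placeholder instance a skipping iterator would return for corrupt messages.
  val DummyRecord: Record = Record("", "")

  // True unless `r` is exactly the placeholder instance (reference identity, not field equality).
  def messageNotCorrupt(r: Record): Boolean = !(r eq DummyRecord)

  // Same shape as the answer's while loop: consume everything except placeholders.
  def consumeAll(it: Iterator[Record])(consumeFn: Record => Unit): Unit =
    while (it.hasNext) {
      val r = it.next()
      if (messageNotCorrupt(r)) consumeFn(r)
    }
}
```

With `==` instead of `eq`, the case-class placeholder would also match any real record whose topic and value are both empty strings.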