具有高级消费者的Apache Kafka:跳过损坏的消息 [英] Apache Kafka with High Level Consumer: Skip corrupted messages

查看:568
本文介绍了具有高级消费者的Apache Kafka:跳过损坏的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我遇到了一个高级卡夫卡消费者(0.8.2.0)的问题-在消费了一些数据之后,我们的一个消费者停止了.重新启动后,它将消耗一些消息,然后再次停止,而不会出现错误/异常或警告.

I'm facing an issue with high level kafka consumer (0.8.2.0) - after consuming some amount of data one of our consumers stops. After restart it consumes some messages and stops again with no error/exception or warning.

经过一番调查,我发现消费者的问题是此异常:

After some investigation I found that the problem with consumer was this exception:

ERROR c.u.u.e.impl.kafka.KafkaConsumer  - Error consuming message stream:
 kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3801080313, computed crc = 2728178222)

任何想法我怎么能简单地根本跳过这些消息?

Any ideas how can I simple skip such messages at all?

推荐答案

所以,回答我自己的问题.在对Kafka Consumer进行一些调试之后,我找到了一种可能的解决方案:

So, answering my own question. After some debugging of Kafka Consumer, I found one possible solution:

  1. 创建kafka.consumer.ConsumerIterator
  2. 的子类
  3. 覆盖makeNext-方法.在此方法中,捕获InvalidMessageException并返回一些虚拟占位符.
  4. 在您的while循环中,您必须将kafka.consumer.ConsumerIterator转换为您的实现.不幸的是,kafka.consumer.ConsumerIterator的所有字段都是私有的,因此您必须使用反射.
  1. Create a subclass of kafka.consumer.ConsumerIterator
  2. Override makeNext-method. In this method catch InvalidMessageException and return some dummy-placeholder.
  3. In your while-loop you have to convert the kafka.consumer.ConsumerIterator to your implementation. Unfortunately all fields of kafka.consumer.ConsumerIterator are private, so you have to use reflection.

这是代码示例:

val skipIt = createKafkaSkippingIterator(ks.iterator())

while(skipIt.hasNext()) {
  val messageAndTopic = skipIt.next()

  if (messageNotCorrupt(messageAndTopic)) {
    consumeFn(messageAndTopic)
  }
}

messageNotCorrupt方法仅检查参数是否等于虚拟消息.

The messageNotCorrupt-method simply checks if the argument is equal to the dummy-message.

这篇关于具有高级消费者的Apache Kafka:跳过损坏的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆