Kafka uncommitted message not getting consumed again


Problem description

I am processing Kafka messages and inserting them into a Kudu table using Spark Streaming with manual offset commits. Here is my code.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}

val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" // or "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topicsSet, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  //offsetRanges.foreach(println)

  // Build a single-message OffsetRange for every record as it is processed
  val msgOffsetsRdd = rdd.map { msg =>
    val msgOffset = OffsetRange(msg.topic(), msg.partition(), msg.offset(), msg.offset() + 1)
    println(msg)
    msgOffset
  }

  // Idea here was to collect only the offsets of processed messages and commit those
  val msgOffsets = msgOffsetsRdd.collect()
  stream.asInstanceOf[CanCommitOffsets].commitAsync(msgOffsets)
}

Let's take this example: while inserting data into Kudu I got an error, and I need to process those messages again. If I stop the job and start it again, I am able to get the uncommitted messages. Can't we get all the uncommitted messages within the streaming job itself?

Recommended answer

You already have the message, so why not put retry logic in place for the failure case? Kafka will give you the same messages again when you reconnect after your consumer crashes; I am not sure whether Kafka will redeliver the same messages while the connection is still open.
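For illustration, one way to structure that retry-then-commit idea with the direct stream from the question: wrap the batch write in a small retry loop and commit the offsets only after the write succeeds, so an unrecovered failure leaves the offsets uncommitted and the messages are re-read on the next start. This is only a sketch, not the asker's actual job; insertIntoKudu, maxRetries and retryDelayMs are hypothetical placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
import scala.util.{Failure, Success, Try}

// Hypothetical write function; replace with your actual Kudu insert logic.
def insertIntoKudu(rdd: RDD[ConsumerRecord[String, String]]): Unit = ???

val maxRetries = 3      // assumed values, tune for your job
val retryDelayMs = 5000L

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Retry the whole batch a few times before giving up.
  var attempt = 0
  var succeeded = false
  while (attempt < maxRetries && !succeeded) {
    Try(insertIntoKudu(rdd)) match {
      case Success(_) =>
        succeeded = true
      case Failure(e) =>
        attempt += 1
        println(s"Insert failed (attempt $attempt): ${e.getMessage}")
        Thread.sleep(retryDelayMs)
    }
  }

  // Commit offsets only when the batch was written successfully; otherwise the
  // next run with the same group.id re-reads from the last committed offsets.
  if (succeeded) {
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}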

You can have some retry logic in your code if the failure is due to the destination datastore being unavailable. Or, if the insert failed due to an incorrect message format, you can save those messages into a temporary cache, a datastore, or another Kafka topic, to retry later or to examine what is wrong with them.
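A minimal sketch of the "save failed messages to another Kafka topic" idea, assuming a dead-letter topic; the topic name my-topic-errors and the producer settings below are illustrative assumptions, not part of the original question:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Producer for the dead-letter topic; in a real job create it once per executor.
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)

// On a failed insert, forward the raw message to a retry/inspection topic
// ("my-topic-errors" is an assumed name) instead of blocking the batch.
def sendToDeadLetterTopic(key: String, value: String): Unit = {
  producer.send(new ProducerRecord[String, String]("my-topic-errors", key, value))
}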

