Kafka uncommitted message not getting consumed again


Problem Description

I am processing Kafka messages and inserting them into a Kudu table using Spark Streaming with manual offset commit. Here is my code:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}

val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" // or "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topicsSet, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Offset ranges for the whole batch (one per topic-partition).
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.foreach(println)

  // Build a single-message OffsetRange for every record that gets processed.
  val msgOffsetsRdd = rdd.map { msg =>
    val msgOffset = OffsetRange(msg.topic(), msg.partition(), msg.offset(), msg.offset() + 1)
    println(msg)
    msgOffset
  }

  // The idea was to collect only the offsets of messages that were actually processed and commit those.
  val msgOffsets = msgOffsetsRdd.collect()
  stream.asInstanceOf[CanCommitOffsets].commitAsync(msgOffsets)
}

Take this example: while inserting data into Kudu I got an error, so I need to process those messages again. If I stop the job and start it again, I am able to get the uncommitted messages. Can't we get all the uncommitted messages within the streaming job itself?

Recommended Answer

You already have the message, so why not put retry logic in place for the failure case? Kafka will give you the same message again when you reconnect after your consumer crashes; I am not sure Kafka will redeliver the same message while the connection is still open.
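As an illustration only, a retry wrapper around the write could look like the sketch below. The RetryUtil.withRetry helper and the insertIntoKudu call it wraps are hypothetical names, not part of the original job; the point is just to retry the same record a few times before giving up.

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetryUtil {
  // Retry a block up to maxAttempts times, sleeping delayMs between attempts.
  // The actual write (e.g. the Kudu insert) is supplied by the caller.
  @tailrec
  def withRetry[T](maxAttempts: Int, delayMs: Long)(block: => T): T =
    Try(block) match {
      case Success(result) => result
      case Failure(e) if maxAttempts > 1 =>
        println(s"Attempt failed (${e.getMessage}), retrying in $delayMs ms")
        Thread.sleep(delayMs)
        withRetry(maxAttempts - 1, delayMs)(block)
      case Failure(e) =>
        // Out of attempts: rethrow so the caller can route the message elsewhere.
        throw e
    }
}

A call site inside foreachRDD could then look like RetryUtil.withRetry(3, 1000) { insertIntoKudu(msg.value()) }, where insertIntoKudu stands in for whatever Kudu write the job actually performs.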

You can have retry logic in your code if the failure is due to the destination datastore being unavailable. Or, if the insert failed because of an incorrect message format, you can save those messages to a temporary cache, a datastore, or another Kafka topic, so you can retry them later or examine what is wrong with them.
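For the "another Kafka topic" route, a minimal sketch of forwarding failed records to a retry topic could look like the following. The DeadLetterProducer object, the sendToRetryTopic helper, and the retry topic name are all illustrative assumptions, not part of the original code.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object DeadLetterProducer {
  // One producer per executor JVM rather than one per record.
  private var producer: KafkaProducer[String, String] = _

  private def getProducer(brokers: String): KafkaProducer[String, String] = synchronized {
    if (producer == null) {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
      producer = new KafkaProducer[String, String](props)
    }
    producer
  }

  // Forward a message that could not be inserted into Kudu to a retry topic
  // so it can be examined or re-processed later.
  def sendToRetryTopic(brokers: String, retryTopic: String, key: String, value: String): Unit =
    getProducer(brokers).send(new ProducerRecord[String, String](retryTopic, key, value))
}

On an insert failure inside foreachRDD, the job could then call something like DeadLetterProducer.sendToRetryTopic(brokers, "kudu-retry", msg.key(), msg.value()); the "kudu-retry" topic name here is hypothetical.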

