KafkaStreams EXACTLY_ONCE guarantee - skipping kafka offsets


Question


I'm using Spark 2.2.0 with the spark-streaming-kafka 0.10 library to read from a topic that is filled by a Kafka Streams Scala application. The Kafka broker version is 0.11 and the Kafka Streams version is 0.11.0.2.


When I set the EXACTLY_ONCE guarantee in the Kafka Streams app:

 p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
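
For reference, a minimal sketch of the surrounding Streams configuration this setting lives in (the application id, broker address and serdes are placeholders of mine, not taken from the question):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app")        // placeholder
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")     // placeholder
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
// The setting from the question: each record is processed exactly once, at the
// cost of the broker writing transaction (commit/abort) markers into the output topic.
p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)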


I get this error in Spark:

java.lang.AssertionError: assertion failed: Got wrong record for spark-executor-<group.id> <topic> 0 even after seeking to offset 24
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.to(KafkaRDD.scala:189)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.toBuffer(KafkaRDD.scala:189)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.toArray(KafkaRDD.scala:189)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


If the EXACTLY_ONCE property is not set, it works just fine.


EDIT 1: The topic filled by the kafka-streams app (with exactly-once enabled) has a wrong ending offset. When I run kafka.tools.GetOffsetShell, it gives an ending offset of 18, but there are only 12 messages in the topic (retention is disabled). When the exactly-once guarantee is disabled, these offsets match. I tried to reset kafka-streams according to this, but the problem still remains.
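
A rough way to reproduce this mismatch programmatically (a sketch using a plain Kafka 0.11 consumer; broker address, group id, topic and partition are assumptions of mine): compare the log-end offset that GetOffsetShell reports with the number of records a consumer actually returns.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")    // placeholder
props.put("group.id", "offset-gap-check")           // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("my-topic", 0)           // placeholder topic/partition
consumer.assign(Collections.singletonList(tp))

// Log-end offset: the same number GetOffsetShell prints (18 in the question).
val endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp)

// Count the records a consumer can actually fetch (12 in the question).
consumer.seekToBeginning(Collections.singletonList(tp))
var count = 0L
var records = consumer.poll(1000)
while (!records.isEmpty) {
  count += records.count()
  records = consumer.poll(1000)
}
println(s"log-end offset = $endOffset, records returned = $count")
consumer.close()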


EDIT 2: When I run SimpleConsumerShell with the --print-offsets option, the output is the following:

next offset = 1
{"timestamp": 149583551238149, "data": {...}}
next offset = 2
{"timestamp": 149583551238149, "data": {...}}
next offset = 4
{"timestamp": 149583551238149, "data": {...}}
next offset = 5
{"timestamp": 149583551238149, "data": {...}}
next offset = 7
{"timestamp": 149583551238149, "data": {...}}
next offset = 8
{"timestamp": 149583551238149, "data": {...}}
...


Some offsets are apparently skipped when the exactly-once delivery guarantee is enabled.


Any thoughts? What can cause this? Thanks!

Answer


I found that offset gaps are expected behavior in Kafka (version >= 0.11); they are caused by commit/abort transaction markers.


More info about Kafka transactions and control messages here:


These transaction markers are not exposed to applications, but are used by consumers in read_committed mode to filter out messages from aborted transactions and to not return messages which are part of open transactions (i.e., those which are in the log but don’t have a transaction marker associated with them).
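
As an illustration of that behavior (not part of the original answer), a plain consumer configured with isolation.level=read_committed, as the quoted passage describes, only returns records from committed transactions, and the offsets it reports can legitimately skip the positions occupied by those markers. A sketch with assumed connection details, similar to the one under EDIT 1:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")    // placeholder
props.put("group.id", "read-committed-check")       // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// Filter out records from aborted transactions; transaction markers are never
// returned, which is why the consumed offsets can be non-contiguous.
props.put("isolation.level", "read_committed")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("my-topic", 0)           // placeholder topic/partition
consumer.assign(Collections.singletonList(tp))
consumer.seekToBeginning(Collections.singletonList(tp))

for (record <- consumer.poll(1000).asScala) {
  // Offsets can jump (e.g. 1, 2, 4, 5, 7, 8, ...); the gaps are control records.
  println(s"offset = ${record.offset()}, value = ${record.value()}")
}
consumer.close()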



Kafka transactions were introduced in Kafka 0.11, so I assume that the spark-streaming-kafka 0.10 library is not compatible with this message format, and a newer version of spark-streaming-kafka has not been implemented yet.

