Exception while accessing KafkaOffset from RDD
Question
I have a Spark consumer that streams from Kafka, and I am trying to manage offsets for exactly-once semantics.
However, accessing the offsets throws the following exception:
"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges"
The part of the code that does this is as follows:
var offsetRanges = Array[OffsetRange]()

dataStream
  .transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .foreachRDD(rdd => { })
Here dataStream is a direct stream (DStream[String]) created with the KafkaUtils API, something like:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)
Can somebody help me understand what I am doing wrong here? transform is also the first method in the chain of methods applied to the stream, as recommended in the official documentation.
Thanks.
Answer
Your problem is the .map(_._2), which produces a stream of plain MapPartitionsRDDs instead of the DirectKafkaInputDStream created by KafkaUtils.createDirectStream. Only the RDDs of the original direct stream implement HasOffsetRanges, so you need to apply the map after the transform:
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t))

kafkaStream
  .transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .map(_._2)
  .foreachRDD(rdd => { /* stuff */ })
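Once the ranges are captured this way, exactly-once recovery comes from persisting each range's untilOffset after the batch is processed, so a restart can resume from exactly there. As a minimal self-contained sketch (plain Scala: OffsetRange here is a simplified stand-in for Spark's class of the same name, and OffsetStore is a hypothetical in-memory stand-in for a durable store such as ZooKeeper or a database):

```scala
// Simplified stand-in for org.apache.spark.streaming.kafka.OffsetRange
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical offset store; in production this would write to durable storage
object OffsetStore {
  private var committed = Map.empty[(String, Int), Long]

  // After a batch succeeds, record the end of each range per (topic, partition)
  def commit(ranges: Seq[OffsetRange]): Unit =
    ranges.foreach { r => committed += ((r.topic, r.partition) -> r.untilOffset) }

  // On restart, look up where to resume for a given partition
  def lastCommitted(topic: String, partition: Int): Option[Long] =
    committed.get((topic, partition))
}
```

Inside foreachRDD you would call something like OffsetStore.commit(offsetRanges) only after the batch's output action has completed, which is what makes the semantics exactly-once rather than at-least-once.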