Exception while accessing KafkaOffset from RDD


Problem description

I have a Spark consumer which streams from Kafka. I am trying to manage offsets for exactly-once semantics.

However, while accessing the offsets it throws the following exception:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges"

The part of the code that does this is below:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .foreachRDD(rdd => { })

Here dataStream is a direct stream (DStream[String]) created using the KafkaUtils API, something like:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

Can somebody help me understand what I am doing wrong here? transform is also the first method in the chain of methods performed on the stream, as mentioned in the official documentation.

Thanks.

Answer

Your problem is that:

.map(_._2)

creates a MappedDStream instead of the DirectKafkaInputDStream created by KafkaUtils.createDirectStream, so the RDDs it produces can no longer be cast to HasOffsetRanges.
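The type mismatch can be illustrated without Spark at all. Below is a minimal sketch using hypothetical stand-in classes (not the real Spark types): only the source stream type mixes in the offset-carrying trait, and the wrapper returned by a map-like call does not, so the downcast fails at runtime exactly as in the reported exception.

```scala
// Hypothetical stand-ins (NOT Spark classes) illustrating the cast failure.
object CastDemo {
  trait HasOffsets { def offsetRanges: Array[(Int, Long)] }

  // Analogous to DirectKafkaInputDStream: carries offset information.
  class DirectStream extends HasOffsets {
    def offsetRanges: Array[(Int, Long)] = Array((0, 42L))
    def mapped: MappedStream = new MappedStream // analogous to .map(_._2)
  }

  // Analogous to the stream produced by map: does NOT extend HasOffsets.
  class MappedStream

  // Casting the source stream succeeds ...
  val ok: Array[(Int, Long)] =
    new DirectStream().asInstanceOf[HasOffsets].offsetRanges

  // ... but casting the mapped wrapper throws ClassCastException.
  val castFails: Boolean =
    try { new DirectStream().mapped.asInstanceOf[HasOffsets]; false }
    catch { case _: ClassCastException => true }
}
```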

You need to move the map after the transform:

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t))

kafkaStream
  .transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .map(_._2)
  .foreachRDD(rdd => { /* stuff */ })
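For exactly-once semantics, the offsetRanges captured in transform are typically persisted from inside foreachRDD once the batch's work has completed, so a restart can resume from the last committed position. A minimal sketch of that bookkeeping, using a stand-in OffsetRange case class (mirroring the topic/partition/fromOffset/untilOffset fields of Spark's class) and an in-memory map in place of ZooKeeper or a database:

```scala
// Stand-in for org.apache.spark.streaming.kafka.OffsetRange (no Spark needed).
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long)

object OffsetStore {
  // In-memory stand-in for a durable store keyed by (topic, partition).
  private val store = scala.collection.mutable.Map.empty[(String, Int), Long]

  // Record the until-offset of each range after the batch succeeds;
  // on restart, consumption would resume from these values.
  def commit(ranges: Array[OffsetRange]): Unit =
    ranges.foreach(r => store((r.topic, r.partition)) = r.untilOffset)

  def committed(topic: String, partition: Int): Option[Long] =
    store.get((topic, partition))
}
```

In a real job the commit call would sit at the end of the foreachRDD body, after the output operation, and write to a transactional or idempotent store rather than a map.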
