Why does Spark Streaming fail with ClassCastException with repartitioned dstream when accessing offsets?


Problem description


In my Spark application I create a DStream from a Kafka topic in the following way:

 KafkaUtils
  .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](
    streamingContext,
    kafkaParams,
    offset.get,
    { message: MessageAndMetadata[String, Array[Byte]] => (message.key(), message.message()) }
  )

and later I commit the offsets back to the Kafka topic, casting the RDD with asInstanceOf to get at the offset ranges:

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ... }

In this case everything is OK, but if I repartition the dstream, then when I try to commit the offsets I get the following exception:

java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
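(For reference, a hedged reconstruction of the failing path, not necessarily the exact code: DStream.repartition hands foreachRDD a transformed RDD rather than a KafkaRDD, so the cast blows up.)

    directKafkaStream
      .repartition(10) // breaks the 1:1 Kafka-partition-to-RDD-partition mapping
      .foreachRDD { rdd =>
        // rdd is a MapPartitionsRDD here, not a KafkaRDD, so this throws ClassCastException
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      }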

Can someone help me?

Solution

Why do you repartition at all?! I'd say it is not allowed, given that the number of partitions (in KafkaRDD) is exactly the number of so-called offset ranges (i.e. the topic partitions you read records from). You'd then "damage" what Spark has calculated to be best for parallelism and distribution.

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }

Moreover, only KafkaRDD mixes in HasOffsetRanges:

private[spark] class KafkaRDD[K, V](
  ...
  ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges

And the official documentation, in the Obtaining Offsets section, says as much:

Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createDirectStream, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().
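The quoted passage is from the Kafka 0.10 integration guide. A minimal sketch of the pattern it describes, assuming the kafka010 API with placeholder topic and broker names (the 0.8 API used in the question has HasOffsetRanges too, but no CanCommitOffsets, so there you would store the ranges yourself):

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    object CommitOffsetsSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("offsets-sketch"), Seconds(5))

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",            // placeholder broker
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "offsets-sketch",
          "enable.auto.commit" -> (false: java.lang.Boolean))

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

        stream.foreachRDD { rdd =>
          // The cast must be the first thing done on the RDD handed to foreachRDD,
          // before any shuffle or repartition.
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          // If processing really needs more parallelism, repartition a derived RDD;
          // the offsets were already captured above.
          rdd.repartition(10).foreachPartition(records => records.foreach(r => println(r.value())))

          // Commit through the input stream itself (only the stream implements
          // CanCommitOffsets), after the output has succeeded.
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }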

With RDD.repartition you simply create a CoalescedRDD (in the so-called RDD lineage):

...
  new CoalescedRDD(
    new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
    new HashPartitioner(numPartitions)),
    numPartitions,
    partitionCoalescer).values
} else {
  new CoalescedRDD(this, numPartitions, partitionCoalescer)
}

As the RDD does not have HasOffsetRanges mixed-in, you get the ClassCastException.
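A quick way to see this for yourself (a small sketch using RDD.toDebugString, safe to run inside foreachRDD): the lineage of the repartitioned RDD shows MapPartitionsRDD/CoalescedRDD/ShuffledRDD at the top, with the KafkaRDD, the only one carrying HasOffsetRanges, buried at the root.

    directKafkaStream.foreachRDD { rdd =>
      // Prints the RDD lineage; only the root KafkaRDD has HasOffsetRanges,
      // so casting the top-level repartitioned RDD fails.
      println(rdd.repartition(10).toDebugString)
    }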

If you want to increase the parallelism (and have more offset ranges in Spark's KafkaRDD), increase the number of partitions in the topic(s) and Spark Streaming will handle that nicely for you.
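For completeness, a hedged sketch of raising the partition count of an existing topic with Kafka's AdminClient (available since Kafka 1.0; the topic name and broker address below are placeholders, and the kafka-topics.sh tool with --alter achieves the same on older clusters):

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
    import scala.collection.JavaConverters._

    object IncreaseTopicPartitions {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
        val admin = AdminClient.create(props)
        try {
          // Partition counts can only grow; raise "my-topic" to 12 partitions.
          admin.createPartitions(Map("my-topic" -> NewPartitions.increaseTo(12)).asJava).all().get()
        } finally {
          admin.close()
        }
      }
    }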

Quoting Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) (highlighting mine):

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata.
