Spark Streaming Exception: java.util.NoSuchElementException: None.get

Problem description

I am writing Spark Streaming data to HDFS by converting it to a DataFrame:

Code

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkHdfs {

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF()
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")     
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The folder is created, but no file is written.

The program terminates with the following error:

    18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.lang.Thread.run(Thread.java:748)
    18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

In my pom I am using the respective dependencies:

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11

Recommended answer

The error is caused by trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is intended mainly for testing and its use is discouraged. The solution to your problem is therefore to make sure that the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create the SQLContext, which is fine. However, when creating the StreamingContext it is not used; the SparkConf is passed instead.

Looking at the documentation we see:

Create a StreamingContext by providing the configuration necessary for a new SparkContext

In other words, by passing a SparkConf as the parameter, a new SparkContext is created. There are now two separate contexts.
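
The two constructor overloads make the difference explicit. Below is a short sketch reusing the sparkConf and sc values from the question (the val names are only illustrative):

// Passing a SparkConf: Spark builds a brand-new SparkContext internally,
// which collides with the `sc` that was already created above.
val sscFromConf = new StreamingContext(sparkConf, Seconds(20))

// Passing the existing SparkContext: that context is reused and no second one is created.
val sscFromExisting = new StreamingContext(sc, Seconds(20))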

The easiest solution here is to keep using the same context as before. Change the line that creates the StreamingContext to:

val ssc = new StreamingContext(sc, Seconds(20))

Note: In newer versions of Spark (2.0+), use SparkSession instead. A new streaming context can then be created with StreamingContext(spark.sparkContext, ...). It can look as follows:

val spark = SparkSession.builder
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()

import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
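
For reference, here is a minimal sketch of what the whole program could look like after the fix, assuming Spark 2.x and the same broker, topic, group id and HDFS path as in the question (the column names "word" and "count" are only illustrative):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkHdfs {

  def main(args: Array[String]): Unit = {
    // One SparkSession owns the only SparkContext in the application.
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("SparkKafka")
      .getOrCreate()
    import spark.implicits._

    // Reuse the session's SparkContext instead of passing a SparkConf,
    // so no second context is ever created.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("fridaydata"), kafkaParams))

    val wordCount = stream.map(_.value)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    wordCount.foreachRDD { rdd =>
      // Uses the default data source (parquet), exactly as the original .save(...) call did.
      rdd.toDF("word", "count").write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}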
