为什么从UDF访问DataFrame会导致NullPointerException? [英] Why accesing DataFrame from UDF results in NullPointerException?

查看:73
本文介绍了为什么从UDF访问DataFrame会导致NullPointerException?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在执行Spark应用程序时遇到问题.

I have a problem executing a Spark application.

源代码:

// Read table From HDFS
val productInformation = spark.table("temp.temp_table1")
val dict = spark.table("temp.temp_table2")

// Custom UDF
val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) => 
    dict.filter(
        (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
    ).count
)

val result = productInformation.withColumn("positive_count", countPositiveSimilarity($"title", $"internal_category"))

// Error occurs!
result.show

错误消息:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 4 times, most recent failure: Lost task 0.3 in stage 54.0 (TID 5887, ip-10-211-220-33.ap-northeast-2.compute.internal, executor 150): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at $anonfun$1.apply(<console>:45)
    at $anonfun$1.apply(<console>:43)
    ... 16 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  ... 3 more
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:45)
  at $anonfun$1.apply(<console>:43)
  ... 16 more

我检查了productInformationdictColumns中是否具有空值.但是没有空值.

I have checked whether productInformation and dict have null value in Columns. But there are no null values.

有人可以帮助我吗? 我随附了示例代码,以使您了解更多详细信息:

Can anyone help me? I attached example code to let you know more details:

case class Target(wordListOne: Seq[String], WordListTwo: Seq[String])
val targetData = Seq(Target(Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")),
                     Target(Seq("Java", "Scala"), Seq("Scala", "Banana")),
                     Target(Seq(""), Seq("Grape", "Banana")),
                     Target(Seq(""), Seq("")))
val targets = spark.createDataset(targetData)

case class WordSimilarity(first: String, second: String, similarity: Double)
val similarityData = Seq(WordSimilarity("Spark", "Java", 0.8), 
                     WordSimilarity("Scala", "Spark", 0.9), 
                     WordSimilarity("Java", "Scala", 0.9),
                     WordSimilarity("Apple", "Grape", 0.66),
                     WordSimilarity("Scala", "Apple", -0.1),
                     WordSimilarity("Gine", "Spark", 0.1)) 
val dict = spark.createDataset(similarityData)

val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) => 
    dict.filter(
        (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
    ).count
)

val countDF = targets.withColumn("positive_count", countPositiveSimilarity($"wordListOne", $"wordListTwo"))

这是示例代码,与我的原始代码相似. 示例代码运行良好.我应该在哪一点检入原始代码和数据?

This is an example code and is similar to my original code. Example code operates well. Which point should I check in original code and data?

推荐答案

非常有趣的问题.我必须进行一些搜索,但这是我的.希望这会对您有所帮助.

Very interesting question. I have to do some search, and here is my though. Hope this will help you a little bit.

通过Dataset时.scala#L465"rel =" noreferrer> createDataset ,spark将为该数据集分配LocalRelation逻辑查询计划.

When you create Dataset via createDataset, spark will assign this dataset with LocalRelation logical query plan.

def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
    val enc = encoderFor[T]
    val attributes = enc.schema.toAttributes
    val encoded = data.map(d => enc.toRow(d).copy())
    val plan = new LocalRelation(attributes, encoded)
    Dataset[T](self, plan)
  }

遵循以下链接: LocalRelation is a leaf logical plan that allow functions like collect or take to be executed locally, i.e. without using Spark executors.

而且,很显然,您可以检查您的2个数据集是本地的.

Obviously, You can check out your 2 datasets is local.

并且

And, the show method actually call take internally.

private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {
    val numRows = _numRows.max(0)
    val takeResult = toDF().take(numRows + 1)
    val hasMoreData = takeResult.length > numRows
    val data = takeResult.take(numRows)

因此,基于这些证据,我认为调用countDF.show已执行,其行为与您从 driver dict数据集调用count时类似,调用次数为targets的记录数.而且,dict数据集对于在countDF作品中的演出当然不需要是本地的.

So, with those envidences, I think the call countDF.show is executed, it will behave simliar as when you call count on dict dataset from driver, number of call times is number of records of targets. And, the dict dataset of course doesn't need to be local for the show on countDF work.

您可以尝试保存countDF,它将为您提供与第一种情况相同的例外 org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)

You can try to save countDF, it will give you exception same as first case org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)

这篇关于为什么从UDF访问DataFrame会导致NullPointerException?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆