Caused by: java.lang.NullPointerException at org.apache.spark.sql.Dataset

Problem Description

Below is my code. I iterate over the DataFrame prodRows, and for each product_PK I find a matching sub-list of product_PKs from prodRows.

  val numRecProducts = 10
  var listOfProducts: Map[Long, Array[(Long, Int)]] = Map()
  prodRows.foreach { row: Row =>
      val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
      val gender = row.get(row.fieldIndex("gender_PK")).toString
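      // the NPE below is thrown from this filter call (ComputeNumSim.scala:74 in the stack trace)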
      val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
      var productList: Array[(Long, Int)] = Array()
      if (!selection.rdd.isEmpty()) {
        productList = selection.rdd.map(x => (x(0).toString.toLong,1)).collect()
      }
    listOfProducts = listOfProducts + (product_PK -> productList)
  }

But when I execute it, it gives me the following error. It looks like selection is empty in some iterations; however, I do not understand how I can handle this error:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
    at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
    at org.test.ComputeNumSim.run(ComputeNumSim.scala:69)
    at org.test.ComputeNumSimRunner$.main(ComputeNumSimRunner.scala:19)
    at org.test.ComputeNumSimRunner.main(ComputeNumSimRunner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
    at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2877)
    at org.apache.spark.sql.Dataset.filter(Dataset.scala:1304)
    at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:74)
    at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:69)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

What does it mean and how can I handle it?

Recommended Answer

You cannot access any of Spark's "driver-side" abstractions (RDDs, DataFrames, Datasets, SparkSession, ...) from within a function passed to one of Spark's DataFrame/RDD transformations or actions (foreach is one such action). You also cannot update driver-side mutable objects from within these functions.

In your case, you're trying to use prodRows and selection (both DataFrames) within a function passed to DataFrame.foreach. You're also trying to update listOfProducts (a local, driver-side variable) from within that same function.

Why?

  • DataFrames, RDDs, and SparkSession exist only in your driver application. They serve as "handles" for accessing the data distributed over the cluster of worker machines.
  • Functions passed to RDD/DataFrame transformations get serialized and sent to that cluster, to be executed on the data partitions on each of the worker machines. When the serialized DataFrames/RDDs are deserialized on those machines, they are useless: they cannot represent the data on the cluster, because they are just hollow copies of the objects created in the driver application, which is what actually maintains the connection to the cluster machines.
  • For the same reason, attempting to update driver-side variables will fail: the variables (starting out as empty, in most cases) are serialized, deserialized on each of the workers, updated locally on the workers, and left there; the original driver-side variable remains unchanged (see the sketch below).
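
A minimal sketch of that last point (df and counter are hypothetical names, not taken from the question):

var counter = 0L
df.foreach { _ =>
  counter += 1    // updates a deserialized copy of counter on an executor
}
println(counter)  // still 0 when running on a cluster: the driver-side counter was never changed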

How can you solve this? When working with Spark, especially with DataFrames, you should try to avoid "iterating" over the data and use DataFrame's declarative operations instead. In most cases, when you want to reference the data of another DataFrame for each record in your DataFrame, you'd use a join to create a new DataFrame whose records combine data from the two DataFrames.
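
As a generic pattern, a per-record lookup like the loop above becomes a single declarative join; for example (a and b are hypothetical DataFrames sharing a key column):

val combined = a.join(b, Seq("key"), "left")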

For this specific case, here's a roughly equivalent solution that does what you're trying to do, assuming I've understood it correctly. Try it and read the DataFrame documentation to work out the details:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val numRecProducts = 10

val result = prodRows.as("left")
  // self-join by gender:
  .join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
  // limit to 10 results per record:
  .withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
  .filter($"rn" <= numRecProducts).drop($"rn")
  // group and collect_list to create products column:
  .groupBy($"left.product_PK" as "product_PK")
  .agg(collect_list(struct($"right.product_PK", lit(1))) as "products")
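
If you still need the driver-side Map that the original code was building, you can collect the joined result afterwards. A hedged sketch, assuming product_PK is a LongType column and that result is small enough to fit in driver memory:

import org.apache.spark.sql.Row

val listOfProducts: Map[Long, Array[(Long, Int)]] =
  result.collect().map { row =>
    val productPK = row.getAs[Long]("product_PK")
    // each element of the "products" column is a struct of (product_PK, 1)
    val products = row.getAs[Seq[Row]]("products")
      .map(r => (r.getLong(0), r.getInt(1)))
      .toArray
    productPK -> products
  }.toMap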
