Getting NullPointerException using spark-csv with DataFrames


Problem Description


Running through the spark-csv README, there's sample Java code like this:

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(
    new StructField("year", IntegerType, true), 
    new StructField("make", StringType, true),
    new StructField("model", StringType, true),
    new StructField("comment", StringType, true),
    new StructField("blank", StringType, true));

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");


It didn't compile out of the box, so with some wrangling I got it to compile by changing the incorrect FooType syntax to DataTypes.FooType and passing the StructFields as a new StructField[]. The compiler also requested a fourth metadata argument in the StructField constructor, but I had trouble finding documentation on what it means (the javadocs describe its use cases, but not really how to decide what to pass in during StructField construction). With the following code, it now runs until any side-effect method like collect():

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);

// Read features.
System.out.println("Reading features from " + args[0]);
StructType featuresSchema = new StructType(new StructField[] {
    new StructField("case_id", DataTypes.StringType, false, null), 
    new StructField("foo", DataTypes.DoubleType, false, null)
});
DataFrame features = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(featuresSchema)
    .load(args[0]);
for (Row r : features.collect()) {
  System.out.println("Row: " + r);
}

I get the following exception:

Exception in thread "main" java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
  at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
  at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
  at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
  at scala.collection.SetLike$class.map(SetLike.scala:93)
  at scala.collection.AbstractSet.map(Set.scala:47)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
  at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...

Any idea what's wrong?

Answer


It seems the README is very outdated and needs some significant editing for the Java example. I tracked down the actual JIRA which added the metadata field; it points at the use of a default Map.empty value on the Scala side, and whoever wrote the documentation must have just translated the Scala directly to Java despite the lack of the same default value for that parameter.


In the 1.5 branch of the SparkSQL code we can see that it references metadata.hashCode() without a null check, which is what's causing the NullPointerException. The existence of the Metadata.empty() method, combined with the discussions about using empty maps as the default in Scala, seems to imply that the correct approach is to go ahead and pass Metadata.empty() if you don't care about it. The revised example should be:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
    new StructField("year", DataTypes.IntegerType, true, Metadata.empty()), 
    new StructField("make", DataTypes.StringType, true, Metadata.empty()),
    new StructField("model", DataTypes.StringType, true, Metadata.empty()),
    new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
    new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(customSchema)
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");

