Getting NullPointerException using spark-csv with DataFrames


Problem description

Running through the spark-csv README, there's sample Java code like this:

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(
    new StructField("year", IntegerType, true), 
    new StructField("make", StringType, true),
    new StructField("model", StringType, true),
    new StructField("comment", StringType, true),
    new StructField("blank", StringType, true));

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");

It didn't compile out of the box, so with some wrangling I got it to compile by changing the incorrect FooType syntax to DataTypes.FooType and passing the StructFields as a new StructField[]. The compiler then requested a fourth metadata argument in the StructField constructor, but I had trouble finding documentation on what it means (the javadocs describe its use cases, but not how to decide what to pass in during StructField construction). With the following code, it now runs until any side-effect method like collect() is called:

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);

// Read features.
System.out.println("Reading features from " + args[0]);
StructType featuresSchema = new StructType(new StructField[] {
    new StructField("case_id", DataTypes.StringType, false, null), 
    new StructField("foo", DataTypes.DoubleType, false, null)
});
DataFrame features = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(featuresSchema)
    .load(args[0]);
for (Row r : features.collect()) {
  System.out.println("Row: " + r);
}

I get the following exception:

Exception in thread "main" java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
  at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
  at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
  at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
  at scala.collection.SetLike$class.map(SetLike.scala:93)
  at scala.collection.AbstractSet.map(Set.scala:47)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
  at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...

Any idea what's wrong?

Answer

It seems the README is very outdated and needs some significant editing for the Java example. I tracked down the actual JIRA that added the metadata field; it points at the use of a default Map.empty value on the Scala side, and whoever wrote the documentation must have translated the Scala directly to Java, even though the input parameter has no such default value in Java.

In the 1.5 branch of SparkSQL's code we can see that it references metadata.hashCode() without checking for null, which is what's causing the NullPointerException. The existence of the Metadata.empty() method, combined with the discussions about using empty maps as the default in Scala, seems to imply the correct approach is to simply pass Metadata.empty() if you don't care about the metadata. The revised example should be:

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
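    // Java has no default arguments, so Metadata.empty() must be passed explicitly.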
    new StructField("year", DataTypes.IntegerType, true, Metadata.empty()), 
    new StructField("make", DataTypes.StringType, true, Metadata.empty()),
    new StructField("model", DataTypes.StringType, true, Metadata.empty()),
    new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
    new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(customSchema)
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");

