Reference is ambiguous with SparkSQL CSV


Problem description

I'm trying to read a bunch of CSV files in SparkSQL 2.10 with a custom schema that is partly Double and partly String, like this:

// Build the schema
val schemaStringS = "col1 col2"
val schemaStringD = "col3 col4 col5 col6"
val schemaStringS2 = "col7 col8"
val fieldsString = schemaStringS.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val fieldsString2 = schemaStringS2.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val fieldsDouble = schemaStringS.split(" ")
  .map(fieldName => StructField(fieldName, DoubleType, nullable = true))
val schema = StructType(fieldsString ++ fieldsDouble ++ fieldsString2)

// Read DataFrame
val input = sqlContext.read.schema(schema)
  .option("header", true)
  .csv("/files/*.csv")
  .toJavaRDD

This results in:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'col6' is ambiguous, could be: col6#0, col6#5.;
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:158)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:130)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:129)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:96)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:96)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:129)
    at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:83)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2547)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.toJavaRDD(Dataset.scala:2557)
    at com.otterinasuit.spark.sensorlog.main.Main$.main(Main.scala:39)
    at com.otterinasuit.spark.sensorlog.main.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

I tried merging the files with cat (only feasible for a PoC) and avoiding the CSV library (thinking this might be a bug in the new Spark version), but to no avail.

val input = sc.textFile("/csv/*.csv")
  .map(line => line.split(","))
  .filter(row => !row.contains("col1"))
  .map(x => Row(x))
val input2 = sqlContext.createDataFrame(input, schema)

I have encountered this problem with regular DataFrames and joins, and IIRC it can be solved by specifying column names, dropping specific columns, or using different joins. However, in this case I don't have that option.
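For reference, the join case mentioned above usually looks something like the sketch below; df1 and df2 are hypothetical DataFrames used only to illustrate the usual workarounds, not part of the original code.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("ambiguity-demo").getOrCreate()
import spark.implicits._

// Two hypothetical DataFrames that share the key "id" and an overlapping column "col6"
val df1 = Seq((1, 10.0), (2, 20.0)).toDF("id", "col6")
val df2 = Seq((1, 99.0), (2, 88.0)).toDF("id", "col6")

// Joining on a Seq of column names keeps a single "id" column in the result
val joined = df1.join(df2, Seq("id"))

// An ambiguous column can be disambiguated via the DataFrame it came from...
val fromLeft = joined.select(df1("col6"))

// ...or one copy can be dropped so only a single "col6" remains
val deduped = joined.drop(df2("col6"))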

All headers in all files are identical, as proven by head -1 *.csv. I don't understand why this would occur.

Recommended answer

Both fieldsString and fieldsDouble reference schemaStringS:

val fieldsString = schemaStringS.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))

// Changed from schemaStringS to schemaStringD
val fieldsDouble = schemaStringD.split(" ")
  .map(fieldName => StructField(fieldName, DoubleType, nullable = true))

So when you combine them with

val schema = StructType(fieldsString ++ fieldsDouble ++ fieldsString2)

the combined schema contains duplicate column names, which is what triggers the 'col6' is ambiguous error.
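For completeness, a minimal corrected sketch of the whole schema construction and read, reusing the column names, sqlContext, and illustrative file path from the question, would look roughly like this:

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Column groups as in the question
val schemaStringS  = "col1 col2"
val schemaStringD  = "col3 col4 col5 col6"
val schemaStringS2 = "col7 col8"

val fieldsString = schemaStringS.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val fieldsDouble = schemaStringD.split(" ")   // built from schemaStringD, not schemaStringS
  .map(fieldName => StructField(fieldName, DoubleType, nullable = true))
val fieldsString2 = schemaStringS2.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))

// Each column name now appears exactly once in the combined schema
val schema = StructType(fieldsString ++ fieldsDouble ++ fieldsString2)

val input = sqlContext.read
  .schema(schema)
  .option("header", true)
  .csv("/files/*.csv")
  .toJavaRDD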
