Spark CSV - No applicable constructor/method found for actual parameters
Problem description
I have an issue using lambda functions on filter and map of typed Datasets in a Java Spark application.
I am getting this runtime error:
ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
I am using the class below with Spark 2.2.0. A full example with sample data is available at https://gitlab.com/opencell/test-bigdata
Dataset<CDR> cdr = spark
.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ";")
.csv("CDR_SAMPLE.csv")
.as(Encoders.bean(CDR.class));
long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count();
System.out.println("validated entries :" + v);
The CDR file definition is at the GitLab link above.
EDIT
val cdrCSVSchema = StructType(Array(
StructField("timestamp", DataTypes.TimestampType),
StructField("quantity", DataTypes.DoubleType),
StructField("access", DataTypes.StringType),
StructField("param1", DataTypes.StringType),
StructField("param2", DataTypes.StringType),
StructField("param3", DataTypes.StringType),
StructField("param4", DataTypes.StringType),
StructField("param5", DataTypes.StringType),
StructField("param6", DataTypes.StringType),
StructField("param7", DataTypes.StringType),
StructField("param8", DataTypes.StringType),
StructField("param9", DataTypes.StringType),
StructField("dateParam1", DataTypes.TimestampType),
StructField("dateParam2", DataTypes.TimestampType),
StructField("dateParam3", DataTypes.TimestampType),
StructField("dateParam4", DataTypes.TimestampType),
StructField("dateParam5", DataTypes.TimestampType),
StructField("decimalParam1", DataTypes.DoubleType),
StructField("decimalParam2", DataTypes.DoubleType),
StructField("decimalParam3", DataTypes.DoubleType),
StructField("decimalParam4", DataTypes.DoubleType),
StructField("decimalParam5", DataTypes.DoubleType),
StructField("extraParam", DataTypes.StringType)))
and I used this command to load the CSV document:
val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv")
and then tried this command to encode and run the lambda function, but I still get the error:
cdr.as[CDR].filter(c => c.timestamp != null).show
TL;DR Define the schema explicitly, since the input dataset has no values to infer types from (for the java.sql.Date fields).
For your case, using the untyped Dataset API could be a solution (perhaps a workaround; honestly, I'd recommend it anyway to avoid the unnecessary deserialization from the internal row format):
cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count
(That's Scala; translating it to Java is left as a home exercise.)
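One possible Java translation of that untyped filter could look like the sketch below. It assumes the `cdr` Dataset already loaded in the question and the static helpers in `org.apache.spark.sql.functions`:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.length;

// Untyped filtering operates on Column expressions, so no row is ever
// deserialized into a CDR bean and the Date-field mismatch is never hit.
long v = cdr
    .filter(col("timestamp").isNotNull()
        .and(length(col("access")).gt(0)))
    .count();
System.out.println("validated entries :" + v);
```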
The issue is that you use the inferSchema option while most fields in the input file have no values, which makes most fields String (the default type when no values are available to infer a more specific one).
That leaves the java.sql.Date fields, i.e. dateParam1 through dateParam5, typed as strings.
import org.opencell.spark.model.CDR
import org.apache.spark.sql.Encoders
implicit val cdrEnc = Encoders.bean(classOf[CDR])
val cdrs = spark.read.
option("inferSchema", "true").
option("delimiter", ";").
option("header", true).
csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv")
scala> cdrs.printSchema
root
|-- timestamp: timestamp (nullable = true)
|-- quantity: integer (nullable = true)
|-- access: string (nullable = true)
|-- param1: string (nullable = true)
|-- param2: string (nullable = true)
|-- param3: string (nullable = true)
|-- param4: string (nullable = true)
|-- param5: string (nullable = true)
|-- param6: string (nullable = true)
|-- param7: string (nullable = true)
|-- param8: string (nullable = true)
|-- param9: string (nullable = true)
|-- dateParam1: string (nullable = true)
|-- dateParam2: string (nullable = true)
|-- dateParam3: string (nullable = true)
|-- dateParam4: string (nullable = true)
|-- dateParam5: string (nullable = true)
|-- decimalParam1: string (nullable = true)
|-- decimalParam2: string (nullable = true)
|-- decimalParam3: string (nullable = true)
|-- decimalParam4: string (nullable = true)
|-- decimalParam5: string (nullable = true)
|-- extraParam: string (nullable = true)
Note that the fields of interest, i.e. dateParam1 through dateParam5, are all strings:
|-- dateParam1: string (nullable = true)
|-- dateParam2: string (nullable = true)
|-- dateParam3: string (nullable = true)
|-- dateParam4: string (nullable = true)
|-- dateParam5: string (nullable = true)
The problem surfaces when you "pretend" the field types are different by using the encoder defined for the CDR class, which declares:
private Date dateParam1;
private Date dateParam2;
private Date dateParam3;
private Date dateParam4;
private Date dateParam5;
That's the root cause of the issue: there is a mismatch between what Spark can infer from the data and what the class declares. Without the conversion the code would have worked, but since you insisted...
cdrs.as[CDR]. // <-- HERE is the issue = types don't match
filter(cdr => cdr.timestamp != null).
show // <-- trigger conversion
It does not really matter which field you access in the filter operator. The issue is that the conversion takes place at all, and that leads to incorrect execution (and whole-stage Java code generation).
I doubt Spark can do much about it, since you requested inferSchema on a dataset that has no values to use for type inference. Your best bet is to define the schema explicitly and set it with the schema(...) operator.
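In Java, the explicit schema could be built as in the sketch below. One detail worth noting (an observation, not from the original answer): the CDR bean declares its date fields as java.sql.Date, which corresponds to Spark's DateType, not the TimestampType used in the EDIT above, so the schema should use DateType for dateParam1 through dateParam5. Only representative columns are shown; the remaining params follow the same pattern:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("timestamp", DataTypes.TimestampType)
    .add("quantity", DataTypes.DoubleType)
    .add("access", DataTypes.StringType)
    // param1 .. param9 are all StringType, added the same way
    .add("dateParam1", DataTypes.DateType)  // DateType matches java.sql.Date in CDR
    .add("dateParam2", DataTypes.DateType)
    .add("dateParam3", DataTypes.DateType)
    .add("dateParam4", DataTypes.DateType)
    .add("dateParam5", DataTypes.DateType)
    // decimalParam1 .. decimalParam5 follow the bean's field types (e.g. DoubleType)
    .add("extraParam", DataTypes.StringType);

Dataset<CDR> cdr = spark.read()
    .option("header", "true")
    .option("delimiter", ";")
    .schema(schema)              // explicit schema instead of inferSchema
    .csv("CDR_SAMPLE.csv")
    .as(Encoders.bean(CDR.class));
```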