Spark CSV - No applicable constructor/method found for actual parameters


Problem description

I have an issue when using lambda functions on filters and maps of typed Datasets in a Java Spark application.

I am getting this runtime error:

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"

I am using the class below with Spark 2.2.0. A full example with sample data is available at https://gitlab.com/opencell/test-bigdata

Dataset<CDR> cdr = spark
            .read()
            .format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("delimiter", ";")
            .csv("CDR_SAMPLE.csv")
            .as(Encoders.bean(CDR.class));

    long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count();

    System.out.println("validated entries :" + v);

The CDR class definition is in the gitlab repository linked above (https://gitlab.com/opencell/test-bigdata).
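
For context, the relevant part of that class presumably looks roughly like the sketch below. This is a hypothetical excerpt reconstructed from the snippets in this post, not the actual file from the repository; field types and visibility are assumptions.

import java.io.Serializable;
import java.sql.Date;
import java.sql.Timestamp;

// Hypothetical excerpt of the CDR bean (not the repository version).
public class CDR implements Serializable {
    public Timestamp timestamp;      // accessed directly as x.timestamp in the filter above; type assumed
    private String access;
    private Date dateParam1;         // java.sql.Date fields, quoted in the answer below
    private Date dateParam2;
    private Date dateParam3;
    private Date dateParam4;
    private Date dateParam5;
    // ... remaining paramN / decimalParamN / extraParam fields omitted

    public String getAccess() { return access; }
    public void setAccess(String access) { this.access = access; }
    // ... getters and setters for the other fields (required by Encoders.bean)
}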

EDIT

val cdrCSVSchema = StructType(Array(
  StructField("timestamp", DataTypes.TimestampType),
  StructField("quantity", DataTypes.DoubleType),
  StructField("access", DataTypes.StringType),
  StructField("param1", DataTypes.StringType),
  StructField("param2", DataTypes.StringType),
  StructField("param3", DataTypes.StringType),
  StructField("param4", DataTypes.StringType),
  StructField("param5", DataTypes.StringType),
  StructField("param6", DataTypes.StringType),
  StructField("param7", DataTypes.StringType),
  StructField("param8", DataTypes.StringType),
  StructField("param9", DataTypes.StringType),
  StructField("dateParam1", DataTypes.TimestampType),
  StructField("dateParam2", DataTypes.TimestampType),
  StructField("dateParam3", DataTypes.TimestampType),
  StructField("dateParam4", DataTypes.TimestampType),
  StructField("dateParam5", DataTypes.TimestampType),
  StructField("decimalParam1", DataTypes.DoubleType),
  StructField("decimalParam2", DataTypes.DoubleType),
  StructField("decimalParam3", DataTypes.DoubleType),
  StructField("decimalParam4", DataTypes.DoubleType),
  StructField("decimalParam5", DataTypes.DoubleType),
  StructField("extraParam", DataTypes.StringType)))

and I used this command to load the CSV document:

val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv")

and then tried this command to encode and run the lambda function, but I am still getting the error:

cdr.as[CDR].filter(c => c.timestamp != null).show

Solution

TL;DR Define the schema explicitly, since the input dataset does not have values to infer types from (for the java.sql.Date fields).

For your case, using the untyped Dataset API could be a solution (perhaps a workaround; honestly, I'd recommend it anyway to avoid unnecessary deserialization from the internal row format):

cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count

(That's Scala; translating it to Java is left as a home exercise, but a rough sketch follows below.)
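
A possible Java equivalent of that untyped approach could look like this. It is a sketch under the same assumptions as the question's snippet (an existing SparkSession named spark and the CDR_SAMPLE.csv file), not part of the original answer.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.length;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Assumes an existing SparkSession named spark, as in the question.
// Stay with the untyped Dataset<Row>: no .as(Encoders.bean(CDR.class)), so no conversion is triggered.
Dataset<Row> cdr = spark
        .read()
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .csv("CDR_SAMPLE.csv");

long v = cdr
        .filter(col("timestamp").isNotNull())     // keep rows that have a timestamp
        .filter(length(col("access")).gt(0))      // and a non-empty access value
        .count();

System.out.println("validated entries :" + v);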

The issue is that you use the inferSchema option while most fields in the input CDR_SAMPLE.csv file have no values, which makes most fields of type String (the default type when no values are available to infer a more specific one).

That makes the fields declared as java.sql.Date, i.e. dateParam1 through dateParam5, end up as strings.

import org.opencell.spark.model.CDR
import org.apache.spark.sql.Encoders
implicit val cdrEnc = Encoders.bean(classOf[CDR])
val cdrs = spark.read.
  option("inferSchema", "true").
  option("delimiter", ";").
  option("header", true).
  csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv")
scala> cdrs.printSchema
root
 |-- timestamp: timestamp (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- access: string (nullable = true)
 |-- param1: string (nullable = true)
 |-- param2: string (nullable = true)
 |-- param3: string (nullable = true)
 |-- param4: string (nullable = true)
 |-- param5: string (nullable = true)
 |-- param6: string (nullable = true)
 |-- param7: string (nullable = true)
 |-- param8: string (nullable = true)
 |-- param9: string (nullable = true)
 |-- dateParam1: string (nullable = true)
 |-- dateParam2: string (nullable = true)
 |-- dateParam3: string (nullable = true)
 |-- dateParam4: string (nullable = true)
 |-- dateParam5: string (nullable = true)
 |-- decimalParam1: string (nullable = true)
 |-- decimalParam2: string (nullable = true)
 |-- decimalParam3: string (nullable = true)
 |-- decimalParam4: string (nullable = true)
 |-- decimalParam5: string (nullable = true)
 |-- extraParam: string (nullable = true)

Note that the fields of interest, i.e. dateParam1 to dateParam5, are all strings.

 |-- dateParam1: string (nullable = true)
 |-- dateParam2: string (nullable = true)
 |-- dateParam3: string (nullable = true)
 |-- dateParam4: string (nullable = true)
 |-- dateParam5: string (nullable = true)

The issue surfaces when you "pretend" the type of the fields is different by using the encoder defined from the CDR class, which declares:

private Date dateParam1;
private Date dateParam2;
private Date dateParam3; 
private Date dateParam4; 
private Date dateParam5; 

That's the root cause of the issue: what Spark inferred from the CSV differs from what the class declares. Without the conversion the code would have worked, but since you insisted on it...

cdrs.as[CDR]. // <-- HERE is the issue = types don't match
  filter(cdr => cdr.timestamp != null).
  show // <-- trigger conversion

It does not really matter which field you access in the filter operator. The issue is that the conversion takes place at all, and that leads to incorrect execution (and failing whole-stage Java code generation).

I doubt Spark can do much about it, since you requested inferSchema on a dataset with no values to use for type inference. The best bet is to define the schema explicitly and set it with the schema(...) operator.
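
In Java that could look roughly like the sketch below. The column list follows the schema from the question's EDIT, except that the dateParamN columns use DateType to match the java.sql.Date fields of the CDR bean; the types chosen for quantity and decimalParamN are assumptions carried over from that EDIT and have to match the actual bean field types.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Assumes an existing SparkSession named spark, as in the question.
// Explicit schema: no inferSchema, DateType for the java.sql.Date bean fields.
StructType cdrSchema = new StructType()
        .add("timestamp", DataTypes.TimestampType)
        .add("quantity", DataTypes.DoubleType)       // assumed, must match the bean field type
        .add("access", DataTypes.StringType)
        .add("param1", DataTypes.StringType)
        .add("param2", DataTypes.StringType)
        .add("param3", DataTypes.StringType)
        .add("param4", DataTypes.StringType)
        .add("param5", DataTypes.StringType)
        .add("param6", DataTypes.StringType)
        .add("param7", DataTypes.StringType)
        .add("param8", DataTypes.StringType)
        .add("param9", DataTypes.StringType)
        .add("dateParam1", DataTypes.DateType)
        .add("dateParam2", DataTypes.DateType)
        .add("dateParam3", DataTypes.DateType)
        .add("dateParam4", DataTypes.DateType)
        .add("dateParam5", DataTypes.DateType)
        .add("decimalParam1", DataTypes.DoubleType)  // assumed, must match the bean field type
        .add("decimalParam2", DataTypes.DoubleType)
        .add("decimalParam3", DataTypes.DoubleType)
        .add("decimalParam4", DataTypes.DoubleType)
        .add("decimalParam5", DataTypes.DoubleType)
        .add("extraParam", DataTypes.StringType);

Dataset<CDR> cdrs = spark
        .read()
        .option("header", "true")
        .option("delimiter", ";")
        .schema(cdrSchema)                // <-- explicit schema instead of inferSchema
        .csv("CDR_SAMPLE.csv")
        .as(Encoders.bean(CDR.class));

long v = cdrs.filter(c -> c.timestamp != null && c.getAccess().length() > 0).count();
System.out.println("validated entries :" + v);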

