Creating dataset based on different case classes


Problem description


Hi, I have an RDD which is basically created after reading a CSV file. I have defined a method which maps the lines of the RDD to different case classes based on an input parameter.

The returned RDD needs to be converted to a DataFrame. When I try to run this, I get the error below.

The method defined is:

  case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String)

  case class Australiafile2(sectionName: String, profitCentre: String)

  case class defaultclass(error: String)

  def mapper(line: String, recordLayoutClassToBeUsed: String) = {

    val fields = line.split(",")
    var outclass = recordLayoutClassToBeUsed match {
      case ("Australiafile1") => Australiafile1(fields(0), fields(1), fields(2), fields(3))
      case ("Australiafile2") => Australiafile2(fields(0), fields(1))
    }
    outclass

  }

The output of the method is used to create a DataFrame as below:

      val inputlines = spark.sparkContext.textFile(inputFile).cache().mapPartitionsWithIndex { (idx, lines) => if (idx == 0) lines.drop(numberOfLinesToBeRemoved.toInt) else lines }.cache()
      val records = inputlines.filter(x => !x.isEmpty).filter(x => x.split(",").length > 0).map(lines => mapper(lines, recordLayoutClassToBeUsed))

      import spark.implicits._

      val recordsDS = records.toDF()
      recordsDS.createTempView("recordtable")
      val output = spark.sql("select * from recordtable").toDF()
      output.write.option("delimiter", "|").option("header", "false").mode("overwrite").csv(outputFile)

The error received is as below

Exception in thread "main" java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable found
  at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1300)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:60)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)

Could you please advise what is wrong here, and how can I overcome it?

Solution

Try:

trait AustraliaFile extends Serializable

case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String) extends AustraliaFile

case class Australiafile2(sectionName: String, profitCentre: String) extends AustraliaFile

Your classes are not Serializable, yet Spark can only write serializable objects. It is also always a good idea to base related classes on a common ancestor, so that you can declare your RDD as RDD[AustraliaFile] instead of RDD[Any] (a sketch of this wiring follows the simplified mapper below).

Also, your class matching logic can be simplified as

def mapper(line: String, recordLayoutClassToBeUsed: String) = {
  val fields = line.split(",")
  recordLayoutClassToBeUsed match {
    case ("Australiafile1") => Australiafile1(fields(0), fields(1), fields(2), fields(3))
    case ("Australiafile2") => Australiafile2(fields(0), fields(1))
  }
}
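
For illustration only, here is a minimal sketch of how the pieces above could be wired together. The explicit RDD[AustraliaFile] element type, the collect-based narrowing to one case class before toDF(), and the reuse of the question's variable names are assumptions made for this sketch, not part of the original answer.

import org.apache.spark.rdd.RDD
import spark.implicits._

// Sketch only: assumes the AustraliaFile trait, the two case classes and the
// simplified mapper above, plus the question's SparkSession (spark),
// inputlines and recordLayoutClassToBeUsed values.

// With the common trait in place, the RDD can be declared as
// RDD[AustraliaFile] rather than RDD[Any].
val records: RDD[AustraliaFile] =
  inputlines
    .filter(line => line.nonEmpty)
    .map(line => mapper(line, recordLayoutClassToBeUsed): AustraliaFile)

// Spark derives product encoders per concrete case class, so narrow the
// mixed RDD to a single layout before calling toDF().
val file1DF = records.collect { case r: Australiafile1 => r }.toDF()
val file2DF = records.collect { case r: Australiafile2 => r }.toDF()

Each DataFrame can then be registered as a temp view and written out exactly as in the question.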
