Load Dataset from Dynamically generated Case Class


Problem Description

What is needed:


The number of tables in the source database changes rapidly, so I don't want to edit case classes by hand; instead I generate them dynamically through Scala code and put them in a package. But now I'm not able to read them dynamically. If this worked, I would parse "com.example.datasources.fileSystemSource.schema.{}" as object schema members in a loop.
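For context, a generator like the one described could be as simple as emitting case-class source text from table metadata; a hypothetical sketch (the question does not show the actual generator, so names and types here are illustrative only):

```scala
// Hypothetical sketch: build case-class source text from
// (columnName -> scalaType) metadata for one table. Illustrative only;
// the question's real generator is not shown.
def caseClassSource(table: String, columns: Seq[(String, String)]): String = {
  val fields = columns.map { case (n, t) => s"$n: $t" }.mkString(",\n    ")
  s"case class ${table.capitalize}(\n    $fields)"
}
```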

What has already been done:


I have some case classes dynamically generated from the schema of database tables, as below:

import java.sql.Timestamp

object schema {
  case class Users(name: String,
                   favorite_color: String,
                   favorite_numbers: Array[Int])

  case class UserData(registration_dttm: Timestamp,
                      id: Int,
                      first_name: String,
                      last_name: String,
                      email: String,
                      gender: String,
                      ip_address: String,
                      cc: String,
                      country: String,
                      birthdate: String,
                      salary: Double,
                      title: String,
                      comments: String)
}


Then I used them as a dynamic type to read in the Load[T] function in my Loader.scala, as below:

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

class Load[T <: Product: Encoder](val tableName: String,
                                       val inputPath: String,
                                       val spark: SparkSession,
                                       val saveMode: String,
                                       val outputPath: String,
                                       val metadata: Boolean)
    extends Loader[T] {

  val fileSystemSourceInstance: FileSystem[T] =
    new FileSystem[T](inputPath, spark, saveMode, tableName)

  override def Load: Dataset[T] =
    fileSystemSourceInstance.provideData(metadata, outputPath).as[T]

}
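The snippet above references a Loader trait and a FileSystem class that are not shown in the question; a minimal sketch of the contract they would need to satisfy, inferred from how they are used (names and signatures are assumptions, not the asker's actual code):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical trait implied by `extends Loader[T]` and `override def Load`.
trait Loader[T] {
  def Load: Dataset[T]
}

// Assumed shape of FileSystem: provideData reads the table from inputPath
// and returns an untyped DataFrame, which Load then casts with .as[T].
class FileSystem[T](inputPath: String,
                    spark: SparkSession,
                    saveMode: String,
                    tableName: String) {
  def provideData(metadata: Boolean, outputPath: String): DataFrame =
    spark.read.parquet(s"$inputPath/$tableName")
}
```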


Now, by using reflect.api, I am able to get a TypeTag for my case classes.

import scala.reflect.api
import scala.reflect.runtime.universe._

def stringToTypeTag[A](name: String): TypeTag[A] = {
  val c = Class.forName(name)
  val mirror = runtimeMirror(c.getClassLoader)
  val sym = mirror.staticClass(name)
  val tpe = sym.selfType
  TypeTag(mirror, new api.TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
      if (m eq mirror) tpe.asInstanceOf[U#Type]
      else throw new IllegalArgumentException(
        s"Type tag defined in $mirror cannot be migrated to other mirrors.")
  })
}


So if I print my case class type tag now, I get:

val typetagDynamic = stringToTypeTag("com.example.datasources.fileSystemSource.schema.Users")
println(typetagDynamic)
// TypeTag[com.example.datasources.fileSystemSource.schema.Users]

Problem:


I need to read these TypeTags of the dynamically generated case classes to encode my datasets, as below:

new Load[typetagDynamic](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic]).Load

This gives me the error: Cannot resolve symbol typetagDynamic

If used like this:

new Load[typetagDynamic.type](tableName, inputPath, spark,
  saveMode,
  outputPath + tableName,
  metadata)(Encoders.product[typetagDynamic.type]).Load

This gives me the error: type arguments [T] do not conform to method product's type parameter bounds [T <: Product]

Solution


If you know the type schema.Users only at runtime, try to replace

new Load[schema.Users](tableName,inputPath,spark,
  saveMode,
  outputPath + tableName,
  metadata).Load

with

import scala.reflect.runtime
import scala.reflect.runtime.universe._

val currentMirror = runtime.currentMirror

val loadType = typeOf[Load[_]]
val classSymbol = loadType.typeSymbol.asClass
val classMirror = currentMirror.reflectClass(classSymbol)
val constructorSymbol = loadType.decl(termNames.CONSTRUCTOR).asMethod
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
  
import scala.tools.reflect.ToolBox
val toolbox = ToolBox(currentMirror).mkToolBox()
val encoderType = appliedType(
  typeOf[Encoder[_]].typeConstructor.typeSymbol,
  currentMirror.staticClass("com.example.datasources.fileSystemSource.schema.Users").toType
)
val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))

constructorMirror(tableName,inputPath,spark,
  saveMode,
  outputPath + tableName,
  metadata, encoderInstance).asInstanceOf[Load[_]].Load
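Alternatively, since stringToTypeTag already produces a runtime TypeTag, in many Spark versions the encoder can be built from it directly, bypassing the toolbox's implicit search. A hedged sketch, assuming Spark's ExpressionEncoder.apply[T: TypeTag]() is available in the version in use:

```scala
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import scala.reflect.runtime.universe.TypeTag

// Assumption: ExpressionEncoder[T]()(implicit tag: TypeTag[T]) exists in this
// Spark version; it derives a product encoder from the runtime TypeTag, so the
// tag returned by stringToTypeTag can be passed explicitly.
def encoderFor[T](tag: TypeTag[T]): Encoder[T] =
  ExpressionEncoder[T]()(tag)
```

The resulting Encoder[T] could then be handed to constructorMirror in place of encoderInstance.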


Running this as-is, the toolbox's implicit search fails with:

scala.tools.reflect.ToolBoxError: implicit search has failed

You need either:


  1. to define an instance of the type class org.apache.spark.sql.Encoder for Users in its companion object (so that the instance will be in implicit scope):

object Users {
  implicit val usersEnc: Encoder[Users] = spark.implicits.newProductEncoder[Users]
}


  2. to import instances of Encoder for case classes via import spark.implicits._, but you need to import them not into the current local scope but into the toolbox-generated local scope, so in this case you should replace

val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))

with

val className = "com.example.datasources.fileSystemSource.schema.Users"
val classType = currentMirror.staticClass(className).toType
val encoderInstance = toolbox.eval(
  q"""import path.to.spark.implicits._
      import org.apache.spark.sql.Encoder
      implicitly[Encoder[$classType]]""")

See the full code here: https://gist.github.com/DmytroMitin/2cad52c27f5360ae9b1e7503d6f6cd00

https://groups.google.com/g/scala-internals/c/ta-vbUT6JE8
