Scala中TypedDataset和类型边界的隐式编码器 [英] Implicit Encoder for TypedDataset and Type Bounds in Scala

查看:58
本文介绍了Scala中TypedDataset和类型边界的隐式编码器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的目标是创建一个 MyDataFrame 类,该类将知道如何在给定路径下获取数据,但是我想提供类型安全性.我在将 frameless.TypedDataset 与远程数据的类型限制一起使用时遇到了一些麻烦.例如

My objective is to create a MyDataFrame class that will know how to fetch data at a given path, but I want to provide type-safety. I'm having some trouble using a frameless.TypedDataset with type bounds on remote data. For example

sealed trait Schema
final case class TableA(id: String) extends Schema
final case class TableB(id: String) extends Schema

class MyDataFrame[T <: Schema](path: String, implicit val spark: SparkSession) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
} 

但是我一直在获取找不到类型为frameless.TypedEncoder [org.apache.spark.sql.Row] 的证据参数的隐式值.我知道 TypedDataset.create 需要 Injection 才能起作用.但是我不确定如何为通用 T 编写此代码.我认为编译器将能够推断出,因为 Schema 的所有子类型都是可以工作的 case class es.

But I keep getting could not find implicit value for evidence parameter of type frameless.TypedEncoder[org.apache.spark.sql.Row]. I know that TypedDataset.create needs an Injection for this to work. But I'm not sure how I would write this for a generic T. I thought the compiler would be able to deduce that since all subtypes of Schema are case classes that it would work.

有人碰到这个吗?

推荐答案

所有隐式参数应位于最后一个参数列表中,并且此参数列表应与非隐式参数分开.

All implicit parameters should be in the last parameter list and this parameter list should be separate from non-implicit ones.

如果您尝试编译

class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

您会看到错误

Error:(11, 35) could not find implicit value for evidence parameter of type frameless.TypedEncoder[org.apache.spark.sql.Row]
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]

所以我们只添加相应的隐式参数

So let's just add corresponding implicit parameter

class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

我们会遇到错误

Error:(11, 64) could not find implicit value for parameter as: frameless.ops.As[org.apache.spark.sql.Row,T]
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]

所以我们再添加一个隐式参数

So let's add one more implicit parameter

import frameless.ops.As
import frameless.{TypedDataset, TypedEncoder}
import org.apache.spark.sql.{Row, SparkSession}

class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row], as: As[Row, T]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

或带有实物投影仪

class MyDataFrame[T <: Schema : As[Row, ?]](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}


您可以创建自定义类型类


You can create custom type class

  trait Helper[T] {
    implicit def te: TypedEncoder[Row]
    implicit def as: As[Row, T]
  }

  object Helper {
    implicit def mkHelper[T](implicit te0: TypedEncoder[Row], as0: As[Row, T]): Helper[T] = new Helper[T] {
      override implicit def te: TypedEncoder[Row] = te0
      override implicit def as: As[Row, T] = as0
    }
  }

  class MyDataFrame[T <: Schema : Helper](path: String)(implicit spark: SparkSession) {
    val h = implicitly[Helper[T]]
    import h._
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]
  }

  class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, h: Helper[T]) {
    import h._
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]
  }

  trait Helper[T] {
    def create(dataFrame: DataFrame): TypedDataset[T]
  }

  object Helper {
    implicit def mkHelper[T](implicit te: TypedEncoder[Row], as: As[Row, T]): Helper[T] =
      (dataFrame: DataFrame) => TypedDataset.create(dataFrame).as[T]
  }

  class MyDataFrame[T <: Schema : Helper](path: String)(implicit spark: SparkSession) {
    def read = implicitly[Helper[T]].create(spark.read.parquet(path))
  }

  class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, h: Helper[T]) {
    def read = h.create(spark.read.parquet(path))
  }

这篇关于Scala中TypedDataset和类型边界的隐式编码器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆