Generic T as Spark Dataset[T] constructor


Problem description

In the following snippet, the tryParquet function tries to load a Dataset from a Parquet file if it exists. If not, it computes, persists, and returns the Dataset plan that was provided:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset

// assumes a SparkSession named spark; needed for .toDF and .as[MyRow] below
import spark.implicits._

sealed trait CustomRow

case class MyRow(
  id: Int,
  name: String
) extends CustomRow

val ds: Dataset[MyRow] =
  Seq((1, "foo"),
      (2, "bar"),
      (3, "baz")).toDF("id", "name").as[MyRow]

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
    Try(session.read.parquet(path)) match {
      case Success(df) => df.as[T] // <---- compile error here
      case Failure(_)  => {
        target.write.parquet(path)
        target
      }
    }

val readyDS: Dataset[MyRow] =
    tryParquet(spark, "/path/to/file.parq", ds)

However, this produces a compile error on df.as[T]:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

case Success(df) => df.as[T]

One can circumvent this problem by having tryParquet return an untyped DataFrame and letting the caller cast it to the desired constructor. However, is there any solution if we want the type to be managed internally by the function?
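
For reference, a minimal sketch of that workaround, assuming an untyped variant of the function (the name tryParquetDF is hypothetical):

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical untyped variant: returns a DataFrame and leaves the
// cast back to Dataset[T] to the caller.
def tryParquetDF(session: SparkSession, path: String, target: Dataset[_]): DataFrame =
    Try(session.read.parquet(path)) match {
      case Success(df) => df                // no encoder needed for a DataFrame
      case Failure(_)  => {
        target.write.parquet(path)
        target.toDF                         // return the provided plan, untyped
      }
    }

// The caller restores the type (spark.implicits._ supplies the encoder):
val readyDS2: Dataset[MyRow] =
    tryParquetDF(spark, "/path/to/file.parq", ds).as[MyRow]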

Recommended answer

It looks like this is possible by using an Encoder context bound on the type parameter. Inside the function, T is abstract, so the implicit Encoder[T] that df.as[T] needs must be demanded from the caller:

import org.apache.spark.sql.Encoder

def tryParquet[T <: CustomRow: Encoder](...)

This way the compiler can prove that an Encoder[T] is in implicit scope when df.as[T] constructs the objects.
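
Put together, a complete version under that fix might look like this (the body is the question's original function with the added context bound; at the call site, the spark.implicits._ import derives Encoder[MyRow] for the case class):

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

def tryParquet[T <: CustomRow: Encoder](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
    Try(session.read.parquet(path)) match {
      case Success(df) => df.as[T]          // compiles: Encoder[T] is in implicit scope
      case Failure(_)  => {
        target.write.parquet(path)
        target
      }
    }

val readyDS: Dataset[MyRow] =
    tryParquet(spark, "/path/to/file.parq", ds)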
