Encoder error while trying to map dataframe row to updated row


Problem description

When I try to do the same thing in my code as mentioned below:

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})

I took the above approach from here: Scala: How can I replace value in Dataframs using scala, but I am getting an encoder error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Note: I am using Spark 2.0!

Answer

There is nothing unexpected here. You're trying to use code which was written for Spark 1.x and is no longer supported in Spark 2.0:

  • in 1.x, DataFrame.map is ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • in 2.x, Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
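Concretely, a 2.x map call needs an implicit Encoder[T] in scope for its result type, and there is no implicit Encoder[Row]. A minimal sketch of the difference (assuming a spark-shell style session with a SparkSession named spark):

import spark.implicits._

val ids = spark.range(3).toDF("id")

// Compiles: spark.implicits._ supplies an Encoder[Long] for the result type.
val incremented = ids.map(row => row.getLong(0) + 1)

// Does not compile: there is no implicit Encoder[Row], which is exactly
// the "Unable to find encoder" error from the question:
// ids.map(row => Row(row.getLong(0) + 1))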

To be honest, it didn't make much sense in 1.x either. Regardless of version, you can simply use the DataFrame API:

import org.apache.spark.sql.functions.{when, lower}
import spark.implicits._ // needed for the $"..." column syntax and toDF

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

// Replace "make" with "S" where it is "tesla" (case-insensitive); keep it otherwise.
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

If you really want to use map, you should use a statically typed Dataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  // Bind the record and rewrite its make when it is "tesla" (case-insensitive).
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

or at least return an object which will have an implicit encoder:

df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}
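The tuple version comes back as a Dataset[(Int, String, String)], so its columns default to _1, _2, _3; if the original names matter downstream, toDF can restore them (a small sketch reusing the df defined above):

// Rename the tuple columns back to the original names.
val renamed = df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}.toDF("year", "make", "model")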

Finally, if for some completely crazy reason you really want to map over a Dataset[Row], you have to provide the required encoder:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
    Row(year, "S", model)
  case row => row
}(encoder)
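As the comment in the snippet points out, the schema does not have to be spelled out by hand; the same encoder can be derived from the existing DataFrame (a minimal sketch):

import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Same encoder as above, built from the DataFrame's own schema
// instead of a hand-written StructType.
val encoderFromDf = RowEncoder(df.schema)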

