DataFrame user-defined function not applied unless I change column name

Problem description

I want to convert one of my DataFrame columns using an implicit conversion.

I have defined my own DataFrame wrapper type, which adds extra methods:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

class NpDataFrame(df: DataFrame) {
    // bytes2String(x: Array[Byte]) refers to a separate byte-to-string helper (not shown here)
    def bytes2String(colName: String): DataFrame = df
       .withColumn(colName + "_tmp", udf((x: Array[Byte]) => bytes2String(x)).apply(col(colName)))
       .drop(colName)
       .withColumnRenamed(colName + "_tmp", colName)
}
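
The Array[Byte] => String helper called inside the UDF is not shown in the question. Judging by the output below, it renders a 16-byte value as a UUID string; a minimal sketch of such a helper (an assumption, not the author's actual code) could look like this:

import java.nio.ByteBuffer
import java.util.UUID

// Hypothetical helper: interpret a 16-byte array as a UUID and render it
// in the canonical 8-4-4-4-12 string form (e.g. 62bf580c-6c59-489c-9113-7b97e729c02f).
def bytes2String(bytes: Array[Byte]): String = {
  val bb = ByteBuffer.wrap(bytes)
  new UUID(bb.getLong, bb.getLong).toString
}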

Then I define my implicit conversion object:

object NpDataFrameImplicits {
    implicit def toNpDataFrame(df: DataFrame): NpDataFrame = new NpDataFrame(df)
}
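
For df.bytes2String(...) to compile at the call site, this implicit conversion has to be visible there, e.g. via an import (a usage note, assuming the object above):

// Bring the DataFrame => NpDataFrame conversion into scope wherever bytes2String is called.
import NpDataFrameImplicits._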

So finally, here is what I do in a small FunSuite unit test:

test("example: call to bytes2String") {
    val df: DataFrame = ...
    df.select("header.ID").show() // (1)
    df.bytes2String("header.ID").withColumnRenamed("header.ID", "id").select("id").show() // (2)
    df.bytes2String("header.ID").select("header.ID").show() // (3)
}

Show #1

+-------------------------------------------------+
|ID                                               |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+

Show #2

+------------------------------------+
|id                                  |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+

Show #3

+-------------------------------------------------+
|ID                                               |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+

As you can see here, the third show (i.e. without the column renaming) does not work as expected and shows a non-converted ID column. Does anyone know why?

Output of df.select(col("header.ID") as "ID").bytes2String("ID").show():

+------------------------------------+
|ID                                  |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+

Answer

Let me explain what is happening in your conversion function with the example below. First, create a data frame:

import org.apache.spark.rdd.RDD

// sc: SparkContext and sparkSession: SparkSession are assumed to be in scope
val jsonString: String =
    """{
      | "employee": {
      |   "id": 12345,
      |   "name": "krishnan"
      | },
      | "_id": 1
      |}""".stripMargin

val jsonRDD: RDD[String] = sc.parallelize(Seq(jsonString, jsonString))

val df: DataFrame = sparkSession.read.json(jsonRDD)
df.printSchema()

Schema output:

root
 |-- _id: long (nullable = true)
 |-- employee: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)

A conversion function similar to yours:

def myConversion(myDf: DataFrame, colName: String): DataFrame = {
    myDf.withColumn(colName + "_tmp", udf((x: Long) => (x+1).toString).apply(col(colName)))
      .drop(colName)
      .withColumnRenamed(colName + "_tmp", colName)
  }

Scenario 1# Do the conversion on a root-level field.

myConversion(df, "_id").show()
myConversion(df, "_id").select("_id").show()

Result:

+----------------+---+
|        employee|_id|
+----------------+---+
|[12345,krishnan]|  2|
|[12345,krishnan]|  2|
+----------------+---+
+---+
|_id|
+---+
|  2|
|  2|
+---+

Scenario 2# Do the conversion for employee.id. Here, the data frame gets a new column literally named employee.id added at the root level, while the nested employee.id field is left untouched and is still what select("employee.id") resolves to. This is the correct behavior.

myConversion(df, "employee.id").show()
myConversion(df, "employee.id").select("employee.id").show()

Result:

+---+----------------+-----------+
|_id|        employee|employee.id|
+---+----------------+-----------+
|  1|[12345,krishnan]|      12346|
|  1|[12345,krishnan]|      12346|
+---+----------------+-----------+
+-----+
|   id|
+-----+
|12345|
|12345|
+-----+
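
In other words, after the conversion the data frame holds both the untouched employee struct and a separate root-level column whose name literally contains a dot. That dotted name can be addressed explicitly with backticks; a small sketch (assuming import org.apache.spark.sql.functions.col and the scenario 2 data frame):

val converted = myConversion(df, "employee.id")

// Backticks select the root-level column literally named "employee.id" (converted value 12346),
// while the plain path still resolves to the nested struct field (original value 12345).
converted.select(col("`employee.id`")).show()
converted.select(col("employee.id")).show()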

Scenario 3# Select the inner field up to the root level and then perform the conversion.

myConversion(df.select("employee.id"), "id").show()

Result:

+-----+
|   id|
+-----+
|12346|
|12346|
+-----+

My new conversion function takes a struct-type field, performs the conversion, and stores the result back into the struct field itself. Here I pass the employee field and convert only the id field, but the change is applied to the employee column at the root level. Because the UDF returns a case class, Spark encodes the result back as a struct, so the column keeps its shape.

import org.apache.spark.sql.Row

case class Employee(id: String, name: String)

def myNewConversion(myDf: DataFrame, colName: String): DataFrame = {
    // Convert the struct's id field and wrap the result back into an Employee struct
    myDf.withColumn(colName + "_tmp", udf((row: Row) => Employee((row.getLong(0)+1).toString, row.getString(1))).apply(col(colName)))
      .drop(colName)
      .withColumnRenamed(colName + "_tmp", colName)
}

Your scenario number 3#, using my conversion function:

myNewConversion(df, "employee").show()
myNewConversion(df, "employee").select("employee.id").show()

Result:

+---+----------------+
|_id|        employee|
+---+----------------+
|  1|[12346,krishnan]|
|  1|[12346,krishnan]|
+---+----------------+
+-----+
|   id|
+-----+
|12346|
|12346|
+-----+
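
Applied back to the original question, the same idea means rebuilding the header struct in place instead of adding and renaming a root-level column. A rough sketch, assuming header contains only the ID field and reusing the hypothetical bytes2String helper from above; any other fields of header would have to be listed in the struct(...) call as well:

import org.apache.spark.sql.functions.{col, struct, udf}

val bytes2StringUdf = udf((x: Array[Byte]) => bytes2String(x))

// Rebuild the header struct with the converted ID so that header.ID itself changes.
val fixed = df.withColumn("header", struct(bytes2StringUdf(col("header.ID")).as("ID")))
fixed.select("header.ID").show()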
