除非更改列名,否则不应用DataFrame用户定义的函数 [英] DataFrame user-defined function not applied unless I change column name
问题描述
我想使用隐式函数定义转换DataFrame列.
I want to convert my DataFrame column using implicits functions definition.
我定义了DataFrame类型,其中包含其他功能:
I have my DataFrame type defined, which contains additional functions:
class MyDF(df: DataFrame) {
def bytes2String(colName: String): DataFrame = df
.withColumn(colname + "_tmp", udf((x: Array[Byte]) => bytes2String(x)).apply(col(colname)))
.drop(colname)
.withColumnRenamed(colname + "_tmp", colname)
}
然后我定义我的隐式转换类:
Then I define my implicit conversion class:
object NpDataFrameImplicits {
implicit def toNpDataFrame(df: DataFrame): NpDataFrame = new NpDataFrame(df)
}
所以最后,这是我在一个小型FunSuite单元测试中要做的事情:
So finally, here is what I do in a small FunSuite unit test:
test("example: call to bytes2String") {
val df: DataFrame = ...
df.select("header.ID").show() // (1)
df.bytes2String("header.ID").withColumnRenamed("header.ID", "id").select("id").show() // (2)
df.bytes2String("header.ID").select("header.ID").show() // (3)
}
显示#1
+-------------------------------------------------+
|ID |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+
显示#2
+------------------------------------+
|id |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+
显示#3
+-------------------------------------------------+
|ID |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+
正如您在这里可以看到的那样,第三个show
(也就是未重命名列)无法按预期工作,并向我们显示了一个未转换的ID列.有人知道为什么吗?
As you can witness here, the third show
(aka without the column renaming) does not work as expected and shows us a non-converted ID column. Anyone knows why?
df.select(col("header.ID") as "ID").bytes2String("ID").show()
的输出:
+------------------------------------+
|ID |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+
推荐答案
让我解释一下,下面的示例在您的转换函数上发生了什么. 首先创建数据框:
Let me explain, what is happening on your conversion function with bellow example. First Create data frame:
val jsonString: String =
"""{
| "employee": {
| "id": 12345,
| "name": "krishnan"
| },
| "_id": 1
|}""".stripMargin
val jsonRDD: RDD[String] = sc.parallelize(Seq(jsonString, jsonString))
val df: DataFrame = sparkSession.read.json(jsonRDD)
df.printSchema()
输出结构:
root
|-- _id: long (nullable = true)
|-- employee: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
类似于您的转换功能:
def myConversion(myDf: DataFrame, colName: String): DataFrame = {
myDf.withColumn(colName + "_tmp", udf((x: Long) => (x+1).toString).apply(col(colName)))
.drop(colName)
.withColumnRenamed(colName + "_tmp", colName)
}
场景1#
进行root
级别字段的转换.
Scenario 1#
Do the conversion for root
level field.
myConversion(df, "_id").show()
myConversion(df, "_id").select("_id").show()
结果:
+----------------+---+
| employee|_id|
+----------------+---+
|[12345,krishnan]| 2|
|[12345,krishnan]| 2|
+----------------+---+
+---+
|_id|
+---+
| 2|
| 2|
+---+
方案2#进行employee.id
的转换.在这里,当我们使用employee.id
手段时,数据帧在root
级别添加了新字段id
.这是正确的行为.
Scenario 2# do the conversion for employee.id
. Here, when we use employee.id
means, data frame got added with new field id
at root
level. This is the correct behavior.
myConversion(df, "employee.id").show()
myConversion(df, "employee.id").select("employee.id").show()
结果:
+---+----------------+-----------+
|_id| employee|employee.id|
+---+----------------+-----------+
| 1|[12345,krishnan]| 12346|
| 1|[12345,krishnan]| 12346|
+---+----------------+-----------+
+-----+
| id|
+-----+
|12345|
|12345|
+-----+
方案3#:选择内部字段为根级别,然后执行转换.
Scenario 3# Select the inner field to root level and then perform conversion.
myConversion(df.select("employee.id"), "id").show()
结果:
+-----+
| id|
+-----+
|12346|
|12346|
+-----+
我新的转换函数,接受结构类型字段并执行转换并将其存储到结构类型字段本身中.在这里,传递employee
字段并单独转换id
字段,但是在root
级别上对字段employee
进行了更改.
My new conversion function, takes struct type field and perform conversion and store it into struct type field itself. Here, pass employee
field and convert the id
field alone, but changes are done field employee
at root
level.
case class Employee(id: String, name: String)
def myNewConversion(myDf: DataFrame, colName: String): DataFrame = {
myDf.withColumn(colName + "_tmp", udf((row: Row) => Employee((row.getLong(0)+1).toString, row.getString(1))).apply(col(colName)))
.drop(colName)
.withColumnRenamed(colName + "_tmp", colName)
}
使用我的转换功能
您的方案编号3#.
Your scenario number 3# using my conversion function.
myNewConversion(df, "employee").show()
myNewConversion(df, "employee").select("employee.id").show()
结果#
+---+----------------+
|_id| employee|
+---+----------------+
| 1|[12346,krishnan]|
| 1|[12346,krishnan]|
+---+----------------+
+-----+
| id|
+-----+
|12346|
|12346|
+-----+
这篇关于除非更改列名,否则不应用DataFrame用户定义的函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!