Spark - How to apply a udf over a single field in a Seq[Map<String,String>]
Question
I have a Dataframe with two columns of types String and Seq[Map[String, String]]. Something like:
Name  Contact
Alan  [Map(number -> 12345, type -> home), Map(number -> 87878787, type -> mobile)]
Ben   [Map(number -> 94837593, type -> job), Map(number -> 346, type -> home)]
So what I need is to apply a udf over the number field of each Map[String,String] in the array. This udf will basically convert into "0000" any number whose length is less than 6. Something like this:
def valid_num_udf = udf((numb: String) => {
  if (numb.length < 6)
    "0000"
  else
    numb
})
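The rule inside the udf is plain Scala, so it can be sanity-checked outside Spark first (the `maskShort` name below is just for illustration, not part of the original code):

```scala
// Same rule as the udf body above, as an ordinary function:
// anything shorter than 6 characters is masked to "0000".
def maskShort(numb: String): String =
  if (numb.length < 6) "0000" else numb

println(maskShort("12345"))    // 5 chars, gets masked
println(maskShort("87878787")) // long enough, passes through
```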
The expected result would be something like:
NAME  CONTACT
Alan  [Map(number -> 0000, type -> home), Map(number -> 87878787, type -> mobile)]
Ben   [Map(number -> 94837593, type -> job), Map(number -> 0000, type -> home)]
What I would like is to use another udf to access each number field and then apply valid_num_udf().
I was trying something like this, but I don't know what the correct syntax for this is in Scala.
val newDf = Df.withColumn("VALID_CONTACT", myUdf($"CONTACT"))

// This part is really really wrong, but don't know better
def myUdf = udf[Seq[Map[String, String]], Seq[Map[String, String]]] {
  inputSeq => inputSeq.map(_.get("number") => valid_num_udf(_.get("number")))
}
Can anyone tell me how to access just that one single field in the map, leaving the other fields of the map untouched?
Update: The Schema of the DataFrame would be
root
|-- NAME: string (nullable = true)
|-- CONTACT: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
or
org.apache.spark.sql.types.StructType = StructType(StructField(NAME,StringType,true), StructField(CONTACT,ArrayType(MapType(StringType,StringType,true),true),true))
Answer
A udf function requires its arguments to be passed as columns, which are converted to primitive data types via serialization and deserialization. So by the time the column values reach the body of the udf function, they are already primitive types, and you cannot call another udf function from inside a udf function unless you convert the primitives back into column types.

What you can do, instead of defining and calling another udf function, is simply define a plain function and call that function from within the udf function:
import org.apache.spark.sql.functions._

// Plain Scala function, not a udf: mask any number shorter than 6 characters.
def valid_num_udf(number: String) =
  if (number.length < 6) "0000" else number

// The udf rebuilds each map, applying the plain function to the number field.
def myUdf = udf((inputSeq: Seq[Map[String, String]]) => {
  inputSeq.map(x => Map("number" -> valid_num_udf(x("number")), "type" -> x("type")))
})
and then call the udf from the withColumn API:

val newDf = Df.withColumn("VALID_CONTACT", myUdf($"Contact"))
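Note that the udf above rebuilds the map with the number and type keys hardcoded. If the maps may carry other keys, a variant that rewrites only the number entry and passes every other key through untouched could look like this (a sketch on plain Scala collections; `maskNumbers` is a hypothetical name, and the same body can be wrapped in `udf(...)`):

```scala
// Rewrite only the "number" entry of each map; any other key/value
// pairs are kept exactly as they are.
def maskNumbers(contact: Seq[Map[String, String]]): Seq[Map[String, String]] =
  contact.map { entry =>
    entry.map {
      case ("number", v) if v.length < 6 => "number" -> "0000"
      case other                         => other
    }
  }

val alan = Seq(
  Map("number" -> "12345", "type" -> "home"),
  Map("number" -> "87878787", "type" -> "mobile")
)
println(maskNumbers(alan))
```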