Spark - How to apply a udf over a single field in a Seq[Map<String, String>]


Question

I have a DataFrame with two columns, of types String and Seq[Map[String, String]]. Something like:

Name    Contact
Alan    [Map(number -> 12345, type -> home), Map(number -> 87878787, type -> mobile)]
Ben     [Map(number -> 94837593, type -> job), Map(number -> 346, type -> home)]

So what I need is to apply a udf over the number field of each Map[String, String] in the array. This udf would basically replace with 0000 any number whose length is less than 6. Something like this:

def valid_num_udf =
  udf((numb: String) =>
    if (numb.length < 6) "0000"
    else numb
  )

The expected result would be something like:

NAME    CONTACT
Alan    [Map(number -> 0000, type -> home), Map(number -> 87878787, type -> mobile)]
Ben     [Map(number -> 94837593, type -> job), Map(number -> 0000, type -> home)]

What I would like is to use another udf to access each number field and then apply valid_num_udf().

I was trying something like this, but I don't know the correct syntax for it in Scala:

val newDf = Df.withColumn("VALID_CONTACT", myUdf($"CONTACT"))

//This part is really really wrong, but don't know better
def myUdf = udf[Seq[Map[String, String]], Seq[Map[String, String]]] { 
    inputSeq => inputSeq.map(_.get("number") => valid_num_udf(_.get("number")))
}

Can anyone tell me how to access just that one single field in the map, leaving the other fields of the map untouched?

Update: The Schema of the DataFrame would be

root
 |-- NAME: string (nullable = true)
 |-- CONTACT: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

org.apache.spark.sql.types.StructType = StructType(StructField(NAME,StringType,true), StructField(CONTACT,ArrayType(MapType(StringType,StringType,true),true),true))

Answer

A udf function requires its arguments to be passed as columns, which are converted to primitive data types through serialization and deserialization. So by the time the values reach the body of the udf they are already primitive types, and calling another udf function from within a udf function does not work unless you convert those primitives back into column types.

Instead of defining and calling a second udf function, simply define a plain Scala function and call it from inside the udf function:

import org.apache.spark.sql.functions._

// Despite the name, this is a plain Scala function, not a udf
def valid_num_udf(number: String): String =
  if (number.length < 6) "0000" else number

def myUdf = udf((inputSeq: Seq[Map[String, String]]) =>
  inputSeq.map(x => Map("number" -> valid_num_udf(x("number")), "type" -> x("type")))
)
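
Note that myUdf rebuilds each map with hard-coded number and type keys. If the maps might carry additional keys, a more general variant (a sketch, not part of the original answer; myGeneralUdf is a hypothetical name) rewrites only the number entry and passes every other pair through untouched:

def myGeneralUdf = udf((inputSeq: Seq[Map[String, String]]) =>
  inputSeq.map(_.map {
    case ("number", v) => "number" -> valid_num_udf(v) // rewrite only this entry
    case other         => other                        // keep all other pairs as-is
  })
)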

and then call the udf function from the withColumn API:

val newDf = Df.withColumn("VALID_CONTACT", myUdf($"CONTACT"))
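
For reference, a minimal end-to-end sketch (assuming a local SparkSession and the definitions above; the sample data mirrors the question):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("valid-contact").getOrCreate()
import spark.implicits._

val Df = Seq(
  ("Alan", Seq(Map("number" -> "12345",    "type" -> "home"),
               Map("number" -> "87878787", "type" -> "mobile"))),
  ("Ben",  Seq(Map("number" -> "94837593", "type" -> "job"),
               Map("number" -> "346",      "type" -> "home")))
).toDF("NAME", "CONTACT")

Df.withColumn("VALID_CONTACT", myUdf($"CONTACT")).show(false)
// "12345" (length 5) and "346" become "0000"; the longer numbers pass through unchanged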
