以嵌套结构作为输入参数的 Spark UDF [英] Spark UDF with nested structure as input parameter

查看:69
本文介绍了以嵌套结构作为输入参数的 Spark UDF的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用以下数据对 df 进行操作:

I'm trying to operate on a df with the following data:

+---+----------------------------------------------------+
|ka |readingsWFreq                                       |
+---+----------------------------------------------------+
|列  |[[[列,つ],220], [[列,れっ],353], [[列,れつ],47074]]   |
|制  |[[[制,せい],235579]]                                |

以及以下结构:

root
 |-- ka: string (nullable = true)
 |-- readingsWFreq: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- furigana: struct (nullable = true)
 |    |    |    |-- _1: string (nullable = true)
 |    |    |    |-- _2: string (nullable = true)
 |    |    |-- Occ: long (nullable = true)

我的目标是将 readingsWFreq 的值分成三个不同的列.为此,我尝试使用 udfs 如下:

My goal is to split readingsWFreq's values into three different columns. For that purpose I've tried to use udfs as follows:

val uExtractK = udf((kWFreq:Seq[((String, String), Long)]) => kWFreq.map(_._1._1))
val uExtractR = udf((kWFreq:Seq[((String, String), Long)]) => kWFreq.map(_._1._2))
val uExtractN = udf((kWFreq:Seq[((String, String), Long)]) => kWFreq.map(_._2)

val df2 = df.withColumn("K", uExtractK('readingsWFreq))
            .withColumn("R", uExtractR('readingsWFreq))
            .withColumn("N", uExtractN('readingsWFreq))
            .drop('readingsWFreq)

但是,我收到与 udfs 的输入参数相关的异常:

However, I'm getting an exception related to the input parameter of the udfs:

[error] (run-main-0) org.apache.spark.sql.AnalysisException: cannot resolve
'UDF(readingsWFreq)' due to data type mismatch: argument 1 requires
 array<struct<_1:struct<_1:string,_2:string>,_2:bigint>> type, however, 
'`readingsWFreq`' is of
 array<struct<furigana:struct<_1:string,_2:string>,Occ:bigint>> type.;;

我的问题是,我怎样才能操作数据框以得到以下结果?

+---+----------------------------------------------------+
|ka |K            |R               |N                    |
+---+----------------------------------------------------+
|列  |[列, 列, 列] | [つ, れっ, れつ] | [220, 353, 47074]   |
|制  |[制]        | [せい]          | [235579]            |

推荐答案

你可以先explode外面的array,然后得到每个值,然后再group 稍后并使用 collect_list 收集为列表.

You could explode the outer array at first and get each value and again group later and collect as a list with collect_list.

val df1 = df.withColumn("readingsWFreq", explode($"readingsWFreq"))

df1.select("ka", "readingsWFreq.furigana.*", "readingsWFreq.Occ")
    .groupBy("ka").agg(collect_list("_1").as("K"),
                  collect_list("_2").as("R"),
                  collect_list("Occ").as("N")
     )

希望这会有所帮助!

这篇关于以嵌套结构作为输入参数的 Spark UDF的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆