在 Scala 数据框中合并地图 [英] Merge Maps in scala dataframe
问题描述
我有一个包含 col1、col2、col3 列的数据框.col1,col2 是字符串.col3 是下面定义的 Map[String,String]
I have a dataframe with columns col1,col2,col3. col1,col2 are strings. col3 is a Map[String,String] defined below
|-- col3: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
我已按 col1,col2 分组并使用 collect_list 聚合以获取地图数组并存储在 col4 中.
I have grouped by col1,col2 and aggregated using collect_list to get an Array of Maps and stored in col4.
df.groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4"))
|-- col4: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
但是我想将 col4 作为单个地图将所有地图组合在一起.目前我有:
However I would like to get col4 as a single map with all the maps combined. Currently I have:
[[a->a1,b->b1],[c->c1]]
预期输出
[a->a1,b->b1,c->c1]
最好使用 udf?
感谢任何帮助.谢谢.
推荐答案
您可以使用 聚合 和 map_concat:
import org.apache.spark.sql.functions.{expr, collect_list}
val df = Seq(
(1, Map("k1" -> "v1", "k2" -> "v3")),
(1, Map("k3" -> "v3")),
(2, Map("k4" -> "v4")),
(2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")
val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")
df.groupBy("id").agg(collect_list("data").as("data"))
.select($"id", mergeExpr.as("merged_data"))
.show(false)
// +---+------------------------------+
// |id |merged_data |
// +---+------------------------------+
// |1 |[k1 -> v1, k2 -> v3, k3 -> v3]|
// |2 |[k4 -> v4, k6 -> v6, k5 -> v5]|
// +---+------------------------------+
使用map_concat
,我们通过aggregate
内置函数连接data 列的所有Map
项这允许我们将聚合应用于列表的对.
With map_concat
we concatenate all the Map
items of the data column via the aggregate
build-in function which allows us to apply the aggregation to the pairs of the list.
注意:map_concat 在 Spark 2.4.5 上的当前实现,它允许相同的键共存.这很可能是一个错误,因为它不是官方的预期行为 文档.请注意这一点.
如果你想避免这种情况,你也可以选择 UDF:
If you want to avoid such a case you can also go for a UDF:
import org.apache.spark.sql.functions.{collect_list, udf}
val mergeMapUDF = udf((data: Seq[Map[String, String]]) => data.reduce(_ ++ _))
df.groupBy("id").agg(collect_list("data").as("data"))
.select($"id", mergeMapUDF($"data").as("merged_data"))
.show(false)
这篇关于在 Scala 数据框中合并地图的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!