在 Scala 数据框中合并地图 [英] Merge Maps in scala dataframe

查看:47
本文介绍了在 Scala 数据框中合并地图的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含 col1、col2、col3 列的数据框.col1,col2 是字符串.col3 是下面定义的 Map[String,String]

I have a dataframe with columns col1,col2,col3. col1,col2 are strings. col3 is a Map[String,String] defined below

 |-- col3: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

我已按 col1,col2 分组并使用 collect_list 聚合以获取地图数组并存储在 col4 中.

I have grouped by col1,col2 and aggregated using collect_list to get an Array of Maps and stored in col4.

 df.groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4"))

 |-- col4: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

但是我想将 col4 作为单个地图将所有地图组合在一起.目前我有:

However I would like to get col4 as a single map with all the maps combined. Currently I have:

[[a->a1,b->b1],[c->c1]]

预期输出

[a->a1,b->b1,c->c1]

最好使用 udf?

感谢任何帮助.谢谢.

推荐答案

您可以使用 聚合map_concat:

import org.apache.spark.sql.functions.{expr, collect_list}

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")

val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)

// +---+------------------------------+
// |id |merged_data                   |
// +---+------------------------------+
// |1  |[k1 -> v1, k2 -> v3, k3 -> v3]|
// |2  |[k4 -> v4, k6 -> v6, k5 -> v5]|
// +---+------------------------------+

使用map_concat,我们通过aggregate 内置函数连接data 列的所有Map 项这允许我们将聚合应用于列表的对.

With map_concat we concatenate all the Map items of the data column via the aggregate build-in function which allows us to apply the aggregation to the pairs of the list.

注意:map_concat 在 Spark 2.4.5 上的当前实现,它允许相同的键共存.这很可能是一个错误,因为它不是官方的预期行为 文档.请注意这一点.

如果你想避免这种情况,你也可以选择 UDF:

If you want to avoid such a case you can also go for a UDF:

import org.apache.spark.sql.functions.{collect_list, udf}

val mergeMapUDF = udf((data: Seq[Map[String, String]]) => data.reduce(_ ++ _))

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeMapUDF($"data").as("merged_data"))
  .show(false)

这篇关于在 Scala 数据框中合并地图的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆