Combine value part of Tuple2 which is a map, into single map grouping by the key of Tuple2

This article describes how to combine the value part of a Tuple2, which is a map, into a single map grouped by the key of the Tuple2.

Problem description

I am doing this in Scala and Spark.

I have a Dataset of Tuple2, as Dataset[(String, Map[String, String])].

Below is a sample of the values in the dataset.

(A, {1->100, 2->200, 3->100})
(B, {1->400, 4->300, 5->900})
(C, {6->100, 4->200, 5->100})
(B, {1->500, 9->300, 11->900})
(C, {7->100, 8->200, 5->800})

If you notice, the key (first element) of the Tuple can be repeated. Also, the maps belonging to the same Tuple key can contain duplicate keys across rows (in the second part of the Tuple2).

I want to create a final Dataset[(String, Map[String, String])]. The output should be as below (from the above example). Note that when a map key is duplicated, the last key's value is retained (check B and C) and the earlier value for the same key under B and C is discarded.

(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})
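The requested merge semantics are exactly what Scala's Map ++ operator gives: when both maps contain the same key, the right-hand (later) value wins. Here is a minimal plain-Scala sketch (no Spark) of the intended behavior, using the sample data above:

```scala
// Plain-Scala sketch of the intended merge, assuming the sample data above
// with String keys/values as in the question.
val data = Seq(
  ("A", Map("1" -> "100", "2" -> "200", "3" -> "100")),
  ("B", Map("1" -> "400", "4" -> "300", "5" -> "900")),
  ("C", Map("6" -> "100", "4" -> "200", "5" -> "100")),
  ("B", Map("1" -> "500", "9" -> "300", "11" -> "900")),
  ("C", Map("7" -> "100", "8" -> "200", "5" -> "800"))
)

val merged: Map[String, Map[String, String]] =
  data.groupBy(_._1).map { case (k, rows) =>
    // ++ keeps the right-hand map's value on duplicate keys,
    // i.e. the later row's value wins (B gets 1->500, C gets 5->800)
    k -> rows.map(_._2).reduce(_ ++ _)
  }
```

Since groupBy preserves the traversal order of the rows within each group, folding with ++ retains the value from the last occurrence of each map key, matching the expected output.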

Please let me know if any clarification is required.

Answer

Using an RDD:

val rdd = sc.parallelize(
    Seq(("A", Map(1->100, 2->200, 3->100)),
        ("B", Map(1->400, 4->300, 5->900)),
        ("C", Map(6->100, 4->200, 5->100)),
        ("B", Map(1->500, 9->300, 11->900)),
        ("C", Map(7->100, 8->200, 5->800)))
)

// ++ merges two maps; for a duplicate key, the right-hand (later) value wins
rdd.reduceByKey((a, b) => a ++ b).collect()

// Array((A,Map(1 -> 100, 2 -> 200, 3 -> 100)), (B,Map(5 -> 900, 1 -> 500, 9 -> 300, 11 -> 900, 4 -> 300)), (C,Map(5 -> 800, 6 -> 100, 7 -> 100, 8 -> 200, 4 -> 200)))

And using a DataFrame:

import org.apache.spark.sql.functions._

val df = spark.createDataFrame(
    Seq(("A", Map(1->100, 2->200, 3->100)),
        ("B", Map(1->400, 4->300, 5->900)),
        ("C", Map(6->100, 4->200, 5->100)),
        ("B", Map(1->500, 9->300, 11->900)),
        ("C", Map(7->100, 8->200, 5->800)))
).toDF("key", "map")

// Spark 3.0+: keep the last value when map_from_entries encounters duplicate keys
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

df.withColumn("map", map_entries($"map"))
  .groupBy("key").agg(collect_list($"map").alias("map"))
  .withColumn("map", flatten($"map"))
  .withColumn("map", map_from_entries($"map")).show(false)

+---+---------------------------------------------------+
|key|map                                                |
+---+---------------------------------------------------+
|B  |[1 -> 500, 4 -> 300, 5 -> 900, 9 -> 300, 11 -> 900]|
|C  |[6 -> 100, 4 -> 200, 5 -> 800, 7 -> 100, 8 -> 200] |
|A  |[1 -> 100, 2 -> 200, 3 -> 100]                     |
+---+---------------------------------------------------+

