Extract or filter MapType of Spark DataFrame

Question

I have a DataFrame that contains various columns. One column contains a Map[Integer, Integer[]]. It looks like {2345 -> [1,34,2]; 543 -> [12,3,2,5]; 2 -> [3,4]}. Now what I need to do is filter out some keys. I have a Set of Integers (javaIntSet) in Java with which I should filter, such that

col(x).keySet.isin(javaIntSet)

I.e. the above map should only contain the keys 2 and 543, but not the others, and should look like {543 -> [12,3,2,5]; 2 -> [3,4]} after filtering.

Documentation on how to use the Java Column class is sparse. How do I extract col(x) so that I can filter it in Java and then replace the cell data with the filtered map? Or are there any useful functions of Column that I am overlooking? Can I write a UDF2<Map<Integer, Integer[]>, Set<Integer>, Map<Integer, Integer[]>>? I can write a UDF1<String, String>, but I am not so sure how it works with more complex parameters.

Generally the javaIntSet has only a dozen or so values, and usually fewer than 100. The Map usually also has only a handful of entries (usually 0-5).

I have to do this in Java (unfortunately), but I am familiar with Scala. A Scala answer that I can translate to Java myself would already be very helpful.

Answer

You don't need a UDF. It might be cleaner with one, but you could just as easily do it with DataFrame.explode:

import spark.implicits._   // pre-imported in spark-shell; needed for toDF and the $"..." syntax

case class MapTest(id: Int, map: Map[Int, Int])
val mapDf = Seq(
  MapTest(1, Map((1, 3), (2, 10), (3, 2))),
  MapTest(2, Map((1, 12), (2, 333), (3, 543)))
).toDF("id", "map")

mapDf.show
+---+--------------------+
| id|                 map|
+---+--------------------+
|  1|Map(1 -> 3, 2 -> ...|
|  2|Map(1 -> 12, 2 ->...|
+---+--------------------+

Then you can use explode:

import org.apache.spark.sql.Row

mapDf.explode($"map") {
  case Row(map: Map[Int, Int] @unchecked) =>
    val newMap = map.filter(m => m._1 != 1)   // <-- do filtering here
    Seq(Tuple1(newMap))
}.show
+---+--------------------+--------------------+
| id|                 map|                  _1|
+---+--------------------+--------------------+
|  1|Map(1 -> 3, 2 -> ...|Map(2 -> 10, 3 -> 2)|
|  2|Map(1 -> 12, 2 ->...|Map(2 -> 333, 3 -...|
+---+--------------------+--------------------+
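
As an aside, DataFrame.explode was deprecated in Spark 2.0 in favor of typed operations. On a newer version, a rough equivalent is a typed map over the MapTest case class above; this is a sketch under that assumption, not a drop-in for every Spark version:

mapDf.as[MapTest]                 // back to a typed Dataset
  .map(mt => mt.copy(map = mt.map.filter { case (k, _) => k != 1 }))
  .show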

If you did want to do the UDF, it would look like this:

import org.apache.spark.sql.functions.udf

val mapFilter = udf[Map[Int, Int], Map[Int, Int]](map => {
  val newMap = map.filter(m => m._1 != 1)   // <-- do filtering here
  newMap
})

mapDf.withColumn("newMap", mapFilter($"map")).show
+---+--------------------+--------------------+
| id|                 map|              newMap|
+---+--------------------+--------------------+
|  1|Map(1 -> 3, 2 -> ...|Map(2 -> 10, 3 -> 2)|
|  2|Map(1 -> 12, 2 ->...|Map(2 -> 333, 3 -...|
+---+--------------------+--------------------+
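
For your actual use case, keeping only the keys contained in javaIntSet, the filter body tests set membership instead of a hard-coded key. A sketch, assuming javaIntSet is a java.util.Set<Integer> available on the driver (keepKeys and setFilter are illustrative names); the UDF simply closes over the converted set, so no UDF2 is needed:

import scala.collection.JavaConverters._
import org.apache.spark.sql.functions.udf

// Convert the Java set once on the driver; the closure captures keepKeys,
// so it ships to the executors together with the UDF.
val keepKeys: Set[Int] = javaIntSet.asScala.map(_.intValue).toSet

// For your map<int, array<int>> column, the signature would be
// Map[Int, Seq[Int]] instead of Map[Int, Int].
val setFilter = udf((map: Map[Int, Int]) => map.filter { case (k, _) => keepKeys.contains(k) })

mapDf.withColumn("newMap", setFilter($"map")).show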

DataFrame.explode is a little more complicated, but ultimately more flexible. For example, you could divide the original row into two rows: one containing the map with the unwanted elements filtered out, the other containing the reverse, a map of the elements that were filtered.
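
A hedged sketch of that idea, reusing the hard-coded key-1 filter from above: the callback returns two labelled rows per input row, the kept map and its complement:

mapDf.explode($"map") {
  case Row(map: Map[Int, Int] @unchecked) =>
    val (dropped, kept) = map.partition { case (k, _) => k == 1 }
    Seq(("kept", kept), ("dropped", dropped))   // two output rows per input row
}.show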
