extract or filter MapType of Spark DataFrame
Question
I have a DataFrame that contains various columns.
One column contains a Map[Integer,Integer[]].
It looks like { 2345 -> [1,34,2]; 543 -> [12,3,2,5]; 2 -> [3,4]}
Now what I need to do is filter out some keys.
I have a Set of Integers (javaIntSet) in Java with which I should filter such that
col(x).keySet.isin(javaIntSet)
i.e. after filtering, the above map should only contain the keys 2 and 543, not the other two, and should look like {543 -> [12,3,2,5]; 2 -> [3,4]}.
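To make the intended result concrete, the desired key filtering can be sketched in plain Java, independent of Spark (a minimal illustration only; the class and method names are made up for this sketch):

```java
import java.util.*;
import java.util.stream.*;

public class MapKeyFilter {
    // Keep only the entries whose key is contained in keysToKeep.
    static Map<Integer, Integer[]> filterKeys(Map<Integer, Integer[]> map,
                                              Set<Integer> keysToKeep) {
        return map.entrySet().stream()
                .filter(e -> keysToKeep.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // The keys to keep (the javaIntSet from the question)
        Set<Integer> javaIntSet = new HashSet<>(Arrays.asList(2, 543));

        // The example cell value: Map<Integer, Integer[]>
        Map<Integer, Integer[]> cell = new HashMap<>();
        cell.put(2345, new Integer[]{1, 34, 2});
        cell.put(543, new Integer[]{12, 3, 2, 5});
        cell.put(2, new Integer[]{3, 4});

        Map<Integer, Integer[]> filtered = filterKeys(cell, javaIntSet);
        System.out.println(filtered.keySet()); // only keys 2 and 543 remain
    }
}
```

The remaining question is how to apply such a function to a DataFrame column, which is what the answer below addresses.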
Documentation on how to use the Java Column class is sparse.
How do I extract col(x) so that I can filter it in Java and then replace the cell data with the filtered map? Or are there any useful Column functions I am overlooking?
Can I write a UDF2<Map<Integer, Integer[]>, Set<Integer>, Map<Integer, Integer[]>>?
I can write a UDF1<String, String>, but I am not so sure how it works with more complex parameters.
Generally, javaIntSet has only a dozen values and usually fewer than 100. The map usually also has only a handful of entries (usually 0-5).
I have to do this in Java (unfortunately), but I am familiar with Scala. A Scala answer that I can translate to Java myself would already be very helpful.
Answer
You don't need a UDF. It might be cleaner with one, but you could just as easily do it with DataFrame.explode:
case class MapTest(id: Int, map: Map[Int, Int])

val mapDf = Seq(
  MapTest(1, Map((1, 3), (2, 10), (3, 2))),
  MapTest(2, Map((1, 12), (2, 333), (3, 543)))
).toDF("id", "map")

mapDf.show
+---+--------------------+
| id| map|
+---+--------------------+
| 1|Map(1 -> 3, 2 -> ...|
| 2|Map(1 -> 12, 2 ->...|
+---+--------------------+
Then you can use explode:
mapDf.explode($"map") {
  case Row(map: Map[Int, Int] @unchecked) => {
    val newMap = map.filter(m => m._1 != 1) // <-- do filtering here
    Seq(Tuple1(newMap))
  }
}.show
+---+--------------------+--------------------+
| id| map| _1|
+---+--------------------+--------------------+
| 1|Map(1 -> 3, 2 -> ...|Map(2 -> 10, 3 -> 2)|
| 2|Map(1 -> 12, 2 ->...|Map(2 -> 333, 3 -...|
+---+--------------------+--------------------+
If you did want to do the UDF, it would look like this:
val mapFilter = udf[Map[Int, Int], Map[Int, Int]](map => {
  val newMap = map.filter(m => m._1 != 1) // <-- do filtering here
  newMap
})

mapDf.withColumn("newMap", mapFilter($"map")).show
+---+--------------------+--------------------+
| id| map| newMap|
+---+--------------------+--------------------+
| 1|Map(1 -> 3, 2 -> ...|Map(2 -> 10, 3 -> 2)|
| 2|Map(1 -> 12, 2 ->...|Map(2 -> 333, 3 -...|
+---+--------------------+--------------------+
DataFrame.explode is a little more complicated, but ultimately more flexible. For example, you could divide the original row into two rows -- one containing the map with the elements filtered out, the other a map with the reverse -- the elements that were filtered.
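That two-row idea boils down to partitioning the map's entries by the filter predicate. The partitioning step itself can be sketched in plain Java (a hypothetical helper, not part of the answer above; the Spark explode wiring is omitted):

```java
import java.util.*;
import java.util.stream.*;

public class MapPartitionSketch {
    // Split a map into two maps: under key true, the entries whose key is in
    // keysToKeep; under key false, the entries that would be filtered away.
    static Map<Boolean, Map<Integer, Integer[]>> partitionByKey(
            Map<Integer, Integer[]> map, Set<Integer> keysToKeep) {
        return map.entrySet().stream()
                .collect(Collectors.partitioningBy(
                        e -> keysToKeep.contains(e.getKey()),
                        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        Set<Integer> keysToKeep = new HashSet<>(Arrays.asList(2, 543));
        Map<Integer, Integer[]> cell = new HashMap<>();
        cell.put(2345, new Integer[]{1, 34, 2});
        cell.put(543, new Integer[]{12, 3, 2, 5});
        cell.put(2, new Integer[]{3, 4});

        Map<Boolean, Map<Integer, Integer[]>> parts = partitionByKey(cell, keysToKeep);
        System.out.println(parts.get(true).keySet());  // kept entries: 2 and 543
        System.out.println(parts.get(false).keySet()); // dropped entries: 2345
    }
}
```

Each of the two resulting maps would then become one of the two emitted rows.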