Spark (Scala) filter array of structs without explode

Question

I have a dataframe with a key and a column containing an array of structs. Each row contains a column that looks something like this:

[
    {"id" : 1, "someProperty" : "xxx", "someOtherProperty" : "1", "propertyToFilterOn" : 1},
    {"id" : 2, "someProperty" : "yyy", "someOtherProperty" : "223", "propertyToFilterOn" : 0},
    {"id" : 3, "someProperty" : "zzz", "someOtherProperty" : "345", "propertyToFilterOn" : 1}
]

Now I want to do two things:

  1. "propertyToFilterOn" = 1上的过滤器
  2. 对其他应用一些逻辑 属性-例​​如连接
  1. Filter on "propertyToFilterOn" = 1
  2. Apply some logic on other properties - for example concatenate

The result should be:

[
    {"id" : 1, "newProperty" : "xxx_1"},
    {"id" : 3, "newProperty" : "zzz_345"}
]

I know how to do it with explode, but explode also requires a groupBy on the key when putting the array back together. As this is a streaming DataFrame, that groupBy would force me to put a watermark on it, which I am trying to avoid.
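
For reference, here is a minimal sketch of the explode-based version I am trying to avoid (df, key, and your_array are placeholder names):

import org.apache.spark.sql.functions._

// Explode the array, filter rows, rebuild the structs, then regroup by key.
// On a streaming DataFrame the groupBy/agg step forces a watermark.
val viaExplode = df
  .select(col("key"), explode(col("your_array")).as("s"))
  .filter(col("s.propertyToFilterOn") === 1)
  .groupBy("key")
  .agg(collect_list(struct(
    col("s.id"),
    concat(col("s.someProperty"), lit("_"), col("s.someOtherProperty")).as("newProperty")
  )).as("result"))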

Is there any other way to achieve this without using explode? I am sure there is some Scala magic that can achieve it!

Thanks!

Answer

With Spark 2.4 came many higher-order functions for arrays (see https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html).

import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

// Build a sample DataFrame with one array-of-structs column per key
val dataframe = Seq(
  ("a", 1, "xxx", "1", 1),
  ("a", 2, "yyy", "223", 0),
  ("a", 3, "zzz", "345", 1)
).toDF("grouping_key", "id", "someProperty", "someOtherProperty", "propertyToFilterOn")
  .groupBy("grouping_key")
  .agg(collect_list(struct("id", "someProperty", "someOtherProperty", "propertyToFilterOn")).as("your_array"))

dataframe.select("your_array").show(false)

+----------------------------------------------------+
|your_array                                          |
+----------------------------------------------------+
|[[1, xxx, 1, 1], [2, yyy, 223, 0], [3, zzz, 345, 1]]|
+----------------------------------------------------+

You can filter elements within an array using the filter higher-order function like this:

// Keep only the structs whose propertyToFilterOn equals 1 - no explode needed
val filteredDataframe = dataframe
  .select(expr("filter(your_array, your_struct -> your_struct.propertyToFilterOn == 1)").as("filtered_arrays"))

filteredDataframe.show(false)

+----------------------------------+
|filtered_arrays                   |
+----------------------------------+
|[[1, xxx, 1, 1], [3, zzz, 345, 1]]|
+----------------------------------+
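
If you need to keep the grouping key next to the filtered array, the same expression works with withColumn instead of select (a small sketch using the column names above):

// Same filter, but keeping every existing column and adding the filtered array
val filteredWithKey = dataframe.withColumn(
  "filtered_arrays",
  expr("filter(your_array, your_struct -> your_struct.propertyToFilterOn == 1)")
)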

For the "other logic" you're talking about, you should be able to use the transform higher-order array function, like so:

// Note: the struct built here has a single unnamed field holding the concatenation
val transformedDataframe = filteredDataframe
  .select(expr("transform(filtered_arrays, your_struct -> struct(concat(your_struct.someProperty, '_', your_struct.someOtherProperty)))"))

However, there are issues with returning structs from the transform function, as described in this post, so you are best off using the Dataset API for the transform, like so:

// The array elements are matched to the case class fields by name;
// id is an Int in the source data, so the case class must use Int
case class YourStruct(id: Int, someProperty: String, someOtherProperty: String)
case class YourArray(filtered_arrays: Seq[YourStruct])

case class YourNewStruct(id: Int, newProperty: String)

val transformedDataset = filteredDataframe
  .as[YourArray]
  .map(_.filtered_arrays.map(ys => YourNewStruct(ys.id, ys.someProperty + "_" + ys.someOtherProperty)))

transformedDataset.show(false)

+--------------------------+
|value                     |
+--------------------------+
|[[1, xxx_1], [3, zzz_345]]|
+--------------------------+
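
Since both the filter higher-order function and the typed map run row by row, there is no shuffle, groupBy, or watermark involved, which is why this also works on a streaming DataFrame. Putting the two steps together (a sketch reusing the case classes above):

// Filter with a higher-order function, then reshape each struct with the
// typed API, all without exploding the array
val result = dataframe
  .select(expr("filter(your_array, your_struct -> your_struct.propertyToFilterOn == 1)").as("filtered_arrays"))
  .as[YourArray]
  .map(_.filtered_arrays.map(ys => YourNewStruct(ys.id, ys.someProperty + "_" + ys.someOtherProperty)))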
