Aggregation on an array of structs in a map inside a Spark dataframe

Problem Description

I apologize for the verbose title, but I really couldn't come up with something better.

Basically, I have data with the following schema:

 |-- id: string (nullable = true)
 |-- mainkey: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- price: double (nullable = true)
 |    |    |    |-- recordtype: string (nullable = true)

Let me use the following example data:

{"id":1, "mainkey":{"key1":[{"price":0.01,"recordtype":"BID"}],"key2":[{"price":4.3,"recordtype":"FIXED"}],"key3":[{"price":2.0,"recordtype":"BID"}]}}
{"id":2, "mainkey":{"key4":[{"price":2.50,"recordtype":"BID"}],"key5":[{"price":2.4,"recordtype":"BID"}],"key6":[{"price":0.19,"recordtype":"BID"}]}}

For each of the two records above, I want to calculate the mean of all prices where the recordtype is "BID". So, for the first record (with "id":1), there are 2 such bids, with prices 0.01 and 2.0, so the mean rounded to 2 decimal places is 1.01. For the second record (with "id":2), there are 3 bids, with prices 2.5, 2.4 and 0.19, and the mean is 1.70. So I want the following output:

+---+---------+
| id|meanvalue|
+---+---------+
|  1|     1.01|
|  2|      1.7|
+---+---------+

The following code does it:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types._

case class RecordValue(price: Double, recordtype: String)

val exSchema = new StructType().add("id", StringType).add("mainkey",
  MapType(StringType, ArrayType(new StructType()
    .add("price", DoubleType).add("recordtype", StringType), true)))
val exJsonDf = spark.read.schema(exSchema).json("file:///data/json_example")

// Explode the map into (key, value) rows, then flatten each struct array into
// one RecordValue per element (Dataset.explode is deprecated but works in 2.x).
val explodeExJson = exJsonDf.select($"id", explode($"mainkey")).explode($"value") {
  case Row(recordValue: Seq[Row] @unchecked) => recordValue.map { record =>
    RecordValue(record.getAs[Double]("price"), record.getAs[String]("recordtype"))
  }
}.cache()

val filteredExJson = explodeExJson.filter($"recordtype"==="BID")

val aggExJson = filteredExJson.groupBy("id").agg(round(mean("price"),2).alias("meanvalue")) 
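
Calling aggExJson.show() at this point prints the two-row table shown above.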



The problem is that it uses an "expensive" explode operation and it becomes a problem when I am dealing with lots of data, especially when there can be a lot of keys in the map.

Please let me know if you can think of a simpler solution, using UDFs or otherwise. Please also keep in mind that I am a beginner to Spark, and hence may have missed some stuff that would be obvious to you.

Any help would be really appreciated. Thanks in advance!

Solution

If the aggregation is limited to a single Row, a udf will solve this:

import org.apache.spark.util.StatCounter
import org.apache.spark.sql.functions.{round, udf}
import org.apache.spark.sql.Row

// The whole map arrives in the udf as Map[String, Seq[Row]], so the BID
// prices can be filtered and averaged per row without any explode.
val meanPrice = udf((map: Map[String, Seq[Row]]) => {
  val prices = map.values
    .flatten
    .filter(_.getAs[String]("recordtype") == "BID")
    .map(_.getAs[Double]("price"))
  StatCounter(prices).mean
})

// df is the input dataframe (exJsonDf above); round/alias match the requested output.
df.select($"id", round(meanPrice($"mainkey"), 2).alias("meanvalue"))
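
For completeness, a possible explode-free variant: on Spark 2.4 or later, the same per-row mean can be computed with the built-in higher-order SQL functions (map_values, flatten, filter, aggregate), keeping the work inside the SQL engine instead of a udf. This is only a sketch under that version assumption, reusing exJsonDf and the column names from the question:

import org.apache.spark.sql.functions.expr

// Sketch (assumes Spark 2.4+): collect all BID structs from the map values,
// then sum their prices and divide by the count.
val bids = "filter(flatten(map_values(mainkey)), x -> x.recordtype = 'BID')"

exJsonDf.select(
  $"id",
  expr(s"round(aggregate($bids, cast(0.0 as double), (acc, x) -> acc + x.price) / size($bids), 2)")
    .alias("meanvalue"))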
