Spark dataframes groupby into list


Problem description

I am trying to do some analysis on sets. I have a sample data set that looks like this:

orders.json

{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}

All it is, is a single field that is a list of numbers that represent IDs.

Here is the Spark script I am trying to run:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")

val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)

val dataframe = sql.read.json("orders.json")

// expanded: every ordered pair of IDs that appear in the same original set
val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

// grouped: drop pairs of an ID with itself, then count each unique (item1, item2) pair
val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

val recs = grouped
  .groupBy("item1")

Creating expanded and grouped is fine; in a nutshell, expanded is a list of all the possible sets of two IDs where the two IDs were in the same original set. grouped filters out IDs that were matched with themselves, then groups together all the unique pairs of IDs and produces a count for each. The schema and a data sample of grouped are:

root
 |-- item1: long (nullable = true)
 |-- item2: long (nullable = true)
 |-- count: long (nullable = false)

[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...

So, my question is: how do I now group on the first item in each result so that I have a list of tuples? For the example data above, I would expect something similar to this:

[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]

As you can see in my script with recs, I thought you would start by doing a groupBy on 'item1', which is the first item in each row. But after that you are left with a GroupedData object that has very limited operations on it. Really, you are only left with aggregations like sum, avg, etc. I just want to list the tuples from each result.

I could easily use RDD functions at this point, but that departs from using DataFrames. Is there a way to do this with the DataFrame functions?

Recommended answer

You can build that with org.apache.spark.sql.functions (collect_list and struct), available since Spark 1.6:

val recs = grouped.groupBy('item1)
  .agg(collect_list(struct('item2, 'count)).as("set"))


+-----+----------------------------+
|item1|set                         |
+-----+----------------------------+
|1    |[[5,3], [4,1], [3,2], [2,2]]|
|2    |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+
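
For the recs line above to compile, collect_list, struct and the Symbol column syntax need to be in scope. A minimal sketch of the required imports, assuming the SQLContext from the question is still in scope as sql:

import org.apache.spark.sql.functions.{collect_list, struct}
import sql.implicits._  // enables the 'item1 / 'item2 / 'count column syntax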

You can also use collect_set.

For information, tuples don't exist in DataFrames. The closest structure is struct, since it is the equivalent of a case class in the untyped Dataset API.
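
As an illustration only (this snippet is not part of the original answer), once the result is collected to the driver, the structs can be read back into plain Scala tuples:

import org.apache.spark.sql.Row

// Sketch: read the grouped result back as Scala tuples.
// `recs` is the DataFrame built above, with columns (item1, set).
val asTuples: Array[(Long, Seq[(Long, Long)])] = recs.collect().map { row =>
  val item1 = row.getLong(0)
  val pairs = row.getSeq[Row](1).map(r => (r.getLong(0), r.getLong(1)))
  (item1, pairs)
}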

Edit 2: Also be warned that collect_set comes with the caveat that the result is actually not a set (there is no datatype with set properties in the SQL types). That means that you can end up with distinct "sets" which differ by their order (in version 2.1.0 at least). Sorting them with sort_array is then necessary.
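
A minimal sketch of that combination, reusing the grouped DataFrame from above (the recsSet name and the snippet itself are illustrative, not part of the original answer):

import org.apache.spark.sql.functions.{collect_set, sort_array, struct}
import sql.implicits._  // for the 'item1 / 'item2 / 'count column syntax

// Sort the collected array so that two logically equal "sets" compare equal,
// regardless of the order in which collect_set gathered their elements.
val recsSet = grouped.groupBy('item1)
  .agg(sort_array(collect_set(struct('item2, 'count))).as("set"))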
