Collect set by group?


Problem Description

I am working with Hive data pulled into a Python Jupyter notebook using a Hive wrapper written in Python. I have terabytes of data like the following:

Table 1: time=t1
uid   colA
1     A
1     B
1     C
2     A
2     B
3     C
3     D

I would like to create a new dataframe (PySpark/Pandas) from the above data that looks like:

Table 2: time=t1
uid   colA
1     [A, B, C]
2     [A, B]
3     [C, D]

where colA would be a list of strings. How would I do this? I've read about collect_set(), but I'm not familiar with its use or appropriateness.

After creating Table 2, suppose I had another table for time=t2:

Table 3: time=t2
uid   colA
1     [A, B]
2     [B]
3     [C, D, E]

Now, I'd like to calculate the set difference between Table 2 and Table 3. It should return 3, since that is the number of additions/deletions needed to get from Table 3 to Table 2.

Solution

Here is a summarized solution to the problem. Hope this works for you using PySpark.

Global Imports:

import pyspark.sql.functions as F
import pyspark.sql.types as T

Table 2 Creation Code:

# Group the raw (uid, colA) rows by uid and collect the distinct
# colA values for each uid into an array with collect_set()
df1 = sc.parallelize([
        [1,'A'], [1,'B'], [1,'C'], [2,'A'], [2,'B'], [3, 'C'], [3,'D']
       ]).toDF(['uid', 'colA']).groupBy("uid").agg(F.collect_set("colA").alias("colA"))

df1.show()
+---+---------+
|uid|     colA|
+---+---------+
|  1|[A, B, C]|
|  2|   [A, B]|
|  3|   [C, D]|
+---+---------+
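
Two notes on collect_set(): it keeps only the distinct values per group, and the order of elements in the resulting array is not guaranteed (use collect_list() if you need duplicates preserved). Since the question mentions pandas as an alternative, here is a minimal equivalent sketch; the names df and table2 are illustrative, assuming the raw Table 1 rows are already in a pandas DataFrame:

import pandas as pd

# Hypothetical DataFrame holding the raw Table 1 rows
df = pd.DataFrame({'uid':  [1, 1, 1, 2, 2, 3, 3],
                   'colA': ['A', 'B', 'C', 'A', 'B', 'C', 'D']})

# Group by uid and collect the distinct colA values into a sorted list
table2 = df.groupby('uid')['colA'].apply(lambda s: sorted(set(s))).reset_index()
# uid 1 -> ['A', 'B', 'C'], uid 2 -> ['A', 'B'], uid 3 -> ['C', 'D']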

Table 3 Creation Code:

df2 = sc.parallelize([[1, ['A', 'B']], [2, ['B']], [3, ['C', 'D', 'E']]]).toDF(['uid', 'colA'])

# UDF: elements of y that are missing from x (one-directional set difference)
def diffUdfFunc(x, y):
    return list(set(y).difference(set(x)))

diffUdf = F.udf(diffUdfFunc, T.ArrayType(T.StringType()))

# Join the two tables on uid, renaming the arrays apart, and compute the difference
finaldf = df1.withColumnRenamed("colA", "colA1").join(df2, "uid").withColumnRenamed("colA", "colA2").withColumn("diffCol", diffUdf(F.col("colA1"), F.col("colA2")))
# Show only the uids whose sets actually changed
finaldf.select("uid", F.col("diffCol").alias("colA")).where(F.size("colA") > 0).show()
+---+----+
|uid|colA|
+---+----+
|  3| [E]|
+---+----+
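
Note that diffUdfFunc only returns elements added in df2 relative to df1, so the deletions for uid 1 (C) and uid 2 (A) do not appear in the output above. If you also need the total number of additions plus deletions that the question asks for (3 in this example), a minimal sketch using Python's symmetric_difference is shown below; symmDiffUdf and the count logic are illustrative additions, not part of the original answer:

# Hypothetical UDF: symmetric difference = additions plus deletions
def symmDiffFunc(x, y):
    return list(set(x).symmetric_difference(set(y)))

symmDiffUdf = F.udf(symmDiffFunc, T.ArrayType(T.StringType()))

changes = df1.withColumnRenamed("colA", "colA1") \
             .join(df2.withColumnRenamed("colA", "colA2"), "uid") \
             .withColumn("changes", symmDiffUdf("colA1", "colA2"))

# 1 (C removed, uid 1) + 1 (A removed, uid 2) + 1 (E added, uid 3) = 3
changes.select(F.sum(F.size("changes"))).show()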
