Collect set by group?
I am working with Hive data pulled into a Python Jupyter notebook using a Hive wrapper written in Python. I have terabytes of data like the following:
Table 1: time=t1
uid colA
1 A
1 B
1 C
2 A
2 B
3 C
3 D
I would like to create a new dataframe (PySpark/Pandas) from the above data that looks like:
Table 2: time=t1
uid colA
1 [A, B, C]
2 [A, B]
3 [C, D]
where colA would be a list of strings. How would I do this? I've read about collect_set(), but am not familiar with its use or appropriateness.
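To see what collect_set() computes, here is the same per-group aggregation sketched in plain Python. This is only an illustration of the idea, not the Spark API; the variable names are made up for the example:

```python
from collections import defaultdict

# (uid, colA) pairs from Table 1
rows = [(1, 'A'), (1, 'B'), (1, 'C'), (2, 'A'), (2, 'B'), (3, 'C'), (3, 'D')]

# Gather the distinct colA values per uid -- the same aggregation
# collect_set() performs inside Spark (which returns them unordered).
groups = defaultdict(set)
for uid, col_a in rows:
    groups[uid].add(col_a)

table2 = {uid: sorted(vals) for uid, vals in groups.items()}
print(table2)  # {1: ['A', 'B', 'C'], 2: ['A', 'B'], 3: ['C', 'D']}
```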
After creating Table 2, suppose I had another table for time=t2:
Table 3: time=t2
uid colA
1 [A, B]
2 [B]
3 [C, D, E]
Now, I'd like to calculate the set difference (additions and deletions) between Table 2 and Table 3. It should return 3, since that is the number of additions/deletions needed to get from Table 3 to Table 2.
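As a concrete check of that count: the additions and deletions per uid are exactly the symmetric difference of the two sets, summed over uids. A minimal sketch with the sample data:

```python
table2 = {1: {'A', 'B', 'C'}, 2: {'A', 'B'}, 3: {'C', 'D'}}
table3 = {1: {'A', 'B'}, 2: {'B'}, 3: {'C', 'D', 'E'}}

# Per uid: |symmetric difference| = additions + deletions.
# uid 1: {'C'} removed, uid 2: {'A'} removed, uid 3: {'E'} added.
total_edits = sum(len(table2[uid] ^ table3[uid]) for uid in table2)
print(total_edits)  # 3
```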
Here is a summarized solution to the problem; hopefully it works for you in PySpark.
Global imports:
import pyspark.sql.functions as F
import pyspark.sql.types as T
Table 2 creation code:
df1 = sc.parallelize([
[1,'A'], [1,'B'], [1,'C'], [2,'A'], [2,'B'], [3, 'C'], [3,'D']
]).toDF(['uid', 'colA']).groupBy("uid").agg(F.collect_set("colA").alias("colA"))
df1.show()
+---+---------+
|uid| colA|
+---+---------+
| 1|[A, B, C]|
| 2| [A, B]|
| 3| [C, D]|
+---+---------+
Table 3 creation code:
df2 = sc.parallelize([[1, ['A', 'B']],[2, ['B']],[3, ['C', 'D', 'E']]]).toDF(['uid', 'colA'])
def diffUdfFunc(x, y):
    # Elements present in y but not in x
    return list(set(y).difference(set(x)))
diffUdf = F.udf(diffUdfFunc,T.ArrayType(T.StringType()))
finaldf = (df1.withColumnRenamed("colA", "colA1")
              .join(df2, "uid")
              .withColumnRenamed("colA", "colA2")
              .withColumn("diffCol", diffUdf(F.col("colA1"), F.col("colA2"))))
finaldf.select("uid", F.col("diffCol").alias("colA")).where(F.size("colA") > 0).show()
+---+----+
|uid|colA|
+---+----+
| 3| [E]|
+---+----+