How to write Pyspark UDAF on multiple columns?
Question
I have the following data in a pyspark dataframe called end_stats_df:
values start end cat1 cat2
10 1 2 A B
11 1 2 C B
12 1 2 D B
510 1 2 D C
550 1 2 C B
500 1 2 A B
80 1 3 A B
And I want to aggregate it in the following way:
- I want to use the "start" and "end" columns as the aggregate keys
- For each group of rows, I need to do the following:
  - Compute the unique number of values in both cat1 and cat2 for that group. E.g., for the group of start = 1 and end = 2, this number would be 4 because there's A, B, C, D. This number will be stored as n (n = 4 in this example).
  - For the values field, for each group I need to sort the values, and then select every (n-1)-th value, where n is the value stored from the first operation above (see the sketch after this list).
  - At the end of the aggregation, I don't really care what is in cat1 and cat2 after the operations above.
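To make the selection rule concrete, here is a minimal pure-Python sketch of the per-group logic, under one reading that matches the expected output below: sort the values, then take every (n-1)-th element starting from the (n-1)-th. It uses the start = 1, end = 2 group from the example; variable names are illustrative.

values = [10, 11, 12, 510, 550, 500]
cat1 = ['A', 'C', 'D', 'D', 'C', 'A']
cat2 = ['B', 'B', 'B', 'C', 'B', 'B']

n = len(set(cat1) | set(cat2))         # unique values across cat1 and cat2 -> 4
picked = sorted(values)[n - 2::n - 1]  # every (n-1)-th value of the sorted list
print(n, picked)                       # prints: 4 [12, 550]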
An example output from the example above is:
values start end cat1 cat2
12 1 2 D B
550 1 2 C B
80 1 3 A B
How do I accomplish this using pyspark dataframes? I assume I need to use a custom UDAF, right?
Recommended answer
Pyspark does not support UDAF directly, so we have to do the aggregation manually.

from pyspark.sql import functions as f
from pyspark.sql.types import StringType

def func(values, cat1, cat2):
    # n = number of unique values across cat1 and cat2 for this group
    n = len(set(cat1 + cat2))
    # return the (n-1)-th smallest value (index n-2 of the sorted list)
    return sorted(values)[n - 2]

df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt',
                     format='csv', sep='\t', header=True)

# collect each group's rows into arrays so an ordinary UDF can do the "UDAF" work
df = df.groupBy(df['start'], df['end']).agg(f.collect_list(df['values']).alias('values'),
                                            f.collect_set(df['cat1']).alias('cat1'),
                                            f.collect_set(df['cat2']).alias('cat2'))

df = df.select(df['start'], df['end'],
               f.UserDefinedFunction(func, StringType())(df['values'], df['cat1'], df['cat2']))
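The UDF above returns a single value per group (the (n-1)-th smallest), whereas the expected output keeps every (n-1)-th value. A minimal sketch of one way to extend the same collect-then-UDF pattern, assuming df is the grouped frame produced by the groupBy/agg step above; pick_all and pick_udf are illustrative names, and key=float is used because values read from CSV arrive as strings:

from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType

def pick_all(values, cat1, cat2):
    n = len(set(cat1 + cat2))
    # every (n-1)-th value of the numerically sorted list
    return sorted(values, key=float)[n - 2::n - 1]

pick_udf = f.udf(pick_all, ArrayType(StringType()))
result = df.withColumn('values', f.explode(pick_udf('values', 'cat1', 'cat2')))
result.show()

Each group then expands to one row per selected value, matching the shape of the expected output.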