How to find cumulative frequency without group by in pyspark dataframe

Problem description

I have a count column in a pyspark dataframe:

id   Count  Percent  
a     3       50    
b     3       50

I want a result dataframe as:

id   Count Percent CCount CPercent  
 a     3      50       3      50  
 b     3      50       6      100

I can't use a pandas dataframe as the data is very large. I found answers pointing to window partitioning, but I have no such column to partition by. Can anyone tell me how to do this in a pyspark dataframe? Note: pyspark version 1.6.

Answer

The windowing approach would require moving all the data into one partition and, as you indicated in your post, your dataset is too big for that. To get around this, I have slightly adapted this approach. The method computes the cumulative sum for each partition after first building an offset dictionary for each partition, which allows the per-partition cumulative sums to be calculated in parallel with minimal reshuffling of the data.
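
As a rough sketch of the offset idea (plain Python with made-up per-partition totals, not part of the helpers below): if the partitions of the Count column sum to 1, 2 and 3, then each partition's offset is the total of all earlier partitions.

# Hypothetical per-partition totals for one column (partition index -> sum).
per_partition_total = {0: 1.0, 1: 2.0, 2: 3.0}

# Each partition's offset is the running total of all earlier partitions;
# this is what compute_offsets_per_group_factory builds below.
offset, running = {}, 0.0
for partition_index in range(3):
    offset[partition_index] = running
    running += per_partition_total[partition_index]

print(offset)  # {0: 0.0, 1: 1.0, 2: 3.0}

With those offsets in hand, each partition can add its own running sum to its offset independently of the others.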

First let's generate some test data:

data = sc.parallelize([('a',1,25.0),('b',2,25.0),('c',3,50.0)]).toDF(['id','Count','Percent'])    
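
For reference, data.show() should print the test frame roughly like this:

+---+-----+-------+
| id|Count|Percent|
+---+-----+-------+
|  a|    1|   25.0|
|  b|    2|   25.0|
|  c|    3|   50.0|
+---+-----+-------+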

These are the helper methods I have tweaked (see the original code here):

from collections import defaultdict
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql import Window

def cumulative_sum_for_each_group_per_partition(partition_index, event_stream):
    # Sum the Count and Percent columns within a single partition and emit
    # (column_name, (partition_index, partial_sum)) pairs for the offset pass.
    cumulative_sum = defaultdict(float)
    for event in event_stream:
        cumulative_sum["Count"] += event["Count"]
        cumulative_sum["Percent"] += event["Percent"]
    for grp, partition_sum in cumulative_sum.items():
        yield (grp, (partition_index, partition_sum))

def compute_offsets_per_group_factory(num_partitions):
    # For one column, turn the per-partition partial sums into offsets:
    # each partition's offset is the total of all earlier partitions.
    def _mapper(partial_sum_stream):
        per_partition_cumulative_sum = dict(partial_sum_stream)
        cumulative_sum = 0
        offset = {}
        for partition_index in range(num_partitions):
            offset[partition_index] = cumulative_sum
            cumulative_sum += per_partition_cumulative_sum.get(partition_index, 0)
        return offset
    return _mapper

def compute_cumulative_sum_per_group_factory(global_offset):
    # Second pass: walk each partition again, adding the broadcast offset for
    # that partition to the running local sums to obtain global cumulative sums.
    def _mapper(partition_index, event_stream):
        local_cumulative_sum = defaultdict(float)
        for event in event_stream:
            local_cumulative_sum["Count"] += event["Count"]
            count_cumulative_sum = local_cumulative_sum["Count"] + global_offset.value["Count"][partition_index]
            local_cumulative_sum["Percent"] += event["Percent"]
            percentage_cumulative_sum = local_cumulative_sum["Percent"] + global_offset.value["Percent"][partition_index]
            yield Row(CCount=count_cumulative_sum, CPercent=percentage_cumulative_sum, **event.asDict())
    return _mapper

def compute_cumulative_sum(points_rdd):
    # First pass to compute the cumulative offset dictionary
    compute_offsets_per_group = compute_offsets_per_group_factory(points_rdd.getNumPartitions())
    offsets_per_group = points_rdd.\
        mapPartitionsWithIndex(cumulative_sum_for_each_group_per_partition, preservesPartitioning=True).\
        groupByKey().mapValues(compute_offsets_per_group).\
        collectAsMap()
    # Second pass to compute the cumulative sum using the offset dictionary
    sc = points_rdd.context
    compute_cumulative_sum_per_group = compute_cumulative_sum_per_group_factory(sc.broadcast(offsets_per_group))
    return points_rdd.\
        mapPartitionsWithIndex(compute_cumulative_sum_per_group, preservesPartitioning=True)

Using these helper methods on the test data:

compute_cumulative_sum(data.rdd).toDF().show()

gives:

+------+--------+-----+-------+---+
|CCount|CPercent|Count|Percent| id|
+------+--------+-----+-------+---+
|   1.0|    25.0|    1|   25.0|  a|
|   3.0|    50.0|    2|   25.0|  b|
|   6.0|   100.0|    3|   50.0|  c|
+------+--------+-----+-------+---+
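
If you want the column order and integer counts from the question, one option (a minimal sketch; result is just a hypothetical name for the DataFrame produced above) is to select and cast the columns afterwards:

result = compute_cumulative_sum(data.rdd).toDF()
result.select('id', 'Count', 'Percent',
              F.col('CCount').cast('int').alias('CCount'),
              'CPercent').show()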
