Spark (pySpark) groupBy misordering first element on collect_list


Problem description

I have the following dataframe (df_parquet):

DataFrame[id: bigint, date: timestamp, consumption: decimal(38,18)]

I intend to get sorted lists of dates and consumptions using collect_list, just as described in this post: collect_list by preserving order based on another variable
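
For reference, one of the other approaches in that thread collects (date, consumption) structs and sorts the resulting array after the aggregation. A minimal sketch of that variant, assuming the schema above (I did not end up using it, but it does not rely on partition-level ordering):


    # Collect (date, consumption) pairs per id, sort the array of structs by its
    # first field (date), then split the fields back into two aligned columns.
    paired = df_parquet.groupBy('id').agg(
        F.sort_array(F.collect_list(F.struct('date', 'consumption'))).alias('pairs'))

    struct_sorted = paired.select(
        'id',
        F.col('pairs.date').alias('date'),
        F.col('pairs.consumption').alias('consumption'))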

I am following the last approach (https://stackoverflow.com/a/49246162/11841618), which is the one I think is most efficient.

So instead of just calling repartition with the default number of partitions (200), I call it with 500, and I sort within partitions by id and date, not just by date (to make the groupBy more efficient, or so I hope). The thing is that once per partition (for only one id per partition, and it seems to be a random id) the first item of the list ends up in the last place.

Any clue about what is going on? The rest of the ids are well sorted in their arrays, so I think something is wrong with the way groupBy or collect_list behaves inside each partition.

I verified that it is not the first or last id of a partition that behaves differently, by getting the partition id and checking whether the same groupBy + collect_list combination fails on one of those values, so it seems to be random.
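
In case it is useful, this is roughly the kind of check I mean: a sketch that tags each row of the ordered_df built below with F.spark_partition_id() and shows the id range per partition, so you can see whether the misordered ids sit at partition boundaries.


    # Tag each row with the index of the partition it lives in, then inspect the
    # id range per partition.
    with_pid = ordered_df.withColumn('pid', F.spark_partition_id())
    with_pid.groupBy('pid').agg(F.min('id').alias('first_id'),
                                F.max('id').alias('last_id')).show()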

You can check my code if you want; it's pretty simple.


    ordered_df = df_parquet.repartition(500, 'id').sortWithinPartitions(['id', 'date'])

    grouped_df = ordered_df.groupby("id").agg(
        F.collect_list("date").alias('date'),
        F.collect_list('consumption').alias('consumption'))

And the code used to test it (comparing the first and last value; the first should be the older one, but in 500 cases it is not):


    test = grouped_df.filter(F.size('date') > 1).select(
        'id',
        (F.col('date').getItem(0) > F.col('date').getItem(F.size('date') - 1)).alias('test'),
        F.array([F.col('date').getItem(0),
                 F.col('date').getItem(F.size('date') - 1)]).alias('see')
    ).filter(F.col('test'))

    test.show(5, 100)

    test.count()

Result:

+-----+----+------------------------------------------+
|   id|test|                                       see|
+-----+----+------------------------------------------+
|89727|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76325|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|80115|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|89781|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76411|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
+-----+----+------------------------------------------+
only showing top 5 rows

500

While it is expected to be an empty dataframe, as all the arrays should be sorted for all the ids.

Recommended answer

OK, the question is still unsolved, but I found an easy workaround, just in case somebody gets stuck because of this same issue:

The point is to invert the first and last places of the arrays. On the date array this can be done by sorting with the array_sort function introduced in Spark 2.4. To perform the reordering on the consumption array we need to use a udf.

    from pyspark.sql.types import ArrayType, DoubleType

    # Move the last element of the array to the front.
    invert_last = F.udf(lambda vector: [vector[-1]] + vector[:-1], ArrayType(DoubleType()))

    test = (grouped_df
            .withColumn('error', (F.size('date') > 1) &
                        (F.col('date').getItem(0) > F.col('date').getItem(F.size('date') - 1)))
            .withColumn('date', F.when(F.col('error'), F.array_sort(F.col('date'))).otherwise(F.col('date')))
            .withColumn('consumption', F.when(F.col('error'), invert_last(F.col('consumption'))).otherwise(F.col('consumption')))
            .drop('error'))
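
As a quick sanity check (reusing the comparison from the question), re-running the first/last test on the fixed dataframe should now come back empty:


    # After the workaround, the first date should never be greater than the last one.
    (test.filter(F.size('date') > 1)
         .filter(F.col('date').getItem(0) > F.col('date').getItem(F.size('date') - 1))
         .count())  # expected: 0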

Cheers.
