Pyspark:使用具有多重排序条件的 repartitionAndSortWithinPartitions [英] Pyspark: Using repartitionAndSortWithinPartitions with multiple sort Critiria

查看:47
本文介绍了Pyspark:使用具有多重排序条件的 repartitionAndSortWithinPartitions的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有以下 RDD:

Assuming I am having the following RDD:

rdd = sc.parallelize([('a', (5,1)), ('d', (8,2)), ('2', (6,3)), ('a', (8,2)), ('d', (9,6)), ('b', (3,4)),('c', (8,3))])

我如何使用 repartitionAndSortWithinPartitions 并按 x[0] 和 x[1][0] 之后排序.使用以下我只按键排序(x[0]):

How can I use repartitionAndSortWithinPartitions and sort by x[0] and after x[1][0]. Using the following I sort only by the key(x[0]):

Npartitions = sc.defaultParallelism
rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: hash(x) % Npartitions, 2)

一种方法如下,但我想应该有更简单的方法:

A way to do it is the following but there should something more simple I guess:

Npartitions = sc.defaultParallelism 
partitioned_data = rdd
  .partitionBy(2)
  .map(lambda x:(x[0],x[1][0],x[1][1]))
  .toDF(['letter','number2','number3'])
  .sortWithinPartitions(['letter','number2'],ascending=False)
  .map(lambda x:(x.letter,(x.number2,x.number3)))

>>> partitioned_data.glom().collect()

[[],
[(u'd', (9, 6)), (u'd', (8, 2))],
[(u'c', (8, 3)), (u'c', (6, 3))],
[(u'b', (3, 4))],
[(u'a', (8, 2)), (u'a', (5, 1))]

正如所见,我必须将其转换为 Dataframe 才能使用 sortWithinPartitions.还有其他方法吗?使用 repartitionAndSortWIthinPartitions?

As it can be seen I have to convert it to Dataframe in order to use sortWithinPartitions. Is there another way? Using repartitionAndSortWIthinPartitions?

(数据不是全局排序没有关系.我只关心在分区内排序.)

(It doesnt matter that the data is not globally sorted. I care only to be sorted inside the partitions.)

推荐答案

这是可能的,但您必须在组合键中包含所有必需的信息:

It is possible but you'll have to include all required information in the composite key:

from pyspark.rdd import portable_hash

n = 2

def partitioner(n):
    """Partition by the first item in the key tuple"""
    def partitioner_(x):
        return portable_hash(x[0]) % n
    return partitioner_


(rdd
  .keyBy(lambda kv: (kv[0], kv[1][0]))  # Create temporary composite key
  .repartitionAndSortWithinPartitions(
      numPartitions=n, partitionFunc=partitioner(n), ascending=False)
  .map(lambda x: x[1]))  # Drop key (note: there is no partitioner set anymore)

分步说明:

  • keyBy(lambda kv: (kv[0], kv[1][0])) 创建一个由原始键和值的第一个元素组成的替代键.换句话说,它会转换:

  • keyBy(lambda kv: (kv[0], kv[1][0])) creates a substitute key which consist of original key and the first element of the value. In other words it transforms:

(0, (5,1))

进入

((0, 5), (0, (5, 1)))

在实践中,简单地将数据重塑为

In practice it can be slightly more efficient to simply reshape data to

((0, 5), 1)

  • partitioner 根据键的第一个元素的散列定义分区函数,因此:

  • partitioner defines partitioning function based on a hash of the first element of the key so:

    partitioner(7)((0, 5))
    ## 0
    
    partitioner(7)((0, 6))
    ## 0
    
    partitioner(7)((0, 99))
    ## 0
    
    partitioner(7)((3, 99))
    ## 3
    

    如您所见,它是一致的并且忽略了第二位.

    as you can see it is consistent and ignores the second bit.

    我们使用默认的 keyfunc 函数,它是身份 (lambda x: x) 并依赖于 Python tuple 上定义的字典顺序:

    we use default keyfunc function which is identity (lambda x: x) and depend on lexicographic ordering defined on Python tuple:

    (0, 5) < (1, 5)
    ## True
    
    (0, 5) < (0, 4)
    ## False
    

  • 如前所述,您可以改为重塑数据:

    As mentioned before you could reshape data instead:

    rdd.map(lambda kv: ((kv[0], kv[1][0]), kv[1][1]))
    

    并删除最终的 map 以提高性能.

    and drop final map to improve performance.

    这篇关于Pyspark:使用具有多重排序条件的 repartitionAndSortWithinPartitions的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆