How to force a certain partitioning in a PySpark DataFrame?

Question

Suppose I have a DataFrame with a column partition_id:

n_partitions = 2

df = spark.sparkContext.parallelize([
    [1, 'A'],
    [1, 'B'],
    [2, 'A'],
    [2, 'C']
]).toDF(('partition_id', 'val'))

How can I repartition the DataFrame to guarantee that each value of partition_id goes to a separate partition, and that there are exactly as many actual partitions as there are distinct values of partition_id?

If I do a hash partition, i.e. df.repartition(n_partitions, 'partition_id'), that guarantees the right number of partitions, but some partitions may be empty and others may contain multiple values of partition_id due to hash collisions.
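
For example, you can see where rows actually land after a hash repartition by tagging them with spark_partition_id (a quick check, reusing the df and n_partitions defined above; the exact outcome depends on how the key hashes happen to fall):

from pyspark.sql.functions import spark_partition_id

# Hash-partition on partition_id and inspect which partition each row lands in.
hashed = df.repartition(n_partitions, "partition_id")
hashed.withColumn("actual_partition_id", spark_partition_id()).show()
# Depending on the hash values of the two keys, both partition_id values can
# end up in the same partition, leaving the other partition empty.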

Solution

There is no such option in the Python DataFrame API. The partitioning API for Datasets is not pluggable and supports only the predefined range and hash partitioning schemes.
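
For reference, these are the only knobs the DataFrame API itself exposes (a short sketch, reusing df and n_partitions from the question); neither gives a one-key-per-partition guarantee:

# Hash partitioning on a column (what df.repartition(n, col) does).
hash_part = df.repartition(n_partitions, "partition_id")

# Range partitioning on a column (Spark 2.3+); range boundaries are chosen
# by sampling, so distinct keys may still share a partition.
range_part = df.repartitionByRange(n_partitions, "partition_id")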

You can convert the data to an RDD, partition it with a custom partitioner, and convert the result back to a DataFrame:

from pyspark.sql.functions import col, struct, spark_partition_id

# Map each distinct partition_id to a consecutive partition index.
mapping = {k: i for i, k in enumerate(
    df.select("partition_id").distinct().rdd.flatMap(lambda x: x).collect()
)}

result = (df
    # Build (partition_id, row) pairs, with the full row packed into a struct.
    .select("partition_id", struct([c for c in df.columns]))
    # Custom partitioner: route each key to its own partition.
    .rdd.partitionBy(len(mapping), lambda k: mapping[k])
    # Drop the keys and rebuild the DataFrame with the original schema.
    .values()
    .toDF(df.schema))

result.withColumn("actual_partition_id", spark_partition_id()).show()
# +------------+---+-------------------+
# |partition_id|val|actual_partition_id|
# +------------+---+-------------------+
# |           1|  A|                  0|
# |           1|  B|                  0|
# |           2|  A|                  1|
# |           2|  C|                  1|
# +------------+---+-------------------+

Please remember that this only creates a specific distribution of the data; it doesn't set a partitioner that the Catalyst optimizer can use.
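
If you want to verify the physical layout, you can count rows per partition on the RDD side (a quick check against the result DataFrame built above); note that the physical plan of a later aggregation on partition_id still contains an Exchange, since no partitioner is registered for the DataFrame:

# Rows per physical partition; with the example data this should print [2, 2].
print(result.rdd.glom().map(len).collect())

# Catalyst does not know about this placement, so aggregating on partition_id
# still plans a shuffle (Exchange) in the physical plan.
result.groupBy("partition_id").count().explain()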
