How to guarantee repartitioning in Spark Dataframe


Question

I am very new to Apache Spark and I am trying to repartition a data frame by US state. I then want to break each partition into its own RDD and save it to a specific location:

import json

from pyspark.sql import types

schema = types.StructType([
  types.StructField("details", types.StructType([
      types.StructField("state", types.StringType(), True)
  ]), True)
])

raw_rdd = spark_context.parallelize([
  '{"details": {"state": "AL"}}',
  '{"details": {"state": "AK"}}',
  '{"details": {"state": "AZ"}}',
  '{"details": {"state": "AR"}}',
  '{"details": {"state": "CA"}}',
  '{"details": {"state": "CO"}}',
  '{"details": {"state": "CT"}}',
  '{"details": {"state": "DE"}}',
  '{"details": {"state": "FL"}}',
  '{"details": {"state": "GA"}}'
]).map(
    lambda row: json.loads(row)
)

rdd = sql_context.createDataFrame(raw_rdd).repartition(10, "details.state").rdd

for index in range(0, rdd.getNumPartitions()):
    partition = rdd.mapPartitionsWithIndex(
        lambda partition_index, partition: partition if partition_index == index else []
    ).coalesce(1)

    if partition.count() > 0:
        df = sql_context.createDataFrame(partition, schema=schema)

        for event in df.collect():
            print "Partition {0}: {1}".format(index, str(event))
    else:
        print "Partition {0}: No rows".format(index)

For testing, I loaded a file from S3 with 50 rows (10 in this example), each with a different state in the details.state column. To mimic that behaviour in the example above I parallelized the data, but the behaviour is the same. I get the 50 partitions I asked for, but some are not used and some contain entries for more than one state. Here is the output for the sample set of 10:

Partition 0: Row(details=Row(state=u'AK'))
Partition 1: Row(details=Row(state=u'AL'))
Partition 1: Row(details=Row(state=u'CT'))
Partition 2: Row(details=Row(state=u'CA'))
Partition 3: No rows
Partition 4: No rows
Partition 5: Row(details=Row(state=u'AZ'))
Partition 6: Row(details=Row(state=u'CO'))
Partition 6: Row(details=Row(state=u'FL'))
Partition 6: Row(details=Row(state=u'GA'))
Partition 7: Row(details=Row(state=u'AR'))
Partition 7: Row(details=Row(state=u'DE'))
Partition 8: No rows
Partition 9: No rows

My question is: is the repartitioning strategy merely a suggestion to Spark, or is there something fundamentally wrong with my code?

Answer

There is nothing unexpected going on here. Spark distributes rows between partitions using the hash of the partitioning key (taken as a non-negative value) modulo the number of partitions, and with 50 partitions you will get a significant number of duplicates:

from pyspark.sql.functions import expr

states = sc.parallelize([
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA", 
    "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
    "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
    "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
    "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"
])

states_df = states.map(lambda x: (x, )).toDF(["state"])

states_df.select(expr("pmod(hash(state), 50)")).distinct().count()
# 26

If you want to separate files on write, it is better to use the partitionBy clause of the DataFrameWriter. It will create a separate output for each level and does not require a shuffle.
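
A minimal sketch of that approach (the output path and the step that promotes the nested field to a top-level column are illustrative assumptions, reusing raw_rdd and schema from the question; partitionBy only accepts top-level columns):

# Illustrative sketch, not from the original answer.
df = sql_context.createDataFrame(raw_rdd, schema=schema)

(df
  .withColumn("state", df["details.state"])   # copy the nested field to a top-level column
  .write
  .partitionBy("state")                       # one directory per state: .../state=AL/, .../state=AK/, ...
  .json("s3://some-bucket/events-by-state"))  # assumed output path

Each state lands in its own state=XX/ subdirectory, and no shuffle is needed because rows are simply routed to the right file as they are written.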

If you really want a full repartition, you can use the RDD API, which lets you supply a custom partitioner.
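
A minimal sketch of that route (the keying step and the state-to-partition mapping are illustrative assumptions, not part of the original answer):

# Illustrative sketch: give every distinct state its own partition.
df = sql_context.createDataFrame(raw_rdd, schema=schema)

# Collect the distinct states and assign each one a partition index.
states_seen = sorted(df.select("details.state").distinct().rdd.map(lambda r: r[0]).collect())
state_to_partition = {state: i for i, state in enumerate(states_seen)}

partitioned_rdd = (df.rdd
    .map(lambda row: (row.details.state, row))              # key each row by its state
    .partitionBy(len(states_seen),
                 partitionFunc=lambda key: state_to_partition[key])
    .values())                                              # drop the key again

With this custom partitionFunc every partition holds exactly one state, at the cost of collecting the distinct keys to the driver and a full shuffle.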
