Pre-partition data in Spark such that each partition has non-overlapping values in the column we are partitioning on


Question

I'm trying to pre-partition the data before doing an aggregation operation across a certain column of my data. I have 3 worker nodes and I would like each partition to have non-overlapping values in the column I am partitioning on. I don't want to have situations where two partitions might have the same values in that column.

For example, if I have the following data:

ss_item_sk | ss_quantity
1          | 10.0
1          |  4.0
2          |  3.0
3          |  5.0
4          |  8.0
5          |  13.0
5          |  10.0

Then the following partitions are satisfactory:

Partition 1

ss_item_sk | ss_quantity
1          | 10.0
1          |  4.0

Partition 2

ss_item_sk | ss_quantity
2          |  3.0
3          |  5.0

Partition 3

ss_item_sk | ss_quantity
4          |  8.0
5          |  13.0
5          |  10.0

Unfortunately, the code I have below does not work.

// Limit shuffle partitions to 3 (one per worker node) so the repartition produces 3 partitions.
spark.sqlContext.setConf("spark.sql.shuffle.partitions", "3")
val json = spark.read.json("hdfs://master:9000/tpcds/store_sales")
val filtered = json.filter(row => row.getAs[Long]("ss_item_sk") < 180)
filtered.repartition($"ss_item_sk").write.json(savepath)
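
For reference, a quick way to see how this repartition assigned keys to partitions is to group by spark_partition_id(). This is only a sketch added here for illustration; it assumes the spark-shell implicits so the $"col" syntax works.

import org.apache.spark.sql.functions.spark_partition_id

filtered
  .repartition($"ss_item_sk")
  .groupBy(spark_partition_id().alias("partition_id"), $"ss_item_sk")
  .count()
  .orderBy("partition_id")
  .show()
// Each ss_item_sk value should appear under exactly one partition_id,
// because repartition($"ss_item_sk") hash-partitions rows by that column.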

I have already looked at … but I still can't figure it out.

Answer

Repartitioning by key distributes the data based on that key at the DataFrame level, while writing the DataFrame to HDFS is a separate matter. You can try

df.coalesce(1).write.partitionBy("ss_item_sk").json(savepath)

In this scenario as well, you will still see multiple part files, spread across the different directories created by the partition column. The number of writers/reducers that run can only be controlled through the partitionBy method; it is very similar to a MapReduce Partitioner in that it controls how many reducers will run. To get a single file per partition-column value, you have to run this command.

df.repartition($"ss_item_sk").write.partitionBy("ss_item_sk").json(savepath)

Now this works because each writer is mapped to one of the repartitioned executor partitions. Hope this helps.
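
For illustration (this is not from the original answer), partitionBy lays the output out as one ss_item_sk=<value> subdirectory under savepath, and after the repartition above each subdirectory should contain a single part file; the exact part-file names vary per run. A single key can then be read back by pointing at its directory:

// Expected layout (file names are hypothetical):
//   savepath/ss_item_sk=1/part-00000-<uuid>.json
//   savepath/ss_item_sk=2/part-00000-<uuid>.json
//   ...

// Read one key's data back; basePath lets Spark reconstruct the
// ss_item_sk column from the directory name.
val oneItem = spark.read
  .option("basePath", savepath)
  .json(savepath + "/ss_item_sk=1")
oneItem.show()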
