控制 Apache Spark 中的数据分区 [英] Controlling the data partition in Apache Spark

查看:32
本文介绍了控制 Apache Spark 中的数据分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

数据看起来像:

列 1 列 2 列 3 列 4
第 1 行 1 行 1 行 1
第2行 第2行 第2行 第2行
第3行 第3行 第3行 第3行
第4行 第4行 第4行 第4行
第5行 第5行 第5行 第5行
第6行第6行第6行第6行

col 1 col 2 col 3 col 4
row 1 row 1 row 1 row 1
row 2 row 2 row 2 row 2
row 3 row 3 row 3 row 3
row 4 row 4 row 4 row 4
row 5 row 5 row 5 row 5
row 6 row 6 row 6 row 6

问题:我想对这些数据进行分区,假设第 1 行和第 2 行将作为一个分区处理,第 3 行和第 4 行作为另一个处理,第 5 行和第 6 行作为另一个处理,并创建一个 JSON 数据将它们合并在一起列(带有行中数据值的列标题).

Problem: I want to partition this data, lets say row 1 and row 2 will be processed as one partition, row 3 and row 4 as another, row 5 and row 6 as another and create a JSON data merging them together with the column (column headers with data values in rows).

输出应该是这样的:
[
{col1:row1,col2:row1:col3:row1:col4:row1},
{col1:row2,col2:row2:col3:row2:col4:row2},
{col1:row3,col2:row3:col3:row3:col4:row3},
{col1:row4,col2:row4:col3:row4:col4:row4},...
]

Output should be like:
[
{col1:row1,col2:row1:col3:row1:col4:row1},
{col1:row2,col2:row2:col3:row2:col4:row2},
{col1:row3,col2:row3:col3:row3:col4:row3},
{col1:row4,col2:row4:col3:row4:col4:row4},...
]

我尝试使用 spark 中可用的 repartion(num) 但它并不是我想要的完全分区.因此生成的 JSON 数据无效.我有一个问题,为什么我的程序在处理数据时花费相同的时间,即使我使用了不同数量的内核,可以找到 here 和@Patrick McGloin 建议的重新分区建议.该问题中提到的代码是我正在尝试做的事情.

I tried using repartion(num) available in spark but it is not exactly partitioning as i want. therefore the JSON data generated is not valid. i had issue with why my program was taking same time for processing the data even though i was using different number of cores which can be found here and the repartition suggestion was suggested by @Patrick McGloin . The code mentioned in that problem is something i am trying to do.

推荐答案

猜猜你需要的是partitionBy.在 Scala 中,您可以为其提供自定义构建 HashParitioner,而在 Python 中,您可以传递 partitionFunc.Scala 中有许多示例,所以让我简要解释一下 Python 的风格.

Guess what you need is partitionBy. In Scala you can provide to it a custom build HashParitioner, while in Python you pass partitionFunc. There is a number of examples out there in Scala, so let me briefly explain the Python flavour.

partitionFunc 需要一个元组,第一个元素是键.假设您按以下方式组织数据:(ROW_ID, (A,B,C,..)) 其中 ROW_ID = [1,2,3,...,k].您可以随时添加 ROW_ID 并在之后将其删除.

partitionFunc expects a tuple, with first element being the key. Lets assume you organise your data in the following fashion: (ROW_ID, (A,B,C,..)) where ROW_ID = [1,2,3,...,k]. You can always add ROW_ID and remove it afterwards.

每两行获取一个新分区:

To get a new partition every two rows:

rdd.partitionBy(numPartitions = int(rdd.count() / 2),
                partitionFunc = lambda key: int(key / 2)

partitionFunc 将产生一个序列 0,0,1,1,2,2,... 这个数字将是给定行所属的分区数.

partitionFunc will produce a sequence 0,0,1,1,2,2,... This number will be a number of partition to which given row will belong.

这篇关于控制 Apache Spark 中的数据分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆