Controlling the data partition in Apache Spark


Problem description

The data looks like this:


col 1 col 2 col 3 col 4
row 1 row 1 row 1 row 1
row 2 row 2 row 2 row 2
row 3 row 3 row 3 row 3
row 4 row 4 row 4 row 4
row 5 row 5 row 5 row 5
row 6 row 6 row 6 row 6


Problem: I want to partition this data; let's say rows 1 and 2 will be processed as one partition, rows 3 and 4 as another, and rows 5 and 6 as another, and then create JSON data merging them together with the columns (column headers paired with the data values in the rows).


Output should be like:
[
{col1:row1, col2:row1, col3:row1, col4:row1},
{col1:row2, col2:row2, col3:row2, col4:row2},
{col1:row3, col2:row3, col3:row3, col4:row3},
{col1:row4, col2:row4, col3:row4, col4:row4}, ...
]


I tried using repartition(num) available in Spark, but it does not partition the data exactly as I want, so the JSON data generated is not valid. I had an issue with my program taking the same time to process the data even though I was using different numbers of cores, which can be found at http://stackoverflow.com/questions/32556654/spark-streaming-same-processing-time-for-4-cores-and-16-cores-why, and the repartition suggestion was made by @Patrick McGloin. The code mentioned in that question is what I am trying to do.

Recommended answer


Guess what you need is partitionBy. In Scala you can provide it with a custom-built HashPartitioner, while in Python you pass a partitionFunc. There are a number of Scala examples out there, so let me briefly explain the Python flavour.


partitionFunc expects a tuple, with the first element being the key. Let's assume you organise your data in the following fashion: (ROW_ID, (A, B, C, ...)), where ROW_ID = [1, 2, 3, ..., k]. You can always add the ROW_ID and remove it afterwards.
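If the rows do not already carry such a key, a minimal sketch for attaching a 1-based ROW_ID is zipWithIndex; the sample rows and the SparkContext setup below are assumptions for illustration only:

from pyspark import SparkContext

sc = SparkContext("local[2]", "partition-demo")

# Hypothetical rows matching the question's layout (col 1 .. col 4).
rows = [("row %d" % i,) * 4 for i in range(1, 7)]

# zipWithIndex yields (element, 0-based index); swap and shift to get
# a 1-based ROW_ID key, i.e. (ROW_ID, (A, B, C, ...)).
keyed = sc.parallelize(rows).zipWithIndex().map(lambda x: (x[1] + 1, x[0]))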


To get a new partition every two rows:

# Keys 1,2 -> partition 0; keys 3,4 -> partition 1; and so on.
rdd.partitionBy(numPartitions = int(rdd.count() / 2),
                partitionFunc = lambda key: (key - 1) // 2)


partitionFunc will produce the sequence 0, 0, 1, 1, 2, 2, ... This number is the partition to which a given row will belong.
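Continuing from the keyed RDD sketched above, a hedged end-to-end version of this idea could pair the rows and then build one JSON document per partition with mapPartitions; the headers list here is an assumption standing in for the real column names:

import json

num_parts = int(keyed.count() / 2)

# Put ROW_IDs 1,2 / 3,4 / 5,6 into partitions 0 / 1 / 2.
paired = keyed.partitionBy(numPartitions = num_parts,
                           partitionFunc = lambda key: (key - 1) // 2)

# Assumed column headers; replace with the real ones.
headers = ["col1", "col2", "col3", "col4"]

def to_json(rows):
    # rows iterates over the (ROW_ID, values) pairs of one partition.
    docs = [dict(zip(headers, values)) for _, values in rows]
    yield json.dumps(docs)

for doc in paired.mapPartitions(to_json).collect():
    print(doc)

Because each partition is serialised independently inside mapPartitions, the two-rows-per-partition grouping directly determines which rows end up merged into the same JSON document.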

