Prepare my bigdata with Spark via Python
Problem Description
My quantized data, 100m in size:
(1424411938, [3885, 7898])
(3333333333, [3885, 7898])
Desired result:
(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])
So what I want is to transform the data so that I group 3885 (for example) with all the data[0] values that have it. Here is what I did in Python:
def prepare(data):
    result = []
    for point_id, cluster in data:
        for index, c in enumerate(cluster):
            # add a new (cluster_id, []) entry the first time we see c
            found = 0
            for res in result:
                if c == res[0]:
                    found = 1
            if found == 0:
                result.append((c, []))
            # append this point_id to the list for cluster_id c
            for res in result:
                if c == res[0]:
                    res[1].append(point_id)
    return result
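As a small local sanity check (the sample variable here is just for illustration), running prepare() on the two records above gives the desired grouping:

sample = [(1424411938, [3885, 7898]), (3333333333, [3885, 7898])]
print(prepare(sample))
# [(3885, [1424411938, 3333333333]), (7898, [1424411938, 3333333333])]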
but when I mapPartitions()'ed the data RDD with prepare(), it seems to do what I want only within the current partition, thus returning a bigger result than desired.
For example, if the 1st record was in the 1st partition and the 2nd record in the 2nd, then I would get as a result:
(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])
How can I modify my prepare() to get the desired effect? Alternatively, how can I process the result that prepare() produces so that I get the desired result?
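One minimal sketch for post-processing the per-partition results, assuming prepare() keeps its current (cluster_id, [point_ids]) return format (the variable names here are illustrative), would be to merge the partial lists by key:

per_partition = data.mapPartitions(prepare)             # partial (cluster_id, [point_ids]) per partition
merged = per_partition.reduceByKey(lambda a, b: a + b)  # concatenate the partial point_id lists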
As you may already have noticed from the code, I do not care about speed at all.
Here is a way to create the data:
data = []
from random import randint
for i in xrange(0, 10):
data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)
Recommended Answer
You can use a bunch of basic pyspark transformations to achieve this.
>>> rdd = sc.parallelize([(1424411938, [3885, 7898]),(3333333333, [3885, 7898])])
>>> r = rdd.flatMap(lambda x: ((a,x[0]) for a in x[1]))
We used flatMap to have a key-value pair for every item in x[1], and we changed the record format to (a, x[0]), where a is each item in x[1]. To understand flatMap better, you can look at the documentation.
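To make the intermediate shape concrete, collecting r at this point should give pairs like the following (the exact order may vary with partitioning):

>>> r.collect()
[(3885, 1424411938), (7898, 1424411938), (3885, 3333333333), (7898, 3333333333)]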
>>> r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])))
We just grouped all key-value pairs by their keys and used the tuple function to convert the resulting iterable into a tuple.
>>> r2.collect()
[(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))]
As you said, you can use [:150] to keep the first 150 elements; I guess this would be the proper usage:
>>> r2 = r.groupByKey().map(lambda x: (x[0], tuple(x[1])[:150]))
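Putting the steps together, a self-contained sketch (variable names are illustrative) would look like this:

>>> rdd = sc.parallelize([(1424411938, [3885, 7898]), (3333333333, [3885, 7898])])
>>> pairs = rdd.flatMap(lambda x: ((a, x[0]) for a in x[1]))
>>> grouped = pairs.groupByKey().map(lambda x: (x[0], tuple(x[1])[:150]))
>>> grouped.collect()
[(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))]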
I tried to be as explanatory as possible. I hope this helps.