通过Python使用Spark准备我的大数据 [英] Prepare my bigdata with Spark via Python

查看:79
本文介绍了通过Python使用Spark准备我的大数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的100m大小,量化数据:

My 100m in size, quantized data:

(1424411938', [3885, 7898])
(3333333333', [3885, 7898])

所需结果:

(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])

所以我想要的是转换数据,以便我将3885(例如)与所有 data [0]组合在一起)。这是我在 python 的问题中所做的事情:

So what I want, is to transform the data so that I group 3885 (for example) with all the data[0] that have it). Here is what I did in python:

def prepare(data):
    result = []
    for point_id, cluster in data:
        for index, c in enumerate(cluster):
            found = 0
            for res in result:
                if c == res[0]:
                    found = 1
            if(found == 0):
                result.append((c, []))
            for res in result:
                if c == res[0]:
                    res[1].append(point_id)
    return result

但是当我 mapPartitions()使用 prepare()编辑了 data RDD,仅在当前分区中执行我想要的操作,从而返回比所需结果更大的结果。

but when I mapPartitions()'ed data RDD with prepare(), it seem to do what I want only in the current partition, thus return a bigger result than the desired.

例如,如果开始时的第一个记录位于第一个分区中,第二,第二,我ld结果如下:

For example, if the 1st record in the start was in the 1st partition and the 2nd in the 2nd, then I would get as a result:

(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])

如何修改我的 prepare()以获得所需的效果?或者,如何处理 prepare()产生的结果,以便获得所需的结果?

How to modify my prepare() to get the desired effect? Alternatively, how to process the result that prepare() produces, so that I can get the desired result?

正如您可能已经从代码中注意到的那样,我根本不关心速度。

As you may already have noticed from the code, I do not care about speed at all.

这里是一种方法创建数据:

Here is a way to create the data:

data = []
from random import randint
for i in xrange(0, 10):
    data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)


推荐答案

您可以使用一些基本的pyspark转换来实现。

You can use a bunch of basic pyspark transformations to achieve this.

>>> rdd = sc.parallelize([(1424411938, [3885, 7898]),(3333333333, [3885, 7898])])
>>> r = rdd.flatMap(lambda x: ((a,x[0]) for a in x[1]))

我们使用了 flatMap x [1] 中的每个项目都有一个键值对然后将数据行格式更改为(a,x [0]),这里的 a x [1] 。要更好地理解 flatMap ,可以查看文档。

We used flatMap to have a key, value pair for every item in x[1] and we changed the data line format to (a, x[0]), the a here is every item in x[1]. To understand flatMap better you can look to the documentation.

>>> r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])))

我们只是将所有键,值对按键进行分组,并使用元组函数将可迭代项转换为元组。

We just grouped all key, value pairs by their keys and used tuple function to convert iterable to tuple.

>>> r2.collect()
[(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))]

如您所说,您可以使用[:150]来拥有前150个元素,我想这应该是正确的用法:

As you said you can use [:150] to have first 150 elements, I guess this would be proper usage:

r2 = r.groupByKey()。map(lambda x:(x [0],tuple(x [1])[:150]))

我试图尽可能地加以解释。希望对您有所帮助。

I tried to be as explanatory as possible. I hope this helps.

这篇关于通过Python使用Spark准备我的大数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆