partition a matrix RDD in pyspark


Problem description

I'm converting a numpy matrix into an RDD with 10 partitions.

from pyspark import SparkContext
import numpy as np

sc = SparkContext("local", "Simple App")
mu, sigma = 0.0, 1.0  # placeholder values; defined elsewhere in the original code
x = np.matrix(np.random.normal(mu, sigma, 10000), dtype=float)
x.shape = (100, 100)  # reshape the 10000 samples into a 100x100 matrix
rdd = sc.parallelize(x, 10)  # 10 partitions, 10 rows each
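
For reference, a quick way to confirm how the rows were spread over the partitions is RDD.getNumPartitions() together with glom(), which turns each partition into a list of its elements; a minimal sketch:

# Each element of the RDD is one 1x100 row of the matrix, so glom()
# exposes how many rows landed in each of the 10 partitions.
print(rdd.getNumPartitions())          # 10
print(rdd.glom().map(len).collect())   # expect [10, 10, ..., 10]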

Here each row in the RDD is a matrix object, and I can access it using rdd.collect()[row_num][0]. How can I group ten rows into each partition? Earlier I tried using a dense matrix (http://stackoverflow.com/questions/36737566/repartition-a-dense-matrix-in-pyspark/36738279#36738279), but I couldn't get proper results.

Recommended answer

I finally wrote it myself. :) I know it's not efficient, but it solves the problem and might help someone before a better answer is posted.

def group_rows(rdd):
    # Collect the rows to the driver and bucket the 100 rows into 10 groups
    # of 10 consecutive rows each, keyed "0" through "9".
    rdd_collect = rdd.collect()
    result = {str(k): [] for k in range(10)}
    for i in range(100):
        key = str(i // 10)  # rows 0-9 -> "0", rows 10-19 -> "1", ...
        result[key].append(rdd_collect[i][0])
    return result

result = group_rows(rdd)                       # rdd built in the question
temp = sc.parallelize(list(result.items()))    # one (key, list of 10 rows) pair per group
sorted(temp.groupByKey().collect())
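
A possible alternative sketch, assuming the goal is only to get the ten rows of each partition back as one keyed record: glom() already yields one list per partition, so pairing each partition's list with an index reproduces the grouping without first collecting the whole RDD to the driver.

# One (index, list-of-10-rows) record per partition.
grouped = rdd.glom().zipWithIndex().map(lambda pair: (pair[1], pair[0]))
grouped.sortByKey().collect()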

