Cannot access PipelinedRDD in PySpark


Question

I am trying to implement k-means from scratch using PySpark. I am performing various operations on RDDs, but when I try to display the result of the final processed RDD I get an error like "Pipelined RDDs can't be iterated", and things like .collect() do not work either because of the same PipelinedRDD issue.

from __future__ import print_function
import sys
import numpy as np
def closestPoint(p, centers):
    # return the index of the center in `centers` nearest to point p
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

data = sc.parallelize([1, 2, 3, 5, 7, 3, 5, 7, 3, 6, 4, 66, 33, 66, 22, 55, 77])

K = 3
convergeDist = float(0.1)

kPoints = data.takeSample(False, K, 1)
tempDist = 1.0

while tempDist > convergeDist:
    closest = data.map(
        lambda p: (closestPoint(p, kPoints), (p, 1)))

    pointStats = closest.reduceByKey(
        lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))

    newPoints = pointStats.map(
        lambda st: (st[0], st[1][0] / st[1][1]))
    print(newPoints)

    # this line raises the error: newPoints is a PipelinedRDD,
    # and the generator expression tries to iterate over it directly
    tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints).collect()
    # tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)

    # same problem here: a plain for loop over an RDD
    for (iK, p) in newPoints:
        kPoints[iK] = p

print("Final centers: " + str(kPoints))

The error I get is:

TypeError: 'PipelinedRDD' object is not iterable

Answer

You cannot iterate over an RDD; you first need to call an action to bring your data back to the driver. Quick sample:

>>> test = sc.parallelize([1, 2, 3])
>>> for i in test:
...    print(i)
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'RDD' object is not iterable

This will not work because test is an RDD. On the other hand, if you bring your data back to the driver with an action, it becomes an object you can iterate over, for example:

>>> for i in test.collect():
...    print(i)
1
2
3

There you go: call an action to bring the data back to the driver, being careful not to collect too much data or you may get an out-of-memory exception.
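
Applied to the question's code, a minimal sketch of the fix (assuming the same variable names and the SparkContext sc from above) is to call .collect() on newPoints once per loop iteration, so both the generator expression and the for loop run over a local Python list instead of a PipelinedRDD:

while tempDist > convergeDist:
    closest = data.map(
        lambda p: (closestPoint(p, kPoints), (p, 1)))
    pointStats = closest.reduceByKey(
        lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))

    # collect() is the action: it returns a list of (index, newCenter)
    # pairs on the driver, which can be iterated as many times as needed
    newPoints = pointStats.map(
        lambda st: (st[0], st[1][0] / st[1][1])).collect()

    tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)

    for (iK, p) in newPoints:
        kPoints[iK] = p

With this change the TypeError disappears, because the iteration happens over the collected list on the driver rather than over the RDD itself.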
