pyspark:'PipelinedRDD' 对象不可迭代 [英] pyspark: 'PipelinedRDD' object is not iterable

查看:53
本文介绍了pyspark:'PipelinedRDD' 对象不可迭代的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我收到此错误,但我不知道为什么.基本上我是从这段代码中出错的:

 a = data.mapPartitions(helper(locations))

其中 data 是一个 RDD,我的助手定义为:

 def helper(iterator, location):对于迭代器中的 x:c = 地点[x]产量 c

(位置只是一组数据点)我不明白问题是什么,但我也不是 pyspark 的佼佼者,所以有人可以告诉我为什么我得到的PipelinedRDD"对象不能从这段代码中迭代?

解决方案

RDD 可以使用 map 和 lambda 函数进行迭代.我已经使用以下方法迭代了流水线 RDD

lines1 = sc.textFile("\..\file1.csv")lines2 = sc.textFile("\..\file2.csv")pair1 = lines1.map(lambda s: (int(s), 'file1'))pair2 = lines2.map(lambda s: (int(s), 'file2'))pair_result =pairs1.union(pairs2)pair_result.reduceByKey(lambda a, b: a + ','+ b)结果 = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(',')))result_ll = [list(elem) for elem in result]

<块引用>

===> result_ll = [list(elem) for elem in result]

TypeError: 'PipelinedRDD' 对象不可迭代

相反,我使用 map 函数替换了迭代

result_ll = result.map( lambda elem: list(elem))

希望这有助于相应地修改您的代码

I am getting this error but i do not know why. Basically I am erroring from this code:

    a = data.mapPartitions(helper(locations))

where data is an RDD and my helper is defined as:

    def helper(iterator, locations): 
        for x in iterator:
            c = locations[x]
            yield c

(locations is just an array of data points) I do not see what the problem is but I am also not the best at pyspark so can someone please tell me why I am getting 'PipelinedRDD' object is not iterable from this code?

解决方案

RDD can iterated by using map and lambda functions. I have iterated through Pipelined RDD using the below method

lines1 = sc.textFile("\..\file1.csv")
lines2 = sc.textFile("\..\file2.csv")

pairs1 = lines1.map(lambda s: (int(s), 'file1'))
pairs2 = lines2.map(lambda s: (int(s), 'file2'))

pair_result = pairs1.union(pairs2)

pair_result.reduceByKey(lambda a, b: a + ','+ b)

result = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(',')))
result_ll = [list(elem) for elem in result]

===> result_ll = [list(elem) for elem in result]

TypeError: 'PipelinedRDD' object is not iterable

Instead of this I replaced the iteration using map function

result_ll = result.map( lambda elem: list(elem))

Hope this helps to modify your code accordingly

这篇关于pyspark:'PipelinedRDD' 对象不可迭代的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆