PySpark评估 [英] PySpark Evaluation
问题描述
我正在尝试以下代码,该代码在RDD的每一行中添加一个数字,并使用PySpark返回RDD的列表.
I am trying the following code which adds a number to every row in an RDD and returns a list of RDDs using PySpark.
from pyspark.context import SparkContext
file = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file)
splits = [data.map(lambda p : int(p) + i) for i in range(4)]
print splits[0].collect()
print splits[1].collect()
print splits[2].collect()
输入文件(sample.txt)中的内容为:
The content in the input file (sample.txt) is:
1
2
3
我期待这样的输出(将rdd中的数字分别添加为0、1、2):
I was expecting an output like this (adding the numbers in the rdd with 0, 1, 2 respectively):
[1,2,3]
[2,3,4]
[3,4,5]
实际输出为:
[4, 5, 6]
[4, 5, 6]
[4, 5, 6]
这意味着对于 range(4),该理解仅对变量i使用了值3.
which means that the comprehension used only the value 3 for variable i, irrespective of the range(4).
为什么会发生这种现象?
Why does this behavior happen ?
推荐答案
它是由于Python后期绑定而发生的,而不是(Py)Spark特定的.使用lambda p : int(p) + i
时(而不是定义时)将查找i
.通常,它是指何时调用它,但在此特定情况下,它是序列化发送给工作人员的时间.
It happens because of Python late binding and is not (Py)Spark specific. i
will be looked-up when lambda p : int(p) + i
is used, not when it is defined. Typically it means when it is called but in this particular context it is when it is serialized to be send to the workers.
例如,您可以执行以下操作:
You can do for example something like this:
def f(i):
def _f(x):
try:
return int(x) + i
except:
pass
return _f
data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
这篇关于PySpark评估的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!