PySpark Evaluation
Question
I am trying the following code, which adds a number to every row in an RDD and returns a list of RDDs using PySpark.
from pyspark.context import SparkContext

file = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file)

# Build one RDD per offset i in range(4)
splits = [data.map(lambda p: int(p) + i) for i in range(4)]

print(splits[0].collect())
print(splits[1].collect())
print(splits[2].collect())
The content of the input file (sample.txt) is:
1
2
3
I was expecting output like this (adding 0, 1, and 2 respectively to the numbers in the RDD):
[1,2,3]
[2,3,4]
[3,4,5]
but the actual output is:
[4, 5, 6]
[4, 5, 6]
[4, 5, 6]
This means the comprehension used only the final value 3 for the variable i, irrespective of range(4).
Why does this happen?
Answer
This happens because of Python's late binding and is not (Py)Spark specific. i is looked up when lambda p: int(p) + i is used, not when it is defined. Typically that means when the lambda is called, but in this particular context it means when it is serialized to be sent to the workers.
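The same late-binding effect can be reproduced in plain Python, with no Spark involved (a minimal sketch):

```python
# All four lambdas close over the same variable i; its value is read
# only when each lambda is called, after the loop has finished with i == 3.
fns = [lambda x: x + i for i in range(4)]
results = [fn(10) for fn in fns]
print(results)  # every call sees the final value i == 3, so [13, 13, 13, 13]
```

In the Spark case, the "call" that triggers the lookup is the serialization of the closure for shipment to the workers, but the mechanism is identical.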
For example, you can do something like this:
def f(i):
    # i is bound as a function argument here, so each _f captures
    # its own value of i at the time f(i) is called.
    def _f(x):
        try:
            return int(x) + i
        except ValueError:
            return None
    return _f

data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
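An alternative to the wrapper function, sketched here in plain Python, is to bind the current value of i at definition time with a default argument; default values are evaluated once, when the function is defined:

```python
# Default-argument values are evaluated at definition time, so each
# lambda gets its own snapshot of i instead of a shared reference.
fixed = [lambda x, i=i: x + i for i in range(4)]
print([fn(10) for fn in fixed])  # [10, 11, 12, 13]
```

The same trick applies to the original Spark code, e.g. data.map(lambda p, i=i: int(p) + i), since the default is baked into the function object before it is serialized.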