PySpark Evaluation


Problem Description

I am trying the following code, which adds a number to every row in an RDD and returns a list of RDDs using PySpark.

from pyspark.context import SparkContext

file = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file)

# Build one RDD per offset i; each is supposed to add i to every line
splits = [data.map(lambda p: int(p) + i) for i in range(4)]

print(splits[0].collect())
print(splits[1].collect())
print(splits[2].collect())

The content of the input file (sample.txt) is:

1
2
3

I was expecting output like this (adding 0, 1, and 2 respectively to the numbers in the RDD):

[1,2,3]
[2,3,4]
[3,4,5]

But the actual output is:

[4, 5, 6]
[4, 5, 6]
[4, 5, 6]

which means that the comprehension used only the value 3 for the variable i, irrespective of range(4).

Why does this behavior happen?

Recommended Answer

It happens because of Python's late binding and is not (Py)Spark specific. i is looked up when lambda p: int(p) + i is used, not when it is defined. Typically that means when the function is called, but in this particular context it is when the function is serialized to be sent to the workers.
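
The same late-binding behavior can be reproduced in plain Python with no Spark at all; a minimal sketch (the names here are illustrative, not from the original question):

adders = [lambda x: x + i for i in range(4)]
# Every lambda looks up i only when it is called; by then the loop has finished and i == 3
print([add(10) for add in adders])
## [13, 13, 13, 13]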

For example, you can do something like this:

def f(i):
    # i is evaluated when f(i) is called, so each _f closes over its own copy of i
    def _f(x):
        try:
            return int(x) + i
        except:
            # Lines that cannot be parsed as integers are silently mapped to None
            pass
    return _f

data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
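
Besides wrapping the lambda in a factory function, the current value of i can also be captured with a default argument or with functools.partial; a minimal sketch under the same setup (splits_default, splits_partial, and add_i are illustrative names, not part of the original answer):

from functools import partial

def add_i(p, i):
    return int(p) + i

data = sc.parallelize(["1", "2", "3"])

# A default argument evaluates i once, when the lambda is defined
splits_default = [data.map(lambda p, i=i: int(p) + i) for i in range(4)]

# functools.partial binds i eagerly in the same way
splits_partial = [data.map(partial(add_i, i=i)) for i in range(4)]

[rdd.collect() for rdd in splits_default]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]

Both variants force early binding, so each RDD's mapping function carries its own value of i when it is serialized and shipped to the workers.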

