PySpark Evaluation


Problem Description

I am trying the following code which adds a number to every row in an RDD and returns a list of RDDs using PySpark.

from pyspark.context import SparkContext

file = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file)
# build one RDD per offset i; each map should add i to every line
splits = [data.map(lambda p: int(p) + i) for i in range(4)]
print(splits[0].collect())
print(splits[1].collect())
print(splits[2].collect())

The content in the input file (sample.txt) is:

1
2
3

I was expecting output like this (adding 0, 1, and 2 respectively to the numbers in the RDD):

[1,2,3]
[2,3,4]
[3,4,5]

The actual output is:

[4, 5, 6]
[4, 5, 6]
[4, 5, 6]

which means that the comprehension used only the value 3 for the variable i, irrespective of range(4).

Why does this behavior happen?

Recommended Answer

It happens because of Python's late binding and is not (Py)Spark specific. i is looked up when lambda p: int(p) + i is used, not when it is defined. Typically that means when it is called, but in this particular context it means when it is serialized to be sent to the workers.
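
A minimal plain-Python sketch of the same late-binding behavior (no Spark involved; the names adders and add are just illustrative):

# every lambda closes over the name i, not its value at creation time,
# so by the time they are called, all of them see i's final value (3)
adders = [lambda x: x + i for i in range(4)]
print([add(1) for add in adders])
## [4, 4, 4, 4], not [1, 2, 3, 4]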

You can, for example, do something like this:

def f(i):
    # i is evaluated when f(i) is called, so each _f captures its own value of i
    def _f(x):
        try:
            return int(x) + i
        except:
            # non-numeric lines are silently mapped to None
            pass
    return _f

data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
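
Another common workaround (standard Python, not part of the original answer) is to bind i at definition time through a default argument, assuming the same data RDD as above:

# i=i freezes the current value of i when each lambda is created,
# so each mapped RDD keeps its own offset
splits = [data.map(lambda p, i=i: int(p) + i) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]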
