Issue with creating a global list from map using PySpark


Question

I have this code where I am reading a file in IPython using PySpark. What I am trying to do is add a piece to it that builds a list based on a particular column read from the file, but when I execute it the list comes out empty and nothing gets appended to it. My code is:

list1 = []

def file_read(line):

    list1.append(line[10])
    # bunch of other code that processes other column indexes on `line`

inputData = sc.textFile(fileName).zipWithIndex().filter(lambda (line,rownum): rownum>0).map(lambda (line, rownum): line)

column_val = (inputData
    .map(lambda line: line.split(","))
    .filter(lambda line: len(line) >1 )
    .map(file_read))

When I execute this part of the code, list1 still comes out empty even though there is data in line[10], which I use in other parts of the same function above. It seems as if the data is just not being appended to the list. How can I build the list above?

Answer

Well, it actually does append to list1; the problem is just not the one you're thinking about. Every variable referenced in a closure is serialized and sent to the workers, and that applies to list1 as well.

Every partition receives its own copy of list1. When file_read is called, data is appended to that copy, and when a given map phase finishes the copy goes out of scope and is discarded.

This is not a particularly elegant piece of code, but it should show you that this is really what happens here:

rdd = sc.parallelize(range(100), 5)

list1 = []

def file_read(line):
    # Appends to the task-local copy of list1, not to the driver's list.
    list1.append(line)
    print(len(list1))
    return line

xs = rdd.map(file_read).collect()
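
A quick check on the driver right after the action makes the point (a small sketch, assuming the snippet above has just been run): the collected data comes back, but the driver's list1 is untouched:

print(len(xs))    # 100 - the mapped data itself is returned by collect()
print(list1)      # [] - none of the worker-side appends reach the driver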

Edit

Spark provides two types of shared variables: broadcast variables, which are read-only from the worker perspective, and accumulators, which are write-only from the worker perspective and can only be read on the driver.
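
For instance, a built-in numeric accumulator already works as a distributed counter with no extra code (a minimal sketch; the names short_rows and count_short are illustrative and not from the original post):

short_rows = sc.accumulator(0)            # default numeric accumulator

def count_short(fields):
    global short_rows                     # += rebinds the name, so global is required
    if len(fields) <= 1:
        short_rows += 1                   # tasks can only add to the accumulator
    return fields

sc.parallelize([["a"], ["b", "c"], ["d"]]).map(count_short).collect()
short_rows.value                          # -> 2, readable only on the driver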

By default, accumulators support only numeric values and are intended to be used mostly as counters. It is possible to define custom accumulators, though. To do that you have to extend the AccumulatorParam class and provide custom zero and addInPlace implementations:

from pyspark import AccumulatorParam

class ListParam(AccumulatorParam):
    def zero(self, v):
        # Initial value for each task-local copy of the accumulator.
        return []
    def addInPlace(self, acc1, acc2):
        # Merges two partial results; also invoked when a task adds a value.
        acc1.extend(acc2)
        return acc1

Next you can redefine file_read (here as file_read1) as follows:

def file_read1(line):
    global list1  # Required; otherwise the += below would rebind list1 as a local name
    list1 += [line]
    return line

Example usage:

list1 = sc.accumulator([], ListParam())

rdd = sc.parallelize(range(10)).map(file_read1).collect()
list1.value
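
As a rough sanity check (a sketch, assuming the lines above have been run), all ten elements should be visible on the driver once the action finishes, although the ordering of the merged partition results is not guaranteed:

sorted(list1.value) == list(range(10))    # True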

Even if it is possible to use an accumulator like this, it is probably too expensive to be used in practice, and in the worst-case scenario it can crash the driver. Instead, you can simply use another transformation:

tmp = (inputData
    .map(lambda line: line.split(","))
    .filter(lambda line: len(line) > 1))

def line_read2(line): return ...  # Just the core logic, without the append

line1 = tmp.map(lambda line: line[10])    # the column you wanted, as an RDD
column_val = tmp.map(line_read2)
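
If you actually need those column values as a plain Python list on the driver, call an action on line1 (a minimal sketch; collect assumes the result fits in driver memory):

list1 = line1.collect()      # all values of column 10 as a local list
first_few = line1.take(5)    # or just peek at a handful of values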

Side note:

The code you've provided doesn't do anything by itself. Transformations in Spark are just descriptions of what has to be done; nothing is actually executed until you call an action.
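
In other words, the pipeline built above only runs once an action is invoked on it (a short sketch; the choice of action is illustrative):

# Nothing has been computed at this point; column_val is just a recipe.
column_val = tmp.map(line_read2)

# Calling an action is what actually triggers the work:
result = column_val.collect()    # or column_val.count(), column_val.take(10), ...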
