Usage of local variables in closures when accessing Spark RDDs

Problem Description
I have a question regarding the usage of local variables in closures when accessing Spark RDDs. The problem I would like to solve looks as follows:
I have a list of text files that should be read into an RDD. However, I first need to add additional information to the RDD created from each single text file. This additional information is extracted from the filename. The RDDs are then combined into one big RDD using union().
from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)
list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
    tmp_rdd = spark_context.textFile(filename)
    # extract_file_info('file_from_Owner.txt') == 'Owner'
    file_owner = extract_file_info(filename)
    tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
    rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work:
# The result is that Bert is always the owner, i.e., never Ernie.
The problem is that the map() function within the loop does not refer to the "correct" file_owner. Instead, it refers to the latest value of file_owner. On my local machine, I managed to fix the problem by calling the cache() function on each single RDD:
# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
tmp_rdd.cache()
# ..
My question: Is using cache() the correct solution to my problem? Are there any alternatives?

Thanks a lot!
Recommended Answer
So the cache() method you are using won't necessarily work 100% of the time: it works provided that no nodes fail and no partitions need to be recomputed. A simple solution is to make a function that will "capture" the current value of file_owner. Here is a quick little illustration in the pyspark shell of a potential solution:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.0-SNAPSHOT
/_/
Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc.
>>> hi = "hi"
>>> sc.parallelize(["panda"])
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:365
>>> r = sc.parallelize(["panda"])
>>> meeps = r.map(lambda x : x + hi)
>>> hi = "by"
>>> meeps.collect()
['pandaby']
>>> hi = "hi"
>>> def makeGreetFunction(param):
... return (lambda x: x + param)
...
>>> f = makeGreetFunction(hi)
>>> hi="by"
>>> meeps = r.map(f)
>>> meeps.collect()
['pandahi']
>>>
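Applied to the original problem, the same factory-function pattern binds each file's owner at the moment the mapper is created. This late-binding behavior is plain Python, not Spark-specific, so it can be sketched without a SparkContext (extract_file_info is only hinted at in the question, so a hypothetical filename-based stand-in is used here):

```python
def extract_file_info(filename):
    # Hypothetical stand-in consistent with the question's comment:
    # extract_file_info('file_from_Owner.txt') == 'Owner'
    return filename.split('_')[-1].split('.')[0]

def make_owner_tagger(owner):
    # owner is evaluated now, so each returned lambda keeps its own copy
    return lambda x: (x, owner)

list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']

# Buggy version: every lambda closes over the *variable* file_owner,
# which holds its final value ('Bert') once the loop finishes.
buggy_mappers = []
for filename in list_of_filenames:
    file_owner = extract_file_info(filename)
    buggy_mappers.append(lambda x: (x, file_owner))

# Fixed version: the factory captures the value per iteration.
fixed_mappers = []
for filename in list_of_filenames:
    file_owner = extract_file_info(filename)
    fixed_mappers.append(make_owner_tagger(file_owner))

print([f('line') for f in buggy_mappers])  # [('line', 'Bert'), ('line', 'Bert')]
print([f('line') for f in fixed_mappers])  # [('line', 'Ernie'), ('line', 'Bert')]
```

In the Spark loop, `tmp_rdd.map(make_owner_tagger(file_owner))` would replace the inline lambda. A common Python alternative is a default argument, `lambda x, owner=file_owner: (x, owner)`, which also evaluates file_owner at definition time.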