为什么 pyspark 选择未广播的变量? [英] Why is pyspark picking up a variable that was not broadcast?

查看:36
本文介绍了为什么 pyspark 选择未广播的变量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 pyspark 来分析数据集,我有点惊讶为什么即使我使用的是广播的变量,以下代码仍能正常工作.>

有问题的变量是 video,它在函数 filter 中使用,在 join 之后.

seed = random.randint(0,999)# df 是一个数据框# 视频只是一个随机采样的元素视频 = df.sample(False,0.001,seed).head()# 只是一个python列表其他视频 = [ (22,0.32),(213,0.43)]# 将python列表转换为rddresultsRdd = sc.parallelize(similarVideos)rdd = df.rdd.map(lambda 行:(row.video_id,row.title))# 在 resultsRdd 和 rdd 之间执行连接# 注意 video.title 没有被广播(结果Rdd.join(rdd).filter(lambda pair: pair[1][1] != video.title) # 这里!!!.takeOrdered(10, key= lambda 对: -pair[1][0]))

我在独立模式下使用 pyspark,对 pyspark-submit 使用以下参数:

--num-executors 12 --executor-cores 4 --executor-memory 1g --master local[*]

此外,我正在 jupyter(新的 ipython-notebooks)上运行之前的代码.

解决方案

[Reposting comment as an answer.]

对于这个概念,我认为这个 link关于理解闭包 是一本很好的读物.本质上,您不需要广播 RDD 范围之外的所有变量,因为闭包(在您的情况下为 video)将被序列化并发送到每个执行程序和任务以在任务执行期间进行访问.当广播的数据集很大时,广播变量很有用,因为它将作为只读缓存存在,位于执行程序上,并且不会随着在该执行程序上运行的每个任务进行序列化/发送/反序列化.

I'm using pyspark to analyse a dataset and I'm a little bit surprised as to why the following code works correctly even though I'm using a variable that was not broadcast.

The variable in question is video, that's used in function filter, after the join.

seed = random.randint(0,999)

# df is a dataframe
# video is just one randomly sampled element
video = df.sample(False,0.001,seed).head()

# just a python list
otherVideos = [ (22,0.32),(213,0.43) ]

# transform the python list into an rdd 
resultsRdd = sc.parallelize(similarVideos)

rdd = df.rdd.map(lambda row: (row.video_id,row.title))

# perform a join between resultsRdd and rdd
# note that video.title was NOT broadcast
(resultsRdd
   .join(rdd)
   .filter(lambda pair: pair[1][1] != video.title) # HERE!!!
   .takeOrdered(10, key= lambda pair: -pair[1][0]))

I'm using pyspark in standalone mode, with the following arguments to pyspark-submit:

--num-executors 12 --executor-cores 4 --executor-memory 1g --master local[*]

Also, I'm running the previous code on jupyter (new ipython-notebooks).

解决方案

[Reposting comment as an answer.]

For this concept, I think this link on understanding closures is a pretty good read. Essentially, you do not need to broadcast all variables outside the scope of an RDD since the closure (in your case video) will be serialized and sent to each executor and task for access during task execution. Broadcast variables are useful when the dataset being broadcast is large because it will exist as a read-only cache that will sit on the executor and not be serialized/sent/deserialized with each task run on that executor.

这篇关于为什么 pyspark 选择未广播的变量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆