accumulator in pyspark with dict as global variable


Question

Just for learning purposes, I tried to set a dictionary as a global variable through an accumulator. The add function works well, but when I run the code and update the dictionary inside the map function, the dictionary always comes back empty.

But similar code that sets a list as a global variable works.

import re
from pyspark import AccumulatorParam


class DictParam(AccumulatorParam):
    def zero(self, value=""):
        return dict()

    def addInPlace(self, acc1, acc2):
        acc1.update(acc2)


if __name__ == "__main__":
    sc, sqlContext = init_spark("generate_score_summary", 40)
    rdd = sc.textFile('input')
    # print(rdd.take(5))

    dict1 = sc.accumulator({}, DictParam())

    def file_read(line):
        global dict1
        ls = re.split(',', line)
        dict1 += {ls[0]: ls[1]}
        return line

    rdd = rdd.map(lambda x: file_read(x)).cache()
    print(dict1)


Answer

I believe that print(dict1) simply gets executed before the rdd.map() does.

In Spark, there are two types of operations:


  • transformations, which describe the future computation
  • and actions, which actually trigger the execution (a short sketch follows this list)
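
As a quick illustration (a minimal sketch that is not part of the original answer, assuming sc is an active SparkContext):

nums = sc.parallelize([1, 2, 3, 4])    # build an RDD
doubled = nums.map(lambda x: x * 2)    # transformation: only recorded, nothing runs yet
result = doubled.collect()             # action: triggers the actual computation
print(result)                          # [2, 4, 6, 8]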

Accumulators are updated only once some action has been executed:

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action.

If you check out the end of this section of the docs, there is an example exactly like yours:

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.

So you would need to add some action, for instance:

rdd = rdd.map(lambda x: file_read(x)).cache() # transformation
foo = rdd.count() # action
print(dict1)
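
For reference, here is a minimal end-to-end sketch of the same idea (mine, not part of the original answer); the input path and comma-separated line format are assumptions carried over from the question. Note that addInPlace returns the merged dict (the question's version returns None) and that it is the count() action that actually populates the accumulator:

import re
from pyspark import SparkContext, AccumulatorParam

class DictParam(AccumulatorParam):
    def zero(self, value=None):
        return {}

    def addInPlace(self, acc1, acc2):
        acc1.update(acc2)
        return acc1  # the merged value must be returned, or the accumulator's value becomes None

sc = SparkContext(appName="dict_accumulator_demo")  # hypothetical app name
dict1 = sc.accumulator({}, DictParam())

def file_read(line):
    ls = re.split(',', line)
    dict1.add({ls[0]: ls[1]})   # record one key/value pair per input line
    return line

rdd = sc.textFile('input').map(file_read).cache()   # transformation only, nothing runs yet
rdd.count()                                         # action: executes the map and updates dict1
print(dict1.value)                                  # the dict is now populated on the driver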

Please make sure to check the details of the various RDD functions and accumulator peculiarities, because this might affect the correctness of your result. (For instance, rdd.take(n) will by default only scan one partition, not the entire dataset.)
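
To make that concrete, a small sketch of the difference (run one action or the other, not both, since re-running the map adds to the accumulator again; file_read and dict1 are the same as above):

rdd = sc.textFile('input').map(file_read)

# either: may evaluate just the first partition, so dict1 is only partially filled
rdd.take(5)

# or: evaluates every partition, so dict1 ends up covering the whole dataset
rdd.count()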

