pyspark matrix accumulator


Question


I want to additively populate a matrix with values inferred from an rdd using a pyspark accumulator; I found the docs a bit unclear. Adding a bit of background, just in case it's relevant.
My rddData contains lists of indexes, and for each list one count has to be added to the matrix for every pair of indexes. For example, this list maps to these index pairs:
[1,3,4] -> (1,1), (1,3), (1,4), (3,3), (3,4), (4,4)
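As an aside, the pair enumeration itself can be sketched with itertools, independent of Spark (this gives just the upper-triangle pairs listed above, assuming the index list is sorted; the double loop in the mapper below emits both (i, j) and (j, i)):

```python
from itertools import combinations_with_replacement

lIndices = [1, 3, 4]
# All unordered pairs (with repetition) drawn from the index list.
pairs = list(combinations_with_replacement(lIndices, 2))
print(pairs)  # [(1, 1), (1, 3), (1, 4), (3, 3), (3, 4), (4, 4)]
```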


Now, here is my accumulator:

from pyspark.accumulators import AccumulatorParam
import numpy as np

class MatrixAccumulatorParam(AccumulatorParam):
    def zero(self, mInitial):
        # An all-zeros matrix with the same shape as the initial value.
        return np.zeros(mInitial.shape)

    def addInPlace(self, mAdd, lIndex):
        # lIndex is a [row, col] pair: bump that cell by one.
        mAdd[lIndex[0], lIndex[1]] += 1
        return mAdd

And this is my mapper function:

def populate_sparse(lIndices):
    # Add one count for every ordered pair of indices in the list.
    for i1 in lIndices:
        for i2 in lIndices:
            oAccumilatorMatrix.add([i1, i2])

And then run the data:

oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam())  # aaZeros: the initial all-zeros numpy matrix

rddData.map(populate_sparse).collect()


Now, when I look at my data:

sum(sum(oAccumilatorMatrix.value))
#= 0.0


Which it shouldn't be. What am I missing?


EDIT I tried this with a sparse matrix at first and got the traceback below saying that sparse matrices are not supported. I changed the question to use a dense numpy matrix:

...

    raise IndexError("Indexing with sparse matrices is not supported"
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes.

Answer


Aha! I think I got it. The accumulator, at the end of the day, still needs to add its own pieces to itself. So, change addInPlace to:

def addInPlace(self, mAdd, lIndex):
    if type(lIndex) == list:
        # A [row, col] pair from the mapper: bump that single cell.
        mAdd[lIndex[0], lIndex[1]] += 1
    else:
        # Another matrix: Spark is merging partial results, so add elementwise.
        mAdd += lIndex
    return mAdd


So now it adds a count at the given indices when handed a list, and adds whole matrices together when Spark merges partial results after the populate_sparse loop, producing my final matrix.
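To see both branches in action without a cluster, here is a minimal standalone sketch of the fixed merge logic (plain numpy, with add_in_place standing in for the accumulator method; the names and the 2x2 shape are illustrative):

```python
import numpy as np

def add_in_place(mAdd, lIndex):
    if type(lIndex) == list:
        # A [row, col] pair from the mapper: bump that single cell.
        mAdd[lIndex[0], lIndex[1]] += 1
    else:
        # Another matrix: this is Spark merging partial results, add elementwise.
        mAdd += lIndex
    return mAdd

m = np.zeros((2, 2))
m = add_in_place(m, [0, 1])           # per-record update: one count at (0, 1)
m = add_in_place(m, np.ones((2, 2)))  # merging a partial matrix from another task
print(m.sum())  # 5.0
```

With the original addInPlace, the second call would have tried to index with a matrix and the merged counts would have been lost; here both kinds of argument are handled.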
