pyspark matrix accumulator
Question
I want to additively populate a matrix with values inferred from an RDD using a pyspark accumulator; I found the docs a bit unclear. Adding a bit of background, just in case it's relevant.
My rddData contains lists of indexes for which one count has to be added to the matrix. For example, this list maps to indices:
[1,3,4] -> (11), (13), (14), (33), (34), (44)
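The mapping above can be sketched with a small standalone helper (hypothetical, not part of the original code) that produces every unordered index pair from a list:

```python
from itertools import combinations_with_replacement

def index_pairs(indices):
    """Return all unordered index pairs (i, j) with i <= j from a list of indices."""
    return list(combinations_with_replacement(sorted(indices), 2))

print(index_pairs([1, 3, 4]))
# [(1, 1), (1, 3), (1, 4), (3, 3), (3, 4), (4, 4)]
```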
Now, here is my accumulator:
from pyspark.accumulators import AccumulatorParam

class MatrixAccumulatorParam(AccumulatorParam):
    def zero(self, mInitial):
        import numpy as np
        aaZeros = np.zeros(mInitial.shape)
        return aaZeros

    def addInPlace(self, mAdd, lIndex):
        mAdd[lIndex[0], lIndex[1]] += 1
        return mAdd
And here is my mapper function:
def populate_sparse(lIndices):
    for i1 in lIndices:
        for i2 in lIndices:
            oAccumilatorMatrix.add([i1, i2])
Then run the data:
oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam())
rddData.map(populate_sparse).collect()
Now, when I look at my data:
sum(sum(oAccumilatorMatrix.value))
#= 0.0
Which it shouldn't be. What am I missing?
EDIT: I tried this with a sparse matrix first and got the traceback below saying that sparse matrices are not supported, so I changed the question to use a dense numpy matrix:
...
raise IndexError("Indexing with sparse matrices is not supported"
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes.
Answer
Aha! I think I got it. The accumulator, at the end of the day, still needs to add its own pieces to itself. So, change addInPlace to:
def addInPlace(self, mAdd, lIndex):
    if isinstance(lIndex, list):
        # a single [i, j] index pair: increment that cell
        mAdd[lIndex[0], lIndex[1]] += 1
    else:
        # a whole partial matrix: merge it into the running total
        mAdd += lIndex
    return mAdd
So now it adds indices when it is given a list, and adds itself after the populate_sparse function loop to create my final matrix.
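Outside of Spark, the fixed addInPlace logic can be checked with plain numpy by simulating both call patterns: the per-record index adds, and the final merge step where Spark combines a partial matrix into the driver's accumulator. Names and matrix size here are illustrative, not from the original code:

```python
import numpy as np

def add_in_place(m_add, value):
    """Mimics the fixed addInPlace: increment one cell for an [i, j] pair,
    or merge a whole partial matrix into the running total."""
    if isinstance(value, list):
        m_add[value[0], value[1]] += 1
    else:
        m_add += value
    return m_add

# Simulate one task accumulating the pairs from [1, 3, 4]
partial = np.zeros((5, 5))
for i1 in [1, 3, 4]:
    for i2 in [1, 3, 4]:
        add_in_place(partial, [i1, i2])

# Simulate Spark merging the task's partial result into the zero matrix
total = add_in_place(np.zeros((5, 5)), partial)
print(total.sum())  # 9.0 -> one count per ordered pair of [1, 3, 4]
```

Without the isinstance branch, the merge step would try to use the whole partial matrix as an index, which is exactly why the original accumulator came back empty or broken.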