Spark Scala: How to work with each 3 elements of rdd?


Problem Description


Hello everyone.

I have the following problem:

I have a very big RDD (billions of elements), for example:

Array[((Int, Int), Double)] = Array(((0,0),729.0), ((0,1),169.0), ((0,2),1.0), ((0,3),5.0), ...... ((34,45),34.0), .....)

I need to perform the following operation:

take the value of each element with key (i, j) and add to it

min(rdd_value[(i-1, j)], rdd_value[(i, j-1)], rdd_value[(i-1, j-1)])

How can I do this without using collect()? After collect() I get a Java memory error because my RDD is very big.

Thank you very much!

I am trying to implement this algorithm from Python, where the time series are RDDs.

from math import sqrt

def DTWDistance(s1, s2):
    DTW = {}

    # Initialize the borders: every cell outside the grid costs infinity,
    # except the virtual start cell (-1, -1).
    for i in range(len(s1)):
        DTW[(i, -1)] = float('inf')
    for i in range(len(s2)):
        DTW[(-1, i)] = float('inf')
    DTW[(-1, -1)] = 0

    # Fill the grid: each cell adds the local squared distance to the
    # minimum of its top, left and top-left neighbours.
    for i in range(len(s1)):
        for j in range(len(s2)):
            dist = (s1[i] - s2[j])**2
            DTW[(i, j)] = dist + min(DTW[(i-1, j)], DTW[(i, j-1)], DTW[(i-1, j-1)])

    return sqrt(DTW[(len(s1)-1, len(s2)-1)])
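For example, a quick sanity check (the two input series here are made up for illustration):

print(DTWDistance([1, 2, 3], [2, 3, 4]))  # -> 1.414..., i.e. sqrt(2)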

Now I need to perform the last operation, the one inside the for loop. The dist is already calculated.

Example:

Input (like matrix):

4 5 1
7 2 3
9 0 1

The RDD looks like:

rdd.take(10)

Array(((1,1), 4), ((1,2), 5), ((1,3), 1), ((2,1), 7), ((2,2), 2), ((2,3), 3), ((3,1), 9), ((3,2), 0), ((3,3), 1))

I want to perform this operation:

rdd_value[(i, j)] = rdd_value[(i, j)] + min(rdd_value[(i-1, j)],rdd_value[(i, j-1)], rdd_value[(i-1, j-1)])

For example:

((1, 1), 4) = 4 + min(infinity, infinity, 0) = 4 + 0 = 4


4 5 1
7 2 3
9 0 1

Then

((1, 2), 5) = 5 + min(infinity, 4, infinity) = 5 + 4 = 9


4 9 1
7 2 3
9 0 1

Then

....

Then

((2, 2), 2) = 2 + min(7, 9, 4) = 2 + 4 = 6


4 9 1
7 6 3
9 0 1

Then .....

((3, 3), 1) = 1 + min(3, 0, 2) = 1 + 0 = 1

Solution

The short answer is that the problem you are trying to solve cannot be expressed efficiently and concisely using Spark. It doesn't really matter if you choose plain RDDs or distributed matrices.

To understand why, you have to think about the Spark programming model. A fundamental Spark concept is the graph of dependencies, where each RDD depends on one or more parent RDDs. If your problem were defined as follows:

  • given an initial matrix M_0
  • for i <- 1..n
    • find the matrix M_i where M_i(m,n) = M_{i-1}(m,n) + min(M_{i-1}(m-1,n), M_{i-1}(m-1,n-1), M_{i-1}(m,n-1))

then it would be trivial to express using the Spark API (pseudocode):

rdd
    .flatMap(lambda ((i, j), v): 
        [((i + 1, j), v), ((i, j + 1), v), ((i + 1, j + 1), v)])
    .reduceByKey(min)
    .union(rdd)
    .reduceByKey(add)
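Since the pseudocode above relies on Python 2 style tuple unpacking in lambdas, here is a minimal runnable sketch of the same idea for PySpark under Python 3. The toy input mirrors the 3x3 matrix from the question; note that this performs a single synchronous step of the iterative recurrence above, not the sequential in-place update asked about:

from operator import add
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Toy input: the 3x3 matrix from the question as ((row, col), value) pairs.
rdd = sc.parallelize([((1, 1), 4.0), ((1, 2), 5.0), ((1, 3), 1.0),
                      ((2, 1), 7.0), ((2, 2), 2.0), ((2, 3), 3.0),
                      ((3, 1), 9.0), ((3, 2), 0.0), ((3, 3), 1.0)])

# Each cell sends its value to the three cells that depend on it:
# the one below, the one to the right, and the one diagonally below-right.
shifted = rdd.flatMap(lambda kv: [
    ((kv[0][0] + 1, kv[0][1]), kv[1]),
    ((kv[0][0], kv[0][1] + 1), kv[1]),
    ((kv[0][0] + 1, kv[0][1] + 1), kv[1]),
])

result = (shifted
          .reduceByKey(min)   # minimum over the neighbours that exist
          .union(rdd)         # pair each minimum with the original value
          .reduceByKey(add))  # add them together

# Border cells such as (1, 1) receive no neighbour contributions and keep
# their original value; keys pushed outside the grid (e.g. (4, 4)) would
# need to be filtered out afterwards.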

Unfortunately, you are trying to express dependencies between individual values within the same data structure. Spark aside, this is a problem which is much harder to parallelize, not to mention distribute.

This type of dynamic programming is hard to parallelize because at different points it is completely or almost completely sequential. When you try to compute, for example, M_i(0,0) or M_i(m,n), there is nothing to parallelize. It is hard to distribute because it can generate complex dependencies between blocks.

There are non-trivial ways to handle this in Spark, by computing individual blocks and expressing the dependencies between these blocks, or by using iterative algorithms and propagating messages over an explicit graph (GraphX), but this is far from easy to get right.

At the end of the day, there are tools which can be a much better choice for this type of computation than Spark.
