Apache Spark's RDD[Vector] Immutability issue


Problem description

I know that RDDs are immutable and therefore their values cannot be changed, but I see the following behaviour:

I wrote an implementation of the FuzzyCMeans algorithm (https://github.com/salexln/FinalProject_FCM) and I'm now testing it, so I run the following example:

import org.apache.spark.mllib.clustering.FuzzyCMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/home/development/myPrjects/R/butterfly/butterfly.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
> parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[2] at map at <console>:31

val numClusters = 2
val numIterations = 20


parsedData.foreach{ point => println(point) }
> [0.0,-8.0]
[-3.0,-2.0]
[-3.0,0.0]
[-3.0,2.0]
[-2.0,-1.0]
[-2.0,0.0]
[-2.0,1.0]
[-1.0,0.0]
[0.0,0.0]
[1.0,0.0]
[2.0,-1.0]
[2.0,0.0]
[2.0,1.0]
[3.0,-2.0]
[3.0,0.0]
[3.0,2.0]
[0.0,8.0] 

val clusters = FuzzyCMeans.train(parsedData, numClusters, numIterations)
parsedData.foreach{ point => println(point) }
> 
[0.0,-0.4803333185624595]
[-0.1811743096972924,-0.12078287313152826]
[-0.06638890786148487,0.0]
[-0.04005925925925929,0.02670617283950619]
[-0.12193263222069807,-0.060966316110349035]
[-0.0512,0.0]
[NaN,NaN]
[-0.049382716049382706,0.0]
[NaN,NaN]
[0.006830134553650707,0.0]
[0.05120000000000002,-0.02560000000000001]
[0.04755220304297078,0.0]
[0.06581619798335057,0.03290809899167529]
[0.12010867103812725,-0.0800724473587515]
[0.10946638900458144,0.0]
[0.14814814814814817,0.09876543209876545]
[0.0,0.49119985188436205] 

But how can it be that my method changes the immutable RDD?

BTW, the signature of the train method is the following:

train(data: RDD[Vector], clusters: Int, maxIterations: Int)

Recommended answer

From the documentation:

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).
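As a minimal sketch of these idioms in spark-shell (reusing the sc, input path, and parsedData from the question; nothing else is assumed):

import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/home/development/myPrjects/R/butterfly/butterfly.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Runs on the executors: in cluster mode the output goes to the
// executors' stdout, not to the driver's console.
parsedData.foreach(println)

// Brings the whole RDD to the driver first; safe only for small data.
parsedData.collect().foreach(println)

// Safer for large RDDs: pull just a few elements to the driver.
parsedData.take(5).foreach(println)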

So, as data can migrate between nodes, the same output of foreach is not guaranteed. The RDD is immutable, but you should extract the data in an appropriate way, since you don't have the whole RDD at your node.
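To check this on the driver instead of through executor-side println, one option (a sketch, assuming the train signature quoted above and an RDD small enough to collect) is to snapshot the RDD's contents before and after training and compare the snapshots:

import org.apache.spark.mllib.clustering.FuzzyCMeans

// Driver-side snapshot before training (collect() copies the data
// to the driver, so keep this to small inputs).
val before = parsedData.collect()

val clusters = FuzzyCMeans.train(parsedData, numClusters, numIterations)

// Snapshot again after training and compare element-wise.
val after = parsedData.collect()
println(before.sameElements(after))  // true if the RDD's data is unchanged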

Another possible issue (not in your case, since you're using an immutable vector) is using mutable data inside the Point itself, which is completely incorrect, so you would lose all guarantees; the RDD itself would still remain immutable, however.
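To illustrate that pitfall, here is a small sketch with a hypothetical MutablePoint class (invented for this example; it is not part of the question's code or of MLlib):

// A mutable element type: the RDD reference stays immutable, but
// nothing stops code from mutating the cached objects it points to.
case class MutablePoint(var x: Double, var y: Double)

val pts = sc.parallelize(Seq(MutablePoint(1.0, 2.0), MutablePoint(3.0, 4.0))).cache()

// Mutates the cached elements in place on the executors.
pts.foreach { p => p.x = 0.0 }

// With an in-memory cache this can now print x = 0.0 for every point,
// even though the RDD itself was never reassigned: once the elements
// are mutable, all immutability guarantees are gone.
pts.collect().foreach(println)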
