Functional approach in sequential RDD processing [Apache Spark]
Question
I have an RDD connected to an HBase table. Each row (key) represents a GPS location. I've written a function to calculate the distance between two points; it should be called with the current row and its predecessor [i-1].
Now I'm struggling to get this done in a functional way with RDD operations so that I can parallelize it.
My quick and dirty approach is to first create an array:
val rows = rdd.collect()
val rowCount = rdd.count() - 1 //since the first row has no distance
val rowArray = new Array[(String, Point, Point)](rowCount.toInt)

var predecessorPoint: Point = null
var currentPoint: Point = null
var i = 0 //can be better solved in Scala, I know ;)

rows.foreach(row => {
  if (predecessorPoint == null) {
    predecessorPoint = getPointByRow(row._2)
  } else {
    currentPoint = getPointByRow(row._2)
    rowArray(i) = (row._1, predecessorPoint, currentPoint)
    i += 1
    predecessorPoint = currentPoint
  }
})
rowArray
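As an aside, the mutable pairing loop above can be written more idiomatically in plain Scala with `sliding(2)`, which yields consecutive windows over a collection. A minimal sketch on a plain `List` (no Spark involved; `pairs` is a hypothetical helper name):

```scala
object PairSketch {
  // Build (predecessor, current) pairs from consecutive elements
  // using sliding windows of size 2.
  def pairs[A](xs: List[A]): List[(A, A)] =
    xs.sliding(2).collect { case List(a, b) => (a, b) }.toList

  def main(args: Array[String]): Unit = {
    // Four elements yield three consecutive pairs.
    println(pairs(List(1, 2, 3, 4))) // List((1,2), (2,3), (3,4))
  }
}
```

Note this only works on local collections; on an RDD the elements are distributed, which is exactly the problem the question is about.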
Then I parallelize the array and calculate the distances:
//create a parallel-enabled data set
val parallelDataSet = sc.parallelize(rowArray)
parallelDataSet.foreach(row => Functions.logDistance(row))
That works, but it's ugly and surely inefficient.
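The question doesn't show the distance function itself. For GPS coordinates, a common choice is the haversine (great-circle) formula; a self-contained sketch, assuming points are given as latitude/longitude in degrees (the object and method names here are illustrative, not from the question):

```scala
object Haversine {
  // Great-circle distance in kilometers between two (lat, lon)
  // points given in degrees, using the haversine formula.
  def distanceKm(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val earthRadiusKm = 6371.0
    val dLat = math.toRadians(lat2 - lat1)
    val dLon = math.toRadians(lon2 - lon1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
        math.pow(math.sin(dLon / 2), 2)
    2 * earthRadiusKm * math.asin(math.sqrt(a))
  }

  def main(args: Array[String]): Unit = {
    // Berlin -> Munich, roughly 504 km great-circle
    println(f"${distanceKm(52.52, 13.405, 48.137, 11.575)}%.0f km")
  }
}
```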
My idea now was to use rdd.reduce() to get rid of the foreach loop; this might work if the distance function can handle the fact that the ordering of (a, b) is not guaranteed.
Anyway, is there a better solution? My understanding is that there is no way to get (efficient) index access when working with RDDs.
Thanks.
Answer
Given that ordering is key here, a good way to proceed could be to first index the RDD. Then, using the index, we can simulate a zip and have the tuples partitioned over the cluster. Something like this:
val indexed = rdd.zipWithIndex.map(_.swap)              // RDD[(index, row)]
val shifted = indexed.map { case (k, v) => (k - 1, v) } // shift each row back by one index
val joined = indexed.join(shifted)                      // (k, (row k, row k+1))
val distanceRDD = joined.map { case (k, (v1, v2)) => distanceFunction(v1, v2) }
(*) example code - not tested
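To see why the index/shift/join pipeline pairs each row with its successor, here is a local simulation of the same three steps on a plain `Seq` (using a `Map` in place of a keyed RDD; `consecutive` is an illustrative name, not Spark API):

```scala
object ZipJoinSketch {
  // Local simulation of the answer's pipeline:
  // zipWithIndex + swap, shift keys by -1, then join on the key.
  def consecutive[A](xs: Seq[A]): Seq[(A, A)] = {
    val indexed = xs.zipWithIndex.map(_.swap).toMap         // (i, v_i)
    val shifted = indexed.map { case (k, v) => (k - 1, v) } // (i-1, v_i)
    indexed.keys.toSeq.sorted.flatMap { k =>
      // Joining on key k pairs v_k with v_(k+1);
      // the last index has no partner and drops out.
      shifted.get(k).map(next => (indexed(k), next))
    }
  }

  def main(args: Array[String]): Unit = {
    println(consecutive(Seq("a", "b", "c"))) // pairs ("a","b") and ("b","c")
  }
}
```

The same reasoning carries over to the distributed version: the join happens per key, so the pairs can be computed in parallel across partitions, which is exactly what the question's collect-then-loop approach could not do.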