Functional approach in sequential RDD processing [Apache Spark]


Question

I have an RDD connected to an HBase table. Each row (key) represents a GPS location. Now I've written a function to calculate the distance between two points. The function should be called with the current row and its predecessor [i-1].

Now I'm struggling to get this done in a functional way with RDD functions so that I can parallelize it.

My quick and dirty approach is to first create an array:

val rows = rdd.collect()
val rowCount = rdd.count() - 1 //since the first row has no distance
val rowArray = new Array[(String, Point, Point)](rowCount.toInt)
var i = 0 //imperative index; can be better solved in Scala, I know ;)
var predecessorPoint: Point = null //declared so the loop below compiles
var currentPoint: Point = null

rows.foreach(row => {
  if (predecessorPoint == null) {
    predecessorPoint = getPointByRow(row._2)
  }
  else {
    currentPoint = getPointByRow(row._2)
    rowArray(i) = (row._1, predecessorPoint, currentPoint)

    i += 1
    predecessorPoint = currentPoint
  }
})

rowArray //last expression is the result; a bare `return` only works inside a method
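As an aside (my own local sketch, not part of the original post): the null-and-index bookkeeping above can be avoided with plain Scala collections, since `sliding(2)` yields each element together with its predecessor. `Point` and the `(key, point)` row shape here are simplified stand-ins for the poster's types:

```scala
// Minimal local sketch: pair each row with its predecessor via sliding(2).
// Point and the (key, point) row shape are simplified stand-ins.
case class Point(lat: Double, lon: Double)

def pairWithPredecessor(rows: Seq[(String, Point)]): Seq[(String, Point, Point)] =
  rows.sliding(2).collect {               // Scala's Iterator.collect, not Spark's
    case Seq((_, prev), (key, curr)) => (key, prev, curr)
  }.toSeq

val localRows = Seq(
  ("r1", Point(0.0, 0.0)),
  ("r2", Point(1.0, 1.0)),
  ("r3", Point(2.0, 2.0))
)
val paired = pairWithPredecessor(localRows)
// paired has 2 entries: ("r2", r1's point, r2's point) and ("r3", r2's point, r3's point)
```

This only works on a local collection, of course; the question is precisely how to do the same thing on a distributed RDD.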

Then I'll parallelize the array and calculate the distance:

//create a parallel-enabled data set
val parallelDataSet = sc.parallelize(rowArray) //parallelize the tuple array built above, not the raw rows

parallelDataSet.foreach(row => {
  Functions.logDistance(row)
})

That works, but it's ugly and surely inefficient.

My idea now was to use rdd.reduce() to get rid of the foreach loop; this might work if the distance function can handle the fact that the ordering of (a, b) is not guaranteed.

Anyway, is there a better solution? My understanding is that there is no way to get (efficient) index access when working with RDDs.

Thanks.

Answer

Given that ordering is key here, a good way to proceed could be to first index the RDD. Then, using the index, we can simulate a zip and have the tuples partitioned over the cluster. Something like this:

val indexed = rdd.zipWithIndex.map(_.swap)             // (index, row)
val shifted = indexed.map{ case (k, v) => (k - 1, v) } // row i becomes visible at index i-1
val joined  = indexed.join(shifted)                    // (index, (row_i, row_i+1))
val distanceRDD = joined.map{ case (k, (v1, v2)) => distanceFunction(v1, v2) }
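To see why the shift-and-join produces consecutive pairs, here is a local simulation of the same steps with plain Scala Maps (my own sketch; `distance` is a simple Euclidean stand-in for the poster's distanceFunction):

```scala
// Local simulation of the index/shift/join pattern using plain Scala Maps.
// distance is a Euclidean stand-in for the poster's distanceFunction.
def distance(a: (Double, Double), b: (Double, Double)): Double =
  math.hypot(a._1 - b._1, a._2 - b._2)

val points = Seq((0.0, 0.0), (3.0, 4.0), (6.0, 8.0))

// zipWithIndex.map(_.swap): (index, point)
val indexed = points.zipWithIndex.map(_.swap).toMap
// shift: the entry at index k becomes available at index k - 1
val shifted = indexed.map { case (k, v) => (k - 1, v) }
// join on index: (point_i, point_i+1) for every index present in both maps
val joined = indexed.keySet.intersect(shifted.keySet).toSeq.sorted.map { k =>
  (indexed(k), shifted(k))
}
val distances = joined.map { case (a, b) => distance(a, b) }
// distances: Seq(5.0, 5.0)
```

The first row drops out of the join naturally (its shifted index is -1), which matches the "first row has no distance" special case in the question.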

(*) example code - not tested
