如何在RDD每个元素比较在RDD所有其他元素? [英] How to compare every element in the RDD with every other element in the RDD ?

查看:255
本文介绍了如何在RDD每个元素比较在RDD所有其他元素?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图执行K近邻搜索使用的火花。

I'm Trying to perform a K nearest neighbor search using spark.

我有一个RDD [序号[双],我刨返回
RDD [(SEQ [双],顺序[顺序[双]])]
用实际行和邻居列表

I have a RDD[Seq[Double]] and I'm planing to return a RDD[(Seq[Double],Seq[Seq[Double]])] with the actual row and a list of neighbors

val out = data.map(row => {
    val neighbours = data.top(num = 3)(new Ordering[Seq[Double]] {
      override def compare(a:Seq[Double],b:Seq[Double]) = {
        euclideanDistance(a,row).compare(euclideanDistance(b,row))*(-1)
      }
    })
  (row,neighbours.toSeq)
})

和它给人的火花以下错误提交

And it Gives the following error on spark Submit

15/04/29 21:15:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 192.168.1.7): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

据我所知,嵌套RDD是不可能的,但如何执行这样的操作,我可以每个元素在RDD与在RDD所有其他元素比较

I understand that nesting RDD is not possible but how do i perform such operations where I can compare every element in the RDD with every other element in the RDD

推荐答案

像这样的东西应该这样做。

Something like this should do it.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sco= new SparkContext(conf)

// k is the number of nearest neighbors required 
val k = 3

// generate 5 rows of two-dimensional coordinates
val rows = List.fill(5)(List.fill(2)(Math.random))
val dataRDD = sco.parallelize(rows, 1)

// No need for the sqrt as we're just comparing them
def euclidean(a:List[Double], b:List[Double]) = 
 (a zip b) map {case (x:Double, y:Double) => (x-y)*(x-y)} sum

// get all pairs
val pairs = dataRDD.cartesian(dataRDD)

// case class to keep things a bit neater
// the neighbor, and its distance from the current point
case class Entry(neighbor: List[Double], dist:Double)

// map the second element to the element and distance from the first
val pairsWithDist = pairs.map {case (x, y) => (x, Entry(y, euclidean(x,y)))}

// merge a row of pairsWithDist with the ResultRow for this point
def mergeOne(u: List[Entry], v:Entry) = (v::u).sortBy{_.dist}.take(k)

// merge two results from different partitions
def mergeList(u: List[Entry], v:List[Entry]) = (u:::v).sortBy{_.dist}.take(k)

val nearestNeighbors = pairsWithDist
                      .aggregateByKey(List[Entry]())(mergeOne, mergeList)

这篇关于如何在RDD每个元素比较在RDD所有其他元素?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆