如何从星火数据帧创建EdgeRDD [英] how to create EdgeRDD from data frame in Spark
问题描述
我在火花数据帧。每一行再presents一个人,我想找回它们之间可能的联系。有一个链接的规则是,对于每个可能的,如果他们有相同的为prop1:字符串和prop2的绝对差值:诠释为< 5,然后链接存在。我试图理解来完成这项任务与数据帧的工作的最好方法。
I have a dataframe in spark. Each row represents a person and I want to retrieve possible connections among them. The rule to have a link is that, for each possible pair, if they have the same prop1:String and the absolute difference of prop2:Int is < 5 then the link exists. I am trying to understand the best way to accomplish this task working with data frame.
我想检索索引RDDS:
I am trying to retrieve indexed RDDs:
val idusers = people.select("ID")
.rdd
.map(r => r(0).asInstanceOf[Int])
.zipWithIndex
val prop1users = people.select("ID")
.rdd
.map(r => (r(0).asInstanceOf[Int], r(1).asInstanceOf[String]))
val prop2users = people.select("ID")
.rdd
.map(r => (r(0).asInstanceOf[Int], r(2).asInstanceOf[Int]))
然后开始像删除重复:
then start removing duplicates like:
var links = idusers
.join(idusers)
.filter{ case (v1, v2) => v2._1 != v2._2 }
但后来我被困检查为prop1 ......反正,有没有办法来完成所有这些步骤只是用数据帧?
but then I got stuck to check for prop1... anyway, is there a way to accomplish all these steps just using data frame?
推荐答案
假设你有这样的事情:
val sqlc : SQLContext = ???
case class Person(id: Long, country: String, age: Int)
val testPeople = Seq(
Person(1, "Romania" , 15),
Person(2, "New Zealand", 30),
Person(3, "Romania" , 17),
Person(4, "Iceland" , 20),
Person(5, "Romania" , 40),
Person(6, "Romania" , 44),
Person(7, "Romania" , 45),
Person(8, "Iceland" , 21),
Person(9, "Iceland" , 22)
)
val people = sqlc.createDataFrame(testPeople)
您可以创建第一台自行奇迹的列重命名为避免柱中发生冲突自联接:
You can create first self miracle with columns renamed to avoid column-clashed in self-join:
val peopleR = people
.withColumnRenamed("id" , "idR")
.withColumnRenamed("country", "countryR")
.withColumnRenamed("age" , "ageR")
现在,您可以加入数据框带自检,下降成对交换和循环边:
Now you can join dataframe with self, dropping swapped pairs and loop-edges:
import org.apache.spark.sql.functions._
val relations = people.join(peopleR,
(people("id") < peopleR("idR")) &&
(people("country") === peopleR("countryR")) &&
(abs(people("age") - peopleR("ageR")) < 5))
最后,你可以建立所需的 EdgeRDD
:
import org.apache.spark.graphx._
val edges = EdgeRDD.fromEdges(relations.map(row => Edge(
row.getAs[Long]("id"), row.getAs[Long]("idR"), ())))
relations.show()
现在将输出:
+---+-------+---+---+--------+----+
| id|country|age|idR|countryR|ageR|
+---+-------+---+---+--------+----+
| 1|Romania| 15| 3| Romania| 17|
| 4|Iceland| 20| 8| Iceland| 21|
| 4|Iceland| 20| 9| Iceland| 22|
| 5|Romania| 40| 6| Romania| 44|
| 6|Romania| 44| 7| Romania| 45|
| 8|Iceland| 21| 9| Iceland| 22|
+---+-------+---+---+--------+----+
和 edges.toLocalIterator.foreach(的println)
将输出:
Edge(1,3,())
Edge(4,8,())
Edge(4,9,())
Edge(5,6,())
Edge(6,7,())
Edge(8,9,())
这篇关于如何从星火数据帧创建EdgeRDD的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!