How to build a graph from tuples in GraphX and label the nodes afterwards?


Question

Some context can be found here: http://stackoverflow.com/questions/31928133/how-to-create-a-graph-from-arrayany-any-using-graph-fromedgetuples/31929807?noredirect=1#comment51792711_31929807. The idea is that I have created a graph from tuples collected from a query on a Hive table. These tuples correspond to trade relations between countries. Built this way, the graph's vertices are not labelled. I want to study the degree distribution and get the names of the most connected countries. I tried two options:


  • First: I tried to map the vertex indices to the vertices' string names with idMapbis, inside the function that collects and prints the ten most connected vertices.
  • Second: I tried to add labels to the vertices of the graph itself.

In both cases I get the same error: the task is not serializable.

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line => (line(2), line(3))) // country -> country

couples looks like this: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB),...
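
Because couples is typed as Array[(Any, Any)], the pattern case (x: String, y: String) used in the code that follows would throw a MatchError on any pair that is not two strings (a null value, for instance). One way to narrow the type up front is sketched below. It is only an illustration: the object TypedCouples and the method typed are hypothetical names, not part of the original code, and the sample data is made up.

```scala
object TypedCouples {
  // Hypothetical helper: keeps only the pairs whose two elements are Strings
  // and narrows the static type from (Any, Any) to (String, String).
  // Pairs containing null or non-String values are silently dropped.
  def typed(couples: Array[(Any, Any)]): Array[(String, String)] =
    couples.collect { case (x: String, y: String) => (x, y) }

  def main(args: Array[String]): Unit = {
    // Made-up sample data; the last pair is dropped because null is not a String.
    val raw: Array[(Any, Any)] = Array(("MWI", "MOZ"), ("WSM", "AUS"), (null, "CRI"))
    typed(raw).foreach(println)
  }
}
```

With a typed array, the later flatMap and map calls can destructure the pairs with a plain case (x, y), without type patterns.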

val idMap = sc.broadcast(couples 
.flatMap{case (x: String, y: String) => Seq(x, y)}
.distinct 
.zipWithIndex  
.map{case (k, v) => (k, v.toLong)}  
.toMap) 

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
.map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})

val graph = Graph.fromEdgeTuples(edges, 1)

Built this way, the vertices look like (68,1), for example.

val degrees: VertexRDD[Int] = graph.degrees.cache()

//Most connected vertices 
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, k) => (id.toInt, degree)}
val ord = Ordering.by[(Int, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

We get: (79,1016), (64,912), (55,889)...

First option:

val idMapbis = sc.parallelize(couples
.flatMap{case (x: String, y: String) => Seq(x, y)} 
.distinct 
.zipWithIndex  
.map{case (k, v) => (v,k)}  
.toMap)

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]):  Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (idMapbis.value(id.toInt), degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

The task is not serializable, but idMapbis itself works, since there is no error with idMapbis.value(graph.vertices.take(1)(0)._1.toInt)

Second option:

graph.vertices.map{case (k, v) => (k,idMapbis.value(k.toInt))}

The task is not serializable again. For context, here is how topNamesAndDegrees is modified in this option to obtain the names of the most connected vertices:

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (name, degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

I am interested in understanding how to improve one of these options, maybe both if someone sees how.

Recommended answer

The problem with your attempts is that idMapbis is an RDD, and an RDD cannot be used inside a closure that runs as part of another RDD's transformation. Since we already know your data fits into memory, you can simply use a broadcast variable as before:

val idMapRev = sc.broadcast(idMap.value.map{case (k, v) => (v, k)}.toMap)
graph.mapVertices{case (id, _) => idMapRev.value(id)}
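
Once the vertices carry their names, the ranking function from the question no longer needs any lookup inside the closure. The following is a minimal sketch of that idea, assuming a local SparkContext and a tiny made-up couples array; the object name TopCountries and the parameter n are illustrative and not from the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object TopCountries {
  // Returns the n vertices with the highest degree as (label, degree) pairs.
  def topNamesAndDegrees(graph: Graph[String, Int], n: Int): Array[(String, Int)] = {
    val namesAndDegrees: VertexRDD[(String, Int)] =
      graph.degrees.innerJoin(graph.vertices) { (_, degree, name) => (name, degree) }
    namesAndDegrees.map(_._2).top(n)(Ordering.by[(String, Int), Int](_._2))
  }

  // Builds the unlabelled graph as in the question, then labels it with a
  // reversed broadcast map as in the answer.
  def labelledGraph(sc: SparkContext, couples: Array[(String, String)]): Graph[String, Int] = {
    val idMap = sc.broadcast(
      couples.flatMap { case (x, y) => Seq(x, y) }.distinct.zipWithIndex
        .map { case (k, v) => (k, v.toLong) }.toMap)
    val idMapRev = sc.broadcast(idMap.value.map(_.swap))
    val edges: RDD[(VertexId, VertexId)] =
      sc.parallelize(couples.map { case (x, y) => (idMap.value(x), idMap.value(y)) })
    Graph.fromEdgeTuples(edges, 1).mapVertices { case (id, _) => idMapRev.value(id) }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master, for illustration only.
    val sc = new SparkContext(new SparkConf().setAppName("top-countries").setMaster("local[*]"))
    val couples = Array(("MWI", "MOZ"), ("WSM", "AUS"), ("MOZ", "AUS")) // made-up sample
    topNamesAndDegrees(labelledGraph(sc, couples), 2).foreach(println)
    sc.stop()
  }
}
```

Only the two broadcast handles are captured by the closures here, and broadcast variables are designed to be shipped to executors, which is why this version avoids the serialization error.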

Alternatively, you could use the correct labels from the beginning:

val countries: RDD[(VertexId, String)] = sc
  .parallelize(idMap.value.map(_.swap).toSeq)

val relationships: RDD[Edge[Int]] = sc.parallelize(couples
 .map{case (x: String, y: String) => Edge(idMap.value(x), idMap.value(y), 1)}
)

val graph = Graph(countries, relationships)

The second approach has one important advantage: if the graph is large, you can relatively easily replace the broadcast variables with joins.
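
To make the join-based variant concrete, here is one possible sketch. It assumes couples is available as an RDD[(String, String)] rather than a collected array, and it assigns ids with zipWithUniqueId; the object name JoinBasedGraph and the method build are hypothetical, and this is not necessarily how the answer's author would write it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object JoinBasedGraph {
  // Builds a labelled graph from (source, target) name pairs using joins only,
  // so no name-to-id map has to be collected or broadcast.
  def build(couples: RDD[(String, String)]): Graph[String, Int] = {
    // One unique Long id per distinct country name. Cached so that both
    // joins and the vertex RDD see the same id assignment.
    val countries: RDD[(String, VertexId)] =
      couples.flatMap { case (x, y) => Seq(x, y) }.distinct().zipWithUniqueId().cache()

    val relationships: RDD[Edge[Int]] = couples
      .join(countries)                                  // (src, (dst, srcId))
      .map { case (_, (dst, srcId)) => (dst, srcId) }
      .join(countries)                                  // (dst, (srcId, dstId))
      .map { case (_, (srcId, dstId)) => Edge(srcId, dstId, 1) }

    Graph(countries.map(_.swap), relationships)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master and made-up sample data, for illustration only.
    val sc = new SparkContext(new SparkConf().setAppName("join-graph").setMaster("local[*]"))
    val couples = sc.parallelize(Seq(("MWI", "MOZ"), ("WSM", "AUS"), ("MOZ", "AUS")))
    build(couples).vertices.collect().foreach(println)
    sc.stop()
  }
}
```

The two joins cost shuffles that the broadcast version avoids, so this form mainly pays off when the set of names is too large to hold comfortably in memory.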
