Calculate links between nodes using Spark


Problem Description

I have the following two DataFrames in Spark 2.2 and Scala 2.11. The DataFrame edges defines the edges of a directed graph, while the DataFrame types defines the type of each node.

edges =

+-----+-----+----+
|from |to   |attr|
+-----+-----+----+
|    1|    0|   1|
|    1|    4|   1|
|    2|    2|   1|
|    4|    3|   1|
|    4|    5|   1|
+-----+-----+----+

types =
+------+---------+
|nodeId|type     |
+------+---------+
|     0|        0|
|     1|        0|
|     2|        2|
|     3|        4|
|     4|        4|
|     5|        4|
+------+---------+
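For reproducibility, here is a minimal sketch of how these two example DataFrames could be created (assuming a SparkSession is in scope as spark, as in the spark-shell):

import spark.implicits._  // assumes a SparkSession in scope as `spark`

// Example data from the tables above.
val edges = Seq(
  (1, 0, 1), (1, 4, 1), (2, 2, 1), (4, 3, 1), (4, 5, 1)
).toDF("from", "to", "attr")

val types = Seq(
  (0, 0), (1, 0), (2, 2), (3, 4), (4, 4), (5, 4)
).toDF("nodeId", "type")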

For each node, I want to know the number of edges to nodes of the same type. Note that I only want to count edges outgoing from a node, since I am dealing with a directed graph.

To achieve this, I joined the two DataFrames:

val graphDF = edges
  // attach the type of the source node as type_from
  .join(types, types("nodeId") === edges("from"), "left")
  .drop("nodeId")
  .withColumnRenamed("type", "type_from")
  // attach the type of the destination node as type_to
  .join(types, types("nodeId") === edges("to"), "left")
  .drop("nodeId")
  .withColumnRenamed("type", "type_to")

This produced the following new DataFrame graphDF:

+-----+-----+----+---------------+---------------+
|from |to   |attr|type_from      |type_to        |
+-----+-----+----+---------------+---------------+
|    1|    0|   1|              0|              0|
|    1|    4|   1|              0|              4|
|    2|    2|   1|              2|              2|
|    4|    3|   1|              4|              4|
|    4|    5|   1|              4|              4|
+-----+-----+----+---------------+---------------+

Now I need to get the following final result:

+------+---------+---------+
|nodeId|numLinks |type     |
+------+---------+---------+
|     0|        0|        0| 
|     1|        1|        0|
|     2|        0|        2|
|     3|        0|        4|
|     4|        2|        4|
|     5|        0|        4| 
+------+---------+---------+

I was thinking about using groupBy and agg(count(...)), but I do not know how to deal with directed edges.

Update:

numLinks is the number of edges outgoing from a given node. For example, node 5 does not have any outgoing edges (only the incoming edge 4->5, see the DataFrame edges). The same applies to node 0. Node 4, however, has two outgoing edges (4->3 and 4->5).
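As a stepping stone, the plain out-degree of every node (ignoring node types) can be computed by grouping edges on its from column; this is only an illustrative sketch:

import org.apache.spark.sql.functions.count
import spark.implicits._

// Out-degree per node: count the rows of `edges` grouped by the source column.
edges
  .groupBy($"from" as "nodeId")
  .agg(count("*") as "outDegree")
  .show()

Nodes 0, 3 and 5 do not appear in this result at all, because they never occur in the from column; this is exactly the gap that the answer below has to close.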

My solution:

This is my solution, but it misses the nodes that have 0 links.

graphDF
  .filter("from != to")
  .filter("type_from == type_to")
  .groupBy("from")
  .agg(count("from") as "numLinks")
  .show()
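For the example data, this produces only the nodes that have at least one qualifying outgoing edge, which illustrates the problem:

+----+--------+
|from|numLinks|
+----+--------+
|   1|       1|
|   4|       2|
+----+--------+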

Answer

You can filter, aggregate by id and type, and add the missing nodes back with a right outer join against types, filling the resulting null counts with 0:

import org.apache.spark.sql.functions.count
import spark.implicits._  // assumes a SparkSession in scope as `spark`

val graphDF = Seq(
  (1, 0, 1, 0, 0), (1, 4, 1, 0, 4), (2, 2, 1, 2, 2),
  (4, 3, 1, 4, 4), (4, 5, 1, 4, 4)
).toDF("from", "to", "attr", "type_from", "type_to")

val types = Seq(
  (0, 0), (1, 0), (2, 2), (3, 4), (4, 4), (5, 4)
).toDF("nodeId", "type")

graphDF
  // I want to know the number of edges to the nodes of the same type
  .where($"type_from" === $"type_to")
  // I only want to count the edges outgoing from a node
  .groupBy($"from" as "nodeId", $"type_from" as "type")
  .agg(count("*") as "numLinks")
  // but it lacks those nodes that have 0 links
  .join(types, Seq("nodeId", "type"), "rightouter")
  .na.fill(0)

// +------+----+--------+
// |nodeId|type|numLinks|
// +------+----+--------+
// |     0|   0|       0|
// |     1|   0|       1|
// |     2|   2|       1|
// |     3|   4|       0|
// |     4|   4|       2|
// |     5|   4|       0|
// +------+----+--------+

To skip self-links, add $"from" =!= $"to" to the selection:

graphDF
  .where($"type_from" === $"type_to" && $"from" =!= $"to")
  .groupBy($"from" as "nodeId", $"type_from" as "type")
  .agg(count("*") as "numLinks")
  .join(types, Seq("nodeId", "type"), "rightouter")
  .na.fill(0)

// +------+----+--------+
// |nodeId|type|numLinks|
// +------+----+--------+
// |     0|   0|       0|
// |     1|   0|       1|
// |     2|   2|       0|
// |     3|   4|       0|
// |     4|   4|       2|
// |     5|   4|       0|
// +------+----+--------+
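If the exact column order of the expected result (nodeId, numLinks, type) matters, a final select can be appended. A minimal sketch, assuming the aggregated DataFrame above is bound to a value named result (a name introduced here only for illustration):

// Reorder the columns and sort by node id to match the layout requested in the question.
result
  .select("nodeId", "numLinks", "type")
  .orderBy("nodeId")
  .show()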
