使用 Pyspark 从关系数据集构建层次结构 [英] Build a hierarchy from a relational data-set using Pyspark

查看:26
本文介绍了使用 Pyspark 从关系数据集构建层次结构的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 Python 新手,一直坚持从关系数据集构建层次结构.
如果有人知道如何进行此操作,将会有很大帮助.

I am new to Python and stuck with building a hierarchy out of a relational dataset.
It would be of immense help if someone has an idea on how to proceed with this.

我有一个包含类似数据的关系数据集

I have a relational data-set with data like

_currentnode,  childnode_  
 root,         child1  
 child1,       leaf2  
 child1,       child3  
 child1,       leaf4  
 child3,       leaf5  
 child3,       leaf6  

等等.我正在寻找一些 python 或 pyspark 代码
构建如下所示的层次结构数据框

so-on. I am looking for some python or pyspark code to
build a hierarchy dataframe like below

_level1, level2,  level3,  level4_  
root,    child1,  leaf2,   null  
root,    child1,  child3,  leaf5  
root,    child1,  child3,  leaf6  
root,    child1,  leaf4,   null  

数据是字母数字,是一个巨大的数据集[~5000万条记录].
此外,层次结构的根是已知的,可以在代码中硬连线.
因此,在上面的示例中,层次结构的根是root".

The data is alpha-numerics and is a huge dataset[~50mil records].
Also, the root of the hierarchy is known and can be hardwired in the code.
So in the example, above, the root of the hierarchy is 'root'.

推荐答案

Shortest Path with Pyspark

输入数据可以解释为currentnodechildnode之间的连接图.那么问题是根节点和所有叶节点之间的最短路径是什么,称为单源最短路径.

Shortest Path with Pyspark

The input data can be interpreted as a graph with the connections between currentnode and childnode. Then the question is what is the shortest path between the root node and all leaf nodes and is called single source shortest path.

Spark 有 Graphx 来处理并行计算的图表.不幸的是,GraphX 不提供 Python API(更多详细信息可以在此处找到).支持 Python 的图形库是 GraphFrames.GraphFrames 使用了 GraphX 的一部分.

Spark has Graphx to handle parallel computations of graphs. Unfortunately, GraphX does not provide a Python API (more details can be found here). A graph library with Python support is GraphFrames. GraphFrames uses parts of GraphX.

GraphX 和 GraphFrames 都为 sssp 提供了解决方案.不幸的是,这两种实现都只返回最短路径的长度,而不是路径本身(GraphXGraphFrames).但是 这个答案 为 GraphX 和 Scala 提供了算法的实现,该算法也返回路径.所有三种解决方案都使用 Pregel.

Both GraphX and GraphFrames provide an solution for sssp. Unfortunately again, both implementations return only the length of the shortest paths, not the paths themselves (GraphX and GraphFrames). But this answer provides an implementation of the algorithm for GraphX and Scala that also returns the paths. All three solutions use Pregel.

将上述答案翻译成 GraphFrames/Python:

Translating the aforementioned answer to GraphFrames/Python:

为所有节点提供唯一 ID 并更改列名称,使其符合描述的名称 这里

Provide unique IDs for all nodes and change the column names so that they fit to the names described here

import pyspark.sql.functions as F

df = ...

vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()

edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
        .join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache() 

Nodes                   Edges
+------+------------+   +-----------+---------+------------+------------+
|  node|          id|   |currentnode|childnode|         src|         dst|
+------+------------+   +-----------+---------+------------+------------+
| leaf2| 17179869184|   |     child1|    leaf4| 25769803776|249108103168|
|child1| 25769803776|   |     child1|   child3| 25769803776| 68719476736|
|child3| 68719476736|   |     child1|    leaf2| 25769803776| 17179869184|
| leaf6|103079215104|   |     child3|    leaf6| 68719476736|103079215104|
|  root|171798691840|   |     child3|    leaf5| 68719476736|214748364800|
| leaf5|214748364800|   |       root|   child1|171798691840| 25769803776|
| leaf4|249108103168|   +-----------+---------+------------+------------+
+------+------------+   

2.创建 GraphFrame

from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)

3.创建将构成 Pregel 算法单个部分的 UDF

消息类型:

from pyspark.sql.types import *
vertColSchema = StructType()\
      .add("dist", DoubleType())\
      .add("node", StringType())\
      .add("path", ArrayType(StringType(), True))

顶点程序:

def vertexProgram(vd, msg):
    if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
        return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
    else:
        return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)

外发消息:

def sendMsgToDst(src, dst):
    srcDist = src.__getitem__(0)
    dstDist = dst.__getitem__(0)
    if srcDist < (dstDist - 1):
        return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
    else:
        return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)

消息聚合:

def aggMsgs(agg):
    shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
    return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)

4.组合零件

from graphframes.lib import Pregel
result = graph.pregel.withVertexColumn(colName = "vertCol", \
    initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
    .otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
    updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
    .sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
    .aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
    .setMaxIter(10) \
    .setCheckpointInterval(2) \
    .run()
result.select("vertCol.path").show(truncate=False)   

备注:

  • maxIter 应设置为至少与最长路径一样大的值.数值越大,结果不变,但计算时间变长.如果该值太小,则结果中将缺少较长的路径.当前版本的 GraphFrames (0.8.0) 不支持在不再发送新消息时停止循环.
  • checkpointInterval 应设置为小于 maxIter 的值.实际值取决于数据和可用硬件.当 OutOfMemory 异常或 Spark 会话挂起一段时间后,该值可能会减小.
  • maxIter should be set to a value at least as large as the longest path. If the value is higher, the result will stay unchanged, but the computation time becomes longer. If the value is too small, the longer paths will be missing in the result. The current version of GraphFrames (0.8.0) does not support stopping the loop when no more new messages are sent.
  • checkpointInterval should be set to a value smaller than maxIter. The actual value depends on the data and the available hardware. When OutOfMemory exception occur or the Spark session hangs for some time, the value could be reduced.

最终结果是一个带有内容的常规数据框

The final result is a regular dataframe with the content

+-----------------------------+
|path                         |
+-----------------------------+
|[root, child1]               |
|[root, child1, leaf4]        |
|[root, child1, child3]       |
|[root]                       |
|[root, child1, child3, leaf6]|
|[root, child1, child3, leaf5]|
|[root, child1, leaf2]        |
+-----------------------------+

如有必要,可以在此处过滤掉非叶节点.

If necessary the non-leaf nodes could be filtered out here.

这篇关于使用 Pyspark 从关系数据集构建层次结构的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆