Build a hierarchy from a relational data-set using Pyspark

Problem Description

I am new to Python and stuck with building a hierarchy out of a relational dataset.
It would be of immense help if someone has an idea on how to proceed with this.

I have a relational data-set with data like

_currentnode,  childnode_  
 root,         child1  
 child1,       leaf2  
 child1,       child3  
 child1,       leaf4  
 child3,       leaf5  
 child3,       leaf6  

and so on. I am looking for some Python or PySpark code to
build a hierarchy dataframe like the one below:

_level1, level2,  level3,  level4_  
root,    child1,  leaf2,   null  
root,    child1,  child3,  leaf5  
root,    child1,  child3,  leaf6  
root,    child1,  leaf4,   null  

The data is alphanumeric and it is a huge dataset [~50 million records].
Also, the root of the hierarchy is known and can be hardwired in the code.
So in the example above, the root of the hierarchy is 'root'.

Recommended Answer

Shortest Path with PySpark

The input data can be interpreted as a graph with the connections between currentnode and childnode as edges. The question is then: what is the shortest path between the root node and all leaf nodes? This is known as the single source shortest path (SSSP) problem.

Spark has GraphX to handle parallel computations on graphs. Unfortunately, GraphX does not provide a Python API (more details can be found here). A graph library with Python support is GraphFrames. GraphFrames uses parts of GraphX.

Both GraphX and GraphFrames provide a solution for SSSP. Unfortunately again, both implementations return only the length of the shortest paths, not the paths themselves (GraphX and GraphFrames). But this answer provides an implementation of the algorithm for GraphX and Scala that also returns the paths. All three solutions use Pregel.

Translating the aforementioned answer to GraphFrames/Python:
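
As a small setup sketch (not part of the original answer), the sample data from the question can be put into a DataFrame to follow the steps below; the real dataset of ~50 million rows would typically be read from storage instead:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical stand-in for the real input: the sample rows from the question
df = spark.createDataFrame(
    [("root", "child1"), ("child1", "leaf2"), ("child1", "child3"),
     ("child1", "leaf4"), ("child3", "leaf5"), ("child3", "leaf6")],
    ["currentnode", "childnode"])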

1. Provide unique IDs for all nodes and rename the columns so that they fit the names that GraphFrames expects (id for vertices, src and dst for edges)

import pyspark.sql.functions as F

df = ...  # the input DataFrame with columns currentnode and childnode

# collect all distinct node names and give each one a unique id
vertices = df.select("currentnode").withColumnRenamed("currentnode", "node") \
    .union(df.select("childnode")).distinct() \
    .withColumn("id", F.monotonically_increasing_id()).cache()

# replace the node names in the edges by their ids
edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")) \
    .withColumnRenamed("id", "src") \
    .join(vertices, df.childnode == vertices.node).drop(F.col("node")) \
    .withColumnRenamed("id", "dst").cache()

Nodes                   Edges
+------+------------+   +-----------+---------+------------+------------+
|  node|          id|   |currentnode|childnode|         src|         dst|
+------+------------+   +-----------+---------+------------+------------+
| leaf2| 17179869184|   |     child1|    leaf4| 25769803776|249108103168|
|child1| 25769803776|   |     child1|   child3| 25769803776| 68719476736|
|child3| 68719476736|   |     child1|    leaf2| 25769803776| 17179869184|
| leaf6|103079215104|   |     child3|    leaf6| 68719476736|103079215104|
|  root|171798691840|   |     child3|    leaf5| 68719476736|214748364800|
| leaf5|214748364800|   |       root|   child1|171798691840| 25769803776|
| leaf4|249108103168|   +-----------+---------+------------+------------+
+------+------------+   

2. Create the GraphFrame

from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)

3. Create the UDFs that will form the individual parts of the Pregel algorithm

The message type:

from pyspark.sql.types import *

# a message carries the distance from the root, the node name,
# and the path from the root to that node
vertColSchema = StructType() \
      .add("dist", DoubleType()) \
      .add("node", StringType()) \
      .add("path", ArrayType(StringType(), True))

The vertex program:

def vertexProgram(vd, msg):
    # keep the current vertex state if no message arrived or the
    # current distance is already shorter than the message's distance
    if msg is None or vd[0] < msg[0]:
        return (vd[0], vd[1], vd[2])
    else:
        return (msg[0], vd[1], msg[2])
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)

The outgoing messages:

def sendMsgToDst(src, dst):
    srcDist = src[0]
    dstDist = dst[0]
    # only send a message when it would improve the destination's distance
    if srcDist < (dstDist - 1):
        return (srcDist + 1, src[1], src[2] + [dst[1]])
    else:
        return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)

The message aggregation:

def aggMsgs(agg):
    # keep the message with the shortest distance (field 0 of the struct)
    shortest_dist = sorted(agg, key=lambda tup: tup[0])[0]
    return (shortest_dist[0], shortest_dist[1], shortest_dist[2])
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)

4. Combine the parts

from graphframes.lib import Pregel

result = (graph.pregel
    .withVertexColumn(
        colName="vertCol",
        # the root starts with distance 0 and a path containing only itself;
        # every other node starts with an infinite distance and an empty path
        initialExpr=F.when(
                F.col("node") == F.lit("root"),
                F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node"))))
            .otherwise(
                F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit(""))))
            .cast(vertColSchema),
        updateAfterAggMsgsExpr=vertexProgramUdf(F.col("vertCol"), Pregel.msg()))
    .sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol")))
    .aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg())))
    .setMaxIter(10)
    .setCheckpointInterval(2)
    .run())

result.select("vertCol.path").show(truncate=False)

Remarks:

  • maxIter should be set to a value at least as large as the longest path. If the value is higher, the result stays unchanged, but the computation time becomes longer. If the value is too small, the longer paths will be missing in the result; a way to check this is sketched after this list. The current version of GraphFrames (0.8.0) does not support stopping the loop when no more new messages are sent.
  • checkpointInterval should be set to a value smaller than maxIter. The actual value depends on the data and the available hardware. When an OutOfMemory exception occurs or the Spark session hangs for some time, the value could be reduced.
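
One way to check afterwards whether maxIter was large enough (a sketch, not part of the original answer): any vertex whose distance is still infinite was not reached within the given number of iterations.

# count vertices that were never reached; a non-zero count suggests
# that maxIter should be increased (assuming all nodes are reachable)
unreached = result.filter(F.col("vertCol.dist") == float("inf")).count()
if unreached > 0:
    print(f"{unreached} vertices not reached - consider increasing maxIter")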

The final result is a regular dataframe with the following content:

+-----------------------------+
|path                         |
+-----------------------------+
|[root, child1]               |
|[root, child1, leaf4]        |
|[root, child1, child3]       |
|[root]                       |
|[root, child1, child3, leaf6]|
|[root, child1, child3, leaf5]|
|[root, child1, leaf2]        |
+-----------------------------+

If necessary, the non-leaf nodes can be filtered out here.
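
A possible sketch for that step, and for reshaping the paths into the level1 to level4 columns asked for in the question (assuming the input DataFrame df from above is still available, a maximum depth of 4, and Spark 2.4+ for element_at):

# keep only the paths that end at a leaf node,
# i.e. at a node that never occurs in the currentnode column
paths = result.select(F.col("vertCol.path").alias("path"))
leaf_paths = paths.join(
    df.select("currentnode").distinct(),
    F.element_at(F.col("path"), -1) == F.col("currentnode"),
    "left_anti")

# spread each path array over level columns; missing levels become null
for i in range(4):
    leaf_paths = leaf_paths.withColumn(f"level{i+1}", F.col("path").getItem(i))
leaf_paths.drop("path").show()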
