How to do this transformation in SQL/Spark/GraphFrames

Problem description

I've a table containing the following two columns:

Device-Id    Account-Id
d1           a1   
d2           a1
d1           a2
d2           a3
d3           a4
d3           a5 
d4           a6
d1           a4

Device-Id is the unique Id of the device on which my app is installed and Account-Id is the id of a user account. A user can have multiple devices and can create multiple accounts on the same device (e.g., device d1 has accounts a1, a2 and a4 set up).

I want to find the unique actual users (each should be represented as a new column with some unique UUID in the generated table), and the transformation I'm looking for generates the following table:

Unique-User-Id    Devices-Used    Accounts-Used
uuid1             [d1, d2, d3]    [a1, a2, a3, a4, a5]   
uuid2             [d4]            [a6]

The idea behind the table generated above is that an actual user, uuid1, has an account a1 set up on both of their devices d1 and d2, which essentially means that both these devices belong to uuid1, and all other accounts set up on the d1 and d2 devices also map to the same user uuid1. Similarly, d1 also has an account a4 which is also set up on d3, so d3 is also uuid1's device and every account on it should get mapped to uuid1.
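
Viewed as a graph in which every device id and every account id is a vertex and every row of the table is an edge, the linking rule described above is exactly graph connectivity: each actual user corresponds to one connected component. Purely as an illustration of that idea (not the scalable Spark solution, which the answer below provides), a tiny union-find over the sample rows produces the expected grouping:

from collections import defaultdict

rows = [("d1", "a1"), ("d2", "a1"), ("d1", "a2"), ("d2", "a3"),
        ("d3", "a4"), ("d3", "a5"), ("d4", "a6"), ("d1", "a4")]

parent = {}

def find(x):
    parent.setdefault(x, x)
    if parent[x] != x:
        parent[x] = find(parent[x])      # path compression
    return parent[x]

def union(x, y):
    parent[find(x)] = find(y)

# every row links a device with an account, so they fall into the same set
for device, account in rows:
    union(device, account)

groups = defaultdict(set)
for node in parent:
    groups[find(node)].add(node)

for members in groups.values():
    devices = sorted(m for m in members if m.startswith("d"))
    accounts = sorted(m for m in members if m.startswith("a"))
    print(devices, accounts)
# ['d1', 'd2', 'd3'] ['a1', 'a2', 'a3', 'a4', 'a5']
# ['d4'] ['a6']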

How can I achieve the above-mentioned transformation in SQL/Spark/GraphFrames (by Databricks), where both Device-Ids and Account-Ids can number in the millions?

Recommended answer

You can try GraphFrame.connectedComponents. Add a prefix to all Device-Ids so that they can be split from the Account-Ids in the post-processing step:
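
Note that GraphFrames is not bundled with Spark: on Databricks you would typically attach the graphframes library to the cluster, while on a plain PySpark session one common way is the spark.jars.packages config. A minimal sketch, assuming a package coordinate such as graphframes:graphframes:0.8.2-spark3.2-s_2.12 (pick the build matching your Spark/Scala version):

from pyspark.sql import SparkSession

# example coordinate only -- it must match your Spark/Scala build
spark = (SparkSession.builder
         .appName("device-account-linking")
         .config("spark.jars.packages",
                 "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
         .getOrCreate())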

from graphframes import GraphFrame
from pyspark.sql.functions import collect_set, expr

df = spark.createDataFrame([
         ("d1","a1"), ("d2","a1"), ("d1","a2"), ("d1","a4"),
         ("d2","a3"), ("d3","a4"), ("d3","a5"), ("d4","a6")  
], ["Device-Id","Account-Id"])

# set a checkpoint directory, which is required by GraphFrames' connectedComponents
spark.sparkContext.setCheckpointDir("/tmp/111")

# for testing purposes, use a small number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 2)

# set up edges and vertices, adding an underscore prefix to each Device-Id
edges = df.withColumn('Device-Id', expr('concat("_", `Device-Id`)')).toDF('src', 'dst')
vertices = edges.selectExpr('src as id').distinct().union(edges.select('dst').distinct())

# set up the graph
g = GraphFrame(vertices, edges)

# compute the connected components and group resultset by component
# and collect corresponding ids using collect_set(id)
df1 = g.connectedComponents().groupby('component').agg(collect_set('id').alias('ids'))
df1.show(truncate=False)
+------------+-----------------------------------+
|component   |ids                                |
+------------+-----------------------------------+
|309237645312|[a6, _d4]                          |
|85899345920 |[_d1, a4, a1, _d3, a3, a5, a2, _d2]|
+------------+-----------------------------------+

# split the ids based on the prefix we predefined when creating edges.
df1.selectExpr(
      'transform(filter(ids, x -> left(x,1) = "_"), y -> substr(y,2)) AS `Devices-Used`'
    , 'filter(ids, x -> left(x,1) != "_") AS `Accounts-Used`'
    , 'component AS `Unique-User-Id`'
).show()
+------------+--------------------+--------------+
|Devices-Used|       Accounts-Used|Unique-User-Id|
+------------+--------------------+--------------+
|[d1, d3, d2]|[a4, a1, a3, a5, a2]|   85899345920|
|        [d4]|                [a6]|  309237645312|
+------------+--------------------+--------------+

The above method is less efficient because it creates an unnecessarily large list of edges/vertices; using a self-join to build the edge list should be a better choice (inspired by this post):

# link two devices whenever they share an account; keep only one direction to avoid duplicate edges
edges = df.alias('d1').join(df.alias('d2'), ["Account-Id"]) \
    .filter("d1.`Device-Id` > d2.`Device-Id`") \
    .toDF("account", "src", "dst")
edges.show()
+-------+---+---+
|account|src|dst|
+-------+---+---+
|     a1| d2| d1|
|     a4| d3| d1|
+-------+---+---+

# keep Account-Id on each vertex row so accounts can be collected per component afterwards
vertices = df.selectExpr('`Device-Id` as id', "`Account-Id` as acct_id")
g = GraphFrame(vertices, edges)

# compute connected components over devices only, then collect devices and accounts per component
df1 = g.connectedComponents() \
    .groupby('component') \
    .agg(
       collect_set('id').alias('Device-Ids'),
       collect_set('acct_id').alias('Account-Ids')
     )
df1.show()
+---------+------------+--------------------+
|component|  Device-Ids|         Account-Ids|
+---------+------------+--------------------+
|        0|[d1, d2, d3]|[a4, a1, a3, a5, a2]|
|        1|        [d4]|                [a6]|
+---------+------------+--------------------+
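
Both versions use the numeric component id produced by connectedComponents as the user key. If an actual UUID string is needed, as in the desired output table, one option (a sketch on top of the answer above, not part of it, shown here for the second version's df1; the same idea applies to the first) is to generate one UUID per distinct component with Spark SQL's uuid() function and join it back:

from pyspark.sql import functions as F

# one UUID per component; persist uuid_map if the same UUIDs must be reused
# across actions, because uuid() is non-deterministic
uuid_map = df1.select('component').distinct() \
              .withColumn('Unique-User-Id', F.expr('uuid()'))

result = df1.join(uuid_map, 'component').selectExpr(
    '`Unique-User-Id`',
    '`Device-Ids` AS `Devices-Used`',
    '`Account-Ids` AS `Accounts-Used`'
)
result.show(truncate=False)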
