Dataframe pyspark to dict


Problem description

I have this dataframe path_df:

path_df.show()
+---------------+-------------+----+
|FromComponentID|ToComponentID|Cost|
+---------------+-------------+----+
|            160|          163|27.0|
|            160|          183|27.0|
|            161|          162|22.0|
|            161|          170|31.0|
|            162|          161|22.0|
|            162|          167|24.0|
|            163|          160|27.0|
|            163|          164|27.0|
|            164|          163|27.0|
|            164|          165|35.0|
|            165|          164|35.0|
|            165|          166|33.0|
|            166|          165|33.0|
|            166|          167|31.0|
|            167|          162|24.0|
|            167|          166|31.0|
|            167|          168|27.0|
|            168|          167|27.0|
|            168|          169|23.0|
|            169|          168|23.0|
+---------------+-------------+----+
only showing top 20 rows

From this, I want to make a dictionary, as follows: {FromComponentID: {ToComponentID: Cost}}

For my current data, it would be:

{160 : {163 : 27,
        183 : 27},
 161 : {162 : 22,
        170 : 31},
 162 : {161 : 22,
        167 : 24},
 ...
 167 : {162 : 24,
        166 : 31,
        168 : 27},
 168 : {167 : 27,
        169 : 23},
 169 : {168 : 23}
}

Can I do that using only PySpark, and how? Or maybe it's better to extract my data and process it directly with Python.

Answer

You can do all of this with dataframe transformations and udfs. The only slightly annoying thing is that, because you technically have two different types of dictionaries (one where key=integer and value=dictionary, the other where key=integer and value=float), you will have to define two udfs with different datatypes. Here is one possible way to do this:

from pyspark.sql.functions import udf, collect_list, create_map
from pyspark.sql.types import MapType, IntegerType, FloatType

data = [[160,163,27.0],[160,183,27.0],[161,162,22.0],
        [161,170,31.0],[162,161,22.0],[162,167,24.0],
        [163,160,27.0],[163,164,27.0],[164,163,27.0],
        [164,165,35.0],[165,164,35.0],[165,166,33.0],
        [166,165,33.0],[166,167,31.0],[167,162,24.0],
        [167,166,31.0],[167,168,27.0],[168,167,27.0],
        [168,169,23.0],[169,168,23.0]]

cols = ['FromComponentID', 'ToComponentID', 'Cost']
df = spark.createDataFrame(data, cols)

# Merge a list of single-entry maps into one map: {ToComponentID: Cost}
combineMap = udf(lambda maps: {key: f[key] for f in maps for key in f},
                 MapType(IntegerType(), FloatType()))

# Same merge, one level up: {FromComponentID: {ToComponentID: Cost}}
combineDeepMap = udf(lambda maps: {key: f[key] for f in maps for key in f},
                     MapType(IntegerType(), MapType(IntegerType(), FloatType())))

mapdf = df.groupBy('FromComponentID')\
    .agg(collect_list(create_map('ToComponentID', 'Cost')).alias('maps'))\
    .agg(combineDeepMap(collect_list(create_map('FromComponentID', combineMap('maps')))))

result_dict = mapdf.collect()[0][0]
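
Since result_dict is collected as a plain nested Python dict on the driver, it can be queried with ordinary dict lookups. A minimal check against the sample data above (key order inside the inner dicts may vary):

print(result_dict[160])        # {163: 27.0, 183: 27.0}
print(result_dict[167][168])   # 27.0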

For a large dataset, this should offer some performance boosts over a solution that requires the data to be collected onto a single node. But since Spark still has to serialize the udf, there won't be huge gains over an RDD-based solution.
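
For comparison, the collect-everything-to-the-driver approach alluded to in the question would look roughly like this (a minimal sketch, assuming the same df as above; the variable names are illustrative):

from collections import defaultdict

# Pull every row to the driver and build the nested dict in plain Python.
# Simple, but all the data and work go through a single node.
nested = defaultdict(dict)
for row in df.collect():
    nested[row['FromComponentID']][row['ToComponentID']] = row['Cost']
result_dict_local = dict(nested)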

Update:

An RDD solution is a lot more compact but, in my opinion, it is not as clean. This is because pyspark doesn't store large dictionaries as RDDs very easily. The solution is to store the data as a distributed list of tuples and then convert it to a dictionary when you collect it to a single node. Here is one possible solution:

# Group rows by FromComponentID, then build {ToComponentID: Cost} per group.
maprdd = df.rdd.groupBy(lambda x: x[0]).map(lambda x: (x[0], {y[1]: y[2] for y in x[1]}))
# The outer dict is only materialized on the driver, at collect time.
result_dict = dict(maprdd.collect())

Again, this should offer performance boosts over a pure Python implementation on a single node, and it might not be that different from the dataframe implementation, but my expectation is that the dataframe version will be more performant.
