Expand array-of-structs into columns in PySpark


Question

I have a Spark dataframe, originating from Google Analytics, that looks like the following:

id     customDimensions (Array<Struct>)
100    [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101    [ {"index": 1, "value": "Mars" }]

I also have a "custom dimensions metadata" dataframe that looks like this:

index   name
1       planet
2       continent

I'd like to use the indexes in the metadata df in order to expand my custom dimensions into columns. The result should look like the following:

id     planet     continent
100    Earth      Europe
101    Mars       null
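
For reference, a minimal, untested sketch that builds the two sample dataframes above (the schema is inferred from the example tables; the SparkSession setup is assumed and not part of the original question):

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()

# Input dataframe: id plus an array of {index, value} structs
df = spark.createDataFrame([
    Row(id=100, customDimensions=[Row(index=1, value="Earth"),
                                  Row(index=2, value="Europe")]),
    Row(id=101, customDimensions=[Row(index=1, value="Mars")]),
])

# Custom dimensions metadata: maps each index to a column name
metadata = spark.createDataFrame([
    Row(index=1, name="planet"),
    Row(index=2, name="continent"),
])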


I have tried the following approach, and it works fine, but it is extremely non-performant. I'd like to know if there's a better approach.

from pyspark.sql import functions as F

# Select the two relevant columns
cd = df.select('id', 'customDimensions')

# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
         .join(metadata, cd.index == metadata.index, 'left')
         .drop(metadata.index))

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))

# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)


Assumptions:

  • There are up to 250 custom dimension indexes, and the names are only known through the metadata dataframe
  • The original dataframe has several other columns that I want to keep (hence the join at the end of my solution)

Answer

Joins are very costly operations because they result in data shuffling. If you can, you should avoid them or look to optimize them.

There are two joins in your code. The last join, which gets the other columns back, can be avoided altogether. The other join, with the metadata dataframe, can be optimized: since the metadata df has only 250 rows and is very small, you can use the broadcast() hint in the join. This avoids shuffling the larger dataframe.

I have made some suggested code changes, but they are not tested since I don't have your data.

from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# Columns of the original dataframe to carry through
# (everything except the array we are about to explode)
df_columns = [c for c in df.columns if c != 'customDimensions']

# Explode customDimensions so that each row now has a single {index, value} struct
cd = df.withColumn('customDimensions', F.explode(df.customDimensions))

# Put the index and value into their own columns
cd = cd.select(*df_columns, 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index;
# broadcasting the small metadata df avoids shuffling the larger dataframe
metadata = metadata.select('index', 'name')
cd = cd.join(broadcast(metadata), 'index', 'left')

# Pivot so that each row keeps the original columns, with one column per custom dimension
piv = cd.groupBy(df_columns).pivot('name').agg(F.first(F.col('value')))

return piv
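
Assuming the snippet above is wrapped in a function, say expand_custom_dimensions(df, metadata) (a hypothetical name, not from the original answer), a quick check against the sample data would look like this:

# expand_custom_dimensions is a hypothetical wrapper around the snippet above
result = expand_custom_dimensions(df, metadata)
result.select('id', 'planet', 'continent').show()
# Expected, per the example in the question:
# +---+------+---------+
# | id|planet|continent|
# +---+------+---------+
# |100| Earth|   Europe|
# |101|  Mars|     null|
# +---+------+---------+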
