How to add column to exploded struct in Spark?
Question
Say I have the following data:
{"id":1, "payload":[{"foo":1, "lol":2},{"foo":2, "lol":2}]}
I would like to explode the payload and add a column to it, like this:
df = df.select('id', F.explode('payload').alias('data'))
df = df.withColumn('data.bar', F.col('data.foo') * 2)
However, this results in a dataframe with three columns:
- id
- data
- data.bar
I expected data.bar to be part of the data struct...
How can I add a column to the exploded struct, instead of adding a top-level column?
Answer
df = df.withColumn('data', F.struct(
    df['data']['foo'].alias('foo'),
    (df['data']['foo'] * 2).alias('bar')
))
This results in:
root
|-- id: long (nullable = true)
|-- data: struct (nullable = false)
| |-- foo: long (nullable = true)
| |-- bar: long (nullable = true)
Update:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

def func(x):
    tmp = x.asDict()
    tmp['foo'] = tmp.get('foo', 0) * 100
    keys, values = zip(*tmp.items())
    return Row(*keys)(*values)

df = df.withColumn('data', F.UserDefinedFunction(func, StructType(
    [StructField('foo', LongType()), StructField('lol', LongType())]))(df['data']))
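The dictionary-rebuild trick inside func can be illustrated without Spark at all. In this minimal sketch, namedtuple stands in for pyspark's Row, and the field names are just the ones from the example data:

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row: a record with named fields.
Row = namedtuple('Row', ['foo', 'lol'])

def rebuild(row):
    # Same steps as func: dump the record to a dict, tweak one field,
    # then build a fresh record from the (key, value) pairs.
    tmp = row._asdict()
    tmp['foo'] = tmp.get('foo', 0) * 100
    keys, values = zip(*tmp.items())
    return namedtuple('Row', keys)(*values)

print(rebuild(Row(foo=1, lol=2)))  # Row(foo=100, lol=2)
```

The original record is never mutated; a new one is returned, which is exactly why the Spark version hands the result back to withColumn.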
P.S. Spark hardly supports in-place modification of a column. So whenever you want to update something in place, you actually need to replace it with a newly built value.