如何在Spark中的分解结构中添加列? [英] How to add column to exploded struct in Spark?

查看:67
本文介绍了如何在Spark中的分解结构中添加列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

说我有以下数据:

{"id":1, "payload":[{"foo":1, "lol":2},{"foo":2, "lol":2}]}

我想爆炸有效载荷并向其添加一个列,如下所示:

I would like to explode the payload and add a column to it, like this:

df = df.select('id', F.explode('payload').alias('data'))
df = df.withColumn('data.bar', F.col('data.foo') * 2)

但是,这将导致一个包含三列的数据框:

However this results in a dataframe with three columns:

  • id
  • data
  • data.bar
  • id
  • data
  • data.bar

我希望data.bardata结构的一部分...

I expected the data.bar to be part of the data struct...

如何将列添加到分解结构中,而不是添加顶级列?

How can I add a column to the exploded struct, instead of adding a top-level column?

推荐答案

df = df.withColumn('data', f.struct(
    df['data']['foo'].alias('foo'),
   (df['data']['foo'] * 2).alias('bar')
))

这将导致:

root
 |-- id: long (nullable = true)
 |-- data: struct (nullable = false)
 |    |-- col1: long (nullable = true)
 |    |-- bar: long (nullable = true)

更新:

def func(x):
    tmp = x.asDict()
    tmp['foo'] = tmp.get('foo', 0) * 100
    res = zip(*tmp.items())
    return Row(*res[0])(*res[1])

df = df.withColumn('data', f.UserDefinedFunction(func, StructType(
    [StructField('foo', StringType()), StructField('lol', StringType())]))(df['data']))

P.S.

火花几乎不支持就地.

因此,每次您要进行替换时,实际上都需要进行替换.

So every time you want to do inplace, you need to do replace actually.

这篇关于如何在Spark中的分解结构中添加列?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆