pyspark: Create MapType Column from existing columns

Problem description

I need to create a new Spark DataFrame MapType column based on the existing columns, where the column name is the key and the value is the value.

For example, I have this DF:

from pyspark.sql.types import StructType, StructField, StringType, FloatType

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),
                      ('d23d', 1.5, 2.0, 2.2),
                      ('as3d', 2.2, 4.3, 9.0)])
schema = StructType([StructField('key', StringType(), True),
                     StructField('metric1', FloatType(), True),
                     StructField('metric2', FloatType(), True),
                     StructField('metric3', FloatType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k|    1.3|    6.3|    7.6|
|d23d|    1.5|    2.0|    2.2|
|as3d|    2.2|    4.3|    9.0|
+----+-------+-------+-------+

So far, I've managed to create a structType from this:

from pyspark.sql.functions import struct

nameCol = struct([name for name in df.columns if "metric" in name]).alias("metric")
df2 = df.select("key", nameCol)

+----+-------------+
| key|       metric|
+----+-------------+
|123k|[1.3,6.3,7.6]|
|d23d|[1.5,2.0,2.2]|
|as3d|[2.2,4.3,9.0]|
+----+-------------+

But what I need is a metric column with a MapType, where the key is the column name:

+----+-------------------------+
| key|                   metric|
+----+-------------------------+
|123k|Map(metric1 -> 1.3, me...|
|d23d|Map(metric1 -> 1.5, me...|
|as3d|Map(metric1 -> 2.2, me...|
+----+-------------------------+

Any hints on how I can transform the data?

Thanks!

Recommended answer

In Spark 2.0 or later you can use create_map. First some imports:

from pyspark.sql.functions import lit, col, create_map
from itertools import chain

create_map expects an interleaved sequence of keys and values, which can be created, for example, like this:

metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name
)))).alias("metric")
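
For the sample frame, the chained generator expands to the same interleaved argument list you would write by hand:

# Hand-expanded equivalent for the three metric columns:
metric = create_map(
    lit("metric1"), col("metric1"),
    lit("metric2"), col("metric2"),
    lit("metric3"), col("metric3")
).alias("metric")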

and used with select:

df.select("key", metric)

With the example data, the result is:

+----+---------------------------------------------------------+
|key |metric                                                   |
+----+---------------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)      |
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)      |
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)      |
+----+---------------------------------------------------------+
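
If you later need a single value back out of the map column, getItem (or the equivalent bracket syntax) works on a MapType column; for example, reading metric2 from the result above:

mapped = df.select("key", metric)
mapped.select("key", mapped["metric"].getItem("metric2").alias("metric2")).show()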

If you use an earlier version of Spark, you'll have to use a UDF:

from pyspark.sql import Column
from pyspark.sql.functions import col, lit, struct, udf
from pyspark.sql.types import DataType, DoubleType, StringType, MapType

def as_map(*cols: str, value_type: DataType = DoubleType()) -> Column:
    # Pack each column as a (name, value) struct, then let the UDF
    # turn the sequence of pairs into a dict.
    args = [struct(lit(name), col(name)) for name in cols]
    as_map_ = udf(
        lambda *args: dict(args),
        MapType(StringType(), value_type)
    )
    return as_map_(*args)

which can be used as follows:

df.select("key", 
    as_map(*[name for name in df.columns if "metric" in name]).alias("metric"))
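
As a side note, on Spark 2.4 or later, map_from_arrays offers a UDF-free alternative; a minimal sketch, assuming the same metric columns as above:

from pyspark.sql.functions import array, col, lit, map_from_arrays

names = [name for name in df.columns if "metric" in name]
metric = map_from_arrays(
    array(*[lit(name) for name in names]),  # keys: the column names
    array(*[col(name) for name in names])   # values: the metric values
).alias("metric")
df.select("key", metric)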
