map values in a dataframe from a dictionary using pyspark

Question

I want to know how to map values in a specific column in a dataframe.

I have a dataframe which looks like:

df = sc.parallelize([('india','japan'),('usa','uruguay')]).toDF(['col1','col2'])

+-----+-------+
| col1|   col2|
+-----+-------+
|india|  japan|
|  usa|uruguay|
+-----+-------+

I have a dictionary from which I want to map the values.

dicts = sc.parallelize([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')])

My desired output is:

+-----+-------+--------+--------+
| col1|   col2|col1_map|col2_map|
+-----+-------+--------+--------+
|india|  japan|     ind|     jpn|
|  usa|uruguay|      us|     urg|
+-----+-------+--------+--------+

I have tried using the lookup function, but it doesn't work: it throws error SPARK-5063, since an RDD (here, dicts) cannot be referenced from inside a udf, which runs on the executors rather than the driver. Following is my approach, which failed:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def map_val(x):
    # dicts is an RDD; invoking the .lookup() action inside a udf triggers SPARK-5063
    return dicts.lookup(x)[0]

myfun = udf(lambda x: map_val(x), StringType())

df = df.withColumn('col1_map', myfun('col1')) # doesn't work: RDD actions can't run on executors
df = df.withColumn('col2_map', myfun('col2')) # doesn't work, same reason
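
A minimal workaround (just a sketch; the recommended answer below does essentially this, plus a broadcast) is to first collect the lookup pairs onto the driver as a plain Python dict, which a udf can safely close over:

# collect the pair RDD to the driver; the udf then reads a local dict, not an RDD
local_map = dicts.collectAsMap()

myfun = udf(lambda x: local_map.get(x), StringType())
df = df.withColumn('col1_map', myfun('col1'))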

Recommended answer

The udf way

I would suggest you change the list of tuples to a dict and broadcast it to be used inside the udf:

dicts = sc.broadcast(dict([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')]))

from pyspark.sql import functions as f
from pyspark.sql import types as t
def newCols(x):
    # dicts.value is the plain Python dict behind the broadcast; raises KeyError for unseen keys
    return dicts.value[x]

callnewColsUdf = f.udf(newCols, t.StringType())

df.withColumn('col1_map', callnewColsUdf(f.col('col1')))\
    .withColumn('col2_map', callnewColsUdf(f.col('col2')))\
    .show(truncate=False)

which should give you:

+-----+-------+--------+--------+
|col1 |col2   |col1_map|col2_map|
+-----+-------+--------+--------+
|india|japan  |ind     |jpn     |
|usa  |uruguay|us      |urg     |
+-----+-------+--------+--------+
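
As an aside (not part of the original answer): on Spark 2.x+ the same lookup can be sketched without any udf by turning the Python dict into a literal map column with create_map; reusing the mapping from above, a key missing from the map yields null instead of raising an error.

from itertools import chain
from pyspark.sql import functions as f

mapping = dict([('india','ind'), ('usa','us'), ('japan','jpn'), ('uruguay','urg')])
# flatten the dict into alternating key/value literals: map('india','ind','usa','us', ...)
mapping_expr = f.create_map([f.lit(x) for x in chain(*mapping.items())])

df.withColumn('col1_map', mapping_expr[f.col('col1')])\
    .withColumn('col2_map', mapping_expr[f.col('col2')])\
    .show(truncate=False)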

The join way (slower than the udf way)

All you have to do is change the dicts rdd to a dataframe too, and use two joins with aliasing as below:

df = sc.parallelize([('india','japan'),('usa','uruguay')]).toDF(['col1','col2'])

dicts = sc.parallelize([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')]).toDF(['key', 'value'])

from pyspark.sql import functions as f
df.join(dicts, df['col1'] == dicts['key'], 'inner')\
    .select(f.col('col1'), f.col('col2'), f.col('value').alias('col1_map'))\
    .join(dicts, df['col2'] == dicts['key'], 'inner') \
    .select(f.col('col1'), f.col('col2'), f.col('col1_map'), f.col('value').alias('col2_map'))\
    .show(truncate=False)

which should give you the same result.
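
If you do take the join route with a small lookup table, a broadcast hint (my addition, assuming dicts comfortably fits in executor memory) usually narrows the gap to the udf way by shipping dicts to every executor instead of shuffling df:

from pyspark.sql import functions as f

# broadcast the small lookup dataframe so the join avoids a shuffle of df
df.join(f.broadcast(dicts), df['col1'] == dicts['key'], 'inner')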
