Adding a List element as a column to existing pyspark dataframe


Problem Description

I have a list, lists=[0,1,2,3,5,6,7]. The order is not sequential. I have a pyspark dataframe with 9 columns.

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    1| NaN|

I need to add my list as a column to my existing dataframe. My list is not in order, so I am not able to use a udf. Is there a way to do it? Please help me; I want it to be like this:

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+------+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff| lists|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+------+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|     0|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    1| NaN|     1|

Solution

Not entirely sure whether it has to be something like this or whether you were expecting something else. If the number of list items and the number of dataframe rows have to be the same, here's a simple approach.

For a given sample dataframe with three columns:

l = [(1, 'DEF', 33), (2, 'KLM', 22), (3, 'ABC', 32), (4, 'XYZ', 77)]
df = spark.createDataFrame(l, ['id', 'value', 'age'])

Let's say this is the list:

lists=[5,6,7,8]

You can create an RDD from this list, zip it with the dataframe's RDD, and apply a map function over it.

listrdd = sc.parallelize(lists)

newdf = df.rdd.zip(listrdd).map(lambda (x, y): ([i for i in x] + [y])).toDF(["id", "Value", "age", "List_element"])

>>> ziprdd=df.rdd.zip(listrdd)
>>> ziprdd.take(50)
[(Row(id=1, value=u'DEF', age=33), 5), (Row(id=2, value=u'KLM', age=22), 6), (Row(id=3, value=u'ABC', age=32), 7), (Row(id=4, value=u'XYZ', age=77), 8)]

The zip function returns key-value pairs in which the first element contains data from the first RDD and the second element contains data from the second RDD. I use a list comprehension to expand the first element (a Row) and concatenate it with the second element.

It's dynamic and can work for any number of columns, but the number of list elements and dataframe rows has to be the same.

>>> newdf.show()
+---+-----+----+------------+
| id|Value| age|List_element|
+---+-----+----+------------+
|  1|  DEF|  33|           5|
|  2|  KLM|  22|           6|
|  3|  ABC|  32|           7|
|  4|  XYZ|  77|           8|
+---+-----+----+------------+
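
Note that the tuple-unpacking lambda above (lambda (x, y): ...) only works on Python 2. On Python 3, a rough equivalent of the same map step (assuming the same df and listrdd as above) could look like this:

# Python 3 compatible variant: index into the pair instead of unpacking it in the lambda
newdf = df.rdd.zip(listrdd) \
    .map(lambda pair: list(pair[0]) + [pair[1]]) \
    .toDF(["id", "Value", "age", "List_element"])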

Note: Both RDDs' partition counts have to be the same when using the zip method, otherwise you will get an error:

ValueError: Can only zip with RDD which has the same number of partitions
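
If you run into that error, one way to line things up (a sketch, still assuming the list length equals the row count) is to create the list RDD with the same partition count as the dataframe's RDD:

# Match the list RDD's partition count to the dataframe's RDD before zipping
numparts = df.rdd.getNumPartitions()
listrdd = sc.parallelize(lists, numparts)

Keep in mind that zip also expects each corresponding partition to hold the same number of elements, so for unevenly distributed data a zipWithIndex plus join approach is the more robust route.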
