PySpark: Add a column to DataFrame when column is a list


Question

I have read similar questions but couldn't find a solution to my specific problem.

I have a list

l = [1, 2, 3]

and a DataFrame

df = sc.parallelize([
    ['p1', 'a'],
    ['p2', 'b'],
    ['p3', 'c'],
]).toDF(('product', 'name'))

I would like to obtain a new DataFrame where the list l is added as a further column, namely

+-------+----+---------+
|product|name| new_col |
+-------+----+---------+
|     p1|   a|     1   |
|     p2|   b|     2   |
|     p3|   c|     3   |
+-------+----+---------+

Approaches with JOIN, where I was joining df with an

 sc.parallelize([[1], [2], [3]])

have failed. Approaches using withColumn, as in

new_df = df.withColumn('new_col', l)

have failed because the list is not a Column object.

Answer

So, from reading some interesting stuff here, I've ascertained that you can't really just append a random / arbitrary column to a given DataFrame object. It appears what you want is more of a zip than a join. I looked around and found this ticket, which makes me think you won't be able to zip given that you have DataFrame rather than RDD objects.
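For illustration, here is a minimal sketch of what that RDD-level zip could look like. This is an aside, not the approach taken below: RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition, which is not guaranteed in general, and the names l_rdd and zipped are just placeholders.

# Sketch only: zip the data RDD with the list RDD directly.
# Assumption: both RDDs end up with identical partition layouts, as RDD.zip requires.
l_rdd = sc.parallelize([1, 2, 3], df.rdd.getNumPartitions())
zipped = df.rdd.zip(l_rdd)  # yields (Row, value) pairs
new_df = zipped.map(lambda pair: list(pair[0]) + [pair[1]]).toDF(['product', 'name', 'new_col'])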

The only way I've been able to solve your issue involves leaving the world of DataFrame objects and returning to RDD objects. I've also needed to create an index for the purpose of the join, which may or may not work with your use case.

l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)

rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)

# just in case!
assert(rdd.count() == l.count())
# perform an inner join on the index we generated above, then map it to look pretty.
new_rdd = rdd_index.join(z).map(lambda kv: [kv[1][0][0], kv[1][0][1], kv[1][1]])
new_df = new_rdd.toDF(["product", 'name', 'new_col'])

When I run new_df.show(), I get:

+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+
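As a variation on the above (a sketch, assuming you are fine staying at the RDD level): RDD.zipWithIndex assigns a positional index in one pass, which avoids building a separate index RDD and the partition-alignment requirement that zip imposes.

# Sketch: key both RDDs by position with zipWithIndex, then join on that key.
l_indexed = sc.parallelize([1, 2, 3]).zipWithIndex().map(lambda pair: (pair[1], pair[0]))
rows_indexed = df.rdd.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
new_df = (rows_indexed.join(l_indexed)
          .map(lambda kv: list(kv[1][0]) + [kv[1][1]])
          .toDF(['product', 'name', 'new_col']))

Note that the join may change the row order, but each product still gets its matching value.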

Sidenote: I'm really surprised this didn't work. Looks like an outer join?

from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)

When I run new_df.show(), I get:

+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
|     p1|   a|           1|
|     p1|   a|           2|
|     p1|   a|           3|
|     p2|   b|           1|
|     p3|   c|           1|
|     p2|   b|           2|
|     p2|   b|           3|
|     p3|   c|           2|
|     p3|   c|           3|
+-------+----+------------+
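A possible explanation for the sidenote: a join with no join condition is a Cartesian product (a cross join), so each of the 3 rows of df is paired with each of the 3 rows of l_as_df, giving the 9 rows above. In more recent Spark versions the same pairing has to be requested explicitly (assuming Spark 2.1+, where crossJoin is available):

# Explicit cross join, equivalent to the condition-less join above (Spark 2.1+).
new_df = df.crossJoin(l_as_df)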
