如何将 numpy.array 作为新列添加到 pyspark.SQL DataFrame? [英] How do you add a numpy.array as a new column to a pyspark.SQL DataFrame?
问题描述
这里是创建pyspark.sql DataFrame的代码
Here is the code to create a pyspark.sql DataFrame
import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
df = pd.DataFrame(np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]]), columns=['a','b','c'])
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)
所以 sparkdf 看起来像
So that sparkdf looks like
a b c
1 2 3
4 5 6
7 8 9
10 11 12
现在我想添加一个 numpy 数组(甚至是一个列表)作为新列
Now I would like to add as a new column a numpy array (or even a list)
new_col = np.array([20,20,20,20])
但是标准方式
sparkdf = sparkdf.withColumn('newcol', new_col)
失败.可能 udf 是要走的路,但我不知道如何创建一个 udf 为每个 DataFrame 行分配一个不同的值,即迭代 new_col.我查看了其他 pyspark 和 pyspark.sql 但找不到解决方案.此外,我需要留在 pyspark.sql 中,所以不是 Scala 解决方案.谢谢!
fails. Probably an udf is the way to go, but I don't know how to create an udf that assigns one different value per DataFrame row, i.e. that iterates through new_col. I have looked at other pyspark and pyspark.sql but couldn't find a solution. Also I need to stay within pyspark.sql so not a scala solution. Thanks!
推荐答案
假设数据框已排序以匹配数组中值的顺序,您可以按如下方式压缩 RDD 并重建数据框:
Assuming that data frame is sorted to match order of values in an array you can zip RDDs and rebuild data frame as follows:
n = sparkdf.rdd.getNumPartitions()
# Parallelize and cast to plain integer (np.int64 won't work)
new_col = sc.parallelize(np.array([20,20,20,20]), n).map(int)
def process(pair):
return dict(pair[0].asDict().items() + [("new_col", pair[1])])
rdd = (sparkdf
.rdd # Extract RDD
.zip(new_col) # Zip with new col
.map(process)) # Add new column
sqlContext.createDataFrame(rdd) # Rebuild data frame
您也可以使用连接:
new_col = sqlContext.createDataFrame(
zip(range(1, 5), [20] * 4),
("rn", "new_col"))
sparkdf.registerTempTable("df")
sparkdf_indexed = sqlContext.sql(
# Make sure we have specific order and add row number
"SELECT row_number() OVER (ORDER BY a, b, c) AS rn, * FROM df")
(sparkdf_indexed
.join(new_col, new_col.rn == sparkdf_indexed.rn)
.drop(new_col.rn))
但窗口函数组件不可扩展,应避免用于较大的数据集.
but window function component is not scalable and should be avoided with larger datasets.
当然,如果您只需要一列单个值,您可以简单地使用 lit
Of course if all you need is a column of a single value you can simply use lit
import pyspark.sql.functions as f
sparkdf.withColumn("new_col", f.lit(20))
但我认为事实并非如此.
but I assume it is not the case.
这篇关于如何将 numpy.array 作为新列添加到 pyspark.SQL DataFrame?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!