如何将新列添加到Spark DataFrame(使用PySpark)? [英] How do I add a new column to a Spark DataFrame (using PySpark)?

查看:85
本文介绍了如何将新列添加到Spark DataFrame(使用PySpark)?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Spark DataFrame(使用PySpark 1.5.1),想添加一个新列.

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.

我尝试了以下操作,但均未成功:

I've tried the following without any success:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

使用此命令也出错:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

那么如何使用PySpark将新列(基于Python向量)添加到现有DataFrame中?

So how do I add a new column (based on Python vector) to an existing DataFrame with PySpark?

推荐答案

您不能将任意列添加到Spark中的DataFrame.只能通过使用文字来创建新列(其他文字类型在如何在Spark DataFrame中添加常量列中进行了描述?)

You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

转换现有列:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

包含在join中:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

或使用函数/udf生成:

or generated with function / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

映射到Catalyst表达式的基于性能的内置函数(pyspark.sql.functions)通常优于Python用户定义的函数.

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expression, are usually preferred over Python user defined functions.

如果您想将任意RDD的内容添加为列,则可以

If you want to add content of an arbitrary RDD as a column you can

  • add row numbers to existing data frame
  • call zipWithIndex on RDD and convert it to data frame
  • join both using index as a join key

这篇关于如何将新列添加到Spark DataFrame(使用PySpark)?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆