How to add columns to a PySpark dataframe dynamically


Question

I am trying to add a few columns based on the input variable vIssueCols:

from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

vIssueCols = ['jobid', 'locid']
vQuery1 = 'vSrcData2= vSrcData'
vWindow1 = Window.partitionBy("vKey").orderBy("vOrderBy")
# append one .withColumn(...) call to the query string per column
for x in vIssueCols:
    vQuery1 = vQuery1 + '.withColumn("' + x + '_prev",F.lag(vSrcData.' + x + ').over(vWindow1))'

exec(vQuery1)

The loop above generates vQuery1 as shown below, and it works, but

vSrcData2= vSrcData.withColumn("jobid_prev",F.lag(vSrcData.jobid).over(vWindow1)).withColumn("locid_prev",F.lag(vSrcData.locid).over(vWindow1))

I cannot write a query like

vSrcData2= vSrcData.withColumn(x+"_prev",F.lag(vSrcData.x).over(vWindow1))for x in vIssueCols

and generate the columns with a loop statement. Some blogs suggest adding a UDF and calling it, but rather than a UDF I would keep using the string-exec approach above.
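As an aside, the loop-style line above is not valid Python, but the same intent can be written without exec as a single select with a list comprehension. A minimal sketch (not from the original post), assuming vSrcData, vIssueCols and vWindow1 as defined above:

# sketch: build all lagged columns in one select, assuming the names above
vSrcData2 = vSrcData.select(
    "*",
    *[F.lag(F.col(c)).over(vWindow1).alias(c + "_prev") for c in vIssueCols]
)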

Answer

You can build your query using reduce.

from pyspark.sql.functions import lag
from pyspark.sql.window import Window
from functools import reduce

#sample data
df = sc.parallelize([[1, 200, '1234', 'asdf'],
                     [1, 50, '2345', 'qwerty'],
                     [1, 100, '4567', 'xyz'],
                     [2, 300, '123', 'prem'],
                     [2, 10, '000', 'ankur']]).\
    toDF(["vKey","vOrderBy","jobid","locid"])
df.show()

vWindow1 = Window.partitionBy("vKey").orderBy("vOrderBy")

#your existing processing
df1= df.\
    withColumn("jobid_prev",lag(df.jobid).over(vWindow1)).\
    withColumn("locid_prev",lag(df.locid).over(vWindow1))
df1.show()

#to-be processing
vIssueCols=['jobid','locid']
df2 = (reduce(
    lambda r_df, col_name: r_df.withColumn(col_name+"_prev", lag(r_df[col_name]).over(vWindow1)),
    vIssueCols,
    df
))
df2.show()
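The reduce call is simply a fold over vIssueCols: each step takes the dataframe produced so far and adds one lagged column. Unrolled, it is equivalent to this plain loop (a sketch; df3 is a hypothetical name used for illustration):

# equivalent plain loop: thread the growing dataframe through each step,
# exactly what the reduce above does
df3 = df
for col_name in vIssueCols:
    df3 = df3.withColumn(col_name + "_prev", lag(df3[col_name]).over(vWindow1))
df3.show()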

Sample data:

+----+--------+-----+------+
|vKey|vOrderBy|jobid| locid|
+----+--------+-----+------+
|   1|     200| 1234|  asdf|
|   1|      50| 2345|qwerty|
|   1|     100| 4567|   xyz|
|   2|     300|  123|  prem|
|   2|      10|  000| ankur|
+----+--------+-----+------+

Output:

+----+--------+-----+------+----------+----------+
|vKey|vOrderBy|jobid| locid|jobid_prev|locid_prev|
+----+--------+-----+------+----------+----------+
|   1|      50| 2345|qwerty|      null|      null|
|   1|     100| 4567|   xyz|      2345|    qwerty|
|   1|     200| 1234|  asdf|      4567|       xyz|
|   2|      10|  000| ankur|      null|      null|
|   2|     300|  123|  prem|       000|     ankur|
+----+--------+-----+------+----------+----------+

Hope this helps!
