Pyspark Replicate Row based on column value


Problem Description

I would like to replicate all rows in my DataFrame based on the value of a given column on each row, and then index each new row. Suppose I have:

Column A Column B
T1       3
T2       2

I would like the result to be:

Column A Column B Index
T1       3        1
T1       3        2
T1       3        3
T2       2        1
T2       2        2

I was able to do something similar with fixed values, but not by using the information found in the column. My current working code for fixed values is:

from pyspark.sql.functions import array, explode, lit

idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array(idx)))

I tried to change:

lit(i) for i in range(1, 10)

to

lit(i) for i in range(1, df['Column B'])

and add it into my array() function:

df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))

but it does not work (TypeError: 'Column' object cannot be interpreted as an integer).

How should I accomplish this?

Recommended Answer

Unfortunately you can't iterate over a Column like that. You can always use a udf, but I do have a non-udf hack solution that should work for you if you're using Spark version 2.1 or higher.
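
For reference, the udf route might look something like this (a minimal sketch, assuming Column B holds integers; the helper name make_range is my own):

from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import ArrayType, IntegerType

# Build the array [1, 2, ..., n] from each row's Column B value, then explode it
make_range = udf(lambda n: list(range(1, n + 1)), ArrayType(IntegerType()))
newDF = df.withColumn('Index', explode(make_range(col('Column B'))))

The drawback is that every row has to pass through Python serialization, which is why the built-in-functions approach below is preferable.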

The trick is to take advantage of pyspark.sql.functions.posexplode() to get the index value. We do this by creating a string by repeating a comma Column B times. Then we split this string on the comma, and use posexplode to get the index.

df.createOrReplaceTempView("df")  # first register the DataFrame as a temp table

query = """
    SELECT
        `Column A`,
        `Column B`,
        pos AS Index
    FROM (
        SELECT DISTINCT
            `Column A`,
            `Column B`,
            posexplode(split(repeat(",", `Column B`), ","))
        FROM df
    ) AS a
    WHERE a.pos > 0
"""
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#|      T1|       3|    1|
#|      T1|       3|    2|
#|      T1|       3|    3|
#|      T2|       2|    1|
#|      T2|       2|    2|
#+--------+--------+-----+

Note: You need to wrap the column names in backticks since they have spaces in them, as explained in this post: How to express a column which name contains spaces in Spark SQL
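
For comparison, the same trick can be expressed with the DataFrame API instead of a SQL string (again a sketch for Spark 2.1+; posexplode yields the default column names pos and col, and I've left out the DISTINCT from the SQL version for brevity):

from pyspark.sql import functions as F

newDF = (
    df.select(
        'Column A',
        'Column B',
        # repeat a comma `Column B` times, split on commas, and posexplode for the index
        F.posexplode(F.split(F.expr('repeat(",", `Column B`)'), ','))
    )
    .where(F.col('pos') > 0)  # drop the pos == 0 placeholder element
    .select('Column A', 'Column B', F.col('pos').alias('Index'))
)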
