Pyspark Replicate Row based on column value
Question
I would like to replicate all rows in my DataFrame based on the value of a given column on each row, and then index each new row. Suppose I have:
Column A Column B
T1 3
T2 2
I would like the result to be:
Column A Column B Index
T1 3 1
T1 3 2
T1 3 3
T2 2 1
T2 2 2
I was able to do something similar with fixed values, but not by using the information found in the column. My current working code for fixed values is:
from pyspark.sql.functions import array, explode, lit

idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array(idx)))
I tried to change:
lit(i) for i in range(1, 10)
to
lit(i) for i in range(1, df['Column B'])
and to add it into my array() function:
df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))
but it does not work (TypeError: 'Column' object cannot be interpreted as an integer).
How can I accomplish this?
Answer
Unfortunately you can't iterate over a Column like that. You can always use a udf, but I do have a non-udf hack solution that should work for you if you're using Spark version 2.1 or higher.
The trick is to take advantage of pyspark.sql.functions.posexplode() to get the index value. We do this by creating a string that repeats a comma Column B times. Then we split this string on the comma, and use posexplode to get the index.
df.createOrReplaceTempView("df") # first register the DataFrame as a temp table
query = """
    SELECT
        `Column A`,
        `Column B`,
        pos AS Index
    FROM (
        SELECT DISTINCT
            `Column A`,
            `Column B`,
            posexplode(split(repeat(",", `Column B`), ","))
        FROM df
    ) AS a
    WHERE a.pos > 0
"""
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#| T1| 3| 1|
#| T1| 3| 2|
#| T1| 3| 3|
#| T2| 2| 1|
#| T2| 2| 2|
#+--------+--------+-----+
Note: You need to wrap the column names in backticks since they contain spaces, as explained in this post: How to express a column which name contains spaces in Spark SQL