Spark:数据帧中zipwithindex的等效项 [英] Spark: equivelant of zipwithindex in dataframe
问题描述
假设我有以下数据框:
dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)]
df = sc.parallelize(dummy_data).toDF(['letter','number'])
我想创建以下数据框:
[('a',0),('b',2),('c',1),('d',3),('e',0)]
我所做的是将其转换为 rdd
并使用 zipWithIndex
函数并加入结果后:
What I do is to convert it to rdd
and use zipWithIndex
function and after join the results:
convertDF = (df.select('number')
.distinct()
.rdd
.zipWithIndex()
.map(lambda x:(x[0].number,x[1]))
.toDF(['old','new']))
finalDF = (df
.join(convertDF,df.number == convertDF.old)
.select(df.letter,convertDF.new))
数据帧中是否有与 zipWIthIndex
类似的功能?还有其他更有效的方法来完成这项任务吗?
Is if there is something similar function as zipWIthIndex
in dataframes? Is there another more efficient way to do this task?
推荐答案
请查看 https://issues.apache.org/jira/browse/SPARK-23074 对于数据帧中的这种直接功能奇偶校验.. 如果您有兴趣在 Spark 的某个时候看到这一点,请对该 jira 点赞.
Please check https://issues.apache.org/jira/browse/SPARK-23074 for this direct functionality parity in dataframes .. upvote that jira if you're interested to see this at some point in Spark.
这是 PySpark 中的一种解决方法:
Here's a workaround though in PySpark:
def dfZipWithIndex (df, offset=1, colName="rowId"):
'''
Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe
and preserves a schema
:param df: source dataframe
:param offset: adjustment to zipWithIndex()'s index
:param colName: name of the index column
'''
new_schema = StructType(
[StructField(colName,LongType(),True)] # new added field in front
+ df.schema.fields # previous schema
)
zipped_rdd = df.rdd.zipWithIndex()
new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))
return spark.createDataFrame(new_rdd, new_schema)
这也可以在 包.
That's also available in abalon package.
这篇关于Spark:数据帧中zipwithindex的等效项的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!