how to add Row id in pySpark dataframes
Question
I have a csv file which I convert to a DataFrame (df) in pyspark. After some transformations I want to add a column to df that is a simple row id (running from 0 or 1 up to N).
I converted the df to an rdd and used zipWithIndex, then converted the resulting rdd back to a df. This approach works, but it generated 250k tasks and takes a lot of time to execute. I was wondering if there is another way to do it that takes less runtime.
The following is a snippet of my code; the csv file I am processing is big and contains billions of rows.
debug_csv_rdd = (sc.textFile("debug.csv")
                 .filter(lambda x: x.find('header') == -1)
                 .map(lambda x: x.replace("NULL", "0"))
                 .map(lambda p: p.split(','))
                 .map(lambda x: Row(c1=int(x[0]), c2=int(x[1]), c3=int(x[2]), c4=int(x[3]))))
debug_csv_df = sqlContext.createDataFrame(debug_csv_rdd)
debug_csv_df.registerTempTable("debug_csv_table")
sqlContext.cacheTable("debug_csv_table")
r0 = sqlContext.sql("SELECT c2 FROM debug_csv_table WHERE c1 = 'str'")
r0.registerTempTable("r0_table")
# Pair each value with a consecutive index, then rebuild a DataFrame.
r0_1 = (r0.flatMap(lambda x: x)
        .zipWithIndex()
        .map(lambda x: Row(c1=x[0], id=int(x[1]))))
r0_df = sqlContext.createDataFrame(r0_1)
r0_df.show(10)
You can also use a function from the sql package. It will generate a unique id; however, it will not be sequential, since it depends on the number of partitions. I believe it is available in Spark 1.5+.
from pyspark.sql.functions import monotonicallyIncreasingId
# This will return a new DF with all the columns + id
res = df.withColumn("id", monotonicallyIncreasingId())
Edit: 19/1/2017
As commented by @Sean, use monotonically_increasing_id() instead from Spark 1.6 onwards.