How to add a row ID in PySpark DataFrames


Question

I have a CSV file which I convert to a DataFrame (df) in PySpark. After some transformations, I want to add a column to df that is a simple row ID (starting from 0 or 1 up to N).

I converted df to an RDD and used zipWithIndex, then converted the resulting RDD back to a DataFrame. This approach works, but it generates 250k tasks and takes a long time to execute. I was wondering whether there is another way to do this that takes less runtime.
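For context, a minimal sketch of that zipWithIndex round trip, assuming an existing DataFrame named df whose columns should all be kept alongside the new id (the names here are illustrative, not taken from the original post):

from pyspark.sql import Row

# Pair every Row with its index, then rebuild Rows that also carry the id.
indexed_rdd = (df.rdd
    .zipWithIndex()                                   # (Row, index) pairs
    .map(lambda pair: Row(id=pair[1], **pair[0].asDict())))

indexed_df = sqlContext.createDataFrame(indexed_rdd)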

The following is a snippet of my code; the CSV file I am processing is big, containing billions of rows.

from pyspark.sql import Row

# Load the CSV, drop the header line, replace NULLs with 0, and parse each row.
debug_csv_rdd = (sc.textFile("debug.csv")
  .filter(lambda x: x.find('header') == -1)
  .map(lambda x: x.replace("NULL", "0"))
  .map(lambda p: p.split(','))
  .map(lambda x: Row(c1=int(x[0]), c2=int(x[1]), c3=int(x[2]), c4=int(x[3]))))

debug_csv_df = sqlContext.createDataFrame(debug_csv_rdd)
debug_csv_df.registerTempTable("debug_csv_table")
sqlContext.cacheTable("debug_csv_table")

r0 = sqlContext.sql("SELECT c2 FROM debug_csv_table WHERE c1 = 'str'")
r0.registerTempTable("r0_table")

# Flatten the single-column rows, pair each value with its index,
# and rebuild Row objects that carry the id.
r0_1 = (r0.flatMap(lambda x: x)
    .zipWithIndex()
    .map(lambda x: Row(c1=x[0], id=int(x[1]))))

r0_df = sqlContext.createDataFrame(r0_1)
r0_df.show(10)

Answer

You can also use a function from the sql package. It will generate a unique ID; however, it will not be sequential, since it depends on the number of partitions. I believe it is available in Spark 1.5+.

from pyspark.sql.functions import monotonicallyIncreasingId

# This will return a new DF with all the columns + id
res = df.withColumn("id", monotonicallyIncreasingId())


Edit (19/1/2017), per a comment from @Sean:

Use monotonically_increasing_id() instead from Spark 1.6 and on.
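For reference, a minimal sketch of the same call under the Spark 1.6+ snake_case name (df is assumed to be an existing DataFrame):

from pyspark.sql.functions import monotonically_increasing_id

# Same idea with the newer name; the ids are unique and increasing but not
# consecutive, because they encode the partition that produced each row.
res = df.withColumn("id", monotonically_increasing_id())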
