Adding a unique consecutive row number to a dataframe in PySpark


Problem description

I want to add a unique row number to my dataframe in PySpark, and I don't want to use the monotonicallyIncreasingId or partitionBy methods. This question might be a duplicate of similar questions asked earlier, but I am still looking for advice on whether I am doing it the right way. The following is a snippet of my code. I have a CSV file with this set of input records:

1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000

I have loaded this CSV file into a dataframe:

PATH_TO_FILE = "file:///u/user/vikrant/testdata/EMP_FILE.csv"

emp_df = spark.read.format("com.databricks.spark.csv") \
    .option("mode", "DROPMALFORMED") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load(PATH_TO_FILE)

+------+--------------------+--------+----------+
|emp_id|            emp_name|emp_city|emp_salary|
+------+--------------------+--------+----------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|USA     |     50000|
|     6|RAJAT TYAGI      ...|UP      |     65000|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|
|     9|ROBERT           ...|GURGAON |     70000|
+------+--------------------+--------+----------+

empRDD = emp_df.rdd.zipWithIndex()
newRDD = empRDD.map(lambda x: list(x[0]) + [x[1]])
newRDD.take(2)
[[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]

When I appended the int index value to each list, I lost the dataframe schema.

newdf = newRDD.toDF(['emp_id', 'emp_name', 'emp_city', 'emp_salary', 'row_id'])
newdf.show()

+------+--------------------+--------+----------+------+
|emp_id|            emp_name|emp_city|emp_salary|row_id|
+------+--------------------+--------+----------+------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
|     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
|     6|RAJAT TYAGI      ...|UP      |     65000|     5|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
|     9|ROBERT           ...|GURGAON |     70000|     8|
+------+--------------------+--------+----------+------+

Am I doing it the right way? Or is there a better way to add a row number while preserving the dataframe's schema in PySpark?
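
For reference, a minimal sketch (not from the original post; the names row_id_schema and newdf_with_schema are just illustrative) of one way to keep the original typed schema with the zipWithIndex approach: append a LongType field for the index to emp_df's existing schema and build the dataframe with createDataFrame instead of toDF.

from pyspark.sql.types import LongType, StructField, StructType

# Reuse emp_df's schema and append one field for the index added by zipWithIndex.
row_id_schema = StructType(emp_df.schema.fields + [StructField("row_id", LongType(), False)])
newdf_with_schema = spark.createDataFrame(newRDD, row_id_schema)
newdf_with_schema.printSchema()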

Is it also feasible to use the zipWithIndex method to add a unique consecutive row number to a large dataframe? Can we use this row_id to repartition the dataframe so that the data is distributed uniformly across the partitions?
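
As a rough sketch of the repartitioning idea (the partition count 8 is only an assumption for this example), hash-partitioning on the unique row_id column spreads the rows roughly evenly:

# Hash-partition on the unique row_id column; 8 partitions is an arbitrary example.
repartitioned = newdf.repartition(8, "row_id")
# Inspect how many rows land in each partition.
print(repartitioned.rdd.glom().map(len).collect())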

Answer

I found a solution, and it is very simple. Since no column in my dataframe has the same value across all rows, using row_number with a partitionBy clause on an existing column does not generate unique row numbers over the whole dataframe.

Let's add a new column to the existing dataframe and fill it with a constant default value:

from pyspark.sql.functions import lit

emp_df = emp_df.withColumn("new_column", lit("ABC"))

and create a window function that partitions by that constant "new_column" column; since every row shares the same value, the whole dataframe falls into a single window and row_number() produces one consecutive sequence:

from pyspark.sql import Window
from pyspark.sql.functions import row_number

w = Window().partitionBy('new_column').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")

and you will get the desired result:

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
|     9|ROBERT           ...|GURGAON |     70000|      4|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
|     6|RAJAT TYAGI      ...|UP      |     65000|      9|
+------+--------------------+--------+----------+-------+
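
A small sanity check (not part of the original answer) to confirm that row_num is unique and runs consecutively from 1 to the row count:

# Both assertions hold only if row_num has no duplicates and no gaps.
total_rows = df.count()
assert df.select("row_num").distinct().count() == total_rows
assert df.agg({"row_num": "max"}).collect()[0][0] == total_rows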
