Add a sequence number column to a DataFrame using Scala

Question

Below is the logic I use to add a sequence number column to a DataFrame. It works as expected when I read data from delimited files. Today I have a new task: read the data from an Oracle table, add a sequence number, and process it further. When the DataFrame comes from the Oracle table, this logic fails.

oracleTableDF is my DataFrame:

   import org.apache.spark.sql.Row
   import org.apache.spark.sql.types.{LongType, StructField, StructType}

   // Prepend a 1-based sequence number (SeqNum) to every row
   val rowRDD = oracleTableDF.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq((idx + 1) +: row.toSeq) }

   // Extend the original schema with the SeqNum column
   val newstructure = StructType(StructField("SeqNum", LongType) +: oracleTableDF.schema.fields)

   // Create the new DataFrame with SeqNum (oracleTableDF must be declared as a var for this to compile)
   oracleTableDF = spark.createDataFrame(rowRDD, newstructure)

I can't locate the actual issue. The logic works as expected on the cluster when I read from files, but it fails when I read from the Oracle table. It also works as expected in local mode.

The error is:

"ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, xxxx, executor 1): java.lang.NoClassDefFoundError: Could not initialize class oracleDataProcess$"

Recommended answer

If all you need is to add a column with an auto-increment integer value to your DataFrame, you can use monotonicallyIncreasingId, which produces a LongType column. The generated values are unique and increasing, but they are not guaranteed to be consecutive:

val oracleTableDF2 = oracleTableDF.withColumn("SeqNum", monotonicallyIncreasingId)

[Update]

Note that monotonicallyIncreasingId is deprecated. monotonically_increasing_id() should be used instead.
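
As a minimal sketch of the non-deprecated function, the example below adds a SeqNum column with monotonically_increasing_id(). Because those IDs are unique but can jump between partitions, it also shows one common way to get consecutive 1-based numbers with row_number() over a window. The object name SeqNumSketch, the helper names, the temporary _id column, and the sample data are all hypothetical and not part of the original question; the Oracle read is replaced by a small in-memory DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

object SeqNumSketch {
  // Unique, increasing LongType IDs; values are not consecutive across partitions.
  def withUniqueId(df: DataFrame): DataFrame =
    df.withColumn("SeqNum", monotonically_increasing_id())

  // Consecutive 1-based numbers (IntegerType): order rows by a generated ID,
  // then number them. The window has no partitioning, so Spark moves all rows
  // to a single partition; this may be slow on large tables.
  def withConsecutiveSeqNum(df: DataFrame): DataFrame = {
    val byId = Window.orderBy("_id") // "_id" is a hypothetical temporary column
    df.withColumn("_id", monotonically_increasing_id())
      .withColumn("SeqNum", row_number().over(byId))
      .drop("_id")
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; on a cluster the session already exists.
    val spark = SparkSession.builder().master("local[2]").appName("seqnum-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for oracleTableDF
    val df = Seq("a", "b", "c", "d").toDF("value").repartition(2)

    withUniqueId(df).show()
    withConsecutiveSeqNum(df).show()
    spark.stop()
  }
}
```

Neither helper ships a user-defined closure to the executors, unlike the rdd.zipWithIndex().map(...) approach in the question. Whether that avoids the NoClassDefFoundError in a given job depends on how the application is packaged and submitted.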
