withColumn in spark dataframe inserts NULL in SaveMode.Append


Problem Description

I have a Spark application that creates a Hive external table. It works fine the first time, that is, while creating the table in Hive with partitions. I have three partition columns, namely event, CenterCode and ExamDate:

  var sqlContext = spark.sqlContext
  sqlContext.setConf("hive.exec.dynamic.partition", "true")
  sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._    // StructType, StructField, StringType, DateType
  import org.apache.spark.sql.SaveMode   // used by the write calls below

  val candidateList = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true").option("nullValue", "null").option("quote", "\"")
    .option("dateFormat", "dd/MM/yyyy").option("multiLine", "true").option("mode", "DROPMALFORMED")
    .schema(StructType(Array(
      StructField("RollNo/SeatNo", StringType, true), StructField("LabName", StringType, true),
      StructField("Student_Name", StringType, true), StructField("ExamName", StringType, true),
      StructField("ExamDate", DateType, true), StructField("ExamTime", StringType, true),
      StructField("CenterCode", StringType, true), StructField("Center", StringType, true))))
    .load(filePath(0))

  val nef = candidateList.withColumn("event", lit(eventsId))

The partition column event is not present in the input csv file, so I add that column to the dataframe candidateList using withColumn("event", lit(eventsId)).

When I write it to the Hive table, it works fine: the column added with withColumn is populated (event = "ABCD") and the partitions are created as expected.

 nef.repartition(1).write.mode(SaveMode.Overwrite)
   .option("path", candidatePath)
   .partitionBy("event", "CenterCode", "ExamDate")
   .saveAsTable("sify_cvs_output.candidatelist")

nef.show() gives:

 +-------------+--------------------+-------------------+----------+----------+--------+----------+--------------------+-----+
 |RollNo/SeatNo|             LabName|       Student_Name|  ExamName|  ExamDate|ExamTime|CenterCode|              Center|event|
 +-------------+--------------------+-------------------+----------+----------+--------+----------+--------------------+-----+
 |     80000077|BUILDING-MAIN FLO...|     ABBAS MOHAMMAD|PGECETICET|2018-07-30|10:00 AM|   500098A|500098A-SURYA TEC...| ABCD|
 |     80000056|BUILDING-MAIN FLO...|  ABDUL YASARARFATH|PGECETICET|2018-07-30|10:00 AM|   500098A|500098A-SURYA TEC...| ABCD|

But the second time, when I try to Append data for a new event "EFGH" to the already-created Hive table, the column added with withColumn is inserted as NULL:

   nef.write.mode(SaveMode.Append).insertInto("sify_cvs_output.candidatelist")

The partitions also don't come out properly, as one of the partition columns becomes `NULL`. So I tried adding one more new column to the dataframe, `.withColumn("sample", lit("sample"))`. Again, the first time it writes all the extra added columns to the table; the next time, on `SaveMode.Append`, both the `event` column and the `sample` column are inserted into the table as `NULL`.
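Since `insertInto` resolves columns by position rather than by name, a quick sanity check (a hypothetical snippet, not from the original post) is to compare the dataframe's column order against the table's:

   // Hypothetical check: both lines should print the columns in the same
   // order (data columns first, partition columns last) for insertInto
   // to line the values up correctly.
   println(nef.columns.mkString(", "))
   println(spark.table("sify_cvs_output.candidatelist").columns.mkString(", "))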

SHOW CREATE TABLE gives the following:

 CREATE EXTERNAL TABLE `candidatelist`(
   `rollno/seatno` string,
   `labname` string,
   `student_name` string,
   `examname` string,
   `examtime` string,
   `center` string,
   `sample` string)
 PARTITIONED BY (
   `event` string,
   `centercode` string,
   `examdate` date)
 ROW FORMAT SERDE
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
 WITH SERDEPROPERTIES (
   'path'='hdfs://172.16.2.191:8020/biometric/sify/cvs/output/candidate/')
 STORED AS INPUTFORMAT
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
 OUTPUTFORMAT
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
 LOCATION
   'hdfs://172.16.2.191:8020/biometric/sify/cvs/output/candidate'
 TBLPROPERTIES (
   'spark.sql.partitionProvider'='catalog',
   'spark.sql.sources.provider'='parquet',
   'spark.sql.sources.schema.numPartCols'='3',
   'spark.sql.sources.schema.numParts'='1',
   'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[{\"name\":\"RollNo/SeatNo\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"LabName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Student_Name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ExamName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ExamTime\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Center\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"sample\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"event\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CenterCode\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ExamDate\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}',
   'spark.sql.sources.schema.partCol.0'='event',
   'spark.sql.sources.schema.partCol.1'='CenterCode',
   'spark.sql.sources.schema.partCol.2'='ExamDate',
   'transient_lastDdlTime'='1536040545')
 Time taken: 0.025 seconds, Fetched: 32 row(s)
 hive>

What am I doing wrong here?

UPDATE

@pasha701, below is my sparkSession:

 val Spark = SparkSession.builder().appName("splitInput").master("local")
    .config("spark.hadoop.fs.defaultFS", "hdfs://" + hdfsIp)
    .config("hive.metastore.uris", "thrift://172.16.2.191:9083")
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .enableHiveSupport()
    .getOrCreate()

And if I add partitionBy to the insertInto:

  nef.write.mode(SaveMode.Append)
    .partitionBy("event", "CenterCode", "ExamDate")
    .option("path", candidatePath)
    .insertInto("sify_cvs_output.candidatelist")

it throws the exception:

   org.apache.spark.sql.AnalysisException: insertInto() can't be used together with partitionBy(). Partition columns have already been defined for the table. It is not necessary to use partitionBy().;

Answer

"partitionBy" also has to be used the second time. The option "hive.exec.dynamic.partition.mode" may also be required.
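A minimal sketch of that suggestion, assuming the append keeps going through saveAsTable (which accepts partitionBy) rather than insertInto (which, as the exception above shows, rejects it); the table name, path and partition columns are taken from the question:

  // Sketch only: mirror the first (working) Overwrite write,
  // but with SaveMode.Append so existing partitions are kept.
  nef.write.mode(SaveMode.Append)
    .partitionBy("event", "CenterCode", "ExamDate")
    .option("path", candidatePath)
    .saveAsTable("sify_cvs_output.candidatelist")

If insertInto has to be kept instead, the usual workaround is to reorder the dataframe columns to match the table layout (data columns first, partition columns last, in the table's partition order), since insertInto matches columns by position, not by name.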
