spark数据框中的withColumn在SaveMode.Append中插入NULL [英] withColumn in spark dataframe inserts NULL in SaveMode.Append

查看:305
本文介绍了spark数据框中的withColumn在SaveMode.Append中插入NULL的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个spark应用程序来创建Hive外部表,该表在带有分区的Hive中创建表时第一次运行良好.我有三个分区,即event,centerCode,ExamDate

I have a spark application to create Hive external table which works fine for the first time that is while creating the table in hive with partitions. I have three partition namely event,centerCode,ExamDate

  var sqlContext = spark.sqlContext
  sqlContext.setConf("hive.exec.dynamic.partition", "true")
  sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
  import org.apache.spark.sql.functions._

  val candidateList = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("nullValue", "null").option("quote", "\"").option("dateFormat", "dd/MM/yyyy")
  .schema(StructType(Array(StructField("RollNo/SeatNo", StringType, true), StructField("LabName", StringType, true), StructField("Student_Name", StringType, true), StructField("ExamName", StringType, true), StructField("ExamDate", DateType, true), StructField("ExamTime", StringType, true), StructField("CenterCode", StringType, true), StructField("Center", StringType, true)))).option("multiLine", "true").option("mode", "DROPMALFORMED").load(filePath(0))
  val nef = candidateList.withColumn("event", lit(eventsId))

分区列event将不会出现在输入的csv文件中,因此我正在使用withColumn("event", lit(eventsId))

Partition column event will not be present in input csv file so I'm adding that column to the dataframe candidateList using withColumn("event", lit(eventsId))

在将其写入Hive表时,它可以正常工作withColumn添加到表中,并显示事件"ABCD",并且按预期方式创建了分区.

While im writing it to the Hive table it works fine withColumn added to the table with event say "ABCD" and the partitions are created as expected.

nef.repartition(1).write.mode(SaveMode.Overwrite).option("path", candidatePath).partitionBy("event", "CenterCode", "ExamDate").saveAsTable("sify_cvs_output.candidatelist")

candidateList.show()给予

 +-------------+--------------------+-------------------+----------+----------+--------+----------+--------------------+-----+
 |RollNo/SeatNo|             LabName|       Student_Name|  ExamName|  ExamDate|ExamTime|CenterCode|              Center|event|
 +-------------+--------------------+-------------------+----------+----------+--------+----------+--------------------+-----+
 |     80000077|BUILDING-MAIN FLO...|     ABBAS MOHAMMAD|PGECETICET|2018-07-30|10:00 AM|   500098A|500098A-SURYA TEC...| ABCD|
 |     80000056|BUILDING-MAIN FLO...|  ABDUL YASARARFATH|PGECETICET|2018-07-30|10:00 AM|   500098A|500098A-SURYA TEC...| ABCD|

但是我第二次尝试将数据追加到已经使用新事件"EFGH"创建的配置单元表中,但是第二次使用withColumn作为NULL

But for the second time i'm trying to Append the data to the hive table created already with a new event "EFGH" but for the second time the added column using withColumn inserted as NULL

   nef.write.mode(SaveMode.Append).insertInto("sify_cvs_output.candidatelist") and the partitions also haven't come properly  as one of the partition column becomes `NULL`, so I tried adding one more new column in the dataframe `.withColumn("sample", lit("sample"))` again for the first time it writes all the extra added columns to the table and the next time on `SaveMode.Append` inserts the `event` column and the `sample` column added to the table as `NULL` 

show create table下方

 CREATE EXTERNAL TABLE `candidatelist`(
   `rollno/seatno` string,
   `labname` string,
   `student_name` string,
   `examname` string,
   `examtime` string,
   `center` string,
   `sample` string)
 PARTITIONED BY (
   `event` string,
   `centercode` string,
   `examdate` date)
 ROW FORMAT SERDE
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
 WITH SERDEPROPERTIES (
   'path'='hdfs://172.16.2.191:8020/biometric/sify/cvs/output/candidate/')
 STORED AS INPUTFORMAT
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
 OUTPUTFORMAT
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
 LOCATION
   'hdfs://172.16.2.191:8020/biometric/sify/cvs/output/candidate'
 TBLPROPERTIES (
   'spark.sql.partitionProvider'='catalog',
   'spark.sql.sources.provider'='parquet',
   'spark.sql.sources.schema.numPartCols'='3',
   'spark.sql.sources.schema.numParts'='1',
   'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[{\"name\":\"RollNo/SeatNo\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"LabName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Student_Name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ExamName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ExamTime\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Center\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"sample\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"event\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CenterCode\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ExamDate\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}',
   'spark.sql.sources.schema.partCol.0'='event',
   'spark.sql.sources.schema.partCol.1'='CenterCode',
   'spark.sql.sources.schema.partCol.2'='ExamDate',
   'transient_lastDdlTime'='1536040545')
 Time taken: 0.025 seconds, Fetched: 32 row(s)
 hive>

我在这里做错什么了..!

What am I doing wrong here..!

更新

@ pasha701,下面是我的sparkSession

@pasha701, below is my sparkSession

 val Spark=SparkSession.builder().appName("splitInput").master("local").config("spark.hadoop.fs.defaultFS", "hdfs://" + hdfsIp)
    .config("hive.metastore.uris", "thrift://172.16.2.191:9083")
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .enableHiveSupport()
    .getOrCreate()

,如果我在InsertInto

  nef.write.mode(SaveMode.Append).partitionBy("event", "CenterCode", "ExamDate").option("path", candidatePath).insertInto("sify_cvs_output.candidatelist")

它将异常抛出为org.apache.spark.sql.AnalysisException: insertInto() can't be used together with partitionBy(). Partition columns have already be defined for the table. It is not necessary to use partitionBy().;

推荐答案

还必须使用第二次"partitionBy".另外,可能还会需要"hive.exec.dynamic.partition.mode"选项.

Second time "partitionBy" also have to be used. Also maybe option "hive.exec.dynamic.partition.mode" will be required.

这篇关于spark数据框中的withColumn在SaveMode.Append中插入NULL的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆