Problems with adding a new column to a dataframe - spark/scala
Problem Description
I am new to spark/scala. I am trying to read some data from a hive table to a spark dataframe and then add a column based on some condition. Here is my code:
val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")
def dateDiff(partition_date: org.apache.spark.sql.Column, item_due_date: org.apache.spark.sql.Column): Long = {
  ChronoUnit.DAYS.between(LocalDate.parse(partition_date.toString()), LocalDate.parse(item_due_date.toString))
}
val finalDF = DF.withColumn("status",
when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && !(col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "approved")
.when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "pending")
.when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) >= 0), "expired")
.otherwise("null"))
dateDiff is a function that calculates the difference between partition_date and item_due_date, which are columns in DF. I am trying to add a new column to DF using when and otherwise, which uses dateDiff to get the difference between the dates.
Now, when I run the above code, I get the following error: org.threeten.bp.format.DateTimeParseException: Text 'partition_date' could not be parsed at index 0
I believe the value of the column partition_date is not being converted to a String to be parsed as a date. Is this what's happening? If yes, how do I cast the column value to a String?
Below is the schema of the columns I am using from the DF:
|-- item_due_date: string (nullable = true)
|-- past_due: integer (nullable = true)
|-- item_decision: string (nullable = true)
|-- partition_date: string (nullable = true)
A data sample of the columns I am using from the DF:
+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
| 1| 0001-01-14| null| 2017-11-22|
| 1| 0001-01-14| Mitigate| 2017-11-22|
| 1| 0001-01-14| Mitigate| 2017-11-22|
| 1| 0001-01-14| Mitigate| 2017-11-22|
| 0| 2018-03-18| null| 2017-11-22|
| 1| 2016-11-30| null| 2017-11-22|
+--------+-------------+-------------+--------------+
I also tried using a custom UDF:
def status(past_due: Int, item_decision: String, maxPartitionDate: String, item_due_date: String): String = {
  if (past_due == 1 && item_due_date != "NULL") {
    if (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(item_due_date.trim)) < 0) {
      if (item_decision != "NULL") "pending"
      else "approved"
    } else "expired"
  } else "NULL"
}
val statusUDF = sqlContext.udf.register("statusUDF", status _)
val DF2 = DF.withColumn("status", statusUDF(DF("past_due"),DF("item_decision"),DF("partition_date"),DF("item_due_date")))
DF2.show()
And it throws the following error at the DF2.show statement, every time:
Container exited with a non-zero exit code 50
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1644)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1603)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1592)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
at driver$.main(driver.scala:109)
at driver.main(driver.scala)
Any help would be appreciated. Thank you!
Answer
You can simply use the built-in datediff function to check the difference in days between the two columns; you don't need to write your own function or a udf. The when logic is also slightly modified compared to yours:
import org.apache.spark.sql.functions._
val finalDF = DF.withColumn("status",
when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) < 0) && col("item_decision").isNotNull && !(lower(col("item_decision")).equalTo("null")), "approved")
.otherwise(when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").isNull || lower(col("item_decision")).equalTo("null")), "pending")
.otherwise(when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) >= 0), "expired")
.otherwise("null"))))
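Note that datediff(end, start) counts days as end minus start, so datediff(col("partition_date"), col("item_due_date")) < 0 is true when item_due_date lies after partition_date. A quick sanity check of the argument order (just an illustrative spark-shell snippet reusing the functions._ import above; the toDF call assumes the usual sqlContext implicits are in scope and the probe names are only for this example):

// one-row dataframe used purely to check datediff's sign convention
val probe = Seq(("2017-11-22", "2017-12-14")).toDF("partition_date", "item_due_date")
probe.select(datediff(col("partition_date"), col("item_due_date")).as("diff")).show()
// diff = -22, i.e. item_due_date is 22 days after partition_date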
This when/otherwise logic would transform the dataframe
+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|1 |2017-12-14 |null |2017-11-22 |
|1 |2017-12-14 |Mitigate |2017-11-22 |
|1 |0001-01-14 |Mitigate |2017-11-22 |
|1 |0001-01-14 |Mitigate |2017-11-22 |
|0 |2018-03-18 |null |2017-11-22 |
|1 |2016-11-30 |null |2017-11-22 |
+--------+-------------+-------------+--------------+
by adding the status column as
+--------+-------------+-------------+--------------+--------+
|past_due|item_due_date|item_decision|partition_date|status |
+--------+-------------+-------------+--------------+--------+
|1 |2017-12-14 |null |2017-11-22 |pending |
|1 |2017-12-14 |Mitigate |2017-11-22 |approved|
|1 |0001-01-14 |Mitigate |2017-11-22 |expired |
|1 |0001-01-14 |Mitigate |2017-11-22 |expired |
|0 |2018-03-18 |null |2017-11-22 |null |
|1 |2016-11-30 |null |2017-11-22 |expired |
+--------+-------------+-------------+--------------+--------+
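As an aside, the DateTimeParseException in your original attempt happens because dateDiff receives Column objects, and calling toString() on a Column only yields the column's name (the literal text "partition_date"), not a per-row value; the function runs once while the expression is built, not once per row, so casting the column to a String would not change that. If you really do need custom row-level date logic, wrap it in a udf so Spark passes the actual values. A minimal sketch, assuming the org.threeten.bp classes you already use are on the classpath and the same non-null inputs as your data (dayDiff and day_diff are illustrative names):

import org.threeten.bp.LocalDate
import org.threeten.bp.temporal.ChronoUnit
import org.apache.spark.sql.functions.{col, udf}

// day difference between two date strings; null/"null" checks would still be
// handled around this, e.g. with when/otherwise as above
val dayDiff = udf((partitionDate: String, dueDate: String) =>
  ChronoUnit.DAYS.between(LocalDate.parse(partitionDate.trim), LocalDate.parse(dueDate.trim)))

val withDiff = DF.withColumn("day_diff", dayDiff(col("partition_date"), col("item_due_date")))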
Hope the answer is helpful.