加入数据框并执行操作 [英] join dataframes and perform operation
本文介绍了加入数据框并执行操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
大家好,我有一个数据框,每个日期都是最新的,我每天需要将新的qte和新的ca添加到旧的框架中并更新日期.因此,我需要更新已经存在的那些并添加新的.这里有一个示例,我希望最后获得
Hello guys i have a dataframe that is being up to date each date , each day i need to add the new qte and the new ca to the old one and update the date . So i need to update the ones that are already existing and add the new ones.Here an example what i would like to have at the end
val histocaisse = spark.read
.format("csv")
.option("header", "true") //reading the headers
.load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")
val hist = histocaisse
.withColumn("pos_id", 'pos_id.cast(LongType))
.withColumn("article_id", 'pos_id.cast(LongType))
.withColumn("date", 'date.cast(DateType))
.withColumn("qte", 'qte.cast(DoubleType))
.withColumn("ca", 'ca.cast(DoubleType))
val histocaisse2 = spark.read
.format("csv")
.option("header", "true") //reading the headers
.load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")
val hist2 = histocaisse2.withColumn("pos_id", 'pos_id.cast(LongType))
.withColumn("article_id", 'pos_id.cast(LongType))
.withColumn("date", 'date.cast(DateType))
.withColumn("qte", 'qte.cast(DoubleType))
.withColumn("ca", 'ca.cast(DoubleType))
hist2.show(false)
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-07|2.5 |3.5 |
|2 |2 |2000-01-07|14.7|12.0|
|3 |3 |2000-01-07|3.5 |1.2 |
+------+----------+----------+----+----+
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-08|2.5 |3.5 |
|2 |2 |2000-01-08|14.7|12.0|
|3 |3 |2000-01-08|3.5 |1.2 |
|4 |4 |2000-01-08|3.5 |1.2 |
|5 |5 |2000-01-08|14.5|1.2 |
|6 |6 |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-08|5.0 |7.0 |
|2 |2 |2000-01-08|39.4|24.0|
|3 |3 |2000-01-08|7.0 |2.4 |
|4 |4 |2000-01-08|3.5 |1.2 |
|5 |5 |2000-01-08|14.5|1.2 |
|6 |6 |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+
这是我所做的
val histoCombinaison2=hist2.join(hist,Seq("article_id","pos_id"),"left")
.groupBy("article_id","pos_id").agg((hist2("qte")+hist("qte")) as ("qte"),(hist2("ca")+hist("ca")) as ("ca"),hist2("date"))
histoCombinaison2.show()
我收到以下异常
Exception in thread "main" org.apache.spark.sql.AnalysisException: expression '`qte`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:218)
推荐答案
正如我提到的评论,您应该定义您的 schema
并将其用于读取 csv
dataframe
为
As I have mentioned your comment that you should define your schema
and use it in reading csv
to dataframe
as
import sqlContext.implicits._
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("pos_id", LongType, true),
StructField("article_id", LongType, true),
StructField("date", DateType, true),
StructField("qte", LongType, true),
StructField("ca", DoubleType, true)
))
val hist1 = sqlContext.read
.format("csv")
.option("header", "true")
.schema(schema)
.load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")
hist1.show
val hist2 = sqlContext.read
.format("csv")
.option("header", "true") //reading the headers
.schema(schema)
.load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")
hist2.show
然后,您应该使用 when
函数定义需要实现为的逻辑
Then you should use when
function to define the logic you need to implement as
val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left")
.select($"pos_id", $"article_id",
when(hist2("date").isNotNull, hist2("date")).otherwise(when(hist1("date").isNotNull, hist1("date")).otherwise(lit(null))).alias("date"),
(when(hist2("qte").isNotNull, hist2("qte")).otherwise(lit(0)) + when(hist1("qte").isNotNull, hist1("qte")).otherwise(lit(0))).alias("qte"),
(when(hist2("ca").isNotNull, hist2("ca")).otherwise(lit(0)) + when(hist1("ca").isNotNull, hist1("ca")).otherwise(lit(0))).alias("ca"))
我希望答案会有所帮助
这篇关于加入数据框并执行操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文