How to perform accumulated avg for multiple companies using spark based on the results stored in Cassandra?
Question
I need to get the avg and count for a given dataframe, and fetch the previously stored avg and count from the Cassandra table for each company.
Then I need to calculate the new avg and count and persist them back into the Cassandra table.
How can I do this for each company?
I have two dataframe schemas, as below:
ingested_df
|-- company_id: string (nullable = true)
|-- max_dd: date (nullable = true)
|-- min_dd: date (nullable = true)
|-- mean: double (nullable = true)
|-- count: long (nullable = false)
cassandra_df
|-- company_id: string (nullable = true)
|-- max_dd: date (nullable = true)
|-- mean: double (nullable = true)
|-- count: long (nullable = false)
For each company_id I need to get the stored "mean" & "count", calculate "new_mean" & "new_count", and store them back in Cassandra, i.e.
new_mean = ( ingested_df.mean + cassandra_df.mean) / (ingested_df.count + cassandra_df.count)
new_count = (ingested_df.count + cassandra_df.count)
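Outside Spark, the per-company update is just this arithmetic. A minimal plain-Scala sketch (the `Stats` case class and the maps are hypothetical stand-ins for one row per company, and it uses the formula exactly as stated above):

```scala
// Running stats kept per company: the stored mean and count.
case class Stats(mean: Double, count: Long)

// Merge newly ingested stats with the stored ones, using the formula
// given above: new_mean = (m1 + m2) / (c1 + c2), new_count = c1 + c2.
def merge(ingested: Stats, stored: Stats): Stats =
  Stats(
    mean = (ingested.mean + stored.mean) / (ingested.count + stored.count),
    count = ingested.count + stored.count
  )

// One entry per company_id, mirroring the two dataframes.
val ingested = Map("1" -> Stats(10.0, 3L))
val stored = Map("1" -> Stats(20.0, 10L))

// Left-merge: keep the stored row unchanged when nothing new was ingested.
val updated = stored.map { case (id, s) =>
  id -> ingested.get(id).map(i => merge(i, s)).getOrElse(s)
}
```

Note that this is the formula as stated in the question; the usual count-weighted running average would instead be (m1*c1 + m2*c2) / (c1 + c2).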
How can it be done for each company?
Second:
When I tried the join below for the same logic mentioned above:
val resultDf = cassandra_df.join(ingested_df,
    (cassandra_df("company_id") === ingested_df("company_id")) &&
    (ingested_df("min_dd") > cassandra_df("max_dd")),
    "left")
This is throwing the error below:

org.apache.spark.sql.AnalysisException: Reference 'company_id' is ambiguous, could be: company_id, company_id.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:213)
What is wrong here?
Answer
Try the following approach:
import org.apache.spark.sql.functions.col
import spark.implicits._

val ingested_df = Seq(("1", "10", "3")).toDF("company_id", "mean", "count")
val cassandra_df = Seq(("1", "123123", "20", "10")).toDF("company_id", "max_dd", "mean", "count")

val preparedIngestedDf = ingested_df.select("company_id", "mean", "count")

val resultDf = cassandra_df.join(preparedIngestedDf, Seq("company_id"), "left")
  .withColumn("new_mean", (ingested_df("mean") + cassandra_df("mean")) / (ingested_df("count") + cassandra_df("count")))
  .withColumn("new_count", ingested_df("count") + cassandra_df("count"))
  .select(
    col("company_id"),
    col("max_dd"),
    col("new_mean").as("mean"),
    col("new_count").as("count")
  )
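On the sample rows above (ingested mean 10, count 3; stored mean 20, count 10), the values the join should produce can be checked without Spark. A small sketch, assuming the string columns are cast to numeric types as Spark does implicitly during the arithmetic:

```scala
// Sample values from the two one-row dataframes above.
val (ingestedMean, ingestedCount) = (10.0, 3L)
val (storedMean, storedCount) = (20.0, 10L)

// Same expressions as the withColumn calls in the answer.
val newMean = (ingestedMean + storedMean) / (ingestedCount + storedCount)
val newCount = ingestedCount + storedCount
```

The resulting resultDf can then be persisted back with the spark-cassandra-connector, e.g. resultDf.write.format("org.apache.spark.sql.cassandra") with the target keyspace and table options.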