How to perform accumulated avg for multiple companies using spark based on the results stored in Cassandra?


Question

I need to get the avg and count for a given dataframe, and fetch the previously stored avg and count from the Cassandra table values for each company.

Then I need to calculate the new avg and count and persist them back into the Cassandra table.

How can I do it for each company?

I have two dataframe schemas as below:

ingested_df
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- min_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

cassandra_df 
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

For each company_id I need to fetch the stored "mean" & "count", calculate "new_mean" & "new_count", and store them back in Cassandra:

    new_mean  = (ingested_df.mean + cassandra_df.mean) / (ingested_df.count + cassandra_df.count)

    new_count = ingested_df.count + cassandra_df.count
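As a plain-Scala sanity check (no Spark needed), the update step above can be sketched as follows; the `Stats` case class and `accumulate` helper are hypothetical names, and the formula is taken exactly as stated in the question:

```scala
// Sketch of the per-company accumulation step, using the question's
// formula verbatim. Stats and accumulate are illustrative names only.
case class Stats(mean: Double, count: Long)

def accumulate(ingested: Stats, stored: Stats): Stats = {
  // new_count = ingested.count + stored.count
  val newCount = ingested.count + stored.count
  // new_mean = (ingested.mean + stored.mean) / (ingested.count + stored.count)
  val newMean = (ingested.mean + stored.mean) / newCount
  Stats(newMean, newCount)
}

val merged = accumulate(Stats(mean = 10.0, count = 3L), Stats(mean = 20.0, count = 10L))
// merged.count == 13; merged.mean == 30.0 / 13
```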

How can it be done for each company?

Second attempt:

When I tried the join below for the same logic mentioned above:

 val resultDf = cassandra_df.join(ingested_df,
     (cassandra_df("company_id") === ingested_df("company_id")) &&
     (ingested_df("min_dd") > cassandra_df("max_dd")),
   "left")

This throws the error below:

org.apache.spark.sql.AnalysisException: Reference 'cassandra_df' is ambiguous, could be: company_id, company_id.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:213)

What is wrong here?
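The ambiguity comes from both dataframes carrying a `company_id` column. One way to keep the date condition while resolving it (a sketch assuming the schemas shown above, with hypothetical alias names `c` and `i`) is to alias each side so every column reference is qualified:

```scala
// Sketch: alias both dataframes so Spark can resolve every column.
// Requires an active SparkSession; assumes the schemas shown above.
import org.apache.spark.sql.functions.col

val c = cassandra_df.alias("c")
val i = ingested_df.alias("i")

val resultDf = c.join(i,
    col("c.company_id") === col("i.company_id") &&
    col("i.min_dd") > col("c.max_dd"),
  "left")
```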

Answer

Please try the approach below:

import org.apache.spark.sql.functions.col
import spark.implicits._

val ingested_df = Seq(("1", 10.0, 3L)).toDF("company_id", "mean", "count")
val cassandra_df = Seq(("1", "123123", 20.0, 10L)).toDF("company_id", "max_dd", "mean", "count")

// Rename the ingested columns up front so that "mean" and "count"
// remain unambiguous after the join.
val preparedIngestedDf = ingested_df.select(
  col("company_id"),
  col("mean").as("ingested_mean"),
  col("count").as("ingested_count"))

val resultDf = cassandra_df.join(preparedIngestedDf, Seq("company_id"), "left")
  .withColumn("new_mean", (col("ingested_mean") + col("mean")) / (col("ingested_count") + col("count")))
  .withColumn("new_count", col("ingested_count") + col("count"))
  .select(
    col("company_id"),
    col("max_dd"),
    col("new_mean").as("mean"),
    col("new_count").as("count"))

