How to perform accumulated avg for multiple companies using Spark based on the results stored in Cassandra?


Question

I need to compute the avg and count for a given dataframe, and also fetch the previously stored avg and count for each company from a Cassandra table.

Then I need to calculate the new avg and count and persist them back into the Cassandra table.

How can I do this for each company?

I have two dataframes with the following schemas:

ingested_df
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- min_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

cassandra_df 
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

For each company_id, I need to read the stored "mean" and "count", calculate "new_mean" and "new_count", and store them back into Cassandra:

    new_mean  = (ingested_df.mean + cassandra_df.mean) / (ingested_df.count + cassandra_df.count)
    new_count = ingested_df.count + cassandra_df.count

How do I do this for each company?
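The per-company update above can be sketched in plain Python, applying the formulas exactly as written in the question (note that they add the two means directly; a count-weighted mean would instead be `(m1*c1 + m2*c2) / (c1 + c2)`). The company IDs and values below are illustrative, not from the original data:

```python
def merge_stats(ingested_row, stored_row):
    """Combine one company's newly ingested stats with its previously
    stored stats, using the formulas as written in the question."""
    new_count = ingested_row["count"] + stored_row["count"]
    new_mean = (ingested_row["mean"] + stored_row["mean"]) / new_count
    return {"mean": new_mean, "count": new_count}

# One row per company, keyed by company_id (hypothetical sample data).
ingested = {"A": {"mean": 10.0, "count": 3}, "B": {"mean": 4.0, "count": 2}}
stored   = {"A": {"mean": 20.0, "count": 10}, "B": {"mean": 6.0, "count": 4}}

# "For each company" is just an update per key.
merged = {cid: merge_stats(ingested[cid], stored[cid]) for cid in ingested}
print(merged)
```

In Spark the same per-key update is expressed as a join on `company_id` rather than an explicit loop.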

Second attempt:

When I tried the join below for the same logic mentioned above:

 val resultDf = cassandra_df.join(ingested_df , 
                            ( cassandra_df("company_id") === ingested_df ("company_id") )
                            ( ingested_df ("min_dd") > cassandra_df("max_dd") )
                        , "left")

This throws the following error:

org.apache.spark.sql.AnalysisException: Reference 'cassandra_df' is ambiguous, could be: company_id, company_id.; at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:213)

What is wrong here?

Answer

Try the following approach:

import spark.implicits._
import org.apache.spark.sql.functions.col

val ingested_df = Seq(("1", "10", "3")).toDF("company_id", "mean", "count")
val cassandra_df = Seq(("1", "123123", "20", "10")).toDF("company_id", "max_dd", "mean", "count")

// Keep only the columns needed from the ingested side.
val preparedIngestedDf = ingested_df.select("company_id", "mean", "count")

// Joining with Seq("company_id") produces a single company_id column in the
// result, which avoids the "Reference ... is ambiguous" error. The per-side
// "mean"/"count" columns are then addressed via their original dataframes.
val resultDf = cassandra_df.join(preparedIngestedDf, Seq("company_id"), "left")
  .withColumn("new_mean", (ingested_df("mean") + cassandra_df("mean")) / (ingested_df("count") + cassandra_df("count")))
  .withColumn("new_count", ingested_df("count") + cassandra_df("count"))
  .select(
    col("company_id"),
    col("max_dd"),
    col("new_mean").as("mean"),
    col("new_count").as("count")  // renamed to match the Cassandra schema
  )
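For the sample rows above, the expected result can be checked with a plain-Python sketch of the same formula (the sample dataframes hold strings, which Spark coerces to doubles for the arithmetic; plain numbers are used here):

```python
# Values from the answer's sample rows: ingested (mean=10, count=3),
# stored in Cassandra (mean=20, count=10).
ingested = {"company_id": "1", "mean": 10.0, "count": 3}
stored = {"company_id": "1", "mean": 20.0, "count": 10}

new_count = ingested["count"] + stored["count"]
new_mean = (ingested["mean"] + stored["mean"]) / new_count
print(new_count, new_mean)  # 13 and 30/13
```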

