Iterate each row in a dataframe, store it in val and pass as parameter to Spark SQL query


Problem Description

I am trying to fetch rows from a lookup table (3 rows and 3 columns), iterate over it row by row, and pass the values in each row to a Spark SQL query as parameters.

DB | TBL   | COL
----------------
db | txn   | ID
db | sales | ID
db | fee   | ID

I tried this in the spark shell for one row and it worked, but I am finding it difficult to iterate over the rows.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val db_name: String = "db"
val tbl_name: String = "transaction"
val unique_col: String = "transaction_number"

val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1")

Please let me know how I can iterate over the rows and pass them as parameters.

Recommended Answer

The two approaches above may be right in general, but I don't like collecting the data, for performance reasons, especially if the data is huge.
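For context, a collect-based version of such an approach might look roughly like the sketch below. This is only an illustration (the other answers are not reproduced here), and it assumes `lookup` is the 3x3 lookup DataFrame built from the table above:

// Hypothetical collect-based sketch, not the approach recommended below:
// the small lookup table is pulled to the driver and one query is built per row.
lookup.collect().foreach { row =>
  val query = s"select count(*), transaction_number from ${row.getAs[String]("DB")}.${row.getAs[String]("TBL")} " +
    s"group by ${row.getAs[String]("COL")} having count(*)>1"
  spark.sql(query).show()
}

This pulls the lookup rows to the driver, which is exactly what the answer below prefers to avoid when the data is large.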

org.apache.spark.util.CollectionAccumulator is the right candidate for this kind of requirement; see the docs.
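A minimal sketch of the CollectionAccumulator API, assuming an existing SparkSession named `spark`: values added inside executor-side code are merged automatically and can be read back on the driver.

// Minimal CollectionAccumulator sketch; the accumulator name "queries" is arbitrary
val acc = spark.sparkContext.collectionAccumulator[String]("queries")
spark.sparkContext.parallelize(Seq("txn", "sales", "fee"))
  .foreach(tbl => acc.add(s"select count(*) from db.$tbl"))   // add() runs on the executors
println(acc.value)   // value is read back on the driver as a java.util.List[String]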

Also, if the data is huge, then foreachPartition is the right candidate here, again for performance reasons.

Below is the implementation:

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator

import scala.collection.JavaConversions._
import scala.collection.mutable

object TableTest extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  // Lookup table: one row per (database, table, unique column)
  val lookup =
    Seq(("db", "txn", "ID"), ("db", "sales", "ID"), ("db", "fee", "ID"))
      .toDF("DB", "TBL", "COL")

  // Accumulator that collects the generated SQL strings from the executors
  val collAcc: CollectionAccumulator[String] =
    spark.sparkContext.collectionAccumulator[String]("mySQL Accumulator")

  // Build one duplicate-check query per lookup row; foreachPartition avoids
  // per-record overhead and never collects the lookup data to the driver
  lookup.foreachPartition { partition =>
    partition.foreach { record =>
      val selectString =
        s"select count(*), transaction_number from ${record.getAs[String]("DB")}.${record.getAs[String]("TBL")} " +
          s"group by ${record.getAs[String]("COL")} having count(*)>1"
      collAcc.add(selectString)
      println(selectString)
    }
  }

  // Back on the driver: read the accumulated queries, run each one and union the results
  val mycollectionOfSelects: mutable.Seq[String] = asScalaBuffer(collAcc.value)
  val finaldf = mycollectionOfSelects.map(x => spark.sql(x)).reduce(_ union _)
  finaldf.show

}

Sample result:

[2019-08-13 12:11:16,458] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[Stage 0:>                                                          (0 + 0) / 2]

select count(*), transaction_number from db.txn group by ID having count(*)>1
select count(*), transaction_number from db.sales group by ID having count(*)>1
select count(*), transaction_number from db.fee group by ID having count(*)>1


Note: since those are pseudo tables, I have not displayed the resulting dataframe.
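As a side note, running the generated statements against real tables (instead of the pseudo tables above) would normally require a Hive-enabled session so that names like db.txn resolve. A minimal sketch, assuming a reachable Hive metastore is configured:

// Sketch only: build the session with Hive support so spark.sql can resolve db.txn, db.sales, db.fee
val spark = SparkSession.builder
  .appName("TableTest")
  .enableHiveSupport()   // requires a configured Hive metastore / warehouse
  .getOrCreate()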

