spark structured streaming batch data refresh issue (partition by clause)


Problem description

I came across a problem while joining a Spark Structured Streaming DataFrame with a batch DataFrame. My scenario: I have an S3 stream that needs a left anti join against history data, which returns the records not present in history (i.e. figures out the new records), and I write these records back to history as a new append (partitioned by columns on disk, not an in-memory partition).
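
As a condensed illustration of that pattern, here is a minimal, hypothetical sketch (the column names are taken from the schemas in the snippets below, and both sides are assumed to share the join column names); the full code from the question follows:

import org.apache.spark.sql.DataFrame

// Keep only the stream rows that do not already exist in history (i.e. the new records)
def newRecords(stream: DataFrame, history: DataFrame): DataFrame =
  stream.join(history, Seq("account_id", "trxn_ref_id"), "leftanti")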

When I refresh my history DataFrame, which is partitioned on disk, the history DataFrame does not get updated.

Below are two code snippets: one that works and one that doesn't.

The only difference between the working and the non-working code is the partitionBy clause.

Working code (history gets refreshed):

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

    val inputSchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val historySchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val source = spark.readStream
      .schema(inputSchema)
      .option("header", "false")
      .csv("src/main/resources/Input/")

    val history = spark.read
      .schema(inputSchema)
      .option("header", "true")
      .csv("src/main/resources/history/")
      .withColumnRenamed("spark_id", "spark_id_2")
      .withColumnRenamed("account_id", "account_id_2")
      .withColumnRenamed("run_dt", "run_dt_2")
      .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
      .withColumnRenamed("trxn_dt", "trxn_dt_2")
      .withColumnRenamed("trxn_amt", "trxn_amt_2")

    val readFilePersisted = history.persist()
    readFilePersisted.createOrReplaceTempView("hist")

    val recordsNotPresentInHist = source
      .join(
        history,
        source.col("account_id") === history.col("account_id_2") &&
          source.col("run_dt") === history.col("run_dt_2") &&
          source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
          source.col("trxn_dt") === history.col("trxn_dt_2") &&
          source.col("trxn_amt") === history.col("trxn_amt_2"),
        "leftanti"
      )

    recordsNotPresentInHist.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .mode(SaveMode.Append)
          //.partitionBy("spark_id", "account_id", "run_dt")
          .csv("src/main/resources/history/")

        // Re-read the freshly appended history; it was written above as CSV, so read it back as CSV
        val lkpChacheFileDf1 = spark.read
          .schema(inputSchema)
          .csv("src/main/resources/history/")

        val lkpChacheFileDf = lkpChacheFileDf1
        lkpChacheFileDf.unpersist(true)
        val histLkpPersist = lkpChacheFileDf.persist()
        histLkpPersist.createOrReplaceTempView("hist")

      }
      .start()

    println("This is the kafka dataset:")
    source
      .withColumn("Input", lit("Input-source"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    recordsNotPresentInHist
      .withColumn("reject", lit("recordsNotPresentInHist"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    spark.streams.awaitAnyTermination()

Doesn't work (history is not getting refreshed):

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

    val inputSchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val historySchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val source = spark.readStream
      .schema(inputSchema)
      .option("header", "false")
      .csv("src/main/resources/Input/")

    val history = spark.read
      .schema(inputSchema)
      .option("header", "true")
      .csv("src/main/resources/history/")
      .withColumnRenamed("spark_id", "spark_id_2")
      .withColumnRenamed("account_id", "account_id_2")
      .withColumnRenamed("run_dt", "run_dt_2")
      .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
      .withColumnRenamed("trxn_dt", "trxn_dt_2")
      .withColumnRenamed("trxn_amt", "trxn_amt_2")

    val readFilePersisted = history.persist()
    readFilePersisted.createOrReplaceTempView("hist")

    val recordsNotPresentInHist = source
      .join(
        history,
        source.col("account_id") === history.col("account_id_2") &&
          source.col("run_dt") === history.col("run_dt_2") &&
          source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
          source.col("trxn_dt") === history.col("trxn_dt_2") &&
          source.col("trxn_amt") === history.col("trxn_amt_2"),
        "leftanti"
      )

    recordsNotPresentInHist.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .mode(SaveMode.Append)
          .partitionBy("spark_id", "account_id","run_dt")
          .csv("src/main/resources/history/")

        // Re-read the freshly appended history; it was written above as CSV, so read it back as CSV
        val lkpChacheFileDf1 = spark.read
          .schema(inputSchema)
          .csv("src/main/resources/history/")

        val lkpChacheFileDf = lkpChacheFileDf1
        lkpChacheFileDf.unpersist(true)
        val histLkpPersist = lkpChacheFileDf.persist()
        histLkpPersist.createOrReplaceTempView("hist")

      }
      .start()

    println("This is the kafka dataset:")
    source
      .withColumn("Input", lit("Input-source"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    recordsNotPresentInHist
      .withColumn("reject", lit("recordsNotPresentInHist"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    spark.streams.awaitAnyTermination()

Thanks, Sri

Recommended answer

I resolved this problem by using the unionByName function instead of reading the refreshed data back from disk.

Step 1: Read history from S3.

Step 2: Read Kafka and look up against history.

Step 3: Write the processed data to disk and append it to the DataFrame created in Step 1 using Spark's unionByName function (a sketch of this step is shown after the Step 2 code below).

Step 1 code (reading the history DataFrame):

val acctHistDF = sparkSession.read
  .schema(schema)
  .parquet(S3path)
val acctHistDFPersisted = acctHistDF.persist()
acctHistDFPersisted.createOrReplaceTempView("acctHist")

Step 2 (refreshing the history DataFrame with stream data):

val history = sparkSession.table("acctHist")
// DataFrames are immutable, so capture the result of unionByName and re-register the view
val refreshedHistory = history.unionByName(stream)
refreshedHistory.createOrReplaceTempView("acctHist")
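
Step 3 is only described above, not shown. The following is a minimal sketch of how it could look inside foreachBatch, assuming the streaming DataFrame of new records is called recordsNotPresentInHist and reusing the sparkSession, S3path and partition columns from the question and from Step 1; the essential point is that the cached acctHist view is refreshed from the in-memory union rather than re-read from the partitioned files on disk.

import org.apache.spark.sql.{DataFrame, SaveMode}

recordsNotPresentInHist.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // 1) Append this micro-batch of new records to the partitioned history on disk
    batchDF.write
      .mode(SaveMode.Append)
      .partitionBy("spark_id", "account_id", "run_dt")
      .parquet(S3path)

    // 2) Refresh the lookup view by appending the same batch in memory,
    //    instead of re-reading the partitioned files from disk
    val refreshedHist = sparkSession.table("acctHist").unionByName(batchDF)
    refreshedHist.createOrReplaceTempView("acctHist")
  }
  .start()

With this approach the acctHist view always reflects the on-disk history read in Step 1 plus every batch appended since the query started, regardless of how the files are partitioned on disk.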

Thanks, Sri
