Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically

Question

I am building a Spark Structured Streaming application where I am doing a batch-stream join, and the source for the batch data gets updated periodically.

So, I am planning to persist/unpersist that batch data periodically.

Below is the sample code I am using to persist and unpersist the batch data.

Flow:

  • Read the batch data
  • Persist the batch data
  • Every hour, unpersist the data, read the batch data again, and persist it.

But I am not seeing the batch data getting refreshed every hour.

Code:

import java.time.{Duration, Instant}
import org.apache.spark.storage.StorageLevel

// Read the batch (static) data and cache it
var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()

// Intended to refresh the cached batch data once `refreshTime` seconds have elapsed
if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
  refreshedTime = Instant.now()
  batchDF.unpersist(false)
  batchDF = handler.readBatchDF(sparkSession)
    .persist(StorageLevel.MEMORY_AND_DISK)
}
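
For context, this check is plain driver-side code: it is evaluated once before the long-running streaming query blocks the driver, and never again while micro-batches are processed. A rough sketch of the assumed surrounding program (the streaming side and the join key are not shown in the question, so streamDF and "id" below are placeholders):

// Assumed surrounding structure (hypothetical; not part of the snippet above)
val joinedDF = streamDF.join(batchDF, "id")   // "id" is a placeholder join key

val query = joinedDF.writeStream
  .format("console")
  .start()

// By the time the query is running, the if-block above has already executed exactly once;
// awaitTermination() blocks the driver, so the refresh check is never re-evaluated.
query.awaitTermination()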

Is there any better way to achieve this scenario in Spark Structured Streaming jobs?

Answer

You could do this by making use of the streaming scheduling capabilities that Structured Streaming provides.

You can trigger the refreshing (unpersist -> load -> persist) of a static Dataframe by creating an artificial "Rate" stream that refreshes the static Dataframe periodically. The idea is to:

  1. Load the static Dataframe initially and keep as var
  2. Define a method that refreshes the static Dataframe
  3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  4. Read actual streaming data and perform join operation with static Dataframe
  5. Within that Rate Stream, have a foreachBatch sink that calls the refresher method

The following code runs fine with Spark 3.0.1, Scala 2.12.10 and Delta 0.7.0.

  // Imports used by the example (in addition to an existing SparkSession `spark`)
  import java.util.Calendar
  import org.apache.spark.sql.{Dataset, SaveMode}
  import org.apache.spark.sql.streaming.Trigger

  // 1. Load the staticDataframe initially and keep as `var`
  var staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()

  //  2. Define a method that refreshes the static Dataframe
  def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  }

  // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  val staticRefreshStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .option("numPartitions", 1)
    .load()
    .selectExpr("CAST(value as LONG) as trigger")
    .as[Long]

  // 4. Read actual streaming data and perform join operation with static Dataframe
  // As an example I used Kafka as a streaming source
  val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

  val joinDf = streamingDf.join(staticDf, "id")

  val query = joinDf.writeStream
    .format("console")
    .option("truncate", false)
    .option("checkpointLocation", "/path/to/sparkCheckpoint")
    .start()

  // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
  staticRefreshStream.writeStream
    .outputMode("append")
    .foreachBatch(foreachBatchMethod[Long] _)
    .queryName("RefreshStream")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()
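
Note that both start() calls return immediately without blocking. In a standalone application the driver therefore needs to be kept alive, for example (a minimal sketch, assuming both queries should run until one of them stops):

  // Block the driver until any active streaming query terminates
  // (either the console join query or the refresh stream).
  spark.streams.awaitAnyTermination()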

For a complete example, the Delta table was created and updated with new values as below:

  val deltaPath = "file:///tmp/delta/table"

  import spark.implicits._
  val df = Seq(
    (1L, "static1"),
    (2L, "static2")
  ).toDF("id", "deltaField")

  df.write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)
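
To then observe the refresh while the queries are running, the table can simply be overwritten again with different values; a minimal sketch (the rows below are made-up example data):

  // Overwrite the Delta table with new rows while the streaming queries are running;
  // after the refresh stream's next trigger, the join should reflect these values.
  Seq(
    (1L, "static1_refreshed"),
    (3L, "static3")
  ).toDF("id", "deltaField")
    .write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)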
