How to unit test BroadcastProcessFunction in Flink when processElement depends on broadcast data


Question

I implemented a Flink stream with a BroadcastProcessFunction. In processBroadcastElement I receive my model, and in processElement I apply it to each event.

I can't find a way to unit test my stream, because I can't find a way to ensure that the model is dispatched before the first event. I see two ways to achieve this:
1. Find a way to have the model pushed into the stream first
2. Fill the broadcast state with the model prior to executing the stream, so that it is restored

I may have missed something, but I have not found a simple way to do this.

Here is a simple unit test that shows my issue:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfter, FunSuite}

import scala.collection.mutable


class BroadCastProcessor extends BroadcastProcessFunction[Int, (Int, String), String] {

  import BroadCastProcessor._

  override def processElement(value: Int,
                              ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)

    if (broadcastState.contains(value)) {
      out.collect(broadcastState.get(value))
    }
  }

  override def processBroadcastElement(value: (Int, String),
                                       ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
                                       out: Collector[String]): Unit = {
    ctx.getBroadcastState(broadcastStateDescriptor).put(value._1, value._2)
  }
}

object BroadCastProcessor {
  val broadcastStateDescriptor: MapStateDescriptor[Int, String] = new MapStateDescriptor[Int, String]("int_to_string", classOf[Int], classOf[String])
}

class CollectSink extends SinkFunction[String] {

  import CollectSink._

  override def invoke(value: String): Unit = {
    values += value
  }
}

object CollectSink { // must be static
  val values: mutable.MutableList[String] = mutable.MutableList[String]()
}

class BroadCastProcessTest extends FunSuite with BeforeAndAfter {

  before {
    CollectSink.values.clear()
  }

  test("add_elem_to_broadcast_and_process_should_apply_broadcast_rule") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataToProcessStream = env.fromElements(1)

    val ruleToBroadcastStream = env.fromElements(1 -> "1", 2 -> "2", 3 -> "3")

    val broadcastStream = ruleToBroadcastStream.broadcast(BroadCastProcessor.broadcastStateDescriptor)

    dataToProcessStream
      .connect(broadcastStream)
      .process(new BroadCastProcessor)
      .addSink(new CollectSink())

    // execute
    env.execute()

    CollectSink.values should contain("1")
  }
}

Update, thanks to David Anderson:
I went for the buffering solution. I defined a process function for the synchronization:

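import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

class SynchronizeModelAndEvent(modelNumberToWaitFor: Int) extends CoProcessFunction[Int, (Int, String), Int] {
  // Events held back until the expected number of model elements has arrived.
  // These are plain fields, not Flink managed state, so they do not survive a restore.
  val eventBuffer: mutable.MutableList[Int] = mutable.MutableList[Int]()
  var modelEventsNumber = 0

  override def processElement1(value: Int, ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
    if (modelEventsNumber < modelNumberToWaitFor) {
      eventBuffer += value
    } else {
      out.collect(value)
    }
  }

  override def processElement2(value: (Int, String), ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
    modelEventsNumber += 1

    if (modelEventsNumber >= modelNumberToWaitFor) {
      eventBuffer.foreach(event => out.collect(event))
      // Clear the buffer so that later model elements do not re-emit the same events.
      eventBuffer.clear()
    }
  }
}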

I then need to add it to my stream:

dataToProcessStream
  .connect(ruleToBroadcastStream)
  .process(new SynchronizeModelAndEvent(3))
  .connect(broadcastStream)
  .process(new BroadCastProcessor)
  .addSink(new CollectSink())

Thanks

Recommended answer

There isn't an easy way to do this. You could have processElement buffer all of its input until the model has been received by processBroadcastElement. Alternatively, run the job once with no event traffic and take a savepoint once the model has been broadcast, then restore that savepoint into the same job with its event input connected.
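The first option, buffering inside the function itself, could look roughly like the sketch below. It is only an illustration under stated assumptions: the class name BufferingBroadCastProcessor and the constructor parameter rulesToWaitFor are hypothetical, the number of broadcast elements is assumed to be known up front, and the buffer is a plain field rather than Flink managed state, which is tolerable in a test but would lose events on failure in a real job.

```scala
import org.apache.flink.api.common.state.{MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

// Hypothetical variant of BroadCastProcessor: events are held back until
// `rulesToWaitFor` broadcast elements have arrived (an assumed, fixed count).
class BufferingBroadCastProcessor(rulesToWaitFor: Int)
  extends BroadcastProcessFunction[Int, (Int, String), String] {

  import BufferingBroadCastProcessor._

  // Plain fields, not Flink state: they are lost on failure or restore.
  private val pending = mutable.ArrayBuffer.empty[Int]
  private var rulesSeen = 0

  // Same lookup as the original processElement: emit only if a rule matches.
  private def emit(event: Int,
                   rules: ReadOnlyBroadcastState[Int, String],
                   out: Collector[String]): Unit = {
    if (rules.contains(event)) {
      out.collect(rules.get(event))
    }
  }

  override def processElement(value: Int,
                              ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    if (rulesSeen < rulesToWaitFor) {
      pending += value // model incomplete: keep the event for later
    } else {
      emit(value, ctx.getBroadcastState(descriptor), out)
    }
  }

  override def processBroadcastElement(value: (Int, String),
                                       ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
                                       out: Collector[String]): Unit = {
    val rules = ctx.getBroadcastState(descriptor)
    rules.put(value._1, value._2)
    rulesSeen += 1

    if (rulesSeen >= rulesToWaitFor) {
      // Model complete: replay the buffered events once, then drop them.
      pending.foreach(event => emit(event, rules, out))
      pending.clear()
    }
  }
}

object BufferingBroadCastProcessor {
  // Same descriptor definition as in the question, repeated to keep the sketch self-contained.
  val descriptor: MapStateDescriptor[Int, String] =
    new MapStateDescriptor[Int, String]("int_to_string", classOf[Int], classOf[String])
}
```

With such a function, the test in the question would call .process(new BufferingBroadCastProcessor(3)) in place of new BroadCastProcessor, and the result would no longer depend on whether the event or the three rules reach the operator first. Counting broadcast elements is a simplification; a real job would need some other signal that the model is complete.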

By the way, the capability you are looking for is often referred to as "side inputs" in the Flink community.
