Apache Flink broadcast state gets flushed

Problem description

I am using the broadcast state pattern to connect two streams and read data from one in the other. The code looks like this:

import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

class Broadcast extends BroadcastProcessFunction[MyObject, (String, Double), MyObject] {
  // Called for every element of the broadcast stream (streamB): store it in the broadcast state.
  override def processBroadcastElement(in2: (String, Double),
                                       context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
                                       collector: Collector[MyObject]): Unit = {
    context.getBroadcastState(broadcastStateDescriptor).put(in2._1, in2._2)
  }

  // Called for every element of the non-broadcast stream (streamA): read-only access to the state.
  override def processElement(obj: MyObject,
                              readOnlyContext: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#ReadOnlyContext,
                              collector: Collector[MyObject]): Unit = {
    val theValue = readOnlyContext.getBroadcastState(broadcastStateDescriptor).get(obj.prop)
    // If I print the contents of the state here, sometimes it is empty.
    collector.collect(MyObject(/* new properties go here */))
  }
}

The state descriptor:

import org.apache.flink.api.common.state.MapStateDescriptor

val broadcastStateDescriptor: MapStateDescriptor[String, Double] = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])

My execution code looks like this.

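import org.apache.flink.streaming.api.scala._

val streamA: DataStream[MyObject] = ...
val streamB: DataStream[(String, Double)] = ...
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)

// connect() has to receive the BroadcastStream created above, not the plain streamB
streamA.connect(broadcastedStream).process(new Broadcast)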

The problem is that in the processElement function the state is sometimes empty and sometimes not. The state should always contain data, since I am constantly streaming from a file that I know contains data. I do not understand why the state is being flushed, which leaves me unable to get the data.

I tried adding some print statements in processBroadcastElement before and after putting the data into the state, and the result is the following:

0 - 1
1 - 2
2 - 3
... all the way up to 48, where it resets back to 0

UPDATE: I noticed that when I decrease the buffer timeout of the stream execution environment, the results are a bit better. When I increase it, the map is always empty.

env.setBufferTimeout(1)   // better results
env.setBufferTimeout(200) // worse results (the default is 100 ms)

Recommended answer

As David mentioned, the job could be restarting. I disabled checkpointing so that I could see any exception being thrown, instead of Flink silently failing and restarting the job.
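A silent restart produces exactly the symptom from the question: the state size climbs and then drops back to 0. The toy model below reproduces this in plain Scala. It is not Flink code. `RestartLoopModel`, `runAttempt` and `runWithRestarts` are hypothetical names. The model assumes that each restarted attempt begins with an empty in-memory map (no checkpointed state is restored) and that the same malformed record fails every attempt.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Toy model only (not Flink): each attempt starts with an empty map, as if no
// checkpointed state were restored, and records the map size after every put.
object RestartLoopModel {
  def runAttempt(records: Seq[String], sizes: ArrayBuffer[Int]): Unit = {
    val state = scala.collection.mutable.Map.empty[String, Double] // fresh state per attempt
    records.foreach { line =>
      line.split(",") match {
        case Array(key, value) =>
          state.put(key, value.toDouble) // throws NumberFormatException on a bad number
          sizes += state.size
        case _ =>
          throw new IllegalArgumentException(s"Cannot parse line: $line")
      }
    }
  }

  // Silently retries a failed attempt, like a restart strategy, up to maxAttempts times.
  // Returns the recorded state sizes and the number of attempts that were made.
  def runWithRestarts(records: Seq[String], maxAttempts: Int): (List[Int], Int) = {
    val sizes = ArrayBuffer.empty[Int]
    var attempts = 0
    var succeeded = false
    while (!succeeded && attempts < maxAttempts) {
      attempts += 1
      succeeded = Try(runAttempt(records, sizes)).isSuccess
    }
    (sizes.toList, attempts)
  }
}
```

With the input `Seq("a,1", "b,2", "c,3", "oops")` and three attempts, the recorded sizes are 1, 2, 3, repeated three times. This mirrors the repeating 0-to-48 pattern in the question's printout.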

It turned out that there was an error while parsing the file. The job kept restarting, which is why the state was empty, and Flink kept consuming the stream over and over again.
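The restarts were caused by a parse error. One way to stop a single malformed line from failing, and therefore restarting, the whole job is to make the parsing step return `None` for bad input instead of throwing. The sketch below is plain Scala and assumes the file contains `key,value` lines; the question does not show the real format. `SafeParsing`, `parseLine` and `parseAll` are hypothetical helpers.

```scala
import scala.util.Try

object SafeParsing {
  // Hypothetical format assumed for illustration: "key,value", e.g. "abc,1.5".
  // Returns None instead of throwing, so a bad line cannot crash the operator.
  def parseLine(line: String): Option[(String, Double)] =
    line.split(",", -1) match {
      case Array(key, value) if key.trim.nonEmpty =>
        Try(value.trim.toDouble).toOption.map(v => (key.trim, v))
      case _ => None
    }

  // Parses every line, keeping the good records and counting the rejected ones
  // so that malformed input remains visible (for example for logging or metrics).
  def parseAll(lines: Seq[String]): (Seq[(String, Double)], Int) = {
    val parsed = lines.map(parseLine)
    (parsed.flatten, parsed.count(_.isEmpty))
  }
}
```

In the job itself, the parser could be applied with something like `lines.flatMap(line => SafeParsing.parseLine(line).toList)`, where `lines` is a hypothetical `DataStream[String]` read from the file. Whether to drop, log, or side-output rejected records is a design choice. Silently dropping them trades the restart loop for possible data loss.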
