Apache Flink broadcast state gets flushed


Problem description

I am using the broadcast pattern to connect two streams and read data from one into the other. The code looks like this:

import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

class Broadcast extends BroadcastProcessFunction[MyObject, (String, Double), MyObject] {
  override def processBroadcastElement(in2: (String, Double),
                                       context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
                                       collector: Collector[MyObject]): Unit = {
    // Store each broadcast (key, value) pair in the broadcast map state
    context.getBroadcastState(broadcastStateDescriptor).put(in2._1, in2._2)
  }

  override def processElement(obj: MyObject,
                              readOnlyContext: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#ReadOnlyContext,
                              collector: Collector[MyObject]): Unit = {
    // Look up the broadcast value for this element's key (read-only access)
    val theValue = readOnlyContext.getBroadcastState(broadcastStateDescriptor).get(obj.prop)
    // If I print the contents of the state here, sometimes it is empty.
    collector.collect(MyObject(/* new properties go here */))
  }
}

The state descriptor:

import org.apache.flink.api.common.state.MapStateDescriptor
val broadcastStateDescriptor: MapStateDescriptor[String, Double] = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])

My execution code is as follows:

val streamA: DataStream[MyObject] = ...
val streamB: DataStream[(String, Double)] = ...
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)

// Connect the main stream to the broadcast stream
streamA.connect(broadcastedStream).process(new Broadcast)

The problem is that in the processElement function the state is sometimes empty and sometimes not. The state should always contain data, since I am constantly streaming from a file that I know contains data. I do not understand why the state is being flushed so that I cannot get the data.

I added some print statements in processBroadcastElement, before and after putting the data into the state, and got the following output:

0 - 1
1 - 2
2 - 3
... all the way up to 48, where it resets back to 0

UPDATE: I noticed that when I decrease the buffer timeout of the streaming execution environment, the results are a bit better. When I increase it, the map is always empty.

env.setBufferTimeout(1)   // better results
env.setBufferTimeout(200) // worse results (the default is 100 ms)

Recommended answer

As David mentioned, the job could be restarting. I disabled checkpointing so that I could see any exception being thrown, instead of Flink silently failing and restarting the job.
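Disabling checkpointing surfaces the error because, according to the Flink documentation, when checkpointing is enabled and no restart strategy is configured, Flink falls back to a fixed-delay restart strategy with effectively unlimited attempts. A possible alternative is to keep checkpointing on and set the restart strategy explicitly, so the first exception fails the job instead of triggering a silent restart. This is a minimal sketch assuming a Flink release that still ships the Scala DataStream API and `RestartStrategies` (both are deprecated in recent versions); the object name `NoRestartJob`, the checkpoint interval and the sample elements are hypothetical.

```scala
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala._

// Hypothetical debugging job; not the original poster's code.
object NoRestartJob {
  def buildEnv(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpointing stays enabled (10 s is an arbitrary example interval)...
    env.enableCheckpointing(10000L)
    // ...but the job fails on the first exception instead of being restarted,
    // so the root cause is reported rather than hidden behind restarts.
    env.setRestartStrategy(RestartStrategies.noRestart())
    env
  }

  def main(args: Array[String]): Unit = {
    val env = buildEnv()
    // Placeholder pipeline standing in for the real job
    env.fromElements("a,1.0", "b,2.0").print()
    env.execute("no-restart-debugging")
  }
}
```

Once the failure has been found and fixed, the strategy can be switched back to a bounded one such as `RestartStrategies.fixedDelayRestart(3, 10000L)`, so the job still recovers from transient errors.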

It turned out that there was an error while parsing the file. As a result, the job kept restarting, the state was empty, and Flink kept consuming the stream over and over again.
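Since the restarts were caused by a parse error, one way to keep a single bad line from failing the task is to parse defensively and drop malformed records. This is a sketch assuming a hypothetical `key,value` line format (the question does not show the file format); `SafeParse` and `parseLine` are illustrative names.

```scala
import scala.util.Try

// Hypothetical helper: turns "key,value" lines into (String, Double) tuples.
object SafeParse {
  // Returns None instead of throwing when a line is malformed, so one bad
  // record cannot fail the task and trigger another restart.
  def parseLine(line: String): Option[(String, Double)] =
    line.split(",", -1) match {
      case Array(key, value) =>
        // A non-numeric value makes toDouble throw; Try turns that into None
        Try(value.trim.toDouble).toOption.map(v => (key.trim, v))
      case _ =>
        // Wrong number of fields
        None
    }
}
```

In the job, a hypothetical raw `DataStream[String]` named `rawLines` could then become `streamB` with `rawLines.flatMap(line => SafeParse.parseLine(line).toList)`. Whether silently dropping bad lines is acceptable, or they should be routed to a side output for inspection, depends on the use case.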
