How to let Flink flush last line to sink when producer (Kafka) does not produce new line


Problem description

When my Flink program is in event-time mode, the sink does not receive the last line (say, line A). If I feed a new line (line B) to Flink, I get line A, but I still cannot get line B.

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    import scala.util.parsing.json.JSON

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")

    val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)

    val stream: DataStream[String] = env.addSource(consumer).setParallelism(1)

    // OnlineNum and parseMessage are defined elsewhere in the original program
    stream.map { m =>
      val result = JSON.parseFull(m).asInstanceOf[Some[Map[String, Any]]].get
      val msg = result("message").asInstanceOf[String]
      val num = parseMessage(msg)
      val key = s"${num.zoneId} ${num.subZoneId}"
      (key, num, num.onlineNum)
    }.filter { data =>
      data._2.subZoneId == 301 && data._2.zoneId == 5002
    }.assignTimestampsAndWatermarks(new MyTimestampExtractor()).keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .allowedLateness(Time.minutes(1))
      .maxBy(2)
      .addSink { v =>
        println(s"${v._2.time} ${v._1}: ${v._2.onlineNum} ")
      }

class MyTimestampExtractor() extends AscendingTimestampExtractor[(String, OnlineNum, Int)]() {
  // the pattern must use "ss" (seconds); "SS" would be parsed as milliseconds,
  // silently dropping the seconds component from every timestamp
  val byMinute = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  override def extractAscendingTimestamp(element: (String, OnlineNum, Int)): Long = {
    val dateTimeString = element._2.date + " " + element._2.time
    byMinute.parse(dateTimeString).getTime
  }
}

Data sample:

01:01:14 5002 301: 29 
01:01:36 5002 301: 27 
01:02:05 5002 301: 27 
01:02:31 5002 301: 29 
01:03:02 5002 301: 29 
01:03:50 5002 301: 29 
01:04:52 5002 301: 29 
01:07:24 5002 301: 26 
01:09:28 5002 301: 21 
01:11:04 5002 301: 22 
01:12:11 5002 301: 24 
01:13:54 5002 301: 23 
01:15:13 5002 301: 22 
01:16:04 5002 301: 19 (I cannot get this line)

Then I push a new line to Flink (via Kafka):

01:17:28 5002 301: 15 

I then get 01:16:04 5002 301: 19, but 01:17:28 5002 301: 15 may still be held in Flink.

Answer

This happens because the job is in event-time mode, and the events' timestamps are what measure the flow of time for windows.

In such a case, when only one event sits in a window, Flink has no way of knowing that the window should already be emitted. Concretely: the window containing the 01:16:04 event can only fire once the watermark passes the window's end, and since the watermark is derived from the largest timestamp seen so far, it cannot advance past 01:16:04 until a later event arrives. So when you add the next event, the previous window is closed and its element is emitted (19 in your case), but then the next window is created and held in turn (15 in your case).
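
To see why the watermark stalls, here is a minimal sketch of the contract that AscendingTimestampExtractor implements (an illustration of the documented behavior, not the actual Flink source):

    // Illustration only: the watermark tracks the largest timestamp seen so
    // far, minus one, so while no new elements arrive it never advances and
    // any event-time window whose end lies beyond it can never fire.
    abstract class AscendingWatermarkSketch[T] {
      private var currentTimestamp = Long.MinValue

      def extractAscendingTimestamp(element: T): Long

      // invoked once per element
      final def extractTimestamp(element: T): Long = {
        currentTimestamp = math.max(currentTimestamp, extractAscendingTimestamp(element))
        currentTimestamp
      }

      // invoked periodically by Flink, with or without new input
      final def getCurrentWatermark: Long =
        if (currentTimestamp == Long.MinValue) Long.MinValue
        else currentTimestamp - 1
    }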

Probably the best idea in such a case is to add a custom ProcessingTimeTrigger, which will basically allow you to emit the window after some amount of time has passed, no matter whether events are flowing or not. You can find info about Trigger in the Flink documentation.
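
For reference, the following is a minimal sketch of such a trigger (the class name EventTimeOrIdleTrigger and the idleTimeoutMs parameter are illustrative choices, not part of the answer). It preserves the normal event-time firing and additionally registers a processing-time timer, so the window is also emitted after a period of inactivity:

    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow

    class EventTimeOrIdleTrigger[T](idleTimeoutMs: Long) extends Trigger[T, TimeWindow] {

      override def onElement(element: T, timestamp: Long, window: TimeWindow,
                             ctx: Trigger.TriggerContext): TriggerResult = {
        // normal event-time behavior: fire once the watermark passes the window end
        ctx.registerEventTimeTimer(window.maxTimestamp)
        // safety net: also fire after idleTimeoutMs of wall-clock time,
        // even if no further events (and hence no watermarks) arrive
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + idleTimeoutMs)
        TriggerResult.CONTINUE
      }

      override def onEventTime(time: Long, window: TimeWindow,
                               ctx: Trigger.TriggerContext): TriggerResult =
        if (time == window.maxTimestamp) TriggerResult.FIRE else TriggerResult.CONTINUE

      override def onProcessingTime(time: Long, window: TimeWindow,
                                    ctx: Trigger.TriggerContext): TriggerResult =
        TriggerResult.FIRE

      override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
        ctx.deleteEventTimeTimer(window.maxTimestamp)
        // a production version would also track and delete its processing-time timer
      }
    }

It would be wired into the pipeline above as .trigger(new EventTimeOrIdleTrigger(60 * 1000)) right after the .window(...) call. Note that an idle firing emits the window's current contents early; combined with allowedLateness, the same window may fire again once the watermark eventually catches up.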

