Nothing is being printed out from a Flink Patterned Stream


Question

I have the code below:

import java.util.Properties

import com.google.gson._
import com.typesafe.config.ConfigFactory
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object WindowedWordCount {
  val configFactory = ConfigFactory.load()
  def main(args: Array[String]) = {
    val brokers = configFactory.getString("kafka.broker")
    val topicChannel1 = configFactory.getString("kafka.topic1")

    val props = new Properties()
    ...

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))

    val partitionedInput = dataStream.keyBy(jsonString => {
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account")
    })

    val priceCheck = Pattern.begin[String]("start").where({jsonString =>
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account").toString == "iOS"})

    val pattern = CEP.pattern(partitionedInput, priceCheck)

    val newStream = pattern.select(x =>
      x.get("start").map({str =>
        str
      })
    )

    newStream.print()

    env.execute()
  }
}

For some reason, newStream.print() in the code above prints nothing. I am positive that Kafka contains data matching the pattern I defined, but nothing is printed.

Can anyone please help me spot an error in this code?

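import java.time.Instant

import com.google.gson.JsonParser
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {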

  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    val jsonParser = new JsonParser()
    val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context")
    Instant.parse(context.get("serverTimestamp").toString.replaceAll("\"", "")).toEpochMilli
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis())
  }
}

I saw in the Flink docs that you can simply return prevElementTimestamp from the extractTimestamp method (if you are using Kafka010) and new Watermark(System.currentTimeMillis) from the getCurrentWatermark method.

But I don't understand what prevElementTimestamp is, or why one would return new Watermark(System.currentTimeMillis) as the watermark rather than something else. Can you please elaborate on why we do this and on how Watermark and EventTime work together?

Recommended answer

You set up your job to work in EventTime, but you do not provide a timestamp and watermark extractor.

For more on working in event time, see the Flink event time docs. If you want to use the timestamps embedded in Kafka messages, the Flink Kafka connector docs may help you.

In EventTime, the CEP library buffers events until a watermark arrives, so that it can handle out-of-order events properly. In your case no watermarks are generated, so the events are buffered indefinitely.
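The buffering behaviour described above can be illustrated with a small toy model. This is only a sketch in plain Scala, not Flink's actual implementation; the names Event and EventTimeBuffer are hypothetical and exist only for this illustration.

```scala
import scala.collection.mutable

// Toy model (NOT Flink's real code) of event-time buffering:
// events are held back until a watermark says no earlier event can still arrive.
final case class Event(timestamp: Long, payload: String)

class EventTimeBuffer {
  private val pending = mutable.ArrayBuffer.empty[Event]

  // On arrival an event is only stored; nothing is emitted yet.
  def onEvent(e: Event): Unit = pending += e

  // A watermark releases every buffered event with timestamp <= watermark,
  // ordered by timestamp, and keeps the remaining events buffered.
  def onWatermark(watermark: Long): List[Event] = {
    val (ready, waiting) = pending.partition(_.timestamp <= watermark)
    pending.clear()
    pending ++= waiting
    ready.sortBy(_.timestamp).toList
  }

  // Number of events still waiting for a watermark.
  def buffered: Int = pending.size
}
```

In this model, if onWatermark is never called, every event passed to onEvent stays in the buffer, which mirrors why the job in the question prints nothing.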

  1. For the prevElementTimestamp I think the docs are pretty clear:

There is no need to define a timestamp extractor when using the timestamps from Kafka. The previousElementTimestamp argument of the extractTimestamp() method contains the timestamp carried by the Kafka message.

Since Kafka 0.10.x, Kafka messages can carry an embedded timestamp.

Generating the Watermark as new Watermark(System.currentTimeMillis) is not a good idea in this case. A wall-clock watermark is unrelated to the event timestamps, so events whose timestamps lag behind the current time would be treated as late. You should create the Watermark based on your knowledge of the data. For information on how Watermark and EventTime work together, I could not be clearer than the docs.
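As a rough sketch of a watermark derived from the data rather than from the wall clock, the assigner below returns the Kafka-provided timestamp and lets the watermark trail the largest timestamp seen so far by a fixed bound. The class name KafkaTimestampAssigner and the maxOutOfOrdernessMs parameter are assumptions made for this example, and the right bound depends on how out of order your data actually is. It targets the same Flink 1.x AssignerWithPeriodicWatermarks interface used in the question.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical assigner: trusts the timestamp Kafka attached to each record and
// keeps the watermark a fixed bound behind the largest timestamp seen so far.
class KafkaTimestampAssigner(maxOutOfOrdernessMs: Long)
    extends AssignerWithPeriodicWatermarks[String] {

  // Start low enough that subtracting the bound cannot underflow.
  private var maxSeenTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
    // With the Kafka 0.10 consumer, previousElementTimestamp carries the
    // timestamp embedded in the Kafka message.
    maxSeenTimestamp = math.max(maxSeenTimestamp, previousElementTimestamp)
    previousElementTimestamp
  }

  // Called periodically by Flink; the watermark only advances as events arrive.
  override def getCurrentWatermark(): Watermark =
    new Watermark(maxSeenTimestamp - maxOutOfOrdernessMs)
}
```

One way to use it would be dataStream.assignTimestampsAndWatermarks(new KafkaTimestampAssigner(5000L)) before the keyBy call, where the 5000 ms bound is an arbitrary illustrative value.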
