Watermark fell far behind in Flink CEP


Problem Description

I am using Flink CEP to detect patterns against events from Kafka. For simplicity, events only have one type. I am trying to detect the change in the value of a field in the continuous event stream. The code looks like the following

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(TypeInformation.of(classOf[...]))
    
val pattern: Pattern[Event, _] = 
          Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             }
          })
          .next("end").times(1)
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            }
          })
          .within(Time.seconds(30))

The Kafka topic has 104 partitions, and events are distributed evenly across them. When I submitted the job, parallelism was set to 104.

From the Web UI, there were 2 tasks: the first is Source->filter->map->timestamp/watermark; the second is CepOperator->sink. Each task has a parallelism of 104.

The workload across subtasks was uneven, which should come from keyBy. Watermarks differed among subtasks, but at some point they got stuck at one value and did not change for a long time. From the logs, I can see that CEP kept evaluating events and matched results kept being pushed to the downstream sink.

The event rate was 10k/s; the first task's backpressure stayed high while the second one's stayed ok.

Please help explain what happened in CEP and how to fix the issue.

Thanks

Recommended Answer

Having given your question more careful consideration, I'm revising my answer.

It sounds like CEP is continuing to produce matches and they are being pushed to the sink, but the CEP+sink task is producing high backpressure. What would help is to identify the cause of the backpressure.

If events are available to read from all partitions, and yet the watermarks are only barely advancing, it sounds like the backpressure is severe enough to prevent events from being ingested at all.

I suspect that either

  1. a combinatorial explosion of effort in the CEP engine, and/or
  2. enough matches that the sink can't keep up

are likely causes.

A few ideas for getting more insight:

(1) Try using a profiler to determine if the CepOperator is the bottleneck, and perhaps identify what it is doing.

(2) Disable operator chaining between the CepOperator and the sink in order to isolate CEP -- simply as a debugging step. This will give you better visibility (via the metrics and backpressure monitoring) as to what CEP and the sink are each doing.
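For reference, here is a minimal sketch of what that could look like, assuming the keyed stream and pattern from the question; the select logic and the sink name (MyAlertSink) are placeholders, not the actual job code:

// Option A (coarse): disable chaining for the entire job
streamEnv.disableOperatorChaining()

// Option B: only detach the sink from the CepOperator, so CEP and the sink
// appear as separate tasks with their own metrics and backpressure readings
val alerts = CEP.pattern(keyedStream, pattern)   // keyedStream/pattern as in the question
  .select(...)                                   // same select/process logic as in the real job

alerts
  .addSink(new MyAlertSink())                    // placeholder sink
  .disableChaining()                             // sink runs in its own task, not chained to CEP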

(3) Test this in a smaller setup, and expand the CEP logging.
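Purely as an illustration of such a smaller setup, the sketch below assumes a local run at a low, arbitrary parallelism and the standard org.apache.flink.cep logger; none of this comes from the original job:

// Run the same filter / map / watermark / keyBy / CEP pipeline locally at low
// parallelism so a single CepOperator instance is easy to observe and profile.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)  // small test parallelism
// ... build the same pipeline as above against a test topic ...

// To expand CEP logging, raise the log level for the CEP package, e.g. in
// log4j2 properties (exact syntax depends on your logging setup):
//   logger.cep.name  = org.apache.flink.cep
//   logger.cep.level = DEBUG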
