在运行时正常关闭 Flink Kafka Comsumer [英] Gracefully shut down Flink Kafka Comsumer at run time

查看:23
本文介绍了在运行时正常关闭 Flink Kafka Comsumer的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我将 FlinkKafkaConsumer010 与 Flink 1.2.0 一起使用,我面临的问题是:如果看到某些场景,有没有一种方法可以以编程方式关闭整个管道?>

可能的解决方案是,我可以通过调用 FlinkKafkaConsumer010 内部定义的 close() 方法关闭 kafka 消费者源,然后关闭管道.对于这种方法,我创建了一个列表,其中包含对我在管道开头为 kafka 主题创建的所有 FlinkKafkaConsumer010 实例的引用.然后在管道的执行过程中,我有另一个线程调用列表中每个 FlinkKafkaConsumer010 的 close().我希望这应该关闭消费者,但结果是消费者仍在运行.

有人可以对此有所了解,或者就如何以编程方式在运行时关闭 flink 管道给我一些其他建议吗?

解决方案

您尝试响应的场景是否基于输入事件?如果是这样,我会建议在管道中的某个地方设置一个 MapFunction,并在满足某些条件时故意抛出异常以使作业失败.

另一种方法是查看KeyedDeserializationSchema 中的isEndOfStream 方法.基本上,当满足某个事件的条件时,表示流已结束.

要考虑的另一种选择是让上面提到的 MapFunction 改为 FlatMapFunction,向外界发送信号事件.Flink 作业外部的单独进程侦听该事件,并在接收到时通过 Flink CLI 关闭 Flink 作业.

I am using FlinkKafkaConsumer010 with Flink 1.2.0, and the problem I am facing is that: Is there a way that I can shut down the entire pipeline programmatically if some scenario is seen?

On possible solution is that I can shut down the kafka consumer source by calling the close() method defined inside of FlinkKafkaConsumer010, then the pipeline with shut down as well. For this approach, I create a list that contains the references to all FlinkKafkaConsumer010 instance that I created at the beginning of the pipeline for the kafka topics. Then during the execution of the pipeline, I have another thread that calls close() of each of the FlinkKafkaConsumer010 in my list. I expect that this should shut down the consumer, but the result is that the consumer is still running.

Can someone shed some light on this or give me some other suggestion on how can I shut down the flink pipeline at runtime programmatically?

解决方案

Is the scenario that you're trying to respond to based on the input events? If so, I would suggest to have a MapFunction somewhere appropriate in the pipeline, and just deliberately throw an exception to fail the job when some condition is met.

The other alternative is to look at the isEndOfStream method in KeyedDeserializationSchema. Basically, when the condition is met for some event, signal that the stream has ended.

One other option to consider is to let the MapFunction mentioned above be instead a FlatMapFunction, that send an signaling event to the outside world. A separate process external to the Flink job listens to that event, and when received, shutdown the Flink job via the Flink CLI.

这篇关于在运行时正常关闭 Flink Kafka Comsumer的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆