在运行时正常关闭Flink Kafka Comsumer [英] Gracefully shut down Flink Kafka Comsumer at run time

查看:137
本文介绍了在运行时正常关闭Flink Kafka Comsumer的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在将FlinkKafkaConsumer010与Flink 1.2.0一起使用,而我面临的问题是:如果看到某种情况,是否可以通过编程方式关闭整个管道?>

可能的解决方案是,我可以通过调用FlinkKafkaConsumer010内部定义的 close()方法来关闭kafka使用者源,然后调用关闭的管道.对于这种方法,我创建了一个列表,其中包含对在kafka主题的管道开头创建的所有FlinkKafkaConsumer010实例的引用.然后,在管道执行期间,我有另一个线程调用列表中每个FlinkKafkaConsumer010的 close().我希望这可以关闭使用者,但是结果是使用者仍在运行.

有人可以对此有所启发吗?或者可以给我一些其他建议,说明如何在运行时以编程方式关闭flink管道?

解决方案

您要基于输入事件尝试响应的方案吗?如果是这样,我建议在管道中某个合适的位置放置一个MapFunction,并在满足某些条件时故意抛出异常以使工作失败.

另一种选择是查看 KeyedDeserializationSchema 中的 isEndOfStream 方法.基本上,当某个事件满足条件时,表示流已结束.

要考虑的另一个选择是让上述MapFunction改为FlatMapFunction,该函数将信令事件发送到外界.Flink作业外部的一个单独的进程侦听该事件,并在收到该事件后,通过Flink CLI关闭Flink作业.

I am using FlinkKafkaConsumer010 with Flink 1.2.0, and the problem I am facing is that: Is there a way that I can shut down the entire pipeline programmatically if some scenario is seen?

On possible solution is that I can shut down the kafka consumer source by calling the close() method defined inside of FlinkKafkaConsumer010, then the pipeline with shut down as well. For this approach, I create a list that contains the references to all FlinkKafkaConsumer010 instance that I created at the beginning of the pipeline for the kafka topics. Then during the execution of the pipeline, I have another thread that calls close() of each of the FlinkKafkaConsumer010 in my list. I expect that this should shut down the consumer, but the result is that the consumer is still running.

Can someone shed some light on this or give me some other suggestion on how can I shut down the flink pipeline at runtime programmatically?

解决方案

Is the scenario that you're trying to respond to based on the input events? If so, I would suggest to have a MapFunction somewhere appropriate in the pipeline, and just deliberately throw an exception to fail the job when some condition is met.

The other alternative is to look at the isEndOfStream method in KeyedDeserializationSchema. Basically, when the condition is met for some event, signal that the stream has ended.

One other option to consider is to let the MapFunction mentioned above be instead a FlatMapFunction, that send an signaling event to the outside world. A separate process external to the Flink job listens to that event, and when received, shutdown the Flink job via the Flink CLI.

这篇关于在运行时正常关闭Flink Kafka Comsumer的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆