如何从程序停止flink流作业 [英] How to stop a flink streaming job from program

查看:1221
本文介绍了如何从程序停止flink流作业的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试为Flink流作业创建JUnit测试,该作业将数据写入kafka主题,并分别使用FlinkKafkaProducer09FlinkKafkaConsumer09从同一kafka主题读取数据.我正在农产品中传递测试数据:

I am trying to create a JUnit test for a Flink streaming job which writes data to a kafka topic and read data from the same kafka topic using FlinkKafkaProducer09 and FlinkKafkaConsumer09 respectively. I am passing a test data in the produce:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");

并检查是否来自消费者的数据与以下内容相同:

And checking whether same data is coming from the consumer as:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);

使用TestListResultSink.

通过打印流,我能够看到来自使用者的数据.但是无法获得Junit测试结果,因为即使消息完成后,使用者仍将继续运行.因此,它并没有成为测试的一部分.

I am able to see the data coming from the consumer as expected by printing the stream. But could not get the Junit test result as the consumer will keep on running even after the message finished. So it did not come to test part.

FlinkFlinkKafkaConsumer09中有什么方法可以停止进程或在特定时间运行?

Is thre any way in Flink or FlinkKafkaConsumer09 to stop the process or to run for specific time?

推荐答案

潜在的问题是流式程序通常不是有限的并且可以无限期地运行.

The underlying problem is that streaming programs are usually not finite and run indefinitely.

至少在目前,最好的方法是在流中插入一条特殊的控制消息,以使源正确终止(只需退出读取循环即可停止读取更多数据).这样,Flink会告诉所有下游运营商,他们在消耗完所有数据后可以停止.

The best way, at least for the moment, is to insert a special control message into your stream which lets the source properly terminate (simply stop reading more data by leaving the reading loop). That way Flink will tell all down-stream operators that they can stop after they have consumed all data.

或者,您可以在源中抛出一个特殊的异常(例如,一段时间后),以便您可以将正确的"终止与故障情况区分开(通过检查错误原因).在源中引发异常将使程序失败.

Alternatively, you can throw a special exception in your source (e.g. after some time) such that you can distinguish a "proper" termination from a failure case (by checking the error cause). Throwing an exception in the source will fail the program.

这篇关于如何从程序停止flink流作业的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆