如何从程序中停止 flink 流作业 [英] How to stop a flink streaming job from program

查看:40
本文介绍了如何从程序中停止 flink 流作业的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用 FlinkKafkaProducer09FlinkKafkaConsumer09 从同一 kafka 主题读取数据.我正在产品中传递测试数据:

I am trying to create a JUnit test for a Flink streaming job which writes data to a kafka topic and read data from the same kafka topic using FlinkKafkaProducer09 and FlinkKafkaConsumer09 respectively. I am passing a test data in the produce:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");

并检查来自消费者的数据是否与以下内容相同:

And checking whether same data is coming from the consumer as:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);

使用TestListResultSink.

通过打印流,我能够按预期看到来自消费者的数据.但是无法获得 Junit 测试结果,因为即使在消息完成后消费者仍会继续运行.所以它没有来测试部分.

I am able to see the data coming from the consumer as expected by printing the stream. But could not get the Junit test result as the consumer will keep on running even after the message finished. So it did not come to test part.

FlinkFlinkKafkaConsumer09 有没有办法停止进程或运行特定时间?

Is thre any way in Flink or FlinkKafkaConsumer09 to stop the process or to run for specific time?

推荐答案

潜在的问题是流媒体程序通常不是有限的并且可以无限期地运行.

The underlying problem is that streaming programs are usually not finite and run indefinitely.

至少目前最好的方法是在您的流中插入一个特殊的控制消息,让源正确终止(只需通过离开读取循环来停止读取更多数据).这样 Flink 就会告诉所有下游的运营商,他们可以在消耗完所有数据后停止.

The best way, at least for the moment, is to insert a special control message into your stream which lets the source properly terminate (simply stop reading more data by leaving the reading loop). That way Flink will tell all down-stream operators that they can stop after they have consumed all data.

或者,您可以在源中抛出一个特殊异常(例如在一段时间后),以便您可以区分正确"终止和失败情况(通过检查错误原因).在源中抛出异常会使程序失败.

Alternatively, you can throw a special exception in your source (e.g. after some time) such that you can distinguish a "proper" termination from a failure case (by checking the error cause). Throwing an exception in the source will fail the program.

这篇关于如何从程序中停止 flink 流作业的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆