使用嵌入式 Kafka 测试 Flink [英] Testing Flink with embedded Kafka
问题描述
我有一个简单的 Flink 应用程序,它总结了最后一分钟内具有相同 id 和时间戳的事件:
I have a simple Flink application, which sums up the events with the same id and timestamp within the last minute:
DataStream<String> input = env
.addSource(consumerProps)
.uid("app");
DataStream<Event> events = input.map(record -> mapper.readValue(record, Event.class));
pixels
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
.keyBy("id")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(simpleNotificationServiceSink);
env.execute(jobName);
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
public TimestampsAndWatermarks() {
super(Time.seconds(90));
}
// timestampReadable is timestamp rounded on minutes, in format yyyyMMddhhmm
@Override
public long extractTimestamp(Pixel pixel) {
return Long.parseLong(pixel.timestampReadable);
}
}
我想实现这个场景:
启动嵌入式 Kafka
Start embedded Kafka
向主题发布几条消息
使用 Flink 消费消息
Consume the messages with Flink
检查 Flink 产生的输出的正确性
Check the correctness of the output produced by Flink
Flink 是否提供实用程序来测试嵌入式 Kafka 的作业?如果是,推荐的方法是什么?
Does Flink provides utilities to test the job with embedded Kafka? If yes, what is the recommended approach?
谢谢.
推荐答案
您可以使用 JUnit 规则来启动嵌入式 Kafka -- 请参阅(请参阅 https://github.com/charithe/kafka-junit).
There's a JUnit rule you can use to bring up an embedded Kafka -- see (see https://github.com/charithe/kafka-junit).
要让测试完全终止,请尝试以下操作:
To have tests that terminate cleanly, try something like this:
public class TestDeserializer extends YourKafkaDeserializer<T> {
public final static String END_APP_MARKER = "END_APP_MARKER"; // tests send as last record
@Override
public boolean isEndOfStream(ParseResult<T> nextElement) {
if (nextElement.getParseError() == null)
return false;
if (END_APP_MARKER.equals(nextElement.getParseError().getRawData()))
return true;
return false;
}
}
这篇关于使用嵌入式 Kafka 测试 Flink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!