使用嵌入式Kafka测试Flink [英] Testing Flink with embedded Kafka
问题描述
我有一个简单的Flink应用程序,该应用程序汇总了最后一分钟内具有相同ID和时间戳的事件:
I have a simple Flink application, which sums up the events with the same id and timestamp within the last minute:
DataStream<String> input = env
.addSource(consumerProps)
.uid("app");
DataStream<Event> events = input.map(record -> mapper.readValue(record, Event.class));
pixels
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
.keyBy("id")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(simpleNotificationServiceSink);
env.execute(jobName);
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
public TimestampsAndWatermarks() {
super(Time.seconds(90));
}
// timestampReadable is timestamp rounded on minutes, in format yyyyMMddhhmm
@Override
public long extractTimestamp(Pixel pixel) {
return Long.parseLong(pixel.timestampReadable);
}
}
我想实现这种情况:
-
启动嵌入式Kafka
Start embedded Kafka
将几个消息发布到主题
使用Flink消费消息
Consume the messages with Flink
检查Flink产生的输出的正确性
Check the correctness of the output produced by Flink
Flink是否提供实用程序来测试嵌入式Kafka的工作?如果是,推荐的方法是什么?
Does Flink provides utilities to test the job with embedded Kafka? If yes, what is the recommended approach?
谢谢.
推荐答案
有一个JUnit规则可用于生成嵌入式Kafka-请参阅(请参阅
There's a JUnit rule you can use to bring up an embedded Kafka -- see (see https://github.com/charithe/kafka-junit).
要完全终止测试,请尝试以下操作:
To have tests that terminate cleanly, try something like this:
public class TestDeserializer extends YourKafkaDeserializer<T> {
public final static String END_APP_MARKER = "END_APP_MARKER"; // tests send as last record
@Override
public boolean isEndOfStream(ParseResult<T> nextElement) {
if (nextElement.getParseError() == null)
return false;
if (END_APP_MARKER.equals(nextElement.getParseError().getRawData()))
return true;
return false;
}
}
这篇关于使用嵌入式Kafka测试Flink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!