使用嵌入式 Kafka 测试 Flink [英] Testing Flink with embedded Kafka

查看:27
本文介绍了使用嵌入式 Kafka 测试 Flink的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的 Flink 应用程序,它总结了最后一分钟内具有相同 id 和时间戳的事件:

I have a simple Flink application, which sums up the events with the same id and timestamp within the last minute:

DataStream<String> input = env
                .addSource(consumerProps)
                .uid("app");

DataStream<Event> events = input.map(record -> mapper.readValue(record, Event.class));

pixels
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(simpleNotificationServiceSink);


env.execute(jobName);


private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
        public TimestampsAndWatermarks() {
            super(Time.seconds(90));
        }

        // timestampReadable is timestamp rounded on minutes, in format yyyyMMddhhmm
        @Override
        public long extractTimestamp(Pixel pixel) {
            return Long.parseLong(pixel.timestampReadable);
        }
    }

我想实现这个场景:

  1. 启动嵌入式 Kafka

  1. Start embedded Kafka

向主题发布几条消息

使用 Flink 消费消息

Consume the messages with Flink

检查 Flink 产生的输出的正确性

Check the correctness of the output produced by Flink

Flink 是否提供实用程序来测试嵌入式 Kafka 的作业?如果是,推荐的方法是什么?

Does Flink provides utilities to test the job with embedded Kafka? If yes, what is the recommended approach?

谢谢.

推荐答案

您可以使用 JUnit 规则来启动嵌入式 Kafka -- 请参阅(请参阅 https://github.com/charithe/kafka-junit).

There's a JUnit rule you can use to bring up an embedded Kafka -- see (see https://github.com/charithe/kafka-junit).

要让测试完全终止,请尝试以下操作:

To have tests that terminate cleanly, try something like this:

public class TestDeserializer extends YourKafkaDeserializer<T> {
  public final static String END_APP_MARKER = "END_APP_MARKER"; // tests send as last record

  @Override
  public boolean isEndOfStream(ParseResult<T> nextElement) {
    if (nextElement.getParseError() == null)
      return false;

    if (END_APP_MARKER.equals(nextElement.getParseError().getRawData()))
      return true;

    return false;
  }
}

这篇关于使用嵌入式 Kafka 测试 Flink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆