使用嵌入式Kafka测试Flink [英] Testing Flink with embedded Kafka

查看:153
本文介绍了使用嵌入式Kafka测试Flink的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的Flink应用程序,该应用程序汇总了最后一分钟内具有相同ID和时间戳的事件:

I have a simple Flink application, which sums up the events with the same id and timestamp within the last minute:

DataStream<String> input = env
                .addSource(consumerProps)
                .uid("app");

DataStream<Event> events = input.map(record -> mapper.readValue(record, Event.class));

pixels
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(simpleNotificationServiceSink);


env.execute(jobName);


private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
        public TimestampsAndWatermarks() {
            super(Time.seconds(90));
        }

        // timestampReadable is timestamp rounded on minutes, in format yyyyMMddhhmm
        @Override
        public long extractTimestamp(Pixel pixel) {
            return Long.parseLong(pixel.timestampReadable);
        }
    }

我想实现这种情况:

  1. 启动嵌入式Kafka

  1. Start embedded Kafka

将几个消息发布到主题

使用Flink消费消息

Consume the messages with Flink

检查Flink产生的输出的正确性

Check the correctness of the output produced by Flink

Flink是否提供实用程序来测试嵌入式Kafka的工作?如果是,推荐的方法是什么?

Does Flink provides utilities to test the job with embedded Kafka? If yes, what is the recommended approach?

谢谢.

推荐答案

有一个JUnit规则可用于生成嵌入式Kafka-请参阅(请参阅

There's a JUnit rule you can use to bring up an embedded Kafka -- see (see https://github.com/charithe/kafka-junit).

要完全终止测试,请尝试以下操作:

To have tests that terminate cleanly, try something like this:

public class TestDeserializer extends YourKafkaDeserializer<T> {
  public final static String END_APP_MARKER = "END_APP_MARKER"; // tests send as last record

  @Override
  public boolean isEndOfStream(ParseResult<T> nextElement) {
    if (nextElement.getParseError() == null)
      return false;

    if (END_APP_MARKER.equals(nextElement.getParseError().getRawData()))
      return true;

    return false;
  }
}

这篇关于使用嵌入式Kafka测试Flink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆