Flink Kafka Producer中的只有一次语义 [英] Exactly-once semantics in Flink Kafka Producer

查看:16
本文介绍了Flink Kafka Producer中的只有一次语义的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Kafka Source和Sink测试Flink只需一次的语义:

  1. 运行Flink应用程序,只需将消息从一个主题传输到另一个主题,并行度=1,检查点间隔20秒
  2. 每隔2秒使用Python脚本生成整数递增的消息。
  3. 读取控制台使用者处于READ_COMMITTED隔离级别的输出主题。
  4. 手动终止TaskManager

我希望在输出主题中看到整数单调递增,而不考虑TaskManager终止和恢复。

但实际上a在控制台-消费者输出中看到了一些意想不到的东西:

32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45
看起来像在输出主题中重播的检查点之间的所有消息。 这是应该是正确的行为还是我做错了什么?

已恢复一个快照: Flink UI

我的闪烁代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);
        env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));

        Properties producerProperty = new Properties();
        producerProperty.setProperty("bootstrap.servers", ...);
        producerProperty.setProperty("zookeeper.connect", ...);
        producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"10000");
        producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
        producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        Properties consumerProperty = new Properties();
        consumerProperty.setProperty("bootstrap.servers", ...);
        consumerProperty.setProperty("zookeeper.connect", ...);
        consumerProperty.setProperty("group.id", "test2");

        FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
        consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

        FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>("test",  new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        producer1.ignoreFailuresAfterTransactionTimeout();
        DataStreamSource<String> s1 = env.addSource(consumer1);
        s1.addSink(producer1);
        env.execute("Test");
    }

推荐答案

除了将生产者设置为只读一次语义外,还需要将消费者配置为只读取来自kafka的提交消息。默认情况下,使用者将读取已提交和未提交的消息。将此设置添加到您的使用者应使您更接近所需的行为。

consumerProperties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

这篇关于Flink Kafka Producer中的只有一次语义的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆