如何在Apache Kafka连接器中实现精确的一次语义 [英] how to achieve exactly once semantics in apache kafka connector

查看:14
本文介绍了如何在Apache Kafka连接器中实现精确的一次语义的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是Flink 1.8.0版。我的应用程序从Kafka->Transform->Publish to Kafka读取数据。为了避免在重启过程中出现任何重复,我希望使用具有恰好一次语义的Kafka Producer,请在此处阅读:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-011-and-newer

我的卡夫卡版本是1.1。

        return new FlinkKafkaProducer<String>( topic,  new KeyedSerializationSchema<String>() {


            public byte[] serializeKey(String element) {
                // TODO Auto-generated method stub
                return element.getBytes();
            }


            public byte[] serializeValue(String element) {
                // TODO Auto-generated method stub
                return element.getBytes();
            }


            public String getTargetTopic(String element) {
                // TODO Auto-generated method stub
                return topic;
            }
        },prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);

检查点代码:

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointTimeout(15 * 1000 );
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.enableCheckpointing(5000 );

如果我在Kafka Producer中仅添加一次语义,则我的Flink使用者不会读取任何新数据。

是否有人可以仅使用一次语义共享任何示例代码/应用程序?

请在此处查找完整代码:

https://github.com/sris2/sample_flink_exactly_once

谢谢

推荐答案

是否有人可以仅使用一次语义共享任何示例代码/应用程序?

end-to-end test in flink中隐藏了正好一次的示例。由于它使用了一些方便的函数,如果不查看整个repo,可能很难跟踪。

如果我在Kafka Producer(我的Flink消费者)中恰好添加一次语义 没有读取任何新数据。 [.] 请在此处找到完整代码:

https://github.com/sris2/sample_flink_exactly_once

我检查了您的代码,发现了问题(必须修复整个设置/代码才能真正运行)。接收器实际上无法正确配置事务。如Flink Kafka connector documentation中所写,您需要在您的Kafka Broker中将transaction.timeout.ms调整到最多1小时,或者在您的应用程序中将transaction.timeout.ms调整到15分钟:

    prop.setProperty("transaction.timeout.ms", "900000");

各自摘录如下:

Kafka代理默认情况下将transaction.max.timeout.ms设置为15分钟。此属性不允许为生成器设置大于其值的事务超时。默认情况下,FlinkKafkaProducer011将生产者配置中的transaction.timeout.ms属性设置为1小时,因此在使用Semanti.EXACTLY_ONCE模式之前应增加transaction.max.timeout.ms。

这篇关于如何在Apache Kafka连接器中实现精确的一次语义的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆