Flink KafkaProducer sends duplicate messages in exactly-once mode when a checkpoint is restored


Question

I am writing a test case for Flink's two-phase commit; below is an overview.

sink kafka is an exactly-once Kafka producer. sink step is a MySQL sink extending the two-phase commit sink. sink compare is also a MySQL sink extending two-phase commit, and it occasionally throws an exception to simulate a checkpoint failure.

When a checkpoint fails and the job is restored, I find that the MySQL two-phase commit works fine, but the Kafka consumer reads offsets from the last successful checkpoint and the Kafka producer produces messages again, even though it had already done so before the checkpoint failed.

How can I avoid duplicate messages in this case?

Thanks a lot for any help.

Environment:

  • flink 1.9.1
  • java 1.8
  • kafka 2.11

Kafka producer code:

        dataStreamReduce.addSink(new FlinkKafkaProducer<>(
                "flink_output",
                new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                        UUID uuid = UUID.randomUUID();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("uuid", uuid.toString());
                        jsonObject.put("key1", element.f0);
                        jsonObject.put("key2", element.f1);
                        jsonObject.put("key3", element.f2);
                        jsonObject.put("indicate", element.f3);
                        return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    }
                },
                kafkaProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        )).name("sink kafka");

Checkpoint settings:

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(10000);
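        // Do not tolerate any checkpoint failures: the job fails (and restarts) on the first declined checkpoint.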
        executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
        executionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);

MySQL sink:

dataStreamReduce.addSink(
                new TwoPhaseCommitSinkFunction<Tuple4<String, String, String, Long>,
                        Connection, Void>
                        (new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {

                    int count = 0;
                    Connection connection;

                    @Override
                    protected void invoke(Connection transaction, Tuple4<String, String, String, Long> value, Context context) throws Exception {
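                        // Simulate a checkpoint failure after a handful of records, as described in the overview above.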
                        if (count > 10) {
                            throw new Exception("compare test exception.");
                        }
                        PreparedStatement ps = transaction.prepareStatement(
                                " insert into test_two_step_compare(slot_time, key1, key2, key3, indicate) " +
                                        " values(?, ?, ?, ?, ?) " +
                                        " ON DUPLICATE KEY UPDATE indicate = indicate + values(indicate) "
                        );
                        ps.setString(1, context.timestamp().toString());
                        ps.setString(2, value.f0);
                        ps.setString(3, value.f1);
                        ps.setString(4, value.f2);
                        ps.setLong(5, value.f3);
                        ps.execute();
                        ps.close();
                        count += 1;
                    }

                    @Override
                    protected Connection beginTransaction() throws Exception {
                        LOGGER.error("compare in begin transaction");
                        try {
                            if (connection.isClosed()) {
                                throw new Exception("mysql connection closed");
                            }
                        } catch (Exception e) {
                            LOGGER.error("mysql connection is error: " + e.toString());
                            LOGGER.error("reconnect mysql connection");
                            String jdbcURI = "jdbc:mysql://";
                            Class.forName("com.mysql.jdbc.Driver");
                            Connection connection = DriverManager.getConnection(jdbcURI);
                            connection.setAutoCommit(false);
                            this.connection = connection;
                        }
                        return this.connection;
                    }

                    @Override
                    protected void preCommit(Connection transaction) throws Exception {
                        LOGGER.error("compare in pre Commit");
                    }

                    @Override
                    protected void commit(Connection transaction) {
                        LOGGER.error("compare in commit");
                        try {
                            transaction.commit();
                        } catch (Exception e) {
                            LOGGER.error("compare Commit error: " + e.toString());
                        }
                    }

                    @Override
                    protected void abort(Connection transaction) {
                        LOGGER.error("compare in abort");
                        try {
                            transaction.rollback();
                        } catch (Exception e) {
                            LOGGER.error("compare abort error." + e.toString());
                        }
                    }

                    @Override
                    protected void recoverAndCommit(Connection transaction) {
                        super.recoverAndCommit(transaction);
                        LOGGER.error("compare in recover And Commit");
                    }

                    @Override
                    protected void recoverAndAbort(Connection transaction) {
                        super.recoverAndAbort(transaction);
                        LOGGER.error("compare in recover And Abort");
                    }
                })
                .setParallelism(1).name("sink compare");

Answer

I'm not quite sure I understand the question correctly:

"When a checkpoint fails and the job is restored, I find that the MySQL two-phase commit works fine, but the Kafka producer will read the offset from the last success and produce messages even though it had already done so before this checkpoint failed."

The Kafka producer does not read any data, so I assume your whole pipeline rereads the old offsets and produces duplicates. If so, you need to understand how Flink ensures exactly-once:

  1. Flink creates periodic checkpoints so that it has a consistent state to fall back to in case of failure.
  2. These checkpoints contain the offsets of the last successfully read records at checkpoint time.
  3. On recovery, Flink rereads all records from the offsets stored in the last successful checkpoint, so the records processed between that checkpoint and the failure are replayed.
  4. The replayed records restore the state to just before the failure.
  5. They also produce duplicate outputs for the replayed input records.
  6. It is the sink's responsibility to ensure that no duplicates are effectively written to the target system.

For the last point, there are two options:

  • Only output data once it is covered by a checkpoint, so that no effective duplicates ever reach the target. This naive approach is very generic (independent of the sink) but adds the checkpointing interval to the latency.
  • Let the sink deduplicate the output.

The latter option is used by the Kafka sink. It uses Kafka transactions to deduplicate the data. To avoid duplicates on the consumer side, you need to make sure the consumer is not reading uncommitted data, as mentioned in the documentation. Also make sure your transaction timeout is large enough that data is not discarded between failure and recovery.
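To make that concrete, here is a minimal sketch (not from the original question or answer, and using java.util.Properties) of the two settings involved: the transaction timeout passed to the FlinkKafkaProducer through its producer properties (the kafkaProps in the question code), and the isolation level of whatever downstream consumer reads the output topic. The broker address, group id, and timeout value are placeholder assumptions.

        // Producer side: properties handed to FlinkKafkaProducer (kafkaProps in the question).
        // transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms and
        // should comfortably cover the gap between a failure and the recovery that commits
        // the pending transactions.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092");      // placeholder
        kafkaProps.setProperty("transaction.timeout.ms", "900000");      // 15 minutes, placeholder

        // Downstream consumer side: without read_committed, the consumer also sees records
        // from open or aborted transactions, which show up as duplicates.
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        consumerProps.setProperty("group.id", "flink-output-reader");    // placeholder
        consumerProps.setProperty("isolation.level", "read_committed");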
