Flink KafkaProducer sends duplicate messages in exactly-once mode when a checkpoint is restored


Problem description

I am writing a case to test Flink's two-phase commit; below is an overview.

sink kafka is an exactly-once Kafka producer. sink step is a MySQL sink extending two-phase commit. sink compare is also a MySQL sink extending two-phase commit, and this sink occasionally throws an exception to simulate a checkpoint failure.

When a checkpoint fails and the job restores, I find that the MySQL two-phase commit works fine, but the Kafka consumer reads offsets from the last successful checkpoint, and the Kafka producer produces messages again even though it had already produced them before the checkpoint failed.

How can I avoid duplicate messages in this case?

Thanks for any help.

Environment:

  • Flink 1.9.1

  • Java 1.8

  • Kafka 2.11

Kafka producer code:

        dataStreamReduce.addSink(new FlinkKafkaProducer<>(
                "flink_output",
                new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                        UUID uuid = UUID.randomUUID();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("uuid", uuid.toString());
                        jsonObject.put("key1", element.f0);
                        jsonObject.put("key2", element.f1);
                        jsonObject.put("key3", element.f2);
                        jsonObject.put("indicate", element.f3);
                        return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    }
                },
                kafkaProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        )).name("sink kafka");

Checkpoint settings:

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 10 seconds
        executionEnvironment.enableCheckpointing(10000);
        // tolerate zero checkpoint failures, i.e. any failed checkpoint fails the job
        executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
        // prefer the latest checkpoint over a more recent savepoint for recovery
        executionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);

MySQL sink:

dataStreamReduce.addSink(
                new TwoPhaseCommitSinkFunction<Tuple4<String, String, String, Long>,
                        Connection, Void>
                        (new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {

                    int count = 0;
                    Connection connection;

                    @Override
                    protected void invoke(Connection transaction, Tuple4<String, String, String, Long> value, Context context) throws Exception {
                        if (count > 10) {
                            throw new Exception("compare test exception.");
                        }
                        PreparedStatement ps = transaction.prepareStatement(
                                " insert into test_two_step_compare(slot_time, key1, key2, key3, indicate) " +
                                        " values(?, ?, ?, ?, ?) " +
                                        " ON DUPLICATE KEY UPDATE indicate = indicate + values(indicate) "
                        );
                        ps.setString(1, context.timestamp().toString());
                        ps.setString(2, value.f0);
                        ps.setString(3, value.f1);
                        ps.setString(4, value.f2);
                        ps.setLong(5, value.f3);
                        ps.execute();
                        ps.close();
                        count += 1;
                    }

                    @Override
                    protected Connection beginTransaction() throws Exception {
                        LOGGER.error("compare in begin transaction");
                        try {
                            if (connection.isClosed()) {
                                throw new Exception("mysql connection closed");
                            }
                        }catch (Exception e) {
                            LOGGER.error("mysql connection is error: " + e.toString());
                            LOGGER.error("reconnect mysql connection");
                            String jdbcURI = "jdbc:mysql://";
                            Class.forName("com.mysql.jdbc.Driver");
                            Connection connection = DriverManager.getConnection(jdbcURI);
                            connection.setAutoCommit(false);
                            this.connection = connection;
                        }
                        return this.connection;
                    }

                    @Override
                    protected void preCommit(Connection transaction) throws Exception {
                        LOGGER.error("compare in pre Commit");
                    }

                    @Override
                    protected void commit(Connection transaction) {
                        LOGGER.error("compare in commit");
                        try {
                            transaction.commit();
                        } catch (Exception e) {
                            LOGGER.error("compare Commit error: " + e.toString());
                        }
                    }

                    @Override
                    protected void abort(Connection transaction) {
                        LOGGER.error("compare in abort");
                        try {
                            transaction.rollback();
                        } catch (Exception e) {
                            LOGGER.error("compare abort error." + e.toString());
                        }
                    }

                    @Override
                    protected void recoverAndCommit(Connection transaction) {
                        super.recoverAndCommit(transaction);
                        LOGGER.error("compare in recover And Commit");
                    }

                    @Override
                    protected void recoverAndAbort(Connection transaction) {
                        super.recoverAndAbort(transaction);
                        LOGGER.error("compare in recover And Abort");
                    }
                })
                .setParallelism(1).name("sink compare");

Answer

I'm not quite sure I understand the question correctly:

When a checkpoint fails and the job restores, I find that the MySQL two-phase commit works fine, but the Kafka producer reads offsets from the last successful checkpoint and produces messages again, even though it had already produced them before the checkpoint failed.

The Kafka producer does not read any data. So I'm assuming your whole pipeline rereads old offsets and produces duplicates. If so, you need to understand how Flink ensures exactly-once.

  1. Flink creates periodic checkpoints to keep a consistent state in case of failure.
  2. These checkpoints contain the offset of the last successfully read record at the time of the checkpoint.
  3. On recovery, Flink re-reads all records from the offsets stored in the last successful checkpoint, so the same records produced between the last checkpoint and the failure are replayed.
  4. The replayed records restore the state right before the failure.
  5. They also produce duplicate outputs originating from the replayed input records.
  6. It is the responsibility of the sinks to ensure that no duplicates are effectively written to the target system.

For the last point, there are two options:

  • Only output data when a checkpoint has been written, so that no effective duplicates ever appear in the target. This naive approach is very universal (it is independent of the sink) but adds the checkpointing interval to the latency; see the sketch after this list.
  • Let the sink deduplicate the output.
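
The first option is not code from the answer, but roughly it looks like the sketch below: buffer records in checkpointed state and only write them to the target once the checkpoint that covers them has completed. The class and helper names (BufferingJdbcSink, writeToTarget) are made up for illustration, and a production version would additionally have to remember which checkpoints were already committed (as Flink's GenericWriteAheadSink does), otherwise a restore could replay the last buffer.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingJdbcSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // records received since the last completed checkpoint, also kept in operator state
    private final List<String> buffer = new ArrayList<>();
    private transient ListState<String> checkpointedBuffer;

    @Override
    public void invoke(String value, Context context) {
        // do not write to the target yet, just remember the record
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // snapshot the pending records together with the checkpoint
        checkpointedBuffer.update(new ArrayList<>(buffer));
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedBuffer = ctx.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending-records", String.class));
        if (ctx.isRestored()) {
            for (String record : checkpointedBuffer.get()) {
                buffer.add(record);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // only now are the buffered records covered by a completed checkpoint,
        // so a replay after failure starts from a point where they were already persisted
        for (String record : buffer) {
            writeToTarget(record); // hypothetical helper for the actual external write
        }
        buffer.clear();
    }

    private void writeToTarget(String record) {
        // placeholder: insert into the external system here
    }
}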

The latter option is what the Kafka sink uses. It relies on Kafka transactions to deduplicate the data. To avoid duplicates on the consumer side, you need to make sure the consumer is not reading uncommitted data, as mentioned in the documentation. Also make sure your transaction timeout is large enough that it doesn't discard data between failure and recovery.
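
For concreteness, a minimal sketch of those two settings follows. It is a hypothetical helper, not code from the question; the bootstrap address, group id, and the 15-minute timeout are assumptions you would replace with your own values.

import java.util.Properties;

// Hypothetical helper: builds the producer/consumer properties that matter for exactly-once.
public class ExactlyOnceKafkaProps {

    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        // Flink's exactly-once Kafka producer defaults transaction.timeout.ms to 1 hour,
        // while the broker's transaction.max.timeout.ms defaults to 15 minutes; align the two
        // and keep the value comfortably larger than your failure-to-recovery window.
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));
        return props;
    }

    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        props.setProperty("group.id", "flink_output_reader");     // assumed group id
        // read_committed hides records of open or aborted transactions; without it the
        // downstream consumer also sees pre-commit writes that are replayed after a failed
        // checkpoint, which shows up as duplicate messages.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }
}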

