Why does Kafka Streams reprocess the messages produced after broker restart


Problem Description

I have a single-node Kafka broker and a simple Streams application. I created 2 topics (topic1 and topic2).

Produce to topic1 - process the message - write to topic2

Note: for each produced message, only one message is written to the destination topic.

I produced a single message. After it was written to topic2, I stopped the Kafka broker. After some time I restarted the broker and produced another message on topic1. The Streams app then processed that message 3 times. After that, without stopping the broker, I produced messages to topic1 and waited for the Streams app to write to topic2 before producing again.

The Streams app is behaving strangely. Sometimes one produced message results in 2 messages written to the destination topic, sometimes 3. I don't understand why this is happening; even the messages produced after the broker restart are being duplicated.

Update 1:

I am using Kafka version 1.0.0 and Kafka-Streams version 1.1.0.

Below is the code.

Main.java

String credentials = env.get("CREDENTIALS");

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> activityStream = builder.stream("activity_contenturl");
KStream<String, String> activityResultStream = AppUtil.hitContentUrls(credentials , activityStream);
activityResultStream.to("o365_user_activity");
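
The quoted snippet ends before the topology is built and started; a minimal assumed completion (not shown in the original post) would be:

// Assumed completion, not part of the original post: build and start the topology,
// and close the Streams app cleanly on shutdown.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));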

AppUtil.java

public static KStream<String, String> hitContentUrls(String credentials, KStream<String, String> activityStream) {

        KStream<String, String> activityResultStream = activityStream
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {

                        ArrayList<String> log = new ArrayList<String>();
                        JSONObject received = new JSONObject(value);
                        String url = received.get("url").toString();

                        // Fetch the activity log from the external service for each incoming URL
                        String accessToken = ServiceUtil.getAccessToken(credentials);
                        JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);

                        log.add(activityLog.toString());
                        return log;
                    }
                });

        return activityResultStream;
    }

Update 2:

In a single-broker, single-partition environment with the above config, I started the Kafka broker and the Streams app. I produced 6 messages on the source topic, and when I started a consumer on the destination topic there were 36 messages and counting. They keep on coming.
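
For reference, the duplicates can be observed with a plain console consumer on the destination topic (command assumed, not shown in the original post; the topic name comes from the code above):

kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic o365_user_activity --from-beginning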

So I ran this to see the consumer groups:

kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

Output:

streams-collection-app-0

Next I ran this:

kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app-0

Output:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                                                HOST            CLIENT-ID
o365_activity_contenturl 0          1               1               0               streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer-3a2940c2-47ab-49a0-ba72-4e49d341daee /127.0.0.1      streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer

After a while the output showed this:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
o365_activity_contenturl 0          1               6               5               -               -               -

Then:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
o365_activity_contenturl 0          1               7               6               -               -               -

Answer

It seems you are running into a known limitation. By default a Kafka topic stores messages for at least 7 days, but committed offsets are stored for only 1 day (default broker config offsets.retention.minutes = 1440). So if no messages are produced to your source topic for more than 1 day, then after an app restart all messages from the topic will be reprocessed again (actually multiple times, depending on the number of restarts, at most once per day for such a topic with rare incoming messages).
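
One way to mitigate this on a 1.x broker is to raise the broker-side offset retention so that committed offsets outlive quiet periods on the source topic. A minimal sketch, assuming the broker is configured through server.properties (the 7-day value is an assumption, chosen to match the default that KIP-186 later introduced):

# server.properties (broker side): keep committed offsets for 7 days instead of 1
offsets.retention.minutes=10080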

You can find a description of how committed offsets expire in "How does an offset expire for a consumer group".

In Kafka version 2.0 the retention for committed offsets was increased: KIP-186: Increase offsets retention default to 7 days.

To prevent reprocessing, you could add the consumer property auto.offset.reset: latest (the default value is earliest). There is a small risk with latest: if nothing is produced to the source topic for longer than a day and you then restart the app, you could lose some messages (only the messages that arrive exactly during the restart).
