Why does Kafka Streams reprocess the messages produced after a broker restart
Problem description
I have a single-node Kafka broker and a simple Streams application. I created 2 topics (topic1 and topic2).
Produce on topic1 -> process the message -> write to topic2
Note: for each message produced, only one message is written to the destination topic.
I produced a single message. After it was written to topic2, I stopped the Kafka broker. After some time I restarted the broker and produced another message on topic1. The streams app processed that message 3 times. Then, without stopping the broker, I produced messages to topic1, waiting for the streams app to write to topic2 before producing again.
The streams app is behaving strangely. Sometimes one produced message results in 2 messages written to the destination topic, sometimes 3. I don't understand why this is happening; even the messages produced after the broker restart are being duplicated.
Update 1:
I am using Kafka version 1.0.0 and Kafka Streams version 1.1.0.
Below is the code.
Main.java
// 'env' and 'props' are not declared in the original snippet; presumably:
Map<String, String> env = System.getenv();
String credentials = env.get("CREDENTIALS");

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
// Idempotent producer with acks=all, to guard against duplicates on the produce side
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> activityStream = builder.stream("activity_contenturl");
KStream<String, String> activityResultStream = AppUtil.hitContentUrls(credentials, activityStream);
activityResultStream.to("o365_user_activity");
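The snippet ends before the topology is actually started; presumably the app also builds and runs a KafkaStreams instance along these lines (a minimal sketch, not code from the question; requires org.apache.kafka.streams.KafkaStreams):

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Close cleanly on JVM shutdown so the app leaves the consumer group
// gracefully and commits its processed offsets.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));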
AppUtil.java
public static KStream<String, String> hitContentUrls(String credentials, KStream<String, String> activityStream) {
    KStream<String, String> activityResultStream = activityStream
            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String value) {
                    // One input record yields exactly one output record
                    ArrayList<String> log = new ArrayList<String>();
                    JSONObject received = new JSONObject(value);
                    String url = received.get("url").toString();
                    String accessToken = ServiceUtil.getAccessToken(credentials);
                    JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);
                    log.add(activityLog.toString());
                    return log;
                }
            });
    return activityResultStream;
}
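ServiceUtil itself is not shown in the question. For anyone trying to reproduce, a hypothetical stub with the signatures AppUtil relies on could look like this (names and bodies are placeholders, not the asker's code):

import org.json.JSONObject;

public class ServiceUtil {

    // Placeholder: the real method presumably exchanges stored credentials for an access token.
    public static String getAccessToken(String credentials) {
        return "dummy-token";
    }

    // Placeholder: the real method presumably makes a blocking HTTP call to fetch the activity log.
    public static JSONObject getActivityLogs(String url, String accessToken) {
        return new JSONObject().put("url", url);
    }
}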
Update 2:
In a single-broker, single-partition environment with the above config, I started the Kafka broker and the streams app. I produced 6 messages on the source topic, and when I started a consumer on the destination topic there were 36 messages, and counting. They keep on coming.
So I ran this to look at the consumer groups:
kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
Output:
streams-collection-app-0
Next I ran this:
kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app-0
Output:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 1 0 streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer-3a2940c2-47ab-49a0-ba72-4e49d341daee /127.0.0.1 streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer
After a while the output showed this:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 6 5 - - -
Then:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 7 6 - - -
Recommended answer
It seems you are facing a known limitation. By default a Kafka topic stores messages for at least 7 days, but committed offsets are stored for only 1 day (the default value of offsets.retention.minutes is 1440). So if no messages were produced to your source topic for more than 1 day, then after an app restart all messages from the topic will be reprocessed again (actually multiple times, depending on the number of restarts, up to once per day for such a topic with rare incoming messages).
You can find a description of how committed offsets expire in "How does an offset expire for a consumer group".
In Kafka version 2.0 the retention for committed offsets was increased; see KIP-186: Increase offsets retention default to 7 days.
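On Kafka 1.x (as in the question), you could presumably get the same effect by raising that broker setting yourself, e.g. in server.properties (10080 minutes = 7 days; the broker needs a restart for it to take effect):

# server.properties: keep committed offsets for 7 days instead of the 1-day default
offsets.retention.minutes=10080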
To prevent reprocessing, you could set the consumer property auto.offset.reset to latest (in Kafka Streams the default value is earliest).

There is a small risk with latest: if nothing was produced to a source topic for longer than a day and you then restarted the app, you could lose some messages (specifically, messages that arrived exactly during the restart).