flink kafka consumer groupId not working
Question
I am using Kafka with Flink. In a simple program, I used Flink's FlinkKafkaConsumer09 and assigned a group id to it.
According to Kafka's behavior, when I run 2 consumers on the same topic with the same group.id, it should work like a message queue. I think it's supposed to work like this: if 2 messages are sent to Kafka, the two Flink programs together would process the 2 messages exactly once each (let's say 2 lines of output in total).
But the actual result is that each program receives all 2 messages.
I have tried the consumer client that comes with the Kafka server download. It worked in the documented way (2 messages processed in total).
I tried using 2 Kafka consumers in the same main function of a Flink program: 4 messages processed in total.
I also tried running 2 instances of Flink and gave each of them the same Kafka consumer program: 4 messages again.
Any ideas? This is the output I expect:
1> Kafka and Flink2 says: element-65
2> Kafka and Flink1 says: element-66
Here's the wrong output I always get:
1> Kafka and Flink2 says: element-65
1> Kafka and Flink1 says: element-65
2> Kafka and Flink2 says: element-66
2> Kafka and Flink1 says: element-66
Here's the code snippet:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> messageStream = env.addSource(
            new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                    new SimpleStringSchema(), parameterTool.getProperties()));

    messageStream.rebalance().map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Kafka and Flink1 says: " + value;
        }
    }).print();

    env.execute();
}
I have tried running it twice, and also another way: creating 2 data streams in the main function, each with its own env.execute().
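If the goal is queue-like, process-each-message-once behavior, the usual direction is one Flink job with parallelism 2 rather than two separate jobs: inside a single job, the connector does split the topic's partitions among its own parallel subtasks. A rough sketch of that static split in plain Java (a simplified modulo rule for illustration only; the real connector's assignment logic is internal and has varied across versions):

```java
import java.util.ArrayList;
import java.util.List;

public class SubtaskSplit {
    // Roughly how a single Flink job might divide partitions among its
    // own parallel subtasks: a static rule, not Kafka's group protocol.
    static List<Integer> partitionsFor(int subtask, int parallelism, int partitions) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            if (p % parallelism == subtask) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // One job, parallelism 2, topic with 2 partitions:
        System.out.println(partitionsFor(0, 2, 2)); // subtask 0 -> [0]
        System.out.println(partitionsFor(1, 2, 2)); // subtask 1 -> [1]
    }
}
```

With a split like this, each message is read by exactly one subtask of the job, which is the behavior described in the question.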
Answer
There was a quite similar question on the Flink user mailing list today, but I can't find the link to post it here. So here is part of the answer:
"Internally, the Flink Kafka connectors don't use the consumer group management functionality, because they use lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(...) in 0.9) on each parallel instance for more control over individual partition consumption. So, essentially, the group.id setting in the Flink Kafka connector is only used for committing offsets back to ZK / the Kafka brokers."
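The difference can be sketched with a small model in plain Java (no Kafka dependency; both assignment rules below are simplified illustrations, not the connectors' actual code): with group management (subscribe()), the broker splits partitions across all members of a group, even across processes, while with manual assignment (assign()), every job claims partitions independently, so two jobs each read everything regardless of their shared group.id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentDemo {
    // Group management (subscribe): the broker splits partitions
    // round-robin across ALL members of the group, across processes.
    static Map<String, List<Integer>> groupAssign(List<String> members, int partitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String m : members) {
            out.put(m, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            out.get(members.get(p % members.size())).add(p);
        }
        return out;
    }

    // Manual assignment (assign): each job claims every partition for
    // itself and never learns about the other job, even with the same group.id.
    static Map<String, List<Integer>> manualAssign(List<String> jobs, int partitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String j : jobs) {
            List<Integer> all = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                all.add(p);
            }
            out.put(j, all);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> two = Arrays.asList("flink1", "flink2");
        // subscribe(): 2 partitions split between 2 members -> one partition each
        System.out.println(groupAssign(two, 2));
        // assign(): both jobs read both partitions -> every message seen twice
        System.out.println(manualAssign(two, 2));
    }
}
```

This mirrors the observations in the question: the console consumers (which use subscribe()) split the messages, while the two Flink jobs (which use assign()) each received everything.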
Maybe that clarifies things for you.
Also, there is a blog post about working with Flink and Kafka that may help you: https://data-artisans.com/blog/kafka-flink-a-practical-how-to