flink kafka consumer groupId not working
Problem description
I am using Kafka with Flink. In a simple program, I used Flink's FlinkKafkaConsumer09 and assigned a group id to it.
According to Kafka's behavior, when I run 2 consumers on the same topic with the same group.id, it should work like a message queue. I think it's supposed to work like this: if 2 messages are sent to Kafka, the two Flink programs together should process those 2 messages exactly twice in total (say, 2 lines of output in total).
But the actual result is that each program receives both messages.
I have tried the consumer client that ships with the Kafka server download. It worked in the documented way (the 2 messages were processed once in total).
I tried using 2 Kafka consumers in the same main function of a Flink program: 4 messages processed in total.
I also tried running 2 instances of Flink, each assigned the same Kafka consumer program: 4 messages.
Any ideas? This is the output I expect:
1> Kafka and Flink2 says: element-65
2> Kafka and Flink1 says: element-66
Here's the wrong output I always get:
1> Kafka and Flink2 says: element-65
1> Kafka and Flink1 says: element-65
2> Kafka and Flink2 says: element-66
2> Kafka and Flink1 says: element-66
Here's the code snippet:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    DataStream<String> messageStream = env.addSource(
            new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                    new SimpleStringSchema(), parameterTool.getProperties()));
    messageStream.rebalance().map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Kafka and Flink1 says: " + value;
        }
    }).print();
    env.execute();
}
I have tried running it twice, and also another way: creating 2 data streams and calling env.execute() for each one in the main function.
Recommended answer
There was a quite similar question on the Flink user mailing list today, but I can't find the link to post it here. So here is part of that answer:
"内部,Flink Kafka连接器不使用使用者组 管理功能,因为他们使用的是较低级别的API (在0.8中为SimpleConsumer,在0.9中为KafkaConsumer#assign(…)) 并行实例,可对单个分区进行更多控制 消耗.因此,从本质上讲,Flink中的"group.id"设置 Kafka连接器仅用于将偏移量提交回ZK/Kafka 经纪人."
"Internally, the Flink Kafka connectors don’t use the consumer group management functionality because they are using lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each parallel instance for more control on individual partition consumption. So, essentially, the "group.id" setting in the Flink Kafka connector is only used for committing offsets back to ZK / Kafka brokers."
Maybe that clarifies things for you.
Also, there is a blog post about working with Flink and Kafka that may help you: https://data-artisans.com/blog/kafka-flink-a-practical-how-to