Flink Kafka consumer groupId not working

Question

I am using Kafka with Flink. In a simple program, I used Flink's FlinkKafkaConsumer09 and assigned a group id to it.

According to Kafka's consumer-group behavior, when I run 2 consumers on the same topic with the same group.id, they should work like a message queue. I expect that if 2 messages are sent to Kafka, the two Flink programs together process each message only once: either both messages go to one program or they are split between the two, giving 2 lines of output in total.
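The queue-like behavior expected here comes from Kafka's group management: the partitions of a topic are divided among the members of a group, so each message reaches exactly one member. Below is a pure-Java toy model of that idea, not the Kafka client API. It uses a simplified range-style split, and it assumes, for illustration only, a topic with 2 partitions holding one message each; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of Kafka consumer-group semantics (hypothetical, not the real client):
 * a topic's partitions are split among the members of one group with a
 * range-style assignment, so every partition, and therefore every message,
 * is read by exactly one member of the group.
 */
public class GroupAssignmentSketch {

    /** Returns, for each member index, the list of partition indices it owns. */
    static List<List<Integer>> rangeAssign(int numPartitions, int numMembers) {
        List<List<Integer>> assignment = new ArrayList<>();
        int perMember = numPartitions / numMembers;
        int extra = numPartitions % numMembers;
        int next = 0;
        for (int m = 0; m < numMembers; m++) {
            // The first `extra` members receive one additional partition.
            int count = perMember + (m < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            assignment.add(owned);
        }
        return assignment;
    }

    /** Total deliveries when each member reads every message in the partitions it owns. */
    static int totalDeliveries(List<List<Integer>> assignment, int[] messagesPerPartition) {
        int total = 0;
        for (List<Integer> owned : assignment) {
            for (int p : owned) {
                total += messagesPerPartition[p];
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Two consumers in the same group; assumed topic with 2 partitions, 1 message each.
        List<List<Integer>> assignment = rangeAssign(2, 2);
        System.out.println("assignment = " + assignment);   // prints assignment = [[0], [1]]
        System.out.println("deliveries = "
                + totalDeliveries(assignment, new int[] {1, 1})); // prints deliveries = 2
    }
}
```

With a single-partition topic, the same model gives one member both messages and leaves the other idle (`rangeAssign(1, 2)` yields `[[0], []]`); either way, the group as a whole sees each message once.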

But what actually happens is that each program receives both messages.

I have tried the consumer client that comes with the Kafka server download. It worked as documented (2 messages processed in total).
I tried using 2 Kafka consumers in the same main function of a Flink program. 4 messages were processed in total.
I also tried running 2 Flink instances and submitting the same Kafka consumer program to each of them. Again, 4 messages.

Any ideas? This is the output I expect:

1> Kafka and Flink2 says: element-65  
2> Kafka and Flink1 says: element-66 

Here is the incorrect output I always get:

1> Kafka and Flink2 says: element-65  
1> Kafka and Flink1 says: element-65  
2> Kafka and Flink2 says: element-66  
2> Kafka and Flink1 says: element-66 

Here is the code snippet:

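import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// The class name is hypothetical; the question only shows main().
public class KafkaConsumerJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parses "--key value" arguments such as --topic, --bootstrap.servers and --group.id.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Every argument is also passed to the Kafka consumer as a property, including group.id.
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"),
                new SimpleStringSchema(),
                parameterTool.getProperties()));

        // Spread records round-robin over the map subtasks, tag each one and print it.
        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink1 says: " + value;
            }
        }).print();

        env.execute();
    }
}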

I have tried running it twice, and also the other way: creating 2 DataStreams in the main function and calling env.execute() for each one.

Recommended answer

There was a very similar question on the Flink user mailing list today, but I can't find a link to post here. So here is part of the answer:

"Internally, the Flink Kafka connectors don’t use the consumer group management functionality because they are using lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each parallel instance for more control on individual partition consumption. So, essentially, the "group.id" setting in the Flink Kafka connector is only used for committing offsets back to ZK / Kafka brokers."
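The quote accounts for the duplicates: each Flink job (or each source inside one job) divides the topic's partitions among its own parallel instances by itself, so group.id never decides which job reads what. The toy model below reproduces the 4 messages reported in the question. It uses hypothetical names and a simplified "partition index modulo parallelism" rule; it is not Flink's actual assignment code, which differs in detail between versions. It assumes 2 partitions with one message each.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, not Flink's code) of the behavior in the quoted answer:
 * each job splits ALL partitions among its own parallel source subtasks without
 * consulting Kafka's group coordinator. group.id is not even an input here.
 */
public class FlinkJobAssignmentSketch {

    /** Partitions owned by one subtask under a simplified modulo rule. */
    static List<Integer> partitionsForSubtask(int numPartitions, int parallelism, int subtaskIndex) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtaskIndex) {
                owned.add(p);
            }
        }
        return owned;
    }

    /** Messages read by one whole job: the sum over all of its subtasks' partitions. */
    static int messagesReadByJob(int[] messagesPerPartition, int parallelism) {
        int total = 0;
        for (int s = 0; s < parallelism; s++) {
            for (int p : partitionsForSubtask(messagesPerPartition.length, parallelism, s)) {
                total += messagesPerPartition[p];
            }
        }
        return total;
    }

    public static void main(String[] args) {
        int[] messagesPerPartition = {1, 1}; // assumed: 2 partitions, 1 message each
        int jobs = 2;                        // two independent jobs (or sources)
        int parallelism = 2;
        int total = 0;
        for (int j = 0; j < jobs; j++) {
            // Each job covers every partition on its own, whatever group.id it was given.
            total += messagesReadByJob(messagesPerPartition, parallelism);
        }
        System.out.println("messages read across all jobs = " + total); // prints ... = 4
    }
}
```

Within a single job, the subtasks split the partitions without overlap. In this model, then, one job with a higher parallelism reads each message once, while two jobs read everything twice.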

Maybe that clarifies things for you.
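The quote's last point, that group.id is only used when committing offsets back to ZooKeeper or the Kafka brokers, can be pictured as an offset store keyed by group id. This is a toy in-memory model with hypothetical names ("my-group", "my-topic", `OffsetStoreSketch`), not Kafka's or Flink's API. Two jobs sharing a group.id write to the same entry and the last commit wins, but nothing in the store decides which messages each job reads.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy offset store (hypothetical, not a real API): committed offsets are keyed
 * by (group.id, topic, partition). Per the quoted answer, this is the only role
 * group.id plays in the Flink Kafka connector.
 */
public class OffsetStoreSketch {

    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String groupId, String topic, int partition) {
        return groupId + "/" + topic + "/" + partition;
    }

    /** Records a committed offset for this group and partition, replacing any earlier one. */
    void commit(String groupId, String topic, int partition, long offset) {
        committed.put(key(groupId, topic, partition), offset);
    }

    /** Returns the committed offset, or -1 if this group never committed one. */
    long committedOffset(String groupId, String topic, int partition) {
        return committed.getOrDefault(key(groupId, topic, partition), -1L);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        // Job A and job B share a group.id; job B commits later, so its value overwrites A's.
        store.commit("my-group", "my-topic", 0, 2L);
        store.commit("my-group", "my-topic", 0, 1L);
        System.out.println(store.committedOffset("my-group", "my-topic", 0));    // prints 1
        System.out.println(store.committedOffset("other-group", "my-topic", 0)); // prints -1
    }
}
```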

Also, there is a blog post about working with Flink and Kafka that may help you (https://data-artisans.com/blog/kafka-flink-a-practical-how-to).
