Flink Kafka consumer groupId not working

Problem description

I am using Kafka with Flink. In a simple program, I used Flink's FlinkKafkaConsumer09 and assigned a group id to it.

According to Kafka's behavior, when I run 2 consumers on the same topic with the same group.id, they should work like a message queue. I think it's supposed to work like this: if 2 messages are sent to Kafka, the two Flink programs together (either one of them, or both between them) should process the 2 messages exactly twice in total, i.e. 2 lines of output altogether.
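The queue-like behavior expected here is what Kafka's consumer-group management (`KafkaConsumer#subscribe`) provides: the group coordinator divides a topic's partitions among the members of one group, so each partition, and therefore each message, is read by exactly one member. The stdlib-only sketch below simulates that split. The 2-partition topic, the member names and the simple modulo assignment rule are assumptions made for illustration (Kafka's real assignors are pluggable, e.g. range or round-robin), and no real Kafka client is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {
    // Hypothetical topic: partition index -> messages stored in that partition.
    static final List<List<String>> PARTITIONS = List.of(
            List.of("element-65"),
            List.of("element-66"));

    // Models group management: the partitions are divided among ALL members of
    // one group (simplified rule: partition p goes to member p % members.size()).
    static Map<String, List<Integer>> assignWithinGroup(List<String> members) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < PARTITIONS.size(); p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }

    // Each member reads only the partitions it was assigned.
    static List<String> consume(Map<String, List<Integer>> assignment) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : assignment.entrySet()) {
            for (int p : entry.getValue()) {
                for (String msg : PARTITIONS.get(p)) {
                    out.add(entry.getKey() + " reads " + msg);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = consume(assignWithinGroup(List.of("consumer1", "consumer2")));
        out.forEach(System.out::println);
        System.out.println("total: " + out.size() + " records"); // 2
    }
}
```

In this model, two members of the same group print one line each, which is the 2-line result the question expects.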

But the actual result is that each program receives both messages.

I have tried the consumer client that comes with the Kafka server download. It worked in the documented way (2 messages processed in total).
I tried using 2 Kafka consumers in the same main function of one Flink program. 4 messages were processed in total.
I also tried running 2 Flink instances and giving each of them the same Kafka consumer program. Again, 4 messages.

Any ideas? This is the output I expect:

1> Kafka and Flink2 says: element-65  
2> Kafka and Flink1 says: element-66 

Here's the wrong output I always get:

1> Kafka and Flink2 says: element-65  
1> Kafka and Flink1 says: element-65  
2> Kafka and Flink2 says: element-66  
2> Kafka and Flink1 says: element-66 

Here is the code snippet:

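import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Class name is illustrative; the original snippet showed only main().
public class KafkaGroupIdJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parses --key value arguments (e.g. --topic, --bootstrap.servers, --group.id);
        // all of them are also handed to the Kafka consumer as properties.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

        // Redistribute records round-robin, tag each one, and print it.
        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink1 says: " + value;
            }
        }).print();

        env.execute();
    }
}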

I have tried running it twice, and also the other way round: creating 2 data streams in the main function and calling env.execute() for each one.

Recommended answer

There was a very similar question on the Flink user mailing list today, but I can't find a link to post here. So here is part of the answer:

"Internally, the Flink Kafka connectors don’t use the consumer group management functionality because they are using lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each parallel instance for more control on individual partition consumption. So, essentially, the "group.id" setting in the Flink Kafka connector is only used for committing offsets back to ZK / Kafka brokers."
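The quoted explanation can be illustrated with a stdlib-only simulation. It assumes a hypothetical 2-partition topic and a simplified modulo rule for spreading partitions over one job's source subtasks (Flink's actual assignment logic is internal and has varied between connector versions). Because each job assigns partitions to itself, in the spirit of `assign()`, instead of joining a consumer group, two jobs configured with the same `group.id` each read every message, while a single job with parallelism 2 reads each message once.

```java
import java.util.ArrayList;
import java.util.List;

public class FlinkAssignSketch {
    // Hypothetical topic: partition index -> messages stored in that partition.
    static final List<List<String>> PARTITIONS = List.of(
            List.of("element-65"),
            List.of("element-66"));

    // Models one Flink job: its source subtasks split ALL partitions among themselves
    // (simplified rule: partition p goes to subtask p % parallelism). No other job is
    // consulted, so a shared group.id has no effect on who reads what.
    static List<String> runJob(String jobName, int parallelism) {
        List<String> out = new ArrayList<>();
        for (int p = 0; p < PARTITIONS.size(); p++) {
            int subtask = p % parallelism;
            for (String msg : PARTITIONS.get(p)) {
                out.add(jobName + "/subtask-" + subtask + " reads " + msg);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two independent jobs with the same group.id: every message is read twice.
        List<String> twoJobs = new ArrayList<>(runJob("Flink1", 2));
        twoJobs.addAll(runJob("Flink2", 2));
        twoJobs.forEach(System.out::println);
        System.out.println("two jobs: " + twoJobs.size() + " records"); // 4

        // One job with parallelism 2: partitions are split, each message read once.
        System.out.println("one job:  " + runJob("Flink1", 2).size() + " records"); // 2
    }
}
```

Under this model, the queue-like split the question expects comes from running a single job whose Kafka source has a higher parallelism, rather than from starting several jobs that share a group.id.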

Maybe that clarifies things for you.

Also, there is a blog post about working with Flink and Kafka that may help you (https://data-artisans.com/blog/kafka-flink-a-practical-how-to).
