How to read data using the Kafka Consumer API from the beginning?


Question

Can anyone tell me how to read messages using the Kafka Consumer API from the beginning, every time I run the consumer jar?

Answer

This works with the 0.9.x consumer. Basically, when you create a consumer, you need to assign a consumer group id to it using the property ConsumerConfig.GROUP_ID_CONFIG. Generate the consumer group id randomly every time you start the consumer, with something like properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); (properties is an instance of java.util.Properties that you pass to the constructor new KafkaConsumer(properties)).

Generating the group id randomly means that the new consumer group doesn't have any offset associated with it in Kafka. So what we have to do after this is set a policy for that scenario. As the documentation for the auto.offset.reset property says:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw an exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw an exception to the consumer.

So from the options listed above we need to choose the earliest policy, so that the new consumer group starts from the beginning every time.

Your code in Java will look something like this:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
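For reference, the same configuration can be sketched as a small self-contained helper. The string keys below are the literal values behind the ConsumerConfig constants ("group.id", "auto.offset.reset", etc.); the broker address, "replay-" prefix, and the use of StringDeserializer are illustrative assumptions, not part of the original answer:

```java
import java.util.Properties;
import java.util.UUID;

public class ReplayConsumerConfig {

    // Builds consumer properties with a fresh, random group id, so the
    // group has no committed offsets and auto.offset.reset kicks in.
    static Properties freshGroupProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);       // e.g. "localhost:9092" (assumed)
        props.put("group.id", "replay-" + UUID.randomUUID());   // new group => no stored offset
        props.put("auto.offset.reset", "earliest");             // start from the beginning
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties a = freshGroupProperties("localhost:9092");
        Properties b = freshGroupProperties("localhost:9092");
        // Each call yields a distinct group id, so each run replays the topic.
        System.out.println(a.getProperty("auto.offset.reset"));
        System.out.println(!a.getProperty("group.id").equals(b.getProperty("group.id")));
    }
}
```

Pass the returned Properties to new KafkaConsumer<>(...) as above; note that actually consuming messages still requires the kafka-clients dependency and a running broker.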

The only thing that you still need to figure out is how, when multiple distributed consumers should belong to the same consumer group, to generate one random id and share it between those instances so they all join the same group.
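One simple way to handle that (my own suggestion, not from the original answer) is to inject the shared id from outside, e.g. an environment variable set once per deployment, and fall back to a fresh random id for a single standalone instance. The variable name REPLAY_GROUP_ID and the "replay-" prefix are hypothetical:

```java
import java.util.UUID;

public class GroupIdResolver {

    // Returns the externally supplied group id when present (so all
    // instances of one deployment share it), otherwise a fresh random one.
    static String resolveGroupId(String envValue) {
        if (envValue != null && !envValue.isEmpty()) {
            return envValue;                       // shared across instances
        }
        return "replay-" + UUID.randomUUID();      // standalone fallback
    }

    public static void main(String[] args) {
        // In production you would read System.getenv("REPLAY_GROUP_ID").
        System.out.println(resolveGroupId("replay-2024-06-run"));
        System.out.println(resolveGroupId(null).startsWith("replay-"));
    }
}
```

The resolved id is then used as the ConsumerConfig.GROUP_ID_CONFIG value on every instance.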

Hope it helps!
