Spring-Kafka并发属性 [英] Spring-Kafka Concurrency Property

查看:635
本文介绍了Spring-Kafka并发属性的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Spring-Kafka编写我的第一个Kafka Consumer.看了框架提供的不同选项,并且对此几乎没有疑问.有人可以在下面澄清一下,如果您已经做过.

I am progressing on writing my first Kafka Consumer by using Spring-Kafka. Had a look at the different options provided by framework, and have few doubts on the same. Can someone please clarify below if you have already worked on it.

问题-1 :根据Spring-Kafka文档,有两种实现Kafka-Consumer的方法. 您可以通过配置MessageListenerContainer并提供消息侦听器或使用@KafkaListener批注来接收消息".有人可以告诉我何时应该选择一个选项而不是另一个选项吗?

Question - 1 : As per Spring-Kafka documentation, there are 2 ways to implement Kafka-Consumer; "You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation". Can someone tell when should I choose one option over another ?

问题-2 :我选择了KafkaListener方法来编写我的应用程序.为此,我需要初始化一个容器工厂实例,并且在容器工厂内部有控制并发的选项.只是想仔细检查一下我对并发的理解是否正确.

Question - 2 : I have chosen KafkaListener approach for writing my application. For this I need to initialize a container factory instance and inside container factory there is option to control concurrency. Just want to double check if my understanding about concurrency is correct or not.

假设我有一个主题名称MyTopic,其中有4个分区.为了使用来自MyTopic的消息,我启动了我的应用程序的2个实例,并通过将并发设置为2来启动这些实例.因此,理想情况下,按照kafka分配策略,应将2个分区分配给consumer1,将另外2个分区分配给Consumer2 .由于并发设置为2,是否每个使用者都将启动2个线程,并并行使用主题中的数据?如果我们并行消费,我们还应该考虑什么.

Suppose, I have a topic name MyTopic which has 4 partitions in it. And to consume messages from MyTopic, I've started 2 instances of my application and these instances are started by setting concurrency as 2. So, Ideally as per kafka assignment strategy, 2 partitions should go to consumer1 and 2 other partitions should go to consumer2. Since the concurrency is set as 2, does each of the consumer will start 2 threads, and will consume data from the topics in parallel ? Also should we consider anything if we are consuming in parallel.

问题3 -我选择了手动确认模式,而不是在外部管理偏移量(不将偏移量持久保存到任何数据库/文件系统中).因此,我是否需要编写自定义代码来处理重新平衡,否则框架将自动对其进行管理?我认为没有,因为我只有在处理完所有记录后才确认.

Question 3 - I have chosen manual ack mode, and not managing the offsets externally (not persisting it to any database/filesystem). So should I need to write custom code to handle rebalance, or framework will manage it automatically ? I think no as I am acknowledging only after processing all the records.

问题-4 :此外,在手动ACK"模式下,哪个Listener可以提供更高的性能? BATCH消息侦听器或常规消息侦听器.我想如果我使用Normal Message侦听器,则在处理每条消息后将提交偏移量.

Question - 4 : Also, with Manual ACK mode, which Listener will give more performance? BATCH Message Listener or normal Message Listener. I guess if I use Normal Message listener, the offsets will be committed after processing each of the messages.

粘贴下面的代码以供参考.

Pasted the code below for your reference.

批确认用户:

    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
          Consumer<?, ?> consumer) {
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Record : " + record.value());
          // Process the message here..
          listener.addOffset(record.topic(), record.partition(), record.offset());
       }
       acknowledgment.acknowledge();
    }

初始化容器工厂:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return configs;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    // Not sure about the impact of this property, so going with 1
    factory.setConcurrency(2);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
    return factory;
}

推荐答案

  1. @KafkaListener是消息驱动的"POJO",它添加了诸如有效负载转换,参数匹配等内容.如果实现MessageListener,则只能从Kafka获取原始的ConsumerRecord.参见 @KafkaListener注释.

  1. @KafkaListener is a message-driven "POJO" it adds stuff like payload conversion, argument matching, etc. If you implement MessageListener you can only get the raw ConsumerRecord from Kafka. See @KafkaListener Annotation.

是的,并发表示线程数;每个线程创建一个Consumer;它们并行运行;在您的示例中,每个分区都有2个分区.

Yes, the concurrency represents the number of threads; each thread creates a Consumer; they run in parallel; in your example, each would get 2 partitions.

如果我们并行消费,我们还应该考虑什么.

Also should we consider anything if we are consuming in parallel.

您的侦听器必须是线程安全的(没有共享状态或任何此类状态都需要通过锁进行保护.

Your listener must be thread-safe (no shared state or any such state needs to be protected by locks.

  1. 不清楚句柄重新平衡事件"的含义.当发生重新平衡时,框架将提交所有待处理的偏移量.

  1. It's not clear what you mean by "handle rebalance events". When a rebalance occurs, the framework will commit any pending offsets.

没有什么区别;消息监听器批处理侦听器只是一个首选项.即使使用MANUAL ackmode的消息侦听器,也要在处理完所有轮询结果后才提交偏移量.在MANUAL_IMMEDIATE模式下,偏移量是一对一提交的.

It doesn't make a difference; message listener Vs. batch listener is just a preference. Even with a message listener, with MANUAL ackmode, the offsets are committed when all the results from the poll have been processed. With MANUAL_IMMEDIATE mode, the offsets are committed one-by-one.

这篇关于Spring-Kafka并发属性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆