Spring @KafkaListener在一定间隔后执行并轮询记录 [英] Spring @KafkaListener execute and poll records after certain interval

查看:1590
本文介绍了Spring @KafkaListener在一定间隔后执行并轮询记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们想在一定间隔(例如每5分钟)后使用记录. 消费者属性是标准的:

We wanted to consume the records after a certain interval (e.g. every 5 minutes). Consumer properties are standard:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(300000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}

即使我更改属性setPollTimeout,它在定义的间隔(5分钟)后也不会轮询,但在30秒后会连续轮询,这是我的日志:

Even though when i change the property setPollTimeout it doesnot poll after defined interval (5 minutes), it continuously polls after 30 seconds, here are my logs:

2018-01-23 18:07:26.875 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 2

2018-01-23 18:07:56.901 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 4

我们试图用窗口聚合构建一个kafka流应用程序,并计划在y间隔后使用窗口x.

We were trying to build a kafka stream application with windowed aggregations and planning to consume the window x after y interval.

我看到在类中:KafkaMessageListenerContainersetConsumerTaskExecutor被设置:

I can see that in the class: KafkaMessageListenerContainer, setConsumerTaskExecutor is set:

if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }

但是,当该(频率)线程池轮询记录时,我们如何配置.任何帮助表示赞赏.

But how do we configure when this (frequency) thread pool polls records. Any help appreciated.

推荐答案

您无法控制使用者的轮询速率,pollTimeout是poll()等待新记录到达的时间.如果新记录到达的频率更高,它将不会等待那么长时间.

You cannot control the rate at which the consumer polls, the pollTimeout is how long the poll() will wait for new records to arrive. If new records arrive more often, it will not wait that long.

如果希望控制接收记录的速率,只需使用DefaultKafkaConsumerFactory创建使用者并在需要时对其进行轮询.

If you wish to control the rate at which you receive records, simply use the DefaultKafkaConsumerFactory to create a consumer and poll it whenever you want.

但是您不能将它与@KafkaListener一起使用-您必须自己处理记录.

You can't use that with a @KafkaListener though - you have to deal with the record yourself.

这篇关于Spring @KafkaListener在一定间隔后执行并轮询记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆