Kafka reactor - How to disable KAFKA consumer being autostarted?


Question

Below is my KAFKA consumer:

@Bean("kafkaConfluentInboundReceiver")
@ConditionalOnProperty(value = "com.demo.kafka.core.inbound.confluent.topic-name",
        matchIfMissing = false)
public KafkaReceiver<String, Object> kafkaInboundReceiver() {
    ReceiverOptions<String, Object> receiverOptions = ReceiverOptions.create(inboundConsumerConfigs());
    receiverOptions.schedulerSupplier(() -> Schedulers
            .fromExecutorService(applicationContext.getBean("inboundKafkaExecutorService", ExecutorService.class)));
    receiverOptions.maxCommitAttempts(kafkaProperties.getKafka().getCore().getMaxCommitAttempts());
    return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
            .subscription(Collections.singleton(
                    kafkaProperties.getKafka()
                            .getCore().getInbound().getConfluent()
                            .getTopicName()))
            .commitInterval(Duration.ZERO).commitBatchSize(0));
}

My KAFKA consumer is getting started automatically. However, I want to disable the KAFKA consumer from being auto-started.

I know that in Spring Kafka we can do something like this:

factory.setAutoStartup(start);
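
For comparison, that Spring Kafka flag is normally set on the listener container factory. A minimal sketch of such a configuration (the bean method and the consumer.autostart property are assumptions for illustration, not from the original post):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory,
        @Value("${consumer.autostart:true}") boolean start) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Listener containers created by this factory will not start with the
    // application context when 'start' is false; they can still be started
    // later through the KafkaListenerEndpointRegistry.
    factory.setAutoStartup(start);
    return factory;
}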

However, I am not sure how to achieve this (i.e. control the auto start/stop behavior) in Kafka reactor. I want to have something like the following.

Introducing a property to handle the auto start/stop behavior:

@Value("${consumer.autostart:true}")
private boolean start;

Using the above property, I should be able to set the KAFKA auto-start flag in Kafka reactor, something like this:

return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
        .subscription(Collections.singleton(
                kafkaProperties.getKafka()
                        .getCore().getInbound().getConfluent()
                        .getTopicName()))
        .commitInterval(Duration.ZERO).commitBatchSize(0)).setAutoStart(start);

Note: .setAutoStart(start);

Is this doable in Kafka reactor, and if so, how do I do it?

Update:

protected void inboundEventHubListener(String topicName, List<String> allowedValues) {
    Scheduler scheduler = Schedulers.fromExecutorService(kafkaExecutorService);
    kafkaEventHubInboundReceiver
            .receive()
            .publishOn(scheduler)
            .groupBy(receiverRecord -> {
                try {
                    return receiverRecord.receiverOffset().topicPartition();
                } catch (Throwable throwable) {
                    log.error("exception in groupby", throwable);
                    return Flux.empty();
                }
            }).flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
            .map(record -> {
                processMessage(record, topicName, allowedValues)
                        .block(Duration.ofSeconds(60L)); // block here to trigger processing of the message
                return record;
            }).concatMap(message -> {
                log.info("Received message after processing offset: {} partition: {} ",
                         message.offset(), message.partition());
                return message.receiverOffset()
                        .commit()
                        .onErrorContinue((t, o) -> log.error(
                                String.format("exception raised while commit offset %s", o), t)
                        );
            })).onErrorContinue((t, o) -> {
        try {
            if (null != o) {
                ReceiverRecord<String, Object> record = (ReceiverRecord<String, Object>) o;
                ReceiverOffset offset = record.receiverOffset();
                log.debug("failed to process message: {} partition: {} and message: {} ",
                          offset.offset(), record.partition(), record.value());
            }
            log.error(String.format("exception raised while processing message %s", o), t);
        } catch (Throwable inner) {
            log.error("encountered error in onErrorContinue", inner);
        }
    }).subscribeOn(scheduler).subscribe();
}

Can I do something like this?

kafkaEventHubInboundReceiverObj = kafkaEventHubInboundReceiver.....subscribeOn(scheduler);
if (consumer.autostart) {
    kafkaEventHubInboundReceiverObj.subscribe();
}

Answer

With reactor-kafka there is no concept of "auto start"; you are in complete control.

The consumer is not "started" until you subscribe to the Flux returned from receiver.receive().

Simply delay the flux.subscribe() until you are ready to consume data.
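
A minimal sketch of that idea, assuming a hypothetical wrapper bean (the class name, the processRecord step, and the lifecycle methods are illustrative, not part of reactor-kafka; only the "kafkaConfluentInboundReceiver" bean name comes from the question): the consumer.autostart property decides whether receive() is subscribed at startup, and the stored Disposable gives you a stop handle.

@Component
public class InboundConsumerLifecycle {

    private final KafkaReceiver<String, Object> kafkaInboundReceiver;

    @Value("${consumer.autostart:true}")
    private boolean autostart;

    private Disposable subscription;

    public InboundConsumerLifecycle(
            @Qualifier("kafkaConfluentInboundReceiver") KafkaReceiver<String, Object> kafkaInboundReceiver) {
        this.kafkaInboundReceiver = kafkaInboundReceiver;
    }

    // The consumer only starts when the Flux from receive() is subscribed, so
    // subscribing conditionally gives the same effect as setAutoStartup(...).
    @EventListener(ApplicationReadyEvent.class)
    public void maybeStart() {
        if (autostart) {
            start();
        }
    }

    public synchronized void start() {
        if (subscription == null || subscription.isDisposed()) {
            subscription = kafkaInboundReceiver.receive()
                    .flatMap(record -> processRecord(record)              // hypothetical processing step
                            .then(record.receiverOffset().commit()))      // manual commit, matching commitBatchSize(0)
                    .subscribe();                                         // subscribing is what starts the consumer
        }
    }

    public synchronized void stop() {
        if (subscription != null) {
            subscription.dispose();                                       // cancels the Flux
        }
    }

    private Mono<Void> processRecord(ReceiverRecord<String, Object> record) {
        return Mono.empty(); // placeholder for real processing
    }
}

Disposing the subscription cancels the Flux, which in turn closes the underlying Kafka consumer, so the same component covers both halves of the desired start/stop behavior.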
