如何在骆驼路线中手动确认/取消PubSub消息 [英] How to manually ack/nack a PubSub message in Camel Route

查看:99
本文介绍了如何在骆驼路线中手动确认/取消PubSub消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用ackMode = NONE设置骆驼路线,这意味着不会自动进行确认.如何明确确认路由中的消息?

I am setting up a Camel Route with ackMode=NONE meaning acknowlegements are not done automatically. How do I explicitly acknowledge the message in the route?

在我的骆驼路线定义中,我已将ackMode设置为NONE.根据文档,我应该能够手动确认下游消息:

In my Camel Route definition I've set ackMode to NONE. According to the documentation, I should be able to manually acknowledge the message downstream:

"AUTO =交换在完成时被确认/不确认.NONE =下游进程必须明确确认/不确认"

"AUTO = exchange gets ack’ed/nack’ed on completion. NONE = downstream process has to ack/nack explicitly"

但是我不知道如何发送确认.

However I cannot figure out how to send the ack.

from("google-pubsub:<project>:<subscription>?concurrentConsumers=1&maxMessagesPerPoll=1&ackMode=NONE")
                .bean("processingBean");

我的PubSub订阅的确认截止日期为10秒,因此由于ackMode = NONE,我的消息每10秒不断发送一次.这是预期的.但是,我无法找到一种方法来在处理完成后手动确认消息并停止重新发送.

My PubSub subscription has an acknowledgement deadline of 10 seconds and so my message keeps getting re-sent every 10 seconds due to ackMode=NONE. This is as expected. However I cannot find a way to manually acknowledge the message once processing is complete and stop the re-deliveries.

推荐答案

我能够深入研究Camel组件并弄清楚它是如何完成的.首先,我创建了一个GooglePubSubConnectionFactory bean:

I was able to dig through the Camel components and figure out how it is done. First I created a GooglePubSubConnectionFactory bean:

@Bean
    public GooglePubsubConnectionFactory googlePubsubConnectionFactory() {
        GooglePubsubConnectionFactory connectionFactory = new GooglePubsubConnectionFactory();
        connectionFactory.setCredentialsFileLocation(pubsubKey);
        return connectionFactory;
    }

然后我能够从标题中引用消息的确认ID:

Then I was able to reference the ack id of the message from the header:

@Header(GooglePubsubConstants.ACK_ID) String ackId

然后我使用以下代码来确认该消息:

Then I used the following code to acknowledge the message:

List<String > ackIdList = new ArrayList<>();
        ackIdList.add(ackId);
        AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIdList);
        Pubsub pubsub = googlePubsubConnectionFactory.getDefaultClient();
        pubsub.projects().subscriptions().acknowledge("projects/<my project>/subscriptions/<my subscription>", ackRequest).execute();

这篇关于如何在骆驼路线中手动确认/取消PubSub消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆