如何在骆驼路线中手动确认/取消PubSub消息 [英] How to manually ack/nack a PubSub message in Camel Route
问题描述
我正在使用ackMode = NONE设置骆驼路线,这意味着不会自动进行确认.如何明确确认路由中的消息?
I am setting up a Camel Route with ackMode=NONE meaning acknowlegements are not done automatically. How do I explicitly acknowledge the message in the route?
在我的骆驼路线定义中,我已将ackMode设置为NONE.根据文档,我应该能够手动确认下游消息:
In my Camel Route definition I've set ackMode to NONE. According to the documentation, I should be able to manually acknowledge the message downstream:
"AUTO =交换在完成时被确认/不确认.NONE =下游进程必须明确确认/不确认"
"AUTO = exchange gets ack’ed/nack’ed on completion. NONE = downstream process has to ack/nack explicitly"
但是我不知道如何发送确认.
However I cannot figure out how to send the ack.
from("google-pubsub:<project>:<subscription>?concurrentConsumers=1&maxMessagesPerPoll=1&ackMode=NONE")
.bean("processingBean");
我的PubSub订阅的确认截止日期为10秒,因此由于ackMode = NONE,我的消息每10秒不断发送一次.这是预期的.但是,我无法找到一种方法来在处理完成后手动确认消息并停止重新发送.
My PubSub subscription has an acknowledgement deadline of 10 seconds and so my message keeps getting re-sent every 10 seconds due to ackMode=NONE. This is as expected. However I cannot find a way to manually acknowledge the message once processing is complete and stop the re-deliveries.
推荐答案
我能够深入研究Camel组件并弄清楚它是如何完成的.首先,我创建了一个GooglePubSubConnectionFactory
bean:
I was able to dig through the Camel components and figure out how it is done. First I created a GooglePubSubConnectionFactory
bean:
@Bean
public GooglePubsubConnectionFactory googlePubsubConnectionFactory() {
GooglePubsubConnectionFactory connectionFactory = new GooglePubsubConnectionFactory();
connectionFactory.setCredentialsFileLocation(pubsubKey);
return connectionFactory;
}
然后我能够从标题中引用消息的确认ID:
Then I was able to reference the ack id of the message from the header:
@Header(GooglePubsubConstants.ACK_ID) String ackId
然后我使用以下代码来确认该消息:
Then I used the following code to acknowledge the message:
List<String > ackIdList = new ArrayList<>();
ackIdList.add(ackId);
AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIdList);
Pubsub pubsub = googlePubsubConnectionFactory.getDefaultClient();
pubsub.projects().subscriptions().acknowledge("projects/<my project>/subscriptions/<my subscription>", ackRequest).execute();
这篇关于如何在骆驼路线中手动确认/取消PubSub消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!