如何使用 JmsTemplate 提供手动确认并从 Rabbitmq 队列中删除消息 [英] How to Give manual Acknowledge using JmsTemplate and delete message from Rabbitmq queue
问题描述
我正在使用带有 jmsTemplate 的 RabbitMq(带 JMS)我能够从 RabbitMq 队列中使用消息,但它需要自动确认.
我有它的搜索 API,但无法找到它.
如何设置手动确认.
在下面的代码中,当从队列中使用消息时,我想使用该消息调用 Web 服务,并取决于来自我想从队列中删除该消息的响应.我创建了一个项目,其中我正在使用侦听器和其他项目调用从队列中读取消息
第一个项目:
包com.es.jms.listener;导入 javax.jms.ConnectionFactory;导入 javax.jms.JMSException;导入 javax.jms.Message;导入 javax.jms.MessageListener;导入 javax.jms.TextMessage;导入 org.springframework.context.annotation.Bean;导入 org.springframework.context.annotation.Configuration;导入 org.springframework.jms.listener.MessageListenerContainer;导入 org.springframework.jms.listener.SimpleMessageListenerContainer;导入 com.rabbitmq.jms.admin.RMQConnectionFactory;@配置公共类 RabbitMqMessageListener {@豆角,扁豆公共连接工厂 jmsConnectionFactory() {RMQConnectionFactory connectionFactory = new RMQConnectionFactory();connectionFactory.setUsername("用户名");connectionFactory.setPassword("密码");connectionFactory.setVirtualHost("vhostname");connectionFactory.setHost("主机名");返回连接工厂;}@豆角,扁豆公共消息侦听器 msgListener() {返回新的 MessageListener() {公共无效onMessage(消息消息){System.out.println(message.toString());if (message instanceof TextMessage) {尝试 {String msg = ((TextMessage) message).getText();System.out.println("收到消息:" + msg);//这里调用web service,依赖web service//回复//如果 200 则从队列中删除味精,否则保留味精//队列} 捕捉(JMSException ex){抛出新的 RuntimeException(ex);}}}};}@豆角,扁豆公共 MessageListenerContainer messageListenerContainer() {SimpleMessageListenerContainer 容器 = new SimpleMessageListenerContainer();container.setConnectionFactory(jmsConnectionFactory());container.setDestinationName("test");container.setMessageListener(msgListener());返回容器;}}
第二个项目:
包com.rabbitmq.jms.consumer.controller;导入 java.util.concurrent.ExecutionException;导入 java.util.concurrent.TimeoutException;导入 javax.jms.ConnectionFactory;导入 org.json.JSONException;导入 org.json.JSONObject;导入 org.springframework.beans.factory.annotation.Autowired;导入 org.springframework.context.annotation.Bean;导入 org.springframework.http.HttpStatus;导入 org.springframework.http.ResponseEntity;导入 org.springframework.jms.JmsException;导入 org.springframework.jms.core.JmsTemplate;导入 org.springframework.stereotype.Controller;导入 org.springframework.web.bind.annotation.CrossOrigin;导入 org.springframework.web.bind.annotation.RequestMapping;导入 org.springframework.web.bind.annotation.RequestMethod;导入 org.springframework.web.bind.annotation.ResponseBody;导入 com.rabbitmq.jms.admin.RMQConnectionFactory;导入 redis.clients.jedis.Jedis;@控制器公共类接收控制器 {@自动连线jms模板 jms模板;@豆角,扁豆公共连接工厂 jmsConnectionFactory() {RMQConnectionFactory connectionFactory = new RMQConnectionFactory();connectionFactory.setUsername("用户名");connectionFactory.setPassword("密码");connectionFactory.setVirtualHost("vhostname");connectionFactory.setHost("主机名");返回连接工厂;}@CrossOrigin@SuppressWarnings({ "未选中", "rawtypes" })@RequestMapping(method = RequestMethod.GET, value = "/getdata")@ResponseBody公共响应实体<字符串>fecthDataFromRedis()抛出 JSONException、InterruptedException、JmsException、ExecutionException、TimeoutException {System.out.println("在控制器中");jmsTemplate.setReceiveTimeout(500L);//jms模板.String message = (String) jmsTemplate.receiveAndConvert("test");//这里调用web service,依赖web service//回复//如果 200 则从队列中删除味精,否则保留味精//队列System.out.println(消息);}返回新的响应实体(消息,HttpStatus.OK);}}
我该怎么做?
提前致谢.
你不是在使用 JmsTemplate
,你是在使用 SimpleMessageListenerContainer
来接收消息.p>
如果您正在使用模板,则必须将 execute
方法与 SessionCallback
一起使用,因为确认必须发生在范围内收到消息的会话.
但是,使用 SimpleMessageListenerContainer
,您只需将 sessionAcknowledgeMode
设置为 Session.CLIENT_ACKNOWLEDGE
.查看容器 javadocs...
/*** 使用普通 JMS 客户端 API 的消息侦听器容器* {@code MessageConsumer.setMessageListener()} 方法* 为指定的侦听器创建并发 MessageConsumers.** <p>这是消息侦听器容器的最简单形式.* 它创建固定数量的 JMS 会话来调用侦听器,* 不允许动态适应运行时需求.它的主要* 优点是其复杂程度低且要求最低* 关于 JMS 提供者:甚至不需要 ServerSessionPool 工具.** <p>详情请参阅 {@link AbstractMessageListenerContainer} javadoc* 关于确认模式和交易选项.注意这个容器* 公开默认AUTO_ACKNOWLEDGE"模式的标准 JMS 行为:* 即监听器执行后自动消息确认,* 在抛出用户异常但可能发生的情况下不重新传递* 在侦听器执行期间 JVM 死机的情况下重新交付.** <p>对于不同风格的 MessageListener 处理,通过循环* {@code MessageConsumer.receive()} 调用也允许* 消息的事务接收(使用 XA 事务注册它们),* 见 {@link DefaultMessageListenerContainer}....
编辑
使用 JmsTemplate
时,您必须在会话范围内完成工作 - 方法如下...
首先,您必须在模板中启用客户端确认...
this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
然后,使用 execute
方法和 SessionCallback
...
布尔结果 = this.jmsTemplate.execute(session -> {MessageConsumer 消费者 = session.createConsumer(this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, "bar", false));字符串结果 = null;尝试 {收到的消息 = consumer.receive(5000);如果(收到!= null){结果 = (String) this.jmsTemplate.getMessageConverter().fromMessage(received);//在这里做一些事情.收到.确认();返回真;}}捕获(异常 e){返回假;}最后 {消费者.close();}}, 真的);
I am using RabbitMq(with JMS) with jmsTemplate I am able to Consume Message from RabbitMq Queue But it is taking acknowledgment AUTO.
I have Search API for it but not able to find it out.
How can I set manual acknowledgment.
In Below code when Message is consumed from queue I want to call web service with that message and depends on that response from from I want to delete that message from queue. I have created one project in which I am using Listener and other project with call to read message from queue
first Project:
package com.es.jms.listener;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
@Configuration
public class RabbitMqMessageListener {
@Bean
public ConnectionFactory jmsConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername("Username");
connectionFactory.setPassword("Password");
connectionFactory.setVirtualHost("vhostname");
connectionFactory.setHost("hostname");
return connectionFactory;
}
@Bean
public MessageListener msgListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println(message.toString());
if (message instanceof TextMessage) {
try {
String msg = ((TextMessage) message).getText();
System.out.println("Received message: " + msg);
// call web service here and depends on web service
// response
// if 200 then delete msg from queue else keep msg in
// queue
} catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
}
};
}
@Bean
public MessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(jmsConnectionFactory());
container.setDestinationName("test");
container.setMessageListener(msgListener());
return container;
}
}
2nd Project:
package com.rabbitmq.jms.consumer.controller;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.jms.ConnectionFactory;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import redis.clients.jedis.Jedis;
@Controller
public class ReceiverController {
@Autowired
JmsTemplate jmsTemplate;
@Bean
public ConnectionFactory jmsConnectionFactory() {
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername("Username");
connectionFactory.setPassword("Password");
connectionFactory.setVirtualHost("vhostname");
connectionFactory.setHost("hostname");
return connectionFactory;
}
@CrossOrigin
@SuppressWarnings({ "unchecked", "rawtypes" })
@RequestMapping(method = RequestMethod.GET, value = "/getdata")
@ResponseBody
public ResponseEntity<String> fecthDataFromRedis()
throws JSONException, InterruptedException, JmsException, ExecutionException, TimeoutException {
System.out.println("in controller");
jmsTemplate.setReceiveTimeout(500L);
// jmsTemplate.
String message = (String) jmsTemplate.receiveAndConvert("test");
// call web service here and depends on web service
// response
// if 200 then delete msg from queue else keep msg in
// queue
System.out.println(message);
}
return new ResponseEntity(message , HttpStatus.OK);
}
}
How Can I do That?
Thanks In Advance.
You are not using a JmsTemplate
, you are using a SimpleMessageListenerContainer
to receive the message.
If you were using the template, you would have to use the execute
method with a SessionCallback
since the acknowledgement must occur within the scope of the session within which the message was received.
However, with the SimpleMessageListenerContainer
, you simply set the sessionAcknowledgeMode
to Session.CLIENT_ACKNOWLEDGE
. See the container javadocs...
/**
* Message listener container that uses the plain JMS client API's
* {@code MessageConsumer.setMessageListener()} method to
* create concurrent MessageConsumers for the specified listeners.
*
* <p>This is the simplest form of a message listener container.
* It creates a fixed number of JMS Sessions to invoke the listener,
* not allowing for dynamic adaptation to runtime demands. Its main
* advantage is its low level of complexity and the minimum requirements
* on the JMS provider: Not even the ServerSessionPool facility is required.
*
* <p>See the {@link AbstractMessageListenerContainer} javadoc for details
* on acknowledge modes and transaction options. Note that this container
* exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode:
* that is, automatic message acknowledgment after listener execution,
* with no redelivery in case of a user exception thrown but potential
* redelivery in case of the JVM dying during listener execution.
*
* <p>For a different style of MessageListener handling, through looped
* {@code MessageConsumer.receive()} calls that also allow for
* transactional reception of messages (registering them with XA transactions),
* see {@link DefaultMessageListenerContainer}.
...
EDIT
When using the JmsTemplate
, you must do your work within the scope of the session - here's how...
First, you have to enable client acknowledge in your template...
this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
Then, use the execute
method with a SessionCallback
...
Boolean result = this.jmsTemplate.execute(session -> {
MessageConsumer consumer = session.createConsumer(
this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, "bar", false));
String result = null;
try {
Message received = consumer.receive(5000);
if (received != null) {
result = (String) this.jmsTemplate.getMessageConverter().fromMessage(received);
// Do some stuff here.
received.acknowledge();
return true;
}
}
catch (Exception e) {
return false;
}
finally {
consumer.close();
}
}, true);
这篇关于如何使用 JmsTemplate 提供手动确认并从 Rabbitmq 队列中删除消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!