How to simulate message redelivery in AUTO_ACKNOWLEDGE JMS Session Scenario?
Question
In the following test I'm trying to simulate the following scenario:
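- A message queue is started.
- A consumer designed to fail during message processing is started.
- A message is produced.
- The consumer starts processing the message.
- During processing an exception is thrown to simulate a message processing failure. The failing consumer is stopped.
- Another consumer is started to pick up the redelivered message.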
But my test fails and the message is not redelivered to the new consumer. I'll appreciate any hints on this.
MessageProcessingFailureAndReprocessingTest.java
@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
        loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests {
    @Autowired
    private FailureReprocessTestScenario testScenario;

    @Before
    public void setUp() {
        testScenario.start();
    }

    @After
    public void tearDown() throws Exception {
        testScenario.stop();
    }

    @Test public void
    should_reprocess_task_after_processing_failure() {
        try {
            Thread.sleep(20*1000);
            assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
                    "task-1",
            })));
        } catch (InterruptedException e) {
            fail();
        }
    }

    @Configurable
    public static class FailureReprocessTestScenario {
        @Autowired
        public BrokerService broker;
        @Autowired
        public MockTaskProducer mockTaskProducer;
        @Autowired
        public FailingWorker failingWorker;
        @Autowired
        public SucceedingWorker succeedingWorker;
        @Autowired
        public TaskScheduler scheduler;

        public void start() {
            Date now = new Date();
            scheduler.schedule(new Runnable() {
                public void run() { failingWorker.start(); }
            }, now);
            Date after1Seconds = new Date(now.getTime() + 1*1000);
            scheduler.schedule(new Runnable() {
                public void run() { mockTaskProducer.produceTask(); }
            }, after1Seconds);
            Date after2Seconds = new Date(now.getTime() + 2*1000);
            scheduler.schedule(new Runnable() {
                public void run() {
                    failingWorker.stop();
                    succeedingWorker.start();
                }
            }, after2Seconds);
        }

        public void stop() throws Exception {
            succeedingWorker.stop();
            broker.stop();
        }
    }

    @Configuration
    @ImportResource(value={"classpath:applicationContext-jms.xml",
            "classpath:applicationContext-task.xml"})
    public static class ContextConfig {
        @Autowired
        private ConnectionFactory jmsFactory;

        @Bean
        public FailureReprocessTestScenario testScenario() {
            return new FailureReprocessTestScenario();
        }

        @Bean
        public MockTaskProducer mockTaskProducer() {
            return new MockTaskProducer();
        }

        @Bean
        public FailingWorker failingWorker() {
            TaskListener listener = new TaskListener();
            FailingWorker worker = new FailingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        @Bean
        public SucceedingWorker succeedingWorker() {
            TaskListener listener = new TaskListener();
            SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
            DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
            listenerContainer.setConnectionFactory(jmsFactory);
            listenerContainer.setDestinationName("tasksQueue");
            listenerContainer.setMessageListener(listener);
            listenerContainer.setAutoStartup(false);
            listenerContainer.initialize();
            return listenerContainer;
        }
    }

    public static class FailingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(FailingWorker.class.getName());
        private final DefaultMessageListenerContainer listenerContainer;

        public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
        }

        public void start() {
            LOG.info("FailingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("FailingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("FailingWorker.processTask(" + task + ")");
            try {
                Thread.sleep(1*1000);
                throw Throwables.propagate(new Exception("Simulate task processing failure"));
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Unexpected interruption exception");
            }
        }
    }

    public static class SucceedingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());
        private final DefaultMessageListenerContainer listenerContainer;
        public final List<String> processedTasks;

        public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
            this.processedTasks = new ArrayList<String>();
        }

        public void start() {
            LOG.info("SucceedingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("SucceedingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("SucceedingWorker.processTask(" + task + ")");
            try {
                TextMessage taskText = (TextMessage) task;
                processedTasks.add(taskText.getText());
            } catch (JMSException e) {
                LOG.log(Level.SEVERE, "Unexpected exception during task processing");
            }
        }
    }
}
TaskListener.java
public class TaskListener implements MessageListener {
    private TaskProcessor processor;

    @Override
    public void onMessage(Message message) {
        processor.processTask(message);
    }

    public void setProcessor(TaskProcessor processor) {
        this.processor = processor;
    }
}
MockTaskProducer.java
@Configurable
public class MockTaskProducer implements ApplicationContextAware {
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());
    @Autowired
    private JmsTemplate jmsTemplate;
    private Destination destination;
    private int taskCounter = 0;

    public void produceTask() {
        LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");
        taskCounter++;
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("task-" + taskCounter);
                return message;
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        destination = applicationContext.getBean("tasksQueue", Destination.class);
    }
}
Recommended answer
Apparently the documentation I was looking at yesterday, Creating Robust JMS Applications, misled me in a way (or I might have misunderstood it). In particular, this excerpt:
Until a JMS message has been acknowledged, it is not considered to be successfully consumed. The successful consumption of a message ordinarily takes place in three stages.
- The client receives the message.
- The client processes the message.
- The message is acknowledged. Acknowledgment is initiated either by the JMS provider or by the client, depending on the session acknowledgment mode.
I assumed AUTO_ACKNOWLEDGE does exactly that: it acknowledges the message after the listener method returns. But according to the JMS specification it is a bit different, and Spring listener containers, as expected, do not try to alter the behavior defined by the JMS specification. This is what the javadoc of AbstractMessageListenerContainer has to say (I've emphasized the important sentences):
The listener container offers the following message acknowledgment options:
- "sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): Automatic message acknowledgment before listener execution; no redelivery in case of exception thrown.
- "sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown.
- "sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": Lazy message acknowledgment during or after listener execution; potential redelivery in case of exception thrown.
- "sessionTransacted" set to "true": Transactional acknowledgment after successful listener execution; guaranteed redelivery in case of exception thrown.
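The difference between the first and the last option in the list above can be illustrated with a toy in-memory model. This is only a sketch of the semantics as the quoted javadoc describes them, not real JMS or Spring code: the class `AckModeSketch`, the enum `Mode` and the method `deliverOnce` are hypothetical names made up for the illustration, and a plain `Deque` stands in for the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Toy in-memory model of two acknowledgment styles; not real JMS code. */
public class AckModeSketch {

    /** Hypothetical labels for the two behaviors being compared. */
    public enum Mode { ACK_BEFORE_LISTENER, TRANSACTED }

    /**
     * Delivers the head of the queue to the listener once.
     * Returns true only if the listener completed without throwing.
     */
    public static boolean deliverOnce(Deque<String> queue, Mode mode, Consumer<String> listener) {
        String message = queue.peekFirst();
        if (message == null) {
            return false; // nothing to deliver
        }
        if (mode == Mode.ACK_BEFORE_LISTENER) {
            // Acknowledged up front: the message is gone whatever the listener does.
            queue.pollFirst();
        }
        try {
            listener.accept(message);
        } catch (RuntimeException e) {
            // TRANSACTED: this models a rollback, the message stays at the head
            // of the queue and can be delivered again.
            return false;
        }
        if (mode == Mode.TRANSACTED) {
            // Models a commit: the message is removed only after success.
            queue.pollFirst();
        }
        return true;
    }

    public static void main(String[] args) {
        Consumer<String> failing = m -> {
            throw new IllegalStateException("Simulate task processing failure");
        };

        Deque<String> ackFirstQueue = new ArrayDeque<>();
        ackFirstQueue.add("task-1");
        deliverOnce(ackFirstQueue, Mode.ACK_BEFORE_LISTENER, failing);
        System.out.println("ack before listener, left in queue: " + ackFirstQueue);

        Deque<String> transactedQueue = new ArrayDeque<>();
        transactedQueue.add("task-1");
        deliverOnce(transactedQueue, Mode.TRANSACTED, failing);
        System.out.println("transacted, left in queue: " + transactedQueue);
    }
}
```

In this model the failing listener loses `task-1` in the first case, while in the transacted case the message is still in the queue for a second consumer, which is the behavior the test in the question expects.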
So the key to my solution is listenerContainer.setSessionTransacted(true);
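Applied to the `listenerContainer(...)` helper from the question, the change might look roughly like the sketch below. The wrapper class `TransactedContainerFactory` and its `create` method are hypothetical names used only to make the fragment self-contained; the `javax.jms` package is assumed here (newer Spring versions use `jakarta.jms` instead).

```java
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

/** Hypothetical helper mirroring listenerContainer(...) from the question. */
public final class TransactedContainerFactory {

    private TransactedContainerFactory() {
    }

    public static DefaultMessageListenerContainer create(ConnectionFactory jmsFactory,
                                                         MessageListener listener) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(jmsFactory);
        container.setDestinationName("tasksQueue");
        container.setMessageListener(listener);
        // The one line that differs from the question: use a locally transacted
        // session, so an exception thrown by the listener rolls the receive back.
        container.setSessionTransacted(true);
        container.setAutoStartup(false);
        container.initialize();
        return container;
    }
}
```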
Another issue I faced was that the JMS provider kept redelivering the failed message to the same consumer that had failed while processing it. I don't know whether the JMS specification prescribes what the provider should do in such situations, but what worked for me was to call listenerContainer.shutdown(); in order to disconnect the failing consumer, which allows the provider to redeliver the message and give another consumer a chance.
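As a rough sketch, the failing worker's lifecycle methods could then be written as follows. The class name `DisconnectingWorkerLifecycle` is hypothetical and only the start/stop part of `FailingWorker` is reproduced; the sketch also assumes the container is not reused after `shutdown()`.

```java
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/** Hypothetical extract of FailingWorker showing only start/stop. */
public class DisconnectingWorkerLifecycle {

    private final DefaultMessageListenerContainer listenerContainer;

    public DisconnectingWorkerLifecycle(DefaultMessageListenerContainer listenerContainer) {
        this.listenerContainer = listenerContainer;
    }

    public void start() {
        listenerContainer.start();
    }

    public void stop() {
        // shutdown() instead of stop(): the consumer is disconnected from the
        // broker, so the rolled-back message can go to another consumer.
        listenerContainer.shutdown();
    }
}
```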