Spring Rabbitlistner停止使用注释语法监听队列 [英] Spring rabbitlistner stop listening to queue using annotation syntax

查看:124
本文介绍了Spring Rabbitlistner停止使用注释语法监听队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我和一位同事正在使用Spring开发一个应用程序,该应用程序需要从RabbitMQ队列中获取消息.这样做的目的是使用(通常是出色的)spring注释系统来使代码易于理解.我们让系统使用@RabbitListner批注工作,但我们希望按需获取消息. @RabbitListner批注不会执行此操作,它仅在可用时接收消息.需求由客户端的就绪"确定,即客户端应从队列停止列表中获取"消息并处理该消息.然后确定是否准备好接收新的队列并重新连接到队列.

A colleague and I are working on an application using Spring which needs to get a message from a RabbitMQ queue. The idea is to do this using (the usually excellent) spring annotation system to make the code easy to understand. We have the system working using the @RabbitListner annotation but we want to get a message on demand. The @RabbitListner annotation does not do this, it just receives messages when they are available. The demand is determined by the "readiness" of the client i.e. a client should "get" a message from te queue stop listing and process the message. Then determine if it is ready to receive a new one and reconnect to the queue.

我们一直在考虑仅使用spring-amqp/spring-rabbit模块来手工完成此操作,虽然这可能是可行的,但我们确实很想使用spring来执行此操作.经过数小时的搜索并浏览了文档,我们仍然找不到答案.

We have been looking into doing this by hand just using the spring-amqp/spring-rabbit modules and while this is probably possible we would really like to do this using spring. After many hours of searching and going through the documentation, we have not been able to find an answer.

这是我们目前拥有的接收代码:

Here is the recieving code we currently have:

@RabbitListener(queues = "jobRequests")
public class Receiver {

@Autowired
private JobProcessor jobProcessor;

@RabbitHandler
public void receive(Job job) throws InterruptedException, IOException {
    System.out.println(" [x] Received '" + job + "'");
    jobProcessor.processJob(job);
}

}

工作处理器:

@Service
public class JobProcessor {

@Autowired
private RabbitTemplate rabbitTemplate;

public boolean processJob(Job job) throws InterruptedException, IOException {
    rabbitTemplate.convertAndSend("jobResponses", job);

    System.out.println(" [x] Processing job: " + job);

    rabbitTemplate.convertAndSend("processedJobs", job);

    return true;
}

}

换句话说,当接收方接收到作业时,它应该停止侦听新作业,并等待作业处理程序完成,然后开始列出新消息.

In other words, when the job is received by the Receiver it should stop listening for new jobs and wait for the job processor to be done and then start listing for new messages.

我们已经重新创建了空指针异常,这是我们用来从服务器端发送的代码.

We have re-created the null pointer exception here is the code we use to send from the server side.

@Controller
public class MainController {

@Autowired
RabbitTemplate rabbitTemplate;

@Autowired
private Queue jobRequests;

@RequestMapping("/do-job")
public String doJob() {

    Job job = new Job(new Application(), "henk", 42);

    System.out.println(" [X] Job sent: " + job);

    rabbitTemplate.convertAndSend(jobRequests.getName(), job);

    return "index";
    }
}

然后在客户端接收代码

@Component
public class Receiver {

@Autowired
private JobProcessor jobProcessor;

@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

@RabbitListener(queues = "jobRequests")
public void receive(Job job) throws InterruptedException, IOException, TimeoutException {

    Collection<MessageListenerContainer> messageListenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();

    for (MessageListenerContainer listenerContainer :messageListenerContainers) {
        System.out.println(listenerContainer);
        listenerContainer.stop();
    }

    System.out.println(" [x] Received '" + job + "'");
    jobProcessor.processJob(job);

    for (MessageListenerContainer listenerContainer :messageListenerContainers) {
        listenerContainer.start();
    }
   }
}

以及更新的作业处理器

@Service
public class JobProcessor {

public boolean processJob(Job job) throws InterruptedException, IOException {

    System.out.println(" [x] Processing job: " + job);

    return true;
}

}

还有堆栈跟踪

[x] Received 'Job{application=com.olifarm.application.Application@aaa517, name='henk', id=42}'
[x] Processing job: Job{application=com.olifarm.application.Application@aaa517, name='henk', id=42}
Exception in thread "SimpleAsyncTaskExecutor-1" java.lang.NullPointerException
2015-12-18 11:17:44.494 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.isActive(SimpleMessageListenerContainer.java:838)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:93)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1301)
    at java.lang.Thread.run(Thread.java:745)
 WARN 325899 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

java.lang.NullPointerException: null
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.isActive(SimpleMessageListenerContainer.java:838) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:93) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_91]

停止侦听器的工作正常,我们确实收到了新工作,但是当尝试再次启动它时,将抛出NPE.我们检查了RabbitMQ日志,发现该连接已关闭约2秒钟,然后自动重新打开,即使我们将线程置于作业处理器中的睡眠状态也是如此.这可能是问题的根源吗?该错误不会破坏程序,但是在抛出该错误后,接收方仍然可以接收新作业.我们是在这里滥用机制还是这个有效代码?

The stopping of the listener works and we do receive a new job but when it try's to start it again the NPE is thrown. We checked the rabbitMQ log and found that the connection is closed for about 2 seconds and then re-opened automatically even if we put the thread in sleep in the job processor. This might be the source of the problem? The error doesn't break the program however and after it is thrown the receiver is still able to receive new jobs. Are we abusing the mechanism here or is this valid code?

推荐答案

要按需获取消息,通常最好使用rabbitTemplate.receiveAndConvert()而不是侦听器.这样,您就可以完全控制何时接收消息.

To get messages on-demand, it's generally better to use rabbitTemplate.receiveAndConvert() rather than a listener; that way you completely control when you receive messages.

版本1.5开始,您可以将模板配置为在一段时间(或直到收到消息)之前阻止.否则,如果没有消息,它将立即返回null.

Starting with version 1.5 you can configure the template to block for some period of time (or until a message arrives). Otherwise it immediately returns null if there's no message.

该侦听器实际上是为消息驱动的应用程序设计的.

The listener is really designed for message-driven applications.

如果您可以在侦听器中阻止线程,直到作业完成,则将不再传递任何消息-默认情况下,容器只有一个线程.

If you can block the thread in the listener until the job completes, no more messages will be delivered - by default the container has only one thread.

如果由于某种原因在作业完成之前不能阻塞线程,则可以通过从

If you can't block the thread until the job completes, for some reason, you can stop()/start() the listener container by getting a reference to it from the Endpoint Registry.

通常最好将容器停在单独的线程上.

It's generally better to stop the container on a separate thread.

这篇关于Spring Rabbitlistner停止使用注释语法监听队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆