为什么从已填充的队列中调度 Spring AMQP 消息会出现延迟? [英] Why is there a delay in Spring AMQP Message dispatching from a filled Queue?

查看:45
本文介绍了为什么从已填充的队列中调度 Spring AMQP 消息会出现延迟?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的消息驱动应用程序中使用 Spring AMQP.我注意到在我的消息侦听器的调用之间有大约 300 毫秒的几乎恒定的延迟,即使我确定队列中充满了消息.下面的日志文件显示了 BlockingQueueConsumer.nextMessageBlockingQueueConsumer.handle 之间的延迟,并从其间的另一个线程调用了 BlockingQueueConsumer.handleDelivery:>

I am using Spring AMQP in my message driven application. I noticed that there is a nearly constant delay of around 300ms between invocations of my message listener, even though I am sure that the queue is filled with messages. The logfile below shows this delay between BlockingQueueConsumer.nextMessage and BlockingQueueConsumer.handle with a call to BlockingQueueConsumer.handleDelivery from another thread in between:

2015-05-12 12:46:18,655 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:18,655 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:18,967 DEBUG [pool-1-thread-6 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:18,967 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@18dc305(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:16:06 CEST 2015, messageId=143134227498011576, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=8, messageCount=0])
2015-05-12 12:46:18,967 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:18,967 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
2015-05-12 12:46:18,967 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:18,967 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,280 DEBUG [pool-1-thread-7 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,280 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@1aaa7d8(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:17:08 CEST 2015, messageId=143134227498011584, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=9, messageCount=0])
2015-05-12 12:46:19,280 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:19,280 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
2015-05-12 12:46:19,280 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:19,280 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,577 DEBUG [pool-1-thread-3 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,577 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@1c893d2(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:18:07 CEST 2015, messageId=143134227498011592, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=10, messageCount=0])
2015-05-12 12:46:19,577 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:19,577 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done

日志文件显示了当队列肯定充满消息时的消息处理.我的 Spring 配置文件的相关部分如下所示:

The logfile shows the message processing when the queue is definately full of messages. The relevant parts of my Spring configuration file looks like this:

<rabbit:connection-factory id="amqpConnectionFactory" connection-factory="clientConnectionFactory"
    host="${amqp.broker.ip}"
    port="${amqp.broker.port}"
    virtual-host="${amqp.broker.vhost}"
    username="${amqp.user}"
    password="${amqp.password}"/>

<bean id="clientConnectionFactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:server.ini"/>
</bean>

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="amqpConnectionFactory" />
    <property name="messageConverter" ref="marshallingMessageConverter"/>
</bean>

<bean id="marshallingMessageConverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg ref="jaxbMarshaller" />
</bean>

<oxm:jaxb2-marshaller id="jaxbMarshaller" context-path="com.my.package"/>

<rabbit:listener-container id="heartbeatListenerContainer" connection-factory="amqpConnectionFactory" auto-startup="false">
    <rabbit:listener ref="queueMessageHandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>

<bean id="queueMessageHandler" class="com.my.package.QueueMessageHandler"/>

我正在努力寻找这种延迟的原因.据我了解,它起源于 Spring BlockingQueueConsumer.我不确定发生了什么以及为什么从另一个线程调用 BlockingQueueConsumer.handleDelivery 方法.

I am struggling to find the reason for this delay. As far as I understand it originates from the Spring BlockingQueueConsumer. I am not sure what is happening and why there is a call from another thread to the BlockingQueueConsumer.handleDelivery method.

非常感谢任何帮助!

推荐答案

可能是网络问题?

默认配置一次处理 1 条消息,并且在发送 ack 之前,broker 不会发送下一条消息.

The default configuration handles 1 message at a time and the next is not sent by the broker until the ack is sent.

尝试增加侦听器容器上的 prefetch 以便容器在消费者线程准备就绪时始终有可用的消息.

Try increasing the prefetch on the listener container so the container always has a message available when the consumer thread is ready.

查看网络跟踪(Wireshark 或类似工具).

Take a look at a network trace (Wireshark or similar).

如果您的网络状况不佳,并且可以忍受重复交付的可能性增加,您还可以考虑增加 txSize 以便不会为每条消息发送 ack.不过,请务必将其设置为小于 prefetch 的值.

If you have a poor network, and can live with the increased possibility of duplicate deliveries, you can also consider increasing the txSize so acks are not sent for every message. Be sure to set it to something less than prefetch, though.

这篇关于为什么从已填充的队列中调度 Spring AMQP 消息会出现延迟?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆