DefaultMessageListenerContainer没有缩放 [英] DefaultMessageListenerContainer not scaling

查看:436
本文介绍了DefaultMessageListenerContainer没有缩放的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个DefaultMessageListenerContainer,(在我看来)没有扩展。 Container被定义为侦听队列,其中有100条消息。

I have a DefaultMessageListenerContainer, which is (in my opinion) not scaling up. The Container is defined to listen on a queue, where 100 messages are located in.

我希望,Container可以使用任何长度,消息将是尽可能快地消耗(通过观察maxConcurrentConsumers配置)。所以我认为,有7个并发的消费者。 (在容器启动时由2个并发消费者开始)一些记录信息:

I would expect, that the Container is going to any lengths, that the messages would be consumed as fast as it is possible (by observing the maxConcurrentConsumers configuration). So i would assume, that there are 7 concurrentConsumers. (beginning by 2 concurrentConsumers at container-startup) Some logging-information:

activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7

我的Spring-config(其中一部分):

<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
    <property name="connectionFactory" ref="jmscfCee" />
    <property name="maxConcurrentConsumers" value="7"/>
    <property name="receiveTimeout" value="100000" />
    <property name="concurrentConsumers" value="2" />
</bean>

<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
    <property name="destinationName" value="MY.QUEUE" />
    <property name="messageListener" ref="myMessageListener" />
</bean>

<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>

我的记录容器

public class LoggingListenerContainer extends DefaultMessageListenerContainer{

private static final Logger logger = Logger
        .getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
        throws JMSException {

    logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
    logger.info("concurrentConsumers: " +  this.getConcurrentConsumers());
    logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
    logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
    logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
    logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
    super.doInvokeListener(listener, message);
}

我的听众课程:

public class ListenerClass implements MessageListener {


    public void onMessage(Message msg) {
           //Do some business function
    }

}

可能有人如此友好地纠正我的配置或给我一些关于我的配置的tipps或解释我的容器的方法? (如果我误解了某些内容)

Could someone be so kind to correct my configuration or give me some tipps concerning my configuration or explain me the approach of the Container? (if i had misunderstood something)

我在本地使用ActiveMQ进行测试(使用WebSphere MQ进行生产) - 如果它与可伸缩性主题相关。

I'm locally testing with ActiveMQ (in Production with WebSphere MQ) - if it's relevant for scalability topics.

编辑:

<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>${jmscfCee.hostName}</value>
        </property>
</bean>

<bean id="jmscfCeeCachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory ">
    <constructor-arg ref="jmscfCee" />
    <property name="sessionCacheSize" value="10" />
</bean>


推荐答案

这取决于。几年前,我在ActiveMQ上遇到了类似的问题,其默认行为极大地偏向于大量(数千)小消息。默认情况下,每个消费者都会预先获取1000个批量的消息,因此如果您有少量消息,您可能会发现它们都已经在一个消费者的预取缓冲区中结束,而其他消费者则闲置。

It depends. I had a similar issue with ActiveMQ a few years back, whereby its default behaviour is heavily opimized towards high volumes (many thousands) of small messages. By default each consumer will pre-fetch messages in batches of 1000, so if you have small numbers of messages you'll probably find they have all ended up in the pre-fetch buffer of one consumer, leaving the other consumers idle.

您可以使用预取策略,可以是连接URI,也可以是Spring配置,如果这就是你建立连接工厂的方式。

You can tune this behaviour using a prefetch policy, either on the connection URI or in the Spring configuration if that's how you're building your connection factory.

<amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost">
  <property name="prefetchPolicy">
    <amq:prefetchPolicy all="1" />
  </property>
</amq:connectionFactory>

我当时使用的ActiveMQ版本不支持预取限制为0(即不要预取,只是每次都去经纪人)但是文档建议现在允许这样做。

The version of ActiveMQ I was using at the time didn't support a prefetch limit of 0 (i.e. don't prefetch, just go to the broker every time) but the documentation suggests that this is now allowed.

这篇关于DefaultMessageListenerContainer没有缩放的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆