_AMQ_GROUP_ID present in message but JMSXGroupID null in @JmsListener


Problem description

From the documentation:

Messages in a message group share the same group id, i.e. they have the same group identifier property (JMSXGroupID for JMS, _AMQ_GROUP_ID for the Apache ActiveMQ Artemis Core API).

That explains why the property I originally set via JMSXGroupID shows up as _AMQ_GROUP_ID, with a value of product=paper, when I browse the messages in the broker. However, in my @JmsListener-annotated method the _AMQ_GROUP_ID property is missing and JMSXGroupID comes through as null in the Message's headers map.

@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
            containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)

So:

  1. My producer application sends the message to the queue after setting the string property JMSXGroupID to 'product=paper'.
  2. When I browse that message's headers in the Artemis UI, _AMQ_GROUP_ID has a value of 'product=paper'.
  3. When I debug my listener application and look at the map of headers, _AMQ_GROUP_ID is absent and JMSXGroupID has a value of null instead of 'product=paper'.

Is the character '=' invalid, or is there something else that can cause this? I'm running out of things to try.

Edit with new code:

HeaderMapper:

@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {

    @Override
    public MessageHeaders toHeaders(Message jmsMessage) {

        MessageHeaders messageHeaders = super.toHeaders(jmsMessage);

        Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);

        try {
            messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
        } catch (JMSException e) {
            e.printStackTrace();
        }

        // can see while debugging that this returns the correct headers
        return new MessageHeaders(messageHeadersMap);
    }
}

Listener:

@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example",
            containerFactory = "myContainerFactory", concurrency = "15-15")
    public void receive(Message message) throws JMSException {
        LOG.info("Received message: " + message);
    }
}

Application code:

@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);

        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) {
        LOG.info("EXECUTING : command line runner");

        jmsTemplate.setPubSubDomain(true);

        createAndSendObjectMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);

            message.setStringProperty("JMSXGroupID", "product=paper");

            return message;
        });
    }

    // BEANS

    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setMessageConverter(messagingMessageConverter());

        return factory;
    }

    @Bean
    public MessagingMessageConverter messagingMessageConverter() {
        return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
    }
}

Stack trace of where SimpleJmsHeaderMapper is being called:

toHeaders:130, SimpleJmsHeaderMapper (org.springframework.jms.support)
toHeaders:57, SimpleJmsHeaderMapper (org.springframework.jms.support)
extractHeaders:148, MessagingMessageConverter (org.springframework.jms.support.converter)
access$100:466, AbstractAdaptableMessageListener$MessagingMessageConverterAdapter (org.springframework.jms.listener.adapter)
getHeaders:552, AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage (org.springframework.jms.listener.adapter)
resolveArgumentInternal:68, HeaderMethodArgumentResolver (org.springframework.messaging.handler.annotation.support)
resolveArgument:100, AbstractNamedValueMethodArgumentResolver (org.springframework.messaging.handler.annotation.support)
resolveArgument:117, HandlerMethodArgumentResolverComposite (org.springframework.messaging.handler.invocation)
getMethodArgumentValues:148, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invoke:116, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invokeHandler:114, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter)
onMessage:77, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter)
doInvokeListener:736, AbstractMessageListenerContainer (org.springframework.jms.listener)
invokeListener:696, AbstractMessageListenerContainer (org.springframework.jms.listener)
doExecuteListener:674, AbstractMessageListenerContainer (org.springframework.jms.listener)
doReceiveAndExecute:318, AbstractPollingMessageListenerContainer (org.springframework.jms.listener)
receiveAndExecute:257, AbstractPollingMessageListenerContainer (org.springframework.jms.listener)
invokeListener:1190, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener)
executeOngoingLoop:1180, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener)
run:1077, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener)
run:748, Thread (java.lang)

Recommended answer

Try subclassing SimpleJmsHeaderMapper and overriding toHeaders(). Call super.toHeaders(), create a new Map<> from the result, put() any additional headers you want into the map, and return a new MessageHeaders built from the map.

Pass the custom mapper into a new MessagingMessageConverter and pass that into the container factory.

If you are using Spring Boot, simply add the converter as a @Bean and Boot will automatically wire it into the factory.

EDIT

After all this, I just wrote an app and it works fine for me without any customization at all:

@SpringBootApplication
public class So58399905Application {

    public static void main(String[] args) {
        SpringApplication.run(So58399905Application.class, args);
    }

    @JmsListener(destination = "foo")
    public void listen(String in, MessageHeaders headers) {
        System.out.println(in + headers);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> template.convertAndSend("foo", "bar", msg -> {
            msg.setStringProperty("JMSXGroupID", "product=x");
            return msg;
        });
    }

}

bar{jms_redelivered=false, JMSXGroupID=product=x, jms_deliveryMode=2, JMSXDeliveryCount=1,  ...

EDIT2

It's a bug in the Artemis client: with 2.6.4 (Boot 2.1.9), only getStringProperty() returns the value of the _AMQ_GROUP_ID property when getting JMSXGroupID.

The mapper uses getObjectProperty(), which returned null. With the 2.10.1 client, the message properly returns the value of the _AMQ_GROUP_ID property from getObjectProperty().
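If upgrading the client is not an option, the difference between the two getters suggests a lookup that falls back to getStringProperty() when getObjectProperty() yields null. The following is only a minimal sketch of that fallback logic in plain Java: PropertyView, affectedClient and fixedClient are hypothetical stand-ins that imitate the behaviour described above, not part of the JMS or Artemis APIs. In a real application the same check would go inside a custom toHeaders() override against the actual JMS message.

```java
public class GroupIdLookupSketch {

    /** Hypothetical stand-in for the two JMS message getters discussed above. */
    interface PropertyView {
        Object getObjectProperty(String name);
        String getStringProperty(String name);
    }

    static final String GROUP_ID = "JMSXGroupID";

    /** Prefer the object lookup; fall back to the string lookup if it yields null. */
    static String resolveGroupId(PropertyView message) {
        Object value = message.getObjectProperty(GROUP_ID);
        if (value != null) {
            return value.toString();
        }
        return message.getStringProperty(GROUP_ID);
    }

    /** Imitates the reported 2.6.4 behaviour: only the string getter sees the group id. */
    static PropertyView affectedClient(String groupId) {
        return new PropertyView() {
            @Override
            public Object getObjectProperty(String name) {
                return null;
            }

            @Override
            public String getStringProperty(String name) {
                return GROUP_ID.equals(name) ? groupId : null;
            }
        };
    }

    /** Imitates the reported 2.10.1 behaviour: both getters see the group id. */
    static PropertyView fixedClient(String groupId) {
        return new PropertyView() {
            @Override
            public Object getObjectProperty(String name) {
                return GROUP_ID.equals(name) ? groupId : null;
            }

            @Override
            public String getStringProperty(String name) {
                return GROUP_ID.equals(name) ? groupId : null;
            }
        };
    }

    public static void main(String[] args) {
        // Both simulated clients resolve to the same group id through the fallback.
        System.out.println(resolveGroupId(affectedClient("product=paper")));
        System.out.println(resolveGroupId(fixedClient("product=paper")));
    }
}
```

Because the fallback only runs when the object lookup returns null, the same code should behave identically once the client is upgraded.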
