_AMQ_GROUP_ID present in message but JMSXGroupID null in @JmsListener
Problem description
From this documentation:
Messages in a message group share the same group id, i.e. they have same group identifier property (JMSXGroupID for JMS, _AMQ_GROUP_ID for Apache ActiveMQ Artemis Core API).
I can see why the property originally set via JMSXGroupID shows up as _AMQ_GROUP_ID, with a value of product=paper, when I browse the messages in the broker. However, in my @JmsListener-annotated method the _AMQ_GROUP_ID property is missing, and JMSXGroupID comes through as null in the Message's headers map.
@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
        containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)
So:
- My producer application sends the message to the queue after setting the string property JMSXGroupID to 'product=paper'.
- When I browse that message's headers in the Artemis UI, _AMQ_GROUP_ID has a value of 'product=paper'.
- When I debug my listener application and look at the map of headers, _AMQ_GROUP_ID is absent and JMSXGroupID has a value of null instead of 'product=paper'.
Is the character '=' invalid, or is there something else that could cause this? I'm running out of things to try.
Update, with new code:
HeaderMapper:
@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {
    @Override
    public MessageHeaders toHeaders(Message jmsMessage) {
        MessageHeaders messageHeaders = super.toHeaders(jmsMessage);
        Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);
        try {
            messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
        } catch (JMSException e) {
            e.printStackTrace();
        }
        // can see while debugging that this returns the correct headers
        return new MessageHeaders(messageHeadersMap);
    }
}
Listener:
@Component
public class CustomSpringJmsListener {
    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example",
            containerFactory = "myContainerFactory", concurrency = "15-15")
    public void receive(Message message) throws JMSException {
        LOG.info("Received message: " + message);
    }
}
Application code:
@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {
    private static Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);
        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) {
        LOG.info("EXECUTING : command line runner");
        jmsTemplate.setPubSubDomain(true);
        createAndSendObjectMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);
            message.setStringProperty("JMSXGroupID", "product=paper");
            return message;
        });
    }

    // BEANS
    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setMessageConverter(messagingMessageConverter());
        return factory;
    }

    @Bean
    public MessagingMessageConverter messagingMessageConverter() {
        return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
    }
}
Stack trace of where SimpleJmsHeaderMapper is being called:
toHeaders:130, SimpleJmsHeaderMapper (org.springframework.jms.support)
toHeaders:57, SimpleJmsHeaderMapper (org.springframework.jms.support)
extractHeaders:148, MessagingMessageConverter (org.springframework.jms.support.converter)
access$100:466, AbstractAdaptableMessageListener$MessagingMessageConverterAdapter (org.springframework.jms.listener.adapter)
getHeaders:552, AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage (org.springframework.jms.listener.adapter)
resolveArgumentInternal:68, HeaderMethodArgumentResolver (org.springframework.messaging.handler.annotation.support)
resolveArgument:100, AbstractNamedValueMethodArgumentResolver (org.springframework.messaging.handler.annotation.support)
resolveArgument:117, HandlerMethodArgumentResolverComposite (org.springframework.messaging.handler.invocation)
getMethodArgumentValues:148, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invoke:116, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invokeHandler:114, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter)
onMessage:77, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter)
doInvokeListener:736, AbstractMessageListenerContainer (org.springframework.jms.listener)
invokeListener:696, AbstractMessageListenerContainer (org.springframework.jms.listener)
doExecuteListener:674, AbstractMessageListenerContainer (org.springframework.jms.listener)
doReceiveAndExecute:318, AbstractPollingMessageListenerContainer (org.springframework.jms.listener)
receiveAndExecute:257, AbstractPollingMessageListenerContainer (org.springframework.jms.listener)
invokeListener:1190, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener)
executeOngoingLoop:1180, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener)
run:1077, DefaultMessageListenerContainer$AsyncMessageListenerInvoker (org.springframework.jms.listener)
run:748, Thread (java.lang)
Recommended answer
Try subclassing SimpleJmsHeaderMapper and overriding toHeaders(). Call super.toHeaders(), create a new Map<> from the result, put() any additional headers you want into the map, and return a new MessageHeaders built from the map.
Pass the custom mapper into a new MessagingMessageConverter and pass that into the container factory.
If you are using Spring Boot, simply add the converter as a @Bean and Boot will auto-wire it into the factory.
EDIT
After all this, I just wrote an app and it works just fine for me without any customization at all...
@SpringBootApplication
public class So58399905Application {

    public static void main(String[] args) {
        SpringApplication.run(So58399905Application.class, args);
    }

    @JmsListener(destination = "foo")
    public void listen(String in, MessageHeaders headers) {
        System.out.println(in + headers);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> template.convertAndSend("foo", "bar", msg -> {
            msg.setStringProperty("JMSXGroupID", "product=x");
            return msg;
        });
    }
}
and
bar{jms_redelivered=false, JMSXGroupID=product=x, jms_deliveryMode=2, JMSXDeliveryCount=1, ...
EDIT2
It's a bug in the Artemis client: with 2.6.4 (Boot 2.1.9), only getStringProperty() returns the value of the _AMQ_GROUP_ID property when getting JMSXGroupID.
The mapper uses getObjectProperty(), which returned null. With the 2.10.1 client, the message properly returns the value of the _AMQ_GROUP_ID property from getObjectProperty().
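If upgrading the client is not an option, the behaviour described in EDIT2 suggests a fallback: when the mapped JMSXGroupID header is null, read it again through getStringProperty(). The following is only a minimal, library-free sketch of that idea. PropertyReader, LegacyClientMessage and withGroupId are hypothetical names invented for illustration; they stand in for javax.jms.Message and for the body of a toHeaders() override, and the simulated "legacy" behaviour is an assumption based solely on the description above.

```java
import java.util.HashMap;
import java.util.Map;

public class GroupIdFallbackSketch {

    /** Hypothetical stand-in for the two javax.jms.Message accessors discussed above. */
    interface PropertyReader {
        Object getObjectProperty(String name);
        String getStringProperty(String name);
    }

    /** Simulates the behaviour reported for the 2.6.4 client (an assumption, not real client code). */
    static final class LegacyClientMessage implements PropertyReader {
        private final String groupId;

        LegacyClientMessage(String groupId) {
            this.groupId = groupId;
        }

        @Override
        public Object getObjectProperty(String name) {
            // In this simulation the generic accessor never resolves JMSXGroupID.
            return null;
        }

        @Override
        public String getStringProperty(String name) {
            // Only the String accessor resolves the group id.
            return "JMSXGroupID".equals(name) ? groupId : null;
        }
    }

    /** Copies the already-mapped headers and fills in JMSXGroupID when it is null or absent. */
    static Map<String, Object> withGroupId(Map<String, Object> mappedHeaders, PropertyReader message) {
        Map<String, Object> headers = new HashMap<>(mappedHeaders);
        if (headers.get("JMSXGroupID") == null) {
            String groupId = message.getStringProperty("JMSXGroupID");
            if (groupId != null) {
                headers.put("JMSXGroupID", groupId);
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> mapped = new HashMap<>();
        mapped.put("JMSXGroupID", null); // what the listener observed
        mapped.put("jms_redelivered", false);

        Map<String, Object> fixed = withGroupId(mapped, new LegacyClientMessage("product=paper"));
        System.out.println(fixed.get("JMSXGroupID")); // prints "product=paper"
    }
}
```

In a real application the same check would presumably live inside a SimpleJmsHeaderMapper subclass like the one in the question, with the JMSException from getStringProperty() handled there. A header that the mapper already populated is left untouched, so the fallback should be harmless once the client is upgraded.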