在Spring Integration中处理异常时遇到麻烦 [英] Having trouble handling exceptions in Spring Integration
问题描述
另外,我需要一个单独的异常处理类以使错误消息使其成为错误队列,或者我可以在我的转换方法中抛出异常?
这是我的xml配置:
< beans xmlns =http://www.springframework.org/schema/beans
xmlns:int = $
xmlns:int-amqp =http://www.springframework.org/schema/integration/amqp
xmlns:rabbit = http://www.springframework.org/schema/rabbit
xmlns:xsi =http://www.w3.org/2001/XMLSchema-instance
xmlns:context =http: //www.springframework.org/schema/context
xsi:schemaLocation =http://www.springframework.org/schema/beans
http://www.springframework.org/schema/ beans / spring-beans.xsd
ht tp://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/模式/集成/ amqp
http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework .ORG /模式/兔/弹簧rabbit.xsd>
< rabbit:connection-factory id =connectionFactoryhost =bigdata-rdpusername =myuserpassword =mypass/>
< rabbit:template id =amqpTemplateconnection-factory =connectionFactory/>
< rabbit:admin connection-factory =connectionFactory/>
< rabbit:queue name =firstauto-delete =falsedurable =true/>
< rabbit:queue name =secondauto-delete =falsedurable =true/>
< rabbit:queue name =errorQueueauto-delete =falsedurable =true/>
< int:poller default =truefixed-rate =100/>
< rabbit:fanout-exchange name =second-exchangeauto-delete =truedurable =true>
< rabbit:bindings>
< rabbit:binding queue =second/>
< / rabbit:bindings>
< / rabbit:fanout-exchange>
< rabbit:fanout-exchange name =error-exchangeauto-delete =truedurable =true>
< rabbit:bindings>
< rabbit:binding queue =errorQueue/>
< / rabbit:bindings>
< / rabbit:fanout-exchange>
< int-amqp:outbound-channel-adapter channel =messageOutputChannelexchange-name =second-exchangeamqp-template =amqpTemplate/>
< int-amqp:inbound-channel-adapter channel =messageInputChannelerror-channel =errorInputChannelqueue-names =firstconnection-factory =connectionFactoryconcurrent-consumers = 20/>
< int-amqp:outbound-channel-adapter channel =errorOutputChannelexchange-name =error-exchangeamqp-template =amqpTemplate/>
< int:channel id =messageInputChannel/>
< int:channel id =messageOutputChannel/>
< int:channel id =errorInputChannel/>
< int:service-activator input-channel =errorInputChanneloutput-channel =errorOutputChannelmethod =handleError>
< bean class =firstAttempt.MessageErrorHandler/>
< int:chain input-channel =messageInputChanneloutput-channel =messageOutputChannel>
< int:header-enrichher>
< int:error-channel ref =errorInputChannel/>
< / int:header-enrichher>
< int:transformer method =convert>
< bean class =firstAttempt.JsonObjectConverter/>
< / int:transformer>
< int:service-activator method =transform>
< bean class =firstAttempt.Transformer/>
< / int:service-activator>
< int:object-to-string-transformer />
< / int:chain>
< / beans>
错误类:
public class ErrorHandler {
public String errorHandle(MessageHandlingException exception){
return exception.getMessage();
QualityScorer类(由变压器调用):
public class QualityScorer {
private Hashtable< String,String>表;
private final static String csvFile =C:\\Users\\john\\Test.csv;
public QualityScorer()throws Exception {
table = new Hashtable< String,String>();
initializeTable();
}
private void initializeTable()throws异常{
BufferedReader br = null;
String line =;
String cvsSplitBy =,;
try {
br = new BufferedReader(new FileReader(csvFile));
while((line = br.readLine())!= null){
String [] data = line.split(cvsSplitBy);
if(data.length> 6&& data [1] .equals(1)&& data [4] .equals(0)&& data [5] .equals(1))
table.putIfAbsent(data [3],data [1]);
}
} catch(FileNotFoundException e){
throw new Exception(No file found);
} catch(IOException e){
e.printStackTrace();
} finally {
if(br!= null){
try {
br.close();
} catch(IOException e){
e.printStackTrace();
}
}
}
}
public float getScore(JSONObject object)throws Exception {
float score;
if(object == null){
throw new IllegalArgumentException(object);
}
if(!object.has(source)){
throw new Exception(Object does not have a source);
}
如果(!object.has(雇主)){
抛出新的异常(对象没有雇主);
}
String source = object.getString(Source);
String employer = object.getString(employer);
if(table.containsKey(employer)&!source.equals(packageOne)){
score = 1;
} else {
score = -1;
}
返回分数;
}
}
现在,正在加载的邮件没有源,所以程序应该将MessagingException抛出到MessageErrorHandler中。
变形金刚代码:
public class Transformer {
私人QualityScorer qualityScorer;
public Transformer()throws Exception {
qualityScorer = new QualityScorer();
}
public JSONObject transform(JSONObject object)throws Exception {
float score = qualityScorer.getScore(object);
object.put(score,score);
返回对象;
}
}
所有在一起,程序应该收到一个预加载来自队列的消息,将其转换并发送到第二个队列,如果在预加载的消息中提供源,则该队列成功完成。我试图处理错误,使它们被发送到作为消息头的错误队列。这个问题一直让我感到沮丧,所以非常感谢!
stacktrace中当前显示的错误是:
java.lang.NoSuchMethodError:org.springframework.messaging.MessageHandlingException:方法< init>(Lorg / springframework / messaging / Message; Ljava / lang / Throwable;)V在org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
$ org.springframework.integration.handler.MethodInvokingMessageProcessor.pro $ b org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) .springframework.integration.handler.MessageHandlerChain $ 1.send(MessageHandlerChain.java:129)
在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
在org.springframework .messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
在org.springframework.integration.handler .AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
在org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal (AbstractReplyProducingMessageHandler.java:115)
在org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.j ava:127)
在org.springframework.integration.handler.MessageHandlerChain $ 1.send(MessageHandlerChain.java:129)
在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114 )
在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
在org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
在org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
在org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
在org。 springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
在org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.j ava:115)
在org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
在org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
(在org.springframework.integration.handler.AbstractMessageHandler.handleMessage在org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
在org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
在org.springframework .integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
在org.springframework.integration.channel .AbstractMess ageChannel.send(AbstractMessageChannel.java:373)
在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
在org.springframework.messaging.core.GenericMessagingTemplate.doSend( GenericMessagingTemplate.java:44)
在org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
在org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java: 188)
在org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access $ 1100(AmqpInboundChannelAdapter.java:56)
在org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter $ Listener.processMessage(AmqpInboundChannelAdapter .java:246)
在org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter $ Listener.onMessage(AmqpInboundChannelAdapter.java:203)
在org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListene r(AbstractMessageListenerContainer.java:822)
在org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer。访问$ 001(SimpleMessageListenerContainer.java:97)
在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer $ 1.invokeListener(SimpleMessageListenerContainer.java:189)
在org.springframework.amqp.rabbit.listener。 SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
在org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
在org.springframework.amqp.rabbit.listener。 SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContain er.java:1189)
在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access $ 1500(SimpleMessageListenerContainer.java:97)
在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer $ AsyncMessageProcessingConsumer .run(SimpleMessageListenerContainer.java:1421)
在java.lang.Thread.run(Thread.java:748)
但是没有什么是错误队列。
当抛出异常时,它被包装连同 requestMessage
到 MessagingException
。您自己的业务例外是在的原因
中,您可以从中访问
属性。 requestMessage
MessagingException.failedMessage
所以,看起来你有一切你需要的用例。
只有在发送到错误交换
之前,您真的应该有一些< transformer>
在错误流程中将 MessagingException
正确转换为正确的消息发送到AMQP。
I'm new to spring integration and am confused about how to send error messages to a designated error queue. I want the error message to be a header on the original message and end up in a separate queue. I read that this can be done with a header enricher, which I tried to implement but nothing is showing up in the error queue.
Also, do I need a separate exception handling class in order for the error messages to make it to the error queue or can I just throw exceptions in my transforming methods?
Here is my xml config:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/amqp
http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="first" auto-delete="false" durable="true" />
<rabbit:queue name="second" auto-delete="false" durable="true" />
<rabbit:queue name="errorQueue" auto-delete="false" durable="true" />
<int:poller default="true" fixed-rate="100"/>
<rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="second" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="errorQueue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange" amqp-template="amqpTemplate" />
<int-amqp:inbound-channel-adapter channel="messageInputChannel" error-channel="errorInputChannel" queue-names="first" connection-factory="connectionFactory" concurrent-consumers="20" />
<int-amqp:outbound-channel-adapter channel="errorOutputChannel" exchange-name="error-exchange" amqp-template="amqpTemplate" />
<int:channel id="messageInputChannel" />
<int:channel id="messageOutputChannel"/>
<int:channel id="errorInputChannel"/>
<int:service-activator input-channel="errorInputChannel" output-channel= "errorOutputChannel" method = "handleError" >
<bean class="firstAttempt.MessageErrorHandler"/>
<int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel">
<int:header-enricher>
<int:error-channel ref="errorInputChannel" />
</int:header-enricher>
<int:transformer method = "convert" >
<bean class="firstAttempt.JsonObjectConverter" />
</int:transformer>
<int:service-activator method="transform">
<bean class="firstAttempt.Transformer" />
</int:service-activator>
<int:object-to-string-transformer />
</int:chain>
</beans>
Error Class:
public class ErrorHandler {
public String errorHandle(MessageHandlingException exception) {
return exception.getMessage();
QualityScorer class (called by transformer):
public class QualityScorer {
private Hashtable<String, String> table;
private final static String csvFile = "C:\\Users\\john\\Test.csv";
public QualityScorer() throws Exception {
table = new Hashtable<String, String>();
initializeTable();
}
private void initializeTable() throws Exception {
BufferedReader br = null;
String line = "";
String cvsSplitBy = ",";
try {
br = new BufferedReader(new FileReader(csvFile));
while ((line = br.readLine()) != null) {
String[] data = line.split(cvsSplitBy);
if(data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1"))
table.putIfAbsent(data[3], data[1]);
}
} catch (FileNotFoundException e) {
throw new Exception("No file found");
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public float getScore(JSONObject object) throws Exception {
float score;
if (object == null) {
throw new IllegalArgumentException("object");
}
if (!object.has("source")) {
throw new Exception("Object does not have a source");
}
if (!object.has("employer")) {
throw new Exception("Object does not have an employer");
}
String source = object.getString("Source");
String employer = object.getString("employer");
if (table.containsKey(employer) && !source.equals("packageOne")) {
score = 1;
} else {
score = -1;
}
return score;
}
}
Right now, the message being loaded has no source, so the program should be throwing the MessagingException to the MessageErrorHandler.
Transformer code:
public class Transformer {
private QualityScorer qualityScorer;
public Transformer() throws Exception {
qualityScorer = new QualityScorer();
}
public JSONObject transform(JSONObject object) throws Exception {
float score = qualityScorer.getScore(object);
object.put("score", score);
return object;
}
}
All together, the program should receive a pre-loaded message from a queue, transform it and send it on to a second queue, which it does successfully if the source is provided in the pre-loaded message. I'm trying to handle errors and make it so they are sent to an error queue as a message header. This issue has been frustrating me for awhile, so help is greatly appreciated!
The error currently being shown in the stacktrace is:
java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.lang.Thread.run(Thread.java:748)
But nothing is going to the error queue.
When the exception is thrown, it is wrapped together with the requestMessage
to the MessagingException
. Your own business exception is in the cause
and you can get access to the requestMessage
from the MessagingException.failedMessage
property.
So, it looks like you have everything you need for your use-case.
Only the problem that before sending to the error-exchange
you really should have some <transformer>
in the error flow to properly convert that MessagingException
to the proper message to send to the AMQP.
这篇关于在Spring Integration中处理异常时遇到麻烦的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!