从m子将消息存储在rabbitmq中 [英] to store a message in rabbitmq from mule
问题描述
下面是我的流程,我正在尝试将salesforce批次信息存储到Rabbit MQ中的队列中
Below is my flow and i'm trying to store my salesforce batch information into a queue in rabbit mq
<flow name="foreachsimilar_pmFlow1" doc:name="foreachsimilar_pmFlow1">
<http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP"/>
<sfdc:create-job config-ref="Salesforce1" type="HRISASI__c" operation="insert" doc:name="Salesforce"/>
<set-variable variableName="batchID" value="1" doc:name="Variable"/>
<set-property propertyName="jobInfo" value="#[payload]" doc:name="Property"/>
<set-variable variableName="jobId" value="#[payload.id]" doc:name="Variable"/>
<jdbc-ee:outbound-endpoint exchange-pattern="request-response" queryKey="id" queryTimeout="-1" connector-ref="Database1" doc:name="Database"/>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
<scripting:transformer doc:name="Groovy">
<scripting:script engine="Groovy"><![CDATA[payload.collect { it.EmpId }.collate(3).collect { [min: it[0], max: it[-1]] }
]]></scripting:script>
</scripting:transformer>
<foreach doc:name="For Each">
<set-variable variableName="bulkPayload" value="#[groovy: return[];]" doc:name="bulkPayload EmptyArray"/>
<set-variable variableName="IDs" value="#[groovy:return[];]" doc:name="Id EmptyArray"/>
<set-variable variableName="jdbdinsbatch" value="#[groovy: return[];]" doc:name="Variable"/>
<jdbc-ee:outbound-endpoint exchange-pattern="request-response" queryTimeout="-1" connector-ref="Database1" doc:name="Database" queryKey="all"/>
<foreach doc:name="For Each">
<set-variable variableName="empId" value="#[payload['EmpId']]" doc:name="Variable"/>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
<data-mapper:transform config-ref="map_to_hrisasi__c" doc:name="Map To HRISASI__c"/>
<scripting:transformer doc:name="Groovy">
<scripting:script engine="Groovy"><![CDATA[payloadMap = payload[0];
IDs = flowVars['IDs'];
IDs.add( [ EmpId: flowVars['EmpId'] ] );
flowVars['IDs'] = IDs;
jdbdinsbatch= flowVars['jdbdinsbatch'];
jdbdinsbatch.add( [ EmpId: flowVars['EmpId'], batchID: flowVars['batchID'] ] );
flowVars['jdbdinsbatch'] = jdbdinsbatch;
return [ payloadMap ];]]></scripting:script>
</scripting:transformer>
<set-variable variableName="bulkPayload" value="#[groovy: bulkPayload = flowVars['bulkPayload']; bulkPayload.add(payload[0]); return bulkPayload;]" doc:name="bulkPayload"/>
</foreach>
<set-payload value="#[flowVars['bulkPayload']]" doc:name="Set Payload"/>
<sfdc:create-batch config-ref="Salesforce1" doc:name="Salesforce">
<sfdc:job-info ref="#[message.outboundProperties['jobInfo']]"/>
<sfdc:objects ref="#[payload]"/>
</sfdc:create-batch>
<scripting:transformer doc:name="Groovy">
<scripting:script engine="Groovy"><![CDATA[return [ batch: payload, IDs: flowVars['IDs'], batchid: flowVars['batchID'] ]]]></scripting:script>
</scripting:transformer>
<amqp:outbound-endpoint exchangeName="Salesforce-Batch" queueName="batchInfo" exchangeDurable="true" queueDurable="true" responseTimeout="10000" doc:name="AMQP"/>
</foreach>
<flow-ref name="foreachsimilar_pmFlow2" doc:name="Flow Reference"/>
</flow>
抛出以下异常
Message : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=amqp://Salesforce-Batch/amqp-queue.batchInfo, connector=AmqpConnector
{
name=AMQP_Connector
lifecycle=start
this=33982399
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}
, name='endpoint.amqp.Salesforce.Batch.amqp.queue.batchInfo', mep=ONE_WAY, properties={queueDurable=true, exchangeDurable=true}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: AmqpMessage
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. invalid value in table (java.lang.IllegalArgumentException)
com.rabbitmq.client.impl.Frame:306 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=amqp://Salesforce-Batch/amqp-queue.batchInfo, connector=AmqpConnector
{
name=AMQP_Connector
lifecycle=start
this=33982399
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}
, name='endpoint.amqp.Salesforce.Batch.amqp.queue.batchInfo', mep=ONE_WAY, properties={queueDurable=true, exchangeDurable=true}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: AmqpMessage (org.mule.api.transport.DispatchException)
org.mule.transport.AbstractMessageDispatcher:109 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.IllegalArgumentException: invalid value in table
at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306)
at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246)
at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
请问一下为什么会抛出上述异常,并且应该如何解决以解决过去两天发生的异常
Could somebody please tell as why is the above exception thrown and what should be done to resolve it struck with this for the past two days
谢谢.
下面是启用详细异常日志记录后的堆栈跟踪
below is the stack trace after enabling verbose exception logging
********************************************************************************
Root Exception stack trace:
java.lang.IllegalArgumentException: invalid value in table
at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306)
at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246)
at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120)
at com.rabbitmq.client.impl.ContentHeaderPropertyWriter.writeTable(ContentHeaderPropertyWriter.java:98)
at com.rabbitmq.client.AMQP$BasicProperties.writePropertiesTo(AMQP.java:1782)
at com.rabbitmq.client.impl.AMQContentHeader.writeTo(AMQContentHeader.java:51)
at com.rabbitmq.client.impl.AMQContentHeader.toFrame(AMQContentHeader.java:78)
at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:106)
at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:636)
at org.mule.transport.amqp.AmqpMessageDispatcher$OutboundAction$1.run(AmqpMessageDispatcher.java:55)
at org.mule.transport.amqp.AmqpMessageDispatcher.doOutboundAction(AmqpMessageDispatcher.java:172)
at org.mule.transport.amqp.AmqpMessageDispatcher.doDispatch(AmqpMessageDispatcher.java:127)
at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:99)
at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2627)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:101)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.endpoint.outbound.OutboundResponsePropertiesMessageProcessor.process(OutboundResponsePropertiesMessageProcessor.java:39)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:50)
at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:47)
at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:20)
at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:58)
at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:48)
at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:54)
at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:44)
at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(I...
********************************************************************************
推荐答案
您要发送给Rabbit的有效负载看起来非常复杂(sfdc BatchInfo对象).除了队列消费者可以理解的以外,您还如何?我认为,由于某种原因,将有效负载转换为适合AMQP传输的字节数组所涉及的消息转换器失败了,您最终尝试将带有sfdc BatchInfo对象的常规生成的Map发送给AMQP客户端库.
The payload you're trying to send to Rabbit looks quite complex (sfdc BatchInfo object). How do you except it to be readable for the queue consumer? I think that, for some reason, the message transformers involved in transforming this payload to a byte array suited for AMQP transport fails and you end up trying to send the groovy generated Map with an sfdc BatchInfo object as is to the AMQP client lib.
在将地图内容发送到Rabbit之前,应将其内容转换为字符串. 这样的事情(在BatchInfo对象上调用toString,然后将映射转换为JSON):
You should convert the map with contents to a string before sending it to Rabbit. Something like this (call toString on the BatchInfo object and then convert the map to JSON):
已编辑(已删除出站属性)
<scripting:transformer doc:name="Groovy">
<scripting:script engine="Groovy"><![CDATA[return [ batch: payload.toString(), IDs: flowVars['IDs'], batchid: flowVars['batchID'] ]]]></scripting:script>
</scripting:transformer>
<json:object-to-json-transformer doc:name="Object to JSON"/>
<amqp:outbound-endpoint exchangeName="Salesforce-Batch" queueName="batchInfo" exchangeDurable="true" queueDurable="true" responseTimeout="10000" doc:name="AMQP">
<message-properties-transformer scope="outbound">
<delete-message-property key="*" />
</message-properties-transformer>
</amqp:outbound-endpoint>
这篇关于从m子将消息存储在rabbitmq中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!