从m子将消息存储在rabbitmq中 [英] to store a message in rabbitmq from mule

查看:79
本文介绍了从m子将消息存储在rabbitmq中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面是我的流程,我正在尝试将salesforce批次信息存储到Rabbit MQ中的队列中

Below is my flow and i'm trying to store my salesforce batch information into a queue in rabbit mq

<flow name="foreachsimilar_pmFlow1" doc:name="foreachsimilar_pmFlow1">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP"/>
        <sfdc:create-job config-ref="Salesforce1" type="HRISASI__c" operation="insert" doc:name="Salesforce"/>
        <set-variable variableName="batchID" value="1" doc:name="Variable"/>
        <set-property propertyName="jobInfo" value="#[payload]" doc:name="Property"/>
        <set-variable variableName="jobId" value="#[payload.id]" doc:name="Variable"/>
        <jdbc-ee:outbound-endpoint exchange-pattern="request-response" queryKey="id" queryTimeout="-1" connector-ref="Database1" doc:name="Database"/>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy"><![CDATA[payload.collect { it.EmpId }.collate(3).collect { [min: it[0], max: it[-1]] }
                       ]]></scripting:script>
        </scripting:transformer>
        <foreach doc:name="For Each">
            <set-variable variableName="bulkPayload" value="#[groovy: return[];]" doc:name="bulkPayload EmptyArray"/>
            <set-variable variableName="IDs" value="#[groovy:return[];]" doc:name="Id EmptyArray"/>
            <set-variable variableName="jdbdinsbatch" value="#[groovy: return[];]" doc:name="Variable"/>
            <jdbc-ee:outbound-endpoint exchange-pattern="request-response" queryTimeout="-1" connector-ref="Database1" doc:name="Database" queryKey="all"/>
            <foreach doc:name="For Each">
            <set-variable variableName="empId" value="#[payload['EmpId']]" doc:name="Variable"/>
            <logger message="#[payload]" level="INFO" doc:name="Logger"/>
            <data-mapper:transform config-ref="map_to_hrisasi__c" doc:name="Map To HRISASI__c"/>
            <scripting:transformer doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[payloadMap = payload[0];
IDs = flowVars['IDs'];
IDs.add( [ EmpId: flowVars['EmpId'] ] );
flowVars['IDs'] = IDs;
jdbdinsbatch= flowVars['jdbdinsbatch'];
jdbdinsbatch.add( [ EmpId: flowVars['EmpId'], batchID: flowVars['batchID'] ] );
flowVars['jdbdinsbatch'] = jdbdinsbatch;
return [ payloadMap ];]]></scripting:script>
            </scripting:transformer>
            <set-variable variableName="bulkPayload" value="#[groovy: bulkPayload = flowVars['bulkPayload']; bulkPayload.add(payload[0]); return bulkPayload;]" doc:name="bulkPayload"/>
        </foreach>
        <set-payload value="#[flowVars['bulkPayload']]" doc:name="Set Payload"/>
        <sfdc:create-batch config-ref="Salesforce1"  doc:name="Salesforce">
                <sfdc:job-info ref="#[message.outboundProperties['jobInfo']]"/>
                <sfdc:objects ref="#[payload]"/>
            </sfdc:create-batch>
            <scripting:transformer doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[return [ batch: payload, IDs: flowVars['IDs'], batchid: flowVars['batchID'] ]]]></scripting:script>
            </scripting:transformer>
        <amqp:outbound-endpoint exchangeName="Salesforce-Batch" queueName="batchInfo" exchangeDurable="true" queueDurable="true" responseTimeout="10000" doc:name="AMQP"/>

         </foreach>
        <flow-ref name="foreachsimilar_pmFlow2" doc:name="Flow Reference"/>
    </flow>

抛出以下异常

Message               : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=amqp://Salesforce-Batch/amqp-queue.batchInfo, connector=AmqpConnector
{
  name=AMQP_Connector
  lifecycle=start
  this=33982399
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[amqp]
  serviceOverrides=<none>
}
,  name='endpoint.amqp.Salesforce.Batch.amqp.queue.batchInfo', mep=ONE_WAY, properties={queueDurable=true, exchangeDurable=true}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: AmqpMessage
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. invalid value in table (java.lang.IllegalArgumentException)
  com.rabbitmq.client.impl.Frame:306 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=amqp://Salesforce-Batch/amqp-queue.batchInfo, connector=AmqpConnector
{
  name=AMQP_Connector
  lifecycle=start
  this=33982399
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[amqp]
  serviceOverrides=<none>
}
,  name='endpoint.amqp.Salesforce.Batch.amqp.queue.batchInfo', mep=ONE_WAY, properties={queueDurable=true, exchangeDurable=true}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: AmqpMessage (org.mule.api.transport.DispatchException)
  org.mule.transport.AbstractMessageDispatcher:109 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.IllegalArgumentException: invalid value in table
    at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306)
    at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246)
    at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120)
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)

请问一下为什么会抛出上述异常,并且应该如何解决以解决过去两天发生的异常

Could somebody please tell as why is the above exception thrown and what should be done to resolve it struck with this for the past two days

谢谢.

下面是启用详细异常日志记录后的堆栈跟踪

below is the stack trace after enabling verbose exception logging

********************************************************************************
Root Exception stack trace:
java.lang.IllegalArgumentException: invalid value in table
    at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306)
    at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246)
    at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120)
    at com.rabbitmq.client.impl.ContentHeaderPropertyWriter.writeTable(ContentHeaderPropertyWriter.java:98)
    at com.rabbitmq.client.AMQP$BasicProperties.writePropertiesTo(AMQP.java:1782)
    at com.rabbitmq.client.impl.AMQContentHeader.writeTo(AMQContentHeader.java:51)
    at com.rabbitmq.client.impl.AMQContentHeader.toFrame(AMQContentHeader.java:78)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:106)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292)
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:636)
    at org.mule.transport.amqp.AmqpMessageDispatcher$OutboundAction$1.run(AmqpMessageDispatcher.java:55)
    at org.mule.transport.amqp.AmqpMessageDispatcher.doOutboundAction(AmqpMessageDispatcher.java:172)
    at org.mule.transport.amqp.AmqpMessageDispatcher.doDispatch(AmqpMessageDispatcher.java:127)
    at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:99)
    at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2627)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:101)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.endpoint.outbound.OutboundResponsePropertiesMessageProcessor.process(OutboundResponsePropertiesMessageProcessor.java:39)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:50)
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:47)
    at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:20)
    at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:58)
    at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:48)
    at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:54)
    at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:44)
    at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(I...
********************************************************************************

推荐答案

您要发送给Rabbit的有效负载看起来非常复杂(sfdc BatchInfo对象).除了队列消费者可以理解的以外,您还如何?我认为,由于某种原因,将有效负载转换为适合AMQP传输的字节数组所涉及的消息转换器失败了,您最终尝试将带有sfdc BatchInfo对象的常规生成的Map发送给AMQP客户端库.

The payload you're trying to send to Rabbit looks quite complex (sfdc BatchInfo object). How do you except it to be readable for the queue consumer? I think that, for some reason, the message transformers involved in transforming this payload to a byte array suited for AMQP transport fails and you end up trying to send the groovy generated Map with an sfdc BatchInfo object as is to the AMQP client lib.

在将地图内容发送到Rabbit之前,应将其内容转换为字符串. 这样的事情(在BatchInfo对象上调用toString,然后将映射转换为JSON):

You should convert the map with contents to a string before sending it to Rabbit. Something like this (call toString on the BatchInfo object and then convert the map to JSON):

已编辑(已删除出站属性)

<scripting:transformer doc:name="Groovy">
    <scripting:script engine="Groovy"><![CDATA[return [ batch: payload.toString(), IDs: flowVars['IDs'], batchid: flowVars['batchID'] ]]]></scripting:script>
</scripting:transformer>
<json:object-to-json-transformer doc:name="Object to JSON"/>
<amqp:outbound-endpoint exchangeName="Salesforce-Batch" queueName="batchInfo" exchangeDurable="true" queueDurable="true" responseTimeout="10000" doc:name="AMQP">
    <message-properties-transformer scope="outbound">
        <delete-message-property key="*" />
    </message-properties-transformer>
</amqp:outbound-endpoint>

这篇关于从m子将消息存储在rabbitmq中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆