Spring Batch远程分块不会将消息排队,而是在本地运行? [英] spring batch remote chunking doesn't queue messages but runs it locally?
问题描述
以下是我的Spring Batch远程分块配置.我的步骤在本地运行,而不是在远程运行.我在rabbitmq中看不到消息.
Below is my configuration for spring batch remote chunking. My steps are running locally instead of remotely. I cant see messages in rabbitmq.
<beans:bean id="importExchangesChunkItemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>
<beans:bean id="importExchangesChunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>
<job id="importExchangesJob" restartable="true">
<step id="importExchangesStep" next="importEclsStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
commit-interval="${import.exchanges.commit.interval}" />
</tasklet>
</step>
</job>
<beans:bean id="passThroughItemProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" />
<rabbit:connection-factory id="connectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}"
password="${rabbitmq.password}" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin id="rmqAdmin" connection-factory="connectionFactory" />
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />
<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" />
<beans:bean id="importExchangesMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />
<amqp:outbound-channel-adapter id="importExchangesOutboundAdapter"
channel="importExchangesChannel" />
<amqp:inbound-channel-adapter id="importExchangesInboundAdapter"
connection-factory="connectionFactory" channel="importExchangesReplyChannel"
queue-names="${import.exchanges.reply.queue}" />
<amqp:inbound-channel-adapter id="importExchangesSlaveInboundAdapter"
connection-factory="connectionFactory" channel="importExchangesChannel"
queue-names="${import.exchanges.queue}" />
<amqp:outbound-channel-adapter id="importExchangesSlaveOutboundAdapter"
channel="importExchangesReplyChannel" />
<int:service-activator id="serviceActivatorExchanges"
input-channel="importExchangesChannel" output-channel="importExchangesReplyChannel"
ref="chunkProcessorChunkHandlerExchanges" method="handleChunk" />
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter" p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"/>
<beans:bean id="chunkProcessorExchanges"
class="org.springframework.batch.core.step.item.SimpleChunkProcessor"
p:itemWriter-ref="importExchangesItemWriter" p:itemProcessor-ref="passThroughItemProcessor"/>
<beans:bean id="chunkProcessorChunkHandlerExchanges"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler"
p:chunkProcessor-ref="chunkProcessorExchanges" />
已将配置更改为此,现在它一次将单个消息排队,并且不处理多个消息(应该处理的消息数量等于侦听器并发性).
Changed configuration to this, now it queue single message at a time and doesn't process multiple (should process number of messages equal to listener concurrency).
<beans:bean id="simpleThreadScope"
class="org.springframework.context.support.SimpleThreadScope" />
<util:map id="scopesMap">
<beans:entry key="thread" value-ref="simpleThreadScope" />
</util:map>
<beans:bean
class="org.springframework.beans.factory.config.CustomScopeConfigurer"
p:scopes-ref="scopesMap" />
<int:channel id="importExchangesChannel" />
<int:channel id="importExchangesReplyChannel" scope="thread">
<int:queue />
</int:channel>
<beans:bean id="importExchangesMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importExchangesChannel" p:receiveTimeout="${import.exchanges.reply.timeout}" />
<amqp:outbound-channel-adapter
amqp-template="amqpTemplate" channel="importExchangesChannel"
exchange-name="${import.exchanges.exchange}" routing-key="${import.exchanges.routing.key}" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="${import.exchanges.listener.concurrency}"
requeue-rejected="false" prefetch="1">
<rabbit:listener queues="${import.exchanges.queue}"
ref="importExchangesChunkHandler" method="handleChunk" />
</rabbit:listener-container>
<int:channel id="importEclsChannel" />
<int:channel id="importEclsReplyChannel" scope="thread">
<int:queue />
</int:channel>
<beans:bean id="importEclsMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importEclsChannel" p:receiveTimeout="${import.ecls.reply.timeout}" />
<amqp:outbound-channel-adapter
amqp-template="amqpTemplate" channel="importEclsChannel"
exchange-name="${import.ecls.exchange}" routing-key="${import.ecls.routing.key}" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="${import.ecls.listener.concurrency}"
requeue-rejected="false" prefetch="1">
<rabbit:listener queues="${import.ecls.queue}"
ref="importEclsChunkHandler" method="handleChunk" />
</rabbit:listener-container>
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter"
p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}" />
<beans:bean id="importExchangesChunkItemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step" p:messagingOperations-ref="importExchangesMessagingTemplate"
p:replyChannel-ref="importExchangesReplyChannel">
</beans:bean>
<beans:bean id="importExchangesChunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean"
p:chunkWriter-ref="importExchangesChunkItemWriter" p:step-ref="importExchangesStep">
</beans:bean>
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />
<rabbit:direct-exchange name="${import.exchanges.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${import.exchanges.queue}"
key="${import.exchanges.routing.key}" />
</rabbit:bindings>
</rabbit:direct-exchange>
我一次只能在队列中看到1条消息.我应该发送消息= $ {import.exchanges.commit.interval},所有消息都应由并发侦听器接收并进行并行处理.
I can see only 1 message in queue at a time. I should sent messages = ${import.exchanges.commit.interval} and all should be picked up by concurrent listeners and processed parallely.
推荐答案
我不确定本地运行"是什么意思,但是出站适配器上没有任何路由信息.如果Rabbit不知道如何路由消息,他只会丢弃它们.
I am not sure what you mean by "running locally" but you don't have any routing information on the outbound adapters; if rabbit doesn't know how to route messages, he simply drops them.
您需要将routing-key="${import.exchanges.queue}"
和routing-key="${import.exchanges.reply.queue}"
添加到适配器.这将使用默认交换("),其中使用队列的名称绑定队列.
You need to add routing-key="${import.exchanges.queue}"
and routing-key="${import.exchanges.reply.queue}"
to the adapters. This will use the default exchange ("") where the queues are bound using their names.
此外,您不能在两侧使用相同的频道名称(importExchangesChannel
).这样,出站适配器和服务激活器都将被订阅,并且消息将以循环方式进行分发.
Also, you can't use the same channel name on both sides (importExchangesChannel
). That way, the outbound adapter and service activator will both be subscribed and messages will be distributed in round-robin fashion.
因此,一些块将在本地运行;其他由于路由密钥问题而被丢弃.
So, some chunks will run locally; others will be dropped because of the routing key problem.
您需要修复路由密钥,并在服务端使用其他渠道.
You need to fix the routing key and use a different channel on service side.
这篇关于Spring Batch远程分块不会将消息排队,而是在本地运行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!