How to execute some partition step on all servers only once using Spring Batch partitioning?

Problem description

I am using Spring Batch partitioning. I read exchanges from files and do some processing for each exchange.

The exchanges are distributed over 4 servers for parallel processing using Spring Batch partitioning.

I have a first step that prepares input files with exchange IDs. I need to read these IDs on all servers.

Is there any way to run the first step only once on each server, so that the input files are prepared on all servers?

I tried setting the grid size to 4 (the number of servers) and the consumer concurrency to 1, so that on each server only one consumer listens for step execution requests.

The problem is that more than one request is handled by a single consumer, so the step runs more than once on some servers and doesn't run at all on others. As a result, the data is not prepared on some servers and the subsequent steps fail.

How can I make sure the step runs exactly once on each server?

Below is the configuration.

The import job has prepareExchangesListJob as its first step, which should work as explained above, and importExchanges as its second step, which is a normal partitioned job. After importExchanges there are many more steps, all of which are normal partitioned steps.

<job id="importJob">
    <step id="import.prepareExchangesListStep" next="import.importExchangesStep">
        <job ref="prepareExchangesListJob" />
    </step>
    <step id="import.importExchangesStep">
        <job ref="importExchangesJob" />
        <listeners>
            <listener ref="importExchangesStepNotifier" />
        </listeners>
    </step>
</job>

The prepareExchangesListJob. Note the grid size = 4 (number of servers) and consumer concurrency = 1, intended to make the step execute only once on each server so that the input data (exchanges) is prepared on all servers.

<rabbit:template id="prepareExchangesListAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="prepareExchangesListQueue"
    reply-timeout="${prepare.exchanges.list.step.timeout}">
</rabbit:template>

<int:channel id="prepareExchangesListOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="prepareExchangesListInboundStagingChannel" />

<amqp:outbound-gateway request-channel="prepareExchangesListOutboundChannel"
    reply-channel="prepareExchangesListInboundStagingChannel"
    amqp-template="prepareExchangesListAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="prepareExchangesListMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="prepareExchangesListOutboundChannel"
    p:receiveTimeout="${prepare.exchanges.list.step.timeout}" />


<beans:bean id="prepareExchangesListPartitioner"
    class="org.springframework.batch.core.partition.support.SimplePartitioner"
    scope="step" />


<beans:bean id="prepareExchangesListPartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="prepareExchangesListStep" p:gridSize="${prepare.exchanges.list.grid.size}"
    p:messagingOperations-ref="prepareExchangesListMessagingTemplate" />

<int:aggregator ref="prepareExchangesListPartitionHandler"
    send-partial-result-on-expiry="true"
    send-timeout="${prepare.exchanges.list.step.timeout}"
    input-channel="prepareExchangesListInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="1"
    request-channel="prepareExchangesListInboundChannel" reply-channel="prepareExchangesListOutboundStagingChannel"
    queue-names="prepareExchangesListQueue" connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="prepareExchangesListInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="prepareExchangesListInboundChannel" output-channel="prepareExchangesListOutboundStagingChannel" />

<int:channel id="prepareExchangesListOutboundStagingChannel" />

<beans:bean id="prepareExchangesFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="classpath:primary_markets.txt"
    p:lineMapper-ref="stLineMapper" scope="step" />


<beans:bean id="prepareExchangesItemWriter"
    class="com.st.batch.foundation.writers.PrepareExchangesItemWriter"
    p:dirPath="${spring.tmp.batch.dir}/#{jobParameters[batch_id]}" p:numberOfFiles="4" 
    p:symfony-ref="symfonyStepScoped" scope="step" />


<step id="prepareExchangesListStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="prepareExchangesFileItemReader" writer="prepareExchangesItemWriter" commit-interval="${prepare.exchanges.commit.interval}"/>
    </tasklet>
</step>

<job id="prepareExchangesListJob" restartable="true">
    <step id="prepareExchangesListStep.master">
        <partition partitioner="prepareExchangesListPartitioner"
            handler="prepareExchangesListPartitionHandler" />
    </step>
</job>

The importExchangesJob:

<rabbit:template id="importExchangesAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="importExchangesQueue"
    reply-timeout="${import.exchanges.partition.timeout}">
</rabbit:template>

<int:channel id="importExchangesOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="importExchangesInboundStagingChannel" />

<amqp:outbound-gateway request-channel="importExchangesOutboundChannel"
    reply-channel="importExchangesInboundStagingChannel" amqp-template="importExchangesAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesOutboundChannel"
    p:receiveTimeout="${import.exchanges.partition.timeout}" />


<beans:bean id="importExchangesPartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="importExchangesStep" p:gridSize="${import.exchanges.grid.size}"
    p:messagingOperations-ref="importExchangesMessagingTemplate" />

<int:aggregator ref="importExchangesPartitionHandler"
    send-partial-result-on-expiry="true"
    send-timeout="${import.exchanges.step.timeout}"
    input-channel="importExchangesInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${import.exchanges.consumer.concurrency}"
    request-channel="importExchangesInboundChannel" reply-channel="importExchangesOutboundStagingChannel"
    queue-names="importExchangesQueue" connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="importExchangesInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="importExchangesInboundChannel" output-channel="importExchangesOutboundStagingChannel" />

<int:channel id="importExchangesOutboundStagingChannel" />


<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.writers.ImportExchangesAndEclsItemWriter"
    p:symfony-ref="symfonyStepScoped" p:timeout="${import.exchanges.item.timeout}"
    scope="step" />

<beans:bean id="importExchangesPartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges/exchanges_*.txt"
    scope="step" />

<beans:bean id="importExchangesFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
    scope="step" />

<step id="importExchangesStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter" commit-interval="${import.exchanges.commit.interval}"/>
    </tasklet>
</step>

<job id="importExchangesJob" restartable="true">
    <step id="importExchangesStep.master">
        <partition partitioner="importExchangesPartitioner"
            handler="importExchangesPartitionHandler" />
    </step>
</job>

Recommended answer

Interesting trick.

I would expect the four partitions to be distributed evenly; Rabbit typically does round-robin distribution to competing consumers (AFAIK). So I am not exactly sure why you're not seeing that behavior.

You could spend some time trying to figure it out, but it's fragile in that you are relying on this; if one of the slaves had a network glitch, its partition would go to one of the others. It would be better to have each slave bind to a different queue and explicitly route the partitions by adding a routing key expression to the (first) outbound gateway...

routing-key-expression="'foo.' + headers['sequenceNumber']"

and have the slaves listen on foo.1, foo.2 etc., and continue to use a common queue for the second step.
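
A minimal sketch of both halves of that suggestion, applied to the prepareExchangesList configuration from the question. It assumes the default exchange, one queue per server named foo.<n>, and a hypothetical per-server property ${server.partition.index} that you would set to a different value on each of the four servers. Check which sequenceNumber values MessageChannelPartitionHandler actually puts on the requests in your Spring Batch version (for example by logging the headers) and name the queues to match: a request whose routing key matches no queue is dropped by the default exchange, and the master would only notice when its timeouts expire.

```xml
<!-- Master side: same gateway as in the question, plus routing-key-expression,
     so each partition request goes to the queue named 'foo.' + its sequence number. -->
<amqp:outbound-gateway request-channel="prepareExchangesListOutboundChannel"
    reply-channel="prepareExchangesListInboundStagingChannel"
    amqp-template="prepareExchangesListAmqpTemplate"
    routing-key-expression="'foo.' + headers['sequenceNumber']"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />

<!-- Slave side (on every server): declare this server's own queue.
     ${server.partition.index} is a hypothetical per-server property.
     The declaration needs a RabbitAdmin; omit rabbit:admin if the context already has one. -->
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue id="prepareExchangesListServerQueue" name="foo.${server.partition.index}" />

<!-- Listen only on this server's queue instead of the shared prepareExchangesListQueue. -->
<amqp:inbound-gateway concurrent-consumers="1"
    request-channel="prepareExchangesListInboundChannel"
    reply-channel="prepareExchangesListOutboundStagingChannel"
    queue-names="foo.${server.partition.index}" connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
```

Only the prepareExchangesList step is routed this way; importExchanges and the later partitioned steps keep the shared importExchangesQueue, so their partitions are still load-balanced across whichever consumers are available.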

This assumes you are using the default exchange ("") and route by queue name; if you have explicit bindings, you would use those in your routing key expression.
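
With explicit bindings instead of the default exchange, a sketch could look like the following. The exchange name prepareExchangesListExchange, the queue name pattern prepareExchangesList.server.<n>, and the ${server.partition.index} property are all hypothetical. Each server declares its own queue and binds it to a shared direct exchange with its own key, while the master's gateway names the exchange and keeps the same routing-key-expression. As with any rabbit:* declaration, these only take effect if the context contains a rabbit:admin.

```xml
<!-- On every server: its own queue, bound to a shared direct exchange with its own key.
     Bindings declared by different servers accumulate on the same exchange. -->
<rabbit:queue id="prepareExchangesListServerQueue"
    name="prepareExchangesList.server.${server.partition.index}" />

<rabbit:direct-exchange name="prepareExchangesListExchange">
    <rabbit:bindings>
        <rabbit:binding queue="prepareExchangesListServerQueue"
            key="foo.${server.partition.index}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<!-- The server's amqp:inbound-gateway then uses
     queue-names="prepareExchangesList.server.${server.partition.index}". -->

<!-- Master: publish to the named exchange; the routing key selects the server. -->
<amqp:outbound-gateway request-channel="prepareExchangesListOutboundChannel"
    reply-channel="prepareExchangesListInboundStagingChannel"
    amqp-template="prepareExchangesListAmqpTemplate"
    exchange-name="prepareExchangesListExchange"
    routing-key-expression="'foo.' + headers['sequenceNumber']"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
```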

PS: As a reminder you need to increase the RabbitTemplate reply-timeout if your partitions take more than the default 5 seconds to complete.