Remote partition - slave getting greedy


Problem description


Following is what we are trying to achieve.


We want a big xml file to be staged in a database parallely in different vms. To achieve this, we are using the scalable spring batch remote partition approach and we are running into some issues. Following is the high level setup



  • master - splits the XML file into multiple partitions (we currently have a grid size of 3)
  • slave 1 - processes partitions (reads index-based partitions and writes to the DB)
  • slave 2 - processes partitions


We are running it on Linux with ActiveMQ 5.15.3.

With the above setup:

  • slave 1 is processing 2 partitions at the same time
  • slave 2 is processing 1 partition


The master is not waiting for all the slaves to complete and is going into an unknown state:

org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)


If we set the grid size to 2, slave 1 picks up both partitions and slave 2 does not get any of them (hence the greedy slave). Slave 1 processes them in parallel, but the job still goes into an unknown state.

Here are our questions:

  • How can we prevent slave 1 from processing two partitions in parallel? We tried setting the prefetch limit to 0 and to 1 (http://activemq.apache.org/what-is-the-prefetch-limit-for.html), but that did not work. How can we get each slave to process one partition at a time?
  • Why is the master not waiting for all the slaves?
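For reference, the usual way to apply the prefetch setting the question refers to is on the broker URL of the connection factory used by the JMS adapters, often combined with a single consumer per slave on the request adapter. The sketch below is an assumption based on the bean names in this question (`connectionFactory`, `requestsQueue`, `requestsChannel`); the broker address is a placeholder, and per the question, the prefetch change alone did not resolve the issue:

```xml
<!-- Sketch: limiting each slave to one in-flight partition message.
     The prefetch limit is set via the broker URL; concurrent-consumers
     keeps a single JMS consumer per slave. tcp://localhost:61616 is a
     placeholder for the real broker address. -->
<beans:bean id="connectionFactory"
            class="org.apache.activemq.ActiveMQConnectionFactory">
    <beans:property name="brokerURL"
                    value="tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1"/>
</beans:bean>

<int-jms:message-driven-channel-adapter connection-factory="connectionFactory"
                                        destination-name="requestsQueue"
                                        channel="requestsChannel"
                                        concurrent-consumers="1"
                                        max-concurrent-consumers="1"/>
```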

Here is our master configuration:

 <?xml version="1.0" encoding="UTF-8"?>
    <step id="remotePartitionStagingStep">
     <partition partitioner="xmlPartitioner" handler="partitionStagingHandler"/>
       <listeners>
         <listener ref="jobListener" />
       </listeners>
     </step>
     <!-- XML Partitioner starts here -->
     <beans:bean id="xmlPartitioner" class="XMLPartitioner" scope="step">
         <beans:property name="partitionThreadName" value="ImportXMLPartition-"/>
         <beans:property name="resource" value="file:///#{jobParameters[ImportFileProcessed]}"/>
         <beans:property name="rootElementNode" value="#{jobParameters[ImportFragmentRootNameWithPrefix]}"/>
     </beans:bean>
       <beans:bean id="partitionStagingHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
            <beans:property name="stepName" value="slave.ImportStagingStep"/>
            <beans:property name="gridSize" value="3"/>
            <beans:property name="replyChannel" ref="aggregatedReplyChannel"/>
            <beans:property name="jobExplorer" ref="jobExplorer"/>
            <beans:property name="messagingOperations">
                <beans:bean class="org.springframework.integration.core.MessagingTemplate">
                    <beans:property name="defaultChannel" ref="requestsChannel"/>
                    <beans:property name="receiveTimeout" value="${batch.gateway.receiveTimeout}" /> <!-- currently 360000 -->
                </beans:bean>
            </beans:property>
        </beans:bean>
         <int:aggregator ref="partitionStagingHandler"
                        input-channel="replyChannel"
                        output-channel="aggregatedReplyChannel"
                        send-timeout="${batch.gateway.receiveTimeout}"
                        expire-groups-upon-timeout="true"/>
         <int:channel id="requestsChannel">
            <int:interceptors>
                <int:wire-tap channel="logChannel"/>
            </int:interceptors>
        </int:channel>
         <int-jms:outbound-channel-adapter connection-factory="connectionFactory"
                                          channel="requestsChannel"
                                          destination-name="requestsQueue"/>
        <int:channel id="aggregatedReplyChannel">
            <int:queue/>
        </int:channel>
         <int:channel id="replyChannel">
            <int:interceptors>
                <int:wire-tap channel="logChannel"/>
            </int:interceptors>
        </int:channel>
         <int-jms:message-driven-channel-adapter connection-factory="connectionFactory"
                                                channel="replyChannel"
                                                error-channel="errorChannel"
                                                destination-name="replyQueue"/>



Slave configuration

    <step id="slave.ImportStagingStep">
    <tasklet transaction-manager="transactionManager">
    <chunk reader="StagingSpecificItemReader" processor="chainedStagingProcessor" writer="StagingItemWriter"
             commit-interval="${import.CommitInterval}" skip-limit="${import.skipLimit}" retry-policy="neverRetryPolicy">
     <streams>
      <stream ref="errorFlatFileRecordWriter"/>
     </streams>
     <skippable-exception-classes>
       <include class="java.lang.Exception"/>
       <exclude class="org.springframework.oxm.UnmarshallingFailureException"/>
       <exclude class="java.lang.Error"/>
     </skippable-exception-classes>
     </chunk>
     <listeners>
        <listener ref="stepExceptionListener"/>
        <listener ref="stagingListener"/>
     </listeners>
       </tasklet>
       <listeners>
          <listener ref="slaveStepExecutionListener"/>
       </listeners>
      </step>
        <beans:bean id="slaveStepExecutionListener" class="StepExecutionListener"></beans:bean>
        <!-- JMS config for staging step starts here -->
        <int:channel id="replyChannel">
            <int:interceptors>
                <int:wire-tap channel="logChannel"/>
            </int:interceptors>
        </int:channel>
            <int:channel id="requestsChannel">
            <int:interceptors>
                <int:wire-tap channel="logChannel"/>
            </int:interceptors>
        </int:channel>
        <int-jms:message-driven-channel-adapter connection-factory="connectionFactory"
                                                destination-name="requestsQueue"
                                                error-channel="errorChannel"
                                                channel="requestsChannel"/>

        <int-jms:outbound-channel-adapter connection-factory="connectionFactory"
                                          destination-name="replyQueue"
                                          channel="replyChannel"/>

        <int:service-activator input-channel="requestsChannel"
                               output-channel="replyChannel"
                               ref="stepExecutionRequestHandler"/>

        <!-- JMS config for staging step ends here -->

        <!-- The logChannel is configured as an interceptor to channels so that messages are logged. -->
        <int:logging-channel-adapter auto-startup="true" log-full-message="true" id="logChannel" level="INFO"/>

        <int:channel id="errorChannel" />

        <int:service-activator input-channel="errorChannel" method="handleException">
          <beans:bean class="ErrorHandler" />
        </int:service-activator>


Answer


Found the issue that we were having.


The slave was including the Spring Batch job definition twice, which caused the consumers to be registered twice for each slave. I think we are still missing some configuration to correctly route messages back from the slave to the master when two partitions run concurrently on the same slave, so for now, as long as one partition goes to each slave (we multithread within the slave), we are fine.
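As a sketch of that fix: since the duplicate consumers came from the same context being loaded twice, the slave's top-level configuration should pull in the job and JMS definitions exactly once. The file names below are hypothetical, not taken from the question:

```xml
<!-- Sketch of the slave's top-level context: import the step definition
     and the JMS adapter configuration exactly once. Importing either
     file a second time registers a second JMS consumer per slave.
     File names are hypothetical. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <import resource="classpath:slave-staging-step.xml"/>
    <import resource="classpath:slave-jms-config.xml"/>
</beans>
```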
